dora_message/daemon_to_coordinator.rs

use std::collections::BTreeMap;

pub use crate::common::{
    DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::{
    BuildId, DataflowId, common::DaemonId, current_crate_version, id::NodeId, versions_compatible,
};

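/// A message sent from a daemon to the coordinator.
///
/// A daemon first registers itself with a [`DaemonRegisterRequest`] and then
/// reports [`DaemonEvent`]s tagged with its [`DaemonId`].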
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
    Register(DaemonRegisterRequest),
    Event {
        daemon_id: DaemonId,
        event: DaemonEvent,
    },
}

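/// Registration message that a daemon sends when connecting to the coordinator.
///
/// Carries the daemon's `dora` version so that message-format compatibility
/// can be checked (see [`Self::check_version`]).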
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DaemonRegisterRequest {
    dora_version: semver::Version,
    pub machine_id: Option<String>,
}

impl DaemonRegisterRequest {
    pub fn new(machine_id: Option<String>) -> Self {
        Self {
            dora_version: current_crate_version(),
            machine_id,
        }
    }

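    /// Checks whether the version recorded in this request is compatible with
    /// the version of this crate, returning a descriptive error message otherwise.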
    pub fn check_version(&self) -> Result<(), String> {
        let crate_version = current_crate_version();
        let specified_version = &self.dora_version;

        if versions_compatible(&crate_version, specified_version)? {
            Ok(())
        } else {
            Err(format!(
                "version mismatch: message format v{} is not compatible \
                with expected message format v{crate_version}",
                self.dora_version
            ))
        }
    }
}

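/// An event reported by a daemon after it has registered, covering build and
/// spawn results, dataflow lifecycle updates, heartbeats, forwarded logs, and
/// node resource metrics.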
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
    BuildResult {
        build_id: BuildId,
        result: Result<(), String>,
    },
    SpawnResult {
        dataflow_id: DataflowId,
        result: Result<(), String>,
    },
    AllNodesReady {
        dataflow_id: DataflowId,
        exited_before_subscribe: Vec<NodeId>,
    },
    AllNodesFinished {
        dataflow_id: DataflowId,
        result: DataflowDaemonResult,
    },
    Heartbeat,
    Log(LogMessage),
    Exit,
    NodeMetrics {
        dataflow_id: DataflowId,
        metrics: BTreeMap<NodeId, NodeMetrics>,
    },
}

/// Resource metrics for a node process
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeMetrics {
    /// Process ID
    pub pid: u32,
    /// CPU usage percentage (0-100 per core)
    pub cpu_usage: f32,
    /// Memory usage in bytes
    pub memory_bytes: u64,
    /// Disk read bytes per second (if available)
    pub disk_read_bytes: Option<u64>,
    /// Disk write bytes per second (if available)
    pub disk_write_bytes: Option<u64>,
}

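/// Per-node results of a finished dataflow, as reported by a single daemon in
/// [`DaemonEvent::AllNodesFinished`].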
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowDaemonResult {
    pub timestamp: uhlc::Timestamp,
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowDaemonResult {
    pub fn is_ok(&self) -> bool {
        self.node_results.values().all(|r| r.is_ok())
    }
}

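/// Reply sent by a daemon in response to a coordinator-issued request, such as
/// triggering a build or spawn, reloading, stopping, destroying, or fetching logs.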
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorReply {
    TriggerBuildResult(Result<(), String>),
    TriggerSpawnResult(Result<(), String>),
    ReloadResult(Result<(), String>),
    StopResult(Result<(), String>),
    DestroyResult {
        result: Result<(), String>,
        #[serde(skip)]
        notify: Option<tokio::sync::oneshot::Sender<()>>,
    },
    Logs(Result<Vec<u8>, String>),
}
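
// Minimal usage sketch (illustrative, not part of the original file): a request
// built via `DaemonRegisterRequest::new` records the current crate version, so
// checking it against the same crate should succeed, assuming that
// `versions_compatible` treats identical versions as compatible.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fresh_register_request_passes_version_check() {
        // `machine_id` is an arbitrary example value.
        let request = DaemonRegisterRequest::new(Some("example-machine".to_string()));
        assert!(request.check_version().is_ok());
    }
}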