// dora_message/daemon_to_coordinator.rs
use std::collections::BTreeMap;
2
3pub use crate::common::{
4 DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
5};
6use crate::{
7 BuildId, DataflowId, common::DaemonId, current_crate_version, id::NodeId, versions_compatible,
8};
9
10#[derive(Debug, serde::Serialize, serde::Deserialize)]
11pub enum CoordinatorRequest {
12 Register(DaemonRegisterRequest),
13 Event {
14 daemon_id: DaemonId,
15 event: DaemonEvent,
16 },
17}
18
19#[derive(Debug, serde::Serialize, serde::Deserialize)]
20pub struct DaemonRegisterRequest {
21 dora_version: semver::Version,
22 pub machine_id: Option<String>,
23}
24
25impl DaemonRegisterRequest {
26 pub fn new(machine_id: Option<String>) -> Self {
27 Self {
28 dora_version: current_crate_version(),
29 machine_id,
30 }
31 }
32
33 pub fn check_version(&self) -> Result<(), String> {
34 let crate_version = current_crate_version();
35 let specified_version = &self.dora_version;
36
37 if versions_compatible(&crate_version, specified_version)? {
38 Ok(())
39 } else {
40 Err(format!(
41 "version mismatch: message format v{} is not compatible \
42 with expected message format v{crate_version}",
43 self.dora_version
44 ))
45 }
46 }
47}
48
49#[derive(Debug, serde::Serialize, serde::Deserialize)]
50pub enum DaemonEvent {
51 BuildResult {
52 build_id: BuildId,
53 result: Result<(), String>,
54 },
55 SpawnResult {
56 dataflow_id: DataflowId,
57 result: Result<(), String>,
58 },
59 AllNodesReady {
60 dataflow_id: DataflowId,
61 exited_before_subscribe: Vec<NodeId>,
62 },
63 AllNodesFinished {
64 dataflow_id: DataflowId,
65 result: DataflowDaemonResult,
66 },
67 Heartbeat,
68 Log(LogMessage),
69 Exit,
70 NodeMetrics {
71 dataflow_id: DataflowId,
72 metrics: BTreeMap<NodeId, NodeMetrics>,
73 },
74}
75
76#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78pub struct NodeMetrics {
79 pub pid: u32,
81 pub cpu_usage: f32,
83 pub memory_bytes: u64,
85 pub disk_read_bytes: Option<u64>,
87 pub disk_write_bytes: Option<u64>,
89}
90
91#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
92pub struct DataflowDaemonResult {
93 pub timestamp: uhlc::Timestamp,
94 pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
95}
96
97impl DataflowDaemonResult {
98 pub fn is_ok(&self) -> bool {
99 self.node_results.values().all(|r| r.is_ok())
100 }
101}
102
103#[derive(Debug, serde::Deserialize, serde::Serialize)]
104pub enum DaemonCoordinatorReply {
105 TriggerBuildResult(Result<(), String>),
106 TriggerSpawnResult(Result<(), String>),
107 ReloadResult(Result<(), String>),
108 StopResult(Result<(), String>),
109 DestroyResult {
110 result: Result<(), String>,
111 #[serde(skip)]
112 notify: Option<tokio::sync::oneshot::Sender<()>>,
113 },
114 Logs(Result<Vec<u8>, String>),
115}