dora_message/
daemon_to_coordinator.rs

1use std::collections::BTreeMap;
2
3pub use crate::common::{
4    DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
5};
6use crate::{
7    BuildId, DataflowId, common::DaemonId, current_crate_version, id::NodeId, versions_compatible,
8};
9
10#[derive(Debug, serde::Serialize, serde::Deserialize)]
11pub enum CoordinatorRequest {
12    Register(DaemonRegisterRequest),
13    Event {
14        daemon_id: DaemonId,
15        event: DaemonEvent,
16    },
17}
18
19#[derive(Debug, serde::Serialize, serde::Deserialize)]
20pub struct DaemonRegisterRequest {
21    dora_version: semver::Version,
22    pub machine_id: Option<String>,
23}
24
25impl DaemonRegisterRequest {
26    pub fn new(machine_id: Option<String>) -> Self {
27        Self {
28            dora_version: current_crate_version(),
29            machine_id,
30        }
31    }
32
33    pub fn check_version(&self) -> Result<(), String> {
34        let crate_version = current_crate_version();
35        let specified_version = &self.dora_version;
36
37        if versions_compatible(&crate_version, specified_version)? {
38            Ok(())
39        } else {
40            Err(format!(
41                "version mismatch: message format v{} is not compatible \
42                with expected message format v{crate_version}",
43                self.dora_version
44            ))
45        }
46    }
47}
48
49#[derive(Debug, serde::Serialize, serde::Deserialize)]
50pub enum DaemonEvent {
51    BuildResult {
52        build_id: BuildId,
53        result: Result<(), String>,
54    },
55    SpawnResult {
56        dataflow_id: DataflowId,
57        result: Result<(), String>,
58    },
59    AllNodesReady {
60        dataflow_id: DataflowId,
61        exited_before_subscribe: Vec<NodeId>,
62    },
63    AllNodesFinished {
64        dataflow_id: DataflowId,
65        result: DataflowDaemonResult,
66    },
67    Heartbeat,
68    Log(LogMessage),
69    Exit,
70}
71
72#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
73pub struct DataflowDaemonResult {
74    pub timestamp: uhlc::Timestamp,
75    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
76}
77
78impl DataflowDaemonResult {
79    pub fn is_ok(&self) -> bool {
80        self.node_results.values().all(|r| r.is_ok())
81    }
82}
83
84#[derive(Debug, serde::Deserialize, serde::Serialize)]
85pub enum DaemonCoordinatorReply {
86    TriggerBuildResult(Result<(), String>),
87    TriggerSpawnResult(Result<(), String>),
88    ReloadResult(Result<(), String>),
89    StopResult(Result<(), String>),
90    DestroyResult {
91        result: Result<(), String>,
92        #[serde(skip)]
93        notify: Option<tokio::sync::oneshot::Sender<()>>,
94    },
95    Logs(Result<Vec<u8>, String>),
96}