1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::collections::BTreeMap;
pub use crate::common::{
DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::{
BuildId, DataflowId, common::DaemonId, current_crate_version, id::NodeId, versions_compatible,
};
/// Requests addressed to the coordinator on behalf of a daemon.
///
/// The variant docs below describe these as travelling over plain TCP
/// connections; everything else a daemon reports goes through the
/// `CoordinatorNotify` tarpc service instead.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
/// Initial registration, carrying a [`DaemonRegisterRequest`] (version
/// plus optional machine identifiers).
Register(DaemonRegisterRequest),
/// Register a notification channel for daemon→coordinator RPC.
///
/// Sent on a second TCP connection after the initial registration.
/// The coordinator sets up a `CoordinatorNotify` tarpc server
/// on this connection.
RegisterNotificationChannel {
/// Identifies the daemon that owns the notification channel.
daemon_id: DaemonId,
},
/// Forward a log message from a daemon over the legacy raw-TCP path.
///
/// All other daemon→coordinator communication now uses the
/// `CoordinatorNotify` tarpc service on the notification channel.
Log {
/// Identifies the daemon the log message is forwarded from.
daemon_id: DaemonId,
/// The log message being forwarded.
message: LogMessage,
},
}
/// Payload of `CoordinatorRequest::Register`.
///
/// Carries the sender's crate version together with optional machine
/// identifiers. Build it with `DaemonRegisterRequest::new` and validate a
/// received one with `DaemonRegisterRequest::check_version`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DaemonRegisterRequest {
/// Version stamped into the request by `new` (from
/// `current_crate_version()`); private, and only read by `check_version`.
dora_version: semver::Version,
/// Caller-supplied machine identifier, if any.
pub machine_id: Option<String>,
/// System-level unique machine identifier (e.g. `/etc/machine-id` on Linux).
/// Used to reliably detect whether CLI and daemon run on the same machine,
/// even behind NAT.
// `serde(default)` makes a missing field deserialize as `None` —
// presumably so requests from senders that predate this field are still
// accepted; confirm against the wire format in use.
#[serde(default)]
pub machine_uid: Option<String>,
}
impl DaemonRegisterRequest {
    /// Creates a registration request for the given optional machine id.
    ///
    /// The version field is filled from `current_crate_version()` and the
    /// machine UID from `crate::common::machine_uid()`.
    pub fn new(machine_id: Option<String>) -> Self {
        let dora_version = current_crate_version();
        let machine_uid = crate::common::machine_uid();
        Self {
            dora_version,
            machine_id,
            machine_uid,
        }
    }

    /// Verifies that the version carried by this request is compatible with
    /// the version of the crate performing the check.
    ///
    /// # Errors
    ///
    /// Returns a "version mismatch" message when `versions_compatible`
    /// reports the two versions as incompatible, and propagates any error
    /// that `versions_compatible` itself returns.
    pub fn check_version(&self) -> Result<(), String> {
        let crate_version = current_crate_version();
        // Guard clause: bail out with a descriptive message on mismatch.
        if !versions_compatible(&crate_version, &self.dora_version)? {
            return Err(format!(
                "version mismatch: message format v{} is not compatible \
                with expected message format v{crate_version}",
                self.dora_version
            ));
        }
        Ok(())
    }
}
/// Resource metrics for a node process.
///
/// Reported to the coordinator through `CoordinatorNotify::node_metrics`,
/// where one value is sent per node id.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeMetrics {
/// Process ID of the node process.
pub pid: u32,
/// CPU usage percentage (0-100 per core).
// NOTE(review): "per core" suggests values above 100 are possible for
// multi-threaded processes — confirm against the code that samples this.
pub cpu_usage: f32,
/// Memory usage in bytes.
pub memory_bytes: u64,
/// Disk read bytes per second (if available).
// NOTE(review): the field name does not say "per second"; confirm whether
// this is a rate or a cumulative total.
pub disk_read_bytes: Option<u64>,
/// Disk write bytes per second (if available).
// NOTE(review): same rate-vs-total question as `disk_read_bytes`.
pub disk_write_bytes: Option<u64>,
}
/// Outcome of a dataflow's nodes as reported by a single daemon.
///
/// Passed to the coordinator as the `result` argument of
/// `CoordinatorNotify::all_nodes_finished`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowDaemonResult {
/// `uhlc` timestamp attached to this result — presumably the time the
/// result was produced; TODO confirm with the code that builds it.
pub timestamp: uhlc::Timestamp,
/// Result of each node, keyed by node id: `Ok(())` on success, otherwise
/// the `NodeError` describing the failure.
pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}
impl DataflowDaemonResult {
    /// Returns `true` when no node result is an error.
    ///
    /// An empty `node_results` map therefore also yields `true`.
    pub fn is_ok(&self) -> bool {
        let any_failed = self.node_results.values().any(Result::is_err);
        !any_failed
    }
}
/// tarpc service for daemon→coordinator notifications.
///
/// The coordinator runs a tarpc server implementing this trait,
/// and each daemon holds a client to notify the coordinator about
/// events such as node readiness, dataflow completion, and metrics.
///
/// None of the methods declares a return value, so a daemon gets no
/// payload back from these calls.
#[tarpc::service]
pub trait CoordinatorNotify {
/// Report that all local nodes on this daemon are ready.
///
/// `exited_before_subscribe` lists node ids — judging by the name, nodes
/// that exited before subscribing; confirm against the daemon code.
async fn all_nodes_ready(dataflow_id: DataflowId, exited_before_subscribe: Vec<NodeId>);
/// Report that all nodes on this daemon have finished.
///
/// `result` carries the per-node outcomes for the given dataflow.
async fn all_nodes_finished(dataflow_id: DataflowId, result: DataflowDaemonResult);
/// Daemon heartbeat.
async fn heartbeat();
/// Forward a log message to the coordinator.
async fn log(message: LogMessage);
/// Notify the coordinator that this daemon is exiting.
async fn daemon_exit();
/// Report resource metrics for running nodes.
///
/// `metrics` maps each node id to its `NodeMetrics` sample.
async fn node_metrics(dataflow_id: DataflowId, metrics: BTreeMap<NodeId, NodeMetrics>);
/// Report that a build has completed (or failed) on this daemon.
///
/// `result` is `Ok(())` on success or `Err` with a string describing the
/// failure.
async fn build_result(build_id: BuildId, result: Result<(), String>);
/// Report that a dataflow spawn has completed (or failed) on this daemon.
///
/// `result` is `Ok(())` on success or `Err` with a string describing the
/// failure.
async fn spawn_result(dataflow_id: DataflowId, result: Result<(), String>);
}