dora_message/
coordinator_to_cli.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    net::IpAddr,
4};
5
6use uuid::Uuid;
7
8pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
9use crate::{BuildId, common::DaemonId, descriptor::Descriptor, id::NodeId};
10
11#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
12pub enum ControlRequestReply {
13    Error(String),
14    CoordinatorStopped,
15    DataflowBuildTriggered {
16        build_id: BuildId,
17    },
18    DataflowBuildFinished {
19        build_id: BuildId,
20        result: Result<(), String>,
21    },
22    DataflowStartTriggered {
23        uuid: Uuid,
24    },
25    DataflowSpawned {
26        uuid: Uuid,
27    },
28    DataflowReloaded {
29        uuid: Uuid,
30    },
31    DataflowStopped {
32        uuid: Uuid,
33        result: DataflowResult,
34    },
35    DataflowList(DataflowList),
36    DataflowInfo {
37        uuid: Uuid,
38        name: Option<String>,
39        descriptor: Descriptor,
40    },
41    DestroyOk,
42    DaemonConnected(bool),
43    ConnectedDaemons(BTreeSet<DaemonId>),
44    Logs(Vec<u8>),
45    CliAndDefaultDaemonIps {
46        default_daemon: Option<IpAddr>,
47        cli: Option<IpAddr>,
48    },
49    NodeInfoList(Vec<NodeInfo>),
50}
51
52#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
53pub struct NodeInfo {
54    pub dataflow_id: Uuid,
55    pub dataflow_name: Option<String>,
56    pub node_id: NodeId,
57    pub daemon_id: DaemonId,
58    pub metrics: Option<NodeMetricsInfo>,
59}
60
61/// Resource metrics for a node (from daemon)
62#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
63pub struct NodeMetricsInfo {
64    /// Process ID
65    pub pid: u32,
66    /// CPU usage percentage (0-100 per core)
67    pub cpu_usage: f32,
68    /// Memory usage in megabytes
69    pub memory_mb: f64,
70    /// Disk read MB/s (if available)
71    pub disk_read_mb_s: Option<f64>,
72    /// Disk write MB/s (if available)
73    pub disk_write_mb_s: Option<f64>,
74}
75
76#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
77pub struct DataflowResult {
78    pub uuid: Uuid,
79    pub timestamp: uhlc::Timestamp,
80    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
81}
82
83impl DataflowResult {
84    pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self {
85        Self {
86            uuid,
87            timestamp,
88            node_results: Default::default(),
89        }
90    }
91
92    pub fn is_ok(&self) -> bool {
93        self.node_results.values().all(|r| r.is_ok())
94    }
95}
96
97#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
98pub struct DataflowList(pub Vec<DataflowListEntry>);
99
100impl DataflowList {
101    pub fn get_active(&self) -> Vec<DataflowIdAndName> {
102        self.0
103            .iter()
104            .filter(|d| d.status == DataflowStatus::Running)
105            .map(|d| d.id.clone())
106            .collect()
107    }
108}
109
110#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
111pub struct DataflowListEntry {
112    pub id: DataflowIdAndName,
113    pub status: DataflowStatus,
114}
115
116#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
117pub enum DataflowStatus {
118    Running,
119    Finished,
120    Failed,
121}
122
123#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
124pub struct DataflowIdAndName {
125    pub uuid: Uuid,
126    pub name: Option<String>,
127}
128
129impl std::fmt::Display for DataflowIdAndName {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        if let Some(name) = &self.name {
132            write!(f, "[{name}] {}", self.uuid)
133        } else {
134            write!(f, "[<unnamed>] {}", self.uuid)
135        }
136    }
137}