1use std::{
2 collections::{BTreeMap, BTreeSet},
3 net::IpAddr,
4};
5
6use uuid::Uuid;
7
8pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
9use crate::{BuildId, common::DaemonId, descriptor::Descriptor, id::NodeId};
10
/// Reply to a control request.
///
/// NOTE(review): which component produces these replies is not visible in this
/// file (presumably the coordinator, answering CLI requests) — confirm.
///
/// The type derives serde `Serialize`/`Deserialize`; renaming or reordering
/// variants may change the serialized representation, so prefer appending.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
    /// An error, carried as a plain string message.
    Error(String),
    /// Unit reply; presumably signals that the coordinator has stopped — confirm.
    CoordinatorStopped,
    /// Carries the [`BuildId`] of a dataflow build
    /// (presumably one that was just triggered — confirm).
    DataflowBuildTriggered {
        build_id: BuildId,
    },
    /// Carries the [`BuildId`] of a dataflow build together with its outcome:
    /// `Ok(())` or an error message string.
    DataflowBuildFinished {
        build_id: BuildId,
        result: Result<(), String>,
    },
    /// Carries the UUID of a dataflow
    /// (presumably one whose start was just triggered — confirm).
    DataflowStartTriggered {
        uuid: Uuid,
    },
    /// Carries the UUID of a dataflow (presumably one that was spawned — confirm).
    DataflowSpawned {
        uuid: Uuid,
    },
    /// Carries the UUID of a dataflow (presumably one that was reloaded — confirm).
    DataflowReloaded {
        uuid: Uuid,
    },
    /// Carries the UUID of a dataflow together with its [`DataflowResult`].
    DataflowStopped {
        uuid: Uuid,
        result: DataflowResult,
    },
    /// A list of dataflows with their status, see [`DataflowList`].
    DataflowList(DataflowList),
    /// UUID, optional name and [`Descriptor`] of a single dataflow.
    DataflowInfo {
        uuid: Uuid,
        name: Option<String>,
        descriptor: Descriptor,
    },
    /// Unit reply; presumably acknowledges a destroy request — confirm.
    DestroyOk,
    /// Boolean flag; presumably whether a daemon is connected — confirm.
    DaemonConnected(bool),
    /// Ordered set of daemon IDs (presumably the currently connected daemons — confirm).
    ConnectedDaemons(BTreeSet<DaemonId>),
    /// Log data as raw bytes; the encoding is not visible in this file.
    Logs(Vec<u8>),
    /// Optional IP addresses for the default daemon and for the CLI.
    CliAndDefaultDaemonIps {
        default_daemon: Option<IpAddr>,
        cli: Option<IpAddr>,
    },
    /// A list of [`NodeInfo`] entries.
    NodeInfoList(Vec<NodeInfo>),
}
51
/// Identifying information about a single node, with optional metrics.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeInfo {
    /// UUID of the dataflow (presumably the one this node belongs to — confirm).
    pub dataflow_id: Uuid,
    /// Optional name of that dataflow.
    pub dataflow_name: Option<String>,
    /// ID of the node.
    pub node_id: NodeId,
    /// ID of a daemon (presumably the one running this node — confirm).
    pub daemon_id: DaemonId,
    /// Optional metrics for the node; the conditions under which this is
    /// `None` are not visible in this file.
    pub metrics: Option<NodeMetricsInfo>,
}
60
/// Resource metrics reported for a node.
///
/// NOTE(review): units below are inferred from the field names only — confirm
/// against the code that fills in this struct.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeMetricsInfo {
    /// Process ID (presumably of the node's process — confirm).
    pub pid: u32,
    /// CPU usage; scale not visible here (presumably a percentage — TODO confirm).
    pub cpu_usage: f32,
    /// Memory usage, presumably in megabytes per the field name.
    pub memory_mb: f64,
    /// Disk read rate, presumably in MB/s per the field name; optional.
    pub disk_read_mb_s: Option<f64>,
    /// Disk write rate, presumably in MB/s per the field name; optional.
    pub disk_write_mb_s: Option<f64>,
}
75
/// Result of a dataflow: one `Result` per node, keyed by node ID.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowResult {
    /// UUID of the dataflow.
    pub uuid: Uuid,
    /// A `uhlc` timestamp; what event it marks is not visible in this file
    /// (presumably when the result was produced — confirm).
    pub timestamp: uhlc::Timestamp,
    /// Per-node outcome: `Ok(())` or a [`NodeError`], ordered by [`NodeId`].
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}
82
83impl DataflowResult {
84 pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self {
85 Self {
86 uuid,
87 timestamp,
88 node_results: Default::default(),
89 }
90 }
91
92 pub fn is_ok(&self) -> bool {
93 self.node_results.values().all(|r| r.is_ok())
94 }
95}
96
/// Newtype wrapper around a list of [`DataflowListEntry`] values.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowList(pub Vec<DataflowListEntry>);
99
100impl DataflowList {
101 pub fn get_active(&self) -> Vec<DataflowIdAndName> {
102 self.0
103 .iter()
104 .filter(|d| d.status == DataflowStatus::Running)
105 .map(|d| d.id.clone())
106 .collect()
107 }
108}
109
/// One entry of a [`DataflowList`]: a dataflow's identity plus its status.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowListEntry {
    /// UUID and optional name of the dataflow.
    pub id: DataflowIdAndName,
    /// Status of the dataflow.
    pub status: DataflowStatus,
}
115
/// Status of a dataflow as listed in a [`DataflowListEntry`].
///
/// `DataflowList::get_active` treats only `Running` as active.
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum DataflowStatus {
    /// The dataflow counts as active.
    Running,
    /// Presumably: the dataflow ended without error — confirm.
    Finished,
    /// Presumably: the dataflow ended with an error — confirm.
    Failed,
}
122
/// A dataflow's UUID together with its optional name.
///
/// The `Display` implementation renders this as `[name] uuid`, substituting
/// `<unnamed>` when no name is set.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataflowIdAndName {
    /// UUID of the dataflow.
    pub uuid: Uuid,
    /// Name of the dataflow, if it has one.
    pub name: Option<String>,
}
128
129impl std::fmt::Display for DataflowIdAndName {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 if let Some(name) = &self.name {
132 write!(f, "[{name}] {}", self.uuid)
133 } else {
134 write!(f, "[<unnamed>] {}", self.uuid)
135 }
136 }
137}