dora_message/
common.rs

1use core::fmt;
2use std::borrow::Cow;
3
4use aligned_vec::{AVec, ConstAlign};
5use eyre::Context as _;
6use uuid::Uuid;
7
8use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};
9
10pub use log::Level as LogLevel;
11
/// A single log record together with optional information about its origin.
///
/// Every origin field is an `Option`; `None` means that piece of information
/// is not attached to this particular message.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
    /// Build this message belongs to, if any.
    pub build_id: Option<BuildId>,
    /// Dataflow this message belongs to, if any.
    pub dataflow_id: Option<DataflowId>,
    /// Node this message belongs to, if any.
    pub node_id: Option<NodeId>,
    /// Daemon this message belongs to, if any.
    pub daemon_id: Option<DaemonId>,
    /// Severity of the message, or the `Stdout` marker.
    pub level: LogLevelOrStdout,
    // NOTE(review): `target`, `module_path`, `file` and `line` presumably mirror
    // the metadata of a `log` record — confirm against the producers.
    /// Log target, if known.
    pub target: Option<String>,
    /// Module path of the log call site, if known.
    pub module_path: Option<String>,
    /// Source file of the log call site, if known.
    pub file: Option<String>,
    /// Source line of the log call site, if known.
    pub line: Option<u32>,
    /// The log text itself.
    pub message: String,
}
26
/// Level of a [`LogMessage`]: either a regular [`LogLevel`] or the `Stdout` marker.
///
/// The ordering traits are derived, so values compare by variant declaration
/// order first: every `LogLevel(_)` value sorts before `Stdout`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevelOrStdout {
    /// A message with a regular log level.
    LogLevel(LogLevel),
    /// A message without a log level.
    // NOTE(review): presumably raw output captured from a node's stdout — confirm.
    Stdout,
}
32
33impl From<LogLevel> for LogLevelOrStdout {
34    fn from(level: LogLevel) -> Self {
35        Self::LogLevel(level)
36    }
37}
38
/// Describes a node failure: when it was recorded, why it happened, and how
/// the node process ended.
///
/// The `Display` implementation turns this into a human-readable message.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// Timestamp associated with the error.
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node process ended.
    pub exit_status: NodeExitStatus,
}
45
46impl std::fmt::Display for NodeError {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
49            return write!(f, "failed to spawn node: {err}");
50        }
51        match &self.exit_status {
52            NodeExitStatus::Success => write!(f, "<success>"),
53            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
54            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
55            NodeExitStatus::Signal(signal) => {
56                let signal_str: Cow<_> = match signal {
57                    1 => "SIGHUP".into(),
58                    2 => "SIGINT".into(),
59                    3 => "SIGQUIT".into(),
60                    4 => "SIGILL".into(),
61                    6 => "SIGABRT".into(),
62                    8 => "SIGFPE".into(),
63                    9 => "SIGKILL".into(),
64                    11 => "SIGSEGV".into(),
65                    13 => "SIGPIPE".into(),
66                    14 => "SIGALRM".into(),
67                    15 => "SIGTERM".into(),
68                    22 => "SIGABRT".into(),
69                    23 => "NSIG".into(),
70                    other => other.to_string().into(),
71                };
72                if matches!(self.cause, NodeErrorCause::GraceDuration) {
73                    write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
74                } else {
75                    write!(f, "exited because of signal {signal_str}")
76                }
77            }
78            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
79        }?;
80
81        match &self.cause {
82            NodeErrorCause::GraceDuration => {}, // handled above
83            NodeErrorCause::Cascading { caused_by_node } => write!(
84                f,
85                ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
86            )?,
87            NodeErrorCause::FailedToSpawn(_) => unreachable!(), // handled above
88            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
89            NodeErrorCause::Other { stderr } => {
90                let line: &str = "---------------------------------------------------------------------------------\n";
91                let stderr = stderr.trim_end();
92                write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
93            },
94        }
95
96        Ok(())
97    }
98}
99
/// Reason why a node failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// Node was killed because it didn't react to a stop message in time.
    GraceDuration,
    /// Node failed because another node failed before it.
    Cascading {
        /// The node whose earlier failure caused this one.
        caused_by_node: NodeId,
    },
    /// Node could not be spawned; the payload is the error description.
    FailedToSpawn(String),
    /// Any other failure.
    Other {
        /// Text shown as the node's stderr output in the error message (may be empty).
        stderr: String,
    },
}
113
/// How a node process ended.
///
/// Can be built from a `Result<std::process::ExitStatus, std::io::Error>` via `From`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process reported success.
    Success,
    /// The exit status could not be obtained; holds the I/O error text.
    IoError(String),
    /// The process exited with the given non-success exit code.
    ExitCode(i32),
    /// The process was terminated by the given signal number.
    Signal(i32),
    /// Neither an exit code nor a signal was available.
    Unknown,
}
122
123impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
124    fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
125        match result {
126            Ok(status) => {
127                if status.success() {
128                    NodeExitStatus::Success
129                } else if let Some(code) = status.code() {
130                    Self::ExitCode(code)
131                } else {
132                    #[cfg(unix)]
133                    {
134                        use std::os::unix::process::ExitStatusExt;
135                        if let Some(signal) = status.signal() {
136                            return Self::Signal(signal);
137                        }
138                    }
139                    Self::Unknown
140                }
141            }
142            Err(err) => Self::IoError(err.to_string()),
143        }
144    }
145}
146
/// A value paired with a `uhlc::Timestamp`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped value.
    pub inner: T,
    /// Timestamp attached to `inner`.
    pub timestamp: uhlc::Timestamp,
}
152
153impl<T> Timestamped<T>
154where
155    T: serde::Serialize,
156{
157    pub fn serialize(&self) -> Vec<u8> {
158        bincode::serialize(self).unwrap()
159    }
160}
161
162impl Timestamped<InterDaemonEvent> {
163    pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
164        bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
165    }
166}
167
/// Identifier of a shared memory region; a plain `String` alias.
pub type SharedMemoryId = String;
169
/// Payload of a message, carried either inline or by reference to shared memory.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// Data stored inline in a byte vector with a constant alignment of 128.
    Vec(AVec<u8, ConstAlign<128>>),
    /// Data that lives in a shared memory region.
    SharedMemory {
        /// Identifier of the shared memory region.
        shared_memory_id: String,
        /// Length of the data.
        // NOTE(review): presumably in bytes — confirm against the readers.
        len: usize,
        /// Token associated with this shared memory message.
        drop_token: DropToken,
    },
}
179
180impl DataMessage {
181    pub fn drop_token(&self) -> Option<DropToken> {
182        match self {
183            DataMessage::Vec(_) => None,
184            DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
185        }
186    }
187}
188
189impl fmt::Debug for DataMessage {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        match self {
192            Self::Vec(v) => f
193                .debug_struct("Vec")
194                .field("len", &v.len())
195                .finish_non_exhaustive(),
196            Self::SharedMemory {
197                shared_memory_id,
198                len,
199                drop_token,
200            } => f
201                .debug_struct("SharedMemory")
202                .field("shared_memory_id", shared_memory_id)
203                .field("len", len)
204                .field("drop_token", drop_token)
205                .finish(),
206        }
207    }
208}
209
/// Opaque token backed by a UUID, carried by `DataMessage::SharedMemory`.
// NOTE(review): presumably used to report when the shared memory region is no
// longer needed — confirm against the code that consumes drop tokens.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);
214
215impl DropToken {
216    pub fn generate() -> Self {
217        Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
218    }
219}
220
/// Identifier of a daemon: an optional machine id plus a UUID.
///
/// Displayed as `<machine_id>-<uuid>`, or just `<uuid>` when no machine id is set.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
    /// Optional machine id supplied at construction time.
    machine_id: Option<String>,
    /// Random (version-4) UUID assigned by `DaemonId::new`.
    uuid: Uuid,
}
226
227impl DaemonId {
228    pub fn new(machine_id: Option<String>) -> Self {
229        DaemonId {
230            machine_id,
231            uuid: Uuid::new_v4(),
232        }
233    }
234
235    pub fn matches_machine_id(&self, machine_id: &str) -> bool {
236        self.machine_id
237            .as_ref()
238            .map(|id| id == machine_id)
239            .unwrap_or_default()
240    }
241
242    pub fn machine_id(&self) -> Option<&str> {
243        self.machine_id.as_deref()
244    }
245}
246
247impl std::fmt::Display for DaemonId {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        if let Some(id) = &self.machine_id {
250            write!(f, "{id}-")?;
251        }
252        write!(f, "{}", self.uuid)
253    }
254}
255
/// Identifies a git source by repository and commit.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
    /// The repository.
    // NOTE(review): presumably a clone URL — confirm against the code that fetches it.
    pub repo: String,
    /// Hash of the commit to use.
    pub commit_hash: String,
}