dora_message/
common.rs

1use core::fmt;
2use std::{borrow::Cow, collections::BTreeMap};
3
4use aligned_vec::{AVec, ConstAlign};
5use chrono::{DateTime, Utc};
6use eyre::Context as _;
7use serde::{Deserialize, Deserializer};
8use uuid::Uuid;
9
10use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId};
11
12pub use log::Level as LogLevel;
13
/// A single log record together with optional information about where it came from.
///
/// All origin identifiers (`build_id`, `dataflow_id`, `node_id`, `daemon_id`) and
/// the source-location fields are optional; only `level`, `message` and
/// `timestamp` are always present.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
#[must_use]
pub struct LogMessage {
    /// Build this message is associated with, if any.
    pub build_id: Option<BuildId>,
    /// Dataflow this message is associated with, if any.
    pub dataflow_id: Option<DataflowId>,
    /// Node this message is associated with, if any.
    pub node_id: Option<NodeId>,
    /// Daemon this message is associated with, if any.
    pub daemon_id: Option<DaemonId>,
    /// Either a regular log level or the special `stdout` marker.
    pub level: LogLevelOrStdout,
    /// Log target (presumably as in `log::Record::target` — confirm with producers).
    pub target: Option<String>,
    /// Module path of the log call site, if known.
    pub module_path: Option<String>,
    /// Source file of the log call site, if known.
    pub file: Option<String>,
    /// Source line of the log call site, if known.
    pub line: Option<u32>,
    /// The log text itself.
    pub message: String,
    /// UTC time attached to the message.
    pub timestamp: DateTime<Utc>,
    /// Additional string key/value pairs attached to the message.
    pub fields: Option<BTreeMap<String, String>>,
}
30
/// Deserialization-only counterpart of [`LogMessage`].
///
/// It has the same fields as [`LogMessage`], except that `message` is optional.
/// Convert it with `LogMessage::from`, which fills values that are not set
/// directly from same-named entries in `fields`.
#[derive(Deserialize)]
pub struct LogMessageHelper {
    build_id: Option<BuildId>,
    dataflow_id: Option<DataflowId>,
    node_id: Option<NodeId>,
    daemon_id: Option<DaemonId>,
    level: LogLevelOrStdout,
    target: Option<String>,
    module_path: Option<String>,
    file: Option<String>,
    line: Option<u32>,
    // Optional here (unlike `LogMessage::message`): the text may instead be
    // supplied through a `message` entry in `fields`.
    message: Option<String>,
    timestamp: DateTime<Utc>,
    // Also serves as the fallback source for every optional value above.
    fields: Option<BTreeMap<String, String>>,
}
46
47impl From<LogMessageHelper> for LogMessage {
48    fn from(helper: LogMessageHelper) -> Self {
49        let fields = helper.fields.as_ref();
50        LogMessage {
51            build_id: helper.build_id.or(fields
52                .and_then(|f| f.get("build_id").cloned())
53                .map(|id| BuildId(Uuid::parse_str(&id).unwrap()))),
54            dataflow_id: helper.dataflow_id.or(fields
55                .and_then(|f| f.get("dataflow_id").cloned())
56                .map(|id| DataflowId::from(Uuid::parse_str(&id).unwrap()))),
57            node_id: helper.node_id.or(fields
58                .and_then(|f| f.get("node_id").cloned())
59                .map(|id| NodeId(id))),
60            daemon_id: helper
61                .daemon_id
62                .or(fields.and_then(|f| f.get("daemon_id").cloned()).map(|id| {
63                    let parts: Vec<&str> = id.splitn(2, '-').collect();
64                    if parts.len() == 2 {
65                        DaemonId {
66                            machine_id: Some(parts[0].to_string()),
67                            uuid: Uuid::parse_str(parts[1]).unwrap(),
68                        }
69                    } else {
70                        DaemonId {
71                            machine_id: None,
72                            uuid: Uuid::parse_str(&parts[0]).unwrap(),
73                        }
74                    }
75                })),
76            level: helper.level,
77            target: helper
78                .target
79                .or(fields.and_then(|f| f.get("target").cloned())),
80            module_path: helper
81                .module_path
82                .or(fields.and_then(|f| f.get("module_path").cloned())),
83            file: helper.file.or(fields.and_then(|f| f.get("file").cloned())),
84            line: helper.line.or(fields
85                .and_then(|f| f.get("line").cloned())
86                .and_then(|s| s.parse().ok())),
87            message: helper
88                .message
89                .or(fields.and_then(|f| f.get("message").cloned()))
90                .unwrap_or_default(),
91            fields: helper.fields,
92            timestamp: helper.timestamp,
93        }
94    }
95}
96
/// Level of a [`LogMessage`]: either a regular [`LogLevel`] or the special
/// `stdout` marker.
///
/// With the serde attributes below, `Stdout` is (de)serialized as the string
/// `stdout`, while the `LogLevel` variant is untagged, i.e. it uses the inner
/// level's own representation.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "UPPERCASE")]
pub enum LogLevelOrStdout {
    /// Marker for output without a level
    /// (presumably raw lines captured from a node's stdout — confirm with producers).
    #[serde(rename = "stdout")]
    Stdout,
    /// A regular log level.
    #[serde(untagged)]
    LogLevel(LogLevel),
}
105
106impl From<LogLevel> for LogLevelOrStdout {
107    fn from(level: LogLevel) -> Self {
108        Self::LogLevel(level)
109    }
110}
111
/// Describes the failure of a node: what caused it and how the process exited.
///
/// The `Display` impl renders this as a human-readable sentence.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// `uhlc` timestamp attached to the error.
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node process exited.
    pub exit_status: NodeExitStatus,
}
118
119impl std::fmt::Display for NodeError {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
122            return write!(f, "failed to spawn node: {err}");
123        }
124        match &self.exit_status {
125            NodeExitStatus::Success => write!(f, "<success>"),
126            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
127            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
128            NodeExitStatus::Signal(signal) => {
129                let signal_str: Cow<_> = match signal {
130                    1 => "SIGHUP".into(),
131                    2 => "SIGINT".into(),
132                    3 => "SIGQUIT".into(),
133                    4 => "SIGILL".into(),
134                    6 => "SIGABRT".into(),
135                    8 => "SIGFPE".into(),
136                    9 => "SIGKILL".into(),
137                    11 => "SIGSEGV".into(),
138                    13 => "SIGPIPE".into(),
139                    14 => "SIGALRM".into(),
140                    15 => "SIGTERM".into(),
141                    22 => "SIGABRT".into(),
142                    23 => "NSIG".into(),
143                    other => other.to_string().into(),
144                };
145                if matches!(self.cause, NodeErrorCause::GraceDuration) {
146                    write!(
147                        f,
148                        "node was killed by dora because it didn't react to a stop message in time ({signal_str})"
149                    )
150                } else {
151                    write!(f, "exited because of signal {signal_str}")
152                }
153            }
154            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
155        }?;
156
157        match &self.cause {
158            NodeErrorCause::GraceDuration => {} // handled above
159            NodeErrorCause::Cascading { caused_by_node } => write!(
160                f,
161                ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
162            )?,
163            NodeErrorCause::FailedToSpawn(_) => unreachable!(), // handled above
164            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
165            NodeErrorCause::Other { stderr } => {
166                let line: &str = "---------------------------------------------------------------------------------\n";
167                let stderr = stderr.trim_end();
168                write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
169            }
170        }
171
172        Ok(())
173    }
174}
175
/// The reason a node failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// Node was killed because it didn't react to a stop message in time.
    GraceDuration,
    /// Node failed because another node failed before.
    Cascading {
        /// The node whose earlier exit caused this failure.
        caused_by_node: NodeId,
    },
    /// The node could not be spawned; the string is the error description
    /// shown after `failed to spawn node:` in the `Display` output of `NodeError`.
    FailedToSpawn(String),
    /// Any other failure.
    Other {
        /// Stderr output reported with the error; may be empty.
        stderr: String,
    },
}
189
/// How a node process exited.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process exited successfully.
    Success,
    /// The exit status could not be read; the string is the I/O error text.
    IoError(String),
    /// The process exited unsuccessfully with this exit code.
    ExitCode(i32),
    /// The process was terminated by the signal with this number.
    Signal(i32),
    /// Neither an exit code nor a signal is available.
    Unknown,
}
198
199impl NodeExitStatus {
200    pub fn is_success(&self) -> bool {
201        matches!(self, NodeExitStatus::Success)
202    }
203}
204
205impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
206    fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
207        match result {
208            Ok(status) => {
209                if status.success() {
210                    NodeExitStatus::Success
211                } else if let Some(code) = status.code() {
212                    Self::ExitCode(code)
213                } else {
214                    #[cfg(unix)]
215                    {
216                        use std::os::unix::process::ExitStatusExt;
217                        if let Some(signal) = status.signal() {
218                            return Self::Signal(signal);
219                        }
220                    }
221                    Self::Unknown
222                }
223            }
224            Err(err) => Self::IoError(err.to_string()),
225        }
226    }
227}
228
/// A value paired with a `uhlc` timestamp.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped value.
    pub inner: T,
    /// Timestamp associated with `inner`.
    pub timestamp: uhlc::Timestamp,
}
234
235impl<T> Timestamped<T>
236where
237    T: serde::Serialize,
238{
239    pub fn serialize(&self) -> Vec<u8> {
240        bincode::serialize(self).unwrap()
241    }
242}
243
244impl Timestamped<InterDaemonEvent> {
245    pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
246        bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
247    }
248}
249
/// Identifier of a shared memory region (a plain `String`).
pub type SharedMemoryId = String;
251
/// Payload of a message: either the bytes themselves or a reference to shared memory.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// The bytes are carried inline, in a vector aligned to 128 bytes.
    Vec(AVec<u8, ConstAlign<128>>),
    /// The data is referenced through a shared memory id instead of being carried inline.
    SharedMemory {
        /// Identifier of the shared memory region.
        shared_memory_id: String,
        /// Length of the data (presumably in bytes — confirm with producers).
        len: usize,
        /// Token associated with this shared memory message; see [`DropToken`].
        drop_token: DropToken,
    },
}
261
262impl DataMessage {
263    pub fn drop_token(&self) -> Option<DropToken> {
264        match self {
265            DataMessage::Vec(_) => None,
266            DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
267        }
268    }
269}
270
271impl fmt::Debug for DataMessage {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        match self {
274            Self::Vec(v) => f
275                .debug_struct("Vec")
276                .field("len", &v.len())
277                .finish_non_exhaustive(),
278            Self::SharedMemory {
279                shared_memory_id,
280                len,
281                drop_token,
282            } => f
283                .debug_struct("SharedMemory")
284                .field("shared_memory_id", shared_memory_id)
285                .field("len", len)
286                .field("drop_token", drop_token)
287                .finish(),
288        }
289    }
290}
291
/// Opaque, UUID-backed token carried by shared-memory data messages.
///
/// NOTE(review): presumably used to report when the referenced shared memory
/// is no longer needed — confirm against the users of this type.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);
296
297impl DropToken {
298    pub fn generate() -> Self {
299        Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
300    }
301}
302
/// Identifier of a daemon: an optional machine id plus a UUID.
///
/// The `Display` impl renders it as `<machine_id>-<uuid>`, or just `<uuid>`
/// when no machine id is set.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
    // Optional machine name; `None` for daemons created without one.
    machine_id: Option<String>,
    // Random (v4) when created through `DaemonId::new`.
    uuid: Uuid,
}
308
309impl DaemonId {
310    pub fn new(machine_id: Option<String>) -> Self {
311        DaemonId {
312            machine_id,
313            uuid: Uuid::new_v4(),
314        }
315    }
316
317    pub fn matches_machine_id(&self, machine_id: &str) -> bool {
318        self.machine_id
319            .as_ref()
320            .map(|id| id == machine_id)
321            .unwrap_or_default()
322    }
323
324    pub fn machine_id(&self) -> Option<&str> {
325        self.machine_id.as_deref()
326    }
327}
328
329impl std::fmt::Display for DaemonId {
330    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331        if let Some(id) = &self.machine_id {
332            write!(f, "{id}-")?;
333        }
334        write!(f, "{}", self.uuid)
335    }
336}
337
/// A git repository pinned to a specific commit.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
    /// The repository (presumably a URL or path understood by git — confirm with callers).
    pub repo: String,
    /// Hash of the commit to use.
    pub commit_hash: String,
}
343
344// Test roundtrip serialization of LogMessage
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated `LogMessage` must survive a YAML round trip through
    /// `LogMessageHelper` unchanged.
    #[test]
    fn test_log_message_serialization() {
        let extra_fields = BTreeMap::from([("key".to_string(), "value".to_string())]);
        let original = LogMessage {
            build_id: Some(BuildId(Uuid::new_v4())),
            dataflow_id: Some(DataflowId::from(Uuid::new_v4())),
            node_id: Some(NodeId("node-1".to_string())),
            daemon_id: Some(DaemonId::new(Some("machine-1".to_string()))),
            level: LogLevelOrStdout::LogLevel(LogLevel::Info),
            target: Some("target".to_string()),
            module_path: Some("module::path".to_string()),
            file: Some("file.rs".to_string()),
            line: Some(42),
            message: "This is a log message".to_string(),
            timestamp: Utc::now(),
            fields: Some(extra_fields),
        };

        let yaml = serde_yaml::to_string(&original).unwrap();
        let helper: LogMessageHelper = serde_yaml::from_str(&yaml).unwrap();
        let roundtripped = LogMessage::from(helper);

        assert_eq!(original, roundtripped);
    }
}
368}