dora_message/
common.rs

1use core::fmt;
2use std::borrow::Cow;
3
4use aligned_vec::{AVec, ConstAlign};
5use eyre::Context as _;
6use uuid::Uuid;
7
8use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId};
9
10pub use log::Level as LogLevel;
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
13#[must_use]
14pub struct LogMessage {
15    pub build_id: Option<BuildId>,
16    pub dataflow_id: Option<DataflowId>,
17    pub node_id: Option<NodeId>,
18    pub daemon_id: Option<DaemonId>,
19    pub level: LogLevelOrStdout,
20    pub target: Option<String>,
21    pub module_path: Option<String>,
22    pub file: Option<String>,
23    pub line: Option<u32>,
24    pub message: String,
25}
26
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
28pub enum LogLevelOrStdout {
29    LogLevel(LogLevel),
30    Stdout,
31}
32
33impl From<LogLevel> for LogLevelOrStdout {
34    fn from(level: LogLevel) -> Self {
35        Self::LogLevel(level)
36    }
37}
38
39#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
40pub struct NodeError {
41    pub timestamp: uhlc::Timestamp,
42    pub cause: NodeErrorCause,
43    pub exit_status: NodeExitStatus,
44}
45
46impl std::fmt::Display for NodeError {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
49            return write!(f, "failed to spawn node: {err}");
50        }
51        match &self.exit_status {
52            NodeExitStatus::Success => write!(f, "<success>"),
53            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
54            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
55            NodeExitStatus::Signal(signal) => {
56                let signal_str: Cow<_> = match signal {
57                    1 => "SIGHUP".into(),
58                    2 => "SIGINT".into(),
59                    3 => "SIGQUIT".into(),
60                    4 => "SIGILL".into(),
61                    6 => "SIGABRT".into(),
62                    8 => "SIGFPE".into(),
63                    9 => "SIGKILL".into(),
64                    11 => "SIGSEGV".into(),
65                    13 => "SIGPIPE".into(),
66                    14 => "SIGALRM".into(),
67                    15 => "SIGTERM".into(),
68                    22 => "SIGABRT".into(),
69                    23 => "NSIG".into(),
70                    other => other.to_string().into(),
71                };
72                if matches!(self.cause, NodeErrorCause::GraceDuration) {
73                    write!(
74                        f,
75                        "node was killed by dora because it didn't react to a stop message in time ({signal_str})"
76                    )
77                } else {
78                    write!(f, "exited because of signal {signal_str}")
79                }
80            }
81            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
82        }?;
83
84        match &self.cause {
85            NodeErrorCause::GraceDuration => {} // handled above
86            NodeErrorCause::Cascading { caused_by_node } => write!(
87                f,
88                ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
89            )?,
90            NodeErrorCause::FailedToSpawn(_) => unreachable!(), // handled above
91            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
92            NodeErrorCause::Other { stderr } => {
93                let line: &str = "---------------------------------------------------------------------------------\n";
94                let stderr = stderr.trim_end();
95                write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
96            }
97        }
98
99        Ok(())
100    }
101}
102
103#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
104pub enum NodeErrorCause {
105    /// Node was killed because it didn't react to a stop message in time.
106    GraceDuration,
107    /// Node failed because another node failed before,
108    Cascading {
109        caused_by_node: NodeId,
110    },
111    FailedToSpawn(String),
112    Other {
113        stderr: String,
114    },
115}
116
117#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
118pub enum NodeExitStatus {
119    Success,
120    IoError(String),
121    ExitCode(i32),
122    Signal(i32),
123    Unknown,
124}
125
126impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
127    fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
128        match result {
129            Ok(status) => {
130                if status.success() {
131                    NodeExitStatus::Success
132                } else if let Some(code) = status.code() {
133                    Self::ExitCode(code)
134                } else {
135                    #[cfg(unix)]
136                    {
137                        use std::os::unix::process::ExitStatusExt;
138                        if let Some(signal) = status.signal() {
139                            return Self::Signal(signal);
140                        }
141                    }
142                    Self::Unknown
143                }
144            }
145            Err(err) => Self::IoError(err.to_string()),
146        }
147    }
148}
149
150#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
151pub struct Timestamped<T> {
152    pub inner: T,
153    pub timestamp: uhlc::Timestamp,
154}
155
156impl<T> Timestamped<T>
157where
158    T: serde::Serialize,
159{
160    pub fn serialize(&self) -> Vec<u8> {
161        bincode::serialize(self).unwrap()
162    }
163}
164
165impl Timestamped<InterDaemonEvent> {
166    pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
167        bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
168    }
169}
170
171pub type SharedMemoryId = String;
172
173#[derive(serde::Serialize, serde::Deserialize, Clone)]
174pub enum DataMessage {
175    Vec(AVec<u8, ConstAlign<128>>),
176    SharedMemory {
177        shared_memory_id: String,
178        len: usize,
179        drop_token: DropToken,
180    },
181}
182
183impl DataMessage {
184    pub fn drop_token(&self) -> Option<DropToken> {
185        match self {
186            DataMessage::Vec(_) => None,
187            DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
188        }
189    }
190}
191
192impl fmt::Debug for DataMessage {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        match self {
195            Self::Vec(v) => f
196                .debug_struct("Vec")
197                .field("len", &v.len())
198                .finish_non_exhaustive(),
199            Self::SharedMemory {
200                shared_memory_id,
201                len,
202                drop_token,
203            } => f
204                .debug_struct("SharedMemory")
205                .field("shared_memory_id", shared_memory_id)
206                .field("len", len)
207                .field("drop_token", drop_token)
208                .finish(),
209        }
210    }
211}
212
213#[derive(
214    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
215)]
216pub struct DropToken(Uuid);
217
218impl DropToken {
219    pub fn generate() -> Self {
220        Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
221    }
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
225pub struct DaemonId {
226    machine_id: Option<String>,
227    uuid: Uuid,
228}
229
230impl DaemonId {
231    pub fn new(machine_id: Option<String>) -> Self {
232        DaemonId {
233            machine_id,
234            uuid: Uuid::new_v4(),
235        }
236    }
237
238    pub fn matches_machine_id(&self, machine_id: &str) -> bool {
239        self.machine_id
240            .as_ref()
241            .map(|id| id == machine_id)
242            .unwrap_or_default()
243    }
244
245    pub fn machine_id(&self) -> Option<&str> {
246        self.machine_id.as_deref()
247    }
248}
249
250impl std::fmt::Display for DaemonId {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        if let Some(id) = &self.machine_id {
253            write!(f, "{id}-")?;
254        }
255        write!(f, "{}", self.uuid)
256    }
257}
258
259#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
260pub struct GitSource {
261    pub repo: String,
262    pub commit_hash: String,
263}