dora_message/
common.rs

1use core::fmt;
2use std::borrow::Cow;
3
4use aligned_vec::{AVec, ConstAlign};
5use eyre::Context as _;
6use uuid::Uuid;
7
8use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
9
10pub use log::Level as LogLevel;
11
/// A single log record that can be serialized and deserialized with serde.
///
/// NOTE(review): the optional `target`, `module_path`, `file` and `line`
/// fields look like they mirror the metadata of a `log` record — confirm
/// against the code that constructs this type.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
    /// Identifier of the dataflow associated with this message.
    pub dataflow_id: DataflowId,
    /// Node associated with this message, if any.
    pub node_id: Option<NodeId>,
    /// Daemon associated with this message, if any.
    pub daemon_id: Option<DaemonId>,
    /// Severity of the message (re-export of `log::Level`).
    pub level: LogLevel,
    /// Optional log target (presumably the `log` crate's target string).
    pub target: Option<String>,
    /// Optional module path of the log call site.
    pub module_path: Option<String>,
    /// Optional source file of the log call site.
    pub file: Option<String>,
    /// Optional line number of the log call site.
    pub line: Option<u32>,
    /// The log text itself.
    pub message: String,
}
25
/// Describes a failed node: a timestamp, the cause of the failure, and the
/// exit status of the node process.
///
/// The `Display` implementation renders the exit status first, followed by a
/// cause-specific suffix.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// Timestamp attached to the error (presumably when it was recorded —
    /// confirm at the construction site).
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node process terminated.
    pub exit_status: NodeExitStatus,
}
32
33impl std::fmt::Display for NodeError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match &self.exit_status {
36            NodeExitStatus::Success => write!(f, "<success>"),
37            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
38            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
39            NodeExitStatus::Signal(signal) => {
40                let signal_str: Cow<_> = match signal {
41                    1 => "SIGHUP".into(),
42                    2 => "SIGINT".into(),
43                    3 => "SIGQUIT".into(),
44                    4 => "SIGILL".into(),
45                    6 => "SIGABRT".into(),
46                    8 => "SIGFPE".into(),
47                    9 => "SIGKILL".into(),
48                    11 => "SIGSEGV".into(),
49                    13 => "SIGPIPE".into(),
50                    14 => "SIGALRM".into(),
51                    15 => "SIGTERM".into(),
52                    22 => "SIGABRT".into(),
53                    23 => "NSIG".into(),
54                    other => other.to_string().into(),
55                };
56                if matches!(self.cause, NodeErrorCause::GraceDuration) {
57                    write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
58                } else {
59                    write!(f, "exited because of signal {signal_str}")
60                }
61            }
62            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
63        }?;
64
65        match &self.cause {
66            NodeErrorCause::GraceDuration => {}, // handled above
67            NodeErrorCause::Cascading { caused_by_node } => write!(
68                f,
69                ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
70            )?,
71            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
72            NodeErrorCause::Other { stderr } => {
73                let line: &str = "---------------------------------------------------------------------------------\n";
74                let stderr = stderr.trim_end();
75                write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
76            },
77        }
78
79        Ok(())
80    }
81}
82
/// The reason a node is reported as failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// Node was killed because it didn't react to a stop message in time.
    GraceDuration,
    /// Node failed because another node failed before.
    ///
    /// The `Display` output of `NodeError` reports this as the named node
    /// having exited before connecting to dora.
    Cascading {
        /// The node whose earlier failure caused this one.
        caused_by_node: NodeId,
    },
    /// Any other failure.
    Other {
        /// Text reported as the node's stderr output; omitted from the
        /// `Display` output of `NodeError` when empty.
        stderr: String,
    },
}
95
/// Serializable summary of how a node process terminated.
///
/// Can be created from a `Result<std::process::ExitStatus, std::io::Error>`
/// through the `From` implementation in this file.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process reported success.
    Success,
    /// The exit status could not be obtained; holds the stringified I/O error.
    IoError(String),
    /// The process exited with the given non-success exit code.
    ExitCode(i32),
    /// The process was terminated by the given signal number (only produced
    /// on Unix targets by the `From` conversion).
    Signal(i32),
    /// Neither an exit code nor a signal was available.
    Unknown,
}
104
105impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
106    fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
107        match result {
108            Ok(status) => {
109                if status.success() {
110                    NodeExitStatus::Success
111                } else if let Some(code) = status.code() {
112                    Self::ExitCode(code)
113                } else {
114                    #[cfg(unix)]
115                    {
116                        use std::os::unix::process::ExitStatusExt;
117                        if let Some(signal) = status.signal() {
118                            return Self::Signal(signal);
119                        }
120                    }
121                    Self::Unknown
122                }
123            }
124            Err(err) => Self::IoError(err.to_string()),
125        }
126    }
127}
128
/// Pairs a value with a `uhlc::Timestamp`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped value.
    pub inner: T,
    /// The timestamp attached to `inner`.
    pub timestamp: uhlc::Timestamp,
}
134
135impl<T> Timestamped<T>
136where
137    T: serde::Serialize,
138{
139    pub fn serialize(&self) -> Vec<u8> {
140        bincode::serialize(self).unwrap()
141    }
142}
143
144impl Timestamped<InterDaemonEvent> {
145    pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
146        bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
147    }
148}
149
/// String identifier of a shared memory region.
///
/// NOTE(review): `DataMessage::SharedMemory::shared_memory_id` is declared as
/// a plain `String` rather than this alias — confirm whether it should use it.
pub type SharedMemoryId = String;
151
/// Payload of a message, either held inline or referenced by a shared
/// memory identifier.
///
/// `Debug` is implemented manually so that the inline bytes are not printed.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// Payload bytes stored inline in a vector with 128-byte alignment.
    Vec(AVec<u8, ConstAlign<128>>),
    /// Payload referenced by identifier (presumably located in a shared
    /// memory region — confirm against the code that reads it).
    SharedMemory {
        /// Identifier of the shared memory.
        shared_memory_id: String,
        /// Length value stored with the reference (presumably the payload
        /// size in bytes — confirm).
        len: usize,
        /// Token returned by `DataMessage::drop_token` for this variant.
        drop_token: DropToken,
    },
}
161
162impl DataMessage {
163    pub fn drop_token(&self) -> Option<DropToken> {
164        match self {
165            DataMessage::Vec(_) => None,
166            DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
167        }
168    }
169}
170
171impl fmt::Debug for DataMessage {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        match self {
174            Self::Vec(v) => f
175                .debug_struct("Vec")
176                .field("len", &v.len())
177                .finish_non_exhaustive(),
178            Self::SharedMemory {
179                shared_memory_id,
180                len,
181                drop_token,
182            } => f
183                .debug_struct("SharedMemory")
184                .field("shared_memory_id", shared_memory_id)
185                .field("len", len)
186                .field("drop_token", drop_token)
187                .finish(),
188        }
189    }
190}
191
/// Copyable token wrapping a `Uuid`.
///
/// Carried by `DataMessage::SharedMemory`; new tokens are created with
/// `DropToken::generate`.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);
196
197impl DropToken {
198    pub fn generate() -> Self {
199        Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
200    }
201}
202
/// Identifier of a daemon: an optional machine id plus a UUID.
///
/// Displayed as `<machine_id>-<uuid>` when a machine id is set, otherwise as
/// the bare UUID.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
    /// Optional machine identifier supplied to `DaemonId::new`.
    machine_id: Option<String>,
    /// UUID of this id; `DaemonId::new` fills it with a random (v4) value.
    uuid: Uuid,
}
208
209impl DaemonId {
210    pub fn new(machine_id: Option<String>) -> Self {
211        DaemonId {
212            machine_id,
213            uuid: Uuid::new_v4(),
214        }
215    }
216
217    pub fn matches_machine_id(&self, machine_id: &str) -> bool {
218        self.machine_id
219            .as_ref()
220            .map(|id| id == machine_id)
221            .unwrap_or_default()
222    }
223
224    pub fn machine_id(&self) -> Option<&str> {
225        self.machine_id.as_deref()
226    }
227}
228
229impl std::fmt::Display for DaemonId {
230    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231        if let Some(id) = &self.machine_id {
232            write!(f, "{id}-")?;
233        }
234        write!(f, "{}", self.uuid)
235    }
236}