1use core::fmt;
2use std::borrow::Cow;
3
4use aligned_vec::{AVec, ConstAlign};
5use eyre::Context as _;
6use uuid::Uuid;
7
8use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
9
10pub use log::Level as LogLevel;
11
/// A single log record together with identifiers describing where it came from.
///
/// The type is `#[must_use]`, so building a `LogMessage` and then discarding it
/// produces a compiler warning.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
    /// Identifier of the dataflow the message is associated with.
    pub dataflow_id: DataflowId,
    /// Node the message is associated with, if any.
    pub node_id: Option<NodeId>,
    /// Daemon the message is associated with, if any.
    pub daemon_id: Option<DaemonId>,
    /// Severity of the message (`LogLevel` is a re-export of `log::Level`).
    pub level: LogLevel,
    /// Optional log target.
    // NOTE(review): presumably mirrors `log::Record::target` — confirm.
    pub target: Option<String>,
    /// Optional module path of the code that emitted the message.
    pub module_path: Option<String>,
    /// Optional source file of the code that emitted the message.
    pub file: Option<String>,
    /// Optional source line of the code that emitted the message.
    pub line: Option<u32>,
    /// The log text itself.
    pub message: String,
}
25
/// Describes a failed node: how its process ended and why.
///
/// The `Display` implementation renders `exit_status` first and then appends
/// extra detail derived from `cause`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// Timestamp attached to the error.
    // NOTE(review): presumably the time the failure was recorded — confirm.
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node's process ended.
    pub exit_status: NodeExitStatus,
}
32
33impl std::fmt::Display for NodeError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match &self.exit_status {
36 NodeExitStatus::Success => write!(f, "<success>"),
37 NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
38 NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
39 NodeExitStatus::Signal(signal) => {
40 let signal_str: Cow<_> = match signal {
41 1 => "SIGHUP".into(),
42 2 => "SIGINT".into(),
43 3 => "SIGQUIT".into(),
44 4 => "SIGILL".into(),
45 6 => "SIGABRT".into(),
46 8 => "SIGFPE".into(),
47 9 => "SIGKILL".into(),
48 11 => "SIGSEGV".into(),
49 13 => "SIGPIPE".into(),
50 14 => "SIGALRM".into(),
51 15 => "SIGTERM".into(),
52 22 => "SIGABRT".into(),
53 23 => "NSIG".into(),
54 other => other.to_string().into(),
55 };
56 if matches!(self.cause, NodeErrorCause::GraceDuration) {
57 write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
58 } else {
59 write!(f, "exited because of signal {signal_str}")
60 }
61 }
62 NodeExitStatus::Unknown => write!(f, "unknown exit status"),
63 }?;
64
65 match &self.cause {
66 NodeErrorCause::GraceDuration => {}, NodeErrorCause::Cascading { caused_by_node } => write!(
68 f,
69 ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
70 )?,
71 NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
72 NodeErrorCause::Other { stderr } => {
73 let line: &str = "---------------------------------------------------------------------------------\n";
74 let stderr = stderr.trim_end();
75 write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
76 },
77 }
78
79 Ok(())
80 }
81}
82
/// Reason attached to a [`NodeError`].
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// The node was killed by dora because it did not react to a stop message
    /// in time.
    GraceDuration,
    /// The error occurred because another node exited before connecting to dora.
    Cascading {
        /// The node whose early exit triggered this error.
        caused_by_node: NodeId,
    },
    /// Any other failure.
    Other {
        /// Captured stderr output of the node; may be empty.
        stderr: String,
    },
}
95
/// How a node's process ended.
///
/// Values are typically produced through the
/// `From<Result<std::process::ExitStatus, std::io::Error>>` conversion.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process reported success.
    Success,
    /// Reading the exit status failed; holds the I/O error rendered as text.
    IoError(String),
    /// The process exited with the given (non-success) exit code.
    ExitCode(i32),
    /// The process was terminated by the given signal number.
    Signal(i32),
    /// Neither an exit code nor a signal could be determined.
    Unknown,
}
104
105impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
106 fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
107 match result {
108 Ok(status) => {
109 if status.success() {
110 NodeExitStatus::Success
111 } else if let Some(code) = status.code() {
112 Self::ExitCode(code)
113 } else {
114 #[cfg(unix)]
115 {
116 use std::os::unix::process::ExitStatusExt;
117 if let Some(signal) = status.signal() {
118 return Self::Signal(signal);
119 }
120 }
121 Self::Unknown
122 }
123 }
124 Err(err) => Self::IoError(err.to_string()),
125 }
126 }
127}
128
/// Pairs a value with a `uhlc::Timestamp`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped value.
    pub inner: T,
    /// Timestamp associated with `inner`.
    pub timestamp: uhlc::Timestamp,
}
134
135impl<T> Timestamped<T>
136where
137 T: serde::Serialize,
138{
139 pub fn serialize(&self) -> Vec<u8> {
140 bincode::serialize(self).unwrap()
141 }
142}
143
144impl Timestamped<InterDaemonEvent> {
145 pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
146 bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
147 }
148}
149
/// String identifier for a piece of shared memory.
// NOTE(review): `DataMessage::SharedMemory::shared_memory_id` is declared as a
// plain `String`; presumably it carries a value of this kind — confirm.
pub type SharedMemoryId = String;
151
/// Payload of a message, either carried inline or referenced in shared memory.
///
/// `Debug` is implemented by hand so that the inline bytes are not printed.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// Bytes carried inline in a vector with 128-byte alignment.
    Vec(AVec<u8, ConstAlign<128>>),
    /// Data referenced by a shared-memory identifier instead of being inlined.
    SharedMemory {
        /// Identifier of the shared memory holding the data.
        shared_memory_id: String,
        /// Length of the data.
        // NOTE(review): presumably in bytes — confirm.
        len: usize,
        /// Token associated with this shared-memory message.
        // NOTE(review): presumably used to signal when the memory may be
        // released — confirm against the receivers.
        drop_token: DropToken,
    },
}
161
162impl DataMessage {
163 pub fn drop_token(&self) -> Option<DropToken> {
164 match self {
165 DataMessage::Vec(_) => None,
166 DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
167 }
168 }
169}
170
171impl fmt::Debug for DataMessage {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 match self {
174 Self::Vec(v) => f
175 .debug_struct("Vec")
176 .field("len", &v.len())
177 .finish_non_exhaustive(),
178 Self::SharedMemory {
179 shared_memory_id,
180 len,
181 drop_token,
182 } => f
183 .debug_struct("SharedMemory")
184 .field("shared_memory_id", shared_memory_id)
185 .field("len", len)
186 .field("drop_token", drop_token)
187 .finish(),
188 }
189 }
190}
191
/// Opaque token wrapping a `Uuid`.
///
/// New tokens are created with `DropToken::generate`. The type is `Copy` and
/// supports equality, ordering and hashing, so it can be used as a map key.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);
196
197impl DropToken {
198 pub fn generate() -> Self {
199 Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
200 }
201}
202
/// Identifier of a daemon: an optional machine id plus a UUID.
///
/// The derived ordering compares `machine_id` first and `uuid` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
    /// Optional machine id supplied when the id is created.
    machine_id: Option<String>,
    /// UUID assigned in `DaemonId::new` (random, version 4).
    uuid: Uuid,
}
208
209impl DaemonId {
210 pub fn new(machine_id: Option<String>) -> Self {
211 DaemonId {
212 machine_id,
213 uuid: Uuid::new_v4(),
214 }
215 }
216
217 pub fn matches_machine_id(&self, machine_id: &str) -> bool {
218 self.machine_id
219 .as_ref()
220 .map(|id| id == machine_id)
221 .unwrap_or_default()
222 }
223
224 pub fn machine_id(&self) -> Option<&str> {
225 self.machine_id.as_deref()
226 }
227}
228
229impl std::fmt::Display for DaemonId {
230 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231 if let Some(id) = &self.machine_id {
232 write!(f, "{id}-")?;
233 }
234 write!(f, "{}", self.uuid)
235 }
236}