1use core::fmt;
2use std::borrow::Cow;
3
4use aligned_vec::{AVec, ConstAlign};
5use eyre::Context as _;
6use uuid::Uuid;
7
8use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId};
9
10pub use log::Level as LogLevel;
11
/// A single log message together with optional information about its origin.
///
/// All origin fields are optional; only `level` and `message` are always present.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
    /// Build this message is associated with, if any.
    pub build_id: Option<BuildId>,
    /// Dataflow this message is associated with, if any.
    pub dataflow_id: Option<DataflowId>,
    /// Node this message is associated with, if any.
    pub node_id: Option<NodeId>,
    /// Daemon this message is associated with, if any.
    pub daemon_id: Option<DaemonId>,
    /// Either a regular log level or the `Stdout` marker (see [`LogLevelOrStdout`]).
    pub level: LogLevelOrStdout,
    // NOTE(review): `target`, `module_path`, `file` and `line` presumably mirror the metadata
    // of a `log` record — confirm against the code that fills them in.
    /// Optional log target.
    pub target: Option<String>,
    /// Optional module path.
    pub module_path: Option<String>,
    /// Optional source file name.
    pub file: Option<String>,
    /// Optional source line number.
    pub line: Option<u32>,
    /// The message text.
    pub message: String,
}
26
/// Either a regular [`LogLevel`] or the `Stdout` marker.
///
/// The ordering is derived, so it follows the declaration order of the variants: every
/// `LogLevel(_)` value compares less than `Stdout`, and two `LogLevel(_)` values compare by
/// their inner level.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevelOrStdout {
    /// A message with an explicit log level.
    LogLevel(LogLevel),
    /// A message without a log level (presumably raw stdout output — TODO confirm).
    Stdout,
}
32
33impl From<LogLevel> for LogLevelOrStdout {
34 fn from(level: LogLevel) -> Self {
35 Self::LogLevel(level)
36 }
37}
38
/// Describes a node failure: a timestamp, the cause, and the exit status of the node.
///
/// The `Display` implementation of this type turns `cause` and `exit_status` into a
/// human-readable message.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// Timestamp attached to this error.
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node process ended.
    pub exit_status: NodeExitStatus,
}
45
46impl std::fmt::Display for NodeError {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
49 return write!(f, "failed to spawn node: {err}");
50 }
51 match &self.exit_status {
52 NodeExitStatus::Success => write!(f, "<success>"),
53 NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
54 NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
55 NodeExitStatus::Signal(signal) => {
56 let signal_str: Cow<_> = match signal {
57 1 => "SIGHUP".into(),
58 2 => "SIGINT".into(),
59 3 => "SIGQUIT".into(),
60 4 => "SIGILL".into(),
61 6 => "SIGABRT".into(),
62 8 => "SIGFPE".into(),
63 9 => "SIGKILL".into(),
64 11 => "SIGSEGV".into(),
65 13 => "SIGPIPE".into(),
66 14 => "SIGALRM".into(),
67 15 => "SIGTERM".into(),
68 22 => "SIGABRT".into(),
69 23 => "NSIG".into(),
70 other => other.to_string().into(),
71 };
72 if matches!(self.cause, NodeErrorCause::GraceDuration) {
73 write!(
74 f,
75 "node was killed by dora because it didn't react to a stop message in time ({signal_str})"
76 )
77 } else {
78 write!(f, "exited because of signal {signal_str}")
79 }
80 }
81 NodeExitStatus::Unknown => write!(f, "unknown exit status"),
82 }?;
83
84 match &self.cause {
85 NodeErrorCause::GraceDuration => {} NodeErrorCause::Cascading { caused_by_node } => write!(
87 f,
88 ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
89 )?,
90 NodeErrorCause::FailedToSpawn(_) => unreachable!(), NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
92 NodeErrorCause::Other { stderr } => {
93 let line: &str = "---------------------------------------------------------------------------------\n";
94 let stderr = stderr.trim_end();
95 write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
96 }
97 }
98
99 Ok(())
100 }
101}
102
/// The reason a node failed, as reported in a [`NodeError`].
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// The node was killed by dora because it did not react to a stop message in time.
    GraceDuration,
    /// The error occurred because another node exited before connecting to dora.
    Cascading {
        /// The node that exited before connecting.
        caused_by_node: NodeId,
    },
    /// The node could not be spawned; holds the error rendered as text.
    FailedToSpawn(String),
    /// Any other failure.
    Other {
        /// Stderr output of the node; `NodeError`'s `Display` impl omits it when empty.
        stderr: String,
    },
}
116
/// How a node process ended.
///
/// Can be created from a `Result<std::process::ExitStatus, std::io::Error>` through the `From`
/// implementation in this file.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process reported success.
    Success,
    /// The exit status could not be obtained; holds the I/O error rendered as text.
    IoError(String),
    /// The process exited unsuccessfully with this exit code.
    ExitCode(i32),
    /// The process had no exit code but was terminated by the signal with this number
    /// (only produced on Unix by the `From` implementation).
    Signal(i32),
    /// Neither an exit code nor a signal was available.
    Unknown,
}
125
126impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
127 fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
128 match result {
129 Ok(status) => {
130 if status.success() {
131 NodeExitStatus::Success
132 } else if let Some(code) = status.code() {
133 Self::ExitCode(code)
134 } else {
135 #[cfg(unix)]
136 {
137 use std::os::unix::process::ExitStatusExt;
138 if let Some(signal) = status.signal() {
139 return Self::Signal(signal);
140 }
141 }
142 Self::Unknown
143 }
144 }
145 Err(err) => Self::IoError(err.to_string()),
146 }
147 }
148}
149
/// Pairs a value of type `T` with a [`uhlc::Timestamp`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped value.
    pub inner: T,
    /// The timestamp attached to `inner`.
    pub timestamp: uhlc::Timestamp,
}
155
156impl<T> Timestamped<T>
157where
158 T: serde::Serialize,
159{
160 pub fn serialize(&self) -> Vec<u8> {
161 bincode::serialize(self).unwrap()
162 }
163}
164
165impl Timestamped<InterDaemonEvent> {
166 pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
167 bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
168 }
169}
170
/// Identifier of a shared memory region, represented as a plain `String`.
pub type SharedMemoryId = String;
172
/// Message payload, either carried inline or referenced through shared memory.
///
/// `Debug` is implemented manually in this file so that the bytes of the `Vec` variant are
/// not printed.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// Payload bytes stored inline in a vector with a constant alignment of 128.
    Vec(AVec<u8, ConstAlign<128>>),
    /// Payload referenced by a shared memory ID instead of being carried inline.
    SharedMemory {
        /// ID of the shared memory region.
        shared_memory_id: String,
        /// Length of the data (presumably in bytes — TODO confirm).
        len: usize,
        /// Token associated with this shared memory payload; returned by `drop_token()`.
        drop_token: DropToken,
    },
}
182
183impl DataMessage {
184 pub fn drop_token(&self) -> Option<DropToken> {
185 match self {
186 DataMessage::Vec(_) => None,
187 DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
188 }
189 }
190}
191
192impl fmt::Debug for DataMessage {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 match self {
195 Self::Vec(v) => f
196 .debug_struct("Vec")
197 .field("len", &v.len())
198 .finish_non_exhaustive(),
199 Self::SharedMemory {
200 shared_memory_id,
201 len,
202 drop_token,
203 } => f
204 .debug_struct("SharedMemory")
205 .field("shared_memory_id", shared_memory_id)
206 .field("len", len)
207 .field("drop_token", drop_token)
208 .finish(),
209 }
210 }
211}
212
/// Token wrapping a [`Uuid`]; used by the `SharedMemory` variant of [`DataMessage`].
///
/// The token is `Copy` and can be compared, ordered, and hashed through the wrapped UUID.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);
217
218impl DropToken {
219 pub fn generate() -> Self {
220 Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
221 }
222}
223
/// Identifier of a daemon: an optional machine ID combined with a UUID.
///
/// The derived ordering compares `machine_id` first and `uuid` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
    /// Optional machine ID; when present it is used as a prefix by the `Display` impl.
    machine_id: Option<String>,
    /// UUID part of the ID; `DaemonId::new` fills it with a random version-4 UUID.
    uuid: Uuid,
}
229
230impl DaemonId {
231 pub fn new(machine_id: Option<String>) -> Self {
232 DaemonId {
233 machine_id,
234 uuid: Uuid::new_v4(),
235 }
236 }
237
238 pub fn matches_machine_id(&self, machine_id: &str) -> bool {
239 self.machine_id
240 .as_ref()
241 .map(|id| id == machine_id)
242 .unwrap_or_default()
243 }
244
245 pub fn machine_id(&self) -> Option<&str> {
246 self.machine_id.as_deref()
247 }
248}
249
250impl std::fmt::Display for DaemonId {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 if let Some(id) = &self.machine_id {
253 write!(f, "{id}-")?;
254 }
255 write!(f, "{}", self.uuid)
256 }
257}
258
/// Identifies a git source by repository and commit hash.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
    /// The repository, as a string (presumably a URL or path — TODO confirm).
    pub repo: String,
    /// The commit hash, as a string.
    pub commit_hash: String,
}