1use core::fmt;
2use std::borrow::Cow;
3
4use aligned_vec::{AVec, ConstAlign};
5use eyre::Context as _;
6use uuid::Uuid;
7
8use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};
9
10pub use log::Level as LogLevel;
11
12#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
13#[must_use]
14pub struct LogMessage {
15 pub build_id: Option<BuildId>,
16 pub dataflow_id: Option<DataflowId>,
17 pub node_id: Option<NodeId>,
18 pub daemon_id: Option<DaemonId>,
19 pub level: LogLevelOrStdout,
20 pub target: Option<String>,
21 pub module_path: Option<String>,
22 pub file: Option<String>,
23 pub line: Option<u32>,
24 pub message: String,
25}
26
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
28pub enum LogLevelOrStdout {
29 LogLevel(LogLevel),
30 Stdout,
31}
32
33impl From<LogLevel> for LogLevelOrStdout {
34 fn from(level: LogLevel) -> Self {
35 Self::LogLevel(level)
36 }
37}
38
39#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
40pub struct NodeError {
41 pub timestamp: uhlc::Timestamp,
42 pub cause: NodeErrorCause,
43 pub exit_status: NodeExitStatus,
44}
45
46impl std::fmt::Display for NodeError {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
49 return write!(f, "failed to spawn node: {err}");
50 }
51 match &self.exit_status {
52 NodeExitStatus::Success => write!(f, "<success>"),
53 NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
54 NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
55 NodeExitStatus::Signal(signal) => {
56 let signal_str: Cow<_> = match signal {
57 1 => "SIGHUP".into(),
58 2 => "SIGINT".into(),
59 3 => "SIGQUIT".into(),
60 4 => "SIGILL".into(),
61 6 => "SIGABRT".into(),
62 8 => "SIGFPE".into(),
63 9 => "SIGKILL".into(),
64 11 => "SIGSEGV".into(),
65 13 => "SIGPIPE".into(),
66 14 => "SIGALRM".into(),
67 15 => "SIGTERM".into(),
68 22 => "SIGABRT".into(),
69 23 => "NSIG".into(),
70 other => other.to_string().into(),
71 };
72 if matches!(self.cause, NodeErrorCause::GraceDuration) {
73 write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
74 } else {
75 write!(f, "exited because of signal {signal_str}")
76 }
77 }
78 NodeExitStatus::Unknown => write!(f, "unknown exit status"),
79 }?;
80
81 match &self.cause {
82 NodeErrorCause::GraceDuration => {}, NodeErrorCause::Cascading { caused_by_node } => write!(
84 f,
85 ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
86 )?,
87 NodeErrorCause::FailedToSpawn(_) => unreachable!(), NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
89 NodeErrorCause::Other { stderr } => {
90 let line: &str = "---------------------------------------------------------------------------------\n";
91 let stderr = stderr.trim_end();
92 write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
93 },
94 }
95
96 Ok(())
97 }
98}
99
100#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
101pub enum NodeErrorCause {
102 GraceDuration,
104 Cascading {
106 caused_by_node: NodeId,
107 },
108 FailedToSpawn(String),
109 Other {
110 stderr: String,
111 },
112}
113
114#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
115pub enum NodeExitStatus {
116 Success,
117 IoError(String),
118 ExitCode(i32),
119 Signal(i32),
120 Unknown,
121}
122
123impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
124 fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
125 match result {
126 Ok(status) => {
127 if status.success() {
128 NodeExitStatus::Success
129 } else if let Some(code) = status.code() {
130 Self::ExitCode(code)
131 } else {
132 #[cfg(unix)]
133 {
134 use std::os::unix::process::ExitStatusExt;
135 if let Some(signal) = status.signal() {
136 return Self::Signal(signal);
137 }
138 }
139 Self::Unknown
140 }
141 }
142 Err(err) => Self::IoError(err.to_string()),
143 }
144 }
145}
146
147#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
148pub struct Timestamped<T> {
149 pub inner: T,
150 pub timestamp: uhlc::Timestamp,
151}
152
153impl<T> Timestamped<T>
154where
155 T: serde::Serialize,
156{
157 pub fn serialize(&self) -> Vec<u8> {
158 bincode::serialize(self).unwrap()
159 }
160}
161
162impl Timestamped<InterDaemonEvent> {
163 pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
164 bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
165 }
166}
167
/// Identifier of a shared memory region, represented as a plain `String`.
pub type SharedMemoryId = String;
169
170#[derive(serde::Serialize, serde::Deserialize, Clone)]
171pub enum DataMessage {
172 Vec(AVec<u8, ConstAlign<128>>),
173 SharedMemory {
174 shared_memory_id: String,
175 len: usize,
176 drop_token: DropToken,
177 },
178}
179
180impl DataMessage {
181 pub fn drop_token(&self) -> Option<DropToken> {
182 match self {
183 DataMessage::Vec(_) => None,
184 DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
185 }
186 }
187}
188
189impl fmt::Debug for DataMessage {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 match self {
192 Self::Vec(v) => f
193 .debug_struct("Vec")
194 .field("len", &v.len())
195 .finish_non_exhaustive(),
196 Self::SharedMemory {
197 shared_memory_id,
198 len,
199 drop_token,
200 } => f
201 .debug_struct("SharedMemory")
202 .field("shared_memory_id", shared_memory_id)
203 .field("len", len)
204 .field("drop_token", drop_token)
205 .finish(),
206 }
207 }
208}
209
210#[derive(
211 Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
212)]
213pub struct DropToken(Uuid);
214
215impl DropToken {
216 pub fn generate() -> Self {
217 Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
218 }
219}
220
221#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
222pub struct DaemonId {
223 machine_id: Option<String>,
224 uuid: Uuid,
225}
226
227impl DaemonId {
228 pub fn new(machine_id: Option<String>) -> Self {
229 DaemonId {
230 machine_id,
231 uuid: Uuid::new_v4(),
232 }
233 }
234
235 pub fn matches_machine_id(&self, machine_id: &str) -> bool {
236 self.machine_id
237 .as_ref()
238 .map(|id| id == machine_id)
239 .unwrap_or_default()
240 }
241
242 pub fn machine_id(&self) -> Option<&str> {
243 self.machine_id.as_deref()
244 }
245}
246
247impl std::fmt::Display for DaemonId {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 if let Some(id) = &self.machine_id {
250 write!(f, "{id}-")?;
251 }
252 write!(f, "{}", self.uuid)
253 }
254}
255
256#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
257pub struct GitSource {
258 pub repo: String,
259 pub commit_hash: String,
260}