1use core::fmt;
2use std::{borrow::Cow, collections::BTreeMap};
3
4use aligned_vec::{AVec, ConstAlign};
5use chrono::{DateTime, Utc};
6use eyre::Context as _;
7use serde::{Deserialize, Deserializer};
8use uuid::Uuid;
9
10use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId};
11
12pub use log::Level as LogLevel;
13
/// A single log record, optionally tagged with the build, dataflow, node, and
/// daemon it relates to.
///
/// Only `level`, `message`, and `timestamp` are mandatory; every other field
/// may be absent.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
#[must_use]
pub struct LogMessage {
    /// Build this record relates to, if any.
    pub build_id: Option<BuildId>,
    /// Dataflow this record relates to, if any.
    pub dataflow_id: Option<DataflowId>,
    /// Node this record relates to, if any.
    pub node_id: Option<NodeId>,
    /// Daemon this record relates to, if any.
    pub daemon_id: Option<DaemonId>,
    /// Either a regular log level or the `Stdout` marker.
    pub level: LogLevelOrStdout,
    /// Log target, if known (presumably the `log` crate's notion of a target).
    pub target: Option<String>,
    /// Module path of the log call site, if known.
    pub module_path: Option<String>,
    /// Source file of the log call site, if known.
    pub file: Option<String>,
    /// Source line of the log call site, if known.
    pub line: Option<u32>,
    /// The log text itself.
    pub message: String,
    /// UTC time attached to this record.
    pub timestamp: DateTime<Utc>,
    /// Additional free-form key/value pairs attached to the record.
    pub fields: Option<BTreeMap<String, String>>,
}
30
/// Deserialization-side mirror of [`LogMessage`] in which `message` is optional.
///
/// Convert it with `LogMessage::from`, which falls back to same-named entries
/// of `fields` for values that are missing here.
#[derive(Deserialize)]
pub struct LogMessageHelper {
    build_id: Option<BuildId>,
    dataflow_id: Option<DataflowId>,
    node_id: Option<NodeId>,
    daemon_id: Option<DaemonId>,
    level: LogLevelOrStdout,
    target: Option<String>,
    module_path: Option<String>,
    file: Option<String>,
    line: Option<u32>,
    // Optional here (unlike `LogMessage::message`); the conversion defaults it
    // to an empty string when neither this nor `fields["message"]` is set.
    message: Option<String>,
    timestamp: DateTime<Utc>,
    // Free-form key/value pairs; also the fallback source for the typed fields.
    fields: Option<BTreeMap<String, String>>,
}
46
47impl From<LogMessageHelper> for LogMessage {
48 fn from(helper: LogMessageHelper) -> Self {
49 let fields = helper.fields.as_ref();
50 LogMessage {
51 build_id: helper.build_id.or(fields
52 .and_then(|f| f.get("build_id").cloned())
53 .map(|id| BuildId(Uuid::parse_str(&id).unwrap()))),
54 dataflow_id: helper.dataflow_id.or(fields
55 .and_then(|f| f.get("dataflow_id").cloned())
56 .map(|id| DataflowId::from(Uuid::parse_str(&id).unwrap()))),
57 node_id: helper.node_id.or(fields
58 .and_then(|f| f.get("node_id").cloned())
59 .map(|id| NodeId(id))),
60 daemon_id: helper
61 .daemon_id
62 .or(fields.and_then(|f| f.get("daemon_id").cloned()).map(|id| {
63 let parts: Vec<&str> = id.splitn(2, '-').collect();
64 if parts.len() == 2 {
65 DaemonId {
66 machine_id: Some(parts[0].to_string()),
67 uuid: Uuid::parse_str(parts[1]).unwrap(),
68 }
69 } else {
70 DaemonId {
71 machine_id: None,
72 uuid: Uuid::parse_str(&parts[0]).unwrap(),
73 }
74 }
75 })),
76 level: helper.level,
77 target: helper
78 .target
79 .or(fields.and_then(|f| f.get("target").cloned())),
80 module_path: helper
81 .module_path
82 .or(fields.and_then(|f| f.get("module_path").cloned())),
83 file: helper.file.or(fields.and_then(|f| f.get("file").cloned())),
84 line: helper.line.or(fields
85 .and_then(|f| f.get("line").cloned())
86 .and_then(|s| s.parse().ok())),
87 message: helper
88 .message
89 .or(fields.and_then(|f| f.get("message").cloned()))
90 .unwrap_or_default(),
91 fields: helper.fields,
92 timestamp: helper.timestamp,
93 }
94 }
95}
96
/// Severity of a [`LogMessage`]: either a regular [`LogLevel`] or the special
/// `Stdout` marker.
///
/// With serde, `Stdout` uses the name `stdout`, while a regular level is
/// (de)serialized untagged, i.e. as the inner [`LogLevel`] itself.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "UPPERCASE")]
pub enum LogLevelOrStdout {
    /// Marker variant (presumably for raw standard-output lines — confirm with producers).
    #[serde(rename = "stdout")]
    Stdout,
    /// A regular log level.
    #[serde(untagged)]
    LogLevel(LogLevel),
}
105
106impl From<LogLevel> for LogLevelOrStdout {
107 fn from(level: LogLevel) -> Self {
108 Self::LogLevel(level)
109 }
110}
111
/// Describes a failed node: when it happened, why, and how the process exited.
///
/// The `Display` implementation renders a human-readable description.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// Timestamp attached to the error.
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node's process terminated.
    pub exit_status: NodeExitStatus,
}
118
119impl std::fmt::Display for NodeError {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
122 return write!(f, "failed to spawn node: {err}");
123 }
124 match &self.exit_status {
125 NodeExitStatus::Success => write!(f, "<success>"),
126 NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
127 NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
128 NodeExitStatus::Signal(signal) => {
129 let signal_str: Cow<_> = match signal {
130 1 => "SIGHUP".into(),
131 2 => "SIGINT".into(),
132 3 => "SIGQUIT".into(),
133 4 => "SIGILL".into(),
134 6 => "SIGABRT".into(),
135 8 => "SIGFPE".into(),
136 9 => "SIGKILL".into(),
137 11 => "SIGSEGV".into(),
138 13 => "SIGPIPE".into(),
139 14 => "SIGALRM".into(),
140 15 => "SIGTERM".into(),
141 22 => "SIGABRT".into(),
142 23 => "NSIG".into(),
143 other => other.to_string().into(),
144 };
145 if matches!(self.cause, NodeErrorCause::GraceDuration) {
146 write!(
147 f,
148 "node was killed by dora because it didn't react to a stop message in time ({signal_str})"
149 )
150 } else {
151 write!(f, "exited because of signal {signal_str}")
152 }
153 }
154 NodeExitStatus::Unknown => write!(f, "unknown exit status"),
155 }?;
156
157 match &self.cause {
158 NodeErrorCause::GraceDuration => {} NodeErrorCause::Cascading { caused_by_node } => write!(
160 f,
161 ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
162 )?,
163 NodeErrorCause::FailedToSpawn(_) => unreachable!(), NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
165 NodeErrorCause::Other { stderr } => {
166 let line: &str = "---------------------------------------------------------------------------------\n";
167 let stderr = stderr.trim_end();
168 write!(f, " with stderr output:\n{line}{stderr}\n{line}")?
169 }
170 }
171
172 Ok(())
173 }
174}
175
/// The reason a node failed, as reported in a [`NodeError`].
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// The node was killed by dora because it didn't react to a stop message in time.
    GraceDuration,
    /// The failure was triggered by another node that exited before connecting to dora.
    Cascading {
        /// The node whose early exit caused this error.
        caused_by_node: NodeId,
    },
    /// The node could not be spawned; carries the error text.
    FailedToSpawn(String),
    /// Any other failure.
    Other {
        /// Captured stderr output of the node (may be empty).
        stderr: String,
    },
}
189
/// How a node's process terminated.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process exited successfully.
    Success,
    /// The exit status could not be read; carries the I/O error text.
    IoError(String),
    /// The process exited unsuccessfully with this exit code.
    ExitCode(i32),
    /// The process was terminated by this signal number.
    Signal(i32),
    /// Neither an exit code nor a signal was available.
    Unknown,
}
198
199impl NodeExitStatus {
200 pub fn is_success(&self) -> bool {
201 matches!(self, NodeExitStatus::Success)
202 }
203}
204
205impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
206 fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
207 match result {
208 Ok(status) => {
209 if status.success() {
210 NodeExitStatus::Success
211 } else if let Some(code) = status.code() {
212 Self::ExitCode(code)
213 } else {
214 #[cfg(unix)]
215 {
216 use std::os::unix::process::ExitStatusExt;
217 if let Some(signal) = status.signal() {
218 return Self::Signal(signal);
219 }
220 }
221 Self::Unknown
222 }
223 }
224 Err(err) => Self::IoError(err.to_string()),
225 }
226 }
227}
228
/// A value paired with a `uhlc` timestamp.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped value.
    pub inner: T,
    /// Timestamp associated with `inner`.
    pub timestamp: uhlc::Timestamp,
}
234
235impl<T> Timestamped<T>
236where
237 T: serde::Serialize,
238{
239 pub fn serialize(&self) -> Vec<u8> {
240 bincode::serialize(self).unwrap()
241 }
242}
243
244impl Timestamped<InterDaemonEvent> {
245 pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
246 bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
247 }
248}
249
/// Identifier of a shared memory region, represented as a plain string.
///
/// NOTE(review): `DataMessage::SharedMemory::shared_memory_id` is declared as
/// `String` rather than through this alias.
pub type SharedMemoryId = String;
251
/// Payload of a data message: either the bytes themselves or a reference to a
/// shared memory region.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// The bytes, stored inline in a vector with 128-byte alignment.
    Vec(AVec<u8, ConstAlign<128>>),
    /// A reference to data held in shared memory.
    SharedMemory {
        /// Identifier of the shared memory region.
        shared_memory_id: String,
        /// Length of the data (presumably in bytes — confirm with producers).
        len: usize,
        /// Token associated with this region; exposed via `DataMessage::drop_token`.
        drop_token: DropToken,
    },
}
261
262impl DataMessage {
263 pub fn drop_token(&self) -> Option<DropToken> {
264 match self {
265 DataMessage::Vec(_) => None,
266 DataMessage::SharedMemory { drop_token, .. } => Some(*drop_token),
267 }
268 }
269}
270
271impl fmt::Debug for DataMessage {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 match self {
274 Self::Vec(v) => f
275 .debug_struct("Vec")
276 .field("len", &v.len())
277 .finish_non_exhaustive(),
278 Self::SharedMemory {
279 shared_memory_id,
280 len,
281 drop_token,
282 } => f
283 .debug_struct("SharedMemory")
284 .field("shared_memory_id", shared_memory_id)
285 .field("len", len)
286 .field("drop_token", drop_token)
287 .finish(),
288 }
289 }
290}
291
/// Opaque, UUID-backed token carried by `DataMessage::SharedMemory`.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);
296
297impl DropToken {
298 pub fn generate() -> Self {
299 Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
300 }
301}
302
/// Identifies a daemon by an optional machine ID plus a UUID.
///
/// `Display` renders it as `<machine_id>-<uuid>`, or just `<uuid>` when no
/// machine ID is set.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
    // Optional machine identifier supplied at construction time.
    machine_id: Option<String>,
    // UUID of this daemon; `DaemonId::new` generates a random (v4) one.
    uuid: Uuid,
}
308
309impl DaemonId {
310 pub fn new(machine_id: Option<String>) -> Self {
311 DaemonId {
312 machine_id,
313 uuid: Uuid::new_v4(),
314 }
315 }
316
317 pub fn matches_machine_id(&self, machine_id: &str) -> bool {
318 self.machine_id
319 .as_ref()
320 .map(|id| id == machine_id)
321 .unwrap_or_default()
322 }
323
324 pub fn machine_id(&self) -> Option<&str> {
325 self.machine_id.as_deref()
326 }
327}
328
329impl std::fmt::Display for DaemonId {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 if let Some(id) = &self.machine_id {
332 write!(f, "{id}-")?;
333 }
334 write!(f, "{}", self.uuid)
335 }
336}
337
/// A specific commit within a git repository.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
    /// The repository (presumably a URL or path — confirm with callers).
    pub repo: String,
    /// Hash of the commit.
    pub commit_hash: String,
}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated `LogMessage` must survive a YAML round trip through
    /// `LogMessageHelper` unchanged.
    #[test]
    fn test_log_message_serialization() {
        let original = LogMessage {
            build_id: Some(BuildId(Uuid::new_v4())),
            dataflow_id: Some(DataflowId::from(Uuid::new_v4())),
            node_id: Some(NodeId(String::from("node-1"))),
            daemon_id: Some(DaemonId::new(Some(String::from("machine-1")))),
            level: LogLevelOrStdout::LogLevel(LogLevel::Info),
            target: Some(String::from("target")),
            module_path: Some(String::from("module::path")),
            file: Some(String::from("file.rs")),
            line: Some(42),
            message: String::from("This is a log message"),
            timestamp: Utc::now(),
            fields: Some(BTreeMap::from([(String::from("key"), String::from("value"))])),
        };

        let yaml = serde_yaml::to_string(&original).unwrap();
        let helper: LogMessageHelper = serde_yaml::from_str(&yaml).unwrap();
        let round_tripped = LogMessage::from(helper);

        assert_eq!(original, round_tripped);
    }
}
368}