dora_daemon/
log.rs

1use std::{
2    ops::{Deref, DerefMut},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use dora_core::{
8    build::{BuildLogger, LogLevelOrStdout},
9    config::NodeId,
10    uhlc,
11};
12use dora_message::{
13    BuildId,
14    common::{DaemonId, LogLevel, LogMessage, Timestamped},
15    daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
16};
17use eyre::Context;
18use flume::Sender;
19use tokio::net::TcpStream;
20use uuid::Uuid;
21
22use crate::socket_stream_utils::socket_stream_send;
23
24pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
25    let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
26    dataflow_dir.join(format!("log_{node_id}.txt"))
27}
28
29pub struct NodeLogger<'a> {
30    node_id: NodeId,
31    logger: DataflowLogger<'a>,
32}
33
34impl NodeLogger<'_> {
35    pub fn inner(&self) -> &DataflowLogger {
36        &self.logger
37    }
38
39    pub async fn log(
40        &mut self,
41        level: LogLevel,
42        target: Option<String>,
43        message: impl Into<String>,
44    ) {
45        self.logger
46            .log(level, Some(self.node_id.clone()), target, message)
47            .await
48    }
49
50    pub async fn try_clone(&self) -> eyre::Result<NodeLogger<'static>> {
51        Ok(NodeLogger {
52            node_id: self.node_id.clone(),
53            logger: self.logger.try_clone().await?,
54        })
55    }
56}
57
58pub struct DataflowLogger<'a> {
59    dataflow_id: Uuid,
60    logger: CowMut<'a, DaemonLogger>,
61}
62
63impl<'a> DataflowLogger<'a> {
64    pub fn for_node(self, node_id: NodeId) -> NodeLogger<'a> {
65        NodeLogger {
66            node_id,
67            logger: self,
68        }
69    }
70
71    pub fn reborrow(&mut self) -> DataflowLogger {
72        DataflowLogger {
73            dataflow_id: self.dataflow_id,
74            logger: CowMut::Borrowed(&mut self.logger),
75        }
76    }
77
78    pub fn inner(&self) -> &DaemonLogger {
79        &self.logger
80    }
81
82    pub async fn log(
83        &mut self,
84        level: LogLevel,
85        node_id: Option<NodeId>,
86        target: Option<String>,
87        message: impl Into<String>,
88    ) {
89        self.logger
90            .log(level, Some(self.dataflow_id), node_id, target, message)
91            .await
92    }
93
94    pub async fn try_clone(&self) -> eyre::Result<DataflowLogger<'static>> {
95        Ok(DataflowLogger {
96            dataflow_id: self.dataflow_id,
97            logger: CowMut::Owned(self.logger.try_clone().await?),
98        })
99    }
100}
101
102pub struct NodeBuildLogger<'a> {
103    build_id: BuildId,
104    node_id: NodeId,
105    logger: CowMut<'a, DaemonLogger>,
106}
107
108impl NodeBuildLogger<'_> {
109    pub async fn log(
110        &mut self,
111        level: impl Into<LogLevelOrStdout> + Send,
112        message: impl Into<String>,
113    ) {
114        self.logger
115            .log_build(
116                self.build_id,
117                level.into(),
118                None,
119                Some(self.node_id.clone()),
120                message,
121            )
122            .await
123    }
124
125    pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
126        Ok(NodeBuildLogger {
127            build_id: self.build_id,
128            node_id: self.node_id.clone(),
129            logger: CowMut::Owned(self.logger.try_clone().await?),
130        })
131    }
132}
133
134impl BuildLogger for NodeBuildLogger<'_> {
135    type Clone = NodeBuildLogger<'static>;
136
137    fn log_message(
138        &mut self,
139        level: impl Into<LogLevelOrStdout> + Send,
140        message: impl Into<String> + Send,
141    ) -> impl std::future::Future<Output = ()> + Send {
142        self.log(level, message)
143    }
144
145    fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
146        self.try_clone_impl()
147    }
148}
149
150pub struct DaemonLogger {
151    daemon_id: DaemonId,
152    logger: Logger,
153}
154
155impl DaemonLogger {
156    pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
157        DataflowLogger {
158            dataflow_id,
159            logger: CowMut::Borrowed(self),
160        }
161    }
162
163    pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
164        NodeBuildLogger {
165            build_id,
166            node_id,
167            logger: CowMut::Borrowed(self),
168        }
169    }
170
171    pub fn inner(&self) -> &Logger {
172        &self.logger
173    }
174
175    pub async fn log(
176        &mut self,
177        level: LogLevel,
178        dataflow_id: Option<Uuid>,
179        node_id: Option<NodeId>,
180        target: Option<String>,
181        message: impl Into<String>,
182    ) {
183        let message = LogMessage {
184            build_id: None,
185            daemon_id: Some(self.daemon_id.clone()),
186            dataflow_id,
187            node_id,
188            level: level.into(),
189            target,
190            module_path: None,
191            file: None,
192            line: None,
193            message: message.into(),
194            timestamp: self
195                .logger
196                .clock
197                .new_timestamp()
198                .get_time()
199                .to_system_time()
200                .into(),
201
202            fields: None,
203        };
204        self.logger.log(message).await
205    }
206
207    pub async fn log_build(
208        &mut self,
209        build_id: BuildId,
210        level: LogLevelOrStdout,
211        target: Option<String>,
212        node_id: Option<NodeId>,
213        message: impl Into<String>,
214    ) {
215        let message = LogMessage {
216            build_id: Some(build_id),
217            daemon_id: Some(self.daemon_id.clone()),
218            dataflow_id: None,
219            node_id,
220            level,
221            target,
222            module_path: None,
223            file: None,
224            line: None,
225            message: message.into(),
226            timestamp: self
227                .logger
228                .clock
229                .new_timestamp()
230                .get_time()
231                .to_system_time()
232                .into(),
233            fields: None,
234        };
235        self.logger.log(message).await
236    }
237
238    pub(crate) fn daemon_id(&self) -> &DaemonId {
239        &self.daemon_id
240    }
241
242    pub async fn try_clone(&self) -> eyre::Result<Self> {
243        Ok(Self {
244            daemon_id: self.daemon_id.clone(),
245            logger: self.logger.try_clone().await?,
246        })
247    }
248}
249
250pub struct Logger {
251    pub(super) destination: LogDestination,
252    pub(super) daemon_id: DaemonId,
253    pub(super) clock: Arc<uhlc::HLC>,
254}
255
256impl Logger {
257    pub fn for_daemon(self, daemon_id: DaemonId) -> DaemonLogger {
258        DaemonLogger {
259            daemon_id,
260            logger: self,
261        }
262    }
263
264    pub async fn log(&mut self, message: LogMessage) {
265        match &mut self.destination {
266            LogDestination::Coordinator {
267                coordinator_connection,
268            } => {
269                let message = Timestamped {
270                    inner: CoordinatorRequest::Event {
271                        daemon_id: self.daemon_id.clone(),
272                        event: DaemonEvent::Log(message.clone()),
273                    },
274                    timestamp: self.clock.new_timestamp(),
275                };
276                Self::log_to_coordinator(message, coordinator_connection).await
277            }
278            LogDestination::Channel { sender } => {
279                let _ = sender.send_async(message).await;
280            }
281            LogDestination::Tracing => {
282                // log message using tracing if reporting to coordinator is not possible
283                match message.level {
284                    LogLevelOrStdout::Stdout => {
285                        tracing::info!(
286                            build_id = ?message.build_id.map(|id| id.to_string()),
287                            dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
288                            node_id = ?message.node_id.map(|id| id.to_string()),
289                            target = message.target,
290                            module_path = message.module_path,
291                            file = message.file,
292                            line = message.line,
293                            "{}",
294                            Indent(&message.message)
295                        )
296                    }
297                    LogLevelOrStdout::LogLevel(level) => match level {
298                        LogLevel::Error => {
299                            tracing::error!(
300                                build_id = ?message.build_id.map(|id| id.to_string()),
301                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
302                                node_id = ?message.node_id.map(|id| id.to_string()),
303                                target = message.target,
304                                module_path = message.module_path,
305                                file = message.file,
306                                line = message.line,
307                                "{}",
308                                Indent(&message.message)
309                            );
310                        }
311                        LogLevel::Warn => {
312                            tracing::warn!(
313                                build_id = ?message.build_id.map(|id| id.to_string()),
314                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
315                                node_id = ?message.node_id.map(|id| id.to_string()),
316                                target = message.target,
317                                module_path = message.module_path,
318                                file = message.file,
319                                line = message.line,
320                                "{}",
321                                Indent(&message.message)
322                            );
323                        }
324                        LogLevel::Info => {
325                            tracing::info!(
326                                build_id = ?message.build_id.map(|id| id.to_string()),
327                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
328                                node_id = ?message.node_id.map(|id| id.to_string()),
329                                target = message.target,
330                                module_path = message.module_path,
331                                file = message.file,
332                                line = message.line,
333                                "{}",
334                                Indent(&message.message)
335                            );
336                        }
337                        LogLevel::Debug => {
338                            tracing::debug!(
339                                build_id = ?message.build_id.map(|id| id.to_string()),
340                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
341                                node_id = ?message.node_id.map(|id| id.to_string()),
342                                target = message.target,
343                                module_path = message.module_path,
344                                file = message.file,
345                                line = message.line,
346                                "{}",
347                                Indent(&message.message)
348                            );
349                        }
350                        _ => {}
351                    },
352                }
353            }
354        }
355    }
356
357    pub async fn try_clone(&self) -> eyre::Result<Self> {
358        let destination = match &self.destination {
359            LogDestination::Coordinator {
360                coordinator_connection,
361            } => {
362                let addr = coordinator_connection
363                    .peer_addr()
364                    .context("failed to get coordinator peer addr")?;
365                let new_connection = TcpStream::connect(addr)
366                    .await
367                    .context("failed to connect to coordinator during logger clone")?;
368                LogDestination::Coordinator {
369                    coordinator_connection: new_connection,
370                }
371            }
372            LogDestination::Channel { sender } => LogDestination::Channel {
373                sender: sender.clone(),
374            },
375            LogDestination::Tracing => LogDestination::Tracing,
376        };
377
378        Ok(Self {
379            destination,
380            daemon_id: self.daemon_id.clone(),
381            clock: self.clock.clone(),
382        })
383    }
384
385    async fn log_to_coordinator(
386        message: Timestamped<CoordinatorRequest>,
387        connection: &mut TcpStream,
388    ) {
389        let msg = serde_json::to_vec(&message).expect("failed to serialize log message");
390        match socket_stream_send(connection, &msg)
391            .await
392            .wrap_err("failed to send log message to dora-coordinator")
393        {
394            Ok(()) => (),
395            Err(err) => tracing::warn!("{err:?}"),
396        }
397    }
398}
399
400pub enum LogDestination {
401    Coordinator { coordinator_connection: TcpStream },
402    Channel { sender: Sender<LogMessage> },
403    Tracing,
404}
405
406enum CowMut<'a, T> {
407    Borrowed(&'a mut T),
408    Owned(T),
409}
410
411impl<T> Deref for CowMut<'_, T> {
412    type Target = T;
413
414    fn deref(&self) -> &Self::Target {
415        match self {
416            CowMut::Borrowed(v) => v,
417            CowMut::Owned(v) => v,
418        }
419    }
420}
421
422impl<T> DerefMut for CowMut<'_, T> {
423    fn deref_mut(&mut self) -> &mut Self::Target {
424        match self {
425            CowMut::Borrowed(v) => v,
426            CowMut::Owned(v) => v,
427        }
428    }
429}
430
431struct Indent<'a>(&'a str);
432
433impl std::fmt::Display for Indent<'_> {
434    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435        for line in self.0.lines() {
436            write!(f, "   {line}")?;
437        }
438        Ok(())
439    }
440}