dora_daemon/
log.rs

1use std::{
2    ops::{Deref, DerefMut},
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use dora_core::{
8    build::{BuildLogger, LogLevelOrStdout},
9    config::NodeId,
10    uhlc,
11};
12use dora_message::{
13    BuildId,
14    common::{DaemonId, LogLevel, LogMessage, Timestamped},
15    daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
16};
17use eyre::Context;
18use flume::Sender;
19use tokio::net::TcpStream;
20use uuid::Uuid;
21
22use crate::socket_stream_utils::socket_stream_send;
23
24pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
25    let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
26    dataflow_dir.join(format!("log_{node_id}.txt"))
27}
28
29pub struct NodeLogger<'a> {
30    node_id: NodeId,
31    logger: DataflowLogger<'a>,
32}
33
34impl NodeLogger<'_> {
35    pub fn inner(&self) -> &DataflowLogger {
36        &self.logger
37    }
38
39    pub async fn log(
40        &mut self,
41        level: LogLevel,
42        target: Option<String>,
43        message: impl Into<String>,
44    ) {
45        self.logger
46            .log(level, Some(self.node_id.clone()), target, message)
47            .await
48    }
49
50    pub async fn try_clone(&self) -> eyre::Result<NodeLogger<'static>> {
51        Ok(NodeLogger {
52            node_id: self.node_id.clone(),
53            logger: self.logger.try_clone().await?,
54        })
55    }
56}
57
58pub struct DataflowLogger<'a> {
59    dataflow_id: Uuid,
60    logger: CowMut<'a, DaemonLogger>,
61}
62
63impl<'a> DataflowLogger<'a> {
64    pub fn for_node(self, node_id: NodeId) -> NodeLogger<'a> {
65        NodeLogger {
66            node_id,
67            logger: self,
68        }
69    }
70
71    pub fn reborrow(&mut self) -> DataflowLogger {
72        DataflowLogger {
73            dataflow_id: self.dataflow_id,
74            logger: CowMut::Borrowed(&mut self.logger),
75        }
76    }
77
78    pub fn inner(&self) -> &DaemonLogger {
79        &self.logger
80    }
81
82    pub async fn log(
83        &mut self,
84        level: LogLevel,
85        node_id: Option<NodeId>,
86        target: Option<String>,
87        message: impl Into<String>,
88    ) {
89        self.logger
90            .log(level, Some(self.dataflow_id), node_id, target, message)
91            .await
92    }
93
94    pub async fn try_clone(&self) -> eyre::Result<DataflowLogger<'static>> {
95        Ok(DataflowLogger {
96            dataflow_id: self.dataflow_id,
97            logger: CowMut::Owned(self.logger.try_clone().await?),
98        })
99    }
100}
101
102pub struct NodeBuildLogger<'a> {
103    build_id: BuildId,
104    node_id: NodeId,
105    logger: CowMut<'a, DaemonLogger>,
106}
107
108impl NodeBuildLogger<'_> {
109    pub async fn log(
110        &mut self,
111        level: impl Into<LogLevelOrStdout> + Send,
112        message: impl Into<String>,
113    ) {
114        self.logger
115            .log_build(
116                self.build_id,
117                level.into(),
118                None,
119                Some(self.node_id.clone()),
120                message,
121            )
122            .await
123    }
124
125    pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
126        Ok(NodeBuildLogger {
127            build_id: self.build_id,
128            node_id: self.node_id.clone(),
129            logger: CowMut::Owned(self.logger.try_clone().await?),
130        })
131    }
132}
133
134impl BuildLogger for NodeBuildLogger<'_> {
135    type Clone = NodeBuildLogger<'static>;
136
137    fn log_message(
138        &mut self,
139        level: impl Into<LogLevelOrStdout> + Send,
140        message: impl Into<String> + Send,
141    ) -> impl std::future::Future<Output = ()> + Send {
142        self.log(level, message)
143    }
144
145    fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
146        self.try_clone_impl()
147    }
148}
149
150pub struct DaemonLogger {
151    daemon_id: DaemonId,
152    logger: Logger,
153}
154
155impl DaemonLogger {
156    pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
157        DataflowLogger {
158            dataflow_id,
159            logger: CowMut::Borrowed(self),
160        }
161    }
162
163    pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
164        NodeBuildLogger {
165            build_id,
166            node_id,
167            logger: CowMut::Borrowed(self),
168        }
169    }
170
171    pub fn inner(&self) -> &Logger {
172        &self.logger
173    }
174
175    pub async fn log(
176        &mut self,
177        level: LogLevel,
178        dataflow_id: Option<Uuid>,
179        node_id: Option<NodeId>,
180        target: Option<String>,
181        message: impl Into<String>,
182    ) {
183        let message = LogMessage {
184            build_id: None,
185            daemon_id: Some(self.daemon_id.clone()),
186            dataflow_id,
187            node_id,
188            level: level.into(),
189            target,
190            module_path: None,
191            file: None,
192            line: None,
193            message: message.into(),
194        };
195        self.logger.log(message).await
196    }
197
198    pub async fn log_build(
199        &mut self,
200        build_id: BuildId,
201        level: LogLevelOrStdout,
202        target: Option<String>,
203        node_id: Option<NodeId>,
204        message: impl Into<String>,
205    ) {
206        let message = LogMessage {
207            build_id: Some(build_id),
208            daemon_id: Some(self.daemon_id.clone()),
209            dataflow_id: None,
210            node_id,
211            level,
212            target,
213            module_path: None,
214            file: None,
215            line: None,
216            message: message.into(),
217        };
218        self.logger.log(message).await
219    }
220
221    pub(crate) fn daemon_id(&self) -> &DaemonId {
222        &self.daemon_id
223    }
224
225    pub async fn try_clone(&self) -> eyre::Result<Self> {
226        Ok(Self {
227            daemon_id: self.daemon_id.clone(),
228            logger: self.logger.try_clone().await?,
229        })
230    }
231}
232
233pub struct Logger {
234    pub(super) destination: LogDestination,
235    pub(super) daemon_id: DaemonId,
236    pub(super) clock: Arc<uhlc::HLC>,
237}
238
239impl Logger {
240    pub fn for_daemon(self, daemon_id: DaemonId) -> DaemonLogger {
241        DaemonLogger {
242            daemon_id,
243            logger: self,
244        }
245    }
246
247    pub async fn log(&mut self, message: LogMessage) {
248        match &mut self.destination {
249            LogDestination::Coordinator {
250                coordinator_connection,
251            } => {
252                let message = Timestamped {
253                    inner: CoordinatorRequest::Event {
254                        daemon_id: self.daemon_id.clone(),
255                        event: DaemonEvent::Log(message.clone()),
256                    },
257                    timestamp: self.clock.new_timestamp(),
258                };
259                Self::log_to_coordinator(message, coordinator_connection).await
260            }
261            LogDestination::Channel { sender } => {
262                let _ = sender.send_async(message).await;
263            }
264            LogDestination::Tracing => {
265                // log message using tracing if reporting to coordinator is not possible
266                match message.level {
267                    LogLevelOrStdout::Stdout => {
268                        tracing::info!(
269                            build_id = ?message.build_id.map(|id| id.to_string()),
270                            dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
271                            node_id = ?message.node_id.map(|id| id.to_string()),
272                            target = message.target,
273                            module_path = message.module_path,
274                            file = message.file,
275                            line = message.line,
276                            "{}",
277                            Indent(&message.message)
278                        )
279                    }
280                    LogLevelOrStdout::LogLevel(level) => match level {
281                        LogLevel::Error => {
282                            tracing::error!(
283                                build_id = ?message.build_id.map(|id| id.to_string()),
284                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
285                                node_id = ?message.node_id.map(|id| id.to_string()),
286                                target = message.target,
287                                module_path = message.module_path,
288                                file = message.file,
289                                line = message.line,
290                                "{}",
291                                Indent(&message.message)
292                            );
293                        }
294                        LogLevel::Warn => {
295                            tracing::warn!(
296                                build_id = ?message.build_id.map(|id| id.to_string()),
297                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
298                                node_id = ?message.node_id.map(|id| id.to_string()),
299                                target = message.target,
300                                module_path = message.module_path,
301                                file = message.file,
302                                line = message.line,
303                                "{}",
304                                Indent(&message.message)
305                            );
306                        }
307                        LogLevel::Info => {
308                            tracing::info!(
309                                build_id = ?message.build_id.map(|id| id.to_string()),
310                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
311                                node_id = ?message.node_id.map(|id| id.to_string()),
312                                target = message.target,
313                                module_path = message.module_path,
314                                file = message.file,
315                                line = message.line,
316                                "{}",
317                                Indent(&message.message)
318                            );
319                        }
320                        LogLevel::Debug => {
321                            tracing::debug!(
322                                build_id = ?message.build_id.map(|id| id.to_string()),
323                                dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
324                                node_id = ?message.node_id.map(|id| id.to_string()),
325                                target = message.target,
326                                module_path = message.module_path,
327                                file = message.file,
328                                line = message.line,
329                                "{}",
330                                Indent(&message.message)
331                            );
332                        }
333                        _ => {}
334                    },
335                }
336            }
337        }
338    }
339
340    pub async fn try_clone(&self) -> eyre::Result<Self> {
341        let destination = match &self.destination {
342            LogDestination::Coordinator {
343                coordinator_connection,
344            } => {
345                let addr = coordinator_connection
346                    .peer_addr()
347                    .context("failed to get coordinator peer addr")?;
348                let new_connection = TcpStream::connect(addr)
349                    .await
350                    .context("failed to connect to coordinator during logger clone")?;
351                LogDestination::Coordinator {
352                    coordinator_connection: new_connection,
353                }
354            }
355            LogDestination::Channel { sender } => LogDestination::Channel {
356                sender: sender.clone(),
357            },
358            LogDestination::Tracing => LogDestination::Tracing,
359        };
360
361        Ok(Self {
362            destination,
363            daemon_id: self.daemon_id.clone(),
364            clock: self.clock.clone(),
365        })
366    }
367
368    async fn log_to_coordinator(
369        message: Timestamped<CoordinatorRequest>,
370        connection: &mut TcpStream,
371    ) {
372        let msg = serde_json::to_vec(&message).expect("failed to serialize log message");
373        match socket_stream_send(connection, &msg)
374            .await
375            .wrap_err("failed to send log message to dora-coordinator")
376        {
377            Ok(()) => (),
378            Err(err) => tracing::warn!("{err:?}"),
379        }
380    }
381}
382
383pub enum LogDestination {
384    Coordinator { coordinator_connection: TcpStream },
385    Channel { sender: Sender<LogMessage> },
386    Tracing,
387}
388
389enum CowMut<'a, T> {
390    Borrowed(&'a mut T),
391    Owned(T),
392}
393
394impl<T> Deref for CowMut<'_, T> {
395    type Target = T;
396
397    fn deref(&self) -> &Self::Target {
398        match self {
399            CowMut::Borrowed(v) => v,
400            CowMut::Owned(v) => v,
401        }
402    }
403}
404
405impl<T> DerefMut for CowMut<'_, T> {
406    fn deref_mut(&mut self) -> &mut Self::Target {
407        match self {
408            CowMut::Borrowed(v) => v,
409            CowMut::Owned(v) => v,
410        }
411    }
412}
413
414struct Indent<'a>(&'a str);
415
416impl std::fmt::Display for Indent<'_> {
417    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418        for line in self.0.lines() {
419            write!(f, "   {line}")?;
420        }
421        Ok(())
422    }
423}