1use std::{
2 ops::{Deref, DerefMut},
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use dora_core::{
8 build::{BuildLogger, LogLevelOrStdout},
9 config::NodeId,
10 uhlc,
11};
12use dora_message::{
13 BuildId,
14 common::{DaemonId, LogLevel, LogMessage, Timestamped},
15 daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
16};
17use eyre::Context;
18use flume::Sender;
19use tokio::net::TcpStream;
20use uuid::Uuid;
21
22use crate::socket_stream_utils::socket_stream_send;
23
24pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
25 let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
26 dataflow_dir.join(format!("log_{node_id}.txt"))
27}
28
/// Logger scoped to a single node of a dataflow.
///
/// Forwards every message to the wrapped [`DataflowLogger`], attaching `node_id`.
pub struct NodeLogger<'a> {
    /// Node that messages logged through this value are attributed to.
    node_id: NodeId,
    /// Dataflow-level logger that messages are forwarded to.
    logger: DataflowLogger<'a>,
}
33
34impl NodeLogger<'_> {
35 pub fn inner(&self) -> &DataflowLogger {
36 &self.logger
37 }
38
39 pub async fn log(
40 &mut self,
41 level: LogLevel,
42 target: Option<String>,
43 message: impl Into<String>,
44 ) {
45 self.logger
46 .log(level, Some(self.node_id.clone()), target, message)
47 .await
48 }
49
50 pub async fn try_clone(&self) -> eyre::Result<NodeLogger<'static>> {
51 Ok(NodeLogger {
52 node_id: self.node_id.clone(),
53 logger: self.logger.try_clone().await?,
54 })
55 }
56}
57
/// Logger scoped to a single dataflow.
///
/// Forwards every message to a [`DaemonLogger`], attaching `dataflow_id`.
pub struct DataflowLogger<'a> {
    /// Dataflow that messages logged through this value are attributed to.
    dataflow_id: Uuid,
    /// Daemon-level logger, either borrowed from elsewhere or owned by this value.
    logger: CowMut<'a, DaemonLogger>,
}
62
63impl<'a> DataflowLogger<'a> {
64 pub fn for_node(self, node_id: NodeId) -> NodeLogger<'a> {
65 NodeLogger {
66 node_id,
67 logger: self,
68 }
69 }
70
71 pub fn reborrow(&mut self) -> DataflowLogger {
72 DataflowLogger {
73 dataflow_id: self.dataflow_id,
74 logger: CowMut::Borrowed(&mut self.logger),
75 }
76 }
77
78 pub fn inner(&self) -> &DaemonLogger {
79 &self.logger
80 }
81
82 pub async fn log(
83 &mut self,
84 level: LogLevel,
85 node_id: Option<NodeId>,
86 target: Option<String>,
87 message: impl Into<String>,
88 ) {
89 self.logger
90 .log(level, Some(self.dataflow_id), node_id, target, message)
91 .await
92 }
93
94 pub async fn try_clone(&self) -> eyre::Result<DataflowLogger<'static>> {
95 Ok(DataflowLogger {
96 dataflow_id: self.dataflow_id,
97 logger: CowMut::Owned(self.logger.try_clone().await?),
98 })
99 }
100}
101
/// Logger for the build output of a single node.
///
/// Forwards every message to a [`DaemonLogger`] as a build log entry, attaching
/// `build_id` and `node_id`.
pub struct NodeBuildLogger<'a> {
    /// Build that messages logged through this value are attributed to.
    build_id: BuildId,
    /// Node that messages logged through this value are attributed to.
    node_id: NodeId,
    /// Daemon-level logger, either borrowed from elsewhere or owned by this value.
    logger: CowMut<'a, DaemonLogger>,
}
107
108impl NodeBuildLogger<'_> {
109 pub async fn log(
110 &mut self,
111 level: impl Into<LogLevelOrStdout> + Send,
112 message: impl Into<String>,
113 ) {
114 self.logger
115 .log_build(
116 self.build_id,
117 level.into(),
118 None,
119 Some(self.node_id.clone()),
120 message,
121 )
122 .await
123 }
124
125 pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
126 Ok(NodeBuildLogger {
127 build_id: self.build_id,
128 node_id: self.node_id.clone(),
129 logger: CowMut::Owned(self.logger.try_clone().await?),
130 })
131 }
132}
133
134impl BuildLogger for NodeBuildLogger<'_> {
135 type Clone = NodeBuildLogger<'static>;
136
137 fn log_message(
138 &mut self,
139 level: impl Into<LogLevelOrStdout> + Send,
140 message: impl Into<String> + Send,
141 ) -> impl std::future::Future<Output = ()> + Send {
142 self.log(level, message)
143 }
144
145 fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
146 self.try_clone_impl()
147 }
148}
149
/// Logger scoped to a daemon.
///
/// Builds [`LogMessage`] values tagged with `daemon_id` and hands them to the
/// wrapped [`Logger`].
pub struct DaemonLogger {
    /// Daemon that messages logged through this value are attributed to.
    daemon_id: DaemonId,
    /// Logger that delivers the constructed messages to their destination.
    logger: Logger,
}
154
155impl DaemonLogger {
156 pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
157 DataflowLogger {
158 dataflow_id,
159 logger: CowMut::Borrowed(self),
160 }
161 }
162
163 pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
164 NodeBuildLogger {
165 build_id,
166 node_id,
167 logger: CowMut::Borrowed(self),
168 }
169 }
170
171 pub fn inner(&self) -> &Logger {
172 &self.logger
173 }
174
175 pub async fn log(
176 &mut self,
177 level: LogLevel,
178 dataflow_id: Option<Uuid>,
179 node_id: Option<NodeId>,
180 target: Option<String>,
181 message: impl Into<String>,
182 ) {
183 let message = LogMessage {
184 build_id: None,
185 daemon_id: Some(self.daemon_id.clone()),
186 dataflow_id,
187 node_id,
188 level: level.into(),
189 target,
190 module_path: None,
191 file: None,
192 line: None,
193 message: message.into(),
194 timestamp: self
195 .logger
196 .clock
197 .new_timestamp()
198 .get_time()
199 .to_system_time()
200 .into(),
201
202 fields: None,
203 };
204 self.logger.log(message).await
205 }
206
207 pub async fn log_build(
208 &mut self,
209 build_id: BuildId,
210 level: LogLevelOrStdout,
211 target: Option<String>,
212 node_id: Option<NodeId>,
213 message: impl Into<String>,
214 ) {
215 let message = LogMessage {
216 build_id: Some(build_id),
217 daemon_id: Some(self.daemon_id.clone()),
218 dataflow_id: None,
219 node_id,
220 level,
221 target,
222 module_path: None,
223 file: None,
224 line: None,
225 message: message.into(),
226 timestamp: self
227 .logger
228 .clock
229 .new_timestamp()
230 .get_time()
231 .to_system_time()
232 .into(),
233 fields: None,
234 };
235 self.logger.log(message).await
236 }
237
238 pub(crate) fn daemon_id(&self) -> &DaemonId {
239 &self.daemon_id
240 }
241
242 pub async fn try_clone(&self) -> eyre::Result<Self> {
243 Ok(Self {
244 daemon_id: self.daemon_id.clone(),
245 logger: self.logger.try_clone().await?,
246 })
247 }
248}
249
/// Delivers [`LogMessage`] values to a configured [`LogDestination`].
pub struct Logger {
    /// Where log messages are delivered to.
    pub(super) destination: LogDestination,
    /// Daemon ID attached to the events that are sent to the coordinator.
    pub(super) daemon_id: DaemonId,
    /// Clock used to create the timestamps of outgoing messages.
    pub(super) clock: Arc<uhlc::HLC>,
}
255
256impl Logger {
257 pub fn for_daemon(self, daemon_id: DaemonId) -> DaemonLogger {
258 DaemonLogger {
259 daemon_id,
260 logger: self,
261 }
262 }
263
264 pub async fn log(&mut self, message: LogMessage) {
265 match &mut self.destination {
266 LogDestination::Coordinator {
267 coordinator_connection,
268 } => {
269 let message = Timestamped {
270 inner: CoordinatorRequest::Event {
271 daemon_id: self.daemon_id.clone(),
272 event: DaemonEvent::Log(message.clone()),
273 },
274 timestamp: self.clock.new_timestamp(),
275 };
276 Self::log_to_coordinator(message, coordinator_connection).await
277 }
278 LogDestination::Channel { sender } => {
279 let _ = sender.send_async(message).await;
280 }
281 LogDestination::Tracing => {
282 match message.level {
284 LogLevelOrStdout::Stdout => {
285 tracing::info!(
286 build_id = ?message.build_id.map(|id| id.to_string()),
287 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
288 node_id = ?message.node_id.map(|id| id.to_string()),
289 target = message.target,
290 module_path = message.module_path,
291 file = message.file,
292 line = message.line,
293 "{}",
294 Indent(&message.message)
295 )
296 }
297 LogLevelOrStdout::LogLevel(level) => match level {
298 LogLevel::Error => {
299 tracing::error!(
300 build_id = ?message.build_id.map(|id| id.to_string()),
301 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
302 node_id = ?message.node_id.map(|id| id.to_string()),
303 target = message.target,
304 module_path = message.module_path,
305 file = message.file,
306 line = message.line,
307 "{}",
308 Indent(&message.message)
309 );
310 }
311 LogLevel::Warn => {
312 tracing::warn!(
313 build_id = ?message.build_id.map(|id| id.to_string()),
314 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
315 node_id = ?message.node_id.map(|id| id.to_string()),
316 target = message.target,
317 module_path = message.module_path,
318 file = message.file,
319 line = message.line,
320 "{}",
321 Indent(&message.message)
322 );
323 }
324 LogLevel::Info => {
325 tracing::info!(
326 build_id = ?message.build_id.map(|id| id.to_string()),
327 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
328 node_id = ?message.node_id.map(|id| id.to_string()),
329 target = message.target,
330 module_path = message.module_path,
331 file = message.file,
332 line = message.line,
333 "{}",
334 Indent(&message.message)
335 );
336 }
337 LogLevel::Debug => {
338 tracing::debug!(
339 build_id = ?message.build_id.map(|id| id.to_string()),
340 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
341 node_id = ?message.node_id.map(|id| id.to_string()),
342 target = message.target,
343 module_path = message.module_path,
344 file = message.file,
345 line = message.line,
346 "{}",
347 Indent(&message.message)
348 );
349 }
350 _ => {}
351 },
352 }
353 }
354 }
355 }
356
357 pub async fn try_clone(&self) -> eyre::Result<Self> {
358 let destination = match &self.destination {
359 LogDestination::Coordinator {
360 coordinator_connection,
361 } => {
362 let addr = coordinator_connection
363 .peer_addr()
364 .context("failed to get coordinator peer addr")?;
365 let new_connection = TcpStream::connect(addr)
366 .await
367 .context("failed to connect to coordinator during logger clone")?;
368 LogDestination::Coordinator {
369 coordinator_connection: new_connection,
370 }
371 }
372 LogDestination::Channel { sender } => LogDestination::Channel {
373 sender: sender.clone(),
374 },
375 LogDestination::Tracing => LogDestination::Tracing,
376 };
377
378 Ok(Self {
379 destination,
380 daemon_id: self.daemon_id.clone(),
381 clock: self.clock.clone(),
382 })
383 }
384
385 async fn log_to_coordinator(
386 message: Timestamped<CoordinatorRequest>,
387 connection: &mut TcpStream,
388 ) {
389 let msg = serde_json::to_vec(&message).expect("failed to serialize log message");
390 match socket_stream_send(connection, &msg)
391 .await
392 .wrap_err("failed to send log message to dora-coordinator")
393 {
394 Ok(()) => (),
395 Err(err) => tracing::warn!("{err:?}"),
396 }
397 }
398}
399
/// Destination that a [`Logger`] delivers its messages to.
pub enum LogDestination {
    /// Send messages to the coordinator over the given TCP connection.
    Coordinator { coordinator_connection: TcpStream },
    /// Send messages through the given channel.
    Channel { sender: Sender<LogMessage> },
    /// Emit messages as `tracing` events.
    Tracing,
}
405
/// Holds either a mutable borrow of a `T` or an owned `T`.
///
/// Both variants give access to the value through [`Deref`] and [`DerefMut`],
/// so users do not need to distinguish between them.
enum CowMut<'a, T> {
    /// A mutable reference to a value stored elsewhere.
    Borrowed(&'a mut T),
    /// A value stored directly in this enum.
    Owned(T),
}

impl<T> Deref for CowMut<'_, T> {
    type Target = T;

    /// Gives shared access to the wrapped value, whichever variant holds it.
    fn deref(&self) -> &T {
        match self {
            Self::Borrowed(borrowed) => &**borrowed,
            Self::Owned(owned) => owned,
        }
    }
}

impl<T> DerefMut for CowMut<'_, T> {
    /// Gives exclusive access to the wrapped value, whichever variant holds it.
    fn deref_mut(&mut self) -> &mut T {
        match self {
            Self::Borrowed(borrowed) => &mut **borrowed,
            Self::Owned(owned) => owned,
        }
    }
}
430
/// Display adapter that prefixes every line of the wrapped text with a space.
struct Indent<'a>(&'a str);

impl std::fmt::Display for Indent<'_> {
    /// Writes each line of the wrapped text with a leading space.
    ///
    /// Lines are separated by `\n`, so multi-line text keeps its line structure.
    /// Empty text produces no output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (index, line) in self.0.lines().enumerate() {
            // `lines()` strips the line terminators, so they are re-inserted
            // between lines here; otherwise all lines would run together.
            if index > 0 {
                f.write_str("\n")?;
            }
            write!(f, " {line}")?;
        }
        Ok(())
    }
}