1use std::{
2 ops::{Deref, DerefMut},
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use dora_core::{
8 build::{BuildLogger, LogLevelOrStdout},
9 config::NodeId,
10 uhlc,
11};
12use dora_message::{
13 BuildId,
14 common::{DaemonId, LogLevel, LogMessage, Timestamped},
15 daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
16};
17use eyre::Context;
18use flume::Sender;
19use tokio::net::TcpStream;
20use uuid::Uuid;
21
22use crate::socket_stream_utils::socket_stream_send;
23
24pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
25 let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
26 dataflow_dir.join(format!("log_{node_id}.txt"))
27}
28
29pub struct NodeLogger<'a> {
30 node_id: NodeId,
31 logger: DataflowLogger<'a>,
32}
33
34impl NodeLogger<'_> {
35 pub fn inner(&self) -> &DataflowLogger {
36 &self.logger
37 }
38
39 pub async fn log(
40 &mut self,
41 level: LogLevel,
42 target: Option<String>,
43 message: impl Into<String>,
44 ) {
45 self.logger
46 .log(level, Some(self.node_id.clone()), target, message)
47 .await
48 }
49
50 pub async fn try_clone(&self) -> eyre::Result<NodeLogger<'static>> {
51 Ok(NodeLogger {
52 node_id: self.node_id.clone(),
53 logger: self.logger.try_clone().await?,
54 })
55 }
56}
57
58pub struct DataflowLogger<'a> {
59 dataflow_id: Uuid,
60 logger: CowMut<'a, DaemonLogger>,
61}
62
63impl<'a> DataflowLogger<'a> {
64 pub fn for_node(self, node_id: NodeId) -> NodeLogger<'a> {
65 NodeLogger {
66 node_id,
67 logger: self,
68 }
69 }
70
71 pub fn reborrow(&mut self) -> DataflowLogger {
72 DataflowLogger {
73 dataflow_id: self.dataflow_id,
74 logger: CowMut::Borrowed(&mut self.logger),
75 }
76 }
77
78 pub fn inner(&self) -> &DaemonLogger {
79 &self.logger
80 }
81
82 pub async fn log(
83 &mut self,
84 level: LogLevel,
85 node_id: Option<NodeId>,
86 target: Option<String>,
87 message: impl Into<String>,
88 ) {
89 self.logger
90 .log(level, Some(self.dataflow_id), node_id, target, message)
91 .await
92 }
93
94 pub async fn try_clone(&self) -> eyre::Result<DataflowLogger<'static>> {
95 Ok(DataflowLogger {
96 dataflow_id: self.dataflow_id,
97 logger: CowMut::Owned(self.logger.try_clone().await?),
98 })
99 }
100}
101
102pub struct NodeBuildLogger<'a> {
103 build_id: BuildId,
104 node_id: NodeId,
105 logger: CowMut<'a, DaemonLogger>,
106}
107
108impl NodeBuildLogger<'_> {
109 pub async fn log(
110 &mut self,
111 level: impl Into<LogLevelOrStdout> + Send,
112 message: impl Into<String>,
113 ) {
114 self.logger
115 .log_build(
116 self.build_id,
117 level.into(),
118 None,
119 Some(self.node_id.clone()),
120 message,
121 )
122 .await
123 }
124
125 pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
126 Ok(NodeBuildLogger {
127 build_id: self.build_id,
128 node_id: self.node_id.clone(),
129 logger: CowMut::Owned(self.logger.try_clone().await?),
130 })
131 }
132}
133
134impl BuildLogger for NodeBuildLogger<'_> {
135 type Clone = NodeBuildLogger<'static>;
136
137 fn log_message(
138 &mut self,
139 level: impl Into<LogLevelOrStdout> + Send,
140 message: impl Into<String> + Send,
141 ) -> impl std::future::Future<Output = ()> + Send {
142 self.log(level, message)
143 }
144
145 fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
146 self.try_clone_impl()
147 }
148}
149
150pub struct DaemonLogger {
151 daemon_id: DaemonId,
152 logger: Logger,
153}
154
155impl DaemonLogger {
156 pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
157 DataflowLogger {
158 dataflow_id,
159 logger: CowMut::Borrowed(self),
160 }
161 }
162
163 pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
164 NodeBuildLogger {
165 build_id,
166 node_id,
167 logger: CowMut::Borrowed(self),
168 }
169 }
170
171 pub fn inner(&self) -> &Logger {
172 &self.logger
173 }
174
175 pub async fn log(
176 &mut self,
177 level: LogLevel,
178 dataflow_id: Option<Uuid>,
179 node_id: Option<NodeId>,
180 target: Option<String>,
181 message: impl Into<String>,
182 ) {
183 let message = LogMessage {
184 build_id: None,
185 daemon_id: Some(self.daemon_id.clone()),
186 dataflow_id,
187 node_id,
188 level: level.into(),
189 target,
190 module_path: None,
191 file: None,
192 line: None,
193 message: message.into(),
194 };
195 self.logger.log(message).await
196 }
197
198 pub async fn log_build(
199 &mut self,
200 build_id: BuildId,
201 level: LogLevelOrStdout,
202 target: Option<String>,
203 node_id: Option<NodeId>,
204 message: impl Into<String>,
205 ) {
206 let message = LogMessage {
207 build_id: Some(build_id),
208 daemon_id: Some(self.daemon_id.clone()),
209 dataflow_id: None,
210 node_id,
211 level,
212 target,
213 module_path: None,
214 file: None,
215 line: None,
216 message: message.into(),
217 };
218 self.logger.log(message).await
219 }
220
221 pub(crate) fn daemon_id(&self) -> &DaemonId {
222 &self.daemon_id
223 }
224
225 pub async fn try_clone(&self) -> eyre::Result<Self> {
226 Ok(Self {
227 daemon_id: self.daemon_id.clone(),
228 logger: self.logger.try_clone().await?,
229 })
230 }
231}
232
233pub struct Logger {
234 pub(super) destination: LogDestination,
235 pub(super) daemon_id: DaemonId,
236 pub(super) clock: Arc<uhlc::HLC>,
237}
238
239impl Logger {
240 pub fn for_daemon(self, daemon_id: DaemonId) -> DaemonLogger {
241 DaemonLogger {
242 daemon_id,
243 logger: self,
244 }
245 }
246
247 pub async fn log(&mut self, message: LogMessage) {
248 match &mut self.destination {
249 LogDestination::Coordinator {
250 coordinator_connection,
251 } => {
252 let message = Timestamped {
253 inner: CoordinatorRequest::Event {
254 daemon_id: self.daemon_id.clone(),
255 event: DaemonEvent::Log(message.clone()),
256 },
257 timestamp: self.clock.new_timestamp(),
258 };
259 Self::log_to_coordinator(message, coordinator_connection).await
260 }
261 LogDestination::Channel { sender } => {
262 let _ = sender.send_async(message).await;
263 }
264 LogDestination::Tracing => {
265 match message.level {
267 LogLevelOrStdout::Stdout => {
268 tracing::info!(
269 build_id = ?message.build_id.map(|id| id.to_string()),
270 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
271 node_id = ?message.node_id.map(|id| id.to_string()),
272 target = message.target,
273 module_path = message.module_path,
274 file = message.file,
275 line = message.line,
276 "{}",
277 Indent(&message.message)
278 )
279 }
280 LogLevelOrStdout::LogLevel(level) => match level {
281 LogLevel::Error => {
282 tracing::error!(
283 build_id = ?message.build_id.map(|id| id.to_string()),
284 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
285 node_id = ?message.node_id.map(|id| id.to_string()),
286 target = message.target,
287 module_path = message.module_path,
288 file = message.file,
289 line = message.line,
290 "{}",
291 Indent(&message.message)
292 );
293 }
294 LogLevel::Warn => {
295 tracing::warn!(
296 build_id = ?message.build_id.map(|id| id.to_string()),
297 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
298 node_id = ?message.node_id.map(|id| id.to_string()),
299 target = message.target,
300 module_path = message.module_path,
301 file = message.file,
302 line = message.line,
303 "{}",
304 Indent(&message.message)
305 );
306 }
307 LogLevel::Info => {
308 tracing::info!(
309 build_id = ?message.build_id.map(|id| id.to_string()),
310 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
311 node_id = ?message.node_id.map(|id| id.to_string()),
312 target = message.target,
313 module_path = message.module_path,
314 file = message.file,
315 line = message.line,
316 "{}",
317 Indent(&message.message)
318 );
319 }
320 LogLevel::Debug => {
321 tracing::debug!(
322 build_id = ?message.build_id.map(|id| id.to_string()),
323 dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
324 node_id = ?message.node_id.map(|id| id.to_string()),
325 target = message.target,
326 module_path = message.module_path,
327 file = message.file,
328 line = message.line,
329 "{}",
330 Indent(&message.message)
331 );
332 }
333 _ => {}
334 },
335 }
336 }
337 }
338 }
339
340 pub async fn try_clone(&self) -> eyre::Result<Self> {
341 let destination = match &self.destination {
342 LogDestination::Coordinator {
343 coordinator_connection,
344 } => {
345 let addr = coordinator_connection
346 .peer_addr()
347 .context("failed to get coordinator peer addr")?;
348 let new_connection = TcpStream::connect(addr)
349 .await
350 .context("failed to connect to coordinator during logger clone")?;
351 LogDestination::Coordinator {
352 coordinator_connection: new_connection,
353 }
354 }
355 LogDestination::Channel { sender } => LogDestination::Channel {
356 sender: sender.clone(),
357 },
358 LogDestination::Tracing => LogDestination::Tracing,
359 };
360
361 Ok(Self {
362 destination,
363 daemon_id: self.daemon_id.clone(),
364 clock: self.clock.clone(),
365 })
366 }
367
368 async fn log_to_coordinator(
369 message: Timestamped<CoordinatorRequest>,
370 connection: &mut TcpStream,
371 ) {
372 let msg = serde_json::to_vec(&message).expect("failed to serialize log message");
373 match socket_stream_send(connection, &msg)
374 .await
375 .wrap_err("failed to send log message to dora-coordinator")
376 {
377 Ok(()) => (),
378 Err(err) => tracing::warn!("{err:?}"),
379 }
380 }
381}
382
/// Where a [`Logger`] delivers its messages.
pub enum LogDestination {
    /// Send each message to the coordinator over this TCP connection.
    Coordinator { coordinator_connection: TcpStream },
    /// Send each message on this channel.
    Channel { sender: Sender<LogMessage> },
    /// Emit each message as a `tracing` event.
    Tracing,
}
388
/// A value that is either mutably borrowed or owned.
///
/// Both variants dereference (shared and mutable) to the contained `T`, so
/// code using a `CowMut` does not need to know which variant it holds.
enum CowMut<'a, T> {
    /// The value lives elsewhere and is mutably borrowed for `'a`.
    Borrowed(&'a mut T),
    /// The value is stored inline.
    Owned(T),
}

impl<T> Deref for CowMut<'_, T> {
    type Target = T;

    /// Returns a shared reference to the contained value.
    fn deref(&self) -> &Self::Target {
        match *self {
            CowMut::Borrowed(ref borrowed) => &**borrowed,
            CowMut::Owned(ref owned) => owned,
        }
    }
}

impl<T> DerefMut for CowMut<'_, T> {
    /// Returns a mutable reference to the contained value.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match *self {
            CowMut::Borrowed(ref mut borrowed) => &mut **borrowed,
            CowMut::Owned(ref mut owned) => owned,
        }
    }
}
413
/// Display adapter that indents every line of the wrapped string.
struct Indent<'a>(&'a str);

impl std::fmt::Display for Indent<'_> {
    /// Writes each line of the wrapped string with a leading indent.
    ///
    /// Consecutive lines are separated by `\n` so that a multi-line message
    /// stays multi-line instead of being run together on one line. No trailing
    /// newline is written, and an empty string produces no output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (index, line) in self.0.lines().enumerate() {
            if index > 0 {
                f.write_str("\n")?;
            }
            write!(f, " {line}")?;
        }
        Ok(())
    }
}