Skip to main content

effectful_logger/
pipeline.rs

1//! Composable log backends: tracing, JSON ([`serde_json`]), and plain structured lines.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::fmt::Write as _;
6use std::io::Write;
7use std::sync::{Arc, Mutex, RwLock};
8
9use ::effectful::EffectHashMap;
10use serde::Serialize;
11
12use crate::{EffectLoggerError, LogLevel};
13
14/// One log event passed to each backend in a [`CompositeLogBackend`].
15#[derive(Debug, Clone)]
16pub struct LogRecord<'a> {
17  /// Severity of this line.
18  pub level: LogLevel,
19  /// Human-readable message body.
20  pub message: Cow<'a, str>,
21  /// Structured key/value fields merged into the formatted line or JSON row.
22  pub annotations: EffectHashMap<String, String>,
23  /// Active span names from outermost to innermost (for nesting display).
24  pub spans: Vec<String>,
25}
26
27/// Sink for [`LogRecord`] values (tracing, JSON file, tests, etc.).
28pub trait LogBackend: Send + Sync {
29  /// Deliver one record to this sink (e.g. write a line, call `tracing`, …).
30  fn emit(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError>;
31}
32
33/// Effect.ts-style composable logger: [`Self::add`], [`Self::replace`], [`Self::remove`].
34///
35/// `Message` and `Output` from the gap doc map to [`LogRecord::message`] and
36/// `Result<(), EffectLoggerError>` respectively.
37pub trait Logger: Send + Sync {
38  /// Append a backend to the ordered fan-out list.
39  fn add(&self, backend: Arc<dyn LogBackend>) -> Result<(), EffectLoggerError>;
40  /// Swap the backend at `idx` without changing list length.
41  fn replace(&self, idx: usize, backend: Arc<dyn LogBackend>) -> Result<(), EffectLoggerError>;
42  /// Remove the backend at `idx`, shifting later entries down.
43  fn remove(&self, idx: usize) -> Result<(), EffectLoggerError>;
44}
45
46/// Thread-safe list of backends; also implements [`LogBackend`] by fan-out.
47pub struct CompositeLogBackend {
48  backends: RwLock<Vec<Arc<dyn LogBackend>>>,
49}
50
51impl Default for CompositeLogBackend {
52  fn default() -> Self {
53    Self::new()
54  }
55}
56
57impl CompositeLogBackend {
58  /// Empty backend list; use [`Logger::add`] to register sinks.
59  pub fn new() -> Self {
60    Self {
61      backends: RwLock::new(Vec::new()),
62    }
63  }
64
65  /// Emit to every registered backend in order.
66  pub fn emit_all(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
67    let bs: Vec<_> = self
68      .backends
69      .read()
70      .map_err(|e| EffectLoggerError::Sink(format!("composite read lock: {e}")))?
71      .clone();
72    for b in bs {
73      b.emit(rec)?;
74    }
75    Ok(())
76  }
77}
78
79impl Logger for CompositeLogBackend {
80  fn add(&self, backend: Arc<dyn LogBackend>) -> Result<(), EffectLoggerError> {
81    self
82      .backends
83      .write()
84      .map_err(|e| EffectLoggerError::Sink(format!("composite write lock: {e}")))?
85      .push(backend);
86    Ok(())
87  }
88
89  fn replace(&self, idx: usize, backend: Arc<dyn LogBackend>) -> Result<(), EffectLoggerError> {
90    let mut g = self
91      .backends
92      .write()
93      .map_err(|e| EffectLoggerError::Sink(format!("composite write lock: {e}")))?;
94    if idx >= g.len() {
95      return Err(EffectLoggerError::Sink(format!(
96        "logger replace: index {idx} out of bounds (len {})",
97        g.len()
98      )));
99    }
100    g[idx] = backend;
101    Ok(())
102  }
103
104  fn remove(&self, idx: usize) -> Result<(), EffectLoggerError> {
105    let mut g = self
106      .backends
107      .write()
108      .map_err(|e| EffectLoggerError::Sink(format!("composite write lock: {e}")))?;
109    if idx >= g.len() {
110      return Err(EffectLoggerError::Sink(format!(
111        "logger remove: index {idx} out of bounds (len {})",
112        g.len()
113      )));
114    }
115    g.remove(idx);
116    Ok(())
117  }
118}
119
120impl LogBackend for CompositeLogBackend {
121  fn emit(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
122    self.emit_all(rec)
123  }
124}
125
126fn format_tracing_line(rec: &LogRecord<'_>) -> String {
127  let mut full = String::new();
128  if !rec.spans.is_empty() {
129    let _ = write!(&mut full, "[{}] ", rec.spans.join(" > "));
130  }
131  full.push_str(rec.message.as_ref());
132  for (k, v) in rec.annotations.iter() {
133    let _ = write!(&mut full, " {k}={v}");
134  }
135  full
136}
137
138/// Forwards to the `tracing` crate (same levels as [`crate::EffectLogger`] legacy path).
139#[derive(Debug, Default, Clone, Copy)]
140pub struct TracingLogBackend;
141
142impl LogBackend for TracingLogBackend {
143  fn emit(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
144    let line = format_tracing_line(rec);
145    match rec.level {
146      LogLevel::Trace => tracing::trace!("{}", line),
147      LogLevel::Debug => tracing::debug!("{}", line),
148      LogLevel::Info => tracing::info!("{}", line),
149      LogLevel::Warn => tracing::warn!("{}", line),
150      LogLevel::Error | LogLevel::Fatal => tracing::error!("{}", line),
151      LogLevel::None => {}
152    }
153    Ok(())
154  }
155}
156
157/// One JSON object per line (`serde_json`), for files or test buffers.
158#[derive(Clone)]
159pub struct JsonLogBackend<W: Write + Send + 'static> {
160  writer: Arc<Mutex<W>>,
161}
162
163impl<W: Write + Send + 'static> JsonLogBackend<W> {
164  /// Wrap `writer`; each [`LogBackend::emit`] appends one JSON object and newline.
165  pub fn new(writer: W) -> Self {
166    Self {
167      writer: Arc::new(Mutex::new(writer)),
168    }
169  }
170
171  /// Clone the shared writer handle (e.g. read back a test [`Vec<u8>`] after logging).
172  pub fn writer_arc(&self) -> Arc<Mutex<W>> {
173    self.writer.clone()
174  }
175}
176
177#[derive(Serialize)]
178struct JsonLine<'a> {
179  level: &'a str,
180  message: &'a str,
181  #[serde(skip_serializing_if = "HashMap::is_empty")]
182  fields: HashMap<&'a str, &'a str>,
183  #[serde(skip_serializing_if = "spans_is_empty")]
184  spans: Vec<String>,
185}
186
187fn spans_is_empty(s: &[String]) -> bool {
188  s.is_empty()
189}
190
191impl<W: Write + Send + 'static> LogBackend for JsonLogBackend<W> {
192  fn emit(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
193    if rec.level == LogLevel::None {
194      return Ok(());
195    }
196    let mut fields = HashMap::new();
197    for (k, v) in rec.annotations.iter() {
198      fields.insert(k.as_str(), v.as_str());
199    }
200    let row = JsonLine {
201      level: rec.level.as_str(),
202      message: rec.message.as_ref(),
203      fields,
204      spans: rec.spans.clone(),
205    };
206    let mut w = self
207      .writer
208      .lock()
209      .map_err(|e| EffectLoggerError::Sink(format!("json backend mutex: {e}")))?;
210    serde_json::to_writer(&mut *w, &row).map_err(|e| EffectLoggerError::Sink(e.to_string()))?;
211    w.write_all(b"\n")
212      .map_err(|e| EffectLoggerError::Sink(e.to_string()))?;
213    Ok(())
214  }
215}
216
217/// Human-oriented `key=value` lines (no JSON), still machine-grep-friendly.
218#[derive(Clone)]
219pub struct StructuredLogBackend<W: Write + Send + 'static> {
220  writer: Arc<Mutex<W>>,
221}
222
223impl<W: Write + Send + 'static> StructuredLogBackend<W> {
224  /// Wrap `writer`; each emit writes a human-oriented `key=value` line.
225  pub fn new(writer: W) -> Self {
226    Self {
227      writer: Arc::new(Mutex::new(writer)),
228    }
229  }
230
231  /// Shared handle to the underlying writer (e.g. read a test buffer after logging).
232  pub fn writer_arc(&self) -> Arc<Mutex<W>> {
233    self.writer.clone()
234  }
235}
236
237impl<W: Write + Send + 'static> LogBackend for StructuredLogBackend<W> {
238  fn emit(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
239    if rec.level == LogLevel::None {
240      return Ok(());
241    }
242    let mut w = self
243      .writer
244      .lock()
245      .map_err(|e| EffectLoggerError::Sink(format!("structured backend mutex: {e}")))?;
246    write!(
247      w,
248      "level={} message={:?}",
249      rec.level.as_str(),
250      rec.message.as_ref()
251    )
252    .map_err(|e| EffectLoggerError::Sink(e.to_string()))?;
253    if !rec.spans.is_empty() {
254      write!(w, " spans={:?}", rec.spans.join(">"))
255        .map_err(|e| EffectLoggerError::Sink(e.to_string()))?;
256    }
257    for (k, v) in rec.annotations.iter() {
258      write!(w, " {k}={v:?}").map_err(|e| EffectLoggerError::Sink(e.to_string()))?;
259    }
260    w.write_all(b"\n")
261      .map_err(|e| EffectLoggerError::Sink(e.to_string()))?;
262    Ok(())
263  }
264}
265
266#[cfg(test)]
267mod tests {
268  use super::*;
269  use std::borrow::Cow;
270
271  fn make_record(level: crate::LogLevel, msg: &str) -> LogRecord<'_> {
272    LogRecord {
273      level,
274      message: Cow::Borrowed(msg),
275      annotations: Default::default(),
276      spans: vec![],
277    }
278  }
279
280  fn make_record_with_spans(
281    level: crate::LogLevel,
282    msg: &str,
283    spans: Vec<String>,
284  ) -> LogRecord<'_> {
285    LogRecord {
286      level,
287      message: Cow::Borrowed(msg),
288      annotations: Default::default(),
289      spans,
290    }
291  }
292
293  #[test]
294  fn composite_new_and_default_are_empty() {
295    let c1 = CompositeLogBackend::new();
296    let c2 = CompositeLogBackend::default();
297    let rec = make_record(crate::LogLevel::Info, "msg");
298    assert!(c1.emit_all(&rec).is_ok());
299    assert!(c2.emit_all(&rec).is_ok());
300  }
301
302  #[test]
303  fn composite_add_and_emit_all() {
304    let buf: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
305    let buf2 = buf.clone();
306    struct Capturing(Arc<Mutex<Vec<String>>>);
307    impl LogBackend for Capturing {
308      fn emit(&self, rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
309        self.0.lock().unwrap().push(rec.message.to_string());
310        Ok(())
311      }
312    }
313    let composite = CompositeLogBackend::new();
314    composite.add(Arc::new(Capturing(buf2))).unwrap();
315    let rec = make_record(crate::LogLevel::Info, "hello");
316    composite.emit(&rec).unwrap();
317    assert_eq!(*buf.lock().unwrap(), vec!["hello"]);
318  }
319
320  #[test]
321  fn composite_replace_success_and_out_of_bounds() {
322    struct Noop;
323    impl LogBackend for Noop {
324      fn emit(&self, _rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
325        Ok(())
326      }
327    }
328    let composite = CompositeLogBackend::new();
329    composite.add(Arc::new(Noop)).unwrap();
330    assert!(composite.replace(0, Arc::new(Noop)).is_ok());
331    assert!(composite.replace(99, Arc::new(Noop)).is_err());
332  }
333
334  #[test]
335  fn composite_remove_success_and_out_of_bounds() {
336    struct Noop;
337    impl LogBackend for Noop {
338      fn emit(&self, _rec: &LogRecord<'_>) -> Result<(), EffectLoggerError> {
339        Ok(())
340      }
341    }
342    let composite = CompositeLogBackend::new();
343    composite.add(Arc::new(Noop)).unwrap();
344    assert!(composite.remove(99).is_err());
345    assert!(composite.remove(0).is_ok());
346    assert!(composite.remove(0).is_err());
347  }
348
349  #[test]
350  fn tracing_backend_emits_all_levels_without_error() {
351    let backend = TracingLogBackend;
352    for level in [
353      crate::LogLevel::Trace,
354      crate::LogLevel::Debug,
355      crate::LogLevel::Info,
356      crate::LogLevel::Warn,
357      crate::LogLevel::Error,
358      crate::LogLevel::Fatal,
359      crate::LogLevel::None,
360    ] {
361      let rec = make_record(level, "test message");
362      assert!(backend.emit(&rec).is_ok());
363    }
364  }
365
366  #[test]
367  fn tracing_backend_with_spans_and_annotations() {
368    let backend = TracingLogBackend;
369    let mut rec = make_record_with_spans(
370      crate::LogLevel::Info,
371      "spanmsg",
372      vec!["outer".to_string(), "inner".to_string()],
373    );
374    rec.annotations.insert("key".to_string(), "val".to_string());
375    assert!(backend.emit(&rec).is_ok());
376  }
377
378  #[test]
379  fn json_backend_emits_valid_json_line() {
380    let buf: Vec<u8> = Vec::new();
381    let backend = JsonLogBackend::new(buf);
382    let rec = make_record(crate::LogLevel::Info, "json message");
383    backend.emit(&rec).unwrap();
384    let arc = backend.writer_arc();
385    let out = String::from_utf8(arc.lock().unwrap().clone()).unwrap();
386    assert!(out.contains("json message"), "output: {out}");
387    assert!(out.contains("INFO"), "output: {out}");
388  }
389
390  #[test]
391  fn json_backend_none_level_skips_emit() {
392    let buf: Vec<u8> = Vec::new();
393    let backend = JsonLogBackend::new(buf);
394    let rec = make_record(crate::LogLevel::None, "skip me");
395    backend.emit(&rec).unwrap();
396    let arc = backend.writer_arc();
397    assert!(arc.lock().unwrap().is_empty(), "should skip None level");
398  }
399
400  #[test]
401  fn json_backend_emits_spans_and_fields() {
402    let buf: Vec<u8> = Vec::new();
403    let backend = JsonLogBackend::new(buf);
404    let mut rec =
405      make_record_with_spans(crate::LogLevel::Debug, "with spans", vec!["s1".to_string()]);
406    rec.annotations.insert("foo".to_string(), "bar".to_string());
407    backend.emit(&rec).unwrap();
408    let arc = backend.writer_arc();
409    let out = String::from_utf8(arc.lock().unwrap().clone()).unwrap();
410    assert!(out.contains("s1"), "output: {out}");
411    assert!(out.contains("foo"), "output: {out}");
412  }
413
414  #[test]
415  fn structured_backend_emits_kv_line() {
416    let buf: Vec<u8> = Vec::new();
417    let backend = StructuredLogBackend::new(buf);
418    let mut rec = make_record(crate::LogLevel::Warn, "warn msg");
419    rec.annotations.insert("a".to_string(), "b".to_string());
420    backend.emit(&rec).unwrap();
421    let arc = backend.writer_arc();
422    let out = String::from_utf8(arc.lock().unwrap().clone()).unwrap();
423    assert!(out.contains("warn msg"), "output: {out}");
424  }
425
426  #[test]
427  fn structured_backend_none_level_skips_emit() {
428    let buf: Vec<u8> = Vec::new();
429    let backend = StructuredLogBackend::new(buf);
430    let rec = make_record(crate::LogLevel::None, "skip");
431    backend.emit(&rec).unwrap();
432    let arc = backend.writer_arc();
433    assert!(arc.lock().unwrap().is_empty());
434  }
435
436  #[test]
437  fn structured_backend_with_spans() {
438    let buf: Vec<u8> = Vec::new();
439    let backend = StructuredLogBackend::new(buf);
440    let rec = make_record_with_spans(crate::LogLevel::Error, "err msg", vec!["spn".to_string()]);
441    backend.emit(&rec).unwrap();
442    let arc = backend.writer_arc();
443    let out = String::from_utf8(arc.lock().unwrap().clone()).unwrap();
444    assert!(out.contains("spn"), "output: {out}");
445  }
446
447  #[test]
448  fn json_backend_write_error_returns_sink_error() {
449    use std::io::{self, Write};
450    struct FailWriter;
451    impl Write for FailWriter {
452      fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
453        Err(io::Error::new(io::ErrorKind::BrokenPipe, "test write fail"))
454      }
455      fn flush(&mut self) -> io::Result<()> {
456        Err(io::Error::new(io::ErrorKind::BrokenPipe, "test flush fail"))
457      }
458    }
459    let backend = JsonLogBackend::new(FailWriter);
460    let rec = make_record(crate::LogLevel::Info, "write fails");
461    let result = backend.emit(&rec);
462    assert!(result.is_err(), "expected error, got Ok");
463    assert!(result.unwrap_err().to_string().contains("test write fail"));
464  }
465
466  #[test]
467  fn json_backend_newline_write_error_returns_sink_error() {
468    use std::io::{self, Write};
469    // Succeeds for non-newline writes, fails on the trailing newline
470    struct NoNewlineWriter;
471    impl Write for NoNewlineWriter {
472      fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
473        if buf == b"\n" {
474          Err(io::Error::new(io::ErrorKind::BrokenPipe, "newline fail"))
475        } else {
476          Ok(buf.len())
477        }
478      }
479      fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
480        if buf == b"\n" {
481          Err(io::Error::new(io::ErrorKind::BrokenPipe, "newline fail"))
482        } else {
483          Ok(())
484        }
485      }
486      fn flush(&mut self) -> io::Result<()> {
487        Ok(())
488      }
489    }
490    let backend = JsonLogBackend::new(NoNewlineWriter);
491    let rec = make_record(crate::LogLevel::Info, "newline fails");
492    let result = backend.emit(&rec);
493    assert!(result.is_err(), "expected error on newline write");
494  }
495
496  #[test]
497  fn structured_backend_write_error_returns_sink_error() {
498    use std::io::{self, Write};
499    struct FailWriter;
500    impl Write for FailWriter {
501      fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
502        Err(io::Error::new(
503          io::ErrorKind::BrokenPipe,
504          "structured write fail",
505        ))
506      }
507      fn flush(&mut self) -> io::Result<()> {
508        Ok(())
509      }
510    }
511    let backend = StructuredLogBackend::new(FailWriter);
512    let rec = make_record(crate::LogLevel::Warn, "write fails");
513    let result = backend.emit(&rec);
514    assert!(result.is_err(), "expected error, got Ok");
515    assert!(
516      result
517        .unwrap_err()
518        .to_string()
519        .contains("structured write fail"),
520      "unexpected error message"
521    );
522  }
523}