Skip to main content

tracing_cloudwatch/
dispatch.rs

1use crate::{
2    CloudWatchClient,
3    export::{BatchExporter, ExportConfig},
4    guard::ShutdownSignal,
5};
6
7use chrono::{DateTime, Utc};
8use tokio::sync::{
9    mpsc::{self, UnboundedSender},
10    oneshot,
11};
12
/// Destination that accepts [`LogEvent`]s one at a time.
///
/// This file provides two implementations: [`NoopDispatcher`], whose
/// `dispatch` body is empty, and [`CloudWatchDispatcher`], which sends each
/// event over a channel to a spawned exporter task.
pub trait Dispatcher {
    /// Takes ownership of `input` and hands it to the underlying destination.
    ///
    /// The method returns nothing, so implementations have no way to report a
    /// delivery failure to the caller.
    fn dispatch(&self, input: LogEvent);
}
16
17#[derive(Debug)]
18pub struct LogEvent {
19    pub message: String,
20    pub timestamp: DateTime<Utc>,
21}
22
/// Dispatcher that carries no state.
///
/// It has no fields, so deriving `Debug`, `Default`, `Clone` and `Copy` costs
/// nothing and lets it be embedded in types that derive the same traits.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopDispatcher {}
24
25impl Dispatcher for NoopDispatcher {
26    fn dispatch(&self, _event: LogEvent) {}
27}
28
29impl NoopDispatcher {
30    pub(crate) fn new() -> Self {
31        Self {}
32    }
33}
34
35pub struct CloudWatchDispatcher {
36    tx: UnboundedSender<LogEvent>,
37}
38
39impl CloudWatchDispatcher {
40    pub(crate) fn new<C>(
41        client: C,
42        export_config: ExportConfig,
43        shutdown_rx: oneshot::Receiver<ShutdownSignal>,
44    ) -> Self
45    where
46        C: CloudWatchClient + Send + Sync + 'static,
47    {
48        // Should use bounded channel?
49        let (tx, rx) = mpsc::unbounded_channel();
50        let exporter = BatchExporter::new(client, export_config);
51
52        tokio::spawn(exporter.run(rx, shutdown_rx));
53
54        Self { tx }
55    }
56}
57
58impl Dispatcher for CloudWatchDispatcher {
59    fn dispatch(&self, event: LogEvent) {
60        // The exporter can already be shutting down when late logs arrive.
61        // Drop them instead of panicking the application.
62        let _ = self.tx.send(event);
63    }
64}
65
66impl std::io::Write for &NoopDispatcher {
67    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
68        Ok(buf.len())
69    }
70
71    fn flush(&mut self) -> std::io::Result<()> {
72        Ok(())
73    }
74}
75
76impl std::io::Write for &CloudWatchDispatcher {
77    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
78        let timestamp = Utc::now();
79        let message = String::from_utf8_lossy(buf).to_string();
80
81        self.dispatch(LogEvent { message, timestamp });
82
83        Ok(buf.len())
84    }
85
86    fn flush(&mut self) -> std::io::Result<()> {
87        Ok(())
88    }
89}