Skip to main content

tracing_cloudwatch/
dispatch.rs

1use crate::{
2    CloudWatchClient,
3    export::{BatchExporter, ExportConfig},
4    guard::ShutdownSignal,
5};
6
7use chrono::{DateTime, Utc};
8use tokio::sync::{
9    mpsc::{self, UnboundedSender},
10    oneshot,
11};
12use tracing::instrument::WithSubscriber;
13
14pub trait Dispatcher {
15    fn dispatch(&self, input: LogEvent);
16}
17
18#[derive(Debug)]
19pub struct LogEvent {
20    pub message: String,
21    pub timestamp: DateTime<Utc>,
22}
23
/// A stateless [`Dispatcher`] that discards every event it is given.
///
/// `Debug` and `Default` are derived so the type can be logged and
/// default-constructed like any other zero-sized public type.
#[derive(Debug, Default)]
pub struct NoopDispatcher {}
25
26impl Dispatcher for NoopDispatcher {
27    fn dispatch(&self, _event: LogEvent) {}
28}
29
30impl NoopDispatcher {
31    pub(crate) fn new() -> Self {
32        Self {}
33    }
34}
35
36pub struct CloudWatchDispatcher {
37    tx: UnboundedSender<LogEvent>,
38}
39
40impl CloudWatchDispatcher {
41    pub(crate) fn new<C>(
42        client: C,
43        export_config: ExportConfig,
44        shutdown_rx: oneshot::Receiver<ShutdownSignal>,
45    ) -> Self
46    where
47        C: CloudWatchClient + Send + Sync + 'static,
48    {
49        // Should use bounded channel?
50        let (tx, rx) = mpsc::unbounded_channel();
51        let exporter = BatchExporter::new(client, export_config);
52
53        tokio::spawn(
54            exporter
55                .run(rx, shutdown_rx)
56                // Override the subscriber for the exporter to prevent recursively
57                // tracing new events from sdk calls within the exporter
58                .with_subscriber(tracing::dispatcher::Dispatch::none()),
59        );
60
61        Self { tx }
62    }
63}
64
65impl Dispatcher for CloudWatchDispatcher {
66    fn dispatch(&self, event: LogEvent) {
67        // The exporter can already be shutting down when late logs arrive.
68        // Drop them instead of panicking the application.
69        let _ = self.tx.send(event);
70    }
71}
72
73impl std::io::Write for &NoopDispatcher {
74    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
75        Ok(buf.len())
76    }
77
78    fn flush(&mut self) -> std::io::Result<()> {
79        Ok(())
80    }
81}
82
83impl std::io::Write for &CloudWatchDispatcher {
84    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
85        let timestamp = Utc::now();
86        let message = String::from_utf8_lossy(buf).to_string();
87
88        self.dispatch(LogEvent { message, timestamp });
89
90        Ok(buf.len())
91    }
92
93    fn flush(&mut self) -> std::io::Result<()> {
94        Ok(())
95    }
96}