tracing_cloudwatch/
dispatch.rs1use crate::{
2 CloudWatchClient,
3 export::{BatchExporter, ExportConfig},
4 guard::ShutdownSignal,
5};
6
7use chrono::{DateTime, Utc};
8use tokio::sync::{
9 mpsc::{self, UnboundedSender},
10 oneshot,
11};
12use tracing::instrument::WithSubscriber;
13
14pub trait Dispatcher {
15 fn dispatch(&self, input: LogEvent);
16}
17
18#[derive(Debug)]
19pub struct LogEvent {
20 pub message: String,
21 pub timestamp: DateTime<Utc>,
22}
23
/// Dispatcher that carries no state; its `Dispatcher` and `std::io::Write`
/// implementations in this file discard whatever they are given.
#[derive(Debug, Default)]
pub struct NoopDispatcher {}
25
26impl Dispatcher for NoopDispatcher {
27 fn dispatch(&self, _event: LogEvent) {}
28}
29
30impl NoopDispatcher {
31 pub(crate) fn new() -> Self {
32 Self {}
33 }
34}
35
36pub struct CloudWatchDispatcher {
37 tx: UnboundedSender<LogEvent>,
38}
39
40impl CloudWatchDispatcher {
41 pub(crate) fn new<C>(
42 client: C,
43 export_config: ExportConfig,
44 shutdown_rx: oneshot::Receiver<ShutdownSignal>,
45 ) -> Self
46 where
47 C: CloudWatchClient + Send + Sync + 'static,
48 {
49 let (tx, rx) = mpsc::unbounded_channel();
51 let exporter = BatchExporter::new(client, export_config);
52
53 tokio::spawn(
54 exporter
55 .run(rx, shutdown_rx)
56 .with_subscriber(tracing::dispatcher::Dispatch::none()),
59 );
60
61 Self { tx }
62 }
63}
64
65impl Dispatcher for CloudWatchDispatcher {
66 fn dispatch(&self, event: LogEvent) {
67 let _ = self.tx.send(event);
70 }
71}
72
73impl std::io::Write for &NoopDispatcher {
74 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
75 Ok(buf.len())
76 }
77
78 fn flush(&mut self) -> std::io::Result<()> {
79 Ok(())
80 }
81}
82
83impl std::io::Write for &CloudWatchDispatcher {
84 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
85 let timestamp = Utc::now();
86 let message = String::from_utf8_lossy(buf).to_string();
87
88 self.dispatch(LogEvent { message, timestamp });
89
90 Ok(buf.len())
91 }
92
93 fn flush(&mut self) -> std::io::Result<()> {
94 Ok(())
95 }
96}