tracing_cloudwatch/
dispatch.rs1use crate::{
2 CloudWatchClient,
3 export::{BatchExporter, ExportConfig},
4 guard::ShutdownSignal,
5};
6
7use chrono::{DateTime, Utc};
8use tokio::sync::{
9 mpsc::{self, UnboundedSender},
10 oneshot,
11};
12
13pub trait Dispatcher {
14 fn dispatch(&self, input: LogEvent);
15}
16
17#[derive(Debug)]
18pub struct LogEvent {
19 pub message: String,
20 pub timestamp: DateTime<Utc>,
21}
22
/// A field-less dispatcher type.
///
/// Being a zero-sized value, it is trivially `Copy`, and `Default` offers a
/// public way to obtain one.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopDispatcher {}
24
25impl Dispatcher for NoopDispatcher {
26 fn dispatch(&self, _event: LogEvent) {}
27}
28
29impl NoopDispatcher {
30 pub(crate) fn new() -> Self {
31 Self {}
32 }
33}
34
35pub struct CloudWatchDispatcher {
36 tx: UnboundedSender<LogEvent>,
37}
38
39impl CloudWatchDispatcher {
40 pub(crate) fn new<C>(
41 client: C,
42 export_config: ExportConfig,
43 shutdown_rx: oneshot::Receiver<ShutdownSignal>,
44 ) -> Self
45 where
46 C: CloudWatchClient + Send + Sync + 'static,
47 {
48 let (tx, rx) = mpsc::unbounded_channel();
50 let exporter = BatchExporter::new(client, export_config);
51
52 tokio::spawn(exporter.run(rx, shutdown_rx));
53
54 Self { tx }
55 }
56}
57
58impl Dispatcher for CloudWatchDispatcher {
59 fn dispatch(&self, event: LogEvent) {
60 let _ = self.tx.send(event);
63 }
64}
65
66impl std::io::Write for &NoopDispatcher {
67 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
68 Ok(buf.len())
69 }
70
71 fn flush(&mut self) -> std::io::Result<()> {
72 Ok(())
73 }
74}
75
76impl std::io::Write for &CloudWatchDispatcher {
77 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
78 let timestamp = Utc::now();
79 let message = String::from_utf8_lossy(buf).to_string();
80
81 self.dispatch(LogEvent { message, timestamp });
82
83 Ok(buf.len())
84 }
85
86 fn flush(&mut self) -> std::io::Result<()> {
87 Ok(())
88 }
89}