Skip to main content

tracing_cloudwatch/
layer.rs

1use std::sync::Arc;
2
3use tracing_core::{Event, Subscriber, span};
4use tracing_subscriber::{
5    Layer,
6    fmt::{self, MakeWriter, format},
7    layer::Context,
8    registry::LookupSpan,
9};
10
11use crate::{
12    client::CloudWatchClient,
13    dispatch::{CloudWatchDispatcher, Dispatcher, NoopDispatcher},
14    export::ExportConfig,
15    guard::{CloudWatchWorkerGuard, ShutdownSignal},
16};
17
18/// An AWS Cloudwatch propagation layer.
19pub struct CloudWatchLayer<S, D, N = format::DefaultFields, E = format::Format<format::Full, ()>> {
20    fmt_layer: fmt::Layer<S, N, E, Arc<D>>,
21}
22
23/// Construct [CloudWatchLayer] to compose with tracing subscriber.
24pub fn layer<S>() -> CloudWatchLayer<S, NoopDispatcher>
25where
26    S: Subscriber + for<'span> LookupSpan<'span>,
27{
28    CloudWatchLayer::default()
29}
30
31impl<S> Default
32    for CloudWatchLayer<S, NoopDispatcher, format::DefaultFields, format::Format<format::Full, ()>>
33where
34    S: Subscriber + for<'span> LookupSpan<'span>,
35{
36    fn default() -> Self {
37        CloudWatchLayer::<S,NoopDispatcher, format::DefaultFields, format::Format<format::Full,()>>::new(Arc::new(NoopDispatcher::new()))
38    }
39}
40
41impl<S, D> CloudWatchLayer<S, D, format::DefaultFields, format::Format<format::Full, ()>>
42where
43    S: Subscriber + for<'span> LookupSpan<'span>,
44    D: Dispatcher + 'static,
45    Arc<D>: for<'writer> MakeWriter<'writer>,
46{
47    pub fn new(dispatcher: Arc<D>) -> Self {
48        Self {
49            fmt_layer: fmt::Layer::default()
50                .without_time()
51                .with_writer(dispatcher)
52                .with_ansi(false)
53                .with_level(true)
54                .with_line_number(true)
55                .with_file(true)
56                .with_target(false),
57        }
58    }
59}
60
61impl<S, D, N, L, T> CloudWatchLayer<S, D, N, format::Format<L, T>>
62where
63    N: for<'writer> fmt::FormatFields<'writer> + 'static,
64{
65    /// Configure to display line number and filename.
66    /// Default true
67    pub fn with_code_location(self, display: bool) -> Self {
68        Self {
69            fmt_layer: self.fmt_layer.with_line_number(display).with_file(display),
70        }
71    }
72
73    /// Configure to display target module.
74    /// Default false.
75    pub fn with_target(self, display: bool) -> Self {
76        Self {
77            fmt_layer: self.fmt_layer.with_target(display),
78        }
79    }
80}
81
82impl<S, D, N, E> CloudWatchLayer<S, D, N, E>
83where
84    S: Subscriber + for<'span> LookupSpan<'span>,
85    D: Dispatcher + 'static,
86    Arc<D>: for<'writer> MakeWriter<'writer>,
87{
88    /// Set client.
89    pub fn with_client<Client>(
90        self,
91        client: Client,
92        export_config: ExportConfig,
93    ) -> (
94        CloudWatchLayer<S, CloudWatchDispatcher, N, E>,
95        CloudWatchWorkerGuard,
96    )
97    where
98        Client: CloudWatchClient + Send + Sync + 'static,
99    {
100        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<ShutdownSignal>();
101
102        let guard = CloudWatchWorkerGuard::new(shutdown_tx);
103
104        (
105            CloudWatchLayer {
106                fmt_layer: self
107                    .fmt_layer
108                    .with_writer(Arc::new(CloudWatchDispatcher::new(
109                        client,
110                        export_config,
111                        shutdown_rx,
112                    ))),
113            },
114            guard,
115        )
116    }
117
118    /// Set the [`fmt::Layer`] provided as an argument.
119    /// You can control the log format for CloudWatch by setting a pre-configured [`fmt::Layer`]
120    /// However, the writer configuration will be overridden.
121    pub fn with_fmt_layer<N2, E2, W>(
122        self,
123        fmt_layer: fmt::Layer<S, N2, E2, W>,
124    ) -> CloudWatchLayer<S, D, N2, E2> {
125        let writer = self.fmt_layer.writer().clone();
126        CloudWatchLayer {
127            fmt_layer: fmt_layer.with_writer(writer),
128        }
129    }
130}
131
132impl<S, D, N, E> Layer<S> for CloudWatchLayer<S, D, N, E>
133where
134    S: Subscriber + for<'span> LookupSpan<'span>,
135    D: Dispatcher + 'static,
136    Arc<D>: for<'writer> MakeWriter<'writer>,
137    N: for<'writer> format::FormatFields<'writer> + 'static,
138    E: format::FormatEvent<S, N> + 'static,
139{
140    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
141        self.fmt_layer.on_enter(id, ctx)
142    }
143    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
144        self.fmt_layer.on_event(event, ctx)
145    }
146
147    fn on_register_dispatch(&self, collector: &tracing::Dispatch) {
148        self.fmt_layer.on_register_dispatch(collector)
149    }
150
151    fn on_layer(&mut self, subscriber: &mut S) {
152        let _ = subscriber;
153    }
154
155    fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
156        self.fmt_layer.enabled(metadata, ctx)
157    }
158
159    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
160        self.fmt_layer.on_new_span(attrs, id, ctx)
161    }
162
163    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
164        self.fmt_layer.on_record(id, values, ctx)
165    }
166
167    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
168        self.fmt_layer.on_follows_from(span, follows, ctx)
169    }
170
171    fn event_enabled(&self, event: &Event<'_>, ctx: Context<'_, S>) -> bool {
172        self.fmt_layer.event_enabled(event, ctx)
173    }
174
175    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
176        self.fmt_layer.on_exit(id, ctx)
177    }
178
179    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
180        self.fmt_layer.on_close(id, ctx)
181    }
182
183    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
184        self.fmt_layer.on_id_change(old, new, ctx)
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use std::sync::Mutex;
191
192    use chrono::{DateTime, TimeZone, Utc};
193    use tracing_subscriber::layer::SubscriberExt;
194
195    use crate::dispatch::LogEvent;
196
197    use super::*;
198
199    struct TestDispatcher {
200        events: Mutex<Vec<LogEvent>>,
201    }
202
203    impl TestDispatcher {
204        fn new() -> Self {
205            Self {
206                events: Mutex::new(Vec::new()),
207            }
208        }
209    }
210
211    impl Dispatcher for TestDispatcher {
212        fn dispatch(&self, input: crate::dispatch::LogEvent) {
213            self.events.lock().unwrap().push(input)
214        }
215    }
216
217    impl std::io::Write for &TestDispatcher {
218        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
219            let timestamp: DateTime<Utc> = Utc.timestamp_opt(1_5000_000_000, 0).unwrap();
220
221            let message = String::from_utf8_lossy(buf).to_string();
222
223            self.events
224                .lock()
225                .unwrap()
226                .push(LogEvent { timestamp, message });
227
228            Ok(buf.len())
229        }
230
231        fn flush(&mut self) -> std::io::Result<()> {
232            Ok(())
233        }
234    }
235
236    #[test]
237    fn format() {
238        let dispatcher = Arc::new(TestDispatcher::new());
239        let subscriber = tracing_subscriber::registry().with(
240            CloudWatchLayer::new(dispatcher.clone())
241                .with_code_location(false)
242                .with_target(false),
243        );
244
245        tracing::subscriber::with_default(subscriber, || {
246            tracing::info_span!("span-1", xxx = "yyy").in_scope(|| {
247                tracing::debug_span!("span-2", key = "value").in_scope(|| {
248                    tracing::info!("Hello!");
249                })
250            });
251
252            tracing::error!("Error");
253        });
254
255        let dispatched = dispatcher.events.lock().unwrap().remove(0);
256        assert_eq!(
257            dispatched.message,
258            " INFO span-1{xxx=\"yyy\"}:span-2{key=\"value\"}: Hello!\n"
259        );
260
261        let dispatched = dispatcher.events.lock().unwrap().remove(0);
262        assert_eq!(dispatched.message, "ERROR Error\n");
263    }
264
265    #[test]
266    fn with_fmt_layer_json() {
267        let dispatcher = Arc::new(TestDispatcher::new());
268        let subscriber = tracing_subscriber::registry().with(
269            CloudWatchLayer::new(dispatcher.clone())
270                .with_fmt_layer(fmt::layer().json().without_time()),
271        );
272
273        tracing::subscriber::with_default(subscriber, || {
274            tracing::info_span!("span-1", xxx = "yyy").in_scope(|| {
275                tracing::debug_span!("span-2", key = "value").in_scope(|| {
276                    tracing::info!("Hello!");
277                })
278            });
279        });
280
281        let dispatched = dispatcher.events.lock().unwrap().remove(0);
282        insta::assert_debug_snapshot!(dispatched.message);
283    }
284}