1use std::sync::Arc;
2
3use tracing_core::{Event, Subscriber, span};
4use tracing_subscriber::{
5 Layer,
6 fmt::{self, MakeWriter, format},
7 layer::Context,
8 registry::LookupSpan,
9};
10
11use crate::{
12 client::CloudWatchClient,
13 dispatch::{CloudWatchDispatcher, Dispatcher, NoopDispatcher},
14 export::ExportConfig,
15 guard::{CloudWatchWorkerGuard, ShutdownSignal},
16};
17
18pub struct CloudWatchLayer<S, D, N = format::DefaultFields, E = format::Format<format::Full, ()>> {
20 fmt_layer: fmt::Layer<S, N, E, Arc<D>>,
21}
22
23pub fn layer<S>() -> CloudWatchLayer<S, NoopDispatcher>
25where
26 S: Subscriber + for<'span> LookupSpan<'span>,
27{
28 CloudWatchLayer::default()
29}
30
31impl<S> Default
32 for CloudWatchLayer<S, NoopDispatcher, format::DefaultFields, format::Format<format::Full, ()>>
33where
34 S: Subscriber + for<'span> LookupSpan<'span>,
35{
36 fn default() -> Self {
37 CloudWatchLayer::<S,NoopDispatcher, format::DefaultFields, format::Format<format::Full,()>>::new(Arc::new(NoopDispatcher::new()))
38 }
39}
40
41impl<S, D> CloudWatchLayer<S, D, format::DefaultFields, format::Format<format::Full, ()>>
42where
43 S: Subscriber + for<'span> LookupSpan<'span>,
44 D: Dispatcher + 'static,
45 Arc<D>: for<'writer> MakeWriter<'writer>,
46{
47 pub fn new(dispatcher: Arc<D>) -> Self {
48 Self {
49 fmt_layer: fmt::Layer::default()
50 .without_time()
51 .with_writer(dispatcher)
52 .with_ansi(false)
53 .with_level(true)
54 .with_line_number(true)
55 .with_file(true)
56 .with_target(false),
57 }
58 }
59}
60
61impl<S, D, N, L, T> CloudWatchLayer<S, D, N, format::Format<L, T>>
62where
63 N: for<'writer> fmt::FormatFields<'writer> + 'static,
64{
65 pub fn with_code_location(self, display: bool) -> Self {
68 Self {
69 fmt_layer: self.fmt_layer.with_line_number(display).with_file(display),
70 }
71 }
72
73 pub fn with_target(self, display: bool) -> Self {
76 Self {
77 fmt_layer: self.fmt_layer.with_target(display),
78 }
79 }
80}
81
82impl<S, D, N, E> CloudWatchLayer<S, D, N, E>
83where
84 S: Subscriber + for<'span> LookupSpan<'span>,
85 D: Dispatcher + 'static,
86 Arc<D>: for<'writer> MakeWriter<'writer>,
87{
88 pub fn with_client<Client>(
90 self,
91 client: Client,
92 export_config: ExportConfig,
93 ) -> (
94 CloudWatchLayer<S, CloudWatchDispatcher, N, E>,
95 CloudWatchWorkerGuard,
96 )
97 where
98 Client: CloudWatchClient + Send + Sync + 'static,
99 {
100 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<ShutdownSignal>();
101
102 let guard = CloudWatchWorkerGuard::new(shutdown_tx);
103
104 (
105 CloudWatchLayer {
106 fmt_layer: self
107 .fmt_layer
108 .with_writer(Arc::new(CloudWatchDispatcher::new(
109 client,
110 export_config,
111 shutdown_rx,
112 ))),
113 },
114 guard,
115 )
116 }
117
118 pub fn with_fmt_layer<N2, E2, W>(
122 self,
123 fmt_layer: fmt::Layer<S, N2, E2, W>,
124 ) -> CloudWatchLayer<S, D, N2, E2> {
125 let writer = self.fmt_layer.writer().clone();
126 CloudWatchLayer {
127 fmt_layer: fmt_layer.with_writer(writer),
128 }
129 }
130}
131
132impl<S, D, N, E> Layer<S> for CloudWatchLayer<S, D, N, E>
133where
134 S: Subscriber + for<'span> LookupSpan<'span>,
135 D: Dispatcher + 'static,
136 Arc<D>: for<'writer> MakeWriter<'writer>,
137 N: for<'writer> format::FormatFields<'writer> + 'static,
138 E: format::FormatEvent<S, N> + 'static,
139{
140 fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
141 self.fmt_layer.on_enter(id, ctx)
142 }
143 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
144 self.fmt_layer.on_event(event, ctx)
145 }
146
147 fn on_register_dispatch(&self, collector: &tracing::Dispatch) {
148 self.fmt_layer.on_register_dispatch(collector)
149 }
150
151 fn on_layer(&mut self, subscriber: &mut S) {
152 let _ = subscriber;
153 }
154
155 fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
156 self.fmt_layer.enabled(metadata, ctx)
157 }
158
159 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
160 self.fmt_layer.on_new_span(attrs, id, ctx)
161 }
162
163 fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
164 self.fmt_layer.on_record(id, values, ctx)
165 }
166
167 fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
168 self.fmt_layer.on_follows_from(span, follows, ctx)
169 }
170
171 fn event_enabled(&self, event: &Event<'_>, ctx: Context<'_, S>) -> bool {
172 self.fmt_layer.event_enabled(event, ctx)
173 }
174
175 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
176 self.fmt_layer.on_exit(id, ctx)
177 }
178
179 fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
180 self.fmt_layer.on_close(id, ctx)
181 }
182
183 fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
184 self.fmt_layer.on_id_change(old, new, ctx)
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use std::sync::Mutex;
191
192 use chrono::{DateTime, TimeZone, Utc};
193 use tracing_subscriber::layer::SubscriberExt;
194
195 use crate::dispatch::LogEvent;
196
197 use super::*;
198
199 struct TestDispatcher {
200 events: Mutex<Vec<LogEvent>>,
201 }
202
203 impl TestDispatcher {
204 fn new() -> Self {
205 Self {
206 events: Mutex::new(Vec::new()),
207 }
208 }
209 }
210
211 impl Dispatcher for TestDispatcher {
212 fn dispatch(&self, input: crate::dispatch::LogEvent) {
213 self.events.lock().unwrap().push(input)
214 }
215 }
216
217 impl std::io::Write for &TestDispatcher {
218 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
219 let timestamp: DateTime<Utc> = Utc.timestamp_opt(1_5000_000_000, 0).unwrap();
220
221 let message = String::from_utf8_lossy(buf).to_string();
222
223 self.events
224 .lock()
225 .unwrap()
226 .push(LogEvent { timestamp, message });
227
228 Ok(buf.len())
229 }
230
231 fn flush(&mut self) -> std::io::Result<()> {
232 Ok(())
233 }
234 }
235
236 #[test]
237 fn format() {
238 let dispatcher = Arc::new(TestDispatcher::new());
239 let subscriber = tracing_subscriber::registry().with(
240 CloudWatchLayer::new(dispatcher.clone())
241 .with_code_location(false)
242 .with_target(false),
243 );
244
245 tracing::subscriber::with_default(subscriber, || {
246 tracing::info_span!("span-1", xxx = "yyy").in_scope(|| {
247 tracing::debug_span!("span-2", key = "value").in_scope(|| {
248 tracing::info!("Hello!");
249 })
250 });
251
252 tracing::error!("Error");
253 });
254
255 let dispatched = dispatcher.events.lock().unwrap().remove(0);
256 assert_eq!(
257 dispatched.message,
258 " INFO span-1{xxx=\"yyy\"}:span-2{key=\"value\"}: Hello!\n"
259 );
260
261 let dispatched = dispatcher.events.lock().unwrap().remove(0);
262 assert_eq!(dispatched.message, "ERROR Error\n");
263 }
264
265 #[test]
266 fn with_fmt_layer_json() {
267 let dispatcher = Arc::new(TestDispatcher::new());
268 let subscriber = tracing_subscriber::registry().with(
269 CloudWatchLayer::new(dispatcher.clone())
270 .with_fmt_layer(fmt::layer().json().without_time()),
271 );
272
273 tracing::subscriber::with_default(subscriber, || {
274 tracing::info_span!("span-1", xxx = "yyy").in_scope(|| {
275 tracing::debug_span!("span-2", key = "value").in_scope(|| {
276 tracing::info!("Hello!");
277 })
278 });
279 });
280
281 let dispatched = dispatcher.events.lock().unwrap().remove(0);
282 insta::assert_debug_snapshot!(dispatched.message);
283 }
284}