datadog_formatting_layer/
layer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
use crate::{
    datadog_ids,
    event_sink::{EventSink, StdoutSink},
    fields::{self, FieldPair, FieldStore},
    formatting::DatadogLog,
};
use chrono::Utc;
use tracing::{span::Attributes, Event, Id, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

/// A tracing layer that formats events in a way Datadog can parse.
///
/// Each formatted event is handed to the configured [`EventSink`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DatadogFormattingLayer<Sink>
where
    Sink: EventSink + 'static,
{
    /// Destination that receives every serialized event.
    event_sink: Sink,
}

impl<S> DatadogFormattingLayer<S>
where
    S: EventSink + 'static,
{
    /// Builds a `DatadogFormattingLayer` that writes every formatted event to `sink`.
    pub const fn with_sink(sink: S) -> Self {
        Self { event_sink: sink }
    }
}

impl Default for DatadogFormattingLayer<StdoutSink> {
    /// Creates a layer backed by a default-constructed [`StdoutSink`].
    fn default() -> Self {
        let stdout_sink = StdoutSink::default();
        Self::with_sink(stdout_sink)
    }
}

impl<S, Sink> Layer<S> for DatadogFormattingLayer<Sink>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    Sink: EventSink + 'static,
{
    /// Remembers the fields declared on a freshly created span,
    /// e.g. `#[instrument(fields(hello = "world"))]`.
    ///
    /// The fields are kept in the span's extensions as a [`FieldStore`]; an already
    /// present store is left untouched.
    ///
    /// # Panics
    ///
    /// Panics when the span for `id` cannot be looked up in `ctx`.
    fn on_new_span(&self, span_attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        #[allow(clippy::expect_used)]
        let span_ref = ctx.span(id).expect("Span not found, this is a bug");

        let mut span_extensions = span_ref.extensions_mut();

        let span_fields = fields::from_attributes(span_attrs);

        // only insert when nothing is stored yet for this span
        let already_stored = span_extensions.get_mut::<FieldStore>().is_some();
        if !already_stored {
            span_extensions.insert(FieldStore {
                fields: span_fields,
            });
        }
    }

    // IDEA: an `on_record` implementation may be required here

    /// Formats `event` as a Datadog log line and writes it to the event sink.
    ///
    /// The log carries the fields collected from the spans (via `fields::from_spans`)
    /// followed by the event's own fields, the event's `message` field (or its default
    /// value when absent) and the Datadog ids read from `ctx`.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let event_fields = fields::from_event(event);

        // the message is the value of the `message` field, if the event has one
        let message = event_fields
            .iter()
            .find_map(|field| (field.name == "message").then(|| field.value.clone()))
            .unwrap_or_default();

        // span fields first, event fields last
        let mut all_fields: Vec<FieldPair> = Vec::new();
        all_fields.extend(fields::from_spans(&ctx, event));
        all_fields.extend(event_fields);

        // look for datadog trace- and span-id
        let datadog_ids = datadog_ids::read_from_context(&ctx);

        let metadata = event.metadata();
        let log = DatadogLog {
            timestamp: Utc::now(),
            level: metadata.level().to_owned(),
            message,
            fields: all_fields,
            target: metadata.target().to_string(),
            datadog_ids,
        };

        self.event_sink.write(log.format());
    }
}

/// Tests of the layer on a plain registry, without any OpenTelemetry layer.
#[cfg(test)]
mod simple_layer {
    use super::setup::{first_span, setup_simple_subscriber};
    use smoothy::assert_that;
    use tracing::info;

    /// A bare message is logged with level, message and target.
    #[test]
    fn simple_log() {
        let (sink, _guard) = setup_simple_subscriber();

        info!("Hello World!");

        let events = sink.events();
        assert_that(&events).size().is(1);

        let expected = r#"","level":"INFO","message":"Hello World!","target":"datadog_formatting_layer::layer::simple_layer"}"#;
        assert_that(events).first().contains_string(expected);
    }

    /// Event fields show up both as `fields.*` keys and appended to the message.
    #[test]
    fn log_with_fields() {
        let (sink, _guard) = setup_simple_subscriber();

        info!(user = "John Doe", "Hello World!");

        let events = sink.events();
        assert_that(&events).size().is(1);

        let expected = r#"","level":"INFO","fields.user":"John Doe","message":"Hello World! user=John Doe","target":"datadog_formatting_layer::layer::simple_layer"}"#;
        assert_that(events).first().contains_string(expected);
    }

    /// Nested instrumented functions produce three events that carry the span fields.
    #[allow(clippy::redundant_clone)]
    #[test]
    fn complex_logs() {
        let (sink, _guard) = setup_simple_subscriber();

        first_span("Argument");

        let events = sink.events();
        assert_that(&events).size().is(3);

        let expected_first = r#"","level":"DEBUG","fields.first_value":"Argument","message":"First Span! first_value=Argument","target":"datadog_formatting_layer::layer::setup"}"#;
        let expected_second = r#"","level":"DEBUG","fields.attr":"value","fields.first_value":"Argument","message":"Second Span! attr=value first_value=Argument","target":"datadog_formatting_layer::layer::setup"}"#;
        let expected_third = r#"","level":"INFO","fields.attr":"value","fields.first_value":"Argument","fields.return":"Return Value","message":" attr=value first_value=Argument return=Return Value","target":"datadog_formatting_layer::layer::setup"}"#;

        assert_that(events.clone())
            .first()
            .contains_string(expected_first);
        assert_that(events.clone())
            .second()
            .contains_string(expected_second);
        assert_that(events.clone())
            .third()
            .contains_string(expected_third);
    }
}

/// Tests of the layer combined with the OpenTelemetry tracing layer.
#[cfg(test)]
mod layer_with_otel {
    use super::setup::{first_span, setup_otel_subscriber, LogMessageExt, SmoothyExt};
    use smoothy::assert_that;
    use tracing::info;

    /// An event logged outside of any span carries neither `dd.trace_id` nor `dd.span_id`.
    #[tokio::test]
    async fn without_spans_has_no_datadog_ids() {
        let (sink, _guard) = setup_otel_subscriber().await;

        info!("Hello World!");

        let events = sink.events();
        assert_that(&events).size().is(1);

        let only_event = &events[0];
        assert_that(only_event.trace_id()).is_none();
        assert_that(only_event.span_id()).is_none();
    }

    /// Events logged inside spans carry the expected Datadog ids.
    #[tokio::test]
    async fn with_spans_has_correct_datadog_ids() {
        let (sink, _guard) = setup_otel_subscriber().await;

        first_span("Argument");

        let events = sink.events();
        assert_that(&events).size().is(3);

        let (first, second, third) = (&events[0], &events[1], &events[2]);

        // first message has a zero (not valid) trace id but a non-zero span id
        assert_that(first.trace_id()).is_not_valid();
        assert_that(first.span_id()).is_valid();

        // second message has a valid trace id and a span id different from the first
        assert_that(second.trace_id()).is_valid();
        assert_that(second.span_id()).is_valid();
        assert_that(second.span_id()).is_not(first.span_id());

        // third message has the same trace id and the same span id as the second
        assert_that(third.trace_id()).is_valid();
        assert_that(third.trace_id()).is(second.trace_id());
        assert_that(third.span_id()).is_valid();
        assert_that(third.span_id()).is(second.span_id());
    }
}

/// Shared fixtures for the layer tests: subscriber setup, an observable sink,
/// assertion helpers and instrumented sample functions.
#[cfg(test)]
mod setup {
    use super::*;
    use opentelemetry::global;
    use opentelemetry_datadog::ApiVersion;
    use opentelemetry_sdk::{
        propagation::TraceContextPropagator,
        runtime::Tokio,
        trace::{config, RandomIdGenerator, Sampler},
    };
    use serde_json::Value;
    use smoothy::BasicAsserter;
    use std::sync::{Arc, Mutex};
    use tracing::{debug, instrument, subscriber::DefaultGuard};
    use tracing_subscriber::prelude::*;

    /// Installs a registry with only the formatting layer as the default subscriber.
    ///
    /// Returns the sink that records the written events together with the guard
    /// returned by `set_default`.
    pub fn setup_simple_subscriber() -> (ObservableSink, DefaultGuard) {
        let sink = ObservableSink::default();

        let formatting_layer = DatadogFormattingLayer::with_sink(sink.clone());
        let subscriber = tracing_subscriber::registry().with(formatting_layer);

        let guard = tracing::subscriber::set_default(subscriber);

        (sink, guard)
    }

    /// Installs a registry with the formatting layer and an OpenTelemetry layer
    /// (Datadog pipeline) as the default subscriber.
    ///
    /// Returns the sink that records the written events together with the guard
    /// returned by `set_default`.
    pub async fn setup_otel_subscriber() -> (ObservableSink, DefaultGuard) {
        let sink = ObservableSink::default();

        // otel boilerplate
        global::set_text_map_propagator(TraceContextPropagator::new());

        let trace_config = config()
            .with_sampler(Sampler::AlwaysOn)
            .with_id_generator(RandomIdGenerator::default());

        let tracer = opentelemetry_datadog::new_pipeline()
            .with_service_name("my-service")
            .with_trace_config(trace_config)
            .with_api_version(ApiVersion::Version05)
            .with_env("rls")
            .with_version("420")
            .install_batch(Tokio)
            .unwrap();

        let formatting_layer = DatadogFormattingLayer::with_sink(sink.clone());
        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
        let subscriber = tracing_subscriber::registry()
            .with(formatting_layer)
            .with(otel_layer);

        let guard = tracing::subscriber::set_default(subscriber);

        (sink, guard)
    }

    /// Accessors for the Datadog ids contained in a serialized log line.
    pub trait LogMessageExt {
        /// Value of the `dd.span_id` key, if the key is present.
        fn span_id(&self) -> Option<u64>;
        /// Value of the `dd.trace_id` key, if the key is present.
        fn trace_id(&self) -> Option<u64>;
    }

    /// Parses `log_line` as JSON and reads `key` as an unsigned integer.
    ///
    /// Returns `None` when the key is absent; panics when the line is not valid
    /// JSON or the value is not a `u64`.
    fn datadog_id(log_line: &str, key: &str) -> Option<u64> {
        let log: Value = serde_json::from_str(log_line).unwrap();
        log.get(key).map(|id| id.as_u64().unwrap())
    }

    impl LogMessageExt for String {
        fn span_id(&self) -> Option<u64> {
            datadog_id(self, "dd.span_id")
        }

        fn trace_id(&self) -> Option<u64> {
            datadog_id(self, "dd.trace_id")
        }
    }

    /// Extra assertions on optional Datadog ids.
    pub trait SmoothyExt {
        /// Asserts that the id is present and not `0`.
        #[allow(clippy::wrong_self_convention)]
        fn is_valid(self);
        /// Asserts that the id is present and exactly `0`.
        #[allow(clippy::wrong_self_convention)]
        fn is_not_valid(self);
    }

    impl SmoothyExt for BasicAsserter<Option<u64>> {
        fn is_valid(self) {
            self.is_some().and_value().is_not(0);
        }

        fn is_not_valid(self) {
            self.is_some().and_value().is(0);
        }
    }

    /// Event sink that keeps every written event so tests can inspect them.
    ///
    /// Clones share the same underlying event list.
    #[derive(Debug, Clone, Default)]
    pub struct ObservableSink {
        events: Arc<Mutex<Vec<String>>>,
    }

    impl EventSink for ObservableSink {
        /// Prints the event to stdout and appends it to the recorded events.
        #[allow(clippy::print_stdout)]
        fn write(&self, event: String) {
            println!("{event}");
            let mut recorded = self.events.lock().unwrap();
            recorded.push(event);
        }
    }

    impl ObservableSink {
        /// Returns a copy of all events written so far.
        pub fn events(&self) -> Vec<String> {
            let recorded = self.events.lock().unwrap();
            recorded.to_vec()
        }
    }

    /// Instrumented function that logs once and then calls `second_span`.
    #[instrument]
    pub fn first_span(first_value: &str) {
        debug!("First Span!");
        second_span();
    }

    /// Instrumented function with an extra span field whose return value is recorded (`ret`).
    #[instrument(fields(attr = "value"), ret)]
    fn second_span() -> String {
        debug!("Second Span!");
        String::from("Return Value")
    }
}