tracing_json/layers/formatter/
structured.rs

1use crate::layers::formatter::errors::*;
2use crate::layers::prelude::JsonStorage;
3use serde::ser::SerializeMap;
4use serde::Serializer;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use std::{fmt, io::Write};
9use tracing::{span::Attributes, Event, Id, Level, Subscriber};
10use tracing_subscriber::fmt::MakeWriter;
11use tracing_subscriber::layer::Context;
12use tracing_subscriber::registry::LookupSpan;
13use tracing_subscriber::registry::SpanRef;
14use tracing_subscriber::Layer;
15
16#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
17pub enum Datatype {
18    Constant(String),
19    Level,
20    Message,
21    CurrentIso8601,
22    CurrentMilliseconds,
23    CurrentNanoseconds,
24}
25
26impl Datatype {
27    fn from(data: &Value) -> Result<Datatype> {
28        match *data {
29            Value::Object(ref map) => match map.get("type") {
30                Some(d) if d == "constant" => match map.get("value") {
31                    Some(v) => Ok(Datatype::Constant(v.as_str().unwrap_or_default().into())),
32                    _ => {
33                        return Err(StructuredError::ParseError(
34                            "Datatype missing 'value' at 'constant'".to_string(),
35                        ))
36                    }
37                },
38                Some(d) if d == "level" => Ok(Datatype::Level),
39                Some(d) if d == "message" => Ok(Datatype::Message),
40                Some(d) if d == "currentiso8601" => Ok(Datatype::CurrentIso8601),
41                Some(d) if d == "currentmilliseconds" => Ok(Datatype::CurrentMilliseconds),
42                Some(d) if d == "currentnanoseconds" => Ok(Datatype::CurrentNanoseconds),
43                _ => {
44                    return Err(StructuredError::ParseError(
45                        "Unexpected json type for datatype value".to_string(),
46                    ))
47                }
48            },
49            _ => {
50                return Err(StructuredError::ParseError(
51                    "Unexpected type for datatype value".to_string(),
52                ))
53            }
54        }
55    }
56}
57
58#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
59pub struct Field {
60    name: String,
61    dtype: Datatype,
62}
63
64impl Field {
65    ///
66    /// Parse `field` definition
67    pub fn from(data: &Value) -> Result<Self> {
68        match *data {
69            Value::Object(ref map) => {
70                let name = match map.get("name") {
71                    Some(&Value::String(ref name)) => name.to_string(),
72                    _ => {
73                        return Err(StructuredError::ParseError(
74                            "Field missing 'name' attribute".to_string(),
75                        ));
76                    }
77                };
78                let dtype = match map.get("dtype") {
79                    Some(v) => Datatype::from(v)?,
80                    _ => {
81                        return Err(StructuredError::ParseError(
82                            "Field missing 'dtype' attribute".to_string(),
83                        ));
84                    }
85                };
86
87                Ok(Field { name, dtype })
88            }
89            _ => Err(StructuredError::ParseError(
90                "Unexpected json type for field value".to_string(),
91            )),
92        }
93    }
94}
95
/// Kind of record being written: a span starting, a span ending, or an event.
#[derive(Clone, Debug)]
pub enum SpanState {
    /// Displayed as `start`.
    Enter,
    /// Displayed as `end`.
    Exit,
    /// Displayed as `event`.
    Event,
}

impl fmt::Display for SpanState {
    /// Writes the lowercase label for this state (`start`, `end` or `event`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            SpanState::Enter => "start",
            SpanState::Exit => "end",
            SpanState::Event => "event",
        })
    }
}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct Structured<W>
117where
118    W: MakeWriter + 'static,
119{
120    make_writer: W,
121    pub(crate) fields: Vec<Field>,
122}
123
124impl<W> Structured<W>
125where
126    W: MakeWriter + 'static,
127{
128    pub fn new<'d>(format: &'d str, writer: W) -> Result<Self> {
129        let conf: Value = serde_json::from_str(format)
130            .map_err(|_| StructuredError::ParseError("Config is not in json format".to_string()))?;
131
132        match conf {
133            Value::Object(ref structure) => {
134                let fields = match structure.get("fields") {
135                    Some(Value::Array(fields)) => fields
136                        .iter()
137                        .map(|f| Field::from(f))
138                        .collect::<Result<_>>()?,
139                    _ => {
140                        return Err(StructuredError::ParseError(
141                            "Fields should be an array".to_string(),
142                        ));
143                    }
144                };
145
146                Ok(Self {
147                    fields,
148                    make_writer: writer,
149                })
150            }
151            _ => Err(StructuredError::ParseError(
152                "Invalid format type".to_string(),
153            )),
154        }
155    }
156
157    fn structured_fields(
158        &self,
159        ms: &mut impl SerializeMap<Error = serde_json::Error>,
160        message: &str,
161        level: &Level,
162    ) -> Result<()> {
163        let now = chrono::Utc::now();
164
165        self.fields.iter().try_for_each(|f| match &f.dtype {
166            Datatype::Constant(s) => Ok(ms.serialize_entry(&f.name, &s)?),
167            Datatype::Level => Ok(ms.serialize_entry(&f.name, &level.to_string())?),
168            Datatype::Message => Ok(ms.serialize_entry(&f.name, message)?),
169            Datatype::CurrentIso8601 => Ok(ms.serialize_entry(&f.name, &now.to_rfc3339())?),
170            Datatype::CurrentMilliseconds => {
171                Ok(ms.serialize_entry(&f.name, &now.timestamp_millis())?)
172            }
173            Datatype::CurrentNanoseconds => {
174                Ok(ms.serialize_entry(&f.name, &now.timestamp_nanos())?)
175            }
176        })
177    }
178
179    fn format_span_context<S>(&self, span: &SpanRef<S>, state: SpanState) -> String
180    where
181        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
182    {
183        format!("[{} - {}]", span.metadata().name().to_lowercase(), state)
184    }
185
186    fn format_event_message<S>(
187        &self,
188        current_span: &Option<SpanRef<S>>,
189        event: &Event,
190        event_visitor: &JsonStorage<'_>,
191    ) -> String
192    where
193        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
194    {
195        // Extract the "message" field, if provided. Fallback to the target, if missing.
196        let mut message = event_visitor
197            .values()
198            .get("message")
199            .map(|v| match v {
200                Value::String(s) => Some(s.as_str()),
201                _ => None,
202            })
203            .flatten()
204            .unwrap_or_else(|| event.metadata().target())
205            .to_owned();
206
207        // If the event is in the context of a span, prepend the span name to the message.
208        if let Some(span) = &current_span {
209            message = format!(
210                "{} {}",
211                self.format_span_context(span, SpanState::Event),
212                message
213            );
214        }
215
216        message
217    }
218
219    fn format<S>(
220        &self,
221        event: &Event<'_>,
222        current_span: Option<SpanRef<S>>,
223        event_visitor: JsonStorage,
224    ) -> Result<Vec<u8>>
225    where
226        S: Subscriber + for<'a> LookupSpan<'a>,
227    {
228        let mut buffer = Vec::with_capacity(self.fields.len() * 128);
229
230        let mut serializer = serde_json::Serializer::new(&mut buffer);
231        let mut map_serializer = serializer.serialize_map(None)?;
232
233        let message = self.format_event_message(&current_span, event, &event_visitor);
234        self.structured_fields(&mut map_serializer, &message, event.metadata().level())?;
235
236        // Add all the other fields associated with the event, expect the message we already used.
237        let _ = event_visitor
238            .values()
239            .iter()
240            .filter(|(&key, _)| key != "message")
241            .try_for_each(|(key, value)| -> Result<()> {
242                Ok(map_serializer.serialize_entry(key, value)?)
243            });
244
245        // Add all the fields from the current span, if we have one.
246        if let Some(span) = &current_span {
247            let extensions = span.extensions();
248            if let Some(visitor) = extensions.get::<JsonStorage>() {
249                let _ = visitor
250                    .values()
251                    .iter()
252                    .try_for_each(|(key, value)| -> Result<()> {
253                        Ok(map_serializer.serialize_entry(key, value)?)
254                    });
255            }
256        }
257        map_serializer.end()?;
258        Ok(buffer)
259    }
260
261    fn serialize_span<S>(&self, span: &SpanRef<S>, state: SpanState) -> Result<Vec<u8>>
262    where
263        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
264    {
265        let mut buffer = Vec::with_capacity(self.fields.len() * 128);
266        let mut serializer = serde_json::Serializer::new(&mut buffer);
267        let mut map_serializer = serializer.serialize_map(None)?;
268        let message = self.format_span_context(&span, state);
269        self.structured_fields(&mut map_serializer, &message, span.metadata().level())?;
270
271        let extensions = span.extensions();
272        if let Some(visitor) = extensions.get::<JsonStorage>() {
273            for (key, value) in visitor.values() {
274                map_serializer.serialize_entry(key, value)?;
275            }
276        }
277        map_serializer.end()?;
278        Ok(buffer)
279    }
280
281    fn emit(&self, mut buffer: Vec<u8>) -> Result<()> {
282        buffer.write_all(b"\n")?;
283        self.make_writer
284            .make_writer()
285            .write_all(&buffer)
286            .map_err(|e| StructuredError::WriterError(e.to_string()))
287    }
288}
289
290impl<S, W> Layer<S> for Structured<W>
291where
292    S: Subscriber + for<'a> LookupSpan<'a>,
293    W: MakeWriter + 'static,
294{
295    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
296        let current_span = ctx.lookup_current();
297
298        let mut event_visitor = JsonStorage::default();
299        event.record(&mut event_visitor);
300
301        let _ = self
302            .format(event, current_span, event_visitor)
303            .map(|formatted| {
304                let _ = self.emit(formatted);
305            });
306    }
307
308    fn new_span(&self, _attrs: &Attributes, id: &Id, ctx: Context<'_, S>) {
309        let span = ctx.span(id).expect("Span not found, this is a bug");
310        if let Ok(serialized) = self.serialize_span(&span, SpanState::Enter) {
311            let _ = self.emit(serialized);
312        }
313    }
314
315    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
316        let span = ctx.span(&id).expect("Span not found, this is a bug");
317        if let Ok(serialized) = self.serialize_span(&span, SpanState::Exit) {
318            let _ = self.emit(serialized);
319        }
320    }
321}
322
#[cfg(test)]
mod tracing_json_structured_tests {
    use super::*;

    /// A well-formed configuration parses into the declared fields, in
    /// declaration order. Keys a datatype does not use (the `value` on
    /// `level`) are ignored.
    #[test]
    fn parse_structured() {
        let config: &str = r#"
        {
            "fields": [
                {
                    "name": "v",
                    "dtype": {
                      "type": "constant",
                      "value": "1"
                    }
                },
                {
                    "name": "l",
                    "dtype": {
                      "type": "level",
                      "value": "WARN"
                    }
                },
                {
                    "name": "current_ms",
                    "dtype": {
                      "type": "currentmilliseconds"
                    }
                }
            ]
        }
        "#;

        let s = Structured::new(config, std::io::stdout).unwrap();

        assert_eq!(
            s.fields,
            vec![
                Field {
                    name: "v".to_string(),
                    dtype: Datatype::Constant("1".to_string()),
                },
                Field {
                    name: "l".to_string(),
                    dtype: Datatype::Level,
                },
                Field {
                    name: "current_ms".to_string(),
                    dtype: Datatype::CurrentMilliseconds,
                },
            ]
        );
    }

    /// Each way a configuration can be malformed is reported as an error
    /// instead of producing a formatter.
    #[test]
    fn reject_malformed_config() {
        // Not JSON at all.
        assert!(Structured::new("not json", std::io::stdout).is_err());
        // JSON, but not an object.
        assert!(Structured::new("[]", std::io::stdout).is_err());
        // `fields` missing or not an array.
        assert!(Structured::new("{}", std::io::stdout).is_err());
        assert!(Structured::new(r#"{"fields": {}}"#, std::io::stdout).is_err());
        // Field without a name.
        assert!(Structured::new(
            r#"{"fields": [{"dtype": {"type": "level"}}]}"#,
            std::io::stdout
        )
        .is_err());
        // Field without a dtype.
        assert!(Structured::new(r#"{"fields": [{"name": "v"}]}"#, std::io::stdout).is_err());
        // Unknown datatype name.
        assert!(Structured::new(
            r#"{"fields": [{"name": "v", "dtype": {"type": "unknown"}}]}"#,
            std::io::stdout
        )
        .is_err());
        // Constant without a value.
        assert!(Structured::new(
            r#"{"fields": [{"name": "v", "dtype": {"type": "constant"}}]}"#,
            std::io::stdout
        )
        .is_err());
    }
}
358}