tracing_json/layers/formatter/
structured.rs1use crate::layers::formatter::errors::*;
2use crate::layers::prelude::JsonStorage;
3use serde::ser::SerializeMap;
4use serde::Serializer;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use std::{fmt, io::Write};
9use tracing::{span::Attributes, Event, Id, Level, Subscriber};
10use tracing_subscriber::fmt::MakeWriter;
11use tracing_subscriber::layer::Context;
12use tracing_subscriber::registry::LookupSpan;
13use tracing_subscriber::registry::SpanRef;
14use tracing_subscriber::Layer;
15
16#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
17pub enum Datatype {
18 Constant(String),
19 Level,
20 Message,
21 CurrentIso8601,
22 CurrentMilliseconds,
23 CurrentNanoseconds,
24}
25
26impl Datatype {
27 fn from(data: &Value) -> Result<Datatype> {
28 match *data {
29 Value::Object(ref map) => match map.get("type") {
30 Some(d) if d == "constant" => match map.get("value") {
31 Some(v) => Ok(Datatype::Constant(v.as_str().unwrap_or_default().into())),
32 _ => {
33 return Err(StructuredError::ParseError(
34 "Datatype missing 'value' at 'constant'".to_string(),
35 ))
36 }
37 },
38 Some(d) if d == "level" => Ok(Datatype::Level),
39 Some(d) if d == "message" => Ok(Datatype::Message),
40 Some(d) if d == "currentiso8601" => Ok(Datatype::CurrentIso8601),
41 Some(d) if d == "currentmilliseconds" => Ok(Datatype::CurrentMilliseconds),
42 Some(d) if d == "currentnanoseconds" => Ok(Datatype::CurrentNanoseconds),
43 _ => {
44 return Err(StructuredError::ParseError(
45 "Unexpected json type for datatype value".to_string(),
46 ))
47 }
48 },
49 _ => {
50 return Err(StructuredError::ParseError(
51 "Unexpected type for datatype value".to_string(),
52 ))
53 }
54 }
55 }
56}
57
58#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
59pub struct Field {
60 name: String,
61 dtype: Datatype,
62}
63
64impl Field {
65 pub fn from(data: &Value) -> Result<Self> {
68 match *data {
69 Value::Object(ref map) => {
70 let name = match map.get("name") {
71 Some(&Value::String(ref name)) => name.to_string(),
72 _ => {
73 return Err(StructuredError::ParseError(
74 "Field missing 'name' attribute".to_string(),
75 ));
76 }
77 };
78 let dtype = match map.get("dtype") {
79 Some(v) => Datatype::from(v)?,
80 _ => {
81 return Err(StructuredError::ParseError(
82 "Field missing 'dtype' attribute".to_string(),
83 ));
84 }
85 };
86
87 Ok(Field { name, dtype })
88 }
89 _ => Err(StructuredError::ParseError(
90 "Unexpected json type for field value".to_string(),
91 )),
92 }
93 }
94}
95
/// The point in a span's life a log line refers to.
///
/// The type is a plain field-less enum, so it is `Copy` and comparable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SpanState {
    /// The line marks the start of a span.
    Enter,
    /// The line marks the end of a span.
    Exit,
    /// The line is an event recorded while the span is current.
    Event,
}
103
104impl fmt::Display for SpanState {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 let repr = match self {
107 SpanState::Enter => "start",
108 SpanState::Exit => "end",
109 SpanState::Event => "event",
110 };
111 write!(f, "{}", repr)
112 }
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct Structured<W>
117where
118 W: MakeWriter + 'static,
119{
120 make_writer: W,
121 pub(crate) fields: Vec<Field>,
122}
123
124impl<W> Structured<W>
125where
126 W: MakeWriter + 'static,
127{
128 pub fn new<'d>(format: &'d str, writer: W) -> Result<Self> {
129 let conf: Value = serde_json::from_str(format)
130 .map_err(|_| StructuredError::ParseError("Config is not in json format".to_string()))?;
131
132 match conf {
133 Value::Object(ref structure) => {
134 let fields = match structure.get("fields") {
135 Some(Value::Array(fields)) => fields
136 .iter()
137 .map(|f| Field::from(f))
138 .collect::<Result<_>>()?,
139 _ => {
140 return Err(StructuredError::ParseError(
141 "Fields should be an array".to_string(),
142 ));
143 }
144 };
145
146 Ok(Self {
147 fields,
148 make_writer: writer,
149 })
150 }
151 _ => Err(StructuredError::ParseError(
152 "Invalid format type".to_string(),
153 )),
154 }
155 }
156
157 fn structured_fields(
158 &self,
159 ms: &mut impl SerializeMap<Error = serde_json::Error>,
160 message: &str,
161 level: &Level,
162 ) -> Result<()> {
163 let now = chrono::Utc::now();
164
165 self.fields.iter().try_for_each(|f| match &f.dtype {
166 Datatype::Constant(s) => Ok(ms.serialize_entry(&f.name, &s)?),
167 Datatype::Level => Ok(ms.serialize_entry(&f.name, &level.to_string())?),
168 Datatype::Message => Ok(ms.serialize_entry(&f.name, message)?),
169 Datatype::CurrentIso8601 => Ok(ms.serialize_entry(&f.name, &now.to_rfc3339())?),
170 Datatype::CurrentMilliseconds => {
171 Ok(ms.serialize_entry(&f.name, &now.timestamp_millis())?)
172 }
173 Datatype::CurrentNanoseconds => {
174 Ok(ms.serialize_entry(&f.name, &now.timestamp_nanos())?)
175 }
176 })
177 }
178
179 fn format_span_context<S>(&self, span: &SpanRef<S>, state: SpanState) -> String
180 where
181 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
182 {
183 format!("[{} - {}]", span.metadata().name().to_lowercase(), state)
184 }
185
186 fn format_event_message<S>(
187 &self,
188 current_span: &Option<SpanRef<S>>,
189 event: &Event,
190 event_visitor: &JsonStorage<'_>,
191 ) -> String
192 where
193 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
194 {
195 let mut message = event_visitor
197 .values()
198 .get("message")
199 .map(|v| match v {
200 Value::String(s) => Some(s.as_str()),
201 _ => None,
202 })
203 .flatten()
204 .unwrap_or_else(|| event.metadata().target())
205 .to_owned();
206
207 if let Some(span) = ¤t_span {
209 message = format!(
210 "{} {}",
211 self.format_span_context(span, SpanState::Event),
212 message
213 );
214 }
215
216 message
217 }
218
219 fn format<S>(
220 &self,
221 event: &Event<'_>,
222 current_span: Option<SpanRef<S>>,
223 event_visitor: JsonStorage,
224 ) -> Result<Vec<u8>>
225 where
226 S: Subscriber + for<'a> LookupSpan<'a>,
227 {
228 let mut buffer = Vec::with_capacity(self.fields.len() * 128);
229
230 let mut serializer = serde_json::Serializer::new(&mut buffer);
231 let mut map_serializer = serializer.serialize_map(None)?;
232
233 let message = self.format_event_message(¤t_span, event, &event_visitor);
234 self.structured_fields(&mut map_serializer, &message, event.metadata().level())?;
235
236 let _ = event_visitor
238 .values()
239 .iter()
240 .filter(|(&key, _)| key != "message")
241 .try_for_each(|(key, value)| -> Result<()> {
242 Ok(map_serializer.serialize_entry(key, value)?)
243 });
244
245 if let Some(span) = ¤t_span {
247 let extensions = span.extensions();
248 if let Some(visitor) = extensions.get::<JsonStorage>() {
249 let _ = visitor
250 .values()
251 .iter()
252 .try_for_each(|(key, value)| -> Result<()> {
253 Ok(map_serializer.serialize_entry(key, value)?)
254 });
255 }
256 }
257 map_serializer.end()?;
258 Ok(buffer)
259 }
260
261 fn serialize_span<S>(&self, span: &SpanRef<S>, state: SpanState) -> Result<Vec<u8>>
262 where
263 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
264 {
265 let mut buffer = Vec::with_capacity(self.fields.len() * 128);
266 let mut serializer = serde_json::Serializer::new(&mut buffer);
267 let mut map_serializer = serializer.serialize_map(None)?;
268 let message = self.format_span_context(&span, state);
269 self.structured_fields(&mut map_serializer, &message, span.metadata().level())?;
270
271 let extensions = span.extensions();
272 if let Some(visitor) = extensions.get::<JsonStorage>() {
273 for (key, value) in visitor.values() {
274 map_serializer.serialize_entry(key, value)?;
275 }
276 }
277 map_serializer.end()?;
278 Ok(buffer)
279 }
280
281 fn emit(&self, mut buffer: Vec<u8>) -> Result<()> {
282 buffer.write_all(b"\n")?;
283 self.make_writer
284 .make_writer()
285 .write_all(&buffer)
286 .map_err(|e| StructuredError::WriterError(e.to_string()))
287 }
288}
289
290impl<S, W> Layer<S> for Structured<W>
291where
292 S: Subscriber + for<'a> LookupSpan<'a>,
293 W: MakeWriter + 'static,
294{
295 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
296 let current_span = ctx.lookup_current();
297
298 let mut event_visitor = JsonStorage::default();
299 event.record(&mut event_visitor);
300
301 let _ = self
302 .format(event, current_span, event_visitor)
303 .map(|formatted| {
304 let _ = self.emit(formatted);
305 });
306 }
307
308 fn new_span(&self, _attrs: &Attributes, id: &Id, ctx: Context<'_, S>) {
309 let span = ctx.span(id).expect("Span not found, this is a bug");
310 if let Ok(serialized) = self.serialize_span(&span, SpanState::Enter) {
311 let _ = self.emit(serialized);
312 }
313 }
314
315 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
316 let span = ctx.span(&id).expect("Span not found, this is a bug");
317 if let Ok(serialized) = self.serialize_span(&span, SpanState::Exit) {
318 let _ = self.emit(serialized);
319 }
320 }
321}
322
#[cfg(test)]
mod tracing_json_structured_tests {
    use super::*;

    /// A valid configuration parses into the expected fields, in order.
    /// Extra keys inside a `dtype` (the `value` of the `level` entry) are
    /// ignored.
    #[test]
    fn parse_structured() {
        let config: &str = r#"
        {
            "fields": [
                {
                    "name": "v",
                    "dtype": {
                        "type": "constant",
                        "value": "1"
                    }
                },
                {
                    "name": "l",
                    "dtype": {
                        "type": "level",
                        "value": "WARN"
                    }
                },
                {
                    "name": "current_ms",
                    "dtype": {
                        "type": "currentmilliseconds"
                    }
                }
            ]
        }
        "#;

        let layer = Structured::new(config, std::io::stdout).unwrap();

        let expected = vec![
            Field {
                name: "v".to_string(),
                dtype: Datatype::Constant("1".to_string()),
            },
            Field {
                name: "l".to_string(),
                dtype: Datatype::Level,
            },
            Field {
                name: "current_ms".to_string(),
                dtype: Datatype::CurrentMilliseconds,
            },
        ];
        assert_eq!(layer.fields, expected);
    }
}