dynamecs_analyze/
lib.rs

1use eyre::{eyre, ErrReport};
2use flate2::read::GzDecoder;
3use serde::{Deserialize, Serialize};
4use serde_json::{Map, Value};
5use std::ffi::OsStr;
6use std::fmt::{Display, Formatter};
7use std::fs::File;
8use std::io;
9use std::io::{BufRead, BufReader, Lines, Read, Write};
10use std::path::Path;
11use std::str::FromStr;
12use time::OffsetDateTime;
13
14pub mod timing;
15
16mod span_path;
17pub use span_path::SpanPath;
18
19mod span_tree;
20pub use span_tree::{SpanTree, SpanTreeNode};
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct Span {
24    name: String,
25    // TODO: Replace with Map<String, Value>, since we always expect it to be an object?
26    fields: serde_json::Value,
27}
28
29impl Span {
30    pub fn from_name_and_fields(name: impl Into<String>, fields: serde_json::Value) -> Self {
31        let mut fields = fields;
32        let name = name.into();
33        // fields must contain "name" as a field. Unfortunately tracing puts the name
34        // of the span and its fields in the same object, which might lead to collisions...
35        // The only way to fix this would be to use our own JSON format, which we will probably
36        // do some time in the future.
37        fields
38            .as_object_mut()
39            .expect("fields must be a JSON object")
40            .insert("name".to_string(), serde_json::Value::String(name.clone()));
41        Self { name, fields }
42    }
43
44    fn try_from_json_value(value: serde_json::Value) -> eyre::Result<Self> {
45        let name = value
46            .as_object()
47            .and_then(|obj| obj.get("name").and_then(|val| val.as_str()))
48            .ok_or_else(|| eyre!("missing name in span"))?
49            .to_string();
50        Ok(Self { name, fields: value })
51    }
52
53    fn to_json_value(self) -> serde_json::Value {
54        let mut fields = self.fields;
55        *fields.get_mut("name").unwrap() = Value::String(self.name);
56        fields
57    }
58
59    pub fn name(&self) -> &str {
60        &self.name
61    }
62
63    pub fn fields(&self) -> &serde_json::Value {
64        &self.fields
65    }
66}
67
/// What a log record represents.
///
/// When parsing a log, a record whose message is `"enter"` is a span enter, `"exit"` is a
/// span exit, and anything else is an event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecordKind {
    /// A span is being entered.
    SpanEnter,
    /// A span is being exited.
    SpanExit,
    /// Any record that is neither a span enter nor a span exit.
    Event,
}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct Record {
77    target: String,
78    span: Option<Span>,
79    level: Level,
80    spans: Option<Vec<Span>>,
81    kind: RecordKind,
82    message: Option<String>,
83    timestamp: OffsetDateTime,
84    thread_id: String,
85    fields: serde_json::Value,
86}
87
88impl Record {
89    pub fn level(&self) -> Level {
90        self.level
91    }
92
93    pub fn span(&self) -> Option<&Span> {
94        self.span.as_ref()
95    }
96
97    pub fn spans(&self) -> Option<&[Span]> {
98        self.spans.as_ref().map(Vec::as_ref)
99    }
100
101    pub fn target(&self) -> &str {
102        &self.target
103    }
104
105    pub fn message(&self) -> Option<&str> {
106        self.message.as_ref().map(AsRef::as_ref)
107    }
108
109    pub fn kind(&self) -> RecordKind {
110        self.kind
111    }
112
113    pub fn timestamp(&self) -> &OffsetDateTime {
114        &self.timestamp
115    }
116
117    /// Create the span path associated with this record.
118    ///
119    /// For span enter/exit records, this is the span that is currently being entered/exited,
120    /// and for events it is the path to the span in which the event takes place.
121    pub fn create_span_path(&self) -> eyre::Result<SpanPath> {
122        let mut span_names: Vec<_> = self
123            .spans
124            .iter()
125            .flatten()
126            .map(|span| span.name.clone())
127            .collect();
128        match self.kind() {
129            RecordKind::SpanEnter | RecordKind::Event => {}
130            RecordKind::SpanExit => {
131                // The exit record does not include the span currently being exited
132                // in the list of entered spans.
133                let span_name = self
134                    .span()
135                    .map(|span| span.name())
136                    .ok_or_else(|| eyre!("No span in exit record"))?;
137                span_names.push(span_name.to_string());
138            }
139        }
140        Ok(SpanPath::new(span_names))
141    }
142
143    pub fn thread_id(&self) -> &str {
144        &self.thread_id
145    }
146
147    pub fn fields(&self) -> &serde_json::Value {
148        &self.fields
149    }
150}
151
152#[derive(Default, Debug, Clone)]
153pub struct RecordBuilder {
154    target: Option<String>,
155    span: Option<Span>,
156    level: Option<Level>,
157    spans: Option<Vec<Span>>,
158    kind: Option<RecordKind>,
159    message: Option<String>,
160    timestamp: Option<OffsetDateTime>,
161    thread_id: Option<String>,
162    fields: Option<serde_json::Value>,
163}
164
/// Error produced when a `RecordBuilder` cannot be turned into a valid [`Record`].
#[derive(Debug, Clone)]
pub struct RecordBuildError {
    message: String,
}

impl Display for RecordBuildError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "error building record: {}", &self.message)
    }
}

// Implementing `Error` lets callers box this error (`Box<dyn Error>`) or propagate it with `?`
// into error-reporting types such as `eyre::Report`.
impl std::error::Error for RecordBuildError {}

impl RecordBuildError {
    /// Error for a mandatory builder field that was never set.
    fn missing_field(field_name: &str) -> Self {
        Self {
            message: format!("missing field {field_name} in Record construction"),
        }
    }

    /// Error carrying a free-form description of what is wrong.
    fn message(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
        }
    }
}
187
188impl RecordBuilder {
189    pub fn new() -> Self {
190        Self::default()
191    }
192
193    pub fn from_record(record: Record) -> Self {
194        Self {
195            target: Some(record.target),
196            span: record.span,
197            level: Some(record.level),
198            spans: record.spans,
199            kind: Some(record.kind),
200            message: record.message,
201            timestamp: Some(record.timestamp),
202            thread_id: Some(record.thread_id),
203            fields: Some(record.fields),
204        }
205    }
206
207    pub fn event() -> Self {
208        Self {
209            kind: Some(RecordKind::Event),
210            ..Self::default()
211        }
212    }
213
214    pub fn span_enter() -> Self {
215        Self {
216            kind: Some(RecordKind::SpanEnter),
217            ..Self::default()
218        }
219    }
220
221    pub fn span_exit() -> Self {
222        Self {
223            kind: Some(RecordKind::SpanExit),
224            ..Self::default()
225        }
226    }
227
228    pub fn info(self) -> Self {
229        Self {
230            level: Some(Level::Info),
231            ..self
232        }
233    }
234
235    pub fn warn(self) -> Self {
236        Self {
237            level: Some(Level::Warn),
238            ..self
239        }
240    }
241
242    pub fn debug(self) -> Self {
243        Self {
244            level: Some(Level::Debug),
245            ..self
246        }
247    }
248
249    pub fn trace(self) -> Self {
250        Self {
251            level: Some(Level::Trace),
252            ..self
253        }
254    }
255
256    pub fn error(self) -> Self {
257        Self {
258            level: Some(Level::Error),
259            ..self
260        }
261    }
262
263    pub fn target(mut self, target: impl Into<String>) -> Self {
264        self.target.replace(target.into());
265        self
266    }
267
268    pub fn span(mut self, span: Span) -> Self {
269        self.span.replace(span);
270        self
271    }
272
273    pub fn level(mut self, level: Level) -> Self {
274        self.level.replace(level);
275        self
276    }
277
278    pub fn spans(mut self, spans: Vec<Span>) -> Self {
279        self.spans.replace(spans);
280        self
281    }
282
283    pub fn kind(mut self, kind: RecordKind) -> Self {
284        self.kind.replace(kind);
285        self
286    }
287
288    pub fn message(mut self, message: impl Into<String>) -> Self {
289        self.message.replace(message.into());
290        self
291    }
292
293    pub fn timestamp(mut self, timestamp: OffsetDateTime) -> Self {
294        self.timestamp.replace(timestamp);
295        self
296    }
297
298    pub fn thread_id(mut self, thread_id: impl Into<String>) -> Self {
299        self.thread_id.replace(thread_id.into());
300        self
301    }
302
303    pub fn fields(mut self, fields: serde_json::Value) -> Self {
304        self.fields.replace(fields);
305        self
306    }
307
308    pub fn try_build(self) -> Result<Record, RecordBuildError> {
309        let kind = self
310            .kind
311            .ok_or_else(|| RecordBuildError::missing_field("kind"))?;
312
313        let message = match kind {
314            RecordKind::SpanEnter => {
315                let msg_valid = self.message.map(|msg| msg == "enter").unwrap_or(true);
316                if !msg_valid {
317                    return Err(RecordBuildError::message(
318                        "span enter records cannot have \
319                             message other than \"enter\""
320                            .to_string(),
321                    ));
322                }
323                Some("enter".to_string())
324            }
325            RecordKind::SpanExit => {
326                let msg_valid = self.message.map(|msg| msg == "exit").unwrap_or(true);
327                if !msg_valid {
328                    return Err(RecordBuildError::message(
329                        "span exit records cannot have \
330                             message other than \"exit\""
331                            .to_string(),
332                    ));
333                }
334                Some("exit".to_string())
335            }
336            RecordKind::Event => self.message,
337        };
338
339        Ok(Record {
340            target: self
341                .target
342                .ok_or_else(|| RecordBuildError::missing_field("target"))?,
343            span: self.span,
344            level: self
345                .level
346                .ok_or_else(|| RecordBuildError::missing_field("level"))?,
347            spans: self.spans,
348            kind,
349            timestamp: self
350                .timestamp
351                .ok_or_else(|| RecordBuildError::missing_field("timestamp"))?,
352            thread_id: self
353                .thread_id
354                .ok_or_else(|| RecordBuildError::missing_field("thread_id"))?,
355            fields: {
356                let mut fields = self
357                    .fields
358                    .unwrap_or_else(|| serde_json::Value::Object(Map::default()));
359                if let Some(message) = &message {
360                    fields
361                        .as_object_mut()
362                        .expect("Fields must be a JSON object")
363                        .insert("message".to_string(), serde_json::Value::String(message.clone()));
364                }
365                fields
366            },
367            message,
368        })
369    }
370
371    pub fn build(self) -> Record {
372        self.try_build().unwrap()
373    }
374}
375
/// Iterator over the records of a JSON log, which holds one JSON object per line.
///
/// Wraps the line iterator of a buffered, type-erased reader.
pub struct RecordIter<'a> {
    lines_iter: io::Lines<io::BufReader<Box<dyn io::Read + 'a>>>,
}
379
380pub fn iterate_records(json_log_file_path: impl AsRef<Path>) -> eyre::Result<RecordIter<'static>> {
381    iterate_records_(json_log_file_path.as_ref())
382}
383
384fn iterate_records_(json_log_file_path: &Path) -> eyre::Result<RecordIter<'static>> {
385    let file = File::open(json_log_file_path)?;
386    let file_name = json_log_file_path
387        .file_name()
388        .and_then(OsStr::to_str)
389        .ok_or_else(|| eyre!("non-utf filename, cannot proceed"))?;
390    if file_name.ends_with(".jsonlog") {
391        Ok(iterate_records_from_reader(file))
392    } else if file_name.ends_with(".jsonlog.gz") {
393        Ok(iterate_records_from_reader(GzDecoder::new(file)))
394    } else {
395        Err(eyre!("unexpected extension. Expected .jsonlog or .jsonlog.gz"))
396    }
397}
398
399pub fn iterate_records_from_reader<'a, R: Read + 'a>(reader: R) -> RecordIter<'a> {
400    iterate_records_from_reader_(BufReader::new(Box::new(reader)))
401}
402
403fn iterate_records_from_reader_<'a>(reader: BufReader<Box<dyn Read + 'a>>) -> RecordIter<'a> {
404    RecordIter {
405        lines_iter: reader.lines(),
406    }
407}
408
409pub fn write_records(mut writer: impl Write, records: impl Iterator<Item = Record>) -> io::Result<()> {
410    for record in records {
411        let raw_record = RawRecord::from_record(record);
412        serde_json::to_writer(&mut writer, &raw_record)?;
413        writer.write_all(b"\n")?;
414    }
415    Ok(())
416}
417
418impl<'a> Iterator for RecordIter<'a> {
419    // TODO: Use a proper error type here
420    type Item = eyre::Result<Record>;
421
422    fn next(&mut self) -> Option<Self::Item> {
423        while let Some(line_result) = self.lines_iter.next() {
424            match line_result {
425                Ok(line) if line.trim().is_empty() => {}
426                Ok(line) => {
427                    return Some(
428                        serde_json::from_str(&line)
429                            .map_err(|err| ErrReport::from(err))
430                            .and_then(|raw_record: RawRecord| raw_record.try_to_record()),
431                    )
432                }
433                Err(err) => {
434                    return Some(Err(err.into()));
435                }
436            }
437        }
438
439        None
440    }
441}
442
443#[derive(Debug, Deserialize, Serialize)]
444struct RawRecord {
445    // TODO: Consider replacing time with Chrono. From my understanding, only Chrono
446    // properly and soundly works with local time on Linux
447    #[serde(with = "time::serde::rfc3339")]
448    timestamp: OffsetDateTime,
449    level: String,
450    fields: serde_json::Value,
451    target: String,
452    #[serde(skip_serializing_if = "Option::is_none")]
453    span: Option<serde_json::Value>,
454    #[serde(skip_serializing_if = "Option::is_none")]
455    spans: Option<Vec<serde_json::Value>>,
456    #[serde(rename = "threadId")]
457    thread_id: String,
458}
459
460impl RawRecord {
461    fn try_to_record(self) -> eyre::Result<Record> {
462        let message = self.fields.pointer("/message").and_then(|val| val.as_str());
463
464        Ok(Record {
465            target: self.target,
466            span: self
467                .span
468                .map(|json_val| Span::try_from_json_value(json_val))
469                .transpose()?,
470            level: Level::from_str(&self.level)?,
471            spans: self
472                .spans
473                .map(|json_vals| {
474                    json_vals
475                        .into_iter()
476                        .map(Span::try_from_json_value)
477                        .collect::<eyre::Result<_>>()
478                })
479                .transpose()?,
480            kind: match message {
481                Some(string) if string == "enter" => RecordKind::SpanEnter,
482                Some(string) if string == "exit" => RecordKind::SpanExit,
483                _ => RecordKind::Event,
484            },
485            message: message.map(str::to_string),
486            timestamp: self.timestamp,
487            thread_id: self.thread_id,
488            fields: self.fields,
489        })
490    }
491
492    fn from_record(record: Record) -> Self {
493        let mut fields = record.fields;
494
495        let mut message = record.message;
496        match record.kind {
497            RecordKind::SpanEnter => {
498                message.replace("enter".to_string());
499            }
500            RecordKind::SpanExit => {
501                message.replace("exit".to_string());
502            }
503            RecordKind::Event => {}
504        }
505
506        if let Some(message) = message {
507            fields
508                .as_object_mut()
509                .expect("Fields must always have object type")
510                .insert("message".to_string(), Value::String(message));
511        }
512
513        Self {
514            timestamp: record.timestamp,
515            level: record.level.to_string(),
516            fields,
517            target: record.target,
518            span: record.span.map(|span| span.to_json_value()),
519            spans: record
520                .spans
521                .map(|spans| spans.into_iter().map(|span| span.to_json_value()).collect()),
522            thread_id: record.thread_id,
523        }
524    }
525}
526
/// Severity level of a log record.
///
/// We reproduce a Level enum here so that we don't have to depend on tracing only for that one
/// type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

impl Level {
    /// All levels, in declaration order.
    const ALL: [Level; 5] = [
        Level::Error,
        Level::Warn,
        Level::Info,
        Level::Debug,
        Level::Trace,
    ];

    /// The upper-case name that is written to JSON logs (e.g. `"INFO"`).
    fn as_str(self) -> &'static str {
        match self {
            Level::Error => "ERROR",
            Level::Warn => "WARN",
            Level::Info => "INFO",
            Level::Debug => "DEBUG",
            Level::Trace => "TRACE",
        }
    }
}

/// Error returned when a string does not name any [`Level`].
#[derive(Debug, Clone)]
pub struct InvalidLevelString;

impl std::error::Error for InvalidLevelString {}

impl Display for InvalidLevelString {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "invalid level")
    }
}

impl FromStr for Level {
    type Err = InvalidLevelString;

    /// Parses a level name, ignoring surrounding whitespace and ASCII case
    /// (e.g. `" info "` parses as [`Level::Info`]).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let trimmed = s.trim();
        Level::ALL
            .iter()
            .copied()
            .find(|level| trimmed.eq_ignore_ascii_case(level.as_str()))
            .ok_or(InvalidLevelString)
    }
}

// `Display` (rather than a hand-written `ToString`) keeps `to_string()` available through the
// standard blanket impl, and also enables `{}` formatting with width/alignment via `pad`.
impl Display for Level {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
582
583// #[macro_export]
584// macro_rules! record {
585//     ($)
586//     // ($($field:ident = $val:expr),*) => {
587//     //     {
588//     //         $crate::RecordBuilder::new()
589//     //             $(.with_$field($val))*
590//     //             .build()
591//     //     }
592//     // }
593// }