1use eyre::{eyre, ErrReport};
2use flate2::read::GzDecoder;
3use serde::{Deserialize, Serialize};
4use serde_json::{Map, Value};
5use std::ffi::OsStr;
6use std::fmt::{Display, Formatter};
7use std::fs::File;
8use std::io;
9use std::io::{BufRead, BufReader, Lines, Read, Write};
10use std::path::Path;
11use std::str::FromStr;
12use time::OffsetDateTime;
13
14pub mod timing;
15
16mod span_path;
17pub use span_path::SpanPath;
18
19mod span_tree;
20pub use span_tree::{SpanTree, SpanTreeNode};
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct Span {
24 name: String,
25 fields: serde_json::Value,
27}
28
29impl Span {
30 pub fn from_name_and_fields(name: impl Into<String>, fields: serde_json::Value) -> Self {
31 let mut fields = fields;
32 let name = name.into();
33 fields
38 .as_object_mut()
39 .expect("fields must be a JSON object")
40 .insert("name".to_string(), serde_json::Value::String(name.clone()));
41 Self { name, fields }
42 }
43
44 fn try_from_json_value(value: serde_json::Value) -> eyre::Result<Self> {
45 let name = value
46 .as_object()
47 .and_then(|obj| obj.get("name").and_then(|val| val.as_str()))
48 .ok_or_else(|| eyre!("missing name in span"))?
49 .to_string();
50 Ok(Self { name, fields: value })
51 }
52
53 fn to_json_value(self) -> serde_json::Value {
54 let mut fields = self.fields;
55 *fields.get_mut("name").unwrap() = Value::String(self.name);
56 fields
57 }
58
59 pub fn name(&self) -> &str {
60 &self.name
61 }
62
63 pub fn fields(&self) -> &serde_json::Value {
64 &self.fields
65 }
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub enum RecordKind {
70 SpanEnter,
71 SpanExit,
72 Event,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct Record {
77 target: String,
78 span: Option<Span>,
79 level: Level,
80 spans: Option<Vec<Span>>,
81 kind: RecordKind,
82 message: Option<String>,
83 timestamp: OffsetDateTime,
84 thread_id: String,
85 fields: serde_json::Value,
86}
87
88impl Record {
89 pub fn level(&self) -> Level {
90 self.level
91 }
92
93 pub fn span(&self) -> Option<&Span> {
94 self.span.as_ref()
95 }
96
97 pub fn spans(&self) -> Option<&[Span]> {
98 self.spans.as_ref().map(Vec::as_ref)
99 }
100
101 pub fn target(&self) -> &str {
102 &self.target
103 }
104
105 pub fn message(&self) -> Option<&str> {
106 self.message.as_ref().map(AsRef::as_ref)
107 }
108
109 pub fn kind(&self) -> RecordKind {
110 self.kind
111 }
112
113 pub fn timestamp(&self) -> &OffsetDateTime {
114 &self.timestamp
115 }
116
117 pub fn create_span_path(&self) -> eyre::Result<SpanPath> {
122 let mut span_names: Vec<_> = self
123 .spans
124 .iter()
125 .flatten()
126 .map(|span| span.name.clone())
127 .collect();
128 match self.kind() {
129 RecordKind::SpanEnter | RecordKind::Event => {}
130 RecordKind::SpanExit => {
131 let span_name = self
134 .span()
135 .map(|span| span.name())
136 .ok_or_else(|| eyre!("No span in exit record"))?;
137 span_names.push(span_name.to_string());
138 }
139 }
140 Ok(SpanPath::new(span_names))
141 }
142
143 pub fn thread_id(&self) -> &str {
144 &self.thread_id
145 }
146
147 pub fn fields(&self) -> &serde_json::Value {
148 &self.fields
149 }
150}
151
152#[derive(Default, Debug, Clone)]
153pub struct RecordBuilder {
154 target: Option<String>,
155 span: Option<Span>,
156 level: Option<Level>,
157 spans: Option<Vec<Span>>,
158 kind: Option<RecordKind>,
159 message: Option<String>,
160 timestamp: Option<OffsetDateTime>,
161 thread_id: Option<String>,
162 fields: Option<serde_json::Value>,
163}
164
165#[derive(Debug, Clone)]
166pub struct RecordBuildError {
167 message: String,
168}
169
170impl Display for RecordBuildError {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 write!(f, "error building record: {}", &self.message)
173 }
174}
175
176impl RecordBuildError {
177 fn missing_field(field_name: &str) -> Self {
178 Self {
179 message: format!("missing field {field_name} in Record construction"),
180 }
181 }
182
183 fn message(message: String) -> Self {
184 Self { message }
185 }
186}
187
188impl RecordBuilder {
189 pub fn new() -> Self {
190 Self::default()
191 }
192
193 pub fn from_record(record: Record) -> Self {
194 Self {
195 target: Some(record.target),
196 span: record.span,
197 level: Some(record.level),
198 spans: record.spans,
199 kind: Some(record.kind),
200 message: record.message,
201 timestamp: Some(record.timestamp),
202 thread_id: Some(record.thread_id),
203 fields: Some(record.fields),
204 }
205 }
206
207 pub fn event() -> Self {
208 Self {
209 kind: Some(RecordKind::Event),
210 ..Self::default()
211 }
212 }
213
214 pub fn span_enter() -> Self {
215 Self {
216 kind: Some(RecordKind::SpanEnter),
217 ..Self::default()
218 }
219 }
220
221 pub fn span_exit() -> Self {
222 Self {
223 kind: Some(RecordKind::SpanExit),
224 ..Self::default()
225 }
226 }
227
228 pub fn info(self) -> Self {
229 Self {
230 level: Some(Level::Info),
231 ..self
232 }
233 }
234
235 pub fn warn(self) -> Self {
236 Self {
237 level: Some(Level::Warn),
238 ..self
239 }
240 }
241
242 pub fn debug(self) -> Self {
243 Self {
244 level: Some(Level::Debug),
245 ..self
246 }
247 }
248
249 pub fn trace(self) -> Self {
250 Self {
251 level: Some(Level::Trace),
252 ..self
253 }
254 }
255
256 pub fn error(self) -> Self {
257 Self {
258 level: Some(Level::Error),
259 ..self
260 }
261 }
262
263 pub fn target(mut self, target: impl Into<String>) -> Self {
264 self.target.replace(target.into());
265 self
266 }
267
268 pub fn span(mut self, span: Span) -> Self {
269 self.span.replace(span);
270 self
271 }
272
273 pub fn level(mut self, level: Level) -> Self {
274 self.level.replace(level);
275 self
276 }
277
278 pub fn spans(mut self, spans: Vec<Span>) -> Self {
279 self.spans.replace(spans);
280 self
281 }
282
283 pub fn kind(mut self, kind: RecordKind) -> Self {
284 self.kind.replace(kind);
285 self
286 }
287
288 pub fn message(mut self, message: impl Into<String>) -> Self {
289 self.message.replace(message.into());
290 self
291 }
292
293 pub fn timestamp(mut self, timestamp: OffsetDateTime) -> Self {
294 self.timestamp.replace(timestamp);
295 self
296 }
297
298 pub fn thread_id(mut self, thread_id: impl Into<String>) -> Self {
299 self.thread_id.replace(thread_id.into());
300 self
301 }
302
303 pub fn fields(mut self, fields: serde_json::Value) -> Self {
304 self.fields.replace(fields);
305 self
306 }
307
308 pub fn try_build(self) -> Result<Record, RecordBuildError> {
309 let kind = self
310 .kind
311 .ok_or_else(|| RecordBuildError::missing_field("kind"))?;
312
313 let message = match kind {
314 RecordKind::SpanEnter => {
315 let msg_valid = self.message.map(|msg| msg == "enter").unwrap_or(true);
316 if !msg_valid {
317 return Err(RecordBuildError::message(
318 "span enter records cannot have \
319 message other than \"enter\""
320 .to_string(),
321 ));
322 }
323 Some("enter".to_string())
324 }
325 RecordKind::SpanExit => {
326 let msg_valid = self.message.map(|msg| msg == "exit").unwrap_or(true);
327 if !msg_valid {
328 return Err(RecordBuildError::message(
329 "span exit records cannot have \
330 message other than \"exit\""
331 .to_string(),
332 ));
333 }
334 Some("exit".to_string())
335 }
336 RecordKind::Event => self.message,
337 };
338
339 Ok(Record {
340 target: self
341 .target
342 .ok_or_else(|| RecordBuildError::missing_field("target"))?,
343 span: self.span,
344 level: self
345 .level
346 .ok_or_else(|| RecordBuildError::missing_field("level"))?,
347 spans: self.spans,
348 kind,
349 timestamp: self
350 .timestamp
351 .ok_or_else(|| RecordBuildError::missing_field("timestamp"))?,
352 thread_id: self
353 .thread_id
354 .ok_or_else(|| RecordBuildError::missing_field("thread_id"))?,
355 fields: {
356 let mut fields = self
357 .fields
358 .unwrap_or_else(|| serde_json::Value::Object(Map::default()));
359 if let Some(message) = &message {
360 fields
361 .as_object_mut()
362 .expect("Fields must be a JSON object")
363 .insert("message".to_string(), serde_json::Value::String(message.clone()));
364 }
365 fields
366 },
367 message,
368 })
369 }
370
371 pub fn build(self) -> Record {
372 self.try_build().unwrap()
373 }
374}
375
376pub struct RecordIter<'a> {
377 lines_iter: Lines<BufReader<Box<dyn Read + 'a>>>,
378}
379
380pub fn iterate_records(json_log_file_path: impl AsRef<Path>) -> eyre::Result<RecordIter<'static>> {
381 iterate_records_(json_log_file_path.as_ref())
382}
383
384fn iterate_records_(json_log_file_path: &Path) -> eyre::Result<RecordIter<'static>> {
385 let file = File::open(json_log_file_path)?;
386 let file_name = json_log_file_path
387 .file_name()
388 .and_then(OsStr::to_str)
389 .ok_or_else(|| eyre!("non-utf filename, cannot proceed"))?;
390 if file_name.ends_with(".jsonlog") {
391 Ok(iterate_records_from_reader(file))
392 } else if file_name.ends_with(".jsonlog.gz") {
393 Ok(iterate_records_from_reader(GzDecoder::new(file)))
394 } else {
395 Err(eyre!("unexpected extension. Expected .jsonlog or .jsonlog.gz"))
396 }
397}
398
399pub fn iterate_records_from_reader<'a, R: Read + 'a>(reader: R) -> RecordIter<'a> {
400 iterate_records_from_reader_(BufReader::new(Box::new(reader)))
401}
402
403fn iterate_records_from_reader_<'a>(reader: BufReader<Box<dyn Read + 'a>>) -> RecordIter<'a> {
404 RecordIter {
405 lines_iter: reader.lines(),
406 }
407}
408
409pub fn write_records(mut writer: impl Write, records: impl Iterator<Item = Record>) -> io::Result<()> {
410 for record in records {
411 let raw_record = RawRecord::from_record(record);
412 serde_json::to_writer(&mut writer, &raw_record)?;
413 writer.write_all(b"\n")?;
414 }
415 Ok(())
416}
417
418impl<'a> Iterator for RecordIter<'a> {
419 type Item = eyre::Result<Record>;
421
422 fn next(&mut self) -> Option<Self::Item> {
423 while let Some(line_result) = self.lines_iter.next() {
424 match line_result {
425 Ok(line) if line.trim().is_empty() => {}
426 Ok(line) => {
427 return Some(
428 serde_json::from_str(&line)
429 .map_err(|err| ErrReport::from(err))
430 .and_then(|raw_record: RawRecord| raw_record.try_to_record()),
431 )
432 }
433 Err(err) => {
434 return Some(Err(err.into()));
435 }
436 }
437 }
438
439 None
440 }
441}
442
443#[derive(Debug, Deserialize, Serialize)]
444struct RawRecord {
445 #[serde(with = "time::serde::rfc3339")]
448 timestamp: OffsetDateTime,
449 level: String,
450 fields: serde_json::Value,
451 target: String,
452 #[serde(skip_serializing_if = "Option::is_none")]
453 span: Option<serde_json::Value>,
454 #[serde(skip_serializing_if = "Option::is_none")]
455 spans: Option<Vec<serde_json::Value>>,
456 #[serde(rename = "threadId")]
457 thread_id: String,
458}
459
460impl RawRecord {
461 fn try_to_record(self) -> eyre::Result<Record> {
462 let message = self.fields.pointer("/message").and_then(|val| val.as_str());
463
464 Ok(Record {
465 target: self.target,
466 span: self
467 .span
468 .map(|json_val| Span::try_from_json_value(json_val))
469 .transpose()?,
470 level: Level::from_str(&self.level)?,
471 spans: self
472 .spans
473 .map(|json_vals| {
474 json_vals
475 .into_iter()
476 .map(Span::try_from_json_value)
477 .collect::<eyre::Result<_>>()
478 })
479 .transpose()?,
480 kind: match message {
481 Some(string) if string == "enter" => RecordKind::SpanEnter,
482 Some(string) if string == "exit" => RecordKind::SpanExit,
483 _ => RecordKind::Event,
484 },
485 message: message.map(str::to_string),
486 timestamp: self.timestamp,
487 thread_id: self.thread_id,
488 fields: self.fields,
489 })
490 }
491
492 fn from_record(record: Record) -> Self {
493 let mut fields = record.fields;
494
495 let mut message = record.message;
496 match record.kind {
497 RecordKind::SpanEnter => {
498 message.replace("enter".to_string());
499 }
500 RecordKind::SpanExit => {
501 message.replace("exit".to_string());
502 }
503 RecordKind::Event => {}
504 }
505
506 if let Some(message) = message {
507 fields
508 .as_object_mut()
509 .expect("Fields must always have object type")
510 .insert("message".to_string(), Value::String(message));
511 }
512
513 Self {
514 timestamp: record.timestamp,
515 level: record.level.to_string(),
516 fields,
517 target: record.target,
518 span: record.span.map(|span| span.to_json_value()),
519 spans: record
520 .spans
521 .map(|spans| spans.into_iter().map(|span| span.to_json_value()).collect()),
522 thread_id: record.thread_id,
523 }
524 }
525}
526
527#[derive(Debug, Clone, Copy, PartialEq, Eq)]
530pub enum Level {
531 Error,
532 Warn,
533 Info,
534 Debug,
535 Trace,
536}
537
538#[derive(Debug, Clone)]
539pub struct InvalidLevelString;
540
541impl std::error::Error for InvalidLevelString {}
542
543impl Display for InvalidLevelString {
544 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
545 write!(f, "invalid level")
546 }
547}
548
549impl FromStr for Level {
550 type Err = InvalidLevelString;
551
552 fn from_str(s: &str) -> Result<Self, Self::Err> {
553 let trimmed = s.trim();
554 if trimmed.eq_ignore_ascii_case("ERROR") {
555 Ok(Self::Error)
556 } else if trimmed.eq_ignore_ascii_case("WARN") {
557 Ok(Self::Warn)
558 } else if trimmed.eq_ignore_ascii_case("INFO") {
559 Ok(Self::Info)
560 } else if trimmed.eq_ignore_ascii_case("DEBUG") {
561 Ok(Self::Debug)
562 } else if trimmed.eq_ignore_ascii_case("TRACE") {
563 Ok(Self::Trace)
564 } else {
565 Err(InvalidLevelString)
566 }
567 }
568}
569
570impl ToString for Level {
571 fn to_string(&self) -> String {
572 match self {
573 Level::Error => "ERROR",
574 Level::Warn => "WARN",
575 Level::Info => "INFO",
576 Level::Debug => "DEBUG",
577 Level::Trace => "TRACE",
578 }
579 .to_string()
580 }
581}
582
583