jaslog/
line_formats.rs

1use crate::format::colored_with_level;
2use colored::*;
3use serde::{Deserialize, Serialize};
4use serde_json::{Map, Value};
5extern crate chrono;
6use chrono::prelude::*;
7
8pub trait FormatLogLine {
9  fn format(&self) -> ColoredString;
10}
11
12pub trait ToColoredString {
13  fn to_colored_string(entry: &Value) -> Option<ColoredString>;
14}
15
16fn format_mdc(mdc: &Map<String, Value>) -> String {
17  if !mdc.is_empty() {
18    let res = mdc
19      .clone()
20      .into_iter()
21      .map(|(key, value)| {
22        let shown_value = match value {
23          Value::String(val) => val,
24          other => format!("{other:?}"),
25        };
26        format!("{key}={}", shown_value.trim())
27      })
28      .collect::<Vec<String>>();
29    format!("[{}]", res.join(","))
30  } else {
31    "".to_string()
32  }
33}
34
35//////////////////////////////////
36/// ElixirLogLine
37//////////////////////////////////
38
39#[derive(Serialize, Deserialize)]
40pub struct ElixirLogLine {
41  app: String,
42  level: String,
43  message: String,
44  module: String,
45  pid: String,
46  timestamp: String,
47}
48
49impl FormatLogLine for ElixirLogLine {
50  fn format(&self) -> ColoredString {
51    colored_with_level(
52      &self.level,
53      &format!("{} {}", &self.format_meta().dimmed(), &self.message),
54    )
55  }
56}
57
58impl ElixirLogLine {
59  fn format_meta(&self) -> String {
60    format!(
61      "[{}] [{}] [{}] [{}] [{}]",
62      self.timestamp, self.level, self.app, self.module, self.pid
63    )
64  }
65}
66
67impl ToColoredString for ElixirLogLine {
68  fn to_colored_string(entry: &Value) -> Option<ColoredString> {
69    match ElixirLogLine::deserialize(entry) {
70      Err(_) => None,
71      Ok(line) => Some(line.format()),
72    }
73  }
74}
75
76//////////////////////////////////
77/// ElixirExtendedLogLine
78//////////////////////////////////
79
80#[derive(Serialize, Deserialize)]
81pub struct ElixirExtendedLogLine {
82  application: String,
83  level: String,
84  message: String,
85  module: String,
86  pid: String,
87  timestamp: String,
88}
89
90impl FormatLogLine for ElixirExtendedLogLine {
91  fn format(&self) -> ColoredString {
92    colored_with_level(
93      &self.level,
94      &format!("{} {}", &self.format_meta().dimmed(), &self.message),
95    )
96  }
97}
98
99impl ElixirExtendedLogLine {
100  fn format_meta(&self) -> String {
101    format!(
102      "[{}] [{}] [{}] [{}] [{}]",
103      self.timestamp, self.level, self.application, self.module, self.pid
104    )
105  }
106}
107
108impl ToColoredString for ElixirExtendedLogLine {
109  fn to_colored_string(entry: &Value) -> Option<ColoredString> {
110    match ElixirExtendedLogLine::deserialize(entry) {
111      Err(_) => None,
112      Ok(line) => Some(line.format()),
113    }
114  }
115}
116
117//////////////////////////////////
118/// LogstashJavaLogLine
119//////////////////////////////////
120
121#[derive(Serialize, Deserialize)]
122pub struct LogstashJavaLogLine {
123  level: String,
124  message: String,
125  logger_name: String,
126  thread_name: String,
127  #[serde(alias = "@timestamp")]
128  timestamp: String,
129  // --- unused
130  // level_value: i16,
131  // @version: 1,
132  // tags: list of tags (optional)
133  #[serde(default)]
134  mdc: Map<String, Value>,
135
136  #[serde(default)]
137  stack_trace: String,
138  #[serde(default)]
139  exception: LogstashLogLineException,
140}
141
142#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
143struct LogstashLogLineException {
144  #[serde(default, alias = "exception_message")]
145  message: String,
146  #[serde(default, alias = "exception_class")]
147  class: String,
148  #[serde(default)]
149  stacktrace: String,
150}
151
152impl FormatLogLine for LogstashJavaLogLine {
153  fn format(&self) -> ColoredString {
154    colored_with_level(
155      &self.level,
156      &format!(
157        "{} {}{}",
158        &self.format_meta().dimmed(),
159        &self.message,
160        &self.format_stacktrace()
161      ),
162    )
163  }
164}
165
166impl ToColoredString for LogstashJavaLogLine {
167  fn to_colored_string(entry: &Value) -> Option<ColoredString> {
168    match LogstashJavaLogLine::deserialize(entry) {
169      Err(_) => None,
170      Ok(line) => Some(line.format()),
171    }
172  }
173}
174
175impl LogstashJavaLogLine {
176  fn format_meta(&self) -> String {
177    format!(
178      "[{}] [{}] [{}] [{}]{}",
179      self.timestamp,
180      self.level,
181      self.logger_name,
182      self.thread_name,
183      self.format_mdc()
184    )
185  }
186  fn format_stacktrace(&self) -> ColoredString {
187    if !self.exception.message.is_empty() {
188      format!(
189        "\n\t{} ({})\n\t{}",
190        self.exception.message,
191        self.exception.class,
192        self.exception.stacktrace.replace('\n', "\n\t")
193      )
194      .red()
195    } else if !self.stack_trace.is_empty() {
196      format!("\n\t{}", self.stack_trace.replace('\n', "\n\t")).red()
197    } else {
198      "".normal()
199    }
200  }
201
202  fn format_mdc(&self) -> String {
203    format_mdc(&self.mdc)
204  }
205}
206
207//////////////////////////////////
208/// Log4J's default JSONLayout
209//////////////////////////////////
210
211#[derive(Serialize, Deserialize, PartialEq, Debug)]
212pub struct Log4JJsonLayoutLogLine {
213  #[serde(alias = "thread")]
214  thread_name: String,
215  level: String,
216  #[serde(alias = "loggerName")]
217  logger_name: String,
218  #[serde(alias = "endOfBatch")]
219  end_of_batch: bool,
220  #[serde(alias = "loggerFqcn")]
221  logger_fqcn: String,
222  message: String,
223
224  instant: Log4JJsonLayoutLogLineInstant,
225  #[serde(alias = "threadId")]
226  thread_id: i32,
227  #[serde(alias = "threadPriority")]
228  thread_priority: i32,
229  #[serde(default)]
230  thrown: Log4JJsonLayoutLogLineThrown,
231  #[serde(default)]
232  mdc: Map<String, Value>,
233}
234
235#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
236struct Log4JJsonLayoutLogLineThrown {
237  #[serde(alias = "commonElementCount", default)]
238  common_element_count: i32,
239  #[serde(alias = "localizedMessage", default)]
240  localized_message: String,
241  #[serde(default)]
242  message: String,
243  #[serde(default)]
244  name: String,
245}
246
247#[derive(Serialize, Deserialize, PartialEq, Debug)]
248struct Log4JJsonLayoutLogLineInstant {
249  #[serde(alias = "epochSecond")]
250  epoch_second: i64,
251  #[serde(alias = "nanoOfSecond")]
252  nano_of_second: u32,
253}
254
255impl FormatLogLine for Log4JJsonLayoutLogLine {
256  fn format(&self) -> ColoredString {
257    colored_with_level(
258      &self.level,
259      &format!(
260        "{} {}{}",
261        &self.format_meta().dimmed(),
262        &self.message,
263        &self.format_stacktrace()
264      ),
265    )
266  }
267}
268
269impl ToColoredString for Log4JJsonLayoutLogLine {
270  fn to_colored_string(entry: &Value) -> Option<ColoredString> {
271    match Log4JJsonLayoutLogLine::deserialize(entry) {
272      Err(_) => None,
273      Ok(line) => Some(line.format()),
274    }
275  }
276}
277
278impl Log4JJsonLayoutLogLine {
279  fn format_stacktrace(&self) -> ColoredString {
280    if !self.thrown.message.is_empty() && !self.thrown.name.is_empty() {
281      format!(
282        "\n\t{}\n\t{}",
283        self.thrown.name,
284        self.thrown.message.replace('\n', "\n\t")
285      )
286      .red()
287    } else if !self.thrown.message.is_empty() && self.thrown.name.is_empty() {
288      format!("\n\t{}", self.thrown.message.replace('\n', "\n\t")).red()
289    } else if !self.thrown.name.is_empty() {
290      format!(" ({})", self.thrown.name).red()
291    } else {
292      "".normal()
293    }
294  }
295
296  fn format_meta(&self) -> String {
297    format!(
298      "[{}] [{}] [{}] [{}]{}",
299      self.format_date(),
300      self.level,
301      self.logger_name,
302      self.thread_name,
303      self.format_mdc()
304    )
305  }
306
307  fn format_mdc(&self) -> String {
308    format_mdc(&self.mdc)
309  }
310
311  fn format_date(&self) -> String {
312    DateTime::from_timestamp(self.instant.epoch_second, self.instant.nano_of_second)
313      .map(|datetime: DateTime<Utc>| datetime.format("%+").to_string())
314      .unwrap_or(self.instant.epoch_second.to_string())
315  }
316}
317
318// {
319//   "epochSecond": 1622724607,
320//   "nanoOfSecond": 420000000
321// }
322
323// {"epochSecond":1675671221,"nanoOfSecond":782873000}
324
#[cfg(test)]
mod tests {

  use super::*;
  // Imported explicitly: `use super::*` only brings in `Map` and `Value`, so
  // without this the module relies on a crate-root `#[macro_use]` for `json!`.
  use serde_json::json;

  /// Builds an otherwise-empty `Log4JJsonLayoutLogLine` around the given
  /// instant and returns the result of `format_date`.
  fn with_epoch_and_nano_format_date(epoch_second: i64, nano_of_second: u32) -> String {
    Log4JJsonLayoutLogLine {
      thread_name: String::new(),
      level: String::new(),
      logger_name: String::new(),
      end_of_batch: true,
      logger_fqcn: String::new(),
      message: String::new(),
      instant: Log4JJsonLayoutLogLineInstant {
        epoch_second,
        nano_of_second,
      },
      thread_id: 0,
      thread_priority: 0,
      thrown: Log4JJsonLayoutLogLineThrown::default(),
      mdc: Map::new(),
    }
    .format_date()
  }

  /// Checks the UTC rendering of several epoch/nanosecond pairs, including
  /// millisecond and microsecond fractional precision.
  #[test]
  fn test_epoch_formatting_works() {
    assert_eq!(
      with_epoch_and_nano_format_date(1622724607, 420000000),
      "2021-06-03T12:50:07.420+00:00"
    );
    assert_eq!(
      with_epoch_and_nano_format_date(1675671221, 782873000),
      "2023-02-06T08:13:41.782873+00:00"
    );
    assert_eq!(
      with_epoch_and_nano_format_date(1675671481, 452180000),
      "2023-02-06T08:18:01.452180+00:00"
    );
  }

  /// Deserializes a camel-case JSONLayout record with a partial `thrown`
  /// object and no `mdc`, and checks that aliases and defaults are applied.
  #[test]
  fn test_parse_log4j_line() {
    let value = json!({
      "instant": {
        "epochSecond": 1675671481,
        "nanoOfSecond": 452180000
      },
      "thread": "Source Data Fetcher for Source: MySourceName -> *anonymous_datastream_source$4*[18] (2/2)#2798",
      "level": "INFO",
      "loggerName": "org.apache.kafka.clients.FetchSessionHandler",
      "message": "[Consumer clientId=name_72e14600-16b6-4c27-aff0-fae92ae52650-1, groupId=name_72e14600-16b6-4c27-aff0-fae92ae52650] Error sending fetch request (sessionId=1995808239, epoch=INITIAL) to node 0:",
      "thrown": {
        "commonElementCount": 0,
        "name": "org.apache.kafka.common.errors.DisconnectException"
      },
      "endOfBatch": false,
      "loggerFqcn": "org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger",
      "threadId": 664,
      "threadPriority": 5
    });

    let actual = Log4JJsonLayoutLogLine::deserialize(value).expect("Failed to unwrap test json");
    let expected = Log4JJsonLayoutLogLine {
      thread_name: "Source Data Fetcher for Source: MySourceName -> *anonymous_datastream_source$4*[18] (2/2)#2798".to_string(),
      level: "INFO".to_string(),
      logger_name: "org.apache.kafka.clients.FetchSessionHandler".to_string(),
      end_of_batch: false,
      logger_fqcn: "org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger".to_string(),
      message: "[Consumer clientId=name_72e14600-16b6-4c27-aff0-fae92ae52650-1, groupId=name_72e14600-16b6-4c27-aff0-fae92ae52650] Error sending fetch request (sessionId=1995808239, epoch=INITIAL) to node 0:".to_string(),
      instant: Log4JJsonLayoutLogLineInstant {
        epoch_second: 1675671481,
        nano_of_second: 452180000,
      },
      thread_id: 664,
      thread_priority: 5,
      thrown: Log4JJsonLayoutLogLineThrown {
        common_element_count: 0,
        localized_message: "".to_string(),
        message: "".to_string(),
        name: "org.apache.kafka.common.errors.DisconnectException".to_string()
      },
      mdc: Map::new(),
    };

    assert_eq!(actual, expected);
  }
}