1use crate::format::colored_with_level;
2use colored::*;
3use serde::{Deserialize, Serialize};
4use serde_json::{Map, Value};
5extern crate chrono;
6use chrono::prelude::*;
7
8pub trait FormatLogLine {
9 fn format(&self) -> ColoredString;
10}
11
12pub trait ToColoredString {
13 fn to_colored_string(entry: &Value) -> Option<ColoredString>;
14}
15
16fn format_mdc(mdc: &Map<String, Value>) -> String {
17 if !mdc.is_empty() {
18 let res = mdc
19 .clone()
20 .into_iter()
21 .map(|(key, value)| {
22 let shown_value = match value {
23 Value::String(val) => val,
24 other => format!("{other:?}"),
25 };
26 format!("{key}={}", shown_value.trim())
27 })
28 .collect::<Vec<String>>();
29 format!("[{}]", res.join(","))
30 } else {
31 "".to_string()
32 }
33}
34
35#[derive(Serialize, Deserialize)]
40pub struct ElixirLogLine {
41 app: String,
42 level: String,
43 message: String,
44 module: String,
45 pid: String,
46 timestamp: String,
47}
48
49impl FormatLogLine for ElixirLogLine {
50 fn format(&self) -> ColoredString {
51 colored_with_level(
52 &self.level,
53 &format!("{} {}", &self.format_meta().dimmed(), &self.message),
54 )
55 }
56}
57
58impl ElixirLogLine {
59 fn format_meta(&self) -> String {
60 format!(
61 "[{}] [{}] [{}] [{}] [{}]",
62 self.timestamp, self.level, self.app, self.module, self.pid
63 )
64 }
65}
66
67impl ToColoredString for ElixirLogLine {
68 fn to_colored_string(entry: &Value) -> Option<ColoredString> {
69 match ElixirLogLine::deserialize(entry) {
70 Err(_) => None,
71 Ok(line) => Some(line.format()),
72 }
73 }
74}
75
76#[derive(Serialize, Deserialize)]
81pub struct ElixirExtendedLogLine {
82 application: String,
83 level: String,
84 message: String,
85 module: String,
86 pid: String,
87 timestamp: String,
88}
89
90impl FormatLogLine for ElixirExtendedLogLine {
91 fn format(&self) -> ColoredString {
92 colored_with_level(
93 &self.level,
94 &format!("{} {}", &self.format_meta().dimmed(), &self.message),
95 )
96 }
97}
98
99impl ElixirExtendedLogLine {
100 fn format_meta(&self) -> String {
101 format!(
102 "[{}] [{}] [{}] [{}] [{}]",
103 self.timestamp, self.level, self.application, self.module, self.pid
104 )
105 }
106}
107
108impl ToColoredString for ElixirExtendedLogLine {
109 fn to_colored_string(entry: &Value) -> Option<ColoredString> {
110 match ElixirExtendedLogLine::deserialize(entry) {
111 Err(_) => None,
112 Ok(line) => Some(line.format()),
113 }
114 }
115}
116
117#[derive(Serialize, Deserialize)]
122pub struct LogstashJavaLogLine {
123 level: String,
124 message: String,
125 logger_name: String,
126 thread_name: String,
127 #[serde(alias = "@timestamp")]
128 timestamp: String,
129 #[serde(default)]
134 mdc: Map<String, Value>,
135
136 #[serde(default)]
137 stack_trace: String,
138 #[serde(default)]
139 exception: LogstashLogLineException,
140}
141
142#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
143struct LogstashLogLineException {
144 #[serde(default, alias = "exception_message")]
145 message: String,
146 #[serde(default, alias = "exception_class")]
147 class: String,
148 #[serde(default)]
149 stacktrace: String,
150}
151
152impl FormatLogLine for LogstashJavaLogLine {
153 fn format(&self) -> ColoredString {
154 colored_with_level(
155 &self.level,
156 &format!(
157 "{} {}{}",
158 &self.format_meta().dimmed(),
159 &self.message,
160 &self.format_stacktrace()
161 ),
162 )
163 }
164}
165
166impl ToColoredString for LogstashJavaLogLine {
167 fn to_colored_string(entry: &Value) -> Option<ColoredString> {
168 match LogstashJavaLogLine::deserialize(entry) {
169 Err(_) => None,
170 Ok(line) => Some(line.format()),
171 }
172 }
173}
174
175impl LogstashJavaLogLine {
176 fn format_meta(&self) -> String {
177 format!(
178 "[{}] [{}] [{}] [{}]{}",
179 self.timestamp,
180 self.level,
181 self.logger_name,
182 self.thread_name,
183 self.format_mdc()
184 )
185 }
186 fn format_stacktrace(&self) -> ColoredString {
187 if !self.exception.message.is_empty() {
188 format!(
189 "\n\t{} ({})\n\t{}",
190 self.exception.message,
191 self.exception.class,
192 self.exception.stacktrace.replace('\n', "\n\t")
193 )
194 .red()
195 } else if !self.stack_trace.is_empty() {
196 format!("\n\t{}", self.stack_trace.replace('\n', "\n\t")).red()
197 } else {
198 "".normal()
199 }
200 }
201
202 fn format_mdc(&self) -> String {
203 format_mdc(&self.mdc)
204 }
205}
206
207#[derive(Serialize, Deserialize, PartialEq, Debug)]
212pub struct Log4JJsonLayoutLogLine {
213 #[serde(alias = "thread")]
214 thread_name: String,
215 level: String,
216 #[serde(alias = "loggerName")]
217 logger_name: String,
218 #[serde(alias = "endOfBatch")]
219 end_of_batch: bool,
220 #[serde(alias = "loggerFqcn")]
221 logger_fqcn: String,
222 message: String,
223
224 instant: Log4JJsonLayoutLogLineInstant,
225 #[serde(alias = "threadId")]
226 thread_id: i32,
227 #[serde(alias = "threadPriority")]
228 thread_priority: i32,
229 #[serde(default)]
230 thrown: Log4JJsonLayoutLogLineThrown,
231 #[serde(default)]
232 mdc: Map<String, Value>,
233}
234
235#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
236struct Log4JJsonLayoutLogLineThrown {
237 #[serde(alias = "commonElementCount", default)]
238 common_element_count: i32,
239 #[serde(alias = "localizedMessage", default)]
240 localized_message: String,
241 #[serde(default)]
242 message: String,
243 #[serde(default)]
244 name: String,
245}
246
247#[derive(Serialize, Deserialize, PartialEq, Debug)]
248struct Log4JJsonLayoutLogLineInstant {
249 #[serde(alias = "epochSecond")]
250 epoch_second: i64,
251 #[serde(alias = "nanoOfSecond")]
252 nano_of_second: u32,
253}
254
255impl FormatLogLine for Log4JJsonLayoutLogLine {
256 fn format(&self) -> ColoredString {
257 colored_with_level(
258 &self.level,
259 &format!(
260 "{} {}{}",
261 &self.format_meta().dimmed(),
262 &self.message,
263 &self.format_stacktrace()
264 ),
265 )
266 }
267}
268
269impl ToColoredString for Log4JJsonLayoutLogLine {
270 fn to_colored_string(entry: &Value) -> Option<ColoredString> {
271 match Log4JJsonLayoutLogLine::deserialize(entry) {
272 Err(_) => None,
273 Ok(line) => Some(line.format()),
274 }
275 }
276}
277
278impl Log4JJsonLayoutLogLine {
279 fn format_stacktrace(&self) -> ColoredString {
280 if !self.thrown.message.is_empty() && !self.thrown.name.is_empty() {
281 format!(
282 "\n\t{}\n\t{}",
283 self.thrown.name,
284 self.thrown.message.replace('\n', "\n\t")
285 )
286 .red()
287 } else if !self.thrown.message.is_empty() && self.thrown.name.is_empty() {
288 format!("\n\t{}", self.thrown.message.replace('\n', "\n\t")).red()
289 } else if !self.thrown.name.is_empty() {
290 format!(" ({})", self.thrown.name).red()
291 } else {
292 "".normal()
293 }
294 }
295
296 fn format_meta(&self) -> String {
297 format!(
298 "[{}] [{}] [{}] [{}]{}",
299 self.format_date(),
300 self.level,
301 self.logger_name,
302 self.thread_name,
303 self.format_mdc()
304 )
305 }
306
307 fn format_mdc(&self) -> String {
308 format_mdc(&self.mdc)
309 }
310
311 fn format_date(&self) -> String {
312 DateTime::from_timestamp(self.instant.epoch_second, self.instant.nano_of_second)
313 .map(|datetime: DateTime<Utc>| datetime.format("%+").to_string())
314 .unwrap_or(self.instant.epoch_second.to_string())
315 }
316}
317
318#[cfg(test)]
326mod tests {
327
328 use super::*;
329
330 fn with_epoch_and_nano_format_date(epoch_second: i64, nano_of_second: u32) -> String {
331 Log4JJsonLayoutLogLine {
332 thread_name: String::new(),
333 level: String::new(),
334 logger_name: String::new(),
335 end_of_batch: true,
336 logger_fqcn: String::new(),
337 message: String::new(),
338 instant: Log4JJsonLayoutLogLineInstant {
339 epoch_second,
340 nano_of_second,
341 },
342 thread_id: 0,
343 thread_priority: 0,
344 thrown: Log4JJsonLayoutLogLineThrown::default(),
345 mdc: Map::new(),
346 }
347 .format_date()
348 }
349
350 #[test]
351 fn test_epoch_formatting_works() {
352 assert_eq!(
353 with_epoch_and_nano_format_date(1622724607, 420000000),
354 "2021-06-03T12:50:07.420+00:00"
355 );
356 assert_eq!(
357 with_epoch_and_nano_format_date(1675671221, 782873000),
358 "2023-02-06T08:13:41.782873+00:00"
359 );
360 assert_eq!(
361 with_epoch_and_nano_format_date(1675671481, 452180000),
362 "2023-02-06T08:18:01.452180+00:00"
363 );
364 }
365
366 #[test]
367 fn test_parse_log4j_line() {
368 let value = json!({
369 "instant": {
370 "epochSecond": 1675671481,
371 "nanoOfSecond": 452180000
372 },
373 "thread": "Source Data Fetcher for Source: MySourceName -> *anonymous_datastream_source$4*[18] (2/2)#2798",
374 "level": "INFO",
375 "loggerName": "org.apache.kafka.clients.FetchSessionHandler",
376 "message": "[Consumer clientId=name_72e14600-16b6-4c27-aff0-fae92ae52650-1, groupId=name_72e14600-16b6-4c27-aff0-fae92ae52650] Error sending fetch request (sessionId=1995808239, epoch=INITIAL) to node 0:",
377 "thrown": {
378 "commonElementCount": 0,
379 "name": "org.apache.kafka.common.errors.DisconnectException"
380 },
381 "endOfBatch": false,
382 "loggerFqcn": "org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger",
383 "threadId": 664,
384 "threadPriority": 5
385 });
386
387 let actual = Log4JJsonLayoutLogLine::deserialize(value).expect("Failed to unwrap test json");
388 let expected = Log4JJsonLayoutLogLine {
389 thread_name: "Source Data Fetcher for Source: MySourceName -> *anonymous_datastream_source$4*[18] (2/2)#2798".to_string(),
390 level: "INFO".to_string(),
391 logger_name: "org.apache.kafka.clients.FetchSessionHandler".to_string(),
392 end_of_batch: false,
393 logger_fqcn: "org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger".to_string(),
394 message: "[Consumer clientId=name_72e14600-16b6-4c27-aff0-fae92ae52650-1, groupId=name_72e14600-16b6-4c27-aff0-fae92ae52650] Error sending fetch request (sessionId=1995808239, epoch=INITIAL) to node 0:".to_string(),
395 instant: Log4JJsonLayoutLogLineInstant {
396 epoch_second: 1675671481,
397 nano_of_second: 452180000,
398 },
399 thread_id: 664,
400 thread_priority: 5,
401 thrown: Log4JJsonLayoutLogLineThrown {
402 common_element_count: 0,
403 localized_message: "".to_string(),
404 message: "".to_string(),
405 name: "org.apache.kafka.common.errors.DisconnectException".to_string()
406 },
407 mdc: Map::new(),
408 };
409
410 assert_eq!(actual, expected);
411 }
412}