1use crate::types::{NodeId, TraceId};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// Severity of a log event.
///
/// Variants are declared from least to most severe, so the derived ordering
/// gives `Trace < Debug < Info < Warn < Error`. Under serde each variant is
/// written as its lowercase name (for example `"warn"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
    /// Lowest level in the derived ordering.
    Trace,
    /// Ranks above `Trace` and below `Info`.
    Debug,
    /// Ranks above `Debug`; this is the value returned by `LogLevel::default()`.
    Info,
    /// Ranks above `Info` and below `Error`.
    Warn,
    /// Highest level in the derived ordering.
    Error,
}
27
28impl LogLevel {
29 pub fn parse(s: &str) -> Option<Self> {
31 match s.to_lowercase().as_str() {
32 "trace" => Some(Self::Trace),
33 "debug" => Some(Self::Debug),
34 "info" => Some(Self::Info),
35 "warn" | "warning" => Some(Self::Warn),
36 "error" => Some(Self::Error),
37 _ => None,
38 }
39 }
40
41 pub fn as_str(&self) -> &'static str {
43 match self {
44 Self::Trace => "trace",
45 Self::Debug => "debug",
46 Self::Info => "info",
47 Self::Warn => "warn",
48 Self::Error => "error",
49 }
50 }
51}
52
53impl Default for LogLevel {
54 fn default() -> Self {
55 Self::Info
56 }
57}
58
59impl fmt::Display for LogLevel {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 write!(f, "{}", self.as_str())
62 }
63}
64
65impl std::str::FromStr for LogLevel {
66 type Err = &'static str;
67
68 fn from_str(s: &str) -> Result<Self, Self::Err> {
69 Self::parse(s).ok_or("invalid log level")
70 }
71}
72
/// Category tag attached to every log event.
///
/// Under serde each variant is written in `snake_case`, which for these
/// single-word names is the same text that `as_str` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogCategory {
    /// Rendered as `"trace"`.
    Trace,
    /// Rendered as `"node"`.
    Node,
    /// Rendered as `"trigger"`.
    Trigger,
    /// Rendered as `"pipeline"`.
    Pipeline,
    /// Rendered as `"schema"`.
    Schema,
    /// Rendered as `"system"`.
    System,
    /// Rendered as `"custom"`.
    Custom,
}
92
93impl LogCategory {
94 pub fn as_str(&self) -> &'static str {
96 match self {
97 Self::Trace => "trace",
98 Self::Node => "node",
99 Self::Trigger => "trigger",
100 Self::Pipeline => "pipeline",
101 Self::Schema => "schema",
102 Self::System => "system",
103 Self::Custom => "custom",
104 }
105 }
106}
107
108impl fmt::Display for LogCategory {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 write!(f, "{}", self.as_str())
111 }
112}
113
/// A single structured log record.
///
/// When serialized, the optional context (`trace_id`, `node_id`,
/// `pipeline_id`) is omitted if unset and `fields` is omitted if empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEvent {
    /// Event identifier. `LogEvent::new` initializes it to 0.
    pub id: u64,
    /// Creation time in nanoseconds since the Unix epoch.
    pub timestamp_ns: u64,
    /// Severity of the event.
    pub level: LogLevel,
    /// Category tag of the event.
    pub category: LogCategory,
    /// Trace this event is associated with, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<TraceId>,
    /// Node this event is associated with, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub node_id: Option<NodeId>,
    /// Pipeline this event is associated with, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pipeline_id: Option<String>,
    /// Human-readable message text.
    pub message: String,
    /// Additional key/value data. Defaults to an empty map when the key is
    /// absent during deserialization.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub fields: HashMap<String, serde_json::Value>,
}
140
141impl LogEvent {
142 pub fn new(level: LogLevel, category: LogCategory, message: impl Into<String>) -> Self {
144 Self {
145 id: 0, timestamp_ns: current_timestamp_ns(),
147 level,
148 category,
149 trace_id: None,
150 node_id: None,
151 pipeline_id: None,
152 message: message.into(),
153 fields: HashMap::new(),
154 }
155 }
156
157 pub fn trace(category: LogCategory, message: impl Into<String>) -> Self {
159 Self::new(LogLevel::Trace, category, message)
160 }
161
162 pub fn debug(category: LogCategory, message: impl Into<String>) -> Self {
164 Self::new(LogLevel::Debug, category, message)
165 }
166
167 pub fn info(category: LogCategory, message: impl Into<String>) -> Self {
169 Self::new(LogLevel::Info, category, message)
170 }
171
172 pub fn warn(category: LogCategory, message: impl Into<String>) -> Self {
174 Self::new(LogLevel::Warn, category, message)
175 }
176
177 pub fn error(category: LogCategory, message: impl Into<String>) -> Self {
179 Self::new(LogLevel::Error, category, message)
180 }
181
182 pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
184 self.trace_id = Some(trace_id);
185 self
186 }
187
188 pub fn with_node_id(mut self, node_id: NodeId) -> Self {
190 self.node_id = Some(node_id);
191 self
192 }
193
194 pub fn with_pipeline_id(mut self, pipeline_id: impl Into<String>) -> Self {
196 self.pipeline_id = Some(pipeline_id.into());
197 self
198 }
199
200 pub fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
202 self.fields
203 .insert(key.into(), serde_json::Value::String(value.into()));
204 self
205 }
206
207 pub fn with_field_i64(mut self, key: impl Into<String>, value: i64) -> Self {
209 self.fields
210 .insert(key.into(), serde_json::Value::Number(value.into()));
211 self
212 }
213
214 pub fn with_field_bool(mut self, key: impl Into<String>, value: bool) -> Self {
216 self.fields
217 .insert(key.into(), serde_json::Value::Bool(value));
218 self
219 }
220
221 pub fn with_field_json(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
223 self.fields.insert(key.into(), value);
224 self
225 }
226
227 pub fn timestamp_iso(&self) -> String {
229 let secs = self.timestamp_ns / 1_000_000_000;
230 let nanos = (self.timestamp_ns % 1_000_000_000) as u32;
231
232 if let Some(datetime) = chrono::DateTime::from_timestamp(secs as i64, nanos) {
233 datetime.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
234 } else {
235 format!("{}ns", self.timestamp_ns)
236 }
237 }
238
239 pub fn format_line(&self) -> String {
241 let mut parts = vec![
242 self.timestamp_iso(),
243 format!("[{}]", self.level.as_str().to_uppercase()),
244 format!("[{}]", self.category.as_str()),
245 ];
246
247 if let Some(ref trace_id) = self.trace_id {
248 parts.push(format!("trace={}", trace_id));
249 }
250
251 if let Some(node_id) = self.node_id {
252 parts.push(format!("node={}", node_id.as_u32()));
253 }
254
255 if let Some(ref pipeline_id) = self.pipeline_id {
256 parts.push(format!("pipeline={}", pipeline_id));
257 }
258
259 parts.push(self.message.clone());
260
261 if !self.fields.is_empty() {
262 let fields_str: Vec<String> = self
263 .fields
264 .iter()
265 .map(|(k, v)| format!("{}={}", k, v))
266 .collect();
267 parts.push(format!("{{{}}}", fields_str.join(", ")));
268 }
269
270 parts.join(" ")
271 }
272}
273
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Best-effort: yields 0 if the system clock reports a time before the epoch,
/// and clamps to `u64::MAX` if the nanosecond count no longer fits in 64 bits
/// rather than silently wrapping.
fn current_timestamp_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_nanos` is a u128; clamp first so the narrowing cast is lossless.
        Ok(elapsed) => elapsed.as_nanos().min(u128::from(u64::MAX)) as u64,
        Err(_) => 0,
    }
}
281
/// Holds optional trace, node and pipeline context that is copied onto every
/// `LogEvent` created through it.
#[derive(Debug, Clone)]
pub struct LogEventBuilder {
    /// Trace id applied to created events when set.
    trace_id: Option<TraceId>,
    /// Node id applied to created events when set.
    node_id: Option<NodeId>,
    /// Pipeline id applied to created events when set.
    pipeline_id: Option<String>,
}
289
290impl LogEventBuilder {
291 pub fn new() -> Self {
293 Self {
294 trace_id: None,
295 node_id: None,
296 pipeline_id: None,
297 }
298 }
299
300 pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
302 self.trace_id = Some(trace_id);
303 self
304 }
305
306 pub fn with_node_id(mut self, node_id: NodeId) -> Self {
308 self.node_id = Some(node_id);
309 self
310 }
311
312 pub fn with_pipeline_id(mut self, pipeline_id: impl Into<String>) -> Self {
314 self.pipeline_id = Some(pipeline_id.into());
315 self
316 }
317
318 pub fn event(
320 &self,
321 level: LogLevel,
322 category: LogCategory,
323 message: impl Into<String>,
324 ) -> LogEvent {
325 let mut event = LogEvent::new(level, category, message);
326 if let Some(trace_id) = self.trace_id {
327 event.trace_id = Some(trace_id);
328 }
329 if let Some(node_id) = self.node_id {
330 event.node_id = Some(node_id);
331 }
332 if let Some(ref pipeline_id) = self.pipeline_id {
333 event.pipeline_id = Some(pipeline_id.clone());
334 }
335 event
336 }
337
338 pub fn trace(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
340 self.event(LogLevel::Trace, category, message)
341 }
342
343 pub fn debug(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
345 self.event(LogLevel::Debug, category, message)
346 }
347
348 pub fn info(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
350 self.event(LogLevel::Info, category, message)
351 }
352
353 pub fn warn(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
355 self.event(LogLevel::Warn, category, message)
356 }
357
358 pub fn error(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
360 self.event(LogLevel::Error, category, message)
361 }
362}
363
364impl Default for LogEventBuilder {
365 fn default() -> Self {
366 Self::new()
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Level names parse regardless of case; unknown names yield `None`.
    #[test]
    fn log_level_parsing() {
        let cases = [
            ("trace", Some(LogLevel::Trace)),
            ("DEBUG", Some(LogLevel::Debug)),
            ("Info", Some(LogLevel::Info)),
            ("WARN", Some(LogLevel::Warn)),
            ("warning", Some(LogLevel::Warn)),
            ("error", Some(LogLevel::Error)),
            ("invalid", None),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(LogLevel::parse(input), *expected);
        }
    }

    /// Each level compares strictly below the next more severe one.
    #[test]
    fn log_level_ordering() {
        let ascending = [
            LogLevel::Trace,
            LogLevel::Debug,
            LogLevel::Info,
            LogLevel::Warn,
            LogLevel::Error,
        ];

        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// The chained `with_*` calls populate the matching event fields.
    #[test]
    fn log_event_creation() {
        let created = LogEvent::info(LogCategory::Node, "Node started")
            .with_trace_id(TraceId::new())
            .with_node_id(NodeId::new(42))
            .with_field("duration_ms", "123");

        assert_eq!(created.level, LogLevel::Info);
        assert_eq!(created.category, LogCategory::Node);
        assert_eq!(created.message, "Node started");
        assert!(created.trace_id.is_some());
        assert_eq!(created.node_id, Some(NodeId::new(42)));
        assert!(created.fields.contains_key("duration_ms"));
    }

    /// Events made through a builder inherit the builder's context.
    #[test]
    fn log_event_builder() {
        let trace_id = TraceId::new();
        let with_context = LogEventBuilder::new()
            .with_trace_id(trace_id)
            .with_pipeline_id("test_pipeline");

        let produced = with_context.info(LogCategory::Trace, "Trace started");

        assert_eq!(produced.trace_id, Some(trace_id));
        assert_eq!(produced.pipeline_id.as_deref(), Some("test_pipeline"));
        assert_eq!(produced.level, LogLevel::Info);
    }

    /// The rendered line contains the level, category, context, message and
    /// field key.
    #[test]
    fn log_event_format_line() {
        let rendered = LogEvent::info(LogCategory::Node, "Processing order")
            .with_pipeline_id("order_pipeline")
            .with_field("order_id", "ORD-123")
            .format_line();

        let expected_fragments = [
            "[INFO]",
            "[node]",
            "pipeline=order_pipeline",
            "Processing order",
            "order_id",
        ];
        for fragment in expected_fragments.iter() {
            assert!(rendered.contains(fragment));
        }
    }

    /// An event survives a JSON round trip with level, category, message and
    /// fields intact.
    #[test]
    fn log_event_serialization() {
        let original = LogEvent::error(LogCategory::System, "Connection failed")
            .with_field("host", "localhost")
            .with_field_i64("port", 8080);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: LogEvent = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.level, LogLevel::Error);
        assert_eq!(decoded.category, LogCategory::System);
        assert_eq!(decoded.message, "Connection failed");
        assert_eq!(decoded.fields.len(), 2);
    }
}