xerv_core/logging/
event.rs

1//! Log event types for trace execution logging.
2//!
3//! Provides structured log events with correlation IDs (trace_id, node_id)
4//! for debugging and observability of flow executions.
5
6use crate::types::{NodeId, TraceId};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12/// Log severity level.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
14#[serde(rename_all = "lowercase")]
15pub enum LogLevel {
16    /// Fine-grained debugging information.
17    Trace,
18    /// Debugging information.
19    Debug,
20    /// Informational messages.
21    Info,
22    /// Warning messages.
23    Warn,
24    /// Error messages.
25    Error,
26}
27
28impl LogLevel {
29    /// Parse a log level from a string.
30    pub fn parse(s: &str) -> Option<Self> {
31        match s.to_lowercase().as_str() {
32            "trace" => Some(Self::Trace),
33            "debug" => Some(Self::Debug),
34            "info" => Some(Self::Info),
35            "warn" | "warning" => Some(Self::Warn),
36            "error" => Some(Self::Error),
37            _ => None,
38        }
39    }
40
41    /// Get the string representation.
42    pub fn as_str(&self) -> &'static str {
43        match self {
44            Self::Trace => "trace",
45            Self::Debug => "debug",
46            Self::Info => "info",
47            Self::Warn => "warn",
48            Self::Error => "error",
49        }
50    }
51}
52
53impl Default for LogLevel {
54    fn default() -> Self {
55        Self::Info
56    }
57}
58
59impl fmt::Display for LogLevel {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(f, "{}", self.as_str())
62    }
63}
64
65impl std::str::FromStr for LogLevel {
66    type Err = &'static str;
67
68    fn from_str(s: &str) -> Result<Self, Self::Err> {
69        Self::parse(s).ok_or("invalid log level")
70    }
71}
72
73/// Category of log event.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub enum LogCategory {
77    /// Trace lifecycle events (start, complete, fail).
78    Trace,
79    /// Node execution events (start, complete, error).
80    Node,
81    /// Trigger events (fire, pause, resume).
82    Trigger,
83    /// Pipeline events (deploy, start, stop).
84    Pipeline,
85    /// Schema/data validation events.
86    Schema,
87    /// System/internal events.
88    System,
89    /// User-defined custom events.
90    Custom,
91}
92
93impl LogCategory {
94    /// Get the string representation.
95    pub fn as_str(&self) -> &'static str {
96        match self {
97            Self::Trace => "trace",
98            Self::Node => "node",
99            Self::Trigger => "trigger",
100            Self::Pipeline => "pipeline",
101            Self::Schema => "schema",
102            Self::System => "system",
103            Self::Custom => "custom",
104        }
105    }
106}
107
108impl fmt::Display for LogCategory {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        write!(f, "{}", self.as_str())
111    }
112}
113
114/// A structured log event with correlation IDs.
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct LogEvent {
117    /// Unique event ID.
118    pub id: u64,
119    /// Timestamp in nanoseconds since UNIX epoch.
120    pub timestamp_ns: u64,
121    /// Log severity level.
122    pub level: LogLevel,
123    /// Event category.
124    pub category: LogCategory,
125    /// Associated trace ID (if any).
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub trace_id: Option<TraceId>,
128    /// Associated node ID (if any).
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub node_id: Option<NodeId>,
131    /// Associated pipeline ID (if any).
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub pipeline_id: Option<String>,
134    /// Human-readable message.
135    pub message: String,
136    /// Structured fields for additional context.
137    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
138    pub fields: HashMap<String, serde_json::Value>,
139}
140
141impl LogEvent {
142    /// Create a new log event with the current timestamp.
143    pub fn new(level: LogLevel, category: LogCategory, message: impl Into<String>) -> Self {
144        Self {
145            id: 0, // Will be assigned by collector
146            timestamp_ns: current_timestamp_ns(),
147            level,
148            category,
149            trace_id: None,
150            node_id: None,
151            pipeline_id: None,
152            message: message.into(),
153            fields: HashMap::new(),
154        }
155    }
156
157    /// Create a trace-level log event.
158    pub fn trace(category: LogCategory, message: impl Into<String>) -> Self {
159        Self::new(LogLevel::Trace, category, message)
160    }
161
162    /// Create a debug-level log event.
163    pub fn debug(category: LogCategory, message: impl Into<String>) -> Self {
164        Self::new(LogLevel::Debug, category, message)
165    }
166
167    /// Create an info-level log event.
168    pub fn info(category: LogCategory, message: impl Into<String>) -> Self {
169        Self::new(LogLevel::Info, category, message)
170    }
171
172    /// Create a warn-level log event.
173    pub fn warn(category: LogCategory, message: impl Into<String>) -> Self {
174        Self::new(LogLevel::Warn, category, message)
175    }
176
177    /// Create an error-level log event.
178    pub fn error(category: LogCategory, message: impl Into<String>) -> Self {
179        Self::new(LogLevel::Error, category, message)
180    }
181
182    /// Set the trace ID.
183    pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
184        self.trace_id = Some(trace_id);
185        self
186    }
187
188    /// Set the node ID.
189    pub fn with_node_id(mut self, node_id: NodeId) -> Self {
190        self.node_id = Some(node_id);
191        self
192    }
193
194    /// Set the pipeline ID.
195    pub fn with_pipeline_id(mut self, pipeline_id: impl Into<String>) -> Self {
196        self.pipeline_id = Some(pipeline_id.into());
197        self
198    }
199
200    /// Add a string field.
201    pub fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
202        self.fields
203            .insert(key.into(), serde_json::Value::String(value.into()));
204        self
205    }
206
207    /// Add a numeric field.
208    pub fn with_field_i64(mut self, key: impl Into<String>, value: i64) -> Self {
209        self.fields
210            .insert(key.into(), serde_json::Value::Number(value.into()));
211        self
212    }
213
214    /// Add a boolean field.
215    pub fn with_field_bool(mut self, key: impl Into<String>, value: bool) -> Self {
216        self.fields
217            .insert(key.into(), serde_json::Value::Bool(value));
218        self
219    }
220
221    /// Add a JSON value field.
222    pub fn with_field_json(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
223        self.fields.insert(key.into(), value);
224        self
225    }
226
227    /// Get the timestamp as a DateTime string (ISO 8601).
228    pub fn timestamp_iso(&self) -> String {
229        let secs = self.timestamp_ns / 1_000_000_000;
230        let nanos = (self.timestamp_ns % 1_000_000_000) as u32;
231
232        if let Some(datetime) = chrono::DateTime::from_timestamp(secs as i64, nanos) {
233            datetime.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
234        } else {
235            format!("{}ns", self.timestamp_ns)
236        }
237    }
238
239    /// Format as a single log line.
240    pub fn format_line(&self) -> String {
241        let mut parts = vec![
242            self.timestamp_iso(),
243            format!("[{}]", self.level.as_str().to_uppercase()),
244            format!("[{}]", self.category.as_str()),
245        ];
246
247        if let Some(ref trace_id) = self.trace_id {
248            parts.push(format!("trace={}", trace_id));
249        }
250
251        if let Some(node_id) = self.node_id {
252            parts.push(format!("node={}", node_id.as_u32()));
253        }
254
255        if let Some(ref pipeline_id) = self.pipeline_id {
256            parts.push(format!("pipeline={}", pipeline_id));
257        }
258
259        parts.push(self.message.clone());
260
261        if !self.fields.is_empty() {
262            let fields_str: Vec<String> = self
263                .fields
264                .iter()
265                .map(|(k, v)| format!("{}={}", k, v))
266                .collect();
267            parts.push(format!("{{{}}}", fields_str.join(", ")));
268        }
269
270        parts.join(" ")
271    }
272}
273
/// Get the current wall-clock time in nanoseconds since the UNIX epoch.
///
/// Returns 0 if the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` if the nanosecond count does not fit in a `u64`.
fn current_timestamp_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            // `as_nanos` yields a u128; clamp before narrowing so an
            // out-of-range value saturates instead of silently wrapping.
            let nanos = elapsed.as_nanos().min(u128::from(u64::MAX));
            nanos as u64
        }
        Err(_) => 0,
    }
}
281
282/// Builder for creating log events with common context.
283#[derive(Debug, Clone)]
284pub struct LogEventBuilder {
285    trace_id: Option<TraceId>,
286    node_id: Option<NodeId>,
287    pipeline_id: Option<String>,
288}
289
290impl LogEventBuilder {
291    /// Create a new builder.
292    pub fn new() -> Self {
293        Self {
294            trace_id: None,
295            node_id: None,
296            pipeline_id: None,
297        }
298    }
299
300    /// Set the trace ID for all events.
301    pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
302        self.trace_id = Some(trace_id);
303        self
304    }
305
306    /// Set the node ID for all events.
307    pub fn with_node_id(mut self, node_id: NodeId) -> Self {
308        self.node_id = Some(node_id);
309        self
310    }
311
312    /// Set the pipeline ID for all events.
313    pub fn with_pipeline_id(mut self, pipeline_id: impl Into<String>) -> Self {
314        self.pipeline_id = Some(pipeline_id.into());
315        self
316    }
317
318    /// Create a log event with the builder's context.
319    pub fn event(
320        &self,
321        level: LogLevel,
322        category: LogCategory,
323        message: impl Into<String>,
324    ) -> LogEvent {
325        let mut event = LogEvent::new(level, category, message);
326        if let Some(trace_id) = self.trace_id {
327            event.trace_id = Some(trace_id);
328        }
329        if let Some(node_id) = self.node_id {
330            event.node_id = Some(node_id);
331        }
332        if let Some(ref pipeline_id) = self.pipeline_id {
333            event.pipeline_id = Some(pipeline_id.clone());
334        }
335        event
336    }
337
338    /// Create a trace-level event.
339    pub fn trace(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
340        self.event(LogLevel::Trace, category, message)
341    }
342
343    /// Create a debug-level event.
344    pub fn debug(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
345        self.event(LogLevel::Debug, category, message)
346    }
347
348    /// Create an info-level event.
349    pub fn info(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
350        self.event(LogLevel::Info, category, message)
351    }
352
353    /// Create a warn-level event.
354    pub fn warn(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
355        self.event(LogLevel::Warn, category, message)
356    }
357
358    /// Create an error-level event.
359    pub fn error(&self, category: LogCategory, message: impl Into<String>) -> LogEvent {
360        self.event(LogLevel::Error, category, message)
361    }
362}
363
364impl Default for LogEventBuilder {
365    fn default() -> Self {
366        Self::new()
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Level names parse regardless of case; `warning` is an alias for `warn`.
    #[test]
    fn log_level_parsing() {
        let cases = [
            ("trace", Some(LogLevel::Trace)),
            ("DEBUG", Some(LogLevel::Debug)),
            ("Info", Some(LogLevel::Info)),
            ("WARN", Some(LogLevel::Warn)),
            ("warning", Some(LogLevel::Warn)),
            ("error", Some(LogLevel::Error)),
            ("invalid", None),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(LogLevel::parse(input), *expected);
        }
    }

    /// Each level compares strictly below the next more severe one.
    #[test]
    fn log_level_ordering() {
        let ascending = [
            LogLevel::Trace,
            LogLevel::Debug,
            LogLevel::Info,
            LogLevel::Warn,
            LogLevel::Error,
        ];

        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// The fluent `with_*` methods populate the corresponding event fields.
    #[test]
    fn log_event_creation() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(42);

        let event = LogEvent::info(LogCategory::Node, "Node started")
            .with_trace_id(trace_id)
            .with_node_id(node_id)
            .with_field("duration_ms", "123");

        assert_eq!(event.level, LogLevel::Info);
        assert_eq!(event.category, LogCategory::Node);
        assert_eq!(event.message, "Node started");
        assert!(event.trace_id.is_some());
        assert_eq!(event.node_id, Some(NodeId::new(42)));
        assert!(event.fields.contains_key("duration_ms"));
    }

    /// Events created through a builder inherit the builder's context.
    #[test]
    fn log_event_builder() {
        let trace_id = TraceId::new();
        let event = LogEventBuilder::new()
            .with_trace_id(trace_id)
            .with_pipeline_id("test_pipeline")
            .info(LogCategory::Trace, "Trace started");

        assert_eq!(event.trace_id, Some(trace_id));
        assert_eq!(event.pipeline_id, Some("test_pipeline".to_string()));
        assert_eq!(event.level, LogLevel::Info);
    }

    /// The single-line rendering contains level, category, context, message and fields.
    #[test]
    fn log_event_format_line() {
        let line = LogEvent::info(LogCategory::Node, "Processing order")
            .with_pipeline_id("order_pipeline")
            .with_field("order_id", "ORD-123")
            .format_line();

        let expected_fragments = [
            "[INFO]",
            "[node]",
            "pipeline=order_pipeline",
            "Processing order",
            "order_id",
        ];
        for fragment in expected_fragments.iter() {
            assert!(line.contains(fragment));
        }
    }

    /// An event survives a JSON round trip with level, category, message and fields intact.
    #[test]
    fn log_event_serialization() {
        let original = LogEvent::error(LogCategory::System, "Connection failed")
            .with_field("host", "localhost")
            .with_field_i64("port", 8080);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: LogEvent = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.level, LogLevel::Error);
        assert_eq!(decoded.category, LogCategory::System);
        assert_eq!(decoded.message, "Connection failed");
        assert_eq!(decoded.fields.len(), 2);
    }
}
450}