Skip to main content

oxigdal_workflow/monitoring/
logging.rs

1//! Workflow logging system.
2
3use chrono::{DateTime, Utc};
4use dashmap::DashMap;
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10/// Log level enumeration.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
12pub enum LogLevel {
13    /// Trace level (most verbose).
14    Trace,
15    /// Debug level.
16    Debug,
17    /// Info level.
18    Info,
19    /// Warning level.
20    Warn,
21    /// Error level.
22    Error,
23    /// Fatal level (least verbose).
24    Fatal,
25}
26
27impl LogLevel {
28    /// Convert to string representation.
29    pub fn as_str(&self) -> &'static str {
30        match self {
31            Self::Trace => "TRACE",
32            Self::Debug => "DEBUG",
33            Self::Info => "INFO",
34            Self::Warn => "WARN",
35            Self::Error => "ERROR",
36            Self::Fatal => "FATAL",
37        }
38    }
39}
40
41/// Log entry.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct LogEntry {
44    /// Log timestamp.
45    pub timestamp: DateTime<Utc>,
46    /// Log level.
47    pub level: LogLevel,
48    /// Workflow ID.
49    pub workflow_id: String,
50    /// Task ID (if applicable).
51    pub task_id: Option<String>,
52    /// Log message.
53    pub message: String,
54    /// Additional context fields.
55    pub context: std::collections::HashMap<String, String>,
56}
57
58impl LogEntry {
59    /// Create a new log entry.
60    pub fn new<S1: Into<String>, S2: Into<String>>(
61        level: LogLevel,
62        workflow_id: S1,
63        message: S2,
64    ) -> Self {
65        Self {
66            timestamp: Utc::now(),
67            level,
68            workflow_id: workflow_id.into(),
69            task_id: None,
70            message: message.into(),
71            context: std::collections::HashMap::new(),
72        }
73    }
74
75    /// Set the task ID.
76    pub fn with_task_id<S: Into<String>>(mut self, task_id: S) -> Self {
77        self.task_id = Some(task_id.into());
78        self
79    }
80
81    /// Add context field.
82    pub fn with_context<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
83        self.context.insert(key.into(), value.into());
84        self
85    }
86
87    /// Format the log entry as a string.
88    pub fn format(&self) -> String {
89        let task_info = self
90            .task_id
91            .as_ref()
92            .map(|id| format!(" [task:{}]", id))
93            .unwrap_or_default();
94
95        let context_info = if self.context.is_empty() {
96            String::new()
97        } else {
98            let mut parts: Vec<String> = self
99                .context
100                .iter()
101                .map(|(k, v)| format!("{}={}", k, v))
102                .collect();
103            parts.sort();
104            format!(" {{{}}}", parts.join(", "))
105        };
106
107        format!(
108            "[{}] {} [workflow:{}]{}{} {}",
109            self.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
110            self.level.as_str(),
111            self.workflow_id,
112            task_info,
113            context_info,
114            self.message
115        )
116    }
117}
118
119/// Workflow logger.
120pub struct WorkflowLogger {
121    logs: Arc<DashMap<String, Arc<RwLock<VecDeque<LogEntry>>>>>,
122    max_logs_per_workflow: usize,
123    min_level: LogLevel,
124}
125
126impl WorkflowLogger {
127    /// Create a new workflow logger.
128    pub fn new() -> Self {
129        Self {
130            logs: Arc::new(DashMap::new()),
131            max_logs_per_workflow: 10000,
132            min_level: LogLevel::Info,
133        }
134    }
135
136    /// Create a logger with custom configuration.
137    pub fn with_config(max_logs_per_workflow: usize, min_level: LogLevel) -> Self {
138        Self {
139            logs: Arc::new(DashMap::new()),
140            max_logs_per_workflow,
141            min_level,
142        }
143    }
144
145    /// Set the minimum log level.
146    pub fn set_min_level(&mut self, level: LogLevel) {
147        self.min_level = level;
148    }
149
150    /// Log a message.
151    pub async fn log(&self, entry: LogEntry) {
152        if entry.level < self.min_level {
153            return;
154        }
155
156        let workflow_id = entry.workflow_id.clone();
157        let logs = self
158            .logs
159            .entry(workflow_id)
160            .or_insert_with(|| Arc::new(RwLock::new(VecDeque::new())));
161
162        let mut log_queue = logs.write().await;
163
164        // Maintain max size
165        if log_queue.len() >= self.max_logs_per_workflow {
166            log_queue.pop_front();
167        }
168
169        log_queue.push_back(entry);
170    }
171
172    /// Log a trace message.
173    pub async fn trace<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
174        self.log(LogEntry::new(
175            LogLevel::Trace,
176            workflow_id.into(),
177            message.into(),
178        ))
179        .await;
180    }
181
182    /// Log a debug message.
183    pub async fn debug<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
184        self.log(LogEntry::new(
185            LogLevel::Debug,
186            workflow_id.into(),
187            message.into(),
188        ))
189        .await;
190    }
191
192    /// Log an info message.
193    pub async fn info<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
194        self.log(LogEntry::new(
195            LogLevel::Info,
196            workflow_id.into(),
197            message.into(),
198        ))
199        .await;
200    }
201
202    /// Log a warning message.
203    pub async fn warn<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
204        self.log(LogEntry::new(
205            LogLevel::Warn,
206            workflow_id.into(),
207            message.into(),
208        ))
209        .await;
210    }
211
212    /// Log an error message.
213    pub async fn error<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
214        self.log(LogEntry::new(
215            LogLevel::Error,
216            workflow_id.into(),
217            message.into(),
218        ))
219        .await;
220    }
221
222    /// Log a fatal message.
223    pub async fn fatal<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
224        self.log(LogEntry::new(
225            LogLevel::Fatal,
226            workflow_id.into(),
227            message.into(),
228        ))
229        .await;
230    }
231
232    /// Get logs for a workflow.
233    pub async fn get_logs(&self, workflow_id: &str) -> Vec<LogEntry> {
234        if let Some(logs) = self.logs.get(workflow_id) {
235            let log_queue = logs.read().await;
236            log_queue.iter().cloned().collect()
237        } else {
238            Vec::new()
239        }
240    }
241
242    /// Get logs for a workflow with filtering.
243    pub async fn get_logs_filtered(
244        &self,
245        workflow_id: &str,
246        min_level: LogLevel,
247        limit: Option<usize>,
248    ) -> Vec<LogEntry> {
249        if let Some(logs) = self.logs.get(workflow_id) {
250            let log_queue = logs.read().await;
251            let mut filtered: Vec<LogEntry> = log_queue
252                .iter()
253                .filter(|entry| entry.level >= min_level)
254                .cloned()
255                .collect();
256
257            if let Some(limit_count) = limit {
258                let start = if filtered.len() > limit_count {
259                    filtered.len() - limit_count
260                } else {
261                    0
262                };
263                filtered = filtered[start..].to_vec();
264            }
265
266            filtered
267        } else {
268            Vec::new()
269        }
270    }
271
272    /// Get logs for a specific task.
273    pub async fn get_task_logs(&self, workflow_id: &str, task_id: &str) -> Vec<LogEntry> {
274        if let Some(logs) = self.logs.get(workflow_id) {
275            let log_queue = logs.read().await;
276            log_queue
277                .iter()
278                .filter(|entry| entry.task_id.as_deref() == Some(task_id))
279                .cloned()
280                .collect()
281        } else {
282            Vec::new()
283        }
284    }
285
286    /// Clear logs for a workflow.
287    pub async fn clear_logs(&self, workflow_id: &str) {
288        if let Some(logs) = self.logs.get(workflow_id) {
289            let mut log_queue = logs.write().await;
290            log_queue.clear();
291        }
292    }
293
294    /// Clear all logs.
295    pub fn clear_all_logs(&self) {
296        self.logs.clear();
297    }
298
299    /// Get log count for a workflow.
300    pub async fn get_log_count(&self, workflow_id: &str) -> usize {
301        if let Some(logs) = self.logs.get(workflow_id) {
302            let log_queue = logs.read().await;
303            log_queue.len()
304        } else {
305            0
306        }
307    }
308
309    /// Get total log count across all workflows.
310    pub async fn get_total_log_count(&self) -> usize {
311        let mut total = 0;
312        for entry in self.logs.iter() {
313            let log_queue = entry.value().read().await;
314            total += log_queue.len();
315        }
316        total
317    }
318
319    /// Export logs to JSON.
320    pub async fn export_logs_json(&self, workflow_id: &str) -> Result<String, serde_json::Error> {
321        let logs = self.get_logs(workflow_id).await;
322        serde_json::to_string_pretty(&logs)
323    }
324}
325
326impl Default for WorkflowLogger {
327    fn default() -> Self {
328        Self::new()
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use super::*;
335
336    #[tokio::test]
337    async fn test_logger_creation() {
338        let logger = WorkflowLogger::new();
339        assert_eq!(logger.get_log_count("workflow1").await, 0);
340    }
341
342    #[tokio::test]
343    async fn test_logging() {
344        let logger = WorkflowLogger::new();
345
346        logger.info("workflow1", "Test message").await;
347
348        let logs = logger.get_logs("workflow1").await;
349        assert_eq!(logs.len(), 1);
350        assert_eq!(logs[0].message, "Test message");
351    }
352
353    #[tokio::test]
354    async fn test_log_levels() {
355        let logger = WorkflowLogger::with_config(100, LogLevel::Warn);
356
357        logger.info("workflow1", "Info message").await;
358        logger.warn("workflow1", "Warning message").await;
359        logger.error("workflow1", "Error message").await;
360
361        let logs = logger.get_logs("workflow1").await;
362        // Only warn and error should be logged
363        assert_eq!(logs.len(), 2);
364    }
365
366    #[tokio::test]
367    async fn test_log_filtering() {
368        let logger = WorkflowLogger::new();
369
370        logger.info("workflow1", "Info").await;
371        logger.warn("workflow1", "Warning").await;
372        logger.error("workflow1", "Error").await;
373
374        let filtered = logger
375            .get_logs_filtered("workflow1", LogLevel::Warn, None)
376            .await;
377
378        assert_eq!(filtered.len(), 2);
379    }
380
381    #[tokio::test]
382    async fn test_task_logs() {
383        let logger = WorkflowLogger::new();
384
385        let entry =
386            LogEntry::new(LogLevel::Info, "workflow1", "Task message").with_task_id("task1");
387
388        logger.log(entry).await;
389
390        let task_logs = logger.get_task_logs("workflow1", "task1").await;
391        assert_eq!(task_logs.len(), 1);
392    }
393
394    #[test]
395    fn test_log_entry_format() {
396        let entry = LogEntry::new(LogLevel::Info, "workflow1", "Test message")
397            .with_task_id("task1")
398            .with_context("key", "value");
399
400        let formatted = entry.format();
401        assert!(formatted.contains("INFO"));
402        assert!(formatted.contains("workflow1"));
403        assert!(formatted.contains("task1"));
404        assert!(formatted.contains("Test message"));
405    }
406}