// oxigdal_workflow/monitoring/logging.rs

use chrono::{DateTime, Utc};
4use dashmap::DashMap;
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
/// Severity of a [`LogEntry`], ordered from least severe (`Trace`) to most
/// severe (`Fatal`). The derived `Ord` makes threshold comparisons such as
/// `entry.level >= min_level` work for filtering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
    Fatal,
}
26
27impl LogLevel {
28 pub fn as_str(&self) -> &'static str {
30 match self {
31 Self::Trace => "TRACE",
32 Self::Debug => "DEBUG",
33 Self::Info => "INFO",
34 Self::Warn => "WARN",
35 Self::Error => "ERROR",
36 Self::Fatal => "FATAL",
37 }
38 }
39}
40
/// A single structured log record attributed to a workflow and, optionally,
/// one of its tasks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    // UTC creation time, set by `LogEntry::new`.
    pub timestamp: DateTime<Utc>,
    // Severity; used for filtering against a logger's minimum level.
    pub level: LogLevel,
    // Workflow this entry belongs to; keys the logger's per-workflow queues.
    pub workflow_id: String,
    // Optional task attribution within the workflow.
    pub task_id: Option<String>,
    // Human-readable message text.
    pub message: String,
    // Free-form key/value pairs; rendered sorted by `format` for determinism.
    pub context: std::collections::HashMap<String, String>,
}
57
58impl LogEntry {
59 pub fn new<S1: Into<String>, S2: Into<String>>(
61 level: LogLevel,
62 workflow_id: S1,
63 message: S2,
64 ) -> Self {
65 Self {
66 timestamp: Utc::now(),
67 level,
68 workflow_id: workflow_id.into(),
69 task_id: None,
70 message: message.into(),
71 context: std::collections::HashMap::new(),
72 }
73 }
74
75 pub fn with_task_id<S: Into<String>>(mut self, task_id: S) -> Self {
77 self.task_id = Some(task_id.into());
78 self
79 }
80
81 pub fn with_context<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
83 self.context.insert(key.into(), value.into());
84 self
85 }
86
87 pub fn format(&self) -> String {
89 let task_info = self
90 .task_id
91 .as_ref()
92 .map(|id| format!(" [task:{}]", id))
93 .unwrap_or_default();
94
95 let context_info = if self.context.is_empty() {
96 String::new()
97 } else {
98 let mut parts: Vec<String> = self
99 .context
100 .iter()
101 .map(|(k, v)| format!("{}={}", k, v))
102 .collect();
103 parts.sort();
104 format!(" {{{}}}", parts.join(", "))
105 };
106
107 format!(
108 "[{}] {} [workflow:{}]{}{} {}",
109 self.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
110 self.level.as_str(),
111 self.workflow_id,
112 task_info,
113 context_info,
114 self.message
115 )
116 }
117}
118
/// In-memory, per-workflow log store with level filtering and a bounded
/// ring-buffer per workflow (oldest entries are evicted at capacity).
pub struct WorkflowLogger {
    // One bounded queue of entries per workflow id. Each queue sits behind
    // its own async RwLock so readers/writers of one workflow don't block
    // others; the DashMap provides sharded concurrent access to the map.
    logs: Arc<DashMap<String, Arc<RwLock<VecDeque<LogEntry>>>>>,
    // Capacity per workflow queue; `log` evicts from the front at this size.
    max_logs_per_workflow: usize,
    // Entries below this level are dropped by `log`.
    min_level: LogLevel,
}
125
126impl WorkflowLogger {
127 pub fn new() -> Self {
129 Self {
130 logs: Arc::new(DashMap::new()),
131 max_logs_per_workflow: 10000,
132 min_level: LogLevel::Info,
133 }
134 }
135
136 pub fn with_config(max_logs_per_workflow: usize, min_level: LogLevel) -> Self {
138 Self {
139 logs: Arc::new(DashMap::new()),
140 max_logs_per_workflow,
141 min_level,
142 }
143 }
144
145 pub fn set_min_level(&mut self, level: LogLevel) {
147 self.min_level = level;
148 }
149
150 pub async fn log(&self, entry: LogEntry) {
152 if entry.level < self.min_level {
153 return;
154 }
155
156 let workflow_id = entry.workflow_id.clone();
157 let logs = self
158 .logs
159 .entry(workflow_id)
160 .or_insert_with(|| Arc::new(RwLock::new(VecDeque::new())));
161
162 let mut log_queue = logs.write().await;
163
164 if log_queue.len() >= self.max_logs_per_workflow {
166 log_queue.pop_front();
167 }
168
169 log_queue.push_back(entry);
170 }
171
172 pub async fn trace<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
174 self.log(LogEntry::new(
175 LogLevel::Trace,
176 workflow_id.into(),
177 message.into(),
178 ))
179 .await;
180 }
181
182 pub async fn debug<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
184 self.log(LogEntry::new(
185 LogLevel::Debug,
186 workflow_id.into(),
187 message.into(),
188 ))
189 .await;
190 }
191
192 pub async fn info<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
194 self.log(LogEntry::new(
195 LogLevel::Info,
196 workflow_id.into(),
197 message.into(),
198 ))
199 .await;
200 }
201
202 pub async fn warn<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
204 self.log(LogEntry::new(
205 LogLevel::Warn,
206 workflow_id.into(),
207 message.into(),
208 ))
209 .await;
210 }
211
212 pub async fn error<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
214 self.log(LogEntry::new(
215 LogLevel::Error,
216 workflow_id.into(),
217 message.into(),
218 ))
219 .await;
220 }
221
222 pub async fn fatal<S1: Into<String>, S2: Into<String>>(&self, workflow_id: S1, message: S2) {
224 self.log(LogEntry::new(
225 LogLevel::Fatal,
226 workflow_id.into(),
227 message.into(),
228 ))
229 .await;
230 }
231
232 pub async fn get_logs(&self, workflow_id: &str) -> Vec<LogEntry> {
234 if let Some(logs) = self.logs.get(workflow_id) {
235 let log_queue = logs.read().await;
236 log_queue.iter().cloned().collect()
237 } else {
238 Vec::new()
239 }
240 }
241
242 pub async fn get_logs_filtered(
244 &self,
245 workflow_id: &str,
246 min_level: LogLevel,
247 limit: Option<usize>,
248 ) -> Vec<LogEntry> {
249 if let Some(logs) = self.logs.get(workflow_id) {
250 let log_queue = logs.read().await;
251 let mut filtered: Vec<LogEntry> = log_queue
252 .iter()
253 .filter(|entry| entry.level >= min_level)
254 .cloned()
255 .collect();
256
257 if let Some(limit_count) = limit {
258 let start = if filtered.len() > limit_count {
259 filtered.len() - limit_count
260 } else {
261 0
262 };
263 filtered = filtered[start..].to_vec();
264 }
265
266 filtered
267 } else {
268 Vec::new()
269 }
270 }
271
272 pub async fn get_task_logs(&self, workflow_id: &str, task_id: &str) -> Vec<LogEntry> {
274 if let Some(logs) = self.logs.get(workflow_id) {
275 let log_queue = logs.read().await;
276 log_queue
277 .iter()
278 .filter(|entry| entry.task_id.as_deref() == Some(task_id))
279 .cloned()
280 .collect()
281 } else {
282 Vec::new()
283 }
284 }
285
286 pub async fn clear_logs(&self, workflow_id: &str) {
288 if let Some(logs) = self.logs.get(workflow_id) {
289 let mut log_queue = logs.write().await;
290 log_queue.clear();
291 }
292 }
293
294 pub fn clear_all_logs(&self) {
296 self.logs.clear();
297 }
298
299 pub async fn get_log_count(&self, workflow_id: &str) -> usize {
301 if let Some(logs) = self.logs.get(workflow_id) {
302 let log_queue = logs.read().await;
303 log_queue.len()
304 } else {
305 0
306 }
307 }
308
309 pub async fn get_total_log_count(&self) -> usize {
311 let mut total = 0;
312 for entry in self.logs.iter() {
313 let log_queue = entry.value().read().await;
314 total += log_queue.len();
315 }
316 total
317 }
318
319 pub async fn export_logs_json(&self, workflow_id: &str) -> Result<String, serde_json::Error> {
321 let logs = self.get_logs(workflow_id).await;
322 serde_json::to_string_pretty(&logs)
323 }
324}
325
326impl Default for WorkflowLogger {
327 fn default() -> Self {
328 Self::new()
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_logger_creation() {
        // A fresh logger has no entries for any workflow id.
        let logger = WorkflowLogger::new();
        assert_eq!(logger.get_log_count("workflow1").await, 0);
    }

    #[tokio::test]
    async fn test_logging() {
        let logger = WorkflowLogger::new();

        logger.info("workflow1", "Test message").await;

        let recorded = logger.get_logs("workflow1").await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].message, "Test message");
    }

    #[tokio::test]
    async fn test_log_levels() {
        // With a Warn floor, the Info entry must be dropped.
        let logger = WorkflowLogger::with_config(100, LogLevel::Warn);

        logger.info("workflow1", "Info message").await;
        logger.warn("workflow1", "Warning message").await;
        logger.error("workflow1", "Error message").await;

        assert_eq!(logger.get_logs("workflow1").await.len(), 2);
    }

    #[tokio::test]
    async fn test_log_filtering() {
        let logger = WorkflowLogger::new();

        logger.info("workflow1", "Info").await;
        logger.warn("workflow1", "Warning").await;
        logger.error("workflow1", "Error").await;

        // Only Warn and Error survive the filter.
        let warn_and_above = logger
            .get_logs_filtered("workflow1", LogLevel::Warn, None)
            .await;
        assert_eq!(warn_and_above.len(), 2);
    }

    #[tokio::test]
    async fn test_task_logs() {
        let logger = WorkflowLogger::new();

        let tagged =
            LogEntry::new(LogLevel::Info, "workflow1", "Task message").with_task_id("task1");
        logger.log(tagged).await;

        assert_eq!(logger.get_task_logs("workflow1", "task1").await.len(), 1);
    }

    #[test]
    fn test_log_entry_format() {
        let entry = LogEntry::new(LogLevel::Info, "workflow1", "Test message")
            .with_task_id("task1")
            .with_context("key", "value");

        let rendered = entry.format();
        assert!(rendered.contains("INFO"));
        assert!(rendered.contains("workflow1"));
        assert!(rendered.contains("task1"));
        assert!(rendered.contains("Test message"));
    }
}