// rexis_rag/observability/logging.rs

//! # Log Aggregation and Search System
//!
//! Centralized logging with structured data, search capabilities,
//! and real-time log streaming for RRAG system operations.

6use crate::{RragError, RragResult};
7use chrono::{DateTime, Duration, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::io::Write;
11use std::sync::Arc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13
/// Logging configuration.
///
/// Controls the level threshold, in-memory buffering, flushing and the
/// optional append-mode file output used by [`LogAggregator`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
    /// Master switch for the logging subsystem.
    /// NOTE(review): not currently consulted by the processing loop — confirm intent.
    pub enabled: bool,
    /// Minimum level an entry must have to be processed by the aggregator.
    pub level: LogLevel,
    /// Maximum number of entries retained in the in-memory search buffer.
    pub buffer_size: usize,
    /// How often the log file is flushed, in seconds.
    pub flush_interval_seconds: u64,
    /// Intended retention period in days (not referenced in this module).
    pub retention_days: u32,
    /// When true, file output is one JSON object per line; otherwise text.
    pub structured_logging: bool,
    /// Whether stack traces should be captured (not referenced in this module).
    pub include_stack_trace: bool,
    /// Whether processed entries are appended to a log file.
    pub log_to_file: bool,
    /// Path of the log file; required for file output to take effect.
    pub log_file_path: Option<String>,
    /// Intended rotation threshold in MB (not referenced in this module).
    pub log_rotation_size_mb: u64,
    /// Intended maximum rotated-file count (not referenced in this module).
    pub max_log_files: u32,
}
29
30impl Default for LogConfig {
31    fn default() -> Self {
32        Self {
33            enabled: true,
34            level: LogLevel::Info,
35            buffer_size: 10000,
36            flush_interval_seconds: 5,
37            retention_days: 30,
38            structured_logging: true,
39            include_stack_trace: false,
40            log_to_file: true,
41            log_file_path: Some("rrag.log".to_string()),
42            log_rotation_size_mb: 100,
43            max_log_files: 10,
44        }
45    }
46}
47
/// Log severity levels, ordered from least (`Trace`) to most (`Fatal`)
/// severe.
///
/// The derived `Ord` makes threshold checks such as
/// `entry.level >= config.level` work directly; `Hash`/`Eq` allow the
/// level to key the per-level counters in [`LogStats`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Fatal = 5,
}
58
59impl std::fmt::Display for LogLevel {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            Self::Trace => write!(f, "TRACE"),
63            Self::Debug => write!(f, "DEBUG"),
64            Self::Info => write!(f, "INFO"),
65            Self::Warn => write!(f, "WARN"),
66            Self::Error => write!(f, "ERROR"),
67            Self::Fatal => write!(f, "FATAL"),
68        }
69    }
70}
71
72impl From<&str> for LogLevel {
73    fn from(s: &str) -> Self {
74        match s.to_uppercase().as_str() {
75            "TRACE" => LogLevel::Trace,
76            "DEBUG" => LogLevel::Debug,
77            "INFO" => LogLevel::Info,
78            "WARN" | "WARNING" => LogLevel::Warn,
79            "ERROR" => LogLevel::Error,
80            "FATAL" | "CRITICAL" => LogLevel::Fatal,
81            _ => LogLevel::Info,
82        }
83    }
84}
85
/// A single structured log entry.
///
/// Carries the core message plus optional correlation context
/// (user/session/trace ids), a free-form key/value `fields` map, and
/// source-location / timing metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Unique id (UUID v4, assigned in `LogEntry::new`).
    pub id: String,
    /// Creation time in UTC (set in `LogEntry::new`).
    pub timestamp: DateTime<Utc>,
    /// Severity of the entry.
    pub level: LogLevel,
    /// Human-readable message text.
    pub message: String,
    /// Name of the component that produced the entry.
    pub component: String,
    /// Logical operation being performed, if any.
    pub operation: Option<String>,
    /// Id of the user the work ran on behalf of, if known.
    pub user_id: Option<String>,
    /// Session the entry belongs to, if any.
    pub session_id: Option<String>,
    /// Distributed-trace id, if the entry is part of a trace.
    pub trace_id: Option<String>,
    /// Span id within `trace_id`, if any.
    pub span_id: Option<String>,
    /// Arbitrary structured key/value payload.
    pub fields: HashMap<String, serde_json::Value>,
    /// Captured stack trace, if one was attached.
    pub stack_trace: Option<String>,
    /// Source file that emitted the entry, if recorded.
    pub source_file: Option<String>,
    /// Source line within `source_file`, if recorded.
    pub source_line: Option<u32>,
    /// Duration of the logged operation in milliseconds, if timed.
    pub duration_ms: Option<f64>,
}
105
106impl LogEntry {
107    pub fn new(level: LogLevel, message: impl Into<String>, component: impl Into<String>) -> Self {
108        Self {
109            id: uuid::Uuid::new_v4().to_string(),
110            timestamp: Utc::now(),
111            level,
112            message: message.into(),
113            component: component.into(),
114            operation: None,
115            user_id: None,
116            session_id: None,
117            trace_id: None,
118            span_id: None,
119            fields: HashMap::new(),
120            stack_trace: None,
121            source_file: None,
122            source_line: None,
123            duration_ms: None,
124        }
125    }
126
127    pub fn with_operation(mut self, operation: impl Into<String>) -> Self {
128        self.operation = Some(operation.into());
129        self
130    }
131
132    pub fn with_user(mut self, user_id: impl Into<String>) -> Self {
133        self.user_id = Some(user_id.into());
134        self
135    }
136
137    pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
138        self.session_id = Some(session_id.into());
139        self
140    }
141
142    pub fn with_trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
143        self.trace_id = Some(trace_id.into());
144        self.span_id = Some(span_id.into());
145        self
146    }
147
148    pub fn with_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
149        self.fields.insert(key.into(), value);
150        self
151    }
152
153    pub fn with_duration(mut self, duration_ms: f64) -> Self {
154        self.duration_ms = Some(duration_ms);
155        self
156    }
157
158    pub fn with_source(mut self, file: impl Into<String>, line: u32) -> Self {
159        self.source_file = Some(file.into());
160        self.source_line = Some(line);
161        self
162    }
163
164    pub fn with_stack_trace(mut self, stack_trace: impl Into<String>) -> Self {
165        self.stack_trace = Some(stack_trace.into());
166        self
167    }
168
169    /// Format as JSON for structured logging
170    pub fn to_json(&self) -> RragResult<String> {
171        serde_json::to_string(self).map_err(|e| RragError::agent("log_formatter", e.to_string()))
172    }
173
174    /// Format as human-readable text
175    pub fn to_text(&self) -> String {
176        let timestamp = self.timestamp.format("%Y-%m-%d %H:%M:%S%.3f UTC");
177        let level_str = format!("{:5}", self.level);
178
179        let mut parts = vec![
180            format!("[{}]", timestamp),
181            format!("[{}]", level_str),
182            format!("[{}]", self.component),
183        ];
184
185        if let Some(ref operation) = self.operation {
186            parts.push(format!("[{}]", operation));
187        }
188
189        parts.push(self.message.clone());
190
191        if let Some(duration) = self.duration_ms {
192            parts.push(format!("({}ms)", duration));
193        }
194
195        if !self.fields.is_empty() {
196            let fields_str = self
197                .fields
198                .iter()
199                .map(|(k, v)| format!("{}={}", k, v))
200                .collect::<Vec<_>>()
201                .join(" ");
202            parts.push(format!("{{{}}}", fields_str));
203        }
204
205        parts.join(" ")
206    }
207}
208
/// Log query for searching through buffered logs.
///
/// All filters are optional and combined with AND; `level_filter` acts
/// as a minimum severity (entries at or above it match).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogQuery {
    /// Minimum level an entry must have to match.
    pub level_filter: Option<LogLevel>,
    /// Exact component name to match.
    pub component_filter: Option<String>,
    /// Exact operation name to match.
    pub operation_filter: Option<String>,
    /// Exact user id to match.
    pub user_filter: Option<String>,
    /// Exact session id to match.
    pub session_filter: Option<String>,
    /// Case-insensitive substring the message must contain.
    pub message_contains: Option<String>,
    /// Inclusive timestamp window the entry must fall within.
    pub time_range: Option<TimeRange>,
    /// Maximum number of results to return.
    pub limit: Option<usize>,
    /// Number of matching results to skip (for pagination).
    pub offset: Option<usize>,
    /// Timestamp ordering of results.
    pub sort_order: SortOrder,
    /// Per-field predicates applied to `LogEntry::fields` (all must match).
    pub field_filters: HashMap<String, FieldFilter>,
}
224
/// Inclusive UTC time window used by `LogQuery::time_range`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    /// Earliest matching timestamp (inclusive).
    pub start: DateTime<Utc>,
    /// Latest matching timestamp (inclusive).
    pub end: DateTime<Utc>,
}
230
/// Result ordering by entry timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SortOrder {
    /// Oldest entries first.
    Ascending,
    /// Newest entries first (the `LogQuery` default).
    Descending,
}
236
/// Predicate applied to a single value in `LogEntry::fields`.
///
/// Numeric variants match only when the JSON value is representable as
/// `f64`; `Contains` matches only string values (case-insensitive).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FieldFilter {
    /// Exact JSON value equality.
    Equals(serde_json::Value),
    /// Case-insensitive substring match on string values.
    Contains(String),
    /// Numeric value strictly greater than the threshold.
    GreaterThan(f64),
    /// Numeric value strictly less than the threshold.
    LessThan(f64),
    /// Numeric value within the inclusive range (min, max).
    Between(f64, f64),
}
245
246impl Default for LogQuery {
247    fn default() -> Self {
248        Self {
249            level_filter: None,
250            component_filter: None,
251            operation_filter: None,
252            user_filter: None,
253            session_filter: None,
254            message_contains: None,
255            time_range: None,
256            limit: Some(100),
257            offset: None,
258            sort_order: SortOrder::Descending,
259            field_filters: HashMap::new(),
260        }
261    }
262}
263
/// Log filter for real-time streaming.
///
/// NOTE(review): declared for stream subscribers, but no code in this
/// module currently applies it to the broadcast stream — confirm where
/// (or whether) it is enforced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogFilter {
    /// Minimum severity to pass through.
    pub min_level: LogLevel,
    /// Component names to select (presumably empty = all — verify).
    pub components: Vec<String>,
    /// Operation names to select (presumably empty = all — verify).
    pub operations: Vec<String>,
    /// Field keys of interest (semantics not enforced here — TODO confirm).
    pub include_fields: Vec<String>,
    /// Patterns whose matches should be excluded (TODO confirm semantics).
    pub exclude_patterns: Vec<String>,
}
273
274impl Default for LogFilter {
275    fn default() -> Self {
276        Self {
277            min_level: LogLevel::Info,
278            components: Vec::new(),
279            operations: Vec::new(),
280            include_fields: Vec::new(),
281            exclude_patterns: Vec::new(),
282        }
283    }
284}
285
/// In-memory, searchable buffer of recent log entries.
///
/// Holds at most `max_size` entries; the oldest entries are evicted as
/// new ones arrive. All access goes through an async `RwLock`, so the
/// engine can be shared across tasks via the inner `Arc`.
pub struct LogSearchEngine {
    // Buffered entries, oldest first.
    logs: Arc<RwLock<Vec<LogEntry>>>,
    // Maximum number of entries retained in `logs`.
    max_size: usize,
}
291
292impl LogSearchEngine {
293    pub fn new(max_size: usize) -> Self {
294        Self {
295            logs: Arc::new(RwLock::new(Vec::new())),
296            max_size,
297        }
298    }
299
300    pub async fn add_entry(&self, entry: LogEntry) {
301        let mut logs = self.logs.write().await;
302        logs.push(entry);
303
304        // Keep only recent entries
305        let logs_len = logs.len();
306        if logs_len > self.max_size {
307            logs.drain(0..logs_len - self.max_size);
308        }
309    }
310
311    pub async fn search(&self, query: &LogQuery) -> Vec<LogEntry> {
312        let logs = self.logs.read().await;
313        let mut results: Vec<_> = logs
314            .iter()
315            .filter(|entry| self.matches_query(entry, query))
316            .cloned()
317            .collect();
318
319        // Sort results
320        match query.sort_order {
321            SortOrder::Ascending => results.sort_by_key(|e| e.timestamp),
322            SortOrder::Descending => results.sort_by_key(|e| std::cmp::Reverse(e.timestamp)),
323        }
324
325        // Apply pagination
326        let start = query.offset.unwrap_or(0);
327        let end = if let Some(limit) = query.limit {
328            std::cmp::min(start + limit, results.len())
329        } else {
330            results.len()
331        };
332
333        if start < results.len() {
334            results[start..end].to_vec()
335        } else {
336            Vec::new()
337        }
338    }
339
340    fn matches_query(&self, entry: &LogEntry, query: &LogQuery) -> bool {
341        // Level filter
342        if let Some(min_level) = query.level_filter {
343            if entry.level < min_level {
344                return false;
345            }
346        }
347
348        // Component filter
349        if let Some(ref component) = query.component_filter {
350            if entry.component != *component {
351                return false;
352            }
353        }
354
355        // Operation filter
356        if let Some(ref operation) = query.operation_filter {
357            if entry.operation.as_ref() != Some(operation) {
358                return false;
359            }
360        }
361
362        // User filter
363        if let Some(ref user) = query.user_filter {
364            if entry.user_id.as_ref() != Some(user) {
365                return false;
366            }
367        }
368
369        // Session filter
370        if let Some(ref session) = query.session_filter {
371            if entry.session_id.as_ref() != Some(session) {
372                return false;
373            }
374        }
375
376        // Message contains filter
377        if let Some(ref text) = query.message_contains {
378            if !entry.message.to_lowercase().contains(&text.to_lowercase()) {
379                return false;
380            }
381        }
382
383        // Time range filter
384        if let Some(ref range) = query.time_range {
385            if entry.timestamp < range.start || entry.timestamp > range.end {
386                return false;
387            }
388        }
389
390        // Field filters
391        for (field_name, field_filter) in &query.field_filters {
392            if let Some(field_value) = entry.fields.get(field_name) {
393                if !self.matches_field_filter(field_value, field_filter) {
394                    return false;
395                }
396            } else {
397                return false; // Field doesn't exist
398            }
399        }
400
401        true
402    }
403
404    fn matches_field_filter(&self, value: &serde_json::Value, filter: &FieldFilter) -> bool {
405        match filter {
406            FieldFilter::Equals(expected) => value == expected,
407            FieldFilter::Contains(text) => {
408                if let Some(s) = value.as_str() {
409                    s.to_lowercase().contains(&text.to_lowercase())
410                } else {
411                    false
412                }
413            }
414            FieldFilter::GreaterThan(threshold) => {
415                if let Some(num) = value.as_f64() {
416                    num > *threshold
417                } else {
418                    false
419                }
420            }
421            FieldFilter::LessThan(threshold) => {
422                if let Some(num) = value.as_f64() {
423                    num < *threshold
424                } else {
425                    false
426                }
427            }
428            FieldFilter::Between(min, max) => {
429                if let Some(num) = value.as_f64() {
430                    num >= *min && num <= *max
431                } else {
432                    false
433                }
434            }
435        }
436    }
437
438    pub async fn get_log_stats(&self) -> LogStats {
439        let logs = self.logs.read().await;
440
441        let mut level_counts = HashMap::new();
442        let mut component_counts = HashMap::new();
443        let total_entries = logs.len();
444
445        for entry in logs.iter() {
446            *level_counts.entry(entry.level).or_insert(0) += 1;
447            *component_counts.entry(entry.component.clone()).or_insert(0) += 1;
448        }
449
450        let recent_errors = logs
451            .iter()
452            .filter(|e| e.level >= LogLevel::Error)
453            .filter(|e| e.timestamp > Utc::now() - Duration::hours(1))
454            .count();
455
456        LogStats {
457            total_entries,
458            entries_by_level: level_counts,
459            entries_by_component: component_counts,
460            recent_errors_count: recent_errors,
461            oldest_entry: logs.first().map(|e| e.timestamp),
462            newest_entry: logs.last().map(|e| e.timestamp),
463        }
464    }
465}
466
/// Aggregate statistics over the entries currently buffered in the
/// search engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogStats {
    /// Total number of buffered entries.
    pub total_entries: usize,
    /// Entry count per severity level.
    pub entries_by_level: HashMap<LogLevel, usize>,
    /// Entry count per component name.
    pub entries_by_component: HashMap<String, usize>,
    /// Entries at `Error` level or above within the last hour.
    pub recent_errors_count: usize,
    /// Timestamp of the oldest buffered entry, if any.
    pub oldest_entry: Option<DateTime<Utc>>,
    /// Timestamp of the newest buffered entry, if any.
    pub newest_entry: Option<DateTime<Utc>>,
}
477
/// Structured logger facade: builds entries fluently via [`LogBuilder`]
/// and forwards completed entries to the aggregator over an unbounded
/// channel.
pub struct StructuredLogger {
    // Logging configuration.
    // NOTE(review): stored but never read by any method in this module.
    config: LogConfig,
    // Channel on which completed entries are sent for processing.
    sender: mpsc::UnboundedSender<LogEntry>,
}
483
484impl StructuredLogger {
485    pub fn new(config: LogConfig, sender: mpsc::UnboundedSender<LogEntry>) -> Self {
486        Self { config, sender }
487    }
488
489    pub fn trace(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
490        self.log(LogLevel::Trace, message, component)
491    }
492
493    pub fn debug(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
494        self.log(LogLevel::Debug, message, component)
495    }
496
497    pub fn info(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
498        self.log(LogLevel::Info, message, component)
499    }
500
501    pub fn warn(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
502        self.log(LogLevel::Warn, message, component)
503    }
504
505    pub fn error(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
506        self.log(LogLevel::Error, message, component)
507    }
508
509    pub fn fatal(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
510        self.log(LogLevel::Fatal, message, component)
511    }
512
513    fn log(
514        &self,
515        level: LogLevel,
516        message: impl Into<String>,
517        component: impl Into<String>,
518    ) -> LogBuilder {
519        LogBuilder::new(level, message, component, self.sender.clone())
520    }
521}
522
/// Fluent builder for a single log entry; `send` consumes the builder
/// and forwards the finished entry over the channel.
pub struct LogBuilder {
    // Entry being assembled.
    entry: LogEntry,
    // Destination channel for the finished entry.
    sender: mpsc::UnboundedSender<LogEntry>,
}
528
529impl LogBuilder {
530    fn new(
531        level: LogLevel,
532        message: impl Into<String>,
533        component: impl Into<String>,
534        sender: mpsc::UnboundedSender<LogEntry>,
535    ) -> Self {
536        Self {
537            entry: LogEntry::new(level, message, component),
538            sender,
539        }
540    }
541
542    pub fn operation(mut self, operation: impl Into<String>) -> Self {
543        self.entry = self.entry.with_operation(operation);
544        self
545    }
546
547    pub fn user(mut self, user_id: impl Into<String>) -> Self {
548        self.entry = self.entry.with_user(user_id);
549        self
550    }
551
552    pub fn session(mut self, session_id: impl Into<String>) -> Self {
553        self.entry = self.entry.with_session(session_id);
554        self
555    }
556
557    pub fn trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
558        self.entry = self.entry.with_trace(trace_id, span_id);
559        self
560    }
561
562    pub fn field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
563        self.entry = self.entry.with_field(key, value);
564        self
565    }
566
567    pub fn duration(mut self, duration_ms: f64) -> Self {
568        self.entry = self.entry.with_duration(duration_ms);
569        self
570    }
571
572    pub fn source(mut self, file: impl Into<String>, line: u32) -> Self {
573        self.entry = self.entry.with_source(file, line);
574        self
575    }
576
577    pub fn send(self) {
578        let _ = self.sender.send(self.entry);
579    }
580}
581
/// Main log aggregator.
///
/// Owns the processing pipeline: entries arrive over an mpsc channel,
/// are indexed in the search engine, optionally appended to a log
/// file, and fanned out to broadcast subscribers.
pub struct LogAggregator {
    // Aggregator configuration (level threshold, file output, flushing).
    config: LogConfig,
    // In-memory searchable buffer of processed entries.
    search_engine: Arc<LogSearchEngine>,
    // Facade handed to callers for fluent logging.
    logger: Arc<StructuredLogger>,
    // Sender side of the entry channel (also cloned into `logger`).
    log_sender: mpsc::UnboundedSender<LogEntry>,
    // Receiver side; taken (left as `None`) when the processing loop starts.
    log_receiver: Arc<RwLock<Option<mpsc::UnboundedReceiver<LogEntry>>>>,
    // Append-mode log file handle, present only when file output is on.
    file_writer: Option<Arc<RwLock<std::fs::File>>>,
    // Broadcast sender for live log streaming.
    stream_sender: broadcast::Sender<LogEntry>,
    // Kept alive so the broadcast channel always has at least one receiver.
    _stream_receiver: broadcast::Receiver<LogEntry>,
    // Handle of the spawned processing task, if running.
    processing_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    // Whether the processing loop is supposed to be active.
    is_running: Arc<RwLock<bool>>,
}
595
impl LogAggregator {
    /// Builds an aggregator from `config`, wiring up the entry channel,
    /// search engine, structured logger, broadcast stream and (when
    /// configured) the append-mode log file. The aggregator starts in
    /// the stopped state; call [`start`](Self::start) to begin processing.
    ///
    /// # Errors
    /// Returns a storage error if the log file cannot be opened/created.
    pub async fn new(config: LogConfig) -> RragResult<Self> {
        let search_engine = Arc::new(LogSearchEngine::new(config.buffer_size));
        let (log_sender, log_receiver) = mpsc::unbounded_channel();
        let logger = Arc::new(StructuredLogger::new(config.clone(), log_sender.clone()));
        let (stream_sender, stream_receiver) = broadcast::channel(1000);

        // Initialize file writer if needed
        let file_writer = if config.log_to_file {
            if let Some(ref path) = config.log_file_path {
                let file = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(path)
                    .map_err(|e| RragError::storage("log_file_create", e))?;
                Some(Arc::new(RwLock::new(file)))
            } else {
                // log_to_file is set but no path given: file output is
                // silently skipped rather than treated as an error.
                None
            }
        } else {
            None
        };

        Ok(Self {
            config,
            search_engine,
            logger,
            log_sender,
            log_receiver: Arc::new(RwLock::new(Some(log_receiver))),
            file_writer,
            stream_sender,
            // Held so the broadcast channel keeps >= 1 receiver alive.
            _stream_receiver: stream_receiver,
            processing_handle: Arc::new(RwLock::new(None)),
            is_running: Arc::new(RwLock::new(false)),
        })
    }

    /// Starts the background processing loop.
    ///
    /// The `is_running` write guard is held for the whole method: the
    /// spawned loop's first `is_running.read()` blocks until this method
    /// sets the flag to `true` and drops the guard, so the task cannot
    /// observe a stale `false` and exit immediately.
    ///
    /// NOTE(review): the receiver is `take`n on first start and never
    /// restored, so after `stop()` a second `start()` sets the running
    /// flag without spawning a loop — confirm restart is unsupported.
    ///
    /// # Errors
    /// Returns a config error if the aggregator is already running.
    pub async fn start(&self) -> RragResult<()> {
        let mut running = self.is_running.write().await;
        if *running {
            return Err(RragError::config(
                "log_aggregator",
                "stopped",
                "already running",
            ));
        }

        {
            let mut receiver_guard = self.log_receiver.write().await;
            if let Some(receiver) = receiver_guard.take() {
                let handle = self.start_processing_loop(receiver).await?;
                let mut handle_guard = self.processing_handle.write().await;
                *handle_guard = Some(handle);
            }
        }

        *running = true;
        tracing::info!("Log aggregator started");
        Ok(())
    }

    /// Stops the processing loop and flushes the log file. Idempotent:
    /// stopping an already-stopped aggregator is a no-op.
    ///
    /// NOTE(review): the task is aborted, so entries still queued in the
    /// mpsc channel are dropped and an in-progress file write may be cut
    /// short — confirm this is acceptable shutdown behavior.
    pub async fn stop(&self) -> RragResult<()> {
        let mut running = self.is_running.write().await;
        if !*running {
            return Ok(());
        }

        {
            let mut handle_guard = self.processing_handle.write().await;
            if let Some(handle) = handle_guard.take() {
                handle.abort();
            }
        }

        // Flush any remaining logs
        if let Some(ref file_writer) = self.file_writer {
            let mut file = file_writer.write().await;
            let _ = file.flush();
        }

        *running = false;
        tracing::info!("Log aggregator stopped");
        Ok(())
    }

    /// Reports whether the aggregator is currently running.
    pub async fn is_healthy(&self) -> bool {
        *self.is_running.read().await
    }

    /// Spawns the background task that drains the entry channel:
    /// filtering by `config.level`, indexing entries, appending to the
    /// log file (JSON or text per `config.structured_logging`), fanning
    /// out to broadcast subscribers, and flushing the file on a
    /// `flush_interval_seconds` cadence.
    ///
    /// NOTE(review): `config.enabled` is not consulted here — confirm
    /// whether disabled configs should skip processing entirely.
    async fn start_processing_loop(
        &self,
        mut receiver: mpsc::UnboundedReceiver<LogEntry>,
    ) -> RragResult<tokio::task::JoinHandle<()>> {
        let search_engine = self.search_engine.clone();
        let file_writer = self.file_writer.clone();
        let stream_sender = self.stream_sender.clone();
        let config = self.config.clone();
        let is_running = self.is_running.clone();

        let handle = tokio::spawn(async move {
            let mut flush_interval = tokio::time::interval(tokio::time::Duration::from_secs(
                config.flush_interval_seconds,
            ));

            // Loop exits on its own only after `is_running` flips to
            // false AND one more event fires; `stop()` aborts it instead.
            while *is_running.read().await {
                tokio::select! {
                    Some(entry) = receiver.recv() => {
                        // Check if entry meets minimum level requirement
                        if entry.level >= config.level {
                            // Add to search engine
                            search_engine.add_entry(entry.clone()).await;

                            // Write to file if configured
                            if let Some(ref writer) = file_writer {
                                let log_line = if config.structured_logging {
                                    // Fall back to text if JSON encoding fails.
                                    entry.to_json().unwrap_or_else(|_| entry.to_text())
                                } else {
                                    entry.to_text()
                                };

                                let mut file = writer.write().await;
                                if writeln!(file, "{}", log_line).is_err() {
                                    // Best-effort: a failed write must not kill the loop.
                                    tracing::debug!("Failed to write to log file");
                                }
                            }

                            // Send to live stream subscribers (ignore "no receivers").
                            let _ = stream_sender.send(entry);
                        }
                    }
                    _ = flush_interval.tick() => {
                        // Periodic flush
                        if let Some(ref writer) = file_writer {
                            let mut file = writer.write().await;
                            let _ = file.flush();
                        }
                    }
                }
            }
        });

        Ok(handle)
    }

    /// Returns the structured-logger facade for fluent logging.
    pub fn logger(&self) -> &Arc<StructuredLogger> {
        &self.logger
    }

    /// Searches the in-memory buffer of processed entries.
    pub async fn search_logs(&self, query: &LogQuery) -> Vec<LogEntry> {
        self.search_engine.search(query).await
    }

    /// Returns aggregate statistics over the buffered entries.
    pub async fn get_stats(&self) -> LogStats {
        self.search_engine.get_log_stats().await
    }

    /// Subscribes to the live stream of processed entries.
    pub fn subscribe_to_stream(&self) -> broadcast::Receiver<LogEntry> {
        self.stream_sender.subscribe()
    }

    /// Queues a pre-built entry for processing.
    ///
    /// # Errors
    /// Returns an agent error if the processing channel is closed.
    pub async fn add_log_entry(&self, entry: LogEntry) -> RragResult<()> {
        self.log_sender
            .send(entry)
            .map_err(|e| RragError::agent("log_aggregator", e.to_string()))?;
        Ok(())
    }

    /// Convenience method for creating a log entry and adding it
    pub async fn log(
        &self,
        level: LogLevel,
        message: impl Into<String>,
        component: impl Into<String>,
    ) -> RragResult<()> {
        let entry = LogEntry::new(level, message, component);
        self.add_log_entry(entry).await
    }
}
774
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_log_entry_creation() {
        let entry = LogEntry::new(LogLevel::Info, "Test message", "test_component")
            .with_operation("test_operation")
            .with_user("user123")
            .with_session("session456")
            .with_field("custom_field", serde_json::json!("custom_value"))
            .with_duration(150.5);

        assert_eq!(entry.level, LogLevel::Info);
        assert_eq!(entry.message, "Test message");
        assert_eq!(entry.component, "test_component");
        assert_eq!(entry.operation.as_ref().unwrap(), "test_operation");
        assert_eq!(entry.user_id.as_ref().unwrap(), "user123");
        assert_eq!(entry.session_id.as_ref().unwrap(), "session456");
        assert_eq!(entry.duration_ms.unwrap(), 150.5);
        assert!(entry.fields.contains_key("custom_field"));

        // Test formatting
        let json_str = entry.to_json().unwrap();
        assert!(json_str.contains("Test message"));
        // serde serializes the unit variant by its Rust name ("Info"),
        // not the upper-case Display form.
        assert!(json_str.contains("Info"));

        let text_str = entry.to_text();
        assert!(text_str.contains("Test message"));
        assert!(text_str.contains("INFO"));
        assert!(text_str.contains("test_component"));
    }

    #[tokio::test]
    async fn test_log_search_engine() {
        let engine = LogSearchEngine::new(1000);

        // Add test log entries
        let entries = vec![
            LogEntry::new(LogLevel::Info, "Info message", "component1"),
            LogEntry::new(LogLevel::Error, "Error message", "component1"),
            LogEntry::new(LogLevel::Warn, "Warning message", "component2").with_user("user123"),
            LogEntry::new(LogLevel::Debug, "Debug message", "component2"),
        ];

        for entry in entries {
            engine.add_entry(entry).await;
        }

        // Test level filter
        let query = LogQuery {
            level_filter: Some(LogLevel::Warn),
            ..Default::default()
        };
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 2); // Warn and Error (Error >= Warn)

        // Test component filter
        let query = LogQuery {
            component_filter: Some("component1".to_string()),
            ..Default::default()
        };
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 2);

        // Test user filter
        let query = LogQuery {
            user_filter: Some("user123".to_string()),
            ..Default::default()
        };
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].level, LogLevel::Warn);

        // Test message contains
        let query = LogQuery {
            message_contains: Some("Error".to_string()),
            ..Default::default()
        };
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].level, LogLevel::Error);

        // Test stats
        let stats = engine.get_log_stats().await;
        assert_eq!(stats.total_entries, 4);
        assert_eq!(stats.entries_by_level[&LogLevel::Info], 1);
        assert_eq!(stats.entries_by_level[&LogLevel::Error], 1);
        assert_eq!(stats.entries_by_component["component1"], 2);
        assert_eq!(stats.entries_by_component["component2"], 2);
    }

    #[tokio::test]
    async fn test_log_aggregator() {
        let config = LogConfig {
            log_to_file: false, // Don't write to file in tests
            ..Default::default()
        };

        // start/stop take &self, so no `mut` binding is needed.
        let aggregator = LogAggregator::new(config).await.unwrap();
        assert!(!aggregator.is_healthy().await);

        aggregator.start().await.unwrap();
        assert!(aggregator.is_healthy().await);

        // Test logging through the structured logger
        let logger = aggregator.logger();
        logger
            .info("Test info message", "test_component")
            .user("user123")
            .operation("test_operation")
            .field("test_field", serde_json::json!("test_value"))
            .send();

        logger
            .error("Test error message", "test_component")
            .session("session456")
            .duration(200.0)
            .send();

        // Give some time for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Test search
        let query = LogQuery {
            level_filter: Some(LogLevel::Info),
            ..Default::default()
        };
        let results = aggregator.search_logs(&query).await;
        assert!(results.len() >= 2);

        let stats = aggregator.get_stats().await;
        assert!(stats.total_entries >= 2);

        aggregator.stop().await.unwrap();
        assert!(!aggregator.is_healthy().await);
    }

    #[test]
    fn test_log_level_ordering() {
        assert!(LogLevel::Fatal > LogLevel::Error);
        assert!(LogLevel::Error > LogLevel::Warn);
        assert!(LogLevel::Warn > LogLevel::Info);
        assert!(LogLevel::Info > LogLevel::Debug);
        assert!(LogLevel::Debug > LogLevel::Trace);
    }

    #[test]
    fn test_log_level_from_string() {
        assert_eq!(LogLevel::from("INFO"), LogLevel::Info);
        assert_eq!(LogLevel::from("info"), LogLevel::Info);
        assert_eq!(LogLevel::from("ERROR"), LogLevel::Error);
        assert_eq!(LogLevel::from("WARN"), LogLevel::Warn);
        assert_eq!(LogLevel::from("WARNING"), LogLevel::Warn);
        assert_eq!(LogLevel::from("FATAL"), LogLevel::Fatal);
        assert_eq!(LogLevel::from("unknown"), LogLevel::Info); // Default
    }

    #[tokio::test]
    async fn test_field_filters() {
        let engine = LogSearchEngine::new(1000);

        let entry = LogEntry::new(LogLevel::Info, "Test message", "component")
            .with_field("number", serde_json::json!(42))
            .with_field("text", serde_json::json!("hello world"))
            .with_field("decimal", serde_json::json!(3.14));

        engine.add_entry(entry).await;

        // Test equals filter
        let mut query = LogQuery::default();
        query.field_filters.insert(
            "number".to_string(),
            FieldFilter::Equals(serde_json::json!(42)),
        );
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 1);

        // Test contains filter
        let mut query = LogQuery::default();
        query.field_filters.insert(
            "text".to_string(),
            FieldFilter::Contains("hello".to_string()),
        );
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 1);

        // Test greater than filter
        let mut query = LogQuery::default();
        query
            .field_filters
            .insert("number".to_string(), FieldFilter::GreaterThan(40.0));
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 1);

        // Test between filter
        let mut query = LogQuery::default();
        query
            .field_filters
            .insert("decimal".to_string(), FieldFilter::Between(3.0, 4.0));
        let results = engine.search(&query).await;
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_log_streaming() {
        let config = LogConfig {
            log_to_file: false,
            ..Default::default()
        };

        // start/stop take &self, so no `mut` binding is needed.
        let aggregator = LogAggregator::new(config).await.unwrap();
        aggregator.start().await.unwrap();

        let mut stream = aggregator.subscribe_to_stream();

        // Send a log entry
        let entry = LogEntry::new(LogLevel::Info, "Stream test", "test_component");
        aggregator.add_log_entry(entry.clone()).await.unwrap();

        // Receive the streamed entry
        let received = tokio::time::timeout(tokio::time::Duration::from_millis(100), stream.recv())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(received.message, "Stream test");
        assert_eq!(received.component, "test_component");

        aggregator.stop().await.unwrap();
    }
}