1use crate::{RragError, RragResult};
7use chrono::{DateTime, Duration, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::io::Write;
11use std::sync::Arc;
12use tokio::sync::{broadcast, mpsc, RwLock};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct LogConfig {
17 pub enabled: bool,
18 pub level: LogLevel,
19 pub buffer_size: usize,
20 pub flush_interval_seconds: u64,
21 pub retention_days: u32,
22 pub structured_logging: bool,
23 pub include_stack_trace: bool,
24 pub log_to_file: bool,
25 pub log_file_path: Option<String>,
26 pub log_rotation_size_mb: u64,
27 pub max_log_files: u32,
28}
29
30impl Default for LogConfig {
31 fn default() -> Self {
32 Self {
33 enabled: true,
34 level: LogLevel::Info,
35 buffer_size: 10000,
36 flush_interval_seconds: 5,
37 retention_days: 30,
38 structured_logging: true,
39 include_stack_trace: false,
40 log_to_file: true,
41 log_file_path: Some("rrag.log".to_string()),
42 log_rotation_size_mb: 100,
43 max_log_files: 10,
44 }
45 }
46}
47
48#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
50pub enum LogLevel {
51 Trace = 0,
52 Debug = 1,
53 Info = 2,
54 Warn = 3,
55 Error = 4,
56 Fatal = 5,
57}
58
59impl std::fmt::Display for LogLevel {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 Self::Trace => write!(f, "TRACE"),
63 Self::Debug => write!(f, "DEBUG"),
64 Self::Info => write!(f, "INFO"),
65 Self::Warn => write!(f, "WARN"),
66 Self::Error => write!(f, "ERROR"),
67 Self::Fatal => write!(f, "FATAL"),
68 }
69 }
70}
71
72impl From<&str> for LogLevel {
73 fn from(s: &str) -> Self {
74 match s.to_uppercase().as_str() {
75 "TRACE" => LogLevel::Trace,
76 "DEBUG" => LogLevel::Debug,
77 "INFO" => LogLevel::Info,
78 "WARN" | "WARNING" => LogLevel::Warn,
79 "ERROR" => LogLevel::Error,
80 "FATAL" | "CRITICAL" => LogLevel::Fatal,
81 _ => LogLevel::Info,
82 }
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct LogEntry {
89 pub id: String,
90 pub timestamp: DateTime<Utc>,
91 pub level: LogLevel,
92 pub message: String,
93 pub component: String,
94 pub operation: Option<String>,
95 pub user_id: Option<String>,
96 pub session_id: Option<String>,
97 pub trace_id: Option<String>,
98 pub span_id: Option<String>,
99 pub fields: HashMap<String, serde_json::Value>,
100 pub stack_trace: Option<String>,
101 pub source_file: Option<String>,
102 pub source_line: Option<u32>,
103 pub duration_ms: Option<f64>,
104}
105
106impl LogEntry {
107 pub fn new(level: LogLevel, message: impl Into<String>, component: impl Into<String>) -> Self {
108 Self {
109 id: uuid::Uuid::new_v4().to_string(),
110 timestamp: Utc::now(),
111 level,
112 message: message.into(),
113 component: component.into(),
114 operation: None,
115 user_id: None,
116 session_id: None,
117 trace_id: None,
118 span_id: None,
119 fields: HashMap::new(),
120 stack_trace: None,
121 source_file: None,
122 source_line: None,
123 duration_ms: None,
124 }
125 }
126
127 pub fn with_operation(mut self, operation: impl Into<String>) -> Self {
128 self.operation = Some(operation.into());
129 self
130 }
131
132 pub fn with_user(mut self, user_id: impl Into<String>) -> Self {
133 self.user_id = Some(user_id.into());
134 self
135 }
136
137 pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
138 self.session_id = Some(session_id.into());
139 self
140 }
141
142 pub fn with_trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
143 self.trace_id = Some(trace_id.into());
144 self.span_id = Some(span_id.into());
145 self
146 }
147
148 pub fn with_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
149 self.fields.insert(key.into(), value);
150 self
151 }
152
153 pub fn with_duration(mut self, duration_ms: f64) -> Self {
154 self.duration_ms = Some(duration_ms);
155 self
156 }
157
158 pub fn with_source(mut self, file: impl Into<String>, line: u32) -> Self {
159 self.source_file = Some(file.into());
160 self.source_line = Some(line);
161 self
162 }
163
164 pub fn with_stack_trace(mut self, stack_trace: impl Into<String>) -> Self {
165 self.stack_trace = Some(stack_trace.into());
166 self
167 }
168
169 pub fn to_json(&self) -> RragResult<String> {
171 serde_json::to_string(self).map_err(|e| RragError::agent("log_formatter", e.to_string()))
172 }
173
174 pub fn to_text(&self) -> String {
176 let timestamp = self.timestamp.format("%Y-%m-%d %H:%M:%S%.3f UTC");
177 let level_str = format!("{:5}", self.level);
178
179 let mut parts = vec![
180 format!("[{}]", timestamp),
181 format!("[{}]", level_str),
182 format!("[{}]", self.component),
183 ];
184
185 if let Some(ref operation) = self.operation {
186 parts.push(format!("[{}]", operation));
187 }
188
189 parts.push(self.message.clone());
190
191 if let Some(duration) = self.duration_ms {
192 parts.push(format!("({}ms)", duration));
193 }
194
195 if !self.fields.is_empty() {
196 let fields_str = self
197 .fields
198 .iter()
199 .map(|(k, v)| format!("{}={}", k, v))
200 .collect::<Vec<_>>()
201 .join(" ");
202 parts.push(format!("{{{}}}", fields_str));
203 }
204
205 parts.join(" ")
206 }
207}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct LogQuery {
212 pub level_filter: Option<LogLevel>,
213 pub component_filter: Option<String>,
214 pub operation_filter: Option<String>,
215 pub user_filter: Option<String>,
216 pub session_filter: Option<String>,
217 pub message_contains: Option<String>,
218 pub time_range: Option<TimeRange>,
219 pub limit: Option<usize>,
220 pub offset: Option<usize>,
221 pub sort_order: SortOrder,
222 pub field_filters: HashMap<String, FieldFilter>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct TimeRange {
227 pub start: DateTime<Utc>,
228 pub end: DateTime<Utc>,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub enum SortOrder {
233 Ascending,
234 Descending,
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub enum FieldFilter {
239 Equals(serde_json::Value),
240 Contains(String),
241 GreaterThan(f64),
242 LessThan(f64),
243 Between(f64, f64),
244}
245
246impl Default for LogQuery {
247 fn default() -> Self {
248 Self {
249 level_filter: None,
250 component_filter: None,
251 operation_filter: None,
252 user_filter: None,
253 session_filter: None,
254 message_contains: None,
255 time_range: None,
256 limit: Some(100),
257 offset: None,
258 sort_order: SortOrder::Descending,
259 field_filters: HashMap::new(),
260 }
261 }
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct LogFilter {
267 pub min_level: LogLevel,
268 pub components: Vec<String>,
269 pub operations: Vec<String>,
270 pub include_fields: Vec<String>,
271 pub exclude_patterns: Vec<String>,
272}
273
274impl Default for LogFilter {
275 fn default() -> Self {
276 Self {
277 min_level: LogLevel::Info,
278 components: Vec::new(),
279 operations: Vec::new(),
280 include_fields: Vec::new(),
281 exclude_patterns: Vec::new(),
282 }
283 }
284}
285
286pub struct LogSearchEngine {
288 logs: Arc<RwLock<Vec<LogEntry>>>,
289 max_size: usize,
290}
291
292impl LogSearchEngine {
293 pub fn new(max_size: usize) -> Self {
294 Self {
295 logs: Arc::new(RwLock::new(Vec::new())),
296 max_size,
297 }
298 }
299
300 pub async fn add_entry(&self, entry: LogEntry) {
301 let mut logs = self.logs.write().await;
302 logs.push(entry);
303
304 let logs_len = logs.len();
306 if logs_len > self.max_size {
307 logs.drain(0..logs_len - self.max_size);
308 }
309 }
310
311 pub async fn search(&self, query: &LogQuery) -> Vec<LogEntry> {
312 let logs = self.logs.read().await;
313 let mut results: Vec<_> = logs
314 .iter()
315 .filter(|entry| self.matches_query(entry, query))
316 .cloned()
317 .collect();
318
319 match query.sort_order {
321 SortOrder::Ascending => results.sort_by_key(|e| e.timestamp),
322 SortOrder::Descending => results.sort_by_key(|e| std::cmp::Reverse(e.timestamp)),
323 }
324
325 let start = query.offset.unwrap_or(0);
327 let end = if let Some(limit) = query.limit {
328 std::cmp::min(start + limit, results.len())
329 } else {
330 results.len()
331 };
332
333 if start < results.len() {
334 results[start..end].to_vec()
335 } else {
336 Vec::new()
337 }
338 }
339
340 fn matches_query(&self, entry: &LogEntry, query: &LogQuery) -> bool {
341 if let Some(min_level) = query.level_filter {
343 if entry.level < min_level {
344 return false;
345 }
346 }
347
348 if let Some(ref component) = query.component_filter {
350 if entry.component != *component {
351 return false;
352 }
353 }
354
355 if let Some(ref operation) = query.operation_filter {
357 if entry.operation.as_ref() != Some(operation) {
358 return false;
359 }
360 }
361
362 if let Some(ref user) = query.user_filter {
364 if entry.user_id.as_ref() != Some(user) {
365 return false;
366 }
367 }
368
369 if let Some(ref session) = query.session_filter {
371 if entry.session_id.as_ref() != Some(session) {
372 return false;
373 }
374 }
375
376 if let Some(ref text) = query.message_contains {
378 if !entry.message.to_lowercase().contains(&text.to_lowercase()) {
379 return false;
380 }
381 }
382
383 if let Some(ref range) = query.time_range {
385 if entry.timestamp < range.start || entry.timestamp > range.end {
386 return false;
387 }
388 }
389
390 for (field_name, field_filter) in &query.field_filters {
392 if let Some(field_value) = entry.fields.get(field_name) {
393 if !self.matches_field_filter(field_value, field_filter) {
394 return false;
395 }
396 } else {
397 return false; }
399 }
400
401 true
402 }
403
404 fn matches_field_filter(&self, value: &serde_json::Value, filter: &FieldFilter) -> bool {
405 match filter {
406 FieldFilter::Equals(expected) => value == expected,
407 FieldFilter::Contains(text) => {
408 if let Some(s) = value.as_str() {
409 s.to_lowercase().contains(&text.to_lowercase())
410 } else {
411 false
412 }
413 }
414 FieldFilter::GreaterThan(threshold) => {
415 if let Some(num) = value.as_f64() {
416 num > *threshold
417 } else {
418 false
419 }
420 }
421 FieldFilter::LessThan(threshold) => {
422 if let Some(num) = value.as_f64() {
423 num < *threshold
424 } else {
425 false
426 }
427 }
428 FieldFilter::Between(min, max) => {
429 if let Some(num) = value.as_f64() {
430 num >= *min && num <= *max
431 } else {
432 false
433 }
434 }
435 }
436 }
437
438 pub async fn get_log_stats(&self) -> LogStats {
439 let logs = self.logs.read().await;
440
441 let mut level_counts = HashMap::new();
442 let mut component_counts = HashMap::new();
443 let total_entries = logs.len();
444
445 for entry in logs.iter() {
446 *level_counts.entry(entry.level).or_insert(0) += 1;
447 *component_counts.entry(entry.component.clone()).or_insert(0) += 1;
448 }
449
450 let recent_errors = logs
451 .iter()
452 .filter(|e| e.level >= LogLevel::Error)
453 .filter(|e| e.timestamp > Utc::now() - Duration::hours(1))
454 .count();
455
456 LogStats {
457 total_entries,
458 entries_by_level: level_counts,
459 entries_by_component: component_counts,
460 recent_errors_count: recent_errors,
461 oldest_entry: logs.first().map(|e| e.timestamp),
462 newest_entry: logs.last().map(|e| e.timestamp),
463 }
464 }
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize)]
469pub struct LogStats {
470 pub total_entries: usize,
471 pub entries_by_level: HashMap<LogLevel, usize>,
472 pub entries_by_component: HashMap<String, usize>,
473 pub recent_errors_count: usize,
474 pub oldest_entry: Option<DateTime<Utc>>,
475 pub newest_entry: Option<DateTime<Utc>>,
476}
477
478pub struct StructuredLogger {
480 config: LogConfig,
481 sender: mpsc::UnboundedSender<LogEntry>,
482}
483
484impl StructuredLogger {
485 pub fn new(config: LogConfig, sender: mpsc::UnboundedSender<LogEntry>) -> Self {
486 Self { config, sender }
487 }
488
489 pub fn trace(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
490 self.log(LogLevel::Trace, message, component)
491 }
492
493 pub fn debug(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
494 self.log(LogLevel::Debug, message, component)
495 }
496
497 pub fn info(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
498 self.log(LogLevel::Info, message, component)
499 }
500
501 pub fn warn(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
502 self.log(LogLevel::Warn, message, component)
503 }
504
505 pub fn error(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
506 self.log(LogLevel::Error, message, component)
507 }
508
509 pub fn fatal(&self, message: impl Into<String>, component: impl Into<String>) -> LogBuilder {
510 self.log(LogLevel::Fatal, message, component)
511 }
512
513 fn log(
514 &self,
515 level: LogLevel,
516 message: impl Into<String>,
517 component: impl Into<String>,
518 ) -> LogBuilder {
519 LogBuilder::new(level, message, component, self.sender.clone())
520 }
521}
522
523pub struct LogBuilder {
525 entry: LogEntry,
526 sender: mpsc::UnboundedSender<LogEntry>,
527}
528
529impl LogBuilder {
530 fn new(
531 level: LogLevel,
532 message: impl Into<String>,
533 component: impl Into<String>,
534 sender: mpsc::UnboundedSender<LogEntry>,
535 ) -> Self {
536 Self {
537 entry: LogEntry::new(level, message, component),
538 sender,
539 }
540 }
541
542 pub fn operation(mut self, operation: impl Into<String>) -> Self {
543 self.entry = self.entry.with_operation(operation);
544 self
545 }
546
547 pub fn user(mut self, user_id: impl Into<String>) -> Self {
548 self.entry = self.entry.with_user(user_id);
549 self
550 }
551
552 pub fn session(mut self, session_id: impl Into<String>) -> Self {
553 self.entry = self.entry.with_session(session_id);
554 self
555 }
556
557 pub fn trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
558 self.entry = self.entry.with_trace(trace_id, span_id);
559 self
560 }
561
562 pub fn field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
563 self.entry = self.entry.with_field(key, value);
564 self
565 }
566
567 pub fn duration(mut self, duration_ms: f64) -> Self {
568 self.entry = self.entry.with_duration(duration_ms);
569 self
570 }
571
572 pub fn source(mut self, file: impl Into<String>, line: u32) -> Self {
573 self.entry = self.entry.with_source(file, line);
574 self
575 }
576
577 pub fn send(self) {
578 let _ = self.sender.send(self.entry);
579 }
580}
581
582pub struct LogAggregator {
584 config: LogConfig,
585 search_engine: Arc<LogSearchEngine>,
586 logger: Arc<StructuredLogger>,
587 log_sender: mpsc::UnboundedSender<LogEntry>,
588 log_receiver: Arc<RwLock<Option<mpsc::UnboundedReceiver<LogEntry>>>>,
589 file_writer: Option<Arc<RwLock<std::fs::File>>>,
590 stream_sender: broadcast::Sender<LogEntry>,
591 _stream_receiver: broadcast::Receiver<LogEntry>,
592 processing_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
593 is_running: Arc<RwLock<bool>>,
594}
595
596impl LogAggregator {
597 pub async fn new(config: LogConfig) -> RragResult<Self> {
598 let search_engine = Arc::new(LogSearchEngine::new(config.buffer_size));
599 let (log_sender, log_receiver) = mpsc::unbounded_channel();
600 let logger = Arc::new(StructuredLogger::new(config.clone(), log_sender.clone()));
601 let (stream_sender, stream_receiver) = broadcast::channel(1000);
602
603 let file_writer = if config.log_to_file {
605 if let Some(ref path) = config.log_file_path {
606 let file = std::fs::OpenOptions::new()
607 .create(true)
608 .append(true)
609 .open(path)
610 .map_err(|e| RragError::storage("log_file_create", e))?;
611 Some(Arc::new(RwLock::new(file)))
612 } else {
613 None
614 }
615 } else {
616 None
617 };
618
619 Ok(Self {
620 config,
621 search_engine,
622 logger,
623 log_sender,
624 log_receiver: Arc::new(RwLock::new(Some(log_receiver))),
625 file_writer,
626 stream_sender,
627 _stream_receiver: stream_receiver,
628 processing_handle: Arc::new(RwLock::new(None)),
629 is_running: Arc::new(RwLock::new(false)),
630 })
631 }
632
633 pub async fn start(&self) -> RragResult<()> {
634 let mut running = self.is_running.write().await;
635 if *running {
636 return Err(RragError::config(
637 "log_aggregator",
638 "stopped",
639 "already running",
640 ));
641 }
642
643 {
644 let mut receiver_guard = self.log_receiver.write().await;
645 if let Some(receiver) = receiver_guard.take() {
646 let handle = self.start_processing_loop(receiver).await?;
647 let mut handle_guard = self.processing_handle.write().await;
648 *handle_guard = Some(handle);
649 }
650 }
651
652 *running = true;
653 tracing::info!("Log aggregator started");
654 Ok(())
655 }
656
657 pub async fn stop(&self) -> RragResult<()> {
658 let mut running = self.is_running.write().await;
659 if !*running {
660 return Ok(());
661 }
662
663 {
664 let mut handle_guard = self.processing_handle.write().await;
665 if let Some(handle) = handle_guard.take() {
666 handle.abort();
667 }
668 }
669
670 if let Some(ref file_writer) = self.file_writer {
672 let mut file = file_writer.write().await;
673 let _ = file.flush();
674 }
675
676 *running = false;
677 tracing::info!("Log aggregator stopped");
678 Ok(())
679 }
680
681 pub async fn is_healthy(&self) -> bool {
682 *self.is_running.read().await
683 }
684
685 async fn start_processing_loop(
686 &self,
687 mut receiver: mpsc::UnboundedReceiver<LogEntry>,
688 ) -> RragResult<tokio::task::JoinHandle<()>> {
689 let search_engine = self.search_engine.clone();
690 let file_writer = self.file_writer.clone();
691 let stream_sender = self.stream_sender.clone();
692 let config = self.config.clone();
693 let is_running = self.is_running.clone();
694
695 let handle = tokio::spawn(async move {
696 let mut flush_interval = tokio::time::interval(tokio::time::Duration::from_secs(
697 config.flush_interval_seconds,
698 ));
699
700 while *is_running.read().await {
701 tokio::select! {
702 Some(entry) = receiver.recv() => {
703 if entry.level >= config.level {
705 search_engine.add_entry(entry.clone()).await;
707
708 if let Some(ref writer) = file_writer {
710 let log_line = if config.structured_logging {
711 entry.to_json().unwrap_or_else(|_| entry.to_text())
712 } else {
713 entry.to_text()
714 };
715
716 let mut file = writer.write().await;
717 if writeln!(file, "{}", log_line).is_err() {
718 tracing::debug!("Failed to write to log file");
719 }
720 }
721
722 let _ = stream_sender.send(entry);
724 }
725 }
726 _ = flush_interval.tick() => {
727 if let Some(ref writer) = file_writer {
729 let mut file = writer.write().await;
730 let _ = file.flush();
731 }
732 }
733 }
734 }
735 });
736
737 Ok(handle)
738 }
739
740 pub fn logger(&self) -> &Arc<StructuredLogger> {
741 &self.logger
742 }
743
744 pub async fn search_logs(&self, query: &LogQuery) -> Vec<LogEntry> {
745 self.search_engine.search(query).await
746 }
747
748 pub async fn get_stats(&self) -> LogStats {
749 self.search_engine.get_log_stats().await
750 }
751
752 pub fn subscribe_to_stream(&self) -> broadcast::Receiver<LogEntry> {
753 self.stream_sender.subscribe()
754 }
755
756 pub async fn add_log_entry(&self, entry: LogEntry) -> RragResult<()> {
757 self.log_sender
758 .send(entry)
759 .map_err(|e| RragError::agent("log_aggregator", e.to_string()))?;
760 Ok(())
761 }
762
763 pub async fn log(
765 &self,
766 level: LogLevel,
767 message: impl Into<String>,
768 component: impl Into<String>,
769 ) -> RragResult<()> {
770 let entry = LogEntry::new(level, message, component);
771 self.add_log_entry(entry).await
772 }
773}
774
775#[cfg(test)]
776mod tests {
777 use super::*;
778
779 #[tokio::test]
780 async fn test_log_entry_creation() {
781 let entry = LogEntry::new(LogLevel::Info, "Test message", "test_component")
782 .with_operation("test_operation")
783 .with_user("user123")
784 .with_session("session456")
785 .with_field("custom_field", serde_json::json!("custom_value"))
786 .with_duration(150.5);
787
788 assert_eq!(entry.level, LogLevel::Info);
789 assert_eq!(entry.message, "Test message");
790 assert_eq!(entry.component, "test_component");
791 assert_eq!(entry.operation.as_ref().unwrap(), "test_operation");
792 assert_eq!(entry.user_id.as_ref().unwrap(), "user123");
793 assert_eq!(entry.session_id.as_ref().unwrap(), "session456");
794 assert_eq!(entry.duration_ms.unwrap(), 150.5);
795 assert!(entry.fields.contains_key("custom_field"));
796
797 let json_str = entry.to_json().unwrap();
799 assert!(json_str.contains("Test message"));
800 assert!(json_str.contains("INFO"));
801
802 let text_str = entry.to_text();
803 assert!(text_str.contains("Test message"));
804 assert!(text_str.contains("INFO"));
805 assert!(text_str.contains("test_component"));
806 }
807
808 #[tokio::test]
809 async fn test_log_search_engine() {
810 let engine = LogSearchEngine::new(1000);
811
812 let entries = vec![
814 LogEntry::new(LogLevel::Info, "Info message", "component1"),
815 LogEntry::new(LogLevel::Error, "Error message", "component1"),
816 LogEntry::new(LogLevel::Warn, "Warning message", "component2").with_user("user123"),
817 LogEntry::new(LogLevel::Debug, "Debug message", "component2"),
818 ];
819
820 for entry in entries {
821 engine.add_entry(entry).await;
822 }
823
824 let query = LogQuery {
826 level_filter: Some(LogLevel::Warn),
827 ..Default::default()
828 };
829 let results = engine.search(&query).await;
830 assert_eq!(results.len(), 2); let query = LogQuery {
834 component_filter: Some("component1".to_string()),
835 ..Default::default()
836 };
837 let results = engine.search(&query).await;
838 assert_eq!(results.len(), 2);
839
840 let query = LogQuery {
842 user_filter: Some("user123".to_string()),
843 ..Default::default()
844 };
845 let results = engine.search(&query).await;
846 assert_eq!(results.len(), 1);
847 assert_eq!(results[0].level, LogLevel::Warn);
848
849 let query = LogQuery {
851 message_contains: Some("Error".to_string()),
852 ..Default::default()
853 };
854 let results = engine.search(&query).await;
855 assert_eq!(results.len(), 1);
856 assert_eq!(results[0].level, LogLevel::Error);
857
858 let stats = engine.get_log_stats().await;
860 assert_eq!(stats.total_entries, 4);
861 assert_eq!(stats.entries_by_level[&LogLevel::Info], 1);
862 assert_eq!(stats.entries_by_level[&LogLevel::Error], 1);
863 assert_eq!(stats.entries_by_component["component1"], 2);
864 assert_eq!(stats.entries_by_component["component2"], 2);
865 }
866
867 #[tokio::test]
868 async fn test_log_aggregator() {
869 let config = LogConfig {
870 log_to_file: false, ..Default::default()
872 };
873
874 let mut aggregator = LogAggregator::new(config).await.unwrap();
875 assert!(!aggregator.is_healthy().await);
876
877 aggregator.start().await.unwrap();
878 assert!(aggregator.is_healthy().await);
879
880 let logger = aggregator.logger();
882 logger
883 .info("Test info message", "test_component")
884 .user("user123")
885 .operation("test_operation")
886 .field("test_field", serde_json::json!("test_value"))
887 .send();
888
889 logger
890 .error("Test error message", "test_component")
891 .session("session456")
892 .duration(200.0)
893 .send();
894
895 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
897
898 let query = LogQuery {
900 level_filter: Some(LogLevel::Info),
901 ..Default::default()
902 };
903 let results = aggregator.search_logs(&query).await;
904 assert!(results.len() >= 2);
905
906 let stats = aggregator.get_stats().await;
907 assert!(stats.total_entries >= 2);
908
909 aggregator.stop().await.unwrap();
910 assert!(!aggregator.is_healthy().await);
911 }
912
913 #[test]
914 fn test_log_level_ordering() {
915 assert!(LogLevel::Fatal > LogLevel::Error);
916 assert!(LogLevel::Error > LogLevel::Warn);
917 assert!(LogLevel::Warn > LogLevel::Info);
918 assert!(LogLevel::Info > LogLevel::Debug);
919 assert!(LogLevel::Debug > LogLevel::Trace);
920 }
921
922 #[test]
923 fn test_log_level_from_string() {
924 assert_eq!(LogLevel::from("INFO"), LogLevel::Info);
925 assert_eq!(LogLevel::from("info"), LogLevel::Info);
926 assert_eq!(LogLevel::from("ERROR"), LogLevel::Error);
927 assert_eq!(LogLevel::from("WARN"), LogLevel::Warn);
928 assert_eq!(LogLevel::from("WARNING"), LogLevel::Warn);
929 assert_eq!(LogLevel::from("FATAL"), LogLevel::Fatal);
930 assert_eq!(LogLevel::from("unknown"), LogLevel::Info); }
932
933 #[tokio::test]
934 async fn test_field_filters() {
935 let engine = LogSearchEngine::new(1000);
936
937 let entry = LogEntry::new(LogLevel::Info, "Test message", "component")
938 .with_field("number", serde_json::json!(42))
939 .with_field("text", serde_json::json!("hello world"))
940 .with_field("decimal", serde_json::json!(3.14));
941
942 engine.add_entry(entry).await;
943
944 let mut query = LogQuery::default();
946 query.field_filters.insert(
947 "number".to_string(),
948 FieldFilter::Equals(serde_json::json!(42)),
949 );
950 let results = engine.search(&query).await;
951 assert_eq!(results.len(), 1);
952
953 let mut query = LogQuery::default();
955 query.field_filters.insert(
956 "text".to_string(),
957 FieldFilter::Contains("hello".to_string()),
958 );
959 let results = engine.search(&query).await;
960 assert_eq!(results.len(), 1);
961
962 let mut query = LogQuery::default();
964 query
965 .field_filters
966 .insert("number".to_string(), FieldFilter::GreaterThan(40.0));
967 let results = engine.search(&query).await;
968 assert_eq!(results.len(), 1);
969
970 let mut query = LogQuery::default();
972 query
973 .field_filters
974 .insert("decimal".to_string(), FieldFilter::Between(3.0, 4.0));
975 let results = engine.search(&query).await;
976 assert_eq!(results.len(), 1);
977 }
978
979 #[tokio::test]
980 async fn test_log_streaming() {
981 let config = LogConfig {
982 log_to_file: false,
983 ..Default::default()
984 };
985
986 let mut aggregator = LogAggregator::new(config).await.unwrap();
987 aggregator.start().await.unwrap();
988
989 let mut stream = aggregator.subscribe_to_stream();
990
991 let entry = LogEntry::new(LogLevel::Info, "Stream test", "test_component");
993 aggregator.add_log_entry(entry.clone()).await.unwrap();
994
995 let received = tokio::time::timeout(tokio::time::Duration::from_millis(100), stream.recv())
997 .await
998 .unwrap()
999 .unwrap();
1000
1001 assert_eq!(received.message, "Stream test");
1002 assert_eq!(received.component, "test_component");
1003
1004 aggregator.stop().await.unwrap();
1005 }
1006}