pulseengine_mcp_logging/
aggregation.rs

1//! Log aggregation and centralized logging for distributed MCP servers
2//!
3//! This module provides:
4//! - Multi-source log collection
5//! - Log buffering and batching
6//! - Centralized log forwarding
7//! - Log deduplication
8//! - Structured log parsing
9
10use crate::sanitization::get_sanitizer;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{RwLock, mpsc};
17use tracing::{error, info};
18use uuid::Uuid;
19
/// Configuration for log aggregation.
///
/// Consumed by [`LogAggregator`]; see the `Default` implementation for the
/// values used when nothing is overridden.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationConfig {
    /// Enable log aggregation.
    ///
    /// When `false`, `LogAggregator::start` spawns no background tasks and
    /// `LogAggregator::submit` returns `Ok(())` without queueing the entry.
    pub enabled: bool,

    /// Buffer size for log entries.
    ///
    /// Used as the capacity of the internal channel between `submit` and the
    /// batch processor, and as the initial capacity of the aggregator buffer.
    pub buffer_size: usize,

    /// Batch size for forwarding.
    ///
    /// A batch is forwarded as soon as it holds at least this many entries.
    pub batch_size: usize,

    /// Batch timeout in milliseconds.
    ///
    /// Period of the timer that flushes a non-empty, partially filled batch.
    pub batch_timeout_ms: u64,

    /// Enable log deduplication.
    pub deduplication_enabled: bool,

    /// Deduplication window in seconds.
    ///
    /// Entries whose level, source, message and service match an entry seen
    /// less than this many seconds ago are dropped. Also the retention period
    /// applied by the periodic deduplication-cache cleanup.
    pub deduplication_window_secs: u64,

    /// Maximum log entry size in bytes.
    ///
    /// Compared against the size of the entry serialized as JSON; larger
    /// entries are rejected with `AggregationError::EntryTooLarge`.
    pub max_entry_size_bytes: usize,

    /// Forwarding destinations.
    ///
    /// Every batch is forwarded to each destination in order.
    pub destinations: Vec<LogDestination>,

    /// Enable compression for forwarding.
    ///
    /// NOTE: the current `compress_batch` implementation only serializes the
    /// batch to JSON; no compression algorithm is applied yet.
    pub compression_enabled: bool,

    /// Retry configuration.
    ///
    /// NOTE: not consumed yet; retrying failed forwards is still a TODO in
    /// `forward_batch`.
    pub retry_config: RetryConfig,
}
53
/// Log forwarding destination.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum LogDestination {
    /// HTTP/HTTPS endpoint
    Http {
        /// Endpoint the batch is sent to.
        url: String,
        /// Extra request headers (not used by the current placeholder forwarder).
        headers: HashMap<String, String>,
        /// Request timeout in seconds (not used by the current placeholder forwarder).
        timeout_secs: u64,
    },
    /// Syslog server
    Syslog {
        /// Syslog server host.
        host: String,
        /// Syslog server port.
        port: u16,
        /// Transport protocol (not used by the current placeholder forwarder).
        protocol: SyslogProtocol,
        /// Syslog facility code (not used by the current placeholder forwarder).
        facility: u8,
    },
    /// File destination
    File {
        /// Path of the file that entries are appended to, one JSON object per line.
        path: String,
        /// Rotation threshold in megabytes (currently ignored by the file forwarder).
        rotation_size_mb: u64,
        /// Number of rotated files to keep (currently ignored by the file forwarder).
        max_files: usize,
    },
    /// Elasticsearch
    Elasticsearch {
        /// Cluster URLs (not used by the current placeholder forwarder).
        urls: Vec<String>,
        /// Index (pattern) the batch is written to.
        index_pattern: String,
        /// Optional user name (not used by the current placeholder forwarder).
        username: Option<String>,
        /// Optional password (not used by the current placeholder forwarder).
        ///
        /// NOTE(review): this type derives `Debug`, so formatting a destination
        /// with `{:?}` prints this value in clear text.
        password: Option<String>,
    },
}
85
/// Syslog protocol.
///
/// Serialized in lowercase (`"udp"`, `"tcp"`, `"tls"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SyslogProtocol {
    /// Syslog over UDP.
    Udp,
    /// Syslog over TCP.
    Tcp,
    /// Syslog over TLS.
    Tls,
}
94
/// Retry configuration.
///
/// NOTE: these settings are carried in [`AggregationConfig`] but are not read
/// anywhere in this module yet; retrying failed forwards is still a TODO.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum retry attempts
    pub max_attempts: u32,

    /// Initial retry delay in milliseconds
    pub initial_delay_ms: u64,

    /// Maximum retry delay in milliseconds
    pub max_delay_ms: u64,

    /// Exponential backoff multiplier
    pub backoff_multiplier: f64,
}
110
/// Aggregated log entry.
///
/// For deduplication only `level`, `source`, `message` and `service` are
/// hashed; `id`, `timestamp` and the remaining fields do not influence whether
/// two entries count as duplicates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Unique log ID
    pub id: Uuid,

    /// Timestamp
    pub timestamp: DateTime<Utc>,

    /// Log level
    pub level: String,

    /// Source server/instance
    pub source: String,

    /// Log message (passed through the sanitizer on submission)
    pub message: String,

    /// Structured fields.
    ///
    /// Top-level string values are passed through the sanitizer on
    /// submission; nested values are left untouched.
    pub fields: HashMap<String, serde_json::Value>,

    /// Request context
    pub request_id: Option<String>,

    /// Correlation ID for distributed tracing
    pub correlation_id: Option<String>,

    /// Service name
    pub service: String,

    /// Environment (dev, staging, prod)
    pub environment: Option<String>,
}
144
/// Log aggregator.
///
/// Entries are handed in through `submit`, pushed onto an internal channel and
/// drained by a background task (spawned by `start`) that groups them into
/// batches and forwards each batch to the configured destinations.
pub struct LogAggregator {
    /// Aggregation settings supplied at construction time.
    config: AggregationConfig,
    /// Entry buffer; handed to the batch processor, which does not use it yet.
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,
    /// Deduplication cache: entry hash -> timestamp recorded for that hash.
    dedup_cache: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// Sending half of the channel feeding the batch processor.
    tx: mpsc::Sender<LogEntry>,
    /// Receiving half of the channel, shared so the spawned task can drain it.
    rx: Arc<RwLock<mpsc::Receiver<LogEntry>>>,
}
153
154impl LogAggregator {
155    /// Create a new log aggregator
156    pub fn new(config: AggregationConfig) -> Self {
157        let (tx, rx) = mpsc::channel(config.buffer_size);
158        let buffer_size = config.buffer_size;
159
160        Self {
161            config,
162            buffer: Arc::new(RwLock::new(VecDeque::with_capacity(buffer_size))),
163            dedup_cache: Arc::new(RwLock::new(HashMap::new())),
164            tx,
165            rx: Arc::new(RwLock::new(rx)),
166        }
167    }
168
169    /// Start the aggregation service
170    pub async fn start(&self) {
171        if !self.config.enabled {
172            info!("Log aggregation is disabled");
173            return;
174        }
175
176        info!("Starting log aggregation service");
177
178        // Start buffer processor
179        let buffer = self.buffer.clone();
180        let config = self.config.clone();
181        let rx = self.rx.clone();
182
183        tokio::spawn(async move {
184            Self::process_logs(buffer, config, rx).await;
185        });
186
187        // Start deduplication cache cleanup
188        if self.config.deduplication_enabled {
189            let dedup_cache = self.dedup_cache.clone();
190            let window_secs = self.config.deduplication_window_secs;
191
192            tokio::spawn(async move {
193                Self::cleanup_dedup_cache(dedup_cache, window_secs).await;
194            });
195        }
196    }
197
198    /// Submit a log entry
199    pub async fn submit(&self, entry: LogEntry) -> Result<(), AggregationError> {
200        if !self.config.enabled {
201            return Ok(());
202        }
203
204        // Check entry size
205        let entry_size = serde_json::to_vec(&entry)?.len();
206        if entry_size > self.config.max_entry_size_bytes {
207            return Err(AggregationError::EntryTooLarge {
208                size: entry_size,
209                max: self.config.max_entry_size_bytes,
210            });
211        }
212
213        // Check deduplication
214        if self.config.deduplication_enabled {
215            let hash = Self::compute_entry_hash(&entry);
216            let mut cache = self.dedup_cache.write().await;
217
218            if let Some(last_seen) = cache.get(&hash) {
219                let age = Utc::now().signed_duration_since(*last_seen);
220                if age.num_seconds() < self.config.deduplication_window_secs as i64 {
221                    return Ok(()); // Duplicate, skip
222                }
223            }
224
225            cache.insert(hash, entry.timestamp);
226        }
227
228        // Sanitize log entry
229        let sanitized = self.sanitize_entry(entry);
230
231        // Send to buffer
232        self.tx
233            .send(sanitized)
234            .await
235            .map_err(|_| AggregationError::BufferFull)?;
236
237        Ok(())
238    }
239
240    /// Process logs from the buffer
241    async fn process_logs(
242        _buffer: Arc<RwLock<VecDeque<LogEntry>>>,
243        config: AggregationConfig,
244        rx: Arc<RwLock<mpsc::Receiver<LogEntry>>>,
245    ) {
246        let mut interval = tokio::time::interval(Duration::from_millis(config.batch_timeout_ms));
247        let mut batch = Vec::with_capacity(config.batch_size);
248
249        loop {
250            tokio::select! {
251                _ = interval.tick() => {
252                    if !batch.is_empty() {
253                        Self::forward_batch(&batch, &config).await;
254                        batch.clear();
255                    }
256                }
257                Some(entry) = async {
258                    let mut rx_guard = rx.write().await;
259                    rx_guard.recv().await
260                } => {
261                    batch.push(entry);
262
263                    if batch.len() >= config.batch_size {
264                        Self::forward_batch(&batch, &config).await;
265                        batch.clear();
266                    }
267                }
268            }
269        }
270    }
271
272    /// Forward a batch of logs to destinations
273    async fn forward_batch(batch: &[LogEntry], config: &AggregationConfig) {
274        let compressed = if config.compression_enabled {
275            match Self::compress_batch(batch) {
276                Ok(data) => Some(data),
277                Err(e) => {
278                    error!("Failed to compress batch: {}", e);
279                    None
280                }
281            }
282        } else {
283            None
284        };
285
286        for destination in &config.destinations {
287            let result = match destination {
288                LogDestination::Http {
289                    url,
290                    headers,
291                    timeout_secs,
292                } => {
293                    Self::forward_to_http(batch, compressed.as_ref(), url, headers, *timeout_secs)
294                        .await
295                }
296                LogDestination::Syslog {
297                    host,
298                    port,
299                    protocol,
300                    facility,
301                } => Self::forward_to_syslog(batch, host, *port, protocol, *facility).await,
302                LogDestination::File { path, .. } => Self::forward_to_file(batch, path).await,
303                LogDestination::Elasticsearch {
304                    urls,
305                    index_pattern,
306                    username,
307                    password,
308                } => {
309                    Self::forward_to_elasticsearch(
310                        batch,
311                        urls,
312                        index_pattern,
313                        username.as_ref(),
314                        password.as_ref(),
315                    )
316                    .await
317                }
318            };
319
320            if let Err(e) = result {
321                error!("Failed to forward logs to {:?}: {}", destination, e);
322                // TODO: Implement retry logic based on config.retry_config
323            }
324        }
325    }
326
327    /// Forward logs to HTTP endpoint
328    async fn forward_to_http(
329        batch: &[LogEntry],
330        _compressed: Option<&Vec<u8>>,
331        url: &str,
332        _headers: &HashMap<String, String>,
333        _timeout_secs: u64,
334    ) -> Result<(), AggregationError> {
335        // Note: This is a placeholder implementation
336        // In a real implementation, you would use reqwest or similar
337        info!("Forwarding {} logs to HTTP endpoint: {}", batch.len(), url);
338        Ok(())
339    }
340
341    /// Forward logs to syslog
342    async fn forward_to_syslog(
343        batch: &[LogEntry],
344        host: &str,
345        port: u16,
346        _protocol: &SyslogProtocol,
347        _facility: u8,
348    ) -> Result<(), AggregationError> {
349        // Note: This is a placeholder implementation
350        // In a real implementation, you would use a syslog client
351        info!(
352            "Forwarding {} logs to syslog {}:{}",
353            batch.len(),
354            host,
355            port
356        );
357        Ok(())
358    }
359
360    /// Forward logs to file
361    async fn forward_to_file(batch: &[LogEntry], path: &str) -> Result<(), AggregationError> {
362        use tokio::io::AsyncWriteExt;
363
364        let mut file = tokio::fs::OpenOptions::new()
365            .create(true)
366            .append(true)
367            .open(path)
368            .await?;
369
370        for entry in batch {
371            let line = serde_json::to_string(entry)?;
372            file.write_all(line.as_bytes()).await?;
373            file.write_all(b"\n").await?;
374        }
375
376        file.flush().await?;
377        Ok(())
378    }
379
380    /// Forward logs to Elasticsearch
381    async fn forward_to_elasticsearch(
382        batch: &[LogEntry],
383        _urls: &[String],
384        index_pattern: &str,
385        _username: Option<&String>,
386        _password: Option<&String>,
387    ) -> Result<(), AggregationError> {
388        // Note: This is a placeholder implementation
389        // In a real implementation, you would use an Elasticsearch client
390        info!(
391            "Forwarding {} logs to Elasticsearch index: {}",
392            batch.len(),
393            index_pattern
394        );
395        Ok(())
396    }
397
398    /// Compress a batch of logs
399    fn compress_batch(batch: &[LogEntry]) -> Result<Vec<u8>, AggregationError> {
400        // Note: Using a simple JSON serialization for now
401        // In a real implementation, you might use gzip or zstd
402        let json = serde_json::to_vec(batch)?;
403        Ok(json)
404    }
405
406    /// Compute hash for deduplication
407    fn compute_entry_hash(entry: &LogEntry) -> String {
408        use std::collections::hash_map::DefaultHasher;
409        use std::hash::{Hash, Hasher};
410
411        let mut hasher = DefaultHasher::new();
412        entry.level.hash(&mut hasher);
413        entry.source.hash(&mut hasher);
414        entry.message.hash(&mut hasher);
415        entry.service.hash(&mut hasher);
416
417        format!("{:x}", hasher.finish())
418    }
419
420    /// Cleanup old entries from deduplication cache
421    async fn cleanup_dedup_cache(
422        cache: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
423        window_secs: u64,
424    ) {
425        let mut interval = tokio::time::interval(Duration::from_secs(60)); // Cleanup every minute
426
427        loop {
428            interval.tick().await;
429
430            let cutoff = Utc::now() - chrono::Duration::seconds(window_secs as i64);
431            let mut cache = cache.write().await;
432
433            cache.retain(|_, timestamp| *timestamp > cutoff);
434        }
435    }
436
437    /// Sanitize log entry
438    fn sanitize_entry(&self, mut entry: LogEntry) -> LogEntry {
439        let sanitizer = get_sanitizer();
440
441        // Sanitize message
442        entry.message = sanitizer.sanitize(&entry.message);
443
444        // Sanitize fields
445        for (_, value) in entry.fields.iter_mut() {
446            if let serde_json::Value::String(s) = value {
447                *s = sanitizer.sanitize(s);
448            }
449        }
450
451        entry
452    }
453}
454
455impl Default for AggregationConfig {
456    fn default() -> Self {
457        Self {
458            enabled: true,
459            buffer_size: 10000,
460            batch_size: 100,
461            batch_timeout_ms: 5000,
462            deduplication_enabled: true,
463            deduplication_window_secs: 60,
464            max_entry_size_bytes: 1_048_576, // 1MB
465            destinations: vec![],
466            compression_enabled: true,
467            retry_config: RetryConfig {
468                max_attempts: 3,
469                initial_delay_ms: 1000,
470                max_delay_ms: 30000,
471                backoff_multiplier: 2.0,
472            },
473        }
474    }
475}
476
/// Aggregation errors.
#[derive(Debug, thiserror::Error)]
pub enum AggregationError {
    /// The JSON-serialized entry is larger than the configured
    /// `max_entry_size_bytes`.
    #[error("Log entry too large: {size} bytes (max: {max})")]
    EntryTooLarge { size: usize, max: usize },

    /// The entry could not be handed to the internal channel.
    #[error("Buffer full")]
    BufferFull,

    /// An entry or batch could not be serialized to JSON.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// An I/O operation failed (e.g. while writing to a file destination).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Forwarding to a destination failed; carries a description of the
    /// failure. NOTE(review): not constructed anywhere in this module yet.
    #[error("Forward error: {0}")]
    Forward(String),
}
495
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `INFO` entry with a fresh id, the current time and no
    /// structured fields or tracing identifiers.
    fn make_entry(
        source: &str,
        message: &str,
        service: &str,
        environment: Option<&str>,
    ) -> LogEntry {
        LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            source: source.to_string(),
            message: message.to_string(),
            fields: HashMap::new(),
            request_id: None,
            correlation_id: None,
            service: service.to_string(),
            environment: environment.map(String::from),
        }
    }

    /// A freshly built aggregator exposes free channel capacity.
    #[tokio::test]
    async fn test_log_aggregator_creation() {
        let aggregator = LogAggregator::new(AggregationConfig::default());

        assert!(aggregator.tx.capacity() > 0);
    }

    /// Submitting a small entry with deduplication switched off succeeds.
    #[tokio::test]
    async fn test_log_entry_submission() {
        let aggregator = LogAggregator::new(AggregationConfig {
            deduplication_enabled: false,
            ..Default::default()
        });

        let entry = make_entry(
            "test-server",
            "Test log message",
            "test-service",
            Some("test"),
        );

        assert!(aggregator.submit(entry).await.is_ok());
    }

    /// The deduplication hash ignores id and timestamp but tracks the message.
    #[test]
    fn test_entry_hash_computation() {
        let first = make_entry("server1", "Test message", "test", None);

        // Same content, different id and timestamp.
        let mut second = LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            ..first.clone()
        };

        // Same content should produce same hash
        let first_hash = LogAggregator::compute_entry_hash(&first);
        assert_eq!(first_hash, LogAggregator::compute_entry_hash(&second));

        // Different message should produce different hash
        second.message = "Different message".to_string();
        assert_ne!(first_hash, LogAggregator::compute_entry_hash(&second));
    }
}
563}