use crate::sanitization::get_sanitizer;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, mpsc};
use tracing::{error, info};
use uuid::Uuid;

/// Configuration for the log aggregation service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationConfig {
    /// Whether log aggregation is enabled at all.
    pub enabled: bool,
    /// Capacity of the in-memory submission channel.
    pub buffer_size: usize,
    /// Maximum number of entries forwarded in a single batch.
    pub batch_size: usize,
    /// Maximum time to wait before flushing a partial batch.
    pub batch_timeout_ms: u64,
    /// Whether duplicate entries are suppressed.
    pub deduplication_enabled: bool,
    /// Window within which identical entries count as duplicates.
    pub deduplication_window_secs: u64,
    /// Maximum serialized size of a single entry.
    pub max_entry_size_bytes: usize,
    /// Destinations that batches are forwarded to.
    pub destinations: Vec<LogDestination>,
    /// Whether batches are compressed before forwarding.
    pub compression_enabled: bool,
    /// Retry behavior for failed forwards.
    pub retry_config: RetryConfig,
}

/// A destination that aggregated log batches can be forwarded to.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum LogDestination {
    /// An HTTP(S) log collection endpoint.
    Http {
        url: String,
        headers: HashMap<String, String>,
        timeout_secs: u64,
    },
    /// A syslog server.
    Syslog {
        host: String,
        port: u16,
        protocol: SyslogProtocol,
        facility: u8,
    },
    /// A local file destination with rotation settings.
    File {
        path: String,
        rotation_size_mb: u64,
        max_files: usize,
    },
    /// An Elasticsearch cluster.
    Elasticsearch {
        urls: Vec<String>,
        index_pattern: String,
        username: Option<String>,
        password: Option<String>,
    },
}

/// Transport protocol used for syslog forwarding.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SyslogProtocol {
    Udp,
    Tcp,
    Tls,
}

/// Retry behavior for failed forwarding attempts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of attempts per batch.
    pub max_attempts: u32,
    /// Delay before the first retry.
    pub initial_delay_ms: u64,
    /// Upper bound on the retry delay.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay after each failed attempt.
    pub backoff_multiplier: f64,
}

/// A single structured log entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Unique identifier for this entry.
    pub id: Uuid,
    /// Time the entry was produced.
    pub timestamp: DateTime<Utc>,
    /// Log level, e.g. "INFO" or "ERROR".
    pub level: String,
    /// Component or host that produced the entry.
    pub source: String,
    /// Log message text.
    pub message: String,
    /// Arbitrary structured fields attached to the entry.
    pub fields: HashMap<String, serde_json::Value>,
    /// Request identifier, if available.
    pub request_id: Option<String>,
    /// Correlation identifier, if available.
    pub correlation_id: Option<String>,
    /// Name of the emitting service.
    pub service: String,
    /// Deployment environment, if known.
    pub environment: Option<String>,
}

/// Buffers, deduplicates, sanitizes, and forwards log entries in batches.
pub struct LogAggregator {
    config: AggregationConfig,
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,
    dedup_cache: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    tx: mpsc::Sender<LogEntry>,
    rx: Arc<RwLock<mpsc::Receiver<LogEntry>>>,
}

impl LogAggregator {
    /// Creates a new aggregator with the given configuration.
    pub fn new(config: AggregationConfig) -> Self {
        let (tx, rx) = mpsc::channel(config.buffer_size);
        let buffer_size = config.buffer_size;

        Self {
            config,
            buffer: Arc::new(RwLock::new(VecDeque::with_capacity(buffer_size))),
            dedup_cache: Arc::new(RwLock::new(HashMap::new())),
            tx,
            rx: Arc::new(RwLock::new(rx)),
        }
    }

    /// Starts the background batching task and, if enabled, the
    /// deduplication-cache cleanup task.
    pub async fn start(&self) {
        if !self.config.enabled {
            info!("Log aggregation is disabled");
            return;
        }

        info!("Starting log aggregation service");

        let buffer = self.buffer.clone();
        let config = self.config.clone();
        let rx = self.rx.clone();

        tokio::spawn(async move {
            Self::process_logs(buffer, config, rx).await;
        });

        if self.config.deduplication_enabled {
            let dedup_cache = self.dedup_cache.clone();
            let window_secs = self.config.deduplication_window_secs;

            tokio::spawn(async move {
                Self::cleanup_dedup_cache(dedup_cache, window_secs).await;
            });
        }
    }

    /// Submits a log entry for aggregation.
    ///
    /// Oversized entries are rejected, duplicates seen within the
    /// deduplication window are silently dropped, and accepted entries are
    /// sanitized before being queued for forwarding.
    pub async fn submit(&self, entry: LogEntry) -> Result<(), AggregationError> {
        if !self.config.enabled {
            return Ok(());
        }

        let entry_size = serde_json::to_vec(&entry)?.len();
        if entry_size > self.config.max_entry_size_bytes {
            return Err(AggregationError::EntryTooLarge {
                size: entry_size,
                max: self.config.max_entry_size_bytes,
            });
        }

        if self.config.deduplication_enabled {
            let hash = Self::compute_entry_hash(&entry);
            let mut cache = self.dedup_cache.write().await;

            if let Some(last_seen) = cache.get(&hash) {
                let age = Utc::now().signed_duration_since(*last_seen);
                if age.num_seconds() < self.config.deduplication_window_secs as i64 {
                    return Ok(());
                }
            }

            cache.insert(hash, entry.timestamp);
        }

        let sanitized = self.sanitize_entry(entry);

        self.tx
            .send(sanitized)
            .await
            .map_err(|_| AggregationError::BufferFull)?;

        Ok(())
    }

    /// Drains the channel into batches and flushes them either when a batch
    /// is full or when the batch timeout elapses.
    async fn process_logs(
        _buffer: Arc<RwLock<VecDeque<LogEntry>>>,
        config: AggregationConfig,
        rx: Arc<RwLock<mpsc::Receiver<LogEntry>>>,
    ) {
        let mut interval = tokio::time::interval(Duration::from_millis(config.batch_timeout_ms));
        let mut batch = Vec::with_capacity(config.batch_size);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if !batch.is_empty() {
                        Self::forward_batch(&batch, &config).await;
                        batch.clear();
                    }
                }
                Some(entry) = async {
                    let mut rx_guard = rx.write().await;
                    rx_guard.recv().await
                } => {
                    batch.push(entry);

                    if batch.len() >= config.batch_size {
                        Self::forward_batch(&batch, &config).await;
                        batch.clear();
                    }
                }
            }
        }
    }

    async fn forward_batch(batch: &[LogEntry], config: &AggregationConfig) {
        let compressed = if config.compression_enabled {
            match Self::compress_batch(batch) {
                Ok(data) => Some(data),
                Err(e) => {
                    error!("Failed to compress batch: {}", e);
                    None
                }
            }
        } else {
            None
        };

        for destination in &config.destinations {
            let result = match destination {
                LogDestination::Http {
                    url,
                    headers,
                    timeout_secs,
                } => {
                    Self::forward_to_http(batch, compressed.as_ref(), url, headers, *timeout_secs)
                        .await
                }
                LogDestination::Syslog {
                    host,
                    port,
                    protocol,
                    facility,
                } => Self::forward_to_syslog(batch, host, *port, protocol, *facility).await,
                LogDestination::File { path, .. } => Self::forward_to_file(batch, path).await,
                LogDestination::Elasticsearch {
                    urls,
                    index_pattern,
                    username,
                    password,
                } => {
                    Self::forward_to_elasticsearch(
                        batch,
                        urls,
                        index_pattern,
                        username.as_ref(),
                        password.as_ref(),
                    )
                    .await
                }
            };

            if let Err(e) = result {
                error!("Failed to forward logs to {:?}: {}", destination, e);
            }
        }
    }

    async fn forward_to_http(
        batch: &[LogEntry],
        _compressed: Option<&Vec<u8>>,
        url: &str,
        _headers: &HashMap<String, String>,
        _timeout_secs: u64,
    ) -> Result<(), AggregationError> {
        info!("Forwarding {} logs to HTTP endpoint: {}", batch.len(), url);
        // The actual HTTP transport is not implemented here; the batch is only logged.
        Ok(())
    }

    async fn forward_to_syslog(
        batch: &[LogEntry],
        host: &str,
        port: u16,
        _protocol: &SyslogProtocol,
        _facility: u8,
    ) -> Result<(), AggregationError> {
        info!(
            "Forwarding {} logs to syslog {}:{}",
            batch.len(),
            host,
            port
        );
        // The actual syslog transport is not implemented here; the batch is only logged.
        Ok(())
    }

    async fn forward_to_file(batch: &[LogEntry], path: &str) -> Result<(), AggregationError> {
        use tokio::io::AsyncWriteExt;

        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .await?;

        // Append one JSON object per line (JSON Lines).
        for entry in batch {
            let line = serde_json::to_string(entry)?;
            file.write_all(line.as_bytes()).await?;
            file.write_all(b"\n").await?;
        }

        file.flush().await?;
        Ok(())
    }

    async fn forward_to_elasticsearch(
        batch: &[LogEntry],
        _urls: &[String],
        index_pattern: &str,
        _username: Option<&String>,
        _password: Option<&String>,
    ) -> Result<(), AggregationError> {
        info!(
            "Forwarding {} logs to Elasticsearch index: {}",
            batch.len(),
            index_pattern
        );
        // The actual Elasticsearch transport is not implemented here; the batch is only logged.
        Ok(())
    }

    fn compress_batch(batch: &[LogEntry]) -> Result<Vec<u8>, AggregationError> {
        // No compression is applied yet; the batch is returned as plain JSON bytes.
        let json = serde_json::to_vec(batch)?;
        Ok(json)
    }

    /// Computes the deduplication key from the fields that identify a
    /// repeated message: level, source, message, and service.
    fn compute_entry_hash(entry: &LogEntry) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        entry.level.hash(&mut hasher);
        entry.source.hash(&mut hasher);
        entry.message.hash(&mut hasher);
        entry.service.hash(&mut hasher);

        format!("{:x}", hasher.finish())
    }

    /// Periodically evicts deduplication-cache entries older than the
    /// configured window.
    async fn cleanup_dedup_cache(
        cache: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
        window_secs: u64,
    ) {
        let mut interval = tokio::time::interval(Duration::from_secs(60));

        loop {
            interval.tick().await;

            let cutoff = Utc::now() - chrono::Duration::seconds(window_secs as i64);
            let mut cache = cache.write().await;

            cache.retain(|_, timestamp| *timestamp > cutoff);
        }
    }

    /// Runs the shared sanitizer over the message and any string-valued fields.
    fn sanitize_entry(&self, mut entry: LogEntry) -> LogEntry {
        let sanitizer = get_sanitizer();

        entry.message = sanitizer.sanitize(&entry.message);

        for (_, value) in entry.fields.iter_mut() {
            if let serde_json::Value::String(s) = value {
                *s = sanitizer.sanitize(s);
            }
        }

        entry
    }
}

impl Default for AggregationConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            buffer_size: 10000,
            batch_size: 100,
            batch_timeout_ms: 5000,
            deduplication_enabled: true,
            deduplication_window_secs: 60,
            max_entry_size_bytes: 1_048_576,
            destinations: vec![],
            compression_enabled: true,
            retry_config: RetryConfig {
                max_attempts: 3,
                initial_delay_ms: 1000,
                max_delay_ms: 30000,
                backoff_multiplier: 2.0,
            },
        }
    }
}

/// Errors produced by the log aggregation pipeline.
#[derive(Debug, thiserror::Error)]
pub enum AggregationError {
    #[error("Log entry too large: {size} bytes (max: {max})")]
    EntryTooLarge { size: usize, max: usize },

    #[error("Buffer full")]
    BufferFull,

    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Forward error: {0}")]
    Forward(String),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_log_aggregator_creation() {
        let config = AggregationConfig::default();
        let aggregator = LogAggregator::new(config);

        assert!(aggregator.tx.capacity() > 0);
    }
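
    // Size-limit check: an entry whose serialized JSON exceeds
    // max_entry_size_bytes is rejected before it is deduplicated or sanitized.
    #[tokio::test]
    async fn test_oversized_entry_rejected() {
        let config = AggregationConfig {
            max_entry_size_bytes: 1024,
            ..Default::default()
        };
        let aggregator = LogAggregator::new(config);

        let entry = LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            source: "test-server".to_string(),
            message: "x".repeat(4096),
            fields: HashMap::new(),
            request_id: None,
            correlation_id: None,
            service: "test-service".to_string(),
            environment: None,
        };

        let result = aggregator.submit(entry).await;
        assert!(matches!(
            result,
            Err(AggregationError::EntryTooLarge { .. })
        ));
    }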

    #[tokio::test]
    async fn test_log_entry_submission() {
        let config = AggregationConfig {
            deduplication_enabled: false,
            ..Default::default()
        };

        let aggregator = LogAggregator::new(config);

        let entry = LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            source: "test-server".to_string(),
            message: "Test log message".to_string(),
            fields: HashMap::new(),
            request_id: None,
            correlation_id: None,
            service: "test-service".to_string(),
            environment: Some("test".to_string()),
        };

        let result = aggregator.submit(entry).await;
        assert!(result.is_ok());
    }
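
    // Deduplication check: with deduplication enabled, a second entry with the
    // same level/source/message/service inside the window is silently dropped,
    // so only one entry reaches the channel. Like the submission test above,
    // this assumes the shared sanitizer needs no prior setup.
    #[tokio::test]
    async fn test_duplicate_entries_dropped() {
        let config = AggregationConfig::default();
        let aggregator = LogAggregator::new(config);

        let entry = LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            level: "WARN".to_string(),
            source: "test-server".to_string(),
            message: "Repeated message".to_string(),
            fields: HashMap::new(),
            request_id: None,
            correlation_id: None,
            service: "test-service".to_string(),
            environment: None,
        };
        let duplicate = LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            ..entry.clone()
        };

        aggregator.submit(entry).await.unwrap();
        aggregator.submit(duplicate).await.unwrap();

        let mut rx = aggregator.rx.write().await;
        assert!(rx.try_recv().is_ok());
        assert!(rx.try_recv().is_err());
    }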

    #[test]
    fn test_entry_hash_computation() {
        let entry1 = LogEntry {
            id: Uuid::new_v4(),
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            source: "server1".to_string(),
            message: "Test message".to_string(),
            fields: HashMap::new(),
            request_id: None,
            correlation_id: None,
            service: "test".to_string(),
            environment: None,
        };

        // The id and timestamp are not part of the hash, so two otherwise
        // identical entries produce the same deduplication key.
        let mut entry2 = entry1.clone();
        entry2.id = Uuid::new_v4();
        entry2.timestamp = Utc::now();

        let hash1 = LogAggregator::compute_entry_hash(&entry1);
        let hash2 = LogAggregator::compute_entry_hash(&entry2);
        assert_eq!(hash1, hash2);

        entry2.message = "Different message".to_string();
        let hash3 = LogAggregator::compute_entry_hash(&entry2);
        assert_ne!(hash1, hash3);
    }
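
    // Serialization check: LogDestination is internally tagged with "type",
    // and SyslogProtocol values are lowercased, so a destination round-trips
    // through the expected JSON shape.
    #[test]
    fn test_destination_serde_round_trip() {
        let dest = LogDestination::Syslog {
            host: "logs.example.com".to_string(),
            port: 514,
            protocol: SyslogProtocol::Udp,
            facility: 1,
        };

        let json = serde_json::to_string(&dest).unwrap();
        assert!(json.contains("\"type\":\"Syslog\""));
        assert!(json.contains("\"protocol\":\"udp\""));

        let parsed: LogDestination = serde_json::from_str(&json).unwrap();
        assert!(matches!(parsed, LogDestination::Syslog { port: 514, .. }));
    }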
}