//! sqltool/core/log_table.rs — partitioned log table management.

1use crate::databases::DatabaseConnection;
2use anyhow::{Result, anyhow};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
/// Configuration for one partitioned log table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogTableConfig {
    /// Base (logical) table name the partitions derive from.
    pub table_name: String,
    /// Strategy that decides when a new partition is started.
    pub partition_type: PartitionType,
    /// Days of data to keep before cleanup/archival.
    pub retention_days: u32,
    /// Size ceiling per partition in GB.
    /// NOTE(review): not read anywhere in this file; `SizeBased` carries its
    /// own `max_size_gb` — confirm which one is authoritative.
    pub max_partition_size_gb: f64,
    /// When true, rollover automatically creates the next partition.
    pub auto_create_partition: bool,
    /// Whether partitions should be compressed.
    /// NOTE(review): not consulted by `compress_partition` in this file.
    pub compression_enabled: bool,
    /// Extra fields to index.
    /// NOTE(review): unused by `create_log_table`, which hard-codes its indexes.
    pub indexed_fields: Vec<String>,
    /// Prefix used when naming partition tables.
    pub partition_prefix: String,
}
17
/// Partition rollover strategy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionType {
    /// New partition per calendar day.
    Daily,
    /// New partition per ISO week.
    /// NOTE(review): no rollover check is implemented for this variant.
    Weekly,
    /// New partition per calendar month.
    /// NOTE(review): no rollover check is implemented for this variant.
    Monthly,
    /// New partition per hour.
    Hourly,
    /// New partition once the active one reaches `max_size_gb`.
    SizeBased { max_size_gb: f64 },
}
26
/// Tracking record for one physical partition table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogPartition {
    /// Name of the physical partition table.
    pub partition_name: String,
    /// Logical table this partition belongs to.
    pub table_name: String,
    /// Partition start, formatted "%Y-%m-%d".
    pub start_date: String,
    /// Close timestamp ("%Y-%m-%d %H:%M:%S") once deactivated; None while open.
    pub end_date: Option<String>,
    /// Cached row count.
    pub row_count: u64,
    /// Cached size in MB.
    /// NOTE(review): never refreshed from the database in this file.
    pub size_mb: f64,
    /// Whether this partition currently receives writes.
    pub is_active: bool,
    /// Whether `compress_partition` has been run on it.
    pub compressed: bool,
}
38
/// Parameters for a cross-partition log query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogQuery {
    /// Logical table to query.
    pub table_name: String,
    /// Range start as a Unix timestamp (seconds). Both bounds must be set for
    /// the time filter to apply.
    pub start_time: Option<i64>,
    /// Range end as a Unix timestamp (seconds).
    pub end_time: Option<i64>,
    /// Keep only these levels; empty means all levels.
    pub level_filter: Vec<String>,
    /// Substring match against the message column (SQL LIKE).
    pub keyword_filter: Option<String>,
    /// Maximum number of rows to return.
    pub limit: u64,
    /// Rows to skip before returning results.
    pub offset: u64,
    /// Sort newest-first when true.
    pub order_desc: bool,
}
50
/// One page of query results plus query metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogQueryResult {
    /// Total rows matching the filters across all relevant partitions.
    pub total_matched: u64,
    /// The returned page of rows, as JSON objects.
    pub results: Vec<serde_json::Value>,
    /// Names of the partitions that were consulted.
    pub partitions_queried: Vec<String>,
    /// Wall-clock duration of the query in milliseconds.
    pub query_time_ms: u64,
    /// True when rows exist beyond `offset + limit`.
    pub has_more: bool,
}
59
/// Summary statistics over a table's tracked partitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogStats {
    pub table_name: String,
    /// Number of tracked partitions.
    pub total_partitions: usize,
    /// Number of partitions currently accepting writes.
    pub active_partitions: usize,
    /// Sum of tracked partition sizes, converted MB → GB.
    pub total_size_gb: f64,
    /// Sum of tracked partition row counts.
    pub total_rows: u64,
    /// Fraction of partitions flagged as compressed (0.0–1.0).
    pub compression_ratio: f64,
    /// `start_date` of the oldest partition, if any.
    pub oldest_partition: Option<String>,
    /// `start_date` of the newest partition, if any.
    pub newest_partition: Option<String>,
}
71
/// Manages partitioned log tables over a database connection: creation,
/// inserts, rollover, querying, cleanup, and archiving.
pub struct LogTableManager {
    /// Backing database connection.
    connection: Box<dyn DatabaseConnection>,
    /// Per-table partitioning configuration, keyed by table name.
    configs: HashMap<String, LogTableConfig>,
    /// In-memory partition tracking, keyed by table name.
    partitions: HashMap<String, Vec<LogPartition>>,
}
77
78impl LogTableManager {
    /// Creates a manager with no registered tables and no tracked partitions.
    pub fn new(connection: Box<dyn DatabaseConnection>) -> Self {
        Self {
            connection,
            configs: HashMap::new(),
            partitions: HashMap::new(),
        }
    }
86
    /// Registers (or replaces) the partitioning configuration for a table,
    /// keyed by its `table_name`.
    pub fn register_log_table(&mut self, config: LogTableConfig) {
        self.configs.insert(config.table_name.clone(), config);
    }
90
    /// Creates the base log table (MySQL dialect: AUTO_INCREMENT, inline INDEX
    /// clauses, ENGINE=InnoDB) and records an initial in-memory partition.
    ///
    /// SECURITY NOTE(review): `table_name` is interpolated directly into the
    /// DDL string — callers must not pass untrusted input.
    pub async fn create_log_table(&mut self, table_name: &str) -> Result<()> {
        let create_sql = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                timestamp DATETIME NOT NULL,
                level VARCHAR(20) NOT NULL,
                message TEXT,
                context JSON,
                source VARCHAR(255),
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_timestamp (timestamp),
                INDEX idx_level (level),
                INDEX idx_created_at (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            "#,
            table_name
        );

        self.connection.execute(&create_sql).await?;

        // Track a conventional "<table>_p0" partition that starts today and
        // receives writes until the first rollover.
        let initial_partition = LogPartition {
            partition_name: format!("{}_p0", table_name),
            table_name: table_name.to_string(),
            start_date: chrono::Utc::now().format("%Y-%m-%d").to_string(),
            end_date: None,
            row_count: 0,
            size_mb: 0.0,
            is_active: true,
            compressed: false,
        };

        self.partitions
            .entry(table_name.to_string())
            .or_insert_with(Vec::new)
            .push(initial_partition);

        Ok(())
    }
130
131    pub async fn insert_log(&mut self, table_name: &str, level: &str, message: &str, context: Option<serde_json::Value>, source: Option<&str>) -> Result<()> {
132        let _config = self.configs.get(table_name)
133            .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
134
135        let active_partition = self.get_active_partition(table_name).await?;
136
137        let context_json = context.map(|c| serde_json::to_string(&c).unwrap_or_default())
138            .unwrap_or_else(|| "null".to_string());
139
140        let insert_sql = format!(
141            "INSERT INTO {} (timestamp, level, message, context, source) VALUES (NOW(), '{}', '{}', '{}', '{}')",
142            active_partition,
143            level,
144            message.replace('\'', "''"),
145            context_json.replace('\'', "''"),
146            source.unwrap_or("unknown").replace('\'', "''")
147        );
148
149        self.connection.execute(&insert_sql).await?;
150
151        self.update_partition_stats(table_name, &active_partition).await?;
152
153        self.check_and_create_partition(table_name).await?;
154
155        Ok(())
156    }
157
158    pub async fn batch_insert_logs(&mut self, table_name: &str, logs: Vec<LogEntry>) -> Result<u64> {
159        let _config = self.configs.get(table_name)
160            .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
161
162        let active_partition = self.get_active_partition(table_name).await?;
163        let mut inserted = 0u64;
164
165        for log in logs {
166            let context_json = log.context.map(|c| serde_json::to_string(&c).unwrap_or_default())
167                .unwrap_or_else(|| "null".to_string());
168
169            let insert_sql = format!(
170                "INSERT INTO {} (timestamp, level, message, context, source) VALUES (NOW(), '{}', '{}', '{}', '{}')",
171                active_partition,
172                log.level,
173                log.message.replace('\'', "''"),
174                context_json.replace('\'', "''"),
175                log.source.unwrap_or_else(|| "unknown".to_string()).replace('\'', "''")
176            );
177
178            if self.connection.execute(&insert_sql).await.is_ok() {
179                inserted += 1;
180            }
181        }
182
183        self.update_partition_stats(table_name, &active_partition).await?;
184        self.check_and_create_partition(table_name).await?;
185
186        Ok(inserted)
187    }
188
189    async fn get_active_partition(&self, table_name: &str) -> Result<String> {
190        if let Some(partitions) = self.partitions.get(table_name) {
191            if let Some(active) = partitions.iter().find(|p| p.is_active) {
192                return Ok(active.partition_name.clone());
193            }
194        }
195        Ok(format!("{}_p0", table_name))
196    }
197
    /// Decides whether the active partition is stale (date rolled over or size
    /// cap reached) and, when the table is configured for it, creates the next
    /// partition.
    async fn check_and_create_partition(&mut self, table_name: &str) -> Result<()> {
        let config = self.configs.get(table_name)
            .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;

        let active_partition = self.get_active_partition(table_name).await?;
        let partition_stats = self.get_partition_stats(table_name, &active_partition).await?;

        let should_create = match &config.partition_type {
            PartitionType::Daily => {
                // Roll over when the partition's start date is no longer today.
                let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
                partition_stats.start_date != today
            }
            PartitionType::Hourly => {
                // NOTE(review): start_date is written as "%Y-%m-%d" elsewhere
                // in this file, so this hour-formatted comparison can never be
                // equal and the hourly check fires on every call — confirm the
                // intended start_date format for hourly tables.
                let hour = chrono::Utc::now().format("%Y-%m-%d %H:00:00").to_string();
                partition_stats.start_date != hour
            }
            PartitionType::SizeBased { max_size_gb } => {
                // Tracked size is in MB; compare in GB against the cap.
                partition_stats.size_mb as f64 / 1024.0 >= *max_size_gb
            }
            // Weekly/Monthly have no rollover check implemented yet.
            _ => false,
        };

        if should_create && config.auto_create_partition {
            self.create_new_partition(table_name).await?;
        }

        Ok(())
    }
226
227    async fn get_partition_stats(&self, table_name: &str, partition_name: &str) -> Result<LogPartition> {
228        let sql = format!(
229            "SELECT COUNT(*) as cnt, AVG(LENGTH(message)) as avg_size FROM {}",
230            partition_name
231        );
232
233        let rows = self.connection.query(&sql).await?;
234        let row_count = rows.first()
235            .and_then(|r| r.get("cnt").and_then(|v| v.as_u64()))
236            .unwrap_or(0);
237
238        Ok(LogPartition {
239            partition_name: partition_name.to_string(),
240            table_name: table_name.to_string(),
241            start_date: chrono::Utc::now().format("%Y-%m-%d").to_string(),
242            end_date: None,
243            row_count,
244            size_mb: 0.0,
245            is_active: true,
246            compressed: false,
247        })
248    }
249
250    async fn create_new_partition(&mut self, table_name: &str) -> Result<()> {
251        let config = self.configs.get(table_name)
252            .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
253
254        let partitions = self.partitions.get(table_name);
255        let partition_num = partitions.map(|p| p.len()).unwrap_or(0);
256
257        let _new_partition_name = format!("{}_{}", config.partition_prefix, partition_num);
258
259        let date_pattern = match &config.partition_type {
260            PartitionType::Daily => chrono::Utc::now().format("%Y%m%d").to_string(),
261            PartitionType::Weekly => chrono::Utc::now().format("%Y-W%V").to_string(),
262            PartitionType::Monthly => chrono::Utc::now().format("%Y%m").to_string(),
263            PartitionType::Hourly => chrono::Utc::now().format("%Y%m%d%H").to_string(),
264            PartitionType::SizeBased { .. } => format!("{}", partition_num),
265        };
266
267        let partition_name = format!("{}_{}", config.partition_prefix, date_pattern);
268
269        let create_sql = format!(
270            "CREATE TABLE IF NOT EXISTS {} LIKE {}",
271            partition_name, table_name
272        );
273
274        self.connection.execute(&create_sql).await?;
275
276        if let Some(partitions) = self.partitions.get_mut(table_name) {
277            if let Some(last) = partitions.iter_mut().find(|p| p.is_active) {
278                last.is_active = false;
279                last.end_date = Some(chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string());
280            }
281
282            partitions.push(LogPartition {
283                partition_name: partition_name.clone(),
284                table_name: table_name.to_string(),
285                start_date: chrono::Utc::now().format("%Y-%m-%d").to_string(),
286                end_date: None,
287                row_count: 0,
288                size_mb: 0.0,
289                is_active: true,
290                compressed: false,
291            });
292        }
293
294        Ok(())
295    }
296
297    async fn update_partition_stats(&self, table_name: &str, partition_name: &str) -> Result<()> {
298        if let Some(partitions) = self.partitions.get(table_name) {
299            if let Some(_stats) = partitions.iter().find(|p| &p.partition_name == partition_name) {
300                let sql = format!("SELECT COUNT(*) as cnt FROM {}", partition_name);
301                if let Ok(rows) = self.connection.query(&sql).await {
302                    if let Some(_cnt) = rows.first().and_then(|r| r.get("cnt").and_then(|v| v.as_u64())) {
303                        return Ok(());
304                    }
305                }
306            }
307        }
308        Ok(())
309    }
310
311    pub async fn query_logs(&self, query: LogQuery) -> Result<LogQueryResult> {
312        let start = std::time::Instant::now();
313
314        let partitions = self.get_partitions_for_query(&query)?;
315        let mut all_results = Vec::new();
316        let mut partitions_queried = Vec::new();
317
318        let mut remaining = query.limit;
319        let mut current_offset = query.offset;
320
321        for partition_name in partitions {
322            if remaining == 0 {
323                break;
324            }
325
326            partitions_queried.push(partition_name.clone());
327
328            let where_clauses = self.build_where_clause(&query, &partition_name)?;
329            let sql = format!(
330                "SELECT * FROM {} WHERE {} ORDER BY timestamp {} LIMIT {} OFFSET {}",
331                partition_name,
332                where_clauses,
333                if query.order_desc { "DESC" } else { "ASC" },
334                remaining,
335                current_offset
336            );
337
338            match self.connection.query(&sql).await {
339                Ok(rows) => {
340                    let row_count = rows.len() as u64;
341
342                    for row in rows {
343                        if let Some(obj) = row.as_object() {
344                            all_results.push(serde_json::Value::Object(obj.clone()));
345                        }
346                    }
347
348                    if current_offset >= row_count {
349                        current_offset -= row_count;
350                    } else {
351                        current_offset = 0;
352                    }
353
354                    remaining = remaining.saturating_sub(row_count);
355                }
356                Err(_) => continue,
357            }
358        }
359
360        let total_matched = self.count_matching_logs(&query).await?;
361
362        Ok(LogQueryResult {
363            total_matched,
364            results: all_results,
365            partitions_queried,
366            query_time_ms: start.elapsed().as_millis() as u64,
367            has_more: total_matched > query.offset + query.limit,
368        })
369    }
370
371    pub async fn query_logs_with_aggregation(&self, query: LogQuery) -> Result<LogAggregationResult> {
372        let log_result = self.query_logs(query).await?;
373        let aggregator = LogAggregator::new(3600, 300);
374
375        let level_distribution = aggregator.aggregate_by_level(&log_result.results);
376        let time_distribution = aggregator.aggregate_by_time_window(&log_result.results);
377
378        Ok(LogAggregationResult {
379            total_matched: log_result.total_matched,
380            results: log_result.results,
381            level_distribution,
382            time_distribution,
383            query_time_ms: log_result.query_time_ms,
384        })
385    }
386
387    fn get_partitions_for_query(&self, query: &LogQuery) -> Result<Vec<String>> {
388        let partitions = self.partitions.get(&query.table_name)
389            .ok_or_else(|| anyhow!("No partitions found for table {}", query.table_name))?;
390
391        let mut relevant_partitions: Vec<&LogPartition> = partitions.iter()
392            .filter(|p| {
393                if let (Some(start), Some(end)) = (query.start_time, query.end_time) {
394                    let part_start = chrono::DateTime::parse_from_rfc3339(&p.start_date)
395                        .map(|dt| dt.timestamp())
396                        .unwrap_or(0);
397                    part_start >= start && part_start <= end
398                } else {
399                    p.is_active
400                }
401            })
402            .collect();
403
404        relevant_partitions.sort_by(|a, b| a.start_date.cmp(&b.start_date));
405
406        Ok(relevant_partitions.into_iter().map(|p| p.partition_name.clone()).collect())
407    }
408
409    fn build_where_clause(&self, query: &LogQuery, _partition: &str) -> Result<String> {
410        let mut clauses = Vec::new();
411
412        if !query.level_filter.is_empty() {
413            let levels = query.level_filter.iter()
414                .map(|l| format!("'{}'", l))
415                .collect::<Vec<_>>()
416                .join(", ");
417            clauses.push(format!("level IN ({})", levels));
418        }
419
420        if let Some(ref keyword) = query.keyword_filter {
421            clauses.push(format!("message LIKE '%{}%'", keyword.replace('\'', "''")));
422        }
423
424        if let (Some(start), Some(end)) = (query.start_time, query.end_time) {
425            clauses.push(format!(
426                "timestamp BETWEEN FROM_UNIXTIME({}) AND FROM_UNIXTIME({})",
427                start, end
428            ));
429        }
430
431        if clauses.is_empty() {
432            Ok("1=1".to_string())
433        } else {
434            Ok(clauses.join(" AND "))
435        }
436    }
437
438    async fn count_matching_logs(&self, query: &LogQuery) -> Result<u64> {
439        let partitions = self.get_partitions_for_query(query)?;
440        let mut total = 0u64;
441
442        for partition_name in partitions {
443            let where_clauses = self.build_where_clause(query, &partition_name)?;
444            let sql = format!(
445                "SELECT COUNT(*) as cnt FROM {} WHERE {}",
446                partition_name, where_clauses
447            );
448
449            if let Ok(rows) = self.connection.query(&sql).await {
450                total += rows.first()
451                    .and_then(|r| r.get("cnt").and_then(|v| v.as_u64()))
452                    .unwrap_or(0);
453            }
454        }
455
456        Ok(total)
457    }
458
    /// Drops inactive partitions older than the table's retention window and
    /// removes them from in-memory tracking. Returns how many were dropped.
    pub async fn cleanup_old_partitions(&mut self, table_name: &str) -> Result<u32> {
        let config = self.configs.get(table_name)
            .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;

        // Dates compare lexicographically; this is valid because start_date
        // uses the sortable %Y-%m-%d format.
        let cutoff_date = chrono::Utc::now()
            .checked_sub_signed(chrono::Duration::days(config.retention_days as i64))
            .unwrap()
            .format("%Y-%m-%d")
            .to_string();

        let mut deleted_count = 0u32;

        if let Some(partitions) = self.partitions.get_mut(table_name) {
            // Collect names first so the vector isn't mutated while iterating.
            let to_delete: Vec<String> = partitions.iter()
                .filter(|p| !p.is_active && p.start_date < cutoff_date)
                .map(|p| p.partition_name.clone())
                .collect();

            for partition_name in to_delete {
                let drop_sql = format!("DROP TABLE IF EXISTS {}", partition_name);
                // Only untrack a partition when the DROP actually succeeded.
                if self.connection.execute(&drop_sql).await.is_ok() {
                    partitions.retain(|p| p.partition_name != partition_name);
                    deleted_count += 1;
                }
            }
        }

        Ok(deleted_count)
    }
488
    /// Copies all rows from inactive, out-of-retention partitions into
    /// `archive_table`. Returns how many partitions were archived.
    ///
    /// NOTE(review): partitions are neither dropped nor flagged after
    /// archiving, so repeated calls re-copy the same rows — confirm callers
    /// pair this with `cleanup_old_partitions`.
    pub async fn archive_old_partitions(&mut self, table_name: &str, archive_table: &str) -> Result<u64> {
        let config = self.configs.get(table_name)
            .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;

        // Lexicographic date comparison; valid for the %Y-%m-%d format.
        let cutoff_date = chrono::Utc::now()
            .checked_sub_signed(chrono::Duration::days(config.retention_days as i64))
            .unwrap()
            .format("%Y-%m-%d")
            .to_string();

        let mut archived_count = 0u64;

        if let Some(partitions) = self.partitions.get(table_name) {
            let to_archive: Vec<String> = partitions.iter()
                .filter(|p| !p.is_active && p.start_date < cutoff_date)
                .map(|p| p.partition_name.clone())
                .collect();

            for partition_name in to_archive {
                let archive_sql = format!(
                    "INSERT INTO {} SELECT * FROM {}",
                    archive_table, partition_name
                );
                // Best-effort: a failed copy is skipped, not fatal.
                if self.connection.execute(&archive_sql).await.is_ok() {
                    archived_count += 1;
                }
            }
        }

        Ok(archived_count)
    }
520
521    pub async fn get_log_stats(&self, table_name: &str) -> Result<LogStats> {
522        let partitions = self.partitions.get(table_name)
523            .ok_or_else(|| anyhow!("No partitions found for table {}", table_name))?;
524
525        let total_partitions = partitions.len();
526        let active_partitions = partitions.iter().filter(|p| p.is_active).count();
527
528        let total_rows: u64 = partitions.iter().map(|p| p.row_count).sum();
529        let total_size: f64 = partitions.iter().map(|p| p.size_mb).sum();
530
531        let oldest = partitions.iter().min_by(|a, b| a.start_date.cmp(&b.start_date));
532        let newest = partitions.iter().max_by(|a, b| a.start_date.cmp(&b.start_date));
533
534        let compressed_count = partitions.iter().filter(|p| p.compressed).count();
535        let compression_ratio = if total_partitions > 0 {
536            compressed_count as f64 / total_partitions as f64
537        } else {
538            0.0
539        };
540
541        Ok(LogStats {
542            table_name: table_name.to_string(),
543            total_partitions,
544            active_partitions,
545            total_size_gb: total_size / 1024.0,
546            total_rows,
547            compression_ratio,
548            oldest_partition: oldest.map(|p| p.start_date.clone()),
549            newest_partition: newest.map(|p| p.start_date.clone()),
550        })
551    }
552
    /// Runs OPTIMIZE TABLE on a tracked partition and flags it as compressed.
    /// A no-op when the table or partition is not tracked.
    ///
    /// NOTE(review): OPTIMIZE TABLE rebuilds/defragments the table; it does
    /// not by itself enable row compression — confirm this matches the
    /// intended meaning of the `compressed` flag.
    pub async fn compress_partition(&mut self, table_name: &str, partition_name: &str) -> Result<()> {
        if let Some(partitions) = self.partitions.get_mut(table_name) {
            if let Some(partition) = partitions.iter_mut().find(|p| &p.partition_name == partition_name) {
                let optimize_sql = format!("OPTIMIZE TABLE {}", partition_name);
                self.connection.execute(&optimize_sql).await?;
                partition.compressed = true;
            }
        }
        Ok(())
    }
563
    /// Returns a clone of all tracked partitions for `table_name`, or None if
    /// the table has never been tracked.
    pub fn get_all_partitions(&self, table_name: &str) -> Option<Vec<LogPartition>> {
        self.partitions.get(table_name).cloned()
    }
567
    /// Number of tracked partitions for `table_name` (0 when untracked).
    pub fn get_partition_count(&self, table_name: &str) -> usize {
        self.partitions.get(table_name).map(|p| p.len()).unwrap_or(0)
    }
571
    /// Deletes rows older than `days_to_keep` from the base table (not from
    /// the partition tables).
    ///
    /// NOTE(review): the return value is a hard-coded 1, not the number of
    /// deleted rows — the connection API apparently does not expose affected
    /// row counts; confirm before relying on this value.
    pub async fn truncate_old_logs(&mut self, table_name: &str, days_to_keep: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now()
            .checked_sub_signed(chrono::Duration::days(days_to_keep as i64))
            .unwrap()
            .format("%Y-%m-%d %H:%M:%S")
            .to_string();

        let sql = format!(
            "DELETE FROM {} WHERE timestamp < '{}'",
            table_name, cutoff
        );

        self.connection.execute(&sql).await?;
        Ok(1)
    }
587}
588
/// One log record for batch insertion via `batch_insert_logs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Severity level, e.g. "INFO" or "ERROR".
    pub level: String,
    /// Log message body.
    pub message: String,
    /// Optional structured context, stored as JSON.
    pub context: Option<serde_json::Value>,
    /// Originating component; "unknown" is substituted at insert time.
    pub source: Option<String>,
}
596
/// Query results enriched with level and time-window aggregations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogAggregationResult {
    /// Total rows matching the filters (same as `LogQueryResult`).
    pub total_matched: u64,
    /// The returned page of rows, as JSON objects.
    pub results: Vec<serde_json::Value>,
    /// Count of returned rows per level value.
    pub level_distribution: HashMap<String, u64>,
    /// Returned rows bucketed into fixed time windows, sorted by window start.
    pub time_distribution: Vec<TimeWindowAggregation>,
    /// Wall-clock duration of the underlying query in milliseconds.
    pub query_time_ms: u64,
}
605
/// Aggregates raw log rows into per-level counts and time-window buckets.
pub struct LogAggregator {
    /// Overall analysis window in seconds.
    /// NOTE(review): stored but not referenced by any method in this file.
    time_window_secs: u64,
    /// Width of each aggregation bucket, in seconds.
    aggregation_interval_secs: u64,
}
610
611impl LogAggregator {
    /// Creates an aggregator with the given window and bucket sizes (seconds).
    pub fn new(time_window_secs: u64, aggregation_interval_secs: u64) -> Self {
        Self {
            time_window_secs,
            aggregation_interval_secs,
        }
    }
618
619    pub fn aggregate_by_level(&self, logs: &[serde_json::Value]) -> HashMap<String, u64> {
620        let mut counts = HashMap::new();
621
622        for log in logs {
623            if let Some(level) = log.get("level").and_then(|v| v.as_str()) {
624                *counts.entry(level.to_string()).or_insert(0) += 1;
625            }
626        }
627
628        counts
629    }
630
631    pub fn aggregate_by_time_window(&self, logs: &[serde_json::Value]) -> Vec<TimeWindowAggregation> {
632        let mut windows: HashMap<u64, TimeWindowAggregation> = HashMap::new();
633
634        for log in logs {
635            if let Some(timestamp) = log.get("timestamp").and_then(|v| v.as_str()) {
636                if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) {
637                    let window_key = dt.timestamp() / self.aggregation_interval_secs as i64;
638
639                    let entry = windows.entry(window_key as u64).or_insert_with(|| TimeWindowAggregation {
640                        window_start: window_key as i64 * self.aggregation_interval_secs as i64,
641                        window_end: (window_key + 1) as i64 * self.aggregation_interval_secs as i64 - 1,
642                        count: 0,
643                        levels: HashMap::new(),
644                        error_count: 0,
645                        warning_count: 0,
646                    });
647
648                    entry.count += 1;
649
650                    if let Some(level) = log.get("level").and_then(|v| v.as_str()) {
651                        *entry.levels.entry(level.to_string()).or_insert(0) += 1;
652
653                        match level {
654                            "ERROR" | "FATAL" | "CRITICAL" => entry.error_count += 1,
655                            "WARN" | "WARNING" => entry.warning_count += 1,
656                            _ => {}
657                        }
658                    }
659                }
660            }
661        }
662
663        let mut result: Vec<_> = windows.into_values().collect();
664        result.sort_by_key(|w| w.window_start);
665        result
666    }
667
668    pub fn find_error_spikes(&self, logs: &[serde_json::Value], threshold: u64) -> Vec<ErrorSpike> {
669        let windows = self.aggregate_by_time_window(logs);
670        let mut spikes = Vec::new();
671
672        for window in windows {
673            if window.error_count >= threshold {
674                spikes.push(ErrorSpike {
675                    window_start: window.window_start,
676                    window_end: window.window_end,
677                    error_count: window.error_count,
678                    total_logs: window.count,
679                });
680            }
681        }
682
683        spikes
684    }
685
686    pub fn calculate_error_rate(&self, logs: &[serde_json::Value]) -> f64 {
687        if logs.is_empty() {
688            return 0.0;
689        }
690
691        let error_count = logs.iter()
692            .filter(|log| {
693                log.get("level")
694                    .and_then(|v| v.as_str())
695                    .map(|l| l == "ERROR" || l == "FATAL" || l == "CRITICAL")
696                    .unwrap_or(false)
697            })
698            .count() as f64;
699
700        error_count / logs.len() as f64
701    }
702}
703
704#[derive(Debug, Clone, Serialize, Deserialize)]
/// Aggregated counts for one fixed time window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeWindowAggregation {
    /// Inclusive window start (Unix seconds).
    pub window_start: i64,
    /// Inclusive window end (Unix seconds).
    pub window_end: i64,
    /// Total entries in the window.
    pub count: u64,
    /// Entry count per level value.
    pub levels: HashMap<String, u64>,
    /// Entries with level ERROR/FATAL/CRITICAL.
    pub error_count: u64,
    /// Entries with level WARN/WARNING.
    pub warning_count: u64,
}
713
/// A time window whose error count reached the spike threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorSpike {
    /// Inclusive window start (Unix seconds).
    pub window_start: i64,
    /// Inclusive window end (Unix seconds).
    pub window_end: i64,
    /// Errors observed in the window.
    pub error_count: u64,
    /// Total log entries in the window.
    pub total_logs: u64,
}
721
#[cfg(test)]
mod tests {
    use super::*;

    // Enum variants should serialize under their variant name.
    #[test]
    fn test_partition_type_serialization() {
        let pt = PartitionType::Daily;
        let json = serde_json::to_string(&pt).unwrap();
        assert!(json.contains("Daily"));
    }

    // Per-level counting over a small fixed batch.
    #[test]
    fn test_log_aggregator() {
        let aggregator = LogAggregator::new(3600, 60);

        let logs = vec![
            serde_json::json!({"level": "INFO", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "ERROR", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "INFO", "timestamp": "2024-01-01T00:00:00Z"}),
        ];

        let counts = aggregator.aggregate_by_level(&logs);
        assert_eq!(counts.get("INFO"), Some(&2));
        assert_eq!(counts.get("ERROR"), Some(&1));
    }

    // Stats math mirrored here directly over LogPartition fields
    // (does not exercise LogTableManager::get_log_stats itself).
    #[test]
    fn test_log_stats_calculation() {
        let partitions = vec![
            LogPartition {
                partition_name: "logs_p0".to_string(),
                table_name: "logs".to_string(),
                start_date: "2024-01-01".to_string(),
                end_date: Some("2024-01-02".to_string()),
                row_count: 1000,
                size_mb: 100.0,
                is_active: false,
                compressed: false,
            },
            LogPartition {
                partition_name: "logs_p1".to_string(),
                table_name: "logs".to_string(),
                start_date: "2024-01-02".to_string(),
                end_date: None,
                row_count: 500,
                size_mb: 50.0,
                is_active: true,
                compressed: true,
            },
        ];

        let total_rows: u64 = partitions.iter().map(|p| p.row_count).sum();
        assert_eq!(total_rows, 1500);

        let compressed_ratio = partitions.iter().filter(|p| p.compressed).count() as f64 / partitions.len() as f64;
        assert_eq!(compressed_ratio, 0.5);
    }

    // Three errors in the same minute bucket must exceed a threshold of 2.
    #[test]
    fn test_error_spike_detection() {
        let aggregator = LogAggregator::new(3600, 60);

        let logs = vec![
            serde_json::json!({"level": "ERROR", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "ERROR", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "ERROR", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "INFO", "timestamp": "2024-01-01T00:00:00Z"}),
        ];

        let spikes = aggregator.find_error_spikes(&logs, 2);
        assert!(!spikes.is_empty());
        assert_eq!(spikes[0].error_count, 3);
    }

    // One error out of four entries → rate 0.25.
    #[test]
    fn test_error_rate_calculation() {
        let aggregator = LogAggregator::new(3600, 60);

        let logs = vec![
            serde_json::json!({"level": "ERROR", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "INFO", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "INFO", "timestamp": "2024-01-01T00:00:00Z"}),
            serde_json::json!({"level": "INFO", "timestamp": "2024-01-01T00:00:00Z"}),
        ];

        let rate = aggregator.calculate_error_rate(&logs);
        assert_eq!(rate, 0.25);
    }

    // Plain construction sanity check for the batch-insert record type.
    #[test]
    fn test_log_entry_creation() {
        let entry = LogEntry {
            level: "ERROR".to_string(),
            message: "Test error".to_string(),
            context: Some(serde_json::json!({"user_id": 123})),
            source: Some("test_module".to_string()),
        };

        assert_eq!(entry.level, "ERROR");
        assert_eq!(entry.message, "Test error");
    }
}