1use crate::databases::DatabaseConnection;
2use anyhow::{Result, anyhow};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct LogTableConfig {
8 pub table_name: String,
9 pub partition_type: PartitionType,
10 pub retention_days: u32,
11 pub max_partition_size_gb: f64,
12 pub auto_create_partition: bool,
13 pub compression_enabled: bool,
14 pub indexed_fields: Vec<String>,
15 pub partition_prefix: String,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum PartitionType {
20 Daily,
21 Weekly,
22 Monthly,
23 Hourly,
24 SizeBased { max_size_gb: f64 },
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct LogPartition {
29 pub partition_name: String,
30 pub table_name: String,
31 pub start_date: String,
32 pub end_date: Option<String>,
33 pub row_count: u64,
34 pub size_mb: f64,
35 pub is_active: bool,
36 pub compressed: bool,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct LogQuery {
41 pub table_name: String,
42 pub start_time: Option<i64>,
43 pub end_time: Option<i64>,
44 pub level_filter: Vec<String>,
45 pub keyword_filter: Option<String>,
46 pub limit: u64,
47 pub offset: u64,
48 pub order_desc: bool,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct LogQueryResult {
53 pub total_matched: u64,
54 pub results: Vec<serde_json::Value>,
55 pub partitions_queried: Vec<String>,
56 pub query_time_ms: u64,
57 pub has_more: bool,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct LogStats {
62 pub table_name: String,
63 pub total_partitions: usize,
64 pub active_partitions: usize,
65 pub total_size_gb: f64,
66 pub total_rows: u64,
67 pub compression_ratio: f64,
68 pub oldest_partition: Option<String>,
69 pub newest_partition: Option<String>,
70}
71
72pub struct LogTableManager {
73 connection: Box<dyn DatabaseConnection>,
74 configs: HashMap<String, LogTableConfig>,
75 partitions: HashMap<String, Vec<LogPartition>>,
76}
77
78impl LogTableManager {
79 pub fn new(connection: Box<dyn DatabaseConnection>) -> Self {
80 Self {
81 connection,
82 configs: HashMap::new(),
83 partitions: HashMap::new(),
84 }
85 }
86
87 pub fn register_log_table(&mut self, config: LogTableConfig) {
88 self.configs.insert(config.table_name.clone(), config);
89 }
90
91 pub async fn create_log_table(&mut self, table_name: &str) -> Result<()> {
92 let create_sql = format!(
93 r#"
94 CREATE TABLE IF NOT EXISTS {} (
95 id BIGINT AUTO_INCREMENT PRIMARY KEY,
96 timestamp DATETIME NOT NULL,
97 level VARCHAR(20) NOT NULL,
98 message TEXT,
99 context JSON,
100 source VARCHAR(255),
101 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
102 INDEX idx_timestamp (timestamp),
103 INDEX idx_level (level),
104 INDEX idx_created_at (created_at)
105 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
106 "#,
107 table_name
108 );
109
110 self.connection.execute(&create_sql).await?;
111
112 let initial_partition = LogPartition {
113 partition_name: format!("{}_p0", table_name),
114 table_name: table_name.to_string(),
115 start_date: chrono::Utc::now().format("%Y-%m-%d").to_string(),
116 end_date: None,
117 row_count: 0,
118 size_mb: 0.0,
119 is_active: true,
120 compressed: false,
121 };
122
123 self.partitions
124 .entry(table_name.to_string())
125 .or_insert_with(Vec::new)
126 .push(initial_partition);
127
128 Ok(())
129 }
130
131 pub async fn insert_log(&mut self, table_name: &str, level: &str, message: &str, context: Option<serde_json::Value>, source: Option<&str>) -> Result<()> {
132 let _config = self.configs.get(table_name)
133 .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
134
135 let active_partition = self.get_active_partition(table_name).await?;
136
137 let context_json = context.map(|c| serde_json::to_string(&c).unwrap_or_default())
138 .unwrap_or_else(|| "null".to_string());
139
140 let insert_sql = format!(
141 "INSERT INTO {} (timestamp, level, message, context, source) VALUES (NOW(), '{}', '{}', '{}', '{}')",
142 active_partition,
143 level,
144 message.replace('\'', "''"),
145 context_json.replace('\'', "''"),
146 source.unwrap_or("unknown").replace('\'', "''")
147 );
148
149 self.connection.execute(&insert_sql).await?;
150
151 self.update_partition_stats(table_name, &active_partition).await?;
152
153 self.check_and_create_partition(table_name).await?;
154
155 Ok(())
156 }
157
158 pub async fn batch_insert_logs(&mut self, table_name: &str, logs: Vec<LogEntry>) -> Result<u64> {
159 let _config = self.configs.get(table_name)
160 .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
161
162 let active_partition = self.get_active_partition(table_name).await?;
163 let mut inserted = 0u64;
164
165 for log in logs {
166 let context_json = log.context.map(|c| serde_json::to_string(&c).unwrap_or_default())
167 .unwrap_or_else(|| "null".to_string());
168
169 let insert_sql = format!(
170 "INSERT INTO {} (timestamp, level, message, context, source) VALUES (NOW(), '{}', '{}', '{}', '{}')",
171 active_partition,
172 log.level,
173 log.message.replace('\'', "''"),
174 context_json.replace('\'', "''"),
175 log.source.unwrap_or_else(|| "unknown".to_string()).replace('\'', "''")
176 );
177
178 if self.connection.execute(&insert_sql).await.is_ok() {
179 inserted += 1;
180 }
181 }
182
183 self.update_partition_stats(table_name, &active_partition).await?;
184 self.check_and_create_partition(table_name).await?;
185
186 Ok(inserted)
187 }
188
189 async fn get_active_partition(&self, table_name: &str) -> Result<String> {
190 if let Some(partitions) = self.partitions.get(table_name) {
191 if let Some(active) = partitions.iter().find(|p| p.is_active) {
192 return Ok(active.partition_name.clone());
193 }
194 }
195 Ok(format!("{}_p0", table_name))
196 }
197
198 async fn check_and_create_partition(&mut self, table_name: &str) -> Result<()> {
199 let config = self.configs.get(table_name)
200 .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
201
202 let active_partition = self.get_active_partition(table_name).await?;
203 let partition_stats = self.get_partition_stats(table_name, &active_partition).await?;
204
205 let should_create = match &config.partition_type {
206 PartitionType::Daily => {
207 let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
208 partition_stats.start_date != today
209 }
210 PartitionType::Hourly => {
211 let hour = chrono::Utc::now().format("%Y-%m-%d %H:00:00").to_string();
212 partition_stats.start_date != hour
213 }
214 PartitionType::SizeBased { max_size_gb } => {
215 partition_stats.size_mb as f64 / 1024.0 >= *max_size_gb
216 }
217 _ => false,
218 };
219
220 if should_create && config.auto_create_partition {
221 self.create_new_partition(table_name).await?;
222 }
223
224 Ok(())
225 }
226
227 async fn get_partition_stats(&self, table_name: &str, partition_name: &str) -> Result<LogPartition> {
228 let sql = format!(
229 "SELECT COUNT(*) as cnt, AVG(LENGTH(message)) as avg_size FROM {}",
230 partition_name
231 );
232
233 let rows = self.connection.query(&sql).await?;
234 let row_count = rows.first()
235 .and_then(|r| r.get("cnt").and_then(|v| v.as_u64()))
236 .unwrap_or(0);
237
238 Ok(LogPartition {
239 partition_name: partition_name.to_string(),
240 table_name: table_name.to_string(),
241 start_date: chrono::Utc::now().format("%Y-%m-%d").to_string(),
242 end_date: None,
243 row_count,
244 size_mb: 0.0,
245 is_active: true,
246 compressed: false,
247 })
248 }
249
250 async fn create_new_partition(&mut self, table_name: &str) -> Result<()> {
251 let config = self.configs.get(table_name)
252 .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
253
254 let partitions = self.partitions.get(table_name);
255 let partition_num = partitions.map(|p| p.len()).unwrap_or(0);
256
257 let _new_partition_name = format!("{}_{}", config.partition_prefix, partition_num);
258
259 let date_pattern = match &config.partition_type {
260 PartitionType::Daily => chrono::Utc::now().format("%Y%m%d").to_string(),
261 PartitionType::Weekly => chrono::Utc::now().format("%Y-W%V").to_string(),
262 PartitionType::Monthly => chrono::Utc::now().format("%Y%m").to_string(),
263 PartitionType::Hourly => chrono::Utc::now().format("%Y%m%d%H").to_string(),
264 PartitionType::SizeBased { .. } => format!("{}", partition_num),
265 };
266
267 let partition_name = format!("{}_{}", config.partition_prefix, date_pattern);
268
269 let create_sql = format!(
270 "CREATE TABLE IF NOT EXISTS {} LIKE {}",
271 partition_name, table_name
272 );
273
274 self.connection.execute(&create_sql).await?;
275
276 if let Some(partitions) = self.partitions.get_mut(table_name) {
277 if let Some(last) = partitions.iter_mut().find(|p| p.is_active) {
278 last.is_active = false;
279 last.end_date = Some(chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string());
280 }
281
282 partitions.push(LogPartition {
283 partition_name: partition_name.clone(),
284 table_name: table_name.to_string(),
285 start_date: chrono::Utc::now().format("%Y-%m-%d").to_string(),
286 end_date: None,
287 row_count: 0,
288 size_mb: 0.0,
289 is_active: true,
290 compressed: false,
291 });
292 }
293
294 Ok(())
295 }
296
297 async fn update_partition_stats(&self, table_name: &str, partition_name: &str) -> Result<()> {
298 if let Some(partitions) = self.partitions.get(table_name) {
299 if let Some(_stats) = partitions.iter().find(|p| &p.partition_name == partition_name) {
300 let sql = format!("SELECT COUNT(*) as cnt FROM {}", partition_name);
301 if let Ok(rows) = self.connection.query(&sql).await {
302 if let Some(_cnt) = rows.first().and_then(|r| r.get("cnt").and_then(|v| v.as_u64())) {
303 return Ok(());
304 }
305 }
306 }
307 }
308 Ok(())
309 }
310
311 pub async fn query_logs(&self, query: LogQuery) -> Result<LogQueryResult> {
312 let start = std::time::Instant::now();
313
314 let partitions = self.get_partitions_for_query(&query)?;
315 let mut all_results = Vec::new();
316 let mut partitions_queried = Vec::new();
317
318 let mut remaining = query.limit;
319 let mut current_offset = query.offset;
320
321 for partition_name in partitions {
322 if remaining == 0 {
323 break;
324 }
325
326 partitions_queried.push(partition_name.clone());
327
328 let where_clauses = self.build_where_clause(&query, &partition_name)?;
329 let sql = format!(
330 "SELECT * FROM {} WHERE {} ORDER BY timestamp {} LIMIT {} OFFSET {}",
331 partition_name,
332 where_clauses,
333 if query.order_desc { "DESC" } else { "ASC" },
334 remaining,
335 current_offset
336 );
337
338 match self.connection.query(&sql).await {
339 Ok(rows) => {
340 let row_count = rows.len() as u64;
341
342 for row in rows {
343 if let Some(obj) = row.as_object() {
344 all_results.push(serde_json::Value::Object(obj.clone()));
345 }
346 }
347
348 if current_offset >= row_count {
349 current_offset -= row_count;
350 } else {
351 current_offset = 0;
352 }
353
354 remaining = remaining.saturating_sub(row_count);
355 }
356 Err(_) => continue,
357 }
358 }
359
360 let total_matched = self.count_matching_logs(&query).await?;
361
362 Ok(LogQueryResult {
363 total_matched,
364 results: all_results,
365 partitions_queried,
366 query_time_ms: start.elapsed().as_millis() as u64,
367 has_more: total_matched > query.offset + query.limit,
368 })
369 }
370
371 pub async fn query_logs_with_aggregation(&self, query: LogQuery) -> Result<LogAggregationResult> {
372 let log_result = self.query_logs(query).await?;
373 let aggregator = LogAggregator::new(3600, 300);
374
375 let level_distribution = aggregator.aggregate_by_level(&log_result.results);
376 let time_distribution = aggregator.aggregate_by_time_window(&log_result.results);
377
378 Ok(LogAggregationResult {
379 total_matched: log_result.total_matched,
380 results: log_result.results,
381 level_distribution,
382 time_distribution,
383 query_time_ms: log_result.query_time_ms,
384 })
385 }
386
387 fn get_partitions_for_query(&self, query: &LogQuery) -> Result<Vec<String>> {
388 let partitions = self.partitions.get(&query.table_name)
389 .ok_or_else(|| anyhow!("No partitions found for table {}", query.table_name))?;
390
391 let mut relevant_partitions: Vec<&LogPartition> = partitions.iter()
392 .filter(|p| {
393 if let (Some(start), Some(end)) = (query.start_time, query.end_time) {
394 let part_start = chrono::DateTime::parse_from_rfc3339(&p.start_date)
395 .map(|dt| dt.timestamp())
396 .unwrap_or(0);
397 part_start >= start && part_start <= end
398 } else {
399 p.is_active
400 }
401 })
402 .collect();
403
404 relevant_partitions.sort_by(|a, b| a.start_date.cmp(&b.start_date));
405
406 Ok(relevant_partitions.into_iter().map(|p| p.partition_name.clone()).collect())
407 }
408
409 fn build_where_clause(&self, query: &LogQuery, _partition: &str) -> Result<String> {
410 let mut clauses = Vec::new();
411
412 if !query.level_filter.is_empty() {
413 let levels = query.level_filter.iter()
414 .map(|l| format!("'{}'", l))
415 .collect::<Vec<_>>()
416 .join(", ");
417 clauses.push(format!("level IN ({})", levels));
418 }
419
420 if let Some(ref keyword) = query.keyword_filter {
421 clauses.push(format!("message LIKE '%{}%'", keyword.replace('\'', "''")));
422 }
423
424 if let (Some(start), Some(end)) = (query.start_time, query.end_time) {
425 clauses.push(format!(
426 "timestamp BETWEEN FROM_UNIXTIME({}) AND FROM_UNIXTIME({})",
427 start, end
428 ));
429 }
430
431 if clauses.is_empty() {
432 Ok("1=1".to_string())
433 } else {
434 Ok(clauses.join(" AND "))
435 }
436 }
437
438 async fn count_matching_logs(&self, query: &LogQuery) -> Result<u64> {
439 let partitions = self.get_partitions_for_query(query)?;
440 let mut total = 0u64;
441
442 for partition_name in partitions {
443 let where_clauses = self.build_where_clause(query, &partition_name)?;
444 let sql = format!(
445 "SELECT COUNT(*) as cnt FROM {} WHERE {}",
446 partition_name, where_clauses
447 );
448
449 if let Ok(rows) = self.connection.query(&sql).await {
450 total += rows.first()
451 .and_then(|r| r.get("cnt").and_then(|v| v.as_u64()))
452 .unwrap_or(0);
453 }
454 }
455
456 Ok(total)
457 }
458
459 pub async fn cleanup_old_partitions(&mut self, table_name: &str) -> Result<u32> {
460 let config = self.configs.get(table_name)
461 .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
462
463 let cutoff_date = chrono::Utc::now()
464 .checked_sub_signed(chrono::Duration::days(config.retention_days as i64))
465 .unwrap()
466 .format("%Y-%m-%d")
467 .to_string();
468
469 let mut deleted_count = 0u32;
470
471 if let Some(partitions) = self.partitions.get_mut(table_name) {
472 let to_delete: Vec<String> = partitions.iter()
473 .filter(|p| !p.is_active && p.start_date < cutoff_date)
474 .map(|p| p.partition_name.clone())
475 .collect();
476
477 for partition_name in to_delete {
478 let drop_sql = format!("DROP TABLE IF EXISTS {}", partition_name);
479 if self.connection.execute(&drop_sql).await.is_ok() {
480 partitions.retain(|p| p.partition_name != partition_name);
481 deleted_count += 1;
482 }
483 }
484 }
485
486 Ok(deleted_count)
487 }
488
489 pub async fn archive_old_partitions(&mut self, table_name: &str, archive_table: &str) -> Result<u64> {
490 let config = self.configs.get(table_name)
491 .ok_or_else(|| anyhow!("Log table {} not registered", table_name))?;
492
493 let cutoff_date = chrono::Utc::now()
494 .checked_sub_signed(chrono::Duration::days(config.retention_days as i64))
495 .unwrap()
496 .format("%Y-%m-%d")
497 .to_string();
498
499 let mut archived_count = 0u64;
500
501 if let Some(partitions) = self.partitions.get(table_name) {
502 let to_archive: Vec<String> = partitions.iter()
503 .filter(|p| !p.is_active && p.start_date < cutoff_date)
504 .map(|p| p.partition_name.clone())
505 .collect();
506
507 for partition_name in to_archive {
508 let archive_sql = format!(
509 "INSERT INTO {} SELECT * FROM {}",
510 archive_table, partition_name
511 );
512 if self.connection.execute(&archive_sql).await.is_ok() {
513 archived_count += 1;
514 }
515 }
516 }
517
518 Ok(archived_count)
519 }
520
521 pub async fn get_log_stats(&self, table_name: &str) -> Result<LogStats> {
522 let partitions = self.partitions.get(table_name)
523 .ok_or_else(|| anyhow!("No partitions found for table {}", table_name))?;
524
525 let total_partitions = partitions.len();
526 let active_partitions = partitions.iter().filter(|p| p.is_active).count();
527
528 let total_rows: u64 = partitions.iter().map(|p| p.row_count).sum();
529 let total_size: f64 = partitions.iter().map(|p| p.size_mb).sum();
530
531 let oldest = partitions.iter().min_by(|a, b| a.start_date.cmp(&b.start_date));
532 let newest = partitions.iter().max_by(|a, b| a.start_date.cmp(&b.start_date));
533
534 let compressed_count = partitions.iter().filter(|p| p.compressed).count();
535 let compression_ratio = if total_partitions > 0 {
536 compressed_count as f64 / total_partitions as f64
537 } else {
538 0.0
539 };
540
541 Ok(LogStats {
542 table_name: table_name.to_string(),
543 total_partitions,
544 active_partitions,
545 total_size_gb: total_size / 1024.0,
546 total_rows,
547 compression_ratio,
548 oldest_partition: oldest.map(|p| p.start_date.clone()),
549 newest_partition: newest.map(|p| p.start_date.clone()),
550 })
551 }
552
553 pub async fn compress_partition(&mut self, table_name: &str, partition_name: &str) -> Result<()> {
554 if let Some(partitions) = self.partitions.get_mut(table_name) {
555 if let Some(partition) = partitions.iter_mut().find(|p| &p.partition_name == partition_name) {
556 let optimize_sql = format!("OPTIMIZE TABLE {}", partition_name);
557 self.connection.execute(&optimize_sql).await?;
558 partition.compressed = true;
559 }
560 }
561 Ok(())
562 }
563
564 pub fn get_all_partitions(&self, table_name: &str) -> Option<Vec<LogPartition>> {
565 self.partitions.get(table_name).cloned()
566 }
567
568 pub fn get_partition_count(&self, table_name: &str) -> usize {
569 self.partitions.get(table_name).map(|p| p.len()).unwrap_or(0)
570 }
571
572 pub async fn truncate_old_logs(&mut self, table_name: &str, days_to_keep: u32) -> Result<u64> {
573 let cutoff = chrono::Utc::now()
574 .checked_sub_signed(chrono::Duration::days(days_to_keep as i64))
575 .unwrap()
576 .format("%Y-%m-%d %H:%M:%S")
577 .to_string();
578
579 let sql = format!(
580 "DELETE FROM {} WHERE timestamp < '{}'",
581 table_name, cutoff
582 );
583
584 self.connection.execute(&sql).await?;
585 Ok(1)
586 }
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
590pub struct LogEntry {
591 pub level: String,
592 pub message: String,
593 pub context: Option<serde_json::Value>,
594 pub source: Option<String>,
595}
596
597#[derive(Debug, Clone, Serialize, Deserialize)]
598pub struct LogAggregationResult {
599 pub total_matched: u64,
600 pub results: Vec<serde_json::Value>,
601 pub level_distribution: HashMap<String, u64>,
602 pub time_distribution: Vec<TimeWindowAggregation>,
603 pub query_time_ms: u64,
604}
605
/// Computes level and time-window statistics over log rows given as JSON values.
pub struct LogAggregator {
    /// Overall window length in seconds.
    // NOTE(review): stored by `new` but not read by any method in this file.
    time_window_secs: u64,
    /// Width in seconds of each bucket produced by `aggregate_by_time_window`.
    aggregation_interval_secs: u64,
}
610
611impl LogAggregator {
612 pub fn new(time_window_secs: u64, aggregation_interval_secs: u64) -> Self {
613 Self {
614 time_window_secs,
615 aggregation_interval_secs,
616 }
617 }
618
619 pub fn aggregate_by_level(&self, logs: &[serde_json::Value]) -> HashMap<String, u64> {
620 let mut counts = HashMap::new();
621
622 for log in logs {
623 if let Some(level) = log.get("level").and_then(|v| v.as_str()) {
624 *counts.entry(level.to_string()).or_insert(0) += 1;
625 }
626 }
627
628 counts
629 }
630
631 pub fn aggregate_by_time_window(&self, logs: &[serde_json::Value]) -> Vec<TimeWindowAggregation> {
632 let mut windows: HashMap<u64, TimeWindowAggregation> = HashMap::new();
633
634 for log in logs {
635 if let Some(timestamp) = log.get("timestamp").and_then(|v| v.as_str()) {
636 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) {
637 let window_key = dt.timestamp() / self.aggregation_interval_secs as i64;
638
639 let entry = windows.entry(window_key as u64).or_insert_with(|| TimeWindowAggregation {
640 window_start: window_key as i64 * self.aggregation_interval_secs as i64,
641 window_end: (window_key + 1) as i64 * self.aggregation_interval_secs as i64 - 1,
642 count: 0,
643 levels: HashMap::new(),
644 error_count: 0,
645 warning_count: 0,
646 });
647
648 entry.count += 1;
649
650 if let Some(level) = log.get("level").and_then(|v| v.as_str()) {
651 *entry.levels.entry(level.to_string()).or_insert(0) += 1;
652
653 match level {
654 "ERROR" | "FATAL" | "CRITICAL" => entry.error_count += 1,
655 "WARN" | "WARNING" => entry.warning_count += 1,
656 _ => {}
657 }
658 }
659 }
660 }
661 }
662
663 let mut result: Vec<_> = windows.into_values().collect();
664 result.sort_by_key(|w| w.window_start);
665 result
666 }
667
668 pub fn find_error_spikes(&self, logs: &[serde_json::Value], threshold: u64) -> Vec<ErrorSpike> {
669 let windows = self.aggregate_by_time_window(logs);
670 let mut spikes = Vec::new();
671
672 for window in windows {
673 if window.error_count >= threshold {
674 spikes.push(ErrorSpike {
675 window_start: window.window_start,
676 window_end: window.window_end,
677 error_count: window.error_count,
678 total_logs: window.count,
679 });
680 }
681 }
682
683 spikes
684 }
685
686 pub fn calculate_error_rate(&self, logs: &[serde_json::Value]) -> f64 {
687 if logs.is_empty() {
688 return 0.0;
689 }
690
691 let error_count = logs.iter()
692 .filter(|log| {
693 log.get("level")
694 .and_then(|v| v.as_str())
695 .map(|l| l == "ERROR" || l == "FATAL" || l == "CRITICAL")
696 .unwrap_or(false)
697 })
698 .count() as f64;
699
700 error_count / logs.len() as f64
701 }
702}
703
704#[derive(Debug, Clone, Serialize, Deserialize)]
705pub struct TimeWindowAggregation {
706 pub window_start: i64,
707 pub window_end: i64,
708 pub count: u64,
709 pub levels: HashMap<String, u64>,
710 pub error_count: u64,
711 pub warning_count: u64,
712}
713
714#[derive(Debug, Clone, Serialize, Deserialize)]
715pub struct ErrorSpike {
716 pub window_start: i64,
717 pub window_end: i64,
718 pub error_count: u64,
719 pub total_logs: u64,
720}
721
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp shared by all fixture rows, so they land in the same window.
    const FIXED_TIMESTAMP: &str = "2024-01-01T00:00:00Z";

    /// Builds one fixture row per entry of `levels`.
    fn rows_with_levels(levels: &[&str]) -> Vec<serde_json::Value> {
        levels
            .iter()
            .map(|level| serde_json::json!({"level": level, "timestamp": FIXED_TIMESTAMP}))
            .collect()
    }

    /// Builds a partition record for table `logs` with the given figures.
    fn partition(
        name: &str,
        start_date: &str,
        end_date: Option<&str>,
        row_count: u64,
        size_mb: f64,
        is_active: bool,
        compressed: bool,
    ) -> LogPartition {
        LogPartition {
            partition_name: name.to_string(),
            table_name: "logs".to_string(),
            start_date: start_date.to_string(),
            end_date: end_date.map(str::to_string),
            row_count,
            size_mb,
            is_active,
            compressed,
        }
    }

    #[test]
    fn test_partition_type_serialization() {
        let json = serde_json::to_string(&PartitionType::Daily).unwrap();
        assert!(json.contains("Daily"));
    }

    #[test]
    fn test_log_aggregator() {
        let aggregator = LogAggregator::new(3600, 60);
        let logs = rows_with_levels(&["INFO", "ERROR", "INFO"]);

        let counts = aggregator.aggregate_by_level(&logs);

        assert_eq!(counts.get("INFO"), Some(&2));
        assert_eq!(counts.get("ERROR"), Some(&1));
    }

    #[test]
    fn test_log_stats_calculation() {
        let partitions = vec![
            partition("logs_p0", "2024-01-01", Some("2024-01-02"), 1000, 100.0, false, false),
            partition("logs_p1", "2024-01-02", None, 500, 50.0, true, true),
        ];

        let total_rows: u64 = partitions.iter().map(|p| p.row_count).sum();
        assert_eq!(total_rows, 1500);

        let compressed = partitions.iter().filter(|p| p.compressed).count();
        let compressed_ratio = compressed as f64 / partitions.len() as f64;
        assert_eq!(compressed_ratio, 0.5);
    }

    #[test]
    fn test_error_spike_detection() {
        let aggregator = LogAggregator::new(3600, 60);
        let logs = rows_with_levels(&["ERROR", "ERROR", "ERROR", "INFO"]);

        let spikes = aggregator.find_error_spikes(&logs, 2);

        assert!(!spikes.is_empty());
        assert_eq!(spikes[0].error_count, 3);
    }

    #[test]
    fn test_error_rate_calculation() {
        let aggregator = LogAggregator::new(3600, 60);
        let logs = rows_with_levels(&["ERROR", "INFO", "INFO", "INFO"]);

        let rate = aggregator.calculate_error_rate(&logs);

        assert_eq!(rate, 0.25);
    }

    #[test]
    fn test_log_entry_creation() {
        let entry = LogEntry {
            level: "ERROR".to_string(),
            message: "Test error".to_string(),
            context: Some(serde_json::json!({"user_id": 123})),
            source: Some("test_module".to_string()),
        };

        assert_eq!(entry.level, "ERROR");
        assert_eq!(entry.message, "Test error");
    }
}