1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::infrastructure::persistence::storage::ParquetStorage;
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
/// Coordinates compaction of the `.parquet` files found in a storage directory.
///
/// Holds the compaction configuration together with lock-protected bookkeeping:
/// cumulative statistics and the time of the last completed run.
pub struct CompactionManager {
    /// Directory scanned for `.parquet` files; also handed to `ParquetStorage`
    /// when reading and writing events.
    storage_dir: PathBuf,

    /// Tunables controlling when compaction runs and which files it picks.
    config: CompactionConfig,

    /// Cumulative statistics, updated at the end of each completed `compact` run.
    stats: Arc<RwLock<CompactionStats>>,

    /// Completion time of the most recent compaction; `None` until one has finished.
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
26
/// Tunables for `CompactionManager`.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of candidate files the size- or time-based strategy must
    /// find before anything is compacted.
    pub min_files_to_compact: usize,

    /// Soft batch limit in bytes: a new output batch is started when adding the
    /// next event would push the batch's estimated (JSON-serialized) size past it.
    pub target_file_size: usize,

    /// Hard batch limit in bytes: a batch is written as soon as its estimated
    /// size reaches this value.
    pub max_file_size: usize,

    /// Files whose on-disk size in bytes is below this value count as "small"
    /// for the `SizeBased` strategy.
    pub small_file_threshold: usize,

    /// Minimum number of seconds between automatic compaction runs.
    pub compaction_interval_seconds: u64,

    /// When `false`, `should_compact` always returns `false`.
    pub auto_compact: bool,

    /// How files are chosen for compaction.
    pub strategy: CompactionStrategy,
}
50
51impl Default for CompactionConfig {
52 fn default() -> Self {
53 Self {
54 min_files_to_compact: 3,
55 target_file_size: 128 * 1024 * 1024, max_file_size: 256 * 1024 * 1024, small_file_threshold: 10 * 1024 * 1024, compaction_interval_seconds: 3600, auto_compact: true,
60 strategy: CompactionStrategy::SizeBased,
61 }
62 }
63}
64
/// Policy used to pick which files a compaction run rewrites.
///
/// Because of `rename_all = "lowercase"`, the variants (de)serialize as
/// `"sizebased"`, `"timebased"` and `"fullcompaction"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Select files smaller than `CompactionConfig::small_file_threshold`.
    SizeBased,
    /// Select files whose recorded creation time is more than 24 hours ago.
    TimeBased,
    /// Select every listed Parquet file, regardless of size or age.
    FullCompaction,
}
75
/// Cumulative counters maintained by `CompactionManager::compact`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    /// Number of compaction runs that reached the write phase and completed.
    pub total_compactions: u64,
    /// Total number of source files compacted across all runs.
    pub total_files_compacted: u64,
    /// Sum of source-file sizes (bytes) across all runs.
    pub total_bytes_before: u64,
    /// Sum of compacted-output sizes (bytes) across all runs.
    pub total_bytes_after: u64,
    /// Total number of events written by compaction across all runs.
    pub total_events_compacted: u64,
    /// Wall-clock duration of the most recent run, in milliseconds.
    pub last_compaction_duration_ms: u64,
    /// Accumulated `bytes_before - bytes_after`, clamped at zero per run.
    pub space_saved_bytes: u64,
}
86
/// Snapshot of one Parquet file as seen while listing the storage directory.
#[derive(Debug, Clone)]
struct FileInfo {
    /// Path of the file inside the storage directory.
    path: PathBuf,
    /// File length in bytes, taken from filesystem metadata.
    size: u64,
    /// Timestamp used for ordering and for age-based selection.
    created: DateTime<Utc>,
}
94
95impl CompactionManager {
96 pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
98 let storage_dir = storage_dir.into();
99
100 tracing::info!(
101 "✅ Compaction manager initialized at: {}",
102 storage_dir.display()
103 );
104
105 Self {
106 storage_dir,
107 config,
108 stats: Arc::new(RwLock::new(CompactionStats::default())),
109 last_compaction: Arc::new(RwLock::new(None)),
110 }
111 }
112
113 fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
115 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
116 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
117 })?;
118
119 let mut files = Vec::new();
120
121 for entry in entries {
122 let entry = entry.map_err(|e| {
123 AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
124 })?;
125
126 let path = entry.path();
127 if let Some(ext) = path.extension() {
128 if ext == "parquet" {
129 let metadata = entry.metadata().map_err(|e| {
130 AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
131 })?;
132
133 let size = metadata.len();
134 let created = metadata
135 .created()
136 .ok()
137 .and_then(|t| {
138 t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
139 DateTime::from_timestamp(d.as_secs() as i64, 0)
140 .unwrap_or_else(Utc::now)
141 })
142 })
143 .unwrap_or_else(Utc::now);
144
145 files.push(FileInfo {
146 path,
147 size,
148 created,
149 });
150 }
151 }
152 }
153
154 files.sort_by_key(|f| f.created);
156
157 Ok(files)
158 }
159
160 fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
162 match self.config.strategy {
163 CompactionStrategy::SizeBased => self.select_small_files(files),
164 CompactionStrategy::TimeBased => self.select_old_files(files),
165 CompactionStrategy::FullCompaction => files.to_vec(),
166 }
167 }
168
169 fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
171 let small_files: Vec<FileInfo> = files
172 .iter()
173 .filter(|f| f.size < self.config.small_file_threshold as u64)
174 .cloned()
175 .collect();
176
177 if small_files.len() >= self.config.min_files_to_compact {
179 small_files
180 } else {
181 Vec::new()
182 }
183 }
184
185 fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
187 let now = Utc::now();
188 let age_threshold = chrono::Duration::hours(24); let old_files: Vec<FileInfo> = files
191 .iter()
192 .filter(|f| now - f.created > age_threshold)
193 .cloned()
194 .collect();
195
196 if old_files.len() >= self.config.min_files_to_compact {
197 old_files
198 } else {
199 Vec::new()
200 }
201 }
202
203 pub fn should_compact(&self) -> bool {
205 if !self.config.auto_compact {
206 return false;
207 }
208
209 let last = self.last_compaction.read();
210 match *last {
211 None => true, Some(last_time) => {
213 let elapsed = (Utc::now() - last_time).num_seconds();
214 elapsed >= self.config.compaction_interval_seconds as i64
215 }
216 }
217 }
218
219 pub fn compact(&self) -> Result<CompactionResult> {
221 let start_time = std::time::Instant::now();
222 tracing::info!("🔄 Starting Parquet compaction...");
223
224 let files = self.list_parquet_files()?;
226
227 if files.is_empty() {
228 tracing::debug!("No Parquet files to compact");
229 return Ok(CompactionResult {
230 files_compacted: 0,
231 bytes_before: 0,
232 bytes_after: 0,
233 events_compacted: 0,
234 duration_ms: 0,
235 });
236 }
237
238 let files_to_compact = self.select_files_for_compaction(&files);
240
241 if files_to_compact.is_empty() {
242 tracing::debug!(
243 "No files meet compaction criteria (strategy: {:?})",
244 self.config.strategy
245 );
246 return Ok(CompactionResult {
247 files_compacted: 0,
248 bytes_before: 0,
249 bytes_after: 0,
250 events_compacted: 0,
251 duration_ms: 0,
252 });
253 }
254
255 let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
256
257 tracing::info!(
258 "Compacting {} files ({:.2} MB)",
259 files_to_compact.len(),
260 bytes_before as f64 / (1024.0 * 1024.0)
261 );
262
263 let mut all_events = Vec::new();
265 for file_info in &files_to_compact {
266 match self.read_parquet_file(&file_info.path) {
267 Ok(mut events) => {
268 all_events.append(&mut events);
269 }
270 Err(e) => {
271 tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
272 }
274 }
275 }
276
277 if all_events.is_empty() {
278 tracing::warn!("No events read from files to compact");
279 return Ok(CompactionResult {
280 files_compacted: 0,
281 bytes_before,
282 bytes_after: 0,
283 events_compacted: 0,
284 duration_ms: start_time.elapsed().as_millis() as u64,
285 });
286 }
287
288 all_events.sort_by_key(|e| e.timestamp);
290
291 tracing::debug!("Read {} events for compaction", all_events.len());
292
293 let compacted_files = self.write_compacted_files(&all_events)?;
295
296 let bytes_after: u64 = compacted_files
297 .iter()
298 .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
299 .sum();
300
301 for file_info in &files_to_compact {
303 if let Err(e) = fs::remove_file(&file_info.path) {
304 tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
305 } else {
306 tracing::debug!("Removed old file: {:?}", file_info.path);
307 }
308 }
309
310 let duration_ms = start_time.elapsed().as_millis() as u64;
311
312 let mut stats = self.stats.write();
314 stats.total_compactions += 1;
315 stats.total_files_compacted += files_to_compact.len() as u64;
316 stats.total_bytes_before += bytes_before;
317 stats.total_bytes_after += bytes_after;
318 stats.total_events_compacted += all_events.len() as u64;
319 stats.last_compaction_duration_ms = duration_ms;
320 stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
321 drop(stats);
322
323 *self.last_compaction.write() = Some(Utc::now());
325
326 let compression_ratio = if bytes_before > 0 {
327 (bytes_after as f64 / bytes_before as f64) * 100.0
328 } else {
329 100.0
330 };
331
332 tracing::info!(
333 "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
334 files_to_compact.len(),
335 compacted_files.len(),
336 bytes_before as f64 / (1024.0 * 1024.0),
337 bytes_after as f64 / (1024.0 * 1024.0),
338 compression_ratio,
339 all_events.len(),
340 duration_ms
341 );
342
343 Ok(CompactionResult {
344 files_compacted: files_to_compact.len(),
345 bytes_before,
346 bytes_after,
347 events_compacted: all_events.len(),
348 duration_ms,
349 })
350 }
351
    /// Loads the events for one source file during compaction.
    ///
    /// NOTE(review): `path` is never used. The body opens a `ParquetStorage`
    /// on the whole storage directory and returns `load_all_events()`. If that
    /// call yields every event in the directory (TODO confirm against
    /// `ParquetStorage`), callers that invoke this once per file receive the
    /// same events repeatedly and will write duplicates.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ParquetStorage::new` or `load_all_events`.
    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
        let storage = ParquetStorage::new(&self.storage_dir)?;

        let all_events = storage.load_all_events()?;

        Ok(all_events)
    }
363
364 fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
366 let mut compacted_files = Vec::new();
367 let mut current_batch = Vec::new();
368 let mut current_size = 0;
369
370 for event in events {
371 let event_size = serde_json::to_string(event)
373 .map(|s| s.len())
374 .unwrap_or(1024);
375
376 if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
378 {
379 let file_path = self.write_batch(¤t_batch)?;
381 compacted_files.push(file_path);
382
383 current_batch.clear();
385 current_size = 0;
386 }
387
388 current_batch.push(event.clone());
389 current_size += event_size;
390
391 if current_size >= self.config.max_file_size {
393 let file_path = self.write_batch(¤t_batch)?;
394 compacted_files.push(file_path);
395
396 current_batch.clear();
397 current_size = 0;
398 }
399 }
400
401 if !current_batch.is_empty() {
403 let file_path = self.write_batch(¤t_batch)?;
404 compacted_files.push(file_path);
405 }
406
407 Ok(compacted_files)
408 }
409
    /// Writes one batch of events through a fresh `ParquetStorage` and returns
    /// a timestamp-based path for the batch.
    ///
    /// NOTE(review): the generated `file_path` is only logged and returned; it
    /// is never handed to `ParquetStorage`. Whether `flush()` creates a file at
    /// exactly this path cannot be determined from this file — TODO confirm.
    /// If it does not, the returned path will not exist on disk.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ParquetStorage::new`, `append_event` or
    /// `flush`.
    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
        let storage = ParquetStorage::new(&self.storage_dir)?;

        // `%f` adds the fractional second in nanoseconds to keep names distinct
        // for batches written within the same second.
        let filename = format!(
            "events-compacted-{}.parquet",
            Utc::now().format("%Y%m%d-%H%M%S-%f")
        );
        let file_path = self.storage_dir.join(filename);

        for event in events {
            storage.append_event(event.clone())?;
        }

        storage.flush()?;

        tracing::debug!(
            "Wrote compacted file: {:?} ({} events)",
            file_path,
            events.len()
        );

        Ok(file_path)
    }
437
    /// Returns a snapshot (clone) of the cumulative compaction statistics.
    pub fn stats(&self) -> CompactionStats {
        (*self.stats.read()).clone()
    }
442
    /// Returns the configuration this manager was created with.
    pub fn config(&self) -> &CompactionConfig {
        &self.config
    }
447
    /// Runs a compaction immediately, without consulting `should_compact`.
    ///
    /// Logs that the run was triggered manually and then delegates to
    /// `compact`, returning its result unchanged.
    pub fn compact_now(&self) -> Result<CompactionResult> {
        tracing::info!("Manual compaction triggered");
        self.compact()
    }
453}
454
/// Outcome of a single `CompactionManager::compact` call.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionResult {
    /// Number of source files compacted; `0` when nothing was done.
    pub files_compacted: usize,
    /// Total size in bytes of the source files considered by the run.
    pub bytes_before: u64,
    /// Total size in bytes of the compacted output files.
    pub bytes_after: u64,
    /// Number of events written to the compacted output.
    pub events_compacted: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
}
464
/// Periodic driver that checks a shared `CompactionManager` at a fixed
/// interval and runs compaction when it is due.
pub struct CompactionTask {
    /// Manager whose `should_compact` / `compact` are called on every tick.
    manager: Arc<CompactionManager>,
    /// Time between two consecutive checks.
    interval: Duration,
}
470
471impl CompactionTask {
    /// Creates a task that checks `manager` every `interval_seconds` seconds
    /// once `run` is awaited.
    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
        Self {
            manager,
            interval: Duration::from_secs(interval_seconds),
        }
    }
479
480 pub async fn run(self) {
482 let mut interval = tokio::time::interval(self.interval);
483
484 loop {
485 interval.tick().await;
486
487 if self.manager.should_compact() {
488 tracing::debug!("Auto-compaction check triggered");
489
490 match self.manager.compact() {
491 Ok(result) => {
492 if result.files_compacted > 0 {
493 tracing::info!(
494 "Auto-compaction succeeded: {} files, {:.2} MB saved",
495 result.files_compacted,
496 (result.bytes_before - result.bytes_after) as f64
497 / (1024.0 * 1024.0)
498 );
499 }
500 }
501 Err(e) => {
502 tracing::error!("Auto-compaction failed: {}", e);
503 }
504 }
505 }
506 }
507 }
508}
509
// Unit tests for configuration defaults, file selection, serialization of the
// public types, and the behavior of the manager on an empty directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // A freshly created manager reports zero compactions.
    #[test]
    fn test_compaction_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        assert_eq!(manager.stats().total_compactions, 0);
    }

    // With auto-compaction on and no previous run, compaction is due.
    #[test]
    fn test_should_compact() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        assert!(manager.should_compact());
    }

    // Size-based selection keeps only files below the 1 MiB threshold.
    #[test]
    fn test_file_selection_size_based() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let files = vec![
            FileInfo {
                path: PathBuf::from("small1.parquet"),
                size: 500_000,
                created: Utc::now(),
            },
            FileInfo {
                path: PathBuf::from("small2.parquet"),
                size: 600_000,
                created: Utc::now(),
            },
            FileInfo {
                path: PathBuf::from("large.parquet"),
                size: 10_000_000,
                created: Utc::now(),
            },
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }

    // Pins every default value of `CompactionConfig`.
    #[test]
    fn test_default_compaction_config() {
        let config = CompactionConfig::default();
        assert_eq!(config.min_files_to_compact, 3);
        assert_eq!(config.target_file_size, 128 * 1024 * 1024);
        assert_eq!(config.max_file_size, 256 * 1024 * 1024);
        assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(config.compaction_interval_seconds, 3600);
        assert!(config.auto_compact);
        assert_eq!(config.strategy, CompactionStrategy::SizeBased);
    }

    // Disabling auto-compaction makes `should_compact` return false.
    #[test]
    fn test_should_compact_disabled() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            auto_compact: false,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        assert!(!manager.should_compact());
    }

    // Compacting an empty directory succeeds with an all-zero result.
    #[test]
    fn test_compact_empty_directory() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        let result = manager.compact().unwrap();
        assert_eq!(result.files_compacted, 0);
        assert_eq!(result.bytes_before, 0);
        assert_eq!(result.bytes_after, 0);
        assert_eq!(result.events_compacted, 0);
    }

    // Manual compaction on an empty directory compacts nothing.
    #[test]
    fn test_compact_now() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        let result = manager.compact_now().unwrap();
        assert_eq!(result.files_compacted, 0);
    }

    // `config()` exposes the configuration passed to `new`.
    #[test]
    fn test_get_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        assert_eq!(manager.config().min_files_to_compact, 5);
    }

    // All statistics counters start at zero.
    #[test]
    fn test_get_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        let stats = manager.stats();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
        assert_eq!(stats.total_bytes_before, 0);
        assert_eq!(stats.total_bytes_after, 0);
        assert_eq!(stats.total_events_compacted, 0);
        assert_eq!(stats.last_compaction_duration_ms, 0);
        assert_eq!(stats.space_saved_bytes, 0);
    }

    // Two small files are below the minimum of three, so nothing is selected.
    #[test]
    fn test_file_selection_not_enough_small_files() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let files = vec![
            FileInfo {
                path: PathBuf::from("small1.parquet"),
                size: 500_000,
                created: Utc::now(),
            },
            FileInfo {
                path: PathBuf::from("small2.parquet"),
                size: 600_000,
                created: Utc::now(),
            },
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 0);
    }

    // Time-based selection keeps the two 48-hour-old files and skips the new one.
    #[test]
    fn test_file_selection_time_based() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let old_time = Utc::now() - chrono::Duration::hours(48);
        let files = vec![
            FileInfo {
                path: PathBuf::from("old1.parquet"),
                size: 1_000_000,
                created: old_time,
            },
            FileInfo {
                path: PathBuf::from("old2.parquet"),
                size: 2_000_000,
                created: old_time,
            },
            FileInfo {
                path: PathBuf::from("new.parquet"),
                size: 500_000,
                created: Utc::now(),
            },
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }

    // A single old file is below the minimum of three, so nothing is selected.
    #[test]
    fn test_file_selection_time_based_not_enough() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let old_time = Utc::now() - chrono::Duration::hours(48);
        let files = vec![
            FileInfo {
                path: PathBuf::from("old1.parquet"),
                size: 1_000_000,
                created: old_time,
            },
            FileInfo {
                path: PathBuf::from("new.parquet"),
                size: 500_000,
                created: Utc::now(),
            },
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 0);
    }

    // Full compaction selects every file, ignoring the minimum count.
    #[test]
    fn test_file_selection_full_compaction() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let files = vec![
            FileInfo {
                path: PathBuf::from("file1.parquet"),
                size: 1_000_000,
                created: Utc::now(),
            },
            FileInfo {
                path: PathBuf::from("file2.parquet"),
                size: 2_000_000,
                created: Utc::now(),
            },
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }

    // Every strategy survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_compaction_strategy_serde() {
        let strategies = vec![
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ];

        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, strategy);
        }
    }

    // Default statistics are zeroed.
    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
    }

    // Statistics serialize to JSON with their field names as keys.
    #[test]
    fn test_compaction_stats_serde() {
        let stats = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1000000,
            total_bytes_after: 500000,
            total_events_compacted: 10000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500000,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_compactions\":5"));
        assert!(json.contains("\"space_saved_bytes\":500000"));
    }

    // Compaction results serialize to JSON with their field names as keys.
    #[test]
    fn test_compaction_result_serde() {
        let result = CompactionResult {
            files_compacted: 3,
            bytes_before: 1000000,
            bytes_after: 500000,
            events_compacted: 5000,
            duration_ms: 250,
        };

        let json = serde_json::to_string(&result).unwrap();
        assert!(json.contains("\"files_compacted\":3"));
        assert!(json.contains("\"bytes_before\":1000000"));
    }

    // Constructing a task from a shared manager does not panic.
    #[test]
    fn test_compaction_task_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = Arc::new(CompactionManager::new(temp_dir.path(), config));

        let _task = CompactionTask::new(manager.clone(), 60);
    }

    // Listing an empty directory yields no files.
    #[test]
    fn test_list_parquet_files_empty() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        let files = manager.list_parquet_files().unwrap();
        assert!(files.is_empty());
    }

    // Files without a `.parquet` extension are ignored by the listing.
    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        std::fs::write(temp_dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(temp_dir.path().join("data.json"), "{}").unwrap();

        let files = manager.list_parquet_files().unwrap();
        assert!(files.is_empty());
    }
}