1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4 infrastructure::persistence::storage::ParquetStorage,
5};
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::{
10 fs,
11 path::{Path, PathBuf},
12 sync::Arc,
13 time::Duration,
14};
15
16pub struct CompactionManager {
18 storage_dir: PathBuf,
20
21 config: CompactionConfig,
23
24 stats: Arc<RwLock<CompactionStats>>,
26
27 last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
29}
30
31#[derive(Debug, Clone)]
32pub struct CompactionConfig {
33 pub min_files_to_compact: usize,
35
36 pub target_file_size: usize,
38
39 pub max_file_size: usize,
41
42 pub small_file_threshold: usize,
44
45 pub compaction_interval_seconds: u64,
47
48 pub auto_compact: bool,
50
51 pub strategy: CompactionStrategy,
53}
54
55impl Default for CompactionConfig {
56 fn default() -> Self {
57 Self {
58 min_files_to_compact: 3,
59 target_file_size: 128 * 1024 * 1024, max_file_size: 256 * 1024 * 1024, small_file_threshold: 10 * 1024 * 1024, compaction_interval_seconds: 3600, auto_compact: true,
64 strategy: CompactionStrategy::SizeBased,
65 }
66 }
67}
68
69#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
70#[serde(rename_all = "lowercase")]
71pub enum CompactionStrategy {
72 SizeBased,
74 TimeBased,
76 FullCompaction,
78}
79
80#[derive(Debug, Clone, Default, Serialize)]
81pub struct CompactionStats {
82 pub total_compactions: u64,
83 pub total_files_compacted: u64,
84 pub total_bytes_before: u64,
85 pub total_bytes_after: u64,
86 pub total_events_compacted: u64,
87 pub last_compaction_duration_ms: u64,
88 pub space_saved_bytes: u64,
89}
90
91#[derive(Debug, Clone)]
93struct FileInfo {
94 path: PathBuf,
95 size: u64,
96 created: DateTime<Utc>,
97}
98
99impl CompactionManager {
100 pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
102 let storage_dir = storage_dir.into();
103
104 tracing::info!(
105 "✅ Compaction manager initialized at: {}",
106 storage_dir.display()
107 );
108
109 Self {
110 storage_dir,
111 config,
112 stats: Arc::new(RwLock::new(CompactionStats::default())),
113 last_compaction: Arc::new(RwLock::new(None)),
114 }
115 }
116
117 fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
119 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
120 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
121 })?;
122
123 let mut files = Vec::new();
124
125 for entry in entries {
126 let entry = entry.map_err(|e| {
127 AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
128 })?;
129
130 let path = entry.path();
131 if let Some(ext) = path.extension()
132 && ext == "parquet"
133 {
134 let metadata = entry.metadata().map_err(|e| {
135 AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
136 })?;
137
138 let size = metadata.len();
139 let created = metadata
140 .created()
141 .ok()
142 .and_then(|t| {
143 t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
144 DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
145 })
146 })
147 .unwrap_or_else(Utc::now);
148
149 files.push(FileInfo {
150 path,
151 size,
152 created,
153 });
154 }
155 }
156
157 files.sort_by_key(|f| f.created);
159
160 Ok(files)
161 }
162
163 fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
165 match self.config.strategy {
166 CompactionStrategy::SizeBased => self.select_small_files(files),
167 CompactionStrategy::TimeBased => self.select_old_files(files),
168 CompactionStrategy::FullCompaction => files.to_vec(),
169 }
170 }
171
172 fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
174 let small_files: Vec<FileInfo> = files
175 .iter()
176 .filter(|f| f.size < self.config.small_file_threshold as u64)
177 .cloned()
178 .collect();
179
180 if small_files.len() >= self.config.min_files_to_compact {
182 small_files
183 } else {
184 Vec::new()
185 }
186 }
187
188 fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
190 let now = Utc::now();
191 let age_threshold = chrono::Duration::hours(24); let old_files: Vec<FileInfo> = files
194 .iter()
195 .filter(|f| now - f.created > age_threshold)
196 .cloned()
197 .collect();
198
199 if old_files.len() >= self.config.min_files_to_compact {
200 old_files
201 } else {
202 Vec::new()
203 }
204 }
205
206 pub fn should_compact(&self) -> bool {
208 if !self.config.auto_compact {
209 return false;
210 }
211
212 let last = self.last_compaction.read();
213 match *last {
214 None => true, Some(last_time) => {
216 let elapsed = (Utc::now() - last_time).num_seconds();
217 elapsed >= self.config.compaction_interval_seconds as i64
218 }
219 }
220 }
221
222 pub fn compact(&self) -> Result<CompactionResult> {
224 let start_time = std::time::Instant::now();
225 tracing::info!("🔄 Starting Parquet compaction...");
226
227 let files = self.list_parquet_files()?;
229
230 if files.is_empty() {
231 tracing::debug!("No Parquet files to compact");
232 return Ok(CompactionResult {
233 files_compacted: 0,
234 bytes_before: 0,
235 bytes_after: 0,
236 events_compacted: 0,
237 duration_ms: 0,
238 });
239 }
240
241 let files_to_compact = self.select_files_for_compaction(&files);
243
244 if files_to_compact.is_empty() {
245 tracing::debug!(
246 "No files meet compaction criteria (strategy: {:?})",
247 self.config.strategy
248 );
249 return Ok(CompactionResult {
250 files_compacted: 0,
251 bytes_before: 0,
252 bytes_after: 0,
253 events_compacted: 0,
254 duration_ms: 0,
255 });
256 }
257
258 let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
259
260 tracing::info!(
261 "Compacting {} files ({:.2} MB)",
262 files_to_compact.len(),
263 bytes_before as f64 / (1024.0 * 1024.0)
264 );
265
266 let mut all_events = Vec::new();
268 for file_info in &files_to_compact {
269 match self.read_parquet_file(&file_info.path) {
270 Ok(mut events) => {
271 all_events.append(&mut events);
272 }
273 Err(e) => {
274 tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
275 }
277 }
278 }
279
280 if all_events.is_empty() {
281 tracing::warn!("No events read from files to compact");
282 return Ok(CompactionResult {
283 files_compacted: 0,
284 bytes_before,
285 bytes_after: 0,
286 events_compacted: 0,
287 duration_ms: start_time.elapsed().as_millis() as u64,
288 });
289 }
290
291 all_events.sort_by_key(|e| e.timestamp);
293
294 tracing::debug!("Read {} events for compaction", all_events.len());
295
296 let compacted_files = self.write_compacted_files(&all_events)?;
298
299 let bytes_after: u64 = compacted_files
300 .iter()
301 .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
302 .sum();
303
304 for file_info in &files_to_compact {
306 if let Err(e) = fs::remove_file(&file_info.path) {
307 tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
308 } else {
309 tracing::debug!("Removed old file: {:?}", file_info.path);
310 }
311 }
312
313 let duration_ms = start_time.elapsed().as_millis() as u64;
314
315 let mut stats = self.stats.write();
317 stats.total_compactions += 1;
318 stats.total_files_compacted += files_to_compact.len() as u64;
319 stats.total_bytes_before += bytes_before;
320 stats.total_bytes_after += bytes_after;
321 stats.total_events_compacted += all_events.len() as u64;
322 stats.last_compaction_duration_ms = duration_ms;
323 stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
324 drop(stats);
325
326 *self.last_compaction.write() = Some(Utc::now());
328
329 let compression_ratio = if bytes_before > 0 {
330 (bytes_after as f64 / bytes_before as f64) * 100.0
331 } else {
332 100.0
333 };
334
335 tracing::info!(
336 "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
337 files_to_compact.len(),
338 compacted_files.len(),
339 bytes_before as f64 / (1024.0 * 1024.0),
340 bytes_after as f64 / (1024.0 * 1024.0),
341 compression_ratio,
342 all_events.len(),
343 duration_ms
344 );
345
346 Ok(CompactionResult {
347 files_compacted: files_to_compact.len(),
348 bytes_before,
349 bytes_after,
350 events_compacted: all_events.len(),
351 duration_ms,
352 })
353 }
354
355 fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
357 let storage = ParquetStorage::new(&self.storage_dir)?;
359
360 let all_events = storage.load_all_events()?;
363
364 Ok(all_events)
365 }
366
367 fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
369 let mut compacted_files = Vec::new();
370 let mut current_batch = Vec::new();
371 let mut current_size = 0;
372
373 for event in events {
374 let event_size = serde_json::to_string(event)
376 .map(|s| s.len())
377 .unwrap_or(1024);
378
379 if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
381 {
382 let file_path = self.write_batch(¤t_batch)?;
384 compacted_files.push(file_path);
385
386 current_batch.clear();
388 current_size = 0;
389 }
390
391 current_batch.push(event.clone());
392 current_size += event_size;
393
394 if current_size >= self.config.max_file_size {
396 let file_path = self.write_batch(¤t_batch)?;
397 compacted_files.push(file_path);
398
399 current_batch.clear();
400 current_size = 0;
401 }
402 }
403
404 if !current_batch.is_empty() {
406 let file_path = self.write_batch(¤t_batch)?;
407 compacted_files.push(file_path);
408 }
409
410 Ok(compacted_files)
411 }
412
413 fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
415 let storage = ParquetStorage::new(&self.storage_dir)?;
416
417 let filename = format!(
419 "events-compacted-{}.parquet",
420 Utc::now().format("%Y%m%d-%H%M%S-%f")
421 );
422 let file_path = self.storage_dir.join(filename);
423
424 for event in events {
426 storage.append_event(event.clone())?;
427 }
428
429 storage.flush()?;
431
432 tracing::debug!(
433 "Wrote compacted file: {:?} ({} events)",
434 file_path,
435 events.len()
436 );
437
438 Ok(file_path)
439 }
440
441 pub fn stats(&self) -> CompactionStats {
443 (*self.stats.read()).clone()
444 }
445
446 pub fn config(&self) -> &CompactionConfig {
448 &self.config
449 }
450
451 pub fn compact_now(&self) -> Result<CompactionResult> {
453 tracing::info!("Manual compaction triggered");
454 self.compact()
455 }
456}
457
458#[derive(Debug, Clone, Serialize)]
460pub struct CompactionResult {
461 pub files_compacted: usize,
462 pub bytes_before: u64,
463 pub bytes_after: u64,
464 pub events_compacted: usize,
465 pub duration_ms: u64,
466}
467
468pub struct CompactionTask {
470 manager: Arc<CompactionManager>,
471 interval: Duration,
472}
473
474impl CompactionTask {
475 pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
477 Self {
478 manager,
479 interval: Duration::from_secs(interval_seconds),
480 }
481 }
482
483 pub async fn run(self) {
485 let mut interval = tokio::time::interval(self.interval);
486
487 loop {
488 interval.tick().await;
489
490 if self.manager.should_compact() {
491 tracing::debug!("Auto-compaction check triggered");
492
493 match self.manager.compact() {
494 Ok(result) => {
495 if result.files_compacted > 0 {
496 tracing::info!(
497 "Auto-compaction succeeded: {} files, {:.2} MB saved",
498 result.files_compacted,
499 (result.bytes_before - result.bytes_after) as f64
500 / (1024.0 * 1024.0)
501 );
502 }
503 }
504 Err(e) => {
505 tracing::error!("Auto-compaction failed: {}", e);
506 }
507 }
508 }
509 }
510 }
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned as well. Bind it (e.g. `let (_dir, manager) = ...`) so the
    /// directory is not deleted before the test finishes.
    fn manager_in_temp_dir(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    /// Builds an in-memory `FileInfo`; nothing is written to disk.
    fn file_info(name: &str, size: u64, created: DateTime<Utc>) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created,
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig::default());

        assert_eq!(manager.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        // Nothing has been compacted yet, so the first check is always positive.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            file_info("small1.parquet", 500_000, now),
            file_info("small2.parquet", 600_000, now),
            file_info("large.parquet", 10_000_000, now),
        ];

        // Only the two files under the 1 MiB threshold qualify.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_default_compaction_config() {
        let config = CompactionConfig::default();

        assert_eq!(config.min_files_to_compact, 3);
        assert_eq!(config.target_file_size, 128 * 1024 * 1024);
        assert_eq!(config.max_file_size, 256 * 1024 * 1024);
        assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(config.compaction_interval_seconds, 3600);
        assert!(config.auto_compact);
        assert_eq!(config.strategy, CompactionStrategy::SizeBased);
    }

    #[test]
    fn test_should_compact_disabled() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            auto_compact: false,
            ..Default::default()
        });

        assert!(!manager.should_compact());
    }

    #[test]
    fn test_compact_empty_directory() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig::default());

        let result = manager.compact().unwrap();

        assert_eq!(result.files_compacted, 0);
        assert_eq!(result.bytes_before, 0);
        assert_eq!(result.bytes_after, 0);
        assert_eq!(result.events_compacted, 0);
    }

    #[test]
    fn test_compact_now() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig::default());

        assert_eq!(manager.compact_now().unwrap().files_compacted, 0);
    }

    #[test]
    fn test_get_config() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        });

        assert_eq!(manager.config().min_files_to_compact, 5);
    }

    #[test]
    fn test_get_stats() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig::default());

        let stats = manager.stats();

        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
        assert_eq!(stats.total_bytes_before, 0);
        assert_eq!(stats.total_bytes_after, 0);
        assert_eq!(stats.total_events_compacted, 0);
        assert_eq!(stats.last_compaction_duration_ms, 0);
        assert_eq!(stats.space_saved_bytes, 0);
    }

    #[test]
    fn test_file_selection_not_enough_small_files() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            file_info("small1.parquet", 500_000, now),
            file_info("small2.parquet", 600_000, now),
        ];

        // Two candidates fall short of the required three.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 0);
    }

    #[test]
    fn test_file_selection_time_based() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let two_days_ago = now - chrono::Duration::hours(48);
        let files = [
            file_info("old1.parquet", 1_000_000, two_days_ago),
            file_info("old2.parquet", 2_000_000, two_days_ago),
            file_info("new.parquet", 500_000, now),
        ];

        // Only the two files older than the age threshold qualify.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_file_selection_time_based_not_enough() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let two_days_ago = now - chrono::Duration::hours(48);
        let files = [
            file_info("old1.parquet", 1_000_000, two_days_ago),
            file_info("new.parquet", 500_000, now),
        ];

        // A single old file is below the minimum of three.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 0);
    }

    #[test]
    fn test_file_selection_full_compaction() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            file_info("file1.parquet", 1_000_000, now),
            file_info("file2.parquet", 2_000_000, now),
        ];

        // Full compaction takes everything, regardless of the minimum count.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_compaction_strategy_serde() {
        let all = [
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ];

        for strategy in all {
            let json = serde_json::to_string(&strategy).unwrap();
            let round_tripped: CompactionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(round_tripped, strategy);
        }
    }

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();

        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
    }

    #[test]
    fn test_compaction_stats_serde() {
        let stats = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1_000_000,
            total_bytes_after: 500_000,
            total_events_compacted: 10_000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500_000,
        };

        let json = serde_json::to_string(&stats).unwrap();

        assert!(json.contains("\"total_compactions\":5"));
        assert!(json.contains("\"space_saved_bytes\":500000"));
    }

    #[test]
    fn test_compaction_result_serde() {
        let result = CompactionResult {
            files_compacted: 3,
            bytes_before: 1_000_000,
            bytes_after: 500_000,
            events_compacted: 5_000,
            duration_ms: 250,
        };

        let json = serde_json::to_string(&result).unwrap();

        assert!(json.contains("\"files_compacted\":3"));
        assert!(json.contains("\"bytes_before\":1000000"));
    }

    #[test]
    fn test_compaction_task_creation() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig::default());
        let manager = Arc::new(manager);

        let _task = CompactionTask::new(Arc::clone(&manager), 60);
    }

    #[test]
    fn test_list_parquet_files_empty() {
        let (_dir, manager) = manager_in_temp_dir(CompactionConfig::default());

        assert!(manager.list_parquet_files().unwrap().is_empty());
    }

    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let (dir, manager) = manager_in_temp_dir(CompactionConfig::default());
        std::fs::write(dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        // Neither file has a `.parquet` extension, so nothing is listed.
        assert!(manager.list_parquet_files().unwrap().is_empty());
    }
}