1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4 infrastructure::persistence::storage::ParquetStorage,
5};
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::{
10 fs,
11 path::{Path, PathBuf},
12 sync::Arc,
13 time::Duration,
14};
15
/// Merges Parquet files found in a storage directory into fewer files and
/// keeps running statistics about the compactions it has performed.
pub struct CompactionManager {
    /// Directory that is scanned for `.parquet` files and handed to
    /// `ParquetStorage` when reading and writing events.
    storage_dir: PathBuf,

    /// Thresholds, scheduling interval and file-selection strategy.
    config: CompactionConfig,

    /// Cumulative statistics, updated at the end of each successful compaction.
    stats: Arc<RwLock<CompactionStats>>,

    /// Time the last compaction finished; `None` until one has completed.
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
30
/// Tunable parameters for [`CompactionManager`].
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of candidate files the size-based and time-based
    /// strategies need before they select anything.
    pub min_files_to_compact: usize,

    /// Estimated batch size in bytes beyond which a new output file is started.
    pub target_file_size: usize,

    /// Estimated batch size in bytes at which the current batch is flushed
    /// immediately.
    pub max_file_size: usize,

    /// Files strictly smaller than this many bytes count as "small" for the
    /// size-based strategy.
    pub small_file_threshold: usize,

    /// Minimum number of seconds between two automatic compactions.
    pub compaction_interval_seconds: u64,

    /// When `false`, `should_compact` always returns `false`.
    pub auto_compact: bool,

    /// How candidate files are chosen.
    pub strategy: CompactionStrategy,
}
54
55impl Default for CompactionConfig {
56 fn default() -> Self {
57 Self {
58 min_files_to_compact: 3,
59 target_file_size: 128 * 1024 * 1024, max_file_size: 256 * 1024 * 1024, small_file_threshold: 10 * 1024 * 1024, compaction_interval_seconds: 3600, auto_compact: true,
64 strategy: CompactionStrategy::SizeBased,
65 }
66 }
67}
68
/// Rule used to pick which Parquet files take part in a compaction.
///
/// Serialized with serde's `lowercase` renaming, i.e. the variant name in
/// lower case.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Select files smaller than `CompactionConfig::small_file_threshold`.
    SizeBased,
    /// Select files created more than 24 hours ago.
    TimeBased,
    /// Select every Parquet file in the storage directory.
    FullCompaction,
}
79
/// Running totals over all compactions performed by one `CompactionManager`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    /// Number of compactions that ran to completion.
    pub total_compactions: u64,
    /// Sum of the input-file counts of all compactions.
    pub total_files_compacted: u64,
    /// Sum of the input sizes in bytes.
    pub total_bytes_before: u64,
    /// Sum of the output sizes in bytes.
    pub total_bytes_after: u64,
    /// Sum of the number of events rewritten.
    pub total_events_compacted: u64,
    /// Wall-clock duration of the most recent compaction (overwritten each run).
    pub last_compaction_duration_ms: u64,
    /// Accumulated `bytes_before - bytes_after`, saturating at zero per run.
    pub space_saved_bytes: u64,
}
90
/// Metadata about one Parquet file that is a candidate for compaction.
#[derive(Debug, Clone)]
struct FileInfo {
    /// Location of the file.
    path: PathBuf,
    /// File size in bytes.
    size: u64,
    /// Creation time used for ordering and for the time-based strategy.
    created: DateTime<Utc>,
}
98
99impl CompactionManager {
100 pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
102 let storage_dir = storage_dir.into();
103
104 tracing::info!(
105 "✅ Compaction manager initialized at: {}",
106 storage_dir.display()
107 );
108
109 Self {
110 storage_dir,
111 config,
112 stats: Arc::new(RwLock::new(CompactionStats::default())),
113 last_compaction: Arc::new(RwLock::new(None)),
114 }
115 }
116
117 fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
119 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
120 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
121 })?;
122
123 let mut files = Vec::new();
124
125 for entry in entries {
126 let entry = entry.map_err(|e| {
127 AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
128 })?;
129
130 let path = entry.path();
131 if let Some(ext) = path.extension()
132 && ext == "parquet"
133 {
134 let metadata = entry.metadata().map_err(|e| {
135 AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
136 })?;
137
138 let size = metadata.len();
139 let created = metadata
140 .created()
141 .ok()
142 .and_then(|t| {
143 t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
144 DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
145 })
146 })
147 .unwrap_or_else(Utc::now);
148
149 files.push(FileInfo {
150 path,
151 size,
152 created,
153 });
154 }
155 }
156
157 files.sort_by_key(|f| f.created);
159
160 Ok(files)
161 }
162
163 fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
165 match self.config.strategy {
166 CompactionStrategy::SizeBased => self.select_small_files(files),
167 CompactionStrategy::TimeBased => self.select_old_files(files),
168 CompactionStrategy::FullCompaction => files.to_vec(),
169 }
170 }
171
172 fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
174 let small_files: Vec<FileInfo> = files
175 .iter()
176 .filter(|f| f.size < self.config.small_file_threshold as u64)
177 .cloned()
178 .collect();
179
180 if small_files.len() >= self.config.min_files_to_compact {
182 small_files
183 } else {
184 Vec::new()
185 }
186 }
187
188 fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
190 let now = Utc::now();
191 let age_threshold = chrono::Duration::hours(24); let old_files: Vec<FileInfo> = files
194 .iter()
195 .filter(|f| now - f.created > age_threshold)
196 .cloned()
197 .collect();
198
199 if old_files.len() >= self.config.min_files_to_compact {
200 old_files
201 } else {
202 Vec::new()
203 }
204 }
205
206 #[cfg_attr(feature = "hotpath", hotpath::measure)]
208 pub fn should_compact(&self) -> bool {
209 if !self.config.auto_compact {
210 return false;
211 }
212
213 let last = self.last_compaction.read();
214 match *last {
215 None => true, Some(last_time) => {
217 let elapsed = (Utc::now() - last_time).num_seconds();
218 elapsed >= self.config.compaction_interval_seconds as i64
219 }
220 }
221 }
222
223 #[cfg_attr(feature = "hotpath", hotpath::measure)]
225 pub fn compact(&self) -> Result<CompactionResult> {
226 let start_time = std::time::Instant::now();
227 tracing::info!("🔄 Starting Parquet compaction...");
228
229 let files = self.list_parquet_files()?;
231
232 if files.is_empty() {
233 tracing::debug!("No Parquet files to compact");
234 return Ok(CompactionResult {
235 files_compacted: 0,
236 bytes_before: 0,
237 bytes_after: 0,
238 events_compacted: 0,
239 duration_ms: 0,
240 });
241 }
242
243 let files_to_compact = self.select_files_for_compaction(&files);
245
246 if files_to_compact.is_empty() {
247 tracing::debug!(
248 "No files meet compaction criteria (strategy: {:?})",
249 self.config.strategy
250 );
251 return Ok(CompactionResult {
252 files_compacted: 0,
253 bytes_before: 0,
254 bytes_after: 0,
255 events_compacted: 0,
256 duration_ms: 0,
257 });
258 }
259
260 let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
261
262 tracing::info!(
263 "Compacting {} files ({:.2} MB)",
264 files_to_compact.len(),
265 bytes_before as f64 / (1024.0 * 1024.0)
266 );
267
268 let mut all_events = Vec::new();
270 for file_info in &files_to_compact {
271 match self.read_parquet_file(&file_info.path) {
272 Ok(mut events) => {
273 all_events.append(&mut events);
274 }
275 Err(e) => {
276 tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
277 }
279 }
280 }
281
282 if all_events.is_empty() {
283 tracing::warn!("No events read from files to compact");
284 return Ok(CompactionResult {
285 files_compacted: 0,
286 bytes_before,
287 bytes_after: 0,
288 events_compacted: 0,
289 duration_ms: start_time.elapsed().as_millis() as u64,
290 });
291 }
292
293 all_events.sort_by_key(|e| e.timestamp);
295
296 tracing::debug!("Read {} events for compaction", all_events.len());
297
298 let compacted_files = self.write_compacted_files(&all_events)?;
300
301 let bytes_after: u64 = compacted_files
302 .iter()
303 .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
304 .sum();
305
306 for file_info in &files_to_compact {
308 if let Err(e) = fs::remove_file(&file_info.path) {
309 tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
310 } else {
311 tracing::debug!("Removed old file: {:?}", file_info.path);
312 }
313 }
314
315 let duration_ms = start_time.elapsed().as_millis() as u64;
316
317 let mut stats = self.stats.write();
319 stats.total_compactions += 1;
320 stats.total_files_compacted += files_to_compact.len() as u64;
321 stats.total_bytes_before += bytes_before;
322 stats.total_bytes_after += bytes_after;
323 stats.total_events_compacted += all_events.len() as u64;
324 stats.last_compaction_duration_ms = duration_ms;
325 stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
326 drop(stats);
327
328 *self.last_compaction.write() = Some(Utc::now());
330
331 let compression_ratio = if bytes_before > 0 {
332 (bytes_after as f64 / bytes_before as f64) * 100.0
333 } else {
334 100.0
335 };
336
337 tracing::info!(
338 "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
339 files_to_compact.len(),
340 compacted_files.len(),
341 bytes_before as f64 / (1024.0 * 1024.0),
342 bytes_after as f64 / (1024.0 * 1024.0),
343 compression_ratio,
344 all_events.len(),
345 duration_ms
346 );
347
348 Ok(CompactionResult {
349 files_compacted: files_to_compact.len(),
350 bytes_before,
351 bytes_after,
352 events_compacted: all_events.len(),
353 duration_ms,
354 })
355 }
356
357 fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
359 let storage = ParquetStorage::new(&self.storage_dir)?;
361
362 let all_events = storage.load_all_events()?;
365
366 Ok(all_events)
367 }
368
369 fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
371 let mut compacted_files = Vec::new();
372 let mut current_batch = Vec::new();
373 let mut current_size = 0;
374
375 for event in events {
376 let event_size = serde_json::to_string(event)
378 .map(|s| s.len())
379 .unwrap_or(1024);
380
381 if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
383 {
384 let file_path = self.write_batch(¤t_batch)?;
386 compacted_files.push(file_path);
387
388 current_batch.clear();
390 current_size = 0;
391 }
392
393 current_batch.push(event.clone());
394 current_size += event_size;
395
396 if current_size >= self.config.max_file_size {
398 let file_path = self.write_batch(¤t_batch)?;
399 compacted_files.push(file_path);
400
401 current_batch.clear();
402 current_size = 0;
403 }
404 }
405
406 if !current_batch.is_empty() {
408 let file_path = self.write_batch(¤t_batch)?;
409 compacted_files.push(file_path);
410 }
411
412 Ok(compacted_files)
413 }
414
415 fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
417 let storage = ParquetStorage::new(&self.storage_dir)?;
418
419 let filename = format!(
421 "events-compacted-{}.parquet",
422 Utc::now().format("%Y%m%d-%H%M%S-%f")
423 );
424 let file_path = self.storage_dir.join(filename);
425
426 for event in events {
428 storage.append_event(event.clone())?;
429 }
430
431 storage.flush()?;
433
434 tracing::debug!(
435 "Wrote compacted file: {:?} ({} events)",
436 file_path,
437 events.len()
438 );
439
440 Ok(file_path)
441 }
442
443 pub fn stats(&self) -> CompactionStats {
445 (*self.stats.read()).clone()
446 }
447
448 pub fn config(&self) -> &CompactionConfig {
450 &self.config
451 }
452
453 #[cfg_attr(feature = "hotpath", hotpath::measure)]
455 pub fn compact_now(&self) -> Result<CompactionResult> {
456 tracing::info!("Manual compaction triggered");
457 self.compact()
458 }
459}
460
/// Outcome of a single call to `CompactionManager::compact`.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionResult {
    /// Number of input files merged (0 when nothing was compacted).
    pub files_compacted: usize,
    /// Total size in bytes of the input files.
    pub bytes_before: u64,
    /// Total size in bytes of the compacted output files.
    pub bytes_after: u64,
    /// Number of events rewritten.
    pub events_compacted: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
}
470
/// Background task that periodically asks a `CompactionManager` whether a
/// compaction is due and runs it.
pub struct CompactionTask {
    /// Manager whose `should_compact` / `compact` are invoked.
    manager: Arc<CompactionManager>,
    /// Time between two checks.
    interval: Duration,
}
476
477impl CompactionTask {
478 pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
480 Self {
481 manager,
482 interval: Duration::from_secs(interval_seconds),
483 }
484 }
485
486 #[cfg_attr(feature = "hotpath", hotpath::measure)]
488 pub async fn run(self) {
489 let mut interval = tokio::time::interval(self.interval);
490
491 loop {
492 interval.tick().await;
493
494 if self.manager.should_compact() {
495 tracing::debug!("Auto-compaction check triggered");
496
497 match self.manager.compact() {
498 Ok(result) => {
499 if result.files_compacted > 0 {
500 tracing::info!(
501 "Auto-compaction succeeded: {} files, {:.2} MB saved",
502 result.files_compacted,
503 (result.bytes_before - result.bytes_after) as f64
504 / (1024.0 * 1024.0)
505 );
506 }
507 }
508 Err(e) => {
509 tracing::error!("Auto-compaction failed: {}", e);
510 }
511 }
512 }
513 }
514 }
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory. The `TempDir`
    /// is handed back so the directory stays alive for the whole test.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    /// Shorthand for an in-memory `FileInfo`; no file is created on disk.
    fn file_info(name: &str, size: u64, created: DateTime<Utc>) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created,
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        assert_eq!(manager.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        // No compaction has run yet, so one is due immediately.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file_info("small1.parquet", 500_000, Utc::now()),
            file_info("small2.parquet", 600_000, Utc::now()),
            file_info("large.parquet", 10_000_000, Utc::now()),
        ];

        // Only the two files under the 1 MiB threshold qualify.
        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }

    #[test]
    fn test_default_compaction_config() {
        let config = CompactionConfig::default();
        assert_eq!(config.min_files_to_compact, 3);
        assert_eq!(config.target_file_size, 128 * 1024 * 1024);
        assert_eq!(config.max_file_size, 256 * 1024 * 1024);
        assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(config.compaction_interval_seconds, 3600);
        assert!(config.auto_compact);
        assert_eq!(config.strategy, CompactionStrategy::SizeBased);
    }

    #[test]
    fn test_should_compact_disabled() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: false,
            ..Default::default()
        });

        assert!(!manager.should_compact());
    }

    #[test]
    fn test_compact_empty_directory() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let result = manager.compact().unwrap();
        assert_eq!(result.files_compacted, 0);
        assert_eq!(result.bytes_before, 0);
        assert_eq!(result.bytes_after, 0);
        assert_eq!(result.events_compacted, 0);
    }

    #[test]
    fn test_compact_now() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let result = manager.compact_now().unwrap();
        assert_eq!(result.files_compacted, 0);
    }

    #[test]
    fn test_get_config() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        });

        assert_eq!(manager.config().min_files_to_compact, 5);
    }

    #[test]
    fn test_get_stats() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let stats = manager.stats();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
        assert_eq!(stats.total_bytes_before, 0);
        assert_eq!(stats.total_bytes_after, 0);
        assert_eq!(stats.total_events_compacted, 0);
        assert_eq!(stats.last_compaction_duration_ms, 0);
        assert_eq!(stats.space_saved_bytes, 0);
    }

    #[test]
    fn test_file_selection_not_enough_small_files() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file_info("small1.parquet", 500_000, Utc::now()),
            file_info("small2.parquet", 600_000, Utc::now()),
        ];

        // Two small files are below the required minimum of three.
        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 0);
    }

    #[test]
    fn test_file_selection_time_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });

        let old_time = Utc::now() - chrono::Duration::hours(48);
        let files = vec![
            file_info("old1.parquet", 1_000_000, old_time),
            file_info("old2.parquet", 2_000_000, old_time),
            file_info("new.parquet", 500_000, Utc::now()),
        ];

        // Only the two 48-hour-old files exceed the 24-hour age limit.
        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }

    #[test]
    fn test_file_selection_time_based_not_enough() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });

        let old_time = Utc::now() - chrono::Duration::hours(48);
        let files = vec![
            file_info("old1.parquet", 1_000_000, old_time),
            file_info("new.parquet", 500_000, Utc::now()),
        ];

        // A single old file is below the required minimum of three.
        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 0);
    }

    #[test]
    fn test_file_selection_full_compaction() {
        let (_dir, manager) = manager_with(CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        });

        let files = vec![
            file_info("file1.parquet", 1_000_000, Utc::now()),
            file_info("file2.parquet", 2_000_000, Utc::now()),
        ];

        // Full compaction takes every file regardless of size or age.
        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }

    #[test]
    fn test_compaction_strategy_serde() {
        let strategies = [
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ];

        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, strategy);
        }
    }

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
    }

    #[test]
    fn test_compaction_stats_serde() {
        let stats = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1000000,
            total_bytes_after: 500000,
            total_events_compacted: 10000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500000,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_compactions\":5"));
        assert!(json.contains("\"space_saved_bytes\":500000"));
    }

    #[test]
    fn test_compaction_result_serde() {
        let result = CompactionResult {
            files_compacted: 3,
            bytes_before: 1000000,
            bytes_after: 500000,
            events_compacted: 5000,
            duration_ms: 250,
        };

        let json = serde_json::to_string(&result).unwrap();
        assert!(json.contains("\"files_compacted\":3"));
        assert!(json.contains("\"bytes_before\":1000000"));
    }

    #[test]
    fn test_compaction_task_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        let manager = Arc::new(manager);

        // Construction alone must not panic or start any work.
        let _task = CompactionTask::new(manager.clone(), 60);
    }

    #[test]
    fn test_list_parquet_files_empty() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let files = manager.list_parquet_files().unwrap();
        assert!(files.is_empty());
    }

    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let (dir, manager) = manager_with(CompactionConfig::default());

        std::fs::write(dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        // Files without the `.parquet` extension are ignored.
        let files = manager.list_parquet_files().unwrap();
        assert!(files.is_empty());
    }
}