Skip to main content

allsource_core/infrastructure/persistence/
compaction.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4    infrastructure::persistence::storage::ParquetStorage,
5};
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::{
10    fs,
11    path::{Path, PathBuf},
12    sync::Arc,
13    time::Duration,
14};
15
16/// Manages Parquet file compaction for optimal storage and query performance
17pub struct CompactionManager {
18    /// Directory where Parquet files are stored
19    storage_dir: PathBuf,
20
21    /// Configuration
22    config: CompactionConfig,
23
24    /// Statistics
25    stats: Arc<RwLock<CompactionStats>>,
26
27    /// Last compaction time
28    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
29}
30
31#[derive(Debug, Clone)]
32pub struct CompactionConfig {
33    /// Minimum number of files to trigger compaction
34    pub min_files_to_compact: usize,
35
36    /// Target size for compacted files (in bytes)
37    pub target_file_size: usize,
38
39    /// Maximum size for a single compacted file (in bytes)
40    pub max_file_size: usize,
41
42    /// Minimum file size to consider for compaction (small files)
43    pub small_file_threshold: usize,
44
45    /// Time interval between automatic compactions (in seconds)
46    pub compaction_interval_seconds: u64,
47
48    /// Enable automatic background compaction
49    pub auto_compact: bool,
50
51    /// Compaction strategy
52    pub strategy: CompactionStrategy,
53}
54
55impl Default for CompactionConfig {
56    fn default() -> Self {
57        Self {
58            min_files_to_compact: 3,
59            target_file_size: 128 * 1024 * 1024,    // 128 MB
60            max_file_size: 256 * 1024 * 1024,       // 256 MB
61            small_file_threshold: 10 * 1024 * 1024, // 10 MB
62            compaction_interval_seconds: 3600,      // 1 hour
63            auto_compact: true,
64            strategy: CompactionStrategy::SizeBased,
65        }
66    }
67}
68
69#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
70#[serde(rename_all = "lowercase")]
71pub enum CompactionStrategy {
72    /// Compact based on file size (default)
73    SizeBased,
74    /// Compact based on file age
75    TimeBased,
76    /// Compact all files into one
77    FullCompaction,
78}
79
80#[derive(Debug, Clone, Default, Serialize)]
81pub struct CompactionStats {
82    pub total_compactions: u64,
83    pub total_files_compacted: u64,
84    pub total_bytes_before: u64,
85    pub total_bytes_after: u64,
86    pub total_events_compacted: u64,
87    pub last_compaction_duration_ms: u64,
88    pub space_saved_bytes: u64,
89}
90
91/// Information about a Parquet file candidate for compaction
92#[derive(Debug, Clone)]
93struct FileInfo {
94    path: PathBuf,
95    size: u64,
96    created: DateTime<Utc>,
97}
98
99impl CompactionManager {
100    /// Create a new compaction manager
101    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
102        let storage_dir = storage_dir.into();
103
104        tracing::info!(
105            "✅ Compaction manager initialized at: {}",
106            storage_dir.display()
107        );
108
109        Self {
110            storage_dir,
111            config,
112            stats: Arc::new(RwLock::new(CompactionStats::default())),
113            last_compaction: Arc::new(RwLock::new(None)),
114        }
115    }
116
117    /// List all Parquet files in the storage directory
118    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
119        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
120            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
121        })?;
122
123        let mut files = Vec::new();
124
125        for entry in entries {
126            let entry = entry.map_err(|e| {
127                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
128            })?;
129
130            let path = entry.path();
131            if let Some(ext) = path.extension()
132                && ext == "parquet"
133            {
134                let metadata = entry.metadata().map_err(|e| {
135                    AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
136                })?;
137
138                let size = metadata.len();
139                let created = metadata
140                    .created()
141                    .ok()
142                    .and_then(|t| {
143                        t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
144                            DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
145                        })
146                    })
147                    .unwrap_or_else(Utc::now);
148
149                files.push(FileInfo {
150                    path,
151                    size,
152                    created,
153                });
154            }
155        }
156
157        // Sort by creation time (oldest first)
158        files.sort_by_key(|f| f.created);
159
160        Ok(files)
161    }
162
163    /// Identify files that should be compacted based on strategy
164    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
165        match self.config.strategy {
166            CompactionStrategy::SizeBased => self.select_small_files(files),
167            CompactionStrategy::TimeBased => self.select_old_files(files),
168            CompactionStrategy::FullCompaction => files.to_vec(),
169        }
170    }
171
172    /// Select small files for compaction
173    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
174        let small_files: Vec<FileInfo> = files
175            .iter()
176            .filter(|f| f.size < self.config.small_file_threshold as u64)
177            .cloned()
178            .collect();
179
180        // Only compact if we have enough small files
181        if small_files.len() >= self.config.min_files_to_compact {
182            small_files
183        } else {
184            Vec::new()
185        }
186    }
187
188    /// Select old files for time-based compaction
189    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
190        let now = Utc::now();
191        let age_threshold = chrono::Duration::hours(24); // Files older than 24 hours
192
193        let old_files: Vec<FileInfo> = files
194            .iter()
195            .filter(|f| now - f.created > age_threshold)
196            .cloned()
197            .collect();
198
199        if old_files.len() >= self.config.min_files_to_compact {
200            old_files
201        } else {
202            Vec::new()
203        }
204    }
205
206    /// Check if compaction should run
207    #[cfg_attr(feature = "hotpath", hotpath::measure)]
208    pub fn should_compact(&self) -> bool {
209        if !self.config.auto_compact {
210            return false;
211        }
212
213        let last = self.last_compaction.read();
214        match *last {
215            None => true, // Never compacted
216            Some(last_time) => {
217                let elapsed = (Utc::now() - last_time).num_seconds();
218                elapsed >= self.config.compaction_interval_seconds as i64
219            }
220        }
221    }
222
223    /// Perform compaction of Parquet files
224    #[cfg_attr(feature = "hotpath", hotpath::measure)]
225    pub fn compact(&self) -> Result<CompactionResult> {
226        let start_time = std::time::Instant::now();
227        tracing::info!("🔄 Starting Parquet compaction...");
228
229        // List all Parquet files
230        let files = self.list_parquet_files()?;
231
232        if files.is_empty() {
233            tracing::debug!("No Parquet files to compact");
234            return Ok(CompactionResult {
235                files_compacted: 0,
236                bytes_before: 0,
237                bytes_after: 0,
238                events_compacted: 0,
239                duration_ms: 0,
240            });
241        }
242
243        // Select files for compaction
244        let files_to_compact = self.select_files_for_compaction(&files);
245
246        if files_to_compact.is_empty() {
247            tracing::debug!(
248                "No files meet compaction criteria (strategy: {:?})",
249                self.config.strategy
250            );
251            return Ok(CompactionResult {
252                files_compacted: 0,
253                bytes_before: 0,
254                bytes_after: 0,
255                events_compacted: 0,
256                duration_ms: 0,
257            });
258        }
259
260        let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
261
262        tracing::info!(
263            "Compacting {} files ({:.2} MB)",
264            files_to_compact.len(),
265            bytes_before as f64 / (1024.0 * 1024.0)
266        );
267
268        // Read events from all files to be compacted
269        let mut all_events = Vec::new();
270        for file_info in &files_to_compact {
271            match self.read_parquet_file(&file_info.path) {
272                Ok(mut events) => {
273                    all_events.append(&mut events);
274                }
275                Err(e) => {
276                    tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
277                    // Continue with other files
278                }
279            }
280        }
281
282        if all_events.is_empty() {
283            tracing::warn!("No events read from files to compact");
284            return Ok(CompactionResult {
285                files_compacted: 0,
286                bytes_before,
287                bytes_after: 0,
288                events_compacted: 0,
289                duration_ms: start_time.elapsed().as_millis() as u64,
290            });
291        }
292
293        // Sort events by timestamp for better compression and query performance
294        all_events.sort_by_key(|e| e.timestamp);
295
296        tracing::debug!("Read {} events for compaction", all_events.len());
297
298        // Write compacted file(s)
299        let compacted_files = self.write_compacted_files(&all_events)?;
300
301        let bytes_after: u64 = compacted_files
302            .iter()
303            .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
304            .sum();
305
306        // Delete original files atomically
307        for file_info in &files_to_compact {
308            if let Err(e) = fs::remove_file(&file_info.path) {
309                tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
310            } else {
311                tracing::debug!("Removed old file: {:?}", file_info.path);
312            }
313        }
314
315        let duration_ms = start_time.elapsed().as_millis() as u64;
316
317        // Update statistics
318        let mut stats = self.stats.write();
319        stats.total_compactions += 1;
320        stats.total_files_compacted += files_to_compact.len() as u64;
321        stats.total_bytes_before += bytes_before;
322        stats.total_bytes_after += bytes_after;
323        stats.total_events_compacted += all_events.len() as u64;
324        stats.last_compaction_duration_ms = duration_ms;
325        stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
326        drop(stats);
327
328        // Update last compaction time
329        *self.last_compaction.write() = Some(Utc::now());
330
331        let compression_ratio = if bytes_before > 0 {
332            (bytes_after as f64 / bytes_before as f64) * 100.0
333        } else {
334            100.0
335        };
336
337        tracing::info!(
338            "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
339            files_to_compact.len(),
340            compacted_files.len(),
341            bytes_before as f64 / (1024.0 * 1024.0),
342            bytes_after as f64 / (1024.0 * 1024.0),
343            compression_ratio,
344            all_events.len(),
345            duration_ms
346        );
347
348        Ok(CompactionResult {
349            files_compacted: files_to_compact.len(),
350            bytes_before,
351            bytes_after,
352            events_compacted: all_events.len(),
353            duration_ms,
354        })
355    }
356
357    /// Read events from a Parquet file
358    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
359        // Use ParquetStorage to read the file
360        let storage = ParquetStorage::new(&self.storage_dir)?;
361
362        // For now, we'll read all events and filter by file
363        // In a production system, you'd want to read specific files
364        let all_events = storage.load_all_events()?;
365
366        Ok(all_events)
367    }
368
369    /// Write compacted events to new Parquet file(s)
370    fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
371        let mut compacted_files = Vec::new();
372        let mut current_batch = Vec::new();
373        let mut current_size = 0;
374
375        for event in events {
376            // Estimate event size (rough approximation)
377            let event_size = serde_json::to_string(event)
378                .map(|s| s.len())
379                .unwrap_or(1024);
380
381            // Check if adding this event would exceed target size
382            if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
383            {
384                // Write current batch
385                let file_path = self.write_batch(&current_batch)?;
386                compacted_files.push(file_path);
387
388                // Start new batch
389                current_batch.clear();
390                current_size = 0;
391            }
392
393            current_batch.push(event.clone());
394            current_size += event_size;
395
396            // Also check max file size
397            if current_size >= self.config.max_file_size {
398                let file_path = self.write_batch(&current_batch)?;
399                compacted_files.push(file_path);
400
401                current_batch.clear();
402                current_size = 0;
403            }
404        }
405
406        // Write remaining events
407        if !current_batch.is_empty() {
408            let file_path = self.write_batch(&current_batch)?;
409            compacted_files.push(file_path);
410        }
411
412        Ok(compacted_files)
413    }
414
415    /// Write a batch of events to a new Parquet file
416    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
417        let storage = ParquetStorage::new(&self.storage_dir)?;
418
419        // Generate filename with timestamp
420        let filename = format!(
421            "events-compacted-{}.parquet",
422            Utc::now().format("%Y%m%d-%H%M%S-%f")
423        );
424        let file_path = self.storage_dir.join(filename);
425
426        // Write events
427        for event in events {
428            storage.append_event(event.clone())?;
429        }
430
431        // Flush to disk
432        storage.flush()?;
433
434        tracing::debug!(
435            "Wrote compacted file: {:?} ({} events)",
436            file_path,
437            events.len()
438        );
439
440        Ok(file_path)
441    }
442
443    /// Get compaction statistics
444    pub fn stats(&self) -> CompactionStats {
445        (*self.stats.read()).clone()
446    }
447
448    /// Get configuration
449    pub fn config(&self) -> &CompactionConfig {
450        &self.config
451    }
452
453    /// Trigger manual compaction
454    #[cfg_attr(feature = "hotpath", hotpath::measure)]
455    pub fn compact_now(&self) -> Result<CompactionResult> {
456        tracing::info!("Manual compaction triggered");
457        self.compact()
458    }
459}
460
461/// Result of a compaction operation
462#[derive(Debug, Clone, Serialize)]
463pub struct CompactionResult {
464    pub files_compacted: usize,
465    pub bytes_before: u64,
466    pub bytes_after: u64,
467    pub events_compacted: usize,
468    pub duration_ms: u64,
469}
470
471/// Background compaction task
472pub struct CompactionTask {
473    manager: Arc<CompactionManager>,
474    interval: Duration,
475}
476
477impl CompactionTask {
478    /// Create a new background compaction task
479    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
480        Self {
481            manager,
482            interval: Duration::from_secs(interval_seconds),
483        }
484    }
485
486    /// Run the compaction task in a loop
487    #[cfg_attr(feature = "hotpath", hotpath::measure)]
488    pub async fn run(self) {
489        let mut interval = tokio::time::interval(self.interval);
490
491        loop {
492            interval.tick().await;
493
494            if self.manager.should_compact() {
495                tracing::debug!("Auto-compaction check triggered");
496
497                match self.manager.compact() {
498                    Ok(result) => {
499                        if result.files_compacted > 0 {
500                            tracing::info!(
501                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
502                                result.files_compacted,
503                                (result.bytes_before - result.bytes_after) as f64
504                                    / (1024.0 * 1024.0)
505                            );
506                        }
507                    }
508                    Err(e) => {
509                        tracing::error!("Auto-compaction failed: {}", e);
510                    }
511                }
512            }
513        }
514    }
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a manager over a fresh temporary directory. The returned `TempDir`
    /// must stay alive while the manager is used, or the directory vanishes.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    /// In-memory compaction candidate; nothing is created on disk.
    fn candidate(name: &str, size: u64, created: DateTime<Utc>) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created,
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        assert_eq!(manager.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });
        // Nothing has been compacted yet, so the very first check says yes.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            candidate("small1.parquet", 500_000, now),
            candidate("small2.parquet", 600_000, now),
            candidate("large.parquet", 10_000_000, now),
        ];
        // Only the two files under the 1 MB threshold qualify.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_default_compaction_config() {
        let CompactionConfig {
            min_files_to_compact,
            target_file_size,
            max_file_size,
            small_file_threshold,
            compaction_interval_seconds,
            auto_compact,
            strategy,
        } = CompactionConfig::default();
        assert_eq!(min_files_to_compact, 3);
        assert_eq!(target_file_size, 128 * 1024 * 1024);
        assert_eq!(max_file_size, 256 * 1024 * 1024);
        assert_eq!(small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(compaction_interval_seconds, 3600);
        assert!(auto_compact);
        assert_eq!(strategy, CompactionStrategy::SizeBased);
    }

    #[test]
    fn test_should_compact_disabled() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: false,
            ..Default::default()
        });
        assert!(!manager.should_compact());
    }

    #[test]
    fn test_compact_empty_directory() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        let outcome = manager.compact().unwrap();
        assert_eq!(outcome.files_compacted, 0);
        assert_eq!(outcome.bytes_before, 0);
        assert_eq!(outcome.bytes_after, 0);
        assert_eq!(outcome.events_compacted, 0);
    }

    #[test]
    fn test_compact_now() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        assert_eq!(manager.compact_now().unwrap().files_compacted, 0);
    }

    #[test]
    fn test_get_config() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        });
        assert_eq!(manager.config().min_files_to_compact, 5);
    }

    #[test]
    fn test_get_stats() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        let snapshot = manager.stats();
        for (label, value) in [
            ("total_compactions", snapshot.total_compactions),
            ("total_files_compacted", snapshot.total_files_compacted),
            ("total_bytes_before", snapshot.total_bytes_before),
            ("total_bytes_after", snapshot.total_bytes_after),
            ("total_events_compacted", snapshot.total_events_compacted),
            ("last_compaction_duration_ms", snapshot.last_compaction_duration_ms),
            ("space_saved_bytes", snapshot.space_saved_bytes),
        ] {
            assert_eq!(value, 0, "{label} should start at zero");
        }
    }

    #[test]
    fn test_file_selection_not_enough_small_files() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            candidate("small1.parquet", 500_000, now),
            candidate("small2.parquet", 600_000, now),
        ];
        // Two small files are below the minimum of three.
        assert!(manager.select_files_for_compaction(&files).is_empty());
    }

    #[test]
    fn test_file_selection_time_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let two_days_ago = now - chrono::Duration::hours(48);
        let files = [
            candidate("old1.parquet", 1_000_000, two_days_ago),
            candidate("old2.parquet", 2_000_000, two_days_ago),
            candidate("new.parquet", 500_000, now),
        ];
        // Only the two files older than a day qualify.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_file_selection_time_based_not_enough() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            candidate("old1.parquet", 1_000_000, now - chrono::Duration::hours(48)),
            candidate("new.parquet", 500_000, now),
        ];
        // A single old file is below the minimum of three.
        assert!(manager.select_files_for_compaction(&files).is_empty());
    }

    #[test]
    fn test_file_selection_full_compaction() {
        let (_dir, manager) = manager_with(CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        });
        let now = Utc::now();
        let files = [
            candidate("file1.parquet", 1_000_000, now),
            candidate("file2.parquet", 2_000_000, now),
        ];
        // Full compaction takes every file, regardless of the minimum.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_compaction_strategy_serde() {
        let all = [
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ];
        for original in all {
            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: CompactionStrategy = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, original);
        }
    }

    #[test]
    fn test_compaction_stats_default() {
        let fresh = CompactionStats::default();
        assert_eq!(fresh.total_compactions, 0);
        assert_eq!(fresh.total_files_compacted, 0);
    }

    #[test]
    fn test_compaction_stats_serde() {
        let populated = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1_000_000,
            total_bytes_after: 500_000,
            total_events_compacted: 10_000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500_000,
        };
        let encoded = serde_json::to_string(&populated).unwrap();
        assert!(encoded.contains("\"total_compactions\":5"));
        assert!(encoded.contains("\"space_saved_bytes\":500000"));
    }

    #[test]
    fn test_compaction_result_serde() {
        let outcome = CompactionResult {
            files_compacted: 3,
            bytes_before: 1_000_000,
            bytes_after: 500_000,
            events_compacted: 5_000,
            duration_ms: 250,
        };
        let encoded = serde_json::to_string(&outcome).unwrap();
        assert!(encoded.contains("\"files_compacted\":3"));
        assert!(encoded.contains("\"bytes_before\":1000000"));
    }

    #[test]
    fn test_compaction_task_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        let shared = Arc::new(manager);
        // Constructing the task must not panic.
        let _task = CompactionTask::new(Arc::clone(&shared), 60);
    }

    #[test]
    fn test_list_parquet_files_empty() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        assert!(manager.list_parquet_files().unwrap().is_empty());
    }

    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let (dir, manager) = manager_with(CompactionConfig::default());

        // Files with other extensions must be ignored.
        std::fs::write(dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        assert!(manager.list_parquet_files().unwrap().is_empty());
    }
}