Skip to main content

allsource_core/infrastructure/persistence/
compaction.rs

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::infrastructure::persistence::storage::ParquetStorage;
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12/// Manages Parquet file compaction for optimal storage and query performance
13pub struct CompactionManager {
14    /// Directory where Parquet files are stored
15    storage_dir: PathBuf,
16
17    /// Configuration
18    config: CompactionConfig,
19
20    /// Statistics
21    stats: Arc<RwLock<CompactionStats>>,
22
23    /// Last compaction time
24    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
25}
26
27#[derive(Debug, Clone)]
28pub struct CompactionConfig {
29    /// Minimum number of files to trigger compaction
30    pub min_files_to_compact: usize,
31
32    /// Target size for compacted files (in bytes)
33    pub target_file_size: usize,
34
35    /// Maximum size for a single compacted file (in bytes)
36    pub max_file_size: usize,
37
38    /// Minimum file size to consider for compaction (small files)
39    pub small_file_threshold: usize,
40
41    /// Time interval between automatic compactions (in seconds)
42    pub compaction_interval_seconds: u64,
43
44    /// Enable automatic background compaction
45    pub auto_compact: bool,
46
47    /// Compaction strategy
48    pub strategy: CompactionStrategy,
49}
50
51impl Default for CompactionConfig {
52    fn default() -> Self {
53        Self {
54            min_files_to_compact: 3,
55            target_file_size: 128 * 1024 * 1024,    // 128 MB
56            max_file_size: 256 * 1024 * 1024,       // 256 MB
57            small_file_threshold: 10 * 1024 * 1024, // 10 MB
58            compaction_interval_seconds: 3600,      // 1 hour
59            auto_compact: true,
60            strategy: CompactionStrategy::SizeBased,
61        }
62    }
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
66#[serde(rename_all = "lowercase")]
67pub enum CompactionStrategy {
68    /// Compact based on file size (default)
69    SizeBased,
70    /// Compact based on file age
71    TimeBased,
72    /// Compact all files into one
73    FullCompaction,
74}
75
76#[derive(Debug, Clone, Default, Serialize)]
77pub struct CompactionStats {
78    pub total_compactions: u64,
79    pub total_files_compacted: u64,
80    pub total_bytes_before: u64,
81    pub total_bytes_after: u64,
82    pub total_events_compacted: u64,
83    pub last_compaction_duration_ms: u64,
84    pub space_saved_bytes: u64,
85}
86
87/// Information about a Parquet file candidate for compaction
88#[derive(Debug, Clone)]
89struct FileInfo {
90    path: PathBuf,
91    size: u64,
92    created: DateTime<Utc>,
93}
94
95impl CompactionManager {
96    /// Create a new compaction manager
97    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
98        let storage_dir = storage_dir.into();
99
100        tracing::info!(
101            "✅ Compaction manager initialized at: {}",
102            storage_dir.display()
103        );
104
105        Self {
106            storage_dir,
107            config,
108            stats: Arc::new(RwLock::new(CompactionStats::default())),
109            last_compaction: Arc::new(RwLock::new(None)),
110        }
111    }
112
113    /// List all Parquet files in the storage directory
114    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
115        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
116            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
117        })?;
118
119        let mut files = Vec::new();
120
121        for entry in entries {
122            let entry = entry.map_err(|e| {
123                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
124            })?;
125
126            let path = entry.path();
127            if let Some(ext) = path.extension() {
128                if ext == "parquet" {
129                    let metadata = entry.metadata().map_err(|e| {
130                        AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
131                    })?;
132
133                    let size = metadata.len();
134                    let created = metadata
135                        .created()
136                        .ok()
137                        .and_then(|t| {
138                            t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
139                                DateTime::from_timestamp(d.as_secs() as i64, 0)
140                                    .unwrap_or_else(Utc::now)
141                            })
142                        })
143                        .unwrap_or_else(Utc::now);
144
145                    files.push(FileInfo {
146                        path,
147                        size,
148                        created,
149                    });
150                }
151            }
152        }
153
154        // Sort by creation time (oldest first)
155        files.sort_by_key(|f| f.created);
156
157        Ok(files)
158    }
159
160    /// Identify files that should be compacted based on strategy
161    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
162        match self.config.strategy {
163            CompactionStrategy::SizeBased => self.select_small_files(files),
164            CompactionStrategy::TimeBased => self.select_old_files(files),
165            CompactionStrategy::FullCompaction => files.to_vec(),
166        }
167    }
168
169    /// Select small files for compaction
170    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
171        let small_files: Vec<FileInfo> = files
172            .iter()
173            .filter(|f| f.size < self.config.small_file_threshold as u64)
174            .cloned()
175            .collect();
176
177        // Only compact if we have enough small files
178        if small_files.len() >= self.config.min_files_to_compact {
179            small_files
180        } else {
181            Vec::new()
182        }
183    }
184
185    /// Select old files for time-based compaction
186    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
187        let now = Utc::now();
188        let age_threshold = chrono::Duration::hours(24); // Files older than 24 hours
189
190        let old_files: Vec<FileInfo> = files
191            .iter()
192            .filter(|f| now - f.created > age_threshold)
193            .cloned()
194            .collect();
195
196        if old_files.len() >= self.config.min_files_to_compact {
197            old_files
198        } else {
199            Vec::new()
200        }
201    }
202
203    /// Check if compaction should run
204    pub fn should_compact(&self) -> bool {
205        if !self.config.auto_compact {
206            return false;
207        }
208
209        let last = self.last_compaction.read();
210        match *last {
211            None => true, // Never compacted
212            Some(last_time) => {
213                let elapsed = (Utc::now() - last_time).num_seconds();
214                elapsed >= self.config.compaction_interval_seconds as i64
215            }
216        }
217    }
218
219    /// Perform compaction of Parquet files
220    pub fn compact(&self) -> Result<CompactionResult> {
221        let start_time = std::time::Instant::now();
222        tracing::info!("🔄 Starting Parquet compaction...");
223
224        // List all Parquet files
225        let files = self.list_parquet_files()?;
226
227        if files.is_empty() {
228            tracing::debug!("No Parquet files to compact");
229            return Ok(CompactionResult {
230                files_compacted: 0,
231                bytes_before: 0,
232                bytes_after: 0,
233                events_compacted: 0,
234                duration_ms: 0,
235            });
236        }
237
238        // Select files for compaction
239        let files_to_compact = self.select_files_for_compaction(&files);
240
241        if files_to_compact.is_empty() {
242            tracing::debug!(
243                "No files meet compaction criteria (strategy: {:?})",
244                self.config.strategy
245            );
246            return Ok(CompactionResult {
247                files_compacted: 0,
248                bytes_before: 0,
249                bytes_after: 0,
250                events_compacted: 0,
251                duration_ms: 0,
252            });
253        }
254
255        let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
256
257        tracing::info!(
258            "Compacting {} files ({:.2} MB)",
259            files_to_compact.len(),
260            bytes_before as f64 / (1024.0 * 1024.0)
261        );
262
263        // Read events from all files to be compacted
264        let mut all_events = Vec::new();
265        for file_info in &files_to_compact {
266            match self.read_parquet_file(&file_info.path) {
267                Ok(mut events) => {
268                    all_events.append(&mut events);
269                }
270                Err(e) => {
271                    tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
272                    // Continue with other files
273                }
274            }
275        }
276
277        if all_events.is_empty() {
278            tracing::warn!("No events read from files to compact");
279            return Ok(CompactionResult {
280                files_compacted: 0,
281                bytes_before,
282                bytes_after: 0,
283                events_compacted: 0,
284                duration_ms: start_time.elapsed().as_millis() as u64,
285            });
286        }
287
288        // Sort events by timestamp for better compression and query performance
289        all_events.sort_by_key(|e| e.timestamp);
290
291        tracing::debug!("Read {} events for compaction", all_events.len());
292
293        // Write compacted file(s)
294        let compacted_files = self.write_compacted_files(&all_events)?;
295
296        let bytes_after: u64 = compacted_files
297            .iter()
298            .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
299            .sum();
300
301        // Delete original files atomically
302        for file_info in &files_to_compact {
303            if let Err(e) = fs::remove_file(&file_info.path) {
304                tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
305            } else {
306                tracing::debug!("Removed old file: {:?}", file_info.path);
307            }
308        }
309
310        let duration_ms = start_time.elapsed().as_millis() as u64;
311
312        // Update statistics
313        let mut stats = self.stats.write();
314        stats.total_compactions += 1;
315        stats.total_files_compacted += files_to_compact.len() as u64;
316        stats.total_bytes_before += bytes_before;
317        stats.total_bytes_after += bytes_after;
318        stats.total_events_compacted += all_events.len() as u64;
319        stats.last_compaction_duration_ms = duration_ms;
320        stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
321        drop(stats);
322
323        // Update last compaction time
324        *self.last_compaction.write() = Some(Utc::now());
325
326        let compression_ratio = if bytes_before > 0 {
327            (bytes_after as f64 / bytes_before as f64) * 100.0
328        } else {
329            100.0
330        };
331
332        tracing::info!(
333            "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
334            files_to_compact.len(),
335            compacted_files.len(),
336            bytes_before as f64 / (1024.0 * 1024.0),
337            bytes_after as f64 / (1024.0 * 1024.0),
338            compression_ratio,
339            all_events.len(),
340            duration_ms
341        );
342
343        Ok(CompactionResult {
344            files_compacted: files_to_compact.len(),
345            bytes_before,
346            bytes_after,
347            events_compacted: all_events.len(),
348            duration_ms,
349        })
350    }
351
352    /// Read events from a Parquet file
353    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
354        // Use ParquetStorage to read the file
355        let storage = ParquetStorage::new(&self.storage_dir)?;
356
357        // For now, we'll read all events and filter by file
358        // In a production system, you'd want to read specific files
359        let all_events = storage.load_all_events()?;
360
361        Ok(all_events)
362    }
363
364    /// Write compacted events to new Parquet file(s)
365    fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
366        let mut compacted_files = Vec::new();
367        let mut current_batch = Vec::new();
368        let mut current_size = 0;
369
370        for event in events {
371            // Estimate event size (rough approximation)
372            let event_size = serde_json::to_string(event)
373                .map(|s| s.len())
374                .unwrap_or(1024);
375
376            // Check if adding this event would exceed target size
377            if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
378            {
379                // Write current batch
380                let file_path = self.write_batch(&current_batch)?;
381                compacted_files.push(file_path);
382
383                // Start new batch
384                current_batch.clear();
385                current_size = 0;
386            }
387
388            current_batch.push(event.clone());
389            current_size += event_size;
390
391            // Also check max file size
392            if current_size >= self.config.max_file_size {
393                let file_path = self.write_batch(&current_batch)?;
394                compacted_files.push(file_path);
395
396                current_batch.clear();
397                current_size = 0;
398            }
399        }
400
401        // Write remaining events
402        if !current_batch.is_empty() {
403            let file_path = self.write_batch(&current_batch)?;
404            compacted_files.push(file_path);
405        }
406
407        Ok(compacted_files)
408    }
409
410    /// Write a batch of events to a new Parquet file
411    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
412        let mut storage = ParquetStorage::new(&self.storage_dir)?;
413
414        // Generate filename with timestamp
415        let filename = format!(
416            "events-compacted-{}.parquet",
417            Utc::now().format("%Y%m%d-%H%M%S-%f")
418        );
419        let file_path = self.storage_dir.join(filename);
420
421        // Write events
422        for event in events {
423            storage.append_event(event.clone())?;
424        }
425
426        // Flush to disk
427        storage.flush()?;
428
429        tracing::debug!(
430            "Wrote compacted file: {:?} ({} events)",
431            file_path,
432            events.len()
433        );
434
435        Ok(file_path)
436    }
437
438    /// Get compaction statistics
439    pub fn stats(&self) -> CompactionStats {
440        (*self.stats.read()).clone()
441    }
442
443    /// Get configuration
444    pub fn config(&self) -> &CompactionConfig {
445        &self.config
446    }
447
448    /// Trigger manual compaction
449    pub fn compact_now(&self) -> Result<CompactionResult> {
450        tracing::info!("Manual compaction triggered");
451        self.compact()
452    }
453}
454
455/// Result of a compaction operation
456#[derive(Debug, Clone, Serialize)]
457pub struct CompactionResult {
458    pub files_compacted: usize,
459    pub bytes_before: u64,
460    pub bytes_after: u64,
461    pub events_compacted: usize,
462    pub duration_ms: u64,
463}
464
465/// Background compaction task
466pub struct CompactionTask {
467    manager: Arc<CompactionManager>,
468    interval: Duration,
469}
470
471impl CompactionTask {
472    /// Create a new background compaction task
473    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
474        Self {
475            manager,
476            interval: Duration::from_secs(interval_seconds),
477        }
478    }
479
480    /// Run the compaction task in a loop
481    pub async fn run(self) {
482        let mut interval = tokio::time::interval(self.interval);
483
484        loop {
485            interval.tick().await;
486
487            if self.manager.should_compact() {
488                tracing::debug!("Auto-compaction check triggered");
489
490                match self.manager.compact() {
491                    Ok(result) => {
492                        if result.files_compacted > 0 {
493                            tracing::info!(
494                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
495                                result.files_compacted,
496                                (result.bytes_before - result.bytes_after) as f64
497                                    / (1024.0 * 1024.0)
498                            );
499                        }
500                    }
501                    Err(e) => {
502                        tracing::error!("Auto-compaction failed: {}", e);
503                    }
504                }
505            }
506        }
507    }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a manager rooted in a fresh temporary directory. The directory
    /// guard is returned as well so that it outlives the manager.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    /// Shorthand for a `FileInfo` fixture.
    fn file(name: &str, size: u64, created: DateTime<Utc>) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created,
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        assert_eq!(manager.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        // Nothing has been compacted yet, so the first check is positive.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024, // 1 MB
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file("small1.parquet", 500_000, Utc::now()),   // 500 KB
            file("small2.parquet", 600_000, Utc::now()),   // 600 KB
            file("large.parquet", 10_000_000, Utc::now()), // 10 MB
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2); // Only the 2 small files
    }

    #[test]
    fn test_default_compaction_config() {
        let config = CompactionConfig::default();
        assert_eq!(config.min_files_to_compact, 3);
        assert_eq!(config.target_file_size, 128 * 1024 * 1024);
        assert_eq!(config.max_file_size, 256 * 1024 * 1024);
        assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(config.compaction_interval_seconds, 3600);
        assert!(config.auto_compact);
        assert_eq!(config.strategy, CompactionStrategy::SizeBased);
    }

    #[test]
    fn test_should_compact_disabled() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: false,
            ..Default::default()
        });

        assert!(!manager.should_compact());
    }

    #[test]
    fn test_compact_empty_directory() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let result = manager.compact().unwrap();
        assert_eq!(result.files_compacted, 0);
        assert_eq!(result.bytes_before, 0);
        assert_eq!(result.bytes_after, 0);
        assert_eq!(result.events_compacted, 0);
    }

    #[test]
    fn test_compact_now() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let result = manager.compact_now().unwrap();
        assert_eq!(result.files_compacted, 0);
    }

    #[test]
    fn test_get_config() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        });

        assert_eq!(manager.config().min_files_to_compact, 5);
    }

    #[test]
    fn test_get_stats() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let stats = manager.stats();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
        assert_eq!(stats.total_bytes_before, 0);
        assert_eq!(stats.total_bytes_after, 0);
        assert_eq!(stats.total_events_compacted, 0);
        assert_eq!(stats.last_compaction_duration_ms, 0);
        assert_eq!(stats.space_saved_bytes, 0);
    }

    #[test]
    fn test_file_selection_not_enough_small_files() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3, // Need 3 files
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file("small1.parquet", 500_000, Utc::now()),
            file("small2.parquet", 600_000, Utc::now()),
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 0); // Not enough small files
    }

    #[test]
    fn test_file_selection_time_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });

        let old_time = Utc::now() - chrono::Duration::hours(48);
        let files = vec![
            file("old1.parquet", 1_000_000, old_time),
            file("old2.parquet", 2_000_000, old_time),
            file("new.parquet", 500_000, Utc::now()),
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2); // Only the 2 old files
    }

    #[test]
    fn test_file_selection_time_based_not_enough() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });

        let old_time = Utc::now() - chrono::Duration::hours(48);
        let files = vec![
            file("old1.parquet", 1_000_000, old_time),
            file("new.parquet", 500_000, Utc::now()),
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 0); // Not enough old files
    }

    #[test]
    fn test_file_selection_full_compaction() {
        let (_dir, manager) = manager_with(CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        });

        let files = vec![
            file("file1.parquet", 1_000_000, Utc::now()),
            file("file2.parquet", 2_000_000, Utc::now()),
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2); // All files selected
    }

    #[test]
    fn test_compaction_strategy_serde() {
        // Every variant must survive a JSON round trip.
        let strategies = [
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ];

        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, strategy);
        }
    }

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
    }

    #[test]
    fn test_compaction_stats_serde() {
        let stats = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1_000_000,
            total_bytes_after: 500_000,
            total_events_compacted: 10_000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500_000,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_compactions\":5"));
        assert!(json.contains("\"space_saved_bytes\":500000"));
    }

    #[test]
    fn test_compaction_result_serde() {
        let result = CompactionResult {
            files_compacted: 3,
            bytes_before: 1_000_000,
            bytes_after: 500_000,
            events_compacted: 5000,
            duration_ms: 250,
        };

        let json = serde_json::to_string(&result).unwrap();
        assert!(json.contains("\"files_compacted\":3"));
        assert!(json.contains("\"bytes_before\":1000000"));
    }

    #[test]
    fn test_compaction_task_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());
        let manager = Arc::new(manager);

        // Constructing the task must not panic.
        let _task = CompactionTask::new(Arc::clone(&manager), 60);
    }

    #[test]
    fn test_list_parquet_files_empty() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        let files = manager.list_parquet_files().unwrap();
        assert!(files.is_empty());
    }

    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let (dir, manager) = manager_with(CompactionConfig::default());

        // Files with other extensions must be ignored by the lister.
        std::fs::write(dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        let files = manager.list_parquet_files().unwrap();
        assert!(files.is_empty()); // No parquet files
    }
}