Skip to main content

allsource_core/infrastructure/persistence/
compaction.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4    infrastructure::persistence::storage::ParquetStorage,
5};
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::{
10    fs,
11    path::{Path, PathBuf},
12    sync::Arc,
13    time::Duration,
14};
15
16/// Manages Parquet file compaction for optimal storage and query performance
17pub struct CompactionManager {
18    /// Directory where Parquet files are stored
19    storage_dir: PathBuf,
20
21    /// Configuration
22    config: CompactionConfig,
23
24    /// Statistics
25    stats: Arc<RwLock<CompactionStats>>,
26
27    /// Last compaction time
28    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
29}
30
31#[derive(Debug, Clone)]
32pub struct CompactionConfig {
33    /// Minimum number of files to trigger compaction
34    pub min_files_to_compact: usize,
35
36    /// Target size for compacted files (in bytes)
37    pub target_file_size: usize,
38
39    /// Maximum size for a single compacted file (in bytes)
40    pub max_file_size: usize,
41
42    /// Minimum file size to consider for compaction (small files)
43    pub small_file_threshold: usize,
44
45    /// Time interval between automatic compactions (in seconds)
46    pub compaction_interval_seconds: u64,
47
48    /// Enable automatic background compaction
49    pub auto_compact: bool,
50
51    /// Compaction strategy
52    pub strategy: CompactionStrategy,
53}
54
55impl Default for CompactionConfig {
56    fn default() -> Self {
57        Self {
58            min_files_to_compact: 3,
59            target_file_size: 128 * 1024 * 1024,    // 128 MB
60            max_file_size: 256 * 1024 * 1024,       // 256 MB
61            small_file_threshold: 10 * 1024 * 1024, // 10 MB
62            compaction_interval_seconds: 3600,      // 1 hour
63            auto_compact: true,
64            strategy: CompactionStrategy::SizeBased,
65        }
66    }
67}
68
69#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
70#[serde(rename_all = "lowercase")]
71pub enum CompactionStrategy {
72    /// Compact based on file size (default)
73    SizeBased,
74    /// Compact based on file age
75    TimeBased,
76    /// Compact all files into one
77    FullCompaction,
78}
79
80#[derive(Debug, Clone, Default, Serialize)]
81pub struct CompactionStats {
82    pub total_compactions: u64,
83    pub total_files_compacted: u64,
84    pub total_bytes_before: u64,
85    pub total_bytes_after: u64,
86    pub total_events_compacted: u64,
87    pub last_compaction_duration_ms: u64,
88    pub space_saved_bytes: u64,
89}
90
91/// Information about a Parquet file candidate for compaction
92#[derive(Debug, Clone)]
93struct FileInfo {
94    path: PathBuf,
95    size: u64,
96    created: DateTime<Utc>,
97}
98
99impl CompactionManager {
100    /// Create a new compaction manager
101    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
102        let storage_dir = storage_dir.into();
103
104        tracing::info!(
105            "✅ Compaction manager initialized at: {}",
106            storage_dir.display()
107        );
108
109        Self {
110            storage_dir,
111            config,
112            stats: Arc::new(RwLock::new(CompactionStats::default())),
113            last_compaction: Arc::new(RwLock::new(None)),
114        }
115    }
116
117    /// List all Parquet files in the storage directory
118    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
119        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
120            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
121        })?;
122
123        let mut files = Vec::new();
124
125        for entry in entries {
126            let entry = entry.map_err(|e| {
127                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
128            })?;
129
130            let path = entry.path();
131            if let Some(ext) = path.extension()
132                && ext == "parquet"
133            {
134                let metadata = entry.metadata().map_err(|e| {
135                    AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
136                })?;
137
138                let size = metadata.len();
139                let created = metadata
140                    .created()
141                    .ok()
142                    .and_then(|t| {
143                        t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
144                            DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
145                        })
146                    })
147                    .unwrap_or_else(Utc::now);
148
149                files.push(FileInfo {
150                    path,
151                    size,
152                    created,
153                });
154            }
155        }
156
157        // Sort by creation time (oldest first)
158        files.sort_by_key(|f| f.created);
159
160        Ok(files)
161    }
162
163    /// Identify files that should be compacted based on strategy
164    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
165        match self.config.strategy {
166            CompactionStrategy::SizeBased => self.select_small_files(files),
167            CompactionStrategy::TimeBased => self.select_old_files(files),
168            CompactionStrategy::FullCompaction => files.to_vec(),
169        }
170    }
171
172    /// Select small files for compaction
173    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
174        let small_files: Vec<FileInfo> = files
175            .iter()
176            .filter(|f| f.size < self.config.small_file_threshold as u64)
177            .cloned()
178            .collect();
179
180        // Only compact if we have enough small files
181        if small_files.len() >= self.config.min_files_to_compact {
182            small_files
183        } else {
184            Vec::new()
185        }
186    }
187
188    /// Select old files for time-based compaction
189    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
190        let now = Utc::now();
191        let age_threshold = chrono::Duration::hours(24); // Files older than 24 hours
192
193        let old_files: Vec<FileInfo> = files
194            .iter()
195            .filter(|f| now - f.created > age_threshold)
196            .cloned()
197            .collect();
198
199        if old_files.len() >= self.config.min_files_to_compact {
200            old_files
201        } else {
202            Vec::new()
203        }
204    }
205
206    /// Check if compaction should run
207    pub fn should_compact(&self) -> bool {
208        if !self.config.auto_compact {
209            return false;
210        }
211
212        let last = self.last_compaction.read();
213        match *last {
214            None => true, // Never compacted
215            Some(last_time) => {
216                let elapsed = (Utc::now() - last_time).num_seconds();
217                elapsed >= self.config.compaction_interval_seconds as i64
218            }
219        }
220    }
221
222    /// Perform compaction of Parquet files
223    pub fn compact(&self) -> Result<CompactionResult> {
224        let start_time = std::time::Instant::now();
225        tracing::info!("🔄 Starting Parquet compaction...");
226
227        // List all Parquet files
228        let files = self.list_parquet_files()?;
229
230        if files.is_empty() {
231            tracing::debug!("No Parquet files to compact");
232            return Ok(CompactionResult {
233                files_compacted: 0,
234                bytes_before: 0,
235                bytes_after: 0,
236                events_compacted: 0,
237                duration_ms: 0,
238            });
239        }
240
241        // Select files for compaction
242        let files_to_compact = self.select_files_for_compaction(&files);
243
244        if files_to_compact.is_empty() {
245            tracing::debug!(
246                "No files meet compaction criteria (strategy: {:?})",
247                self.config.strategy
248            );
249            return Ok(CompactionResult {
250                files_compacted: 0,
251                bytes_before: 0,
252                bytes_after: 0,
253                events_compacted: 0,
254                duration_ms: 0,
255            });
256        }
257
258        let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
259
260        tracing::info!(
261            "Compacting {} files ({:.2} MB)",
262            files_to_compact.len(),
263            bytes_before as f64 / (1024.0 * 1024.0)
264        );
265
266        // Read events from all files to be compacted
267        let mut all_events = Vec::new();
268        for file_info in &files_to_compact {
269            match self.read_parquet_file(&file_info.path) {
270                Ok(mut events) => {
271                    all_events.append(&mut events);
272                }
273                Err(e) => {
274                    tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
275                    // Continue with other files
276                }
277            }
278        }
279
280        if all_events.is_empty() {
281            tracing::warn!("No events read from files to compact");
282            return Ok(CompactionResult {
283                files_compacted: 0,
284                bytes_before,
285                bytes_after: 0,
286                events_compacted: 0,
287                duration_ms: start_time.elapsed().as_millis() as u64,
288            });
289        }
290
291        // Sort events by timestamp for better compression and query performance
292        all_events.sort_by_key(|e| e.timestamp);
293
294        tracing::debug!("Read {} events for compaction", all_events.len());
295
296        // Write compacted file(s)
297        let compacted_files = self.write_compacted_files(&all_events)?;
298
299        let bytes_after: u64 = compacted_files
300            .iter()
301            .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
302            .sum();
303
304        // Delete original files atomically
305        for file_info in &files_to_compact {
306            if let Err(e) = fs::remove_file(&file_info.path) {
307                tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
308            } else {
309                tracing::debug!("Removed old file: {:?}", file_info.path);
310            }
311        }
312
313        let duration_ms = start_time.elapsed().as_millis() as u64;
314
315        // Update statistics
316        let mut stats = self.stats.write();
317        stats.total_compactions += 1;
318        stats.total_files_compacted += files_to_compact.len() as u64;
319        stats.total_bytes_before += bytes_before;
320        stats.total_bytes_after += bytes_after;
321        stats.total_events_compacted += all_events.len() as u64;
322        stats.last_compaction_duration_ms = duration_ms;
323        stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
324        drop(stats);
325
326        // Update last compaction time
327        *self.last_compaction.write() = Some(Utc::now());
328
329        let compression_ratio = if bytes_before > 0 {
330            (bytes_after as f64 / bytes_before as f64) * 100.0
331        } else {
332            100.0
333        };
334
335        tracing::info!(
336            "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
337            files_to_compact.len(),
338            compacted_files.len(),
339            bytes_before as f64 / (1024.0 * 1024.0),
340            bytes_after as f64 / (1024.0 * 1024.0),
341            compression_ratio,
342            all_events.len(),
343            duration_ms
344        );
345
346        Ok(CompactionResult {
347            files_compacted: files_to_compact.len(),
348            bytes_before,
349            bytes_after,
350            events_compacted: all_events.len(),
351            duration_ms,
352        })
353    }
354
355    /// Read events from a Parquet file
356    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
357        // Use ParquetStorage to read the file
358        let storage = ParquetStorage::new(&self.storage_dir)?;
359
360        // For now, we'll read all events and filter by file
361        // In a production system, you'd want to read specific files
362        let all_events = storage.load_all_events()?;
363
364        Ok(all_events)
365    }
366
367    /// Write compacted events to new Parquet file(s)
368    fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
369        let mut compacted_files = Vec::new();
370        let mut current_batch = Vec::new();
371        let mut current_size = 0;
372
373        for event in events {
374            // Estimate event size (rough approximation)
375            let event_size = serde_json::to_string(event)
376                .map(|s| s.len())
377                .unwrap_or(1024);
378
379            // Check if adding this event would exceed target size
380            if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
381            {
382                // Write current batch
383                let file_path = self.write_batch(&current_batch)?;
384                compacted_files.push(file_path);
385
386                // Start new batch
387                current_batch.clear();
388                current_size = 0;
389            }
390
391            current_batch.push(event.clone());
392            current_size += event_size;
393
394            // Also check max file size
395            if current_size >= self.config.max_file_size {
396                let file_path = self.write_batch(&current_batch)?;
397                compacted_files.push(file_path);
398
399                current_batch.clear();
400                current_size = 0;
401            }
402        }
403
404        // Write remaining events
405        if !current_batch.is_empty() {
406            let file_path = self.write_batch(&current_batch)?;
407            compacted_files.push(file_path);
408        }
409
410        Ok(compacted_files)
411    }
412
413    /// Write a batch of events to a new Parquet file
414    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
415        let storage = ParquetStorage::new(&self.storage_dir)?;
416
417        // Generate filename with timestamp
418        let filename = format!(
419            "events-compacted-{}.parquet",
420            Utc::now().format("%Y%m%d-%H%M%S-%f")
421        );
422        let file_path = self.storage_dir.join(filename);
423
424        // Write events
425        for event in events {
426            storage.append_event(event.clone())?;
427        }
428
429        // Flush to disk
430        storage.flush()?;
431
432        tracing::debug!(
433            "Wrote compacted file: {:?} ({} events)",
434            file_path,
435            events.len()
436        );
437
438        Ok(file_path)
439    }
440
441    /// Get compaction statistics
442    pub fn stats(&self) -> CompactionStats {
443        (*self.stats.read()).clone()
444    }
445
446    /// Get configuration
447    pub fn config(&self) -> &CompactionConfig {
448        &self.config
449    }
450
451    /// Trigger manual compaction
452    pub fn compact_now(&self) -> Result<CompactionResult> {
453        tracing::info!("Manual compaction triggered");
454        self.compact()
455    }
456}
457
458/// Result of a compaction operation
459#[derive(Debug, Clone, Serialize)]
460pub struct CompactionResult {
461    pub files_compacted: usize,
462    pub bytes_before: u64,
463    pub bytes_after: u64,
464    pub events_compacted: usize,
465    pub duration_ms: u64,
466}
467
468/// Background compaction task
469pub struct CompactionTask {
470    manager: Arc<CompactionManager>,
471    interval: Duration,
472}
473
474impl CompactionTask {
475    /// Create a new background compaction task
476    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
477        Self {
478            manager,
479            interval: Duration::from_secs(interval_seconds),
480        }
481    }
482
483    /// Run the compaction task in a loop
484    pub async fn run(self) {
485        let mut interval = tokio::time::interval(self.interval);
486
487        loop {
488            interval.tick().await;
489
490            if self.manager.should_compact() {
491                tracing::debug!("Auto-compaction check triggered");
492
493                match self.manager.compact() {
494                    Ok(result) => {
495                        if result.files_compacted > 0 {
496                            tracing::info!(
497                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
498                                result.files_compacted,
499                                (result.bytes_before - result.bytes_after) as f64
500                                    / (1024.0 * 1024.0)
501                            );
502                        }
503                    }
504                    Err(e) => {
505                        tracing::error!("Auto-compaction failed: {}", e);
506                    }
507                }
508            }
509        }
510    }
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory. The `TempDir` is handed
    /// back so the directory outlives the manager for the duration of the test.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    /// Same as [`manager_with`], using the default configuration.
    fn default_manager() -> (TempDir, CompactionManager) {
        manager_with(CompactionConfig::default())
    }

    /// Shorthand for a `FileInfo` literal.
    fn file_info(name: &str, size: u64, created: DateTime<Utc>) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created,
        }
    }

    /// A timestamp 48 hours in the past, i.e. beyond the time-based age cut-off.
    fn two_days_ago() -> DateTime<Utc> {
        Utc::now() - chrono::Duration::hours(48)
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = default_manager();

        assert_eq!(manager.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        // A manager that has never compacted is always due
        assert!(manager.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024, // 1 MB
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file_info("small1.parquet", 500_000, Utc::now()),   // 500 KB
            file_info("small2.parquet", 600_000, Utc::now()),   // 600 KB
            file_info("large.parquet", 10_000_000, Utc::now()), // 10 MB
        ];

        // Only the two files under the threshold are picked
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_default_compaction_config() {
        let config = CompactionConfig::default();

        assert_eq!(config.min_files_to_compact, 3);
        assert_eq!(config.target_file_size, 128 * 1024 * 1024);
        assert_eq!(config.max_file_size, 256 * 1024 * 1024);
        assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(config.compaction_interval_seconds, 3600);
        assert!(config.auto_compact);
        assert_eq!(config.strategy, CompactionStrategy::SizeBased);
    }

    #[test]
    fn test_should_compact_disabled() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: false,
            ..Default::default()
        });

        assert!(!manager.should_compact());
    }

    #[test]
    fn test_compact_empty_directory() {
        let (_dir, manager) = default_manager();

        let result = manager.compact().unwrap();

        assert_eq!(result.files_compacted, 0);
        assert_eq!(result.bytes_before, 0);
        assert_eq!(result.bytes_after, 0);
        assert_eq!(result.events_compacted, 0);
    }

    #[test]
    fn test_compact_now() {
        let (_dir, manager) = default_manager();

        let result = manager.compact_now().unwrap();

        assert_eq!(result.files_compacted, 0);
    }

    #[test]
    fn test_get_config() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        });

        assert_eq!(manager.config().min_files_to_compact, 5);
    }

    #[test]
    fn test_get_stats() {
        let (_dir, manager) = default_manager();

        let stats = manager.stats();

        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
        assert_eq!(stats.total_bytes_before, 0);
        assert_eq!(stats.total_bytes_after, 0);
        assert_eq!(stats.total_events_compacted, 0);
        assert_eq!(stats.last_compaction_duration_ms, 0);
        assert_eq!(stats.space_saved_bytes, 0);
    }

    #[test]
    fn test_file_selection_not_enough_small_files() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3, // Three small files are required
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file_info("small1.parquet", 500_000, Utc::now()),
            file_info("small2.parquet", 600_000, Utc::now()),
        ];

        // Two small files fall short of the minimum
        assert_eq!(manager.select_files_for_compaction(&files).len(), 0);
    }

    #[test]
    fn test_file_selection_time_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });

        let old_time = two_days_ago();
        let files = vec![
            file_info("old1.parquet", 1_000_000, old_time),
            file_info("old2.parquet", 2_000_000, old_time),
            file_info("new.parquet", 500_000, Utc::now()),
        ];

        // Only the two aged files are picked
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_file_selection_time_based_not_enough() {
        let (_dir, manager) = manager_with(CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });

        let files = vec![
            file_info("old1.parquet", 1_000_000, two_days_ago()),
            file_info("new.parquet", 500_000, Utc::now()),
        ];

        // A single aged file falls short of the minimum
        assert_eq!(manager.select_files_for_compaction(&files).len(), 0);
    }

    #[test]
    fn test_file_selection_full_compaction() {
        let (_dir, manager) = manager_with(CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        });

        let files = vec![
            file_info("file1.parquet", 1_000_000, Utc::now()),
            file_info("file2.parquet", 2_000_000, Utc::now()),
        ];

        // Full compaction takes every file
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }

    #[test]
    fn test_compaction_strategy_serde() {
        let strategies = [
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ];

        // Every variant must survive a JSON round trip
        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, strategy);
        }
    }

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();

        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
    }

    #[test]
    fn test_compaction_stats_serde() {
        let stats = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1000000,
            total_bytes_after: 500000,
            total_events_compacted: 10000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500000,
        };

        let json = serde_json::to_string(&stats).unwrap();

        assert!(json.contains("\"total_compactions\":5"));
        assert!(json.contains("\"space_saved_bytes\":500000"));
    }

    #[test]
    fn test_compaction_result_serde() {
        let result = CompactionResult {
            files_compacted: 3,
            bytes_before: 1000000,
            bytes_after: 500000,
            events_compacted: 5000,
            duration_ms: 250,
        };

        let json = serde_json::to_string(&result).unwrap();

        assert!(json.contains("\"files_compacted\":3"));
        assert!(json.contains("\"bytes_before\":1000000"));
    }

    #[test]
    fn test_compaction_task_creation() {
        let (_dir, manager) = default_manager();
        let manager = Arc::new(manager);

        // Construction alone must not panic
        let _task = CompactionTask::new(manager.clone(), 60);
    }

    #[test]
    fn test_list_parquet_files_empty() {
        let (_dir, manager) = default_manager();

        assert!(manager.list_parquet_files().unwrap().is_empty());
    }

    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let (dir, manager) = default_manager();

        // Files with other extensions must be ignored
        std::fs::write(dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        assert!(manager.list_parquet_files().unwrap().is_empty());
    }
}
851}