//! Parquet file compaction for the persistence layer
//! (allsource_core/infrastructure/persistence/compaction.rs).

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::infrastructure::persistence::storage::ParquetStorage;
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
/// Manages Parquet file compaction for optimal storage and query performance.
///
/// Merges many small Parquet files into fewer large ones according to a
/// [`CompactionStrategy`]. Shared state (`stats`, `last_compaction`) is
/// behind `Arc<RwLock<..>>` so it can be observed while a background
/// [`CompactionTask`] holds the manager in an `Arc`.
pub struct CompactionManager {
    /// Directory where Parquet files are stored (scanned non-recursively)
    storage_dir: PathBuf,

    /// Configuration
    config: CompactionConfig,

    /// Cumulative statistics across all compaction runs of this process
    stats: Arc<RwLock<CompactionStats>>,

    /// Last compaction time; `None` until the first successful run
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
26
/// Tuning knobs for compaction. See [`Default`] for the stock values.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of candidate files required before compaction runs
    pub min_files_to_compact: usize,

    /// Target size for compacted files (in bytes); batches are cut near this
    pub target_file_size: usize,

    /// Hard cap for a single compacted file (in bytes)
    pub max_file_size: usize,

    /// Files below this size (in bytes) are "small" and eligible for
    /// size-based compaction
    pub small_file_threshold: usize,

    /// Time interval between automatic compactions (in seconds);
    /// only consulted when `auto_compact` is true
    pub compaction_interval_seconds: u64,

    /// Enable automatic background compaction (gates `should_compact`)
    pub auto_compact: bool,

    /// Strategy used to pick which files to merge
    pub strategy: CompactionStrategy,
}
50
51impl Default for CompactionConfig {
52    fn default() -> Self {
53        Self {
54            min_files_to_compact: 3,
55            target_file_size: 128 * 1024 * 1024,    // 128 MB
56            max_file_size: 256 * 1024 * 1024,       // 256 MB
57            small_file_threshold: 10 * 1024 * 1024, // 10 MB
58            compaction_interval_seconds: 3600,      // 1 hour
59            auto_compact: true,
60            strategy: CompactionStrategy::SizeBased,
61        }
62    }
63}
64
/// How candidate files are selected for a compaction pass.
///
/// Serialized in lowercase (`"sizebased"`, `"timebased"`, `"fullcompaction"`)
/// via the serde rename below.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Merge files smaller than `small_file_threshold` (default)
    SizeBased,
    /// Merge files older than a fixed age (currently 24 h, hard-coded)
    TimeBased,
    /// Merge every file in the storage directory
    FullCompaction,
}
75
/// Cumulative compaction counters for this process's lifetime.
///
/// Not persisted — reset to zero on restart. Updated only by passes that
/// actually rewrote data (see `CompactionManager::compact`).
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    pub total_compactions: u64,
    pub total_files_compacted: u64,
    pub total_bytes_before: u64,
    pub total_bytes_after: u64,
    pub total_events_compacted: u64,
    /// Duration of the most recent pass only (not cumulative)
    pub last_compaction_duration_ms: u64,
    /// Accumulated `bytes_before - bytes_after`, saturating at zero per pass
    pub space_saved_bytes: u64,
}
86
/// Information about a Parquet file candidate for compaction.
#[derive(Debug, Clone)]
struct FileInfo {
    path: PathBuf,
    /// On-disk size in bytes
    size: u64,
    /// File creation time; may fall back to "now" on filesystems without
    /// birth-time support (see `list_parquet_files`)
    created: DateTime<Utc>,
}
94
impl CompactionManager {
    /// Create a new compaction manager for Parquet files under `storage_dir`.
    ///
    /// Infallible by design: the directory is not validated here, so a
    /// missing or unreadable path only surfaces later from `compact()`.
    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
        let storage_dir = storage_dir.into();

        tracing::info!(
            "✅ Compaction manager initialized at: {}",
            storage_dir.display()
        );

        Self {
            storage_dir,
            config,
            stats: Arc::new(RwLock::new(CompactionStats::default())),
            last_compaction: Arc::new(RwLock::new(None)),
        }
    }

    /// List all `.parquet` files directly inside the storage directory
    /// (non-recursive), sorted oldest-first by creation time.
    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut files = Vec::new();

        for entry in entries {
            let entry = entry.map_err(|e| {
                AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
            })?;

            let path = entry.path();
            if let Some(ext) = path.extension() {
                if ext == "parquet" {
                    let metadata = entry.metadata().map_err(|e| {
                        AllSourceError::StorageError(format!("Failed to read file metadata: {}", e))
                    })?;

                    let size = metadata.len();
                    // Birth time is unavailable on some platforms/filesystems;
                    // fall back to `Utc::now()`, which makes such files look
                    // brand-new and effectively exempts them from time-based
                    // selection. Sub-second precision is also dropped.
                    let created = metadata
                        .created()
                        .ok()
                        .and_then(|t| {
                            t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
                                DateTime::from_timestamp(d.as_secs() as i64, 0)
                                    .unwrap_or_else(Utc::now)
                            })
                        })
                        .unwrap_or_else(Utc::now);

                    files.push(FileInfo {
                        path,
                        size,
                        created,
                    });
                }
            }
        }

        // Sort by creation time (oldest first) so downstream processing sees
        // files in rough chronological order.
        files.sort_by_key(|f| f.created);

        Ok(files)
    }

    /// Pick the subset of `files` to merge, according to the configured
    /// strategy. An empty result means "nothing meets the criteria".
    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
        match self.config.strategy {
            CompactionStrategy::SizeBased => self.select_small_files(files),
            CompactionStrategy::TimeBased => self.select_old_files(files),
            CompactionStrategy::FullCompaction => files.to_vec(),
        }
    }

    /// Select files below `small_file_threshold`, but only if at least
    /// `min_files_to_compact` of them exist (otherwise returns empty).
    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
        let small_files: Vec<FileInfo> = files
            .iter()
            .filter(|f| f.size < self.config.small_file_threshold as u64)
            .cloned()
            .collect();

        // Only compact if we have enough small files
        if small_files.len() >= self.config.min_files_to_compact {
            small_files
        } else {
            Vec::new()
        }
    }

    /// Select files older than a fixed 24-hour threshold, subject to the
    /// same `min_files_to_compact` minimum.
    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
        let now = Utc::now();
        // NOTE(review): 24 h is hard-coded; consider promoting this to
        // `CompactionConfig` alongside the other thresholds.
        let age_threshold = chrono::Duration::hours(24); // Files older than 24 hours

        let old_files: Vec<FileInfo> = files
            .iter()
            .filter(|f| now - f.created > age_threshold)
            .cloned()
            .collect();

        if old_files.len() >= self.config.min_files_to_compact {
            old_files
        } else {
            Vec::new()
        }
    }

    /// Whether an automatic compaction is due.
    ///
    /// Always `false` when `auto_compact` is disabled; otherwise `true` if
    /// we have never compacted or `compaction_interval_seconds` has elapsed.
    /// Manual `compact()`/`compact_now()` calls bypass this check entirely.
    pub fn should_compact(&self) -> bool {
        if !self.config.auto_compact {
            return false;
        }

        let last = self.last_compaction.read();
        match *last {
            None => true, // Never compacted
            Some(last_time) => {
                let elapsed = (Utc::now() - last_time).num_seconds();
                elapsed >= self.config.compaction_interval_seconds as i64
            }
        }
    }

    /// Perform one compaction pass: select candidate files, merge their
    /// events into fewer, larger Parquet files, then delete the originals.
    ///
    /// Returns an all-zero `CompactionResult` when there is nothing to do.
    /// `stats` and `last_compaction` are only updated by a pass that actually
    /// rewrote data (the early returns skip both).
    pub fn compact(&self) -> Result<CompactionResult> {
        let start_time = std::time::Instant::now();
        tracing::info!("🔄 Starting Parquet compaction...");

        // List all Parquet files
        let files = self.list_parquet_files()?;

        if files.is_empty() {
            tracing::debug!("No Parquet files to compact");
            return Ok(CompactionResult {
                files_compacted: 0,
                bytes_before: 0,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: 0,
            });
        }

        // Select files for compaction
        let files_to_compact = self.select_files_for_compaction(&files);

        if files_to_compact.is_empty() {
            tracing::debug!(
                "No files meet compaction criteria (strategy: {:?})",
                self.config.strategy
            );
            return Ok(CompactionResult {
                files_compacted: 0,
                bytes_before: 0,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: 0,
            });
        }

        let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();

        tracing::info!(
            "Compacting {} files ({:.2} MB)",
            files_to_compact.len(),
            bytes_before as f64 / (1024.0 * 1024.0)
        );

        // Read events from all files to be compacted.
        // NOTE(review): `read_parquet_file` currently ignores its path and
        // loads EVERY event in the store, so `all_events` can contain one
        // full copy of the store per input file — see that method's note.
        let mut all_events = Vec::new();
        for file_info in &files_to_compact {
            match self.read_parquet_file(&file_info.path) {
                Ok(mut events) => {
                    all_events.append(&mut events);
                }
                Err(e) => {
                    tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
                    // Continue with other files
                }
            }
        }

        if all_events.is_empty() {
            tracing::warn!("No events read from files to compact");
            return Ok(CompactionResult {
                files_compacted: 0,
                bytes_before,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: start_time.elapsed().as_millis() as u64,
            });
        }

        // Sort events by timestamp for better compression and query performance
        all_events.sort_by_key(|e| e.timestamp);

        tracing::debug!("Read {} events for compaction", all_events.len());

        // Write compacted file(s)
        let compacted_files = self.write_compacted_files(&all_events)?;

        // A file whose metadata cannot be read counts as 0 bytes here;
        // see also the NOTE(review) on `write_batch` about returned paths.
        let bytes_after: u64 = compacted_files
            .iter()
            .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
            .sum();

        // Delete the original input files. This is NOT atomic: the compacted
        // output is already on disk, so a crash or a failed remove here can
        // leave both copies behind (duplicate events on the next load).
        // Individual remove failures are logged and skipped.
        for file_info in &files_to_compact {
            if let Err(e) = fs::remove_file(&file_info.path) {
                tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
            } else {
                tracing::debug!("Removed old file: {:?}", file_info.path);
            }
        }

        let duration_ms = start_time.elapsed().as_millis() as u64;

        // Update statistics
        let mut stats = self.stats.write();
        stats.total_compactions += 1;
        stats.total_files_compacted += files_to_compact.len() as u64;
        stats.total_bytes_before += bytes_before;
        stats.total_bytes_after += bytes_after;
        stats.total_events_compacted += all_events.len() as u64;
        stats.last_compaction_duration_ms = duration_ms;
        // saturating_sub: compaction can grow the data (format overhead)
        stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
        drop(stats);

        // Update last compaction time
        *self.last_compaction.write() = Some(Utc::now());

        let compression_ratio = if bytes_before > 0 {
            (bytes_after as f64 / bytes_before as f64) * 100.0
        } else {
            100.0
        };

        tracing::info!(
            "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
            files_to_compact.len(),
            compacted_files.len(),
            bytes_before as f64 / (1024.0 * 1024.0),
            bytes_after as f64 / (1024.0 * 1024.0),
            compression_ratio,
            all_events.len(),
            duration_ms
        );

        Ok(CompactionResult {
            files_compacted: files_to_compact.len(),
            bytes_before,
            bytes_after,
            events_compacted: all_events.len(),
            duration_ms,
        })
    }

    /// Read events from a Parquet file.
    ///
    /// NOTE(review): BUG — the `path` argument is ignored; `load_all_events`
    /// reads every event in the storage directory. Because `compact()` calls
    /// this once per input file, the merged event list is duplicated once per
    /// file. Fixing this requires a per-file read API on `ParquetStorage`.
    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
        // Use ParquetStorage to read the file
        let storage = ParquetStorage::new(&self.storage_dir)?;

        // For now, we'll read all events and filter by file
        // In a production system, you'd want to read specific files
        let all_events = storage.load_all_events()?;

        Ok(all_events)
    }

    /// Split `events` into batches of roughly `target_file_size` bytes and
    /// write each batch as its own Parquet file via `write_batch`.
    ///
    /// Sizes are estimated from each event's JSON serialization — a rough
    /// proxy for the Parquet footprint; unserializable events count as 1 KB.
    fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
        let mut compacted_files = Vec::new();
        let mut current_batch = Vec::new();
        let mut current_size = 0;

        for event in events {
            // Estimate event size (rough approximation)
            let event_size = serde_json::to_string(event)
                .map(|s| s.len())
                .unwrap_or(1024);

            // Flush the batch *before* adding this event if it would push us
            // past the target size (never flush an empty batch).
            if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
            {
                // Write current batch
                let file_path = self.write_batch(&current_batch)?;
                compacted_files.push(file_path);

                // Start new batch
                current_batch.clear();
                current_size = 0;
            }

            current_batch.push(event.clone());
            current_size += event_size;

            // Hard cap: flush immediately once max_file_size is reached,
            // even mid-stream (covers single events larger than the target).
            if current_size >= self.config.max_file_size {
                let file_path = self.write_batch(&current_batch)?;
                compacted_files.push(file_path);

                current_batch.clear();
                current_size = 0;
            }
        }

        // Write remaining events
        if !current_batch.is_empty() {
            let file_path = self.write_batch(&current_batch)?;
            compacted_files.push(file_path);
        }

        Ok(compacted_files)
    }

    /// Write a batch of events to a new Parquet file and return its path.
    ///
    /// NOTE(review): `file_path` is generated here but never handed to
    /// `ParquetStorage` — `append_event`/`flush` pick their own output
    /// location. If the storage writes elsewhere, the returned path does not
    /// exist and `compact()` measures its size as 0 bytes. Verify against
    /// `ParquetStorage`'s file-naming scheme.
    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
        let mut storage = ParquetStorage::new(&self.storage_dir)?;

        // Generate filename with timestamp
        let filename = format!(
            "events-compacted-{}.parquet",
            Utc::now().format("%Y%m%d-%H%M%S-%f")
        );
        let file_path = self.storage_dir.join(filename);

        // Write events
        for event in events {
            storage.append_event(event.clone())?;
        }

        // Flush to disk
        storage.flush()?;

        tracing::debug!(
            "Wrote compacted file: {:?} ({} events)",
            file_path,
            events.len()
        );

        Ok(file_path)
    }

    /// Snapshot of the cumulative compaction statistics.
    pub fn stats(&self) -> CompactionStats {
        (*self.stats.read()).clone()
    }

    /// The configuration this manager was built with.
    pub fn config(&self) -> &CompactionConfig {
        &self.config
    }

    /// Trigger a compaction pass immediately, ignoring `should_compact()`
    /// and the `auto_compact` setting.
    pub fn compact_now(&self) -> Result<CompactionResult> {
        tracing::info!("Manual compaction triggered");
        self.compact()
    }
}
454
/// Result of a single compaction pass.
///
/// All-zero when nothing qualified for compaction. `bytes_after` can exceed
/// `bytes_before` (format overhead), so consumers should subtract with care.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionResult {
    pub files_compacted: usize,
    pub bytes_before: u64,
    pub bytes_after: u64,
    pub events_compacted: usize,
    pub duration_ms: u64,
}
464
/// Background compaction task: wakes on a fixed interval and runs the
/// manager's compaction when it is due.
pub struct CompactionTask {
    manager: Arc<CompactionManager>,
    /// Tick interval of the loop — independent of
    /// `CompactionConfig::compaction_interval_seconds`, which additionally
    /// gates each tick via `should_compact()`
    interval: Duration,
}
470
471impl CompactionTask {
472    /// Create a new background compaction task
473    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
474        Self {
475            manager,
476            interval: Duration::from_secs(interval_seconds),
477        }
478    }
479
480    /// Run the compaction task in a loop
481    pub async fn run(self) {
482        let mut interval = tokio::time::interval(self.interval);
483
484        loop {
485            interval.tick().await;
486
487            if self.manager.should_compact() {
488                tracing::debug!("Auto-compaction check triggered");
489
490                match self.manager.compact() {
491                    Ok(result) => {
492                        if result.files_compacted > 0 {
493                            tracing::info!(
494                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
495                                result.files_compacted,
496                                (result.bytes_before - result.bytes_after) as f64
497                                    / (1024.0 * 1024.0)
498                            );
499                        }
500                    }
501                    Err(e) => {
502                        tracing::error!("Auto-compaction failed: {}", e);
503                    }
504                }
505            }
506        }
507    }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a manager in a fresh temp dir; the `TempDir` guard is returned
    /// so the directory outlives the test body.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        // A fresh manager has never compacted anything.
        assert_eq!(manager.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        // A manager that has never compacted is immediately due.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024, // 1 MB
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let file = |name: &str, size: u64| FileInfo {
            path: PathBuf::from(name),
            size,
            created: Utc::now(),
        };
        let files = vec![
            file("small1.parquet", 500_000),    // 500 KB — small
            file("small2.parquet", 600_000),    // 600 KB — small
            file("large.parquet", 10_000_000),  // 10 MB — above threshold
        ];

        // Both sub-threshold files are selected; the large one is skipped.
        assert_eq!(manager.select_files_for_compaction(&files).len(), 2);
    }
}