//! allsource_core/compaction.rs
//!
//! Parquet file compaction: merges many small event files into fewer,
//! larger ones for better storage efficiency and query performance.
1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use crate::storage::ParquetStorage;
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
/// Manages Parquet file compaction for optimal storage and query performance.
///
/// All mutable state lives behind `Arc<RwLock<…>>`, so a manager can be
/// wrapped in an `Arc` and shared across tasks (see `CompactionTask`).
pub struct CompactionManager {
    /// Directory where Parquet files are stored.
    storage_dir: PathBuf,

    /// Compaction tuning knobs (thresholds, strategy, interval).
    config: CompactionConfig,

    /// Cumulative statistics across all compaction runs.
    stats: Arc<RwLock<CompactionStats>>,

    /// Timestamp of the most recent compaction run; `None` until the
    /// first run completes.
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
26
/// Tuning knobs for Parquet compaction.
///
/// See [`CompactionConfig::default`] for the default values.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of candidate files required before a compaction
    /// pass will actually run.
    pub min_files_to_compact: usize,

    /// Target size for compacted files (in bytes).
    pub target_file_size: usize,

    /// Hard upper bound for a single compacted file (in bytes).
    pub max_file_size: usize,

    /// Files smaller than this (in bytes) count as "small" and become
    /// candidates under the size-based strategy.
    pub small_file_threshold: usize,

    /// Time interval between automatic compactions (in seconds).
    pub compaction_interval_seconds: u64,

    /// Enable automatic background compaction.
    pub auto_compact: bool,

    /// Strategy used to pick which files to compact.
    pub strategy: CompactionStrategy,
}
50
51impl Default for CompactionConfig {
52    fn default() -> Self {
53        Self {
54            min_files_to_compact: 3,
55            target_file_size: 128 * 1024 * 1024,      // 128 MB
56            max_file_size: 256 * 1024 * 1024,         // 256 MB
57            small_file_threshold: 10 * 1024 * 1024,   // 10 MB
58            compaction_interval_seconds: 3600,         // 1 hour
59            auto_compact: true,
60            strategy: CompactionStrategy::SizeBased,
61        }
62    }
63}
64
/// Strategy for choosing which Parquet files a compaction pass rewrites.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Compact files smaller than `small_file_threshold` (default).
    SizeBased,
    /// Compact files older than a fixed age (currently 24 hours).
    TimeBased,
    /// Compact all files into as few files as the size limits allow.
    FullCompaction,
}
75
/// Cumulative statistics across all compaction runs. All counters are
/// monotonically increasing except `last_compaction_duration_ms`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    /// Number of compaction passes that produced output.
    pub total_compactions: u64,
    /// Total input files merged across all passes.
    pub total_files_compacted: u64,
    /// Total bytes of input files before compaction.
    pub total_bytes_before: u64,
    /// Total bytes of output files after compaction.
    pub total_bytes_after: u64,
    /// Total events rewritten across all passes.
    pub total_events_compacted: u64,
    /// Duration of the most recent pass, in milliseconds.
    pub last_compaction_duration_ms: u64,
    /// Cumulative bytes saved (per-pass `before - after`, saturating at 0).
    pub space_saved_bytes: u64,
}
86
/// Information about a Parquet file candidate for compaction.
#[derive(Debug, Clone)]
struct FileInfo {
    /// Path to the file within the storage directory.
    path: PathBuf,
    /// File size in bytes, from filesystem metadata.
    size: u64,
    /// File creation time; falls back to "now" on filesystems that do
    /// not report creation timestamps.
    created: DateTime<Utc>,
}
94
95impl CompactionManager {
96    /// Create a new compaction manager
97    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
98        let storage_dir = storage_dir.into();
99
100        tracing::info!(
101            "✅ Compaction manager initialized at: {}",
102            storage_dir.display()
103        );
104
105        Self {
106            storage_dir,
107            config,
108            stats: Arc::new(RwLock::new(CompactionStats::default())),
109            last_compaction: Arc::new(RwLock::new(None)),
110        }
111    }
112
113    /// List all Parquet files in the storage directory
114    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
115        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
116            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
117        })?;
118
119        let mut files = Vec::new();
120
121        for entry in entries {
122            let entry = entry.map_err(|e| {
123                AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
124            })?;
125
126            let path = entry.path();
127            if let Some(ext) = path.extension() {
128                if ext == "parquet" {
129                    let metadata = entry.metadata().map_err(|e| {
130                        AllSourceError::StorageError(format!("Failed to read file metadata: {}", e))
131                    })?;
132
133                    let size = metadata.len();
134                    let created = metadata
135                        .created()
136                        .ok()
137                        .and_then(|t| {
138                            t.duration_since(std::time::UNIX_EPOCH)
139                                .ok()
140                                .map(|d| {
141                                    DateTime::from_timestamp(d.as_secs() as i64, 0)
142                                        .unwrap_or_else(Utc::now)
143                                })
144                        })
145                        .unwrap_or_else(Utc::now);
146
147                    files.push(FileInfo {
148                        path,
149                        size,
150                        created,
151                    });
152                }
153            }
154        }
155
156        // Sort by creation time (oldest first)
157        files.sort_by_key(|f| f.created);
158
159        Ok(files)
160    }
161
162    /// Identify files that should be compacted based on strategy
163    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
164        match self.config.strategy {
165            CompactionStrategy::SizeBased => self.select_small_files(files),
166            CompactionStrategy::TimeBased => self.select_old_files(files),
167            CompactionStrategy::FullCompaction => files.to_vec(),
168        }
169    }
170
171    /// Select small files for compaction
172    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
173        let small_files: Vec<FileInfo> = files
174            .iter()
175            .filter(|f| f.size < self.config.small_file_threshold as u64)
176            .cloned()
177            .collect();
178
179        // Only compact if we have enough small files
180        if small_files.len() >= self.config.min_files_to_compact {
181            small_files
182        } else {
183            Vec::new()
184        }
185    }
186
187    /// Select old files for time-based compaction
188    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
189        let now = Utc::now();
190        let age_threshold = chrono::Duration::hours(24); // Files older than 24 hours
191
192        let old_files: Vec<FileInfo> = files
193            .iter()
194            .filter(|f| now - f.created > age_threshold)
195            .cloned()
196            .collect();
197
198        if old_files.len() >= self.config.min_files_to_compact {
199            old_files
200        } else {
201            Vec::new()
202        }
203    }
204
205    /// Check if compaction should run
206    pub fn should_compact(&self) -> bool {
207        if !self.config.auto_compact {
208            return false;
209        }
210
211        let last = self.last_compaction.read();
212        match *last {
213            None => true, // Never compacted
214            Some(last_time) => {
215                let elapsed = (Utc::now() - last_time).num_seconds();
216                elapsed >= self.config.compaction_interval_seconds as i64
217            }
218        }
219    }
220
221    /// Perform compaction of Parquet files
222    pub fn compact(&self) -> Result<CompactionResult> {
223        let start_time = std::time::Instant::now();
224        tracing::info!("🔄 Starting Parquet compaction...");
225
226        // List all Parquet files
227        let files = self.list_parquet_files()?;
228
229        if files.is_empty() {
230            tracing::debug!("No Parquet files to compact");
231            return Ok(CompactionResult {
232                files_compacted: 0,
233                bytes_before: 0,
234                bytes_after: 0,
235                events_compacted: 0,
236                duration_ms: 0,
237            });
238        }
239
240        // Select files for compaction
241        let files_to_compact = self.select_files_for_compaction(&files);
242
243        if files_to_compact.is_empty() {
244            tracing::debug!(
245                "No files meet compaction criteria (strategy: {:?})",
246                self.config.strategy
247            );
248            return Ok(CompactionResult {
249                files_compacted: 0,
250                bytes_before: 0,
251                bytes_after: 0,
252                events_compacted: 0,
253                duration_ms: 0,
254            });
255        }
256
257        let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
258
259        tracing::info!(
260            "Compacting {} files ({:.2} MB)",
261            files_to_compact.len(),
262            bytes_before as f64 / (1024.0 * 1024.0)
263        );
264
265        // Read events from all files to be compacted
266        let mut all_events = Vec::new();
267        for file_info in &files_to_compact {
268            match self.read_parquet_file(&file_info.path) {
269                Ok(mut events) => {
270                    all_events.append(&mut events);
271                }
272                Err(e) => {
273                    tracing::error!(
274                        "Failed to read Parquet file {:?}: {}",
275                        file_info.path,
276                        e
277                    );
278                    // Continue with other files
279                }
280            }
281        }
282
283        if all_events.is_empty() {
284            tracing::warn!("No events read from files to compact");
285            return Ok(CompactionResult {
286                files_compacted: 0,
287                bytes_before,
288                bytes_after: 0,
289                events_compacted: 0,
290                duration_ms: start_time.elapsed().as_millis() as u64,
291            });
292        }
293
294        // Sort events by timestamp for better compression and query performance
295        all_events.sort_by_key(|e| e.timestamp);
296
297        tracing::debug!("Read {} events for compaction", all_events.len());
298
299        // Write compacted file(s)
300        let compacted_files = self.write_compacted_files(&all_events)?;
301
302        let bytes_after: u64 = compacted_files.iter().map(|p| {
303            fs::metadata(p)
304                .map(|m| m.len())
305                .unwrap_or(0)
306        }).sum();
307
308        // Delete original files atomically
309        for file_info in &files_to_compact {
310            if let Err(e) = fs::remove_file(&file_info.path) {
311                tracing::error!(
312                    "Failed to remove old file {:?}: {}",
313                    file_info.path,
314                    e
315                );
316            } else {
317                tracing::debug!("Removed old file: {:?}", file_info.path);
318            }
319        }
320
321        let duration_ms = start_time.elapsed().as_millis() as u64;
322
323        // Update statistics
324        let mut stats = self.stats.write();
325        stats.total_compactions += 1;
326        stats.total_files_compacted += files_to_compact.len() as u64;
327        stats.total_bytes_before += bytes_before;
328        stats.total_bytes_after += bytes_after;
329        stats.total_events_compacted += all_events.len() as u64;
330        stats.last_compaction_duration_ms = duration_ms;
331        stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
332        drop(stats);
333
334        // Update last compaction time
335        *self.last_compaction.write() = Some(Utc::now());
336
337        let compression_ratio = if bytes_before > 0 {
338            (bytes_after as f64 / bytes_before as f64) * 100.0
339        } else {
340            100.0
341        };
342
343        tracing::info!(
344            "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
345            files_to_compact.len(),
346            compacted_files.len(),
347            bytes_before as f64 / (1024.0 * 1024.0),
348            bytes_after as f64 / (1024.0 * 1024.0),
349            compression_ratio,
350            all_events.len(),
351            duration_ms
352        );
353
354        Ok(CompactionResult {
355            files_compacted: files_to_compact.len(),
356            bytes_before,
357            bytes_after,
358            events_compacted: all_events.len(),
359            duration_ms,
360        })
361    }
362
363    /// Read events from a Parquet file
364    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
365        // Use ParquetStorage to read the file
366        let storage = ParquetStorage::new(&self.storage_dir)?;
367
368        // For now, we'll read all events and filter by file
369        // In a production system, you'd want to read specific files
370        let all_events = storage.load_all_events()?;
371
372        Ok(all_events)
373    }
374
375    /// Write compacted events to new Parquet file(s)
376    fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
377        let mut compacted_files = Vec::new();
378        let mut current_batch = Vec::new();
379        let mut current_size = 0;
380
381        for event in events {
382            // Estimate event size (rough approximation)
383            let event_size = serde_json::to_string(event)
384                .map(|s| s.len())
385                .unwrap_or(1024);
386
387            // Check if adding this event would exceed target size
388            if current_size + event_size > self.config.target_file_size && !current_batch.is_empty() {
389                // Write current batch
390                let file_path = self.write_batch(&current_batch)?;
391                compacted_files.push(file_path);
392
393                // Start new batch
394                current_batch.clear();
395                current_size = 0;
396            }
397
398            current_batch.push(event.clone());
399            current_size += event_size;
400
401            // Also check max file size
402            if current_size >= self.config.max_file_size {
403                let file_path = self.write_batch(&current_batch)?;
404                compacted_files.push(file_path);
405
406                current_batch.clear();
407                current_size = 0;
408            }
409        }
410
411        // Write remaining events
412        if !current_batch.is_empty() {
413            let file_path = self.write_batch(&current_batch)?;
414            compacted_files.push(file_path);
415        }
416
417        Ok(compacted_files)
418    }
419
420    /// Write a batch of events to a new Parquet file
421    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
422        let mut storage = ParquetStorage::new(&self.storage_dir)?;
423
424        // Generate filename with timestamp
425        let filename = format!(
426            "events-compacted-{}.parquet",
427            Utc::now().format("%Y%m%d-%H%M%S-%f")
428        );
429        let file_path = self.storage_dir.join(filename);
430
431        // Write events
432        for event in events {
433            storage.append_event(event.clone())?;
434        }
435
436        // Flush to disk
437        storage.flush()?;
438
439        tracing::debug!(
440            "Wrote compacted file: {:?} ({} events)",
441            file_path,
442            events.len()
443        );
444
445        Ok(file_path)
446    }
447
448    /// Get compaction statistics
449    pub fn stats(&self) -> CompactionStats {
450        (*self.stats.read()).clone()
451    }
452
453    /// Get configuration
454    pub fn config(&self) -> &CompactionConfig {
455        &self.config
456    }
457
458    /// Trigger manual compaction
459    pub fn compact_now(&self) -> Result<CompactionResult> {
460        tracing::info!("Manual compaction triggered");
461        self.compact()
462    }
463}
464
/// Result of a single compaction operation.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionResult {
    /// Number of input files that were merged (0 if nothing qualified).
    pub files_compacted: usize,
    /// Combined size of the input files, in bytes.
    pub bytes_before: u64,
    /// Combined size of the output files, in bytes.
    pub bytes_after: u64,
    /// Number of events rewritten into the compacted output.
    pub events_compacted: usize,
    /// Wall-clock duration of the pass, in milliseconds.
    pub duration_ms: u64,
}
474
/// Background compaction task: periodically asks a shared
/// `CompactionManager` whether compaction is due and runs it.
pub struct CompactionTask {
    /// Shared manager that performs the actual compaction work.
    manager: Arc<CompactionManager>,
    /// How often the loop polls `should_compact`.
    interval: Duration,
}
480
481impl CompactionTask {
482    /// Create a new background compaction task
483    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
484        Self {
485            manager,
486            interval: Duration::from_secs(interval_seconds),
487        }
488    }
489
490    /// Run the compaction task in a loop
491    pub async fn run(self) {
492        let mut interval = tokio::time::interval(self.interval);
493
494        loop {
495            interval.tick().await;
496
497            if self.manager.should_compact() {
498                tracing::debug!("Auto-compaction check triggered");
499
500                match self.manager.compact() {
501                    Ok(result) => {
502                        if result.files_compacted > 0 {
503                            tracing::info!(
504                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
505                                result.files_compacted,
506                                (result.bytes_before - result.bytes_after) as f64 / (1024.0 * 1024.0)
507                            );
508                        }
509                    }
510                    Err(e) => {
511                        tracing::error!("Auto-compaction failed: {}", e);
512                    }
513                }
514            }
515        }
516    }
517}
518
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // A fresh manager starts with zeroed statistics.
    #[test]
    fn test_compaction_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig::default();
        let manager = CompactionManager::new(temp_dir.path(), config);

        assert_eq!(manager.stats().total_compactions, 0);
    }

    // With auto-compaction enabled and no prior run recorded,
    // `should_compact` must return true on the very first check.
    #[test]
    fn test_should_compact() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        // Should compact on first check (never compacted)
        assert!(manager.should_compact());
    }

    // Size-based selection picks only files under `small_file_threshold`,
    // and only because at least `min_files_to_compact` of them qualify.
    #[test]
    fn test_file_selection_size_based() {
        let temp_dir = TempDir::new().unwrap();
        let config = CompactionConfig {
            small_file_threshold: 1024 * 1024, // 1 MB
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        // Two files below the threshold, one well above it.
        let files = vec![
            FileInfo {
                path: PathBuf::from("small1.parquet"),
                size: 500_000, // 500 KB
                created: Utc::now(),
            },
            FileInfo {
                path: PathBuf::from("small2.parquet"),
                size: 600_000, // 600 KB
                created: Utc::now(),
            },
            FileInfo {
                path: PathBuf::from("large.parquet"),
                size: 10_000_000, // 10 MB
                created: Utc::now(),
            },
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2); // Only the 2 small files
    }
}
579}