1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use crate::storage::ParquetStorage;
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12pub struct CompactionManager {
14 storage_dir: PathBuf,
16
17 config: CompactionConfig,
19
20 stats: Arc<RwLock<CompactionStats>>,
22
23 last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
25}
26
27#[derive(Debug, Clone)]
28pub struct CompactionConfig {
29 pub min_files_to_compact: usize,
31
32 pub target_file_size: usize,
34
35 pub max_file_size: usize,
37
38 pub small_file_threshold: usize,
40
41 pub compaction_interval_seconds: u64,
43
44 pub auto_compact: bool,
46
47 pub strategy: CompactionStrategy,
49}
50
51impl Default for CompactionConfig {
52 fn default() -> Self {
53 Self {
54 min_files_to_compact: 3,
55 target_file_size: 128 * 1024 * 1024, max_file_size: 256 * 1024 * 1024, small_file_threshold: 10 * 1024 * 1024, compaction_interval_seconds: 3600, auto_compact: true,
60 strategy: CompactionStrategy::SizeBased,
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
66#[serde(rename_all = "lowercase")]
67pub enum CompactionStrategy {
68 SizeBased,
70 TimeBased,
72 FullCompaction,
74}
75
76#[derive(Debug, Clone, Default, Serialize)]
77pub struct CompactionStats {
78 pub total_compactions: u64,
79 pub total_files_compacted: u64,
80 pub total_bytes_before: u64,
81 pub total_bytes_after: u64,
82 pub total_events_compacted: u64,
83 pub last_compaction_duration_ms: u64,
84 pub space_saved_bytes: u64,
85}
86
87#[derive(Debug, Clone)]
89struct FileInfo {
90 path: PathBuf,
91 size: u64,
92 created: DateTime<Utc>,
93}
94
95impl CompactionManager {
96 pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
98 let storage_dir = storage_dir.into();
99
100 tracing::info!(
101 "✅ Compaction manager initialized at: {}",
102 storage_dir.display()
103 );
104
105 Self {
106 storage_dir,
107 config,
108 stats: Arc::new(RwLock::new(CompactionStats::default())),
109 last_compaction: Arc::new(RwLock::new(None)),
110 }
111 }
112
113 fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
115 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
116 AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
117 })?;
118
119 let mut files = Vec::new();
120
121 for entry in entries {
122 let entry = entry.map_err(|e| {
123 AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
124 })?;
125
126 let path = entry.path();
127 if let Some(ext) = path.extension() {
128 if ext == "parquet" {
129 let metadata = entry.metadata().map_err(|e| {
130 AllSourceError::StorageError(format!("Failed to read file metadata: {}", e))
131 })?;
132
133 let size = metadata.len();
134 let created = metadata
135 .created()
136 .ok()
137 .and_then(|t| {
138 t.duration_since(std::time::UNIX_EPOCH)
139 .ok()
140 .map(|d| {
141 DateTime::from_timestamp(d.as_secs() as i64, 0)
142 .unwrap_or_else(Utc::now)
143 })
144 })
145 .unwrap_or_else(Utc::now);
146
147 files.push(FileInfo {
148 path,
149 size,
150 created,
151 });
152 }
153 }
154 }
155
156 files.sort_by_key(|f| f.created);
158
159 Ok(files)
160 }
161
162 fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
164 match self.config.strategy {
165 CompactionStrategy::SizeBased => self.select_small_files(files),
166 CompactionStrategy::TimeBased => self.select_old_files(files),
167 CompactionStrategy::FullCompaction => files.to_vec(),
168 }
169 }
170
171 fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
173 let small_files: Vec<FileInfo> = files
174 .iter()
175 .filter(|f| f.size < self.config.small_file_threshold as u64)
176 .cloned()
177 .collect();
178
179 if small_files.len() >= self.config.min_files_to_compact {
181 small_files
182 } else {
183 Vec::new()
184 }
185 }
186
187 fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
189 let now = Utc::now();
190 let age_threshold = chrono::Duration::hours(24); let old_files: Vec<FileInfo> = files
193 .iter()
194 .filter(|f| now - f.created > age_threshold)
195 .cloned()
196 .collect();
197
198 if old_files.len() >= self.config.min_files_to_compact {
199 old_files
200 } else {
201 Vec::new()
202 }
203 }
204
205 pub fn should_compact(&self) -> bool {
207 if !self.config.auto_compact {
208 return false;
209 }
210
211 let last = self.last_compaction.read();
212 match *last {
213 None => true, Some(last_time) => {
215 let elapsed = (Utc::now() - last_time).num_seconds();
216 elapsed >= self.config.compaction_interval_seconds as i64
217 }
218 }
219 }
220
221 pub fn compact(&self) -> Result<CompactionResult> {
223 let start_time = std::time::Instant::now();
224 tracing::info!("🔄 Starting Parquet compaction...");
225
226 let files = self.list_parquet_files()?;
228
229 if files.is_empty() {
230 tracing::debug!("No Parquet files to compact");
231 return Ok(CompactionResult {
232 files_compacted: 0,
233 bytes_before: 0,
234 bytes_after: 0,
235 events_compacted: 0,
236 duration_ms: 0,
237 });
238 }
239
240 let files_to_compact = self.select_files_for_compaction(&files);
242
243 if files_to_compact.is_empty() {
244 tracing::debug!(
245 "No files meet compaction criteria (strategy: {:?})",
246 self.config.strategy
247 );
248 return Ok(CompactionResult {
249 files_compacted: 0,
250 bytes_before: 0,
251 bytes_after: 0,
252 events_compacted: 0,
253 duration_ms: 0,
254 });
255 }
256
257 let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
258
259 tracing::info!(
260 "Compacting {} files ({:.2} MB)",
261 files_to_compact.len(),
262 bytes_before as f64 / (1024.0 * 1024.0)
263 );
264
265 let mut all_events = Vec::new();
267 for file_info in &files_to_compact {
268 match self.read_parquet_file(&file_info.path) {
269 Ok(mut events) => {
270 all_events.append(&mut events);
271 }
272 Err(e) => {
273 tracing::error!(
274 "Failed to read Parquet file {:?}: {}",
275 file_info.path,
276 e
277 );
278 }
280 }
281 }
282
283 if all_events.is_empty() {
284 tracing::warn!("No events read from files to compact");
285 return Ok(CompactionResult {
286 files_compacted: 0,
287 bytes_before,
288 bytes_after: 0,
289 events_compacted: 0,
290 duration_ms: start_time.elapsed().as_millis() as u64,
291 });
292 }
293
294 all_events.sort_by_key(|e| e.timestamp);
296
297 tracing::debug!("Read {} events for compaction", all_events.len());
298
299 let compacted_files = self.write_compacted_files(&all_events)?;
301
302 let bytes_after: u64 = compacted_files.iter().map(|p| {
303 fs::metadata(p)
304 .map(|m| m.len())
305 .unwrap_or(0)
306 }).sum();
307
308 for file_info in &files_to_compact {
310 if let Err(e) = fs::remove_file(&file_info.path) {
311 tracing::error!(
312 "Failed to remove old file {:?}: {}",
313 file_info.path,
314 e
315 );
316 } else {
317 tracing::debug!("Removed old file: {:?}", file_info.path);
318 }
319 }
320
321 let duration_ms = start_time.elapsed().as_millis() as u64;
322
323 let mut stats = self.stats.write();
325 stats.total_compactions += 1;
326 stats.total_files_compacted += files_to_compact.len() as u64;
327 stats.total_bytes_before += bytes_before;
328 stats.total_bytes_after += bytes_after;
329 stats.total_events_compacted += all_events.len() as u64;
330 stats.last_compaction_duration_ms = duration_ms;
331 stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
332 drop(stats);
333
334 *self.last_compaction.write() = Some(Utc::now());
336
337 let compression_ratio = if bytes_before > 0 {
338 (bytes_after as f64 / bytes_before as f64) * 100.0
339 } else {
340 100.0
341 };
342
343 tracing::info!(
344 "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
345 files_to_compact.len(),
346 compacted_files.len(),
347 bytes_before as f64 / (1024.0 * 1024.0),
348 bytes_after as f64 / (1024.0 * 1024.0),
349 compression_ratio,
350 all_events.len(),
351 duration_ms
352 );
353
354 Ok(CompactionResult {
355 files_compacted: files_to_compact.len(),
356 bytes_before,
357 bytes_after,
358 events_compacted: all_events.len(),
359 duration_ms,
360 })
361 }
362
363 fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
365 let storage = ParquetStorage::new(&self.storage_dir)?;
367
368 let all_events = storage.load_all_events()?;
371
372 Ok(all_events)
373 }
374
375 fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
377 let mut compacted_files = Vec::new();
378 let mut current_batch = Vec::new();
379 let mut current_size = 0;
380
381 for event in events {
382 let event_size = serde_json::to_string(event)
384 .map(|s| s.len())
385 .unwrap_or(1024);
386
387 if current_size + event_size > self.config.target_file_size && !current_batch.is_empty() {
389 let file_path = self.write_batch(¤t_batch)?;
391 compacted_files.push(file_path);
392
393 current_batch.clear();
395 current_size = 0;
396 }
397
398 current_batch.push(event.clone());
399 current_size += event_size;
400
401 if current_size >= self.config.max_file_size {
403 let file_path = self.write_batch(¤t_batch)?;
404 compacted_files.push(file_path);
405
406 current_batch.clear();
407 current_size = 0;
408 }
409 }
410
411 if !current_batch.is_empty() {
413 let file_path = self.write_batch(¤t_batch)?;
414 compacted_files.push(file_path);
415 }
416
417 Ok(compacted_files)
418 }
419
420 fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
422 let mut storage = ParquetStorage::new(&self.storage_dir)?;
423
424 let filename = format!(
426 "events-compacted-{}.parquet",
427 Utc::now().format("%Y%m%d-%H%M%S-%f")
428 );
429 let file_path = self.storage_dir.join(filename);
430
431 for event in events {
433 storage.append_event(event.clone())?;
434 }
435
436 storage.flush()?;
438
439 tracing::debug!(
440 "Wrote compacted file: {:?} ({} events)",
441 file_path,
442 events.len()
443 );
444
445 Ok(file_path)
446 }
447
448 pub fn stats(&self) -> CompactionStats {
450 (*self.stats.read()).clone()
451 }
452
453 pub fn config(&self) -> &CompactionConfig {
455 &self.config
456 }
457
458 pub fn compact_now(&self) -> Result<CompactionResult> {
460 tracing::info!("Manual compaction triggered");
461 self.compact()
462 }
463}
464
465#[derive(Debug, Clone, Serialize)]
467pub struct CompactionResult {
468 pub files_compacted: usize,
469 pub bytes_before: u64,
470 pub bytes_after: u64,
471 pub events_compacted: usize,
472 pub duration_ms: u64,
473}
474
475pub struct CompactionTask {
477 manager: Arc<CompactionManager>,
478 interval: Duration,
479}
480
481impl CompactionTask {
482 pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
484 Self {
485 manager,
486 interval: Duration::from_secs(interval_seconds),
487 }
488 }
489
490 pub async fn run(self) {
492 let mut interval = tokio::time::interval(self.interval);
493
494 loop {
495 interval.tick().await;
496
497 if self.manager.should_compact() {
498 tracing::debug!("Auto-compaction check triggered");
499
500 match self.manager.compact() {
501 Ok(result) => {
502 if result.files_compacted > 0 {
503 tracing::info!(
504 "Auto-compaction succeeded: {} files, {:.2} MB saved",
505 result.files_compacted,
506 (result.bytes_before - result.bytes_after) as f64 / (1024.0 * 1024.0)
507 );
508 }
509 }
510 Err(e) => {
511 tracing::error!("Auto-compaction failed: {}", e);
512 }
513 }
514 }
515 }
516 }
517}
518
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory. The directory
    /// guard is returned so it outlives the manager.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(dir.path(), config);
        (dir, manager)
    }

    /// Describes a file of the given size stamped with the current time.
    fn file_of_size(name: &str, size: u64) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created: Utc::now(),
        }
    }

    /// A new manager starts with zeroed statistics.
    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        assert_eq!(manager.stats().total_compactions, 0);
    }

    /// With auto-compaction enabled and no previous run, compaction is due.
    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        assert!(manager.should_compact());
    }

    /// The size-based strategy keeps only the files below the threshold.
    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file_of_size("small1.parquet", 500_000),
            file_of_size("small2.parquet", 600_000),
            file_of_size("large.parquet", 10_000_000),
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }
}