allsource_core/infrastructure/persistence/
compaction.rs1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::infrastructure::persistence::storage::ParquetStorage;
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12pub struct CompactionManager {
14 storage_dir: PathBuf,
16
17 config: CompactionConfig,
19
20 stats: Arc<RwLock<CompactionStats>>,
22
23 last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
25}
26
27#[derive(Debug, Clone)]
28pub struct CompactionConfig {
29 pub min_files_to_compact: usize,
31
32 pub target_file_size: usize,
34
35 pub max_file_size: usize,
37
38 pub small_file_threshold: usize,
40
41 pub compaction_interval_seconds: u64,
43
44 pub auto_compact: bool,
46
47 pub strategy: CompactionStrategy,
49}
50
51impl Default for CompactionConfig {
52 fn default() -> Self {
53 Self {
54 min_files_to_compact: 3,
55 target_file_size: 128 * 1024 * 1024, max_file_size: 256 * 1024 * 1024, small_file_threshold: 10 * 1024 * 1024, compaction_interval_seconds: 3600, auto_compact: true,
60 strategy: CompactionStrategy::SizeBased,
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
66#[serde(rename_all = "lowercase")]
67pub enum CompactionStrategy {
68 SizeBased,
70 TimeBased,
72 FullCompaction,
74}
75
76#[derive(Debug, Clone, Default, Serialize)]
77pub struct CompactionStats {
78 pub total_compactions: u64,
79 pub total_files_compacted: u64,
80 pub total_bytes_before: u64,
81 pub total_bytes_after: u64,
82 pub total_events_compacted: u64,
83 pub last_compaction_duration_ms: u64,
84 pub space_saved_bytes: u64,
85}
86
87#[derive(Debug, Clone)]
89struct FileInfo {
90 path: PathBuf,
91 size: u64,
92 created: DateTime<Utc>,
93}
94
95impl CompactionManager {
96 pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
98 let storage_dir = storage_dir.into();
99
100 tracing::info!(
101 "✅ Compaction manager initialized at: {}",
102 storage_dir.display()
103 );
104
105 Self {
106 storage_dir,
107 config,
108 stats: Arc::new(RwLock::new(CompactionStats::default())),
109 last_compaction: Arc::new(RwLock::new(None)),
110 }
111 }
112
113 fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
115 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
116 AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
117 })?;
118
119 let mut files = Vec::new();
120
121 for entry in entries {
122 let entry = entry.map_err(|e| {
123 AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
124 })?;
125
126 let path = entry.path();
127 if let Some(ext) = path.extension() {
128 if ext == "parquet" {
129 let metadata = entry.metadata().map_err(|e| {
130 AllSourceError::StorageError(format!("Failed to read file metadata: {}", e))
131 })?;
132
133 let size = metadata.len();
134 let created = metadata
135 .created()
136 .ok()
137 .and_then(|t| {
138 t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
139 DateTime::from_timestamp(d.as_secs() as i64, 0)
140 .unwrap_or_else(Utc::now)
141 })
142 })
143 .unwrap_or_else(Utc::now);
144
145 files.push(FileInfo {
146 path,
147 size,
148 created,
149 });
150 }
151 }
152 }
153
154 files.sort_by_key(|f| f.created);
156
157 Ok(files)
158 }
159
160 fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
162 match self.config.strategy {
163 CompactionStrategy::SizeBased => self.select_small_files(files),
164 CompactionStrategy::TimeBased => self.select_old_files(files),
165 CompactionStrategy::FullCompaction => files.to_vec(),
166 }
167 }
168
169 fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
171 let small_files: Vec<FileInfo> = files
172 .iter()
173 .filter(|f| f.size < self.config.small_file_threshold as u64)
174 .cloned()
175 .collect();
176
177 if small_files.len() >= self.config.min_files_to_compact {
179 small_files
180 } else {
181 Vec::new()
182 }
183 }
184
185 fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
187 let now = Utc::now();
188 let age_threshold = chrono::Duration::hours(24); let old_files: Vec<FileInfo> = files
191 .iter()
192 .filter(|f| now - f.created > age_threshold)
193 .cloned()
194 .collect();
195
196 if old_files.len() >= self.config.min_files_to_compact {
197 old_files
198 } else {
199 Vec::new()
200 }
201 }
202
203 pub fn should_compact(&self) -> bool {
205 if !self.config.auto_compact {
206 return false;
207 }
208
209 let last = self.last_compaction.read();
210 match *last {
211 None => true, Some(last_time) => {
213 let elapsed = (Utc::now() - last_time).num_seconds();
214 elapsed >= self.config.compaction_interval_seconds as i64
215 }
216 }
217 }
218
219 pub fn compact(&self) -> Result<CompactionResult> {
221 let start_time = std::time::Instant::now();
222 tracing::info!("🔄 Starting Parquet compaction...");
223
224 let files = self.list_parquet_files()?;
226
227 if files.is_empty() {
228 tracing::debug!("No Parquet files to compact");
229 return Ok(CompactionResult {
230 files_compacted: 0,
231 bytes_before: 0,
232 bytes_after: 0,
233 events_compacted: 0,
234 duration_ms: 0,
235 });
236 }
237
238 let files_to_compact = self.select_files_for_compaction(&files);
240
241 if files_to_compact.is_empty() {
242 tracing::debug!(
243 "No files meet compaction criteria (strategy: {:?})",
244 self.config.strategy
245 );
246 return Ok(CompactionResult {
247 files_compacted: 0,
248 bytes_before: 0,
249 bytes_after: 0,
250 events_compacted: 0,
251 duration_ms: 0,
252 });
253 }
254
255 let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
256
257 tracing::info!(
258 "Compacting {} files ({:.2} MB)",
259 files_to_compact.len(),
260 bytes_before as f64 / (1024.0 * 1024.0)
261 );
262
263 let mut all_events = Vec::new();
265 for file_info in &files_to_compact {
266 match self.read_parquet_file(&file_info.path) {
267 Ok(mut events) => {
268 all_events.append(&mut events);
269 }
270 Err(e) => {
271 tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
272 }
274 }
275 }
276
277 if all_events.is_empty() {
278 tracing::warn!("No events read from files to compact");
279 return Ok(CompactionResult {
280 files_compacted: 0,
281 bytes_before,
282 bytes_after: 0,
283 events_compacted: 0,
284 duration_ms: start_time.elapsed().as_millis() as u64,
285 });
286 }
287
288 all_events.sort_by_key(|e| e.timestamp);
290
291 tracing::debug!("Read {} events for compaction", all_events.len());
292
293 let compacted_files = self.write_compacted_files(&all_events)?;
295
296 let bytes_after: u64 = compacted_files
297 .iter()
298 .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
299 .sum();
300
301 for file_info in &files_to_compact {
303 if let Err(e) = fs::remove_file(&file_info.path) {
304 tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
305 } else {
306 tracing::debug!("Removed old file: {:?}", file_info.path);
307 }
308 }
309
310 let duration_ms = start_time.elapsed().as_millis() as u64;
311
312 let mut stats = self.stats.write();
314 stats.total_compactions += 1;
315 stats.total_files_compacted += files_to_compact.len() as u64;
316 stats.total_bytes_before += bytes_before;
317 stats.total_bytes_after += bytes_after;
318 stats.total_events_compacted += all_events.len() as u64;
319 stats.last_compaction_duration_ms = duration_ms;
320 stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
321 drop(stats);
322
323 *self.last_compaction.write() = Some(Utc::now());
325
326 let compression_ratio = if bytes_before > 0 {
327 (bytes_after as f64 / bytes_before as f64) * 100.0
328 } else {
329 100.0
330 };
331
332 tracing::info!(
333 "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
334 files_to_compact.len(),
335 compacted_files.len(),
336 bytes_before as f64 / (1024.0 * 1024.0),
337 bytes_after as f64 / (1024.0 * 1024.0),
338 compression_ratio,
339 all_events.len(),
340 duration_ms
341 );
342
343 Ok(CompactionResult {
344 files_compacted: files_to_compact.len(),
345 bytes_before,
346 bytes_after,
347 events_compacted: all_events.len(),
348 duration_ms,
349 })
350 }
351
352 fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
354 let storage = ParquetStorage::new(&self.storage_dir)?;
356
357 let all_events = storage.load_all_events()?;
360
361 Ok(all_events)
362 }
363
364 fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
366 let mut compacted_files = Vec::new();
367 let mut current_batch = Vec::new();
368 let mut current_size = 0;
369
370 for event in events {
371 let event_size = serde_json::to_string(event)
373 .map(|s| s.len())
374 .unwrap_or(1024);
375
376 if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
378 {
379 let file_path = self.write_batch(¤t_batch)?;
381 compacted_files.push(file_path);
382
383 current_batch.clear();
385 current_size = 0;
386 }
387
388 current_batch.push(event.clone());
389 current_size += event_size;
390
391 if current_size >= self.config.max_file_size {
393 let file_path = self.write_batch(¤t_batch)?;
394 compacted_files.push(file_path);
395
396 current_batch.clear();
397 current_size = 0;
398 }
399 }
400
401 if !current_batch.is_empty() {
403 let file_path = self.write_batch(¤t_batch)?;
404 compacted_files.push(file_path);
405 }
406
407 Ok(compacted_files)
408 }
409
410 fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
412 let mut storage = ParquetStorage::new(&self.storage_dir)?;
413
414 let filename = format!(
416 "events-compacted-{}.parquet",
417 Utc::now().format("%Y%m%d-%H%M%S-%f")
418 );
419 let file_path = self.storage_dir.join(filename);
420
421 for event in events {
423 storage.append_event(event.clone())?;
424 }
425
426 storage.flush()?;
428
429 tracing::debug!(
430 "Wrote compacted file: {:?} ({} events)",
431 file_path,
432 events.len()
433 );
434
435 Ok(file_path)
436 }
437
438 pub fn stats(&self) -> CompactionStats {
440 (*self.stats.read()).clone()
441 }
442
443 pub fn config(&self) -> &CompactionConfig {
445 &self.config
446 }
447
448 pub fn compact_now(&self) -> Result<CompactionResult> {
450 tracing::info!("Manual compaction triggered");
451 self.compact()
452 }
453}
454
455#[derive(Debug, Clone, Serialize)]
457pub struct CompactionResult {
458 pub files_compacted: usize,
459 pub bytes_before: u64,
460 pub bytes_after: u64,
461 pub events_compacted: usize,
462 pub duration_ms: u64,
463}
464
465pub struct CompactionTask {
467 manager: Arc<CompactionManager>,
468 interval: Duration,
469}
470
471impl CompactionTask {
472 pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
474 Self {
475 manager,
476 interval: Duration::from_secs(interval_seconds),
477 }
478 }
479
480 pub async fn run(self) {
482 let mut interval = tokio::time::interval(self.interval);
483
484 loop {
485 interval.tick().await;
486
487 if self.manager.should_compact() {
488 tracing::debug!("Auto-compaction check triggered");
489
490 match self.manager.compact() {
491 Ok(result) => {
492 if result.files_compacted > 0 {
493 tracing::info!(
494 "Auto-compaction succeeded: {} files, {:.2} MB saved",
495 result.files_compacted,
496 (result.bytes_before - result.bytes_after) as f64
497 / (1024.0 * 1024.0)
498 );
499 }
500 }
501 Err(e) => {
502 tracing::error!("Auto-compaction failed: {}", e);
503 }
504 }
505 }
506 }
507 }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory. The directory
    /// handle is returned so that it outlives the manager within the test.
    fn manager_with(config: CompactionConfig) -> (TempDir, CompactionManager) {
        let temp_dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(temp_dir.path(), config);
        (temp_dir, manager)
    }

    /// Builds a `FileInfo` of the given size, created "now".
    fn file_of_size(name: &str, size: u64) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created: Utc::now(),
        }
    }

    /// A new manager starts with zeroed statistics.
    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, manager) = manager_with(CompactionConfig::default());

        assert_eq!(manager.stats().total_compactions, 0);
    }

    /// With auto-compaction on and no previous pass, compaction is due.
    #[test]
    fn test_should_compact() {
        let (_dir, manager) = manager_with(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });

        assert!(manager.should_compact());
    }

    /// Size-based selection keeps only the files under the threshold.
    #[test]
    fn test_file_selection_size_based() {
        let (_dir, manager) = manager_with(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });

        let files = vec![
            file_of_size("small1.parquet", 500_000),
            file_of_size("small2.parquet", 600_000),
            file_of_size("large.parquet", 10_000_000),
        ];

        let selected = manager.select_files_for_compaction(&files);
        assert_eq!(selected.len(), 2);
    }
}