use crate::{
domain::entities::Event,
error::{AllSourceError, Result},
infrastructure::persistence::storage::ParquetStorage,
};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::{
fs,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
/// Manages compaction of Parquet files within a single storage directory.
///
/// Small or old files are merged into larger ones according to the configured
/// [`CompactionStrategy`]; cumulative statistics are tracked across passes.
pub struct CompactionManager {
    /// Directory scanned for `*.parquet` files.
    storage_dir: PathBuf,
    /// Thresholds, strategy, and auto-compaction cadence.
    config: CompactionConfig,
    /// Cumulative counters updated after each successful compaction pass.
    stats: Arc<RwLock<CompactionStats>>,
    /// Timestamp of the most recent pass; `None` until the first run.
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
/// Tuning parameters controlling when and how compaction runs.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of candidate files before a pass is considered worthwhile.
    pub min_files_to_compact: usize,
    /// Preferred size (bytes, estimated) of each compacted output file.
    pub target_file_size: usize,
    /// Hard cap (bytes, estimated); a batch is flushed once it reaches this size.
    pub max_file_size: usize,
    /// Files below this size (bytes) are candidates for the size-based strategy.
    pub small_file_threshold: usize,
    /// Minimum seconds between automatic compaction passes.
    pub compaction_interval_seconds: u64,
    /// When `false`, `should_compact()` always answers `false`.
    pub auto_compact: bool,
    /// Policy used to pick which files to merge.
    pub strategy: CompactionStrategy,
}
impl Default for CompactionConfig {
    /// Production-oriented defaults: size-based strategy with hourly
    /// auto-compaction of files under 10 MB.
    fn default() -> Self {
        Self {
            min_files_to_compact: 3,
            target_file_size: 128 * 1024 * 1024,    // 128 MB per output file
            max_file_size: 256 * 1024 * 1024,       // 256 MB hard cap
            small_file_threshold: 10 * 1024 * 1024, // files < 10 MB are "small"
            compaction_interval_seconds: 3600,      // one hour between passes
            auto_compact: true,
            strategy: CompactionStrategy::SizeBased,
        }
    }
}
/// Policy for choosing which Parquet files to merge in a pass.
///
/// Serialized in lowercase via serde (e.g. `"sizebased"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Merge files smaller than `CompactionConfig::small_file_threshold`.
    SizeBased,
    /// Merge files older than a fixed 24-hour threshold.
    TimeBased,
    /// Merge every Parquet file in the directory.
    FullCompaction,
}
/// Cumulative counters across all compaction passes (serialized for reporting).
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    /// Number of passes that actually rewrote files.
    pub total_compactions: u64,
    /// Total input files consumed across all passes.
    pub total_files_compacted: u64,
    /// Total bytes of input files before compaction.
    pub total_bytes_before: u64,
    /// Total bytes of output files after compaction.
    pub total_bytes_after: u64,
    /// Total events rewritten across all passes.
    pub total_events_compacted: u64,
    /// Wall-clock duration of the most recent pass only.
    pub last_compaction_duration_ms: u64,
    /// Sum of per-pass `before - after` savings (saturating; never negative).
    pub space_saved_bytes: u64,
}
/// Snapshot of one on-disk Parquet file considered for compaction.
#[derive(Debug, Clone)]
struct FileInfo {
    /// Absolute path within the storage directory.
    path: PathBuf,
    /// File size in bytes from filesystem metadata.
    size: u64,
    /// Creation time; falls back to `Utc::now()` when the filesystem
    /// does not report one (see `list_parquet_files`).
    created: DateTime<Utc>,
}
impl CompactionManager {
    /// Creates a manager over `storage_dir` with the given configuration.
    ///
    /// Performs no filesystem I/O; the directory is only scanned when a
    /// compaction pass actually runs.
    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
        let storage_dir = storage_dir.into();
        tracing::info!(
            "✅ Compaction manager initialized at: {}",
            storage_dir.display()
        );
        Self {
            storage_dir,
            config,
            stats: Arc::new(RwLock::new(CompactionStats::default())),
            last_compaction: Arc::new(RwLock::new(None)),
        }
    }

    /// Enumerates `*.parquet` files directly under `storage_dir`, sorted by
    /// creation time (oldest first).
    ///
    /// When the filesystem cannot report a creation time, `Utc::now()` is
    /// used as a best-effort fallback — NOTE(review): on such filesystems
    /// genuinely old files look brand-new to the time-based strategy.
    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
        })?;
        let mut files = Vec::new();
        for entry in entries {
            let entry = entry.map_err(|e| {
                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
            })?;
            let path = entry.path();
            // Only files with a literal "parquet" extension are considered.
            if let Some(ext) = path.extension()
                && ext == "parquet"
            {
                let metadata = entry.metadata().map_err(|e| {
                    AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
                })?;
                let size = metadata.len();
                // Creation time → DateTime<Utc> at whole-second precision;
                // every failure along the way degrades to `Utc::now()`.
                let created = metadata
                    .created()
                    .ok()
                    .and_then(|t| {
                        t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
                            DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
                        })
                    })
                    .unwrap_or_else(Utc::now);
                files.push(FileInfo {
                    path,
                    size,
                    created,
                });
            }
        }
        files.sort_by_key(|f| f.created);
        Ok(files)
    }

    /// Dispatches to the selection policy configured in `config.strategy`.
    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
        match self.config.strategy {
            CompactionStrategy::SizeBased => self.select_small_files(files),
            CompactionStrategy::TimeBased => self.select_old_files(files),
            // Full compaction takes every file unconditionally.
            CompactionStrategy::FullCompaction => files.to_vec(),
        }
    }

    /// Selects files smaller than `small_file_threshold`; returns an empty
    /// list (meaning "skip this pass") unless at least
    /// `min_files_to_compact` such files exist.
    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
        let small_files: Vec<FileInfo> = files
            .iter()
            .filter(|f| f.size < self.config.small_file_threshold as u64)
            .cloned()
            .collect();
        if small_files.len() >= self.config.min_files_to_compact {
            small_files
        } else {
            Vec::new()
        }
    }

    /// Selects files older than 24 hours; returns an empty list unless at
    /// least `min_files_to_compact` qualify.
    ///
    /// NOTE(review): the 24-hour cutoff is hard-coded rather than part of
    /// `CompactionConfig`.
    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
        let now = Utc::now();
        let age_threshold = chrono::Duration::hours(24);
        let old_files: Vec<FileInfo> = files
            .iter()
            .filter(|f| now - f.created > age_threshold)
            .cloned()
            .collect();
        if old_files.len() >= self.config.min_files_to_compact {
            old_files
        } else {
            Vec::new()
        }
    }

    /// Returns `true` when auto-compaction is enabled and either no pass
    /// has run yet or `compaction_interval_seconds` have elapsed since the
    /// last one.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn should_compact(&self) -> bool {
        if !self.config.auto_compact {
            return false;
        }
        let last = self.last_compaction.read();
        match *last {
            // Never compacted → due immediately.
            None => true,
            Some(last_time) => {
                let elapsed = (Utc::now() - last_time).num_seconds();
                elapsed >= self.config.compaction_interval_seconds as i64
            }
        }
    }

    /// Runs one compaction pass synchronously.
    ///
    /// Pipeline: list files → select per strategy → read events → sort by
    /// timestamp → write size-bounded output files → delete the originals →
    /// update stats. A pass with nothing to do returns a zeroed
    /// [`CompactionResult`] rather than an error.
    ///
    /// # Errors
    /// Fails if the directory cannot be listed or the compacted output
    /// cannot be written. Per-file read errors and failed deletions are
    /// logged and skipped (best-effort).
    ///
    /// NOTE(review): originals are removed only *after* the new files are
    /// written, so a crash in between duplicates data rather than losing it
    /// — but see `read_parquet_file` below, which appears to re-read the
    /// entire directory once per selected file and would multiply events.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn compact(&self) -> Result<CompactionResult> {
        let start_time = std::time::Instant::now();
        tracing::info!("🔄 Starting Parquet compaction...");
        let files = self.list_parquet_files()?;
        if files.is_empty() {
            tracing::debug!("No Parquet files to compact");
            return Ok(CompactionResult {
                files_compacted: 0,
                bytes_before: 0,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: 0,
            });
        }
        // Apply the configured selection policy; an empty selection is a no-op.
        let files_to_compact = self.select_files_for_compaction(&files);
        if files_to_compact.is_empty() {
            tracing::debug!(
                "No files meet compaction criteria (strategy: {:?})",
                self.config.strategy
            );
            return Ok(CompactionResult {
                files_compacted: 0,
                bytes_before: 0,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: 0,
            });
        }
        let bytes_before: u64 = files_to_compact.iter().map(|f| f.size).sum();
        tracing::info!(
            "Compacting {} files ({:.2} MB)",
            files_to_compact.len(),
            bytes_before as f64 / (1024.0 * 1024.0)
        );
        // Gather events from every selected file; read failures are logged
        // and skipped so one bad file does not abort the whole pass.
        let mut all_events = Vec::new();
        for file_info in &files_to_compact {
            match self.read_parquet_file(&file_info.path) {
                Ok(mut events) => {
                    all_events.append(&mut events);
                }
                Err(e) => {
                    tracing::error!("Failed to read Parquet file {:?}: {}", file_info.path, e);
                }
            }
        }
        if all_events.is_empty() {
            tracing::warn!("No events read from files to compact");
            return Ok(CompactionResult {
                files_compacted: 0,
                bytes_before,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: start_time.elapsed().as_millis() as u64,
            });
        }
        // Keep compacted output time-ordered.
        all_events.sort_by_key(|e| e.timestamp);
        tracing::debug!("Read {} events for compaction", all_events.len());
        let compacted_files = self.write_compacted_files(&all_events)?;
        // On-disk size of the new files; unreadable metadata counts as 0
        // (see the NOTE on `write_batch` about a possible path mismatch).
        let bytes_after: u64 = compacted_files
            .iter()
            .map(|p| fs::metadata(p).map(|m| m.len()).unwrap_or(0))
            .sum();
        // Remove originals only after the compacted output exists.
        for file_info in &files_to_compact {
            if let Err(e) = fs::remove_file(&file_info.path) {
                tracing::error!("Failed to remove old file {:?}: {}", file_info.path, e);
            } else {
                tracing::debug!("Removed old file: {:?}", file_info.path);
            }
        }
        let duration_ms = start_time.elapsed().as_millis() as u64;
        // Fold this pass into cumulative stats; the guard is dropped
        // explicitly before taking the last_compaction lock below.
        let mut stats = self.stats.write();
        stats.total_compactions += 1;
        stats.total_files_compacted += files_to_compact.len() as u64;
        stats.total_bytes_before += bytes_before;
        stats.total_bytes_after += bytes_after;
        stats.total_events_compacted += all_events.len() as u64;
        stats.last_compaction_duration_ms = duration_ms;
        // Saturating: compaction can occasionally grow the data.
        stats.space_saved_bytes += bytes_before.saturating_sub(bytes_after);
        drop(stats);
        *self.last_compaction.write() = Some(Utc::now());
        // Output/input bytes as a percentage (lower = more space saved).
        let compression_ratio = if bytes_before > 0 {
            (bytes_after as f64 / bytes_before as f64) * 100.0
        } else {
            100.0
        };
        tracing::info!(
            "✅ Compaction complete: {} files → {} files, {:.2} MB → {:.2} MB ({:.1}%), {} events, {}ms",
            files_to_compact.len(),
            compacted_files.len(),
            bytes_before as f64 / (1024.0 * 1024.0),
            bytes_after as f64 / (1024.0 * 1024.0),
            compression_ratio,
            all_events.len(),
            duration_ms
        );
        Ok(CompactionResult {
            files_compacted: files_to_compact.len(),
            bytes_before,
            bytes_after,
            events_compacted: all_events.len(),
            duration_ms,
        })
    }

    /// Reads the events belonging to one file selected for compaction.
    ///
    /// NOTE(review): the `path` argument is ignored — this opens storage
    /// over the whole directory and calls `load_all_events()`. Because
    /// `compact()` invokes this once per selected file, the same events
    /// appear to be read multiple times (and events from unselected files
    /// are swept into the output). Verify against the `ParquetStorage`
    /// API; a per-file reader is likely intended here.
    fn read_parquet_file(&self, path: &Path) -> Result<Vec<Event>> {
        let storage = ParquetStorage::new(&self.storage_dir)?;
        let all_events = storage.load_all_events()?;
        Ok(all_events)
    }

    /// Partitions `events` into batches bounded by `target_file_size` /
    /// `max_file_size` and writes one file per batch, returning the paths.
    ///
    /// Batch size is estimated from each event's JSON serialization (1 KiB
    /// fallback on serialization failure), which only approximates the
    /// eventual Parquet size.
    fn write_compacted_files(&self, events: &[Event]) -> Result<Vec<PathBuf>> {
        let mut compacted_files = Vec::new();
        let mut current_batch = Vec::new();
        let mut current_size = 0;
        for event in events {
            let event_size = serde_json::to_string(event)
                .map(|s| s.len())
                .unwrap_or(1024);
            // Flush before exceeding the target size (soft limit).
            if current_size + event_size > self.config.target_file_size && !current_batch.is_empty()
            {
                let file_path = self.write_batch(&current_batch)?;
                compacted_files.push(file_path);
                current_batch.clear();
                current_size = 0;
            }
            current_batch.push(event.clone());
            current_size += event_size;
            // Flush immediately at the hard cap.
            if current_size >= self.config.max_file_size {
                let file_path = self.write_batch(&current_batch)?;
                compacted_files.push(file_path);
                current_batch.clear();
                current_size = 0;
            }
        }
        // Flush any trailing partial batch.
        if !current_batch.is_empty() {
            let file_path = self.write_batch(&current_batch)?;
            compacted_files.push(file_path);
        }
        Ok(compacted_files)
    }

    /// Writes one batch of events through `ParquetStorage` and returns the
    /// path the compacted file is expected to live at.
    ///
    /// NOTE(review): `file_path` is computed locally but never handed to
    /// the storage layer — `ParquetStorage` presumably chooses its own file
    /// name on `flush()`, so the returned path may not exist on disk (which
    /// would make `compact()`'s `bytes_after` metadata lookup read 0).
    /// Events are also appended one at a time; confirm whether a bulk
    /// append exists.
    fn write_batch(&self, events: &[Event]) -> Result<PathBuf> {
        let storage = ParquetStorage::new(&self.storage_dir)?;
        let filename = format!(
            "events-compacted-{}.parquet",
            Utc::now().format("%Y%m%d-%H%M%S-%f")
        );
        let file_path = self.storage_dir.join(filename);
        for event in events {
            storage.append_event(event.clone())?;
        }
        storage.flush()?;
        tracing::debug!(
            "Wrote compacted file: {:?} ({} events)",
            file_path,
            events.len()
        );
        Ok(file_path)
    }

    /// Returns a snapshot of the cumulative compaction statistics.
    pub fn stats(&self) -> CompactionStats {
        (*self.stats.read()).clone()
    }

    /// Returns the active configuration.
    pub fn config(&self) -> &CompactionConfig {
        &self.config
    }

    /// Triggers a compaction pass immediately, bypassing the
    /// `should_compact()` interval check.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn compact_now(&self) -> Result<CompactionResult> {
        tracing::info!("Manual compaction triggered");
        self.compact()
    }
}
/// Summary of a single compaction pass (all zeros when nothing was done).
#[derive(Debug, Clone, Serialize)]
pub struct CompactionResult {
    /// Number of input files merged in this pass.
    pub files_compacted: usize,
    /// Combined size (bytes) of the selected input files.
    pub bytes_before: u64,
    /// Combined size (bytes) of the written output files.
    pub bytes_after: u64,
    /// Number of events rewritten.
    pub events_compacted: usize,
    /// Wall-clock duration of the pass.
    pub duration_ms: u64,
}
/// Background task that periodically asks a [`CompactionManager`] to compact.
pub struct CompactionTask {
    /// Shared manager whose `should_compact()` / `compact()` drive each tick.
    manager: Arc<CompactionManager>,
    /// Tick period for the polling loop.
    interval: Duration,
}
impl CompactionTask {
    /// Creates a task that polls `manager` every `interval_seconds`.
    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
        Self {
            manager,
            interval: Duration::from_secs(interval_seconds),
        }
    }

    /// Runs forever, triggering a compaction on each tick for which the
    /// manager's auto-compaction policy says one is due.
    ///
    /// Errors from `compact()` are logged and do not stop the loop.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub async fn run(self) {
        let mut interval = tokio::time::interval(self.interval);
        loop {
            interval.tick().await;
            if self.manager.should_compact() {
                tracing::debug!("Auto-compaction check triggered");
                match self.manager.compact() {
                    Ok(result) => {
                        if result.files_compacted > 0 {
                            // saturating_sub: a pass can occasionally *grow* the data
                            // (format overhead on few events); a plain subtraction
                            // would panic in debug builds and wrap in release. This
                            // also matches how `compact()` accumulates
                            // `space_saved_bytes`.
                            let saved = result.bytes_before.saturating_sub(result.bytes_after);
                            tracing::info!(
                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
                                result.files_compacted,
                                saved as f64 / (1024.0 * 1024.0)
                            );
                        }
                    }
                    Err(e) => {
                        tracing::error!("Auto-compaction failed: {}", e);
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager over a fresh temp directory. The `TempDir` handle is
    /// returned alongside the manager so the directory outlives the test body.
    fn make_manager(cfg: CompactionConfig) -> (TempDir, CompactionManager) {
        let dir = TempDir::new().unwrap();
        let mgr = CompactionManager::new(dir.path(), cfg);
        (dir, mgr)
    }

    /// Shorthand for building a `FileInfo` fixture.
    fn file_info(name: &str, size: u64, created: DateTime<Utc>) -> FileInfo {
        FileInfo {
            path: PathBuf::from(name),
            size,
            created,
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        let (_dir, mgr) = make_manager(CompactionConfig::default());
        assert_eq!(mgr.stats().total_compactions, 0);
    }

    #[test]
    fn test_should_compact() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            auto_compact: true,
            compaction_interval_seconds: 1,
            ..Default::default()
        });
        // Never compacted yet, so a pass is due immediately.
        assert!(mgr.should_compact());
    }

    #[test]
    fn test_file_selection_size_based() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 2,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });
        let candidates = vec![
            file_info("small1.parquet", 500_000, Utc::now()),
            file_info("small2.parquet", 600_000, Utc::now()),
            file_info("large.parquet", 10_000_000, Utc::now()),
        ];
        // Only the two sub-threshold files qualify.
        assert_eq!(mgr.select_files_for_compaction(&candidates).len(), 2);
    }

    #[test]
    fn test_default_compaction_config() {
        let cfg = CompactionConfig::default();
        assert_eq!(cfg.min_files_to_compact, 3);
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert_eq!(cfg.max_file_size, 256 * 1024 * 1024);
        assert_eq!(cfg.small_file_threshold, 10 * 1024 * 1024);
        assert_eq!(cfg.compaction_interval_seconds, 3600);
        assert!(cfg.auto_compact);
        assert_eq!(cfg.strategy, CompactionStrategy::SizeBased);
    }

    #[test]
    fn test_should_compact_disabled() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            auto_compact: false,
            ..Default::default()
        });
        // Auto-compaction off → never due, regardless of elapsed time.
        assert!(!mgr.should_compact());
    }

    #[test]
    fn test_compact_empty_directory() {
        let (_dir, mgr) = make_manager(CompactionConfig::default());
        let result = mgr.compact().unwrap();
        // An empty directory yields a zeroed result, not an error.
        assert_eq!(result.files_compacted, 0);
        assert_eq!(result.bytes_before, 0);
        assert_eq!(result.bytes_after, 0);
        assert_eq!(result.events_compacted, 0);
    }

    #[test]
    fn test_compact_now() {
        let (_dir, mgr) = make_manager(CompactionConfig::default());
        assert_eq!(mgr.compact_now().unwrap().files_compacted, 0);
    }

    #[test]
    fn test_get_config() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            min_files_to_compact: 5,
            ..Default::default()
        });
        assert_eq!(mgr.config().min_files_to_compact, 5);
    }

    #[test]
    fn test_get_stats() {
        let (_dir, mgr) = make_manager(CompactionConfig::default());
        let stats = mgr.stats();
        // Fresh manager → every counter starts at zero.
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
        assert_eq!(stats.total_bytes_before, 0);
        assert_eq!(stats.total_bytes_after, 0);
        assert_eq!(stats.total_events_compacted, 0);
        assert_eq!(stats.last_compaction_duration_ms, 0);
        assert_eq!(stats.space_saved_bytes, 0);
    }

    #[test]
    fn test_file_selection_not_enough_small_files() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            small_file_threshold: 1024 * 1024,
            min_files_to_compact: 3,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        });
        let candidates = vec![
            file_info("small1.parquet", 500_000, Utc::now()),
            file_info("small2.parquet", 600_000, Utc::now()),
        ];
        // Two small files < min_files_to_compact of 3 → nothing selected.
        assert_eq!(mgr.select_files_for_compaction(&candidates).len(), 0);
    }

    #[test]
    fn test_file_selection_time_based() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            min_files_to_compact: 2,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });
        let two_days_ago = Utc::now() - chrono::Duration::hours(48);
        let candidates = vec![
            file_info("old1.parquet", 1_000_000, two_days_ago),
            file_info("old2.parquet", 2_000_000, two_days_ago),
            file_info("new.parquet", 500_000, Utc::now()),
        ];
        // Only the two files past the 24h threshold qualify.
        assert_eq!(mgr.select_files_for_compaction(&candidates).len(), 2);
    }

    #[test]
    fn test_file_selection_time_based_not_enough() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            min_files_to_compact: 3,
            strategy: CompactionStrategy::TimeBased,
            ..Default::default()
        });
        let two_days_ago = Utc::now() - chrono::Duration::hours(48);
        let candidates = vec![
            file_info("old1.parquet", 1_000_000, two_days_ago),
            file_info("new.parquet", 500_000, Utc::now()),
        ];
        // One old file < min_files_to_compact of 3 → nothing selected.
        assert_eq!(mgr.select_files_for_compaction(&candidates).len(), 0);
    }

    #[test]
    fn test_file_selection_full_compaction() {
        let (_dir, mgr) = make_manager(CompactionConfig {
            strategy: CompactionStrategy::FullCompaction,
            ..Default::default()
        });
        let candidates = vec![
            file_info("file1.parquet", 1_000_000, Utc::now()),
            file_info("file2.parquet", 2_000_000, Utc::now()),
        ];
        // Full compaction takes everything.
        assert_eq!(mgr.select_files_for_compaction(&candidates).len(), 2);
    }

    #[test]
    fn test_compaction_strategy_serde() {
        // Every variant must survive a JSON round-trip.
        for strategy in [
            CompactionStrategy::SizeBased,
            CompactionStrategy::TimeBased,
            CompactionStrategy::FullCompaction,
        ] {
            let json = serde_json::to_string(&strategy).unwrap();
            let back: CompactionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(back, strategy);
        }
    }

    #[test]
    fn test_compaction_stats_default() {
        let stats = CompactionStats::default();
        assert_eq!(stats.total_compactions, 0);
        assert_eq!(stats.total_files_compacted, 0);
    }

    #[test]
    fn test_compaction_stats_serde() {
        let stats = CompactionStats {
            total_compactions: 5,
            total_files_compacted: 20,
            total_bytes_before: 1000000,
            total_bytes_after: 500000,
            total_events_compacted: 10000,
            last_compaction_duration_ms: 500,
            space_saved_bytes: 500000,
        };
        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_compactions\":5"));
        assert!(json.contains("\"space_saved_bytes\":500000"));
    }

    #[test]
    fn test_compaction_result_serde() {
        let result = CompactionResult {
            files_compacted: 3,
            bytes_before: 1000000,
            bytes_after: 500000,
            events_compacted: 5000,
            duration_ms: 250,
        };
        let json = serde_json::to_string(&result).unwrap();
        assert!(json.contains("\"files_compacted\":3"));
        assert!(json.contains("\"bytes_before\":1000000"));
    }

    #[test]
    fn test_compaction_task_creation() {
        let (_dir, mgr) = make_manager(CompactionConfig::default());
        // Construction alone must not panic or start anything.
        let _task = CompactionTask::new(Arc::new(mgr), 60);
    }

    #[test]
    fn test_list_parquet_files_empty() {
        let (_dir, mgr) = make_manager(CompactionConfig::default());
        assert!(mgr.list_parquet_files().unwrap().is_empty());
    }

    #[test]
    fn test_list_parquet_files_with_non_parquet() {
        let (dir, mgr) = make_manager(CompactionConfig::default());
        std::fs::write(dir.path().join("test.txt"), "test").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();
        // Non-parquet extensions must be ignored entirely.
        assert!(mgr.list_parquet_files().unwrap().is_empty());
    }
}