use crate::{
error::{AllSourceError, Result},
infrastructure::persistence::{cold_tier::ArchiveTarget, storage::ParquetStorage},
};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs, path::PathBuf, sync::Arc, time::Duration};
/// Coordinates background compaction of per-tenant parquet files into
/// consolidated snapshot files, applying retention and tracking lifetime stats.
pub struct CompactionManager {
// Root directory; tenants live in immediate subdirectories (see `discover_tenants`).
storage_dir: PathBuf,
// Tuning knobs: strategy, thresholds, retention policy, optional cold-tier archive.
config: CompactionConfig,
// Lifetime counters, exposed read-only via `stats()`.
stats: Arc<RwLock<CompactionStats>>,
// Timestamp of the last completed sweep; `None` until `compact()` first runs to completion.
last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
/// File-name prefix marking an already-compacted snapshot; such files are
/// excluded from further compaction passes (see `compact_tenant`).
const SNAPSHOT_PREFIX: &str = "snapshot.";
/// Tuning parameters for [`CompactionManager`].
#[derive(Debug, Clone)]
pub struct CompactionConfig {
/// Minimum number of candidate files before a compaction is worthwhile.
pub min_files_to_compact: usize,
/// Desired size (bytes) of a compacted output file.
/// NOTE(review): not consulted anywhere in this module — presumably used by
/// the parquet writer; confirm before relying on it.
pub target_file_size: usize,
/// Hard upper bound (bytes) on a compacted output file.
/// NOTE(review): also unused in this module — verify against the writer.
pub max_file_size: usize,
/// Files below this size (bytes) qualify under the size-based strategy.
pub small_file_threshold: usize,
/// Minimum seconds between automatic sweeps (see `should_compact`).
pub compaction_interval_seconds: u64,
/// When false, `should_compact` always answers no.
pub auto_compact: bool,
/// How candidate files are selected for compaction.
pub strategy: CompactionStrategy,
/// Per-tenant TTL policy applied while compacting.
pub retention: RetentionConfig,
/// Optional cold-tier sink for events dropped by retention.
pub archive: Option<Arc<dyn ArchiveTarget>>,
}
/// Time-to-live policy; a TTL of `None` means "keep forever".
#[derive(Debug, Clone)]
pub struct RetentionConfig {
/// Fallback TTL for tenants without an explicit entry.
pub default_ttl: Option<Duration>,
/// Per-tenant overrides; an entry of `None` pins that tenant to keep-forever
/// regardless of `default_ttl`.
pub per_tenant_ttl: HashMap<String, Option<Duration>>,
}
impl Default for RetentionConfig {
    /// Default policy: keep every tenant's data forever, except the built-in
    /// `system` tenant, which is retained for 30 days.
    fn default() -> Self {
        Self {
            default_ttl: None,
            per_tenant_ttl: HashMap::from([(
                "system".to_string(),
                Some(Duration::from_secs(30 * 24 * 3600)),
            )]),
        }
    }
}
impl RetentionConfig {
    /// Resolve the effective TTL for `tenant_id`: a per-tenant override wins,
    /// otherwise fall back to the global default. `None` means keep forever.
    pub fn ttl_for(&self, tenant_id: &str) -> Option<Duration> {
        self.per_tenant_ttl
            .get(tenant_id)
            .copied()
            .unwrap_or(self.default_ttl)
    }
    /// Install (or overwrite) the TTL override for a single tenant.
    pub fn set(&mut self, tenant_id: &str, ttl: Option<Duration>) {
        self.per_tenant_ttl.insert(tenant_id.to_owned(), ttl);
    }
}
impl Default for CompactionConfig {
    /// Conservative defaults: hourly auto-compaction of files under 10 MiB
    /// once at least three accumulate, targeting 128 MiB outputs with a
    /// 256 MiB hard cap, no retention beyond the built-ins, no cold tier.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            min_files_to_compact: 3,
            target_file_size: 128 * MIB,
            max_file_size: 256 * MIB,
            small_file_threshold: 10 * MIB,
            compaction_interval_seconds: 3600,
            auto_compact: true,
            strategy: CompactionStrategy::SizeBased,
            retention: RetentionConfig::default(),
            archive: None,
        }
    }
}
impl CompactionConfig {
    /// Build a config from process environment variables, falling back to
    /// [`Self::default`] for anything unset or unparseable.
    pub fn from_env() -> Self {
        Self::from_env_vars(
            std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
            std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
        )
    }
    /// Testable core of [`Self::from_env`]: interprets the two variables'
    /// values directly. Empty strings are treated as unset; parse failures
    /// are logged and the default is kept.
    pub fn from_env_vars(
        interval_var: Option<String>,
        system_retention_days_var: Option<String>,
    ) -> Self {
        let mut config = Self::default();
        if let Some(s) = interval_var.filter(|s| !s.is_empty()) {
            match s.parse::<u64>() {
                Ok(v) => config.compaction_interval_seconds = v,
                Err(e) => {
                    tracing::warn!(
                        "ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS={s:?} could not be parsed as \
                         u64: {e}; defaulting to {}s",
                        config.compaction_interval_seconds
                    );
                }
            }
        }
        if let Some(s) = system_retention_days_var.filter(|s| !s.is_empty()) {
            match s.parse::<u64>() {
                Ok(days) => {
                    // saturating_mul: an absurdly large day count must clamp to
                    // u64::MAX seconds rather than overflow (which would panic
                    // in debug builds).
                    config.retention.set(
                        "system",
                        Some(Duration::from_secs(days.saturating_mul(24 * 3600))),
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        "ALLSOURCE_RETENTION_SYSTEM_DAYS={s:?} could not be parsed as u64: \
                         {e}; defaulting to 30 days for tenant=system"
                    );
                }
            }
        }
        config
    }
    /// Backward-compatible single-variable entry point: interval only, system
    /// retention left at its default.
    pub fn from_env_var(interval_var: Option<String>) -> Self {
        Self::from_env_vars(interval_var, None)
    }
}
/// File-selection policy used by [`CompactionManager`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
/// Compact files smaller than `small_file_threshold`.
SizeBased,
/// Compact files older than a fixed age (24 h; see `select_old_files`).
TimeBased,
/// Compact every raw file regardless of size or age.
FullCompaction,
}
/// Lifetime counters accumulated across all sweeps of one manager.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
/// Number of sweeps that compacted at least one file.
pub total_compactions: u64,
/// Raw files merged into snapshots, all sweeps combined.
pub total_files_compacted: u64,
/// Sum of input bytes across all compactions.
pub total_bytes_before: u64,
/// Sum of snapshot output bytes across all compactions.
pub total_bytes_after: u64,
/// Events written into snapshots (post-retention), all sweeps combined.
pub total_events_compacted: u64,
/// Wall-clock duration of the most recent sweep.
pub last_compaction_duration_ms: u64,
/// Cumulative `bytes_before - bytes_after` (saturating).
pub space_saved_bytes: u64,
}
/// Metadata snapshot of one on-disk parquet file considered for compaction.
#[derive(Debug, Clone)]
struct FileInfo {
// Absolute path to the file.
path: PathBuf,
// File size in bytes.
size: u64,
// Filesystem creation time; falls back to "now" where unsupported.
created: DateTime<Utc>,
}
impl CompactionManager {
/// Create a manager rooted at `storage_dir` with the given tuning config.
/// Performs no I/O beyond logging.
pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
let storage_dir = storage_dir.into();
tracing::info!(
"✅ Compaction manager initialized at: {}",
storage_dir.display()
);
Self {
storage_dir,
config,
stats: Arc::new(RwLock::new(CompactionStats::default())),
last_compaction: Arc::new(RwLock::new(None)),
}
}
/// List `*.parquet` files directly under `storage_dir` (non-recursive, so
/// tenant subdirectories are NOT scanned), sorted by creation time.
fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
})?;
let mut files = Vec::new();
for entry in entries {
let entry = entry.map_err(|e| {
AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
})?;
let path = entry.path();
if let Some(ext) = path.extension()
&& ext == "parquet"
{
let metadata = entry.metadata().map_err(|e| {
AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
})?;
let size = metadata.len();
// `created()` is unsupported on some platforms/filesystems; treat
// such files as brand new rather than failing the whole listing.
let created = metadata
.created()
.ok()
.and_then(|t| {
t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
})
})
.unwrap_or_else(Utc::now);
files.push(FileInfo {
path,
size,
created,
});
}
}
files.sort_by_key(|f| f.created);
Ok(files)
}
/// Dispatch to the strategy-specific candidate-selection policy.
fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
match self.config.strategy {
CompactionStrategy::SizeBased => self.select_small_files(files),
CompactionStrategy::TimeBased => self.select_old_files(files),
CompactionStrategy::FullCompaction => files.to_vec(),
}
}
/// Size-based policy: every file under `small_file_threshold`, but only if
/// at least `min_files_to_compact` qualify — otherwise select nothing
/// (all-or-nothing, to avoid churning on one or two stragglers).
fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
let small_files: Vec<FileInfo> = files
.iter()
.filter(|f| f.size < self.config.small_file_threshold as u64)
.cloned()
.collect();
if small_files.len() >= self.config.min_files_to_compact {
small_files
} else {
Vec::new()
}
}
/// Time-based policy: every file older than a fixed 24-hour threshold,
/// subject to the same `min_files_to_compact` floor as the size policy.
fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
let now = Utc::now();
// Hard-coded age threshold; not currently configurable.
let age_threshold = chrono::Duration::hours(24);
let old_files: Vec<FileInfo> = files
.iter()
.filter(|f| now - f.created > age_threshold)
.cloned()
.collect();
if old_files.len() >= self.config.min_files_to_compact {
old_files
} else {
Vec::new()
}
}
/// True when auto-compaction is enabled and either no sweep has completed
/// yet or `compaction_interval_seconds` have elapsed since the last one.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn should_compact(&self) -> bool {
if !self.config.auto_compact {
return false;
}
let last = self.last_compaction.read();
match *last {
None => true, Some(last_time) => {
let elapsed = (Utc::now() - last_time).num_seconds();
elapsed >= self.config.compaction_interval_seconds as i64
}
}
}
/// Run one compaction sweep across every discovered tenant. Per-tenant
/// failures are logged and skipped so a single bad tenant cannot block the
/// rest; the aggregate result covers only successful tenants. Updates the
/// lifetime stats (when anything was compacted) and the sweep timestamp.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn compact(&self) -> Result<CompactionResult> {
let start_time = std::time::Instant::now();
tracing::info!("🔄 Starting per-tenant compaction sweep...");
let tenants = self.discover_tenants()?;
if tenants.is_empty() {
// NOTE(review): `last_compaction` is not updated on this early-return,
// so an empty store is re-swept on every scheduler tick. Cheap, but
// `should_compact` never goes false — confirm this is intended.
tracing::debug!("No tenants found under {}", self.storage_dir.display());
return Ok(CompactionResult::default());
}
let mut aggregate = CompactionResult::default();
for tenant in &tenants {
match self.compact_tenant(tenant) {
Ok(r) => {
aggregate.files_compacted += r.files_compacted;
aggregate.bytes_before += r.bytes_before;
aggregate.bytes_after += r.bytes_after;
aggregate.events_compacted += r.events_compacted;
}
Err(e) => {
// Log and continue: one failing tenant must not abort the sweep.
tracing::error!(
tenant_id = %tenant,
"compact_tenant failed: {e}"
);
}
}
}
aggregate.duration_ms = start_time.elapsed().as_millis() as u64;
if aggregate.files_compacted > 0 {
let mut stats = self.stats.write();
stats.total_compactions += 1;
stats.total_files_compacted += aggregate.files_compacted as u64;
stats.total_bytes_before += aggregate.bytes_before;
stats.total_bytes_after += aggregate.bytes_after;
stats.total_events_compacted += aggregate.events_compacted as u64;
stats.last_compaction_duration_ms = aggregate.duration_ms;
// Saturating: a snapshot can be larger than the sum of tiny inputs.
stats.space_saved_bytes += aggregate.bytes_before.saturating_sub(aggregate.bytes_after);
}
*self.last_compaction.write() = Some(Utc::now());
tracing::info!(
"✅ Compaction sweep complete: {} files → 1 snapshot per tenant, \
{:.2} MB → {:.2} MB, {} events, {} tenants in {}ms",
aggregate.files_compacted,
aggregate.bytes_before as f64 / (1024.0 * 1024.0),
aggregate.bytes_after as f64 / (1024.0 * 1024.0),
aggregate.events_compacted,
tenants.len(),
aggregate.duration_ms
);
Ok(aggregate)
}
/// Compact a single tenant: merge all raw (non-snapshot) parquet files that
/// match the selection policy into one snapshot file, applying the tenant's
/// retention TTL. Aged-out events are archived to the cold tier (when one
/// is configured) *before* any source file is deleted — an archive failure
/// propagates via `?` and leaves the originals intact on disk.
pub fn compact_tenant(&self, tenant_id: &str) -> Result<CompactionResult> {
let start_time = std::time::Instant::now();
let storage = ParquetStorage::new(&self.storage_dir)?;
let all_files = storage.list_parquet_files_for_tenant(tenant_id)?;
let raw_files: Vec<FileInfo> = all_files
.into_iter()
// Never re-compact an existing snapshot file.
.filter(|p| {
p.file_name()
.and_then(|n| n.to_str())
.is_none_or(|n| !n.starts_with(SNAPSHOT_PREFIX))
})
// Files whose metadata cannot be read are silently skipped here.
.filter_map(|p| {
let metadata = fs::metadata(&p).ok()?;
let size = metadata.len();
let created = metadata
.created()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.and_then(|d| DateTime::from_timestamp(d.as_secs() as i64, 0))
.unwrap_or_else(Utc::now);
Some(FileInfo {
path: p,
size,
created,
})
})
.collect();
let candidates = self.select_files_for_compaction(&raw_files);
if candidates.is_empty() {
tracing::debug!(
tenant_id = tenant_id,
strategy = ?self.config.strategy,
"no files meet compaction criteria"
);
return Ok(CompactionResult::default());
}
let bytes_before: u64 = candidates.iter().map(|f| f.size).sum();
tracing::info!(
tenant_id = tenant_id,
files = candidates.len(),
mib = bytes_before as f64 / (1024.0 * 1024.0),
"compacting tenant"
);
let mut events = Vec::new();
for fi in &candidates {
// Recover the tenant from the path's first component under storage_dir
// (files live in per-tenant subdirectories); fall back to "default"
// when the layout is unexpected.
let event_tenant = match fi.path.strip_prefix(&self.storage_dir).ok() {
Some(rel) => rel
.components()
.next()
.and_then(|c| match c {
std::path::Component::Normal(t) => Some(t.to_string_lossy().into_owned()),
_ => None,
})
.unwrap_or_else(|| "default".to_string()),
None => "default".to_string(),
};
match storage.load_events_from_file_path(&fi.path, &event_tenant) {
Ok(mut e) => events.append(&mut e),
Err(e) => {
// NOTE(review): a file that fails to load here is still in
// `candidates` and gets deleted after the snapshot is written
// below — its events are silently lost. Consider excluding
// load-failures from the deletion pass.
tracing::error!(
file = %fi.path.display(),
"failed to read parquet file for compaction: {e}"
);
}
}
}
if events.is_empty() {
tracing::warn!(
tenant_id = tenant_id,
"candidate files had no readable events; skipping snapshot"
);
return Ok(CompactionResult::default());
}
// Retention: split events at the TTL cutoff; old ones are dropped (and
// archived first, if a cold tier is configured).
let dropped_by_retention = if let Some(ttl) = self.config.retention.ttl_for(tenant_id) {
// NOTE(review): if `ttl` exceeds chrono's Duration range, the
// fallback to zero makes `cutoff == now`, dropping *everything* —
// the inverse of a huge TTL's intent. Practically unreachable, but
// worth a clamp to chrono::Duration::MAX instead.
let cutoff = Utc::now()
- chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::zero());
let before = events.len();
let (drained, kept): (Vec<_>, Vec<_>) = std::mem::take(&mut events)
.into_iter()
.partition(|e| e.timestamp < cutoff);
events = kept;
let dropped = before - events.len();
if dropped > 0 {
tracing::info!(
retention_tenant = tenant_id,
dropped = dropped,
kept = events.len(),
cutoff = %cutoff.to_rfc3339(),
ttl_secs = ttl.as_secs(),
"retention: dropped events older than TTL"
);
if let Some(archive) = self.config.archive.as_ref() {
let from = drained
.iter()
.map(|e| e.timestamp)
.min()
.expect("dropped > 0 guarantees non-empty drained");
let to = drained
.iter()
.map(|e| e.timestamp)
.max()
.expect("dropped > 0 guarantees non-empty drained");
// `?` here is the crash-safety hinge: if archiving fails we
// return early and the raw files are never deleted.
archive.archive(tenant_id, from, to, &drained)?;
tracing::info!(
retention_tenant = tenant_id,
archived_to = %archive.description(),
archived = drained.len(),
"retention: dropped events archived to cold tier"
);
}
}
dropped
} else {
0
};
// Every event aged out: there is nothing to snapshot, so just remove the
// fully-expired originals.
if events.is_empty() {
tracing::info!(
tenant_id = tenant_id,
files_dropped = candidates.len(),
events_dropped = dropped_by_retention,
"retention: every event aged out — deleting originals without snapshot"
);
for fi in &candidates {
if let Err(e) = fs::remove_file(&fi.path) {
tracing::error!(
file = %fi.path.display(),
"failed to remove fully-aged raw file: {e}"
);
}
}
return Ok(CompactionResult {
files_compacted: candidates.len(),
bytes_before,
bytes_after: 0,
events_compacted: 0,
duration_ms: start_time.elapsed().as_millis() as u64,
});
}
// Write the snapshot (atomically) before deleting any source file, so a
// crash between the two steps leaves data duplicated, never lost.
events.sort_by_key(|e| e.timestamp);
let from = events.first().expect("non-empty checked above").timestamp;
let to = events.last().expect("non-empty checked above").timestamp;
let file_stem = format!(
"snapshot.{tenant_id}.{}-{}",
format_iso_basic(from),
format_iso_basic(to)
);
let snapshot_path = storage.write_atomic_parquet(tenant_id, &file_stem, &events)?;
let bytes_after = fs::metadata(&snapshot_path).map(|m| m.len()).unwrap_or(0);
for fi in &candidates {
if let Err(e) = fs::remove_file(&fi.path) {
tracing::error!(
file = %fi.path.display(),
"failed to remove pre-snapshot raw file: {e}"
);
}
}
let duration_ms = start_time.elapsed().as_millis() as u64;
tracing::info!(
tenant_id = tenant_id,
files_compacted = candidates.len(),
events = events.len(),
dropped_by_retention = dropped_by_retention,
mib_before = bytes_before as f64 / (1024.0 * 1024.0),
mib_after = bytes_after as f64 / (1024.0 * 1024.0),
duration_ms = duration_ms,
"tenant compaction complete"
);
Ok(CompactionResult {
files_compacted: candidates.len(),
bytes_before,
bytes_after,
events_compacted: events.len(),
duration_ms,
})
}
/// Enumerate tenant directories: the immediate subdirectories of
/// `storage_dir`, excluding hidden names and the reserved `__system`
/// directory, sorted for deterministic sweep order. An unreadable storage
/// directory yields an empty list rather than an error.
fn discover_tenants(&self) -> Result<Vec<String>> {
let Ok(entries) = fs::read_dir(&self.storage_dir) else {
return Ok(Vec::new());
};
let mut tenants: Vec<String> = entries
.filter_map(std::result::Result::ok)
.filter_map(|entry| {
let ft = entry.file_type().ok()?;
if !ft.is_dir() {
return None;
}
let name = entry.file_name().to_string_lossy().into_owned();
if name.starts_with('.') || name == "__system" {
return None;
}
Some(name)
})
.collect();
tenants.sort();
Ok(tenants)
}
/// Snapshot of the lifetime counters.
pub fn stats(&self) -> CompactionStats {
(*self.stats.read()).clone()
}
/// Read-only access to the active configuration.
pub fn config(&self) -> &CompactionConfig {
&self.config
}
/// Manually trigger a full sweep, bypassing the `should_compact` schedule.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn compact_now(&self) -> Result<CompactionResult> {
tracing::info!("Manual compaction triggered");
self.compact()
}
}
/// Outcome of one compaction call (per-tenant, or aggregated across a sweep).
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionResult {
/// Raw files merged (and subsequently deleted).
pub files_compacted: usize,
/// Total size of the input files in bytes.
pub bytes_before: u64,
/// Size of the resulting snapshot file(s) in bytes; 0 when everything aged out.
pub bytes_after: u64,
/// Events written to the snapshot after retention filtering.
pub events_compacted: usize,
/// Wall-clock duration of the operation.
pub duration_ms: u64,
}
/// Render a UTC timestamp as a filesystem-safe, "basic"-style ISO-8601 stamp
/// (colons omitted from the time part), e.g. `2024-01-02T030405Z`. Used to
/// embed snapshot time ranges in file names.
pub(super) fn format_iso_basic(t: DateTime<Utc>) -> String {
    format!("{}", t.format("%Y-%m-%dT%H%M%SZ"))
}
/// Long-running background driver that periodically invokes the manager's
/// auto-compaction (see [`CompactionTask::run`]).
pub struct CompactionTask {
// Shared manager; the task only calls `should_compact` / `compact`.
manager: Arc<CompactionManager>,
// Polling period between `should_compact` checks.
interval: Duration,
}
impl CompactionTask {
    /// Bundle a manager with a polling interval for use with [`Self::run`].
    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
        Self {
            manager,
            interval: Duration::from_secs(interval_seconds),
        }
    }
    /// Background loop: on every tick, ask the manager whether a sweep is
    /// due and run it, logging the outcome. Runs until the task is dropped.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub async fn run(self) {
        let mut interval = tokio::time::interval(self.interval);
        loop {
            interval.tick().await;
            if self.manager.should_compact() {
                tracing::debug!("Auto-compaction check triggered");
                match self.manager.compact() {
                    Ok(result) => {
                        if result.files_compacted > 0 {
                            // saturating_sub: a snapshot can be *larger* than the
                            // sum of its tiny inputs (per-file parquet overhead);
                            // a plain subtraction would underflow and panic in
                            // debug builds. Matches the stats accounting in
                            // the manager's sweep.
                            tracing::info!(
                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
                                result.files_compacted,
                                result.bytes_before.saturating_sub(result.bytes_after) as f64
                                    / (1024.0 * 1024.0)
                            );
                        }
                    }
                    Err(e) => {
                        tracing::error!("Auto-compaction failed: {}", e);
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_compaction_manager_creation() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = CompactionManager::new(temp_dir.path(), config);
assert_eq!(manager.stats().total_compactions, 0);
}
#[test]
fn test_should_compact() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
auto_compact: true,
compaction_interval_seconds: 1,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
assert!(manager.should_compact());
}
#[test]
fn test_file_selection_size_based() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
small_file_threshold: 1024 * 1024, min_files_to_compact: 2,
strategy: CompactionStrategy::SizeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let files = vec![
FileInfo {
path: PathBuf::from("small1.parquet"),
size: 500_000, created: Utc::now(),
},
FileInfo {
path: PathBuf::from("small2.parquet"),
size: 600_000, created: Utc::now(),
},
FileInfo {
path: PathBuf::from("large.parquet"),
size: 10_000_000, created: Utc::now(),
},
];
let selected = manager.select_files_for_compaction(&files);
assert_eq!(selected.len(), 2); }
#[test]
fn test_default_compaction_config() {
let config = CompactionConfig::default();
assert_eq!(config.min_files_to_compact, 3);
assert_eq!(config.target_file_size, 128 * 1024 * 1024);
assert_eq!(config.max_file_size, 256 * 1024 * 1024);
assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
assert_eq!(config.compaction_interval_seconds, 3600);
assert!(config.auto_compact);
assert_eq!(config.strategy, CompactionStrategy::SizeBased);
}
#[test]
fn test_should_compact_disabled() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
auto_compact: false,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
assert!(!manager.should_compact());
}
#[test]
fn test_compact_empty_directory() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact().unwrap();
assert_eq!(result.files_compacted, 0);
assert_eq!(result.bytes_before, 0);
assert_eq!(result.bytes_after, 0);
assert_eq!(result.events_compacted, 0);
}
#[test]
fn test_compact_now() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact_now().unwrap();
assert_eq!(result.files_compacted, 0);
}
#[test]
fn test_get_config() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
min_files_to_compact: 5,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
assert_eq!(manager.config().min_files_to_compact, 5);
}
#[test]
fn test_get_stats() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = CompactionManager::new(temp_dir.path(), config);
let stats = manager.stats();
assert_eq!(stats.total_compactions, 0);
assert_eq!(stats.total_files_compacted, 0);
assert_eq!(stats.total_bytes_before, 0);
assert_eq!(stats.total_bytes_after, 0);
assert_eq!(stats.total_events_compacted, 0);
assert_eq!(stats.last_compaction_duration_ms, 0);
assert_eq!(stats.space_saved_bytes, 0);
}
#[test]
fn test_file_selection_not_enough_small_files() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
small_file_threshold: 1024 * 1024,
min_files_to_compact: 3, strategy: CompactionStrategy::SizeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let files = vec![
FileInfo {
path: PathBuf::from("small1.parquet"),
size: 500_000,
created: Utc::now(),
},
FileInfo {
path: PathBuf::from("small2.parquet"),
size: 600_000,
created: Utc::now(),
},
];
let selected = manager.select_files_for_compaction(&files);
assert_eq!(selected.len(), 0); }
#[test]
fn test_file_selection_time_based() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
min_files_to_compact: 2,
strategy: CompactionStrategy::TimeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let old_time = Utc::now() - chrono::Duration::hours(48);
let files = vec![
FileInfo {
path: PathBuf::from("old1.parquet"),
size: 1_000_000,
created: old_time,
},
FileInfo {
path: PathBuf::from("old2.parquet"),
size: 2_000_000,
created: old_time,
},
FileInfo {
path: PathBuf::from("new.parquet"),
size: 500_000,
created: Utc::now(),
},
];
let selected = manager.select_files_for_compaction(&files);
assert_eq!(selected.len(), 2); }
#[test]
fn test_file_selection_time_based_not_enough() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
min_files_to_compact: 3,
strategy: CompactionStrategy::TimeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let old_time = Utc::now() - chrono::Duration::hours(48);
let files = vec![
FileInfo {
path: PathBuf::from("old1.parquet"),
size: 1_000_000,
created: old_time,
},
FileInfo {
path: PathBuf::from("new.parquet"),
size: 500_000,
created: Utc::now(),
},
];
let selected = manager.select_files_for_compaction(&files);
assert_eq!(selected.len(), 0); }
#[test]
fn test_file_selection_full_compaction() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig {
strategy: CompactionStrategy::FullCompaction,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let files = vec![
FileInfo {
path: PathBuf::from("file1.parquet"),
size: 1_000_000,
created: Utc::now(),
},
FileInfo {
path: PathBuf::from("file2.parquet"),
size: 2_000_000,
created: Utc::now(),
},
];
let selected = manager.select_files_for_compaction(&files);
assert_eq!(selected.len(), 2); }
#[test]
fn test_compaction_strategy_serde() {
let strategies = vec![
CompactionStrategy::SizeBased,
CompactionStrategy::TimeBased,
CompactionStrategy::FullCompaction,
];
for strategy in strategies {
let json = serde_json::to_string(&strategy).unwrap();
let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, strategy);
}
}
#[test]
fn test_compaction_stats_default() {
let stats = CompactionStats::default();
assert_eq!(stats.total_compactions, 0);
assert_eq!(stats.total_files_compacted, 0);
}
#[test]
fn test_compaction_stats_serde() {
let stats = CompactionStats {
total_compactions: 5,
total_files_compacted: 20,
total_bytes_before: 1000000,
total_bytes_after: 500000,
total_events_compacted: 10000,
last_compaction_duration_ms: 500,
space_saved_bytes: 500000,
};
let json = serde_json::to_string(&stats).unwrap();
assert!(json.contains("\"total_compactions\":5"));
assert!(json.contains("\"space_saved_bytes\":500000"));
}
#[test]
fn test_compaction_result_serde() {
let result = CompactionResult {
files_compacted: 3,
bytes_before: 1000000,
bytes_after: 500000,
events_compacted: 5000,
duration_ms: 250,
};
let json = serde_json::to_string(&result).unwrap();
assert!(json.contains("\"files_compacted\":3"));
assert!(json.contains("\"bytes_before\":1000000"));
}
#[test]
fn test_compaction_task_creation() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = Arc::new(CompactionManager::new(temp_dir.path(), config));
let _task = CompactionTask::new(manager.clone(), 60);
}
#[test]
fn test_list_parquet_files_empty() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = CompactionManager::new(temp_dir.path(), config);
let files = manager.list_parquet_files().unwrap();
assert!(files.is_empty());
}
#[test]
fn test_list_parquet_files_with_non_parquet() {
let temp_dir = TempDir::new().unwrap();
let config = CompactionConfig::default();
let manager = CompactionManager::new(temp_dir.path(), config);
std::fs::write(temp_dir.path().join("test.txt"), "test").unwrap();
std::fs::write(temp_dir.path().join("data.json"), "{}").unwrap();
let files = manager.list_parquet_files().unwrap();
assert!(files.is_empty()); }
fn ingest_and_flush_per_call(storage_dir: &std::path::Path, tenant: &str, count: usize) {
for i in 0..count {
let storage = ParquetStorage::with_config(
storage_dir,
crate::infrastructure::persistence::ParquetStorageConfig {
batch_size: 1,
..Default::default()
},
)
.unwrap();
let event = crate::domain::entities::Event::from_strings(
"test.event".to_string(),
format!("{tenant}-{i}"),
tenant.to_string(),
serde_json::json!({"i": i}),
None,
)
.unwrap();
storage.append_event(event).unwrap();
storage.flush().unwrap();
}
}
#[test]
fn test_compact_tenant_emits_one_snapshot_and_removes_originals() {
let temp_dir = TempDir::new().unwrap();
ingest_and_flush_per_call(temp_dir.path(), "alice", 4);
let config = CompactionConfig {
min_files_to_compact: 2,
small_file_threshold: 100 * 1024 * 1024,
strategy: CompactionStrategy::SizeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact_tenant("alice").unwrap();
assert_eq!(result.files_compacted, 4);
assert_eq!(result.events_compacted, 4);
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
assert_eq!(
alice_files.len(),
1,
"expected exactly one snapshot file for alice"
);
let name = alice_files[0]
.file_name()
.and_then(|n| n.to_str())
.unwrap()
.to_string();
assert!(
name.starts_with("snapshot.alice."),
"expected snapshot prefix, got {name}"
);
assert!(name.ends_with(".parquet"));
let tmps: Vec<_> = std::fs::read_dir(alice_files[0].parent().unwrap())
.unwrap()
.filter_map(std::result::Result::ok)
.filter(|e| e.path().to_string_lossy().ends_with(".tmp"))
.collect();
assert!(tmps.is_empty());
let loaded = storage.load_events_for_tenant("alice").unwrap();
assert_eq!(loaded.len(), 4);
for e in &loaded {
assert_eq!(e.tenant_id_str(), "alice");
}
}
#[test]
fn test_compact_tenant_skips_existing_snapshot_files() {
let temp_dir = TempDir::new().unwrap();
ingest_and_flush_per_call(temp_dir.path(), "alice", 4);
let config = CompactionConfig {
min_files_to_compact: 2,
small_file_threshold: 100 * 1024 * 1024,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let r1 = manager.compact_tenant("alice").unwrap();
assert_eq!(r1.files_compacted, 4);
let r2 = manager.compact_tenant("alice").unwrap();
assert_eq!(r2.files_compacted, 0, "snapshot must not be re-compacted");
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
assert_eq!(alice_files.len(), 1);
}
#[test]
fn test_compact_tenant_below_threshold_is_a_noop() {
let temp_dir = TempDir::new().unwrap();
ingest_and_flush_per_call(temp_dir.path(), "alice", 1);
let manager = CompactionManager::new(temp_dir.path(), CompactionConfig::default());
let result = manager.compact_tenant("alice").unwrap();
assert_eq!(result.files_compacted, 0);
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
assert_eq!(alice_files.len(), 1);
let name = alice_files[0]
.file_name()
.unwrap()
.to_string_lossy()
.into_owned();
assert!(
!name.starts_with("snapshot."),
"raw file must not be renamed"
);
}
#[test]
fn test_compact_iterates_every_tenant() {
let temp_dir = TempDir::new().unwrap();
ingest_and_flush_per_call(temp_dir.path(), "alice", 3);
ingest_and_flush_per_call(temp_dir.path(), "bob", 3);
let config = CompactionConfig {
min_files_to_compact: 2,
small_file_threshold: 100 * 1024 * 1024,
strategy: CompactionStrategy::SizeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact().unwrap();
assert_eq!(result.files_compacted, 6);
assert_eq!(result.events_compacted, 6);
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
for tenant in ["alice", "bob"] {
let files = storage.list_parquet_files_for_tenant(tenant).unwrap();
assert_eq!(files.len(), 1, "{tenant} should have one snapshot");
let name = files[0].file_name().unwrap().to_string_lossy().into_owned();
assert!(name.starts_with(&format!("snapshot.{tenant}.")));
}
}
#[test]
fn test_retention_drops_events_older_than_ttl() {
let temp_dir = TempDir::new().unwrap();
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let now = Utc::now();
for i in 0..100 {
let day_offset = 60 - (i * 60 / 99);
let ts = now - chrono::Duration::days(i64::from(day_offset));
let event = crate::domain::entities::Event::reconstruct_from_strings(
uuid::Uuid::new_v4(),
"test.event".to_string(),
format!("e-{i}"),
"alice".to_string(),
serde_json::json!({"i": i}),
ts,
None,
1,
);
storage.append_event(event).unwrap();
if i % 10 == 9 {
storage.flush().unwrap();
}
}
storage.flush().unwrap();
let mut retention = RetentionConfig::default();
retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
let config = CompactionConfig {
min_files_to_compact: 2,
small_file_threshold: 100 * 1024 * 1024,
strategy: CompactionStrategy::SizeBased,
retention,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact_tenant("alice").unwrap();
assert!(result.events_compacted > 0);
assert!(
result.events_compacted < 100,
"retention should have dropped some events; kept {} of 100",
result.events_compacted
);
let storage2 = ParquetStorage::new(temp_dir.path()).unwrap();
let loaded = storage2.load_events_for_tenant("alice").unwrap();
assert_eq!(loaded.len(), result.events_compacted);
let cutoff = Utc::now() - chrono::Duration::days(30);
for e in &loaded {
assert!(
e.timestamp >= cutoff - chrono::Duration::seconds(60),
"event with ts {} survived retention but is older than cutoff {}",
e.timestamp.to_rfc3339(),
cutoff.to_rfc3339()
);
}
}
#[test]
fn test_retention_keeps_forever_by_default_for_non_system_tenants() {
let temp_dir = TempDir::new().unwrap();
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let now = Utc::now();
for i in 0..6 {
let ts = now - chrono::Duration::days(i * 365);
let event = crate::domain::entities::Event::reconstruct_from_strings(
uuid::Uuid::new_v4(),
"test.event".to_string(),
format!("e-{i}"),
"alice".to_string(),
serde_json::json!({"i": i}),
ts,
None,
1,
);
storage.append_event(event).unwrap();
if i % 2 == 1 {
storage.flush().unwrap();
}
}
storage.flush().unwrap();
let config = CompactionConfig {
min_files_to_compact: 2,
small_file_threshold: 100 * 1024 * 1024,
strategy: CompactionStrategy::SizeBased,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact_tenant("alice").unwrap();
assert_eq!(result.events_compacted, 6, "no events should be dropped");
}
#[test]
fn test_retention_system_tenant_default_is_30_days() {
let cfg = RetentionConfig::default();
let ttl = cfg.ttl_for("system").unwrap();
assert_eq!(ttl.as_secs(), 30 * 24 * 3600);
assert!(cfg.ttl_for("acme").is_none());
}
#[test]
fn test_retention_drops_all_events_deletes_originals_without_snapshot() {
let temp_dir = TempDir::new().unwrap();
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let very_old = Utc::now() - chrono::Duration::days(90);
for i in 0..6 {
let event = crate::domain::entities::Event::reconstruct_from_strings(
uuid::Uuid::new_v4(),
"test.event".to_string(),
format!("e-{i}"),
"alice".to_string(),
serde_json::json!({"i": i}),
very_old,
None,
1,
);
storage.append_event(event).unwrap();
if i % 2 == 1 {
storage.flush().unwrap();
}
}
storage.flush().unwrap();
let mut retention = RetentionConfig::default();
retention.set("alice", Some(Duration::from_secs(7 * 24 * 3600)));
let config = CompactionConfig {
min_files_to_compact: 2,
small_file_threshold: 100 * 1024 * 1024,
strategy: CompactionStrategy::SizeBased,
retention,
..Default::default()
};
let manager = CompactionManager::new(temp_dir.path(), config);
let result = manager.compact_tenant("alice").unwrap();
assert_eq!(result.events_compacted, 0);
assert!(result.files_compacted >= 2);
let storage2 = ParquetStorage::new(temp_dir.path()).unwrap();
let alice_files = storage2.list_parquet_files_for_tenant("alice").unwrap();
assert!(alice_files.is_empty(), "all originals should be deleted");
}
#[test]
fn test_compaction_with_simulated_crash_leaves_data_recoverable() {
let temp_dir = TempDir::new().unwrap();
ingest_and_flush_per_call(temp_dir.path(), "alice", 3);
let storage = ParquetStorage::new(temp_dir.path()).unwrap();
let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
let partition = alice_files[0].parent().unwrap().to_path_buf();
let crashed_tmp = partition.join("snapshot.alice.range.parquet.tmp");
std::fs::write(&crashed_tmp, b"partial parquet bytes").unwrap();
assert!(crashed_tmp.is_file());
let storage2 = ParquetStorage::new(temp_dir.path()).unwrap();
assert!(
!crashed_tmp.exists(),
"stale tmp file should have been cleaned by ParquetStorage::new"
);
let events = storage2.load_events_for_tenant("alice").unwrap();
assert_eq!(events.len(), 3);
}
#[test]
fn test_cold_tier_archives_dropped_events_before_deletion() {
    // Events aged out by retention must land in the cold-tier archive before
    // their live parquet files are deleted: live + archived == original count.
    use crate::infrastructure::persistence::cold_tier::LocalFsArchive;

    let live_dir = TempDir::new().unwrap();
    let archive_dir = TempDir::new().unwrap();
    let storage = ParquetStorage::new(live_dir.path()).unwrap();
    let now = Utc::now();

    // 50 events spread over the last 60 days, flushed every 5 events so the
    // tenant ends up with multiple small files eligible for compaction.
    for i in 0..50 {
        let day_offset = 60 - (i * 60 / 49);
        let ts = now - chrono::Duration::days(i64::from(day_offset));
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            ts,
            None,
            1,
        );
        storage.append_event(event).unwrap();
        if i % 5 == 4 {
            storage.flush().unwrap();
        }
    }
    storage.flush().unwrap();

    // 30-day TTL for alice, with a filesystem-backed cold tier attached.
    let mut retention = RetentionConfig::default();
    retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
    let archive: Arc<dyn ArchiveTarget> =
        Arc::new(LocalFsArchive::new(archive_dir.path()).unwrap());
    let config = CompactionConfig {
        min_files_to_compact: 2,
        small_file_threshold: 100 * 1024 * 1024,
        strategy: CompactionStrategy::SizeBased,
        retention,
        archive: Some(archive),
        ..Default::default()
    };

    let manager = CompactionManager::new(live_dir.path(), config);
    let result = manager.compact_tenant("alice").unwrap();
    assert!(result.events_compacted > 0, "some events kept");
    assert!(
        result.events_compacted < 50,
        "some events dropped to retention; kept {} of 50",
        result.events_compacted
    );

    let live_after = ParquetStorage::new(live_dir.path())
        .unwrap()
        .load_events_for_tenant("alice")
        .unwrap();
    assert_eq!(live_after.len(), result.events_compacted);

    // Recursively gather every archive.alice.* file the cold tier produced.
    fn collect_archive_files(dir: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
        for entry in std::fs::read_dir(dir).unwrap().flatten() {
            let path = entry.path();
            if path.is_dir() {
                collect_archive_files(&path, out);
            } else if path
                .file_name()
                .is_some_and(|n| n.to_string_lossy().starts_with("archive.alice."))
            {
                out.push(path);
            }
        }
    }
    let mut archive_files = Vec::new();
    collect_archive_files(archive_dir.path(), &mut archive_files);
    assert!(
        !archive_files.is_empty(),
        "archive directory must contain at least one archive.alice.* file"
    );

    // The archived events plus the surviving live events account for all 50.
    let archive_storage = ParquetStorage::new(archive_dir.path()).unwrap();
    let archived = archive_storage.load_events_for_tenant("alice").unwrap();
    assert_eq!(
        live_after.len() + archived.len(),
        50,
        "live + archived must equal original event count (live={}, archived={})",
        live_after.len(),
        archived.len()
    );
}
#[test]
fn test_cold_tier_failure_keeps_originals_on_disk() {
    // If the archive target errors, compaction must fail atomically: no live
    // parquet file may be deleted and every event must remain loadable.
    let live_dir = TempDir::new().unwrap();
    let storage = ParquetStorage::new(live_dir.path()).unwrap();
    let now = Utc::now();

    // 20 events spread over the last 60 days, flushed every 5 events.
    for i in 0..20 {
        let ts = now - chrono::Duration::days(60 - i);
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            ts,
            None,
            1,
        );
        storage.append_event(event).unwrap();
        if i % 5 == 4 {
            storage.flush().unwrap();
        }
    }
    storage.flush().unwrap();

    // Recursively count *.parquet files under `dir`.
    fn count_parquet_files(dir: &std::path::Path) -> usize {
        let mut total = 0;
        for entry in std::fs::read_dir(dir).unwrap().flatten() {
            let path = entry.path();
            if path.is_dir() {
                total += count_parquet_files(&path);
            } else if path.extension().is_some_and(|e| e == "parquet") {
                total += 1;
            }
        }
        total
    }
    let files_before = count_parquet_files(live_dir.path());
    assert!(files_before > 0);

    // An archive target that always fails, simulating a cold-tier outage.
    #[derive(Debug)]
    struct FailingArchive;
    impl ArchiveTarget for FailingArchive {
        fn archive(
            &self,
            _: &str,
            _: DateTime<Utc>,
            _: DateTime<Utc>,
            _: &[crate::domain::entities::Event],
        ) -> Result<()> {
            Err(AllSourceError::StorageError(
                "simulated archive outage".to_string(),
            ))
        }
    }

    // 30-day TTL guarantees some events would be dropped — forcing an archive
    // attempt, which then fails.
    let mut retention = RetentionConfig::default();
    retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
    let config = CompactionConfig {
        min_files_to_compact: 2,
        small_file_threshold: 100 * 1024 * 1024,
        strategy: CompactionStrategy::SizeBased,
        retention,
        archive: Some(Arc::new(FailingArchive) as Arc<dyn ArchiveTarget>),
        ..Default::default()
    };
    let manager = CompactionManager::new(live_dir.path(), config);

    let outcome = manager.compact_tenant("alice");
    assert!(outcome.is_err(), "compaction must fail when archive fails");

    let files_after = count_parquet_files(live_dir.path());
    assert_eq!(
        files_before, files_after,
        "no files should be removed after archive failure"
    );

    let reopened = ParquetStorage::new(live_dir.path()).unwrap();
    let loaded = reopened.load_events_for_tenant("alice").unwrap();
    assert_eq!(
        loaded.len(),
        20,
        "all 20 events still present after failed archive"
    );
}
#[test]
fn test_cold_tier_not_invoked_when_no_events_dropped() {
    // With no retention TTL configured (default), nothing is dropped, so the
    // archive target must never be called — enforced via a panicking impl.
    let live_dir = TempDir::new().unwrap();
    let storage = ParquetStorage::new(live_dir.path()).unwrap();
    let now = Utc::now();

    // 10 recent events (hours old — well inside any TTL), flushed every 3.
    for i in 0..10 {
        let ts = now - chrono::Duration::hours(i);
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            ts,
            None,
            1,
        );
        storage.append_event(event).unwrap();
        if i % 3 == 2 {
            storage.flush().unwrap();
        }
    }
    storage.flush().unwrap();

    // Fails the test loudly if compaction ever reaches the cold tier.
    #[derive(Debug)]
    struct PanickingArchive;
    impl ArchiveTarget for PanickingArchive {
        fn archive(
            &self,
            _: &str,
            _: DateTime<Utc>,
            _: DateTime<Utc>,
            _: &[crate::domain::entities::Event],
        ) -> Result<()> {
            panic!("archive must not be called when no events are dropped");
        }
    }

    let config = CompactionConfig {
        min_files_to_compact: 2,
        small_file_threshold: 100 * 1024 * 1024,
        strategy: CompactionStrategy::SizeBased,
        archive: Some(Arc::new(PanickingArchive) as Arc<dyn ArchiveTarget>),
        ..Default::default()
    };
    let manager = CompactionManager::new(live_dir.path(), config);

    let result = manager.compact_tenant("alice").unwrap();
    assert_eq!(result.events_compacted, 10);
}
#[test]
fn test_discover_tenants_skips_system_and_hidden() {
    // Tenant discovery must skip reserved (`__`-prefixed) and hidden
    // (dot-prefixed) directories, returning only real tenants.
    let temp_dir = TempDir::new().unwrap();
    for dir_name in ["alice", "bob", "__system", ".hidden"] {
        std::fs::create_dir_all(temp_dir.path().join(dir_name)).unwrap();
    }
    let manager = CompactionManager::new(temp_dir.path(), CompactionConfig::default());
    assert_eq!(
        manager.discover_tenants().unwrap(),
        vec!["alice".to_string(), "bob".to_string()]
    );
}
}