use std::io::Write;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Duration, TimeZone, Utc};
use flate2::write::GzEncoder;
use flate2::Compression;
use crate::core::error::{Error, Result};
use crate::core::eventlog::{EventLog, Segment};
use crate::core::fs::atomic_write;
use crate::core::HealPaths;
/// Age thresholds that drive event-log compaction.
///
/// Ages are compared against the moment a monthly segment was completed,
/// i.e. the first instant of the following month (see `is_older_than`).
#[derive(Debug, Clone, Copy)]
pub struct CompactionPolicy {
    /// Minimum age at which a still-uncompressed segment is gzipped by `compact`.
    pub gzip_after: Duration,
    /// Minimum age at which a segment is removed from disk by `compact`.
    pub delete_after: Duration,
}

impl CompactionPolicy {
    /// Default value of [`CompactionPolicy::gzip_after`], in days.
    pub const DEFAULT_GZIP_AFTER_DAYS: i64 = 90;
    /// Default value of [`CompactionPolicy::delete_after`], in days.
    pub const DEFAULT_DELETE_AFTER_DAYS: i64 = 365;
}

impl Default for CompactionPolicy {
    /// Builds the policy from the `DEFAULT_*_DAYS` constants.
    fn default() -> Self {
        let gzip_after = Duration::days(Self::DEFAULT_GZIP_AFTER_DAYS);
        let delete_after = Duration::days(Self::DEFAULT_DELETE_AFTER_DAYS);
        Self {
            gzip_after,
            delete_after,
        }
    }
}
/// Outcome of a compaction pass: which files were created or removed.
#[derive(Debug, Default, Clone)]
pub struct CompactionStats {
    /// Paths of the `.gz` files written during the pass.
    pub gzipped: Vec<PathBuf>,
    /// Paths of the segment files removed because they exceeded the delete threshold.
    pub deleted: Vec<PathBuf>,
}

impl CompactionStats {
    /// Returns `true` when the pass gzipped or deleted at least one segment.
    #[must_use]
    pub fn touched(&self) -> bool {
        [&self.gzipped, &self.deleted]
            .iter()
            .any(|paths| !paths.is_empty())
    }
}
pub fn compact(
log: &EventLog,
policy: &CompactionPolicy,
now: DateTime<Utc>,
) -> Result<CompactionStats> {
let mut stats = CompactionStats::default();
for segment in log.segments()? {
if is_older_than(&segment, policy.delete_after, now) {
std::fs::remove_file(&segment.path).map_err(|e| Error::Io {
path: segment.path.clone(),
source: e,
})?;
stats.deleted.push(segment.path);
continue;
}
if is_older_than(&segment, policy.gzip_after, now) && !segment.compressed {
let gz_path = gzip_path(&segment.path);
let bytes = std::fs::read(&segment.path).map_err(|e| Error::Io {
path: segment.path.clone(),
source: e,
})?;
atomic_write(&gz_path, &gzip(&bytes, &gz_path)?)?;
std::fs::remove_file(&segment.path).map_err(|e| Error::Io {
path: segment.path.clone(),
source: e,
})?;
stats.gzipped.push(gz_path);
}
}
Ok(stats)
}
/// Runs [`compact`] over the snapshots, logs and checks directories of `paths`.
///
/// Returns one `(label, stats)` pair per directory, in the order
/// `"snapshots"`, `"logs"`, `"checks"`.
///
/// # Errors
///
/// Returns the first error produced by [`compact`]; directories after the
/// failing one are not processed and the stats gathered so far are dropped.
pub fn compact_all(
    paths: &HealPaths,
    policy: &CompactionPolicy,
    now: DateTime<Utc>,
) -> Result<Vec<(&'static str, CompactionStats)>> {
    let targets = [
        ("snapshots", paths.snapshots_dir()),
        ("logs", paths.logs_dir()),
        ("checks", paths.checks_dir()),
    ];

    let mut report = Vec::with_capacity(targets.len());
    for (label, dir) in targets {
        let stats = compact(&EventLog::new(&dir), policy, now)?;
        report.push((label, stats));
    }
    Ok(report)
}
/// Returns `plaintext` with `.gz` appended to its raw path string.
///
/// The suffix is appended to the whole path rather than swapped in for the
/// existing extension, so `2025-12.jsonl` becomes `2025-12.jsonl.gz`.
fn gzip_path(plaintext: &Path) -> PathBuf {
    let mut raw = plaintext.to_path_buf().into_os_string();
    raw.push(".gz");
    raw.into()
}
fn gzip(bytes: &[u8], path: &Path) -> Result<Vec<u8>> {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(bytes).map_err(|e| Error::Io {
path: path.to_path_buf(),
source: e,
})?;
encoder.finish().map_err(|e| Error::Io {
path: path.to_path_buf(),
source: e,
})
}
/// Reports whether `segment` has been complete for at least `threshold` as of `now`.
///
/// A segment counts as complete at the first instant of the month following
/// its own (see `segment_completed_at`), so a segment whose month has not
/// ended yet has a negative age.
///
/// # Panics
///
/// Panics if the segment's year/month cannot be turned into a valid date by
/// `segment_completed_at`.
fn is_older_than(segment: &Segment, threshold: Duration, now: DateTime<Utc>) -> bool {
    let completed_at = segment_completed_at(segment.year, segment.month);
    let age = now.signed_duration_since(completed_at);
    threshold <= age
}
/// Returns midnight UTC on the first day of the month after `year`-`month`.
///
/// December rolls over to January of the following year.
///
/// # Panics
///
/// Panics if the resulting year/month does not form a valid calendar date
/// (for example when `month` is outside `1..=12`).
fn segment_completed_at(year: i32, month: u32) -> DateTime<Utc> {
    let (next_year, next_month) = match month {
        12 => (year + 1, 1),
        _ => (year, month + 1),
    };
    Utc.with_ymd_and_hms(next_year, next_month, 1, 0, 0, 0)
        .single()
        .expect("valid first-of-month")
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::eventlog::Event;
    use serde_json::json;
    use tempfile::tempdir;

    /// Midnight UTC on the given calendar day.
    fn midnight(year: i32, month: u32, day: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, 0, 0, 0).unwrap()
    }

    /// The "current time" shared by the compaction scenarios: 2026-04-30.
    fn fixed_now() -> DateTime<Utc> {
        midnight(2026, 4, 30)
    }

    /// Appends a single `"test"` event stamped `ts` carrying `payload`.
    fn write_event(log: &EventLog, ts: DateTime<Utc>, payload: i64) {
        let event = Event {
            timestamp: ts,
            event: "test".into(),
            data: json!({ "n": payload }),
        };
        log.append(&event).expect("append");
    }

    #[test]
    fn gzips_old_segment_in_place() {
        let dir = tempdir().unwrap();
        let log = EventLog::new(dir.path());
        // December 2025 completes on 2026-01-01, which is more than 90 days
        // (but less than 365) before the fixed "now".
        let first = midnight(2025, 12, 15);
        write_event(&log, first, 1);
        write_event(&log, first + Duration::hours(1), 2);

        let stats = compact(&log, &CompactionPolicy::default(), fixed_now()).unwrap();
        assert_eq!(stats.gzipped.len(), 1);
        assert!(stats.deleted.is_empty());

        let plaintext = dir.path().join("2025-12.jsonl");
        let archive = dir.path().join("2025-12.jsonl.gz");
        assert!(!plaintext.exists());
        assert!(archive.exists());

        // The compressed segment must still be discoverable and readable.
        let segments = log.segments().unwrap();
        assert_eq!(segments.len(), 1);
        assert!(segments[0].compressed);
        let events: Vec<_> = EventLog::iter_segments(segments)
            .filter_map(std::result::Result::ok)
            .collect();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn deletes_segments_past_year() {
        let dir = tempdir().unwrap();
        let log = EventLog::new(dir.path());
        write_event(&log, midnight(2024, 1, 5), 1);

        let stats = compact(&log, &CompactionPolicy::default(), fixed_now()).unwrap();

        assert_eq!(stats.deleted.len(), 1);
        assert!(!dir.path().join("2024-01.jsonl").exists());
        assert!(log.segments().unwrap().is_empty());
    }

    #[test]
    fn does_not_touch_recent_segments() {
        let dir = tempdir().unwrap();
        let log = EventLog::new(dir.path());
        // March 2026 completes on 2026-04-01, well inside the 90-day window.
        write_event(&log, midnight(2026, 3, 10), 1);

        let stats = compact(&log, &CompactionPolicy::default(), fixed_now()).unwrap();

        assert!(!stats.touched());
        assert!(dir.path().join("2026-03.jsonl").exists());
    }

    #[test]
    fn second_pass_is_a_noop() {
        let dir = tempdir().unwrap();
        let log = EventLog::new(dir.path());
        write_event(&log, midnight(2025, 12, 15), 1);
        let policy = CompactionPolicy::default();

        compact(&log, &policy, fixed_now()).unwrap();
        let second = compact(&log, &policy, fixed_now()).unwrap();

        assert!(!second.touched());
    }

    #[test]
    fn segment_completed_at_handles_year_boundary() {
        assert_eq!(segment_completed_at(2025, 12), midnight(2026, 1, 1));
        assert_eq!(segment_completed_at(2025, 1), midnight(2025, 2, 1));
    }

    #[test]
    fn missing_directory_is_a_noop() {
        let dir = tempdir().unwrap();
        let log = EventLog::new(dir.path().join("does-not-exist"));

        let stats = compact(&log, &CompactionPolicy::default(), fixed_now()).unwrap();

        assert!(!stats.touched());
    }
}