//! allsource_core/infrastructure/persistence/compaction.rs
1use crate::{
2    error::{AllSourceError, Result},
3    infrastructure::persistence::{cold_tier::ArchiveTarget, storage::ParquetStorage},
4};
5use chrono::{DateTime, Utc};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use std::{collections::HashMap, fs, path::PathBuf, sync::Arc, time::Duration};
9
/// Manages Parquet file compaction for optimal storage and query performance.
///
/// Step 4 of the sustainable data strategy moved compaction from a
/// global-stream pass to a per-tenant pass. Each invocation iterates
/// the tenants discovered under `<storage_dir>/<tenant>/...` and
/// emits a `snapshot.<tenant>.<from>-<to>.parquet` per qualifying
/// chunk. Snapshot files are written atomically (tmp + rename) and
/// their constituent raw files are removed only after the rename
/// succeeds — so a mid-compaction crash leaves data intact.
pub struct CompactionManager {
    /// Directory where Parquet files are stored; tenant partitions
    /// live in subdirectories beneath it.
    storage_dir: PathBuf,

    /// Configuration: strategy, size thresholds, retention TTLs, and
    /// the optional cold-tier archive target.
    config: CompactionConfig,

    /// Cumulative statistics across all passes. `RwLock` so `stats()`
    /// readers don't block the compacting writer.
    stats: Arc<RwLock<CompactionStats>>,

    /// Wall-clock time of the last completed pass (`None` until the
    /// first pass). Drives the interval gate in `should_compact()`.
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
32
/// Prefix identifying files that are themselves compaction output.
/// Candidate selection excludes anything carrying this prefix so a
/// snapshot isn't fed back into the input set (re-compacting a
/// snapshot waits for strategy criteria from a future commit).
const SNAPSHOT_PREFIX: &str = "snapshot.";
38
/// Tuning knobs for the compaction pipeline: when to run, which
/// files qualify, how retention and cold-tier archiving behave.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of files to trigger compaction
    pub min_files_to_compact: usize,

    /// Target size for compacted files (in bytes)
    pub target_file_size: usize,

    /// Maximum size for a single compacted file (in bytes)
    pub max_file_size: usize,

    /// Minimum file size to consider for compaction (small files)
    pub small_file_threshold: usize,

    /// Time interval between automatic compactions (in seconds)
    pub compaction_interval_seconds: u64,

    /// Enable automatic background compaction
    pub auto_compact: bool,

    /// Compaction strategy
    pub strategy: CompactionStrategy,

    /// Per-tenant retention TTLs (Step 5 of the sustainable data
    /// strategy). Applied during the same compaction pass — events
    /// older than `now - ttl` for that tenant are dropped from the
    /// snapshot output and the originals are removed. Default
    /// honors the bead: tenant `system` keeps 30 days; everyone
    /// else keeps forever.
    pub retention: RetentionConfig,

    /// Optional cold-tier archive. When set, events that would be
    /// dropped by retention are archived to this target BEFORE the
    /// originals are deleted. A failed archive aborts the
    /// compaction pass — originals stay on disk and the next run
    /// retries. Default `None` preserves the pre-cold-tier behavior:
    /// retention deletes outright. See
    /// `infrastructure::persistence::cold_tier`.
    pub archive: Option<Arc<dyn ArchiveTarget>>,
}
79
/// Per-tenant retention configuration. Look up a TTL via
/// `ttl_for(tenant_id)`; `None` means "keep forever" for that
/// tenant.
///
/// The default rule (from the bead): the CP heartbeat tenant
/// (`system`) defaults to 30 days. The CP emits ~69k heartbeat
/// events/day; without retention this grows unbounded for data
/// that has no audit value past the dashboard window. Other
/// tenants default to no TTL — user data stays put unless the
/// owner opts in.
///
/// Per-tenant overrides win over `default_ttl`; "no entry" falls
/// back to `default_ttl`.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// Default TTL when no per-tenant override exists. `None` = keep forever.
    pub default_ttl: Option<Duration>,
    /// Per-tenant overrides. The value is itself an `Option` so the
    /// map can distinguish two cases: an entry of `None` records an
    /// explicit "keep forever" decision for that tenant, while the
    /// absence of an entry means "no decision, use `default_ttl`".
    pub per_tenant_ttl: HashMap<String, Option<Duration>>,
}
103
104impl Default for RetentionConfig {
105    fn default() -> Self {
106        let mut per_tenant_ttl = HashMap::new();
107        per_tenant_ttl.insert(
108            "system".to_string(),
109            Some(Duration::from_secs(30 * 24 * 3600)),
110        );
111        Self {
112            default_ttl: None,
113            per_tenant_ttl,
114        }
115    }
116}
117
118impl RetentionConfig {
119    /// Effective TTL for `tenant_id`. Returns `None` if the tenant
120    /// has no TTL (keep forever).
121    ///
122    /// Lookup order:
123    /// 1. Per-tenant entry → that value (whether Some or explicit None).
124    /// 2. No entry → fall back to `default_ttl`.
125    pub fn ttl_for(&self, tenant_id: &str) -> Option<Duration> {
126        match self.per_tenant_ttl.get(tenant_id) {
127            Some(v) => *v,
128            None => self.default_ttl,
129        }
130    }
131
132    /// Override the TTL for a specific tenant. Use `None` to mean
133    /// "keep forever for this tenant".
134    pub fn set(&mut self, tenant_id: &str, ttl: Option<Duration>) {
135        self.per_tenant_ttl.insert(tenant_id.to_string(), ttl);
136    }
137}
138
139impl Default for CompactionConfig {
140    fn default() -> Self {
141        Self {
142            min_files_to_compact: 3,
143            target_file_size: 128 * 1024 * 1024,    // 128 MB
144            max_file_size: 256 * 1024 * 1024,       // 256 MB
145            small_file_threshold: 10 * 1024 * 1024, // 10 MB
146            compaction_interval_seconds: 3600,      // 1 hour
147            auto_compact: true,
148            strategy: CompactionStrategy::SizeBased,
149            retention: RetentionConfig::default(),
150            archive: None,
151        }
152    }
153}
154
155impl CompactionConfig {
156    /// Build a config from the relevant env vars:
157    /// - `ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS`: per-pass cadence
158    ///   (default 3600).
159    /// - `ALLSOURCE_RETENTION_SYSTEM_DAYS`: TTL for the `system`
160    ///   tenant in days (default 30).
161    ///
162    /// Unparseable values log a warning and fall back to defaults
163    /// — boot doesn't fail.
164    pub fn from_env() -> Self {
165        Self::from_env_vars(
166            std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
167            std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
168        )
169    }
170
171    /// Testable variant of `from_env`. Production calls `from_env`;
172    /// tests pass explicit values.
173    pub fn from_env_vars(
174        interval_var: Option<String>,
175        system_retention_days_var: Option<String>,
176    ) -> Self {
177        let mut config = Self::default();
178        if let Some(s) = interval_var.filter(|s| !s.is_empty()) {
179            match s.parse::<u64>() {
180                Ok(v) => config.compaction_interval_seconds = v,
181                Err(e) => {
182                    tracing::warn!(
183                        "ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS={s:?} could not be parsed as \
184                         u64: {e}; defaulting to {}s",
185                        config.compaction_interval_seconds
186                    );
187                }
188            }
189        }
190        if let Some(s) = system_retention_days_var.filter(|s| !s.is_empty()) {
191            match s.parse::<u64>() {
192                Ok(days) => {
193                    config
194                        .retention
195                        .set("system", Some(Duration::from_secs(days * 24 * 3600)));
196                }
197                Err(e) => {
198                    tracing::warn!(
199                        "ALLSOURCE_RETENTION_SYSTEM_DAYS={s:?} could not be parsed as u64: \
200                         {e}; defaulting to 30 days for tenant=system"
201                    );
202                }
203            }
204        }
205        config
206    }
207
208    /// Backwards-compatible single-arg variant for existing
209    /// callers that only set the snapshot interval.
210    pub fn from_env_var(interval_var: Option<String>) -> Self {
211        Self::from_env_vars(interval_var, None)
212    }
213}
214
/// Strategy used to pick which raw files a compaction pass rewrites.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Compact based on file size (default): files below
    /// `small_file_threshold`, once `min_files_to_compact` qualify.
    SizeBased,
    /// Compact based on file age (files older than 24 hours).
    TimeBased,
    /// Compact all files into one
    FullCompaction,
}
225
/// Cumulative compaction statistics, aggregated across every pass
/// since this manager was created. Only passes that actually
/// compacted at least one file advance the counters.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    /// Number of sweeps that compacted at least one file.
    pub total_compactions: u64,
    /// Raw files consumed by snapshots, across all sweeps.
    pub total_files_compacted: u64,
    /// Sum of input bytes across all sweeps.
    pub total_bytes_before: u64,
    /// Sum of snapshot output bytes across all sweeps.
    pub total_bytes_after: u64,
    /// Events written into snapshots across all sweeps.
    pub total_events_compacted: u64,
    /// Duration of the most recent counted sweep, in milliseconds.
    pub last_compaction_duration_ms: u64,
    /// Cumulative `bytes_before - bytes_after`, saturating at zero.
    pub space_saved_bytes: u64,
}
236
/// Information about a Parquet file candidate for compaction
#[derive(Debug, Clone)]
struct FileInfo {
    /// Path to the file on disk.
    path: PathBuf,
    /// File size in bytes, from filesystem metadata.
    size: u64,
    /// Filesystem creation time; falls back to `Utc::now()` on
    /// platforms/filesystems that don't report one.
    created: DateTime<Utc>,
}
244
245impl CompactionManager {
246    /// Create a new compaction manager
247    pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
248        let storage_dir = storage_dir.into();
249
250        tracing::info!(
251            "✅ Compaction manager initialized at: {}",
252            storage_dir.display()
253        );
254
255        Self {
256            storage_dir,
257            config,
258            stats: Arc::new(RwLock::new(CompactionStats::default())),
259            last_compaction: Arc::new(RwLock::new(None)),
260        }
261    }
262
263    /// List all Parquet files in the storage directory
264    fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
265        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
266            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
267        })?;
268
269        let mut files = Vec::new();
270
271        for entry in entries {
272            let entry = entry.map_err(|e| {
273                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
274            })?;
275
276            let path = entry.path();
277            if let Some(ext) = path.extension()
278                && ext == "parquet"
279            {
280                let metadata = entry.metadata().map_err(|e| {
281                    AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
282                })?;
283
284                let size = metadata.len();
285                let created = metadata
286                    .created()
287                    .ok()
288                    .and_then(|t| {
289                        t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
290                            DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
291                        })
292                    })
293                    .unwrap_or_else(Utc::now);
294
295                files.push(FileInfo {
296                    path,
297                    size,
298                    created,
299                });
300            }
301        }
302
303        // Sort by creation time (oldest first)
304        files.sort_by_key(|f| f.created);
305
306        Ok(files)
307    }
308
309    /// Identify files that should be compacted based on strategy
310    fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
311        match self.config.strategy {
312            CompactionStrategy::SizeBased => self.select_small_files(files),
313            CompactionStrategy::TimeBased => self.select_old_files(files),
314            CompactionStrategy::FullCompaction => files.to_vec(),
315        }
316    }
317
318    /// Select small files for compaction
319    fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
320        let small_files: Vec<FileInfo> = files
321            .iter()
322            .filter(|f| f.size < self.config.small_file_threshold as u64)
323            .cloned()
324            .collect();
325
326        // Only compact if we have enough small files
327        if small_files.len() >= self.config.min_files_to_compact {
328            small_files
329        } else {
330            Vec::new()
331        }
332    }
333
334    /// Select old files for time-based compaction
335    fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
336        let now = Utc::now();
337        let age_threshold = chrono::Duration::hours(24); // Files older than 24 hours
338
339        let old_files: Vec<FileInfo> = files
340            .iter()
341            .filter(|f| now - f.created > age_threshold)
342            .cloned()
343            .collect();
344
345        if old_files.len() >= self.config.min_files_to_compact {
346            old_files
347        } else {
348            Vec::new()
349        }
350    }
351
352    /// Check if compaction should run
353    #[cfg_attr(feature = "hotpath", hotpath::measure)]
354    pub fn should_compact(&self) -> bool {
355        if !self.config.auto_compact {
356            return false;
357        }
358
359        let last = self.last_compaction.read();
360        match *last {
361            None => true, // Never compacted
362            Some(last_time) => {
363                let elapsed = (Utc::now() - last_time).num_seconds();
364                elapsed >= self.config.compaction_interval_seconds as i64
365            }
366        }
367    }
368
    /// Perform compaction across every discovered tenant.
    ///
    /// Iterates the tenants under `<storage_dir>/<tenant>/...`, calls
    /// `compact_tenant` for each, and aggregates the results. Step 4
    /// of the sustainable data strategy: per-tenant compaction
    /// instead of global, keyed off the per-tenant directory tree
    /// Step 1 introduced.
    ///
    /// Errors compacting one tenant are logged but don't abort the
    /// pass — other tenants still get compacted. The aggregate
    /// result reflects what actually completed.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn compact(&self) -> Result<CompactionResult> {
        let start_time = std::time::Instant::now();
        tracing::info!("🔄 Starting per-tenant compaction sweep...");

        let tenants = self.discover_tenants()?;
        if tenants.is_empty() {
            tracing::debug!("No tenants found under {}", self.storage_dir.display());
            return Ok(CompactionResult::default());
        }

        let mut aggregate = CompactionResult::default();
        for tenant in &tenants {
            match self.compact_tenant(tenant) {
                Ok(r) => {
                    aggregate.files_compacted += r.files_compacted;
                    aggregate.bytes_before += r.bytes_before;
                    aggregate.bytes_after += r.bytes_after;
                    aggregate.events_compacted += r.events_compacted;
                }
                Err(e) => {
                    // Log and continue — one tenant's failure must not
                    // starve the others of compaction.
                    tracing::error!(
                        tenant_id = %tenant,
                        "compact_tenant failed: {e}"
                    );
                }
            }
        }
        aggregate.duration_ms = start_time.elapsed().as_millis() as u64;

        // Stats only advance when something was actually rewritten;
        // a fully no-op sweep leaves the counters untouched.
        if aggregate.files_compacted > 0 {
            let mut stats = self.stats.write();
            stats.total_compactions += 1;
            stats.total_files_compacted += aggregate.files_compacted as u64;
            stats.total_bytes_before += aggregate.bytes_before;
            stats.total_bytes_after += aggregate.bytes_after;
            stats.total_events_compacted += aggregate.events_compacted as u64;
            stats.last_compaction_duration_ms = aggregate.duration_ms;
            stats.space_saved_bytes += aggregate.bytes_before.saturating_sub(aggregate.bytes_after);
        }
        // Recorded unconditionally — even a no-op or all-error pass
        // resets the interval gate, so failures don't hot-loop.
        *self.last_compaction.write() = Some(Utc::now());

        tracing::info!(
            "✅ Compaction sweep complete: {} files → 1 snapshot per tenant, \
             {:.2} MB → {:.2} MB, {} events, {} tenants in {}ms",
            aggregate.files_compacted,
            aggregate.bytes_before as f64 / (1024.0 * 1024.0),
            aggregate.bytes_after as f64 / (1024.0 * 1024.0),
            aggregate.events_compacted,
            tenants.len(),
            aggregate.duration_ms
        );

        Ok(aggregate)
    }
435
    /// Compact one tenant's raw event files into a single snapshot
    /// file under that tenant's partition.
    ///
    /// Per-tenant pipeline:
    /// 1. List `<storage>/<tenant>/...*.parquet` excluding existing
    ///    `snapshot.*` files.
    /// 2. Apply the configured strategy (size / time / full) to
    ///    pick candidate raw files.
    /// 3. If enough candidates: read events, sort by timestamp,
    ///    atomically write `snapshot.<tenant>.<from>-<to>.parquet`
    ///    via `ParquetStorage::write_atomic_parquet`.
    /// 4. After the snapshot rename succeeds, delete the
    ///    constituent raw files. Crash between snapshot and delete
    ///    leaves both on disk; the dedupe in `append_loaded_event`
    ///    keeps memory consistent on next load. A future commit can
    ///    record the constituent file list in the snapshot's
    ///    metadata and let a cleanup pass finish the deletion.
    pub fn compact_tenant(&self, tenant_id: &str) -> Result<CompactionResult> {
        let start_time = std::time::Instant::now();

        // 1. List raw files for this tenant.
        let storage = ParquetStorage::new(&self.storage_dir)?;
        let all_files = storage.list_parquet_files_for_tenant(tenant_id)?;
        let raw_files: Vec<FileInfo> = all_files
            .into_iter()
            // `is_none_or`: a non-UTF-8 filename can't start with
            // "snapshot.", so it's kept as raw input.
            .filter(|p| {
                p.file_name()
                    .and_then(|n| n.to_str())
                    .is_none_or(|n| !n.starts_with(SNAPSHOT_PREFIX))
            })
            // Files whose metadata can't be read are silently skipped
            // (`.ok()?`) — they just don't become candidates this pass.
            .filter_map(|p| {
                let metadata = fs::metadata(&p).ok()?;
                let size = metadata.len();
                let created = metadata
                    .created()
                    .ok()
                    .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
                    .and_then(|d| DateTime::from_timestamp(d.as_secs() as i64, 0))
                    .unwrap_or_else(Utc::now);
                Some(FileInfo {
                    path: p,
                    size,
                    created,
                })
            })
            .collect();

        // 2. Strategy filter.
        let candidates = self.select_files_for_compaction(&raw_files);
        if candidates.is_empty() {
            tracing::debug!(
                tenant_id = tenant_id,
                strategy = ?self.config.strategy,
                "no files meet compaction criteria"
            );
            return Ok(CompactionResult::default());
        }

        let bytes_before: u64 = candidates.iter().map(|f| f.size).sum();
        tracing::info!(
            tenant_id = tenant_id,
            files = candidates.len(),
            mib = bytes_before as f64 / (1024.0 * 1024.0),
            "compacting tenant"
        );

        // 3. Read events from candidates. We deliberately read each
        // file individually (not via load_events_for_tenant) so we
        // only pick up the candidate set — concurrent writes that
        // produced new files since step 1 are skipped, deferred to
        // the next interval (AC #6).
        let mut events = Vec::new();
        for fi in &candidates {
            // Recover tenant from path so loaded events keep
            // their identity (Step 1's path-as-tenant-source).
            let event_tenant = match fi.path.strip_prefix(&self.storage_dir).ok() {
                Some(rel) => rel
                    .components()
                    .next()
                    .and_then(|c| match c {
                        std::path::Component::Normal(t) => Some(t.to_string_lossy().into_owned()),
                        _ => None,
                    })
                    .unwrap_or_else(|| "default".to_string()),
                None => "default".to_string(),
            };
            // Unreadable files are logged and skipped; their events
            // simply don't make it into this snapshot.
            match storage.load_events_from_file_path(&fi.path, &event_tenant) {
                Ok(mut e) => events.append(&mut e),
                Err(e) => {
                    tracing::error!(
                        file = %fi.path.display(),
                        "failed to read parquet file for compaction: {e}"
                    );
                }
            }
        }

        if events.is_empty() {
            tracing::warn!(
                tenant_id = tenant_id,
                "candidate files had no readable events; skipping snapshot"
            );
            return Ok(CompactionResult::default());
        }

        // Apply retention (Step 5). Drop events older than the
        // tenant's TTL before they get rewritten into the
        // snapshot. The originals get deleted at the end of the
        // happy path either way, so dropped events go away with
        // the same crash-safe guarantee as the rest of the
        // compaction pipeline (snapshot rename completes BEFORE
        // any original is deleted; AC #6).
        //
        // Cold-tier (sustainability): when an `archive` target is
        // configured, dropped events are written to it BEFORE this
        // function deletes any original file. A failed archive
        // returns `Err`, originals stay on disk, and the next
        // compaction pass retries — same crash-safety contract as
        // the snapshot path. Without an archive, retention behaves
        // exactly as before (delete outright).
        let dropped_by_retention = if let Some(ttl) = self.config.retention.ttl_for(tenant_id) {
            let cutoff = Utc::now()
                - chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::zero());
            let before = events.len();

            // Partition: drained = events past TTL, kept = events to keep.
            // Using drain_filter would be simpler but it's unstable;
            // partition + reassign keeps events: Vec<Event> with the
            // kept slice in original order.
            let (drained, kept): (Vec<_>, Vec<_>) = std::mem::take(&mut events)
                .into_iter()
                .partition(|e| e.timestamp < cutoff);
            events = kept;
            let dropped = before - events.len();

            if dropped > 0 {
                tracing::info!(
                    retention_tenant = tenant_id,
                    dropped = dropped,
                    kept = events.len(),
                    cutoff = %cutoff.to_rfc3339(),
                    ttl_secs = ttl.as_secs(),
                    "retention: dropped events older than TTL"
                );

                if let Some(archive) = self.config.archive.as_ref() {
                    let from = drained
                        .iter()
                        .map(|e| e.timestamp)
                        .min()
                        .expect("dropped > 0 guarantees non-empty drained");
                    let to = drained
                        .iter()
                        .map(|e| e.timestamp)
                        .max()
                        .expect("dropped > 0 guarantees non-empty drained");
                    // `?` here aborts the pass before any delete — the
                    // crash-safety contract described above.
                    archive.archive(tenant_id, from, to, &drained)?;
                    tracing::info!(
                        retention_tenant = tenant_id,
                        archived_to = %archive.description(),
                        archived = drained.len(),
                        "retention: dropped events archived to cold tier"
                    );
                }
            }
            dropped
        } else {
            0
        };

        // Edge case: every event aged out. Skip the snapshot
        // write and just delete originals — the data is gone by
        // design. Delete-without-snapshot is safe here because
        // every event we'd have written to the snapshot was
        // already past its TTL, and the input files live under
        // the tenant's partition with nothing else relying on
        // them.
        if events.is_empty() {
            tracing::info!(
                tenant_id = tenant_id,
                files_dropped = candidates.len(),
                events_dropped = dropped_by_retention,
                "retention: every event aged out — deleting originals without snapshot"
            );
            for fi in &candidates {
                if let Err(e) = fs::remove_file(&fi.path) {
                    tracing::error!(
                        file = %fi.path.display(),
                        "failed to remove fully-aged raw file: {e}"
                    );
                }
            }
            return Ok(CompactionResult {
                files_compacted: candidates.len(),
                bytes_before,
                bytes_after: 0,
                events_compacted: 0,
                duration_ms: start_time.elapsed().as_millis() as u64,
            });
        }

        events.sort_by_key(|e| e.timestamp);
        let from = events.first().expect("non-empty checked above").timestamp;
        let to = events.last().expect("non-empty checked above").timestamp;

        // Filename: snapshot.<tenant>.<from>-<to> with filesystem-safe
        // ISO-basic timestamps (no colons).
        let file_stem = format!(
            "snapshot.{tenant_id}.{}-{}",
            format_iso_basic(from),
            format_iso_basic(to)
        );
        let snapshot_path = storage.write_atomic_parquet(tenant_id, &file_stem, &events)?;
        let bytes_after = fs::metadata(&snapshot_path).map(|m| m.len()).unwrap_or(0);

        // 4. Delete originals AFTER snapshot is durably renamed.
        // AC #6: a snapshot-write failure short-circuits via the
        // `?` above, so originals stay on disk and events remain
        // queryable until the next successful pass.
        for fi in &candidates {
            if let Err(e) = fs::remove_file(&fi.path) {
                tracing::error!(
                    file = %fi.path.display(),
                    "failed to remove pre-snapshot raw file: {e}"
                );
            }
        }

        let duration_ms = start_time.elapsed().as_millis() as u64;
        tracing::info!(
            tenant_id = tenant_id,
            files_compacted = candidates.len(),
            events = events.len(),
            dropped_by_retention = dropped_by_retention,
            mib_before = bytes_before as f64 / (1024.0 * 1024.0),
            mib_after = bytes_after as f64 / (1024.0 * 1024.0),
            duration_ms = duration_ms,
            "tenant compaction complete"
        );

        Ok(CompactionResult {
            files_compacted: candidates.len(),
            bytes_before,
            bytes_after,
            events_compacted: events.len(),
            duration_ms,
        })
    }
684
685    /// Discover tenant ids by scanning `<storage_dir>/<X>/` for
686    /// directories. The migration tool's flat-layout files at the
687    /// root are not picked up — Step 1 #3's migration moves them
688    /// under `default/`.
689    fn discover_tenants(&self) -> Result<Vec<String>> {
690        let Ok(entries) = fs::read_dir(&self.storage_dir) else {
691            return Ok(Vec::new());
692        };
693        let mut tenants: Vec<String> = entries
694            .filter_map(std::result::Result::ok)
695            .filter_map(|entry| {
696                let ft = entry.file_type().ok()?;
697                if !ft.is_dir() {
698                    return None;
699                }
700                let name = entry.file_name().to_string_lossy().into_owned();
701                // Skip the system metadata subtree (Core's own
702                // event-sourced repos) and any hidden folders.
703                if name.starts_with('.') || name == "__system" {
704                    return None;
705                }
706                Some(name)
707            })
708            .collect();
709        tenants.sort();
710        Ok(tenants)
711    }
712
713    /// Get compaction statistics
714    pub fn stats(&self) -> CompactionStats {
715        (*self.stats.read()).clone()
716    }
717
    /// Get configuration
    ///
    /// Borrowed view of the config this manager was constructed with;
    /// the config is never mutated after construction.
    pub fn config(&self) -> &CompactionConfig {
        &self.config
    }
722
    /// Trigger manual compaction
    ///
    /// Runs a full sweep immediately via `compact()`, bypassing the
    /// `should_compact()` interval/auto-compact gate.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn compact_now(&self) -> Result<CompactionResult> {
        tracing::info!("Manual compaction triggered");
        self.compact()
    }
729}
730
/// Result of a compaction operation
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionResult {
    /// Raw input files consumed (deleted after a successful snapshot).
    pub files_compacted: usize,
    /// Combined size of the input files, in bytes.
    pub bytes_before: u64,
    /// Size of the snapshot output, in bytes (0 when every event aged out).
    pub bytes_after: u64,
    /// Events written into the snapshot (retention-dropped events excluded).
    pub events_compacted: usize,
    /// Wall-clock duration of the pass, in milliseconds.
    pub duration_ms: u64,
}
740
/// Format a UTC timestamp as a filename-safe ISO-8601 basic-form
/// string, e.g. `2026-04-27T134501Z` (`%Y-%m-%dT%H%M%SZ`) — no
/// colons in the time portion, no fractional seconds.
/// Used for the `<from>-<to>` portion of snapshot filenames so
/// they're portable across filesystems. `pub(super)` so the
/// cold-tier archive can reuse the same naming convention.
pub(super) fn format_iso_basic(t: DateTime<Utc>) -> String {
    t.format("%Y-%m-%dT%H%M%SZ").to_string()
}
749
/// Background compaction task
///
/// Pairs a shared `CompactionManager` with a tick interval; `run`
/// loops forever on a tokio timer, compacting when the manager's
/// `should_compact()` gate allows.
pub struct CompactionTask {
    /// Shared manager that performs the actual compaction work.
    manager: Arc<CompactionManager>,
    /// Tick period between auto-compaction checks.
    interval: Duration,
}
755
756impl CompactionTask {
757    /// Create a new background compaction task
758    pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
759        Self {
760            manager,
761            interval: Duration::from_secs(interval_seconds),
762        }
763    }
764
765    /// Run the compaction task in a loop
766    #[cfg_attr(feature = "hotpath", hotpath::measure)]
767    pub async fn run(self) {
768        let mut interval = tokio::time::interval(self.interval);
769
770        loop {
771            interval.tick().await;
772
773            if self.manager.should_compact() {
774                tracing::debug!("Auto-compaction check triggered");
775
776                match self.manager.compact() {
777                    Ok(result) => {
778                        if result.files_compacted > 0 {
779                            tracing::info!(
780                                "Auto-compaction succeeded: {} files, {:.2} MB saved",
781                                result.files_compacted,
782                                (result.bytes_before - result.bytes_after) as f64
783                                    / (1024.0 * 1024.0)
784                            );
785                        }
786                    }
787                    Err(e) => {
788                        tracing::error!("Auto-compaction failed: {}", e);
789                    }
790                }
791            }
792        }
793    }
794}
795
796#[cfg(test)]
797mod tests {
798    use super::*;
799    use tempfile::TempDir;
800
801    #[test]
802    fn test_compaction_manager_creation() {
803        let temp_dir = TempDir::new().unwrap();
804        let config = CompactionConfig::default();
805        let manager = CompactionManager::new(temp_dir.path(), config);
806
807        assert_eq!(manager.stats().total_compactions, 0);
808    }
809
810    #[test]
811    fn test_should_compact() {
812        let temp_dir = TempDir::new().unwrap();
813        let config = CompactionConfig {
814            auto_compact: true,
815            compaction_interval_seconds: 1,
816            ..Default::default()
817        };
818        let manager = CompactionManager::new(temp_dir.path(), config);
819
820        // Should compact on first check (never compacted)
821        assert!(manager.should_compact());
822    }
823
824    #[test]
825    fn test_file_selection_size_based() {
826        let temp_dir = TempDir::new().unwrap();
827        let config = CompactionConfig {
828            small_file_threshold: 1024 * 1024, // 1 MB
829            min_files_to_compact: 2,
830            strategy: CompactionStrategy::SizeBased,
831            ..Default::default()
832        };
833        let manager = CompactionManager::new(temp_dir.path(), config);
834
835        let files = vec![
836            FileInfo {
837                path: PathBuf::from("small1.parquet"),
838                size: 500_000, // 500 KB
839                created: Utc::now(),
840            },
841            FileInfo {
842                path: PathBuf::from("small2.parquet"),
843                size: 600_000, // 600 KB
844                created: Utc::now(),
845            },
846            FileInfo {
847                path: PathBuf::from("large.parquet"),
848                size: 10_000_000, // 10 MB
849                created: Utc::now(),
850            },
851        ];
852
853        let selected = manager.select_files_for_compaction(&files);
854        assert_eq!(selected.len(), 2); // Only the 2 small files
855    }
856
857    #[test]
858    fn test_default_compaction_config() {
859        let config = CompactionConfig::default();
860        assert_eq!(config.min_files_to_compact, 3);
861        assert_eq!(config.target_file_size, 128 * 1024 * 1024);
862        assert_eq!(config.max_file_size, 256 * 1024 * 1024);
863        assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
864        assert_eq!(config.compaction_interval_seconds, 3600);
865        assert!(config.auto_compact);
866        assert_eq!(config.strategy, CompactionStrategy::SizeBased);
867    }
868
869    #[test]
870    fn test_should_compact_disabled() {
871        let temp_dir = TempDir::new().unwrap();
872        let config = CompactionConfig {
873            auto_compact: false,
874            ..Default::default()
875        };
876        let manager = CompactionManager::new(temp_dir.path(), config);
877
878        assert!(!manager.should_compact());
879    }
880
881    #[test]
882    fn test_compact_empty_directory() {
883        let temp_dir = TempDir::new().unwrap();
884        let config = CompactionConfig::default();
885        let manager = CompactionManager::new(temp_dir.path(), config);
886
887        let result = manager.compact().unwrap();
888        assert_eq!(result.files_compacted, 0);
889        assert_eq!(result.bytes_before, 0);
890        assert_eq!(result.bytes_after, 0);
891        assert_eq!(result.events_compacted, 0);
892    }
893
894    #[test]
895    fn test_compact_now() {
896        let temp_dir = TempDir::new().unwrap();
897        let config = CompactionConfig::default();
898        let manager = CompactionManager::new(temp_dir.path(), config);
899
900        let result = manager.compact_now().unwrap();
901        assert_eq!(result.files_compacted, 0);
902    }
903
904    #[test]
905    fn test_get_config() {
906        let temp_dir = TempDir::new().unwrap();
907        let config = CompactionConfig {
908            min_files_to_compact: 5,
909            ..Default::default()
910        };
911        let manager = CompactionManager::new(temp_dir.path(), config);
912
913        assert_eq!(manager.config().min_files_to_compact, 5);
914    }
915
916    #[test]
917    fn test_get_stats() {
918        let temp_dir = TempDir::new().unwrap();
919        let config = CompactionConfig::default();
920        let manager = CompactionManager::new(temp_dir.path(), config);
921
922        let stats = manager.stats();
923        assert_eq!(stats.total_compactions, 0);
924        assert_eq!(stats.total_files_compacted, 0);
925        assert_eq!(stats.total_bytes_before, 0);
926        assert_eq!(stats.total_bytes_after, 0);
927        assert_eq!(stats.total_events_compacted, 0);
928        assert_eq!(stats.last_compaction_duration_ms, 0);
929        assert_eq!(stats.space_saved_bytes, 0);
930    }
931
932    #[test]
933    fn test_file_selection_not_enough_small_files() {
934        let temp_dir = TempDir::new().unwrap();
935        let config = CompactionConfig {
936            small_file_threshold: 1024 * 1024,
937            min_files_to_compact: 3, // Need 3 files
938            strategy: CompactionStrategy::SizeBased,
939            ..Default::default()
940        };
941        let manager = CompactionManager::new(temp_dir.path(), config);
942
943        let files = vec![
944            FileInfo {
945                path: PathBuf::from("small1.parquet"),
946                size: 500_000,
947                created: Utc::now(),
948            },
949            FileInfo {
950                path: PathBuf::from("small2.parquet"),
951                size: 600_000,
952                created: Utc::now(),
953            },
954        ];
955
956        let selected = manager.select_files_for_compaction(&files);
957        assert_eq!(selected.len(), 0); // Not enough small files
958    }
959
960    #[test]
961    fn test_file_selection_time_based() {
962        let temp_dir = TempDir::new().unwrap();
963        let config = CompactionConfig {
964            min_files_to_compact: 2,
965            strategy: CompactionStrategy::TimeBased,
966            ..Default::default()
967        };
968        let manager = CompactionManager::new(temp_dir.path(), config);
969
970        let old_time = Utc::now() - chrono::Duration::hours(48);
971        let files = vec![
972            FileInfo {
973                path: PathBuf::from("old1.parquet"),
974                size: 1_000_000,
975                created: old_time,
976            },
977            FileInfo {
978                path: PathBuf::from("old2.parquet"),
979                size: 2_000_000,
980                created: old_time,
981            },
982            FileInfo {
983                path: PathBuf::from("new.parquet"),
984                size: 500_000,
985                created: Utc::now(),
986            },
987        ];
988
989        let selected = manager.select_files_for_compaction(&files);
990        assert_eq!(selected.len(), 2); // Only the 2 old files
991    }
992
993    #[test]
994    fn test_file_selection_time_based_not_enough() {
995        let temp_dir = TempDir::new().unwrap();
996        let config = CompactionConfig {
997            min_files_to_compact: 3,
998            strategy: CompactionStrategy::TimeBased,
999            ..Default::default()
1000        };
1001        let manager = CompactionManager::new(temp_dir.path(), config);
1002
1003        let old_time = Utc::now() - chrono::Duration::hours(48);
1004        let files = vec![
1005            FileInfo {
1006                path: PathBuf::from("old1.parquet"),
1007                size: 1_000_000,
1008                created: old_time,
1009            },
1010            FileInfo {
1011                path: PathBuf::from("new.parquet"),
1012                size: 500_000,
1013                created: Utc::now(),
1014            },
1015        ];
1016
1017        let selected = manager.select_files_for_compaction(&files);
1018        assert_eq!(selected.len(), 0); // Not enough old files
1019    }
1020
1021    #[test]
1022    fn test_file_selection_full_compaction() {
1023        let temp_dir = TempDir::new().unwrap();
1024        let config = CompactionConfig {
1025            strategy: CompactionStrategy::FullCompaction,
1026            ..Default::default()
1027        };
1028        let manager = CompactionManager::new(temp_dir.path(), config);
1029
1030        let files = vec![
1031            FileInfo {
1032                path: PathBuf::from("file1.parquet"),
1033                size: 1_000_000,
1034                created: Utc::now(),
1035            },
1036            FileInfo {
1037                path: PathBuf::from("file2.parquet"),
1038                size: 2_000_000,
1039                created: Utc::now(),
1040            },
1041        ];
1042
1043        let selected = manager.select_files_for_compaction(&files);
1044        assert_eq!(selected.len(), 2); // All files selected
1045    }
1046
1047    #[test]
1048    fn test_compaction_strategy_serde() {
1049        let strategies = vec![
1050            CompactionStrategy::SizeBased,
1051            CompactionStrategy::TimeBased,
1052            CompactionStrategy::FullCompaction,
1053        ];
1054
1055        for strategy in strategies {
1056            let json = serde_json::to_string(&strategy).unwrap();
1057            let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
1058            assert_eq!(parsed, strategy);
1059        }
1060    }
1061
1062    #[test]
1063    fn test_compaction_stats_default() {
1064        let stats = CompactionStats::default();
1065        assert_eq!(stats.total_compactions, 0);
1066        assert_eq!(stats.total_files_compacted, 0);
1067    }
1068
1069    #[test]
1070    fn test_compaction_stats_serde() {
1071        let stats = CompactionStats {
1072            total_compactions: 5,
1073            total_files_compacted: 20,
1074            total_bytes_before: 1000000,
1075            total_bytes_after: 500000,
1076            total_events_compacted: 10000,
1077            last_compaction_duration_ms: 500,
1078            space_saved_bytes: 500000,
1079        };
1080
1081        let json = serde_json::to_string(&stats).unwrap();
1082        assert!(json.contains("\"total_compactions\":5"));
1083        assert!(json.contains("\"space_saved_bytes\":500000"));
1084    }
1085
1086    #[test]
1087    fn test_compaction_result_serde() {
1088        let result = CompactionResult {
1089            files_compacted: 3,
1090            bytes_before: 1000000,
1091            bytes_after: 500000,
1092            events_compacted: 5000,
1093            duration_ms: 250,
1094        };
1095
1096        let json = serde_json::to_string(&result).unwrap();
1097        assert!(json.contains("\"files_compacted\":3"));
1098        assert!(json.contains("\"bytes_before\":1000000"));
1099    }
1100
1101    #[test]
1102    fn test_compaction_task_creation() {
1103        let temp_dir = TempDir::new().unwrap();
1104        let config = CompactionConfig::default();
1105        let manager = Arc::new(CompactionManager::new(temp_dir.path(), config));
1106
1107        let _task = CompactionTask::new(manager.clone(), 60);
1108        // Task created successfully
1109    }
1110
1111    #[test]
1112    fn test_list_parquet_files_empty() {
1113        let temp_dir = TempDir::new().unwrap();
1114        let config = CompactionConfig::default();
1115        let manager = CompactionManager::new(temp_dir.path(), config);
1116
1117        let files = manager.list_parquet_files().unwrap();
1118        assert!(files.is_empty());
1119    }
1120
1121    #[test]
1122    fn test_list_parquet_files_with_non_parquet() {
1123        let temp_dir = TempDir::new().unwrap();
1124        let config = CompactionConfig::default();
1125        let manager = CompactionManager::new(temp_dir.path(), config);
1126
1127        // Create non-parquet files
1128        std::fs::write(temp_dir.path().join("test.txt"), "test").unwrap();
1129        std::fs::write(temp_dir.path().join("data.json"), "{}").unwrap();
1130
1131        let files = manager.list_parquet_files().unwrap();
1132        assert!(files.is_empty()); // No parquet files
1133    }
1134
1135    // -----------------------------------------------------------------
1136    // Per-tenant compaction tests (Step 4, commit #2).
1137    // -----------------------------------------------------------------
1138
1139    fn ingest_and_flush_per_call(storage_dir: &std::path::Path, tenant: &str, count: usize) {
1140        // Each ingested batch produces one parquet file when its
1141        // ParquetStorage is dropped, giving us multiple raw files
1142        // per tenant for the strategy filter to pick up.
1143        for i in 0..count {
1144            let storage = ParquetStorage::with_config(
1145                storage_dir,
1146                crate::infrastructure::persistence::ParquetStorageConfig {
1147                    batch_size: 1,
1148                    ..Default::default()
1149                },
1150            )
1151            .unwrap();
1152            let event = crate::domain::entities::Event::from_strings(
1153                "test.event".to_string(),
1154                format!("{tenant}-{i}"),
1155                tenant.to_string(),
1156                serde_json::json!({"i": i}),
1157                None,
1158            )
1159            .unwrap();
1160            storage.append_event(event).unwrap();
1161            storage.flush().unwrap();
1162        }
1163    }
1164
    #[test]
    fn test_compact_tenant_emits_one_snapshot_and_removes_originals() {
        // Happy path for per-tenant compaction: 4 raw files in, one
        // snapshot.<tenant>.* file out, originals removed, no tmp
        // debris, and all events still round-trip after the merge.
        let temp_dir = TempDir::new().unwrap();

        // Seed 4 raw files for alice.
        ingest_and_flush_per_call(temp_dir.path(), "alice", 4);

        let config = CompactionConfig {
            min_files_to_compact: 2,
            // Threshold far above any seeded file so all 4 qualify.
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let result = manager.compact_tenant("alice").unwrap();
        assert_eq!(result.files_compacted, 4);
        assert_eq!(result.events_compacted, 4);

        // After: zero raw files, exactly one snapshot.* file under
        // alice's subtree.
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
        assert_eq!(
            alice_files.len(),
            1,
            "expected exactly one snapshot file for alice"
        );

        let name = alice_files[0]
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap()
            .to_string();
        assert!(
            name.starts_with("snapshot.alice."),
            "expected snapshot prefix, got {name}"
        );
        assert!(name.ends_with(".parquet"));

        // No tmp files left behind (atomic write: tmp + rename).
        let tmps: Vec<_> = std::fs::read_dir(alice_files[0].parent().unwrap())
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().to_string_lossy().ends_with(".tmp"))
            .collect();
        assert!(tmps.is_empty());

        // Loaded events round-trip correctly.
        let loaded = storage.load_events_for_tenant("alice").unwrap();
        assert_eq!(loaded.len(), 4);
        for e in &loaded {
            assert_eq!(e.tenant_id_str(), "alice");
        }
    }
1220
1221    #[test]
1222    fn test_compact_tenant_skips_existing_snapshot_files() {
1223        // Seed alice with 4 raw files, run compaction → 1 snapshot.
1224        // Run compaction AGAIN: the snapshot is excluded from the
1225        // candidate set, no qualifying raw files remain, the second
1226        // pass is a no-op.
1227        let temp_dir = TempDir::new().unwrap();
1228        ingest_and_flush_per_call(temp_dir.path(), "alice", 4);
1229
1230        let config = CompactionConfig {
1231            min_files_to_compact: 2,
1232            small_file_threshold: 100 * 1024 * 1024,
1233            ..Default::default()
1234        };
1235        let manager = CompactionManager::new(temp_dir.path(), config);
1236
1237        let r1 = manager.compact_tenant("alice").unwrap();
1238        assert_eq!(r1.files_compacted, 4);
1239
1240        let r2 = manager.compact_tenant("alice").unwrap();
1241        assert_eq!(r2.files_compacted, 0, "snapshot must not be re-compacted");
1242
1243        // Still exactly one file on disk for alice.
1244        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1245        let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1246        assert_eq!(alice_files.len(), 1);
1247    }
1248
1249    #[test]
1250    fn test_compact_tenant_below_threshold_is_a_noop() {
1251        // 1 file < min_files_to_compact (default 3) → nothing happens.
1252        let temp_dir = TempDir::new().unwrap();
1253        ingest_and_flush_per_call(temp_dir.path(), "alice", 1);
1254
1255        let manager = CompactionManager::new(temp_dir.path(), CompactionConfig::default());
1256        let result = manager.compact_tenant("alice").unwrap();
1257        assert_eq!(result.files_compacted, 0);
1258
1259        // Raw file untouched.
1260        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1261        let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1262        assert_eq!(alice_files.len(), 1);
1263        let name = alice_files[0]
1264            .file_name()
1265            .unwrap()
1266            .to_string_lossy()
1267            .into_owned();
1268        assert!(
1269            !name.starts_with("snapshot."),
1270            "raw file must not be renamed"
1271        );
1272    }
1273
1274    #[test]
1275    fn test_compact_iterates_every_tenant() {
1276        // Two tenants each with enough raw files. compact() must
1277        // produce one snapshot per tenant.
1278        let temp_dir = TempDir::new().unwrap();
1279        ingest_and_flush_per_call(temp_dir.path(), "alice", 3);
1280        ingest_and_flush_per_call(temp_dir.path(), "bob", 3);
1281
1282        let config = CompactionConfig {
1283            min_files_to_compact: 2,
1284            small_file_threshold: 100 * 1024 * 1024,
1285            strategy: CompactionStrategy::SizeBased,
1286            ..Default::default()
1287        };
1288        let manager = CompactionManager::new(temp_dir.path(), config);
1289
1290        let result = manager.compact().unwrap();
1291        assert_eq!(result.files_compacted, 6);
1292        assert_eq!(result.events_compacted, 6);
1293
1294        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1295        for tenant in ["alice", "bob"] {
1296            let files = storage.list_parquet_files_for_tenant(tenant).unwrap();
1297            assert_eq!(files.len(), 1, "{tenant} should have one snapshot");
1298            let name = files[0].file_name().unwrap().to_string_lossy().into_owned();
1299            assert!(name.starts_with(&format!("snapshot.{tenant}.")));
1300        }
1301    }
1302
    #[test]
    fn test_retention_drops_events_older_than_ttl() {
        // Bead's integration test: ingest 100 events spanning 60
        // days, run compaction with 30-day TTL, only the last
        // 30 days remain queryable.
        let temp_dir = TempDir::new().unwrap();

        // Seed alice with 100 events, timestamps spread across
        // 60 days. We can't backdate via ingest (timestamp = now()
        // in domain), so write parquet directly via flush of
        // back-dated events.
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        let now = Utc::now();
        for i in 0..100 {
            // Day 0 = 60 days ago; day 99 ≈ now. Spreads evenly.
            let day_offset = 60 - (i * 60 / 99);
            let ts = now - chrono::Duration::days(i64::from(day_offset));
            let event = crate::domain::entities::Event::reconstruct_from_strings(
                uuid::Uuid::new_v4(),
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                ts,
                None,
                1,
            );
            storage.append_event(event).unwrap();
            // Flush after every 10th event so the backlog lands in
            // multiple parquet files, giving compaction several
            // inputs to merge. (NOTE(review): original comment said
            // "fresh storage" — the code reuses one storage and
            // flushes; presumably each flush emits a separate file.)
            if i % 10 == 9 {
                storage.flush().unwrap();
            }
        }
        storage.flush().unwrap();

        // 30-day TTL for alice via per-tenant override.
        let mut retention = RetentionConfig::default();
        retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
        let config = CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);

        let result = manager.compact_tenant("alice").unwrap();
        assert!(result.events_compacted > 0);
        assert!(
            result.events_compacted < 100,
            "retention should have dropped some events; kept {} of 100",
            result.events_compacted
        );

        // Re-load to confirm the dropped events are gone.
        let storage2 = ParquetStorage::new(temp_dir.path()).unwrap();
        let loaded = storage2.load_events_for_tenant("alice").unwrap();
        assert_eq!(loaded.len(), result.events_compacted);

        // Every loaded event must be within the 30-day window
        // (with a generous fudge for test-clock drift).
        let cutoff = Utc::now() - chrono::Duration::days(30);
        for e in &loaded {
            assert!(
                e.timestamp >= cutoff - chrono::Duration::seconds(60),
                "event with ts {} survived retention but is older than cutoff {}",
                e.timestamp.to_rfc3339(),
                cutoff.to_rfc3339()
            );
        }
    }
1377
    #[test]
    fn test_retention_keeps_forever_by_default_for_non_system_tenants() {
        // alice has no override → falls through to default_ttl
        // which is None. All events kept regardless of age — even
        // ones back-dated by years.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        let now = Utc::now();
        for i in 0..6 {
            // One event per 365-day step back in time (0..5 years old).
            let ts = now - chrono::Duration::days(i * 365);
            let event = crate::domain::entities::Event::reconstruct_from_strings(
                uuid::Uuid::new_v4(),
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                ts,
                None,
                1,
            );
            storage.append_event(event).unwrap();
            // Flush every other event so compaction has multiple
            // files to merge.
            if i % 2 == 1 {
                storage.flush().unwrap();
            }
        }
        storage.flush().unwrap();

        // Default RetentionConfig: alice has no entry, default_ttl = None.
        let config = CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);
        let result = manager.compact_tenant("alice").unwrap();
        assert_eq!(result.events_compacted, 6, "no events should be dropped");
    }
1415
1416    #[test]
1417    fn test_retention_system_tenant_default_is_30_days() {
1418        // The system tenant's 30-day TTL is the bead's headline
1419        // requirement. Default config without any overrides
1420        // should already enforce it.
1421        let cfg = RetentionConfig::default();
1422        let ttl = cfg.ttl_for("system").unwrap();
1423        assert_eq!(ttl.as_secs(), 30 * 24 * 3600);
1424        // No override for arbitrary tenants → keep forever.
1425        assert!(cfg.ttl_for("acme").is_none());
1426    }
1427
    #[test]
    fn test_retention_drops_all_events_deletes_originals_without_snapshot() {
        // Edge case: every event is past the TTL. We delete the
        // raw files and emit no snapshot — there's nothing to
        // write. Tenant ends with zero files on disk.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        // All 6 events dated 90 days ago — far past the 7-day TTL below.
        let very_old = Utc::now() - chrono::Duration::days(90);
        for i in 0..6 {
            let event = crate::domain::entities::Event::reconstruct_from_strings(
                uuid::Uuid::new_v4(),
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                very_old,
                None,
                1,
            );
            storage.append_event(event).unwrap();
            // Flush every other event so multiple raw files exist.
            if i % 2 == 1 {
                storage.flush().unwrap();
            }
        }
        storage.flush().unwrap();

        // 7-day TTL for alice: everything written above is expired.
        let mut retention = RetentionConfig::default();
        retention.set("alice", Some(Duration::from_secs(7 * 24 * 3600)));
        let config = CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            ..Default::default()
        };
        let manager = CompactionManager::new(temp_dir.path(), config);
        let result = manager.compact_tenant("alice").unwrap();
        assert_eq!(result.events_compacted, 0);
        assert!(result.files_compacted >= 2); // originals were deleted

        // Tenant subtree has zero parquet files now.
        let storage2 = ParquetStorage::new(temp_dir.path()).unwrap();
        let alice_files = storage2.list_parquet_files_for_tenant("alice").unwrap();
        assert!(alice_files.is_empty(), "all originals should be deleted");
    }
1473
    #[test]
    fn test_compaction_with_simulated_crash_leaves_data_recoverable() {
        // AC #7-#8: simulate "crash mid-snapshot" by manually
        // dropping a tmp file in the partition, then asserting
        // a fresh ParquetStorage::new boot cleans it up and the
        // raw files (which would still be present in a real
        // mid-rename crash) remain queryable.
        let temp_dir = TempDir::new().unwrap();

        // Seed alice with raw files.
        ingest_and_flush_per_call(temp_dir.path(), "alice", 3);

        // Locate alice's partition dir.
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
        let partition = alice_files[0].parent().unwrap().to_path_buf();

        // Simulate the crash state: write a partial .tmp file as
        // if write_atomic_parquet had crashed mid-rename.
        let crashed_tmp = partition.join("snapshot.alice.range.parquet.tmp");
        std::fs::write(&crashed_tmp, b"partial parquet bytes").unwrap();
        assert!(crashed_tmp.is_file());

        // Reboot — ParquetStorage::new triggers cleanup_partial_writes.
        let storage2 = ParquetStorage::new(temp_dir.path()).unwrap();
        assert!(
            !crashed_tmp.exists(),
            "stale tmp file should have been cleaned by ParquetStorage::new"
        );

        // Raw files survived; events queryable.
        let events = storage2.load_events_for_tenant("alice").unwrap();
        assert_eq!(events.len(), 3);
    }
1508
    #[test]
    fn test_cold_tier_archives_dropped_events_before_deletion() {
        // Cold-tier integration: when retention drops events AND an
        // archive target is configured, the dropped events end up
        // in the archive root before originals are removed. This is
        // the load-bearing property — without archive-before-delete
        // the cold tier would silently lose data on retention runs.
        use crate::infrastructure::persistence::cold_tier::LocalFsArchive;

        let live_dir = TempDir::new().unwrap();
        let archive_dir = TempDir::new().unwrap();

        // Seed alice with 50 events spread across 60 days.
        let storage = ParquetStorage::new(live_dir.path()).unwrap();
        let now = Utc::now();
        for i in 0..50 {
            // Evenly spread: i=0 → 60 days ago, i=49 → ~now.
            let day_offset = 60 - (i * 60 / 49);
            let ts = now - chrono::Duration::days(i64::from(day_offset));
            let event = crate::domain::entities::Event::reconstruct_from_strings(
                uuid::Uuid::new_v4(),
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                ts,
                None,
                1,
            );
            storage.append_event(event).unwrap();
            // Flush every 5th event so multiple raw files exist.
            if i % 5 == 4 {
                storage.flush().unwrap();
            }
        }
        storage.flush().unwrap();

        // 30-day TTL + cold-tier archive.
        let mut retention = RetentionConfig::default();
        retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
        let archive: Arc<dyn ArchiveTarget> =
            Arc::new(LocalFsArchive::new(archive_dir.path()).unwrap());
        let config = CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            archive: Some(archive),
            ..Default::default()
        };
        let manager = CompactionManager::new(live_dir.path(), config);

        let result = manager.compact_tenant("alice").unwrap();
        assert!(result.events_compacted > 0, "some events kept");
        assert!(
            result.events_compacted < 50,
            "some events dropped to retention; kept {} of 50",
            result.events_compacted
        );

        // Live storage: only events within the 30-day window.
        let live_after = ParquetStorage::new(live_dir.path())
            .unwrap()
            .load_events_for_tenant("alice")
            .unwrap();
        assert_eq!(live_after.len(), result.events_compacted);

        // Archive: contains the dropped events. Walk the archive root
        // and load any archive.alice.* file we find.
        let mut archive_files = vec![];
        let mut stack = vec![archive_dir.path().to_path_buf()];
        // Iterative depth-first walk of the archive tree.
        while let Some(d) = stack.pop() {
            for entry in std::fs::read_dir(&d).unwrap().flatten() {
                let p = entry.path();
                if p.is_dir() {
                    stack.push(p);
                } else if p
                    .file_name()
                    .is_some_and(|n| n.to_string_lossy().starts_with("archive.alice."))
                {
                    archive_files.push(p);
                }
            }
        }
        assert!(
            !archive_files.is_empty(),
            "archive directory must contain at least one archive.alice.* file"
        );

        // Sum events across archive files; total live + archived
        // must equal the original 50 (no events lost in the pipeline).
        let archive_storage = ParquetStorage::new(archive_dir.path()).unwrap();
        let archived = archive_storage.load_events_for_tenant("alice").unwrap();
        assert_eq!(
            live_after.len() + archived.len(),
            50,
            "live + archived must equal original event count (live={}, archived={})",
            live_after.len(),
            archived.len()
        );
    }
1608
1609    #[test]
1610    fn test_cold_tier_failure_keeps_originals_on_disk() {
1611        // Crash-safety contract: a failing archive must NOT delete
1612        // originals. We use a custom ArchiveTarget that always
1613        // returns Err to simulate an outage.
1614        let live_dir = TempDir::new().unwrap();
1615        let storage = ParquetStorage::new(live_dir.path()).unwrap();
1616        let now = Utc::now();
1617        for i in 0..20 {
1618            let ts = now - chrono::Duration::days(60 - i);
1619            let event = crate::domain::entities::Event::reconstruct_from_strings(
1620                uuid::Uuid::new_v4(),
1621                "test.event".to_string(),
1622                format!("e-{i}"),
1623                "alice".to_string(),
1624                serde_json::json!({"i": i}),
1625                ts,
1626                None,
1627                1,
1628            );
1629            storage.append_event(event).unwrap();
1630            if i % 5 == 4 {
1631                storage.flush().unwrap();
1632            }
1633        }
1634        storage.flush().unwrap();
1635
1636        // Count files before. The compaction pipeline should leave
1637        // them all on disk after the failed archive.
1638        let count_files = |dir: &std::path::Path| -> usize {
1639            let mut n = 0;
1640            let mut stack = vec![dir.to_path_buf()];
1641            while let Some(d) = stack.pop() {
1642                for entry in std::fs::read_dir(&d).unwrap().flatten() {
1643                    let p = entry.path();
1644                    if p.is_dir() {
1645                        stack.push(p);
1646                    } else if p.extension().is_some_and(|e| e == "parquet") {
1647                        n += 1;
1648                    }
1649                }
1650            }
1651            n
1652        };
1653        let before = count_files(live_dir.path());
1654        assert!(before > 0);
1655
1656        #[derive(Debug)]
1657        struct FailingArchive;
1658        impl ArchiveTarget for FailingArchive {
1659            fn archive(
1660                &self,
1661                _: &str,
1662                _: DateTime<Utc>,
1663                _: DateTime<Utc>,
1664                _: &[crate::domain::entities::Event],
1665            ) -> Result<()> {
1666                Err(AllSourceError::StorageError(
1667                    "simulated archive outage".to_string(),
1668                ))
1669            }
1670        }
1671
1672        let mut retention = RetentionConfig::default();
1673        retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
1674        let config = CompactionConfig {
1675            min_files_to_compact: 2,
1676            small_file_threshold: 100 * 1024 * 1024,
1677            strategy: CompactionStrategy::SizeBased,
1678            retention,
1679            archive: Some(Arc::new(FailingArchive) as Arc<dyn ArchiveTarget>),
1680            ..Default::default()
1681        };
1682        let manager = CompactionManager::new(live_dir.path(), config);
1683
1684        let result = manager.compact_tenant("alice");
1685        assert!(result.is_err(), "compaction must fail when archive fails");
1686
1687        // Originals still on disk — every event still queryable.
1688        let after = count_files(live_dir.path());
1689        assert_eq!(
1690            before, after,
1691            "no files should be removed after archive failure"
1692        );
1693
1694        let storage2 = ParquetStorage::new(live_dir.path()).unwrap();
1695        let loaded = storage2.load_events_for_tenant("alice").unwrap();
1696        assert_eq!(
1697            loaded.len(),
1698            20,
1699            "all 20 events still present after failed archive"
1700        );
1701    }
1702
1703    #[test]
1704    fn test_cold_tier_not_invoked_when_no_events_dropped() {
1705        // If retention drops zero events (e.g. tenant has no TTL),
1706        // the archive target must NOT be called. We assert by using
1707        // an archive that panics on call.
1708        let live_dir = TempDir::new().unwrap();
1709        let storage = ParquetStorage::new(live_dir.path()).unwrap();
1710        let now = Utc::now();
1711        for i in 0..10 {
1712            let ts = now - chrono::Duration::hours(i);
1713            let event = crate::domain::entities::Event::reconstruct_from_strings(
1714                uuid::Uuid::new_v4(),
1715                "test.event".to_string(),
1716                format!("e-{i}"),
1717                "alice".to_string(),
1718                serde_json::json!({"i": i}),
1719                ts,
1720                None,
1721                1,
1722            );
1723            storage.append_event(event).unwrap();
1724            if i % 3 == 2 {
1725                storage.flush().unwrap();
1726            }
1727        }
1728        storage.flush().unwrap();
1729
1730        #[derive(Debug)]
1731        struct PanickingArchive;
1732        impl ArchiveTarget for PanickingArchive {
1733            fn archive(
1734                &self,
1735                _: &str,
1736                _: DateTime<Utc>,
1737                _: DateTime<Utc>,
1738                _: &[crate::domain::entities::Event],
1739            ) -> Result<()> {
1740                panic!("archive must not be called when no events are dropped");
1741            }
1742        }
1743
1744        // Default retention → alice has no TTL → no events dropped.
1745        let config = CompactionConfig {
1746            min_files_to_compact: 2,
1747            small_file_threshold: 100 * 1024 * 1024,
1748            strategy: CompactionStrategy::SizeBased,
1749            archive: Some(Arc::new(PanickingArchive) as Arc<dyn ArchiveTarget>),
1750            ..Default::default()
1751        };
1752        let manager = CompactionManager::new(live_dir.path(), config);
1753        let result = manager.compact_tenant("alice").unwrap();
1754        assert_eq!(result.events_compacted, 10);
1755    }
1756
1757    #[test]
1758    fn test_discover_tenants_skips_system_and_hidden() {
1759        // Ensure the tenant scan doesn't pick up __system or hidden
1760        // directories — they're internal, not real tenants.
1761        let temp_dir = TempDir::new().unwrap();
1762        std::fs::create_dir_all(temp_dir.path().join("alice")).unwrap();
1763        std::fs::create_dir_all(temp_dir.path().join("bob")).unwrap();
1764        std::fs::create_dir_all(temp_dir.path().join("__system")).unwrap();
1765        std::fs::create_dir_all(temp_dir.path().join(".hidden")).unwrap();
1766
1767        let manager = CompactionManager::new(temp_dir.path(), CompactionConfig::default());
1768        let tenants = manager.discover_tenants().unwrap();
1769        assert_eq!(tenants, vec!["alice".to_string(), "bob".to_string()]);
1770    }
1771}