// allsource_core/infrastructure/persistence/cold_tier.rs
//! Cold-tier archive for events past their retention TTL.
//!
//! Step 5 (per-tenant retention) deletes events older than the
//! tenant's TTL. For tenants where "delete" is too aggressive — audit
//! tenants, compliance-driven workloads, anyone who'd want to answer
//! "what happened on day X" months later — this module adds a layer
//! between retention and deletion: an `ArchiveTarget` that receives
//! the dropped events first.
//!
//! ## Crash safety
//!
//! The compaction pipeline calls `archive(...)` BEFORE deleting any
//! original Parquet file. A failed archive returns `Err`, which makes
//! `compact_tenant` short-circuit; originals stay on disk and the next
//! compaction pass retries. There is no "archive then forget" path
//! that risks data loss.
//!
//! ## Backends
//!
//! - `LocalFsArchive` (this file): copies the events into a
//!   `<root>/<tenant>/<yyyy-mm>/archive-<from>-<to>.parquet` tree on
//!   another filesystem. Use this for a separate cheap-disk volume,
//!   or for tests. The directory tree mirrors the live tenant layout
//!   so a future operator can grep for tenants the same way.
//! - S3/R2/GCS backends: deferred behind a feature flag. The
//!   `ArchiveTarget` trait is the integration seam — drop in an
//!   `object_store::ObjectStore`-backed impl when needed.

use crate::{
    domain::entities::Event,
    error::{AllSourceError, Result},
    infrastructure::persistence::storage::ParquetStorage,
};
use chrono::{DateTime, Utc};
use std::{
    fmt,
    path::{Path, PathBuf},
    sync::Arc,
};
40
41/// Archive target for events past their retention TTL.
42///
43/// Implementations MUST be crash-safe: a failed `archive` call must
44/// return an error that the caller can act on. Partial writes that
45/// claim success would cause silent data loss when compaction then
46/// deletes the originals.
47///
48/// Implementations MUST be idempotent on repeat calls with the same
49/// `(tenant_id, from, to)` window. Compaction may retry after a
50/// transient failure; reprocessing the same batch must not corrupt
51/// the archive or lose events.
52pub trait ArchiveTarget: Send + Sync + fmt::Debug {
53    /// Archive a batch of events scoped to `tenant_id` and the time
54    /// window `[from, to]`. Returns `Ok(())` only if the data is
55    /// durably persisted in the target.
56    fn archive(
57        &self,
58        tenant_id: &str,
59        from: DateTime<Utc>,
60        to: DateTime<Utc>,
61        events: &[Event],
62    ) -> Result<()>;
63
64    /// Human-readable description of the backend, for logging.
65    /// Defaults to the `Debug` representation.
66    fn description(&self) -> String {
67        format!("{self:?}")
68    }
69}
70
/// Local-filesystem archive. Copies events to
/// `<root>/<tenant>/<yyyy-mm>/archive-<from>-<to>.parquet` using the
/// same atomic-write primitive as live storage.
///
/// Best fit: a separate, cheaper, possibly larger volume than the hot
/// path uses. NOT a replacement for off-site backup — same datacenter
/// failure modes apply. For multi-region durability, use an
/// object-store backend.
pub struct LocalFsArchive {
    /// Backing storage. Reused so the archive directory walks the
    /// same tenant-partitioned tree shape and benefits from the same
    /// atomic-write primitive (tmp + fsync + rename + fsync dir).
    storage: Arc<ParquetStorage>,
    /// Where the archive root lives. Only retained for `description`
    /// / logging — the storage Arc carries the working path.
    root: PathBuf,
}
88
89impl LocalFsArchive {
90    /// Create a new local-filesystem archive rooted at `root`.
91    /// The directory is created if it doesn't exist. Returns `Err`
92    /// if the path is unusable (no permissions, points at a file,
93    /// etc).
94    pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
95        let root: PathBuf = root.into();
96        let storage = ParquetStorage::new(&root).map_err(|e| {
97            AllSourceError::StorageError(format!(
98                "cold-tier archive: failed to open ParquetStorage at {}: {e}",
99                root.display()
100            ))
101        })?;
102        Ok(Self {
103            storage: Arc::new(storage),
104            root,
105        })
106    }
107
108    /// Path to the archive root (for tests and logging).
109    pub fn root(&self) -> &Path {
110        &self.root
111    }
112}
113
114impl fmt::Debug for LocalFsArchive {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_struct("LocalFsArchive")
117            .field("root", &self.root)
118            .finish()
119    }
120}
121
122impl ArchiveTarget for LocalFsArchive {
123    fn archive(
124        &self,
125        tenant_id: &str,
126        from: DateTime<Utc>,
127        to: DateTime<Utc>,
128        events: &[Event],
129    ) -> Result<()> {
130        if events.is_empty() {
131            return Ok(());
132        }
133
134        // Filename mirrors the live snapshot convention so an operator
135        // pulling from cold storage can correlate easily.
136        let file_stem = format!(
137            "archive.{tenant_id}.{}-{}",
138            super::compaction::format_iso_basic(from),
139            super::compaction::format_iso_basic(to)
140        );
141        let path = self
142            .storage
143            .write_atomic_parquet(tenant_id, &file_stem, events)?;
144        tracing::info!(
145            tenant_id = tenant_id,
146            archive_path = %path.display(),
147            events = events.len(),
148            from = %from.to_rfc3339(),
149            to = %to.to_rfc3339(),
150            "cold-tier archive: wrote dropped events"
151        );
152        Ok(())
153    }
154
155    fn description(&self) -> String {
156        format!("local-fs:{}", self.root.display())
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use chrono::Duration;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Build a minimal event for `tenant` at timestamp `ts`.
    fn make_event(tenant: &str, ts: DateTime<Utc>) -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "entity-1".to_string(),
            tenant.to_string(),
            json!({"x": 1}),
            ts,
            None,
            1,
        )
    }

    /// Recursively collect every `.parquet` file under `root`.
    ///
    /// Extracted because the same iterative tree walk was previously
    /// duplicated verbatim in three tests.
    fn parquet_files(root: &Path) -> Vec<PathBuf> {
        let mut found = Vec::new();
        let mut stack = vec![root.to_path_buf()];
        while let Some(dir) = stack.pop() {
            for entry in std::fs::read_dir(&dir).unwrap().flatten() {
                let path = entry.path();
                if path.is_dir() {
                    stack.push(path);
                } else if path.extension().is_some_and(|ext| ext == "parquet") {
                    found.push(path);
                }
            }
        }
        found
    }

    #[test]
    fn test_local_fs_archive_writes_one_file() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();

        let now = Utc::now();
        let events: Vec<_> = (0..10)
            .map(|i| make_event("acme", now - Duration::days(i)))
            .collect();
        let from = events.iter().map(|e| e.timestamp).min().unwrap();
        let to = events.iter().map(|e| e.timestamp).max().unwrap();

        archive.archive("acme", from, to, &events).unwrap();

        let found = parquet_files(dir.path());
        assert_eq!(found.len(), 1, "exactly one archive file");
        let name = found[0].file_name().unwrap().to_string_lossy().to_string();
        assert!(
            name.starts_with("archive.acme."),
            "filename starts with archive.<tenant>., got {name}"
        );
        // Check the tenant subdirectory via path components rather
        // than a `/acme/` substring so the assertion also holds with
        // Windows path separators.
        assert!(
            found[0].components().any(|c| c.as_os_str() == "acme"),
            "path contains tenant subdir: {}",
            found[0].display()
        );
    }

    #[test]
    fn test_local_fs_archive_empty_batch_is_noop() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let now = Utc::now();
        archive.archive("acme", now, now, &[]).unwrap();

        // No files should have been written.
        assert_eq!(parquet_files(dir.path()).len(), 0);
    }

    #[test]
    fn test_local_fs_archive_idempotent_on_same_window() {
        // Calling archive twice with the same (tenant, from, to) MUST
        // NOT corrupt either file. Filenames are
        // archive.<tenant>.<from>-<to>.parquet via format_iso_basic;
        // a second call to write_atomic_parquet appends a unique
        // suffix to avoid collisions while keeping the archive tree
        // self-consistent.
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();

        let now = Utc::now();
        let events: Vec<_> = (0..3)
            .map(|i| make_event("acme", now - Duration::hours(i)))
            .collect();
        let from = events.iter().map(|e| e.timestamp).min().unwrap();
        let to = events.iter().map(|e| e.timestamp).max().unwrap();

        archive.archive("acme", from, to, &events).unwrap();
        archive.archive("acme", from, to, &events).unwrap();

        // Both calls succeeded — either the implementation
        // overwrites (count = 1) or appends with a unique suffix
        // (count = 2). Either is acceptable; the contract is "no
        // error, no corruption, no lost events."
        let count = parquet_files(dir.path()).len();
        assert!(count == 1 || count == 2, "got {count} archive files");
    }

    #[test]
    fn test_archive_target_description_includes_root() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let desc = archive.description();
        assert!(desc.starts_with("local-fs:"), "got {desc}");
        assert!(
            desc.contains(&dir.path().display().to_string()),
            "got {desc}"
        );
    }
}