use crate::{
domain::entities::Event,
error::{AllSourceError, Result},
infrastructure::persistence::storage::ParquetStorage,
};
use chrono::{DateTime, Utc};
use std::{
fmt,
path::{Path, PathBuf},
sync::Arc,
};
/// A sink that persists a batch of events for one tenant and time window.
///
/// Implementors must be `Send + Sync` so one target can be shared across
/// threads, and `Debug` so the default `description` has something to render.
pub trait ArchiveTarget: Send + Sync + fmt::Debug {
    /// Persists `events` belonging to `tenant_id`, for the window bounded by
    /// `from` and `to`.
    ///
    /// # Errors
    ///
    /// Returns an error when the implementation fails to persist the batch.
    fn archive(
        &self,
        tenant_id: &str,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
        events: &[Event],
    ) -> Result<()>;

    /// Human-readable label for this target.
    ///
    /// Falls back to the implementor's `Debug` rendering unless overridden.
    fn description(&self) -> String {
        format!("{self:?}")
    }
}
/// Archive target that writes events as Parquet files on the local
/// filesystem, beneath a fixed root directory.
pub struct LocalFsArchive {
    /// Directory the archive was opened at; surfaced by `root()` and
    /// `description()`.
    root: PathBuf,
    /// Parquet writer opened at `root`, shared behind an `Arc`.
    storage: Arc<ParquetStorage>,
}
impl LocalFsArchive {
    /// Opens a Parquet-backed archive rooted at `root`.
    ///
    /// # Errors
    ///
    /// Returns `AllSourceError::StorageError` when `ParquetStorage::new` fails
    /// for `root`; the message carries the path and the underlying error.
    pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
        // The annotation pins `into()`'s target so `root.display()` below resolves.
        let root: PathBuf = root.into();
        let storage = ParquetStorage::new(&root)
            .map(Arc::new)
            .map_err(|e| {
                AllSourceError::StorageError(format!(
                    "cold-tier archive: failed to open ParquetStorage at {}: {e}",
                    root.display()
                ))
            })?;
        Ok(Self { root, storage })
    }

    /// Root directory this archive was opened at.
    pub fn root(&self) -> &Path {
        self.root.as_path()
    }
}
impl fmt::Debug for LocalFsArchive {
    /// Renders the archive as `LocalFsArchive { root: .. }`.
    ///
    /// Only `root` is emitted; the `storage` handle is left out of the output
    /// (presumably because `ParquetStorage` has no useful `Debug` form — confirm).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LocalFsArchive");
        builder.field("root", &self.root);
        builder.finish()
    }
}
impl ArchiveTarget for LocalFsArchive {
    /// Writes `events` as one Parquet file through the underlying storage.
    ///
    /// The file stem is `archive.<tenant_id>.<from>-<to>`, with both bounds
    /// rendered by `compaction::format_iso_basic`. The write is delegated to
    /// `ParquetStorage::write_atomic_parquet`, and an info-level trace records
    /// the resulting path. An empty batch writes nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `write_atomic_parquet`.
    fn archive(
        &self,
        tenant_id: &str,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
        events: &[Event],
    ) -> Result<()> {
        // Nothing to preserve: skip touching the filesystem entirely.
        if events.is_empty() {
            return Ok(());
        }

        // NOTE(review): `tenant_id` is interpolated into the file stem and handed
        // to the storage layer unvalidated — confirm tenant ids are sanitized
        // upstream (no path separators / `..`).
        let window_start = super::compaction::format_iso_basic(from);
        let window_end = super::compaction::format_iso_basic(to);
        let file_stem = format!("archive.{tenant_id}.{window_start}-{window_end}");

        let written_to = self
            .storage
            .write_atomic_parquet(tenant_id, &file_stem, events)?;

        tracing::info!(
            tenant_id,
            archive_path = %written_to.display(),
            events = events.len(),
            from = %from.to_rfc3339(),
            to = %to.to_rfc3339(),
            "cold-tier archive: wrote dropped events"
        );
        Ok(())
    }

    /// Returns `local-fs:` followed by the archive's root directory.
    fn description(&self) -> String {
        let root = self.root.display();
        format!("local-fs:{root}")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use chrono::Duration;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a minimal `test.event` for `tenant`, stamped at `ts`.
    fn make_event(tenant: &str, ts: DateTime<Utc>) -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "entity-1".to_string(),
            tenant.to_string(),
            json!({"x": 1}),
            ts,
            None,
            1,
        )
    }

    /// Recursively collects every `*.parquet` file beneath `root`.
    ///
    /// Panics on any I/O error so that a broken walk fails the test instead of
    /// silently under-counting (the previous `.flatten()` skipped bad entries).
    fn parquet_files_under(root: &Path) -> Vec<PathBuf> {
        let mut found = Vec::new();
        let mut pending = vec![root.to_path_buf()];
        while let Some(dir) = pending.pop() {
            for entry in std::fs::read_dir(&dir).expect("read_dir on archive tree") {
                let path = entry.expect("read_dir entry").path();
                if path.is_dir() {
                    pending.push(path);
                } else if path.extension().is_some_and(|ext| ext == "parquet") {
                    found.push(path);
                }
            }
        }
        found
    }

    #[test]
    fn test_local_fs_archive_writes_one_file() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let now = Utc::now();
        let events: Vec<_> = (0..10)
            .map(|i| make_event("acme", now - Duration::days(i)))
            .collect();
        let from = events.iter().map(|e| e.timestamp).min().unwrap();
        let to = events.iter().map(|e| e.timestamp).max().unwrap();
        archive.archive("acme", from, to, &events).unwrap();

        let found = parquet_files_under(dir.path());
        assert_eq!(found.len(), 1, "exactly one archive file");
        let archived = &found[0];

        let name = archived.file_name().unwrap().to_string_lossy().into_owned();
        assert!(
            name.starts_with("archive.acme."),
            "filename starts with archive.<tenant>., got {name}"
        );

        // Compare path components (relative to the temp root) instead of a
        // "/acme/" substring so the check is separator-agnostic and also holds
        // on Windows.
        let in_tenant_dir = archived
            .strip_prefix(dir.path())
            .ok()
            .and_then(|rel| rel.parent())
            .is_some_and(|parent| parent.components().any(|c| c.as_os_str() == "acme"));
        assert!(
            in_tenant_dir,
            "path contains tenant subdir: {}",
            archived.display()
        );
    }

    #[test]
    fn test_local_fs_archive_empty_batch_is_noop() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let now = Utc::now();
        archive.archive("acme", now, now, &[]).unwrap();

        let found = parquet_files_under(dir.path());
        assert!(found.is_empty(), "expected no archive files, got {found:?}");
    }

    #[test]
    fn test_local_fs_archive_idempotent_on_same_window() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let now = Utc::now();
        let events: Vec<_> = (0..3)
            .map(|i| make_event("acme", now - Duration::hours(i)))
            .collect();
        let from = events.iter().map(|e| e.timestamp).min().unwrap();
        let to = events.iter().map(|e| e.timestamp).max().unwrap();
        archive.archive("acme", from, to, &events).unwrap();
        archive.archive("acme", from, to, &events).unwrap();

        // Re-archiving the same window must succeed. Whether the second write
        // replaces the first file or lands beside it is decided by
        // `write_atomic_parquet`, so both outcomes are accepted here.
        let count = parquet_files_under(dir.path()).len();
        assert!(matches!(count, 1 | 2), "got {count} archive files");
    }

    #[test]
    fn test_archive_target_description_includes_root() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let desc = archive.description();
        assert!(desc.starts_with("local-fs:"), "got {desc}");
        assert!(
            desc.contains(&dir.path().display().to_string()),
            "got {desc}"
        );
    }
}