use crate::authn::audit::archive::AuditArchiver;
use crate::authn::event::AuthEvent;
use chrono::Datelike;
use std::io::Write as _;
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Debug, thiserror::Error)]
pub enum FilesystemArchiveError {
#[error("serialise event to JSON: {0}")]
Serialize(#[from] serde_json::Error),
#[error("filesystem I/O: {0}")]
Io(#[from] std::io::Error),
}
#[derive(Clone)]
pub struct FilesystemAuditArchiver {
root: PathBuf,
write_lock: Arc<tokio::sync::Mutex<()>>,
}
impl FilesystemAuditArchiver {
pub fn new(root: impl Into<PathBuf>) -> Self {
Self {
root: root.into(),
write_lock: Arc::new(tokio::sync::Mutex::new(())),
}
}
pub fn root(&self) -> &std::path::Path {
&self.root
}
pub fn path_for_date(&self, date: chrono::NaiveDate) -> PathBuf {
self.root
.join(format!("{:04}", date.year()))
.join(format!("{:02}", date.month()))
.join(format!("{:02}", date.day()))
.join(format!("audit-{}.jsonl", date.format("%Y-%m-%d")))
}
}
impl AuditArchiver for FilesystemAuditArchiver {
type Error = FilesystemArchiveError;
async fn archive_batch(&self, events: &[AuthEvent]) -> Result<(), Self::Error> {
if events.is_empty() {
return Ok(());
}
let mut by_day: std::collections::BTreeMap<chrono::NaiveDate, Vec<String>> =
std::collections::BTreeMap::new();
for event in events {
let day = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(event.event_time)
.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"event_time {} out of range for day bucketing",
event.event_time
),
)
})?
.date_naive();
let line = serde_json::to_string(event)?;
by_day.entry(day).or_default().push(line);
}
let write_guard = self.write_lock.lock().await;
let root = self.root.clone();
tokio::task::spawn_blocking(move || -> Result<(), FilesystemArchiveError> {
for (day, lines) in by_day {
let path = root
.join(format!("{:04}", day.year()))
.join(format!("{:02}", day.month()))
.join(format!("{:02}", day.day()))
.join(format!("audit-{}.jsonl", day.format("%Y-%m-%d")));
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)?;
let mut buf = Vec::with_capacity(lines.iter().map(|s| s.len() + 1).sum());
for line in lines {
buf.extend_from_slice(line.as_bytes());
buf.push(b'\n');
}
file.write_all(&buf)?;
file.sync_all()?;
}
Ok(())
})
.await
.map_err(|join_err| {
FilesystemArchiveError::Io(std::io::Error::other(format!(
"spawn_blocking task panicked: {join_err}"
)))
})??;
drop(write_guard);
Ok(())
}
fn name(&self) -> &'static str {
"filesystem"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::authn::event::{AuthEventStatus, AuthEventType};
use chrono::{TimeZone, Utc};
fn make_event(year: i32, month: u32, day: u32) -> AuthEvent {
use crate::authn::event::AuthEventBuilder;
AuthEventBuilder::new(
None,
None,
AuthEventType::LoginAttempt,
AuthEventStatus::Failure,
)
.with_ip("203.0.113.1")
.with_error("test event")
.build_at(Utc.with_ymd_and_hms(year, month, day, 12, 0, 0).unwrap())
}
#[tokio::test]
async fn empty_batch_is_noop() {
let tmp = tempdir();
let archiver = FilesystemAuditArchiver::new(tmp.path());
assert!(archiver.archive_batch(&[]).await.is_ok());
assert!(!tmp.path().join("2026").exists());
}
#[tokio::test]
async fn batch_writes_jsonl_under_date_partitioned_tree() {
let tmp = tempdir();
let archiver = FilesystemAuditArchiver::new(tmp.path());
let events = vec![make_event(2026, 5, 19), make_event(2026, 5, 19)];
archiver.archive_batch(&events).await.unwrap();
let path = archiver.path_for_date(chrono::NaiveDate::from_ymd_opt(2026, 5, 19).unwrap());
assert!(path.exists(), "expected day file at {:?}", path);
let contents = std::fs::read_to_string(&path).unwrap();
let line_count = contents.lines().count();
assert_eq!(line_count, 2, "expected 2 JSONL records, got {line_count}");
for line in contents.lines() {
let _: AuthEvent = serde_json::from_str(line).expect("each line must be valid JSON");
}
}
#[tokio::test]
async fn events_across_days_land_in_correct_files() {
let tmp = tempdir();
let archiver = FilesystemAuditArchiver::new(tmp.path());
let events = vec![
make_event(2026, 5, 18),
make_event(2026, 5, 19),
make_event(2026, 5, 19),
];
archiver.archive_batch(&events).await.unwrap();
let day1 = archiver.path_for_date(chrono::NaiveDate::from_ymd_opt(2026, 5, 18).unwrap());
let day2 = archiver.path_for_date(chrono::NaiveDate::from_ymd_opt(2026, 5, 19).unwrap());
assert_eq!(std::fs::read_to_string(&day1).unwrap().lines().count(), 1);
assert_eq!(std::fs::read_to_string(&day2).unwrap().lines().count(), 2);
}
#[tokio::test]
async fn repeated_batches_append_rather_than_overwrite() {
let tmp = tempdir();
let archiver = FilesystemAuditArchiver::new(tmp.path());
archiver
.archive_batch(&[make_event(2026, 5, 19)])
.await
.unwrap();
archiver
.archive_batch(&[make_event(2026, 5, 19)])
.await
.unwrap();
let path = archiver.path_for_date(chrono::NaiveDate::from_ymd_opt(2026, 5, 19).unwrap());
assert_eq!(std::fs::read_to_string(&path).unwrap().lines().count(), 2);
}
#[tokio::test]
async fn name_is_stable() {
let tmp = tempdir();
let archiver = FilesystemAuditArchiver::new(tmp.path());
assert_eq!(archiver.name(), "filesystem");
}
fn tempdir() -> TempDir {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let path =
std::env::temp_dir().join(format!("axess-archive-test-{}-{n}", std::process::id()));
std::fs::create_dir_all(&path).expect("create test tempdir");
TempDir { path }
}
struct TempDir {
path: PathBuf,
}
impl TempDir {
fn path(&self) -> &std::path::Path {
&self.path
}
}
impl Drop for TempDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.path);
}
}
}