Skip to main content

aex_audit/
file_log.rs

1//! File-backed [`AuditLog`]: append-only JSONL with a chain head cached in
2//! memory for fast `append`/`current_head`.
3//!
4//! On startup, the existing file is replayed to rebuild in-memory state
5//! (length, chain head). This is O(file size) at launch but constant-time
6//! afterwards. Acceptable for dev tier; Postgres-backed impl is on the
7//! Phase 2 roadmap for scale.
8
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use time::OffsetDateTime;
14use tokio::fs::{File, OpenOptions};
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::sync::Mutex;
17
18use crate::{
19    event::{Event, EventReceipt, StoredEvent},
20    AuditError, AuditLog, AuditResult, GENESIS_HEAD,
21};
22
23#[derive(Debug)]
24pub struct FileAuditLog {
25    inner: Arc<Mutex<Inner>>,
26    path: PathBuf,
27}
28
29impl std::fmt::Debug for Inner {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("Inner")
32            .field("head", &self.head)
33            .field("len", &self.len)
34            .finish_non_exhaustive()
35    }
36}
37
38struct Inner {
39    file: File,
40    head: String,
41    len: u64,
42}
43
44impl FileAuditLog {
45    /// Open (creating if necessary) the given path and rebuild state by
46    /// replaying any existing rows.
47    pub async fn open(path: impl Into<PathBuf>) -> AuditResult<Self> {
48        let path = path.into();
49        if let Some(parent) = path.parent() {
50            if !parent.as_os_str().is_empty() {
51                tokio::fs::create_dir_all(parent).await?;
52            }
53        }
54
55        // Replay existing file to find head + length. Use a read-only
56        // handle for the replay so we don't seek the write handle around.
57        let (head, len) = replay(&path).await?;
58
59        let file = OpenOptions::new()
60            .create(true)
61            .append(true)
62            .open(&path)
63            .await?;
64
65        Ok(Self {
66            inner: Arc::new(Mutex::new(Inner { file, head, len })),
67            path,
68        })
69    }
70
71    pub fn path(&self) -> &Path {
72        &self.path
73    }
74}
75
76async fn replay(path: &Path) -> AuditResult<(String, u64)> {
77    if !path.exists() {
78        return Ok((GENESIS_HEAD.to_string(), 0));
79    }
80    let file = File::open(path).await?;
81    let mut lines = BufReader::new(file).lines();
82    let mut head = GENESIS_HEAD.to_string();
83    let mut len: u64 = 0;
84    while let Some(line) = lines.next_line().await? {
85        if line.trim().is_empty() {
86            continue;
87        }
88        let stored: StoredEvent = serde_json::from_str(&line)?;
89        if stored.prev_hash != head {
90            return Err(AuditError::ChainBroken {
91                position: len,
92                expected: head,
93                found: stored.prev_hash,
94            });
95        }
96        stored.verify_hash()?;
97        head = stored.this_hash;
98        len += 1;
99    }
100    Ok((head, len))
101}
102
103#[async_trait]
104impl AuditLog for FileAuditLog {
105    async fn append(&self, event: Event) -> AuditResult<EventReceipt> {
106        let mut guard = self.inner.lock().await;
107        let timestamp = OffsetDateTime::now_utc();
108        let prev_hash = guard.head.clone();
109        let this_hash = event.compute_hash(timestamp, &prev_hash)?;
110        let stored = StoredEvent {
111            position: guard.len,
112            event_id: format!("evt_{}", uuid::Uuid::new_v4().simple()),
113            timestamp,
114            prev_hash,
115            this_hash: this_hash.clone(),
116            event,
117        };
118
119        let mut line = serde_json::to_vec(&stored)?;
120        line.push(b'\n');
121        guard.file.write_all(&line).await?;
122        guard.file.flush().await?;
123
124        guard.head = this_hash.clone();
125        guard.len += 1;
126
127        Ok(EventReceipt {
128            event_id: stored.event_id,
129            position: stored.position,
130            timestamp,
131            chain_head: this_hash,
132        })
133    }
134
135    async fn current_head(&self) -> AuditResult<String> {
136        Ok(self.inner.lock().await.head.clone())
137    }
138
139    async fn verify_chain(&self) -> AuditResult<()> {
140        // Re-replay the file from disk — this catches on-disk tampering
141        // even if our in-memory head looks fine.
142        let (head, len) = replay(&self.path).await?;
143        let guard = self.inner.lock().await;
144        if head != guard.head || len != guard.len {
145            return Err(AuditError::ChainBroken {
146                position: len,
147                expected: guard.head.clone(),
148                found: head,
149            });
150        }
151        Ok(())
152    }
153
154    async fn len(&self) -> AuditResult<u64> {
155        Ok(self.inner.lock().await.len)
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventKind;

    /// Creates a fresh temporary directory for one test.
    fn tmpdir() -> tempfile::TempDir {
        tempfile::tempdir().unwrap()
    }

    /// Appends `count` `TransferInitiated` events named `tx_<i>` whose
    /// payload is `{"seq": i}`.
    async fn append_transfers(log: &FileAuditLog, count: i32) {
        for i in 0..count {
            let event = Event::new(
                EventKind::TransferInitiated,
                "",
                format!("tx_{}", i),
                serde_json::json!({"seq": i}),
            );
            log.append(event).await.unwrap();
        }
    }

    /// A log opened on a path with no file starts at the genesis head with
    /// zero rows.
    #[tokio::test]
    async fn fresh_file_has_genesis_head() {
        let dir = tmpdir();
        let target = dir.path().join("audit.jsonl");
        let log = FileAuditLog::open(target).await.unwrap();

        let head = log.current_head().await.unwrap();
        assert_eq!(head, GENESIS_HEAD);
        let rows = log.len().await.unwrap();
        assert_eq!(rows, 0);
    }

    /// Rows written through one handle are visible, and still form a valid
    /// chain, after the log is reopened.
    #[tokio::test]
    async fn append_persists_across_reopen() {
        let dir = tmpdir();
        let path = dir.path().join("audit.jsonl");
        {
            // Scoped so the first handle is dropped before reopening.
            let log = FileAuditLog::open(&path).await.unwrap();
            append_transfers(&log, 5).await;
        }

        // Reopening must rebuild the same state from disk.
        let reopened = FileAuditLog::open(&path).await.unwrap();
        assert_eq!(reopened.len().await.unwrap(), 5);
        reopened.verify_chain().await.unwrap();
    }

    /// Editing a row's payload on disk makes the next `open` fail with a
    /// hash mismatch at that row.
    #[tokio::test]
    async fn tampered_file_rejected_on_reopen() {
        let dir = tmpdir();
        let path = dir.path().join("audit.jsonl");
        {
            let log = FileAuditLog::open(&path).await.unwrap();
            append_transfers(&log, 3).await;
        }

        // Change the payload of the middle row: the line is still valid JSON
        // but no longer matches its recorded hash.
        let original = tokio::fs::read_to_string(&path).await.unwrap();
        let mut rows: Vec<String> = original.lines().map(str::to_owned).collect();
        rows[1] = rows[1].replacen("\"seq\":1", "\"seq\":99", 1);
        let mut rewritten = rows.join("\n");
        rewritten.push('\n');
        tokio::fs::write(&path, rewritten).await.unwrap();

        let err = FileAuditLog::open(&path).await.unwrap_err();
        assert!(matches!(err, AuditError::HashMismatch { position: 1, .. }));
    }

    /// Blank lines at the end of the file are ignored during replay.
    #[tokio::test]
    async fn empty_lines_skipped() {
        let dir = tmpdir();
        let path = dir.path().join("audit.jsonl");
        {
            let log = FileAuditLog::open(&path).await.unwrap();
            let event = Event::new(
                EventKind::AgentRegistered,
                "",
                "",
                serde_json::json!({}),
            );
            log.append(event).await.unwrap();
        }

        // Pad the file with blank lines at EOF.
        let mut padded = tokio::fs::read_to_string(&path).await.unwrap();
        padded.push_str("\n\n");
        tokio::fs::write(&path, padded).await.unwrap();

        let log = FileAuditLog::open(&path).await.unwrap();
        assert_eq!(log.len().await.unwrap(), 1);
    }
}