1use std::path::{Path, PathBuf};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use time::OffsetDateTime;
14use tokio::fs::{File, OpenOptions};
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::sync::Mutex;
17
18use crate::{
19 event::{Event, EventReceipt, StoredEvent},
20 AuditError, AuditLog, AuditResult, GENESIS_HEAD,
21};
22
23#[derive(Debug)]
24pub struct FileAuditLog {
25 inner: Arc<Mutex<Inner>>,
26 path: PathBuf,
27}
28
29impl std::fmt::Debug for Inner {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("Inner")
32 .field("head", &self.head)
33 .field("len", &self.len)
34 .finish_non_exhaustive()
35 }
36}
37
38struct Inner {
39 file: File,
40 head: String,
41 len: u64,
42}
43
44impl FileAuditLog {
45 pub async fn open(path: impl Into<PathBuf>) -> AuditResult<Self> {
48 let path = path.into();
49 if let Some(parent) = path.parent() {
50 if !parent.as_os_str().is_empty() {
51 tokio::fs::create_dir_all(parent).await?;
52 }
53 }
54
55 let (head, len) = replay(&path).await?;
58
59 let file = OpenOptions::new()
60 .create(true)
61 .append(true)
62 .open(&path)
63 .await?;
64
65 Ok(Self {
66 inner: Arc::new(Mutex::new(Inner { file, head, len })),
67 path,
68 })
69 }
70
71 pub fn path(&self) -> &Path {
72 &self.path
73 }
74}
75
76async fn replay(path: &Path) -> AuditResult<(String, u64)> {
77 if !path.exists() {
78 return Ok((GENESIS_HEAD.to_string(), 0));
79 }
80 let file = File::open(path).await?;
81 let mut lines = BufReader::new(file).lines();
82 let mut head = GENESIS_HEAD.to_string();
83 let mut len: u64 = 0;
84 while let Some(line) = lines.next_line().await? {
85 if line.trim().is_empty() {
86 continue;
87 }
88 let stored: StoredEvent = serde_json::from_str(&line)?;
89 if stored.prev_hash != head {
90 return Err(AuditError::ChainBroken {
91 position: len,
92 expected: head,
93 found: stored.prev_hash,
94 });
95 }
96 stored.verify_hash()?;
97 head = stored.this_hash;
98 len += 1;
99 }
100 Ok((head, len))
101}
102
103#[async_trait]
104impl AuditLog for FileAuditLog {
105 async fn append(&self, event: Event) -> AuditResult<EventReceipt> {
106 let mut guard = self.inner.lock().await;
107 let timestamp = OffsetDateTime::now_utc();
108 let prev_hash = guard.head.clone();
109 let this_hash = event.compute_hash(timestamp, &prev_hash)?;
110 let stored = StoredEvent {
111 position: guard.len,
112 event_id: format!("evt_{}", uuid::Uuid::new_v4().simple()),
113 timestamp,
114 prev_hash,
115 this_hash: this_hash.clone(),
116 event,
117 };
118
119 let mut line = serde_json::to_vec(&stored)?;
120 line.push(b'\n');
121 guard.file.write_all(&line).await?;
122 guard.file.flush().await?;
123
124 guard.head = this_hash.clone();
125 guard.len += 1;
126
127 Ok(EventReceipt {
128 event_id: stored.event_id,
129 position: stored.position,
130 timestamp,
131 chain_head: this_hash,
132 })
133 }
134
135 async fn current_head(&self) -> AuditResult<String> {
136 Ok(self.inner.lock().await.head.clone())
137 }
138
139 async fn verify_chain(&self) -> AuditResult<()> {
140 let (head, len) = replay(&self.path).await?;
143 let guard = self.inner.lock().await;
144 if head != guard.head || len != guard.len {
145 return Err(AuditError::ChainBroken {
146 position: len,
147 expected: guard.head.clone(),
148 found: head,
149 });
150 }
151 Ok(())
152 }
153
154 async fn len(&self) -> AuditResult<u64> {
155 Ok(self.inner.lock().await.len)
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162 use crate::event::EventKind;
163
164 fn tmpdir() -> tempfile::TempDir {
165 tempfile::tempdir().unwrap()
166 }
167
168 #[tokio::test]
169 async fn fresh_file_has_genesis_head() {
170 let dir = tmpdir();
171 let log = FileAuditLog::open(dir.path().join("audit.jsonl"))
172 .await
173 .unwrap();
174 assert_eq!(log.current_head().await.unwrap(), GENESIS_HEAD);
175 assert_eq!(log.len().await.unwrap(), 0);
176 }
177
178 #[tokio::test]
179 async fn append_persists_across_reopen() {
180 let dir = tmpdir();
181 let path = dir.path().join("audit.jsonl");
182 {
183 let log = FileAuditLog::open(&path).await.unwrap();
184 for i in 0..5 {
185 log.append(Event::new(
186 EventKind::TransferInitiated,
187 "",
188 format!("tx_{}", i),
189 serde_json::json!({"seq": i}),
190 ))
191 .await
192 .unwrap();
193 }
194 }
195 let log = FileAuditLog::open(&path).await.unwrap();
197 assert_eq!(log.len().await.unwrap(), 5);
198 log.verify_chain().await.unwrap();
199 }
200
201 #[tokio::test]
202 async fn tampered_file_rejected_on_reopen() {
203 let dir = tmpdir();
204 let path = dir.path().join("audit.jsonl");
205 {
206 let log = FileAuditLog::open(&path).await.unwrap();
207 for i in 0..3 {
208 log.append(Event::new(
209 EventKind::TransferInitiated,
210 "",
211 format!("tx_{}", i),
212 serde_json::json!({"seq": i}),
213 ))
214 .await
215 .unwrap();
216 }
217 }
218 let content = tokio::fs::read_to_string(&path).await.unwrap();
220 let mut lines: Vec<&str> = content.lines().collect();
221 let tampered = lines[1].replacen("\"seq\":1", "\"seq\":99", 1);
223 lines[1] = &tampered;
224 let rewritten = lines.join("\n") + "\n";
225 tokio::fs::write(&path, rewritten).await.unwrap();
226
227 let err = FileAuditLog::open(&path).await.unwrap_err();
228 assert!(matches!(err, AuditError::HashMismatch { position: 1, .. }));
229 }
230
231 #[tokio::test]
232 async fn empty_lines_skipped() {
233 let dir = tmpdir();
234 let path = dir.path().join("audit.jsonl");
235 {
236 let log = FileAuditLog::open(&path).await.unwrap();
237 log.append(Event::new(
238 EventKind::AgentRegistered,
239 "",
240 "",
241 serde_json::json!({}),
242 ))
243 .await
244 .unwrap();
245 }
246 tokio::fs::write(
248 &path,
249 tokio::fs::read_to_string(&path).await.unwrap() + "\n\n",
250 )
251 .await
252 .unwrap();
253 let log = FileAuditLog::open(&path).await.unwrap();
254 assert_eq!(log.len().await.unwrap(), 1);
255 }
256}