1use super::{RequestSummary, SessionMeta, Store, StoreError};
2use crate::capture::CaptureRecord;
3use async_trait::async_trait;
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6use tokio::fs;
7
8pub struct FsStore {
9 root: PathBuf,
10 write_failures: Mutex<u32>,
11}
12
13impl FsStore {
14 pub fn open(root: PathBuf) -> Result<Self, StoreError> {
15 std::fs::create_dir_all(root.join("sessions"))?;
16 std::fs::create_dir_all(root.join("logs"))?;
17 #[cfg(unix)]
18 {
19 use std::os::unix::fs::PermissionsExt;
20 if let Err(e) = std::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o700))
21 {
22 tracing::warn!(?root, ?e, "failed to set root data dir permissions to 0700");
23 }
24 }
25 Ok(Self {
26 root,
27 write_failures: Mutex::new(0),
28 })
29 }
30
31 pub fn root(&self) -> &Path {
32 &self.root
33 }
34
35 pub fn consecutive_write_failures(&self) -> u32 {
36 *self.write_failures.lock().unwrap()
37 }
38
39 fn session_dir(&self, sid: &str) -> PathBuf {
40 self.root.join("sessions").join(sid)
41 }
42
43 fn meta_path(&self, sid: &str) -> PathBuf {
44 self.session_dir(sid).join("meta.json")
45 }
46
47 fn record_path(&self, sid: &str, seq: u64) -> PathBuf {
48 self.session_dir(sid).join(format!("{seq:04}.json"))
49 }
50
51 async fn atomic_write(&self, path: &Path, bytes: &[u8]) -> Result<(), StoreError> {
52 let staging_path = path.with_extension("tmp");
53 fs::write(&staging_path, bytes).await?;
54 #[cfg(unix)]
55 {
56 use std::os::unix::fs::PermissionsExt;
57 let _ =
58 fs::set_permissions(&staging_path, std::fs::Permissions::from_mode(0o600)).await;
59 }
60 fs::rename(&staging_path, path).await?;
61 Ok(())
62 }
63
64 fn note_write_failure(&self) -> u32 {
65 let mut guard = self.write_failures.lock().unwrap();
66 *guard = guard.saturating_add(1);
67 *guard
68 }
69
70 fn note_write_success(&self) {
71 let mut guard = self.write_failures.lock().unwrap();
72 *guard = 0;
73 }
74
75 async fn bump_request_count(&self, session_id: &str, seq: u64) {
76 let meta_path = self.meta_path(session_id);
77 let Ok(meta_bytes) = fs::read(&meta_path).await else {
78 return;
79 };
80 let Ok(mut meta) = serde_json::from_slice::<SessionMeta>(&meta_bytes) else {
81 return;
82 };
83 meta.request_count = meta.request_count.max(seq);
84 let Ok(out) = serde_json::to_vec_pretty(&meta) else {
85 return;
86 };
87 let _ = self.atomic_write(&meta_path, &out).await;
88 }
89}
90
91#[async_trait]
92impl Store for FsStore {
93 fn as_any(&self) -> &dyn std::any::Any {
94 self
95 }
96
97 async fn init_session(&self, meta: SessionMeta) -> Result<(), StoreError> {
98 let dir = self.session_dir(&meta.session_id);
99 fs::create_dir_all(&dir).await?;
100 #[cfg(unix)]
101 {
102 use std::os::unix::fs::PermissionsExt;
103 let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
104 }
105 let bytes = serde_json::to_vec_pretty(&meta)?;
106 match self
107 .atomic_write(&self.meta_path(&meta.session_id), &bytes)
108 .await
109 {
110 Ok(()) => {
111 self.note_write_success();
112 Ok(())
113 }
114 Err(err) => {
115 self.note_write_failure();
116 Err(err)
117 }
118 }
119 }
120
121 async fn finalize_session(&self, session_id: &str) -> Result<(), StoreError> {
122 let path = self.meta_path(session_id);
123 let Ok(bytes) = fs::read(&path).await else {
124 return Ok(());
125 };
126 let mut meta: SessionMeta = serde_json::from_slice(&bytes)?;
127 meta.ended_at = Some(chrono::Utc::now());
128 let out = serde_json::to_vec_pretty(&meta)?;
129 self.atomic_write(&path, &out).await?;
130 Ok(())
131 }
132
133 async fn append(&self, rec: CaptureRecord) -> Result<(), StoreError> {
134 let dir = self.session_dir(&rec.session_id);
135 fs::create_dir_all(&dir).await?;
136 #[cfg(unix)]
137 {
138 use std::os::unix::fs::PermissionsExt;
139 let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
140 }
141 let path = self.record_path(&rec.session_id, rec.seq);
142 let bytes = serde_json::to_vec_pretty(&rec)?;
143 match self.atomic_write(&path, &bytes).await {
144 Ok(()) => {
145 self.note_write_success();
146 self.bump_request_count(&rec.session_id, rec.seq).await;
147 Ok(())
148 }
149 Err(err) => {
150 self.note_write_failure();
151 Err(err)
152 }
153 }
154 }
155
156 async fn list_sessions(&self) -> Result<Vec<SessionMeta>, StoreError> {
157 let dir = self.root.join("sessions");
158 let mut out = Vec::new();
159 let Ok(mut rd) = fs::read_dir(&dir).await else {
160 return Ok(out);
161 };
162 while let Some(entry) = rd.next_entry().await? {
163 if !entry.file_type().await?.is_dir() {
164 continue;
165 }
166 let meta_path = entry.path().join("meta.json");
167 let Ok(bytes) = fs::read(&meta_path).await else {
168 continue;
169 };
170 match serde_json::from_slice::<SessionMeta>(&bytes) {
171 Ok(meta) => out.push(meta),
172 Err(err) => {
173 tracing::warn!(path = ?meta_path, error = ?err, "skipping corrupt meta.json");
174 }
175 }
176 }
177 out.sort_by_key(|meta| std::cmp::Reverse(meta.started_at));
178 Ok(out)
179 }
180
181 async fn list_requests(&self, session_id: &str) -> Result<Vec<RequestSummary>, StoreError> {
182 let dir = self.session_dir(session_id);
183 let mut out = Vec::new();
184 let Ok(mut rd) = fs::read_dir(&dir).await else {
185 return Ok(out);
186 };
187 while let Some(entry) = rd.next_entry().await? {
188 let name = entry.file_name();
189 let name_str = name.to_string_lossy();
190 if name_str == "meta.json" || !name_str.ends_with(".json") {
191 continue;
192 }
193 let Ok(bytes) = fs::read(entry.path()).await else {
194 continue;
195 };
196 let Ok(rec) = serde_json::from_slice::<CaptureRecord>(&bytes) else {
197 tracing::warn!(path = ?entry.path(), "skipping unparseable record");
198 continue;
199 };
200 out.push(summary_of(&rec));
201 }
202 out.sort_by_key(|summary| summary.seq);
203 Ok(out)
204 }
205
206 async fn get_request(
207 &self,
208 session_id: &str,
209 seq: u64,
210 ) -> Result<Option<CaptureRecord>, StoreError> {
211 let path = self.record_path(session_id, seq);
212 match fs::read(&path).await {
213 Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
214 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
215 Err(err) => Err(err.into()),
216 }
217 }
218}
219
220fn summary_of(rec: &CaptureRecord) -> RequestSummary {
221 RequestSummary {
222 seq: rec.seq,
223 session_id: rec.session_id.clone(),
224 request_id: rec.request_id.clone(),
225 started_at: rec.started_at,
226 duration_ms: rec.duration_ms,
227 model: rec.model.clone(),
228 status: rec.response.as_ref().map(|resp| resp.status),
229 input_tokens: rec.usage.as_ref().map(|usage| usage.input_tokens),
230 output_tokens: rec.usage.as_ref().map(|usage| usage.output_tokens),
231 has_error: rec.error.is_some(),
232 }
233}