1use super::{RequestSummary, SessionMeta, Store, StoreError};
2use crate::capture::CaptureRecord;
3use async_trait::async_trait;
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6use tokio::fs;
7
/// A [`Store`] implementation that persists sessions as JSON files on the local filesystem.
///
/// Each session lives in `<root>/sessions/<session_id>/`, holding a `meta.json` plus one
/// JSON file per captured request.
pub struct FsStore {
    /// Top-level data directory; `sessions/` and `logs/` are created beneath it.
    root: PathBuf,
    /// Number of tracked writes that have failed since the last successful one.
    write_failures: Mutex<u32>,
}
12
13impl FsStore {
14 pub fn open(root: PathBuf) -> Result<Self, StoreError> {
15 std::fs::create_dir_all(root.join("sessions"))?;
16 std::fs::create_dir_all(root.join("logs"))?;
17 #[cfg(unix)]
18 {
19 use std::os::unix::fs::PermissionsExt;
20 if let Err(e) = std::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o700))
21 {
22 tracing::warn!(?root, ?e, "failed to set root data dir permissions to 0700");
23 }
24 }
25 Ok(Self {
26 root,
27 write_failures: Mutex::new(0),
28 })
29 }
30
31 pub fn root(&self) -> &Path {
32 &self.root
33 }
34
35 pub fn consecutive_write_failures(&self) -> u32 {
36 *self.write_failures.lock().unwrap()
37 }
38
39 fn session_dir(&self, sid: &str) -> PathBuf {
40 self.root.join("sessions").join(sid)
41 }
42
43 fn meta_path(&self, sid: &str) -> PathBuf {
44 self.session_dir(sid).join("meta.json")
45 }
46
47 fn record_path(&self, sid: &str, seq: u64) -> PathBuf {
48 self.session_dir(sid).join(format!("{seq:04}.json"))
49 }
50
51 async fn atomic_write(&self, path: &Path, bytes: &[u8]) -> Result<(), StoreError> {
52 let staging_path = path.with_extension("tmp");
53 fs::write(&staging_path, bytes).await?;
54 #[cfg(unix)]
55 {
56 use std::os::unix::fs::PermissionsExt;
57 let _ =
58 fs::set_permissions(&staging_path, std::fs::Permissions::from_mode(0o600)).await;
59 }
60 fs::rename(&staging_path, path).await?;
61 Ok(())
62 }
63
64 fn note_write_failure(&self) -> u32 {
65 let mut guard = self.write_failures.lock().unwrap();
66 *guard = guard.saturating_add(1);
67 *guard
68 }
69
70 fn note_write_success(&self) {
71 let mut guard = self.write_failures.lock().unwrap();
72 *guard = 0;
73 }
74
75 async fn bump_meta(&self, rec: &CaptureRecord) {
76 let meta_path = self.meta_path(&rec.session_id);
77 let Ok(meta_bytes) = fs::read(&meta_path).await else {
78 return;
79 };
80 let Ok(mut meta) = serde_json::from_slice::<SessionMeta>(&meta_bytes) else {
81 return;
82 };
83 meta.request_count = meta.request_count.max(rec.seq);
84 if meta.cwd.is_none()
85 && let Some(found) = crate::capture::extract::extract_cwd(&rec.request.body)
86 {
87 meta.cwd = Some(found);
88 }
89 if let Some(m) = rec.model.as_deref()
90 && !meta.models.iter().any(|existing| existing == m)
91 {
92 meta.models.push(m.to_string());
93 }
94 let Ok(out) = serde_json::to_vec_pretty(&meta) else {
95 return;
96 };
97 let _ = self.atomic_write(&meta_path, &out).await;
98 }
99}
100
101#[async_trait]
102impl Store for FsStore {
103 fn as_any(&self) -> &dyn std::any::Any {
104 self
105 }
106
107 async fn init_session(&self, meta: SessionMeta) -> Result<(), StoreError> {
108 let dir = self.session_dir(&meta.session_id);
109 fs::create_dir_all(&dir).await?;
110 #[cfg(unix)]
111 {
112 use std::os::unix::fs::PermissionsExt;
113 let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
114 }
115 let bytes = serde_json::to_vec_pretty(&meta)?;
116 match self
117 .atomic_write(&self.meta_path(&meta.session_id), &bytes)
118 .await
119 {
120 Ok(()) => {
121 self.note_write_success();
122 Ok(())
123 }
124 Err(err) => {
125 self.note_write_failure();
126 Err(err)
127 }
128 }
129 }
130
131 async fn finalize_session(&self, session_id: &str) -> Result<(), StoreError> {
132 let path = self.meta_path(session_id);
133 let Ok(bytes) = fs::read(&path).await else {
134 return Ok(());
135 };
136 let mut meta: SessionMeta = serde_json::from_slice(&bytes)?;
137 meta.ended_at = Some(chrono::Utc::now());
138 let out = serde_json::to_vec_pretty(&meta)?;
139 self.atomic_write(&path, &out).await?;
140 Ok(())
141 }
142
143 async fn append(&self, rec: CaptureRecord) -> Result<(), StoreError> {
144 let dir = self.session_dir(&rec.session_id);
145 fs::create_dir_all(&dir).await?;
146 #[cfg(unix)]
147 {
148 use std::os::unix::fs::PermissionsExt;
149 let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
150 }
151 let path = self.record_path(&rec.session_id, rec.seq);
152 let bytes = serde_json::to_vec_pretty(&rec)?;
153 match self.atomic_write(&path, &bytes).await {
154 Ok(()) => {
155 self.note_write_success();
156 self.bump_meta(&rec).await;
157 Ok(())
158 }
159 Err(err) => {
160 self.note_write_failure();
161 Err(err)
162 }
163 }
164 }
165
166 async fn list_sessions(&self) -> Result<Vec<SessionMeta>, StoreError> {
167 let dir = self.root.join("sessions");
168 let mut out = Vec::new();
169 let Ok(mut rd) = fs::read_dir(&dir).await else {
170 return Ok(out);
171 };
172 while let Some(entry) = rd.next_entry().await? {
173 if !entry.file_type().await?.is_dir() {
174 continue;
175 }
176 let meta_path = entry.path().join("meta.json");
177 let Ok(bytes) = fs::read(&meta_path).await else {
178 continue;
179 };
180 match serde_json::from_slice::<SessionMeta>(&bytes) {
181 Ok(meta) => out.push(meta),
182 Err(err) => {
183 tracing::warn!(path = ?meta_path, error = ?err, "skipping corrupt meta.json");
184 }
185 }
186 }
187 out.sort_by_key(|meta| std::cmp::Reverse(meta.started_at));
188 Ok(out)
189 }
190
191 async fn list_requests(&self, session_id: &str) -> Result<Vec<RequestSummary>, StoreError> {
192 let dir = self.session_dir(session_id);
193 let mut out = Vec::new();
194 let Ok(mut rd) = fs::read_dir(&dir).await else {
195 return Ok(out);
196 };
197 while let Some(entry) = rd.next_entry().await? {
198 let name = entry.file_name();
199 let name_str = name.to_string_lossy();
200 if name_str == "meta.json" || !name_str.ends_with(".json") {
201 continue;
202 }
203 let Ok(bytes) = fs::read(entry.path()).await else {
204 continue;
205 };
206 let Ok(rec) = serde_json::from_slice::<CaptureRecord>(&bytes) else {
207 tracing::warn!(path = ?entry.path(), "skipping unparseable record");
208 continue;
209 };
210 out.push(summary_of(&rec));
211 }
212 out.sort_by_key(|summary| summary.seq);
213 Ok(out)
214 }
215
216 async fn get_request(
217 &self,
218 session_id: &str,
219 seq: u64,
220 ) -> Result<Option<CaptureRecord>, StoreError> {
221 let path = self.record_path(session_id, seq);
222 match fs::read(&path).await {
223 Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
224 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
225 Err(err) => Err(err.into()),
226 }
227 }
228}
229
230fn summary_of(rec: &CaptureRecord) -> RequestSummary {
231 RequestSummary {
232 seq: rec.seq,
233 session_id: rec.session_id.clone(),
234 request_id: rec.request_id.clone(),
235 started_at: rec.started_at,
236 duration_ms: rec.duration_ms,
237 model: rec.model.clone(),
238 status: rec.response.as_ref().map(|resp| resp.status),
239 input_tokens: rec.usage.as_ref().map(|usage| usage.input_tokens),
240 output_tokens: rec.usage.as_ref().map(|usage| usage.output_tokens),
241 has_error: rec.error.is_some(),
242 }
243}