Skip to main content

ccs_proxy/store/
fs.rs

1use super::{RequestSummary, SessionMeta, Store, StoreError};
2use crate::capture::CaptureRecord;
3use async_trait::async_trait;
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6use tokio::fs;
7
8pub struct FsStore {
9    root: PathBuf,
10    write_failures: Mutex<u32>,
11}
12
13impl FsStore {
14    pub fn open(root: PathBuf) -> Result<Self, StoreError> {
15        std::fs::create_dir_all(root.join("sessions"))?;
16        std::fs::create_dir_all(root.join("logs"))?;
17        #[cfg(unix)]
18        {
19            use std::os::unix::fs::PermissionsExt;
20            if let Err(e) = std::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o700))
21            {
22                tracing::warn!(?root, ?e, "failed to set root data dir permissions to 0700");
23            }
24        }
25        Ok(Self {
26            root,
27            write_failures: Mutex::new(0),
28        })
29    }
30
31    pub fn root(&self) -> &Path {
32        &self.root
33    }
34
35    pub fn consecutive_write_failures(&self) -> u32 {
36        *self.write_failures.lock().unwrap()
37    }
38
39    fn session_dir(&self, sid: &str) -> PathBuf {
40        self.root.join("sessions").join(sid)
41    }
42
43    fn meta_path(&self, sid: &str) -> PathBuf {
44        self.session_dir(sid).join("meta.json")
45    }
46
47    fn record_path(&self, sid: &str, seq: u64) -> PathBuf {
48        self.session_dir(sid).join(format!("{seq:04}.json"))
49    }
50
51    async fn atomic_write(&self, path: &Path, bytes: &[u8]) -> Result<(), StoreError> {
52        let staging_path = path.with_extension("tmp");
53        fs::write(&staging_path, bytes).await?;
54        #[cfg(unix)]
55        {
56            use std::os::unix::fs::PermissionsExt;
57            let _ =
58                fs::set_permissions(&staging_path, std::fs::Permissions::from_mode(0o600)).await;
59        }
60        fs::rename(&staging_path, path).await?;
61        Ok(())
62    }
63
64    fn note_write_failure(&self) -> u32 {
65        let mut guard = self.write_failures.lock().unwrap();
66        *guard = guard.saturating_add(1);
67        *guard
68    }
69
70    fn note_write_success(&self) {
71        let mut guard = self.write_failures.lock().unwrap();
72        *guard = 0;
73    }
74
75    async fn bump_meta(&self, rec: &CaptureRecord) {
76        let meta_path = self.meta_path(&rec.session_id);
77        let Ok(meta_bytes) = fs::read(&meta_path).await else {
78            return;
79        };
80        let Ok(mut meta) = serde_json::from_slice::<SessionMeta>(&meta_bytes) else {
81            return;
82        };
83        meta.request_count = meta.request_count.max(rec.seq);
84        if meta.cwd.is_none()
85            && let Some(found) = crate::capture::extract::extract_cwd(&rec.request.body)
86        {
87            meta.cwd = Some(found);
88        }
89        if let Some(m) = rec.model.as_deref()
90            && !meta.models.iter().any(|existing| existing == m)
91        {
92            meta.models.push(m.to_string());
93        }
94        let Ok(out) = serde_json::to_vec_pretty(&meta) else {
95            return;
96        };
97        let _ = self.atomic_write(&meta_path, &out).await;
98    }
99}
100
101#[async_trait]
102impl Store for FsStore {
103    fn as_any(&self) -> &dyn std::any::Any {
104        self
105    }
106
107    async fn init_session(&self, meta: SessionMeta) -> Result<(), StoreError> {
108        let dir = self.session_dir(&meta.session_id);
109        fs::create_dir_all(&dir).await?;
110        #[cfg(unix)]
111        {
112            use std::os::unix::fs::PermissionsExt;
113            let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
114        }
115        let bytes = serde_json::to_vec_pretty(&meta)?;
116        match self
117            .atomic_write(&self.meta_path(&meta.session_id), &bytes)
118            .await
119        {
120            Ok(()) => {
121                self.note_write_success();
122                Ok(())
123            }
124            Err(err) => {
125                self.note_write_failure();
126                Err(err)
127            }
128        }
129    }
130
131    async fn finalize_session(&self, session_id: &str) -> Result<(), StoreError> {
132        let path = self.meta_path(session_id);
133        let Ok(bytes) = fs::read(&path).await else {
134            return Ok(());
135        };
136        let mut meta: SessionMeta = serde_json::from_slice(&bytes)?;
137        meta.ended_at = Some(chrono::Utc::now());
138        let out = serde_json::to_vec_pretty(&meta)?;
139        self.atomic_write(&path, &out).await?;
140        Ok(())
141    }
142
143    async fn append(&self, rec: CaptureRecord) -> Result<(), StoreError> {
144        let dir = self.session_dir(&rec.session_id);
145        fs::create_dir_all(&dir).await?;
146        #[cfg(unix)]
147        {
148            use std::os::unix::fs::PermissionsExt;
149            let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
150        }
151        let path = self.record_path(&rec.session_id, rec.seq);
152        let bytes = serde_json::to_vec_pretty(&rec)?;
153        match self.atomic_write(&path, &bytes).await {
154            Ok(()) => {
155                self.note_write_success();
156                self.bump_meta(&rec).await;
157                Ok(())
158            }
159            Err(err) => {
160                self.note_write_failure();
161                Err(err)
162            }
163        }
164    }
165
166    async fn list_sessions(&self) -> Result<Vec<SessionMeta>, StoreError> {
167        let dir = self.root.join("sessions");
168        let mut out = Vec::new();
169        let Ok(mut rd) = fs::read_dir(&dir).await else {
170            return Ok(out);
171        };
172        while let Some(entry) = rd.next_entry().await? {
173            if !entry.file_type().await?.is_dir() {
174                continue;
175            }
176            let meta_path = entry.path().join("meta.json");
177            let Ok(bytes) = fs::read(&meta_path).await else {
178                continue;
179            };
180            match serde_json::from_slice::<SessionMeta>(&bytes) {
181                Ok(meta) => out.push(meta),
182                Err(err) => {
183                    tracing::warn!(path = ?meta_path, error = ?err, "skipping corrupt meta.json");
184                }
185            }
186        }
187        out.sort_by_key(|meta| std::cmp::Reverse(meta.started_at));
188        Ok(out)
189    }
190
191    async fn list_requests(&self, session_id: &str) -> Result<Vec<RequestSummary>, StoreError> {
192        let dir = self.session_dir(session_id);
193        let mut out = Vec::new();
194        let Ok(mut rd) = fs::read_dir(&dir).await else {
195            return Ok(out);
196        };
197        while let Some(entry) = rd.next_entry().await? {
198            let name = entry.file_name();
199            let name_str = name.to_string_lossy();
200            if name_str == "meta.json" || !name_str.ends_with(".json") {
201                continue;
202            }
203            let Ok(bytes) = fs::read(entry.path()).await else {
204                continue;
205            };
206            let Ok(rec) = serde_json::from_slice::<CaptureRecord>(&bytes) else {
207                tracing::warn!(path = ?entry.path(), "skipping unparseable record");
208                continue;
209            };
210            out.push(summary_of(&rec));
211        }
212        out.sort_by_key(|summary| summary.seq);
213        Ok(out)
214    }
215
216    async fn get_request(
217        &self,
218        session_id: &str,
219        seq: u64,
220    ) -> Result<Option<CaptureRecord>, StoreError> {
221        let path = self.record_path(session_id, seq);
222        match fs::read(&path).await {
223            Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
224            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
225            Err(err) => Err(err.into()),
226        }
227    }
228}
229
230fn summary_of(rec: &CaptureRecord) -> RequestSummary {
231    RequestSummary {
232        seq: rec.seq,
233        session_id: rec.session_id.clone(),
234        request_id: rec.request_id.clone(),
235        started_at: rec.started_at,
236        duration_ms: rec.duration_ms,
237        model: rec.model.clone(),
238        status: rec.response.as_ref().map(|resp| resp.status),
239        input_tokens: rec.usage.as_ref().map(|usage| usage.input_tokens),
240        output_tokens: rec.usage.as_ref().map(|usage| usage.output_tokens),
241        has_error: rec.error.is_some(),
242    }
243}