Skip to main content

ccs_proxy/store/
fs.rs

1use super::{RequestSummary, SessionMeta, Store, StoreError};
2use crate::capture::CaptureRecord;
3use async_trait::async_trait;
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6use tokio::fs;
7
/// Filesystem-backed store rooted at a single data directory.
///
/// Per-session files live under `<root>/sessions/<session_id>/`: a `meta.json`
/// describing the session plus one `<seq>.json` file per captured request.
#[derive(Debug)]
pub struct FsStore {
    /// Root data directory that contains the `sessions/` and `logs/` subdirectories.
    root: PathBuf,
    /// Count of consecutive failed writes; a successful write resets it to zero.
    write_failures: Mutex<u32>,
}
12
13impl FsStore {
14    pub fn open(root: PathBuf) -> Result<Self, StoreError> {
15        std::fs::create_dir_all(root.join("sessions"))?;
16        std::fs::create_dir_all(root.join("logs"))?;
17        #[cfg(unix)]
18        {
19            use std::os::unix::fs::PermissionsExt;
20            if let Err(e) = std::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o700))
21            {
22                tracing::warn!(?root, ?e, "failed to set root data dir permissions to 0700");
23            }
24        }
25        Ok(Self {
26            root,
27            write_failures: Mutex::new(0),
28        })
29    }
30
31    pub fn root(&self) -> &Path {
32        &self.root
33    }
34
35    pub fn consecutive_write_failures(&self) -> u32 {
36        *self.write_failures.lock().unwrap()
37    }
38
39    fn session_dir(&self, sid: &str) -> PathBuf {
40        self.root.join("sessions").join(sid)
41    }
42
43    fn meta_path(&self, sid: &str) -> PathBuf {
44        self.session_dir(sid).join("meta.json")
45    }
46
47    fn record_path(&self, sid: &str, seq: u64) -> PathBuf {
48        self.session_dir(sid).join(format!("{seq:04}.json"))
49    }
50
51    async fn atomic_write(&self, path: &Path, bytes: &[u8]) -> Result<(), StoreError> {
52        let staging_path = path.with_extension("tmp");
53        fs::write(&staging_path, bytes).await?;
54        #[cfg(unix)]
55        {
56            use std::os::unix::fs::PermissionsExt;
57            let _ =
58                fs::set_permissions(&staging_path, std::fs::Permissions::from_mode(0o600)).await;
59        }
60        fs::rename(&staging_path, path).await?;
61        Ok(())
62    }
63
64    fn note_write_failure(&self) -> u32 {
65        let mut guard = self.write_failures.lock().unwrap();
66        *guard = guard.saturating_add(1);
67        *guard
68    }
69
70    fn note_write_success(&self) {
71        let mut guard = self.write_failures.lock().unwrap();
72        *guard = 0;
73    }
74
75    async fn bump_request_count(&self, session_id: &str, seq: u64) {
76        let meta_path = self.meta_path(session_id);
77        let Ok(meta_bytes) = fs::read(&meta_path).await else {
78            return;
79        };
80        let Ok(mut meta) = serde_json::from_slice::<SessionMeta>(&meta_bytes) else {
81            return;
82        };
83        meta.request_count = meta.request_count.max(seq);
84        let Ok(out) = serde_json::to_vec_pretty(&meta) else {
85            return;
86        };
87        let _ = self.atomic_write(&meta_path, &out).await;
88    }
89}
90
91#[async_trait]
92impl Store for FsStore {
93    fn as_any(&self) -> &dyn std::any::Any {
94        self
95    }
96
97    async fn init_session(&self, meta: SessionMeta) -> Result<(), StoreError> {
98        let dir = self.session_dir(&meta.session_id);
99        fs::create_dir_all(&dir).await?;
100        #[cfg(unix)]
101        {
102            use std::os::unix::fs::PermissionsExt;
103            let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
104        }
105        let bytes = serde_json::to_vec_pretty(&meta)?;
106        match self
107            .atomic_write(&self.meta_path(&meta.session_id), &bytes)
108            .await
109        {
110            Ok(()) => {
111                self.note_write_success();
112                Ok(())
113            }
114            Err(err) => {
115                self.note_write_failure();
116                Err(err)
117            }
118        }
119    }
120
121    async fn finalize_session(&self, session_id: &str) -> Result<(), StoreError> {
122        let path = self.meta_path(session_id);
123        let Ok(bytes) = fs::read(&path).await else {
124            return Ok(());
125        };
126        let mut meta: SessionMeta = serde_json::from_slice(&bytes)?;
127        meta.ended_at = Some(chrono::Utc::now());
128        let out = serde_json::to_vec_pretty(&meta)?;
129        self.atomic_write(&path, &out).await?;
130        Ok(())
131    }
132
133    async fn append(&self, rec: CaptureRecord) -> Result<(), StoreError> {
134        let dir = self.session_dir(&rec.session_id);
135        fs::create_dir_all(&dir).await?;
136        #[cfg(unix)]
137        {
138            use std::os::unix::fs::PermissionsExt;
139            let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
140        }
141        let path = self.record_path(&rec.session_id, rec.seq);
142        let bytes = serde_json::to_vec_pretty(&rec)?;
143        match self.atomic_write(&path, &bytes).await {
144            Ok(()) => {
145                self.note_write_success();
146                self.bump_request_count(&rec.session_id, rec.seq).await;
147                Ok(())
148            }
149            Err(err) => {
150                self.note_write_failure();
151                Err(err)
152            }
153        }
154    }
155
156    async fn list_sessions(&self) -> Result<Vec<SessionMeta>, StoreError> {
157        let dir = self.root.join("sessions");
158        let mut out = Vec::new();
159        let Ok(mut rd) = fs::read_dir(&dir).await else {
160            return Ok(out);
161        };
162        while let Some(entry) = rd.next_entry().await? {
163            if !entry.file_type().await?.is_dir() {
164                continue;
165            }
166            let meta_path = entry.path().join("meta.json");
167            let Ok(bytes) = fs::read(&meta_path).await else {
168                continue;
169            };
170            match serde_json::from_slice::<SessionMeta>(&bytes) {
171                Ok(meta) => out.push(meta),
172                Err(err) => {
173                    tracing::warn!(path = ?meta_path, error = ?err, "skipping corrupt meta.json");
174                }
175            }
176        }
177        out.sort_by_key(|meta| std::cmp::Reverse(meta.started_at));
178        Ok(out)
179    }
180
181    async fn list_requests(&self, session_id: &str) -> Result<Vec<RequestSummary>, StoreError> {
182        let dir = self.session_dir(session_id);
183        let mut out = Vec::new();
184        let Ok(mut rd) = fs::read_dir(&dir).await else {
185            return Ok(out);
186        };
187        while let Some(entry) = rd.next_entry().await? {
188            let name = entry.file_name();
189            let name_str = name.to_string_lossy();
190            if name_str == "meta.json" || !name_str.ends_with(".json") {
191                continue;
192            }
193            let Ok(bytes) = fs::read(entry.path()).await else {
194                continue;
195            };
196            let Ok(rec) = serde_json::from_slice::<CaptureRecord>(&bytes) else {
197                tracing::warn!(path = ?entry.path(), "skipping unparseable record");
198                continue;
199            };
200            out.push(summary_of(&rec));
201        }
202        out.sort_by_key(|summary| summary.seq);
203        Ok(out)
204    }
205
206    async fn get_request(
207        &self,
208        session_id: &str,
209        seq: u64,
210    ) -> Result<Option<CaptureRecord>, StoreError> {
211        let path = self.record_path(session_id, seq);
212        match fs::read(&path).await {
213            Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
214            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
215            Err(err) => Err(err.into()),
216        }
217    }
218}
219
220fn summary_of(rec: &CaptureRecord) -> RequestSummary {
221    RequestSummary {
222        seq: rec.seq,
223        session_id: rec.session_id.clone(),
224        request_id: rec.request_id.clone(),
225        started_at: rec.started_at,
226        duration_ms: rec.duration_ms,
227        model: rec.model.clone(),
228        status: rec.response.as_ref().map(|resp| resp.status),
229        input_tokens: rec.usage.as_ref().map(|usage| usage.input_tokens),
230        output_tokens: rec.usage.as_ref().map(|usage| usage.output_tokens),
231        has_error: rec.error.is_some(),
232    }
233}