ccs-proxy 0.1.1

Local logging reverse-proxy + dashboard for Claude Code / Codex traffic
Documentation
use super::{RequestSummary, SessionMeta, Store, StoreError};
use crate::capture::CaptureRecord;
use async_trait::async_trait;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use tokio::fs;

pub struct FsStore {
    root: PathBuf,
    write_failures: Mutex<u32>,
}

impl FsStore {
    pub fn open(root: PathBuf) -> Result<Self, StoreError> {
        std::fs::create_dir_all(root.join("sessions"))?;
        std::fs::create_dir_all(root.join("logs"))?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            if let Err(e) = std::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o700))
            {
                tracing::warn!(?root, ?e, "failed to set root data dir permissions to 0700");
            }
        }
        Ok(Self {
            root,
            write_failures: Mutex::new(0),
        })
    }

    pub fn root(&self) -> &Path {
        &self.root
    }

    pub fn consecutive_write_failures(&self) -> u32 {
        *self.write_failures.lock().unwrap()
    }

    fn session_dir(&self, sid: &str) -> PathBuf {
        self.root.join("sessions").join(sid)
    }

    fn meta_path(&self, sid: &str) -> PathBuf {
        self.session_dir(sid).join("meta.json")
    }

    fn record_path(&self, sid: &str, seq: u64) -> PathBuf {
        self.session_dir(sid).join(format!("{seq:04}.json"))
    }

    async fn atomic_write(&self, path: &Path, bytes: &[u8]) -> Result<(), StoreError> {
        let staging_path = path.with_extension("tmp");
        fs::write(&staging_path, bytes).await?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let _ =
                fs::set_permissions(&staging_path, std::fs::Permissions::from_mode(0o600)).await;
        }
        fs::rename(&staging_path, path).await?;
        Ok(())
    }

    fn note_write_failure(&self) -> u32 {
        let mut guard = self.write_failures.lock().unwrap();
        *guard = guard.saturating_add(1);
        *guard
    }

    fn note_write_success(&self) {
        let mut guard = self.write_failures.lock().unwrap();
        *guard = 0;
    }

    async fn bump_meta(&self, rec: &CaptureRecord) {
        let meta_path = self.meta_path(&rec.session_id);
        let Ok(meta_bytes) = fs::read(&meta_path).await else {
            return;
        };
        let Ok(mut meta) = serde_json::from_slice::<SessionMeta>(&meta_bytes) else {
            return;
        };
        meta.request_count = meta.request_count.max(rec.seq);
        if meta.cwd.is_none()
            && let Some(found) = crate::capture::extract::extract_cwd(&rec.request.body)
        {
            meta.cwd = Some(found);
        }
        if let Some(m) = rec.model.as_deref()
            && !meta.models.iter().any(|existing| existing == m)
        {
            meta.models.push(m.to_string());
        }
        let Ok(out) = serde_json::to_vec_pretty(&meta) else {
            return;
        };
        let _ = self.atomic_write(&meta_path, &out).await;
    }
}

#[async_trait]
impl Store for FsStore {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    async fn init_session(&self, meta: SessionMeta) -> Result<(), StoreError> {
        let dir = self.session_dir(&meta.session_id);
        fs::create_dir_all(&dir).await?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
        }
        let bytes = serde_json::to_vec_pretty(&meta)?;
        match self
            .atomic_write(&self.meta_path(&meta.session_id), &bytes)
            .await
        {
            Ok(()) => {
                self.note_write_success();
                Ok(())
            }
            Err(err) => {
                self.note_write_failure();
                Err(err)
            }
        }
    }

    async fn finalize_session(&self, session_id: &str) -> Result<(), StoreError> {
        let path = self.meta_path(session_id);
        let Ok(bytes) = fs::read(&path).await else {
            return Ok(());
        };
        let mut meta: SessionMeta = serde_json::from_slice(&bytes)?;
        meta.ended_at = Some(chrono::Utc::now());
        let out = serde_json::to_vec_pretty(&meta)?;
        self.atomic_write(&path, &out).await?;
        Ok(())
    }

    async fn append(&self, rec: CaptureRecord) -> Result<(), StoreError> {
        let dir = self.session_dir(&rec.session_id);
        fs::create_dir_all(&dir).await?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let _ = fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)).await;
        }
        let path = self.record_path(&rec.session_id, rec.seq);
        let bytes = serde_json::to_vec_pretty(&rec)?;
        match self.atomic_write(&path, &bytes).await {
            Ok(()) => {
                self.note_write_success();
                self.bump_meta(&rec).await;
                Ok(())
            }
            Err(err) => {
                self.note_write_failure();
                Err(err)
            }
        }
    }

    async fn list_sessions(&self) -> Result<Vec<SessionMeta>, StoreError> {
        let dir = self.root.join("sessions");
        let mut out = Vec::new();
        let Ok(mut rd) = fs::read_dir(&dir).await else {
            return Ok(out);
        };
        while let Some(entry) = rd.next_entry().await? {
            if !entry.file_type().await?.is_dir() {
                continue;
            }
            let meta_path = entry.path().join("meta.json");
            let Ok(bytes) = fs::read(&meta_path).await else {
                continue;
            };
            match serde_json::from_slice::<SessionMeta>(&bytes) {
                Ok(meta) => out.push(meta),
                Err(err) => {
                    tracing::warn!(path = ?meta_path, error = ?err, "skipping corrupt meta.json");
                }
            }
        }
        out.sort_by_key(|meta| std::cmp::Reverse(meta.started_at));
        Ok(out)
    }

    async fn list_requests(&self, session_id: &str) -> Result<Vec<RequestSummary>, StoreError> {
        let dir = self.session_dir(session_id);
        let mut out = Vec::new();
        let Ok(mut rd) = fs::read_dir(&dir).await else {
            return Ok(out);
        };
        while let Some(entry) = rd.next_entry().await? {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            if name_str == "meta.json" || !name_str.ends_with(".json") {
                continue;
            }
            let Ok(bytes) = fs::read(entry.path()).await else {
                continue;
            };
            let Ok(rec) = serde_json::from_slice::<CaptureRecord>(&bytes) else {
                tracing::warn!(path = ?entry.path(), "skipping unparseable record");
                continue;
            };
            out.push(summary_of(&rec));
        }
        out.sort_by_key(|summary| summary.seq);
        Ok(out)
    }

    async fn get_request(
        &self,
        session_id: &str,
        seq: u64,
    ) -> Result<Option<CaptureRecord>, StoreError> {
        let path = self.record_path(session_id, seq);
        match fs::read(&path).await {
            Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err.into()),
        }
    }
}

fn summary_of(rec: &CaptureRecord) -> RequestSummary {
    RequestSummary {
        seq: rec.seq,
        session_id: rec.session_id.clone(),
        request_id: rec.request_id.clone(),
        started_at: rec.started_at,
        duration_ms: rec.duration_ms,
        model: rec.model.clone(),
        status: rec.response.as_ref().map(|resp| resp.status),
        input_tokens: rec.usage.as_ref().map(|usage| usage.input_tokens),
        output_tokens: rec.usage.as_ref().map(|usage| usage.output_tokens),
        has_error: rec.error.is_some(),
    }
}