// ccs-proxy 0.1.0
//
// Local logging reverse-proxy + dashboard for Claude Code / Codex traffic.
// Documentation
//! ccs-proxy: a local logging reverse-proxy + dashboard that captures the
//! traffic between Claude Code / Codex and their upstream LLM APIs.

pub mod api;
pub mod capture;
mod config;
mod error;
mod handle;
pub mod provider;
pub mod proxy;
mod session;
mod state;
pub mod store;

pub use capture::CaptureEvent;
pub use config::ServeConfig;
pub use error::ServeError;
pub use handle::ProxyHandle;
pub use provider::ProviderKind;
pub use session::SessionId;
pub use state::AppState;

use crate::store::Store;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

/// Bind both the reverse-proxy and the API listeners on `127.0.0.1`, wire
/// shared [`AppState`], spawn the server task, and return a [`ProxyHandle`]
/// that the caller can use to discover the bound ports and trigger shutdown.
///
/// The configured `proxy_port` / `api_port` of `0` ask the kernel to assign a
/// free port; the actually-bound ports are recorded in both the returned
/// handle and the persisted [`store::SessionMeta`].
#[allow(clippy::too_many_lines)]
pub async fn serve(cfg: ServeConfig) -> Result<ProxyHandle, ServeError> {
    if !cfg!(unix) {
        return Err(ServeError::UnsupportedPlatform(
            "only unix (macOS / Linux) is supported in v1",
        ));
    }

    std::fs::create_dir_all(&cfg.data_dir).map_err(|err| ServeError::DataDir {
        path: cfg.data_dir.clone(),
        source: err,
    })?;

    let store: Arc<dyn Store> = Arc::new(
        store::FsStore::open(cfg.data_dir.clone())
            .map_err(|err| ServeError::Internal(anyhow::Error::from(err)))?,
    );
    let session_id = SessionId::new();

    let proxy_listener = TcpListener::bind(("127.0.0.1", cfg.proxy_port))
        .await
        .map_err(ServeError::BindProxy)?;
    let proxy_addr = proxy_listener.local_addr().map_err(ServeError::BindProxy)?;

    let (api_listener, api_addr) = if cfg.api_server {
        let listener = TcpListener::bind(("127.0.0.1", cfg.api_port))
            .await
            .map_err(ServeError::BindApi)?;
        let addr = listener.local_addr().map_err(ServeError::BindApi)?;
        (Some(listener), Some(addr))
    } else {
        (None, None)
    };

    let meta = store::SessionMeta {
        session_id: session_id.to_string(),
        provider: cfg.provider.as_str().into(),
        upstream: cfg.upstream.to_string(),
        proxy_port: proxy_addr.port(),
        api_port: api_addr.map_or(0, |a| a.port()),
        started_at: chrono::Utc::now(),
        ended_at: None,
        request_count: 0,
        schema_version: 1,
    };
    if let Err(err) = store.init_session(meta).await {
        tracing::warn!(error = %err, "failed to persist initial session metadata");
    }

    let state = AppState::new(
        store.clone(),
        cfg.provider,
        cfg.upstream.clone(),
        session_id.clone(),
        cfg.redact,
    );

    let proxy_app = proxy::build_proxy_app(state.clone());
    let events = state.events.clone();

    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let join = spawn_servers(
        proxy_listener,
        api_listener,
        proxy_app,
        state,
        shutdown_rx,
        store,
        session_id,
    );

    Ok(ProxyHandle {
        provider: cfg.provider,
        upstream: cfg.upstream,
        proxy_port: proxy_addr.port(),
        api_port: api_addr.map(|a| a.port()),
        shutdown_tx: Some(shutdown_tx),
        join: Some(join),
        events,
    })
}

/// Spawn the background task that drives the proxy server and, when an API
/// listener was bound, the API server alongside it.
///
/// The task runs until the first of these happens: the proxy server future
/// completes, the API server future completes, or `shutdown_rx` resolves.
/// A server that completes with an error is logged at `warn` level. After
/// that, the session identified by `session_id` is finalized in `store`; a
/// failure to finalize is logged rather than propagated.
fn spawn_servers(
    proxy_listener: TcpListener,
    api_listener: Option<TcpListener>,
    proxy_app: axum::Router,
    state: AppState,
    shutdown_rx: oneshot::Receiver<()>,
    store: Arc<dyn Store>,
    session_id: SessionId,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let proxy_server = axum::serve(proxy_listener, proxy_app);

        // `state` is only consumed when there is an API listener to serve.
        let api_server = match api_listener {
            Some(listener) => Some(axum::serve(listener, api::build_api_app(state))),
            None => None,
        };
        // Without an API server this future never completes, so the race
        // below is decided by the proxy server or the shutdown signal alone.
        let api_task = async move {
            match api_server {
                Some(server) => server.await,
                None => std::future::pending().await,
            }
        };

        tokio::select! {
            outcome = proxy_server => {
                if let Err(err) = outcome {
                    tracing::warn!(error = %err, "proxy server exited");
                }
            }
            outcome = api_task => {
                if let Err(err) = outcome {
                    tracing::warn!(error = %err, "api server exited");
                }
            }
            // Any resolution of the receiver ends the race; its value is
            // ignored.
            _ = shutdown_rx => {}
        }

        if let Err(err) = store.finalize_session(session_id.as_str()).await {
            tracing::warn!(error = %err, "failed to finalize session");
        }
    })
}