Skip to main content

ccs_proxy/
lib.rs

1//! ccs-proxy: a local logging reverse-proxy + dashboard that captures the
2//! traffic between Claude Code / Codex and their upstream LLM APIs.
3
4pub mod api;
5pub mod capture;
6mod config;
7mod error;
8mod handle;
9pub mod provider;
10pub mod proxy;
11mod session;
12mod state;
13pub mod store;
14
15pub use capture::CaptureEvent;
16pub use config::ServeConfig;
17pub use error::ServeError;
18pub use handle::ProxyHandle;
19pub use provider::ProviderKind;
20pub use session::SessionId;
21pub use state::AppState;
22
23use crate::store::Store;
24use std::sync::Arc;
25use tokio::net::TcpListener;
26use tokio::sync::oneshot;
27use tokio::task::JoinHandle;
28
29/// Bind both the reverse-proxy and the API listeners on `127.0.0.1`, wire
30/// shared [`AppState`], spawn the server task, and return a [`ProxyHandle`]
31/// that the caller can use to discover the bound ports and trigger shutdown.
32///
33/// The configured `proxy_port` / `api_port` of `0` ask the kernel to assign a
34/// free port; the actually-bound ports are recorded in both the returned
35/// handle and the persisted [`store::SessionMeta`].
36#[allow(clippy::too_many_lines)]
37pub async fn serve(cfg: ServeConfig) -> Result<ProxyHandle, ServeError> {
38    if !cfg!(unix) {
39        return Err(ServeError::UnsupportedPlatform(
40            "only unix (macOS / Linux) is supported in v1",
41        ));
42    }
43
44    std::fs::create_dir_all(&cfg.data_dir).map_err(|err| ServeError::DataDir {
45        path: cfg.data_dir.clone(),
46        source: err,
47    })?;
48
49    let store: Arc<dyn Store> = Arc::new(
50        store::FsStore::open(cfg.data_dir.clone())
51            .map_err(|err| ServeError::Internal(anyhow::Error::from(err)))?,
52    );
53    let session_id = SessionId::new();
54
55    let proxy_listener = TcpListener::bind(("127.0.0.1", cfg.proxy_port))
56        .await
57        .map_err(ServeError::BindProxy)?;
58    let proxy_addr = proxy_listener.local_addr().map_err(ServeError::BindProxy)?;
59
60    let (api_listener, api_addr) = if cfg.api_server {
61        let listener = TcpListener::bind(("127.0.0.1", cfg.api_port))
62            .await
63            .map_err(ServeError::BindApi)?;
64        let addr = listener.local_addr().map_err(ServeError::BindApi)?;
65        (Some(listener), Some(addr))
66    } else {
67        (None, None)
68    };
69
70    let meta = store::SessionMeta {
71        session_id: session_id.to_string(),
72        provider: cfg.provider.as_str().into(),
73        upstream: cfg.upstream.to_string(),
74        proxy_port: proxy_addr.port(),
75        api_port: api_addr.map_or(0, |a| a.port()),
76        started_at: chrono::Utc::now(),
77        ended_at: None,
78        request_count: 0,
79        schema_version: 1,
80    };
81    if let Err(err) = store.init_session(meta).await {
82        tracing::warn!(error = %err, "failed to persist initial session metadata");
83    }
84
85    let state = AppState::new(
86        store.clone(),
87        cfg.provider,
88        cfg.upstream.clone(),
89        session_id.clone(),
90        cfg.redact,
91    );
92
93    let proxy_app = proxy::build_proxy_app(state.clone());
94    let events = state.events.clone();
95
96    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
97    let join = spawn_servers(
98        proxy_listener,
99        api_listener,
100        proxy_app,
101        state,
102        shutdown_rx,
103        store,
104        session_id,
105    );
106
107    Ok(ProxyHandle {
108        provider: cfg.provider,
109        upstream: cfg.upstream,
110        proxy_port: proxy_addr.port(),
111        api_port: api_addr.map(|a| a.port()),
112        shutdown_tx: Some(shutdown_tx),
113        join: Some(join),
114        events,
115    })
116}
117
118fn spawn_servers(
119    proxy_listener: TcpListener,
120    api_listener: Option<TcpListener>,
121    proxy_app: axum::Router,
122    state: AppState,
123    shutdown_rx: oneshot::Receiver<()>,
124    store: Arc<dyn Store>,
125    session_id: SessionId,
126) -> JoinHandle<()> {
127    tokio::spawn(async move {
128        let proxy_fut = axum::serve(proxy_listener, proxy_app);
129
130        if let Some(api_listener) = api_listener {
131            let api_app = api::build_api_app(state);
132            let api_fut = axum::serve(api_listener, api_app);
133            tokio::select! {
134                res = proxy_fut => {
135                    if let Err(err) = res {
136                        tracing::warn!(error = %err, "proxy server exited");
137                    }
138                }
139                res = api_fut => {
140                    if let Err(err) = res {
141                        tracing::warn!(error = %err, "api server exited");
142                    }
143                }
144                _ = shutdown_rx => {}
145            }
146        } else {
147            tokio::select! {
148                res = proxy_fut => {
149                    if let Err(err) = res {
150                        tracing::warn!(error = %err, "proxy server exited");
151                    }
152                }
153                _ = shutdown_rx => {}
154            }
155        }
156
157        if let Err(err) = store.finalize_session(session_id.as_str()).await {
158            tracing::warn!(error = %err, "failed to finalize session");
159        }
160    })
161}