Skip to main content

ccs_proxy/
lib.rs

1//! ccs-proxy: a local logging reverse-proxy + dashboard that captures the
2//! traffic between Claude Code / Codex and their upstream LLM APIs.
3
4pub mod api;
5pub mod capture;
6mod config;
7mod error;
8mod handle;
9pub mod provider;
10pub mod proxy;
11mod session;
12mod state;
13pub mod store;
14
15pub use capture::CaptureEvent;
16pub use config::ServeConfig;
17pub use error::ServeError;
18pub use handle::ProxyHandle;
19pub use provider::ProviderKind;
20pub use session::SessionId;
21pub use state::AppState;
22
23use crate::store::Store;
24use std::sync::Arc;
25use tokio::net::TcpListener;
26use tokio::sync::oneshot;
27use tokio::task::JoinHandle;
28
29/// Bind both the reverse-proxy and the API listeners on `127.0.0.1`, wire
30/// shared [`AppState`], spawn the server task, and return a [`ProxyHandle`]
31/// that the caller can use to discover the bound ports and trigger shutdown.
32///
33/// The configured `proxy_port` / `api_port` of `0` ask the kernel to assign a
34/// free port; the actually-bound ports are recorded in both the returned
35/// handle and the persisted [`store::SessionMeta`].
36#[allow(clippy::too_many_lines)]
37pub async fn serve(cfg: ServeConfig) -> Result<ProxyHandle, ServeError> {
38    if !cfg!(unix) {
39        return Err(ServeError::UnsupportedPlatform(
40            "only unix (macOS / Linux) is supported in v1",
41        ));
42    }
43
44    std::fs::create_dir_all(&cfg.data_dir).map_err(|err| ServeError::DataDir {
45        path: cfg.data_dir.clone(),
46        source: err,
47    })?;
48
49    let store: Arc<dyn Store> = Arc::new(
50        store::FsStore::open(cfg.data_dir.clone())
51            .map_err(|err| ServeError::Internal(anyhow::Error::from(err)))?,
52    );
53    let session_id = SessionId::new();
54
55    let proxy_listener = TcpListener::bind(("127.0.0.1", cfg.proxy_port))
56        .await
57        .map_err(ServeError::BindProxy)?;
58    let proxy_addr = proxy_listener.local_addr().map_err(ServeError::BindProxy)?;
59
60    let (api_listener, api_addr) = if cfg.api_server {
61        let listener = TcpListener::bind(("127.0.0.1", cfg.api_port))
62            .await
63            .map_err(ServeError::BindApi)?;
64        let addr = listener.local_addr().map_err(ServeError::BindApi)?;
65        (Some(listener), Some(addr))
66    } else {
67        (None, None)
68    };
69
70    let meta = store::SessionMeta {
71        session_id: session_id.to_string(),
72        provider: cfg.provider.as_str().into(),
73        upstream: cfg.upstream.to_string(),
74        proxy_port: proxy_addr.port(),
75        api_port: api_addr.map_or(0, |a| a.port()),
76        started_at: chrono::Utc::now(),
77        ended_at: None,
78        request_count: 0,
79        schema_version: 1,
80        cwd: None,
81        models: vec![],
82    };
83    if let Err(err) = store.init_session(meta).await {
84        tracing::warn!(error = %err, "failed to persist initial session metadata");
85    }
86
87    let state = AppState::new(
88        store.clone(),
89        cfg.provider,
90        cfg.upstream.clone(),
91        session_id.clone(),
92        cfg.redact,
93    );
94
95    let proxy_app = proxy::build_proxy_app(state.clone());
96    let events = state.events.clone();
97
98    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
99    let join = spawn_servers(
100        proxy_listener,
101        api_listener,
102        proxy_app,
103        state,
104        shutdown_rx,
105        store,
106        session_id,
107    );
108
109    Ok(ProxyHandle {
110        provider: cfg.provider,
111        upstream: cfg.upstream,
112        proxy_port: proxy_addr.port(),
113        api_port: api_addr.map(|a| a.port()),
114        shutdown_tx: Some(shutdown_tx),
115        join: Some(join),
116        events,
117    })
118}
119
120fn spawn_servers(
121    proxy_listener: TcpListener,
122    api_listener: Option<TcpListener>,
123    proxy_app: axum::Router,
124    state: AppState,
125    shutdown_rx: oneshot::Receiver<()>,
126    store: Arc<dyn Store>,
127    session_id: SessionId,
128) -> JoinHandle<()> {
129    tokio::spawn(async move {
130        let proxy_fut = axum::serve(proxy_listener, proxy_app);
131
132        if let Some(api_listener) = api_listener {
133            let api_app = api::build_api_app(state);
134            let api_fut = axum::serve(api_listener, api_app);
135            tokio::select! {
136                res = proxy_fut => {
137                    if let Err(err) = res {
138                        tracing::warn!(error = %err, "proxy server exited");
139                    }
140                }
141                res = api_fut => {
142                    if let Err(err) = res {
143                        tracing::warn!(error = %err, "api server exited");
144                    }
145                }
146                _ = shutdown_rx => {}
147            }
148        } else {
149            tokio::select! {
150                res = proxy_fut => {
151                    if let Err(err) = res {
152                        tracing::warn!(error = %err, "proxy server exited");
153                    }
154                }
155                _ = shutdown_rx => {}
156            }
157        }
158
159        if let Err(err) = store.finalize_session(session_id.as_str()).await {
160            tracing::warn!(error = %err, "failed to finalize session");
161        }
162    })
163}