1pub mod api;
5pub mod capture;
6mod config;
7mod error;
8mod handle;
9pub mod provider;
10pub mod proxy;
11mod session;
12mod state;
13pub mod store;
14
15pub use capture::CaptureEvent;
16pub use config::ServeConfig;
17pub use error::ServeError;
18pub use handle::ProxyHandle;
19pub use provider::ProviderKind;
20pub use session::SessionId;
21pub use state::AppState;
22
23use crate::store::Store;
24use std::sync::Arc;
25use tokio::net::TcpListener;
26use tokio::sync::oneshot;
27use tokio::task::JoinHandle;
28
29#[allow(clippy::too_many_lines)]
37pub async fn serve(cfg: ServeConfig) -> Result<ProxyHandle, ServeError> {
38 if !cfg!(unix) {
39 return Err(ServeError::UnsupportedPlatform(
40 "only unix (macOS / Linux) is supported in v1",
41 ));
42 }
43
44 std::fs::create_dir_all(&cfg.data_dir).map_err(|err| ServeError::DataDir {
45 path: cfg.data_dir.clone(),
46 source: err,
47 })?;
48
49 let store: Arc<dyn Store> = Arc::new(
50 store::FsStore::open(cfg.data_dir.clone())
51 .map_err(|err| ServeError::Internal(anyhow::Error::from(err)))?,
52 );
53 let session_id = SessionId::new();
54
55 let proxy_listener = TcpListener::bind(("127.0.0.1", cfg.proxy_port))
56 .await
57 .map_err(ServeError::BindProxy)?;
58 let proxy_addr = proxy_listener.local_addr().map_err(ServeError::BindProxy)?;
59
60 let (api_listener, api_addr) = if cfg.api_server {
61 let listener = TcpListener::bind(("127.0.0.1", cfg.api_port))
62 .await
63 .map_err(ServeError::BindApi)?;
64 let addr = listener.local_addr().map_err(ServeError::BindApi)?;
65 (Some(listener), Some(addr))
66 } else {
67 (None, None)
68 };
69
70 let meta = store::SessionMeta {
71 session_id: session_id.to_string(),
72 provider: cfg.provider.as_str().into(),
73 upstream: cfg.upstream.to_string(),
74 proxy_port: proxy_addr.port(),
75 api_port: api_addr.map_or(0, |a| a.port()),
76 started_at: chrono::Utc::now(),
77 ended_at: None,
78 request_count: 0,
79 schema_version: 1,
80 cwd: None,
81 models: vec![],
82 };
83 if let Err(err) = store.init_session(meta).await {
84 tracing::warn!(error = %err, "failed to persist initial session metadata");
85 }
86
87 let state = AppState::new(
88 store.clone(),
89 cfg.provider,
90 cfg.upstream.clone(),
91 session_id.clone(),
92 cfg.redact,
93 );
94
95 let proxy_app = proxy::build_proxy_app(state.clone());
96 let events = state.events.clone();
97
98 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
99 let join = spawn_servers(
100 proxy_listener,
101 api_listener,
102 proxy_app,
103 state,
104 shutdown_rx,
105 store,
106 session_id,
107 );
108
109 Ok(ProxyHandle {
110 provider: cfg.provider,
111 upstream: cfg.upstream,
112 proxy_port: proxy_addr.port(),
113 api_port: api_addr.map(|a| a.port()),
114 shutdown_tx: Some(shutdown_tx),
115 join: Some(join),
116 events,
117 })
118}
119
120fn spawn_servers(
121 proxy_listener: TcpListener,
122 api_listener: Option<TcpListener>,
123 proxy_app: axum::Router,
124 state: AppState,
125 shutdown_rx: oneshot::Receiver<()>,
126 store: Arc<dyn Store>,
127 session_id: SessionId,
128) -> JoinHandle<()> {
129 tokio::spawn(async move {
130 let proxy_fut = axum::serve(proxy_listener, proxy_app);
131
132 if let Some(api_listener) = api_listener {
133 let api_app = api::build_api_app(state);
134 let api_fut = axum::serve(api_listener, api_app);
135 tokio::select! {
136 res = proxy_fut => {
137 if let Err(err) = res {
138 tracing::warn!(error = %err, "proxy server exited");
139 }
140 }
141 res = api_fut => {
142 if let Err(err) = res {
143 tracing::warn!(error = %err, "api server exited");
144 }
145 }
146 _ = shutdown_rx => {}
147 }
148 } else {
149 tokio::select! {
150 res = proxy_fut => {
151 if let Err(err) = res {
152 tracing::warn!(error = %err, "proxy server exited");
153 }
154 }
155 _ = shutdown_rx => {}
156 }
157 }
158
159 if let Err(err) = store.finalize_session(session_id.as_str()).await {
160 tracing::warn!(error = %err, "failed to finalize session");
161 }
162 })
163}