pub mod api;
pub mod capture;
mod config;
mod error;
mod handle;
pub mod provider;
pub mod proxy;
mod session;
mod state;
pub mod store;
pub use capture::CaptureEvent;
pub use config::ServeConfig;
pub use error::ServeError;
pub use handle::ProxyHandle;
pub use provider::ProviderKind;
pub use session::SessionId;
pub use state::AppState;
use crate::store::Store;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
#[allow(clippy::too_many_lines)]
pub async fn serve(cfg: ServeConfig) -> Result<ProxyHandle, ServeError> {
if !cfg!(unix) {
return Err(ServeError::UnsupportedPlatform(
"only unix (macOS / Linux) is supported in v1",
));
}
std::fs::create_dir_all(&cfg.data_dir).map_err(|err| ServeError::DataDir {
path: cfg.data_dir.clone(),
source: err,
})?;
let store: Arc<dyn Store> = Arc::new(
store::FsStore::open(cfg.data_dir.clone())
.map_err(|err| ServeError::Internal(anyhow::Error::from(err)))?,
);
let session_id = SessionId::new();
let proxy_listener = TcpListener::bind(("127.0.0.1", cfg.proxy_port))
.await
.map_err(ServeError::BindProxy)?;
let proxy_addr = proxy_listener.local_addr().map_err(ServeError::BindProxy)?;
let (api_listener, api_addr) = if cfg.api_server {
let listener = TcpListener::bind(("127.0.0.1", cfg.api_port))
.await
.map_err(ServeError::BindApi)?;
let addr = listener.local_addr().map_err(ServeError::BindApi)?;
(Some(listener), Some(addr))
} else {
(None, None)
};
let meta = store::SessionMeta {
session_id: session_id.to_string(),
provider: cfg.provider.as_str().into(),
upstream: cfg.upstream.to_string(),
proxy_port: proxy_addr.port(),
api_port: api_addr.map_or(0, |a| a.port()),
started_at: chrono::Utc::now(),
ended_at: None,
request_count: 0,
schema_version: 1,
cwd: None,
models: vec![],
};
if let Err(err) = store.init_session(meta).await {
tracing::warn!(error = %err, "failed to persist initial session metadata");
}
let state = AppState::new(
store.clone(),
cfg.provider,
cfg.upstream.clone(),
session_id.clone(),
cfg.redact,
);
let proxy_app = proxy::build_proxy_app(state.clone());
let events = state.events.clone();
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let join = spawn_servers(
proxy_listener,
api_listener,
proxy_app,
state,
shutdown_rx,
store,
session_id,
);
Ok(ProxyHandle {
provider: cfg.provider,
upstream: cfg.upstream,
proxy_port: proxy_addr.port(),
api_port: api_addr.map(|a| a.port()),
shutdown_tx: Some(shutdown_tx),
join: Some(join),
events,
})
}
fn spawn_servers(
proxy_listener: TcpListener,
api_listener: Option<TcpListener>,
proxy_app: axum::Router,
state: AppState,
shutdown_rx: oneshot::Receiver<()>,
store: Arc<dyn Store>,
session_id: SessionId,
) -> JoinHandle<()> {
tokio::spawn(async move {
let proxy_fut = axum::serve(proxy_listener, proxy_app);
if let Some(api_listener) = api_listener {
let api_app = api::build_api_app(state);
let api_fut = axum::serve(api_listener, api_app);
tokio::select! {
res = proxy_fut => {
if let Err(err) = res {
tracing::warn!(error = %err, "proxy server exited");
}
}
res = api_fut => {
if let Err(err) = res {
tracing::warn!(error = %err, "api server exited");
}
}
_ = shutdown_rx => {}
}
} else {
tokio::select! {
res = proxy_fut => {
if let Err(err) = res {
tracing::warn!(error = %err, "proxy server exited");
}
}
_ = shutdown_rx => {}
}
}
if let Err(err) = store.finalize_session(session_id.as_str()).await {
tracing::warn!(error = %err, "failed to finalize session");
}
})
}