Skip to main content

kaizen/proxy/
server.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Local HTTP server: any path and method, forward to Anthropic base URL.
3
4use crate::proxy::forward::run_forward_inner;
5use crate::proxy::opts::ProxyRunOptions;
6use crate::proxy::state::ProxyState;
7use axum::Router;
8use axum::http::{Request, StatusCode};
9use axum::response::IntoResponse;
10use axum::response::Response;
11use axum::routing::any;
12use std::net::SocketAddr;
13use std::path::PathBuf;
14use std::sync::Arc;
15
16use axum::extract::DefaultBodyLimit;
17
18/// Forward any method+path+query+body, buffer upstream response, append one store row.
19async fn handle(
20    axum::extract::State(st): axum::extract::State<Arc<ProxyState>>,
21    request: Request<axum::body::Body>,
22) -> axum::response::Response {
23    match do_forward(&st, request).await {
24        Ok(r) => r,
25        Err(e) => (StatusCode::BAD_GATEWAY, e.to_string()).into_response(),
26    }
27}
28
29async fn do_forward(
30    st: &Arc<ProxyState>,
31    request: Request<axum::body::Body>,
32) -> Result<Response, anyhow::Error> {
33    let (parts, body) = request.into_parts();
34    let method = parts.method;
35    let path = parts.uri.path().trim_start_matches('/').to_string();
36    let q = parts.uri.query().unwrap_or("").to_string();
37    let headers = &parts.headers;
38    let body = axum::body::to_bytes(body, st.options.max_request_bytes as usize).await?;
39    let path_ref = if path.is_empty() { "" } else { &path };
40    run_forward_inner(st, method, path_ref, &q, headers, &body).await
41}
42
43/// Build `Client`, bind, run until the process is killed.
44pub async fn run(
45    options: Arc<ProxyRunOptions>,
46    workspace: PathBuf,
47    config: crate::core::config::Config,
48) -> Result<(), anyhow::Error> {
49    let addr: SocketAddr = options
50        .listen
51        .parse()
52        .map_err(|e: std::net::AddrParseError| {
53            anyhow::anyhow!(r#"bad --listen (expected e.g. "127.0.0.1:3847"): {e}"#)
54        })?;
55    let listener = tokio::net::TcpListener::bind(addr).await?;
56    run_with_listener(options, workspace, config, listener).await
57}
58
59/// Serve on a pre-bound listener so daemon supervisor can publish the chosen endpoint.
60pub async fn run_with_listener(
61    options: Arc<ProxyRunOptions>,
62    workspace: PathBuf,
63    config: crate::core::config::Config,
64    listener: tokio::net::TcpListener,
65) -> Result<(), anyhow::Error> {
66    let store_path = crate::core::workspace::db_path(&workspace)?;
67    let client = build_client(&options)?;
68    let st = Arc::new(ProxyState {
69        options: options.clone(),
70        store_path,
71        workspace: workspace.clone(),
72        config: Arc::new(config),
73        client,
74    });
75    let limit = usize::try_from(st.options.max_request_bytes).unwrap_or(usize::MAX);
76    let app = Router::new()
77        .route("/{*path}", any(handle))
78        .layer(DefaultBodyLimit::max(limit))
79        .with_state(st);
80    let seen = listener.local_addr()?;
81    tracing::info!(
82        addr = %seen,
83        provider = %options.provider,
84        "kaizen LLM proxy listening"
85    );
86    axum::serve(listener, app).await?;
87    Ok(())
88}
89
90fn build_client(o: &ProxyRunOptions) -> Result<reqwest::Client, reqwest::Error> {
91    use std::time::Duration;
92    let mut b = reqwest::Client::builder()
93        .connect_timeout(Duration::from_secs(30))
94        .timeout(Duration::from_secs(300));
95    if !o.compress_transport {
96        b = b.no_gzip();
97    }
98    b.build()
99}