Skip to main content

crabllm_proxy/
lib.rs

1use axum::{
2    Json, Router,
3    extract::{DefaultBodyLimit, Request},
4    middleware,
5    response::Response,
6    routing::{get, post},
7};
8use crabllm_core::{Prefix, Provider, Storage};
9
10pub use auth::Principal;
11pub use state::{AppState, UsageEvent};
12
13// Storage table prefixes. Each 4-byte prefix namespaces a logical table
14// in the key-value storage backend.
15pub const PREFIX_KEYS: Prefix = *b"keys";
16pub const PREFIX_RATE_LIMIT: Prefix = *b"rlim";
17pub const PREFIX_USAGE: Prefix = *b"usge";
18pub const PREFIX_CACHE: Prefix = *b"cach";
19pub const PREFIX_BUDGET: Prefix = *b"bdgt";
20pub const PREFIX_AUDIT: Prefix = *b"alog";
21pub const PREFIX_PROVIDERS: Prefix = *b"prvd";
22
23pub mod admin;
24pub mod admin_providers;
25pub mod anthropic;
26pub mod auth;
27mod body;
28pub mod ext;
29pub mod gemini;
30pub mod handlers;
31#[cfg(feature = "openapi")]
32pub mod openapi;
33mod state;
34pub mod storage;
35
36/// Middleware that tracks the number of in-flight API requests.
37/// For SSE streams, the gauge decrements when the response starts (not when the
38/// stream ends), so it undercounts long-lived streaming connections.
39async fn track_active_connections(request: Request, next: middleware::Next) -> Response {
40    metrics::gauge!("crabllm_active_connections").increment(1.0);
41    let response = next.run(request).await;
42    metrics::gauge!("crabllm_active_connections").decrement(1.0);
43    response
44}
45
46/// Middleware that logs every incoming HTTP request with method, path,
47/// status, and latency. 2xx/3xx log at `info`, 4xx/5xx at `warn`.
48/// `/health` and `/metrics` log at `debug` so probes don't flood the
49/// default output.
50pub async fn log_request(request: Request, next: middleware::Next) -> Response {
51    let method = request.method().clone();
52    let path = request.uri().path().to_string();
53    let start = std::time::Instant::now();
54
55    let response = next.run(request).await;
56    let status = response.status();
57    let latency_ms = start.elapsed().as_millis() as u64;
58    let is_probe = path == "/health" || path == "/metrics";
59
60    if is_probe {
61        tracing::debug!(%method, path, status = status.as_u16(), latency_ms, "request");
62    } else if status.is_client_error() || status.is_server_error() {
63        tracing::warn!(%method, path, status = status.as_u16(), latency_ms, "request");
64    } else {
65        tracing::info!(%method, path, status = status.as_u16(), latency_ms, "request");
66    }
67
68    response
69}
70
71/// Build the bare API route tree (`/v1/*`) bound to `state`, with **no
72/// auth, observability, or admin layers**. Embedders use this when they
73/// want to install their own auth or observability middleware instead of
74/// the defaults in [`router`]. Standalone callers should prefer [`router`].
75pub fn routes<S, P>(state: AppState<S, P>) -> Router
76where
77    S: Storage + 'static,
78    P: Provider + 'static,
79{
80    Router::<AppState<S, P>>::new()
81        .route(
82            "/v1/chat/completions",
83            post(handlers::chat_completions::<S, P>),
84        )
85        .route("/v1/messages", post(anthropic::messages::<S, P>))
86        .route(
87            "/v1beta/models/{model_action}",
88            post(gemini::generate_content::<S, P>),
89        )
90        .route("/v1/embeddings", post(handlers::embeddings::<S, P>))
91        .route(
92            "/v1/images/generations",
93            post(handlers::image_generations::<S, P>),
94        )
95        .route("/v1/audio/speech", post(handlers::audio_speech::<S, P>))
96        .route(
97            "/v1/audio/transcriptions",
98            post(handlers::audio_transcriptions::<S, P>),
99        )
100        .route("/v1/models", get(handlers::models::<S, P>))
101        .route("/v1/usage", get(handlers::usage::<S, P>))
102        .layer(DefaultBodyLimit::max(8 * 1024 * 1024))
103        .with_state(state)
104}
105
106/// Build the Axum router with all API routes, the built-in auth middleware,
107/// active-connection tracking, `/health`, and merged admin routes. This is
108/// the batteries-included shape used by the standalone gateway binary.
109pub fn router<S, P>(state: AppState<S, P>, admin_routes: Vec<Router>) -> Router
110where
111    S: Storage + 'static,
112    P: Provider + 'static,
113{
114    let mut app = routes(state.clone())
115        .layer(middleware::from_fn_with_state(state, auth::auth::<S, P>))
116        .layer(middleware::from_fn(track_active_connections));
117
118    // Health check — outside auth middleware so load balancers can probe it.
119    app = app.route(
120        "/health",
121        get(|| async { Json(serde_json::json!({"status": "ok"})) }),
122    );
123
124    // Merge extension-provided admin routes (stateless — extensions
125    // capture their own state via closures in the Router<()>).
126    for admin_router in admin_routes {
127        app = app.merge(admin_router);
128    }
129
130    app
131}