Skip to main content

crabllm_proxy/
state.rs

1use arc_swap::ArcSwap;
2use crabllm_core::{Extension, GatewayConfig, Provider, Storage};
3use crabllm_provider::ProviderRegistry;
4use std::{
5    collections::HashMap,
6    sync::{Arc, RwLock},
7    time::SystemTime,
8};
9use tokio::sync::broadcast;
10
/// Per-request event emitted after a request completes.
///
/// Embedders subscribe to the [`AppState::usage_events`] broadcast
/// channel to observe live traffic without scraping the Prometheus
/// endpoint. Every field is owned (or `'static`), so an event can be
/// cloned and sent to any number of subscribers.
#[derive(Debug, Clone)]
pub struct UsageEvent {
    /// Wall-clock time attached to the event.
    /// NOTE(review): confirm whether producers stamp request start or
    /// request completion.
    pub timestamp: SystemTime,
    /// Identifier of the request this event describes.
    pub request_id: String,
    /// Name of the API key used for the request, if any. Presumably the
    /// value side of the token → key-name map used for auth — verify
    /// against the code that builds the event.
    pub key_name: Option<String>,
    /// Model name associated with the request.
    pub model: String,
    /// Provider name associated with the request.
    pub provider: String,
    /// Logical endpoint: `"chat.completions"`, `"embeddings"`,
    /// `"images.generations"`, `"audio.speech"`, `"audio.transcriptions"`.
    pub endpoint: &'static str,
    /// Prompt / input tokens. For embeddings this is the input token
    /// count; for image / audio endpoints it's 0.
    pub tokens_in: u32,
    /// Completion / output tokens. 0 for endpoints that don't generate
    /// tokens (embeddings, images, audio).
    pub tokens_out: u32,
    /// Request duration in milliseconds.
    /// NOTE(review): confirm the exact start/stop points being measured.
    pub duration_ms: u64,
    /// The wire HTTP status the client observed, or `0` when a
    /// streaming chat response sent 200 OK headers and then broke
    /// mid-stream. `0` is a sentinel meaning "not a real HTTP
    /// response" — consumers branching on `status` alone can
    /// distinguish a clean 200 from a failed stream without having
    /// to inspect [`Self::error`]. For non-streaming requests
    /// `status` is always the real HTTP code the client saw.
    pub status: u16,
    /// `Some(msg)` if the request failed. Set alongside `status == 0`
    /// for mid-stream streaming failures; set alongside a real error
    /// status (4xx/5xx) for pre-stream and non-streaming failures.
    pub error: Option<String>,
}
44
45/// Shared application state passed to all handlers.
46///
47/// Generic over the storage backend `S` and the provider type `P`. The
48/// binary picks `P` by defining a workspace-level union enum that wraps
49/// every provider source it links — that enum implements `Provider` via
50/// match-and-delegate, so dispatch through `P` is fully monomorphized.
51pub struct AppState<S: Storage, P: Provider> {
52    pub registry: Arc<ArcSwap<ProviderRegistry<P>>>,
53    pub config: GatewayConfig,
54    pub extensions: Arc<Vec<Box<dyn Extension>>>,
55    pub storage: Arc<S>,
56    /// Precomputed token → key name lookup for O(1) auth.
57    /// Wrapped in RwLock to support runtime key management.
58    pub key_map: Arc<RwLock<HashMap<String, String>>>,
59    /// Optional broadcast sink for per-request [`UsageEvent`]s. `None`
60    /// is a no-op — the standalone `crabllm serve` binary leaves it
61    /// unset and behavior is unchanged. Embedders that want live
62    /// traffic construct a sender, pass `Some(sender.clone())`, and
63    /// call `sender.subscribe()` to observe events.
64    pub usage_events: Option<broadcast::Sender<UsageEvent>>,
65}
66
67impl<S: Storage, P: Provider> Clone for AppState<S, P> {
68    fn clone(&self) -> Self {
69        Self {
70            registry: self.registry.clone(),
71            config: self.config.clone(),
72            extensions: self.extensions.clone(),
73            storage: self.storage.clone(),
74            key_map: self.key_map.clone(),
75            usage_events: self.usage_events.clone(),
76        }
77    }
78}
79
80impl<S: Storage, P: Provider> AppState<S, P> {
81    /// Snapshot the current provider registry. Returns a guard that
82    /// keeps the referenced registry alive for the duration of the
83    /// request — even if a concurrent swap replaces the global pointer.
84    pub fn registry(&self) -> arc_swap::Guard<Arc<ProviderRegistry<P>>> {
85        self.registry.load()
86    }
87}