// crabllm_proxy/state.rs

use arc_swap::ArcSwap;
use crabllm_core::{Extension, GatewayConfig, Provider, Storage};
use crabllm_provider::ProviderRegistry;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    time::SystemTime,
};
use tokio::sync::broadcast;

11/// Per-request event emitted after a request completes. Embedders
12/// subscribe to the [`AppState::usage_events`] broadcast channel to
13/// observe live traffic without scraping the Prometheus endpoint.
14#[derive(Debug, Clone)]
15pub struct UsageEvent {
16    pub timestamp: SystemTime,
17    pub request_id: String,
18    pub principal: Option<String>,
19    pub model: String,
20    pub provider: String,
21    /// Logical endpoint: `"chat.completions"`, `"embeddings"`,
22    /// `"images.generations"`, `"audio.speech"`, `"audio.transcriptions"`.
23    pub endpoint: &'static str,
24    /// Full canonical usage with all axes (input, cache_read, cache_write,
25    /// output, reasoning, audio, per-call tools). Embeds the same type that
26    /// drives billing in [`crabllm_core::ModelInfo::cost`], so subscribers
27    /// see exactly what was charged for.
28    pub usage: crabllm_core::Usage,
29    pub duration_ms: u64,
30    /// The wire HTTP status the client observed, or `0` when a
31    /// streaming chat response sent 200 OK headers and then broke
32    /// mid-stream. `0` is a sentinel meaning "not a real HTTP
33    /// response" — consumers branching on `status` alone can
34    /// distinguish a clean 200 from a failed stream without having
35    /// to inspect [`Self::error`]. For non-streaming requests
36    /// `status` is always the real HTTP code the client saw.
37    pub status: u16,
38    /// `Some(msg)` if the request failed. Set alongside `status == 0`
39    /// for mid-stream streaming failures; set alongside a real error
40    /// status (4xx/5xx) for pre-stream and non-streaming failures.
41    pub error: Option<String>,
42}
43
44/// Shared application state passed to all handlers.
45///
46/// Generic over the storage backend `S` and the provider type `P`. The
47/// binary picks `P` by defining a workspace-level union enum that wraps
48/// every provider source it links — that enum implements `Provider` via
49/// match-and-delegate, so dispatch through `P` is fully monomorphized.
50pub struct AppState<S: Storage, P: Provider> {
51    pub registry: Arc<ArcSwap<ProviderRegistry<P>>>,
52    pub config: GatewayConfig,
53    pub extensions: Arc<Vec<Box<dyn Extension>>>,
54    pub storage: Arc<S>,
55    /// Precomputed token → key name lookup for O(1) auth.
56    /// Wrapped in RwLock to support runtime key management.
57    pub key_map: Arc<RwLock<HashMap<String, String>>>,
58    /// Optional broadcast sink for per-request [`UsageEvent`]s. `None`
59    /// is a no-op — the standalone `crabllm serve` binary leaves it
60    /// unset and behavior is unchanged. Embedders that want live
61    /// traffic construct a sender, pass `Some(sender.clone())`, and
62    /// call `sender.subscribe()` to observe events.
63    pub usage_events: Option<broadcast::Sender<UsageEvent>>,
64}
65
66impl<S: Storage, P: Provider> Clone for AppState<S, P> {
67    fn clone(&self) -> Self {
68        Self {
69            registry: self.registry.clone(),
70            config: self.config.clone(),
71            extensions: self.extensions.clone(),
72            storage: self.storage.clone(),
73            key_map: self.key_map.clone(),
74            usage_events: self.usage_events.clone(),
75        }
76    }
77}
78
79impl<S: Storage, P: Provider> AppState<S, P> {
80    /// Snapshot the current provider registry. Returns a guard that
81    /// keeps the referenced registry alive for the duration of the
82    /// request — even if a concurrent swap replaces the global pointer.
83    pub fn registry(&self) -> arc_swap::Guard<Arc<ProviderRegistry<P>>> {
84        self.registry.load()
85    }
86}