crabllm_proxy/state.rs
1use arc_swap::ArcSwap;
2use crabllm_core::{Extension, GatewayConfig, Provider, Storage};
3use crabllm_provider::ProviderRegistry;
4use std::{
5 collections::HashMap,
6 sync::{Arc, RwLock},
7 time::SystemTime,
8};
9use tokio::sync::broadcast;
10
11/// Per-request event emitted after a request completes. Embedders
12/// subscribe to the [`AppState::usage_events`] broadcast channel to
13/// observe live traffic without scraping the Prometheus endpoint.
14#[derive(Debug, Clone)]
15pub struct UsageEvent {
16 pub timestamp: SystemTime,
17 pub request_id: String,
18 pub principal: Option<String>,
19 pub model: String,
20 pub provider: String,
21 /// Logical endpoint: `"chat.completions"`, `"embeddings"`,
22 /// `"images.generations"`, `"audio.speech"`, `"audio.transcriptions"`.
23 pub endpoint: &'static str,
24 /// Full canonical usage with all axes (input, cache_read, cache_write,
25 /// output, reasoning, audio, per-call tools). Embeds the same type that
26 /// drives billing in [`crabllm_core::ModelInfo::cost`], so subscribers
27 /// see exactly what was charged for.
28 pub usage: crabllm_core::Usage,
29 pub duration_ms: u64,
30 /// The wire HTTP status the client observed, or `0` when a
31 /// streaming chat response sent 200 OK headers and then broke
32 /// mid-stream. `0` is a sentinel meaning "not a real HTTP
33 /// response" — consumers branching on `status` alone can
34 /// distinguish a clean 200 from a failed stream without having
35 /// to inspect [`Self::error`]. For non-streaming requests
36 /// `status` is always the real HTTP code the client saw.
37 pub status: u16,
38 /// `Some(msg)` if the request failed. Set alongside `status == 0`
39 /// for mid-stream streaming failures; set alongside a real error
40 /// status (4xx/5xx) for pre-stream and non-streaming failures.
41 pub error: Option<String>,
42}
43
44/// Shared application state passed to all handlers.
45///
46/// Generic over the storage backend `S` and the provider type `P`. The
47/// binary picks `P` by defining a workspace-level union enum that wraps
48/// every provider source it links — that enum implements `Provider` via
49/// match-and-delegate, so dispatch through `P` is fully monomorphized.
50pub struct AppState<S: Storage, P: Provider> {
51 pub registry: Arc<ArcSwap<ProviderRegistry<P>>>,
52 pub config: GatewayConfig,
53 pub extensions: Arc<Vec<Box<dyn Extension>>>,
54 pub storage: Arc<S>,
55 /// Precomputed token → key name lookup for O(1) auth.
56 /// Wrapped in RwLock to support runtime key management.
57 pub key_map: Arc<RwLock<HashMap<String, String>>>,
58 /// Optional broadcast sink for per-request [`UsageEvent`]s. `None`
59 /// is a no-op — the standalone `crabllm serve` binary leaves it
60 /// unset and behavior is unchanged. Embedders that want live
61 /// traffic construct a sender, pass `Some(sender.clone())`, and
62 /// call `sender.subscribe()` to observe events.
63 pub usage_events: Option<broadcast::Sender<UsageEvent>>,
64}
65
66impl<S: Storage, P: Provider> Clone for AppState<S, P> {
67 fn clone(&self) -> Self {
68 Self {
69 registry: self.registry.clone(),
70 config: self.config.clone(),
71 extensions: self.extensions.clone(),
72 storage: self.storage.clone(),
73 key_map: self.key_map.clone(),
74 usage_events: self.usage_events.clone(),
75 }
76 }
77}
78
79impl<S: Storage, P: Provider> AppState<S, P> {
80 /// Snapshot the current provider registry. Returns a guard that
81 /// keeps the referenced registry alive for the duration of the
82 /// request — even if a concurrent swap replaces the global pointer.
83 pub fn registry(&self) -> arc_swap::Guard<Arc<ProviderRegistry<P>>> {
84 self.registry.load()
85 }
86}