crabllm_proxy/state.rs
1use arc_swap::ArcSwap;
2use crabllm_core::{Extension, GatewayConfig, Provider, Storage};
3use crabllm_provider::ProviderRegistry;
4use std::{
5 collections::HashMap,
6 sync::{Arc, RwLock},
7 time::SystemTime,
8};
9use tokio::sync::broadcast;
10
/// Per-request event emitted after a request completes. Embedders
/// subscribe to the [`AppState::usage_events`] broadcast channel to
/// observe live traffic without scraping the Prometheus endpoint.
///
/// `Clone` is derived because the event travels over a
/// `tokio::sync::broadcast` channel, which requires cloneable values.
#[derive(Debug, Clone)]
pub struct UsageEvent {
    /// Wall-clock timestamp attached to the event.
    // NOTE(review): whether this is the request's start or completion
    // time is decided at the emit site, which is not visible here — confirm.
    pub timestamp: SystemTime,
    /// Identifier of the request this event describes.
    pub request_id: String,
    /// Name of the key associated with the request, if any.
    // NOTE(review): presumably a value from `AppState::key_map`, with
    // `None` for requests that carried no recognized key — confirm.
    pub key_name: Option<String>,
    /// Model name recorded for the request.
    pub model: String,
    /// Provider name recorded for the request.
    pub provider: String,
    /// Logical endpoint: `"chat.completions"`, `"embeddings"`,
    /// `"images.generations"`, `"audio.speech"`, `"audio.transcriptions"`.
    pub endpoint: &'static str,
    /// Prompt / input tokens. For embeddings this is the input token
    /// count; for image / audio endpoints it's 0.
    pub tokens_in: u32,
    /// Completion / output tokens. 0 for endpoints that don't generate
    /// tokens (embeddings, images, audio).
    pub tokens_out: u32,
    /// Request duration in milliseconds. The exact start and stop
    /// points are chosen where the event is emitted.
    pub duration_ms: u64,
    /// The wire HTTP status the client observed, or `0` when a
    /// streaming chat response sent 200 OK headers and then broke
    /// mid-stream. `0` is a sentinel meaning "not a real HTTP
    /// response" — consumers branching on `status` alone can
    /// distinguish a clean 200 from a failed stream without having
    /// to inspect [`Self::error`]. For non-streaming requests
    /// `status` is always the real HTTP code the client saw.
    pub status: u16,
    /// `Some(msg)` if the request failed. Set alongside `status == 0`
    /// for mid-stream streaming failures; set alongside a real error
    /// status (4xx/5xx) for pre-stream and non-streaming failures.
    pub error: Option<String>,
}
44
45/// Shared application state passed to all handlers.
46///
47/// Generic over the storage backend `S` and the provider type `P`. The
48/// binary picks `P` by defining a workspace-level union enum that wraps
49/// every provider source it links — that enum implements `Provider` via
50/// match-and-delegate, so dispatch through `P` is fully monomorphized.
51pub struct AppState<S: Storage, P: Provider> {
52 pub registry: Arc<ArcSwap<ProviderRegistry<P>>>,
53 pub config: GatewayConfig,
54 pub extensions: Arc<Vec<Box<dyn Extension>>>,
55 pub storage: Arc<S>,
56 /// Precomputed token → key name lookup for O(1) auth.
57 /// Wrapped in RwLock to support runtime key management.
58 pub key_map: Arc<RwLock<HashMap<String, String>>>,
59 /// Optional broadcast sink for per-request [`UsageEvent`]s. `None`
60 /// is a no-op — the standalone `crabllm serve` binary leaves it
61 /// unset and behavior is unchanged. Embedders that want live
62 /// traffic construct a sender, pass `Some(sender.clone())`, and
63 /// call `sender.subscribe()` to observe events.
64 pub usage_events: Option<broadcast::Sender<UsageEvent>>,
65}
66
67impl<S: Storage, P: Provider> Clone for AppState<S, P> {
68 fn clone(&self) -> Self {
69 Self {
70 registry: self.registry.clone(),
71 config: self.config.clone(),
72 extensions: self.extensions.clone(),
73 storage: self.storage.clone(),
74 key_map: self.key_map.clone(),
75 usage_events: self.usage_events.clone(),
76 }
77 }
78}
79
80impl<S: Storage, P: Provider> AppState<S, P> {
81 /// Snapshot the current provider registry. Returns a guard that
82 /// keeps the referenced registry alive for the duration of the
83 /// request — even if a concurrent swap replaces the global pointer.
84 pub fn registry(&self) -> arc_swap::Guard<Arc<ProviderRegistry<P>>> {
85 self.registry.load()
86 }
87}