Skip to main content

oxillama_server/
state.rs

//! Shared application state for the API server.
//!
//! `AppState` carries all read/write shared data needed by route handlers:
//! - The inference request queue (mpsc sender).
//! - Cached model metadata (id, load time, default sampler, vocab, hidden size).
//! - Metrics store.
//! - In-memory batch store (legacy).
//! - Disk-backed batch store + queue sender (C3).
//! - Multi-model LRU pool (C1), protected by a `Mutex` for admin mutations.
//! - Prefix KV cache for system-prompt reuse across requests.
//! - LoRA adapter registry (name → `Arc<LoadedLora>`).
//! - Persistent thread store + run queue (Assistants API).
//! - Optional files store (Files API), Responses API store, run-event
//!   broadcast sender (SSE streaming), and per-API-key rate limiter.
13
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex, RwLock};
16use std::time::SystemTime;
17
18use tokio::sync::mpsc;
19
20use crate::batch::{new_batch_store, BatchStore};
21use crate::batch_spool::{BatchQueueSender, BatchStore as DiskBatchStore};
22use crate::files_store::FilesStore;
23use crate::metrics::Metrics;
24use crate::queue::{BatchRequest, VocabBytes};
25use crate::rate_limit::PerKeyRateLimiter;
26use crate::responses_store::ResponseStore;
27use crate::router::ModelPool;
28use crate::threads::stream::RunEventSender;
29use crate::threads::{RunQueueSender, ThreadStore};
30
31use oxillama_runtime::sampling::SamplerConfig;
32use oxillama_runtime::{LoadedLora, PrefixCacheConfig, PrefixKvCache};
33
/// Shared application state accessible by all route handlers.
///
/// All inference is delegated to the single background worker via `queue`.
/// Read-only metadata (model ID, default sampler, vocabulary, hidden size)
/// is cached here so handlers never need to reach into the engine.
///
/// Instances are built with `AppState::new` (self-contained defaults) or
/// `AppState::with_batch_pipeline` (explicit disk batch store and queue).
/// Every `Option` field starts as `None` and is populated by the matching
/// `with_*` builder method.
pub struct AppState {
    /// Channel to send inference requests to the worker.
    pub queue: mpsc::Sender<BatchRequest>,

    /// The model name/identifier for API responses.
    pub model_id: String,

    /// Unix timestamp (seconds) when the model was loaded.
    ///
    /// Read from the system clock by the constructors; `0` if the clock
    /// reports a time earlier than the Unix epoch.
    pub loaded_at: u64,

    /// Default sampler configuration read from `EngineConfig` at startup.
    ///
    /// Route handlers clone this and apply per-request overrides on top.
    pub default_sampler: SamplerConfig,

    /// Vocabulary byte table used for grammar-constrained sampling.
    ///
    /// `None` when the model has no tokenizer (should not happen at serve time).
    pub vocab_bytes: Option<VocabBytes>,

    /// Hidden-state dimension for the `/v1/embeddings` endpoint.
    pub hidden_size: usize,

    /// Shared metrics store.
    ///
    /// A fresh `Metrics` instance is created by each constructor.
    pub metrics: Arc<Metrics>,

    /// In-memory batch job registry (legacy OpenAI batch compat layer).
    pub batch_store: BatchStore,

    /// Disk-backed batch job store (C3: disk-spool backend).
    ///
    /// `AppState::new` roots this under the system temp directory;
    /// `AppState::with_batch_pipeline` takes it from the caller.
    pub batch_disk_store: Arc<DiskBatchStore>,

    /// Sender into the disk-backed batch processing queue (C3).
    ///
    /// When the state is built with `AppState::new`, the receiving half of
    /// this channel is dropped immediately, so every send fails.
    pub batch_queue_tx: BatchQueueSender,

    /// Multi-model LRU warm-pool (C1).
    ///
    /// Wrapped in `Mutex` so admin routes can mutate it without blocking the
    /// inference worker. In the current single-worker design the worker also
    /// holds the pool; admin mutations use `try_lock` to avoid deadlocks.
    ///
    /// The constructors initialise it with `ModelPool::new(4, 0)`.
    pub model_pool: Mutex<ModelPool>,

    /// Prefix KV cache for system-prompt reuse across requests.
    ///
    /// When a new request shares a long prefix with a previously-cached
    /// sequence (e.g. a fixed system prompt), the matching KV state is
    /// restored and only the suffix tokens need a fresh prefill pass.
    ///
    /// The constructors build it from `PrefixCacheConfig::default()`.
    pub prefix_cache: Arc<Mutex<PrefixKvCache>>,

    /// Loaded LoRA adapter registry: stable name → `Arc<LoadedLora>`.
    ///
    /// Populated via `POST /admin/loras`.  Request handlers look up adapters
    /// by name and pass them to the worker via `BatchRequest::Generate`.
    /// Starts out empty.
    pub loras: Arc<RwLock<HashMap<String, Arc<LoadedLora>>>>,

    /// Persistent thread/message/run store for the Assistants API.
    ///
    /// `None` when the Assistants API has not been configured (no `--threads-dir`
    /// flag was passed at startup).  Route handlers return 503 in this case.
    /// Set together with `run_queue_tx` by `with_threads`.
    pub threads_store: Option<Arc<ThreadStore>>,

    /// Sender into the run processing queue for the Assistants API.
    ///
    /// `None` when `threads_store` is `None`.
    pub run_queue_tx: Option<RunQueueSender>,

    /// Persistent files store for the Files API (`/v1/files`).
    ///
    /// `None` when the Files API has not been configured.
    /// Set by `with_files`.
    pub files_store: Option<Arc<FilesStore>>,

    /// Broadcast sender for run lifecycle events (SSE streaming).
    ///
    /// `None` when SSE streaming is not enabled.
    /// Set by `with_run_event_sender`.
    pub run_event_tx_broadcast: Option<RunEventSender>,

    /// In-memory store for Responses API objects.
    ///
    /// `None` when the Responses API has not been enabled.  Route handlers
    /// return 503 (`ModelNotReady`) in this case.
    /// Set by `with_responses_store`.
    pub responses_store: Option<Arc<ResponseStore>>,

    /// Per-API-key token-bucket rate limiter.
    ///
    /// `None` when per-key rate limiting has not been configured.
    /// Set by `with_per_key_rate_limiter`.
    pub per_key_rate_limiter: Option<Arc<PerKeyRateLimiter>>,
}
126
127impl AppState {
128    /// Create new app state from all required fields.
129    ///
130    /// `queue` must be connected to a live inference worker.
131    pub fn new(
132        queue: mpsc::Sender<BatchRequest>,
133        model_id: String,
134        default_sampler: SamplerConfig,
135        vocab_bytes: Option<VocabBytes>,
136        hidden_size: usize,
137    ) -> Self {
138        let loaded_at = SystemTime::now()
139            .duration_since(SystemTime::UNIX_EPOCH)
140            .map(|d| d.as_secs())
141            .unwrap_or(0);
142
143        // Default disk store goes to a temp-dir location when not configured.
144        let spool_dir = std::env::temp_dir().join("oxillama_batch_spool");
145        let batch_disk_store = Arc::new(DiskBatchStore::new(spool_dir).unwrap_or_else(|_| {
146            DiskBatchStore::new(std::env::temp_dir()).expect("fallback spool dir")
147        }));
148
149        // Create a no-op batch queue (capacity 0 → sends will fail gracefully).
150        let (batch_queue_tx, _) =
151            tokio::sync::mpsc::channel::<crate::batch_spool::BatchWorkItem>(1);
152
153        Self {
154            queue,
155            model_id,
156            loaded_at,
157            default_sampler,
158            vocab_bytes,
159            hidden_size,
160            metrics: Arc::new(Metrics::new()),
161            batch_store: new_batch_store(),
162            batch_disk_store,
163            batch_queue_tx,
164            model_pool: Mutex::new(ModelPool::new(4, 0)),
165            prefix_cache: Arc::new(Mutex::new(PrefixKvCache::new(PrefixCacheConfig::default()))),
166            loras: Arc::new(RwLock::new(HashMap::new())),
167            threads_store: None,
168            run_queue_tx: None,
169            files_store: None,
170            run_event_tx_broadcast: None,
171            responses_store: None,
172            per_key_rate_limiter: None,
173        }
174    }
175
176    /// Attach a threads store and run queue to this `AppState`.
177    ///
178    /// Returns `self` with the `threads_store` and `run_queue_tx` fields
179    /// populated.  Designed for use in a builder chain:
180    ///
181    /// ```text
182    /// let state = AppState::new(...).with_threads(store, tx);
183    /// ```
184    pub fn with_threads(mut self, store: Arc<ThreadStore>, tx: RunQueueSender) -> Self {
185        self.threads_store = Some(store);
186        self.run_queue_tx = Some(tx);
187        self
188    }
189
190    /// Attach a files store to this `AppState`.
191    pub fn with_files(mut self, store: Arc<FilesStore>) -> Self {
192        self.files_store = Some(store);
193        self
194    }
195
196    /// Attach a run-event broadcast sender to this `AppState`.
197    ///
198    /// When set, the run worker broadcasts lifecycle events that SSE handlers
199    /// can subscribe to.
200    pub fn with_run_event_sender(mut self, tx: RunEventSender) -> Self {
201        self.run_event_tx_broadcast = Some(tx);
202        self
203    }
204
205    /// Attach a Responses API store to this `AppState`.
206    ///
207    /// When set, the `/v1/responses` routes are fully operational.
208    pub fn with_responses_store(mut self, store: Arc<ResponseStore>) -> Self {
209        self.responses_store = Some(store);
210        self
211    }
212
213    /// Attach a per-API-key rate limiter to this `AppState`.
214    ///
215    /// When set, the `per_key_rate_limit_middleware` is applied to all routes
216    /// in `build_app_with_config`.
217    pub fn with_per_key_rate_limiter(mut self, limiter: Arc<PerKeyRateLimiter>) -> Self {
218        self.per_key_rate_limiter = Some(limiter);
219        self
220    }
221
222    /// Create app state with an explicit disk batch store and queue sender.
223    ///
224    /// Used by the server startup code to wire up the full batch pipeline.
225    pub fn with_batch_pipeline(
226        queue: mpsc::Sender<BatchRequest>,
227        model_id: String,
228        default_sampler: SamplerConfig,
229        vocab_bytes: Option<VocabBytes>,
230        hidden_size: usize,
231        batch_disk_store: Arc<DiskBatchStore>,
232        batch_queue_tx: BatchQueueSender,
233    ) -> Self {
234        let loaded_at = SystemTime::now()
235            .duration_since(SystemTime::UNIX_EPOCH)
236            .map(|d| d.as_secs())
237            .unwrap_or(0);
238
239        Self {
240            queue,
241            model_id,
242            loaded_at,
243            default_sampler,
244            vocab_bytes,
245            hidden_size,
246            metrics: Arc::new(Metrics::new()),
247            batch_store: new_batch_store(),
248            batch_disk_store,
249            batch_queue_tx,
250            model_pool: Mutex::new(ModelPool::new(4, 0)),
251            prefix_cache: Arc::new(Mutex::new(PrefixKvCache::new(PrefixCacheConfig::default()))),
252            loras: Arc::new(RwLock::new(HashMap::new())),
253            threads_store: None,
254            run_queue_tx: None,
255            files_store: None,
256            run_event_tx_broadcast: None,
257            responses_store: None,
258            per_key_rate_limiter: None,
259        }
260    }
261}