oxillama_server/state.rs
1//! Shared application state for the API server.
2//!
3//! `AppState` carries all read/write shared data needed by route handlers:
4//! - The inference request queue (mpsc sender).
5//! - Cached model metadata (id, sampler, vocab, hidden size).
6//! - Metrics store.
7//! - In-memory batch store (legacy).
8//! - Disk-backed batch store + queue sender (C3).
9//! - Multi-model LRU pool (C1), protected by a `Mutex` for admin mutations.
10//! - Prefix KV cache for system-prompt reuse across requests.
11//! - LoRA adapter registry (name → `Arc<LoadedLora>`).
12//! - Persistent thread store + run queue (Assistants API).
13
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex, RwLock};
16use std::time::SystemTime;
17
18use tokio::sync::mpsc;
19
20use crate::batch::{new_batch_store, BatchStore};
21use crate::batch_spool::{BatchQueueSender, BatchStore as DiskBatchStore};
22use crate::files_store::FilesStore;
23use crate::metrics::Metrics;
24use crate::queue::{BatchRequest, VocabBytes};
25use crate::rate_limit::PerKeyRateLimiter;
26use crate::responses_store::ResponseStore;
27use crate::router::ModelPool;
28use crate::threads::stream::RunEventSender;
29use crate::threads::{RunQueueSender, ThreadStore};
30
31use oxillama_runtime::sampling::SamplerConfig;
32use oxillama_runtime::{LoadedLora, PrefixCacheConfig, PrefixKvCache};
33
/// Shared application state accessible by all route handlers.
///
/// All inference is delegated to the single background worker via `queue`.
/// Read-only metadata (model ID, default sampler, vocabulary, hidden size)
/// is cached here so handlers never need to reach into the engine.
///
/// Both constructors (`new`, `with_batch_pipeline`) leave the optional
/// subsystems (threads/run queue, files, run-event broadcast, responses,
/// per-key rate limiter) as `None`; they are attached afterwards with the
/// `with_*` builder methods.
///
/// Note: struct fields are dropped in declaration order, so reordering fields
/// that own channel halves changes shutdown ordering.
pub struct AppState {
    /// Channel to send inference requests to the worker.
    pub queue: mpsc::Sender<BatchRequest>,

    /// The model name/identifier for API responses.
    pub model_id: String,

    /// Unix timestamp (seconds) captured when this `AppState` was constructed,
    /// reported as the model's load time.
    ///
    /// Falls back to `0` if the system clock reads earlier than the Unix epoch.
    pub loaded_at: u64,

    /// Default sampler configuration read from `EngineConfig` at startup.
    ///
    /// Route handlers clone this and apply per-request overrides on top.
    pub default_sampler: SamplerConfig,

    /// Vocabulary byte table used for grammar-constrained sampling.
    ///
    /// `None` when the model has no tokenizer (should not happen at serve time).
    pub vocab_bytes: Option<VocabBytes>,

    /// Hidden-state dimension for the `/v1/embeddings` endpoint.
    pub hidden_size: usize,

    /// Shared metrics store.
    pub metrics: Arc<Metrics>,

    /// In-memory batch job registry (legacy OpenAI batch compat layer).
    pub batch_store: BatchStore,

    /// Disk-backed batch job store (C3: disk-spool backend).
    pub batch_disk_store: Arc<DiskBatchStore>,

    /// Sender into the disk-backed batch processing queue (C3).
    ///
    /// When the state is built with `AppState::new`, this sender has no live
    /// receiver, so every send on it fails; `with_batch_pipeline` wires a real
    /// queue.
    pub batch_queue_tx: BatchQueueSender,

    /// Multi-model LRU warm-pool (C1).
    ///
    /// Both constructors create it as `ModelPool::new(4, 0)`.
    ///
    /// Wrapped in a `std::sync::Mutex` so admin routes can mutate it.
    /// NOTE(review): `Mutex::lock` blocks; avoiding contention with the
    /// inference worker relies on admin mutations using `try_lock` as the
    /// original design intends — confirm at the call sites.
    pub model_pool: Mutex<ModelPool>,

    /// Prefix KV cache for system-prompt reuse across requests.
    ///
    /// When a new request shares a long prefix with a previously-cached
    /// sequence (e.g. a fixed system prompt), the matching KV state is
    /// restored and only the suffix tokens need a fresh prefill pass.
    /// Constructed with `PrefixCacheConfig::default()`.
    pub prefix_cache: Arc<Mutex<PrefixKvCache>>,

    /// Loaded LoRA adapter registry: stable name → `Arc<LoadedLora>`.
    ///
    /// Populated via `POST /admin/loras`. Request handlers look up adapters
    /// by name and pass them to the worker via `BatchRequest::Generate`.
    /// Starts empty.
    pub loras: Arc<RwLock<HashMap<String, Arc<LoadedLora>>>>,

    /// Persistent thread/message/run store for the Assistants API.
    ///
    /// `None` when the Assistants API has not been configured (no `--threads-dir`
    /// flag was passed at startup). Route handlers return 503 in this case.
    /// Set together with `run_queue_tx` by `with_threads`.
    pub threads_store: Option<Arc<ThreadStore>>,

    /// Sender into the run processing queue for the Assistants API.
    ///
    /// `None` when `threads_store` is `None` (both are set by `with_threads`).
    pub run_queue_tx: Option<RunQueueSender>,

    /// Persistent files store for the Files API (`/v1/files`).
    ///
    /// `None` when the Files API has not been configured.
    pub files_store: Option<Arc<FilesStore>>,

    /// Broadcast sender for run lifecycle events (SSE streaming).
    ///
    /// `None` when SSE streaming is not enabled.
    pub run_event_tx_broadcast: Option<RunEventSender>,

    /// In-memory store for Responses API objects.
    ///
    /// `None` when the Responses API has not been enabled. Route handlers
    /// return 503 (`ModelNotReady`) in this case.
    pub responses_store: Option<Arc<ResponseStore>>,

    /// Per-API-key token-bucket rate limiter.
    ///
    /// `None` when per-key rate limiting has not been configured.
    pub per_key_rate_limiter: Option<Arc<PerKeyRateLimiter>>,
}
126
127impl AppState {
128 /// Create new app state from all required fields.
129 ///
130 /// `queue` must be connected to a live inference worker.
131 pub fn new(
132 queue: mpsc::Sender<BatchRequest>,
133 model_id: String,
134 default_sampler: SamplerConfig,
135 vocab_bytes: Option<VocabBytes>,
136 hidden_size: usize,
137 ) -> Self {
138 let loaded_at = SystemTime::now()
139 .duration_since(SystemTime::UNIX_EPOCH)
140 .map(|d| d.as_secs())
141 .unwrap_or(0);
142
143 // Default disk store goes to a temp-dir location when not configured.
144 let spool_dir = std::env::temp_dir().join("oxillama_batch_spool");
145 let batch_disk_store = Arc::new(DiskBatchStore::new(spool_dir).unwrap_or_else(|_| {
146 DiskBatchStore::new(std::env::temp_dir()).expect("fallback spool dir")
147 }));
148
149 // Create a no-op batch queue (capacity 0 → sends will fail gracefully).
150 let (batch_queue_tx, _) =
151 tokio::sync::mpsc::channel::<crate::batch_spool::BatchWorkItem>(1);
152
153 Self {
154 queue,
155 model_id,
156 loaded_at,
157 default_sampler,
158 vocab_bytes,
159 hidden_size,
160 metrics: Arc::new(Metrics::new()),
161 batch_store: new_batch_store(),
162 batch_disk_store,
163 batch_queue_tx,
164 model_pool: Mutex::new(ModelPool::new(4, 0)),
165 prefix_cache: Arc::new(Mutex::new(PrefixKvCache::new(PrefixCacheConfig::default()))),
166 loras: Arc::new(RwLock::new(HashMap::new())),
167 threads_store: None,
168 run_queue_tx: None,
169 files_store: None,
170 run_event_tx_broadcast: None,
171 responses_store: None,
172 per_key_rate_limiter: None,
173 }
174 }
175
176 /// Attach a threads store and run queue to this `AppState`.
177 ///
178 /// Returns `self` with the `threads_store` and `run_queue_tx` fields
179 /// populated. Designed for use in a builder chain:
180 ///
181 /// ```text
182 /// let state = AppState::new(...).with_threads(store, tx);
183 /// ```
184 pub fn with_threads(mut self, store: Arc<ThreadStore>, tx: RunQueueSender) -> Self {
185 self.threads_store = Some(store);
186 self.run_queue_tx = Some(tx);
187 self
188 }
189
190 /// Attach a files store to this `AppState`.
191 pub fn with_files(mut self, store: Arc<FilesStore>) -> Self {
192 self.files_store = Some(store);
193 self
194 }
195
196 /// Attach a run-event broadcast sender to this `AppState`.
197 ///
198 /// When set, the run worker broadcasts lifecycle events that SSE handlers
199 /// can subscribe to.
200 pub fn with_run_event_sender(mut self, tx: RunEventSender) -> Self {
201 self.run_event_tx_broadcast = Some(tx);
202 self
203 }
204
205 /// Attach a Responses API store to this `AppState`.
206 ///
207 /// When set, the `/v1/responses` routes are fully operational.
208 pub fn with_responses_store(mut self, store: Arc<ResponseStore>) -> Self {
209 self.responses_store = Some(store);
210 self
211 }
212
213 /// Attach a per-API-key rate limiter to this `AppState`.
214 ///
215 /// When set, the `per_key_rate_limit_middleware` is applied to all routes
216 /// in `build_app_with_config`.
217 pub fn with_per_key_rate_limiter(mut self, limiter: Arc<PerKeyRateLimiter>) -> Self {
218 self.per_key_rate_limiter = Some(limiter);
219 self
220 }
221
222 /// Create app state with an explicit disk batch store and queue sender.
223 ///
224 /// Used by the server startup code to wire up the full batch pipeline.
225 pub fn with_batch_pipeline(
226 queue: mpsc::Sender<BatchRequest>,
227 model_id: String,
228 default_sampler: SamplerConfig,
229 vocab_bytes: Option<VocabBytes>,
230 hidden_size: usize,
231 batch_disk_store: Arc<DiskBatchStore>,
232 batch_queue_tx: BatchQueueSender,
233 ) -> Self {
234 let loaded_at = SystemTime::now()
235 .duration_since(SystemTime::UNIX_EPOCH)
236 .map(|d| d.as_secs())
237 .unwrap_or(0);
238
239 Self {
240 queue,
241 model_id,
242 loaded_at,
243 default_sampler,
244 vocab_bytes,
245 hidden_size,
246 metrics: Arc::new(Metrics::new()),
247 batch_store: new_batch_store(),
248 batch_disk_store,
249 batch_queue_tx,
250 model_pool: Mutex::new(ModelPool::new(4, 0)),
251 prefix_cache: Arc::new(Mutex::new(PrefixKvCache::new(PrefixCacheConfig::default()))),
252 loras: Arc::new(RwLock::new(HashMap::new())),
253 threads_store: None,
254 run_queue_tx: None,
255 files_store: None,
256 run_event_tx_broadcast: None,
257 responses_store: None,
258 per_key_rate_limiter: None,
259 }
260 }
261}