// oxillama-server 0.1.3
//
// OpenAI-compatible HTTP API server for OxiLLaMa.
// Documentation
//! Shared application state for the API server.
//!
//! `AppState` carries all read/write shared data needed by route handlers:
//! - The inference request queue (mpsc sender).
//! - Cached model metadata (id, load timestamp, default sampler, vocab bytes, hidden size).
//! - Metrics store.
//! - In-memory batch store (legacy).
//! - Disk-backed batch store + queue sender (C3).
//! - Multi-model LRU pool (C1), protected by a `Mutex` for admin mutations.
//! - Prefix KV cache for system-prompt reuse across requests.
//! - LoRA adapter registry (name → `Arc<LoadedLora>`).
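//! - Persistent thread store + run queue (Assistants API).
//! - Optional subsystems: Files API store, run-event broadcast sender (SSE),
//!   Responses API store, and per-API-key rate limiter.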

use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::SystemTime;

use tokio::sync::mpsc;

use crate::batch::{new_batch_store, BatchStore};
use crate::batch_spool::{BatchQueueSender, BatchStore as DiskBatchStore};
use crate::files_store::FilesStore;
use crate::metrics::Metrics;
use crate::queue::{BatchRequest, VocabBytes};
use crate::rate_limit::PerKeyRateLimiter;
use crate::responses_store::ResponseStore;
use crate::router::ModelPool;
use crate::threads::stream::RunEventSender;
use crate::threads::{RunQueueSender, ThreadStore};

use oxillama_runtime::sampling::SamplerConfig;
use oxillama_runtime::{LoadedLora, PrefixCacheConfig, PrefixKvCache};

/// Shared application state accessible by all route handlers.
///
/// All inference is delegated to the single background worker via `queue`.
/// Read-only metadata (model ID, default sampler, vocabulary, hidden size)
/// is cached here so handlers never need to reach into the engine.
///
/// Construct with [`AppState::new`] (placeholder batch pipeline) or
/// [`AppState::with_batch_pipeline`] (explicit batch store + queue sender), then
/// enable optional subsystems with the `with_*` builder methods.
///
/// NOTE(review): struct fields are dropped in declaration order; keep that in
/// mind before reordering fields that hold channel senders or locks.
pub struct AppState {
    /// Channel to send inference requests to the worker (bounded tokio mpsc sender).
    pub queue: mpsc::Sender<BatchRequest>,

    /// The model name/identifier for API responses.
    pub model_id: String,

    /// Unix timestamp (seconds) when this state was constructed, used as the
    /// model load time. It is 0 if the system clock reports a time before the
    /// Unix epoch.
    pub loaded_at: u64,

    /// Default sampler configuration read from `EngineConfig` at startup.
    ///
    /// Route handlers clone this and apply per-request overrides on top.
    pub default_sampler: SamplerConfig,

    /// Vocabulary byte table used for grammar-constrained sampling.
    ///
    /// `None` when the model has no tokenizer (should not happen at serve time).
    pub vocab_bytes: Option<VocabBytes>,

    /// Hidden-state dimension for the `/v1/embeddings` endpoint.
    pub hidden_size: usize,

    /// Shared metrics store. Both constructors start it empty via `Metrics::new()`.
    pub metrics: Arc<Metrics>,

    /// In-memory batch job registry (legacy OpenAI batch compat layer).
    pub batch_store: BatchStore,

    /// Disk-backed batch job store (C3: disk-spool backend).
    ///
    /// `AppState::new` roots it at `<temp_dir>/oxillama_batch_spool`. If that
    /// fails, it falls back to the system temp directory itself.
    /// `AppState::with_batch_pipeline` takes it from the caller.
    pub batch_disk_store: Arc<DiskBatchStore>,

    /// Sender into the disk-backed batch processing queue (C3).
    ///
    /// In states built by `AppState::new`, the matching receiver is dropped at
    /// construction, so every send on this sender returns an error.
    /// `AppState::with_batch_pipeline` stores the sender the caller passes in.
    pub batch_queue_tx: BatchQueueSender,

    /// Multi-model LRU warm-pool (C1).
    ///
    /// Wrapped in `Mutex` so admin routes can mutate it without blocking the
    /// inference worker. In the current single-worker design the worker also
    /// holds the pool; admin mutations use `try_lock` to avoid deadlocks.
    ///
    /// NOTE(review): the `try_lock` usage is not visible in this file; confirm it
    /// in the admin routes. This is a `std::sync::Mutex`, so its guard must not
    /// be held across an `.await`.
    pub model_pool: Mutex<ModelPool>,

    /// Prefix KV cache for system-prompt reuse across requests.
    ///
    /// When a new request shares a long prefix with a previously-cached
    /// sequence (e.g. a fixed system prompt), the matching KV state is
    /// restored and only the suffix tokens need a fresh prefill pass.
    /// Both constructors create it with `PrefixCacheConfig::default()`.
    /// It is held in an `Arc` so the handle can be cloned and shared outside
    /// `AppState`.
    pub prefix_cache: Arc<Mutex<PrefixKvCache>>,

    /// Loaded LoRA adapter registry: stable name → `Arc<LoadedLora>`.
    ///
    /// Populated via `POST /admin/loras`.  Request handlers look up adapters
    /// by name and pass them to the worker via `BatchRequest::Generate`.
    /// Both constructors start with an empty map.
    pub loras: Arc<RwLock<HashMap<String, Arc<LoadedLora>>>>,

    /// Persistent thread/message/run store for the Assistants API.
    ///
    /// `None` when the Assistants API has not been configured (no `--threads-dir`
    /// flag was passed at startup).  Route handlers return 503 in this case.
    /// Set via `AppState::with_threads`.
    pub threads_store: Option<Arc<ThreadStore>>,

    /// Sender into the run processing queue for the Assistants API.
    ///
    /// `None` when `threads_store` is `None`. `AppState::with_threads` sets both
    /// fields together.
    pub run_queue_tx: Option<RunQueueSender>,

    /// Persistent files store for the Files API (`/v1/files`).
    ///
    /// `None` when the Files API has not been configured. Set via
    /// `AppState::with_files`.
    pub files_store: Option<Arc<FilesStore>>,

    /// Broadcast sender for run lifecycle events (SSE streaming).
    ///
    /// `None` when SSE streaming is not enabled. Set via
    /// `AppState::with_run_event_sender`.
    pub run_event_tx_broadcast: Option<RunEventSender>,

    /// In-memory store for Responses API objects.
    ///
    /// `None` when the Responses API has not been enabled.  Route handlers
    /// return 503 (`ModelNotReady`) in this case. Set via
    /// `AppState::with_responses_store`.
    pub responses_store: Option<Arc<ResponseStore>>,

    /// Per-API-key token-bucket rate limiter.
    ///
    /// `None` when per-key rate limiting has not been configured. Set via
    /// `AppState::with_per_key_rate_limiter`.
    pub per_key_rate_limiter: Option<Arc<PerKeyRateLimiter>>,
}

impl AppState {
    /// Create new app state from all required fields.
    ///
    /// `queue` must be connected to a live inference worker.
    pub fn new(
        queue: mpsc::Sender<BatchRequest>,
        model_id: String,
        default_sampler: SamplerConfig,
        vocab_bytes: Option<VocabBytes>,
        hidden_size: usize,
    ) -> Self {
        let loaded_at = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        // Default disk store goes to a temp-dir location when not configured.
        let spool_dir = std::env::temp_dir().join("oxillama_batch_spool");
        let batch_disk_store = Arc::new(DiskBatchStore::new(spool_dir).unwrap_or_else(|_| {
            DiskBatchStore::new(std::env::temp_dir()).expect("fallback spool dir")
        }));

        // Create a no-op batch queue (capacity 0 → sends will fail gracefully).
        let (batch_queue_tx, _) =
            tokio::sync::mpsc::channel::<crate::batch_spool::BatchWorkItem>(1);

        Self {
            queue,
            model_id,
            loaded_at,
            default_sampler,
            vocab_bytes,
            hidden_size,
            metrics: Arc::new(Metrics::new()),
            batch_store: new_batch_store(),
            batch_disk_store,
            batch_queue_tx,
            model_pool: Mutex::new(ModelPool::new(4, 0)),
            prefix_cache: Arc::new(Mutex::new(PrefixKvCache::new(PrefixCacheConfig::default()))),
            loras: Arc::new(RwLock::new(HashMap::new())),
            threads_store: None,
            run_queue_tx: None,
            files_store: None,
            run_event_tx_broadcast: None,
            responses_store: None,
            per_key_rate_limiter: None,
        }
    }

    /// Attach a threads store and run queue to this `AppState`.
    ///
    /// Returns `self` with the `threads_store` and `run_queue_tx` fields
    /// populated.  Designed for use in a builder chain:
    ///
    /// ```text
    /// let state = AppState::new(...).with_threads(store, tx);
    /// ```
    pub fn with_threads(mut self, store: Arc<ThreadStore>, tx: RunQueueSender) -> Self {
        self.threads_store = Some(store);
        self.run_queue_tx = Some(tx);
        self
    }

    /// Attach a files store to this `AppState`.
    pub fn with_files(mut self, store: Arc<FilesStore>) -> Self {
        self.files_store = Some(store);
        self
    }

    /// Attach a run-event broadcast sender to this `AppState`.
    ///
    /// When set, the run worker broadcasts lifecycle events that SSE handlers
    /// can subscribe to.
    pub fn with_run_event_sender(mut self, tx: RunEventSender) -> Self {
        self.run_event_tx_broadcast = Some(tx);
        self
    }

    /// Attach a Responses API store to this `AppState`.
    ///
    /// When set, the `/v1/responses` routes are fully operational.
    pub fn with_responses_store(mut self, store: Arc<ResponseStore>) -> Self {
        self.responses_store = Some(store);
        self
    }

    /// Attach a per-API-key rate limiter to this `AppState`.
    ///
    /// When set, the `per_key_rate_limit_middleware` is applied to all routes
    /// in `build_app_with_config`.
    pub fn with_per_key_rate_limiter(mut self, limiter: Arc<PerKeyRateLimiter>) -> Self {
        self.per_key_rate_limiter = Some(limiter);
        self
    }

    /// Create app state with an explicit disk batch store and queue sender.
    ///
    /// Used by the server startup code to wire up the full batch pipeline.
    pub fn with_batch_pipeline(
        queue: mpsc::Sender<BatchRequest>,
        model_id: String,
        default_sampler: SamplerConfig,
        vocab_bytes: Option<VocabBytes>,
        hidden_size: usize,
        batch_disk_store: Arc<DiskBatchStore>,
        batch_queue_tx: BatchQueueSender,
    ) -> Self {
        let loaded_at = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        Self {
            queue,
            model_id,
            loaded_at,
            default_sampler,
            vocab_bytes,
            hidden_size,
            metrics: Arc::new(Metrics::new()),
            batch_store: new_batch_store(),
            batch_disk_store,
            batch_queue_tx,
            model_pool: Mutex::new(ModelPool::new(4, 0)),
            prefix_cache: Arc::new(Mutex::new(PrefixKvCache::new(PrefixCacheConfig::default()))),
            loras: Arc::new(RwLock::new(HashMap::new())),
            threads_store: None,
            run_queue_tx: None,
            files_store: None,
            run_event_tx_broadcast: None,
            responses_store: None,
            per_key_rate_limiter: None,
        }
    }
}