Skip to main content

quiver_server/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The Quiver daemon: gRPC and REST over the embeddable [`Database`], with
3//! API-key auth and secure-by-default configuration.
4//!
5//! Both transports are thin shells over the same shared engine operations; the
6//! engine is synchronous and CPU/`fsync`-bound, so every call is offloaded with
7//! `spawn_blocking`. Access is guarded by a reader–writer lock (ADR-0057): reads
8//! take the shared lock and run **concurrently**, writes take the exclusive lock,
9//! and the single-writer model is unchanged (ADR-0002/0006). A read that finds a
10//! collection's index stale (a prior write deferred its rebuild) serves the prior
11//! snapshot and schedules an **off-lock** rebuild (ADR-0062): the index is rebuilt
12//! under the shared lock and swapped in under a brief write lock, so a rebuild never
13//! stalls concurrent readers.
14//!
15//! Auth is by scoped API key (Bearer / gRPC `authorization` metadata) with
16//! default-deny RBAC: each key carries a role (read ⊆ write ⊆ admin) and a
17//! collection scope, enforced on every operation at the shared op layer
18//! (ADR-0011/0013, the `auth` module). Encryption-at-rest is on by default
19//! (ADR-0010): unless `insecure` is set, an `encryption_key` is required and the
20//! engine is opened through `quiver-crypto`'s AEAD codec; payloads may also be
21//! client-side-encrypted (ADR-0012). TLS-in-transit uses `rustls` over the
22//! audited `ring` provider — REST via `axum-server`, gRPC via tonic's `tls-ring`
23//! — and a non-loopback bind requires it; setting a client CA additionally
24//! requires mutual TLS. Mutating and administrative operations, and every
25//! access-control denial, are recorded to an append-only audit log (ADR-0011,
26//! the `audit` module) when `audit_log` is set. Per-request cost limits bound
27//! the work any single authenticated request can demand (ADR-0040, the
28//! [`Limits`] type), and an opt-in per-key token-bucket rate limiter bounds the
29//! request *rate* (ADR-0049, the [`RateLimiter`] type); per-tenant engine
30//! partitioning is a later phase. Design: `docs/api/rest-grpc.md`.
31
32mod audit;
33mod auth;
34mod error;
35mod grpc;
36mod metrics;
37mod otlp;
38mod rate_limit;
39mod replication;
40mod rest;
41
42use std::collections::{HashMap, HashSet};
43use std::future::Future;
44use std::net::{IpAddr, Ipv4Addr, SocketAddr};
45use std::path::PathBuf;
46use std::pin::Pin;
47use std::sync::{Arc, Mutex, RwLock};
48
49use axum_server::tls_rustls::RustlsConfig;
50use figment::Figment;
51use figment::providers::{Env, Format, Serialized, Toml};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::net::TcpListener;
55use tokio::sync::broadcast;
56use tokio_stream::wrappers::TcpListenerStream;
57use tonic::transport::{Certificate, Identity, ServerTlsConfig};
58
59use quiver_crypto::AeadCodec;
60use quiver_embed::{
61    Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
62    SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
63};
64use quiver_query::Filter;
65
66pub use auth::{Action, ApiKey, CollectionScope};
67pub use error::Error;
68pub use otlp::OtlpConfig;
69// The embedding/rerank seam lives in its own lean crate (ADR-0058) so the MCP
70// server can share it; re-exported here so the server's public API is unchanged.
71pub use quiver_providers::{
72    EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
73    RerankProvider,
74};
75pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
76
77use audit::{AuditLog, Outcome};
78use auth::Principal;
79
80/// Per-request cost limits (ADR-0040). Each cap bounds the work a single
81/// authenticated request can demand, so one oversized request cannot exhaust the
82/// node under the single-writer model (ADR-0006). Over-limit requests are
83/// **rejected** with HTTP 400 / gRPC `InvalidArgument` rather than silently
84/// clamped — a truncated `k` or `ef_search` would return surprising, lower-quality
85/// results with no signal. Defaults are generous; raise a cap with a `[limits]`
86/// table in `quiver.toml` or the matching `QUIVER_MAX_*` environment variable.
87#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
88#[serde(default)]
89pub struct Limits {
90    /// Maximum `k` (top-k) for `search` / `search_multi_vector`.
91    pub max_k: usize,
92    /// Maximum search beam width (`ef_search`).
93    pub max_ef_search: usize,
94    /// Maximum `fetch` page size.
95    pub max_fetch_limit: usize,
96    /// Maximum vector dimension: the declared collection dimension at creation
97    /// and the length of any query vector (per token, for multi-vector).
98    pub max_vector_dim: usize,
99    /// Maximum serialized-JSON payload size per point, in bytes.
100    pub max_payload_bytes: usize,
101    /// Maximum number of points / documents in a single upsert request.
102    pub max_batch_size: usize,
103    /// Maximum HTTP request body size, in bytes (enforced by the REST layer via
104    /// axum's `DefaultBodyLimit`; gRPC is bounded by tonic's decode limit).
105    pub max_request_body_bytes: usize,
106    /// Maximum number of non-zero terms in a hybrid-search sparse query (ADR-0043),
107    /// bounding the posting-list scan.
108    pub max_sparse_terms: usize,
109    /// Maximum number of points in a single bulk upsert (`POST …/points:bulk`,
110    /// ADR-0045). Larger than `max_batch_size` because the bulk path defers index
111    /// maintenance to one rebuild; the request is still bounded by
112    /// `max_request_body_bytes`, so raise that too for very large bulk loads.
113    pub max_bulk_batch_size: usize,
114}
115
116impl Default for Limits {
117    fn default() -> Self {
118        Self {
119            max_k: 10_000,
120            max_ef_search: 4_096,
121            max_fetch_limit: 10_000,
122            max_vector_dim: 8_192,
123            max_payload_bytes: 65_536,
124            max_batch_size: 1_000,
125            max_request_body_bytes: 32 * 1024 * 1024,
126            max_sparse_terms: 4_096,
127            max_bulk_batch_size: 50_000,
128        }
129    }
130}
131
132impl Limits {
133    // Apply `QUIVER_MAX_*` overrides after figment extraction (ADR-0013 env
134    // layer). The flat env keys do not nest under figment's `limits` table, so
135    // they are read explicitly here; a malformed value is a hard config error.
136    fn apply_env_overrides(&mut self) -> Result<(), Error> {
137        let slots: [(&str, &mut usize); 9] = [
138            ("QUIVER_MAX_K", &mut self.max_k),
139            ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
140            ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
141            ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
142            ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
143            ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
144            (
145                "QUIVER_MAX_REQUEST_BODY_BYTES",
146                &mut self.max_request_body_bytes,
147            ),
148            ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
149            ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
150        ];
151        for (key, slot) in slots {
152            if let Ok(raw) = std::env::var(key) {
153                *slot = raw.parse().map_err(|_| {
154                    Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
155                })?;
156            }
157        }
158        Ok(())
159    }
160
161    // Reject a zero cap (a `0` limit would refuse every request).
162    fn validate(&self) -> Result<(), Error> {
163        let named = [
164            ("max_k", self.max_k),
165            ("max_ef_search", self.max_ef_search),
166            ("max_fetch_limit", self.max_fetch_limit),
167            ("max_vector_dim", self.max_vector_dim),
168            ("max_payload_bytes", self.max_payload_bytes),
169            ("max_batch_size", self.max_batch_size),
170            ("max_request_body_bytes", self.max_request_body_bytes),
171            ("max_sparse_terms", self.max_sparse_terms),
172            ("max_bulk_batch_size", self.max_bulk_batch_size),
173        ];
174        if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
175            return Err(Error::Config(format!(
176                "limits.{name} must be greater than zero"
177            )));
178        }
179        Ok(())
180    }
181
182    fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
183        if k > self.max_k {
184            return Err(Error::BadRequest(format!(
185                "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
186                self.max_k
187            )));
188        }
189        if ef_search > self.max_ef_search {
190            return Err(Error::BadRequest(format!(
191                "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
192                self.max_ef_search
193            )));
194        }
195        Ok(())
196    }
197
198    fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
199        if n > self.max_sparse_terms {
200            return Err(Error::BadRequest(format!(
201                "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
202                self.max_sparse_terms
203            )));
204        }
205        Ok(())
206    }
207
208    fn check_fetch(&self, limit: usize) -> Result<(), Error> {
209        if limit > self.max_fetch_limit {
210            return Err(Error::BadRequest(format!(
211                "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
212                self.max_fetch_limit
213            )));
214        }
215        Ok(())
216    }
217
218    fn check_dim(&self, dim: usize) -> Result<(), Error> {
219        if dim > self.max_vector_dim {
220            return Err(Error::BadRequest(format!(
221                "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
222                self.max_vector_dim
223            )));
224        }
225        Ok(())
226    }
227
228    fn check_vector_len(&self, len: usize) -> Result<(), Error> {
229        if len > self.max_vector_dim {
230            return Err(Error::BadRequest(format!(
231                "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
232                self.max_vector_dim
233            )));
234        }
235        Ok(())
236    }
237
238    fn check_batch(&self, n: usize) -> Result<(), Error> {
239        if n > self.max_batch_size {
240            return Err(Error::BadRequest(format!(
241                "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
242                self.max_batch_size
243            )));
244        }
245        Ok(())
246    }
247
248    fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
249        if n > self.max_bulk_batch_size {
250            return Err(Error::BadRequest(format!(
251                "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
252                self.max_bulk_batch_size
253            )));
254        }
255        Ok(())
256    }
257
258    fn check_payload(&self, payload: &Value) -> Result<(), Error> {
259        let size = serde_json::to_vec(payload)
260            .map(|v| v.len())
261            .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
262        if size > self.max_payload_bytes {
263            return Err(Error::BadRequest(format!(
264                "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
265                self.max_payload_bytes
266            )));
267        }
268        Ok(())
269    }
270}
271
272/// Server configuration, layered defaults → `quiver.toml` → `QUIVER_*` env and
273/// validated at startup (ADR-0013).
274#[derive(Debug, Clone, Serialize, Deserialize)]
275#[serde(default)]
276pub struct Config {
277    /// Data directory for the storage engine.
278    pub data_dir: PathBuf,
279    /// REST (HTTP/1.1) bind address.
280    pub rest_addr: SocketAddr,
281    /// gRPC (HTTP/2) bind address.
282    pub grpc_addr: SocketAddr,
283    /// Accepted API keys with their RBAC scopes (ADR-0011). A bare secret —
284    /// from a comma-separated `QUIVER_API_KEYS` (the env form) or a plain TOML
285    /// array entry — is an all-collections admin key; a structured
286    /// `{secret, role, collections}` entry pins a narrower scope. Empty is
287    /// allowed only with `insecure = true`.
288    #[serde(default, deserialize_with = "auth::de_api_keys")]
289    pub api_keys: Vec<ApiKey>,
290    /// Hex-encoded 256-bit **master key** for encryption-at-rest (64 hex
291    /// characters). It wraps per-collection data-encryption keys (ADR-0010).
292    /// Required unless `insecure = true` or [`master_key_file`] is set; source it
293    /// from the environment or a secret store, never the committed config. `None`
294    /// ⇒ data is stored unencrypted (only valid in `insecure` mode).
295    ///
296    /// [`master_key_file`]: Config::master_key_file
297    pub encryption_key: Option<String>,
298    /// Path to a file holding the hex master key, as an alternative to
299    /// [`encryption_key`] (set exactly one). Lets the key arrive as a mounted
300    /// secret (Docker/Kubernetes) or a KMS-decrypted file rather than an
301    /// environment variable. It should be mode `0600`; a group/world-accessible
302    /// file is warned about at startup.
303    ///
304    /// [`encryption_key`]: Config::encryption_key
305    pub master_key_file: Option<PathBuf>,
306    /// Path to the PEM-encoded TLS certificate chain. Must be set together with
307    /// `tls_key`. Required for a non-loopback bind unless `insecure = true`.
308    pub tls_cert: Option<PathBuf>,
309    /// Path to the PEM-encoded TLS private key. Must be set together with
310    /// `tls_cert`.
311    pub tls_key: Option<PathBuf>,
312    /// Path to a PEM-encoded CA certificate that signs accepted client
313    /// certificates. When set, both transports require **mutual TLS**: a client
314    /// must present a certificate chaining to this CA to connect (ADR-0011).
315    /// Requires `tls_cert`/`tls_key`; bearer API keys still carry the RBAC scope.
316    pub tls_client_ca: Option<PathBuf>,
317    /// Path to an append-only audit log file (ADR-0011). When set, every
318    /// mutating and administrative operation and every access-control denial is
319    /// appended as one JSON object per line (JSON Lines); records are always
320    /// also emitted as `tracing` events. Unset ⇒ tracing only. Secrets are never
321    /// written — see `docs/security/audit.md`.
322    pub audit_log: Option<PathBuf>,
323    /// Run as a **read-replica follower** (ADR-0030): connect to a leader's gRPC
324    /// endpoint at this URL (e.g. `http://leader:6334`) and continuously apply its
325    /// committed operations, serving reads. A follower **refuses writes**. Unset ⇒
326    /// this node is a normal read-write leader. (Plaintext `http://` for now; TLS
327    /// to the leader is a follow-up — run replication over a trusted network.)
328    pub leader_url: Option<String>,
329    /// API key the follower presents to the leader's admin-scoped `Replicate`
330    /// stream (used with `leader_url`). Source it like any secret.
331    pub leader_api_key: Option<String>,
332    /// Opt out of the secure defaults (no auth, no encryption-at-rest, allow a
333    /// non-loopback bind without TLS). For local development only; never the
334    /// default.
335    pub insecure: bool,
336    /// Per-request cost limits (ADR-0040). Set with a `[limits]` table in
337    /// `quiver.toml` or the `QUIVER_MAX_*` environment variables.
338    pub limits: Limits,
339    /// Opt-in server-side embedding providers, keyed by collection name (ADR-0047).
340    /// Configured with `[embedding.<collection>]` tables in `quiver.toml`; default
341    /// empty, so the engine stays model-agnostic and library mode is unaffected.
342    /// API keys are referenced by env-var *name* and resolved at startup, never
343    /// stored. Enables `search_text` / `upsert_text` for the named collections.
344    #[serde(default)]
345    pub embedding: HashMap<String, EmbeddingConfig>,
346    /// Opt-in server-side rerank providers, keyed by collection name (ADR-0047).
347    /// Configured with `[rerank.<collection>]` tables; enables the one-call
348    /// retrieve→rerank stage of `search_text`.
349    #[serde(default)]
350    pub rerank: HashMap<String, RerankConfig>,
351    /// Opt-in per-key rate limiting (ADR-0049). Set a `[rate_limit]` table in
352    /// `quiver.toml` or the `QUIVER_RATE_LIMIT_*` env vars;
353    /// `requests_per_second = 0` (the default) disables it.
354    #[serde(default)]
355    pub rate_limit: RateLimitConfig,
356    /// Opt-in OpenTelemetry traces export (ADR-0059). Set an `[otlp]` table or the
357    /// `QUIVER_OTLP_*` env vars; an empty `endpoint` (the default) disables it.
358    /// Export additionally requires the server's `otlp` build feature.
359    #[serde(default)]
360    pub otlp: OtlpConfig,
361    /// Lock-free MVCC reads (ADR-0064), **experimental and default-off**. Serves
362    /// reads of single-vector, in-memory collections from an `arc-swap` snapshot so
363    /// pure-vector reads never block on a concurrent writer. Also settable via
364    /// `QUIVER_MVCC_READS`. See `docs/adr/0064-mvcc-reads-implementation.md`.
365    #[serde(default)]
366    pub mvcc_reads: bool,
367}
368
369impl Default for Config {
370    fn default() -> Self {
371        Self {
372            data_dir: PathBuf::from("./quiver-data"),
373            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
374            grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
375            api_keys: Vec::new(),
376            encryption_key: None,
377            master_key_file: None,
378            tls_cert: None,
379            tls_key: None,
380            tls_client_ca: None,
381            audit_log: None,
382            leader_url: None,
383            leader_api_key: None,
384            insecure: false,
385            limits: Limits::default(),
386            embedding: HashMap::new(),
387            rerank: HashMap::new(),
388            rate_limit: RateLimitConfig::default(),
389            otlp: OtlpConfig::default(),
390            mvcc_reads: false,
391        }
392    }
393}
394
395impl Config {
396    /// Load configuration from defaults, an optional `quiver.toml`, and
397    /// `QUIVER_*` environment variables.
398    pub fn load() -> Result<Self, Error> {
399        let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
400            .merge(Toml::file("quiver.toml"))
401            .merge(Env::prefixed("QUIVER_"))
402            .extract()
403            .map_err(|e| Error::Config(e.to_string()))?;
404        // The flat `QUIVER_MAX_*` env keys do not nest under the `limits` table
405        // figment builds, so apply them explicitly (ADR-0040).
406        config.limits.apply_env_overrides()?;
407        // Same for the flat `QUIVER_RATE_LIMIT_*` keys (ADR-0049).
408        config
409            .rate_limit
410            .apply_env_overrides()
411            .map_err(Error::Config)?;
412        // Same for the flat `QUIVER_OTLP_*` keys (ADR-0059).
413        config.otlp.apply_env_overrides().map_err(Error::Config)?;
414        Ok(config)
415    }
416
417    /// Reject insecure configurations unless explicitly opted out (ADR-0013):
418    /// no anonymous access, encryption-at-rest on by default with a valid key,
419    /// and no non-loopback bind without TLS.
420    pub fn validate(&self) -> Result<(), Error> {
421        if self.api_keys.is_empty() && !self.insecure {
422            return Err(Error::Config(
423                "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
424                 set insecure=true for local development"
425                    .to_owned(),
426            ));
427        }
428        // Resolve the master key from the env var or a key file (exactly one).
429        let master_key = self.master_key_hex()?;
430        if master_key.is_none() && !self.insecure {
431            return Err(Error::Config(
432                "no encryption key configured: encryption-at-rest is on by default — \
433                 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
434                 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
435                 store data unencrypted (development only)"
436                    .to_owned(),
437            ));
438        }
439        // Fail fast on a malformed key rather than at first write.
440        if let Some(key) = &master_key {
441            AeadCodec::from_hex(key)
442                .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
443        }
444        // TLS certificate and key are set together or not at all.
445        if self.tls_cert.is_some() != self.tls_key.is_some() {
446            return Err(Error::Config(
447                "tls_cert and tls_key must be set together".to_owned(),
448            ));
449        }
450        // mTLS layers on top of server TLS: a client CA needs a server cert/key.
451        if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
452            return Err(Error::Config(
453                "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
454            ));
455        }
456        let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
457        let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
458        if non_loopback && !tls_enabled && !self.insecure {
459            return Err(Error::Config(
460                "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
461                 or insecure=true for local development"
462                    .to_owned(),
463            ));
464        }
465        // Reject a nonsensical cost limit (a `0` cap would refuse every request).
466        self.limits.validate()?;
467        Ok(())
468    }
469
470    /// The effective hex master key: from [`master_key_file`] when set (read and
471    /// trimmed), otherwise [`encryption_key`]. `None` means no key is configured
472    /// (only valid with `insecure`).
473    ///
474    /// # Errors
475    /// [`Error::Config`] if both sources are set, or the key file cannot be read.
476    ///
477    /// [`master_key_file`]: Config::master_key_file
478    /// [`encryption_key`]: Config::encryption_key
479    pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
480        let env_key = self
481            .encryption_key
482            .as_deref()
483            .map(str::trim)
484            .filter(|k| !k.is_empty());
485        match (&self.master_key_file, env_key) {
486            (Some(_), Some(_)) => Err(Error::Config(
487                "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
488                 (QUIVER_MASTER_KEY_FILE), not both"
489                    .to_owned(),
490            )),
491            (Some(path), None) => {
492                warn_if_world_readable(path);
493                let hex = std::fs::read_to_string(path).map_err(|e| {
494                    Error::Config(format!("reading master_key_file {}: {e}", path.display()))
495                })?;
496                Ok(Some(hex.trim().to_owned()))
497            }
498            (None, Some(key)) => Ok(Some(key.to_owned())),
499            (None, None) => Ok(None),
500        }
501    }
502}
503
504// Warn (don't fail — Docker/Kubernetes secrets often mount group/world-readable)
505// when a master-key file is more permissive than `0600`.
506#[cfg(unix)]
507fn warn_if_world_readable(path: &std::path::Path) {
508    use std::os::unix::fs::PermissionsExt;
509    if let Ok(meta) = std::fs::metadata(path)
510        && meta.permissions().mode() & 0o077 != 0
511    {
512        tracing::warn!(
513            path = %path.display(),
514            mode = format!("{:o}", meta.permissions().mode() & 0o777),
515            "master key file is group/world-accessible; restrict it to 0600"
516        );
517    }
518}
519
520#[cfg(not(unix))]
521fn warn_if_world_readable(_path: &std::path::Path) {}
522
523/// Shared server state: the engine behind a single-writer lock, the accepted
524/// API keys with their RBAC scopes, and the audit log.
525#[derive(Clone)]
526pub(crate) struct AppState {
527    // The engine behind a reader–writer lock (ADR-0057): reads take the shared
528    // lock and run concurrently; writes take the exclusive lock. A read that finds
529    // a collection's index stale upgrades to the write lock once to rebuild it,
530    // then serves concurrently again.
531    db: Arc<RwLock<Database>>,
532    keys: Arc<Vec<ApiKey>>,
533    audit: Arc<AuditLog>,
534    // Fan-out of every committed op to replication followers (ADR-0030). The
535    // commit observer publishes here; each `Replicate` stream subscribes.
536    replication_tx: broadcast::Sender<WalEntry>,
537    // True on a replication follower: external writes are refused; the engine's
538    // state is owned by the stream it applies from the leader (ADR-0030).
539    read_only: bool,
540    // Per-request cost limits, enforced at this op layer so both transports are
541    // covered by one implementation (ADR-0040).
542    limits: Limits,
543    // Opt-in, provider-agnostic server-side embedding/rerank providers, keyed by
544    // collection (ADR-0047). Empty on the common path; `search_text`/`upsert_text`
545    // require a configured embedder for the target collection.
546    embed: Arc<EmbedRegistry>,
547    // Opt-in per-key token-bucket rate limiter (ADR-0049). A no-op when disabled.
548    rate_limiter: Arc<RateLimiter>,
549    // Prometheus metrics registry (ADR-0014/0054), scraped at `GET /metrics`.
550    metrics: Arc<metrics::Metrics>,
551    // Collections with an off-lock index rebuild in flight (ADR-0062). A search that
552    // observes a deferred rebuild schedules one here, single-flighted so concurrent
553    // readers never kick off duplicate builds for the same collection.
554    rebuilding: Arc<Mutex<HashSet<String>>>,
555    // Lock-free MVCC read cache (ADR-0064 increment 3): cached `arc-swap` snapshot
556    // cells for MVCC-served collections. A pure-vector search loads the cell and
557    // searches it **without taking the database lock**, so it never blocks on a
558    // concurrent writer. Populated under the read lock the first time a collection
559    // is seen fresh; the map itself changes only on create/drop, never on a data
560    // write. Empty unless `QUIVER_MVCC_READS` is on.
561    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
562    // Snapshot of the engine's `QUIVER_MVCC_READS` flag, read once at startup. When
563    // on, the writer drives off-lock consolidation rebuilds (ADR-0064 increment 3)
564    // because pure-vector fast-path reads bypass the reader-driven scheduler. Off by
565    // default, so the non-MVCC path is byte-identical.
566    mvcc: bool,
567}
568
569/// A collection's metadata.
570pub(crate) struct CollectionInfo {
571    pub name: String,
572    pub dim: u32,
573    pub metric: DistanceMetric,
574    pub count: u64,
575    pub index: IndexSpec,
576    pub filterable: Vec<FilterableField>,
577    pub multivector: bool,
578    pub vector_encryption: VectorEncryption,
579}
580
581/// A point to upsert.
582pub(crate) struct PointIn {
583    pub id: String,
584    pub vector: Vec<f32>,
585    pub payload: Value,
586}
587
588/// A text point to embed server-side and upsert (ADR-0047).
589pub(crate) struct TextPointIn {
590    pub id: String,
591    pub text: String,
592    pub payload: Value,
593}
594
595/// Default candidate pool size a rerank stage over-fetches before reordering to
596/// the requested `k` (ADR-0047).
597const RERANK_CANDIDATES: usize = 50;
598
599/// The document text a rerank stage scores: the original text stored under
600/// [`TEXT_KEY`] by `upsert_text`, else the whole payload as a string so the
601/// reranker still has something to compare.
602fn doc_text(payload: Option<&Value>) -> String {
603    match payload {
604        Some(Value::Object(map)) => map
605            .get(TEXT_KEY)
606            .and_then(Value::as_str)
607            .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
608        Some(v) => v.to_string(),
609        None => String::new(),
610    }
611}
612
613/// A fetched point.
614pub(crate) struct PointOut {
615    pub id: String,
616    pub vector: Option<Vec<f32>>,
617    pub payload: Value,
618}
619
620/// A search hit.
621pub(crate) struct MatchOut {
622    pub id: String,
623    pub score: f32,
624    pub payload: Option<Value>,
625    pub vector: Option<Vec<f32>>,
626}
627
628/// A multi-vector document to upsert.
629pub(crate) struct DocumentIn {
630    pub id: String,
631    pub vectors: Vec<Vec<f32>>,
632    pub payload: Value,
633}
634
635/// A multi-vector document hit (MaxSim).
636pub(crate) struct DocumentMatchOut {
637    pub id: String,
638    pub score: f32,
639    pub payload: Option<Value>,
640    pub vectors: Option<Vec<Vec<f32>>>,
641}
642
643impl AppState {
644    /// Authenticate a presented bearer token to its [`Principal`], or `None`
645    /// (a 401). An empty key set means `insecure` mode (validated at startup),
646    /// which admits any caller as an all-collections admin.
647    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
648        auth::authenticate(&self.keys, presented)
649    }
650
651    /// Consume one rate-limit token for `actor` (ADR-0049). A no-op `Allowed` when
652    /// rate limiting is disabled. Both transports call this at their auth choke
653    /// point so the limiter is enforced by one implementation.
654    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
655        self.rate_limiter.check(actor)
656    }
657
658    /// Whether the per-key rate limiter is active (lets a transport skip the work
659    /// entirely on the common, disabled path).
660    pub(crate) fn rate_limit_enabled(&self) -> bool {
661        self.rate_limiter.enabled()
662    }
663
664    // Run a **mutating** engine op behind the exclusive write lock, off the async
665    // runtime (the engine is synchronous and CPU/IO-bound). The single writer is
666    // unchanged from the prior single-mutex model (ADR-0006/0057).
667    async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
668    where
669        T: Send + 'static,
670        F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
671    {
672        let db = Arc::clone(&self.db);
673        tokio::task::spawn_blocking(move || -> Result<T, Error> {
674            let mut guard = db
675                .write()
676                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
677            f(&mut guard).map_err(Error::Engine)
678        })
679        .await
680        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
681    }
682
683    // Run a **read-only** engine op behind the shared read lock — many of these run
684    // concurrently (ADR-0057). The closure gets `&Database`, so it can only call the
685    // `&self` reads (`*_snapshot`, `fetch`, accessors).
686    async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
687    where
688        T: Send + 'static,
689        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
690    {
691        let db = Arc::clone(&self.db);
692        tokio::task::spawn_blocking(move || -> Result<T, Error> {
693            let guard = db
694                .read()
695                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
696            f(&guard).map_err(Error::Engine)
697        })
698        .await
699        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
700    }
701
    // Run the snapshot read `f` for `collection` on the blocking pool (ADR-0057/0062).
    // Only the shared read lock is taken, so concurrent searches run in parallel.
    // When a prior write deferred this collection's rebuild, the snapshot read serves
    // the **prior** snapshot (snapshot-isolated, slightly stale) rather than blocking.
    // After the read we schedule an **off-lock** rebuild so the next read is fresh; the
    // reader never waits on a rebuild under the exclusive lock.
    //
    // Errors: a poisoned lock or a failed blocking task maps to `Error::Internal`; an
    // error from `f` maps to `Error::Engine`. In the error case the `?` returns before
    // the staleness probe, so no rebuild is scheduled.
    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
    where
        T: Send + 'static,
        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
    {
        let db = Arc::clone(&self.db);
        let cells = Arc::clone(&self.snapshot_cells);
        // `coll` moves into the blocking task; `collection` stays here for the
        // post-await rebuild scheduling.
        let coll = collection.clone();
        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
            let guard = db
                .read()
                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
            let result = f(&guard).map_err(Error::Engine)?;
            // Report staleness so the caller can schedule a background rebuild; a
            // missing collection (raced a drop) is simply "nothing to rebuild".
            // Probed under the same read guard that served `f`.
            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
            // Warm the lock-free read cache (ADR-0064 increment 3): once an
            // MVCC-served collection is fresh (its base snapshot is published), cache
            // its cell so subsequent pure-vector reads skip the lock. The cell
            // self-updates as the writer republishes, so it is never re-fetched.
            // This runs while the db read guard is still held. `or_insert` never
            // replaces an existing entry. A poisoned cache lock just skips warming
            // (best effort).
            if !stale
                && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
                && let Ok(mut map) = cells.write()
            {
                map.entry(coll.clone()).or_insert(cell);
            }
            Ok((result, stale))
        })
        .await
        // First `?` (after this map_err) unwraps the JoinError layer; the second
        // propagates the task's own `Result`.
        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
        if stale {
            self.schedule_rebuild(collection);
        }
        Ok(result)
    }
743
744    // The cached lock-free snapshot cell for `collection`, if one has been warmed
745    // (ADR-0064 increment 3). The map read contends only with create/drop, never
746    // with a data write — that is what makes the pure-vector read lock-free.
747    fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
748        self.snapshot_cells.read().ok()?.get(collection).cloned()
749    }
750
751    // After an MVCC-mode data write, schedule an off-lock consolidation rebuild if
752    // the write left the overlay stale (ADR-0064 increment 3): pure-vector fast-path
753    // reads bypass the reader-driven scheduler, so the writer drives consolidation.
754    // A brief read-lock staleness check; the rebuild itself runs off-lock and
755    // single-flighted. No-op (and not called) when MVCC is off.
756    async fn schedule_if_stale(&self, collection: &str) {
757        let db = Arc::clone(&self.db);
758        let coll = collection.to_owned();
759        let stale = tokio::task::spawn_blocking(move || {
760            db.read()
761                .ok()
762                .and_then(|g| g.needs_rebuild(&coll).ok())
763                .unwrap_or(false)
764        })
765        .await
766        .unwrap_or(false);
767        if stale {
768            self.schedule_rebuild(collection.to_owned());
769        }
770    }
771
772    // Schedule a single off-lock rebuild for `collection` (ADR-0062), deduplicated
773    // so concurrent readers that all observe the same deferred rebuild start exactly
774    // one. A no-op if one is already in flight.
775    fn schedule_rebuild(&self, collection: String) {
776        {
777            let mut inflight = match self.rebuilding.lock() {
778                Ok(g) => g,
779                Err(_) => return,
780            };
781            if !inflight.insert(collection.clone()) {
782                return; // already rebuilding this collection
783            }
784        }
785        let state = self.clone();
786        tokio::spawn(async move {
787            state.run_rebuild(&collection).await;
788            if let Ok(mut inflight) = state.rebuilding.lock() {
789                inflight.remove(&collection);
790            }
791        });
792    }
793
    // Drive a collection's off-lock rebuild to completion (ADR-0062) in three phases.
    // Each phase runs on the blocking pool so CPU-bound work never stalls the async
    // runtime:
    //   1. capture the rebuild inputs under the shared read lock;
    //   2. build with no lock held;
    //   3. install the result under a brief exclusive write lock.
    // If the commit reports the collection still stale (a write landed during the
    // build), loop and rebuild again. Any failure ends the attempt silently: poisoned
    // lock, engine error, failed blocking task, or no inputs. The collection then
    // stays stale and the next read reschedules it.
    async fn run_rebuild(&self, collection: &str) {
        loop {
            // Phase 1 (read lock). `inputs` is `None` on any error and also when the
            // engine returns `Ok(None)` (presumably nothing to rebuild; confirm
            // against `snapshot_rebuild_inputs`).
            let db = Arc::clone(&self.db);
            let coll = collection.to_owned();
            let inputs = tokio::task::spawn_blocking(move || {
                let guard = db.read().ok()?;
                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
            })
            .await
            .ok()
            .flatten();
            let Some(inputs) = inputs else { return };

            // Phase 2 (no lock). Both the join and the build itself must succeed.
            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
                return;
            };

            // Phase 3 (write lock). `Some(true)` means the commit saw newer writes and
            // the collection is still stale; anything else (fresh or failure) stops.
            let db = Arc::clone(&self.db);
            let still_stale = tokio::task::spawn_blocking(move || {
                let mut guard = db.write().ok()?;
                guard.commit_rebuild(rebuilt).ok()
            })
            .await
            .ok()
            .flatten();
            match still_stale {
                Some(true) => continue, // a write landed during the build — rebuild again
                _ => return,
            }
        }
    }
831
832    // Authorize `action` on `resource`, recording a denial in the audit log
833    // before propagating it. The shared choke point for both transports.
834    fn authorize(
835        &self,
836        principal: &Principal,
837        action: Action,
838        op: &str,
839        resource: &str,
840    ) -> Result<(), Error> {
841        principal
842            .require(action, Some(resource))
843            .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
844    }
845
846    // Authorize a collection-agnostic operation (listing): only the role is
847    // checked; a denial is recorded against the `*` resource.
848    fn authorize_global(
849        &self,
850        principal: &Principal,
851        action: Action,
852        op: &str,
853    ) -> Result<(), Error> {
854        principal
855            .require(action, None)
856            .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
857    }
858
    /// Open a replication stream (ADR-0030): authorize (admin), then — in a single
    /// engine critical section so no commit can interleave — subscribe to the live
    /// commit tail and snapshot current state. The caller streams the snapshot,
    /// then the tail from the receiver; because the subscription is taken under the
    /// same lock as the snapshot, every post-snapshot op arrives on the receiver
    /// and none is missed or duplicated.
    ///
    /// The critical section is the shared read lock (`read_blocking`), which
    /// excludes every `write_blocking` writer for its duration. That exclusion
    /// presumes commits are broadcast from within write-locked engine operations;
    /// confirm against the sender side of `replication_tx`.
    ///
    /// NOTE(review): a `tokio::sync::broadcast` receiver that falls further behind
    /// than the channel capacity gets `RecvError::Lagged`, and the skipped entries
    /// are not redelivered. The "none is missed" guarantee assumes the caller
    /// handles that case (e.g. by resyncing).
    pub(crate) async fn open_replication(
        &self,
        principal: &Principal,
    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
        self.authorize_global(principal, Action::Admin, "replicate")?;
        let tx = self.replication_tx.clone();
        self.read_blocking(move |db| {
            // Order matters: subscribe first, then snapshot, both under the guard.
            let rx = tx.subscribe();
            let snapshot = db.replication_snapshot()?;
            Ok((snapshot, rx))
        })
        .await
    }
878
879    /// Apply a replicated op received from the leader (ADR-0030). Internal to the
880    /// follower stream — deliberately NOT gated by `read_only`, which only refuses
881    /// *external* client writes.
882    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
883        self.write_blocking(move |db| db.apply_replicated(op)).await
884    }
885
886    // Refuse a mutating operation on a read-only replication follower (ADR-0030);
887    // its state is owned by the leader's stream, not by external clients.
888    fn ensure_writable(&self, op: &str) -> Result<(), Error> {
889        if self.read_only {
890            return Err(Error::Forbidden(format!(
891                "{op}: this node is a read-only replication follower"
892            )));
893        }
894        Ok(())
895    }
896
897    #[allow(clippy::too_many_arguments)]
898    pub(crate) async fn create_collection(
899        &self,
900        principal: &Principal,
901        name: String,
902        dim: u32,
903        metric: DistanceMetric,
904        index: IndexSpec,
905        filterable: Vec<FilterableField>,
906        multivector: bool,
907        vector_encryption: VectorEncryption,
908    ) -> Result<CollectionInfo, Error> {
909        self.ensure_writable("create_collection")?;
910        self.authorize(principal, Action::Admin, "create_collection", &name)?;
911        self.limits.check_dim(dim as usize)?;
912        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
913            .with_index(index)
914            .with_filterable(filterable.clone())
915            .with_multivector(multivector)
916            .with_vector_encryption(vector_encryption);
917        let owned = name.clone();
918        let result = self
919            .write_blocking(move |db| db.create_collection(&owned, descriptor))
920            .await;
921        self.audit.record(
922            principal.actor(),
923            "create_collection",
924            &name,
925            Outcome::of(&result),
926        );
927        result?;
928        Ok(CollectionInfo {
929            name,
930            dim,
931            metric,
932            count: 0,
933            index,
934            filterable,
935            multivector,
936            vector_encryption,
937        })
938    }
939
940    pub(crate) async fn get_collection(
941        &self,
942        principal: &Principal,
943        name: String,
944    ) -> Result<CollectionInfo, Error> {
945        self.authorize(principal, Action::Read, "get_collection", &name)?;
946        self.read_blocking(move |db| {
947            let descriptor = db
948                .descriptor(&name)
949                .cloned()
950                .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
951            // A multi-vector collection reports its document count, not its
952            // (much larger) token-row count.
953            let count = if descriptor.multivector {
954                db.document_count(&name)? as u64
955            } else {
956                db.len(&name)? as u64
957            };
958            Ok(CollectionInfo {
959                name,
960                dim: descriptor.dim,
961                metric: descriptor.metric,
962                count,
963                index: descriptor.index,
964                filterable: descriptor.filterable,
965                multivector: descriptor.multivector,
966                vector_encryption: descriptor.vector_encryption,
967            })
968        })
969        .await
970    }
971
972    pub(crate) async fn list_collections(
973        &self,
974        principal: &Principal,
975    ) -> Result<Vec<CollectionInfo>, Error> {
976        self.authorize_global(principal, Action::Read, "list_collections")?;
977        let mut infos = self
978            .read_blocking(|db| {
979                let mut out = Vec::new();
980                for name in db.collection_names() {
981                    if let Some(descriptor) = db.descriptor(&name).cloned() {
982                        let count = if descriptor.multivector {
983                            db.document_count(&name)? as u64
984                        } else {
985                            db.len(&name)? as u64
986                        };
987                        out.push(CollectionInfo {
988                            name,
989                            dim: descriptor.dim,
990                            metric: descriptor.metric,
991                            count,
992                            index: descriptor.index,
993                            filterable: descriptor.filterable,
994                            multivector: descriptor.multivector,
995                            vector_encryption: descriptor.vector_encryption,
996                        });
997                    }
998                }
999                Ok(out)
1000            })
1001            .await?;
1002        // Never reveal collections outside the caller's scope.
1003        infos.retain(|info| principal.can_see(&info.name));
1004        Ok(infos)
1005    }
1006
1007    pub(crate) async fn delete_collection(
1008        &self,
1009        principal: &Principal,
1010        name: String,
1011    ) -> Result<bool, Error> {
1012        self.ensure_writable("delete_collection")?;
1013        self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1014        let resource = name.clone();
1015        let result = self
1016            .write_blocking(move |db| db.drop_collection(&name))
1017            .await;
1018        self.audit.record(
1019            principal.actor(),
1020            "delete_collection",
1021            &resource,
1022            Outcome::of(&result),
1023        );
1024        // Evict the dropped collection's cached lock-free cell (ADR-0064 increment 3)
1025        // so a later same-named collection never serves a stale cell.
1026        if matches!(result, Ok(true))
1027            && let Ok(mut map) = self.snapshot_cells.write()
1028        {
1029            map.remove(&resource);
1030        }
1031        result
1032    }
1033
1034    #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1035    pub(crate) async fn upsert(
1036        &self,
1037        principal: &Principal,
1038        collection: String,
1039        points: Vec<PointIn>,
1040    ) -> Result<u64, Error> {
1041        self.ensure_writable("upsert")?;
1042        self.authorize(principal, Action::Write, "upsert", &collection)?;
1043        self.limits.check_batch(points.len())?;
1044        for p in &points {
1045            self.limits.check_vector_len(p.vector.len())?;
1046            self.limits.check_payload(&p.payload)?;
1047        }
1048        let resource = collection.clone();
1049        let result = self
1050            .write_blocking(move |db| {
1051                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1052                    .iter()
1053                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1054                    .collect();
1055                db.upsert_batch(&collection, &records)
1056            })
1057            .await;
1058        self.audit
1059            .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1060        if self.mvcc && result.is_ok() {
1061            self.schedule_if_stale(&resource).await;
1062        }
1063        result
1064    }
1065
1066    // Bulk upsert for a load-then-query workload (ADR-0045): one WAL fsync plus a
1067    // single deferred index-build pass, with the larger `max_bulk_batch_size` cap.
1068    pub(crate) async fn upsert_bulk(
1069        &self,
1070        principal: &Principal,
1071        collection: String,
1072        points: Vec<PointIn>,
1073    ) -> Result<u64, Error> {
1074        self.ensure_writable("upsert")?;
1075        self.authorize(principal, Action::Write, "upsert", &collection)?;
1076        self.limits.check_bulk_batch(points.len())?;
1077        for p in &points {
1078            self.limits.check_vector_len(p.vector.len())?;
1079            self.limits.check_payload(&p.payload)?;
1080        }
1081        let resource = collection.clone();
1082        let result = self
1083            .write_blocking(move |db| {
1084                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1085                    .iter()
1086                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1087                    .collect();
1088                db.upsert_bulk(&collection, &records)
1089            })
1090            .await;
1091        self.audit.record(
1092            principal.actor(),
1093            "upsert_bulk",
1094            &resource,
1095            Outcome::of(&result),
1096        );
1097        if self.mvcc && result.is_ok() {
1098            self.schedule_if_stale(&resource).await;
1099        }
1100        result
1101    }
1102
1103    /// Take a consistent online snapshot of the whole database into a
1104    /// server-local `destination` directory (ADR-0050). A global admin
1105    /// operation; runs the checkpoint + copy on the blocking pool.
1106    #[tracing::instrument(skip_all)]
1107    pub(crate) async fn snapshot(
1108        &self,
1109        principal: &Principal,
1110        destination: String,
1111    ) -> Result<SnapshotInfo, Error> {
1112        self.ensure_writable("snapshot")?;
1113        self.authorize_global(principal, Action::Admin, "snapshot")?;
1114        let dest = std::path::PathBuf::from(&destination);
1115        let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1116        self.audit.record(
1117            principal.actor(),
1118            "snapshot",
1119            &destination,
1120            Outcome::of(&result),
1121        );
1122        result
1123    }
1124
1125    pub(crate) async fn delete_points(
1126        &self,
1127        principal: &Principal,
1128        collection: String,
1129        ids: Vec<String>,
1130    ) -> Result<u64, Error> {
1131        self.ensure_writable("delete_points")?;
1132        self.authorize(principal, Action::Write, "delete_points", &collection)?;
1133        let resource = collection.clone();
1134        let result = self
1135            .write_blocking(move |db| {
1136                let mut count = 0u64;
1137                for id in &ids {
1138                    if db.delete(&collection, id)? {
1139                        count += 1;
1140                    }
1141                }
1142                Ok(count)
1143            })
1144            .await;
1145        self.audit.record(
1146            principal.actor(),
1147            "delete_points",
1148            &resource,
1149            Outcome::of(&result),
1150        );
1151        if self.mvcc && result.is_ok() {
1152            self.schedule_if_stale(&resource).await;
1153        }
1154        result
1155    }
1156
1157    pub(crate) async fn get_points(
1158        &self,
1159        principal: &Principal,
1160        collection: String,
1161        ids: Vec<String>,
1162        with_vector: bool,
1163    ) -> Result<Vec<PointOut>, Error> {
1164        self.authorize(principal, Action::Read, "get_points", &collection)?;
1165        self.read_blocking(move |db| {
1166            let mut out = Vec::new();
1167            for id in &ids {
1168                if let Some(m) = db.get(&collection, id)? {
1169                    out.push(PointOut {
1170                        id: m.id,
1171                        vector: if with_vector { m.vector } else { None },
1172                        payload: m.payload.unwrap_or(Value::Null),
1173                    });
1174                }
1175            }
1176            Ok(out)
1177        })
1178        .await
1179    }
1180
1181    #[allow(clippy::too_many_arguments)]
1182    #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1183    pub(crate) async fn search(
1184        &self,
1185        principal: &Principal,
1186        collection: String,
1187        vector: Vec<f32>,
1188        k: usize,
1189        filter: Option<Filter>,
1190        ef_search: usize,
1191        with_payload: bool,
1192        with_vector: bool,
1193    ) -> Result<Vec<MatchOut>, Error> {
1194        self.authorize(principal, Action::Read, "search", &collection)?;
1195        self.limits.check_search(k, ef_search)?;
1196        self.limits.check_vector_len(vector.len())?;
1197
1198        // Lock-free fast path (ADR-0064 increment 3): a **pure-vector** read of an
1199        // MVCC-served collection loads the cached snapshot cell and searches it with
1200        // **no database lock**, so it never blocks on a concurrent writer. Payload,
1201        // filter, and vector results need the store (behind the lock) and fall
1202        // through to `search_blocking`.
1203        if filter.is_none()
1204            && !with_payload
1205            && !with_vector
1206            && let Some(cell) = self.cached_cell(&collection)
1207        {
1208            return tokio::task::spawn_blocking(move || {
1209                let matches = cell.load().search(&vector, k, ef_search)?;
1210                Ok::<_, quiver_embed::Error>(
1211                    matches
1212                        .into_iter()
1213                        .map(|m| MatchOut {
1214                            id: m.id,
1215                            score: m.score,
1216                            payload: None,
1217                            vector: None,
1218                        })
1219                        .collect(),
1220                )
1221            })
1222            .await
1223            .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1224            .map_err(Error::Engine);
1225        }
1226
1227        let params = SearchParams {
1228            k,
1229            filter,
1230            ef_search,
1231            with_payload,
1232            with_vector,
1233        };
1234        let coll = collection.clone();
1235        self.search_blocking(coll, move |db| {
1236            let matches = db.search_snapshot(&collection, &vector, &params)?;
1237            Ok(matches
1238                .into_iter()
1239                .map(|m| MatchOut {
1240                    id: m.id,
1241                    score: m.score,
1242                    payload: m.payload,
1243                    vector: m.vector,
1244                })
1245                .collect())
1246        })
1247        .await
1248    }
1249
1250    #[allow(clippy::too_many_arguments)]
1251    pub(crate) async fn hybrid_search(
1252        &self,
1253        principal: &Principal,
1254        collection: String,
1255        dense: Option<Vec<f32>>,
1256        sparse: Option<(Vec<u32>, Vec<f32>)>,
1257        text: Option<String>,
1258        k: usize,
1259        filter: Option<Filter>,
1260        ef_search: usize,
1261        rrf_k0: f32,
1262        with_payload: bool,
1263        with_vector: bool,
1264    ) -> Result<Vec<MatchOut>, Error> {
1265        self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1266        self.limits.check_search(k, ef_search)?;
1267        if let Some(v) = &dense {
1268            self.limits.check_vector_len(v.len())?;
1269        }
1270        if let Some((indices, values)) = &sparse {
1271            self.limits.check_sparse_terms(indices.len())?;
1272            if indices.len() != values.len() {
1273                return Err(Error::BadRequest(format!(
1274                    "sparse query indices ({}) and values ({}) length mismatch",
1275                    indices.len(),
1276                    values.len()
1277                )));
1278            }
1279        }
1280        let params = SearchParams {
1281            k,
1282            filter,
1283            ef_search,
1284            with_payload,
1285            with_vector,
1286        };
1287        let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1288        let coll = collection.clone();
1289        self.search_blocking(coll, move |db| {
1290            let matches = db.hybrid_search_snapshot(
1291                &collection,
1292                dense.as_deref(),
1293                sv.as_ref(),
1294                text.as_deref(),
1295                &params,
1296                rrf_k0,
1297            )?;
1298            Ok(matches
1299                .into_iter()
1300                .map(|m| MatchOut {
1301                    id: m.id,
1302                    score: m.score,
1303                    payload: m.payload,
1304                    vector: m.vector,
1305                })
1306                .collect())
1307        })
1308        .await
1309    }
1310
1311    /// Embed `text` with the collection's configured provider and run a dense (or
1312    /// dense ⊕ BM25, if the collection has text) search, optionally reranking the
1313    /// candidates in one call (ADR-0047). The text is also passed to the BM25 side,
1314    /// so a `upsert_text` corpus yields hybrid lexical+semantic retrieval for free.
1315    #[allow(clippy::too_many_arguments)]
1316    pub(crate) async fn search_text(
1317        &self,
1318        principal: &Principal,
1319        collection: String,
1320        text: String,
1321        k: usize,
1322        filter: Option<Filter>,
1323        ef_search: usize,
1324        rrf_k0: f32,
1325        with_payload: bool,
1326        with_vector: bool,
1327        rerank: bool,
1328    ) -> Result<Vec<MatchOut>, Error> {
1329        self.authorize(principal, Action::Read, "search_text", &collection)?;
1330        self.limits.check_search(k, ef_search)?;
1331        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1332            Error::BadRequest(format!(
1333                "collection {collection:?} has no embedding provider configured \
1334                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1335            ))
1336        })?;
1337        // Embed off the async runtime: the provider call is blocking network I/O.
1338        let query = text.clone();
1339        let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1340            .await
1341            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1342            .map_err(|e| Error::Upstream(e.to_string()))?
1343            .into_iter()
1344            .next()
1345            .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1346        self.limits.check_vector_len(vector.len())?;
1347
1348        let reranker = if rerank {
1349            self.embed.reranker(&collection)
1350        } else {
1351            None
1352        };
1353        // Over-fetch when reranking so the reranker can reorder a wide candidate set
1354        // down to the requested `k`; fetch payloads when reranking (we need the doc
1355        // text) even if the caller did not ask for them.
1356        let need_payload = with_payload || reranker.is_some();
1357        let fetch_k = if reranker.is_some() {
1358            k.max(RERANK_CANDIDATES)
1359        } else {
1360            k
1361        };
1362
1363        let mut hits = self
1364            .hybrid_search(
1365                principal,
1366                collection,
1367                Some(vector),
1368                None,
1369                Some(text.clone()),
1370                fetch_k,
1371                filter,
1372                ef_search,
1373                rrf_k0,
1374                need_payload,
1375                with_vector,
1376            )
1377            .await?;
1378
1379        if let Some(rr) = reranker {
1380            let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1381            let query = text;
1382            let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1383                .await
1384                .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1385                .map_err(|e| Error::Upstream(e.to_string()))?;
1386            // Re-score each hit and sort by the rerank score, descending.
1387            let mut scored: Vec<(f32, MatchOut)> = scores
1388                .into_iter()
1389                .zip(hits)
1390                .map(|(s, mut h)| {
1391                    h.score = s;
1392                    (s, h)
1393                })
1394                .collect();
1395            scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1396            hits = scored.into_iter().map(|(_, h)| h).collect();
1397        }
1398
1399        hits.truncate(k);
1400        // Drop payloads we only fetched for reranking if the caller didn't want them.
1401        if !with_payload {
1402            for h in &mut hits {
1403                h.payload = None;
1404            }
1405        }
1406        Ok(hits)
1407    }
1408
1409    /// Embed each point's `text` with the collection's provider and upsert it as a
1410    /// dense point, co-populating the `__quiver_text__` payload key (ADR-0046) so the
1411    /// same text is indexed for BM25 — one call feeds both the dense and lexical
1412    /// sides (ADR-0047).
1413    pub(crate) async fn upsert_text(
1414        &self,
1415        principal: &Principal,
1416        collection: String,
1417        points: Vec<TextPointIn>,
1418    ) -> Result<u64, Error> {
1419        self.ensure_writable("upsert_text")?;
1420        self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1421        self.limits.check_batch(points.len())?;
1422        for p in &points {
1423            if !matches!(p.payload, Value::Object(_) | Value::Null) {
1424                return Err(Error::BadRequest(
1425                    "upsert_text payload must be a JSON object or null".to_owned(),
1426                ));
1427            }
1428        }
1429        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1430            Error::BadRequest(format!(
1431                "collection {collection:?} has no embedding provider configured \
1432                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1433            ))
1434        })?;
1435        let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1436        let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1437            .await
1438            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1439            .map_err(|e| Error::Upstream(e.to_string()))?;
1440        if vectors.len() != points.len() {
1441            return Err(Error::Upstream(format!(
1442                "embedding provider returned {} vectors for {} inputs",
1443                vectors.len(),
1444                points.len()
1445            )));
1446        }
1447        let dense: Vec<PointIn> = points
1448            .into_iter()
1449            .zip(vectors)
1450            .map(|(p, vector)| {
1451                let mut payload = match p.payload {
1452                    Value::Object(map) => map,
1453                    _ => serde_json::Map::new(),
1454                };
1455                // Don't clobber a caller-supplied text key.
1456                payload
1457                    .entry(TEXT_KEY.to_owned())
1458                    .or_insert_with(|| Value::String(p.text.clone()));
1459                PointIn {
1460                    id: p.id,
1461                    vector,
1462                    payload: Value::Object(payload),
1463                }
1464            })
1465            .collect();
1466        self.upsert(principal, collection, dense).await
1467    }
1468
1469    pub(crate) async fn fetch(
1470        &self,
1471        principal: &Principal,
1472        collection: String,
1473        filter: Option<Filter>,
1474        limit: usize,
1475        with_payload: bool,
1476        with_vector: bool,
1477    ) -> Result<Vec<MatchOut>, Error> {
1478        self.authorize(principal, Action::Read, "fetch", &collection)?;
1479        self.limits.check_fetch(limit)?;
1480        self.read_blocking(move |db| {
1481            let matches = db.fetch(
1482                &collection,
1483                filter.as_ref(),
1484                limit,
1485                with_payload,
1486                with_vector,
1487            )?;
1488            Ok(matches
1489                .into_iter()
1490                .map(|m| MatchOut {
1491                    id: m.id,
1492                    score: m.score,
1493                    payload: m.payload,
1494                    vector: m.vector,
1495                })
1496                .collect())
1497        })
1498        .await
1499    }
1500
1501    pub(crate) async fn upsert_documents(
1502        &self,
1503        principal: &Principal,
1504        collection: String,
1505        documents: Vec<DocumentIn>,
1506    ) -> Result<u64, Error> {
1507        self.ensure_writable("upsert_documents")?;
1508        self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1509        self.limits.check_batch(documents.len())?;
1510        for doc in &documents {
1511            self.limits.check_payload(&doc.payload)?;
1512            for token in &doc.vectors {
1513                self.limits.check_vector_len(token.len())?;
1514            }
1515        }
1516        let resource = collection.clone();
1517        let result = self
1518            .write_blocking(move |db| {
1519                let mut count = 0u64;
1520                for doc in &documents {
1521                    db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1522                    count += 1;
1523                }
1524                Ok(count)
1525            })
1526            .await;
1527        self.audit.record(
1528            principal.actor(),
1529            "upsert_documents",
1530            &resource,
1531            Outcome::of(&result),
1532        );
1533        result
1534    }
1535
1536    pub(crate) async fn delete_documents(
1537        &self,
1538        principal: &Principal,
1539        collection: String,
1540        ids: Vec<String>,
1541    ) -> Result<u64, Error> {
1542        self.ensure_writable("delete_documents")?;
1543        self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1544        let resource = collection.clone();
1545        let result = self
1546            .write_blocking(move |db| {
1547                let mut count = 0u64;
1548                for id in &ids {
1549                    if db.delete_document(&collection, id)? {
1550                        count += 1;
1551                    }
1552                }
1553                Ok(count)
1554            })
1555            .await;
1556        self.audit.record(
1557            principal.actor(),
1558            "delete_documents",
1559            &resource,
1560            Outcome::of(&result),
1561        );
1562        result
1563    }
1564
1565    #[allow(clippy::too_many_arguments)]
1566    pub(crate) async fn search_multi_vector(
1567        &self,
1568        principal: &Principal,
1569        collection: String,
1570        query: Vec<Vec<f32>>,
1571        k: usize,
1572        filter: Option<Filter>,
1573        ef_search: usize,
1574        with_payload: bool,
1575        with_vector: bool,
1576    ) -> Result<Vec<DocumentMatchOut>, Error> {
1577        self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1578        self.limits.check_search(k, ef_search)?;
1579        for token in &query {
1580            self.limits.check_vector_len(token.len())?;
1581        }
1582        let params = SearchParams {
1583            k,
1584            filter,
1585            ef_search,
1586            with_payload,
1587            with_vector,
1588        };
1589        let coll = collection.clone();
1590        self.search_blocking(coll, move |db| {
1591            let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
1592            Ok(matches
1593                .into_iter()
1594                .map(|m| DocumentMatchOut {
1595                    id: m.id,
1596                    score: m.score,
1597                    payload: m.payload,
1598                    vectors: m.vectors,
1599                })
1600                .collect())
1601        })
1602        .await
1603    }
1604}
1605
/// Capacity of the leader's replication broadcast channel: the number of
/// recently-committed ops kept for followers (ADR-0030). A follower that falls
/// further behind than this re-bootstraps.
const REPLICATION_BUFFER: usize = 1 << 10;
1609
1610/// Run the server from `config` until a shutdown signal (Ctrl-C).
1611pub async fn run(config: Config) -> Result<(), Error> {
1612    config.validate()?;
1613    let rest_listener = TcpListener::bind(config.rest_addr)
1614        .await
1615        .map_err(Error::Io)?;
1616    let grpc_listener = TcpListener::bind(config.grpc_addr)
1617        .await
1618        .map_err(Error::Io)?;
1619    tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1620    tokio::select! {
1621        result = serve(config, rest_listener, grpc_listener) => result,
1622        () = shutdown_signal() => {
1623            tracing::info!("shutdown signal received");
1624            Ok(())
1625        }
1626    }
1627}
1628
1629/// Serve REST and gRPC on the given (already-bound) listeners until a transport
1630/// error. Exposed so tests can bind ephemeral ports.
1631pub async fn serve(
1632    config: Config,
1633    rest_listener: TcpListener,
1634    grpc_listener: TcpListener,
1635) -> Result<(), Error> {
1636    let mut db = open_database(&config)?;
1637    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1638    // Publish every committed op to replication followers (ADR-0030). The observer
1639    // runs inside the engine's write critical section; `broadcast::Sender::send` is
1640    // non-blocking, so it never stalls a write.
1641    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1642    {
1643        let tx = replication_tx.clone();
1644        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1645            let _ = tx.send(entry.clone());
1646        }));
1647    }
1648    // Build the opt-in embedding/rerank providers, resolving each `api_key_env`
1649    // from the environment now (ADR-0047) so a missing key fails fast at startup
1650    // rather than on the first request.
1651    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1652        .map_err(|e| Error::Config(e.to_string()))?;
1653
1654    // Enable lock-free MVCC reads if requested (ADR-0064): `config.mvcc_reads` or
1655    // `QUIVER_MVCC_READS` (the latter also defaults the engine flag at open).
1656    if config.mvcc_reads {
1657        db.set_mvcc_reads(true);
1658    }
1659    let mvcc = db.mvcc_reads();
1660
1661    let state = AppState {
1662        db: Arc::new(RwLock::new(db)),
1663        keys: Arc::new(config.api_keys.clone()),
1664        audit,
1665        replication_tx,
1666        read_only: config.leader_url.is_some(),
1667        limits: config.limits,
1668        embed: Arc::new(embed),
1669        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
1670        metrics: Arc::new(metrics::Metrics::default()),
1671        rebuilding: Arc::new(Mutex::new(HashSet::new())),
1672        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
1673        mvcc,
1674    };
1675
1676    // A follower continuously applies the leader's committed-op stream (ADR-0030).
1677    if let Some(leader_url) = config.leader_url.clone() {
1678        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
1679    }
1680
1681    let app = rest::router(state.clone());
1682    let grpc = grpc::service(state);
1683
1684    let tls = load_tls(&config)?;
1685
1686    // REST: terminate TLS with axum-server when configured, else serve plaintext.
1687    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
1688        Some(material) => {
1689            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
1690            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
1691            let server =
1692                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
1693            Box::pin(async move {
1694                server
1695                    .serve(app.into_make_service())
1696                    .await
1697                    .map_err(Error::Io)
1698            })
1699        }
1700        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
1701    };
1702
1703    // gRPC: tonic terminates TLS itself (ring provider) when an identity is set.
1704    let mut grpc_builder = tonic::transport::Server::builder();
1705    if let Some(material) = &tls {
1706        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
1707        let mut tls_config = ServerTlsConfig::new().identity(identity);
1708        // Require client certificates chaining to the configured CA (mTLS).
1709        if let Some(ca_pem) = &material.client_ca_pem {
1710            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
1711        }
1712        grpc_builder = grpc_builder
1713            .tls_config(tls_config)
1714            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
1715    }
1716    let grpc_fut = async move {
1717        grpc_builder
1718            .add_service(grpc)
1719            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
1720            .await
1721            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
1722    };
1723
1724    tokio::try_join!(rest_fut, grpc_fut)?;
1725    Ok(())
1726}
1727
1728async fn shutdown_signal() {
1729    let _ = tokio::signal::ctrl_c().await;
1730}
1731
1732// Open the engine, enabling encryption-at-rest when a key is configured. The
1733// configured key is the **master key** of an envelope key-ring (ADR-0010): it
1734// wraps a per-collection data-encryption key, so dropping a collection
1735// crypto-shreds it. With no key (only valid in `insecure` mode, enforced by
1736// `Config::validate`) the engine is opened in plaintext.
1737fn open_database(config: &Config) -> Result<Database, Error> {
1738    let master_key = config.master_key_hex()?;
1739    let keyring =
1740        quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1741            .map_err(|e| Error::Config(e.to_string()))?;
1742    let db = match keyring {
1743        Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1744        None => Database::open(&config.data_dir)?,
1745    };
1746    Ok(db)
1747}
1748
1749// TLS material shared by both transports: the raw PEM (for tonic's `Identity`
1750// and `client_ca_root`) and a parsed rustls server config (for axum-server's
1751// REST acceptor). `client_ca_pem` is set only when mutual TLS is configured.
1752struct TlsMaterial {
1753    cert_pem: Vec<u8>,
1754    key_pem: Vec<u8>,
1755    client_ca_pem: Option<Vec<u8>>,
1756    rest_config: Arc<rustls::ServerConfig>,
1757}
1758
1759// Read the configured certificate, key, and optional client CA, returning `None`
1760// when TLS is not configured. `Config::validate` already enforces that the cert
1761// and key are set together, that a client CA requires them, and that a
1762// non-loopback bind requires TLS.
1763fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1764    match (&config.tls_cert, &config.tls_key) {
1765        (Some(cert_path), Some(key_path)) => {
1766            let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1767            let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1768            let client_ca_pem = config
1769                .tls_client_ca
1770                .as_ref()
1771                .map(std::fs::read)
1772                .transpose()
1773                .map_err(Error::Io)?;
1774            let rest_config = Arc::new(rustls_server_config(
1775                &cert_pem,
1776                &key_pem,
1777                client_ca_pem.as_deref(),
1778            )?);
1779            Ok(Some(TlsMaterial {
1780                cert_pem,
1781                key_pem,
1782                client_ca_pem,
1783                rest_config,
1784            }))
1785        }
1786        (None, None) => Ok(None),
1787        _ => Err(Error::Config(
1788            "tls_cert and tls_key must be set together".to_owned(),
1789        )),
1790    }
1791}
1792
1793// Build a rustls server config from PEM bytes over the audited `ring` provider
1794// (no OpenSSL, no aws-lc-rs C toolchain). TLS 1.3 and 1.2 are offered. When a
1795// client CA is supplied, client certificates chaining to it are required (mTLS).
1796fn rustls_server_config(
1797    cert_pem: &[u8],
1798    key_pem: &[u8],
1799    client_ca_pem: Option<&[u8]>,
1800) -> Result<rustls::ServerConfig, Error> {
1801    use rustls_pki_types::pem::PemObject;
1802    use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1803
1804    let certs = CertificateDer::pem_slice_iter(cert_pem)
1805        .collect::<std::result::Result<Vec<_>, _>>()
1806        .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1807    if certs.is_empty() {
1808        return Err(Error::Config(
1809            "tls_cert contains no certificates".to_owned(),
1810        ));
1811    }
1812    let key = PrivateKeyDer::from_pem_slice(key_pem)
1813        .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1814    let provider = Arc::new(rustls::crypto::ring::default_provider());
1815    let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1816        .with_safe_default_protocol_versions()
1817        .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1818    let builder = match client_ca_pem {
1819        Some(ca_pem) => {
1820            let mut roots = rustls::RootCertStore::empty();
1821            for cert in CertificateDer::pem_slice_iter(ca_pem) {
1822                let cert =
1823                    cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1824                roots
1825                    .add(cert)
1826                    .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1827            }
1828            let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1829                Arc::new(roots),
1830                provider,
1831            )
1832            .build()
1833            .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1834            builder.with_client_cert_verifier(verifier)
1835        }
1836        None => builder.with_no_client_auth(),
1837    };
1838    builder
1839        .with_single_cert(certs, key)
1840        .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1841}
1842
1843/// Initialize structured logging from `RUST_LOG` (defaulting to `info`). Safe to
1844/// call once at startup; a second call is ignored.
1845pub fn init_tracing() {
1846    init_observability(&Config::default());
1847}
1848
1849/// Install the global tracing subscriber: an `RUST_LOG`-driven `fmt` layer plus,
1850/// when the `otlp` feature is built **and** an OTLP endpoint is configured
1851/// (ADR-0059), an OpenTelemetry traces export layer. Safe to call once at
1852/// startup; a second call is a no-op. A failure to build the OTLP exporter logs a
1853/// warning and falls back to `fmt`-only rather than taking the server down.
1854#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
1855pub fn init_observability(config: &Config) {
1856    use tracing_subscriber::EnvFilter;
1857    use tracing_subscriber::prelude::*;
1858    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
1859    let registry = tracing_subscriber::registry()
1860        .with(filter)
1861        .with(tracing_subscriber::fmt::layer());
1862
1863    #[cfg(feature = "otlp")]
1864    if config.otlp.is_enabled() {
1865        match otlp::build_provider(&config.otlp) {
1866            Ok(provider) => {
1867                use opentelemetry::trace::TracerProvider as _;
1868                let tracer = provider.tracer("quiver");
1869                otlp::store_provider(provider);
1870                let _ = registry
1871                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
1872                    .try_init();
1873                return;
1874            }
1875            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
1876        }
1877    }
1878
1879    let _ = registry.try_init();
1880}
1881
/// Flush batched spans and shut down the OpenTelemetry exporter.
///
/// Compiles to nothing without the `otlp` feature. With it, the work is
/// delegated to `otlp::shutdown` (NOTE(review): presumably a no-op when no
/// exporter was installed — confirm there). Call once on server shutdown so
/// batched spans are not lost.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
1889
#[cfg(test)]
mod tests {
    use super::*;

    // A valid 64-hex-character (256-bit) test key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        let mut config = Config::default();
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
        config.insecure = false;
        config.api_keys = vec!["secret".into()];
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        // API key set but no encryption key, not insecure ⇒ rejected.
        assert!(config.validate().is_err());
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
        // A malformed key is rejected up front, not at first write.
        config.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(config.validate().is_err());
        // Insecure mode may run without encryption-at-rest.
        config.insecure = true;
        config.encryption_key = None;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("master.key");
        // A trailing newline (as editors and `echo` add) is trimmed off.
        std::fs::write(&path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(path.clone()),
            ..Config::default()
        };
        // The file alone satisfies encryption-at-rest and resolves to the key.
        assert!(config.validate().is_ok());
        assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));

        // Setting both the env key and a file is rejected as ambiguous.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A file holding malformed hex is rejected up front.
        config.encryption_key = None;
        std::fs::write(&path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        };
        config.rest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333);
        // Auth and encryption are satisfied, so the only failure is the bind rule.
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333),
            ..Config::default()
        };
        // TLS configured ⇒ a non-loopback bind is allowed without insecure.
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        // Cert without key ⇒ rejected.
        assert!(config.validate().is_err());
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }

    #[test]
    fn load_tls_is_disabled_by_default() {
        // No cert/key configured ⇒ plaintext transports, not an error.
        assert!(matches!(load_tls(&Config::default()), Ok(None)));
    }

    #[test]
    fn load_tls_rejects_cert_without_key() {
        // `load_tls` re-checks the pairing rule even on an unvalidated config.
        let config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        assert!(matches!(load_tls(&config), Err(Error::Config(_))));
    }

    #[test]
    fn load_tls_reports_unreadable_files_as_io_errors() {
        let dir = tempfile::tempdir().unwrap();
        let config = Config {
            tls_cert: Some(dir.path().join("missing-cert.pem")),
            tls_key: Some(dir.path().join("missing-key.pem")),
            ..Config::default()
        };
        assert!(matches!(load_tls(&config), Err(Error::Io(_))));
    }

    #[test]
    fn rustls_config_rejects_pem_without_certificates() {
        // An empty (or certificate-free) PEM must fail before the key is parsed.
        assert!(matches!(
            rustls_server_config(b"", b"", None),
            Err(Error::Config(_))
        ));
    }
}