Skip to main content

quiver_server/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The Quiver daemon: gRPC and REST over the embeddable [`Database`], with
3//! API-key auth and secure-by-default configuration.
4//!
5//! Both transports are thin shells over the same shared engine operations; the
6//! engine is synchronous and CPU/`fsync`-bound, so every call is offloaded with
7//! `spawn_blocking`. Access is guarded by a reader–writer lock (ADR-0057): reads
8//! take the shared lock and run **concurrently**, writes take the exclusive lock,
9//! and the single-writer model is unchanged (ADR-0002/0006). A read that finds a
10//! collection's index stale (a prior write deferred its rebuild) serves the prior
11//! snapshot and schedules an **off-lock** rebuild (ADR-0062): the index is rebuilt
12//! under the shared lock and swapped in under a brief write lock, so a rebuild never
13//! stalls concurrent readers.
14//!
15//! Auth is by scoped API key (Bearer / gRPC `authorization` metadata) with
16//! default-deny RBAC: each key carries a role (read ⊆ write ⊆ admin) and a
17//! collection scope, enforced on every operation at the shared op layer
18//! (ADR-0011/0013, the `auth` module). Encryption-at-rest is on by default
19//! (ADR-0010): unless `insecure` is set, an `encryption_key` is required and the
20//! engine is opened through `quiver-crypto`'s AEAD codec; payloads may also be
21//! client-side-encrypted (ADR-0012). TLS-in-transit uses `rustls` over the
22//! audited `ring` provider — REST via `axum-server`, gRPC via tonic's `tls-ring`
23//! — and a non-loopback bind requires it; setting a client CA additionally
24//! requires mutual TLS. Mutating and administrative operations, and every
25//! access-control denial, are recorded to an append-only audit log (ADR-0011,
26//! the `audit` module) when `audit_log` is set. Per-request cost limits bound
27//! the work any single authenticated request can demand (ADR-0040, the
28//! [`Limits`] type), and an opt-in per-key token-bucket rate limiter bounds the
29//! request *rate* (ADR-0049, the [`RateLimiter`] type); per-tenant engine
30//! partitioning is a later phase. Design: `docs/api/rest-grpc.md`.
31
32mod audit;
33mod auth;
34mod cluster;
35mod error;
36mod grpc;
37mod metrics;
38mod otlp;
39mod rate_limit;
40mod replication;
41mod rest;
42
43use std::collections::{HashMap, HashSet};
44use std::future::Future;
45use std::net::{IpAddr, Ipv4Addr, SocketAddr};
46use std::path::PathBuf;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex, RwLock};
49
50use axum_server::tls_rustls::RustlsConfig;
51use figment::Figment;
52use figment::providers::{Env, Format, Serialized, Toml};
53use serde::{Deserialize, Serialize};
54use serde_json::Value;
55use tokio::net::TcpListener;
56use tokio::sync::broadcast;
57use tokio_stream::wrappers::TcpListenerStream;
58use tonic::transport::{Certificate, Identity, ServerTlsConfig};
59
60use quiver_crypto::AeadCodec;
61use quiver_embed::{
62    Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
63    SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
64};
65use quiver_query::Filter;
66
67pub use auth::{Action, ApiKey, CollectionScope};
68pub use error::Error;
69pub use otlp::OtlpConfig;
70// The embedding/rerank seam lives in its own lean crate (ADR-0058) so the MCP
71// server can share it; re-exported here so the server's public API is unchanged.
72pub use quiver_providers::{
73    EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
74    RerankProvider,
75};
76pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
77
78use audit::{AuditLog, Outcome};
79use auth::Principal;
80
/// Per-request cost limits (ADR-0040). Each cap bounds the work a single
/// authenticated request can demand, so one oversized request cannot exhaust the
/// node under the single-writer model (ADR-0006). Over-limit requests are
/// **rejected** with HTTP 400 / gRPC `InvalidArgument` rather than silently
/// clamped — a truncated `k` or `ef_search` would return surprising, lower-quality
/// results with no signal. Defaults are generous; raise a cap with a `[limits]`
/// table in `quiver.toml` or the matching `QUIVER_MAX_*` environment variable.
///
/// Every cap must be non-zero: `Limits::validate` rejects a `0`, since such a
/// limit would refuse every request. All comparisons are inclusive of the cap
/// (a request is rejected only when it is strictly greater).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
    /// Maximum `k` (top-k) for `search` / `search_multi_vector`.
    /// Env override: `QUIVER_MAX_K`.
    pub max_k: usize,
    /// Maximum search beam width (`ef_search`).
    /// Env override: `QUIVER_MAX_EF_SEARCH`.
    pub max_ef_search: usize,
    /// Maximum `fetch` page size.
    /// Env override: `QUIVER_MAX_FETCH_LIMIT`.
    pub max_fetch_limit: usize,
    /// Maximum vector dimension: the declared collection dimension at creation
    /// and the length of any query vector (per token, for multi-vector).
    /// Env override: `QUIVER_MAX_VECTOR_DIM`.
    pub max_vector_dim: usize,
    /// Maximum serialized-JSON payload size per point, in bytes.
    /// Env override: `QUIVER_MAX_PAYLOAD_BYTES`.
    pub max_payload_bytes: usize,
    /// Maximum number of points / documents in a single upsert request.
    /// Env override: `QUIVER_MAX_BATCH_SIZE`.
    pub max_batch_size: usize,
    /// Maximum HTTP request body size, in bytes (enforced by the REST layer via
    /// axum's `DefaultBodyLimit`; gRPC is bounded by tonic's decode limit).
    /// Env override: `QUIVER_MAX_REQUEST_BODY_BYTES`.
    pub max_request_body_bytes: usize,
    /// Maximum number of non-zero terms in a hybrid-search sparse query (ADR-0043),
    /// bounding the posting-list scan.
    /// Env override: `QUIVER_MAX_SPARSE_TERMS`.
    pub max_sparse_terms: usize,
    /// Maximum number of points in a single bulk upsert (`POST …/points:bulk`,
    /// ADR-0045). Larger than `max_batch_size` because the bulk path defers index
    /// maintenance to one rebuild; the request is still bounded by
    /// `max_request_body_bytes`, so raise that too for very large bulk loads.
    /// Env override: `QUIVER_MAX_BULK_BATCH_SIZE`.
    pub max_bulk_batch_size: usize,
}
116
117impl Default for Limits {
118    fn default() -> Self {
119        Self {
120            max_k: 10_000,
121            max_ef_search: 4_096,
122            max_fetch_limit: 10_000,
123            max_vector_dim: 8_192,
124            max_payload_bytes: 65_536,
125            max_batch_size: 1_000,
126            max_request_body_bytes: 32 * 1024 * 1024,
127            max_sparse_terms: 4_096,
128            max_bulk_batch_size: 50_000,
129        }
130    }
131}
132
133impl Limits {
134    // Apply `QUIVER_MAX_*` overrides after figment extraction (ADR-0013 env
135    // layer). The flat env keys do not nest under figment's `limits` table, so
136    // they are read explicitly here; a malformed value is a hard config error.
137    fn apply_env_overrides(&mut self) -> Result<(), Error> {
138        let slots: [(&str, &mut usize); 9] = [
139            ("QUIVER_MAX_K", &mut self.max_k),
140            ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
141            ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
142            ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
143            ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
144            ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
145            (
146                "QUIVER_MAX_REQUEST_BODY_BYTES",
147                &mut self.max_request_body_bytes,
148            ),
149            ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
150            ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
151        ];
152        for (key, slot) in slots {
153            if let Ok(raw) = std::env::var(key) {
154                *slot = raw.parse().map_err(|_| {
155                    Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
156                })?;
157            }
158        }
159        Ok(())
160    }
161
162    // Reject a zero cap (a `0` limit would refuse every request).
163    fn validate(&self) -> Result<(), Error> {
164        let named = [
165            ("max_k", self.max_k),
166            ("max_ef_search", self.max_ef_search),
167            ("max_fetch_limit", self.max_fetch_limit),
168            ("max_vector_dim", self.max_vector_dim),
169            ("max_payload_bytes", self.max_payload_bytes),
170            ("max_batch_size", self.max_batch_size),
171            ("max_request_body_bytes", self.max_request_body_bytes),
172            ("max_sparse_terms", self.max_sparse_terms),
173            ("max_bulk_batch_size", self.max_bulk_batch_size),
174        ];
175        if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
176            return Err(Error::Config(format!(
177                "limits.{name} must be greater than zero"
178            )));
179        }
180        Ok(())
181    }
182
183    fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
184        if k > self.max_k {
185            return Err(Error::BadRequest(format!(
186                "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
187                self.max_k
188            )));
189        }
190        if ef_search > self.max_ef_search {
191            return Err(Error::BadRequest(format!(
192                "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
193                self.max_ef_search
194            )));
195        }
196        Ok(())
197    }
198
199    fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
200        if n > self.max_sparse_terms {
201            return Err(Error::BadRequest(format!(
202                "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
203                self.max_sparse_terms
204            )));
205        }
206        Ok(())
207    }
208
209    fn check_fetch(&self, limit: usize) -> Result<(), Error> {
210        if limit > self.max_fetch_limit {
211            return Err(Error::BadRequest(format!(
212                "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
213                self.max_fetch_limit
214            )));
215        }
216        Ok(())
217    }
218
219    fn check_dim(&self, dim: usize) -> Result<(), Error> {
220        if dim > self.max_vector_dim {
221            return Err(Error::BadRequest(format!(
222                "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
223                self.max_vector_dim
224            )));
225        }
226        Ok(())
227    }
228
229    fn check_vector_len(&self, len: usize) -> Result<(), Error> {
230        if len > self.max_vector_dim {
231            return Err(Error::BadRequest(format!(
232                "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
233                self.max_vector_dim
234            )));
235        }
236        Ok(())
237    }
238
239    fn check_batch(&self, n: usize) -> Result<(), Error> {
240        if n > self.max_batch_size {
241            return Err(Error::BadRequest(format!(
242                "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
243                self.max_batch_size
244            )));
245        }
246        Ok(())
247    }
248
249    fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
250        if n > self.max_bulk_batch_size {
251            return Err(Error::BadRequest(format!(
252                "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
253                self.max_bulk_batch_size
254            )));
255        }
256        Ok(())
257    }
258
259    fn check_payload(&self, payload: &Value) -> Result<(), Error> {
260        let size = serde_json::to_vec(payload)
261            .map(|v| v.len())
262            .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
263        if size > self.max_payload_bytes {
264            return Err(Error::BadRequest(format!(
265                "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
266                self.max_payload_bytes
267            )));
268        }
269        Ok(())
270    }
271}
272
/// Server configuration, layered defaults → `quiver.toml` → `QUIVER_*` env and
/// validated at startup (ADR-0013).
///
/// The container-level `#[serde(default)]` means any key absent from the file
/// and the environment falls back to the value in `Config::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Data directory for the storage engine. Default: `./quiver-data`.
    pub data_dir: PathBuf,
    /// REST (HTTP/1.1) bind address. Default: `127.0.0.1:6333`.
    pub rest_addr: SocketAddr,
    /// gRPC (HTTP/2) bind address. Default: `127.0.0.1:6334`.
    pub grpc_addr: SocketAddr,
    /// Accepted API keys with their RBAC scopes (ADR-0011). A bare secret —
    /// from a comma-separated `QUIVER_API_KEYS` (the env form) or a plain TOML
    /// array entry — is an all-collections admin key; a structured
    /// `{secret, role, collections}` entry pins a narrower scope. Empty is
    /// allowed only with `insecure = true`.
    #[serde(default, deserialize_with = "auth::de_api_keys")]
    pub api_keys: Vec<ApiKey>,
    /// Hex-encoded 256-bit **master key** for encryption-at-rest (64 hex
    /// characters). It wraps per-collection data-encryption keys (ADR-0010).
    /// Required unless `insecure = true` or [`master_key_file`] is set; source it
    /// from the environment or a secret store, never the committed config. `None`
    /// ⇒ data is stored unencrypted (only valid in `insecure` mode). A value that
    /// is empty after trimming is treated the same as `None`.
    ///
    /// [`master_key_file`]: Config::master_key_file
    pub encryption_key: Option<String>,
    /// Path to a file holding the hex master key, as an alternative to
    /// [`encryption_key`] (set exactly one). Lets the key arrive as a mounted
    /// secret (Docker/Kubernetes) or a KMS-decrypted file rather than an
    /// environment variable. It should be mode `0600`; a group/world-accessible
    /// file is warned about at startup.
    ///
    /// [`encryption_key`]: Config::encryption_key
    pub master_key_file: Option<PathBuf>,
    /// Path to the PEM-encoded TLS certificate chain. Must be set together with
    /// `tls_key`. Required for a non-loopback bind unless `insecure = true`.
    pub tls_cert: Option<PathBuf>,
    /// Path to the PEM-encoded TLS private key. Must be set together with
    /// `tls_cert`.
    pub tls_key: Option<PathBuf>,
    /// Path to a PEM-encoded CA certificate that signs accepted client
    /// certificates. When set, both transports require **mutual TLS**: a client
    /// must present a certificate chaining to this CA to connect (ADR-0011).
    /// Requires `tls_cert`/`tls_key`; bearer API keys still carry the RBAC scope.
    pub tls_client_ca: Option<PathBuf>,
    /// Path to an append-only audit log file (ADR-0011). When set, every
    /// mutating and administrative operation and every access-control denial is
    /// appended as one JSON object per line (JSON Lines); records are always
    /// also emitted as `tracing` events. Unset ⇒ tracing only. Secrets are never
    /// written — see `docs/security/audit.md`.
    pub audit_log: Option<PathBuf>,
    /// Run as a **read-replica follower** (ADR-0030): connect to a leader's gRPC
    /// endpoint at this URL (e.g. `http://leader:6334`) and continuously apply its
    /// committed operations, serving reads. A follower **refuses writes**. Unset ⇒
    /// this node is a normal read-write leader. (Plaintext `http://` for now; TLS
    /// to the leader is a follow-up — run replication over a trusted network.)
    pub leader_url: Option<String>,
    /// API key the follower presents to the leader's admin-scoped `Replicate`
    /// stream (used with `leader_url`). Source it like any secret.
    pub leader_api_key: Option<String>,
    /// Opt out of the secure defaults (no auth, no encryption-at-rest, allow a
    /// non-loopback bind without TLS). For local development only; never the
    /// default.
    pub insecure: bool,
    /// Per-request cost limits (ADR-0040). Set with a `[limits]` table in
    /// `quiver.toml` or the `QUIVER_MAX_*` environment variables.
    pub limits: Limits,
    /// Opt-in server-side embedding providers, keyed by collection name (ADR-0047).
    /// Configured with `[embedding.<collection>]` tables in `quiver.toml`; default
    /// empty, so the engine stays model-agnostic and library mode is unaffected.
    /// API keys are referenced by env-var *name* and resolved at startup, never
    /// stored. Enables `search_text` / `upsert_text` for the named collections.
    #[serde(default)]
    pub embedding: HashMap<String, EmbeddingConfig>,
    /// Opt-in server-side rerank providers, keyed by collection name (ADR-0047).
    /// Configured with `[rerank.<collection>]` tables; enables the one-call
    /// retrieve→rerank stage of `search_text`.
    #[serde(default)]
    pub rerank: HashMap<String, RerankConfig>,
    /// Opt-in per-key rate limiting (ADR-0049). Set a `[rate_limit]` table in
    /// `quiver.toml` or the `QUIVER_RATE_LIMIT_*` env vars;
    /// `requests_per_second = 0` (the default) disables it.
    #[serde(default)]
    pub rate_limit: RateLimitConfig,
    /// Opt-in OpenTelemetry traces export (ADR-0059). Set an `[otlp]` table or the
    /// `QUIVER_OTLP_*` env vars; an empty `endpoint` (the default) disables it.
    /// Export additionally requires the server's `otlp` build feature.
    #[serde(default)]
    pub otlp: OtlpConfig,
    /// Lock-free MVCC reads (ADR-0064), **experimental and default-off**. Serves
    /// reads of single-vector, in-memory collections from an `arc-swap` snapshot so
    /// pure-vector reads never block on a concurrent writer. Also settable via
    /// `QUIVER_MVCC_READS`. See `docs/adr/0064-mvcc-reads-implementation.md`.
    #[serde(default)]
    pub mvcc_reads: bool,
    /// Opt-in **cluster router** mode (ADR-0065): a non-empty list of shard base
    /// URLs (e.g. `["http://s1:6333","http://s2:6333"]`, or `QUIVER_CLUSTER_SHARDS`
    /// comma-separated) makes this server a stateless router that shards writes and
    /// scatter-gathers searches across the shards. Empty (the default) = an ordinary
    /// single-node server.
    // NOTE(review): unlike `api_keys`, this field has no custom deserializer, so
    // it is not evident from this struct how a bare comma-separated env value
    // becomes a list — confirm the `QUIVER_CLUSTER_SHARDS` form is handled.
    #[serde(default)]
    pub cluster_shards: Vec<String>,
    /// Optional API key the router presents to its shards (a cluster runs over a
    /// trusted network). `None` = shards are unauthenticated.
    #[serde(default)]
    pub cluster_shard_key: Option<String>,
}
380
381impl Default for Config {
382    fn default() -> Self {
383        Self {
384            data_dir: PathBuf::from("./quiver-data"),
385            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
386            grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
387            api_keys: Vec::new(),
388            encryption_key: None,
389            master_key_file: None,
390            tls_cert: None,
391            tls_key: None,
392            tls_client_ca: None,
393            audit_log: None,
394            leader_url: None,
395            leader_api_key: None,
396            insecure: false,
397            limits: Limits::default(),
398            embedding: HashMap::new(),
399            rerank: HashMap::new(),
400            rate_limit: RateLimitConfig::default(),
401            otlp: OtlpConfig::default(),
402            mvcc_reads: false,
403            cluster_shards: Vec::new(),
404            cluster_shard_key: None,
405        }
406    }
407}
408
409impl Config {
410    /// Load configuration from defaults, an optional `quiver.toml`, and
411    /// `QUIVER_*` environment variables.
412    pub fn load() -> Result<Self, Error> {
413        let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
414            .merge(Toml::file("quiver.toml"))
415            .merge(Env::prefixed("QUIVER_"))
416            .extract()
417            .map_err(|e| Error::Config(e.to_string()))?;
418        // The flat `QUIVER_MAX_*` env keys do not nest under the `limits` table
419        // figment builds, so apply them explicitly (ADR-0040).
420        config.limits.apply_env_overrides()?;
421        // Same for the flat `QUIVER_RATE_LIMIT_*` keys (ADR-0049).
422        config
423            .rate_limit
424            .apply_env_overrides()
425            .map_err(Error::Config)?;
426        // Same for the flat `QUIVER_OTLP_*` keys (ADR-0059).
427        config.otlp.apply_env_overrides().map_err(Error::Config)?;
428        Ok(config)
429    }
430
431    /// Reject insecure configurations unless explicitly opted out (ADR-0013):
432    /// no anonymous access, encryption-at-rest on by default with a valid key,
433    /// and no non-loopback bind without TLS.
434    pub fn validate(&self) -> Result<(), Error> {
435        if self.api_keys.is_empty() && !self.insecure {
436            return Err(Error::Config(
437                "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
438                 set insecure=true for local development"
439                    .to_owned(),
440            ));
441        }
442        // Resolve the master key from the env var or a key file (exactly one).
443        let master_key = self.master_key_hex()?;
444        if master_key.is_none() && !self.insecure {
445            return Err(Error::Config(
446                "no encryption key configured: encryption-at-rest is on by default — \
447                 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
448                 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
449                 store data unencrypted (development only)"
450                    .to_owned(),
451            ));
452        }
453        // Fail fast on a malformed key rather than at first write.
454        if let Some(key) = &master_key {
455            AeadCodec::from_hex(key)
456                .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
457        }
458        // TLS certificate and key are set together or not at all.
459        if self.tls_cert.is_some() != self.tls_key.is_some() {
460            return Err(Error::Config(
461                "tls_cert and tls_key must be set together".to_owned(),
462            ));
463        }
464        // mTLS layers on top of server TLS: a client CA needs a server cert/key.
465        if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
466            return Err(Error::Config(
467                "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
468            ));
469        }
470        let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
471        let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
472        if non_loopback && !tls_enabled && !self.insecure {
473            return Err(Error::Config(
474                "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
475                 or insecure=true for local development"
476                    .to_owned(),
477            ));
478        }
479        // Reject a nonsensical cost limit (a `0` cap would refuse every request).
480        self.limits.validate()?;
481        Ok(())
482    }
483
484    /// The effective hex master key: from [`master_key_file`] when set (read and
485    /// trimmed), otherwise [`encryption_key`]. `None` means no key is configured
486    /// (only valid with `insecure`).
487    ///
488    /// # Errors
489    /// [`Error::Config`] if both sources are set, or the key file cannot be read.
490    ///
491    /// [`master_key_file`]: Config::master_key_file
492    /// [`encryption_key`]: Config::encryption_key
493    pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
494        let env_key = self
495            .encryption_key
496            .as_deref()
497            .map(str::trim)
498            .filter(|k| !k.is_empty());
499        match (&self.master_key_file, env_key) {
500            (Some(_), Some(_)) => Err(Error::Config(
501                "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
502                 (QUIVER_MASTER_KEY_FILE), not both"
503                    .to_owned(),
504            )),
505            (Some(path), None) => {
506                warn_if_world_readable(path);
507                let hex = std::fs::read_to_string(path).map_err(|e| {
508                    Error::Config(format!("reading master_key_file {}: {e}", path.display()))
509                })?;
510                Ok(Some(hex.trim().to_owned()))
511            }
512            (None, Some(key)) => Ok(Some(key.to_owned())),
513            (None, None) => Ok(None),
514        }
515    }
516}
517
518// Warn (don't fail — Docker/Kubernetes secrets often mount group/world-readable)
519// when a master-key file is more permissive than `0600`.
520#[cfg(unix)]
521fn warn_if_world_readable(path: &std::path::Path) {
522    use std::os::unix::fs::PermissionsExt;
523    if let Ok(meta) = std::fs::metadata(path)
524        && meta.permissions().mode() & 0o077 != 0
525    {
526        tracing::warn!(
527            path = %path.display(),
528            mode = format!("{:o}", meta.permissions().mode() & 0o777),
529            "master key file is group/world-accessible; restrict it to 0600"
530        );
531    }
532}
533
534#[cfg(not(unix))]
535fn warn_if_world_readable(_path: &std::path::Path) {}
536
/// Shared server state: the engine behind a reader–writer lock (ADR-0057), the
/// accepted API keys with their RBAC scopes, and the audit log, together with
/// the replication, limits, provider, rate-limit, metrics, rebuild-tracking,
/// MVCC and cluster members documented on each field.
///
/// `Clone` is derived; the engine, key set, audit log and other heavyweight
/// members sit behind `Arc`, so a clone shares them rather than copying them.
#[derive(Clone)]
pub(crate) struct AppState {
    // The engine behind a reader–writer lock (ADR-0057): reads take the shared
    // lock and run concurrently; writes take the exclusive lock. Per the module
    // docs (ADR-0062), a read that finds a collection's index stale serves the
    // prior snapshot and schedules an off-lock rebuild (tracked in `rebuilding`)
    // instead of rebuilding under the write lock itself.
    db: Arc<RwLock<Database>>,
    // The accepted API keys; `authenticate` resolves a presented token against
    // this set.
    keys: Arc<Vec<ApiKey>>,
    // The audit log sink (ADR-0011).
    audit: Arc<AuditLog>,
    // Fan-out of every committed op to replication followers (ADR-0030). The
    // commit observer publishes here; each `Replicate` stream subscribes.
    replication_tx: broadcast::Sender<WalEntry>,
    // True on a replication follower: external writes are refused; the engine's
    // state is owned by the stream it applies from the leader (ADR-0030).
    read_only: bool,
    // Per-request cost limits, enforced at this op layer so both transports are
    // covered by one implementation (ADR-0040).
    limits: Limits,
    // Opt-in, provider-agnostic server-side embedding/rerank providers, keyed by
    // collection (ADR-0047). Empty on the common path; `search_text`/`upsert_text`
    // require a configured embedder for the target collection.
    embed: Arc<EmbedRegistry>,
    // Opt-in per-key token-bucket rate limiter (ADR-0049). A no-op when disabled.
    rate_limiter: Arc<RateLimiter>,
    // Prometheus metrics registry (ADR-0014/0054), scraped at `GET /metrics`.
    metrics: Arc<metrics::Metrics>,
    // Collections with an off-lock index rebuild in flight (ADR-0062). A search that
    // observes a deferred rebuild schedules one here, single-flighted so concurrent
    // readers never kick off duplicate builds for the same collection.
    rebuilding: Arc<Mutex<HashSet<String>>>,
    // Lock-free MVCC read cache (ADR-0064 increment 3): cached `arc-swap` snapshot
    // cells for MVCC-served collections. A pure-vector search loads the cell and
    // searches it **without taking the database lock**, so it never blocks on a
    // concurrent writer. Populated under the read lock the first time a collection
    // is seen fresh; the map itself changes only on create/drop, never on a data
    // write. Empty unless `QUIVER_MVCC_READS` is on.
    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
    // Snapshot of the engine's `QUIVER_MVCC_READS` flag, read once at startup. When
    // on, the writer drives off-lock consolidation rebuilds (ADR-0064 increment 3)
    // because pure-vector fast-path reads bypass the reader-driven scheduler. Off by
    // default, so the non-MVCC path is byte-identical.
    mvcc: bool,
    // Opt-in cluster router (ADR-0065): when `Some`, every collection/point op
    // fans out to the shards instead of touching the local engine. `None` = an
    // ordinary single-node server (the engine-backed path, untouched).
    cluster: Option<Arc<cluster::Cluster>>,
}
586
/// A collection's metadata, as a plain crate-internal value.
pub(crate) struct CollectionInfo {
    // Collection name.
    pub name: String,
    // Vector dimension of the collection.
    pub dim: u32,
    // Distance metric of the collection.
    pub metric: DistanceMetric,
    // NOTE(review): presumably the number of stored points — confirm.
    pub count: u64,
    // Index specification of the collection.
    pub index: IndexSpec,
    // Payload fields declared filterable.
    pub filterable: Vec<FilterableField>,
    // NOTE(review): presumably true for a multi-vector (document) collection —
    // confirm against the engine's descriptor.
    pub multivector: bool,
    // Vector-encryption setting of the collection.
    pub vector_encryption: VectorEncryption,
}
598
/// A point to upsert: an id, its vector, and its JSON payload.
pub(crate) struct PointIn {
    // Point identifier.
    pub id: String,
    // The point's vector components.
    pub vector: Vec<f32>,
    // Arbitrary JSON payload stored with the point.
    pub payload: Value,
}
605
/// A text point to embed server-side and upsert (ADR-0047). Carries raw text
/// in place of a vector.
pub(crate) struct TextPointIn {
    // Point identifier.
    pub id: String,
    // The text to embed.
    pub text: String,
    // Arbitrary JSON payload stored with the point.
    pub payload: Value,
}
612
/// Default size of the candidate pool a rerank stage over-fetches before
/// reordering down to the requested `k` (ADR-0047).
const RERANK_CANDIDATES: usize = 50;
616
617/// The document text a rerank stage scores: the original text stored under
618/// [`TEXT_KEY`] by `upsert_text`, else the whole payload as a string so the
619/// reranker still has something to compare.
620fn doc_text(payload: Option<&Value>) -> String {
621    match payload {
622        Some(Value::Object(map)) => map
623            .get(TEXT_KEY)
624            .and_then(Value::as_str)
625            .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
626        Some(v) => v.to_string(),
627        None => String::new(),
628    }
629}
630
/// A fetched point.
pub(crate) struct PointOut {
    // Point identifier.
    pub id: String,
    // The point's vector, when included.
    // NOTE(review): presumably `None` when the caller did not ask for vectors —
    // confirm against the fetch op.
    pub vector: Option<Vec<f32>>,
    // The point's JSON payload.
    pub payload: Value,
}
637
/// A search hit.
pub(crate) struct MatchOut {
    // Identifier of the matched point.
    pub id: String,
    // Score of the match.
    // NOTE(review): ordering (higher vs. lower is better) is not visible here —
    // confirm per distance metric.
    pub score: f32,
    // The matched point's payload, when included in the result.
    pub payload: Option<Value>,
    // The matched point's vector, when included in the result.
    pub vector: Option<Vec<f32>>,
}
645
/// A multi-vector document to upsert: one id carrying several vectors.
pub(crate) struct DocumentIn {
    // Document identifier.
    pub id: String,
    // The document's vectors (one inner `Vec<f32>` per vector).
    pub vectors: Vec<Vec<f32>>,
    // Arbitrary JSON payload stored with the document.
    pub payload: Value,
}
652
/// A multi-vector document hit (MaxSim).
pub(crate) struct DocumentMatchOut {
    // Identifier of the matched document.
    pub id: String,
    // Score of the match (MaxSim, per the type's summary).
    pub score: f32,
    // The matched document's payload, when included in the result.
    pub payload: Option<Value>,
    // The matched document's vectors, when included in the result.
    pub vectors: Option<Vec<Vec<f32>>>,
}
660
661impl AppState {
    /// Authenticate a presented bearer token to its [`Principal`], or `None`
    /// (a 401). An empty key set means `insecure` mode (validated at startup),
    /// which admits any caller as an all-collections admin.
    ///
    /// Delegates to `auth::authenticate` with this state's configured key set;
    /// `presented` is `None` when the caller sent no token.
    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
        auth::authenticate(&self.keys, presented)
    }
668
    /// Consume one rate-limit token for `actor` (ADR-0049). A no-op `Allowed` when
    /// rate limiting is disabled. Both transports call this at their auth choke
    /// point so the limiter is enforced by one implementation.
    ///
    /// Returns the limiter's [`RateDecision`] for this request unchanged.
    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
        self.rate_limiter.check(actor)
    }
675
    /// Whether the per-key rate limiter is active (lets a transport skip the work
    /// entirely on the common, disabled path). Reports the limiter's own
    /// `enabled()` state.
    pub(crate) fn rate_limit_enabled(&self) -> bool {
        self.rate_limiter.enabled()
    }
681
682    // Run a **mutating** engine op behind the exclusive write lock, off the async
683    // runtime (the engine is synchronous and CPU/IO-bound). The single writer is
684    // unchanged from the prior single-mutex model (ADR-0006/0057).
685    async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
686    where
687        T: Send + 'static,
688        F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
689    {
690        let db = Arc::clone(&self.db);
691        tokio::task::spawn_blocking(move || -> Result<T, Error> {
692            let mut guard = db
693                .write()
694                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
695            f(&mut guard).map_err(Error::Engine)
696        })
697        .await
698        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
699    }
700
701    // Run a **read-only** engine op behind the shared read lock — many of these run
702    // concurrently (ADR-0057). The closure gets `&Database`, so it can only call the
703    // `&self` reads (`*_snapshot`, `fetch`, accessors).
704    async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
705    where
706        T: Send + 'static,
707        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
708    {
709        let db = Arc::clone(&self.db);
710        tokio::task::spawn_blocking(move || -> Result<T, Error> {
711            let guard = db
712                .read()
713                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
714            f(&guard).map_err(Error::Engine)
715        })
716        .await
717        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
718    }
719
720    // Run a snapshot read for `collection` (ADR-0057/0062). Takes only the shared
721    // read lock, so concurrent searches run in parallel. When a prior write deferred
722    // this collection's rebuild, the snapshot read serves the **prior** snapshot
723    // (snapshot-isolated, slightly stale) rather than blocking, and we schedule an
724    // **off-lock** rebuild so the next read is fresh — the reader never waits on a
725    // rebuild under the exclusive lock.
726    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
727    where
728        T: Send + 'static,
729        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
730    {
731        let db = Arc::clone(&self.db);
732        let cells = Arc::clone(&self.snapshot_cells);
733        let coll = collection.clone();
734        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
735            let guard = db
736                .read()
737                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
738            let result = f(&guard).map_err(Error::Engine)?;
739            // Report staleness so the caller can schedule a background rebuild; a
740            // missing collection (raced a drop) is simply "nothing to rebuild".
741            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
742            // Warm the lock-free read cache (ADR-0064 increment 3): once an
743            // MVCC-served collection is fresh (its base snapshot is published), cache
744            // its cell so subsequent pure-vector reads skip the lock. The cell
745            // self-updates as the writer republishes, so it is never re-fetched.
746            if !stale
747                && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
748                && let Ok(mut map) = cells.write()
749            {
750                map.entry(coll.clone()).or_insert(cell);
751            }
752            Ok((result, stale))
753        })
754        .await
755        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
756        if stale {
757            self.schedule_rebuild(collection);
758        }
759        Ok(result)
760    }
761
762    // The cached lock-free snapshot cell for `collection`, if one has been warmed
763    // (ADR-0064 increment 3). The map read contends only with create/drop, never
764    // with a data write — that is what makes the pure-vector read lock-free.
765    fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
766        self.snapshot_cells.read().ok()?.get(collection).cloned()
767    }
768
769    // After an MVCC-mode data write, schedule an off-lock consolidation rebuild if
770    // the write left the overlay stale (ADR-0064 increment 3): pure-vector fast-path
771    // reads bypass the reader-driven scheduler, so the writer drives consolidation.
772    // A brief read-lock staleness check; the rebuild itself runs off-lock and
773    // single-flighted. No-op (and not called) when MVCC is off.
774    async fn schedule_if_stale(&self, collection: &str) {
775        let db = Arc::clone(&self.db);
776        let coll = collection.to_owned();
777        let stale = tokio::task::spawn_blocking(move || {
778            db.read()
779                .ok()
780                .and_then(|g| g.needs_rebuild(&coll).ok())
781                .unwrap_or(false)
782        })
783        .await
784        .unwrap_or(false);
785        if stale {
786            self.schedule_rebuild(collection.to_owned());
787        }
788    }
789
790    // Schedule a single off-lock rebuild for `collection` (ADR-0062), deduplicated
791    // so concurrent readers that all observe the same deferred rebuild start exactly
792    // one. A no-op if one is already in flight.
793    fn schedule_rebuild(&self, collection: String) {
794        {
795            let mut inflight = match self.rebuilding.lock() {
796                Ok(g) => g,
797                Err(_) => return,
798            };
799            if !inflight.insert(collection.clone()) {
800                return; // already rebuilding this collection
801            }
802        }
803        let state = self.clone();
804        tokio::spawn(async move {
805            state.run_rebuild(&collection).await;
806            if let Ok(mut inflight) = state.rebuilding.lock() {
807                inflight.remove(&collection);
808            }
809        });
810    }
811
812    // Drive a collection's off-lock rebuild to completion (ADR-0062): capture the
813    // inputs under the shared read lock, build with no lock held, install under a
814    // brief write lock, and repeat while a write landed during the build (the commit
815    // reports the collection still stale). Each phase is a `spawn_blocking` so the
816    // CPU-bound build never stalls the async runtime; errors end the attempt (the
817    // collection stays stale and the next read reschedules).
818    async fn run_rebuild(&self, collection: &str) {
819        loop {
820            let db = Arc::clone(&self.db);
821            let coll = collection.to_owned();
822            let inputs = tokio::task::spawn_blocking(move || {
823                let guard = db.read().ok()?;
824                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
825            })
826            .await
827            .ok()
828            .flatten();
829            let Some(inputs) = inputs else { return };
830
831            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
832                return;
833            };
834
835            let db = Arc::clone(&self.db);
836            let still_stale = tokio::task::spawn_blocking(move || {
837                let mut guard = db.write().ok()?;
838                guard.commit_rebuild(rebuilt).ok()
839            })
840            .await
841            .ok()
842            .flatten();
843            match still_stale {
844                Some(true) => continue, // a write landed during the build — rebuild again
845                _ => return,
846            }
847        }
848    }
849
850    // Authorize `action` on `resource`, recording a denial in the audit log
851    // before propagating it. The shared choke point for both transports.
852    fn authorize(
853        &self,
854        principal: &Principal,
855        action: Action,
856        op: &str,
857        resource: &str,
858    ) -> Result<(), Error> {
859        principal
860            .require(action, Some(resource))
861            .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
862    }
863
864    // Authorize a collection-agnostic operation (listing): only the role is
865    // checked; a denial is recorded against the `*` resource.
866    fn authorize_global(
867        &self,
868        principal: &Principal,
869        action: Action,
870        op: &str,
871    ) -> Result<(), Error> {
872        principal
873            .require(action, None)
874            .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
875    }
876
877    /// Open a replication stream (ADR-0030): authorize (admin), then — in a single
878    /// engine critical section so no commit can interleave — subscribe to the live
879    /// commit tail and snapshot current state. The caller streams the snapshot,
880    /// then the tail from the receiver; because the subscription is taken under the
881    /// same lock as the snapshot, every post-snapshot op arrives on the receiver
882    /// and none is missed or duplicated.
883    pub(crate) async fn open_replication(
884        &self,
885        principal: &Principal,
886    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
887        self.authorize_global(principal, Action::Admin, "replicate")?;
888        let tx = self.replication_tx.clone();
889        self.read_blocking(move |db| {
890            let rx = tx.subscribe();
891            let snapshot = db.replication_snapshot()?;
892            Ok((snapshot, rx))
893        })
894        .await
895    }
896
897    /// Apply a replicated op received from the leader (ADR-0030). Internal to the
898    /// follower stream — deliberately NOT gated by `read_only`, which only refuses
899    /// *external* client writes.
900    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
901        self.write_blocking(move |db| db.apply_replicated(op)).await
902    }
903
904    // Refuse a mutating operation on a read-only replication follower (ADR-0030);
905    // its state is owned by the leader's stream, not by external clients.
906    fn ensure_writable(&self, op: &str) -> Result<(), Error> {
907        if self.read_only {
908            return Err(Error::Forbidden(format!(
909                "{op}: this node is a read-only replication follower"
910            )));
911        }
912        Ok(())
913    }
914
915    #[allow(clippy::too_many_arguments)]
916    pub(crate) async fn create_collection(
917        &self,
918        principal: &Principal,
919        name: String,
920        dim: u32,
921        metric: DistanceMetric,
922        index: IndexSpec,
923        filterable: Vec<FilterableField>,
924        multivector: bool,
925        vector_encryption: VectorEncryption,
926    ) -> Result<CollectionInfo, Error> {
927        self.ensure_writable("create_collection")?;
928        self.authorize(principal, Action::Admin, "create_collection", &name)?;
929        self.limits.check_dim(dim as usize)?;
930        if let Some(c) = &self.cluster {
931            return c
932                .create_collection(
933                    name,
934                    dim,
935                    metric,
936                    index,
937                    filterable,
938                    multivector,
939                    vector_encryption,
940                )
941                .await;
942        }
943        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
944            .with_index(index)
945            .with_filterable(filterable.clone())
946            .with_multivector(multivector)
947            .with_vector_encryption(vector_encryption);
948        let owned = name.clone();
949        let result = self
950            .write_blocking(move |db| db.create_collection(&owned, descriptor))
951            .await;
952        self.audit.record(
953            principal.actor(),
954            "create_collection",
955            &name,
956            Outcome::of(&result),
957        );
958        result?;
959        Ok(CollectionInfo {
960            name,
961            dim,
962            metric,
963            count: 0,
964            index,
965            filterable,
966            multivector,
967            vector_encryption,
968        })
969    }
970
971    pub(crate) async fn get_collection(
972        &self,
973        principal: &Principal,
974        name: String,
975    ) -> Result<CollectionInfo, Error> {
976        self.authorize(principal, Action::Read, "get_collection", &name)?;
977        self.read_blocking(move |db| {
978            let descriptor = db
979                .descriptor(&name)
980                .cloned()
981                .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
982            // A multi-vector collection reports its document count, not its
983            // (much larger) token-row count.
984            let count = if descriptor.multivector {
985                db.document_count(&name)? as u64
986            } else {
987                db.len(&name)? as u64
988            };
989            Ok(CollectionInfo {
990                name,
991                dim: descriptor.dim,
992                metric: descriptor.metric,
993                count,
994                index: descriptor.index,
995                filterable: descriptor.filterable,
996                multivector: descriptor.multivector,
997                vector_encryption: descriptor.vector_encryption,
998            })
999        })
1000        .await
1001    }
1002
1003    pub(crate) async fn list_collections(
1004        &self,
1005        principal: &Principal,
1006    ) -> Result<Vec<CollectionInfo>, Error> {
1007        self.authorize_global(principal, Action::Read, "list_collections")?;
1008        let mut infos = self
1009            .read_blocking(|db| {
1010                let mut out = Vec::new();
1011                for name in db.collection_names() {
1012                    if let Some(descriptor) = db.descriptor(&name).cloned() {
1013                        let count = if descriptor.multivector {
1014                            db.document_count(&name)? as u64
1015                        } else {
1016                            db.len(&name)? as u64
1017                        };
1018                        out.push(CollectionInfo {
1019                            name,
1020                            dim: descriptor.dim,
1021                            metric: descriptor.metric,
1022                            count,
1023                            index: descriptor.index,
1024                            filterable: descriptor.filterable,
1025                            multivector: descriptor.multivector,
1026                            vector_encryption: descriptor.vector_encryption,
1027                        });
1028                    }
1029                }
1030                Ok(out)
1031            })
1032            .await?;
1033        // Never reveal collections outside the caller's scope.
1034        infos.retain(|info| principal.can_see(&info.name));
1035        Ok(infos)
1036    }
1037
1038    pub(crate) async fn delete_collection(
1039        &self,
1040        principal: &Principal,
1041        name: String,
1042    ) -> Result<bool, Error> {
1043        self.ensure_writable("delete_collection")?;
1044        self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1045        if let Some(c) = &self.cluster {
1046            return c.drop_collection(&name).await;
1047        }
1048        let resource = name.clone();
1049        let result = self
1050            .write_blocking(move |db| db.drop_collection(&name))
1051            .await;
1052        self.audit.record(
1053            principal.actor(),
1054            "delete_collection",
1055            &resource,
1056            Outcome::of(&result),
1057        );
1058        // Evict the dropped collection's cached lock-free cell (ADR-0064 increment 3)
1059        // so a later same-named collection never serves a stale cell.
1060        if matches!(result, Ok(true))
1061            && let Ok(mut map) = self.snapshot_cells.write()
1062        {
1063            map.remove(&resource);
1064        }
1065        result
1066    }
1067
1068    #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1069    pub(crate) async fn upsert(
1070        &self,
1071        principal: &Principal,
1072        collection: String,
1073        points: Vec<PointIn>,
1074    ) -> Result<u64, Error> {
1075        self.ensure_writable("upsert")?;
1076        self.authorize(principal, Action::Write, "upsert", &collection)?;
1077        self.limits.check_batch(points.len())?;
1078        for p in &points {
1079            self.limits.check_vector_len(p.vector.len())?;
1080            self.limits.check_payload(&p.payload)?;
1081        }
1082        if let Some(c) = &self.cluster {
1083            return c.upsert(&collection, points).await;
1084        }
1085        let resource = collection.clone();
1086        let result = self
1087            .write_blocking(move |db| {
1088                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1089                    .iter()
1090                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1091                    .collect();
1092                db.upsert_batch(&collection, &records)
1093            })
1094            .await;
1095        self.audit
1096            .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1097        if self.mvcc && result.is_ok() {
1098            self.schedule_if_stale(&resource).await;
1099        }
1100        result
1101    }
1102
1103    // Bulk upsert for a load-then-query workload (ADR-0045): one WAL fsync plus a
1104    // single deferred index-build pass, with the larger `max_bulk_batch_size` cap.
1105    pub(crate) async fn upsert_bulk(
1106        &self,
1107        principal: &Principal,
1108        collection: String,
1109        points: Vec<PointIn>,
1110    ) -> Result<u64, Error> {
1111        self.ensure_writable("upsert")?;
1112        self.authorize(principal, Action::Write, "upsert", &collection)?;
1113        self.limits.check_bulk_batch(points.len())?;
1114        for p in &points {
1115            self.limits.check_vector_len(p.vector.len())?;
1116            self.limits.check_payload(&p.payload)?;
1117        }
1118        if let Some(c) = &self.cluster {
1119            return c.upsert_bulk(&collection, points).await;
1120        }
1121        let resource = collection.clone();
1122        let result = self
1123            .write_blocking(move |db| {
1124                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1125                    .iter()
1126                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1127                    .collect();
1128                db.upsert_bulk(&collection, &records)
1129            })
1130            .await;
1131        self.audit.record(
1132            principal.actor(),
1133            "upsert_bulk",
1134            &resource,
1135            Outcome::of(&result),
1136        );
1137        if self.mvcc && result.is_ok() {
1138            self.schedule_if_stale(&resource).await;
1139        }
1140        result
1141    }
1142
1143    /// Take a consistent online snapshot of the whole database into a
1144    /// server-local `destination` directory (ADR-0050). A global admin
1145    /// operation; runs the checkpoint + copy on the blocking pool.
1146    #[tracing::instrument(skip_all)]
1147    pub(crate) async fn snapshot(
1148        &self,
1149        principal: &Principal,
1150        destination: String,
1151    ) -> Result<SnapshotInfo, Error> {
1152        self.ensure_writable("snapshot")?;
1153        self.authorize_global(principal, Action::Admin, "snapshot")?;
1154        let dest = std::path::PathBuf::from(&destination);
1155        let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1156        self.audit.record(
1157            principal.actor(),
1158            "snapshot",
1159            &destination,
1160            Outcome::of(&result),
1161        );
1162        result
1163    }
1164
1165    pub(crate) async fn delete_points(
1166        &self,
1167        principal: &Principal,
1168        collection: String,
1169        ids: Vec<String>,
1170    ) -> Result<u64, Error> {
1171        self.ensure_writable("delete_points")?;
1172        self.authorize(principal, Action::Write, "delete_points", &collection)?;
1173        if let Some(c) = &self.cluster {
1174            return c.delete_points(&collection, ids).await;
1175        }
1176        let resource = collection.clone();
1177        let result = self
1178            .write_blocking(move |db| {
1179                let mut count = 0u64;
1180                for id in &ids {
1181                    if db.delete(&collection, id)? {
1182                        count += 1;
1183                    }
1184                }
1185                Ok(count)
1186            })
1187            .await;
1188        self.audit.record(
1189            principal.actor(),
1190            "delete_points",
1191            &resource,
1192            Outcome::of(&result),
1193        );
1194        if self.mvcc && result.is_ok() {
1195            self.schedule_if_stale(&resource).await;
1196        }
1197        result
1198    }
1199
1200    pub(crate) async fn get_points(
1201        &self,
1202        principal: &Principal,
1203        collection: String,
1204        ids: Vec<String>,
1205        with_vector: bool,
1206    ) -> Result<Vec<PointOut>, Error> {
1207        self.authorize(principal, Action::Read, "get_points", &collection)?;
1208        if let Some(c) = &self.cluster {
1209            return c.get_points(&collection, ids, with_vector).await;
1210        }
1211        self.read_blocking(move |db| {
1212            let mut out = Vec::new();
1213            for id in &ids {
1214                if let Some(m) = db.get(&collection, id)? {
1215                    out.push(PointOut {
1216                        id: m.id,
1217                        vector: if with_vector { m.vector } else { None },
1218                        payload: m.payload.unwrap_or(Value::Null),
1219                    });
1220                }
1221            }
1222            Ok(out)
1223        })
1224        .await
1225    }
1226
1227    #[allow(clippy::too_many_arguments)]
1228    #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1229    pub(crate) async fn search(
1230        &self,
1231        principal: &Principal,
1232        collection: String,
1233        vector: Vec<f32>,
1234        k: usize,
1235        filter: Option<Filter>,
1236        ef_search: usize,
1237        with_payload: bool,
1238        with_vector: bool,
1239    ) -> Result<Vec<MatchOut>, Error> {
1240        self.authorize(principal, Action::Read, "search", &collection)?;
1241        self.limits.check_search(k, ef_search)?;
1242        self.limits.check_vector_len(vector.len())?;
1243
1244        // Cluster router (ADR-0065): scatter-gather across the shards, merge top-k.
1245        if let Some(c) = &self.cluster {
1246            return c
1247                .search(
1248                    &collection,
1249                    vector,
1250                    k,
1251                    filter,
1252                    ef_search,
1253                    with_payload,
1254                    with_vector,
1255                )
1256                .await;
1257        }
1258
1259        // Lock-free fast path (ADR-0064 increment 3): a **pure-vector** read of an
1260        // MVCC-served collection loads the cached snapshot cell and searches it with
1261        // **no database lock**, so it never blocks on a concurrent writer. Payload,
1262        // filter, and vector results need the store (behind the lock) and fall
1263        // through to `search_blocking`.
1264        if filter.is_none()
1265            && !with_payload
1266            && !with_vector
1267            && let Some(cell) = self.cached_cell(&collection)
1268        {
1269            return tokio::task::spawn_blocking(move || {
1270                let matches = cell.load().search(&vector, k, ef_search)?;
1271                Ok::<_, quiver_embed::Error>(
1272                    matches
1273                        .into_iter()
1274                        .map(|m| MatchOut {
1275                            id: m.id,
1276                            score: m.score,
1277                            payload: None,
1278                            vector: None,
1279                        })
1280                        .collect(),
1281                )
1282            })
1283            .await
1284            .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1285            .map_err(Error::Engine);
1286        }
1287
1288        let params = SearchParams {
1289            k,
1290            filter,
1291            ef_search,
1292            with_payload,
1293            with_vector,
1294        };
1295        let coll = collection.clone();
1296        self.search_blocking(coll, move |db| {
1297            let matches = db.search_snapshot(&collection, &vector, &params)?;
1298            Ok(matches
1299                .into_iter()
1300                .map(|m| MatchOut {
1301                    id: m.id,
1302                    score: m.score,
1303                    payload: m.payload,
1304                    vector: m.vector,
1305                })
1306                .collect())
1307        })
1308        .await
1309    }
1310
1311    #[allow(clippy::too_many_arguments)]
1312    pub(crate) async fn hybrid_search(
1313        &self,
1314        principal: &Principal,
1315        collection: String,
1316        dense: Option<Vec<f32>>,
1317        sparse: Option<(Vec<u32>, Vec<f32>)>,
1318        text: Option<String>,
1319        k: usize,
1320        filter: Option<Filter>,
1321        ef_search: usize,
1322        rrf_k0: f32,
1323        with_payload: bool,
1324        with_vector: bool,
1325    ) -> Result<Vec<MatchOut>, Error> {
1326        self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1327        self.limits.check_search(k, ef_search)?;
1328        if let Some(v) = &dense {
1329            self.limits.check_vector_len(v.len())?;
1330        }
1331        if let Some((indices, values)) = &sparse {
1332            self.limits.check_sparse_terms(indices.len())?;
1333            if indices.len() != values.len() {
1334                return Err(Error::BadRequest(format!(
1335                    "sparse query indices ({}) and values ({}) length mismatch",
1336                    indices.len(),
1337                    values.len()
1338                )));
1339            }
1340        }
1341        let params = SearchParams {
1342            k,
1343            filter,
1344            ef_search,
1345            with_payload,
1346            with_vector,
1347        };
1348        let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1349        let coll = collection.clone();
1350        self.search_blocking(coll, move |db| {
1351            let matches = db.hybrid_search_snapshot(
1352                &collection,
1353                dense.as_deref(),
1354                sv.as_ref(),
1355                text.as_deref(),
1356                &params,
1357                rrf_k0,
1358            )?;
1359            Ok(matches
1360                .into_iter()
1361                .map(|m| MatchOut {
1362                    id: m.id,
1363                    score: m.score,
1364                    payload: m.payload,
1365                    vector: m.vector,
1366                })
1367                .collect())
1368        })
1369        .await
1370    }
1371
1372    /// Embed `text` with the collection's configured provider and run a dense (or
1373    /// dense ⊕ BM25, if the collection has text) search, optionally reranking the
1374    /// candidates in one call (ADR-0047). The text is also passed to the BM25 side,
1375    /// so a `upsert_text` corpus yields hybrid lexical+semantic retrieval for free.
1376    #[allow(clippy::too_many_arguments)]
1377    pub(crate) async fn search_text(
1378        &self,
1379        principal: &Principal,
1380        collection: String,
1381        text: String,
1382        k: usize,
1383        filter: Option<Filter>,
1384        ef_search: usize,
1385        rrf_k0: f32,
1386        with_payload: bool,
1387        with_vector: bool,
1388        rerank: bool,
1389    ) -> Result<Vec<MatchOut>, Error> {
1390        self.authorize(principal, Action::Read, "search_text", &collection)?;
1391        self.limits.check_search(k, ef_search)?;
1392        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1393            Error::BadRequest(format!(
1394                "collection {collection:?} has no embedding provider configured \
1395                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1396            ))
1397        })?;
1398        // Embed off the async runtime: the provider call is blocking network I/O.
1399        let query = text.clone();
1400        let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1401            .await
1402            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1403            .map_err(|e| Error::Upstream(e.to_string()))?
1404            .into_iter()
1405            .next()
1406            .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1407        self.limits.check_vector_len(vector.len())?;
1408
1409        let reranker = if rerank {
1410            self.embed.reranker(&collection)
1411        } else {
1412            None
1413        };
1414        // Over-fetch when reranking so the reranker can reorder a wide candidate set
1415        // down to the requested `k`; fetch payloads when reranking (we need the doc
1416        // text) even if the caller did not ask for them.
1417        let need_payload = with_payload || reranker.is_some();
1418        let fetch_k = if reranker.is_some() {
1419            k.max(RERANK_CANDIDATES)
1420        } else {
1421            k
1422        };
1423
1424        let mut hits = self
1425            .hybrid_search(
1426                principal,
1427                collection,
1428                Some(vector),
1429                None,
1430                Some(text.clone()),
1431                fetch_k,
1432                filter,
1433                ef_search,
1434                rrf_k0,
1435                need_payload,
1436                with_vector,
1437            )
1438            .await?;
1439
1440        if let Some(rr) = reranker {
1441            let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1442            let query = text;
1443            let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1444                .await
1445                .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1446                .map_err(|e| Error::Upstream(e.to_string()))?;
1447            // Re-score each hit and sort by the rerank score, descending.
1448            let mut scored: Vec<(f32, MatchOut)> = scores
1449                .into_iter()
1450                .zip(hits)
1451                .map(|(s, mut h)| {
1452                    h.score = s;
1453                    (s, h)
1454                })
1455                .collect();
1456            scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1457            hits = scored.into_iter().map(|(_, h)| h).collect();
1458        }
1459
1460        hits.truncate(k);
1461        // Drop payloads we only fetched for reranking if the caller didn't want them.
1462        if !with_payload {
1463            for h in &mut hits {
1464                h.payload = None;
1465            }
1466        }
1467        Ok(hits)
1468    }
1469
1470    /// Embed each point's `text` with the collection's provider and upsert it as a
1471    /// dense point, co-populating the `__quiver_text__` payload key (ADR-0046) so the
1472    /// same text is indexed for BM25 — one call feeds both the dense and lexical
1473    /// sides (ADR-0047).
1474    pub(crate) async fn upsert_text(
1475        &self,
1476        principal: &Principal,
1477        collection: String,
1478        points: Vec<TextPointIn>,
1479    ) -> Result<u64, Error> {
1480        self.ensure_writable("upsert_text")?;
1481        self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1482        self.limits.check_batch(points.len())?;
1483        for p in &points {
1484            if !matches!(p.payload, Value::Object(_) | Value::Null) {
1485                return Err(Error::BadRequest(
1486                    "upsert_text payload must be a JSON object or null".to_owned(),
1487                ));
1488            }
1489        }
1490        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1491            Error::BadRequest(format!(
1492                "collection {collection:?} has no embedding provider configured \
1493                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1494            ))
1495        })?;
1496        let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1497        let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1498            .await
1499            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1500            .map_err(|e| Error::Upstream(e.to_string()))?;
1501        if vectors.len() != points.len() {
1502            return Err(Error::Upstream(format!(
1503                "embedding provider returned {} vectors for {} inputs",
1504                vectors.len(),
1505                points.len()
1506            )));
1507        }
1508        let dense: Vec<PointIn> = points
1509            .into_iter()
1510            .zip(vectors)
1511            .map(|(p, vector)| {
1512                let mut payload = match p.payload {
1513                    Value::Object(map) => map,
1514                    _ => serde_json::Map::new(),
1515                };
1516                // Don't clobber a caller-supplied text key.
1517                payload
1518                    .entry(TEXT_KEY.to_owned())
1519                    .or_insert_with(|| Value::String(p.text.clone()));
1520                PointIn {
1521                    id: p.id,
1522                    vector,
1523                    payload: Value::Object(payload),
1524                }
1525            })
1526            .collect();
1527        self.upsert(principal, collection, dense).await
1528    }
1529
1530    pub(crate) async fn fetch(
1531        &self,
1532        principal: &Principal,
1533        collection: String,
1534        filter: Option<Filter>,
1535        limit: usize,
1536        with_payload: bool,
1537        with_vector: bool,
1538    ) -> Result<Vec<MatchOut>, Error> {
1539        self.authorize(principal, Action::Read, "fetch", &collection)?;
1540        self.limits.check_fetch(limit)?;
1541        self.read_blocking(move |db| {
1542            let matches = db.fetch(
1543                &collection,
1544                filter.as_ref(),
1545                limit,
1546                with_payload,
1547                with_vector,
1548            )?;
1549            Ok(matches
1550                .into_iter()
1551                .map(|m| MatchOut {
1552                    id: m.id,
1553                    score: m.score,
1554                    payload: m.payload,
1555                    vector: m.vector,
1556                })
1557                .collect())
1558        })
1559        .await
1560    }
1561
1562    pub(crate) async fn upsert_documents(
1563        &self,
1564        principal: &Principal,
1565        collection: String,
1566        documents: Vec<DocumentIn>,
1567    ) -> Result<u64, Error> {
1568        self.ensure_writable("upsert_documents")?;
1569        self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1570        self.limits.check_batch(documents.len())?;
1571        for doc in &documents {
1572            self.limits.check_payload(&doc.payload)?;
1573            for token in &doc.vectors {
1574                self.limits.check_vector_len(token.len())?;
1575            }
1576        }
1577        let resource = collection.clone();
1578        let result = self
1579            .write_blocking(move |db| {
1580                let mut count = 0u64;
1581                for doc in &documents {
1582                    db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1583                    count += 1;
1584                }
1585                Ok(count)
1586            })
1587            .await;
1588        self.audit.record(
1589            principal.actor(),
1590            "upsert_documents",
1591            &resource,
1592            Outcome::of(&result),
1593        );
1594        result
1595    }
1596
1597    pub(crate) async fn delete_documents(
1598        &self,
1599        principal: &Principal,
1600        collection: String,
1601        ids: Vec<String>,
1602    ) -> Result<u64, Error> {
1603        self.ensure_writable("delete_documents")?;
1604        self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1605        let resource = collection.clone();
1606        let result = self
1607            .write_blocking(move |db| {
1608                let mut count = 0u64;
1609                for id in &ids {
1610                    if db.delete_document(&collection, id)? {
1611                        count += 1;
1612                    }
1613                }
1614                Ok(count)
1615            })
1616            .await;
1617        self.audit.record(
1618            principal.actor(),
1619            "delete_documents",
1620            &resource,
1621            Outcome::of(&result),
1622        );
1623        result
1624    }
1625
1626    #[allow(clippy::too_many_arguments)]
1627    pub(crate) async fn search_multi_vector(
1628        &self,
1629        principal: &Principal,
1630        collection: String,
1631        query: Vec<Vec<f32>>,
1632        k: usize,
1633        filter: Option<Filter>,
1634        ef_search: usize,
1635        with_payload: bool,
1636        with_vector: bool,
1637    ) -> Result<Vec<DocumentMatchOut>, Error> {
1638        self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1639        self.limits.check_search(k, ef_search)?;
1640        for token in &query {
1641            self.limits.check_vector_len(token.len())?;
1642        }
1643        let params = SearchParams {
1644            k,
1645            filter,
1646            ef_search,
1647            with_payload,
1648            with_vector,
1649        };
1650        let coll = collection.clone();
1651        self.search_blocking(coll, move |db| {
1652            let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
1653            Ok(matches
1654                .into_iter()
1655                .map(|m| DocumentMatchOut {
1656                    id: m.id,
1657                    score: m.score,
1658                    payload: m.payload,
1659                    vectors: m.vectors,
1660                })
1661                .collect())
1662        })
1663        .await
1664    }
1665}
1666
/// How many recently-committed ops the leader buffers for replication followers
/// (ADR-0030). A follower that falls further behind than this re-bootstraps.
///
/// Used as the capacity of the broadcast channel that `serve` creates for
/// publishing committed WAL entries.
const REPLICATION_BUFFER: usize = 1024;
1670
1671/// Run the server from `config` until a shutdown signal (Ctrl-C).
1672pub async fn run(config: Config) -> Result<(), Error> {
1673    config.validate()?;
1674    let rest_listener = TcpListener::bind(config.rest_addr)
1675        .await
1676        .map_err(Error::Io)?;
1677    let grpc_listener = TcpListener::bind(config.grpc_addr)
1678        .await
1679        .map_err(Error::Io)?;
1680    tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1681    tokio::select! {
1682        result = serve(config, rest_listener, grpc_listener) => result,
1683        () = shutdown_signal() => {
1684            tracing::info!("shutdown signal received");
1685            Ok(())
1686        }
1687    }
1688}
1689
/// Serve REST and gRPC on the given (already-bound) listeners until a transport
/// error. Exposed so tests can bind ephemeral ports.
///
/// Start-up order: open the engine and the audit log, install the replication
/// commit observer, build the embedding/rerank providers, apply the MVCC-reads
/// setting, build the optional cluster router, assemble the shared [`AppState`],
/// spawn the follower task when a leader URL is configured, load TLS material,
/// and finally run both transports concurrently.
///
/// # Errors
///
/// Returns an error if any start-up step fails (engine open, audit log open,
/// provider construction, cluster construction, TLS loading, listener
/// conversion, gRPC TLS configuration) or if either transport ends with an error.
pub async fn serve(
    config: Config,
    rest_listener: TcpListener,
    grpc_listener: TcpListener,
) -> Result<(), Error> {
    let mut db = open_database(&config)?;
    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
    // Publish every committed op to replication followers (ADR-0030). The observer
    // runs inside the engine's write critical section; `broadcast::Sender::send` is
    // non-blocking, so it never stalls a write.
    // The initial receiver is dropped right away; only the sender is kept.
    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
    {
        let tx = replication_tx.clone();
        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
            // A send error is deliberately ignored (tokio's broadcast `send` fails
            // when there is currently no receiver).
            let _ = tx.send(entry.clone());
        }));
    }
    // Build the opt-in embedding/rerank providers, resolving each `api_key_env`
    // from the environment now (ADR-0047) so a missing key fails fast at startup
    // rather than on the first request.
    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
        .map_err(|e| Error::Config(e.to_string()))?;

    // Enable lock-free MVCC reads if requested (ADR-0064): `config.mvcc_reads` or
    // `QUIVER_MVCC_READS` (the latter also defaults the engine flag at open).
    if config.mvcc_reads {
        db.set_mvcc_reads(true);
    }
    // Read the flag back from the engine rather than from `config`, so the state
    // reflects the engine's effective setting.
    let mvcc = db.mvcc_reads();

    // Opt-in cluster router (ADR-0065): build the shard fan-out when configured.
    let cluster = if config.cluster_shards.is_empty() {
        None
    } else {
        let c = cluster::Cluster::new(
            config.cluster_shards.clone(),
            config.cluster_shard_key.clone(),
        )?;
        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
        Some(Arc::new(c))
    };

    let state = AppState {
        db: Arc::new(RwLock::new(db)),
        keys: Arc::new(config.api_keys.clone()),
        audit,
        replication_tx,
        // A node configured with a leader URL is marked read-only.
        read_only: config.leader_url.is_some(),
        limits: config.limits,
        embed: Arc::new(embed),
        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
        metrics: Arc::new(metrics::Metrics::default()),
        rebuilding: Arc::new(Mutex::new(HashSet::new())),
        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
        mvcc,
        cluster,
    };

    // A follower continuously applies the leader's committed-op stream (ADR-0030).
    if let Some(leader_url) = config.leader_url.clone() {
        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
    }

    // Both transports are built over the same shared state.
    let app = rest::router(state.clone());
    let grpc = grpc::service(state);

    let tls = load_tls(&config)?;

    // REST: terminate TLS with axum-server when configured, else serve plaintext.
    // The two arms produce different future types, hence the boxed trait object.
    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
        Some(material) => {
            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
            // axum-server is handed a std listener, so convert the tokio one.
            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
            let server =
                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
            Box::pin(async move {
                server
                    .serve(app.into_make_service())
                    .await
                    .map_err(Error::Io)
            })
        }
        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
    };

    // gRPC: tonic terminates TLS itself (ring provider) when an identity is set.
    let mut grpc_builder = tonic::transport::Server::builder();
    if let Some(material) = &tls {
        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
        let mut tls_config = ServerTlsConfig::new().identity(identity);
        // Require client certificates chaining to the configured CA (mTLS).
        if let Some(ca_pem) = &material.client_ca_pem {
            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
        }
        grpc_builder = grpc_builder
            .tls_config(tls_config)
            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
    }
    let grpc_fut = async move {
        grpc_builder
            .add_service(grpc)
            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
            .await
            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
    };

    // Drive both transports concurrently; the first error from either one ends
    // `serve` with that error.
    tokio::try_join!(rest_fut, grpc_fut)?;
    Ok(())
}
1801
1802async fn shutdown_signal() {
1803    let _ = tokio::signal::ctrl_c().await;
1804}
1805
1806// Open the engine, enabling encryption-at-rest when a key is configured. The
1807// configured key is the **master key** of an envelope key-ring (ADR-0010): it
1808// wraps a per-collection data-encryption key, so dropping a collection
1809// crypto-shreds it. With no key (only valid in `insecure` mode, enforced by
1810// `Config::validate`) the engine is opened in plaintext.
1811fn open_database(config: &Config) -> Result<Database, Error> {
1812    let master_key = config.master_key_hex()?;
1813    let keyring =
1814        quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1815            .map_err(|e| Error::Config(e.to_string()))?;
1816    let db = match keyring {
1817        Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1818        None => Database::open(&config.data_dir)?,
1819    };
1820    Ok(db)
1821}
1822
// TLS material shared by both transports: the raw PEM (for tonic's `Identity`
// and `client_ca_root`) and a parsed rustls server config (for axum-server's
// REST acceptor). `client_ca_pem` is set only when mutual TLS is configured.
struct TlsMaterial {
    // Raw bytes of the file at `tls_cert`.
    cert_pem: Vec<u8>,
    // Raw bytes of the file at `tls_key`.
    key_pem: Vec<u8>,
    // Raw bytes of the file at `tls_client_ca`, when one is configured.
    client_ca_pem: Option<Vec<u8>>,
    // rustls config built from the PEM above; shared with the REST acceptor.
    rest_config: Arc<rustls::ServerConfig>,
}
1832
1833// Read the configured certificate, key, and optional client CA, returning `None`
1834// when TLS is not configured. `Config::validate` already enforces that the cert
1835// and key are set together, that a client CA requires them, and that a
1836// non-loopback bind requires TLS.
1837fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1838    match (&config.tls_cert, &config.tls_key) {
1839        (Some(cert_path), Some(key_path)) => {
1840            let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1841            let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1842            let client_ca_pem = config
1843                .tls_client_ca
1844                .as_ref()
1845                .map(std::fs::read)
1846                .transpose()
1847                .map_err(Error::Io)?;
1848            let rest_config = Arc::new(rustls_server_config(
1849                &cert_pem,
1850                &key_pem,
1851                client_ca_pem.as_deref(),
1852            )?);
1853            Ok(Some(TlsMaterial {
1854                cert_pem,
1855                key_pem,
1856                client_ca_pem,
1857                rest_config,
1858            }))
1859        }
1860        (None, None) => Ok(None),
1861        _ => Err(Error::Config(
1862            "tls_cert and tls_key must be set together".to_owned(),
1863        )),
1864    }
1865}
1866
1867// Build a rustls server config from PEM bytes over the audited `ring` provider
1868// (no OpenSSL, no aws-lc-rs C toolchain). TLS 1.3 and 1.2 are offered. When a
1869// client CA is supplied, client certificates chaining to it are required (mTLS).
1870fn rustls_server_config(
1871    cert_pem: &[u8],
1872    key_pem: &[u8],
1873    client_ca_pem: Option<&[u8]>,
1874) -> Result<rustls::ServerConfig, Error> {
1875    use rustls_pki_types::pem::PemObject;
1876    use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1877
1878    let certs = CertificateDer::pem_slice_iter(cert_pem)
1879        .collect::<std::result::Result<Vec<_>, _>>()
1880        .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1881    if certs.is_empty() {
1882        return Err(Error::Config(
1883            "tls_cert contains no certificates".to_owned(),
1884        ));
1885    }
1886    let key = PrivateKeyDer::from_pem_slice(key_pem)
1887        .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1888    let provider = Arc::new(rustls::crypto::ring::default_provider());
1889    let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1890        .with_safe_default_protocol_versions()
1891        .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1892    let builder = match client_ca_pem {
1893        Some(ca_pem) => {
1894            let mut roots = rustls::RootCertStore::empty();
1895            for cert in CertificateDer::pem_slice_iter(ca_pem) {
1896                let cert =
1897                    cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1898                roots
1899                    .add(cert)
1900                    .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1901            }
1902            let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1903                Arc::new(roots),
1904                provider,
1905            )
1906            .build()
1907            .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1908            builder.with_client_cert_verifier(verifier)
1909        }
1910        None => builder.with_no_client_auth(),
1911    };
1912    builder
1913        .with_single_cert(certs, key)
1914        .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1915}
1916
1917/// Initialize structured logging from `RUST_LOG` (defaulting to `info`). Safe to
1918/// call once at startup; a second call is ignored.
1919pub fn init_tracing() {
1920    init_observability(&Config::default());
1921}
1922
/// Install the global tracing subscriber: an `RUST_LOG`-driven `fmt` layer plus,
/// when the `otlp` feature is built **and** an OTLP endpoint is configured
/// (ADR-0059), an OpenTelemetry traces export layer. Safe to call once at
/// startup; a second call is a no-op. A failure to build the OTLP exporter logs a
/// warning and falls back to `fmt`-only rather than taking the server down.
// `config` is only read inside the `otlp`-gated branch, hence the conditional
// `unused_variables` allowance.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;
    // Take the filter from the environment; fall back to `info` when that fails.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let registry = tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer());

    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                // Hand the provider to the `otlp` module (presumably so
                // `otlp::shutdown` can flush it later — confirm there).
                otlp::store_provider(provider);
                // The result is ignored so a repeated call stays a no-op.
                let _ = registry
                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
                    .try_init();
                return;
            }
            // Written to stderr directly: the subscriber built above has not been
            // installed by this call yet. Falls through to the `fmt`-only init.
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
        }
    }

    // `fmt`-only path; the result is ignored so a repeated call stays a no-op.
    let _ = registry.try_init();
}
1955
/// Flush and shut down the OpenTelemetry exporter, if one was installed. Does
/// nothing when built without the `otlp` feature or when no endpoint was
/// configured. Call once on server shutdown so batched spans are not lost.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
1963
#[cfg(test)]
mod tests {
    use super::*;

    // A valid 64-hex-character (256-bit) test key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    // A config carrying one API key and the test encryption key; every other
    // field keeps its default.
    fn keyed_config() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        }
    }

    // The wildcard (0.0.0.0) REST address used by the public-bind tests.
    fn wildcard_rest_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333)
    }

    // Whether `config` passes validation.
    fn accepts(config: &Config) -> bool {
        config.validate().is_ok()
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // A bare default config is rejected.
        let mut cfg = Config::default();
        assert!(!accepts(&cfg));
        // Opting into insecure mode lifts the requirement.
        cfg.insecure = true;
        assert!(accepts(&cfg));
        // Secure mode with both an API key and an encryption key is accepted.
        let cfg = Config {
            insecure: false,
            ..keyed_config()
        };
        assert!(accepts(&cfg));
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut cfg = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        // API key set but no encryption key, not insecure ⇒ rejected.
        assert!(!accepts(&cfg));
        cfg.encryption_key = Some(TEST_KEY.to_owned());
        assert!(accepts(&cfg));
        // A malformed key is rejected up front, not at first write.
        cfg.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(!accepts(&cfg));
        // Insecure mode may run without encryption-at-rest.
        cfg.insecure = true;
        cfg.encryption_key = None;
        assert!(accepts(&cfg));
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let scratch = tempfile::tempdir().unwrap();
        let key_file = scratch.path().join("master.key");
        // A trailing newline (as editors and `echo` add) is trimmed off.
        std::fs::write(&key_file, format!("{TEST_KEY}\n")).unwrap();

        let mut cfg = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(key_file.clone()),
            ..Config::default()
        };
        // The file alone satisfies encryption-at-rest and resolves to the key.
        assert!(accepts(&cfg));
        let resolved = cfg.master_key_hex().unwrap();
        assert_eq!(resolved.as_deref(), Some(TEST_KEY));

        // Setting both the env key and a file is rejected as ambiguous.
        cfg.encryption_key = Some(TEST_KEY.to_owned());
        assert!(!accepts(&cfg));

        // A file holding malformed hex is rejected up front.
        cfg.encryption_key = None;
        std::fs::write(&key_file, "not-a-valid-key").unwrap();
        assert!(!accepts(&cfg));
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut cfg = Config {
            rest_addr: wildcard_rest_addr(),
            ..keyed_config()
        };
        // Auth and encryption are satisfied, so the only failure is the bind rule.
        assert!(!accepts(&cfg));
        cfg.insecure = true;
        assert!(accepts(&cfg));
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let cfg = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: wildcard_rest_addr(),
            ..keyed_config()
        };
        // TLS configured ⇒ a non-loopback bind is allowed without insecure.
        assert!(accepts(&cfg));
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut cfg = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..keyed_config()
        };
        // Cert without key ⇒ rejected.
        assert!(!accepts(&cfg));
        cfg.tls_key = Some(PathBuf::from("key.pem"));
        assert!(accepts(&cfg));
    }
}