Skip to main content

quiver_server/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The Quiver daemon: gRPC and REST over the embeddable [`Database`], with
3//! API-key auth and secure-by-default configuration.
4//!
5//! Both transports are thin shells over the same shared engine operations; the
6//! engine is synchronous and CPU/`fsync`-bound, so every call is offloaded with
7//! `spawn_blocking`. Access is guarded by a reader–writer lock (ADR-0057): reads
8//! take the shared lock and run **concurrently**, writes take the exclusive lock,
9//! and the single-writer model is unchanged (ADR-0002/0006). A read that finds a
10//! collection's index stale (a prior write deferred its rebuild) serves the prior
11//! snapshot and schedules an **off-lock** rebuild (ADR-0062): the index is rebuilt
12//! under the shared lock and swapped in under a brief write lock, so a rebuild never
13//! stalls concurrent readers.
14//!
15//! Auth is by scoped API key (Bearer / gRPC `authorization` metadata) with
16//! default-deny RBAC: each key carries a role (read ⊆ write ⊆ admin) and a
17//! collection scope, enforced on every operation at the shared op layer
18//! (ADR-0011/0013, the `auth` module). Encryption-at-rest is on by default
19//! (ADR-0010): unless `insecure` is set, an `encryption_key` is required and the
20//! engine is opened through `quiver-crypto`'s AEAD codec; payloads may also be
21//! client-side-encrypted (ADR-0012). TLS-in-transit uses `rustls` over the
22//! audited `ring` provider — REST via `axum-server`, gRPC via tonic's `tls-ring`
23//! — and a non-loopback bind requires it; setting a client CA additionally
24//! requires mutual TLS. Mutating and administrative operations, and every
25//! access-control denial, are recorded to an append-only audit log (ADR-0011,
26//! the `audit` module) when `audit_log` is set. Per-request cost limits bound
27//! the work any single authenticated request can demand (ADR-0040, the
28//! [`Limits`] type), and an opt-in per-key token-bucket rate limiter bounds the
29//! request *rate* (ADR-0049, the [`RateLimiter`] type); per-tenant engine
30//! partitioning is a later phase. Design: `docs/api/rest-grpc.md`.
31
32mod audit;
33mod auth;
34mod cluster;
35mod coordinator;
36mod error;
37mod grpc;
38mod metrics;
39mod otlp;
40mod rate_limit;
41mod replication;
42mod rest;
43
44use std::collections::{HashMap, HashSet};
45use std::future::Future;
46use std::net::{IpAddr, Ipv4Addr, SocketAddr};
47use std::path::PathBuf;
48use std::pin::Pin;
49use std::sync::{Arc, Mutex, RwLock};
50
51use axum_server::tls_rustls::RustlsConfig;
52use figment::Figment;
53use figment::providers::{Env, Format, Serialized, Toml};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use tokio::net::TcpListener;
57use tokio::sync::broadcast;
58use tokio_stream::wrappers::TcpListenerStream;
59use tonic::transport::{Certificate, Identity, ServerTlsConfig};
60
61use quiver_crypto::AeadCodec;
62use quiver_embed::{
63    Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
64    SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
65};
66use quiver_query::Filter;
67
68pub use auth::{Action, ApiKey, CollectionScope};
69pub use error::Error;
70pub use otlp::OtlpConfig;
71// The embedding/rerank seam lives in its own lean crate (ADR-0058) so the MCP
72// server can share it; re-exported here so the server's public API is unchanged.
73pub use quiver_providers::{
74    EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
75    RerankProvider,
76};
77pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
78
79use audit::{AuditLog, Outcome};
80use auth::Principal;
81
82/// Per-request cost limits (ADR-0040). Each cap bounds the work a single
83/// authenticated request can demand, so one oversized request cannot exhaust the
84/// node under the single-writer model (ADR-0006). Over-limit requests are
85/// **rejected** with HTTP 400 / gRPC `InvalidArgument` rather than silently
86/// clamped — a truncated `k` or `ef_search` would return surprising, lower-quality
87/// results with no signal. Defaults are generous; raise a cap with a `[limits]`
88/// table in `quiver.toml` or the matching `QUIVER_MAX_*` environment variable.
89#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
90#[serde(default)]
91pub struct Limits {
92    /// Maximum `k` (top-k) for `search` / `search_multi_vector`.
93    pub max_k: usize,
94    /// Maximum search beam width (`ef_search`).
95    pub max_ef_search: usize,
96    /// Maximum `fetch` page size.
97    pub max_fetch_limit: usize,
98    /// Maximum vector dimension: the declared collection dimension at creation
99    /// and the length of any query vector (per token, for multi-vector).
100    pub max_vector_dim: usize,
101    /// Maximum serialized-JSON payload size per point, in bytes.
102    pub max_payload_bytes: usize,
103    /// Maximum number of points / documents in a single upsert request.
104    pub max_batch_size: usize,
105    /// Maximum HTTP request body size, in bytes (enforced by the REST layer via
106    /// axum's `DefaultBodyLimit`; gRPC is bounded by tonic's decode limit).
107    pub max_request_body_bytes: usize,
108    /// Maximum number of non-zero terms in a hybrid-search sparse query (ADR-0043),
109    /// bounding the posting-list scan.
110    pub max_sparse_terms: usize,
111    /// Maximum number of points in a single bulk upsert (`POST …/points:bulk`,
112    /// ADR-0045). Larger than `max_batch_size` because the bulk path defers index
113    /// maintenance to one rebuild; the request is still bounded by
114    /// `max_request_body_bytes`, so raise that too for very large bulk loads.
115    pub max_bulk_batch_size: usize,
116}
117
118impl Default for Limits {
119    fn default() -> Self {
120        Self {
121            max_k: 10_000,
122            max_ef_search: 4_096,
123            max_fetch_limit: 10_000,
124            max_vector_dim: 8_192,
125            max_payload_bytes: 65_536,
126            max_batch_size: 1_000,
127            max_request_body_bytes: 32 * 1024 * 1024,
128            max_sparse_terms: 4_096,
129            max_bulk_batch_size: 50_000,
130        }
131    }
132}
133
134impl Limits {
135    // Apply `QUIVER_MAX_*` overrides after figment extraction (ADR-0013 env
136    // layer). The flat env keys do not nest under figment's `limits` table, so
137    // they are read explicitly here; a malformed value is a hard config error.
138    fn apply_env_overrides(&mut self) -> Result<(), Error> {
139        let slots: [(&str, &mut usize); 9] = [
140            ("QUIVER_MAX_K", &mut self.max_k),
141            ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
142            ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
143            ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
144            ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
145            ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
146            (
147                "QUIVER_MAX_REQUEST_BODY_BYTES",
148                &mut self.max_request_body_bytes,
149            ),
150            ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
151            ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
152        ];
153        for (key, slot) in slots {
154            if let Ok(raw) = std::env::var(key) {
155                *slot = raw.parse().map_err(|_| {
156                    Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
157                })?;
158            }
159        }
160        Ok(())
161    }
162
163    // Reject a zero cap (a `0` limit would refuse every request).
164    fn validate(&self) -> Result<(), Error> {
165        let named = [
166            ("max_k", self.max_k),
167            ("max_ef_search", self.max_ef_search),
168            ("max_fetch_limit", self.max_fetch_limit),
169            ("max_vector_dim", self.max_vector_dim),
170            ("max_payload_bytes", self.max_payload_bytes),
171            ("max_batch_size", self.max_batch_size),
172            ("max_request_body_bytes", self.max_request_body_bytes),
173            ("max_sparse_terms", self.max_sparse_terms),
174            ("max_bulk_batch_size", self.max_bulk_batch_size),
175        ];
176        if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
177            return Err(Error::Config(format!(
178                "limits.{name} must be greater than zero"
179            )));
180        }
181        Ok(())
182    }
183
184    fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
185        if k > self.max_k {
186            return Err(Error::BadRequest(format!(
187                "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
188                self.max_k
189            )));
190        }
191        if ef_search > self.max_ef_search {
192            return Err(Error::BadRequest(format!(
193                "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
194                self.max_ef_search
195            )));
196        }
197        Ok(())
198    }
199
200    fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
201        if n > self.max_sparse_terms {
202            return Err(Error::BadRequest(format!(
203                "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
204                self.max_sparse_terms
205            )));
206        }
207        Ok(())
208    }
209
210    fn check_fetch(&self, limit: usize) -> Result<(), Error> {
211        if limit > self.max_fetch_limit {
212            return Err(Error::BadRequest(format!(
213                "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
214                self.max_fetch_limit
215            )));
216        }
217        Ok(())
218    }
219
220    fn check_dim(&self, dim: usize) -> Result<(), Error> {
221        if dim > self.max_vector_dim {
222            return Err(Error::BadRequest(format!(
223                "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
224                self.max_vector_dim
225            )));
226        }
227        Ok(())
228    }
229
230    fn check_vector_len(&self, len: usize) -> Result<(), Error> {
231        if len > self.max_vector_dim {
232            return Err(Error::BadRequest(format!(
233                "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
234                self.max_vector_dim
235            )));
236        }
237        Ok(())
238    }
239
240    fn check_batch(&self, n: usize) -> Result<(), Error> {
241        if n > self.max_batch_size {
242            return Err(Error::BadRequest(format!(
243                "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
244                self.max_batch_size
245            )));
246        }
247        Ok(())
248    }
249
250    fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
251        if n > self.max_bulk_batch_size {
252            return Err(Error::BadRequest(format!(
253                "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
254                self.max_bulk_batch_size
255            )));
256        }
257        Ok(())
258    }
259
260    fn check_payload(&self, payload: &Value) -> Result<(), Error> {
261        let size = serde_json::to_vec(payload)
262            .map(|v| v.len())
263            .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
264        if size > self.max_payload_bytes {
265            return Err(Error::BadRequest(format!(
266                "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
267                self.max_payload_bytes
268            )));
269        }
270        Ok(())
271    }
272}
273
274/// Server configuration, layered defaults → `quiver.toml` → `QUIVER_*` env and
275/// validated at startup (ADR-0013).
276#[derive(Debug, Clone, Serialize, Deserialize)]
277#[serde(default)]
278pub struct Config {
279    /// Data directory for the storage engine.
280    pub data_dir: PathBuf,
281    /// REST (HTTP/1.1) bind address.
282    pub rest_addr: SocketAddr,
283    /// gRPC (HTTP/2) bind address.
284    pub grpc_addr: SocketAddr,
285    /// Accepted API keys with their RBAC scopes (ADR-0011). A bare secret —
286    /// from a comma-separated `QUIVER_API_KEYS` (the env form) or a plain TOML
287    /// array entry — is an all-collections admin key; a structured
288    /// `{secret, role, collections}` entry pins a narrower scope. Empty is
289    /// allowed only with `insecure = true`.
290    #[serde(default, deserialize_with = "auth::de_api_keys")]
291    pub api_keys: Vec<ApiKey>,
292    /// Hex-encoded 256-bit **master key** for encryption-at-rest (64 hex
293    /// characters). It wraps per-collection data-encryption keys (ADR-0010).
294    /// Required unless `insecure = true` or [`master_key_file`] is set; source it
295    /// from the environment or a secret store, never the committed config. `None`
296    /// ⇒ data is stored unencrypted (only valid in `insecure` mode).
297    ///
298    /// [`master_key_file`]: Config::master_key_file
299    pub encryption_key: Option<String>,
300    /// Path to a file holding the hex master key, as an alternative to
301    /// [`encryption_key`] (set exactly one). Lets the key arrive as a mounted
302    /// secret (Docker/Kubernetes) or a KMS-decrypted file rather than an
303    /// environment variable. It should be mode `0600`; a group/world-accessible
304    /// file is warned about at startup.
305    ///
306    /// [`encryption_key`]: Config::encryption_key
307    pub master_key_file: Option<PathBuf>,
308    /// Path to the PEM-encoded TLS certificate chain. Must be set together with
309    /// `tls_key`. Required for a non-loopback bind unless `insecure = true`.
310    pub tls_cert: Option<PathBuf>,
311    /// Path to the PEM-encoded TLS private key. Must be set together with
312    /// `tls_cert`.
313    pub tls_key: Option<PathBuf>,
314    /// Path to a PEM-encoded CA certificate that signs accepted client
315    /// certificates. When set, both transports require **mutual TLS**: a client
316    /// must present a certificate chaining to this CA to connect (ADR-0011).
317    /// Requires `tls_cert`/`tls_key`; bearer API keys still carry the RBAC scope.
318    pub tls_client_ca: Option<PathBuf>,
319    /// Path to an append-only audit log file (ADR-0011). When set, every
320    /// mutating and administrative operation and every access-control denial is
321    /// appended as one JSON object per line (JSON Lines); records are always
322    /// also emitted as `tracing` events. Unset ⇒ tracing only. Secrets are never
323    /// written — see `docs/security/audit.md`.
324    pub audit_log: Option<PathBuf>,
325    /// Run as a **read-replica follower** (ADR-0030): connect to a leader's gRPC
326    /// endpoint at this URL (e.g. `http://leader:6334`) and continuously apply its
327    /// committed operations, serving reads. A follower **refuses writes**. Unset ⇒
328    /// this node is a normal read-write leader. (Plaintext `http://` for now; TLS
329    /// to the leader is a follow-up — run replication over a trusted network.)
330    pub leader_url: Option<String>,
331    /// API key the follower presents to the leader's admin-scoped `Replicate`
332    /// stream (used with `leader_url`). Source it like any secret.
333    pub leader_api_key: Option<String>,
334    /// Opt out of the secure defaults (no auth, no encryption-at-rest, allow a
335    /// non-loopback bind without TLS). For local development only; never the
336    /// default.
337    pub insecure: bool,
338    /// Per-request cost limits (ADR-0040). Set with a `[limits]` table in
339    /// `quiver.toml` or the `QUIVER_MAX_*` environment variables.
340    pub limits: Limits,
341    /// Opt-in server-side embedding providers, keyed by collection name (ADR-0047).
342    /// Configured with `[embedding.<collection>]` tables in `quiver.toml`; default
343    /// empty, so the engine stays model-agnostic and library mode is unaffected.
344    /// API keys are referenced by env-var *name* and resolved at startup, never
345    /// stored. Enables `search_text` / `upsert_text` for the named collections.
346    #[serde(default)]
347    pub embedding: HashMap<String, EmbeddingConfig>,
348    /// Opt-in server-side rerank providers, keyed by collection name (ADR-0047).
349    /// Configured with `[rerank.<collection>]` tables; enables the one-call
350    /// retrieve→rerank stage of `search_text`.
351    #[serde(default)]
352    pub rerank: HashMap<String, RerankConfig>,
353    /// Opt-in per-key rate limiting (ADR-0049). Set a `[rate_limit]` table in
354    /// `quiver.toml` or the `QUIVER_RATE_LIMIT_*` env vars;
355    /// `requests_per_second = 0` (the default) disables it.
356    #[serde(default)]
357    pub rate_limit: RateLimitConfig,
358    /// Opt-in OpenTelemetry traces export (ADR-0059). Set an `[otlp]` table or the
359    /// `QUIVER_OTLP_*` env vars; an empty `endpoint` (the default) disables it.
360    /// Export additionally requires the server's `otlp` build feature.
361    #[serde(default)]
362    pub otlp: OtlpConfig,
363    /// Lock-free MVCC reads (ADR-0064), **experimental and default-off**. Serves
364    /// reads of single-vector, in-memory collections from an `arc-swap` snapshot so
365    /// pure-vector reads never block on a concurrent writer. Also settable via
366    /// `QUIVER_MVCC_READS`. See `docs/adr/0064-mvcc-reads-implementation.md`.
367    #[serde(default)]
368    pub mvcc_reads: bool,
369    /// Opt-in **cluster router** mode (ADR-0065): a non-empty list of shard base
370    /// URLs (e.g. `["http://s1:6333","http://s2:6333"]`, or the bracketed
371    /// `QUIVER_CLUSTER_SHARDS` env array `[http://s1:6333,http://s2:6333]`) makes
372    /// this server a stateless router that shards writes and scatter-gathers
373    /// searches across the shards. Empty (the default) = an ordinary single-node
374    /// server.
375    #[serde(default)]
376    pub cluster_shards: Vec<String>,
377    /// Optional per-shard **read replicas** (ADR-0065 increment 2). Each entry is
378    /// `"<shard_index>=<replica_url>"` (repeat the index for several replicas of one
379    /// shard), e.g. `["0=http://s1b:6333", "1=http://s2b:6333"]` or the bracketed
380    /// `QUIVER_CLUSTER_REPLICAS` env list. A replica is an ordinary follower
381    /// (ADR-0030) of its shard's primary; the router fans searches across
382    /// `{primary} ∪ replicas` to spread read load. A shard with no entry is
383    /// primary-only. Empty (the default) = no replicas.
384    #[serde(default)]
385    pub cluster_replicas: Vec<String>,
386    /// Optional API key the router presents to its shards (a cluster runs over a
387    /// trusted network). `None` = shards are unauthenticated.
388    #[serde(default)]
389    pub cluster_shard_key: Option<String>,
390    /// Run this process as the **cluster coordinator** (ADR-0066) instead of a
391    /// normal server: it owns the authoritative versioned shard map and serves the
392    /// `/cluster/*` membership API, off the data path. It seeds its map from
393    /// `cluster_shards`/`cluster_replicas` (or recovers it from `coordinator_state`).
394    /// Default `false`.
395    #[serde(default)]
396    pub coordinator: bool,
397    /// Base URL of the cluster coordinator (ADR-0066). When set on a **router**, the
398    /// router refreshes its shard map from `GET {url}/cluster/map` on an interval, so
399    /// a membership change propagates without a restart. `None` = a static map from
400    /// `cluster_shards` (increments 1–2 behaviour).
401    #[serde(default)]
402    pub coordinator_url: Option<String>,
403    /// Where the **coordinator** persists its versioned map + id counter, so a
404    /// restart recovers the exact membership (and never reuses a shard id). `None` =
405    /// in-memory only (lost on restart). Only meaningful with `coordinator = true`.
406    #[serde(default)]
407    pub coordinator_state: Option<PathBuf>,
408}
409
410impl Default for Config {
411    fn default() -> Self {
412        Self {
413            data_dir: PathBuf::from("./quiver-data"),
414            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
415            grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
416            api_keys: Vec::new(),
417            encryption_key: None,
418            master_key_file: None,
419            tls_cert: None,
420            tls_key: None,
421            tls_client_ca: None,
422            audit_log: None,
423            leader_url: None,
424            leader_api_key: None,
425            insecure: false,
426            limits: Limits::default(),
427            embedding: HashMap::new(),
428            rerank: HashMap::new(),
429            rate_limit: RateLimitConfig::default(),
430            otlp: OtlpConfig::default(),
431            mvcc_reads: false,
432            cluster_shards: Vec::new(),
433            cluster_replicas: Vec::new(),
434            cluster_shard_key: None,
435            coordinator: false,
436            coordinator_url: None,
437            coordinator_state: None,
438        }
439    }
440}
441
442impl Config {
443    /// Load configuration from defaults, an optional `quiver.toml`, and
444    /// `QUIVER_*` environment variables.
445    pub fn load() -> Result<Self, Error> {
446        let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
447            .merge(Toml::file("quiver.toml"))
448            .merge(Env::prefixed("QUIVER_"))
449            .extract()
450            .map_err(|e| Error::Config(e.to_string()))?;
451        // The flat `QUIVER_MAX_*` env keys do not nest under the `limits` table
452        // figment builds, so apply them explicitly (ADR-0040).
453        config.limits.apply_env_overrides()?;
454        // Same for the flat `QUIVER_RATE_LIMIT_*` keys (ADR-0049).
455        config
456            .rate_limit
457            .apply_env_overrides()
458            .map_err(Error::Config)?;
459        // Same for the flat `QUIVER_OTLP_*` keys (ADR-0059).
460        config.otlp.apply_env_overrides().map_err(Error::Config)?;
461        Ok(config)
462    }
463
464    /// Reject insecure configurations unless explicitly opted out (ADR-0013):
465    /// no anonymous access, encryption-at-rest on by default with a valid key,
466    /// and no non-loopback bind without TLS.
467    pub fn validate(&self) -> Result<(), Error> {
468        if self.api_keys.is_empty() && !self.insecure {
469            return Err(Error::Config(
470                "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
471                 set insecure=true for local development"
472                    .to_owned(),
473            ));
474        }
475        // Resolve the master key from the env var or a key file (exactly one).
476        let master_key = self.master_key_hex()?;
477        if master_key.is_none() && !self.insecure {
478            return Err(Error::Config(
479                "no encryption key configured: encryption-at-rest is on by default — \
480                 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
481                 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
482                 store data unencrypted (development only)"
483                    .to_owned(),
484            ));
485        }
486        // Fail fast on a malformed key rather than at first write.
487        if let Some(key) = &master_key {
488            AeadCodec::from_hex(key)
489                .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
490        }
491        // TLS certificate and key are set together or not at all.
492        if self.tls_cert.is_some() != self.tls_key.is_some() {
493            return Err(Error::Config(
494                "tls_cert and tls_key must be set together".to_owned(),
495            ));
496        }
497        // mTLS layers on top of server TLS: a client CA needs a server cert/key.
498        if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
499            return Err(Error::Config(
500                "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
501            ));
502        }
503        let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
504        let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
505        if non_loopback && !tls_enabled && !self.insecure {
506            return Err(Error::Config(
507                "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
508                 or insecure=true for local development"
509                    .to_owned(),
510            ));
511        }
512        // Reject a nonsensical cost limit (a `0` cap would refuse every request).
513        self.limits.validate()?;
514        Ok(())
515    }
516
517    /// The effective hex master key: from [`master_key_file`] when set (read and
518    /// trimmed), otherwise [`encryption_key`]. `None` means no key is configured
519    /// (only valid with `insecure`).
520    ///
521    /// # Errors
522    /// [`Error::Config`] if both sources are set, or the key file cannot be read.
523    ///
524    /// [`master_key_file`]: Config::master_key_file
525    /// [`encryption_key`]: Config::encryption_key
526    pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
527        let env_key = self
528            .encryption_key
529            .as_deref()
530            .map(str::trim)
531            .filter(|k| !k.is_empty());
532        match (&self.master_key_file, env_key) {
533            (Some(_), Some(_)) => Err(Error::Config(
534                "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
535                 (QUIVER_MASTER_KEY_FILE), not both"
536                    .to_owned(),
537            )),
538            (Some(path), None) => {
539                warn_if_world_readable(path);
540                let hex = std::fs::read_to_string(path).map_err(|e| {
541                    Error::Config(format!("reading master_key_file {}: {e}", path.display()))
542                })?;
543                Ok(Some(hex.trim().to_owned()))
544            }
545            (None, Some(key)) => Ok(Some(key.to_owned())),
546            (None, None) => Ok(None),
547        }
548    }
549}
550
551// Warn (don't fail — Docker/Kubernetes secrets often mount group/world-readable)
552// when a master-key file is more permissive than `0600`.
553#[cfg(unix)]
554fn warn_if_world_readable(path: &std::path::Path) {
555    use std::os::unix::fs::PermissionsExt;
556    if let Ok(meta) = std::fs::metadata(path)
557        && meta.permissions().mode() & 0o077 != 0
558    {
559        tracing::warn!(
560            path = %path.display(),
561            mode = format!("{:o}", meta.permissions().mode() & 0o777),
562            "master key file is group/world-accessible; restrict it to 0600"
563        );
564    }
565}
566
567#[cfg(not(unix))]
568fn warn_if_world_readable(_path: &std::path::Path) {}
569
570/// Shared server state: the engine behind a single-writer lock, the accepted
571/// API keys with their RBAC scopes, and the audit log.
572#[derive(Clone)]
573pub(crate) struct AppState {
574    // The engine behind a reader–writer lock (ADR-0057): reads take the shared
575    // lock and run concurrently; writes take the exclusive lock. A read that finds
576    // a collection's index stale upgrades to the write lock once to rebuild it,
577    // then serves concurrently again.
578    db: Arc<RwLock<Database>>,
579    keys: Arc<Vec<ApiKey>>,
580    audit: Arc<AuditLog>,
581    // Fan-out of every committed op to replication followers (ADR-0030). The
582    // commit observer publishes here; each `Replicate` stream subscribes.
583    replication_tx: broadcast::Sender<WalEntry>,
584    // True on a replication follower: external writes are refused; the engine's
585    // state is owned by the stream it applies from the leader (ADR-0030).
586    read_only: bool,
587    // Per-request cost limits, enforced at this op layer so both transports are
588    // covered by one implementation (ADR-0040).
589    limits: Limits,
590    // Opt-in, provider-agnostic server-side embedding/rerank providers, keyed by
591    // collection (ADR-0047). Empty on the common path; `search_text`/`upsert_text`
592    // require a configured embedder for the target collection.
593    embed: Arc<EmbedRegistry>,
594    // Opt-in per-key token-bucket rate limiter (ADR-0049). A no-op when disabled.
595    rate_limiter: Arc<RateLimiter>,
596    // Prometheus metrics registry (ADR-0014/0054), scraped at `GET /metrics`.
597    metrics: Arc<metrics::Metrics>,
598    // Collections with an off-lock index rebuild in flight (ADR-0062). A search that
599    // observes a deferred rebuild schedules one here, single-flighted so concurrent
600    // readers never kick off duplicate builds for the same collection.
601    rebuilding: Arc<Mutex<HashSet<String>>>,
602    // Lock-free MVCC read cache (ADR-0064 increment 3): cached `arc-swap` snapshot
603    // cells for MVCC-served collections. A pure-vector search loads the cell and
604    // searches it **without taking the database lock**, so it never blocks on a
605    // concurrent writer. Populated under the read lock the first time a collection
606    // is seen fresh; the map itself changes only on create/drop, never on a data
607    // write. Empty unless `QUIVER_MVCC_READS` is on.
608    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
609    // Snapshot of the engine's `QUIVER_MVCC_READS` flag, read once at startup. When
610    // on, the writer drives off-lock consolidation rebuilds (ADR-0064 increment 3)
611    // because pure-vector fast-path reads bypass the reader-driven scheduler. Off by
612    // default, so the non-MVCC path is byte-identical.
613    mvcc: bool,
614    // Opt-in cluster router (ADR-0065): when `Some`, every collection/point op
615    // fans out to the shards instead of touching the local engine. `None` = an
616    // ordinary single-node server (the engine-backed path, untouched).
617    cluster: Option<Arc<cluster::Cluster>>,
618}
619
620/// A collection's metadata.
621pub(crate) struct CollectionInfo {
622    pub name: String,
623    pub dim: u32,
624    pub metric: DistanceMetric,
625    pub count: u64,
626    pub index: IndexSpec,
627    pub filterable: Vec<FilterableField>,
628    pub multivector: bool,
629    pub vector_encryption: VectorEncryption,
630}
631
632/// A point to upsert.
633pub(crate) struct PointIn {
634    pub id: String,
635    pub vector: Vec<f32>,
636    pub payload: Value,
637}
638
639/// A text point to embed server-side and upsert (ADR-0047).
640pub(crate) struct TextPointIn {
641    pub id: String,
642    pub text: String,
643    pub payload: Value,
644}
645
646/// Default candidate pool size a rerank stage over-fetches before reordering to
647/// the requested `k` (ADR-0047).
648const RERANK_CANDIDATES: usize = 50;
649
650/// The document text a rerank stage scores: the original text stored under
651/// [`TEXT_KEY`] by `upsert_text`, else the whole payload as a string so the
652/// reranker still has something to compare.
653fn doc_text(payload: Option<&Value>) -> String {
654    match payload {
655        Some(Value::Object(map)) => map
656            .get(TEXT_KEY)
657            .and_then(Value::as_str)
658            .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
659        Some(v) => v.to_string(),
660        None => String::new(),
661    }
662}
663
/// A fetched point.
pub(crate) struct PointOut {
    /// Point id.
    pub id: String,
    /// Stored vector; `get_points` leaves this `None` unless the caller asked
    /// for vectors.
    pub vector: Option<Vec<f32>>,
    /// Stored payload, or `Value::Null` when the point has none.
    pub payload: Value,
}
670
/// A search hit.
pub(crate) struct MatchOut {
    /// Id of the matched point.
    pub id: String,
    /// Score from the engine. `search_text` overwrites it with the rerank score
    /// when a reranker runs.
    pub score: f32,
    /// Payload, when it was fetched for this search. Always `None` on the
    /// lock-free pure-vector fast path.
    pub payload: Option<Value>,
    /// Stored vector, when it was fetched. Always `None` on the fast path.
    pub vector: Option<Vec<f32>>,
}
678
/// A multi-vector document to upsert.
pub(crate) struct DocumentIn {
    /// Document id.
    pub id: String,
    /// The document's vectors. Presumably one per token row; TODO confirm
    /// against the engine's multi-vector API.
    pub vectors: Vec<Vec<f32>>,
    /// JSON payload stored with the document.
    pub payload: Value,
}
685
/// A multi-vector document hit (MaxSim).
pub(crate) struct DocumentMatchOut {
    /// Id of the matched document.
    pub id: String,
    /// MaxSim score of the document.
    pub score: f32,
    /// Payload. Presumably present only when requested; TODO confirm.
    pub payload: Option<Value>,
    /// The document's vectors. Presumably present only when requested; TODO
    /// confirm.
    pub vectors: Option<Vec<Vec<f32>>>,
}
693
694impl AppState {
695    /// Authenticate a presented bearer token to its [`Principal`], or `None`
696    /// (a 401). An empty key set means `insecure` mode (validated at startup),
697    /// which admits any caller as an all-collections admin.
698    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
699        auth::authenticate(&self.keys, presented)
700    }
701
702    /// Consume one rate-limit token for `actor` (ADR-0049). A no-op `Allowed` when
703    /// rate limiting is disabled. Both transports call this at their auth choke
704    /// point so the limiter is enforced by one implementation.
705    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
706        self.rate_limiter.check(actor)
707    }
708
709    /// Whether the per-key rate limiter is active (lets a transport skip the work
710    /// entirely on the common, disabled path).
711    pub(crate) fn rate_limit_enabled(&self) -> bool {
712        self.rate_limiter.enabled()
713    }
714
715    // Run a **mutating** engine op behind the exclusive write lock, off the async
716    // runtime (the engine is synchronous and CPU/IO-bound). The single writer is
717    // unchanged from the prior single-mutex model (ADR-0006/0057).
718    async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
719    where
720        T: Send + 'static,
721        F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
722    {
723        let db = Arc::clone(&self.db);
724        tokio::task::spawn_blocking(move || -> Result<T, Error> {
725            let mut guard = db
726                .write()
727                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
728            f(&mut guard).map_err(Error::Engine)
729        })
730        .await
731        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
732    }
733
734    // Run a **read-only** engine op behind the shared read lock — many of these run
735    // concurrently (ADR-0057). The closure gets `&Database`, so it can only call the
736    // `&self` reads (`*_snapshot`, `fetch`, accessors).
737    async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
738    where
739        T: Send + 'static,
740        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
741    {
742        let db = Arc::clone(&self.db);
743        tokio::task::spawn_blocking(move || -> Result<T, Error> {
744            let guard = db
745                .read()
746                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
747            f(&guard).map_err(Error::Engine)
748        })
749        .await
750        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
751    }
752
753    // Run a snapshot read for `collection` (ADR-0057/0062). Takes only the shared
754    // read lock, so concurrent searches run in parallel. When a prior write deferred
755    // this collection's rebuild, the snapshot read serves the **prior** snapshot
756    // (snapshot-isolated, slightly stale) rather than blocking, and we schedule an
757    // **off-lock** rebuild so the next read is fresh — the reader never waits on a
758    // rebuild under the exclusive lock.
759    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
760    where
761        T: Send + 'static,
762        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
763    {
764        let db = Arc::clone(&self.db);
765        let cells = Arc::clone(&self.snapshot_cells);
766        let coll = collection.clone();
767        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
768            let guard = db
769                .read()
770                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
771            let result = f(&guard).map_err(Error::Engine)?;
772            // Report staleness so the caller can schedule a background rebuild; a
773            // missing collection (raced a drop) is simply "nothing to rebuild".
774            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
775            // Warm the lock-free read cache (ADR-0064 increment 3): once an
776            // MVCC-served collection is fresh (its base snapshot is published), cache
777            // its cell so subsequent pure-vector reads skip the lock. The cell
778            // self-updates as the writer republishes, so it is never re-fetched.
779            if !stale
780                && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
781                && let Ok(mut map) = cells.write()
782            {
783                map.entry(coll.clone()).or_insert(cell);
784            }
785            Ok((result, stale))
786        })
787        .await
788        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
789        if stale {
790            self.schedule_rebuild(collection);
791        }
792        Ok(result)
793    }
794
795    // The cached lock-free snapshot cell for `collection`, if one has been warmed
796    // (ADR-0064 increment 3). The map read contends only with create/drop, never
797    // with a data write — that is what makes the pure-vector read lock-free.
798    fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
799        self.snapshot_cells.read().ok()?.get(collection).cloned()
800    }
801
802    // After an MVCC-mode data write, schedule an off-lock consolidation rebuild if
803    // the write left the overlay stale (ADR-0064 increment 3): pure-vector fast-path
804    // reads bypass the reader-driven scheduler, so the writer drives consolidation.
805    // A brief read-lock staleness check; the rebuild itself runs off-lock and
806    // single-flighted. No-op (and not called) when MVCC is off.
807    async fn schedule_if_stale(&self, collection: &str) {
808        let db = Arc::clone(&self.db);
809        let coll = collection.to_owned();
810        let stale = tokio::task::spawn_blocking(move || {
811            db.read()
812                .ok()
813                .and_then(|g| g.needs_rebuild(&coll).ok())
814                .unwrap_or(false)
815        })
816        .await
817        .unwrap_or(false);
818        if stale {
819            self.schedule_rebuild(collection.to_owned());
820        }
821    }
822
823    // Schedule a single off-lock rebuild for `collection` (ADR-0062), deduplicated
824    // so concurrent readers that all observe the same deferred rebuild start exactly
825    // one. A no-op if one is already in flight.
826    fn schedule_rebuild(&self, collection: String) {
827        {
828            let mut inflight = match self.rebuilding.lock() {
829                Ok(g) => g,
830                Err(_) => return,
831            };
832            if !inflight.insert(collection.clone()) {
833                return; // already rebuilding this collection
834            }
835        }
836        let state = self.clone();
837        tokio::spawn(async move {
838            state.run_rebuild(&collection).await;
839            if let Ok(mut inflight) = state.rebuilding.lock() {
840                inflight.remove(&collection);
841            }
842        });
843    }
844
    // Drive a collection's off-lock rebuild to completion (ADR-0062): capture the
    // inputs under the shared read lock, build with no lock held, install under a
    // brief write lock, and repeat while a write landed during the build (the commit
    // reports the collection still stale). Each phase is a `spawn_blocking` so the
    // CPU-bound build never stalls the async runtime; errors end the attempt (the
    // collection stays stale and the next read reschedules).
    //
    // Every failure mode ends the attempt with a silent early `return`: a poisoned
    // lock, an engine error, `Ok(None)` from `snapshot_rebuild_inputs`, or a failed
    // blocking task. Nothing is logged or propagated. Under a sustained write
    // stream the loop can iterate repeatedly, one full rebuild per iteration.
    async fn run_rebuild(&self, collection: &str) {
        loop {
            // Phase 1: capture the rebuild inputs under the shared read lock.
            // `None` (no inputs, presumably nothing to rebuild, or any error) ends
            // the attempt.
            let db = Arc::clone(&self.db);
            let coll = collection.to_owned();
            let inputs = tokio::task::spawn_blocking(move || {
                let guard = db.read().ok()?;
                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
            })
            .await
            .ok()
            .flatten();
            let Some(inputs) = inputs else { return };

            // Phase 2: build with no database lock held. Both a failed task and a
            // build error end the attempt.
            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
                return;
            };

            // Phase 3: install under a brief exclusive lock. `commit_rebuild` yields
            // `Some(true)` when the collection is still stale after the install.
            let db = Arc::clone(&self.db);
            let still_stale = tokio::task::spawn_blocking(move || {
                let mut guard = db.write().ok()?;
                guard.commit_rebuild(rebuilt).ok()
            })
            .await
            .ok()
            .flatten();
            match still_stale {
                Some(true) => continue, // a write landed during the build — rebuild again
                _ => return,
            }
        }
    }
882
883    // Authorize `action` on `resource`, recording a denial in the audit log
884    // before propagating it. The shared choke point for both transports.
885    fn authorize(
886        &self,
887        principal: &Principal,
888        action: Action,
889        op: &str,
890        resource: &str,
891    ) -> Result<(), Error> {
892        principal
893            .require(action, Some(resource))
894            .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
895    }
896
897    // Authorize a collection-agnostic operation (listing): only the role is
898    // checked; a denial is recorded against the `*` resource.
899    fn authorize_global(
900        &self,
901        principal: &Principal,
902        action: Action,
903        op: &str,
904    ) -> Result<(), Error> {
905        principal
906            .require(action, None)
907            .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
908    }
909
910    /// Open a replication stream (ADR-0030): authorize (admin), then — in a single
911    /// engine critical section so no commit can interleave — subscribe to the live
912    /// commit tail and snapshot current state. The caller streams the snapshot,
913    /// then the tail from the receiver; because the subscription is taken under the
914    /// same lock as the snapshot, every post-snapshot op arrives on the receiver
915    /// and none is missed or duplicated.
916    pub(crate) async fn open_replication(
917        &self,
918        principal: &Principal,
919    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
920        self.authorize_global(principal, Action::Admin, "replicate")?;
921        let tx = self.replication_tx.clone();
922        self.read_blocking(move |db| {
923            let rx = tx.subscribe();
924            let snapshot = db.replication_snapshot()?;
925            Ok((snapshot, rx))
926        })
927        .await
928    }
929
930    /// Apply a replicated op received from the leader (ADR-0030). Internal to the
931    /// follower stream — deliberately NOT gated by `read_only`, which only refuses
932    /// *external* client writes.
933    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
934        self.write_blocking(move |db| db.apply_replicated(op)).await
935    }
936
937    // Refuse a mutating operation on a read-only replication follower (ADR-0030);
938    // its state is owned by the leader's stream, not by external clients.
939    fn ensure_writable(&self, op: &str) -> Result<(), Error> {
940        if self.read_only {
941            return Err(Error::Forbidden(format!(
942                "{op}: this node is a read-only replication follower"
943            )));
944        }
945        Ok(())
946    }
947
948    #[allow(clippy::too_many_arguments)]
949    pub(crate) async fn create_collection(
950        &self,
951        principal: &Principal,
952        name: String,
953        dim: u32,
954        metric: DistanceMetric,
955        index: IndexSpec,
956        filterable: Vec<FilterableField>,
957        multivector: bool,
958        vector_encryption: VectorEncryption,
959    ) -> Result<CollectionInfo, Error> {
960        self.ensure_writable("create_collection")?;
961        self.authorize(principal, Action::Admin, "create_collection", &name)?;
962        self.limits.check_dim(dim as usize)?;
963        if let Some(c) = &self.cluster {
964            return c
965                .create_collection(
966                    name,
967                    dim,
968                    metric,
969                    index,
970                    filterable,
971                    multivector,
972                    vector_encryption,
973                )
974                .await;
975        }
976        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
977            .with_index(index)
978            .with_filterable(filterable.clone())
979            .with_multivector(multivector)
980            .with_vector_encryption(vector_encryption);
981        let owned = name.clone();
982        let result = self
983            .write_blocking(move |db| db.create_collection(&owned, descriptor))
984            .await;
985        self.audit.record(
986            principal.actor(),
987            "create_collection",
988            &name,
989            Outcome::of(&result),
990        );
991        result?;
992        Ok(CollectionInfo {
993            name,
994            dim,
995            metric,
996            count: 0,
997            index,
998            filterable,
999            multivector,
1000            vector_encryption,
1001        })
1002    }
1003
1004    pub(crate) async fn get_collection(
1005        &self,
1006        principal: &Principal,
1007        name: String,
1008    ) -> Result<CollectionInfo, Error> {
1009        self.authorize(principal, Action::Read, "get_collection", &name)?;
1010        self.read_blocking(move |db| {
1011            let descriptor = db
1012                .descriptor(&name)
1013                .cloned()
1014                .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
1015            // A multi-vector collection reports its document count, not its
1016            // (much larger) token-row count.
1017            let count = if descriptor.multivector {
1018                db.document_count(&name)? as u64
1019            } else {
1020                db.len(&name)? as u64
1021            };
1022            Ok(CollectionInfo {
1023                name,
1024                dim: descriptor.dim,
1025                metric: descriptor.metric,
1026                count,
1027                index: descriptor.index,
1028                filterable: descriptor.filterable,
1029                multivector: descriptor.multivector,
1030                vector_encryption: descriptor.vector_encryption,
1031            })
1032        })
1033        .await
1034    }
1035
1036    pub(crate) async fn list_collections(
1037        &self,
1038        principal: &Principal,
1039    ) -> Result<Vec<CollectionInfo>, Error> {
1040        self.authorize_global(principal, Action::Read, "list_collections")?;
1041        let mut infos = self
1042            .read_blocking(|db| {
1043                let mut out = Vec::new();
1044                for name in db.collection_names() {
1045                    if let Some(descriptor) = db.descriptor(&name).cloned() {
1046                        let count = if descriptor.multivector {
1047                            db.document_count(&name)? as u64
1048                        } else {
1049                            db.len(&name)? as u64
1050                        };
1051                        out.push(CollectionInfo {
1052                            name,
1053                            dim: descriptor.dim,
1054                            metric: descriptor.metric,
1055                            count,
1056                            index: descriptor.index,
1057                            filterable: descriptor.filterable,
1058                            multivector: descriptor.multivector,
1059                            vector_encryption: descriptor.vector_encryption,
1060                        });
1061                    }
1062                }
1063                Ok(out)
1064            })
1065            .await?;
1066        // Never reveal collections outside the caller's scope.
1067        infos.retain(|info| principal.can_see(&info.name));
1068        Ok(infos)
1069    }
1070
1071    pub(crate) async fn delete_collection(
1072        &self,
1073        principal: &Principal,
1074        name: String,
1075    ) -> Result<bool, Error> {
1076        self.ensure_writable("delete_collection")?;
1077        self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1078        if let Some(c) = &self.cluster {
1079            return c.drop_collection(&name).await;
1080        }
1081        let resource = name.clone();
1082        let result = self
1083            .write_blocking(move |db| db.drop_collection(&name))
1084            .await;
1085        self.audit.record(
1086            principal.actor(),
1087            "delete_collection",
1088            &resource,
1089            Outcome::of(&result),
1090        );
1091        // Evict the dropped collection's cached lock-free cell (ADR-0064 increment 3)
1092        // so a later same-named collection never serves a stale cell.
1093        if matches!(result, Ok(true))
1094            && let Ok(mut map) = self.snapshot_cells.write()
1095        {
1096            map.remove(&resource);
1097        }
1098        result
1099    }
1100
1101    #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1102    pub(crate) async fn upsert(
1103        &self,
1104        principal: &Principal,
1105        collection: String,
1106        points: Vec<PointIn>,
1107    ) -> Result<u64, Error> {
1108        self.ensure_writable("upsert")?;
1109        self.authorize(principal, Action::Write, "upsert", &collection)?;
1110        self.limits.check_batch(points.len())?;
1111        for p in &points {
1112            self.limits.check_vector_len(p.vector.len())?;
1113            self.limits.check_payload(&p.payload)?;
1114        }
1115        if let Some(c) = &self.cluster {
1116            return c.upsert(&collection, points).await;
1117        }
1118        let resource = collection.clone();
1119        let result = self
1120            .write_blocking(move |db| {
1121                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1122                    .iter()
1123                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1124                    .collect();
1125                db.upsert_batch(&collection, &records)
1126            })
1127            .await;
1128        self.audit
1129            .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1130        if self.mvcc && result.is_ok() {
1131            self.schedule_if_stale(&resource).await;
1132        }
1133        result
1134    }
1135
1136    // Bulk upsert for a load-then-query workload (ADR-0045): one WAL fsync plus a
1137    // single deferred index-build pass, with the larger `max_bulk_batch_size` cap.
1138    pub(crate) async fn upsert_bulk(
1139        &self,
1140        principal: &Principal,
1141        collection: String,
1142        points: Vec<PointIn>,
1143    ) -> Result<u64, Error> {
1144        self.ensure_writable("upsert")?;
1145        self.authorize(principal, Action::Write, "upsert", &collection)?;
1146        self.limits.check_bulk_batch(points.len())?;
1147        for p in &points {
1148            self.limits.check_vector_len(p.vector.len())?;
1149            self.limits.check_payload(&p.payload)?;
1150        }
1151        if let Some(c) = &self.cluster {
1152            return c.upsert_bulk(&collection, points).await;
1153        }
1154        let resource = collection.clone();
1155        let result = self
1156            .write_blocking(move |db| {
1157                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1158                    .iter()
1159                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1160                    .collect();
1161                db.upsert_bulk(&collection, &records)
1162            })
1163            .await;
1164        self.audit.record(
1165            principal.actor(),
1166            "upsert_bulk",
1167            &resource,
1168            Outcome::of(&result),
1169        );
1170        if self.mvcc && result.is_ok() {
1171            self.schedule_if_stale(&resource).await;
1172        }
1173        result
1174    }
1175
1176    /// Take a consistent online snapshot of the whole database into a
1177    /// server-local `destination` directory (ADR-0050). A global admin
1178    /// operation; runs the checkpoint + copy on the blocking pool.
1179    #[tracing::instrument(skip_all)]
1180    pub(crate) async fn snapshot(
1181        &self,
1182        principal: &Principal,
1183        destination: String,
1184    ) -> Result<SnapshotInfo, Error> {
1185        self.ensure_writable("snapshot")?;
1186        self.authorize_global(principal, Action::Admin, "snapshot")?;
1187        let dest = std::path::PathBuf::from(&destination);
1188        let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1189        self.audit.record(
1190            principal.actor(),
1191            "snapshot",
1192            &destination,
1193            Outcome::of(&result),
1194        );
1195        result
1196    }
1197
1198    pub(crate) async fn delete_points(
1199        &self,
1200        principal: &Principal,
1201        collection: String,
1202        ids: Vec<String>,
1203    ) -> Result<u64, Error> {
1204        self.ensure_writable("delete_points")?;
1205        self.authorize(principal, Action::Write, "delete_points", &collection)?;
1206        if let Some(c) = &self.cluster {
1207            return c.delete_points(&collection, ids).await;
1208        }
1209        let resource = collection.clone();
1210        let result = self
1211            .write_blocking(move |db| {
1212                let mut count = 0u64;
1213                for id in &ids {
1214                    if db.delete(&collection, id)? {
1215                        count += 1;
1216                    }
1217                }
1218                Ok(count)
1219            })
1220            .await;
1221        self.audit.record(
1222            principal.actor(),
1223            "delete_points",
1224            &resource,
1225            Outcome::of(&result),
1226        );
1227        if self.mvcc && result.is_ok() {
1228            self.schedule_if_stale(&resource).await;
1229        }
1230        result
1231    }
1232
1233    pub(crate) async fn get_points(
1234        &self,
1235        principal: &Principal,
1236        collection: String,
1237        ids: Vec<String>,
1238        with_vector: bool,
1239    ) -> Result<Vec<PointOut>, Error> {
1240        self.authorize(principal, Action::Read, "get_points", &collection)?;
1241        if let Some(c) = &self.cluster {
1242            return c.get_points(&collection, ids, with_vector).await;
1243        }
1244        self.read_blocking(move |db| {
1245            let mut out = Vec::new();
1246            for id in &ids {
1247                if let Some(m) = db.get(&collection, id)? {
1248                    out.push(PointOut {
1249                        id: m.id,
1250                        vector: if with_vector { m.vector } else { None },
1251                        payload: m.payload.unwrap_or(Value::Null),
1252                    });
1253                }
1254            }
1255            Ok(out)
1256        })
1257        .await
1258    }
1259
1260    #[allow(clippy::too_many_arguments)]
1261    #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1262    pub(crate) async fn search(
1263        &self,
1264        principal: &Principal,
1265        collection: String,
1266        vector: Vec<f32>,
1267        k: usize,
1268        filter: Option<Filter>,
1269        ef_search: usize,
1270        with_payload: bool,
1271        with_vector: bool,
1272    ) -> Result<Vec<MatchOut>, Error> {
1273        self.authorize(principal, Action::Read, "search", &collection)?;
1274        self.limits.check_search(k, ef_search)?;
1275        self.limits.check_vector_len(vector.len())?;
1276
1277        // Cluster router (ADR-0065): scatter-gather across the shards, merge top-k.
1278        if let Some(c) = &self.cluster {
1279            return c
1280                .search(
1281                    &collection,
1282                    vector,
1283                    k,
1284                    filter,
1285                    ef_search,
1286                    with_payload,
1287                    with_vector,
1288                )
1289                .await;
1290        }
1291
1292        // Lock-free fast path (ADR-0064 increment 3): a **pure-vector** read of an
1293        // MVCC-served collection loads the cached snapshot cell and searches it with
1294        // **no database lock**, so it never blocks on a concurrent writer. Payload,
1295        // filter, and vector results need the store (behind the lock) and fall
1296        // through to `search_blocking`.
1297        if filter.is_none()
1298            && !with_payload
1299            && !with_vector
1300            && let Some(cell) = self.cached_cell(&collection)
1301        {
1302            return tokio::task::spawn_blocking(move || {
1303                let matches = cell.load().search(&vector, k, ef_search)?;
1304                Ok::<_, quiver_embed::Error>(
1305                    matches
1306                        .into_iter()
1307                        .map(|m| MatchOut {
1308                            id: m.id,
1309                            score: m.score,
1310                            payload: None,
1311                            vector: None,
1312                        })
1313                        .collect(),
1314                )
1315            })
1316            .await
1317            .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1318            .map_err(Error::Engine);
1319        }
1320
1321        let params = SearchParams {
1322            k,
1323            filter,
1324            ef_search,
1325            with_payload,
1326            with_vector,
1327        };
1328        let coll = collection.clone();
1329        self.search_blocking(coll, move |db| {
1330            let matches = db.search_snapshot(&collection, &vector, &params)?;
1331            Ok(matches
1332                .into_iter()
1333                .map(|m| MatchOut {
1334                    id: m.id,
1335                    score: m.score,
1336                    payload: m.payload,
1337                    vector: m.vector,
1338                })
1339                .collect())
1340        })
1341        .await
1342    }
1343
1344    #[allow(clippy::too_many_arguments)]
1345    pub(crate) async fn hybrid_search(
1346        &self,
1347        principal: &Principal,
1348        collection: String,
1349        dense: Option<Vec<f32>>,
1350        sparse: Option<(Vec<u32>, Vec<f32>)>,
1351        text: Option<String>,
1352        k: usize,
1353        filter: Option<Filter>,
1354        ef_search: usize,
1355        rrf_k0: f32,
1356        with_payload: bool,
1357        with_vector: bool,
1358    ) -> Result<Vec<MatchOut>, Error> {
1359        self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1360        self.limits.check_search(k, ef_search)?;
1361        if let Some(v) = &dense {
1362            self.limits.check_vector_len(v.len())?;
1363        }
1364        if let Some((indices, values)) = &sparse {
1365            self.limits.check_sparse_terms(indices.len())?;
1366            if indices.len() != values.len() {
1367                return Err(Error::BadRequest(format!(
1368                    "sparse query indices ({}) and values ({}) length mismatch",
1369                    indices.len(),
1370                    values.len()
1371                )));
1372            }
1373        }
1374        let params = SearchParams {
1375            k,
1376            filter,
1377            ef_search,
1378            with_payload,
1379            with_vector,
1380        };
1381        let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1382        let coll = collection.clone();
1383        self.search_blocking(coll, move |db| {
1384            let matches = db.hybrid_search_snapshot(
1385                &collection,
1386                dense.as_deref(),
1387                sv.as_ref(),
1388                text.as_deref(),
1389                &params,
1390                rrf_k0,
1391            )?;
1392            Ok(matches
1393                .into_iter()
1394                .map(|m| MatchOut {
1395                    id: m.id,
1396                    score: m.score,
1397                    payload: m.payload,
1398                    vector: m.vector,
1399                })
1400                .collect())
1401        })
1402        .await
1403    }
1404
1405    /// Embed `text` with the collection's configured provider and run a dense (or
1406    /// dense ⊕ BM25, if the collection has text) search, optionally reranking the
1407    /// candidates in one call (ADR-0047). The text is also passed to the BM25 side,
1408    /// so a `upsert_text` corpus yields hybrid lexical+semantic retrieval for free.
1409    #[allow(clippy::too_many_arguments)]
1410    pub(crate) async fn search_text(
1411        &self,
1412        principal: &Principal,
1413        collection: String,
1414        text: String,
1415        k: usize,
1416        filter: Option<Filter>,
1417        ef_search: usize,
1418        rrf_k0: f32,
1419        with_payload: bool,
1420        with_vector: bool,
1421        rerank: bool,
1422    ) -> Result<Vec<MatchOut>, Error> {
1423        self.authorize(principal, Action::Read, "search_text", &collection)?;
1424        self.limits.check_search(k, ef_search)?;
1425        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1426            Error::BadRequest(format!(
1427                "collection {collection:?} has no embedding provider configured \
1428                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1429            ))
1430        })?;
1431        // Embed off the async runtime: the provider call is blocking network I/O.
1432        let query = text.clone();
1433        let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1434            .await
1435            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1436            .map_err(|e| Error::Upstream(e.to_string()))?
1437            .into_iter()
1438            .next()
1439            .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1440        self.limits.check_vector_len(vector.len())?;
1441
1442        let reranker = if rerank {
1443            self.embed.reranker(&collection)
1444        } else {
1445            None
1446        };
1447        // Over-fetch when reranking so the reranker can reorder a wide candidate set
1448        // down to the requested `k`; fetch payloads when reranking (we need the doc
1449        // text) even if the caller did not ask for them.
1450        let need_payload = with_payload || reranker.is_some();
1451        let fetch_k = if reranker.is_some() {
1452            k.max(RERANK_CANDIDATES)
1453        } else {
1454            k
1455        };
1456
1457        let mut hits = self
1458            .hybrid_search(
1459                principal,
1460                collection,
1461                Some(vector),
1462                None,
1463                Some(text.clone()),
1464                fetch_k,
1465                filter,
1466                ef_search,
1467                rrf_k0,
1468                need_payload,
1469                with_vector,
1470            )
1471            .await?;
1472
1473        if let Some(rr) = reranker {
1474            let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1475            let query = text;
1476            let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1477                .await
1478                .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1479                .map_err(|e| Error::Upstream(e.to_string()))?;
1480            // Re-score each hit and sort by the rerank score, descending.
1481            let mut scored: Vec<(f32, MatchOut)> = scores
1482                .into_iter()
1483                .zip(hits)
1484                .map(|(s, mut h)| {
1485                    h.score = s;
1486                    (s, h)
1487                })
1488                .collect();
1489            scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1490            hits = scored.into_iter().map(|(_, h)| h).collect();
1491        }
1492
1493        hits.truncate(k);
1494        // Drop payloads we only fetched for reranking if the caller didn't want them.
1495        if !with_payload {
1496            for h in &mut hits {
1497                h.payload = None;
1498            }
1499        }
1500        Ok(hits)
1501    }
1502
1503    /// Embed each point's `text` with the collection's provider and upsert it as a
1504    /// dense point, co-populating the `__quiver_text__` payload key (ADR-0046) so the
1505    /// same text is indexed for BM25 — one call feeds both the dense and lexical
1506    /// sides (ADR-0047).
1507    pub(crate) async fn upsert_text(
1508        &self,
1509        principal: &Principal,
1510        collection: String,
1511        points: Vec<TextPointIn>,
1512    ) -> Result<u64, Error> {
1513        self.ensure_writable("upsert_text")?;
1514        self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1515        self.limits.check_batch(points.len())?;
1516        for p in &points {
1517            if !matches!(p.payload, Value::Object(_) | Value::Null) {
1518                return Err(Error::BadRequest(
1519                    "upsert_text payload must be a JSON object or null".to_owned(),
1520                ));
1521            }
1522        }
1523        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1524            Error::BadRequest(format!(
1525                "collection {collection:?} has no embedding provider configured \
1526                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1527            ))
1528        })?;
1529        let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1530        let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1531            .await
1532            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1533            .map_err(|e| Error::Upstream(e.to_string()))?;
1534        if vectors.len() != points.len() {
1535            return Err(Error::Upstream(format!(
1536                "embedding provider returned {} vectors for {} inputs",
1537                vectors.len(),
1538                points.len()
1539            )));
1540        }
1541        let dense: Vec<PointIn> = points
1542            .into_iter()
1543            .zip(vectors)
1544            .map(|(p, vector)| {
1545                let mut payload = match p.payload {
1546                    Value::Object(map) => map,
1547                    _ => serde_json::Map::new(),
1548                };
1549                // Don't clobber a caller-supplied text key.
1550                payload
1551                    .entry(TEXT_KEY.to_owned())
1552                    .or_insert_with(|| Value::String(p.text.clone()));
1553                PointIn {
1554                    id: p.id,
1555                    vector,
1556                    payload: Value::Object(payload),
1557                }
1558            })
1559            .collect();
1560        self.upsert(principal, collection, dense).await
1561    }
1562
1563    #[allow(clippy::too_many_arguments)]
1564    pub(crate) async fn fetch(
1565        &self,
1566        principal: &Principal,
1567        collection: String,
1568        filter: Option<Filter>,
1569        offset: usize,
1570        limit: usize,
1571        with_payload: bool,
1572        with_vector: bool,
1573    ) -> Result<Vec<MatchOut>, Error> {
1574        self.authorize(principal, Action::Read, "fetch", &collection)?;
1575        self.limits.check_fetch(limit)?;
1576        self.read_blocking(move |db| {
1577            let matches = db.fetch(
1578                &collection,
1579                filter.as_ref(),
1580                offset,
1581                limit,
1582                with_payload,
1583                with_vector,
1584            )?;
1585            Ok(matches
1586                .into_iter()
1587                .map(|m| MatchOut {
1588                    id: m.id,
1589                    score: m.score,
1590                    payload: m.payload,
1591                    vector: m.vector,
1592                })
1593                .collect())
1594        })
1595        .await
1596    }
1597
1598    pub(crate) async fn upsert_documents(
1599        &self,
1600        principal: &Principal,
1601        collection: String,
1602        documents: Vec<DocumentIn>,
1603    ) -> Result<u64, Error> {
1604        self.ensure_writable("upsert_documents")?;
1605        self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1606        self.limits.check_batch(documents.len())?;
1607        for doc in &documents {
1608            self.limits.check_payload(&doc.payload)?;
1609            for token in &doc.vectors {
1610                self.limits.check_vector_len(token.len())?;
1611            }
1612        }
1613        let resource = collection.clone();
1614        let result = self
1615            .write_blocking(move |db| {
1616                let mut count = 0u64;
1617                for doc in &documents {
1618                    db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1619                    count += 1;
1620                }
1621                Ok(count)
1622            })
1623            .await;
1624        self.audit.record(
1625            principal.actor(),
1626            "upsert_documents",
1627            &resource,
1628            Outcome::of(&result),
1629        );
1630        result
1631    }
1632
1633    pub(crate) async fn delete_documents(
1634        &self,
1635        principal: &Principal,
1636        collection: String,
1637        ids: Vec<String>,
1638    ) -> Result<u64, Error> {
1639        self.ensure_writable("delete_documents")?;
1640        self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1641        let resource = collection.clone();
1642        let result = self
1643            .write_blocking(move |db| {
1644                let mut count = 0u64;
1645                for id in &ids {
1646                    if db.delete_document(&collection, id)? {
1647                        count += 1;
1648                    }
1649                }
1650                Ok(count)
1651            })
1652            .await;
1653        self.audit.record(
1654            principal.actor(),
1655            "delete_documents",
1656            &resource,
1657            Outcome::of(&result),
1658        );
1659        result
1660    }
1661
1662    #[allow(clippy::too_many_arguments)]
1663    pub(crate) async fn search_multi_vector(
1664        &self,
1665        principal: &Principal,
1666        collection: String,
1667        query: Vec<Vec<f32>>,
1668        k: usize,
1669        filter: Option<Filter>,
1670        ef_search: usize,
1671        with_payload: bool,
1672        with_vector: bool,
1673    ) -> Result<Vec<DocumentMatchOut>, Error> {
1674        self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1675        self.limits.check_search(k, ef_search)?;
1676        for token in &query {
1677            self.limits.check_vector_len(token.len())?;
1678        }
1679        let params = SearchParams {
1680            k,
1681            filter,
1682            ef_search,
1683            with_payload,
1684            with_vector,
1685        };
1686        let coll = collection.clone();
1687        self.search_blocking(coll, move |db| {
1688            let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
1689            Ok(matches
1690                .into_iter()
1691                .map(|m| DocumentMatchOut {
1692                    id: m.id,
1693                    score: m.score,
1694                    payload: m.payload,
1695                    vectors: m.vectors,
1696                })
1697                .collect())
1698        })
1699        .await
1700    }
1701}
1702
/// Capacity of the leader's broadcast buffer of recently-committed ops for
/// replication followers (ADR-0030): 1024 entries. A follower lagging by more
/// than this must re-bootstrap.
const REPLICATION_BUFFER: usize = 1 << 10;
1706
1707/// Run the server from `config` until a shutdown signal (Ctrl-C).
1708pub async fn run(config: Config) -> Result<(), Error> {
1709    config.validate()?;
1710    let rest_listener = TcpListener::bind(config.rest_addr)
1711        .await
1712        .map_err(Error::Io)?;
1713    let grpc_listener = TcpListener::bind(config.grpc_addr)
1714        .await
1715        .map_err(Error::Io)?;
1716    tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1717    tokio::select! {
1718        result = serve(config, rest_listener, grpc_listener) => result,
1719        () = shutdown_signal() => {
1720            tracing::info!("shutdown signal received");
1721            Ok(())
1722        }
1723    }
1724}
1725
1726/// Serve REST and gRPC on the given (already-bound) listeners until a transport
1727/// error. Exposed so tests can bind ephemeral ports.
1728pub async fn serve(
1729    config: Config,
1730    rest_listener: TcpListener,
1731    grpc_listener: TcpListener,
1732) -> Result<(), Error> {
1733    // Coordinator mode (ADR-0066) is a wholly separate, data-plane-free service: it
1734    // owns the versioned shard map and serves the membership API on the REST
1735    // listener. It opens no database and runs no gRPC.
1736    if config.coordinator {
1737        drop(grpc_listener);
1738        return coordinator::serve_coordinator(config, rest_listener).await;
1739    }
1740    let mut db = open_database(&config)?;
1741    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1742    // Publish every committed op to replication followers (ADR-0030). The observer
1743    // runs inside the engine's write critical section; `broadcast::Sender::send` is
1744    // non-blocking, so it never stalls a write.
1745    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1746    {
1747        let tx = replication_tx.clone();
1748        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1749            let _ = tx.send(entry.clone());
1750        }));
1751    }
1752    // Build the opt-in embedding/rerank providers, resolving each `api_key_env`
1753    // from the environment now (ADR-0047) so a missing key fails fast at startup
1754    // rather than on the first request.
1755    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1756        .map_err(|e| Error::Config(e.to_string()))?;
1757
1758    // Enable lock-free MVCC reads if requested (ADR-0064): `config.mvcc_reads` or
1759    // `QUIVER_MVCC_READS` (the latter also defaults the engine flag at open).
1760    if config.mvcc_reads {
1761        db.set_mvcc_reads(true);
1762    }
1763    let mvcc = db.mvcc_reads();
1764
1765    // Opt-in cluster router (ADR-0065): build the shard fan-out when configured.
1766    let cluster = if config.cluster_shards.is_empty() {
1767        None
1768    } else {
1769        let c = cluster::Cluster::new(
1770            config.cluster_shards.clone(),
1771            config.cluster_replicas.clone(),
1772            config.cluster_shard_key.clone(),
1773        )?;
1774        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
1775        let c = Arc::new(c);
1776        // With a coordinator (ADR-0066), refresh the shard map from it so a
1777        // membership change propagates without a restart. The seed above is the
1778        // bootstrap map; the coordinator's map (≥ version 0) takes over on refresh.
1779        if let Some(coord) = config.coordinator_url.clone() {
1780            let router = c.clone();
1781            tokio::spawn(async move {
1782                loop {
1783                    if let Err(e) = router.refresh_from(&coord).await {
1784                        tracing::debug!(error = %e, "shard-map refresh failed; will retry");
1785                    }
1786                    tokio::time::sleep(cluster::MAP_REFRESH_INTERVAL).await;
1787                }
1788            });
1789        }
1790        Some(c)
1791    };
1792
1793    let state = AppState {
1794        db: Arc::new(RwLock::new(db)),
1795        keys: Arc::new(config.api_keys.clone()),
1796        audit,
1797        replication_tx,
1798        read_only: config.leader_url.is_some(),
1799        limits: config.limits,
1800        embed: Arc::new(embed),
1801        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
1802        metrics: Arc::new(metrics::Metrics::default()),
1803        rebuilding: Arc::new(Mutex::new(HashSet::new())),
1804        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
1805        mvcc,
1806        cluster,
1807    };
1808
1809    // A follower continuously applies the leader's committed-op stream (ADR-0030).
1810    if let Some(leader_url) = config.leader_url.clone() {
1811        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
1812    }
1813
1814    let app = rest::router(state.clone());
1815    let grpc = grpc::service(state);
1816
1817    let tls = load_tls(&config)?;
1818
1819    // REST: terminate TLS with axum-server when configured, else serve plaintext.
1820    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
1821        Some(material) => {
1822            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
1823            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
1824            let server =
1825                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
1826            Box::pin(async move {
1827                server
1828                    .serve(app.into_make_service())
1829                    .await
1830                    .map_err(Error::Io)
1831            })
1832        }
1833        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
1834    };
1835
1836    // gRPC: tonic terminates TLS itself (ring provider) when an identity is set.
1837    let mut grpc_builder = tonic::transport::Server::builder();
1838    if let Some(material) = &tls {
1839        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
1840        let mut tls_config = ServerTlsConfig::new().identity(identity);
1841        // Require client certificates chaining to the configured CA (mTLS).
1842        if let Some(ca_pem) = &material.client_ca_pem {
1843            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
1844        }
1845        grpc_builder = grpc_builder
1846            .tls_config(tls_config)
1847            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
1848    }
1849    let grpc_fut = async move {
1850        grpc_builder
1851            .add_service(grpc)
1852            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
1853            .await
1854            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
1855    };
1856
1857    tokio::try_join!(rest_fut, grpc_fut)?;
1858    Ok(())
1859}
1860
1861async fn shutdown_signal() {
1862    let _ = tokio::signal::ctrl_c().await;
1863}
1864
1865// Open the engine, enabling encryption-at-rest when a key is configured. The
1866// configured key is the **master key** of an envelope key-ring (ADR-0010): it
1867// wraps a per-collection data-encryption key, so dropping a collection
1868// crypto-shreds it. With no key (only valid in `insecure` mode, enforced by
1869// `Config::validate`) the engine is opened in plaintext.
1870fn open_database(config: &Config) -> Result<Database, Error> {
1871    let master_key = config.master_key_hex()?;
1872    let keyring =
1873        quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1874            .map_err(|e| Error::Config(e.to_string()))?;
1875    let db = match keyring {
1876        Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1877        None => Database::open(&config.data_dir)?,
1878    };
1879    Ok(db)
1880}
1881
1882// TLS material shared by both transports: the raw PEM (for tonic's `Identity`
1883// and `client_ca_root`) and a parsed rustls server config (for axum-server's
1884// REST acceptor). `client_ca_pem` is set only when mutual TLS is configured.
1885struct TlsMaterial {
1886    cert_pem: Vec<u8>,
1887    key_pem: Vec<u8>,
1888    client_ca_pem: Option<Vec<u8>>,
1889    rest_config: Arc<rustls::ServerConfig>,
1890}
1891
1892// Read the configured certificate, key, and optional client CA, returning `None`
1893// when TLS is not configured. `Config::validate` already enforces that the cert
1894// and key are set together, that a client CA requires them, and that a
1895// non-loopback bind requires TLS.
1896fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1897    match (&config.tls_cert, &config.tls_key) {
1898        (Some(cert_path), Some(key_path)) => {
1899            let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1900            let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1901            let client_ca_pem = config
1902                .tls_client_ca
1903                .as_ref()
1904                .map(std::fs::read)
1905                .transpose()
1906                .map_err(Error::Io)?;
1907            let rest_config = Arc::new(rustls_server_config(
1908                &cert_pem,
1909                &key_pem,
1910                client_ca_pem.as_deref(),
1911            )?);
1912            Ok(Some(TlsMaterial {
1913                cert_pem,
1914                key_pem,
1915                client_ca_pem,
1916                rest_config,
1917            }))
1918        }
1919        (None, None) => Ok(None),
1920        _ => Err(Error::Config(
1921            "tls_cert and tls_key must be set together".to_owned(),
1922        )),
1923    }
1924}
1925
1926// Build a rustls server config from PEM bytes over the audited `ring` provider
1927// (no OpenSSL, no aws-lc-rs C toolchain). TLS 1.3 and 1.2 are offered. When a
1928// client CA is supplied, client certificates chaining to it are required (mTLS).
1929fn rustls_server_config(
1930    cert_pem: &[u8],
1931    key_pem: &[u8],
1932    client_ca_pem: Option<&[u8]>,
1933) -> Result<rustls::ServerConfig, Error> {
1934    use rustls_pki_types::pem::PemObject;
1935    use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1936
1937    let certs = CertificateDer::pem_slice_iter(cert_pem)
1938        .collect::<std::result::Result<Vec<_>, _>>()
1939        .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1940    if certs.is_empty() {
1941        return Err(Error::Config(
1942            "tls_cert contains no certificates".to_owned(),
1943        ));
1944    }
1945    let key = PrivateKeyDer::from_pem_slice(key_pem)
1946        .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1947    let provider = Arc::new(rustls::crypto::ring::default_provider());
1948    let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1949        .with_safe_default_protocol_versions()
1950        .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1951    let builder = match client_ca_pem {
1952        Some(ca_pem) => {
1953            let mut roots = rustls::RootCertStore::empty();
1954            for cert in CertificateDer::pem_slice_iter(ca_pem) {
1955                let cert =
1956                    cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1957                roots
1958                    .add(cert)
1959                    .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1960            }
1961            let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1962                Arc::new(roots),
1963                provider,
1964            )
1965            .build()
1966            .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1967            builder.with_client_cert_verifier(verifier)
1968        }
1969        None => builder.with_no_client_auth(),
1970    };
1971    builder
1972        .with_single_cert(certs, key)
1973        .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1974}
1975
1976/// Initialize structured logging from `RUST_LOG` (defaulting to `info`). Safe to
1977/// call once at startup; a second call is ignored.
1978pub fn init_tracing() {
1979    init_observability(&Config::default());
1980}
1981
/// Install the global tracing subscriber: an `RUST_LOG`-driven `fmt` layer plus,
/// when the `otlp` feature is built **and** an OTLP endpoint is configured
/// (ADR-0059), an OpenTelemetry traces export layer. Safe to call once at
/// startup; a second call does not replace the installed subscriber. A failure to
/// build the OTLP exporter prints a warning to stderr and falls back to
/// `fmt`-only rather than taking the server down.
///
/// The log filter comes from `RUST_LOG`; if it is unset or unparsable, `info` is
/// used.
// `config` is only read by the `otlp`-gated branch, hence the allowance below.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;
    // Fall back to `info` when `RUST_LOG` is unset or fails to parse.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let registry = tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer());

    // Compiled only with the `otlp` feature. On success it installs the combined
    // subscriber and returns early; on failure it falls through to the `fmt`-only
    // install below.
    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                // NOTE(review): the provider is stored before `try_init`, so on a
                // repeated call a new provider is stored even though installing
                // the subscriber fails — confirm `store_provider` tolerates that.
                otlp::store_provider(provider);
                let _ = registry
                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
                    .try_init();
                return;
            }
            // No subscriber is installed yet at this point, so report on stderr.
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
        }
    }

    // `try_init` fails if a global subscriber is already set; that is ignored so
    // a second call is harmless.
    let _ = registry.try_init();
}
2014
/// Flush any batched spans and tear down the OpenTelemetry exporter. Does
/// nothing when built without the `otlp` feature or when no exporter was
/// installed. Call once during server shutdown so buffered spans are not lost.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
2022
#[cfg(test)]
mod tests {
    // Unit tests for the secure-by-default checks in `Config::validate` and the
    // master-key resolution in `Config::master_key_hex`, which `run` applies
    // before binding any listener. Each test mutates one `Config` step by step and
    // asserts after every change, so statement order matters.
    use super::*;

    // A valid 64-hex-character (256-bit) test key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    /// The default config is rejected; `insecure` alone makes it valid; with
    /// `insecure` off again, API keys plus an encryption key make it valid.
    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        let mut config = Config::default();
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
        config.insecure = false;
        config.api_keys = vec!["secret".into()];
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
    }

    /// Encryption-at-rest is mandatory unless `insecure`, and a malformed key is
    /// rejected by `validate` itself.
    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        // API key set but no encryption key, not insecure ⇒ rejected.
        assert!(config.validate().is_err());
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
        // A malformed key is rejected up front, not at first write.
        config.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(config.validate().is_err());
        // Insecure mode may run without encryption-at-rest.
        config.insecure = true;
        config.encryption_key = None;
        assert!(config.validate().is_ok());
    }

    /// `master_key_file` can stand in for `encryption_key`, but not alongside it,
    /// and its contents must be valid hex.
    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("master.key");
        // A trailing newline (as editors and `echo` add) is trimmed off.
        std::fs::write(&path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(path.clone()),
            ..Config::default()
        };
        // The file alone satisfies encryption-at-rest and resolves to the key.
        assert!(config.validate().is_ok());
        assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));

        // Setting both the env key and a file is rejected as ambiguous.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A file holding malformed hex is rejected up front.
        config.encryption_key = None;
        std::fs::write(&path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    /// Binding REST to 0.0.0.0 without TLS is rejected unless `insecure` is set.
    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        };
        config.rest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333);
        // Auth and encryption are satisfied, so the only failure is the bind rule.
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    /// A non-loopback REST bind passes validation once a TLS cert and key are set.
    /// The cert/key paths are placeholders: this test only exercises `validate`.
    #[test]
    fn config_public_bind_allowed_with_tls() {
        let config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333),
            ..Config::default()
        };
        // TLS configured ⇒ a non-loopback bind is allowed without insecure.
        assert!(config.validate().is_ok());
    }

    /// `tls_cert` without `tls_key` is rejected; adding the key makes it valid.
    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        // Cert without key ⇒ rejected.
        assert!(config.validate().is_err());
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }
}