Skip to main content

quiver_server/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The Quiver daemon: gRPC and REST over the embeddable [`Database`], with
3//! API-key auth and secure-by-default configuration.
4//!
5//! Both transports are thin shells over the same shared engine operations; the
6//! engine is synchronous and CPU/`fsync`-bound, so every call is offloaded with
7//! `spawn_blocking`. Access is guarded by a reader–writer lock (ADR-0057): reads
8//! take the shared lock and run **concurrently**, writes take the exclusive lock,
9//! and the single-writer model is unchanged (ADR-0002/0006). A read that finds a
10//! collection's index stale (a prior write deferred its rebuild) serves the prior
11//! snapshot and schedules an **off-lock** rebuild (ADR-0062): the index is rebuilt
12//! under the shared lock and swapped in under a brief write lock, so a rebuild never
13//! stalls concurrent readers.
14//!
15//! Auth is by scoped API key (Bearer / gRPC `authorization` metadata) with
16//! default-deny RBAC: each key carries a role (read ⊆ write ⊆ admin) and a
17//! collection scope, enforced on every operation at the shared op layer
18//! (ADR-0011/0013, the `auth` module). Encryption-at-rest is on by default
19//! (ADR-0010): unless `insecure` is set, an `encryption_key` is required and the
20//! engine is opened through `quiver-crypto`'s AEAD codec; payloads may also be
21//! client-side-encrypted (ADR-0012). TLS-in-transit uses `rustls` over the
22//! audited `ring` provider — REST via `axum-server`, gRPC via tonic's `tls-ring`
23//! — and a non-loopback bind requires it; setting a client CA additionally
24//! requires mutual TLS. Mutating and administrative operations, and every
25//! access-control denial, are recorded to an append-only audit log (ADR-0011,
26//! the `audit` module) when `audit_log` is set. Per-request cost limits bound
27//! the work any single authenticated request can demand (ADR-0040, the
28//! [`Limits`] type), and an opt-in per-key token-bucket rate limiter bounds the
29//! request *rate* (ADR-0049, the [`RateLimiter`] type); per-tenant engine
30//! partitioning is a later phase. Design: `docs/api/rest-grpc.md`.
31
32mod audit;
33mod auth;
34mod cluster;
35mod coordinator;
36mod error;
37mod grpc;
38mod metrics;
39mod otlp;
40#[cfg(feature = "raft")]
41pub mod raft;
42mod rate_limit;
43mod replication;
44mod rest;
45
46use std::collections::{HashMap, HashSet};
47use std::future::Future;
48use std::net::{IpAddr, Ipv4Addr, SocketAddr};
49use std::path::PathBuf;
50use std::pin::Pin;
51use std::sync::{Arc, Mutex, RwLock};
52
53use axum_server::tls_rustls::RustlsConfig;
54use figment::Figment;
55use figment::providers::{Env, Format, Serialized, Toml};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use tokio::net::TcpListener;
59use tokio::sync::broadcast;
60use tokio_stream::wrappers::TcpListenerStream;
61use tonic::transport::{Certificate, Identity, ServerTlsConfig};
62
63use quiver_crypto::AeadCodec;
64use quiver_embed::{
65    Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
66    SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
67};
68use quiver_query::Filter;
69
70pub use auth::{Action, ApiKey, CollectionScope};
71pub use error::Error;
72pub use otlp::OtlpConfig;
73// The embedding/rerank seam lives in its own lean crate (ADR-0058) so the MCP
74// server can share it; re-exported here so the server's public API is unchanged.
75pub use quiver_providers::{
76    EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
77    RerankProvider,
78};
79pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
80
81use audit::{AuditLog, Outcome};
82use auth::Principal;
83
/// Per-request cost limits (ADR-0040). Each cap bounds the work a single
/// authenticated request can demand, so one oversized request cannot exhaust the
/// node under the single-writer model (ADR-0006). Over-limit requests are
/// **rejected** with HTTP 400 / gRPC `InvalidArgument` rather than silently
/// clamped — a truncated `k` or `ef_search` would return surprising, lower-quality
/// results with no signal. Defaults are generous; raise a cap with a `[limits]`
/// table in `quiver.toml` or the matching `QUIVER_MAX_*` environment variable.
///
/// The `check_*` helpers treat each cap as an inclusive upper bound: a request
/// is rejected only when it *exceeds* the cap. Every cap must be greater than
/// zero; a `0` cap is refused as a configuration error when the limits are
/// validated.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
    /// Maximum `k` (top-k) for `search` / `search_multi_vector`.
    /// Env override: `QUIVER_MAX_K`.
    pub max_k: usize,
    /// Maximum search beam width (`ef_search`).
    /// Env override: `QUIVER_MAX_EF_SEARCH`.
    pub max_ef_search: usize,
    /// Maximum `fetch` page size.
    /// Env override: `QUIVER_MAX_FETCH_LIMIT`.
    pub max_fetch_limit: usize,
    /// Maximum vector dimension: the declared collection dimension at creation
    /// and the length of any query vector (per token, for multi-vector).
    /// Env override: `QUIVER_MAX_VECTOR_DIM`.
    pub max_vector_dim: usize,
    /// Maximum serialized-JSON payload size per point, in bytes.
    /// Env override: `QUIVER_MAX_PAYLOAD_BYTES`.
    pub max_payload_bytes: usize,
    /// Maximum number of points / documents in a single upsert request.
    /// Env override: `QUIVER_MAX_BATCH_SIZE`.
    pub max_batch_size: usize,
    /// Maximum HTTP request body size, in bytes (enforced by the REST layer via
    /// axum's `DefaultBodyLimit`; gRPC is bounded by tonic's decode limit).
    /// Env override: `QUIVER_MAX_REQUEST_BODY_BYTES`.
    pub max_request_body_bytes: usize,
    /// Maximum number of non-zero terms in a hybrid-search sparse query (ADR-0043),
    /// bounding the posting-list scan.
    /// Env override: `QUIVER_MAX_SPARSE_TERMS`.
    pub max_sparse_terms: usize,
    /// Maximum number of points in a single bulk upsert (`POST …/points:bulk`,
    /// ADR-0045). Larger than `max_batch_size` because the bulk path defers index
    /// maintenance to one rebuild; the request is still bounded by
    /// `max_request_body_bytes`, so raise that too for very large bulk loads.
    /// Env override: `QUIVER_MAX_BULK_BATCH_SIZE`.
    pub max_bulk_batch_size: usize,
}
119
120impl Default for Limits {
121    fn default() -> Self {
122        Self {
123            max_k: 10_000,
124            max_ef_search: 4_096,
125            max_fetch_limit: 10_000,
126            max_vector_dim: 8_192,
127            max_payload_bytes: 65_536,
128            max_batch_size: 1_000,
129            max_request_body_bytes: 32 * 1024 * 1024,
130            max_sparse_terms: 4_096,
131            max_bulk_batch_size: 50_000,
132        }
133    }
134}
135
136impl Limits {
137    // Apply `QUIVER_MAX_*` overrides after figment extraction (ADR-0013 env
138    // layer). The flat env keys do not nest under figment's `limits` table, so
139    // they are read explicitly here; a malformed value is a hard config error.
140    fn apply_env_overrides(&mut self) -> Result<(), Error> {
141        let slots: [(&str, &mut usize); 9] = [
142            ("QUIVER_MAX_K", &mut self.max_k),
143            ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
144            ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
145            ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
146            ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
147            ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
148            (
149                "QUIVER_MAX_REQUEST_BODY_BYTES",
150                &mut self.max_request_body_bytes,
151            ),
152            ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
153            ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
154        ];
155        for (key, slot) in slots {
156            if let Ok(raw) = std::env::var(key) {
157                *slot = raw.parse().map_err(|_| {
158                    Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
159                })?;
160            }
161        }
162        Ok(())
163    }
164
165    // Reject a zero cap (a `0` limit would refuse every request).
166    fn validate(&self) -> Result<(), Error> {
167        let named = [
168            ("max_k", self.max_k),
169            ("max_ef_search", self.max_ef_search),
170            ("max_fetch_limit", self.max_fetch_limit),
171            ("max_vector_dim", self.max_vector_dim),
172            ("max_payload_bytes", self.max_payload_bytes),
173            ("max_batch_size", self.max_batch_size),
174            ("max_request_body_bytes", self.max_request_body_bytes),
175            ("max_sparse_terms", self.max_sparse_terms),
176            ("max_bulk_batch_size", self.max_bulk_batch_size),
177        ];
178        if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
179            return Err(Error::Config(format!(
180                "limits.{name} must be greater than zero"
181            )));
182        }
183        Ok(())
184    }
185
186    fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
187        if k > self.max_k {
188            return Err(Error::BadRequest(format!(
189                "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
190                self.max_k
191            )));
192        }
193        if ef_search > self.max_ef_search {
194            return Err(Error::BadRequest(format!(
195                "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
196                self.max_ef_search
197            )));
198        }
199        Ok(())
200    }
201
202    fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
203        if n > self.max_sparse_terms {
204            return Err(Error::BadRequest(format!(
205                "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
206                self.max_sparse_terms
207            )));
208        }
209        Ok(())
210    }
211
212    fn check_fetch(&self, limit: usize) -> Result<(), Error> {
213        if limit > self.max_fetch_limit {
214            return Err(Error::BadRequest(format!(
215                "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
216                self.max_fetch_limit
217            )));
218        }
219        Ok(())
220    }
221
222    fn check_dim(&self, dim: usize) -> Result<(), Error> {
223        if dim > self.max_vector_dim {
224            return Err(Error::BadRequest(format!(
225                "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
226                self.max_vector_dim
227            )));
228        }
229        Ok(())
230    }
231
232    fn check_vector_len(&self, len: usize) -> Result<(), Error> {
233        if len > self.max_vector_dim {
234            return Err(Error::BadRequest(format!(
235                "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
236                self.max_vector_dim
237            )));
238        }
239        Ok(())
240    }
241
242    fn check_batch(&self, n: usize) -> Result<(), Error> {
243        if n > self.max_batch_size {
244            return Err(Error::BadRequest(format!(
245                "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
246                self.max_batch_size
247            )));
248        }
249        Ok(())
250    }
251
252    fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
253        if n > self.max_bulk_batch_size {
254            return Err(Error::BadRequest(format!(
255                "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
256                self.max_bulk_batch_size
257            )));
258        }
259        Ok(())
260    }
261
262    fn check_payload(&self, payload: &Value) -> Result<(), Error> {
263        let size = serde_json::to_vec(payload)
264            .map(|v| v.len())
265            .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
266        if size > self.max_payload_bytes {
267            return Err(Error::BadRequest(format!(
268                "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
269                self.max_payload_bytes
270            )));
271        }
272        Ok(())
273    }
274}
275
/// Server configuration, layered defaults → `quiver.toml` → `QUIVER_*` env and
/// validated at startup (ADR-0013).
///
/// Build one with [`Config::load`] and check it with [`Config::validate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Data directory for the storage engine.
    pub data_dir: PathBuf,
    /// REST (HTTP/1.1) bind address.
    pub rest_addr: SocketAddr,
    /// gRPC (HTTP/2) bind address.
    pub grpc_addr: SocketAddr,
    /// Accepted API keys with their RBAC scopes (ADR-0011). A bare secret —
    /// from a comma-separated `QUIVER_API_KEYS` (the env form) or a plain TOML
    /// array entry — is an all-collections admin key; a structured
    /// `{secret, role, collections}` entry pins a narrower scope. Empty is
    /// allowed only with `insecure = true`.
    #[serde(default, deserialize_with = "auth::de_api_keys")]
    pub api_keys: Vec<ApiKey>,
    /// Hex-encoded 256-bit **master key** for encryption-at-rest (64 hex
    /// characters). It wraps per-collection data-encryption keys (ADR-0010).
    /// Required unless `insecure = true` or [`master_key_file`] is set; source it
    /// from the environment or a secret store, never the committed config. `None`
    /// ⇒ data is stored unencrypted (only valid in `insecure` mode).
    ///
    /// Surrounding whitespace is trimmed, and a blank value is treated as unset.
    ///
    /// [`master_key_file`]: Config::master_key_file
    pub encryption_key: Option<String>,
    /// Path to a file holding the hex master key, as an alternative to
    /// [`encryption_key`]. Set at most one of the two: configuring both is a
    /// configuration error, and configuring neither is valid only in `insecure`
    /// mode. Lets the key arrive as a mounted secret (Docker/Kubernetes) or a
    /// KMS-decrypted file rather than an environment variable. The file's
    /// contents are trimmed before use. It should be mode `0600`; on Unix a
    /// group/world-accessible file is warned about (not rejected) when the key
    /// is read.
    ///
    /// [`encryption_key`]: Config::encryption_key
    pub master_key_file: Option<PathBuf>,
    /// Path to the PEM-encoded TLS certificate chain. Must be set together with
    /// `tls_key`. Required for a non-loopback bind unless `insecure = true`.
    pub tls_cert: Option<PathBuf>,
    /// Path to the PEM-encoded TLS private key. Must be set together with
    /// `tls_cert`.
    pub tls_key: Option<PathBuf>,
    /// Path to a PEM-encoded CA certificate that signs accepted client
    /// certificates. When set, both transports require **mutual TLS**: a client
    /// must present a certificate chaining to this CA to connect (ADR-0011).
    /// Requires `tls_cert`/`tls_key`; bearer API keys still carry the RBAC scope.
    pub tls_client_ca: Option<PathBuf>,
    /// Path to an append-only audit log file (ADR-0011). When set, every
    /// mutating and administrative operation and every access-control denial is
    /// appended as one JSON object per line (JSON Lines); records are always
    /// also emitted as `tracing` events. Unset ⇒ tracing only. Secrets are never
    /// written — see `docs/security/audit.md`.
    pub audit_log: Option<PathBuf>,
    /// Run as a **read-replica follower** (ADR-0030): connect to a leader's gRPC
    /// endpoint at this URL (e.g. `http://leader:6334`) and continuously apply its
    /// committed operations, serving reads. A follower **refuses writes**. Unset ⇒
    /// this node is a normal read-write leader. (Plaintext `http://` for now; TLS
    /// to the leader is a follow-up — run replication over a trusted network.)
    pub leader_url: Option<String>,
    /// API key the follower presents to the leader's admin-scoped `Replicate`
    /// stream (used with `leader_url`). Source it like any secret.
    pub leader_api_key: Option<String>,
    /// Opt out of the secure defaults (no auth, no encryption-at-rest, allow a
    /// non-loopback bind without TLS). For local development only; never the
    /// default.
    pub insecure: bool,
    /// Per-request cost limits (ADR-0040). Set with a `[limits]` table in
    /// `quiver.toml` or the `QUIVER_MAX_*` environment variables.
    pub limits: Limits,
    /// Opt-in server-side embedding providers, keyed by collection name (ADR-0047).
    /// Configured with `[embedding.<collection>]` tables in `quiver.toml`; default
    /// empty, so the engine stays model-agnostic and library mode is unaffected.
    /// API keys are referenced by env-var *name* and resolved at startup, never
    /// stored. Enables `search_text` / `upsert_text` for the named collections.
    #[serde(default)]
    pub embedding: HashMap<String, EmbeddingConfig>,
    /// Opt-in server-side rerank providers, keyed by collection name (ADR-0047).
    /// Configured with `[rerank.<collection>]` tables; enables the one-call
    /// retrieve→rerank stage of `search_text`.
    #[serde(default)]
    pub rerank: HashMap<String, RerankConfig>,
    /// Opt-in per-key rate limiting (ADR-0049). Set a `[rate_limit]` table in
    /// `quiver.toml` or the `QUIVER_RATE_LIMIT_*` env vars;
    /// `requests_per_second = 0` (the default) disables it.
    #[serde(default)]
    pub rate_limit: RateLimitConfig,
    /// Opt-in OpenTelemetry traces export (ADR-0059). Set an `[otlp]` table or the
    /// `QUIVER_OTLP_*` env vars; an empty `endpoint` (the default) disables it.
    /// Export additionally requires the server's `otlp` build feature.
    #[serde(default)]
    pub otlp: OtlpConfig,
    /// Lock-free MVCC reads (ADR-0064), **experimental and default-off**. Serves
    /// reads of single-vector, in-memory collections from an `arc-swap` snapshot so
    /// pure-vector reads never block on a concurrent writer. Also settable via
    /// `QUIVER_MVCC_READS`. See `docs/adr/0064-mvcc-reads-implementation.md`.
    #[serde(default)]
    pub mvcc_reads: bool,
    /// Opt-in **cluster router** mode (ADR-0065): a non-empty list of shard base
    /// URLs (e.g. `["http://s1:6333","http://s2:6333"]`, or the bracketed
    /// `QUIVER_CLUSTER_SHARDS` env array `[http://s1:6333,http://s2:6333]`) makes
    /// this server a stateless router that shards writes and scatter-gathers
    /// searches across the shards. Empty (the default) = an ordinary single-node
    /// server.
    #[serde(default)]
    pub cluster_shards: Vec<String>,
    /// Optional per-shard **read replicas** (ADR-0065 increment 2). Each entry is
    /// `"<shard_index>=<replica_url>"` (repeat the index for several replicas of one
    /// shard), e.g. `["0=http://s1b:6333", "1=http://s2b:6333"]` or the bracketed
    /// `QUIVER_CLUSTER_REPLICAS` env list. A replica is an ordinary follower
    /// (ADR-0030) of its shard's primary; the router fans searches across
    /// `{primary} ∪ replicas` to spread read load. A shard with no entry is
    /// primary-only. Empty (the default) = no replicas.
    #[serde(default)]
    pub cluster_replicas: Vec<String>,
    /// Optional API key the router presents to its shards (a cluster runs over a
    /// trusted network). `None` = shards are unauthenticated.
    #[serde(default)]
    pub cluster_shard_key: Option<String>,
    /// Run this process as the **cluster coordinator** (ADR-0066) instead of a
    /// normal server: it owns the authoritative versioned shard map and serves the
    /// `/cluster/*` membership API, off the data path. It seeds its map from
    /// `cluster_shards`/`cluster_replicas` (or recovers it from `coordinator_state`).
    /// Default `false`.
    #[serde(default)]
    pub coordinator: bool,
    /// Base URL of the cluster coordinator (ADR-0066). When set on a **router**, the
    /// router refreshes its shard map from `GET {url}/cluster/map` on an interval, so
    /// a membership change propagates without a restart. `None` = a static map from
    /// `cluster_shards` (increments 1–2 behaviour).
    #[serde(default)]
    pub coordinator_url: Option<String>,
    /// Where the **coordinator** persists its versioned map + id counter, so a
    /// restart recovers the exact membership (and never reuses a shard id). `None` =
    /// in-memory only (lost on restart). Only meaningful with `coordinator = true`.
    #[serde(default)]
    pub coordinator_state: Option<PathBuf>,
    /// This node's **Raft member id** within its shard's group (ADR-0067, write
    /// HA). Setting it opts the node into per-shard Raft: it joins the group
    /// described by [`raft_members`], commits writes only after a **quorum**
    /// commit, and fails over automatically if the leader dies. `None` (the
    /// default) = the single-primary shard of increments 1–3, unchanged. Requires
    /// the server's `raft` build feature.
    ///
    /// [`raft_members`]: Config::raft_members
    #[serde(default)]
    pub raft_node_id: Option<u64>,
    /// The Raft group's members as `"<id>=<grpc-url>"` entries (ADR-0067), e.g.
    /// `["1=http://n1:6334","2=http://n2:6334","3=http://n3:6334"]` or the
    /// bracketed `QUIVER_RAFT_MEMBERS` env array. Each URL is the member's gRPC
    /// address (the `RaftService` rides the existing gRPC server). Used only when
    /// [`raft_node_id`] is set; the lowest id bootstraps the group.
    ///
    /// [`raft_node_id`]: Config::raft_node_id
    #[serde(default)]
    pub raft_members: Vec<String>,
}
430
431impl Default for Config {
432    fn default() -> Self {
433        Self {
434            data_dir: PathBuf::from("./quiver-data"),
435            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
436            grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
437            api_keys: Vec::new(),
438            encryption_key: None,
439            master_key_file: None,
440            tls_cert: None,
441            tls_key: None,
442            tls_client_ca: None,
443            audit_log: None,
444            leader_url: None,
445            leader_api_key: None,
446            insecure: false,
447            limits: Limits::default(),
448            embedding: HashMap::new(),
449            rerank: HashMap::new(),
450            rate_limit: RateLimitConfig::default(),
451            otlp: OtlpConfig::default(),
452            mvcc_reads: false,
453            cluster_shards: Vec::new(),
454            cluster_replicas: Vec::new(),
455            cluster_shard_key: None,
456            coordinator: false,
457            coordinator_url: None,
458            coordinator_state: None,
459            raft_node_id: None,
460            raft_members: Vec::new(),
461        }
462    }
463}
464
465impl Config {
466    /// Load configuration from defaults, an optional `quiver.toml`, and
467    /// `QUIVER_*` environment variables.
468    pub fn load() -> Result<Self, Error> {
469        let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
470            .merge(Toml::file("quiver.toml"))
471            .merge(Env::prefixed("QUIVER_"))
472            .extract()
473            .map_err(|e| Error::Config(e.to_string()))?;
474        // The flat `QUIVER_MAX_*` env keys do not nest under the `limits` table
475        // figment builds, so apply them explicitly (ADR-0040).
476        config.limits.apply_env_overrides()?;
477        // Same for the flat `QUIVER_RATE_LIMIT_*` keys (ADR-0049).
478        config
479            .rate_limit
480            .apply_env_overrides()
481            .map_err(Error::Config)?;
482        // Same for the flat `QUIVER_OTLP_*` keys (ADR-0059).
483        config.otlp.apply_env_overrides().map_err(Error::Config)?;
484        Ok(config)
485    }
486
487    /// Reject insecure configurations unless explicitly opted out (ADR-0013):
488    /// no anonymous access, encryption-at-rest on by default with a valid key,
489    /// and no non-loopback bind without TLS.
490    pub fn validate(&self) -> Result<(), Error> {
491        if self.api_keys.is_empty() && !self.insecure {
492            return Err(Error::Config(
493                "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
494                 set insecure=true for local development"
495                    .to_owned(),
496            ));
497        }
498        // Resolve the master key from the env var or a key file (exactly one).
499        let master_key = self.master_key_hex()?;
500        if master_key.is_none() && !self.insecure {
501            return Err(Error::Config(
502                "no encryption key configured: encryption-at-rest is on by default — \
503                 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
504                 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
505                 store data unencrypted (development only)"
506                    .to_owned(),
507            ));
508        }
509        // Fail fast on a malformed key rather than at first write.
510        if let Some(key) = &master_key {
511            AeadCodec::from_hex(key)
512                .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
513        }
514        // TLS certificate and key are set together or not at all.
515        if self.tls_cert.is_some() != self.tls_key.is_some() {
516            return Err(Error::Config(
517                "tls_cert and tls_key must be set together".to_owned(),
518            ));
519        }
520        // mTLS layers on top of server TLS: a client CA needs a server cert/key.
521        if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
522            return Err(Error::Config(
523                "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
524            ));
525        }
526        let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
527        let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
528        if non_loopback && !tls_enabled && !self.insecure {
529            return Err(Error::Config(
530                "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
531                 or insecure=true for local development"
532                    .to_owned(),
533            ));
534        }
535        // Reject a nonsensical cost limit (a `0` cap would refuse every request).
536        self.limits.validate()?;
537        Ok(())
538    }
539
540    /// The effective hex master key: from [`master_key_file`] when set (read and
541    /// trimmed), otherwise [`encryption_key`]. `None` means no key is configured
542    /// (only valid with `insecure`).
543    ///
544    /// # Errors
545    /// [`Error::Config`] if both sources are set, or the key file cannot be read.
546    ///
547    /// [`master_key_file`]: Config::master_key_file
548    /// [`encryption_key`]: Config::encryption_key
549    pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
550        let env_key = self
551            .encryption_key
552            .as_deref()
553            .map(str::trim)
554            .filter(|k| !k.is_empty());
555        match (&self.master_key_file, env_key) {
556            (Some(_), Some(_)) => Err(Error::Config(
557                "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
558                 (QUIVER_MASTER_KEY_FILE), not both"
559                    .to_owned(),
560            )),
561            (Some(path), None) => {
562                warn_if_world_readable(path);
563                let hex = std::fs::read_to_string(path).map_err(|e| {
564                    Error::Config(format!("reading master_key_file {}: {e}", path.display()))
565                })?;
566                Ok(Some(hex.trim().to_owned()))
567            }
568            (None, Some(key)) => Ok(Some(key.to_owned())),
569            (None, None) => Ok(None),
570        }
571    }
572}
573
574// Warn (don't fail — Docker/Kubernetes secrets often mount group/world-readable)
575// when a master-key file is more permissive than `0600`.
576#[cfg(unix)]
577fn warn_if_world_readable(path: &std::path::Path) {
578    use std::os::unix::fs::PermissionsExt;
579    if let Ok(meta) = std::fs::metadata(path)
580        && meta.permissions().mode() & 0o077 != 0
581    {
582        tracing::warn!(
583            path = %path.display(),
584            mode = format!("{:o}", meta.permissions().mode() & 0o777),
585            "master key file is group/world-accessible; restrict it to 0600"
586        );
587    }
588}
589
590#[cfg(not(unix))]
591fn warn_if_world_readable(_path: &std::path::Path) {}
592
/// Shared server state: the engine behind a reader–writer lock, the accepted
/// API keys with their RBAC scopes, the audit log, and the per-request
/// machinery (limits, rate limiter, metrics, providers) shared by both
/// transports. Cloning is cheap where the state is `Arc`-wrapped: those fields
/// are shared, not copied.
#[derive(Clone)]
pub(crate) struct AppState {
    // The engine behind a reader–writer lock (ADR-0057): reads take the shared
    // lock and run concurrently; writes take the exclusive lock. A read that finds
    // a collection's index stale serves the prior snapshot and schedules an
    // off-lock rebuild (ADR-0062, tracked in `rebuilding`) rather than stalling
    // on the write lock itself.
    db: Arc<RwLock<Database>>,
    // Accepted API keys with their RBAC scopes (ADR-0011).
    keys: Arc<Vec<ApiKey>>,
    // Append-only audit log (ADR-0011).
    audit: Arc<AuditLog>,
    // Fan-out of every committed op to replication followers (ADR-0030). The
    // commit observer publishes here; each `Replicate` stream subscribes.
    replication_tx: broadcast::Sender<WalEntry>,
    // True on a replication follower: external writes are refused; the engine's
    // state is owned by the stream it applies from the leader (ADR-0030).
    read_only: bool,
    // Per-request cost limits, enforced at this op layer so both transports are
    // covered by one implementation (ADR-0040).
    limits: Limits,
    // Opt-in, provider-agnostic server-side embedding/rerank providers, keyed by
    // collection (ADR-0047). Empty on the common path; `search_text`/`upsert_text`
    // require a configured embedder for the target collection.
    embed: Arc<EmbedRegistry>,
    // Opt-in per-key token-bucket rate limiter (ADR-0049). A no-op when disabled.
    rate_limiter: Arc<RateLimiter>,
    // Prometheus metrics registry (ADR-0014/0054), scraped at `GET /metrics`.
    metrics: Arc<metrics::Metrics>,
    // Collections with an off-lock index rebuild in flight (ADR-0062). A search that
    // observes a deferred rebuild schedules one here, single-flighted so concurrent
    // readers never kick off duplicate builds for the same collection.
    rebuilding: Arc<Mutex<HashSet<String>>>,
    // Lock-free MVCC read cache (ADR-0064 increment 3): cached `arc-swap` snapshot
    // cells for MVCC-served collections. A pure-vector search loads the cell and
    // searches it **without taking the database lock**, so it never blocks on a
    // concurrent writer. Populated under the read lock the first time a collection
    // is seen fresh; the map itself changes only on create/drop, never on a data
    // write. Empty unless `QUIVER_MVCC_READS` is on.
    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
    // Snapshot of the engine's `QUIVER_MVCC_READS` flag, read once at startup. When
    // on, the writer drives off-lock consolidation rebuilds (ADR-0064 increment 3)
    // because pure-vector fast-path reads bypass the reader-driven scheduler. Off by
    // default, so the non-MVCC path is byte-identical.
    mvcc: bool,
    // Opt-in cluster router (ADR-0065): when `Some`, every collection/point op
    // fans out to the shards instead of touching the local engine. `None` = an
    // ordinary single-node server (the engine-backed path, untouched).
    cluster: Option<Arc<cluster::Cluster>>,
    // Opt-in per-shard Raft write HA (ADR-0067): when `Some`, this node is a Raft
    // voter for its shard — writes are proposed through consensus and applied only
    // after a quorum commits, so a leader failure fails over with no lost
    // acknowledged write. `None` = the single-primary shard (increments 1–3).
    // Compiled only behind the off-by-default `raft` feature.
    #[cfg(feature = "raft")]
    raft: Option<Arc<raft::RaftShard>>,
}
649
/// A collection's metadata, as returned by the create/get/list collection
/// operations of [`AppState`].
pub(crate) struct CollectionInfo {
    /// Collection name.
    pub name: String,
    /// Vector dimensionality.
    pub dim: u32,
    /// Distance metric of the collection.
    pub metric: DistanceMetric,
    /// Item count: the document count for a multi-vector collection, the
    /// engine's `len` otherwise. Always `0` in a create response.
    pub count: u64,
    /// Index specification.
    pub index: IndexSpec,
    /// Payload fields declared filterable.
    pub filterable: Vec<FilterableField>,
    /// Whether this is a multi-vector collection.
    pub multivector: bool,
    /// Vector-encryption setting of the collection.
    pub vector_encryption: VectorEncryption,
}
661
/// A point to upsert: an id, its vector, and a JSON payload.
pub(crate) struct PointIn {
    /// Caller-chosen point id.
    pub id: String,
    /// The vector to store; its length is checked against the request limits
    /// before the write.
    pub vector: Vec<f32>,
    /// JSON payload stored with the point; checked against the payload limit
    /// before the write.
    pub payload: Value,
}
668
/// A text point to embed server-side and upsert (ADR-0047).
pub(crate) struct TextPointIn {
    /// Caller-chosen point id.
    pub id: String,
    /// Source text to embed.
    pub text: String,
    /// JSON payload stored with the point.
    pub payload: Value,
}
675
/// Default size of the candidate pool a rerank stage over-fetches before it
/// reorders the candidates down to the requested `k` (ADR-0047).
const RERANK_CANDIDATES: usize = 50;
679
680/// The document text a rerank stage scores: the original text stored under
681/// [`TEXT_KEY`] by `upsert_text`, else the whole payload as a string so the
682/// reranker still has something to compare.
683fn doc_text(payload: Option<&Value>) -> String {
684    match payload {
685        Some(Value::Object(map)) => map
686            .get(TEXT_KEY)
687            .and_then(Value::as_str)
688            .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
689        Some(v) => v.to_string(),
690        None => String::new(),
691    }
692}
693
/// A fetched point.
pub(crate) struct PointOut {
    /// Point id.
    pub id: String,
    /// The stored vector, when present in the response.
    // NOTE(review): presumably `Some` only when the caller asked for vectors
    // (`with_vector`) — confirm against `get_points`.
    pub vector: Option<Vec<f32>>,
    /// JSON payload stored with the point.
    pub payload: Value,
}
700
/// A search hit.
pub(crate) struct MatchOut {
    /// Id of the matched point.
    pub id: String,
    /// Score of the match.
    // NOTE(review): whether higher or lower is better presumably depends on the
    // collection's metric — confirm against the search implementation.
    pub score: f32,
    /// The point's payload, when included in the response.
    pub payload: Option<Value>,
    /// The point's vector, when included in the response.
    pub vector: Option<Vec<f32>>,
}
708
/// A multi-vector document to upsert: one id carrying several vectors.
pub(crate) struct DocumentIn {
    /// Caller-chosen document id.
    pub id: String,
    /// The document's vectors, one inner `Vec<f32>` per vector.
    pub vectors: Vec<Vec<f32>>,
    /// JSON payload stored with the document.
    pub payload: Value,
}
715
/// A multi-vector document hit (MaxSim).
pub(crate) struct DocumentMatchOut {
    /// Id of the matched document.
    pub id: String,
    /// Score of the match.
    pub score: f32,
    /// The document's payload, when included in the response.
    pub payload: Option<Value>,
    /// The document's vectors, when included in the response.
    pub vectors: Option<Vec<Vec<f32>>>,
}
723
724impl AppState {
    /// Authenticate a presented bearer token to its [`Principal`], or `None`
    /// (a 401). An empty key set means `insecure` mode (validated at startup),
    /// which admits any caller as an all-collections admin.
    ///
    /// `presented` is the token the caller supplied, if any. The decision is
    /// delegated to `auth::authenticate` with this server's configured keys.
    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
        auth::authenticate(&self.keys, presented)
    }
731
    /// Consume one rate-limit token for `actor` (ADR-0049). A no-op `Allowed` when
    /// rate limiting is disabled. Both transports call this at their auth choke
    /// point so the limiter is enforced by one implementation.
    ///
    /// Returns the limiter's [`RateDecision`] for this request unchanged.
    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
        self.rate_limiter.check(actor)
    }
738
    /// Whether the per-key rate limiter is active (lets a transport skip the work
    /// entirely on the common, disabled path). Reports the limiter's own
    /// `enabled()` flag.
    pub(crate) fn rate_limit_enabled(&self) -> bool {
        self.rate_limiter.enabled()
    }
744
745    // Run a **mutating** engine op behind the exclusive write lock, off the async
746    // runtime (the engine is synchronous and CPU/IO-bound). The single writer is
747    // unchanged from the prior single-mutex model (ADR-0006/0057).
748    async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
749    where
750        T: Send + 'static,
751        F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
752    {
753        let db = Arc::clone(&self.db);
754        tokio::task::spawn_blocking(move || -> Result<T, Error> {
755            let mut guard = db
756                .write()
757                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
758            f(&mut guard).map_err(Error::Engine)
759        })
760        .await
761        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
762    }
763
764    // Run a **read-only** engine op behind the shared read lock — many of these run
765    // concurrently (ADR-0057). The closure gets `&Database`, so it can only call the
766    // `&self` reads (`*_snapshot`, `fetch`, accessors).
767    async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
768    where
769        T: Send + 'static,
770        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
771    {
772        let db = Arc::clone(&self.db);
773        tokio::task::spawn_blocking(move || -> Result<T, Error> {
774            let guard = db
775                .read()
776                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
777            f(&guard).map_err(Error::Engine)
778        })
779        .await
780        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
781    }
782
783    // Run a snapshot read for `collection` (ADR-0057/0062). Takes only the shared
784    // read lock, so concurrent searches run in parallel. When a prior write deferred
785    // this collection's rebuild, the snapshot read serves the **prior** snapshot
786    // (snapshot-isolated, slightly stale) rather than blocking, and we schedule an
787    // **off-lock** rebuild so the next read is fresh — the reader never waits on a
788    // rebuild under the exclusive lock.
789    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
790    where
791        T: Send + 'static,
792        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
793    {
794        let db = Arc::clone(&self.db);
795        let cells = Arc::clone(&self.snapshot_cells);
796        let coll = collection.clone();
797        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
798            let guard = db
799                .read()
800                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
801            let result = f(&guard).map_err(Error::Engine)?;
802            // Report staleness so the caller can schedule a background rebuild; a
803            // missing collection (raced a drop) is simply "nothing to rebuild".
804            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
805            // Warm the lock-free read cache (ADR-0064 increment 3): once an
806            // MVCC-served collection is fresh (its base snapshot is published), cache
807            // its cell so subsequent pure-vector reads skip the lock. The cell
808            // self-updates as the writer republishes, so it is never re-fetched.
809            if !stale
810                && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
811                && let Ok(mut map) = cells.write()
812            {
813                map.entry(coll.clone()).or_insert(cell);
814            }
815            Ok((result, stale))
816        })
817        .await
818        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
819        if stale {
820            self.schedule_rebuild(collection);
821        }
822        Ok(result)
823    }
824
825    // The cached lock-free snapshot cell for `collection`, if one has been warmed
826    // (ADR-0064 increment 3). The map read contends only with create/drop, never
827    // with a data write — that is what makes the pure-vector read lock-free.
828    fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
829        self.snapshot_cells.read().ok()?.get(collection).cloned()
830    }
831
832    // After an MVCC-mode data write, schedule an off-lock consolidation rebuild if
833    // the write left the overlay stale (ADR-0064 increment 3): pure-vector fast-path
834    // reads bypass the reader-driven scheduler, so the writer drives consolidation.
835    // A brief read-lock staleness check; the rebuild itself runs off-lock and
836    // single-flighted. No-op (and not called) when MVCC is off.
837    async fn schedule_if_stale(&self, collection: &str) {
838        let db = Arc::clone(&self.db);
839        let coll = collection.to_owned();
840        let stale = tokio::task::spawn_blocking(move || {
841            db.read()
842                .ok()
843                .and_then(|g| g.needs_rebuild(&coll).ok())
844                .unwrap_or(false)
845        })
846        .await
847        .unwrap_or(false);
848        if stale {
849            self.schedule_rebuild(collection.to_owned());
850        }
851    }
852
853    // Schedule a single off-lock rebuild for `collection` (ADR-0062), deduplicated
854    // so concurrent readers that all observe the same deferred rebuild start exactly
855    // one. A no-op if one is already in flight.
856    fn schedule_rebuild(&self, collection: String) {
857        {
858            let mut inflight = match self.rebuilding.lock() {
859                Ok(g) => g,
860                Err(_) => return,
861            };
862            if !inflight.insert(collection.clone()) {
863                return; // already rebuilding this collection
864            }
865        }
866        let state = self.clone();
867        tokio::spawn(async move {
868            state.run_rebuild(&collection).await;
869            if let Ok(mut inflight) = state.rebuilding.lock() {
870                inflight.remove(&collection);
871            }
872        });
873    }
874
875    // Drive a collection's off-lock rebuild to completion (ADR-0062): capture the
876    // inputs under the shared read lock, build with no lock held, install under a
877    // brief write lock, and repeat while a write landed during the build (the commit
878    // reports the collection still stale). Each phase is a `spawn_blocking` so the
879    // CPU-bound build never stalls the async runtime; errors end the attempt (the
880    // collection stays stale and the next read reschedules).
881    async fn run_rebuild(&self, collection: &str) {
882        loop {
883            let db = Arc::clone(&self.db);
884            let coll = collection.to_owned();
885            let inputs = tokio::task::spawn_blocking(move || {
886                let guard = db.read().ok()?;
887                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
888            })
889            .await
890            .ok()
891            .flatten();
892            let Some(inputs) = inputs else { return };
893
894            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
895                return;
896            };
897
898            let db = Arc::clone(&self.db);
899            let still_stale = tokio::task::spawn_blocking(move || {
900                let mut guard = db.write().ok()?;
901                guard.commit_rebuild(rebuilt).ok()
902            })
903            .await
904            .ok()
905            .flatten();
906            match still_stale {
907                Some(true) => continue, // a write landed during the build — rebuild again
908                _ => return,
909            }
910        }
911    }
912
913    // Authorize `action` on `resource`, recording a denial in the audit log
914    // before propagating it. The shared choke point for both transports.
915    fn authorize(
916        &self,
917        principal: &Principal,
918        action: Action,
919        op: &str,
920        resource: &str,
921    ) -> Result<(), Error> {
922        principal
923            .require(action, Some(resource))
924            .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
925    }
926
927    // Authorize a collection-agnostic operation (listing): only the role is
928    // checked; a denial is recorded against the `*` resource.
929    fn authorize_global(
930        &self,
931        principal: &Principal,
932        action: Action,
933        op: &str,
934    ) -> Result<(), Error> {
935        principal
936            .require(action, None)
937            .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
938    }
939
940    /// Open a replication stream (ADR-0030): authorize (admin), then — in a single
941    /// engine critical section so no commit can interleave — subscribe to the live
942    /// commit tail and snapshot current state. The caller streams the snapshot,
943    /// then the tail from the receiver; because the subscription is taken under the
944    /// same lock as the snapshot, every post-snapshot op arrives on the receiver
945    /// and none is missed or duplicated.
946    pub(crate) async fn open_replication(
947        &self,
948        principal: &Principal,
949    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
950        self.authorize_global(principal, Action::Admin, "replicate")?;
951        let tx = self.replication_tx.clone();
952        self.read_blocking(move |db| {
953            let rx = tx.subscribe();
954            let snapshot = db.replication_snapshot()?;
955            Ok((snapshot, rx))
956        })
957        .await
958    }
959
    /// Apply a replicated op received from the leader (ADR-0030). Internal to the
    /// follower stream — deliberately NOT gated by `read_only`, which only refuses
    /// *external* client writes.
    ///
    /// The op is applied under the exclusive write lock on the blocking pool.
    // NOTE(review): unlike `delete_collection`, nothing here evicts
    // `snapshot_cells`. If a replicated op can drop a collection while MVCC reads
    // are on, a cached cell could outlive it — confirm whether that can happen.
    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
        self.write_blocking(move |db| db.apply_replicated(op)).await
    }
966
967    // Reject a write up front if this node is not its shard's Raft leader
968    // (ADR-0067), returning the leader's URL so a router redirects. Checking
969    // before preparing the op avoids a wasted local prepare on a follower (and a
970    // follower's stale local state silently producing a no-op). A leadership change
971    // between this check and the proposal is still caught: `client_write` then
972    // returns `ForwardToLeader`, mapped by `map_client_write_err`.
973    #[cfg(feature = "raft")]
974    fn ensure_raft_leader(&self, rs: &raft::RaftShard) -> Result<(), Error> {
975        let leader = rs.raft.metrics().borrow().current_leader;
976        if leader == Some(rs.node_id) {
977            Ok(())
978        } else {
979            Err(Error::NotLeader {
980                leader: leader.and_then(|id| rs.member_url(id)),
981            })
982        }
983    }
984
985    /// Add a voter to this node's Raft shard at runtime (ADR-0067 increment 4c,
986    /// admin only). Returns a bad-request error if this node is not a Raft shard or
987    /// the server was built without the `raft` feature.
988    pub(crate) async fn raft_add_voter(
989        &self,
990        principal: &Principal,
991        id: u64,
992        url: String,
993    ) -> Result<(), Error> {
994        self.authorize_global(principal, Action::Admin, "raft_add_voter")?;
995        #[cfg(feature = "raft")]
996        {
997            let rs = self
998                .raft
999                .clone()
1000                .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1001            return rs
1002                .add_voter(id, url)
1003                .await
1004                .map_err(|e| Error::Internal(format!("add raft voter: {e}")));
1005        }
1006        #[cfg(not(feature = "raft"))]
1007        {
1008            let _ = (id, url);
1009            Err(Error::BadRequest(
1010                "server built without the raft feature".to_owned(),
1011            ))
1012        }
1013    }
1014
1015    /// Remove a voter from this node's Raft shard at runtime (ADR-0067 increment 4c,
1016    /// admin only).
1017    pub(crate) async fn raft_remove_voter(
1018        &self,
1019        principal: &Principal,
1020        id: u64,
1021    ) -> Result<(), Error> {
1022        self.authorize_global(principal, Action::Admin, "raft_remove_voter")?;
1023        #[cfg(feature = "raft")]
1024        {
1025            let rs = self
1026                .raft
1027                .clone()
1028                .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1029            return rs
1030                .remove_voter(id)
1031                .await
1032                .map_err(|e| Error::Internal(format!("remove raft voter: {e}")));
1033        }
1034        #[cfg(not(feature = "raft"))]
1035        {
1036            let _ = id;
1037            Err(Error::BadRequest(
1038                "server built without the raft feature".to_owned(),
1039            ))
1040        }
1041    }
1042
1043    // Propose one prepared op through the shard's Raft group and wait for the
1044    // quorum commit + local apply (ADR-0067). The op is acknowledged only after it
1045    // is committed, so a leader failover never loses it.
1046    #[cfg(feature = "raft")]
1047    async fn raft_propose(&self, rs: &raft::RaftShard, op: WalOp) -> Result<(), Error> {
1048        rs.raft
1049            .client_write(op)
1050            .await
1051            .map(|_| ())
1052            .map_err(map_client_write_err)
1053    }
1054
1055    // Propose a batch of prepared ops in order, returning how many committed. Stops
1056    // at the first failure (e.g. a mid-batch leadership change) and surfaces it, so
1057    // a partial batch is reported honestly rather than as a silent success.
1058    #[cfg(feature = "raft")]
1059    async fn raft_propose_all(&self, rs: &raft::RaftShard, ops: Vec<WalOp>) -> Result<u64, Error> {
1060        let mut committed = 0u64;
1061        for op in ops {
1062            self.raft_propose(rs, op).await?;
1063            committed += 1;
1064        }
1065        Ok(committed)
1066    }
1067
1068    // Refuse a mutating operation on a read-only replication follower (ADR-0030);
1069    // its state is owned by the leader's stream, not by external clients.
1070    fn ensure_writable(&self, op: &str) -> Result<(), Error> {
1071        if self.read_only {
1072            return Err(Error::Forbidden(format!(
1073                "{op}: this node is a read-only replication follower"
1074            )));
1075        }
1076        Ok(())
1077    }
1078
    /// Create a collection named `name` with the given shape and return its
    /// metadata (with `count` 0).
    ///
    /// Checks run in this order: the node must be writable, the principal must
    /// hold `Admin` on `name`, and `dim` must pass the configured limit. The
    /// request is then served by exactly one of three paths:
    /// 1. cluster router, when configured — delegated entirely to the cluster;
    /// 2. per-shard Raft, when compiled in and configured — prepared locally and
    ///    proposed through consensus;
    /// 3. the local engine, under the exclusive write lock.
    ///
    /// The Raft and local paths record the outcome in the audit log; the cluster
    /// path returns before any audit call in this function.
    ///
    /// # Errors
    /// Returns the first failing check, or the error from the chosen write path.
    // The parameters mirror the descriptor fields one-to-one, hence the long list.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn create_collection(
        &self,
        principal: &Principal,
        name: String,
        dim: u32,
        metric: DistanceMetric,
        index: IndexSpec,
        filterable: Vec<FilterableField>,
        multivector: bool,
        vector_encryption: VectorEncryption,
    ) -> Result<CollectionInfo, Error> {
        self.ensure_writable("create_collection")?;
        self.authorize(principal, Action::Admin, "create_collection", &name)?;
        self.limits.check_dim(dim as usize)?;
        // Cluster mode: fan out to the shards instead of touching the local engine.
        if let Some(c) = &self.cluster {
            return c
                .create_collection(
                    name,
                    dim,
                    metric,
                    index,
                    filterable,
                    multivector,
                    vector_encryption,
                )
                .await;
        }
        // Per-shard Raft write path (ADR-0067): the leader prepares the op (which
        // assigns the new collection id) and proposes it through consensus; every
        // voter applies it after the quorum commit. The `create_lock` serializes
        // concurrent creates so two cannot claim the same next id.
        #[cfg(feature = "raft")]
        if let Some(rs) = self.raft.clone() {
            self.ensure_raft_leader(&rs)?;
            let descriptor = Descriptor::new(dim, Dtype::F32, metric)
                .with_index(index)
                .with_filterable(filterable.clone())
                .with_multivector(multivector)
                .with_vector_encryption(vector_encryption);
            // Held across prepare + propose so the prepared id cannot be claimed twice.
            let _guard = rs.create_lock.lock().await;
            let prep_name = name.clone();
            let result = match self
                .read_blocking(move |db| db.prepare_create_collection(&prep_name, &descriptor))
                .await
            {
                Ok(op) => self.raft_propose(&rs, op).await,
                Err(e) => Err(e),
            };
            // Audit both success and failure before propagating the result.
            self.audit.record(
                principal.actor(),
                "create_collection",
                &name,
                Outcome::of(&result),
            );
            result?;
            return Ok(CollectionInfo {
                name,
                dim,
                metric,
                count: 0,
                index,
                filterable,
                multivector,
                vector_encryption,
            });
        }
        // Single-node path: create directly in the local engine.
        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
            .with_index(index)
            .with_filterable(filterable.clone())
            .with_multivector(multivector)
            .with_vector_encryption(vector_encryption);
        let owned = name.clone();
        let result = self
            .write_blocking(move |db| db.create_collection(&owned, descriptor))
            .await;
        // Audit both success and failure before propagating the result.
        self.audit.record(
            principal.actor(),
            "create_collection",
            &name,
            Outcome::of(&result),
        );
        result?;
        Ok(CollectionInfo {
            name,
            dim,
            metric,
            count: 0,
            index,
            filterable,
            multivector,
            vector_encryption,
        })
    }
1173
1174    pub(crate) async fn get_collection(
1175        &self,
1176        principal: &Principal,
1177        name: String,
1178    ) -> Result<CollectionInfo, Error> {
1179        self.authorize(principal, Action::Read, "get_collection", &name)?;
1180        self.read_blocking(move |db| {
1181            let descriptor = db
1182                .descriptor(&name)
1183                .cloned()
1184                .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
1185            // A multi-vector collection reports its document count, not its
1186            // (much larger) token-row count.
1187            let count = if descriptor.multivector {
1188                db.document_count(&name)? as u64
1189            } else {
1190                db.len(&name)? as u64
1191            };
1192            Ok(CollectionInfo {
1193                name,
1194                dim: descriptor.dim,
1195                metric: descriptor.metric,
1196                count,
1197                index: descriptor.index,
1198                filterable: descriptor.filterable,
1199                multivector: descriptor.multivector,
1200                vector_encryption: descriptor.vector_encryption,
1201            })
1202        })
1203        .await
1204    }
1205
1206    pub(crate) async fn list_collections(
1207        &self,
1208        principal: &Principal,
1209    ) -> Result<Vec<CollectionInfo>, Error> {
1210        self.authorize_global(principal, Action::Read, "list_collections")?;
1211        let mut infos = self
1212            .read_blocking(|db| {
1213                let mut out = Vec::new();
1214                for name in db.collection_names() {
1215                    if let Some(descriptor) = db.descriptor(&name).cloned() {
1216                        let count = if descriptor.multivector {
1217                            db.document_count(&name)? as u64
1218                        } else {
1219                            db.len(&name)? as u64
1220                        };
1221                        out.push(CollectionInfo {
1222                            name,
1223                            dim: descriptor.dim,
1224                            metric: descriptor.metric,
1225                            count,
1226                            index: descriptor.index,
1227                            filterable: descriptor.filterable,
1228                            multivector: descriptor.multivector,
1229                            vector_encryption: descriptor.vector_encryption,
1230                        });
1231                    }
1232                }
1233                Ok(out)
1234            })
1235            .await?;
1236        // Never reveal collections outside the caller's scope.
1237        infos.retain(|info| principal.can_see(&info.name));
1238        Ok(infos)
1239    }
1240
1241    pub(crate) async fn delete_collection(
1242        &self,
1243        principal: &Principal,
1244        name: String,
1245    ) -> Result<bool, Error> {
1246        self.ensure_writable("delete_collection")?;
1247        self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1248        if let Some(c) = &self.cluster {
1249            return c.drop_collection(&name).await;
1250        }
1251        let resource = name.clone();
1252        let result = self
1253            .write_blocking(move |db| db.drop_collection(&name))
1254            .await;
1255        self.audit.record(
1256            principal.actor(),
1257            "delete_collection",
1258            &resource,
1259            Outcome::of(&result),
1260        );
1261        // Evict the dropped collection's cached lock-free cell (ADR-0064 increment 3)
1262        // so a later same-named collection never serves a stale cell.
1263        if matches!(result, Ok(true))
1264            && let Ok(mut map) = self.snapshot_cells.write()
1265        {
1266            map.remove(&resource);
1267        }
1268        result
1269    }
1270
    /// Upsert `points` into `collection`, returning the count reported by the
    /// write path that served the request.
    ///
    /// Checks run in this order: the node must be writable, the principal must
    /// hold `Write` on the collection, and the batch size plus every point's
    /// vector length and payload must pass the configured limits. The request is
    /// then served by exactly one of three paths:
    /// 1. cluster router, when configured — delegated entirely to the cluster;
    /// 2. per-shard Raft, when compiled in and configured — one op per point,
    ///    proposed in order;
    /// 3. the local engine, as one batch under the exclusive write lock.
    ///
    /// The Raft and local paths record the outcome in the audit log; the cluster
    /// path returns before any audit call in this function.
    ///
    /// # Errors
    /// Returns the first failing check, or the error from the chosen write path.
    #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
    pub(crate) async fn upsert(
        &self,
        principal: &Principal,
        collection: String,
        points: Vec<PointIn>,
    ) -> Result<u64, Error> {
        self.ensure_writable("upsert")?;
        self.authorize(principal, Action::Write, "upsert", &collection)?;
        self.limits.check_batch(points.len())?;
        // Validate every point up front so an oversized one rejects the whole
        // request before anything is written.
        for p in &points {
            self.limits.check_vector_len(p.vector.len())?;
            self.limits.check_payload(&p.payload)?;
        }
        // Cluster mode: fan out to the shards instead of touching the local engine.
        if let Some(c) = &self.cluster {
            return c.upsert(&collection, points).await;
        }
        // Per-shard Raft write path (ADR-0067): prepare each point's op on the
        // leader, then propose each through consensus. One op per point keeps the
        // apply seam identical to a direct write; a batched Raft entry is a later
        // optimization (ponytail: correctness first, opt-in path).
        // NOTE(review): this branch returns without the MVCC `schedule_if_stale`
        // call the local path makes — confirm the Raft apply path covers it.
        #[cfg(feature = "raft")]
        if let Some(rs) = self.raft.clone() {
            self.ensure_raft_leader(&rs)?;
            let prep = collection.clone();
            let result = match self
                .read_blocking(move |db| {
                    points
                        .iter()
                        .map(|p| db.prepare_upsert(&prep, &p.id, &p.vector, &p.payload))
                        .collect::<quiver_embed::Result<Vec<_>>>()
                })
                .await
            {
                Ok(ops) => self.raft_propose_all(&rs, ops).await,
                Err(e) => Err(e),
            };
            // Audit both success and failure before returning the result.
            self.audit.record(
                principal.actor(),
                "upsert",
                &collection,
                Outcome::of(&result),
            );
            return result;
        }
        // Single-node path: `collection` moves into the closure, so keep a copy
        // for the audit record and the staleness check.
        let resource = collection.clone();
        let result = self
            .write_blocking(move |db| {
                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
                    .iter()
                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
                    .collect();
                db.upsert_batch(&collection, &records)
            })
            .await;
        self.audit
            .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
        // In MVCC mode the writer drives consolidation after a successful write.
        if self.mvcc && result.is_ok() {
            self.schedule_if_stale(&resource).await;
        }
        result
    }
1333
1334    // Bulk upsert for a load-then-query workload (ADR-0045): one WAL fsync plus a
1335    // single deferred index-build pass, with the larger `max_bulk_batch_size` cap.
1336    pub(crate) async fn upsert_bulk(
1337        &self,
1338        principal: &Principal,
1339        collection: String,
1340        points: Vec<PointIn>,
1341    ) -> Result<u64, Error> {
1342        self.ensure_writable("upsert")?;
1343        self.authorize(principal, Action::Write, "upsert", &collection)?;
1344        self.limits.check_bulk_batch(points.len())?;
1345        for p in &points {
1346            self.limits.check_vector_len(p.vector.len())?;
1347            self.limits.check_payload(&p.payload)?;
1348        }
1349        if let Some(c) = &self.cluster {
1350            return c.upsert_bulk(&collection, points).await;
1351        }
1352        let resource = collection.clone();
1353        let result = self
1354            .write_blocking(move |db| {
1355                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1356                    .iter()
1357                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1358                    .collect();
1359                db.upsert_bulk(&collection, &records)
1360            })
1361            .await;
1362        self.audit.record(
1363            principal.actor(),
1364            "upsert_bulk",
1365            &resource,
1366            Outcome::of(&result),
1367        );
1368        if self.mvcc && result.is_ok() {
1369            self.schedule_if_stale(&resource).await;
1370        }
1371        result
1372    }
1373
1374    /// Take a consistent online snapshot of the whole database into a
1375    /// server-local `destination` directory (ADR-0050). A global admin
1376    /// operation; runs the checkpoint + copy on the blocking pool.
1377    #[tracing::instrument(skip_all)]
1378    pub(crate) async fn snapshot(
1379        &self,
1380        principal: &Principal,
1381        destination: String,
1382    ) -> Result<SnapshotInfo, Error> {
1383        self.ensure_writable("snapshot")?;
1384        self.authorize_global(principal, Action::Admin, "snapshot")?;
1385        let dest = std::path::PathBuf::from(&destination);
1386        let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1387        self.audit.record(
1388            principal.actor(),
1389            "snapshot",
1390            &destination,
1391            Outcome::of(&result),
1392        );
1393        result
1394    }
1395
1396    pub(crate) async fn delete_points(
1397        &self,
1398        principal: &Principal,
1399        collection: String,
1400        ids: Vec<String>,
1401    ) -> Result<u64, Error> {
1402        self.ensure_writable("delete_points")?;
1403        self.authorize(principal, Action::Write, "delete_points", &collection)?;
1404        if let Some(c) = &self.cluster {
1405            return c.delete_points(&collection, ids).await;
1406        }
1407        // Per-shard Raft write path (ADR-0067): prepare each delete on the leader
1408        // (`None` for an absent point), then propose the present ones through
1409        // consensus. The returned count is the number actually deleted.
1410        #[cfg(feature = "raft")]
1411        if let Some(rs) = self.raft.clone() {
1412            self.ensure_raft_leader(&rs)?;
1413            let prep = collection.clone();
1414            let result = match self
1415                .read_blocking(move |db| {
1416                    ids.iter()
1417                        .map(|id| db.prepare_delete(&prep, id))
1418                        .collect::<quiver_embed::Result<Vec<Option<WalOp>>>>()
1419                })
1420                .await
1421            {
1422                Ok(ops) => {
1423                    self.raft_propose_all(&rs, ops.into_iter().flatten().collect())
1424                        .await
1425                }
1426                Err(e) => Err(e),
1427            };
1428            self.audit.record(
1429                principal.actor(),
1430                "delete_points",
1431                &collection,
1432                Outcome::of(&result),
1433            );
1434            return result;
1435        }
1436        let resource = collection.clone();
1437        let result = self
1438            .write_blocking(move |db| {
1439                let mut count = 0u64;
1440                for id in &ids {
1441                    if db.delete(&collection, id)? {
1442                        count += 1;
1443                    }
1444                }
1445                Ok(count)
1446            })
1447            .await;
1448        self.audit.record(
1449            principal.actor(),
1450            "delete_points",
1451            &resource,
1452            Outcome::of(&result),
1453        );
1454        if self.mvcc && result.is_ok() {
1455            self.schedule_if_stale(&resource).await;
1456        }
1457        result
1458    }
1459
1460    pub(crate) async fn get_points(
1461        &self,
1462        principal: &Principal,
1463        collection: String,
1464        ids: Vec<String>,
1465        with_vector: bool,
1466    ) -> Result<Vec<PointOut>, Error> {
1467        self.authorize(principal, Action::Read, "get_points", &collection)?;
1468        if let Some(c) = &self.cluster {
1469            return c.get_points(&collection, ids, with_vector).await;
1470        }
1471        self.read_blocking(move |db| {
1472            let mut out = Vec::new();
1473            for id in &ids {
1474                if let Some(m) = db.get(&collection, id)? {
1475                    out.push(PointOut {
1476                        id: m.id,
1477                        vector: if with_vector { m.vector } else { None },
1478                        payload: m.payload.unwrap_or(Value::Null),
1479                    });
1480                }
1481            }
1482            Ok(out)
1483        })
1484        .await
1485    }
1486
1487    #[allow(clippy::too_many_arguments)]
1488    #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1489    pub(crate) async fn search(
1490        &self,
1491        principal: &Principal,
1492        collection: String,
1493        vector: Vec<f32>,
1494        k: usize,
1495        filter: Option<Filter>,
1496        ef_search: usize,
1497        with_payload: bool,
1498        with_vector: bool,
1499    ) -> Result<Vec<MatchOut>, Error> {
1500        self.authorize(principal, Action::Read, "search", &collection)?;
1501        self.limits.check_search(k, ef_search)?;
1502        self.limits.check_vector_len(vector.len())?;
1503
1504        // Cluster router (ADR-0065): scatter-gather across the shards, merge top-k.
1505        if let Some(c) = &self.cluster {
1506            return c
1507                .search(
1508                    &collection,
1509                    vector,
1510                    k,
1511                    filter,
1512                    ef_search,
1513                    with_payload,
1514                    with_vector,
1515                )
1516                .await;
1517        }
1518
1519        // Lock-free fast path (ADR-0064 increment 3): a **pure-vector** read of an
1520        // MVCC-served collection loads the cached snapshot cell and searches it with
1521        // **no database lock**, so it never blocks on a concurrent writer. Payload,
1522        // filter, and vector results need the store (behind the lock) and fall
1523        // through to `search_blocking`.
1524        if filter.is_none()
1525            && !with_payload
1526            && !with_vector
1527            && let Some(cell) = self.cached_cell(&collection)
1528        {
1529            return tokio::task::spawn_blocking(move || {
1530                let matches = cell.load().search(&vector, k, ef_search)?;
1531                Ok::<_, quiver_embed::Error>(
1532                    matches
1533                        .into_iter()
1534                        .map(|m| MatchOut {
1535                            id: m.id,
1536                            score: m.score,
1537                            payload: None,
1538                            vector: None,
1539                        })
1540                        .collect(),
1541                )
1542            })
1543            .await
1544            .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1545            .map_err(Error::Engine);
1546        }
1547
1548        let params = SearchParams {
1549            k,
1550            filter,
1551            ef_search,
1552            with_payload,
1553            with_vector,
1554        };
1555        let coll = collection.clone();
1556        self.search_blocking(coll, move |db| {
1557            let matches = db.search_snapshot(&collection, &vector, &params)?;
1558            Ok(matches
1559                .into_iter()
1560                .map(|m| MatchOut {
1561                    id: m.id,
1562                    score: m.score,
1563                    payload: m.payload,
1564                    vector: m.vector,
1565                })
1566                .collect())
1567        })
1568        .await
1569    }
1570
1571    #[allow(clippy::too_many_arguments)]
1572    pub(crate) async fn hybrid_search(
1573        &self,
1574        principal: &Principal,
1575        collection: String,
1576        dense: Option<Vec<f32>>,
1577        sparse: Option<(Vec<u32>, Vec<f32>)>,
1578        text: Option<String>,
1579        k: usize,
1580        filter: Option<Filter>,
1581        ef_search: usize,
1582        rrf_k0: f32,
1583        with_payload: bool,
1584        with_vector: bool,
1585    ) -> Result<Vec<MatchOut>, Error> {
1586        self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1587        self.limits.check_search(k, ef_search)?;
1588        if let Some(v) = &dense {
1589            self.limits.check_vector_len(v.len())?;
1590        }
1591        if let Some((indices, values)) = &sparse {
1592            self.limits.check_sparse_terms(indices.len())?;
1593            if indices.len() != values.len() {
1594                return Err(Error::BadRequest(format!(
1595                    "sparse query indices ({}) and values ({}) length mismatch",
1596                    indices.len(),
1597                    values.len()
1598                )));
1599            }
1600        }
1601        let params = SearchParams {
1602            k,
1603            filter,
1604            ef_search,
1605            with_payload,
1606            with_vector,
1607        };
1608        let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1609        let coll = collection.clone();
1610        self.search_blocking(coll, move |db| {
1611            let matches = db.hybrid_search_snapshot(
1612                &collection,
1613                dense.as_deref(),
1614                sv.as_ref(),
1615                text.as_deref(),
1616                &params,
1617                rrf_k0,
1618            )?;
1619            Ok(matches
1620                .into_iter()
1621                .map(|m| MatchOut {
1622                    id: m.id,
1623                    score: m.score,
1624                    payload: m.payload,
1625                    vector: m.vector,
1626                })
1627                .collect())
1628        })
1629        .await
1630    }
1631
1632    /// Embed `text` with the collection's configured provider and run a dense (or
1633    /// dense ⊕ BM25, if the collection has text) search, optionally reranking the
1634    /// candidates in one call (ADR-0047). The text is also passed to the BM25 side,
1635    /// so a `upsert_text` corpus yields hybrid lexical+semantic retrieval for free.
1636    #[allow(clippy::too_many_arguments)]
1637    pub(crate) async fn search_text(
1638        &self,
1639        principal: &Principal,
1640        collection: String,
1641        text: String,
1642        k: usize,
1643        filter: Option<Filter>,
1644        ef_search: usize,
1645        rrf_k0: f32,
1646        with_payload: bool,
1647        with_vector: bool,
1648        rerank: bool,
1649    ) -> Result<Vec<MatchOut>, Error> {
1650        self.authorize(principal, Action::Read, "search_text", &collection)?;
1651        self.limits.check_search(k, ef_search)?;
1652        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1653            Error::BadRequest(format!(
1654                "collection {collection:?} has no embedding provider configured \
1655                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1656            ))
1657        })?;
1658        // Embed off the async runtime: the provider call is blocking network I/O.
1659        let query = text.clone();
1660        let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1661            .await
1662            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1663            .map_err(|e| Error::Upstream(e.to_string()))?
1664            .into_iter()
1665            .next()
1666            .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1667        self.limits.check_vector_len(vector.len())?;
1668
1669        let reranker = if rerank {
1670            self.embed.reranker(&collection)
1671        } else {
1672            None
1673        };
1674        // Over-fetch when reranking so the reranker can reorder a wide candidate set
1675        // down to the requested `k`; fetch payloads when reranking (we need the doc
1676        // text) even if the caller did not ask for them.
1677        let need_payload = with_payload || reranker.is_some();
1678        let fetch_k = if reranker.is_some() {
1679            k.max(RERANK_CANDIDATES)
1680        } else {
1681            k
1682        };
1683
1684        let mut hits = self
1685            .hybrid_search(
1686                principal,
1687                collection,
1688                Some(vector),
1689                None,
1690                Some(text.clone()),
1691                fetch_k,
1692                filter,
1693                ef_search,
1694                rrf_k0,
1695                need_payload,
1696                with_vector,
1697            )
1698            .await?;
1699
1700        if let Some(rr) = reranker {
1701            let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1702            let query = text;
1703            let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1704                .await
1705                .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1706                .map_err(|e| Error::Upstream(e.to_string()))?;
1707            // Re-score each hit and sort by the rerank score, descending.
1708            let mut scored: Vec<(f32, MatchOut)> = scores
1709                .into_iter()
1710                .zip(hits)
1711                .map(|(s, mut h)| {
1712                    h.score = s;
1713                    (s, h)
1714                })
1715                .collect();
1716            scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1717            hits = scored.into_iter().map(|(_, h)| h).collect();
1718        }
1719
1720        hits.truncate(k);
1721        // Drop payloads we only fetched for reranking if the caller didn't want them.
1722        if !with_payload {
1723            for h in &mut hits {
1724                h.payload = None;
1725            }
1726        }
1727        Ok(hits)
1728    }
1729
1730    /// Embed each point's `text` with the collection's provider and upsert it as a
1731    /// dense point, co-populating the `__quiver_text__` payload key (ADR-0046) so the
1732    /// same text is indexed for BM25 — one call feeds both the dense and lexical
1733    /// sides (ADR-0047).
1734    pub(crate) async fn upsert_text(
1735        &self,
1736        principal: &Principal,
1737        collection: String,
1738        points: Vec<TextPointIn>,
1739    ) -> Result<u64, Error> {
1740        self.ensure_writable("upsert_text")?;
1741        self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1742        self.limits.check_batch(points.len())?;
1743        for p in &points {
1744            if !matches!(p.payload, Value::Object(_) | Value::Null) {
1745                return Err(Error::BadRequest(
1746                    "upsert_text payload must be a JSON object or null".to_owned(),
1747                ));
1748            }
1749        }
1750        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1751            Error::BadRequest(format!(
1752                "collection {collection:?} has no embedding provider configured \
1753                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1754            ))
1755        })?;
1756        let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1757        let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1758            .await
1759            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1760            .map_err(|e| Error::Upstream(e.to_string()))?;
1761        if vectors.len() != points.len() {
1762            return Err(Error::Upstream(format!(
1763                "embedding provider returned {} vectors for {} inputs",
1764                vectors.len(),
1765                points.len()
1766            )));
1767        }
1768        let dense: Vec<PointIn> = points
1769            .into_iter()
1770            .zip(vectors)
1771            .map(|(p, vector)| {
1772                let mut payload = match p.payload {
1773                    Value::Object(map) => map,
1774                    _ => serde_json::Map::new(),
1775                };
1776                // Don't clobber a caller-supplied text key.
1777                payload
1778                    .entry(TEXT_KEY.to_owned())
1779                    .or_insert_with(|| Value::String(p.text.clone()));
1780                PointIn {
1781                    id: p.id,
1782                    vector,
1783                    payload: Value::Object(payload),
1784                }
1785            })
1786            .collect();
1787        self.upsert(principal, collection, dense).await
1788    }
1789
1790    #[allow(clippy::too_many_arguments)]
1791    pub(crate) async fn fetch(
1792        &self,
1793        principal: &Principal,
1794        collection: String,
1795        filter: Option<Filter>,
1796        offset: usize,
1797        limit: usize,
1798        with_payload: bool,
1799        with_vector: bool,
1800    ) -> Result<Vec<MatchOut>, Error> {
1801        self.authorize(principal, Action::Read, "fetch", &collection)?;
1802        self.limits.check_fetch(limit)?;
1803        self.read_blocking(move |db| {
1804            let matches = db.fetch(
1805                &collection,
1806                filter.as_ref(),
1807                offset,
1808                limit,
1809                with_payload,
1810                with_vector,
1811            )?;
1812            Ok(matches
1813                .into_iter()
1814                .map(|m| MatchOut {
1815                    id: m.id,
1816                    score: m.score,
1817                    payload: m.payload,
1818                    vector: m.vector,
1819                })
1820                .collect())
1821        })
1822        .await
1823    }
1824
1825    pub(crate) async fn upsert_documents(
1826        &self,
1827        principal: &Principal,
1828        collection: String,
1829        documents: Vec<DocumentIn>,
1830    ) -> Result<u64, Error> {
1831        self.ensure_writable("upsert_documents")?;
1832        self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1833        self.limits.check_batch(documents.len())?;
1834        for doc in &documents {
1835            self.limits.check_payload(&doc.payload)?;
1836            for token in &doc.vectors {
1837                self.limits.check_vector_len(token.len())?;
1838            }
1839        }
1840        let resource = collection.clone();
1841        let result = self
1842            .write_blocking(move |db| {
1843                let mut count = 0u64;
1844                for doc in &documents {
1845                    db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1846                    count += 1;
1847                }
1848                Ok(count)
1849            })
1850            .await;
1851        self.audit.record(
1852            principal.actor(),
1853            "upsert_documents",
1854            &resource,
1855            Outcome::of(&result),
1856        );
1857        result
1858    }
1859
1860    pub(crate) async fn delete_documents(
1861        &self,
1862        principal: &Principal,
1863        collection: String,
1864        ids: Vec<String>,
1865    ) -> Result<u64, Error> {
1866        self.ensure_writable("delete_documents")?;
1867        self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1868        let resource = collection.clone();
1869        let result = self
1870            .write_blocking(move |db| {
1871                let mut count = 0u64;
1872                for id in &ids {
1873                    if db.delete_document(&collection, id)? {
1874                        count += 1;
1875                    }
1876                }
1877                Ok(count)
1878            })
1879            .await;
1880        self.audit.record(
1881            principal.actor(),
1882            "delete_documents",
1883            &resource,
1884            Outcome::of(&result),
1885        );
1886        result
1887    }
1888
1889    #[allow(clippy::too_many_arguments)]
1890    pub(crate) async fn search_multi_vector(
1891        &self,
1892        principal: &Principal,
1893        collection: String,
1894        query: Vec<Vec<f32>>,
1895        k: usize,
1896        filter: Option<Filter>,
1897        ef_search: usize,
1898        with_payload: bool,
1899        with_vector: bool,
1900    ) -> Result<Vec<DocumentMatchOut>, Error> {
1901        self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1902        self.limits.check_search(k, ef_search)?;
1903        for token in &query {
1904            self.limits.check_vector_len(token.len())?;
1905        }
1906        let params = SearchParams {
1907            k,
1908            filter,
1909            ef_search,
1910            with_payload,
1911            with_vector,
1912        };
1913        let coll = collection.clone();
1914        self.search_blocking(coll, move |db| {
1915            let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
1916            Ok(matches
1917                .into_iter()
1918                .map(|m| DocumentMatchOut {
1919                    id: m.id,
1920                    score: m.score,
1921                    payload: m.payload,
1922                    vectors: m.vectors,
1923                })
1924                .collect())
1925        })
1926        .await
1927    }
1928}
1929
/// How many recently-committed ops the leader buffers for replication followers
/// (ADR-0030). A follower that falls further behind than this re-bootstraps.
///
/// `serve` uses this as the capacity of the broadcast channel that the engine's
/// commit observer publishes each committed entry to.
const REPLICATION_BUFFER: usize = 1024;
1933
1934/// Run the server from `config` until a shutdown signal (Ctrl-C).
1935pub async fn run(config: Config) -> Result<(), Error> {
1936    config.validate()?;
1937    let rest_listener = TcpListener::bind(config.rest_addr)
1938        .await
1939        .map_err(Error::Io)?;
1940    let grpc_listener = TcpListener::bind(config.grpc_addr)
1941        .await
1942        .map_err(Error::Io)?;
1943    tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1944    tokio::select! {
1945        result = serve(config, rest_listener, grpc_listener) => result,
1946        () = shutdown_signal() => {
1947            tracing::info!("shutdown signal received");
1948            Ok(())
1949        }
1950    }
1951}
1952
/// Serve REST and gRPC on the given (already-bound) listeners until a transport
/// error. Exposed so tests can bind ephemeral ports.
///
/// Startup proceeds in this order:
/// 1. In coordinator mode, drop the gRPC listener and hand the REST listener to
///    the coordinator service; nothing below runs.
/// 2. Open the engine and the audit log, and install the commit observer that
///    publishes every committed entry to the replication broadcast channel.
/// 3. Build the embedding/rerank registry and settle the MVCC-reads flag.
/// 4. Build the optional cluster router (plus its shard-map refresh task when a
///    coordinator URL is set) and, with the `raft` feature, the optional Raft
///    member.
/// 5. Assemble `AppState`, spawn the follower when a leader URL is configured,
///    and build the REST router and gRPC service from the state.
/// 6. Load TLS material and run both servers, with or without TLS.
///
/// # Errors
/// Returns the first startup failure (engine open, audit log, embedding
/// registry, cluster or Raft setup, TLS loading/configuration) or the first
/// error either server reports; `try_join!` stops at the first failing side.
pub async fn serve(
    config: Config,
    rest_listener: TcpListener,
    grpc_listener: TcpListener,
) -> Result<(), Error> {
    // Coordinator mode (ADR-0066) is a wholly separate, data-plane-free service: it
    // owns the versioned shard map and serves the membership API on the REST
    // listener. It opens no database and runs no gRPC.
    if config.coordinator {
        drop(grpc_listener);
        return coordinator::serve_coordinator(config, rest_listener).await;
    }
    let mut db = open_database(&config)?;
    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
    // Publish every committed op to replication followers (ADR-0030). The observer
    // runs inside the engine's write critical section; `broadcast::Sender::send` is
    // non-blocking, so it never stalls a write.
    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
    {
        let tx = replication_tx.clone();
        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
            // The send result is deliberately ignored: a failed send must not
            // fail the commit that triggered it.
            let _ = tx.send(entry.clone());
        }));
    }
    // Build the opt-in embedding/rerank providers, resolving each `api_key_env`
    // from the environment now (ADR-0047) so a missing key fails fast at startup
    // rather than on the first request.
    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
        .map_err(|e| Error::Config(e.to_string()))?;

    // Enable lock-free MVCC reads if requested (ADR-0064): `config.mvcc_reads` or
    // `QUIVER_MVCC_READS` (the latter also defaults the engine flag at open).
    if config.mvcc_reads {
        db.set_mvcc_reads(true);
    }
    // Read the flag back from the engine so the state reflects its effective
    // value, whichever of the two switches enabled it.
    let mvcc = db.mvcc_reads();

    // Opt-in cluster router (ADR-0065): build the shard fan-out when configured.
    let cluster = if config.cluster_shards.is_empty() {
        None
    } else {
        let c = cluster::Cluster::new(
            config.cluster_shards.clone(),
            config.cluster_replicas.clone(),
            config.cluster_shard_key.clone(),
        )?;
        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
        let c = Arc::new(c);
        // With a coordinator (ADR-0066), refresh the shard map from it so a
        // membership change propagates without a restart. The seed above is the
        // bootstrap map; the coordinator's map (≥ version 0) takes over on refresh.
        if let Some(coord) = config.coordinator_url.clone() {
            let router = c.clone();
            // Detached task: it loops for as long as the runtime lives and only
            // logs a failed refresh at debug level before sleeping again.
            tokio::spawn(async move {
                loop {
                    if let Err(e) = router.refresh_from(&coord).await {
                        tracing::debug!(error = %e, "shard-map refresh failed; will retry");
                    }
                    tokio::time::sleep(cluster::MAP_REFRESH_INTERVAL).await;
                }
            });
        }
        Some(c)
    };

    // The shared engine handle. Wrapped before `AppState` so the Raft applier can
    // hold the same handle (it drives `apply_replicated` on commit) without
    // referencing `AppState` back — see `raft::EngineApplier`.
    let db = Arc::new(RwLock::new(db));

    // Opt-in per-shard Raft write HA (ADR-0067): when this node has a Raft id, join
    // its shard's group over the gRPC transport. The applier holds `db` directly,
    // so the state machine does not reference the Raft group it lives in.
    #[cfg(feature = "raft")]
    let raft = if let Some(node_id) = config.raft_node_id {
        let members = parse_raft_members(&config.raft_members)?;
        let applier = raft::EngineApplier::new(Arc::clone(&db));
        // The Raft log persists under the data dir, beside the engine (ADR-0067 4c).
        let log_dir = config.data_dir.join("raft");
        let shard = raft::start_member(node_id, members, applier, &log_dir)
            .await
            .map_err(|e| Error::Config(format!("raft startup: {e}")))?;
        tracing::info!(node_id, "per-shard raft write HA enabled");
        Some(Arc::new(shard))
    } else {
        None
    };

    let state = AppState {
        db,
        keys: Arc::new(config.api_keys.clone()),
        audit,
        replication_tx,
        // A node configured with a leader URL is a follower and serves read-only.
        read_only: config.leader_url.is_some(),
        limits: config.limits,
        embed: Arc::new(embed),
        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
        metrics: Arc::new(metrics::Metrics::default()),
        rebuilding: Arc::new(Mutex::new(HashSet::new())),
        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
        mvcc,
        cluster,
        #[cfg(feature = "raft")]
        raft,
    };

    // A follower continuously applies the leader's committed-op stream (ADR-0030).
    if let Some(leader_url) = config.leader_url.clone() {
        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
    }

    let app = rest::router(state.clone());
    // Mount the per-shard Raft transport on the same gRPC server (ADR-0067): build
    // it before `state` is consumed by `grpc::service`.
    #[cfg(feature = "raft")]
    let raft_service = state
        .raft
        .as_ref()
        .map(|rs| raft::grpc::RaftRpc::service(rs.raft.clone()));
    let grpc = grpc::service(state);

    let tls = load_tls(&config)?;

    // REST: terminate TLS with axum-server when configured, else serve plaintext.
    // Boxed so both arms share one future type.
    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
        Some(material) => {
            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
            // axum-server takes a std listener, so convert the tokio one.
            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
            let server =
                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
            Box::pin(async move {
                server
                    .serve(app.into_make_service())
                    .await
                    .map_err(Error::Io)
            })
        }
        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
    };

    // gRPC: tonic terminates TLS itself (ring provider) when an identity is set.
    let mut grpc_builder = tonic::transport::Server::builder();
    if let Some(material) = &tls {
        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
        let mut tls_config = ServerTlsConfig::new().identity(identity);
        // Require client certificates chaining to the configured CA (mTLS).
        if let Some(ca_pem) = &material.client_ca_pem {
            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
        }
        grpc_builder = grpc_builder
            .tls_config(tls_config)
            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
    }
    let grpc_fut = async move {
        let routes = grpc_builder.add_service(grpc);
        // Add the Raft service when this node runs a Raft group; a no-op otherwise.
        #[cfg(feature = "raft")]
        let routes = routes.add_optional_service(raft_service);
        routes
            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
            .await
            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
    };

    // Drive both transports together; the first error from either ends `serve`.
    tokio::try_join!(rest_fut, grpc_fut)?;
    Ok(())
}
2122
2123async fn shutdown_signal() {
2124    let _ = tokio::signal::ctrl_c().await;
2125}
2126
2127// Parse `["<id>=<url>", …]` Raft member entries into an id→URL map (ADR-0067).
2128// Mirrors the `cluster_replicas` `<idx>=<url>` form; the bracketed
2129// `QUIVER_RAFT_MEMBERS` env array reaches here as these strings. A node opting
2130// into Raft must name a non-empty member set.
2131#[cfg(feature = "raft")]
2132fn parse_raft_members(
2133    entries: &[String],
2134) -> Result<std::collections::BTreeMap<u64, String>, Error> {
2135    let mut members = std::collections::BTreeMap::new();
2136    for entry in entries {
2137        let (id, url) = entry
2138            .split_once('=')
2139            .ok_or_else(|| Error::Config(format!("raft member '{entry}' must be '<id>=<url>'")))?;
2140        let id: u64 = id
2141            .trim()
2142            .parse()
2143            .map_err(|_| Error::Config(format!("raft member id '{id}' is not a number")))?;
2144        members.insert(id, url.trim().to_owned());
2145    }
2146    if members.is_empty() {
2147        return Err(Error::Config(
2148            "raft_node_id is set but raft_members is empty".to_owned(),
2149        ));
2150    }
2151    Ok(members)
2152}
2153
// Translate a failed openraft `client_write` into a server [`Error`]. When the
// failure is a `ForwardToLeader` (this node is not the leader) the result is an
// [`Error::NotLeader`] carrying the leader's address, if known, so a router can
// redirect. Every other failure is reported as an internal fault.
#[cfg(feature = "raft")]
fn map_client_write_err(
    err: openraft::error::RaftError<
        u64,
        openraft::error::ClientWriteError<u64, openraft::BasicNode>,
    >,
) -> Error {
    use openraft::error::{ClientWriteError, RaftError};
    if let RaftError::APIError(ClientWriteError::ForwardToLeader(forward)) = err {
        let leader = forward.leader_node.map(|node| node.addr);
        return Error::NotLeader { leader };
    }
    Error::Internal(format!("raft write failed: {err}"))
}
2172
2173// Open the engine, enabling encryption-at-rest when a key is configured. The
2174// configured key is the **master key** of an envelope key-ring (ADR-0010): it
2175// wraps a per-collection data-encryption key, so dropping a collection
2176// crypto-shreds it. With no key (only valid in `insecure` mode, enforced by
2177// `Config::validate`) the engine is opened in plaintext.
2178fn open_database(config: &Config) -> Result<Database, Error> {
2179    let master_key = config.master_key_hex()?;
2180    let keyring =
2181        quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
2182            .map_err(|e| Error::Config(e.to_string()))?;
2183    let db = match keyring {
2184        Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
2185        None => Database::open(&config.data_dir)?,
2186    };
2187    Ok(db)
2188}
2189
// TLS material shared by both transports: the raw PEM (for tonic's `Identity`
// and `client_ca_root`) and a parsed rustls server config (for axum-server's
// REST acceptor). `client_ca_pem` is set only when mutual TLS is configured.
struct TlsMaterial {
    // Bytes of the file at `config.tls_cert`, exactly as read from disk.
    cert_pem: Vec<u8>,
    // Bytes of the file at `config.tls_key`, exactly as read from disk.
    key_pem: Vec<u8>,
    // Bytes of the file at `config.tls_client_ca`; `None` when no client CA is
    // configured.
    client_ca_pem: Option<Vec<u8>>,
    // rustls server config built from the three PEM inputs above; the REST
    // server clones this `Arc` into its `RustlsConfig`.
    rest_config: Arc<rustls::ServerConfig>,
}
2199
2200// Read the configured certificate, key, and optional client CA, returning `None`
2201// when TLS is not configured. `Config::validate` already enforces that the cert
2202// and key are set together, that a client CA requires them, and that a
2203// non-loopback bind requires TLS.
2204fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
2205    match (&config.tls_cert, &config.tls_key) {
2206        (Some(cert_path), Some(key_path)) => {
2207            let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
2208            let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
2209            let client_ca_pem = config
2210                .tls_client_ca
2211                .as_ref()
2212                .map(std::fs::read)
2213                .transpose()
2214                .map_err(Error::Io)?;
2215            let rest_config = Arc::new(rustls_server_config(
2216                &cert_pem,
2217                &key_pem,
2218                client_ca_pem.as_deref(),
2219            )?);
2220            Ok(Some(TlsMaterial {
2221                cert_pem,
2222                key_pem,
2223                client_ca_pem,
2224                rest_config,
2225            }))
2226        }
2227        (None, None) => Ok(None),
2228        _ => Err(Error::Config(
2229            "tls_cert and tls_key must be set together".to_owned(),
2230        )),
2231    }
2232}
2233
2234// Build a rustls server config from PEM bytes over the audited `ring` provider
2235// (no OpenSSL, no aws-lc-rs C toolchain). TLS 1.3 and 1.2 are offered. When a
2236// client CA is supplied, client certificates chaining to it are required (mTLS).
2237fn rustls_server_config(
2238    cert_pem: &[u8],
2239    key_pem: &[u8],
2240    client_ca_pem: Option<&[u8]>,
2241) -> Result<rustls::ServerConfig, Error> {
2242    use rustls_pki_types::pem::PemObject;
2243    use rustls_pki_types::{CertificateDer, PrivateKeyDer};
2244
2245    let certs = CertificateDer::pem_slice_iter(cert_pem)
2246        .collect::<std::result::Result<Vec<_>, _>>()
2247        .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
2248    if certs.is_empty() {
2249        return Err(Error::Config(
2250            "tls_cert contains no certificates".to_owned(),
2251        ));
2252    }
2253    let key = PrivateKeyDer::from_pem_slice(key_pem)
2254        .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
2255    let provider = Arc::new(rustls::crypto::ring::default_provider());
2256    let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
2257        .with_safe_default_protocol_versions()
2258        .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
2259    let builder = match client_ca_pem {
2260        Some(ca_pem) => {
2261            let mut roots = rustls::RootCertStore::empty();
2262            for cert in CertificateDer::pem_slice_iter(ca_pem) {
2263                let cert =
2264                    cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
2265                roots
2266                    .add(cert)
2267                    .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
2268            }
2269            let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
2270                Arc::new(roots),
2271                provider,
2272            )
2273            .build()
2274            .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
2275            builder.with_client_cert_verifier(verifier)
2276        }
2277        None => builder.with_no_client_auth(),
2278    };
2279    builder
2280        .with_single_cert(certs, key)
2281        .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
2282}
2283
2284/// Initialize structured logging from `RUST_LOG` (defaulting to `info`). Safe to
2285/// call once at startup; a second call is ignored.
2286pub fn init_tracing() {
2287    init_observability(&Config::default());
2288}
2289
2290/// Install the global tracing subscriber: an `RUST_LOG`-driven `fmt` layer plus,
2291/// when the `otlp` feature is built **and** an OTLP endpoint is configured
2292/// (ADR-0059), an OpenTelemetry traces export layer. Safe to call once at
2293/// startup; a second call is a no-op. A failure to build the OTLP exporter logs a
2294/// warning and falls back to `fmt`-only rather than taking the server down.
2295#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
2296pub fn init_observability(config: &Config) {
2297    use tracing_subscriber::EnvFilter;
2298    use tracing_subscriber::prelude::*;
2299    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
2300    let registry = tracing_subscriber::registry()
2301        .with(filter)
2302        .with(tracing_subscriber::fmt::layer());
2303
2304    #[cfg(feature = "otlp")]
2305    if config.otlp.is_enabled() {
2306        match otlp::build_provider(&config.otlp) {
2307            Ok(provider) => {
2308                use opentelemetry::trace::TracerProvider as _;
2309                let tracer = provider.tracer("quiver");
2310                otlp::store_provider(provider);
2311                let _ = registry
2312                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
2313                    .try_init();
2314                return;
2315            }
2316            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
2317        }
2318    }
2319
2320    let _ = registry.try_init();
2321}
2322
/// Flush and shut down the OpenTelemetry exporter if one was installed, so
/// batched spans are not lost. Call once on server shutdown. Without the `otlp`
/// feature, or when no endpoint was configured, this does nothing.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
2330
#[cfg(test)]
mod tests {
    use super::*;

    // A well-formed test key: 64 hex characters, i.e. 256 bits.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    // Baseline used by the bind/TLS tests: one API key plus the test encryption
    // key, everything else left at its default.
    fn keyed_config() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        }
    }

    // A REST address on the unspecified (all-interfaces) IPv4 address, i.e. a
    // non-loopback bind.
    fn public_rest_addr() -> SocketAddr {
        SocketAddr::from((Ipv4Addr::UNSPECIFIED, 6333))
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // Defaults carry no keys ⇒ rejected.
        let mut config = Config::default();
        assert!(config.validate().is_err());
        // Opting into insecure mode lifts the requirement.
        config.insecure = true;
        assert!(config.validate().is_ok());
        // Back in secure mode, supplying both keys is accepted.
        let secured = Config {
            insecure: false,
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..config
        };
        assert!(secured.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut config = Config::default();
        config.api_keys = vec!["secret".into()];
        // An API key alone, without an encryption key and not insecure ⇒ rejected.
        assert!(config.validate().is_err());

        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());

        // Malformed key material fails validation up front, not at first write.
        config.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(config.validate().is_err());

        // Insecure mode is allowed to run without encryption-at-rest.
        config.encryption_key = None;
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let key_path = dir.path().join("master.key");
        // Editors and `echo` append a newline; it must be trimmed off.
        std::fs::write(&key_path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config::default();
        config.api_keys = vec!["secret".into()];
        config.master_key_file = Some(key_path.clone());

        // The file on its own satisfies encryption-at-rest and yields the key.
        assert!(config.validate().is_ok());
        let resolved = config.master_key_hex().unwrap();
        assert_eq!(resolved.as_deref(), Some(TEST_KEY));

        // Supplying the env key as well as a file is ambiguous ⇒ rejected.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A file containing malformed hex is rejected up front.
        config.encryption_key = None;
        std::fs::write(&key_path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            rest_addr: public_rest_addr(),
            ..keyed_config()
        };
        // Auth and encryption are in place, so only the bind rule can fail.
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: public_rest_addr(),
            ..keyed_config()
        };
        // With TLS configured a non-loopback bind needs no insecure opt-out.
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..keyed_config()
        };
        // A certificate without its key ⇒ rejected.
        assert!(config.validate().is_err());
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }
}