Skip to main content

quiver_server/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The Quiver daemon: gRPC and REST over the embeddable [`Database`], with
3//! API-key auth and secure-by-default configuration.
4//!
5//! Both transports are thin shells over the same shared engine operations; the
6//! engine is synchronous and CPU/`fsync`-bound, so every call is offloaded with
7//! `spawn_blocking`. Access is guarded by a reader–writer lock (ADR-0057): reads
8//! take the shared lock and run **concurrently**, writes take the exclusive lock,
9//! and the single-writer model is unchanged (ADR-0002/0006). A read that finds a
10//! collection's index stale (a prior write deferred its rebuild) serves the prior
11//! snapshot and schedules an **off-lock** rebuild (ADR-0062): the index is rebuilt
12//! under the shared lock and swapped in under a brief write lock, so a rebuild never
13//! stalls concurrent readers.
14//!
15//! Auth is by scoped API key (Bearer / gRPC `authorization` metadata) with
16//! default-deny RBAC: each key carries a role (read ⊆ write ⊆ admin) and a
17//! collection scope, enforced on every operation at the shared op layer
18//! (ADR-0011/0013, the `auth` module). Encryption-at-rest is on by default
19//! (ADR-0010): unless `insecure` is set, an `encryption_key` is required and the
20//! engine is opened through `quiver-crypto`'s AEAD codec; payloads may also be
21//! client-side-encrypted (ADR-0012). TLS-in-transit uses `rustls` over the
22//! audited `ring` provider — REST via `axum-server`, gRPC via tonic's `tls-ring`
23//! — and a non-loopback bind requires it; setting a client CA additionally
24//! requires mutual TLS. Mutating and administrative operations, and every
25//! access-control denial, are recorded to an append-only audit log (ADR-0011,
26//! the `audit` module) when `audit_log` is set. Per-request cost limits bound
27//! the work any single authenticated request can demand (ADR-0040, the
28//! [`Limits`] type), and an opt-in per-key token-bucket rate limiter bounds the
29//! request *rate* (ADR-0049, the [`RateLimiter`] type); per-tenant engine
30//! partitioning is a later phase. Design: `docs/api/rest-grpc.md`.
31
32mod audit;
33mod auth;
34mod cluster;
35mod coordinator;
36mod error;
37mod grpc;
38mod metrics;
39mod otlp;
40#[cfg(feature = "raft")]
41pub mod raft;
42mod rate_limit;
43mod replication;
44mod rest;
45
46use std::collections::{HashMap, HashSet};
47use std::future::Future;
48use std::net::{IpAddr, Ipv4Addr, SocketAddr};
49use std::path::PathBuf;
50use std::pin::Pin;
51use std::sync::{Arc, Mutex, RwLock};
52
53use axum_server::tls_rustls::RustlsConfig;
54use figment::Figment;
55use figment::providers::{Env, Format, Serialized, Toml};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use tokio::net::TcpListener;
59use tokio::sync::broadcast;
60use tokio_stream::wrappers::TcpListenerStream;
61use tonic::transport::{Certificate, Identity, ServerTlsConfig};
62
63use quiver_crypto::AeadCodec;
64use quiver_embed::{
65    Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
66    SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
67};
68use quiver_query::Filter;
69
70pub use auth::{Action, ApiKey, CollectionScope};
71pub use error::Error;
72pub use otlp::OtlpConfig;
// The embedding/rerank seam (the `quiver_providers` re-export below) lives in its
// own lean crate (ADR-0058) so the MCP server can share it; it is re-exported
// here so the server's public API is unchanged.
75pub use coordinator::AutoscaleConfig;
76pub use quiver_providers::{
77    EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
78    RerankProvider,
79};
80pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
81
82use audit::{AuditLog, Outcome};
83use auth::Principal;
84
85/// Per-request cost limits (ADR-0040). Each cap bounds the work a single
86/// authenticated request can demand, so one oversized request cannot exhaust the
87/// node under the single-writer model (ADR-0006). Over-limit requests are
88/// **rejected** with HTTP 400 / gRPC `InvalidArgument` rather than silently
89/// clamped — a truncated `k` or `ef_search` would return surprising, lower-quality
90/// results with no signal. Defaults are generous; raise a cap with a `[limits]`
91/// table in `quiver.toml` or the matching `QUIVER_MAX_*` environment variable.
92#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
93#[serde(default)]
94pub struct Limits {
95    /// Maximum `k` (top-k) for `search` / `search_multi_vector`.
96    pub max_k: usize,
97    /// Maximum search beam width (`ef_search`).
98    pub max_ef_search: usize,
99    /// Maximum `fetch` page size.
100    pub max_fetch_limit: usize,
101    /// Maximum vector dimension: the declared collection dimension at creation
102    /// and the length of any query vector (per token, for multi-vector).
103    pub max_vector_dim: usize,
104    /// Maximum serialized-JSON payload size per point, in bytes.
105    pub max_payload_bytes: usize,
106    /// Maximum number of points / documents in a single upsert request.
107    pub max_batch_size: usize,
108    /// Maximum HTTP request body size, in bytes (enforced by the REST layer via
109    /// axum's `DefaultBodyLimit`; gRPC is bounded by tonic's decode limit).
110    pub max_request_body_bytes: usize,
111    /// Maximum number of non-zero terms in a hybrid-search sparse query (ADR-0043),
112    /// bounding the posting-list scan.
113    pub max_sparse_terms: usize,
114    /// Maximum number of points in a single bulk upsert (`POST …/points:bulk`,
115    /// ADR-0045). Larger than `max_batch_size` because the bulk path defers index
116    /// maintenance to one rebuild; the request is still bounded by
117    /// `max_request_body_bytes`, so raise that too for very large bulk loads.
118    pub max_bulk_batch_size: usize,
119}
120
121impl Default for Limits {
122    fn default() -> Self {
123        Self {
124            max_k: 10_000,
125            max_ef_search: 4_096,
126            max_fetch_limit: 10_000,
127            max_vector_dim: 8_192,
128            max_payload_bytes: 65_536,
129            max_batch_size: 1_000,
130            max_request_body_bytes: 32 * 1024 * 1024,
131            max_sparse_terms: 4_096,
132            max_bulk_batch_size: 50_000,
133        }
134    }
135}
136
137impl Limits {
138    // Apply `QUIVER_MAX_*` overrides after figment extraction (ADR-0013 env
139    // layer). The flat env keys do not nest under figment's `limits` table, so
140    // they are read explicitly here; a malformed value is a hard config error.
141    fn apply_env_overrides(&mut self) -> Result<(), Error> {
142        let slots: [(&str, &mut usize); 9] = [
143            ("QUIVER_MAX_K", &mut self.max_k),
144            ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
145            ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
146            ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
147            ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
148            ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
149            (
150                "QUIVER_MAX_REQUEST_BODY_BYTES",
151                &mut self.max_request_body_bytes,
152            ),
153            ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
154            ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
155        ];
156        for (key, slot) in slots {
157            if let Ok(raw) = std::env::var(key) {
158                *slot = raw.parse().map_err(|_| {
159                    Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
160                })?;
161            }
162        }
163        Ok(())
164    }
165
166    // Reject a zero cap (a `0` limit would refuse every request).
167    fn validate(&self) -> Result<(), Error> {
168        let named = [
169            ("max_k", self.max_k),
170            ("max_ef_search", self.max_ef_search),
171            ("max_fetch_limit", self.max_fetch_limit),
172            ("max_vector_dim", self.max_vector_dim),
173            ("max_payload_bytes", self.max_payload_bytes),
174            ("max_batch_size", self.max_batch_size),
175            ("max_request_body_bytes", self.max_request_body_bytes),
176            ("max_sparse_terms", self.max_sparse_terms),
177            ("max_bulk_batch_size", self.max_bulk_batch_size),
178        ];
179        if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
180            return Err(Error::Config(format!(
181                "limits.{name} must be greater than zero"
182            )));
183        }
184        Ok(())
185    }
186
187    fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
188        if k > self.max_k {
189            return Err(Error::BadRequest(format!(
190                "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
191                self.max_k
192            )));
193        }
194        if ef_search > self.max_ef_search {
195            return Err(Error::BadRequest(format!(
196                "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
197                self.max_ef_search
198            )));
199        }
200        Ok(())
201    }
202
203    fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
204        if n > self.max_sparse_terms {
205            return Err(Error::BadRequest(format!(
206                "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
207                self.max_sparse_terms
208            )));
209        }
210        Ok(())
211    }
212
213    fn check_fetch(&self, limit: usize) -> Result<(), Error> {
214        if limit > self.max_fetch_limit {
215            return Err(Error::BadRequest(format!(
216                "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
217                self.max_fetch_limit
218            )));
219        }
220        Ok(())
221    }
222
223    fn check_dim(&self, dim: usize) -> Result<(), Error> {
224        if dim > self.max_vector_dim {
225            return Err(Error::BadRequest(format!(
226                "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
227                self.max_vector_dim
228            )));
229        }
230        Ok(())
231    }
232
233    fn check_vector_len(&self, len: usize) -> Result<(), Error> {
234        if len > self.max_vector_dim {
235            return Err(Error::BadRequest(format!(
236                "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
237                self.max_vector_dim
238            )));
239        }
240        Ok(())
241    }
242
243    fn check_batch(&self, n: usize) -> Result<(), Error> {
244        if n > self.max_batch_size {
245            return Err(Error::BadRequest(format!(
246                "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
247                self.max_batch_size
248            )));
249        }
250        Ok(())
251    }
252
253    fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
254        if n > self.max_bulk_batch_size {
255            return Err(Error::BadRequest(format!(
256                "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
257                self.max_bulk_batch_size
258            )));
259        }
260        Ok(())
261    }
262
263    fn check_payload(&self, payload: &Value) -> Result<(), Error> {
264        let size = serde_json::to_vec(payload)
265            .map(|v| v.len())
266            .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
267        if size > self.max_payload_bytes {
268            return Err(Error::BadRequest(format!(
269                "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
270                self.max_payload_bytes
271            )));
272        }
273        Ok(())
274    }
275}
276
277/// Server configuration, layered defaults → `quiver.toml` → `QUIVER_*` env and
278/// validated at startup (ADR-0013).
279#[derive(Debug, Clone, Serialize, Deserialize)]
280#[serde(default)]
281pub struct Config {
282    /// Data directory for the storage engine.
283    pub data_dir: PathBuf,
284    /// REST (HTTP/1.1) bind address.
285    pub rest_addr: SocketAddr,
286    /// gRPC (HTTP/2) bind address.
287    pub grpc_addr: SocketAddr,
288    /// Accepted API keys with their RBAC scopes (ADR-0011). A bare secret —
289    /// from a comma-separated `QUIVER_API_KEYS` (the env form) or a plain TOML
290    /// array entry — is an all-collections admin key; a structured
291    /// `{secret, role, collections}` entry pins a narrower scope. Empty is
292    /// allowed only with `insecure = true`.
293    #[serde(default, deserialize_with = "auth::de_api_keys")]
294    pub api_keys: Vec<ApiKey>,
295    /// Hex-encoded 256-bit **master key** for encryption-at-rest (64 hex
296    /// characters). It wraps per-collection data-encryption keys (ADR-0010).
297    /// Required unless `insecure = true` or [`master_key_file`] is set; source it
298    /// from the environment or a secret store, never the committed config. `None`
299    /// ⇒ data is stored unencrypted (only valid in `insecure` mode).
300    ///
301    /// [`master_key_file`]: Config::master_key_file
302    pub encryption_key: Option<String>,
303    /// Path to a file holding the hex master key, as an alternative to
304    /// [`encryption_key`] (set exactly one). Lets the key arrive as a mounted
305    /// secret (Docker/Kubernetes) or a KMS-decrypted file rather than an
306    /// environment variable. It should be mode `0600`; a group/world-accessible
307    /// file is warned about at startup.
308    ///
309    /// [`encryption_key`]: Config::encryption_key
310    pub master_key_file: Option<PathBuf>,
311    /// Path to the PEM-encoded TLS certificate chain. Must be set together with
312    /// `tls_key`. Required for a non-loopback bind unless `insecure = true`.
313    pub tls_cert: Option<PathBuf>,
314    /// Path to the PEM-encoded TLS private key. Must be set together with
315    /// `tls_cert`.
316    pub tls_key: Option<PathBuf>,
317    /// Path to a PEM-encoded CA certificate that signs accepted client
318    /// certificates. When set, both transports require **mutual TLS**: a client
319    /// must present a certificate chaining to this CA to connect (ADR-0011).
320    /// Requires `tls_cert`/`tls_key`; bearer API keys still carry the RBAC scope.
321    pub tls_client_ca: Option<PathBuf>,
322    /// Path to an append-only audit log file (ADR-0011). When set, every
323    /// mutating and administrative operation and every access-control denial is
324    /// appended as one JSON object per line (JSON Lines); records are always
325    /// also emitted as `tracing` events. Unset ⇒ tracing only. Secrets are never
326    /// written — see `docs/security/audit.md`.
327    pub audit_log: Option<PathBuf>,
328    /// Run as a **read-replica follower** (ADR-0030): connect to a leader's gRPC
329    /// endpoint at this URL (e.g. `http://leader:6334`) and continuously apply its
330    /// committed operations, serving reads. A follower **refuses writes**. Unset ⇒
331    /// this node is a normal read-write leader. (Plaintext `http://` for now; TLS
332    /// to the leader is a follow-up — run replication over a trusted network.)
333    pub leader_url: Option<String>,
334    /// API key the follower presents to the leader's admin-scoped `Replicate`
335    /// stream (used with `leader_url`). Source it like any secret.
336    pub leader_api_key: Option<String>,
337    /// Opt out of the secure defaults (no auth, no encryption-at-rest, allow a
338    /// non-loopback bind without TLS). For local development only; never the
339    /// default.
340    pub insecure: bool,
341    /// Per-request cost limits (ADR-0040). Set with a `[limits]` table in
342    /// `quiver.toml` or the `QUIVER_MAX_*` environment variables.
343    pub limits: Limits,
344    /// Opt-in server-side embedding providers, keyed by collection name (ADR-0047).
345    /// Configured with `[embedding.<collection>]` tables in `quiver.toml`; default
346    /// empty, so the engine stays model-agnostic and library mode is unaffected.
347    /// API keys are referenced by env-var *name* and resolved at startup, never
348    /// stored. Enables `search_text` / `upsert_text` for the named collections.
349    #[serde(default)]
350    pub embedding: HashMap<String, EmbeddingConfig>,
351    /// Opt-in server-side rerank providers, keyed by collection name (ADR-0047).
352    /// Configured with `[rerank.<collection>]` tables; enables the one-call
353    /// retrieve→rerank stage of `search_text`.
354    #[serde(default)]
355    pub rerank: HashMap<String, RerankConfig>,
356    /// Opt-in per-key rate limiting (ADR-0049). Set a `[rate_limit]` table in
357    /// `quiver.toml` or the `QUIVER_RATE_LIMIT_*` env vars;
358    /// `requests_per_second = 0` (the default) disables it.
359    #[serde(default)]
360    pub rate_limit: RateLimitConfig,
361    /// Opt-in OpenTelemetry traces export (ADR-0059). Set an `[otlp]` table or the
362    /// `QUIVER_OTLP_*` env vars; an empty `endpoint` (the default) disables it.
363    /// Export additionally requires the server's `otlp` build feature.
364    #[serde(default)]
365    pub otlp: OtlpConfig,
366    /// Lock-free MVCC reads (ADR-0064), **experimental and default-off**. Serves
367    /// reads of single-vector, in-memory collections from an `arc-swap` snapshot so
368    /// pure-vector reads never block on a concurrent writer. Also settable via
369    /// `QUIVER_MVCC_READS`. See `docs/adr/0064-mvcc-reads-implementation.md`.
370    #[serde(default)]
371    pub mvcc_reads: bool,
372    /// Opt-in **cluster router** mode (ADR-0065): a non-empty list of shard base
373    /// URLs (e.g. `["http://s1:6333","http://s2:6333"]`, or the bracketed
374    /// `QUIVER_CLUSTER_SHARDS` env array `[http://s1:6333,http://s2:6333]`) makes
375    /// this server a stateless router that shards writes and scatter-gathers
376    /// searches across the shards. Empty (the default) = an ordinary single-node
377    /// server.
378    #[serde(default)]
379    pub cluster_shards: Vec<String>,
380    /// Optional per-shard **read replicas** (ADR-0065 increment 2). Each entry is
381    /// `"<shard_index>=<replica_url>"` (repeat the index for several replicas of one
382    /// shard), e.g. `["0=http://s1b:6333", "1=http://s2b:6333"]` or the bracketed
383    /// `QUIVER_CLUSTER_REPLICAS` env list. A replica is an ordinary follower
384    /// (ADR-0030) of its shard's primary; the router fans searches across
385    /// `{primary} ∪ replicas` to spread read load. A shard with no entry is
386    /// primary-only. Empty (the default) = no replicas.
387    #[serde(default)]
388    pub cluster_replicas: Vec<String>,
389    /// Optional API key the router presents to its shards (a cluster runs over a
390    /// trusted network). `None` = shards are unauthenticated.
391    #[serde(default)]
392    pub cluster_shard_key: Option<String>,
393    /// Run this process as the **cluster coordinator** (ADR-0066) instead of a
394    /// normal server: it owns the authoritative versioned shard map and serves the
395    /// `/cluster/*` membership API, off the data path. It seeds its map from
396    /// `cluster_shards`/`cluster_replicas` (or recovers it from `coordinator_state`).
397    /// Default `false`.
398    #[serde(default)]
399    pub coordinator: bool,
400    /// Base URL of the cluster coordinator (ADR-0066). When set on a **router**, the
401    /// router refreshes its shard map from `GET {url}/cluster/map` on an interval, so
402    /// a membership change propagates without a restart. `None` = a static map from
403    /// `cluster_shards` (increments 1–2 behaviour).
404    #[serde(default)]
405    pub coordinator_url: Option<String>,
406    /// Where the **coordinator** persists its versioned map + id counter, so a
407    /// restart recovers the exact membership (and never reuses a shard id). `None` =
408    /// in-memory only (lost on restart). Only meaningful with `coordinator = true`.
409    #[serde(default)]
410    pub coordinator_state: Option<PathBuf>,
411    /// Opt-in automatic **scale-out** policy for a coordinator (ADR-0065 increment
412    /// 5). Set an `[autoscale]` table in `quiver.toml` (or `QUIVER_AUTOSCALE_*`):
413    /// `enabled`, a per-shard `high_water_points` threshold, a `standby_urls` pool to
414    /// grow into, and `interval_secs` / `cooldown_secs` / `max_shards`. Default off.
415    #[serde(default)]
416    pub autoscale: AutoscaleConfig,
417    /// This node's **Raft member id** within its shard's group (ADR-0067, write
418    /// HA). Setting it opts the node into per-shard Raft: it joins the group
419    /// described by [`raft_members`], commits writes only after a **quorum**
420    /// commit, and fails over automatically if the leader dies. `None` (the
421    /// default) = the single-primary shard of increments 1–3, unchanged. Requires
422    /// the server's `raft` build feature.
423    ///
424    /// [`raft_members`]: Config::raft_members
425    #[serde(default)]
426    pub raft_node_id: Option<u64>,
427    /// The Raft group's members as `"<id>=<grpc-url>"` entries (ADR-0067), e.g.
428    /// `["1=http://n1:6334","2=http://n2:6334","3=http://n3:6334"]` or the
429    /// bracketed `QUIVER_RAFT_MEMBERS` env array. Each URL is the member's gRPC
430    /// address (the `RaftService` rides the existing gRPC server). Used only when
431    /// [`raft_node_id`] is set; the lowest id bootstraps the group.
432    ///
433    /// [`raft_node_id`]: Config::raft_node_id
434    #[serde(default)]
435    pub raft_members: Vec<String>,
436}
437
438impl Default for Config {
439    fn default() -> Self {
440        Self {
441            data_dir: PathBuf::from("./quiver-data"),
442            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
443            grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
444            api_keys: Vec::new(),
445            encryption_key: None,
446            master_key_file: None,
447            tls_cert: None,
448            tls_key: None,
449            tls_client_ca: None,
450            audit_log: None,
451            leader_url: None,
452            leader_api_key: None,
453            insecure: false,
454            limits: Limits::default(),
455            embedding: HashMap::new(),
456            rerank: HashMap::new(),
457            rate_limit: RateLimitConfig::default(),
458            otlp: OtlpConfig::default(),
459            mvcc_reads: false,
460            cluster_shards: Vec::new(),
461            cluster_replicas: Vec::new(),
462            cluster_shard_key: None,
463            coordinator: false,
464            coordinator_url: None,
465            coordinator_state: None,
466            autoscale: AutoscaleConfig::default(),
467            raft_node_id: None,
468            raft_members: Vec::new(),
469        }
470    }
471}
472
473impl Config {
474    /// Load configuration from defaults, an optional `quiver.toml`, and
475    /// `QUIVER_*` environment variables.
476    pub fn load() -> Result<Self, Error> {
477        let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
478            .merge(Toml::file("quiver.toml"))
479            .merge(Env::prefixed("QUIVER_"))
480            .extract()
481            .map_err(|e| Error::Config(e.to_string()))?;
482        // The flat `QUIVER_MAX_*` env keys do not nest under the `limits` table
483        // figment builds, so apply them explicitly (ADR-0040).
484        config.limits.apply_env_overrides()?;
485        // Same for the flat `QUIVER_RATE_LIMIT_*` keys (ADR-0049).
486        config
487            .rate_limit
488            .apply_env_overrides()
489            .map_err(Error::Config)?;
490        // Same for the flat `QUIVER_OTLP_*` keys (ADR-0059).
491        config.otlp.apply_env_overrides().map_err(Error::Config)?;
492        Ok(config)
493    }
494
495    /// Reject insecure configurations unless explicitly opted out (ADR-0013):
496    /// no anonymous access, encryption-at-rest on by default with a valid key,
497    /// and no non-loopback bind without TLS.
498    pub fn validate(&self) -> Result<(), Error> {
499        if self.api_keys.is_empty() && !self.insecure {
500            return Err(Error::Config(
501                "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
502                 set insecure=true for local development"
503                    .to_owned(),
504            ));
505        }
506        // Resolve the master key from the env var or a key file (exactly one).
507        let master_key = self.master_key_hex()?;
508        if master_key.is_none() && !self.insecure {
509            return Err(Error::Config(
510                "no encryption key configured: encryption-at-rest is on by default — \
511                 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
512                 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
513                 store data unencrypted (development only)"
514                    .to_owned(),
515            ));
516        }
517        // Fail fast on a malformed key rather than at first write.
518        if let Some(key) = &master_key {
519            AeadCodec::from_hex(key)
520                .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
521        }
522        // TLS certificate and key are set together or not at all.
523        if self.tls_cert.is_some() != self.tls_key.is_some() {
524            return Err(Error::Config(
525                "tls_cert and tls_key must be set together".to_owned(),
526            ));
527        }
528        // mTLS layers on top of server TLS: a client CA needs a server cert/key.
529        if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
530            return Err(Error::Config(
531                "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
532            ));
533        }
534        let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
535        let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
536        if non_loopback && !tls_enabled && !self.insecure {
537            return Err(Error::Config(
538                "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
539                 or insecure=true for local development"
540                    .to_owned(),
541            ));
542        }
543        // Reject a nonsensical cost limit (a `0` cap would refuse every request).
544        self.limits.validate()?;
545        Ok(())
546    }
547
548    /// The effective hex master key: from [`master_key_file`] when set (read and
549    /// trimmed), otherwise [`encryption_key`]. `None` means no key is configured
550    /// (only valid with `insecure`).
551    ///
552    /// # Errors
553    /// [`Error::Config`] if both sources are set, or the key file cannot be read.
554    ///
555    /// [`master_key_file`]: Config::master_key_file
556    /// [`encryption_key`]: Config::encryption_key
557    pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
558        let env_key = self
559            .encryption_key
560            .as_deref()
561            .map(str::trim)
562            .filter(|k| !k.is_empty());
563        match (&self.master_key_file, env_key) {
564            (Some(_), Some(_)) => Err(Error::Config(
565                "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
566                 (QUIVER_MASTER_KEY_FILE), not both"
567                    .to_owned(),
568            )),
569            (Some(path), None) => {
570                warn_if_world_readable(path);
571                let hex = std::fs::read_to_string(path).map_err(|e| {
572                    Error::Config(format!("reading master_key_file {}: {e}", path.display()))
573                })?;
574                Ok(Some(hex.trim().to_owned()))
575            }
576            (None, Some(key)) => Ok(Some(key.to_owned())),
577            (None, None) => Ok(None),
578        }
579    }
580}
581
582// Warn (don't fail — Docker/Kubernetes secrets often mount group/world-readable)
583// when a master-key file is more permissive than `0600`.
584#[cfg(unix)]
585fn warn_if_world_readable(path: &std::path::Path) {
586    use std::os::unix::fs::PermissionsExt;
587    if let Ok(meta) = std::fs::metadata(path)
588        && meta.permissions().mode() & 0o077 != 0
589    {
590        tracing::warn!(
591            path = %path.display(),
592            mode = format!("{:o}", meta.permissions().mode() & 0o777),
593            "master key file is group/world-accessible; restrict it to 0600"
594        );
595    }
596}
597
598#[cfg(not(unix))]
599fn warn_if_world_readable(_path: &std::path::Path) {}
600
601/// Shared server state: the engine behind a single-writer lock, the accepted
602/// API keys with their RBAC scopes, and the audit log.
603#[derive(Clone)]
604pub(crate) struct AppState {
605    // The engine behind a reader–writer lock (ADR-0057): reads take the shared
606    // lock and run concurrently; writes take the exclusive lock. A read that finds
607    // a collection's index stale upgrades to the write lock once to rebuild it,
608    // then serves concurrently again.
609    db: Arc<RwLock<Database>>,
610    keys: Arc<Vec<ApiKey>>,
611    audit: Arc<AuditLog>,
612    // Fan-out of every committed op to replication followers (ADR-0030). The
613    // commit observer publishes here; each `Replicate` stream subscribes.
614    replication_tx: broadcast::Sender<WalEntry>,
615    // True on a replication follower: external writes are refused; the engine's
616    // state is owned by the stream it applies from the leader (ADR-0030).
617    read_only: bool,
618    // Per-request cost limits, enforced at this op layer so both transports are
619    // covered by one implementation (ADR-0040).
620    limits: Limits,
621    // Opt-in, provider-agnostic server-side embedding/rerank providers, keyed by
622    // collection (ADR-0047). Empty on the common path; `search_text`/`upsert_text`
623    // require a configured embedder for the target collection.
624    embed: Arc<EmbedRegistry>,
625    // Opt-in per-key token-bucket rate limiter (ADR-0049). A no-op when disabled.
626    rate_limiter: Arc<RateLimiter>,
627    // Prometheus metrics registry (ADR-0014/0054), scraped at `GET /metrics`.
628    metrics: Arc<metrics::Metrics>,
629    // Collections with an off-lock index rebuild in flight (ADR-0062). A search that
630    // observes a deferred rebuild schedules one here, single-flighted so concurrent
631    // readers never kick off duplicate builds for the same collection.
632    rebuilding: Arc<Mutex<HashSet<String>>>,
633    // Lock-free MVCC read cache (ADR-0064 increment 3): cached `arc-swap` snapshot
634    // cells for MVCC-served collections. A pure-vector search loads the cell and
635    // searches it **without taking the database lock**, so it never blocks on a
636    // concurrent writer. Populated under the read lock the first time a collection
637    // is seen fresh; the map itself changes only on create/drop, never on a data
638    // write. Empty unless `QUIVER_MVCC_READS` is on.
639    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
640    // Snapshot of the engine's `QUIVER_MVCC_READS` flag, read once at startup. When
641    // on, the writer drives off-lock consolidation rebuilds (ADR-0064 increment 3)
642    // because pure-vector fast-path reads bypass the reader-driven scheduler. Off by
643    // default, so the non-MVCC path is byte-identical.
644    mvcc: bool,
645    // Opt-in cluster router (ADR-0065): when `Some`, every collection/point op
646    // fans out to the shards instead of touching the local engine. `None` = an
647    // ordinary single-node server (the engine-backed path, untouched).
648    cluster: Option<Arc<cluster::Cluster>>,
649    // Opt-in per-shard Raft write HA (ADR-0067): when `Some`, this node is a Raft
650    // voter for its shard — writes are proposed through consensus and applied only
651    // after a quorum commits, so a leader failure fails over with no lost
652    // acknowledged write. `None` = the single-primary shard (increments 1–3).
653    // Compiled only behind the off-by-default `raft` feature.
654    #[cfg(feature = "raft")]
655    raft: Option<Arc<raft::RaftShard>>,
656}
657
658/// A collection's metadata.
659pub(crate) struct CollectionInfo {
660    pub name: String,
661    pub dim: u32,
662    pub metric: DistanceMetric,
663    pub count: u64,
664    pub index: IndexSpec,
665    pub filterable: Vec<FilterableField>,
666    pub multivector: bool,
667    pub vector_encryption: VectorEncryption,
668}
669
670/// A point to upsert.
671pub(crate) struct PointIn {
672    pub id: String,
673    pub vector: Vec<f32>,
674    pub payload: Value,
675}
676
677/// A text point to embed server-side and upsert (ADR-0047).
678pub(crate) struct TextPointIn {
679    pub id: String,
680    pub text: String,
681    pub payload: Value,
682}
683
/// How many candidates a rerank stage over-fetches by default before it
/// reorders them and trims the result to the requested `k` (ADR-0047).
const RERANK_CANDIDATES: usize = 50;
687
688/// The document text a rerank stage scores: the original text stored under
689/// [`TEXT_KEY`] by `upsert_text`, else the whole payload as a string so the
690/// reranker still has something to compare.
691fn doc_text(payload: Option<&Value>) -> String {
692    match payload {
693        Some(Value::Object(map)) => map
694            .get(TEXT_KEY)
695            .and_then(Value::as_str)
696            .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
697        Some(v) => v.to_string(),
698        None => String::new(),
699    }
700}
701
702/// A fetched point.
703pub(crate) struct PointOut {
704    pub id: String,
705    pub vector: Option<Vec<f32>>,
706    pub payload: Value,
707}
708
709/// A search hit.
710pub(crate) struct MatchOut {
711    pub id: String,
712    pub score: f32,
713    pub payload: Option<Value>,
714    pub vector: Option<Vec<f32>>,
715}
716
717/// A multi-vector document to upsert.
718pub(crate) struct DocumentIn {
719    pub id: String,
720    pub vectors: Vec<Vec<f32>>,
721    pub payload: Value,
722}
723
724/// A multi-vector document hit (MaxSim).
725pub(crate) struct DocumentMatchOut {
726    pub id: String,
727    pub score: f32,
728    pub payload: Option<Value>,
729    pub vectors: Option<Vec<Vec<f32>>>,
730}
731
732impl AppState {
733    /// Authenticate a presented bearer token to its [`Principal`], or `None`
734    /// (a 401). An empty key set means `insecure` mode (validated at startup),
735    /// which admits any caller as an all-collections admin.
736    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
737        auth::authenticate(&self.keys, presented)
738    }
739
740    /// Consume one rate-limit token for `actor` (ADR-0049). A no-op `Allowed` when
741    /// rate limiting is disabled. Both transports call this at their auth choke
742    /// point so the limiter is enforced by one implementation.
743    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
744        self.rate_limiter.check(actor)
745    }
746
747    /// Whether the per-key rate limiter is active (lets a transport skip the work
748    /// entirely on the common, disabled path).
749    pub(crate) fn rate_limit_enabled(&self) -> bool {
750        self.rate_limiter.enabled()
751    }
752
753    // Run a **mutating** engine op behind the exclusive write lock, off the async
754    // runtime (the engine is synchronous and CPU/IO-bound). The single writer is
755    // unchanged from the prior single-mutex model (ADR-0006/0057).
756    async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
757    where
758        T: Send + 'static,
759        F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
760    {
761        let db = Arc::clone(&self.db);
762        tokio::task::spawn_blocking(move || -> Result<T, Error> {
763            let mut guard = db
764                .write()
765                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
766            f(&mut guard).map_err(Error::Engine)
767        })
768        .await
769        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
770    }
771
772    // Run a **read-only** engine op behind the shared read lock — many of these run
773    // concurrently (ADR-0057). The closure gets `&Database`, so it can only call the
774    // `&self` reads (`*_snapshot`, `fetch`, accessors).
775    async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
776    where
777        T: Send + 'static,
778        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
779    {
780        let db = Arc::clone(&self.db);
781        tokio::task::spawn_blocking(move || -> Result<T, Error> {
782            let guard = db
783                .read()
784                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
785            f(&guard).map_err(Error::Engine)
786        })
787        .await
788        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
789    }
790
791    // Run a snapshot read for `collection` (ADR-0057/0062). Takes only the shared
792    // read lock, so concurrent searches run in parallel. When a prior write deferred
793    // this collection's rebuild, the snapshot read serves the **prior** snapshot
794    // (snapshot-isolated, slightly stale) rather than blocking, and we schedule an
795    // **off-lock** rebuild so the next read is fresh — the reader never waits on a
796    // rebuild under the exclusive lock.
797    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
798    where
799        T: Send + 'static,
800        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
801    {
802        let db = Arc::clone(&self.db);
803        let cells = Arc::clone(&self.snapshot_cells);
804        let coll = collection.clone();
805        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
806            let guard = db
807                .read()
808                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
809            let result = f(&guard).map_err(Error::Engine)?;
810            // Report staleness so the caller can schedule a background rebuild; a
811            // missing collection (raced a drop) is simply "nothing to rebuild".
812            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
813            // Warm the lock-free read cache (ADR-0064 increment 3): once an
814            // MVCC-served collection is fresh (its base snapshot is published), cache
815            // its cell so subsequent pure-vector reads skip the lock. The cell
816            // self-updates as the writer republishes, so it is never re-fetched.
817            if !stale
818                && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
819                && let Ok(mut map) = cells.write()
820            {
821                map.entry(coll.clone()).or_insert(cell);
822            }
823            Ok((result, stale))
824        })
825        .await
826        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
827        if stale {
828            self.schedule_rebuild(collection);
829        }
830        Ok(result)
831    }
832
833    // The cached lock-free snapshot cell for `collection`, if one has been warmed
834    // (ADR-0064 increment 3). The map read contends only with create/drop, never
835    // with a data write — that is what makes the pure-vector read lock-free.
836    fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
837        self.snapshot_cells.read().ok()?.get(collection).cloned()
838    }
839
840    // After an MVCC-mode data write, schedule an off-lock consolidation rebuild if
841    // the write left the overlay stale (ADR-0064 increment 3): pure-vector fast-path
842    // reads bypass the reader-driven scheduler, so the writer drives consolidation.
843    // A brief read-lock staleness check; the rebuild itself runs off-lock and
844    // single-flighted. No-op (and not called) when MVCC is off.
845    async fn schedule_if_stale(&self, collection: &str) {
846        let db = Arc::clone(&self.db);
847        let coll = collection.to_owned();
848        let stale = tokio::task::spawn_blocking(move || {
849            db.read()
850                .ok()
851                .and_then(|g| g.needs_rebuild(&coll).ok())
852                .unwrap_or(false)
853        })
854        .await
855        .unwrap_or(false);
856        if stale {
857            self.schedule_rebuild(collection.to_owned());
858        }
859    }
860
861    // Schedule a single off-lock rebuild for `collection` (ADR-0062), deduplicated
862    // so concurrent readers that all observe the same deferred rebuild start exactly
863    // one. A no-op if one is already in flight.
864    fn schedule_rebuild(&self, collection: String) {
865        {
866            let mut inflight = match self.rebuilding.lock() {
867                Ok(g) => g,
868                Err(_) => return,
869            };
870            if !inflight.insert(collection.clone()) {
871                return; // already rebuilding this collection
872            }
873        }
874        let state = self.clone();
875        tokio::spawn(async move {
876            state.run_rebuild(&collection).await;
877            if let Ok(mut inflight) = state.rebuilding.lock() {
878                inflight.remove(&collection);
879            }
880        });
881    }
882
883    // Drive a collection's off-lock rebuild to completion (ADR-0062): capture the
884    // inputs under the shared read lock, build with no lock held, install under a
885    // brief write lock, and repeat while a write landed during the build (the commit
886    // reports the collection still stale). Each phase is a `spawn_blocking` so the
887    // CPU-bound build never stalls the async runtime; errors end the attempt (the
888    // collection stays stale and the next read reschedules).
889    async fn run_rebuild(&self, collection: &str) {
890        loop {
891            let db = Arc::clone(&self.db);
892            let coll = collection.to_owned();
893            let inputs = tokio::task::spawn_blocking(move || {
894                let guard = db.read().ok()?;
895                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
896            })
897            .await
898            .ok()
899            .flatten();
900            let Some(inputs) = inputs else { return };
901
902            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
903                return;
904            };
905
906            let db = Arc::clone(&self.db);
907            let still_stale = tokio::task::spawn_blocking(move || {
908                let mut guard = db.write().ok()?;
909                guard.commit_rebuild(rebuilt).ok()
910            })
911            .await
912            .ok()
913            .flatten();
914            match still_stale {
915                Some(true) => continue, // a write landed during the build — rebuild again
916                _ => return,
917            }
918        }
919    }
920
921    // Authorize `action` on `resource`, recording a denial in the audit log
922    // before propagating it. The shared choke point for both transports.
923    fn authorize(
924        &self,
925        principal: &Principal,
926        action: Action,
927        op: &str,
928        resource: &str,
929    ) -> Result<(), Error> {
930        principal
931            .require(action, Some(resource))
932            .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
933    }
934
935    // Authorize a collection-agnostic operation (listing): only the role is
936    // checked; a denial is recorded against the `*` resource.
937    fn authorize_global(
938        &self,
939        principal: &Principal,
940        action: Action,
941        op: &str,
942    ) -> Result<(), Error> {
943        principal
944            .require(action, None)
945            .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
946    }
947
948    /// Open a replication stream (ADR-0030): authorize (admin), then — in a single
949    /// engine critical section so no commit can interleave — subscribe to the live
950    /// commit tail and snapshot current state. The caller streams the snapshot,
951    /// then the tail from the receiver; because the subscription is taken under the
952    /// same lock as the snapshot, every post-snapshot op arrives on the receiver
953    /// and none is missed or duplicated.
954    pub(crate) async fn open_replication(
955        &self,
956        principal: &Principal,
957    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
958        self.authorize_global(principal, Action::Admin, "replicate")?;
959        let tx = self.replication_tx.clone();
960        self.read_blocking(move |db| {
961            let rx = tx.subscribe();
962            let snapshot = db.replication_snapshot()?;
963            Ok((snapshot, rx))
964        })
965        .await
966    }
967
968    /// Apply a replicated op received from the leader (ADR-0030). Internal to the
969    /// follower stream — deliberately NOT gated by `read_only`, which only refuses
970    /// *external* client writes.
971    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
972        self.write_blocking(move |db| db.apply_replicated(op)).await
973    }
974
975    // Reject a write up front if this node is not its shard's Raft leader
976    // (ADR-0067), returning the leader's URL so a router redirects. Checking
977    // before preparing the op avoids a wasted local prepare on a follower (and a
978    // follower's stale local state silently producing a no-op). A leadership change
979    // between this check and the proposal is still caught: `client_write` then
980    // returns `ForwardToLeader`, mapped by `map_client_write_err`.
981    #[cfg(feature = "raft")]
982    fn ensure_raft_leader(&self, rs: &raft::RaftShard) -> Result<(), Error> {
983        let leader = rs.raft.metrics().borrow().current_leader;
984        if leader == Some(rs.node_id) {
985            Ok(())
986        } else {
987            Err(Error::NotLeader {
988                leader: leader.and_then(|id| rs.member_url(id)),
989            })
990        }
991    }
992
993    /// Add a voter to this node's Raft shard at runtime (ADR-0067 increment 4c,
994    /// admin only). Returns a bad-request error if this node is not a Raft shard or
995    /// the server was built without the `raft` feature.
996    pub(crate) async fn raft_add_voter(
997        &self,
998        principal: &Principal,
999        id: u64,
1000        url: String,
1001    ) -> Result<(), Error> {
1002        self.authorize_global(principal, Action::Admin, "raft_add_voter")?;
1003        #[cfg(feature = "raft")]
1004        {
1005            let rs = self
1006                .raft
1007                .clone()
1008                .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1009            return rs
1010                .add_voter(id, url)
1011                .await
1012                .map_err(|e| Error::Internal(format!("add raft voter: {e}")));
1013        }
1014        #[cfg(not(feature = "raft"))]
1015        {
1016            let _ = (id, url);
1017            Err(Error::BadRequest(
1018                "server built without the raft feature".to_owned(),
1019            ))
1020        }
1021    }
1022
1023    /// Remove a voter from this node's Raft shard at runtime (ADR-0067 increment 4c,
1024    /// admin only).
1025    pub(crate) async fn raft_remove_voter(
1026        &self,
1027        principal: &Principal,
1028        id: u64,
1029    ) -> Result<(), Error> {
1030        self.authorize_global(principal, Action::Admin, "raft_remove_voter")?;
1031        #[cfg(feature = "raft")]
1032        {
1033            let rs = self
1034                .raft
1035                .clone()
1036                .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1037            return rs
1038                .remove_voter(id)
1039                .await
1040                .map_err(|e| Error::Internal(format!("remove raft voter: {e}")));
1041        }
1042        #[cfg(not(feature = "raft"))]
1043        {
1044            let _ = id;
1045            Err(Error::BadRequest(
1046                "server built without the raft feature".to_owned(),
1047            ))
1048        }
1049    }
1050
1051    // Propose one prepared op through the shard's Raft group and wait for the
1052    // quorum commit + local apply (ADR-0067). The op is acknowledged only after it
1053    // is committed, so a leader failover never loses it.
1054    #[cfg(feature = "raft")]
1055    async fn raft_propose(&self, rs: &raft::RaftShard, op: WalOp) -> Result<(), Error> {
1056        rs.raft
1057            .client_write(op)
1058            .await
1059            .map(|_| ())
1060            .map_err(map_client_write_err)
1061    }
1062
1063    // Propose a batch of prepared ops in order, returning how many committed. Stops
1064    // at the first failure (e.g. a mid-batch leadership change) and surfaces it, so
1065    // a partial batch is reported honestly rather than as a silent success.
1066    #[cfg(feature = "raft")]
1067    async fn raft_propose_all(&self, rs: &raft::RaftShard, ops: Vec<WalOp>) -> Result<u64, Error> {
1068        let mut committed = 0u64;
1069        for op in ops {
1070            self.raft_propose(rs, op).await?;
1071            committed += 1;
1072        }
1073        Ok(committed)
1074    }
1075
1076    // Refuse a mutating operation on a read-only replication follower (ADR-0030);
1077    // its state is owned by the leader's stream, not by external clients.
1078    fn ensure_writable(&self, op: &str) -> Result<(), Error> {
1079        if self.read_only {
1080            return Err(Error::Forbidden(format!(
1081                "{op}: this node is a read-only replication follower"
1082            )));
1083        }
1084        Ok(())
1085    }
1086
1087    #[allow(clippy::too_many_arguments)]
1088    pub(crate) async fn create_collection(
1089        &self,
1090        principal: &Principal,
1091        name: String,
1092        dim: u32,
1093        metric: DistanceMetric,
1094        index: IndexSpec,
1095        filterable: Vec<FilterableField>,
1096        multivector: bool,
1097        vector_encryption: VectorEncryption,
1098    ) -> Result<CollectionInfo, Error> {
1099        self.ensure_writable("create_collection")?;
1100        self.authorize(principal, Action::Admin, "create_collection", &name)?;
1101        self.limits.check_dim(dim as usize)?;
1102        if let Some(c) = &self.cluster {
1103            return c
1104                .create_collection(
1105                    name,
1106                    dim,
1107                    metric,
1108                    index,
1109                    filterable,
1110                    multivector,
1111                    vector_encryption,
1112                )
1113                .await;
1114        }
1115        // Per-shard Raft write path (ADR-0067): the leader prepares the op (which
1116        // assigns the new collection id) and proposes it through consensus; every
1117        // voter applies it after the quorum commit. The `create_lock` serializes
1118        // concurrent creates so two cannot claim the same next id.
1119        #[cfg(feature = "raft")]
1120        if let Some(rs) = self.raft.clone() {
1121            self.ensure_raft_leader(&rs)?;
1122            let descriptor = Descriptor::new(dim, Dtype::F32, metric)
1123                .with_index(index)
1124                .with_filterable(filterable.clone())
1125                .with_multivector(multivector)
1126                .with_vector_encryption(vector_encryption);
1127            let _guard = rs.create_lock.lock().await;
1128            let prep_name = name.clone();
1129            let result = match self
1130                .read_blocking(move |db| db.prepare_create_collection(&prep_name, &descriptor))
1131                .await
1132            {
1133                Ok(op) => self.raft_propose(&rs, op).await,
1134                Err(e) => Err(e),
1135            };
1136            self.audit.record(
1137                principal.actor(),
1138                "create_collection",
1139                &name,
1140                Outcome::of(&result),
1141            );
1142            result?;
1143            return Ok(CollectionInfo {
1144                name,
1145                dim,
1146                metric,
1147                count: 0,
1148                index,
1149                filterable,
1150                multivector,
1151                vector_encryption,
1152            });
1153        }
1154        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
1155            .with_index(index)
1156            .with_filterable(filterable.clone())
1157            .with_multivector(multivector)
1158            .with_vector_encryption(vector_encryption);
1159        let owned = name.clone();
1160        let result = self
1161            .write_blocking(move |db| db.create_collection(&owned, descriptor))
1162            .await;
1163        self.audit.record(
1164            principal.actor(),
1165            "create_collection",
1166            &name,
1167            Outcome::of(&result),
1168        );
1169        result?;
1170        Ok(CollectionInfo {
1171            name,
1172            dim,
1173            metric,
1174            count: 0,
1175            index,
1176            filterable,
1177            multivector,
1178            vector_encryption,
1179        })
1180    }
1181
1182    pub(crate) async fn get_collection(
1183        &self,
1184        principal: &Principal,
1185        name: String,
1186    ) -> Result<CollectionInfo, Error> {
1187        self.authorize(principal, Action::Read, "get_collection", &name)?;
1188        self.read_blocking(move |db| {
1189            let descriptor = db
1190                .descriptor(&name)
1191                .cloned()
1192                .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
1193            // A multi-vector collection reports its document count, not its
1194            // (much larger) token-row count.
1195            let count = if descriptor.multivector {
1196                db.document_count(&name)? as u64
1197            } else {
1198                db.len(&name)? as u64
1199            };
1200            Ok(CollectionInfo {
1201                name,
1202                dim: descriptor.dim,
1203                metric: descriptor.metric,
1204                count,
1205                index: descriptor.index,
1206                filterable: descriptor.filterable,
1207                multivector: descriptor.multivector,
1208                vector_encryption: descriptor.vector_encryption,
1209            })
1210        })
1211        .await
1212    }
1213
1214    pub(crate) async fn list_collections(
1215        &self,
1216        principal: &Principal,
1217    ) -> Result<Vec<CollectionInfo>, Error> {
1218        self.authorize_global(principal, Action::Read, "list_collections")?;
1219        let mut infos = self
1220            .read_blocking(|db| {
1221                let mut out = Vec::new();
1222                for name in db.collection_names() {
1223                    if let Some(descriptor) = db.descriptor(&name).cloned() {
1224                        let count = if descriptor.multivector {
1225                            db.document_count(&name)? as u64
1226                        } else {
1227                            db.len(&name)? as u64
1228                        };
1229                        out.push(CollectionInfo {
1230                            name,
1231                            dim: descriptor.dim,
1232                            metric: descriptor.metric,
1233                            count,
1234                            index: descriptor.index,
1235                            filterable: descriptor.filterable,
1236                            multivector: descriptor.multivector,
1237                            vector_encryption: descriptor.vector_encryption,
1238                        });
1239                    }
1240                }
1241                Ok(out)
1242            })
1243            .await?;
1244        // Never reveal collections outside the caller's scope.
1245        infos.retain(|info| principal.can_see(&info.name));
1246        Ok(infos)
1247    }
1248
1249    pub(crate) async fn delete_collection(
1250        &self,
1251        principal: &Principal,
1252        name: String,
1253    ) -> Result<bool, Error> {
1254        self.ensure_writable("delete_collection")?;
1255        self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1256        if let Some(c) = &self.cluster {
1257            return c.drop_collection(&name).await;
1258        }
1259        let resource = name.clone();
1260        let result = self
1261            .write_blocking(move |db| db.drop_collection(&name))
1262            .await;
1263        self.audit.record(
1264            principal.actor(),
1265            "delete_collection",
1266            &resource,
1267            Outcome::of(&result),
1268        );
1269        // Evict the dropped collection's cached lock-free cell (ADR-0064 increment 3)
1270        // so a later same-named collection never serves a stale cell.
1271        if matches!(result, Ok(true))
1272            && let Ok(mut map) = self.snapshot_cells.write()
1273        {
1274            map.remove(&resource);
1275        }
1276        result
1277    }
1278
1279    #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1280    pub(crate) async fn upsert(
1281        &self,
1282        principal: &Principal,
1283        collection: String,
1284        points: Vec<PointIn>,
1285    ) -> Result<u64, Error> {
1286        self.ensure_writable("upsert")?;
1287        self.authorize(principal, Action::Write, "upsert", &collection)?;
1288        self.limits.check_batch(points.len())?;
1289        for p in &points {
1290            self.limits.check_vector_len(p.vector.len())?;
1291            self.limits.check_payload(&p.payload)?;
1292        }
1293        if let Some(c) = &self.cluster {
1294            return c.upsert(&collection, points).await;
1295        }
1296        // Per-shard Raft write path (ADR-0067): prepare each point's op on the
1297        // leader, then propose each through consensus. One op per point keeps the
1298        // apply seam identical to a direct write; a batched Raft entry is a later
1299        // optimization (ponytail: correctness first, opt-in path).
1300        #[cfg(feature = "raft")]
1301        if let Some(rs) = self.raft.clone() {
1302            self.ensure_raft_leader(&rs)?;
1303            let prep = collection.clone();
1304            let result = match self
1305                .read_blocking(move |db| {
1306                    points
1307                        .iter()
1308                        .map(|p| db.prepare_upsert(&prep, &p.id, &p.vector, &p.payload))
1309                        .collect::<quiver_embed::Result<Vec<_>>>()
1310                })
1311                .await
1312            {
1313                Ok(ops) => self.raft_propose_all(&rs, ops).await,
1314                Err(e) => Err(e),
1315            };
1316            self.audit.record(
1317                principal.actor(),
1318                "upsert",
1319                &collection,
1320                Outcome::of(&result),
1321            );
1322            return result;
1323        }
1324        let resource = collection.clone();
1325        let result = self
1326            .write_blocking(move |db| {
1327                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1328                    .iter()
1329                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1330                    .collect();
1331                db.upsert_batch(&collection, &records)
1332            })
1333            .await;
1334        self.audit
1335            .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1336        if self.mvcc && result.is_ok() {
1337            self.schedule_if_stale(&resource).await;
1338        }
1339        result
1340    }
1341
1342    // Bulk upsert for a load-then-query workload (ADR-0045): one WAL fsync plus a
1343    // single deferred index-build pass, with the larger `max_bulk_batch_size` cap.
1344    pub(crate) async fn upsert_bulk(
1345        &self,
1346        principal: &Principal,
1347        collection: String,
1348        points: Vec<PointIn>,
1349    ) -> Result<u64, Error> {
1350        self.ensure_writable("upsert")?;
1351        self.authorize(principal, Action::Write, "upsert", &collection)?;
1352        self.limits.check_bulk_batch(points.len())?;
1353        for p in &points {
1354            self.limits.check_vector_len(p.vector.len())?;
1355            self.limits.check_payload(&p.payload)?;
1356        }
1357        if let Some(c) = &self.cluster {
1358            return c.upsert_bulk(&collection, points).await;
1359        }
1360        let resource = collection.clone();
1361        let result = self
1362            .write_blocking(move |db| {
1363                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1364                    .iter()
1365                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1366                    .collect();
1367                db.upsert_bulk(&collection, &records)
1368            })
1369            .await;
1370        self.audit.record(
1371            principal.actor(),
1372            "upsert_bulk",
1373            &resource,
1374            Outcome::of(&result),
1375        );
1376        if self.mvcc && result.is_ok() {
1377            self.schedule_if_stale(&resource).await;
1378        }
1379        result
1380    }
1381
1382    /// Take a consistent online snapshot of the whole database into a
1383    /// server-local `destination` directory (ADR-0050). A global admin
1384    /// operation; runs the checkpoint + copy on the blocking pool.
1385    #[tracing::instrument(skip_all)]
1386    pub(crate) async fn snapshot(
1387        &self,
1388        principal: &Principal,
1389        destination: String,
1390    ) -> Result<SnapshotInfo, Error> {
1391        self.ensure_writable("snapshot")?;
1392        self.authorize_global(principal, Action::Admin, "snapshot")?;
1393        let dest = std::path::PathBuf::from(&destination);
1394        let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1395        self.audit.record(
1396            principal.actor(),
1397            "snapshot",
1398            &destination,
1399            Outcome::of(&result),
1400        );
1401        result
1402    }
1403
1404    pub(crate) async fn delete_points(
1405        &self,
1406        principal: &Principal,
1407        collection: String,
1408        ids: Vec<String>,
1409    ) -> Result<u64, Error> {
1410        self.ensure_writable("delete_points")?;
1411        self.authorize(principal, Action::Write, "delete_points", &collection)?;
1412        if let Some(c) = &self.cluster {
1413            return c.delete_points(&collection, ids).await;
1414        }
1415        // Per-shard Raft write path (ADR-0067): prepare each delete on the leader
1416        // (`None` for an absent point), then propose the present ones through
1417        // consensus. The returned count is the number actually deleted.
1418        #[cfg(feature = "raft")]
1419        if let Some(rs) = self.raft.clone() {
1420            self.ensure_raft_leader(&rs)?;
1421            let prep = collection.clone();
1422            let result = match self
1423                .read_blocking(move |db| {
1424                    ids.iter()
1425                        .map(|id| db.prepare_delete(&prep, id))
1426                        .collect::<quiver_embed::Result<Vec<Option<WalOp>>>>()
1427                })
1428                .await
1429            {
1430                Ok(ops) => {
1431                    self.raft_propose_all(&rs, ops.into_iter().flatten().collect())
1432                        .await
1433                }
1434                Err(e) => Err(e),
1435            };
1436            self.audit.record(
1437                principal.actor(),
1438                "delete_points",
1439                &collection,
1440                Outcome::of(&result),
1441            );
1442            return result;
1443        }
1444        let resource = collection.clone();
1445        let result = self
1446            .write_blocking(move |db| {
1447                let mut count = 0u64;
1448                for id in &ids {
1449                    if db.delete(&collection, id)? {
1450                        count += 1;
1451                    }
1452                }
1453                Ok(count)
1454            })
1455            .await;
1456        self.audit.record(
1457            principal.actor(),
1458            "delete_points",
1459            &resource,
1460            Outcome::of(&result),
1461        );
1462        if self.mvcc && result.is_ok() {
1463            self.schedule_if_stale(&resource).await;
1464        }
1465        result
1466    }
1467
1468    pub(crate) async fn get_points(
1469        &self,
1470        principal: &Principal,
1471        collection: String,
1472        ids: Vec<String>,
1473        with_vector: bool,
1474    ) -> Result<Vec<PointOut>, Error> {
1475        self.authorize(principal, Action::Read, "get_points", &collection)?;
1476        if let Some(c) = &self.cluster {
1477            return c.get_points(&collection, ids, with_vector).await;
1478        }
1479        self.read_blocking(move |db| {
1480            let mut out = Vec::new();
1481            for id in &ids {
1482                if let Some(m) = db.get(&collection, id)? {
1483                    out.push(PointOut {
1484                        id: m.id,
1485                        vector: if with_vector { m.vector } else { None },
1486                        payload: m.payload.unwrap_or(Value::Null),
1487                    });
1488                }
1489            }
1490            Ok(out)
1491        })
1492        .await
1493    }
1494
1495    #[allow(clippy::too_many_arguments)]
1496    #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1497    pub(crate) async fn search(
1498        &self,
1499        principal: &Principal,
1500        collection: String,
1501        vector: Vec<f32>,
1502        k: usize,
1503        filter: Option<Filter>,
1504        ef_search: usize,
1505        with_payload: bool,
1506        with_vector: bool,
1507    ) -> Result<Vec<MatchOut>, Error> {
1508        self.authorize(principal, Action::Read, "search", &collection)?;
1509        self.limits.check_search(k, ef_search)?;
1510        self.limits.check_vector_len(vector.len())?;
1511
1512        // Cluster router (ADR-0065): scatter-gather across the shards, merge top-k.
1513        if let Some(c) = &self.cluster {
1514            return c
1515                .search(
1516                    &collection,
1517                    vector,
1518                    k,
1519                    filter,
1520                    ef_search,
1521                    with_payload,
1522                    with_vector,
1523                )
1524                .await;
1525        }
1526
1527        // Lock-free fast path (ADR-0064 increment 3): a **pure-vector** read of an
1528        // MVCC-served collection loads the cached snapshot cell and searches it with
1529        // **no database lock**, so it never blocks on a concurrent writer. Payload,
1530        // filter, and vector results need the store (behind the lock) and fall
1531        // through to `search_blocking`.
1532        if filter.is_none()
1533            && !with_payload
1534            && !with_vector
1535            && let Some(cell) = self.cached_cell(&collection)
1536        {
1537            return tokio::task::spawn_blocking(move || {
1538                let matches = cell.load().search(&vector, k, ef_search)?;
1539                Ok::<_, quiver_embed::Error>(
1540                    matches
1541                        .into_iter()
1542                        .map(|m| MatchOut {
1543                            id: m.id,
1544                            score: m.score,
1545                            payload: None,
1546                            vector: None,
1547                        })
1548                        .collect(),
1549                )
1550            })
1551            .await
1552            .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1553            .map_err(Error::Engine);
1554        }
1555
1556        let params = SearchParams {
1557            k,
1558            filter,
1559            ef_search,
1560            with_payload,
1561            with_vector,
1562        };
1563        let coll = collection.clone();
1564        self.search_blocking(coll, move |db| {
1565            let matches = db.search_snapshot(&collection, &vector, &params)?;
1566            Ok(matches
1567                .into_iter()
1568                .map(|m| MatchOut {
1569                    id: m.id,
1570                    score: m.score,
1571                    payload: m.payload,
1572                    vector: m.vector,
1573                })
1574                .collect())
1575        })
1576        .await
1577    }
1578
1579    #[allow(clippy::too_many_arguments)]
1580    pub(crate) async fn hybrid_search(
1581        &self,
1582        principal: &Principal,
1583        collection: String,
1584        dense: Option<Vec<f32>>,
1585        sparse: Option<(Vec<u32>, Vec<f32>)>,
1586        text: Option<String>,
1587        k: usize,
1588        filter: Option<Filter>,
1589        ef_search: usize,
1590        rrf_k0: f32,
1591        with_payload: bool,
1592        with_vector: bool,
1593    ) -> Result<Vec<MatchOut>, Error> {
1594        self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1595        self.limits.check_search(k, ef_search)?;
1596        if let Some(v) = &dense {
1597            self.limits.check_vector_len(v.len())?;
1598        }
1599        if let Some((indices, values)) = &sparse {
1600            self.limits.check_sparse_terms(indices.len())?;
1601            if indices.len() != values.len() {
1602                return Err(Error::BadRequest(format!(
1603                    "sparse query indices ({}) and values ({}) length mismatch",
1604                    indices.len(),
1605                    values.len()
1606                )));
1607            }
1608        }
1609        let params = SearchParams {
1610            k,
1611            filter,
1612            ef_search,
1613            with_payload,
1614            with_vector,
1615        };
1616        let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1617        let coll = collection.clone();
1618        self.search_blocking(coll, move |db| {
1619            let matches = db.hybrid_search_snapshot(
1620                &collection,
1621                dense.as_deref(),
1622                sv.as_ref(),
1623                text.as_deref(),
1624                &params,
1625                rrf_k0,
1626            )?;
1627            Ok(matches
1628                .into_iter()
1629                .map(|m| MatchOut {
1630                    id: m.id,
1631                    score: m.score,
1632                    payload: m.payload,
1633                    vector: m.vector,
1634                })
1635                .collect())
1636        })
1637        .await
1638    }
1639
1640    /// Embed `text` with the collection's configured provider and run a dense (or
1641    /// dense ⊕ BM25, if the collection has text) search, optionally reranking the
1642    /// candidates in one call (ADR-0047). The text is also passed to the BM25 side,
1643    /// so a `upsert_text` corpus yields hybrid lexical+semantic retrieval for free.
1644    #[allow(clippy::too_many_arguments)]
1645    pub(crate) async fn search_text(
1646        &self,
1647        principal: &Principal,
1648        collection: String,
1649        text: String,
1650        k: usize,
1651        filter: Option<Filter>,
1652        ef_search: usize,
1653        rrf_k0: f32,
1654        with_payload: bool,
1655        with_vector: bool,
1656        rerank: bool,
1657    ) -> Result<Vec<MatchOut>, Error> {
1658        self.authorize(principal, Action::Read, "search_text", &collection)?;
1659        self.limits.check_search(k, ef_search)?;
1660        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1661            Error::BadRequest(format!(
1662                "collection {collection:?} has no embedding provider configured \
1663                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1664            ))
1665        })?;
1666        // Embed off the async runtime: the provider call is blocking network I/O.
1667        let query = text.clone();
1668        let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1669            .await
1670            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1671            .map_err(|e| Error::Upstream(e.to_string()))?
1672            .into_iter()
1673            .next()
1674            .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1675        self.limits.check_vector_len(vector.len())?;
1676
1677        let reranker = if rerank {
1678            self.embed.reranker(&collection)
1679        } else {
1680            None
1681        };
1682        // Over-fetch when reranking so the reranker can reorder a wide candidate set
1683        // down to the requested `k`; fetch payloads when reranking (we need the doc
1684        // text) even if the caller did not ask for them.
1685        let need_payload = with_payload || reranker.is_some();
1686        let fetch_k = if reranker.is_some() {
1687            k.max(RERANK_CANDIDATES)
1688        } else {
1689            k
1690        };
1691
1692        let mut hits = self
1693            .hybrid_search(
1694                principal,
1695                collection,
1696                Some(vector),
1697                None,
1698                Some(text.clone()),
1699                fetch_k,
1700                filter,
1701                ef_search,
1702                rrf_k0,
1703                need_payload,
1704                with_vector,
1705            )
1706            .await?;
1707
1708        if let Some(rr) = reranker {
1709            let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1710            let query = text;
1711            let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1712                .await
1713                .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1714                .map_err(|e| Error::Upstream(e.to_string()))?;
1715            // Re-score each hit and sort by the rerank score, descending.
1716            let mut scored: Vec<(f32, MatchOut)> = scores
1717                .into_iter()
1718                .zip(hits)
1719                .map(|(s, mut h)| {
1720                    h.score = s;
1721                    (s, h)
1722                })
1723                .collect();
1724            scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1725            hits = scored.into_iter().map(|(_, h)| h).collect();
1726        }
1727
1728        hits.truncate(k);
1729        // Drop payloads we only fetched for reranking if the caller didn't want them.
1730        if !with_payload {
1731            for h in &mut hits {
1732                h.payload = None;
1733            }
1734        }
1735        Ok(hits)
1736    }
1737
1738    /// Embed each point's `text` with the collection's provider and upsert it as a
1739    /// dense point, co-populating the `__quiver_text__` payload key (ADR-0046) so the
1740    /// same text is indexed for BM25 — one call feeds both the dense and lexical
1741    /// sides (ADR-0047).
1742    pub(crate) async fn upsert_text(
1743        &self,
1744        principal: &Principal,
1745        collection: String,
1746        points: Vec<TextPointIn>,
1747    ) -> Result<u64, Error> {
1748        self.ensure_writable("upsert_text")?;
1749        self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1750        self.limits.check_batch(points.len())?;
1751        for p in &points {
1752            if !matches!(p.payload, Value::Object(_) | Value::Null) {
1753                return Err(Error::BadRequest(
1754                    "upsert_text payload must be a JSON object or null".to_owned(),
1755                ));
1756            }
1757        }
1758        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1759            Error::BadRequest(format!(
1760                "collection {collection:?} has no embedding provider configured \
1761                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1762            ))
1763        })?;
1764        let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1765        let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1766            .await
1767            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1768            .map_err(|e| Error::Upstream(e.to_string()))?;
1769        if vectors.len() != points.len() {
1770            return Err(Error::Upstream(format!(
1771                "embedding provider returned {} vectors for {} inputs",
1772                vectors.len(),
1773                points.len()
1774            )));
1775        }
1776        let dense: Vec<PointIn> = points
1777            .into_iter()
1778            .zip(vectors)
1779            .map(|(p, vector)| {
1780                let mut payload = match p.payload {
1781                    Value::Object(map) => map,
1782                    _ => serde_json::Map::new(),
1783                };
1784                // Don't clobber a caller-supplied text key.
1785                payload
1786                    .entry(TEXT_KEY.to_owned())
1787                    .or_insert_with(|| Value::String(p.text.clone()));
1788                PointIn {
1789                    id: p.id,
1790                    vector,
1791                    payload: Value::Object(payload),
1792                }
1793            })
1794            .collect();
1795        self.upsert(principal, collection, dense).await
1796    }
1797
1798    #[allow(clippy::too_many_arguments)]
1799    pub(crate) async fn fetch(
1800        &self,
1801        principal: &Principal,
1802        collection: String,
1803        filter: Option<Filter>,
1804        offset: usize,
1805        limit: usize,
1806        with_payload: bool,
1807        with_vector: bool,
1808    ) -> Result<Vec<MatchOut>, Error> {
1809        self.authorize(principal, Action::Read, "fetch", &collection)?;
1810        self.limits.check_fetch(limit)?;
1811        self.read_blocking(move |db| {
1812            let matches = db.fetch(
1813                &collection,
1814                filter.as_ref(),
1815                offset,
1816                limit,
1817                with_payload,
1818                with_vector,
1819            )?;
1820            Ok(matches
1821                .into_iter()
1822                .map(|m| MatchOut {
1823                    id: m.id,
1824                    score: m.score,
1825                    payload: m.payload,
1826                    vector: m.vector,
1827                })
1828                .collect())
1829        })
1830        .await
1831    }
1832
1833    pub(crate) async fn upsert_documents(
1834        &self,
1835        principal: &Principal,
1836        collection: String,
1837        documents: Vec<DocumentIn>,
1838    ) -> Result<u64, Error> {
1839        self.ensure_writable("upsert_documents")?;
1840        self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1841        self.limits.check_batch(documents.len())?;
1842        for doc in &documents {
1843            self.limits.check_payload(&doc.payload)?;
1844            for token in &doc.vectors {
1845                self.limits.check_vector_len(token.len())?;
1846            }
1847        }
1848        let resource = collection.clone();
1849        let result = self
1850            .write_blocking(move |db| {
1851                let mut count = 0u64;
1852                for doc in &documents {
1853                    db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1854                    count += 1;
1855                }
1856                Ok(count)
1857            })
1858            .await;
1859        self.audit.record(
1860            principal.actor(),
1861            "upsert_documents",
1862            &resource,
1863            Outcome::of(&result),
1864        );
1865        result
1866    }
1867
1868    pub(crate) async fn delete_documents(
1869        &self,
1870        principal: &Principal,
1871        collection: String,
1872        ids: Vec<String>,
1873    ) -> Result<u64, Error> {
1874        self.ensure_writable("delete_documents")?;
1875        self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1876        let resource = collection.clone();
1877        let result = self
1878            .write_blocking(move |db| {
1879                let mut count = 0u64;
1880                for id in &ids {
1881                    if db.delete_document(&collection, id)? {
1882                        count += 1;
1883                    }
1884                }
1885                Ok(count)
1886            })
1887            .await;
1888        self.audit.record(
1889            principal.actor(),
1890            "delete_documents",
1891            &resource,
1892            Outcome::of(&result),
1893        );
1894        result
1895    }
1896
1897    #[allow(clippy::too_many_arguments)]
1898    pub(crate) async fn search_multi_vector(
1899        &self,
1900        principal: &Principal,
1901        collection: String,
1902        query: Vec<Vec<f32>>,
1903        k: usize,
1904        filter: Option<Filter>,
1905        ef_search: usize,
1906        with_payload: bool,
1907        with_vector: bool,
1908    ) -> Result<Vec<DocumentMatchOut>, Error> {
1909        self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1910        self.limits.check_search(k, ef_search)?;
1911        for token in &query {
1912            self.limits.check_vector_len(token.len())?;
1913        }
1914        let params = SearchParams {
1915            k,
1916            filter,
1917            ef_search,
1918            with_payload,
1919            with_vector,
1920        };
1921        let coll = collection.clone();
1922        self.search_blocking(coll, move |db| {
1923            let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
1924            Ok(matches
1925                .into_iter()
1926                .map(|m| DocumentMatchOut {
1927                    id: m.id,
1928                    score: m.score,
1929                    payload: m.payload,
1930                    vectors: m.vectors,
1931                })
1932                .collect())
1933        })
1934        .await
1935    }
1936}
1937
/// Capacity of the leader's buffer of recently committed ops kept for
/// replication followers (ADR-0030). The value is 1024 entries. A follower that
/// lags further than this must re-bootstrap.
const REPLICATION_BUFFER: usize = 1 << 10;
1941
1942/// Run the server from `config` until a shutdown signal (Ctrl-C).
1943pub async fn run(config: Config) -> Result<(), Error> {
1944    config.validate()?;
1945    let rest_listener = TcpListener::bind(config.rest_addr)
1946        .await
1947        .map_err(Error::Io)?;
1948    let grpc_listener = TcpListener::bind(config.grpc_addr)
1949        .await
1950        .map_err(Error::Io)?;
1951    tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1952    tokio::select! {
1953        result = serve(config, rest_listener, grpc_listener) => result,
1954        () = shutdown_signal() => {
1955            tracing::info!("shutdown signal received");
1956            Ok(())
1957        }
1958    }
1959}
1960
1961/// Serve REST and gRPC on the given (already-bound) listeners until a transport
1962/// error. Exposed so tests can bind ephemeral ports.
1963pub async fn serve(
1964    config: Config,
1965    rest_listener: TcpListener,
1966    grpc_listener: TcpListener,
1967) -> Result<(), Error> {
1968    // Coordinator mode (ADR-0066) is a wholly separate, data-plane-free service: it
1969    // owns the versioned shard map and serves the membership API on the REST
1970    // listener. It opens no database and runs no gRPC.
1971    if config.coordinator {
1972        drop(grpc_listener);
1973        return coordinator::serve_coordinator(config, rest_listener).await;
1974    }
1975    let mut db = open_database(&config)?;
1976    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1977    // Publish every committed op to replication followers (ADR-0030). The observer
1978    // runs inside the engine's write critical section; `broadcast::Sender::send` is
1979    // non-blocking, so it never stalls a write.
1980    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1981    {
1982        let tx = replication_tx.clone();
1983        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1984            let _ = tx.send(entry.clone());
1985        }));
1986    }
1987    // Build the opt-in embedding/rerank providers, resolving each `api_key_env`
1988    // from the environment now (ADR-0047) so a missing key fails fast at startup
1989    // rather than on the first request.
1990    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1991        .map_err(|e| Error::Config(e.to_string()))?;
1992
1993    // Enable lock-free MVCC reads if requested (ADR-0064): `config.mvcc_reads` or
1994    // `QUIVER_MVCC_READS` (the latter also defaults the engine flag at open).
1995    if config.mvcc_reads {
1996        db.set_mvcc_reads(true);
1997    }
1998    let mvcc = db.mvcc_reads();
1999
2000    // Opt-in cluster router (ADR-0065): build the shard fan-out when configured.
2001    let cluster = if config.cluster_shards.is_empty() {
2002        None
2003    } else {
2004        let c = cluster::Cluster::new(
2005            config.cluster_shards.clone(),
2006            config.cluster_replicas.clone(),
2007            config.cluster_shard_key.clone(),
2008        )?;
2009        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
2010        let c = Arc::new(c);
2011        // With a coordinator (ADR-0066), refresh the shard map from it so a
2012        // membership change propagates without a restart. The seed above is the
2013        // bootstrap map; the coordinator's map (≥ version 0) takes over on refresh.
2014        if let Some(coord) = config.coordinator_url.clone() {
2015            let router = c.clone();
2016            tokio::spawn(async move {
2017                loop {
2018                    if let Err(e) = router.refresh_from(&coord).await {
2019                        tracing::debug!(error = %e, "shard-map refresh failed; will retry");
2020                    }
2021                    tokio::time::sleep(cluster::MAP_REFRESH_INTERVAL).await;
2022                }
2023            });
2024        }
2025        Some(c)
2026    };
2027
2028    // The shared engine handle. Wrapped before `AppState` so the Raft applier can
2029    // hold the same handle (it drives `apply_replicated` on commit) without
2030    // referencing `AppState` back — see `raft::EngineApplier`.
2031    let db = Arc::new(RwLock::new(db));
2032
2033    // Opt-in per-shard Raft write HA (ADR-0067): when this node has a Raft id, join
2034    // its shard's group over the gRPC transport. The applier holds `db` directly,
2035    // so the state machine does not reference the Raft group it lives in.
2036    #[cfg(feature = "raft")]
2037    let raft = if let Some(node_id) = config.raft_node_id {
2038        let members = parse_raft_members(&config.raft_members)?;
2039        let applier = raft::EngineApplier::new(Arc::clone(&db));
2040        // The Raft log persists under the data dir, beside the engine (ADR-0067 4c).
2041        let log_dir = config.data_dir.join("raft");
2042        let shard = raft::start_member(node_id, members, applier, &log_dir)
2043            .await
2044            .map_err(|e| Error::Config(format!("raft startup: {e}")))?;
2045        tracing::info!(node_id, "per-shard raft write HA enabled");
2046        Some(Arc::new(shard))
2047    } else {
2048        None
2049    };
2050
2051    let state = AppState {
2052        db,
2053        keys: Arc::new(config.api_keys.clone()),
2054        audit,
2055        replication_tx,
2056        read_only: config.leader_url.is_some(),
2057        limits: config.limits,
2058        embed: Arc::new(embed),
2059        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
2060        metrics: Arc::new(metrics::Metrics::default()),
2061        rebuilding: Arc::new(Mutex::new(HashSet::new())),
2062        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
2063        mvcc,
2064        cluster,
2065        #[cfg(feature = "raft")]
2066        raft,
2067    };
2068
2069    // A follower continuously applies the leader's committed-op stream (ADR-0030).
2070    if let Some(leader_url) = config.leader_url.clone() {
2071        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
2072    }
2073
2074    let app = rest::router(state.clone());
2075    // Mount the per-shard Raft transport on the same gRPC server (ADR-0067): build
2076    // it before `state` is consumed by `grpc::service`.
2077    #[cfg(feature = "raft")]
2078    let raft_service = state
2079        .raft
2080        .as_ref()
2081        .map(|rs| raft::grpc::RaftRpc::service(rs.raft.clone()));
2082    let grpc = grpc::service(state);
2083
2084    let tls = load_tls(&config)?;
2085
2086    // REST: terminate TLS with axum-server when configured, else serve plaintext.
2087    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
2088        Some(material) => {
2089            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
2090            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
2091            let server =
2092                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
2093            Box::pin(async move {
2094                server
2095                    .serve(app.into_make_service())
2096                    .await
2097                    .map_err(Error::Io)
2098            })
2099        }
2100        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
2101    };
2102
2103    // gRPC: tonic terminates TLS itself (ring provider) when an identity is set.
2104    let mut grpc_builder = tonic::transport::Server::builder();
2105    if let Some(material) = &tls {
2106        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
2107        let mut tls_config = ServerTlsConfig::new().identity(identity);
2108        // Require client certificates chaining to the configured CA (mTLS).
2109        if let Some(ca_pem) = &material.client_ca_pem {
2110            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
2111        }
2112        grpc_builder = grpc_builder
2113            .tls_config(tls_config)
2114            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
2115    }
2116    let grpc_fut = async move {
2117        let routes = grpc_builder.add_service(grpc);
2118        // Add the Raft service when this node runs a Raft group; a no-op otherwise.
2119        #[cfg(feature = "raft")]
2120        let routes = routes.add_optional_service(raft_service);
2121        routes
2122            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
2123            .await
2124            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
2125    };
2126
2127    tokio::try_join!(rest_fut, grpc_fut)?;
2128    Ok(())
2129}
2130
2131async fn shutdown_signal() {
2132    let _ = tokio::signal::ctrl_c().await;
2133}
2134
// Parse `["<id>=<url>", …]` Raft member entries into an id→URL map (ADR-0067).
// This mirrors the `cluster_replicas` `<idx>=<url>` form; the bracketed
// `QUIVER_RAFT_MEMBERS` env array arrives here as these strings. Ids and URLs
// are trimmed.
//
// A node opting into Raft must name a non-empty member set. Each URL must be
// non-empty, and an id may not be mapped to two different URLs; an exact
// duplicate entry is harmless and accepted. Silently keeping the last of two
// conflicting entries would hide a misconfigured membership.
#[cfg(feature = "raft")]
fn parse_raft_members(
    entries: &[String],
) -> Result<std::collections::BTreeMap<u64, String>, Error> {
    use std::collections::btree_map::Entry;

    let mut members = std::collections::BTreeMap::new();
    for entry in entries {
        let (raw_id, url) = entry
            .split_once('=')
            .ok_or_else(|| Error::Config(format!("raft member '{entry}' must be '<id>=<url>'")))?;
        let id: u64 = raw_id
            .trim()
            .parse()
            .map_err(|_| Error::Config(format!("raft member id '{raw_id}' is not a number")))?;
        let url = url.trim();
        if url.is_empty() {
            return Err(Error::Config(format!(
                "raft member '{entry}' has an empty url"
            )));
        }
        match members.entry(id) {
            Entry::Vacant(slot) => {
                slot.insert(url.to_owned());
            }
            Entry::Occupied(slot) if slot.get() != url => {
                return Err(Error::Config(format!(
                    "raft member id {id} is mapped to both '{}' and '{url}'",
                    slot.get()
                )));
            }
            Entry::Occupied(_) => {}
        }
    }
    if members.is_empty() {
        return Err(Error::Config(
            "raft_node_id is set but raft_members is empty".to_owned(),
        ));
    }
    Ok(members)
}
2161
// Translate an openraft `client_write` failure into a server [`Error`].
//
// When this node is not the leader, openraft answers `ForwardToLeader`. That
// becomes [`Error::NotLeader`], carrying the leader's address (if known) so a
// router can redirect. Every other failure is reported as an internal error.
#[cfg(feature = "raft")]
fn map_client_write_err(
    err: openraft::error::RaftError<
        u64,
        openraft::error::ClientWriteError<u64, openraft::BasicNode>,
    >,
) -> Error {
    use openraft::error::{ClientWriteError, RaftError};
    if let RaftError::APIError(ClientWriteError::ForwardToLeader(forward)) = err {
        let leader = forward.leader_node.map(|node| node.addr);
        return Error::NotLeader { leader };
    }
    Error::Internal(format!("raft write failed: {err}"))
}
2180
2181// Open the engine, enabling encryption-at-rest when a key is configured. The
2182// configured key is the **master key** of an envelope key-ring (ADR-0010): it
2183// wraps a per-collection data-encryption key, so dropping a collection
2184// crypto-shreds it. With no key (only valid in `insecure` mode, enforced by
2185// `Config::validate`) the engine is opened in plaintext.
2186fn open_database(config: &Config) -> Result<Database, Error> {
2187    let master_key = config.master_key_hex()?;
2188    let keyring =
2189        quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
2190            .map_err(|e| Error::Config(e.to_string()))?;
2191    let db = match keyring {
2192        Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
2193        None => Database::open(&config.data_dir)?,
2194    };
2195    Ok(db)
2196}
2197
// TLS material shared by both transports: the raw PEM (for tonic's `Identity`
// and `client_ca_root`) and a parsed rustls server config (for axum-server's
// REST acceptor). `client_ca_pem` is set only when mutual TLS is configured.
// Constructed by `load_tls`.
struct TlsMaterial {
    // Server certificate chain, raw PEM bytes as read from `tls_cert`.
    cert_pem: Vec<u8>,
    // Server private key, raw PEM bytes as read from `tls_key`.
    key_pem: Vec<u8>,
    // Client-CA bundle, raw PEM bytes from `tls_client_ca`; `Some` only for mTLS.
    client_ca_pem: Option<Vec<u8>>,
    // rustls config built from the three PEM inputs above by `rustls_server_config`.
    rest_config: Arc<rustls::ServerConfig>,
}
2207
2208// Read the configured certificate, key, and optional client CA, returning `None`
2209// when TLS is not configured. `Config::validate` already enforces that the cert
2210// and key are set together, that a client CA requires them, and that a
2211// non-loopback bind requires TLS.
2212fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
2213    match (&config.tls_cert, &config.tls_key) {
2214        (Some(cert_path), Some(key_path)) => {
2215            let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
2216            let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
2217            let client_ca_pem = config
2218                .tls_client_ca
2219                .as_ref()
2220                .map(std::fs::read)
2221                .transpose()
2222                .map_err(Error::Io)?;
2223            let rest_config = Arc::new(rustls_server_config(
2224                &cert_pem,
2225                &key_pem,
2226                client_ca_pem.as_deref(),
2227            )?);
2228            Ok(Some(TlsMaterial {
2229                cert_pem,
2230                key_pem,
2231                client_ca_pem,
2232                rest_config,
2233            }))
2234        }
2235        (None, None) => Ok(None),
2236        _ => Err(Error::Config(
2237            "tls_cert and tls_key must be set together".to_owned(),
2238        )),
2239    }
2240}
2241
2242// Build a rustls server config from PEM bytes over the audited `ring` provider
2243// (no OpenSSL, no aws-lc-rs C toolchain). TLS 1.3 and 1.2 are offered. When a
2244// client CA is supplied, client certificates chaining to it are required (mTLS).
2245fn rustls_server_config(
2246    cert_pem: &[u8],
2247    key_pem: &[u8],
2248    client_ca_pem: Option<&[u8]>,
2249) -> Result<rustls::ServerConfig, Error> {
2250    use rustls_pki_types::pem::PemObject;
2251    use rustls_pki_types::{CertificateDer, PrivateKeyDer};
2252
2253    let certs = CertificateDer::pem_slice_iter(cert_pem)
2254        .collect::<std::result::Result<Vec<_>, _>>()
2255        .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
2256    if certs.is_empty() {
2257        return Err(Error::Config(
2258            "tls_cert contains no certificates".to_owned(),
2259        ));
2260    }
2261    let key = PrivateKeyDer::from_pem_slice(key_pem)
2262        .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
2263    let provider = Arc::new(rustls::crypto::ring::default_provider());
2264    let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
2265        .with_safe_default_protocol_versions()
2266        .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
2267    let builder = match client_ca_pem {
2268        Some(ca_pem) => {
2269            let mut roots = rustls::RootCertStore::empty();
2270            for cert in CertificateDer::pem_slice_iter(ca_pem) {
2271                let cert =
2272                    cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
2273                roots
2274                    .add(cert)
2275                    .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
2276            }
2277            let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
2278                Arc::new(roots),
2279                provider,
2280            )
2281            .build()
2282            .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
2283            builder.with_client_cert_verifier(verifier)
2284        }
2285        None => builder.with_no_client_auth(),
2286    };
2287    builder
2288        .with_single_cert(certs, key)
2289        .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
2290}
2291
2292/// Initialize structured logging from `RUST_LOG` (defaulting to `info`). Safe to
2293/// call once at startup; a second call is ignored.
2294pub fn init_tracing() {
2295    init_observability(&Config::default());
2296}
2297
/// Install the global tracing subscriber: an `RUST_LOG`-driven `fmt` layer plus,
/// when the `otlp` feature is built **and** an OTLP endpoint is configured
/// (ADR-0059), an OpenTelemetry traces export layer.
///
/// Intended to be called once at startup. If a global subscriber is already
/// installed, `try_init` fails and that error is ignored, so the existing
/// subscriber stays in place. A failure to build the OTLP exporter prints a
/// warning to stderr and falls back to `fmt`-only rather than taking the server
/// down.
// `config` is only read under the `otlp` feature; silence the lint otherwise.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;
    // Honour `RUST_LOG`; fall back to `info` when it is unset or cannot be parsed.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let registry = tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer());

    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                // Presumably retained so `otlp::shutdown` (via
                // `shutdown_observability`) can flush it later.
                // NOTE(review): the provider is stored *before* `try_init`; on a
                // repeated call the new tracer is never attached. Confirm
                // `store_provider` handles replacing an earlier provider.
                otlp::store_provider(provider);
                // `registry` is consumed here, hence the early return instead of
                // falling through to the `fmt`-only init below.
                let _ = registry
                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
                    .try_init();
                return;
            }
            // No subscriber is installed yet, so report on stderr rather than
            // through `tracing`.
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
        }
    }

    // Error ignored: `try_init` fails when a global subscriber is already set,
    // and that subscriber is left in place.
    let _ = registry.try_init();
}
2330
/// Flush and shut down the OpenTelemetry exporter, if one was installed.
///
/// Does nothing when the crate is built without the `otlp` feature or when no
/// endpoint was configured. Call it once as the server shuts down so that
/// batched spans are delivered rather than lost.
pub fn shutdown_observability() {
    // Gated at compile time, like every other `otlp::` use in this file.
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
2338
#[cfg(test)]
mod tests {
    use super::*;

    // A valid 64-hex-character (256-bit) test key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        let mut config = Config::default();
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
        config.insecure = false;
        config.api_keys = vec!["secret".into()];
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        // API key set but no encryption key, not insecure ⇒ rejected.
        assert!(config.validate().is_err());
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
        // A malformed key is rejected up front, not at first write.
        config.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(config.validate().is_err());
        // Insecure mode may run without encryption-at-rest.
        config.insecure = true;
        config.encryption_key = None;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("master.key");
        // A trailing newline (as editors and `echo` add) is trimmed off.
        std::fs::write(&path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(path.clone()),
            ..Config::default()
        };
        // The file alone satisfies encryption-at-rest and resolves to the key.
        assert!(config.validate().is_ok());
        assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));

        // Setting both the env key and a file is rejected as ambiguous.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A file holding malformed hex is rejected up front.
        config.encryption_key = None;
        std::fs::write(&path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        };
        config.rest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333);
        // Auth and encryption are satisfied, so the only failure is the bind rule.
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333),
            ..Config::default()
        };
        // TLS configured ⇒ a non-loopback bind is allowed without insecure.
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        // Cert without key ⇒ rejected.
        assert!(config.validate().is_err());
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }

    #[test]
    fn load_tls_is_none_when_tls_is_not_configured() {
        // No cert, key, or client CA ⇒ plaintext transports, no material loaded.
        assert!(matches!(load_tls(&Config::default()), Ok(None)));
    }

    #[test]
    fn load_tls_rejects_unpaired_cert_and_key() {
        // `load_tls` re-checks pairing even if `validate` was never called;
        // it must fail before touching the filesystem.
        let config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        assert!(matches!(load_tls(&config), Err(Error::Config(_))));
    }

    #[test]
    fn load_tls_reports_missing_files_as_io_errors() {
        let dir = tempfile::tempdir().unwrap();
        let config = Config {
            tls_cert: Some(dir.path().join("missing-cert.pem")),
            tls_key: Some(dir.path().join("missing-key.pem")),
            ..Config::default()
        };
        assert!(matches!(load_tls(&config), Err(Error::Io(_))));
    }

    #[test]
    fn rustls_server_config_rejects_pem_without_certificates() {
        // An empty PEM slice parses to zero certificates, which must be refused
        // with a config error rather than producing an unusable server config.
        let result = rustls_server_config(b"", b"", None);
        assert!(matches!(
            result,
            Err(Error::Config(msg)) if msg.contains("no certificates")
        ));
    }
}