Skip to main content

quiver_server/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The Quiver daemon: gRPC and REST over the embeddable [`Database`], with
3//! API-key auth and secure-by-default configuration.
4//!
5//! Both transports are thin shells over the same shared engine operations; the
6//! engine is synchronous and CPU/`fsync`-bound, so every call is offloaded with
7//! `spawn_blocking`. Access is guarded by a reader–writer lock (ADR-0057): reads
8//! take the shared lock and run **concurrently**, writes take the exclusive lock,
9//! and the single-writer model is unchanged (ADR-0002/0006). A read that finds a
10//! collection's index stale (a prior write deferred its rebuild) serves the prior
11//! snapshot and schedules an **off-lock** rebuild (ADR-0062): the index is rebuilt
12//! under the shared lock and swapped in under a brief write lock, so a rebuild never
13//! stalls concurrent readers.
14//!
15//! Auth is by scoped API key (Bearer / gRPC `authorization` metadata) with
16//! default-deny RBAC: each key carries a role (read ⊆ write ⊆ admin) and a
17//! collection scope, enforced on every operation at the shared op layer
18//! (ADR-0011/0013, the `auth` module). Encryption-at-rest is on by default
19//! (ADR-0010): unless `insecure` is set, an `encryption_key` is required and the
20//! engine is opened through `quiver-crypto`'s AEAD codec; payloads may also be
21//! client-side-encrypted (ADR-0012). TLS-in-transit uses `rustls` over the
22//! audited `ring` provider — REST via `axum-server`, gRPC via tonic's `tls-ring`
23//! — and a non-loopback bind requires it; setting a client CA additionally
24//! requires mutual TLS. Mutating and administrative operations, and every
25//! access-control denial, are recorded to an append-only audit log (ADR-0011,
26//! the `audit` module) when `audit_log` is set. Per-request cost limits bound
27//! the work any single authenticated request can demand (ADR-0040, the
28//! [`Limits`] type), and an opt-in per-key token-bucket rate limiter bounds the
29//! request *rate* (ADR-0049, the [`RateLimiter`] type); per-tenant engine
30//! partitioning is a later phase. Design: `docs/api/rest-grpc.md`.
31
32mod audit;
33mod auth;
34mod error;
35mod grpc;
36mod metrics;
37mod otlp;
38mod rate_limit;
39mod replication;
40mod rest;
41
42use std::collections::{HashMap, HashSet};
43use std::future::Future;
44use std::net::{IpAddr, Ipv4Addr, SocketAddr};
45use std::path::PathBuf;
46use std::pin::Pin;
47use std::sync::{Arc, Mutex, RwLock};
48
49use axum_server::tls_rustls::RustlsConfig;
50use figment::Figment;
51use figment::providers::{Env, Format, Serialized, Toml};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::net::TcpListener;
55use tokio::sync::broadcast;
56use tokio_stream::wrappers::TcpListenerStream;
57use tonic::transport::{Certificate, Identity, ServerTlsConfig};
58
59use quiver_crypto::AeadCodec;
60use quiver_embed::{
61    Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
62    SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
63};
64use quiver_query::Filter;
65
66pub use auth::{Action, ApiKey, CollectionScope};
67pub use error::Error;
68pub use otlp::OtlpConfig;
69// The embedding/rerank seam lives in its own lean crate (ADR-0058) so the MCP
70// server can share it; re-exported here so the server's public API is unchanged.
71pub use quiver_providers::{
72    EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
73    RerankProvider,
74};
75pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
76
77use audit::{AuditLog, Outcome};
78use auth::Principal;
79
80/// Per-request cost limits (ADR-0040). Each cap bounds the work a single
81/// authenticated request can demand, so one oversized request cannot exhaust the
82/// node under the single-writer model (ADR-0006). Over-limit requests are
83/// **rejected** with HTTP 400 / gRPC `InvalidArgument` rather than silently
84/// clamped — a truncated `k` or `ef_search` would return surprising, lower-quality
85/// results with no signal. Defaults are generous; raise a cap with a `[limits]`
86/// table in `quiver.toml` or the matching `QUIVER_MAX_*` environment variable.
87#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
88#[serde(default)]
89pub struct Limits {
90    /// Maximum `k` (top-k) for `search` / `search_multi_vector`.
91    pub max_k: usize,
92    /// Maximum search beam width (`ef_search`).
93    pub max_ef_search: usize,
94    /// Maximum `fetch` page size.
95    pub max_fetch_limit: usize,
96    /// Maximum vector dimension: the declared collection dimension at creation
97    /// and the length of any query vector (per token, for multi-vector).
98    pub max_vector_dim: usize,
99    /// Maximum serialized-JSON payload size per point, in bytes.
100    pub max_payload_bytes: usize,
101    /// Maximum number of points / documents in a single upsert request.
102    pub max_batch_size: usize,
103    /// Maximum HTTP request body size, in bytes (enforced by the REST layer via
104    /// axum's `DefaultBodyLimit`; gRPC is bounded by tonic's decode limit).
105    pub max_request_body_bytes: usize,
106    /// Maximum number of non-zero terms in a hybrid-search sparse query (ADR-0043),
107    /// bounding the posting-list scan.
108    pub max_sparse_terms: usize,
109    /// Maximum number of points in a single bulk upsert (`POST …/points:bulk`,
110    /// ADR-0045). Larger than `max_batch_size` because the bulk path defers index
111    /// maintenance to one rebuild; the request is still bounded by
112    /// `max_request_body_bytes`, so raise that too for very large bulk loads.
113    pub max_bulk_batch_size: usize,
114}
115
116impl Default for Limits {
117    fn default() -> Self {
118        Self {
119            max_k: 10_000,
120            max_ef_search: 4_096,
121            max_fetch_limit: 10_000,
122            max_vector_dim: 8_192,
123            max_payload_bytes: 65_536,
124            max_batch_size: 1_000,
125            max_request_body_bytes: 32 * 1024 * 1024,
126            max_sparse_terms: 4_096,
127            max_bulk_batch_size: 50_000,
128        }
129    }
130}
131
132impl Limits {
133    // Apply `QUIVER_MAX_*` overrides after figment extraction (ADR-0013 env
134    // layer). The flat env keys do not nest under figment's `limits` table, so
135    // they are read explicitly here; a malformed value is a hard config error.
136    fn apply_env_overrides(&mut self) -> Result<(), Error> {
137        let slots: [(&str, &mut usize); 9] = [
138            ("QUIVER_MAX_K", &mut self.max_k),
139            ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
140            ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
141            ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
142            ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
143            ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
144            (
145                "QUIVER_MAX_REQUEST_BODY_BYTES",
146                &mut self.max_request_body_bytes,
147            ),
148            ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
149            ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
150        ];
151        for (key, slot) in slots {
152            if let Ok(raw) = std::env::var(key) {
153                *slot = raw.parse().map_err(|_| {
154                    Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
155                })?;
156            }
157        }
158        Ok(())
159    }
160
161    // Reject a zero cap (a `0` limit would refuse every request).
162    fn validate(&self) -> Result<(), Error> {
163        let named = [
164            ("max_k", self.max_k),
165            ("max_ef_search", self.max_ef_search),
166            ("max_fetch_limit", self.max_fetch_limit),
167            ("max_vector_dim", self.max_vector_dim),
168            ("max_payload_bytes", self.max_payload_bytes),
169            ("max_batch_size", self.max_batch_size),
170            ("max_request_body_bytes", self.max_request_body_bytes),
171            ("max_sparse_terms", self.max_sparse_terms),
172            ("max_bulk_batch_size", self.max_bulk_batch_size),
173        ];
174        if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
175            return Err(Error::Config(format!(
176                "limits.{name} must be greater than zero"
177            )));
178        }
179        Ok(())
180    }
181
182    fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
183        if k > self.max_k {
184            return Err(Error::BadRequest(format!(
185                "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
186                self.max_k
187            )));
188        }
189        if ef_search > self.max_ef_search {
190            return Err(Error::BadRequest(format!(
191                "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
192                self.max_ef_search
193            )));
194        }
195        Ok(())
196    }
197
198    fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
199        if n > self.max_sparse_terms {
200            return Err(Error::BadRequest(format!(
201                "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
202                self.max_sparse_terms
203            )));
204        }
205        Ok(())
206    }
207
208    fn check_fetch(&self, limit: usize) -> Result<(), Error> {
209        if limit > self.max_fetch_limit {
210            return Err(Error::BadRequest(format!(
211                "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
212                self.max_fetch_limit
213            )));
214        }
215        Ok(())
216    }
217
218    fn check_dim(&self, dim: usize) -> Result<(), Error> {
219        if dim > self.max_vector_dim {
220            return Err(Error::BadRequest(format!(
221                "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
222                self.max_vector_dim
223            )));
224        }
225        Ok(())
226    }
227
228    fn check_vector_len(&self, len: usize) -> Result<(), Error> {
229        if len > self.max_vector_dim {
230            return Err(Error::BadRequest(format!(
231                "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
232                self.max_vector_dim
233            )));
234        }
235        Ok(())
236    }
237
238    fn check_batch(&self, n: usize) -> Result<(), Error> {
239        if n > self.max_batch_size {
240            return Err(Error::BadRequest(format!(
241                "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
242                self.max_batch_size
243            )));
244        }
245        Ok(())
246    }
247
248    fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
249        if n > self.max_bulk_batch_size {
250            return Err(Error::BadRequest(format!(
251                "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
252                self.max_bulk_batch_size
253            )));
254        }
255        Ok(())
256    }
257
258    fn check_payload(&self, payload: &Value) -> Result<(), Error> {
259        let size = serde_json::to_vec(payload)
260            .map(|v| v.len())
261            .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
262        if size > self.max_payload_bytes {
263            return Err(Error::BadRequest(format!(
264                "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
265                self.max_payload_bytes
266            )));
267        }
268        Ok(())
269    }
270}
271
272/// Server configuration, layered defaults → `quiver.toml` → `QUIVER_*` env and
273/// validated at startup (ADR-0013).
274#[derive(Debug, Clone, Serialize, Deserialize)]
275#[serde(default)]
276pub struct Config {
277    /// Data directory for the storage engine.
278    pub data_dir: PathBuf,
279    /// REST (HTTP/1.1) bind address.
280    pub rest_addr: SocketAddr,
281    /// gRPC (HTTP/2) bind address.
282    pub grpc_addr: SocketAddr,
283    /// Accepted API keys with their RBAC scopes (ADR-0011). A bare secret —
284    /// from a comma-separated `QUIVER_API_KEYS` (the env form) or a plain TOML
285    /// array entry — is an all-collections admin key; a structured
286    /// `{secret, role, collections}` entry pins a narrower scope. Empty is
287    /// allowed only with `insecure = true`.
288    #[serde(default, deserialize_with = "auth::de_api_keys")]
289    pub api_keys: Vec<ApiKey>,
290    /// Hex-encoded 256-bit **master key** for encryption-at-rest (64 hex
291    /// characters). It wraps per-collection data-encryption keys (ADR-0010).
292    /// Required unless `insecure = true` or [`master_key_file`] is set; source it
293    /// from the environment or a secret store, never the committed config. `None`
294    /// ⇒ data is stored unencrypted (only valid in `insecure` mode).
295    ///
296    /// [`master_key_file`]: Config::master_key_file
297    pub encryption_key: Option<String>,
298    /// Path to a file holding the hex master key, as an alternative to
299    /// [`encryption_key`] (set exactly one). Lets the key arrive as a mounted
300    /// secret (Docker/Kubernetes) or a KMS-decrypted file rather than an
301    /// environment variable. It should be mode `0600`; a group/world-accessible
302    /// file is warned about at startup.
303    ///
304    /// [`encryption_key`]: Config::encryption_key
305    pub master_key_file: Option<PathBuf>,
306    /// Path to the PEM-encoded TLS certificate chain. Must be set together with
307    /// `tls_key`. Required for a non-loopback bind unless `insecure = true`.
308    pub tls_cert: Option<PathBuf>,
309    /// Path to the PEM-encoded TLS private key. Must be set together with
310    /// `tls_cert`.
311    pub tls_key: Option<PathBuf>,
312    /// Path to a PEM-encoded CA certificate that signs accepted client
313    /// certificates. When set, both transports require **mutual TLS**: a client
314    /// must present a certificate chaining to this CA to connect (ADR-0011).
315    /// Requires `tls_cert`/`tls_key`; bearer API keys still carry the RBAC scope.
316    pub tls_client_ca: Option<PathBuf>,
317    /// Path to an append-only audit log file (ADR-0011). When set, every
318    /// mutating and administrative operation and every access-control denial is
319    /// appended as one JSON object per line (JSON Lines); records are always
320    /// also emitted as `tracing` events. Unset ⇒ tracing only. Secrets are never
321    /// written — see `docs/security/audit.md`.
322    pub audit_log: Option<PathBuf>,
323    /// Run as a **read-replica follower** (ADR-0030): connect to a leader's gRPC
324    /// endpoint at this URL (e.g. `http://leader:6334`) and continuously apply its
325    /// committed operations, serving reads. A follower **refuses writes**. Unset ⇒
326    /// this node is a normal read-write leader. (Plaintext `http://` for now; TLS
327    /// to the leader is a follow-up — run replication over a trusted network.)
328    pub leader_url: Option<String>,
329    /// API key the follower presents to the leader's admin-scoped `Replicate`
330    /// stream (used with `leader_url`). Source it like any secret.
331    pub leader_api_key: Option<String>,
332    /// Opt out of the secure defaults (no auth, no encryption-at-rest, allow a
333    /// non-loopback bind without TLS). For local development only; never the
334    /// default.
335    pub insecure: bool,
336    /// Per-request cost limits (ADR-0040). Set with a `[limits]` table in
337    /// `quiver.toml` or the `QUIVER_MAX_*` environment variables.
338    pub limits: Limits,
339    /// Opt-in server-side embedding providers, keyed by collection name (ADR-0047).
340    /// Configured with `[embedding.<collection>]` tables in `quiver.toml`; default
341    /// empty, so the engine stays model-agnostic and library mode is unaffected.
342    /// API keys are referenced by env-var *name* and resolved at startup, never
343    /// stored. Enables `search_text` / `upsert_text` for the named collections.
344    #[serde(default)]
345    pub embedding: HashMap<String, EmbeddingConfig>,
346    /// Opt-in server-side rerank providers, keyed by collection name (ADR-0047).
347    /// Configured with `[rerank.<collection>]` tables; enables the one-call
348    /// retrieve→rerank stage of `search_text`.
349    #[serde(default)]
350    pub rerank: HashMap<String, RerankConfig>,
351    /// Opt-in per-key rate limiting (ADR-0049). Set a `[rate_limit]` table in
352    /// `quiver.toml` or the `QUIVER_RATE_LIMIT_*` env vars;
353    /// `requests_per_second = 0` (the default) disables it.
354    #[serde(default)]
355    pub rate_limit: RateLimitConfig,
356    /// Opt-in OpenTelemetry traces export (ADR-0059). Set an `[otlp]` table or the
357    /// `QUIVER_OTLP_*` env vars; an empty `endpoint` (the default) disables it.
358    /// Export additionally requires the server's `otlp` build feature.
359    #[serde(default)]
360    pub otlp: OtlpConfig,
361}
362
363impl Default for Config {
364    fn default() -> Self {
365        Self {
366            data_dir: PathBuf::from("./quiver-data"),
367            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
368            grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
369            api_keys: Vec::new(),
370            encryption_key: None,
371            master_key_file: None,
372            tls_cert: None,
373            tls_key: None,
374            tls_client_ca: None,
375            audit_log: None,
376            leader_url: None,
377            leader_api_key: None,
378            insecure: false,
379            limits: Limits::default(),
380            embedding: HashMap::new(),
381            rerank: HashMap::new(),
382            rate_limit: RateLimitConfig::default(),
383            otlp: OtlpConfig::default(),
384        }
385    }
386}
387
388impl Config {
389    /// Load configuration from defaults, an optional `quiver.toml`, and
390    /// `QUIVER_*` environment variables.
391    pub fn load() -> Result<Self, Error> {
392        let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
393            .merge(Toml::file("quiver.toml"))
394            .merge(Env::prefixed("QUIVER_"))
395            .extract()
396            .map_err(|e| Error::Config(e.to_string()))?;
397        // The flat `QUIVER_MAX_*` env keys do not nest under the `limits` table
398        // figment builds, so apply them explicitly (ADR-0040).
399        config.limits.apply_env_overrides()?;
400        // Same for the flat `QUIVER_RATE_LIMIT_*` keys (ADR-0049).
401        config
402            .rate_limit
403            .apply_env_overrides()
404            .map_err(Error::Config)?;
405        // Same for the flat `QUIVER_OTLP_*` keys (ADR-0059).
406        config.otlp.apply_env_overrides().map_err(Error::Config)?;
407        Ok(config)
408    }
409
410    /// Reject insecure configurations unless explicitly opted out (ADR-0013):
411    /// no anonymous access, encryption-at-rest on by default with a valid key,
412    /// and no non-loopback bind without TLS.
413    pub fn validate(&self) -> Result<(), Error> {
414        if self.api_keys.is_empty() && !self.insecure {
415            return Err(Error::Config(
416                "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
417                 set insecure=true for local development"
418                    .to_owned(),
419            ));
420        }
421        // Resolve the master key from the env var or a key file (exactly one).
422        let master_key = self.master_key_hex()?;
423        if master_key.is_none() && !self.insecure {
424            return Err(Error::Config(
425                "no encryption key configured: encryption-at-rest is on by default — \
426                 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
427                 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
428                 store data unencrypted (development only)"
429                    .to_owned(),
430            ));
431        }
432        // Fail fast on a malformed key rather than at first write.
433        if let Some(key) = &master_key {
434            AeadCodec::from_hex(key)
435                .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
436        }
437        // TLS certificate and key are set together or not at all.
438        if self.tls_cert.is_some() != self.tls_key.is_some() {
439            return Err(Error::Config(
440                "tls_cert and tls_key must be set together".to_owned(),
441            ));
442        }
443        // mTLS layers on top of server TLS: a client CA needs a server cert/key.
444        if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
445            return Err(Error::Config(
446                "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
447            ));
448        }
449        let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
450        let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
451        if non_loopback && !tls_enabled && !self.insecure {
452            return Err(Error::Config(
453                "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
454                 or insecure=true for local development"
455                    .to_owned(),
456            ));
457        }
458        // Reject a nonsensical cost limit (a `0` cap would refuse every request).
459        self.limits.validate()?;
460        Ok(())
461    }
462
463    /// The effective hex master key: from [`master_key_file`] when set (read and
464    /// trimmed), otherwise [`encryption_key`]. `None` means no key is configured
465    /// (only valid with `insecure`).
466    ///
467    /// # Errors
468    /// [`Error::Config`] if both sources are set, or the key file cannot be read.
469    ///
470    /// [`master_key_file`]: Config::master_key_file
471    /// [`encryption_key`]: Config::encryption_key
472    pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
473        let env_key = self
474            .encryption_key
475            .as_deref()
476            .map(str::trim)
477            .filter(|k| !k.is_empty());
478        match (&self.master_key_file, env_key) {
479            (Some(_), Some(_)) => Err(Error::Config(
480                "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
481                 (QUIVER_MASTER_KEY_FILE), not both"
482                    .to_owned(),
483            )),
484            (Some(path), None) => {
485                warn_if_world_readable(path);
486                let hex = std::fs::read_to_string(path).map_err(|e| {
487                    Error::Config(format!("reading master_key_file {}: {e}", path.display()))
488                })?;
489                Ok(Some(hex.trim().to_owned()))
490            }
491            (None, Some(key)) => Ok(Some(key.to_owned())),
492            (None, None) => Ok(None),
493        }
494    }
495}
496
497// Warn (don't fail — Docker/Kubernetes secrets often mount group/world-readable)
498// when a master-key file is more permissive than `0600`.
499#[cfg(unix)]
500fn warn_if_world_readable(path: &std::path::Path) {
501    use std::os::unix::fs::PermissionsExt;
502    if let Ok(meta) = std::fs::metadata(path)
503        && meta.permissions().mode() & 0o077 != 0
504    {
505        tracing::warn!(
506            path = %path.display(),
507            mode = format!("{:o}", meta.permissions().mode() & 0o777),
508            "master key file is group/world-accessible; restrict it to 0600"
509        );
510    }
511}
512
513#[cfg(not(unix))]
514fn warn_if_world_readable(_path: &std::path::Path) {}
515
516/// Shared server state: the engine behind a single-writer lock, the accepted
517/// API keys with their RBAC scopes, and the audit log.
518#[derive(Clone)]
519pub(crate) struct AppState {
520    // The engine behind a reader–writer lock (ADR-0057): reads take the shared
521    // lock and run concurrently; writes take the exclusive lock. A read that finds
522    // a collection's index stale upgrades to the write lock once to rebuild it,
523    // then serves concurrently again.
524    db: Arc<RwLock<Database>>,
525    keys: Arc<Vec<ApiKey>>,
526    audit: Arc<AuditLog>,
527    // Fan-out of every committed op to replication followers (ADR-0030). The
528    // commit observer publishes here; each `Replicate` stream subscribes.
529    replication_tx: broadcast::Sender<WalEntry>,
530    // True on a replication follower: external writes are refused; the engine's
531    // state is owned by the stream it applies from the leader (ADR-0030).
532    read_only: bool,
533    // Per-request cost limits, enforced at this op layer so both transports are
534    // covered by one implementation (ADR-0040).
535    limits: Limits,
536    // Opt-in, provider-agnostic server-side embedding/rerank providers, keyed by
537    // collection (ADR-0047). Empty on the common path; `search_text`/`upsert_text`
538    // require a configured embedder for the target collection.
539    embed: Arc<EmbedRegistry>,
540    // Opt-in per-key token-bucket rate limiter (ADR-0049). A no-op when disabled.
541    rate_limiter: Arc<RateLimiter>,
542    // Prometheus metrics registry (ADR-0014/0054), scraped at `GET /metrics`.
543    metrics: Arc<metrics::Metrics>,
544    // Collections with an off-lock index rebuild in flight (ADR-0062). A search that
545    // observes a deferred rebuild schedules one here, single-flighted so concurrent
546    // readers never kick off duplicate builds for the same collection.
547    rebuilding: Arc<Mutex<HashSet<String>>>,
548}
549
550/// A collection's metadata.
551pub(crate) struct CollectionInfo {
552    pub name: String,
553    pub dim: u32,
554    pub metric: DistanceMetric,
555    pub count: u64,
556    pub index: IndexSpec,
557    pub filterable: Vec<FilterableField>,
558    pub multivector: bool,
559    pub vector_encryption: VectorEncryption,
560}
561
562/// A point to upsert.
563pub(crate) struct PointIn {
564    pub id: String,
565    pub vector: Vec<f32>,
566    pub payload: Value,
567}
568
569/// A text point to embed server-side and upsert (ADR-0047).
570pub(crate) struct TextPointIn {
571    pub id: String,
572    pub text: String,
573    pub payload: Value,
574}
575
/// How many candidates a rerank stage over-fetches by default before it
/// reorders them down to the requested `k` (ADR-0047).
const RERANK_CANDIDATES: usize = 50;
579
580/// The document text a rerank stage scores: the original text stored under
581/// [`TEXT_KEY`] by `upsert_text`, else the whole payload as a string so the
582/// reranker still has something to compare.
583fn doc_text(payload: Option<&Value>) -> String {
584    match payload {
585        Some(Value::Object(map)) => map
586            .get(TEXT_KEY)
587            .and_then(Value::as_str)
588            .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
589        Some(v) => v.to_string(),
590        None => String::new(),
591    }
592}
593
594/// A fetched point.
595pub(crate) struct PointOut {
596    pub id: String,
597    pub vector: Option<Vec<f32>>,
598    pub payload: Value,
599}
600
601/// A search hit.
602pub(crate) struct MatchOut {
603    pub id: String,
604    pub score: f32,
605    pub payload: Option<Value>,
606    pub vector: Option<Vec<f32>>,
607}
608
609/// A multi-vector document to upsert.
610pub(crate) struct DocumentIn {
611    pub id: String,
612    pub vectors: Vec<Vec<f32>>,
613    pub payload: Value,
614}
615
616/// A multi-vector document hit (MaxSim).
617pub(crate) struct DocumentMatchOut {
618    pub id: String,
619    pub score: f32,
620    pub payload: Option<Value>,
621    pub vectors: Option<Vec<Vec<f32>>>,
622}
623
624impl AppState {
625    /// Authenticate a presented bearer token to its [`Principal`], or `None`
626    /// (a 401). An empty key set means `insecure` mode (validated at startup),
627    /// which admits any caller as an all-collections admin.
628    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
629        auth::authenticate(&self.keys, presented)
630    }
631
632    /// Consume one rate-limit token for `actor` (ADR-0049). A no-op `Allowed` when
633    /// rate limiting is disabled. Both transports call this at their auth choke
634    /// point so the limiter is enforced by one implementation.
635    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
636        self.rate_limiter.check(actor)
637    }
638
639    /// Whether the per-key rate limiter is active (lets a transport skip the work
640    /// entirely on the common, disabled path).
641    pub(crate) fn rate_limit_enabled(&self) -> bool {
642        self.rate_limiter.enabled()
643    }
644
645    // Run a **mutating** engine op behind the exclusive write lock, off the async
646    // runtime (the engine is synchronous and CPU/IO-bound). The single writer is
647    // unchanged from the prior single-mutex model (ADR-0006/0057).
648    async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
649    where
650        T: Send + 'static,
651        F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
652    {
653        let db = Arc::clone(&self.db);
654        tokio::task::spawn_blocking(move || -> Result<T, Error> {
655            let mut guard = db
656                .write()
657                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
658            f(&mut guard).map_err(Error::Engine)
659        })
660        .await
661        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
662    }
663
664    // Run a **read-only** engine op behind the shared read lock — many of these run
665    // concurrently (ADR-0057). The closure gets `&Database`, so it can only call the
666    // `&self` reads (`*_snapshot`, `fetch`, accessors).
667    async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
668    where
669        T: Send + 'static,
670        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
671    {
672        let db = Arc::clone(&self.db);
673        tokio::task::spawn_blocking(move || -> Result<T, Error> {
674            let guard = db
675                .read()
676                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
677            f(&guard).map_err(Error::Engine)
678        })
679        .await
680        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
681    }
682
683    // Run a snapshot read for `collection` (ADR-0057/0062). Takes only the shared
684    // read lock, so concurrent searches run in parallel. When a prior write deferred
685    // this collection's rebuild, the snapshot read serves the **prior** snapshot
686    // (snapshot-isolated, slightly stale) rather than blocking, and we schedule an
687    // **off-lock** rebuild so the next read is fresh — the reader never waits on a
688    // rebuild under the exclusive lock.
689    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
690    where
691        T: Send + 'static,
692        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
693    {
694        let db = Arc::clone(&self.db);
695        let coll = collection.clone();
696        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
697            let guard = db
698                .read()
699                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
700            let result = f(&guard).map_err(Error::Engine)?;
701            // Report staleness so the caller can schedule a background rebuild; a
702            // missing collection (raced a drop) is simply "nothing to rebuild".
703            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
704            Ok((result, stale))
705        })
706        .await
707        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
708        if stale {
709            self.schedule_rebuild(collection);
710        }
711        Ok(result)
712    }
713
714    // Schedule a single off-lock rebuild for `collection` (ADR-0062), deduplicated
715    // so concurrent readers that all observe the same deferred rebuild start exactly
716    // one. A no-op if one is already in flight.
717    fn schedule_rebuild(&self, collection: String) {
718        {
719            let mut inflight = match self.rebuilding.lock() {
720                Ok(g) => g,
721                Err(_) => return,
722            };
723            if !inflight.insert(collection.clone()) {
724                return; // already rebuilding this collection
725            }
726        }
727        let state = self.clone();
728        tokio::spawn(async move {
729            state.run_rebuild(&collection).await;
730            if let Ok(mut inflight) = state.rebuilding.lock() {
731                inflight.remove(&collection);
732            }
733        });
734    }
735
736    // Drive a collection's off-lock rebuild to completion (ADR-0062): capture the
737    // inputs under the shared read lock, build with no lock held, install under a
738    // brief write lock, and repeat while a write landed during the build (the commit
739    // reports the collection still stale). Each phase is a `spawn_blocking` so the
740    // CPU-bound build never stalls the async runtime; errors end the attempt (the
741    // collection stays stale and the next read reschedules).
742    async fn run_rebuild(&self, collection: &str) {
743        loop {
744            let db = Arc::clone(&self.db);
745            let coll = collection.to_owned();
746            let inputs = tokio::task::spawn_blocking(move || {
747                let guard = db.read().ok()?;
748                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
749            })
750            .await
751            .ok()
752            .flatten();
753            let Some(inputs) = inputs else { return };
754
755            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
756                return;
757            };
758
759            let db = Arc::clone(&self.db);
760            let still_stale = tokio::task::spawn_blocking(move || {
761                let mut guard = db.write().ok()?;
762                guard.commit_rebuild(rebuilt).ok()
763            })
764            .await
765            .ok()
766            .flatten();
767            match still_stale {
768                Some(true) => continue, // a write landed during the build — rebuild again
769                _ => return,
770            }
771        }
772    }
773
774    // Authorize `action` on `resource`, recording a denial in the audit log
775    // before propagating it. The shared choke point for both transports.
776    fn authorize(
777        &self,
778        principal: &Principal,
779        action: Action,
780        op: &str,
781        resource: &str,
782    ) -> Result<(), Error> {
783        principal
784            .require(action, Some(resource))
785            .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
786    }
787
788    // Authorize a collection-agnostic operation (listing): only the role is
789    // checked; a denial is recorded against the `*` resource.
790    fn authorize_global(
791        &self,
792        principal: &Principal,
793        action: Action,
794        op: &str,
795    ) -> Result<(), Error> {
796        principal
797            .require(action, None)
798            .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
799    }
800
801    /// Open a replication stream (ADR-0030): authorize (admin), then — in a single
802    /// engine critical section so no commit can interleave — subscribe to the live
803    /// commit tail and snapshot current state. The caller streams the snapshot,
804    /// then the tail from the receiver; because the subscription is taken under the
805    /// same lock as the snapshot, every post-snapshot op arrives on the receiver
806    /// and none is missed or duplicated.
807    pub(crate) async fn open_replication(
808        &self,
809        principal: &Principal,
810    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
811        self.authorize_global(principal, Action::Admin, "replicate")?;
812        let tx = self.replication_tx.clone();
813        self.read_blocking(move |db| {
814            let rx = tx.subscribe();
815            let snapshot = db.replication_snapshot()?;
816            Ok((snapshot, rx))
817        })
818        .await
819    }
820
821    /// Apply a replicated op received from the leader (ADR-0030). Internal to the
822    /// follower stream — deliberately NOT gated by `read_only`, which only refuses
823    /// *external* client writes.
824    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
825        self.write_blocking(move |db| db.apply_replicated(op)).await
826    }
827
828    // Refuse a mutating operation on a read-only replication follower (ADR-0030);
829    // its state is owned by the leader's stream, not by external clients.
830    fn ensure_writable(&self, op: &str) -> Result<(), Error> {
831        if self.read_only {
832            return Err(Error::Forbidden(format!(
833                "{op}: this node is a read-only replication follower"
834            )));
835        }
836        Ok(())
837    }
838
839    #[allow(clippy::too_many_arguments)]
840    pub(crate) async fn create_collection(
841        &self,
842        principal: &Principal,
843        name: String,
844        dim: u32,
845        metric: DistanceMetric,
846        index: IndexSpec,
847        filterable: Vec<FilterableField>,
848        multivector: bool,
849        vector_encryption: VectorEncryption,
850    ) -> Result<CollectionInfo, Error> {
851        self.ensure_writable("create_collection")?;
852        self.authorize(principal, Action::Admin, "create_collection", &name)?;
853        self.limits.check_dim(dim as usize)?;
854        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
855            .with_index(index)
856            .with_filterable(filterable.clone())
857            .with_multivector(multivector)
858            .with_vector_encryption(vector_encryption);
859        let owned = name.clone();
860        let result = self
861            .write_blocking(move |db| db.create_collection(&owned, descriptor))
862            .await;
863        self.audit.record(
864            principal.actor(),
865            "create_collection",
866            &name,
867            Outcome::of(&result),
868        );
869        result?;
870        Ok(CollectionInfo {
871            name,
872            dim,
873            metric,
874            count: 0,
875            index,
876            filterable,
877            multivector,
878            vector_encryption,
879        })
880    }
881
882    pub(crate) async fn get_collection(
883        &self,
884        principal: &Principal,
885        name: String,
886    ) -> Result<CollectionInfo, Error> {
887        self.authorize(principal, Action::Read, "get_collection", &name)?;
888        self.read_blocking(move |db| {
889            let descriptor = db
890                .descriptor(&name)
891                .cloned()
892                .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
893            // A multi-vector collection reports its document count, not its
894            // (much larger) token-row count.
895            let count = if descriptor.multivector {
896                db.document_count(&name)? as u64
897            } else {
898                db.len(&name)? as u64
899            };
900            Ok(CollectionInfo {
901                name,
902                dim: descriptor.dim,
903                metric: descriptor.metric,
904                count,
905                index: descriptor.index,
906                filterable: descriptor.filterable,
907                multivector: descriptor.multivector,
908                vector_encryption: descriptor.vector_encryption,
909            })
910        })
911        .await
912    }
913
914    pub(crate) async fn list_collections(
915        &self,
916        principal: &Principal,
917    ) -> Result<Vec<CollectionInfo>, Error> {
918        self.authorize_global(principal, Action::Read, "list_collections")?;
919        let mut infos = self
920            .read_blocking(|db| {
921                let mut out = Vec::new();
922                for name in db.collection_names() {
923                    if let Some(descriptor) = db.descriptor(&name).cloned() {
924                        let count = if descriptor.multivector {
925                            db.document_count(&name)? as u64
926                        } else {
927                            db.len(&name)? as u64
928                        };
929                        out.push(CollectionInfo {
930                            name,
931                            dim: descriptor.dim,
932                            metric: descriptor.metric,
933                            count,
934                            index: descriptor.index,
935                            filterable: descriptor.filterable,
936                            multivector: descriptor.multivector,
937                            vector_encryption: descriptor.vector_encryption,
938                        });
939                    }
940                }
941                Ok(out)
942            })
943            .await?;
944        // Never reveal collections outside the caller's scope.
945        infos.retain(|info| principal.can_see(&info.name));
946        Ok(infos)
947    }
948
949    pub(crate) async fn delete_collection(
950        &self,
951        principal: &Principal,
952        name: String,
953    ) -> Result<bool, Error> {
954        self.ensure_writable("delete_collection")?;
955        self.authorize(principal, Action::Admin, "delete_collection", &name)?;
956        let resource = name.clone();
957        let result = self
958            .write_blocking(move |db| db.drop_collection(&name))
959            .await;
960        self.audit.record(
961            principal.actor(),
962            "delete_collection",
963            &resource,
964            Outcome::of(&result),
965        );
966        result
967    }
968
969    #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
970    pub(crate) async fn upsert(
971        &self,
972        principal: &Principal,
973        collection: String,
974        points: Vec<PointIn>,
975    ) -> Result<u64, Error> {
976        self.ensure_writable("upsert")?;
977        self.authorize(principal, Action::Write, "upsert", &collection)?;
978        self.limits.check_batch(points.len())?;
979        for p in &points {
980            self.limits.check_vector_len(p.vector.len())?;
981            self.limits.check_payload(&p.payload)?;
982        }
983        let resource = collection.clone();
984        let result = self
985            .write_blocking(move |db| {
986                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
987                    .iter()
988                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
989                    .collect();
990                db.upsert_batch(&collection, &records)
991            })
992            .await;
993        self.audit
994            .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
995        result
996    }
997
998    // Bulk upsert for a load-then-query workload (ADR-0045): one WAL fsync plus a
999    // single deferred index-build pass, with the larger `max_bulk_batch_size` cap.
1000    pub(crate) async fn upsert_bulk(
1001        &self,
1002        principal: &Principal,
1003        collection: String,
1004        points: Vec<PointIn>,
1005    ) -> Result<u64, Error> {
1006        self.ensure_writable("upsert")?;
1007        self.authorize(principal, Action::Write, "upsert", &collection)?;
1008        self.limits.check_bulk_batch(points.len())?;
1009        for p in &points {
1010            self.limits.check_vector_len(p.vector.len())?;
1011            self.limits.check_payload(&p.payload)?;
1012        }
1013        let resource = collection.clone();
1014        let result = self
1015            .write_blocking(move |db| {
1016                let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1017                    .iter()
1018                    .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1019                    .collect();
1020                db.upsert_bulk(&collection, &records)
1021            })
1022            .await;
1023        self.audit.record(
1024            principal.actor(),
1025            "upsert_bulk",
1026            &resource,
1027            Outcome::of(&result),
1028        );
1029        result
1030    }
1031
1032    /// Take a consistent online snapshot of the whole database into a
1033    /// server-local `destination` directory (ADR-0050). A global admin
1034    /// operation; runs the checkpoint + copy on the blocking pool.
1035    #[tracing::instrument(skip_all)]
1036    pub(crate) async fn snapshot(
1037        &self,
1038        principal: &Principal,
1039        destination: String,
1040    ) -> Result<SnapshotInfo, Error> {
1041        self.ensure_writable("snapshot")?;
1042        self.authorize_global(principal, Action::Admin, "snapshot")?;
1043        let dest = std::path::PathBuf::from(&destination);
1044        let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1045        self.audit.record(
1046            principal.actor(),
1047            "snapshot",
1048            &destination,
1049            Outcome::of(&result),
1050        );
1051        result
1052    }
1053
1054    pub(crate) async fn delete_points(
1055        &self,
1056        principal: &Principal,
1057        collection: String,
1058        ids: Vec<String>,
1059    ) -> Result<u64, Error> {
1060        self.ensure_writable("delete_points")?;
1061        self.authorize(principal, Action::Write, "delete_points", &collection)?;
1062        let resource = collection.clone();
1063        let result = self
1064            .write_blocking(move |db| {
1065                let mut count = 0u64;
1066                for id in &ids {
1067                    if db.delete(&collection, id)? {
1068                        count += 1;
1069                    }
1070                }
1071                Ok(count)
1072            })
1073            .await;
1074        self.audit.record(
1075            principal.actor(),
1076            "delete_points",
1077            &resource,
1078            Outcome::of(&result),
1079        );
1080        result
1081    }
1082
1083    pub(crate) async fn get_points(
1084        &self,
1085        principal: &Principal,
1086        collection: String,
1087        ids: Vec<String>,
1088        with_vector: bool,
1089    ) -> Result<Vec<PointOut>, Error> {
1090        self.authorize(principal, Action::Read, "get_points", &collection)?;
1091        self.read_blocking(move |db| {
1092            let mut out = Vec::new();
1093            for id in &ids {
1094                if let Some(m) = db.get(&collection, id)? {
1095                    out.push(PointOut {
1096                        id: m.id,
1097                        vector: if with_vector { m.vector } else { None },
1098                        payload: m.payload.unwrap_or(Value::Null),
1099                    });
1100                }
1101            }
1102            Ok(out)
1103        })
1104        .await
1105    }
1106
1107    #[allow(clippy::too_many_arguments)]
1108    #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1109    pub(crate) async fn search(
1110        &self,
1111        principal: &Principal,
1112        collection: String,
1113        vector: Vec<f32>,
1114        k: usize,
1115        filter: Option<Filter>,
1116        ef_search: usize,
1117        with_payload: bool,
1118        with_vector: bool,
1119    ) -> Result<Vec<MatchOut>, Error> {
1120        self.authorize(principal, Action::Read, "search", &collection)?;
1121        self.limits.check_search(k, ef_search)?;
1122        self.limits.check_vector_len(vector.len())?;
1123        let params = SearchParams {
1124            k,
1125            filter,
1126            ef_search,
1127            with_payload,
1128            with_vector,
1129        };
1130        let coll = collection.clone();
1131        self.search_blocking(coll, move |db| {
1132            let matches = db.search_snapshot(&collection, &vector, &params)?;
1133            Ok(matches
1134                .into_iter()
1135                .map(|m| MatchOut {
1136                    id: m.id,
1137                    score: m.score,
1138                    payload: m.payload,
1139                    vector: m.vector,
1140                })
1141                .collect())
1142        })
1143        .await
1144    }
1145
1146    #[allow(clippy::too_many_arguments)]
1147    pub(crate) async fn hybrid_search(
1148        &self,
1149        principal: &Principal,
1150        collection: String,
1151        dense: Option<Vec<f32>>,
1152        sparse: Option<(Vec<u32>, Vec<f32>)>,
1153        text: Option<String>,
1154        k: usize,
1155        filter: Option<Filter>,
1156        ef_search: usize,
1157        rrf_k0: f32,
1158        with_payload: bool,
1159        with_vector: bool,
1160    ) -> Result<Vec<MatchOut>, Error> {
1161        self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1162        self.limits.check_search(k, ef_search)?;
1163        if let Some(v) = &dense {
1164            self.limits.check_vector_len(v.len())?;
1165        }
1166        if let Some((indices, values)) = &sparse {
1167            self.limits.check_sparse_terms(indices.len())?;
1168            if indices.len() != values.len() {
1169                return Err(Error::BadRequest(format!(
1170                    "sparse query indices ({}) and values ({}) length mismatch",
1171                    indices.len(),
1172                    values.len()
1173                )));
1174            }
1175        }
1176        let params = SearchParams {
1177            k,
1178            filter,
1179            ef_search,
1180            with_payload,
1181            with_vector,
1182        };
1183        let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1184        let coll = collection.clone();
1185        self.search_blocking(coll, move |db| {
1186            let matches = db.hybrid_search_snapshot(
1187                &collection,
1188                dense.as_deref(),
1189                sv.as_ref(),
1190                text.as_deref(),
1191                &params,
1192                rrf_k0,
1193            )?;
1194            Ok(matches
1195                .into_iter()
1196                .map(|m| MatchOut {
1197                    id: m.id,
1198                    score: m.score,
1199                    payload: m.payload,
1200                    vector: m.vector,
1201                })
1202                .collect())
1203        })
1204        .await
1205    }
1206
1207    /// Embed `text` with the collection's configured provider and run a dense (or
1208    /// dense ⊕ BM25, if the collection has text) search, optionally reranking the
1209    /// candidates in one call (ADR-0047). The text is also passed to the BM25 side,
1210    /// so a `upsert_text` corpus yields hybrid lexical+semantic retrieval for free.
1211    #[allow(clippy::too_many_arguments)]
1212    pub(crate) async fn search_text(
1213        &self,
1214        principal: &Principal,
1215        collection: String,
1216        text: String,
1217        k: usize,
1218        filter: Option<Filter>,
1219        ef_search: usize,
1220        rrf_k0: f32,
1221        with_payload: bool,
1222        with_vector: bool,
1223        rerank: bool,
1224    ) -> Result<Vec<MatchOut>, Error> {
1225        self.authorize(principal, Action::Read, "search_text", &collection)?;
1226        self.limits.check_search(k, ef_search)?;
1227        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1228            Error::BadRequest(format!(
1229                "collection {collection:?} has no embedding provider configured \
1230                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1231            ))
1232        })?;
1233        // Embed off the async runtime: the provider call is blocking network I/O.
1234        let query = text.clone();
1235        let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1236            .await
1237            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1238            .map_err(|e| Error::Upstream(e.to_string()))?
1239            .into_iter()
1240            .next()
1241            .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1242        self.limits.check_vector_len(vector.len())?;
1243
1244        let reranker = if rerank {
1245            self.embed.reranker(&collection)
1246        } else {
1247            None
1248        };
1249        // Over-fetch when reranking so the reranker can reorder a wide candidate set
1250        // down to the requested `k`; fetch payloads when reranking (we need the doc
1251        // text) even if the caller did not ask for them.
1252        let need_payload = with_payload || reranker.is_some();
1253        let fetch_k = if reranker.is_some() {
1254            k.max(RERANK_CANDIDATES)
1255        } else {
1256            k
1257        };
1258
1259        let mut hits = self
1260            .hybrid_search(
1261                principal,
1262                collection,
1263                Some(vector),
1264                None,
1265                Some(text.clone()),
1266                fetch_k,
1267                filter,
1268                ef_search,
1269                rrf_k0,
1270                need_payload,
1271                with_vector,
1272            )
1273            .await?;
1274
1275        if let Some(rr) = reranker {
1276            let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1277            let query = text;
1278            let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1279                .await
1280                .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1281                .map_err(|e| Error::Upstream(e.to_string()))?;
1282            // Re-score each hit and sort by the rerank score, descending.
1283            let mut scored: Vec<(f32, MatchOut)> = scores
1284                .into_iter()
1285                .zip(hits)
1286                .map(|(s, mut h)| {
1287                    h.score = s;
1288                    (s, h)
1289                })
1290                .collect();
1291            scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1292            hits = scored.into_iter().map(|(_, h)| h).collect();
1293        }
1294
1295        hits.truncate(k);
1296        // Drop payloads we only fetched for reranking if the caller didn't want them.
1297        if !with_payload {
1298            for h in &mut hits {
1299                h.payload = None;
1300            }
1301        }
1302        Ok(hits)
1303    }
1304
1305    /// Embed each point's `text` with the collection's provider and upsert it as a
1306    /// dense point, co-populating the `__quiver_text__` payload key (ADR-0046) so the
1307    /// same text is indexed for BM25 — one call feeds both the dense and lexical
1308    /// sides (ADR-0047).
1309    pub(crate) async fn upsert_text(
1310        &self,
1311        principal: &Principal,
1312        collection: String,
1313        points: Vec<TextPointIn>,
1314    ) -> Result<u64, Error> {
1315        self.ensure_writable("upsert_text")?;
1316        self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1317        self.limits.check_batch(points.len())?;
1318        for p in &points {
1319            if !matches!(p.payload, Value::Object(_) | Value::Null) {
1320                return Err(Error::BadRequest(
1321                    "upsert_text payload must be a JSON object or null".to_owned(),
1322                ));
1323            }
1324        }
1325        let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1326            Error::BadRequest(format!(
1327                "collection {collection:?} has no embedding provider configured \
1328                 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1329            ))
1330        })?;
1331        let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1332        let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1333            .await
1334            .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1335            .map_err(|e| Error::Upstream(e.to_string()))?;
1336        if vectors.len() != points.len() {
1337            return Err(Error::Upstream(format!(
1338                "embedding provider returned {} vectors for {} inputs",
1339                vectors.len(),
1340                points.len()
1341            )));
1342        }
1343        let dense: Vec<PointIn> = points
1344            .into_iter()
1345            .zip(vectors)
1346            .map(|(p, vector)| {
1347                let mut payload = match p.payload {
1348                    Value::Object(map) => map,
1349                    _ => serde_json::Map::new(),
1350                };
1351                // Don't clobber a caller-supplied text key.
1352                payload
1353                    .entry(TEXT_KEY.to_owned())
1354                    .or_insert_with(|| Value::String(p.text.clone()));
1355                PointIn {
1356                    id: p.id,
1357                    vector,
1358                    payload: Value::Object(payload),
1359                }
1360            })
1361            .collect();
1362        self.upsert(principal, collection, dense).await
1363    }
1364
1365    pub(crate) async fn fetch(
1366        &self,
1367        principal: &Principal,
1368        collection: String,
1369        filter: Option<Filter>,
1370        limit: usize,
1371        with_payload: bool,
1372        with_vector: bool,
1373    ) -> Result<Vec<MatchOut>, Error> {
1374        self.authorize(principal, Action::Read, "fetch", &collection)?;
1375        self.limits.check_fetch(limit)?;
1376        self.read_blocking(move |db| {
1377            let matches = db.fetch(
1378                &collection,
1379                filter.as_ref(),
1380                limit,
1381                with_payload,
1382                with_vector,
1383            )?;
1384            Ok(matches
1385                .into_iter()
1386                .map(|m| MatchOut {
1387                    id: m.id,
1388                    score: m.score,
1389                    payload: m.payload,
1390                    vector: m.vector,
1391                })
1392                .collect())
1393        })
1394        .await
1395    }
1396
1397    pub(crate) async fn upsert_documents(
1398        &self,
1399        principal: &Principal,
1400        collection: String,
1401        documents: Vec<DocumentIn>,
1402    ) -> Result<u64, Error> {
1403        self.ensure_writable("upsert_documents")?;
1404        self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1405        self.limits.check_batch(documents.len())?;
1406        for doc in &documents {
1407            self.limits.check_payload(&doc.payload)?;
1408            for token in &doc.vectors {
1409                self.limits.check_vector_len(token.len())?;
1410            }
1411        }
1412        let resource = collection.clone();
1413        let result = self
1414            .write_blocking(move |db| {
1415                let mut count = 0u64;
1416                for doc in &documents {
1417                    db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1418                    count += 1;
1419                }
1420                Ok(count)
1421            })
1422            .await;
1423        self.audit.record(
1424            principal.actor(),
1425            "upsert_documents",
1426            &resource,
1427            Outcome::of(&result),
1428        );
1429        result
1430    }
1431
1432    pub(crate) async fn delete_documents(
1433        &self,
1434        principal: &Principal,
1435        collection: String,
1436        ids: Vec<String>,
1437    ) -> Result<u64, Error> {
1438        self.ensure_writable("delete_documents")?;
1439        self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1440        let resource = collection.clone();
1441        let result = self
1442            .write_blocking(move |db| {
1443                let mut count = 0u64;
1444                for id in &ids {
1445                    if db.delete_document(&collection, id)? {
1446                        count += 1;
1447                    }
1448                }
1449                Ok(count)
1450            })
1451            .await;
1452        self.audit.record(
1453            principal.actor(),
1454            "delete_documents",
1455            &resource,
1456            Outcome::of(&result),
1457        );
1458        result
1459    }
1460
1461    #[allow(clippy::too_many_arguments)]
1462    pub(crate) async fn search_multi_vector(
1463        &self,
1464        principal: &Principal,
1465        collection: String,
1466        query: Vec<Vec<f32>>,
1467        k: usize,
1468        filter: Option<Filter>,
1469        ef_search: usize,
1470        with_payload: bool,
1471        with_vector: bool,
1472    ) -> Result<Vec<DocumentMatchOut>, Error> {
1473        self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1474        self.limits.check_search(k, ef_search)?;
1475        for token in &query {
1476            self.limits.check_vector_len(token.len())?;
1477        }
1478        let params = SearchParams {
1479            k,
1480            filter,
1481            ef_search,
1482            with_payload,
1483            with_vector,
1484        };
1485        let coll = collection.clone();
1486        self.search_blocking(coll, move |db| {
1487            let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
1488            Ok(matches
1489                .into_iter()
1490                .map(|m| DocumentMatchOut {
1491                    id: m.id,
1492                    score: m.score,
1493                    payload: m.payload,
1494                    vectors: m.vectors,
1495                })
1496                .collect())
1497        })
1498        .await
1499    }
1500}
1501
/// How many recently-committed ops the leader buffers for replication followers
/// (ADR-0030). A follower that falls further behind than this re-bootstraps.
///
/// `serve` passes this as the capacity of the `broadcast` channel that carries
/// committed WAL entries to followers.
const REPLICATION_BUFFER: usize = 1024;
1505
1506/// Run the server from `config` until a shutdown signal (Ctrl-C).
1507pub async fn run(config: Config) -> Result<(), Error> {
1508    config.validate()?;
1509    let rest_listener = TcpListener::bind(config.rest_addr)
1510        .await
1511        .map_err(Error::Io)?;
1512    let grpc_listener = TcpListener::bind(config.grpc_addr)
1513        .await
1514        .map_err(Error::Io)?;
1515    tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1516    tokio::select! {
1517        result = serve(config, rest_listener, grpc_listener) => result,
1518        () = shutdown_signal() => {
1519            tracing::info!("shutdown signal received");
1520            Ok(())
1521        }
1522    }
1523}
1524
1525/// Serve REST and gRPC on the given (already-bound) listeners until a transport
1526/// error. Exposed so tests can bind ephemeral ports.
1527pub async fn serve(
1528    config: Config,
1529    rest_listener: TcpListener,
1530    grpc_listener: TcpListener,
1531) -> Result<(), Error> {
1532    let mut db = open_database(&config)?;
1533    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1534    // Publish every committed op to replication followers (ADR-0030). The observer
1535    // runs inside the engine's write critical section; `broadcast::Sender::send` is
1536    // non-blocking, so it never stalls a write.
1537    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1538    {
1539        let tx = replication_tx.clone();
1540        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1541            let _ = tx.send(entry.clone());
1542        }));
1543    }
1544    // Build the opt-in embedding/rerank providers, resolving each `api_key_env`
1545    // from the environment now (ADR-0047) so a missing key fails fast at startup
1546    // rather than on the first request.
1547    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1548        .map_err(|e| Error::Config(e.to_string()))?;
1549
1550    let state = AppState {
1551        db: Arc::new(RwLock::new(db)),
1552        keys: Arc::new(config.api_keys.clone()),
1553        audit,
1554        replication_tx,
1555        read_only: config.leader_url.is_some(),
1556        limits: config.limits,
1557        embed: Arc::new(embed),
1558        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
1559        metrics: Arc::new(metrics::Metrics::default()),
1560        rebuilding: Arc::new(Mutex::new(HashSet::new())),
1561    };
1562
1563    // A follower continuously applies the leader's committed-op stream (ADR-0030).
1564    if let Some(leader_url) = config.leader_url.clone() {
1565        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
1566    }
1567
1568    let app = rest::router(state.clone());
1569    let grpc = grpc::service(state);
1570
1571    let tls = load_tls(&config)?;
1572
1573    // REST: terminate TLS with axum-server when configured, else serve plaintext.
1574    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
1575        Some(material) => {
1576            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
1577            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
1578            let server =
1579                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
1580            Box::pin(async move {
1581                server
1582                    .serve(app.into_make_service())
1583                    .await
1584                    .map_err(Error::Io)
1585            })
1586        }
1587        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
1588    };
1589
1590    // gRPC: tonic terminates TLS itself (ring provider) when an identity is set.
1591    let mut grpc_builder = tonic::transport::Server::builder();
1592    if let Some(material) = &tls {
1593        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
1594        let mut tls_config = ServerTlsConfig::new().identity(identity);
1595        // Require client certificates chaining to the configured CA (mTLS).
1596        if let Some(ca_pem) = &material.client_ca_pem {
1597            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
1598        }
1599        grpc_builder = grpc_builder
1600            .tls_config(tls_config)
1601            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
1602    }
1603    let grpc_fut = async move {
1604        grpc_builder
1605            .add_service(grpc)
1606            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
1607            .await
1608            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
1609    };
1610
1611    tokio::try_join!(rest_fut, grpc_fut)?;
1612    Ok(())
1613}
1614
1615async fn shutdown_signal() {
1616    let _ = tokio::signal::ctrl_c().await;
1617}
1618
1619// Open the engine, enabling encryption-at-rest when a key is configured. The
1620// configured key is the **master key** of an envelope key-ring (ADR-0010): it
1621// wraps a per-collection data-encryption key, so dropping a collection
1622// crypto-shreds it. With no key (only valid in `insecure` mode, enforced by
1623// `Config::validate`) the engine is opened in plaintext.
1624fn open_database(config: &Config) -> Result<Database, Error> {
1625    let master_key = config.master_key_hex()?;
1626    let keyring =
1627        quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1628            .map_err(|e| Error::Config(e.to_string()))?;
1629    let db = match keyring {
1630        Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1631        None => Database::open(&config.data_dir)?,
1632    };
1633    Ok(db)
1634}
1635
// TLS material shared by both transports: the raw PEM (for tonic's `Identity`
// and `client_ca_root`) and a parsed rustls server config (for axum-server's
// REST acceptor). `client_ca_pem` is set only when mutual TLS is configured.
struct TlsMaterial {
    // Raw bytes of the file at `tls_cert`.
    cert_pem: Vec<u8>,
    // Raw bytes of the file at `tls_key`.
    key_pem: Vec<u8>,
    // Raw bytes of the file at `tls_client_ca`, when one is configured.
    client_ca_pem: Option<Vec<u8>>,
    // rustls config parsed from the PEM above; shared with the REST acceptor.
    rest_config: Arc<rustls::ServerConfig>,
}
1645
1646// Read the configured certificate, key, and optional client CA, returning `None`
1647// when TLS is not configured. `Config::validate` already enforces that the cert
1648// and key are set together, that a client CA requires them, and that a
1649// non-loopback bind requires TLS.
1650fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1651    match (&config.tls_cert, &config.tls_key) {
1652        (Some(cert_path), Some(key_path)) => {
1653            let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1654            let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1655            let client_ca_pem = config
1656                .tls_client_ca
1657                .as_ref()
1658                .map(std::fs::read)
1659                .transpose()
1660                .map_err(Error::Io)?;
1661            let rest_config = Arc::new(rustls_server_config(
1662                &cert_pem,
1663                &key_pem,
1664                client_ca_pem.as_deref(),
1665            )?);
1666            Ok(Some(TlsMaterial {
1667                cert_pem,
1668                key_pem,
1669                client_ca_pem,
1670                rest_config,
1671            }))
1672        }
1673        (None, None) => Ok(None),
1674        _ => Err(Error::Config(
1675            "tls_cert and tls_key must be set together".to_owned(),
1676        )),
1677    }
1678}
1679
1680// Build a rustls server config from PEM bytes over the audited `ring` provider
1681// (no OpenSSL, no aws-lc-rs C toolchain). TLS 1.3 and 1.2 are offered. When a
1682// client CA is supplied, client certificates chaining to it are required (mTLS).
1683fn rustls_server_config(
1684    cert_pem: &[u8],
1685    key_pem: &[u8],
1686    client_ca_pem: Option<&[u8]>,
1687) -> Result<rustls::ServerConfig, Error> {
1688    use rustls_pki_types::pem::PemObject;
1689    use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1690
1691    let certs = CertificateDer::pem_slice_iter(cert_pem)
1692        .collect::<std::result::Result<Vec<_>, _>>()
1693        .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1694    if certs.is_empty() {
1695        return Err(Error::Config(
1696            "tls_cert contains no certificates".to_owned(),
1697        ));
1698    }
1699    let key = PrivateKeyDer::from_pem_slice(key_pem)
1700        .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1701    let provider = Arc::new(rustls::crypto::ring::default_provider());
1702    let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1703        .with_safe_default_protocol_versions()
1704        .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1705    let builder = match client_ca_pem {
1706        Some(ca_pem) => {
1707            let mut roots = rustls::RootCertStore::empty();
1708            for cert in CertificateDer::pem_slice_iter(ca_pem) {
1709                let cert =
1710                    cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1711                roots
1712                    .add(cert)
1713                    .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1714            }
1715            let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1716                Arc::new(roots),
1717                provider,
1718            )
1719            .build()
1720            .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1721            builder.with_client_cert_verifier(verifier)
1722        }
1723        None => builder.with_no_client_auth(),
1724    };
1725    builder
1726        .with_single_cert(certs, key)
1727        .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1728}
1729
1730/// Initialize structured logging from `RUST_LOG` (defaulting to `info`). Safe to
1731/// call once at startup; a second call is ignored.
1732pub fn init_tracing() {
1733    init_observability(&Config::default());
1734}
1735
1736/// Install the global tracing subscriber: an `RUST_LOG`-driven `fmt` layer plus,
1737/// when the `otlp` feature is built **and** an OTLP endpoint is configured
1738/// (ADR-0059), an OpenTelemetry traces export layer. Safe to call once at
1739/// startup; a second call is a no-op. A failure to build the OTLP exporter logs a
1740/// warning and falls back to `fmt`-only rather than taking the server down.
1741#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
1742pub fn init_observability(config: &Config) {
1743    use tracing_subscriber::EnvFilter;
1744    use tracing_subscriber::prelude::*;
1745    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
1746    let registry = tracing_subscriber::registry()
1747        .with(filter)
1748        .with(tracing_subscriber::fmt::layer());
1749
1750    #[cfg(feature = "otlp")]
1751    if config.otlp.is_enabled() {
1752        match otlp::build_provider(&config.otlp) {
1753            Ok(provider) => {
1754                use opentelemetry::trace::TracerProvider as _;
1755                let tracer = provider.tracer("quiver");
1756                otlp::store_provider(provider);
1757                let _ = registry
1758                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
1759                    .try_init();
1760                return;
1761            }
1762            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
1763        }
1764    }
1765
1766    let _ = registry.try_init();
1767}
1768
/// Flush and shut down the OpenTelemetry exporter, if one was installed. A no-op
/// without the `otlp` feature or when no endpoint was configured. Call once on
/// server shutdown so batched spans are not lost.
pub fn shutdown_observability() {
    // Compiled in only with the `otlp` feature; without it the body is empty.
    #[cfg(feature = "otlp")]
    otlp::shutdown();
}
1776
#[cfg(test)]
mod tests {
    use super::*;

    // A valid 64-hex-character (256-bit) test key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    // Baseline with one API key and a well-formed encryption key, so a test
    // built on it isolates whichever extra rule it is exercising.
    fn secured_config() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        }
    }

    // A non-loopback (all-interfaces) REST bind address.
    fn wildcard_rest_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333)
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // Nothing configured ⇒ rejected.
        let bare = Config::default();
        assert!(bare.validate().is_err());

        // Same defaults, but explicitly insecure ⇒ accepted.
        let insecure = Config {
            insecure: true,
            ..Config::default()
        };
        assert!(insecure.validate().is_ok());

        // Secure mode with both an API key and an encryption key ⇒ accepted.
        let secured = Config {
            insecure: false,
            ..secured_config()
        };
        assert!(secured.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut config = Config::default();
        config.api_keys = vec!["secret".into()];
        // API key set but no encryption key, not insecure ⇒ rejected.
        assert!(config.validate().is_err());

        // A well-formed key is accepted; a malformed one is rejected up front,
        // not at first write.
        for (candidate, accepted) in [(TEST_KEY, true), ("not-a-valid-hex-key", false)] {
            config.encryption_key = Some(candidate.to_owned());
            assert_eq!(config.validate().is_ok(), accepted);
        }

        // Insecure mode may run without encryption-at-rest.
        config.encryption_key = None;
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let key_file = dir.path().join("master.key");
        // A trailing newline (as editors and `echo` add) is trimmed off.
        std::fs::write(&key_file, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config::default();
        config.api_keys = vec!["secret".into()];
        config.master_key_file = Some(key_file.clone());
        // The file alone satisfies encryption-at-rest and resolves to the key.
        assert!(config.validate().is_ok());
        let resolved = config.master_key_hex().unwrap();
        assert_eq!(resolved.as_deref(), Some(TEST_KEY));

        // Setting both the env key and a file is rejected as ambiguous.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A file holding malformed hex is rejected up front.
        config.encryption_key = None;
        std::fs::write(&key_file, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            rest_addr: wildcard_rest_addr(),
            ..secured_config()
        };
        // Auth and encryption are satisfied, so the only failure is the bind rule.
        assert!(config.validate().is_err());
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: wildcard_rest_addr(),
            ..secured_config()
        };
        // TLS configured ⇒ a non-loopback bind is allowed without insecure.
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..secured_config()
        };
        // Cert without key ⇒ rejected.
        assert!(config.validate().is_err());
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }
}