1mod audit;
33mod auth;
34mod error;
35mod grpc;
36mod metrics;
37mod otlp;
38mod rate_limit;
39mod replication;
40mod rest;
41
42use std::collections::{HashMap, HashSet};
43use std::future::Future;
44use std::net::{IpAddr, Ipv4Addr, SocketAddr};
45use std::path::PathBuf;
46use std::pin::Pin;
47use std::sync::{Arc, Mutex, RwLock};
48
49use axum_server::tls_rustls::RustlsConfig;
50use figment::Figment;
51use figment::providers::{Env, Format, Serialized, Toml};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::net::TcpListener;
55use tokio::sync::broadcast;
56use tokio_stream::wrappers::TcpListenerStream;
57use tonic::transport::{Certificate, Identity, ServerTlsConfig};
58
59use quiver_crypto::AeadCodec;
60use quiver_embed::{
61 Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
62 SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
63};
64use quiver_query::Filter;
65
66pub use auth::{Action, ApiKey, CollectionScope};
67pub use error::Error;
68pub use otlp::OtlpConfig;
69pub use quiver_providers::{
72 EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
73 RerankProvider,
74};
75pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
76
77use audit::{AuditLog, Outcome};
78use auth::Principal;
79
80#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
88#[serde(default)]
89pub struct Limits {
90 pub max_k: usize,
92 pub max_ef_search: usize,
94 pub max_fetch_limit: usize,
96 pub max_vector_dim: usize,
99 pub max_payload_bytes: usize,
101 pub max_batch_size: usize,
103 pub max_request_body_bytes: usize,
106 pub max_sparse_terms: usize,
109 pub max_bulk_batch_size: usize,
114}
115
116impl Default for Limits {
117 fn default() -> Self {
118 Self {
119 max_k: 10_000,
120 max_ef_search: 4_096,
121 max_fetch_limit: 10_000,
122 max_vector_dim: 8_192,
123 max_payload_bytes: 65_536,
124 max_batch_size: 1_000,
125 max_request_body_bytes: 32 * 1024 * 1024,
126 max_sparse_terms: 4_096,
127 max_bulk_batch_size: 50_000,
128 }
129 }
130}
131
132impl Limits {
133 fn apply_env_overrides(&mut self) -> Result<(), Error> {
137 let slots: [(&str, &mut usize); 9] = [
138 ("QUIVER_MAX_K", &mut self.max_k),
139 ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
140 ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
141 ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
142 ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
143 ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
144 (
145 "QUIVER_MAX_REQUEST_BODY_BYTES",
146 &mut self.max_request_body_bytes,
147 ),
148 ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
149 ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
150 ];
151 for (key, slot) in slots {
152 if let Ok(raw) = std::env::var(key) {
153 *slot = raw.parse().map_err(|_| {
154 Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
155 })?;
156 }
157 }
158 Ok(())
159 }
160
161 fn validate(&self) -> Result<(), Error> {
163 let named = [
164 ("max_k", self.max_k),
165 ("max_ef_search", self.max_ef_search),
166 ("max_fetch_limit", self.max_fetch_limit),
167 ("max_vector_dim", self.max_vector_dim),
168 ("max_payload_bytes", self.max_payload_bytes),
169 ("max_batch_size", self.max_batch_size),
170 ("max_request_body_bytes", self.max_request_body_bytes),
171 ("max_sparse_terms", self.max_sparse_terms),
172 ("max_bulk_batch_size", self.max_bulk_batch_size),
173 ];
174 if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
175 return Err(Error::Config(format!(
176 "limits.{name} must be greater than zero"
177 )));
178 }
179 Ok(())
180 }
181
182 fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
183 if k > self.max_k {
184 return Err(Error::BadRequest(format!(
185 "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
186 self.max_k
187 )));
188 }
189 if ef_search > self.max_ef_search {
190 return Err(Error::BadRequest(format!(
191 "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
192 self.max_ef_search
193 )));
194 }
195 Ok(())
196 }
197
198 fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
199 if n > self.max_sparse_terms {
200 return Err(Error::BadRequest(format!(
201 "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
202 self.max_sparse_terms
203 )));
204 }
205 Ok(())
206 }
207
208 fn check_fetch(&self, limit: usize) -> Result<(), Error> {
209 if limit > self.max_fetch_limit {
210 return Err(Error::BadRequest(format!(
211 "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
212 self.max_fetch_limit
213 )));
214 }
215 Ok(())
216 }
217
218 fn check_dim(&self, dim: usize) -> Result<(), Error> {
219 if dim > self.max_vector_dim {
220 return Err(Error::BadRequest(format!(
221 "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
222 self.max_vector_dim
223 )));
224 }
225 Ok(())
226 }
227
228 fn check_vector_len(&self, len: usize) -> Result<(), Error> {
229 if len > self.max_vector_dim {
230 return Err(Error::BadRequest(format!(
231 "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
232 self.max_vector_dim
233 )));
234 }
235 Ok(())
236 }
237
238 fn check_batch(&self, n: usize) -> Result<(), Error> {
239 if n > self.max_batch_size {
240 return Err(Error::BadRequest(format!(
241 "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
242 self.max_batch_size
243 )));
244 }
245 Ok(())
246 }
247
248 fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
249 if n > self.max_bulk_batch_size {
250 return Err(Error::BadRequest(format!(
251 "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
252 self.max_bulk_batch_size
253 )));
254 }
255 Ok(())
256 }
257
258 fn check_payload(&self, payload: &Value) -> Result<(), Error> {
259 let size = serde_json::to_vec(payload)
260 .map(|v| v.len())
261 .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
262 if size > self.max_payload_bytes {
263 return Err(Error::BadRequest(format!(
264 "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
265 self.max_payload_bytes
266 )));
267 }
268 Ok(())
269 }
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
275#[serde(default)]
276pub struct Config {
277 pub data_dir: PathBuf,
279 pub rest_addr: SocketAddr,
281 pub grpc_addr: SocketAddr,
283 #[serde(default, deserialize_with = "auth::de_api_keys")]
289 pub api_keys: Vec<ApiKey>,
290 pub encryption_key: Option<String>,
298 pub master_key_file: Option<PathBuf>,
306 pub tls_cert: Option<PathBuf>,
309 pub tls_key: Option<PathBuf>,
312 pub tls_client_ca: Option<PathBuf>,
317 pub audit_log: Option<PathBuf>,
323 pub leader_url: Option<String>,
329 pub leader_api_key: Option<String>,
332 pub insecure: bool,
336 pub limits: Limits,
339 #[serde(default)]
345 pub embedding: HashMap<String, EmbeddingConfig>,
346 #[serde(default)]
350 pub rerank: HashMap<String, RerankConfig>,
351 #[serde(default)]
355 pub rate_limit: RateLimitConfig,
356 #[serde(default)]
360 pub otlp: OtlpConfig,
361 #[serde(default)]
366 pub mvcc_reads: bool,
367}
368
369impl Default for Config {
370 fn default() -> Self {
371 Self {
372 data_dir: PathBuf::from("./quiver-data"),
373 rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
374 grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
375 api_keys: Vec::new(),
376 encryption_key: None,
377 master_key_file: None,
378 tls_cert: None,
379 tls_key: None,
380 tls_client_ca: None,
381 audit_log: None,
382 leader_url: None,
383 leader_api_key: None,
384 insecure: false,
385 limits: Limits::default(),
386 embedding: HashMap::new(),
387 rerank: HashMap::new(),
388 rate_limit: RateLimitConfig::default(),
389 otlp: OtlpConfig::default(),
390 mvcc_reads: false,
391 }
392 }
393}
394
395impl Config {
396 pub fn load() -> Result<Self, Error> {
399 let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
400 .merge(Toml::file("quiver.toml"))
401 .merge(Env::prefixed("QUIVER_"))
402 .extract()
403 .map_err(|e| Error::Config(e.to_string()))?;
404 config.limits.apply_env_overrides()?;
407 config
409 .rate_limit
410 .apply_env_overrides()
411 .map_err(Error::Config)?;
412 config.otlp.apply_env_overrides().map_err(Error::Config)?;
414 Ok(config)
415 }
416
417 pub fn validate(&self) -> Result<(), Error> {
421 if self.api_keys.is_empty() && !self.insecure {
422 return Err(Error::Config(
423 "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
424 set insecure=true for local development"
425 .to_owned(),
426 ));
427 }
428 let master_key = self.master_key_hex()?;
430 if master_key.is_none() && !self.insecure {
431 return Err(Error::Config(
432 "no encryption key configured: encryption-at-rest is on by default — \
433 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
434 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
435 store data unencrypted (development only)"
436 .to_owned(),
437 ));
438 }
439 if let Some(key) = &master_key {
441 AeadCodec::from_hex(key)
442 .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
443 }
444 if self.tls_cert.is_some() != self.tls_key.is_some() {
446 return Err(Error::Config(
447 "tls_cert and tls_key must be set together".to_owned(),
448 ));
449 }
450 if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
452 return Err(Error::Config(
453 "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
454 ));
455 }
456 let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
457 let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
458 if non_loopback && !tls_enabled && !self.insecure {
459 return Err(Error::Config(
460 "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
461 or insecure=true for local development"
462 .to_owned(),
463 ));
464 }
465 self.limits.validate()?;
467 Ok(())
468 }
469
470 pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
480 let env_key = self
481 .encryption_key
482 .as_deref()
483 .map(str::trim)
484 .filter(|k| !k.is_empty());
485 match (&self.master_key_file, env_key) {
486 (Some(_), Some(_)) => Err(Error::Config(
487 "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
488 (QUIVER_MASTER_KEY_FILE), not both"
489 .to_owned(),
490 )),
491 (Some(path), None) => {
492 warn_if_world_readable(path);
493 let hex = std::fs::read_to_string(path).map_err(|e| {
494 Error::Config(format!("reading master_key_file {}: {e}", path.display()))
495 })?;
496 Ok(Some(hex.trim().to_owned()))
497 }
498 (None, Some(key)) => Ok(Some(key.to_owned())),
499 (None, None) => Ok(None),
500 }
501 }
502}
503
504#[cfg(unix)]
507fn warn_if_world_readable(path: &std::path::Path) {
508 use std::os::unix::fs::PermissionsExt;
509 if let Ok(meta) = std::fs::metadata(path)
510 && meta.permissions().mode() & 0o077 != 0
511 {
512 tracing::warn!(
513 path = %path.display(),
514 mode = format!("{:o}", meta.permissions().mode() & 0o777),
515 "master key file is group/world-accessible; restrict it to 0600"
516 );
517 }
518}
519
/// Non-Unix stand-in: performs no permission check.
#[cfg(not(unix))]
fn warn_if_world_readable(_path: &std::path::Path) {
    // Intentionally empty: the mode-bit check only exists on Unix.
}
522
523#[derive(Clone)]
526pub(crate) struct AppState {
527 db: Arc<RwLock<Database>>,
532 keys: Arc<Vec<ApiKey>>,
533 audit: Arc<AuditLog>,
534 replication_tx: broadcast::Sender<WalEntry>,
537 read_only: bool,
540 limits: Limits,
543 embed: Arc<EmbedRegistry>,
547 rate_limiter: Arc<RateLimiter>,
549 metrics: Arc<metrics::Metrics>,
551 rebuilding: Arc<Mutex<HashSet<String>>>,
555 snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
562 mvcc: bool,
567}
568
569pub(crate) struct CollectionInfo {
571 pub name: String,
572 pub dim: u32,
573 pub metric: DistanceMetric,
574 pub count: u64,
575 pub index: IndexSpec,
576 pub filterable: Vec<FilterableField>,
577 pub multivector: bool,
578 pub vector_encryption: VectorEncryption,
579}
580
581pub(crate) struct PointIn {
583 pub id: String,
584 pub vector: Vec<f32>,
585 pub payload: Value,
586}
587
588pub(crate) struct TextPointIn {
590 pub id: String,
591 pub text: String,
592 pub payload: Value,
593}
594
/// Floor on the number of candidates `search_text` fetches when a reranker
/// is in use, so reranking sees more than the final `k` results.
const RERANK_CANDIDATES: usize = 50;
598
599fn doc_text(payload: Option<&Value>) -> String {
603 match payload {
604 Some(Value::Object(map)) => map
605 .get(TEXT_KEY)
606 .and_then(Value::as_str)
607 .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
608 Some(v) => v.to_string(),
609 None => String::new(),
610 }
611}
612
613pub(crate) struct PointOut {
615 pub id: String,
616 pub vector: Option<Vec<f32>>,
617 pub payload: Value,
618}
619
620pub(crate) struct MatchOut {
622 pub id: String,
623 pub score: f32,
624 pub payload: Option<Value>,
625 pub vector: Option<Vec<f32>>,
626}
627
628pub(crate) struct DocumentIn {
630 pub id: String,
631 pub vectors: Vec<Vec<f32>>,
632 pub payload: Value,
633}
634
635pub(crate) struct DocumentMatchOut {
637 pub id: String,
638 pub score: f32,
639 pub payload: Option<Value>,
640 pub vectors: Option<Vec<Vec<f32>>>,
641}
642
643impl AppState {
644 pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
648 auth::authenticate(&self.keys, presented)
649 }
650
651 pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
655 self.rate_limiter.check(actor)
656 }
657
658 pub(crate) fn rate_limit_enabled(&self) -> bool {
661 self.rate_limiter.enabled()
662 }
663
664 async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
668 where
669 T: Send + 'static,
670 F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
671 {
672 let db = Arc::clone(&self.db);
673 tokio::task::spawn_blocking(move || -> Result<T, Error> {
674 let mut guard = db
675 .write()
676 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
677 f(&mut guard).map_err(Error::Engine)
678 })
679 .await
680 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
681 }
682
683 async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
687 where
688 T: Send + 'static,
689 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
690 {
691 let db = Arc::clone(&self.db);
692 tokio::task::spawn_blocking(move || -> Result<T, Error> {
693 let guard = db
694 .read()
695 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
696 f(&guard).map_err(Error::Engine)
697 })
698 .await
699 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
700 }
701
702 async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
709 where
710 T: Send + 'static,
711 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
712 {
713 let db = Arc::clone(&self.db);
714 let cells = Arc::clone(&self.snapshot_cells);
715 let coll = collection.clone();
716 let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
717 let guard = db
718 .read()
719 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
720 let result = f(&guard).map_err(Error::Engine)?;
721 let stale = guard.needs_rebuild(&coll).unwrap_or(false);
724 if !stale
729 && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
730 && let Ok(mut map) = cells.write()
731 {
732 map.entry(coll.clone()).or_insert(cell);
733 }
734 Ok((result, stale))
735 })
736 .await
737 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
738 if stale {
739 self.schedule_rebuild(collection);
740 }
741 Ok(result)
742 }
743
744 fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
748 self.snapshot_cells.read().ok()?.get(collection).cloned()
749 }
750
751 async fn schedule_if_stale(&self, collection: &str) {
757 let db = Arc::clone(&self.db);
758 let coll = collection.to_owned();
759 let stale = tokio::task::spawn_blocking(move || {
760 db.read()
761 .ok()
762 .and_then(|g| g.needs_rebuild(&coll).ok())
763 .unwrap_or(false)
764 })
765 .await
766 .unwrap_or(false);
767 if stale {
768 self.schedule_rebuild(collection.to_owned());
769 }
770 }
771
772 fn schedule_rebuild(&self, collection: String) {
776 {
777 let mut inflight = match self.rebuilding.lock() {
778 Ok(g) => g,
779 Err(_) => return,
780 };
781 if !inflight.insert(collection.clone()) {
782 return; }
784 }
785 let state = self.clone();
786 tokio::spawn(async move {
787 state.run_rebuild(&collection).await;
788 if let Ok(mut inflight) = state.rebuilding.lock() {
789 inflight.remove(&collection);
790 }
791 });
792 }
793
794 async fn run_rebuild(&self, collection: &str) {
801 loop {
802 let db = Arc::clone(&self.db);
803 let coll = collection.to_owned();
804 let inputs = tokio::task::spawn_blocking(move || {
805 let guard = db.read().ok()?;
806 guard.snapshot_rebuild_inputs(&coll).ok().flatten()
807 })
808 .await
809 .ok()
810 .flatten();
811 let Some(inputs) = inputs else { return };
812
813 let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
814 return;
815 };
816
817 let db = Arc::clone(&self.db);
818 let still_stale = tokio::task::spawn_blocking(move || {
819 let mut guard = db.write().ok()?;
820 guard.commit_rebuild(rebuilt).ok()
821 })
822 .await
823 .ok()
824 .flatten();
825 match still_stale {
826 Some(true) => continue, _ => return,
828 }
829 }
830 }
831
832 fn authorize(
835 &self,
836 principal: &Principal,
837 action: Action,
838 op: &str,
839 resource: &str,
840 ) -> Result<(), Error> {
841 principal
842 .require(action, Some(resource))
843 .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
844 }
845
846 fn authorize_global(
849 &self,
850 principal: &Principal,
851 action: Action,
852 op: &str,
853 ) -> Result<(), Error> {
854 principal
855 .require(action, None)
856 .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
857 }
858
859 pub(crate) async fn open_replication(
866 &self,
867 principal: &Principal,
868 ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
869 self.authorize_global(principal, Action::Admin, "replicate")?;
870 let tx = self.replication_tx.clone();
871 self.read_blocking(move |db| {
872 let rx = tx.subscribe();
873 let snapshot = db.replication_snapshot()?;
874 Ok((snapshot, rx))
875 })
876 .await
877 }
878
879 pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
883 self.write_blocking(move |db| db.apply_replicated(op)).await
884 }
885
886 fn ensure_writable(&self, op: &str) -> Result<(), Error> {
889 if self.read_only {
890 return Err(Error::Forbidden(format!(
891 "{op}: this node is a read-only replication follower"
892 )));
893 }
894 Ok(())
895 }
896
897 #[allow(clippy::too_many_arguments)]
898 pub(crate) async fn create_collection(
899 &self,
900 principal: &Principal,
901 name: String,
902 dim: u32,
903 metric: DistanceMetric,
904 index: IndexSpec,
905 filterable: Vec<FilterableField>,
906 multivector: bool,
907 vector_encryption: VectorEncryption,
908 ) -> Result<CollectionInfo, Error> {
909 self.ensure_writable("create_collection")?;
910 self.authorize(principal, Action::Admin, "create_collection", &name)?;
911 self.limits.check_dim(dim as usize)?;
912 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
913 .with_index(index)
914 .with_filterable(filterable.clone())
915 .with_multivector(multivector)
916 .with_vector_encryption(vector_encryption);
917 let owned = name.clone();
918 let result = self
919 .write_blocking(move |db| db.create_collection(&owned, descriptor))
920 .await;
921 self.audit.record(
922 principal.actor(),
923 "create_collection",
924 &name,
925 Outcome::of(&result),
926 );
927 result?;
928 Ok(CollectionInfo {
929 name,
930 dim,
931 metric,
932 count: 0,
933 index,
934 filterable,
935 multivector,
936 vector_encryption,
937 })
938 }
939
940 pub(crate) async fn get_collection(
941 &self,
942 principal: &Principal,
943 name: String,
944 ) -> Result<CollectionInfo, Error> {
945 self.authorize(principal, Action::Read, "get_collection", &name)?;
946 self.read_blocking(move |db| {
947 let descriptor = db
948 .descriptor(&name)
949 .cloned()
950 .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
951 let count = if descriptor.multivector {
954 db.document_count(&name)? as u64
955 } else {
956 db.len(&name)? as u64
957 };
958 Ok(CollectionInfo {
959 name,
960 dim: descriptor.dim,
961 metric: descriptor.metric,
962 count,
963 index: descriptor.index,
964 filterable: descriptor.filterable,
965 multivector: descriptor.multivector,
966 vector_encryption: descriptor.vector_encryption,
967 })
968 })
969 .await
970 }
971
972 pub(crate) async fn list_collections(
973 &self,
974 principal: &Principal,
975 ) -> Result<Vec<CollectionInfo>, Error> {
976 self.authorize_global(principal, Action::Read, "list_collections")?;
977 let mut infos = self
978 .read_blocking(|db| {
979 let mut out = Vec::new();
980 for name in db.collection_names() {
981 if let Some(descriptor) = db.descriptor(&name).cloned() {
982 let count = if descriptor.multivector {
983 db.document_count(&name)? as u64
984 } else {
985 db.len(&name)? as u64
986 };
987 out.push(CollectionInfo {
988 name,
989 dim: descriptor.dim,
990 metric: descriptor.metric,
991 count,
992 index: descriptor.index,
993 filterable: descriptor.filterable,
994 multivector: descriptor.multivector,
995 vector_encryption: descriptor.vector_encryption,
996 });
997 }
998 }
999 Ok(out)
1000 })
1001 .await?;
1002 infos.retain(|info| principal.can_see(&info.name));
1004 Ok(infos)
1005 }
1006
1007 pub(crate) async fn delete_collection(
1008 &self,
1009 principal: &Principal,
1010 name: String,
1011 ) -> Result<bool, Error> {
1012 self.ensure_writable("delete_collection")?;
1013 self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1014 let resource = name.clone();
1015 let result = self
1016 .write_blocking(move |db| db.drop_collection(&name))
1017 .await;
1018 self.audit.record(
1019 principal.actor(),
1020 "delete_collection",
1021 &resource,
1022 Outcome::of(&result),
1023 );
1024 if matches!(result, Ok(true))
1027 && let Ok(mut map) = self.snapshot_cells.write()
1028 {
1029 map.remove(&resource);
1030 }
1031 result
1032 }
1033
1034 #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1035 pub(crate) async fn upsert(
1036 &self,
1037 principal: &Principal,
1038 collection: String,
1039 points: Vec<PointIn>,
1040 ) -> Result<u64, Error> {
1041 self.ensure_writable("upsert")?;
1042 self.authorize(principal, Action::Write, "upsert", &collection)?;
1043 self.limits.check_batch(points.len())?;
1044 for p in &points {
1045 self.limits.check_vector_len(p.vector.len())?;
1046 self.limits.check_payload(&p.payload)?;
1047 }
1048 let resource = collection.clone();
1049 let result = self
1050 .write_blocking(move |db| {
1051 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1052 .iter()
1053 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1054 .collect();
1055 db.upsert_batch(&collection, &records)
1056 })
1057 .await;
1058 self.audit
1059 .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1060 if self.mvcc && result.is_ok() {
1061 self.schedule_if_stale(&resource).await;
1062 }
1063 result
1064 }
1065
1066 pub(crate) async fn upsert_bulk(
1069 &self,
1070 principal: &Principal,
1071 collection: String,
1072 points: Vec<PointIn>,
1073 ) -> Result<u64, Error> {
1074 self.ensure_writable("upsert")?;
1075 self.authorize(principal, Action::Write, "upsert", &collection)?;
1076 self.limits.check_bulk_batch(points.len())?;
1077 for p in &points {
1078 self.limits.check_vector_len(p.vector.len())?;
1079 self.limits.check_payload(&p.payload)?;
1080 }
1081 let resource = collection.clone();
1082 let result = self
1083 .write_blocking(move |db| {
1084 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1085 .iter()
1086 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1087 .collect();
1088 db.upsert_bulk(&collection, &records)
1089 })
1090 .await;
1091 self.audit.record(
1092 principal.actor(),
1093 "upsert_bulk",
1094 &resource,
1095 Outcome::of(&result),
1096 );
1097 if self.mvcc && result.is_ok() {
1098 self.schedule_if_stale(&resource).await;
1099 }
1100 result
1101 }
1102
1103 #[tracing::instrument(skip_all)]
1107 pub(crate) async fn snapshot(
1108 &self,
1109 principal: &Principal,
1110 destination: String,
1111 ) -> Result<SnapshotInfo, Error> {
1112 self.ensure_writable("snapshot")?;
1113 self.authorize_global(principal, Action::Admin, "snapshot")?;
1114 let dest = std::path::PathBuf::from(&destination);
1115 let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1116 self.audit.record(
1117 principal.actor(),
1118 "snapshot",
1119 &destination,
1120 Outcome::of(&result),
1121 );
1122 result
1123 }
1124
1125 pub(crate) async fn delete_points(
1126 &self,
1127 principal: &Principal,
1128 collection: String,
1129 ids: Vec<String>,
1130 ) -> Result<u64, Error> {
1131 self.ensure_writable("delete_points")?;
1132 self.authorize(principal, Action::Write, "delete_points", &collection)?;
1133 let resource = collection.clone();
1134 let result = self
1135 .write_blocking(move |db| {
1136 let mut count = 0u64;
1137 for id in &ids {
1138 if db.delete(&collection, id)? {
1139 count += 1;
1140 }
1141 }
1142 Ok(count)
1143 })
1144 .await;
1145 self.audit.record(
1146 principal.actor(),
1147 "delete_points",
1148 &resource,
1149 Outcome::of(&result),
1150 );
1151 if self.mvcc && result.is_ok() {
1152 self.schedule_if_stale(&resource).await;
1153 }
1154 result
1155 }
1156
1157 pub(crate) async fn get_points(
1158 &self,
1159 principal: &Principal,
1160 collection: String,
1161 ids: Vec<String>,
1162 with_vector: bool,
1163 ) -> Result<Vec<PointOut>, Error> {
1164 self.authorize(principal, Action::Read, "get_points", &collection)?;
1165 self.read_blocking(move |db| {
1166 let mut out = Vec::new();
1167 for id in &ids {
1168 if let Some(m) = db.get(&collection, id)? {
1169 out.push(PointOut {
1170 id: m.id,
1171 vector: if with_vector { m.vector } else { None },
1172 payload: m.payload.unwrap_or(Value::Null),
1173 });
1174 }
1175 }
1176 Ok(out)
1177 })
1178 .await
1179 }
1180
1181 #[allow(clippy::too_many_arguments)]
1182 #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1183 pub(crate) async fn search(
1184 &self,
1185 principal: &Principal,
1186 collection: String,
1187 vector: Vec<f32>,
1188 k: usize,
1189 filter: Option<Filter>,
1190 ef_search: usize,
1191 with_payload: bool,
1192 with_vector: bool,
1193 ) -> Result<Vec<MatchOut>, Error> {
1194 self.authorize(principal, Action::Read, "search", &collection)?;
1195 self.limits.check_search(k, ef_search)?;
1196 self.limits.check_vector_len(vector.len())?;
1197
1198 if filter.is_none()
1204 && !with_payload
1205 && !with_vector
1206 && let Some(cell) = self.cached_cell(&collection)
1207 {
1208 return tokio::task::spawn_blocking(move || {
1209 let matches = cell.load().search(&vector, k, ef_search)?;
1210 Ok::<_, quiver_embed::Error>(
1211 matches
1212 .into_iter()
1213 .map(|m| MatchOut {
1214 id: m.id,
1215 score: m.score,
1216 payload: None,
1217 vector: None,
1218 })
1219 .collect(),
1220 )
1221 })
1222 .await
1223 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1224 .map_err(Error::Engine);
1225 }
1226
1227 let params = SearchParams {
1228 k,
1229 filter,
1230 ef_search,
1231 with_payload,
1232 with_vector,
1233 };
1234 let coll = collection.clone();
1235 self.search_blocking(coll, move |db| {
1236 let matches = db.search_snapshot(&collection, &vector, ¶ms)?;
1237 Ok(matches
1238 .into_iter()
1239 .map(|m| MatchOut {
1240 id: m.id,
1241 score: m.score,
1242 payload: m.payload,
1243 vector: m.vector,
1244 })
1245 .collect())
1246 })
1247 .await
1248 }
1249
1250 #[allow(clippy::too_many_arguments)]
1251 pub(crate) async fn hybrid_search(
1252 &self,
1253 principal: &Principal,
1254 collection: String,
1255 dense: Option<Vec<f32>>,
1256 sparse: Option<(Vec<u32>, Vec<f32>)>,
1257 text: Option<String>,
1258 k: usize,
1259 filter: Option<Filter>,
1260 ef_search: usize,
1261 rrf_k0: f32,
1262 with_payload: bool,
1263 with_vector: bool,
1264 ) -> Result<Vec<MatchOut>, Error> {
1265 self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1266 self.limits.check_search(k, ef_search)?;
1267 if let Some(v) = &dense {
1268 self.limits.check_vector_len(v.len())?;
1269 }
1270 if let Some((indices, values)) = &sparse {
1271 self.limits.check_sparse_terms(indices.len())?;
1272 if indices.len() != values.len() {
1273 return Err(Error::BadRequest(format!(
1274 "sparse query indices ({}) and values ({}) length mismatch",
1275 indices.len(),
1276 values.len()
1277 )));
1278 }
1279 }
1280 let params = SearchParams {
1281 k,
1282 filter,
1283 ef_search,
1284 with_payload,
1285 with_vector,
1286 };
1287 let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1288 let coll = collection.clone();
1289 self.search_blocking(coll, move |db| {
1290 let matches = db.hybrid_search_snapshot(
1291 &collection,
1292 dense.as_deref(),
1293 sv.as_ref(),
1294 text.as_deref(),
1295 ¶ms,
1296 rrf_k0,
1297 )?;
1298 Ok(matches
1299 .into_iter()
1300 .map(|m| MatchOut {
1301 id: m.id,
1302 score: m.score,
1303 payload: m.payload,
1304 vector: m.vector,
1305 })
1306 .collect())
1307 })
1308 .await
1309 }
1310
1311 #[allow(clippy::too_many_arguments)]
1316 pub(crate) async fn search_text(
1317 &self,
1318 principal: &Principal,
1319 collection: String,
1320 text: String,
1321 k: usize,
1322 filter: Option<Filter>,
1323 ef_search: usize,
1324 rrf_k0: f32,
1325 with_payload: bool,
1326 with_vector: bool,
1327 rerank: bool,
1328 ) -> Result<Vec<MatchOut>, Error> {
1329 self.authorize(principal, Action::Read, "search_text", &collection)?;
1330 self.limits.check_search(k, ef_search)?;
1331 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1332 Error::BadRequest(format!(
1333 "collection {collection:?} has no embedding provider configured \
1334 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1335 ))
1336 })?;
1337 let query = text.clone();
1339 let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1340 .await
1341 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1342 .map_err(|e| Error::Upstream(e.to_string()))?
1343 .into_iter()
1344 .next()
1345 .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1346 self.limits.check_vector_len(vector.len())?;
1347
1348 let reranker = if rerank {
1349 self.embed.reranker(&collection)
1350 } else {
1351 None
1352 };
1353 let need_payload = with_payload || reranker.is_some();
1357 let fetch_k = if reranker.is_some() {
1358 k.max(RERANK_CANDIDATES)
1359 } else {
1360 k
1361 };
1362
1363 let mut hits = self
1364 .hybrid_search(
1365 principal,
1366 collection,
1367 Some(vector),
1368 None,
1369 Some(text.clone()),
1370 fetch_k,
1371 filter,
1372 ef_search,
1373 rrf_k0,
1374 need_payload,
1375 with_vector,
1376 )
1377 .await?;
1378
1379 if let Some(rr) = reranker {
1380 let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1381 let query = text;
1382 let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1383 .await
1384 .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1385 .map_err(|e| Error::Upstream(e.to_string()))?;
1386 let mut scored: Vec<(f32, MatchOut)> = scores
1388 .into_iter()
1389 .zip(hits)
1390 .map(|(s, mut h)| {
1391 h.score = s;
1392 (s, h)
1393 })
1394 .collect();
1395 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1396 hits = scored.into_iter().map(|(_, h)| h).collect();
1397 }
1398
1399 hits.truncate(k);
1400 if !with_payload {
1402 for h in &mut hits {
1403 h.payload = None;
1404 }
1405 }
1406 Ok(hits)
1407 }
1408
1409 pub(crate) async fn upsert_text(
1414 &self,
1415 principal: &Principal,
1416 collection: String,
1417 points: Vec<TextPointIn>,
1418 ) -> Result<u64, Error> {
1419 self.ensure_writable("upsert_text")?;
1420 self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1421 self.limits.check_batch(points.len())?;
1422 for p in &points {
1423 if !matches!(p.payload, Value::Object(_) | Value::Null) {
1424 return Err(Error::BadRequest(
1425 "upsert_text payload must be a JSON object or null".to_owned(),
1426 ));
1427 }
1428 }
1429 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1430 Error::BadRequest(format!(
1431 "collection {collection:?} has no embedding provider configured \
1432 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1433 ))
1434 })?;
1435 let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1436 let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1437 .await
1438 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1439 .map_err(|e| Error::Upstream(e.to_string()))?;
1440 if vectors.len() != points.len() {
1441 return Err(Error::Upstream(format!(
1442 "embedding provider returned {} vectors for {} inputs",
1443 vectors.len(),
1444 points.len()
1445 )));
1446 }
1447 let dense: Vec<PointIn> = points
1448 .into_iter()
1449 .zip(vectors)
1450 .map(|(p, vector)| {
1451 let mut payload = match p.payload {
1452 Value::Object(map) => map,
1453 _ => serde_json::Map::new(),
1454 };
1455 payload
1457 .entry(TEXT_KEY.to_owned())
1458 .or_insert_with(|| Value::String(p.text.clone()));
1459 PointIn {
1460 id: p.id,
1461 vector,
1462 payload: Value::Object(payload),
1463 }
1464 })
1465 .collect();
1466 self.upsert(principal, collection, dense).await
1467 }
1468
1469 pub(crate) async fn fetch(
1470 &self,
1471 principal: &Principal,
1472 collection: String,
1473 filter: Option<Filter>,
1474 limit: usize,
1475 with_payload: bool,
1476 with_vector: bool,
1477 ) -> Result<Vec<MatchOut>, Error> {
1478 self.authorize(principal, Action::Read, "fetch", &collection)?;
1479 self.limits.check_fetch(limit)?;
1480 self.read_blocking(move |db| {
1481 let matches = db.fetch(
1482 &collection,
1483 filter.as_ref(),
1484 limit,
1485 with_payload,
1486 with_vector,
1487 )?;
1488 Ok(matches
1489 .into_iter()
1490 .map(|m| MatchOut {
1491 id: m.id,
1492 score: m.score,
1493 payload: m.payload,
1494 vector: m.vector,
1495 })
1496 .collect())
1497 })
1498 .await
1499 }
1500
1501 pub(crate) async fn upsert_documents(
1502 &self,
1503 principal: &Principal,
1504 collection: String,
1505 documents: Vec<DocumentIn>,
1506 ) -> Result<u64, Error> {
1507 self.ensure_writable("upsert_documents")?;
1508 self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1509 self.limits.check_batch(documents.len())?;
1510 for doc in &documents {
1511 self.limits.check_payload(&doc.payload)?;
1512 for token in &doc.vectors {
1513 self.limits.check_vector_len(token.len())?;
1514 }
1515 }
1516 let resource = collection.clone();
1517 let result = self
1518 .write_blocking(move |db| {
1519 let mut count = 0u64;
1520 for doc in &documents {
1521 db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1522 count += 1;
1523 }
1524 Ok(count)
1525 })
1526 .await;
1527 self.audit.record(
1528 principal.actor(),
1529 "upsert_documents",
1530 &resource,
1531 Outcome::of(&result),
1532 );
1533 result
1534 }
1535
1536 pub(crate) async fn delete_documents(
1537 &self,
1538 principal: &Principal,
1539 collection: String,
1540 ids: Vec<String>,
1541 ) -> Result<u64, Error> {
1542 self.ensure_writable("delete_documents")?;
1543 self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1544 let resource = collection.clone();
1545 let result = self
1546 .write_blocking(move |db| {
1547 let mut count = 0u64;
1548 for id in &ids {
1549 if db.delete_document(&collection, id)? {
1550 count += 1;
1551 }
1552 }
1553 Ok(count)
1554 })
1555 .await;
1556 self.audit.record(
1557 principal.actor(),
1558 "delete_documents",
1559 &resource,
1560 Outcome::of(&result),
1561 );
1562 result
1563 }
1564
1565 #[allow(clippy::too_many_arguments)]
1566 pub(crate) async fn search_multi_vector(
1567 &self,
1568 principal: &Principal,
1569 collection: String,
1570 query: Vec<Vec<f32>>,
1571 k: usize,
1572 filter: Option<Filter>,
1573 ef_search: usize,
1574 with_payload: bool,
1575 with_vector: bool,
1576 ) -> Result<Vec<DocumentMatchOut>, Error> {
1577 self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1578 self.limits.check_search(k, ef_search)?;
1579 for token in &query {
1580 self.limits.check_vector_len(token.len())?;
1581 }
1582 let params = SearchParams {
1583 k,
1584 filter,
1585 ef_search,
1586 with_payload,
1587 with_vector,
1588 };
1589 let coll = collection.clone();
1590 self.search_blocking(coll, move |db| {
1591 let matches = db.search_multi_vector_snapshot(&collection, &query, ¶ms)?;
1592 Ok(matches
1593 .into_iter()
1594 .map(|m| DocumentMatchOut {
1595 id: m.id,
1596 score: m.score,
1597 payload: m.payload,
1598 vectors: m.vectors,
1599 })
1600 .collect())
1601 })
1602 .await
1603 }
1604}
1605
/// Capacity passed to `broadcast::channel` in `serve` for the channel on
/// which the commit observer publishes WAL entries.
// NOTE(review): with tokio's broadcast channel a receiver that falls more
// than this many entries behind presumably observes a lag error — confirm
// how the replication consumer handles that.
const REPLICATION_BUFFER: usize = 1024;
1609
1610pub async fn run(config: Config) -> Result<(), Error> {
1612 config.validate()?;
1613 let rest_listener = TcpListener::bind(config.rest_addr)
1614 .await
1615 .map_err(Error::Io)?;
1616 let grpc_listener = TcpListener::bind(config.grpc_addr)
1617 .await
1618 .map_err(Error::Io)?;
1619 tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1620 tokio::select! {
1621 result = serve(config, rest_listener, grpc_listener) => result,
1622 () = shutdown_signal() => {
1623 tracing::info!("shutdown signal received");
1624 Ok(())
1625 }
1626 }
1627}
1628
1629pub async fn serve(
1632 config: Config,
1633 rest_listener: TcpListener,
1634 grpc_listener: TcpListener,
1635) -> Result<(), Error> {
1636 let mut db = open_database(&config)?;
1637 let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1638 let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1642 {
1643 let tx = replication_tx.clone();
1644 db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1645 let _ = tx.send(entry.clone());
1646 }));
1647 }
1648 let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1652 .map_err(|e| Error::Config(e.to_string()))?;
1653
1654 if config.mvcc_reads {
1657 db.set_mvcc_reads(true);
1658 }
1659 let mvcc = db.mvcc_reads();
1660
1661 let state = AppState {
1662 db: Arc::new(RwLock::new(db)),
1663 keys: Arc::new(config.api_keys.clone()),
1664 audit,
1665 replication_tx,
1666 read_only: config.leader_url.is_some(),
1667 limits: config.limits,
1668 embed: Arc::new(embed),
1669 rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
1670 metrics: Arc::new(metrics::Metrics::default()),
1671 rebuilding: Arc::new(Mutex::new(HashSet::new())),
1672 snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
1673 mvcc,
1674 };
1675
1676 if let Some(leader_url) = config.leader_url.clone() {
1678 replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
1679 }
1680
1681 let app = rest::router(state.clone());
1682 let grpc = grpc::service(state);
1683
1684 let tls = load_tls(&config)?;
1685
1686 let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
1688 Some(material) => {
1689 let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
1690 let std_listener = rest_listener.into_std().map_err(Error::Io)?;
1691 let server =
1692 axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
1693 Box::pin(async move {
1694 server
1695 .serve(app.into_make_service())
1696 .await
1697 .map_err(Error::Io)
1698 })
1699 }
1700 None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
1701 };
1702
1703 let mut grpc_builder = tonic::transport::Server::builder();
1705 if let Some(material) = &tls {
1706 let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
1707 let mut tls_config = ServerTlsConfig::new().identity(identity);
1708 if let Some(ca_pem) = &material.client_ca_pem {
1710 tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
1711 }
1712 grpc_builder = grpc_builder
1713 .tls_config(tls_config)
1714 .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
1715 }
1716 let grpc_fut = async move {
1717 grpc_builder
1718 .add_service(grpc)
1719 .serve_with_incoming(TcpListenerStream::new(grpc_listener))
1720 .await
1721 .map_err(|e| Error::Internal(format!("grpc server: {e}")))
1722 };
1723
1724 tokio::try_join!(rest_fut, grpc_fut)?;
1725 Ok(())
1726}
1727
1728async fn shutdown_signal() {
1729 let _ = tokio::signal::ctrl_c().await;
1730}
1731
1732fn open_database(config: &Config) -> Result<Database, Error> {
1738 let master_key = config.master_key_hex()?;
1739 let keyring =
1740 quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1741 .map_err(|e| Error::Config(e.to_string()))?;
1742 let db = match keyring {
1743 Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1744 None => Database::open(&config.data_dir)?,
1745 };
1746 Ok(db)
1747}
1748
/// TLS inputs loaded once by `load_tls` and shared by both servers in
/// `serve`: the raw PEM bytes feed the gRPC (tonic) TLS configuration, and
/// the prebuilt rustls configuration feeds the REST server.
struct TlsMaterial {
    /// Contents of the file at `config.tls_cert`.
    cert_pem: Vec<u8>,
    /// Contents of the file at `config.tls_key`.
    key_pem: Vec<u8>,
    /// Contents of the file at `config.tls_client_ca`, when one is configured.
    client_ca_pem: Option<Vec<u8>>,
    /// rustls server configuration built from the three PEM inputs above.
    rest_config: Arc<rustls::ServerConfig>,
}
1758
1759fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1764 match (&config.tls_cert, &config.tls_key) {
1765 (Some(cert_path), Some(key_path)) => {
1766 let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1767 let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1768 let client_ca_pem = config
1769 .tls_client_ca
1770 .as_ref()
1771 .map(std::fs::read)
1772 .transpose()
1773 .map_err(Error::Io)?;
1774 let rest_config = Arc::new(rustls_server_config(
1775 &cert_pem,
1776 &key_pem,
1777 client_ca_pem.as_deref(),
1778 )?);
1779 Ok(Some(TlsMaterial {
1780 cert_pem,
1781 key_pem,
1782 client_ca_pem,
1783 rest_config,
1784 }))
1785 }
1786 (None, None) => Ok(None),
1787 _ => Err(Error::Config(
1788 "tls_cert and tls_key must be set together".to_owned(),
1789 )),
1790 }
1791}
1792
1793fn rustls_server_config(
1797 cert_pem: &[u8],
1798 key_pem: &[u8],
1799 client_ca_pem: Option<&[u8]>,
1800) -> Result<rustls::ServerConfig, Error> {
1801 use rustls_pki_types::pem::PemObject;
1802 use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1803
1804 let certs = CertificateDer::pem_slice_iter(cert_pem)
1805 .collect::<std::result::Result<Vec<_>, _>>()
1806 .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1807 if certs.is_empty() {
1808 return Err(Error::Config(
1809 "tls_cert contains no certificates".to_owned(),
1810 ));
1811 }
1812 let key = PrivateKeyDer::from_pem_slice(key_pem)
1813 .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1814 let provider = Arc::new(rustls::crypto::ring::default_provider());
1815 let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1816 .with_safe_default_protocol_versions()
1817 .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1818 let builder = match client_ca_pem {
1819 Some(ca_pem) => {
1820 let mut roots = rustls::RootCertStore::empty();
1821 for cert in CertificateDer::pem_slice_iter(ca_pem) {
1822 let cert =
1823 cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1824 roots
1825 .add(cert)
1826 .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1827 }
1828 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1829 Arc::new(roots),
1830 provider,
1831 )
1832 .build()
1833 .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1834 builder.with_client_cert_verifier(verifier)
1835 }
1836 None => builder.with_no_client_auth(),
1837 };
1838 builder
1839 .with_single_cert(certs, key)
1840 .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1841}
1842
1843pub fn init_tracing() {
1846 init_observability(&Config::default());
1847}
1848
1849#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
1855pub fn init_observability(config: &Config) {
1856 use tracing_subscriber::EnvFilter;
1857 use tracing_subscriber::prelude::*;
1858 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
1859 let registry = tracing_subscriber::registry()
1860 .with(filter)
1861 .with(tracing_subscriber::fmt::layer());
1862
1863 #[cfg(feature = "otlp")]
1864 if config.otlp.is_enabled() {
1865 match otlp::build_provider(&config.otlp) {
1866 Ok(provider) => {
1867 use opentelemetry::trace::TracerProvider as _;
1868 let tracer = provider.tracer("quiver");
1869 otlp::store_provider(provider);
1870 let _ = registry
1871 .with(tracing_opentelemetry::layer().with_tracer(tracer))
1872 .try_init();
1873 return;
1874 }
1875 Err(e) => eprintln!("OTLP traces export disabled: {e}"),
1876 }
1877 }
1878
1879 let _ = registry.try_init();
1880}
1881
/// Calls `otlp::shutdown()` when built with the `otlp` feature; without that
/// feature this function does nothing.
// NOTE(review): presumably this flushes and stops the provider stored by
// `init_observability` — confirm against the `otlp` module.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    otlp::shutdown();
}
1889
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed 64-hex-character master key used by the fixtures below.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    /// Default configuration plus one API key and a valid encryption key.
    fn keyed_config() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        }
    }

    /// The unspecified IPv4 address (0.0.0.0) on port 6333.
    fn public_rest_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333)
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // No keys at all: rejected.
        let bare = Config::default();
        assert!(bare.validate().is_err());

        // Same configuration with the explicit opt-out: accepted.
        let opted_out = Config {
            insecure: true,
            ..bare
        };
        assert!(opted_out.validate().is_ok());

        // Opt-out withdrawn, but both keys supplied: accepted.
        let keyed = Config {
            insecure: false,
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..opted_out
        };
        assert!(keyed.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        // API key only: rejected.
        let no_key = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        assert!(no_key.validate().is_err());

        // Valid key: accepted.
        let valid_key = Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            ..no_key
        };
        assert!(valid_key.validate().is_ok());

        // Malformed key: rejected.
        let malformed_key = Config {
            encryption_key: Some("not-a-valid-hex-key".to_owned()),
            ..valid_key
        };
        assert!(malformed_key.validate().is_err());

        // Insecure mode without any key: accepted.
        let opted_out = Config {
            insecure: true,
            encryption_key: None,
            ..malformed_key
        };
        assert!(opted_out.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let scratch = tempfile::tempdir().unwrap();
        let key_path = scratch.path().join("master.key");
        // The file carries a trailing newline; the comparison below is
        // against the key without it.
        std::fs::write(&key_path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(key_path.clone()),
            ..Config::default()
        };
        assert!(config.validate().is_ok());
        assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));

        // File and inline key together: rejected.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // File alone, but with invalid contents: rejected.
        config.encryption_key = None;
        std::fs::write(&key_path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = keyed_config();
        config.rest_addr = public_rest_addr();
        assert!(config.validate().is_err());

        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let config = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: public_rest_addr(),
            ..keyed_config()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        // Certificate without a key: rejected.
        let cert_only = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..keyed_config()
        };
        assert!(cert_only.validate().is_err());

        // Certificate with its key: accepted.
        let paired = Config {
            tls_key: Some(PathBuf::from("key.pem")),
            ..cert_only
        };
        assert!(paired.validate().is_ok());
    }
}