1mod audit;
33mod auth;
34mod error;
35mod grpc;
36mod metrics;
37mod otlp;
38mod rate_limit;
39mod replication;
40mod rest;
41
42use std::collections::{HashMap, HashSet};
43use std::future::Future;
44use std::net::{IpAddr, Ipv4Addr, SocketAddr};
45use std::path::PathBuf;
46use std::pin::Pin;
47use std::sync::{Arc, Mutex, RwLock};
48
49use axum_server::tls_rustls::RustlsConfig;
50use figment::Figment;
51use figment::providers::{Env, Format, Serialized, Toml};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::net::TcpListener;
55use tokio::sync::broadcast;
56use tokio_stream::wrappers::TcpListenerStream;
57use tonic::transport::{Certificate, Identity, ServerTlsConfig};
58
59use quiver_crypto::AeadCodec;
60use quiver_embed::{
61 Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
62 SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
63};
64use quiver_query::Filter;
65
66pub use auth::{Action, ApiKey, CollectionScope};
67pub use error::Error;
68pub use otlp::OtlpConfig;
69pub use quiver_providers::{
72 EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
73 RerankProvider,
74};
75pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
76
77use audit::{AuditLog, Outcome};
78use auth::Principal;
79
/// Server-side ceilings on request sizes and search parameters.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized
/// input falls back to the value produced by [`Limits::default`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
    /// Largest `k` a search may request (`QUIVER_MAX_K`).
    pub max_k: usize,
    /// Largest `ef_search` a search may request (`QUIVER_MAX_EF_SEARCH`).
    pub max_ef_search: usize,
    /// Largest `limit` a fetch may request (`QUIVER_MAX_FETCH_LIMIT`).
    pub max_fetch_limit: usize,
    /// Largest collection dimension and largest vector length accepted
    /// (`QUIVER_MAX_VECTOR_DIM`).
    pub max_vector_dim: usize,
    /// Largest JSON-serialized payload size, in bytes
    /// (`QUIVER_MAX_PAYLOAD_BYTES`).
    pub max_payload_bytes: usize,
    /// Largest number of items in a regular upsert batch
    /// (`QUIVER_MAX_BATCH_SIZE`).
    pub max_batch_size: usize,
    /// Request-body ceiling in bytes (`QUIVER_MAX_REQUEST_BODY_BYTES`).
    // NOTE(review): not enforced in this part of the file — presumably applied
    // by the REST/gRPC layers; confirm.
    pub max_request_body_bytes: usize,
    /// Largest number of terms in a sparse query (`QUIVER_MAX_SPARSE_TERMS`).
    pub max_sparse_terms: usize,
    /// Largest number of points in a bulk upsert
    /// (`QUIVER_MAX_BULK_BATCH_SIZE`).
    pub max_bulk_batch_size: usize,
}
115
116impl Default for Limits {
117 fn default() -> Self {
118 Self {
119 max_k: 10_000,
120 max_ef_search: 4_096,
121 max_fetch_limit: 10_000,
122 max_vector_dim: 8_192,
123 max_payload_bytes: 65_536,
124 max_batch_size: 1_000,
125 max_request_body_bytes: 32 * 1024 * 1024,
126 max_sparse_terms: 4_096,
127 max_bulk_batch_size: 50_000,
128 }
129 }
130}
131
132impl Limits {
133 fn apply_env_overrides(&mut self) -> Result<(), Error> {
137 let slots: [(&str, &mut usize); 9] = [
138 ("QUIVER_MAX_K", &mut self.max_k),
139 ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
140 ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
141 ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
142 ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
143 ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
144 (
145 "QUIVER_MAX_REQUEST_BODY_BYTES",
146 &mut self.max_request_body_bytes,
147 ),
148 ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
149 ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
150 ];
151 for (key, slot) in slots {
152 if let Ok(raw) = std::env::var(key) {
153 *slot = raw.parse().map_err(|_| {
154 Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
155 })?;
156 }
157 }
158 Ok(())
159 }
160
161 fn validate(&self) -> Result<(), Error> {
163 let named = [
164 ("max_k", self.max_k),
165 ("max_ef_search", self.max_ef_search),
166 ("max_fetch_limit", self.max_fetch_limit),
167 ("max_vector_dim", self.max_vector_dim),
168 ("max_payload_bytes", self.max_payload_bytes),
169 ("max_batch_size", self.max_batch_size),
170 ("max_request_body_bytes", self.max_request_body_bytes),
171 ("max_sparse_terms", self.max_sparse_terms),
172 ("max_bulk_batch_size", self.max_bulk_batch_size),
173 ];
174 if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
175 return Err(Error::Config(format!(
176 "limits.{name} must be greater than zero"
177 )));
178 }
179 Ok(())
180 }
181
182 fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
183 if k > self.max_k {
184 return Err(Error::BadRequest(format!(
185 "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
186 self.max_k
187 )));
188 }
189 if ef_search > self.max_ef_search {
190 return Err(Error::BadRequest(format!(
191 "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
192 self.max_ef_search
193 )));
194 }
195 Ok(())
196 }
197
198 fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
199 if n > self.max_sparse_terms {
200 return Err(Error::BadRequest(format!(
201 "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
202 self.max_sparse_terms
203 )));
204 }
205 Ok(())
206 }
207
208 fn check_fetch(&self, limit: usize) -> Result<(), Error> {
209 if limit > self.max_fetch_limit {
210 return Err(Error::BadRequest(format!(
211 "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
212 self.max_fetch_limit
213 )));
214 }
215 Ok(())
216 }
217
218 fn check_dim(&self, dim: usize) -> Result<(), Error> {
219 if dim > self.max_vector_dim {
220 return Err(Error::BadRequest(format!(
221 "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
222 self.max_vector_dim
223 )));
224 }
225 Ok(())
226 }
227
228 fn check_vector_len(&self, len: usize) -> Result<(), Error> {
229 if len > self.max_vector_dim {
230 return Err(Error::BadRequest(format!(
231 "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
232 self.max_vector_dim
233 )));
234 }
235 Ok(())
236 }
237
238 fn check_batch(&self, n: usize) -> Result<(), Error> {
239 if n > self.max_batch_size {
240 return Err(Error::BadRequest(format!(
241 "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
242 self.max_batch_size
243 )));
244 }
245 Ok(())
246 }
247
248 fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
249 if n > self.max_bulk_batch_size {
250 return Err(Error::BadRequest(format!(
251 "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
252 self.max_bulk_batch_size
253 )));
254 }
255 Ok(())
256 }
257
258 fn check_payload(&self, payload: &Value) -> Result<(), Error> {
259 let size = serde_json::to_vec(payload)
260 .map(|v| v.len())
261 .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
262 if size > self.max_payload_bytes {
263 return Err(Error::BadRequest(format!(
264 "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
265 self.max_payload_bytes
266 )));
267 }
268 Ok(())
269 }
270}
271
/// Top-level server configuration.
///
/// Loaded by [`Config::load`] from built-in defaults, `quiver.toml` and
/// `QUIVER_`-prefixed environment variables; checked by [`Config::validate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Data directory (defaults to `./quiver-data`).
    pub data_dir: PathBuf,
    /// REST listen address (defaults to loopback port 6333).
    pub rest_addr: SocketAddr,
    /// gRPC listen address (defaults to loopback port 6334).
    pub grpc_addr: SocketAddr,
    /// Accepted API keys, deserialized through `auth::de_api_keys`.
    /// Validation requires at least one unless `insecure` is set.
    #[serde(default, deserialize_with = "auth::de_api_keys")]
    pub api_keys: Vec<ApiKey>,
    /// Hex-encoded master key given inline. Mutually exclusive with
    /// `master_key_file`; a blank value counts as unset.
    pub encryption_key: Option<String>,
    /// Path of a file holding the hex-encoded master key. Mutually exclusive
    /// with `encryption_key`.
    pub master_key_file: Option<PathBuf>,
    /// TLS certificate path; must be set together with `tls_key`.
    pub tls_cert: Option<PathBuf>,
    /// TLS private-key path; must be set together with `tls_cert`.
    pub tls_key: Option<PathBuf>,
    /// Client CA for mutual TLS; requires `tls_cert` and `tls_key`.
    pub tls_client_ca: Option<PathBuf>,
    /// Audit log location.
    // NOTE(review): consumer is outside this part of the file — confirm semantics.
    pub audit_log: Option<PathBuf>,
    /// Leader URL.
    // NOTE(review): presumably makes this node a replication follower — confirm.
    pub leader_url: Option<String>,
    /// API key presented to the leader.
    // NOTE(review): presumably used only together with `leader_url` — confirm.
    pub leader_api_key: Option<String>,
    /// Development escape hatch: lets validation pass without API keys,
    /// without an encryption key, and without TLS on non-loopback binds.
    pub insecure: bool,
    /// Request ceilings; see [`Limits`].
    pub limits: Limits,
    /// Embedding provider configuration, keyed by name (the
    /// `[embedding.<collection>]` tables of `quiver.toml`).
    #[serde(default)]
    pub embedding: HashMap<String, EmbeddingConfig>,
    /// Rerank provider configuration, keyed by name.
    // NOTE(review): presumably keyed by collection like `embedding` — confirm.
    #[serde(default)]
    pub rerank: HashMap<String, RerankConfig>,
    /// Rate limiter configuration.
    #[serde(default)]
    pub rate_limit: RateLimitConfig,
    /// OTLP exporter configuration.
    #[serde(default)]
    pub otlp: OtlpConfig,
}
362
363impl Default for Config {
364 fn default() -> Self {
365 Self {
366 data_dir: PathBuf::from("./quiver-data"),
367 rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
368 grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
369 api_keys: Vec::new(),
370 encryption_key: None,
371 master_key_file: None,
372 tls_cert: None,
373 tls_key: None,
374 tls_client_ca: None,
375 audit_log: None,
376 leader_url: None,
377 leader_api_key: None,
378 insecure: false,
379 limits: Limits::default(),
380 embedding: HashMap::new(),
381 rerank: HashMap::new(),
382 rate_limit: RateLimitConfig::default(),
383 otlp: OtlpConfig::default(),
384 }
385 }
386}
387
388impl Config {
389 pub fn load() -> Result<Self, Error> {
392 let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
393 .merge(Toml::file("quiver.toml"))
394 .merge(Env::prefixed("QUIVER_"))
395 .extract()
396 .map_err(|e| Error::Config(e.to_string()))?;
397 config.limits.apply_env_overrides()?;
400 config
402 .rate_limit
403 .apply_env_overrides()
404 .map_err(Error::Config)?;
405 config.otlp.apply_env_overrides().map_err(Error::Config)?;
407 Ok(config)
408 }
409
410 pub fn validate(&self) -> Result<(), Error> {
414 if self.api_keys.is_empty() && !self.insecure {
415 return Err(Error::Config(
416 "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
417 set insecure=true for local development"
418 .to_owned(),
419 ));
420 }
421 let master_key = self.master_key_hex()?;
423 if master_key.is_none() && !self.insecure {
424 return Err(Error::Config(
425 "no encryption key configured: encryption-at-rest is on by default — \
426 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
427 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
428 store data unencrypted (development only)"
429 .to_owned(),
430 ));
431 }
432 if let Some(key) = &master_key {
434 AeadCodec::from_hex(key)
435 .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
436 }
437 if self.tls_cert.is_some() != self.tls_key.is_some() {
439 return Err(Error::Config(
440 "tls_cert and tls_key must be set together".to_owned(),
441 ));
442 }
443 if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
445 return Err(Error::Config(
446 "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
447 ));
448 }
449 let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
450 let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
451 if non_loopback && !tls_enabled && !self.insecure {
452 return Err(Error::Config(
453 "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
454 or insecure=true for local development"
455 .to_owned(),
456 ));
457 }
458 self.limits.validate()?;
460 Ok(())
461 }
462
463 pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
473 let env_key = self
474 .encryption_key
475 .as_deref()
476 .map(str::trim)
477 .filter(|k| !k.is_empty());
478 match (&self.master_key_file, env_key) {
479 (Some(_), Some(_)) => Err(Error::Config(
480 "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
481 (QUIVER_MASTER_KEY_FILE), not both"
482 .to_owned(),
483 )),
484 (Some(path), None) => {
485 warn_if_world_readable(path);
486 let hex = std::fs::read_to_string(path).map_err(|e| {
487 Error::Config(format!("reading master_key_file {}: {e}", path.display()))
488 })?;
489 Ok(Some(hex.trim().to_owned()))
490 }
491 (None, Some(key)) => Ok(Some(key.to_owned())),
492 (None, None) => Ok(None),
493 }
494 }
495}
496
497#[cfg(unix)]
500fn warn_if_world_readable(path: &std::path::Path) {
501 use std::os::unix::fs::PermissionsExt;
502 if let Ok(meta) = std::fs::metadata(path)
503 && meta.permissions().mode() & 0o077 != 0
504 {
505 tracing::warn!(
506 path = %path.display(),
507 mode = format!("{:o}", meta.permissions().mode() & 0o777),
508 "master key file is group/world-accessible; restrict it to 0600"
509 );
510 }
511}
512
/// Non-Unix counterpart of the permission check: does nothing.
#[cfg(not(unix))]
fn warn_if_world_readable(_path: &std::path::Path) {}
515
/// Shared application state behind the service methods in this file.
///
/// Derives `Clone`; the database, keys, audit log, registries and in-flight
/// set are held behind `Arc`, so clones share them.
#[derive(Clone)]
pub(crate) struct AppState {
    /// The database, guarded by a read/write lock and accessed from blocking
    /// tasks.
    db: Arc<RwLock<Database>>,
    /// API keys used to authenticate callers.
    keys: Arc<Vec<ApiKey>>,
    /// Audit log receiving operation outcomes and authorization denials.
    audit: Arc<AuditLog>,
    /// Broadcast sender that replication streams subscribe to.
    replication_tx: broadcast::Sender<WalEntry>,
    /// When set, every mutating operation is refused (read-only follower).
    read_only: bool,
    /// Request ceilings enforced before work is dispatched.
    limits: Limits,
    /// Registry supplying per-collection embedders and rerankers.
    embed: Arc<EmbedRegistry>,
    /// Per-actor rate limiter.
    rate_limiter: Arc<RateLimiter>,
    /// Metrics handle.
    // NOTE(review): not used in this part of the file — confirm consumers.
    metrics: Arc<metrics::Metrics>,
    /// Names of collections with a background index rebuild in flight; keeps
    /// at most one rebuild task per collection.
    rebuilding: Arc<Mutex<HashSet<String>>>,
}
549
/// Summary of a collection, as returned by the create/get/list operations.
pub(crate) struct CollectionInfo {
    /// Collection name.
    pub name: String,
    /// Vector dimension taken from the collection's descriptor.
    pub dim: u32,
    /// Distance metric taken from the descriptor.
    pub metric: DistanceMetric,
    /// Number of documents for multivector collections, otherwise the value
    /// reported by the database's `len`.
    pub count: u64,
    /// Index specification taken from the descriptor.
    pub index: IndexSpec,
    /// Filterable fields taken from the descriptor.
    pub filterable: Vec<FilterableField>,
    /// Whether the collection is a multivector collection.
    pub multivector: bool,
    /// Vector encryption setting taken from the descriptor.
    pub vector_encryption: VectorEncryption,
}
561
/// A point submitted for upsert: id, dense vector and JSON payload.
pub(crate) struct PointIn {
    /// Point id.
    pub id: String,
    /// Dense vector; its length is checked against `max_vector_dim`.
    pub vector: Vec<f32>,
    /// JSON payload; its serialized size is checked against
    /// `max_payload_bytes`.
    pub payload: Value,
}
568
/// A text point submitted to `upsert_text`; the server embeds `text` to
/// obtain the vector.
pub(crate) struct TextPointIn {
    /// Point id.
    pub id: String,
    /// Text to embed; also stored in the payload under `TEXT_KEY` unless the
    /// payload already has that key.
    pub text: String,
    /// JSON payload; must be an object or null.
    pub payload: Value,
}
575
/// Minimum number of candidates `search_text` retrieves when a reranker is
/// active, so the reranker sees at least this many hits even for a small `k`.
const RERANK_CANDIDATES: usize = 50;
579
580fn doc_text(payload: Option<&Value>) -> String {
584 match payload {
585 Some(Value::Object(map)) => map
586 .get(TEXT_KEY)
587 .and_then(Value::as_str)
588 .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
589 Some(v) => v.to_string(),
590 None => String::new(),
591 }
592}
593
/// A stored point as returned by `get_points`.
pub(crate) struct PointOut {
    /// Point id.
    pub id: String,
    /// The vector, present only when the caller asked for it.
    pub vector: Option<Vec<f32>>,
    /// The payload; `Value::Null` when the point has none.
    pub payload: Value,
}
600
/// One hit returned by the search and fetch operations.
pub(crate) struct MatchOut {
    /// Id of the matched point.
    pub id: String,
    /// Score reported by the engine, or the rerank score when `search_text`
    /// reranked the hit.
    pub score: f32,
    /// Payload, when present in the engine's result.
    pub payload: Option<Value>,
    /// Vector, when present in the engine's result.
    pub vector: Option<Vec<f32>>,
}
608
/// A multivector document submitted to `upsert_documents`.
pub(crate) struct DocumentIn {
    /// Document id.
    pub id: String,
    /// The document's vectors; each one's length is checked against
    /// `max_vector_dim`.
    pub vectors: Vec<Vec<f32>>,
    /// JSON payload; its serialized size is checked against
    /// `max_payload_bytes`.
    pub payload: Value,
}
615
/// One document-level hit.
// NOTE(review): the producer is outside this part of the file — presumably a
// multivector document search; confirm.
pub(crate) struct DocumentMatchOut {
    /// Id of the matched document.
    pub id: String,
    /// Score of the match.
    pub score: f32,
    /// Payload, when included.
    pub payload: Option<Value>,
    /// The document's vectors, when included.
    pub vectors: Option<Vec<Vec<f32>>>,
}
623
624impl AppState {
    /// Resolves the presented API key (if any) against the configured keys by
    /// delegating to `auth::authenticate`. `None` means no principal was
    /// established.
    pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
        auth::authenticate(&self.keys, presented)
    }
631
    /// Asks the rate limiter for its decision on a request from `actor`.
    pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
        self.rate_limiter.check(actor)
    }
638
    /// Reports whether the rate limiter considers itself enabled.
    pub(crate) fn rate_limit_enabled(&self) -> bool {
        self.rate_limiter.enabled()
    }
644
645 async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
649 where
650 T: Send + 'static,
651 F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
652 {
653 let db = Arc::clone(&self.db);
654 tokio::task::spawn_blocking(move || -> Result<T, Error> {
655 let mut guard = db
656 .write()
657 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
658 f(&mut guard).map_err(Error::Engine)
659 })
660 .await
661 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
662 }
663
664 async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
668 where
669 T: Send + 'static,
670 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
671 {
672 let db = Arc::clone(&self.db);
673 tokio::task::spawn_blocking(move || -> Result<T, Error> {
674 let guard = db
675 .read()
676 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
677 f(&guard).map_err(Error::Engine)
678 })
679 .await
680 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
681 }
682
683 async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
690 where
691 T: Send + 'static,
692 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
693 {
694 let db = Arc::clone(&self.db);
695 let coll = collection.clone();
696 let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
697 let guard = db
698 .read()
699 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
700 let result = f(&guard).map_err(Error::Engine)?;
701 let stale = guard.needs_rebuild(&coll).unwrap_or(false);
704 Ok((result, stale))
705 })
706 .await
707 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
708 if stale {
709 self.schedule_rebuild(collection);
710 }
711 Ok(result)
712 }
713
714 fn schedule_rebuild(&self, collection: String) {
718 {
719 let mut inflight = match self.rebuilding.lock() {
720 Ok(g) => g,
721 Err(_) => return,
722 };
723 if !inflight.insert(collection.clone()) {
724 return; }
726 }
727 let state = self.clone();
728 tokio::spawn(async move {
729 state.run_rebuild(&collection).await;
730 if let Ok(mut inflight) = state.rebuilding.lock() {
731 inflight.remove(&collection);
732 }
733 });
734 }
735
    /// Background rebuild loop for one collection.
    ///
    /// Each iteration runs three blocking steps: take rebuild inputs under the
    /// read lock, build from them with no lock held, then commit under the
    /// write lock. Any failure (poisoned lock, engine error, failed blocking
    /// task, or no inputs available) ends the loop silently.
    async fn run_rebuild(&self, collection: &str) {
        loop {
            // Step 1: snapshot the inputs under the read lock.
            let db = Arc::clone(&self.db);
            let coll = collection.to_owned();
            let inputs = tokio::task::spawn_blocking(move || {
                let guard = db.read().ok()?;
                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
            })
            .await
            .ok()
            .flatten();
            // `None` covers both "nothing to rebuild" and every error case.
            let Some(inputs) = inputs else { return };

            // Step 2: build without holding any database lock.
            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
                return;
            };

            // Step 3: commit under the write lock. The flag returned by
            // `commit_rebuild` is treated as "still stale".
            // NOTE(review): presumably true when writes landed during the
            // build — confirm against the engine.
            let db = Arc::clone(&self.db);
            let still_stale = tokio::task::spawn_blocking(move || {
                let mut guard = db.write().ok()?;
                guard.commit_rebuild(rebuilt).ok()
            })
            .await
            .ok()
            .flatten();
            match still_stale {
                // Still stale after the commit: go around again.
                Some(true) => continue,
                _ => return,
            }
        }
    }
773
774 fn authorize(
777 &self,
778 principal: &Principal,
779 action: Action,
780 op: &str,
781 resource: &str,
782 ) -> Result<(), Error> {
783 principal
784 .require(action, Some(resource))
785 .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
786 }
787
788 fn authorize_global(
791 &self,
792 principal: &Principal,
793 action: Action,
794 op: &str,
795 ) -> Result<(), Error> {
796 principal
797 .require(action, None)
798 .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
799 }
800
    /// Opens a replication stream for an admin principal: returns the current
    /// replication snapshot together with a receiver for subsequent WAL
    /// entries.
    ///
    /// # Errors
    /// Fails if `principal` lacks the global `Admin` permission (the denial is
    /// audited) or if taking the snapshot fails.
    pub(crate) async fn open_replication(
        &self,
        principal: &Principal,
    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
        self.authorize_global(principal, Action::Admin, "replicate")?;
        let tx = self.replication_tx.clone();
        self.read_blocking(move |db| {
            // Subscribe first, then snapshot, both under the read lock.
            // NOTE(review): presumably so that no entry can fall between the
            // snapshot and the stream — confirm that entries are broadcast
            // while the write lock is held.
            let rx = tx.subscribe();
            let snapshot = db.replication_snapshot()?;
            Ok((snapshot, rx))
        })
        .await
    }
820
    /// Applies one replicated WAL operation to the local database under the
    /// write lock.
    // NOTE(review): deliberately skips `ensure_writable` and authorization —
    // presumably only the follower's own replication client calls this;
    // confirm.
    pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
        self.write_blocking(move |db| db.apply_replicated(op)).await
    }
827
828 fn ensure_writable(&self, op: &str) -> Result<(), Error> {
831 if self.read_only {
832 return Err(Error::Forbidden(format!(
833 "{op}: this node is a read-only replication follower"
834 )));
835 }
836 Ok(())
837 }
838
839 #[allow(clippy::too_many_arguments)]
840 pub(crate) async fn create_collection(
841 &self,
842 principal: &Principal,
843 name: String,
844 dim: u32,
845 metric: DistanceMetric,
846 index: IndexSpec,
847 filterable: Vec<FilterableField>,
848 multivector: bool,
849 vector_encryption: VectorEncryption,
850 ) -> Result<CollectionInfo, Error> {
851 self.ensure_writable("create_collection")?;
852 self.authorize(principal, Action::Admin, "create_collection", &name)?;
853 self.limits.check_dim(dim as usize)?;
854 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
855 .with_index(index)
856 .with_filterable(filterable.clone())
857 .with_multivector(multivector)
858 .with_vector_encryption(vector_encryption);
859 let owned = name.clone();
860 let result = self
861 .write_blocking(move |db| db.create_collection(&owned, descriptor))
862 .await;
863 self.audit.record(
864 principal.actor(),
865 "create_collection",
866 &name,
867 Outcome::of(&result),
868 );
869 result?;
870 Ok(CollectionInfo {
871 name,
872 dim,
873 metric,
874 count: 0,
875 index,
876 filterable,
877 multivector,
878 vector_encryption,
879 })
880 }
881
882 pub(crate) async fn get_collection(
883 &self,
884 principal: &Principal,
885 name: String,
886 ) -> Result<CollectionInfo, Error> {
887 self.authorize(principal, Action::Read, "get_collection", &name)?;
888 self.read_blocking(move |db| {
889 let descriptor = db
890 .descriptor(&name)
891 .cloned()
892 .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
893 let count = if descriptor.multivector {
896 db.document_count(&name)? as u64
897 } else {
898 db.len(&name)? as u64
899 };
900 Ok(CollectionInfo {
901 name,
902 dim: descriptor.dim,
903 metric: descriptor.metric,
904 count,
905 index: descriptor.index,
906 filterable: descriptor.filterable,
907 multivector: descriptor.multivector,
908 vector_encryption: descriptor.vector_encryption,
909 })
910 })
911 .await
912 }
913
914 pub(crate) async fn list_collections(
915 &self,
916 principal: &Principal,
917 ) -> Result<Vec<CollectionInfo>, Error> {
918 self.authorize_global(principal, Action::Read, "list_collections")?;
919 let mut infos = self
920 .read_blocking(|db| {
921 let mut out = Vec::new();
922 for name in db.collection_names() {
923 if let Some(descriptor) = db.descriptor(&name).cloned() {
924 let count = if descriptor.multivector {
925 db.document_count(&name)? as u64
926 } else {
927 db.len(&name)? as u64
928 };
929 out.push(CollectionInfo {
930 name,
931 dim: descriptor.dim,
932 metric: descriptor.metric,
933 count,
934 index: descriptor.index,
935 filterable: descriptor.filterable,
936 multivector: descriptor.multivector,
937 vector_encryption: descriptor.vector_encryption,
938 });
939 }
940 }
941 Ok(out)
942 })
943 .await?;
944 infos.retain(|info| principal.can_see(&info.name));
946 Ok(infos)
947 }
948
949 pub(crate) async fn delete_collection(
950 &self,
951 principal: &Principal,
952 name: String,
953 ) -> Result<bool, Error> {
954 self.ensure_writable("delete_collection")?;
955 self.authorize(principal, Action::Admin, "delete_collection", &name)?;
956 let resource = name.clone();
957 let result = self
958 .write_blocking(move |db| db.drop_collection(&name))
959 .await;
960 self.audit.record(
961 principal.actor(),
962 "delete_collection",
963 &resource,
964 Outcome::of(&result),
965 );
966 result
967 }
968
969 #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
970 pub(crate) async fn upsert(
971 &self,
972 principal: &Principal,
973 collection: String,
974 points: Vec<PointIn>,
975 ) -> Result<u64, Error> {
976 self.ensure_writable("upsert")?;
977 self.authorize(principal, Action::Write, "upsert", &collection)?;
978 self.limits.check_batch(points.len())?;
979 for p in &points {
980 self.limits.check_vector_len(p.vector.len())?;
981 self.limits.check_payload(&p.payload)?;
982 }
983 let resource = collection.clone();
984 let result = self
985 .write_blocking(move |db| {
986 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
987 .iter()
988 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
989 .collect();
990 db.upsert_batch(&collection, &records)
991 })
992 .await;
993 self.audit
994 .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
995 result
996 }
997
998 pub(crate) async fn upsert_bulk(
1001 &self,
1002 principal: &Principal,
1003 collection: String,
1004 points: Vec<PointIn>,
1005 ) -> Result<u64, Error> {
1006 self.ensure_writable("upsert")?;
1007 self.authorize(principal, Action::Write, "upsert", &collection)?;
1008 self.limits.check_bulk_batch(points.len())?;
1009 for p in &points {
1010 self.limits.check_vector_len(p.vector.len())?;
1011 self.limits.check_payload(&p.payload)?;
1012 }
1013 let resource = collection.clone();
1014 let result = self
1015 .write_blocking(move |db| {
1016 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1017 .iter()
1018 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1019 .collect();
1020 db.upsert_bulk(&collection, &records)
1021 })
1022 .await;
1023 self.audit.record(
1024 principal.actor(),
1025 "upsert_bulk",
1026 &resource,
1027 Outcome::of(&result),
1028 );
1029 result
1030 }
1031
1032 #[tracing::instrument(skip_all)]
1036 pub(crate) async fn snapshot(
1037 &self,
1038 principal: &Principal,
1039 destination: String,
1040 ) -> Result<SnapshotInfo, Error> {
1041 self.ensure_writable("snapshot")?;
1042 self.authorize_global(principal, Action::Admin, "snapshot")?;
1043 let dest = std::path::PathBuf::from(&destination);
1044 let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1045 self.audit.record(
1046 principal.actor(),
1047 "snapshot",
1048 &destination,
1049 Outcome::of(&result),
1050 );
1051 result
1052 }
1053
1054 pub(crate) async fn delete_points(
1055 &self,
1056 principal: &Principal,
1057 collection: String,
1058 ids: Vec<String>,
1059 ) -> Result<u64, Error> {
1060 self.ensure_writable("delete_points")?;
1061 self.authorize(principal, Action::Write, "delete_points", &collection)?;
1062 let resource = collection.clone();
1063 let result = self
1064 .write_blocking(move |db| {
1065 let mut count = 0u64;
1066 for id in &ids {
1067 if db.delete(&collection, id)? {
1068 count += 1;
1069 }
1070 }
1071 Ok(count)
1072 })
1073 .await;
1074 self.audit.record(
1075 principal.actor(),
1076 "delete_points",
1077 &resource,
1078 Outcome::of(&result),
1079 );
1080 result
1081 }
1082
1083 pub(crate) async fn get_points(
1084 &self,
1085 principal: &Principal,
1086 collection: String,
1087 ids: Vec<String>,
1088 with_vector: bool,
1089 ) -> Result<Vec<PointOut>, Error> {
1090 self.authorize(principal, Action::Read, "get_points", &collection)?;
1091 self.read_blocking(move |db| {
1092 let mut out = Vec::new();
1093 for id in &ids {
1094 if let Some(m) = db.get(&collection, id)? {
1095 out.push(PointOut {
1096 id: m.id,
1097 vector: if with_vector { m.vector } else { None },
1098 payload: m.payload.unwrap_or(Value::Null),
1099 });
1100 }
1101 }
1102 Ok(out)
1103 })
1104 .await
1105 }
1106
1107 #[allow(clippy::too_many_arguments)]
1108 #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1109 pub(crate) async fn search(
1110 &self,
1111 principal: &Principal,
1112 collection: String,
1113 vector: Vec<f32>,
1114 k: usize,
1115 filter: Option<Filter>,
1116 ef_search: usize,
1117 with_payload: bool,
1118 with_vector: bool,
1119 ) -> Result<Vec<MatchOut>, Error> {
1120 self.authorize(principal, Action::Read, "search", &collection)?;
1121 self.limits.check_search(k, ef_search)?;
1122 self.limits.check_vector_len(vector.len())?;
1123 let params = SearchParams {
1124 k,
1125 filter,
1126 ef_search,
1127 with_payload,
1128 with_vector,
1129 };
1130 let coll = collection.clone();
1131 self.search_blocking(coll, move |db| {
1132 let matches = db.search_snapshot(&collection, &vector, ¶ms)?;
1133 Ok(matches
1134 .into_iter()
1135 .map(|m| MatchOut {
1136 id: m.id,
1137 score: m.score,
1138 payload: m.payload,
1139 vector: m.vector,
1140 })
1141 .collect())
1142 })
1143 .await
1144 }
1145
1146 #[allow(clippy::too_many_arguments)]
1147 pub(crate) async fn hybrid_search(
1148 &self,
1149 principal: &Principal,
1150 collection: String,
1151 dense: Option<Vec<f32>>,
1152 sparse: Option<(Vec<u32>, Vec<f32>)>,
1153 text: Option<String>,
1154 k: usize,
1155 filter: Option<Filter>,
1156 ef_search: usize,
1157 rrf_k0: f32,
1158 with_payload: bool,
1159 with_vector: bool,
1160 ) -> Result<Vec<MatchOut>, Error> {
1161 self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1162 self.limits.check_search(k, ef_search)?;
1163 if let Some(v) = &dense {
1164 self.limits.check_vector_len(v.len())?;
1165 }
1166 if let Some((indices, values)) = &sparse {
1167 self.limits.check_sparse_terms(indices.len())?;
1168 if indices.len() != values.len() {
1169 return Err(Error::BadRequest(format!(
1170 "sparse query indices ({}) and values ({}) length mismatch",
1171 indices.len(),
1172 values.len()
1173 )));
1174 }
1175 }
1176 let params = SearchParams {
1177 k,
1178 filter,
1179 ef_search,
1180 with_payload,
1181 with_vector,
1182 };
1183 let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1184 let coll = collection.clone();
1185 self.search_blocking(coll, move |db| {
1186 let matches = db.hybrid_search_snapshot(
1187 &collection,
1188 dense.as_deref(),
1189 sv.as_ref(),
1190 text.as_deref(),
1191 ¶ms,
1192 rrf_k0,
1193 )?;
1194 Ok(matches
1195 .into_iter()
1196 .map(|m| MatchOut {
1197 id: m.id,
1198 score: m.score,
1199 payload: m.payload,
1200 vector: m.vector,
1201 })
1202 .collect())
1203 })
1204 .await
1205 }
1206
1207 #[allow(clippy::too_many_arguments)]
1212 pub(crate) async fn search_text(
1213 &self,
1214 principal: &Principal,
1215 collection: String,
1216 text: String,
1217 k: usize,
1218 filter: Option<Filter>,
1219 ef_search: usize,
1220 rrf_k0: f32,
1221 with_payload: bool,
1222 with_vector: bool,
1223 rerank: bool,
1224 ) -> Result<Vec<MatchOut>, Error> {
1225 self.authorize(principal, Action::Read, "search_text", &collection)?;
1226 self.limits.check_search(k, ef_search)?;
1227 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1228 Error::BadRequest(format!(
1229 "collection {collection:?} has no embedding provider configured \
1230 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1231 ))
1232 })?;
1233 let query = text.clone();
1235 let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1236 .await
1237 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1238 .map_err(|e| Error::Upstream(e.to_string()))?
1239 .into_iter()
1240 .next()
1241 .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1242 self.limits.check_vector_len(vector.len())?;
1243
1244 let reranker = if rerank {
1245 self.embed.reranker(&collection)
1246 } else {
1247 None
1248 };
1249 let need_payload = with_payload || reranker.is_some();
1253 let fetch_k = if reranker.is_some() {
1254 k.max(RERANK_CANDIDATES)
1255 } else {
1256 k
1257 };
1258
1259 let mut hits = self
1260 .hybrid_search(
1261 principal,
1262 collection,
1263 Some(vector),
1264 None,
1265 Some(text.clone()),
1266 fetch_k,
1267 filter,
1268 ef_search,
1269 rrf_k0,
1270 need_payload,
1271 with_vector,
1272 )
1273 .await?;
1274
1275 if let Some(rr) = reranker {
1276 let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1277 let query = text;
1278 let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1279 .await
1280 .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1281 .map_err(|e| Error::Upstream(e.to_string()))?;
1282 let mut scored: Vec<(f32, MatchOut)> = scores
1284 .into_iter()
1285 .zip(hits)
1286 .map(|(s, mut h)| {
1287 h.score = s;
1288 (s, h)
1289 })
1290 .collect();
1291 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1292 hits = scored.into_iter().map(|(_, h)| h).collect();
1293 }
1294
1295 hits.truncate(k);
1296 if !with_payload {
1298 for h in &mut hits {
1299 h.payload = None;
1300 }
1301 }
1302 Ok(hits)
1303 }
1304
1305 pub(crate) async fn upsert_text(
1310 &self,
1311 principal: &Principal,
1312 collection: String,
1313 points: Vec<TextPointIn>,
1314 ) -> Result<u64, Error> {
1315 self.ensure_writable("upsert_text")?;
1316 self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1317 self.limits.check_batch(points.len())?;
1318 for p in &points {
1319 if !matches!(p.payload, Value::Object(_) | Value::Null) {
1320 return Err(Error::BadRequest(
1321 "upsert_text payload must be a JSON object or null".to_owned(),
1322 ));
1323 }
1324 }
1325 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1326 Error::BadRequest(format!(
1327 "collection {collection:?} has no embedding provider configured \
1328 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1329 ))
1330 })?;
1331 let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1332 let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1333 .await
1334 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1335 .map_err(|e| Error::Upstream(e.to_string()))?;
1336 if vectors.len() != points.len() {
1337 return Err(Error::Upstream(format!(
1338 "embedding provider returned {} vectors for {} inputs",
1339 vectors.len(),
1340 points.len()
1341 )));
1342 }
1343 let dense: Vec<PointIn> = points
1344 .into_iter()
1345 .zip(vectors)
1346 .map(|(p, vector)| {
1347 let mut payload = match p.payload {
1348 Value::Object(map) => map,
1349 _ => serde_json::Map::new(),
1350 };
1351 payload
1353 .entry(TEXT_KEY.to_owned())
1354 .or_insert_with(|| Value::String(p.text.clone()));
1355 PointIn {
1356 id: p.id,
1357 vector,
1358 payload: Value::Object(payload),
1359 }
1360 })
1361 .collect();
1362 self.upsert(principal, collection, dense).await
1363 }
1364
1365 pub(crate) async fn fetch(
1366 &self,
1367 principal: &Principal,
1368 collection: String,
1369 filter: Option<Filter>,
1370 limit: usize,
1371 with_payload: bool,
1372 with_vector: bool,
1373 ) -> Result<Vec<MatchOut>, Error> {
1374 self.authorize(principal, Action::Read, "fetch", &collection)?;
1375 self.limits.check_fetch(limit)?;
1376 self.read_blocking(move |db| {
1377 let matches = db.fetch(
1378 &collection,
1379 filter.as_ref(),
1380 limit,
1381 with_payload,
1382 with_vector,
1383 )?;
1384 Ok(matches
1385 .into_iter()
1386 .map(|m| MatchOut {
1387 id: m.id,
1388 score: m.score,
1389 payload: m.payload,
1390 vector: m.vector,
1391 })
1392 .collect())
1393 })
1394 .await
1395 }
1396
1397 pub(crate) async fn upsert_documents(
1398 &self,
1399 principal: &Principal,
1400 collection: String,
1401 documents: Vec<DocumentIn>,
1402 ) -> Result<u64, Error> {
1403 self.ensure_writable("upsert_documents")?;
1404 self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1405 self.limits.check_batch(documents.len())?;
1406 for doc in &documents {
1407 self.limits.check_payload(&doc.payload)?;
1408 for token in &doc.vectors {
1409 self.limits.check_vector_len(token.len())?;
1410 }
1411 }
1412 let resource = collection.clone();
1413 let result = self
1414 .write_blocking(move |db| {
1415 let mut count = 0u64;
1416 for doc in &documents {
1417 db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1418 count += 1;
1419 }
1420 Ok(count)
1421 })
1422 .await;
1423 self.audit.record(
1424 principal.actor(),
1425 "upsert_documents",
1426 &resource,
1427 Outcome::of(&result),
1428 );
1429 result
1430 }
1431
1432 pub(crate) async fn delete_documents(
1433 &self,
1434 principal: &Principal,
1435 collection: String,
1436 ids: Vec<String>,
1437 ) -> Result<u64, Error> {
1438 self.ensure_writable("delete_documents")?;
1439 self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1440 let resource = collection.clone();
1441 let result = self
1442 .write_blocking(move |db| {
1443 let mut count = 0u64;
1444 for id in &ids {
1445 if db.delete_document(&collection, id)? {
1446 count += 1;
1447 }
1448 }
1449 Ok(count)
1450 })
1451 .await;
1452 self.audit.record(
1453 principal.actor(),
1454 "delete_documents",
1455 &resource,
1456 Outcome::of(&result),
1457 );
1458 result
1459 }
1460
1461 #[allow(clippy::too_many_arguments)]
1462 pub(crate) async fn search_multi_vector(
1463 &self,
1464 principal: &Principal,
1465 collection: String,
1466 query: Vec<Vec<f32>>,
1467 k: usize,
1468 filter: Option<Filter>,
1469 ef_search: usize,
1470 with_payload: bool,
1471 with_vector: bool,
1472 ) -> Result<Vec<DocumentMatchOut>, Error> {
1473 self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1474 self.limits.check_search(k, ef_search)?;
1475 for token in &query {
1476 self.limits.check_vector_len(token.len())?;
1477 }
1478 let params = SearchParams {
1479 k,
1480 filter,
1481 ef_search,
1482 with_payload,
1483 with_vector,
1484 };
1485 let coll = collection.clone();
1486 self.search_blocking(coll, move |db| {
1487 let matches = db.search_multi_vector_snapshot(&collection, &query, ¶ms)?;
1488 Ok(matches
1489 .into_iter()
1490 .map(|m| DocumentMatchOut {
1491 id: m.id,
1492 score: m.score,
1493 payload: m.payload,
1494 vectors: m.vectors,
1495 })
1496 .collect())
1497 })
1498 .await
1499 }
1500}
1501
/// Capacity passed to `broadcast::channel` for the replication channel that
/// `serve` creates.
const REPLICATION_BUFFER: usize = 1024;
1505
1506pub async fn run(config: Config) -> Result<(), Error> {
1508 config.validate()?;
1509 let rest_listener = TcpListener::bind(config.rest_addr)
1510 .await
1511 .map_err(Error::Io)?;
1512 let grpc_listener = TcpListener::bind(config.grpc_addr)
1513 .await
1514 .map_err(Error::Io)?;
1515 tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1516 tokio::select! {
1517 result = serve(config, rest_listener, grpc_listener) => result,
1518 () = shutdown_signal() => {
1519 tracing::info!("shutdown signal received");
1520 Ok(())
1521 }
1522 }
1523}
1524
1525pub async fn serve(
1528 config: Config,
1529 rest_listener: TcpListener,
1530 grpc_listener: TcpListener,
1531) -> Result<(), Error> {
1532 let mut db = open_database(&config)?;
1533 let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1534 let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1538 {
1539 let tx = replication_tx.clone();
1540 db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1541 let _ = tx.send(entry.clone());
1542 }));
1543 }
1544 let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1548 .map_err(|e| Error::Config(e.to_string()))?;
1549
1550 let state = AppState {
1551 db: Arc::new(RwLock::new(db)),
1552 keys: Arc::new(config.api_keys.clone()),
1553 audit,
1554 replication_tx,
1555 read_only: config.leader_url.is_some(),
1556 limits: config.limits,
1557 embed: Arc::new(embed),
1558 rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
1559 metrics: Arc::new(metrics::Metrics::default()),
1560 rebuilding: Arc::new(Mutex::new(HashSet::new())),
1561 };
1562
1563 if let Some(leader_url) = config.leader_url.clone() {
1565 replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
1566 }
1567
1568 let app = rest::router(state.clone());
1569 let grpc = grpc::service(state);
1570
1571 let tls = load_tls(&config)?;
1572
1573 let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
1575 Some(material) => {
1576 let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
1577 let std_listener = rest_listener.into_std().map_err(Error::Io)?;
1578 let server =
1579 axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
1580 Box::pin(async move {
1581 server
1582 .serve(app.into_make_service())
1583 .await
1584 .map_err(Error::Io)
1585 })
1586 }
1587 None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
1588 };
1589
1590 let mut grpc_builder = tonic::transport::Server::builder();
1592 if let Some(material) = &tls {
1593 let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
1594 let mut tls_config = ServerTlsConfig::new().identity(identity);
1595 if let Some(ca_pem) = &material.client_ca_pem {
1597 tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
1598 }
1599 grpc_builder = grpc_builder
1600 .tls_config(tls_config)
1601 .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
1602 }
1603 let grpc_fut = async move {
1604 grpc_builder
1605 .add_service(grpc)
1606 .serve_with_incoming(TcpListenerStream::new(grpc_listener))
1607 .await
1608 .map_err(|e| Error::Internal(format!("grpc server: {e}")))
1609 };
1610
1611 tokio::try_join!(rest_fut, grpc_fut)?;
1612 Ok(())
1613}
1614
1615async fn shutdown_signal() {
1616 let _ = tokio::signal::ctrl_c().await;
1617}
1618
1619fn open_database(config: &Config) -> Result<Database, Error> {
1625 let master_key = config.master_key_hex()?;
1626 let keyring =
1627 quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1628 .map_err(|e| Error::Config(e.to_string()))?;
1629 let db = match keyring {
1630 Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1631 None => Database::open(&config.data_dir)?,
1632 };
1633 Ok(db)
1634}
1635
/// TLS inputs loaded by `load_tls` and consumed by `serve` for both servers.
struct TlsMaterial {
    /// Raw bytes of the `tls_cert` file; `serve` uses them for the gRPC identity.
    cert_pem: Vec<u8>,
    /// Raw bytes of the `tls_key` file; paired with `cert_pem` for the gRPC identity.
    key_pem: Vec<u8>,
    /// Raw bytes of the `tls_client_ca` file, when one is configured.
    client_ca_pem: Option<Vec<u8>>,
    /// rustls configuration built from the same bytes; `serve` uses it for REST.
    rest_config: Arc<rustls::ServerConfig>,
}
1645
1646fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1651 match (&config.tls_cert, &config.tls_key) {
1652 (Some(cert_path), Some(key_path)) => {
1653 let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1654 let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1655 let client_ca_pem = config
1656 .tls_client_ca
1657 .as_ref()
1658 .map(std::fs::read)
1659 .transpose()
1660 .map_err(Error::Io)?;
1661 let rest_config = Arc::new(rustls_server_config(
1662 &cert_pem,
1663 &key_pem,
1664 client_ca_pem.as_deref(),
1665 )?);
1666 Ok(Some(TlsMaterial {
1667 cert_pem,
1668 key_pem,
1669 client_ca_pem,
1670 rest_config,
1671 }))
1672 }
1673 (None, None) => Ok(None),
1674 _ => Err(Error::Config(
1675 "tls_cert and tls_key must be set together".to_owned(),
1676 )),
1677 }
1678}
1679
1680fn rustls_server_config(
1684 cert_pem: &[u8],
1685 key_pem: &[u8],
1686 client_ca_pem: Option<&[u8]>,
1687) -> Result<rustls::ServerConfig, Error> {
1688 use rustls_pki_types::pem::PemObject;
1689 use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1690
1691 let certs = CertificateDer::pem_slice_iter(cert_pem)
1692 .collect::<std::result::Result<Vec<_>, _>>()
1693 .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1694 if certs.is_empty() {
1695 return Err(Error::Config(
1696 "tls_cert contains no certificates".to_owned(),
1697 ));
1698 }
1699 let key = PrivateKeyDer::from_pem_slice(key_pem)
1700 .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1701 let provider = Arc::new(rustls::crypto::ring::default_provider());
1702 let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1703 .with_safe_default_protocol_versions()
1704 .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1705 let builder = match client_ca_pem {
1706 Some(ca_pem) => {
1707 let mut roots = rustls::RootCertStore::empty();
1708 for cert in CertificateDer::pem_slice_iter(ca_pem) {
1709 let cert =
1710 cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1711 roots
1712 .add(cert)
1713 .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1714 }
1715 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1716 Arc::new(roots),
1717 provider,
1718 )
1719 .build()
1720 .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1721 builder.with_client_cert_verifier(verifier)
1722 }
1723 None => builder.with_no_client_auth(),
1724 };
1725 builder
1726 .with_single_cert(certs, key)
1727 .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1728}
1729
1730pub fn init_tracing() {
1733 init_observability(&Config::default());
1734}
1735
1736#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
1742pub fn init_observability(config: &Config) {
1743 use tracing_subscriber::EnvFilter;
1744 use tracing_subscriber::prelude::*;
1745 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
1746 let registry = tracing_subscriber::registry()
1747 .with(filter)
1748 .with(tracing_subscriber::fmt::layer());
1749
1750 #[cfg(feature = "otlp")]
1751 if config.otlp.is_enabled() {
1752 match otlp::build_provider(&config.otlp) {
1753 Ok(provider) => {
1754 use opentelemetry::trace::TracerProvider as _;
1755 let tracer = provider.tracer("quiver");
1756 otlp::store_provider(provider);
1757 let _ = registry
1758 .with(tracing_opentelemetry::layer().with_tracer(tracer))
1759 .try_init();
1760 return;
1761 }
1762 Err(e) => eprintln!("OTLP traces export disabled: {e}"),
1763 }
1764 }
1765
1766 let _ = registry.try_init();
1767}
1768
/// Calls `otlp::shutdown()` when the crate is built with the `otlp` feature;
/// without that feature this function does nothing.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    otlp::shutdown();
}
1776
#[cfg(test)]
mod tests {
    use super::*;

    /// Hex string used wherever a test needs a well-formed encryption key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    /// Defaults plus a single API key.
    fn keyed_config() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        }
    }

    /// Defaults plus a single API key and a well-formed encryption key.
    fn secured_config() -> Config {
        Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            ..keyed_config()
        }
    }

    /// REST address bound to the IPv4 unspecified address.
    fn wildcard_rest_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333)
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        let bare = Config::default();
        assert!(bare.validate().is_err());

        let insecure = Config {
            insecure: true,
            ..Config::default()
        };
        assert!(insecure.validate().is_ok());

        assert!(secured_config().validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let without_key = keyed_config();
        assert!(without_key.validate().is_err());

        let with_key = secured_config();
        assert!(with_key.validate().is_ok());

        let malformed_key = Config {
            encryption_key: Some("not-a-valid-hex-key".to_owned()),
            ..keyed_config()
        };
        assert!(malformed_key.validate().is_err());

        let insecure_without_key = Config {
            insecure: true,
            encryption_key: None,
            ..keyed_config()
        };
        assert!(insecure_without_key.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let key_path = dir.path().join("master.key");
        std::fs::write(&key_path, format!("{TEST_KEY}\n")).unwrap();

        let from_file = Config {
            master_key_file: Some(key_path.clone()),
            ..keyed_config()
        };
        assert!(from_file.validate().is_ok());
        assert_eq!(
            from_file.master_key_hex().unwrap().as_deref(),
            Some(TEST_KEY)
        );

        // Supplying both key sources at once is rejected.
        let both_sources = Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            master_key_file: Some(key_path.clone()),
            ..keyed_config()
        };
        assert!(both_sources.validate().is_err());

        // A key file with malformed contents is rejected too.
        std::fs::write(&key_path, "not-a-valid-key").unwrap();
        assert!(from_file.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let public = Config {
            rest_addr: wildcard_rest_addr(),
            ..secured_config()
        };
        assert!(public.validate().is_err());

        let public_insecure = Config {
            insecure: true,
            rest_addr: wildcard_rest_addr(),
            ..secured_config()
        };
        assert!(public_insecure.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let public_tls = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: wildcard_rest_addr(),
            ..secured_config()
        };
        assert!(public_tls.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let cert_only = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..secured_config()
        };
        assert!(cert_only.validate().is_err());

        let cert_and_key = Config {
            tls_key: Some(PathBuf::from("key.pem")),
            ..cert_only
        };
        assert!(cert_and_key.validate().is_ok());
    }
}