1mod audit;
33mod auth;
34mod cluster;
35mod error;
36mod grpc;
37mod metrics;
38mod otlp;
39mod rate_limit;
40mod replication;
41mod rest;
42
43use std::collections::{HashMap, HashSet};
44use std::future::Future;
45use std::net::{IpAddr, Ipv4Addr, SocketAddr};
46use std::path::PathBuf;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex, RwLock};
49
50use axum_server::tls_rustls::RustlsConfig;
51use figment::Figment;
52use figment::providers::{Env, Format, Serialized, Toml};
53use serde::{Deserialize, Serialize};
54use serde_json::Value;
55use tokio::net::TcpListener;
56use tokio::sync::broadcast;
57use tokio_stream::wrappers::TcpListenerStream;
58use tonic::transport::{Certificate, Identity, ServerTlsConfig};
59
60use quiver_crypto::AeadCodec;
61use quiver_embed::{
62 Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
63 SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
64};
65use quiver_query::Filter;
66
67pub use auth::{Action, ApiKey, CollectionScope};
68pub use error::Error;
69pub use otlp::OtlpConfig;
70pub use quiver_providers::{
73 EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
74 RerankProvider,
75};
76pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
77
78use audit::{AuditLog, Outcome};
79use auth::Principal;
80
/// Upper bounds applied to request sizes and search parameters.
///
/// Each field has a matching `QUIVER_MAX_*` environment override (see
/// `Limits::apply_env_overrides`) and must be greater than zero to pass
/// `Limits::validate`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
    /// Largest `k` a search may request (`QUIVER_MAX_K`).
    pub max_k: usize,
    /// Largest `ef_search` a search may request (`QUIVER_MAX_EF_SEARCH`).
    pub max_ef_search: usize,
    /// Largest `limit` accepted by `check_fetch` (`QUIVER_MAX_FETCH_LIMIT`).
    pub max_fetch_limit: usize,
    /// Largest collection dimension and vector length (`QUIVER_MAX_VECTOR_DIM`).
    pub max_vector_dim: usize,
    /// Largest JSON-serialized payload, in bytes (`QUIVER_MAX_PAYLOAD_BYTES`).
    pub max_payload_bytes: usize,
    /// Largest number of points in a regular batch (`QUIVER_MAX_BATCH_SIZE`).
    pub max_batch_size: usize,
    /// Request body ceiling in bytes (`QUIVER_MAX_REQUEST_BODY_BYTES`).
    /// NOTE(review): enforcement is not visible in this part of the file.
    pub max_request_body_bytes: usize,
    /// Largest number of terms in a sparse query (`QUIVER_MAX_SPARSE_TERMS`).
    pub max_sparse_terms: usize,
    /// Largest number of points in a bulk upsert (`QUIVER_MAX_BULK_BATCH_SIZE`).
    pub max_bulk_batch_size: usize,
}
116
117impl Default for Limits {
118 fn default() -> Self {
119 Self {
120 max_k: 10_000,
121 max_ef_search: 4_096,
122 max_fetch_limit: 10_000,
123 max_vector_dim: 8_192,
124 max_payload_bytes: 65_536,
125 max_batch_size: 1_000,
126 max_request_body_bytes: 32 * 1024 * 1024,
127 max_sparse_terms: 4_096,
128 max_bulk_batch_size: 50_000,
129 }
130 }
131}
132
133impl Limits {
134 fn apply_env_overrides(&mut self) -> Result<(), Error> {
138 let slots: [(&str, &mut usize); 9] = [
139 ("QUIVER_MAX_K", &mut self.max_k),
140 ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
141 ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
142 ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
143 ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
144 ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
145 (
146 "QUIVER_MAX_REQUEST_BODY_BYTES",
147 &mut self.max_request_body_bytes,
148 ),
149 ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
150 ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
151 ];
152 for (key, slot) in slots {
153 if let Ok(raw) = std::env::var(key) {
154 *slot = raw.parse().map_err(|_| {
155 Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
156 })?;
157 }
158 }
159 Ok(())
160 }
161
162 fn validate(&self) -> Result<(), Error> {
164 let named = [
165 ("max_k", self.max_k),
166 ("max_ef_search", self.max_ef_search),
167 ("max_fetch_limit", self.max_fetch_limit),
168 ("max_vector_dim", self.max_vector_dim),
169 ("max_payload_bytes", self.max_payload_bytes),
170 ("max_batch_size", self.max_batch_size),
171 ("max_request_body_bytes", self.max_request_body_bytes),
172 ("max_sparse_terms", self.max_sparse_terms),
173 ("max_bulk_batch_size", self.max_bulk_batch_size),
174 ];
175 if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
176 return Err(Error::Config(format!(
177 "limits.{name} must be greater than zero"
178 )));
179 }
180 Ok(())
181 }
182
183 fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
184 if k > self.max_k {
185 return Err(Error::BadRequest(format!(
186 "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
187 self.max_k
188 )));
189 }
190 if ef_search > self.max_ef_search {
191 return Err(Error::BadRequest(format!(
192 "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
193 self.max_ef_search
194 )));
195 }
196 Ok(())
197 }
198
199 fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
200 if n > self.max_sparse_terms {
201 return Err(Error::BadRequest(format!(
202 "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
203 self.max_sparse_terms
204 )));
205 }
206 Ok(())
207 }
208
209 fn check_fetch(&self, limit: usize) -> Result<(), Error> {
210 if limit > self.max_fetch_limit {
211 return Err(Error::BadRequest(format!(
212 "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
213 self.max_fetch_limit
214 )));
215 }
216 Ok(())
217 }
218
219 fn check_dim(&self, dim: usize) -> Result<(), Error> {
220 if dim > self.max_vector_dim {
221 return Err(Error::BadRequest(format!(
222 "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
223 self.max_vector_dim
224 )));
225 }
226 Ok(())
227 }
228
229 fn check_vector_len(&self, len: usize) -> Result<(), Error> {
230 if len > self.max_vector_dim {
231 return Err(Error::BadRequest(format!(
232 "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
233 self.max_vector_dim
234 )));
235 }
236 Ok(())
237 }
238
239 fn check_batch(&self, n: usize) -> Result<(), Error> {
240 if n > self.max_batch_size {
241 return Err(Error::BadRequest(format!(
242 "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
243 self.max_batch_size
244 )));
245 }
246 Ok(())
247 }
248
249 fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
250 if n > self.max_bulk_batch_size {
251 return Err(Error::BadRequest(format!(
252 "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
253 self.max_bulk_batch_size
254 )));
255 }
256 Ok(())
257 }
258
259 fn check_payload(&self, payload: &Value) -> Result<(), Error> {
260 let size = serde_json::to_vec(payload)
261 .map(|v| v.len())
262 .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
263 if size > self.max_payload_bytes {
264 return Err(Error::BadRequest(format!(
265 "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
266 self.max_payload_bytes
267 )));
268 }
269 Ok(())
270 }
271}
272
/// Server configuration, assembled by `Config::load` from built-in defaults,
/// `quiver.toml`, and `QUIVER_`-prefixed environment variables.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Data directory (default `./quiver-data`).
    pub data_dir: PathBuf,
    /// REST bind address (default `127.0.0.1:6333`).
    pub rest_addr: SocketAddr,
    /// gRPC bind address (default `127.0.0.1:6334`).
    pub grpc_addr: SocketAddr,
    /// API keys, deserialized by `auth::de_api_keys`. Validation requires at
    /// least one unless `insecure` is set.
    #[serde(default, deserialize_with = "auth::de_api_keys")]
    pub api_keys: Vec<ApiKey>,
    /// Inline master key as hex; mutually exclusive with `master_key_file`.
    pub encryption_key: Option<String>,
    /// File holding the hex master key; mutually exclusive with
    /// `encryption_key`.
    pub master_key_file: Option<PathBuf>,
    /// TLS certificate (PEM); must be set together with `tls_key`.
    pub tls_cert: Option<PathBuf>,
    /// TLS private key (PEM); must be set together with `tls_cert`.
    pub tls_key: Option<PathBuf>,
    /// Client CA for mutual TLS; requires `tls_cert` and `tls_key`.
    pub tls_client_ca: Option<PathBuf>,
    /// Audit log path. NOTE(review): consumer not visible in this chunk.
    pub audit_log: Option<PathBuf>,
    /// NOTE(review): presumably the replication leader this node follows —
    /// confirm against the replication module.
    pub leader_url: Option<String>,
    /// NOTE(review): presumably the API key presented to the leader — confirm.
    pub leader_api_key: Option<String>,
    /// Relaxes validation: permits running without API keys, without an
    /// encryption key, and on a non-loopback address without TLS.
    pub insecure: bool,
    /// Request and search ceilings.
    pub limits: Limits,
    /// Embedding provider settings, keyed by collection name
    /// (`[embedding.<collection>]` tables in `quiver.toml`).
    #[serde(default)]
    pub embedding: HashMap<String, EmbeddingConfig>,
    /// Rerank provider settings. NOTE(review): presumably keyed by collection
    /// name like `embedding` — confirm.
    #[serde(default)]
    pub rerank: HashMap<String, RerankConfig>,
    /// Rate limiter settings.
    #[serde(default)]
    pub rate_limit: RateLimitConfig,
    /// OTLP exporter settings.
    #[serde(default)]
    pub otlp: OtlpConfig,
    /// NOTE(review): presumably enables the MVCC read path (`AppState::mvcc`)
    /// — the wiring is not visible in this chunk.
    #[serde(default)]
    pub mvcc_reads: bool,
    /// NOTE(review): presumably the shard endpoints for cluster mode — confirm.
    #[serde(default)]
    pub cluster_shards: Vec<String>,
    /// NOTE(review): presumably the API key used when calling shards — confirm.
    #[serde(default)]
    pub cluster_shard_key: Option<String>,
}
380
381impl Default for Config {
382 fn default() -> Self {
383 Self {
384 data_dir: PathBuf::from("./quiver-data"),
385 rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
386 grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
387 api_keys: Vec::new(),
388 encryption_key: None,
389 master_key_file: None,
390 tls_cert: None,
391 tls_key: None,
392 tls_client_ca: None,
393 audit_log: None,
394 leader_url: None,
395 leader_api_key: None,
396 insecure: false,
397 limits: Limits::default(),
398 embedding: HashMap::new(),
399 rerank: HashMap::new(),
400 rate_limit: RateLimitConfig::default(),
401 otlp: OtlpConfig::default(),
402 mvcc_reads: false,
403 cluster_shards: Vec::new(),
404 cluster_shard_key: None,
405 }
406 }
407}
408
409impl Config {
410 pub fn load() -> Result<Self, Error> {
413 let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
414 .merge(Toml::file("quiver.toml"))
415 .merge(Env::prefixed("QUIVER_"))
416 .extract()
417 .map_err(|e| Error::Config(e.to_string()))?;
418 config.limits.apply_env_overrides()?;
421 config
423 .rate_limit
424 .apply_env_overrides()
425 .map_err(Error::Config)?;
426 config.otlp.apply_env_overrides().map_err(Error::Config)?;
428 Ok(config)
429 }
430
431 pub fn validate(&self) -> Result<(), Error> {
435 if self.api_keys.is_empty() && !self.insecure {
436 return Err(Error::Config(
437 "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
438 set insecure=true for local development"
439 .to_owned(),
440 ));
441 }
442 let master_key = self.master_key_hex()?;
444 if master_key.is_none() && !self.insecure {
445 return Err(Error::Config(
446 "no encryption key configured: encryption-at-rest is on by default — \
447 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
448 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
449 store data unencrypted (development only)"
450 .to_owned(),
451 ));
452 }
453 if let Some(key) = &master_key {
455 AeadCodec::from_hex(key)
456 .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
457 }
458 if self.tls_cert.is_some() != self.tls_key.is_some() {
460 return Err(Error::Config(
461 "tls_cert and tls_key must be set together".to_owned(),
462 ));
463 }
464 if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
466 return Err(Error::Config(
467 "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
468 ));
469 }
470 let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
471 let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
472 if non_loopback && !tls_enabled && !self.insecure {
473 return Err(Error::Config(
474 "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
475 or insecure=true for local development"
476 .to_owned(),
477 ));
478 }
479 self.limits.validate()?;
481 Ok(())
482 }
483
484 pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
494 let env_key = self
495 .encryption_key
496 .as_deref()
497 .map(str::trim)
498 .filter(|k| !k.is_empty());
499 match (&self.master_key_file, env_key) {
500 (Some(_), Some(_)) => Err(Error::Config(
501 "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
502 (QUIVER_MASTER_KEY_FILE), not both"
503 .to_owned(),
504 )),
505 (Some(path), None) => {
506 warn_if_world_readable(path);
507 let hex = std::fs::read_to_string(path).map_err(|e| {
508 Error::Config(format!("reading master_key_file {}: {e}", path.display()))
509 })?;
510 Ok(Some(hex.trim().to_owned()))
511 }
512 (None, Some(key)) => Ok(Some(key.to_owned())),
513 (None, None) => Ok(None),
514 }
515 }
516}
517
518#[cfg(unix)]
521fn warn_if_world_readable(path: &std::path::Path) {
522 use std::os::unix::fs::PermissionsExt;
523 if let Ok(meta) = std::fs::metadata(path)
524 && meta.permissions().mode() & 0o077 != 0
525 {
526 tracing::warn!(
527 path = %path.display(),
528 mode = format!("{:o}", meta.permissions().mode() & 0o777),
529 "master key file is group/world-accessible; restrict it to 0600"
530 );
531 }
532}
533
/// No-op on non-Unix targets: this build performs no permission check.
#[cfg(not(unix))]
fn warn_if_world_readable(_: &std::path::Path) {}
536
/// Shared server state handed to request handlers. Cloning is cheap: the
/// heavy members sit behind `Arc`.
#[derive(Clone)]
pub(crate) struct AppState {
    /// The database, locked for read or write on a blocking thread.
    db: Arc<RwLock<Database>>,
    /// API keys that presented credentials are matched against.
    keys: Arc<Vec<ApiKey>>,
    /// Audit sink for operation outcomes and authorization denials.
    audit: Arc<AuditLog>,
    /// Broadcast channel that replication streams subscribe to.
    replication_tx: broadcast::Sender<WalEntry>,
    /// When true, mutating operations are rejected (replication follower).
    read_only: bool,
    /// Request and search ceilings.
    limits: Limits,
    /// Lookup for per-collection embedders and rerankers.
    embed: Arc<EmbedRegistry>,
    /// Per-actor rate limiter.
    rate_limiter: Arc<RateLimiter>,
    /// Metrics registry. NOTE(review): usage not visible in this chunk.
    metrics: Arc<metrics::Metrics>,
    /// Collections with a background rebuild currently in flight.
    rebuilding: Arc<Mutex<HashSet<String>>>,
    /// Cached snapshot cells per collection: filled after searches, read by
    /// the `search` fast path, removed when the collection is deleted.
    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
    /// When true, successful writes check whether a rebuild should be
    /// scheduled.
    mvcc: bool,
    /// When set, data operations are forwarded to the cluster instead of the
    /// local database.
    cluster: Option<Arc<cluster::Cluster>>,
}
586
/// Description of a collection as returned by the create, get, and list
/// operations.
pub(crate) struct CollectionInfo {
    /// Collection name.
    pub name: String,
    /// Vector dimension.
    pub dim: u32,
    /// Distance metric.
    pub metric: DistanceMetric,
    /// Document count for multivector collections, point count otherwise.
    pub count: u64,
    /// Index specification.
    pub index: IndexSpec,
    /// Fields declared filterable.
    pub filterable: Vec<FilterableField>,
    /// Whether the collection is multivector.
    pub multivector: bool,
    /// Vector encryption setting.
    pub vector_encryption: VectorEncryption,
}
598
/// One point submitted to `upsert` / `upsert_bulk`.
pub(crate) struct PointIn {
    /// Point id.
    pub id: String,
    /// Dense vector; its length is checked against `max_vector_dim`.
    pub vector: Vec<f32>,
    /// JSON payload; its serialized size is checked against
    /// `max_payload_bytes`.
    pub payload: Value,
}
605
/// One point submitted to `upsert_text`, carrying text rather than a vector.
pub(crate) struct TextPointIn {
    /// Point id.
    pub id: String,
    /// Text for the point.
    pub text: String,
    /// JSON payload; `upsert_text` accepts only an object or null.
    pub payload: Value,
}
612
/// Minimum number of candidates `search_text` fetches when a reranker is
/// active; it requests `max(k, RERANK_CANDIDATES)` hits before reranking.
const RERANK_CANDIDATES: usize = 50;
616
617fn doc_text(payload: Option<&Value>) -> String {
621 match payload {
622 Some(Value::Object(map)) => map
623 .get(TEXT_KEY)
624 .and_then(Value::as_str)
625 .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
626 Some(v) => v.to_string(),
627 None => String::new(),
628 }
629}
630
/// One point returned by `get_points`.
pub(crate) struct PointOut {
    /// Point id.
    pub id: String,
    /// Stored vector; `None` when the caller did not ask for vectors.
    pub vector: Option<Vec<f32>>,
    /// Stored payload, or JSON null when the point has none.
    pub payload: Value,
}
637
/// One hit returned by the search operations.
pub(crate) struct MatchOut {
    /// Id of the matched point.
    pub id: String,
    /// Match score; replaced by the reranker's score when reranking runs.
    pub score: f32,
    /// Payload, when requested and available.
    pub payload: Option<Value>,
    /// Vector, when requested and available.
    pub vector: Option<Vec<f32>>,
}
645
/// A document carrying several vectors.
/// NOTE(review): presumably the input for multivector collections — its
/// consumers are not visible in this chunk.
pub(crate) struct DocumentIn {
    /// Document id.
    pub id: String,
    /// The document's vectors.
    pub vectors: Vec<Vec<f32>>,
    /// JSON payload.
    pub payload: Value,
}
652
/// A document-level search hit.
/// NOTE(review): presumably produced by multivector search — its producers
/// are not visible in this chunk.
pub(crate) struct DocumentMatchOut {
    /// Document id.
    pub id: String,
    /// Match score.
    pub score: f32,
    /// Payload, when present.
    pub payload: Option<Value>,
    /// The document's vectors, when present.
    pub vectors: Option<Vec<Vec<f32>>>,
}
660
661impl AppState {
662 pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
666 auth::authenticate(&self.keys, presented)
667 }
668
669 pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
673 self.rate_limiter.check(actor)
674 }
675
676 pub(crate) fn rate_limit_enabled(&self) -> bool {
679 self.rate_limiter.enabled()
680 }
681
682 async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
686 where
687 T: Send + 'static,
688 F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
689 {
690 let db = Arc::clone(&self.db);
691 tokio::task::spawn_blocking(move || -> Result<T, Error> {
692 let mut guard = db
693 .write()
694 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
695 f(&mut guard).map_err(Error::Engine)
696 })
697 .await
698 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
699 }
700
701 async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
705 where
706 T: Send + 'static,
707 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
708 {
709 let db = Arc::clone(&self.db);
710 tokio::task::spawn_blocking(move || -> Result<T, Error> {
711 let guard = db
712 .read()
713 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
714 f(&guard).map_err(Error::Engine)
715 })
716 .await
717 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
718 }
719
720 async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
727 where
728 T: Send + 'static,
729 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
730 {
731 let db = Arc::clone(&self.db);
732 let cells = Arc::clone(&self.snapshot_cells);
733 let coll = collection.clone();
734 let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
735 let guard = db
736 .read()
737 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
738 let result = f(&guard).map_err(Error::Engine)?;
739 let stale = guard.needs_rebuild(&coll).unwrap_or(false);
742 if !stale
747 && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
748 && let Ok(mut map) = cells.write()
749 {
750 map.entry(coll.clone()).or_insert(cell);
751 }
752 Ok((result, stale))
753 })
754 .await
755 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
756 if stale {
757 self.schedule_rebuild(collection);
758 }
759 Ok(result)
760 }
761
762 fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
766 self.snapshot_cells.read().ok()?.get(collection).cloned()
767 }
768
769 async fn schedule_if_stale(&self, collection: &str) {
775 let db = Arc::clone(&self.db);
776 let coll = collection.to_owned();
777 let stale = tokio::task::spawn_blocking(move || {
778 db.read()
779 .ok()
780 .and_then(|g| g.needs_rebuild(&coll).ok())
781 .unwrap_or(false)
782 })
783 .await
784 .unwrap_or(false);
785 if stale {
786 self.schedule_rebuild(collection.to_owned());
787 }
788 }
789
790 fn schedule_rebuild(&self, collection: String) {
794 {
795 let mut inflight = match self.rebuilding.lock() {
796 Ok(g) => g,
797 Err(_) => return,
798 };
799 if !inflight.insert(collection.clone()) {
800 return; }
802 }
803 let state = self.clone();
804 tokio::spawn(async move {
805 state.run_rebuild(&collection).await;
806 if let Ok(mut inflight) = state.rebuilding.lock() {
807 inflight.remove(&collection);
808 }
809 });
810 }
811
812 async fn run_rebuild(&self, collection: &str) {
819 loop {
820 let db = Arc::clone(&self.db);
821 let coll = collection.to_owned();
822 let inputs = tokio::task::spawn_blocking(move || {
823 let guard = db.read().ok()?;
824 guard.snapshot_rebuild_inputs(&coll).ok().flatten()
825 })
826 .await
827 .ok()
828 .flatten();
829 let Some(inputs) = inputs else { return };
830
831 let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
832 return;
833 };
834
835 let db = Arc::clone(&self.db);
836 let still_stale = tokio::task::spawn_blocking(move || {
837 let mut guard = db.write().ok()?;
838 guard.commit_rebuild(rebuilt).ok()
839 })
840 .await
841 .ok()
842 .flatten();
843 match still_stale {
844 Some(true) => continue, _ => return,
846 }
847 }
848 }
849
850 fn authorize(
853 &self,
854 principal: &Principal,
855 action: Action,
856 op: &str,
857 resource: &str,
858 ) -> Result<(), Error> {
859 principal
860 .require(action, Some(resource))
861 .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
862 }
863
864 fn authorize_global(
867 &self,
868 principal: &Principal,
869 action: Action,
870 op: &str,
871 ) -> Result<(), Error> {
872 principal
873 .require(action, None)
874 .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
875 }
876
877 pub(crate) async fn open_replication(
884 &self,
885 principal: &Principal,
886 ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
887 self.authorize_global(principal, Action::Admin, "replicate")?;
888 let tx = self.replication_tx.clone();
889 self.read_blocking(move |db| {
890 let rx = tx.subscribe();
891 let snapshot = db.replication_snapshot()?;
892 Ok((snapshot, rx))
893 })
894 .await
895 }
896
897 pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
901 self.write_blocking(move |db| db.apply_replicated(op)).await
902 }
903
904 fn ensure_writable(&self, op: &str) -> Result<(), Error> {
907 if self.read_only {
908 return Err(Error::Forbidden(format!(
909 "{op}: this node is a read-only replication follower"
910 )));
911 }
912 Ok(())
913 }
914
915 #[allow(clippy::too_many_arguments)]
916 pub(crate) async fn create_collection(
917 &self,
918 principal: &Principal,
919 name: String,
920 dim: u32,
921 metric: DistanceMetric,
922 index: IndexSpec,
923 filterable: Vec<FilterableField>,
924 multivector: bool,
925 vector_encryption: VectorEncryption,
926 ) -> Result<CollectionInfo, Error> {
927 self.ensure_writable("create_collection")?;
928 self.authorize(principal, Action::Admin, "create_collection", &name)?;
929 self.limits.check_dim(dim as usize)?;
930 if let Some(c) = &self.cluster {
931 return c
932 .create_collection(
933 name,
934 dim,
935 metric,
936 index,
937 filterable,
938 multivector,
939 vector_encryption,
940 )
941 .await;
942 }
943 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
944 .with_index(index)
945 .with_filterable(filterable.clone())
946 .with_multivector(multivector)
947 .with_vector_encryption(vector_encryption);
948 let owned = name.clone();
949 let result = self
950 .write_blocking(move |db| db.create_collection(&owned, descriptor))
951 .await;
952 self.audit.record(
953 principal.actor(),
954 "create_collection",
955 &name,
956 Outcome::of(&result),
957 );
958 result?;
959 Ok(CollectionInfo {
960 name,
961 dim,
962 metric,
963 count: 0,
964 index,
965 filterable,
966 multivector,
967 vector_encryption,
968 })
969 }
970
971 pub(crate) async fn get_collection(
972 &self,
973 principal: &Principal,
974 name: String,
975 ) -> Result<CollectionInfo, Error> {
976 self.authorize(principal, Action::Read, "get_collection", &name)?;
977 self.read_blocking(move |db| {
978 let descriptor = db
979 .descriptor(&name)
980 .cloned()
981 .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
982 let count = if descriptor.multivector {
985 db.document_count(&name)? as u64
986 } else {
987 db.len(&name)? as u64
988 };
989 Ok(CollectionInfo {
990 name,
991 dim: descriptor.dim,
992 metric: descriptor.metric,
993 count,
994 index: descriptor.index,
995 filterable: descriptor.filterable,
996 multivector: descriptor.multivector,
997 vector_encryption: descriptor.vector_encryption,
998 })
999 })
1000 .await
1001 }
1002
1003 pub(crate) async fn list_collections(
1004 &self,
1005 principal: &Principal,
1006 ) -> Result<Vec<CollectionInfo>, Error> {
1007 self.authorize_global(principal, Action::Read, "list_collections")?;
1008 let mut infos = self
1009 .read_blocking(|db| {
1010 let mut out = Vec::new();
1011 for name in db.collection_names() {
1012 if let Some(descriptor) = db.descriptor(&name).cloned() {
1013 let count = if descriptor.multivector {
1014 db.document_count(&name)? as u64
1015 } else {
1016 db.len(&name)? as u64
1017 };
1018 out.push(CollectionInfo {
1019 name,
1020 dim: descriptor.dim,
1021 metric: descriptor.metric,
1022 count,
1023 index: descriptor.index,
1024 filterable: descriptor.filterable,
1025 multivector: descriptor.multivector,
1026 vector_encryption: descriptor.vector_encryption,
1027 });
1028 }
1029 }
1030 Ok(out)
1031 })
1032 .await?;
1033 infos.retain(|info| principal.can_see(&info.name));
1035 Ok(infos)
1036 }
1037
1038 pub(crate) async fn delete_collection(
1039 &self,
1040 principal: &Principal,
1041 name: String,
1042 ) -> Result<bool, Error> {
1043 self.ensure_writable("delete_collection")?;
1044 self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1045 if let Some(c) = &self.cluster {
1046 return c.drop_collection(&name).await;
1047 }
1048 let resource = name.clone();
1049 let result = self
1050 .write_blocking(move |db| db.drop_collection(&name))
1051 .await;
1052 self.audit.record(
1053 principal.actor(),
1054 "delete_collection",
1055 &resource,
1056 Outcome::of(&result),
1057 );
1058 if matches!(result, Ok(true))
1061 && let Ok(mut map) = self.snapshot_cells.write()
1062 {
1063 map.remove(&resource);
1064 }
1065 result
1066 }
1067
1068 #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1069 pub(crate) async fn upsert(
1070 &self,
1071 principal: &Principal,
1072 collection: String,
1073 points: Vec<PointIn>,
1074 ) -> Result<u64, Error> {
1075 self.ensure_writable("upsert")?;
1076 self.authorize(principal, Action::Write, "upsert", &collection)?;
1077 self.limits.check_batch(points.len())?;
1078 for p in &points {
1079 self.limits.check_vector_len(p.vector.len())?;
1080 self.limits.check_payload(&p.payload)?;
1081 }
1082 if let Some(c) = &self.cluster {
1083 return c.upsert(&collection, points).await;
1084 }
1085 let resource = collection.clone();
1086 let result = self
1087 .write_blocking(move |db| {
1088 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1089 .iter()
1090 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1091 .collect();
1092 db.upsert_batch(&collection, &records)
1093 })
1094 .await;
1095 self.audit
1096 .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1097 if self.mvcc && result.is_ok() {
1098 self.schedule_if_stale(&resource).await;
1099 }
1100 result
1101 }
1102
1103 pub(crate) async fn upsert_bulk(
1106 &self,
1107 principal: &Principal,
1108 collection: String,
1109 points: Vec<PointIn>,
1110 ) -> Result<u64, Error> {
1111 self.ensure_writable("upsert")?;
1112 self.authorize(principal, Action::Write, "upsert", &collection)?;
1113 self.limits.check_bulk_batch(points.len())?;
1114 for p in &points {
1115 self.limits.check_vector_len(p.vector.len())?;
1116 self.limits.check_payload(&p.payload)?;
1117 }
1118 if let Some(c) = &self.cluster {
1119 return c.upsert_bulk(&collection, points).await;
1120 }
1121 let resource = collection.clone();
1122 let result = self
1123 .write_blocking(move |db| {
1124 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1125 .iter()
1126 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1127 .collect();
1128 db.upsert_bulk(&collection, &records)
1129 })
1130 .await;
1131 self.audit.record(
1132 principal.actor(),
1133 "upsert_bulk",
1134 &resource,
1135 Outcome::of(&result),
1136 );
1137 if self.mvcc && result.is_ok() {
1138 self.schedule_if_stale(&resource).await;
1139 }
1140 result
1141 }
1142
1143 #[tracing::instrument(skip_all)]
1147 pub(crate) async fn snapshot(
1148 &self,
1149 principal: &Principal,
1150 destination: String,
1151 ) -> Result<SnapshotInfo, Error> {
1152 self.ensure_writable("snapshot")?;
1153 self.authorize_global(principal, Action::Admin, "snapshot")?;
1154 let dest = std::path::PathBuf::from(&destination);
1155 let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1156 self.audit.record(
1157 principal.actor(),
1158 "snapshot",
1159 &destination,
1160 Outcome::of(&result),
1161 );
1162 result
1163 }
1164
1165 pub(crate) async fn delete_points(
1166 &self,
1167 principal: &Principal,
1168 collection: String,
1169 ids: Vec<String>,
1170 ) -> Result<u64, Error> {
1171 self.ensure_writable("delete_points")?;
1172 self.authorize(principal, Action::Write, "delete_points", &collection)?;
1173 if let Some(c) = &self.cluster {
1174 return c.delete_points(&collection, ids).await;
1175 }
1176 let resource = collection.clone();
1177 let result = self
1178 .write_blocking(move |db| {
1179 let mut count = 0u64;
1180 for id in &ids {
1181 if db.delete(&collection, id)? {
1182 count += 1;
1183 }
1184 }
1185 Ok(count)
1186 })
1187 .await;
1188 self.audit.record(
1189 principal.actor(),
1190 "delete_points",
1191 &resource,
1192 Outcome::of(&result),
1193 );
1194 if self.mvcc && result.is_ok() {
1195 self.schedule_if_stale(&resource).await;
1196 }
1197 result
1198 }
1199
1200 pub(crate) async fn get_points(
1201 &self,
1202 principal: &Principal,
1203 collection: String,
1204 ids: Vec<String>,
1205 with_vector: bool,
1206 ) -> Result<Vec<PointOut>, Error> {
1207 self.authorize(principal, Action::Read, "get_points", &collection)?;
1208 if let Some(c) = &self.cluster {
1209 return c.get_points(&collection, ids, with_vector).await;
1210 }
1211 self.read_blocking(move |db| {
1212 let mut out = Vec::new();
1213 for id in &ids {
1214 if let Some(m) = db.get(&collection, id)? {
1215 out.push(PointOut {
1216 id: m.id,
1217 vector: if with_vector { m.vector } else { None },
1218 payload: m.payload.unwrap_or(Value::Null),
1219 });
1220 }
1221 }
1222 Ok(out)
1223 })
1224 .await
1225 }
1226
1227 #[allow(clippy::too_many_arguments)]
1228 #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1229 pub(crate) async fn search(
1230 &self,
1231 principal: &Principal,
1232 collection: String,
1233 vector: Vec<f32>,
1234 k: usize,
1235 filter: Option<Filter>,
1236 ef_search: usize,
1237 with_payload: bool,
1238 with_vector: bool,
1239 ) -> Result<Vec<MatchOut>, Error> {
1240 self.authorize(principal, Action::Read, "search", &collection)?;
1241 self.limits.check_search(k, ef_search)?;
1242 self.limits.check_vector_len(vector.len())?;
1243
1244 if let Some(c) = &self.cluster {
1246 return c
1247 .search(
1248 &collection,
1249 vector,
1250 k,
1251 filter,
1252 ef_search,
1253 with_payload,
1254 with_vector,
1255 )
1256 .await;
1257 }
1258
1259 if filter.is_none()
1265 && !with_payload
1266 && !with_vector
1267 && let Some(cell) = self.cached_cell(&collection)
1268 {
1269 return tokio::task::spawn_blocking(move || {
1270 let matches = cell.load().search(&vector, k, ef_search)?;
1271 Ok::<_, quiver_embed::Error>(
1272 matches
1273 .into_iter()
1274 .map(|m| MatchOut {
1275 id: m.id,
1276 score: m.score,
1277 payload: None,
1278 vector: None,
1279 })
1280 .collect(),
1281 )
1282 })
1283 .await
1284 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1285 .map_err(Error::Engine);
1286 }
1287
1288 let params = SearchParams {
1289 k,
1290 filter,
1291 ef_search,
1292 with_payload,
1293 with_vector,
1294 };
1295 let coll = collection.clone();
1296 self.search_blocking(coll, move |db| {
1297 let matches = db.search_snapshot(&collection, &vector, ¶ms)?;
1298 Ok(matches
1299 .into_iter()
1300 .map(|m| MatchOut {
1301 id: m.id,
1302 score: m.score,
1303 payload: m.payload,
1304 vector: m.vector,
1305 })
1306 .collect())
1307 })
1308 .await
1309 }
1310
1311 #[allow(clippy::too_many_arguments)]
1312 pub(crate) async fn hybrid_search(
1313 &self,
1314 principal: &Principal,
1315 collection: String,
1316 dense: Option<Vec<f32>>,
1317 sparse: Option<(Vec<u32>, Vec<f32>)>,
1318 text: Option<String>,
1319 k: usize,
1320 filter: Option<Filter>,
1321 ef_search: usize,
1322 rrf_k0: f32,
1323 with_payload: bool,
1324 with_vector: bool,
1325 ) -> Result<Vec<MatchOut>, Error> {
1326 self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1327 self.limits.check_search(k, ef_search)?;
1328 if let Some(v) = &dense {
1329 self.limits.check_vector_len(v.len())?;
1330 }
1331 if let Some((indices, values)) = &sparse {
1332 self.limits.check_sparse_terms(indices.len())?;
1333 if indices.len() != values.len() {
1334 return Err(Error::BadRequest(format!(
1335 "sparse query indices ({}) and values ({}) length mismatch",
1336 indices.len(),
1337 values.len()
1338 )));
1339 }
1340 }
1341 let params = SearchParams {
1342 k,
1343 filter,
1344 ef_search,
1345 with_payload,
1346 with_vector,
1347 };
1348 let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1349 let coll = collection.clone();
1350 self.search_blocking(coll, move |db| {
1351 let matches = db.hybrid_search_snapshot(
1352 &collection,
1353 dense.as_deref(),
1354 sv.as_ref(),
1355 text.as_deref(),
1356 ¶ms,
1357 rrf_k0,
1358 )?;
1359 Ok(matches
1360 .into_iter()
1361 .map(|m| MatchOut {
1362 id: m.id,
1363 score: m.score,
1364 payload: m.payload,
1365 vector: m.vector,
1366 })
1367 .collect())
1368 })
1369 .await
1370 }
1371
1372 #[allow(clippy::too_many_arguments)]
1377 pub(crate) async fn search_text(
1378 &self,
1379 principal: &Principal,
1380 collection: String,
1381 text: String,
1382 k: usize,
1383 filter: Option<Filter>,
1384 ef_search: usize,
1385 rrf_k0: f32,
1386 with_payload: bool,
1387 with_vector: bool,
1388 rerank: bool,
1389 ) -> Result<Vec<MatchOut>, Error> {
1390 self.authorize(principal, Action::Read, "search_text", &collection)?;
1391 self.limits.check_search(k, ef_search)?;
1392 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1393 Error::BadRequest(format!(
1394 "collection {collection:?} has no embedding provider configured \
1395 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1396 ))
1397 })?;
1398 let query = text.clone();
1400 let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1401 .await
1402 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1403 .map_err(|e| Error::Upstream(e.to_string()))?
1404 .into_iter()
1405 .next()
1406 .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1407 self.limits.check_vector_len(vector.len())?;
1408
1409 let reranker = if rerank {
1410 self.embed.reranker(&collection)
1411 } else {
1412 None
1413 };
1414 let need_payload = with_payload || reranker.is_some();
1418 let fetch_k = if reranker.is_some() {
1419 k.max(RERANK_CANDIDATES)
1420 } else {
1421 k
1422 };
1423
1424 let mut hits = self
1425 .hybrid_search(
1426 principal,
1427 collection,
1428 Some(vector),
1429 None,
1430 Some(text.clone()),
1431 fetch_k,
1432 filter,
1433 ef_search,
1434 rrf_k0,
1435 need_payload,
1436 with_vector,
1437 )
1438 .await?;
1439
1440 if let Some(rr) = reranker {
1441 let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1442 let query = text;
1443 let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1444 .await
1445 .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1446 .map_err(|e| Error::Upstream(e.to_string()))?;
1447 let mut scored: Vec<(f32, MatchOut)> = scores
1449 .into_iter()
1450 .zip(hits)
1451 .map(|(s, mut h)| {
1452 h.score = s;
1453 (s, h)
1454 })
1455 .collect();
1456 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1457 hits = scored.into_iter().map(|(_, h)| h).collect();
1458 }
1459
1460 hits.truncate(k);
1461 if !with_payload {
1463 for h in &mut hits {
1464 h.payload = None;
1465 }
1466 }
1467 Ok(hits)
1468 }
1469
1470 pub(crate) async fn upsert_text(
1475 &self,
1476 principal: &Principal,
1477 collection: String,
1478 points: Vec<TextPointIn>,
1479 ) -> Result<u64, Error> {
1480 self.ensure_writable("upsert_text")?;
1481 self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1482 self.limits.check_batch(points.len())?;
1483 for p in &points {
1484 if !matches!(p.payload, Value::Object(_) | Value::Null) {
1485 return Err(Error::BadRequest(
1486 "upsert_text payload must be a JSON object or null".to_owned(),
1487 ));
1488 }
1489 }
1490 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1491 Error::BadRequest(format!(
1492 "collection {collection:?} has no embedding provider configured \
1493 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1494 ))
1495 })?;
1496 let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1497 let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1498 .await
1499 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1500 .map_err(|e| Error::Upstream(e.to_string()))?;
1501 if vectors.len() != points.len() {
1502 return Err(Error::Upstream(format!(
1503 "embedding provider returned {} vectors for {} inputs",
1504 vectors.len(),
1505 points.len()
1506 )));
1507 }
1508 let dense: Vec<PointIn> = points
1509 .into_iter()
1510 .zip(vectors)
1511 .map(|(p, vector)| {
1512 let mut payload = match p.payload {
1513 Value::Object(map) => map,
1514 _ => serde_json::Map::new(),
1515 };
1516 payload
1518 .entry(TEXT_KEY.to_owned())
1519 .or_insert_with(|| Value::String(p.text.clone()));
1520 PointIn {
1521 id: p.id,
1522 vector,
1523 payload: Value::Object(payload),
1524 }
1525 })
1526 .collect();
1527 self.upsert(principal, collection, dense).await
1528 }
1529
1530 pub(crate) async fn fetch(
1531 &self,
1532 principal: &Principal,
1533 collection: String,
1534 filter: Option<Filter>,
1535 limit: usize,
1536 with_payload: bool,
1537 with_vector: bool,
1538 ) -> Result<Vec<MatchOut>, Error> {
1539 self.authorize(principal, Action::Read, "fetch", &collection)?;
1540 self.limits.check_fetch(limit)?;
1541 self.read_blocking(move |db| {
1542 let matches = db.fetch(
1543 &collection,
1544 filter.as_ref(),
1545 limit,
1546 with_payload,
1547 with_vector,
1548 )?;
1549 Ok(matches
1550 .into_iter()
1551 .map(|m| MatchOut {
1552 id: m.id,
1553 score: m.score,
1554 payload: m.payload,
1555 vector: m.vector,
1556 })
1557 .collect())
1558 })
1559 .await
1560 }
1561
1562 pub(crate) async fn upsert_documents(
1563 &self,
1564 principal: &Principal,
1565 collection: String,
1566 documents: Vec<DocumentIn>,
1567 ) -> Result<u64, Error> {
1568 self.ensure_writable("upsert_documents")?;
1569 self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1570 self.limits.check_batch(documents.len())?;
1571 for doc in &documents {
1572 self.limits.check_payload(&doc.payload)?;
1573 for token in &doc.vectors {
1574 self.limits.check_vector_len(token.len())?;
1575 }
1576 }
1577 let resource = collection.clone();
1578 let result = self
1579 .write_blocking(move |db| {
1580 let mut count = 0u64;
1581 for doc in &documents {
1582 db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1583 count += 1;
1584 }
1585 Ok(count)
1586 })
1587 .await;
1588 self.audit.record(
1589 principal.actor(),
1590 "upsert_documents",
1591 &resource,
1592 Outcome::of(&result),
1593 );
1594 result
1595 }
1596
1597 pub(crate) async fn delete_documents(
1598 &self,
1599 principal: &Principal,
1600 collection: String,
1601 ids: Vec<String>,
1602 ) -> Result<u64, Error> {
1603 self.ensure_writable("delete_documents")?;
1604 self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1605 let resource = collection.clone();
1606 let result = self
1607 .write_blocking(move |db| {
1608 let mut count = 0u64;
1609 for id in &ids {
1610 if db.delete_document(&collection, id)? {
1611 count += 1;
1612 }
1613 }
1614 Ok(count)
1615 })
1616 .await;
1617 self.audit.record(
1618 principal.actor(),
1619 "delete_documents",
1620 &resource,
1621 Outcome::of(&result),
1622 );
1623 result
1624 }
1625
1626 #[allow(clippy::too_many_arguments)]
1627 pub(crate) async fn search_multi_vector(
1628 &self,
1629 principal: &Principal,
1630 collection: String,
1631 query: Vec<Vec<f32>>,
1632 k: usize,
1633 filter: Option<Filter>,
1634 ef_search: usize,
1635 with_payload: bool,
1636 with_vector: bool,
1637 ) -> Result<Vec<DocumentMatchOut>, Error> {
1638 self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1639 self.limits.check_search(k, ef_search)?;
1640 for token in &query {
1641 self.limits.check_vector_len(token.len())?;
1642 }
1643 let params = SearchParams {
1644 k,
1645 filter,
1646 ef_search,
1647 with_payload,
1648 with_vector,
1649 };
1650 let coll = collection.clone();
1651 self.search_blocking(coll, move |db| {
1652 let matches = db.search_multi_vector_snapshot(&collection, &query, ¶ms)?;
1653 Ok(matches
1654 .into_iter()
1655 .map(|m| DocumentMatchOut {
1656 id: m.id,
1657 score: m.score,
1658 payload: m.payload,
1659 vectors: m.vectors,
1660 })
1661 .collect())
1662 })
1663 .await
1664 }
1665}
1666
/// Capacity handed to `broadcast::channel` in `serve` for the channel that
/// carries committed WAL entries to replication subscribers.
const REPLICATION_BUFFER: usize = 1024;
1670
1671pub async fn run(config: Config) -> Result<(), Error> {
1673 config.validate()?;
1674 let rest_listener = TcpListener::bind(config.rest_addr)
1675 .await
1676 .map_err(Error::Io)?;
1677 let grpc_listener = TcpListener::bind(config.grpc_addr)
1678 .await
1679 .map_err(Error::Io)?;
1680 tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1681 tokio::select! {
1682 result = serve(config, rest_listener, grpc_listener) => result,
1683 () = shutdown_signal() => {
1684 tracing::info!("shutdown signal received");
1685 Ok(())
1686 }
1687 }
1688}
1689
/// Opens the database, builds the shared application state, and serves the
/// REST and gRPC APIs on the two already-bound listeners until one of them
/// stops.
///
/// Taking listeners rather than addresses lets the caller decide how and
/// where the sockets are bound.
///
/// # Errors
/// Returns the first failure from opening the database or audit log,
/// building the embedding registry or cluster router, loading TLS material,
/// or from either server while it runs.
pub async fn serve(
    config: Config,
    rest_listener: TcpListener,
    grpc_listener: TcpListener,
) -> Result<(), Error> {
    let mut db = open_database(&config)?;
    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
    // The initial receiver is dropped; only the sender is kept in the state.
    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
    {
        // Forward every committed WAL entry to the replication channel.
        let tx = replication_tx.clone();
        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
            // The send result is deliberately ignored.
            // NOTE(review): presumably it only fails when nobody is subscribed — confirm.
            let _ = tx.send(entry.clone());
        }));
    }
    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
        .map_err(|e| Error::Config(e.to_string()))?;

    // Only ever switch MVCC reads on from config; then read back the
    // effective value so the state reflects what the database actually uses.
    if config.mvcc_reads {
        db.set_mvcc_reads(true);
    }
    let mvcc = db.mvcc_reads();

    // The cluster router is optional: no configured shards means none.
    let cluster = if config.cluster_shards.is_empty() {
        None
    } else {
        let c = cluster::Cluster::new(
            config.cluster_shards.clone(),
            config.cluster_shard_key.clone(),
        )?;
        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
        Some(Arc::new(c))
    };

    let state = AppState {
        db: Arc::new(RwLock::new(db)),
        keys: Arc::new(config.api_keys.clone()),
        audit,
        replication_tx,
        // A node with a configured leader is marked read-only.
        read_only: config.leader_url.is_some(),
        limits: config.limits,
        embed: Arc::new(embed),
        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
        metrics: Arc::new(metrics::Metrics::default()),
        rebuilding: Arc::new(Mutex::new(HashSet::new())),
        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
        mvcc,
        cluster,
    };

    if let Some(leader_url) = config.leader_url.clone() {
        // NOTE(review): presumably starts the background task that replicates
        // from the leader — confirm in `replication`.
        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
    }

    let app = rest::router(state.clone());
    let grpc = grpc::service(state);

    let tls = load_tls(&config)?;

    // The TLS and plaintext servers have different future types, so the REST
    // future is boxed to give both match arms one type.
    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
        Some(material) => {
            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
            // axum_server is built from a std listener, so convert the tokio one.
            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
            let server =
                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
            Box::pin(async move {
                server
                    .serve(app.into_make_service())
                    .await
                    .map_err(Error::Io)
            })
        }
        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
    };

    let mut grpc_builder = tonic::transport::Server::builder();
    // gRPC gets the same certificate, key and optional client CA as REST, but
    // tonic takes them as raw PEM rather than a prebuilt rustls config.
    if let Some(material) = &tls {
        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
        let mut tls_config = ServerTlsConfig::new().identity(identity);
        if let Some(ca_pem) = &material.client_ca_pem {
            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
        }
        grpc_builder = grpc_builder
            .tls_config(tls_config)
            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
    }
    let grpc_fut = async move {
        grpc_builder
            .add_service(grpc)
            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
            .await
            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
    };

    // Run both servers concurrently; the first error ends the other as well.
    tokio::try_join!(rest_fut, grpc_fut)?;
    Ok(())
}
1801
1802async fn shutdown_signal() {
1803 let _ = tokio::signal::ctrl_c().await;
1804}
1805
1806fn open_database(config: &Config) -> Result<Database, Error> {
1812 let master_key = config.master_key_hex()?;
1813 let keyring =
1814 quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1815 .map_err(|e| Error::Config(e.to_string()))?;
1816 let db = match keyring {
1817 Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1818 None => Database::open(&config.data_dir)?,
1819 };
1820 Ok(db)
1821}
1822
/// TLS inputs produced by `load_tls` and consumed by both listeners in `serve`.
///
/// The raw PEM is kept next to the prebuilt rustls config because the gRPC
/// server is configured from PEM bytes while the REST server takes a
/// `rustls::ServerConfig`.
struct TlsMaterial {
    /// Bytes read from the `tls_cert` file; used for the gRPC server identity.
    cert_pem: Vec<u8>,
    /// Bytes read from the `tls_key` file; used for the gRPC server identity.
    key_pem: Vec<u8>,
    /// Bytes read from the `tls_client_ca` file, when one is configured; used
    /// as the gRPC client CA root.
    client_ca_pem: Option<Vec<u8>>,
    /// rustls configuration built from the same PEM inputs; used by the REST
    /// server.
    rest_config: Arc<rustls::ServerConfig>,
}
1832
1833fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1838 match (&config.tls_cert, &config.tls_key) {
1839 (Some(cert_path), Some(key_path)) => {
1840 let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1841 let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1842 let client_ca_pem = config
1843 .tls_client_ca
1844 .as_ref()
1845 .map(std::fs::read)
1846 .transpose()
1847 .map_err(Error::Io)?;
1848 let rest_config = Arc::new(rustls_server_config(
1849 &cert_pem,
1850 &key_pem,
1851 client_ca_pem.as_deref(),
1852 )?);
1853 Ok(Some(TlsMaterial {
1854 cert_pem,
1855 key_pem,
1856 client_ca_pem,
1857 rest_config,
1858 }))
1859 }
1860 (None, None) => Ok(None),
1861 _ => Err(Error::Config(
1862 "tls_cert and tls_key must be set together".to_owned(),
1863 )),
1864 }
1865}
1866
1867fn rustls_server_config(
1871 cert_pem: &[u8],
1872 key_pem: &[u8],
1873 client_ca_pem: Option<&[u8]>,
1874) -> Result<rustls::ServerConfig, Error> {
1875 use rustls_pki_types::pem::PemObject;
1876 use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1877
1878 let certs = CertificateDer::pem_slice_iter(cert_pem)
1879 .collect::<std::result::Result<Vec<_>, _>>()
1880 .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1881 if certs.is_empty() {
1882 return Err(Error::Config(
1883 "tls_cert contains no certificates".to_owned(),
1884 ));
1885 }
1886 let key = PrivateKeyDer::from_pem_slice(key_pem)
1887 .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1888 let provider = Arc::new(rustls::crypto::ring::default_provider());
1889 let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1890 .with_safe_default_protocol_versions()
1891 .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1892 let builder = match client_ca_pem {
1893 Some(ca_pem) => {
1894 let mut roots = rustls::RootCertStore::empty();
1895 for cert in CertificateDer::pem_slice_iter(ca_pem) {
1896 let cert =
1897 cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1898 roots
1899 .add(cert)
1900 .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1901 }
1902 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1903 Arc::new(roots),
1904 provider,
1905 )
1906 .build()
1907 .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1908 builder.with_client_cert_verifier(verifier)
1909 }
1910 None => builder.with_no_client_auth(),
1911 };
1912 builder
1913 .with_single_cert(certs, key)
1914 .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1915}
1916
1917pub fn init_tracing() {
1920 init_observability(&Config::default());
1921}
1922
/// Installs the global tracing subscriber: an env-driven filter plus a
/// formatting layer, and, when the `otlp` feature is compiled in and enabled
/// in `config`, an OpenTelemetry layer on top.
///
/// The filter comes from the default environment variable and falls back to
/// `info` when that cannot be parsed. Initialisation results are ignored, so
/// calling this when a subscriber is already installed does not panic.
// `config` is only read inside the `otlp`-gated block.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let registry = tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer());

    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                // NOTE(review): presumably kept so `shutdown_observability`
                // can reach the provider later — confirm in `otlp`.
                otlp::store_provider(provider);
                let _ = registry
                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
                    .try_init();
                // `registry` was consumed above; skip the plain fallback.
                return;
            }
            // No subscriber is installed yet, so report on stderr and fall
            // through to the plain subscriber below.
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
        }
    }

    let _ = registry.try_init();
}
1955
/// Calls `otlp::shutdown` when the `otlp` feature is compiled in; otherwise
/// does nothing.
// NOTE(review): presumably meant to be called once before process exit so
// pending traces are flushed — confirm against `otlp::shutdown`.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    otlp::shutdown();
}
1963
#[cfg(test)]
mod tests {
    use super::*;

    /// 64 hex characters that `Config::validate` accepts as an encryption key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    /// Default config with a single API key and nothing else set.
    fn with_api_key() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        }
    }

    /// Default config with both an API key and a valid encryption key.
    fn with_both_keys() -> Config {
        Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            ..with_api_key()
        }
    }

    /// The unspecified IPv4 address (all interfaces) on the REST test port.
    fn public_rest_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333)
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // No keys at all is rejected...
        let bare = Config::default();
        assert!(bare.validate().is_err());

        // ...unless the operator explicitly opted out...
        let opted_out = Config {
            insecure: true,
            ..Config::default()
        };
        assert!(opted_out.validate().is_ok());

        // ...and a fully keyed config passes without the opt-out.
        let keyed = Config {
            insecure: false,
            ..with_both_keys()
        };
        assert!(keyed.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let build = |key: Option<&str>, insecure: bool| Config {
            encryption_key: key.map(str::to_owned),
            insecure,
            ..with_api_key()
        };

        // An API key alone is not enough.
        assert!(build(None, false).validate().is_err());
        // A valid key satisfies validation.
        assert!(build(Some(TEST_KEY), false).validate().is_ok());
        // A malformed key is rejected.
        assert!(build(Some("not-a-valid-hex-key"), false).validate().is_err());
        // The opt-out lifts the requirement.
        assert!(build(None, true).validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let scratch = tempfile::tempdir().unwrap();
        let key_file = scratch.path().join("master.key");
        // Written with a trailing newline; the expected hex below has none.
        std::fs::write(&key_file, format!("{TEST_KEY}\n")).unwrap();

        let mut cfg = Config {
            master_key_file: Some(key_file.clone()),
            ..with_api_key()
        };
        assert!(cfg.validate().is_ok());
        let resolved = cfg.master_key_hex().unwrap();
        assert_eq!(resolved.as_deref(), Some(TEST_KEY));

        // Supplying the key both inline and via file is rejected.
        cfg.encryption_key = Some(TEST_KEY.to_owned());
        assert!(cfg.validate().is_err());

        // A key file with invalid contents is rejected too.
        cfg.encryption_key = None;
        std::fs::write(&key_file, "not-a-valid-key").unwrap();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut cfg = with_both_keys();
        cfg.rest_addr = public_rest_addr();
        assert!(cfg.validate().is_err());

        cfg.insecure = true;
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let cfg = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: public_rest_addr(),
            ..with_both_keys()
        };
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        // A certificate without its key is rejected.
        let mut cfg = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..with_both_keys()
        };
        assert!(cfg.validate().is_err());

        // Adding the key makes the pair valid.
        cfg.tls_key = Some(PathBuf::from("key.pem"));
        assert!(cfg.validate().is_ok());
    }
}