1mod audit;
33mod auth;
34mod cluster;
35mod coordinator;
36mod error;
37mod grpc;
38mod metrics;
39mod otlp;
40mod rate_limit;
41mod replication;
42mod rest;
43
44use std::collections::{HashMap, HashSet};
45use std::future::Future;
46use std::net::{IpAddr, Ipv4Addr, SocketAddr};
47use std::path::PathBuf;
48use std::pin::Pin;
49use std::sync::{Arc, Mutex, RwLock};
50
51use axum_server::tls_rustls::RustlsConfig;
52use figment::Figment;
53use figment::providers::{Env, Format, Serialized, Toml};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use tokio::net::TcpListener;
57use tokio::sync::broadcast;
58use tokio_stream::wrappers::TcpListenerStream;
59use tonic::transport::{Certificate, Identity, ServerTlsConfig};
60
61use quiver_crypto::AeadCodec;
62use quiver_embed::{
63 Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
64 SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
65};
66use quiver_query::Filter;
67
68pub use auth::{Action, ApiKey, CollectionScope};
69pub use error::Error;
70pub use otlp::OtlpConfig;
71pub use quiver_providers::{
74 EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
75 RerankProvider,
76};
77pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
78
79use audit::{AuditLog, Outcome};
80use auth::Principal;
81
82#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
90#[serde(default)]
91pub struct Limits {
92 pub max_k: usize,
94 pub max_ef_search: usize,
96 pub max_fetch_limit: usize,
98 pub max_vector_dim: usize,
101 pub max_payload_bytes: usize,
103 pub max_batch_size: usize,
105 pub max_request_body_bytes: usize,
108 pub max_sparse_terms: usize,
111 pub max_bulk_batch_size: usize,
116}
117
118impl Default for Limits {
119 fn default() -> Self {
120 Self {
121 max_k: 10_000,
122 max_ef_search: 4_096,
123 max_fetch_limit: 10_000,
124 max_vector_dim: 8_192,
125 max_payload_bytes: 65_536,
126 max_batch_size: 1_000,
127 max_request_body_bytes: 32 * 1024 * 1024,
128 max_sparse_terms: 4_096,
129 max_bulk_batch_size: 50_000,
130 }
131 }
132}
133
134impl Limits {
135 fn apply_env_overrides(&mut self) -> Result<(), Error> {
139 let slots: [(&str, &mut usize); 9] = [
140 ("QUIVER_MAX_K", &mut self.max_k),
141 ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
142 ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
143 ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
144 ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
145 ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
146 (
147 "QUIVER_MAX_REQUEST_BODY_BYTES",
148 &mut self.max_request_body_bytes,
149 ),
150 ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
151 ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
152 ];
153 for (key, slot) in slots {
154 if let Ok(raw) = std::env::var(key) {
155 *slot = raw.parse().map_err(|_| {
156 Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
157 })?;
158 }
159 }
160 Ok(())
161 }
162
163 fn validate(&self) -> Result<(), Error> {
165 let named = [
166 ("max_k", self.max_k),
167 ("max_ef_search", self.max_ef_search),
168 ("max_fetch_limit", self.max_fetch_limit),
169 ("max_vector_dim", self.max_vector_dim),
170 ("max_payload_bytes", self.max_payload_bytes),
171 ("max_batch_size", self.max_batch_size),
172 ("max_request_body_bytes", self.max_request_body_bytes),
173 ("max_sparse_terms", self.max_sparse_terms),
174 ("max_bulk_batch_size", self.max_bulk_batch_size),
175 ];
176 if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
177 return Err(Error::Config(format!(
178 "limits.{name} must be greater than zero"
179 )));
180 }
181 Ok(())
182 }
183
184 fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
185 if k > self.max_k {
186 return Err(Error::BadRequest(format!(
187 "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
188 self.max_k
189 )));
190 }
191 if ef_search > self.max_ef_search {
192 return Err(Error::BadRequest(format!(
193 "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
194 self.max_ef_search
195 )));
196 }
197 Ok(())
198 }
199
200 fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
201 if n > self.max_sparse_terms {
202 return Err(Error::BadRequest(format!(
203 "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
204 self.max_sparse_terms
205 )));
206 }
207 Ok(())
208 }
209
210 fn check_fetch(&self, limit: usize) -> Result<(), Error> {
211 if limit > self.max_fetch_limit {
212 return Err(Error::BadRequest(format!(
213 "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
214 self.max_fetch_limit
215 )));
216 }
217 Ok(())
218 }
219
220 fn check_dim(&self, dim: usize) -> Result<(), Error> {
221 if dim > self.max_vector_dim {
222 return Err(Error::BadRequest(format!(
223 "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
224 self.max_vector_dim
225 )));
226 }
227 Ok(())
228 }
229
230 fn check_vector_len(&self, len: usize) -> Result<(), Error> {
231 if len > self.max_vector_dim {
232 return Err(Error::BadRequest(format!(
233 "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
234 self.max_vector_dim
235 )));
236 }
237 Ok(())
238 }
239
240 fn check_batch(&self, n: usize) -> Result<(), Error> {
241 if n > self.max_batch_size {
242 return Err(Error::BadRequest(format!(
243 "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
244 self.max_batch_size
245 )));
246 }
247 Ok(())
248 }
249
250 fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
251 if n > self.max_bulk_batch_size {
252 return Err(Error::BadRequest(format!(
253 "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
254 self.max_bulk_batch_size
255 )));
256 }
257 Ok(())
258 }
259
260 fn check_payload(&self, payload: &Value) -> Result<(), Error> {
261 let size = serde_json::to_vec(payload)
262 .map(|v| v.len())
263 .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
264 if size > self.max_payload_bytes {
265 return Err(Error::BadRequest(format!(
266 "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
267 self.max_payload_bytes
268 )));
269 }
270 Ok(())
271 }
272}
273
274#[derive(Debug, Clone, Serialize, Deserialize)]
277#[serde(default)]
278pub struct Config {
279 pub data_dir: PathBuf,
281 pub rest_addr: SocketAddr,
283 pub grpc_addr: SocketAddr,
285 #[serde(default, deserialize_with = "auth::de_api_keys")]
291 pub api_keys: Vec<ApiKey>,
292 pub encryption_key: Option<String>,
300 pub master_key_file: Option<PathBuf>,
308 pub tls_cert: Option<PathBuf>,
311 pub tls_key: Option<PathBuf>,
314 pub tls_client_ca: Option<PathBuf>,
319 pub audit_log: Option<PathBuf>,
325 pub leader_url: Option<String>,
331 pub leader_api_key: Option<String>,
334 pub insecure: bool,
338 pub limits: Limits,
341 #[serde(default)]
347 pub embedding: HashMap<String, EmbeddingConfig>,
348 #[serde(default)]
352 pub rerank: HashMap<String, RerankConfig>,
353 #[serde(default)]
357 pub rate_limit: RateLimitConfig,
358 #[serde(default)]
362 pub otlp: OtlpConfig,
363 #[serde(default)]
368 pub mvcc_reads: bool,
369 #[serde(default)]
376 pub cluster_shards: Vec<String>,
377 #[serde(default)]
385 pub cluster_replicas: Vec<String>,
386 #[serde(default)]
389 pub cluster_shard_key: Option<String>,
390 #[serde(default)]
396 pub coordinator: bool,
397 #[serde(default)]
402 pub coordinator_url: Option<String>,
403 #[serde(default)]
407 pub coordinator_state: Option<PathBuf>,
408}
409
410impl Default for Config {
411 fn default() -> Self {
412 Self {
413 data_dir: PathBuf::from("./quiver-data"),
414 rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
415 grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
416 api_keys: Vec::new(),
417 encryption_key: None,
418 master_key_file: None,
419 tls_cert: None,
420 tls_key: None,
421 tls_client_ca: None,
422 audit_log: None,
423 leader_url: None,
424 leader_api_key: None,
425 insecure: false,
426 limits: Limits::default(),
427 embedding: HashMap::new(),
428 rerank: HashMap::new(),
429 rate_limit: RateLimitConfig::default(),
430 otlp: OtlpConfig::default(),
431 mvcc_reads: false,
432 cluster_shards: Vec::new(),
433 cluster_replicas: Vec::new(),
434 cluster_shard_key: None,
435 coordinator: false,
436 coordinator_url: None,
437 coordinator_state: None,
438 }
439 }
440}
441
442impl Config {
443 pub fn load() -> Result<Self, Error> {
446 let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
447 .merge(Toml::file("quiver.toml"))
448 .merge(Env::prefixed("QUIVER_"))
449 .extract()
450 .map_err(|e| Error::Config(e.to_string()))?;
451 config.limits.apply_env_overrides()?;
454 config
456 .rate_limit
457 .apply_env_overrides()
458 .map_err(Error::Config)?;
459 config.otlp.apply_env_overrides().map_err(Error::Config)?;
461 Ok(config)
462 }
463
464 pub fn validate(&self) -> Result<(), Error> {
468 if self.api_keys.is_empty() && !self.insecure {
469 return Err(Error::Config(
470 "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
471 set insecure=true for local development"
472 .to_owned(),
473 ));
474 }
475 let master_key = self.master_key_hex()?;
477 if master_key.is_none() && !self.insecure {
478 return Err(Error::Config(
479 "no encryption key configured: encryption-at-rest is on by default — \
480 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
481 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
482 store data unencrypted (development only)"
483 .to_owned(),
484 ));
485 }
486 if let Some(key) = &master_key {
488 AeadCodec::from_hex(key)
489 .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
490 }
491 if self.tls_cert.is_some() != self.tls_key.is_some() {
493 return Err(Error::Config(
494 "tls_cert and tls_key must be set together".to_owned(),
495 ));
496 }
497 if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
499 return Err(Error::Config(
500 "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
501 ));
502 }
503 let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
504 let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
505 if non_loopback && !tls_enabled && !self.insecure {
506 return Err(Error::Config(
507 "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
508 or insecure=true for local development"
509 .to_owned(),
510 ));
511 }
512 self.limits.validate()?;
514 Ok(())
515 }
516
517 pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
527 let env_key = self
528 .encryption_key
529 .as_deref()
530 .map(str::trim)
531 .filter(|k| !k.is_empty());
532 match (&self.master_key_file, env_key) {
533 (Some(_), Some(_)) => Err(Error::Config(
534 "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
535 (QUIVER_MASTER_KEY_FILE), not both"
536 .to_owned(),
537 )),
538 (Some(path), None) => {
539 warn_if_world_readable(path);
540 let hex = std::fs::read_to_string(path).map_err(|e| {
541 Error::Config(format!("reading master_key_file {}: {e}", path.display()))
542 })?;
543 Ok(Some(hex.trim().to_owned()))
544 }
545 (None, Some(key)) => Ok(Some(key.to_owned())),
546 (None, None) => Ok(None),
547 }
548 }
549}
550
551#[cfg(unix)]
554fn warn_if_world_readable(path: &std::path::Path) {
555 use std::os::unix::fs::PermissionsExt;
556 if let Ok(meta) = std::fs::metadata(path)
557 && meta.permissions().mode() & 0o077 != 0
558 {
559 tracing::warn!(
560 path = %path.display(),
561 mode = format!("{:o}", meta.permissions().mode() & 0o777),
562 "master key file is group/world-accessible; restrict it to 0600"
563 );
564 }
565}
566
567#[cfg(not(unix))]
568fn warn_if_world_readable(_path: &std::path::Path) {}
569
570#[derive(Clone)]
573pub(crate) struct AppState {
574 db: Arc<RwLock<Database>>,
579 keys: Arc<Vec<ApiKey>>,
580 audit: Arc<AuditLog>,
581 replication_tx: broadcast::Sender<WalEntry>,
584 read_only: bool,
587 limits: Limits,
590 embed: Arc<EmbedRegistry>,
594 rate_limiter: Arc<RateLimiter>,
596 metrics: Arc<metrics::Metrics>,
598 rebuilding: Arc<Mutex<HashSet<String>>>,
602 snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
609 mvcc: bool,
614 cluster: Option<Arc<cluster::Cluster>>,
618}
619
620pub(crate) struct CollectionInfo {
622 pub name: String,
623 pub dim: u32,
624 pub metric: DistanceMetric,
625 pub count: u64,
626 pub index: IndexSpec,
627 pub filterable: Vec<FilterableField>,
628 pub multivector: bool,
629 pub vector_encryption: VectorEncryption,
630}
631
632pub(crate) struct PointIn {
634 pub id: String,
635 pub vector: Vec<f32>,
636 pub payload: Value,
637}
638
639pub(crate) struct TextPointIn {
641 pub id: String,
642 pub text: String,
643 pub payload: Value,
644}
645
/// Minimum number of candidates fetched for reranking when the caller asked
/// for fewer.
const RERANK_CANDIDATES: usize = 50;
649
650fn doc_text(payload: Option<&Value>) -> String {
654 match payload {
655 Some(Value::Object(map)) => map
656 .get(TEXT_KEY)
657 .and_then(Value::as_str)
658 .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
659 Some(v) => v.to_string(),
660 None => String::new(),
661 }
662}
663
664pub(crate) struct PointOut {
666 pub id: String,
667 pub vector: Option<Vec<f32>>,
668 pub payload: Value,
669}
670
671pub(crate) struct MatchOut {
673 pub id: String,
674 pub score: f32,
675 pub payload: Option<Value>,
676 pub vector: Option<Vec<f32>>,
677}
678
679pub(crate) struct DocumentIn {
681 pub id: String,
682 pub vectors: Vec<Vec<f32>>,
683 pub payload: Value,
684}
685
686pub(crate) struct DocumentMatchOut {
688 pub id: String,
689 pub score: f32,
690 pub payload: Option<Value>,
691 pub vectors: Option<Vec<Vec<f32>>>,
692}
693
694impl AppState {
695 pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
699 auth::authenticate(&self.keys, presented)
700 }
701
702 pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
706 self.rate_limiter.check(actor)
707 }
708
709 pub(crate) fn rate_limit_enabled(&self) -> bool {
712 self.rate_limiter.enabled()
713 }
714
715 async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
719 where
720 T: Send + 'static,
721 F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
722 {
723 let db = Arc::clone(&self.db);
724 tokio::task::spawn_blocking(move || -> Result<T, Error> {
725 let mut guard = db
726 .write()
727 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
728 f(&mut guard).map_err(Error::Engine)
729 })
730 .await
731 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
732 }
733
734 async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
738 where
739 T: Send + 'static,
740 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
741 {
742 let db = Arc::clone(&self.db);
743 tokio::task::spawn_blocking(move || -> Result<T, Error> {
744 let guard = db
745 .read()
746 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
747 f(&guard).map_err(Error::Engine)
748 })
749 .await
750 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
751 }
752
753 async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
760 where
761 T: Send + 'static,
762 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
763 {
764 let db = Arc::clone(&self.db);
765 let cells = Arc::clone(&self.snapshot_cells);
766 let coll = collection.clone();
767 let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
768 let guard = db
769 .read()
770 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
771 let result = f(&guard).map_err(Error::Engine)?;
772 let stale = guard.needs_rebuild(&coll).unwrap_or(false);
775 if !stale
780 && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
781 && let Ok(mut map) = cells.write()
782 {
783 map.entry(coll.clone()).or_insert(cell);
784 }
785 Ok((result, stale))
786 })
787 .await
788 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
789 if stale {
790 self.schedule_rebuild(collection);
791 }
792 Ok(result)
793 }
794
795 fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
799 self.snapshot_cells.read().ok()?.get(collection).cloned()
800 }
801
802 async fn schedule_if_stale(&self, collection: &str) {
808 let db = Arc::clone(&self.db);
809 let coll = collection.to_owned();
810 let stale = tokio::task::spawn_blocking(move || {
811 db.read()
812 .ok()
813 .and_then(|g| g.needs_rebuild(&coll).ok())
814 .unwrap_or(false)
815 })
816 .await
817 .unwrap_or(false);
818 if stale {
819 self.schedule_rebuild(collection.to_owned());
820 }
821 }
822
823 fn schedule_rebuild(&self, collection: String) {
827 {
828 let mut inflight = match self.rebuilding.lock() {
829 Ok(g) => g,
830 Err(_) => return,
831 };
832 if !inflight.insert(collection.clone()) {
833 return; }
835 }
836 let state = self.clone();
837 tokio::spawn(async move {
838 state.run_rebuild(&collection).await;
839 if let Ok(mut inflight) = state.rebuilding.lock() {
840 inflight.remove(&collection);
841 }
842 });
843 }
844
845 async fn run_rebuild(&self, collection: &str) {
852 loop {
853 let db = Arc::clone(&self.db);
854 let coll = collection.to_owned();
855 let inputs = tokio::task::spawn_blocking(move || {
856 let guard = db.read().ok()?;
857 guard.snapshot_rebuild_inputs(&coll).ok().flatten()
858 })
859 .await
860 .ok()
861 .flatten();
862 let Some(inputs) = inputs else { return };
863
864 let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
865 return;
866 };
867
868 let db = Arc::clone(&self.db);
869 let still_stale = tokio::task::spawn_blocking(move || {
870 let mut guard = db.write().ok()?;
871 guard.commit_rebuild(rebuilt).ok()
872 })
873 .await
874 .ok()
875 .flatten();
876 match still_stale {
877 Some(true) => continue, _ => return,
879 }
880 }
881 }
882
883 fn authorize(
886 &self,
887 principal: &Principal,
888 action: Action,
889 op: &str,
890 resource: &str,
891 ) -> Result<(), Error> {
892 principal
893 .require(action, Some(resource))
894 .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
895 }
896
897 fn authorize_global(
900 &self,
901 principal: &Principal,
902 action: Action,
903 op: &str,
904 ) -> Result<(), Error> {
905 principal
906 .require(action, None)
907 .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
908 }
909
910 pub(crate) async fn open_replication(
917 &self,
918 principal: &Principal,
919 ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
920 self.authorize_global(principal, Action::Admin, "replicate")?;
921 let tx = self.replication_tx.clone();
922 self.read_blocking(move |db| {
923 let rx = tx.subscribe();
924 let snapshot = db.replication_snapshot()?;
925 Ok((snapshot, rx))
926 })
927 .await
928 }
929
930 pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
934 self.write_blocking(move |db| db.apply_replicated(op)).await
935 }
936
937 fn ensure_writable(&self, op: &str) -> Result<(), Error> {
940 if self.read_only {
941 return Err(Error::Forbidden(format!(
942 "{op}: this node is a read-only replication follower"
943 )));
944 }
945 Ok(())
946 }
947
948 #[allow(clippy::too_many_arguments)]
949 pub(crate) async fn create_collection(
950 &self,
951 principal: &Principal,
952 name: String,
953 dim: u32,
954 metric: DistanceMetric,
955 index: IndexSpec,
956 filterable: Vec<FilterableField>,
957 multivector: bool,
958 vector_encryption: VectorEncryption,
959 ) -> Result<CollectionInfo, Error> {
960 self.ensure_writable("create_collection")?;
961 self.authorize(principal, Action::Admin, "create_collection", &name)?;
962 self.limits.check_dim(dim as usize)?;
963 if let Some(c) = &self.cluster {
964 return c
965 .create_collection(
966 name,
967 dim,
968 metric,
969 index,
970 filterable,
971 multivector,
972 vector_encryption,
973 )
974 .await;
975 }
976 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
977 .with_index(index)
978 .with_filterable(filterable.clone())
979 .with_multivector(multivector)
980 .with_vector_encryption(vector_encryption);
981 let owned = name.clone();
982 let result = self
983 .write_blocking(move |db| db.create_collection(&owned, descriptor))
984 .await;
985 self.audit.record(
986 principal.actor(),
987 "create_collection",
988 &name,
989 Outcome::of(&result),
990 );
991 result?;
992 Ok(CollectionInfo {
993 name,
994 dim,
995 metric,
996 count: 0,
997 index,
998 filterable,
999 multivector,
1000 vector_encryption,
1001 })
1002 }
1003
1004 pub(crate) async fn get_collection(
1005 &self,
1006 principal: &Principal,
1007 name: String,
1008 ) -> Result<CollectionInfo, Error> {
1009 self.authorize(principal, Action::Read, "get_collection", &name)?;
1010 self.read_blocking(move |db| {
1011 let descriptor = db
1012 .descriptor(&name)
1013 .cloned()
1014 .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
1015 let count = if descriptor.multivector {
1018 db.document_count(&name)? as u64
1019 } else {
1020 db.len(&name)? as u64
1021 };
1022 Ok(CollectionInfo {
1023 name,
1024 dim: descriptor.dim,
1025 metric: descriptor.metric,
1026 count,
1027 index: descriptor.index,
1028 filterable: descriptor.filterable,
1029 multivector: descriptor.multivector,
1030 vector_encryption: descriptor.vector_encryption,
1031 })
1032 })
1033 .await
1034 }
1035
1036 pub(crate) async fn list_collections(
1037 &self,
1038 principal: &Principal,
1039 ) -> Result<Vec<CollectionInfo>, Error> {
1040 self.authorize_global(principal, Action::Read, "list_collections")?;
1041 let mut infos = self
1042 .read_blocking(|db| {
1043 let mut out = Vec::new();
1044 for name in db.collection_names() {
1045 if let Some(descriptor) = db.descriptor(&name).cloned() {
1046 let count = if descriptor.multivector {
1047 db.document_count(&name)? as u64
1048 } else {
1049 db.len(&name)? as u64
1050 };
1051 out.push(CollectionInfo {
1052 name,
1053 dim: descriptor.dim,
1054 metric: descriptor.metric,
1055 count,
1056 index: descriptor.index,
1057 filterable: descriptor.filterable,
1058 multivector: descriptor.multivector,
1059 vector_encryption: descriptor.vector_encryption,
1060 });
1061 }
1062 }
1063 Ok(out)
1064 })
1065 .await?;
1066 infos.retain(|info| principal.can_see(&info.name));
1068 Ok(infos)
1069 }
1070
1071 pub(crate) async fn delete_collection(
1072 &self,
1073 principal: &Principal,
1074 name: String,
1075 ) -> Result<bool, Error> {
1076 self.ensure_writable("delete_collection")?;
1077 self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1078 if let Some(c) = &self.cluster {
1079 return c.drop_collection(&name).await;
1080 }
1081 let resource = name.clone();
1082 let result = self
1083 .write_blocking(move |db| db.drop_collection(&name))
1084 .await;
1085 self.audit.record(
1086 principal.actor(),
1087 "delete_collection",
1088 &resource,
1089 Outcome::of(&result),
1090 );
1091 if matches!(result, Ok(true))
1094 && let Ok(mut map) = self.snapshot_cells.write()
1095 {
1096 map.remove(&resource);
1097 }
1098 result
1099 }
1100
1101 #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1102 pub(crate) async fn upsert(
1103 &self,
1104 principal: &Principal,
1105 collection: String,
1106 points: Vec<PointIn>,
1107 ) -> Result<u64, Error> {
1108 self.ensure_writable("upsert")?;
1109 self.authorize(principal, Action::Write, "upsert", &collection)?;
1110 self.limits.check_batch(points.len())?;
1111 for p in &points {
1112 self.limits.check_vector_len(p.vector.len())?;
1113 self.limits.check_payload(&p.payload)?;
1114 }
1115 if let Some(c) = &self.cluster {
1116 return c.upsert(&collection, points).await;
1117 }
1118 let resource = collection.clone();
1119 let result = self
1120 .write_blocking(move |db| {
1121 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1122 .iter()
1123 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1124 .collect();
1125 db.upsert_batch(&collection, &records)
1126 })
1127 .await;
1128 self.audit
1129 .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1130 if self.mvcc && result.is_ok() {
1131 self.schedule_if_stale(&resource).await;
1132 }
1133 result
1134 }
1135
1136 pub(crate) async fn upsert_bulk(
1139 &self,
1140 principal: &Principal,
1141 collection: String,
1142 points: Vec<PointIn>,
1143 ) -> Result<u64, Error> {
1144 self.ensure_writable("upsert")?;
1145 self.authorize(principal, Action::Write, "upsert", &collection)?;
1146 self.limits.check_bulk_batch(points.len())?;
1147 for p in &points {
1148 self.limits.check_vector_len(p.vector.len())?;
1149 self.limits.check_payload(&p.payload)?;
1150 }
1151 if let Some(c) = &self.cluster {
1152 return c.upsert_bulk(&collection, points).await;
1153 }
1154 let resource = collection.clone();
1155 let result = self
1156 .write_blocking(move |db| {
1157 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1158 .iter()
1159 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1160 .collect();
1161 db.upsert_bulk(&collection, &records)
1162 })
1163 .await;
1164 self.audit.record(
1165 principal.actor(),
1166 "upsert_bulk",
1167 &resource,
1168 Outcome::of(&result),
1169 );
1170 if self.mvcc && result.is_ok() {
1171 self.schedule_if_stale(&resource).await;
1172 }
1173 result
1174 }
1175
1176 #[tracing::instrument(skip_all)]
1180 pub(crate) async fn snapshot(
1181 &self,
1182 principal: &Principal,
1183 destination: String,
1184 ) -> Result<SnapshotInfo, Error> {
1185 self.ensure_writable("snapshot")?;
1186 self.authorize_global(principal, Action::Admin, "snapshot")?;
1187 let dest = std::path::PathBuf::from(&destination);
1188 let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1189 self.audit.record(
1190 principal.actor(),
1191 "snapshot",
1192 &destination,
1193 Outcome::of(&result),
1194 );
1195 result
1196 }
1197
1198 pub(crate) async fn delete_points(
1199 &self,
1200 principal: &Principal,
1201 collection: String,
1202 ids: Vec<String>,
1203 ) -> Result<u64, Error> {
1204 self.ensure_writable("delete_points")?;
1205 self.authorize(principal, Action::Write, "delete_points", &collection)?;
1206 if let Some(c) = &self.cluster {
1207 return c.delete_points(&collection, ids).await;
1208 }
1209 let resource = collection.clone();
1210 let result = self
1211 .write_blocking(move |db| {
1212 let mut count = 0u64;
1213 for id in &ids {
1214 if db.delete(&collection, id)? {
1215 count += 1;
1216 }
1217 }
1218 Ok(count)
1219 })
1220 .await;
1221 self.audit.record(
1222 principal.actor(),
1223 "delete_points",
1224 &resource,
1225 Outcome::of(&result),
1226 );
1227 if self.mvcc && result.is_ok() {
1228 self.schedule_if_stale(&resource).await;
1229 }
1230 result
1231 }
1232
1233 pub(crate) async fn get_points(
1234 &self,
1235 principal: &Principal,
1236 collection: String,
1237 ids: Vec<String>,
1238 with_vector: bool,
1239 ) -> Result<Vec<PointOut>, Error> {
1240 self.authorize(principal, Action::Read, "get_points", &collection)?;
1241 if let Some(c) = &self.cluster {
1242 return c.get_points(&collection, ids, with_vector).await;
1243 }
1244 self.read_blocking(move |db| {
1245 let mut out = Vec::new();
1246 for id in &ids {
1247 if let Some(m) = db.get(&collection, id)? {
1248 out.push(PointOut {
1249 id: m.id,
1250 vector: if with_vector { m.vector } else { None },
1251 payload: m.payload.unwrap_or(Value::Null),
1252 });
1253 }
1254 }
1255 Ok(out)
1256 })
1257 .await
1258 }
1259
1260 #[allow(clippy::too_many_arguments)]
1261 #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1262 pub(crate) async fn search(
1263 &self,
1264 principal: &Principal,
1265 collection: String,
1266 vector: Vec<f32>,
1267 k: usize,
1268 filter: Option<Filter>,
1269 ef_search: usize,
1270 with_payload: bool,
1271 with_vector: bool,
1272 ) -> Result<Vec<MatchOut>, Error> {
1273 self.authorize(principal, Action::Read, "search", &collection)?;
1274 self.limits.check_search(k, ef_search)?;
1275 self.limits.check_vector_len(vector.len())?;
1276
1277 if let Some(c) = &self.cluster {
1279 return c
1280 .search(
1281 &collection,
1282 vector,
1283 k,
1284 filter,
1285 ef_search,
1286 with_payload,
1287 with_vector,
1288 )
1289 .await;
1290 }
1291
1292 if filter.is_none()
1298 && !with_payload
1299 && !with_vector
1300 && let Some(cell) = self.cached_cell(&collection)
1301 {
1302 return tokio::task::spawn_blocking(move || {
1303 let matches = cell.load().search(&vector, k, ef_search)?;
1304 Ok::<_, quiver_embed::Error>(
1305 matches
1306 .into_iter()
1307 .map(|m| MatchOut {
1308 id: m.id,
1309 score: m.score,
1310 payload: None,
1311 vector: None,
1312 })
1313 .collect(),
1314 )
1315 })
1316 .await
1317 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1318 .map_err(Error::Engine);
1319 }
1320
1321 let params = SearchParams {
1322 k,
1323 filter,
1324 ef_search,
1325 with_payload,
1326 with_vector,
1327 };
1328 let coll = collection.clone();
1329 self.search_blocking(coll, move |db| {
1330 let matches = db.search_snapshot(&collection, &vector, ¶ms)?;
1331 Ok(matches
1332 .into_iter()
1333 .map(|m| MatchOut {
1334 id: m.id,
1335 score: m.score,
1336 payload: m.payload,
1337 vector: m.vector,
1338 })
1339 .collect())
1340 })
1341 .await
1342 }
1343
1344 #[allow(clippy::too_many_arguments)]
1345 pub(crate) async fn hybrid_search(
1346 &self,
1347 principal: &Principal,
1348 collection: String,
1349 dense: Option<Vec<f32>>,
1350 sparse: Option<(Vec<u32>, Vec<f32>)>,
1351 text: Option<String>,
1352 k: usize,
1353 filter: Option<Filter>,
1354 ef_search: usize,
1355 rrf_k0: f32,
1356 with_payload: bool,
1357 with_vector: bool,
1358 ) -> Result<Vec<MatchOut>, Error> {
1359 self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1360 self.limits.check_search(k, ef_search)?;
1361 if let Some(v) = &dense {
1362 self.limits.check_vector_len(v.len())?;
1363 }
1364 if let Some((indices, values)) = &sparse {
1365 self.limits.check_sparse_terms(indices.len())?;
1366 if indices.len() != values.len() {
1367 return Err(Error::BadRequest(format!(
1368 "sparse query indices ({}) and values ({}) length mismatch",
1369 indices.len(),
1370 values.len()
1371 )));
1372 }
1373 }
1374 let params = SearchParams {
1375 k,
1376 filter,
1377 ef_search,
1378 with_payload,
1379 with_vector,
1380 };
1381 let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1382 let coll = collection.clone();
1383 self.search_blocking(coll, move |db| {
1384 let matches = db.hybrid_search_snapshot(
1385 &collection,
1386 dense.as_deref(),
1387 sv.as_ref(),
1388 text.as_deref(),
1389 ¶ms,
1390 rrf_k0,
1391 )?;
1392 Ok(matches
1393 .into_iter()
1394 .map(|m| MatchOut {
1395 id: m.id,
1396 score: m.score,
1397 payload: m.payload,
1398 vector: m.vector,
1399 })
1400 .collect())
1401 })
1402 .await
1403 }
1404
1405 #[allow(clippy::too_many_arguments)]
1410 pub(crate) async fn search_text(
1411 &self,
1412 principal: &Principal,
1413 collection: String,
1414 text: String,
1415 k: usize,
1416 filter: Option<Filter>,
1417 ef_search: usize,
1418 rrf_k0: f32,
1419 with_payload: bool,
1420 with_vector: bool,
1421 rerank: bool,
1422 ) -> Result<Vec<MatchOut>, Error> {
1423 self.authorize(principal, Action::Read, "search_text", &collection)?;
1424 self.limits.check_search(k, ef_search)?;
1425 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1426 Error::BadRequest(format!(
1427 "collection {collection:?} has no embedding provider configured \
1428 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1429 ))
1430 })?;
1431 let query = text.clone();
1433 let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1434 .await
1435 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1436 .map_err(|e| Error::Upstream(e.to_string()))?
1437 .into_iter()
1438 .next()
1439 .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1440 self.limits.check_vector_len(vector.len())?;
1441
1442 let reranker = if rerank {
1443 self.embed.reranker(&collection)
1444 } else {
1445 None
1446 };
1447 let need_payload = with_payload || reranker.is_some();
1451 let fetch_k = if reranker.is_some() {
1452 k.max(RERANK_CANDIDATES)
1453 } else {
1454 k
1455 };
1456
1457 let mut hits = self
1458 .hybrid_search(
1459 principal,
1460 collection,
1461 Some(vector),
1462 None,
1463 Some(text.clone()),
1464 fetch_k,
1465 filter,
1466 ef_search,
1467 rrf_k0,
1468 need_payload,
1469 with_vector,
1470 )
1471 .await?;
1472
1473 if let Some(rr) = reranker {
1474 let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1475 let query = text;
1476 let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1477 .await
1478 .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1479 .map_err(|e| Error::Upstream(e.to_string()))?;
1480 let mut scored: Vec<(f32, MatchOut)> = scores
1482 .into_iter()
1483 .zip(hits)
1484 .map(|(s, mut h)| {
1485 h.score = s;
1486 (s, h)
1487 })
1488 .collect();
1489 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1490 hits = scored.into_iter().map(|(_, h)| h).collect();
1491 }
1492
1493 hits.truncate(k);
1494 if !with_payload {
1496 for h in &mut hits {
1497 h.payload = None;
1498 }
1499 }
1500 Ok(hits)
1501 }
1502
1503 pub(crate) async fn upsert_text(
1508 &self,
1509 principal: &Principal,
1510 collection: String,
1511 points: Vec<TextPointIn>,
1512 ) -> Result<u64, Error> {
1513 self.ensure_writable("upsert_text")?;
1514 self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1515 self.limits.check_batch(points.len())?;
1516 for p in &points {
1517 if !matches!(p.payload, Value::Object(_) | Value::Null) {
1518 return Err(Error::BadRequest(
1519 "upsert_text payload must be a JSON object or null".to_owned(),
1520 ));
1521 }
1522 }
1523 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1524 Error::BadRequest(format!(
1525 "collection {collection:?} has no embedding provider configured \
1526 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1527 ))
1528 })?;
1529 let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1530 let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1531 .await
1532 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1533 .map_err(|e| Error::Upstream(e.to_string()))?;
1534 if vectors.len() != points.len() {
1535 return Err(Error::Upstream(format!(
1536 "embedding provider returned {} vectors for {} inputs",
1537 vectors.len(),
1538 points.len()
1539 )));
1540 }
1541 let dense: Vec<PointIn> = points
1542 .into_iter()
1543 .zip(vectors)
1544 .map(|(p, vector)| {
1545 let mut payload = match p.payload {
1546 Value::Object(map) => map,
1547 _ => serde_json::Map::new(),
1548 };
1549 payload
1551 .entry(TEXT_KEY.to_owned())
1552 .or_insert_with(|| Value::String(p.text.clone()));
1553 PointIn {
1554 id: p.id,
1555 vector,
1556 payload: Value::Object(payload),
1557 }
1558 })
1559 .collect();
1560 self.upsert(principal, collection, dense).await
1561 }
1562
1563 #[allow(clippy::too_many_arguments)]
1564 pub(crate) async fn fetch(
1565 &self,
1566 principal: &Principal,
1567 collection: String,
1568 filter: Option<Filter>,
1569 offset: usize,
1570 limit: usize,
1571 with_payload: bool,
1572 with_vector: bool,
1573 ) -> Result<Vec<MatchOut>, Error> {
1574 self.authorize(principal, Action::Read, "fetch", &collection)?;
1575 self.limits.check_fetch(limit)?;
1576 self.read_blocking(move |db| {
1577 let matches = db.fetch(
1578 &collection,
1579 filter.as_ref(),
1580 offset,
1581 limit,
1582 with_payload,
1583 with_vector,
1584 )?;
1585 Ok(matches
1586 .into_iter()
1587 .map(|m| MatchOut {
1588 id: m.id,
1589 score: m.score,
1590 payload: m.payload,
1591 vector: m.vector,
1592 })
1593 .collect())
1594 })
1595 .await
1596 }
1597
1598 pub(crate) async fn upsert_documents(
1599 &self,
1600 principal: &Principal,
1601 collection: String,
1602 documents: Vec<DocumentIn>,
1603 ) -> Result<u64, Error> {
1604 self.ensure_writable("upsert_documents")?;
1605 self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1606 self.limits.check_batch(documents.len())?;
1607 for doc in &documents {
1608 self.limits.check_payload(&doc.payload)?;
1609 for token in &doc.vectors {
1610 self.limits.check_vector_len(token.len())?;
1611 }
1612 }
1613 let resource = collection.clone();
1614 let result = self
1615 .write_blocking(move |db| {
1616 let mut count = 0u64;
1617 for doc in &documents {
1618 db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1619 count += 1;
1620 }
1621 Ok(count)
1622 })
1623 .await;
1624 self.audit.record(
1625 principal.actor(),
1626 "upsert_documents",
1627 &resource,
1628 Outcome::of(&result),
1629 );
1630 result
1631 }
1632
1633 pub(crate) async fn delete_documents(
1634 &self,
1635 principal: &Principal,
1636 collection: String,
1637 ids: Vec<String>,
1638 ) -> Result<u64, Error> {
1639 self.ensure_writable("delete_documents")?;
1640 self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1641 let resource = collection.clone();
1642 let result = self
1643 .write_blocking(move |db| {
1644 let mut count = 0u64;
1645 for id in &ids {
1646 if db.delete_document(&collection, id)? {
1647 count += 1;
1648 }
1649 }
1650 Ok(count)
1651 })
1652 .await;
1653 self.audit.record(
1654 principal.actor(),
1655 "delete_documents",
1656 &resource,
1657 Outcome::of(&result),
1658 );
1659 result
1660 }
1661
1662 #[allow(clippy::too_many_arguments)]
1663 pub(crate) async fn search_multi_vector(
1664 &self,
1665 principal: &Principal,
1666 collection: String,
1667 query: Vec<Vec<f32>>,
1668 k: usize,
1669 filter: Option<Filter>,
1670 ef_search: usize,
1671 with_payload: bool,
1672 with_vector: bool,
1673 ) -> Result<Vec<DocumentMatchOut>, Error> {
1674 self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1675 self.limits.check_search(k, ef_search)?;
1676 for token in &query {
1677 self.limits.check_vector_len(token.len())?;
1678 }
1679 let params = SearchParams {
1680 k,
1681 filter,
1682 ef_search,
1683 with_payload,
1684 with_vector,
1685 };
1686 let coll = collection.clone();
1687 self.search_blocking(coll, move |db| {
1688 let matches = db.search_multi_vector_snapshot(&collection, &query, ¶ms)?;
1689 Ok(matches
1690 .into_iter()
1691 .map(|m| DocumentMatchOut {
1692 id: m.id,
1693 score: m.score,
1694 payload: m.payload,
1695 vectors: m.vectors,
1696 })
1697 .collect())
1698 })
1699 .await
1700 }
1701}
1702
/// Capacity passed to `broadcast::channel` in `serve` for the channel onto
/// which the commit observer sends a clone of every committed `WalEntry`.
const REPLICATION_BUFFER: usize = 1024;
1706
1707pub async fn run(config: Config) -> Result<(), Error> {
1709 config.validate()?;
1710 let rest_listener = TcpListener::bind(config.rest_addr)
1711 .await
1712 .map_err(Error::Io)?;
1713 let grpc_listener = TcpListener::bind(config.grpc_addr)
1714 .await
1715 .map_err(Error::Io)?;
1716 tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1717 tokio::select! {
1718 result = serve(config, rest_listener, grpc_listener) => result,
1719 () = shutdown_signal() => {
1720 tracing::info!("shutdown signal received");
1721 Ok(())
1722 }
1723 }
1724}
1725
/// Serves the REST and gRPC APIs on the two already-bound listeners until one
/// of the servers fails.
///
/// In coordinator mode (`config.coordinator`) the gRPC listener is dropped and
/// everything is delegated to `coordinator::serve_coordinator`. Otherwise this
/// opens the database and audit log, wires the replication channel, builds the
/// embedding registry, optional cluster router and shared `AppState`, starts
/// the follower task when a leader URL is configured, and runs both servers
/// (with TLS when `load_tls` yields material).
///
/// # Errors
///
/// Returns the first error from opening the database or audit log, building
/// the embedding registry (`Error::Config`) or cluster router, loading TLS
/// material, preparing either server, or from either server while running.
pub async fn serve(
    config: Config,
    rest_listener: TcpListener,
    grpc_listener: TcpListener,
) -> Result<(), Error> {
    if config.coordinator {
        // Coordinator mode returns before any database is opened and only
        // uses the REST listener.
        drop(grpc_listener);
        return coordinator::serve_coordinator(config, rest_listener).await;
    }
    let mut db = open_database(&config)?;
    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
    // The initial receiver is discarded; only the sender is kept in the state.
    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
    {
        // Installed while `db` is still exclusively owned, before it is
        // wrapped in `Arc<RwLock<..>>` below.
        let tx = replication_tx.clone();
        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
            // Send errors are deliberately ignored (tokio's broadcast `send`
            // fails when no receiver is currently subscribed).
            let _ = tx.send(entry.clone());
        }));
    }
    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
        .map_err(|e| Error::Config(e.to_string()))?;

    if config.mvcc_reads {
        db.set_mvcc_reads(true);
    }
    // Read the flag back from the database rather than trusting the config value.
    let mvcc = db.mvcc_reads();

    // The cluster router is only built when shards are configured.
    let cluster = if config.cluster_shards.is_empty() {
        None
    } else {
        let c = cluster::Cluster::new(
            config.cluster_shards.clone(),
            config.cluster_replicas.clone(),
            config.cluster_shard_key.clone(),
        )?;
        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
        let c = Arc::new(c);
        if let Some(coord) = config.coordinator_url.clone() {
            // Detached background task: refresh from the coordinator forever,
            // sleeping `MAP_REFRESH_INTERVAL` between attempts. Failures are
            // only logged at debug level and retried on the next tick.
            let router = c.clone();
            tokio::spawn(async move {
                loop {
                    if let Err(e) = router.refresh_from(&coord).await {
                        tracing::debug!(error = %e, "shard-map refresh failed; will retry");
                    }
                    tokio::time::sleep(cluster::MAP_REFRESH_INTERVAL).await;
                }
            });
        }
        Some(c)
    };

    let state = AppState {
        db: Arc::new(RwLock::new(db)),
        keys: Arc::new(config.api_keys.clone()),
        audit,
        replication_tx,
        // A node configured with a leader URL runs as a follower (see the
        // `spawn_follower` call below) and is marked read-only.
        read_only: config.leader_url.is_some(),
        limits: config.limits,
        embed: Arc::new(embed),
        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
        metrics: Arc::new(metrics::Metrics::default()),
        rebuilding: Arc::new(Mutex::new(HashSet::new())),
        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
        mvcc,
        cluster,
    };

    if let Some(leader_url) = config.leader_url.clone() {
        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
    }

    // Both front-ends share the same state.
    let app = rest::router(state.clone());
    let grpc = grpc::service(state);

    let tls = load_tls(&config)?;

    // The two arms produce different future types, so the REST future is boxed.
    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
        Some(material) => {
            // TLS: hand the listener to axum-server with the pre-built rustls config.
            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
            let server =
                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
            Box::pin(async move {
                server
                    .serve(app.into_make_service())
                    .await
                    .map_err(Error::Io)
            })
        }
        // Plaintext: serve directly on the tokio listener.
        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
    };

    let mut grpc_builder = tonic::transport::Server::builder();
    // gRPC TLS is configured from the same PEM bytes through tonic's own types.
    if let Some(material) = &tls {
        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
        let mut tls_config = ServerTlsConfig::new().identity(identity);
        if let Some(ca_pem) = &material.client_ca_pem {
            // Optional client CA for verifying client certificates.
            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
        }
        grpc_builder = grpc_builder
            .tls_config(tls_config)
            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
    }
    let grpc_fut = async move {
        grpc_builder
            .add_service(grpc)
            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
            .await
            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
    };

    // Run both servers concurrently; the first error ends `serve`.
    tokio::try_join!(rest_fut, grpc_fut)?;
    Ok(())
}
1860
1861async fn shutdown_signal() {
1862 let _ = tokio::signal::ctrl_c().await;
1863}
1864
1865fn open_database(config: &Config) -> Result<Database, Error> {
1871 let master_key = config.master_key_hex()?;
1872 let keyring =
1873 quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
1874 .map_err(|e| Error::Config(e.to_string()))?;
1875 let db = match keyring {
1876 Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
1877 None => Database::open(&config.data_dir)?,
1878 };
1879 Ok(db)
1880}
1881
/// TLS inputs produced by `load_tls` and consumed by `serve` for both servers.
struct TlsMaterial {
    /// Raw bytes of the `tls_cert` file; used to build the gRPC identity.
    cert_pem: Vec<u8>,
    /// Raw bytes of the `tls_key` file; used to build the gRPC identity.
    key_pem: Vec<u8>,
    /// Raw bytes of the `tls_client_ca` file, when one is configured; used as
    /// the gRPC client CA root.
    client_ca_pem: Option<Vec<u8>>,
    /// rustls configuration built from the fields above, used by the REST server.
    rest_config: Arc<rustls::ServerConfig>,
}
1891
1892fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
1897 match (&config.tls_cert, &config.tls_key) {
1898 (Some(cert_path), Some(key_path)) => {
1899 let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
1900 let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
1901 let client_ca_pem = config
1902 .tls_client_ca
1903 .as_ref()
1904 .map(std::fs::read)
1905 .transpose()
1906 .map_err(Error::Io)?;
1907 let rest_config = Arc::new(rustls_server_config(
1908 &cert_pem,
1909 &key_pem,
1910 client_ca_pem.as_deref(),
1911 )?);
1912 Ok(Some(TlsMaterial {
1913 cert_pem,
1914 key_pem,
1915 client_ca_pem,
1916 rest_config,
1917 }))
1918 }
1919 (None, None) => Ok(None),
1920 _ => Err(Error::Config(
1921 "tls_cert and tls_key must be set together".to_owned(),
1922 )),
1923 }
1924}
1925
1926fn rustls_server_config(
1930 cert_pem: &[u8],
1931 key_pem: &[u8],
1932 client_ca_pem: Option<&[u8]>,
1933) -> Result<rustls::ServerConfig, Error> {
1934 use rustls_pki_types::pem::PemObject;
1935 use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1936
1937 let certs = CertificateDer::pem_slice_iter(cert_pem)
1938 .collect::<std::result::Result<Vec<_>, _>>()
1939 .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
1940 if certs.is_empty() {
1941 return Err(Error::Config(
1942 "tls_cert contains no certificates".to_owned(),
1943 ));
1944 }
1945 let key = PrivateKeyDer::from_pem_slice(key_pem)
1946 .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
1947 let provider = Arc::new(rustls::crypto::ring::default_provider());
1948 let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
1949 .with_safe_default_protocol_versions()
1950 .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
1951 let builder = match client_ca_pem {
1952 Some(ca_pem) => {
1953 let mut roots = rustls::RootCertStore::empty();
1954 for cert in CertificateDer::pem_slice_iter(ca_pem) {
1955 let cert =
1956 cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
1957 roots
1958 .add(cert)
1959 .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
1960 }
1961 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
1962 Arc::new(roots),
1963 provider,
1964 )
1965 .build()
1966 .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
1967 builder.with_client_cert_verifier(verifier)
1968 }
1969 None => builder.with_no_client_auth(),
1970 };
1971 builder
1972 .with_single_cert(certs, key)
1973 .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
1974}
1975
1976pub fn init_tracing() {
1979 init_observability(&Config::default());
1980}
1981
/// Installs the global tracing subscriber.
///
/// The subscriber is a registry with an `EnvFilter` (taken from the default
/// environment variable, falling back to the `info` directive) and a `fmt`
/// layer. When the crate is built with the `otlp` feature and
/// `config.otlp.is_enabled()`, an OpenTelemetry layer is added as well; if the
/// OTLP provider cannot be built, a message is printed to stderr and the
/// plain subscriber is installed instead.
///
/// The result of `try_init` is discarded, so a failed installation (for
/// example because a subscriber is already set) is ignored.
// Without the `otlp` feature `config` is not read, hence the conditional allow.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let registry = tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer());

    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                // Hand the provider to the `otlp` module before installing the layer.
                otlp::store_provider(provider);
                let _ = registry
                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
                    .try_init();
                // `registry` was consumed above; skip the fallback installation.
                return;
            }
            // No subscriber is installed yet at this point, so report on stderr.
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
        }
    }

    let _ = registry.try_init();
}
2014
/// Shuts down observability exporters.
///
/// Calls `otlp::shutdown` when the crate is built with the `otlp` feature and
/// does nothing otherwise.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    otlp::shutdown();
}
2022
#[cfg(test)]
mod tests {
    use super::*;

    /// 64 hex digits used as a well-formed encryption key in these tests.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    /// Default config with one API key and nothing else changed.
    fn with_api_key() -> Config {
        Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        }
    }

    /// Default config with one API key and a well-formed encryption key.
    fn secured() -> Config {
        Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            ..with_api_key()
        }
    }

    /// The wildcard IPv4 address used by the public-bind tests.
    fn public_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333)
    }

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // Bare defaults are rejected.
        let bare = Config::default();
        assert!(bare.validate().is_err());

        // Opting into insecure mode makes them acceptable.
        let insecure = Config {
            insecure: true,
            ..Config::default()
        };
        assert!(insecure.validate().is_ok());

        // So does supplying an API key and an encryption key.
        let keyed = secured();
        assert!(keyed.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        // API key only: rejected.
        let no_key = with_api_key();
        assert!(no_key.validate().is_err());

        // Well-formed key: accepted.
        let good_key = Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            ..no_key
        };
        assert!(good_key.validate().is_ok());

        // Malformed key: rejected.
        let bad_key = Config {
            encryption_key: Some("not-a-valid-hex-key".to_owned()),
            ..good_key
        };
        assert!(bad_key.validate().is_err());

        // Insecure mode with no key at all: accepted.
        let insecure = Config {
            insecure: true,
            encryption_key: None,
            ..bad_key
        };
        assert!(insecure.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let tmp = tempfile::tempdir().unwrap();
        let key_path = tmp.path().join("master.key");
        // The file carries a trailing newline.
        std::fs::write(&key_path, format!("{TEST_KEY}\n")).unwrap();

        let from_file = Config {
            master_key_file: Some(key_path.clone()),
            ..with_api_key()
        };
        assert!(from_file.validate().is_ok());
        // The key read back from the file equals TEST_KEY without the newline.
        assert_eq!(
            from_file.master_key_hex().unwrap().as_deref(),
            Some(TEST_KEY)
        );

        // Setting both the inline key and the key file is rejected.
        let both_sources = Config {
            encryption_key: Some(TEST_KEY.to_owned()),
            ..from_file
        };
        assert!(both_sources.validate().is_err());

        // A key file with malformed content is rejected.
        let file_only = Config {
            encryption_key: None,
            ..both_sources
        };
        std::fs::write(&key_path, "not-a-valid-key").unwrap();
        assert!(file_only.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let exposed = Config {
            rest_addr: public_addr(),
            ..secured()
        };
        assert!(exposed.validate().is_err());

        // The insecure opt-out allows the same bind address.
        let opted_out = Config {
            insecure: true,
            ..exposed
        };
        assert!(opted_out.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        let tls_enabled = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: public_addr(),
            ..secured()
        };
        assert!(tls_enabled.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        // A certificate without a key is rejected.
        let cert_only = Config {
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..secured()
        };
        assert!(cert_only.validate().is_err());

        // Adding the key makes the pair acceptable.
        let paired = Config {
            tls_key: Some(PathBuf::from("key.pem")),
            ..cert_only
        };
        assert!(paired.validate().is_ok());
    }
}