1mod audit;
33mod auth;
34mod cluster;
35mod coordinator;
36mod error;
37mod grpc;
38mod metrics;
39mod otlp;
40#[cfg(feature = "raft")]
41pub mod raft;
42mod rate_limit;
43mod replication;
44mod rest;
45
46use std::collections::{HashMap, HashSet};
47use std::future::Future;
48use std::net::{IpAddr, Ipv4Addr, SocketAddr};
49use std::path::PathBuf;
50use std::pin::Pin;
51use std::sync::{Arc, Mutex, RwLock};
52
53use axum_server::tls_rustls::RustlsConfig;
54use figment::Figment;
55use figment::providers::{Env, Format, Serialized, Toml};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use tokio::net::TcpListener;
59use tokio::sync::broadcast;
60use tokio_stream::wrappers::TcpListenerStream;
61use tonic::transport::{Certificate, Identity, ServerTlsConfig};
62
63use quiver_crypto::AeadCodec;
64use quiver_embed::{
65 Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
66 SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
67};
68use quiver_query::Filter;
69
70pub use auth::{Action, ApiKey, CollectionScope};
71pub use error::Error;
72pub use otlp::OtlpConfig;
73pub use quiver_providers::{
76 EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
77 RerankProvider,
78};
79pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
80
81use audit::{AuditLog, Outcome};
82use auth::Principal;
83
/// Server-side request limits.
///
/// Each field is an upper bound; `Limits::validate` rejects a zero in any of
/// them. Because of `#[serde(default)]`, fields missing from the configuration
/// keep the value from `Limits::default()`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
    /// Largest `k` accepted by `check_search` (`QUIVER_MAX_K`).
    pub max_k: usize,
    /// Largest `ef_search` accepted by `check_search` (`QUIVER_MAX_EF_SEARCH`).
    pub max_ef_search: usize,
    /// Largest `limit` accepted by `check_fetch` (`QUIVER_MAX_FETCH_LIMIT`).
    pub max_fetch_limit: usize,
    /// Largest collection dimension (`check_dim`) and vector length
    /// (`check_vector_len`) accepted (`QUIVER_MAX_VECTOR_DIM`).
    pub max_vector_dim: usize,
    /// Largest payload, measured as serialized JSON bytes by `check_payload`
    /// (`QUIVER_MAX_PAYLOAD_BYTES`).
    pub max_payload_bytes: usize,
    /// Largest batch accepted by `check_batch` (`QUIVER_MAX_BATCH_SIZE`).
    pub max_batch_size: usize,
    /// Request-body ceiling in bytes (`QUIVER_MAX_REQUEST_BODY_BYTES`).
    /// NOTE(review): enforcement is outside this part of the file.
    pub max_request_body_bytes: usize,
    /// Largest number of sparse-query terms accepted by `check_sparse_terms`
    /// (`QUIVER_MAX_SPARSE_TERMS`).
    pub max_sparse_terms: usize,
    /// Largest bulk batch accepted by `check_bulk_batch`
    /// (`QUIVER_MAX_BULK_BATCH_SIZE`).
    pub max_bulk_batch_size: usize,
}
119
120impl Default for Limits {
121 fn default() -> Self {
122 Self {
123 max_k: 10_000,
124 max_ef_search: 4_096,
125 max_fetch_limit: 10_000,
126 max_vector_dim: 8_192,
127 max_payload_bytes: 65_536,
128 max_batch_size: 1_000,
129 max_request_body_bytes: 32 * 1024 * 1024,
130 max_sparse_terms: 4_096,
131 max_bulk_batch_size: 50_000,
132 }
133 }
134}
135
136impl Limits {
137 fn apply_env_overrides(&mut self) -> Result<(), Error> {
141 let slots: [(&str, &mut usize); 9] = [
142 ("QUIVER_MAX_K", &mut self.max_k),
143 ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
144 ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
145 ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
146 ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
147 ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
148 (
149 "QUIVER_MAX_REQUEST_BODY_BYTES",
150 &mut self.max_request_body_bytes,
151 ),
152 ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
153 ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
154 ];
155 for (key, slot) in slots {
156 if let Ok(raw) = std::env::var(key) {
157 *slot = raw.parse().map_err(|_| {
158 Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
159 })?;
160 }
161 }
162 Ok(())
163 }
164
165 fn validate(&self) -> Result<(), Error> {
167 let named = [
168 ("max_k", self.max_k),
169 ("max_ef_search", self.max_ef_search),
170 ("max_fetch_limit", self.max_fetch_limit),
171 ("max_vector_dim", self.max_vector_dim),
172 ("max_payload_bytes", self.max_payload_bytes),
173 ("max_batch_size", self.max_batch_size),
174 ("max_request_body_bytes", self.max_request_body_bytes),
175 ("max_sparse_terms", self.max_sparse_terms),
176 ("max_bulk_batch_size", self.max_bulk_batch_size),
177 ];
178 if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
179 return Err(Error::Config(format!(
180 "limits.{name} must be greater than zero"
181 )));
182 }
183 Ok(())
184 }
185
186 fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
187 if k > self.max_k {
188 return Err(Error::BadRequest(format!(
189 "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
190 self.max_k
191 )));
192 }
193 if ef_search > self.max_ef_search {
194 return Err(Error::BadRequest(format!(
195 "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
196 self.max_ef_search
197 )));
198 }
199 Ok(())
200 }
201
202 fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
203 if n > self.max_sparse_terms {
204 return Err(Error::BadRequest(format!(
205 "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
206 self.max_sparse_terms
207 )));
208 }
209 Ok(())
210 }
211
212 fn check_fetch(&self, limit: usize) -> Result<(), Error> {
213 if limit > self.max_fetch_limit {
214 return Err(Error::BadRequest(format!(
215 "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
216 self.max_fetch_limit
217 )));
218 }
219 Ok(())
220 }
221
222 fn check_dim(&self, dim: usize) -> Result<(), Error> {
223 if dim > self.max_vector_dim {
224 return Err(Error::BadRequest(format!(
225 "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
226 self.max_vector_dim
227 )));
228 }
229 Ok(())
230 }
231
232 fn check_vector_len(&self, len: usize) -> Result<(), Error> {
233 if len > self.max_vector_dim {
234 return Err(Error::BadRequest(format!(
235 "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
236 self.max_vector_dim
237 )));
238 }
239 Ok(())
240 }
241
242 fn check_batch(&self, n: usize) -> Result<(), Error> {
243 if n > self.max_batch_size {
244 return Err(Error::BadRequest(format!(
245 "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
246 self.max_batch_size
247 )));
248 }
249 Ok(())
250 }
251
252 fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
253 if n > self.max_bulk_batch_size {
254 return Err(Error::BadRequest(format!(
255 "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
256 self.max_bulk_batch_size
257 )));
258 }
259 Ok(())
260 }
261
262 fn check_payload(&self, payload: &Value) -> Result<(), Error> {
263 let size = serde_json::to_vec(payload)
264 .map(|v| v.len())
265 .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
266 if size > self.max_payload_bytes {
267 return Err(Error::BadRequest(format!(
268 "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
269 self.max_payload_bytes
270 )));
271 }
272 Ok(())
273 }
274}
275
/// Server configuration.
///
/// `Config::load` assembles it from the built-in defaults, `quiver.toml`, and
/// `QUIVER_`-prefixed environment variables; `Config::validate` enforces the
/// cross-field rules. Fields absent from every source keep the value from
/// `Config::default()` (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Data directory; defaults to `./quiver-data`.
    pub data_dir: PathBuf,
    /// REST bind address; defaults to `127.0.0.1:6333`.
    pub rest_addr: SocketAddr,
    /// gRPC bind address; defaults to `127.0.0.1:6334`.
    pub grpc_addr: SocketAddr,
    /// API keys, deserialized by `auth::de_api_keys`. `validate` rejects an
    /// empty list unless `insecure` is set.
    #[serde(default, deserialize_with = "auth::de_api_keys")]
    pub api_keys: Vec<ApiKey>,
    /// Master key as hex text (`QUIVER_ENCRYPTION_KEY`). Blank values count as
    /// unset; mutually exclusive with `master_key_file`.
    pub encryption_key: Option<String>,
    /// File holding the hex master key (`QUIVER_MASTER_KEY_FILE`); its
    /// contents are trimmed when read.
    pub master_key_file: Option<PathBuf>,
    /// TLS certificate (PEM). Must be set together with `tls_key`.
    pub tls_cert: Option<PathBuf>,
    /// TLS private key (PEM). Must be set together with `tls_cert`.
    pub tls_key: Option<PathBuf>,
    /// Client CA for mutual TLS; requires `tls_cert` and `tls_key`.
    pub tls_client_ca: Option<PathBuf>,
    /// Audit-log path. NOTE(review): the consumer is outside this view.
    pub audit_log: Option<PathBuf>,
    /// Leader URL. NOTE(review): presumably makes this node a replication
    /// follower — confirm where it is consumed.
    pub leader_url: Option<String>,
    /// API key presented to the leader. NOTE(review): consumer not visible here.
    pub leader_api_key: Option<String>,
    /// Development escape hatch: when true, `validate` no longer requires API
    /// keys, an encryption key, or TLS on non-loopback binds.
    pub insecure: bool,
    /// Request limits; validated by `Limits::validate`.
    pub limits: Limits,
    /// Embedding provider configurations, keyed by name.
    #[serde(default)]
    pub embedding: HashMap<String, EmbeddingConfig>,
    /// Rerank provider configurations, keyed by name.
    #[serde(default)]
    pub rerank: HashMap<String, RerankConfig>,
    /// Rate-limiter settings; `load` applies their environment overrides.
    #[serde(default)]
    pub rate_limit: RateLimitConfig,
    /// OTLP settings; `load` applies their environment overrides.
    #[serde(default)]
    pub otlp: OtlpConfig,
    /// MVCC read toggle; off by default. NOTE(review): wiring is outside this view.
    #[serde(default)]
    pub mvcc_reads: bool,
    /// Cluster shard entries. NOTE(review): format (URLs?) not visible here.
    #[serde(default)]
    pub cluster_shards: Vec<String>,
    /// Cluster replica entries. NOTE(review): format not visible here.
    #[serde(default)]
    pub cluster_replicas: Vec<String>,
    /// Optional cluster shard key. NOTE(review): semantics not visible here.
    #[serde(default)]
    pub cluster_shard_key: Option<String>,
    /// Coordinator toggle; off by default. NOTE(review): wiring not visible here.
    #[serde(default)]
    pub coordinator: bool,
    /// Coordinator URL. NOTE(review): consumer not visible here.
    #[serde(default)]
    pub coordinator_url: Option<String>,
    /// Coordinator state path. NOTE(review): consumer not visible here.
    #[serde(default)]
    pub coordinator_state: Option<PathBuf>,
    /// This node's raft id. NOTE(review): consumer not visible here.
    #[serde(default)]
    pub raft_node_id: Option<u64>,
    /// Raft member entries. NOTE(review): format not visible here.
    #[serde(default)]
    pub raft_members: Vec<String>,
}
430
431impl Default for Config {
432 fn default() -> Self {
433 Self {
434 data_dir: PathBuf::from("./quiver-data"),
435 rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
436 grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
437 api_keys: Vec::new(),
438 encryption_key: None,
439 master_key_file: None,
440 tls_cert: None,
441 tls_key: None,
442 tls_client_ca: None,
443 audit_log: None,
444 leader_url: None,
445 leader_api_key: None,
446 insecure: false,
447 limits: Limits::default(),
448 embedding: HashMap::new(),
449 rerank: HashMap::new(),
450 rate_limit: RateLimitConfig::default(),
451 otlp: OtlpConfig::default(),
452 mvcc_reads: false,
453 cluster_shards: Vec::new(),
454 cluster_replicas: Vec::new(),
455 cluster_shard_key: None,
456 coordinator: false,
457 coordinator_url: None,
458 coordinator_state: None,
459 raft_node_id: None,
460 raft_members: Vec::new(),
461 }
462 }
463}
464
465impl Config {
466 pub fn load() -> Result<Self, Error> {
469 let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
470 .merge(Toml::file("quiver.toml"))
471 .merge(Env::prefixed("QUIVER_"))
472 .extract()
473 .map_err(|e| Error::Config(e.to_string()))?;
474 config.limits.apply_env_overrides()?;
477 config
479 .rate_limit
480 .apply_env_overrides()
481 .map_err(Error::Config)?;
482 config.otlp.apply_env_overrides().map_err(Error::Config)?;
484 Ok(config)
485 }
486
487 pub fn validate(&self) -> Result<(), Error> {
491 if self.api_keys.is_empty() && !self.insecure {
492 return Err(Error::Config(
493 "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
494 set insecure=true for local development"
495 .to_owned(),
496 ));
497 }
498 let master_key = self.master_key_hex()?;
500 if master_key.is_none() && !self.insecure {
501 return Err(Error::Config(
502 "no encryption key configured: encryption-at-rest is on by default — \
503 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
504 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
505 store data unencrypted (development only)"
506 .to_owned(),
507 ));
508 }
509 if let Some(key) = &master_key {
511 AeadCodec::from_hex(key)
512 .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
513 }
514 if self.tls_cert.is_some() != self.tls_key.is_some() {
516 return Err(Error::Config(
517 "tls_cert and tls_key must be set together".to_owned(),
518 ));
519 }
520 if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
522 return Err(Error::Config(
523 "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
524 ));
525 }
526 let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
527 let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
528 if non_loopback && !tls_enabled && !self.insecure {
529 return Err(Error::Config(
530 "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
531 or insecure=true for local development"
532 .to_owned(),
533 ));
534 }
535 self.limits.validate()?;
537 Ok(())
538 }
539
540 pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
550 let env_key = self
551 .encryption_key
552 .as_deref()
553 .map(str::trim)
554 .filter(|k| !k.is_empty());
555 match (&self.master_key_file, env_key) {
556 (Some(_), Some(_)) => Err(Error::Config(
557 "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
558 (QUIVER_MASTER_KEY_FILE), not both"
559 .to_owned(),
560 )),
561 (Some(path), None) => {
562 warn_if_world_readable(path);
563 let hex = std::fs::read_to_string(path).map_err(|e| {
564 Error::Config(format!("reading master_key_file {}: {e}", path.display()))
565 })?;
566 Ok(Some(hex.trim().to_owned()))
567 }
568 (None, Some(key)) => Ok(Some(key.to_owned())),
569 (None, None) => Ok(None),
570 }
571 }
572}
573
574#[cfg(unix)]
577fn warn_if_world_readable(path: &std::path::Path) {
578 use std::os::unix::fs::PermissionsExt;
579 if let Ok(meta) = std::fs::metadata(path)
580 && meta.permissions().mode() & 0o077 != 0
581 {
582 tracing::warn!(
583 path = %path.display(),
584 mode = format!("{:o}", meta.permissions().mode() & 0o777),
585 "master key file is group/world-accessible; restrict it to 0600"
586 );
587 }
588}
589
/// Non-Unix counterpart of the permission warning: does nothing, so callers
/// can invoke it unconditionally.
#[cfg(not(unix))]
fn warn_if_world_readable(_path: &std::path::Path) {}
592
/// Shared server state behind the request handlers.
///
/// Clones share the database, caches, and logs, because those fields are
/// `Arc`s.
#[derive(Clone)]
pub(crate) struct AppState {
    /// The database behind a std `RwLock`; reached through `write_blocking`,
    /// `read_blocking`, and `search_blocking`, which run on the blocking pool.
    db: Arc<RwLock<Database>>,
    /// API keys consulted by `authenticate`.
    keys: Arc<Vec<ApiKey>>,
    /// Audit log that receives operation outcomes and authorization denials.
    audit: Arc<AuditLog>,
    /// Broadcast channel of WAL entries; `open_replication` subscribes to it.
    replication_tx: broadcast::Sender<WalEntry>,
    /// When true, `ensure_writable` rejects the operation (the node is a
    /// read-only replication follower).
    read_only: bool,
    /// Request limits checked before work is dispatched.
    limits: Limits,
    /// Embedding provider registry. NOTE(review): use sites are outside this view.
    embed: Arc<EmbedRegistry>,
    /// Rate limiter behind `rate_limit` and `rate_limit_enabled`.
    rate_limiter: Arc<RateLimiter>,
    /// Metrics handle. NOTE(review): use sites are outside this view.
    metrics: Arc<metrics::Metrics>,
    /// Names of collections that currently have a rebuild task in flight
    /// (maintained by `schedule_rebuild`).
    rebuilding: Arc<Mutex<HashSet<String>>>,
    /// Per-collection snapshot cells: filled by `search_blocking`, read by
    /// `cached_cell`, and removed when a collection is dropped.
    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
    /// When true, successful local writes call `schedule_if_stale`.
    mvcc: bool,
    /// When set, operations are forwarded to the cluster instead of the local
    /// database.
    cluster: Option<Arc<cluster::Cluster>>,
    /// When set (raft builds only), writes are proposed through raft instead
    /// of being applied directly.
    #[cfg(feature = "raft")]
    raft: Option<Arc<raft::RaftShard>>,
}
649
/// Description of a collection as returned by the collection endpoints.
pub(crate) struct CollectionInfo {
    /// Collection name.
    pub name: String,
    /// Vector dimension.
    pub dim: u32,
    /// Distance metric.
    pub metric: DistanceMetric,
    /// Number of stored items: documents for multivector collections, points
    /// otherwise; `0` for a collection that was just created.
    pub count: u64,
    /// Index specification.
    pub index: IndexSpec,
    /// Fields declared as filterable.
    pub filterable: Vec<FilterableField>,
    /// Whether the collection is a multivector collection.
    pub multivector: bool,
    /// Vector-encryption setting of the collection.
    pub vector_encryption: VectorEncryption,
}
661
/// A point submitted for upsert: id, vector, and JSON payload.
pub(crate) struct PointIn {
    /// Point id.
    pub id: String,
    /// Vector values; the length is checked against `Limits::max_vector_dim`.
    pub vector: Vec<f32>,
    /// JSON payload; the serialized size is checked against
    /// `Limits::max_payload_bytes`.
    pub payload: Value,
}
668
/// A point submitted as raw text rather than as a vector.
///
/// NOTE(review): the consumer is outside this view; presumably the text is
/// embedded server-side — confirm at the use site.
pub(crate) struct TextPointIn {
    /// Point id.
    pub id: String,
    /// Source text.
    pub text: String,
    /// JSON payload.
    pub payload: Value,
}
675
/// NOTE(review): not referenced in the visible part of the file; by its name,
/// presumably the number of first-stage candidates handed to a reranker —
/// confirm at the use site.
const RERANK_CANDIDATES: usize = 50;
679
680fn doc_text(payload: Option<&Value>) -> String {
684 match payload {
685 Some(Value::Object(map)) => map
686 .get(TEXT_KEY)
687 .and_then(Value::as_str)
688 .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
689 Some(v) => v.to_string(),
690 None => String::new(),
691 }
692}
693
/// A stored point as returned by `get_points`.
pub(crate) struct PointOut {
    /// Point id.
    pub id: String,
    /// The vector, present only when the caller asked for it and one is stored.
    pub vector: Option<Vec<f32>>,
    /// The payload; `Value::Null` when the point has none.
    pub payload: Value,
}
700
/// A single search hit.
pub(crate) struct MatchOut {
    /// Id of the matching point.
    pub id: String,
    /// Score reported by the engine for this hit.
    pub score: f32,
    /// Payload, when included in the response.
    pub payload: Option<Value>,
    /// Vector, when included in the response.
    pub vector: Option<Vec<f32>>,
}
708
/// A document made of several vectors sharing one id and payload.
///
/// NOTE(review): the consumer is outside this view; presumably used for
/// multivector collections — confirm at the use site.
pub(crate) struct DocumentIn {
    /// Document id.
    pub id: String,
    /// The document's vectors.
    pub vectors: Vec<Vec<f32>>,
    /// JSON payload.
    pub payload: Value,
}
715
/// A single document-level search hit.
///
/// NOTE(review): the producer is outside this view.
pub(crate) struct DocumentMatchOut {
    /// Id of the matching document.
    pub id: String,
    /// Score reported for this document.
    pub score: f32,
    /// Payload, when included in the response.
    pub payload: Option<Value>,
    /// The document's vectors, when included in the response.
    pub vectors: Option<Vec<Vec<f32>>>,
}
723
724impl AppState {
725 pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
729 auth::authenticate(&self.keys, presented)
730 }
731
732 pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
736 self.rate_limiter.check(actor)
737 }
738
739 pub(crate) fn rate_limit_enabled(&self) -> bool {
742 self.rate_limiter.enabled()
743 }
744
745 async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
749 where
750 T: Send + 'static,
751 F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
752 {
753 let db = Arc::clone(&self.db);
754 tokio::task::spawn_blocking(move || -> Result<T, Error> {
755 let mut guard = db
756 .write()
757 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
758 f(&mut guard).map_err(Error::Engine)
759 })
760 .await
761 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
762 }
763
764 async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
768 where
769 T: Send + 'static,
770 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
771 {
772 let db = Arc::clone(&self.db);
773 tokio::task::spawn_blocking(move || -> Result<T, Error> {
774 let guard = db
775 .read()
776 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
777 f(&guard).map_err(Error::Engine)
778 })
779 .await
780 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
781 }
782
783 async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
790 where
791 T: Send + 'static,
792 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
793 {
794 let db = Arc::clone(&self.db);
795 let cells = Arc::clone(&self.snapshot_cells);
796 let coll = collection.clone();
797 let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
798 let guard = db
799 .read()
800 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
801 let result = f(&guard).map_err(Error::Engine)?;
802 let stale = guard.needs_rebuild(&coll).unwrap_or(false);
805 if !stale
810 && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
811 && let Ok(mut map) = cells.write()
812 {
813 map.entry(coll.clone()).or_insert(cell);
814 }
815 Ok((result, stale))
816 })
817 .await
818 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
819 if stale {
820 self.schedule_rebuild(collection);
821 }
822 Ok(result)
823 }
824
825 fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
829 self.snapshot_cells.read().ok()?.get(collection).cloned()
830 }
831
832 async fn schedule_if_stale(&self, collection: &str) {
838 let db = Arc::clone(&self.db);
839 let coll = collection.to_owned();
840 let stale = tokio::task::spawn_blocking(move || {
841 db.read()
842 .ok()
843 .and_then(|g| g.needs_rebuild(&coll).ok())
844 .unwrap_or(false)
845 })
846 .await
847 .unwrap_or(false);
848 if stale {
849 self.schedule_rebuild(collection.to_owned());
850 }
851 }
852
853 fn schedule_rebuild(&self, collection: String) {
857 {
858 let mut inflight = match self.rebuilding.lock() {
859 Ok(g) => g,
860 Err(_) => return,
861 };
862 if !inflight.insert(collection.clone()) {
863 return; }
865 }
866 let state = self.clone();
867 tokio::spawn(async move {
868 state.run_rebuild(&collection).await;
869 if let Ok(mut inflight) = state.rebuilding.lock() {
870 inflight.remove(&collection);
871 }
872 });
873 }
874
875 async fn run_rebuild(&self, collection: &str) {
882 loop {
883 let db = Arc::clone(&self.db);
884 let coll = collection.to_owned();
885 let inputs = tokio::task::spawn_blocking(move || {
886 let guard = db.read().ok()?;
887 guard.snapshot_rebuild_inputs(&coll).ok().flatten()
888 })
889 .await
890 .ok()
891 .flatten();
892 let Some(inputs) = inputs else { return };
893
894 let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
895 return;
896 };
897
898 let db = Arc::clone(&self.db);
899 let still_stale = tokio::task::spawn_blocking(move || {
900 let mut guard = db.write().ok()?;
901 guard.commit_rebuild(rebuilt).ok()
902 })
903 .await
904 .ok()
905 .flatten();
906 match still_stale {
907 Some(true) => continue, _ => return,
909 }
910 }
911 }
912
913 fn authorize(
916 &self,
917 principal: &Principal,
918 action: Action,
919 op: &str,
920 resource: &str,
921 ) -> Result<(), Error> {
922 principal
923 .require(action, Some(resource))
924 .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
925 }
926
927 fn authorize_global(
930 &self,
931 principal: &Principal,
932 action: Action,
933 op: &str,
934 ) -> Result<(), Error> {
935 principal
936 .require(action, None)
937 .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
938 }
939
940 pub(crate) async fn open_replication(
947 &self,
948 principal: &Principal,
949 ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
950 self.authorize_global(principal, Action::Admin, "replicate")?;
951 let tx = self.replication_tx.clone();
952 self.read_blocking(move |db| {
953 let rx = tx.subscribe();
954 let snapshot = db.replication_snapshot()?;
955 Ok((snapshot, rx))
956 })
957 .await
958 }
959
960 pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
964 self.write_blocking(move |db| db.apply_replicated(op)).await
965 }
966
967 #[cfg(feature = "raft")]
974 fn ensure_raft_leader(&self, rs: &raft::RaftShard) -> Result<(), Error> {
975 let leader = rs.raft.metrics().borrow().current_leader;
976 if leader == Some(rs.node_id) {
977 Ok(())
978 } else {
979 Err(Error::NotLeader {
980 leader: leader.and_then(|id| rs.member_url(id)),
981 })
982 }
983 }
984
985 pub(crate) async fn raft_add_voter(
989 &self,
990 principal: &Principal,
991 id: u64,
992 url: String,
993 ) -> Result<(), Error> {
994 self.authorize_global(principal, Action::Admin, "raft_add_voter")?;
995 #[cfg(feature = "raft")]
996 {
997 let rs = self
998 .raft
999 .clone()
1000 .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1001 return rs
1002 .add_voter(id, url)
1003 .await
1004 .map_err(|e| Error::Internal(format!("add raft voter: {e}")));
1005 }
1006 #[cfg(not(feature = "raft"))]
1007 {
1008 let _ = (id, url);
1009 Err(Error::BadRequest(
1010 "server built without the raft feature".to_owned(),
1011 ))
1012 }
1013 }
1014
1015 pub(crate) async fn raft_remove_voter(
1018 &self,
1019 principal: &Principal,
1020 id: u64,
1021 ) -> Result<(), Error> {
1022 self.authorize_global(principal, Action::Admin, "raft_remove_voter")?;
1023 #[cfg(feature = "raft")]
1024 {
1025 let rs = self
1026 .raft
1027 .clone()
1028 .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1029 return rs
1030 .remove_voter(id)
1031 .await
1032 .map_err(|e| Error::Internal(format!("remove raft voter: {e}")));
1033 }
1034 #[cfg(not(feature = "raft"))]
1035 {
1036 let _ = id;
1037 Err(Error::BadRequest(
1038 "server built without the raft feature".to_owned(),
1039 ))
1040 }
1041 }
1042
1043 #[cfg(feature = "raft")]
1047 async fn raft_propose(&self, rs: &raft::RaftShard, op: WalOp) -> Result<(), Error> {
1048 rs.raft
1049 .client_write(op)
1050 .await
1051 .map(|_| ())
1052 .map_err(map_client_write_err)
1053 }
1054
1055 #[cfg(feature = "raft")]
1059 async fn raft_propose_all(&self, rs: &raft::RaftShard, ops: Vec<WalOp>) -> Result<u64, Error> {
1060 let mut committed = 0u64;
1061 for op in ops {
1062 self.raft_propose(rs, op).await?;
1063 committed += 1;
1064 }
1065 Ok(committed)
1066 }
1067
1068 fn ensure_writable(&self, op: &str) -> Result<(), Error> {
1071 if self.read_only {
1072 return Err(Error::Forbidden(format!(
1073 "{op}: this node is a read-only replication follower"
1074 )));
1075 }
1076 Ok(())
1077 }
1078
1079 #[allow(clippy::too_many_arguments)]
1080 pub(crate) async fn create_collection(
1081 &self,
1082 principal: &Principal,
1083 name: String,
1084 dim: u32,
1085 metric: DistanceMetric,
1086 index: IndexSpec,
1087 filterable: Vec<FilterableField>,
1088 multivector: bool,
1089 vector_encryption: VectorEncryption,
1090 ) -> Result<CollectionInfo, Error> {
1091 self.ensure_writable("create_collection")?;
1092 self.authorize(principal, Action::Admin, "create_collection", &name)?;
1093 self.limits.check_dim(dim as usize)?;
1094 if let Some(c) = &self.cluster {
1095 return c
1096 .create_collection(
1097 name,
1098 dim,
1099 metric,
1100 index,
1101 filterable,
1102 multivector,
1103 vector_encryption,
1104 )
1105 .await;
1106 }
1107 #[cfg(feature = "raft")]
1112 if let Some(rs) = self.raft.clone() {
1113 self.ensure_raft_leader(&rs)?;
1114 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
1115 .with_index(index)
1116 .with_filterable(filterable.clone())
1117 .with_multivector(multivector)
1118 .with_vector_encryption(vector_encryption);
1119 let _guard = rs.create_lock.lock().await;
1120 let prep_name = name.clone();
1121 let result = match self
1122 .read_blocking(move |db| db.prepare_create_collection(&prep_name, &descriptor))
1123 .await
1124 {
1125 Ok(op) => self.raft_propose(&rs, op).await,
1126 Err(e) => Err(e),
1127 };
1128 self.audit.record(
1129 principal.actor(),
1130 "create_collection",
1131 &name,
1132 Outcome::of(&result),
1133 );
1134 result?;
1135 return Ok(CollectionInfo {
1136 name,
1137 dim,
1138 metric,
1139 count: 0,
1140 index,
1141 filterable,
1142 multivector,
1143 vector_encryption,
1144 });
1145 }
1146 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
1147 .with_index(index)
1148 .with_filterable(filterable.clone())
1149 .with_multivector(multivector)
1150 .with_vector_encryption(vector_encryption);
1151 let owned = name.clone();
1152 let result = self
1153 .write_blocking(move |db| db.create_collection(&owned, descriptor))
1154 .await;
1155 self.audit.record(
1156 principal.actor(),
1157 "create_collection",
1158 &name,
1159 Outcome::of(&result),
1160 );
1161 result?;
1162 Ok(CollectionInfo {
1163 name,
1164 dim,
1165 metric,
1166 count: 0,
1167 index,
1168 filterable,
1169 multivector,
1170 vector_encryption,
1171 })
1172 }
1173
1174 pub(crate) async fn get_collection(
1175 &self,
1176 principal: &Principal,
1177 name: String,
1178 ) -> Result<CollectionInfo, Error> {
1179 self.authorize(principal, Action::Read, "get_collection", &name)?;
1180 self.read_blocking(move |db| {
1181 let descriptor = db
1182 .descriptor(&name)
1183 .cloned()
1184 .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
1185 let count = if descriptor.multivector {
1188 db.document_count(&name)? as u64
1189 } else {
1190 db.len(&name)? as u64
1191 };
1192 Ok(CollectionInfo {
1193 name,
1194 dim: descriptor.dim,
1195 metric: descriptor.metric,
1196 count,
1197 index: descriptor.index,
1198 filterable: descriptor.filterable,
1199 multivector: descriptor.multivector,
1200 vector_encryption: descriptor.vector_encryption,
1201 })
1202 })
1203 .await
1204 }
1205
1206 pub(crate) async fn list_collections(
1207 &self,
1208 principal: &Principal,
1209 ) -> Result<Vec<CollectionInfo>, Error> {
1210 self.authorize_global(principal, Action::Read, "list_collections")?;
1211 let mut infos = self
1212 .read_blocking(|db| {
1213 let mut out = Vec::new();
1214 for name in db.collection_names() {
1215 if let Some(descriptor) = db.descriptor(&name).cloned() {
1216 let count = if descriptor.multivector {
1217 db.document_count(&name)? as u64
1218 } else {
1219 db.len(&name)? as u64
1220 };
1221 out.push(CollectionInfo {
1222 name,
1223 dim: descriptor.dim,
1224 metric: descriptor.metric,
1225 count,
1226 index: descriptor.index,
1227 filterable: descriptor.filterable,
1228 multivector: descriptor.multivector,
1229 vector_encryption: descriptor.vector_encryption,
1230 });
1231 }
1232 }
1233 Ok(out)
1234 })
1235 .await?;
1236 infos.retain(|info| principal.can_see(&info.name));
1238 Ok(infos)
1239 }
1240
1241 pub(crate) async fn delete_collection(
1242 &self,
1243 principal: &Principal,
1244 name: String,
1245 ) -> Result<bool, Error> {
1246 self.ensure_writable("delete_collection")?;
1247 self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1248 if let Some(c) = &self.cluster {
1249 return c.drop_collection(&name).await;
1250 }
1251 let resource = name.clone();
1252 let result = self
1253 .write_blocking(move |db| db.drop_collection(&name))
1254 .await;
1255 self.audit.record(
1256 principal.actor(),
1257 "delete_collection",
1258 &resource,
1259 Outcome::of(&result),
1260 );
1261 if matches!(result, Ok(true))
1264 && let Ok(mut map) = self.snapshot_cells.write()
1265 {
1266 map.remove(&resource);
1267 }
1268 result
1269 }
1270
1271 #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1272 pub(crate) async fn upsert(
1273 &self,
1274 principal: &Principal,
1275 collection: String,
1276 points: Vec<PointIn>,
1277 ) -> Result<u64, Error> {
1278 self.ensure_writable("upsert")?;
1279 self.authorize(principal, Action::Write, "upsert", &collection)?;
1280 self.limits.check_batch(points.len())?;
1281 for p in &points {
1282 self.limits.check_vector_len(p.vector.len())?;
1283 self.limits.check_payload(&p.payload)?;
1284 }
1285 if let Some(c) = &self.cluster {
1286 return c.upsert(&collection, points).await;
1287 }
1288 #[cfg(feature = "raft")]
1293 if let Some(rs) = self.raft.clone() {
1294 self.ensure_raft_leader(&rs)?;
1295 let prep = collection.clone();
1296 let result = match self
1297 .read_blocking(move |db| {
1298 points
1299 .iter()
1300 .map(|p| db.prepare_upsert(&prep, &p.id, &p.vector, &p.payload))
1301 .collect::<quiver_embed::Result<Vec<_>>>()
1302 })
1303 .await
1304 {
1305 Ok(ops) => self.raft_propose_all(&rs, ops).await,
1306 Err(e) => Err(e),
1307 };
1308 self.audit.record(
1309 principal.actor(),
1310 "upsert",
1311 &collection,
1312 Outcome::of(&result),
1313 );
1314 return result;
1315 }
1316 let resource = collection.clone();
1317 let result = self
1318 .write_blocking(move |db| {
1319 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1320 .iter()
1321 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1322 .collect();
1323 db.upsert_batch(&collection, &records)
1324 })
1325 .await;
1326 self.audit
1327 .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1328 if self.mvcc && result.is_ok() {
1329 self.schedule_if_stale(&resource).await;
1330 }
1331 result
1332 }
1333
1334 pub(crate) async fn upsert_bulk(
1337 &self,
1338 principal: &Principal,
1339 collection: String,
1340 points: Vec<PointIn>,
1341 ) -> Result<u64, Error> {
1342 self.ensure_writable("upsert")?;
1343 self.authorize(principal, Action::Write, "upsert", &collection)?;
1344 self.limits.check_bulk_batch(points.len())?;
1345 for p in &points {
1346 self.limits.check_vector_len(p.vector.len())?;
1347 self.limits.check_payload(&p.payload)?;
1348 }
1349 if let Some(c) = &self.cluster {
1350 return c.upsert_bulk(&collection, points).await;
1351 }
1352 let resource = collection.clone();
1353 let result = self
1354 .write_blocking(move |db| {
1355 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1356 .iter()
1357 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1358 .collect();
1359 db.upsert_bulk(&collection, &records)
1360 })
1361 .await;
1362 self.audit.record(
1363 principal.actor(),
1364 "upsert_bulk",
1365 &resource,
1366 Outcome::of(&result),
1367 );
1368 if self.mvcc && result.is_ok() {
1369 self.schedule_if_stale(&resource).await;
1370 }
1371 result
1372 }
1373
1374 #[tracing::instrument(skip_all)]
1378 pub(crate) async fn snapshot(
1379 &self,
1380 principal: &Principal,
1381 destination: String,
1382 ) -> Result<SnapshotInfo, Error> {
1383 self.ensure_writable("snapshot")?;
1384 self.authorize_global(principal, Action::Admin, "snapshot")?;
1385 let dest = std::path::PathBuf::from(&destination);
1386 let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1387 self.audit.record(
1388 principal.actor(),
1389 "snapshot",
1390 &destination,
1391 Outcome::of(&result),
1392 );
1393 result
1394 }
1395
1396 pub(crate) async fn delete_points(
1397 &self,
1398 principal: &Principal,
1399 collection: String,
1400 ids: Vec<String>,
1401 ) -> Result<u64, Error> {
1402 self.ensure_writable("delete_points")?;
1403 self.authorize(principal, Action::Write, "delete_points", &collection)?;
1404 if let Some(c) = &self.cluster {
1405 return c.delete_points(&collection, ids).await;
1406 }
1407 #[cfg(feature = "raft")]
1411 if let Some(rs) = self.raft.clone() {
1412 self.ensure_raft_leader(&rs)?;
1413 let prep = collection.clone();
1414 let result = match self
1415 .read_blocking(move |db| {
1416 ids.iter()
1417 .map(|id| db.prepare_delete(&prep, id))
1418 .collect::<quiver_embed::Result<Vec<Option<WalOp>>>>()
1419 })
1420 .await
1421 {
1422 Ok(ops) => {
1423 self.raft_propose_all(&rs, ops.into_iter().flatten().collect())
1424 .await
1425 }
1426 Err(e) => Err(e),
1427 };
1428 self.audit.record(
1429 principal.actor(),
1430 "delete_points",
1431 &collection,
1432 Outcome::of(&result),
1433 );
1434 return result;
1435 }
1436 let resource = collection.clone();
1437 let result = self
1438 .write_blocking(move |db| {
1439 let mut count = 0u64;
1440 for id in &ids {
1441 if db.delete(&collection, id)? {
1442 count += 1;
1443 }
1444 }
1445 Ok(count)
1446 })
1447 .await;
1448 self.audit.record(
1449 principal.actor(),
1450 "delete_points",
1451 &resource,
1452 Outcome::of(&result),
1453 );
1454 if self.mvcc && result.is_ok() {
1455 self.schedule_if_stale(&resource).await;
1456 }
1457 result
1458 }
1459
1460 pub(crate) async fn get_points(
1461 &self,
1462 principal: &Principal,
1463 collection: String,
1464 ids: Vec<String>,
1465 with_vector: bool,
1466 ) -> Result<Vec<PointOut>, Error> {
1467 self.authorize(principal, Action::Read, "get_points", &collection)?;
1468 if let Some(c) = &self.cluster {
1469 return c.get_points(&collection, ids, with_vector).await;
1470 }
1471 self.read_blocking(move |db| {
1472 let mut out = Vec::new();
1473 for id in &ids {
1474 if let Some(m) = db.get(&collection, id)? {
1475 out.push(PointOut {
1476 id: m.id,
1477 vector: if with_vector { m.vector } else { None },
1478 payload: m.payload.unwrap_or(Value::Null),
1479 });
1480 }
1481 }
1482 Ok(out)
1483 })
1484 .await
1485 }
1486
1487 #[allow(clippy::too_many_arguments)]
1488 #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1489 pub(crate) async fn search(
1490 &self,
1491 principal: &Principal,
1492 collection: String,
1493 vector: Vec<f32>,
1494 k: usize,
1495 filter: Option<Filter>,
1496 ef_search: usize,
1497 with_payload: bool,
1498 with_vector: bool,
1499 ) -> Result<Vec<MatchOut>, Error> {
1500 self.authorize(principal, Action::Read, "search", &collection)?;
1501 self.limits.check_search(k, ef_search)?;
1502 self.limits.check_vector_len(vector.len())?;
1503
1504 if let Some(c) = &self.cluster {
1506 return c
1507 .search(
1508 &collection,
1509 vector,
1510 k,
1511 filter,
1512 ef_search,
1513 with_payload,
1514 with_vector,
1515 )
1516 .await;
1517 }
1518
1519 if filter.is_none()
1525 && !with_payload
1526 && !with_vector
1527 && let Some(cell) = self.cached_cell(&collection)
1528 {
1529 return tokio::task::spawn_blocking(move || {
1530 let matches = cell.load().search(&vector, k, ef_search)?;
1531 Ok::<_, quiver_embed::Error>(
1532 matches
1533 .into_iter()
1534 .map(|m| MatchOut {
1535 id: m.id,
1536 score: m.score,
1537 payload: None,
1538 vector: None,
1539 })
1540 .collect(),
1541 )
1542 })
1543 .await
1544 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1545 .map_err(Error::Engine);
1546 }
1547
1548 let params = SearchParams {
1549 k,
1550 filter,
1551 ef_search,
1552 with_payload,
1553 with_vector,
1554 };
1555 let coll = collection.clone();
1556 self.search_blocking(coll, move |db| {
1557 let matches = db.search_snapshot(&collection, &vector, ¶ms)?;
1558 Ok(matches
1559 .into_iter()
1560 .map(|m| MatchOut {
1561 id: m.id,
1562 score: m.score,
1563 payload: m.payload,
1564 vector: m.vector,
1565 })
1566 .collect())
1567 })
1568 .await
1569 }
1570
1571 #[allow(clippy::too_many_arguments)]
1572 pub(crate) async fn hybrid_search(
1573 &self,
1574 principal: &Principal,
1575 collection: String,
1576 dense: Option<Vec<f32>>,
1577 sparse: Option<(Vec<u32>, Vec<f32>)>,
1578 text: Option<String>,
1579 k: usize,
1580 filter: Option<Filter>,
1581 ef_search: usize,
1582 rrf_k0: f32,
1583 with_payload: bool,
1584 with_vector: bool,
1585 ) -> Result<Vec<MatchOut>, Error> {
1586 self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1587 self.limits.check_search(k, ef_search)?;
1588 if let Some(v) = &dense {
1589 self.limits.check_vector_len(v.len())?;
1590 }
1591 if let Some((indices, values)) = &sparse {
1592 self.limits.check_sparse_terms(indices.len())?;
1593 if indices.len() != values.len() {
1594 return Err(Error::BadRequest(format!(
1595 "sparse query indices ({}) and values ({}) length mismatch",
1596 indices.len(),
1597 values.len()
1598 )));
1599 }
1600 }
1601 let params = SearchParams {
1602 k,
1603 filter,
1604 ef_search,
1605 with_payload,
1606 with_vector,
1607 };
1608 let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1609 let coll = collection.clone();
1610 self.search_blocking(coll, move |db| {
1611 let matches = db.hybrid_search_snapshot(
1612 &collection,
1613 dense.as_deref(),
1614 sv.as_ref(),
1615 text.as_deref(),
1616 ¶ms,
1617 rrf_k0,
1618 )?;
1619 Ok(matches
1620 .into_iter()
1621 .map(|m| MatchOut {
1622 id: m.id,
1623 score: m.score,
1624 payload: m.payload,
1625 vector: m.vector,
1626 })
1627 .collect())
1628 })
1629 .await
1630 }
1631
1632 #[allow(clippy::too_many_arguments)]
1637 pub(crate) async fn search_text(
1638 &self,
1639 principal: &Principal,
1640 collection: String,
1641 text: String,
1642 k: usize,
1643 filter: Option<Filter>,
1644 ef_search: usize,
1645 rrf_k0: f32,
1646 with_payload: bool,
1647 with_vector: bool,
1648 rerank: bool,
1649 ) -> Result<Vec<MatchOut>, Error> {
1650 self.authorize(principal, Action::Read, "search_text", &collection)?;
1651 self.limits.check_search(k, ef_search)?;
1652 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1653 Error::BadRequest(format!(
1654 "collection {collection:?} has no embedding provider configured \
1655 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1656 ))
1657 })?;
1658 let query = text.clone();
1660 let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1661 .await
1662 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1663 .map_err(|e| Error::Upstream(e.to_string()))?
1664 .into_iter()
1665 .next()
1666 .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1667 self.limits.check_vector_len(vector.len())?;
1668
1669 let reranker = if rerank {
1670 self.embed.reranker(&collection)
1671 } else {
1672 None
1673 };
1674 let need_payload = with_payload || reranker.is_some();
1678 let fetch_k = if reranker.is_some() {
1679 k.max(RERANK_CANDIDATES)
1680 } else {
1681 k
1682 };
1683
1684 let mut hits = self
1685 .hybrid_search(
1686 principal,
1687 collection,
1688 Some(vector),
1689 None,
1690 Some(text.clone()),
1691 fetch_k,
1692 filter,
1693 ef_search,
1694 rrf_k0,
1695 need_payload,
1696 with_vector,
1697 )
1698 .await?;
1699
1700 if let Some(rr) = reranker {
1701 let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1702 let query = text;
1703 let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1704 .await
1705 .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1706 .map_err(|e| Error::Upstream(e.to_string()))?;
1707 let mut scored: Vec<(f32, MatchOut)> = scores
1709 .into_iter()
1710 .zip(hits)
1711 .map(|(s, mut h)| {
1712 h.score = s;
1713 (s, h)
1714 })
1715 .collect();
1716 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1717 hits = scored.into_iter().map(|(_, h)| h).collect();
1718 }
1719
1720 hits.truncate(k);
1721 if !with_payload {
1723 for h in &mut hits {
1724 h.payload = None;
1725 }
1726 }
1727 Ok(hits)
1728 }
1729
1730 pub(crate) async fn upsert_text(
1735 &self,
1736 principal: &Principal,
1737 collection: String,
1738 points: Vec<TextPointIn>,
1739 ) -> Result<u64, Error> {
1740 self.ensure_writable("upsert_text")?;
1741 self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1742 self.limits.check_batch(points.len())?;
1743 for p in &points {
1744 if !matches!(p.payload, Value::Object(_) | Value::Null) {
1745 return Err(Error::BadRequest(
1746 "upsert_text payload must be a JSON object or null".to_owned(),
1747 ));
1748 }
1749 }
1750 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1751 Error::BadRequest(format!(
1752 "collection {collection:?} has no embedding provider configured \
1753 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1754 ))
1755 })?;
1756 let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1757 let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1758 .await
1759 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1760 .map_err(|e| Error::Upstream(e.to_string()))?;
1761 if vectors.len() != points.len() {
1762 return Err(Error::Upstream(format!(
1763 "embedding provider returned {} vectors for {} inputs",
1764 vectors.len(),
1765 points.len()
1766 )));
1767 }
1768 let dense: Vec<PointIn> = points
1769 .into_iter()
1770 .zip(vectors)
1771 .map(|(p, vector)| {
1772 let mut payload = match p.payload {
1773 Value::Object(map) => map,
1774 _ => serde_json::Map::new(),
1775 };
1776 payload
1778 .entry(TEXT_KEY.to_owned())
1779 .or_insert_with(|| Value::String(p.text.clone()));
1780 PointIn {
1781 id: p.id,
1782 vector,
1783 payload: Value::Object(payload),
1784 }
1785 })
1786 .collect();
1787 self.upsert(principal, collection, dense).await
1788 }
1789
1790 #[allow(clippy::too_many_arguments)]
1791 pub(crate) async fn fetch(
1792 &self,
1793 principal: &Principal,
1794 collection: String,
1795 filter: Option<Filter>,
1796 offset: usize,
1797 limit: usize,
1798 with_payload: bool,
1799 with_vector: bool,
1800 ) -> Result<Vec<MatchOut>, Error> {
1801 self.authorize(principal, Action::Read, "fetch", &collection)?;
1802 self.limits.check_fetch(limit)?;
1803 self.read_blocking(move |db| {
1804 let matches = db.fetch(
1805 &collection,
1806 filter.as_ref(),
1807 offset,
1808 limit,
1809 with_payload,
1810 with_vector,
1811 )?;
1812 Ok(matches
1813 .into_iter()
1814 .map(|m| MatchOut {
1815 id: m.id,
1816 score: m.score,
1817 payload: m.payload,
1818 vector: m.vector,
1819 })
1820 .collect())
1821 })
1822 .await
1823 }
1824
1825 pub(crate) async fn upsert_documents(
1826 &self,
1827 principal: &Principal,
1828 collection: String,
1829 documents: Vec<DocumentIn>,
1830 ) -> Result<u64, Error> {
1831 self.ensure_writable("upsert_documents")?;
1832 self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1833 self.limits.check_batch(documents.len())?;
1834 for doc in &documents {
1835 self.limits.check_payload(&doc.payload)?;
1836 for token in &doc.vectors {
1837 self.limits.check_vector_len(token.len())?;
1838 }
1839 }
1840 let resource = collection.clone();
1841 let result = self
1842 .write_blocking(move |db| {
1843 let mut count = 0u64;
1844 for doc in &documents {
1845 db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1846 count += 1;
1847 }
1848 Ok(count)
1849 })
1850 .await;
1851 self.audit.record(
1852 principal.actor(),
1853 "upsert_documents",
1854 &resource,
1855 Outcome::of(&result),
1856 );
1857 result
1858 }
1859
1860 pub(crate) async fn delete_documents(
1861 &self,
1862 principal: &Principal,
1863 collection: String,
1864 ids: Vec<String>,
1865 ) -> Result<u64, Error> {
1866 self.ensure_writable("delete_documents")?;
1867 self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1868 let resource = collection.clone();
1869 let result = self
1870 .write_blocking(move |db| {
1871 let mut count = 0u64;
1872 for id in &ids {
1873 if db.delete_document(&collection, id)? {
1874 count += 1;
1875 }
1876 }
1877 Ok(count)
1878 })
1879 .await;
1880 self.audit.record(
1881 principal.actor(),
1882 "delete_documents",
1883 &resource,
1884 Outcome::of(&result),
1885 );
1886 result
1887 }
1888
1889 #[allow(clippy::too_many_arguments)]
1890 pub(crate) async fn search_multi_vector(
1891 &self,
1892 principal: &Principal,
1893 collection: String,
1894 query: Vec<Vec<f32>>,
1895 k: usize,
1896 filter: Option<Filter>,
1897 ef_search: usize,
1898 with_payload: bool,
1899 with_vector: bool,
1900 ) -> Result<Vec<DocumentMatchOut>, Error> {
1901 self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1902 self.limits.check_search(k, ef_search)?;
1903 for token in &query {
1904 self.limits.check_vector_len(token.len())?;
1905 }
1906 let params = SearchParams {
1907 k,
1908 filter,
1909 ef_search,
1910 with_payload,
1911 with_vector,
1912 };
1913 let coll = collection.clone();
1914 self.search_blocking(coll, move |db| {
1915 let matches = db.search_multi_vector_snapshot(&collection, &query, ¶ms)?;
1916 Ok(matches
1917 .into_iter()
1918 .map(|m| DocumentMatchOut {
1919 id: m.id,
1920 score: m.score,
1921 payload: m.payload,
1922 vectors: m.vectors,
1923 })
1924 .collect())
1925 })
1926 .await
1927 }
1928}
1929
/// Capacity passed to `broadcast::channel` for the replication channel that
/// `serve` creates; committed WAL entries are sent on it by the commit observer.
const REPLICATION_BUFFER: usize = 1024;
1933
1934pub async fn run(config: Config) -> Result<(), Error> {
1936 config.validate()?;
1937 let rest_listener = TcpListener::bind(config.rest_addr)
1938 .await
1939 .map_err(Error::Io)?;
1940 let grpc_listener = TcpListener::bind(config.grpc_addr)
1941 .await
1942 .map_err(Error::Io)?;
1943 tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1944 tokio::select! {
1945 result = serve(config, rest_listener, grpc_listener) => result,
1946 () = shutdown_signal() => {
1947 tracing::info!("shutdown signal received");
1948 Ok(())
1949 }
1950 }
1951}
1952
/// Serves the REST and gRPC APIs on two already-bound listeners until one of
/// the servers stops.
///
/// When `config.coordinator` is set, the gRPC listener is dropped and the call
/// is handed to `coordinator::serve_coordinator` with the REST listener only.
/// Otherwise the database is opened, the shared `AppState` is assembled, and
/// both servers are driven concurrently with `tokio::try_join!`.
///
/// # Errors
///
/// Returns the first error from opening the database or audit log, building
/// the embedding registry or cluster router, starting the raft member (with
/// the `raft` feature), loading TLS material, or running either server.
pub async fn serve(
    config: Config,
    rest_listener: TcpListener,
    grpc_listener: TcpListener,
) -> Result<(), Error> {
    if config.coordinator {
        // Coordinator mode only uses the REST listener.
        drop(grpc_listener);
        return coordinator::serve_coordinator(config, rest_listener).await;
    }
    let mut db = open_database(&config)?;
    let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
    // The initial receiver is discarded; only the sender is kept in `AppState`.
    let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
    {
        // Forward every observed WAL entry to the replication channel. A send
        // error is ignored on purpose (`let _ =`).
        let tx = replication_tx.clone();
        db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
            let _ = tx.send(entry.clone());
        }));
    }
    let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
        .map_err(|e| Error::Config(e.to_string()))?;

    if config.mvcc_reads {
        db.set_mvcc_reads(true);
    }
    // Read the flag back from the database rather than from the config.
    let mvcc = db.mvcc_reads();

    // A cluster router is built only when shards are configured.
    let cluster = if config.cluster_shards.is_empty() {
        None
    } else {
        let c = cluster::Cluster::new(
            config.cluster_shards.clone(),
            config.cluster_replicas.clone(),
            config.cluster_shard_key.clone(),
        )?;
        tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
        let c = Arc::new(c);
        if let Some(coord) = config.coordinator_url.clone() {
            // Detached background task: refresh from the coordinator URL
            // forever, sleeping `MAP_REFRESH_INTERVAL` between attempts.
            let router = c.clone();
            tokio::spawn(async move {
                loop {
                    if let Err(e) = router.refresh_from(&coord).await {
                        tracing::debug!(error = %e, "shard-map refresh failed; will retry");
                    }
                    tokio::time::sleep(cluster::MAP_REFRESH_INTERVAL).await;
                }
            });
        }
        Some(c)
    };

    // From here on the database is shared behind a lock.
    let db = Arc::new(RwLock::new(db));

    #[cfg(feature = "raft")]
    // A raft member is started only when `raft_node_id` is configured; it
    // shares the database handle and keeps its log under `<data_dir>/raft`.
    let raft = if let Some(node_id) = config.raft_node_id {
        let members = parse_raft_members(&config.raft_members)?;
        let applier = raft::EngineApplier::new(Arc::clone(&db));
        let log_dir = config.data_dir.join("raft");
        let shard = raft::start_member(node_id, members, applier, &log_dir)
            .await
            .map_err(|e| Error::Config(format!("raft startup: {e}")))?;
        tracing::info!(node_id, "per-shard raft write HA enabled");
        Some(Arc::new(shard))
    } else {
        None
    };

    let state = AppState {
        db,
        keys: Arc::new(config.api_keys.clone()),
        audit,
        replication_tx,
        // A node configured with a leader URL is marked read-only.
        read_only: config.leader_url.is_some(),
        limits: config.limits,
        embed: Arc::new(embed),
        rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
        metrics: Arc::new(metrics::Metrics::default()),
        rebuilding: Arc::new(Mutex::new(HashSet::new())),
        snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
        mvcc,
        cluster,
        #[cfg(feature = "raft")]
        raft,
    };

    if let Some(leader_url) = config.leader_url.clone() {
        replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
    }

    let app = rest::router(state.clone());
    #[cfg(feature = "raft")]
    // Built before `state` is moved into the gRPC service below.
    let raft_service = state
        .raft
        .as_ref()
        .map(|rs| raft::grpc::RaftRpc::service(rs.raft.clone()));
    let grpc = grpc::service(state);

    let tls = load_tls(&config)?;

    // The REST server uses axum-server with rustls when TLS material is
    // present and plain `axum::serve` otherwise. The future is boxed so both
    // arms share one type.
    let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
        Some(material) => {
            let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
            let std_listener = rest_listener.into_std().map_err(Error::Io)?;
            let server =
                axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
            Box::pin(async move {
                server
                    .serve(app.into_make_service())
                    .await
                    .map_err(Error::Io)
            })
        }
        None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
    };

    let mut grpc_builder = tonic::transport::Server::builder();
    if let Some(material) = &tls {
        // gRPC is configured from the same PEM material as REST; the client CA,
        // when present, is set as the client CA root.
        let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
        let mut tls_config = ServerTlsConfig::new().identity(identity);
        if let Some(ca_pem) = &material.client_ca_pem {
            tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
        }
        grpc_builder = grpc_builder
            .tls_config(tls_config)
            .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
    }
    let grpc_fut = async move {
        let routes = grpc_builder.add_service(grpc);
        #[cfg(feature = "raft")]
        // The raft RPC service is served from the same gRPC server when present.
        let routes = routes.add_optional_service(raft_service);
        routes
            .serve_with_incoming(TcpListenerStream::new(grpc_listener))
            .await
            .map_err(|e| Error::Internal(format!("grpc server: {e}")))
    };

    // Run both servers; the first error ends the call.
    tokio::try_join!(rest_fut, grpc_fut)?;
    Ok(())
}
2122
2123async fn shutdown_signal() {
2124 let _ = tokio::signal::ctrl_c().await;
2125}
2126
2127#[cfg(feature = "raft")]
2132fn parse_raft_members(
2133 entries: &[String],
2134) -> Result<std::collections::BTreeMap<u64, String>, Error> {
2135 let mut members = std::collections::BTreeMap::new();
2136 for entry in entries {
2137 let (id, url) = entry
2138 .split_once('=')
2139 .ok_or_else(|| Error::Config(format!("raft member '{entry}' must be '<id>=<url>'")))?;
2140 let id: u64 = id
2141 .trim()
2142 .parse()
2143 .map_err(|_| Error::Config(format!("raft member id '{id}' is not a number")))?;
2144 members.insert(id, url.trim().to_owned());
2145 }
2146 if members.is_empty() {
2147 return Err(Error::Config(
2148 "raft_node_id is set but raft_members is empty".to_owned(),
2149 ));
2150 }
2151 Ok(members)
2152}
2153
/// Converts a raft client-write failure into the crate's [`Error`].
///
/// A `ForwardToLeader` API error becomes `Error::NotLeader`, carrying the
/// leader's address when the raft error names one. Every other failure is
/// reported as `Error::Internal` with the raft error's display text.
#[cfg(feature = "raft")]
fn map_client_write_err(
    err: openraft::error::RaftError<
        u64,
        openraft::error::ClientWriteError<u64, openraft::BasicNode>,
    >,
) -> Error {
    use openraft::error::{ClientWriteError, RaftError};

    let other = match err {
        RaftError::APIError(ClientWriteError::ForwardToLeader(forward)) => {
            let leader = forward.leader_node.map(|node| node.addr);
            return Error::NotLeader { leader };
        }
        other => other,
    };
    Error::Internal(format!("raft write failed: {other}"))
}
2172
2173fn open_database(config: &Config) -> Result<Database, Error> {
2179 let master_key = config.master_key_hex()?;
2180 let keyring =
2181 quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
2182 .map_err(|e| Error::Config(e.to_string()))?;
2183 let db = match keyring {
2184 Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
2185 None => Database::open(&config.data_dir)?,
2186 };
2187 Ok(db)
2188}
2189
/// TLS inputs loaded once by `load_tls` and shared by the REST and gRPC
/// servers.
struct TlsMaterial {
    /// Raw bytes of the file at `config.tls_cert`; used for the gRPC identity.
    cert_pem: Vec<u8>,
    /// Raw bytes of the file at `config.tls_key`; used for the gRPC identity.
    key_pem: Vec<u8>,
    /// Raw bytes of the file at `config.tls_client_ca`, when one is configured.
    client_ca_pem: Option<Vec<u8>>,
    /// rustls server configuration built from the fields above; used by the
    /// REST server.
    rest_config: Arc<rustls::ServerConfig>,
}
2199
2200fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
2205 match (&config.tls_cert, &config.tls_key) {
2206 (Some(cert_path), Some(key_path)) => {
2207 let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
2208 let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
2209 let client_ca_pem = config
2210 .tls_client_ca
2211 .as_ref()
2212 .map(std::fs::read)
2213 .transpose()
2214 .map_err(Error::Io)?;
2215 let rest_config = Arc::new(rustls_server_config(
2216 &cert_pem,
2217 &key_pem,
2218 client_ca_pem.as_deref(),
2219 )?);
2220 Ok(Some(TlsMaterial {
2221 cert_pem,
2222 key_pem,
2223 client_ca_pem,
2224 rest_config,
2225 }))
2226 }
2227 (None, None) => Ok(None),
2228 _ => Err(Error::Config(
2229 "tls_cert and tls_key must be set together".to_owned(),
2230 )),
2231 }
2232}
2233
2234fn rustls_server_config(
2238 cert_pem: &[u8],
2239 key_pem: &[u8],
2240 client_ca_pem: Option<&[u8]>,
2241) -> Result<rustls::ServerConfig, Error> {
2242 use rustls_pki_types::pem::PemObject;
2243 use rustls_pki_types::{CertificateDer, PrivateKeyDer};
2244
2245 let certs = CertificateDer::pem_slice_iter(cert_pem)
2246 .collect::<std::result::Result<Vec<_>, _>>()
2247 .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
2248 if certs.is_empty() {
2249 return Err(Error::Config(
2250 "tls_cert contains no certificates".to_owned(),
2251 ));
2252 }
2253 let key = PrivateKeyDer::from_pem_slice(key_pem)
2254 .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
2255 let provider = Arc::new(rustls::crypto::ring::default_provider());
2256 let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
2257 .with_safe_default_protocol_versions()
2258 .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
2259 let builder = match client_ca_pem {
2260 Some(ca_pem) => {
2261 let mut roots = rustls::RootCertStore::empty();
2262 for cert in CertificateDer::pem_slice_iter(ca_pem) {
2263 let cert =
2264 cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
2265 roots
2266 .add(cert)
2267 .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
2268 }
2269 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
2270 Arc::new(roots),
2271 provider,
2272 )
2273 .build()
2274 .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
2275 builder.with_client_cert_verifier(verifier)
2276 }
2277 None => builder.with_no_client_auth(),
2278 };
2279 builder
2280 .with_single_cert(certs, key)
2281 .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
2282}
2283
2284pub fn init_tracing() {
2287 init_observability(&Config::default());
2288}
2289
2290#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
2296pub fn init_observability(config: &Config) {
2297 use tracing_subscriber::EnvFilter;
2298 use tracing_subscriber::prelude::*;
2299 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
2300 let registry = tracing_subscriber::registry()
2301 .with(filter)
2302 .with(tracing_subscriber::fmt::layer());
2303
2304 #[cfg(feature = "otlp")]
2305 if config.otlp.is_enabled() {
2306 match otlp::build_provider(&config.otlp) {
2307 Ok(provider) => {
2308 use opentelemetry::trace::TracerProvider as _;
2309 let tracer = provider.tracer("quiver");
2310 otlp::store_provider(provider);
2311 let _ = registry
2312 .with(tracing_opentelemetry::layer().with_tracer(tracer))
2313 .try_init();
2314 return;
2315 }
2316 Err(e) => eprintln!("OTLP traces export disabled: {e}"),
2317 }
2318 }
2319
2320 let _ = registry.try_init();
2321}
2322
/// Shuts down observability exporters.
///
/// Calls `otlp::shutdown` when the `otlp` feature is enabled and does nothing
/// otherwise.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
2330
/// Unit tests for `Config::validate` and master-key loading.
#[cfg(test)]
mod tests {
    use super::*;

    /// Hex string used as a syntactically valid encryption key in these tests.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        // A default config fails validation.
        let mut config = Config::default();
        assert!(config.validate().is_err());
        // Setting `insecure` alone makes it pass.
        config.insecure = true;
        assert!(config.validate().is_ok());
        // Without `insecure`, an API key plus an encryption key also passes.
        config.insecure = false;
        config.api_keys = vec!["secret".into()];
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        // An API key without an encryption key is rejected.
        let mut config = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        assert!(config.validate().is_err());
        // Adding a well-formed key makes the config valid.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
        // A malformed key is rejected.
        config.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(config.validate().is_err());
        // With `insecure` set, no encryption key is needed.
        config.insecure = true;
        config.encryption_key = None;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        // Write the key with a trailing newline; the assertion below expects
        // `master_key_hex` to return it without that newline.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("master.key");
        std::fs::write(&path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(path.clone()),
            ..Config::default()
        };
        assert!(config.validate().is_ok());
        assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));

        // Supplying both a key file and an inline key is rejected.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A key file with invalid contents is rejected.
        config.encryption_key = None;
        std::fs::write(&path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        };
        // Binding REST to the unspecified address (0.0.0.0) is rejected here,
        // where no TLS is configured.
        config.rest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333);
        assert!(config.validate().is_err());
        // `insecure` allows it.
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        // The same public bind passes once both TLS paths are set. The paths
        // do not point at real files here.
        let config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333),
            ..Config::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        // A certificate without a key is rejected.
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        assert!(config.validate().is_err());
        // Adding the key makes the pair valid.
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }
}