1mod audit;
33mod auth;
34mod cluster;
35mod coordinator;
36mod error;
37mod grpc;
38mod metrics;
39mod otlp;
40#[cfg(feature = "raft")]
41pub mod raft;
42mod rate_limit;
43mod replication;
44mod rest;
45
46use std::collections::{HashMap, HashSet};
47use std::future::Future;
48use std::net::{IpAddr, Ipv4Addr, SocketAddr};
49use std::path::PathBuf;
50use std::pin::Pin;
51use std::sync::{Arc, Mutex, RwLock};
52
53use axum_server::tls_rustls::RustlsConfig;
54use figment::Figment;
55use figment::providers::{Env, Format, Serialized, Toml};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use tokio::net::TcpListener;
59use tokio::sync::broadcast;
60use tokio_stream::wrappers::TcpListenerStream;
61use tonic::transport::{Certificate, Identity, ServerTlsConfig};
62
63use quiver_crypto::AeadCodec;
64use quiver_embed::{
65 Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
66 SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
67};
68use quiver_query::Filter;
69
70pub use auth::{Action, ApiKey, CollectionScope};
71pub use error::Error;
72pub use otlp::OtlpConfig;
73pub use coordinator::AutoscaleConfig;
76pub use quiver_providers::{
77 EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
78 RerankProvider,
79};
80pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
81
82use audit::{AuditLog, Outcome};
83use auth::Principal;
84
/// Per-request resource limits, enforced by the `check_*` methods in `impl Limits`.
///
/// Loaded from the `limits` table of `quiver.toml` (missing fields fall back to
/// `Limits::default()` via `#[serde(default)]`). Each field can also be overridden
/// by a `QUIVER_MAX_<FIELD>` environment variable (see `apply_env_overrides`).
/// `validate` rejects any field set to zero.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
    /// Largest `k` a search may request (default 10 000; `check_search`).
    pub max_k: usize,
    /// Largest `ef_search` a search may request (default 4 096; `check_search`).
    pub max_ef_search: usize,
    /// Largest `limit` a fetch may request (default 10 000; `check_fetch`).
    pub max_fetch_limit: usize,
    /// Upper bound for both a collection's dimension (`check_dim`) and any
    /// submitted vector's length (`check_vector_len`); default 8 192.
    pub max_vector_dim: usize,
    /// Largest compact-JSON size of one point's payload, in bytes
    /// (default 64 KiB; `check_payload`).
    pub max_payload_bytes: usize,
    /// Largest number of points in a regular upsert batch (default 1 000; `check_batch`).
    pub max_batch_size: usize,
    /// Largest request body in bytes (default 32 MiB). Not checked by the code in
    /// this file — presumably enforced by the REST/gRPC layer; confirm there.
    pub max_request_body_bytes: usize,
    /// Largest number of terms in a sparse query (default 4 096; `check_sparse_terms`).
    pub max_sparse_terms: usize,
    /// Largest number of points in a bulk upsert (default 50 000; `check_bulk_batch`).
    pub max_bulk_batch_size: usize,
}
120
121impl Default for Limits {
122 fn default() -> Self {
123 Self {
124 max_k: 10_000,
125 max_ef_search: 4_096,
126 max_fetch_limit: 10_000,
127 max_vector_dim: 8_192,
128 max_payload_bytes: 65_536,
129 max_batch_size: 1_000,
130 max_request_body_bytes: 32 * 1024 * 1024,
131 max_sparse_terms: 4_096,
132 max_bulk_batch_size: 50_000,
133 }
134 }
135}
136
137impl Limits {
138 fn apply_env_overrides(&mut self) -> Result<(), Error> {
142 let slots: [(&str, &mut usize); 9] = [
143 ("QUIVER_MAX_K", &mut self.max_k),
144 ("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
145 ("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
146 ("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
147 ("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
148 ("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
149 (
150 "QUIVER_MAX_REQUEST_BODY_BYTES",
151 &mut self.max_request_body_bytes,
152 ),
153 ("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
154 ("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
155 ];
156 for (key, slot) in slots {
157 if let Ok(raw) = std::env::var(key) {
158 *slot = raw.parse().map_err(|_| {
159 Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
160 })?;
161 }
162 }
163 Ok(())
164 }
165
166 fn validate(&self) -> Result<(), Error> {
168 let named = [
169 ("max_k", self.max_k),
170 ("max_ef_search", self.max_ef_search),
171 ("max_fetch_limit", self.max_fetch_limit),
172 ("max_vector_dim", self.max_vector_dim),
173 ("max_payload_bytes", self.max_payload_bytes),
174 ("max_batch_size", self.max_batch_size),
175 ("max_request_body_bytes", self.max_request_body_bytes),
176 ("max_sparse_terms", self.max_sparse_terms),
177 ("max_bulk_batch_size", self.max_bulk_batch_size),
178 ];
179 if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
180 return Err(Error::Config(format!(
181 "limits.{name} must be greater than zero"
182 )));
183 }
184 Ok(())
185 }
186
187 fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
188 if k > self.max_k {
189 return Err(Error::BadRequest(format!(
190 "k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
191 self.max_k
192 )));
193 }
194 if ef_search > self.max_ef_search {
195 return Err(Error::BadRequest(format!(
196 "ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
197 self.max_ef_search
198 )));
199 }
200 Ok(())
201 }
202
203 fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
204 if n > self.max_sparse_terms {
205 return Err(Error::BadRequest(format!(
206 "sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
207 self.max_sparse_terms
208 )));
209 }
210 Ok(())
211 }
212
213 fn check_fetch(&self, limit: usize) -> Result<(), Error> {
214 if limit > self.max_fetch_limit {
215 return Err(Error::BadRequest(format!(
216 "limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
217 self.max_fetch_limit
218 )));
219 }
220 Ok(())
221 }
222
223 fn check_dim(&self, dim: usize) -> Result<(), Error> {
224 if dim > self.max_vector_dim {
225 return Err(Error::BadRequest(format!(
226 "dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
227 self.max_vector_dim
228 )));
229 }
230 Ok(())
231 }
232
233 fn check_vector_len(&self, len: usize) -> Result<(), Error> {
234 if len > self.max_vector_dim {
235 return Err(Error::BadRequest(format!(
236 "vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
237 self.max_vector_dim
238 )));
239 }
240 Ok(())
241 }
242
243 fn check_batch(&self, n: usize) -> Result<(), Error> {
244 if n > self.max_batch_size {
245 return Err(Error::BadRequest(format!(
246 "batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
247 self.max_batch_size
248 )));
249 }
250 Ok(())
251 }
252
253 fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
254 if n > self.max_bulk_batch_size {
255 return Err(Error::BadRequest(format!(
256 "bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
257 self.max_bulk_batch_size
258 )));
259 }
260 Ok(())
261 }
262
263 fn check_payload(&self, payload: &Value) -> Result<(), Error> {
264 let size = serde_json::to_vec(payload)
265 .map(|v| v.len())
266 .map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
267 if size > self.max_payload_bytes {
268 return Err(Error::BadRequest(format!(
269 "payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
270 self.max_payload_bytes
271 )));
272 }
273 Ok(())
274 }
275}
276
/// Server configuration, assembled by `Config::load` from built-in defaults,
/// `quiver.toml`, and `QUIVER_`-prefixed environment variables (later sources win).
/// Call `Config::validate` before using it.
///
/// NOTE(review): this type derives `Debug` and `Serialize`, so `{:?}` or a
/// serialized dump includes `encryption_key` and `leader_api_key` in clear
/// text — avoid logging a whole `Config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Directory holding the database files (default `./quiver-data`).
    pub data_dir: PathBuf,
    /// REST listen address (default `127.0.0.1:6333`).
    pub rest_addr: SocketAddr,
    /// gRPC listen address (default `127.0.0.1:6334`).
    pub grpc_addr: SocketAddr,
    /// API keys accepted by the server, parsed by `auth::de_api_keys`
    /// (`QUIVER_API_KEYS`, comma-separated). `validate` requires at least one
    /// unless `insecure` is set.
    #[serde(default, deserialize_with = "auth::de_api_keys")]
    pub api_keys: Vec<ApiKey>,
    /// Hex-encoded master key for encryption at rest. Surrounding whitespace is
    /// trimmed and an empty value counts as unset. Mutually exclusive with
    /// `master_key_file`.
    pub encryption_key: Option<String>,
    /// File containing the hex master key (contents are trimmed). On Unix a
    /// warning is logged if the file is group/world accessible.
    pub master_key_file: Option<PathBuf>,
    /// PEM certificate for TLS; must be set together with `tls_key`.
    pub tls_cert: Option<PathBuf>,
    /// PEM private key for TLS; must be set together with `tls_cert`.
    pub tls_key: Option<PathBuf>,
    /// CA bundle for mutual TLS; requires `tls_cert` and `tls_key`.
    pub tls_client_ca: Option<PathBuf>,
    /// Audit log path — presumably where audit records are written; confirm at
    /// the `AuditLog` construction site.
    pub audit_log: Option<PathBuf>,
    /// Presumably the replication leader this node follows (making it a
    /// read-only follower); confirm against the server startup code.
    pub leader_url: Option<String>,
    /// Presumably the API key used to authenticate to `leader_url`.
    pub leader_api_key: Option<String>,
    /// Development escape hatch: lets `validate` accept a config with no API
    /// keys, no encryption key, and a non-loopback bind without TLS.
    pub insecure: bool,
    /// Request limits; `QUIVER_MAX_*` overrides are applied in `load`.
    pub limits: Limits,
    /// Embedding provider configurations, keyed by name (presumably the name
    /// requests refer to — confirm in `EmbedRegistry`).
    #[serde(default)]
    pub embedding: HashMap<String, EmbeddingConfig>,
    /// Rerank provider configurations, keyed by name.
    #[serde(default)]
    pub rerank: HashMap<String, RerankConfig>,
    /// Rate-limit settings; its own env overrides are applied in `load`.
    #[serde(default)]
    pub rate_limit: RateLimitConfig,
    /// OTLP exporter settings; its own env overrides are applied in `load`.
    #[serde(default)]
    pub otlp: OtlpConfig,
    /// Presumably enables the MVCC snapshot read path (`AppState::mvcc`);
    /// confirm at the `AppState` construction site.
    #[serde(default)]
    pub mvcc_reads: bool,
    /// Shard endpoints for cluster routing (presumably URLs).
    #[serde(default)]
    pub cluster_shards: Vec<String>,
    /// Replica endpoints for cluster routing (presumably URLs).
    #[serde(default)]
    pub cluster_replicas: Vec<String>,
    /// Optional key used to shard points across `cluster_shards` — semantics
    /// defined in the `cluster` module.
    #[serde(default)]
    pub cluster_shard_key: Option<String>,
    /// Whether this node runs the coordinator role.
    #[serde(default)]
    pub coordinator: bool,
    /// Coordinator endpoint for nodes that are not the coordinator (presumably).
    #[serde(default)]
    pub coordinator_url: Option<String>,
    /// Path for coordinator state persistence (presumably).
    #[serde(default)]
    pub coordinator_state: Option<PathBuf>,
    /// Autoscaling settings consumed by the coordinator.
    #[serde(default)]
    pub autoscale: AutoscaleConfig,
    /// This node's raft id, when running as a raft shard (feature `raft`).
    #[serde(default)]
    pub raft_node_id: Option<u64>,
    /// Raft peer descriptors (format defined in the `raft` module).
    #[serde(default)]
    pub raft_members: Vec<String>,
}
437
438impl Default for Config {
439 fn default() -> Self {
440 Self {
441 data_dir: PathBuf::from("./quiver-data"),
442 rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
443 grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
444 api_keys: Vec::new(),
445 encryption_key: None,
446 master_key_file: None,
447 tls_cert: None,
448 tls_key: None,
449 tls_client_ca: None,
450 audit_log: None,
451 leader_url: None,
452 leader_api_key: None,
453 insecure: false,
454 limits: Limits::default(),
455 embedding: HashMap::new(),
456 rerank: HashMap::new(),
457 rate_limit: RateLimitConfig::default(),
458 otlp: OtlpConfig::default(),
459 mvcc_reads: false,
460 cluster_shards: Vec::new(),
461 cluster_replicas: Vec::new(),
462 cluster_shard_key: None,
463 coordinator: false,
464 coordinator_url: None,
465 coordinator_state: None,
466 autoscale: AutoscaleConfig::default(),
467 raft_node_id: None,
468 raft_members: Vec::new(),
469 }
470 }
471}
472
473impl Config {
474 pub fn load() -> Result<Self, Error> {
477 let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
478 .merge(Toml::file("quiver.toml"))
479 .merge(Env::prefixed("QUIVER_"))
480 .extract()
481 .map_err(|e| Error::Config(e.to_string()))?;
482 config.limits.apply_env_overrides()?;
485 config
487 .rate_limit
488 .apply_env_overrides()
489 .map_err(Error::Config)?;
490 config.otlp.apply_env_overrides().map_err(Error::Config)?;
492 Ok(config)
493 }
494
495 pub fn validate(&self) -> Result<(), Error> {
499 if self.api_keys.is_empty() && !self.insecure {
500 return Err(Error::Config(
501 "no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
502 set insecure=true for local development"
503 .to_owned(),
504 ));
505 }
506 let master_key = self.master_key_hex()?;
508 if master_key.is_none() && !self.insecure {
509 return Err(Error::Config(
510 "no encryption key configured: encryption-at-rest is on by default — \
511 set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
512 QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
513 store data unencrypted (development only)"
514 .to_owned(),
515 ));
516 }
517 if let Some(key) = &master_key {
519 AeadCodec::from_hex(key)
520 .map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
521 }
522 if self.tls_cert.is_some() != self.tls_key.is_some() {
524 return Err(Error::Config(
525 "tls_cert and tls_key must be set together".to_owned(),
526 ));
527 }
528 if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
530 return Err(Error::Config(
531 "tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
532 ));
533 }
534 let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
535 let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
536 if non_loopback && !tls_enabled && !self.insecure {
537 return Err(Error::Config(
538 "non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
539 or insecure=true for local development"
540 .to_owned(),
541 ));
542 }
543 self.limits.validate()?;
545 Ok(())
546 }
547
548 pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
558 let env_key = self
559 .encryption_key
560 .as_deref()
561 .map(str::trim)
562 .filter(|k| !k.is_empty());
563 match (&self.master_key_file, env_key) {
564 (Some(_), Some(_)) => Err(Error::Config(
565 "set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
566 (QUIVER_MASTER_KEY_FILE), not both"
567 .to_owned(),
568 )),
569 (Some(path), None) => {
570 warn_if_world_readable(path);
571 let hex = std::fs::read_to_string(path).map_err(|e| {
572 Error::Config(format!("reading master_key_file {}: {e}", path.display()))
573 })?;
574 Ok(Some(hex.trim().to_owned()))
575 }
576 (None, Some(key)) => Ok(Some(key.to_owned())),
577 (None, None) => Ok(None),
578 }
579 }
580}
581
582#[cfg(unix)]
585fn warn_if_world_readable(path: &std::path::Path) {
586 use std::os::unix::fs::PermissionsExt;
587 if let Ok(meta) = std::fs::metadata(path)
588 && meta.permissions().mode() & 0o077 != 0
589 {
590 tracing::warn!(
591 path = %path.display(),
592 mode = format!("{:o}", meta.permissions().mode() & 0o777),
593 "master key file is group/world-accessible; restrict it to 0600"
594 );
595 }
596}
597
598#[cfg(not(unix))]
599fn warn_if_world_readable(_path: &std::path::Path) {}
600
/// Shared, cheaply clonable server state handed to the REST and gRPC layers.
/// All fields are `Arc`s or `Copy` values, so `clone()` shares rather than copies.
#[derive(Clone)]
pub(crate) struct AppState {
    /// The database behind a std `RwLock`. It is only locked inside
    /// `spawn_blocking` (see `write_blocking` / `read_blocking`).
    db: Arc<RwLock<Database>>,
    /// API keys consulted by `authenticate`.
    keys: Arc<Vec<ApiKey>>,
    /// Audit sink for denials (`authorize*`) and mutation outcomes.
    audit: Arc<AuditLog>,
    /// Broadcast channel of WAL entries; `open_replication` subscribes to it.
    replication_tx: broadcast::Sender<WalEntry>,
    /// When true, `ensure_writable` rejects every mutation (replication follower).
    read_only: bool,
    /// Per-request limits checked before doing work.
    limits: Limits,
    /// Embedding provider registry (not used in the methods visible here).
    embed: Arc<EmbedRegistry>,
    /// Per-actor rate limiter behind `rate_limit` / `rate_limit_enabled`.
    rate_limiter: Arc<RateLimiter>,
    /// Server metrics (usage not visible in this section).
    metrics: Arc<metrics::Metrics>,
    /// Collections with a background snapshot rebuild in flight; used by
    /// `schedule_rebuild` to avoid starting duplicates.
    rebuilding: Arc<Mutex<HashSet<String>>>,
    /// Per-collection snapshot cells cached by `search_blocking`. The fast
    /// search path reads them without taking the database lock.
    snapshot_cells: Arc<RwLock<HashMap<String, quiver_embed::SnapshotCell>>>,
    /// When true, successful writes trigger `schedule_if_stale`.
    mvcc: bool,
    /// When set, collection and point operations are routed to the cluster.
    cluster: Option<Arc<cluster::Cluster>>,
    /// When set, writes are proposed through raft instead of applied locally.
    #[cfg(feature = "raft")]
    raft: Option<Arc<raft::RaftShard>>,
}
657
/// Transport-neutral description of a collection returned by collection APIs.
pub(crate) struct CollectionInfo {
    pub name: String,
    pub dim: u32,
    pub metric: DistanceMetric,
    /// `document_count` for multivector collections, otherwise `len`.
    pub count: u64,
    pub index: IndexSpec,
    pub filterable: Vec<FilterableField>,
    pub multivector: bool,
    pub vector_encryption: VectorEncryption,
}
669
/// A single dense point submitted for upsert.
pub(crate) struct PointIn {
    pub id: String,
    /// Length is checked against `Limits::max_vector_dim` before writing.
    pub vector: Vec<f32>,
    /// Compact-JSON size is checked against `Limits::max_payload_bytes`.
    pub payload: Value,
}
676
/// A point submitted as raw text. Presumably the text is embedded server-side
/// before storage — the consuming code is not visible here; confirm.
pub(crate) struct TextPointIn {
    pub id: String,
    pub text: String,
    pub payload: Value,
}
683
// NOTE(review): not referenced in the visible section; presumably the number of
// first-stage candidates fetched before reranking — confirm at its use site.
const RERANK_CANDIDATES: usize = 50;
687
688fn doc_text(payload: Option<&Value>) -> String {
692 match payload {
693 Some(Value::Object(map)) => map
694 .get(TEXT_KEY)
695 .and_then(Value::as_str)
696 .map_or_else(|| Value::Object(map.clone()).to_string(), str::to_owned),
697 Some(v) => v.to_string(),
698 None => String::new(),
699 }
700}
701
/// A stored point returned by `get_points`.
pub(crate) struct PointOut {
    pub id: String,
    /// Present only when the caller asked for vectors.
    pub vector: Option<Vec<f32>>,
    /// `Value::Null` when the point has no payload.
    pub payload: Value,
}
708
/// One dense-search hit.
pub(crate) struct MatchOut {
    pub id: String,
    pub score: f32,
    /// `None` when payloads were not requested.
    pub payload: Option<Value>,
    /// `None` when vectors were not requested.
    pub vector: Option<Vec<f32>>,
}
716
/// A multivector document submitted for upsert (several vectors per id).
pub(crate) struct DocumentIn {
    pub id: String,
    pub vectors: Vec<Vec<f32>>,
    pub payload: Value,
}
723
/// One multivector-search hit.
pub(crate) struct DocumentMatchOut {
    pub id: String,
    pub score: f32,
    /// `None` when payloads were not requested (presumably, mirroring `MatchOut`).
    pub payload: Option<Value>,
    /// `None` when vectors were not requested (presumably, mirroring `MatchOut`).
    pub vectors: Option<Vec<Vec<f32>>>,
}
731
732impl AppState {
733 pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
737 auth::authenticate(&self.keys, presented)
738 }
739
740 pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
744 self.rate_limiter.check(actor)
745 }
746
747 pub(crate) fn rate_limit_enabled(&self) -> bool {
750 self.rate_limiter.enabled()
751 }
752
753 async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
757 where
758 T: Send + 'static,
759 F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
760 {
761 let db = Arc::clone(&self.db);
762 tokio::task::spawn_blocking(move || -> Result<T, Error> {
763 let mut guard = db
764 .write()
765 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
766 f(&mut guard).map_err(Error::Engine)
767 })
768 .await
769 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
770 }
771
772 async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
776 where
777 T: Send + 'static,
778 F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
779 {
780 let db = Arc::clone(&self.db);
781 tokio::task::spawn_blocking(move || -> Result<T, Error> {
782 let guard = db
783 .read()
784 .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
785 f(&guard).map_err(Error::Engine)
786 })
787 .await
788 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
789 }
790
    /// Runs a read-only search closure `f` under the database read lock. While
    /// the lock is held it also (a) checks whether `collection`'s snapshot is
    /// stale and (b) caches the collection's MVCC snapshot cell when fresh.
    ///
    /// A stale result schedules a background rebuild after the blocking task
    /// returns. The search result itself is returned either way.
    ///
    /// # Errors
    /// `Error::Internal` for lock poisoning or a failed blocking task;
    /// `Error::Engine` for errors from `f`.
    async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
    where
        T: Send + 'static,
        F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
    {
        let db = Arc::clone(&self.db);
        let cells = Arc::clone(&self.snapshot_cells);
        let coll = collection.clone();
        let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
            let guard = db
                .read()
                .map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
            let result = f(&guard).map_err(Error::Engine)?;
            // Errors from the staleness probe are treated as "not stale".
            let stale = guard.needs_rebuild(&coll).unwrap_or(false);
            // Only cache a cell for a fresh snapshot. `or_insert` keeps an
            // already-cached cell (cells are dropped on delete_collection).
            // Failures to fetch the cell or lock the cache are ignored: caching
            // is an optimization for the lock-free fast path in `search`.
            if !stale
                && let Ok(Some(cell)) = guard.mvcc_cell(&coll)
                && let Ok(mut map) = cells.write()
            {
                map.entry(coll.clone()).or_insert(cell);
            }
            Ok((result, stale))
        })
        .await
        .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
        // Scheduled after the read lock is released; the rebuild task itself
        // takes the lock again.
        if stale {
            self.schedule_rebuild(collection);
        }
        Ok(result)
    }
832
833 fn cached_cell(&self, collection: &str) -> Option<quiver_embed::SnapshotCell> {
837 self.snapshot_cells.read().ok()?.get(collection).cloned()
838 }
839
840 async fn schedule_if_stale(&self, collection: &str) {
846 let db = Arc::clone(&self.db);
847 let coll = collection.to_owned();
848 let stale = tokio::task::spawn_blocking(move || {
849 db.read()
850 .ok()
851 .and_then(|g| g.needs_rebuild(&coll).ok())
852 .unwrap_or(false)
853 })
854 .await
855 .unwrap_or(false);
856 if stale {
857 self.schedule_rebuild(collection.to_owned());
858 }
859 }
860
861 fn schedule_rebuild(&self, collection: String) {
865 {
866 let mut inflight = match self.rebuilding.lock() {
867 Ok(g) => g,
868 Err(_) => return,
869 };
870 if !inflight.insert(collection.clone()) {
871 return; }
873 }
874 let state = self.clone();
875 tokio::spawn(async move {
876 state.run_rebuild(&collection).await;
877 if let Ok(mut inflight) = state.rebuilding.lock() {
878 inflight.remove(&collection);
879 }
880 });
881 }
882
    /// Rebuilds `collection`'s snapshot, repeating while the commit reports it
    /// is still stale.
    ///
    /// Each round has three phases, so the expensive build never holds the
    /// database lock:
    /// 1. gather inputs under the read lock;
    /// 2. build off-lock on the blocking pool;
    /// 3. commit under the write lock.
    ///
    /// Any error, panic, poisoned lock, or "nothing to rebuild" (`None` inputs)
    /// ends the loop silently. This is best-effort background work.
    async fn run_rebuild(&self, collection: &str) {
        loop {
            // Phase 1: snapshot the rebuild inputs under the read lock.
            let db = Arc::clone(&self.db);
            let coll = collection.to_owned();
            let inputs = tokio::task::spawn_blocking(move || {
                let guard = db.read().ok()?;
                guard.snapshot_rebuild_inputs(&coll).ok().flatten()
            })
            .await
            .ok()
            .flatten();
            let Some(inputs) = inputs else { return };

            // Phase 2: build without holding any database lock.
            let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
                return;
            };

            // Phase 3: install the result under the write lock. `Some(true)`
            // means writes landed during the build, so the result is already
            // stale and another round is needed.
            let db = Arc::clone(&self.db);
            let still_stale = tokio::task::spawn_blocking(move || {
                let mut guard = db.write().ok()?;
                guard.commit_rebuild(rebuilt).ok()
            })
            .await
            .ok()
            .flatten();
            match still_stale {
                Some(true) => continue,
                _ => return,
            }
        }
    }
920
921 fn authorize(
924 &self,
925 principal: &Principal,
926 action: Action,
927 op: &str,
928 resource: &str,
929 ) -> Result<(), Error> {
930 principal
931 .require(action, Some(resource))
932 .inspect_err(|_| self.audit.deny(principal.actor(), op, resource))
933 }
934
935 fn authorize_global(
938 &self,
939 principal: &Principal,
940 action: Action,
941 op: &str,
942 ) -> Result<(), Error> {
943 principal
944 .require(action, None)
945 .inspect_err(|_| self.audit.deny(principal.actor(), op, "*"))
946 }
947
    /// Starts a replication stream for an admin principal.
    ///
    /// Returns the ops from `db.replication_snapshot()` plus a receiver for WAL
    /// entries broadcast from now on.
    ///
    /// # Errors
    /// Authorization failures (audited as `replicate`), or any error from
    /// taking the snapshot.
    pub(crate) async fn open_replication(
        &self,
        principal: &Principal,
    ) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
        self.authorize_global(principal, Action::Admin, "replicate")?;
        let tx = self.replication_tx.clone();
        self.read_blocking(move |db| {
            // Subscribe *before* taking the snapshot, both under the same read
            // lock, so no entry can fall in the gap between them. NOTE(review):
            // this assumes writers broadcast while holding the write lock; if
            // they broadcast after releasing it, the follower may see a
            // duplicate (not a gap) — confirm apply is idempotent.
            let rx = tx.subscribe();
            let snapshot = db.replication_snapshot()?;
            Ok((snapshot, rx))
        })
        .await
    }
967
968 pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
972 self.write_blocking(move |db| db.apply_replicated(op)).await
973 }
974
975 #[cfg(feature = "raft")]
982 fn ensure_raft_leader(&self, rs: &raft::RaftShard) -> Result<(), Error> {
983 let leader = rs.raft.metrics().borrow().current_leader;
984 if leader == Some(rs.node_id) {
985 Ok(())
986 } else {
987 Err(Error::NotLeader {
988 leader: leader.and_then(|id| rs.member_url(id)),
989 })
990 }
991 }
992
993 pub(crate) async fn raft_add_voter(
997 &self,
998 principal: &Principal,
999 id: u64,
1000 url: String,
1001 ) -> Result<(), Error> {
1002 self.authorize_global(principal, Action::Admin, "raft_add_voter")?;
1003 #[cfg(feature = "raft")]
1004 {
1005 let rs = self
1006 .raft
1007 .clone()
1008 .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1009 return rs
1010 .add_voter(id, url)
1011 .await
1012 .map_err(|e| Error::Internal(format!("add raft voter: {e}")));
1013 }
1014 #[cfg(not(feature = "raft"))]
1015 {
1016 let _ = (id, url);
1017 Err(Error::BadRequest(
1018 "server built without the raft feature".to_owned(),
1019 ))
1020 }
1021 }
1022
1023 pub(crate) async fn raft_remove_voter(
1026 &self,
1027 principal: &Principal,
1028 id: u64,
1029 ) -> Result<(), Error> {
1030 self.authorize_global(principal, Action::Admin, "raft_remove_voter")?;
1031 #[cfg(feature = "raft")]
1032 {
1033 let rs = self
1034 .raft
1035 .clone()
1036 .ok_or_else(|| Error::BadRequest("this node is not a raft shard".to_owned()))?;
1037 return rs
1038 .remove_voter(id)
1039 .await
1040 .map_err(|e| Error::Internal(format!("remove raft voter: {e}")));
1041 }
1042 #[cfg(not(feature = "raft"))]
1043 {
1044 let _ = id;
1045 Err(Error::BadRequest(
1046 "server built without the raft feature".to_owned(),
1047 ))
1048 }
1049 }
1050
1051 #[cfg(feature = "raft")]
1055 async fn raft_propose(&self, rs: &raft::RaftShard, op: WalOp) -> Result<(), Error> {
1056 rs.raft
1057 .client_write(op)
1058 .await
1059 .map(|_| ())
1060 .map_err(map_client_write_err)
1061 }
1062
1063 #[cfg(feature = "raft")]
1067 async fn raft_propose_all(&self, rs: &raft::RaftShard, ops: Vec<WalOp>) -> Result<u64, Error> {
1068 let mut committed = 0u64;
1069 for op in ops {
1070 self.raft_propose(rs, op).await?;
1071 committed += 1;
1072 }
1073 Ok(committed)
1074 }
1075
1076 fn ensure_writable(&self, op: &str) -> Result<(), Error> {
1079 if self.read_only {
1080 return Err(Error::Forbidden(format!(
1081 "{op}: this node is a read-only replication follower"
1082 )));
1083 }
1084 Ok(())
1085 }
1086
1087 #[allow(clippy::too_many_arguments)]
1088 pub(crate) async fn create_collection(
1089 &self,
1090 principal: &Principal,
1091 name: String,
1092 dim: u32,
1093 metric: DistanceMetric,
1094 index: IndexSpec,
1095 filterable: Vec<FilterableField>,
1096 multivector: bool,
1097 vector_encryption: VectorEncryption,
1098 ) -> Result<CollectionInfo, Error> {
1099 self.ensure_writable("create_collection")?;
1100 self.authorize(principal, Action::Admin, "create_collection", &name)?;
1101 self.limits.check_dim(dim as usize)?;
1102 if let Some(c) = &self.cluster {
1103 return c
1104 .create_collection(
1105 name,
1106 dim,
1107 metric,
1108 index,
1109 filterable,
1110 multivector,
1111 vector_encryption,
1112 )
1113 .await;
1114 }
1115 #[cfg(feature = "raft")]
1120 if let Some(rs) = self.raft.clone() {
1121 self.ensure_raft_leader(&rs)?;
1122 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
1123 .with_index(index)
1124 .with_filterable(filterable.clone())
1125 .with_multivector(multivector)
1126 .with_vector_encryption(vector_encryption);
1127 let _guard = rs.create_lock.lock().await;
1128 let prep_name = name.clone();
1129 let result = match self
1130 .read_blocking(move |db| db.prepare_create_collection(&prep_name, &descriptor))
1131 .await
1132 {
1133 Ok(op) => self.raft_propose(&rs, op).await,
1134 Err(e) => Err(e),
1135 };
1136 self.audit.record(
1137 principal.actor(),
1138 "create_collection",
1139 &name,
1140 Outcome::of(&result),
1141 );
1142 result?;
1143 return Ok(CollectionInfo {
1144 name,
1145 dim,
1146 metric,
1147 count: 0,
1148 index,
1149 filterable,
1150 multivector,
1151 vector_encryption,
1152 });
1153 }
1154 let descriptor = Descriptor::new(dim, Dtype::F32, metric)
1155 .with_index(index)
1156 .with_filterable(filterable.clone())
1157 .with_multivector(multivector)
1158 .with_vector_encryption(vector_encryption);
1159 let owned = name.clone();
1160 let result = self
1161 .write_blocking(move |db| db.create_collection(&owned, descriptor))
1162 .await;
1163 self.audit.record(
1164 principal.actor(),
1165 "create_collection",
1166 &name,
1167 Outcome::of(&result),
1168 );
1169 result?;
1170 Ok(CollectionInfo {
1171 name,
1172 dim,
1173 metric,
1174 count: 0,
1175 index,
1176 filterable,
1177 multivector,
1178 vector_encryption,
1179 })
1180 }
1181
1182 pub(crate) async fn get_collection(
1183 &self,
1184 principal: &Principal,
1185 name: String,
1186 ) -> Result<CollectionInfo, Error> {
1187 self.authorize(principal, Action::Read, "get_collection", &name)?;
1188 self.read_blocking(move |db| {
1189 let descriptor = db
1190 .descriptor(&name)
1191 .cloned()
1192 .ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
1193 let count = if descriptor.multivector {
1196 db.document_count(&name)? as u64
1197 } else {
1198 db.len(&name)? as u64
1199 };
1200 Ok(CollectionInfo {
1201 name,
1202 dim: descriptor.dim,
1203 metric: descriptor.metric,
1204 count,
1205 index: descriptor.index,
1206 filterable: descriptor.filterable,
1207 multivector: descriptor.multivector,
1208 vector_encryption: descriptor.vector_encryption,
1209 })
1210 })
1211 .await
1212 }
1213
1214 pub(crate) async fn list_collections(
1215 &self,
1216 principal: &Principal,
1217 ) -> Result<Vec<CollectionInfo>, Error> {
1218 self.authorize_global(principal, Action::Read, "list_collections")?;
1219 let mut infos = self
1220 .read_blocking(|db| {
1221 let mut out = Vec::new();
1222 for name in db.collection_names() {
1223 if let Some(descriptor) = db.descriptor(&name).cloned() {
1224 let count = if descriptor.multivector {
1225 db.document_count(&name)? as u64
1226 } else {
1227 db.len(&name)? as u64
1228 };
1229 out.push(CollectionInfo {
1230 name,
1231 dim: descriptor.dim,
1232 metric: descriptor.metric,
1233 count,
1234 index: descriptor.index,
1235 filterable: descriptor.filterable,
1236 multivector: descriptor.multivector,
1237 vector_encryption: descriptor.vector_encryption,
1238 });
1239 }
1240 }
1241 Ok(out)
1242 })
1243 .await?;
1244 infos.retain(|info| principal.can_see(&info.name));
1246 Ok(infos)
1247 }
1248
1249 pub(crate) async fn delete_collection(
1250 &self,
1251 principal: &Principal,
1252 name: String,
1253 ) -> Result<bool, Error> {
1254 self.ensure_writable("delete_collection")?;
1255 self.authorize(principal, Action::Admin, "delete_collection", &name)?;
1256 if let Some(c) = &self.cluster {
1257 return c.drop_collection(&name).await;
1258 }
1259 let resource = name.clone();
1260 let result = self
1261 .write_blocking(move |db| db.drop_collection(&name))
1262 .await;
1263 self.audit.record(
1264 principal.actor(),
1265 "delete_collection",
1266 &resource,
1267 Outcome::of(&result),
1268 );
1269 if matches!(result, Ok(true))
1272 && let Ok(mut map) = self.snapshot_cells.write()
1273 {
1274 map.remove(&resource);
1275 }
1276 result
1277 }
1278
1279 #[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
1280 pub(crate) async fn upsert(
1281 &self,
1282 principal: &Principal,
1283 collection: String,
1284 points: Vec<PointIn>,
1285 ) -> Result<u64, Error> {
1286 self.ensure_writable("upsert")?;
1287 self.authorize(principal, Action::Write, "upsert", &collection)?;
1288 self.limits.check_batch(points.len())?;
1289 for p in &points {
1290 self.limits.check_vector_len(p.vector.len())?;
1291 self.limits.check_payload(&p.payload)?;
1292 }
1293 if let Some(c) = &self.cluster {
1294 return c.upsert(&collection, points).await;
1295 }
1296 #[cfg(feature = "raft")]
1301 if let Some(rs) = self.raft.clone() {
1302 self.ensure_raft_leader(&rs)?;
1303 let prep = collection.clone();
1304 let result = match self
1305 .read_blocking(move |db| {
1306 points
1307 .iter()
1308 .map(|p| db.prepare_upsert(&prep, &p.id, &p.vector, &p.payload))
1309 .collect::<quiver_embed::Result<Vec<_>>>()
1310 })
1311 .await
1312 {
1313 Ok(ops) => self.raft_propose_all(&rs, ops).await,
1314 Err(e) => Err(e),
1315 };
1316 self.audit.record(
1317 principal.actor(),
1318 "upsert",
1319 &collection,
1320 Outcome::of(&result),
1321 );
1322 return result;
1323 }
1324 let resource = collection.clone();
1325 let result = self
1326 .write_blocking(move |db| {
1327 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1328 .iter()
1329 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1330 .collect();
1331 db.upsert_batch(&collection, &records)
1332 })
1333 .await;
1334 self.audit
1335 .record(principal.actor(), "upsert", &resource, Outcome::of(&result));
1336 if self.mvcc && result.is_ok() {
1337 self.schedule_if_stale(&resource).await;
1338 }
1339 result
1340 }
1341
1342 pub(crate) async fn upsert_bulk(
1345 &self,
1346 principal: &Principal,
1347 collection: String,
1348 points: Vec<PointIn>,
1349 ) -> Result<u64, Error> {
1350 self.ensure_writable("upsert")?;
1351 self.authorize(principal, Action::Write, "upsert", &collection)?;
1352 self.limits.check_bulk_batch(points.len())?;
1353 for p in &points {
1354 self.limits.check_vector_len(p.vector.len())?;
1355 self.limits.check_payload(&p.payload)?;
1356 }
1357 if let Some(c) = &self.cluster {
1358 return c.upsert_bulk(&collection, points).await;
1359 }
1360 let resource = collection.clone();
1361 let result = self
1362 .write_blocking(move |db| {
1363 let records: Vec<(&str, &[f32], &serde_json::Value)> = points
1364 .iter()
1365 .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
1366 .collect();
1367 db.upsert_bulk(&collection, &records)
1368 })
1369 .await;
1370 self.audit.record(
1371 principal.actor(),
1372 "upsert_bulk",
1373 &resource,
1374 Outcome::of(&result),
1375 );
1376 if self.mvcc && result.is_ok() {
1377 self.schedule_if_stale(&resource).await;
1378 }
1379 result
1380 }
1381
1382 #[tracing::instrument(skip_all)]
1386 pub(crate) async fn snapshot(
1387 &self,
1388 principal: &Principal,
1389 destination: String,
1390 ) -> Result<SnapshotInfo, Error> {
1391 self.ensure_writable("snapshot")?;
1392 self.authorize_global(principal, Action::Admin, "snapshot")?;
1393 let dest = std::path::PathBuf::from(&destination);
1394 let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
1395 self.audit.record(
1396 principal.actor(),
1397 "snapshot",
1398 &destination,
1399 Outcome::of(&result),
1400 );
1401 result
1402 }
1403
1404 pub(crate) async fn delete_points(
1405 &self,
1406 principal: &Principal,
1407 collection: String,
1408 ids: Vec<String>,
1409 ) -> Result<u64, Error> {
1410 self.ensure_writable("delete_points")?;
1411 self.authorize(principal, Action::Write, "delete_points", &collection)?;
1412 if let Some(c) = &self.cluster {
1413 return c.delete_points(&collection, ids).await;
1414 }
1415 #[cfg(feature = "raft")]
1419 if let Some(rs) = self.raft.clone() {
1420 self.ensure_raft_leader(&rs)?;
1421 let prep = collection.clone();
1422 let result = match self
1423 .read_blocking(move |db| {
1424 ids.iter()
1425 .map(|id| db.prepare_delete(&prep, id))
1426 .collect::<quiver_embed::Result<Vec<Option<WalOp>>>>()
1427 })
1428 .await
1429 {
1430 Ok(ops) => {
1431 self.raft_propose_all(&rs, ops.into_iter().flatten().collect())
1432 .await
1433 }
1434 Err(e) => Err(e),
1435 };
1436 self.audit.record(
1437 principal.actor(),
1438 "delete_points",
1439 &collection,
1440 Outcome::of(&result),
1441 );
1442 return result;
1443 }
1444 let resource = collection.clone();
1445 let result = self
1446 .write_blocking(move |db| {
1447 let mut count = 0u64;
1448 for id in &ids {
1449 if db.delete(&collection, id)? {
1450 count += 1;
1451 }
1452 }
1453 Ok(count)
1454 })
1455 .await;
1456 self.audit.record(
1457 principal.actor(),
1458 "delete_points",
1459 &resource,
1460 Outcome::of(&result),
1461 );
1462 if self.mvcc && result.is_ok() {
1463 self.schedule_if_stale(&resource).await;
1464 }
1465 result
1466 }
1467
1468 pub(crate) async fn get_points(
1469 &self,
1470 principal: &Principal,
1471 collection: String,
1472 ids: Vec<String>,
1473 with_vector: bool,
1474 ) -> Result<Vec<PointOut>, Error> {
1475 self.authorize(principal, Action::Read, "get_points", &collection)?;
1476 if let Some(c) = &self.cluster {
1477 return c.get_points(&collection, ids, with_vector).await;
1478 }
1479 self.read_blocking(move |db| {
1480 let mut out = Vec::new();
1481 for id in &ids {
1482 if let Some(m) = db.get(&collection, id)? {
1483 out.push(PointOut {
1484 id: m.id,
1485 vector: if with_vector { m.vector } else { None },
1486 payload: m.payload.unwrap_or(Value::Null),
1487 });
1488 }
1489 }
1490 Ok(out)
1491 })
1492 .await
1493 }
1494
1495 #[allow(clippy::too_many_arguments)]
1496 #[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
1497 pub(crate) async fn search(
1498 &self,
1499 principal: &Principal,
1500 collection: String,
1501 vector: Vec<f32>,
1502 k: usize,
1503 filter: Option<Filter>,
1504 ef_search: usize,
1505 with_payload: bool,
1506 with_vector: bool,
1507 ) -> Result<Vec<MatchOut>, Error> {
1508 self.authorize(principal, Action::Read, "search", &collection)?;
1509 self.limits.check_search(k, ef_search)?;
1510 self.limits.check_vector_len(vector.len())?;
1511
1512 if let Some(c) = &self.cluster {
1514 return c
1515 .search(
1516 &collection,
1517 vector,
1518 k,
1519 filter,
1520 ef_search,
1521 with_payload,
1522 with_vector,
1523 )
1524 .await;
1525 }
1526
1527 if filter.is_none()
1533 && !with_payload
1534 && !with_vector
1535 && let Some(cell) = self.cached_cell(&collection)
1536 {
1537 return tokio::task::spawn_blocking(move || {
1538 let matches = cell.load().search(&vector, k, ef_search)?;
1539 Ok::<_, quiver_embed::Error>(
1540 matches
1541 .into_iter()
1542 .map(|m| MatchOut {
1543 id: m.id,
1544 score: m.score,
1545 payload: None,
1546 vector: None,
1547 })
1548 .collect(),
1549 )
1550 })
1551 .await
1552 .map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
1553 .map_err(Error::Engine);
1554 }
1555
1556 let params = SearchParams {
1557 k,
1558 filter,
1559 ef_search,
1560 with_payload,
1561 with_vector,
1562 };
1563 let coll = collection.clone();
1564 self.search_blocking(coll, move |db| {
1565 let matches = db.search_snapshot(&collection, &vector, ¶ms)?;
1566 Ok(matches
1567 .into_iter()
1568 .map(|m| MatchOut {
1569 id: m.id,
1570 score: m.score,
1571 payload: m.payload,
1572 vector: m.vector,
1573 })
1574 .collect())
1575 })
1576 .await
1577 }
1578
1579 #[allow(clippy::too_many_arguments)]
1580 pub(crate) async fn hybrid_search(
1581 &self,
1582 principal: &Principal,
1583 collection: String,
1584 dense: Option<Vec<f32>>,
1585 sparse: Option<(Vec<u32>, Vec<f32>)>,
1586 text: Option<String>,
1587 k: usize,
1588 filter: Option<Filter>,
1589 ef_search: usize,
1590 rrf_k0: f32,
1591 with_payload: bool,
1592 with_vector: bool,
1593 ) -> Result<Vec<MatchOut>, Error> {
1594 self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
1595 self.limits.check_search(k, ef_search)?;
1596 if let Some(v) = &dense {
1597 self.limits.check_vector_len(v.len())?;
1598 }
1599 if let Some((indices, values)) = &sparse {
1600 self.limits.check_sparse_terms(indices.len())?;
1601 if indices.len() != values.len() {
1602 return Err(Error::BadRequest(format!(
1603 "sparse query indices ({}) and values ({}) length mismatch",
1604 indices.len(),
1605 values.len()
1606 )));
1607 }
1608 }
1609 let params = SearchParams {
1610 k,
1611 filter,
1612 ef_search,
1613 with_payload,
1614 with_vector,
1615 };
1616 let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
1617 let coll = collection.clone();
1618 self.search_blocking(coll, move |db| {
1619 let matches = db.hybrid_search_snapshot(
1620 &collection,
1621 dense.as_deref(),
1622 sv.as_ref(),
1623 text.as_deref(),
1624 ¶ms,
1625 rrf_k0,
1626 )?;
1627 Ok(matches
1628 .into_iter()
1629 .map(|m| MatchOut {
1630 id: m.id,
1631 score: m.score,
1632 payload: m.payload,
1633 vector: m.vector,
1634 })
1635 .collect())
1636 })
1637 .await
1638 }
1639
1640 #[allow(clippy::too_many_arguments)]
1645 pub(crate) async fn search_text(
1646 &self,
1647 principal: &Principal,
1648 collection: String,
1649 text: String,
1650 k: usize,
1651 filter: Option<Filter>,
1652 ef_search: usize,
1653 rrf_k0: f32,
1654 with_payload: bool,
1655 with_vector: bool,
1656 rerank: bool,
1657 ) -> Result<Vec<MatchOut>, Error> {
1658 self.authorize(principal, Action::Read, "search_text", &collection)?;
1659 self.limits.check_search(k, ef_search)?;
1660 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1661 Error::BadRequest(format!(
1662 "collection {collection:?} has no embedding provider configured \
1663 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1664 ))
1665 })?;
1666 let query = text.clone();
1668 let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
1669 .await
1670 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1671 .map_err(|e| Error::Upstream(e.to_string()))?
1672 .into_iter()
1673 .next()
1674 .ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
1675 self.limits.check_vector_len(vector.len())?;
1676
1677 let reranker = if rerank {
1678 self.embed.reranker(&collection)
1679 } else {
1680 None
1681 };
1682 let need_payload = with_payload || reranker.is_some();
1686 let fetch_k = if reranker.is_some() {
1687 k.max(RERANK_CANDIDATES)
1688 } else {
1689 k
1690 };
1691
1692 let mut hits = self
1693 .hybrid_search(
1694 principal,
1695 collection,
1696 Some(vector),
1697 None,
1698 Some(text.clone()),
1699 fetch_k,
1700 filter,
1701 ef_search,
1702 rrf_k0,
1703 need_payload,
1704 with_vector,
1705 )
1706 .await?;
1707
1708 if let Some(rr) = reranker {
1709 let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
1710 let query = text;
1711 let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
1712 .await
1713 .map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
1714 .map_err(|e| Error::Upstream(e.to_string()))?;
1715 let mut scored: Vec<(f32, MatchOut)> = scores
1717 .into_iter()
1718 .zip(hits)
1719 .map(|(s, mut h)| {
1720 h.score = s;
1721 (s, h)
1722 })
1723 .collect();
1724 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1725 hits = scored.into_iter().map(|(_, h)| h).collect();
1726 }
1727
1728 hits.truncate(k);
1729 if !with_payload {
1731 for h in &mut hits {
1732 h.payload = None;
1733 }
1734 }
1735 Ok(hits)
1736 }
1737
1738 pub(crate) async fn upsert_text(
1743 &self,
1744 principal: &Principal,
1745 collection: String,
1746 points: Vec<TextPointIn>,
1747 ) -> Result<u64, Error> {
1748 self.ensure_writable("upsert_text")?;
1749 self.authorize(principal, Action::Write, "upsert_text", &collection)?;
1750 self.limits.check_batch(points.len())?;
1751 for p in &points {
1752 if !matches!(p.payload, Value::Object(_) | Value::Null) {
1753 return Err(Error::BadRequest(
1754 "upsert_text payload must be a JSON object or null".to_owned(),
1755 ));
1756 }
1757 }
1758 let embedder = self.embed.embedder(&collection).ok_or_else(|| {
1759 Error::BadRequest(format!(
1760 "collection {collection:?} has no embedding provider configured \
1761 (set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
1762 ))
1763 })?;
1764 let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
1765 let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
1766 .await
1767 .map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
1768 .map_err(|e| Error::Upstream(e.to_string()))?;
1769 if vectors.len() != points.len() {
1770 return Err(Error::Upstream(format!(
1771 "embedding provider returned {} vectors for {} inputs",
1772 vectors.len(),
1773 points.len()
1774 )));
1775 }
1776 let dense: Vec<PointIn> = points
1777 .into_iter()
1778 .zip(vectors)
1779 .map(|(p, vector)| {
1780 let mut payload = match p.payload {
1781 Value::Object(map) => map,
1782 _ => serde_json::Map::new(),
1783 };
1784 payload
1786 .entry(TEXT_KEY.to_owned())
1787 .or_insert_with(|| Value::String(p.text.clone()));
1788 PointIn {
1789 id: p.id,
1790 vector,
1791 payload: Value::Object(payload),
1792 }
1793 })
1794 .collect();
1795 self.upsert(principal, collection, dense).await
1796 }
1797
1798 #[allow(clippy::too_many_arguments)]
1799 pub(crate) async fn fetch(
1800 &self,
1801 principal: &Principal,
1802 collection: String,
1803 filter: Option<Filter>,
1804 offset: usize,
1805 limit: usize,
1806 with_payload: bool,
1807 with_vector: bool,
1808 ) -> Result<Vec<MatchOut>, Error> {
1809 self.authorize(principal, Action::Read, "fetch", &collection)?;
1810 self.limits.check_fetch(limit)?;
1811 self.read_blocking(move |db| {
1812 let matches = db.fetch(
1813 &collection,
1814 filter.as_ref(),
1815 offset,
1816 limit,
1817 with_payload,
1818 with_vector,
1819 )?;
1820 Ok(matches
1821 .into_iter()
1822 .map(|m| MatchOut {
1823 id: m.id,
1824 score: m.score,
1825 payload: m.payload,
1826 vector: m.vector,
1827 })
1828 .collect())
1829 })
1830 .await
1831 }
1832
1833 pub(crate) async fn upsert_documents(
1834 &self,
1835 principal: &Principal,
1836 collection: String,
1837 documents: Vec<DocumentIn>,
1838 ) -> Result<u64, Error> {
1839 self.ensure_writable("upsert_documents")?;
1840 self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
1841 self.limits.check_batch(documents.len())?;
1842 for doc in &documents {
1843 self.limits.check_payload(&doc.payload)?;
1844 for token in &doc.vectors {
1845 self.limits.check_vector_len(token.len())?;
1846 }
1847 }
1848 let resource = collection.clone();
1849 let result = self
1850 .write_blocking(move |db| {
1851 let mut count = 0u64;
1852 for doc in &documents {
1853 db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
1854 count += 1;
1855 }
1856 Ok(count)
1857 })
1858 .await;
1859 self.audit.record(
1860 principal.actor(),
1861 "upsert_documents",
1862 &resource,
1863 Outcome::of(&result),
1864 );
1865 result
1866 }
1867
1868 pub(crate) async fn delete_documents(
1869 &self,
1870 principal: &Principal,
1871 collection: String,
1872 ids: Vec<String>,
1873 ) -> Result<u64, Error> {
1874 self.ensure_writable("delete_documents")?;
1875 self.authorize(principal, Action::Write, "delete_documents", &collection)?;
1876 let resource = collection.clone();
1877 let result = self
1878 .write_blocking(move |db| {
1879 let mut count = 0u64;
1880 for id in &ids {
1881 if db.delete_document(&collection, id)? {
1882 count += 1;
1883 }
1884 }
1885 Ok(count)
1886 })
1887 .await;
1888 self.audit.record(
1889 principal.actor(),
1890 "delete_documents",
1891 &resource,
1892 Outcome::of(&result),
1893 );
1894 result
1895 }
1896
1897 #[allow(clippy::too_many_arguments)]
1898 pub(crate) async fn search_multi_vector(
1899 &self,
1900 principal: &Principal,
1901 collection: String,
1902 query: Vec<Vec<f32>>,
1903 k: usize,
1904 filter: Option<Filter>,
1905 ef_search: usize,
1906 with_payload: bool,
1907 with_vector: bool,
1908 ) -> Result<Vec<DocumentMatchOut>, Error> {
1909 self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
1910 self.limits.check_search(k, ef_search)?;
1911 for token in &query {
1912 self.limits.check_vector_len(token.len())?;
1913 }
1914 let params = SearchParams {
1915 k,
1916 filter,
1917 ef_search,
1918 with_payload,
1919 with_vector,
1920 };
1921 let coll = collection.clone();
1922 self.search_blocking(coll, move |db| {
1923 let matches = db.search_multi_vector_snapshot(&collection, &query, ¶ms)?;
1924 Ok(matches
1925 .into_iter()
1926 .map(|m| DocumentMatchOut {
1927 id: m.id,
1928 score: m.score,
1929 payload: m.payload,
1930 vectors: m.vectors,
1931 })
1932 .collect())
1933 })
1934 .await
1935 }
1936}
1937
/// Capacity (1 Ki entries) of the broadcast channel that fans committed WAL
/// entries out to replication subscribers. A tokio broadcast receiver that
/// falls more than this many entries behind observes `RecvError::Lagged`.
const REPLICATION_BUFFER: usize = 1 << 10;
1941
1942pub async fn run(config: Config) -> Result<(), Error> {
1944 config.validate()?;
1945 let rest_listener = TcpListener::bind(config.rest_addr)
1946 .await
1947 .map_err(Error::Io)?;
1948 let grpc_listener = TcpListener::bind(config.grpc_addr)
1949 .await
1950 .map_err(Error::Io)?;
1951 tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
1952 tokio::select! {
1953 result = serve(config, rest_listener, grpc_listener) => result,
1954 () = shutdown_signal() => {
1955 tracing::info!("shutdown signal received");
1956 Ok(())
1957 }
1958 }
1959}
1960
1961pub async fn serve(
1964 config: Config,
1965 rest_listener: TcpListener,
1966 grpc_listener: TcpListener,
1967) -> Result<(), Error> {
1968 if config.coordinator {
1972 drop(grpc_listener);
1973 return coordinator::serve_coordinator(config, rest_listener).await;
1974 }
1975 let mut db = open_database(&config)?;
1976 let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
1977 let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
1981 {
1982 let tx = replication_tx.clone();
1983 db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
1984 let _ = tx.send(entry.clone());
1985 }));
1986 }
1987 let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
1991 .map_err(|e| Error::Config(e.to_string()))?;
1992
1993 if config.mvcc_reads {
1996 db.set_mvcc_reads(true);
1997 }
1998 let mvcc = db.mvcc_reads();
1999
2000 let cluster = if config.cluster_shards.is_empty() {
2002 None
2003 } else {
2004 let c = cluster::Cluster::new(
2005 config.cluster_shards.clone(),
2006 config.cluster_replicas.clone(),
2007 config.cluster_shard_key.clone(),
2008 )?;
2009 tracing::info!(shards = c.shard_count(), "quiver cluster router enabled");
2010 let c = Arc::new(c);
2011 if let Some(coord) = config.coordinator_url.clone() {
2015 let router = c.clone();
2016 tokio::spawn(async move {
2017 loop {
2018 if let Err(e) = router.refresh_from(&coord).await {
2019 tracing::debug!(error = %e, "shard-map refresh failed; will retry");
2020 }
2021 tokio::time::sleep(cluster::MAP_REFRESH_INTERVAL).await;
2022 }
2023 });
2024 }
2025 Some(c)
2026 };
2027
2028 let db = Arc::new(RwLock::new(db));
2032
2033 #[cfg(feature = "raft")]
2037 let raft = if let Some(node_id) = config.raft_node_id {
2038 let members = parse_raft_members(&config.raft_members)?;
2039 let applier = raft::EngineApplier::new(Arc::clone(&db));
2040 let log_dir = config.data_dir.join("raft");
2042 let shard = raft::start_member(node_id, members, applier, &log_dir)
2043 .await
2044 .map_err(|e| Error::Config(format!("raft startup: {e}")))?;
2045 tracing::info!(node_id, "per-shard raft write HA enabled");
2046 Some(Arc::new(shard))
2047 } else {
2048 None
2049 };
2050
2051 let state = AppState {
2052 db,
2053 keys: Arc::new(config.api_keys.clone()),
2054 audit,
2055 replication_tx,
2056 read_only: config.leader_url.is_some(),
2057 limits: config.limits,
2058 embed: Arc::new(embed),
2059 rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
2060 metrics: Arc::new(metrics::Metrics::default()),
2061 rebuilding: Arc::new(Mutex::new(HashSet::new())),
2062 snapshot_cells: Arc::new(RwLock::new(HashMap::new())),
2063 mvcc,
2064 cluster,
2065 #[cfg(feature = "raft")]
2066 raft,
2067 };
2068
2069 if let Some(leader_url) = config.leader_url.clone() {
2071 replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
2072 }
2073
2074 let app = rest::router(state.clone());
2075 #[cfg(feature = "raft")]
2078 let raft_service = state
2079 .raft
2080 .as_ref()
2081 .map(|rs| raft::grpc::RaftRpc::service(rs.raft.clone()));
2082 let grpc = grpc::service(state);
2083
2084 let tls = load_tls(&config)?;
2085
2086 let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
2088 Some(material) => {
2089 let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
2090 let std_listener = rest_listener.into_std().map_err(Error::Io)?;
2091 let server =
2092 axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
2093 Box::pin(async move {
2094 server
2095 .serve(app.into_make_service())
2096 .await
2097 .map_err(Error::Io)
2098 })
2099 }
2100 None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
2101 };
2102
2103 let mut grpc_builder = tonic::transport::Server::builder();
2105 if let Some(material) = &tls {
2106 let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
2107 let mut tls_config = ServerTlsConfig::new().identity(identity);
2108 if let Some(ca_pem) = &material.client_ca_pem {
2110 tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
2111 }
2112 grpc_builder = grpc_builder
2113 .tls_config(tls_config)
2114 .map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
2115 }
2116 let grpc_fut = async move {
2117 let routes = grpc_builder.add_service(grpc);
2118 #[cfg(feature = "raft")]
2120 let routes = routes.add_optional_service(raft_service);
2121 routes
2122 .serve_with_incoming(TcpListenerStream::new(grpc_listener))
2123 .await
2124 .map_err(|e| Error::Internal(format!("grpc server: {e}")))
2125 };
2126
2127 tokio::try_join!(rest_fut, grpc_fut)?;
2128 Ok(())
2129}
2130
2131async fn shutdown_signal() {
2132 let _ = tokio::signal::ctrl_c().await;
2133}
2134
/// Parses `raft_members` entries of the form `<id>=<url>` into an id → URL map.
///
/// Whitespace around the id and the URL is trimmed.
///
/// # Errors
/// `Error::Config` if any of the following holds:
/// - an entry has no `=`;
/// - an id is not a `u64`;
/// - a URL is empty;
/// - an id appears more than once (previously a later entry silently replaced
///   an earlier one);
/// - the list is empty.
#[cfg(feature = "raft")]
fn parse_raft_members(
    entries: &[String],
) -> Result<std::collections::BTreeMap<u64, String>, Error> {
    let mut members = std::collections::BTreeMap::new();
    for entry in entries {
        let (raw_id, raw_url) = entry
            .split_once('=')
            .ok_or_else(|| Error::Config(format!("raft member '{entry}' must be '<id>=<url>'")))?;
        let id: u64 = raw_id
            .trim()
            .parse()
            .map_err(|_| Error::Config(format!("raft member id '{raw_id}' is not a number")))?;
        let url = raw_url.trim();
        if url.is_empty() {
            return Err(Error::Config(format!(
                "raft member '{entry}' has an empty url"
            )));
        }
        if members.insert(id, url.to_owned()).is_some() {
            return Err(Error::Config(format!(
                "raft member id {id} is listed more than once"
            )));
        }
    }
    if members.is_empty() {
        return Err(Error::Config(
            "raft_node_id is set but raft_members is empty".to_owned(),
        ));
    }
    Ok(members)
}
2161
/// Converts an openraft client-write failure into a server [`Error`].
///
/// A `ForwardToLeader` API error becomes `Error::NotLeader`, carrying the
/// leader's address when openraft knows it. Every other failure becomes
/// `Error::Internal`.
#[cfg(feature = "raft")]
fn map_client_write_err(
    err: openraft::error::RaftError<
        u64,
        openraft::error::ClientWriteError<u64, openraft::BasicNode>,
    >,
) -> Error {
    use openraft::error::{ClientWriteError, RaftError};
    match err {
        RaftError::APIError(ClientWriteError::ForwardToLeader(forward)) => {
            let leader = forward.leader_node.map(|node| node.addr);
            Error::NotLeader { leader }
        }
        unexpected => Error::Internal(format!("raft write failed: {unexpected}")),
    }
}
2180
2181fn open_database(config: &Config) -> Result<Database, Error> {
2187 let master_key = config.master_key_hex()?;
2188 let keyring =
2189 quiver_crypto::open_keyring(&config.data_dir, master_key.as_deref(), config.insecure)
2190 .map_err(|e| Error::Config(e.to_string()))?;
2191 let db = match keyring {
2192 Some(keyring) => Database::open_with_keyring(&config.data_dir, keyring)?,
2193 None => Database::open(&config.data_dir)?,
2194 };
2195 Ok(db)
2196}
2197
/// TLS material loaded by `load_tls` from the paths in `Config`.
///
/// The raw PEM bytes feed tonic's gRPC `ServerTlsConfig`. `rest_config` is a
/// pre-built rustls config for the REST listener, built from the same bytes.
struct TlsMaterial {
    /// PEM server certificate chain read from `tls_cert`.
    cert_pem: Vec<u8>,
    /// PEM private key read from `tls_key`.
    key_pem: Vec<u8>,
    /// PEM CA bundle read from `tls_client_ca`. When set, both servers require
    /// client certificates signed by it; `None` means no client auth.
    client_ca_pem: Option<Vec<u8>>,
    /// rustls server config used by the REST (axum) listener.
    rest_config: Arc<rustls::ServerConfig>,
}
2207
2208fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
2213 match (&config.tls_cert, &config.tls_key) {
2214 (Some(cert_path), Some(key_path)) => {
2215 let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
2216 let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
2217 let client_ca_pem = config
2218 .tls_client_ca
2219 .as_ref()
2220 .map(std::fs::read)
2221 .transpose()
2222 .map_err(Error::Io)?;
2223 let rest_config = Arc::new(rustls_server_config(
2224 &cert_pem,
2225 &key_pem,
2226 client_ca_pem.as_deref(),
2227 )?);
2228 Ok(Some(TlsMaterial {
2229 cert_pem,
2230 key_pem,
2231 client_ca_pem,
2232 rest_config,
2233 }))
2234 }
2235 (None, None) => Ok(None),
2236 _ => Err(Error::Config(
2237 "tls_cert and tls_key must be set together".to_owned(),
2238 )),
2239 }
2240}
2241
2242fn rustls_server_config(
2246 cert_pem: &[u8],
2247 key_pem: &[u8],
2248 client_ca_pem: Option<&[u8]>,
2249) -> Result<rustls::ServerConfig, Error> {
2250 use rustls_pki_types::pem::PemObject;
2251 use rustls_pki_types::{CertificateDer, PrivateKeyDer};
2252
2253 let certs = CertificateDer::pem_slice_iter(cert_pem)
2254 .collect::<std::result::Result<Vec<_>, _>>()
2255 .map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
2256 if certs.is_empty() {
2257 return Err(Error::Config(
2258 "tls_cert contains no certificates".to_owned(),
2259 ));
2260 }
2261 let key = PrivateKeyDer::from_pem_slice(key_pem)
2262 .map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
2263 let provider = Arc::new(rustls::crypto::ring::default_provider());
2264 let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
2265 .with_safe_default_protocol_versions()
2266 .map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
2267 let builder = match client_ca_pem {
2268 Some(ca_pem) => {
2269 let mut roots = rustls::RootCertStore::empty();
2270 for cert in CertificateDer::pem_slice_iter(ca_pem) {
2271 let cert =
2272 cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
2273 roots
2274 .add(cert)
2275 .map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
2276 }
2277 let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
2278 Arc::new(roots),
2279 provider,
2280 )
2281 .build()
2282 .map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
2283 builder.with_client_cert_verifier(verifier)
2284 }
2285 None => builder.with_no_client_auth(),
2286 };
2287 builder
2288 .with_single_cert(certs, key)
2289 .map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
2290}
2291
2292pub fn init_tracing() {
2295 init_observability(&Config::default());
2296}
2297
/// Installs the global tracing subscriber.
///
/// Always installs an `EnvFilter` and a formatting layer. The filter comes
/// from the default env var (`RUST_LOG`) and falls back to `info` if that is
/// unset or invalid.
///
/// With the `otlp` feature compiled in and `config.otlp` enabled, it also
/// tries to add an OpenTelemetry layer. If building the exporter fails, the
/// error goes to stderr (tracing is not yet installed at that point) and the
/// subscriber is installed without the OTLP layer.
///
/// Installation uses `try_init` and ignores its result, so calling this when
/// a global subscriber is already set is a silent no-op.
// Without the `otlp` feature, `config` is unused.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let registry = tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer());

    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                // Keep the provider so `shutdown_observability` can shut it down later.
                otlp::store_provider(provider);
                let _ = registry
                    .with(tracing_opentelemetry::layer().with_tracer(tracer))
                    .try_init();
                return;
            }
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
        }
    }

    let _ = registry.try_init();
}
2330
/// Shuts down observability exporters installed by `init_observability`.
///
/// With the `otlp` feature this delegates to `otlp::shutdown` (presumably
/// flushing pending spans — confirm in `otlp`). Without the feature it is a
/// no-op.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    otlp::shutdown();
}
2338
/// Unit tests for `Config::validate` and master-key handling.
#[cfg(test)]
mod tests {
    use super::*;

    /// 64 hex digits (32 bytes); the tests below show `validate` accepts it as
    /// an encryption key.
    const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";

    #[test]
    fn config_rejects_missing_keys_unless_insecure() {
        let mut config = Config::default();
        // The default config is rejected...
        assert!(config.validate().is_err());
        // ...unless the insecure opt-out is set...
        config.insecure = true;
        assert!(config.validate().is_ok());
        // ...or both an API key and an encryption key are provided.
        config.insecure = false;
        config.api_keys = vec!["secret".into()];
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_requires_encryption_key_unless_insecure() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            ..Config::default()
        };
        // API keys alone are not enough.
        assert!(config.validate().is_err());
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_ok());
        // A key that is not valid hex is rejected.
        config.encryption_key = Some("not-a-valid-hex-key".to_owned());
        assert!(config.validate().is_err());
        // The insecure opt-out allows running without an encryption key.
        config.insecure = true;
        config.encryption_key = None;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn master_key_file_is_an_alternative_to_the_env_key() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("master.key");
        // The trailing newline must be tolerated when the key is read back.
        std::fs::write(&path, format!("{TEST_KEY}\n")).unwrap();

        let mut config = Config {
            api_keys: vec!["secret".into()],
            master_key_file: Some(path.clone()),
            ..Config::default()
        };
        assert!(config.validate().is_ok());
        assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));

        // Setting both key sources at once is rejected.
        config.encryption_key = Some(TEST_KEY.to_owned());
        assert!(config.validate().is_err());

        // A key file with invalid contents is rejected.
        config.encryption_key = None;
        std::fs::write(&path, "not-a-valid-key").unwrap();
        assert!(config.validate().is_err());
    }

    #[test]
    fn config_rejects_public_bind_without_optout() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            ..Config::default()
        };
        // Binding REST to 0.0.0.0 without TLS is rejected...
        config.rest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333);
        assert!(config.validate().is_err());
        // ...unless the insecure opt-out is set.
        config.insecure = true;
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_public_bind_allowed_with_tls() {
        // The TLS files do not exist; `validate` only checks the settings,
        // not the file contents.
        let config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            tls_key: Some(PathBuf::from("key.pem")),
            rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333),
            ..Config::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn config_tls_cert_and_key_must_pair() {
        let mut config = Config {
            api_keys: vec!["secret".into()],
            encryption_key: Some(TEST_KEY.to_owned()),
            tls_cert: Some(PathBuf::from("cert.pem")),
            ..Config::default()
        };
        // A certificate without a key is rejected.
        assert!(config.validate().is_err());
        config.tls_key = Some(PathBuf::from("key.pem"));
        assert!(config.validate().is_ok());
    }
}