mod audit;
mod auth;
mod error;
mod grpc;
mod metrics;
mod otlp;
mod rate_limit;
mod replication;
mod rest;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use axum_server::tls_rustls::RustlsConfig;
use figment::Figment;
use figment::providers::{Env, Format, Serialized, Toml};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::{Certificate, Identity, ServerTlsConfig};
use quiver_crypto::AeadCodec;
use quiver_embed::{
Database, Descriptor, DistanceMetric, Dtype, FilterableField, IndexSpec, SearchParams,
SnapshotInfo, SparseVector, TEXT_KEY, VectorEncryption, WalEntry, WalOp,
};
use quiver_query::Filter;
pub use auth::{Action, ApiKey, CollectionScope};
pub use error::Error;
pub use otlp::OtlpConfig;
pub use quiver_providers::{
EmbedRegistry, EmbeddingConfig, EmbeddingProvider, ProviderError, ProviderKind, RerankConfig,
RerankProvider,
};
pub use rate_limit::{RateDecision, RateLimitConfig, RateLimitSnapshot, RateLimiter};
use audit::{AuditLog, Outcome};
use auth::Principal;
/// Server-side caps on request sizes and search parameters.
///
/// The container is marked `#[serde(default)]`, so fields missing from the
/// deserialized input take their value from the `Default` implementation.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Limits {
/// Largest `k` accepted by `check_search`.
pub max_k: usize,
/// Largest `ef_search` accepted by `check_search`.
pub max_ef_search: usize,
/// Largest `limit` accepted by `check_fetch`.
pub max_fetch_limit: usize,
/// Largest collection dimension (`check_dim`) and vector length (`check_vector_len`).
pub max_vector_dim: usize,
/// Largest JSON-serialized payload size in bytes (`check_payload`).
pub max_payload_bytes: usize,
/// Largest item count accepted by `check_batch`.
pub max_batch_size: usize,
/// Request-body cap in bytes.
/// NOTE(review): not enforced anywhere in this part of the file — presumably
/// applied by the transport layer; confirm.
pub max_request_body_bytes: usize,
/// Largest number of sparse-query terms (`check_sparse_terms`).
pub max_sparse_terms: usize,
/// Largest item count accepted by `check_bulk_batch`.
pub max_bulk_batch_size: usize,
}
impl Default for Limits {
fn default() -> Self {
Self {
max_k: 10_000,
max_ef_search: 4_096,
max_fetch_limit: 10_000,
max_vector_dim: 8_192,
max_payload_bytes: 65_536,
max_batch_size: 1_000,
max_request_body_bytes: 32 * 1024 * 1024,
max_sparse_terms: 4_096,
max_bulk_batch_size: 50_000,
}
}
}
impl Limits {
fn apply_env_overrides(&mut self) -> Result<(), Error> {
let slots: [(&str, &mut usize); 9] = [
("QUIVER_MAX_K", &mut self.max_k),
("QUIVER_MAX_EF_SEARCH", &mut self.max_ef_search),
("QUIVER_MAX_FETCH_LIMIT", &mut self.max_fetch_limit),
("QUIVER_MAX_VECTOR_DIM", &mut self.max_vector_dim),
("QUIVER_MAX_PAYLOAD_BYTES", &mut self.max_payload_bytes),
("QUIVER_MAX_BATCH_SIZE", &mut self.max_batch_size),
(
"QUIVER_MAX_REQUEST_BODY_BYTES",
&mut self.max_request_body_bytes,
),
("QUIVER_MAX_SPARSE_TERMS", &mut self.max_sparse_terms),
("QUIVER_MAX_BULK_BATCH_SIZE", &mut self.max_bulk_batch_size),
];
for (key, slot) in slots {
if let Ok(raw) = std::env::var(key) {
*slot = raw.parse().map_err(|_| {
Error::Config(format!("{key} must be a positive integer, got {raw:?}"))
})?;
}
}
Ok(())
}
fn validate(&self) -> Result<(), Error> {
let named = [
("max_k", self.max_k),
("max_ef_search", self.max_ef_search),
("max_fetch_limit", self.max_fetch_limit),
("max_vector_dim", self.max_vector_dim),
("max_payload_bytes", self.max_payload_bytes),
("max_batch_size", self.max_batch_size),
("max_request_body_bytes", self.max_request_body_bytes),
("max_sparse_terms", self.max_sparse_terms),
("max_bulk_batch_size", self.max_bulk_batch_size),
];
if let Some((name, _)) = named.into_iter().find(|&(_, v)| v == 0) {
return Err(Error::Config(format!(
"limits.{name} must be greater than zero"
)));
}
Ok(())
}
fn check_search(&self, k: usize, ef_search: usize) -> Result<(), Error> {
if k > self.max_k {
return Err(Error::BadRequest(format!(
"k ({k}) exceeds the maximum of {} (raise QUIVER_MAX_K)",
self.max_k
)));
}
if ef_search > self.max_ef_search {
return Err(Error::BadRequest(format!(
"ef_search ({ef_search}) exceeds the maximum of {} (raise QUIVER_MAX_EF_SEARCH)",
self.max_ef_search
)));
}
Ok(())
}
fn check_sparse_terms(&self, n: usize) -> Result<(), Error> {
if n > self.max_sparse_terms {
return Err(Error::BadRequest(format!(
"sparse query has {n} terms, exceeding the maximum of {} (raise QUIVER_MAX_SPARSE_TERMS)",
self.max_sparse_terms
)));
}
Ok(())
}
fn check_fetch(&self, limit: usize) -> Result<(), Error> {
if limit > self.max_fetch_limit {
return Err(Error::BadRequest(format!(
"limit ({limit}) exceeds the maximum of {} (raise QUIVER_MAX_FETCH_LIMIT)",
self.max_fetch_limit
)));
}
Ok(())
}
fn check_dim(&self, dim: usize) -> Result<(), Error> {
if dim > self.max_vector_dim {
return Err(Error::BadRequest(format!(
"dimension ({dim}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
self.max_vector_dim
)));
}
Ok(())
}
fn check_vector_len(&self, len: usize) -> Result<(), Error> {
if len > self.max_vector_dim {
return Err(Error::BadRequest(format!(
"vector length ({len}) exceeds the maximum of {} (raise QUIVER_MAX_VECTOR_DIM)",
self.max_vector_dim
)));
}
Ok(())
}
fn check_batch(&self, n: usize) -> Result<(), Error> {
if n > self.max_batch_size {
return Err(Error::BadRequest(format!(
"batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BATCH_SIZE)",
self.max_batch_size
)));
}
Ok(())
}
fn check_bulk_batch(&self, n: usize) -> Result<(), Error> {
if n > self.max_bulk_batch_size {
return Err(Error::BadRequest(format!(
"bulk batch of {n} exceeds the maximum of {} (raise QUIVER_MAX_BULK_BATCH_SIZE)",
self.max_bulk_batch_size
)));
}
Ok(())
}
fn check_payload(&self, payload: &Value) -> Result<(), Error> {
let size = serde_json::to_vec(payload)
.map(|v| v.len())
.map_err(|e| Error::Internal(format!("payload serialization: {e}")))?;
if size > self.max_payload_bytes {
return Err(Error::BadRequest(format!(
"payload of {size} bytes exceeds the maximum of {} (raise QUIVER_MAX_PAYLOAD_BYTES)",
self.max_payload_bytes
)));
}
Ok(())
}
}
/// Server configuration.
///
/// The container is marked `#[serde(default)]`, so fields missing from the
/// deserialized input take their value from the `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
/// Directory handed to the keyring and database openers.
pub data_dir: PathBuf,
/// Address the REST listener binds to.
pub rest_addr: SocketAddr,
/// Address the gRPC listener binds to.
pub grpc_addr: SocketAddr,
/// API keys, deserialized through `auth::de_api_keys`.
#[serde(default, deserialize_with = "auth::de_api_keys")]
pub api_keys: Vec<ApiKey>,
/// Inline master key (hex); mutually exclusive with `master_key_file`.
pub encryption_key: Option<String>,
/// Path of a file holding the master key (hex); mutually exclusive with `encryption_key`.
pub master_key_file: Option<PathBuf>,
/// TLS certificate file; must be set together with `tls_key`.
pub tls_cert: Option<PathBuf>,
/// TLS private-key file; must be set together with `tls_cert`.
pub tls_key: Option<PathBuf>,
/// Client CA file for mutual TLS; requires `tls_cert` and `tls_key`.
pub tls_client_ca: Option<PathBuf>,
/// Optional path passed to `AuditLog::open`.
pub audit_log: Option<PathBuf>,
/// When set, the node runs as a read-only replication follower of this leader.
pub leader_url: Option<String>,
/// API key handed to the follower task when `leader_url` is set.
pub leader_api_key: Option<String>,
/// Development switch: relaxes the API-key, encryption-key and TLS requirements in `validate`.
pub insecure: bool,
/// Request and search limits.
pub limits: Limits,
/// Embedding provider configuration, keyed by name (passed to `EmbedRegistry::from_config`).
#[serde(default)]
pub embedding: HashMap<String, EmbeddingConfig>,
/// Rerank provider configuration, keyed by name (passed to `EmbedRegistry::from_config`).
#[serde(default)]
pub rerank: HashMap<String, RerankConfig>,
/// Rate limiter configuration.
#[serde(default)]
pub rate_limit: RateLimitConfig,
/// OTLP configuration.
#[serde(default)]
pub otlp: OtlpConfig,
}
impl Default for Config {
fn default() -> Self {
Self {
data_dir: PathBuf::from("./quiver-data"),
rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6333),
grpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6334),
api_keys: Vec::new(),
encryption_key: None,
master_key_file: None,
tls_cert: None,
tls_key: None,
tls_client_ca: None,
audit_log: None,
leader_url: None,
leader_api_key: None,
insecure: false,
limits: Limits::default(),
embedding: HashMap::new(),
rerank: HashMap::new(),
rate_limit: RateLimitConfig::default(),
otlp: OtlpConfig::default(),
}
}
}
impl Config {
pub fn load() -> Result<Self, Error> {
let mut config: Config = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::file("quiver.toml"))
.merge(Env::prefixed("QUIVER_"))
.extract()
.map_err(|e| Error::Config(e.to_string()))?;
config.limits.apply_env_overrides()?;
config
.rate_limit
.apply_env_overrides()
.map_err(Error::Config)?;
config.otlp.apply_env_overrides().map_err(Error::Config)?;
Ok(config)
}
pub fn validate(&self) -> Result<(), Error> {
if self.api_keys.is_empty() && !self.insecure {
return Err(Error::Config(
"no api_keys configured: set QUIVER_API_KEYS (comma-separated) or \
set insecure=true for local development"
.to_owned(),
));
}
let master_key = self.master_key_hex()?;
if master_key.is_none() && !self.insecure {
return Err(Error::Config(
"no encryption key configured: encryption-at-rest is on by default — \
set QUIVER_ENCRYPTION_KEY to a 64-hex-character (256-bit) key (or \
QUIVER_MASTER_KEY_FILE to a file holding it), or set insecure=true to \
store data unencrypted (development only)"
.to_owned(),
));
}
if let Some(key) = &master_key {
AeadCodec::from_hex(key)
.map_err(|e| Error::Config(format!("invalid master key: {e}")))?;
}
if self.tls_cert.is_some() != self.tls_key.is_some() {
return Err(Error::Config(
"tls_cert and tls_key must be set together".to_owned(),
));
}
if self.tls_client_ca.is_some() && !(self.tls_cert.is_some() && self.tls_key.is_some()) {
return Err(Error::Config(
"tls_client_ca (mutual TLS) requires tls_cert and tls_key".to_owned(),
));
}
let tls_enabled = self.tls_cert.is_some() && self.tls_key.is_some();
let non_loopback = !self.rest_addr.ip().is_loopback() || !self.grpc_addr.ip().is_loopback();
if non_loopback && !tls_enabled && !self.insecure {
return Err(Error::Config(
"non-loopback bind requires TLS: set tls_cert and tls_key (PEM files), \
or insecure=true for local development"
.to_owned(),
));
}
self.limits.validate()?;
Ok(())
}
pub(crate) fn master_key_hex(&self) -> Result<Option<String>, Error> {
let env_key = self
.encryption_key
.as_deref()
.map(str::trim)
.filter(|k| !k.is_empty());
match (&self.master_key_file, env_key) {
(Some(_), Some(_)) => Err(Error::Config(
"set either encryption_key (QUIVER_ENCRYPTION_KEY) or master_key_file \
(QUIVER_MASTER_KEY_FILE), not both"
.to_owned(),
)),
(Some(path), None) => {
warn_if_world_readable(path);
let hex = std::fs::read_to_string(path).map_err(|e| {
Error::Config(format!("reading master_key_file {}: {e}", path.display()))
})?;
Ok(Some(hex.trim().to_owned()))
}
(None, Some(key)) => Ok(Some(key.to_owned())),
(None, None) => Ok(None),
}
}
}
#[cfg(unix)]
fn warn_if_world_readable(path: &std::path::Path) {
use std::os::unix::fs::PermissionsExt;
if let Ok(meta) = std::fs::metadata(path)
&& meta.permissions().mode() & 0o077 != 0
{
tracing::warn!(
path = %path.display(),
mode = format!("{:o}", meta.permissions().mode() & 0o777),
"master key file is group/world-accessible; restrict it to 0600"
);
}
}
/// Non-Unix stand-in for the permission check: intentionally does nothing.
#[cfg(not(unix))]
fn warn_if_world_readable(_path: &std::path::Path) {}
/// Shared server state handed to the REST and gRPC front ends.
///
/// Cloning is cheap: the heavyweight members sit behind `Arc`.
#[derive(Clone)]
pub(crate) struct AppState {
/// The database, guarded by a read/write lock and accessed from blocking tasks.
db: Arc<RwLock<Database>>,
/// API keys used by `authenticate`.
keys: Arc<Vec<ApiKey>>,
/// Audit log that receives operation outcomes and authorization denials.
audit: Arc<AuditLog>,
/// Broadcast sender that replication subscribers attach to.
replication_tx: broadcast::Sender<WalEntry>,
/// When true, mutating operations are refused (`ensure_writable`).
read_only: bool,
/// Request and search limits.
limits: Limits,
/// Registry consulted for per-collection embedders and rerankers.
embed: Arc<EmbedRegistry>,
/// Per-actor rate limiter.
rate_limiter: Arc<RateLimiter>,
/// Metrics handle; not used in this part of the file.
metrics: Arc<metrics::Metrics>,
/// Names of collections that currently have a background rebuild in flight.
rebuilding: Arc<Mutex<HashSet<String>>>,
}
/// Description of a collection returned by the collection endpoints.
pub(crate) struct CollectionInfo {
/// Collection name.
pub name: String,
/// Vector dimension from the collection descriptor.
pub dim: u32,
/// Distance metric from the collection descriptor.
pub metric: DistanceMetric,
/// Document count for multivector collections, point count otherwise.
pub count: u64,
/// Index specification from the collection descriptor.
pub index: IndexSpec,
/// Filterable fields from the collection descriptor.
pub filterable: Vec<FilterableField>,
/// Whether the collection is a multivector collection.
pub multivector: bool,
/// Vector-encryption setting from the collection descriptor.
pub vector_encryption: VectorEncryption,
}
/// A point supplied for upsert: id, dense vector and JSON payload.
pub(crate) struct PointIn {
/// Point identifier.
pub id: String,
/// Dense vector; its length is checked against `max_vector_dim` on upsert.
pub vector: Vec<f32>,
/// JSON payload; its serialized size is checked against `max_payload_bytes` on upsert.
pub payload: Value,
}
/// A text point supplied to `upsert_text`; the vector is produced by the
/// collection's embedding provider.
pub(crate) struct TextPointIn {
/// Point identifier.
pub id: String,
/// Text to embed; also stored in the payload under `TEXT_KEY` unless that key is already present.
pub text: String,
/// JSON payload; `upsert_text` accepts only an object or null.
pub payload: Value,
}
/// Minimum number of candidates `search_text` retrieves when a reranker is in use.
const RERANK_CANDIDATES: usize = 50;
/// Produces the text handed to the reranker for one search hit.
///
/// * no payload: the empty string;
/// * a JSON object whose `TEXT_KEY` entry is a string: that string;
/// * anything else: the payload's compact JSON rendering.
///
/// The payload is serialized through the borrowed value, so the object map is
/// never cloned just to be stringified.
fn doc_text(payload: Option<&Value>) -> String {
    let Some(value) = payload else {
        return String::new();
    };
    // `Value::get` with a string key yields `None` for every non-object
    // value, so objects without usable text and non-objects share the
    // serialization fallback.
    value
        .get(TEXT_KEY)
        .and_then(Value::as_str)
        .map_or_else(|| value.to_string(), str::to_owned)
}
/// A point returned by `get_points`.
pub(crate) struct PointOut {
/// Point identifier.
pub id: String,
/// The stored vector; `None` when the caller did not request vectors.
pub vector: Option<Vec<f32>>,
/// The stored payload, or JSON null when the point has none.
pub payload: Value,
}
/// A single hit returned by the search and fetch operations.
pub(crate) struct MatchOut {
/// Identifier of the matched point.
pub id: String,
/// Score reported by the engine, or by the reranker when `search_text` reranks.
pub score: f32,
/// Payload of the matched point, when present in the engine result.
pub payload: Option<Value>,
/// Vector of the matched point, when present in the engine result.
pub vector: Option<Vec<f32>>,
}
/// A multi-vector document supplied to `upsert_documents`.
pub(crate) struct DocumentIn {
/// Document identifier.
pub id: String,
/// The document's vectors; each one's length is checked against `max_vector_dim`.
pub vectors: Vec<Vec<f32>>,
/// JSON payload; its serialized size is checked against `max_payload_bytes`.
pub payload: Value,
}
/// A single hit returned by `search_multi_vector`.
pub(crate) struct DocumentMatchOut {
/// Identifier of the matched document.
pub id: String,
/// Score reported by the engine.
pub score: f32,
/// Payload of the matched document, when present in the engine result.
pub payload: Option<Value>,
/// Vectors of the matched document, when present in the engine result.
pub vectors: Option<Vec<Vec<f32>>>,
}
impl AppState {
/// Resolves the presented API key (if any) to a principal by delegating to
/// `auth::authenticate` with the configured keys.
pub(crate) fn authenticate(&self, presented: Option<&str>) -> Option<Principal> {
auth::authenticate(&self.keys, presented)
}
/// Asks the rate limiter for a decision on a request from `actor`.
pub(crate) fn rate_limit(&self, actor: &str) -> RateDecision {
self.rate_limiter.check(actor)
}
/// Reports whether the rate limiter is enabled.
pub(crate) fn rate_limit_enabled(&self) -> bool {
self.rate_limiter.enabled()
}
async fn write_blocking<T, F>(&self, f: F) -> Result<T, Error>
where
T: Send + 'static,
F: FnOnce(&mut Database) -> quiver_embed::Result<T> + Send + 'static,
{
let db = Arc::clone(&self.db);
tokio::task::spawn_blocking(move || -> Result<T, Error> {
let mut guard = db
.write()
.map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
f(&mut guard).map_err(Error::Engine)
})
.await
.map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
}
async fn read_blocking<T, F>(&self, f: F) -> Result<T, Error>
where
T: Send + 'static,
F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
{
let db = Arc::clone(&self.db);
tokio::task::spawn_blocking(move || -> Result<T, Error> {
let guard = db
.read()
.map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
f(&guard).map_err(Error::Engine)
})
.await
.map_err(|e| Error::Internal(format!("blocking task failed: {e}")))?
}
async fn search_blocking<T, F>(&self, collection: String, f: F) -> Result<T, Error>
where
T: Send + 'static,
F: FnOnce(&Database) -> quiver_embed::Result<T> + Send + 'static,
{
let db = Arc::clone(&self.db);
let coll = collection.clone();
let (result, stale) = tokio::task::spawn_blocking(move || -> Result<(T, bool), Error> {
let guard = db
.read()
.map_err(|_| Error::Internal("database lock poisoned".to_owned()))?;
let result = f(&guard).map_err(Error::Engine)?;
let stale = guard.needs_rebuild(&coll).unwrap_or(false);
Ok((result, stale))
})
.await
.map_err(|e| Error::Internal(format!("blocking task failed: {e}")))??;
if stale {
self.schedule_rebuild(collection);
}
Ok(result)
}
/// Starts a background rebuild for `collection` unless one is already in
/// flight. Silently does nothing if the in-flight set's mutex is poisoned.
fn schedule_rebuild(&self, collection: String) {
    // The guard is a temporary of this statement, so the mutex is released
    // before the task is spawned.
    let newly_claimed = match self.rebuilding.lock() {
        Ok(mut inflight) => inflight.insert(collection.clone()),
        Err(_) => return,
    };
    if !newly_claimed {
        return;
    }

    let worker = self.clone();
    tokio::spawn(async move {
        worker.run_rebuild(&collection).await;
        // Release the claim so a later stale search can schedule again.
        if let Ok(mut inflight) = worker.rebuilding.lock() {
            inflight.remove(&collection);
        }
    });
}
/// Rebuilds `collection` in three blocking phases per iteration: snapshot the
/// rebuild inputs under the read lock, build without holding any lock, then
/// commit under the write lock. Repeats while the commit reports `true`.
///
/// Best effort: a poisoned lock, a failed blocking task, an engine error or
/// missing inputs ends the rebuild without reporting anything.
async fn run_rebuild(&self, collection: &str) {
loop {
// Phase 1: capture the inputs while holding only the read lock.
let db = Arc::clone(&self.db);
let coll = collection.to_owned();
let inputs = tokio::task::spawn_blocking(move || {
let guard = db.read().ok()?;
guard.snapshot_rebuild_inputs(&coll).ok().flatten()
})
.await
.ok()
.flatten();
let Some(inputs) = inputs else { return };
// Phase 2: build on the blocking pool; no database lock is held here.
let Ok(Ok(rebuilt)) = tokio::task::spawn_blocking(move || inputs.build()).await else {
return;
};
// Phase 3: commit under the write lock.
let db = Arc::clone(&self.db);
let still_stale = tokio::task::spawn_blocking(move || {
let mut guard = db.write().ok()?;
guard.commit_rebuild(rebuilt).ok()
})
.await
.ok()
.flatten();
// NOTE(review): `commit_rebuild`'s boolean is read as "still stale",
// presumably because writes landed during phase 2 — confirm in the engine.
match still_stale {
Some(true) => continue, _ => return,
}
}
}
/// Checks that `principal` may perform `action` on `resource`, recording an
/// audit denial under `op` when the check fails.
fn authorize(
    &self,
    principal: &Principal,
    action: Action,
    op: &str,
    resource: &str,
) -> Result<(), Error> {
    let verdict = principal.require(action, Some(resource));
    if verdict.is_err() {
        self.audit.deny(principal.actor(), op, resource);
    }
    verdict
}
/// Checks that `principal` may perform `action` without a specific resource,
/// recording an audit denial against `"*"` under `op` when the check fails.
fn authorize_global(
    &self,
    principal: &Principal,
    action: Action,
    op: &str,
) -> Result<(), Error> {
    let verdict = principal.require(action, None);
    if verdict.is_err() {
        self.audit.deny(principal.actor(), op, "*");
    }
    verdict
}
/// Opens a replication stream for an admin principal: returns a snapshot of
/// operations together with a receiver for subsequently broadcast entries.
///
/// # Errors
///
/// Fails if `principal` lacks the global `Admin` action or the snapshot
/// cannot be taken.
pub(crate) async fn open_replication(
&self,
principal: &Principal,
) -> Result<(Vec<WalOp>, broadcast::Receiver<WalEntry>), Error> {
self.authorize_global(principal, Action::Admin, "replicate")?;
let tx = self.replication_tx.clone();
self.read_blocking(move |db| {
// Subscribe first and take the snapshot second, both inside the
// read-locked closure. NOTE(review): presumably so that no committed
// entry can fall between the snapshot and the subscription — confirm.
let rx = tx.subscribe();
let snapshot = db.replication_snapshot()?;
Ok((snapshot, rx))
})
.await
}
/// Applies one replicated operation to the local database under the write
/// lock. Performs no authorization or `ensure_writable` check.
pub(crate) async fn apply_replicated(&self, op: WalOp) -> Result<(), Error> {
self.write_blocking(move |db| db.apply_replicated(op)).await
}
fn ensure_writable(&self, op: &str) -> Result<(), Error> {
if self.read_only {
return Err(Error::Forbidden(format!(
"{op}: this node is a read-only replication follower"
)));
}
Ok(())
}
/// Creates a collection of `f32` vectors and returns its description (with a
/// count of zero).
///
/// Requires a writable node, the `Admin` action on `name`, and `dim` within
/// `max_vector_dim`. The outcome is written to the audit log.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_collection(
    &self,
    principal: &Principal,
    name: String,
    dim: u32,
    metric: DistanceMetric,
    index: IndexSpec,
    filterable: Vec<FilterableField>,
    multivector: bool,
    vector_encryption: VectorEncryption,
) -> Result<CollectionInfo, Error> {
    const OP: &str = "create_collection";
    self.ensure_writable(OP)?;
    self.authorize(principal, Action::Admin, OP, &name)?;
    self.limits.check_dim(dim as usize)?;

    let outcome = {
        let target = name.clone();
        let descriptor = Descriptor::new(dim, Dtype::F32, metric)
            .with_index(index)
            .with_filterable(filterable.clone())
            .with_multivector(multivector)
            .with_vector_encryption(vector_encryption);
        self.write_blocking(move |db| db.create_collection(&target, descriptor))
            .await
    };
    self.audit
        .record(principal.actor(), OP, &name, Outcome::of(&outcome));
    outcome?;

    Ok(CollectionInfo {
        name,
        dim,
        metric,
        count: 0,
        index,
        filterable,
        multivector,
        vector_encryption,
    })
}
pub(crate) async fn get_collection(
&self,
principal: &Principal,
name: String,
) -> Result<CollectionInfo, Error> {
self.authorize(principal, Action::Read, "get_collection", &name)?;
self.read_blocking(move |db| {
let descriptor = db
.descriptor(&name)
.cloned()
.ok_or_else(|| quiver_embed::Error::CollectionNotFound(name.clone()))?;
let count = if descriptor.multivector {
db.document_count(&name)? as u64
} else {
db.len(&name)? as u64
};
Ok(CollectionInfo {
name,
dim: descriptor.dim,
metric: descriptor.metric,
count,
index: descriptor.index,
filterable: descriptor.filterable,
multivector: descriptor.multivector,
vector_encryption: descriptor.vector_encryption,
})
})
.await
}
/// Lists the collections visible to `principal`.
///
/// Requires the global `Read` action. Names without a descriptor are skipped,
/// and the result is filtered through `Principal::can_see`.
pub(crate) async fn list_collections(
    &self,
    principal: &Principal,
) -> Result<Vec<CollectionInfo>, Error> {
    self.authorize_global(principal, Action::Read, "list_collections")?;
    let everything = self
        .read_blocking(|db| {
            let mut found = Vec::new();
            for name in db.collection_names() {
                let Some(descriptor) = db.descriptor(&name).cloned() else {
                    continue;
                };
                let count = if descriptor.multivector {
                    db.document_count(&name)? as u64
                } else {
                    db.len(&name)? as u64
                };
                found.push(CollectionInfo {
                    name,
                    count,
                    dim: descriptor.dim,
                    metric: descriptor.metric,
                    index: descriptor.index,
                    filterable: descriptor.filterable,
                    multivector: descriptor.multivector,
                    vector_encryption: descriptor.vector_encryption,
                });
            }
            Ok(found)
        })
        .await?;
    Ok(everything
        .into_iter()
        .filter(|info| principal.can_see(&info.name))
        .collect())
}
/// Drops the collection `name` and returns the engine's boolean result.
///
/// Requires a writable node and the `Admin` action on `name`. The outcome is
/// written to the audit log.
pub(crate) async fn delete_collection(
    &self,
    principal: &Principal,
    name: String,
) -> Result<bool, Error> {
    const OP: &str = "delete_collection";
    self.ensure_writable(OP)?;
    self.authorize(principal, Action::Admin, OP, &name)?;

    let target = name.clone();
    let outcome = self
        .write_blocking(move |db| db.drop_collection(&target))
        .await;
    self.audit
        .record(principal.actor(), OP, &name, Outcome::of(&outcome));
    outcome
}
#[tracing::instrument(skip_all, fields(collection = %collection, points = points.len()))]
pub(crate) async fn upsert(
&self,
principal: &Principal,
collection: String,
points: Vec<PointIn>,
) -> Result<u64, Error> {
self.ensure_writable("upsert")?;
self.authorize(principal, Action::Write, "upsert", &collection)?;
self.limits.check_batch(points.len())?;
for p in &points {
self.limits.check_vector_len(p.vector.len())?;
self.limits.check_payload(&p.payload)?;
}
let resource = collection.clone();
let result = self
.write_blocking(move |db| {
let records: Vec<(&str, &[f32], &serde_json::Value)> = points
.iter()
.map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
.collect();
db.upsert_batch(&collection, &records)
})
.await;
self.audit
.record(principal.actor(), "upsert", &resource, Outcome::of(&result));
result
}
/// Upserts `points` into `collection` through the engine's bulk path and
/// returns the engine's count.
///
/// Requires a writable node and the `Write` action on `collection`. The batch
/// is checked against `max_bulk_batch_size`; every vector length and payload
/// size is checked before the database is touched.
///
/// The read-only error, the audit denial and the audit record all use the
/// operation name `"upsert_bulk"`, so a refused bulk upsert is attributed to
/// the same operation as a completed one.
pub(crate) async fn upsert_bulk(
    &self,
    principal: &Principal,
    collection: String,
    points: Vec<PointIn>,
) -> Result<u64, Error> {
    const OP: &str = "upsert_bulk";
    self.ensure_writable(OP)?;
    self.authorize(principal, Action::Write, OP, &collection)?;
    self.limits.check_bulk_batch(points.len())?;
    for p in &points {
        self.limits.check_vector_len(p.vector.len())?;
        self.limits.check_payload(&p.payload)?;
    }
    let resource = collection.clone();
    let result = self
        .write_blocking(move |db| {
            let records: Vec<(&str, &[f32], &serde_json::Value)> = points
                .iter()
                .map(|p| (p.id.as_str(), p.vector.as_slice(), &p.payload))
                .collect();
            db.upsert_bulk(&collection, &records)
        })
        .await;
    self.audit
        .record(principal.actor(), OP, &resource, Outcome::of(&result));
    result
}
#[tracing::instrument(skip_all)]
pub(crate) async fn snapshot(
&self,
principal: &Principal,
destination: String,
) -> Result<SnapshotInfo, Error> {
self.ensure_writable("snapshot")?;
self.authorize_global(principal, Action::Admin, "snapshot")?;
let dest = std::path::PathBuf::from(&destination);
let result = self.write_blocking(move |db| db.snapshot(&dest)).await;
self.audit.record(
principal.actor(),
"snapshot",
&destination,
Outcome::of(&result),
);
result
}
pub(crate) async fn delete_points(
&self,
principal: &Principal,
collection: String,
ids: Vec<String>,
) -> Result<u64, Error> {
self.ensure_writable("delete_points")?;
self.authorize(principal, Action::Write, "delete_points", &collection)?;
let resource = collection.clone();
let result = self
.write_blocking(move |db| {
let mut count = 0u64;
for id in &ids {
if db.delete(&collection, id)? {
count += 1;
}
}
Ok(count)
})
.await;
self.audit.record(
principal.actor(),
"delete_points",
&resource,
Outcome::of(&result),
);
result
}
/// Looks up the points `ids` in `collection`, in the order given, skipping
/// ids the engine does not find.
///
/// Requires the `Read` action on `collection`. Vectors are included only when
/// `with_vector` is set; a missing payload is returned as JSON null.
pub(crate) async fn get_points(
    &self,
    principal: &Principal,
    collection: String,
    ids: Vec<String>,
    with_vector: bool,
) -> Result<Vec<PointOut>, Error> {
    self.authorize(principal, Action::Read, "get_points", &collection)?;
    self.read_blocking(move |db| {
        let mut found = Vec::new();
        for id in &ids {
            let Some(hit) = db.get(&collection, id)? else {
                continue;
            };
            found.push(PointOut {
                id: hit.id,
                vector: hit.vector.filter(|_| with_vector),
                payload: hit.payload.unwrap_or(Value::Null),
            });
        }
        Ok(found)
    })
    .await
}
/// Runs a dense vector search against `collection`.
///
/// Requires the `Read` action on `collection`. `k`, `ef_search` and the query
/// vector's length are checked against the configured limits before the
/// database is touched. Goes through `search_blocking`, so a stale collection
/// gets a background rebuild scheduled afterwards.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, fields(collection = %collection, k, filtered = filter.is_some()))]
pub(crate) async fn search(
    &self,
    principal: &Principal,
    collection: String,
    vector: Vec<f32>,
    k: usize,
    filter: Option<Filter>,
    ef_search: usize,
    with_payload: bool,
    with_vector: bool,
) -> Result<Vec<MatchOut>, Error> {
    self.authorize(principal, Action::Read, "search", &collection)?;
    self.limits.check_search(k, ef_search)?;
    self.limits.check_vector_len(vector.len())?;
    let params = SearchParams {
        k,
        filter,
        ef_search,
        with_payload,
        with_vector,
    };
    // One copy names the collection for the staleness probe; the original
    // moves into the closure.
    let coll = collection.clone();
    self.search_blocking(coll, move |db| {
        let matches = db.search_snapshot(&collection, &vector, &params)?;
        Ok(matches
            .into_iter()
            .map(|m| MatchOut {
                id: m.id,
                score: m.score,
                payload: m.payload,
                vector: m.vector,
            })
            .collect())
    })
    .await
}
/// Runs a hybrid search against `collection`, combining whichever of the
/// dense vector, sparse vector and text inputs are supplied.
///
/// Requires the `Read` action on `collection`. `k` and `ef_search` are always
/// checked; the dense vector's length and the sparse term count are checked
/// when those inputs are present.
///
/// # Errors
///
/// Besides authorization, limit and engine errors, returns
/// [`Error::BadRequest`] when the sparse index and value lists differ in
/// length.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn hybrid_search(
    &self,
    principal: &Principal,
    collection: String,
    dense: Option<Vec<f32>>,
    sparse: Option<(Vec<u32>, Vec<f32>)>,
    text: Option<String>,
    k: usize,
    filter: Option<Filter>,
    ef_search: usize,
    rrf_k0: f32,
    with_payload: bool,
    with_vector: bool,
) -> Result<Vec<MatchOut>, Error> {
    self.authorize(principal, Action::Read, "hybrid_search", &collection)?;
    self.limits.check_search(k, ef_search)?;
    if let Some(v) = &dense {
        self.limits.check_vector_len(v.len())?;
    }
    if let Some((indices, values)) = &sparse {
        self.limits.check_sparse_terms(indices.len())?;
        if indices.len() != values.len() {
            return Err(Error::BadRequest(format!(
                "sparse query indices ({}) and values ({}) length mismatch",
                indices.len(),
                values.len()
            )));
        }
    }
    let params = SearchParams {
        k,
        filter,
        ef_search,
        with_payload,
        with_vector,
    };
    let sv = sparse.map(|(indices, values)| SparseVector { indices, values });
    // One copy names the collection for the staleness probe; the original
    // moves into the closure.
    let coll = collection.clone();
    self.search_blocking(coll, move |db| {
        let matches = db.hybrid_search_snapshot(
            &collection,
            dense.as_deref(),
            sv.as_ref(),
            text.as_deref(),
            &params,
            rrf_k0,
        )?;
        Ok(matches
            .into_iter()
            .map(|m| MatchOut {
                id: m.id,
                score: m.score,
                payload: m.payload,
                vector: m.vector,
            })
            .collect())
    })
    .await
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn search_text(
&self,
principal: &Principal,
collection: String,
text: String,
k: usize,
filter: Option<Filter>,
ef_search: usize,
rrf_k0: f32,
with_payload: bool,
with_vector: bool,
rerank: bool,
) -> Result<Vec<MatchOut>, Error> {
self.authorize(principal, Action::Read, "search_text", &collection)?;
self.limits.check_search(k, ef_search)?;
let embedder = self.embed.embedder(&collection).ok_or_else(|| {
Error::BadRequest(format!(
"collection {collection:?} has no embedding provider configured \
(set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
))
})?;
let query = text.clone();
let vector = tokio::task::spawn_blocking(move || embedder.embed(&[query]))
.await
.map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
.map_err(|e| Error::Upstream(e.to_string()))?
.into_iter()
.next()
.ok_or_else(|| Error::Upstream("embedding provider returned no vector".to_owned()))?;
self.limits.check_vector_len(vector.len())?;
let reranker = if rerank {
self.embed.reranker(&collection)
} else {
None
};
let need_payload = with_payload || reranker.is_some();
let fetch_k = if reranker.is_some() {
k.max(RERANK_CANDIDATES)
} else {
k
};
let mut hits = self
.hybrid_search(
principal,
collection,
Some(vector),
None,
Some(text.clone()),
fetch_k,
filter,
ef_search,
rrf_k0,
need_payload,
with_vector,
)
.await?;
if let Some(rr) = reranker {
let docs: Vec<String> = hits.iter().map(|h| doc_text(h.payload.as_ref())).collect();
let query = text;
let scores = tokio::task::spawn_blocking(move || rr.rerank(&query, &docs))
.await
.map_err(|e| Error::Internal(format!("rerank task failed: {e}")))?
.map_err(|e| Error::Upstream(e.to_string()))?;
let mut scored: Vec<(f32, MatchOut)> = scores
.into_iter()
.zip(hits)
.map(|(s, mut h)| {
h.score = s;
(s, h)
})
.collect();
scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
hits = scored.into_iter().map(|(_, h)| h).collect();
}
hits.truncate(k);
if !with_payload {
for h in &mut hits {
h.payload = None;
}
}
Ok(hits)
}
pub(crate) async fn upsert_text(
&self,
principal: &Principal,
collection: String,
points: Vec<TextPointIn>,
) -> Result<u64, Error> {
self.ensure_writable("upsert_text")?;
self.authorize(principal, Action::Write, "upsert_text", &collection)?;
self.limits.check_batch(points.len())?;
for p in &points {
if !matches!(p.payload, Value::Object(_) | Value::Null) {
return Err(Error::BadRequest(
"upsert_text payload must be a JSON object or null".to_owned(),
));
}
}
let embedder = self.embed.embedder(&collection).ok_or_else(|| {
Error::BadRequest(format!(
"collection {collection:?} has no embedding provider configured \
(set an [embedding.{collection}] table in quiver.toml — ADR-0047)"
))
})?;
let texts: Vec<String> = points.iter().map(|p| p.text.clone()).collect();
let vectors = tokio::task::spawn_blocking(move || embedder.embed(&texts))
.await
.map_err(|e| Error::Internal(format!("embedding task failed: {e}")))?
.map_err(|e| Error::Upstream(e.to_string()))?;
if vectors.len() != points.len() {
return Err(Error::Upstream(format!(
"embedding provider returned {} vectors for {} inputs",
vectors.len(),
points.len()
)));
}
let dense: Vec<PointIn> = points
.into_iter()
.zip(vectors)
.map(|(p, vector)| {
let mut payload = match p.payload {
Value::Object(map) => map,
_ => serde_json::Map::new(),
};
payload
.entry(TEXT_KEY.to_owned())
.or_insert_with(|| Value::String(p.text.clone()));
PointIn {
id: p.id,
vector,
payload: Value::Object(payload),
}
})
.collect();
self.upsert(principal, collection, dense).await
}
/// Fetches up to `limit` points from `collection`, optionally restricted by
/// `filter`.
///
/// Requires the `Read` action on `collection`; `limit` is checked against
/// `max_fetch_limit`.
pub(crate) async fn fetch(
    &self,
    principal: &Principal,
    collection: String,
    filter: Option<Filter>,
    limit: usize,
    with_payload: bool,
    with_vector: bool,
) -> Result<Vec<MatchOut>, Error> {
    self.authorize(principal, Action::Read, "fetch", &collection)?;
    self.limits.check_fetch(limit)?;
    self.read_blocking(move |db| {
        let rows = db.fetch(
            &collection,
            filter.as_ref(),
            limit,
            with_payload,
            with_vector,
        )?;
        let mut out = Vec::new();
        for row in rows {
            out.push(MatchOut {
                id: row.id,
                score: row.score,
                payload: row.payload,
                vector: row.vector,
            });
        }
        Ok(out)
    })
    .await
}
pub(crate) async fn upsert_documents(
&self,
principal: &Principal,
collection: String,
documents: Vec<DocumentIn>,
) -> Result<u64, Error> {
self.ensure_writable("upsert_documents")?;
self.authorize(principal, Action::Write, "upsert_documents", &collection)?;
self.limits.check_batch(documents.len())?;
for doc in &documents {
self.limits.check_payload(&doc.payload)?;
for token in &doc.vectors {
self.limits.check_vector_len(token.len())?;
}
}
let resource = collection.clone();
let result = self
.write_blocking(move |db| {
let mut count = 0u64;
for doc in &documents {
db.upsert_document(&collection, &doc.id, &doc.vectors, &doc.payload)?;
count += 1;
}
Ok(count)
})
.await;
self.audit.record(
principal.actor(),
"upsert_documents",
&resource,
Outcome::of(&result),
);
result
}
pub(crate) async fn delete_documents(
&self,
principal: &Principal,
collection: String,
ids: Vec<String>,
) -> Result<u64, Error> {
self.ensure_writable("delete_documents")?;
self.authorize(principal, Action::Write, "delete_documents", &collection)?;
let resource = collection.clone();
let result = self
.write_blocking(move |db| {
let mut count = 0u64;
for id in &ids {
if db.delete_document(&collection, id)? {
count += 1;
}
}
Ok(count)
})
.await;
self.audit.record(
principal.actor(),
"delete_documents",
&resource,
Outcome::of(&result),
);
result
}
/// Runs a multi-vector search against `collection`.
///
/// Requires the `Read` action on `collection`. `k`, `ef_search` and the length
/// of every query vector are checked against the configured limits. Goes
/// through `search_blocking`, so a stale collection gets a background rebuild
/// scheduled afterwards.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn search_multi_vector(
    &self,
    principal: &Principal,
    collection: String,
    query: Vec<Vec<f32>>,
    k: usize,
    filter: Option<Filter>,
    ef_search: usize,
    with_payload: bool,
    with_vector: bool,
) -> Result<Vec<DocumentMatchOut>, Error> {
    self.authorize(principal, Action::Read, "search_multi_vector", &collection)?;
    self.limits.check_search(k, ef_search)?;
    for token in &query {
        self.limits.check_vector_len(token.len())?;
    }
    let params = SearchParams {
        k,
        filter,
        ef_search,
        with_payload,
        with_vector,
    };
    // One copy names the collection for the staleness probe; the original
    // moves into the closure.
    let coll = collection.clone();
    self.search_blocking(coll, move |db| {
        let matches = db.search_multi_vector_snapshot(&collection, &query, &params)?;
        Ok(matches
            .into_iter()
            .map(|m| DocumentMatchOut {
                id: m.id,
                score: m.score,
                payload: m.payload,
                vectors: m.vectors,
            })
            .collect())
    })
    .await
}
}
/// Capacity of the broadcast channel that carries committed WAL entries to
/// replication subscribers.
const REPLICATION_BUFFER: usize = 1024;
/// Validates `config`, binds the REST and gRPC listeners, and serves until
/// either the servers stop or the shutdown signal fires.
///
/// # Errors
///
/// Returns a validation error, an [`Error::Io`] from binding, or whatever
/// `serve` returns. Resolves to `Ok(())` when the shutdown signal wins.
pub async fn run(config: Config) -> Result<(), Error> {
config.validate()?;
let rest_listener = TcpListener::bind(config.rest_addr)
.await
.map_err(Error::Io)?;
let grpc_listener = TcpListener::bind(config.grpc_addr)
.await
.map_err(Error::Io)?;
tracing::info!(rest = %config.rest_addr, grpc = %config.grpc_addr, "quiver listening");
// Whichever branch completes first wins; the other future is dropped.
tokio::select! {
result = serve(config, rest_listener, grpc_listener) => result,
() = shutdown_signal() => {
tracing::info!("shutdown signal received");
Ok(())
}
}
}
/// Opens the database, assembles the shared state and serves REST and gRPC on
/// the already-bound listeners until one of the servers stops.
///
/// Does not call `Config::validate`; `run` does that before calling this.
///
/// # Errors
///
/// Returns the first failure from opening the database or audit log, building
/// the embedding registry, loading TLS material, or from either server.
pub async fn serve(
config: Config,
rest_listener: TcpListener,
grpc_listener: TcpListener,
) -> Result<(), Error> {
let mut db = open_database(&config)?;
let audit = Arc::new(AuditLog::open(config.audit_log.as_deref())?);
let (replication_tx, _) = broadcast::channel(REPLICATION_BUFFER);
{
// Forward every observed commit to the replication broadcast channel.
// A send error is deliberately ignored.
let tx = replication_tx.clone();
db.set_commit_observer(Arc::new(move |entry: &WalEntry| {
let _ = tx.send(entry.clone());
}));
}
let embed = EmbedRegistry::from_config(&config.embedding, &config.rerank)
.map_err(|e| Error::Config(e.to_string()))?;
let state = AppState {
db: Arc::new(RwLock::new(db)),
keys: Arc::new(config.api_keys.clone()),
audit,
replication_tx,
// A configured leader makes this node a read-only follower.
read_only: config.leader_url.is_some(),
limits: config.limits,
embed: Arc::new(embed),
rate_limiter: Arc::new(RateLimiter::new(config.rate_limit)),
metrics: Arc::new(metrics::Metrics::default()),
rebuilding: Arc::new(Mutex::new(HashSet::new())),
};
if let Some(leader_url) = config.leader_url.clone() {
replication::spawn_follower(state.clone(), leader_url, config.leader_api_key.clone());
}
let app = rest::router(state.clone());
let grpc = grpc::service(state);
let tls = load_tls(&config)?;
// REST: served through axum_server with rustls when TLS material is
// present, through plain axum otherwise. Boxed because the two branches
// are different future types.
let rest_fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> = match &tls {
Some(material) => {
let rustls_config = RustlsConfig::from_config(Arc::clone(&material.rest_config));
let std_listener = rest_listener.into_std().map_err(Error::Io)?;
let server =
axum_server::from_tcp_rustls(std_listener, rustls_config).map_err(Error::Io)?;
Box::pin(async move {
server
.serve(app.into_make_service())
.await
.map_err(Error::Io)
})
}
None => Box::pin(async move { axum::serve(rest_listener, app).await.map_err(Error::Io) }),
};
// gRPC: tonic takes the PEM bytes directly; a client CA root is added only
// when one was loaded.
let mut grpc_builder = tonic::transport::Server::builder();
if let Some(material) = &tls {
let identity = Identity::from_pem(&material.cert_pem, &material.key_pem);
let mut tls_config = ServerTlsConfig::new().identity(identity);
if let Some(ca_pem) = &material.client_ca_pem {
tls_config = tls_config.client_ca_root(Certificate::from_pem(ca_pem));
}
grpc_builder = grpc_builder
.tls_config(tls_config)
.map_err(|e| Error::Internal(format!("grpc tls config: {e}")))?;
}
let grpc_fut = async move {
grpc_builder
.add_service(grpc)
.serve_with_incoming(TcpListenerStream::new(grpc_listener))
.await
.map_err(|e| Error::Internal(format!("grpc server: {e}")))
};
// Drive both servers together; the first error ends both.
tokio::try_join!(rest_fut, grpc_fut)?;
Ok(())
}
async fn shutdown_signal() {
let _ = tokio::signal::ctrl_c().await;
}
/// Opens the database stored under `config.data_dir`.
///
/// The master key (if any) is resolved from the configuration and handed to
/// `quiver_crypto::open_keyring`. When that yields a keyring the database is
/// opened with it; otherwise it is opened without one.
///
/// # Errors
///
/// Returns an error if the master key cannot be resolved, the keyring cannot
/// be opened (reported as `Error::Config`), or the database fails to open.
fn open_database(config: &Config) -> Result<Database, Error> {
    let data_dir = &config.data_dir;
    let master_key = config.master_key_hex()?;
    let maybe_keyring =
        quiver_crypto::open_keyring(data_dir, master_key.as_deref(), config.insecure)
            .map_err(|e| Error::Config(e.to_string()))?;
    match maybe_keyring {
        None => Ok(Database::open(data_dir)?),
        Some(keyring) => Ok(Database::open_with_keyring(data_dir, keyring)?),
    }
}
/// TLS inputs loaded once at start-up and shared by the REST and gRPC servers.
struct TlsMaterial {
/// Raw contents of the `tls_cert` file; used to build the gRPC identity.
cert_pem: Vec<u8>,
/// Raw contents of the `tls_key` file; used to build the gRPC identity.
key_pem: Vec<u8>,
/// Raw contents of the `tls_client_ca` file, when one is configured.
client_ca_pem: Option<Vec<u8>>,
/// Prepared rustls server configuration used by the REST server.
rest_config: Arc<rustls::ServerConfig>,
}
fn load_tls(config: &Config) -> Result<Option<TlsMaterial>, Error> {
match (&config.tls_cert, &config.tls_key) {
(Some(cert_path), Some(key_path)) => {
let cert_pem = std::fs::read(cert_path).map_err(Error::Io)?;
let key_pem = std::fs::read(key_path).map_err(Error::Io)?;
let client_ca_pem = config
.tls_client_ca
.as_ref()
.map(std::fs::read)
.transpose()
.map_err(Error::Io)?;
let rest_config = Arc::new(rustls_server_config(
&cert_pem,
&key_pem,
client_ca_pem.as_deref(),
)?);
Ok(Some(TlsMaterial {
cert_pem,
key_pem,
client_ca_pem,
rest_config,
}))
}
(None, None) => Ok(None),
_ => Err(Error::Config(
"tls_cert and tls_key must be set together".to_owned(),
)),
}
}
fn rustls_server_config(
cert_pem: &[u8],
key_pem: &[u8],
client_ca_pem: Option<&[u8]>,
) -> Result<rustls::ServerConfig, Error> {
use rustls_pki_types::pem::PemObject;
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
let certs = CertificateDer::pem_slice_iter(cert_pem)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::Config(format!("parsing tls_cert: {e}")))?;
if certs.is_empty() {
return Err(Error::Config(
"tls_cert contains no certificates".to_owned(),
));
}
let key = PrivateKeyDer::from_pem_slice(key_pem)
.map_err(|e| Error::Config(format!("parsing tls_key: {e}")))?;
let provider = Arc::new(rustls::crypto::ring::default_provider());
let builder = rustls::ServerConfig::builder_with_provider(Arc::clone(&provider))
.with_safe_default_protocol_versions()
.map_err(|e| Error::Internal(format!("tls protocol versions: {e}")))?;
let builder = match client_ca_pem {
Some(ca_pem) => {
let mut roots = rustls::RootCertStore::empty();
for cert in CertificateDer::pem_slice_iter(ca_pem) {
let cert =
cert.map_err(|e| Error::Config(format!("parsing tls_client_ca: {e}")))?;
roots
.add(cert)
.map_err(|e| Error::Config(format!("adding tls_client_ca: {e}")))?;
}
let verifier = rustls::server::WebPkiClientVerifier::builder_with_provider(
Arc::new(roots),
provider,
)
.build()
.map_err(|e| Error::Config(format!("client certificate verifier: {e}")))?;
builder.with_client_cert_verifier(verifier)
}
None => builder.with_no_client_auth(),
};
builder
.with_single_cert(certs, key)
.map_err(|e| Error::Config(format!("tls certificate/key: {e}")))
}
/// Initializes observability using a default [`Config`].
///
/// Shorthand for calling [`init_observability`] with `Config::default()`.
pub fn init_tracing() {
    let defaults = Config::default();
    init_observability(&defaults);
}
/// Installs the global tracing subscriber.
///
/// The subscriber combines an environment-driven filter (falling back to
/// `info` when none can be read from the environment) with a formatting layer.
/// When the `otlp` feature is compiled in and OTLP is enabled in `config`, an
/// OpenTelemetry layer is added as well. If the OTLP provider cannot be built,
/// the reason is printed to stderr and the subscriber is installed without it.
///
/// The result of `try_init` is discarded, so a failure to install the
/// subscriber is silently ignored.
#[cfg_attr(not(feature = "otlp"), allow(unused_variables))]
pub fn init_observability(config: &Config) {
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::prelude::*;

    let level_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    };
    let subscriber = tracing_subscriber::registry()
        .with(level_filter)
        .with(tracing_subscriber::fmt::layer());

    #[cfg(feature = "otlp")]
    if config.otlp.is_enabled() {
        match otlp::build_provider(&config.otlp) {
            // Tracing is not installed yet, so report the failure on stderr.
            Err(e) => eprintln!("OTLP traces export disabled: {e}"),
            Ok(provider) => {
                use opentelemetry::trace::TracerProvider as _;
                let tracer = provider.tracer("quiver");
                otlp::store_provider(provider);
                let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
                let _ = subscriber.with(otel_layer).try_init();
                return;
            }
        }
    }

    let _ = subscriber.try_init();
}
/// Shuts down the OTLP exporter when the `otlp` feature is compiled in.
///
/// Without that feature this function does nothing.
pub fn shutdown_observability() {
    #[cfg(feature = "otlp")]
    {
        otlp::shutdown();
    }
}
// Unit tests for `Config::validate` and master-key resolution.
#[cfg(test)]
mod tests {
use super::*;
// 64 hex characters, accepted by validation as an encryption key in the tests below.
const TEST_KEY: &str = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff";
// A default config fails validation; it passes with `insecure` set, or with
// an API key plus an encryption key.
#[test]
fn config_rejects_missing_keys_unless_insecure() {
let mut config = Config::default();
assert!(config.validate().is_err());
config.insecure = true;
assert!(config.validate().is_ok());
config.insecure = false;
config.api_keys = vec!["secret".into()];
config.encryption_key = Some(TEST_KEY.to_owned());
assert!(config.validate().is_ok());
}
// With an API key present, validation still requires a well-formed
// encryption key unless `insecure` is set.
#[test]
fn config_requires_encryption_key_unless_insecure() {
let mut config = Config {
api_keys: vec!["secret".into()],
..Config::default()
};
assert!(config.validate().is_err());
config.encryption_key = Some(TEST_KEY.to_owned());
assert!(config.validate().is_ok());
// A malformed key is rejected.
config.encryption_key = Some("not-a-valid-hex-key".to_owned());
assert!(config.validate().is_err());
config.insecure = true;
config.encryption_key = None;
assert!(config.validate().is_ok());
}
// A key file satisfies the key requirement; the key read back has the
// trailing newline removed. Setting both sources, or a malformed file, fails.
#[test]
fn master_key_file_is_an_alternative_to_the_env_key() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("master.key");
std::fs::write(&path, format!("{TEST_KEY}\n")).unwrap();
let mut config = Config {
api_keys: vec!["secret".into()],
master_key_file: Some(path.clone()),
..Config::default()
};
assert!(config.validate().is_ok());
assert_eq!(config.master_key_hex().unwrap().as_deref(), Some(TEST_KEY));
// Key file and inline key together are rejected.
config.encryption_key = Some(TEST_KEY.to_owned());
assert!(config.validate().is_err());
config.encryption_key = None;
// A key file with invalid contents is rejected.
std::fs::write(&path, "not-a-valid-key").unwrap();
assert!(config.validate().is_err());
}
// Binding REST to the unspecified address fails validation unless
// `insecure` is set.
#[test]
fn config_rejects_public_bind_without_optout() {
let mut config = Config {
api_keys: vec!["secret".into()],
encryption_key: Some(TEST_KEY.to_owned()),
..Config::default()
};
config.rest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333);
assert!(config.validate().is_err());
config.insecure = true;
assert!(config.validate().is_ok());
}
// Binding REST to the unspecified address passes validation when both a
// TLS certificate and key are configured.
#[test]
fn config_public_bind_allowed_with_tls() {
let config = Config {
api_keys: vec!["secret".into()],
encryption_key: Some(TEST_KEY.to_owned()),
tls_cert: Some(PathBuf::from("cert.pem")),
tls_key: Some(PathBuf::from("key.pem")),
rest_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6333),
..Config::default()
};
assert!(config.validate().is_ok());
}
// A certificate without a key fails validation; adding the key fixes it.
#[test]
fn config_tls_cert_and_key_must_pair() {
let mut config = Config {
api_keys: vec!["secret".into()],
encryption_key: Some(TEST_KEY.to_owned()),
tls_cert: Some(PathBuf::from("cert.pem")),
..Config::default()
};
assert!(config.validate().is_err());
config.tls_key = Some(PathBuf::from("key.pem"));
assert!(config.validate().is_ok());
}
}