use crate::column_store::{AutoVacuumConfig, ColumnStore, ColumnType, ColumnValue, VacuumConfig};
use crate::point::Point;
use crate::storage::{PayloadStorage, VectorStorage};
use parking_lot::RwLock;
use roaring::RoaringBitmap;
use rustc_hash::{FxHashMap, FxHashSet};
use std::sync::atomic::{AtomicU64, Ordering};
mod translate;
#[cfg(test)]
mod mirror_tests;
#[cfg(test)]
mod translate_tests;
/// Smallest point count at which `mirror_build_due` will allow the mirror to be built.
pub(crate) const MIRROR_MIN_ROWS: usize = 256;
/// Upper bound on the number of columns; once reached, `ensure_column` refuses new keys
/// and remembers them in `MirrorState::uncolumnized`.
const MAX_MIRROR_COLUMNS: usize = 64;
/// `is_bloated` never reports bloat unless the store holds more rows than this.
const BLOAT_MIN_ROWS: usize = 4096;
/// Lazily built mirror of point payloads, together with the counter that gates building it.
#[derive(Default)]
pub(crate) struct PayloadMirror {
/// Current mirror contents; `None` until a build installs a state, and again after the
/// state is discarded (`invalidate`, or an unhealthy `apply_upserts`).
state: RwLock<Option<MirrorState>>,
/// Running total fed by `add_scan_debt`; reset to zero when a build succeeds.
scan_debt: AtomicU64,
}
/// Outcome of asking the mirror which point ids match a condition.
pub(crate) enum MirrorAnswer {
/// Point ids of the rows selected by the translated condition.
Ids(Vec<u64>),
/// A state exists, but the condition could not be translated into a bitmap.
Unsupported,
/// No mirror state is currently installed.
NotBuilt,
}
/// Contents of a built mirror: the column store plus the bookkeeping that maps its row
/// indexes to point ids and back.
#[derive(Default)]
pub(super) struct MirrorState {
/// Column store that receives one appended row per `upsert_row` call.
pub(super) store: ColumnStore,
/// Point id of each row, indexed by row index.
pub(super) row_ids: Vec<u64>,
/// Row index currently holding each point id; entries are removed by `tombstone`.
pub(super) id_rows: FxHashMap<u64, u32>,
/// Row indexes that have been inserted and not tombstoned since.
pub(super) live: RoaringBitmap,
/// Payload keys that were refused a column because `MAX_MIRROR_COLUMNS` was reached.
pub(super) uncolumnized: FxHashSet<String>,
}
impl MirrorState {
    /// Replaces the mirrored row for `id` with a fresh row built from `payload`.
    ///
    /// Any existing row for `id` is tombstoned first; a new row is then appended to the
    /// store and registered in `row_ids`, `id_rows` and `live`.
    ///
    /// Returns `false` when the next row index does not fit in `u32`. By that point the
    /// previous row has already been tombstoned; the callers in this file respond by
    /// discarding (or never installing) the whole state.
    pub(super) fn upsert_row(&mut self, id: u64, payload: Option<&serde_json::Value>) -> bool {
        self.tombstone(id);
        let Ok(row_idx) = u32::try_from(self.store.row_count()) else {
            return false;
        };
        let cells = self.collect_cells(payload);
        // The store wants borrowed column names, so pair each owned name with its cell.
        let cell_refs: Vec<(&str, ColumnValue)> = cells
            .iter()
            .map(|(name, value)| (name.as_str(), value.clone()))
            .collect();
        self.store.push_row_unchecked(&cell_refs);
        self.row_ids.push(id);
        self.id_rows.insert(id, row_idx);
        self.live.insert(row_idx);
        true
    }

    /// Retires the row currently mapped to `id`, if any: the id mapping is removed, the
    /// store row is tombstoned and the row leaves the `live` bitmap.
    pub(super) fn tombstone(&mut self, id: u64) {
        if let Some(row_idx) = self.id_rows.remove(&id) {
            self.store.tombstone_row(row_idx as usize);
            self.live.remove(row_idx);
        }
    }

    /// Turns a JSON object payload into `(column name, cell)` pairs.
    ///
    /// Returns an empty list when `payload` is absent or is not a JSON object. Keys
    /// containing `.` and values that are not numbers, strings or booleans are skipped,
    /// as are keys for which `ensure_column` refuses a column.
    ///
    /// The column is secured *before* the cell is built: building a string cell interns
    /// the value, and interning values for keys that will never get a column would grow
    /// the string table without bound.
    fn collect_cells(&mut self, payload: Option<&serde_json::Value>) -> Vec<(String, ColumnValue)> {
        let Some(serde_json::Value::Object(map)) = payload else {
            return Vec::new();
        };
        let mut cells = Vec::with_capacity(map.len());
        for (key, value) in map {
            if key.contains('.') {
                continue;
            }
            let Some(col_type) = Self::scalar_type(value) else {
                continue;
            };
            if !self.ensure_column(key, &col_type) {
                continue;
            }
            let Some((_, cell)) = self.scalar_cell(value) else {
                continue;
            };
            cells.push((key.clone(), cell));
        }
        cells
    }

    /// Side-effect-free classification of a JSON value; yields `Some` exactly when
    /// `scalar_cell` would.
    fn scalar_type(value: &serde_json::Value) -> Option<ColumnType> {
        match value {
            serde_json::Value::Number(n) => n.as_f64().map(|_| ColumnType::Float),
            serde_json::Value::String(_) => Some(ColumnType::String),
            serde_json::Value::Bool(_) => Some(ColumnType::Bool),
            _ => None,
        }
    }

    /// Maps a scalar JSON value to its column type and cell.
    ///
    /// Numbers are stored as `f64` (via `as_f64`), strings are interned in the store's
    /// string table, booleans are stored as-is. Any other JSON value yields `None`.
    fn scalar_cell(&mut self, value: &serde_json::Value) -> Option<(ColumnType, ColumnValue)> {
        match value {
            serde_json::Value::Number(n) => n
                .as_f64()
                .map(|f| (ColumnType::Float, ColumnValue::Float(f))),
            serde_json::Value::String(s) => Some((
                ColumnType::String,
                ColumnValue::String(self.store.string_table_mut().intern(s)),
            )),
            serde_json::Value::Bool(b) => Some((ColumnType::Bool, ColumnValue::Bool(*b))),
            _ => None,
        }
    }

    /// Makes sure a column named `key` exists, creating it with `col_type` if allowed.
    ///
    /// Returns `true` when the column already exists or was just added, and `false` when
    /// the key is (or has just become) uncolumnized because `MAX_MIRROR_COLUMNS` columns
    /// already exist. An existing column is accepted without comparing its type to
    /// `col_type`.
    fn ensure_column(&mut self, key: &str, col_type: &ColumnType) -> bool {
        if self.store.get_column(key).is_some() {
            return true;
        }
        if self.uncolumnized.contains(key) {
            return false;
        }
        if self.store.column_names().count() >= MAX_MIRROR_COLUMNS {
            self.uncolumnized.insert(key.to_string());
            return false;
        }
        self.store.add_column_backfilled(key, col_type);
        true
    }

    /// True when the store holds more than `BLOAT_MIN_ROWS` rows and fewer than half of
    /// them are live.
    fn is_bloated(&self) -> bool {
        let total = self.store.row_count();
        total > BLOAT_MIN_ROWS && self.live.len().saturating_mul(2) < total as u64
    }

    /// Compacts the state when `config` says the store's deleted-row count warrants it.
    fn auto_vacuum_if_due(&mut self, config: &AutoVacuumConfig) {
        if config.should_trigger(self.store.row_count(), self.store.deleted_row_count()) {
            self.vacuum_compact();
        }
    }

    /// Vacuums the store and rebuilds `row_ids`, `id_rows` and `live` so that the rows
    /// that were live before are numbered `0..n` in their previous relative order.
    ///
    /// NOTE(review): this assumes `ColumnStore::vacuum` drops exactly the tombstoned rows
    /// and keeps the survivors in order — confirm against the store implementation.
    fn vacuum_compact(&mut self) {
        self.store.vacuum(VacuumConfig::default());
        let old_row_ids = std::mem::take(&mut self.row_ids);
        let old_live = std::mem::take(&mut self.live);
        self.id_rows.clear();
        for (new_idx, old_idx) in old_live.iter().enumerate() {
            // Stop remapping if a live index has no recorded id or the new index
            // overflows `u32`.
            let (Some(&id), Ok(idx32)) =
                (old_row_ids.get(old_idx as usize), u32::try_from(new_idx))
            else {
                break;
            };
            self.row_ids.push(id);
            self.id_rows.insert(id, idx32);
            self.live.insert(idx32);
        }
    }
}
impl PayloadMirror {
    /// Discards the current mirror state, leaving the mirror unbuilt.
    pub(crate) fn invalidate(&self) {
        let mut slot = self.state.write();
        *slot = None;
    }

    /// Adds `rows` to the scan-debt counter.
    pub(crate) fn add_scan_debt(&self, rows: u64) {
        self.scan_debt.fetch_add(rows, Ordering::Relaxed);
    }

    /// Current value of the scan-debt counter.
    pub(crate) fn scan_debt(&self) -> u64 {
        self.scan_debt.load(Ordering::Relaxed)
    }

    /// Applies `points` to the mirror, if one is built.
    ///
    /// The state is dropped when any row fails to upsert (remaining points are not
    /// applied) or when, after all rows were applied, the state reports bloat.
    pub(crate) fn apply_upserts(&self, points: &[Point]) {
        let mut slot = self.state.write();
        let Some(mirror) = slot.as_mut() else {
            return;
        };
        // `all` stops at the first failed upsert, just like an explicit loop with `break`.
        let all_applied = points
            .iter()
            .all(|point| mirror.upsert_row(point.id, point.payload.as_ref()));
        if !all_applied || mirror.is_bloated() {
            *slot = None;
        }
    }

    /// Tombstones the rows for `ids` in the mirror, if one is built, then lets the state
    /// compact itself under the default auto-vacuum configuration.
    pub(crate) fn apply_deletes(&self, ids: &[u64]) {
        let mut slot = self.state.write();
        let Some(mirror) = slot.as_mut() else {
            return;
        };
        ids.iter().copied().for_each(|id| mirror.tombstone(id));
        mirror.auto_vacuum_if_due(&AutoVacuumConfig::default());
    }

    /// Evaluates `condition` against the mirror.
    ///
    /// Returns `NotBuilt` when no state is installed, `Unsupported` when the condition
    /// cannot be translated, and otherwise the point ids of the selected rows. Row
    /// indexes without an entry in `row_ids` are skipped.
    pub(crate) fn candidate_ids(&self, condition: &crate::filter::Condition) -> MirrorAnswer {
        let slot = self.state.read();
        let mirror = match slot.as_ref() {
            Some(mirror) => mirror,
            None => return MirrorAnswer::NotBuilt,
        };
        let Some(eval) = translate::condition_bitmap(mirror, condition) else {
            return MirrorAnswer::Unsupported;
        };
        let mut ids = Vec::new();
        for row_idx in eval.bits.iter() {
            if let Some(&id) = mirror.row_ids.get(row_idx as usize) {
                ids.push(id);
            }
        }
        MirrorAnswer::Ids(ids)
    }
}
impl crate::collection::types::Collection {
    /// Resolves `condition` through the payload mirror, building the mirror on demand.
    ///
    /// Returns `Some(ids)` when the mirror can answer. Returns `None` when the condition
    /// is unsupported, when the mirror is unbuilt and a build is not yet due, or when the
    /// mirror still cannot answer after a build attempt.
    pub(crate) fn mirror_candidate_ids(
        &self,
        condition: &crate::filter::Condition,
    ) -> Option<Vec<u64>> {
        match self.payload_mirror.candidate_ids(condition) {
            MirrorAnswer::Ids(ids) => Some(ids),
            MirrorAnswer::Unsupported => None,
            MirrorAnswer::NotBuilt => {
                if !self.mirror_build_due() {
                    return None;
                }
                self.build_payload_mirror();
                // Ask again; anything other than a concrete id list counts as no answer.
                if let MirrorAnswer::Ids(ids) = self.payload_mirror.candidate_ids(condition) {
                    Some(ids)
                } else {
                    None
                }
            }
        }
    }

    /// A build is due once the collection has at least `MIRROR_MIN_ROWS` points and the
    /// accumulated scan debt has reached the point count.
    fn mirror_build_due(&self) -> bool {
        let point_count = self.config.read().point_count;
        if point_count < MIRROR_MIN_ROWS {
            return false;
        }
        self.payload_mirror.scan_debt() >= point_count as u64
    }

    /// Builds the mirror from every id known to vector or payload storage, unless a
    /// state is already installed.
    ///
    /// The mirror's write lock is held for the whole build. If any row cannot be added,
    /// the build is abandoned: nothing is installed and the scan debt is left untouched.
    /// On success the scan debt is reset to zero and the new state is installed.
    pub(crate) fn build_payload_mirror(&self) {
        let mut slot = self.payload_mirror.state.write();
        if slot.is_some() {
            return;
        }
        // The vector-storage guard is a temporary released at the end of this statement.
        let vector_ids = self.vector_storage.read().ids();
        let payloads = self.payload_storage.read();
        let mut fresh = MirrorState::default();
        let mut visited: FxHashSet<u64> = FxHashSet::default();
        // Ids present in both storages are mirrored once; `insert` is false on repeats.
        let unique_ids = vector_ids
            .into_iter()
            .chain(payloads.ids())
            .filter(|id| visited.insert(*id));
        for id in unique_ids {
            // NOTE(review): a failed retrieval is treated the same as "no payload" —
            // confirm that best-effort behaviour is intended.
            let payload = payloads.retrieve(id).ok().flatten();
            if !fresh.upsert_row(id, payload.as_ref()) {
                return;
            }
        }
        drop(payloads);
        self.payload_mirror.scan_debt.store(0, Ordering::Relaxed);
        *slot = Some(fresh);
    }
}