use anda_db_utils::{UniqueVec, estimate_cbor_size};
use dashmap::DashMap;
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::json;
use std::{
collections::BTreeSet,
fmt::Debug,
hash::Hash,
io::{Read, Write},
sync::atomic::{AtomicU32, AtomicU64, Ordering},
};
use crate::{BTreeError, BoxError};
/// Concurrent index mapping field values to the primary keys of the documents
/// that contain them.
///
/// Postings are assigned to numbered buckets. Each bucket tracks an estimated
/// serialized size and a dirty flag so that only modified buckets need to be
/// written out again.
pub struct BTreeIndex<PK, FV>
where
    PK: Ord + Debug + Clone + Serialize + DeserializeOwned,
    FV: Eq + Ord + Hash + Debug + Clone + Serialize + DeserializeOwned,
{
    /// Index name; also attached to the errors this index produces.
    name: String,
    /// Configuration the index was created or loaded with.
    config: BTreeConfig,
    /// Bucket id -> `(estimated size, dirty flag, field values stored in the
    /// bucket, dirty version bumped on every modification)`.
    buckets: DashMap<u32, (usize, bool, UniqueVec<FV>, u64)>,
    /// Field value -> posting `(bucket id, update counter, document ids)`.
    postings: DashMap<FV, PostingValue<PK>>,
    /// Ordered set of every indexed field value; drives range scans.
    btree: RwLock<BTreeSet<FV>>,
    /// Name, configuration and statistics as persisted by `store_metadata`.
    metadata: RwLock<BTreeMetadata>,
    /// Highest bucket id handed out so far.
    max_bucket_id: AtomicU32,
    /// Number of queries served.
    query_count: AtomicU64,
    /// Metadata version most recently written by `store_metadata`.
    last_saved_version: AtomicU64,
    /// Number of buckets currently flagged dirty.
    dirty_bucket_count: AtomicU32,
}
/// Posting stored for one field value: `(bucket id, update counter, document ids)`.
type PostingValue<PK> = (u32, u64, UniqueVec<PK>);
/// Tunable settings of a `BTreeIndex`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BTreeConfig {
    /// Estimated bucket size at or above which additional postings are
    /// placed in a newly allocated bucket.
    pub bucket_overload_size: usize,
    /// When `false`, inserting a second, different document id for a field
    /// value that already has a posting fails with `AlreadyExists`.
    pub allow_duplicates: bool,
}
impl Default for BTreeConfig {
fn default() -> Self {
BTreeConfig {
bucket_overload_size: 1024 * 512,
allow_duplicates: true,
}
}
}
/// Persisted description of an index: its name, configuration and statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BTreeMetadata {
    /// Index name.
    pub name: String,
    /// Configuration the index was created with.
    pub config: BTreeConfig,
    /// Usage and versioning counters.
    pub stats: BTreeStats,
}
/// Counters and timestamps describing the state of an index.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BTreeStats {
    /// `now_ms` passed to the most recent effective insert.
    pub last_inserted: u64,
    /// `now_ms` passed to the most recent effective removal.
    pub last_deleted: u64,
    /// `now_ms` of the most recent successful metadata save.
    pub last_saved: u64,
    /// Incremented on every mutation; compared against the last saved
    /// version to decide whether metadata needs flushing.
    pub version: u64,
    /// Number of postings (distinct field values) at snapshot time.
    pub num_elements: u64,
    /// Number of queries served.
    pub query_count: u64,
    /// Number of effective insertions.
    pub insert_count: u64,
    /// Number of effective removals.
    pub delete_count: u64,
    /// Highest bucket id in use.
    pub max_bucket_id: u32,
}
/// Owned form of the serialized index header, used when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BTreeIndexOwned {
    /// Metadata read from storage.
    metadata: BTreeMetadata,
}
/// Borrowed form of the serialized index header, used when serializing so the
/// metadata does not have to be moved into the wrapper.
#[derive(Serialize)]
struct BTreeIndexRef<'a> {
    /// Metadata to write.
    metadata: &'a BTreeMetadata,
}
/// Owned form of a serialized bucket, produced when a bucket is loaded.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(
    serialize = "PK: Serialize, FV: Serialize",
    deserialize = "PK: DeserializeOwned, FV: DeserializeOwned"
))]
struct BucketOwned<PK, FV>
where
    PK: Eq + Ord + Hash + Clone,
    FV: Eq + Ord + Hash + Clone,
{
    /// Postings stored in the bucket, keyed by field value (serialized as `p`).
    #[serde(rename = "p")]
    postings: FxHashMap<FV, PostingValue<PK>>,
}
/// Borrowed form of a serialized bucket, used when writing a bucket so that
/// postings are serialized straight from the live map entries.
#[derive(Serialize)]
struct BucketRef<'a, PK, FV>
where
    PK: Eq + Ord + Hash + Clone + Serialize,
    FV: Eq + Ord + Hash + Clone + Serialize,
{
    /// Borrowed postings of the bucket, keyed by field value (serialized as `p`).
    #[serde(rename = "p")]
    postings: &'a FxHashMap<&'a FV, dashmap::mapref::one::Ref<'a, FV, PostingValue<PK>>>,
}
/// Query over the ordered field values of an index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RangeQuery<FV> {
    /// Keys equal to the value.
    Eq(FV),
    /// Keys strictly greater than the value.
    Gt(FV),
    /// Keys greater than or equal to the value.
    Ge(FV),
    /// Keys strictly less than the value.
    Lt(FV),
    /// Keys less than or equal to the value.
    Le(FV),
    /// Keys within the inclusive range `[start, end]`; matches nothing when
    /// `start > end`.
    Between(FV, FV),
    /// Keys equal to any of the listed values.
    Include(Vec<FV>),
    /// Keys matching at least one sub-query.
    Or(Vec<Box<RangeQuery<FV>>>),
    /// Keys matching every sub-query; an empty list matches nothing.
    And(Vec<Box<RangeQuery<FV>>>),
    /// Keys that do not match the sub-query.
    Not(Box<RangeQuery<FV>>),
}
impl<FV> RangeQuery<FV> {
    /// Converts a query over `FV1` keys into the same query over `FV` keys.
    ///
    /// Every key is converted through `TryFrom`, recursing into the
    /// sub-queries of `And`, `Or` and `Not`.
    ///
    /// # Errors
    ///
    /// Returns the first key conversion error encountered.
    pub fn try_convert_from<FV1>(value: RangeQuery<FV1>) -> Result<Self, BoxError>
    where
        FV: Ord,
        FV: TryFrom<FV1, Error = BoxError>,
    {
        // Converts and re-boxes a list of sub-queries, stopping at the first failure.
        let convert_all = |queries: Vec<Box<RangeQuery<FV1>>>| {
            queries
                .into_iter()
                .map(|query| Self::try_convert_from(*query).map(Box::new))
                .collect::<Result<Vec<Box<Self>>, BoxError>>()
        };

        let converted = match value {
            RangeQuery::Eq(key) => Self::Eq(FV::try_from(key)?),
            RangeQuery::Gt(key) => Self::Gt(FV::try_from(key)?),
            RangeQuery::Ge(key) => Self::Ge(FV::try_from(key)?),
            RangeQuery::Lt(key) => Self::Lt(FV::try_from(key)?),
            RangeQuery::Le(key) => Self::Le(FV::try_from(key)?),
            RangeQuery::Between(start_key, end_key) => {
                let start = FV::try_from(start_key)?;
                let end = FV::try_from(end_key)?;
                Self::Between(start, end)
            }
            RangeQuery::Include(keys) => {
                let mut converted_keys = Vec::with_capacity(keys.len());
                for key in keys {
                    converted_keys.push(FV::try_from(key)?);
                }
                Self::Include(converted_keys)
            }
            RangeQuery::And(queries) => Self::And(convert_all(queries)?),
            RangeQuery::Or(queries) => Self::Or(convert_all(queries)?),
            RangeQuery::Not(query) => Self::Not(Box::new(Self::try_convert_from(*query)?)),
        };
        Ok(converted)
    }
}
impl<PK, FV> BTreeIndex<PK, FV>
where
PK: Ord + Eq + Hash + Debug + Clone + Serialize + DeserializeOwned,
FV: Ord + Eq + Hash + Debug + Clone + Serialize + DeserializeOwned,
{
/// Bumps the bucket's dirty version and, if the bucket was clean, flags it
/// dirty and counts it in `dirty_bucket_count`.
fn mark_bucket_dirty(&self, bucket: &mut (usize, bool, UniqueVec<FV>, u64)) {
    let (_, dirty, _, version) = bucket;
    *version = version.wrapping_add(1);
    if *dirty {
        // Already counted as dirty; only the version needed to move.
        return;
    }
    *dirty = true;
    self.dirty_bucket_count.fetch_add(1, Ordering::Release);
}
/// Drops `field_value` from the ordered key set unless a posting for it
/// exists at the time of the check.
fn remove_btree_key_if_posting_absent(&self, field_value: &FV) {
    // The write lock is taken before postings are inspected, so the check and
    // the removal happen under the same lock.
    let mut ordered_keys = self.btree.write();
    let posting_exists = self.postings.contains_key(field_value);
    if posting_exists {
        return;
    }
    ordered_keys.remove(field_value);
}
/// Ranks a query by how suitable it is as the seed of an `And` evaluation;
/// lower is better.
///
/// Rank 0 covers single-key and trivially empty queries, 1 explicit key
/// lists, 2 bounded ranges, 3 half-open ranges, 4 `Or` and 5 `Not`. An `And`
/// takes the lowest rank among its sub-queries (0 when it has none).
fn range_query_seed_rank(query: &RangeQuery<FV>) -> u8 {
    match query {
        RangeQuery::Eq(_) => 0,
        RangeQuery::Between(start_key, end_key) => {
            if start_key > end_key {
                0
            } else {
                2
            }
        }
        RangeQuery::Include(keys) => {
            if keys.is_empty() {
                0
            } else {
                1
            }
        }
        RangeQuery::Gt(_) | RangeQuery::Ge(_) | RangeQuery::Lt(_) | RangeQuery::Le(_) => 3,
        RangeQuery::And(queries) => {
            let mut best: Option<u8> = None;
            for sub in queries {
                let rank = Self::range_query_seed_rank(sub);
                best = Some(best.map_or(rank, |current| current.min(rank)));
            }
            best.unwrap_or(0)
        }
        RangeQuery::Or(_) => 4,
        RangeQuery::Not(_) => 5,
    }
}
/// Evaluates `query` against a single key without touching the index.
///
/// An empty `And` matches nothing, as does an empty `Or` and a `Between`
/// whose start is greater than its end.
fn range_key_matches_query(key: &FV, query: &RangeQuery<FV>) -> bool {
    match query {
        RangeQuery::Eq(value) => key == value,
        RangeQuery::Gt(start_key) => key > start_key,
        RangeQuery::Ge(start_key) => key >= start_key,
        RangeQuery::Lt(end_key) => key < end_key,
        RangeQuery::Le(end_key) => key <= end_key,
        RangeQuery::Between(start_key, end_key) => {
            let well_formed = start_key <= end_key;
            well_formed && key >= start_key && key <= end_key
        }
        RangeQuery::Include(keys) => keys.contains(key),
        RangeQuery::Or(queries) => {
            for sub in queries {
                if Self::range_key_matches_query(key, sub) {
                    return true;
                }
            }
            false
        }
        RangeQuery::And(queries) => {
            if queries.is_empty() {
                return false;
            }
            for sub in queries {
                if !Self::range_key_matches_query(key, sub) {
                    return false;
                }
            }
            true
        }
        RangeQuery::Not(inner) => !Self::range_key_matches_query(key, inner),
    }
}
pub fn new(name: String, config: Option<BTreeConfig>) -> Self {
let config = config.unwrap_or_default();
let stats = BTreeStats {
version: 1,
..Default::default()
};
BTreeIndex {
name: name.clone(),
config: config.clone(),
postings: DashMap::new(),
buckets: DashMap::from_iter(vec![(0, (0, false, UniqueVec::default(), 0))]),
btree: RwLock::new(BTreeSet::new()),
metadata: RwLock::new(BTreeMetadata {
name,
config,
stats,
}),
max_bucket_id: AtomicU32::new(0),
query_count: AtomicU64::new(0),
last_saved_version: AtomicU64::new(0),
dirty_bucket_count: AtomicU32::new(0),
}
}
/// Loads a complete index: metadata from `metadata`, then every bucket
/// through the callback `f`.
///
/// `f` receives a bucket id and returns that bucket's serialized bytes, or
/// `None` when the bucket has no stored data.
///
/// # Errors
///
/// Propagates any error from `load_metadata` or `load_buckets`.
pub async fn load_all<R: Read, F>(metadata: R, f: F) -> Result<Self, BTreeError>
where
    F: AsyncFnMut(u32) -> Result<Option<Vec<u8>>, BoxError>,
{
    let mut loaded = Self::load_metadata(metadata)?;
    let outcome = loaded.load_buckets(f).await;
    outcome.map(|()| loaded)
}
pub fn load_metadata<R: Read>(r: R) -> Result<Self, BTreeError> {
let index: BTreeIndexOwned =
ciborium::from_reader(r).map_err(|err| BTreeError::Serialization {
name: "unknown".to_string(),
source: err.into(),
})?;
let max_bucket_id = AtomicU32::new(index.metadata.stats.max_bucket_id);
let query_count = AtomicU64::new(index.metadata.stats.query_count);
let last_saved_version = AtomicU64::new(index.metadata.stats.version);
Ok(BTreeIndex {
name: index.metadata.name.clone(),
config: index.metadata.config.clone(),
postings: DashMap::with_capacity(index.metadata.stats.num_elements as usize),
buckets: DashMap::from_iter(vec![(0, (0, false, UniqueVec::default(), 0))]),
btree: RwLock::new(BTreeSet::new()),
metadata: RwLock::new(index.metadata),
query_count,
max_bucket_id,
last_saved_version,
dirty_bucket_count: AtomicU32::new(0),
})
}
/// Loads every bucket from id `0` through the current maximum bucket id.
///
/// `f` is called once per bucket id; `None` means the bucket has no stored
/// data and is skipped. Each decoded bucket contributes its keys to the
/// ordered key set, registers a clean bucket entry sized by the encoded
/// length, and adds its postings to the index.
///
/// # Errors
///
/// Returns `BTreeError::Generic` when `f` fails and
/// `BTreeError::Serialization` when a bucket cannot be decoded. Buckets
/// loaded before the failure remain in the index.
pub async fn load_buckets<F>(&mut self, mut f: F) -> Result<(), BTreeError>
where
    F: AsyncFnMut(u32) -> Result<Option<Vec<u8>>, BoxError>,
{
    let last_bucket_id = self.max_bucket_id.load(Ordering::Relaxed);
    for bucket_id in 0..=last_bucket_id {
        let fetched = f(bucket_id).await.map_err(|err| BTreeError::Generic {
            name: self.name.clone(),
            source: err,
        })?;
        let Some(bytes) = fetched else {
            continue;
        };

        let decoded: BucketOwned<PK, FV> =
            ciborium::from_reader(bytes.as_slice()).map_err(|err| BTreeError::Serialization {
                name: self.name.clone(),
                source: err.into(),
            })?;

        let field_values: Vec<FV> = decoded.postings.keys().cloned().collect();
        self.btree.write().extend(field_values.iter().cloned());
        self.buckets
            .insert(bucket_id, (bytes.len(), false, field_values.into(), 0));
        self.postings.extend(decoded.postings);
    }
    Ok(())
}
/// Returns the number of postings, i.e. distinct field values currently indexed.
pub fn len(&self) -> usize {
self.postings.len()
}
/// Returns `true` when the index holds no postings.
pub fn is_empty(&self) -> bool {
self.postings.is_empty()
}
/// Returns the index name.
pub fn name(&self) -> &str {
&self.name
}
/// Returns the `allow_duplicates` setting of this index's configuration.
pub fn allow_duplicates(&self) -> bool {
self.config.allow_duplicates
}
/// Returns a snapshot of the metadata with the live counters (element
/// count, query count, maximum bucket id) filled in.
pub fn metadata(&self) -> BTreeMetadata {
    let mut snapshot = self.metadata.read().clone();
    let stats = &mut snapshot.stats;
    stats.num_elements = self.postings.len() as u64;
    stats.query_count = self.query_count.load(Ordering::Relaxed);
    stats.max_bucket_id = self.max_bucket_id.load(Ordering::Relaxed);
    snapshot
}
pub fn stats(&self) -> BTreeStats {
let mut stats = { self.metadata.read().stats.clone() };
stats.num_elements = self.postings.len() as u64;
stats.query_count = self.query_count.load(Ordering::Relaxed);
stats.max_bucket_id = self.max_bucket_id.load(Ordering::Relaxed);
stats
}
/// Associates `doc_id` with `field_value`.
///
/// Returns `Ok(true)` when the index changed and `Ok(false)` when `doc_id`
/// was already recorded for `field_value` (or, in the migration path, when the
/// posting disappeared before it could be moved). On a change the metadata
/// version and insert count are incremented and `now_ms` is stored as
/// `last_inserted`.
///
/// # Errors
///
/// Returns `BTreeError::AlreadyExists` when duplicates are disallowed and
/// `field_value` already has a posting that does not contain `doc_id`.
pub fn insert(&self, doc_id: PK, field_value: FV, now_ms: u64) -> Result<bool, BTreeError> {
// New postings go to the bucket with the highest id; make sure it exists.
let bucket = self.max_bucket_id.load(Ordering::Relaxed);
self.buckets
.entry(bucket)
.or_insert_with(|| (0, false, UniqueVec::default(), 0));
let mut is_new = false;
let mut size_increase = 0;
let mut previous_posting_size = 0;
let mut target_bucket = bucket;
match self.postings.entry(field_value.clone()) {
dashmap::Entry::Occupied(mut entry) => {
let posting = entry.get_mut();
// An existing posting stays accounted to the bucket it already lives in.
target_bucket = posting.0;
let posting_size_before_update = estimate_cbor_size(&*posting) + 2;
if !self.config.allow_duplicates && !posting.2.contains(&doc_id) {
return Err(BTreeError::AlreadyExists {
name: self.name.clone(),
id: json!(doc_id),
value: json!(field_value),
});
}
// `push` reports whether the id was actually added; a repeat insert leaves
// `size_increase` at 0 and therefore changes nothing below.
if posting.2.push(doc_id.clone()) {
size_increase = estimate_cbor_size(&doc_id) + 2;
previous_posting_size = posting_size_before_update;
posting.1 += 1; }
}
dashmap::Entry::Vacant(entry) => {
let posting = (bucket, 1, vec![doc_id].into());
size_increase = estimate_cbor_size(&posting) + 2;
entry.insert(posting);
is_new = true;
}
};
if is_new {
self.btree.write().insert(field_value.clone());
}
// Stays 0 unless the posting has to move to a freshly allocated bucket.
let mut new_bucket = 0;
if size_increase > 0 {
let mut b = self
.buckets
.entry(target_bucket)
.or_insert_with(|| (0, false, UniqueVec::default(), 0));
// An empty bucket always accepts the posting, whatever its size.
if b.2.is_empty() || b.0 + size_increase < self.config.bucket_overload_size {
b.0 += size_increase;
self.mark_bucket_dirty(&mut b);
b.2.push(field_value.clone());
} else {
// The target bucket would overflow: move the whole posting to a new bucket.
let mut source_size_decrease = 0;
new_bucket = self.max_bucket_id.fetch_add(1, Ordering::Relaxed) + 1;
{
if let Some(mut posting) = self.postings.get_mut(&field_value) {
posting.0 = new_bucket;
let migrated_posting_size = estimate_cbor_size(&posting) + 2;
// The source bucket only ever accounted for the posting as it was
// before this insert.
source_size_decrease = if previous_posting_size > 0 {
previous_posting_size
} else {
migrated_posting_size
};
size_increase = migrated_posting_size;
} else {
// The posting vanished in the meantime; abandon the migration.
size_increase = 0;
new_bucket = 0;
}
}
if b.2.swap_remove_if(|k| &field_value == k).is_some() {
b.0 = b.0.saturating_sub(source_size_decrease);
self.mark_bucket_dirty(&mut b);
}
}
}
// The guard on the source bucket has been released by this point.
if new_bucket > 0 {
match self.buckets.entry(new_bucket) {
dashmap::Entry::Vacant(entry) => {
// The new bucket is created dirty, so it is counted here directly.
entry.insert((size_increase, true, vec![field_value].into(), 1));
self.dirty_bucket_count.fetch_add(1, Ordering::Release);
}
dashmap::Entry::Occupied(mut entry) => {
let bucket_entry = entry.get_mut();
bucket_entry.0 += size_increase;
self.mark_bucket_dirty(bucket_entry);
bucket_entry.2.push(field_value);
}
}
}
if size_increase > 0 {
self.update_metadata(|m| {
m.stats.version += 1;
m.stats.last_inserted = now_ms;
m.stats.insert_count += 1;
});
}
Ok(size_increase > 0)
}
/// Removes the association between `doc_id` and `field_value`.
///
/// Returns `true` when `doc_id` was present in the posting and has been
/// removed. A posting left without document ids is deleted together with its
/// key in the ordered key set and its entry in the owning bucket. On removal
/// the metadata version and delete count are incremented and `now_ms` is
/// stored as `last_deleted`.
pub fn remove(&self, doc_id: PK, field_value: FV, now_ms: u64) -> bool {
let mut removed = false;
let mut doc_size_decrease = 0;
let mut full_size_decrease = 0;
let mut posting_empty = false;
let mut bucket_id = 0;
{
// Scoped so the guard on the posting is released before the entry API is
// used on the same map below.
if let Some(mut posting) = self.postings.get_mut(&field_value) {
bucket_id = posting.0;
let prev_posting_size = estimate_cbor_size(&*posting) + 2;
if posting.2.swap_remove_if(|id| id == &doc_id).is_some() {
removed = true;
posting.1 += 1; posting_empty = posting.2.is_empty();
doc_size_decrease = estimate_cbor_size(&doc_id) + 2;
full_size_decrease = prev_posting_size;
}
}
}
if removed {
let mut entry_removed = false;
if posting_empty {
// Re-check emptiness under the entry guard: ids may have been added since
// the guard above was released.
if let dashmap::Entry::Occupied(entry) = self.postings.entry(field_value.clone())
&& entry.get().2.is_empty()
{
entry.remove();
entry_removed = true;
}
if entry_removed {
self.remove_btree_key_if_posting_absent(&field_value);
}
}
// A deleted posting frees its whole previous size; otherwise only the
// removed document id is subtracted.
let size_decrease = if entry_removed {
full_size_decrease
} else {
doc_size_decrease
};
if let Some(mut b) = self.buckets.get_mut(&bucket_id) {
b.0 = b.0.saturating_sub(size_decrease);
self.mark_bucket_dirty(&mut b);
if entry_removed {
// Keep the bucket's key if a posting for this value exists again and is
// assigned to this same bucket.
let remove_from_bucket = match self.postings.get(&field_value) {
Some(posting) => posting.0 != bucket_id,
None => true,
};
if remove_from_bucket {
b.2.swap_remove_if(|k| &field_value == k);
}
}
}
self.update_metadata(|m| {
m.stats.version += 1;
m.stats.last_deleted = now_ms;
m.stats.delete_count += 1;
});
}
removed
}
/// Associates `doc_id` with every value in `field_values`.
///
/// Returns the number of field values for which `doc_id` was newly recorded.
/// When at least one was recorded, the metadata version is incremented once,
/// `insert_count` grows by that number and `now_ms` is stored as
/// `last_inserted`.
///
/// # Errors
///
/// Returns `BTreeError::AlreadyExists` when duplicates are disallowed and any
/// value already has a posting that does not contain `doc_id`. The up-front
/// check rejects such input before anything is modified; the same check is
/// repeated per value while inserting.
pub fn insert_array(
&self,
doc_id: PK,
field_values: Vec<FV>,
now_ms: u64,
) -> Result<usize, BTreeError> {
if field_values.is_empty() {
return Ok(0);
}
let mut inserted_count = 0;
// Bucket id -> (accumulated size increase, field values touched in that bucket).
let mut bucket_updates: FxHashMap<u32, (usize, FxHashSet<FV>)> = FxHashMap::default();
let mut new_btree_values = Vec::new();
if !self.config.allow_duplicates {
// Validate every value before modifying anything.
for field_value in &field_values {
if let Some(posting) = self.postings.get(field_value)
&& !posting.2.contains(&doc_id)
{
return Err(BTreeError::AlreadyExists {
name: self.name.clone(),
id: json!(doc_id),
value: json!(field_value),
});
}
}
}
// New postings go to the bucket with the highest id; make sure it exists.
let bucket_id = self.max_bucket_id.load(Ordering::Relaxed);
self.buckets
.entry(bucket_id)
.or_insert_with(|| (0, false, UniqueVec::default(), 0));
// Phase 1: update the postings and collect per-bucket size deltas.
for field_value in field_values {
let mut size_increase = 0;
let mut target_bucket_id = bucket_id;
match self.postings.entry(field_value.clone()) {
dashmap::Entry::Occupied(mut entry) => {
let posting = entry.get_mut();
target_bucket_id = posting.0;
if !self.config.allow_duplicates && !posting.2.contains(&doc_id) {
return Err(BTreeError::AlreadyExists {
name: self.name.clone(),
id: json!(doc_id),
value: json!(field_value),
});
}
if posting.2.push(doc_id.clone()) {
size_increase = estimate_cbor_size(&doc_id) + 2;
posting.1 += 1; }
}
dashmap::Entry::Vacant(entry) => {
let posting = (bucket_id, 1, vec![doc_id.clone()].into());
size_increase = estimate_cbor_size(&posting) + 2;
entry.insert(posting);
new_btree_values.push(field_value.clone());
}
};
if size_increase > 0 {
let bucket_entry = bucket_updates
.entry(target_bucket_id)
.or_insert_with(|| (0, FxHashSet::default()));
bucket_entry.0 += size_increase;
bucket_entry.1.insert(field_value);
inserted_count += 1;
}
}
if !new_btree_values.is_empty() {
self.btree.write().extend(new_btree_values);
}
// Phase 2: apply the size deltas and decide which new values must move to
// another bucket. Entries are `(source bucket id, field value, posting size)`.
let mut field_values_to_migrate: Vec<(u32, FV, usize)> = Vec::new();
for (bucket_id, (size_delta, field_values)) in bucket_updates {
let mut bucket_entry = self
.buckets
.entry(bucket_id)
.or_insert_with(|| (0, false, UniqueVec::default(), 0));
self.mark_bucket_dirty(&mut bucket_entry);
bucket_entry.0 = bucket_entry.0.saturating_add(size_delta);
for fv in field_values {
// Values already listed in the bucket only needed the size adjustment.
if bucket_entry.2.contains(&fv) {
continue;
}
let fv_size = if let Some(posting) = self.postings.get(&fv) {
estimate_cbor_size(&posting) + 2
} else {
continue;
};
if bucket_entry.2.is_empty() || bucket_entry.0 < self.config.bucket_overload_size {
bucket_entry.2.push(fv);
} else {
// The bucket is over the threshold: undo this value's share and queue it
// for migration.
bucket_entry.0 = bucket_entry.0.saturating_sub(fv_size);
field_values_to_migrate.push((bucket_id, fv, fv_size));
}
}
}
// Phase 3: move the queued values into newly allocated buckets.
if !field_values_to_migrate.is_empty() {
let mut next_bucket_id = self.max_bucket_id.fetch_add(1, Ordering::Relaxed) + 1;
{
self.buckets
.entry(next_bucket_id)
.or_insert_with(|| (0, false, UniqueVec::default(), 0));
}
for (old_bucket_id, field_value, size) in field_values_to_migrate {
if let Some(mut posting) = self.postings.get_mut(&field_value) {
posting.0 = next_bucket_id;
}
if let Some(mut ob) = self.buckets.get_mut(&old_bucket_id)
&& ob.2.swap_remove_if(|k| &field_value == k).is_some()
{
ob.0 = ob.0.saturating_sub(size);
self.mark_bucket_dirty(&mut ob);
}
let mut new_bucket = false;
if let Some(mut nb) = self.buckets.get_mut(&next_bucket_id) {
if nb.2.is_empty() || nb.0 + size < self.config.bucket_overload_size {
nb.0 += size;
self.mark_bucket_dirty(&mut nb);
nb.2.push(field_value.clone());
} else {
new_bucket = true;
}
}
// The current destination is full as well: allocate another bucket and
// place the value there instead.
if new_bucket {
next_bucket_id = self.max_bucket_id.fetch_add(1, Ordering::Relaxed) + 1;
if let Some(mut posting) = self.postings.get_mut(&field_value) {
posting.0 = next_bucket_id;
}
match self.buckets.entry(next_bucket_id) {
dashmap::Entry::Vacant(entry) => {
// The new bucket is created dirty, so it is counted here directly.
entry.insert((size, true, vec![field_value].into(), 1));
self.dirty_bucket_count.fetch_add(1, Ordering::Release);
}
dashmap::Entry::Occupied(mut entry) => {
let bucket_entry = entry.get_mut();
bucket_entry.0 += size;
self.mark_bucket_dirty(bucket_entry);
bucket_entry.2.push(field_value);
}
}
}
}
}
if inserted_count > 0 {
self.update_metadata(|m| {
m.stats.version += 1;
m.stats.last_inserted = now_ms;
m.stats.insert_count += inserted_count as u64;
});
}
Ok(inserted_count)
}
/// Removes the association between `doc_id` and every value in `field_values`.
///
/// Returns the number of field values from whose posting `doc_id` was removed.
/// Postings left without document ids are deleted together with their key in
/// the ordered key set and their entry in the owning bucket. When anything was
/// removed, the metadata version is incremented once, `delete_count` grows by
/// the returned number and `now_ms` is stored as `last_deleted`.
pub fn remove_array(&self, doc_id: PK, field_values: Vec<FV>, now_ms: u64) -> usize {
if field_values.is_empty() {
return 0;
}
let mut removed_count = 0;
// Entries are `(field value, bucket id, size of the removed id, previous
// posting size, whether the posting became empty)`.
let mut pending_removals = Vec::new();
// Phase 1: strip `doc_id` from each posting.
for field_value in field_values {
let mut removed = false;
let mut doc_size_decrease = 0;
let mut full_size_decrease = 0;
let mut posting_empty = false;
let mut bucket_id = 0;
if let Some(mut posting) = self.postings.get_mut(&field_value) {
bucket_id = posting.0;
let prev_posting_size = estimate_cbor_size(&*posting) + 2;
if posting.2.swap_remove_if(|id| id == &doc_id).is_some() {
removed = true;
posting.1 += 1; posting_empty = posting.2.is_empty();
doc_size_decrease = estimate_cbor_size(&doc_id) + 2;
full_size_decrease = prev_posting_size;
}
}
if removed {
pending_removals.push((
field_value,
bucket_id,
doc_size_decrease,
full_size_decrease,
posting_empty,
));
removed_count += 1;
}
}
// Phase 2: delete postings that are still empty and total the size change
// per bucket.
let mut entries_removed = FxHashSet::default();
let mut bucket_updates: FxHashMap<u32, (usize, FxHashSet<FV>)> = FxHashMap::default();
for (field_value, bucket_id, doc_size_decrease, full_size_decrease, posting_empty) in
pending_removals
{
let mut entry_removed = false;
// Re-check emptiness under the entry guard: ids may have been added since
// phase 1 released its guard.
if posting_empty
&& let dashmap::Entry::Occupied(entry) = self.postings.entry(field_value.clone())
&& entry.get().2.is_empty()
{
entry.remove();
entry_removed = true;
entries_removed.insert(field_value.clone());
}
// A deleted posting frees its whole previous size; otherwise only the
// removed document id is subtracted.
let size_decrease = if entry_removed {
full_size_decrease
} else {
doc_size_decrease
};
let bucket_entry = bucket_updates
.entry(bucket_id)
.or_insert_with(|| (0, FxHashSet::default()));
bucket_entry.0 += size_decrease;
bucket_entry.1.insert(field_value);
}
if !entries_removed.is_empty() {
for value in &entries_removed {
self.remove_btree_key_if_posting_absent(value);
}
}
// Phase 3: apply the size changes and drop deleted values from their buckets.
for (bucket_id, (size_decrease, field_values)) in bucket_updates {
if let Some(mut bucket) = self.buckets.get_mut(&bucket_id) {
bucket.0 = bucket.0.saturating_sub(size_decrease);
self.mark_bucket_dirty(&mut bucket);
for fv in &field_values {
if entries_removed.contains(fv) {
// Keep the bucket's key if a posting for this value exists again and is
// assigned to this same bucket.
let remove_from_bucket = match self.postings.get(fv) {
Some(posting) => posting.0 != bucket_id,
None => true,
};
if remove_from_bucket {
bucket.2.swap_remove_if(|k| k == fv);
}
}
}
}
}
if removed_count > 0 {
self.update_metadata(|m| {
m.stats.version += 1;
m.stats.last_deleted = now_ms;
m.stats.delete_count += removed_count as u64;
});
}
removed_count
}
/// Replaces the field values recorded for `doc_id`.
///
/// Values present only in `new_field_values` are inserted first; values
/// present only in `old_field_values` are removed afterwards. Values found
/// in both lists are left untouched.
///
/// Returns `(removed, inserted)` counts.
///
/// # Errors
///
/// Propagates the error from `insert_array`; in that case nothing is removed.
pub fn batch_update(
    &self,
    doc_id: PK,
    old_field_values: Vec<FV>,
    new_field_values: Vec<FV>,
    now_ms: u64,
) -> Result<(usize, usize), BTreeError> {
    let previous: FxHashSet<FV> = old_field_values.into_iter().collect();
    let current: FxHashSet<FV> = new_field_values.into_iter().collect();

    let added: Vec<FV> = current.difference(&previous).cloned().collect();
    let dropped: Vec<FV> = previous.difference(&current).cloned().collect();

    let mut inserted = 0;
    if !added.is_empty() {
        inserted = self.insert_array(doc_id.clone(), added, now_ms)?;
    }

    let mut removed = 0;
    if !dropped.is_empty() {
        removed = self.remove_array(doc_id, dropped, now_ms);
    }

    Ok((removed, inserted))
}
/// Looks up the posting of `field_value` and passes its document ids to `f`.
///
/// Returns `None` when the value is not indexed, otherwise whatever `f`
/// returns. The query counter is incremented in either case. `f` runs while
/// the posting's map entry is borrowed.
pub fn query_with<F, R>(&self, field_value: &FV, f: F) -> Option<R>
where
    F: FnOnce(&Vec<PK>) -> Option<R>,
{
    self.query_count.fetch_add(1, Ordering::Relaxed);
    let posting = self.postings.get(field_value)?;
    f(&posting.2)
}
/// Runs `query` and feeds every matching key with its document ids to `f`.
///
/// `f` returns `(continue, items)`: the items are collected into the result
/// and scanning stops once `continue` is `false`. Keys are visited in
/// ascending order, except for `Lt` and `Le`, which visit keys in descending
/// order (so an early stop keeps the keys closest to the bound) and then
/// return the collected groups in ascending key order.
///
/// Returns an empty vector, without counting the query, when the index has
/// no postings.
pub fn range_query_with<F, R>(&self, query: RangeQuery<FV>, mut f: F) -> Vec<R>
where
    F: FnMut(&FV, &Vec<PK>) -> (bool, Vec<R>),
{
    use std::ops::Bound;

    let mut results = Vec::new();
    if self.postings.is_empty() {
        return results;
    }
    self.query_count.fetch_add(1, Ordering::Relaxed);

    // Invokes the callback for `key` when it still has a posting.
    let mut visit = |key: &FV| self.postings.get(key).map(|posting| f(key, &posting.2));

    // Forward scan: append each batch and return as soon as the callback
    // asks to stop.
    macro_rules! scan_forward {
        ($keys:expr) => {{
            for key in $keys {
                if let Some((keep_going, batch)) = visit(key) {
                    results.extend(batch);
                    if !keep_going {
                        return results;
                    }
                }
            }
            results
        }};
    }

    // Backward scan: keep the non-empty batches per key, then emit them in
    // ascending key order.
    macro_rules! scan_backward {
        ($keys:expr) => {{
            let mut batches: Vec<Vec<R>> = Vec::new();
            for key in $keys {
                if let Some((keep_going, batch)) = visit(key) {
                    if !batch.is_empty() {
                        batches.push(batch);
                    }
                    if !keep_going {
                        break;
                    }
                }
            }
            batches.into_iter().rev().flatten().collect()
        }};
    }

    match query {
        RangeQuery::Eq(key) => scan_forward!(std::iter::once(&key)),
        RangeQuery::Gt(start_key) => scan_forward!(
            self.btree
                .read()
                .range((Bound::Excluded(start_key), Bound::Unbounded))
        ),
        RangeQuery::Ge(start_key) => scan_forward!(self.btree.read().range(start_key..)),
        RangeQuery::Lt(end_key) => scan_backward!(self.btree.read().range(..end_key).rev()),
        RangeQuery::Le(end_key) => scan_backward!(self.btree.read().range(..=end_key).rev()),
        RangeQuery::Between(start_key, end_key) => {
            // An inverted range matches nothing.
            if start_key > end_key {
                return results;
            }
            scan_forward!(self.btree.read().range(start_key..=end_key))
        }
        RangeQuery::Include(keys) => {
            // Sorting and de-duplicating gives a deterministic ascending visit order.
            let wanted: BTreeSet<FV> = keys.into_iter().collect();
            scan_forward!(wanted.iter())
        }
        combined @ (RangeQuery::And(_) | RangeQuery::Or(_)) => {
            let keys = self.range_keys(combined);
            scan_forward!(keys.iter())
        }
        RangeQuery::Not(inner) => {
            let excluded: FxHashSet<FV> = self.range_keys(*inner).into_iter().collect();
            scan_forward!(
                self.btree
                    .read()
                    .iter()
                    .filter(|key| !excluded.contains(*key))
            )
        }
    }
}
/// Lists indexed field values in ascending order.
///
/// With a `cursor`, only keys strictly greater than it are returned; with a
/// `limit`, at most that many keys are returned.
pub fn keys(&self, cursor: Option<FV>, limit: Option<usize>) -> Vec<FV> {
    let lower = match cursor {
        Some(after) => std::ops::Bound::Excluded(after),
        None => std::ops::Bound::Unbounded,
    };
    let ordered_keys = self.btree.read();
    let remaining = ordered_keys
        .range((lower, std::ops::Bound::Unbounded))
        .cloned();
    let page: Vec<FV> = match limit {
        Some(max) => remaining.take(max).collect(),
        None => remaining.collect(),
    };
    page
}
/// Resolves `query` to the matching keys of the ordered key set, in
/// ascending order.
///
/// `And` evaluates the sub-query with the best seed rank against the key set
/// and filters the result with the remaining sub-queries; an empty `And`
/// matches nothing.
fn range_keys(&self, query: RangeQuery<FV>) -> Vec<FV> {
    use std::ops::Bound;

    match query {
        RangeQuery::Eq(key) => {
            if self.btree.read().contains(&key) {
                vec![key]
            } else {
                Vec::new()
            }
        }
        RangeQuery::Gt(start_key) => {
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = ordered_keys
                .range((Bound::Excluded(start_key), Bound::Unbounded))
                .cloned()
                .collect();
            found
        }
        RangeQuery::Ge(start_key) => {
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = ordered_keys.range(start_key..).cloned().collect();
            found
        }
        RangeQuery::Lt(end_key) => {
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = ordered_keys.range(..end_key).cloned().collect();
            found
        }
        RangeQuery::Le(end_key) => {
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = ordered_keys.range(..=end_key).cloned().collect();
            found
        }
        RangeQuery::Between(start_key, end_key) => {
            // An inverted range matches nothing.
            if start_key > end_key {
                return Vec::new();
            }
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = ordered_keys.range(start_key..=end_key).cloned().collect();
            found
        }
        RangeQuery::Include(keys) => {
            let wanted: BTreeSet<FV> = keys.into_iter().collect();
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = wanted
                .into_iter()
                .filter(|key| ordered_keys.contains(key))
                .collect();
            found
        }
        RangeQuery::And(mut queries) => {
            // Pick the sub-query expected to yield the smallest candidate set.
            let best = queries
                .iter()
                .enumerate()
                .min_by_key(|(_, query)| Self::range_query_seed_rank(query))
                .map(|(index, _)| index);
            let Some(seed_index) = best else {
                return Vec::new();
            };
            let seed_query = *queries.swap_remove(seed_index);
            let mut survivors: BTreeSet<FV> = self.range_keys(seed_query).into_iter().collect();
            for filter in &queries {
                survivors.retain(|key| Self::range_key_matches_query(key, filter));
                if survivors.is_empty() {
                    return Vec::new();
                }
            }
            survivors.into_iter().collect()
        }
        RangeQuery::Or(queries) => {
            let mut merged: BTreeSet<FV> = BTreeSet::new();
            for sub in queries {
                merged.extend(self.range_keys(*sub));
            }
            merged.into_iter().collect()
        }
        RangeQuery::Not(inner) => {
            let excluded: FxHashSet<FV> = self.range_keys(*inner).into_iter().collect();
            let ordered_keys = self.btree.read();
            let found: Vec<FV> = ordered_keys
                .iter()
                .filter(|key| !excluded.contains(*key))
                .cloned()
                .collect();
            found
        }
    }
}
/// Persists the index: metadata to `metadata`, dirty buckets through `f`.
///
/// `f` receives a bucket id and its serialized bytes and returns whether
/// flushing should continue with the next bucket.
///
/// Returns `Ok(true)` when metadata was written or dirty buckets existed,
/// `Ok(false)` when there was nothing to do.
///
/// # Errors
///
/// Propagates errors from `store_metadata` and `store_dirty_buckets`.
pub async fn flush<W: Write, F>(
    &self,
    metadata: W,
    now_ms: u64,
    f: F,
) -> Result<bool, BTreeError>
where
    F: AsyncFnMut(u32, &[u8]) -> Result<bool, BoxError>,
{
    let meta_saved = self.store_metadata(metadata, now_ms)?;
    let had_dirty = self.has_dirty_buckets();
    let anything_pending = meta_saved || had_dirty;
    if anything_pending {
        self.store_dirty_buckets(f).await?;
    }
    Ok(anything_pending)
}
/// Returns `true` when the dirty-bucket counter is non-zero.
pub fn has_dirty_buckets(&self) -> bool {
self.dirty_bucket_count.load(Ordering::Acquire) > 0
}
/// Returns `true` when the metadata version is newer than the version last
/// written by `store_metadata`.
pub fn has_pending_metadata_flush(&self) -> bool {
    // The read guard is released at the end of this statement.
    let current_version = self.metadata.read().stats.version;
    let saved_version = self.last_saved_version.load(Ordering::Acquire);
    current_version > saved_version
}
/// Repacks all postings into as few buckets as the size threshold allows.
///
/// Returns `(old bucket count, new bucket count)`. Nothing is changed when the
/// index has at most one bucket. Otherwise every bucket is rebuilt and flagged
/// dirty, bucket ids are renumbered from 0, and the metadata version is
/// incremented.
///
/// NOTE(review): the bucket map is cleared and rebuilt in several steps, so
/// this presumably must not run concurrently with inserts or removals —
/// confirm with callers.
pub fn compact_buckets(&self) -> (usize, usize) {
let old_count = self.buckets.len();
if old_count <= 1 {
return (old_count, old_count);
}
// Estimated serialized size of each posting together with its key.
let mut fv_sizes: Vec<(FV, usize)> = self
.postings
.iter()
.map(|entry| {
let size = estimate_cbor_size(&(entry.key(), entry.value())) + 2;
(entry.key().clone(), size)
})
.collect();
if fv_sizes.is_empty() {
// No postings left: collapse to a single empty bucket, flagged dirty.
self.buckets.clear();
self.buckets.insert(0, (0, true, UniqueVec::default(), 1));
self.max_bucket_id.store(0, Ordering::Relaxed);
self.dirty_bucket_count.store(1, Ordering::Release);
self.update_metadata(|m| {
m.stats.version += 1;
});
return (old_count, 1);
}
// Largest postings first, each placed into the first bin that still has room.
fv_sizes.sort_unstable_by(|a, b| b.1.cmp(&a.1));
let limit = self.config.bucket_overload_size;
let mut bins: Vec<(usize, Vec<FV>)> = Vec::new();
for (fv, size) in fv_sizes {
if let Some(bin) = bins.iter_mut().find(|b| b.0 + size < limit) {
bin.0 += size;
bin.1.push(fv);
} else {
// No bin has room (or the posting alone reaches the limit): open a new bin.
bins.push((size, vec![fv]));
}
}
self.buckets.clear();
let new_count = bins.len();
let max_id = new_count.saturating_sub(1) as u32;
for (i, (size, field_values)) in bins.into_iter().enumerate() {
let bucket_id = i as u32;
// Point every posting at its new bucket before registering the bucket.
for fv in &field_values {
if let Some(mut posting) = self.postings.get_mut(fv) {
posting.0 = bucket_id;
}
}
self.buckets
.insert(bucket_id, (size, true, field_values.into(), 1));
}
self.max_bucket_id.store(max_id, Ordering::Relaxed);
// Every rebuilt bucket is dirty.
self.dirty_bucket_count
.store(new_count as u32, Ordering::Release);
self.update_metadata(|m| {
m.stats.version += 1;
});
(old_count, new_count)
}
/// Writes the metadata to `w` as CBOR if it changed since the last save.
///
/// Returns `Ok(true)` when metadata was written and `Ok(false)` when the
/// current version had already been saved (or claimed by another caller). On
/// success `last_saved` is updated to `now_ms`, unless the stored value is
/// already later.
///
/// # Errors
///
/// Returns `BTreeError::Serialization` when encoding or writing fails. In that
/// case the saved-version marker is rolled back, provided no other caller has
/// changed it in the meantime.
pub fn store_metadata<W: Write>(&self, w: W, now_ms: u64) -> Result<bool, BTreeError> {
// Cheap early exit before a full metadata snapshot is cloned.
let current_version = { self.metadata.read().stats.version };
if self.last_saved_version.load(Ordering::Relaxed) >= current_version {
return Ok(false);
}
let mut meta = self.metadata();
// Claim this version; a previous value at or above it means another caller
// already saved or claimed it.
let prev_saved_version = self
.last_saved_version
.fetch_max(meta.stats.version, Ordering::Relaxed);
if prev_saved_version >= meta.stats.version {
return Ok(false);
}
meta.stats.last_saved = now_ms.max(meta.stats.last_saved);
if let Err(err) = ciborium::into_writer(&BTreeIndexRef { metadata: &meta }, w) {
// Undo the claim so that a later call retries the save.
let _ = self.last_saved_version.compare_exchange(
meta.stats.version,
prev_saved_version,
Ordering::Relaxed,
Ordering::Relaxed,
);
return Err(BTreeError::Serialization {
name: self.name.clone(),
source: err.into(),
});
}
self.update_metadata(|m| {
m.stats.last_saved = meta.stats.last_saved.max(m.stats.last_saved);
});
Ok(true)
}
/// Serializes every dirty bucket and hands it to `f` for storage.
///
/// `f` receives the bucket id and the CBOR-encoded bucket and returns whether
/// flushing should continue; `false` stops after the current bucket. A bucket
/// is marked clean only if it was not modified while it was being written.
///
/// # Errors
///
/// Returns `BTreeError::Serialization` when a bucket cannot be encoded and
/// `BTreeError::Generic` when `f` fails. Buckets not yet written stay dirty.
pub async fn store_dirty_buckets<F>(&self, mut f: F) -> Result<(), BTreeError>
where
F: AsyncFnMut(u32, &[u8]) -> Result<bool, BoxError>,
{
let dirty_count_snapshot = self.dirty_bucket_count.load(Ordering::Acquire);
if dirty_count_snapshot == 0 {
return Ok(());
}
// Collect the ids first so that no bucket guard is held across an `.await`.
let dirty_bucket_ids: Vec<u32> = self
.buckets
.iter()
.filter_map(|b| if b.1 { Some(*b.key()) } else { None })
.collect();
if dirty_bucket_ids.is_empty() {
// The counter disagrees with the flags: reset it, unless it changed since
// the snapshot was taken.
let _ = self.dirty_bucket_count.compare_exchange(
dirty_count_snapshot,
0,
Ordering::AcqRel,
Ordering::Relaxed,
);
return Ok(());
}
// One serialization buffer, reused for every bucket.
let mut buf = Vec::with_capacity(4096);
for bucket_id in dirty_bucket_ids {
let Some(bucket) = self.buckets.get(&bucket_id) else {
continue;
};
if !bucket.1 {
continue;
}
// Remember the version being serialized to detect concurrent modifications.
let dirty_version = bucket.3;
{
let postings: FxHashMap<_, _> = bucket
.2
.iter()
.filter_map(|fv| self.postings.get(fv).map(|p| (fv, p)))
.collect();
buf.clear();
ciborium::into_writer(
&BucketRef {
postings: &postings,
},
&mut buf,
)
.map_err(|err| BTreeError::Serialization {
name: self.name.clone(),
source: err.into(),
})?;
}
// Release the bucket guard before awaiting the storage callback.
drop(bucket);
let conti = f(bucket_id, &buf)
.await
.map_err(|err| BTreeError::Generic {
name: self.name.clone(),
source: err,
})?;
// Mark the bucket clean only if its version is unchanged; otherwise it stays
// dirty for the next flush.
if let Some(mut bucket) = self.buckets.get_mut(&bucket_id)
&& bucket.1
&& bucket.3 == dirty_version
{
bucket.1 = false;
let _ = self.dirty_bucket_count.fetch_update(
Ordering::AcqRel,
Ordering::Relaxed,
|v| Some(v.saturating_sub(1)),
);
}
if !conti {
return Ok(());
}
}
Ok(())
}
/// Applies `f` to the metadata while holding its write lock.
fn update_metadata<F>(&self, f: F)
where
    F: FnOnce(&mut BTreeMetadata),
{
    let mut guard = self.metadata.write();
    f(&mut *guard);
}
}
impl<PK> BTreeIndex<PK, String>
where
    PK: Ord + Debug + Clone + Serialize + DeserializeOwned,
{
    /// Visits, in ascending order, every key that starts with `prefix`.
    ///
    /// `f` receives the key and its document ids and returns
    /// `(continue, item)`: the item, if any, is collected into the result and
    /// scanning stops once `continue` is `false`. An empty prefix visits
    /// every key.
    ///
    /// Returns an empty vector, without counting the query, when the index
    /// has no postings.
    pub fn prefix_query_with<F, R>(&self, prefix: &str, mut f: F) -> Vec<R>
    where
        F: FnMut(&str, &Vec<PK>) -> (bool, Option<R>),
    {
        let mut results = Vec::new();
        if self.postings.is_empty() {
            return results;
        }
        self.query_count.fetch_add(1, Ordering::Relaxed);

        // Keys sharing a prefix are contiguous in the ordered set and begin at
        // the first key >= `prefix`, so scan from there while the prefix still
        // matches. An upper bound of `prefix + char::MAX` would miss keys such
        // as `prefix + "\u{10FFFF}" + "x"`, which sort after that bound.
        let ordered_keys = self.btree.read();
        let matching_keys = ordered_keys
            .range::<str, _>((
                std::ops::Bound::Included(prefix),
                std::ops::Bound::Unbounded,
            ))
            .take_while(|key| key.starts_with(prefix));

        for key in matching_keys {
            let Some(posting) = self.postings.get(key) else {
                continue;
            };
            let (keep_going, item) = f(key, &posting.2);
            results.extend(item);
            if !keep_going {
                break;
            }
        }
        results
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{Barrier, Mutex};
/// Current wall-clock time as milliseconds since the Unix epoch.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as u64
}
/// Builds an empty index named `test_index` with a 1024-byte bucket
/// threshold and duplicates allowed.
fn create_test_index() -> BTreeIndex<u64, String> {
    BTreeIndex::new(
        "test_index".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 1024,
            allow_duplicates: true,
        }),
    )
}
/// Builds the shared fixture: five distinct fruit keys, with `apple` and
/// `banana` each referenced by two ids.
fn create_populated_index() -> BTreeIndex<u64, String> {
    let index = create_test_index();
    let fixtures: [(u64, &str); 7] = [
        (1, "apple"),
        (2, "banana"),
        (3, "cherry"),
        (4, "date"),
        (5, "eggplant"),
        (6, "apple"),
        (7, "banana"),
    ];
    for (id, value) in fixtures {
        // Insert results are deliberately ignored, as this is only fixture setup.
        let _ = index.insert(id, value.to_string(), now_ms());
    }
    index
}
#[test]
fn test_create_index() {
    // A freshly created index reports its name and contains nothing.
    let idx = create_test_index();
    assert_eq!(idx.name(), "test_index");
    assert_eq!(idx.len(), 0);
    assert!(idx.is_empty());

    // The metadata view agrees with the accessors above.
    let meta = idx.metadata();
    assert_eq!(meta.name, "test_index");
    assert_eq!(meta.stats.num_elements, 0);
}
// Covers the return values of `insert` with duplicates allowed and disallowed.
#[test]
fn test_insert() {
let index = create_test_index();
// A new (id, value) pair is reported as inserted.
let result = index.insert(1, "apple".to_string(), now_ms());
assert!(result.is_ok());
assert!(result.unwrap());
assert_eq!(index.len(), 1);
assert!(!index.is_empty());
// Repeating the exact same pair succeeds but reports that nothing was inserted.
let result = index.insert(1, "apple".to_string(), now_ms());
assert!(result.is_ok());
assert!(!result.unwrap());
// A different id under the same value is accepted when duplicates are allowed.
let result = index.insert(2, "apple".to_string(), now_ms());
assert!(result.is_ok());
assert!(result.unwrap());
// Same scenario against an index configured with `allow_duplicates: false`.
let config = BTreeConfig {
bucket_overload_size: 1024,
allow_duplicates: false,
};
let unique_index = BTreeIndex::new("unique_index".to_string(), Some(config));
let result = unique_index.insert(1, "apple".to_string(), now_ms());
assert!(result.is_ok());
// Re-inserting the identical pair is still not an error.
let result = unique_index.insert(1, "apple".to_string(), now_ms());
assert!(result.is_ok());
assert!(!result.unwrap());
// A second id for an existing value must be rejected with `AlreadyExists`.
let result = unique_index.insert(2, "apple".to_string(), now_ms());
assert!(result.is_err());
match result {
Err(BTreeError::AlreadyExists { .. }) => (),
_ => panic!("Expected AlreadyExists error"),
}
}
#[test]
fn test_insert_idempotent_does_not_update_stats() {
    let index = create_test_index();

    // The first insert is new and must be counted.
    assert!(index.insert(1, "apple".to_string(), now_ms()).unwrap());
    let before = index.stats();

    // Repeating the same pair changes nothing.
    assert!(!index.insert(1, "apple".to_string(), now_ms()).unwrap());
    let after = index.stats();

    // Neither the insert counter nor the version may move on the repeat.
    assert_eq!(before.insert_count, 1);
    assert_eq!(after.insert_count, before.insert_count);
    assert_eq!(after.version, before.version);
}
// Covers `remove` for existing pairs, unknown values, and unknown ids.
#[test]
fn test_remove() {
let index = create_populated_index();
// Removing an existing (id, value) pair reports success.
let result = index.remove(1, "apple".to_string(), now_ms());
assert!(result);
// Removing a value that was never indexed reports failure.
let result = index.remove(100, "nonexistent".to_string(), now_ms());
assert!(!result);
// Removing an id that is not stored under an existing value also reports failure.
let result = index.remove(999, "banana".to_string(), now_ms());
assert!(!result);
// "apple" still exists because id 6 remains under it.
let result = index.query_with(&"apple".to_string(), |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
assert!(!ids.contains(&1)); assert!(ids.contains(&6));
// Removing the last id makes the key unqueryable.
let result = index.remove(6, "apple".to_string(), now_ms());
assert!(result);
let result = index.query_with(&"apple".to_string(), |ids| Some(ids.clone()));
assert!(result.is_none()); }
#[test]
fn test_query() {
    let index = create_populated_index();

    // An indexed value yields every id stored under it.
    let apple = "apple".to_string();
    let hit = index.query_with(&apple, |ids| Some(ids.clone()));
    assert!(hit.is_some());
    let ids = hit.unwrap();
    assert!(ids.contains(&1));
    assert!(ids.contains(&6));

    // A value that was never indexed yields nothing.
    let missing = "nonexistent".to_string();
    let miss = index.query_with(&missing, |ids| Some(ids.clone()));
    assert!(miss.is_none());
}
// Exercises each simple `RangeQuery` variant against the populated fixture,
// whose keys are apple, banana, cherry, date and eggplant.
#[test]
fn test_range_query() {
let index = create_populated_index();
let apple = "apple".to_string();
let banana = "banana".to_string();
let cherry = "cherry".to_string();
let date = "date".to_string();
let eggplant = "eggplant".to_string();
// Eq: exactly one key matches.
let query = RangeQuery::Eq(apple.clone());
let results =
index.range_query_with(query, |k, ids| (true, vec![(k.clone(), ids.clone())]));
assert_eq!(results.len(), 1);
assert_eq!(results[0].0, "apple");
// Gt: strictly greater than "cherry".
let query = RangeQuery::Gt(cherry.clone());
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 2);
assert!(results.contains(&"date".to_string()));
assert!(results.contains(&"eggplant".to_string()));
// Ge: the bound itself is included.
let query = RangeQuery::Ge(cherry.clone());
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 3);
assert!(results.contains(&"cherry".to_string()));
// Lt: strictly less than "cherry".
let query = RangeQuery::Lt(cherry.clone());
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 2);
assert!(results.contains(&apple));
assert!(results.contains(&banana));
// Le: the bound itself is included.
let query = RangeQuery::Le(cherry.clone());
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 3);
assert!(results.contains(&cherry));
// Between: both endpoints are included.
let query = RangeQuery::Between(banana.clone(), date.clone());
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 3);
assert!(results.contains(&banana));
assert!(results.contains(&cherry));
assert!(results.contains(&date));
// Include: an explicit list of keys.
let keys = vec![apple.clone(), eggplant.clone()];
let query = RangeQuery::Include(keys);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 2);
assert!(results.contains(&apple));
assert!(results.contains(&eggplant));
// Early stop: the callback returns `false` at "banana"; the items from that
// final call are still part of the result.
let query = RangeQuery::Ge(apple.clone());
let results = index.range_query_with(query, |k, _| (k != "banana", vec![k.clone()]));
assert_eq!(results.len(), 2);
assert_eq!(results[0], "apple");
assert_eq!(results[1], "banana");
}
// Exercises the logical `RangeQuery` combinators (And / Or / Not) and their nesting.
// After the extra inserts the keys are: apple, banana, berry, cherry, date,
// eggplant, fig, grape.
#[test]
fn test_logical_queries() {
let index = create_populated_index();
let _ = index.insert(8, "grape".to_string(), now_ms());
let _ = index.insert(9, "fig".to_string(), now_ms());
let _ = index.insert(10, "berry".to_string(), now_ms());
let _ = index.insert(11, "berry".to_string(), now_ms());
let apple = "apple".to_string();
let banana = "banana".to_string();
let berry = "berry".to_string();
let cherry = "cherry".to_string();
let date = "date".to_string();
let eggplant = "eggplant".to_string();
let fig = "fig".to_string();
let grape = "grape".to_string();
// And of an upper and a lower bound: cherry <= k <= date.
let query = RangeQuery::And(vec![
Box::new(RangeQuery::Le(date.clone())), Box::new(RangeQuery::Ge(cherry.clone())), ]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 2);
assert!(results.contains(&cherry));
assert!(results.contains(&date));
// And of disjoint ranges yields nothing.
let query = RangeQuery::And(vec![
Box::new(RangeQuery::Lt(cherry.clone())), Box::new(RangeQuery::Gt(date.clone())), ]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 0);
// And with three operands narrows down to the single Eq key.
let query = RangeQuery::And(vec![
Box::new(RangeQuery::Ge(banana.clone())), Box::new(RangeQuery::Lt(eggplant.clone())), Box::new(RangeQuery::Eq(cherry.clone())), ]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 1);
assert!(results.contains(&cherry));
// Or of two disjoint ranges: k <= banana or k >= fig.
let query = RangeQuery::Or(vec![
Box::new(RangeQuery::Le(banana.clone())), Box::new(RangeQuery::Ge(fig.clone())), ]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 4);
assert!(results.contains(&apple));
assert!(results.contains(&banana));
assert!(results.contains(&fig));
assert!(results.contains(&grape));
// Or of overlapping ranges: keys in the overlap are reported once.
let query = RangeQuery::Or(vec![
Box::new(RangeQuery::Between(banana.clone(), date.clone())), Box::new(RangeQuery::Between(cherry.clone(), fig.clone())), ]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), 6);
assert!(results.contains(&banana));
assert!(results.contains(&berry));
assert!(results.contains(&cherry));
assert!(results.contains(&date));
assert!(results.contains(&eggplant));
assert!(results.contains(&fig));
// Not of a range: everything outside cherry..=eggplant.
let query = RangeQuery::Not(Box::new(RangeQuery::Between(
cherry.clone(),
eggplant.clone(),
)));
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert!(results.contains(&apple));
assert!(results.contains(&banana));
assert!(results.contains(&fig));
assert!(results.contains(&grape));
assert!(!results.contains(&cherry));
assert!(!results.contains(&date));
assert!(!results.contains(&eggplant));
// Not of a single key.
let query = RangeQuery::Not(Box::new(RangeQuery::Eq(apple.clone())));
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert!(!results.contains(&apple));
assert!(results.contains(&banana));
assert!(results.contains(&cherry));
// And of two Or groups: only keys satisfying both groups remain.
let query = RangeQuery::And(vec![
Box::new(RangeQuery::Or(vec![
Box::new(RangeQuery::Le(cherry.clone())), Box::new(RangeQuery::Ge(fig.clone())), ])),
Box::new(RangeQuery::Or(vec![
Box::new(RangeQuery::Le(banana.clone())), Box::new(RangeQuery::Ge(eggplant.clone())), ])),
]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert!(results.contains(&apple));
assert!(results.contains(&banana));
assert!(results.contains(&fig));
assert!(results.contains(&grape));
assert!(!results.contains(&cherry));
assert!(!results.contains(&date));
// Or of two Not clauses that together cover every key in the index.
let query = RangeQuery::Or(vec![
Box::new(RangeQuery::Not(Box::new(RangeQuery::Ge(date.clone())))), Box::new(RangeQuery::Not(Box::new(RangeQuery::Le(cherry.clone())))), ]);
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert_eq!(results.len(), index.len());
// Not of an And: everything outside cherry..=eggplant.
let query = RangeQuery::Not(Box::new(RangeQuery::And(vec![
Box::new(RangeQuery::Ge(cherry.clone())), Box::new(RangeQuery::Le(eggplant.clone())), ])));
let results = index.range_query_with(query, |k, _| (true, vec![k.clone()]));
assert!(results.contains(&apple));
assert!(results.contains(&banana));
assert!(results.contains(&fig));
assert!(results.contains(&grape));
assert!(!results.contains(&cherry));
assert!(!results.contains(&date));
assert!(!results.contains(&eggplant));
// Early stop inside a logical query: the callback asks to stop on its third
// call, so it must be invoked exactly three times.
let query = RangeQuery::Or(vec![
Box::new(RangeQuery::Ge(apple.clone())),
Box::new(RangeQuery::Le(grape.clone())),
]);
let mut count = 0;
let results = index.range_query_with(query, |_, _| {
count += 1;
(count < 3, vec![count.to_string()])
});
assert_eq!(results.len(), 3);
assert_eq!(count, 3); }
#[test]
fn test_range_query_lt_le_full_order() {
    let index = create_populated_index();

    // Without an early stop, Lt returns every smaller key in ascending order.
    let below_date = RangeQuery::Lt("date".to_string());
    let keys = index.range_query_with(below_date, |k, _| (true, vec![k.clone()]));
    assert_eq!(keys, vec!["apple", "banana", "cherry"]);

    // Le additionally includes the bound itself, still in ascending order.
    let up_to_date = RangeQuery::Le("date".to_string());
    let keys = index.range_query_with(up_to_date, |k, _| (true, vec![k.clone()]));
    assert_eq!(keys, vec!["apple", "banana", "cherry", "date"]);
}
// Pins how Lt / Le behave when the callback stops the scan early: the keys kept
// are the ones closest to the bound, and they are returned in ascending order.
#[test]
fn test_range_query_lt_le_with_early_stop_limit_semantics() {
let index = create_populated_index();
// Stop after two keys below "date": expect the two nearest keys, ascending.
let mut count = 0usize;
let results = index.range_query_with(RangeQuery::Lt("date".to_string()), |k, _| {
count += 1;
(count < 2, vec![k.clone()])
});
assert_eq!(results, vec!["banana", "cherry"]);
// Same limit with Le: the bound itself counts as the nearest key.
let mut count = 0usize;
let results = index.range_query_with(RangeQuery::Le("date".to_string()), |k, _| {
count += 1;
(count < 2, vec![k.clone()])
});
assert_eq!(results, vec!["cherry", "date"]);
// A limit larger than the number of matches returns all matches.
let mut count = 0usize;
let results = index.range_query_with(RangeQuery::Lt("banana".to_string()), |k, _| {
count += 1;
(count < 10, vec![k.clone()])
});
assert_eq!(results, vec!["apple"]);
}
// When the callback returns several items per key, Lt / Le must keep each key's
// items together and in the order the callback produced them.
#[test]
fn test_range_query_lt_le_group_order_preserved() {
let index = create_populated_index();
// Lt with a two-key limit: groups for the two keys nearest below "date".
let mut count = 0usize;
let results = index.range_query_with(RangeQuery::Lt("date".to_string()), |k, _| {
count += 1;
let v = vec![format!("{k}-1"), format!("{k}-2")];
(count < 2, v)
});
assert_eq!(
results,
vec![
"banana-1".to_string(),
"banana-2".to_string(),
"cherry-1".to_string(),
"cherry-2".to_string()
]
);
// Le with the same limit: the bound's own group is included.
let mut count = 0usize;
let results = index.range_query_with(RangeQuery::Le("date".to_string()), |k, _| {
count += 1;
let v = vec![format!("{k}-1"), format!("{k}-2")];
(count < 2, v)
});
assert_eq!(
results,
vec![
"cherry-1".to_string(),
"cherry-2".to_string(),
"date-1".to_string(),
"date-2".to_string()
]
);
}
#[test]
fn test_range_keys() {
    let index = create_populated_index();
    let [apple, banana, cherry, eggplant] =
        ["apple", "banana", "cherry", "eggplant"].map(String::from);

    // And: banana <= k <= cherry.
    let bounded = index.range_keys(RangeQuery::And(vec![
        Box::new(RangeQuery::Ge(banana.clone())),
        Box::new(RangeQuery::Le(cherry.clone())),
    ]));
    assert_eq!(bounded.len(), 2);
    assert!(bounded.contains(&banana));
    assert!(bounded.contains(&cherry));

    // Or: either of two exact keys.
    let either = index.range_keys(RangeQuery::Or(vec![
        Box::new(RangeQuery::Eq(apple.clone())),
        Box::new(RangeQuery::Eq(eggplant.clone())),
    ]));
    assert_eq!(either.len(), 2);
    assert!(either.contains(&apple));
    assert!(either.contains(&eggplant));

    // Not: everything except the excluded key.
    let all_but_apple = index.range_keys(RangeQuery::Not(Box::new(RangeQuery::Eq(apple.clone()))));
    assert!(!all_but_apple.contains(&apple));
    assert!(all_but_apple.contains(&banana));
    assert!(all_but_apple.contains(&cherry));
}
// A `Between` whose lower bound is greater than its upper bound must behave as an
// empty set when it is nested inside Or / And / Not.
#[test]
fn test_range_keys_invalid_between_inside_logical_queries() {
let index = create_populated_index();
let invalid_between = RangeQuery::Between("date".to_string(), "banana".to_string());
// Or: the invalid operand contributes nothing, the other operand still matches.
let results = index.range_query_with(
RangeQuery::Or(vec![
Box::new(invalid_between.clone()),
Box::new(RangeQuery::Eq("apple".to_string())),
]),
|key, _| (true, vec![key.clone()]),
);
assert_eq!(results, vec!["apple"]);
// And: intersecting with the invalid operand yields nothing.
let results = index.range_query_with(
RangeQuery::And(vec![
Box::new(RangeQuery::Ge("apple".to_string())),
Box::new(invalid_between.clone()),
]),
|key, _| (true, vec![key.clone()]),
);
assert!(results.is_empty());
// Not: negating the invalid operand yields every key in the index.
let results = index
.range_query_with(RangeQuery::Not(Box::new(invalid_between)), |key, _| {
(true, vec![key.clone()])
});
assert_eq!(results, index.keys(None, None));
}
#[test]
fn test_prefix_query() {
    let index = create_populated_index();
    for (id, key) in [(10, "app"), (11, "application")] {
        let _ = index.insert(id, key.to_string(), now_ms());
    }

    // Every key sharing the prefix is visited, including the prefix itself.
    let all = index.prefix_query_with("app", |k, _| (true, Some(k.to_string())));
    assert_eq!(all.len(), 3);
    for expected in ["app", "apple", "application"] {
        assert!(all.contains(&expected.to_string()));
    }

    // Stopping at "apple" keeps that key's item and skips the remainder.
    let truncated = index.prefix_query_with("app", |k, _| (k != "apple", Some(k.to_string())));
    assert_eq!(truncated.len(), 2);
    assert_eq!(truncated[0], "app");
    assert_eq!(truncated[1], "apple");
}
// Round-trips an index through `store_metadata` / `store_dirty_buckets` and
// `load_metadata` / `load_buckets`, using an in-memory vector as bucket storage.
#[tokio::test]
async fn test_serialization() {
let index = create_populated_index();
// Serialize the metadata only.
let mut buf = Vec::new();
let result = index.store_metadata(&mut buf, now_ms());
assert!(result.is_ok());
println!("Serialized metadata: {:?}", hex::encode(&buf));
// Loading metadata alone restores the name but no entries yet.
let result = BTreeIndex::<u64, String>::load_metadata(&buf[..]);
let mut loaded_index = result.unwrap();
assert_eq!(loaded_index.name(), "test_index");
assert_eq!(loaded_index.len(), 0);
// Bucket storage: slot `i` holds the serialized bytes of bucket id `i`.
let bucket_data = Arc::new(Mutex::new(Vec::new()));
{
let bucket_data_clone = bucket_data.clone();
let result = index
.store_dirty_buckets(async |bucket_id, data| {
let mut guard = bucket_data_clone.lock().await;
// Grow the vector so the bucket id is a valid slot.
while guard.len() <= bucket_id as usize {
guard.push(Vec::new());
}
guard[bucket_id as usize] = data.to_vec();
Ok(true)
})
.await;
assert!(result.is_ok());
}
{
let bucket_data_clone = bucket_data.clone();
let result = loaded_index
.load_buckets(async |bucket_id| {
let guard = bucket_data_clone.lock().await;
// A bucket id that was never stored is reported as an error here.
if bucket_id as usize >= guard.len() {
return Err(BTreeError::Generic {
name: "test".to_string(),
source: "Bucket not found".into(),
}
.into());
}
Ok(Some(guard[bucket_id as usize].clone()))
})
.await;
assert!(result.is_ok());
}
// After loading the buckets, the restored index matches the original.
assert_eq!(loaded_index.len(), index.len());
let result = loaded_index.query_with(&"apple".to_string(), |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
assert!(ids.contains(&1));
assert!(ids.contains(&6));
}
// `flush` must still write dirty buckets when the metadata was already stored
// by an earlier `store_metadata` call.
#[tokio::test]
async fn test_flush_persists_dirty_buckets_even_if_metadata_already_saved() {
let index = create_test_index();
let _ = index.insert(1, "apple".to_string(), now_ms()).unwrap();
// Store the metadata first; the bucket touched by the insert stays dirty.
let mut meta_buf = Vec::new();
assert!(index.store_metadata(&mut meta_buf, now_ms()).unwrap());
assert!(index.has_dirty_buckets());
// Counts how many times the bucket writer callback is invoked.
let writes = Arc::new(Mutex::new(0usize));
let writes_clone = writes.clone();
let mut meta_buf2 = Vec::new();
let saved = index
.flush(&mut meta_buf2, now_ms(), async move |_, _| {
let mut g = writes_clone.lock().await;
*g += 1;
Ok(true)
})
.await
.unwrap();
// Exactly one bucket is written and nothing remains dirty afterwards.
assert!(saved);
assert_eq!(*writes.lock().await, 1);
assert!(!index.has_dirty_buckets());
}
// A failing bucket writer must surface as `BTreeError::Generic` and leave the
// bucket marked dirty.
#[tokio::test]
async fn test_store_dirty_buckets_propagates_write_error() {
let index = create_test_index();
index.insert(1, "apple".to_string(), now_ms()).unwrap();
assert!(index.has_dirty_buckets());
// The writer callback always fails.
let err = index
.store_dirty_buckets(async |_, _| Err::<bool, BoxError>("write failed".into()))
.await
.unwrap_err();
match err {
BTreeError::Generic { .. } => {}
other => panic!("Expected Generic error, got: {other:?}"),
}
// The failed write must not clear the dirty state.
assert!(index.has_dirty_buckets());
}
// If the index is modified while a bucket is being written, the index must still
// report dirty buckets after that write completes.
#[tokio::test]
async fn test_store_dirty_buckets_keeps_dirty_when_mutated_during_persist() {
let index = Arc::new(create_test_index());
index.insert(1, "apple".to_string(), now_ms()).unwrap();
assert!(index.has_dirty_buckets());
let index_for_cb = index.clone();
index
.store_dirty_buckets(async move |_, _| {
// Insert from inside the writer callback, i.e. in the middle of persisting.
index_for_cb
.insert(2, "banana".to_string(), now_ms())
.unwrap();
Ok(true)
})
.await
.unwrap();
// The bytes just written predate the insert, so there is still dirty state.
assert!(index.has_dirty_buckets());
// A second pass without concurrent changes clears the dirty state.
index
.store_dirty_buckets(async |_, _| Ok(true))
.await
.unwrap();
assert!(!index.has_dirty_buckets());
}
// If `dirty_bucket_count` is non-zero although no bucket is actually dirty,
// `store_dirty_buckets` must write nothing and leave the index reporting clean.
#[tokio::test]
async fn test_store_dirty_buckets_self_heals_drifted_counter() {
let index = create_test_index();
index.insert(1, "apple".to_string(), now_ms()).unwrap();
assert!(index.has_dirty_buckets());
// Persist everything so no bucket is dirty.
index
.store_dirty_buckets(async |_, _| Ok(true))
.await
.unwrap();
assert!(!index.has_dirty_buckets());
// Simulate a drifted counter by forcing it to 1 without dirtying any bucket.
index.dirty_bucket_count.store(1, Ordering::Release);
index
.store_dirty_buckets(async |_, _| {
panic!("no dirty bucket should be written");
})
.await
.unwrap();
// The stale counter value must be gone after the pass.
assert!(!index.has_dirty_buckets());
}
// Regression test: a key that was persisted, then grew until a new bucket was
// allocated, then was fully removed, must not reappear when the index is reloaded
// from the persisted metadata and buckets.
#[tokio::test]
async fn test_migrated_source_bucket_is_persisted_to_prevent_resurrection() {
// A small `bucket_overload_size` so that a new bucket is allocated quickly.
let config = BTreeConfig {
bucket_overload_size: 80,
allow_duplicates: true,
};
let index = BTreeIndex::new("resurrection_test".to_string(), Some(config));
index.insert(1, "apple".to_string(), now_ms()).unwrap();
// In-memory persistence: one metadata buffer, and slot `i` holds bucket id `i`.
let metadata_buf = Arc::new(Mutex::new(Vec::new()));
let bucket_data = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
// First flush: persist the initial single-entry state.
{
let mut meta = metadata_buf.lock().await;
index
.flush(&mut *meta, now_ms(), {
let bucket_data = bucket_data.clone();
async move |bucket_id, data| {
let mut guard = bucket_data.lock().await;
while guard.len() <= bucket_id as usize {
guard.push(Vec::new());
}
guard[bucket_id as usize] = data.to_vec();
Ok(true)
}
})
.await
.unwrap();
}
// Keep adding ids under "apple" until `max_bucket_id` becomes non-zero
// (bounded at 200 ids so a regression cannot loop forever).
let mut doc_id = 2u64;
while index.stats().max_bucket_id == 0 && doc_id < 200 {
index.insert(doc_id, "apple".to_string(), now_ms()).unwrap();
doc_id += 1;
}
assert!(index.stats().max_bucket_id > 0);
// Second flush: persist the state after the additional bucket appeared.
{
let mut meta = metadata_buf.lock().await;
index
.flush(&mut *meta, now_ms(), {
let bucket_data = bucket_data.clone();
async move |bucket_id, data| {
let mut guard = bucket_data.lock().await;
while guard.len() <= bucket_id as usize {
guard.push(Vec::new());
}
guard[bucket_id as usize] = data.to_vec();
Ok(true)
}
})
.await
.unwrap();
}
// Remove every id that was inserted; the key must disappear from the live index.
for id in 1..doc_id {
index.remove(id, "apple".to_string(), now_ms());
}
assert!(
index
.query_with(&"apple".to_string(), |ids| Some(ids.clone()))
.is_none()
);
// Third flush: persist the state after the removals.
{
let mut meta = metadata_buf.lock().await;
index
.flush(&mut *meta, now_ms(), {
let bucket_data = bucket_data.clone();
async move |bucket_id, data| {
let mut guard = bucket_data.lock().await;
while guard.len() <= bucket_id as usize {
guard.push(Vec::new());
}
guard[bucket_id as usize] = data.to_vec();
Ok(true)
}
})
.await
.unwrap();
}
// Reload from what was persisted; missing or empty slots are reported as absent.
let meta = { metadata_buf.lock().await.clone() };
let mut loaded = BTreeIndex::<u64, String>::load_metadata(&meta[..]).unwrap();
loaded
.load_buckets({
let bucket_data = bucket_data.clone();
async move |bucket_id| {
let guard = bucket_data.lock().await;
if bucket_id as usize >= guard.len() {
return Ok(None);
}
if guard[bucket_id as usize].is_empty() {
return Ok(None);
}
Ok(Some(guard[bucket_id as usize].clone()))
}
})
.await
.unwrap();
// The removed key must not come back from any previously persisted bucket.
assert!(
loaded
.query_with(&"apple".to_string(), |ids| Some(ids.clone()))
.is_none(),
"apple should not resurrect from stale source bucket"
);
}
#[test]
fn test_insert_after_load_metadata_without_loading_buckets() {
    // Hand-build persisted metadata that claims buckets up to id 3 exist.
    let owned = BTreeIndexOwned {
        metadata: BTreeMetadata {
            name: "loaded_index".to_string(),
            config: BTreeConfig {
                bucket_overload_size: 1024,
                allow_duplicates: true,
            },
            stats: BTreeStats {
                version: 1,
                max_bucket_id: 3,
                ..Default::default()
            },
        },
    };
    let mut encoded = Vec::new();
    ciborium::into_writer(&owned, &mut encoded).unwrap();

    // Inserting must succeed even though no bucket was ever loaded.
    let index = BTreeIndex::<u64, String>::load_metadata(&encoded[..]).unwrap();
    let outcome = index.insert(1, "apple".to_string(), now_ms());
    assert!(outcome.is_ok());
}
#[test]
fn test_bucket_overflow() {
    // A small `bucket_overload_size` forces the 100 keys to spread over several buckets.
    let index = BTreeIndex::new(
        "overflow_test".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 100,
            allow_duplicates: true,
        }),
    );
    for i in 0..100 {
        let _ = index.insert(i, format!("key_{i}"), now_ms());
    }

    println!("index.stats(): {:?}", index.stats());
    assert!(index.stats().max_bucket_id > 1);

    // Every key must still resolve to its own id after the buckets were split.
    for i in 0..100 {
        let key = format!("key_{i}");
        let found = index.query_with(&key, |ids| Some(ids.clone()));
        assert!(found.is_some());
        let ids = found.unwrap();
        assert!(ids.contains(&i));
    }
}
// Covers `insert_array`: empty input, fresh values, repeats, a second id, the
// unique-index rejection, and inserts that spill into additional buckets.
#[test]
fn test_insert_array() {
let index = create_test_index();
// An empty value list inserts nothing.
let result = index.insert_array(1, vec![], now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 0);
let values = vec![
"apple".to_string(),
"banana".to_string(),
"cherry".to_string(),
];
// Three new values for id 1: the returned count is 3.
let result = index.insert_array(1, values.clone(), now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 3);
for value in &values {
let result = index.query_with(value, |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
assert!(ids.contains(&1));
}
// Repeating the same call inserts nothing new.
let result = index.insert_array(1, values.clone(), now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 0);
// A second id under the same values counts as three new insertions.
let result = index.insert_array(2, values.clone(), now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 3);
for value in &values {
let result = index.query_with(value, |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
assert!(ids.contains(&1));
assert!(ids.contains(&2));
}
// With `allow_duplicates: false`, a second id for an existing value is an error.
let config = BTreeConfig {
bucket_overload_size: 1024,
allow_duplicates: false,
};
let unique_index = BTreeIndex::new("unique_index".to_string(), Some(config));
let result = unique_index.insert_array(1, vec!["apple".to_string()], now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 1);
let result = unique_index.insert_array(2, vec!["apple".to_string()], now_ms());
assert!(result.is_err());
// A small `bucket_overload_size` makes the batch spread over more than one bucket.
let small_bucket_config = BTreeConfig {
bucket_overload_size: 50,
allow_duplicates: true,
};
let overflow_index =
BTreeIndex::new("overflow_test".to_string(), Some(small_bucket_config));
let large_values: Vec<_> = (0..20).map(|i| format!("large_value_{i}")).collect();
let result = overflow_index.insert_array(1, large_values.clone(), now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 20);
let result = overflow_index.insert_array(2, large_values.clone(), now_ms());
assert!(result.is_ok());
assert_eq!(result.unwrap(), 20);
let stats = overflow_index.stats();
println!("Overflow index stats: {stats:?}");
assert!(stats.max_bucket_id > 0);
// Both ids must be reachable under every value despite the extra buckets.
for value in &large_values {
let result = overflow_index.query_with(value, |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
assert!(ids.contains(&1));
assert!(ids.contains(&2));
}
}
// Covers `remove_array`: empty input, partially unknown values, removal of the
// last id under a key, and removal across multiple buckets.
#[test]
fn test_remove_array() {
let index = create_test_index();
let values = vec![
"apple".to_string(),
"banana".to_string(),
"cherry".to_string(),
"date".to_string(),
"eggplant".to_string(),
];
// ids 1 and 2 reference all five values; id 3 only "apple" and "banana".
let _ = index.insert_array(1, values.clone(), now_ms());
let _ = index.insert_array(2, values.clone(), now_ms());
let _ = index.insert_array(3, vec![values[0].clone(), values[1].clone()], now_ms());
for value in &values {
let result = index.query_with(value, |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
if value == "apple" || value == "banana" {
assert_eq!(ids.len(), 3); assert!(ids.contains(&1) && ids.contains(&2) && ids.contains(&3));
} else {
assert_eq!(ids.len(), 2); assert!(ids.contains(&1) && ids.contains(&2));
}
}
// Removing an empty list changes nothing.
let removed = index.remove_array(1, vec![], now_ms());
assert_eq!(removed, 0);
assert_eq!(index.len(), 5);
// Unknown values are skipped; only the two existing pairs are counted.
let remove_values = vec![
"apple".to_string(),
"nonexistent".to_string(), "banana".to_string(),
];
let removed = index.remove_array(1, remove_values, now_ms());
assert_eq!(removed, 2);
let apple_result = index.query_with(&"apple".to_string(), |ids| Some(ids.clone()));
assert!(apple_result.is_some());
let apple_ids = apple_result.unwrap();
assert_eq!(apple_ids.len(), 2);
assert!(!apple_ids.contains(&1) && apple_ids.contains(&2) && apple_ids.contains(&3));
let banana_result = index.query_with(&"banana".to_string(), |ids| Some(ids.clone()));
assert!(banana_result.is_some());
let banana_ids = banana_result.unwrap();
assert_eq!(banana_ids.len(), 2);
assert!(!banana_ids.contains(&1) && banana_ids.contains(&2) && banana_ids.contains(&3));
// Drop id 2 from "date" and "eggplant" so that id 1 is the last id under them.
let _ = index.remove_array(
2,
vec!["date".to_string(), "eggplant".to_string()],
now_ms(),
);
// Removing the last id makes both keys disappear from the index.
let remove_values = vec!["date".to_string(), "eggplant".to_string()];
let removed = index.remove_array(1, remove_values, now_ms());
assert_eq!(removed, 2);
assert!(
index
.query_with(&"date".to_string(), |ids| Some(ids.clone()))
.is_none()
);
assert!(
index
.query_with(&"eggplant".to_string(), |ids| Some(ids.clone()))
.is_none()
);
assert_eq!(index.len(), 3);
let stats = index.stats();
assert!(stats.delete_count > 0);
// Same checks against an index whose values are spread over several buckets.
let small_bucket_config = BTreeConfig {
bucket_overload_size: 50,
allow_duplicates: true,
};
let overflow_index =
BTreeIndex::new("overflow_test".to_string(), Some(small_bucket_config));
let large_values: Vec<_> = (0..20).map(|i| format!("large_value_{i}")).collect();
let _ = overflow_index.insert_array(1, large_values.clone(), now_ms());
let _ = overflow_index.insert_array(2, large_values.clone(), now_ms());
let stats = overflow_index.stats();
assert!(stats.max_bucket_id > 0);
// Removing id 1 everywhere leaves id 2 as the only id under each value.
let removed = overflow_index.remove_array(1, large_values.clone(), now_ms());
assert_eq!(removed, 20);
for value in &large_values {
let result = overflow_index.query_with(value, |ids| Some(ids.clone()));
assert!(result.is_some());
let ids = result.unwrap();
assert_eq!(ids.len(), 1);
assert!(ids.contains(&2));
}
// Removing id 2 as well empties the index.
let removed = overflow_index.remove_array(2, large_values.clone(), now_ms());
assert_eq!(removed, 20);
assert_eq!(overflow_index.len(), 0);
for value in &large_values {
let result = overflow_index.query_with(value, |ids| Some(ids.clone()));
assert!(result.is_none());
}
}
// Covers `batch_update`, which returns a `(removed, inserted)` pair. Judging by the
// asserted counts, the second argument is the previous value list and the third
// is the new one; values present in both lists are left untouched.
#[test]
fn test_batch_update() {
let index = create_test_index();
let _ = index.insert_array(1, vec!["a".to_string(), "b".to_string()], now_ms());
// Pure addition: [a, b] -> [a, b, c] inserts only "c".
let (removed, inserted) = index
.batch_update(
1,
vec!["a".to_string(), "b".to_string()],
vec!["a".to_string(), "b".to_string(), "c".to_string()],
now_ms(),
)
.unwrap();
assert_eq!(removed, 0);
assert_eq!(inserted, 1);
let ids = index
.query_with(&"c".to_string(), |ids| Some(ids.clone()))
.unwrap();
assert!(ids.contains(&1));
// Pure removal: [a, b, c] -> [a] removes "b" and "c".
let (removed, inserted) = index
.batch_update(
1,
vec!["a".to_string(), "b".to_string(), "c".to_string()],
vec!["a".to_string()],
now_ms(),
)
.unwrap();
assert_eq!(removed, 2);
assert_eq!(inserted, 0);
assert_eq!(
index
.query_with(&"a".to_string(), |ids| {
println!("ids for 'a': {:?}", ids);
Some(ids.clone())
})
.unwrap()
.len(),
1
);
assert!(
index
.query_with(&"c".to_string(), |ids| Some(ids.clone()))
.is_none()
);
// Replacement with no overlap: [a] -> [b, c].
let (removed, inserted) = index
.batch_update(
1,
vec!["a".to_string()],
vec!["b".to_string(), "c".to_string()],
now_ms(),
)
.unwrap();
assert_eq!(removed, 1);
assert_eq!(inserted, 2);
let ids_b = index
.query_with(&"b".to_string(), |ids| Some(ids.clone()))
.unwrap();
let ids_c = index
.query_with(&"c".to_string(), |ids| Some(ids.clone()))
.unwrap();
assert!(ids_b.contains(&1));
assert!(ids_c.contains(&1));
// "a" must no longer reference any id (a missing key counts as empty here).
assert!(
index
.query_with(&"a".to_string(), |ids| Some(ids.clone()))
.unwrap_or_default()
.is_empty()
);
// Full swap: [b, c] -> [x, y].
let (removed, inserted) = index
.batch_update(
1,
vec!["b".to_string(), "c".to_string()],
vec!["x".to_string(), "y".to_string()],
now_ms(),
)
.unwrap();
assert_eq!(removed, 2);
assert_eq!(inserted, 2);
let ids_x = index
.query_with(&"x".to_string(), |ids| Some(ids.clone()))
.unwrap();
let ids_y = index
.query_with(&"y".to_string(), |ids| Some(ids.clone()))
.unwrap();
assert!(ids_x.contains(&1));
assert!(ids_y.contains(&1));
assert!(
index
.query_with(&"b".to_string(), |ids| Some(ids.clone()))
.unwrap_or_default()
.is_empty()
);
assert!(
index
.query_with(&"c".to_string(), |ids| Some(ids.clone()))
.unwrap_or_default()
.is_empty()
);
// Identical old and new lists: nothing is removed or inserted.
let (removed, inserted) = index
.batch_update(
1,
vec!["x".to_string(), "y".to_string()],
vec!["x".to_string(), "y".to_string()],
now_ms(),
)
.unwrap();
assert_eq!(removed, 0);
assert_eq!(inserted, 0);
}
// Concurrency stress test: several tasks insert and remove disjoint key ranges in
// parallel on a multi-threaded runtime, and the index is verified after each phase.
#[tokio::test(flavor = "multi_thread")]
async fn test_chaos() {
let index = Arc::new(BTreeIndex::<u64, String>::new(
"chaos_index".to_string(),
Some(BTreeConfig {
bucket_overload_size: 256,
allow_duplicates: true,
}),
));
let n_threads = 10;
let n_keys_per_thread = 100;
// Phase 1: each task inserts its own 100 keys under 5 doc ids; the barrier
// makes all tasks start inserting at the same time.
let barrier = Arc::new(Barrier::new(n_threads));
let mut handles = Vec::new();
for t in 0..n_threads {
let index = index.clone();
let b = barrier.clone();
handles.push(tokio::spawn(async move {
b.wait().await;
let base = t * n_keys_per_thread;
let items: Vec<_> = (0..n_keys_per_thread)
.map(|i| format!("key_{}", base + i))
.collect();
for j in 0..5 {
let _ = index.insert_array((base + j) as u64, items.clone(), now_ms());
}
}));
}
futures::future::try_join_all(handles).await.unwrap();
// Every key must carry exactly the 5 doc ids of the task that owns it.
for t in 0..n_threads {
let base = t * n_keys_per_thread;
for i in 0..n_keys_per_thread {
let key = format!("key_{}", base + i);
let result = index.query_with(&key, |ids| Some(ids.clone()));
assert!(result.is_some(), "key {key} not found");
let ids = result.unwrap();
assert_eq!(ids.len(), 5, "key {key} should have 5 doc IDs");
for j in 0..5 {
let doc_id = (base + j) as u64;
assert!(ids.contains(&doc_id), "id {doc_id} not found for key {key}");
}
}
}
let size_before_remove = index.len();
assert_eq!(size_before_remove, n_threads * n_keys_per_thread);
println!("索引大小 (删除前): {size_before_remove}");
// Phase 2: each task concurrently removes the first 3 of its 5 doc ids from
// all of its keys; every removal call must report all 100 keys as removed.
let barrier = Arc::new(Barrier::new(n_threads));
let mut handles = Vec::new();
for t in 0..n_threads {
let index = index.clone();
let b = barrier.clone();
handles.push(tokio::spawn(async move {
b.wait().await;
let base = t * n_keys_per_thread;
let items: Vec<_> = (0..n_keys_per_thread)
.map(|i| format!("key_{}", base + i))
.collect();
for j in 0..3 {
let doc_id = (base + j) as u64;
let removed = index.remove_array(doc_id, items.clone(), now_ms());
assert_eq!(
removed, n_keys_per_thread,
"应删除 {n_keys_per_thread} 个键,实际删除 {removed}"
);
}
}));
}
futures::future::try_join_all(handles).await.unwrap();
// Each key must now hold exactly the 2 remaining doc ids (offsets 3 and 4).
for t in 0..n_threads {
let base = t * n_keys_per_thread;
for i in 0..n_keys_per_thread {
let key = format!("key_{}", base + i);
let result = index.query_with(&key, |ids| Some(ids.clone()));
assert!(result.is_some(), "删除后键 {key} 不应该被完全移除");
let ids = result.unwrap();
assert_eq!(ids.len(), 2, "删除后键 {key} 应该有2个文档ID");
for j in 0..3 {
let doc_id = (base + j) as u64;
assert!(!ids.contains(&doc_id), "文档ID {doc_id} 应该已被删除");
}
for j in 3..5 {
let doc_id = (base + j) as u64;
assert!(ids.contains(&doc_id), "文档ID {doc_id} 应该仍然存在");
}
}
}
// Phase 3: remove the remaining 2 doc ids (no start barrier this time).
let mut handles = Vec::new();
for t in 0..n_threads {
let index = index.clone();
handles.push(tokio::spawn(async move {
let base = t * n_keys_per_thread;
let items: Vec<_> = (0..n_keys_per_thread)
.map(|i| format!("key_{}", base + i))
.collect();
for j in 3..5 {
let doc_id = (base + j) as u64;
index.remove_array(doc_id, items.clone(), now_ms());
}
}));
}
futures::future::try_join_all(handles).await.unwrap();
// With every doc id removed, the index must be empty and no key may resolve.
assert_eq!(index.len(), 0, "删除所有文档ID后索引应该为空");
for t in 0..n_threads {
let base = t * n_keys_per_thread;
for i in 0..n_keys_per_thread {
let key = format!("key_{}", base + i);
let result = index.query_with(&key, |ids| Some(ids.clone()));
assert!(result.is_none(), "键 {key} 应该已完全从索引中移除");
}
}
}
/// Checks that `stats()` tracks inserts, queries and removals.
///
/// Starting from the index returned by `create_test_index()`, the test verifies
/// that all checked counters are zero, then that `num_elements`/`insert_count`
/// reflect two inserts, that `query_count` reflects one point query plus one
/// range query, and that `num_elements`/`delete_count` reflect one removal.
#[test]
fn test_stats() {
    let index = create_test_index();

    // A fresh index reports zero for every counter checked here.
    let stats = index.stats();
    assert_eq!(stats.num_elements, 0);
    assert_eq!(stats.query_count, 0);
    assert_eq!(stats.insert_count, 0);
    assert_eq!(stats.delete_count, 0);

    // Unwrap instead of discarding the result: a failed insert should be
    // reported here, not as a counter mismatch a few lines further down.
    index.insert(1, "apple".to_string(), now_ms()).unwrap();
    index.insert(2, "banana".to_string(), now_ms()).unwrap();
    let stats = index.stats();
    assert_eq!(stats.num_elements, 2);
    assert_eq!(stats.insert_count, 2);

    // One point query and one range query; only the counter matters, so the
    // query results themselves are deliberately ignored.
    let _ = index.query_with(&"apple".to_string(), |_| Some(()));
    let _: Vec<()> =
        index.range_query_with(RangeQuery::Ge("a".to_string()), |_, _| (true, vec![]));
    let stats = index.stats();
    assert_eq!(stats.query_count, 2);

    // The removal's return value is ignored; its effect is checked via stats.
    let _ = index.remove(1, "apple".to_string(), now_ms());
    let stats = index.stats();
    assert_eq!(stats.num_elements, 1);
    assert_eq!(stats.delete_count, 1);
}
/// Checks that `insert_array` works for values whose postings already exist
/// once `max_bucket_id` has advanced past 0.
///
/// After the bucket ID has advanced, a single `insert_array` call for doc 999
/// covering both "alpha" and "beta" must report 2 and be visible through
/// `query_with` for each value.
#[test]
fn test_insert_array_uses_correct_bucket_for_existing_postings() {
    // Small overload threshold so repeated inserts advance `max_bucket_id` quickly.
    let index = BTreeIndex::new(
        "bucket_track".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 80,
            allow_duplicates: true,
        }),
    );

    // Keep adding doc IDs under "alpha" until stats report a bucket ID above 0;
    // the range bounds the number of attempts.
    for doc in 1u64..200 {
        if index.stats().max_bucket_id != 0 {
            break;
        }
        index.insert(doc, "alpha".to_string(), now_ms()).unwrap();
    }
    assert!(
        index.stats().max_bucket_id > 0,
        "migration should have occurred"
    );

    // Second pre-existing posting, created after the bucket ID advanced.
    index.insert(1, "beta".to_string(), now_ms()).unwrap();

    let values = vec!["alpha".to_string(), "beta".to_string()];
    let inserted = index.insert_array(999, values, now_ms());
    assert!(inserted.is_ok());
    assert_eq!(inserted.unwrap(), 2);

    // Doc 999 must now be listed under both values.
    for value in ["alpha", "beta"] {
        let ids = index
            .query_with(&value.to_string(), |ids| Some(ids.clone()))
            .unwrap();
        assert!(ids.contains(&999));
    }
}
/// Checks uniqueness enforcement in `insert_array` when `allow_duplicates` is false.
///
/// With "apple" already indexed for doc 1, inserting it for doc 2 must fail
/// with `BTreeError::AlreadyExists`, while repeating it for doc 1 must succeed
/// and report 0.
#[test]
fn test_insert_array_enforces_unique_in_occupied_branch() {
    let apple = || "apple".to_string();
    let unique_index = BTreeIndex::new(
        "unique_array".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 1024,
            allow_duplicates: false,
        }),
    );
    unique_index.insert(1, apple(), now_ms()).unwrap();

    // A different doc ID for an existing value is rejected.
    let conflict = unique_index.insert_array(2, vec![apple()], now_ms());
    assert!(conflict.is_err());
    if !matches!(conflict, Err(BTreeError::AlreadyExists { .. })) {
        panic!("Expected AlreadyExists, got: {conflict:?}");
    }

    // The same doc ID again is accepted and reports 0.
    let repeat = unique_index.insert_array(1, vec![apple()], now_ms());
    assert!(repeat.is_ok());
    assert_eq!(repeat.unwrap(), 0);
}
/// Checks that an `Or` range query yields keys in ascending order even when
/// its sub-queries are listed with the upper range first.
#[test]
fn test_range_keys_or_returns_sorted_order() {
    let index = create_populated_index();

    // The upper range is deliberately placed before the lower one.
    let upper = Box::new(RangeQuery::Ge("eggplant".to_string()));
    let lower = Box::new(RangeQuery::Le("banana".to_string()));
    let query = RangeQuery::Or(vec![upper, lower]);

    // Collect every visited key; the `true` flag is passed back unchanged for each key.
    let results = index.range_query_with(query, |key, _| (true, vec![key.clone()]));

    let expected = vec!["apple", "banana", "eggplant"];
    assert_eq!(
        results, expected,
        "Or query must return keys in global B-tree order"
    );
}
/// Checks that an `Or` of two overlapping `Between` ranges reports each key once.
#[test]
fn test_range_keys_or_deduplicates() {
    let index = create_populated_index();

    // Helper producing a boxed `Between` sub-query from two string bounds.
    let between = |low: &str, high: &str| {
        Box::new(RangeQuery::Between(low.to_string(), high.to_string()))
    };

    // Both ranges cover "banana" and "cherry".
    let query = RangeQuery::Or(vec![
        between("banana", "cherry"),
        between("apple", "cherry"),
    ]);

    let results = index.range_query_with(query, |key, _| (true, vec![key.clone()]));
    let expected = vec!["apple", "banana", "cherry"];
    assert_eq!(results, expected);
}
/// Checks that the size recorded for bucket 0 grows when `insert_array`
/// appends further doc IDs to values that are already indexed.
#[test]
fn test_insert_array_grows_bucket_size_for_existing_postings() {
    let index: BTreeIndex<u64, String> = BTreeIndex::new(
        "size_growth".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 1024,
            allow_duplicates: true,
        }),
    );

    // The same three values are inserted for every doc ID.
    let abc = || vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // First tuple field of bucket 0, read as the bucket's recorded size.
    let bucket_zero_size = || index.buckets.get(&0).unwrap().0;

    index.insert_array(1, abc(), now_ms()).unwrap();
    let initial_size = bucket_zero_size();
    assert!(initial_size > 0);

    // Append doc IDs 2..50 to the existing postings.
    (2u64..50).for_each(|doc_id| {
        index.insert_array(doc_id, abc(), now_ms()).unwrap();
    });

    let grown_size = bucket_zero_size();
    assert!(
        grown_size > initial_size,
        "bucket size should grow when doc_ids are appended via insert_array \
(initial={initial_size}, after={grown_size})"
    );
}
/// Checks the size bookkeeping of the source bucket when a single `insert`
/// moves a posting out of bucket 0.
///
/// Bucket 0's recorded size is forced to one below the overload threshold, so
/// the next insert for "moving" relocates its posting. Afterwards bucket 0 must
/// have shrunk by the posting's size as measured *before* the new doc ID was
/// appended, and must no longer list the key.
#[test]
fn test_insert_migration_subtracts_previous_posting_size_from_source_bucket() {
    let index: BTreeIndex<u64, String> = BTreeIndex::new(
        "single_insert_migration_size".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 128,
            allow_duplicates: true,
        }),
    );
    index.insert(1, "anchor".to_string(), now_ms()).unwrap();
    index.insert(1, "moving".to_string(), now_ms()).unwrap();
    let moving_key = "moving".to_string();

    // Size of the posting before the migrating insert. The `+ 2` presumably
    // mirrors per-entry overhead added by the index itself (confirm against
    // the insert path). The map guard is released at the end of the statement.
    let previous_posting_size = index
        .postings
        .get(&moving_key)
        .map(|posting| estimate_cbor_size(&*posting) + 2)
        .unwrap();

    // Force bucket 0 to sit one below the overload threshold; the mutable
    // guard is a temporary and is released before the next insert.
    let forced_source_size = index.config.bucket_overload_size - 1;
    index.buckets.get_mut(&0).unwrap().0 = forced_source_size;

    index.insert(2, moving_key.clone(), now_ms()).unwrap();

    // The posting now reports a bucket other than 0.
    let posting_bucket = index.postings.get(&moving_key).unwrap().0;
    assert_ne!(posting_bucket, 0, "posting should migrate to a new bucket");

    let source_bucket = index.buckets.get(&0).unwrap();
    let expected_source_size = forced_source_size.saturating_sub(previous_posting_size);
    assert_eq!(
        source_bucket.0, expected_source_size,
        "source bucket must subtract the posting size before the appended doc_id"
    );
    assert!(!source_bucket.2.contains(&moving_key));
}
/// End-to-end check of `compact_buckets`.
///
/// 1. Build an index with a very small overload threshold so 30 values end up
///    in more than two buckets, and flush it into in-memory buffers.
/// 2. Reload it with a 512K threshold, compact, and verify the bucket count
///    drops to at most 2 while sample queries return the same results.
/// 3. Flush the compacted index, reload it with `load_all`, and verify the
///    element count and the sample queries still match.
#[tokio::test]
async fn test_compact_buckets() {
    use std::collections::HashMap;

    /// Point lookup returning a copy of the doc IDs stored under `key`, if any.
    fn lookup(index: &BTreeIndex<u64, String>, key: &str) -> Option<Vec<u64>> {
        index.query_with(&key.to_string(), |ids| Some(ids.to_vec()))
    }

    // Step 1: small threshold, 30 distinct values, one doc ID each.
    let index: BTreeIndex<u64, String> = BTreeIndex::new(
        "compact_test".to_string(),
        Some(BTreeConfig {
            bucket_overload_size: 50,
            allow_duplicates: true,
        }),
    );
    for i in 0u64..30 {
        index.insert(i, format!("value_{i:03}"), now_ms()).unwrap();
    }

    let bucket_count_before = index.stats().max_bucket_id + 1;
    println!("Before compact: {} buckets", bucket_count_before);
    assert!(bucket_count_before > 2, "should have multiple buckets");

    // Flush metadata and every bucket into in-memory buffers.
    let mut meta_bytes: Vec<u8> = Vec::new();
    let mut bucket_bytes: HashMap<u32, Vec<u8>> = HashMap::new();
    index
        .flush(&mut meta_bytes, 1, async |id: u32, data: &[u8]| {
            bucket_bytes.insert(id, data.to_vec());
            Ok(true)
        })
        .await
        .unwrap();

    // Step 2: reload with a much larger threshold, set on both the index
    // config and the config copy held in its metadata.
    let relaxed_limit = 1024 * 512;
    let mut loaded = BTreeIndex::<u64, String>::load_metadata(meta_bytes.as_slice()).unwrap();
    loaded.config.bucket_overload_size = relaxed_limit;
    loaded.metadata.write().config.bucket_overload_size = relaxed_limit;
    loaded
        .load_buckets(async |id| Ok(bucket_bytes.get(&id).cloned()))
        .await
        .unwrap();

    // Baseline query results captured before compaction.
    let queries = ["value_000", "value_010", "value_020"];
    let results_before: Vec<Option<Vec<u64>>> =
        queries.iter().map(|q| lookup(&loaded, q)).collect();

    let (buckets_before, buckets_after) = loaded.compact_buckets();
    println!("Compacted: {} -> {} buckets", buckets_before, buckets_after);
    assert!(
        buckets_after < buckets_before,
        "compaction should reduce bucket count significantly"
    );
    assert!(
        buckets_after <= 2,
        "with 512K limit all postings should fit in 1-2 buckets, got {}",
        buckets_after,
    );

    // Compaction must not change what the sample queries return.
    for (q, before) in queries.iter().zip(&results_before) {
        let after = lookup(&loaded, q);
        assert_eq!(
            before, &after,
            "query '{}' result changed after compaction",
            q
        );
    }

    // Step 3: flush the compacted index and reload everything in one call.
    let mut meta_bytes2: Vec<u8> = Vec::new();
    let mut bucket_bytes2: HashMap<u32, Vec<u8>> = HashMap::new();
    loaded
        .flush(&mut meta_bytes2, 100, async |id: u32, data: &[u8]| {
            bucket_bytes2.insert(id, data.to_vec());
            Ok(true)
        })
        .await
        .unwrap();

    let final_loaded: BTreeIndex<u64, String> =
        BTreeIndex::load_all(meta_bytes2.as_slice(), async |id| {
            Ok(bucket_bytes2.get(&id).cloned())
        })
        .await
        .unwrap();

    assert_eq!(
        final_loaded.stats().num_elements,
        loaded.stats().num_elements
    );
    for q in queries {
        let orig = lookup(&loaded, q);
        let reloaded = lookup(&final_loaded, q);
        assert_eq!(orig, reloaded, "query '{}' mismatch after reload", q);
    }
}
}