mod composite_filters {
use std::sync::Arc;
use bytes::Bytes;
use slatedb::config::{
FlushOptions, FlushType, PutOptions, ScanOptions, Settings, WriteOptions,
};
use slatedb::object_store::memory::InMemory;
use slatedb::object_store::ObjectStore;
use slatedb::{BloomFilterPolicy, Db, FilterPolicy, PrefixExtractor, PrefixTarget};
const SAMPLE_USERS: &[&[u8]] = &[
b"u:alice", b"u:bob", b"u:carol", b"u:dave", b"u:eve", b"u:frank",
];
const SAMPLE_NON_USERS: &[&[u8]] = &[
b"session:s1",
b"session:s2",
b"metric:m1",
b"metric:m2",
b"metric:m3",
];
struct ConditionalPrefixExtractor {
marker: Bytes,
extra_len: usize,
name: String,
}
impl ConditionalPrefixExtractor {
fn new(marker: &[u8], extra_len: usize) -> Self {
Self {
marker: Bytes::copy_from_slice(marker),
extra_len,
name: format!(
"cond:{}:{}",
std::str::from_utf8(marker).expect("marker must be utf8"),
extra_len
),
}
}
}
impl PrefixExtractor for ConditionalPrefixExtractor {
fn name(&self) -> &str {
&self.name
}
fn prefix_len(&self, target: &PrefixTarget) -> Option<usize> {
let input = match target {
PrefixTarget::Point(k) => k.as_ref(),
PrefixTarget::Prefix(p) => p.as_ref(),
};
let total = self.marker.len() + self.extra_len;
if input.len() >= total && input.starts_with(self.marker.as_ref()) {
Some(total)
} else {
None
}
}
}
fn composite_policies() -> Vec<Arc<dyn FilterPolicy>> {
vec![
Arc::new(BloomFilterPolicy::new(10)),
Arc::new(
BloomFilterPolicy::new(10)
.with_whole_key_filtering(false)
.with_prefix_extractor(Arc::new(ConditionalPrefixExtractor::new(b"u:", 4))),
),
]
}
fn base_settings() -> Settings {
Settings {
min_filter_keys: 0,
compactor_options: None,
..Settings::default()
}
}
async fn flush_memtable(db: &Db) {
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.expect("memtable flush failed");
}
async fn write_sample_data(db: &Db) {
let put = PutOptions::default();
let write = WriteOptions {
await_durable: false,
seqnum: 0,
};
for batch in [SAMPLE_USERS, SAMPLE_NON_USERS] {
for (i, key) in batch.iter().enumerate() {
let value = format!(
"v{}-{}",
std::str::from_utf8(key).expect("sample key must be utf8"),
i
)
.into_bytes();
db.put_with_options(key, &value, &put, &write)
.await
.expect("put failed");
}
flush_memtable(db).await;
}
}
async fn assert_reads_consistent(db: &Db) {
for key in SAMPLE_USERS.iter().chain(SAMPLE_NON_USERS.iter()) {
let got = db.get(key).await.expect("get failed");
assert!(got.is_some(), "expected key {:?} to be present", key);
}
for absent in [
b"u:nope__".as_slice(),
b"u:zzzzzz".as_slice(),
b"session:nope".as_slice(),
b"metric:nope".as_slice(),
] {
let got = db.get(absent).await.expect("get failed");
assert!(got.is_none(), "expected absent key {:?}", absent);
}
let mut iter = db
.scan_prefix_with_options(b"u:", &ScanOptions::default())
.await
.expect("scan_prefix failed");
let mut keys = Vec::new();
while let Some(kv) = iter.next().await.expect("iterator next failed") {
keys.push(kv.key.to_vec());
}
keys.sort();
let mut expected: Vec<Vec<u8>> = SAMPLE_USERS.iter().map(|k| k.to_vec()).collect();
expected.sort();
assert_eq!(keys, expected, "scan_prefix(b\"u:\") mismatch");
let mut iter = db
.scan_prefix_with_options(b"metric:", &ScanOptions::default())
.await
.expect("scan_prefix failed");
let mut keys = Vec::new();
while let Some(kv) = iter.next().await.expect("iterator next failed") {
keys.push(kv.key.to_vec());
}
keys.sort();
let mut expected: Vec<Vec<u8>> = SAMPLE_NON_USERS
.iter()
.filter(|k| k.starts_with(b"metric:"))
.map(|k| k.to_vec())
.collect();
expected.sort();
assert_eq!(keys, expected, "scan_prefix(b\"metric:\") mismatch");
}
async fn open_db(
path: &str,
store: Arc<dyn ObjectStore>,
policies: Vec<Arc<dyn FilterPolicy>>,
) -> Db {
Db::builder(path, store)
.with_settings(base_settings())
.with_filter_policies(policies)
.build()
.await
.expect("failed to build db")
}
#[tokio::test]
async fn composite_policies_lifecycle() {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/test/composite_reopen";
let db = open_db(path, store.clone(), composite_policies()).await;
write_sample_data(&db).await;
assert_reads_consistent(&db).await;
db.close().await.expect("close failed");
let mut reversed = composite_policies();
reversed.reverse();
let db = open_db(path, store.clone(), reversed).await;
assert_reads_consistent(&db).await;
db.close().await.expect("close failed");
}
}
mod prop_test {
use std::sync::Arc;
use proptest::collection::vec;
use proptest::prelude::*;
use slatedb::config::{PutOptions, ScanOptions, Settings, WriteOptions};
use slatedb::object_store::memory::InMemory;
use slatedb::{BloomFilterPolicy, Db, FilterPolicy, PrefixExtractor, PrefixTarget};
use tokio::runtime::Runtime;
const PREFIX_LEN: usize = 3;
struct FixedPrefixExtractor {
len: usize,
name: String,
}
impl FixedPrefixExtractor {
fn new(len: usize) -> Self {
Self {
len,
name: format!("fixed{}", len),
}
}
}
impl PrefixExtractor for FixedPrefixExtractor {
fn name(&self) -> &str {
&self.name
}
fn prefix_len(&self, target: &PrefixTarget) -> Option<usize> {
let input = match target {
PrefixTarget::Point(k) => k.as_ref(),
PrefixTarget::Prefix(p) => p.as_ref(),
};
(input.len() >= self.len).then_some(self.len)
}
}
fn settings() -> Settings {
Settings {
min_filter_keys: 0,
compactor_options: None,
..Settings::default()
}
}
async fn build_db(path: &str, filter_policies: Vec<Arc<dyn FilterPolicy>>) -> Db {
Db::builder(path, Arc::new(InMemory::new()))
.with_settings(settings())
.with_filter_policies(filter_policies)
.build()
.await
.expect("failed to build db")
}
async fn write_keys(db: &Db, keys: &[Vec<u8>]) {
let put_opts = PutOptions::default();
let write_opts = WriteOptions {
await_durable: false,
seqnum: 0,
};
for (i, key) in keys.iter().enumerate() {
let value = format!("v{}", i).into_bytes();
db.put_with_options(key, &value, &put_opts, &write_opts)
.await
.expect("put failed");
if i % 8 == 7 {
db.flush().await.expect("flush failed");
}
}
db.flush().await.expect("flush failed");
}
async fn collect_prefix_scan(db: &Db, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
let mut iter = db
.scan_prefix_with_options(prefix, &ScanOptions::default())
.await
.expect("scan_prefix failed");
let mut out = Vec::new();
while let Some(kv) = iter.next().await.expect("iterator next failed") {
out.push((kv.key.to_vec(), kv.value.to_vec()));
}
out
}
fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
vec(
prop_oneof![Just(b'a'), Just(b'b'), Just(b'c'), Just(b'd')],
3..=8,
)
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 64,
..ProptestConfig::default()
})]
#[test]
fn prefix_scan_matches_with_and_without_filter(
keys in vec(key_strategy(), 1..50),
query_seed in any::<u8>(),
) {
let runtime = Runtime::new().expect("failed to create runtime");
runtime.block_on(async {
let db_no_filter = build_db("/test/no_filter", vec![]).await;
let db_with_filter = build_db(
"/test/with_filter",
vec![Arc::new(
BloomFilterPolicy::new(10).with_prefix_extractor(
Arc::new(FixedPrefixExtractor::new(PREFIX_LEN)),
),
)],
)
.await;
write_keys(&db_no_filter, &keys).await;
write_keys(&db_with_filter, &keys).await;
let mut prefixes: Vec<Vec<u8>> = keys
.iter()
.filter(|k| k.len() >= PREFIX_LEN)
.map(|k| k[..PREFIX_LEN].to_vec())
.collect();
prefixes.sort();
prefixes.dedup();
prefixes.push(b"zzz".to_vec());
prefixes.push(b"xyz".to_vec());
let pick = prefixes[query_seed as usize % prefixes.len()].clone();
let no_filter_results = collect_prefix_scan(&db_no_filter, &pick).await;
let with_filter_results = collect_prefix_scan(&db_with_filter, &pick).await;
prop_assert_eq!(no_filter_results, with_filter_results);
db_no_filter.close().await.expect("close failed");
db_with_filter.close().await.expect("close failed");
Ok(())
})?;
}
}
}