use std::time::{Duration, Instant};
use nodedb::bridge::dispatch::{BridgeRequest, BridgeResponse};
use nodedb::bridge::envelope::{Priority, Request, Status};
use nodedb::data::executor::core_loop::CoreLoop;
use nodedb::data::executor::response_codec::decode_payload_to_json;
use nodedb::types::*;
use nodedb_bridge::buffer::{Consumer, Producer, RingBuffer};
use nodedb_physical::physical_plan::{DocumentOp, PhysicalPlan, TextOp, VectorOp};
use nodedb_types::vector_distance::DistanceMetric;
use nodedb_types::{Surrogate, SurrogateBitmap};
fn open_core() -> (
CoreLoop,
Producer<BridgeRequest>,
Consumer<BridgeResponse>,
tempfile::TempDir,
) {
let dir = tempfile::tempdir().unwrap();
let (req_tx, req_rx) = RingBuffer::channel::<BridgeRequest>(128);
let (resp_tx, resp_rx) = RingBuffer::channel::<BridgeResponse>(128);
let core = CoreLoop::open(
0,
req_rx,
resp_tx,
dir.path(),
std::sync::Arc::new(nodedb_types::OrdinalClock::new()),
)
.unwrap();
(core, req_tx, resp_rx, dir)
}
fn make_req(plan: PhysicalPlan) -> Request {
Request {
request_id: RequestId::new(1),
tenant_id: TenantId::new(1),
vshard_id: VShardId::new(0),
database_id: nodedb::types::DatabaseId::DEFAULT,
plan,
deadline: Instant::now() + Duration::from_secs(5),
priority: Priority::Normal,
trace_id: nodedb_types::TraceId::ZERO,
consistency: ReadConsistency::Strong,
idempotency_key: None,
event_source: nodedb::event::EventSource::User,
user_roles: Vec::new(),
user_id: None,
statement_digest: None,
}
}
fn send_ok(
core: &mut CoreLoop,
tx: &mut Producer<BridgeRequest>,
rx: &mut Consumer<BridgeResponse>,
plan: PhysicalPlan,
) -> Vec<u8> {
tx.try_push(BridgeRequest {
inner: make_req(plan),
})
.unwrap();
core.tick();
let resp = rx.try_pop().unwrap();
assert_eq!(
resp.inner.status,
Status::Ok,
"expected Ok status, got {:?}: {:?}",
resp.inner.status,
resp.inner.error_code
);
resp.inner.payload.to_vec()
}
fn parse_json(payload: &[u8]) -> serde_json::Value {
let s = decode_payload_to_json(payload);
serde_json::from_str(&s).unwrap_or(serde_json::Value::Array(vec![]))
}
fn extract_vector_surrogates(payload: &[u8]) -> Vec<u32> {
parse_json(payload)
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|h| h.get("id").and_then(|v| v.as_u64()).map(|n| n as u32))
.collect()
}
fn extract_fts_surrogates(payload: &[u8]) -> Vec<u32> {
parse_json(payload)
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|h| {
h.get("id")
.and_then(|v| v.as_str())
.and_then(|hex| u32::from_str_radix(hex, 16).ok())
})
.collect()
}
fn extract_doc_surrogates(payload: &[u8]) -> Vec<u32> {
parse_json(payload)
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|h| {
h.get("id")
.and_then(|v| v.as_str())
.and_then(|hex| u32::from_str_radix(hex, 16).ok())
})
.collect()
}
const LEARNING_SURS: &[u32] = &[1, 2, 3, 4, 5, 6, 7];
const NON_LEARNING_SURS: &[u32] = &[8, 9, 10];
const COLLECTION: &str = "articles_3way";
#[test]
fn three_way_fts_vector_doc_bitmap() {
let (mut core, mut tx, mut rx, _dir) = open_core();
for &s in LEARNING_SURS {
let hex = format!("{s:08x}");
let doc = serde_json::json!({
"id": hex,
"body": format!("machine learning fundamentals row {s}"),
"emb": [1.0f64, s as f64 * 0.01, 0.0],
});
send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Document(DocumentOp::PointPut {
collection: COLLECTION.into(),
document_id: hex.clone(),
value: serde_json::to_vec(&doc).unwrap(),
surrogate: Surrogate::new(s),
pk_bytes: hex.into_bytes(),
}),
);
}
for &s in NON_LEARNING_SURS {
let hex = format!("{s:08x}");
let doc = serde_json::json!({
"id": hex,
"body": format!("quantum computing photonic qubits row {s}"),
"emb": [-1.0f64, s as f64 * 0.01, 0.0],
});
send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Document(DocumentOp::PointPut {
collection: COLLECTION.into(),
document_id: hex.clone(),
value: serde_json::to_vec(&doc).unwrap(),
surrogate: Surrogate::new(s),
pk_bytes: hex.into_bytes(),
}),
);
}
for &s in LEARNING_SURS.iter().chain(NON_LEARNING_SURS) {
tx.try_push(BridgeRequest {
inner: make_req(PhysicalPlan::Vector(VectorOp::Insert {
collection: COLLECTION.into(),
vector: if s <= 7 {
vec![1.0f32, s as f32 * 0.01, 0.0]
} else {
vec![-1.0f32, s as f32 * 0.01, 0.0]
},
dim: 3,
field_name: String::new(),
surrogate: Surrogate::new(s),
})),
})
.unwrap();
}
core.tick();
for _ in 0..10 {
let resp = rx.try_pop().unwrap();
assert_eq!(resp.inner.status, Status::Ok);
}
let fts_payload = send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Text(TextOp::Search {
collection: COLLECTION.into(),
query: "learning".into(),
top_k: 20,
fuzzy: false,
prefilter: None,
rls_filters: Vec::new(),
}),
);
let fts_surs = extract_fts_surrogates(&fts_payload);
let mut bitmap_a = SurrogateBitmap::new();
for &s in &fts_surs {
bitmap_a.insert(Surrogate::new(s));
}
for &s in LEARNING_SURS {
assert!(
bitmap_a.contains(Surrogate::new(s)),
"FTS bitmap must contain surrogate {s} (row {s} has 'learning')"
);
}
for &s in NON_LEARNING_SURS {
assert!(
!bitmap_a.contains(Surrogate::new(s)),
"FTS bitmap must NOT contain surrogate {s} (row {s} has no 'learning')"
);
}
assert_eq!(
bitmap_a.len(),
LEARNING_SURS.len() as u64,
"FTS bitmap must contain exactly {} surrogates, got {}",
LEARNING_SURS.len(),
bitmap_a.len()
);
let vec_filtered = send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Vector(VectorOp::Search {
collection: COLLECTION.into(),
query_vector: vec![1.0f32, 0.0, 0.0],
top_k: 20,
ef_search: 0,
filter_bitmap: Some(bitmap_a.clone()),
field_name: String::new(),
rls_filters: Vec::new(),
inline_prefilter_plan: None,
ann_options: Default::default(),
skip_payload_fetch: false,
payload_filters: Vec::new(),
metric: DistanceMetric::L2,
}),
);
let vec_surs = extract_vector_surrogates(&vec_filtered);
for &s in &vec_surs {
assert!(
bitmap_a.contains(Surrogate::new(s)),
"vector result surrogate {s} is not in the FTS bitmap"
);
}
for &s in NON_LEARNING_SURS {
assert!(
!vec_surs.contains(&s),
"non-learning surrogate {s} must not appear in FTS-prefiltered vector search"
);
}
assert!(
!vec_surs.is_empty(),
"prefiltered vector search must return at least one result"
);
let doc_payload = send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Document(DocumentOp::Scan {
collection: COLLECTION.into(),
limit: 100,
offset: 0,
sort_keys: Vec::new(),
filters: Vec::new(),
distinct: false,
projection: Vec::new(),
computed_columns: Vec::new(),
window_functions: Vec::new(),
system_as_of_ms: None,
valid_at_ms: None,
prefilter: Some(bitmap_a.clone()),
}),
);
let doc_surs = extract_doc_surrogates(&doc_payload);
let mut doc_sur_set: std::collections::HashSet<u32> = doc_surs.iter().copied().collect();
for &s in LEARNING_SURS {
assert!(
doc_sur_set.remove(&s),
"document scan must include surrogate {s} (in FTS bitmap)"
);
}
assert!(
doc_sur_set.is_empty(),
"document scan returned unexpected surrogates not in FTS bitmap: {doc_sur_set:?}"
);
let mut result_bitmap = SurrogateBitmap::new();
for &s in &doc_surs {
result_bitmap.insert(Surrogate::new(s));
}
let intersection = bitmap_a.intersect(&result_bitmap);
assert_eq!(
intersection.len(),
LEARNING_SURS.len() as u64,
"intersection of FTS bitmap and doc scan results must equal {{1..7}}"
);
for &s in LEARNING_SURS {
assert!(
intersection.contains(Surrogate::new(s)),
"intersection must contain surrogate {s}"
);
}
}