use nodedb_types::DatabaseId;
use nodedb_types::Surrogate;
use serde::{Deserialize, Serialize};
use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::scan_filter::ScanFilter;
use crate::control::state::SharedState;
use nodedb_physical::physical_plan::VectorOp;
/// One vector-search hit as decoded from (and re-encoded to) the MessagePack
/// result payload handled in this file.
///
/// The field names are part of the wire format (`#[msgpack(map)]` encodes the
/// struct as a map), so they should not be renamed casually.
#[derive(Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack)]
#[msgpack(map)]
struct Hit {
/// Numeric identifier of the hit; used in this file as the surrogate for the
/// primary-key lookup and emitted as `_surrogate` in the JSON output.
id: u32,
/// Distance reported for this hit (metric is not visible from this file).
distance: f32,
/// Document primary key as a string. When absent it is filled in from the
/// catalog during translation, if a catalog is available.
#[serde(skip_serializing_if = "Option::is_none")]
doc_id: Option<String>,
/// Optional encoded document body. It is matched against RLS filters and,
/// when still present, decoded as a MessagePack map of field name to value.
#[serde(skip_serializing_if = "Option::is_none")]
body: Option<Vec<u8>>,
}
/// Applies the encoded RLS filter set to `hits` in place.
///
/// * `hits` – candidate hits; mutated in place.
/// * `rls_filters` – MessagePack-encoded `Vec<ScanFilter>`; empty means
///   "no filtering".
/// * `top_k` – maximum number of hits kept after filtering.
///
/// Behaviour:
/// * Empty `rls_filters`: returns immediately; hits are left untouched
///   (bodies are kept and no truncation happens).
/// * Undecodable `rls_filters`: logs a warning and removes every hit
///   (fail-closed).
/// * Otherwise: keeps only hits that carry a body matching *all* filters
///   (hits without a body are dropped), truncates to `top_k`, and finally
///   strips the body from every surviving hit.
fn apply_rls_filter(hits: &mut Vec<Hit>, rls_filters: &[u8], top_k: usize) {
    if rls_filters.is_empty() {
        return;
    }

    let Ok(predicates) = zerompk::from_msgpack::<Vec<ScanFilter>>(rls_filters) else {
        // Fail closed: without usable filters nothing may be returned.
        tracing::warn!("RLS filter decode failed at CP boundary — denying all hits");
        hits.clear();
        return;
    };

    // A hit survives only if it has a body and every predicate accepts it.
    hits.retain(|hit| {
        hit.body
            .as_deref()
            .is_some_and(|bytes| predicates.iter().all(|p| p.matches_binary(bytes)))
    });

    // `truncate` is a no-op when fewer than `top_k` hits remain.
    hits.truncate(top_k);

    // Bodies were only needed for filtering; drop them from the survivors.
    hits.iter_mut().for_each(|hit| hit.body = None);
}
/// Translates a MessagePack vector-search payload into a JSON array of flat
/// row objects.
///
/// * `payload` – raw response bytes.
/// * `state` – shared state; its credential catalog (if any) is used to map
///   surrogate ids to primary keys.
/// * `collection` – collection name used for the primary-key lookup.
/// * `rls_filters` / `top_k` – forwarded to `apply_rls_filter`.
///
/// The payload is returned unchanged (as a copy) when it is empty, when its
/// first byte is `[`, `{` or `"`, or when it does not decode as a MessagePack
/// list of hits.
///
/// Otherwise each hit becomes an object containing:
/// * `distance` – the hit distance (a body field of the same name, if
///   present, overwrites it because body fields are inserted afterwards);
/// * every body field that converts to JSON, when the hit still has a body;
/// * `id` – the body's own `id` if it has one, else the resolved `doc_id`,
///   else the numeric hit id;
/// * `_surrogate` – always the numeric hit id.
///
/// If JSON serialization fails, the hits are re-encoded as MessagePack; if
/// that fails too, the original payload is returned.
pub fn translate_vector_search_payload(
    payload: &[u8],
    state: &SharedState,
    collection: &str,
    rls_filters: &[u8],
    top_k: usize,
) -> Vec<u8> {
    use std::collections::{BTreeMap, HashMap};

    // Pass-through for empty payloads and payloads that start like JSON.
    // NOTE(review): these pass-through paths (and the decode-failure path
    // below) return the payload without applying `rls_filters` — confirm that
    // such payloads can never carry row data.
    if matches!(
        payload.first().copied(),
        None | Some(b'[' | b'{' | b'"')
    ) {
        return payload.to_vec();
    }

    let Ok(mut hits) = zerompk::from_msgpack::<Vec<Hit>>(payload) else {
        return payload.to_vec();
    };

    apply_rls_filter(&mut hits, rls_filters, top_k);

    // Fill in missing document ids from the catalog, when one is available.
    if let Some(catalog) = state.credentials.catalog().as_ref() {
        for hit in hits.iter_mut().filter(|h| h.doc_id.is_none()) {
            // Lookup errors, missing entries and non-UTF-8 keys all leave
            // `doc_id` as `None`.
            hit.doc_id = catalog
                .get_pk_for_surrogate(DatabaseId::DEFAULT, collection, Surrogate::new(hit.id))
                .ok()
                .flatten()
                .and_then(|pk_bytes| String::from_utf8(pk_bytes).ok());
        }
    }

    let mut rows: Vec<BTreeMap<String, serde_json::Value>> = Vec::with_capacity(hits.len());
    for hit in &hits {
        let mut row: BTreeMap<String, serde_json::Value> = BTreeMap::new();
        row.insert(String::from("distance"), serde_json::json!(hit.distance));

        // Spread the decoded body fields into the row; fields that cannot be
        // converted to JSON are skipped.
        let fields = hit.body.as_deref().and_then(|raw| {
            zerompk::from_msgpack::<HashMap<String, nodedb_types::Value>>(raw).ok()
        });
        if let Some(fields) = fields {
            for (name, value) in fields {
                if let Ok(json) = serde_json::to_value(&value) {
                    row.insert(name, json);
                }
            }
        }

        // Only synthesize `id` when the body did not already provide one.
        row.entry(String::from("id"))
            .or_insert_with(|| match hit.doc_id.as_ref() {
                Some(doc) => serde_json::json!(doc),
                None => serde_json::json!(hit.id),
            });

        // Inserted last so it always reflects the numeric hit id.
        row.insert(String::from("_surrogate"), serde_json::json!(hit.id));
        rows.push(row);
    }

    match sonic_rs::to_string(&rows) {
        Ok(json) => json.into_bytes(),
        // JSON failed: fall back to MessagePack, then to the raw payload.
        Err(_) => zerompk::to_msgpack_vec(&hits).unwrap_or_else(|_| payload.to_vec()),
    }
}
/// Runs `translate_vector_search_payload` on `payload` when `plan` is a
/// vector search; otherwise returns a copy of `payload` unchanged.
///
/// Handled plans are `VectorOp::Search`, `VectorOp::MultiSearch` and
/// `VectorOp::MultiVectorScoreSearch`. The collection name and `top_k` are
/// taken from the plan; RLS filters are taken from the plan for the first two
/// variants, while `MultiVectorScoreSearch` is translated with an empty
/// filter set.
pub fn translate_if_vector(payload: &[u8], plan: &PhysicalPlan, state: &SharedState) -> Vec<u8> {
    let PhysicalPlan::Vector(op) = plan else {
        return payload.to_vec();
    };

    let (collection, rls_filters, top_k): (&str, &[u8], usize) = match op {
        VectorOp::Search {
            collection,
            rls_filters,
            top_k,
            ..
        } => (collection.as_str(), rls_filters.as_slice(), *top_k),
        VectorOp::MultiSearch {
            collection,
            rls_filters,
            top_k,
            ..
        } => (collection.as_str(), rls_filters.as_slice(), *top_k),
        // NOTE(review): no RLS filters are forwarded for this variant —
        // confirm that is intentional.
        VectorOp::MultiVectorScoreSearch {
            collection, top_k, ..
        } => (collection.as_str(), &[][..], *top_k),
        _ => return payload.to_vec(),
    };

    translate_vector_search_payload(payload, state, collection, rls_filters, top_k)
}