use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::scan_filter::{FilterOp, ScanFilter};
use crate::data::executor::handlers::join::{binary_row_project, merge_join_docs_binary};
use nodedb_physical::physical_plan::{DocumentOp, JoinProjection};
use nodedb_query::msgpack_scan;
/// Row-count ceiling (100 000) exposed to the parent module.
///
/// Nothing in this file reads the constant; it is declared here for use by
/// sibling code. NOTE(review): presumably the cap on rows a join handler may
/// accumulate — confirm against the call sites.
pub(super) const MAX_RESULT_ROWS: usize = 100_000;
pub(super) fn extract_outer_field(outer_bytes: &[u8], field: &str) -> Option<nodedb_types::Value> {
if let Some((s, e)) = msgpack_scan::extract_field(outer_bytes, 0, "data") {
let data_bytes = &outer_bytes[s..e];
if let Some((start, end)) = msgpack_scan::extract_field(data_bytes, 0, field)
&& let Ok(v) = nodedb_types::value_from_msgpack(&data_bytes[start..end])
{
return Some(v);
}
}
if let Some((start, end)) = msgpack_scan::extract_field(outer_bytes, 0, field)
&& let Ok(v) = nodedb_types::value_from_msgpack(&outer_bytes[start..end])
{
return Some(v);
}
None
}
/// Applies [`bind_filter_outer`] to every filter in `filters`, preserving order.
///
/// `outer_bytes` and `outer_alias` are forwarded unchanged to each call.
pub(super) fn bind_outer_values(
    filters: Vec<ScanFilter>,
    outer_bytes: &[u8],
    outer_alias: &str,
) -> Vec<ScanFilter> {
    let mut bound = Vec::with_capacity(filters.len());
    for filter in filters {
        bound.push(bind_filter_outer(filter, outer_bytes, outer_alias));
    }
    bound
}
/// Removes the qualifier from each filter's `field`.
///
/// The qualifier is everything up to and including the first `.` in the field
/// name (so `"a.b.c"` becomes `"b.c"`). Filters whose field contains no `.`
/// are passed through untouched. Only the top-level `field` of each filter is
/// rewritten; all other members are carried over as-is.
pub(super) fn strip_filter_qualifiers(filters: Vec<ScanFilter>) -> Vec<ScanFilter> {
    filters
        .into_iter()
        .map(|f| match f.field.split_once('.') {
            Some((_, rest)) => {
                // Allocate only the suffix we keep; the old field is dropped with `f`.
                let field = rest.to_string();
                ScanFilter { field, ..f }
            }
            // Nothing to strip: reuse the filter without cloning its field.
            None => f,
        })
        .collect()
}
/// Rewrites one filter so it can be evaluated without access to the outer row.
///
/// * **Column-comparison operators** (`EqColumn`, `GtColumn`, `GteColumn`,
///   `LtColumn`, `LteColumn`, `NeColumn`): the filter's `value` is expected to
///   be a string naming a column. A leading `"<outer_alias>."` is removed from
///   that name, and the remainder is looked up in `outer_bytes` through
///   [`extract_outer_field`]. On a hit the result is a literal comparison
///   (`Eq`, `Gt`, …) against the resolved value, with an unqualified `field`,
///   no clauses and no expression. If the value is not a string, or the lookup
///   misses, the filter is returned exactly as received (its `field` keeps any
///   qualifier).
/// * **Every other operator**: the `field` loses its qualifier (text up to and
///   including the first `.`) and every filter nested in `clauses` is bound
///   recursively.
pub(super) fn bind_filter_outer(
    f: ScanFilter,
    outer_bytes: &[u8],
    outer_alias: &str,
) -> ScanFilter {
    /// Returns `name` without its qualifier: the text after the first `.`,
    /// or all of `name` when it contains no `.`.
    fn unqualified(name: &str) -> String {
        name.split_once('.')
            .map_or(name, |(_, rest)| rest)
            .to_string()
    }

    let lit_op = match f.op {
        FilterOp::EqColumn => FilterOp::Eq,
        FilterOp::GtColumn => FilterOp::Gt,
        FilterOp::GteColumn => FilterOp::Gte,
        FilterOp::LtColumn => FilterOp::Lt,
        FilterOp::LteColumn => FilterOp::Lte,
        FilterOp::NeColumn => FilterOp::Ne,
        _ => {
            // Not a column comparison: only strip the qualifier and recurse
            // into nested clauses.
            let field = unqualified(&f.field);
            if f.clauses.is_empty() {
                return ScanFilter { field, ..f };
            }
            let clauses = f
                .clauses
                .into_iter()
                .map(|clause| {
                    clause
                        .into_iter()
                        .map(|sf| bind_filter_outer(sf, outer_bytes, outer_alias))
                        .collect()
                })
                .collect();
            return ScanFilter { field, clauses, ..f };
        }
    };

    // The right-hand side of a column comparison must be a column name.
    let column = match &f.value {
        nodedb_types::Value::String(s) => s.as_str(),
        _ => return f,
    };

    // Accept both "<outer_alias>.<col>" and an already-bare "<col>".
    let lookup_key = column
        .strip_prefix(outer_alias)
        .and_then(|rest| rest.strip_prefix('.'))
        .unwrap_or(column);

    match extract_outer_field(outer_bytes, lookup_key) {
        Some(value) => ScanFilter {
            field: unqualified(&f.field),
            op: lit_op,
            value,
            clauses: Vec::new(),
            expr: None,
        },
        // Column not present in the outer row: leave the filter untouched.
        None => f,
    }
}
/// Builds a `DocumentOp::Scan` physical plan over `collection`.
///
/// * `filter_bytes` is passed through as the scan's `filters`.
/// * `order_by` is copied into `sort_keys`.
/// * `limit` is clamped to at most 100 000.
///
/// All remaining scan options are set to their "off" values: offset 0, no
/// distinct, empty projection / computed columns / window functions, and no
/// temporal bounds or prefilter.
pub(super) fn build_scan_plan(
    collection: &str,
    filter_bytes: Vec<u8>,
    order_by: &[(String, bool)],
    limit: usize,
) -> PhysicalPlan {
    // NOTE(review): this literal has the same value as `MAX_RESULT_ROWS`;
    // confirm whether the two limits are meant to stay in lockstep.
    let capped_limit = std::cmp::min(limit, 100_000);

    let scan = DocumentOp::Scan {
        collection: collection.to_owned(),
        limit: capped_limit,
        offset: 0,
        sort_keys: order_by.to_vec(),
        filters: filter_bytes,
        distinct: false,
        projection: Vec::new(),
        computed_columns: Vec::new(),
        window_functions: Vec::new(),
        system_as_of_ms: None,
        valid_at_ms: None,
        prefilter: None,
    };
    PhysicalPlan::Document(scan)
}
/// Produces one output row from an outer row and an optional inner row.
///
/// The two inputs are first combined with `merge_join_docs_binary` using the
/// given aliases. With an empty `projection` the merged bytes are returned
/// directly; otherwise they are passed through `binary_row_project`.
pub(super) fn build_row(
    outer: &[u8],
    inner: Option<&[u8]>,
    outer_alias: &str,
    lateral_alias: &str,
    projection: &[JoinProjection],
) -> Vec<u8> {
    let merged = merge_join_docs_binary(outer, inner, outer_alias, lateral_alias);
    match projection {
        [] => merged,
        columns => binary_row_project(&merged, columns),
    }
}
/// Returns the bytes stored under the top-level `"data"` key of `bytes`,
/// or `bytes` itself when no such key is found.
///
/// The result borrows from `bytes`; nothing is copied.
pub(super) fn unwrap_data_field(bytes: &[u8]) -> &[u8] {
    match msgpack_scan::extract_field(bytes, 0, "data") {
        Some((start, end)) => &bytes[start..end],
        None => bytes,
    }
}

/// Owned counterpart of [`unwrap_data_field`]: copies the `"data"` payload
/// (or the whole input when there is no `"data"` key) into a new `Vec<u8>`.
pub(super) fn flatten_outer_row(bytes: &[u8]) -> Vec<u8> {
    // Delegate so the unwrap rule lives in exactly one place.
    unwrap_data_field(bytes).to_vec()
}