use nodedb_types::TenantId;
use crate::bridge::physical_plan::{DocumentOp, PhysicalPlan};
use crate::control::server::dispatch_utils::dispatch_to_data_plane;
use crate::control::state::SharedState;
use crate::types::{TraceId, VShardId};
pub async fn run_preexec_scan(
shared: &SharedState,
tenant_id: TenantId,
database_id: crate::types::DatabaseId,
collection: &str,
filter_bytes: Vec<u8>,
) -> crate::Result<Vec<u32>> {
let vshard_id = VShardId::from_collection_in_database(database_id, collection);
let scan_plan = PhysicalPlan::Document(DocumentOp::Scan {
collection: collection.to_owned(),
filters: filter_bytes,
limit: usize::MAX,
offset: 0,
sort_keys: vec![],
distinct: false,
projection: vec![],
computed_columns: vec![],
window_functions: vec![],
system_as_of_ms: None,
valid_at_ms: None,
prefilter: None,
});
let response =
dispatch_to_data_plane(shared, tenant_id, vshard_id, scan_plan, TraceId::ZERO).await?;
if response.status != crate::bridge::envelope::Status::Ok {
return Err(crate::Error::Storage {
engine: "preexec-scan".into(),
detail: format!("pre-execution scan failed: {:?}", response.error_code),
});
}
let surrogates = decode_scan_surrogates(&response.payload);
Ok(surrogates)
}
/// Extracts row surrogates from a pre-execution scan response payload.
///
/// The payload is first converted with `nodedb_types::msgpack_to_json_string`.
/// If that fails, the raw bytes are treated as (lossily decoded) UTF-8 text.
/// The resulting string is parsed as JSON. When the top-level value is an
/// array, each element's `"id"` field is inspected. Only string ids of exactly
/// eight ASCII hexadecimal digits are decoded into a `u32` surrogate; every
/// other row is skipped.
///
/// Decoding is best-effort: an empty payload, unparseable JSON, or a
/// non-array top-level value all yield an empty vector. The returned
/// surrogates are sorted ascending; duplicates are not removed.
fn decode_scan_surrogates(payload: &[u8]) -> Vec<u32> {
    use sonic_rs::{JsonContainerTrait, JsonValueTrait};

    if payload.is_empty() {
        return vec![];
    }
    let json_str = nodedb_types::msgpack_to_json_string(payload)
        .unwrap_or_else(|_| String::from_utf8_lossy(payload).into_owned());
    let Ok(rows) = sonic_rs::from_str::<sonic_rs::Value>(&json_str) else {
        return vec![];
    };

    // `as_array()` is `None` for non-array values, so this yields no rows then.
    let mut surrogates: Vec<u32> = rows
        .as_array()
        .into_iter()
        .flatten()
        .filter_map(|row| row.get("id")?.as_str().and_then(parse_surrogate_hex))
        .collect();
    surrogates.sort_unstable();
    surrogates
}

/// Parses an id that must be exactly eight ASCII hexadecimal digits into a
/// `u32` surrogate, returning `None` for anything else.
///
/// `u32::from_str_radix` on its own also accepts a leading `+` sign, so
/// `"+0000001"` would otherwise be mis-decoded as surrogate 1. Each byte is
/// therefore checked to be a hex digit before parsing.
fn parse_surrogate_hex(id: &str) -> Option<u32> {
    if id.len() == 8 && id.bytes().all(|b| b.is_ascii_hexdigit()) {
        u32::from_str_radix(id, 16).ok()
    } else {
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty payload short-circuits to an empty surrogate list.
    #[test]
    fn decode_empty_payload_returns_empty() {
        assert_eq!(decode_scan_surrogates(&[]), Vec::<u32>::new());
    }
}