use nodedb_types::value::Value;
use crate::bridge::envelope::Response;
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::scan_normalize::decoded_col_to_value;
use crate::data::executor::task::ExecutionTask;
impl CoreLoop {
    /// Materializes one page of rows from a columnar collection.
    ///
    /// Each returned entry is `(surrogate, msgpack-encoded row object)`, where the
    /// row object maps schema column names to their values. Rows are visited in
    /// a fixed order: flushed segments first (segment ids `1..=flushed.len()`, in
    /// storage order), then the memtable, which the cursor addresses as segment `0`.
    ///
    /// Rows are skipped when they are marked in their segment's delete bitmap, or,
    /// if `system_as_of_ms` is `Some(cutoff)`, when their `_ts_system` column is an
    /// integer greater than `cutoff`. Segments that fail to open or decode, and rows
    /// that fail to encode, are logged and skipped.
    ///
    /// * `cursor` — resume token from a previous page (big-endian segment id and
    ///   next row index, see `parse_cursor`). Anything shorter than 8 bytes starts
    ///   at the first flushed segment.
    /// * `count` — page size. `next_cursor` is non-empty only when the page was
    ///   filled; an empty `next_cursor` means the scan is complete.
    ///
    /// When no columnar engine is registered for the collection, the scan is
    /// delegated to `execute_ts_materialize_scan` if a non-empty timeseries
    /// memtable or a timeseries partition registry exists; otherwise an empty page
    /// is returned.
    pub(in crate::data::executor) fn execute_columnar_materialize_scan(
        &mut self,
        task: &ExecutionTask,
        collection: &str,
        cursor: &[u8],
        count: usize,
        system_as_of_ms: Option<i64>,
    ) -> Response {
        // Kept alive (otherwise unused) for the duration of the scan.
        let _scan_guard =
            match self.acquire_scan_guard(task, task.request.tenant_id.as_u64(), collection) {
                Ok(g) => g,
                Err(resp) => return resp,
            };
        let tid = task.request.tenant_id;
        let engine_key = (tid, collection.to_string());
        let Some(engine) = self.columnar_engines.get(&engine_key) else {
            // No columnar engine: fall back to the timeseries path if it holds
            // anything for this collection.
            let has_ts_memtable = self
                .columnar_memtables
                .get(&engine_key)
                .is_some_and(|mt| !mt.is_empty());
            let has_ts_partitions = self.ts_registries.contains_key(&engine_key);
            if has_ts_memtable || has_ts_partitions {
                return self.execute_ts_materialize_scan(
                    task,
                    collection,
                    cursor,
                    count,
                    system_as_of_ms,
                );
            }
            return build_response(self, task, Vec::new(), Vec::new());
        };
        let schema = engine.schema().clone();
        let ts_system_idx = schema.columns.iter().position(|c| c.name == "_ts_system");
        let (start_segment, start_row) = parse_cursor(cursor);
        let mut entries: Vec<(u32, Vec<u8>)> = Vec::with_capacity(count.min(256));
        // Position just past the last emitted row; becomes `next_cursor` when the
        // page fills up.
        let mut last_segment: u32 = start_segment;
        let mut last_row: u32 = start_row;

        // Borrow the flushed segments instead of cloning every segment's bytes.
        let flushed: &[Vec<u8>] = self
            .columnar_flushed_segments
            .get(&engine_key)
            .map(Vec::as_slice)
            .unwrap_or(&[]);

        // Segment id `n` lives at index `n - 1`. A cursor in segment 0 resumes
        // inside the memtable, which is scanned after every flushed segment, so in
        // that case all flushed segments have already been returned.
        let first_seg_idx = match start_segment {
            0 => flushed.len(),
            seg => (seg - 1) as usize,
        };

        'seg_loop: for (seg_idx, seg_bytes) in flushed.iter().enumerate().skip(first_seg_idx) {
            let seg_id = (seg_idx as u32) + 1;
            let reader = match nodedb_columnar::SegmentReader::open(seg_bytes) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(
                        collection,
                        seg_id,
                        error = %e,
                        "materialize_scan: failed to open flushed segment; skipping"
                    );
                    continue;
                }
            };
            let row_count = reader.row_count() as usize;
            let first_row_in_seg = if seg_id == start_segment {
                start_row as usize
            } else {
                0
            };
            // Nothing left to emit from this segment (e.g. the previous page ended
            // on its last row), so don't bother decoding its columns.
            if first_row_in_seg >= row_count {
                continue;
            }
            // Decode every schema column up front; the first failure skips the
            // whole segment.
            let decoded_cols = match (0..schema.columns.len())
                .map(|col_idx| reader.read_column(col_idx).map_err(|e| (col_idx, e)))
                .collect::<Result<Vec<_>, _>>()
            {
                Ok(cols) => cols,
                Err((col_idx, e)) => {
                    tracing::warn!(
                        collection,
                        seg_id,
                        col_idx,
                        error = %e,
                        "materialize_scan: column decode failed; skipping segment"
                    );
                    continue;
                }
            };
            let delete_bm = engine.delete_bitmap(seg_id as u64);
            for row_idx in first_row_in_seg..row_count {
                if delete_bm.is_some_and(|bm| bm.is_deleted(row_idx as u32)) {
                    continue;
                }
                // Time-travel filter: hide rows written after the requested cutoff.
                if let (Some(ts_idx), Some(cutoff)) = (ts_system_idx, system_as_of_ms)
                    && let Value::Integer(ts) =
                        decoded_col_to_value(&decoded_cols[ts_idx], row_idx)
                    && ts > cutoff
                {
                    continue;
                }
                let row_obj = Value::Object(
                    schema
                        .columns
                        .iter()
                        .zip(&decoded_cols)
                        .map(|(col_def, col)| {
                            (col_def.name.clone(), decoded_col_to_value(col, row_idx))
                        })
                        .collect(),
                );
                let value_bytes = match nodedb_types::value_to_msgpack(&row_obj) {
                    Ok(b) => b,
                    Err(e) => {
                        tracing::warn!(
                            collection,
                            seg_id,
                            row_idx,
                            error = %e,
                            "materialize_scan: row msgpack encode failed; skipping"
                        );
                        continue;
                    }
                };
                entries.push((encode_seg_row_as_u32(seg_id, row_idx as u32), value_bytes));
                last_segment = seg_id;
                last_row = (row_idx + 1) as u32;
                if entries.len() >= count {
                    break 'seg_loop;
                }
            }
        }

        // A short page means the segment loop ran to completion (it only exits
        // early once the page is full), so every flushed segment is exhausted and
        // the scan continues into the memtable.
        if entries.len() < count {
            let memtable_start_row = if start_segment == 0 {
                start_row as usize
            } else {
                0
            };
            let memtable_rows = engine
                .scan_memtable_rows_with_surrogates()
                .skip(memtable_start_row)
                .enumerate();
            for (mt_idx, (row_surrogate, row)) in memtable_rows {
                if let (Some(ts_idx), Some(cutoff)) = (ts_system_idx, system_as_of_ms)
                    && let Some(Value::Integer(ts)) = row.get(ts_idx)
                    && *ts > cutoff
                {
                    continue;
                }
                // Schema columns beyond the row's length are omitted from the object.
                let row_obj = Value::Object(
                    schema
                        .columns
                        .iter()
                        .map(|col_def| col_def.name.clone())
                        .zip(row)
                        .collect(),
                );
                let value_bytes = match nodedb_types::value_to_msgpack(&row_obj) {
                    Ok(b) => b,
                    Err(e) => {
                        tracing::warn!(
                            collection,
                            mt_idx,
                            error = %e,
                            "materialize_scan: memtable row encode failed; skipping"
                        );
                        continue;
                    }
                };
                let abs_row = memtable_start_row + mt_idx;
                // Rows without an assigned surrogate get a synthetic id derived
                // from their memtable position with the high bit set.
                // NOTE(review): segment-encoded ids with seg_id >= 0x8000 also set
                // the high bit, so the two spaces can overlap.
                let surrogate = row_surrogate
                    .map_or_else(|| (abs_row as u32) | 0x8000_0000, |s| s.as_u32());
                entries.push((surrogate, value_bytes));
                last_segment = 0;
                last_row = (abs_row + 1) as u32;
                if entries.len() >= count {
                    break;
                }
            }
        }

        let next_cursor = if entries.len() < count {
            Vec::new()
        } else {
            encode_cursor(last_segment, last_row)
        };
        build_response(self, task, entries, next_cursor)
    }
}
/// Packs a flushed-segment id and a row index into a 32-bit synthetic surrogate.
///
/// The low 16 bits of `seg_id` form the upper half and the low 16 bits of
/// `row_idx` form the lower half. Higher bits of both inputs are discarded, so
/// rows past index 0xFFFF (or segment ids past 0xFFFF) map to colliding values.
pub(super) fn encode_seg_row_as_u32(seg_id: u32, row_idx: u32) -> u32 {
    const LOW_16: u32 = 0xFFFF;
    let upper_half = (seg_id & LOW_16) << 16;
    let lower_half = row_idx & LOW_16;
    upper_half | lower_half
}
/// Decodes a scan cursor into `(segment_id, row_index)`.
///
/// The first 8 bytes are read as two big-endian `u32`s; any trailing bytes are
/// ignored. A cursor shorter than 8 bytes (including an empty one) yields
/// `(1, 0)`: the first row of the first flushed segment.
pub(super) fn parse_cursor(cursor: &[u8]) -> (u32, u32) {
    match cursor {
        [s0, s1, s2, s3, r0, r1, r2, r3, ..] => (
            u32::from_be_bytes([*s0, *s1, *s2, *s3]),
            u32::from_be_bytes([*r0, *r1, *r2, *r3]),
        ),
        _ => (1, 0),
    }
}
/// Encodes a scan position as an 8-byte cursor: `segment_id` followed by
/// `row_index`, both big-endian. This is the inverse of `parse_cursor` for
/// 8-byte inputs.
pub(super) fn encode_cursor(segment_id: u32, row_index: u32) -> Vec<u8> {
    [segment_id.to_be_bytes(), row_index.to_be_bytes()].concat()
}
pub(super) fn build_response(
core: &CoreLoop,
task: &ExecutionTask,
entries: Vec<(u32, Vec<u8>)>,
next_cursor: Vec<u8>,
) -> Response {
let mut payload = Vec::with_capacity(
entries.iter().map(|(_, v)| v.len() + 8).sum::<usize>() + next_cursor.len() + 16,
);
nodedb_query::msgpack_scan::write_array_header(&mut payload, 2);
write_bin(&mut payload, &next_cursor);
nodedb_query::msgpack_scan::write_array_header(&mut payload, entries.len());
for (surrogate, value_bytes) in &entries {
nodedb_query::msgpack_scan::write_array_header(&mut payload, 2);
write_u32(&mut payload, *surrogate);
write_bin(&mut payload, value_bytes);
}
if entries.is_empty() && next_cursor.is_empty() {
return core.response_with_payload(task, payload);
}
if let Some(ref m) = core.metrics {
m.record_query();
}
core.response_with_payload(task, payload)
}
/// Appends `bytes` to `out` as a msgpack bin value, using the smallest header
/// (bin 8 / bin 16 / bin 32) that fits the length.
///
/// Lengths above `u32::MAX` are not representable in msgpack; the bin 32 length
/// field is then truncated by the `as u32` cast.
fn write_bin(out: &mut Vec<u8>, bytes: &[u8]) {
    let len = bytes.len();
    match len {
        0..=0xFF => out.extend_from_slice(&[0xc4, len as u8]),
        0x100..=0xFFFF => {
            out.push(0xc5);
            out.extend_from_slice(&(len as u16).to_be_bytes());
        }
        _ => {
            out.push(0xc6);
            out.extend_from_slice(&(len as u32).to_be_bytes());
        }
    }
    out.extend_from_slice(bytes);
}
/// Appends `v` to `out` as a msgpack uint 32 value (marker `0xce` followed by
/// four big-endian bytes), regardless of how small `v` is.
fn write_u32(out: &mut Vec<u8>, v: u32) {
    let [b0, b1, b2, b3] = v.to_be_bytes();
    out.extend_from_slice(&[0xce, b0, b1, b2, b3]);
}