use std::collections::HashMap;
use std::path::PathBuf;
use nodedb_types::value::Value;
use super::materialize_scan::{build_response, encode_cursor};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
use crate::engine::timeseries::columnar_memtable::{ColumnData, ColumnType};
use crate::engine::timeseries::columnar_segment::ColumnarSegmentReader;
impl CoreLoop {
    /// Produces one page of a timeseries collection scan for materialization.
    ///
    /// Rows are visited in a fixed order: first the in-memory columnar
    /// memtable (segment `0`), then every on-disk partition listed in the
    /// collection's registry. Partitions are numbered from `1` by their
    /// position in the registry's iteration order.
    ///
    /// # Parameters
    /// - `task`: supplies the tenant id (`task.request.tenant_id`) and is
    ///   forwarded to `build_response`.
    /// - `collection`: collection name; also used as the directory name under
    ///   `<data_dir>/ts/`.
    /// - `cursor`: resume position as decoded by `parse_cursor_ts`; a cursor
    ///   shorter than 8 bytes starts the scan from the beginning.
    /// - `count`: page size. The limit is checked *after* each row is pushed,
    ///   so `count == 0` still yields one row when any row is available.
    /// - `system_as_of_ms`: when set and the schema reports a system-time
    ///   column (`ts_system_idx`), rows whose system-time value is greater
    ///   than this cutoff are skipped.
    ///
    /// # Returns
    /// The response from `build_response`, carrying `(surrogate, msgpack row)`
    /// pairs and the next cursor. The cursor is empty when the page was not
    /// filled (scan exhausted); otherwise it encodes the segment and the row
    /// index following the last emitted row.
    ///
    /// Partitions whose directory is missing, whose schema cannot be read, or
    /// whose timestamp column cannot be read are skipped. Rows that fail to
    /// encode are skipped as well.
    ///
    /// NOTE(review): the cursor stores a positional partition id, so resuming
    /// presumably relies on the registry iteration order being stable between
    /// pages — confirm against the registry implementation.
    pub(in crate::data::executor) fn execute_ts_materialize_scan(
        &self,
        task: &ExecutionTask,
        collection: &str,
        cursor: &[u8],
        count: usize,
        system_as_of_ms: Option<i64>,
    ) -> crate::bridge::envelope::Response {
        let tid = task.request.tenant_id;
        let engine_key = (tid, collection.to_string());
        let (start_segment, start_row) = parse_cursor_ts(cursor);

        let mut entries: Vec<(u32, Vec<u8>)> = Vec::with_capacity(count.min(256));
        let mut last_segment: u32 = start_segment;
        let mut last_row: u32 = start_row;

        // Phase 1: the memtable is segment 0 and is only visited while the
        // cursor still points into it.
        if start_segment == 0
            && let Some(mt) = self.columnar_memtables.get(&engine_key)
            && !mt.is_empty()
        {
            let schema = mt.schema();
            let ts_system_idx = schema.ts_system_idx();
            let col_count = schema.columns.len();
            let row_count = mt.row_count() as usize;

            for row_idx in (start_row as usize)..row_count {
                // Bitemporal filter: hide rows recorded after the cutoff.
                if let (Some(sys_idx), Some(cutoff)) = (ts_system_idx, system_as_of_ms)
                    && ts_memtable_value(mt.column(sys_idx), row_idx) > cutoff
                {
                    continue;
                }
                let Some(value_bytes) =
                    encode_ts_memtable_row(mt, col_count, row_idx, collection)
                else {
                    continue;
                };
                // Memtable surrogates carry the high bit plus the low 31 bits
                // of the row index.
                let surrogate: u32 = 0x8000_0000 | (row_idx as u32 & 0x7FFF_FFFF);
                entries.push((surrogate, value_bytes));
                last_segment = 0;
                last_row = (row_idx + 1) as u32;
                if entries.len() >= count {
                    break;
                }
            }
        }

        // Phase 2: on-disk partitions. A page that is already full never
        // reaches this point, which covers both "resumed inside a partition"
        // and "memtable ran out before the page filled".
        if entries.len() < count
            && let Some(registry) = self.ts_registries.get(&engine_key)
        {
            // Partition ids are 1-based; a cursor in segment 0 starts at the
            // first partition.
            let first_part_id = (start_segment as usize).max(1);
            let collection_dir = self.data_dir.join("ts").join(collection);

            'part_loop: for (i, (_start_ts, entry)) in
                registry.iter().enumerate().skip(first_part_id - 1)
            {
                let part_id = i + 1;
                let part_dir: PathBuf = collection_dir.join(&entry.dir_name);
                if !part_dir.exists() {
                    continue;
                }

                let schema = match ColumnarSegmentReader::read_schema(&part_dir, None) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::warn!(
                            collection,
                            part_id,
                            error = %e,
                            "ts_materialize_scan: failed to read schema; skipping partition"
                        );
                        continue;
                    }
                };
                let ts_system_idx = schema.ts_system_idx();

                // Columns are loaded best-effort: an unreadable column is
                // left out of every row of this partition, but the failure is
                // reported instead of being dropped silently.
                let col_data: Vec<Option<ColumnData>> = schema
                    .columns
                    .iter()
                    .map(|(name, ty)| {
                        match ColumnarSegmentReader::read_column(&part_dir, name, *ty, None) {
                            Ok(col) => Some(col),
                            Err(e) => {
                                tracing::warn!(
                                    collection,
                                    part_id,
                                    column = %name,
                                    error = %e,
                                    "ts_materialize_scan: failed to read column; omitting it from rows"
                                );
                                None
                            }
                        }
                    })
                    .collect();

                // Symbol dictionaries keyed by column position; a dictionary
                // that cannot be read leaves that column's symbols as Null.
                let sym_dicts: HashMap<usize, nodedb_types::timeseries::SymbolDictionary> = schema
                    .columns
                    .iter()
                    .enumerate()
                    .filter(|(_, (_, ty))| *ty == ColumnType::Symbol)
                    .filter_map(|(i, (name, _))| {
                        ColumnarSegmentReader::read_symbol_dict(&part_dir, name, None)
                            .ok()
                            .map(|dict| (i, dict))
                    })
                    .collect();

                // The timestamp column defines how many rows the partition
                // has; without it the partition cannot be scanned.
                let ts_col = col_data.get(schema.timestamp_idx).and_then(|d| d.as_ref());
                let row_count = match ts_col {
                    Some(col) => col.len(),
                    None => continue,
                };

                // Only the partition named by the cursor resumes mid-way.
                let first_row_in_part = if part_id == start_segment as usize {
                    start_row as usize
                } else {
                    0
                };

                for row_idx in first_row_in_part..row_count {
                    if let (Some(sys_idx), Some(cutoff)) = (ts_system_idx, system_as_of_ms)
                        && let Some(sys_col) = &col_data[sys_idx]
                        && ts_partition_value(sys_col, row_idx) > cutoff
                    {
                        continue;
                    }
                    let Some(value_bytes) = encode_ts_partition_row(
                        &schema.columns,
                        &col_data,
                        &sym_dicts,
                        row_idx,
                        collection,
                    ) else {
                        continue;
                    };
                    let surrogate: u32 =
                        encode_ts_part_surrogate(part_id as u32, row_idx as u32);
                    entries.push((surrogate, value_bytes));
                    last_segment = part_id as u32;
                    last_row = (row_idx + 1) as u32;
                    if entries.len() >= count {
                        break 'part_loop;
                    }
                }
            }
        }

        // A short page means everything was visited; an empty cursor signals
        // the end of the scan.
        let next_cursor = if entries.len() < count {
            Vec::new()
        } else {
            encode_cursor(last_segment, last_row)
        };
        build_response(self, task, entries, next_cursor)
    }
}
/// Decodes a scan cursor into `(segment, row)`.
///
/// The first four bytes are the big-endian segment id and the next four the
/// big-endian row index; any bytes after the eighth are ignored. A cursor
/// shorter than eight bytes decodes to `(0, 0)`.
fn parse_cursor_ts(cursor: &[u8]) -> (u32, u32) {
    match cursor {
        &[s0, s1, s2, s3, r0, r1, r2, r3, ..] => (
            u32::from_be_bytes([s0, s1, s2, s3]),
            u32::from_be_bytes([r0, r1, r2, r3]),
        ),
        // Too short to hold both fields: start from the beginning.
        _ => (0, 0),
    }
}
/// Packs a partition id and a row index into one 32-bit surrogate.
///
/// The low 16 bits of `part_id_1based` become the upper half of the result
/// and the low 16 bits of `row_idx` the lower half. Higher bits of either
/// input are discarded, so inputs that differ only above bit 15 produce the
/// same surrogate.
pub(super) fn encode_ts_part_surrogate(part_id_1based: u32, row_idx: u32) -> u32 {
    const LOW_16: u32 = 0xFFFF;
    let upper_half = (part_id_1based & LOW_16) << 16;
    let lower_half = row_idx & LOW_16;
    upper_half | lower_half
}
/// Serializes one memtable row as a MessagePack-encoded object.
///
/// Every column of the memtable schema becomes a field keyed by the column
/// name, with the cell converted by `memtable_col_to_value`. `col_count` is
/// used only to pre-size the field map.
///
/// Returns `None`, after logging a warning that names `collection` and
/// `row_idx`, when MessagePack encoding fails.
fn encode_ts_memtable_row(
    mt: &crate::engine::timeseries::columnar_memtable::ColumnarMemtable,
    col_count: usize,
    row_idx: usize,
    collection: &str,
) -> Option<Vec<u8>> {
    let mut fields: HashMap<String, Value> = HashMap::with_capacity(col_count);
    fields.extend(
        mt.schema()
            .columns
            .iter()
            .enumerate()
            .map(|(idx, (name, ty))| {
                let cell = memtable_col_to_value(mt.column(idx), ty, idx, mt, row_idx);
                (name.clone(), cell)
            }),
    );

    nodedb_types::value_to_msgpack(&Value::Object(fields))
        .inspect_err(|e| {
            tracing::warn!(
                collection,
                row_idx,
                error = %e,
                "ts_materialize_scan: memtable row encode failed; skipping"
            );
        })
        .ok()
}
/// Serializes one on-disk partition row as a MessagePack-encoded object.
///
/// `schema_columns` and `col_data` are parallel: entry `i` of `col_data`
/// holds the loaded data for column `i`, or `None` when that column is
/// unavailable, in which case the field is left out of the row. Cells are
/// converted by `partition_col_to_value`, which resolves symbol ids through
/// `sym_dicts` (keyed by column position).
///
/// Returns `None`, after logging a warning that names `collection` and
/// `row_idx`, when MessagePack encoding fails.
fn encode_ts_partition_row(
    schema_columns: &[(String, ColumnType)],
    col_data: &[Option<ColumnData>],
    sym_dicts: &HashMap<usize, nodedb_types::timeseries::SymbolDictionary>,
    row_idx: usize,
    collection: &str,
) -> Option<Vec<u8>> {
    let mut fields: HashMap<String, Value> = HashMap::with_capacity(schema_columns.len());
    fields.extend(
        schema_columns
            .iter()
            .enumerate()
            .filter_map(|(idx, (name, ty))| {
                // Columns that were not loaded contribute no field at all.
                let loaded = col_data[idx].as_ref()?;
                let cell = partition_col_to_value(loaded, ty, idx, sym_dicts, row_idx);
                Some((name.clone(), cell))
            }),
    );

    let encoded = nodedb_types::value_to_msgpack(&Value::Object(fields));
    if let Err(e) = &encoded {
        tracing::warn!(
            collection,
            row_idx,
            error = %e,
            "ts_materialize_scan: partition row encode failed; skipping"
        );
    }
    encoded.ok()
}
/// Converts the memtable cell at `row_idx` of one column into a `Value`.
///
/// - `Timestamp` and `Int64` cells become `Value::Integer`.
/// - `Float64` cells become `Value::Float`, except NaN which becomes
///   `Value::Null`.
/// - `Symbol` cells are resolved through the memtable's dictionary for
///   `col_idx`; a missing dictionary or unknown id becomes `Value::Null`.
///
/// Cells are read by direct indexing, so `row_idx` must be within the column.
fn memtable_col_to_value(
    col_data: &ColumnData,
    col_type: &ColumnType,
    col_idx: usize,
    mt: &crate::engine::timeseries::columnar_memtable::ColumnarMemtable,
    row_idx: usize,
) -> Value {
    match col_type {
        ColumnType::Timestamp => Value::Integer(col_data.as_timestamps()[row_idx]),
        ColumnType::Int64 => Value::Integer(col_data.as_i64()[row_idx]),
        ColumnType::Float64 => match col_data.as_f64()[row_idx] {
            // NaN is surfaced as Null rather than as a float.
            sample if sample.is_nan() => Value::Null,
            sample => Value::Float(sample),
        },
        ColumnType::Symbol => {
            let symbol_id = col_data.as_symbols()[row_idx];
            let resolved = mt
                .symbol_dict(col_idx)
                .and_then(|dict| dict.get(symbol_id));
            match resolved {
                Some(text) => Value::String(text.to_string()),
                None => Value::Null,
            }
        }
    }
}
/// Converts the cell at `row_idx` of one on-disk partition column into a
/// `Value`.
///
/// - `Timestamp` and `Int64` cells become `Value::Integer`.
/// - `Float64` cells become `Value::Float`, except NaN which becomes
///   `Value::Null`.
/// - `Symbol` cells are resolved through `sym_dicts[col_i]`; a missing
///   dictionary or unknown id becomes `Value::Null`.
///
/// Partition columns are read from disk independently of each other, and the
/// caller derives the row range from the timestamp column alone. A column
/// that turns out to be shorter than `row_idx` therefore yields
/// `Value::Null` instead of panicking. For `Int64` and `Symbol`, a `data`
/// variant that does not match `col_type` also yields `Value::Null`.
///
/// NOTE(review): how `as_timestamps` / `as_f64` behave on a variant mismatch
/// is not visible here — confirm they cannot panic for on-disk data.
fn partition_col_to_value(
    data: &ColumnData,
    col_type: &ColumnType,
    col_i: usize,
    sym_dicts: &HashMap<usize, nodedb_types::timeseries::SymbolDictionary>,
    row_idx: usize,
) -> Value {
    match col_type {
        ColumnType::Timestamp => data
            .as_timestamps()
            .get(row_idx)
            .map_or(Value::Null, |&ts| Value::Integer(ts)),
        ColumnType::Float64 => match data.as_f64().get(row_idx) {
            Some(&v) if !v.is_nan() => Value::Float(v),
            // NaN and out-of-range rows are both surfaced as Null.
            _ => Value::Null,
        },
        ColumnType::Int64 => match data {
            ColumnData::Int64(vals) => vals
                .get(row_idx)
                .map_or(Value::Null, |&v| Value::Integer(v)),
            _ => Value::Null,
        },
        ColumnType::Symbol => match data {
            ColumnData::Symbol(ids) => ids
                .get(row_idx)
                .and_then(|&id| sym_dicts.get(&col_i).and_then(|dict| dict.get(id)))
                .map_or(Value::Null, |s| Value::String(s.to_string())),
            _ => Value::Null,
        },
    }
}
/// Reads the integer at `row_idx` from a memtable column.
///
/// Works for `Timestamp` and `Int64` columns. Any other column variant, or a
/// `row_idx` past the end of the column, yields `0`.
fn ts_memtable_value(col_data: &ColumnData, row_idx: usize) -> i64 {
    let (ColumnData::Timestamp(values) | ColumnData::Int64(values)) = col_data else {
        return 0;
    };
    values.get(row_idx).copied().unwrap_or(0)
}
/// Reads the integer at `row_idx` from an on-disk partition column.
///
/// Works for `Timestamp` and `Int64` columns. Any other column variant, or a
/// `row_idx` past the end of the column, yields `0`.
fn ts_partition_value(col_data: &ColumnData, row_idx: usize) -> i64 {
    if let ColumnData::Timestamp(cells) | ColumnData::Int64(cells) = col_data {
        cells.get(row_idx).copied().unwrap_or_default()
    } else {
        0
    }
}