use crate::bridge::envelope::Response;
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
use crate::engine::kv::KvScanParams;
use crate::engine::kv::current_ms;
impl CoreLoop {
pub(in crate::data::executor) fn execute_kv_materialize_scan(
&self,
task: &ExecutionTask,
tid: u64,
collection: &str,
cursor: &[u8],
count: usize,
) -> Response {
let _scan_guard = match self.acquire_scan_guard(task, tid, collection) {
Ok(g) => g,
Err(resp) => return resp,
};
let now_ms = current_ms();
let (entries, next_cursor) = self.kv_engine.scan(KvScanParams {
tenant_id: tid,
collection,
cursor,
count,
now_ms,
match_pattern: None,
filter_field: None,
filter_value: None,
surrogate_ceiling: None,
});
let mut payload = Vec::with_capacity(
entries
.iter()
.map(|(k, v)| k.len() + v.len() + 6)
.sum::<usize>()
+ next_cursor.len()
+ 16,
);
nodedb_query::msgpack_scan::write_array_header(&mut payload, 2);
write_bin(&mut payload, &next_cursor);
nodedb_query::msgpack_scan::write_array_header(&mut payload, entries.len());
for (k, v) in &entries {
nodedb_query::msgpack_scan::write_array_header(&mut payload, 2);
write_bin(&mut payload, k);
write_bin(&mut payload, v);
}
if let Some(ref m) = self.metrics {
m.record_kv_scan();
}
self.response_with_payload(task, payload)
}
}
/// Appends `bytes` to `out` as a MessagePack `bin` value.
///
/// The shortest length prefix that fits is chosen:
/// - up to 255 bytes: marker `0xc4` followed by a 1-byte length,
/// - up to 65 535 bytes: marker `0xc5` followed by a 2-byte big-endian length,
/// - anything longer: marker `0xc6` followed by a 4-byte big-endian length.
///
/// The raw bytes follow the header unchanged. Lengths above `u32::MAX` are
/// narrowed with `as u32`, so the written length prefix wraps for such inputs.
fn write_bin(out: &mut Vec<u8>, bytes: &[u8]) {
    const BIN8: u8 = 0xc4;
    const BIN16: u8 = 0xc5;
    const BIN32: u8 = 0xc6;

    let size = bytes.len();
    match size {
        0..=0xff => out.extend_from_slice(&[BIN8, size as u8]),
        0x100..=0xffff => {
            out.push(BIN16);
            out.extend_from_slice(&(size as u16).to_be_bytes());
        }
        _ => {
            out.push(BIN32);
            out.extend_from_slice(&(size as u32).to_be_bytes());
        }
    }
    out.extend_from_slice(bytes);
}