use std::sync::Arc;
use postcard::from_bytes;
use reifydb_core::{
encoded::key::EncodedKey,
interface::resolved::ResolvedDictionary,
internal_error,
key::{EncodableKey, dictionary::DictionaryEntryIndexKey},
value::column::{Column, columns::Columns, data::ColumnData, headers::ColumnHeaders},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{
fragment::Fragment,
value::{Value, dictionary::DictionaryEntryId, r#type::Type},
};
use tracing::instrument;
use crate::{
Result,
vm::volcano::query::{QueryContext, QueryNode},
};
/// Query node that scans the entries of a dictionary and emits them in batches.
///
/// Each batch is a pair of columns named `id` and `value`.
pub struct DictionaryScanNode {
/// The resolved dictionary whose entries are scanned.
dictionary: ResolvedDictionary,
/// Query context captured at construction; `next` reads its `batch_size`.
context: Option<Arc<QueryContext>>,
/// Column headers (`id`, `value`) handed out by `headers()`.
headers: ColumnHeaders,
/// Key of the last entry emitted so far; later calls to `next` skip keys `<=` this one.
last_key: Option<EncodedKey>,
/// Set once a call to `next` finds no further entries; subsequent calls return `None`.
exhausted: bool,
/// Optional upper bound on the batch size, installed through `set_scan_limit`.
scan_limit: Option<usize>,
}
impl DictionaryScanNode {
    /// Creates a scan node over `dictionary`, keeping `context` for later calls to `next`.
    ///
    /// The node starts with no resume key, not exhausted, and without a scan limit.
    /// Its headers are the two internal fragments `id` and `value`.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn new(dictionary: ResolvedDictionary, context: Arc<QueryContext>) -> Result<Self> {
        let node = Self {
            dictionary,
            context: Some(context),
            headers: ColumnHeaders {
                columns: vec![Fragment::internal("id"), Fragment::internal("value")],
            },
            last_key: None,
            exhausted: false,
            scan_limit: None,
        };
        Ok(node)
    }
}
impl QueryNode for DictionaryScanNode {
    /// Does nothing and always succeeds; both arguments are ignored.
    #[instrument(name = "volcano::scan::dictionary::initialize", level = "trace", skip_all)]
    fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
        Ok(())
    }

    /// Produces the next batch of dictionary entries as an `id` / `value` column pair.
    ///
    /// The batch size is the stored context's `batch_size`, lowered to the scan limit
    /// when one was set and is smaller. Entries whose key is `<=` the last key emitted
    /// by a previous call are skipped, as are entries whose key does not decode as a
    /// `DictionaryEntryIndexKey`.
    ///
    /// Returns:
    /// * `Ok(Some(columns))` with the collected rows when at least one entry was found;
    /// * `Ok(Some(columns))` built with `ColumnData::none_typed(.., 0)` when the very
    ///   first call finds nothing;
    /// * `Ok(None)` once the scan is exhausted.
    ///
    /// # Errors
    ///
    /// Propagates errors from opening or reading the range, from
    /// `DictionaryEntryId::from_u128`, from the column builders, and returns an internal
    /// error when a stored value cannot be deserialized.
    ///
    /// # Panics
    ///
    /// Panics if the stored context is `None`.
    #[instrument(name = "volcano::scan::dictionary::next", level = "trace", skip_all)]
    fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
        debug_assert!(self.context.is_some(), "DictionaryScan::next() called before initialize()");
        let stored_ctx = self.context.as_ref().unwrap();

        if self.exhausted {
            return Ok(None);
        }

        // A scan limit can only shrink the context's batch size, never enlarge it.
        let batch_size = self
            .scan_limit
            .map_or(stored_ctx.batch_size, |limit| (limit as u64).min(stored_ctx.batch_size));
        let max_rows = batch_size as usize;

        let dict_def = self.dictionary.def();

        // NOTE(review): the range always covers the whole dictionary; resuming relies on
        // the key comparison in the loop below, so earlier entries are visited again on
        // every call. Confirm whether a bounded range is available.
        let range = DictionaryEntryIndexKey::full_scan(dict_def.id);
        let stream = rx.range(range, max_rows)?;

        let mut ids: Vec<DictionaryEntryId> = Vec::new();
        let mut values: Vec<Value> = Vec::new();
        let mut new_last_key = None;

        for item in stream {
            let entry = item?;

            // Already emitted by an earlier batch.
            if self.last_key.as_ref().is_some_and(|resume| &entry.key <= resume) {
                continue;
            }

            // Keys that do not decode as a dictionary index key are ignored.
            let Some(index_key) = DictionaryEntryIndexKey::decode(&entry.key) else {
                continue;
            };

            let entry_id = DictionaryEntryId::from_u128(index_key.id as u128, dict_def.id_type.clone())?;
            let value: Value = from_bytes(&entry.row)
                .map_err(|e| internal_error!("Failed to deserialize dictionary value: {}", e))?;

            ids.push(entry_id);
            values.push(value);
            new_last_key = Some(entry.key);

            // `ids` grows by exactly one per accepted entry, so its length is the row count.
            if ids.len() >= max_rows {
                break;
            }
        }

        if ids.is_empty() {
            self.exhausted = true;

            if self.last_key.is_some() {
                // Earlier batches already delivered rows; nothing more to report.
                return Ok(None);
            }

            // Nothing was ever emitted: hand back a typed, zero-length column pair once.
            let empty = Columns::new(vec![
                Column {
                    name: Fragment::internal("id"),
                    data: ColumnData::none_typed(dict_def.id_type.clone(), 0),
                },
                Column {
                    name: Fragment::internal("value"),
                    data: ColumnData::none_typed(dict_def.value_type.clone(), 0),
                },
            ]);
            return Ok(Some(empty));
        }

        self.last_key = new_last_key;

        let id_column = build_id_column(&ids, dict_def.id_type.clone())?;
        let value_column = build_value_column(&values, dict_def.value_type.clone())?;
        Ok(Some(Columns::new(vec![id_column, value_column])))
    }

    /// Returns a copy of this node's column headers (`id`, `value`).
    fn headers(&self) -> Option<ColumnHeaders> {
        Some(self.headers.clone())
    }

    /// Records `limit` as the scan limit; `next` uses it to cap the batch size.
    fn set_scan_limit(&mut self, limit: usize) {
        self.scan_limit = Some(limit);
    }
}
/// Builds the `id` column from dictionary entry ids, stored at the width named by `id_type`.
///
/// Every id is read through `to_u128()` and then converted with an `as` cast to the
/// unsigned integer type matching `id_type` (no cast is needed for `Uint16`).
///
/// # Errors
///
/// Returns an internal error when `id_type` is not one of `Uint1`, `Uint2`, `Uint4`,
/// `Uint8` or `Uint16`.
fn build_id_column(ids: &[DictionaryEntryId], id_type: Type) -> Result<Column> {
    // Lazy view of the ids as u128; the selected arm narrows and collects it.
    let raw = ids.iter().map(|id| id.to_u128());

    let data = match id_type {
        Type::Uint1 => ColumnData::uint1(raw.map(|n| n as u8).collect::<Vec<u8>>()),
        Type::Uint2 => ColumnData::uint2(raw.map(|n| n as u16).collect::<Vec<u16>>()),
        Type::Uint4 => ColumnData::uint4(raw.map(|n| n as u32).collect::<Vec<u32>>()),
        Type::Uint8 => ColumnData::uint8(raw.map(|n| n as u64).collect::<Vec<u64>>()),
        Type::Uint16 => ColumnData::uint16(raw.collect::<Vec<u128>>()),
        other => return Err(internal_error!("Invalid dictionary id_type: {:?}", other)),
    };

    Ok(Column {
        name: Fragment::internal("id"),
        data,
    })
}
fn build_value_column(values: &[Value], value_type: Type) -> Result<Column> {
let data = match value_type {
Type::Utf8 => {
let vals: Vec<String> = values
.iter()
.map(|v| match v {
Value::Utf8(s) => s.clone(),
_ => format!("{:?}", v), })
.collect();
ColumnData::utf8(vals)
}
Type::Int1 => {
let vals: Vec<i8> = values
.iter()
.map(|v| match v {
Value::Int1(n) => *n,
_ => 0,
})
.collect();
ColumnData::int1(vals)
}
Type::Int2 => {
let vals: Vec<i16> = values
.iter()
.map(|v| match v {
Value::Int2(n) => *n,
_ => 0,
})
.collect();
ColumnData::int2(vals)
}
Type::Int4 => {
let vals: Vec<i32> = values
.iter()
.map(|v| match v {
Value::Int4(n) => *n,
_ => 0,
})
.collect();
ColumnData::int4(vals)
}
Type::Int8 => {
let vals: Vec<i64> = values
.iter()
.map(|v| match v {
Value::Int8(n) => *n,
_ => 0,
})
.collect();
ColumnData::int8(vals)
}
Type::Uint1 => {
let vals: Vec<u8> = values
.iter()
.map(|v| match v {
Value::Uint1(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint1(vals)
}
Type::Uint2 => {
let vals: Vec<u16> = values
.iter()
.map(|v| match v {
Value::Uint2(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint2(vals)
}
Type::Uint4 => {
let vals: Vec<u32> = values
.iter()
.map(|v| match v {
Value::Uint4(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint4(vals)
}
Type::Uint8 => {
let vals: Vec<u64> = values
.iter()
.map(|v| match v {
Value::Uint8(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint8(vals)
}
_ => {
let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
ColumnData::utf8(vals)
}
};
Ok(Column {
name: Fragment::internal("value"),
data,
})
}