reifydb_engine/vm/volcano/scan/
dictionary.rs1use std::sync::Arc;
5
6use postcard::from_bytes;
7use reifydb_core::{
8 encoded::key::EncodedKey,
9 interface::resolved::ResolvedDictionary,
10 internal_error,
11 key::{EncodableKey, dictionary::DictionaryEntryIndexKey},
12 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_transaction::transaction::Transaction;
15use reifydb_type::{
16 fragment::Fragment,
17 value::{Value, dictionary::DictionaryEntryId, r#type::Type},
18};
19use tracing::instrument;
20
21use crate::{
22 Result,
23 vm::volcano::query::{QueryContext, QueryNode},
24};
25
26pub struct DictionaryScanNode {
27 dictionary: ResolvedDictionary,
28 context: Option<Arc<QueryContext>>,
29 headers: ColumnHeaders,
30 last_key: Option<EncodedKey>,
31 exhausted: bool,
32 scan_limit: Option<usize>,
33}
34
35impl DictionaryScanNode {
36 pub fn new(dictionary: ResolvedDictionary, context: Arc<QueryContext>) -> Result<Self> {
37 let headers = ColumnHeaders {
39 columns: vec![Fragment::internal("id"), Fragment::internal("value")],
40 };
41
42 Ok(Self {
43 dictionary,
44 context: Some(context),
45 headers,
46 last_key: None,
47 exhausted: false,
48 scan_limit: None,
49 })
50 }
51}
52
53impl QueryNode for DictionaryScanNode {
54 #[instrument(name = "volcano::scan::dictionary::initialize", level = "trace", skip_all)]
55 fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
56 Ok(())
58 }
59
60 #[instrument(name = "volcano::scan::dictionary::next", level = "trace", skip_all)]
61 fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
62 debug_assert!(self.context.is_some(), "DictionaryScan::next() called before initialize()");
63 let stored_ctx = self.context.as_ref().unwrap();
64
65 if self.exhausted {
66 return Ok(None);
67 }
68
69 let batch_size = match self.scan_limit {
70 Some(limit) => (limit as u64).min(stored_ctx.batch_size),
71 None => stored_ctx.batch_size,
72 };
73 let dict_def = self.dictionary.def();
74
75 let range = DictionaryEntryIndexKey::full_scan(dict_def.id);
77
78 let mut ids: Vec<DictionaryEntryId> = Vec::new();
80 let mut values: Vec<Value> = Vec::new();
81 let mut new_last_key = None;
82
83 let stream = rx.range(range, batch_size as usize)?;
85 let mut count = 0;
86
87 for entry in stream {
88 let entry = entry?;
89
90 if let Some(ref last) = self.last_key
92 && &entry.key <= last
93 {
94 continue;
95 }
96
97 if let Some(key) = DictionaryEntryIndexKey::decode(&entry.key) {
99 let entry_id = DictionaryEntryId::from_u128(key.id as u128, dict_def.id_type.clone())?;
101
102 let value: Value = from_bytes(&entry.row).map_err(|e| {
104 internal_error!("Failed to deserialize dictionary value: {}", e)
105 })?;
106
107 ids.push(entry_id);
108 values.push(value);
109 new_last_key = Some(entry.key);
110
111 count += 1;
112 if count >= batch_size as usize {
113 break;
114 }
115 }
116 }
117
118 if ids.is_empty() {
119 self.exhausted = true;
120 if self.last_key.is_none() {
121 let columns = Columns::new(vec![
123 ColumnWithName {
124 name: Fragment::internal("id"),
125 data: ColumnBuffer::none_typed(dict_def.id_type.clone(), 0),
126 },
127 ColumnWithName {
128 name: Fragment::internal("value"),
129 data: ColumnBuffer::none_typed(dict_def.value_type.clone(), 0),
130 },
131 ]);
132 return Ok(Some(columns));
133 }
134 return Ok(None);
135 }
136
137 self.last_key = new_last_key;
138
139 let id_column = build_id_column(&ids, dict_def.id_type.clone())?;
141 let value_column = build_value_column(&values, dict_def.value_type.clone())?;
142
143 let columns = Columns::new(vec![id_column, value_column]);
144
145 Ok(Some(columns))
146 }
147
148 fn headers(&self) -> Option<ColumnHeaders> {
149 Some(self.headers.clone())
150 }
151
152 fn set_scan_limit(&mut self, limit: usize) {
153 self.scan_limit = Some(limit);
154 }
155}
156
157fn build_id_column(ids: &[DictionaryEntryId], id_type: Type) -> Result<ColumnWithName> {
159 let data = match id_type {
160 Type::Uint1 => {
161 let vals: Vec<u8> = ids.iter().map(|id| id.to_u128() as u8).collect();
162 ColumnBuffer::uint1(vals)
163 }
164 Type::Uint2 => {
165 let vals: Vec<u16> = ids.iter().map(|id| id.to_u128() as u16).collect();
166 ColumnBuffer::uint2(vals)
167 }
168 Type::Uint4 => {
169 let vals: Vec<u32> = ids.iter().map(|id| id.to_u128() as u32).collect();
170 ColumnBuffer::uint4(vals)
171 }
172 Type::Uint8 => {
173 let vals: Vec<u64> = ids.iter().map(|id| id.to_u128() as u64).collect();
174 ColumnBuffer::uint8(vals)
175 }
176 Type::Uint16 => {
177 let vals: Vec<u128> = ids.iter().map(|id| id.to_u128()).collect();
178 ColumnBuffer::uint16(vals)
179 }
180 _ => return Err(internal_error!("Invalid dictionary id_type: {:?}", id_type)),
181 };
182
183 Ok(ColumnWithName {
184 name: Fragment::internal("id"),
185 data,
186 })
187}
188
189fn build_value_column(values: &[Value], value_type: Type) -> Result<ColumnWithName> {
191 let data = match value_type {
192 Type::Utf8 => {
193 let vals: Vec<String> = values
194 .iter()
195 .map(|v| match v {
196 Value::Utf8(s) => s.clone(),
197 _ => format!("{:?}", v), })
199 .collect();
200 ColumnBuffer::utf8(vals)
201 }
202 Type::Int1 => {
203 let vals: Vec<i8> = values
204 .iter()
205 .map(|v| match v {
206 Value::Int1(n) => *n,
207 _ => 0,
208 })
209 .collect();
210 ColumnBuffer::int1(vals)
211 }
212 Type::Int2 => {
213 let vals: Vec<i16> = values
214 .iter()
215 .map(|v| match v {
216 Value::Int2(n) => *n,
217 _ => 0,
218 })
219 .collect();
220 ColumnBuffer::int2(vals)
221 }
222 Type::Int4 => {
223 let vals: Vec<i32> = values
224 .iter()
225 .map(|v| match v {
226 Value::Int4(n) => *n,
227 _ => 0,
228 })
229 .collect();
230 ColumnBuffer::int4(vals)
231 }
232 Type::Int8 => {
233 let vals: Vec<i64> = values
234 .iter()
235 .map(|v| match v {
236 Value::Int8(n) => *n,
237 _ => 0,
238 })
239 .collect();
240 ColumnBuffer::int8(vals)
241 }
242 Type::Uint1 => {
243 let vals: Vec<u8> = values
244 .iter()
245 .map(|v| match v {
246 Value::Uint1(n) => *n,
247 _ => 0,
248 })
249 .collect();
250 ColumnBuffer::uint1(vals)
251 }
252 Type::Uint2 => {
253 let vals: Vec<u16> = values
254 .iter()
255 .map(|v| match v {
256 Value::Uint2(n) => *n,
257 _ => 0,
258 })
259 .collect();
260 ColumnBuffer::uint2(vals)
261 }
262 Type::Uint4 => {
263 let vals: Vec<u32> = values
264 .iter()
265 .map(|v| match v {
266 Value::Uint4(n) => *n,
267 _ => 0,
268 })
269 .collect();
270 ColumnBuffer::uint4(vals)
271 }
272 Type::Uint8 => {
273 let vals: Vec<u64> = values
274 .iter()
275 .map(|v| match v {
276 Value::Uint8(n) => *n,
277 _ => 0,
278 })
279 .collect();
280 ColumnBuffer::uint8(vals)
281 }
282 _ => {
283 let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
285 ColumnBuffer::utf8(vals)
286 }
287 };
288
289 Ok(ColumnWithName {
290 name: Fragment::internal("value"),
291 data,
292 })
293}