reifydb_engine/vm/volcano/scan/
dictionary.rs1use std::sync::Arc;
5
6use postcard::from_bytes;
7use reifydb_core::{
8 encoded::key::EncodedKey,
9 interface::resolved::ResolvedDictionary,
10 internal_error,
11 key::{EncodableKey, dictionary::DictionaryEntryIndexKey},
12 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_transaction::transaction::Transaction;
15use reifydb_type::{
16 fragment::Fragment,
17 value::{Value, dictionary::DictionaryEntryId, r#type::Type},
18};
19use tracing::instrument;
20
21use crate::{
22 Result,
23 vm::volcano::query::{QueryContext, QueryNode},
24};
25
26pub struct DictionaryScanNode {
27 dictionary: ResolvedDictionary,
28 context: Option<Arc<QueryContext>>,
29 headers: ColumnHeaders,
30 last_key: Option<EncodedKey>,
31 exhausted: bool,
32}
33
34impl DictionaryScanNode {
35 pub fn new(dictionary: ResolvedDictionary, context: Arc<QueryContext>) -> Result<Self> {
36 let headers = ColumnHeaders {
37 columns: vec![Fragment::internal("id"), Fragment::internal("value")],
38 };
39
40 Ok(Self {
41 dictionary,
42 context: Some(context),
43 headers,
44 last_key: None,
45 exhausted: false,
46 })
47 }
48}
49
50impl QueryNode for DictionaryScanNode {
51 #[instrument(name = "volcano::scan::dictionary::initialize", level = "trace", skip_all)]
52 fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
53 Ok(())
54 }
55
56 #[instrument(name = "volcano::scan::dictionary::next", level = "trace", skip_all)]
57 fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
58 debug_assert!(self.context.is_some(), "DictionaryScan::next() called before initialize()");
59 let stored_ctx = self.context.as_ref().unwrap();
60
61 if self.exhausted {
62 return Ok(None);
63 }
64
65 let batch_size = stored_ctx.batch_size;
66 let dict_def = self.dictionary.def();
67
68 let range = DictionaryEntryIndexKey::full_scan(dict_def.id);
69
70 let mut ids: Vec<DictionaryEntryId> = Vec::new();
71 let mut values: Vec<Value> = Vec::new();
72 let mut new_last_key = None;
73
74 let stream = rx.range(range, batch_size as usize)?;
75 let mut count = 0;
76
77 for entry in stream {
78 let entry = entry?;
79
80 if let Some(ref last) = self.last_key
81 && &entry.key <= last
82 {
83 continue;
84 }
85
86 if let Some(key) = DictionaryEntryIndexKey::decode(&entry.key) {
87 let entry_id = DictionaryEntryId::from_u128(key.id as u128, dict_def.id_type.clone())?;
88
89 let value: Value = from_bytes(&entry.row).map_err(|e| {
90 internal_error!("Failed to deserialize dictionary value: {}", e)
91 })?;
92
93 ids.push(entry_id);
94 values.push(value);
95 new_last_key = Some(entry.key);
96
97 count += 1;
98 if count >= batch_size as usize {
99 break;
100 }
101 }
102 }
103
104 if ids.is_empty() {
105 self.exhausted = true;
106 if self.last_key.is_none() {
107 let columns = Columns::new(vec![
108 ColumnWithName {
109 name: Fragment::internal("id"),
110 data: ColumnBuffer::none_typed(dict_def.id_type.clone(), 0),
111 },
112 ColumnWithName {
113 name: Fragment::internal("value"),
114 data: ColumnBuffer::none_typed(dict_def.value_type.clone(), 0),
115 },
116 ]);
117 return Ok(Some(columns));
118 }
119 return Ok(None);
120 }
121
122 self.last_key = new_last_key;
123
124 let id_column = build_id_column(&ids, dict_def.id_type.clone())?;
125 let value_column = build_value_column(&values, dict_def.value_type.clone())?;
126
127 let columns = Columns::new(vec![id_column, value_column]);
128
129 Ok(Some(columns))
130 }
131
132 fn headers(&self) -> Option<ColumnHeaders> {
133 Some(self.headers.clone())
134 }
135}
136
137fn build_id_column(ids: &[DictionaryEntryId], id_type: Type) -> Result<ColumnWithName> {
138 let data = match id_type {
139 Type::Uint1 => {
140 let vals: Vec<u8> = ids.iter().map(|id| id.to_u128() as u8).collect();
141 ColumnBuffer::uint1(vals)
142 }
143 Type::Uint2 => {
144 let vals: Vec<u16> = ids.iter().map(|id| id.to_u128() as u16).collect();
145 ColumnBuffer::uint2(vals)
146 }
147 Type::Uint4 => {
148 let vals: Vec<u32> = ids.iter().map(|id| id.to_u128() as u32).collect();
149 ColumnBuffer::uint4(vals)
150 }
151 Type::Uint8 => {
152 let vals: Vec<u64> = ids.iter().map(|id| id.to_u128() as u64).collect();
153 ColumnBuffer::uint8(vals)
154 }
155 Type::Uint16 => {
156 let vals: Vec<u128> = ids.iter().map(|id| id.to_u128()).collect();
157 ColumnBuffer::uint16(vals)
158 }
159 _ => return Err(internal_error!("Invalid dictionary id_type: {:?}", id_type)),
160 };
161
162 Ok(ColumnWithName {
163 name: Fragment::internal("id"),
164 data,
165 })
166}
167
168fn build_value_column(values: &[Value], value_type: Type) -> Result<ColumnWithName> {
169 let data = match value_type {
170 Type::Utf8 => {
171 let vals: Vec<String> = values
172 .iter()
173 .map(|v| match v {
174 Value::Utf8(s) => s.clone(),
175 _ => format!("{:?}", v),
176 })
177 .collect();
178 ColumnBuffer::utf8(vals)
179 }
180 Type::Int1 => {
181 let vals: Vec<i8> = values
182 .iter()
183 .map(|v| match v {
184 Value::Int1(n) => *n,
185 _ => 0,
186 })
187 .collect();
188 ColumnBuffer::int1(vals)
189 }
190 Type::Int2 => {
191 let vals: Vec<i16> = values
192 .iter()
193 .map(|v| match v {
194 Value::Int2(n) => *n,
195 _ => 0,
196 })
197 .collect();
198 ColumnBuffer::int2(vals)
199 }
200 Type::Int4 => {
201 let vals: Vec<i32> = values
202 .iter()
203 .map(|v| match v {
204 Value::Int4(n) => *n,
205 _ => 0,
206 })
207 .collect();
208 ColumnBuffer::int4(vals)
209 }
210 Type::Int8 => {
211 let vals: Vec<i64> = values
212 .iter()
213 .map(|v| match v {
214 Value::Int8(n) => *n,
215 _ => 0,
216 })
217 .collect();
218 ColumnBuffer::int8(vals)
219 }
220 Type::Uint1 => {
221 let vals: Vec<u8> = values
222 .iter()
223 .map(|v| match v {
224 Value::Uint1(n) => *n,
225 _ => 0,
226 })
227 .collect();
228 ColumnBuffer::uint1(vals)
229 }
230 Type::Uint2 => {
231 let vals: Vec<u16> = values
232 .iter()
233 .map(|v| match v {
234 Value::Uint2(n) => *n,
235 _ => 0,
236 })
237 .collect();
238 ColumnBuffer::uint2(vals)
239 }
240 Type::Uint4 => {
241 let vals: Vec<u32> = values
242 .iter()
243 .map(|v| match v {
244 Value::Uint4(n) => *n,
245 _ => 0,
246 })
247 .collect();
248 ColumnBuffer::uint4(vals)
249 }
250 Type::Uint8 => {
251 let vals: Vec<u64> = values
252 .iter()
253 .map(|v| match v {
254 Value::Uint8(n) => *n,
255 _ => 0,
256 })
257 .collect();
258 ColumnBuffer::uint8(vals)
259 }
260 _ => {
261 let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
262 ColumnBuffer::utf8(vals)
263 }
264 };
265
266 Ok(ColumnWithName {
267 name: Fragment::internal("value"),
268 data,
269 })
270}