Skip to main content

reifydb_engine/vm/volcano/scan/
dictionary.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use postcard::from_bytes;
7use reifydb_core::{
8	encoded::key::EncodedKey,
9	interface::resolved::ResolvedDictionary,
10	internal_error,
11	key::{EncodableKey, dictionary::DictionaryEntryIndexKey},
12	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_transaction::transaction::Transaction;
15use reifydb_type::{
16	fragment::Fragment,
17	value::{Value, dictionary::DictionaryEntryId, r#type::Type},
18};
19use tracing::instrument;
20
21use crate::{
22	Result,
23	vm::volcano::query::{QueryContext, QueryNode},
24};
25
26pub struct DictionaryScanNode {
27	dictionary: ResolvedDictionary,
28	context: Option<Arc<QueryContext>>,
29	headers: ColumnHeaders,
30	last_key: Option<EncodedKey>,
31	exhausted: bool,
32}
33
34impl DictionaryScanNode {
35	pub fn new(dictionary: ResolvedDictionary, context: Arc<QueryContext>) -> Result<Self> {
36		let headers = ColumnHeaders {
37			columns: vec![Fragment::internal("id"), Fragment::internal("value")],
38		};
39
40		Ok(Self {
41			dictionary,
42			context: Some(context),
43			headers,
44			last_key: None,
45			exhausted: false,
46		})
47	}
48}
49
50impl QueryNode for DictionaryScanNode {
51	#[instrument(name = "volcano::scan::dictionary::initialize", level = "trace", skip_all)]
52	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
53		Ok(())
54	}
55
56	#[instrument(name = "volcano::scan::dictionary::next", level = "trace", skip_all)]
57	fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
58		debug_assert!(self.context.is_some(), "DictionaryScan::next() called before initialize()");
59		let stored_ctx = self.context.as_ref().unwrap();
60
61		if self.exhausted {
62			return Ok(None);
63		}
64
65		let batch_size = stored_ctx.batch_size;
66		let dict_def = self.dictionary.def();
67
68		let range = DictionaryEntryIndexKey::full_scan(dict_def.id);
69
70		let mut ids: Vec<DictionaryEntryId> = Vec::new();
71		let mut values: Vec<Value> = Vec::new();
72		let mut new_last_key = None;
73
74		let stream = rx.range(range, batch_size as usize)?;
75		let mut count = 0;
76
77		for entry in stream {
78			let entry = entry?;
79
80			if let Some(ref last) = self.last_key
81				&& &entry.key <= last
82			{
83				continue;
84			}
85
86			if let Some(key) = DictionaryEntryIndexKey::decode(&entry.key) {
87				let entry_id = DictionaryEntryId::from_u128(key.id as u128, dict_def.id_type.clone())?;
88
89				let value: Value = from_bytes(&entry.row).map_err(|e| {
90					internal_error!("Failed to deserialize dictionary value: {}", e)
91				})?;
92
93				ids.push(entry_id);
94				values.push(value);
95				new_last_key = Some(entry.key);
96
97				count += 1;
98				if count >= batch_size as usize {
99					break;
100				}
101			}
102		}
103
104		if ids.is_empty() {
105			self.exhausted = true;
106			if self.last_key.is_none() {
107				let columns = Columns::new(vec![
108					ColumnWithName {
109						name: Fragment::internal("id"),
110						data: ColumnBuffer::none_typed(dict_def.id_type.clone(), 0),
111					},
112					ColumnWithName {
113						name: Fragment::internal("value"),
114						data: ColumnBuffer::none_typed(dict_def.value_type.clone(), 0),
115					},
116				]);
117				return Ok(Some(columns));
118			}
119			return Ok(None);
120		}
121
122		self.last_key = new_last_key;
123
124		let id_column = build_id_column(&ids, dict_def.id_type.clone())?;
125		let value_column = build_value_column(&values, dict_def.value_type.clone())?;
126
127		let columns = Columns::new(vec![id_column, value_column]);
128
129		Ok(Some(columns))
130	}
131
132	fn headers(&self) -> Option<ColumnHeaders> {
133		Some(self.headers.clone())
134	}
135}
136
137fn build_id_column(ids: &[DictionaryEntryId], id_type: Type) -> Result<ColumnWithName> {
138	let data = match id_type {
139		Type::Uint1 => {
140			let vals: Vec<u8> = ids.iter().map(|id| id.to_u128() as u8).collect();
141			ColumnBuffer::uint1(vals)
142		}
143		Type::Uint2 => {
144			let vals: Vec<u16> = ids.iter().map(|id| id.to_u128() as u16).collect();
145			ColumnBuffer::uint2(vals)
146		}
147		Type::Uint4 => {
148			let vals: Vec<u32> = ids.iter().map(|id| id.to_u128() as u32).collect();
149			ColumnBuffer::uint4(vals)
150		}
151		Type::Uint8 => {
152			let vals: Vec<u64> = ids.iter().map(|id| id.to_u128() as u64).collect();
153			ColumnBuffer::uint8(vals)
154		}
155		Type::Uint16 => {
156			let vals: Vec<u128> = ids.iter().map(|id| id.to_u128()).collect();
157			ColumnBuffer::uint16(vals)
158		}
159		_ => return Err(internal_error!("Invalid dictionary id_type: {:?}", id_type)),
160	};
161
162	Ok(ColumnWithName {
163		name: Fragment::internal("id"),
164		data,
165	})
166}
167
168fn build_value_column(values: &[Value], value_type: Type) -> Result<ColumnWithName> {
169	let data = match value_type {
170		Type::Utf8 => {
171			let vals: Vec<String> = values
172				.iter()
173				.map(|v| match v {
174					Value::Utf8(s) => s.clone(),
175					_ => format!("{:?}", v),
176				})
177				.collect();
178			ColumnBuffer::utf8(vals)
179		}
180		Type::Int1 => {
181			let vals: Vec<i8> = values
182				.iter()
183				.map(|v| match v {
184					Value::Int1(n) => *n,
185					_ => 0,
186				})
187				.collect();
188			ColumnBuffer::int1(vals)
189		}
190		Type::Int2 => {
191			let vals: Vec<i16> = values
192				.iter()
193				.map(|v| match v {
194					Value::Int2(n) => *n,
195					_ => 0,
196				})
197				.collect();
198			ColumnBuffer::int2(vals)
199		}
200		Type::Int4 => {
201			let vals: Vec<i32> = values
202				.iter()
203				.map(|v| match v {
204					Value::Int4(n) => *n,
205					_ => 0,
206				})
207				.collect();
208			ColumnBuffer::int4(vals)
209		}
210		Type::Int8 => {
211			let vals: Vec<i64> = values
212				.iter()
213				.map(|v| match v {
214					Value::Int8(n) => *n,
215					_ => 0,
216				})
217				.collect();
218			ColumnBuffer::int8(vals)
219		}
220		Type::Uint1 => {
221			let vals: Vec<u8> = values
222				.iter()
223				.map(|v| match v {
224					Value::Uint1(n) => *n,
225					_ => 0,
226				})
227				.collect();
228			ColumnBuffer::uint1(vals)
229		}
230		Type::Uint2 => {
231			let vals: Vec<u16> = values
232				.iter()
233				.map(|v| match v {
234					Value::Uint2(n) => *n,
235					_ => 0,
236				})
237				.collect();
238			ColumnBuffer::uint2(vals)
239		}
240		Type::Uint4 => {
241			let vals: Vec<u32> = values
242				.iter()
243				.map(|v| match v {
244					Value::Uint4(n) => *n,
245					_ => 0,
246				})
247				.collect();
248			ColumnBuffer::uint4(vals)
249		}
250		Type::Uint8 => {
251			let vals: Vec<u64> = values
252				.iter()
253				.map(|v| match v {
254					Value::Uint8(n) => *n,
255					_ => 0,
256				})
257				.collect();
258			ColumnBuffer::uint8(vals)
259		}
260		_ => {
261			let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
262			ColumnBuffer::utf8(vals)
263		}
264	};
265
266	Ok(ColumnWithName {
267		name: Fragment::internal("value"),
268		data,
269	})
270}