Skip to main content

reifydb_engine/vm/volcano/scan/
dictionary.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use postcard::from_bytes;
7use reifydb_core::{
8	encoded::key::EncodedKey,
9	interface::resolved::ResolvedDictionary,
10	internal_error,
11	key::{EncodableKey, dictionary::DictionaryEntryIndexKey},
12	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_transaction::transaction::Transaction;
15use reifydb_type::{
16	fragment::Fragment,
17	value::{Value, dictionary::DictionaryEntryId, r#type::Type},
18};
19use tracing::instrument;
20
21use crate::{
22	Result,
23	vm::volcano::query::{QueryContext, QueryNode},
24};
25
/// Volcano scan node that emits the entries of one dictionary as `(id, value)` column batches.
pub struct DictionaryScanNode {
	/// Dictionary whose entries are scanned.
	dictionary: ResolvedDictionary,
	/// Query context captured by the constructor; `next` reads its `batch_size`.
	context: Option<Arc<QueryContext>>,
	/// Output column names, in order: `id`, `value`.
	headers: ColumnHeaders,
	/// Key of the last entry emitted so far; later calls skip every key at or below it.
	last_key: Option<EncodedKey>,
	/// Set once a call finds no new entries; afterwards `next` returns `None` immediately.
	exhausted: bool,
	/// Optional upper bound on the per-call batch size, installed via `set_scan_limit`.
	scan_limit: Option<usize>,
}
34
35impl DictionaryScanNode {
36	pub fn new(dictionary: ResolvedDictionary, context: Arc<QueryContext>) -> Result<Self> {
37		// Create column headers for dictionary scan: (id, value)
38		let headers = ColumnHeaders {
39			columns: vec![Fragment::internal("id"), Fragment::internal("value")],
40		};
41
42		Ok(Self {
43			dictionary,
44			context: Some(context),
45			headers,
46			last_key: None,
47			exhausted: false,
48			scan_limit: None,
49		})
50	}
51}
52
53impl QueryNode for DictionaryScanNode {
54	#[instrument(name = "volcano::scan::dictionary::initialize", level = "trace", skip_all)]
55	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
56		// Already has context from constructor
57		Ok(())
58	}
59
60	#[instrument(name = "volcano::scan::dictionary::next", level = "trace", skip_all)]
61	fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
62		debug_assert!(self.context.is_some(), "DictionaryScan::next() called before initialize()");
63		let stored_ctx = self.context.as_ref().unwrap();
64
65		if self.exhausted {
66			return Ok(None);
67		}
68
69		let batch_size = match self.scan_limit {
70			Some(limit) => (limit as u64).min(stored_ctx.batch_size),
71			None => stored_ctx.batch_size,
72		};
73		let dict_def = self.dictionary.def();
74
75		// Create scan range for dictionary entries
76		let range = DictionaryEntryIndexKey::full_scan(dict_def.id);
77
78		// Collect entries for this batch
79		let mut ids: Vec<DictionaryEntryId> = Vec::new();
80		let mut values: Vec<Value> = Vec::new();
81		let mut new_last_key = None;
82
83		// Get entries from storage using stream
84		let stream = rx.range(range, batch_size as usize)?;
85		let mut count = 0;
86
87		for entry in stream {
88			let entry = entry?;
89
90			// Skip entries we've already seen
91			if let Some(ref last) = self.last_key
92				&& &entry.key <= last
93			{
94				continue;
95			}
96
97			// Decode the key to get the entry ID
98			if let Some(key) = DictionaryEntryIndexKey::decode(&entry.key) {
99				// Create DictionaryEntryId with proper type
100				let entry_id = DictionaryEntryId::from_u128(key.id as u128, dict_def.id_type.clone())?;
101
102				// Decode the value from the entry
103				let value: Value = from_bytes(&entry.row).map_err(|e| {
104					internal_error!("Failed to deserialize dictionary value: {}", e)
105				})?;
106
107				ids.push(entry_id);
108				values.push(value);
109				new_last_key = Some(entry.key);
110
111				count += 1;
112				if count >= batch_size as usize {
113					break;
114				}
115			}
116		}
117
118		if ids.is_empty() {
119			self.exhausted = true;
120			if self.last_key.is_none() {
121				// Empty dictionary: return empty columns with correct types to preserve shape
122				let columns = Columns::new(vec![
123					ColumnWithName {
124						name: Fragment::internal("id"),
125						data: ColumnBuffer::none_typed(dict_def.id_type.clone(), 0),
126					},
127					ColumnWithName {
128						name: Fragment::internal("value"),
129						data: ColumnBuffer::none_typed(dict_def.value_type.clone(), 0),
130					},
131				]);
132				return Ok(Some(columns));
133			}
134			return Ok(None);
135		}
136
137		self.last_key = new_last_key;
138
139		// Build columns based on dictionary types
140		let id_column = build_id_column(&ids, dict_def.id_type.clone())?;
141		let value_column = build_value_column(&values, dict_def.value_type.clone())?;
142
143		let columns = Columns::new(vec![id_column, value_column]);
144
145		Ok(Some(columns))
146	}
147
148	fn headers(&self) -> Option<ColumnHeaders> {
149		Some(self.headers.clone())
150	}
151
152	fn set_scan_limit(&mut self, limit: usize) {
153		self.scan_limit = Some(limit);
154	}
155}
156
157/// Build the ID column based on the dictionary's id_type
158fn build_id_column(ids: &[DictionaryEntryId], id_type: Type) -> Result<ColumnWithName> {
159	let data = match id_type {
160		Type::Uint1 => {
161			let vals: Vec<u8> = ids.iter().map(|id| id.to_u128() as u8).collect();
162			ColumnBuffer::uint1(vals)
163		}
164		Type::Uint2 => {
165			let vals: Vec<u16> = ids.iter().map(|id| id.to_u128() as u16).collect();
166			ColumnBuffer::uint2(vals)
167		}
168		Type::Uint4 => {
169			let vals: Vec<u32> = ids.iter().map(|id| id.to_u128() as u32).collect();
170			ColumnBuffer::uint4(vals)
171		}
172		Type::Uint8 => {
173			let vals: Vec<u64> = ids.iter().map(|id| id.to_u128() as u64).collect();
174			ColumnBuffer::uint8(vals)
175		}
176		Type::Uint16 => {
177			let vals: Vec<u128> = ids.iter().map(|id| id.to_u128()).collect();
178			ColumnBuffer::uint16(vals)
179		}
180		_ => return Err(internal_error!("Invalid dictionary id_type: {:?}", id_type)),
181	};
182
183	Ok(ColumnWithName {
184		name: Fragment::internal("id"),
185		data,
186	})
187}
188
189/// Build the value column based on the dictionary's value_type
190fn build_value_column(values: &[Value], value_type: Type) -> Result<ColumnWithName> {
191	let data = match value_type {
192		Type::Utf8 => {
193			let vals: Vec<String> = values
194				.iter()
195				.map(|v| match v {
196					Value::Utf8(s) => s.clone(),
197					_ => format!("{:?}", v), // Fallback representation
198				})
199				.collect();
200			ColumnBuffer::utf8(vals)
201		}
202		Type::Int1 => {
203			let vals: Vec<i8> = values
204				.iter()
205				.map(|v| match v {
206					Value::Int1(n) => *n,
207					_ => 0,
208				})
209				.collect();
210			ColumnBuffer::int1(vals)
211		}
212		Type::Int2 => {
213			let vals: Vec<i16> = values
214				.iter()
215				.map(|v| match v {
216					Value::Int2(n) => *n,
217					_ => 0,
218				})
219				.collect();
220			ColumnBuffer::int2(vals)
221		}
222		Type::Int4 => {
223			let vals: Vec<i32> = values
224				.iter()
225				.map(|v| match v {
226					Value::Int4(n) => *n,
227					_ => 0,
228				})
229				.collect();
230			ColumnBuffer::int4(vals)
231		}
232		Type::Int8 => {
233			let vals: Vec<i64> = values
234				.iter()
235				.map(|v| match v {
236					Value::Int8(n) => *n,
237					_ => 0,
238				})
239				.collect();
240			ColumnBuffer::int8(vals)
241		}
242		Type::Uint1 => {
243			let vals: Vec<u8> = values
244				.iter()
245				.map(|v| match v {
246					Value::Uint1(n) => *n,
247					_ => 0,
248				})
249				.collect();
250			ColumnBuffer::uint1(vals)
251		}
252		Type::Uint2 => {
253			let vals: Vec<u16> = values
254				.iter()
255				.map(|v| match v {
256					Value::Uint2(n) => *n,
257					_ => 0,
258				})
259				.collect();
260			ColumnBuffer::uint2(vals)
261		}
262		Type::Uint4 => {
263			let vals: Vec<u32> = values
264				.iter()
265				.map(|v| match v {
266					Value::Uint4(n) => *n,
267					_ => 0,
268				})
269				.collect();
270			ColumnBuffer::uint4(vals)
271		}
272		Type::Uint8 => {
273			let vals: Vec<u64> = values
274				.iter()
275				.map(|v| match v {
276					Value::Uint8(n) => *n,
277					_ => 0,
278				})
279				.collect();
280			ColumnBuffer::uint8(vals)
281		}
282		_ => {
283			// For other types, convert to string representation
284			let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
285			ColumnBuffer::utf8(vals)
286		}
287	};
288
289	Ok(ColumnWithName {
290		name: Fragment::internal("value"),
291		data,
292	})
293}