// reifydb_engine/transaction/operation/dictionary.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::{from_bytes, to_stdvec};
5use reifydb_core::{
6	common::CommitVersion,
7	encoded::row::EncodedRow,
8	interface::{
9		catalog::{dictionary::Dictionary, shape::ShapeId},
10		change::{Change, ChangeOrigin, Diff},
11	},
12	internal_error,
13	key::{
14		EncodableKey,
15		dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionarySequenceKey},
16	},
17	value::column::columns::Columns,
18};
19use reifydb_runtime::hash::xxh3_128;
20use reifydb_transaction::{
21	interceptor::dictionary_row::DictionaryRowInterceptor,
22	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
23};
24use reifydb_type::{
25	util::cowvec::CowVec,
26	value::{Value, datetime::DateTime, dictionary::DictionaryEntryId},
27};
28use smallvec::smallvec;
29
30use crate::Result;
31
32pub(crate) trait DictionaryOperations {
33	/// Insert a value into the dictionary, returning its ID.
34	/// If the value already exists, returns the existing ID.
35	/// If the value is new, assigns a new ID and stores it.
36	/// The returned ID type matches the dictionary's `id_type`.
37	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId>;
38
39	/// Get a value from the dictionary by its ID.
40	/// Returns None if the ID doesn't exist.
41	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>>;
42
43	/// Find the ID of a value in the dictionary without inserting.
44	/// Returns the ID if the value exists, None otherwise.
45	/// The returned ID type matches the dictionary's `id_type`.
46	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>>;
47}
48
49impl DictionaryOperations for CommandTransaction {
50	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId> {
51		let value = DictionaryRowInterceptor::pre_insert(self, dictionary, value.clone())?;
52
53		// 1. Serialize value and compute hash
54		let value_bytes = to_stdvec(&value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
55		let hash = xxh3_128(&value_bytes).0.to_be_bytes();
56
57		// 2. Check if value already exists (lookup by hash)
58		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
59		if let Some(existing) = self.get(&entry_key)? {
60			// Value exists, return existing ID
61			let id = u128::from_be_bytes(existing.row[..16].try_into().unwrap());
62			return DictionaryEntryId::from_u128(id, dictionary.id_type.clone());
63		}
64
65		// 3. Value doesn't exist - get next ID from sequence
66		let seq_key = DictionarySequenceKey::encoded(dictionary.id);
67		let next_id = match self.get(&seq_key)? {
68			Some(v) => u128::from_be_bytes(v.row[..16].try_into().unwrap()) + 1,
69			None => 1, // First entry
70		};
71
72		// 4. Validate the new ID fits in the dictionary's id_type (early check)
73		let entry_id = DictionaryEntryId::from_u128(next_id, dictionary.id_type.clone())?;
74
75		// 5. Store the entry (hash -> id + value_bytes)
76		let mut entry_value = Vec::with_capacity(16 + value_bytes.len());
77		entry_value.extend_from_slice(&next_id.to_be_bytes());
78		entry_value.extend_from_slice(&value_bytes);
79		self.set(&entry_key, EncodedRow(CowVec::new(entry_value)))?;
80
81		// 6. Store reverse index (id -> value_bytes)
82		// Note: DictionaryEntryIndexKey currently uses u64, so we truncate
83		// This limits practical dictionary size to u64::MAX entries
84		let index_key = DictionaryEntryIndexKey::encoded(dictionary.id, next_id as u64);
85		self.set(&index_key, EncodedRow(CowVec::new(value_bytes)))?;
86
87		// 7. Update sequence
88		self.set(&seq_key, EncodedRow(CowVec::new(next_id.to_be_bytes().to_vec())))?;
89
90		DictionaryRowInterceptor::post_insert(self, dictionary, entry_id, &value)?;
91
92		Ok(entry_id)
93	}
94
95	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>> {
96		// Note: DictionaryEntryIndexKey currently uses u64, so we truncate
97		let index_key = DictionaryEntryIndexKey::new(dictionary.id, id.to_u128() as u64).encode();
98		match self.get(&index_key)? {
99			Some(v) => {
100				let value: Value = from_bytes(&v.row)
101					.map_err(|e| internal_error!("Failed to deserialize value: {}", e))?;
102				Ok(Some(value))
103			}
104			None => Ok(None),
105		}
106	}
107
108	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>> {
109		let value_bytes = to_stdvec(value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
110		let hash = xxh3_128(&value_bytes).0.to_be_bytes();
111
112		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
113		match self.get(&entry_key)? {
114			Some(v) => {
115				let id = u128::from_be_bytes(v.row[..16].try_into().unwrap());
116				let entry_id = DictionaryEntryId::from_u128(id, dictionary.id_type.clone())?;
117				Ok(Some(entry_id))
118			}
119			None => Ok(None),
120		}
121	}
122}
123
124impl DictionaryOperations for AdminTransaction {
125	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId> {
126		let value = DictionaryRowInterceptor::pre_insert(self, dictionary, value.clone())?;
127
128		// 1. Serialize value and compute hash
129		let value_bytes = to_stdvec(&value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
130		let hash = xxh3_128(&value_bytes).0.to_be_bytes();
131
132		// 2. Check if value already exists (lookup by hash)
133		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
134		if let Some(existing) = self.get(&entry_key)? {
135			// Value exists, return existing ID
136			let id = u128::from_be_bytes(existing.row[..16].try_into().unwrap());
137			return DictionaryEntryId::from_u128(id, dictionary.id_type.clone());
138		}
139
140		// 3. Value doesn't exist - get next ID from sequence
141		let seq_key = DictionarySequenceKey::encoded(dictionary.id);
142		let next_id = match self.get(&seq_key)? {
143			Some(v) => u128::from_be_bytes(v.row[..16].try_into().unwrap()) + 1,
144			None => 1, // First entry
145		};
146
147		// 4. Validate the new ID fits in the dictionary's id_type (early check)
148		let entry_id = DictionaryEntryId::from_u128(next_id, dictionary.id_type.clone())?;
149
150		// 5. Store the entry (hash -> id + value_bytes)
151		let mut entry_value = Vec::with_capacity(16 + value_bytes.len());
152		entry_value.extend_from_slice(&next_id.to_be_bytes());
153		entry_value.extend_from_slice(&value_bytes);
154		self.set(&entry_key, EncodedRow(CowVec::new(entry_value)))?;
155
156		// 6. Store reverse index (id -> value_bytes)
157		let index_key = DictionaryEntryIndexKey::encoded(dictionary.id, next_id as u64);
158		self.set(&index_key, EncodedRow(CowVec::new(value_bytes)))?;
159
160		// 7. Update sequence
161		self.set(&seq_key, EncodedRow(CowVec::new(next_id.to_be_bytes().to_vec())))?;
162
163		DictionaryRowInterceptor::post_insert(self, dictionary, entry_id, &value)?;
164
165		// Track for testing::dictionaries::changed()
166		self.track_flow_change(Change {
167			origin: ChangeOrigin::Shape(ShapeId::Dictionary(dictionary.id)),
168			version: CommitVersion(0),
169			diffs: smallvec![Diff::insert(Columns::single_row([("value", value)]))],
170			changed_at: DateTime::default(),
171		});
172
173		Ok(entry_id)
174	}
175
176	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>> {
177		let index_key = DictionaryEntryIndexKey::new(dictionary.id, id.to_u128() as u64).encode();
178		match self.get(&index_key)? {
179			Some(v) => {
180				let value: Value = from_bytes(&v.row)
181					.map_err(|e| internal_error!("Failed to deserialize value: {}", e))?;
182				Ok(Some(value))
183			}
184			None => Ok(None),
185		}
186	}
187
188	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>> {
189		let value_bytes = to_stdvec(value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
190		let hash = xxh3_128(&value_bytes).0.to_be_bytes();
191
192		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
193		match self.get(&entry_key)? {
194			Some(v) => {
195				let id = u128::from_be_bytes(v.row[..16].try_into().unwrap());
196				let entry_id = DictionaryEntryId::from_u128(id, dictionary.id_type.clone())?;
197				Ok(Some(entry_id))
198			}
199			None => Ok(None),
200		}
201	}
202}
203
204/// Implementation for Transaction (both Command and Query)
205/// This provides read-only access to dictionaries for query operations.
206impl DictionaryOperations for Transaction<'_> {
207	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId> {
208		// Only command and admin transactions can insert
209		match self {
210			Transaction::Command(cmd) => cmd.insert_into_dictionary(dictionary, value),
211			Transaction::Admin(admin) => admin.insert_into_dictionary(dictionary, value),
212			Transaction::Test(t) => t.inner.insert_into_dictionary(dictionary, value),
213			Transaction::Query(_) => {
214				Err(internal_error!("Cannot insert into dictionary during a query transaction"))
215			}
216			Transaction::Replica(_) => {
217				Err(internal_error!("Cannot insert into dictionary during a replica transaction"))
218			}
219		}
220	}
221
222	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>> {
223		// Both command and query transactions can read
224		let index_key = DictionaryEntryIndexKey::encoded(dictionary.id, id.to_u128() as u64);
225		match self.get(&index_key)? {
226			Some(v) => {
227				let value: Value = from_bytes(&v.row)
228					.map_err(|e| internal_error!("Failed to deserialize value: {}", e))?;
229				Ok(Some(value))
230			}
231			None => Ok(None),
232		}
233	}
234
235	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>> {
236		// Both command and query transactions can read
237		let value_bytes = to_stdvec(value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
238		let hash = xxh3_128(&value_bytes).0.to_be_bytes();
239
240		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
241		match self.get(&entry_key)? {
242			Some(v) => {
243				let id = u128::from_be_bytes(v.row[..16].try_into().unwrap());
244				let entry_id = DictionaryEntryId::from_u128(id, dictionary.id_type.clone())?;
245				Ok(Some(entry_id))
246			}
247			None => Ok(None),
248		}
249	}
250}
251
#[cfg(test)]
pub mod tests {
	use reifydb_core::interface::catalog::{dictionary::Dictionary, id::NamespaceId};
	use reifydb_type::value::{
		Value,
		dictionary::{DictionaryEntryId, DictionaryId},
		r#type::Type,
	};

	use super::DictionaryOperations;
	use crate::test_harness::create_test_admin_transaction;

	/// Builds a UTF-8 valued dictionary in the system namespace with the given ID type.
	fn utf8_dictionary(id: DictionaryId, name: &str, id_type: Type) -> Dictionary {
		Dictionary {
			id,
			namespace: NamespaceId::SYSTEM,
			name: name.to_string(),
			value_type: Type::Utf8,
			id_type,
		}
	}

	/// Default fixture: dictionary 1 with `Uint8` entry IDs.
	fn test_dictionary() -> Dictionary {
		utf8_dictionary(DictionaryId(1), "test_dict", Type::Uint8)
	}

	/// Shorthand for a UTF-8 `Value`.
	fn text(s: &str) -> Value {
		Value::Utf8(s.to_string())
	}

	#[test]
	fn test_insert_into_dictionary() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		// The very first entry is assigned ID 1.
		let assigned = txn.insert_into_dictionary(&dict, &text("hello")).unwrap();
		assert_eq!(assigned, DictionaryEntryId::U8(1));
	}

	#[test]
	fn test_insert_duplicate_value() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();
		let hello = text("hello");

		let first = txn.insert_into_dictionary(&dict, &hello).unwrap();
		let second = txn.insert_into_dictionary(&dict, &hello).unwrap();

		// Re-inserting the same value must hand back the original ID.
		assert_eq!(first, second);
		assert_eq!(first, DictionaryEntryId::U8(1));
	}

	#[test]
	fn test_insert_multiple_values() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		let hello_id = txn.insert_into_dictionary(&dict, &text("hello")).unwrap();
		let world_id = txn.insert_into_dictionary(&dict, &text("world")).unwrap();
		let foo_id = txn.insert_into_dictionary(&dict, &text("foo")).unwrap();

		// Distinct values are numbered in insertion order.
		assert_eq!(hello_id, DictionaryEntryId::U8(1));
		assert_eq!(world_id, DictionaryEntryId::U8(2));
		assert_eq!(foo_id, DictionaryEntryId::U8(3));
	}

	#[test]
	fn test_get_from_dictionary() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();
		let hello = text("hello");

		let assigned = txn.insert_into_dictionary(&dict, &hello).unwrap();
		let fetched = txn.get_from_dictionary(&dict, assigned).unwrap();

		assert_eq!(fetched, Some(hello));
	}

	#[test]
	fn test_get_nonexistent_id() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		// Nothing was inserted, so this ID cannot resolve.
		let fetched = txn.get_from_dictionary(&dict, DictionaryEntryId::U8(999)).unwrap();
		assert_eq!(fetched, None);
	}

	#[test]
	fn test_find_in_dictionary() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();
		let hello = text("hello");

		// Insert first, then the lookup must report the same ID.
		let assigned = txn.insert_into_dictionary(&dict, &hello).unwrap();
		let located = txn.find_in_dictionary(&dict, &hello).unwrap();

		assert_eq!(located, Some(assigned));
	}

	#[test]
	fn test_find_nonexistent_value() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		// A value that was never inserted must not be found.
		let located = txn.find_in_dictionary(&dict, &text("not_inserted")).unwrap();
		assert_eq!(located, None);
	}

	#[test]
	fn test_dictionary_with_uint1_id() {
		let dict = utf8_dictionary(DictionaryId(2), "dict_u1", Type::Uint1);
		let mut txn = create_test_admin_transaction();

		let assigned = txn.insert_into_dictionary(&dict, &text("test")).unwrap();
		assert_eq!(assigned, DictionaryEntryId::U1(1));
		assert_eq!(assigned.id_type(), Type::Uint1);
	}

	#[test]
	fn test_dictionary_with_uint2_id() {
		let dict = utf8_dictionary(DictionaryId(3), "dict_u2", Type::Uint2);
		let mut txn = create_test_admin_transaction();

		let assigned = txn.insert_into_dictionary(&dict, &text("test")).unwrap();
		assert_eq!(assigned, DictionaryEntryId::U2(1));
		assert_eq!(assigned.id_type(), Type::Uint2);
	}

	#[test]
	fn test_dictionary_with_uint4_id() {
		let dict = utf8_dictionary(DictionaryId(4), "dict_u4", Type::Uint4);
		let mut txn = create_test_admin_transaction();

		let assigned = txn.insert_into_dictionary(&dict, &text("test")).unwrap();
		assert_eq!(assigned, DictionaryEntryId::U4(1));
		assert_eq!(assigned.id_type(), Type::Uint4);
	}
}