reifydb-engine 0.5.0

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use postcard::{from_bytes, to_stdvec};
use reifydb_core::{
	common::CommitVersion,
	encoded::row::EncodedRow,
	interface::{
		catalog::{dictionary::Dictionary, shape::ShapeId},
		change::{Change, ChangeOrigin, Diff},
	},
	internal_error,
	key::{
		EncodableKey,
		dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionarySequenceKey},
	},
	value::column::columns::Columns,
};
use reifydb_runtime::hash::xxh3_128;
use reifydb_transaction::{
	interceptor::dictionary_row::DictionaryRowInterceptor,
	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
};
use reifydb_type::{
	util::cowvec::CowVec,
	value::{Value, datetime::DateTime, dictionary::DictionaryEntryId},
};
use smallvec::smallvec;

use crate::Result;

/// Dictionary access on top of a transaction: interning values and resolving
/// them to and from their entry ids.
///
/// All methods take `&mut self` because they read (and, for inserts, write)
/// through the underlying transaction.
pub(crate) trait DictionaryOperations {
	/// Interns `value` in `dictionary` and returns its entry id.
	///
	/// The implementations in this file return the id of an already stored
	/// entry when one exists for the value's hash, and otherwise allocate the
	/// next sequential id (starting at 1) typed as `dictionary.id_type`.
	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId>;

	/// Resolves an entry id back to its stored value.
	///
	/// Returns `Ok(None)` when no entry is stored under `id`.
	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>>;

	/// Looks up the entry id of `value` without inserting it.
	///
	/// Returns `Ok(None)` when the value has not been interned.
	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>>;
}

impl DictionaryOperations for CommandTransaction {
	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId> {
		let mut values_buf = [value.clone()];
		DictionaryRowInterceptor::pre_insert(self, dictionary, &mut values_buf)?;
		let [value] = values_buf;

		let value_bytes = to_stdvec(&value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
		let hash = xxh3_128(&value_bytes).0.to_be_bytes();

		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
		if let Some(existing) = self.get(&entry_key)? {
			let id = u128::from_be_bytes(existing.row[..16].try_into().unwrap());
			return DictionaryEntryId::from_u128(id, dictionary.id_type.clone());
		}

		let seq_key = DictionarySequenceKey::encoded(dictionary.id);
		let next_id = match self.get(&seq_key)? {
			Some(v) => u128::from_be_bytes(v.row[..16].try_into().unwrap()) + 1,
			None => 1,
		};

		let entry_id = DictionaryEntryId::from_u128(next_id, dictionary.id_type.clone())?;

		let mut entry_value = Vec::with_capacity(16 + value_bytes.len());
		entry_value.extend_from_slice(&next_id.to_be_bytes());
		entry_value.extend_from_slice(&value_bytes);
		self.set(&entry_key, EncodedRow(CowVec::new(entry_value)))?;

		let index_key = DictionaryEntryIndexKey::encoded(dictionary.id, next_id as u64);
		self.set(&index_key, EncodedRow(CowVec::new(value_bytes)))?;

		self.set(&seq_key, EncodedRow(CowVec::new(next_id.to_be_bytes().to_vec())))?;

		let ids = [entry_id];
		let values = [value.clone()];
		DictionaryRowInterceptor::post_insert(self, dictionary, &ids, &values)?;

		Ok(entry_id)
	}

	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>> {
		let index_key = DictionaryEntryIndexKey::new(dictionary.id, id.to_u128() as u64).encode();
		match self.get(&index_key)? {
			Some(v) => {
				let value: Value = from_bytes(&v.row)
					.map_err(|e| internal_error!("Failed to deserialize value: {}", e))?;
				Ok(Some(value))
			}
			None => Ok(None),
		}
	}

	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>> {
		let value_bytes = to_stdvec(value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
		let hash = xxh3_128(&value_bytes).0.to_be_bytes();

		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
		match self.get(&entry_key)? {
			Some(v) => {
				let id = u128::from_be_bytes(v.row[..16].try_into().unwrap());
				let entry_id = DictionaryEntryId::from_u128(id, dictionary.id_type.clone())?;
				Ok(Some(entry_id))
			}
			None => Ok(None),
		}
	}
}

/// Dictionary operations executed inside an admin transaction.
///
/// Uses the same storage layout as the command-transaction implementation
/// (entry key -> id prefix + serialized value, index key -> serialized value,
/// sequence key -> last allocated id) and additionally records a flow change
/// for every newly inserted entry.
impl DictionaryOperations for AdminTransaction {
	/// Interns `value` and returns its entry id, reusing the existing id when
	/// an entry is already stored under the value's hash.
	///
	/// A flow change is tracked only when a new entry is created; the early
	/// return for an existing entry records nothing.
	///
	/// # Errors
	///
	/// Returns an error when an interceptor fails, the value cannot be
	/// serialized, a stored row is shorter than the 16-byte id prefix, the
	/// sequence overflows, the new id does not fit `dictionary.id_type` or the
	/// `u64` entry index, or the underlying transaction read/write fails.
	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId> {
		// The buffer is handed to the interceptor mutably; whatever it holds
		// afterwards is the value that gets hashed and stored.
		let mut values_buf = [value.clone()];
		DictionaryRowInterceptor::pre_insert(self, dictionary, &mut values_buf)?;
		let [value] = values_buf;

		let value_bytes = to_stdvec(&value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
		let hash = xxh3_128(&value_bytes).0.to_be_bytes();

		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
		if let Some(existing) = self.get(&entry_key)? {
			// Decode the id prefix without indexing so a truncated row
			// surfaces as an error instead of a panic.
			let stored: &[u8] = &existing.row;
			let id = stored
				.get(..16)
				.and_then(|prefix| prefix.try_into().ok())
				.map(u128::from_be_bytes)
				.ok_or_else(|| internal_error!("Dictionary entry row is too short to hold an entry id"))?;
			return DictionaryEntryId::from_u128(id, dictionary.id_type.clone());
		}

		let seq_key = DictionarySequenceKey::encoded(dictionary.id);
		let next_id = match self.get(&seq_key)? {
			Some(v) => {
				let stored: &[u8] = &v.row;
				let last_id = stored
					.get(..16)
					.and_then(|prefix| prefix.try_into().ok())
					.map(u128::from_be_bytes)
					.ok_or_else(|| {
						internal_error!("Dictionary sequence row is too short to hold an entry id")
					})?;
				last_id.checked_add(1).ok_or_else(|| internal_error!("Dictionary sequence overflow"))?
			}
			None => 1,
		};

		let entry_id = DictionaryEntryId::from_u128(next_id, dictionary.id_type.clone())?;

		// The index key only carries a u64; reject ids that would be truncated
		// (and therefore alias another entry) before anything is written.
		let index = u64::try_from(next_id)
			.map_err(|_| internal_error!("Dictionary entry id {} does not fit the u64 entry index", next_id))?;

		let mut entry_value = Vec::with_capacity(16 + value_bytes.len());
		entry_value.extend_from_slice(&next_id.to_be_bytes());
		entry_value.extend_from_slice(&value_bytes);
		self.set(&entry_key, EncodedRow(CowVec::new(entry_value)))?;

		let index_key = DictionaryEntryIndexKey::encoded(dictionary.id, index);
		self.set(&index_key, EncodedRow(CowVec::new(value_bytes)))?;

		self.set(&seq_key, EncodedRow(CowVec::new(next_id.to_be_bytes().to_vec())))?;

		let ids = [entry_id];
		// Cloned because the owned value is moved into the flow change below.
		let values = [value.clone()];
		DictionaryRowInterceptor::post_insert(self, dictionary, &ids, &values)?;

		// NOTE(review): version 0 and the default timestamp look like
		// placeholders filled in later (e.g. at commit) - confirm.
		self.track_flow_change(Change {
			origin: ChangeOrigin::Shape(ShapeId::Dictionary(dictionary.id)),
			version: CommitVersion(0),
			diffs: smallvec![Diff::insert(Columns::single_row([("value", value)]))],
			changed_at: DateTime::default(),
		});

		Ok(entry_id)
	}

	/// Resolves `id` to its stored value, or `Ok(None)` when nothing is stored
	/// under it.
	///
	/// Ids larger than `u64::MAX` cannot be addressed by the index key and are
	/// reported as absent rather than being truncated onto another entry.
	///
	/// # Errors
	///
	/// Returns an error when the transaction read fails or the stored bytes
	/// cannot be deserialized.
	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>> {
		let index = match u64::try_from(id.to_u128()) {
			Ok(index) => index,
			Err(_) => return Ok(None),
		};

		let index_key = DictionaryEntryIndexKey::new(dictionary.id, index).encode();
		match self.get(&index_key)? {
			Some(v) => {
				let value: Value = from_bytes(&v.row)
					.map_err(|e| internal_error!("Failed to deserialize value: {}", e))?;
				Ok(Some(value))
			}
			None => Ok(None),
		}
	}

	/// Looks up the entry id stored for `value` without inserting it.
	///
	/// No interceptor runs here, so the lookup hashes `value` exactly as given.
	///
	/// # Errors
	///
	/// Returns an error when the value cannot be serialized, the transaction
	/// read fails, the stored row is shorter than the 16-byte id prefix, or the
	/// stored id does not fit `dictionary.id_type`.
	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>> {
		let value_bytes = to_stdvec(value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
		let hash = xxh3_128(&value_bytes).0.to_be_bytes();

		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
		match self.get(&entry_key)? {
			Some(v) => {
				let stored: &[u8] = &v.row;
				let id = stored
					.get(..16)
					.and_then(|prefix| prefix.try_into().ok())
					.map(u128::from_be_bytes)
					.ok_or_else(|| {
						internal_error!("Dictionary entry row is too short to hold an entry id")
					})?;
				let entry_id = DictionaryEntryId::from_u128(id, dictionary.id_type.clone())?;
				Ok(Some(entry_id))
			}
			None => Ok(None),
		}
	}
}

/// Dictionary operations on the transaction enum.
///
/// Inserts are forwarded to the wrapped transaction; reads go through the
/// enum's own `get`.
impl DictionaryOperations for Transaction<'_> {
	/// Forwards the insert to the wrapped command or admin transaction (the
	/// test variant forwards to its `inner` transaction).
	///
	/// # Errors
	///
	/// Query and replica transactions reject inserts with an internal error;
	/// otherwise any error of the wrapped implementation is returned.
	fn insert_into_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<DictionaryEntryId> {
		match self {
			Transaction::Command(cmd) => cmd.insert_into_dictionary(dictionary, value),
			Transaction::Admin(admin) => admin.insert_into_dictionary(dictionary, value),
			Transaction::Test(t) => t.inner.insert_into_dictionary(dictionary, value),
			Transaction::Query(_) => {
				Err(internal_error!("Cannot insert into dictionary during a query transaction"))
			}
			Transaction::Replica(_) => {
				Err(internal_error!("Cannot insert into dictionary during a replica transaction"))
			}
		}
	}

	/// Resolves `id` to its stored value, or `Ok(None)` when nothing is stored
	/// under it.
	///
	/// Ids larger than `u64::MAX` cannot be addressed by the index key and are
	/// reported as absent rather than being truncated onto another entry.
	///
	/// # Errors
	///
	/// Returns an error when the transaction read fails or the stored bytes
	/// cannot be deserialized.
	fn get_from_dictionary(&mut self, dictionary: &Dictionary, id: DictionaryEntryId) -> Result<Option<Value>> {
		let index = match u64::try_from(id.to_u128()) {
			Ok(index) => index,
			Err(_) => return Ok(None),
		};

		let index_key = DictionaryEntryIndexKey::encoded(dictionary.id, index);
		match self.get(&index_key)? {
			Some(v) => {
				let value: Value = from_bytes(&v.row)
					.map_err(|e| internal_error!("Failed to deserialize value: {}", e))?;
				Ok(Some(value))
			}
			None => Ok(None),
		}
	}

	/// Looks up the entry id stored for `value` without inserting it.
	///
	/// # Errors
	///
	/// Returns an error when the value cannot be serialized, the transaction
	/// read fails, the stored row is shorter than the 16-byte id prefix, or the
	/// stored id does not fit `dictionary.id_type`.
	fn find_in_dictionary(&mut self, dictionary: &Dictionary, value: &Value) -> Result<Option<DictionaryEntryId>> {
		let value_bytes = to_stdvec(value).map_err(|e| internal_error!("Failed to serialize value: {}", e))?;
		let hash = xxh3_128(&value_bytes).0.to_be_bytes();

		let entry_key = DictionaryEntryKey::encoded(dictionary.id, hash);
		match self.get(&entry_key)? {
			Some(v) => {
				// Decode the id prefix without indexing so a truncated row
				// surfaces as an error instead of a panic.
				let stored: &[u8] = &v.row;
				let id = stored
					.get(..16)
					.and_then(|prefix| prefix.try_into().ok())
					.map(u128::from_be_bytes)
					.ok_or_else(|| {
						internal_error!("Dictionary entry row is too short to hold an entry id")
					})?;
				let entry_id = DictionaryEntryId::from_u128(id, dictionary.id_type.clone())?;
				Ok(Some(entry_id))
			}
			None => Ok(None),
		}
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::interface::catalog::{dictionary::Dictionary, id::NamespaceId};
	use reifydb_type::value::{
		Value,
		dictionary::{DictionaryEntryId, DictionaryId},
		r#type::Type,
	};

	use super::DictionaryOperations;
	use crate::test_harness::create_test_admin_transaction;

	/// Builds a UTF-8 valued dictionary in the system namespace.
	fn utf8_dictionary(id: DictionaryId, name: &str, id_type: Type) -> Dictionary {
		Dictionary {
			id,
			namespace: NamespaceId::SYSTEM,
			name: name.to_string(),
			value_type: Type::Utf8,
			id_type,
		}
	}

	/// The default fixture: dictionary 1 with `Uint8` entry ids.
	fn test_dictionary() -> Dictionary {
		utf8_dictionary(DictionaryId(1), "test_dict", Type::Uint8)
	}

	/// Shorthand for an owned UTF-8 value.
	fn text(s: &str) -> Value {
		Value::Utf8(s.to_string())
	}

	#[test]
	fn test_insert_into_dictionary() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		// The very first entry of a dictionary is assigned id 1.
		let id = txn.insert_into_dictionary(&dict, &text("hello")).unwrap();
		assert_eq!(id, DictionaryEntryId::U8(1));
	}

	#[test]
	fn test_insert_duplicate_value() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();
		let value = text("hello");

		let first = txn.insert_into_dictionary(&dict, &value).unwrap();
		let second = txn.insert_into_dictionary(&dict, &value).unwrap();

		// Re-inserting a value must hand back the id it already has.
		assert_eq!(first, second);
		assert_eq!(first, DictionaryEntryId::U8(1));
	}

	#[test]
	fn test_insert_multiple_values() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		let hello = txn.insert_into_dictionary(&dict, &text("hello")).unwrap();
		let world = txn.insert_into_dictionary(&dict, &text("world")).unwrap();
		let foo = txn.insert_into_dictionary(&dict, &text("foo")).unwrap();

		// Distinct values are numbered in insertion order.
		assert_eq!(hello, DictionaryEntryId::U8(1));
		assert_eq!(world, DictionaryEntryId::U8(2));
		assert_eq!(foo, DictionaryEntryId::U8(3));
	}

	#[test]
	fn test_get_from_dictionary() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();
		let value = text("hello");

		let id = txn.insert_into_dictionary(&dict, &value).unwrap();

		// Round trip: the id resolves back to the inserted value.
		let retrieved = txn.get_from_dictionary(&dict, id).unwrap();
		assert_eq!(retrieved, Some(value));
	}

	#[test]
	fn test_get_nonexistent_id() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		// Nothing was inserted, so no id can resolve.
		let missing = DictionaryEntryId::U8(999);
		let retrieved = txn.get_from_dictionary(&dict, missing).unwrap();
		assert_eq!(retrieved, None);
	}

	#[test]
	fn test_find_in_dictionary() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();
		let value = text("hello");

		// Insert first, then the lookup must report the same id.
		let id = txn.insert_into_dictionary(&dict, &value).unwrap();
		let found = txn.find_in_dictionary(&dict, &value).unwrap();
		assert_eq!(found, Some(id));
	}

	#[test]
	fn test_find_nonexistent_value() {
		let dict = test_dictionary();
		let mut txn = create_test_admin_transaction();

		// A lookup on a value that was never inserted yields nothing.
		let found = txn.find_in_dictionary(&dict, &text("not_inserted")).unwrap();
		assert_eq!(found, None);
	}

	#[test]
	fn test_dictionary_with_uint1_id() {
		let dict = utf8_dictionary(DictionaryId(2), "dict_u1", Type::Uint1);
		let mut txn = create_test_admin_transaction();

		let id = txn.insert_into_dictionary(&dict, &text("test")).unwrap();
		assert_eq!(id, DictionaryEntryId::U1(1));
		assert_eq!(id.id_type(), Type::Uint1);
	}

	#[test]
	fn test_dictionary_with_uint2_id() {
		let dict = utf8_dictionary(DictionaryId(3), "dict_u2", Type::Uint2);
		let mut txn = create_test_admin_transaction();

		let id = txn.insert_into_dictionary(&dict, &text("test")).unwrap();
		assert_eq!(id, DictionaryEntryId::U2(1));
		assert_eq!(id.id_type(), Type::Uint2);
	}

	#[test]
	fn test_dictionary_with_uint4_id() {
		let dict = utf8_dictionary(DictionaryId(4), "dict_u4", Type::Uint4);
		let mut txn = create_test_admin_transaction();

		let id = txn.insert_into_dictionary(&dict, &text("test")).unwrap();
		assert_eq!(id, DictionaryEntryId::U4(1));
		assert_eq!(id.id_type(), Type::Uint4);
	}
}