reifydb-engine 0.4.12

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_catalog::catalog::namespace::NamespaceToCreate;
use reifydb_core::{
	interface::catalog::{change::CatalogTrackNamespaceChangeOperations, id::NamespaceId},
	value::column::columns::Columns,
};
use reifydb_rql::nodes::CreateNamespaceNode;
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::value::Value;

use crate::{Result, vm::services::Services};

pub(crate) fn create_namespace(
	services: &Services,
	txn: &mut AdminTransaction,
	plan: CreateNamespaceNode,
) -> Result<Columns> {
	let full_name: String = plan.segments.iter().map(|s| s.text()).collect::<Vec<_>>().join("::");

	// Auto-create parent namespaces (mkdir -p semantics)
	let mut parent_id = NamespaceId::ROOT;
	for i in 0..plan.segments.len().saturating_sub(1) {
		let prefix: String = plan.segments[..=i].iter().map(|s| s.text()).collect::<Vec<_>>().join("::");
		if let Some(existing) =
			services.catalog.find_namespace_by_name(&mut Transaction::Admin(txn), &prefix)?
		{
			parent_id = existing.id();
		} else {
			let result = services.catalog.create_namespace(
				txn,
				NamespaceToCreate {
					namespace_fragment: Some(plan.segments[i].clone()),
					name: prefix,
					local_name: plan.segments[i].text().to_string(),
					parent_id,
					grpc: None,
					token: None,
				},
			)?;
			txn.track_namespace_created(result.clone())?;
			parent_id = result.id();
		}
	}

	// Create the final (leaf) namespace
	if let Some(existing) = services.catalog.find_namespace_by_name(&mut Transaction::Admin(txn), &full_name)?
		&& plan.if_not_exists
	{
		return Ok(Columns::single_row([
			("id", Value::Uint8(existing.id().0)),
			("namespace", Value::Utf8(full_name)),
			("created", Value::Boolean(false)),
		]));
	}

	let result = services.catalog.create_namespace(
		txn,
		NamespaceToCreate {
			namespace_fragment: plan.segments.last().cloned(),
			name: full_name,
			local_name: plan.segments.last().unwrap().text().to_string(),
			parent_id,
			grpc: None,
			token: None,
		},
	)?;
	txn.track_namespace_created(result.clone())?;

	Ok(Columns::single_row([
		("id", Value::Uint8(result.id().0)),
		("namespace", Value::Utf8(result.name().to_string())),
		("created", Value::Boolean(true)),
	]))
}

#[cfg(test)]
pub mod tests {
	use reifydb_type::{params::Params, value::Value};

	use crate::{
		test_harness::create_test_admin_transaction,
		vm::{Admin, executor::Executor},
	};

	#[test]
	fn test_create_namespace() {
		let instance = Executor::testing();
		let mut txn = create_test_admin_transaction();

		// First creation should succeed
		let r = instance.admin(
			&mut txn,
			Admin {
				rql: "CREATE NAMESPACE my_shape",
				params: Params::default(),
			},
		);
		if let Some(e) = r.error {
			panic!("{e:?}");
		}
		let frame = &r[0];

		assert_eq!(frame[0].get_value(0), Value::Uint8(16385));
		assert_eq!(frame[1].get_value(0), Value::Utf8("my_shape".to_string()));
		assert_eq!(frame[2].get_value(0), Value::Boolean(true));

		// Creating the same namespace again with `IF NOT EXISTS`
		// should not error and return the same id
		let r = instance.admin(
			&mut txn,
			Admin {
				rql: "CREATE NAMESPACE IF NOT EXISTS my_shape",
				params: Params::default(),
			},
		);
		if let Some(e) = r.error {
			panic!("{e:?}");
		}
		let frame = &r[0];
		assert_eq!(frame[0].get_value(0), Value::Uint8(16385));
		assert_eq!(frame[1].get_value(0), Value::Utf8("my_shape".to_string()));
		assert_eq!(frame[2].get_value(0), Value::Boolean(false));

		// Creating the same namespace again without `IF NOT EXISTS`
		// should return error
		let r = instance.admin(
			&mut txn,
			Admin {
				rql: "CREATE NAMESPACE my_shape",
				params: Params::default(),
			},
		);
		assert!(r.is_err());
		assert_eq!(r.error.unwrap().diagnostic().code, "CA_001");
	}
}