reifydb-rql 0.4.12

ReifyDB Query Language (RQL) parser and AST
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_catalog::error::{CatalogError, CatalogObjectKind};
use reifydb_core::{error::diagnostic::catalog::namespace_not_found, interface::resolved::ResolvedNamespace};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{fragment::Fragment, return_error};

use crate::{
	Result,
	nodes::CreateSeriesNode,
	plan::{
		logical,
		physical::{Compiler, PhysicalPlan},
	},
};

impl<'bump> Compiler<'bump> {
	pub(crate) fn compile_create_series(
		&mut self,
		rx: &mut Transaction<'_>,
		create: logical::CreateSeriesNode<'_>,
	) -> Result<PhysicalPlan<'bump>> {
		let ns_segments: Vec<&str> = create.series.namespace.iter().map(|n| n.text()).collect();
		let Some(namespace) = self.catalog.find_namespace_by_segments(rx, &ns_segments)? else {
			let ns_fragment = if let Some(n) = create.series.namespace.first() {
				let interned = self.interner.intern_fragment(n);
				interned.with_text(ns_segments.join("::"))
			} else {
				Fragment::internal("default".to_string())
			};
			return_error!(namespace_not_found(ns_fragment, &ns_segments.join("::")));
		};

		let namespace_id = if let Some(n) = create.series.namespace.first() {
			let interned = self.interner.intern_fragment(n);
			interned.with_text(namespace.name())
		} else {
			Fragment::internal(namespace.name().to_string())
		};
		let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);

		// Resolve optional tag type
		let tag = if let Some(tag_ident) = create.tag {
			let tag_ns_segments: Vec<&str> = if tag_ident.namespace.is_empty() {
				ns_segments.clone()
			} else {
				tag_ident.namespace.iter().map(|n| n.text()).collect()
			};
			let Some(tag_ns) = self.catalog.find_namespace_by_segments(rx, &tag_ns_segments)? else {
				let ns_fragment = if let Some(n) = tag_ident.namespace.first() {
					let interned = self.interner.intern_fragment(n);
					interned.with_text(tag_ns_segments.join("::"))
				} else {
					Fragment::internal(tag_ns_segments.join("::"))
				};
				return Err(CatalogError::NotFound {
					kind: CatalogObjectKind::Tag,
					namespace: tag_ns_segments.join("::"),
					name: tag_ident.name.text().to_string(),
					fragment: ns_fragment,
				}
				.into());
			};

			let tag_name = tag_ident.name.text();
			let Some(sumtype) = self.catalog.find_sumtype_by_name(rx, tag_ns.id(), tag_name)? else {
				return Err(CatalogError::NotFound {
					kind: CatalogObjectKind::Tag,
					namespace: tag_ns_segments.join("::"),
					name: tag_name.to_string(),
					fragment: self.interner.intern_fragment(&tag_ident.name),
				}
				.into());
			};

			Some(sumtype.id)
		} else {
			None
		};

		Ok(PhysicalPlan::CreateSeries(CreateSeriesNode {
			namespace: resolved_namespace,
			series: self.interner.intern_fragment(&create.series.name),
			columns: create.columns,
			tag,
			key: create.key,
			ttl: create.ttl,
		}))
	}
}