use reifydb_core::{
interface::catalog::{change::CatalogTrackNamespaceChangeOperations, id::NamespaceId, namespace::Namespace},
internal,
};
use reifydb_transaction::{
change::TransactionalNamespaceChanges,
transaction::{Transaction, admin::AdminTransaction},
};
use reifydb_type::{error, fragment::Fragment};
use tracing::{instrument, warn};
use crate::{
CatalogStore, Result,
catalog::Catalog,
error::{CatalogError, CatalogObjectKind},
store::namespace::create::NamespaceToCreate as StoreNamespaceToCreate,
};
/// Catalog-level description of a namespace that is about to be created.
///
/// Passed to `Catalog::create_namespace` / `Catalog::create_namespace_with_id`,
/// which convert it field-for-field into the store-level `NamespaceToCreate`
/// before handing it to `CatalogStore`.
#[derive(Debug, Clone)]
pub struct NamespaceToCreate {
/// Optional source fragment associated with the namespace.
/// NOTE(review): presumably used for diagnostics - confirm against the store layer.
pub namespace_fragment: Option<Fragment>,
/// Name of the namespace.
/// NOTE(review): presumably the fully qualified name, as opposed to `local_name` - confirm.
pub name: String,
/// Local name of the namespace; child lookups in this file match a single
/// path segment against a namespace's local name together with its parent ID.
pub local_name: String,
/// ID of the parent namespace.
pub parent_id: NamespaceId,
/// Optional gRPC setting for the namespace.
/// NOTE(review): looks like a remote endpoint address - confirm.
pub grpc: Option<String>,
/// Optional token for the namespace.
/// NOTE(review): presumably a credential paired with `grpc` - confirm.
pub token: Option<String>,
}
impl From<NamespaceToCreate> for StoreNamespaceToCreate {
fn from(to_create: NamespaceToCreate) -> Self {
StoreNamespaceToCreate {
namespace_fragment: to_create.namespace_fragment,
name: to_create.name,
local_name: to_create.local_name,
parent_id: to_create.parent_id,
grpc: to_create.grpc,
token: to_create.token,
}
}
}
impl Catalog {
/// Looks up a namespace by ID, returning `Ok(None)` when it cannot be found.
///
/// Sources are consulted in order and the first hit wins:
///
/// - `Command` / `Query` / `Replica`: the materialized catalog at the
///   transaction's version, then `CatalogStore`.
/// - `Admin`: the transaction's pending namespace changes, then its pending
///   deletions (which short-circuit to `None`), then the materialized catalog
///   at the transaction's version, then `CatalogStore`.
/// - `Test`: the inner transaction's pending changes and deletions, then
///   `CatalogStore`; the materialized catalog is not consulted.
///
/// Outside of `Test`, a namespace that is only found in `CatalogStore` is
/// logged at warn level because the materialized catalog missed it.
///
/// # Errors
///
/// Propagates any error returned by `CatalogStore::find_namespace`.
#[instrument(name = "catalog::namespace::find", level = "trace", skip(self, txn))]
pub fn find_namespace(&self, txn: &mut Transaction<'_>, id: NamespaceId) -> Result<Option<Namespace>> {
    match txn.reborrow() {
        Transaction::Command(cmd) => {
            let cached = self.materialized.find_namespace_at(id, cmd.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_namespace(&mut Transaction::Command(&mut *cmd), id)?;
            if stored.is_some() {
                warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
        Transaction::Admin(admin) => {
            // Uncommitted changes made by this transaction take precedence.
            if let Some(pending) = TransactionalNamespaceChanges::find_namespace(admin, id) {
                return Ok(Some(pending.clone()));
            }
            // A namespace dropped in this transaction must not resurface
            // from the sources consulted below.
            if TransactionalNamespaceChanges::is_namespace_deleted(admin, id) {
                return Ok(None);
            }

            let cached = self.materialized.find_namespace_at(id, admin.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_namespace(&mut Transaction::Admin(&mut *admin), id)?;
            if stored.is_some() {
                warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
        Transaction::Query(qry) => {
            let cached = self.materialized.find_namespace_at(id, qry.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_namespace(&mut Transaction::Query(&mut *qry), id)?;
            if stored.is_some() {
                warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
        Transaction::Test(mut t) => {
            if let Some(pending) = TransactionalNamespaceChanges::find_namespace(t.inner, id) {
                return Ok(Some(pending.clone()));
            }
            if TransactionalNamespaceChanges::is_namespace_deleted(t.inner, id) {
                return Ok(None);
            }

            // No materialized-catalog lookup and no warning on this path.
            let stored = CatalogStore::find_namespace(&mut Transaction::Test(Box::new(t.reborrow())), id)?;
            Ok(stored)
        }
        Transaction::Replica(rep) => {
            let cached = self.materialized.find_namespace_at(id, rep.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_namespace(&mut Transaction::Replica(&mut *rep), id)?;
            if stored.is_some() {
                warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
    }
}
#[instrument(name = "catalog::namespace::find_by_name", level = "trace", skip(self, txn, name))]
pub fn find_namespace_by_name(&self, txn: &mut Transaction<'_>, name: &str) -> Result<Option<Namespace>> {
match txn.reborrow() {
Transaction::Command(cmd) => {
if let Some(namespace) =
self.materialized.find_namespace_by_name_at(name, cmd.version())
{
return Ok(Some(namespace));
}
if let Some(namespace) = CatalogStore::find_namespace_by_name(
&mut Transaction::Command(&mut *cmd),
name,
)? {
warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(namespace));
}
Ok(None)
}
Transaction::Admin(admin) => {
if let Some(namespace) =
TransactionalNamespaceChanges::find_namespace_by_name(admin, name)
{
return Ok(Some(namespace.clone()));
}
if TransactionalNamespaceChanges::is_namespace_deleted_by_name(admin, name) {
return Ok(None);
}
if let Some(namespace) =
self.materialized.find_namespace_by_name_at(name, admin.version())
{
return Ok(Some(namespace));
}
if let Some(namespace) = CatalogStore::find_namespace_by_name(
&mut Transaction::Admin(&mut *admin),
name,
)? {
warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(namespace));
}
Ok(None)
}
Transaction::Query(qry) => {
if let Some(namespace) =
self.materialized.find_namespace_by_name_at(name, qry.version())
{
return Ok(Some(namespace));
}
if let Some(namespace) =
CatalogStore::find_namespace_by_name(&mut Transaction::Query(&mut *qry), name)?
{
warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(namespace));
}
Ok(None)
}
Transaction::Test(mut t) => {
if let Some(ns) = TransactionalNamespaceChanges::find_namespace_by_name(t.inner, name) {
return Ok(Some(ns.clone()));
}
if TransactionalNamespaceChanges::is_namespace_deleted_by_name(t.inner, name) {
return Ok(None);
}
if let Some(ns) = CatalogStore::find_namespace_by_name(
&mut Transaction::Test(Box::new(t.reborrow())),
name,
)? {
return Ok(Some(ns));
}
Ok(None)
}
Transaction::Replica(rep) => {
if let Some(namespace) =
self.materialized.find_namespace_by_name_at(name, rep.version())
{
return Ok(Some(namespace));
}
if let Some(namespace) = CatalogStore::find_namespace_by_name(
&mut Transaction::Replica(&mut *rep),
name,
)? {
warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(namespace));
}
Ok(None)
}
}
}
fn find_child_namespace(
&self,
txn: &mut Transaction<'_>,
parent_id: NamespaceId,
name: &str,
) -> Result<Option<Namespace>> {
match txn.reborrow() {
Transaction::Command(cmd) => {
if let Some(ns) =
self.materialized.find_child_namespace_at(parent_id, name, cmd.version())
{
return Ok(Some(ns));
}
let all = CatalogStore::list_namespaces_all(&mut Transaction::Command(&mut *cmd))?;
Ok(all.into_iter().find(|ns| ns.local_name() == name && ns.parent_id() == parent_id))
}
Transaction::Admin(admin) => {
if let Some(ns) = admin.changes.namespace.iter().rev().find_map(|change| {
change.post
.as_ref()
.filter(|ns| ns.local_name() == name && ns.parent_id() == parent_id)
}) {
return Ok(Some(ns.clone()));
}
if let Some(ns) =
self.materialized.find_child_namespace_at(parent_id, name, admin.version())
{
return Ok(Some(ns));
}
let all = CatalogStore::list_namespaces_all(&mut Transaction::Admin(&mut *admin))?;
Ok(all.into_iter().find(|ns| ns.local_name() == name && ns.parent_id() == parent_id))
}
Transaction::Query(qry) => {
if let Some(ns) =
self.materialized.find_child_namespace_at(parent_id, name, qry.version())
{
return Ok(Some(ns));
}
let all = CatalogStore::list_namespaces_all(&mut Transaction::Query(&mut *qry))?;
Ok(all.into_iter().find(|ns| ns.local_name() == name && ns.parent_id() == parent_id))
}
Transaction::Test(t) => {
if let Some(ns) = t.inner.changes.namespace.iter().rev().find_map(|change| {
change.post
.as_ref()
.filter(|ns| ns.local_name() == name && ns.parent_id() == parent_id)
}) {
return Ok(Some(ns.clone()));
}
if let Some(ns) =
self.materialized.find_child_namespace_at(parent_id, name, t.inner.version())
{
return Ok(Some(ns));
}
let all = CatalogStore::list_namespaces_all(&mut Transaction::Admin(&mut *t.inner))?;
Ok(all.into_iter().find(|ns| ns.local_name() == name && ns.parent_id() == parent_id))
}
Transaction::Replica(rep) => {
if let Some(ns) =
self.materialized.find_child_namespace_at(parent_id, name, rep.version())
{
return Ok(Some(ns));
}
let all = CatalogStore::list_namespaces_all(&mut Transaction::Replica(&mut *rep))?;
Ok(all.into_iter().find(|ns| ns.local_name() == name && ns.parent_id() == parent_id))
}
}
}
/// Resolves a namespace from an ordered list of path segments.
///
/// An empty slice resolves to the namespace named `"default"`. Otherwise the
/// first segment is looked up by name and each following segment is resolved
/// as a direct child (by local name) of the namespace found so far.
///
/// Returns `Ok(None)` as soon as any segment fails to resolve; no further
/// lookups are made after that point.
///
/// # Errors
///
/// Propagates any error from the underlying lookups.
pub fn find_namespace_by_segments(
    &self,
    txn: &mut Transaction<'_>,
    segments: &[&str],
) -> Result<Option<Namespace>> {
    let (root, descendants) = match segments.split_first() {
        Some((first, rest)) => (*first, rest),
        None => return self.find_namespace_by_name(txn, "default"),
    };

    let mut resolved = self.find_namespace_by_name(txn, root)?;
    for &segment in descendants {
        resolved = match resolved {
            Some(parent) => self.find_child_namespace(txn, parent.id(), segment)?,
            // The previous step found nothing, so the rest of the path cannot resolve.
            None => return Ok(None),
        };
    }
    Ok(resolved)
}
/// Resolves a namespace from a `::`-separated path such as `a::b::c`.
///
/// The path is split on `"::"` and handed to `find_namespace_by_segments`.
/// Splitting an empty string yields a single empty segment rather than no
/// segments, so an empty `path` is looked up as a namespace named `""` and
/// does not take the `"default"` fallback used for an empty segment list.
///
/// # Errors
///
/// Propagates any error from the underlying lookups.
pub fn find_namespace_by_path(&self, txn: &mut Transaction<'_>, path: &str) -> Result<Option<Namespace>> {
    let parts = path.split("::").collect::<Vec<&str>>();
    self.find_namespace_by_segments(txn, parts.as_slice())
}
#[instrument(name = "catalog::namespace::get", level = "trace", skip(self, txn))]
pub fn get_namespace(&self, txn: &mut Transaction<'_>, id: NamespaceId) -> Result<Namespace> {
self.find_namespace(txn, id)?.ok_or_else(|| {
error!(internal!(
"Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
id
))
})
}
/// Returns the namespace with the given name, treating absence as an error.
///
/// `name` is converted into a `Fragment`; its text is used for the lookup and
/// the fragment itself is attached to the error when nothing is found.
///
/// # Errors
///
/// Returns `CatalogError::NotFound` with kind `CatalogObjectKind::Namespace`
/// (both the `namespace` and `name` fields carry the fragment text) when the
/// lookup yields `None`, and propagates any error raised by the lookup itself.
#[instrument(name = "catalog::namespace::get_by_name", level = "trace", skip(self, txn, name))]
pub fn get_namespace_by_name(
    &self,
    txn: &mut Transaction<'_>,
    name: impl Into<Fragment> + Send,
) -> Result<Namespace> {
    let fragment: Fragment = name.into();
    match self.find_namespace_by_name(txn, fragment.text())? {
        Some(namespace) => Ok(namespace),
        None => {
            let missing = fragment.text().to_string();
            Err(CatalogError::NotFound {
                kind: CatalogObjectKind::Namespace,
                namespace: missing.clone(),
                name: missing,
                fragment,
            }
            .into())
        }
    }
}
#[instrument(name = "catalog::namespace::create", level = "debug", skip(self, txn, to_create))]
pub fn create_namespace(&self, txn: &mut AdminTransaction, to_create: NamespaceToCreate) -> Result<Namespace> {
let namespace = CatalogStore::create_namespace(txn, to_create.into())?;
txn.track_namespace_created(namespace.clone())?;
Ok(namespace)
}
pub fn create_namespace_with_id(
&self,
txn: &mut AdminTransaction,
namespace_id: NamespaceId,
to_create: NamespaceToCreate,
) -> Result<Namespace> {
let namespace = CatalogStore::create_namespace_with_id(txn, namespace_id, to_create.into())?;
txn.track_namespace_created(namespace.clone())?;
Ok(namespace)
}
/// Drops a namespace from `CatalogStore` and records the deletion on the
/// admin transaction.
///
/// # Errors
///
/// Propagates any error from `CatalogStore::drop_namespace` or from tracking
/// the change on the transaction.
#[instrument(name = "catalog::namespace::drop", level = "debug", skip(self, txn))]
pub fn drop_namespace(&self, txn: &mut AdminTransaction, namespace: Namespace) -> Result<()> {
    let dropped_id = namespace.id();
    CatalogStore::drop_namespace(txn, dropped_id)?;
    // Tracking takes ownership of the namespace that was just dropped.
    txn.track_namespace_deleted(namespace)?;
    Ok(())
}
/// Lists every namespace by delegating directly to
/// `CatalogStore::list_namespaces_all`.
///
/// # Errors
///
/// Propagates any error returned by the store.
#[instrument(name = "catalog::namespace::list_all", level = "debug", skip(self, txn))]
pub fn list_namespaces_all(&self, txn: &mut Transaction<'_>) -> Result<Vec<Namespace>> {
    let namespaces = CatalogStore::list_namespaces_all(txn)?;
    Ok(namespaces)
}
/// Updates the `grpc` value of a namespace in `CatalogStore`, re-reads the
/// namespace and records the result on the admin transaction.
///
/// # Errors
///
/// Propagates any error from the store update, the re-read, or from tracking
/// the change on the transaction.
#[instrument(name = "catalog::namespace::update_grpc", level = "debug", skip(self, txn))]
pub fn update_namespace_grpc(
    &self,
    txn: &mut AdminTransaction,
    namespace_id: NamespaceId,
    grpc: Option<String>,
) -> Result<()> {
    CatalogStore::update_namespace_grpc(txn, namespace_id, grpc)?;

    // Read the namespace back so the tracked value reflects what the store now holds.
    let refreshed = {
        let mut view = Transaction::Admin(&mut *txn);
        CatalogStore::get_namespace(&mut view, namespace_id)?
    };

    // NOTE(review): the update is recorded through `track_namespace_created`;
    // confirm this is intended rather than a dedicated update-tracking call.
    txn.track_namespace_created(refreshed)?;
    Ok(())
}
}