use std::collections::HashSet;
use reifydb_catalog::error::{CatalogError, CatalogObjectKind};
use reifydb_core::value::column::columns::Columns;
use reifydb_rql::{flow::node::FlowNodeType, nodes::DropNamespaceNode};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::value::{Value, constraint::Constraint};
use super::dependent::{find_column_dependents, find_flow_dependents};
use crate::{Result, vm::services::Services};
/// Executes a `DropNamespaceNode` plan against the catalog.
///
/// The result is always a single row with two columns: `namespace` (the name
/// taken from the plan) and `dropped` (whether the namespace was removed).
///
/// Steps:
/// 1. If the plan carries no `namespace_id`, nothing is touched and
///    `dropped = false` is reported (presumably the "namespace did not
///    resolve" case — confirm against the planner).
/// 2. The target namespace and every namespace transitively reachable through
///    `parent_id` are gathered into one set.
/// 3. Columns that live outside that set but reference a dictionary or sum
///    type listed for a namespace inside it are collected as dependents.
/// 4. Flow nodes belonging to flows outside that set which read a table, view
///    or ring buffer inside it (or sink into a view inside it) are collected
///    as dependents.
/// 5. If any dependent was found, `CatalogError::InUse` is returned with the
///    dependents joined by `", "`; otherwise the namespace is dropped through
///    the catalog and `dropped = true` is reported.
///
/// # Errors
///
/// Propagates every catalog lookup failure, and returns
/// `CatalogError::InUse` when objects outside the namespace subtree still
/// depend on it.
pub(crate) fn drop_namespace(
    services: &Services,
    txn: &mut AdminTransaction,
    plan: DropNamespaceNode,
) -> Result<Columns> {
    // Both outcomes report the same two-column row; only the flag differs.
    let outcome = |dropped: bool| {
        Columns::single_row([
            ("namespace", Value::Utf8(plan.namespace_name.text().to_string())),
            ("dropped", Value::Boolean(dropped)),
        ])
    };

    let namespace_id = match plan.namespace_id {
        Some(id) => id,
        None => return Ok(outcome(false)),
    };

    let namespace_def = services.catalog.get_namespace(&mut Transaction::Admin(txn), namespace_id)?;
    let namespaces = services.catalog.list_namespaces_all(&mut Transaction::Admin(txn))?;

    // Grow the subtree to a fixed point: keep sweeping the namespace list
    // until a full pass adds nothing new.
    let mut subtree = HashSet::from([namespace_id]);
    let mut grew = true;
    while grew {
        grew = false;
        for ns in &namespaces {
            // `insert` yields `true` only when the id was not yet present.
            if subtree.contains(&ns.parent_id()) && subtree.insert(ns.id()) {
                grew = true;
            }
        }
    }

    // Dictionaries and sum types listed for any namespace in the subtree.
    let mut owned_dictionaries = Vec::new();
    let mut owned_sumtypes = Vec::new();
    for ns_id in subtree.iter().copied() {
        let dicts = services.catalog.list_dictionaries(&mut Transaction::Admin(txn), ns_id)?;
        owned_dictionaries.extend(dicts);
        let sums = services.catalog.list_sumtypes(&mut Transaction::Admin(txn), ns_id)?;
        owned_sumtypes.extend(sums);
    }
    let dictionary_ids: HashSet<_> = owned_dictionaries.iter().map(|d| d.id).collect();
    let sumtype_ids: HashSet<_> = owned_sumtypes.iter().map(|s| s.id).collect();

    let mut blockers = Vec::new();

    let owns_referenced_types = !(dictionary_ids.is_empty() && sumtype_ids.is_empty());
    if owns_referenced_types {
        let columns = services.catalog.list_columns_all(&mut Transaction::Admin(txn))?;

        // Columns outside the subtree that point at one of our dictionaries.
        let via_dictionary = find_column_dependents(&services.catalog, txn, &columns, |info| {
            // Columns inside the subtree go away together with it.
            if subtree.contains(&info.namespace) {
                return None;
            }
            match info.column.dictionary_id {
                Some(dict_id) if dictionary_ids.contains(&dict_id) => {
                    let name = owned_dictionaries
                        .iter()
                        .find(|d| d.id == dict_id)
                        .map_or("?", |d| d.name.as_str());
                    Some(format!(" references dictionary `{}`", name))
                }
                _ => None,
            }
        })?;
        blockers.extend(via_dictionary);

        // Columns outside the subtree constrained by one of our sum types.
        let via_enum = find_column_dependents(&services.catalog, txn, &columns, |info| {
            if subtree.contains(&info.namespace) {
                return None;
            }
            match info.column.constraint.constraint() {
                Some(Constraint::SumType(id)) if sumtype_ids.contains(id) => {
                    let name = owned_sumtypes
                        .iter()
                        .find(|s| s.id == *id)
                        .map_or("?", |s| s.name.as_str());
                    Some(format!(" references enum `{}`", name))
                }
                _ => None,
            }
        })?;
        blockers.extend(via_enum);
    }

    let tables = services.catalog.list_tables_all(&mut Transaction::Admin(txn))?;
    let views = services.catalog.list_views_all(&mut Transaction::Admin(txn))?;
    let ringbuffers = services.catalog.list_ringbuffers_all(&mut Transaction::Admin(txn))?;

    // Ids of the flow-addressable objects that live inside the subtree.
    let table_ids: HashSet<_> = tables
        .iter()
        .filter_map(|t| subtree.contains(&t.namespace).then_some(t.id))
        .collect();
    let view_ids: HashSet<_> = views
        .iter()
        .filter_map(|v| subtree.contains(&v.namespace()).then(|| v.id()))
        .collect();
    let ringbuffer_ids: HashSet<_> = ringbuffers
        .iter()
        .filter_map(|r| subtree.contains(&r.namespace).then_some(r.id))
        .collect();

    let has_flow_targets = !(table_ids.is_empty() && view_ids.is_empty() && ringbuffer_ids.is_empty());
    if has_flow_targets {
        let nodes = services.catalog.list_flow_nodes_all(&mut Transaction::Admin(txn))?;
        let flows = services.catalog.list_flows_all(&mut Transaction::Admin(txn))?;

        // Keep only nodes whose owning flow is known and lives outside the
        // subtree; nodes without a matching flow are ignored.
        let external_nodes: Vec<_> = nodes
            .iter()
            .filter(|node| {
                flows
                    .iter()
                    .find(|flow| flow.id == node.flow)
                    .is_some_and(|flow| !subtree.contains(&flow.namespace))
            })
            .cloned()
            .collect();

        // External flows reading one of our tables.
        let table_users =
            find_flow_dependents(&services.catalog, txn, &external_nodes, &flows, |node_type| match node_type {
                FlowNodeType::SourceTable { table } => table_ids.contains(table),
                _ => false,
            })?;
        blockers.extend(table_users);

        // External flows reading from, or sinking into, one of our views.
        let view_users =
            find_flow_dependents(&services.catalog, txn, &external_nodes, &flows, |node_type| match node_type {
                FlowNodeType::SourceView { view } => view_ids.contains(view),
                FlowNodeType::SinkTableView { view, .. }
                | FlowNodeType::SinkRingBufferView { view, .. }
                | FlowNodeType::SinkSeriesView { view, .. } => view_ids.contains(view),
                _ => false,
            })?;
        blockers.extend(view_users);

        // External flows reading one of our ring buffers.
        let ringbuffer_users =
            find_flow_dependents(&services.catalog, txn, &external_nodes, &flows, |node_type| match node_type {
                FlowNodeType::SourceRingBuffer { ringbuffer } => ringbuffer_ids.contains(ringbuffer),
                _ => false,
            })?;
        blockers.extend(ringbuffer_users);
    }

    // Nothing outside the subtree depends on it: safe to drop.
    if blockers.is_empty() {
        services.catalog.drop_namespace(txn, namespace_def)?;
        return Ok(outcome(true));
    }

    let dependents = blockers.join(", ");
    Err(CatalogError::InUse {
        kind: CatalogObjectKind::Namespace,
        namespace: plan.namespace_name.text().to_string(),
        name: None,
        dependents,
        fragment: plan.namespace_name.clone(),
    }
    .into())
}