use std::collections::HashSet;
use postcard::from_bytes;
use reifydb_catalog::{catalog::Catalog, store::column::list::ColumnInfo};
use reifydb_core::{
interface::catalog::flow::{Flow, FlowNode},
internal_error,
};
use reifydb_rql::flow::node::FlowNodeType;
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use crate::Result;
/// Describes every column in `columns` that `check` reports as a dependent.
///
/// `check` is invoked once per column, in slice order. When it returns
/// `Some(suffix)`, the column's namespace is looked up through `catalog` and an
/// entry with the following shape is produced:
///
/// ```text
/// column `<column name>` in <entity kind> `<namespace>.<entity name>`<suffix>
/// ```
///
/// A namespace that the catalog cannot find is rendered as `?`. Columns for
/// which `check` returns `None` are skipped without querying the catalog.
///
/// # Errors
///
/// Propagates the first error returned by the namespace lookup.
pub(crate) fn find_column_dependents(
    catalog: &Catalog,
    txn: &mut AdminTransaction,
    columns: &[ColumnInfo],
    check: impl Fn(&ColumnInfo) -> Option<String>,
) -> Result<Vec<String>> {
    let mut descriptions = Vec::new();

    for col_info in columns {
        // Only columns flagged by the caller-supplied predicate are reported.
        let Some(extra) = check(col_info) else {
            continue;
        };

        let namespace = catalog.find_namespace(&mut Transaction::Admin(txn), col_info.namespace)?;
        let namespace_name = namespace.map_or_else(|| "?".to_string(), |found| found.name().to_string());

        let mut entry = format!(
            "column `{}` in {} `{}.{}`",
            col_info.column.name, col_info.entity_kind, namespace_name, col_info.entity_name
        );
        // Appending an empty suffix leaves the entry unchanged, so no emptiness
        // check is required here.
        entry.push_str(&extra);

        descriptions.push(entry);
    }

    Ok(descriptions)
}
/// Lists the flows that own at least one node whose type satisfies `check`.
///
/// Each node's `data` is deserialized into a [`FlowNodeType`] and handed to
/// `check`. The first matching node of a flow produces one entry of the form
/// ``flow `<namespace>.<flow name>` ``; later matching nodes of the same flow
/// add nothing. A matching node whose flow id is absent from `flows` produces
/// no entry, and a namespace the catalog cannot find is rendered as `?`.
///
/// # Errors
///
/// Returns an internal error if any node's data fails to deserialize (every
/// node is decoded, including nodes of flows that were already reported), and
/// propagates the first error returned by the namespace lookup.
pub(crate) fn find_flow_dependents(
    catalog: &Catalog,
    txn: &mut AdminTransaction,
    nodes: &[FlowNode],
    flows: &[Flow],
    check: impl Fn(&FlowNodeType) -> bool,
) -> Result<Vec<String>> {
    let mut reported_flows = HashSet::new();
    let mut descriptions = Vec::new();

    for node in nodes {
        let decoded: FlowNodeType = from_bytes(node.data.as_ref())
            .map_err(|err| internal_error!("Failed to deserialize flow node type: {}", err))?;

        if !check(&decoded) {
            continue;
        }

        // `insert` returns false when this flow id has already been recorded,
        // which keeps each flow to a single entry.
        if !reported_flows.insert(node.flow) {
            continue;
        }

        let Some(owner) = flows.iter().find(|candidate| candidate.id == node.flow) else {
            continue;
        };

        let namespace = catalog.find_namespace(&mut Transaction::Admin(txn), owner.namespace)?;
        let namespace_name = namespace.map_or_else(|| "?".to_string(), |found| found.name().to_string());

        descriptions.push(format!("flow `{}.{}`", namespace_name, owner.name));
    }

    Ok(descriptions)
}