use reifydb_core::{
interface::{
catalog::{flow::FlowId, id::NamespaceId},
cdc::CdcConsumerId,
},
key::{
cdc_consumer::CdcConsumerKey, flow::FlowKey, flow_version::FlowVersionKey,
namespace_flow::NamespaceFlowKey,
},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use crate::{CatalogStore, Result};
impl CatalogStore {
    /// Drops the flow called `name` inside `namespace`, when such a flow is found.
    ///
    /// The flow is resolved via [`CatalogStore::find_flow_by_name`]; if the lookup
    /// yields nothing, the call does nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the lookup or from [`CatalogStore::drop_flow`].
    pub(crate) fn drop_flow_by_name(txn: &mut AdminTransaction, namespace: NamespaceId, name: &str) -> Result<()> {
        let lookup = CatalogStore::find_flow_by_name(&mut Transaction::Admin(&mut *txn), namespace, name)?;
        match lookup {
            Some(found) => CatalogStore::drop_flow(txn, found.id),
            None => Ok(()),
        }
    }

    /// Drops the flow identified by `flow_id` together with its catalog entries.
    ///
    /// When [`CatalogStore::find_flow`] finds no flow for `flow_id`, nothing is
    /// removed and `Ok(())` is returned. Otherwise, in this order:
    ///
    /// 1. every node listed for the flow is dropped,
    /// 2. every edge listed for the flow is dropped,
    /// 3. the flow-version key, the CDC consumer key for the consumer id
    ///    `flow:<id>`, the namespace-to-flow key and the flow key are removed.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by a lookup, a node/edge drop, or a key removal.
    pub(crate) fn drop_flow(txn: &mut AdminTransaction, flow_id: FlowId) -> Result<()> {
        // Guard clause: an unknown flow id is not an error, there is just nothing to do.
        let Some(existing) = CatalogStore::find_flow(&mut Transaction::Admin(&mut *txn), flow_id)? else {
            return Ok(());
        };

        // Dependent entries go first: nodes, then edges.
        let flow_nodes = CatalogStore::list_flow_nodes_by_flow(&mut Transaction::Admin(&mut *txn), flow_id)?;
        for flow_node in flow_nodes {
            CatalogStore::drop_flow_node(txn, flow_node.id)?;
        }

        let flow_edges = CatalogStore::list_flow_edges_by_flow(&mut Transaction::Admin(&mut *txn), flow_id)?;
        for flow_edge in flow_edges {
            CatalogStore::drop_flow_edge(txn, flow_edge.id)?;
        }

        // Finally remove the keys that belong to the flow itself.
        txn.remove(&FlowVersionKey::encoded(flow_id))?;

        let consumer_id = CdcConsumerId::new(format!("flow:{}", flow_id.0));
        txn.remove(&CdcConsumerKey::encoded(consumer_id))?;

        txn.remove(&NamespaceFlowKey::encoded(existing.namespace, flow_id))?;
        txn.remove(&FlowKey::encoded(flow_id))?;

        Ok(())
    }
}
#[cfg(test)]
pub mod tests {
    use reifydb_core::interface::catalog::flow::FlowId;
    use reifydb_engine::test_harness::create_test_admin_transaction;
    use reifydb_transaction::transaction::Transaction;

    use crate::{
        CatalogStore,
        test_utils::{create_flow, create_flow_edge, create_flow_node, create_namespace},
    };

    /// Dropping a flow removes the flow itself plus the nodes and the edge created for it.
    #[test]
    fn test_drop_flow() {
        let mut txn = create_test_admin_transaction();

        // Fixture: one namespace, one flow, two nodes joined by a single edge.
        let _namespace = create_namespace(&mut txn, "test_namespace");
        let flow = create_flow(&mut txn, "test_namespace", "drop_test_flow");
        let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
        let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]);
        let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id);
        let node_ids = [node1.id, node2.id];

        // Before the drop everything can be found.
        let flow_present = CatalogStore::find_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap().is_some();
        assert!(flow_present);
        for node_id in node_ids {
            let node_present =
                CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node_id).unwrap().is_some();
            assert!(node_present);
        }
        let edge_present =
            CatalogStore::find_flow_edge(&mut Transaction::Admin(&mut txn), edge.id).unwrap().is_some();
        assert!(edge_present);

        CatalogStore::drop_flow(&mut txn, flow.id).unwrap();

        // After the drop none of the lookups return anything.
        let flow_gone = CatalogStore::find_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap().is_none();
        assert!(flow_gone);
        for node_id in node_ids {
            let node_gone =
                CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node_id).unwrap().is_none();
            assert!(node_gone);
        }
        let edge_gone = CatalogStore::find_flow_edge(&mut Transaction::Admin(&mut txn), edge.id).unwrap().is_none();
        assert!(edge_gone);
    }

    /// Dropping a flow id that was never created succeeds without an error.
    #[test]
    fn test_drop_nonexistent_flow() {
        let mut txn = create_test_admin_transaction();
        let outcome = CatalogStore::drop_flow(&mut txn, FlowId(999));
        outcome.unwrap();
    }

    /// A flow can be dropped through its namespace and name; afterwards the
    /// name lookup no longer finds it.
    #[test]
    fn test_drop_flow_by_name() {
        let mut txn = create_test_admin_transaction();
        let ns = create_namespace(&mut txn, "test_namespace");
        let _flow = create_flow(&mut txn, "test_namespace", "named_flow");

        let before = CatalogStore::find_flow_by_name(&mut Transaction::Admin(&mut txn), ns.id(), "named_flow").unwrap();
        assert!(before.is_some());

        CatalogStore::drop_flow_by_name(&mut txn, ns.id(), "named_flow").unwrap();

        let after = CatalogStore::find_flow_by_name(&mut Transaction::Admin(&mut txn), ns.id(), "named_flow").unwrap();
        assert!(after.is_none());
    }
}