use reifydb_core::{
interface::catalog::flow::FlowNodeId,
key::{
flow_node::{FlowNodeByFlowKey, FlowNodeKey},
flow_node_internal_state::FlowNodeInternalStateKey,
flow_node_state::FlowNodeStateKey,
retention_strategy::OperatorRetentionStrategyKey,
},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use crate::{CatalogStore, Result};
impl CatalogStore {
pub(crate) fn drop_flow_node(txn: &mut AdminTransaction, node_id: FlowNodeId) -> Result<()> {
let node = CatalogStore::find_flow_node(&mut Transaction::Admin(&mut *txn), node_id)?;
if let Some(node_def) = node {
let state_range = FlowNodeStateKey::node_range(node_id);
let mut state_stream = txn.range(state_range, 1024)?;
let mut state_keys = Vec::new();
for entry in state_stream.by_ref() {
state_keys.push(entry?.key.clone());
}
drop(state_stream);
for key in state_keys {
txn.remove(&key)?;
}
let internal_range = FlowNodeInternalStateKey::node_range(node_id);
let mut internal_stream = txn.range(internal_range, 1024)?;
let mut internal_keys = Vec::new();
for entry in internal_stream.by_ref() {
internal_keys.push(entry?.key.clone());
}
drop(internal_stream);
for key in internal_keys {
txn.remove(&key)?;
}
txn.remove(&OperatorRetentionStrategyKey::encoded(node_id))?;
txn.remove(&FlowNodeKey::encoded(node_id))?;
txn.remove(&FlowNodeByFlowKey::encoded(node_def.flow, node_id))?;
}
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use reifydb_core::{
encoded::row::EncodedRow,
interface::catalog::flow::FlowNodeId,
key::{flow_node_internal_state::FlowNodeInternalStateKey, flow_node_state::FlowNodeStateKey},
};
use reifydb_engine::test_harness::create_test_admin_transaction;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::util::cowvec::CowVec;
use crate::{
CatalogStore,
test_utils::{create_flow_node, create_namespace, ensure_test_flow},
};
#[test]
fn test_drop_flow_node() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
assert!(CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node.id).unwrap().is_some());
CatalogStore::drop_flow_node(&mut txn, node.id).unwrap();
assert!(CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node.id).unwrap().is_none());
}
#[test]
fn test_drop_node_removes_from_index() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
let nodes = CatalogStore::list_flow_nodes_by_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap();
assert_eq!(nodes.len(), 1);
CatalogStore::drop_flow_node(&mut txn, node.id).unwrap();
let nodes = CatalogStore::list_flow_nodes_by_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap();
assert!(nodes.is_empty());
}
#[test]
fn test_drop_nonexistent_node() {
let mut txn = create_test_admin_transaction();
CatalogStore::drop_flow_node(&mut txn, FlowNodeId(999)).unwrap();
}
#[test]
fn test_drop_one_node_keeps_others() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]);
CatalogStore::drop_flow_node(&mut txn, node1.id).unwrap();
assert!(CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node1.id).unwrap().is_none());
assert!(CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node2.id).unwrap().is_some());
let nodes = CatalogStore::list_flow_nodes_by_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, node2.id);
}
#[test]
fn test_drop_flow_node_cleans_up_state() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
let dummy_value = EncodedRow(CowVec::new(vec![42u8]));
txn.set(&FlowNodeStateKey::encoded(node.id, vec![1u8]), dummy_value.clone()).unwrap();
txn.set(&FlowNodeInternalStateKey::encoded(node.id, vec![1u8]), dummy_value.clone()).unwrap();
assert!(txn.get(&FlowNodeStateKey::encoded(node.id, vec![1u8])).unwrap().is_some());
assert!(txn.get(&FlowNodeInternalStateKey::encoded(node.id, vec![1u8])).unwrap().is_some());
CatalogStore::drop_flow_node(&mut txn, node.id).unwrap();
assert!(txn.get(&FlowNodeStateKey::encoded(node.id, vec![1u8])).unwrap().is_none());
assert!(txn.get(&FlowNodeInternalStateKey::encoded(node.id, vec![1u8])).unwrap().is_none());
assert!(CatalogStore::find_flow_node(&mut Transaction::Admin(&mut txn), node.id).unwrap().is_none());
}
}