use reifydb_core::{
interface::catalog::flow::FlowNode,
key::flow_node::{FlowNodeByFlowKey, FlowNodeKey},
};
use reifydb_transaction::transaction::admin::AdminTransaction;
use crate::{
CatalogStore, Result,
store::flow_node::shape::{flow_node, flow_node_by_flow},
};
impl CatalogStore {
pub(crate) fn create_flow_node(txn: &mut AdminTransaction, node_def: &FlowNode) -> Result<()> {
let mut row = flow_node::SHAPE.allocate();
flow_node::SHAPE.set_u64(&mut row, flow_node::ID, node_def.id);
flow_node::SHAPE.set_u64(&mut row, flow_node::FLOW, node_def.flow);
flow_node::SHAPE.set_u8(&mut row, flow_node::TYPE, node_def.node_type);
flow_node::SHAPE.set_blob(&mut row, flow_node::DATA, &node_def.data);
txn.set(&FlowNodeKey::encoded(node_def.id), row)?;
let mut index_row = flow_node_by_flow::SHAPE.allocate();
flow_node_by_flow::SHAPE.set_u64(&mut index_row, flow_node_by_flow::FLOW, node_def.flow);
flow_node_by_flow::SHAPE.set_u64(&mut index_row, flow_node_by_flow::ID, node_def.id);
txn.set(&FlowNodeByFlowKey::encoded(node_def.flow, node_def.id), index_row)?;
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use reifydb_core::interface::catalog::flow::FlowNode;
use reifydb_engine::test_harness::create_test_admin_transaction;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::value::blob::Blob;
use crate::{
CatalogStore,
store::sequence::flow::next_flow_node_id,
test_utils::{create_flow, create_namespace, ensure_test_flow},
};
#[test]
fn test_create_flow_node() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node_id = next_flow_node_id(&mut txn).unwrap();
let node_def = FlowNode {
id: node_id,
flow: flow.id,
node_type: 1, data: Blob::from([0x01u8, 0x02, 0x03].as_slice()),
};
CatalogStore::create_flow_node(&mut txn, &node_def).unwrap();
let result = CatalogStore::get_flow_node(&mut Transaction::Admin(&mut txn), node_id).unwrap();
assert_eq!(result.id, node_id);
assert_eq!(result.flow, flow.id);
assert_eq!(result.node_type, 1);
assert_eq!(result.data.as_bytes(), &[0x01, 0x02, 0x03]);
}
#[test]
fn test_create_multiple_nodes_same_flow() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node1_id = next_flow_node_id(&mut txn).unwrap();
let node1 = FlowNode {
id: node1_id,
flow: flow.id,
node_type: 1, data: Blob::from([0x01u8].as_slice()),
};
CatalogStore::create_flow_node(&mut txn, &node1).unwrap();
let node2_id = next_flow_node_id(&mut txn).unwrap();
let node2 = FlowNode {
id: node2_id,
flow: flow.id,
node_type: 4, data: Blob::from([0x02u8].as_slice()),
};
CatalogStore::create_flow_node(&mut txn, &node2).unwrap();
let result1 = CatalogStore::get_flow_node(&mut Transaction::Admin(&mut txn), node1_id).unwrap();
let result2 = CatalogStore::get_flow_node(&mut Transaction::Admin(&mut txn), node2_id).unwrap();
assert_eq!(result1.node_type, 1);
assert_eq!(result2.node_type, 4);
}
#[test]
fn test_create_nodes_different_flows() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow1 = create_flow(&mut txn, "test_namespace", "flow_one");
let flow2 = create_flow(&mut txn, "test_namespace", "flow_two");
let node1_id = next_flow_node_id(&mut txn).unwrap();
let node1 = FlowNode {
id: node1_id,
flow: flow1.id,
node_type: 1,
data: Blob::from([0x01u8].as_slice()),
};
CatalogStore::create_flow_node(&mut txn, &node1).unwrap();
let node2_id = next_flow_node_id(&mut txn).unwrap();
let node2 = FlowNode {
id: node2_id,
flow: flow2.id,
node_type: 1,
data: Blob::from([0x02u8].as_slice()),
};
CatalogStore::create_flow_node(&mut txn, &node2).unwrap();
let result1 = CatalogStore::get_flow_node(&mut Transaction::Admin(&mut txn), node1_id).unwrap();
let result2 = CatalogStore::get_flow_node(&mut Transaction::Admin(&mut txn), node2_id).unwrap();
assert_eq!(result1.flow, flow1.id);
assert_eq!(result2.flow, flow2.id);
}
#[test]
fn test_node_appears_in_index() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node_id = next_flow_node_id(&mut txn).unwrap();
let node_def = FlowNode {
id: node_id,
flow: flow.id,
node_type: 1,
data: Blob::from([0x01u8].as_slice()),
};
CatalogStore::create_flow_node(&mut txn, &node_def).unwrap();
let nodes = CatalogStore::list_flow_nodes_by_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, node_id);
}
}