use reifydb_core::{
interface::catalog::flow::FlowEdge,
key::flow_edge::{FlowEdgeByFlowKey, FlowEdgeKey},
};
use reifydb_transaction::transaction::admin::AdminTransaction;
use crate::{
CatalogStore, Result,
store::flow_edge::shape::{flow_edge, flow_edge_by_flow},
};
impl CatalogStore {
pub(crate) fn create_flow_edge(txn: &mut AdminTransaction, edge_def: &FlowEdge) -> Result<()> {
let mut row = flow_edge::SHAPE.allocate();
flow_edge::SHAPE.set_u64(&mut row, flow_edge::ID, edge_def.id);
flow_edge::SHAPE.set_u64(&mut row, flow_edge::FLOW, edge_def.flow);
flow_edge::SHAPE.set_u64(&mut row, flow_edge::SOURCE, edge_def.source);
flow_edge::SHAPE.set_u64(&mut row, flow_edge::TARGET, edge_def.target);
txn.set(&FlowEdgeKey::encoded(edge_def.id), row)?;
let mut index_row = flow_edge_by_flow::SHAPE.allocate();
flow_edge_by_flow::SHAPE.set_u64(&mut index_row, flow_edge_by_flow::FLOW, edge_def.flow);
flow_edge_by_flow::SHAPE.set_u64(&mut index_row, flow_edge_by_flow::ID, edge_def.id);
txn.set(&FlowEdgeByFlowKey::encoded(edge_def.flow, edge_def.id), index_row)?;
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use reifydb_engine::test_harness::create_test_admin_transaction;
use reifydb_transaction::transaction::Transaction;
use crate::{
CatalogStore,
test_utils::{create_flow, create_flow_edge, create_flow_node, create_namespace, ensure_test_flow},
};
#[test]
fn test_create_flow_edge() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]);
let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id);
let result = CatalogStore::get_flow_edge(&mut Transaction::Admin(&mut txn), edge.id).unwrap();
assert_eq!(result.id, edge.id);
assert_eq!(result.flow, flow.id);
assert_eq!(result.source, node1.id);
assert_eq!(result.target, node2.id);
}
#[test]
fn test_create_multiple_edges_same_flow() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]);
let node3 = create_flow_node(&mut txn, flow.id, 5, &[0x03]);
let edge1 = create_flow_edge(&mut txn, flow.id, node1.id, node2.id);
let edge2 = create_flow_edge(&mut txn, flow.id, node2.id, node3.id);
let result1 = CatalogStore::get_flow_edge(&mut Transaction::Admin(&mut txn), edge1.id).unwrap();
let result2 = CatalogStore::get_flow_edge(&mut Transaction::Admin(&mut txn), edge2.id).unwrap();
assert_eq!(result1.source, node1.id);
assert_eq!(result1.target, node2.id);
assert_eq!(result2.source, node2.id);
assert_eq!(result2.target, node3.id);
}
#[test]
fn test_create_edges_different_flows() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow1 = create_flow(&mut txn, "test_namespace", "flow_one");
let flow2 = create_flow(&mut txn, "test_namespace", "flow_two");
let node1a = create_flow_node(&mut txn, flow1.id, 1, &[0x01]);
let node1b = create_flow_node(&mut txn, flow1.id, 4, &[0x02]);
let node2a = create_flow_node(&mut txn, flow2.id, 1, &[0x03]);
let node2b = create_flow_node(&mut txn, flow2.id, 4, &[0x04]);
let edge1 = create_flow_edge(&mut txn, flow1.id, node1a.id, node1b.id);
let edge2 = create_flow_edge(&mut txn, flow2.id, node2a.id, node2b.id);
let result1 = CatalogStore::get_flow_edge(&mut Transaction::Admin(&mut txn), edge1.id).unwrap();
let result2 = CatalogStore::get_flow_edge(&mut Transaction::Admin(&mut txn), edge2.id).unwrap();
assert_eq!(result1.flow, flow1.id);
assert_eq!(result2.flow, flow2.id);
}
#[test]
fn test_edge_appears_in_index() {
let mut txn = create_test_admin_transaction();
let _namespace = create_namespace(&mut txn, "test_namespace");
let flow = ensure_test_flow(&mut txn);
let node1 = create_flow_node(&mut txn, flow.id, 1, &[0x01]);
let node2 = create_flow_node(&mut txn, flow.id, 4, &[0x02]);
let edge = create_flow_edge(&mut txn, flow.id, node1.id, node2.id);
let edges = CatalogStore::list_flow_edges_by_flow(&mut Transaction::Admin(&mut txn), flow.id).unwrap();
assert_eq!(edges.len(), 1);
assert_eq!(edges[0].id, edge.id);
}
}