use reifydb_core::{
interface::catalog::{
flow::{Flow, FlowId, FlowStatus},
id::NamespaceId,
},
key::{flow::FlowKey, namespace_flow::NamespaceFlowKey},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::value::duration::Duration;
use crate::{
CatalogStore, Result,
store::flow::shape::{flow, flow_namespace},
};
impl CatalogStore {
    /// Looks up a flow by its identifier.
    ///
    /// Reads the row stored under `FlowKey::encoded(id)` and decodes it with the
    /// `flow` shape. Returns `Ok(None)` when no row exists for `id`.
    ///
    /// A stored tick of `0` nanoseconds is decoded as `None`; any other value is
    /// converted with `Duration::from_nanoseconds`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the transaction read and from
    /// `Duration::from_nanoseconds`.
    pub(crate) fn find_flow(rx: &mut Transaction<'_>, id: FlowId) -> Result<Option<Flow>> {
        let Some(stored) = rx.get(&FlowKey::encoded(id))? else {
            return Ok(None);
        };
        let row = &stored.row;

        // Fields are decoded in the same order as they are listed here; the tick
        // comes last because it is the only conversion that can fail.
        Ok(Some(Flow {
            id: FlowId(flow::SHAPE.get_u64(row, flow::ID)),
            namespace: NamespaceId(flow::SHAPE.get_u64(row, flow::NAMESPACE)),
            name: flow::SHAPE.get_utf8(row, flow::NAME).to_string(),
            status: FlowStatus::from_u8(flow::SHAPE.get_u8(row, flow::STATUS)),
            tick: match flow::SHAPE.get_u64(row, flow::TICK_NANOS) {
                0 => None,
                // NOTE(review): values above `i64::MAX` wrap to a negative number in
                // this cast - confirm the writer never stores such a tick.
                nanos => Some(Duration::from_nanoseconds(nanos as i64)?),
            },
        }))
    }

    /// Looks up a flow by name within a namespace.
    ///
    /// Scans the entries in the `NamespaceFlowKey::full_scan(namespace)` range and
    /// stops at the first one whose stored name equals `name` exactly. The matching
    /// flow id is then resolved through `Self::get_flow`.
    ///
    /// Returns `Ok(None)` when no entry in the range carries that name.
    ///
    /// # Errors
    ///
    /// Propagates errors from opening the range, from reading any entry of the
    /// scan, and from `Self::get_flow`.
    pub(crate) fn find_flow_by_name(
        rx: &mut Transaction<'_>,
        namespace: NamespaceId,
        name: impl AsRef<str>,
    ) -> Result<Option<Flow>> {
        let wanted = name.as_ref();

        // The scan lives in its own scope so that it is dropped before `rx` is
        // handed to `get_flow` below (presumably the stream borrows `rx`).
        let matched = {
            // NOTE(review): 1024 looks like a batch size for the range read - confirm.
            let mut entries = rx.range(NamespaceFlowKey::full_scan(namespace), 1024)?;
            let mut hit = None;
            for entry in entries.by_ref() {
                let stored = entry?;
                if wanted != flow_namespace::SHAPE.get_utf8(&stored.row, flow_namespace::NAME) {
                    continue;
                }
                hit = Some(FlowId(
                    flow_namespace::SHAPE.get_u64(&stored.row, flow_namespace::ID),
                ));
                break;
            }
            hit
        };

        match matched {
            Some(flow_id) => Ok(Some(Self::get_flow(rx, flow_id)?)),
            None => Ok(None),
        }
    }
}
#[cfg(test)]
pub mod tests {
    use reifydb_engine::test_harness::create_test_admin_transaction;
    use reifydb_transaction::transaction::Transaction;

    use crate::{
        CatalogStore,
        test_utils::{create_flow, create_namespace, ensure_test_namespace},
    };

    /// A flow is returned when both the namespace and the name match.
    #[test]
    fn test_find_flow_by_name_ok() {
        let mut admin = create_test_admin_transaction();
        let _namespace_one = create_namespace(&mut admin, "namespace_one");
        let namespace_two = create_namespace(&mut admin, "namespace_two");
        create_flow(&mut admin, "namespace_one", "flow_one");
        create_flow(&mut admin, "namespace_two", "flow_two");

        let mut rx = Transaction::Admin(&mut admin);
        let found = CatalogStore::find_flow_by_name(&mut rx, namespace_two.id(), "flow_two")
            .unwrap()
            .unwrap();

        assert_eq!(found.name, "flow_two");
        assert_eq!(found.namespace, namespace_two.id());
    }

    /// Looking up a name when this test has created no flows yields `None`.
    #[test]
    fn test_find_flow_by_name_empty() {
        let mut admin = create_test_admin_transaction();
        let test_namespace = ensure_test_namespace(&mut admin);

        let mut rx = Transaction::Admin(&mut admin);
        assert!(
            CatalogStore::find_flow_by_name(&mut rx, test_namespace.id(), "some_flow")
                .unwrap()
                .is_none()
        );
    }

    /// A name that matches none of the existing flows yields `None`.
    #[test]
    fn test_find_flow_by_name_not_found() {
        let mut admin = create_test_admin_transaction();
        let test_namespace = ensure_test_namespace(&mut admin);
        create_flow(&mut admin, "test_namespace", "flow_one");
        create_flow(&mut admin, "test_namespace", "flow_two");

        let mut rx = Transaction::Admin(&mut admin);
        assert!(
            CatalogStore::find_flow_by_name(&mut rx, test_namespace.id(), "flow_three")
                .unwrap()
                .is_none()
        );
    }

    /// A flow created in one namespace is not visible through another namespace.
    #[test]
    fn test_find_flow_by_name_different_namespace() {
        let mut admin = create_test_admin_transaction();
        let _namespace_one = create_namespace(&mut admin, "namespace_one");
        let namespace_two = create_namespace(&mut admin, "namespace_two");
        create_flow(&mut admin, "namespace_one", "my_flow");

        let mut rx = Transaction::Admin(&mut admin);
        assert!(
            CatalogStore::find_flow_by_name(&mut rx, namespace_two.id(), "my_flow")
                .unwrap()
                .is_none()
        );
    }

    /// Name matching is case-sensitive: only the exact spelling is found.
    #[test]
    fn test_find_flow_by_name_case_sensitive() {
        let mut admin = create_test_admin_transaction();
        let test_namespace = ensure_test_namespace(&mut admin);
        create_flow(&mut admin, "test_namespace", "MyFlow");

        let lowercase = CatalogStore::find_flow_by_name(
            &mut Transaction::Admin(&mut admin),
            test_namespace.id(),
            "myflow",
        )
        .unwrap();
        assert!(lowercase.is_none());

        let exact = CatalogStore::find_flow_by_name(
            &mut Transaction::Admin(&mut admin),
            test_namespace.id(),
            "MyFlow",
        )
        .unwrap();
        assert!(exact.is_some());
    }

    /// A flow can be fetched back by the id it was created with.
    #[test]
    fn test_find_flow_by_id() {
        let mut admin = create_test_admin_transaction();
        ensure_test_namespace(&mut admin);
        let flow = create_flow(&mut admin, "test_namespace", "test_flow");

        let mut rx = Transaction::Admin(&mut admin);
        let found = CatalogStore::find_flow(&mut rx, flow.id).unwrap().unwrap();

        assert_eq!(found.id, flow.id);
        assert_eq!(found.name, "test_flow");
    }

    /// An id that was never created yields `None` rather than an error.
    #[test]
    fn test_find_flow_by_id_not_found() {
        let mut admin = create_test_admin_transaction();

        let mut rx = Transaction::Admin(&mut admin);
        assert!(CatalogStore::find_flow(&mut rx, 999.into()).unwrap().is_none());
    }
}