use reifydb_core::{
interface::catalog::{
flow::FlowStatus,
id::{NamespaceId, SinkId},
sink::Sink,
},
key::{namespace_sink::NamespaceSinkKey, sink::SinkKey},
};
use reifydb_transaction::transaction::Transaction;
use serde_json::from_str;
use crate::{
CatalogStore, Result,
store::sink::shape::{sink, sink_namespace},
};
impl CatalogStore {
pub(crate) fn find_sink(rx: &mut Transaction<'_>, id: SinkId) -> Result<Option<Sink>> {
let Some(multi) = rx.get(&SinkKey::encoded(id))? else {
return Ok(None);
};
let row = multi.row;
let id = SinkId(sink::SHAPE.get_u64(&row, sink::ID));
let namespace = NamespaceId(sink::SHAPE.get_u64(&row, sink::NAMESPACE));
let name = sink::SHAPE.get_utf8(&row, sink::NAME).to_string();
let source_namespace = NamespaceId(sink::SHAPE.get_u64(&row, sink::SOURCE_NAMESPACE));
let source_name = sink::SHAPE.get_utf8(&row, sink::SOURCE_NAME).to_string();
let connector = sink::SHAPE.get_utf8(&row, sink::CONNECTOR).to_string();
let config_json = sink::SHAPE.get_utf8(&row, sink::CONFIG);
let config: Vec<(String, String)> = from_str(config_json).unwrap_or_default();
let status_u8 = sink::SHAPE.get_u8(&row, sink::STATUS);
let status = FlowStatus::from_u8(status_u8);
Ok(Some(Sink {
id,
name,
namespace,
source_namespace,
source_name,
connector,
config,
status,
}))
}
pub(crate) fn find_sink_by_name(
rx: &mut Transaction<'_>,
namespace: NamespaceId,
name: impl AsRef<str>,
) -> Result<Option<Sink>> {
let name = name.as_ref();
let mut stream = rx.range(NamespaceSinkKey::full_scan(namespace), 1024)?;
let mut found_sink = None;
for entry in stream.by_ref() {
let multi = entry?;
let row = &multi.row;
let sink_name = sink_namespace::SHAPE.get_utf8(row, sink_namespace::NAME);
if name == sink_name {
found_sink = Some(SinkId(sink_namespace::SHAPE.get_u64(row, sink_namespace::ID)));
break;
}
}
drop(stream);
let Some(sink) = found_sink else {
return Ok(None);
};
Ok(Some(Self::get_sink(rx, sink)?))
}
}
#[cfg(test)]
pub mod tests {
use reifydb_engine::test_harness::create_test_admin_transaction;
use reifydb_transaction::transaction::Transaction;
use crate::{
CatalogStore,
test_utils::{create_namespace, create_sink, ensure_test_namespace},
};
#[test]
fn test_find_sink_by_name_ok() {
let mut txn = create_test_admin_transaction();
let _namespace_one = create_namespace(&mut txn, "namespace_one");
let namespace_two = create_namespace(&mut txn, "namespace_two");
create_sink(&mut txn, "namespace_one", "sink_one", "kafka");
create_sink(&mut txn, "namespace_two", "sink_two", "postgres");
let result = CatalogStore::find_sink_by_name(
&mut Transaction::Admin(&mut txn),
namespace_two.id(),
"sink_two",
)
.unwrap()
.unwrap();
assert_eq!(result.name, "sink_two");
assert_eq!(result.namespace, namespace_two.id());
assert_eq!(result.connector, "postgres");
}
#[test]
fn test_find_sink_by_name_empty() {
let mut txn = create_test_admin_transaction();
let test_namespace = ensure_test_namespace(&mut txn);
let result = CatalogStore::find_sink_by_name(
&mut Transaction::Admin(&mut txn),
test_namespace.id(),
"some_sink",
)
.unwrap();
assert!(result.is_none());
}
#[test]
fn test_find_sink_by_name_not_found() {
let mut txn = create_test_admin_transaction();
let test_namespace = ensure_test_namespace(&mut txn);
create_sink(&mut txn, "test_namespace", "sink_one", "kafka");
create_sink(&mut txn, "test_namespace", "sink_two", "postgres");
let result = CatalogStore::find_sink_by_name(
&mut Transaction::Admin(&mut txn),
test_namespace.id(),
"sink_three",
)
.unwrap();
assert!(result.is_none());
}
#[test]
fn test_find_sink_by_name_different_namespace() {
let mut txn = create_test_admin_transaction();
let _namespace_one = create_namespace(&mut txn, "namespace_one");
let namespace_two = create_namespace(&mut txn, "namespace_two");
create_sink(&mut txn, "namespace_one", "my_sink", "kafka");
let result = CatalogStore::find_sink_by_name(
&mut Transaction::Admin(&mut txn),
namespace_two.id(),
"my_sink",
)
.unwrap();
assert!(result.is_none());
}
#[test]
fn test_find_sink_by_name_case_sensitive() {
let mut txn = create_test_admin_transaction();
let test_namespace = ensure_test_namespace(&mut txn);
create_sink(&mut txn, "test_namespace", "MySink", "kafka");
let result = CatalogStore::find_sink_by_name(
&mut Transaction::Admin(&mut txn),
test_namespace.id(),
"mysink",
)
.unwrap();
assert!(result.is_none());
let result = CatalogStore::find_sink_by_name(
&mut Transaction::Admin(&mut txn),
test_namespace.id(),
"MySink",
)
.unwrap();
assert!(result.is_some());
}
#[test]
fn test_find_sink_by_id() {
let mut txn = create_test_admin_transaction();
ensure_test_namespace(&mut txn);
let sink = create_sink(&mut txn, "test_namespace", "test_sink", "kafka");
let result = CatalogStore::find_sink(&mut Transaction::Admin(&mut txn), sink.id).unwrap().unwrap();
assert_eq!(result.id, sink.id);
assert_eq!(result.name, "test_sink");
assert_eq!(result.connector, "kafka");
}
#[test]
fn test_find_sink_by_id_not_found() {
let mut txn = create_test_admin_transaction();
let result = CatalogStore::find_sink(&mut Transaction::Admin(&mut txn), 999.into()).unwrap();
assert!(result.is_none());
}
}