use reifydb_core::interface::catalog::{
change::CatalogTrackSinkChangeOperations,
id::{NamespaceId, SinkId},
sink::Sink,
};
use reifydb_type::Result;
use crate::{
change::{
Change,
OperationType::{Create, Delete},
TransactionalSinkChanges,
},
transaction::admin::AdminTransaction,
};
/// Records sink lifecycle events in this admin transaction's change set.
impl CatalogTrackSinkChangeOperations for AdminTransaction {
    /// Records the creation of `sink`.
    ///
    /// The recorded change has no pre-image, carries `sink` as its post-image
    /// and is tagged with the `Create` operation. Always returns `Ok(())`.
    fn track_sink_created(&mut self, sink: Sink) -> Result<()> {
        self.changes.add_sink_change(Change {
            pre: None,
            post: Some(sink),
            op: Create,
        });
        Ok(())
    }

    /// Records the deletion of `sink`.
    ///
    /// The recorded change carries `sink` as its pre-image, has no post-image
    /// and is tagged with the `Delete` operation. Always returns `Ok(())`.
    fn track_sink_deleted(&mut self, sink: Sink) -> Result<()> {
        self.changes.add_sink_change(Change {
            pre: Some(sink),
            post: None,
            op: Delete,
        });
        Ok(())
    }
}
/// Read-side queries over the sink changes recorded in this admin transaction.
impl TransactionalSinkChanges for AdminTransaction {
    /// Looks up a sink by `id` among the sink changes recorded in this transaction.
    ///
    /// The recorded changes are walked in reverse order (most recently recorded
    /// first, assuming changes are appended as they are tracked). The first
    /// change that is relevant to `id` decides the result:
    /// - a post-image with this id yields `Some(&sink)`;
    /// - a `Delete` whose pre-image has this id yields `None`.
    ///
    /// Returns `None` as well when no recorded change mentions `id`.
    fn find_sink(&self, id: SinkId) -> Option<&Sink> {
        for entry in self.changes.sink.iter().rev() {
            // A post-image is checked first, so it wins within a single change.
            match &entry.post {
                Some(current) if current.id == id => return Some(current),
                _ => {}
            }
            // A matching delete hides any older change for the same id.
            match &entry.pre {
                Some(previous) if previous.id == id && entry.op == Delete => return None,
                _ => {}
            }
        }
        None
    }

    /// Looks up a sink by `namespace` and `name` among the sink changes recorded
    /// in this transaction.
    ///
    /// Follows the same reverse-order walk as the id-based lookup: the first
    /// change with a matching post-image yields `Some(&sink)`, the first
    /// `Delete` with a matching pre-image yields `None`, and `None` is returned
    /// when no recorded change matches.
    fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink> {
        for entry in self.changes.sink.iter().rev() {
            match &entry.post {
                Some(current) if current.namespace == namespace && current.name == name => {
                    return Some(current);
                }
                _ => {}
            }
            match &entry.pre {
                Some(previous)
                    if previous.namespace == namespace
                        && previous.name == name
                        && entry.op == Delete =>
                {
                    return None;
                }
                _ => {}
            }
        }
        None
    }

    /// Returns `true` if any recorded sink change is a `Delete` whose pre-image
    /// has the given `id`.
    ///
    /// NOTE(review): every recorded change is considered, so a later change for
    /// the same id does not reset the answer — confirm this is intended.
    fn is_sink_deleted(&self, id: SinkId) -> bool {
        self.changes
            .sink
            .iter()
            .any(|entry| entry.op == Delete && entry.pre.as_ref().is_some_and(|s| s.id == id))
    }

    /// Returns `true` if any recorded sink change is a `Delete` whose pre-image
    /// matches both `namespace` and `name`.
    ///
    /// NOTE(review): a delete followed by a re-creation under the same name in
    /// the same transaction still reports `true` here — confirm this is intended.
    fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
        self.changes.sink.iter().any(|entry| {
            entry.op == Delete
                && entry
                    .pre
                    .as_ref()
                    .is_some_and(|s| s.namespace == namespace && s.name == name)
        })
    }
}