Skip to main content

reifydb_transaction/transaction/catalog/
sink.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::catalog::{
5	change::CatalogTrackSinkChangeOperations,
6	id::{NamespaceId, SinkId},
7	sink::Sink,
8};
9use reifydb_type::Result;
10
11use crate::{
12	change::{
13		Change,
14		OperationType::{Create, Delete},
15		TransactionalSinkChanges,
16	},
17	transaction::admin::AdminTransaction,
18};
19
20impl CatalogTrackSinkChangeOperations for AdminTransaction {
21	fn track_sink_created(&mut self, sink: Sink) -> Result<()> {
22		let change = Change {
23			pre: None,
24			post: Some(sink),
25			op: Create,
26		};
27		self.changes.add_sink_change(change);
28		Ok(())
29	}
30
31	fn track_sink_deleted(&mut self, sink: Sink) -> Result<()> {
32		let change = Change {
33			pre: Some(sink),
34			post: None,
35			op: Delete,
36		};
37		self.changes.add_sink_change(change);
38		Ok(())
39	}
40}
41
42impl TransactionalSinkChanges for AdminTransaction {
43	fn find_sink(&self, id: SinkId) -> Option<&Sink> {
44		for change in self.changes.sink.iter().rev() {
45			if let Some(sink) = &change.post
46				&& sink.id == id
47			{
48				return Some(sink);
49			}
50			if let Some(sink) = &change.pre
51				&& sink.id == id && change.op == Delete
52			{
53				return None;
54			}
55		}
56		None
57	}
58
59	fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink> {
60		for change in self.changes.sink.iter().rev() {
61			if let Some(sink) = &change.post
62				&& sink.namespace == namespace
63				&& sink.name == name
64			{
65				return Some(sink);
66			}
67			if let Some(sink) = &change.pre
68				&& sink.namespace == namespace
69				&& sink.name == name && change.op == Delete
70			{
71				return None;
72			}
73		}
74		None
75	}
76
77	fn is_sink_deleted(&self, id: SinkId) -> bool {
78		self.changes
79			.sink
80			.iter()
81			.any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id == id).unwrap_or(false))
82	}
83
84	fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
85		self.changes.sink.iter().any(|change| {
86			change.op == Delete
87				&& change
88					.pre
89					.as_ref()
90					.map(|s| s.namespace == namespace && s.name == name)
91					.unwrap_or(false)
92		})
93	}
94}