reifydb_catalog/transaction/
flow.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{FlowDef, FlowId, NamespaceId, QueryTransaction},
6	return_error,
7};
8use reifydb_type::{IntoFragment, diagnostic::catalog::flow_not_found};
9use tracing::instrument;
10
11use crate::{CatalogStore, transaction::MaterializedCatalogTransaction};
12
13pub trait CatalogFlowQueryOperations {
14	fn find_flow(&mut self, id: FlowId) -> crate::Result<Option<FlowDef>>;
15
16	fn find_flow_by_name<'a>(
17		&mut self,
18		namespace: NamespaceId,
19		name: impl IntoFragment<'a>,
20	) -> crate::Result<Option<FlowDef>>;
21
22	fn get_flow(&mut self, id: FlowId) -> crate::Result<FlowDef>;
23
24	fn get_flow_by_name<'a>(
25		&mut self,
26		namespace: NamespaceId,
27		name: impl IntoFragment<'a>,
28	) -> crate::Result<FlowDef>;
29}
30
31impl<QT: QueryTransaction + MaterializedCatalogTransaction> CatalogFlowQueryOperations for QT {
32	#[instrument(level = "trace", skip(self))]
33	fn find_flow(&mut self, id: FlowId) -> crate::Result<Option<FlowDef>> {
34		CatalogStore::find_flow(self, id)
35	}
36
37	#[instrument(level = "trace", skip(self, name))]
38	fn find_flow_by_name<'a>(
39		&mut self,
40		namespace: NamespaceId,
41		name: impl IntoFragment<'a>,
42	) -> crate::Result<Option<FlowDef>> {
43		let name = name.into_fragment();
44		CatalogStore::find_flow_by_name(self, namespace, name.text())
45	}
46
47	#[instrument(level = "trace", skip(self))]
48	fn get_flow(&mut self, id: FlowId) -> crate::Result<FlowDef> {
49		CatalogStore::get_flow(self, id)
50	}
51
52	#[instrument(level = "trace", skip(self, name))]
53	fn get_flow_by_name<'a>(
54		&mut self,
55		namespace: NamespaceId,
56		name: impl IntoFragment<'a>,
57	) -> crate::Result<FlowDef> {
58		let name = name.into_fragment();
59		let name_text = name.text().to_string();
60		let flow = self.find_flow_by_name(namespace, name.clone())?;
61		match flow {
62			Some(f) => Ok(f),
63			None => {
64				let namespace = CatalogStore::get_namespace(self, namespace)?;
65				return_error!(flow_not_found(name, &namespace.name, &name_text))
66			}
67		}
68	}
69}
70
71pub trait CatalogTrackFlowChangeOperations {
72	fn track_flow_def_created(&mut self, flow: FlowDef) -> crate::Result<()>;
73
74	fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> crate::Result<()>;
75
76	fn track_flow_def_deleted(&mut self, flow: FlowDef) -> crate::Result<()>;
77}