reifydb_catalog/transaction/
flow.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::{
6	interface::{FlowDef, FlowId, NamespaceId, QueryTransaction},
7	return_error,
8};
9use reifydb_type::{Fragment, diagnostic::catalog::flow_not_found};
10use tracing::instrument;
11
12use crate::{CatalogStore, transaction::MaterializedCatalogTransaction};
13
14#[async_trait]
15pub trait CatalogFlowQueryOperations: Send {
16	async fn find_flow(&mut self, id: FlowId) -> crate::Result<Option<FlowDef>>;
17
18	async fn find_flow_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<Option<FlowDef>>;
19
20	async fn get_flow(&mut self, id: FlowId) -> crate::Result<FlowDef>;
21
22	async fn get_flow_by_name(
23		&mut self,
24		namespace: NamespaceId,
25		name: impl Into<Fragment> + Send,
26	) -> crate::Result<FlowDef>;
27}
28
29#[async_trait]
30impl<QT: QueryTransaction + MaterializedCatalogTransaction + Send> CatalogFlowQueryOperations for QT {
31	#[instrument(name = "catalog::flow::find", level = "trace", skip(self))]
32	async fn find_flow(&mut self, id: FlowId) -> crate::Result<Option<FlowDef>> {
33		CatalogStore::find_flow(self, id).await
34	}
35
36	#[instrument(name = "catalog::flow::find_by_name", level = "trace", skip(self, name))]
37	async fn find_flow_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<Option<FlowDef>> {
38		CatalogStore::find_flow_by_name(self, namespace, name).await
39	}
40
41	#[instrument(name = "catalog::flow::get", level = "trace", skip(self))]
42	async fn get_flow(&mut self, id: FlowId) -> crate::Result<FlowDef> {
43		CatalogStore::get_flow(self, id).await
44	}
45
46	#[instrument(name = "catalog::flow::get_by_name", level = "trace", skip(self, name))]
47	async fn get_flow_by_name(
48		&mut self,
49		namespace: NamespaceId,
50		name: impl Into<Fragment> + Send,
51	) -> crate::Result<FlowDef> {
52		let name = name.into();
53		let name_text = name.text().to_string();
54		let flow = self.find_flow_by_name(namespace, name.text()).await?;
55		match flow {
56			Some(f) => Ok(f),
57			None => {
58				let namespace = CatalogStore::get_namespace(self, namespace).await?;
59				return_error!(flow_not_found(name, &namespace.name, &name_text))
60			}
61		}
62	}
63}
64
65pub trait CatalogTrackFlowChangeOperations {
66	fn track_flow_def_created(&mut self, flow: FlowDef) -> crate::Result<()>;
67
68	fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> crate::Result<()>;
69
70	fn track_flow_def_deleted(&mut self, flow: FlowDef) -> crate::Result<()>;
71}