reifydb_catalog/transaction/
flow.rs1use async_trait::async_trait;
5use reifydb_core::{
6 interface::{FlowDef, FlowId, NamespaceId, QueryTransaction},
7 return_error,
8};
9use reifydb_type::{Fragment, diagnostic::catalog::flow_not_found};
10use tracing::instrument;
11
12use crate::{CatalogStore, transaction::MaterializedCatalogTransaction};
13
14#[async_trait]
15pub trait CatalogFlowQueryOperations: Send {
16 async fn find_flow(&mut self, id: FlowId) -> crate::Result<Option<FlowDef>>;
17
18 async fn find_flow_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<Option<FlowDef>>;
19
20 async fn get_flow(&mut self, id: FlowId) -> crate::Result<FlowDef>;
21
22 async fn get_flow_by_name(
23 &mut self,
24 namespace: NamespaceId,
25 name: impl Into<Fragment> + Send,
26 ) -> crate::Result<FlowDef>;
27}
28
29#[async_trait]
30impl<QT: QueryTransaction + MaterializedCatalogTransaction + Send> CatalogFlowQueryOperations for QT {
31 #[instrument(name = "catalog::flow::find", level = "trace", skip(self))]
32 async fn find_flow(&mut self, id: FlowId) -> crate::Result<Option<FlowDef>> {
33 CatalogStore::find_flow(self, id).await
34 }
35
36 #[instrument(name = "catalog::flow::find_by_name", level = "trace", skip(self, name))]
37 async fn find_flow_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<Option<FlowDef>> {
38 CatalogStore::find_flow_by_name(self, namespace, name).await
39 }
40
41 #[instrument(name = "catalog::flow::get", level = "trace", skip(self))]
42 async fn get_flow(&mut self, id: FlowId) -> crate::Result<FlowDef> {
43 CatalogStore::get_flow(self, id).await
44 }
45
46 #[instrument(name = "catalog::flow::get_by_name", level = "trace", skip(self, name))]
47 async fn get_flow_by_name(
48 &mut self,
49 namespace: NamespaceId,
50 name: impl Into<Fragment> + Send,
51 ) -> crate::Result<FlowDef> {
52 let name = name.into();
53 let name_text = name.text().to_string();
54 let flow = self.find_flow_by_name(namespace, name.text()).await?;
55 match flow {
56 Some(f) => Ok(f),
57 None => {
58 let namespace = CatalogStore::get_namespace(self, namespace).await?;
59 return_error!(flow_not_found(name, &namespace.name, &name_text))
60 }
61 }
62 }
63}
64
65pub trait CatalogTrackFlowChangeOperations {
66 fn track_flow_def_created(&mut self, flow: FlowDef) -> crate::Result<()>;
67
68 fn track_flow_def_updated(&mut self, pre: FlowDef, post: FlowDef) -> crate::Result<()>;
69
70 fn track_flow_def_deleted(&mut self, flow: FlowDef) -> crate::Result<()>;
71}