reifydb_catalog/transaction/
source.rs1use async_trait::async_trait;
5use reifydb_core::{
6 error,
7 interface::{NamespaceId, QueryTransaction, SourceDef, SourceId},
8};
9use reifydb_type::internal;
10use tracing::instrument;
11
12use crate::{CatalogTableQueryOperations, CatalogViewQueryOperations};
13
14#[async_trait]
15pub trait CatalogSourceQueryOperations {
16 async fn find_source_by_name(
17 &mut self,
18 namespace: NamespaceId,
19 source: &str,
20 ) -> crate::Result<Option<SourceDef>>;
21
22 async fn find_source(&mut self, id: SourceId) -> crate::Result<Option<SourceDef>>;
23
24 async fn get_source(&mut self, id: SourceId) -> crate::Result<SourceDef>;
25
26 async fn get_source_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<SourceDef>;
27}
28
29#[async_trait]
30impl<T: QueryTransaction + CatalogTableQueryOperations + CatalogViewQueryOperations> CatalogSourceQueryOperations
31 for T
32{
33 #[instrument(name = "catalog::source::find_by_name", level = "trace", skip(self, _source))]
34 async fn find_source_by_name(
35 &mut self,
36 _namespace: NamespaceId,
37 _source: &str,
38 ) -> reifydb_core::Result<Option<SourceDef>> {
39 todo!()
40 }
41
42 #[instrument(name = "catalog::source::find", level = "trace", skip(self))]
43 async fn find_source(&mut self, id: SourceId) -> reifydb_core::Result<Option<SourceDef>> {
44 match id {
45 SourceId::Table(table_id) => {
46 Ok(self.find_table(table_id).await?.and_then(|s| Some(SourceDef::Table(s))))
47 }
48 SourceId::View(view_id) => {
49 Ok(self.find_view(view_id).await?.and_then(|s| Some(SourceDef::View(s))))
50 }
51 SourceId::Flow(_) => unimplemented!(),
52 SourceId::TableVirtual(_) => unimplemented!(),
53 SourceId::RingBuffer(_) => unimplemented!(),
54 SourceId::Dictionary(_) => unimplemented!(),
55 }
56 }
57
58 #[instrument(name = "catalog::source::get", level = "trace", skip(self))]
59 async fn get_source(&mut self, id: SourceId) -> reifydb_core::Result<SourceDef> {
60 self.find_source(id).await?.ok_or_else(|| {
61 error!(internal!(
62 "Source with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
63 id
64 ))
65 })
66 }
67
68 #[instrument(name = "catalog::source::get_by_name", level = "trace", skip(self, _name))]
69 async fn get_source_by_name(
70 &mut self,
71 _namespace: NamespaceId,
72 _name: &str,
73 ) -> reifydb_core::Result<SourceDef> {
74 todo!()
75 }
76}