reifydb_catalog/transaction/
source.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::{
6	error,
7	interface::{NamespaceId, QueryTransaction, SourceDef, SourceId},
8};
9use reifydb_type::internal;
10use tracing::instrument;
11
12use crate::{CatalogTableQueryOperations, CatalogViewQueryOperations};
13
14#[async_trait]
15pub trait CatalogSourceQueryOperations {
16	async fn find_source_by_name(
17		&mut self,
18		namespace: NamespaceId,
19		source: &str,
20	) -> crate::Result<Option<SourceDef>>;
21
22	async fn find_source(&mut self, id: SourceId) -> crate::Result<Option<SourceDef>>;
23
24	async fn get_source(&mut self, id: SourceId) -> crate::Result<SourceDef>;
25
26	async fn get_source_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<SourceDef>;
27}
28
29#[async_trait]
30impl<T: QueryTransaction + CatalogTableQueryOperations + CatalogViewQueryOperations> CatalogSourceQueryOperations
31	for T
32{
33	#[instrument(name = "catalog::source::find_by_name", level = "trace", skip(self, _source))]
34	async fn find_source_by_name(
35		&mut self,
36		_namespace: NamespaceId,
37		_source: &str,
38	) -> reifydb_core::Result<Option<SourceDef>> {
39		todo!()
40	}
41
42	#[instrument(name = "catalog::source::find", level = "trace", skip(self))]
43	async fn find_source(&mut self, id: SourceId) -> reifydb_core::Result<Option<SourceDef>> {
44		match id {
45			SourceId::Table(table_id) => {
46				Ok(self.find_table(table_id).await?.and_then(|s| Some(SourceDef::Table(s))))
47			}
48			SourceId::View(view_id) => {
49				Ok(self.find_view(view_id).await?.and_then(|s| Some(SourceDef::View(s))))
50			}
51			SourceId::Flow(_) => unimplemented!(),
52			SourceId::TableVirtual(_) => unimplemented!(),
53			SourceId::RingBuffer(_) => unimplemented!(),
54			SourceId::Dictionary(_) => unimplemented!(),
55		}
56	}
57
58	#[instrument(name = "catalog::source::get", level = "trace", skip(self))]
59	async fn get_source(&mut self, id: SourceId) -> reifydb_core::Result<SourceDef> {
60		self.find_source(id).await?.ok_or_else(|| {
61			error!(internal!(
62				"Source with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
63				id
64			))
65		})
66	}
67
68	#[instrument(name = "catalog::source::get_by_name", level = "trace", skip(self, _name))]
69	async fn get_source_by_name(
70		&mut self,
71		_namespace: NamespaceId,
72		_name: &str,
73	) -> reifydb_core::Result<SourceDef> {
74		todo!()
75	}
76}