reifydb_catalog/store/source/
get.rs1use reifydb_core::{
5 Error,
6 interface::{QueryTransaction, SourceDef, SourceId},
7};
8use reifydb_type::internal;
9
10use crate::CatalogStore;
11
12impl CatalogStore {
13 pub async fn get_source(
16 rx: &mut impl QueryTransaction,
17 source: impl Into<SourceId>,
18 ) -> crate::Result<SourceDef> {
19 let source_id = source.into();
20
21 CatalogStore::find_source(rx, source_id).await?.ok_or_else(|| {
22 let source_type = match source_id {
23 SourceId::Table(_) => "Table",
24 SourceId::View(_) => "View",
25 SourceId::Flow(_) => "Flow",
26 SourceId::TableVirtual(_) => "TableVirtual",
27 SourceId::RingBuffer(_) => "RingBuffer",
28 SourceId::Dictionary(_) => "Dictionary",
29 };
30
31 Error(internal!(
32 "{} with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
33 source_type,
34 source_id
35 ))
36 })
37 }
38}
39
#[cfg(test)]
mod tests {
    use reifydb_core::interface::{SourceDef, SourceId, TableId, ViewId};
    use reifydb_engine::test_utils::create_test_command_transaction;
    use reifydb_type::{Type, TypeConstraint};

    use crate::{
        CatalogStore,
        store::view::{ViewColumnToCreate, ViewToCreate},
        test_utils::{ensure_test_namespace, ensure_test_table},
    };

    /// A table can be fetched both by its bare table id and by an explicit
    /// `SourceId::Table`.
    #[tokio::test]
    async fn test_get_source_table() {
        let mut txn = create_test_command_transaction().await;
        let table = ensure_test_table(&mut txn).await;

        // Lookup relying on the id's conversion into `SourceId`.
        let by_table_id = CatalogStore::get_source(&mut txn, table.id).await.unwrap();
        let SourceDef::Table(found) = by_table_id else {
            panic!("Expected table")
        };
        assert_eq!(found.id, table.id);
        assert_eq!(found.name, table.name);

        // Lookup with the id wrapped explicitly.
        let by_source_id = CatalogStore::get_source(&mut txn, SourceId::Table(table.id)).await.unwrap();
        let SourceDef::Table(found) = by_source_id else {
            panic!("Expected table")
        };
        assert_eq!(found.id, table.id);
    }

    /// A deferred view can be fetched both by its bare view id and by an
    /// explicit `SourceId::View`.
    #[tokio::test]
    async fn test_get_source_view() {
        let mut txn = create_test_command_transaction().await;
        let namespace = ensure_test_namespace(&mut txn).await;

        let id_column = ViewColumnToCreate {
            name: "id".to_string(),
            constraint: TypeConstraint::unconstrained(Type::Uint8),
            fragment: None,
        };
        let to_create = ViewToCreate {
            fragment: None,
            namespace: namespace.id,
            name: "test_view".to_string(),
            columns: vec![id_column],
        };
        let view = CatalogStore::create_deferred_view(&mut txn, to_create).await.unwrap();

        // Lookup relying on the id's conversion into `SourceId`.
        let by_view_id = CatalogStore::get_source(&mut txn, view.id).await.unwrap();
        let SourceDef::View(found) = by_view_id else {
            panic!("Expected view")
        };
        assert_eq!(found.id, view.id);
        assert_eq!(found.name, view.name);

        // Lookup with the id wrapped explicitly.
        let by_source_id = CatalogStore::get_source(&mut txn, SourceId::View(view.id)).await.unwrap();
        let SourceDef::View(found) = by_source_id else {
            panic!("Expected view")
        };
        assert_eq!(found.id, view.id);
    }

    /// Requesting a table id that was never created must yield the
    /// "Table ... critical catalog inconsistency" error.
    #[tokio::test]
    async fn test_get_source_not_found_table() {
        let mut txn = create_test_command_transaction().await;

        let result = CatalogStore::get_source(&mut txn, TableId(999)).await;
        assert!(result.is_err());

        // Render the error once and check both fragments of the message.
        let message = result.unwrap_err().to_string();
        assert!(message.contains("Table with ID"));
        assert!(message.contains("critical catalog inconsistency"));
    }

    /// Requesting a view id that was never created must yield the
    /// "View ... critical catalog inconsistency" error.
    #[tokio::test]
    async fn test_get_source_not_found_view() {
        let mut txn = create_test_command_transaction().await;

        let result = CatalogStore::get_source(&mut txn, ViewId(999)).await;
        assert!(result.is_err());

        // Render the error once and check both fragments of the message.
        let message = result.unwrap_err().to_string();
        assert!(message.contains("View with ID"));
        assert!(message.contains("critical catalog inconsistency"));
    }
}