reifydb_catalog/store/source/
find.rs1use std::sync::Arc;
5
6use reifydb_core::interface::{QueryTransaction, SourceDef, SourceId};
7
8use crate::{CatalogStore, table_virtual::VirtualTableRegistry};
9
10impl CatalogStore {
11 pub async fn find_source(
14 rx: &mut impl QueryTransaction,
15 source: impl Into<SourceId>,
16 ) -> crate::Result<Option<SourceDef>> {
17 let source_id = source.into();
18
19 match source_id {
20 SourceId::Table(table_id) => {
21 if let Some(table) = Self::find_table(rx, table_id).await? {
22 Ok(Some(SourceDef::Table(table)))
23 } else {
24 Ok(None)
25 }
26 }
27 SourceId::View(view_id) => {
28 if let Some(view) = Self::find_view(rx, view_id).await? {
29 Ok(Some(SourceDef::View(view)))
30 } else {
31 Ok(None)
32 }
33 }
34 SourceId::Flow(flow_id) => {
35 if let Some(flow) = Self::find_flow(rx, flow_id).await? {
36 Ok(Some(SourceDef::Flow(flow)))
37 } else {
38 Ok(None)
39 }
40 }
41 SourceId::TableVirtual(table_virtual_id) => {
42 if let Some(table_virtual) =
43 VirtualTableRegistry::find_table_virtual(rx, table_virtual_id)?
44 {
45 let table_virtual_def =
47 Arc::try_unwrap(table_virtual).unwrap_or_else(|arc| (*arc).clone());
48 Ok(Some(SourceDef::TableVirtual(table_virtual_def)))
49 } else {
50 Ok(None)
51 }
52 }
53 SourceId::RingBuffer(_ringbuffer_id) => {
54 Ok(None)
58 }
59 SourceId::Dictionary(_dictionary_id) => {
60 Ok(None)
64 }
65 }
66 }
67}
68
69#[cfg(test)]
70mod tests {
71 use reifydb_core::interface::{SourceDef, SourceId, TableId, TableVirtualId, ViewId};
72 use reifydb_engine::test_utils::create_test_command_transaction;
73 use reifydb_type::{Type, TypeConstraint};
74
75 use crate::{
76 CatalogStore,
77 store::view::{ViewColumnToCreate, ViewToCreate},
78 test_utils::{ensure_test_namespace, ensure_test_table},
79 };
80
81 #[tokio::test]
82 async fn test_find_source_table() {
83 let mut txn = create_test_command_transaction().await;
84 let table = ensure_test_table(&mut txn).await;
85
86 let source = CatalogStore::find_source(&mut txn, table.id).await.unwrap().expect("Source should exist");
88
89 match source {
90 SourceDef::Table(t) => {
91 assert_eq!(t.id, table.id);
92 assert_eq!(t.name, table.name);
93 }
94 _ => panic!("Expected table"),
95 }
96
97 let source = CatalogStore::find_source(&mut txn, SourceId::Table(table.id))
99 .await
100 .unwrap()
101 .expect("Source should exist");
102
103 match source {
104 SourceDef::Table(t) => {
105 assert_eq!(t.id, table.id);
106 }
107 _ => panic!("Expected table"),
108 }
109 }
110
111 #[tokio::test]
112 async fn test_find_source_view() {
113 let mut txn = create_test_command_transaction().await;
114 let namespace = ensure_test_namespace(&mut txn).await;
115
116 let view = CatalogStore::create_deferred_view(
117 &mut txn,
118 ViewToCreate {
119 fragment: None,
120 namespace: namespace.id,
121 name: "test_view".to_string(),
122 columns: vec![ViewColumnToCreate {
123 name: "id".to_string(),
124 constraint: TypeConstraint::unconstrained(Type::Uint8),
125 fragment: None,
126 }],
127 },
128 )
129 .await
130 .unwrap();
131
132 let source = CatalogStore::find_source(&mut txn, view.id).await.unwrap().expect("Source should exist");
134
135 match source {
136 SourceDef::View(v) => {
137 assert_eq!(v.id, view.id);
138 assert_eq!(v.name, view.name);
139 }
140 _ => panic!("Expected view"),
141 }
142
143 let source = CatalogStore::find_source(&mut txn, SourceId::View(view.id))
145 .await
146 .unwrap()
147 .expect("Source should exist");
148
149 match source {
150 SourceDef::View(v) => {
151 assert_eq!(v.id, view.id);
152 }
153 _ => panic!("Expected view"),
154 }
155 }
156
157 #[tokio::test]
158 async fn test_find_source_not_found() {
159 let mut txn = create_test_command_transaction().await;
160
161 let source = CatalogStore::find_source(&mut txn, TableId(999)).await.unwrap();
163 assert!(source.is_none());
164
165 let source = CatalogStore::find_source(&mut txn, ViewId(999)).await.unwrap();
167 assert!(source.is_none());
168
169 let source = CatalogStore::find_source(&mut txn, TableVirtualId(999)).await.unwrap();
171 assert!(source.is_none());
172 }
173
174 #[tokio::test]
175 async fn test_find_source_table_virtual() {
176 let mut txn = create_test_command_transaction().await;
177
178 let sequences_id = crate::system::ids::table_virtual::SEQUENCES;
180 let source = CatalogStore::find_source(&mut txn, sequences_id)
181 .await
182 .unwrap()
183 .expect("Sequences virtual table should exist");
184
185 match source {
186 SourceDef::TableVirtual(tv) => {
187 assert_eq!(tv.id, sequences_id);
188 assert_eq!(tv.name, "sequences");
189 }
190 _ => panic!("Expected virtual table"),
191 }
192
193 let source = CatalogStore::find_source(&mut txn, SourceId::TableVirtual(sequences_id))
195 .await
196 .unwrap()
197 .expect("Source should exist");
198
199 match source {
200 SourceDef::TableVirtual(tv) => {
201 assert_eq!(tv.id, sequences_id);
202 }
203 _ => panic!("Expected virtual table"),
204 }
205 }
206}