reifydb_catalog/store/source/
find.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use reifydb_core::interface::{QueryTransaction, SourceDef, SourceId};
7
8use crate::{CatalogStore, table_virtual::VirtualTableRegistry};
9
10impl CatalogStore {
11	/// Find a source (table, store::view, or virtual table) by its SourceId
12	/// Returns None if the source doesn't exist
13	pub async fn find_source(
14		rx: &mut impl QueryTransaction,
15		source: impl Into<SourceId>,
16	) -> crate::Result<Option<SourceDef>> {
17		let source_id = source.into();
18
19		match source_id {
20			SourceId::Table(table_id) => {
21				if let Some(table) = Self::find_table(rx, table_id).await? {
22					Ok(Some(SourceDef::Table(table)))
23				} else {
24					Ok(None)
25				}
26			}
27			SourceId::View(view_id) => {
28				if let Some(view) = Self::find_view(rx, view_id).await? {
29					Ok(Some(SourceDef::View(view)))
30				} else {
31					Ok(None)
32				}
33			}
34			SourceId::Flow(flow_id) => {
35				if let Some(flow) = Self::find_flow(rx, flow_id).await? {
36					Ok(Some(SourceDef::Flow(flow)))
37				} else {
38					Ok(None)
39				}
40			}
41			SourceId::TableVirtual(table_virtual_id) => {
42				if let Some(table_virtual) =
43					VirtualTableRegistry::find_table_virtual(rx, table_virtual_id)?
44				{
45					// Convert Arc<TableVirtualDef> to TableVirtualDef
46					let table_virtual_def =
47						Arc::try_unwrap(table_virtual).unwrap_or_else(|arc| (*arc).clone());
48					Ok(Some(SourceDef::TableVirtual(table_virtual_def)))
49				} else {
50					Ok(None)
51				}
52			}
53			SourceId::RingBuffer(_ringbuffer_id) => {
54				// TODO: Implement find_ringbuffer when ring
55				// buffer catalog is ready For now, ring
56				// buffers are not yet queryable
57				Ok(None)
58			}
59			SourceId::Dictionary(_dictionary_id) => {
60				// TODO: Implement find_dictionary when dictionary
61				// catalog is ready For now, dictionaries return
62				// None as they use a different retrieval mechanism
63				Ok(None)
64			}
65		}
66	}
67}
68
#[cfg(test)]
mod tests {
	use reifydb_core::interface::{SourceDef, SourceId, TableId, TableVirtualId, ViewId};
	use reifydb_engine::test_utils::create_test_command_transaction;
	use reifydb_type::{Type, TypeConstraint};

	use crate::{
		CatalogStore,
		store::view::{ViewColumnToCreate, ViewToCreate},
		test_utils::{ensure_test_namespace, ensure_test_table},
	};

	/// A table is resolvable both from its bare `TableId` and from the
	/// wrapping `SourceId::Table`.
	#[tokio::test]
	async fn test_find_source_table() {
		let mut txn = create_test_command_transaction().await;
		let expected = ensure_test_table(&mut txn).await;

		// Lookup via the bare TableId (converted through Into<SourceId>).
		let by_table_id =
			CatalogStore::find_source(&mut txn, expected.id).await.unwrap().expect("Source should exist");
		let SourceDef::Table(found) = by_table_id else {
			panic!("Expected table")
		};
		assert_eq!(found.id, expected.id);
		assert_eq!(found.name, expected.name);

		// Lookup via an explicit SourceId::Table.
		let by_source_id = CatalogStore::find_source(&mut txn, SourceId::Table(expected.id))
			.await
			.unwrap()
			.expect("Source should exist");
		let SourceDef::Table(found) = by_source_id else {
			panic!("Expected table")
		};
		assert_eq!(found.id, expected.id);
	}

	/// A deferred view is resolvable both from its bare `ViewId` and from
	/// the wrapping `SourceId::View`.
	#[tokio::test]
	async fn test_find_source_view() {
		let mut txn = create_test_command_transaction().await;
		let namespace = ensure_test_namespace(&mut txn).await;

		let id_column = ViewColumnToCreate {
			name: "id".to_string(),
			constraint: TypeConstraint::unconstrained(Type::Uint8),
			fragment: None,
		};
		let to_create = ViewToCreate {
			fragment: None,
			namespace: namespace.id,
			name: "test_view".to_string(),
			columns: vec![id_column],
		};
		let expected = CatalogStore::create_deferred_view(&mut txn, to_create).await.unwrap();

		// Lookup via the bare ViewId (converted through Into<SourceId>).
		let by_view_id =
			CatalogStore::find_source(&mut txn, expected.id).await.unwrap().expect("Source should exist");
		let SourceDef::View(found) = by_view_id else {
			panic!("Expected view")
		};
		assert_eq!(found.id, expected.id);
		assert_eq!(found.name, expected.name);

		// Lookup via an explicit SourceId::View.
		let by_source_id = CatalogStore::find_source(&mut txn, SourceId::View(expected.id))
			.await
			.unwrap()
			.expect("Source should exist");
		let SourceDef::View(found) = by_source_id else {
			panic!("Expected view")
		};
		assert_eq!(found.id, expected.id);
	}

	/// Unknown ids resolve to `None` rather than producing an error.
	#[tokio::test]
	async fn test_find_source_not_found() {
		let mut txn = create_test_command_transaction().await;

		// Table id that was never created.
		let missing_table = CatalogStore::find_source(&mut txn, TableId(999)).await.unwrap();
		assert!(missing_table.is_none());

		// View id that was never created.
		let missing_view = CatalogStore::find_source(&mut txn, ViewId(999)).await.unwrap();
		assert!(missing_view.is_none());

		// Virtual table id that is not registered.
		let missing_virtual = CatalogStore::find_source(&mut txn, TableVirtualId(999)).await.unwrap();
		assert!(missing_virtual.is_none());
	}

	/// The system `sequences` virtual table is resolvable both from its
	/// bare id and from the wrapping `SourceId::TableVirtual`.
	#[tokio::test]
	async fn test_find_source_table_virtual() {
		let mut txn = create_test_command_transaction().await;

		// Id of the sequences virtual table.
		let sequences_id = crate::system::ids::table_virtual::SEQUENCES;

		// Lookup via the bare id (converted through Into<SourceId>).
		let by_virtual_id = CatalogStore::find_source(&mut txn, sequences_id)
			.await
			.unwrap()
			.expect("Sequences virtual table should exist");
		let SourceDef::TableVirtual(found) = by_virtual_id else {
			panic!("Expected virtual table")
		};
		assert_eq!(found.id, sequences_id);
		assert_eq!(found.name, "sequences");

		// Lookup via an explicit SourceId::TableVirtual.
		let by_source_id = CatalogStore::find_source(&mut txn, SourceId::TableVirtual(sequences_id))
			.await
			.unwrap()
			.expect("Source should exist");
		let SourceDef::TableVirtual(found) = by_source_id else {
			panic!("Expected virtual table")
		};
		assert_eq!(found.id, sequences_id);
	}
}