reifydb_catalog/transaction/
view.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::interface::{
6	CommandTransaction, NamespaceId, QueryTransaction, TransactionalChanges, TransactionalViewChanges, ViewDef,
7	ViewId,
8	interceptor::{ViewDefInterceptor, WithInterceptors},
9};
10use reifydb_type::{
11	Fragment,
12	diagnostic::catalog::{view_already_exists, view_not_found},
13	error, internal, return_error,
14};
15use tracing::{instrument, warn};
16
17use crate::{
18	CatalogNamespaceQueryOperations, CatalogStore, store::view::ViewToCreate,
19	transaction::MaterializedCatalogTransaction,
20};
21
22#[async_trait]
23pub trait CatalogViewCommandOperations: Send {
24	async fn create_view(&mut self, view: ViewToCreate) -> crate::Result<ViewDef>;
25
26	// TODO: Implement when update/delete are ready
27	// async fn update_view(&mut self, view_id: ViewId, updates: ViewUpdates) ->
28	// crate::Result<ViewDef>; async fn delete_view(&mut self, view_id: ViewId)
29	// -> crate::Result<()>;
30}
31
32pub trait CatalogTrackViewChangeOperations {
33	fn track_view_def_created(&mut self, view: ViewDef) -> crate::Result<()>;
34
35	fn track_view_def_updated(&mut self, pre: ViewDef, post: ViewDef) -> crate::Result<()>;
36
37	fn track_view_def_deleted(&mut self, view: ViewDef) -> crate::Result<()>;
38}
39
40#[async_trait]
41pub trait CatalogViewQueryOperations: CatalogNamespaceQueryOperations {
42	async fn find_view(&mut self, id: ViewId) -> crate::Result<Option<ViewDef>>;
43
44	async fn find_view_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<Option<ViewDef>>;
45
46	async fn get_view(&mut self, id: ViewId) -> crate::Result<ViewDef>;
47
48	async fn get_view_by_name(
49		&mut self,
50		namespace: NamespaceId,
51		name: impl Into<Fragment> + Send,
52	) -> crate::Result<ViewDef>;
53}
54
55#[async_trait]
56impl<
57	CT: CommandTransaction
58		+ MaterializedCatalogTransaction
59		+ CatalogTrackViewChangeOperations
60		+ WithInterceptors<CT>
61		+ TransactionalChanges
62		+ Send
63		+ 'static,
64> CatalogViewCommandOperations for CT
65{
66	#[instrument(name = "catalog::view::create", level = "debug", skip(self, to_create))]
67	async fn create_view(&mut self, to_create: ViewToCreate) -> reifydb_core::Result<ViewDef> {
68		if let Some(view) = self.find_view_by_name(to_create.namespace, to_create.name.as_str()).await? {
69			let namespace = self.get_namespace(to_create.namespace).await?;
70			return_error!(view_already_exists(
71				to_create.fragment.unwrap_or_else(|| Fragment::None),
72				&namespace.name,
73				&view.name
74			));
75		}
76		let result = CatalogStore::create_deferred_view(self, to_create).await?;
77		self.track_view_def_created(result.clone())?;
78		ViewDefInterceptor::post_create(self, &result).await?;
79		Ok(result)
80	}
81}
82
83#[async_trait]
84impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges + Send + 'static>
85	CatalogViewQueryOperations for QT
86{
87	#[instrument(name = "catalog::view::find", level = "trace", skip(self))]
88	async fn find_view(&mut self, id: ViewId) -> reifydb_core::Result<Option<ViewDef>> {
89		// 1. Check transactional changes first
90		// nop for QueryTransaction
91		if let Some(view) = TransactionalViewChanges::find_view(self, id) {
92			return Ok(Some(view.clone()));
93		}
94
95		// 2. Check if deleted
96		// nop for QueryTransaction
97		if TransactionalViewChanges::is_view_deleted(self, id) {
98			return Ok(None);
99		}
100
101		// 3. Check MaterializedCatalog
102		if let Some(view) = self.catalog().find_view(id, self.version()) {
103			return Ok(Some(view));
104		}
105
106		// 4. Fall back to storage as defensive measure
107		if let Some(view) = CatalogStore::find_view(self, id).await? {
108			warn!("View with ID {:?} found in storage but not in MaterializedCatalog", id);
109			return Ok(Some(view));
110		}
111
112		Ok(None)
113	}
114
115	#[instrument(name = "catalog::view::find_by_name", level = "trace", skip(self, name))]
116	async fn find_view_by_name(
117		&mut self,
118		namespace: NamespaceId,
119		name: &str,
120	) -> reifydb_core::Result<Option<ViewDef>> {
121		// 1. Check transactional changes first
122		// nop for QueryTransaction
123		if let Some(view) = TransactionalViewChanges::find_view_by_name(self, namespace, name) {
124			return Ok(Some(view.clone()));
125		}
126
127		// 2. Check if deleted
128		// nop for QueryTransaction
129		if TransactionalViewChanges::is_view_deleted_by_name(self, namespace, name) {
130			return Ok(None);
131		}
132
133		// 3. Check MaterializedCatalog
134		if let Some(view) = self.catalog().find_view_by_name(namespace, name, self.version()) {
135			return Ok(Some(view));
136		}
137
138		// 4. Fall back to storage as defensive measure
139		if let Some(view) = CatalogStore::find_view_by_name(self, namespace, name).await? {
140			warn!(
141				"View '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
142				name, namespace
143			);
144			return Ok(Some(view));
145		}
146
147		Ok(None)
148	}
149
150	#[instrument(name = "catalog::view::get", level = "trace", skip(self))]
151	async fn get_view(&mut self, id: ViewId) -> reifydb_core::Result<ViewDef> {
152		self.find_view(id).await?.ok_or_else(|| {
153			error!(internal!(
154				"View with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
155				id
156			))
157		})
158	}
159
160	#[instrument(name = "catalog::view::get_by_name", level = "trace", skip(self, name))]
161	async fn get_view_by_name(
162		&mut self,
163		namespace: NamespaceId,
164		name: impl Into<Fragment> + Send,
165	) -> reifydb_core::Result<ViewDef> {
166		let name = name.into();
167
168		let namespace_name = self.get_namespace(namespace).await?.name;
169
170		self.find_view_by_name(namespace, name.text())
171			.await?
172			.ok_or_else(|| error!(view_not_found(name.clone(), &namespace_name, name.text())))
173	}
174}