reifydb_catalog/transaction/
view.rs1use async_trait::async_trait;
5use reifydb_core::interface::{
6 CommandTransaction, NamespaceId, QueryTransaction, TransactionalChanges, TransactionalViewChanges, ViewDef,
7 ViewId,
8 interceptor::{ViewDefInterceptor, WithInterceptors},
9};
10use reifydb_type::{
11 Fragment,
12 diagnostic::catalog::{view_already_exists, view_not_found},
13 error, internal, return_error,
14};
15use tracing::{instrument, warn};
16
17use crate::{
18 CatalogNamespaceQueryOperations, CatalogStore, store::view::ViewToCreate,
19 transaction::MaterializedCatalogTransaction,
20};
21
22#[async_trait]
23pub trait CatalogViewCommandOperations: Send {
24 async fn create_view(&mut self, view: ViewToCreate) -> crate::Result<ViewDef>;
25
26 }
31
32pub trait CatalogTrackViewChangeOperations {
33 fn track_view_def_created(&mut self, view: ViewDef) -> crate::Result<()>;
34
35 fn track_view_def_updated(&mut self, pre: ViewDef, post: ViewDef) -> crate::Result<()>;
36
37 fn track_view_def_deleted(&mut self, view: ViewDef) -> crate::Result<()>;
38}
39
40#[async_trait]
41pub trait CatalogViewQueryOperations: CatalogNamespaceQueryOperations {
42 async fn find_view(&mut self, id: ViewId) -> crate::Result<Option<ViewDef>>;
43
44 async fn find_view_by_name(&mut self, namespace: NamespaceId, name: &str) -> crate::Result<Option<ViewDef>>;
45
46 async fn get_view(&mut self, id: ViewId) -> crate::Result<ViewDef>;
47
48 async fn get_view_by_name(
49 &mut self,
50 namespace: NamespaceId,
51 name: impl Into<Fragment> + Send,
52 ) -> crate::Result<ViewDef>;
53}
54
55#[async_trait]
56impl<
57 CT: CommandTransaction
58 + MaterializedCatalogTransaction
59 + CatalogTrackViewChangeOperations
60 + WithInterceptors<CT>
61 + TransactionalChanges
62 + Send
63 + 'static,
64> CatalogViewCommandOperations for CT
65{
66 #[instrument(name = "catalog::view::create", level = "debug", skip(self, to_create))]
67 async fn create_view(&mut self, to_create: ViewToCreate) -> reifydb_core::Result<ViewDef> {
68 if let Some(view) = self.find_view_by_name(to_create.namespace, to_create.name.as_str()).await? {
69 let namespace = self.get_namespace(to_create.namespace).await?;
70 return_error!(view_already_exists(
71 to_create.fragment.unwrap_or_else(|| Fragment::None),
72 &namespace.name,
73 &view.name
74 ));
75 }
76 let result = CatalogStore::create_deferred_view(self, to_create).await?;
77 self.track_view_def_created(result.clone())?;
78 ViewDefInterceptor::post_create(self, &result).await?;
79 Ok(result)
80 }
81}
82
83#[async_trait]
84impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges + Send + 'static>
85 CatalogViewQueryOperations for QT
86{
87 #[instrument(name = "catalog::view::find", level = "trace", skip(self))]
88 async fn find_view(&mut self, id: ViewId) -> reifydb_core::Result<Option<ViewDef>> {
89 if let Some(view) = TransactionalViewChanges::find_view(self, id) {
92 return Ok(Some(view.clone()));
93 }
94
95 if TransactionalViewChanges::is_view_deleted(self, id) {
98 return Ok(None);
99 }
100
101 if let Some(view) = self.catalog().find_view(id, self.version()) {
103 return Ok(Some(view));
104 }
105
106 if let Some(view) = CatalogStore::find_view(self, id).await? {
108 warn!("View with ID {:?} found in storage but not in MaterializedCatalog", id);
109 return Ok(Some(view));
110 }
111
112 Ok(None)
113 }
114
115 #[instrument(name = "catalog::view::find_by_name", level = "trace", skip(self, name))]
116 async fn find_view_by_name(
117 &mut self,
118 namespace: NamespaceId,
119 name: &str,
120 ) -> reifydb_core::Result<Option<ViewDef>> {
121 if let Some(view) = TransactionalViewChanges::find_view_by_name(self, namespace, name) {
124 return Ok(Some(view.clone()));
125 }
126
127 if TransactionalViewChanges::is_view_deleted_by_name(self, namespace, name) {
130 return Ok(None);
131 }
132
133 if let Some(view) = self.catalog().find_view_by_name(namespace, name, self.version()) {
135 return Ok(Some(view));
136 }
137
138 if let Some(view) = CatalogStore::find_view_by_name(self, namespace, name).await? {
140 warn!(
141 "View '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
142 name, namespace
143 );
144 return Ok(Some(view));
145 }
146
147 Ok(None)
148 }
149
150 #[instrument(name = "catalog::view::get", level = "trace", skip(self))]
151 async fn get_view(&mut self, id: ViewId) -> reifydb_core::Result<ViewDef> {
152 self.find_view(id).await?.ok_or_else(|| {
153 error!(internal!(
154 "View with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
155 id
156 ))
157 })
158 }
159
160 #[instrument(name = "catalog::view::get_by_name", level = "trace", skip(self, name))]
161 async fn get_view_by_name(
162 &mut self,
163 namespace: NamespaceId,
164 name: impl Into<Fragment> + Send,
165 ) -> reifydb_core::Result<ViewDef> {
166 let name = name.into();
167
168 let namespace_name = self.get_namespace(namespace).await?.name;
169
170 self.find_view_by_name(namespace, name.text())
171 .await?
172 .ok_or_else(|| error!(view_not_found(name.clone(), &namespace_name, name.text())))
173 }
174}