reifydb_catalog/store/view/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use ViewKind::Deferred;
5use reifydb_core::{
6	diagnostic::catalog::view_already_exists,
7	interface::{
8		ColumnIndex, CommandTransaction, NamespaceId, NamespaceViewKey, TableId, ViewDef, ViewId, ViewKey,
9		ViewKind, ViewKind::Transactional,
10	},
11	return_error,
12};
13use reifydb_type::{Fragment, TypeConstraint};
14
15use crate::{
16	CatalogStore,
17	store::{
18		column::ColumnToCreate,
19		sequence::SystemSequence,
20		view::layout::{view, view_namespace},
21	},
22};
23
/// Description of a single column to be created as part of a view.
#[derive(Debug, Clone)]
pub struct ViewColumnToCreate {
	/// Column name, forwarded as the `column` field of the column-creation request.
	pub name: String,
	/// Type constraint forwarded unchanged to the column-creation request.
	pub constraint: TypeConstraint,
	/// Optional fragment forwarded to the column-creation request.
	// NOTE(review): presumably the source location used in diagnostics — confirm.
	pub fragment: Option<Fragment>,
}
30
/// Request describing a view to be created in the catalog.
#[derive(Debug, Clone)]
pub struct ViewToCreate {
	/// Optional fragment handed to the `view_already_exists` diagnostic; when
	/// absent, `Fragment::None` is used instead.
	pub fragment: Option<Fragment>,
	/// Name of the view; checked for an existing view with the same name in `namespace`.
	pub name: String,
	/// Identifier of the namespace the view is created in.
	pub namespace: NamespaceId,
	/// Columns of the view; each column's index is its position in this list.
	pub columns: Vec<ViewColumnToCreate>,
}
38
39impl CatalogStore {
40	pub async fn create_deferred_view(
41		txn: &mut impl CommandTransaction,
42		to_create: ViewToCreate,
43	) -> crate::Result<ViewDef> {
44		Self::create_view(txn, to_create, Deferred).await
45	}
46
47	pub async fn create_transactional_view(
48		txn: &mut impl CommandTransaction,
49		to_create: ViewToCreate,
50	) -> crate::Result<ViewDef> {
51		Self::create_view(txn, to_create, Transactional).await
52	}
53
54	async fn create_view(
55		txn: &mut impl CommandTransaction,
56		to_create: ViewToCreate,
57		kind: ViewKind,
58	) -> crate::Result<ViewDef> {
59		let namespace_id = to_create.namespace;
60
61		if let Some(table) = CatalogStore::find_view_by_name(txn, namespace_id, &to_create.name).await? {
62			let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
63			return_error!(view_already_exists(
64				to_create.fragment.unwrap_or_else(|| Fragment::None),
65				&namespace.name,
66				&table.name
67			));
68		}
69
70		let view_id = SystemSequence::next_view_id(txn).await?;
71		Self::store_view(txn, view_id, namespace_id, &to_create, kind).await?;
72		Self::link_view_to_namespace(txn, namespace_id, view_id, &to_create.name).await?;
73
74		Self::insert_columns_for_view(txn, view_id, to_create).await?;
75
76		Ok(Self::get_view(txn, view_id).await?)
77	}
78
79	async fn store_view(
80		txn: &mut impl CommandTransaction,
81		view: ViewId,
82		namespace: NamespaceId,
83		to_create: &ViewToCreate,
84		kind: ViewKind,
85	) -> crate::Result<()> {
86		let mut row = view::LAYOUT.allocate();
87		view::LAYOUT.set_u64(&mut row, view::ID, view);
88		view::LAYOUT.set_u64(&mut row, view::NAMESPACE, namespace);
89		view::LAYOUT.set_utf8(&mut row, view::NAME, &to_create.name);
90		view::LAYOUT.set_u8(
91			&mut row,
92			view::KIND,
93			match kind {
94				Deferred => 0,
95				Transactional => 1,
96			},
97		);
98		view::LAYOUT.set_u64(&mut row, view::PRIMARY_KEY, 0u64); // Initialize with no primary key
99
100		txn.set(&ViewKey::encoded(view), row).await?;
101
102		Ok(())
103	}
104
105	async fn link_view_to_namespace(
106		txn: &mut impl CommandTransaction,
107		namespace: NamespaceId,
108		view: ViewId,
109		name: &str,
110	) -> crate::Result<()> {
111		let mut row = view_namespace::LAYOUT.allocate();
112		view_namespace::LAYOUT.set_u64(&mut row, view_namespace::ID, view);
113		view_namespace::LAYOUT.set_utf8(&mut row, view_namespace::NAME, name);
114		txn.set(&NamespaceViewKey::encoded(namespace, view), row).await?;
115		Ok(())
116	}
117
118	async fn insert_columns_for_view(
119		txn: &mut impl CommandTransaction,
120		view: ViewId,
121		to_create: ViewToCreate,
122	) -> crate::Result<()> {
123		// Look up namespace name for error messages
124		let namespace = Self::get_namespace(txn, to_create.namespace).await?;
125
126		for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
127			Self::create_column(
128				txn,
129				view,
130				ColumnToCreate {
131					fragment: column_to_create.fragment.clone(),
132					namespace_name: namespace.name.clone(),
133					table: TableId(view.0), // Convert ViewId to TableId (both are u64)
134					table_name: to_create.name.clone(),
135					column: column_to_create.name,
136					constraint: column_to_create.constraint.clone(),
137					if_not_exists: false,
138					policies: vec![],
139					index: ColumnIndex(idx as u8),
140					auto_increment: false,
141					dictionary_id: None, // Views don't support dictionaries yet
142				},
143			)
144			.await?;
145		}
146		Ok(())
147	}
148}
149
#[cfg(test)]
mod tests {
	use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceId, NamespaceViewKey, ViewId};
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{
		CatalogStore,
		store::view::{ViewToCreate, layout::view_namespace},
		test_utils::ensure_test_namespace,
	};

	/// Builds a request for a view called `name` in `namespace`, without columns or fragment.
	fn view_request(namespace: NamespaceId, name: &str) -> ViewToCreate {
		ViewToCreate {
			fragment: None,
			name: name.to_string(),
			namespace,
			columns: vec![],
		}
	}

	/// Creating a view succeeds once; reusing the name in the same namespace fails.
	#[tokio::test]
	async fn test_create_deferred_view() {
		let mut txn = create_test_command_transaction().await;
		let namespace = ensure_test_namespace(&mut txn).await;
		let request = view_request(namespace.id, "test_view");

		// The first attempt creates the view.
		let created = CatalogStore::create_deferred_view(&mut txn, request.clone()).await.unwrap();
		assert_eq!(created.id, ViewId(1025));
		assert_eq!(created.namespace, NamespaceId(1025));
		assert_eq!(created.name, "test_view");

		// The second attempt with the same name is rejected.
		let duplicate = CatalogStore::create_deferred_view(&mut txn, request).await.unwrap_err();
		assert_eq!(duplicate.diagnostic().code, "CA_003");
	}

	/// Every created view gets a link row under its namespace.
	#[tokio::test]
	async fn test_view_linked_to_namespace() {
		let mut txn = create_test_command_transaction().await;
		let namespace = ensure_test_namespace(&mut txn).await;

		let first = view_request(namespace.id, "test_view");
		CatalogStore::create_deferred_view(&mut txn, first).await.unwrap();

		let second = view_request(namespace.id, "another_view");
		CatalogStore::create_deferred_view(&mut txn, second).await.unwrap();

		let links = txn
			.range(NamespaceViewKey::full_scan(namespace.id))
			.await
			.unwrap()
			.items
			.into_iter()
			.collect::<Vec<_>>();
		assert_eq!(links.len(), 2);

		// (position in the scan result, expected view id, expected view name)
		for (position, expected_id, expected_name) in [(1usize, 1025, "test_view"), (0usize, 1026, "another_view")] {
			let row = &links[position].values;
			assert_eq!(view_namespace::LAYOUT.get_u64(row, view_namespace::ID), expected_id);
			assert_eq!(view_namespace::LAYOUT.get_utf8(row, view_namespace::NAME), expected_name);
		}
	}

	/// Creating a view in a namespace that does not exist returns an error.
	#[tokio::test]
	async fn test_create_deferred_view_missing_namespace() {
		let mut txn = create_test_command_transaction().await;

		// Non-existent namespace
		let request = view_request(NamespaceId(999), "my_view");

		CatalogStore::create_deferred_view(&mut txn, request).await.unwrap_err();
	}
}