reifydb_catalog/store/table/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	diagnostic::catalog::table_already_exists,
6	interface::{
7		ColumnPolicyKind, CommandTransaction, DictionaryId, NamespaceId, NamespaceTableKey, SourceId, TableDef,
8		TableId, TableKey,
9	},
10	retention::RetentionPolicy,
11	return_error,
12};
13use reifydb_type::{Fragment, TypeConstraint};
14
15use crate::{
16	CatalogStore,
17	store::{
18		column::{ColumnIndex, ColumnToCreate},
19		retention_policy::create::create_source_retention_policy,
20		sequence::SystemSequence,
21		table::layout::{table, table_namespace},
22	},
23};
24
25#[derive(Debug, Clone)]
26pub struct TableColumnToCreate {
27	pub name: String,
28	pub constraint: TypeConstraint,
29	pub policies: Vec<ColumnPolicyKind>,
30	pub auto_increment: bool,
31	pub fragment: Option<Fragment>,
32	pub dictionary_id: Option<DictionaryId>,
33}
34
35#[derive(Debug, Clone)]
36pub struct TableToCreate {
37	pub fragment: Option<Fragment>,
38	pub table: String,
39	pub namespace: NamespaceId,
40	pub columns: Vec<TableColumnToCreate>,
41	pub retention_policy: Option<RetentionPolicy>,
42}
43
44impl CatalogStore {
45	pub async fn create_table(
46		txn: &mut impl CommandTransaction,
47		to_create: TableToCreate,
48	) -> crate::Result<TableDef> {
49		let namespace_id = to_create.namespace;
50
51		if let Some(table) = CatalogStore::find_table_by_name(txn, namespace_id, &to_create.table).await? {
52			let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
53			return_error!(table_already_exists(
54				to_create.fragment.unwrap_or_else(|| Fragment::None),
55				&namespace.name,
56				&table.name
57			));
58		}
59
60		let table_id = SystemSequence::next_table_id(txn).await?;
61		Self::store_table(txn, table_id, namespace_id, &to_create).await?;
62		Self::link_table_to_namespace(txn, namespace_id, table_id, &to_create.table).await?;
63
64		if let Some(retention_policy) = &to_create.retention_policy {
65			create_source_retention_policy(txn, SourceId::Table(table_id), retention_policy).await?;
66		}
67
68		Self::insert_columns(txn, table_id, to_create).await?;
69
70		Ok(Self::get_table(txn, table_id).await?)
71	}
72
73	async fn store_table(
74		txn: &mut impl CommandTransaction,
75		table: TableId,
76		namespace: NamespaceId,
77		to_create: &TableToCreate,
78	) -> crate::Result<()> {
79		let mut row = table::LAYOUT.allocate();
80		table::LAYOUT.set_u64(&mut row, table::ID, table);
81		table::LAYOUT.set_u64(&mut row, table::NAMESPACE, namespace);
82		table::LAYOUT.set_utf8(&mut row, table::NAME, &to_create.table);
83
84		// Initialize with no primary key
85		table::LAYOUT.set_u64(&mut row, table::PRIMARY_KEY, 0u64);
86
87		txn.set(&TableKey::encoded(table), row).await?;
88
89		Ok(())
90	}
91
92	async fn link_table_to_namespace(
93		txn: &mut impl CommandTransaction,
94		namespace: NamespaceId,
95		table: TableId,
96		name: &str,
97	) -> crate::Result<()> {
98		let mut row = table_namespace::LAYOUT.allocate();
99		table_namespace::LAYOUT.set_u64(&mut row, table_namespace::ID, table);
100		table_namespace::LAYOUT.set_utf8(&mut row, table_namespace::NAME, name);
101		txn.set(&NamespaceTableKey::encoded(namespace, table), row).await?;
102		Ok(())
103	}
104
105	async fn insert_columns(
106		txn: &mut impl CommandTransaction,
107		table: TableId,
108		to_create: TableToCreate,
109	) -> crate::Result<()> {
110		// Look up namespace name for error messages
111		let namespace_name = Self::find_namespace(txn, to_create.namespace)
112			.await?
113			.map(|s| s.name)
114			.unwrap_or_else(|| format!("namespace_{}", to_create.namespace));
115
116		for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
117			Self::create_column(
118				txn,
119				table,
120				ColumnToCreate {
121					fragment: column_to_create.fragment.clone(),
122					namespace_name: namespace_name.clone(),
123					table,
124					table_name: to_create.table.clone(),
125					column: column_to_create.name,
126					constraint: column_to_create.constraint.clone(),
127					if_not_exists: false,
128					policies: column_to_create.policies.clone(),
129					index: ColumnIndex(idx as u8),
130					auto_increment: column_to_create.auto_increment,
131					dictionary_id: column_to_create.dictionary_id,
132				},
133			)
134			.await?;
135		}
136		Ok(())
137	}
138}
139
#[cfg(test)]
mod tests {
	use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceId, NamespaceTableKey, TableId};
	use reifydb_engine::test_utils::create_test_command_transaction;

	use crate::{
		CatalogStore,
		store::table::{TableToCreate, layout::table_namespace},
		test_utils::ensure_test_namespace,
	};

	/// Builds a request for a table named `name` in `namespace` that has no
	/// columns, no source fragment and no retention policy.
	fn columnless_table(namespace: NamespaceId, name: &str) -> TableToCreate {
		TableToCreate {
			fragment: None,
			table: name.to_string(),
			namespace,
			columns: vec![],
			retention_policy: None,
		}
	}

	/// Creating a table succeeds once; a second attempt with the same name
	/// in the same namespace is rejected.
	#[tokio::test]
	async fn test_create_table() {
		let mut txn = create_test_command_transaction().await;
		let namespace = ensure_test_namespace(&mut txn).await;
		let request = columnless_table(namespace.id, "test_table");

		// The first attempt goes through
		let created = CatalogStore::create_table(&mut txn, request.clone()).await.unwrap();
		assert_eq!(created.id, TableId(1025));
		assert_eq!(created.namespace, NamespaceId(1025));
		assert_eq!(created.name, "test_table");

		// The identical request must now fail as a duplicate
		let duplicate = CatalogStore::create_table(&mut txn, request).await.unwrap_err();
		assert_eq!(duplicate.diagnostic().code, "CA_003");
	}

	/// Every created table leaves one link row behind in its namespace.
	#[tokio::test]
	async fn test_table_linked_to_namespace() {
		let mut txn = create_test_command_transaction().await;
		let namespace = ensure_test_namespace(&mut txn).await;

		for name in ["test_table", "another_table"] {
			CatalogStore::create_table(&mut txn, columnless_table(namespace.id, name)).await.unwrap();
		}

		let scan = txn.range(NamespaceTableKey::full_scan(namespace.id)).await.unwrap();
		let links: Vec<_> = scan.items.into_iter().collect();
		assert_eq!(links.len(), 2);

		// The scan is expected to yield the table created last first
		let (newer, older) = (&links[0], &links[1]);

		assert_eq!(table_namespace::LAYOUT.get_u64(&older.values, table_namespace::ID), 1025);
		assert_eq!(table_namespace::LAYOUT.get_utf8(&older.values, table_namespace::NAME), "test_table");

		assert_eq!(table_namespace::LAYOUT.get_u64(&newer.values, table_namespace::ID), 1026);
		assert_eq!(table_namespace::LAYOUT.get_utf8(&newer.values, table_namespace::NAME), "another_table");
	}
}