reifydb_catalog/store/table/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	diagnostic::catalog::table_already_exists,
6	interface::{
7		ColumnPolicyKind, CommandTransaction, EncodableKey, Key, NamespaceId, NamespaceTableKey, SourceId,
8		TableDef, TableId, TableKey,
9	},
10	retention::RetentionPolicy,
11	return_error,
12};
13use reifydb_type::{OwnedFragment, TypeConstraint};
14
15use crate::{
16	CatalogStore,
17	store::{
18		column::{ColumnIndex, ColumnToCreate},
19		retention_policy::create::create_source_retention_policy,
20		sequence::SystemSequence,
21		table::layout::{table, table_namespace},
22	},
23};
24
25#[derive(Debug, Clone)]
26pub struct TableColumnToCreate {
27	pub name: String,
28	pub constraint: TypeConstraint,
29	pub policies: Vec<ColumnPolicyKind>,
30	pub auto_increment: bool,
31	pub fragment: Option<OwnedFragment>,
32}
33
34#[derive(Debug, Clone)]
35pub struct TableToCreate {
36	pub fragment: Option<OwnedFragment>,
37	pub table: String,
38	pub namespace: NamespaceId,
39	pub columns: Vec<TableColumnToCreate>,
40	pub retention_policy: Option<RetentionPolicy>,
41}
42
43impl CatalogStore {
44	pub fn create_table(txn: &mut impl CommandTransaction, to_create: TableToCreate) -> crate::Result<TableDef> {
45		let namespace_id = to_create.namespace;
46
47		if let Some(table) = CatalogStore::find_table_by_name(txn, namespace_id, &to_create.table)? {
48			let namespace = CatalogStore::get_namespace(txn, namespace_id)?;
49			return_error!(table_already_exists(to_create.fragment, &namespace.name, &table.name));
50		}
51
52		let table_id = SystemSequence::next_table_id(txn)?;
53		Self::store_table(txn, table_id, namespace_id, &to_create)?;
54		Self::link_table_to_namespace(txn, namespace_id, table_id, &to_create.table)?;
55
56		if let Some(retention_policy) = &to_create.retention_policy {
57			create_source_retention_policy(txn, SourceId::Table(table_id), retention_policy)?;
58		}
59
60		Self::insert_columns(txn, table_id, to_create)?;
61
62		Ok(Self::get_table(txn, table_id)?)
63	}
64
65	fn store_table(
66		txn: &mut impl CommandTransaction,
67		table: TableId,
68		namespace: NamespaceId,
69		to_create: &TableToCreate,
70	) -> crate::Result<()> {
71		let mut row = table::LAYOUT.allocate();
72		table::LAYOUT.set_u64(&mut row, table::ID, table);
73		table::LAYOUT.set_u64(&mut row, table::NAMESPACE, namespace);
74		table::LAYOUT.set_utf8(&mut row, table::NAME, &to_create.table);
75
76		// Initialize with no primary key
77		table::LAYOUT.set_u64(&mut row, table::PRIMARY_KEY, 0u64);
78
79		txn.set(
80			&TableKey {
81				table,
82			}
83			.encode(),
84			row,
85		)?;
86
87		Ok(())
88	}
89
90	fn link_table_to_namespace(
91		txn: &mut impl CommandTransaction,
92		namespace: NamespaceId,
93		table: TableId,
94		name: &str,
95	) -> crate::Result<()> {
96		let mut row = table_namespace::LAYOUT.allocate();
97		table_namespace::LAYOUT.set_u64(&mut row, table_namespace::ID, table);
98		table_namespace::LAYOUT.set_utf8(&mut row, table_namespace::NAME, name);
99		txn.set(
100			&Key::NamespaceTable(NamespaceTableKey {
101				namespace,
102				table,
103			})
104			.encode(),
105			row,
106		)?;
107		Ok(())
108	}
109
110	fn insert_columns(
111		txn: &mut impl CommandTransaction,
112		table: TableId,
113		to_create: TableToCreate,
114	) -> crate::Result<()> {
115		// Look up namespace name for error messages
116		let namespace_name = Self::find_namespace(txn, to_create.namespace)?
117			.map(|s| s.name)
118			.unwrap_or_else(|| format!("namespace_{}", to_create.namespace));
119
120		for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
121			Self::create_column(
122				txn,
123				table,
124				ColumnToCreate {
125					fragment: column_to_create.fragment.clone(),
126					namespace_name: &namespace_name,
127					table,
128					table_name: &to_create.table,
129					column: column_to_create.name,
130					constraint: column_to_create.constraint.clone(),
131					if_not_exists: false,
132					policies: column_to_create.policies.clone(),
133					index: ColumnIndex(idx as u16),
134					auto_increment: column_to_create.auto_increment,
135				},
136			)?;
137		}
138		Ok(())
139	}
140}
141
142#[cfg(test)]
143mod tests {
144	use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceId, NamespaceTableKey, TableId};
145	use reifydb_engine::test_utils::create_test_command_transaction;
146
147	use crate::{
148		CatalogStore,
149		store::table::{TableToCreate, layout::table_namespace},
150		test_utils::ensure_test_namespace,
151	};
152
153	#[test]
154	fn test_create_table() {
155		let mut txn = create_test_command_transaction();
156
157		let test_namespace = ensure_test_namespace(&mut txn);
158
159		let to_create = TableToCreate {
160			namespace: test_namespace.id,
161			table: "test_table".to_string(),
162			columns: vec![],
163			fragment: None,
164			retention_policy: None,
165		};
166
167		// First creation should succeed
168		let result = CatalogStore::create_table(&mut txn, to_create.clone()).unwrap();
169		assert_eq!(result.id, TableId(1025));
170		assert_eq!(result.namespace, NamespaceId(1025));
171		assert_eq!(result.name, "test_table");
172
173		let err = CatalogStore::create_table(&mut txn, to_create).unwrap_err();
174		assert_eq!(err.diagnostic().code, "CA_003");
175	}
176
177	#[test]
178	fn test_table_linked_to_namespace() {
179		let mut txn = create_test_command_transaction();
180		let test_namespace = ensure_test_namespace(&mut txn);
181
182		let to_create = TableToCreate {
183			namespace: test_namespace.id,
184			table: "test_table".to_string(),
185			columns: vec![],
186			fragment: None,
187			retention_policy: None,
188		};
189
190		CatalogStore::create_table(&mut txn, to_create).unwrap();
191
192		let to_create = TableToCreate {
193			namespace: test_namespace.id,
194			table: "another_table".to_string(),
195			columns: vec![],
196			fragment: None,
197			retention_policy: None,
198		};
199
200		CatalogStore::create_table(&mut txn, to_create).unwrap();
201
202		let links = txn.range(NamespaceTableKey::full_scan(test_namespace.id)).unwrap().collect::<Vec<_>>();
203		assert_eq!(links.len(), 2);
204
205		let link = &links[1];
206		let row = &link.values;
207		assert_eq!(table_namespace::LAYOUT.get_u64(row, table_namespace::ID), 1025);
208		assert_eq!(table_namespace::LAYOUT.get_utf8(row, table_namespace::NAME), "test_table");
209
210		let link = &links[0];
211		let row = &link.values;
212		assert_eq!(table_namespace::LAYOUT.get_u64(row, table_namespace::ID), 1026);
213		assert_eq!(table_namespace::LAYOUT.get_utf8(row, table_namespace::NAME), "another_table");
214	}
215}