reifydb_catalog/store/table/
create.rs1use reifydb_core::{
5 diagnostic::catalog::table_already_exists,
6 interface::{
7 ColumnPolicyKind, CommandTransaction, EncodableKey, Key, NamespaceId, NamespaceTableKey, SourceId,
8 TableDef, TableId, TableKey,
9 },
10 retention::RetentionPolicy,
11 return_error,
12};
13use reifydb_type::{OwnedFragment, TypeConstraint};
14
15use crate::{
16 CatalogStore,
17 store::{
18 column::{ColumnIndex, ColumnToCreate},
19 retention_policy::create::create_source_retention_policy,
20 sequence::SystemSequence,
21 table::layout::{table, table_namespace},
22 },
23};
24
25#[derive(Debug, Clone)]
26pub struct TableColumnToCreate {
27 pub name: String,
28 pub constraint: TypeConstraint,
29 pub policies: Vec<ColumnPolicyKind>,
30 pub auto_increment: bool,
31 pub fragment: Option<OwnedFragment>,
32}
33
34#[derive(Debug, Clone)]
35pub struct TableToCreate {
36 pub fragment: Option<OwnedFragment>,
37 pub table: String,
38 pub namespace: NamespaceId,
39 pub columns: Vec<TableColumnToCreate>,
40 pub retention_policy: Option<RetentionPolicy>,
41}
42
43impl CatalogStore {
44 pub fn create_table(txn: &mut impl CommandTransaction, to_create: TableToCreate) -> crate::Result<TableDef> {
45 let namespace_id = to_create.namespace;
46
47 if let Some(table) = CatalogStore::find_table_by_name(txn, namespace_id, &to_create.table)? {
48 let namespace = CatalogStore::get_namespace(txn, namespace_id)?;
49 return_error!(table_already_exists(to_create.fragment, &namespace.name, &table.name));
50 }
51
52 let table_id = SystemSequence::next_table_id(txn)?;
53 Self::store_table(txn, table_id, namespace_id, &to_create)?;
54 Self::link_table_to_namespace(txn, namespace_id, table_id, &to_create.table)?;
55
56 if let Some(retention_policy) = &to_create.retention_policy {
57 create_source_retention_policy(txn, SourceId::Table(table_id), retention_policy)?;
58 }
59
60 Self::insert_columns(txn, table_id, to_create)?;
61
62 Ok(Self::get_table(txn, table_id)?)
63 }
64
65 fn store_table(
66 txn: &mut impl CommandTransaction,
67 table: TableId,
68 namespace: NamespaceId,
69 to_create: &TableToCreate,
70 ) -> crate::Result<()> {
71 let mut row = table::LAYOUT.allocate();
72 table::LAYOUT.set_u64(&mut row, table::ID, table);
73 table::LAYOUT.set_u64(&mut row, table::NAMESPACE, namespace);
74 table::LAYOUT.set_utf8(&mut row, table::NAME, &to_create.table);
75
76 table::LAYOUT.set_u64(&mut row, table::PRIMARY_KEY, 0u64);
78
79 txn.set(
80 &TableKey {
81 table,
82 }
83 .encode(),
84 row,
85 )?;
86
87 Ok(())
88 }
89
90 fn link_table_to_namespace(
91 txn: &mut impl CommandTransaction,
92 namespace: NamespaceId,
93 table: TableId,
94 name: &str,
95 ) -> crate::Result<()> {
96 let mut row = table_namespace::LAYOUT.allocate();
97 table_namespace::LAYOUT.set_u64(&mut row, table_namespace::ID, table);
98 table_namespace::LAYOUT.set_utf8(&mut row, table_namespace::NAME, name);
99 txn.set(
100 &Key::NamespaceTable(NamespaceTableKey {
101 namespace,
102 table,
103 })
104 .encode(),
105 row,
106 )?;
107 Ok(())
108 }
109
110 fn insert_columns(
111 txn: &mut impl CommandTransaction,
112 table: TableId,
113 to_create: TableToCreate,
114 ) -> crate::Result<()> {
115 let namespace_name = Self::find_namespace(txn, to_create.namespace)?
117 .map(|s| s.name)
118 .unwrap_or_else(|| format!("namespace_{}", to_create.namespace));
119
120 for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
121 Self::create_column(
122 txn,
123 table,
124 ColumnToCreate {
125 fragment: column_to_create.fragment.clone(),
126 namespace_name: &namespace_name,
127 table,
128 table_name: &to_create.table,
129 column: column_to_create.name,
130 constraint: column_to_create.constraint.clone(),
131 if_not_exists: false,
132 policies: column_to_create.policies.clone(),
133 index: ColumnIndex(idx as u16),
134 auto_increment: column_to_create.auto_increment,
135 },
136 )?;
137 }
138 Ok(())
139 }
140}
141
#[cfg(test)]
mod tests {
    use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceId, NamespaceTableKey, TableId};
    use reifydb_engine::test_utils::create_test_command_transaction;

    use crate::{
        CatalogStore,
        store::table::{TableToCreate, layout::table_namespace},
        test_utils::ensure_test_namespace,
    };

    /// Builds a request for a column-less table called `name` in `namespace`,
    /// without a fragment or retention policy.
    fn empty_table(namespace: NamespaceId, name: &str) -> TableToCreate {
        TableToCreate {
            fragment: None,
            table: name.to_string(),
            namespace,
            columns: vec![],
            retention_policy: None,
        }
    }

    /// A first creation succeeds; repeating it with the same name is rejected.
    #[test]
    fn test_create_table() {
        let mut txn = create_test_command_transaction();
        let test_namespace = ensure_test_namespace(&mut txn);

        let created = CatalogStore::create_table(&mut txn, empty_table(test_namespace.id, "test_table")).unwrap();
        assert_eq!(created.id, TableId(1025));
        assert_eq!(created.namespace, NamespaceId(1025));
        assert_eq!(created.name, "test_table");

        let duplicate = CatalogStore::create_table(&mut txn, empty_table(test_namespace.id, "test_table"));
        assert_eq!(duplicate.unwrap_err().diagnostic().code, "CA_003");
    }

    /// Every created table gets a namespace link row carrying its id and name.
    #[test]
    fn test_table_linked_to_namespace() {
        let mut txn = create_test_command_transaction();
        let test_namespace = ensure_test_namespace(&mut txn);

        for name in ["test_table", "another_table"] {
            CatalogStore::create_table(&mut txn, empty_table(test_namespace.id, name)).unwrap();
        }

        let links = txn.range(NamespaceTableKey::full_scan(test_namespace.id)).unwrap().collect::<Vec<_>>();
        assert_eq!(links.len(), 2);

        // The scan yields the most recently created table first.
        let expected = [(1026u64, "another_table"), (1025u64, "test_table")];
        for (link, (id, name)) in links.iter().zip(expected) {
            let row = &link.values;
            assert_eq!(table_namespace::LAYOUT.get_u64(row, table_namespace::ID), id);
            assert_eq!(table_namespace::LAYOUT.get_utf8(row, table_namespace::NAME), name);
        }
    }
}