reifydb_catalog/store/table/
create.rs1use reifydb_core::{
5 diagnostic::catalog::table_already_exists,
6 interface::{
7 ColumnPolicyKind, CommandTransaction, DictionaryId, NamespaceId, NamespaceTableKey, SourceId, TableDef,
8 TableId, TableKey,
9 },
10 retention::RetentionPolicy,
11 return_error,
12};
13use reifydb_type::{Fragment, TypeConstraint};
14
15use crate::{
16 CatalogStore,
17 store::{
18 column::{ColumnIndex, ColumnToCreate},
19 retention_policy::create::create_source_retention_policy,
20 sequence::SystemSequence,
21 table::layout::{table, table_namespace},
22 },
23};
24
25#[derive(Debug, Clone)]
26pub struct TableColumnToCreate {
27 pub name: String,
28 pub constraint: TypeConstraint,
29 pub policies: Vec<ColumnPolicyKind>,
30 pub auto_increment: bool,
31 pub fragment: Option<Fragment>,
32 pub dictionary_id: Option<DictionaryId>,
33}
34
35#[derive(Debug, Clone)]
36pub struct TableToCreate {
37 pub fragment: Option<Fragment>,
38 pub table: String,
39 pub namespace: NamespaceId,
40 pub columns: Vec<TableColumnToCreate>,
41 pub retention_policy: Option<RetentionPolicy>,
42}
43
44impl CatalogStore {
45 pub async fn create_table(
46 txn: &mut impl CommandTransaction,
47 to_create: TableToCreate,
48 ) -> crate::Result<TableDef> {
49 let namespace_id = to_create.namespace;
50
51 if let Some(table) = CatalogStore::find_table_by_name(txn, namespace_id, &to_create.table).await? {
52 let namespace = CatalogStore::get_namespace(txn, namespace_id).await?;
53 return_error!(table_already_exists(
54 to_create.fragment.unwrap_or_else(|| Fragment::None),
55 &namespace.name,
56 &table.name
57 ));
58 }
59
60 let table_id = SystemSequence::next_table_id(txn).await?;
61 Self::store_table(txn, table_id, namespace_id, &to_create).await?;
62 Self::link_table_to_namespace(txn, namespace_id, table_id, &to_create.table).await?;
63
64 if let Some(retention_policy) = &to_create.retention_policy {
65 create_source_retention_policy(txn, SourceId::Table(table_id), retention_policy).await?;
66 }
67
68 Self::insert_columns(txn, table_id, to_create).await?;
69
70 Ok(Self::get_table(txn, table_id).await?)
71 }
72
73 async fn store_table(
74 txn: &mut impl CommandTransaction,
75 table: TableId,
76 namespace: NamespaceId,
77 to_create: &TableToCreate,
78 ) -> crate::Result<()> {
79 let mut row = table::LAYOUT.allocate();
80 table::LAYOUT.set_u64(&mut row, table::ID, table);
81 table::LAYOUT.set_u64(&mut row, table::NAMESPACE, namespace);
82 table::LAYOUT.set_utf8(&mut row, table::NAME, &to_create.table);
83
84 table::LAYOUT.set_u64(&mut row, table::PRIMARY_KEY, 0u64);
86
87 txn.set(&TableKey::encoded(table), row).await?;
88
89 Ok(())
90 }
91
92 async fn link_table_to_namespace(
93 txn: &mut impl CommandTransaction,
94 namespace: NamespaceId,
95 table: TableId,
96 name: &str,
97 ) -> crate::Result<()> {
98 let mut row = table_namespace::LAYOUT.allocate();
99 table_namespace::LAYOUT.set_u64(&mut row, table_namespace::ID, table);
100 table_namespace::LAYOUT.set_utf8(&mut row, table_namespace::NAME, name);
101 txn.set(&NamespaceTableKey::encoded(namespace, table), row).await?;
102 Ok(())
103 }
104
105 async fn insert_columns(
106 txn: &mut impl CommandTransaction,
107 table: TableId,
108 to_create: TableToCreate,
109 ) -> crate::Result<()> {
110 let namespace_name = Self::find_namespace(txn, to_create.namespace)
112 .await?
113 .map(|s| s.name)
114 .unwrap_or_else(|| format!("namespace_{}", to_create.namespace));
115
116 for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
117 Self::create_column(
118 txn,
119 table,
120 ColumnToCreate {
121 fragment: column_to_create.fragment.clone(),
122 namespace_name: namespace_name.clone(),
123 table,
124 table_name: to_create.table.clone(),
125 column: column_to_create.name,
126 constraint: column_to_create.constraint.clone(),
127 if_not_exists: false,
128 policies: column_to_create.policies.clone(),
129 index: ColumnIndex(idx as u8),
130 auto_increment: column_to_create.auto_increment,
131 dictionary_id: column_to_create.dictionary_id,
132 },
133 )
134 .await?;
135 }
136 Ok(())
137 }
138}
139
#[cfg(test)]
mod tests {
    use reifydb_core::interface::{MultiVersionQueryTransaction, NamespaceId, NamespaceTableKey, TableId};
    use reifydb_engine::test_utils::create_test_command_transaction;

    use crate::{
        CatalogStore,
        store::table::{TableToCreate, layout::table_namespace},
        test_utils::ensure_test_namespace,
    };

    /// Builds a request for a table without columns, fragment or retention policy.
    fn empty_table(namespace: NamespaceId, name: &str) -> TableToCreate {
        TableToCreate {
            fragment: None,
            table: name.to_string(),
            namespace,
            columns: vec![],
            retention_policy: None,
        }
    }

    /// Creating a table returns its definition; creating it again under the
    /// same name fails with diagnostic code `CA_003`.
    #[tokio::test]
    async fn test_create_table() {
        let mut txn = create_test_command_transaction().await;
        let test_namespace = ensure_test_namespace(&mut txn).await;

        let created = CatalogStore::create_table(&mut txn, empty_table(test_namespace.id, "test_table"))
            .await
            .unwrap();
        assert_eq!(created.id, TableId(1025));
        assert_eq!(created.namespace, NamespaceId(1025));
        assert_eq!(created.name, "test_table");

        let duplicate = CatalogStore::create_table(&mut txn, empty_table(test_namespace.id, "test_table"))
            .await
            .unwrap_err();
        assert_eq!(duplicate.diagnostic().code, "CA_003");
    }

    /// Every created table gets a namespace link row carrying its id and name.
    #[tokio::test]
    async fn test_table_linked_to_namespace() {
        let mut txn = create_test_command_transaction().await;
        let test_namespace = ensure_test_namespace(&mut txn).await;

        for name in ["test_table", "another_table"] {
            CatalogStore::create_table(&mut txn, empty_table(test_namespace.id, name))
                .await
                .unwrap();
        }

        let scan = txn.range(NamespaceTableKey::full_scan(test_namespace.id)).await.unwrap();
        let links = scan.items.into_iter().collect::<Vec<_>>();
        assert_eq!(links.len(), 2);

        // (position in the scan result, expected table id, expected table name)
        let expectations = [(1, 1025, "test_table"), (0, 1026, "another_table")];
        for (position, id, name) in expectations {
            let row = &links[position].values;
            assert_eq!(table_namespace::LAYOUT.get_u64(row, table_namespace::ID), id);
            assert_eq!(table_namespace::LAYOUT.get_utf8(row, table_namespace::NAME), name);
        }
    }
}