1use reifydb_core::{
5 diagnostic::catalog::{primary_key_column_not_found, primary_key_empty},
6 interface::{ColumnId, CommandTransaction, PrimaryKeyId, PrimaryKeyKey, SourceId},
7 return_error, return_internal_error,
8};
9use reifydb_type::Fragment;
10
11use crate::{
12 CatalogStore,
13 store::{
14 primary_key::layout::{
15 primary_key,
16 primary_key::{LAYOUT, serialize_column_ids},
17 },
18 sequence::SystemSequence,
19 },
20};
21
22pub struct PrimaryKeyToCreate {
23 pub source: SourceId,
24 pub column_ids: Vec<ColumnId>,
25}
26
27impl CatalogStore {
28 pub async fn create_primary_key(
29 txn: &mut impl CommandTransaction,
30 to_create: PrimaryKeyToCreate,
31 ) -> crate::Result<PrimaryKeyId> {
32 if to_create.column_ids.is_empty() {
34 return_error!(primary_key_empty(Fragment::None));
35 }
36
37 let source_columns = Self::list_columns(txn, to_create.source).await?;
40 let source_column_ids: std::collections::HashSet<_> = source_columns.iter().map(|c| c.id).collect();
41
42 for column_id in &to_create.column_ids {
44 if !source_column_ids.contains(column_id) {
45 return_error!(primary_key_column_not_found(Fragment::None, column_id.0));
46 }
47 }
48
49 let id = SystemSequence::next_primary_key_id(txn).await?;
50
51 let mut row = LAYOUT.allocate();
53 LAYOUT.set_u64(&mut row, primary_key::ID, id.0);
54 LAYOUT.set_u64(&mut row, primary_key::SOURCE, to_create.source.as_u64());
55 LAYOUT.set_blob(&mut row, primary_key::COLUMN_IDS, &serialize_column_ids(&to_create.column_ids));
56
57 txn.set(&PrimaryKeyKey::encoded(id), row).await?;
59
60 match to_create.source {
62 SourceId::Table(table_id) => {
63 Self::set_table_primary_key(txn, table_id, id).await?;
64 }
65 SourceId::View(view_id) => {
66 Self::set_view_primary_key(txn, view_id, id).await?;
67 }
68 SourceId::Flow(_) => {
69 return_internal_error!(
71 "Cannot create primary key for flow. Flows do not support primary keys."
72 );
73 }
74 SourceId::TableVirtual(_) => {
75 return_internal_error!(
77 "Cannot create primary key for virtual table. Virtual tables do not support primary keys."
78 );
79 }
80 SourceId::RingBuffer(ringbuffer_id) => {
81 Self::set_ringbuffer_primary_key(txn, ringbuffer_id, id).await?;
82 }
83 SourceId::Dictionary(_) => {
84 return_internal_error!(
86 "Cannot create primary key for dictionary. Dictionaries have their own key structure."
87 );
88 }
89 }
90
91 Ok(id)
92 }
93}
94
#[cfg(test)]
mod tests {
    use reifydb_core::interface::{ColumnId, PrimaryKeyId, SourceId, TableId, ViewId};
    use reifydb_engine::test_utils::create_test_command_transaction;
    use reifydb_type::{Type, TypeConstraint};

    use super::PrimaryKeyToCreate;
    use crate::{
        CatalogStore,
        column::{ColumnIndex, ColumnToCreate},
        table::TableToCreate,
        test_utils::{ensure_test_namespace, ensure_test_table},
        view::{ViewColumnToCreate, ViewToCreate},
    };

    /// Builds the definition of an unconstrained `Uint8` column living in
    /// `test_namespace`; only the parts that vary between tests are parameters.
    fn uint8_column(
        table: TableId,
        table_name: &str,
        column: &str,
        index: u8,
        auto_increment: bool,
    ) -> ColumnToCreate {
        ColumnToCreate {
            fragment: None,
            namespace_name: "test_namespace".to_string(),
            table,
            table_name: table_name.to_string(),
            column: column.to_string(),
            constraint: TypeConstraint::unconstrained(Type::Uint8),
            if_not_exists: false,
            policies: vec![],
            index: ColumnIndex(index),
            auto_increment,
            dictionary_id: None,
        }
    }

    /// Asserts that `result` is an error carrying the diagnostic code `expected`.
    #[track_caller]
    fn assert_error_code(result: crate::Result<PrimaryKeyId>, expected: &str) {
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code, expected);
    }

    #[tokio::test]
    async fn test_create_primary_key_for_table() {
        let mut txn = create_test_command_transaction().await;
        let table = ensure_test_table(&mut txn).await;

        let id_column = CatalogStore::create_column(
            &mut txn,
            table.id,
            uint8_column(table.id, "test_table", "id", 0, true),
        )
        .await
        .unwrap();

        let tenant_column = CatalogStore::create_column(
            &mut txn,
            table.id,
            uint8_column(table.id, "test_table", "tenant_id", 1, false),
        )
        .await
        .unwrap();

        let request = PrimaryKeyToCreate {
            source: SourceId::Table(table.id),
            column_ids: vec![id_column.id, tenant_column.id],
        };
        let primary_key_id = CatalogStore::create_primary_key(&mut txn, request).await.unwrap();

        assert_eq!(primary_key_id, PrimaryKeyId(1));

        let found_pk = CatalogStore::find_primary_key(&mut txn, table.id)
            .await
            .unwrap()
            .expect("Primary key should exist");

        assert_eq!(found_pk.id, primary_key_id);

        // Key columns must come back in the order they were requested.
        let expected = [(id_column.id, "id"), (tenant_column.id, "tenant_id")];
        assert_eq!(found_pk.columns.len(), 2);
        for (column, (expected_id, expected_name)) in found_pk.columns.iter().zip(expected) {
            assert_eq!(column.id, expected_id);
            assert_eq!(column.name, expected_name);
        }
    }

    #[tokio::test]
    async fn test_create_primary_key_for_view() {
        let mut txn = create_test_command_transaction().await;
        let namespace = ensure_test_namespace(&mut txn).await;

        let view_columns = vec![
            ViewColumnToCreate {
                name: "id".to_string(),
                constraint: TypeConstraint::unconstrained(Type::Uint8),
                fragment: None,
            },
            ViewColumnToCreate {
                name: "name".to_string(),
                constraint: TypeConstraint::unconstrained(Type::Utf8),
                fragment: None,
            },
        ];

        let view = CatalogStore::create_deferred_view(
            &mut txn,
            ViewToCreate {
                fragment: None,
                namespace: namespace.id,
                name: "test_view".to_string(),
                columns: view_columns,
            },
        )
        .await
        .unwrap();

        let columns = CatalogStore::list_columns(&mut txn, view.id).await.unwrap();
        assert_eq!(columns.len(), 2);

        let request = PrimaryKeyToCreate {
            source: SourceId::View(view.id),
            column_ids: vec![columns[0].id],
        };
        let primary_key_id = CatalogStore::create_primary_key(&mut txn, request).await.unwrap();

        assert_eq!(primary_key_id, PrimaryKeyId(1));

        let found_pk = CatalogStore::find_primary_key(&mut txn, view.id)
            .await
            .unwrap()
            .expect("Primary key should exist");

        assert_eq!(found_pk.id, primary_key_id);
        assert_eq!(found_pk.columns.len(), 1);

        let key_column = &found_pk.columns[0];
        assert_eq!(key_column.id, columns[0].id);
        assert_eq!(key_column.name, "id");
    }

    #[tokio::test]
    async fn test_create_composite_primary_key() {
        let mut txn = create_test_command_transaction().await;
        let table = ensure_test_table(&mut txn).await;

        let mut column_ids = Vec::new();
        for i in 0..3u8 {
            let created = CatalogStore::create_column(
                &mut txn,
                table.id,
                uint8_column(table.id, "test_table", &format!("col_{}", i), i, false),
            )
            .await
            .unwrap();
            column_ids.push(created.id);
        }

        let request = PrimaryKeyToCreate {
            source: SourceId::Table(table.id),
            column_ids: column_ids.clone(),
        };
        let primary_key_id = CatalogStore::create_primary_key(&mut txn, request).await.unwrap();

        let found_pk = CatalogStore::find_primary_key(&mut txn, table.id)
            .await
            .unwrap()
            .expect("Primary key should exist");

        assert_eq!(found_pk.id, primary_key_id);
        assert_eq!(found_pk.columns.len(), 3);
        for (i, (col, expected_id)) in found_pk.columns.iter().zip(&column_ids).enumerate() {
            assert_eq!(col.id, *expected_id);
            assert_eq!(col.name, format!("col_{}", i));
        }
    }

    #[tokio::test]
    async fn test_create_primary_key_updates_table() {
        let mut txn = create_test_command_transaction().await;
        let table = ensure_test_table(&mut txn).await;

        // A freshly created table has no primary key yet.
        let initial_pk = CatalogStore::find_primary_key(&mut txn, table.id).await.unwrap();
        assert!(initial_pk.is_none());

        let id_column = CatalogStore::create_column(
            &mut txn,
            table.id,
            uint8_column(table.id, "test_table", "id", 0, true),
        )
        .await
        .unwrap();

        let request = PrimaryKeyToCreate {
            source: SourceId::Table(table.id),
            column_ids: vec![id_column.id],
        };
        let primary_key_id = CatalogStore::create_primary_key(&mut txn, request).await.unwrap();

        let updated_pk = CatalogStore::find_primary_key(&mut txn, table.id)
            .await
            .unwrap()
            .expect("Primary key should exist");

        assert_eq!(updated_pk.id, primary_key_id);
    }

    #[tokio::test]
    async fn test_create_primary_key_on_nonexistent_table() {
        let mut txn = create_test_command_transaction().await;

        let request = PrimaryKeyToCreate {
            source: SourceId::Table(TableId(999)),
            column_ids: vec![ColumnId(1)],
        };
        let result = CatalogStore::create_primary_key(&mut txn, request).await;

        assert_error_code(result, "CA_021");
    }

    #[tokio::test]
    async fn test_create_primary_key_on_nonexistent_view() {
        let mut txn = create_test_command_transaction().await;

        let request = PrimaryKeyToCreate {
            source: SourceId::View(ViewId(999)),
            column_ids: vec![ColumnId(1)],
        };
        let result = CatalogStore::create_primary_key(&mut txn, request).await;

        assert_error_code(result, "CA_021");
    }

    #[tokio::test]
    async fn test_create_empty_primary_key() {
        let mut txn = create_test_command_transaction().await;
        let table = ensure_test_table(&mut txn).await;

        let request = PrimaryKeyToCreate {
            source: SourceId::Table(table.id),
            column_ids: vec![],
        };
        let result = CatalogStore::create_primary_key(&mut txn, request).await;

        assert_error_code(result, "CA_020");
    }

    #[tokio::test]
    async fn test_create_primary_key_with_nonexistent_column() {
        let mut txn = create_test_command_transaction().await;
        let table = ensure_test_table(&mut txn).await;

        let request = PrimaryKeyToCreate {
            source: SourceId::Table(table.id),
            column_ids: vec![ColumnId(999)],
        };
        let result = CatalogStore::create_primary_key(&mut txn, request).await;

        assert_error_code(result, "CA_021");
    }

    #[tokio::test]
    async fn test_create_primary_key_with_column_from_different_table() {
        let mut txn = create_test_command_transaction().await;
        let table1 = ensure_test_table(&mut txn).await;

        let _col1 = CatalogStore::create_column(
            &mut txn,
            table1.id,
            uint8_column(table1.id, "test_table", "id", 0, false),
        )
        .await
        .unwrap();

        let namespace = CatalogStore::get_namespace(&mut txn, table1.namespace).await.unwrap();
        let table2 = CatalogStore::create_table(
            &mut txn,
            TableToCreate {
                fragment: None,
                table: "test_table2".to_string(),
                namespace: namespace.id,
                columns: vec![],
                retention_policy: None,
            },
        )
        .await
        .unwrap();

        let foreign_column = CatalogStore::create_column(
            &mut txn,
            table2.id,
            uint8_column(table2.id, "test_table2", "id", 0, false),
        )
        .await
        .unwrap();

        // The key targets table1 but names a column owned by table2.
        let request = PrimaryKeyToCreate {
            source: SourceId::Table(table1.id),
            column_ids: vec![foreign_column.id],
        };
        let result = CatalogStore::create_primary_key(&mut txn, request).await;

        assert_error_code(result, "CA_021");
    }
}