reifydb_catalog/store/primary_key/
create.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	diagnostic::catalog::{primary_key_column_not_found, primary_key_empty},
6	interface::{ColumnId, CommandTransaction, PrimaryKeyId, PrimaryKeyKey, SourceId},
7	return_error, return_internal_error,
8};
9use reifydb_type::Fragment;
10
11use crate::{
12	CatalogStore,
13	store::{
14		primary_key::layout::{
15			primary_key,
16			primary_key::{LAYOUT, serialize_column_ids},
17		},
18		sequence::SystemSequence,
19	},
20};
21
22pub struct PrimaryKeyToCreate {
23	pub source: SourceId,
24	pub column_ids: Vec<ColumnId>,
25}
26
27impl CatalogStore {
28	pub async fn create_primary_key(
29		txn: &mut impl CommandTransaction,
30		to_create: PrimaryKeyToCreate,
31	) -> crate::Result<PrimaryKeyId> {
32		// Validate that primary key has at least one column
33		if to_create.column_ids.is_empty() {
34			return_error!(primary_key_empty(Fragment::None));
35		}
36
37		// Get the columns for the table/view and validate all primary
38		// key columns belong to it
39		let source_columns = Self::list_columns(txn, to_create.source).await?;
40		let source_column_ids: std::collections::HashSet<_> = source_columns.iter().map(|c| c.id).collect();
41
42		// Validate that all columns belong to the table/view
43		for column_id in &to_create.column_ids {
44			if !source_column_ids.contains(column_id) {
45				return_error!(primary_key_column_not_found(Fragment::None, column_id.0));
46			}
47		}
48
49		let id = SystemSequence::next_primary_key_id(txn).await?;
50
51		// Create primary key encoded
52		let mut row = LAYOUT.allocate();
53		LAYOUT.set_u64(&mut row, primary_key::ID, id.0);
54		LAYOUT.set_u64(&mut row, primary_key::SOURCE, to_create.source.as_u64());
55		LAYOUT.set_blob(&mut row, primary_key::COLUMN_IDS, &serialize_column_ids(&to_create.column_ids));
56
57		// Store the primary key
58		txn.set(&PrimaryKeyKey::encoded(id), row).await?;
59
60		// Update the table or view to reference this primary key
61		match to_create.source {
62			SourceId::Table(table_id) => {
63				Self::set_table_primary_key(txn, table_id, id).await?;
64			}
65			SourceId::View(view_id) => {
66				Self::set_view_primary_key(txn, view_id, id).await?;
67			}
68			SourceId::Flow(_) => {
69				// Flows don't support primary keys
70				return_internal_error!(
71					"Cannot create primary key for flow. Flows do not support primary keys."
72				);
73			}
74			SourceId::TableVirtual(_) => {
75				// Virtual tables don't support primary keys
76				return_internal_error!(
77					"Cannot create primary key for virtual table. Virtual tables do not support primary keys."
78				);
79			}
80			SourceId::RingBuffer(ringbuffer_id) => {
81				Self::set_ringbuffer_primary_key(txn, ringbuffer_id, id).await?;
82			}
83			SourceId::Dictionary(_) => {
84				// Dictionaries don't support traditional primary keys
85				return_internal_error!(
86					"Cannot create primary key for dictionary. Dictionaries have their own key structure."
87				);
88			}
89		}
90
91		Ok(id)
92	}
93}
94
95#[cfg(test)]
96mod tests {
97	use reifydb_core::interface::{ColumnId, PrimaryKeyId, SourceId, TableId, ViewId};
98	use reifydb_engine::test_utils::create_test_command_transaction;
99	use reifydb_type::{Type, TypeConstraint};
100
101	use super::PrimaryKeyToCreate;
102	use crate::{
103		CatalogStore,
104		column::{ColumnIndex, ColumnToCreate},
105		table::TableToCreate,
106		test_utils::{ensure_test_namespace, ensure_test_table},
107		view::{ViewColumnToCreate, ViewToCreate},
108	};
109
110	#[tokio::test]
111	async fn test_create_primary_key_for_table() {
112		let mut txn = create_test_command_transaction().await;
113		let table = ensure_test_table(&mut txn).await;
114
115		// Create columns for the table
116		let col1 = CatalogStore::create_column(
117			&mut txn,
118			table.id,
119			ColumnToCreate {
120				fragment: None,
121				namespace_name: "test_namespace".to_string(),
122				table: table.id,
123				table_name: "test_table".to_string(),
124				column: "id".to_string(),
125				constraint: TypeConstraint::unconstrained(Type::Uint8),
126				if_not_exists: false,
127				policies: vec![],
128				index: ColumnIndex(0),
129				auto_increment: true,
130				dictionary_id: None,
131			},
132		)
133		.await
134		.unwrap();
135
136		let col2 = CatalogStore::create_column(
137			&mut txn,
138			table.id,
139			ColumnToCreate {
140				fragment: None,
141				namespace_name: "test_namespace".to_string(),
142				table: table.id,
143				table_name: "test_table".to_string(),
144				column: "tenant_id".to_string(),
145				constraint: TypeConstraint::unconstrained(Type::Uint8),
146				if_not_exists: false,
147				policies: vec![],
148				index: ColumnIndex(1),
149				auto_increment: false,
150				dictionary_id: None,
151			},
152		)
153		.await
154		.unwrap();
155
156		// Create primary key
157		let primary_key_id = CatalogStore::create_primary_key(
158			&mut txn,
159			PrimaryKeyToCreate {
160				source: SourceId::Table(table.id),
161				column_ids: vec![col1.id, col2.id],
162			},
163		)
164		.await
165		.unwrap();
166
167		// Verify the primary key was created
168		assert_eq!(primary_key_id, PrimaryKeyId(1));
169
170		// Find and verify the primary key
171		let found_pk = CatalogStore::find_primary_key(&mut txn, table.id)
172			.await
173			.unwrap()
174			.expect("Primary key should exist");
175
176		assert_eq!(found_pk.id, primary_key_id);
177		assert_eq!(found_pk.columns.len(), 2);
178		assert_eq!(found_pk.columns[0].id, col1.id);
179		assert_eq!(found_pk.columns[0].name, "id");
180		assert_eq!(found_pk.columns[1].id, col2.id);
181		assert_eq!(found_pk.columns[1].name, "tenant_id");
182	}
183
184	#[tokio::test]
185	async fn test_create_primary_key_for_view() {
186		let mut txn = create_test_command_transaction().await;
187		let namespace = ensure_test_namespace(&mut txn).await;
188
189		// Create a view
190		let view = CatalogStore::create_deferred_view(
191			&mut txn,
192			ViewToCreate {
193				fragment: None,
194				namespace: namespace.id,
195				name: "test_view".to_string(),
196				columns: vec![
197					ViewColumnToCreate {
198						name: "id".to_string(),
199						constraint: TypeConstraint::unconstrained(Type::Uint8),
200						fragment: None,
201					},
202					ViewColumnToCreate {
203						name: "name".to_string(),
204						constraint: TypeConstraint::unconstrained(Type::Utf8),
205						fragment: None,
206					},
207				],
208			},
209		)
210		.await
211		.unwrap();
212
213		// Get column IDs for the view
214		let columns = CatalogStore::list_columns(&mut txn, view.id).await.unwrap();
215		assert_eq!(columns.len(), 2);
216
217		// Create primary key on first column only
218		let primary_key_id = CatalogStore::create_primary_key(
219			&mut txn,
220			PrimaryKeyToCreate {
221				source: SourceId::View(view.id),
222				column_ids: vec![columns[0].id],
223			},
224		)
225		.await
226		.unwrap();
227
228		// Verify the primary key was created
229		assert_eq!(primary_key_id, PrimaryKeyId(1));
230
231		// Find and verify the primary key
232		let found_pk = CatalogStore::find_primary_key(&mut txn, view.id)
233			.await
234			.unwrap()
235			.expect("Primary key should exist");
236
237		assert_eq!(found_pk.id, primary_key_id);
238		assert_eq!(found_pk.columns.len(), 1);
239		assert_eq!(found_pk.columns[0].id, columns[0].id);
240		assert_eq!(found_pk.columns[0].name, "id");
241	}
242
243	#[tokio::test]
244	async fn test_create_composite_primary_key() {
245		let mut txn = create_test_command_transaction().await;
246		let table = ensure_test_table(&mut txn).await;
247
248		// Create multiple columns
249		let mut column_ids = Vec::new();
250		for i in 0..3 {
251			let col = CatalogStore::create_column(
252				&mut txn,
253				table.id,
254				ColumnToCreate {
255					fragment: None,
256					namespace_name: "test_namespace".to_string(),
257					table: table.id,
258					table_name: "test_table".to_string(),
259					column: format!("col_{}", i),
260					constraint: TypeConstraint::unconstrained(Type::Uint8),
261					if_not_exists: false,
262					policies: vec![],
263					index: ColumnIndex(i as u8),
264					auto_increment: false,
265					dictionary_id: None,
266				},
267			)
268			.await
269			.unwrap();
270			column_ids.push(col.id);
271		}
272
273		// Create composite primary key
274		let primary_key_id = CatalogStore::create_primary_key(
275			&mut txn,
276			PrimaryKeyToCreate {
277				source: SourceId::Table(table.id),
278				column_ids: column_ids.clone(),
279			},
280		)
281		.await
282		.unwrap();
283
284		// Find and verify the primary key
285		let found_pk = CatalogStore::find_primary_key(&mut txn, table.id)
286			.await
287			.unwrap()
288			.expect("Primary key should exist");
289
290		assert_eq!(found_pk.id, primary_key_id);
291		assert_eq!(found_pk.columns.len(), 3);
292		for (i, col) in found_pk.columns.iter().enumerate() {
293			assert_eq!(col.id, column_ids[i]);
294			assert_eq!(col.name, format!("col_{}", i));
295		}
296	}
297
298	#[tokio::test]
299	async fn test_create_primary_key_updates_table() {
300		let mut txn = create_test_command_transaction().await;
301		let table = ensure_test_table(&mut txn).await;
302
303		// Initially, table does not have primary key
304		let initial_pk = CatalogStore::find_primary_key(&mut txn, table.id).await.unwrap();
305		assert!(initial_pk.is_none());
306
307		// Create a column
308		let col = CatalogStore::create_column(
309			&mut txn,
310			table.id,
311			ColumnToCreate {
312				fragment: None,
313				namespace_name: "test_namespace".to_string(),
314				table: table.id,
315				table_name: "test_table".to_string(),
316				column: "id".to_string(),
317				constraint: TypeConstraint::unconstrained(Type::Uint8),
318				if_not_exists: false,
319				policies: vec![],
320				index: ColumnIndex(0),
321				auto_increment: true,
322				dictionary_id: None,
323			},
324		)
325		.await
326		.unwrap();
327
328		// Create primary key
329		let primary_key_id = CatalogStore::create_primary_key(
330			&mut txn,
331			PrimaryKeyToCreate {
332				source: SourceId::Table(table.id),
333				column_ids: vec![col.id],
334			},
335		)
336		.await
337		.unwrap();
338
339		// Now table should have the primary key
340		let updated_pk = CatalogStore::find_primary_key(&mut txn, table.id)
341			.await
342			.unwrap()
343			.expect("Primary key should exist");
344
345		assert_eq!(updated_pk.id, primary_key_id);
346	}
347
348	#[tokio::test]
349	async fn test_create_primary_key_on_nonexistent_table() {
350		let mut txn = create_test_command_transaction().await;
351
352		// Try to create primary key on non-existent table
353		// list_table_columns will return empty list for non-existent
354		// table, so the column validation will fail
355		let result = CatalogStore::create_primary_key(
356			&mut txn,
357			PrimaryKeyToCreate {
358				source: SourceId::Table(TableId(999)),
359				column_ids: vec![ColumnId(1)],
360			},
361		)
362		.await;
363
364		assert!(result.is_err());
365		let err = result.unwrap_err();
366		// Fails with CA_021 because column 1 won't be in the empty
367		// column list
368		assert_eq!(err.code, "CA_021");
369	}
370
371	#[tokio::test]
372	async fn test_create_primary_key_on_nonexistent_view() {
373		let mut txn = create_test_command_transaction().await;
374
375		// Try to create primary key on non-existent view
376		// list_table_columns will return empty list for non-existent
377		// view, so the column validation will fail
378		let result = CatalogStore::create_primary_key(
379			&mut txn,
380			PrimaryKeyToCreate {
381				source: SourceId::View(ViewId(999)),
382				column_ids: vec![ColumnId(1)],
383			},
384		)
385		.await;
386
387		assert!(result.is_err());
388		let err = result.unwrap_err();
389		// Fails with CA_021 because column 1 won't be in the empty
390		// column list
391		assert_eq!(err.code, "CA_021");
392	}
393
394	#[tokio::test]
395	async fn test_create_empty_primary_key() {
396		let mut txn = create_test_command_transaction().await;
397		let table = ensure_test_table(&mut txn).await;
398
399		// Try to create primary key with no columns - should fail
400		let result = CatalogStore::create_primary_key(
401			&mut txn,
402			PrimaryKeyToCreate {
403				source: SourceId::Table(table.id),
404				column_ids: vec![],
405			},
406		)
407		.await;
408
409		assert!(result.is_err());
410		let err = result.unwrap_err();
411		assert_eq!(err.code, "CA_020");
412	}
413
414	#[tokio::test]
415	async fn test_create_primary_key_with_nonexistent_column() {
416		let mut txn = create_test_command_transaction().await;
417		let table = ensure_test_table(&mut txn).await;
418
419		// Try to create primary key with non-existent column ID
420		let result = CatalogStore::create_primary_key(
421			&mut txn,
422			PrimaryKeyToCreate {
423				source: SourceId::Table(table.id),
424				column_ids: vec![ColumnId(999)],
425			},
426		)
427		.await;
428
429		assert!(result.is_err());
430		let err = result.unwrap_err();
431		assert_eq!(err.code, "CA_021");
432	}
433
434	#[tokio::test]
435	async fn test_create_primary_key_with_column_from_different_table() {
436		let mut txn = create_test_command_transaction().await;
437		let table1 = ensure_test_table(&mut txn).await;
438
439		// Create a column for table1
440		let _col1 = CatalogStore::create_column(
441			&mut txn,
442			table1.id,
443			ColumnToCreate {
444				fragment: None,
445				namespace_name: "test_namespace".to_string(),
446				table: table1.id,
447				table_name: "test_table".to_string(),
448				column: "id".to_string(),
449				constraint: TypeConstraint::unconstrained(Type::Uint8),
450				if_not_exists: false,
451				policies: vec![],
452				index: ColumnIndex(0),
453				auto_increment: false,
454				dictionary_id: None,
455			},
456		)
457		.await
458		.unwrap();
459
460		// Create another table
461		let namespace = CatalogStore::get_namespace(&mut txn, table1.namespace).await.unwrap();
462		let table2 = CatalogStore::create_table(
463			&mut txn,
464			TableToCreate {
465				fragment: None,
466				table: "test_table2".to_string(),
467				namespace: namespace.id,
468				columns: vec![],
469				retention_policy: None,
470			},
471		)
472		.await
473		.unwrap();
474
475		// Create a column for table2
476		let col2 = CatalogStore::create_column(
477			&mut txn,
478			table2.id,
479			ColumnToCreate {
480				fragment: None,
481				namespace_name: "test_namespace".to_string(),
482				table: table2.id,
483				table_name: "test_table2".to_string(),
484				column: "id".to_string(),
485				constraint: TypeConstraint::unconstrained(Type::Uint8),
486				if_not_exists: false,
487				policies: vec![],
488				index: ColumnIndex(0),
489				auto_increment: false,
490				dictionary_id: None,
491			},
492		)
493		.await
494		.unwrap();
495
496		// Try to create primary key for table1 using column from table2
497		// This must fail because we validate columns belong to the
498		// specific table
499		let result = CatalogStore::create_primary_key(
500			&mut txn,
501			PrimaryKeyToCreate {
502				source: SourceId::Table(table1.id),
503				column_ids: vec![col2.id],
504			},
505		)
506		.await;
507
508		// Should fail with CA_021 because col2 doesn't belong to table1
509		assert!(result.is_err());
510		let err = result.unwrap_err();
511		assert_eq!(err.code, "CA_021");
512	}
513}