reifydb_catalog/materialized/
primary_key.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	CommitVersion,
6	interface::{PrimaryKeyDef, PrimaryKeyId},
7};
8
9use crate::materialized::{MaterializedCatalog, MultiVersionPrimaryKeyDef};
10
11impl MaterializedCatalog {
12	/// Find a primary key by ID at a specific version
13	pub fn find_primary_key(&self, primary_key_id: PrimaryKeyId, version: CommitVersion) -> Option<PrimaryKeyDef> {
14		self.primary_keys.get(&primary_key_id).and_then(|entry| {
15			let multi = entry.value();
16			multi.get(version)
17		})
18	}
19
20	/// Set or update a primary key at a specific version
21	pub fn set_primary_key(&self, id: PrimaryKeyId, version: CommitVersion, primary_key: Option<PrimaryKeyDef>) {
22		// Update the multi primary key
23		let multi = self.primary_keys.get_or_insert_with(id, MultiVersionPrimaryKeyDef::new);
24		if let Some(new) = primary_key {
25			multi.value().insert(version, new);
26		} else {
27			multi.value().remove(version);
28		}
29	}
30}
31
32#[cfg(test)]
33mod tests {
34	use reifydb_core::interface::{ColumnDef, ColumnId, ColumnIndex, PrimaryKeyId};
35	use reifydb_type::{Type, TypeConstraint};
36
37	use super::*;
38	use crate::MaterializedCatalog;
39
40	fn create_test_primary_key(id: PrimaryKeyId) -> PrimaryKeyDef {
41		PrimaryKeyDef {
42			id,
43			columns: vec![ColumnDef {
44				id: ColumnId(1),
45				name: "id".to_string(),
46				constraint: TypeConstraint::unconstrained(Type::Int4),
47				policies: vec![],
48				index: ColumnIndex(0),
49				auto_increment: true,
50				dictionary_id: None,
51			}],
52		}
53	}
54
55	#[test]
56	fn test_set_and_find_primary_key() {
57		let catalog = MaterializedCatalog::new();
58		let pk_id = PrimaryKeyId(1);
59		let primary_key = create_test_primary_key(pk_id);
60
61		// Set primary key at version 1
62		catalog.set_primary_key(pk_id, CommitVersion(1), Some(primary_key.clone()));
63
64		// Find primary key at version 1
65		let found = catalog.find_primary_key(pk_id, CommitVersion(1));
66		assert_eq!(found, Some(primary_key.clone()));
67
68		// Find primary key at later version (should return same primary
69		// key)
70		let found = catalog.find_primary_key(pk_id, CommitVersion(5));
71		assert_eq!(found, Some(primary_key));
72
73		// Primary key shouldn't exist at version 0
74		let found = catalog.find_primary_key(pk_id, CommitVersion(0));
75		assert_eq!(found, None);
76	}
77
78	#[test]
79	fn test_primary_key_update() {
80		let catalog = MaterializedCatalog::new();
81		let pk_id = PrimaryKeyId(1);
82
83		// Create initial primary key with one column
84		let pk_v1 = create_test_primary_key(pk_id);
85		catalog.set_primary_key(pk_id, CommitVersion(1), Some(pk_v1.clone()));
86
87		// Update primary key with two columns
88		let mut pk_v2 = pk_v1.clone();
89		pk_v2.columns.push(ColumnDef {
90			id: ColumnId(2),
91			name: "name".to_string(),
92			constraint: TypeConstraint::unconstrained(Type::Utf8),
93			policies: vec![],
94			index: ColumnIndex(1),
95			auto_increment: false,
96			dictionary_id: None,
97		});
98		catalog.set_primary_key(pk_id, CommitVersion(2), Some(pk_v2.clone()));
99
100		// Version 1 should have one column
101		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(1)).unwrap().columns.len(), 1);
102
103		// Version 2 should have two columns
104		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(2)).unwrap().columns.len(), 2);
105	}
106
107	#[test]
108	fn test_primary_key_deletion() {
109		let catalog = MaterializedCatalog::new();
110		let pk_id = PrimaryKeyId(1);
111		let primary_key = create_test_primary_key(pk_id);
112
113		// Set primary key
114		catalog.set_primary_key(pk_id, CommitVersion(1), Some(primary_key.clone()));
115
116		// Verify it exists
117		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(1)), Some(primary_key.clone()));
118
119		// Delete the primary key
120		catalog.set_primary_key(pk_id, CommitVersion(2), None);
121
122		// Should not exist at version 2
123		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(2)), None);
124
125		// Should still exist at version 1 (historical)
126		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(1)), Some(primary_key));
127	}
128
129	#[test]
130	fn test_primary_key_versioning() {
131		let catalog = MaterializedCatalog::new();
132		let pk_id = PrimaryKeyId(1);
133
134		// Create multiple versions
135		let pk_v1 = create_test_primary_key(pk_id);
136		let mut pk_v2 = pk_v1.clone();
137		pk_v2.columns[0].name = "pk_id".to_string();
138		let mut pk_v3 = pk_v2.clone();
139		pk_v3.columns[0].constraint = TypeConstraint::unconstrained(Type::Int8);
140
141		// Set at different versions
142		catalog.set_primary_key(pk_id, CommitVersion(10), Some(pk_v1.clone()));
143		catalog.set_primary_key(pk_id, CommitVersion(20), Some(pk_v2.clone()));
144		catalog.set_primary_key(pk_id, CommitVersion(30), Some(pk_v3.clone()));
145
146		// Query at different versions
147		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(5)), None);
148		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(10)), Some(pk_v1.clone()));
149		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(15)), Some(pk_v1));
150		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(20)), Some(pk_v2.clone()));
151		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(25)), Some(pk_v2));
152		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(30)), Some(pk_v3.clone()));
153		assert_eq!(catalog.find_primary_key(pk_id, CommitVersion(100)), Some(pk_v3));
154	}
155}