1use std::collections::HashMap;
4
5use wasm_dbms_api::memory::MemoryError;
6use wasm_dbms_api::prelude::{
7 DEFAULT_ALIGNMENT, DataSize, Encode, MSize, MemoryResult, Page, PageOffset, TableFingerprint,
8 TableSchema, TableSchemaSnapshot, fingerprint_for_name,
9};
10use xxhash_rust::xxh3::Xxh3;
11
12use crate::memory_manager::{SCHEMA_PAGE, UNCLAIMED_PAGES_PAGE};
13use crate::table_registry::{AutoincrementLedger, IndexLedger, SchemaSnapshotLedger};
14use crate::{MemoryAccess, TableRegistry, UnclaimedPages};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub struct TableRegistryPage {
19 pub schema_snapshot_page: Page,
21 pub pages_list_page: Page,
23 pub free_segments_page: Page,
25 pub index_registry_page: Page,
27 pub autoincrement_registry_page: Option<Page>,
30}
31
32#[derive(Debug, Default, Clone, PartialEq, Eq)]
34pub struct SchemaRegistry {
35 schema_hash: u64,
36 tables: HashMap<TableFingerprint, TableRegistryPage>,
37}
38
39impl SchemaRegistry {
40 pub fn load(mm: &mut impl MemoryAccess) -> MemoryResult<Self> {
42 let registry: Self = mm.read_at(SCHEMA_PAGE, 0)?;
43 Ok(registry)
44 }
45
46 pub const fn schema_hash(&self) -> u64 {
48 self.schema_hash
49 }
50
51 pub fn register_table<TS>(
66 &mut self,
67 mm: &mut impl MemoryAccess,
68 ) -> MemoryResult<TableRegistryPage>
69 where
70 TS: TableSchema,
71 {
72 let fingerprint = TS::fingerprint();
74 let candidate_name = TS::table_name();
75 if let Some(pages) = self.tables.get(&fingerprint).copied() {
76 let existing = SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)?;
77 if existing.get().name != candidate_name {
78 return Err(MemoryError::NameCollision {
79 candidate: candidate_name.to_string(),
80 existing: existing.get().name.clone(),
81 });
82 }
83 return Ok(pages);
84 }
85
86 let schema_snapshot_page = mm.claim_page()?;
88 let pages_list_page = mm.claim_page()?;
89 let free_segments_page = mm.claim_page()?;
90 let index_registry_page = mm.claim_page()?;
91 let has_autoincrement = TS::columns().iter().any(|col| col.auto_increment);
93 let autoincrement_registry_page = if has_autoincrement {
94 Some(mm.claim_page()?)
95 } else {
96 None
97 };
98
99 let pages = TableRegistryPage {
101 schema_snapshot_page,
102 pages_list_page,
103 free_segments_page,
104 index_registry_page,
105 autoincrement_registry_page,
106 };
107 self.tables.insert(fingerprint, pages);
108
109 SchemaSnapshotLedger::init::<TS>(pages.schema_snapshot_page, mm)?;
111 IndexLedger::init(pages.index_registry_page, TS::indexes(), mm)?;
113 if let Some(autoinc_page) = pages.autoincrement_registry_page {
115 AutoincrementLedger::init::<TS>(autoinc_page, mm)?;
116 }
117
118 self.refresh_schema_hash(mm)?;
119 self.save(mm)?;
120
121 Ok(pages)
122 }
123
124 pub fn save(&self, mm: &mut impl MemoryAccess) -> MemoryResult<()> {
126 mm.write_at(SCHEMA_PAGE, 0, self)
127 }
128
129 pub fn table_registry_page<TS>(&self) -> Option<TableRegistryPage>
131 where
132 TS: TableSchema,
133 {
134 self.tables.get(&TS::fingerprint()).copied()
135 }
136
137 pub fn table_registry_page_by_name(&self, name: &str) -> Option<TableRegistryPage> {
142 self.tables.get(&fingerprint_for_name(name)).copied()
143 }
144
145 pub fn register_table_from_snapshot(
158 &mut self,
159 snapshot: &TableSchemaSnapshot,
160 mm: &mut impl MemoryAccess,
161 ) -> MemoryResult<TableRegistryPage> {
162 let fingerprint = fingerprint_for_name(&snapshot.name);
163 let candidate_name = snapshot.name.as_str();
164 if let Some(pages) = self.tables.get(&fingerprint).copied() {
165 let existing = SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)?;
166 if existing.get().name != candidate_name {
167 return Err(MemoryError::NameCollision {
168 candidate: candidate_name.to_string(),
169 existing: existing.get().name.clone(),
170 });
171 }
172 return Ok(pages);
173 }
174
175 let schema_snapshot_page = mm.claim_page()?;
176 let pages_list_page = mm.claim_page()?;
177 let free_segments_page = mm.claim_page()?;
178 let index_registry_page = mm.claim_page()?;
179 let has_autoincrement = snapshot.columns.iter().any(|col| col.auto_increment);
180 let autoincrement_registry_page = if has_autoincrement {
181 Some(mm.claim_page()?)
182 } else {
183 None
184 };
185
186 let pages = TableRegistryPage {
187 schema_snapshot_page,
188 pages_list_page,
189 free_segments_page,
190 index_registry_page,
191 autoincrement_registry_page,
192 };
193 self.tables.insert(fingerprint, pages);
194
195 mm.write_at(pages.schema_snapshot_page, 0, snapshot)?;
196 IndexLedger::init_from_keys(
197 pages.index_registry_page,
198 snapshot.indexes.iter().map(|idx| idx.columns.clone()),
199 mm,
200 )?;
201 self.refresh_schema_hash(mm)?;
202 self.save(mm)?;
203
204 Ok(pages)
205 }
206
207 pub fn unregister_table(
221 &mut self,
222 name: &str,
223 mm: &mut impl MemoryAccess,
224 ) -> MemoryResult<Option<TableRegistryPage>> {
225 let fingerprint = fingerprint_for_name(name);
226 let pages = self.tables.get(&fingerprint).copied();
227 if let Some(pages) = pages {
228 let registry = TableRegistry::load(pages, mm)?;
232 let pages_to_release = registry.releasable_pages_count(pages, mm)?;
233 let ledger: UnclaimedPages = mm.read_at(UNCLAIMED_PAGES_PAGE, 0)?;
234 if ledger.remaining_capacity() < pages_to_release as u32 {
235 return Err(MemoryError::UnclaimedPagesFull {
236 capacity: crate::UNCLAIMED_PAGES_CAPACITY,
237 });
238 }
239
240 let removed = self.tables.remove(&fingerprint);
241 debug_assert_eq!(removed, Some(pages));
242 registry.release_pages(pages, mm)?;
243 self.refresh_schema_hash(mm)?;
244 self.save(mm)?;
245 return Ok(removed);
246 }
247 Ok(None)
248 }
249
250 pub fn stored_snapshots(
260 &self,
261 mm: &mut impl MemoryAccess,
262 ) -> MemoryResult<Vec<TableSchemaSnapshot>> {
263 self.tables
264 .values()
265 .map(|pages| {
266 SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)
267 .map(|ledger| ledger.get().clone())
268 })
269 .collect()
270 }
271
272 pub fn refresh_schema_hash(&mut self, mm: &mut impl MemoryAccess) -> MemoryResult<()> {
274 self.schema_hash = compute_hash(self.stored_snapshots(mm)?);
275 Ok(())
276 }
277}
278
279fn compute_hash(mut snapshots: Vec<TableSchemaSnapshot>) -> u64 {
280 snapshots.sort_by(|a, b| a.name.cmp(&b.name));
281
282 let mut hasher = Xxh3::new();
283 hasher.update(&[TableSchemaSnapshot::latest_version()]);
284 for snapshot in &snapshots {
285 let bytes = snapshot.encode();
286 hasher.update(&(bytes.len() as u64).to_le_bytes());
287 hasher.update(&bytes);
288 }
289 hasher.digest()
290}
291
292impl Encode for SchemaRegistry {
293 const SIZE: DataSize = DataSize::Dynamic;
294
295 const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
296
297 fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
298 let mut buffer = Vec::with_capacity(self.size() as usize);
301 buffer.extend_from_slice(&self.schema_hash.to_le_bytes());
302 buffer.extend_from_slice(&(self.tables.len() as u64).to_le_bytes());
304 for (fingerprint, page) in &self.tables {
306 buffer.extend_from_slice(&fingerprint.to_le_bytes());
307 buffer.extend_from_slice(&page.schema_snapshot_page.to_le_bytes());
308 buffer.extend_from_slice(&page.pages_list_page.to_le_bytes());
309 buffer.extend_from_slice(&page.free_segments_page.to_le_bytes());
310 buffer.extend_from_slice(&page.index_registry_page.to_le_bytes());
311 if let Some(autoinc_page) = page.autoincrement_registry_page {
313 buffer.push(1); buffer.extend_from_slice(&autoinc_page.to_le_bytes());
315 } else {
316 buffer.push(0); }
318 }
319 std::borrow::Cow::Owned(buffer)
320 }
321
322 fn decode(data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
323 where
324 Self: Sized,
325 {
326 let mut offset = 0;
327 let schema_hash = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
328 offset += 8;
329 let len = u64::from_le_bytes(
331 data[offset..offset + 8]
332 .try_into()
333 .expect("failed to read length"),
334 ) as usize;
335 offset += 8;
336 let mut tables = HashMap::with_capacity(len);
337 for _ in 0..len {
339 let fingerprint = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
340 offset += 8;
341 let schema_snapshot_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
342 offset += 4;
343 let pages_list_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
344 offset += 4;
345 let free_segments_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
346 offset += 4;
347 let index_registry_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
348 offset += 4;
349 let has_autoincrement = data[offset] == 1;
350 offset += 1;
351 let autoincrement_registry_page = if has_autoincrement {
352 let page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
353 offset += 4;
354 Some(page)
355 } else {
356 None
357 };
358 tables.insert(
359 fingerprint,
360 TableRegistryPage {
361 schema_snapshot_page,
362 pages_list_page,
363 free_segments_page,
364 index_registry_page,
365 autoincrement_registry_page,
366 },
367 );
368 }
369 Ok(Self {
370 schema_hash,
371 tables,
372 })
373 }
374
375 fn size(&self) -> MSize {
376 let autoinc_pages = self
387 .tables
388 .values()
389 .filter(|page| page.autoincrement_registry_page.is_some())
390 .count() as MSize;
391
392 16 + (self.tables.len() as MSize * (4 * 4 + 8 + 1)) + (autoinc_pages * 4)
393 }
394}
395
396#[cfg(test)]
397mod tests {
398
399 use candid::CandidType;
400 use serde::{Deserialize, Serialize};
401 use wasm_dbms_api::prelude::{
402 ColumnDef, DbmsResult, IndexDef, InsertRecord, Int32, NoForeignFetcher, TableColumns,
403 TableRecord, UpdateRecord,
404 };
405
406 use super::*;
407 use crate::{
408 HeapMemoryProvider, MemoryAccess, MemoryManager, RecordAddress, UNCLAIMED_PAGES_CAPACITY,
409 UnclaimedPages,
410 };
411
412 fn make_mm() -> MemoryManager<HeapMemoryProvider> {
413 MemoryManager::init(HeapMemoryProvider::default())
414 }
415
416 #[test]
417 fn test_should_encode_and_decode_schema_registry() {
418 let mut mm = make_mm();
419
420 let mut registry =
422 SchemaRegistry::load(&mut mm).expect("failed to load init schema registry");
423
424 let registry_page = registry
426 .register_table::<User>(&mut mm)
427 .expect("failed to register table");
428
429 let fetched_page = registry
431 .table_registry_page::<User>()
432 .expect("failed to get table registry page");
433 assert_eq!(registry_page, fetched_page);
434
435 let encoded = registry.encode();
437 let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
439 assert_eq!(registry, decoded);
440
441 let another_registry_page = registry
443 .register_table::<AnotherTable>(&mut mm)
444 .expect("failed to register another table");
445 let another_fetched_page = registry
446 .table_registry_page::<AnotherTable>()
447 .expect("failed to get another table registry page");
448 assert_eq!(another_registry_page, another_fetched_page);
449
450 let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload schema registry");
452 assert_eq!(registry, reloaded);
453 assert_eq!(reloaded.tables.len(), 2);
455 assert_eq!(
456 reloaded
457 .table_registry_page::<User>()
458 .expect("failed to get first table registry page after reload"),
459 registry_page
460 );
461 assert_eq!(
462 reloaded
463 .table_registry_page::<AnotherTable>()
464 .expect("failed to get second table registry page after reload"),
465 another_registry_page
466 );
467 }
468
469 #[test]
470 fn test_register_table_writes_snapshot_to_ledger() {
471 let mut mm = make_mm();
472 let mut registry = SchemaRegistry::default();
473
474 let pages = registry
475 .register_table::<User>(&mut mm)
476 .expect("failed to register table");
477
478 let ledger = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm)
479 .expect("failed to load snapshot ledger after register_table");
480
481 assert_eq!(ledger.get(), &User::schema_snapshot());
482 assert_eq!(ledger.get().name, "users");
483 }
484
485 #[test]
486 fn test_register_table_returns_name_collision_when_hash_slot_belongs_to_another_name() {
487 let mut mm = make_mm();
488 let mut registry = SchemaRegistry::default();
489
490 let pages = registry
492 .register_table::<User>(&mut mm)
493 .expect("failed to register user");
494
495 let mut tampered = User::schema_snapshot();
497 tampered.name = "imposter".to_string();
498 mm.write_at(pages.schema_snapshot_page, 0, &tampered)
499 .expect("failed to overwrite snapshot");
500
501 let result = registry.register_table::<User>(&mut mm);
502 match result {
503 Err(MemoryError::NameCollision {
504 candidate,
505 existing,
506 }) => {
507 assert_eq!(candidate, "users");
508 assert_eq!(existing, "imposter");
509 }
510 other => panic!("expected NameCollision, got {other:?}"),
511 }
512 }
513
514 #[test]
515 fn test_should_not_register_same_table_twice() {
516 let mut mm = make_mm();
517 let mut registry = SchemaRegistry::default();
518
519 let first_page = registry
520 .register_table::<User>(&mut mm)
521 .expect("failed to register table first time");
522 let second_page = registry
523 .register_table::<User>(&mut mm)
524 .expect("failed to register table second time");
525
526 assert_eq!(first_page, second_page);
527 assert_eq!(registry.tables.len(), 1);
528 }
529
530 #[test]
531 fn test_should_init_index_ledger() {
532 let mut mm = make_mm();
533 let mut registry = SchemaRegistry::default();
534
535 let pages = registry
536 .register_table::<User>(&mut mm)
537 .expect("failed to register table");
538
539 let mut index_ledger = IndexLedger::load(pages.index_registry_page, &mut mm)
541 .expect("failed to load index ledger");
542
543 index_ledger
545 .insert(
546 &["id"],
547 Int32::from(1i32),
548 RecordAddress { page: 1, offset: 0 },
549 &mut mm,
550 )
551 .expect("failed to insert index");
552 let result = index_ledger
554 .search(&["id"], &Int32::from(1i32), &mut mm)
555 .expect("failed to search index")
556 .get(0)
557 .copied()
558 .expect("no index at 0");
559 assert_eq!(result, RecordAddress { page: 1, offset: 0 });
560 }
561
562 #[derive(Clone, CandidType)]
563 struct AnotherTable;
564
565 impl Encode for AnotherTable {
566 const SIZE: DataSize = DataSize::Dynamic;
567
568 const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
569
570 fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
571 std::borrow::Cow::Owned(vec![])
572 }
573
574 fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
575 where
576 Self: Sized,
577 {
578 Ok(AnotherTable)
579 }
580
581 fn size(&self) -> MSize {
582 0
583 }
584 }
585
586 #[derive(Clone, CandidType, Deserialize)]
587 struct AnotherTableRecord;
588
589 impl TableRecord for AnotherTableRecord {
590 type Schema = AnotherTable;
591
592 fn from_values(_values: TableColumns) -> Self {
593 AnotherTableRecord
594 }
595
596 fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
597 vec![]
598 }
599 }
600
601 #[derive(Clone, CandidType, Serialize)]
602 struct AnotherTableInsert;
603
604 impl InsertRecord for AnotherTableInsert {
605 type Record = AnotherTableRecord;
606 type Schema = AnotherTable;
607
608 fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
609 Ok(AnotherTableInsert)
610 }
611
612 fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
613 vec![]
614 }
615
616 fn into_record(self) -> Self::Schema {
617 AnotherTable
618 }
619 }
620
621 #[derive(Clone, CandidType, Serialize)]
622 struct AnotherTableUpdate;
623
624 impl UpdateRecord for AnotherTableUpdate {
625 type Record = AnotherTableRecord;
626 type Schema = AnotherTable;
627
628 fn from_values(
629 _values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
630 _where_clause: Option<wasm_dbms_api::prelude::Filter>,
631 ) -> Self {
632 AnotherTableUpdate
633 }
634
635 fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
636 vec![]
637 }
638
639 fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
640 None
641 }
642 }
643
644 impl TableSchema for AnotherTable {
645 type Record = AnotherTableRecord;
646 type Insert = AnotherTableInsert;
647 type Update = AnotherTableUpdate;
648 type ForeignFetcher = NoForeignFetcher;
649
650 fn table_name() -> &'static str {
651 "another_table"
652 }
653
654 fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
655 &[]
656 }
657
658 fn primary_key() -> &'static str {
659 ""
660 }
661
662 fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
663 &[]
664 }
665
666 fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
667 vec![]
668 }
669
670 fn sanitizer(
671 _column_name: &'static str,
672 ) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
673 None
674 }
675
676 fn validator(
677 _column_name: &'static str,
678 ) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
679 None
680 }
681 }
682
683 #[derive(Clone, CandidType)]
686 struct User;
687
688 impl Encode for User {
689 const SIZE: DataSize = DataSize::Dynamic;
690 const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
691
692 fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
693 std::borrow::Cow::Owned(vec![])
694 }
695
696 fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
697 where
698 Self: Sized,
699 {
700 Ok(User)
701 }
702
703 fn size(&self) -> MSize {
704 0
705 }
706 }
707
708 #[derive(Clone, CandidType, Deserialize)]
709 struct UserRecord;
710
711 impl TableRecord for UserRecord {
712 type Schema = User;
713
714 fn from_values(_values: TableColumns) -> Self {
715 UserRecord
716 }
717
718 fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
719 vec![]
720 }
721 }
722
723 #[derive(Clone, CandidType, Serialize)]
724 struct UserInsert;
725
726 impl InsertRecord for UserInsert {
727 type Record = UserRecord;
728 type Schema = User;
729
730 fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
731 Ok(UserInsert)
732 }
733
734 fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
735 vec![]
736 }
737
738 fn into_record(self) -> Self::Schema {
739 User
740 }
741 }
742
743 #[derive(Clone, CandidType, Serialize)]
744 struct UserUpdate;
745
746 impl UpdateRecord for UserUpdate {
747 type Record = UserRecord;
748 type Schema = User;
749
750 fn from_values(
751 _values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
752 _where_clause: Option<wasm_dbms_api::prelude::Filter>,
753 ) -> Self {
754 UserUpdate
755 }
756
757 fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
758 vec![]
759 }
760
761 fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
762 None
763 }
764 }
765
766 impl TableSchema for User {
767 type Record = UserRecord;
768 type Insert = UserInsert;
769 type Update = UserUpdate;
770 type ForeignFetcher = NoForeignFetcher;
771
772 fn table_name() -> &'static str {
773 "users"
774 }
775
776 fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
777 &[]
778 }
779
780 fn primary_key() -> &'static str {
781 "id"
782 }
783
784 fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
785 &[IndexDef(&["id"])]
786 }
787
788 fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
789 vec![]
790 }
791
792 fn sanitizer(
793 _column_name: &'static str,
794 ) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
795 None
796 }
797
798 fn validator(
799 _column_name: &'static str,
800 ) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
801 None
802 }
803 }
804
805 #[test]
806 fn test_table_registry_page_returns_none_for_unregistered_table() {
807 let registry = SchemaRegistry::default();
808 assert!(registry.table_registry_page::<User>().is_none());
809 }
810
811 #[test]
812 fn test_empty_registry_encode_decode() {
813 let registry = SchemaRegistry::default();
814 let encoded = registry.encode();
815 let decoded = SchemaRegistry::decode(encoded).expect("failed to decode empty registry");
816 assert_eq!(registry, decoded);
817 assert_eq!(decoded.tables.len(), 0);
818 }
819
820 #[test]
821 fn test_load_fresh_memory_returns_empty_registry() {
822 let mut mm = make_mm();
823 let registry = SchemaRegistry::load(&mut mm).expect("failed to load from fresh memory");
824 assert_eq!(registry.tables.len(), 0);
825 }
826
827 #[test]
828 fn test_save_and_reload() {
829 let mut mm = make_mm();
830 let mut registry = SchemaRegistry::default();
831 registry
832 .register_table::<User>(&mut mm)
833 .expect("failed to register");
834 registry
836 .register_table::<AnotherTable>(&mut mm)
837 .expect("failed to register another");
838 registry.save(&mut mm).expect("failed to save");
839
840 let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload");
841 assert_eq!(reloaded.tables.len(), 2);
842 assert_eq!(registry, reloaded);
843 }
844
845 #[test]
846 fn test_schema_registry_size() {
847 let mut mm = make_mm();
848 let mut registry = SchemaRegistry::default();
849 assert_eq!(registry.size(), 16);
851 registry
852 .register_table::<User>(&mut mm)
853 .expect("failed to register");
854 assert_eq!(registry.size(), 41);
858 }
859
860 #[test]
861 fn test_should_allocate_autoincrement_page_when_column_has_autoincrement() {
862 let mut mm = make_mm();
863 let mut registry = SchemaRegistry::default();
864
865 let pages = registry
866 .register_table::<AutoincrementTable>(&mut mm)
867 .expect("failed to register autoincrement table");
868
869 assert!(
870 pages.autoincrement_registry_page.is_some(),
871 "autoincrement registry page should be allocated for tables with autoincrement columns"
872 );
873 }
874
875 #[test]
876 fn test_should_not_allocate_autoincrement_page_when_no_autoincrement_column() {
877 let mut mm = make_mm();
878 let mut registry = SchemaRegistry::default();
879
880 let pages = registry
881 .register_table::<User>(&mut mm)
882 .expect("failed to register user table");
883
884 assert!(
885 pages.autoincrement_registry_page.is_none(),
886 "autoincrement registry page should not be allocated for tables without autoincrement columns"
887 );
888 }
889
890 #[test]
891 fn test_schema_registry_size_with_autoincrement() {
892 let mut mm = make_mm();
893 let mut registry = SchemaRegistry::default();
894
895 registry
896 .register_table::<AutoincrementTable>(&mut mm)
897 .expect("failed to register");
898 assert_eq!(registry.size(), 45);
902 }
903
904 #[test]
905 fn test_should_encode_and_decode_registry_with_autoincrement() {
906 let mut mm = make_mm();
907 let mut registry = SchemaRegistry::default();
908
909 registry
910 .register_table::<AutoincrementTable>(&mut mm)
911 .expect("failed to register");
912
913 let encoded = registry.encode();
914 let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
915 assert_eq!(registry, decoded);
916
917 let page = decoded
918 .table_registry_page::<AutoincrementTable>()
919 .expect("missing autoincrement table");
920 assert!(page.autoincrement_registry_page.is_some());
921 }
922
923 #[derive(Clone, CandidType)]
926 struct AutoincrementTable;
927
928 impl Encode for AutoincrementTable {
929 const SIZE: DataSize = DataSize::Dynamic;
930 const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
931
932 fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
933 std::borrow::Cow::Owned(vec![])
934 }
935
936 fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
937 where
938 Self: Sized,
939 {
940 Ok(AutoincrementTable)
941 }
942
943 fn size(&self) -> MSize {
944 0
945 }
946 }
947
948 #[derive(Clone, CandidType, Deserialize)]
949 struct AutoincrementTableRecord;
950
951 impl TableRecord for AutoincrementTableRecord {
952 type Schema = AutoincrementTable;
953
954 fn from_values(_values: TableColumns) -> Self {
955 AutoincrementTableRecord
956 }
957
958 fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
959 vec![]
960 }
961 }
962
963 #[derive(Clone, CandidType, Serialize)]
964 struct AutoincrementTableInsert;
965
966 impl InsertRecord for AutoincrementTableInsert {
967 type Record = AutoincrementTableRecord;
968 type Schema = AutoincrementTable;
969
970 fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
971 Ok(AutoincrementTableInsert)
972 }
973
974 fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
975 vec![]
976 }
977
978 fn into_record(self) -> Self::Schema {
979 AutoincrementTable
980 }
981 }
982
983 #[derive(Clone, CandidType, Serialize)]
984 struct AutoincrementTableUpdate;
985
986 impl UpdateRecord for AutoincrementTableUpdate {
987 type Record = AutoincrementTableRecord;
988 type Schema = AutoincrementTable;
989
990 fn from_values(
991 _values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
992 _where_clause: Option<wasm_dbms_api::prelude::Filter>,
993 ) -> Self {
994 AutoincrementTableUpdate
995 }
996
997 fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
998 vec![]
999 }
1000
1001 fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
1002 None
1003 }
1004 }
1005
1006 impl TableSchema for AutoincrementTable {
1007 type Record = AutoincrementTableRecord;
1008 type Insert = AutoincrementTableInsert;
1009 type Update = AutoincrementTableUpdate;
1010 type ForeignFetcher = NoForeignFetcher;
1011
1012 fn table_name() -> &'static str {
1013 "autoincrement_table"
1014 }
1015
1016 fn columns() -> &'static [ColumnDef] {
1017 use wasm_dbms_api::prelude::DataTypeKind;
1018
1019 &[ColumnDef {
1020 name: "id",
1021 data_type: DataTypeKind::Uint32,
1022 auto_increment: true,
1023 nullable: false,
1024 primary_key: true,
1025 unique: true,
1026 foreign_key: None,
1027 default: None,
1028 renamed_from: &[],
1029 }]
1030 }
1031
1032 fn primary_key() -> &'static str {
1033 "id"
1034 }
1035
1036 fn indexes() -> &'static [IndexDef] {
1037 &[IndexDef(&["id"])]
1038 }
1039
1040 fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
1041 vec![]
1042 }
1043
1044 fn sanitizer(
1045 _column_name: &'static str,
1046 ) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
1047 None
1048 }
1049
1050 fn validator(
1051 _column_name: &'static str,
1052 ) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
1053 None
1054 }
1055 }
1056
1057 use wasm_dbms_api::prelude::{ColumnSnapshot, DataTypeSnapshot, TableSchemaSnapshot};
1060
1061 fn dummy_snapshot(name: &str) -> TableSchemaSnapshot {
1062 TableSchemaSnapshot {
1063 version: TableSchemaSnapshot::latest_version(),
1064 name: name.to_string(),
1065 primary_key: "id".to_string(),
1066 alignment: 8,
1067 columns: vec![ColumnSnapshot {
1068 name: "id".to_string(),
1069 data_type: DataTypeSnapshot::Uint32,
1070 nullable: false,
1071 auto_increment: false,
1072 unique: true,
1073 primary_key: true,
1074 foreign_key: None,
1075 default: None,
1076 }],
1077 indexes: vec![],
1078 }
1079 }
1080
1081 #[test]
1082 fn test_table_registry_page_by_name_returns_pages_for_registered_table() {
1083 let mut mm = make_mm();
1084 let mut registry = SchemaRegistry::default();
1085 let pages = registry
1086 .register_table::<User>(&mut mm)
1087 .expect("failed to register user");
1088
1089 let by_name = registry
1090 .table_registry_page_by_name("users")
1091 .expect("missing pages by name");
1092 assert_eq!(by_name, pages);
1093 }
1094
1095 #[test]
1096 fn test_table_registry_page_by_name_returns_none_for_unknown_table() {
1097 let registry = SchemaRegistry::default();
1098 assert!(registry.table_registry_page_by_name("missing").is_none());
1099 }
1100
1101 #[test]
1102 fn test_stored_snapshots_returns_empty_for_unregistered_registry() {
1103 let mut mm = make_mm();
1104 let registry = SchemaRegistry::default();
1105 let snapshots = registry
1106 .stored_snapshots(&mut mm)
1107 .expect("failed to read snapshots");
1108 assert!(snapshots.is_empty());
1109 }
1110
1111 #[test]
1112 fn test_stored_snapshots_returns_one_entry_per_registered_table() {
1113 let mut mm = make_mm();
1114 let mut registry = SchemaRegistry::default();
1115 registry
1116 .register_table::<User>(&mut mm)
1117 .expect("failed to register user");
1118 registry
1119 .register_table::<AnotherTable>(&mut mm)
1120 .expect("failed to register another");
1121
1122 let snapshots = registry
1123 .stored_snapshots(&mut mm)
1124 .expect("failed to load snapshots");
1125 assert_eq!(snapshots.len(), 2);
1126 let names: Vec<&str> = snapshots.iter().map(|snap| snap.name.as_str()).collect();
1127 assert!(names.contains(&"users"));
1128 assert!(names.contains(&"another_table"));
1129 }
1130
1131 #[test]
1132 fn test_register_table_from_snapshot_allocates_pages_and_persists_snapshot() {
1133 let mut mm = make_mm();
1134 let mut registry = SchemaRegistry::default();
1135 let snapshot = dummy_snapshot("fresh");
1136
1137 let pages = registry
1138 .register_table_from_snapshot(&snapshot, &mut mm)
1139 .expect("failed to register from snapshot");
1140
1141 let loaded = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm).expect("load");
1142 assert_eq!(loaded.get(), &snapshot);
1143 assert!(registry.table_registry_page_by_name("fresh").is_some());
1144 }
1145
1146 #[test]
1147 fn test_register_table_from_snapshot_is_idempotent_for_same_name() {
1148 let mut mm = make_mm();
1149 let mut registry = SchemaRegistry::default();
1150 let snapshot = dummy_snapshot("fresh");
1151
1152 let first = registry
1153 .register_table_from_snapshot(&snapshot, &mut mm)
1154 .expect("first");
1155 let second = registry
1156 .register_table_from_snapshot(&snapshot, &mut mm)
1157 .expect("second");
1158 assert_eq!(first, second);
1159 }
1160
1161 #[test]
1162 fn test_register_table_from_snapshot_detects_name_collision() {
1163 let mut mm = make_mm();
1164 let mut registry = SchemaRegistry::default();
1165 let snapshot = dummy_snapshot("users");
1166
1167 let pages = registry
1168 .register_table_from_snapshot(&snapshot, &mut mm)
1169 .expect("first");
1170
1171 let mut tampered = snapshot.clone();
1174 tampered.name = "imposter".to_string();
1175 mm.write_at(pages.schema_snapshot_page, 0, &tampered)
1176 .expect("overwrite");
1177
1178 let result = registry.register_table_from_snapshot(&snapshot, &mut mm);
1179 assert!(matches!(
1180 result,
1181 Err(MemoryError::NameCollision {
1182 ref candidate,
1183 ref existing,
1184 }) if candidate == "users" && existing == "imposter"
1185 ));
1186 }
1187
1188 #[test]
1189 fn test_unregister_table_removes_entry_and_returns_previous_pages() {
1190 let mut mm = make_mm();
1191 let mut registry = SchemaRegistry::default();
1192 let pages = registry
1193 .register_table::<User>(&mut mm)
1194 .expect("failed to register");
1195
1196 let removed = registry
1197 .unregister_table("users", &mut mm)
1198 .expect("unregister");
1199 assert_eq!(removed, Some(pages));
1200 assert!(registry.table_registry_page_by_name("users").is_none());
1201 }
1202
1203 #[test]
1204 fn test_unregister_table_returns_none_for_unknown_table() {
1205 let mut mm = make_mm();
1206 let mut registry = SchemaRegistry::default();
1207 let removed = registry
1208 .unregister_table("missing", &mut mm)
1209 .expect("unregister");
1210 assert!(removed.is_none());
1211 }
1212
1213 #[test]
1214 fn test_unregister_table_releases_pages_for_reuse() {
1215 let mut mm = make_mm();
1220 let mut registry = SchemaRegistry::default();
1221
1222 let pages = registry.register_table::<User>(&mut mm).expect("register");
1223
1224 let last_page_before_drop = mm.last_page().expect("at least one page");
1225
1226 registry
1227 .unregister_table("users", &mut mm)
1228 .expect("unregister")
1229 .expect("expected returned pages");
1230
1231 let mut reclaimed = Vec::new();
1234 for _ in 0..5 {
1235 let page = mm.claim_page().expect("claim");
1236 assert!(
1237 page <= last_page_before_drop,
1238 "expected reclaimed page <= {last_page_before_drop}, got {page}"
1239 );
1240 reclaimed.push(page);
1241 }
1242 assert!(reclaimed.contains(&pages.schema_snapshot_page));
1244 assert!(reclaimed.contains(&pages.pages_list_page));
1245 assert!(reclaimed.contains(&pages.free_segments_page));
1246 assert!(reclaimed.contains(&pages.index_registry_page));
1247 }
1248
1249 #[test]
1250 fn test_unregister_table_rejects_release_when_unclaimed_ledger_is_full() {
1251 let mut mm = make_mm();
1252 let mut registry = SchemaRegistry::default();
1253 let pages = registry.register_table::<User>(&mut mm).expect("register");
1254
1255 let mut ledger = UnclaimedPages::new();
1256 for page in 0..(UNCLAIMED_PAGES_CAPACITY - 1) {
1257 ledger.push(page).expect("fill ledger");
1258 }
1259 mm.write_at(mm.unclaimed_pages_page(), 0, &ledger)
1260 .expect("persist ledger");
1261
1262 let err = registry
1263 .unregister_table("users", &mut mm)
1264 .expect_err("drop must fail before partially releasing pages");
1265 assert!(matches!(err, MemoryError::UnclaimedPagesFull { .. }));
1266
1267 assert!(
1268 registry.table_registry_page_by_name("users").is_some(),
1269 "registry entry must remain when release is rejected"
1270 );
1271
1272 let snapshot = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm)
1273 .expect("load snapshot ledger")
1274 .get()
1275 .clone();
1276 assert_eq!(snapshot.name, "users");
1277 }
1278}