Skip to main content

wasm_dbms_memory/
schema_registry.rs

1// Rust guideline compliant 2026-02-28
2
3use std::collections::HashMap;
4
5use wasm_dbms_api::memory::MemoryError;
6use wasm_dbms_api::prelude::{
7    DEFAULT_ALIGNMENT, DataSize, Encode, MSize, MemoryResult, Page, PageOffset, TableFingerprint,
8    TableSchema, TableSchemaSnapshot, fingerprint_for_name,
9};
10use xxhash_rust::xxh3::Xxh3;
11
12use crate::memory_manager::{SCHEMA_PAGE, UNCLAIMED_PAGES_PAGE};
13use crate::table_registry::{AutoincrementLedger, IndexLedger, SchemaSnapshotLedger};
14use crate::{MemoryAccess, TableRegistry, UnclaimedPages};
15
16/// The dictionary of tables, mapping the table schema fingerprint to the pages where the table data and metadata are stored.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub struct TableRegistryPage {
19    /// The page where the Schema Snapshot for this table is stored.
20    pub schema_snapshot_page: Page,
21    /// The page where the list of pages for this table is stored.
22    pub pages_list_page: Page,
23    /// The page where the free segments for this table are stored.
24    pub free_segments_page: Page,
25    /// The page where the index registry for this table is stored.
26    pub index_registry_page: Page,
27    /// The page where the autoincrement registry for this table is stored.
28    /// Only used if the table has an autoincrement column.
29    pub autoincrement_registry_page: Option<Page>,
30}
31
32/// The schema registry takes care of storing and retrieving table schemas from memory.
33#[derive(Debug, Default, Clone, PartialEq, Eq)]
34pub struct SchemaRegistry {
35    schema_hash: u64,
36    tables: HashMap<TableFingerprint, TableRegistryPage>,
37}
38
39impl SchemaRegistry {
40    /// Load the schema registry from memory.
41    pub fn load(mm: &mut impl MemoryAccess) -> MemoryResult<Self> {
42        let registry: Self = mm.read_at(SCHEMA_PAGE, 0)?;
43        Ok(registry)
44    }
45
46    /// Returns the cached hash of every persisted schema snapshot.
47    pub const fn schema_hash(&self) -> u64 {
48        self.schema_hash
49    }
50
51    /// Registers a table and allocates it registry page.
52    ///
53    /// The [`TableSchema`] type parameter is used to get the [`TableSchema::fingerprint`] of the
54    /// table schema. The fingerprint is derived from the table name, so two distinct names that
55    /// hash to the same value are detected eagerly: when the slot is already occupied by a table
56    /// with a different name, [`MemoryError::NameCollision`] is returned and no allocation is
57    /// performed.
58    ///
59    /// # Errors
60    ///
61    /// - [`MemoryError::NameCollision`] when the fingerprint slot is occupied by a table whose
62    ///   persisted snapshot carries a different name.
63    /// - Any [`MemoryError`] propagated from page allocation, snapshot init, or the registry
64    ///   write-back.
65    pub fn register_table<TS>(
66        &mut self,
67        mm: &mut impl MemoryAccess,
68    ) -> MemoryResult<TableRegistryPage>
69    where
70        TS: TableSchema,
71    {
72        // check if already registered, and detect name-hash collisions eagerly
73        let fingerprint = TS::fingerprint();
74        let candidate_name = TS::table_name();
75        if let Some(pages) = self.tables.get(&fingerprint).copied() {
76            let existing = SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)?;
77            if existing.get().name != candidate_name {
78                return Err(MemoryError::NameCollision {
79                    candidate: candidate_name.to_string(),
80                    existing: existing.get().name.clone(),
81                });
82            }
83            return Ok(pages);
84        }
85
86        // allocate table registry page
87        let schema_snapshot_page = mm.claim_page()?;
88        let pages_list_page = mm.claim_page()?;
89        let free_segments_page = mm.claim_page()?;
90        let index_registry_page = mm.claim_page()?;
91        // allocate autoincrement registry page if needed
92        let has_autoincrement = TS::columns().iter().any(|col| col.auto_increment);
93        let autoincrement_registry_page = if has_autoincrement {
94            Some(mm.claim_page()?)
95        } else {
96            None
97        };
98
99        // insert into tables map
100        let pages = TableRegistryPage {
101            schema_snapshot_page,
102            pages_list_page,
103            free_segments_page,
104            index_registry_page,
105            autoincrement_registry_page,
106        };
107        self.tables.insert(fingerprint, pages);
108
109        // init snapshot ledger for this table
110        SchemaSnapshotLedger::init::<TS>(pages.schema_snapshot_page, mm)?;
111        // init index ledger for this table
112        IndexLedger::init(pages.index_registry_page, TS::indexes(), mm)?;
113        // init autoincrement ledger for this table if needed
114        if let Some(autoinc_page) = pages.autoincrement_registry_page {
115            AutoincrementLedger::init::<TS>(autoinc_page, mm)?;
116        }
117
118        self.refresh_schema_hash(mm)?;
119        self.save(mm)?;
120
121        Ok(pages)
122    }
123
124    /// Save the schema registry to memory.
125    pub fn save(&self, mm: &mut impl MemoryAccess) -> MemoryResult<()> {
126        mm.write_at(SCHEMA_PAGE, 0, self)
127    }
128
129    /// Returns the table registry page for a given table schema.
130    pub fn table_registry_page<TS>(&self) -> Option<TableRegistryPage>
131    where
132        TS: TableSchema,
133    {
134        self.tables.get(&TS::fingerprint()).copied()
135    }
136
137    /// Returns the table registry page for the table with the given name.
138    ///
139    /// Used by the migration engine, which knows tables only by name when
140    /// applying ops decoded from snapshots.
141    pub fn table_registry_page_by_name(&self, name: &str) -> Option<TableRegistryPage> {
142        self.tables.get(&fingerprint_for_name(name)).copied()
143    }
144
145    /// Registers a table from a snapshot, allocating its registry pages.
146    ///
147    /// The migration engine uses this entry point when applying a
148    /// `MigrationOp::CreateTable`: the source of truth is a
149    /// [`TableSchemaSnapshot`], not a `TableSchema` type.
150    ///
151    /// # Errors
152    ///
153    /// - [`MemoryError::NameCollision`] when the fingerprint slot is occupied
154    ///   by a table with a different name.
155    /// - Any [`MemoryError`] propagated from page allocation, snapshot init,
156    ///   index init, or registry persistence.
157    pub fn register_table_from_snapshot(
158        &mut self,
159        snapshot: &TableSchemaSnapshot,
160        mm: &mut impl MemoryAccess,
161    ) -> MemoryResult<TableRegistryPage> {
162        let fingerprint = fingerprint_for_name(&snapshot.name);
163        let candidate_name = snapshot.name.as_str();
164        if let Some(pages) = self.tables.get(&fingerprint).copied() {
165            let existing = SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)?;
166            if existing.get().name != candidate_name {
167                return Err(MemoryError::NameCollision {
168                    candidate: candidate_name.to_string(),
169                    existing: existing.get().name.clone(),
170                });
171            }
172            return Ok(pages);
173        }
174
175        let schema_snapshot_page = mm.claim_page()?;
176        let pages_list_page = mm.claim_page()?;
177        let free_segments_page = mm.claim_page()?;
178        let index_registry_page = mm.claim_page()?;
179        let has_autoincrement = snapshot.columns.iter().any(|col| col.auto_increment);
180        let autoincrement_registry_page = if has_autoincrement {
181            Some(mm.claim_page()?)
182        } else {
183            None
184        };
185
186        let pages = TableRegistryPage {
187            schema_snapshot_page,
188            pages_list_page,
189            free_segments_page,
190            index_registry_page,
191            autoincrement_registry_page,
192        };
193        self.tables.insert(fingerprint, pages);
194
195        mm.write_at(pages.schema_snapshot_page, 0, snapshot)?;
196        IndexLedger::init_from_keys(
197            pages.index_registry_page,
198            snapshot.indexes.iter().map(|idx| idx.columns.clone()),
199            mm,
200        )?;
201        self.refresh_schema_hash(mm)?;
202        self.save(mm)?;
203
204        Ok(pages)
205    }
206
207    /// Removes the table identified by `name` from the registry, releases
208    /// every page it owned back to the unclaimed-pages ledger, and persists
209    /// the updated registry.
210    ///
211    /// Used by the migration engine when applying a `MigrationOp::DropTable`.
212    ///
213    /// Returns the [`TableRegistryPage`] previously associated with the table,
214    /// or `None` if no such table was registered.
215    ///
216    /// # Errors
217    ///
218    /// Returns a [`MemoryError`] if loading the table registry, releasing
219    /// its pages, or persisting the updated schema registry fails.
220    pub fn unregister_table(
221        &mut self,
222        name: &str,
223        mm: &mut impl MemoryAccess,
224    ) -> MemoryResult<Option<TableRegistryPage>> {
225        let fingerprint = fingerprint_for_name(name);
226        let pages = self.tables.get(&fingerprint).copied();
227        if let Some(pages) = pages {
228            // Release every page owned by the dropped table before persisting
229            // the updated schema registry — once the schema page is rewritten
230            // there is no way back to the table's `TableRegistryPage`.
231            let registry = TableRegistry::load(pages, mm)?;
232            let pages_to_release = registry.releasable_pages_count(pages, mm)?;
233            let ledger: UnclaimedPages = mm.read_at(UNCLAIMED_PAGES_PAGE, 0)?;
234            if ledger.remaining_capacity() < pages_to_release as u32 {
235                return Err(MemoryError::UnclaimedPagesFull {
236                    capacity: crate::UNCLAIMED_PAGES_CAPACITY,
237                });
238            }
239
240            let removed = self.tables.remove(&fingerprint);
241            debug_assert_eq!(removed, Some(pages));
242            registry.release_pages(pages, mm)?;
243            self.refresh_schema_hash(mm)?;
244            self.save(mm)?;
245            return Ok(removed);
246        }
247        Ok(None)
248    }
249
250    /// Returns the persisted [`TableSchemaSnapshot`] for every registered table.
251    ///
252    /// The order is unspecified. Callers that need a stable order (e.g. for
253    /// drift hashing) must sort by [`TableSchemaSnapshot::name`].
254    ///
255    /// # Errors
256    ///
257    /// Returns the first [`MemoryError`] encountered while loading any
258    /// snapshot page.
259    pub fn stored_snapshots(
260        &self,
261        mm: &mut impl MemoryAccess,
262    ) -> MemoryResult<Vec<TableSchemaSnapshot>> {
263        self.tables
264            .values()
265            .map(|pages| {
266                SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)
267                    .map(|ledger| ledger.get().clone())
268            })
269            .collect()
270    }
271
272    /// Recomputes the cached schema hash from the currently registered tables.
273    pub fn refresh_schema_hash(&mut self, mm: &mut impl MemoryAccess) -> MemoryResult<()> {
274        self.schema_hash = compute_hash(self.stored_snapshots(mm)?);
275        Ok(())
276    }
277}
278
279fn compute_hash(mut snapshots: Vec<TableSchemaSnapshot>) -> u64 {
280    snapshots.sort_by(|a, b| a.name.cmp(&b.name));
281
282    let mut hasher = Xxh3::new();
283    hasher.update(&[TableSchemaSnapshot::latest_version()]);
284    for snapshot in &snapshots {
285        let bytes = snapshot.encode();
286        hasher.update(&(bytes.len() as u64).to_le_bytes());
287        hasher.update(&bytes);
288    }
289    hasher.digest()
290}
291
292impl Encode for SchemaRegistry {
293    const SIZE: DataSize = DataSize::Dynamic;
294
295    const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
296
297    fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
298        // prepare buffer; size is 8 bytes for schema_hash + 8 bytes for len
299        // + fixed-size fields for each entry.
300        let mut buffer = Vec::with_capacity(self.size() as usize);
301        buffer.extend_from_slice(&self.schema_hash.to_le_bytes());
302        // write 8 bytes len of map
303        buffer.extend_from_slice(&(self.tables.len() as u64).to_le_bytes());
304        // write each entry
305        for (fingerprint, page) in &self.tables {
306            buffer.extend_from_slice(&fingerprint.to_le_bytes());
307            buffer.extend_from_slice(&page.schema_snapshot_page.to_le_bytes());
308            buffer.extend_from_slice(&page.pages_list_page.to_le_bytes());
309            buffer.extend_from_slice(&page.free_segments_page.to_le_bytes());
310            buffer.extend_from_slice(&page.index_registry_page.to_le_bytes());
311            // autoincrement registry page is optional, so we write a flag and then the page if it exists
312            if let Some(autoinc_page) = page.autoincrement_registry_page {
313                buffer.push(1); // flag for presence of autoincrement registry page
314                buffer.extend_from_slice(&autoinc_page.to_le_bytes());
315            } else {
316                buffer.push(0); // flag for absence of autoincrement registry page
317            }
318        }
319        std::borrow::Cow::Owned(buffer)
320    }
321
322    fn decode(data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
323    where
324        Self: Sized,
325    {
326        let mut offset = 0;
327        let schema_hash = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
328        offset += 8;
329        // read len
330        let len = u64::from_le_bytes(
331            data[offset..offset + 8]
332                .try_into()
333                .expect("failed to read length"),
334        ) as usize;
335        offset += 8;
336        let mut tables = HashMap::with_capacity(len);
337        // read each entry
338        for _ in 0..len {
339            let fingerprint = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
340            offset += 8;
341            let schema_snapshot_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
342            offset += 4;
343            let pages_list_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
344            offset += 4;
345            let free_segments_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
346            offset += 4;
347            let index_registry_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
348            offset += 4;
349            let has_autoincrement = data[offset] == 1;
350            offset += 1;
351            let autoincrement_registry_page = if has_autoincrement {
352                let page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
353                offset += 4;
354                Some(page)
355            } else {
356                None
357            };
358            tables.insert(
359                fingerprint,
360                TableRegistryPage {
361                    schema_snapshot_page,
362                    pages_list_page,
363                    free_segments_page,
364                    index_registry_page,
365                    autoincrement_registry_page,
366                },
367            );
368        }
369        Ok(Self {
370            schema_hash,
371            tables,
372        })
373    }
374
375    fn size(&self) -> MSize {
376        // - 8 bytes for `self.schema_hash`
377        // - 8 bytes for `self.tables.len()`
378        // - for each entry:
379        //  - 8 bytes for the fingerprint
380        //  - 4 bytes for the schema_snapshot_page
381        //  - 4 bytes for the pages_list_page
382        //  - 4 bytes for the free_segments_page
383        //  - 4 bytes for the index_registry_page
384        //  - 1 byte for the autoincrement registry page flag
385        //  - 4 bytes for the autoincrement registry page if it exists
386        let autoinc_pages = self
387            .tables
388            .values()
389            .filter(|page| page.autoincrement_registry_page.is_some())
390            .count() as MSize;
391
392        16 + (self.tables.len() as MSize * (4 * 4 + 8 + 1)) + (autoinc_pages * 4)
393    }
394}
395
396#[cfg(test)]
397mod tests {
398
399    use candid::CandidType;
400    use serde::{Deserialize, Serialize};
401    use wasm_dbms_api::prelude::{
402        ColumnDef, DbmsResult, IndexDef, InsertRecord, Int32, NoForeignFetcher, TableColumns,
403        TableRecord, UpdateRecord,
404    };
405
406    use super::*;
407    use crate::{
408        HeapMemoryProvider, MemoryAccess, MemoryManager, RecordAddress, UNCLAIMED_PAGES_CAPACITY,
409        UnclaimedPages,
410    };
411
412    fn make_mm() -> MemoryManager<HeapMemoryProvider> {
413        MemoryManager::init(HeapMemoryProvider::default())
414    }
415
416    #[test]
417    fn test_should_encode_and_decode_schema_registry() {
418        let mut mm = make_mm();
419
420        // load
421        let mut registry =
422            SchemaRegistry::load(&mut mm).expect("failed to load init schema registry");
423
424        // register table
425        let registry_page = registry
426            .register_table::<User>(&mut mm)
427            .expect("failed to register table");
428
429        // get table registry page
430        let fetched_page = registry
431            .table_registry_page::<User>()
432            .expect("failed to get table registry page");
433        assert_eq!(registry_page, fetched_page);
434
435        // encode
436        let encoded = registry.encode();
437        // decode
438        let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
439        assert_eq!(registry, decoded);
440
441        // try to actually add another
442        let another_registry_page = registry
443            .register_table::<AnotherTable>(&mut mm)
444            .expect("failed to register another table");
445        let another_fetched_page = registry
446            .table_registry_page::<AnotherTable>()
447            .expect("failed to get another table registry page");
448        assert_eq!(another_registry_page, another_fetched_page);
449
450        // re-init
451        let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload schema registry");
452        assert_eq!(registry, reloaded);
453        // should have two
454        assert_eq!(reloaded.tables.len(), 2);
455        assert_eq!(
456            reloaded
457                .table_registry_page::<User>()
458                .expect("failed to get first table registry page after reload"),
459            registry_page
460        );
461        assert_eq!(
462            reloaded
463                .table_registry_page::<AnotherTable>()
464                .expect("failed to get second table registry page after reload"),
465            another_registry_page
466        );
467    }
468
469    #[test]
470    fn test_register_table_writes_snapshot_to_ledger() {
471        let mut mm = make_mm();
472        let mut registry = SchemaRegistry::default();
473
474        let pages = registry
475            .register_table::<User>(&mut mm)
476            .expect("failed to register table");
477
478        let ledger = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm)
479            .expect("failed to load snapshot ledger after register_table");
480
481        assert_eq!(ledger.get(), &User::schema_snapshot());
482        assert_eq!(ledger.get().name, "users");
483    }
484
485    #[test]
486    fn test_register_table_returns_name_collision_when_hash_slot_belongs_to_another_name() {
487        let mut mm = make_mm();
488        let mut registry = SchemaRegistry::default();
489
490        // register `User` so its snapshot lives on disk
491        let pages = registry
492            .register_table::<User>(&mut mm)
493            .expect("failed to register user");
494
495        // simulate a hash collision by rewriting the persisted snapshot to carry a different name
496        let mut tampered = User::schema_snapshot();
497        tampered.name = "imposter".to_string();
498        mm.write_at(pages.schema_snapshot_page, 0, &tampered)
499            .expect("failed to overwrite snapshot");
500
501        let result = registry.register_table::<User>(&mut mm);
502        match result {
503            Err(MemoryError::NameCollision {
504                candidate,
505                existing,
506            }) => {
507                assert_eq!(candidate, "users");
508                assert_eq!(existing, "imposter");
509            }
510            other => panic!("expected NameCollision, got {other:?}"),
511        }
512    }
513
514    #[test]
515    fn test_should_not_register_same_table_twice() {
516        let mut mm = make_mm();
517        let mut registry = SchemaRegistry::default();
518
519        let first_page = registry
520            .register_table::<User>(&mut mm)
521            .expect("failed to register table first time");
522        let second_page = registry
523            .register_table::<User>(&mut mm)
524            .expect("failed to register table second time");
525
526        assert_eq!(first_page, second_page);
527        assert_eq!(registry.tables.len(), 1);
528    }
529
530    #[test]
531    fn test_should_init_index_ledger() {
532        let mut mm = make_mm();
533        let mut registry = SchemaRegistry::default();
534
535        let pages = registry
536            .register_table::<User>(&mut mm)
537            .expect("failed to register table");
538
539        // check that index ledger is initialized with the correct indexes
540        let mut index_ledger = IndexLedger::load(pages.index_registry_page, &mut mm)
541            .expect("failed to load index ledger");
542
543        // insert an index for id
544        index_ledger
545            .insert(
546                &["id"],
547                Int32::from(1i32),
548                RecordAddress { page: 1, offset: 0 },
549                &mut mm,
550            )
551            .expect("failed to insert index");
552        // search the index
553        let result = index_ledger
554            .search(&["id"], &Int32::from(1i32), &mut mm)
555            .expect("failed to search index")
556            .get(0)
557            .copied()
558            .expect("no index at 0");
559        assert_eq!(result, RecordAddress { page: 1, offset: 0 });
560    }
561
562    #[derive(Clone, CandidType)]
563    struct AnotherTable;
564
565    impl Encode for AnotherTable {
566        const SIZE: DataSize = DataSize::Dynamic;
567
568        const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
569
570        fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
571            std::borrow::Cow::Owned(vec![])
572        }
573
574        fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
575        where
576            Self: Sized,
577        {
578            Ok(AnotherTable)
579        }
580
581        fn size(&self) -> MSize {
582            0
583        }
584    }
585
586    #[derive(Clone, CandidType, Deserialize)]
587    struct AnotherTableRecord;
588
589    impl TableRecord for AnotherTableRecord {
590        type Schema = AnotherTable;
591
592        fn from_values(_values: TableColumns) -> Self {
593            AnotherTableRecord
594        }
595
596        fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
597            vec![]
598        }
599    }
600
601    #[derive(Clone, CandidType, Serialize)]
602    struct AnotherTableInsert;
603
604    impl InsertRecord for AnotherTableInsert {
605        type Record = AnotherTableRecord;
606        type Schema = AnotherTable;
607
608        fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
609            Ok(AnotherTableInsert)
610        }
611
612        fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
613            vec![]
614        }
615
616        fn into_record(self) -> Self::Schema {
617            AnotherTable
618        }
619    }
620
621    #[derive(Clone, CandidType, Serialize)]
622    struct AnotherTableUpdate;
623
624    impl UpdateRecord for AnotherTableUpdate {
625        type Record = AnotherTableRecord;
626        type Schema = AnotherTable;
627
628        fn from_values(
629            _values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
630            _where_clause: Option<wasm_dbms_api::prelude::Filter>,
631        ) -> Self {
632            AnotherTableUpdate
633        }
634
635        fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
636            vec![]
637        }
638
639        fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
640            None
641        }
642    }
643
644    impl TableSchema for AnotherTable {
645        type Record = AnotherTableRecord;
646        type Insert = AnotherTableInsert;
647        type Update = AnotherTableUpdate;
648        type ForeignFetcher = NoForeignFetcher;
649
650        fn table_name() -> &'static str {
651            "another_table"
652        }
653
654        fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
655            &[]
656        }
657
658        fn primary_key() -> &'static str {
659            ""
660        }
661
662        fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
663            &[]
664        }
665
666        fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
667            vec![]
668        }
669
670        fn sanitizer(
671            _column_name: &'static str,
672        ) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
673            None
674        }
675
676        fn validator(
677            _column_name: &'static str,
678        ) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
679            None
680        }
681    }
682
683    // -- User mock for tests --
684
685    #[derive(Clone, CandidType)]
686    struct User;
687
688    impl Encode for User {
689        const SIZE: DataSize = DataSize::Dynamic;
690        const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
691
692        fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
693            std::borrow::Cow::Owned(vec![])
694        }
695
696        fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
697        where
698            Self: Sized,
699        {
700            Ok(User)
701        }
702
703        fn size(&self) -> MSize {
704            0
705        }
706    }
707
708    #[derive(Clone, CandidType, Deserialize)]
709    struct UserRecord;
710
711    impl TableRecord for UserRecord {
712        type Schema = User;
713
714        fn from_values(_values: TableColumns) -> Self {
715            UserRecord
716        }
717
718        fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
719            vec![]
720        }
721    }
722
723    #[derive(Clone, CandidType, Serialize)]
724    struct UserInsert;
725
726    impl InsertRecord for UserInsert {
727        type Record = UserRecord;
728        type Schema = User;
729
730        fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
731            Ok(UserInsert)
732        }
733
734        fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
735            vec![]
736        }
737
738        fn into_record(self) -> Self::Schema {
739            User
740        }
741    }
742
743    #[derive(Clone, CandidType, Serialize)]
744    struct UserUpdate;
745
746    impl UpdateRecord for UserUpdate {
747        type Record = UserRecord;
748        type Schema = User;
749
750        fn from_values(
751            _values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
752            _where_clause: Option<wasm_dbms_api::prelude::Filter>,
753        ) -> Self {
754            UserUpdate
755        }
756
757        fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
758            vec![]
759        }
760
761        fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
762            None
763        }
764    }
765
766    impl TableSchema for User {
767        type Record = UserRecord;
768        type Insert = UserInsert;
769        type Update = UserUpdate;
770        type ForeignFetcher = NoForeignFetcher;
771
772        fn table_name() -> &'static str {
773            "users"
774        }
775
776        fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
777            &[]
778        }
779
780        fn primary_key() -> &'static str {
781            "id"
782        }
783
784        fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
785            &[IndexDef(&["id"])]
786        }
787
788        fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
789            vec![]
790        }
791
792        fn sanitizer(
793            _column_name: &'static str,
794        ) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
795            None
796        }
797
798        fn validator(
799            _column_name: &'static str,
800        ) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
801            None
802        }
803    }
804
805    #[test]
806    fn test_table_registry_page_returns_none_for_unregistered_table() {
807        let registry = SchemaRegistry::default();
808        assert!(registry.table_registry_page::<User>().is_none());
809    }
810
811    #[test]
812    fn test_empty_registry_encode_decode() {
813        let registry = SchemaRegistry::default();
814        let encoded = registry.encode();
815        let decoded = SchemaRegistry::decode(encoded).expect("failed to decode empty registry");
816        assert_eq!(registry, decoded);
817        assert_eq!(decoded.tables.len(), 0);
818    }
819
820    #[test]
821    fn test_load_fresh_memory_returns_empty_registry() {
822        let mut mm = make_mm();
823        let registry = SchemaRegistry::load(&mut mm).expect("failed to load from fresh memory");
824        assert_eq!(registry.tables.len(), 0);
825    }
826
827    #[test]
828    fn test_save_and_reload() {
829        let mut mm = make_mm();
830        let mut registry = SchemaRegistry::default();
831        registry
832            .register_table::<User>(&mut mm)
833            .expect("failed to register");
834        // Modify in-memory, then explicitly save
835        registry
836            .register_table::<AnotherTable>(&mut mm)
837            .expect("failed to register another");
838        registry.save(&mut mm).expect("failed to save");
839
840        let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload");
841        assert_eq!(reloaded.tables.len(), 2);
842        assert_eq!(registry, reloaded);
843    }
844
845    #[test]
846    fn test_schema_registry_size() {
847        let mut mm = make_mm();
848        let mut registry = SchemaRegistry::default();
849        // Empty size: 8 bytes for schema_hash + 8 bytes for table count.
850        assert_eq!(registry.size(), 16);
851        registry
852            .register_table::<User>(&mut mm)
853            .expect("failed to register");
854        // One entry without autoincrement:
855        // 16 + (8 + 4 + 4 + 4 + 4 + 1) = 41
856        // (1 byte for autoincrement flag, no page bytes since User has no autoincrement column)
857        assert_eq!(registry.size(), 41);
858    }
859
860    #[test]
861    fn test_should_allocate_autoincrement_page_when_column_has_autoincrement() {
862        let mut mm = make_mm();
863        let mut registry = SchemaRegistry::default();
864
865        let pages = registry
866            .register_table::<AutoincrementTable>(&mut mm)
867            .expect("failed to register autoincrement table");
868
869        assert!(
870            pages.autoincrement_registry_page.is_some(),
871            "autoincrement registry page should be allocated for tables with autoincrement columns"
872        );
873    }
874
875    #[test]
876    fn test_should_not_allocate_autoincrement_page_when_no_autoincrement_column() {
877        let mut mm = make_mm();
878        let mut registry = SchemaRegistry::default();
879
880        let pages = registry
881            .register_table::<User>(&mut mm)
882            .expect("failed to register user table");
883
884        assert!(
885            pages.autoincrement_registry_page.is_none(),
886            "autoincrement registry page should not be allocated for tables without autoincrement columns"
887        );
888    }
889
890    #[test]
891    fn test_schema_registry_size_with_autoincrement() {
892        let mut mm = make_mm();
893        let mut registry = SchemaRegistry::default();
894
895        registry
896            .register_table::<AutoincrementTable>(&mut mm)
897            .expect("failed to register");
898        // One entry with autoincrement:
899        // 16 + (8 + 4 + 4 + 4 + 4 + 1 + 4) = 45
900        // (1 byte for autoincrement flag + 4 bytes for the autoincrement page)
901        assert_eq!(registry.size(), 45);
902    }
903
904    #[test]
905    fn test_should_encode_and_decode_registry_with_autoincrement() {
906        let mut mm = make_mm();
907        let mut registry = SchemaRegistry::default();
908
909        registry
910            .register_table::<AutoincrementTable>(&mut mm)
911            .expect("failed to register");
912
913        let encoded = registry.encode();
914        let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
915        assert_eq!(registry, decoded);
916
917        let page = decoded
918            .table_registry_page::<AutoincrementTable>()
919            .expect("missing autoincrement table");
920        assert!(page.autoincrement_registry_page.is_some());
921    }
922
923    // -- AutoincrementTable mock for tests --
924
925    #[derive(Clone, CandidType)]
926    struct AutoincrementTable;
927
928    impl Encode for AutoincrementTable {
929        const SIZE: DataSize = DataSize::Dynamic;
930        const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
931
932        fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
933            std::borrow::Cow::Owned(vec![])
934        }
935
936        fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
937        where
938            Self: Sized,
939        {
940            Ok(AutoincrementTable)
941        }
942
943        fn size(&self) -> MSize {
944            0
945        }
946    }
947
948    #[derive(Clone, CandidType, Deserialize)]
949    struct AutoincrementTableRecord;
950
951    impl TableRecord for AutoincrementTableRecord {
952        type Schema = AutoincrementTable;
953
954        fn from_values(_values: TableColumns) -> Self {
955            AutoincrementTableRecord
956        }
957
958        fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
959            vec![]
960        }
961    }
962
963    #[derive(Clone, CandidType, Serialize)]
964    struct AutoincrementTableInsert;
965
966    impl InsertRecord for AutoincrementTableInsert {
967        type Record = AutoincrementTableRecord;
968        type Schema = AutoincrementTable;
969
970        fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
971            Ok(AutoincrementTableInsert)
972        }
973
974        fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
975            vec![]
976        }
977
978        fn into_record(self) -> Self::Schema {
979            AutoincrementTable
980        }
981    }
982
983    #[derive(Clone, CandidType, Serialize)]
984    struct AutoincrementTableUpdate;
985
986    impl UpdateRecord for AutoincrementTableUpdate {
987        type Record = AutoincrementTableRecord;
988        type Schema = AutoincrementTable;
989
990        fn from_values(
991            _values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
992            _where_clause: Option<wasm_dbms_api::prelude::Filter>,
993        ) -> Self {
994            AutoincrementTableUpdate
995        }
996
997        fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
998            vec![]
999        }
1000
1001        fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
1002            None
1003        }
1004    }
1005
1006    impl TableSchema for AutoincrementTable {
1007        type Record = AutoincrementTableRecord;
1008        type Insert = AutoincrementTableInsert;
1009        type Update = AutoincrementTableUpdate;
1010        type ForeignFetcher = NoForeignFetcher;
1011
1012        fn table_name() -> &'static str {
1013            "autoincrement_table"
1014        }
1015
1016        fn columns() -> &'static [ColumnDef] {
1017            use wasm_dbms_api::prelude::DataTypeKind;
1018
1019            &[ColumnDef {
1020                name: "id",
1021                data_type: DataTypeKind::Uint32,
1022                auto_increment: true,
1023                nullable: false,
1024                primary_key: true,
1025                unique: true,
1026                foreign_key: None,
1027                default: None,
1028                renamed_from: &[],
1029            }]
1030        }
1031
1032        fn primary_key() -> &'static str {
1033            "id"
1034        }
1035
1036        fn indexes() -> &'static [IndexDef] {
1037            &[IndexDef(&["id"])]
1038        }
1039
1040        fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
1041            vec![]
1042        }
1043
1044        fn sanitizer(
1045            _column_name: &'static str,
1046        ) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
1047            None
1048        }
1049
1050        fn validator(
1051            _column_name: &'static str,
1052        ) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
1053            None
1054        }
1055    }
1056
1057    // -- Migration-engine entry points -------------------------------------
1058
1059    use wasm_dbms_api::prelude::{ColumnSnapshot, DataTypeSnapshot, TableSchemaSnapshot};
1060
1061    fn dummy_snapshot(name: &str) -> TableSchemaSnapshot {
1062        TableSchemaSnapshot {
1063            version: TableSchemaSnapshot::latest_version(),
1064            name: name.to_string(),
1065            primary_key: "id".to_string(),
1066            alignment: 8,
1067            columns: vec![ColumnSnapshot {
1068                name: "id".to_string(),
1069                data_type: DataTypeSnapshot::Uint32,
1070                nullable: false,
1071                auto_increment: false,
1072                unique: true,
1073                primary_key: true,
1074                foreign_key: None,
1075                default: None,
1076            }],
1077            indexes: vec![],
1078        }
1079    }
1080
1081    #[test]
1082    fn test_table_registry_page_by_name_returns_pages_for_registered_table() {
1083        let mut mm = make_mm();
1084        let mut registry = SchemaRegistry::default();
1085        let pages = registry
1086            .register_table::<User>(&mut mm)
1087            .expect("failed to register user");
1088
1089        let by_name = registry
1090            .table_registry_page_by_name("users")
1091            .expect("missing pages by name");
1092        assert_eq!(by_name, pages);
1093    }
1094
1095    #[test]
1096    fn test_table_registry_page_by_name_returns_none_for_unknown_table() {
1097        let registry = SchemaRegistry::default();
1098        assert!(registry.table_registry_page_by_name("missing").is_none());
1099    }
1100
1101    #[test]
1102    fn test_stored_snapshots_returns_empty_for_unregistered_registry() {
1103        let mut mm = make_mm();
1104        let registry = SchemaRegistry::default();
1105        let snapshots = registry
1106            .stored_snapshots(&mut mm)
1107            .expect("failed to read snapshots");
1108        assert!(snapshots.is_empty());
1109    }
1110
1111    #[test]
1112    fn test_stored_snapshots_returns_one_entry_per_registered_table() {
1113        let mut mm = make_mm();
1114        let mut registry = SchemaRegistry::default();
1115        registry
1116            .register_table::<User>(&mut mm)
1117            .expect("failed to register user");
1118        registry
1119            .register_table::<AnotherTable>(&mut mm)
1120            .expect("failed to register another");
1121
1122        let snapshots = registry
1123            .stored_snapshots(&mut mm)
1124            .expect("failed to load snapshots");
1125        assert_eq!(snapshots.len(), 2);
1126        let names: Vec<&str> = snapshots.iter().map(|snap| snap.name.as_str()).collect();
1127        assert!(names.contains(&"users"));
1128        assert!(names.contains(&"another_table"));
1129    }
1130
1131    #[test]
1132    fn test_register_table_from_snapshot_allocates_pages_and_persists_snapshot() {
1133        let mut mm = make_mm();
1134        let mut registry = SchemaRegistry::default();
1135        let snapshot = dummy_snapshot("fresh");
1136
1137        let pages = registry
1138            .register_table_from_snapshot(&snapshot, &mut mm)
1139            .expect("failed to register from snapshot");
1140
1141        let loaded = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm).expect("load");
1142        assert_eq!(loaded.get(), &snapshot);
1143        assert!(registry.table_registry_page_by_name("fresh").is_some());
1144    }
1145
1146    #[test]
1147    fn test_register_table_from_snapshot_is_idempotent_for_same_name() {
1148        let mut mm = make_mm();
1149        let mut registry = SchemaRegistry::default();
1150        let snapshot = dummy_snapshot("fresh");
1151
1152        let first = registry
1153            .register_table_from_snapshot(&snapshot, &mut mm)
1154            .expect("first");
1155        let second = registry
1156            .register_table_from_snapshot(&snapshot, &mut mm)
1157            .expect("second");
1158        assert_eq!(first, second);
1159    }
1160
1161    #[test]
1162    fn test_register_table_from_snapshot_detects_name_collision() {
1163        let mut mm = make_mm();
1164        let mut registry = SchemaRegistry::default();
1165        let snapshot = dummy_snapshot("users");
1166
1167        let pages = registry
1168            .register_table_from_snapshot(&snapshot, &mut mm)
1169            .expect("first");
1170
1171        // Tamper persisted snapshot to simulate a colliding fingerprint with a
1172        // different name.
1173        let mut tampered = snapshot.clone();
1174        tampered.name = "imposter".to_string();
1175        mm.write_at(pages.schema_snapshot_page, 0, &tampered)
1176            .expect("overwrite");
1177
1178        let result = registry.register_table_from_snapshot(&snapshot, &mut mm);
1179        assert!(matches!(
1180            result,
1181            Err(MemoryError::NameCollision {
1182                ref candidate,
1183                ref existing,
1184            }) if candidate == "users" && existing == "imposter"
1185        ));
1186    }
1187
1188    #[test]
1189    fn test_unregister_table_removes_entry_and_returns_previous_pages() {
1190        let mut mm = make_mm();
1191        let mut registry = SchemaRegistry::default();
1192        let pages = registry
1193            .register_table::<User>(&mut mm)
1194            .expect("failed to register");
1195
1196        let removed = registry
1197            .unregister_table("users", &mut mm)
1198            .expect("unregister");
1199        assert_eq!(removed, Some(pages));
1200        assert!(registry.table_registry_page_by_name("users").is_none());
1201    }
1202
1203    #[test]
1204    fn test_unregister_table_returns_none_for_unknown_table() {
1205        let mut mm = make_mm();
1206        let mut registry = SchemaRegistry::default();
1207        let removed = registry
1208            .unregister_table("missing", &mut mm)
1209            .expect("unregister");
1210        assert!(removed.is_none());
1211    }
1212
1213    #[test]
1214    fn test_unregister_table_releases_pages_for_reuse() {
1215        // Registering a table allocates 4 metadata pages (no autoincrement on
1216        // `User`) plus an index B-tree root. Unregistering must hand each
1217        // back to the unclaimed-pages ledger so subsequent claims pop them
1218        // before bumping the high-water mark.
1219        let mut mm = make_mm();
1220        let mut registry = SchemaRegistry::default();
1221
1222        let pages = registry.register_table::<User>(&mut mm).expect("register");
1223
1224        let last_page_before_drop = mm.last_page().expect("at least one page");
1225
1226        registry
1227            .unregister_table("users", &mut mm)
1228            .expect("unregister")
1229            .expect("expected returned pages");
1230
1231        // Re-claim every previously-owned page; none of these should bump
1232        // the high-water mark.
1233        let mut reclaimed = Vec::new();
1234        for _ in 0..5 {
1235            let page = mm.claim_page().expect("claim");
1236            assert!(
1237                page <= last_page_before_drop,
1238                "expected reclaimed page <= {last_page_before_drop}, got {page}"
1239            );
1240            reclaimed.push(page);
1241        }
1242        // Pages handed out must include every TableRegistryPage page.
1243        assert!(reclaimed.contains(&pages.schema_snapshot_page));
1244        assert!(reclaimed.contains(&pages.pages_list_page));
1245        assert!(reclaimed.contains(&pages.free_segments_page));
1246        assert!(reclaimed.contains(&pages.index_registry_page));
1247    }
1248
1249    #[test]
1250    fn test_unregister_table_rejects_release_when_unclaimed_ledger_is_full() {
1251        let mut mm = make_mm();
1252        let mut registry = SchemaRegistry::default();
1253        let pages = registry.register_table::<User>(&mut mm).expect("register");
1254
1255        let mut ledger = UnclaimedPages::new();
1256        for page in 0..(UNCLAIMED_PAGES_CAPACITY - 1) {
1257            ledger.push(page).expect("fill ledger");
1258        }
1259        mm.write_at(mm.unclaimed_pages_page(), 0, &ledger)
1260            .expect("persist ledger");
1261
1262        let err = registry
1263            .unregister_table("users", &mut mm)
1264            .expect_err("drop must fail before partially releasing pages");
1265        assert!(matches!(err, MemoryError::UnclaimedPagesFull { .. }));
1266
1267        assert!(
1268            registry.table_registry_page_by_name("users").is_some(),
1269            "registry entry must remain when release is rejected"
1270        );
1271
1272        let snapshot = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm)
1273            .expect("load snapshot ledger")
1274            .get()
1275            .clone();
1276        assert_eq!(snapshot.name, "users");
1277    }
1278}