use std::collections::HashMap;
use wasm_dbms_api::memory::MemoryError;
use wasm_dbms_api::prelude::{
DEFAULT_ALIGNMENT, DataSize, Encode, MSize, MemoryResult, Page, PageOffset, TableFingerprint,
TableSchema, TableSchemaSnapshot, fingerprint_for_name,
};
use xxhash_rust::xxh3::Xxh3;
use crate::memory_manager::{SCHEMA_PAGE, UNCLAIMED_PAGES_PAGE};
use crate::table_registry::{AutoincrementLedger, IndexLedger, SchemaSnapshotLedger};
use crate::{MemoryAccess, TableRegistry, UnclaimedPages};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TableRegistryPage {
pub schema_snapshot_page: Page,
pub pages_list_page: Page,
pub free_segments_page: Page,
pub index_registry_page: Page,
pub autoincrement_registry_page: Option<Page>,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SchemaRegistry {
schema_hash: u64,
tables: HashMap<TableFingerprint, TableRegistryPage>,
}
impl SchemaRegistry {
pub fn load(mm: &mut impl MemoryAccess) -> MemoryResult<Self> {
let registry: Self = mm.read_at(SCHEMA_PAGE, 0)?;
Ok(registry)
}
pub const fn schema_hash(&self) -> u64 {
self.schema_hash
}
pub fn register_table<TS>(
&mut self,
mm: &mut impl MemoryAccess,
) -> MemoryResult<TableRegistryPage>
where
TS: TableSchema,
{
let fingerprint = TS::fingerprint();
let candidate_name = TS::table_name();
if let Some(pages) = self.tables.get(&fingerprint).copied() {
let existing = SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)?;
if existing.get().name != candidate_name {
return Err(MemoryError::NameCollision {
candidate: candidate_name.to_string(),
existing: existing.get().name.clone(),
});
}
return Ok(pages);
}
let schema_snapshot_page = mm.claim_page()?;
let pages_list_page = mm.claim_page()?;
let free_segments_page = mm.claim_page()?;
let index_registry_page = mm.claim_page()?;
let has_autoincrement = TS::columns().iter().any(|col| col.auto_increment);
let autoincrement_registry_page = if has_autoincrement {
Some(mm.claim_page()?)
} else {
None
};
let pages = TableRegistryPage {
schema_snapshot_page,
pages_list_page,
free_segments_page,
index_registry_page,
autoincrement_registry_page,
};
self.tables.insert(fingerprint, pages);
SchemaSnapshotLedger::init::<TS>(pages.schema_snapshot_page, mm)?;
IndexLedger::init(pages.index_registry_page, TS::indexes(), mm)?;
if let Some(autoinc_page) = pages.autoincrement_registry_page {
AutoincrementLedger::init::<TS>(autoinc_page, mm)?;
}
self.refresh_schema_hash(mm)?;
self.save(mm)?;
Ok(pages)
}
pub fn save(&self, mm: &mut impl MemoryAccess) -> MemoryResult<()> {
mm.write_at(SCHEMA_PAGE, 0, self)
}
pub fn table_registry_page<TS>(&self) -> Option<TableRegistryPage>
where
TS: TableSchema,
{
self.tables.get(&TS::fingerprint()).copied()
}
pub fn table_registry_page_by_name(&self, name: &str) -> Option<TableRegistryPage> {
self.tables.get(&fingerprint_for_name(name)).copied()
}
pub fn register_table_from_snapshot(
&mut self,
snapshot: &TableSchemaSnapshot,
mm: &mut impl MemoryAccess,
) -> MemoryResult<TableRegistryPage> {
let fingerprint = fingerprint_for_name(&snapshot.name);
let candidate_name = snapshot.name.as_str();
if let Some(pages) = self.tables.get(&fingerprint).copied() {
let existing = SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)?;
if existing.get().name != candidate_name {
return Err(MemoryError::NameCollision {
candidate: candidate_name.to_string(),
existing: existing.get().name.clone(),
});
}
return Ok(pages);
}
let schema_snapshot_page = mm.claim_page()?;
let pages_list_page = mm.claim_page()?;
let free_segments_page = mm.claim_page()?;
let index_registry_page = mm.claim_page()?;
let has_autoincrement = snapshot.columns.iter().any(|col| col.auto_increment);
let autoincrement_registry_page = if has_autoincrement {
Some(mm.claim_page()?)
} else {
None
};
let pages = TableRegistryPage {
schema_snapshot_page,
pages_list_page,
free_segments_page,
index_registry_page,
autoincrement_registry_page,
};
self.tables.insert(fingerprint, pages);
mm.write_at(pages.schema_snapshot_page, 0, snapshot)?;
IndexLedger::init_from_keys(
pages.index_registry_page,
snapshot.indexes.iter().map(|idx| idx.columns.clone()),
mm,
)?;
self.refresh_schema_hash(mm)?;
self.save(mm)?;
Ok(pages)
}
pub fn unregister_table(
&mut self,
name: &str,
mm: &mut impl MemoryAccess,
) -> MemoryResult<Option<TableRegistryPage>> {
let fingerprint = fingerprint_for_name(name);
let pages = self.tables.get(&fingerprint).copied();
if let Some(pages) = pages {
let registry = TableRegistry::load(pages, mm)?;
let pages_to_release = registry.releasable_pages_count(pages, mm)?;
let ledger: UnclaimedPages = mm.read_at(UNCLAIMED_PAGES_PAGE, 0)?;
if ledger.remaining_capacity() < pages_to_release as u32 {
return Err(MemoryError::UnclaimedPagesFull {
capacity: crate::UNCLAIMED_PAGES_CAPACITY,
});
}
let removed = self.tables.remove(&fingerprint);
debug_assert_eq!(removed, Some(pages));
registry.release_pages(pages, mm)?;
self.refresh_schema_hash(mm)?;
self.save(mm)?;
return Ok(removed);
}
Ok(None)
}
pub fn stored_snapshots(
&self,
mm: &mut impl MemoryAccess,
) -> MemoryResult<Vec<TableSchemaSnapshot>> {
self.tables
.values()
.map(|pages| {
SchemaSnapshotLedger::load(pages.schema_snapshot_page, mm)
.map(|ledger| ledger.get().clone())
})
.collect()
}
pub fn refresh_schema_hash(&mut self, mm: &mut impl MemoryAccess) -> MemoryResult<()> {
self.schema_hash = compute_hash(self.stored_snapshots(mm)?);
Ok(())
}
}
fn compute_hash(mut snapshots: Vec<TableSchemaSnapshot>) -> u64 {
snapshots.sort_by(|a, b| a.name.cmp(&b.name));
let mut hasher = Xxh3::new();
hasher.update(&[TableSchemaSnapshot::latest_version()]);
for snapshot in &snapshots {
let bytes = snapshot.encode();
hasher.update(&(bytes.len() as u64).to_le_bytes());
hasher.update(&bytes);
}
hasher.digest()
}
impl Encode for SchemaRegistry {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
let mut buffer = Vec::with_capacity(self.size() as usize);
buffer.extend_from_slice(&self.schema_hash.to_le_bytes());
buffer.extend_from_slice(&(self.tables.len() as u64).to_le_bytes());
for (fingerprint, page) in &self.tables {
buffer.extend_from_slice(&fingerprint.to_le_bytes());
buffer.extend_from_slice(&page.schema_snapshot_page.to_le_bytes());
buffer.extend_from_slice(&page.pages_list_page.to_le_bytes());
buffer.extend_from_slice(&page.free_segments_page.to_le_bytes());
buffer.extend_from_slice(&page.index_registry_page.to_le_bytes());
if let Some(autoinc_page) = page.autoincrement_registry_page {
buffer.push(1); buffer.extend_from_slice(&autoinc_page.to_le_bytes());
} else {
buffer.push(0); }
}
std::borrow::Cow::Owned(buffer)
}
fn decode(data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
let mut offset = 0;
let schema_hash = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
offset += 8;
let len = u64::from_le_bytes(
data[offset..offset + 8]
.try_into()
.expect("failed to read length"),
) as usize;
offset += 8;
let mut tables = HashMap::with_capacity(len);
for _ in 0..len {
let fingerprint = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
offset += 8;
let schema_snapshot_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let pages_list_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let free_segments_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let index_registry_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let has_autoincrement = data[offset] == 1;
offset += 1;
let autoincrement_registry_page = if has_autoincrement {
let page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
Some(page)
} else {
None
};
tables.insert(
fingerprint,
TableRegistryPage {
schema_snapshot_page,
pages_list_page,
free_segments_page,
index_registry_page,
autoincrement_registry_page,
},
);
}
Ok(Self {
schema_hash,
tables,
})
}
fn size(&self) -> MSize {
let autoinc_pages = self
.tables
.values()
.filter(|page| page.autoincrement_registry_page.is_some())
.count() as MSize;
16 + (self.tables.len() as MSize * (4 * 4 + 8 + 1)) + (autoinc_pages * 4)
}
}
#[cfg(test)]
mod tests {
use candid::CandidType;
use serde::{Deserialize, Serialize};
use wasm_dbms_api::prelude::{
ColumnDef, DbmsResult, IndexDef, InsertRecord, Int32, NoForeignFetcher, TableColumns,
TableRecord, UpdateRecord,
};
use super::*;
use crate::{
HeapMemoryProvider, MemoryAccess, MemoryManager, RecordAddress, UNCLAIMED_PAGES_CAPACITY,
UnclaimedPages,
};
fn make_mm() -> MemoryManager<HeapMemoryProvider> {
MemoryManager::init(HeapMemoryProvider::default())
}
#[test]
fn test_should_encode_and_decode_schema_registry() {
let mut mm = make_mm();
let mut registry =
SchemaRegistry::load(&mut mm).expect("failed to load init schema registry");
let registry_page = registry
.register_table::<User>(&mut mm)
.expect("failed to register table");
let fetched_page = registry
.table_registry_page::<User>()
.expect("failed to get table registry page");
assert_eq!(registry_page, fetched_page);
let encoded = registry.encode();
let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
assert_eq!(registry, decoded);
let another_registry_page = registry
.register_table::<AnotherTable>(&mut mm)
.expect("failed to register another table");
let another_fetched_page = registry
.table_registry_page::<AnotherTable>()
.expect("failed to get another table registry page");
assert_eq!(another_registry_page, another_fetched_page);
let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload schema registry");
assert_eq!(registry, reloaded);
assert_eq!(reloaded.tables.len(), 2);
assert_eq!(
reloaded
.table_registry_page::<User>()
.expect("failed to get first table registry page after reload"),
registry_page
);
assert_eq!(
reloaded
.table_registry_page::<AnotherTable>()
.expect("failed to get second table registry page after reload"),
another_registry_page
);
}
#[test]
fn test_register_table_writes_snapshot_to_ledger() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register table");
let ledger = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm)
.expect("failed to load snapshot ledger after register_table");
assert_eq!(ledger.get(), &User::schema_snapshot());
assert_eq!(ledger.get().name, "users");
}
#[test]
fn test_register_table_returns_name_collision_when_hash_slot_belongs_to_another_name() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register user");
let mut tampered = User::schema_snapshot();
tampered.name = "imposter".to_string();
mm.write_at(pages.schema_snapshot_page, 0, &tampered)
.expect("failed to overwrite snapshot");
let result = registry.register_table::<User>(&mut mm);
match result {
Err(MemoryError::NameCollision {
candidate,
existing,
}) => {
assert_eq!(candidate, "users");
assert_eq!(existing, "imposter");
}
other => panic!("expected NameCollision, got {other:?}"),
}
}
#[test]
fn test_should_not_register_same_table_twice() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let first_page = registry
.register_table::<User>(&mut mm)
.expect("failed to register table first time");
let second_page = registry
.register_table::<User>(&mut mm)
.expect("failed to register table second time");
assert_eq!(first_page, second_page);
assert_eq!(registry.tables.len(), 1);
}
#[test]
fn test_should_init_index_ledger() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register table");
let mut index_ledger = IndexLedger::load(pages.index_registry_page, &mut mm)
.expect("failed to load index ledger");
index_ledger
.insert(
&["id"],
Int32::from(1i32),
RecordAddress { page: 1, offset: 0 },
&mut mm,
)
.expect("failed to insert index");
let result = index_ledger
.search(&["id"], &Int32::from(1i32), &mut mm)
.expect("failed to search index")
.get(0)
.copied()
.expect("no index at 0");
assert_eq!(result, RecordAddress { page: 1, offset: 0 });
}
#[derive(Clone, CandidType)]
struct AnotherTable;
impl Encode for AnotherTable {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
std::borrow::Cow::Owned(vec![])
}
fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
Ok(AnotherTable)
}
fn size(&self) -> MSize {
0
}
}
#[derive(Clone, CandidType, Deserialize)]
struct AnotherTableRecord;
impl TableRecord for AnotherTableRecord {
type Schema = AnotherTable;
fn from_values(_values: TableColumns) -> Self {
AnotherTableRecord
}
fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
}
#[derive(Clone, CandidType, Serialize)]
struct AnotherTableInsert;
impl InsertRecord for AnotherTableInsert {
type Record = AnotherTableRecord;
type Schema = AnotherTable;
fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
Ok(AnotherTableInsert)
}
fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn into_record(self) -> Self::Schema {
AnotherTable
}
}
#[derive(Clone, CandidType, Serialize)]
struct AnotherTableUpdate;
impl UpdateRecord for AnotherTableUpdate {
type Record = AnotherTableRecord;
type Schema = AnotherTable;
fn from_values(
_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
_where_clause: Option<wasm_dbms_api::prelude::Filter>,
) -> Self {
AnotherTableUpdate
}
fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
None
}
}
impl TableSchema for AnotherTable {
type Record = AnotherTableRecord;
type Insert = AnotherTableInsert;
type Update = AnotherTableUpdate;
type ForeignFetcher = NoForeignFetcher;
fn table_name() -> &'static str {
"another_table"
}
fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
&[]
}
fn primary_key() -> &'static str {
""
}
fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
&[]
}
fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn sanitizer(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
None
}
fn validator(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
None
}
}
#[derive(Clone, CandidType)]
struct User;
impl Encode for User {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
std::borrow::Cow::Owned(vec![])
}
fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
Ok(User)
}
fn size(&self) -> MSize {
0
}
}
#[derive(Clone, CandidType, Deserialize)]
struct UserRecord;
impl TableRecord for UserRecord {
type Schema = User;
fn from_values(_values: TableColumns) -> Self {
UserRecord
}
fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
}
#[derive(Clone, CandidType, Serialize)]
struct UserInsert;
impl InsertRecord for UserInsert {
type Record = UserRecord;
type Schema = User;
fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
Ok(UserInsert)
}
fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn into_record(self) -> Self::Schema {
User
}
}
#[derive(Clone, CandidType, Serialize)]
struct UserUpdate;
impl UpdateRecord for UserUpdate {
type Record = UserRecord;
type Schema = User;
fn from_values(
_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
_where_clause: Option<wasm_dbms_api::prelude::Filter>,
) -> Self {
UserUpdate
}
fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
None
}
}
impl TableSchema for User {
type Record = UserRecord;
type Insert = UserInsert;
type Update = UserUpdate;
type ForeignFetcher = NoForeignFetcher;
fn table_name() -> &'static str {
"users"
}
fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
&[]
}
fn primary_key() -> &'static str {
"id"
}
fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
&[IndexDef(&["id"])]
}
fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn sanitizer(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
None
}
fn validator(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
None
}
}
#[test]
fn test_table_registry_page_returns_none_for_unregistered_table() {
let registry = SchemaRegistry::default();
assert!(registry.table_registry_page::<User>().is_none());
}
#[test]
fn test_empty_registry_encode_decode() {
let registry = SchemaRegistry::default();
let encoded = registry.encode();
let decoded = SchemaRegistry::decode(encoded).expect("failed to decode empty registry");
assert_eq!(registry, decoded);
assert_eq!(decoded.tables.len(), 0);
}
#[test]
fn test_load_fresh_memory_returns_empty_registry() {
let mut mm = make_mm();
let registry = SchemaRegistry::load(&mut mm).expect("failed to load from fresh memory");
assert_eq!(registry.tables.len(), 0);
}
#[test]
fn test_save_and_reload() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<User>(&mut mm)
.expect("failed to register");
registry
.register_table::<AnotherTable>(&mut mm)
.expect("failed to register another");
registry.save(&mut mm).expect("failed to save");
let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload");
assert_eq!(reloaded.tables.len(), 2);
assert_eq!(registry, reloaded);
}
#[test]
fn test_schema_registry_size() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
assert_eq!(registry.size(), 16);
registry
.register_table::<User>(&mut mm)
.expect("failed to register");
assert_eq!(registry.size(), 41);
}
#[test]
fn test_should_allocate_autoincrement_page_when_column_has_autoincrement() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<AutoincrementTable>(&mut mm)
.expect("failed to register autoincrement table");
assert!(
pages.autoincrement_registry_page.is_some(),
"autoincrement registry page should be allocated for tables with autoincrement columns"
);
}
#[test]
fn test_should_not_allocate_autoincrement_page_when_no_autoincrement_column() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register user table");
assert!(
pages.autoincrement_registry_page.is_none(),
"autoincrement registry page should not be allocated for tables without autoincrement columns"
);
}
#[test]
fn test_schema_registry_size_with_autoincrement() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<AutoincrementTable>(&mut mm)
.expect("failed to register");
assert_eq!(registry.size(), 45);
}
#[test]
fn test_should_encode_and_decode_registry_with_autoincrement() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<AutoincrementTable>(&mut mm)
.expect("failed to register");
let encoded = registry.encode();
let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
assert_eq!(registry, decoded);
let page = decoded
.table_registry_page::<AutoincrementTable>()
.expect("missing autoincrement table");
assert!(page.autoincrement_registry_page.is_some());
}
#[derive(Clone, CandidType)]
struct AutoincrementTable;
impl Encode for AutoincrementTable {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
std::borrow::Cow::Owned(vec![])
}
fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
Ok(AutoincrementTable)
}
fn size(&self) -> MSize {
0
}
}
#[derive(Clone, CandidType, Deserialize)]
struct AutoincrementTableRecord;
impl TableRecord for AutoincrementTableRecord {
type Schema = AutoincrementTable;
fn from_values(_values: TableColumns) -> Self {
AutoincrementTableRecord
}
fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
}
#[derive(Clone, CandidType, Serialize)]
struct AutoincrementTableInsert;
impl InsertRecord for AutoincrementTableInsert {
type Record = AutoincrementTableRecord;
type Schema = AutoincrementTable;
fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
Ok(AutoincrementTableInsert)
}
fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn into_record(self) -> Self::Schema {
AutoincrementTable
}
}
#[derive(Clone, CandidType, Serialize)]
struct AutoincrementTableUpdate;
impl UpdateRecord for AutoincrementTableUpdate {
type Record = AutoincrementTableRecord;
type Schema = AutoincrementTable;
fn from_values(
_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
_where_clause: Option<wasm_dbms_api::prelude::Filter>,
) -> Self {
AutoincrementTableUpdate
}
fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
None
}
}
impl TableSchema for AutoincrementTable {
type Record = AutoincrementTableRecord;
type Insert = AutoincrementTableInsert;
type Update = AutoincrementTableUpdate;
type ForeignFetcher = NoForeignFetcher;
fn table_name() -> &'static str {
"autoincrement_table"
}
fn columns() -> &'static [ColumnDef] {
use wasm_dbms_api::prelude::DataTypeKind;
&[ColumnDef {
name: "id",
data_type: DataTypeKind::Uint32,
auto_increment: true,
nullable: false,
primary_key: true,
unique: true,
foreign_key: None,
default: None,
renamed_from: &[],
}]
}
fn primary_key() -> &'static str {
"id"
}
fn indexes() -> &'static [IndexDef] {
&[IndexDef(&["id"])]
}
fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn sanitizer(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
None
}
fn validator(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
None
}
}
use wasm_dbms_api::prelude::{ColumnSnapshot, DataTypeSnapshot, TableSchemaSnapshot};
fn dummy_snapshot(name: &str) -> TableSchemaSnapshot {
TableSchemaSnapshot {
version: TableSchemaSnapshot::latest_version(),
name: name.to_string(),
primary_key: "id".to_string(),
alignment: 8,
columns: vec![ColumnSnapshot {
name: "id".to_string(),
data_type: DataTypeSnapshot::Uint32,
nullable: false,
auto_increment: false,
unique: true,
primary_key: true,
foreign_key: None,
default: None,
}],
indexes: vec![],
}
}
#[test]
fn test_table_registry_page_by_name_returns_pages_for_registered_table() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register user");
let by_name = registry
.table_registry_page_by_name("users")
.expect("missing pages by name");
assert_eq!(by_name, pages);
}
#[test]
fn test_table_registry_page_by_name_returns_none_for_unknown_table() {
let registry = SchemaRegistry::default();
assert!(registry.table_registry_page_by_name("missing").is_none());
}
#[test]
fn test_stored_snapshots_returns_empty_for_unregistered_registry() {
let mut mm = make_mm();
let registry = SchemaRegistry::default();
let snapshots = registry
.stored_snapshots(&mut mm)
.expect("failed to read snapshots");
assert!(snapshots.is_empty());
}
#[test]
fn test_stored_snapshots_returns_one_entry_per_registered_table() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<User>(&mut mm)
.expect("failed to register user");
registry
.register_table::<AnotherTable>(&mut mm)
.expect("failed to register another");
let snapshots = registry
.stored_snapshots(&mut mm)
.expect("failed to load snapshots");
assert_eq!(snapshots.len(), 2);
let names: Vec<&str> = snapshots.iter().map(|snap| snap.name.as_str()).collect();
assert!(names.contains(&"users"));
assert!(names.contains(&"another_table"));
}
#[test]
fn test_register_table_from_snapshot_allocates_pages_and_persists_snapshot() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let snapshot = dummy_snapshot("fresh");
let pages = registry
.register_table_from_snapshot(&snapshot, &mut mm)
.expect("failed to register from snapshot");
let loaded = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm).expect("load");
assert_eq!(loaded.get(), &snapshot);
assert!(registry.table_registry_page_by_name("fresh").is_some());
}
#[test]
fn test_register_table_from_snapshot_is_idempotent_for_same_name() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let snapshot = dummy_snapshot("fresh");
let first = registry
.register_table_from_snapshot(&snapshot, &mut mm)
.expect("first");
let second = registry
.register_table_from_snapshot(&snapshot, &mut mm)
.expect("second");
assert_eq!(first, second);
}
#[test]
fn test_register_table_from_snapshot_detects_name_collision() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let snapshot = dummy_snapshot("users");
let pages = registry
.register_table_from_snapshot(&snapshot, &mut mm)
.expect("first");
let mut tampered = snapshot.clone();
tampered.name = "imposter".to_string();
mm.write_at(pages.schema_snapshot_page, 0, &tampered)
.expect("overwrite");
let result = registry.register_table_from_snapshot(&snapshot, &mut mm);
assert!(matches!(
result,
Err(MemoryError::NameCollision {
ref candidate,
ref existing,
}) if candidate == "users" && existing == "imposter"
));
}
#[test]
fn test_unregister_table_removes_entry_and_returns_previous_pages() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register");
let removed = registry
.unregister_table("users", &mut mm)
.expect("unregister");
assert_eq!(removed, Some(pages));
assert!(registry.table_registry_page_by_name("users").is_none());
}
#[test]
fn test_unregister_table_returns_none_for_unknown_table() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let removed = registry
.unregister_table("missing", &mut mm)
.expect("unregister");
assert!(removed.is_none());
}
#[test]
fn test_unregister_table_releases_pages_for_reuse() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry.register_table::<User>(&mut mm).expect("register");
let last_page_before_drop = mm.last_page().expect("at least one page");
registry
.unregister_table("users", &mut mm)
.expect("unregister")
.expect("expected returned pages");
let mut reclaimed = Vec::new();
for _ in 0..5 {
let page = mm.claim_page().expect("claim");
assert!(
page <= last_page_before_drop,
"expected reclaimed page <= {last_page_before_drop}, got {page}"
);
reclaimed.push(page);
}
assert!(reclaimed.contains(&pages.schema_snapshot_page));
assert!(reclaimed.contains(&pages.pages_list_page));
assert!(reclaimed.contains(&pages.free_segments_page));
assert!(reclaimed.contains(&pages.index_registry_page));
}
#[test]
fn test_unregister_table_rejects_release_when_unclaimed_ledger_is_full() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry.register_table::<User>(&mut mm).expect("register");
let mut ledger = UnclaimedPages::new();
for page in 0..(UNCLAIMED_PAGES_CAPACITY - 1) {
ledger.push(page).expect("fill ledger");
}
mm.write_at(mm.unclaimed_pages_page(), 0, &ledger)
.expect("persist ledger");
let err = registry
.unregister_table("users", &mut mm)
.expect_err("drop must fail before partially releasing pages");
assert!(matches!(err, MemoryError::UnclaimedPagesFull { .. }));
assert!(
registry.table_registry_page_by_name("users").is_some(),
"registry entry must remain when release is rejected"
);
let snapshot = SchemaSnapshotLedger::load(pages.schema_snapshot_page, &mut mm)
.expect("load snapshot ledger")
.get()
.clone();
assert_eq!(snapshot.name, "users");
}
}