use std::collections::HashMap;
use wasm_dbms_api::prelude::{
DEFAULT_ALIGNMENT, DataSize, Encode, MSize, MemoryResult, Page, PageOffset, TableFingerprint,
TableSchema,
};
use crate::table_registry::{AutoincrementLedger, IndexLedger};
use crate::{MemoryAccess, MemoryManager, MemoryProvider};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TableRegistryPage {
pub pages_list_page: Page,
pub free_segments_page: Page,
pub index_registry_page: Page,
pub autoincrement_registry_page: Option<Page>,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SchemaRegistry {
tables: HashMap<TableFingerprint, TableRegistryPage>,
}
impl SchemaRegistry {
pub fn load(mm: &mut MemoryManager<impl MemoryProvider>) -> MemoryResult<Self> {
let page = mm.schema_page();
let registry: Self = mm.read_at(page, 0)?;
Ok(registry)
}
pub fn register_table<TS>(
&mut self,
mm: &mut MemoryManager<impl MemoryProvider>,
) -> MemoryResult<TableRegistryPage>
where
TS: TableSchema,
{
let fingerprint = TS::fingerprint();
if let Some(pages) = self.tables.get(&fingerprint) {
return Ok(*pages);
}
let pages_list_page = mm.allocate_page()?;
let free_segments_page = mm.allocate_page()?;
let index_registry_page = mm.allocate_page()?;
let has_autoincrement = TS::columns().iter().any(|col| col.auto_increment);
let autoincrement_registry_page = if has_autoincrement {
Some(mm.allocate_page()?)
} else {
None
};
let pages = TableRegistryPage {
pages_list_page,
free_segments_page,
index_registry_page,
autoincrement_registry_page,
};
self.tables.insert(fingerprint, pages);
let page = mm.schema_page();
mm.write_at(page, 0, self)?;
IndexLedger::init(pages.index_registry_page, TS::indexes(), mm)?;
if let Some(autoinc_page) = pages.autoincrement_registry_page {
AutoincrementLedger::init::<TS>(autoinc_page, mm)?;
}
Ok(pages)
}
pub fn save(&self, mm: &mut MemoryManager<impl MemoryProvider>) -> MemoryResult<()> {
let page = mm.schema_page();
mm.write_at(page, 0, self)
}
pub fn table_registry_page<TS>(&self) -> Option<TableRegistryPage>
where
TS: TableSchema,
{
self.tables.get(&TS::fingerprint()).copied()
}
}
impl Encode for SchemaRegistry {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
let mut buffer = Vec::with_capacity(self.size() as usize);
buffer.extend_from_slice(&(self.tables.len() as u64).to_le_bytes());
for (fingerprint, page) in &self.tables {
buffer.extend_from_slice(&fingerprint.to_le_bytes());
buffer.extend_from_slice(&page.pages_list_page.to_le_bytes());
buffer.extend_from_slice(&page.free_segments_page.to_le_bytes());
buffer.extend_from_slice(&page.index_registry_page.to_le_bytes());
if let Some(autoinc_page) = page.autoincrement_registry_page {
buffer.push(1); buffer.extend_from_slice(&autoinc_page.to_le_bytes());
} else {
buffer.push(0); }
}
std::borrow::Cow::Owned(buffer)
}
fn decode(data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
let mut offset = 0;
let len = u64::from_le_bytes(
data[offset..offset + 8]
.try_into()
.expect("failed to read length"),
) as usize;
offset += 8;
let mut tables = HashMap::with_capacity(len);
for _ in 0..len {
let fingerprint = u64::from_le_bytes(data[offset..offset + 8].try_into()?);
offset += 8;
let pages_list_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let deleted_records_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let index_registry_page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
let has_autoincrement = data[offset] == 1;
offset += 1;
let autoincrement_registry_page = if has_autoincrement {
let page = Page::from_le_bytes(data[offset..offset + 4].try_into()?);
offset += 4;
Some(page)
} else {
None
};
tables.insert(
fingerprint,
TableRegistryPage {
pages_list_page,
free_segments_page: deleted_records_page,
index_registry_page,
autoincrement_registry_page,
},
);
}
Ok(Self { tables })
}
fn size(&self) -> MSize {
let autoinc_pages = self
.tables
.values()
.filter(|page| page.autoincrement_registry_page.is_some())
.count() as MSize;
8 + (self.tables.len() as MSize * (4 * 3 + 8 + 1)) + (autoinc_pages * 4)
}
}
#[cfg(test)]
mod tests {
use candid::CandidType;
use serde::{Deserialize, Serialize};
use wasm_dbms_api::prelude::{
ColumnDef, DbmsResult, IndexDef, InsertRecord, Int32, NoForeignFetcher, TableColumns,
TableRecord, UpdateRecord,
};
use super::*;
use crate::{HeapMemoryProvider, RecordAddress};
fn make_mm() -> MemoryManager<HeapMemoryProvider> {
MemoryManager::init(HeapMemoryProvider::default())
}
#[test]
fn test_should_encode_and_decode_schema_registry() {
let mut mm = make_mm();
let mut registry =
SchemaRegistry::load(&mut mm).expect("failed to load init schema registry");
let registry_page = registry
.register_table::<User>(&mut mm)
.expect("failed to register table");
let fetched_page = registry
.table_registry_page::<User>()
.expect("failed to get table registry page");
assert_eq!(registry_page, fetched_page);
let encoded = registry.encode();
let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
assert_eq!(registry, decoded);
let another_registry_page = registry
.register_table::<AnotherTable>(&mut mm)
.expect("failed to register another table");
let another_fetched_page = registry
.table_registry_page::<AnotherTable>()
.expect("failed to get another table registry page");
assert_eq!(another_registry_page, another_fetched_page);
let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload schema registry");
assert_eq!(registry, reloaded);
assert_eq!(reloaded.tables.len(), 2);
assert_eq!(
reloaded
.table_registry_page::<User>()
.expect("failed to get first table registry page after reload"),
registry_page
);
assert_eq!(
reloaded
.table_registry_page::<AnotherTable>()
.expect("failed to get second table registry page after reload"),
another_registry_page
);
}
#[test]
fn test_should_not_register_same_table_twice() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let first_page = registry
.register_table::<User>(&mut mm)
.expect("failed to register table first time");
let second_page = registry
.register_table::<User>(&mut mm)
.expect("failed to register table second time");
assert_eq!(first_page, second_page);
assert_eq!(registry.tables.len(), 1);
}
#[test]
fn test_should_init_index_ledger() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register table");
let mut index_ledger = IndexLedger::load(pages.index_registry_page, &mut mm)
.expect("failed to load index ledger");
index_ledger
.insert(
&["id"],
Int32::from(1i32),
RecordAddress { page: 1, offset: 0 },
&mut mm,
)
.expect("failed to insert index");
let result = index_ledger
.search(&["id"], &Int32::from(1i32), &mut mm)
.expect("failed to search index")
.get(0)
.copied()
.expect("no index at 0");
assert_eq!(result, RecordAddress { page: 1, offset: 0 });
}
#[derive(Clone, CandidType)]
struct AnotherTable;
impl Encode for AnotherTable {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
std::borrow::Cow::Owned(vec![])
}
fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
Ok(AnotherTable)
}
fn size(&self) -> MSize {
0
}
}
#[derive(Clone, CandidType, Deserialize)]
struct AnotherTableRecord;
impl TableRecord for AnotherTableRecord {
type Schema = AnotherTable;
fn from_values(_values: TableColumns) -> Self {
AnotherTableRecord
}
fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
}
#[derive(Clone, CandidType, Serialize)]
struct AnotherTableInsert;
impl InsertRecord for AnotherTableInsert {
type Record = AnotherTableRecord;
type Schema = AnotherTable;
fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
Ok(AnotherTableInsert)
}
fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn into_record(self) -> Self::Schema {
AnotherTable
}
}
#[derive(Clone, CandidType, Serialize)]
struct AnotherTableUpdate;
impl UpdateRecord for AnotherTableUpdate {
type Record = AnotherTableRecord;
type Schema = AnotherTable;
fn from_values(
_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
_where_clause: Option<wasm_dbms_api::prelude::Filter>,
) -> Self {
AnotherTableUpdate
}
fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
None
}
}
impl TableSchema for AnotherTable {
type Record = AnotherTableRecord;
type Insert = AnotherTableInsert;
type Update = AnotherTableUpdate;
type ForeignFetcher = NoForeignFetcher;
fn table_name() -> &'static str {
"another_table"
}
fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
&[]
}
fn primary_key() -> &'static str {
""
}
fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
&[]
}
fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn sanitizer(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
None
}
fn validator(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
None
}
}
#[derive(Clone, CandidType)]
struct User;
impl Encode for User {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
std::borrow::Cow::Owned(vec![])
}
fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
Ok(User)
}
fn size(&self) -> MSize {
0
}
}
#[derive(Clone, CandidType, Deserialize)]
struct UserRecord;
impl TableRecord for UserRecord {
type Schema = User;
fn from_values(_values: TableColumns) -> Self {
UserRecord
}
fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
}
#[derive(Clone, CandidType, Serialize)]
struct UserInsert;
impl InsertRecord for UserInsert {
type Record = UserRecord;
type Schema = User;
fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
Ok(UserInsert)
}
fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn into_record(self) -> Self::Schema {
User
}
}
#[derive(Clone, CandidType, Serialize)]
struct UserUpdate;
impl UpdateRecord for UserUpdate {
type Record = UserRecord;
type Schema = User;
fn from_values(
_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
_where_clause: Option<wasm_dbms_api::prelude::Filter>,
) -> Self {
UserUpdate
}
fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
None
}
}
impl TableSchema for User {
type Record = UserRecord;
type Insert = UserInsert;
type Update = UserUpdate;
type ForeignFetcher = NoForeignFetcher;
fn table_name() -> &'static str {
"users"
}
fn columns() -> &'static [wasm_dbms_api::prelude::ColumnDef] {
&[]
}
fn primary_key() -> &'static str {
"id"
}
fn indexes() -> &'static [wasm_dbms_api::prelude::IndexDef] {
&[IndexDef(&["id"])]
}
fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn sanitizer(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
None
}
fn validator(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
None
}
}
#[test]
fn test_table_registry_page_returns_none_for_unregistered_table() {
let registry = SchemaRegistry::default();
assert!(registry.table_registry_page::<User>().is_none());
}
#[test]
fn test_empty_registry_encode_decode() {
let registry = SchemaRegistry::default();
let encoded = registry.encode();
let decoded = SchemaRegistry::decode(encoded).expect("failed to decode empty registry");
assert_eq!(registry, decoded);
assert_eq!(decoded.tables.len(), 0);
}
#[test]
fn test_load_fresh_memory_returns_empty_registry() {
let mut mm = make_mm();
let registry = SchemaRegistry::load(&mut mm).expect("failed to load from fresh memory");
assert_eq!(registry.tables.len(), 0);
}
#[test]
fn test_save_and_reload() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<User>(&mut mm)
.expect("failed to register");
registry
.register_table::<AnotherTable>(&mut mm)
.expect("failed to register another");
registry.save(&mut mm).expect("failed to save");
let reloaded = SchemaRegistry::load(&mut mm).expect("failed to reload");
assert_eq!(reloaded.tables.len(), 2);
assert_eq!(registry, reloaded);
}
#[test]
fn test_schema_registry_size() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
assert_eq!(registry.size(), 8);
registry
.register_table::<User>(&mut mm)
.expect("failed to register");
assert_eq!(registry.size(), 29);
}
#[test]
fn test_should_allocate_autoincrement_page_when_column_has_autoincrement() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<AutoincrementTable>(&mut mm)
.expect("failed to register autoincrement table");
assert!(
pages.autoincrement_registry_page.is_some(),
"autoincrement registry page should be allocated for tables with autoincrement columns"
);
}
#[test]
fn test_should_not_allocate_autoincrement_page_when_no_autoincrement_column() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
let pages = registry
.register_table::<User>(&mut mm)
.expect("failed to register user table");
assert!(
pages.autoincrement_registry_page.is_none(),
"autoincrement registry page should not be allocated for tables without autoincrement columns"
);
}
#[test]
fn test_schema_registry_size_with_autoincrement() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<AutoincrementTable>(&mut mm)
.expect("failed to register");
assert_eq!(registry.size(), 33);
}
#[test]
fn test_should_encode_and_decode_registry_with_autoincrement() {
let mut mm = make_mm();
let mut registry = SchemaRegistry::default();
registry
.register_table::<AutoincrementTable>(&mut mm)
.expect("failed to register");
let encoded = registry.encode();
let decoded = SchemaRegistry::decode(encoded).expect("failed to decode");
assert_eq!(registry, decoded);
let page = decoded
.table_registry_page::<AutoincrementTable>()
.expect("missing autoincrement table");
assert!(page.autoincrement_registry_page.is_some());
}
#[derive(Clone, CandidType)]
struct AutoincrementTable;
impl Encode for AutoincrementTable {
const SIZE: DataSize = DataSize::Dynamic;
const ALIGNMENT: PageOffset = DEFAULT_ALIGNMENT;
fn encode(&'_ self) -> std::borrow::Cow<'_, [u8]> {
std::borrow::Cow::Owned(vec![])
}
fn decode(_data: std::borrow::Cow<[u8]>) -> MemoryResult<Self>
where
Self: Sized,
{
Ok(AutoincrementTable)
}
fn size(&self) -> MSize {
0
}
}
#[derive(Clone, CandidType, Deserialize)]
struct AutoincrementTableRecord;
impl TableRecord for AutoincrementTableRecord {
type Schema = AutoincrementTable;
fn from_values(_values: TableColumns) -> Self {
AutoincrementTableRecord
}
fn to_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
}
#[derive(Clone, CandidType, Serialize)]
struct AutoincrementTableInsert;
impl InsertRecord for AutoincrementTableInsert {
type Record = AutoincrementTableRecord;
type Schema = AutoincrementTable;
fn from_values(_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)]) -> DbmsResult<Self> {
Ok(AutoincrementTableInsert)
}
fn into_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn into_record(self) -> Self::Schema {
AutoincrementTable
}
}
#[derive(Clone, CandidType, Serialize)]
struct AutoincrementTableUpdate;
impl UpdateRecord for AutoincrementTableUpdate {
type Record = AutoincrementTableRecord;
type Schema = AutoincrementTable;
fn from_values(
_values: &[(ColumnDef, wasm_dbms_api::prelude::Value)],
_where_clause: Option<wasm_dbms_api::prelude::Filter>,
) -> Self {
AutoincrementTableUpdate
}
fn update_values(&self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn where_clause(&self) -> Option<wasm_dbms_api::prelude::Filter> {
None
}
}
impl TableSchema for AutoincrementTable {
type Record = AutoincrementTableRecord;
type Insert = AutoincrementTableInsert;
type Update = AutoincrementTableUpdate;
type ForeignFetcher = NoForeignFetcher;
fn table_name() -> &'static str {
"autoincrement_table"
}
fn columns() -> &'static [ColumnDef] {
use wasm_dbms_api::prelude::DataTypeKind;
&[ColumnDef {
name: "id",
data_type: DataTypeKind::Uint32,
auto_increment: true,
nullable: false,
primary_key: true,
unique: true,
foreign_key: None,
}]
}
fn primary_key() -> &'static str {
"id"
}
fn indexes() -> &'static [IndexDef] {
&[IndexDef(&["id"])]
}
fn to_values(self) -> Vec<(ColumnDef, wasm_dbms_api::prelude::Value)> {
vec![]
}
fn sanitizer(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Sanitize>> {
None
}
fn validator(
_column_name: &'static str,
) -> Option<Box<dyn wasm_dbms_api::prelude::Validate>> {
None
}
}
}