use crate::{
db::{
Db, EntityRuntimeHooks,
schema::{
AcceptedSchemaSnapshot, SchemaStore, SchemaTransitionDecision,
compiled_schema_proposal_for_model, decide_schema_transition,
},
},
error::InternalError,
model::entity::EntityModel,
traits::CanisterKind,
types::EntityTag,
};
pub(in crate::db) fn reconcile_runtime_schemas<C: CanisterKind>(
db: &Db<C>,
entity_runtime_hooks: &[EntityRuntimeHooks<C>],
) -> Result<(), InternalError> {
for hooks in entity_runtime_hooks {
reconcile_runtime_schema(db, hooks)?;
}
Ok(())
}
fn reconcile_runtime_schema<C: CanisterKind>(
db: &Db<C>,
hooks: &EntityRuntimeHooks<C>,
) -> Result<(), InternalError> {
let store = db.store_handle(hooks.store_path)?;
store.with_schema_mut(|schema_store| {
ensure_initial_schema_snapshot(
schema_store,
hooks.entity_tag,
hooks.entity_path,
hooks.model,
)
.map(|_| ())
})
}
pub(in crate::db) fn ensure_initial_schema_snapshot(
schema_store: &mut SchemaStore,
entity_tag: EntityTag,
entity_path: &str,
model: &EntityModel,
) -> Result<AcceptedSchemaSnapshot, InternalError> {
let proposal = compiled_schema_proposal_for_model(model);
let expected = proposal.initial_persisted_schema_snapshot();
if let Some(actual) = schema_store.latest_persisted_snapshot(entity_tag)? {
validate_existing_schema_snapshot(entity_path, &actual, &expected)?;
return Ok(AcceptedSchemaSnapshot::new(actual));
}
schema_store.insert_persisted_snapshot(entity_tag, &expected)?;
Ok(AcceptedSchemaSnapshot::new(expected))
}
fn validate_existing_schema_snapshot(
entity_path: &str,
actual: &crate::db::schema::PersistedSchemaSnapshot,
expected: &crate::db::schema::PersistedSchemaSnapshot,
) -> Result<(), InternalError> {
match decide_schema_transition(actual, expected) {
SchemaTransitionDecision::ExactMatch => Ok(()),
SchemaTransitionDecision::Rejected(rejection) => {
Err(InternalError::store_unsupported(format!(
"schema evolution is not yet supported for entity '{entity_path}': {}",
rejection.detail(),
)))
}
}
}
#[cfg(test)]
mod tests {
use crate::{
db::{
Db, EntityRuntimeHooks,
data::DataStore,
index::IndexStore,
registry::StoreRegistry,
schema::{
PersistedSchemaSnapshot, SchemaRowLayout, SchemaStore, SchemaVersion,
compiled_schema_proposal_for_model,
},
},
error::ErrorClass,
model::field::FieldKind,
testing::test_memory,
traits::{EntityKind, EntitySchema, Path},
types::{EntityTag, Ulid},
};
use icydb_derive::{FieldProjection, PersistedRow};
use serde::Deserialize;
use std::cell::RefCell;
crate::test_canister! {
ident = SchemaReconcileTestCanister,
commit_memory_id = crate::testing::test_commit_memory_id(),
}
crate::test_store! {
ident = SchemaReconcileTestStore,
canister = SchemaReconcileTestCanister,
}
#[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, PersistedRow)]
struct SchemaReconcileEntity {
id: Ulid,
name: String,
}
crate::test_entity_schema! {
ident = SchemaReconcileEntity,
id = Ulid,
id_field = id,
entity_name = "SchemaReconcileEntity",
entity_tag = EntityTag::new(0x7465_7374_7363_6865),
pk_index = 0,
fields = [
("id", FieldKind::Ulid),
("name", FieldKind::Text { max_len: None }),
],
indexes = [],
store = SchemaReconcileTestStore,
canister = SchemaReconcileTestCanister,
}
thread_local! {
static RECONCILE_DATA_STORE: RefCell<DataStore> =
RefCell::new(DataStore::init(test_memory(252)));
static RECONCILE_INDEX_STORE: RefCell<IndexStore> =
RefCell::new(IndexStore::init(test_memory(253)));
static RECONCILE_SCHEMA_STORE: RefCell<SchemaStore> =
RefCell::new(SchemaStore::init(test_memory(254)));
static RECONCILE_STORE_REGISTRY: StoreRegistry = {
let mut registry = StoreRegistry::new();
registry
.register_store(
SchemaReconcileTestStore::PATH,
&RECONCILE_DATA_STORE,
&RECONCILE_INDEX_STORE,
&RECONCILE_SCHEMA_STORE,
)
.expect("schema reconcile test store should register");
registry
};
}
static RECONCILE_RUNTIME_HOOKS: &[EntityRuntimeHooks<SchemaReconcileTestCanister>] =
&[EntityRuntimeHooks::for_entity::<SchemaReconcileEntity>()];
static RECONCILE_DB: Db<SchemaReconcileTestCanister> =
Db::new_with_hooks(&RECONCILE_STORE_REGISTRY, RECONCILE_RUNTIME_HOOKS);
fn reset_schema_store() {
RECONCILE_SCHEMA_STORE.with_borrow_mut(SchemaStore::clear);
}
#[test]
fn reconcile_runtime_schemas_writes_initial_snapshot_on_first_contact() {
reset_schema_store();
super::reconcile_runtime_schemas(&RECONCILE_DB, RECONCILE_RUNTIME_HOOKS)
.expect("initial schema reconciliation should write generated snapshot");
let snapshot = RECONCILE_SCHEMA_STORE
.with_borrow(|store| {
store.get_persisted_snapshot(
SchemaReconcileEntity::ENTITY_TAG,
SchemaVersion::initial(),
)
})
.expect("persisted schema snapshot should decode");
let snapshot = snapshot.expect("initial schema snapshot should be persisted");
assert_eq!(snapshot.entity_path(), SchemaReconcileEntity::PATH);
assert_eq!(snapshot.fields().len(), 2);
}
#[test]
fn reconcile_runtime_schemas_accepts_existing_matching_snapshot() {
reset_schema_store();
super::reconcile_runtime_schemas(&RECONCILE_DB, RECONCILE_RUNTIME_HOOKS)
.expect("initial schema reconciliation should write generated snapshot");
super::reconcile_runtime_schemas(&RECONCILE_DB, RECONCILE_RUNTIME_HOOKS)
.expect("matching persisted schema should be accepted");
assert_eq!(RECONCILE_SCHEMA_STORE.with_borrow(SchemaStore::len), 1);
}
#[test]
fn reconcile_runtime_schemas_rejects_changed_initial_snapshot() {
reset_schema_store();
let proposal = compiled_schema_proposal_for_model(SchemaReconcileEntity::MODEL);
let expected = proposal.initial_persisted_schema_snapshot();
let changed = PersistedSchemaSnapshot::new(
expected.version(),
expected.entity_path().to_string(),
"ChangedSchemaReconcileEntity".to_string(),
expected.primary_key_field_id(),
expected.row_layout().clone(),
expected.fields().to_vec(),
);
RECONCILE_SCHEMA_STORE.with_borrow_mut(|store| {
store
.insert_persisted_snapshot(SchemaReconcileEntity::ENTITY_TAG, &changed)
.expect("changed schema snapshot should encode");
});
let err = super::reconcile_runtime_schemas(&RECONCILE_DB, RECONCILE_RUNTIME_HOOKS)
.expect_err("schema reconciliation should reject changed persisted snapshot");
assert_eq!(err.class, ErrorClass::Unsupported);
assert!(
err.message
.contains("schema evolution is not yet supported"),
"schema mismatch should fail at the explicit evolution boundary"
);
assert!(
err.message
.contains("entity name changed: stored='ChangedSchemaReconcileEntity' generated='SchemaReconcileEntity'"),
"schema mismatch should include the first rejected difference"
);
}
#[test]
fn reconcile_runtime_schemas_rejects_newer_schema_snapshot() {
reset_schema_store();
let proposal = compiled_schema_proposal_for_model(SchemaReconcileEntity::MODEL);
let expected = proposal.initial_persisted_schema_snapshot();
let newer_row_layout = SchemaRowLayout::new(
SchemaVersion::new(2),
expected.row_layout().field_to_slot().to_vec(),
);
let newer = PersistedSchemaSnapshot::new(
SchemaVersion::new(2),
expected.entity_path().to_string(),
expected.entity_name().to_string(),
expected.primary_key_field_id(),
newer_row_layout,
expected.fields().to_vec(),
);
RECONCILE_SCHEMA_STORE.with_borrow_mut(|store| {
store
.insert_persisted_snapshot(SchemaReconcileEntity::ENTITY_TAG, &newer)
.expect("newer schema snapshot should encode");
});
let err = super::reconcile_runtime_schemas(&RECONCILE_DB, RECONCILE_RUNTIME_HOOKS)
.expect_err("schema reconciliation must not ignore newer persisted versions");
assert_eq!(err.class, ErrorClass::Unsupported);
assert!(
err.message
.contains("schema version changed: stored=2 generated=1"),
"schema reconciliation should compare against the latest persisted version"
);
}
}