use crate::{
db::{
Db,
commit::{
CommitMarker,
memory::configure_commit_memory_id,
rebuild::rebuild_secondary_indexes_from_rows,
replay::replay_commit_marker_row_ops,
store::{
commit_marker_may_be_present, commit_marker_present_fast,
mark_commit_marker_verified_absent, with_commit_store,
},
},
data::{DataStore, DecodedDataStoreKey, RawDataStoreKey, RawRow},
diagnostics::integrity_report_after_recovery,
index::IndexStore,
journal::{
FoldWatermark, JournalBatch, JournalRecord, JournalSequence, JournalTailStore,
JournalTailVisit,
},
registry::{StoreHandle, StoreRecoveryCapability},
schema::{
SchemaStore, accepted_commit_schema_fingerprint, decode_persisted_schema_snapshot,
ensure_accepted_schema_snapshot, reconcile_runtime_schemas,
},
},
error::{ErrorOrigin, InternalError},
traits::CanisterKind,
};
use std::sync::OnceLock;
static RECOVERED: OnceLock<()> = OnceLock::new();
pub(crate) fn ensure_recovered<C: CanisterKind>(db: &Db<C>) -> Result<(), InternalError> {
configure_commit_memory_id(C::COMMIT_MEMORY_ID, C::COMMIT_STABLE_KEY)
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
if RECOVERED.get().is_none() {
ensure_schema_reconciled(db)?;
perform_recovery(db)?;
return ensure_schema_reconciled(db);
}
if !commit_marker_may_be_present() {
return ensure_schema_reconciled(db);
}
if commit_marker_present_fast().map_err(|err| err.with_origin(ErrorOrigin::Recovery))? {
ensure_schema_reconciled(db)?;
perform_recovery(db)?;
return ensure_schema_reconciled(db);
}
mark_commit_marker_verified_absent();
ensure_schema_reconciled(db)
}
fn perform_recovery<C: CanisterKind>(db: &Db<C>) -> Result<(), InternalError> {
let marker = with_commit_store(|store| store.load())
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
let had_marker = marker.is_some();
if let Some(marker) = marker {
publish_marker_bound_journal_batches(db, &marker)?;
replay_commit_marker_row_ops(db, &marker.row_ops)
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
}
fold_journaled_tails(db).map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
rebuild_journaled_live_projections(db).map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
rebuild_secondary_indexes_from_rows(db)
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
fold_journaled_index_materialized_views(db)
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
validate_recovery_integrity(db).map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
if had_marker {
with_commit_store(super::store::CommitStore::clear_verified)
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))?;
}
db.mark_all_registered_index_stores_ready();
mark_commit_marker_verified_absent();
let _ = RECOVERED.set(());
Ok(())
}
fn publish_marker_bound_journal_batches<C: CanisterKind>(
db: &Db<C>,
marker: &CommitMarker,
) -> Result<(), InternalError> {
for batch in marker.journal_batches() {
let (_, handle) = journal_batch_store_handle(db, batch)?;
let journal_store = handle.journal_tail_store().ok_or_else(|| {
InternalError::store_corruption(
"marker-bound journal batch resolved to store without journal tail",
)
})?;
journal_store.with_borrow_mut(|store| store.append_batch(batch))?;
}
Ok(())
}
fn rebuild_journaled_live_projections<C: CanisterKind>(db: &Db<C>) -> Result<(), InternalError> {
let stores = sorted_journaled_store_handles(db);
for (_, handle) in &stores {
handle.with_data_mut(DataStore::reset_journaled_live_projection)?;
handle.with_schema_mut(SchemaStore::reset_journaled_live_projection)?;
}
for (store_path, handle) in stores {
let journal_store = handle.journal_tail_store().ok_or_else(|| {
InternalError::store_corruption(
"journaled recovery handle does not expose journal-tail storage",
)
})?;
journal_store.with_borrow(|store| {
let watermark = store.fold_watermark()?.highest_folded_journal_sequence();
store.visit_batches_after(watermark, |batch| {
replay_journal_batch(db, store_path, handle, batch)?;
Ok(JournalTailVisit::Continue)
})
})?;
}
Ok(())
}
fn fold_journaled_tails<C: CanisterKind>(db: &Db<C>) -> Result<(), InternalError> {
for (store_path, handle) in sorted_journaled_store_handles(db) {
let journal_store = handle.journal_tail_store().ok_or_else(|| {
InternalError::store_corruption(
"journaled fold handle does not expose journal-tail storage",
)
})?;
let watermark = journal_store.with_borrow(JournalTailStore::fold_watermark)?;
let mut highest_folded = watermark.highest_folded_journal_sequence();
journal_store.with_borrow(|store| {
store.visit_batches_after(watermark.highest_folded_journal_sequence(), |batch| {
fold_journal_batch(db, store_path, handle, batch)?;
highest_folded = batch.journal_sequence();
Ok(JournalTailVisit::Continue)
})
})?;
if highest_folded > watermark.highest_folded_journal_sequence() {
let next_epoch = watermark.fold_epoch().checked_add(1).ok_or_else(|| {
InternalError::store_corruption("journal fold epoch space exhausted")
})?;
let next_watermark = FoldWatermark::new(highest_folded, next_epoch);
journal_store.with_borrow_mut(|store| {
store.persist_fold_watermark(next_watermark)?;
store.clear_batches_through(highest_folded);
Ok::<(), InternalError>(())
})?;
} else if watermark.highest_folded_journal_sequence() > JournalSequence::new(0) {
journal_store.with_borrow_mut(|store| {
store.clear_batches_through(watermark.highest_folded_journal_sequence());
Ok::<(), InternalError>(())
})?;
}
}
Ok(())
}
fn fold_journaled_index_materialized_views<C: CanisterKind>(
db: &Db<C>,
) -> Result<(), InternalError> {
for (_, handle) in sorted_journaled_store_handles(db) {
handle.with_index_mut(IndexStore::fold_journaled_materialized_view)?;
}
Ok(())
}
fn sorted_journaled_store_handles<C: CanisterKind>(db: &Db<C>) -> Vec<(&'static str, StoreHandle)> {
let mut stores = db.with_store_registry(|registry| registry.iter().collect::<Vec<_>>());
stores.retain(|(_, handle)| {
handle.storage_capabilities().recovery()
== StoreRecoveryCapability::StableBasePlusJournalReplay
});
stores.sort_by_key(|(path, _)| *path);
stores
}
fn replay_journal_batch<C: CanisterKind>(
db: &Db<C>,
expected_store_path: &'static str,
expected_handle: StoreHandle,
batch: &JournalBatch,
) -> Result<(), InternalError> {
let (_, batch_handle) = journal_batch_store_handle(db, batch)?;
if !std::ptr::eq(batch_handle.data_store(), expected_handle.data_store()) {
return Err(InternalError::store_corruption(
"journal batch replay resolved to a different store handle than its journal tail",
));
}
for record in batch.records() {
replay_journal_record(db, expected_store_path, expected_handle, record)?;
}
Ok(())
}
fn fold_journal_batch<C: CanisterKind>(
db: &Db<C>,
expected_store_path: &'static str,
expected_handle: StoreHandle,
batch: &JournalBatch,
) -> Result<(), InternalError> {
let (_, batch_handle) = journal_batch_store_handle(db, batch)?;
if !std::ptr::eq(batch_handle.data_store(), expected_handle.data_store()) {
return Err(InternalError::store_corruption(
"journal batch fold resolved to a different store handle than its journal tail",
));
}
for record in batch.records() {
fold_journal_record(db, expected_store_path, expected_handle, record)?;
}
Ok(())
}
fn replay_journal_record<C: CanisterKind>(
db: &Db<C>,
expected_store_path: &'static str,
expected_handle: StoreHandle,
record: &JournalRecord,
) -> Result<(), InternalError> {
match record {
JournalRecord::RowPut {
entity_path,
primary_key,
row_bytes,
schema_fingerprint,
} => {
validate_journal_row_record(
db,
expected_store_path,
expected_handle,
entity_path,
primary_key,
schema_fingerprint,
)?;
let row =
RawRow::from_untrusted_bytes(row_bytes.clone()).map_err(InternalError::from)?;
expected_handle.with_data_mut(|store| {
store
.apply_recovered_journal_put(primary_key.clone(), row)
.map(|_| ())
})
}
JournalRecord::RowDelete {
entity_path,
primary_key,
schema_fingerprint,
} => {
validate_journal_row_record(
db,
expected_store_path,
expected_handle,
entity_path,
primary_key,
schema_fingerprint,
)?;
expected_handle.with_data_mut(|store| {
store
.apply_recovered_journal_delete(primary_key)
.map(|_| ())
})
}
JournalRecord::SchemaPut {
store_path,
schema_snapshot_bytes,
} => {
if store_path != expected_store_path {
return Err(InternalError::store_corruption(format!(
"journal schema record store mismatch: expected '{expected_store_path}', found '{store_path}'",
)));
}
let snapshot = decode_persisted_schema_snapshot(schema_snapshot_bytes)?;
let hooks = db.runtime_hook_for_entity_path(snapshot.entity_path())?;
if hooks.store_path != expected_store_path {
return Err(InternalError::store_corruption(format!(
"journal schema record entity '{}' belongs to store '{}' not '{}'",
snapshot.entity_path(),
hooks.store_path,
expected_store_path,
)));
}
expected_handle.with_schema_mut(|schema_store| {
schema_store.insert_persisted_snapshot(hooks.entity_tag, &snapshot)
})
}
}
}
fn fold_journal_record<C: CanisterKind>(
db: &Db<C>,
expected_store_path: &'static str,
expected_handle: StoreHandle,
record: &JournalRecord,
) -> Result<(), InternalError> {
match record {
JournalRecord::RowPut {
entity_path,
primary_key,
row_bytes,
schema_fingerprint,
} => {
validate_journal_row_record(
db,
expected_store_path,
expected_handle,
entity_path,
primary_key,
schema_fingerprint,
)?;
let row =
RawRow::from_untrusted_bytes(row_bytes.clone()).map_err(InternalError::from)?;
expected_handle.with_data_mut(|store| {
store
.fold_recovered_journal_put(primary_key.clone(), row)
.map(|_| ())
})
}
JournalRecord::RowDelete {
entity_path,
primary_key,
schema_fingerprint,
} => {
validate_journal_row_record(
db,
expected_store_path,
expected_handle,
entity_path,
primary_key,
schema_fingerprint,
)?;
expected_handle
.with_data_mut(|store| store.fold_recovered_journal_delete(primary_key).map(|_| ()))
}
JournalRecord::SchemaPut {
store_path,
schema_snapshot_bytes,
} => {
if store_path != expected_store_path {
return Err(InternalError::store_corruption(format!(
"journal schema record store mismatch: expected '{expected_store_path}', found '{store_path}'",
)));
}
let snapshot = decode_persisted_schema_snapshot(schema_snapshot_bytes)?;
let hooks = db.runtime_hook_for_entity_path(snapshot.entity_path())?;
if hooks.store_path != expected_store_path {
return Err(InternalError::store_corruption(format!(
"journal schema record entity '{}' belongs to store '{}' not '{}'",
snapshot.entity_path(),
hooks.store_path,
expected_store_path,
)));
}
expected_handle.with_schema_mut(|schema_store| {
schema_store.fold_persisted_snapshot(hooks.entity_tag, &snapshot)
})
}
}
}
fn validate_journal_row_record<C: CanisterKind>(
db: &Db<C>,
expected_store_path: &'static str,
expected_handle: StoreHandle,
entity_path: &str,
primary_key: &RawDataStoreKey,
schema_fingerprint: &[u8; 16],
) -> Result<(), InternalError> {
let hooks = db.runtime_hook_for_entity_path(entity_path)?;
if hooks.store_path != expected_store_path {
return Err(InternalError::store_corruption(format!(
"journal row record entity '{entity_path}' belongs to store '{}' not '{expected_store_path}'",
hooks.store_path,
)));
}
let decoded_key = DecodedDataStoreKey::try_from_raw(primary_key).map_err(|err| {
InternalError::store_corruption(format!("journal row record key decode failed: {err}"))
})?;
if decoded_key.entity_tag() != hooks.entity_tag {
return Err(InternalError::store_corruption(format!(
"journal row record key tag does not match entity '{entity_path}'",
)));
}
let accepted = expected_handle.with_schema_mut(|schema_store| {
ensure_accepted_schema_snapshot(
schema_store,
hooks.entity_tag,
hooks.entity_path,
hooks.model,
)
})?;
let expected_fingerprint = accepted_commit_schema_fingerprint(&accepted)?;
if &expected_fingerprint != schema_fingerprint {
return Err(InternalError::store_corruption(format!(
"journal row record schema fingerprint mismatch for '{entity_path}'",
)));
}
Ok(())
}
fn journal_batch_store_handle<C: CanisterKind>(
db: &Db<C>,
batch: &JournalBatch,
) -> Result<(&'static str, StoreHandle), InternalError> {
let mut resolved = None::<(&'static str, StoreHandle)>;
for record in batch.records() {
let (path, handle) = journal_record_store_handle(db, record)?;
if let Some((existing_path, _)) = resolved {
if existing_path != path {
return Err(InternalError::store_corruption(format!(
"journal batch contains records for multiple stores: '{existing_path}' and '{path}'",
)));
}
} else {
resolved = Some((path, handle));
}
}
let Some((path, handle)) = resolved else {
return Err(InternalError::store_corruption(
"journal batch contains no records",
));
};
if handle.storage_capabilities().recovery()
!= StoreRecoveryCapability::StableBasePlusJournalReplay
{
return Err(InternalError::store_corruption(format!(
"journal batch resolved to non-journaled store '{path}'",
)));
}
Ok((path, handle))
}
fn journal_record_store_handle<C: CanisterKind>(
db: &Db<C>,
record: &JournalRecord,
) -> Result<(&'static str, StoreHandle), InternalError> {
let store_path = match record {
JournalRecord::RowPut { entity_path, .. }
| JournalRecord::RowDelete { entity_path, .. } => {
db.runtime_hook_for_entity_path(entity_path.as_str())?
.store_path
}
JournalRecord::SchemaPut { store_path, .. } => store_path.as_str(),
};
db.with_store_registry(|registry| {
registry
.iter()
.find(|(path, _)| *path == store_path)
.ok_or_else(|| {
InternalError::store_corruption(format!(
"journal record resolved to unknown store '{store_path}'",
))
})
})
}
fn ensure_schema_reconciled<C: CanisterKind>(db: &Db<C>) -> Result<(), InternalError> {
if !db.has_runtime_hooks() {
return Ok(());
}
reconcile_runtime_schemas(db, db.entity_runtime_hooks)
.map_err(|err| err.with_origin(ErrorOrigin::Recovery))
}
fn validate_recovery_integrity<C: CanisterKind>(db: &Db<C>) -> Result<(), InternalError> {
if !db.has_runtime_hooks() {
return Ok(());
}
let report = integrity_report_after_recovery(db)?;
let totals = report.totals();
if totals.missing_index_entries() > 0
|| totals.divergent_index_entries() > 0
|| totals.orphan_index_references() > 0
{
return Err(InternalError::recovery_integrity_validation_failed(
totals.missing_index_entries(),
totals.divergent_index_entries(),
totals.orphan_index_references(),
));
}
Ok(())
}