use crate::{
db::{
Db,
commit::{CommitRowOp, PreparedIndexMutation},
data::{DataKey, RawDataKey, RawRow},
index::{IndexState, IndexStore, RawIndexEntry, RawIndexKey},
registry::StoreHandle,
schema::commit_schema_fingerprint_for_model,
},
error::InternalError,
traits::CanisterKind,
};
pub(in crate::db) fn rebuild_secondary_indexes_from_rows(
db: &Db<impl CanisterKind>,
) -> Result<(), InternalError> {
if !db.has_runtime_hooks() {
return Ok(());
}
let stores = sorted_store_handles(db);
let snapshots = stores
.iter()
.map(|(_, handle)| IndexStoreSnapshot {
handle: *handle,
entries: handle.with_index(IndexStore::entries),
state: handle.index_state(),
})
.collect::<Vec<_>>();
let rebuild_result = rebuild_secondary_indexes_in_place(db, &stores);
if let Err(err) = rebuild_result {
restore_index_store_snapshots(snapshots);
return Err(err);
}
Ok(())
}
#[derive(Clone)]
struct IndexStoreSnapshot {
handle: StoreHandle,
entries: Vec<(RawIndexKey, RawIndexEntry)>,
state: IndexState,
}
fn sorted_store_handles(db: &Db<impl CanisterKind>) -> Vec<(&'static str, StoreHandle)> {
let mut stores = db.with_store_registry(|registry| registry.iter().collect::<Vec<_>>());
stores.sort_by(|(left, _), (right, _)| left.cmp(right));
debug_assert!(
stores.windows(2).all(|pair| pair[0].0 <= pair[1].0),
"store registry iteration order must not affect semantic rebuild ordering",
);
stores
}
fn rebuild_secondary_indexes_in_place(
db: &Db<impl CanisterKind>,
stores: &[(&'static str, StoreHandle)],
) -> Result<(), InternalError> {
for (_, handle) in stores {
handle.mark_index_building();
}
for (_, handle) in stores {
handle.with_index_mut(IndexStore::clear);
}
for (store_path, handle) in stores {
let rows = handle.with_data(|data_store| {
data_store
.iter()
.map(|entry| (*entry.key(), entry.value()))
.collect::<Vec<(RawDataKey, RawRow)>>()
});
for (raw_key, raw_row) in rows {
let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
InternalError::startup_index_rebuild_invalid_data_key(store_path, err)
})?;
let hooks = db.runtime_hook_for_entity_tag(data_key.entity_tag())?;
let row_op = CommitRowOp::new(
hooks.entity_path,
raw_key,
None,
Some(raw_row.as_bytes().to_vec()),
commit_schema_fingerprint_for_model(hooks.entity_path, hooks.model),
);
let prepared = (hooks.prepare_row_commit)(db, &row_op).map_err(|err| {
let message = format!(
"startup index rebuild failed: store='{}' entity='{}' ({})",
store_path, hooks.entity_path, err.message
);
err.with_message(message)
})?;
apply_index_mutations(prepared.index_ops);
}
}
Ok(())
}
fn apply_index_mutations(index_ops: Vec<PreparedIndexMutation>) {
for index_op in index_ops {
index_op.store.with_borrow_mut(|store| {
if let Some(value) = index_op.value {
store.insert(index_op.key, value);
} else {
store.remove(&index_op.key);
}
});
}
}
fn restore_index_store_snapshots(snapshots: Vec<IndexStoreSnapshot>) {
for snapshot in snapshots {
snapshot.handle.with_index_mut(|index_store| {
index_store.clear();
for (raw_key, raw_entry) in snapshot.entries {
index_store.insert(raw_key, raw_entry);
}
match snapshot.state {
IndexState::Building => index_store.mark_building(),
IndexState::Valid => index_store.mark_valid(),
IndexState::Dropping => index_store.mark_dropping(),
}
});
}
}