use crate::{
db::{
Db,
commit::CommitRowOp,
data::{DataKey, RawDataKey, RawRow},
index::{IndexState, IndexStore, RawIndexEntry, RawIndexKey},
registry::StoreHandle,
schema::commit_schema_fingerprint_for_model,
},
error::InternalError,
traits::CanisterKind,
};
pub(in crate::db) fn rebuild_secondary_indexes_from_rows(
db: &Db<impl CanisterKind>,
) -> Result<(), InternalError> {
if !db.has_runtime_hooks() {
return Ok(());
}
let stores = sorted_store_handles(db);
let snapshots = stores
.iter()
.map(|(_, handle)| IndexStoreSnapshot::capture(*handle))
.collect::<Vec<_>>();
let rebuild_result = rebuild_secondary_indexes_in_place(db, &stores);
if let Err(err) = rebuild_result {
for snapshot in snapshots {
snapshot.restore();
}
return Err(err);
}
Ok(())
}
#[derive(Clone)]
struct IndexStoreSnapshot {
handle: StoreHandle,
entries: Vec<(RawIndexKey, RawIndexEntry)>,
state: IndexState,
}
impl IndexStoreSnapshot {
fn capture(handle: StoreHandle) -> Self {
Self {
handle,
entries: handle.with_index(IndexStore::entries),
state: handle.index_state(),
}
}
fn restore(self) {
self.handle.with_index_mut(|index_store| {
index_store.clear();
for (raw_key, raw_entry) in self.entries {
index_store.insert(raw_key, raw_entry);
}
match self.state {
IndexState::Building => index_store.mark_building(),
IndexState::Ready => index_store.mark_ready(),
IndexState::Dropping => index_store.mark_dropping(),
}
});
}
}
fn sorted_store_handles(db: &Db<impl CanisterKind>) -> Vec<(&'static str, StoreHandle)> {
let mut stores = db.with_store_registry(|registry| registry.iter().collect::<Vec<_>>());
stores.sort_by(|(left, _), (right, _)| left.cmp(right));
debug_assert!(
stores.windows(2).all(|pair| pair[0].0 <= pair[1].0),
"store registry iteration order must not affect semantic rebuild ordering",
);
stores
}
fn rebuild_secondary_indexes_in_place(
db: &Db<impl CanisterKind>,
stores: &[(&'static str, StoreHandle)],
) -> Result<(), InternalError> {
for (_, handle) in stores {
handle.mark_index_building();
}
for (_, handle) in stores {
handle.with_index_mut(IndexStore::clear);
}
for (store_path, handle) in stores {
let rows = handle.with_data(|data_store| {
data_store
.iter()
.map(|entry| (*entry.key(), entry.value()))
.collect::<Vec<(RawDataKey, RawRow)>>()
});
for (raw_key, raw_row) in rows {
let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
InternalError::startup_index_rebuild_invalid_data_key(store_path, err)
})?;
let hooks = db.runtime_hook_for_entity_tag(data_key.entity_tag())?;
let row_op = CommitRowOp::new(
hooks.entity_path,
raw_key,
None,
Some(raw_row.as_bytes().to_vec()),
commit_schema_fingerprint_for_model(hooks.entity_path, hooks.model),
);
let prepared = db.prepare_row_commit_op(&row_op).map_err(|err| {
let message = format!(
"startup index rebuild failed: store='{}' entity='{}' ({})",
store_path, hooks.entity_path, err.message
);
err.with_message(message)
})?;
for index_op in prepared.index_ops {
index_op.apply();
}
}
}
Ok(())
}