use std::collections::HashMap;
use nodedb_types::Hlc;
use tracing::{debug, info, warn};
use crate::error::{ClusterError, MigrationCheckpointError};
use crate::metadata_group::compensation::Compensation;
use crate::metadata_group::descriptors::{DescriptorId, DescriptorLease};
use crate::metadata_group::entry::{MetadataEntry, RoutingChange, TopologyChange};
use crate::metadata_group::migration_state::{
MigrationPhaseTag, PersistedMigrationCheckpoint, SharedMigrationStateTable,
};
use crate::routing::RoutingTable;
/// In-memory summary of metadata-group entries, updated by [`MetadataCache::apply`].
///
/// A freshly constructed cache holds the `Default` value of every field.
#[derive(Debug, Default)]
pub struct MetadataCache {
/// Index of the most recent entry accepted by `apply`. Entries with a
/// non-zero index at or below this value are skipped.
pub applied_index: u64,
/// Highest `expires_at` seen so far on descriptor lease grants and
/// descriptor drain starts.
pub last_applied_hlc: Hlc,
/// Currently held descriptor leases, keyed by `(descriptor_id, node_id)`.
/// Grants insert (or overwrite) an entry; releases remove it.
pub leases: HashMap<(DescriptorId, u64), DescriptorLease>,
/// Every applied topology change, in the order it was applied.
pub topology_log: Vec<TopologyChange>,
/// Every applied routing change, in the order it was applied.
pub routing_log: Vec<RoutingChange>,
/// The `to` value of the most recently applied cluster version bump.
pub cluster_version: u16,
/// Number of catalog DDL entries (plain or audited) applied so far.
pub catalog_entries_applied: u64,
}
impl MetadataCache {
    /// Creates an empty cache (every field at its `Default` value).
    pub fn new() -> Self {
        Self::default()
    }

    /// Applies one metadata log entry to the cache.
    ///
    /// `index` is the log position of `entry`. A non-zero `index` that is at or
    /// below [`MetadataCache::applied_index`] is treated as already applied and
    /// skipped. An `index` of `0` always bypasses that check. Otherwise the
    /// watermark is set to `index` and the entry's effect is folded into the
    /// cache.
    ///
    /// A `Batch` entry applies every sub-entry under the batch's single index.
    /// The watermark check runs once for the whole batch, not per sub-entry:
    /// re-checking each sub-entry against the watermark that was just advanced
    /// would cause all of them to be skipped.
    pub fn apply(&mut self, index: u64, entry: &MetadataEntry) {
        if index != 0 && index <= self.applied_index {
            debug!(
                index,
                watermark = self.applied_index,
                "metadata cache: skipping already-applied entry"
            );
            return;
        }
        self.applied_index = index;
        self.apply_entry(entry);
    }

    /// Folds a single entry's effect into the cache without consulting or
    /// touching the applied-index watermark. Recurses into `Batch` entries.
    fn apply_entry(&mut self, entry: &MetadataEntry) {
        match entry {
            MetadataEntry::CatalogDdl { payload: _ }
            | MetadataEntry::CatalogDdlAudited { payload: _, .. } => {
                // Only a count is kept here; the payload itself is not inspected.
                self.catalog_entries_applied += 1;
            }
            MetadataEntry::TopologyChange(change) => self.topology_log.push(change.clone()),
            MetadataEntry::RoutingChange(change) => self.routing_log.push(change.clone()),
            MetadataEntry::ClusterVersionBump { from, to } => {
                // A cached version of 0 means no bump has been seen yet, so any
                // `from` is accepted silently. A mismatch is logged but the bump
                // is still applied.
                if *from != self.cluster_version && self.cluster_version != 0 {
                    warn!(
                        expected = self.cluster_version,
                        got = *from,
                        "cluster version bump mismatch"
                    );
                }
                self.cluster_version = *to;
            }
            MetadataEntry::DescriptorLeaseGrant(lease) => {
                // The HLC high-water mark only ever moves forward.
                if lease.expires_at > self.last_applied_hlc {
                    self.last_applied_hlc = lease.expires_at;
                }
                self.leases
                    .insert((lease.descriptor_id.clone(), lease.node_id), lease.clone());
            }
            MetadataEntry::DescriptorLeaseRelease {
                node_id,
                descriptor_ids,
            } => {
                // Releasing a lease that is not held is a no-op.
                for id in descriptor_ids {
                    self.leases.remove(&(id.clone(), *node_id));
                }
            }
            MetadataEntry::DescriptorDrainStart { expires_at, .. } => {
                if *expires_at > self.last_applied_hlc {
                    self.last_applied_hlc = *expires_at;
                }
            }
            MetadataEntry::Batch { entries } => {
                for sub in entries {
                    self.apply_entry(sub);
                }
            }
            // The cache keeps no state for the variants below. They are listed
            // explicitly (no wildcard) so that adding a new variant is a
            // compile error here.
            MetadataEntry::DescriptorDrainEnd { .. } => {}
            MetadataEntry::CaTrustChange { .. } => {}
            MetadataEntry::SurrogateAlloc { .. } => {}
            MetadataEntry::JoinTokenTransition { .. } => {}
            MetadataEntry::MigrationCheckpoint { .. } => {}
            MetadataEntry::MigrationAbort { .. } => {}
        }
    }
}
/// Validates a migration checkpoint and stores it in the migration state table.
///
/// The CRC32C of `payload` is recomputed and compared with `expected_crc`.
/// On a match, a [`PersistedMigrationCheckpoint`] row is built (with the
/// migration id rendered in hyphenated form) and upserted into `table`.
/// `_phase` is accepted but not used by this function.
///
/// # Errors
///
/// * Whatever `payload.crc32c()` fails with, converted into [`ClusterError`].
/// * [`MigrationCheckpointError::Crc32cMismatch`] when the recomputed checksum
///   differs from `expected_crc`.
/// * Whatever the table's `upsert` returns.
pub fn apply_migration_checkpoint(
    table: &SharedMigrationStateTable,
    migration_id: uuid::Uuid,
    _phase: MigrationPhaseTag,
    attempt: u32,
    payload: crate::metadata_group::migration_state::MigrationCheckpointPayload,
    expected_crc: u32,
    ts_ms: u64,
) -> Result<(), ClusterError> {
    let computed = payload.crc32c()?;

    if computed == expected_crc {
        let checkpoint = PersistedMigrationCheckpoint {
            migration_id: migration_id.hyphenated().to_string(),
            attempt,
            payload,
            crc32c: computed,
            ts_ms,
        };
        // A poisoned lock is recovered, not propagated: the inner table
        // is used as-is.
        let mut state = match table.lock() {
            Ok(held) => held,
            Err(poisoned) => poisoned.into_inner(),
        };
        state.upsert(checkpoint)
    } else {
        Err(ClusterError::MigrationCheckpoint(
            MigrationCheckpointError::Crc32cMismatch {
                migration_id,
                expected: expected_crc,
                actual: computed,
            },
        ))
    }
}
/// Aborts a migration: runs its compensations, then removes its row from the
/// migration state table.
///
/// Compensations are applied in slice order, each tagged with its position as
/// the step number. `routing` is forwarded unchanged to every compensation.
///
/// # Errors
///
/// Returns the first error produced by a compensation; in that case the
/// remaining compensations are not run and the table row is left in place.
/// Otherwise returns whatever the table's `remove` returns.
pub fn apply_migration_abort(
    table: &SharedMigrationStateTable,
    routing: Option<&std::sync::Arc<std::sync::RwLock<RoutingTable>>>,
    migration_id: uuid::Uuid,
    reason: &str,
    compensations: &[Compensation],
) -> Result<(), ClusterError> {
    info!(
        migration_id = %migration_id,
        reason,
        steps = compensations.len(),
        "applying migration abort"
    );

    compensations
        .iter()
        .enumerate()
        .try_for_each(|(step, comp)| apply_compensation(routing, step, migration_id, comp))?;

    // A poisoned lock is recovered, not propagated.
    let mut state = match table.lock() {
        Ok(held) => held,
        Err(poisoned) => poisoned.into_inner(),
    };
    state.remove(&migration_id)
}
/// Applies a single compensation step against the live routing table.
///
/// When `routing` is `None` the step is logged and skipped. Otherwise the
/// routing table's write lock is taken (recovering from poisoning) and:
///
/// * `RemoveLearner` / `RemoveVoter` remove the peer from the group,
/// * `RestoreLeaderHint` sets the group's leader to the peer,
/// * `RemoveGhostStub` changes nothing in the routing table.
///
/// The lock is released before the "applied" log line is emitted. This
/// function currently always returns `Ok(())`.
fn apply_compensation(
    routing: Option<&std::sync::Arc<std::sync::RwLock<RoutingTable>>>,
    step: usize,
    migration_id: uuid::Uuid,
    comp: &Compensation,
) -> Result<(), ClusterError> {
    match routing {
        None => {
            debug!(
                migration_id = %migration_id,
                step,
                ?comp,
                "compensation: no live routing handle, skipping"
            );
        }
        Some(live) => {
            // Inner scope bounds the write guard so it is released before logging.
            {
                let mut table = match live.write() {
                    Ok(held) => held,
                    Err(poisoned) => poisoned.into_inner(),
                };
                match comp {
                    Compensation::RemoveLearner { group_id, peer_id } => {
                        table.remove_group_member(*group_id, *peer_id);
                    }
                    Compensation::RemoveVoter { group_id, peer_id } => {
                        table.remove_group_member(*group_id, *peer_id);
                    }
                    Compensation::RestoreLeaderHint { group_id, peer_id } => {
                        table.set_leader(*group_id, *peer_id);
                    }
                    // Nothing to undo in the routing table for this variant.
                    Compensation::RemoveGhostStub { vshard_id: _ } => {}
                }
            }
            debug!(
                migration_id = %migration_id,
                step,
                ?comp,
                "compensation applied"
            );
        }
    }
    Ok(())
}