nodedb_cluster/metadata_group/
cache.rs1use std::collections::HashMap;
16
17use nodedb_types::Hlc;
18use tracing::{debug, info, warn};
19
20use crate::error::{ClusterError, MigrationCheckpointError};
21use crate::metadata_group::compensation::Compensation;
22use crate::metadata_group::descriptors::{DescriptorId, DescriptorLease};
23use crate::metadata_group::entry::{MetadataEntry, RoutingChange, TopologyChange};
24use crate::metadata_group::migration_state::{
25 MigrationPhaseTag, PersistedMigrationCheckpoint, SharedMigrationStateTable,
26};
27use crate::routing::RoutingTable;
28
/// In-memory projection of the replicated metadata log.
///
/// Rebuilt by replaying entries through [`MetadataCache::apply`]; replay is
/// idempotent thanks to the `applied_index` watermark.
#[derive(Debug, Default)]
pub struct MetadataCache {
    /// Highest log index already folded in; entries at or below this are
    /// skipped on replay (index 0 is exempt — see `apply`).
    pub applied_index: u64,
    /// High-water HLC observed so far (advanced by lease grants and drain
    /// starts via their `expires_at` timestamps).
    pub last_applied_hlc: Hlc,

    /// Active descriptor leases, keyed by `(descriptor_id, node_id)` —
    /// the `u64` is the granting node's id (see the lease-grant insert).
    pub leases: HashMap<(DescriptorId, u64), DescriptorLease>,

    /// Append-only history of applied topology changes.
    pub topology_log: Vec<TopologyChange>,
    /// Append-only history of applied routing changes.
    pub routing_log: Vec<RoutingChange>,

    /// Current cluster version; 0 means "never bumped" and is not treated
    /// as a mismatch by version-bump entries.
    pub cluster_version: u16,

    /// Running count of catalog DDL entries applied (diagnostic counter;
    /// the DDL payloads themselves are not stored here).
    pub catalog_entries_applied: u64,
}
50
51impl MetadataCache {
52 pub fn new() -> Self {
53 Self::default()
54 }
55
56 pub fn apply(&mut self, index: u64, entry: &MetadataEntry) {
59 if index != 0 && index <= self.applied_index {
60 debug!(
61 index,
62 watermark = self.applied_index,
63 "metadata cache: skipping already-applied entry"
64 );
65 return;
66 }
67 self.applied_index = index;
68
69 match entry {
70 MetadataEntry::CatalogDdl { payload: _ }
71 | MetadataEntry::CatalogDdlAudited { payload: _, .. } => {
72 self.catalog_entries_applied += 1;
77 }
78 MetadataEntry::TopologyChange(change) => self.topology_log.push(change.clone()),
79 MetadataEntry::RoutingChange(change) => self.routing_log.push(change.clone()),
80
81 MetadataEntry::ClusterVersionBump { from, to } => {
82 if *from != self.cluster_version && self.cluster_version != 0 {
83 warn!(
84 expected = self.cluster_version,
85 got = *from,
86 "cluster version bump mismatch"
87 );
88 }
89 self.cluster_version = *to;
90 }
91
92 MetadataEntry::DescriptorLeaseGrant(lease) => {
93 if lease.expires_at > self.last_applied_hlc {
94 self.last_applied_hlc = lease.expires_at;
95 }
96 self.leases
97 .insert((lease.descriptor_id.clone(), lease.node_id), lease.clone());
98 }
99 MetadataEntry::DescriptorLeaseRelease {
100 node_id,
101 descriptor_ids,
102 } => {
103 for id in descriptor_ids {
104 self.leases.remove(&(id.clone(), *node_id));
105 }
106 }
107 MetadataEntry::DescriptorDrainStart { expires_at, .. } => {
114 if *expires_at > self.last_applied_hlc {
115 self.last_applied_hlc = *expires_at;
116 }
117 }
118 MetadataEntry::DescriptorDrainEnd { .. } => {}
119 MetadataEntry::CaTrustChange { .. } => {
120 }
125 MetadataEntry::SurrogateAlloc { .. } => {
126 }
130 MetadataEntry::JoinTokenTransition { .. } => {
131 }
136 MetadataEntry::Batch { entries } => {
137 for sub in entries {
138 self.apply(index, sub);
139 }
140 }
141 MetadataEntry::MigrationCheckpoint { .. } => {}
146 MetadataEntry::MigrationAbort { .. } => {}
147 }
148 }
149}
150
151pub fn apply_migration_checkpoint(
159 table: &SharedMigrationStateTable,
160 migration_id: uuid::Uuid,
161 _phase: MigrationPhaseTag,
162 attempt: u32,
163 payload: crate::metadata_group::migration_state::MigrationCheckpointPayload,
164 expected_crc: u32,
165 ts_ms: u64,
166) -> Result<(), ClusterError> {
167 let actual_crc = payload.crc32c()?;
169 if actual_crc != expected_crc {
170 return Err(ClusterError::MigrationCheckpoint(
171 MigrationCheckpointError::Crc32cMismatch {
172 migration_id,
173 expected: expected_crc,
174 actual: actual_crc,
175 },
176 ));
177 }
178
179 let row = PersistedMigrationCheckpoint {
180 migration_id: migration_id.hyphenated().to_string(),
181 attempt,
182 payload,
183 crc32c: actual_crc,
184 ts_ms,
185 };
186
187 let mut guard = table.lock().unwrap_or_else(|p| p.into_inner());
188 guard.upsert(row)
189}
190
191pub fn apply_migration_abort(
198 table: &SharedMigrationStateTable,
199 routing: Option<&std::sync::Arc<std::sync::RwLock<RoutingTable>>>,
200 migration_id: uuid::Uuid,
201 reason: &str,
202 compensations: &[Compensation],
203) -> Result<(), ClusterError> {
204 info!(
205 migration_id = %migration_id,
206 reason,
207 steps = compensations.len(),
208 "applying migration abort"
209 );
210
211 for (step, comp) in compensations.iter().enumerate() {
212 apply_compensation(routing, step, migration_id, comp)?;
213 }
214
215 let mut guard = table.lock().unwrap_or_else(|p| p.into_inner());
216 guard.remove(&migration_id)
217}
218
219fn apply_compensation(
220 routing: Option<&std::sync::Arc<std::sync::RwLock<RoutingTable>>>,
221 step: usize,
222 migration_id: uuid::Uuid,
223 comp: &Compensation,
224) -> Result<(), ClusterError> {
225 let Some(live) = routing else {
226 debug!(
229 migration_id = %migration_id,
230 step,
231 ?comp,
232 "compensation: no live routing handle, skipping"
233 );
234 return Ok(());
235 };
236
237 let mut rt = live.write().unwrap_or_else(|p| p.into_inner());
238 match comp {
239 Compensation::RemoveLearner { group_id, peer_id }
240 | Compensation::RemoveVoter { group_id, peer_id } => {
241 rt.remove_group_member(*group_id, *peer_id);
242 }
243 Compensation::RestoreLeaderHint { group_id, peer_id } => {
244 rt.set_leader(*group_id, *peer_id);
245 }
246 Compensation::RemoveGhostStub { vshard_id: _ } => {
247 }
250 }
251 drop(rt);
252
253 debug!(
254 migration_id = %migration_id,
255 step,
256 ?comp,
257 "compensation applied"
258 );
259 Ok(())
260}