// nodedb_cluster/metadata_group/applier.rs

use std::net::SocketAddr;
7use std::sync::{Arc, RwLock};
8
9use tracing::{error, warn};
10use uuid;
11
12use crate::auth::raft_backed_store::apply_token_transition_to_mirror;
13use crate::auth::token_state::SharedTokenStateMirror;
14use crate::metadata_group::cache::{
15 MetadataCache, apply_migration_abort, apply_migration_checkpoint,
16};
17use crate::metadata_group::codec::decode_entry;
18use crate::metadata_group::entry::{MetadataEntry, RoutingChange, TopologyChange};
19use crate::metadata_group::migration_state::SharedMigrationStateTable;
20use crate::routing::RoutingTable;
21use crate::topology::{ClusterTopology, NodeInfo, NodeState};
22
/// Applies committed metadata-group log entries to local state.
///
/// The `Send + Sync + 'static` bound allows implementations to be shared
/// with a background apply loop.
pub trait MetadataApplier: Send + Sync + 'static {
    /// Applies a batch of `(raft_index, encoded_entry)` pairs.
    ///
    /// Returns the index of the last entry in the batch, or 0 when the
    /// batch is empty.
    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64;
}
37
/// A [`MetadataApplier`] that folds decoded entries into a shared
/// [`MetadataCache`] and, when configured via the `with_*` builders,
/// cascades side effects into live in-memory cluster state.
#[derive(Clone)]
pub struct CacheApplier {
    /// Replicated metadata view; always updated on apply.
    cache: Arc<RwLock<MetadataCache>>,
    /// When present, topology entries also mutate this live topology.
    live_topology: Option<Arc<RwLock<ClusterTopology>>>,
    /// When present, routing entries also mutate this live routing table.
    live_routing: Option<Arc<RwLock<RoutingTable>>>,
    /// When present, migration checkpoint/abort entries are replayed here.
    migration_state: Option<SharedMigrationStateTable>,
    /// When present, join-token transitions are mirrored here.
    token_state: Option<SharedTokenStateMirror>,
}
66
impl CacheApplier {
    /// Creates a log-only applier: entries update `cache` but no live
    /// state is touched until handles are attached via the `with_*`
    /// builders.
    pub fn new(cache: Arc<RwLock<MetadataCache>>) -> Self {
        Self {
            cache,
            live_topology: None,
            live_routing: None,
            migration_state: None,
            token_state: None,
        }
    }

    /// Attaches live topology and routing handles so committed topology
    /// and routing entries also mutate the node's in-memory views.
    pub fn with_live_state(
        mut self,
        topology: Arc<RwLock<ClusterTopology>>,
        routing: Arc<RwLock<RoutingTable>>,
    ) -> Self {
        self.live_topology = Some(topology);
        self.live_routing = Some(routing);
        self
    }

    /// Attaches the shared migration-state table so checkpoint and abort
    /// entries are replayed into it.
    pub fn with_migration_state(mut self, migration_state: SharedMigrationStateTable) -> Self {
        self.migration_state = Some(migration_state);
        self
    }

    /// Attaches the join-token mirror so token transitions are replayed
    /// into it.
    pub fn with_token_state(mut self, token_state: SharedTokenStateMirror) -> Self {
        self.token_state = Some(token_state);
        self
    }

    /// Returns a cloned handle to the underlying metadata cache.
    pub fn cache(&self) -> Arc<RwLock<MetadataCache>> {
        self.cache.clone()
    }

    /// Mirrors a committed topology change into the live topology, if one
    /// is attached; a no-op otherwise.
    fn apply_topology_change(&self, change: &TopologyChange) {
        let Some(live) = &self.live_topology else {
            return;
        };
        // Recover from lock poisoning: the apply path must keep making
        // progress even if a previous writer panicked.
        let mut topo = live.write().unwrap_or_else(|p| p.into_inner());
        match change {
            TopologyChange::Join { node_id, addr } => {
                // Idempotent: re-applying Join for a known node is a no-op.
                if topo.contains(*node_id) {
                    return;
                }
                // An unparseable address is downgraded to a warning plus a
                // 0.0.0.0:0 placeholder rather than rejecting the entry.
                let parsed: SocketAddr = addr.parse().unwrap_or_else(|_| {
                    warn!(node_id, addr, "join: invalid address, using placeholder");
                    SocketAddr::from(([0, 0, 0, 0], 0))
                });
                topo.join_as_learner(NodeInfo::new(*node_id, parsed, NodeState::Joining));
            }
            TopologyChange::PromoteToVoter { node_id } => {
                topo.promote_to_voter(*node_id);
            }
            TopologyChange::StartDecommission { node_id } => {
                topo.set_state(*node_id, NodeState::Draining);
            }
            TopologyChange::FinishDecommission { node_id } => {
                topo.set_state(*node_id, NodeState::Decommissioned);
            }
            TopologyChange::Leave { node_id } => {
                topo.remove_node(*node_id);
            }
        }
    }

    /// Dispatches a decoded entry's side effects to whichever optional
    /// live handles are attached, recursing into `Batch` sub-entries.
    ///
    /// # Panics
    ///
    /// Deliberately fatal when a migration checkpoint fails its CRC32C
    /// check or an abort's compensation cannot be applied, rather than
    /// continuing with possibly-diverged state.
    fn cascade_live_state(&self, entry: &MetadataEntry) {
        match entry {
            MetadataEntry::TopologyChange(change) => self.apply_topology_change(change),
            MetadataEntry::RoutingChange(change) => self.apply_routing_change(change),
            MetadataEntry::Batch { entries } => {
                for sub in entries {
                    self.cascade_live_state(sub);
                }
            }
            MetadataEntry::MigrationCheckpoint {
                migration_id,
                phase,
                attempt,
                payload,
                crc32c,
                ts_ms,
            } => {
                if let Some(table) = &self.migration_state {
                    // NOTE(review): an unparseable migration_id silently maps
                    // to the nil UUID — confirm nil-keyed checkpoints are
                    // acceptable upstream (distinct bad ids would collide).
                    let parsed_id = migration_id
                        .parse::<uuid::Uuid>()
                        .unwrap_or_else(|_| uuid::Uuid::nil());
                    if let Err(e) = apply_migration_checkpoint(
                        table,
                        parsed_id,
                        *phase,
                        *attempt,
                        payload.clone(),
                        *crc32c,
                        *ts_ms,
                    ) {
                        error!(
                            migration_id = %migration_id,
                            error = %e,
                            "FATAL: migration checkpoint CRC32C mismatch — possible corruption"
                        );
                        panic!("migration checkpoint CRC32C mismatch: {e}");
                    }
                }
            }
            MetadataEntry::MigrationAbort {
                migration_id,
                reason,
                compensations,
            } => {
                if let Some(table) = &self.migration_state {
                    // Same nil-UUID fallback as the checkpoint path above.
                    let parsed_id = migration_id
                        .parse::<uuid::Uuid>()
                        .unwrap_or_else(|_| uuid::Uuid::nil());
                    if let Err(e) = apply_migration_abort(
                        table,
                        self.live_routing.as_ref(),
                        parsed_id,
                        reason,
                        compensations,
                    ) {
                        error!(
                            migration_id = %migration_id,
                            error = %e,
                            "FATAL: migration abort compensation failed"
                        );
                        panic!("migration abort compensation failed: {e}");
                    }
                }
            }
            MetadataEntry::JoinTokenTransition {
                token_hash,
                transition,
                ts_ms,
            } => {
                if let Some(mirror) = &self.token_state {
                    apply_token_transition_to_mirror(mirror, *token_hash, transition, *ts_ms);
                }
            }
            // Remaining variants (e.g. catalog DDL) have no live-state side
            // effects; the cache has already recorded them.
            _ => {}
        }
    }

    /// Mirrors a committed routing change into the live routing table, if
    /// one is attached; a no-op otherwise.
    fn apply_routing_change(&self, change: &RoutingChange) {
        let Some(live) = &self.live_routing else {
            return;
        };
        // As with topology, a poisoned lock is recovered rather than fatal.
        let mut rt = live.write().unwrap_or_else(|p| p.into_inner());
        match change {
            RoutingChange::ReassignVShard {
                vshard_id,
                new_group_id,
                new_leaseholder_node_id,
            } => {
                rt.reassign_vshard(*vshard_id, *new_group_id);
                rt.set_leader(*new_group_id, *new_leaseholder_node_id);
            }
            RoutingChange::LeadershipTransfer {
                group_id,
                new_leader_node_id,
            } => {
                rt.set_leader(*group_id, *new_leader_node_id);
            }
            RoutingChange::RemoveMember { group_id, node_id } => {
                rt.remove_group_member(*group_id, *node_id);
            }
        }
    }
}
259
260impl MetadataApplier for CacheApplier {
261 fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
262 let mut last = 0u64;
263 let mut guard = self
264 .cache
265 .write()
266 .unwrap_or_else(|poison| poison.into_inner());
267 for (index, data) in entries {
268 last = *index;
269 if data.is_empty() {
270 continue;
271 }
272 match decode_entry(data) {
273 Ok(entry) => {
274 guard.apply(*index, &entry);
275 self.cascade_live_state(&entry);
276 }
277 Err(e) => warn!(index = *index, error = %e, "metadata decode failed"),
278 }
279 }
280 last
281 }
282}
283
/// A [`MetadataApplier`] that ignores entry payloads entirely and only
/// reports the watermark; useful when no local materialization is needed.
pub struct NoopMetadataApplier;
289impl MetadataApplier for NoopMetadataApplier {
290 fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
291 entries.last().map(|(idx, _)| *idx).unwrap_or(0)
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata_group::codec::encode_entry;
    use crate::metadata_group::entry::{MetadataEntry, TopologyChange};

    #[test]
    fn cache_applier_counts_catalog_ddl() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(Arc::clone(&cache));

        let ddl_bytes = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![1, 2, 3],
        })
        .unwrap();
        let join_bytes = encode_entry(&MetadataEntry::TopologyChange(TopologyChange::Join {
            node_id: 7,
            addr: "10.0.0.7:9000".into(),
        }))
        .unwrap();

        // The returned watermark is the last applied index.
        assert_eq!(applier.apply(&[(1, ddl_bytes), (2, join_bytes)]), 2);

        let snapshot = cache.read().unwrap();
        assert_eq!(snapshot.applied_index, 2);
        assert_eq!(snapshot.catalog_entries_applied, 1);
        assert_eq!(snapshot.topology_log.len(), 1);
    }

    #[test]
    fn cache_applier_idempotent() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(Arc::clone(&cache));

        let encoded = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![9, 9],
        })
        .unwrap();

        // Replaying the entry at a lower index must not regress the cache.
        applier.apply(&[(5, encoded.clone())]);
        applier.apply(&[(3, encoded)]);

        let snapshot = cache.read().unwrap();
        assert_eq!(snapshot.applied_index, 5);
        assert_eq!(snapshot.catalog_entries_applied, 1);
    }

    #[test]
    fn cache_applier_mutates_live_topology_on_start_decommission() {
        use crate::topology::{ClusterTopology, NodeInfo, NodeState};
        use std::net::SocketAddr;

        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let mut seed = ClusterTopology::new();
        seed.add_node(NodeInfo::new(7, addr, NodeState::Active));
        let topology = Arc::new(RwLock::new(seed));
        let routing = Arc::new(RwLock::new(crate::routing::RoutingTable::uniform(1, &[7], 1)));
        let applier = CacheApplier::new(Arc::clone(&cache))
            .with_live_state(Arc::clone(&topology), Arc::clone(&routing));

        let encoded = encode_entry(&MetadataEntry::TopologyChange(
            TopologyChange::StartDecommission { node_id: 7 },
        ))
        .unwrap();
        applier.apply(&[(1, encoded)]);

        // The live view, not just the cache log, must reflect the drain.
        let live = topology.read().unwrap();
        assert_eq!(live.get_node(7).unwrap().state, NodeState::Draining);
    }

    #[test]
    fn cache_applier_mutates_live_routing_on_remove_member() {
        use crate::metadata_group::entry::RoutingChange;

        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let topology = Arc::new(RwLock::new(crate::topology::ClusterTopology::new()));
        let routing = Arc::new(RwLock::new(crate::routing::RoutingTable::uniform(
            1,
            &[1, 2, 3],
            3,
        )));
        let applier = CacheApplier::new(Arc::clone(&cache))
            .with_live_state(Arc::clone(&topology), Arc::clone(&routing));

        let encoded = encode_entry(&MetadataEntry::RoutingChange(RoutingChange::RemoveMember {
            group_id: 0,
            node_id: 2,
        }))
        .unwrap();
        applier.apply(&[(1, encoded)]);

        // Node 2 is gone from group 0's live membership.
        let live = routing.read().unwrap();
        assert!(!live.group_info(0).unwrap().members.contains(&2));
    }

    #[test]
    fn cache_applier_without_live_state_stays_log_only() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(Arc::clone(&cache));

        let encoded = encode_entry(&MetadataEntry::TopologyChange(
            TopologyChange::StartDecommission { node_id: 5 },
        ))
        .unwrap();

        // With no live handles attached, applying must still succeed.
        assert_eq!(applier.apply(&[(1, encoded)]), 1);
    }

    #[test]
    fn noop_applier_advances_watermark() {
        let noop = NoopMetadataApplier;
        assert_eq!(noop.apply(&[(7, b"x".to_vec()), (9, b"y".to_vec())]), 9);
        assert_eq!(noop.apply(&[]), 0);
    }
}