// nodedb_cluster/metadata_group/applier.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! [`MetadataApplier`] trait: the contract raft_loop uses to dispatch
4//! committed entries on the metadata group (group 0).
5
6use std::net::SocketAddr;
7use std::sync::{Arc, RwLock};
8
9use tracing::{error, warn};
10use uuid;
11
12use crate::auth::raft_backed_store::apply_token_transition_to_mirror;
13use crate::auth::token_state::SharedTokenStateMirror;
14use crate::metadata_group::cache::{
15    MetadataCache, apply_migration_abort, apply_migration_checkpoint,
16};
17use crate::metadata_group::codec::decode_entry;
18use crate::metadata_group::entry::{MetadataEntry, RoutingChange, TopologyChange};
19use crate::metadata_group::migration_state::SharedMigrationStateTable;
20use crate::routing::RoutingTable;
21use crate::topology::{ClusterTopology, NodeInfo, NodeState};
22
/// Applies committed metadata entries to local state.
///
/// Implemented in the `nodedb-cluster` crate as [`CacheApplier`]
/// (tracks cluster-owned state only: topology/routing/leases/
/// version + a CatalogDdl counter) and wrapped by the production
/// applier in the `nodedb` crate to additionally decode the
/// `CatalogDdl` payload as a `CatalogEntry` and write through to
/// `SystemCatalog`.
pub trait MetadataApplier: Send + Sync + 'static {
    /// Apply a batch of committed raft entries. Entries with empty
    /// `data` (raft no-ops) are skipped. Returns the highest log
    /// index applied.
    ///
    /// Each tuple is `(raft_log_index, serialized_entry_bytes)`;
    /// the bytes are the wire form understood by the metadata codec.
    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64;
}
37
/// Default applier that writes committed entries to an in-memory
/// [`MetadataCache`]. The cache is shared with the rest of the
/// process via `Arc<RwLock<_>>`.
///
/// The four live-state handles below are all optional: the applier
/// always records entries into the cache, and additionally mutates
/// whichever handles have been attached via the `with_*` builders.
#[derive(Clone)]
pub struct CacheApplier {
    /// Shared in-memory record of applied metadata entries; always present.
    cache: Arc<RwLock<MetadataCache>>,
    /// Optional live topology handle. When set, committed
    /// `TopologyChange` entries mutate this handle in place so the
    /// rest of the process sees the new state immediately — decommission
    /// state transitions, joiner promotion, and `Leave` removal all
    /// flow through here.
    live_topology: Option<Arc<RwLock<ClusterTopology>>>,
    /// Optional live routing table handle. When set, committed
    /// `RoutingChange` entries (leadership transfer, member removal,
    /// vshard reassignment) mutate this handle in place.
    live_routing: Option<Arc<RwLock<RoutingTable>>>,
    /// Optional migration state table handle. When set, committed
    /// `MigrationCheckpoint` and `MigrationAbort` entries mutate the
    /// table in place. Missing handle is NOT an error — tests and
    /// subsystems that don't manage migrations omit it.
    migration_state: Option<SharedMigrationStateTable>,
    /// Optional token state mirror. When set, committed
    /// `JoinTokenTransition` entries mutate the mirror so that
    /// `RaftBackedTokenStore` reads see the post-apply state immediately
    /// after `propose_and_wait` returns. Missing handle is NOT an error
    /// — tests and subsystems that don't manage join tokens omit it.
    token_state: Option<SharedTokenStateMirror>,
}
66
67impl CacheApplier {
68    pub fn new(cache: Arc<RwLock<MetadataCache>>) -> Self {
69        Self {
70            cache,
71            live_topology: None,
72            live_routing: None,
73            migration_state: None,
74            token_state: None,
75        }
76    }
77
78    /// Extend this applier with live topology/routing handles. When
79    /// set, committed `TopologyChange` and `RoutingChange` entries
80    /// mutate the handles in place in addition to the in-memory
81    /// history log kept in `MetadataCache`. Backward-compatible:
82    /// existing callers that don't attach handles see no behaviour
83    /// change.
84    pub fn with_live_state(
85        mut self,
86        topology: Arc<RwLock<ClusterTopology>>,
87        routing: Arc<RwLock<RoutingTable>>,
88    ) -> Self {
89        self.live_topology = Some(topology);
90        self.live_routing = Some(routing);
91        self
92    }
93
94    /// Attach a migration state table so that committed
95    /// `MigrationCheckpoint` and `MigrationAbort` entries are
96    /// durably persisted. Backward-compatible: existing callers that
97    /// don't manage migrations can omit this.
98    pub fn with_migration_state(mut self, migration_state: SharedMigrationStateTable) -> Self {
99        self.migration_state = Some(migration_state);
100        self
101    }
102
103    /// Attach a token state mirror so that committed
104    /// `JoinTokenTransition` entries are reflected into the shared
105    /// mirror immediately after apply. The same `Arc` must be passed to
106    /// `RaftBackedTokenStore::new` so both sides share the same table.
107    /// Backward-compatible: existing callers that don't use join tokens
108    /// omit this.
109    pub fn with_token_state(mut self, token_state: SharedTokenStateMirror) -> Self {
110        self.token_state = Some(token_state);
111        self
112    }
113
114    pub fn cache(&self) -> Arc<RwLock<MetadataCache>> {
115        self.cache.clone()
116    }
117
118    /// Mutate the live topology handle (if attached) in response to
119    /// a committed `TopologyChange`. Optional; no-op when not configured.
120    fn apply_topology_change(&self, change: &TopologyChange) {
121        let Some(live) = &self.live_topology else {
122            return;
123        };
124        let mut topo = live.write().unwrap_or_else(|p| p.into_inner());
125        match change {
126            TopologyChange::Join { node_id, addr } => {
127                if topo.contains(*node_id) {
128                    return;
129                }
130                let parsed: SocketAddr = addr.parse().unwrap_or_else(|_| {
131                    warn!(node_id, addr, "join: invalid address, using placeholder");
132                    SocketAddr::from(([0, 0, 0, 0], 0))
133                });
134                topo.join_as_learner(NodeInfo::new(*node_id, parsed, NodeState::Joining));
135            }
136            TopologyChange::PromoteToVoter { node_id } => {
137                topo.promote_to_voter(*node_id);
138            }
139            TopologyChange::StartDecommission { node_id } => {
140                topo.set_state(*node_id, NodeState::Draining);
141            }
142            TopologyChange::FinishDecommission { node_id } => {
143                topo.set_state(*node_id, NodeState::Decommissioned);
144            }
145            TopologyChange::Leave { node_id } => {
146                topo.remove_node(*node_id);
147            }
148        }
149    }
150
151    /// Cascade live-state mutations for a committed entry. Handles
152    /// `Batch` by recursing into each sub-entry.
153    fn cascade_live_state(&self, entry: &MetadataEntry) {
154        match entry {
155            MetadataEntry::TopologyChange(change) => self.apply_topology_change(change),
156            MetadataEntry::RoutingChange(change) => self.apply_routing_change(change),
157            MetadataEntry::Batch { entries } => {
158                for sub in entries {
159                    self.cascade_live_state(sub);
160                }
161            }
162            MetadataEntry::MigrationCheckpoint {
163                migration_id,
164                phase,
165                attempt,
166                payload,
167                crc32c,
168                ts_ms,
169            } => {
170                if let Some(table) = &self.migration_state {
171                    let parsed_id = migration_id
172                        .parse::<uuid::Uuid>()
173                        .unwrap_or_else(|_| uuid::Uuid::nil());
174                    if let Err(e) = apply_migration_checkpoint(
175                        table,
176                        parsed_id,
177                        *phase,
178                        *attempt,
179                        payload.clone(),
180                        *crc32c,
181                        *ts_ms,
182                    ) {
183                        // CRC32C mismatch is fatal — corruption must not be silenced.
184                        error!(
185                            migration_id = %migration_id,
186                            error = %e,
187                            "FATAL: migration checkpoint CRC32C mismatch — possible corruption"
188                        );
189                        panic!("migration checkpoint CRC32C mismatch: {e}");
190                    }
191                }
192            }
193            MetadataEntry::MigrationAbort {
194                migration_id,
195                reason,
196                compensations,
197            } => {
198                if let Some(table) = &self.migration_state {
199                    let parsed_id = migration_id
200                        .parse::<uuid::Uuid>()
201                        .unwrap_or_else(|_| uuid::Uuid::nil());
202                    if let Err(e) = apply_migration_abort(
203                        table,
204                        self.live_routing.as_ref(),
205                        parsed_id,
206                        reason,
207                        compensations,
208                    ) {
209                        error!(
210                            migration_id = %migration_id,
211                            error = %e,
212                            "FATAL: migration abort compensation failed"
213                        );
214                        panic!("migration abort compensation failed: {e}");
215                    }
216                }
217            }
218            MetadataEntry::JoinTokenTransition {
219                token_hash,
220                transition,
221                ts_ms,
222            } => {
223                if let Some(mirror) = &self.token_state {
224                    apply_token_transition_to_mirror(mirror, *token_hash, transition, *ts_ms);
225                }
226            }
227            _ => {}
228        }
229    }
230
231    /// Mutate the live routing handle (if attached) in response to
232    /// a committed `RoutingChange`.
233    fn apply_routing_change(&self, change: &RoutingChange) {
234        let Some(live) = &self.live_routing else {
235            return;
236        };
237        let mut rt = live.write().unwrap_or_else(|p| p.into_inner());
238        match change {
239            RoutingChange::ReassignVShard {
240                vshard_id,
241                new_group_id,
242                new_leaseholder_node_id,
243            } => {
244                rt.reassign_vshard(*vshard_id, *new_group_id);
245                rt.set_leader(*new_group_id, *new_leaseholder_node_id);
246            }
247            RoutingChange::LeadershipTransfer {
248                group_id,
249                new_leader_node_id,
250            } => {
251                rt.set_leader(*group_id, *new_leader_node_id);
252            }
253            RoutingChange::RemoveMember { group_id, node_id } => {
254                rt.remove_group_member(*group_id, *node_id);
255            }
256        }
257    }
258}
259
260impl MetadataApplier for CacheApplier {
261    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
262        let mut last = 0u64;
263        let mut guard = self
264            .cache
265            .write()
266            .unwrap_or_else(|poison| poison.into_inner());
267        for (index, data) in entries {
268            last = *index;
269            if data.is_empty() {
270                continue;
271            }
272            match decode_entry(data) {
273                Ok(entry) => {
274                    guard.apply(*index, &entry);
275                    self.cascade_live_state(&entry);
276                }
277                Err(e) => warn!(index = *index, error = %e, "metadata decode failed"),
278            }
279        }
280        last
281    }
282}
283
284/// No-op applier used by tests and subsystems that don't care about the
285/// metadata stream. Still drains entries and returns the correct last
286/// index so raft can advance its applied watermark.
287pub struct NoopMetadataApplier;
288
289impl MetadataApplier for NoopMetadataApplier {
290    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
291        entries.last().map(|(idx, _)| *idx).unwrap_or(0)
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata_group::codec::encode_entry;
    use crate::metadata_group::entry::{MetadataEntry, TopologyChange};

    // A CatalogDdl entry bumps the catalog counter while a TopologyChange
    // lands in the topology log; both advance the applied index.
    #[test]
    fn cache_applier_counts_catalog_ddl() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());

        let ddl = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![1, 2, 3],
        })
        .unwrap();
        let topo = encode_entry(&MetadataEntry::TopologyChange(TopologyChange::Join {
            node_id: 7,
            addr: "10.0.0.7:9000".into(),
        }))
        .unwrap();

        let last = applier.apply(&[(1, ddl), (2, topo)]);
        assert_eq!(last, 2);

        let guard = cache.read().unwrap();
        assert_eq!(guard.applied_index, 2);
        assert_eq!(guard.catalog_entries_applied, 1);
        assert_eq!(guard.topology_log.len(), 1);
    }

    // Replaying an entry at an older index must not rewind the cache
    // or double-count the DDL.
    #[test]
    fn cache_applier_idempotent() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());

        let bytes = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![9, 9],
        })
        .unwrap();
        applier.apply(&[(5, bytes.clone())]);
        applier.apply(&[(3, bytes)]); // Earlier index — ignored.

        let guard = cache.read().unwrap();
        assert_eq!(guard.applied_index, 5);
        assert_eq!(guard.catalog_entries_applied, 1);
    }

    // With live handles attached, a committed StartDecommission flips the
    // node's state in the shared topology, not just the history log.
    #[test]
    fn cache_applier_mutates_live_topology_on_start_decommission() {
        use crate::topology::{ClusterTopology, NodeInfo, NodeState};
        use std::net::SocketAddr;

        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let mut t = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        t.add_node(NodeInfo::new(7, addr, NodeState::Active));
        let topology = Arc::new(RwLock::new(t));
        let routing = Arc::new(RwLock::new(crate::routing::RoutingTable::uniform(
            1,
            &[7],
            1,
        )));
        let applier =
            CacheApplier::new(cache.clone()).with_live_state(topology.clone(), routing.clone());

        let bytes = encode_entry(&MetadataEntry::TopologyChange(
            TopologyChange::StartDecommission { node_id: 7 },
        ))
        .unwrap();
        applier.apply(&[(1, bytes)]);

        let topo = topology.read().unwrap();
        assert_eq!(topo.get_node(7).unwrap().state, NodeState::Draining);
    }

    // A committed RemoveMember drops the node from the live routing
    // table's group membership.
    #[test]
    fn cache_applier_mutates_live_routing_on_remove_member() {
        use crate::metadata_group::entry::RoutingChange;

        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let topology = Arc::new(RwLock::new(crate::topology::ClusterTopology::new()));
        let routing = Arc::new(RwLock::new(crate::routing::RoutingTable::uniform(
            1,
            &[1, 2, 3],
            3,
        )));
        let applier =
            CacheApplier::new(cache.clone()).with_live_state(topology.clone(), routing.clone());

        let bytes = encode_entry(&MetadataEntry::RoutingChange(RoutingChange::RemoveMember {
            group_id: 0,
            node_id: 2,
        }))
        .unwrap();
        applier.apply(&[(1, bytes)]);

        let rt = routing.read().unwrap();
        assert!(!rt.group_info(0).unwrap().members.contains(&2));
    }

    // Without live handles the applier is log-only: topology entries are
    // recorded but no live mutation is attempted.
    #[test]
    fn cache_applier_without_live_state_stays_log_only() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());
        let bytes = encode_entry(&MetadataEntry::TopologyChange(
            TopologyChange::StartDecommission { node_id: 5 },
        ))
        .unwrap();
        // Must not panic and must still advance the applied index.
        let last = applier.apply(&[(1, bytes)]);
        assert_eq!(last, 1);
    }

    // The no-op applier ignores payloads entirely but still reports the
    // highest index (0 for an empty batch).
    #[test]
    fn noop_applier_advances_watermark() {
        let noop = NoopMetadataApplier;
        assert_eq!(noop.apply(&[(7, b"x".to_vec()), (9, b"y".to_vec())]), 9);
        assert_eq!(noop.apply(&[]), 0);
    }
}