
nodedb_cluster/decommission/flow.rs

// SPDX-License-Identifier: BUSL-1.1

//! Decommission flow — emit the full ordered sequence of metadata
//! entries that move a node from `Active` to fully removed.
//!
//! [`plan_full_decommission`] is pure: given a snapshot of topology
//! and routing, it returns the exact list of
//! [`MetadataEntry`](crate::metadata_group::MetadataEntry) values the
//! coordinator will propose through the metadata Raft group, in the
//! order they must commit. The flow is deterministic — two nodes
//! looking at the same snapshot produce byte-identical plans, which
//! means a failed coordinator can be resumed from any consistent
//! snapshot without needing per-plan state to be replicated.

use crate::error::Result;
use crate::metadata_group::{MetadataEntry, RoutingChange, TopologyChange};
use crate::routing::RoutingTable;
use crate::topology::ClusterTopology;

use super::safety::check_can_decommission;

/// Output of [`plan_full_decommission`] — the caller proposes
/// `entries` in order, waiting for each to commit before moving on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecommissionPlan {
    pub node_id: u64,
    pub entries: Vec<MetadataEntry>,
}

/// Build the complete decommission plan for `node_id`.
///
/// Steps (in the order they appear in the returned `entries`):
///
/// 1. `TopologyChange::StartDecommission` — flip the target to
///    `Draining`.
/// 2. `RoutingChange::LeadershipTransfer` — for every group the
///    target currently leads, hand leadership to another voter.
/// 3. `RoutingChange::RemoveMember` — strip the target out of every
///    group's member (and learner) list.
/// 4. `TopologyChange::FinishDecommission` — flip the target to
///    `Decommissioned`.
/// 5. `TopologyChange::Leave` — remove the target from topology
///    entirely so future peer lookups return `NodeNotFound`.
///
/// The safety gate in [`check_can_decommission`] runs first and
/// returns an error without producing a plan if any group would drop
/// below the configured replication factor.
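///
/// # Example
///
/// A minimal usage sketch, mirroring the test setup at the bottom of
/// this file; `propose_and_wait` is illustrative only, standing in for
/// whatever proposal path the coordinator actually uses.
///
/// ```ignore
/// let mut topology = ClusterTopology::new();
/// for (i, id) in [1u64, 2, 3].iter().enumerate() {
///     let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
///     topology.add_node(NodeInfo::new(*id, addr, NodeState::Active));
/// }
/// // Metadata group 0 plus two data groups, each replicated on all three nodes.
/// let routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
///
/// // Plan node 1's removal; every group must keep at least 2 replicas.
/// let plan = plan_full_decommission(1, &topology, &routing, 2)?;
/// for entry in &plan.entries {
///     // Each entry must commit before the next one is proposed.
///     propose_and_wait(entry)?;
/// }
/// ```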
pub fn plan_full_decommission(
    node_id: u64,
    topology: &ClusterTopology,
    routing: &RoutingTable,
    replication_factor: usize,
) -> Result<DecommissionPlan> {
    check_can_decommission(node_id, topology, routing, replication_factor)?;

    let mut entries = Vec::new();
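    // 1. Start decommission (topology state → Draining).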
    entries.push(MetadataEntry::TopologyChange(
        TopologyChange::StartDecommission { node_id },
    ));

    // Collect a stable, sorted group_id ordering so the plan is
    // reproducible across HashMap iterations.
    let mut group_ids: Vec<u64> = routing
        .group_members()
        .iter()
        .filter(|(_, info)| info.members.contains(&node_id) || info.learners.contains(&node_id))
        .map(|(gid, _)| *gid)
        .collect();
    group_ids.sort_unstable();

    // 2. Leadership transfers for every group the target currently leads.
    for gid in &group_ids {
        let info = routing
            .group_info(*gid)
            .expect("group id came from routing snapshot");
        if info.leader != node_id {
            continue;
        }
        if let Some(&new_leader) = info.members.iter().find(|&&m| m != node_id) {
            entries.push(MetadataEntry::RoutingChange(
                RoutingChange::LeadershipTransfer {
                    group_id: *gid,
                    new_leader_node_id: new_leader,
                },
            ));
        }
    }

    // 3. Remove the target from every group's member and learner sets.
    for gid in &group_ids {
        entries.push(MetadataEntry::RoutingChange(RoutingChange::RemoveMember {
            group_id: *gid,
            node_id,
        }));
    }

    // 4. Finish decommission (topology state → Decommissioned).
    entries.push(MetadataEntry::TopologyChange(
        TopologyChange::FinishDecommission { node_id },
    ));

    // 5. Leave — remove from topology entirely.
    entries.push(MetadataEntry::TopologyChange(TopologyChange::Leave {
        node_id,
    }));

    Ok(DecommissionPlan { node_id, entries })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};
    use std::net::SocketAddr;

    fn topo(nodes: &[u64]) -> ClusterTopology {
        let mut t = ClusterTopology::new();
        for (i, id) in nodes.iter().enumerate() {
            let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
            t.add_node(NodeInfo::new(*id, addr, NodeState::Active));
        }
        t
    }

    #[test]
    fn plan_shape_matches_spec() {
        let t = topo(&[1, 2, 3]);
        // uniform(2, ...) builds metadata group 0 plus data groups 1 and 2,
        // each replicated on all 3 nodes. Decommission node 1 with a
        // required replication factor of 2 (the surviving quorum).
        let routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        let plan = plan_full_decommission(1, &t, &routing, 2).unwrap();
        assert_eq!(plan.node_id, 1);

        // First entry: StartDecommission.
        assert!(matches!(
            plan.entries.first(),
            Some(MetadataEntry::TopologyChange(
                TopologyChange::StartDecommission { node_id: 1 }
            ))
        ));

        // Last two entries: FinishDecommission, Leave.
        let n = plan.entries.len();
        assert!(matches!(
            plan.entries[n - 2],
            MetadataEntry::TopologyChange(TopologyChange::FinishDecommission { node_id: 1 })
        ));
        assert!(matches!(
            plan.entries[n - 1],
            MetadataEntry::TopologyChange(TopologyChange::Leave { node_id: 1 })
        ));

        // Every group the target is in must get a RemoveMember.
        // uniform(2, ...) creates 3 groups (metadata group 0 + data groups 1 and 2),
        // and node 1 is a member of all three.
        let remove_count = plan
            .entries
            .iter()
            .filter(|e| {
                matches!(
                    e,
                    MetadataEntry::RoutingChange(RoutingChange::RemoveMember { node_id: 1, .. })
                )
            })
            .count();
        assert_eq!(remove_count, 3);
    }

    #[test]
    fn plan_emits_leadership_transfer_when_target_leads() {
        let t = topo(&[1, 2, 3]);
        // Data groups are 1..=2 (group 0 is metadata). Move metadata leader
        // off the decommission target so we get exactly one data-group transfer.
        let mut routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        routing.set_leader(0, 2);
        routing.set_leader(1, 1);
        routing.set_leader(2, 2);
        let plan = plan_full_decommission(1, &t, &routing, 2).unwrap();
        let transfers: Vec<_> = plan
            .entries
            .iter()
            .filter_map(|e| match e {
                MetadataEntry::RoutingChange(RoutingChange::LeadershipTransfer {
                    group_id,
                    new_leader_node_id,
                }) => Some((*group_id, *new_leader_node_id)),
                _ => None,
            })
            .collect();
        assert_eq!(transfers.len(), 1);
        assert_eq!(transfers[0].0, 1);
        assert_ne!(transfers[0].1, 1, "new leader must not be the target");
    }

    #[test]
    fn plan_is_deterministic() {
        let t = topo(&[1, 2, 3]);
        let routing = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let p1 = plan_full_decommission(2, &t, &routing, 2).unwrap();
        let p2 = plan_full_decommission(2, &t, &routing, 2).unwrap();
        assert_eq!(p1.entries, p2.entries);
    }

    #[test]
    fn plan_rejected_when_safety_fails() {
        let t = topo(&[1, 2]);
        let routing = RoutingTable::uniform(2, &[1, 2], 2);
        let err = plan_full_decommission(1, &t, &routing, 2).unwrap_err();
        assert!(err.to_string().contains("replication factor"));
    }

    #[test]
    fn plan_skips_groups_target_is_not_in() {
        let t = topo(&[1, 2, 3]);
        // uniform(4, ...) creates metadata group 0 + data groups 1..=4.
        let mut routing = RoutingTable::uniform(4, &[1, 2, 3], 3);
        // Remove node 1 from groups 0, 1, 4 so only groups 2 and 3 have node 1.
        routing.set_group_members(0, vec![2, 3]);
        routing.set_group_members(1, vec![2, 3]);
        routing.set_group_members(2, vec![1, 2, 3]);
        routing.set_group_members(3, vec![1, 2, 3]);
        routing.set_group_members(4, vec![2, 3]);
        let plan = plan_full_decommission(1, &t, &routing, 2).unwrap();
        let removes: Vec<u64> = plan
            .entries
            .iter()
            .filter_map(|e| match e {
                MetadataEntry::RoutingChange(RoutingChange::RemoveMember { group_id, .. }) => {
                    Some(*group_id)
                }
                _ => None,
            })
            .collect();
        assert_eq!(removes, vec![2, 3]);
    }
}