nodedb_cluster/cluster_info.rs

// SPDX-License-Identifier: BUSL-1.1

//! Observability view of the cluster — snapshot types, trait for
//! querying per-group Raft status, and the `ClusterObserver` handle
//! bundled into `SharedState` for HTTP / metrics readers.
//!
//! The split between `ClusterObserver` (operational aggregation) and
//! the individual sources it pulls from (`ClusterTopology`,
//! `RoutingTable`, `ClusterLifecycleTracker`, `GroupStatusProvider`)
//! keeps every consumer parameter-free: an HTTP handler takes a
//! single `Arc<ClusterObserver>` and gets a complete, serialisable
//! snapshot of everything the cluster surface exposes.
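//!
//! A minimal consumer sketch (the handler name and the use of
//! `serde_json` here are illustrative, not part of this module):
//!
//! ```ignore
//! fn render_cluster_status(observer: &ClusterObserver) -> String {
//!     // One handle in, one serialisable snapshot out.
//!     serde_json::to_string(&observer.snapshot()).expect("snapshot serialises")
//! }
//! ```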

use std::sync::{Arc, RwLock};

use serde::{Deserialize, Serialize};

use crate::forward::PlanExecutor;
use crate::lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
use crate::multi_raft::GroupStatus;
use crate::raft_loop::{CommitApplier, RaftLoop};
use crate::routing::RoutingTable;
use crate::topology::ClusterTopology;

/// Read-only accessor for per-group Raft status.
///
/// Implemented for every `RaftLoop` via a blanket impl so the main
/// binary can coerce `Arc<RaftLoop<...>>` to `Arc<dyn
/// GroupStatusProvider + Send + Sync>` without thinking about the
/// `CommitApplier` / `PlanExecutor` type parameters.
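///
/// A coercion sketch (`MyApplier`, `MyExecutor`, and `build_raft_loop`
/// are hypothetical stand-ins):
///
/// ```ignore
/// let raft_loop: Arc<RaftLoop<MyApplier, MyExecutor>> = build_raft_loop();
/// // Unsized coercion: the blanket impl below covers every `RaftLoop`.
/// let provider: Arc<dyn GroupStatusProvider + Send + Sync> = raft_loop;
/// ```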
pub trait GroupStatusProvider: Send + Sync {
    /// Current status of every Raft group hosted on this node.
    fn group_statuses(&self) -> Vec<GroupStatus>;
}

impl<A, P> GroupStatusProvider for RaftLoop<A, P>
where
    A: CommitApplier,
    P: PlanExecutor,
{
    fn group_statuses(&self) -> Vec<GroupStatus> {
        RaftLoop::group_statuses(self)
    }
}

/// Aggregated observability handle for the cluster.
///
/// Stored in `SharedState::cluster_observer` as an
/// `Arc<ClusterObserver>` so HTTP route handlers and the metrics
/// endpoint can build snapshots without threading four separate
/// handles through every call.
///
/// Construction is done exactly once — after `start_cluster` has
/// returned and `start_raft` has built the `RaftLoop`. See
/// `nodedb::control::cluster::start_raft` for the wiring.
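///
/// A construction sketch (handle names are illustrative; the real
/// wiring lives in `start_raft`):
///
/// ```ignore
/// let observer = Arc::new(ClusterObserver::new(
///     node_id,
///     lifecycle_tracker,
///     topology.clone(),
///     routing.clone(),
///     // Coerces to `Arc<dyn GroupStatusProvider + Send + Sync>` via
///     // the blanket impl above.
///     raft_loop.clone(),
/// ));
/// ```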
pub struct ClusterObserver {
    /// This node's id.
    pub node_id: u64,
    /// Lifecycle phase tracker (shared with `start_cluster`).
    pub lifecycle: ClusterLifecycleTracker,
    /// Shared cluster topology.
    pub topology: Arc<RwLock<ClusterTopology>>,
    /// Shared routing table.
    pub routing: Arc<RwLock<RoutingTable>>,
    /// Type-erased per-group status provider backed by `RaftLoop`.
    pub group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
}

impl ClusterObserver {
    pub fn new(
        node_id: u64,
        lifecycle: ClusterLifecycleTracker,
        topology: Arc<RwLock<ClusterTopology>>,
        routing: Arc<RwLock<RoutingTable>>,
        group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
    ) -> Self {
        Self {
            node_id,
            lifecycle,
            topology,
            routing,
            group_status,
        }
    }

    /// Build a complete `ClusterInfoSnapshot` for rendering via
    /// `/cluster/status` or `/metrics`.
    pub fn snapshot(&self) -> ClusterInfoSnapshot {
        let lifecycle = self.lifecycle.current();
        let peers: Vec<PeerSnapshot> = {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            topo.all_nodes()
                .map(|n| PeerSnapshot {
                    node_id: n.node_id,
                    addr: n.addr.clone(),
                    state: format!("{:?}", n.state),
                })
                .collect()
        };

        // Merge Raft group status (from the live RaftLoop) with the
        // routing-table members/learners view. The routing table is
        // authoritative for "who is supposed to be in this group";
        // the RaftLoop is authoritative for "who is leader / what's
        // the commit index right now".
        let routing_snapshot: Vec<(u64, Vec<u64>, Vec<u64>)> = {
            let rt = self.routing.read().unwrap_or_else(|p| p.into_inner());
            rt.group_members()
                .iter()
                .map(|(&gid, info)| (gid, info.members.clone(), info.learners.clone()))
                .collect()
        };

        let raft_groups = self.group_status.group_statuses();
        let groups: Vec<GroupSnapshot> = raft_groups
            .into_iter()
            .map(|gs| {
                let (members, learners) = routing_snapshot
                    .iter()
                    .find(|(gid, _, _)| *gid == gs.group_id)
                    .map(|(_, m, l)| (m.clone(), l.clone()))
                    .unwrap_or_default();
                GroupSnapshot {
                    group_id: gs.group_id,
                    role: gs.role,
                    leader_id: gs.leader_id,
                    term: gs.term,
                    commit_index: gs.commit_index,
                    last_applied: gs.last_applied,
                    member_count: gs.member_count,
                    vshard_count: gs.vshard_count,
                    members,
                    learners,
                }
            })
            .collect();

        ClusterInfoSnapshot {
            node_id: self.node_id,
            lifecycle,
            peers,
            groups,
        }
    }
}

/// Serialisable snapshot of the full cluster observability surface.
///
/// This is the JSON shape returned by `GET /cluster/status` and the
/// source-of-truth for the Prometheus gauges.
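///
/// An illustrative payload (values are made up; the `lifecycle` field
/// is shown as a placeholder because its encoding follows whatever
/// serde representation `ClusterLifecycleState` derives):
///
/// ```json
/// {
///   "node_id": 1,
///   "lifecycle": "<ClusterLifecycleState serde form>",
///   "peers": [
///     { "node_id": 2, "addr": "10.0.0.2:9400", "state": "Active" }
///   ],
///   "groups": [
///     {
///       "group_id": 0, "role": "Leader", "leader_id": 1, "term": 1,
///       "commit_index": 5, "last_applied": 5, "member_count": 3,
///       "vshard_count": 512, "members": [1, 2, 3], "learners": [4]
///     }
///   ]
/// }
/// ```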
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterInfoSnapshot {
    /// This node's id.
    pub node_id: u64,
    /// Current lifecycle phase.
    pub lifecycle: ClusterLifecycleState,
    /// Every peer known to this node.
    pub peers: Vec<PeerSnapshot>,
    /// Every Raft group hosted on this node.
    pub groups: Vec<GroupSnapshot>,
}

impl ClusterInfoSnapshot {
    /// Convenience accessor used by the metrics endpoint.
    pub fn lifecycle_label(&self) -> &'static str {
        self.lifecycle.label()
    }

    /// Number of peers in the topology snapshot. Drives the
    /// `nodedb_cluster_members` Prometheus gauge.
    pub fn members_count(&self) -> usize {
        self.peers.len()
    }

    /// Number of Raft groups hosted locally.
    pub fn groups_count(&self) -> usize {
        self.groups.len()
    }
}
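// A hedged metrics-endpoint sketch; `nodedb_cluster_members` is the
// gauge named above, while `set_gauge` is a hypothetical helper:
//
//     let snap = observer.snapshot();
//     set_gauge("nodedb_cluster_members", snap.members_count() as f64);
//     // `lifecycle_label()` and `groups_count()` feed further gauges
//     // the same way.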

/// One peer entry rendered in `/cluster/status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerSnapshot {
    pub node_id: u64,
    pub addr: String,
    pub state: String,
}

/// One Raft group entry rendered in `/cluster/status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupSnapshot {
    pub group_id: u64,
    pub role: String,
    pub leader_id: u64,
    pub term: u64,
    pub commit_index: u64,
    pub last_applied: u64,
    pub member_count: usize,
    pub vshard_count: usize,
    pub members: Vec<u64>,
    pub learners: Vec<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};

    /// Fake provider that returns a canned list — lets us test the
    /// snapshot builder end-to-end without spinning up a real
    /// `MultiRaft`.
    struct FakeProvider(Vec<GroupStatus>);
    impl GroupStatusProvider for FakeProvider {
        fn group_statuses(&self) -> Vec<GroupStatus> {
            self.0.clone()
        }
    }

    fn make_observer(
        lifecycle: ClusterLifecycleTracker,
        peers: Vec<NodeInfo>,
        routing: RoutingTable,
        raft_groups: Vec<GroupStatus>,
    ) -> ClusterObserver {
        let mut topology = ClusterTopology::new();
        for p in peers {
            topology.add_node(p);
        }
        ClusterObserver::new(
            1,
            lifecycle,
            Arc::new(RwLock::new(topology)),
            Arc::new(RwLock::new(routing)),
            Arc::new(FakeProvider(raft_groups)),
        )
    }

    fn gs(group_id: u64, role: &str, leader: u64) -> GroupStatus {
        GroupStatus {
            group_id,
            role: role.into(),
            leader_id: leader,
            term: 1,
            commit_index: 5,
            last_applied: 5,
            member_count: 3,
            vshard_count: 512,
        }
    }

    #[test]
    fn snapshot_renders_full_state() {
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_ready(3);

        let peers = vec![
            NodeInfo::new(1, "10.0.0.1:9400".parse().unwrap(), NodeState::Active),
            NodeInfo::new(2, "10.0.0.2:9400".parse().unwrap(), NodeState::Active),
            NodeInfo::new(3, "10.0.0.3:9400".parse().unwrap(), NodeState::Active),
        ];

        let mut routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Inject a learner to verify it lands in the snapshot.
        routing.add_group_learner(0, 4);

        let raft_groups = vec![gs(0, "Leader", 1), gs(1, "Follower", 2)];

        let observer = make_observer(lifecycle, peers, routing, raft_groups);
        let snap = observer.snapshot();

        assert_eq!(snap.node_id, 1);
        assert_eq!(snap.lifecycle_label(), "ready");
        assert_eq!(snap.members_count(), 3);
        assert_eq!(snap.groups_count(), 2);

        // Group 0 should carry both members (1,2,3) and the injected
        // learner (4).
        let g0 = snap
            .groups
            .iter()
            .find(|g| g.group_id == 0)
            .expect("group 0 present");
        assert_eq!(g0.role, "Leader");
        assert_eq!(g0.leader_id, 1);
        assert!(g0.members.contains(&1));
        assert!(g0.learners.contains(&4));

        // Peer snapshots preserve addresses.
        let addrs: Vec<&str> = snap.peers.iter().map(|p| p.addr.as_str()).collect();
        assert!(addrs.contains(&"10.0.0.1:9400"));
        assert!(addrs.contains(&"10.0.0.3:9400"));
    }

    #[test]
    fn snapshot_without_groups() {
        // A node whose RaftLoop reports zero groups (not in cluster
        // mode, but we got a stub observer). Topology still rendered.
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_bootstrapping();
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(lifecycle, peers, routing, vec![]);

        let snap = observer.snapshot();
        assert_eq!(snap.lifecycle_label(), "bootstrapping");
        assert_eq!(snap.members_count(), 1);
        assert_eq!(snap.groups_count(), 0);
    }

    #[test]
    fn snapshot_is_json_roundtrippable() {
        // The shape is consumed by clients, so serde must round-trip
        // losslessly for every variant. This keeps downstream tooling
        // honest as fields are added.
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_joining(2);
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(lifecycle, peers, routing, vec![gs(0, "Leader", 1)]);
        let snap = observer.snapshot();

        let json = serde_json::to_string(&snap).expect("serialize");
        let back: ClusterInfoSnapshot = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.node_id, snap.node_id);
        assert_eq!(back.peers.len(), 1);
        assert_eq!(back.groups.len(), 1);
        // Joining preserves the attempt field through serde.
        match back.lifecycle {
            ClusterLifecycleState::Joining { attempt } => assert_eq!(attempt, 2),
            other => panic!("expected Joining, got {other:?}"),
        }
    }
}