// nodedb_cluster/cluster_info.rs
//! Observability view of the cluster — snapshot types, trait for
//! querying per-group Raft status, and the `ClusterObserver` handle
//! bundled into `SharedState` for HTTP / metrics readers.
//!
//! The split between `ClusterObserver` (operational aggregation) and
//! the individual sources it pulls from (`ClusterTopology`,
//! `RoutingTable`, `ClusterLifecycleTracker`, `GroupStatusProvider`)
//! keeps every consumer parameter-free: an HTTP handler takes a
//! single `Arc<ClusterObserver>` and gets a complete, serialisable
//! snapshot of everything the cluster surface exposes.

use std::sync::{Arc, RwLock};

use serde::{Deserialize, Serialize};

use crate::forward::RequestForwarder;
use crate::lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
use crate::multi_raft::GroupStatus;
use crate::raft_loop::{CommitApplier, RaftLoop};
use crate::routing::RoutingTable;
use crate::topology::ClusterTopology;

23/// Read-only accessor for per-group Raft status.
24///
25/// Implemented for every `RaftLoop` via a blanket impl so the main
26/// binary can coerce `Arc<RaftLoop<...>>` to `Arc<dyn
27/// GroupStatusProvider + Send + Sync>` without thinking about the
28/// `CommitApplier` / `RequestForwarder` type parameters.
29pub trait GroupStatusProvider: Send + Sync {
30    /// Current status of every Raft group hosted on this node.
31    fn group_statuses(&self) -> Vec<GroupStatus>;
32}
33
34impl<A, F> GroupStatusProvider for RaftLoop<A, F>
35where
36    A: CommitApplier,
37    F: RequestForwarder,
38{
39    fn group_statuses(&self) -> Vec<GroupStatus> {
40        RaftLoop::group_statuses(self)
41    }
42}
43
44/// Aggregated observability handle for the cluster.
45///
46/// Stored in `SharedState::cluster_observer` as an
47/// `Arc<ClusterObserver>` so HTTP route handlers and the metrics
48/// endpoint can build snapshots without threading four separate
49/// handles through every call.
50///
51/// Construction is done exactly once — after `start_cluster` has
52/// returned and `start_raft` has built the `RaftLoop`. See
53/// `nodedb::control::cluster::start_raft` for the wiring.
54pub struct ClusterObserver {
55    /// This node's id.
56    pub node_id: u64,
57    /// Lifecycle phase tracker (shared with `start_cluster`).
58    pub lifecycle: ClusterLifecycleTracker,
59    /// Shared cluster topology.
60    pub topology: Arc<RwLock<ClusterTopology>>,
61    /// Shared routing table.
62    pub routing: Arc<RwLock<RoutingTable>>,
63    /// Type-erased per-group status provider backed by `RaftLoop`.
64    pub group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
65}
66
67impl ClusterObserver {
68    pub fn new(
69        node_id: u64,
70        lifecycle: ClusterLifecycleTracker,
71        topology: Arc<RwLock<ClusterTopology>>,
72        routing: Arc<RwLock<RoutingTable>>,
73        group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
74    ) -> Self {
75        Self {
76            node_id,
77            lifecycle,
78            topology,
79            routing,
80            group_status,
81        }
82    }
83
84    /// Build a complete `ClusterInfoSnapshot` for rendering via
85    /// `/cluster/status` or `/metrics`.
86    pub fn snapshot(&self) -> ClusterInfoSnapshot {
87        let lifecycle = self.lifecycle.current();
88        let peers: Vec<PeerSnapshot> = {
89            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
90            topo.all_nodes()
91                .map(|n| PeerSnapshot {
92                    node_id: n.node_id,
93                    addr: n.addr.clone(),
94                    state: format!("{:?}", n.state),
95                })
96                .collect()
97        };
98
99        // Merge Raft group status (from the live RaftLoop) with the
100        // routing-table members/learners view. The routing table is
101        // authoritative for "who is supposed to be in this group";
102        // the RaftLoop is authoritative for "who is leader / what's
103        // the commit index right now".
104        let routing_snapshot: Vec<(u64, Vec<u64>, Vec<u64>)> = {
105            let rt = self.routing.read().unwrap_or_else(|p| p.into_inner());
106            rt.group_members()
107                .iter()
108                .map(|(&gid, info)| (gid, info.members.clone(), info.learners.clone()))
109                .collect()
110        };
111
112        let raft_groups = self.group_status.group_statuses();
113        let groups: Vec<GroupSnapshot> = raft_groups
114            .into_iter()
115            .map(|gs| {
116                let (members, learners) = routing_snapshot
117                    .iter()
118                    .find(|(gid, _, _)| *gid == gs.group_id)
119                    .map(|(_, m, l)| (m.clone(), l.clone()))
120                    .unwrap_or_default();
121                GroupSnapshot {
122                    group_id: gs.group_id,
123                    role: gs.role,
124                    leader_id: gs.leader_id,
125                    term: gs.term,
126                    commit_index: gs.commit_index,
127                    last_applied: gs.last_applied,
128                    member_count: gs.member_count,
129                    vshard_count: gs.vshard_count,
130                    members,
131                    learners,
132                }
133            })
134            .collect();
135
136        ClusterInfoSnapshot {
137            node_id: self.node_id,
138            lifecycle,
139            peers,
140            groups,
141        }
142    }
143}
144
145/// Serialisable snapshot of the full cluster observability surface.
146///
147/// This is the JSON shape returned by `GET /cluster/status` and the
148/// source-of-truth for the Prometheus gauges.
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct ClusterInfoSnapshot {
151    /// This node's id.
152    pub node_id: u64,
153    /// Current lifecycle phase.
154    pub lifecycle: ClusterLifecycleState,
155    /// Every peer known to this node.
156    pub peers: Vec<PeerSnapshot>,
157    /// Every Raft group hosted on this node.
158    pub groups: Vec<GroupSnapshot>,
159}
160
161impl ClusterInfoSnapshot {
162    /// Convenience accessor used by the metrics endpoint.
163    pub fn lifecycle_label(&self) -> &'static str {
164        self.lifecycle.label()
165    }
166
167    /// Number of peers in the topology snapshot. Drives the
168    /// `nodedb_cluster_members` Prometheus gauge.
169    pub fn members_count(&self) -> usize {
170        self.peers.len()
171    }
172
173    /// Number of Raft groups hosted locally.
174    pub fn groups_count(&self) -> usize {
175        self.groups.len()
176    }
177}
178
179/// One peer entry rendered in `/cluster/status`.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct PeerSnapshot {
182    pub node_id: u64,
183    pub addr: String,
184    pub state: String,
185}
186
187/// One Raft group entry rendered in `/cluster/status`.
188#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct GroupSnapshot {
190    pub group_id: u64,
191    pub role: String,
192    pub leader_id: u64,
193    pub term: u64,
194    pub commit_index: u64,
195    pub last_applied: u64,
196    pub member_count: usize,
197    pub vshard_count: usize,
198    pub members: Vec<u64>,
199    pub learners: Vec<u64>,
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};

    /// Fake provider that returns a canned list — lets us test the
    /// snapshot builder end-to-end without spinning up a real
    /// `MultiRaft`.
    struct FakeProvider(Vec<GroupStatus>);
    impl GroupStatusProvider for FakeProvider {
        fn group_statuses(&self) -> Vec<GroupStatus> {
            self.0.clone()
        }
    }

    /// Assemble an observer (node id 1) from loose test fixtures.
    fn make_observer(
        lifecycle: ClusterLifecycleTracker,
        peers: Vec<NodeInfo>,
        routing: RoutingTable,
        raft_groups: Vec<GroupStatus>,
    ) -> ClusterObserver {
        let mut topology = ClusterTopology::new();
        for p in peers {
            topology.add_node(p);
        }
        ClusterObserver::new(
            1,
            lifecycle,
            Arc::new(RwLock::new(topology)),
            Arc::new(RwLock::new(routing)),
            Arc::new(FakeProvider(raft_groups)),
        )
    }

    /// Canned `GroupStatus` with fixed term/index/count values.
    fn gs(group_id: u64, role: &str, leader: u64) -> GroupStatus {
        GroupStatus {
            group_id,
            role: role.into(),
            leader_id: leader,
            term: 1,
            commit_index: 5,
            last_applied: 5,
            member_count: 3,
            vshard_count: 512,
        }
    }

    #[test]
    fn snapshot_renders_full_state() {
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_ready(3);

        let peers = vec![
            NodeInfo::new(1, "10.0.0.1:9400".parse().unwrap(), NodeState::Active),
            NodeInfo::new(2, "10.0.0.2:9400".parse().unwrap(), NodeState::Active),
            NodeInfo::new(3, "10.0.0.3:9400".parse().unwrap(), NodeState::Active),
        ];

        let mut routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Inject a learner to verify it lands in the snapshot.
        routing.add_group_learner(0, 4);

        let raft_groups = vec![gs(0, "Leader", 1), gs(1, "Follower", 2)];

        let observer = make_observer(lifecycle, peers, routing, raft_groups);
        let snap = observer.snapshot();

        assert_eq!(snap.node_id, 1);
        assert_eq!(snap.lifecycle_label(), "ready");
        assert_eq!(snap.members_count(), 3);
        assert_eq!(snap.groups_count(), 2);

        // Group 0 should carry both members (1,2,3) and the injected
        // learner (4).
        let g0 = snap
            .groups
            .iter()
            .find(|g| g.group_id == 0)
            .expect("group 0 present");
        assert_eq!(g0.role, "Leader");
        assert_eq!(g0.leader_id, 1);
        assert!(g0.members.contains(&1));
        assert!(g0.learners.contains(&4));

        // Peer snapshots preserve addresses.
        let addrs: Vec<&str> = snap.peers.iter().map(|p| p.addr.as_str()).collect();
        assert!(addrs.contains(&"10.0.0.1:9400"));
        assert!(addrs.contains(&"10.0.0.3:9400"));
    }

    #[test]
    fn snapshot_without_groups() {
        // A node whose RaftLoop reports zero groups (not in cluster
        // mode, but we got a stub observer). Topology still rendered.
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_bootstrapping();
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(lifecycle, peers, routing, vec![]);

        let snap = observer.snapshot();
        assert_eq!(snap.lifecycle_label(), "bootstrapping");
        assert_eq!(snap.members_count(), 1);
        assert_eq!(snap.groups_count(), 0);
    }

    #[test]
    fn snapshot_is_json_roundtrippable() {
        // The shape is consumed by clients, so serde must round-trip
        // losslessly for every variant. This keeps downstream tooling
        // honest as fields are added.
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_joining(2);
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(lifecycle, peers, routing, vec![gs(0, "Leader", 1)]);
        let snap = observer.snapshot();

        let json = serde_json::to_string(&snap).expect("serialize");
        let back: ClusterInfoSnapshot = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.node_id, snap.node_id);
        assert_eq!(back.peers.len(), 1);
        assert_eq!(back.groups.len(), 1);
        // Joining preserves the attempt field through serde.
        match back.lifecycle {
            ClusterLifecycleState::Joining { attempt } => assert_eq!(attempt, 2),
            other => panic!("expected Joining, got {other:?}"),
        }
    }
}