1use std::sync::{Arc, RwLock};
15
16use serde::{Deserialize, Serialize};
17
18use crate::forward::PlanExecutor;
19use crate::lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
20use crate::multi_raft::GroupStatus;
21use crate::raft_loop::{CommitApplier, RaftLoop};
22use crate::routing::RoutingTable;
23use crate::topology::ClusterTopology;
24
/// Source of Raft group status for [`ClusterObserver::snapshot`].
///
/// The `Send + Sync` supertraits allow an implementor to be shared between
/// threads behind an `Arc<dyn GroupStatusProvider>`.
pub trait GroupStatusProvider: Send + Sync {
    /// Returns the implementor's current [`GroupStatus`] entries as an owned
    /// vector.
    fn group_statuses(&self) -> Vec<GroupStatus>;
}
35
36impl<A, P> GroupStatusProvider for RaftLoop<A, P>
37where
38 A: CommitApplier,
39 P: PlanExecutor,
40{
41 fn group_statuses(&self) -> Vec<GroupStatus> {
42 RaftLoop::group_statuses(self)
43 }
44}
45
/// Bundles the shared cluster state that is read when assembling a
/// [`ClusterInfoSnapshot`].
pub struct ClusterObserver {
    /// Node identifier copied verbatim into each snapshot's `node_id`.
    /// Presumably the id of the local node — confirm against the caller.
    pub node_id: u64,
    /// Tracker whose `current()` value becomes the snapshot's lifecycle state.
    pub lifecycle: ClusterLifecycleTracker,
    /// Shared topology; read-locked by `snapshot` to enumerate nodes.
    pub topology: Arc<RwLock<ClusterTopology>>,
    /// Shared routing table; read-locked by `snapshot` to copy each group's
    /// members and learners.
    pub routing: Arc<RwLock<RoutingTable>>,
    /// Provider queried by `snapshot` for the Raft status of each group.
    pub group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
}
68
69impl ClusterObserver {
70 pub fn new(
71 node_id: u64,
72 lifecycle: ClusterLifecycleTracker,
73 topology: Arc<RwLock<ClusterTopology>>,
74 routing: Arc<RwLock<RoutingTable>>,
75 group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
76 ) -> Self {
77 Self {
78 node_id,
79 lifecycle,
80 topology,
81 routing,
82 group_status,
83 }
84 }
85
86 pub fn snapshot(&self) -> ClusterInfoSnapshot {
89 let lifecycle = self.lifecycle.current();
90 let peers: Vec<PeerSnapshot> = {
91 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
92 topo.all_nodes()
93 .map(|n| PeerSnapshot {
94 node_id: n.node_id,
95 addr: n.addr.clone(),
96 state: format!("{:?}", n.state),
97 })
98 .collect()
99 };
100
101 let routing_snapshot: Vec<(u64, Vec<u64>, Vec<u64>)> = {
107 let rt = self.routing.read().unwrap_or_else(|p| p.into_inner());
108 rt.group_members()
109 .iter()
110 .map(|(&gid, info)| (gid, info.members.clone(), info.learners.clone()))
111 .collect()
112 };
113
114 let raft_groups = self.group_status.group_statuses();
115 let groups: Vec<GroupSnapshot> = raft_groups
116 .into_iter()
117 .map(|gs| {
118 let (members, learners) = routing_snapshot
119 .iter()
120 .find(|(gid, _, _)| *gid == gs.group_id)
121 .map(|(_, m, l)| (m.clone(), l.clone()))
122 .unwrap_or_default();
123 GroupSnapshot {
124 group_id: gs.group_id,
125 role: gs.role,
126 leader_id: gs.leader_id,
127 term: gs.term,
128 commit_index: gs.commit_index,
129 last_applied: gs.last_applied,
130 member_count: gs.member_count,
131 vshard_count: gs.vshard_count,
132 members,
133 learners,
134 }
135 })
136 .collect();
137
138 ClusterInfoSnapshot {
139 node_id: self.node_id,
140 lifecycle,
141 peers,
142 groups,
143 }
144 }
145}
146
/// Serializable view of the cluster as assembled by
/// `ClusterObserver::snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterInfoSnapshot {
    /// Node identifier taken from the observer that produced the snapshot.
    pub node_id: u64,
    /// Lifecycle state reported by the lifecycle tracker at capture time.
    pub lifecycle: ClusterLifecycleState,
    /// One entry per node returned by the topology's `all_nodes()`.
    pub peers: Vec<PeerSnapshot>,
    /// One entry per status returned by the group-status provider.
    pub groups: Vec<GroupSnapshot>,
}
162
impl ClusterInfoSnapshot {
    /// Returns the static label of the captured lifecycle state, as produced
    /// by that state's `label()` method.
    pub fn lifecycle_label(&self) -> &'static str {
        self.lifecycle.label()
    }

    /// Returns the number of entries in `peers`. No filtering by node state
    /// is applied.
    pub fn members_count(&self) -> usize {
        self.peers.len()
    }

    /// Returns the number of entries in `groups`.
    pub fn groups_count(&self) -> usize {
        self.groups.len()
    }
}
180
/// Serializable description of a single node taken from the cluster topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerSnapshot {
    /// Identifier of the node.
    pub node_id: u64,
    /// Copy of the node's `addr` value from the topology.
    pub addr: String,
    /// `Debug` rendering (`{:?}`) of the node's state at capture time.
    pub state: String,
}
188
/// Serializable description of a single Raft group.
///
/// The fields from `group_id` through `vshard_count` are copied unchanged
/// from the provider's `GroupStatus`; `members` and `learners` come from the
/// routing table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupSnapshot {
    /// Identifier of the group; also the key used to look up routing data.
    pub group_id: u64,
    /// Role string as reported by the provider (the tests in this file use
    /// values such as "Leader" and "Follower").
    pub role: String,
    /// Leader id as reported by the provider.
    pub leader_id: u64,
    /// Term as reported by the provider.
    pub term: u64,
    /// Commit index as reported by the provider.
    pub commit_index: u64,
    /// Last-applied index as reported by the provider.
    pub last_applied: u64,
    /// Member count as reported by the provider. It is not derived from
    /// `members`, so the two may disagree.
    pub member_count: usize,
    /// vShard count as reported by the provider.
    pub vshard_count: usize,
    /// Member ids from the routing table; empty when the routing table has no
    /// entry for this group.
    pub members: Vec<u64>,
    /// Learner ids from the routing table; empty when the routing table has
    /// no entry for this group.
    pub learners: Vec<u64>,
}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};

    /// Test double that hands back a fixed list of group statuses.
    struct FakeProvider(Vec<GroupStatus>);

    impl GroupStatusProvider for FakeProvider {
        fn group_statuses(&self) -> Vec<GroupStatus> {
            let Self(statuses) = self;
            statuses.clone()
        }
    }

    /// Builds an observer for node 1 over freshly created shared state.
    fn make_observer(
        lifecycle: ClusterLifecycleTracker,
        peers: Vec<NodeInfo>,
        routing: RoutingTable,
        raft_groups: Vec<GroupStatus>,
    ) -> ClusterObserver {
        let mut topology = ClusterTopology::new();
        peers.into_iter().for_each(|peer| {
            topology.add_node(peer);
        });

        let shared_topology = Arc::new(RwLock::new(topology));
        let shared_routing = Arc::new(RwLock::new(routing));
        let provider = Arc::new(FakeProvider(raft_groups));
        ClusterObserver::new(1, lifecycle, shared_topology, shared_routing, provider)
    }

    /// Group status with fixed counters; only id, role and leader vary.
    fn gs(group_id: u64, role: &str, leader: u64) -> GroupStatus {
        GroupStatus {
            group_id,
            role: role.to_owned(),
            leader_id: leader,
            term: 1,
            commit_index: 5,
            last_applied: 5,
            member_count: 3,
            vshard_count: 512,
        }
    }

    /// Active node with the given id, listening on `addr`.
    fn active_node(id: u64, addr: &str) -> NodeInfo {
        NodeInfo::new(id, addr.parse().unwrap(), NodeState::Active)
    }

    /// The one-node topology shared by the smaller tests.
    fn lone_local_peer() -> Vec<NodeInfo> {
        vec![active_node(1, "127.0.0.1:9400")]
    }

    #[test]
    fn snapshot_renders_full_state() {
        let tracker = ClusterLifecycleTracker::new();
        tracker.to_ready(3);

        let nodes = vec![
            active_node(1, "10.0.0.1:9400"),
            active_node(2, "10.0.0.2:9400"),
            active_node(3, "10.0.0.3:9400"),
        ];

        let mut table = RoutingTable::uniform(2, &[1, 2, 3], 3);
        table.add_group_learner(0, 4);

        let statuses = vec![gs(0, "Leader", 1), gs(1, "Follower", 2)];

        let snap = make_observer(tracker, nodes, table, statuses).snapshot();

        assert_eq!(snap.node_id, 1);
        assert_eq!(snap.lifecycle_label(), "ready");
        assert_eq!(snap.members_count(), 3);
        assert_eq!(snap.groups_count(), 2);

        // Routing data must be merged into the matching Raft group.
        let first_group = snap
            .groups
            .iter()
            .find(|g| g.group_id == 0)
            .expect("group 0 present");
        assert_eq!(first_group.role, "Leader");
        assert_eq!(first_group.leader_id, 1);
        assert!(first_group.members.contains(&1));
        assert!(first_group.learners.contains(&4));

        // Peer addresses are carried over as strings.
        let has_addr = |wanted: &str| snap.peers.iter().any(|p| p.addr == wanted);
        assert!(has_addr("10.0.0.1:9400"));
        assert!(has_addr("10.0.0.3:9400"));
    }

    #[test]
    fn snapshot_without_groups() {
        let tracker = ClusterLifecycleTracker::new();
        tracker.to_bootstrapping();

        let table = RoutingTable::uniform(1, &[1], 1);
        let snap = make_observer(tracker, lone_local_peer(), table, Vec::new()).snapshot();

        assert_eq!(snap.lifecycle_label(), "bootstrapping");
        assert_eq!(snap.members_count(), 1);
        assert_eq!(snap.groups_count(), 0);
    }

    #[test]
    fn snapshot_is_json_roundtrippable() {
        let tracker = ClusterLifecycleTracker::new();
        tracker.to_joining(2);

        let table = RoutingTable::uniform(1, &[1], 1);
        let statuses = vec![gs(0, "Leader", 1)];
        let snap = make_observer(tracker, lone_local_peer(), table, statuses).snapshot();

        let encoded = serde_json::to_string(&snap).expect("serialize");
        let decoded: ClusterInfoSnapshot = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(decoded.node_id, snap.node_id);
        assert_eq!(decoded.peers.len(), 1);
        assert_eq!(decoded.groups.len(), 1);
        match decoded.lifecycle {
            ClusterLifecycleState::Joining { attempt } => assert_eq!(attempt, 2),
            other => panic!("expected Joining, got {other:?}"),
        }
    }
}