1use std::sync::{Arc, RwLock};
13
14use serde::{Deserialize, Serialize};
15
16use crate::forward::PlanExecutor;
17use crate::lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
18use crate::multi_raft::GroupStatus;
19use crate::raft_loop::{CommitApplier, RaftLoop};
20use crate::routing::RoutingTable;
21use crate::topology::ClusterTopology;
22
23pub trait GroupStatusProvider: Send + Sync {
30 fn group_statuses(&self) -> Vec<GroupStatus>;
32}
33
34impl<A, P> GroupStatusProvider for RaftLoop<A, P>
35where
36 A: CommitApplier,
37 P: PlanExecutor,
38{
39 fn group_statuses(&self) -> Vec<GroupStatus> {
40 RaftLoop::group_statuses(self)
41 }
42}
43
/// Read-only view over a node's shared cluster state, used to build
/// [`ClusterInfoSnapshot`]s on demand.
pub struct ClusterObserver {
    /// ID of the local node; copied verbatim into every snapshot.
    pub node_id: u64,
    /// Lifecycle tracker whose `current()` state is reported in snapshots.
    pub lifecycle: ClusterLifecycleTracker,
    /// Shared cluster topology; read-locked to list peers.
    pub topology: Arc<RwLock<ClusterTopology>>,
    /// Shared routing table; read-locked to resolve each group's members and learners.
    pub routing: Arc<RwLock<RoutingTable>>,
    /// Source of per-Raft-group status.
    // NOTE: `+ Send + Sync` is already implied by the trait's supertraits, but it
    // is part of this field's type, so removing it would break existing callers.
    pub group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
}
66
67impl ClusterObserver {
68 pub fn new(
69 node_id: u64,
70 lifecycle: ClusterLifecycleTracker,
71 topology: Arc<RwLock<ClusterTopology>>,
72 routing: Arc<RwLock<RoutingTable>>,
73 group_status: Arc<dyn GroupStatusProvider + Send + Sync>,
74 ) -> Self {
75 Self {
76 node_id,
77 lifecycle,
78 topology,
79 routing,
80 group_status,
81 }
82 }
83
84 pub fn snapshot(&self) -> ClusterInfoSnapshot {
87 let lifecycle = self.lifecycle.current();
88 let peers: Vec<PeerSnapshot> = {
89 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
90 topo.all_nodes()
91 .map(|n| PeerSnapshot {
92 node_id: n.node_id,
93 addr: n.addr.clone(),
94 state: format!("{:?}", n.state),
95 })
96 .collect()
97 };
98
99 let routing_snapshot: Vec<(u64, Vec<u64>, Vec<u64>)> = {
105 let rt = self.routing.read().unwrap_or_else(|p| p.into_inner());
106 rt.group_members()
107 .iter()
108 .map(|(&gid, info)| (gid, info.members.clone(), info.learners.clone()))
109 .collect()
110 };
111
112 let raft_groups = self.group_status.group_statuses();
113 let groups: Vec<GroupSnapshot> = raft_groups
114 .into_iter()
115 .map(|gs| {
116 let (members, learners) = routing_snapshot
117 .iter()
118 .find(|(gid, _, _)| *gid == gs.group_id)
119 .map(|(_, m, l)| (m.clone(), l.clone()))
120 .unwrap_or_default();
121 GroupSnapshot {
122 group_id: gs.group_id,
123 role: gs.role,
124 leader_id: gs.leader_id,
125 term: gs.term,
126 commit_index: gs.commit_index,
127 last_applied: gs.last_applied,
128 member_count: gs.member_count,
129 vshard_count: gs.vshard_count,
130 members,
131 learners,
132 }
133 })
134 .collect();
135
136 ClusterInfoSnapshot {
137 node_id: self.node_id,
138 lifecycle,
139 peers,
140 groups,
141 }
142 }
143}
144
/// Serializable point-in-time view of cluster state as seen by one node.
///
/// Produced by [`ClusterObserver::snapshot`]. The derived `Serialize` and
/// `Deserialize` impls allow it to round-trip through JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterInfoSnapshot {
    /// ID of the node that produced this snapshot.
    pub node_id: u64,
    /// Lifecycle state at the time of the snapshot.
    pub lifecycle: ClusterLifecycleState,
    /// One entry per node in the topology, including the local node if it is listed there.
    pub peers: Vec<PeerSnapshot>,
    /// One entry per Raft group reported by the group-status provider.
    pub groups: Vec<GroupSnapshot>,
}
160
161impl ClusterInfoSnapshot {
162 pub fn lifecycle_label(&self) -> &'static str {
164 self.lifecycle.label()
165 }
166
167 pub fn members_count(&self) -> usize {
170 self.peers.len()
171 }
172
173 pub fn groups_count(&self) -> usize {
175 self.groups.len()
176 }
177}
178
/// Snapshot of a single node from the cluster topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerSnapshot {
    /// Node ID as recorded in the topology.
    pub node_id: u64,
    /// Node address, copied from the topology entry.
    pub addr: String,
    /// `Debug` rendering of the node's topology state.
    pub state: String,
}
186
/// Snapshot of a single Raft group.
///
/// Status fields come from the group-status provider; `members` and
/// `learners` come from the routing table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupSnapshot {
    /// Raft group ID.
    pub group_id: u64,
    /// This node's role in the group as reported by the provider (tests use "Leader" / "Follower").
    pub role: String,
    /// Leader node ID as reported by the provider.
    pub leader_id: u64,
    /// Raft term as reported by the provider.
    pub term: u64,
    /// Commit index as reported by the provider.
    pub commit_index: u64,
    /// Last applied index as reported by the provider.
    pub last_applied: u64,
    /// Member count as reported by the provider; not derived from `members`.
    pub member_count: usize,
    /// Number of vshards as reported by the provider.
    pub vshard_count: usize,
    /// Member node IDs from the routing table; empty if the group is not routed.
    pub members: Vec<u64>,
    /// Learner node IDs from the routing table; empty if the group is not routed.
    pub learners: Vec<u64>,
}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};

    /// Provider that always returns the same fixed list of group statuses.
    struct FakeProvider(Vec<GroupStatus>);

    impl GroupStatusProvider for FakeProvider {
        fn group_statuses(&self) -> Vec<GroupStatus> {
            self.0.clone()
        }
    }

    /// Builds an observer for node 1 over the given lifecycle, peers, routing and group statuses.
    fn make_observer(
        lifecycle: ClusterLifecycleTracker,
        peers: Vec<NodeInfo>,
        routing: RoutingTable,
        raft_groups: Vec<GroupStatus>,
    ) -> ClusterObserver {
        let mut topology = ClusterTopology::new();
        for p in peers {
            topology.add_node(p);
        }
        ClusterObserver::new(
            1,
            lifecycle,
            Arc::new(RwLock::new(topology)),
            Arc::new(RwLock::new(routing)),
            Arc::new(FakeProvider(raft_groups)),
        )
    }

    /// Group status with fixed term 1, commit/applied index 5, 3 members and 512 vshards.
    fn gs(group_id: u64, role: &str, leader: u64) -> GroupStatus {
        GroupStatus {
            group_id,
            role: role.into(),
            leader_id: leader,
            term: 1,
            commit_index: 5,
            last_applied: 5,
            member_count: 3,
            vshard_count: 512,
        }
    }

    #[test]
    fn snapshot_renders_full_state() {
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_ready(3);

        let peers = vec![
            NodeInfo::new(1, "10.0.0.1:9400".parse().unwrap(), NodeState::Active),
            NodeInfo::new(2, "10.0.0.2:9400".parse().unwrap(), NodeState::Active),
            NodeInfo::new(3, "10.0.0.3:9400".parse().unwrap(), NodeState::Active),
        ];

        let mut routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        routing.add_group_learner(0, 4);

        let raft_groups = vec![gs(0, "Leader", 1), gs(1, "Follower", 2)];

        let observer = make_observer(lifecycle, peers, routing, raft_groups);
        let snap = observer.snapshot();

        assert_eq!(snap.node_id, 1);
        assert_eq!(snap.lifecycle_label(), "ready");
        assert_eq!(snap.members_count(), 3);
        assert_eq!(snap.groups_count(), 2);

        let g0 = snap
            .groups
            .iter()
            .find(|g| g.group_id == 0)
            .expect("group 0 present");
        assert_eq!(g0.role, "Leader");
        assert_eq!(g0.leader_id, 1);
        assert!(g0.members.contains(&1));
        assert!(g0.learners.contains(&4));

        let addrs: Vec<&str> = snap.peers.iter().map(|p| p.addr.as_str()).collect();
        assert!(addrs.contains(&"10.0.0.1:9400"));
        assert!(addrs.contains(&"10.0.0.3:9400"));
    }

    #[test]
    fn snapshot_without_groups() {
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_bootstrapping();
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(lifecycle, peers, routing, vec![]);

        let snap = observer.snapshot();
        assert_eq!(snap.lifecycle_label(), "bootstrapping");
        assert_eq!(snap.members_count(), 1);
        assert_eq!(snap.groups_count(), 0);
    }

    #[test]
    fn snapshot_is_json_roundtrippable() {
        let lifecycle = ClusterLifecycleTracker::new();
        lifecycle.to_joining(2);
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(lifecycle, peers, routing, vec![gs(0, "Leader", 1)]);
        let snap = observer.snapshot();

        let json = serde_json::to_string(&snap).expect("serialize");
        let back: ClusterInfoSnapshot = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.node_id, snap.node_id);
        assert_eq!(back.peers.len(), 1);
        assert_eq!(back.groups.len(), 1);
        match back.lifecycle {
            ClusterLifecycleState::Joining { attempt } => assert_eq!(attempt, 2),
            other => panic!("expected Joining, got {other:?}"),
        }
    }

    /// A group reported by the provider but absent from the routing table must
    /// still appear, with empty members and learners, in provider order.
    #[test]
    fn snapshot_group_missing_from_routing_has_empty_membership() {
        let lifecycle = ClusterLifecycleTracker::new();
        let peers = vec![NodeInfo::new(
            1,
            "127.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        )];
        let routing = RoutingTable::uniform(1, &[1], 1);
        let observer = make_observer(
            lifecycle,
            peers,
            routing,
            vec![gs(0, "Leader", 1), gs(99, "Follower", 2)],
        );

        let snap = observer.snapshot();
        assert_eq!(snap.groups_count(), 2);

        let ids: Vec<u64> = snap.groups.iter().map(|g| g.group_id).collect();
        assert_eq!(ids, vec![0, 99]);

        let g0 = snap
            .groups
            .iter()
            .find(|g| g.group_id == 0)
            .expect("group 0 present");
        assert!(g0.members.contains(&1));

        let orphan = snap
            .groups
            .iter()
            .find(|g| g.group_id == 99)
            .expect("group 99 present");
        assert_eq!(orphan.role, "Follower");
        assert!(orphan.members.is_empty());
        assert!(orphan.learners.is_empty());
    }
}