1use crate::messages::ProtocolMessage;
2use crate::{NodeId, Result};
3use async_trait::async_trait;
4use std::collections::HashSet;
5
6#[derive(Debug, Clone)]
7pub struct ClusterConfig {
8 pub node_id: NodeId,
9 pub all_nodes: HashSet<NodeId>,
10 pub quorum_size: usize,
11}
12
13impl ClusterConfig {
14 pub fn new(node_id: NodeId, all_nodes: HashSet<NodeId>) -> Self {
15 let quorum_size = (all_nodes.len() / 2) + 1;
16 Self {
17 node_id,
18 all_nodes,
19 quorum_size,
20 }
21 }
22
23 pub fn has_quorum(&self, active_nodes: &HashSet<NodeId>) -> bool {
24 active_nodes.len() >= self.quorum_size
25 }
26
27 pub fn is_majority(&self, count: usize) -> bool {
28 count >= self.quorum_size
29 }
30
31 pub fn total_nodes(&self) -> usize {
32 self.all_nodes.len()
33 }
34}
35
36#[async_trait]
37pub trait NetworkTransport: Send + Sync {
38 async fn send_to(&self, target: NodeId, message: ProtocolMessage) -> Result<()>;
39
40 async fn broadcast(&self, message: ProtocolMessage, exclude: Option<NodeId>) -> Result<()>;
41
42 async fn receive(&mut self) -> Result<(NodeId, ProtocolMessage)>;
43
44 async fn get_connected_nodes(&self) -> Result<HashSet<NodeId>>;
45
46 async fn is_connected(&self, node_id: NodeId) -> Result<bool>;
47
48 async fn disconnect(&mut self) -> Result<()>;
49
50 async fn reconnect(&mut self) -> Result<()>;
51}
52
53#[async_trait]
54pub trait NetworkEventHandler: Send + Sync {
55 async fn on_node_connected(&self, node_id: NodeId);
56
57 async fn on_node_disconnected(&self, node_id: NodeId);
58
59 async fn on_network_partition(&self, active_nodes: HashSet<NodeId>);
60
61 async fn on_quorum_lost(&self);
62
63 async fn on_quorum_restored(&self, active_nodes: HashSet<NodeId>);
64}
65
66pub struct NetworkMonitor {
67 config: ClusterConfig,
68 connected_nodes: HashSet<NodeId>,
69 has_quorum: bool,
70}
71
72impl NetworkMonitor {
73 pub fn new(config: ClusterConfig) -> Self {
74 let connected_nodes = config.all_nodes.clone();
75 let has_quorum = config.has_quorum(&connected_nodes);
76
77 Self {
78 config,
79 connected_nodes,
80 has_quorum,
81 }
82 }
83
84 pub fn update_connected_nodes(&mut self, nodes: HashSet<NodeId>) -> Vec<NetworkEvent> {
85 let mut events = Vec::new();
86
87 let newly_connected: HashSet<_> =
88 nodes.difference(&self.connected_nodes).copied().collect();
89 let newly_disconnected: HashSet<_> =
90 self.connected_nodes.difference(&nodes).copied().collect();
91
92 for &node in &newly_connected {
93 events.push(NetworkEvent::NodeConnected(node));
94 }
95
96 for &node in &newly_disconnected {
97 events.push(NetworkEvent::NodeDisconnected(node));
98 }
99
100 let new_has_quorum = self.config.has_quorum(&nodes);
101
102 if self.has_quorum && !new_has_quorum {
103 events.push(NetworkEvent::QuorumLost);
104 } else if !self.has_quorum && new_has_quorum {
105 events.push(NetworkEvent::QuorumRestored(nodes.clone()));
106 }
107
108 if !newly_connected.is_empty() || !newly_disconnected.is_empty() {
109 events.push(NetworkEvent::NetworkPartition(nodes.clone()));
110 }
111
112 self.connected_nodes = nodes;
113 self.has_quorum = new_has_quorum;
114
115 events
116 }
117
118 pub fn has_quorum(&self) -> bool {
119 self.has_quorum
120 }
121
122 pub fn connected_nodes(&self) -> &HashSet<NodeId> {
123 &self.connected_nodes
124 }
125
126 pub fn quorum_size(&self) -> usize {
127 self.config.quorum_size
128 }
129}
130
131#[derive(Debug, Clone)]
132pub enum NetworkEvent {
133 NodeConnected(NodeId),
134 NodeDisconnected(NodeId),
135 NetworkPartition(HashSet<NodeId>),
136 QuorumLost,
137 QuorumRestored(HashSet<NodeId>),
138}