Skip to main content

rabia_core/
network.rs

1use crate::messages::ProtocolMessage;
2use crate::{NodeId, Result};
3use async_trait::async_trait;
4use std::collections::HashSet;
5
6#[derive(Debug, Clone)]
7pub struct ClusterConfig {
8    pub node_id: NodeId,
9    pub all_nodes: HashSet<NodeId>,
10    pub quorum_size: usize,
11}
12
13impl ClusterConfig {
14    pub fn new(node_id: NodeId, all_nodes: HashSet<NodeId>) -> Self {
15        let quorum_size = (all_nodes.len() / 2) + 1;
16        Self {
17            node_id,
18            all_nodes,
19            quorum_size,
20        }
21    }
22
23    pub fn has_quorum(&self, active_nodes: &HashSet<NodeId>) -> bool {
24        active_nodes.len() >= self.quorum_size
25    }
26
27    pub fn is_majority(&self, count: usize) -> bool {
28        count >= self.quorum_size
29    }
30
31    pub fn total_nodes(&self) -> usize {
32        self.all_nodes.len()
33    }
34}
35
36#[async_trait]
37pub trait NetworkTransport: Send + Sync {
38    async fn send_to(&self, target: NodeId, message: ProtocolMessage) -> Result<()>;
39
40    async fn broadcast(&self, message: ProtocolMessage, exclude: Option<NodeId>) -> Result<()>;
41
42    async fn receive(&mut self) -> Result<(NodeId, ProtocolMessage)>;
43
44    async fn get_connected_nodes(&self) -> Result<HashSet<NodeId>>;
45
46    async fn is_connected(&self, node_id: NodeId) -> Result<bool>;
47
48    async fn disconnect(&mut self) -> Result<()>;
49
50    async fn reconnect(&mut self) -> Result<()>;
51}
52
53#[async_trait]
54pub trait NetworkEventHandler: Send + Sync {
55    async fn on_node_connected(&self, node_id: NodeId);
56
57    async fn on_node_disconnected(&self, node_id: NodeId);
58
59    async fn on_network_partition(&self, active_nodes: HashSet<NodeId>);
60
61    async fn on_quorum_lost(&self);
62
63    async fn on_quorum_restored(&self, active_nodes: HashSet<NodeId>);
64}
65
66pub struct NetworkMonitor {
67    config: ClusterConfig,
68    connected_nodes: HashSet<NodeId>,
69    has_quorum: bool,
70}
71
72impl NetworkMonitor {
73    pub fn new(config: ClusterConfig) -> Self {
74        let connected_nodes = config.all_nodes.clone();
75        let has_quorum = config.has_quorum(&connected_nodes);
76
77        Self {
78            config,
79            connected_nodes,
80            has_quorum,
81        }
82    }
83
84    pub fn update_connected_nodes(&mut self, nodes: HashSet<NodeId>) -> Vec<NetworkEvent> {
85        let mut events = Vec::new();
86
87        let newly_connected: HashSet<_> =
88            nodes.difference(&self.connected_nodes).copied().collect();
89        let newly_disconnected: HashSet<_> =
90            self.connected_nodes.difference(&nodes).copied().collect();
91
92        for &node in &newly_connected {
93            events.push(NetworkEvent::NodeConnected(node));
94        }
95
96        for &node in &newly_disconnected {
97            events.push(NetworkEvent::NodeDisconnected(node));
98        }
99
100        let new_has_quorum = self.config.has_quorum(&nodes);
101
102        if self.has_quorum && !new_has_quorum {
103            events.push(NetworkEvent::QuorumLost);
104        } else if !self.has_quorum && new_has_quorum {
105            events.push(NetworkEvent::QuorumRestored(nodes.clone()));
106        }
107
108        if !newly_connected.is_empty() || !newly_disconnected.is_empty() {
109            events.push(NetworkEvent::NetworkPartition(nodes.clone()));
110        }
111
112        self.connected_nodes = nodes;
113        self.has_quorum = new_has_quorum;
114
115        events
116    }
117
118    pub fn has_quorum(&self) -> bool {
119        self.has_quorum
120    }
121
122    pub fn connected_nodes(&self) -> &HashSet<NodeId> {
123        &self.connected_nodes
124    }
125
126    pub fn quorum_size(&self) -> usize {
127        self.config.quorum_size
128    }
129}
130
131#[derive(Debug, Clone)]
132pub enum NetworkEvent {
133    NodeConnected(NodeId),
134    NodeDisconnected(NodeId),
135    NetworkPartition(HashSet<NodeId>),
136    QuorumLost,
137    QuorumRestored(HashSet<NodeId>),
138}