1use std::collections::HashMap;
6use std::time::Instant;
7
/// Identifies a connection inside a [`Room`]; it is the key type of
/// `Room::connections` and the type of `Connection::id`.
pub type ConnectionIndex = u8;

/// Sentinel [`ConnectionIndex`] used where no real connection applies, e.g. as
/// the leader index of a room that has no leader.
const ILLEGAL_CONNECTION_INDEX: u8 = 0xff;

/// Value a connection reports with each ping. When a new leader is picked, the
/// connection with the highest value is preferred.
pub type Knowledge = u64;

/// Leadership term counter. A room advances its term whenever it replaces its
/// leader, and connections report the term they are on with each ping.
pub type Term = u16;
15
/// Counts events and converts the count into an events-per-second rate over
/// the time elapsed since the previous calculation.
#[derive(Debug)]
pub struct RateMetrics {
    /// Number of events recorded since `last_calculated_at`.
    count: u32,
    /// Start of the current measurement window.
    last_calculated_at: Instant,
}

impl RateMetrics {
    /// A measurement window must be strictly longer than this many
    /// milliseconds before a rate is worth calculating.
    const MINIMUM_WINDOW_MILLIS: u128 = 200;

    /// Creates an empty counter whose measurement window starts at `time`.
    fn new(time: Instant) -> Self {
        Self {
            count: 0,
            last_calculated_at: time,
        }
    }

    /// Records one event in the current window.
    fn increment(&mut self) {
        // Saturate rather than overflow: a pegged counter still yields a huge
        // rate, whereas an overflow would panic (debug) or wrap to zero (release).
        self.count = self.count.saturating_add(1);
    }

    /// Returns `true` once more than 200 ms separate `time` from the start of
    /// the current window. A `time` before the window start counts as zero
    /// elapsed time.
    fn has_enough_time_passed(&self, time: Instant) -> bool {
        time.saturating_duration_since(self.last_calculated_at)
            .as_millis()
            > Self::MINIMUM_WINDOW_MILLIS
    }

    /// Returns the number of events per second recorded since the window
    /// started, then resets the counter and restarts the window at `time`.
    ///
    /// Returns `0.0` when no time has elapsed (or `time` lies before the
    /// window start), so the division can never produce infinity or NaN.
    fn calculate_rate(&mut self, time: Instant) -> f32 {
        let seconds = time
            .saturating_duration_since(self.last_calculated_at)
            .as_secs_f32();

        // The rate is per *second*: callers compare it against a
        // pings-per-second threshold.
        let rate = if seconds > 0.0 {
            self.count as f32 / seconds
        } else {
            0.0
        };

        self.count = 0;
        self.last_calculated_at = time;

        rate
    }
}
55
56#[derive(Debug, Clone, Eq, PartialEq)]
57pub enum QualityAssessment {
58 NeedMoreTimeOrInformation,
59 RecommendDisconnect,
60 Acceptable,
61 Good,
62}
63
64pub struct ConnectionQuality {
65 pub last_ping_at: Instant,
66 pub pings_per_second: RateMetrics,
67 pub assessment: QualityAssessment,
68 threshold: f32,
69}
70
71impl ConnectionQuality {
72 pub fn new(threshold: f32, time: Instant) -> Self {
73 Self {
74 assessment: QualityAssessment::NeedMoreTimeOrInformation,
75 last_ping_at: Instant::now(),
76 pings_per_second: RateMetrics::new(time),
77 threshold,
78 }
79 }
80
81 pub fn on_ping(&mut self, time: Instant) {
82 self.last_ping_at = time;
83 self.pings_per_second.increment();
84 }
85
86 pub fn update(&mut self, time: Instant) {
87 if !self.pings_per_second.has_enough_time_passed(time) {
88 self.assessment = QualityAssessment::NeedMoreTimeOrInformation;
89 } else {
90 let pings_per_second = self.pings_per_second.calculate_rate(time);
91 self.assessment = if pings_per_second < self.threshold {
92 QualityAssessment::RecommendDisconnect
93 } else if pings_per_second > self.threshold * 2.0 {
94 QualityAssessment::Good
95 } else {
96 QualityAssessment::Acceptable
97 };
98 }
99 }
100}
101
/// Lifecycle state of a [`Connection`].
///
/// Derives the same traits as [`QualityAssessment`] so the state can be
/// logged and compared.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ConnectionState {
    /// The connection is considered live.
    Online,
    /// The connection is no longer considered live.
    Disconnected,
}
106
107pub struct Connection {
108 pub id: ConnectionIndex,
109 pub quality: ConnectionQuality,
110 pub knowledge: Knowledge,
111 pub state: ConnectionState,
112 pub last_reported_term: Term,
113 pub has_connection_host: bool,
114}
115
116const PINGS_PER_SECOND_THRESHOLD: f32 = 10.0;
117
118impl Connection {
119 fn new(connection_id: ConnectionIndex, term: Term, time: Instant) -> Self {
120 Connection {
121 has_connection_host: false,
122 last_reported_term: term,
123 id: connection_id,
124 quality: ConnectionQuality::new(PINGS_PER_SECOND_THRESHOLD, time),
125 knowledge: 0,
126 state: ConnectionState::Online,
127 }
128 }
129
130 fn on_ping(
131 &mut self,
132 term: Term,
133 has_connection_to_host: bool,
134 knowledge: Knowledge,
135 time: Instant,
136 ) {
137 self.last_reported_term = term;
138 self.has_connection_host = has_connection_to_host;
139 self.quality.on_ping(time);
140 self.knowledge = knowledge;
141 }
142}
143
144pub struct Room {
145 pub id: ConnectionIndex,
146 pub connections: HashMap<ConnectionIndex, Connection>,
147 pub leader_index: ConnectionIndex,
148 pub term: Term,
149}
150
151impl Room {
152 pub fn new() -> Self {
153 Self {
154 term: 0,
155 id: 0,
156 connections: Default::default(),
157 leader_index: ILLEGAL_CONNECTION_INDEX,
158 }
159 }
160
161 pub fn has_most_lost_connection_to_leader(&self) -> bool {
163 let mut disappointed_count = 0;
164 for (_, connection) in self.connections.iter() {
165 if !connection.has_connection_host && connection.last_reported_term == self.term {
166 disappointed_count += 1;
167 }
168 }
169 disappointed_count >= self.connections.len()
170 }
171
172 pub fn connection_with_most_knowledge_and_acceptable_quality(
173 &self,
174 exclude_index: ConnectionIndex,
175 ) -> ConnectionIndex {
176 let mut knowledge: Knowledge = 0;
177 let mut connection_index: ConnectionIndex = ILLEGAL_CONNECTION_INDEX;
178
179 for (_, connection) in self.connections.iter() {
180 if (connection.knowledge >= knowledge || connection_index == ILLEGAL_CONNECTION_INDEX)
181 && (connection.id != exclude_index)
182 {
183 knowledge = connection.knowledge;
184 connection_index = connection.id;
185 }
186 }
187
188 connection_index
189 }
190
191 pub fn change_leader_if_down_voted(&mut self) {
192 if self.leader_index == ILLEGAL_CONNECTION_INDEX {
193 return;
194 }
195
196 if self.has_most_lost_connection_to_leader() {
197 self.leader_index =
198 self.connection_with_most_knowledge_and_acceptable_quality(self.leader_index);
199 self.term += 1;
201 }
202 }
203
204 fn find_unique_connection_index(&self) -> ConnectionIndex {
205 let mut candidate = self.id;
206
207 while self.connections.contains_key(&candidate) {
208 candidate += 1;
209 if candidate == self.id {
210 panic!("No unique connection index available");
211 }
212 }
213
214 candidate
215 }
216
217 pub fn create_connection(&mut self, time: Instant) -> &mut Connection {
218 eprintln!("creating a connection");
219 self.id += 1;
220 let connection_id = self.find_unique_connection_index();
221 let connection = Connection::new(connection_id, self.term, time);
222 self.connections.insert(self.id, connection);
223 self.connections
224 .get_mut(&self.id)
225 .expect("key is missing from hashmap somehow")
226 }
227}
228
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use crate::{Knowledge, QualityAssessment, Room, Term};

    /// A single ping is not enough to judge a connection immediately, and
    /// after ten quiet seconds the connection should be flagged for disconnect.
    #[test]
    fn check_ping() {
        let mut room = Room::new();
        let start = Instant::now();

        let connection = room.create_connection(start);
        assert_eq!(connection.id, 1);

        let reported_knowledge: Knowledge = 42;
        let reported_term: Term = 1;
        connection.on_ping(reported_term, true, reported_knowledge, start);

        // No time has passed since creation, so no verdict yet.
        connection.quality.update(start);
        assert_eq!(
            connection.quality.assessment,
            QualityAssessment::NeedMoreTimeOrInformation
        );

        // One ping spread over ten seconds is far below the threshold.
        let ten_seconds_later = start + Duration::from_secs(10);
        connection.quality.update(ten_seconds_later);
        assert_eq!(
            connection.quality.assessment,
            QualityAssessment::RecommendDisconnect
        );
    }
}