conclave_room/
lib.rs

1/*----------------------------------------------------------------------------------------------------------
2 *  Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/piot/conclave-room-rs
3 *  Licensed under the MIT License. See LICENSE in the project root for license information.
4 *--------------------------------------------------------------------------------------------------------*/
5use std::collections::HashMap;
6use std::time::Instant;
7
/// Index identifying a connection; used as the key type of `Room::connections`.
pub type ConnectionIndex = u8;

/// Sentinel meaning "no connection": the initial leader of a room and the result of a
/// leader search that found no candidate.
const ILLEGAL_CONNECTION_INDEX: ConnectionIndex = ConnectionIndex::MAX;

/// Amount of knowledge a connection reports with each ping; the connection reporting the
/// highest value is preferred when a replacement leader is picked.
pub type Knowledge = u64;

/// Leadership term; the room advances it by one each time the leader is replaced.
pub type Term = u16;
15
/// Counts events and turns the count into an events-per-second rate, measured over the
/// time elapsed since the previous calculation.
#[derive(Debug)]
pub struct RateMetrics {
    /// Number of events registered since `last_calculated_at`.
    count: u32,
    /// Start of the current measurement window.
    last_calculated_at: Instant,
}

impl RateMetrics {
    /// Creates an empty counter whose measurement window starts at `time`.
    fn new(time: Instant) -> Self {
        Self {
            count: 0,
            last_calculated_at: time,
        }
    }

    /// Registers one event in the current window.
    fn increment(&mut self) {
        // Saturate instead of overflowing if the window is never closed.
        self.count = self.count.saturating_add(1);
    }

    /// Returns `true` once more than 200 whole milliseconds have elapsed in the current window.
    ///
    /// A `time` earlier than the window start is treated as zero elapsed time.
    fn has_enough_time_passed(&self, time: Instant) -> bool {
        time.saturating_duration_since(self.last_calculated_at)
            .as_millis()
            > 200
    }

    /// Closes the current window at `time` and returns its rate in events per second.
    ///
    /// Returns `0.0` when no time has elapsed. The counter is reset and a new window
    /// starts at `time`.
    fn calculate_rate(&mut self, time: Instant) -> f32 {
        let elapsed_seconds = time
            .saturating_duration_since(self.last_calculated_at)
            .as_secs_f32();

        // The rate is expressed per second because every consumer compares it against a
        // pings-per-second threshold.
        let rate = if elapsed_seconds > 0.0 {
            self.count as f32 / elapsed_seconds
        } else {
            0.0
        };

        // Reset the counter and start time for the next period
        self.count = 0;
        self.last_calculated_at = time;

        rate
    }
}
55
/// Verdict on a connection's health, as assigned by `ConnectionQuality::update`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum QualityAssessment {
    /// Too little time has passed since the last rate calculation to judge the connection.
    NeedMoreTimeOrInformation,
    /// The measured ping rate is below the configured threshold.
    RecommendDisconnect,
    /// The measured ping rate is between the threshold and twice the threshold (inclusive).
    Acceptable,
    /// The measured ping rate is more than twice the threshold.
    Good,
}
63
64pub struct ConnectionQuality {
65    pub last_ping_at: Instant,
66    pub pings_per_second: RateMetrics,
67    pub assessment: QualityAssessment,
68    threshold: f32,
69}
70
71impl ConnectionQuality {
72    pub fn new(threshold: f32, time: Instant) -> Self {
73        Self {
74            assessment: QualityAssessment::NeedMoreTimeOrInformation,
75            last_ping_at: Instant::now(),
76            pings_per_second: RateMetrics::new(time),
77            threshold,
78        }
79    }
80
81    pub fn on_ping(&mut self, time: Instant) {
82        self.last_ping_at = time;
83        self.pings_per_second.increment();
84    }
85
86    pub fn update(&mut self, time: Instant) {
87        if !self.pings_per_second.has_enough_time_passed(time) {
88            self.assessment = QualityAssessment::NeedMoreTimeOrInformation;
89        } else {
90            let pings_per_second = self.pings_per_second.calculate_rate(time);
91            self.assessment = if pings_per_second < self.threshold {
92                QualityAssessment::RecommendDisconnect
93            } else if pings_per_second > self.threshold * 2.0 {
94                QualityAssessment::Good
95            } else {
96                QualityAssessment::Acceptable
97            };
98        }
99    }
100}
101
/// Lifecycle state of a connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// State given to every newly created connection.
    Online,
    /// The connection is no longer active.
    Disconnected,
}
106
107pub struct Connection {
108    pub id: ConnectionIndex,
109    pub quality: ConnectionQuality,
110    pub knowledge: Knowledge,
111    pub state: ConnectionState,
112    pub last_reported_term: Term,
113    pub has_connection_host: bool,
114}
115
116const PINGS_PER_SECOND_THRESHOLD: f32 = 10.0;
117
118impl Connection {
119    fn new(connection_id: ConnectionIndex, term: Term, time: Instant) -> Self {
120        Connection {
121            has_connection_host: false,
122            last_reported_term: term,
123            id: connection_id,
124            quality: ConnectionQuality::new(PINGS_PER_SECOND_THRESHOLD, time),
125            knowledge: 0,
126            state: ConnectionState::Online,
127        }
128    }
129
130    fn on_ping(
131        &mut self,
132        term: Term,
133        has_connection_to_host: bool,
134        knowledge: Knowledge,
135        time: Instant,
136    ) {
137        self.last_reported_term = term;
138        self.has_connection_host = has_connection_to_host;
139        self.quality.on_ping(time);
140        self.knowledge = knowledge;
141    }
142}
143
144pub struct Room {
145    pub id: ConnectionIndex,
146    pub connections: HashMap<ConnectionIndex, Connection>,
147    pub leader_index: ConnectionIndex,
148    pub term: Term,
149}
150
151impl Room {
152    pub fn new() -> Self {
153        Self {
154            term: 0,
155            id: 0,
156            connections: Default::default(),
157            leader_index: ILLEGAL_CONNECTION_INDEX,
158        }
159    }
160
161    /// checks if most connections, that are on the same term, has lost connection to leader
162    pub fn has_most_lost_connection_to_leader(&self) -> bool {
163        let mut disappointed_count = 0;
164        for (_, connection) in self.connections.iter() {
165            if !connection.has_connection_host && connection.last_reported_term == self.term {
166                disappointed_count += 1;
167            }
168        }
169        disappointed_count >= self.connections.len()
170    }
171
172    pub fn connection_with_most_knowledge_and_acceptable_quality(
173        &self,
174        exclude_index: ConnectionIndex,
175    ) -> ConnectionIndex {
176        let mut knowledge: Knowledge = 0;
177        let mut connection_index: ConnectionIndex = ILLEGAL_CONNECTION_INDEX;
178
179        for (_, connection) in self.connections.iter() {
180            if (connection.knowledge >= knowledge || connection_index == ILLEGAL_CONNECTION_INDEX)
181                && (connection.id != exclude_index)
182            {
183                knowledge = connection.knowledge;
184                connection_index = connection.id;
185            }
186        }
187
188        connection_index
189    }
190
191    pub fn change_leader_if_down_voted(&mut self) {
192        if self.leader_index == ILLEGAL_CONNECTION_INDEX {
193            return;
194        }
195
196        if self.has_most_lost_connection_to_leader() {
197            self.leader_index =
198                self.connection_with_most_knowledge_and_acceptable_quality(self.leader_index);
199            // We start a new term, since we have a new leader
200            self.term += 1;
201        }
202    }
203
204    fn find_unique_connection_index(&self) -> ConnectionIndex {
205        let mut candidate = self.id;
206
207        while self.connections.contains_key(&candidate) {
208            candidate += 1;
209            if candidate == self.id {
210                panic!("No unique connection index available");
211            }
212        }
213
214        candidate
215    }
216
217    pub fn create_connection(&mut self, time: Instant) -> &mut Connection {
218        eprintln!("creating a connection");
219        self.id += 1;
220        let connection_id = self.find_unique_connection_index();
221        let connection = Connection::new(connection_id, self.term, time);
222        self.connections.insert(self.id, connection);
223        self.connections
224            .get_mut(&self.id)
225            .expect("key is missing from hashmap somehow")
226    }
227}
228
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use crate::{Knowledge, QualityAssessment, Room, Term};

    /// A freshly pinged connection cannot be assessed until time has passed; a single
    /// ping in ten seconds then falls below the threshold.
    #[test]
    fn check_ping() {
        let start = Instant::now();
        let mut room = Room::new();

        let connection = room.create_connection(start);
        assert_eq!(connection.id, 1);

        let reported_term: Term = 1;
        let reported_knowledge: Knowledge = 42;
        connection.on_ping(reported_term, true, reported_knowledge, start);

        // No time has elapsed since creation, so no verdict can be given yet.
        connection.quality.update(start);
        assert_eq!(
            connection.quality.assessment,
            QualityAssessment::NeedMoreTimeOrInformation
        );

        // Ten seconds later the single ping is too little to keep the connection.
        let ten_seconds_later = start + Duration::from_secs(10);
        connection.quality.update(ten_seconds_later);
        assert_eq!(
            connection.quality.assessment,
            QualityAssessment::RecommendDisconnect
        );
    }
}
258}