raft_consensus/
handler.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3
4use message::*;
5use state::ConsensusState;
6use {ClientId, ServerId};
7
8/// Handler for actions returned from consensus
9pub trait ConsensusHandler: Debug {
10    fn send_peer_message(&mut self, id: ServerId, message: PeerMessage);
11    fn send_client_response(&mut self, id: ClientId, message: ClientResponse);
12    fn set_timeout(&mut self, timeout: ConsensusTimeout);
13    fn clear_timeout(&mut self, timeout: ConsensusTimeout);
14
15    #[allow(unused_variables)]
16    /// Called when consensus goes to new state. Initializing new consensus does not call this function.
17    fn state_changed(&mut self, old: ConsensusState, new: ConsensusState) {}
18
19    /// Called when the particular event has been fully processed. Useful for doing actions in batches.
20    fn done(&mut self) {}
21}
22
23/// A handler that collects all messages leaving processing of them untouched.
24/// Note that timeouts vectors may intersect, that means both - clearing and setting a new timeout was requested.
25#[derive(Debug)]
26pub struct CollectHandler {
27    pub peer_messages: HashMap<ServerId, Vec<PeerMessage>>,
28    pub client_messages: HashMap<ClientId, Vec<ClientResponse>>,
29    pub timeouts: Vec<ConsensusTimeout>,
30    pub clear_timeouts: Vec<ConsensusTimeout>,
31    pub state: ConsensusState,
32}
33
34impl CollectHandler {
35    pub fn new() -> Self {
36        Self {
37            peer_messages: HashMap::new(),
38            client_messages: HashMap::new(),
39            timeouts: Vec::new(),
40            clear_timeouts: Vec::new(),
41            state: ConsensusState::Follower,
42        }
43    }
44
45    /// Delete all events
46    pub fn clear(&mut self) {
47        self.peer_messages.clear();
48        self.client_messages.clear();
49        self.timeouts.clear();
50        self.clear_timeouts.clear();
51    }
52}
53
54impl Default for CollectHandler {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl ConsensusHandler for CollectHandler {
61    /// Saves peer message to a vector
62    fn send_peer_message(&mut self, id: ServerId, message: PeerMessage) {
63        let peer = self.peer_messages.entry(id).or_insert_with(Vec::new);
64        peer.push(message);
65    }
66
67    /// Saves client message to a vector
68    fn send_client_response(&mut self, id: ClientId, message: ClientResponse) {
69        let client = self.client_messages.entry(id).or_insert_with(Vec::new);
70        client.push(message);
71    }
72
73    /// Collects timeouts uniquely
74    fn set_timeout(&mut self, timeout: ConsensusTimeout) {
75        if !self.timeouts.iter().any(|&t| t == timeout) {
76            self.timeouts.push(timeout);
77        }
78    }
79
80    fn clear_timeout(&mut self, timeout: ConsensusTimeout) {
81        if !self.clear_timeouts.iter().any(|&t| t == timeout) {
82            self.clear_timeouts.push(timeout);
83        }
84    }
85
86    fn state_changed(&mut self, _old: ConsensusState, new: ConsensusState) {
87        self.state = new
88    }
89}