raft_consensus/
handler.rs1use std::collections::HashMap;
2use std::fmt::Debug;
3
4use message::*;
5use state::ConsensusState;
6use {ClientId, ServerId};
7
8pub trait ConsensusHandler: Debug {
10 fn send_peer_message(&mut self, id: ServerId, message: PeerMessage);
11 fn send_client_response(&mut self, id: ClientId, message: ClientResponse);
12 fn set_timeout(&mut self, timeout: ConsensusTimeout);
13 fn clear_timeout(&mut self, timeout: ConsensusTimeout);
14
15 #[allow(unused_variables)]
16 fn state_changed(&mut self, old: ConsensusState, new: ConsensusState) {}
18
19 fn done(&mut self) {}
21}
22
23#[derive(Debug)]
26pub struct CollectHandler {
27 pub peer_messages: HashMap<ServerId, Vec<PeerMessage>>,
28 pub client_messages: HashMap<ClientId, Vec<ClientResponse>>,
29 pub timeouts: Vec<ConsensusTimeout>,
30 pub clear_timeouts: Vec<ConsensusTimeout>,
31 pub state: ConsensusState,
32}
33
34impl CollectHandler {
35 pub fn new() -> Self {
36 Self {
37 peer_messages: HashMap::new(),
38 client_messages: HashMap::new(),
39 timeouts: Vec::new(),
40 clear_timeouts: Vec::new(),
41 state: ConsensusState::Follower,
42 }
43 }
44
45 pub fn clear(&mut self) {
47 self.peer_messages.clear();
48 self.client_messages.clear();
49 self.timeouts.clear();
50 self.clear_timeouts.clear();
51 }
52}
53
54impl Default for CollectHandler {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl ConsensusHandler for CollectHandler {
61 fn send_peer_message(&mut self, id: ServerId, message: PeerMessage) {
63 let peer = self.peer_messages.entry(id).or_insert_with(Vec::new);
64 peer.push(message);
65 }
66
67 fn send_client_response(&mut self, id: ClientId, message: ClientResponse) {
69 let client = self.client_messages.entry(id).or_insert_with(Vec::new);
70 client.push(message);
71 }
72
73 fn set_timeout(&mut self, timeout: ConsensusTimeout) {
75 if !self.timeouts.iter().any(|&t| t == timeout) {
76 self.timeouts.push(timeout);
77 }
78 }
79
80 fn clear_timeout(&mut self, timeout: ConsensusTimeout) {
81 if !self.clear_timeouts.iter().any(|&t| t == timeout) {
82 self.clear_timeouts.push(timeout);
83 }
84 }
85
86 fn state_changed(&mut self, _old: ConsensusState, new: ConsensusState) {
87 self.state = new
88 }
89}