1use message::{AcceptData, AcceptedData, Message, Messenger, PromiseData, ProposalData};
4use std::collections::{HashMap, HashSet};
5use std::hash::Hash;
6use std::sync::Arc;
7
8pub struct Proposer<T> {
12 pub id: u64,
14 pub messenger: Option<Box<Messenger<T>>>,
16 pub value: Option<Arc<T>>,
18 pub proposal_n: u64,
20 pub last_accepted_n: u64,
22 pub promises_received: HashMap<u64, HashSet<PromiseData<T>>>,
24 pub accepted_received: HashMap<u64, HashSet<AcceptedData<T>>>,
26 pub quorum: u8,
28}
29
30impl<T: 'static> Proposer<T>
31where
32 T: Eq + Hash + Clone,
33{
34 pub fn new(id: u64, quorum: u8) -> Self {
36 Self {
37 id,
38 quorum,
39 ..Self::default()
40 }
41 }
42
43 pub fn prepare(&mut self, value: T) {
45 self.value = Some(Arc::new(value));
46 self.proposal_n += 1;
47 self.promises_received
48 .insert(self.proposal_n, HashSet::new());
49 self.accepted_received
50 .insert(self.proposal_n, HashSet::new());
51 let prepare = Message::Prepare(ProposalData { id: self.id });
52
53 if let Some(ref mut messenger) = self.messenger {
54 messenger.send_prepare(prepare);
55 }
56 }
57
58 pub fn receive_promise(&mut self, msg: Message<T>) {
60 if let Message::Promise(data) = msg {
61 self.promises_received
62 .get_mut(&data.id)
63 .unwrap()
64 .insert(data);
65
66 if self.promises_received.get(&self.proposal_n).unwrap().len() == self.quorum as usize {
67 self.accept();
68 }
69 }
70 }
71
72 pub fn accept(&mut self) {
74 let mut promises_vec = self
75 .promises_received
76 .get_mut(&self.proposal_n)
77 .unwrap()
78 .iter()
79 .filter(|p| p.value.is_some())
80 .collect::<Vec<_>>();
81
82 promises_vec.sort_by(|a, b| b.id.cmp(&a.id));
83
84 self.value = Some({
85 if promises_vec.len() == 0 {
86 self.value.clone().unwrap()
87 } else {
88 promises_vec[0].value.clone().unwrap()
89 }
90 });
91 let msg = Message::Accept(AcceptData {
92 id: self.proposal_n,
93 value: self.value.clone().unwrap(),
94 });
95
96 if let Some(ref mut messenger) = self.messenger {
97 messenger.send_accept(msg);
98 }
99 }
100
101 pub fn receive_accepted(&mut self, msg: Message<T>) {
103 if let Message::Accepted(data) = msg {
104 self.accepted_received
105 .get_mut(&data.id)
106 .unwrap()
107 .insert(data);
108
109 if self.accepted_received.get(&self.proposal_n).unwrap().len() == self.quorum as usize {
110 if let Some(ref mut messenger) = self.messenger {
111 self.last_accepted_n = self.proposal_n;
112 messenger.on_resolution(self.proposal_n, self.value.clone().unwrap());
113 }
114 }
115 }
116 }
117}
118
119impl<T> Default for Proposer<T> {
120 fn default() -> Self {
121 Self {
122 id: 1,
123 quorum: 7,
124 value: None,
125 messenger: None,
126 proposal_n: 0,
127 last_accepted_n: 0,
128 promises_received: HashMap::new(),
129 accepted_received: HashMap::new(),
130 }
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a default proposer that has already run `prepare(60)`.
    fn prepared() -> Proposer<u64> {
        let mut proposer: Proposer<u64> = Proposer::default();
        proposer.prepare(60);
        proposer
    }

    /// Builds a promise with `id: 1` and `from: 2` carrying `value`.
    fn promise_with(value: Option<u64>) -> Message<u64> {
        Message::Promise(PromiseData {
            id: 1,
            value: value.map(Arc::new),
            from: 2,
        })
    }

    /// A default proposer starts with the documented initial state.
    #[test]
    fn proposer_new() {
        let proposer: Proposer<u64> = Proposer::default();

        assert_eq!(proposer.id, 1);
        assert_eq!(proposer.proposal_n, 0);
        assert_eq!(proposer.value, None);
        assert!(proposer.messenger.is_none());
        assert_eq!(proposer.promises_received.len(), 0);
        assert_eq!(proposer.accepted_received.len(), 0);
        assert_eq!(proposer.quorum, 7);
    }

    /// `prepare` bumps the round number and stores the proposed value.
    #[test]
    fn proposer_prepare() {
        let proposer = prepared();

        assert_eq!(proposer.proposal_n, 1);
        assert_eq!(proposer.value, Some(Arc::new(60)));
    }

    /// A promise for the current round is filed under that round.
    #[test]
    fn proposer_receive_promise() {
        let mut proposer = prepared();

        proposer.receive_promise(promise_with(None));

        assert_eq!(proposer.promises_received.len(), 1);
        assert!(proposer.promises_received.get(&1).is_some());
    }

    /// `accept` keeps the own value unless a promise carries one.
    #[test]
    fn proposer_accept() {
        let mut proposer = prepared();

        // A promise without a value leaves the proposed value untouched.
        proposer.receive_promise(promise_with(None));
        proposer.accept();
        assert_eq!(proposer.value, Some(Arc::new(60)));

        // A promise carrying a value overrides the proposed value.
        proposer.receive_promise(promise_with(Some(25)));
        proposer.accept();
        assert_eq!(proposer.value, Some(Arc::new(25)));
    }

    /// An accepted message for the current round is filed under that round.
    #[test]
    fn proposer_receive_accepted() {
        let mut proposer = prepared();

        let accepted = Message::Accepted(AcceptedData {
            id: 1,
            value: Arc::new(60),
            from: 2,
        });
        proposer.receive_accepted(accepted);

        assert_eq!(proposer.accepted_received.len(), 1);
        assert!(proposer.accepted_received.get(&1).is_some());
    }
}