1use super::*;
2
3use std::collections::HashMap;
4use std::fmt;
5
/// Stage of the two-step exchange that a pending request is currently in.
#[derive(Clone, Debug, PartialEq, Eq)]
enum Phase {
    /// Propose requests have been sent; propose responses are being collected.
    Propose,
    /// Accept requests have been sent; accept responses are being collected.
    Accept,
}
11
12#[derive(Clone)]
13pub struct Pending {
14 client_addr: String,
15 id: u64,
16 req: Req,
17 new_v: Option<Value>,
18 phase: Phase,
19 waiting_for: Vec<String>,
20 acks_from: Vec<String>,
21 nacks_from: Vec<String>,
22 highest_promise_ballot: Ballot,
23 highest_promise_value: Option<Value>,
24 received_at: SystemTime,
25 cas_failed: Result<(), Error>,
26 has_retried_once: bool,
27}
28
29impl Pending {
30 fn apply_op(&mut self) {
31 match self.req {
32 Req::Get(_) => {
33 self.new_v = self.highest_promise_value.clone();
34 }
35 Req::Del(_) => {
36 self.new_v = None;
37 }
38 Req::Set(_, ref new_v) => {
39 self.new_v = Some(new_v.clone());
40 }
41 Req::Cas(_, ref old_v, ref new_v) => {
42 if *old_v == self.highest_promise_value {
43 self.new_v = new_v.clone();
44 } else {
45 self.new_v = self.highest_promise_value.clone();
46 self.cas_failed = Err(Error::CasFailed(
47 self.highest_promise_value.clone(),
48 ));
49 }
50 }
51 }
52 }
53
54 fn transition_to_accept(&mut self, acceptors: Vec<String>) {
55 self.phase = Phase::Accept;
56 self.acks_from = vec![];
57 self.nacks_from = vec![];
58 self.waiting_for = acceptors;
59 self.apply_op();
60 }
61}
62
63impl fmt::Debug for Pending {
64 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
65 write!(
66 f,
67 "Pending {{
68 client_addr: {},
69 id: {},
70 new_v: {:?},
71 phase: {:?},
72 waiting_for: {:?},
73 acks_from: {:?},
74 nacks_from: {:?},
75 highest_promise_ballot: {:?},
76 highest_promise_value: {:?},
77 received_at: {:?},
78 cas_failed: {:?},
79 has_retried_once: {},
80 }}",
81 self.client_addr,
82 self.id,
83 self.new_v,
84 self.phase,
85 self.waiting_for,
86 self.acks_from,
87 self.nacks_from,
88 self.highest_promise_ballot,
89 self.highest_promise_value,
90 self.received_at,
91 self.cas_failed,
92 self.has_retried_once,
93 )
94 }
95}
96
97#[derive(Default, Debug, Clone)]
98pub struct Proposer {
99 accept_acceptors: Vec<String>,
100 propose_acceptors: Vec<String>,
101 ballot_counter: u64,
102 in_flight: HashMap<Ballot, Pending>,
103 timeout: Duration,
104}
105
106impl Proposer {
107 pub fn new(timeout_ms: u64, proposers: Vec<String>) -> Proposer {
108 let mut ret = Proposer::default();
109 ret.accept_acceptors = proposers.clone();
110 ret.propose_acceptors = proposers;
111 ret.timeout = Duration::from_millis(timeout_ms);
112 ret
113 }
114
115 fn bump_ballot(&mut self) -> Ballot {
116 self.ballot_counter += 1;
117 Ballot(self.ballot_counter)
118 }
119
120 fn propose(
121 &mut self,
122 at: SystemTime,
123 from: String,
124 id: u64,
125 req: Req,
126 retry: bool,
127 ) -> Vec<(String, Rpc)> {
128 let ballot = self.bump_ballot();
129 self.in_flight.insert(
130 ballot.clone(),
131 Pending {
132 client_addr: from,
133 id: id,
134 req: req.clone(),
135 new_v: None,
136 phase: Phase::Propose,
137 waiting_for: self.propose_acceptors.clone(),
138 acks_from: vec![],
139 nacks_from: vec![],
140 highest_promise_ballot: Ballot(0),
141 highest_promise_value: None,
142 received_at: at,
143 cas_failed: Ok(()),
144 has_retried_once: retry,
145 },
146 );
147
148 self.propose_acceptors
149 .iter()
150 .map(|a| (a.clone(), ProposeReq(ballot.clone(), req.key())))
151 .collect()
152 }
153}
154
155impl Reactor for Proposer {
156 type Peer = String;
157 type Message = Rpc;
158
159 fn receive(
160 &mut self,
161 at: SystemTime,
162 from: Self::Peer,
163 msg: Self::Message,
164 ) -> Vec<(Self::Peer, Self::Message)> {
165 let mut clear_ballot = None;
166 let mut retry = None;
167 let res = match msg {
168 ClientRequest(id, r) => self.propose(at, from, id, r, false),
169 SetAcceptAcceptors(sas) => {
170 self.accept_acceptors = sas;
171 vec![]
172 }
173 SetProposeAcceptors(sas) => {
174 self.propose_acceptors = sas;
175 vec![]
176 }
177 ProposeRes {
178 req_ballot,
179 last_accepted_ballot,
180 last_accepted_value,
181 res,
182 } => {
183 if !self.in_flight.contains_key(&req_ballot) {
184 return vec![];
186 }
187
188 let mut pending = self.in_flight.get_mut(&req_ballot).unwrap();
189
190 if pending.phase != Phase::Propose {
191 return vec![];
193 }
194
195 assert!(
196 !pending.acks_from.contains(&from)
197 && !pending.nacks_from.contains(&from),
198 "somehow got a response from this peer already... \
199 we don't do retries in this game yet!"
200 );
201
202 assert!(
203 pending.waiting_for.contains(&from),
204 "somehow got a response from someone we didn't send \
205 a request to... maybe the network is funky and we \
206 should use a higher level identifier to identify them \
207 than their network address."
208 );
209
210 let majority = (pending.waiting_for.len() / 2) + 1;
211
212 match res {
213 Err(Error::ProposalRejected { ref last }) => {
214 if self.ballot_counter < last.0 {
216 self.ballot_counter = last.0;
217 }
218
219 pending.nacks_from.push(from);
220
221 if pending.nacks_from.len() >= majority {
222 clear_ballot = Some(req_ballot.clone());
223
224 if !pending.has_retried_once {
225 retry = Some((
226 pending.received_at,
227 pending.client_addr.clone(),
228 pending.id,
229 pending.req.clone(),
230 ));
231 vec![]
232 } else {
233 vec![(
234 pending.client_addr.clone(),
235 ClientResponse(
236 pending.id,
237 Err(Error::ProposalRejected {
238 last: last.clone(),
239 }),
240 ),
241 )]
242 }
243 } else {
244 vec![]
246 }
247 }
248 Ok(()) => {
249 assert!(
250 req_ballot.0 > pending.highest_promise_ballot.0,
251 "somehow the acceptor promised us a vote for our ballot {:?} \
252 even though their highest promise ballot of {:?} \
253 is higher than our request...",
254 req_ballot.0,
255 pending.highest_promise_ballot.0
256 );
257 pending.acks_from.push(from);
258
259 if last_accepted_ballot > pending.highest_promise_ballot
260 {
261 pending.highest_promise_ballot =
262 last_accepted_ballot;
263 pending.highest_promise_value = last_accepted_value;
264 }
265
266 if pending.acks_from.len() >= majority {
267 pending.transition_to_accept(
272 self.accept_acceptors.clone(),
273 );
274
275 pending
276 .waiting_for
277 .iter()
278 .map(|a| {
279 (
280 a.clone(),
281 AcceptReq(
282 req_ballot.clone(),
283 pending.req.key(),
284 pending.new_v.clone(),
285 ),
286 )
287 })
288 .collect()
289 } else {
290 vec![]
292 }
293 }
294 other => panic!("got unhandled ProposeRes: {:?}", other),
295 }
296 }
297 AcceptRes(ballot, res) => {
298 if !self.in_flight.contains_key(&ballot) {
299 return vec![];
301 }
302
303 let pending = self.in_flight.get_mut(&ballot).unwrap();
304
305 assert_eq!(
306 pending.phase,
307 Phase::Accept,
308 "somehow we went back in time and became a proposal..."
309 );
310
311 assert!(
312 !pending.acks_from.contains(&from)
313 && !pending.nacks_from.contains(&from),
314 "somehow got a response from this peer already... \
315 we don't do retries in this game yet!"
316 );
317
318 assert!(
319 pending.waiting_for.contains(&from),
320 "somehow got a response from someone we didn't send \
321 a request to... maybe the network is funky and we \
322 should use a higher level identifier to identify them \
323 than their network address."
324 );
325
326 let majority = (pending.waiting_for.len() / 2) + 1;
327
328 match res {
329 Err(Error::AcceptRejected { ref last }) => {
330 if self.ballot_counter < last.0 {
332 self.ballot_counter = last.0;
333 }
334
335 pending.nacks_from.push(from);
336
337 if pending.nacks_from.len() >= majority {
338 clear_ballot = Some(ballot);
339 vec![(
340 pending.client_addr.clone(),
341 ClientResponse(
342 pending.id,
343 Err(Error::AcceptRejected {
344 last: last.clone(),
345 }),
346 ),
347 )]
348 } else {
349 vec![]
350 }
351 }
352 Ok(()) => {
353 pending.acks_from.push(from);
354
355 if pending.acks_from.len() >= majority {
356 clear_ballot = Some(ballot);
358 vec![(
359 pending.client_addr.clone(),
360 ClientResponse(
361 pending.id,
362 pending
363 .cas_failed
364 .clone()
365 .map(|_| pending.new_v.clone()),
366 ),
367 )]
368 } else {
369 vec![]
371 }
372 }
373 other => panic!("got unhandled AcceptRes: {:?}", other),
374 }
375 }
376 other => panic!("proposer got unhandled rpc: {:?}", other),
377 };
378
379 if let Some(ballot) = clear_ballot.take() {
380 self.in_flight.remove(&ballot);
381 }
382
383 if let Some((received_at, client_addr, id, req)) = retry {
384 self.propose(received_at, client_addr, id, req, true)
385 } else {
386 res
387 }
388 }
389
390 fn tick(&mut self, at: SystemTime) -> Vec<(Self::Peer, Self::Message)> {
392 let ret = {
393 let late = self.in_flight.values().filter(|i| {
394 at.duration_since(i.received_at).unwrap() > self.timeout
395 });
396
397 late.map(|pending| {
398 (
399 pending.client_addr.clone(),
400 ClientResponse(pending.id, Err(Error::Timeout)),
401 )
402 })
403 .collect()
404 };
405
406 let timeout = self.timeout.clone();
407
408 self.in_flight.retain(|_, i| {
409 at.duration_since(i.received_at).unwrap() <= timeout
410 });
411
412 ret
413 }
414}