paxos/
proposer.rs

1use super::*;
2
3use std::collections::HashMap;
4use std::fmt;
5
6#[derive(Eq, PartialEq, Debug, Clone)]
7enum Phase {
8    Propose,
9    Accept,
10}
11
12#[derive(Clone)]
13pub struct Pending {
14    client_addr: String,
15    id: u64,
16    req: Req,
17    new_v: Option<Value>,
18    phase: Phase,
19    waiting_for: Vec<String>,
20    acks_from: Vec<String>,
21    nacks_from: Vec<String>,
22    highest_promise_ballot: Ballot,
23    highest_promise_value: Option<Value>,
24    received_at: SystemTime,
25    cas_failed: Result<(), Error>,
26    has_retried_once: bool,
27}
28
29impl Pending {
30    fn apply_op(&mut self) {
31        match self.req {
32            Req::Get(_) => {
33                self.new_v = self.highest_promise_value.clone();
34            }
35            Req::Del(_) => {
36                self.new_v = None;
37            }
38            Req::Set(_, ref new_v) => {
39                self.new_v = Some(new_v.clone());
40            }
41            Req::Cas(_, ref old_v, ref new_v) => {
42                if *old_v == self.highest_promise_value {
43                    self.new_v = new_v.clone();
44                } else {
45                    self.new_v = self.highest_promise_value.clone();
46                    self.cas_failed = Err(Error::CasFailed(
47                        self.highest_promise_value.clone(),
48                    ));
49                }
50            }
51        }
52    }
53
54    fn transition_to_accept(&mut self, acceptors: Vec<String>) {
55        self.phase = Phase::Accept;
56        self.acks_from = vec![];
57        self.nacks_from = vec![];
58        self.waiting_for = acceptors;
59        self.apply_op();
60    }
61}
62
63impl fmt::Debug for Pending {
64    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
65        write!(
66            f,
67            "Pending {{
68                client_addr: {},
69                id: {},
70                new_v: {:?},
71                phase: {:?},
72                waiting_for: {:?},
73                acks_from: {:?},
74                nacks_from: {:?},
75                highest_promise_ballot: {:?},
76                highest_promise_value: {:?},
77                received_at: {:?},
78                cas_failed: {:?},
79                has_retried_once: {},
80            }}",
81            self.client_addr,
82            self.id,
83            self.new_v,
84            self.phase,
85            self.waiting_for,
86            self.acks_from,
87            self.nacks_from,
88            self.highest_promise_ballot,
89            self.highest_promise_value,
90            self.received_at,
91            self.cas_failed,
92            self.has_retried_once,
93        )
94    }
95}
96
97#[derive(Default, Debug, Clone)]
98pub struct Proposer {
99    accept_acceptors: Vec<String>,
100    propose_acceptors: Vec<String>,
101    ballot_counter: u64,
102    in_flight: HashMap<Ballot, Pending>,
103    timeout: Duration,
104}
105
106impl Proposer {
107    pub fn new(timeout_ms: u64, proposers: Vec<String>) -> Proposer {
108        let mut ret = Proposer::default();
109        ret.accept_acceptors = proposers.clone();
110        ret.propose_acceptors = proposers;
111        ret.timeout = Duration::from_millis(timeout_ms);
112        ret
113    }
114
115    fn bump_ballot(&mut self) -> Ballot {
116        self.ballot_counter += 1;
117        Ballot(self.ballot_counter)
118    }
119
120    fn propose(
121        &mut self,
122        at: SystemTime,
123        from: String,
124        id: u64,
125        req: Req,
126        retry: bool,
127    ) -> Vec<(String, Rpc)> {
128        let ballot = self.bump_ballot();
129        self.in_flight.insert(
130            ballot.clone(),
131            Pending {
132                client_addr: from,
133                id: id,
134                req: req.clone(),
135                new_v: None,
136                phase: Phase::Propose,
137                waiting_for: self.propose_acceptors.clone(),
138                acks_from: vec![],
139                nacks_from: vec![],
140                highest_promise_ballot: Ballot(0),
141                highest_promise_value: None,
142                received_at: at,
143                cas_failed: Ok(()),
144                has_retried_once: retry,
145            },
146        );
147
148        self.propose_acceptors
149            .iter()
150            .map(|a| (a.clone(), ProposeReq(ballot.clone(), req.key())))
151            .collect()
152    }
153}
154
155impl Reactor for Proposer {
156    type Peer = String;
157    type Message = Rpc;
158
159    fn receive(
160        &mut self,
161        at: SystemTime,
162        from: Self::Peer,
163        msg: Self::Message,
164    ) -> Vec<(Self::Peer, Self::Message)> {
165        let mut clear_ballot = None;
166        let mut retry = None;
167        let res = match msg {
168            ClientRequest(id, r) => self.propose(at, from, id, r, false),
169            SetAcceptAcceptors(sas) => {
170                self.accept_acceptors = sas;
171                vec![]
172            }
173            SetProposeAcceptors(sas) => {
174                self.propose_acceptors = sas;
175                vec![]
176            }
177            ProposeRes {
178                req_ballot,
179                last_accepted_ballot,
180                last_accepted_value,
181                res,
182            } => {
183                if !self.in_flight.contains_key(&req_ballot) {
184                    // we've already moved on
185                    return vec![];
186                }
187
188                let mut pending = self.in_flight.get_mut(&req_ballot).unwrap();
189
190                if pending.phase != Phase::Propose {
191                    // we've already moved on
192                    return vec![];
193                }
194
195                assert!(
196                    !pending.acks_from.contains(&from)
197                        && !pending.nacks_from.contains(&from),
198                    "somehow got a response from this peer already... \
199                    we don't do retries in this game yet!"
200                );
201
202                assert!(
203                    pending.waiting_for.contains(&from),
204                    "somehow got a response from someone we didn't send \
205                    a request to... maybe the network is funky and we \
206                    should use a higher level identifier to identify them \
207                    than their network address."
208                );
209
210                let majority = (pending.waiting_for.len() / 2) + 1;
211
212                match res {
213                    Err(Error::ProposalRejected { ref last }) => {
214                        // some nerd didn't like our request...
215                        if self.ballot_counter < last.0 {
216                            self.ballot_counter = last.0;
217                        }
218
219                        pending.nacks_from.push(from);
220
221                        if pending.nacks_from.len() >= majority {
222                            clear_ballot = Some(req_ballot.clone());
223
224                            if !pending.has_retried_once {
225                                retry = Some((
226                                    pending.received_at,
227                                    pending.client_addr.clone(),
228                                    pending.id,
229                                    pending.req.clone(),
230                                ));
231                                vec![]
232                            } else {
233                                vec![(
234                                    pending.client_addr.clone(),
235                                    ClientResponse(
236                                        pending.id,
237                                        Err(Error::ProposalRejected {
238                                            last: last.clone(),
239                                        }),
240                                    ),
241                                )]
242                            }
243                        } else {
244                            // still waiting for a majority of positive responses
245                            vec![]
246                        }
247                    }
248                    Ok(()) => {
249                        assert!(
250                            req_ballot.0 > pending.highest_promise_ballot.0,
251                            "somehow the acceptor promised us a vote for our ballot {:?} \
252                            even though their highest promise ballot of {:?} \
253                            is higher than our request...",
254                            req_ballot.0,
255                            pending.highest_promise_ballot.0
256                        );
257                        pending.acks_from.push(from);
258
259                        if last_accepted_ballot > pending.highest_promise_ballot
260                        {
261                            pending.highest_promise_ballot =
262                                last_accepted_ballot;
263                            pending.highest_promise_value = last_accepted_value;
264                        }
265
266                        if pending.acks_from.len() >= majority {
267                            // transition to ACCEPT phase
268                            // NB assumption: we use CURRENT acceptor list,
269                            // rather than the acceptor list when we received
270                            // the client request. need to think on this more.
271                            pending.transition_to_accept(
272                                self.accept_acceptors.clone(),
273                            );
274
275                            pending
276                                .waiting_for
277                                .iter()
278                                .map(|a| {
279                                    (
280                                        a.clone(),
281                                        AcceptReq(
282                                            req_ballot.clone(),
283                                            pending.req.key(),
284                                            pending.new_v.clone(),
285                                        ),
286                                    )
287                                })
288                                .collect()
289                        } else {
290                            // still waiting for promises
291                            vec![]
292                        }
293                    }
294                    other => panic!("got unhandled ProposeRes: {:?}", other),
295                }
296            }
297            AcceptRes(ballot, res) => {
298                if !self.in_flight.contains_key(&ballot) {
299                    // we've already moved on
300                    return vec![];
301                }
302
303                let pending = self.in_flight.get_mut(&ballot).unwrap();
304
305                assert_eq!(
306                    pending.phase,
307                    Phase::Accept,
308                    "somehow we went back in time and became a proposal..."
309                );
310
311                assert!(
312                    !pending.acks_from.contains(&from)
313                        && !pending.nacks_from.contains(&from),
314                    "somehow got a response from this peer already... \
315                    we don't do retries in this game yet!"
316                );
317
318                assert!(
319                    pending.waiting_for.contains(&from),
320                    "somehow got a response from someone we didn't send \
321                    a request to... maybe the network is funky and we \
322                    should use a higher level identifier to identify them \
323                    than their network address."
324                );
325
326                let majority = (pending.waiting_for.len() / 2) + 1;
327
328                match res {
329                    Err(Error::AcceptRejected { ref last }) => {
330                        // some nerd didn't like our request...
331                        if self.ballot_counter < last.0 {
332                            self.ballot_counter = last.0;
333                        }
334
335                        pending.nacks_from.push(from);
336
337                        if pending.nacks_from.len() >= majority {
338                            clear_ballot = Some(ballot);
339                            vec![(
340                                pending.client_addr.clone(),
341                                ClientResponse(
342                                    pending.id,
343                                    Err(Error::AcceptRejected {
344                                        last: last.clone(),
345                                    }),
346                                ),
347                            )]
348                        } else {
349                            vec![]
350                        }
351                    }
352                    Ok(()) => {
353                        pending.acks_from.push(from);
354
355                        if pending.acks_from.len() >= majority {
356                            // respond favorably to the client and nuke pending
357                            clear_ballot = Some(ballot);
358                            vec![(
359                                pending.client_addr.clone(),
360                                ClientResponse(
361                                    pending.id,
362                                    pending
363                                        .cas_failed
364                                        .clone()
365                                        .map(|_| pending.new_v.clone()),
366                                ),
367                            )]
368                        } else {
369                            // still waiting for acceptances
370                            vec![]
371                        }
372                    }
373                    other => panic!("got unhandled AcceptRes: {:?}", other),
374                }
375            }
376            other => panic!("proposer got unhandled rpc: {:?}", other),
377        };
378
379        if let Some(ballot) = clear_ballot.take() {
380            self.in_flight.remove(&ballot);
381        }
382
383        if let Some((received_at, client_addr, id, req)) = retry {
384            self.propose(received_at, client_addr, id, req, true)
385        } else {
386            res
387        }
388    }
389
390    // we use tick to handle timeouts
391    fn tick(&mut self, at: SystemTime) -> Vec<(Self::Peer, Self::Message)> {
392        let ret = {
393            let late = self.in_flight.values().filter(|i| {
394                at.duration_since(i.received_at).unwrap() > self.timeout
395            });
396
397            late.map(|pending| {
398                (
399                    pending.client_addr.clone(),
400                    ClientResponse(pending.id, Err(Error::Timeout)),
401                )
402            })
403            .collect()
404        };
405
406        let timeout = self.timeout.clone();
407
408        self.in_flight.retain(|_, i| {
409            at.duration_since(i.received_at).unwrap() <= timeout
410        });
411
412        ret
413    }
414}