paxos_rust/
proposer.rs

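//! The Paxos `Proposer` role: starts proposals (`prepare`), collects
//! `Promise` replies, issues `Accept` requests and tracks `Accepted` replies.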
2
3use message::{AcceptData, AcceptedData, Message, Messenger, PromiseData, ProposalData};
4use std::collections::{HashMap, HashSet};
5use std::hash::Hash;
6use std::sync::Arc;
7
/// A Proposer advocates a client request, attempting to convince the Acceptors
/// to agree on it, and acting as a coordinator to move the protocol forward
/// when conflicts occur.
///
/// The two bookkeeping maps are keyed by proposal number; `prepare` creates an
/// empty entry in each of them for every new proposal.
pub struct Proposer<T> {
    /// `Proposer`'s ID
    pub id: u64,
    /// `Messenger` specifying communication with other nodes.
    /// When `None`, no messages are sent and no resolution is reported.
    pub messenger: Option<Box<Messenger<T>>>,
    /// The proposed value. Set by `prepare`; `accept` may replace it with a
    /// value carried by a received promise.
    pub value: Option<Arc<T>>,
    /// The number of this proposer's current proposal. Starts at 0 in
    /// `Default` and is incremented by every call to `prepare`.
    pub proposal_n: u64,
    /// The number of the last proposal for which `receive_accepted` recorded
    /// a resolution.
    pub last_accepted_n: u64,
    /// Promises received (proposal_n => data)
    pub promises_received: HashMap<u64, HashSet<PromiseData<T>>>,
    /// Accepted messages received (proposal_n => data)
    pub accepted_received: HashMap<u64, HashSet<AcceptedData<T>>>,
    /// The minimum number of `Acceptor`s needed to continue. It is compared
    /// against the number of distinct entries stored for the current proposal.
    pub quorum: u8,
}
29
30impl<T: 'static> Proposer<T>
31where
32    T: Eq + Hash + Clone,
33{
34    /// Creates a new `Proposer`.
35    pub fn new(id: u64, quorum: u8) -> Self {
36        Self {
37            id,
38            quorum,
39            ..Self::default()
40        }
41    }
42
43    /// The first phase. Creates a proposal.
44    pub fn prepare(&mut self, value: T) {
45        self.value = Some(Arc::new(value));
46        self.proposal_n += 1;
47        self.promises_received
48            .insert(self.proposal_n, HashSet::new());
49        self.accepted_received
50            .insert(self.proposal_n, HashSet::new());
51        let prepare = Message::Prepare(ProposalData { id: self.id });
52
53        if let Some(ref mut messenger) = self.messenger {
54            messenger.send_prepare(prepare);
55        }
56    }
57
58    /// Receives a `Promise` message from an `Acceptor`.
59    pub fn receive_promise(&mut self, msg: Message<T>) {
60        if let Message::Promise(data) = msg {
61            self.promises_received
62                .get_mut(&data.id)
63                .unwrap()
64                .insert(data);
65
66            if self.promises_received.get(&self.proposal_n).unwrap().len() == self.quorum as usize {
67                self.accept();
68            }
69        }
70    }
71
72    /// The second phase. Sets a value for the proposal, and builds an `Accept` request.
73    pub fn accept(&mut self) {
74        let mut promises_vec = self
75            .promises_received
76            .get_mut(&self.proposal_n)
77            .unwrap()
78            .iter()
79            .filter(|p| p.value.is_some())
80            .collect::<Vec<_>>();
81
82        promises_vec.sort_by(|a, b| b.id.cmp(&a.id));
83
84        self.value = Some({
85            if promises_vec.len() == 0 {
86                self.value.clone().unwrap()
87            } else {
88                promises_vec[0].value.clone().unwrap()
89            }
90        });
91        let msg = Message::Accept(AcceptData {
92            id: self.proposal_n,
93            value: self.value.clone().unwrap(),
94        });
95
96        if let Some(ref mut messenger) = self.messenger {
97            messenger.send_accept(msg);
98        }
99    }
100
101    /// Receives an `Accepted` message from an `Acceptor`.
102    pub fn receive_accepted(&mut self, msg: Message<T>) {
103        if let Message::Accepted(data) = msg {
104            self.accepted_received
105                .get_mut(&data.id)
106                .unwrap()
107                .insert(data);
108
109            if self.accepted_received.get(&self.proposal_n).unwrap().len() == self.quorum as usize {
110                if let Some(ref mut messenger) = self.messenger {
111                    self.last_accepted_n = self.proposal_n;
112                    messenger.on_resolution(self.proposal_n, self.value.clone().unwrap());
113                }
114            }
115        }
116    }
117}
118
119impl<T> Default for Proposer<T> {
120    fn default() -> Self {
121        Self {
122            id: 1,
123            quorum: 7,
124            value: None,
125            messenger: None,
126            proposal_n: 0,
127            last_accepted_n: 0,
128            promises_received: HashMap::new(),
129            accepted_received: HashMap::new(),
130        }
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields the documented initial state.
    #[test]
    fn proposer_default() {
        let p: Proposer<u64> = Proposer::default();

        assert_eq!(p.id, 1);
        assert_eq!(p.proposal_n, 0);
        assert_eq!(p.last_accepted_n, 0);
        assert_eq!(p.value, None);
        assert!(p.messenger.is_none());
        assert_eq!(p.promises_received.len(), 0);
        assert_eq!(p.accepted_received.len(), 0);
        assert_eq!(p.quorum, 7);
    }

    /// `new` applies the given ID and quorum and defaults everything else.
    #[test]
    fn proposer_new() {
        let p: Proposer<u64> = Proposer::new(3, 2);

        assert_eq!(p.id, 3);
        assert_eq!(p.quorum, 2);
        assert_eq!(p.proposal_n, 0);
        assert_eq!(p.last_accepted_n, 0);
        assert_eq!(p.value, None);
        assert!(p.messenger.is_none());
        assert!(p.promises_received.is_empty());
        assert!(p.accepted_received.is_empty());
    }

    /// `prepare` stores the value, bumps the proposal number and opens empty
    /// bookkeeping sets for the new proposal.
    #[test]
    fn proposer_prepare() {
        let mut p: Proposer<u64> = Proposer::default();

        p.prepare(60);

        assert_eq!(p.proposal_n, 1);
        assert_eq!(p.value, Some(Arc::new(60)));
        assert_eq!(p.promises_received.len(), 1);
        assert_eq!(p.accepted_received.len(), 1);
        assert!(p.promises_received[&1].is_empty());
        assert!(p.accepted_received[&1].is_empty());
    }

    /// A received promise is stored under its proposal number.
    #[test]
    fn proposer_receive_promise() {
        let mut p: Proposer<u64> = Proposer::default();

        p.prepare(60);

        let msg = Message::Promise(PromiseData {
            id: 1,
            value: None,
            from: 2,
        });

        p.receive_promise(msg);

        assert_eq!(p.promises_received.len(), 1);
        assert!(p.promises_received.get(&1).is_some());
        // The key alone already exists after `prepare`; check that the
        // promise itself was recorded.
        assert_eq!(p.promises_received[&1].len(), 1);
    }

    /// `accept` keeps the proposer's value unless a promise carries one.
    #[test]
    fn proposer_accept() {
        let mut p: Proposer<u64> = Proposer::default();

        p.prepare(60);

        // Receive a Promise without a value: the proposer's own value stays.
        let msg = Message::Promise(PromiseData {
            id: 1,
            value: None,
            from: 2,
        });

        p.receive_promise(msg);

        p.accept();

        assert_eq!(p.value, Some(Arc::new(60)));

        // Receive another Promise that has an existing value for that
        // proposal: that value must be adopted.
        let msg = Message::Promise(PromiseData {
            id: 1,
            value: Some(Arc::new(25)),
            from: 2,
        });

        p.receive_promise(msg);

        p.accept();

        assert_eq!(p.value, Some(Arc::new(25)));
    }

    /// A received `Accepted` message is stored under its proposal number.
    #[test]
    fn proposer_receive_accepted() {
        let mut p: Proposer<u64> = Proposer::default();

        p.prepare(60);

        let msg = Message::Accepted(AcceptedData {
            id: 1,
            value: Arc::new(60),
            from: 2,
        });

        p.receive_accepted(msg);

        assert_eq!(p.accepted_received.len(), 1);
        assert!(p.accepted_received.get(&1).is_some());
        // The key alone already exists after `prepare`; check that the
        // message itself was recorded.
        assert_eq!(p.accepted_received[&1].len(), 1);
    }
}