viewstamped_replication/
buffer.rs

1use crate::mail::{Inbox, Mailbox, Outbox};
2use crate::protocol::{
3    Commit, DoViewChange, GetState, NewState, Prepare, PrepareOk, Recovery, RecoveryResponse,
4    StartView, StartViewChange,
5};
6use crate::request::{ClientIdentifier, Reply};
7use crate::service::Protocol;
8use std::collections::VecDeque;
9use std::fmt::{Debug, Formatter};
10use std::iter::FusedIterator;
11
12pub struct Envelope<D, P> {
13    pub destination: D,
14    pub payload: P,
15}
16
17#[derive(Eq, PartialEq)]
18pub enum ProtocolPayload<P>
19where
20    P: Protocol,
21{
22    Prepare(Prepare<P::Request, P::Prediction>),
23    PrepareOk(PrepareOk),
24    Commit(Commit),
25    GetState(GetState),
26    NewState(NewState<P::Request, P::Prediction>),
27    StartViewChange(StartViewChange),
28    DoViewChange(DoViewChange<P::Request, P::Prediction>),
29    StartView(StartView<P::Request, P::Prediction>),
30    Recovery(Recovery),
31    RecoveryResponse(RecoveryResponse<P::Request, P::Prediction>),
32}
33
34impl<P> Clone for ProtocolPayload<P>
35where
36    P: Protocol,
37{
38    fn clone(&self) -> Self {
39        match self {
40            ProtocolPayload::Prepare(message) => Self::Prepare(message.clone()),
41            ProtocolPayload::PrepareOk(message) => Self::PrepareOk(message.clone()),
42            ProtocolPayload::Commit(message) => Self::Commit(message.clone()),
43            ProtocolPayload::GetState(message) => Self::GetState(message.clone()),
44            ProtocolPayload::NewState(message) => Self::NewState(message.clone()),
45            ProtocolPayload::StartViewChange(message) => Self::StartViewChange(message.clone()),
46            ProtocolPayload::DoViewChange(message) => Self::DoViewChange(message.clone()),
47            ProtocolPayload::StartView(message) => Self::StartView(message.clone()),
48            ProtocolPayload::Recovery(message) => Self::Recovery(message.clone()),
49            ProtocolPayload::RecoveryResponse(message) => Self::RecoveryResponse(message.clone()),
50        }
51    }
52}
53
54impl<P, Req, Pre> Debug for ProtocolPayload<P>
55where
56    P: Protocol<Request = Req, Prediction = Pre>,
57    Req: Debug,
58    Pre: Debug,
59{
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        match self {
62            ProtocolPayload::Prepare(message) => write!(f, "{message:?}"),
63            ProtocolPayload::PrepareOk(message) => write!(f, "{message:?}"),
64            ProtocolPayload::Commit(message) => write!(f, "{message:?}"),
65            ProtocolPayload::GetState(message) => write!(f, "{message:?}"),
66            ProtocolPayload::NewState(message) => write!(f, "{message:?}"),
67            ProtocolPayload::StartViewChange(message) => write!(f, "{message:?}"),
68            ProtocolPayload::DoViewChange(message) => write!(f, "{message:?}"),
69            ProtocolPayload::StartView(message) => write!(f, "{message:?}"),
70            ProtocolPayload::Recovery(message) => write!(f, "{message:?}"),
71            ProtocolPayload::RecoveryResponse(message) => write!(f, "{message:?}"),
72        }
73    }
74}
75
76impl<P> ProtocolPayload<P>
77where
78    P: Protocol,
79{
80    pub fn unwrap_prepare(self) -> Prepare<P::Request, P::Prediction> {
81        let Self::Prepare(message) = self else {
82            panic!("called `ProtocolPayload::unwrap_prepare` on a unsupported variant",)
83        };
84        message
85    }
86
87    pub fn unwrap_prepare_ok(self) -> PrepareOk {
88        let Self::PrepareOk(message) = self else {
89            panic!("called `ProtocolPayload::unwrap_prepare_ok` on a unsupported variant",)
90        };
91        message
92    }
93
94    pub fn unwrap_commit(self) -> Commit {
95        let Self::Commit(message) = self else {
96            panic!("called `ProtocolPayload::unwrap_commit` on a unsupported variant",)
97        };
98        message
99    }
100
101    pub fn unwrap_get_state(self) -> GetState {
102        let Self::GetState(message) = self else {
103            panic!("called `ProtocolPayload::unwrap_get_state` on a unsupported variant",)
104        };
105        message
106    }
107}
108
109pub struct BufferedMailbox<P>
110where
111    P: Protocol,
112{
113    inbound: VecDeque<ProtocolPayload<P>>,
114    replies: VecDeque<Envelope<ClientIdentifier, Reply<P::Reply>>>,
115    send: VecDeque<Envelope<usize, ProtocolPayload<P>>>,
116    broadcast: VecDeque<ProtocolPayload<P>>,
117}
118
119impl<P> Default for BufferedMailbox<P>
120where
121    P: Protocol,
122{
123    fn default() -> Self {
124        Self {
125            inbound: Default::default(),
126            replies: Default::default(),
127            send: Default::default(),
128            broadcast: Default::default(),
129        }
130    }
131}
132
133impl<P> BufferedMailbox<P>
134where
135    P: Protocol,
136{
137    pub fn is_empty(&self) -> bool {
138        self.inbound.is_empty()
139            && self.replies.is_empty()
140            && self.send.is_empty()
141            && self.broadcast.is_empty()
142    }
143
144    pub fn pop_inbound(&mut self) -> Option<ProtocolPayload<P>> {
145        self.inbound.pop_front()
146    }
147
148    pub fn drain_inbound(
149        &mut self,
150    ) -> impl DoubleEndedIterator<Item = ProtocolPayload<P>> + ExactSizeIterator + FusedIterator + '_
151    {
152        self.inbound.drain(..)
153    }
154
155    pub fn drain_replies(
156        &mut self,
157    ) -> impl DoubleEndedIterator<Item = Envelope<ClientIdentifier, Reply<P::Reply>>>
158           + ExactSizeIterator
159           + FusedIterator
160           + '_ {
161        self.replies.drain(..)
162    }
163
164    pub fn drain_send(
165        &mut self,
166    ) -> impl DoubleEndedIterator<Item = Envelope<usize, ProtocolPayload<P>>>
167           + ExactSizeIterator
168           + FusedIterator
169           + '_ {
170        self.send.drain(..)
171    }
172
173    pub fn drain_broadcast(
174        &mut self,
175    ) -> impl DoubleEndedIterator<Item = ProtocolPayload<P>> + ExactSizeIterator + FusedIterator + '_
176    {
177        self.broadcast.drain(..)
178    }
179}
180
181impl<P> Outbox<P> for BufferedMailbox<P>
182where
183    P: Protocol,
184{
185    fn prepare(&mut self, message: Prepare<P::Request, P::Prediction>) {
186        self.broadcast.push_back(ProtocolPayload::Prepare(message));
187    }
188
189    fn prepare_ok(&mut self, index: usize, message: PrepareOk) {
190        self.send.push_back(Envelope {
191            destination: index,
192            payload: ProtocolPayload::PrepareOk(message),
193        });
194    }
195
196    fn commit(&mut self, message: Commit) {
197        self.broadcast.push_back(ProtocolPayload::Commit(message));
198    }
199
200    fn get_state(&mut self, index: usize, message: GetState) {
201        self.send.push_back(Envelope {
202            destination: index,
203            payload: ProtocolPayload::GetState(message),
204        });
205    }
206
207    fn new_state(&mut self, index: usize, message: NewState<P::Request, P::Prediction>) {
208        self.send.push_back(Envelope {
209            destination: index,
210            payload: ProtocolPayload::NewState(message),
211        });
212    }
213
214    fn start_view_change(&mut self, message: StartViewChange) {
215        self.broadcast
216            .push_back(ProtocolPayload::StartViewChange(message));
217    }
218
219    fn do_view_change(&mut self, index: usize, message: DoViewChange<P::Request, P::Prediction>) {
220        self.send.push_back(Envelope {
221            destination: index,
222            payload: ProtocolPayload::DoViewChange(message),
223        });
224    }
225
226    fn start_view(&mut self, message: StartView<P::Request, P::Prediction>) {
227        self.broadcast
228            .push_back(ProtocolPayload::StartView(message));
229    }
230
231    fn recovery(&mut self, message: Recovery) {
232        self.broadcast.push_back(ProtocolPayload::Recovery(message));
233    }
234
235    fn recovery_response(
236        &mut self,
237        index: usize,
238        message: RecoveryResponse<P::Request, P::Prediction>,
239    ) {
240        self.send.push_back(Envelope {
241            destination: index,
242            payload: ProtocolPayload::RecoveryResponse(message),
243        });
244    }
245
246    fn reply(&mut self, client: ClientIdentifier, reply: &Reply<P::Reply>) {
247        self.replies.push_back(Envelope {
248            destination: client,
249            payload: reply.clone(),
250        });
251    }
252}
253
254impl<P> Inbox<P> for BufferedMailbox<P>
255where
256    P: Protocol,
257{
258    fn push_prepare(&mut self, message: Prepare<P::Request, P::Prediction>) {
259        self.inbound.push_back(ProtocolPayload::Prepare(message));
260    }
261
262    fn push_prepare_ok(&mut self, message: PrepareOk) {
263        self.inbound.push_back(ProtocolPayload::PrepareOk(message));
264    }
265
266    fn push_commit(&mut self, message: Commit) {
267        self.inbound.push_back(ProtocolPayload::Commit(message));
268    }
269
270    fn push_get_state(&mut self, message: GetState) {
271        self.inbound.push_back(ProtocolPayload::GetState(message));
272    }
273
274    fn push_new_state(&mut self, message: NewState<P::Request, P::Prediction>) {
275        self.inbound.push_back(ProtocolPayload::NewState(message));
276    }
277
278    fn push_start_view_change(&mut self, message: StartViewChange) {
279        self.inbound
280            .push_back(ProtocolPayload::StartViewChange(message));
281    }
282
283    fn push_do_view_change(&mut self, message: DoViewChange<P::Request, P::Prediction>) {
284        self.inbound
285            .push_back(ProtocolPayload::DoViewChange(message));
286    }
287
288    fn push_start_view(&mut self, message: StartView<P::Request, P::Prediction>) {
289        self.inbound.push_back(ProtocolPayload::StartView(message));
290    }
291
292    fn push_recovery(&mut self, message: Recovery) {
293        self.inbound.push_back(ProtocolPayload::Recovery(message));
294    }
295
296    fn push_recovery_response(&mut self, message: RecoveryResponse<P::Request, P::Prediction>) {
297        self.inbound
298            .push_back(ProtocolPayload::RecoveryResponse(message));
299    }
300}
301
302impl<P> Mailbox<P> for BufferedMailbox<P> where P: Protocol {}