1use crate::mail::{Inbox, Mailbox, Outbox};
2use crate::protocol::{
3 Commit, DoViewChange, GetState, NewState, Prepare, PrepareOk, Recovery, RecoveryResponse,
4 StartView, StartViewChange,
5};
6use crate::request::{ClientIdentifier, Reply};
7use crate::service::Protocol;
8use std::collections::VecDeque;
9use std::fmt::{Debug, Formatter};
10use std::iter::FusedIterator;
11
/// A payload paired with the destination it should be delivered to.
///
/// `D` identifies the recipient and `P` is the message being carried. In this
/// file it is used with a `usize` destination for protocol messages and with a
/// `ClientIdentifier` destination for client replies.
///
/// The standard traits are derived so envelopes can be logged, duplicated and
/// compared whenever both `D` and `P` support the same operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Envelope<D, P> {
    /// Where the payload should be delivered.
    pub destination: D,
    /// The message being delivered.
    pub payload: P,
}
16
/// A single protocol message, wrapped so that every message type can be stored
/// in one queue.
///
/// There is one variant per message type imported from `crate::protocol`. The
/// message types that carry requests are parameterised by the protocol's
/// `Request` and `Prediction` associated types.
///
/// Only equality is derived here; `Clone` and `Debug` are implemented by hand
/// further down in this file.
#[derive(Eq, PartialEq)]
pub enum ProtocolPayload<P>
where
    P: Protocol,
{
    /// Wraps a [`Prepare`] message.
    Prepare(Prepare<P::Request, P::Prediction>),
    /// Wraps a [`PrepareOk`] message.
    PrepareOk(PrepareOk),
    /// Wraps a [`Commit`] message.
    Commit(Commit),
    /// Wraps a [`GetState`] message.
    GetState(GetState),
    /// Wraps a [`NewState`] message.
    NewState(NewState<P::Request, P::Prediction>),
    /// Wraps a [`StartViewChange`] message.
    StartViewChange(StartViewChange),
    /// Wraps a [`DoViewChange`] message.
    DoViewChange(DoViewChange<P::Request, P::Prediction>),
    /// Wraps a [`StartView`] message.
    StartView(StartView<P::Request, P::Prediction>),
    /// Wraps a [`Recovery`] message.
    Recovery(Recovery),
    /// Wraps a [`RecoveryResponse`] message.
    RecoveryResponse(RecoveryResponse<P::Request, P::Prediction>),
}
33
34impl<P> Clone for ProtocolPayload<P>
35where
36 P: Protocol,
37{
38 fn clone(&self) -> Self {
39 match self {
40 ProtocolPayload::Prepare(message) => Self::Prepare(message.clone()),
41 ProtocolPayload::PrepareOk(message) => Self::PrepareOk(message.clone()),
42 ProtocolPayload::Commit(message) => Self::Commit(message.clone()),
43 ProtocolPayload::GetState(message) => Self::GetState(message.clone()),
44 ProtocolPayload::NewState(message) => Self::NewState(message.clone()),
45 ProtocolPayload::StartViewChange(message) => Self::StartViewChange(message.clone()),
46 ProtocolPayload::DoViewChange(message) => Self::DoViewChange(message.clone()),
47 ProtocolPayload::StartView(message) => Self::StartView(message.clone()),
48 ProtocolPayload::Recovery(message) => Self::Recovery(message.clone()),
49 ProtocolPayload::RecoveryResponse(message) => Self::RecoveryResponse(message.clone()),
50 }
51 }
52}
53
54impl<P, Req, Pre> Debug for ProtocolPayload<P>
55where
56 P: Protocol<Request = Req, Prediction = Pre>,
57 Req: Debug,
58 Pre: Debug,
59{
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 match self {
62 ProtocolPayload::Prepare(message) => write!(f, "{message:?}"),
63 ProtocolPayload::PrepareOk(message) => write!(f, "{message:?}"),
64 ProtocolPayload::Commit(message) => write!(f, "{message:?}"),
65 ProtocolPayload::GetState(message) => write!(f, "{message:?}"),
66 ProtocolPayload::NewState(message) => write!(f, "{message:?}"),
67 ProtocolPayload::StartViewChange(message) => write!(f, "{message:?}"),
68 ProtocolPayload::DoViewChange(message) => write!(f, "{message:?}"),
69 ProtocolPayload::StartView(message) => write!(f, "{message:?}"),
70 ProtocolPayload::Recovery(message) => write!(f, "{message:?}"),
71 ProtocolPayload::RecoveryResponse(message) => write!(f, "{message:?}"),
72 }
73 }
74}
75
76impl<P> ProtocolPayload<P>
77where
78 P: Protocol,
79{
80 pub fn unwrap_prepare(self) -> Prepare<P::Request, P::Prediction> {
81 let Self::Prepare(message) = self else {
82 panic!("called `ProtocolPayload::unwrap_prepare` on a unsupported variant",)
83 };
84 message
85 }
86
87 pub fn unwrap_prepare_ok(self) -> PrepareOk {
88 let Self::PrepareOk(message) = self else {
89 panic!("called `ProtocolPayload::unwrap_prepare_ok` on a unsupported variant",)
90 };
91 message
92 }
93
94 pub fn unwrap_commit(self) -> Commit {
95 let Self::Commit(message) = self else {
96 panic!("called `ProtocolPayload::unwrap_commit` on a unsupported variant",)
97 };
98 message
99 }
100
101 pub fn unwrap_get_state(self) -> GetState {
102 let Self::GetState(message) = self else {
103 panic!("called `ProtocolPayload::unwrap_get_state` on a unsupported variant",)
104 };
105 message
106 }
107}
108
/// A mailbox that stores every message in in-memory FIFO queues instead of
/// delivering it anywhere.
///
/// The `Inbox` implementation in this file appends to `inbound`; the `Outbox`
/// implementation appends to `replies`, `send` or `broadcast` depending on the
/// message. The inherent `drain_*` / `pop_inbound` methods take the queued
/// items back out.
pub struct BufferedMailbox<P>
where
    P: Protocol,
{
    /// Messages added through the `Inbox::push_*` methods, oldest first.
    inbound: VecDeque<ProtocolPayload<P>>,
    /// Client replies queued by `Outbox::reply`, each addressed to a client.
    replies: VecDeque<Envelope<ClientIdentifier, Reply<P::Reply>>>,
    /// Outgoing messages addressed to a single `usize` index.
    send: VecDeque<Envelope<usize, ProtocolPayload<P>>>,
    /// Outgoing messages queued without a specific destination.
    broadcast: VecDeque<ProtocolPayload<P>>,
}
118
119impl<P> Default for BufferedMailbox<P>
120where
121 P: Protocol,
122{
123 fn default() -> Self {
124 Self {
125 inbound: Default::default(),
126 replies: Default::default(),
127 send: Default::default(),
128 broadcast: Default::default(),
129 }
130 }
131}
132
133impl<P> BufferedMailbox<P>
134where
135 P: Protocol,
136{
137 pub fn is_empty(&self) -> bool {
138 self.inbound.is_empty()
139 && self.replies.is_empty()
140 && self.send.is_empty()
141 && self.broadcast.is_empty()
142 }
143
144 pub fn pop_inbound(&mut self) -> Option<ProtocolPayload<P>> {
145 self.inbound.pop_front()
146 }
147
148 pub fn drain_inbound(
149 &mut self,
150 ) -> impl DoubleEndedIterator<Item = ProtocolPayload<P>> + ExactSizeIterator + FusedIterator + '_
151 {
152 self.inbound.drain(..)
153 }
154
155 pub fn drain_replies(
156 &mut self,
157 ) -> impl DoubleEndedIterator<Item = Envelope<ClientIdentifier, Reply<P::Reply>>>
158 + ExactSizeIterator
159 + FusedIterator
160 + '_ {
161 self.replies.drain(..)
162 }
163
164 pub fn drain_send(
165 &mut self,
166 ) -> impl DoubleEndedIterator<Item = Envelope<usize, ProtocolPayload<P>>>
167 + ExactSizeIterator
168 + FusedIterator
169 + '_ {
170 self.send.drain(..)
171 }
172
173 pub fn drain_broadcast(
174 &mut self,
175 ) -> impl DoubleEndedIterator<Item = ProtocolPayload<P>> + ExactSizeIterator + FusedIterator + '_
176 {
177 self.broadcast.drain(..)
178 }
179}
180
181impl<P> Outbox<P> for BufferedMailbox<P>
182where
183 P: Protocol,
184{
185 fn prepare(&mut self, message: Prepare<P::Request, P::Prediction>) {
186 self.broadcast.push_back(ProtocolPayload::Prepare(message));
187 }
188
189 fn prepare_ok(&mut self, index: usize, message: PrepareOk) {
190 self.send.push_back(Envelope {
191 destination: index,
192 payload: ProtocolPayload::PrepareOk(message),
193 });
194 }
195
196 fn commit(&mut self, message: Commit) {
197 self.broadcast.push_back(ProtocolPayload::Commit(message));
198 }
199
200 fn get_state(&mut self, index: usize, message: GetState) {
201 self.send.push_back(Envelope {
202 destination: index,
203 payload: ProtocolPayload::GetState(message),
204 });
205 }
206
207 fn new_state(&mut self, index: usize, message: NewState<P::Request, P::Prediction>) {
208 self.send.push_back(Envelope {
209 destination: index,
210 payload: ProtocolPayload::NewState(message),
211 });
212 }
213
214 fn start_view_change(&mut self, message: StartViewChange) {
215 self.broadcast
216 .push_back(ProtocolPayload::StartViewChange(message));
217 }
218
219 fn do_view_change(&mut self, index: usize, message: DoViewChange<P::Request, P::Prediction>) {
220 self.send.push_back(Envelope {
221 destination: index,
222 payload: ProtocolPayload::DoViewChange(message),
223 });
224 }
225
226 fn start_view(&mut self, message: StartView<P::Request, P::Prediction>) {
227 self.broadcast
228 .push_back(ProtocolPayload::StartView(message));
229 }
230
231 fn recovery(&mut self, message: Recovery) {
232 self.broadcast.push_back(ProtocolPayload::Recovery(message));
233 }
234
235 fn recovery_response(
236 &mut self,
237 index: usize,
238 message: RecoveryResponse<P::Request, P::Prediction>,
239 ) {
240 self.send.push_back(Envelope {
241 destination: index,
242 payload: ProtocolPayload::RecoveryResponse(message),
243 });
244 }
245
246 fn reply(&mut self, client: ClientIdentifier, reply: &Reply<P::Reply>) {
247 self.replies.push_back(Envelope {
248 destination: client,
249 payload: reply.clone(),
250 });
251 }
252}
253
254impl<P> Inbox<P> for BufferedMailbox<P>
255where
256 P: Protocol,
257{
258 fn push_prepare(&mut self, message: Prepare<P::Request, P::Prediction>) {
259 self.inbound.push_back(ProtocolPayload::Prepare(message));
260 }
261
262 fn push_prepare_ok(&mut self, message: PrepareOk) {
263 self.inbound.push_back(ProtocolPayload::PrepareOk(message));
264 }
265
266 fn push_commit(&mut self, message: Commit) {
267 self.inbound.push_back(ProtocolPayload::Commit(message));
268 }
269
270 fn push_get_state(&mut self, message: GetState) {
271 self.inbound.push_back(ProtocolPayload::GetState(message));
272 }
273
274 fn push_new_state(&mut self, message: NewState<P::Request, P::Prediction>) {
275 self.inbound.push_back(ProtocolPayload::NewState(message));
276 }
277
278 fn push_start_view_change(&mut self, message: StartViewChange) {
279 self.inbound
280 .push_back(ProtocolPayload::StartViewChange(message));
281 }
282
283 fn push_do_view_change(&mut self, message: DoViewChange<P::Request, P::Prediction>) {
284 self.inbound
285 .push_back(ProtocolPayload::DoViewChange(message));
286 }
287
288 fn push_start_view(&mut self, message: StartView<P::Request, P::Prediction>) {
289 self.inbound.push_back(ProtocolPayload::StartView(message));
290 }
291
292 fn push_recovery(&mut self, message: Recovery) {
293 self.inbound.push_back(ProtocolPayload::Recovery(message));
294 }
295
296 fn push_recovery_response(&mut self, message: RecoveryResponse<P::Request, P::Prediction>) {
297 self.inbound
298 .push_back(ProtocolPayload::RecoveryResponse(message));
299 }
300}
301
// Empty impl: `BufferedMailbox` supplies no `Mailbox` methods of its own here.
// NOTE(review): presumably `Mailbox` is a combined `Inbox` + `Outbox` trait with
// no required items — confirm against its definition in `crate::mail`.
impl<P> Mailbox<P> for BufferedMailbox<P> where P: Protocol {}