1use crate::client_table::ClientTable;
2use crate::configuration::Configuration;
3use crate::log::Log;
4use crate::mail::{Mailbox, Outbox};
5use crate::nonce::Nonce;
6use crate::protocol::{
7 Checkpoint, Commit, DoViewChange, GetState, NewState, Prepare, PrepareOk, Recovery,
8 RecoveryResponse, StartView, StartViewChange,
9};
10use crate::request::{Reply, Request};
11use crate::service::Service;
12use crate::status::Status;
13use crate::viewstamp::{OpNumber, View};
14use rand::Rng;
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, HashMap, HashSet};
17
18pub struct Replica<S>
22where
23 S: Service,
24{
25 configuration: Configuration,
26 index: usize,
27 service: S,
28 status: Status,
29 view: View,
30 log: Log<S::Request, S::Prediction>,
31 committed: OpNumber,
32 client_table: ClientTable<S::Reply>,
33 prepared: BTreeMap<OpNumber, HashSet<usize>>,
34 start_view_changes: HashSet<usize>,
35 do_view_changes: HashMap<usize, DoViewChange<S::Request, S::Prediction>>,
36 recovery_responses: HashMap<usize, RecoveryResponse<S::Request, S::Prediction>>,
37 nonce: Nonce,
38}
39
40impl<S> Replica<S>
41where
42 S: Service,
43{
44 pub fn new(configuration: Configuration, index: usize, service: S) -> Self {
46 Self {
47 configuration,
48 index,
49 service,
50 status: Status::Normal,
51 view: Default::default(),
52 log: Default::default(),
53 committed: Default::default(),
54 client_table: Default::default(),
55 prepared: Default::default(),
56 start_view_changes: Default::default(),
57 do_view_changes: Default::default(),
58 recovery_responses: Default::default(),
59 nonce: Default::default(),
60 }
61 }
62
63 pub fn recovering<O>(
66 configuration: Configuration,
67 index: usize,
68 checkpoint: Checkpoint<S::Checkpoint>,
69 outbox: &mut O,
70 ) -> Self
71 where
72 O: Outbox<S>,
73 {
74 let mut replica = Self::new(configuration, index, checkpoint.state.into());
75
76 replica.committed = checkpoint.committed;
77 replica.status = Status::Recovering;
78
79 outbox.recovery(Recovery {
80 index,
81 committed: replica.committed,
82 nonce: replica.nonce,
83 });
84
85 replica
86 }
87
88 pub fn configuration(&self) -> Configuration {
89 self.configuration
90 }
91
92 pub fn index(&self) -> usize {
93 self.index
94 }
95
96 pub fn view(&self) -> View {
97 self.view
98 }
99
100 pub fn checkpoint(&self) -> Checkpoint<S::Checkpoint> {
101 Checkpoint {
102 committed: self.committed,
103 state: self.service.checkpoint(),
104 }
105 }
106
107 pub fn checkpoint_with_suffix(&mut self, suffix: usize) -> Option<Checkpoint<S::Checkpoint>> {
108 let mut new_start = self.log.first_op_number();
109 let trimmed = self.log.len().checked_sub(suffix).unwrap_or_default();
110
111 new_start.increment_by(trimmed);
112
113 if self.committed >= new_start {
114 let checkpoint = Checkpoint {
115 committed: self.committed,
116 state: self.service.checkpoint(),
117 };
118
119 self.log.constrain(suffix);
120
121 Some(checkpoint)
122 } else {
123 None
124 }
125 }
126
127 pub fn idle<O>(&mut self, outbox: &mut O)
128 where
129 O: Outbox<S>,
130 {
131 match self.status {
132 Status::Normal => {
133 if self.is_primary() {
134 if self.committed == self.log.last_op_number() {
135 outbox.commit(Commit {
136 view: self.view,
137 committed: self.committed,
138 });
139 } else {
140 self.prepare_pending(outbox);
141 }
142 } else {
143 self.start_view_change(self.view.next(), outbox);
144 }
145 }
146 Status::Recovering => {
147 outbox.recovery(Recovery {
148 index: self.index,
149 committed: self.committed,
150 nonce: self.nonce,
151 });
152 }
153 Status::ViewChange => {
154 if self.is_backup() && self.should_do_view_change() {
155 self.start_view_change(self.view.next(), outbox);
157 } else {
158 outbox.start_view_change(StartViewChange {
159 view: self.view,
160 index: self.index,
161 });
162 }
163 }
164 }
165 }
166
167 pub fn resend_pending<O>(&mut self, outbox: &mut O)
168 where
169 O: Outbox<S>,
170 {
171 match self.status {
172 Status::Normal => {
173 self.prepare_pending(outbox);
174 }
175 Status::Recovering => {
176 outbox.recovery(Recovery {
177 index: self.index,
178 committed: self.committed,
179 nonce: self.nonce,
180 });
181 }
182 Status::ViewChange => {
183 outbox.start_view_change(StartViewChange {
184 view: self.view,
185 index: self.index,
186 });
187 }
188 }
189 }
190
191 pub fn handle_request<O>(&mut self, request: Request<S::Request>, outbox: &mut O)
192 where
193 O: Outbox<S>,
194 {
195 if self.is_backup() {
196 return;
197 }
198
199 match self.client_table.compare(&request) {
200 Ok(Ordering::Greater) => {
201 let prediction = self.service.predict(&request.payload);
202 let (entry, op_number) = self.log.push(self.view, request, prediction);
203
204 self.client_table.start(entry.request());
205
206 outbox.prepare(Prepare {
207 view: self.view,
208 op_number,
209 request: entry.request().clone(),
210 prediction: entry.prediction().clone(),
211 committed: self.committed,
212 });
213 }
214 Ok(Ordering::Equal) => {
215 if let Some(reply) = self.client_table.reply(&request) {
216 outbox.reply(request.client, reply);
217 }
218 }
219 Ok(Ordering::Less) => (),
220 Err(_) => (),
221 }
222 }
223
224 pub fn handle_prepare<M>(
225 &mut self,
226 message: Prepare<S::Request, S::Prediction>,
227 mailbox: &mut M,
228 ) where
229 M: Mailbox<S>,
230 {
231 if self.need_state_transfer(message.view) {
232 self.state_transfer(message.view, mailbox);
233 mailbox.push_prepare(message);
234 return;
235 }
236
237 if self.should_ignore_normal(message.view) || self.log.contains(&message.op_number) {
238 return;
239 }
240
241 let next = self.log.next_op_number();
242 if next < message.op_number || next < message.committed {
243 self.state_transfer(message.view, mailbox);
244 mailbox.push_prepare(message);
245 return;
246 }
247
248 self.client_table.start(&message.request);
249 self.log
250 .push(self.view, message.request, message.prediction);
251 mailbox.prepare_ok(
252 self.configuration % self.view,
253 PrepareOk {
254 view: self.view,
255 op_number: message.op_number,
256 index: self.index,
257 },
258 );
259 self.commit_operations(message.committed, mailbox);
260 }
261
262 pub fn handle_prepare_ok<M>(&mut self, message: PrepareOk, mailbox: &mut M)
263 where
264 M: Mailbox<S>,
265 {
266 if self.need_state_transfer(message.view) {
267 self.state_transfer(message.view, mailbox);
268 mailbox.push_prepare_ok(message);
269 return;
270 }
271
272 if self.should_ignore_normal(message.view) || message.op_number <= self.committed {
273 return;
274 }
275
276 let prepared = self.prepared.entry(message.op_number).or_default();
277
278 prepared.insert(message.index);
279
280 if prepared.len() >= self.configuration.sub_majority() {
281 self.prepared.retain(|&o, _| o > message.op_number);
282 self.commit_operations(message.op_number, mailbox);
283 }
284 }
285
286 pub fn handle_commit<M>(&mut self, message: Commit, mailbox: &mut M)
287 where
288 M: Mailbox<S>,
289 {
290 if self.need_state_transfer(message.view) {
291 self.state_transfer(message.view, mailbox);
292 mailbox.push_commit(message);
293 return;
294 }
295
296 if self.should_ignore_normal(message.view) || message.committed <= self.committed {
297 return;
298 }
299
300 if !self.log.contains(&message.committed) {
301 self.state_transfer(message.view, mailbox);
302 mailbox.push_commit(message);
303 return;
304 }
305
306 self.commit_operations(message.committed, mailbox);
307 }
308
309 pub fn handle_get_state<M>(&mut self, message: GetState, mailbox: &mut M)
310 where
311 M: Mailbox<S>,
312 {
313 if self.need_state_transfer(message.view) {
314 self.state_transfer(message.view, mailbox);
315 mailbox.push_get_state(message);
316 return;
317 }
318
319 if self.should_ignore_normal(message.view) {
320 return;
321 }
322
323 if !self.log.contains(&message.op_number) {
324 return;
325 }
326
327 mailbox.new_state(
328 message.index,
329 NewState {
330 view: self.view,
331 log: self.log.after(message.op_number),
332 committed: self.committed,
333 },
334 );
335 }
336
337 pub fn handle_recovery<O>(&mut self, message: Recovery, outbox: &mut O)
338 where
339 O: Outbox<S>,
340 {
341 if self.status != Status::Normal {
342 return;
343 }
344
345 let mut response = RecoveryResponse {
346 view: self.view,
347 nonce: message.nonce,
348 log: Default::default(),
349 committed: Default::default(),
350 index: self.index,
351 };
352
353 if self.is_primary() {
354 response.log = self.log.clone();
355 response.committed = self.committed;
356 }
357
358 outbox.recovery_response(message.index, response);
359 }
360
361 pub fn handle_recovery_response<O>(
362 &mut self,
363 message: RecoveryResponse<S::Request, S::Prediction>,
364 outbox: &mut O,
365 ) where
366 O: Outbox<S>,
367 {
368 if self.status != Status::Recovering || self.nonce != message.nonce {
369 return;
370 }
371
372 self.recovery_responses.insert(message.index, message);
373
374 if self.recovery_responses.len() >= self.configuration.quorum() {
375 let view = self
376 .recovery_responses
377 .values()
378 .map(|r| r.view)
379 .max()
380 .unwrap_or_default();
381 let primary = self.configuration % view;
382
383 if let Some(primary_response) = self.recovery_responses.remove(&primary) {
384 self.view = primary_response.view;
385 self.log = primary_response.log;
386 self.set_status(Status::Normal);
387 self.commit_operations(primary_response.committed, outbox);
388 self.prepare_pending(outbox);
389 }
390 }
391 }
392
393 pub fn handle_new_state<O>(
394 &mut self,
395 message: NewState<S::Request, S::Prediction>,
396 outbox: &mut O,
397 ) where
398 O: Outbox<S>,
399 {
400 if message.view < self.view
401 || self.status != Status::Normal
402 || message.log.first_op_number() != self.log.next_op_number()
403 {
404 return;
405 }
406
407 self.view = message.view;
408 self.log.extend(message.log);
409 self.commit_operations(message.committed, outbox);
410 self.prepare_pending(outbox);
411 }
412
413 pub fn handle_start_view_change<O>(&mut self, message: StartViewChange, outbox: &mut O)
414 where
415 O: Outbox<S>,
416 {
417 if self.need_view_change(message.view) {
418 self.start_view_change(message.view, outbox);
419 }
420
421 if self.should_ignore_view_change(message.view) {
422 return;
423 }
424
425 self.start_view_changes.insert(message.index);
426
427 if self.should_do_view_change() {
428 outbox.do_view_change(
429 self.configuration % self.view,
430 DoViewChange {
431 view: self.view,
432 log: self.log.clone(),
433 committed: self.committed,
434 index: self.index,
435 },
436 )
437 }
438 }
439
440 pub fn handle_do_view_change<O>(
441 &mut self,
442 message: DoViewChange<S::Request, S::Prediction>,
443 outbox: &mut O,
444 ) where
445 O: Outbox<S>,
446 {
447 if self.need_view_change(message.view) {
448 self.start_view_change(message.view, outbox);
449 }
450
451 if self.should_ignore_view_change(message.view) {
452 return;
453 }
454
455 self.do_view_changes.insert(message.index, message);
456
457 if self.do_view_changes.contains_key(&self.index)
458 && self.do_view_changes.len() >= self.configuration.quorum()
459 {
460 let committed = self
461 .do_view_changes
462 .values()
463 .map(|v| v.committed)
464 .max()
465 .unwrap_or(self.committed);
466 if let Some(do_view_change) = self
467 .do_view_changes
468 .drain()
469 .map(|(_, v)| v)
470 .max_by(|x, y| x.log.cmp(&y.log))
471 {
472 self.log = do_view_change.log;
473 self.view = do_view_change.view;
474 self.set_status(Status::Normal);
475
476 outbox.start_view(StartView {
477 view: self.view,
478 log: self.log.clone(),
479 committed,
480 });
481
482 self.commit_operations(committed, outbox);
483 self.prepare_pending(outbox);
484 }
485 }
486 }
487
488 pub fn handle_start_view<O>(
489 &mut self,
490 message: StartView<S::Request, S::Prediction>,
491 outbox: &mut O,
492 ) where
493 O: Outbox<S>,
494 {
495 if message.view < self.view {
496 return;
497 }
498
499 if message.view == self.view && self.status == Status::Normal {
500 return;
501 }
502
503 self.view = message.view;
504 self.log = message.log;
505
506 self.set_status(Status::Normal);
507 self.commit_operations(message.committed, outbox);
508 self.prepare_pending(outbox);
509 }
510
511 fn start_view_change<O>(&mut self, view: View, outbox: &mut O)
512 where
513 O: Outbox<S>,
514 {
515 self.view = view;
516
517 self.set_status(Status::ViewChange);
518
519 outbox.start_view_change(StartViewChange {
520 view: self.view,
521 index: self.index,
522 });
523 }
524
525 fn state_transfer<O>(&mut self, view: View, outbox: &mut O)
526 where
527 O: Outbox<S>,
528 {
529 if self.view < view {
530 self.log.truncate(self.committed);
531 }
532
533 let replicas = self.configuration.replicas();
534
535 let mut replica = self.index;
536 while replica == self.index {
537 replica = rand::thread_rng().gen_range(0..replicas);
538 }
539
540 outbox.get_state(
541 replica,
542 GetState {
543 view: self.view,
544 op_number: self.log.last_op_number(),
545 index: self.index,
546 },
547 );
548 }
549
550 fn commit_operations<O>(&mut self, committed: OpNumber, outbox: &mut O)
551 where
552 O: Outbox<S>,
553 {
554 while self.committed < committed {
555 self.committed.increment();
556
557 let entry = &self.log[self.committed];
558 let request = entry.request();
559 let reply = Reply {
560 view: self.view,
561 id: request.id,
562 payload: self.service.invoke(&request.payload, entry.prediction()),
563 };
564
565 if self.is_primary() {
566 outbox.reply(request.client, &reply);
567 }
568
569 self.client_table.finish(request, reply);
570 }
571 }
572
573 fn prepare_pending<O>(&mut self, outbox: &mut O)
574 where
575 O: Outbox<S>,
576 {
577 let mut current = self.committed.next();
578
579 while self.log.contains(¤t) {
580 let entry = &self.log[current];
581 let request = entry.request();
582
583 self.client_table.start(request);
584
585 if self.is_primary() {
586 outbox.prepare(Prepare {
587 view: self.view,
588 op_number: current,
589 request: entry.request().clone(),
590 prediction: entry.prediction().clone(),
591 committed: self.committed,
592 });
593 } else {
594 outbox.prepare_ok(
595 self.configuration % self.view,
596 PrepareOk {
597 view: self.view,
598 op_number: current,
599 index: self.index,
600 },
601 );
602 }
603
604 current.increment();
605 }
606 }
607
608 fn set_status(&mut self, status: Status) {
609 self.status = status;
610 self.prepared = Default::default();
611
612 self.recovery_responses = Default::default();
614
615 match self.status {
617 Status::ViewChange => {
618 self.start_view_changes = HashSet::with_capacity(self.configuration.sub_majority());
619 self.do_view_changes = HashMap::with_capacity(self.configuration.quorum());
620 }
621 _ => {
622 self.start_view_changes = Default::default();
623 self.do_view_changes = Default::default();
624 }
625 }
626 }
627
628 pub fn is_primary(&self) -> bool {
629 (self.configuration % self.view) == self.index
630 }
631
632 pub fn is_backup(&self) -> bool {
633 !self.is_primary()
634 }
635
636 fn should_ignore_normal(&self, view: View) -> bool {
637 self.view != view || self.status != Status::Normal
638 }
639
640 fn need_state_transfer(&self, view: View) -> bool {
641 self.status == Status::Normal && view > self.view
642 }
643
644 fn should_ignore_view_change(&self, view: View) -> bool {
645 self.view != view || self.status != Status::ViewChange
646 }
647
648 fn need_view_change(&self, view: View) -> bool {
649 self.status != Status::Recovering && view > self.view
650 }
651
652 fn should_do_view_change(&self) -> bool {
653 self.start_view_changes.len() >= self.configuration.sub_majority()
654 }
655}
656
657#[cfg(test)]
658mod tests {
659 use super::*;
660 use crate::buffer::{BufferedMailbox, ProtocolPayload};
661
662 #[test]
663 fn sender_behind_prepare() {
664 let configuration = Configuration::from(3);
665 let mut replica = Replica::new(configuration, 0, 0);
666 let mut mailbox = BufferedMailbox::default();
667
668 replica.view.increment();
669 replica.view.increment();
670
671 let message = Prepare {
672 view: View::default().next(),
673 op_number: OpNumber::default().next(),
674 request: Request {
675 payload: 2,
676 client: Default::default(),
677 id: Default::default(),
678 },
679 prediction: (),
680 committed: OpNumber::default(),
681 };
682
683 replica.handle_prepare(message, &mut mailbox);
684
685 assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
686 assert!(mailbox.is_empty());
687 }
688
689 #[test]
690 fn sender_ahead_prepare() {
691 let configuration = Configuration::from(3);
692 let mut replica = Replica::new(configuration, 1, 0);
693 let mut mailbox = BufferedMailbox::default();
694
695 let message = Prepare {
696 view: View::default().next(),
697 op_number: OpNumber::default().next(),
698 request: Request {
699 payload: 2,
700 client: Default::default(),
701 id: Default::default(),
702 },
703 prediction: (),
704 committed: OpNumber::default(),
705 };
706
707 replica.handle_prepare(message.clone(), &mut mailbox);
708
709 assert_eq!(
710 mailbox.pop_inbound().map(ProtocolPayload::unwrap_prepare),
711 Some(message)
712 );
713
714 let mut messages = Vec::from_iter(mailbox.drain_send());
715 let outbound = GetState {
716 view: replica.view,
717 op_number: replica.log.last_op_number(),
718 index: replica.index,
719 };
720 let envelope = messages.pop().unwrap();
721
722 assert_ne!(envelope.destination, replica.index);
723 assert_eq!(envelope.payload.unwrap_get_state(), outbound);
724 assert!(messages.is_empty());
725 assert!(mailbox.is_empty());
726 }
727
728 #[test]
729 fn sender_behind_prepare_ok() {
730 let configuration = Configuration::from(3);
731 let mut replica = Replica::new(configuration, 2, 0);
732 let mut mailbox = BufferedMailbox::default();
733
734 replica.view.increment();
735 replica.view.increment();
736
737 let message = PrepareOk {
738 view: View::default().next(),
739 op_number: OpNumber::default().next(),
740 index: 0,
741 };
742
743 replica.handle_prepare_ok(message, &mut mailbox);
744
745 assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
746 assert!(mailbox.is_empty());
747 }
748
749 #[test]
750 fn sender_ahead_prepare_ok() {
751 let configuration = Configuration::from(3);
752 let mut replica = Replica::new(configuration, 1, 0);
753 let mut mailbox = BufferedMailbox::default();
754
755 let message = PrepareOk {
756 view: View::default().next(),
757 op_number: OpNumber::default().next(),
758 index: 0,
759 };
760
761 replica.handle_prepare_ok(message.clone(), &mut mailbox);
762
763 assert_eq!(
764 mailbox
765 .pop_inbound()
766 .map(ProtocolPayload::unwrap_prepare_ok),
767 Some(message)
768 );
769
770 let mut messages = Vec::from_iter(mailbox.drain_send());
771 let outbound = GetState {
772 view: replica.view,
773 op_number: replica.log.last_op_number(),
774 index: replica.index,
775 };
776 let envelope = messages.pop().unwrap();
777
778 assert_ne!(envelope.destination, replica.index);
779 assert_eq!(envelope.payload.unwrap_get_state(), outbound);
780 assert!(messages.is_empty());
781 assert!(mailbox.is_empty());
782 }
783
784 #[test]
785 fn sender_behind_commit() {
786 let configuration = Configuration::from(3);
787 let mut replica = Replica::new(configuration, 0, 0);
788 let mut mailbox = BufferedMailbox::default();
789
790 replica.view.increment();
791 replica.view.increment();
792
793 let message = Commit {
794 view: View::default().next(),
795 committed: OpNumber::default().next(),
796 };
797
798 replica.handle_commit(message, &mut mailbox);
799
800 assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
801 assert!(mailbox.is_empty());
802 }
803
804 #[test]
805 fn sender_ahead_commit() {
806 let configuration = Configuration::from(3);
807 let mut replica = Replica::new(configuration, 0, 0);
808 let mut mailbox = BufferedMailbox::default();
809
810 let message = Commit {
811 view: View::default().next(),
812 committed: OpNumber::default().next(),
813 };
814
815 replica.handle_commit(message.clone(), &mut mailbox);
816
817 assert_eq!(
818 mailbox.pop_inbound().map(ProtocolPayload::unwrap_commit),
819 Some(message)
820 );
821
822 let mut messages = Vec::from_iter(mailbox.drain_send());
823 let outbound = GetState {
824 view: replica.view,
825 op_number: replica.log.last_op_number(),
826 index: replica.index,
827 };
828 let envelope = messages.pop().unwrap();
829
830 assert_ne!(envelope.destination, replica.index);
831 assert_eq!(envelope.payload.unwrap_get_state(), outbound);
832 assert!(messages.is_empty());
833 assert!(mailbox.is_empty());
834 }
835
836 #[test]
837 fn sender_behind_get_state() {
838 let configuration = Configuration::from(3);
839 let mut replica = Replica::new(configuration, 0, 0);
840 let mut mailbox = BufferedMailbox::default();
841
842 replica.view.increment();
843 replica.view.increment();
844
845 let message = GetState {
846 view: View::default().next(),
847 op_number: OpNumber::default(),
848 index: 1,
849 };
850
851 replica.handle_get_state(message, &mut mailbox);
852
853 assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
854 assert!(mailbox.is_empty());
855 }
856
857 #[test]
858 fn sender_ahead_get_state() {
859 let configuration = Configuration::from(3);
860 let mut replica = Replica::new(configuration, 0, 0);
861 let mut mailbox = BufferedMailbox::default();
862
863 let message = GetState {
864 view: View::default().next(),
865 op_number: OpNumber::default().next(),
866 index: 1,
867 };
868
869 replica.handle_get_state(message.clone(), &mut mailbox);
870
871 assert_eq!(
872 mailbox.pop_inbound().map(ProtocolPayload::unwrap_get_state),
873 Some(message)
874 );
875
876 let mut messages = Vec::from_iter(mailbox.drain_send());
877 let outbound = GetState {
878 view: replica.view,
879 op_number: replica.log.last_op_number(),
880 index: replica.index,
881 };
882 let envelope = messages.pop().unwrap();
883
884 assert_ne!(envelope.destination, replica.index);
885 assert_eq!(envelope.payload.unwrap_get_state(), outbound);
886 assert!(messages.is_empty());
887 assert!(mailbox.is_empty());
888 }
889
890 #[test]
891 fn sender_behind_new_state() {
892 let configuration = Configuration::from(3);
893 let mut replica = Replica::new(configuration, 0, 0);
894 let mut outbox = BufferedMailbox::default();
895
896 replica.view.increment();
897 replica.view.increment();
898 replica.log.push(
899 replica.view,
900 Request {
901 payload: 2,
902 client: Default::default(),
903 id: Default::default(),
904 },
905 (),
906 );
907
908 let message = NewState {
909 view: View::default().next(),
910 log: Log::default(),
911 committed: OpNumber::default().next(),
912 };
913
914 replica.handle_new_state(message.clone(), &mut outbox);
915
916 assert_ne!(replica.log, message.log);
917 assert_ne!(replica.committed, message.committed);
918 assert!(outbox.is_empty());
919 }
920}