viewstamped_replication/
replica.rs

1use crate::client_table::ClientTable;
2use crate::configuration::Configuration;
3use crate::log::Log;
4use crate::mail::{Mailbox, Outbox};
5use crate::nonce::Nonce;
6use crate::protocol::{
7    Checkpoint, Commit, DoViewChange, GetState, NewState, Prepare, PrepareOk, Recovery,
8    RecoveryResponse, StartView, StartViewChange,
9};
10use crate::request::{Reply, Request};
11use crate::service::Service;
12use crate::status::Status;
13use crate::viewstamp::{OpNumber, View};
14use rand::Rng;
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, HashMap, HashSet};
17
18/// A replica may perform the role of a primary or backup depending on the configuration and the current view.
19/// Implements a message-based viewstamped replication revisited protocol that does not wait for messages to arrive.
20/// Instead, the protocol requires returned messages to be re-queued for later (i.e. after a new message comes in).
21pub struct Replica<S>
22where
23    S: Service,
24{
25    configuration: Configuration,
26    index: usize,
27    service: S,
28    status: Status,
29    view: View,
30    log: Log<S::Request, S::Prediction>,
31    committed: OpNumber,
32    client_table: ClientTable<S::Reply>,
33    prepared: BTreeMap<OpNumber, HashSet<usize>>,
34    start_view_changes: HashSet<usize>,
35    do_view_changes: HashMap<usize, DoViewChange<S::Request, S::Prediction>>,
36    recovery_responses: HashMap<usize, RecoveryResponse<S::Request, S::Prediction>>,
37    nonce: Nonce,
38}
39
40impl<S> Replica<S>
41where
42    S: Service,
43{
44    /// Creates a new instance of a replica.
45    pub fn new(configuration: Configuration, index: usize, service: S) -> Self {
46        Self {
47            configuration,
48            index,
49            service,
50            status: Status::Normal,
51            view: Default::default(),
52            log: Default::default(),
53            committed: Default::default(),
54            client_table: Default::default(),
55            prepared: Default::default(),
56            start_view_changes: Default::default(),
57            do_view_changes: Default::default(),
58            recovery_responses: Default::default(),
59            nonce: Default::default(),
60        }
61    }
62
63    /// Creates a new instance of a replica running the recovery protocol.
64    /// The caller is responsible for determining when a replica needs to recover.
65    pub fn recovering<O>(
66        configuration: Configuration,
67        index: usize,
68        checkpoint: Checkpoint<S::Checkpoint>,
69        outbox: &mut O,
70    ) -> Self
71    where
72        O: Outbox<S>,
73    {
74        let mut replica = Self::new(configuration, index, checkpoint.state.into());
75
76        replica.committed = checkpoint.committed;
77        replica.status = Status::Recovering;
78
79        outbox.recovery(Recovery {
80            index,
81            committed: replica.committed,
82            nonce: replica.nonce,
83        });
84
85        replica
86    }
87
88    pub fn configuration(&self) -> Configuration {
89        self.configuration
90    }
91
92    pub fn index(&self) -> usize {
93        self.index
94    }
95
96    pub fn view(&self) -> View {
97        self.view
98    }
99
100    pub fn checkpoint(&self) -> Checkpoint<S::Checkpoint> {
101        Checkpoint {
102            committed: self.committed,
103            state: self.service.checkpoint(),
104        }
105    }
106
107    pub fn checkpoint_with_suffix(&mut self, suffix: usize) -> Option<Checkpoint<S::Checkpoint>> {
108        let mut new_start = self.log.first_op_number();
109        let trimmed = self.log.len().checked_sub(suffix).unwrap_or_default();
110
111        new_start.increment_by(trimmed);
112
113        if self.committed >= new_start {
114            let checkpoint = Checkpoint {
115                committed: self.committed,
116                state: self.service.checkpoint(),
117            };
118
119            self.log.constrain(suffix);
120
121            Some(checkpoint)
122        } else {
123            None
124        }
125    }
126
127    pub fn idle<O>(&mut self, outbox: &mut O)
128    where
129        O: Outbox<S>,
130    {
131        match self.status {
132            Status::Normal => {
133                if self.is_primary() {
134                    if self.committed == self.log.last_op_number() {
135                        outbox.commit(Commit {
136                            view: self.view,
137                            committed: self.committed,
138                        });
139                    } else {
140                        self.prepare_pending(outbox);
141                    }
142                } else {
143                    self.start_view_change(self.view.next(), outbox);
144                }
145            }
146            Status::Recovering => {
147                outbox.recovery(Recovery {
148                    index: self.index,
149                    committed: self.committed,
150                    nonce: self.nonce,
151                });
152            }
153            Status::ViewChange => {
154                if self.is_backup() && self.should_do_view_change() {
155                    // The new primary is unresponsive. Start a new view change.
156                    self.start_view_change(self.view.next(), outbox);
157                } else {
158                    outbox.start_view_change(StartViewChange {
159                        view: self.view,
160                        index: self.index,
161                    });
162                }
163            }
164        }
165    }
166
167    pub fn resend_pending<O>(&mut self, outbox: &mut O)
168    where
169        O: Outbox<S>,
170    {
171        match self.status {
172            Status::Normal => {
173                self.prepare_pending(outbox);
174            }
175            Status::Recovering => {
176                outbox.recovery(Recovery {
177                    index: self.index,
178                    committed: self.committed,
179                    nonce: self.nonce,
180                });
181            }
182            Status::ViewChange => {
183                outbox.start_view_change(StartViewChange {
184                    view: self.view,
185                    index: self.index,
186                });
187            }
188        }
189    }
190
191    pub fn handle_request<O>(&mut self, request: Request<S::Request>, outbox: &mut O)
192    where
193        O: Outbox<S>,
194    {
195        if self.is_backup() {
196            return;
197        }
198
199        match self.client_table.compare(&request) {
200            Ok(Ordering::Greater) => {
201                let prediction = self.service.predict(&request.payload);
202                let (entry, op_number) = self.log.push(self.view, request, prediction);
203
204                self.client_table.start(entry.request());
205
206                outbox.prepare(Prepare {
207                    view: self.view,
208                    op_number,
209                    request: entry.request().clone(),
210                    prediction: entry.prediction().clone(),
211                    committed: self.committed,
212                });
213            }
214            Ok(Ordering::Equal) => {
215                if let Some(reply) = self.client_table.reply(&request) {
216                    outbox.reply(request.client, reply);
217                }
218            }
219            Ok(Ordering::Less) => (),
220            Err(_) => (),
221        }
222    }
223
224    pub fn handle_prepare<M>(
225        &mut self,
226        message: Prepare<S::Request, S::Prediction>,
227        mailbox: &mut M,
228    ) where
229        M: Mailbox<S>,
230    {
231        if self.need_state_transfer(message.view) {
232            self.state_transfer(message.view, mailbox);
233            mailbox.push_prepare(message);
234            return;
235        }
236
237        if self.should_ignore_normal(message.view) || self.log.contains(&message.op_number) {
238            return;
239        }
240
241        let next = self.log.next_op_number();
242        if next < message.op_number || next < message.committed {
243            self.state_transfer(message.view, mailbox);
244            mailbox.push_prepare(message);
245            return;
246        }
247
248        self.client_table.start(&message.request);
249        self.log
250            .push(self.view, message.request, message.prediction);
251        mailbox.prepare_ok(
252            self.configuration % self.view,
253            PrepareOk {
254                view: self.view,
255                op_number: message.op_number,
256                index: self.index,
257            },
258        );
259        self.commit_operations(message.committed, mailbox);
260    }
261
262    pub fn handle_prepare_ok<M>(&mut self, message: PrepareOk, mailbox: &mut M)
263    where
264        M: Mailbox<S>,
265    {
266        if self.need_state_transfer(message.view) {
267            self.state_transfer(message.view, mailbox);
268            mailbox.push_prepare_ok(message);
269            return;
270        }
271
272        if self.should_ignore_normal(message.view) || message.op_number <= self.committed {
273            return;
274        }
275
276        let prepared = self.prepared.entry(message.op_number).or_default();
277
278        prepared.insert(message.index);
279
280        if prepared.len() >= self.configuration.sub_majority() {
281            self.prepared.retain(|&o, _| o > message.op_number);
282            self.commit_operations(message.op_number, mailbox);
283        }
284    }
285
286    pub fn handle_commit<M>(&mut self, message: Commit, mailbox: &mut M)
287    where
288        M: Mailbox<S>,
289    {
290        if self.need_state_transfer(message.view) {
291            self.state_transfer(message.view, mailbox);
292            mailbox.push_commit(message);
293            return;
294        }
295
296        if self.should_ignore_normal(message.view) || message.committed <= self.committed {
297            return;
298        }
299
300        if !self.log.contains(&message.committed) {
301            self.state_transfer(message.view, mailbox);
302            mailbox.push_commit(message);
303            return;
304        }
305
306        self.commit_operations(message.committed, mailbox);
307    }
308
309    pub fn handle_get_state<M>(&mut self, message: GetState, mailbox: &mut M)
310    where
311        M: Mailbox<S>,
312    {
313        if self.need_state_transfer(message.view) {
314            self.state_transfer(message.view, mailbox);
315            mailbox.push_get_state(message);
316            return;
317        }
318
319        if self.should_ignore_normal(message.view) {
320            return;
321        }
322
323        if !self.log.contains(&message.op_number) {
324            return;
325        }
326
327        mailbox.new_state(
328            message.index,
329            NewState {
330                view: self.view,
331                log: self.log.after(message.op_number),
332                committed: self.committed,
333            },
334        );
335    }
336
337    pub fn handle_recovery<O>(&mut self, message: Recovery, outbox: &mut O)
338    where
339        O: Outbox<S>,
340    {
341        if self.status != Status::Normal {
342            return;
343        }
344
345        let mut response = RecoveryResponse {
346            view: self.view,
347            nonce: message.nonce,
348            log: Default::default(),
349            committed: Default::default(),
350            index: self.index,
351        };
352
353        if self.is_primary() {
354            response.log = self.log.clone();
355            response.committed = self.committed;
356        }
357
358        outbox.recovery_response(message.index, response);
359    }
360
361    pub fn handle_recovery_response<O>(
362        &mut self,
363        message: RecoveryResponse<S::Request, S::Prediction>,
364        outbox: &mut O,
365    ) where
366        O: Outbox<S>,
367    {
368        if self.status != Status::Recovering || self.nonce != message.nonce {
369            return;
370        }
371
372        self.recovery_responses.insert(message.index, message);
373
374        if self.recovery_responses.len() >= self.configuration.quorum() {
375            let view = self
376                .recovery_responses
377                .values()
378                .map(|r| r.view)
379                .max()
380                .unwrap_or_default();
381            let primary = self.configuration % view;
382
383            if let Some(primary_response) = self.recovery_responses.remove(&primary) {
384                self.view = primary_response.view;
385                self.log = primary_response.log;
386                self.set_status(Status::Normal);
387                self.commit_operations(primary_response.committed, outbox);
388                self.prepare_pending(outbox);
389            }
390        }
391    }
392
393    pub fn handle_new_state<O>(
394        &mut self,
395        message: NewState<S::Request, S::Prediction>,
396        outbox: &mut O,
397    ) where
398        O: Outbox<S>,
399    {
400        if message.view < self.view
401            || self.status != Status::Normal
402            || message.log.first_op_number() != self.log.next_op_number()
403        {
404            return;
405        }
406
407        self.view = message.view;
408        self.log.extend(message.log);
409        self.commit_operations(message.committed, outbox);
410        self.prepare_pending(outbox);
411    }
412
413    pub fn handle_start_view_change<O>(&mut self, message: StartViewChange, outbox: &mut O)
414    where
415        O: Outbox<S>,
416    {
417        if self.need_view_change(message.view) {
418            self.start_view_change(message.view, outbox);
419        }
420
421        if self.should_ignore_view_change(message.view) {
422            return;
423        }
424
425        self.start_view_changes.insert(message.index);
426
427        if self.should_do_view_change() {
428            outbox.do_view_change(
429                self.configuration % self.view,
430                DoViewChange {
431                    view: self.view,
432                    log: self.log.clone(),
433                    committed: self.committed,
434                    index: self.index,
435                },
436            )
437        }
438    }
439
440    pub fn handle_do_view_change<O>(
441        &mut self,
442        message: DoViewChange<S::Request, S::Prediction>,
443        outbox: &mut O,
444    ) where
445        O: Outbox<S>,
446    {
447        if self.need_view_change(message.view) {
448            self.start_view_change(message.view, outbox);
449        }
450
451        if self.should_ignore_view_change(message.view) {
452            return;
453        }
454
455        self.do_view_changes.insert(message.index, message);
456
457        if self.do_view_changes.contains_key(&self.index)
458            && self.do_view_changes.len() >= self.configuration.quorum()
459        {
460            let committed = self
461                .do_view_changes
462                .values()
463                .map(|v| v.committed)
464                .max()
465                .unwrap_or(self.committed);
466            if let Some(do_view_change) = self
467                .do_view_changes
468                .drain()
469                .map(|(_, v)| v)
470                .max_by(|x, y| x.log.cmp(&y.log))
471            {
472                self.log = do_view_change.log;
473                self.view = do_view_change.view;
474                self.set_status(Status::Normal);
475
476                outbox.start_view(StartView {
477                    view: self.view,
478                    log: self.log.clone(),
479                    committed,
480                });
481
482                self.commit_operations(committed, outbox);
483                self.prepare_pending(outbox);
484            }
485        }
486    }
487
488    pub fn handle_start_view<O>(
489        &mut self,
490        message: StartView<S::Request, S::Prediction>,
491        outbox: &mut O,
492    ) where
493        O: Outbox<S>,
494    {
495        if message.view < self.view {
496            return;
497        }
498
499        if message.view == self.view && self.status == Status::Normal {
500            return;
501        }
502
503        self.view = message.view;
504        self.log = message.log;
505
506        self.set_status(Status::Normal);
507        self.commit_operations(message.committed, outbox);
508        self.prepare_pending(outbox);
509    }
510
511    fn start_view_change<O>(&mut self, view: View, outbox: &mut O)
512    where
513        O: Outbox<S>,
514    {
515        self.view = view;
516
517        self.set_status(Status::ViewChange);
518
519        outbox.start_view_change(StartViewChange {
520            view: self.view,
521            index: self.index,
522        });
523    }
524
525    fn state_transfer<O>(&mut self, view: View, outbox: &mut O)
526    where
527        O: Outbox<S>,
528    {
529        if self.view < view {
530            self.log.truncate(self.committed);
531        }
532
533        let replicas = self.configuration.replicas();
534
535        let mut replica = self.index;
536        while replica == self.index {
537            replica = rand::thread_rng().gen_range(0..replicas);
538        }
539
540        outbox.get_state(
541            replica,
542            GetState {
543                view: self.view,
544                op_number: self.log.last_op_number(),
545                index: self.index,
546            },
547        );
548    }
549
550    fn commit_operations<O>(&mut self, committed: OpNumber, outbox: &mut O)
551    where
552        O: Outbox<S>,
553    {
554        while self.committed < committed {
555            self.committed.increment();
556
557            let entry = &self.log[self.committed];
558            let request = entry.request();
559            let reply = Reply {
560                view: self.view,
561                id: request.id,
562                payload: self.service.invoke(&request.payload, entry.prediction()),
563            };
564
565            if self.is_primary() {
566                outbox.reply(request.client, &reply);
567            }
568
569            self.client_table.finish(request, reply);
570        }
571    }
572
573    fn prepare_pending<O>(&mut self, outbox: &mut O)
574    where
575        O: Outbox<S>,
576    {
577        let mut current = self.committed.next();
578
579        while self.log.contains(&current) {
580            let entry = &self.log[current];
581            let request = entry.request();
582
583            self.client_table.start(request);
584
585            if self.is_primary() {
586                outbox.prepare(Prepare {
587                    view: self.view,
588                    op_number: current,
589                    request: entry.request().clone(),
590                    prediction: entry.prediction().clone(),
591                    committed: self.committed,
592                });
593            } else {
594                outbox.prepare_ok(
595                    self.configuration % self.view,
596                    PrepareOk {
597                        view: self.view,
598                        op_number: current,
599                        index: self.index,
600                    },
601                );
602            }
603
604            current.increment();
605        }
606    }
607
608    fn set_status(&mut self, status: Status) {
609        self.status = status;
610        self.prepared = Default::default();
611
612        // We only need this on a new replica. Therefore, we can deallocate on any status change.
613        self.recovery_responses = Default::default();
614
615        // Avoid allocating unless we need it for the current protocol.
616        match self.status {
617            Status::ViewChange => {
618                self.start_view_changes = HashSet::with_capacity(self.configuration.sub_majority());
619                self.do_view_changes = HashMap::with_capacity(self.configuration.quorum());
620            }
621            _ => {
622                self.start_view_changes = Default::default();
623                self.do_view_changes = Default::default();
624            }
625        }
626    }
627
628    pub fn is_primary(&self) -> bool {
629        (self.configuration % self.view) == self.index
630    }
631
632    pub fn is_backup(&self) -> bool {
633        !self.is_primary()
634    }
635
636    fn should_ignore_normal(&self, view: View) -> bool {
637        self.view != view || self.status != Status::Normal
638    }
639
640    fn need_state_transfer(&self, view: View) -> bool {
641        self.status == Status::Normal && view > self.view
642    }
643
644    fn should_ignore_view_change(&self, view: View) -> bool {
645        self.view != view || self.status != Status::ViewChange
646    }
647
648    fn need_view_change(&self, view: View) -> bool {
649        self.status != Status::Recovering && view > self.view
650    }
651
652    fn should_do_view_change(&self) -> bool {
653        self.start_view_changes.len() >= self.configuration.sub_majority()
654    }
655}
656
657#[cfg(test)]
658mod tests {
659    use super::*;
660    use crate::buffer::{BufferedMailbox, ProtocolPayload};
661
662    #[test]
663    fn sender_behind_prepare() {
664        let configuration = Configuration::from(3);
665        let mut replica = Replica::new(configuration, 0, 0);
666        let mut mailbox = BufferedMailbox::default();
667
668        replica.view.increment();
669        replica.view.increment();
670
671        let message = Prepare {
672            view: View::default().next(),
673            op_number: OpNumber::default().next(),
674            request: Request {
675                payload: 2,
676                client: Default::default(),
677                id: Default::default(),
678            },
679            prediction: (),
680            committed: OpNumber::default(),
681        };
682
683        replica.handle_prepare(message, &mut mailbox);
684
685        assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
686        assert!(mailbox.is_empty());
687    }
688
689    #[test]
690    fn sender_ahead_prepare() {
691        let configuration = Configuration::from(3);
692        let mut replica = Replica::new(configuration, 1, 0);
693        let mut mailbox = BufferedMailbox::default();
694
695        let message = Prepare {
696            view: View::default().next(),
697            op_number: OpNumber::default().next(),
698            request: Request {
699                payload: 2,
700                client: Default::default(),
701                id: Default::default(),
702            },
703            prediction: (),
704            committed: OpNumber::default(),
705        };
706
707        replica.handle_prepare(message.clone(), &mut mailbox);
708
709        assert_eq!(
710            mailbox.pop_inbound().map(ProtocolPayload::unwrap_prepare),
711            Some(message)
712        );
713
714        let mut messages = Vec::from_iter(mailbox.drain_send());
715        let outbound = GetState {
716            view: replica.view,
717            op_number: replica.log.last_op_number(),
718            index: replica.index,
719        };
720        let envelope = messages.pop().unwrap();
721
722        assert_ne!(envelope.destination, replica.index);
723        assert_eq!(envelope.payload.unwrap_get_state(), outbound);
724        assert!(messages.is_empty());
725        assert!(mailbox.is_empty());
726    }
727
728    #[test]
729    fn sender_behind_prepare_ok() {
730        let configuration = Configuration::from(3);
731        let mut replica = Replica::new(configuration, 2, 0);
732        let mut mailbox = BufferedMailbox::default();
733
734        replica.view.increment();
735        replica.view.increment();
736
737        let message = PrepareOk {
738            view: View::default().next(),
739            op_number: OpNumber::default().next(),
740            index: 0,
741        };
742
743        replica.handle_prepare_ok(message, &mut mailbox);
744
745        assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
746        assert!(mailbox.is_empty());
747    }
748
749    #[test]
750    fn sender_ahead_prepare_ok() {
751        let configuration = Configuration::from(3);
752        let mut replica = Replica::new(configuration, 1, 0);
753        let mut mailbox = BufferedMailbox::default();
754
755        let message = PrepareOk {
756            view: View::default().next(),
757            op_number: OpNumber::default().next(),
758            index: 0,
759        };
760
761        replica.handle_prepare_ok(message.clone(), &mut mailbox);
762
763        assert_eq!(
764            mailbox
765                .pop_inbound()
766                .map(ProtocolPayload::unwrap_prepare_ok),
767            Some(message)
768        );
769
770        let mut messages = Vec::from_iter(mailbox.drain_send());
771        let outbound = GetState {
772            view: replica.view,
773            op_number: replica.log.last_op_number(),
774            index: replica.index,
775        };
776        let envelope = messages.pop().unwrap();
777
778        assert_ne!(envelope.destination, replica.index);
779        assert_eq!(envelope.payload.unwrap_get_state(), outbound);
780        assert!(messages.is_empty());
781        assert!(mailbox.is_empty());
782    }
783
784    #[test]
785    fn sender_behind_commit() {
786        let configuration = Configuration::from(3);
787        let mut replica = Replica::new(configuration, 0, 0);
788        let mut mailbox = BufferedMailbox::default();
789
790        replica.view.increment();
791        replica.view.increment();
792
793        let message = Commit {
794            view: View::default().next(),
795            committed: OpNumber::default().next(),
796        };
797
798        replica.handle_commit(message, &mut mailbox);
799
800        assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
801        assert!(mailbox.is_empty());
802    }
803
804    #[test]
805    fn sender_ahead_commit() {
806        let configuration = Configuration::from(3);
807        let mut replica = Replica::new(configuration, 0, 0);
808        let mut mailbox = BufferedMailbox::default();
809
810        let message = Commit {
811            view: View::default().next(),
812            committed: OpNumber::default().next(),
813        };
814
815        replica.handle_commit(message.clone(), &mut mailbox);
816
817        assert_eq!(
818            mailbox.pop_inbound().map(ProtocolPayload::unwrap_commit),
819            Some(message)
820        );
821
822        let mut messages = Vec::from_iter(mailbox.drain_send());
823        let outbound = GetState {
824            view: replica.view,
825            op_number: replica.log.last_op_number(),
826            index: replica.index,
827        };
828        let envelope = messages.pop().unwrap();
829
830        assert_ne!(envelope.destination, replica.index);
831        assert_eq!(envelope.payload.unwrap_get_state(), outbound);
832        assert!(messages.is_empty());
833        assert!(mailbox.is_empty());
834    }
835
836    #[test]
837    fn sender_behind_get_state() {
838        let configuration = Configuration::from(3);
839        let mut replica = Replica::new(configuration, 0, 0);
840        let mut mailbox = BufferedMailbox::default();
841
842        replica.view.increment();
843        replica.view.increment();
844
845        let message = GetState {
846            view: View::default().next(),
847            op_number: OpNumber::default(),
848            index: 1,
849        };
850
851        replica.handle_get_state(message, &mut mailbox);
852
853        assert_eq!(Vec::from_iter(mailbox.drain_inbound()), vec![]);
854        assert!(mailbox.is_empty());
855    }
856
857    #[test]
858    fn sender_ahead_get_state() {
859        let configuration = Configuration::from(3);
860        let mut replica = Replica::new(configuration, 0, 0);
861        let mut mailbox = BufferedMailbox::default();
862
863        let message = GetState {
864            view: View::default().next(),
865            op_number: OpNumber::default().next(),
866            index: 1,
867        };
868
869        replica.handle_get_state(message.clone(), &mut mailbox);
870
871        assert_eq!(
872            mailbox.pop_inbound().map(ProtocolPayload::unwrap_get_state),
873            Some(message)
874        );
875
876        let mut messages = Vec::from_iter(mailbox.drain_send());
877        let outbound = GetState {
878            view: replica.view,
879            op_number: replica.log.last_op_number(),
880            index: replica.index,
881        };
882        let envelope = messages.pop().unwrap();
883
884        assert_ne!(envelope.destination, replica.index);
885        assert_eq!(envelope.payload.unwrap_get_state(), outbound);
886        assert!(messages.is_empty());
887        assert!(mailbox.is_empty());
888    }
889
890    #[test]
891    fn sender_behind_new_state() {
892        let configuration = Configuration::from(3);
893        let mut replica = Replica::new(configuration, 0, 0);
894        let mut outbox = BufferedMailbox::default();
895
896        replica.view.increment();
897        replica.view.increment();
898        replica.log.push(
899            replica.view,
900            Request {
901                payload: 2,
902                client: Default::default(),
903                id: Default::default(),
904            },
905            (),
906        );
907
908        let message = NewState {
909            view: View::default().next(),
910            log: Log::default(),
911            committed: OpNumber::default().next(),
912        };
913
914        replica.handle_new_state(message.clone(), &mut outbox);
915
916        assert_ne!(replica.log, message.log);
917        assert_ne!(replica.committed, message.committed);
918        assert!(outbox.is_empty());
919    }
920}