1use crate::buffer::{ReplicaId, SeqNo};
67use mdcs_core::lattice::Lattice;
68use serde::{Deserialize, Serialize};
69use std::collections::{HashMap, VecDeque};
70
/// A batch of deltas shipped from one replica to another, covering the
/// half-open sequence range `(from_seq, to_seq]` of the sender's mutations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeltaInterval<D> {
    /// Sending replica.
    pub from: ReplicaId,
    /// Intended receiving replica.
    pub to: ReplicaId,
    /// Join of every delta produced in the covered sequence range.
    pub delta: D,
    /// Exclusive lower bound: the last sequence number NOT included.
    pub from_seq: SeqNo,
    /// Inclusive upper bound: the last sequence number included.
    pub to_seq: SeqNo,
}
92
/// Acknowledgement that replica `from` has applied replica `to`'s deltas
/// up to sequence number `acked_seq`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntervalAck {
    /// Replica sending the acknowledgement.
    pub from: ReplicaId,
    /// Replica whose interval is being acknowledged.
    pub to: ReplicaId,
    /// Highest sequence number now applied from `to`.
    pub acked_seq: SeqNo,
}
101
/// Wire messages exchanged between replicas over the (simulated) network.
#[derive(Debug, Clone)]
pub enum CausalMessage<D> {
    /// A contiguous batch of deltas from one replica's mutation sequence.
    DeltaInterval(DeltaInterval<D>),
    /// Acknowledgement of a received delta interval.
    Ack(IntervalAck),
    /// `from` asks `to` for its complete state.
    SnapshotRequest { from: ReplicaId, to: ReplicaId },
    /// Full-state reply to a `SnapshotRequest`.
    Snapshot {
        /// Replica providing the snapshot.
        from: ReplicaId,
        /// Replica that requested it.
        to: ReplicaId,
        /// Complete lattice state of `from` at snapshot time.
        state: D,
        /// `from`'s sequence counter at snapshot time.
        seq: SeqNo,
    },
}
119
/// The crash-safe portion of a replica: its identity, the fully joined
/// lattice state, and the monotonically increasing local mutation counter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DurableState<S> {
    /// Identity of the owning replica.
    pub replica_id: ReplicaId,
    /// Join of every delta applied so far (local and remote).
    pub state: S,
    /// Sequence number of the most recent local mutation.
    pub counter: SeqNo,
}
134impl<S: Lattice> DurableState<S> {
135 pub fn new(replica_id: impl Into<ReplicaId>) -> Self {
136 Self {
137 replica_id: replica_id.into(),
138 state: S::bottom(),
139 counter: 0,
140 }
141 }
142}
143
/// Accumulates outgoing deltas destined for one peer, joined together into
/// a single delta covering the sequence interval `(from_seq, to_seq]`.
#[derive(Debug, Clone)]
pub struct PeerDeltaBuffer<D: Lattice> {
    // Join of all buffered deltas; `None` when nothing is pending.
    delta: Option<D>,
    // Exclusive lower bound of the buffered interval.
    from_seq: SeqNo,
    // Inclusive upper bound (last buffered sequence number).
    to_seq: SeqNo,
}
157
158impl<D: Lattice> PeerDeltaBuffer<D> {
159 pub fn new() -> Self {
160 Self {
161 delta: None,
162 from_seq: 0,
163 to_seq: 0,
164 }
165 }
166
167 pub fn start_from(seq: SeqNo) -> Self {
169 Self {
170 delta: None,
171 from_seq: seq,
172 to_seq: seq,
173 }
174 }
175
176 pub fn push(&mut self, delta: D, seq: SeqNo) {
178 match &mut self.delta {
179 Some(existing) => {
180 existing.join_assign(&delta);
181 }
182 None => {
183 self.delta = Some(delta);
184 }
185 }
186 self.to_seq = seq;
187 }
188
189 pub fn has_pending(&self) -> bool {
191 self.delta.is_some()
192 }
193
194 pub fn take(&mut self) -> Option<(D, SeqNo, SeqNo)> {
196 self.delta.take().map(|d| {
197 let from = self.from_seq;
198 let to = self.to_seq;
199 self.from_seq = to;
200 (d, from, to)
201 })
202 }
203
204 pub fn clear(&mut self) {
206 self.delta = None;
207 self.from_seq = self.to_seq;
208 }
209
210 pub fn reset_from(&mut self, seq: SeqNo) {
212 self.delta = None;
213 self.from_seq = seq;
214 self.to_seq = seq;
215 }
216}
217
218impl<D: Lattice> Default for PeerDeltaBuffer<D> {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
/// Per-replica bookkeeping that may be lost on crash and rebuilt: outgoing
/// delta buffers and received-ack watermarks, both keyed by peer.
#[derive(Debug, Clone)]
pub struct VolatileState<D: Lattice> {
    /// Outgoing delta accumulator per peer.
    pub delta_buffers: HashMap<ReplicaId, PeerDeltaBuffer<D>>,
    /// Highest contiguous sequence number applied from each peer.
    pub peer_acks: HashMap<ReplicaId, SeqNo>,
}
233
234impl<D: Lattice> VolatileState<D> {
235 pub fn new() -> Self {
236 Self {
237 delta_buffers: HashMap::new(),
238 peer_acks: HashMap::new(),
239 }
240 }
241
242 pub fn register_peer(&mut self, peer_id: ReplicaId) {
244 self.delta_buffers
245 .entry(peer_id.clone())
246 .or_default();
247 self.peer_acks.entry(peer_id).or_insert(0);
248 }
249
250 pub fn get_peer_ack(&self, peer_id: &str) -> SeqNo {
252 self.peer_acks.get(peer_id).copied().unwrap_or(0)
253 }
254
255 pub fn update_peer_ack(&mut self, peer_id: &str, seq: SeqNo) {
257 if let Some(ack) = self.peer_acks.get_mut(peer_id) {
258 *ack = (*ack).max(seq);
259 }
260 }
261}
262
263impl<D: Lattice> Default for VolatileState<D> {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
/// A replica combining crash-safe durable state with volatile anti-entropy
/// bookkeeping and a per-sender queue of causally premature intervals.
#[derive(Debug, Clone)]
pub struct CausalReplica<S: Lattice + Clone> {
    // State that must survive a crash: joined lattice + sequence counter.
    durable: DurableState<S>,
    // Rebuilt-on-restart bookkeeping: per-peer outgoing buffers and acks.
    volatile: VolatileState<S>,
    // Intervals received out of order, per sender, kept sorted by `from_seq`.
    pending: HashMap<ReplicaId, VecDeque<DeltaInterval<S>>>,
}
284
285impl<S: Lattice + Clone> CausalReplica<S> {
286 pub fn new(id: impl Into<ReplicaId>) -> Self {
288 Self {
289 durable: DurableState::new(id),
290 volatile: VolatileState::new(),
291 pending: HashMap::new(),
292 }
293 }
294
295 pub fn restore(durable: DurableState<S>) -> Self {
297 Self {
298 durable,
299 volatile: VolatileState::new(),
300 pending: HashMap::new(),
301 }
302 }
303
304 pub fn id(&self) -> &ReplicaId {
306 &self.durable.replica_id
307 }
308
309 pub fn state(&self) -> &S {
311 &self.durable.state
312 }
313
314 pub fn counter(&self) -> SeqNo {
316 self.durable.counter
317 }
318
319 pub fn durable_state(&self) -> &DurableState<S> {
321 &self.durable
322 }
323
324 pub fn register_peer(&mut self, peer_id: ReplicaId) {
326 self.volatile.register_peer(peer_id.clone());
327 self.pending.entry(peer_id).or_default();
328 }
329
330 pub fn mutate<F>(&mut self, mutator: F) -> S
342 where
343 F: FnOnce(&S) -> S,
344 {
345 self.durable.counter += 1;
347 let seq = self.durable.counter;
348
349 let delta = mutator(&self.durable.state);
351
352 self.durable.state.join_assign(&delta);
354
355 for buffer in self.volatile.delta_buffers.values_mut() {
357 buffer.push(delta.clone(), seq);
358 }
359
360 delta
361 }
362
363 pub fn prepare_interval(&mut self, peer_id: &str) -> Option<DeltaInterval<S>> {
368 let buffer = self.volatile.delta_buffers.get_mut(peer_id)?;
369
370 buffer
371 .take()
372 .map(|(delta, from_seq, to_seq)| DeltaInterval {
373 from: self.durable.replica_id.clone(),
374 to: peer_id.to_string(),
375 delta,
376 from_seq,
377 to_seq,
378 })
379 }
380
381 fn is_causally_ready(&self, interval: &DeltaInterval<S>) -> bool {
385 let last_acked = self.volatile.get_peer_ack(&interval.from);
386 interval.from_seq == last_acked
387 }
388
389 pub fn receive_interval(&mut self, interval: DeltaInterval<S>) -> Option<IntervalAck> {
404 if !self.volatile.peer_acks.contains_key(&interval.from) {
406 self.register_peer(interval.from.clone());
407 }
408
409 if self.is_causally_ready(&interval) {
410 self.durable.state.join_assign(&interval.delta);
412
413 self.volatile
415 .update_peer_ack(&interval.from, interval.to_seq);
416
417 let ack = IntervalAck {
418 from: self.durable.replica_id.clone(),
419 to: interval.from.clone(),
420 acked_seq: interval.to_seq,
421 };
422
423 self.try_apply_pending(&interval.from);
425
426 Some(ack)
427 } else {
428 let pending = self
430 .pending
431 .entry(interval.from.clone())
432 .or_default();
433
434 let pos = pending.iter().position(|p| p.from_seq > interval.from_seq);
436 match pos {
437 Some(i) => pending.insert(i, interval),
438 None => pending.push_back(interval),
439 }
440
441 None
442 }
443 }
444
445 fn try_apply_pending(&mut self, peer_id: &str) -> Vec<IntervalAck> {
447 let mut acks = Vec::new();
448
449 if let Some(pending) = self.pending.get_mut(peer_id) {
450 while let Some(interval) = pending.front() {
451 let last_acked = self.volatile.get_peer_ack(peer_id);
452 if interval.from_seq == last_acked {
453 let interval = pending.pop_front().unwrap();
454
455 self.durable.state.join_assign(&interval.delta);
457
458 self.volatile.update_peer_ack(peer_id, interval.to_seq);
460
461 acks.push(IntervalAck {
462 from: self.durable.replica_id.clone(),
463 to: interval.from.clone(),
464 acked_seq: interval.to_seq,
465 });
466 } else {
467 break;
468 }
469 }
470 }
471
472 acks
473 }
474
475 pub fn receive_ack(&mut self, ack: &IntervalAck) {
482 if let Some(buffer) = self.volatile.delta_buffers.get_mut(&ack.from) {
483 buffer.clear();
484 }
485 }
486
487 pub fn snapshot(&self) -> (S, SeqNo) {
489 (self.durable.state.clone(), self.durable.counter)
490 }
491
492 pub fn apply_snapshot(&mut self, state: S, seq: SeqNo, from: &str) {
494 self.durable.state.join_assign(&state);
495 self.volatile.update_peer_ack(from, seq);
496 }
497
498 pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
500 self.volatile.peer_acks.keys()
501 }
502
503 pub fn has_pending_deltas(&self) -> bool {
505 self.volatile
506 .delta_buffers
507 .values()
508 .any(|b| b.has_pending())
509 }
510
511 pub fn pending_count(&self) -> usize {
513 self.pending.values().map(|v| v.len()).sum()
514 }
515}
516
/// Persistence interface for a replica's crash-safe state.
pub trait DurableStorage<S: Lattice> {
    /// Writes (or overwrites) the durable record for the state's replica id.
    fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError>;

    /// Reads the durable record for `replica_id`; `Ok(None)` when absent.
    fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError>;

    /// Gives the backend a chance to flush buffered writes to stable media;
    /// a no-op for backends that do not buffer.
    fn sync(&mut self) -> Result<(), StorageError>;
}
530
/// Failure modes for [`DurableStorage`] backends.
#[derive(Debug, Clone)]
pub enum StorageError {
    /// Underlying I/O failure, with a description.
    IoError(String),
    /// State could not be (de)serialized, with a description.
    SerializationError(String),
    /// No durable record exists for the requested replica.
    NotFound,
}
538
539impl std::fmt::Display for StorageError {
540 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
541 match self {
542 StorageError::IoError(msg) => write!(f, "IO error: {}", msg),
543 StorageError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
544 StorageError::NotFound => write!(f, "State not found"),
545 }
546 }
547}
548
549impl std::error::Error for StorageError {}
550
/// Trivial in-memory [`DurableStorage`] backend: a map from replica id to
/// its last persisted durable record. Used by the tests in this module.
#[derive(Debug, Default)]
pub struct MemoryStorage<S> {
    // Last persisted record per replica id.
    states: HashMap<ReplicaId, DurableState<S>>,
}
556
557impl<S: Clone> MemoryStorage<S> {
558 pub fn new() -> Self {
559 Self {
560 states: HashMap::new(),
561 }
562 }
563}
564
565impl<S: Lattice + Clone + Serialize + for<'de> Deserialize<'de>> DurableStorage<S>
566 for MemoryStorage<S>
567{
568 fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError> {
569 self.states.insert(state.replica_id.clone(), state.clone());
570 Ok(())
571 }
572
573 fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError> {
574 Ok(self.states.get(replica_id).cloned())
575 }
576
577 fn sync(&mut self) -> Result<(), StorageError> {
578 Ok(())
579 }
580}
581
/// Deterministic lossy-network model used by [`CausalCluster`].
#[derive(Debug)]
pub struct CausalNetworkSimulator<D> {
    // FIFO queue of messages awaiting delivery.
    in_flight: VecDeque<CausalMessage<D>>,
    // Messages the simulated network dropped; recoverable via `retransmit_lost`.
    lost: Vec<CausalMessage<D>>,
    // Probability in [0, 1] that `send` drops a message.
    loss_rate: f64,
    // State of the deterministic pseudo-random generator (fixed seed).
    rng_state: u64,
}
594
595impl<D: Clone> CausalNetworkSimulator<D> {
596 pub fn new(loss_rate: f64) -> Self {
597 Self {
598 in_flight: VecDeque::new(),
599 lost: Vec::new(),
600 loss_rate,
601 rng_state: 42,
602 }
603 }
604
605 fn next_random(&mut self) -> f64 {
607 self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
608 ((self.rng_state >> 16) & 0x7fff) as f64 / 32768.0
609 }
610
611 pub fn send(&mut self, msg: CausalMessage<D>) {
613 if self.next_random() < self.loss_rate {
614 self.lost.push(msg);
615 } else {
616 self.in_flight.push_back(msg);
617 }
618 }
619
620 pub fn receive(&mut self) -> Option<CausalMessage<D>> {
622 self.in_flight.pop_front()
623 }
624
625 pub fn retransmit_lost(&mut self) {
627 for msg in self.lost.drain(..) {
628 self.in_flight.push_back(msg);
629 }
630 }
631
632 pub fn is_empty(&self) -> bool {
634 self.in_flight.is_empty()
635 }
636
637 pub fn in_flight_count(&self) -> usize {
639 self.in_flight.len()
640 }
641
642 pub fn lost_count(&self) -> usize {
644 self.lost.len()
645 }
646}
647
/// A fully connected set of replicas plus a simulated lossy network, used to
/// exercise the causal-delivery protocol end to end.
#[derive(Debug)]
pub struct CausalCluster<S: Lattice + Clone> {
    // Replicas named `causal_0` .. `causal_{n-1}`.
    replicas: Vec<CausalReplica<S>>,
    // Shared message transport between all replicas.
    network: CausalNetworkSimulator<S>,
}
656
657impl<S: Lattice + Clone> CausalCluster<S> {
658 pub fn new(n: usize, loss_rate: f64) -> Self {
660 let mut replicas = Vec::with_capacity(n);
661
662 for i in 0..n {
664 let mut replica = CausalReplica::new(format!("causal_{}", i));
665 for j in 0..n {
667 if i != j {
668 replica.register_peer(format!("causal_{}", j));
669 }
670 }
671 replicas.push(replica);
672 }
673
674 Self {
675 replicas,
676 network: CausalNetworkSimulator::new(loss_rate),
677 }
678 }
679
680 pub fn replica(&self, idx: usize) -> &CausalReplica<S> {
682 &self.replicas[idx]
683 }
684
685 pub fn replica_mut(&mut self, idx: usize) -> &mut CausalReplica<S> {
687 &mut self.replicas[idx]
688 }
689
690 pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
692 where
693 F: FnOnce(&S) -> S,
694 {
695 self.replicas[replica_idx].mutate(mutator)
696 }
697
698 pub fn broadcast_intervals(&mut self, from_idx: usize) {
700 let replica = &mut self.replicas[from_idx];
701 let peer_ids: Vec<_> = replica.peers().cloned().collect();
702
703 for peer_id in peer_ids {
704 if let Some(interval) = replica.prepare_interval(&peer_id) {
705 self.network.send(CausalMessage::DeltaInterval(interval));
706 }
707 }
708 }
709
710 pub fn process_one(&mut self) -> bool {
712 if let Some(msg) = self.network.receive() {
713 match msg {
714 CausalMessage::DeltaInterval(interval) => {
715 for replica in &mut self.replicas {
717 if replica.id() == &interval.to {
718 if let Some(ack) = replica.receive_interval(interval.clone()) {
719 self.network.send(CausalMessage::Ack(ack));
720 }
721 break;
722 }
723 }
724 }
725 CausalMessage::Ack(ack) => {
726 for replica in &mut self.replicas {
728 if replica.id() == &ack.to {
729 replica.receive_ack(&ack);
730 break;
731 }
732 }
733 }
734 CausalMessage::SnapshotRequest { from, to } => {
735 for replica in &self.replicas {
737 if replica.id() == &to {
738 let (state, seq) = replica.snapshot();
739 self.network.send(CausalMessage::Snapshot {
740 from: to,
741 to: from,
742 state,
743 seq,
744 });
745 break;
746 }
747 }
748 }
749 CausalMessage::Snapshot {
750 from,
751 to,
752 state,
753 seq,
754 } => {
755 for replica in &mut self.replicas {
757 if replica.id() == &to {
758 replica.apply_snapshot(state, seq, &from);
759 break;
760 }
761 }
762 }
763 }
764 true
765 } else {
766 false
767 }
768 }
769
770 pub fn drain_network(&mut self) {
772 while self.process_one() {}
773 }
774
775 pub fn full_sync_round(&mut self) {
777 let n = self.replicas.len();
778 for i in 0..n {
779 self.broadcast_intervals(i);
780 }
781 self.drain_network();
782 }
783
784 pub fn is_converged(&self) -> bool {
786 if self.replicas.len() < 2 {
787 return true;
788 }
789
790 let first = self.replicas[0].state();
791 self.replicas.iter().skip(1).all(|r| r.state() == first)
792 }
793
794 pub fn retransmit_and_process(&mut self) {
796 self.network.retransmit_lost();
797 self.drain_network();
798 }
799
800 pub fn len(&self) -> usize {
802 self.replicas.len()
803 }
804
805 pub fn is_empty(&self) -> bool {
807 self.replicas.is_empty()
808 }
809
810 pub fn crash_and_recover(&mut self, idx: usize) {
812 let durable = self.replicas[idx].durable_state().clone();
813
814 let mut recovered = CausalReplica::restore(durable);
816
817 let n = self.replicas.len();
819 for j in 0..n {
820 if idx != j {
821 recovered.register_peer(format!("causal_{}", j));
822 }
823 }
824
825 self.replicas[idx] = recovered;
826 }
827
828 pub fn total_pending(&self) -> usize {
830 self.replicas.iter().map(|r| r.pending_count()).sum()
831 }
832}
833
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;
    use mdcs_core::pncounter::PNCounter;

    /// Convenience: a `GSet` delta holding exactly one element.
    fn singleton(v: i32) -> GSet<i32> {
        let mut delta = GSet::new();
        delta.insert(v);
        delta
    }

    #[test]
    fn test_causal_replica_basic() {
        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");

        replica.mutate(|_| singleton(42));

        assert!(replica.state().contains(&42));
        assert_eq!(replica.counter(), 1);
    }

    #[test]
    fn test_causal_interval_generation() {
        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");
        replica.register_peer("peer1".to_string());

        replica.mutate(|_| singleton(1));
        replica.mutate(|_| singleton(2));

        // Both mutations collapse into a single interval covering (0, 2].
        let interval = replica.prepare_interval("peer1").unwrap();
        assert_eq!(interval.from_seq, 0);
        assert_eq!(interval.to_seq, 2);
        assert!(interval.delta.contains(&1));
        assert!(interval.delta.contains(&2));
    }

    #[test]
    fn test_causal_delivery() {
        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");
        r1.register_peer("r2".to_string());
        r2.register_peer("r1".to_string());

        r1.mutate(|_| singleton(1));
        r1.mutate(|_| singleton(2));

        let interval = r1.prepare_interval("r2").unwrap();
        assert_eq!(interval.from_seq, 0);
        assert_eq!(interval.to_seq, 2);

        let ack = r2.receive_interval(interval).unwrap();
        assert_eq!(ack.acked_seq, 2);

        assert!(r2.state().contains(&1));
        assert!(r2.state().contains(&2));
    }

    #[test]
    fn test_out_of_order_buffering() {
        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
        replica.register_peer("peer".to_string());

        // Interval starting at seq 5 while nothing has been delivered yet:
        // a causal gap, so it must be buffered rather than applied.
        let out_of_order = DeltaInterval {
            from: "peer".to_string(),
            to: "r1".to_string(),
            delta: singleton(999),
            from_seq: 5,
            to_seq: 6,
        };

        let result = replica.receive_interval(out_of_order);
        assert!(result.is_none());
        assert_eq!(replica.pending_count(), 1);
        assert!(!replica.state().contains(&999));
    }

    #[test]
    fn test_cluster_convergence() {
        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.0);

        for idx in 0..3 {
            let val = (idx + 1) as i32;
            cluster.mutate(idx, move |_| singleton(val));
        }

        assert!(!cluster.is_converged());
        cluster.full_sync_round();
        assert!(cluster.is_converged());

        for idx in 0..3 {
            for val in 1..=3 {
                assert!(cluster.replica(idx).state().contains(&val));
            }
        }
    }

    #[test]
    fn test_cluster_with_loss() {
        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.3);

        for idx in 0..3 {
            let val = (idx + 1) as i32;
            cluster.mutate(idx, move |_| singleton(val));
        }

        // With 30% loss one round may not converge; keep syncing and
        // retransmitting until the deltas get through.
        for _ in 0..10 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());
    }

    #[test]
    fn test_crash_recovery() {
        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(2, 0.0);

        cluster.mutate(0, |_| singleton(1));
        cluster.full_sync_round();
        assert!(cluster.is_converged());

        cluster.mutate(0, |_| singleton(2));

        let counter_before = cluster.replica(0).counter();
        cluster.crash_and_recover(0);

        // Durable state (joined CRDT plus sequence counter) survives the
        // crash; volatile delta buffers are rebuilt empty.
        assert_eq!(cluster.replica(0).counter(), counter_before);
        assert!(cluster.replica(0).state().contains(&1));
        assert!(cluster.replica(0).state().contains(&2));
        assert!(!cluster.replica(0).has_pending_deltas());
    }

    #[test]
    fn test_pncounter_causal() {
        let mut cluster: CausalCluster<PNCounter<String>> = CausalCluster::new(2, 0.0);

        cluster.mutate(0, |_s| {
            let mut delta = PNCounter::new();
            delta.increment("r0".to_string(), 1);
            delta
        });
        cluster.mutate(1, |_s| {
            let mut delta = PNCounter::new();
            delta.decrement("r1".to_string(), 1);
            delta
        });

        cluster.full_sync_round();

        // +1 and -1 cancel out once both replicas have seen both deltas.
        assert!(cluster.is_converged());
        assert_eq!(cluster.replica(0).state().value(), 0);
    }

    #[test]
    fn test_causal_ordering_preserved() {
        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");
        r1.register_peer("r2".to_string());
        r2.register_peer("r1".to_string());

        for i in 1..=3 {
            r1.mutate(move |_| singleton(i));
        }

        // Hand-built intervals simulating out-of-order arrival: (2, 3]
        // first, then the (0, 2] prefix that unblocks it.
        let interval_1_3 = DeltaInterval {
            from: "r1".to_string(),
            to: "r2".to_string(),
            delta: singleton(3),
            from_seq: 2,
            to_seq: 3,
        };

        let interval_0_2 = DeltaInterval {
            from: "r1".to_string(),
            to: "r2".to_string(),
            delta: {
                let mut d = GSet::new();
                d.insert(1);
                d.insert(2);
                d
            },
            from_seq: 0,
            to_seq: 2,
        };

        let result = r2.receive_interval(interval_1_3.clone());
        assert!(result.is_none());
        assert!(!r2.state().contains(&3));

        let result = r2.receive_interval(interval_0_2);
        assert!(result.is_some());
        assert!(r2.state().contains(&1));
        assert!(r2.state().contains(&2));

        // The buffered (2, 3] interval is applied once the gap closes.
        assert!(r2.state().contains(&3));
        assert_eq!(r2.pending_count(), 0);
    }

    #[test]
    fn test_durable_storage() {
        let mut storage: MemoryStorage<GSet<i32>> = MemoryStorage::new();

        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test");
        replica.mutate(|_| singleton(42));

        storage.persist(replica.durable_state()).unwrap();

        let loaded = storage.load("test").unwrap().unwrap();
        assert_eq!(loaded.counter, 1);
        assert!(loaded.state.contains(&42));

        let recovered = CausalReplica::restore(loaded);
        assert!(recovered.state().contains(&42));
    }
}