node_replication/
replica.rs

1// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4use core::cell::RefCell;
5use core::hint::spin_loop;
6#[cfg(not(loom))]
7use core::sync::atomic::{AtomicUsize, Ordering};
8#[cfg(loom)]
9use loom::sync::atomic::{AtomicUsize, Ordering};
10
11#[cfg(not(loom))]
12use alloc::sync::Arc;
13#[cfg(loom)]
14use loom::sync::Arc;
15
16use alloc::vec::Vec;
17
18use crossbeam_utils::CachePadded;
19
20use super::context::Context;
21use super::log::Log;
22use super::rwlock::RwLock;
23use super::Dispatch;
24
25/// A token handed out to threads registered with replicas.
26///
27/// # Note
28/// Ideally this would be an affine type and returned again by
29/// `execute` and `execute_ro`. However it feels like this would
30/// hurt API ergonomics a lot.
31#[derive(Copy, Clone, Debug, PartialEq)]
32pub struct ReplicaToken(usize);
33
34/// To make it harder to use the same ReplicaToken on multiple threads.
35#[cfg(features = "unstable")]
36impl !Send for ReplicaToken {}
37
38impl ReplicaToken {
39    /// Creates a new ReplicaToken
40    ///
41    /// # Safety
42    /// This should only ever be used for the benchmark harness to create
43    /// additional fake replica implementations.
44    /// If we had a means to declare this not-pub we should do that instead.
45    #[doc(hidden)]
46    pub unsafe fn new(ident: usize) -> Self {
47        ReplicaToken(ident)
48    }
49
50    /// Getter for id
51    pub fn id(&self) -> usize {
52        self.0
53    }
54}
55
56/// The maximum number of threads that can be registered with a replica. If more than
57/// this number of threads try to register, the register() function will return None.
58///
59/// # Important
60/// If this number is adjusted due to the use of the `arr_macro::arr` macro we
61/// have to adjust the `256` literals in the `new` constructor of `Replica`.
62#[cfg(not(loom))]
63pub const MAX_THREADS_PER_REPLICA: usize = 256;
64#[cfg(loom)]
65pub const MAX_THREADS_PER_REPLICA: usize = 2;
66const_assert!(
67    MAX_THREADS_PER_REPLICA >= 1 && (MAX_THREADS_PER_REPLICA & (MAX_THREADS_PER_REPLICA - 1) == 0)
68);
69
70/// An instance of a replicated data structure. Uses a shared log to scale
71/// operations on the data structure across cores and processors.
72///
73/// Takes in one type argument: `D` represents the underlying sequential data
74/// structure `D` must implement the `Dispatch` trait.
75///
76/// A thread can be registered against the replica by calling `register()`. A
77/// mutable operation can be issued by calling `execute_mut()` (immutable uses
78/// `execute`). A mutable operation will be eventually executed against the replica
79/// along with any operations that were received on other replicas that share
80/// the same underlying log.
81pub struct Replica<'a, D>
82where
83    D: Sized + Dispatch + Sync,
84{
85    /// A replica-identifier received when the replica is registered against
86    /// the shared-log. Required when consuming operations from the log.
87    idx: usize,
88
89    /// Thread idx of the thread currently responsible for flat combining. Zero
90    /// if there isn't any thread actively performing flat combining on the log.
91    /// This also doubles up as the combiner lock.
92    combiner: CachePadded<AtomicUsize>,
93
94    /// Idx that will be handed out to the next thread that registers with the replica.
95    next: CachePadded<AtomicUsize>,
96
97    /// List of per-thread contexts. Threads buffer write operations in here when they
98    /// cannot perform flat combining (because another thread might be doing so).
99    ///
100    /// The vector is initialized with `MAX_THREADS_PER_REPLICA` elements.
101    contexts: Vec<Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>>,
102
103    /// A buffer of operations for flat combining. The combiner stages operations in
104    /// here and then batch appends them into the shared log. This helps amortize
105    /// the cost of the compare_and_swap() on the tail of the log.
106    buffer: RefCell<Vec<<D as Dispatch>::WriteOperation>>,
107
108    /// Number of operations collected by the combiner from each thread at any
109    /// given point of time. Index `i` holds the number of operations collected from
110    /// thread with identifier `i + 1`.
111    inflight: RefCell<[usize; MAX_THREADS_PER_REPLICA]>,
112
113    /// A buffer of results collected after flat combining. With the help of `inflight`,
114    /// the combiner enqueues these results into the appropriate thread context.
115    result: RefCell<Vec<<D as Dispatch>::Response>>,
116
117    /// Reference to the shared log that operations will be appended to and the
118    /// data structure will be updated from.
119    slog: Arc<Log<'a, <D as Dispatch>::WriteOperation>>,
120
121    /// The underlying replicated data structure. Shared between threads registered
122    /// with this replica. Each replica maintains its own.
123    data: CachePadded<RwLock<D>>,
124}
125
126/// The Replica is Sync. Member variables are protected by a CAS on `combiner`.
127/// Contexts are thread-safe.
128unsafe impl<'a, D> Sync for Replica<'a, D> where D: Sized + Sync + Dispatch {}
129
130impl<'a, D> core::fmt::Debug for Replica<'a, D>
131where
132    D: Sized + Sync + Dispatch,
133{
134    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
135        write!(f, "Replica")
136    }
137}
138
139impl<'a, D> Replica<'a, D>
140where
141    D: Sized + Default + Dispatch + Sync,
142{
143    /// Constructs an instance of a replicated data structure.
144    ///
145    /// Takes a reference to the shared log as an argument. The Log is assumed to
146    /// outlive the replica. The replica is bound to the log's lifetime.
147    ///
148    /// # Example
149    ///
150    /// ```
151    /// use node_replication::Dispatch;
152    /// use node_replication::Log;
153    /// use node_replication::Replica;
154    ///
155    /// use std::sync::Arc;
156    ///
157    /// // The data structure we want replicated.
158    /// #[derive(Default)]
159    /// struct Data {
160    ///     junk: u64,
161    /// }
162    ///
163    /// // This trait allows the `Data` to be used with node-replication.
164    /// impl Dispatch for Data {
165    ///     type ReadOperation = ();
166    ///     type WriteOperation = u64;
167    ///     type Response = Option<u64>;
168    ///
169    ///     // A read returns the underlying u64.
170    ///     fn dispatch(
171    ///         &self,
172    ///         _op: Self::ReadOperation,
173    ///     ) -> Self::Response {
174    ///         Some(self.junk)
175    ///     }
176    ///
177    ///     // A write updates the underlying u64.
178    ///     fn dispatch_mut(
179    ///         &mut self,
180    ///         op: Self::WriteOperation,
181    ///     ) -> Self::Response {
182    ///         self.junk = op;
183    ///         None
184    ///     }
185    /// }
186    ///
187    /// // First create a shared log.
188    /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
189    ///
190    /// // Create a replica that uses the above log.
191    /// let replica = Replica::<Data>::new(&log);
192    /// ```
193    pub fn new<'b>(log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>) -> Arc<Replica<'b, D>> {
194        Replica::with_data(log, Default::default())
195    }
196}
197
198impl<'a, D> Replica<'a, D>
199where
200    D: Sized + Dispatch + Sync,
201{
202    /// Similar to [`Replica<D>::new`], but we pass a pre-initialized
203    /// data-structure as an argument (`d`) rather than relying on the
204    /// [Default](core::default::Default) trait to create one.
205    ///
206    /// # Note
207    /// [`Replica<D>::new`] should be the preferred method to create a Replica.
208    /// If `with_data` is used, care must be taken that the same state is passed
209    /// to every Replica object. If not the resulting operations executed
210    /// against replicas may not give deterministic results.
211    #[cfg(not(feature = "unstable"))]
212    pub fn with_data<'b>(
213        log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
214        d: D,
215    ) -> Arc<Replica<'b, D>> {
216        let mut contexts = Vec::with_capacity(MAX_THREADS_PER_REPLICA);
217        // Add `MAX_THREADS_PER_REPLICA` contexts
218        for _idx in 0..MAX_THREADS_PER_REPLICA {
219            contexts.push(Default::default());
220        }
221
222        Arc::new(
223            Replica {
224                idx: log.register().unwrap(),
225                combiner: CachePadded::new(AtomicUsize::new(0)),
226                next: CachePadded::new(AtomicUsize::new(1)),
227                contexts,
228                buffer:
229                    RefCell::new(
230                        Vec::with_capacity(
231                            MAX_THREADS_PER_REPLICA
232                                * Context::<
233                                    <D as Dispatch>::WriteOperation,
234                                    <D as Dispatch>::Response,
235                                >::batch_size(),
236                        ),
237                    ),
238                inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
239                result:
240                    RefCell::new(
241                        Vec::with_capacity(
242                            MAX_THREADS_PER_REPLICA
243                                * Context::<
244                                    <D as Dispatch>::WriteOperation,
245                                    <D as Dispatch>::Response,
246                                >::batch_size(),
247                        ),
248                    ),
249                slog: log.clone(),
250                data: CachePadded::new(RwLock::<D>::new(d)),
251            },
252        )
253    }
254
255    /// See `with_data` documentation without unstable feature.
256    #[cfg(feature = "unstable")]
257    pub fn with_data<'b>(
258        log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
259        d: D,
260    ) -> Arc<Replica<'b, D>> {
261        use core::mem::MaybeUninit;
262        let mut uninit_replica: Arc<MaybeUninit<Replica<D>>> = Arc::new_zeroed();
263
264        // This is the preferred (but unsafe) mode of initialization as it avoids
265        // putting the big Replica object on the stack first.
266        unsafe {
267            let uninit_ptr = Arc::get_mut_unchecked(&mut uninit_replica).as_mut_ptr();
268            uninit_ptr.write(Replica {
269                idx: log.register().unwrap(),
270                combiner: CachePadded::new(AtomicUsize::new(0)),
271                next: CachePadded::new(AtomicUsize::new(1)),
272                contexts: Vec::with_capacity(MAX_THREADS_PER_REPLICA),
273                buffer:
274                    RefCell::new(
275                        Vec::with_capacity(
276                            MAX_THREADS_PER_REPLICA
277                                * Context::<
278                                    <D as Dispatch>::WriteOperation,
279                                    <D as Dispatch>::Response,
280                                >::batch_size(),
281                        ),
282                    ),
283                inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
284                result:
285                    RefCell::new(
286                        Vec::with_capacity(
287                            MAX_THREADS_PER_REPLICA
288                                * Context::<
289                                    <D as Dispatch>::WriteOperation,
290                                    <D as Dispatch>::Response,
291                                >::batch_size(),
292                        ),
293                    ),
294                slog: log.clone(),
295                data: CachePadded::new(RwLock::<D>::new(d)),
296            });
297
298            let mut replica = uninit_replica.assume_init();
299            // Add `MAX_THREADS_PER_REPLICA` contexts
300            for _idx in 0..MAX_THREADS_PER_REPLICA {
301                Arc::get_mut(&mut replica)
302                    .unwrap()
303                    .contexts
304                    .push(Default::default());
305            }
306
307            replica
308        }
309    }
310
311    /// Registers a thread with this replica. Returns an idx inside an Option if the registration
312    /// was successfull. None if the registration failed.
313    ///
314    /// # Example
315    ///
316    /// ```
317    /// use node_replication::Dispatch;
318    /// use node_replication::Log;
319    /// use node_replication::Replica;
320    ///
321    /// use std::sync::Arc;
322    ///
323    /// #[derive(Default)]
324    /// struct Data {
325    ///     junk: u64,
326    /// }
327    ///
328    /// impl Dispatch for Data {
329    ///     type ReadOperation = ();
330    ///     type WriteOperation = u64;
331    ///     type Response = Option<u64>;
332    ///
333    ///     fn dispatch(
334    ///         &self,
335    ///         _op: Self::ReadOperation,
336    ///     ) -> Self::Response {
337    ///         Some(self.junk)
338    ///     }
339    ///
340    ///     fn dispatch_mut(
341    ///         &mut self,
342    ///         op: Self::WriteOperation,
343    ///     ) -> Self::Response {
344    ///         self.junk = op;
345    ///         None
346    ///     }
347    /// }
348    ///
349    /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
350    /// let replica = Replica::<Data>::new(&log);
351    ///
352    /// // Calling register() returns an idx that can be used to execute
353    /// // operations against the replica.
354    /// let idx = replica.register().expect("Failed to register with replica.");
355    /// ```
356    pub fn register(&self) -> Option<ReplicaToken> {
357        // Loop until we either run out of identifiers or we manage to increment `next`.
358        loop {
359            let idx = self.next.load(Ordering::SeqCst);
360
361            if idx > MAX_THREADS_PER_REPLICA {
362                return None;
363            };
364
365            if self
366                .next
367                .compare_exchange_weak(idx, idx + 1, Ordering::SeqCst, Ordering::SeqCst)
368                != Ok(idx)
369            {
370                continue;
371            };
372
373            return Some(ReplicaToken(idx));
374        }
375    }
376
377    /// Executes an mutable operation against this replica and returns a response.
378    /// `idx` is an identifier for the thread performing the execute operation.
379    ///
380    /// # Example
381    ///
382    /// ```
383    /// use node_replication::Dispatch;
384    /// use node_replication::Log;
385    /// use node_replication::Replica;
386    ///
387    /// use std::sync::Arc;
388    ///
389    /// #[derive(Default)]
390    /// struct Data {
391    ///     junk: u64,
392    /// }
393    ///
394    /// impl Dispatch for Data {
395    ///     type ReadOperation = ();
396    ///     type WriteOperation = u64;
397    ///     type Response = Option<u64>;
398    ///
399    ///     fn dispatch(
400    ///         &self,
401    ///         _op: Self::ReadOperation,
402    ///     ) -> Self::Response {
403    ///         Some(self.junk)
404    ///     }
405    ///
406    ///     fn dispatch_mut(
407    ///         &mut self,
408    ///         op: Self::WriteOperation,
409    ///     ) -> Self::Response {
410    ///         self.junk = op;
411    ///         None
412    ///     }
413    /// }
414    ///
415    /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
416    /// let replica = Replica::<Data>::new(&log);
417    /// let idx = replica.register().expect("Failed to register with replica.");
418    ///
419    /// // execute_mut() can be used to write to the replicated data structure.
420    /// let res = replica.execute_mut(100, idx);
421    /// assert_eq!(None, res);
422    pub fn execute_mut(
423        &self,
424        op: <D as Dispatch>::WriteOperation,
425        idx: ReplicaToken,
426    ) -> <D as Dispatch>::Response {
427        // Enqueue the operation onto the thread local batch and then try to flat combine.
428        while !self.make_pending(op.clone(), idx.0) {}
429        self.try_combine(idx.0);
430
431        // Return the response to the caller function.
432        self.get_response(idx.0)
433    }
434
435    /// Executes a read-only operation against this replica and returns a response.
436    /// `idx` is an identifier for the thread performing the execute operation.
437    ///
438    /// # Example
439    ///
440    /// ```
441    /// use node_replication::Dispatch;
442    /// use node_replication::Log;
443    /// use node_replication::Replica;
444    ///
445    /// use std::sync::Arc;
446    ///
447    /// #[derive(Default)]
448    /// struct Data {
449    ///     junk: u64,
450    /// }
451    ///
452    /// impl Dispatch for Data {
453    ///     type ReadOperation = ();
454    ///     type WriteOperation = u64;
455    ///     type Response = Option<u64>;
456    ///
457    ///     fn dispatch(
458    ///         &self,
459    ///         _op: Self::ReadOperation,
460    ///     ) -> Self::Response {
461    ///         Some(self.junk)
462    ///     }
463    ///
464    ///     fn dispatch_mut(
465    ///         &mut self,
466    ///         op: Self::WriteOperation,
467    ///     ) -> Self::Response {
468    ///         self.junk = op;
469    ///         None
470    ///     }
471    /// }
472    ///
473    /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
474    /// let replica = Replica::<Data>::new(&log);
475    /// let idx = replica.register().expect("Failed to register with replica.");
476    /// let _wr = replica.execute_mut(100, idx);
477    ///
478    /// // execute() can be used to read from the replicated data structure.
479    /// let res = replica.execute((), idx);
480    /// assert_eq!(Some(100), res);
481    pub fn execute(
482        &self,
483        op: <D as Dispatch>::ReadOperation,
484        idx: ReplicaToken,
485    ) -> <D as Dispatch>::Response {
486        self.read_only(op, idx.0)
487    }
488
489    /// Busy waits until a response is available within the thread's context.
490    /// `idx` identifies this thread.
491    fn get_response(&self, idx: usize) -> <D as Dispatch>::Response {
492        let mut iter = 0;
493        let interval = 1 << 29;
494
495        // Keep trying to retrieve a response from the thread context. After trying `interval`
496        // times with no luck, try to perform flat combining to make some progress.
497        loop {
498            let r = self.contexts[idx - 1].res();
499            if let Some(resp) = r {
500                return resp;
501            }
502
503            iter += 1;
504
505            if iter == interval {
506                self.try_combine(idx);
507                iter = 0;
508            }
509        }
510    }
511
512    /// Executes a passed in closure against the replica's underlying data
513    /// structure. Useful for unit testing; can be used to verify certain
514    /// properties of the data structure after issuing a bunch of operations
515    /// against it.
516    ///
517    /// # Note
518    /// There is probably no need for a regular client to ever call this function.
519    #[doc(hidden)]
520    pub fn verify<F: FnMut(&D)>(&self, mut v: F) {
521        // Acquire the combiner lock before attempting anything on the data structure.
522        // Use an idx greater than the maximum that can be allocated.
523        while self.combiner.compare_exchange_weak(
524            0,
525            MAX_THREADS_PER_REPLICA + 2,
526            Ordering::Acquire,
527            Ordering::Acquire,
528        ) != Ok(0)
529        {
530            spin_loop();
531        }
532
533        let mut data = self.data.write(self.next.load(Ordering::Relaxed));
534        let mut f = |o: <D as Dispatch>::WriteOperation, _i: usize| {
535            data.dispatch_mut(o);
536        };
537
538        self.slog.exec(self.idx, &mut f);
539
540        v(&data);
541
542        self.combiner.store(0, Ordering::Release);
543    }
544
545    /// This method is useful when a replica stops making progress and some threads
546    /// on another replica are still active. The active replica will use all the entries
547    /// in the log and won't be able perform garbage collection because of the inactive
548    /// replica. So, this method syncs up the replica against the underlying log.
549    pub fn sync(&self, idx: ReplicaToken) {
550        let ctail = self.slog.get_ctail();
551        while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
552            self.try_combine(idx.0);
553            spin_loop();
554        }
555    }
556
557    /// Issues a read-only operation against the replica and returns a response.
558    /// Makes sure the replica is synced up against the log before doing so.
559    fn read_only(
560        &self,
561        op: <D as Dispatch>::ReadOperation,
562        tid: usize,
563    ) -> <D as Dispatch>::Response {
564        // We can perform the read only if our replica is synced up against
565        // the shared log. If it isn't, then try to combine until it is synced up.
566        let ctail = self.slog.get_ctail();
567        while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
568            self.try_combine(tid);
569            spin_loop();
570        }
571
572        return self.data.read(tid - 1).dispatch(op);
573    }
574
575    /// Enqueues an operation inside a thread local context. Returns a boolean
576    /// indicating whether the operation was enqueued (true) or not (false).
577    #[inline(always)]
578    fn make_pending(&self, op: <D as Dispatch>::WriteOperation, idx: usize) -> bool {
579        self.contexts[idx - 1].enqueue(op)
580    }
581
582    /// Appends an operation to the log and attempts to perform flat combining.
583    /// Accepts a thread `tid` as an argument. Required to acquire the combiner lock.
584    fn try_combine(&self, tid: usize) {
585        // First, check if there already is a flat combiner. If there is no active flat combiner
586        // then try to acquire the combiner lock. If there is, then just return.
587        for _i in 0..4 {
588            #[cfg(not(loom))]
589            if unsafe {
590                core::ptr::read_volatile(
591                    &self.combiner
592                        as *const crossbeam_utils::CachePadded<core::sync::atomic::AtomicUsize>
593                        as *const usize,
594                )
595            } != 0
596            {
597                return;
598            }
599
600            #[cfg(loom)]
601            {
602                if self.combiner.load(Ordering::Relaxed) != 0 {
603                    loom::thread::yield_now();
604                    return;
605                }
606            }
607        }
608
609        // Try to become the combiner here. If this fails, then simply return.
610        if self
611            .combiner
612            .compare_exchange_weak(0, tid, Ordering::Acquire, Ordering::Acquire)
613            != Ok(0)
614        {
615            #[cfg(loom)]
616            loom::thread::yield_now();
617            return;
618        }
619
620        // Successfully became the combiner; perform one round of flat combining.
621        self.combine();
622
623        // Allow other threads to perform flat combining once we have finished all our work.
624        // At this point, we've dropped all mutable references to thread contexts and to
625        // the staging buffer as well.
626        self.combiner.store(0, Ordering::Release);
627    }
628
629    /// Performs one round of flat combining. Collects, appends and executes operations.
630    #[inline(always)]
631    fn combine(&self) {
632        let mut buffer = self.buffer.borrow_mut();
633        let mut operations = self.inflight.borrow_mut();
634        let mut results = self.result.borrow_mut();
635
636        buffer.clear();
637        results.clear();
638
639        let next = self.next.load(Ordering::Relaxed);
640
641        // Collect operations from each thread registered with this replica.
642        for i in 1..next {
643            operations[i - 1] = self.contexts[i - 1].ops(&mut buffer);
644        }
645
646        // Append all collected operations into the shared log. We pass a closure
647        // in here because operations on the log might need to be consumed for GC.
648        {
649            let mut data = self.data.write(next);
650            let f = |o: <D as Dispatch>::WriteOperation, i: usize| {
651                #[cfg(not(loom))]
652                let resp = data.dispatch_mut(o);
653                #[cfg(loom)]
654                let resp = data.dispatch_mut(o);
655                if i == self.idx {
656                    results.push(resp);
657                }
658            };
659            self.slog.append(&buffer, self.idx, f);
660        }
661
662        // Execute any operations on the shared log against this replica.
663        {
664            let mut data = self.data.write(next);
665            let mut f = |o: <D as Dispatch>::WriteOperation, i: usize| {
666                let resp = data.dispatch_mut(o);
667                if i == self.idx {
668                    results.push(resp)
669                };
670            };
671            self.slog.exec(self.idx, &mut f);
672        }
673
674        // Return/Enqueue responses back into the appropriate thread context(s).
675        let (mut s, mut f) = (0, 0);
676        for i in 1..next {
677            if operations[i - 1] == 0 {
678                continue;
679            };
680
681            f += operations[i - 1];
682            self.contexts[i - 1].enqueue_resps(&results[s..f]);
683            s += operations[i - 1];
684            operations[i - 1] = 0;
685        }
686    }
687}
688
689#[cfg(test)]
690mod test {
691    extern crate std;
692
693    use super::*;
694    use std::vec;
695
696    // Really dumb data structure to test against the Replica and shared log.
697    #[derive(Default)]
698    struct Data {
699        junk: u64,
700    }
701
702    impl Dispatch for Data {
703        type ReadOperation = u64;
704        type WriteOperation = u64;
705        type Response = Result<u64, ()>;
706
707        fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response {
708            Ok(self.junk)
709        }
710
711        fn dispatch_mut(&mut self, _op: Self::WriteOperation) -> Self::Response {
712            self.junk += 1;
713            return Ok(107);
714        }
715    }
716
717    // Tests whether we can construct a Replica given a log.
718    #[test]
719    fn test_replica_create() {
720        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
721        let repl = Replica::<Data>::new(&slog);
722        assert_eq!(repl.idx, 1);
723        assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
724        assert_eq!(repl.next.load(Ordering::SeqCst), 1);
725        assert_eq!(repl.contexts.len(), MAX_THREADS_PER_REPLICA);
726        assert_eq!(
727            repl.buffer.borrow().capacity(),
728            MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
729        );
730        assert_eq!(repl.inflight.borrow().len(), MAX_THREADS_PER_REPLICA);
731        assert_eq!(
732            repl.result.borrow().capacity(),
733            MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
734        );
735        assert_eq!(repl.data.read(0).junk, 0);
736    }
737
738    // Tests whether we can register with this replica and receive an idx.
739    #[test]
740    fn test_replica_register() {
741        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
742        let repl = Replica::<Data>::new(&slog);
743        assert_eq!(repl.register(), Some(ReplicaToken(1)));
744        assert_eq!(repl.next.load(Ordering::SeqCst), 2);
745        repl.next.store(17, Ordering::SeqCst);
746        assert_eq!(repl.register(), Some(ReplicaToken(17)));
747        assert_eq!(repl.next.load(Ordering::SeqCst), 18);
748    }
749
750    // Tests whether registering more than the maximum limit of threads per replica is disallowed.
751    #[test]
752    fn test_replica_register_none() {
753        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
754        let repl = Replica::<Data>::new(&slog);
755        repl.next
756            .store(MAX_THREADS_PER_REPLICA + 1, Ordering::SeqCst);
757        assert!(repl.register().is_none());
758    }
759
760    // Tests that we can successfully allow operations to go pending on this replica.
761    #[test]
762    fn test_replica_make_pending() {
763        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
764        let repl = Replica::<Data>::new(&slog);
765        let mut o = vec![];
766
767        assert!(repl.make_pending(121, 8));
768        assert_eq!(repl.contexts[7].ops(&mut o), 1);
769        assert_eq!(o.len(), 1);
770        assert_eq!(o[0], 121);
771    }
772
773    // Tests that we can't pend operations on a context that is already full of operations.
774    #[test]
775    fn test_replica_make_pending_false() {
776        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
777        let repl = Replica::<Data>::new(&slog);
778        for _i in 0..Context::<u64, Result<u64, ()>>::batch_size() {
779            assert!(repl.make_pending(121, 1))
780        }
781
782        assert!(!repl.make_pending(11, 1));
783    }
784
785    // Tests that we can append and execute operations using try_combine().
786    #[test]
787    fn test_replica_try_combine() {
788        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
789        let repl = Replica::<Data>::new(&slog);
790        let _idx = repl.register();
791
792        repl.make_pending(121, 1);
793        repl.try_combine(1);
794
795        assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
796        assert_eq!(repl.data.read(0).junk, 1);
797        assert_eq!(repl.contexts[0].res(), Some(Ok(107)));
798    }
799
800    // Tests whether try_combine() also applies pending operations on other threads to the log.
801    #[test]
802    fn test_replica_try_combine_pending() {
803        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
804        let repl = Replica::<Data>::new(&slog);
805
806        repl.next.store(9, Ordering::SeqCst);
807        repl.make_pending(121, 8);
808        repl.try_combine(1);
809
810        assert_eq!(repl.data.read(0).junk, 1);
811        assert_eq!(repl.contexts[7].res(), Some(Ok(107)));
812    }
813
814    // Tests whether try_combine() fails if someone else is currently flat combining.
815    #[test]
816    fn test_replica_try_combine_fail() {
817        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
818        let repl = Replica::<Data>::new(&slog);
819
820        repl.next.store(9, Ordering::SeqCst);
821        repl.combiner.store(8, Ordering::SeqCst);
822        repl.make_pending(121, 1);
823        repl.try_combine(1);
824
825        assert_eq!(repl.data.read(0).junk, 0);
826        assert_eq!(repl.contexts[0].res(), None);
827    }
828
829    // Tests whether we can execute an operation against the log using execute_mut().
830    #[test]
831    fn test_replica_execute_combine() {
832        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
833        let repl = Replica::<Data>::new(&slog);
834        let idx = repl.register().unwrap();
835
836        assert_eq!(Ok(107), repl.execute_mut(121, idx));
837        assert_eq!(1, repl.data.read(0).junk);
838    }
839
840    // Tests whether get_response() retrieves a response to an operation that was executed
841    // against a replica.
842    #[test]
843    fn test_replica_get_response() {
844        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
845        let repl = Replica::<Data>::new(&slog);
846        let _idx = repl.register();
847
848        repl.make_pending(121, 1);
849
850        assert_eq!(repl.get_response(1), Ok(107));
851    }
852
853    // Tests whether we can issue a read-only operation against the replica.
854    #[test]
855    fn test_replica_execute() {
856        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
857        let repl = Replica::<Data>::new(&slog);
858        let idx = repl.register().expect("Failed to register with replica.");
859
860        assert_eq!(Ok(107), repl.execute_mut(121, idx));
861        assert_eq!(Ok(1), repl.execute(11, idx));
862    }
863
864    // Tests that execute() syncs up the replica with the log before
865    // executing the read against the data structure.
866    #[test]
867    fn test_replica_execute_not_synced() {
868        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
869        let repl = Replica::<Data>::new(&slog);
870
871        // Add in operations to the log off the side, not through the replica.
872        let o = [121, 212];
873        slog.append(&o, 2, |_o: u64, _i: usize| {});
874        slog.exec(2, &mut |_o: u64, _i: usize| {});
875
876        let t1 = repl.register().expect("Failed to register with replica.");
877        assert_eq!(Ok(2), repl.execute(11, t1));
878    }
879}