// node_replication/replica.rs
1// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4use core::cell::RefCell;
5use core::hint::spin_loop;
6#[cfg(not(loom))]
7use core::sync::atomic::{AtomicUsize, Ordering};
8#[cfg(loom)]
9use loom::sync::atomic::{AtomicUsize, Ordering};
10
11#[cfg(not(loom))]
12use alloc::sync::Arc;
13#[cfg(loom)]
14use loom::sync::Arc;
15
16use alloc::vec::Vec;
17
18use crossbeam_utils::CachePadded;
19
20use super::context::Context;
21use super::log::Log;
22use super::rwlock::RwLock;
23use super::Dispatch;
24
/// A token handed out to threads registered with replicas.
///
/// Wraps the (1-based) thread identifier handed out by `Replica::register()`.
///
/// # Note
/// Ideally this would be an affine type and returned again by
/// `execute` and `execute_ro`. However it feels like this would
/// hurt API ergonomics a lot.
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct ReplicaToken(usize);
33
/// To make it harder to use the same ReplicaToken on multiple threads.
///
/// NOTE: the original attribute read `#[cfg(features = "unstable")]` — the
/// correct cfg key is `feature` (singular), so the negative impl was silently
/// never compiled. Fixed so it actually takes effect with the `unstable`
/// crate feature (negative impls require a nightly compiler).
#[cfg(feature = "unstable")]
impl !Send for ReplicaToken {}
37
38impl ReplicaToken {
39 /// Creates a new ReplicaToken
40 ///
41 /// # Safety
42 /// This should only ever be used for the benchmark harness to create
43 /// additional fake replica implementations.
44 /// If we had a means to declare this not-pub we should do that instead.
45 #[doc(hidden)]
46 pub unsafe fn new(ident: usize) -> Self {
47 ReplicaToken(ident)
48 }
49
50 /// Getter for id
51 pub fn id(&self) -> usize {
52 self.0
53 }
54}
55
/// The maximum number of threads that can be registered with a replica. If more than
/// this number of threads try to register, the register() function will return None.
///
/// # Important
/// If this number is adjusted due to the use of the `arr_macro::arr` macro we
/// have to adjust the `256` literals in the `new` constructor of `Replica`.
#[cfg(not(loom))]
pub const MAX_THREADS_PER_REPLICA: usize = 256;
// Under loom model checking keep the thread count tiny so state-space
// exploration stays tractable.
#[cfg(loom)]
pub const MAX_THREADS_PER_REPLICA: usize = 2;
// Compile-time sanity check: the constant must be at least 1 and a power of two.
const_assert!(
    MAX_THREADS_PER_REPLICA >= 1 && (MAX_THREADS_PER_REPLICA & (MAX_THREADS_PER_REPLICA - 1) == 0)
);
69
/// An instance of a replicated data structure. Uses a shared log to scale
/// operations on the data structure across cores and processors.
///
/// Takes in one type argument: `D` represents the underlying sequential data
/// structure `D` must implement the `Dispatch` trait.
///
/// A thread can be registered against the replica by calling `register()`. A
/// mutable operation can be issued by calling `execute_mut()` (immutable uses
/// `execute`). A mutable operation will be eventually executed against the replica
/// along with any operations that were received on other replicas that share
/// the same underlying log.
pub struct Replica<'a, D>
where
    D: Sized + Dispatch + Sync,
{
    /// A replica-identifier received when the replica is registered against
    /// the shared-log. Required when consuming operations from the log.
    idx: usize,

    /// Thread idx of the thread currently responsible for flat combining. Zero
    /// if there isn't any thread actively performing flat combining on the log.
    /// This also doubles up as the combiner lock.
    combiner: CachePadded<AtomicUsize>,

    /// Idx that will be handed out to the next thread that registers with the replica.
    /// Starts at 1; zero is reserved to mean "no active combiner" in `combiner`.
    next: CachePadded<AtomicUsize>,

    /// List of per-thread contexts. Threads buffer write operations in here when they
    /// cannot perform flat combining (because another thread might be doing so).
    ///
    /// The vector is initialized with `MAX_THREADS_PER_REPLICA` elements.
    contexts: Vec<Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>>,

    /// A buffer of operations for flat combining. The combiner stages operations in
    /// here and then batch appends them into the shared log. This helps amortize
    /// the cost of the compare_and_swap() on the tail of the log.
    /// Only borrowed by the thread holding the combiner lock.
    buffer: RefCell<Vec<<D as Dispatch>::WriteOperation>>,

    /// Number of operations collected by the combiner from each thread at any
    /// given point of time. Index `i` holds the number of operations collected from
    /// thread with identifier `i + 1`.
    inflight: RefCell<[usize; MAX_THREADS_PER_REPLICA]>,

    /// A buffer of results collected after flat combining. With the help of `inflight`,
    /// the combiner enqueues these results into the appropriate thread context.
    result: RefCell<Vec<<D as Dispatch>::Response>>,

    /// Reference to the shared log that operations will be appended to and the
    /// data structure will be updated from.
    slog: Arc<Log<'a, <D as Dispatch>::WriteOperation>>,

    /// The underlying replicated data structure. Shared between threads registered
    /// with this replica. Each replica maintains its own.
    data: CachePadded<RwLock<D>>,
}
125
/// The Replica is Sync. Member variables are protected by a CAS on `combiner`.
/// Contexts are thread-safe.
///
/// SAFETY: the `RefCell` members (`buffer`, `inflight`, `result`) are only
/// borrowed by the thread that currently holds the combiner lock (acquired
/// via CAS on `combiner` in `try_combine`/`verify`), so they are never
/// accessed from two threads at once.
unsafe impl<'a, D> Sync for Replica<'a, D> where D: Sized + Sync + Dispatch {}
129
130impl<'a, D> core::fmt::Debug for Replica<'a, D>
131where
132 D: Sized + Sync + Dispatch,
133{
134 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
135 write!(f, "Replica")
136 }
137}
138
139impl<'a, D> Replica<'a, D>
140where
141 D: Sized + Default + Dispatch + Sync,
142{
143 /// Constructs an instance of a replicated data structure.
144 ///
145 /// Takes a reference to the shared log as an argument. The Log is assumed to
146 /// outlive the replica. The replica is bound to the log's lifetime.
147 ///
148 /// # Example
149 ///
150 /// ```
151 /// use node_replication::Dispatch;
152 /// use node_replication::Log;
153 /// use node_replication::Replica;
154 ///
155 /// use std::sync::Arc;
156 ///
157 /// // The data structure we want replicated.
158 /// #[derive(Default)]
159 /// struct Data {
160 /// junk: u64,
161 /// }
162 ///
163 /// // This trait allows the `Data` to be used with node-replication.
164 /// impl Dispatch for Data {
165 /// type ReadOperation = ();
166 /// type WriteOperation = u64;
167 /// type Response = Option<u64>;
168 ///
169 /// // A read returns the underlying u64.
170 /// fn dispatch(
171 /// &self,
172 /// _op: Self::ReadOperation,
173 /// ) -> Self::Response {
174 /// Some(self.junk)
175 /// }
176 ///
177 /// // A write updates the underlying u64.
178 /// fn dispatch_mut(
179 /// &mut self,
180 /// op: Self::WriteOperation,
181 /// ) -> Self::Response {
182 /// self.junk = op;
183 /// None
184 /// }
185 /// }
186 ///
187 /// // First create a shared log.
188 /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
189 ///
190 /// // Create a replica that uses the above log.
191 /// let replica = Replica::<Data>::new(&log);
192 /// ```
193 pub fn new<'b>(log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>) -> Arc<Replica<'b, D>> {
194 Replica::with_data(log, Default::default())
195 }
196}
197
198impl<'a, D> Replica<'a, D>
199where
200 D: Sized + Dispatch + Sync,
201{
202 /// Similar to [`Replica<D>::new`], but we pass a pre-initialized
203 /// data-structure as an argument (`d`) rather than relying on the
204 /// [Default](core::default::Default) trait to create one.
205 ///
206 /// # Note
207 /// [`Replica<D>::new`] should be the preferred method to create a Replica.
208 /// If `with_data` is used, care must be taken that the same state is passed
209 /// to every Replica object. If not the resulting operations executed
210 /// against replicas may not give deterministic results.
211 #[cfg(not(feature = "unstable"))]
212 pub fn with_data<'b>(
213 log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
214 d: D,
215 ) -> Arc<Replica<'b, D>> {
216 let mut contexts = Vec::with_capacity(MAX_THREADS_PER_REPLICA);
217 // Add `MAX_THREADS_PER_REPLICA` contexts
218 for _idx in 0..MAX_THREADS_PER_REPLICA {
219 contexts.push(Default::default());
220 }
221
222 Arc::new(
223 Replica {
224 idx: log.register().unwrap(),
225 combiner: CachePadded::new(AtomicUsize::new(0)),
226 next: CachePadded::new(AtomicUsize::new(1)),
227 contexts,
228 buffer:
229 RefCell::new(
230 Vec::with_capacity(
231 MAX_THREADS_PER_REPLICA
232 * Context::<
233 <D as Dispatch>::WriteOperation,
234 <D as Dispatch>::Response,
235 >::batch_size(),
236 ),
237 ),
238 inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
239 result:
240 RefCell::new(
241 Vec::with_capacity(
242 MAX_THREADS_PER_REPLICA
243 * Context::<
244 <D as Dispatch>::WriteOperation,
245 <D as Dispatch>::Response,
246 >::batch_size(),
247 ),
248 ),
249 slog: log.clone(),
250 data: CachePadded::new(RwLock::<D>::new(d)),
251 },
252 )
253 }
254
    /// See `with_data` documentation without unstable feature.
    #[cfg(feature = "unstable")]
    pub fn with_data<'b>(
        log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
        d: D,
    ) -> Arc<Replica<'b, D>> {
        use core::mem::MaybeUninit;
        // Heap-allocate the (large) Replica directly, zero-initialized.
        let mut uninit_replica: Arc<MaybeUninit<Replica<D>>> = Arc::new_zeroed();

        // This is the preferred (but unsafe) mode of initialization as it avoids
        // putting the big Replica object on the stack first.
        //
        // SAFETY: `uninit_replica` was just created so we hold the only
        // reference (`get_mut_unchecked` is sound), and every field is written
        // via `uninit_ptr.write` before `assume_init` is called.
        unsafe {
            let uninit_ptr = Arc::get_mut_unchecked(&mut uninit_replica).as_mut_ptr();
            uninit_ptr.write(Replica {
                idx: log.register().unwrap(),
                combiner: CachePadded::new(AtomicUsize::new(0)),
                next: CachePadded::new(AtomicUsize::new(1)),
                contexts: Vec::with_capacity(MAX_THREADS_PER_REPLICA),
                buffer:
                    RefCell::new(
                        Vec::with_capacity(
                            MAX_THREADS_PER_REPLICA
                                * Context::<
                                    <D as Dispatch>::WriteOperation,
                                    <D as Dispatch>::Response,
                                >::batch_size(),
                        ),
                    ),
                inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
                result:
                    RefCell::new(
                        Vec::with_capacity(
                            MAX_THREADS_PER_REPLICA
                                * Context::<
                                    <D as Dispatch>::WriteOperation,
                                    <D as Dispatch>::Response,
                                >::batch_size(),
                        ),
                    ),
                slog: log.clone(),
                data: CachePadded::new(RwLock::<D>::new(d)),
            });

            let mut replica = uninit_replica.assume_init();
            // Add `MAX_THREADS_PER_REPLICA` contexts
            for _idx in 0..MAX_THREADS_PER_REPLICA {
                Arc::get_mut(&mut replica)
                    .unwrap()
                    .contexts
                    .push(Default::default());
            }

            replica
        }
    }
310
311 /// Registers a thread with this replica. Returns an idx inside an Option if the registration
312 /// was successfull. None if the registration failed.
313 ///
314 /// # Example
315 ///
316 /// ```
317 /// use node_replication::Dispatch;
318 /// use node_replication::Log;
319 /// use node_replication::Replica;
320 ///
321 /// use std::sync::Arc;
322 ///
323 /// #[derive(Default)]
324 /// struct Data {
325 /// junk: u64,
326 /// }
327 ///
328 /// impl Dispatch for Data {
329 /// type ReadOperation = ();
330 /// type WriteOperation = u64;
331 /// type Response = Option<u64>;
332 ///
333 /// fn dispatch(
334 /// &self,
335 /// _op: Self::ReadOperation,
336 /// ) -> Self::Response {
337 /// Some(self.junk)
338 /// }
339 ///
340 /// fn dispatch_mut(
341 /// &mut self,
342 /// op: Self::WriteOperation,
343 /// ) -> Self::Response {
344 /// self.junk = op;
345 /// None
346 /// }
347 /// }
348 ///
349 /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
350 /// let replica = Replica::<Data>::new(&log);
351 ///
352 /// // Calling register() returns an idx that can be used to execute
353 /// // operations against the replica.
354 /// let idx = replica.register().expect("Failed to register with replica.");
355 /// ```
356 pub fn register(&self) -> Option<ReplicaToken> {
357 // Loop until we either run out of identifiers or we manage to increment `next`.
358 loop {
359 let idx = self.next.load(Ordering::SeqCst);
360
361 if idx > MAX_THREADS_PER_REPLICA {
362 return None;
363 };
364
365 if self
366 .next
367 .compare_exchange_weak(idx, idx + 1, Ordering::SeqCst, Ordering::SeqCst)
368 != Ok(idx)
369 {
370 continue;
371 };
372
373 return Some(ReplicaToken(idx));
374 }
375 }
376
377 /// Executes an mutable operation against this replica and returns a response.
378 /// `idx` is an identifier for the thread performing the execute operation.
379 ///
380 /// # Example
381 ///
382 /// ```
383 /// use node_replication::Dispatch;
384 /// use node_replication::Log;
385 /// use node_replication::Replica;
386 ///
387 /// use std::sync::Arc;
388 ///
389 /// #[derive(Default)]
390 /// struct Data {
391 /// junk: u64,
392 /// }
393 ///
394 /// impl Dispatch for Data {
395 /// type ReadOperation = ();
396 /// type WriteOperation = u64;
397 /// type Response = Option<u64>;
398 ///
399 /// fn dispatch(
400 /// &self,
401 /// _op: Self::ReadOperation,
402 /// ) -> Self::Response {
403 /// Some(self.junk)
404 /// }
405 ///
406 /// fn dispatch_mut(
407 /// &mut self,
408 /// op: Self::WriteOperation,
409 /// ) -> Self::Response {
410 /// self.junk = op;
411 /// None
412 /// }
413 /// }
414 ///
415 /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
416 /// let replica = Replica::<Data>::new(&log);
417 /// let idx = replica.register().expect("Failed to register with replica.");
418 ///
419 /// // execute_mut() can be used to write to the replicated data structure.
420 /// let res = replica.execute_mut(100, idx);
421 /// assert_eq!(None, res);
422 pub fn execute_mut(
423 &self,
424 op: <D as Dispatch>::WriteOperation,
425 idx: ReplicaToken,
426 ) -> <D as Dispatch>::Response {
427 // Enqueue the operation onto the thread local batch and then try to flat combine.
428 while !self.make_pending(op.clone(), idx.0) {}
429 self.try_combine(idx.0);
430
431 // Return the response to the caller function.
432 self.get_response(idx.0)
433 }
434
435 /// Executes a read-only operation against this replica and returns a response.
436 /// `idx` is an identifier for the thread performing the execute operation.
437 ///
438 /// # Example
439 ///
440 /// ```
441 /// use node_replication::Dispatch;
442 /// use node_replication::Log;
443 /// use node_replication::Replica;
444 ///
445 /// use std::sync::Arc;
446 ///
447 /// #[derive(Default)]
448 /// struct Data {
449 /// junk: u64,
450 /// }
451 ///
452 /// impl Dispatch for Data {
453 /// type ReadOperation = ();
454 /// type WriteOperation = u64;
455 /// type Response = Option<u64>;
456 ///
457 /// fn dispatch(
458 /// &self,
459 /// _op: Self::ReadOperation,
460 /// ) -> Self::Response {
461 /// Some(self.junk)
462 /// }
463 ///
464 /// fn dispatch_mut(
465 /// &mut self,
466 /// op: Self::WriteOperation,
467 /// ) -> Self::Response {
468 /// self.junk = op;
469 /// None
470 /// }
471 /// }
472 ///
473 /// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
474 /// let replica = Replica::<Data>::new(&log);
475 /// let idx = replica.register().expect("Failed to register with replica.");
476 /// let _wr = replica.execute_mut(100, idx);
477 ///
478 /// // execute() can be used to read from the replicated data structure.
479 /// let res = replica.execute((), idx);
480 /// assert_eq!(Some(100), res);
481 pub fn execute(
482 &self,
483 op: <D as Dispatch>::ReadOperation,
484 idx: ReplicaToken,
485 ) -> <D as Dispatch>::Response {
486 self.read_only(op, idx.0)
487 }
488
489 /// Busy waits until a response is available within the thread's context.
490 /// `idx` identifies this thread.
491 fn get_response(&self, idx: usize) -> <D as Dispatch>::Response {
492 let mut iter = 0;
493 let interval = 1 << 29;
494
495 // Keep trying to retrieve a response from the thread context. After trying `interval`
496 // times with no luck, try to perform flat combining to make some progress.
497 loop {
498 let r = self.contexts[idx - 1].res();
499 if let Some(resp) = r {
500 return resp;
501 }
502
503 iter += 1;
504
505 if iter == interval {
506 self.try_combine(idx);
507 iter = 0;
508 }
509 }
510 }
511
512 /// Executes a passed in closure against the replica's underlying data
513 /// structure. Useful for unit testing; can be used to verify certain
514 /// properties of the data structure after issuing a bunch of operations
515 /// against it.
516 ///
517 /// # Note
518 /// There is probably no need for a regular client to ever call this function.
519 #[doc(hidden)]
520 pub fn verify<F: FnMut(&D)>(&self, mut v: F) {
521 // Acquire the combiner lock before attempting anything on the data structure.
522 // Use an idx greater than the maximum that can be allocated.
523 while self.combiner.compare_exchange_weak(
524 0,
525 MAX_THREADS_PER_REPLICA + 2,
526 Ordering::Acquire,
527 Ordering::Acquire,
528 ) != Ok(0)
529 {
530 spin_loop();
531 }
532
533 let mut data = self.data.write(self.next.load(Ordering::Relaxed));
534 let mut f = |o: <D as Dispatch>::WriteOperation, _i: usize| {
535 data.dispatch_mut(o);
536 };
537
538 self.slog.exec(self.idx, &mut f);
539
540 v(&data);
541
542 self.combiner.store(0, Ordering::Release);
543 }
544
545 /// This method is useful when a replica stops making progress and some threads
546 /// on another replica are still active. The active replica will use all the entries
547 /// in the log and won't be able perform garbage collection because of the inactive
548 /// replica. So, this method syncs up the replica against the underlying log.
549 pub fn sync(&self, idx: ReplicaToken) {
550 let ctail = self.slog.get_ctail();
551 while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
552 self.try_combine(idx.0);
553 spin_loop();
554 }
555 }
556
557 /// Issues a read-only operation against the replica and returns a response.
558 /// Makes sure the replica is synced up against the log before doing so.
559 fn read_only(
560 &self,
561 op: <D as Dispatch>::ReadOperation,
562 tid: usize,
563 ) -> <D as Dispatch>::Response {
564 // We can perform the read only if our replica is synced up against
565 // the shared log. If it isn't, then try to combine until it is synced up.
566 let ctail = self.slog.get_ctail();
567 while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
568 self.try_combine(tid);
569 spin_loop();
570 }
571
572 return self.data.read(tid - 1).dispatch(op);
573 }
574
575 /// Enqueues an operation inside a thread local context. Returns a boolean
576 /// indicating whether the operation was enqueued (true) or not (false).
577 #[inline(always)]
578 fn make_pending(&self, op: <D as Dispatch>::WriteOperation, idx: usize) -> bool {
579 self.contexts[idx - 1].enqueue(op)
580 }
581
    /// Appends an operation to the log and attempts to perform flat combining.
    /// Accepts a thread `tid` as an argument. Required to acquire the combiner lock.
    fn try_combine(&self, tid: usize) {
        // First, check if there already is a flat combiner. If there is no active flat combiner
        // then try to acquire the combiner lock. If there is, then just return.
        for _i in 0..4 {
            // NOTE(review): this volatile read bypasses the atomic API —
            // presumably to force a plain load on the fast path; confirm this
            // is intentional versus a `self.combiner.load(Ordering::Relaxed)`.
            #[cfg(not(loom))]
            if unsafe {
                core::ptr::read_volatile(
                    &self.combiner
                        as *const crossbeam_utils::CachePadded<core::sync::atomic::AtomicUsize>
                        as *const usize,
                )
            } != 0
            {
                return;
            }

            // Under loom we must go through the modeled atomic and yield so the
            // scheduler can explore interleavings.
            #[cfg(loom)]
            {
                if self.combiner.load(Ordering::Relaxed) != 0 {
                    loom::thread::yield_now();
                    return;
                }
            }
        }

        // Try to become the combiner here. If this fails, then simply return.
        // (Success returns Ok(0), so `!= Ok(0)` means the CAS did not take.)
        if self
            .combiner
            .compare_exchange_weak(0, tid, Ordering::Acquire, Ordering::Acquire)
            != Ok(0)
        {
            #[cfg(loom)]
            loom::thread::yield_now();
            return;
        }

        // Successfully became the combiner; perform one round of flat combining.
        self.combine();

        // Allow other threads to perform flat combining once we have finished all our work.
        // At this point, we've dropped all mutable references to thread contexts and to
        // the staging buffer as well.
        self.combiner.store(0, Ordering::Release);
    }
628
629 /// Performs one round of flat combining. Collects, appends and executes operations.
630 #[inline(always)]
631 fn combine(&self) {
632 let mut buffer = self.buffer.borrow_mut();
633 let mut operations = self.inflight.borrow_mut();
634 let mut results = self.result.borrow_mut();
635
636 buffer.clear();
637 results.clear();
638
639 let next = self.next.load(Ordering::Relaxed);
640
641 // Collect operations from each thread registered with this replica.
642 for i in 1..next {
643 operations[i - 1] = self.contexts[i - 1].ops(&mut buffer);
644 }
645
646 // Append all collected operations into the shared log. We pass a closure
647 // in here because operations on the log might need to be consumed for GC.
648 {
649 let mut data = self.data.write(next);
650 let f = |o: <D as Dispatch>::WriteOperation, i: usize| {
651 #[cfg(not(loom))]
652 let resp = data.dispatch_mut(o);
653 #[cfg(loom)]
654 let resp = data.dispatch_mut(o);
655 if i == self.idx {
656 results.push(resp);
657 }
658 };
659 self.slog.append(&buffer, self.idx, f);
660 }
661
662 // Execute any operations on the shared log against this replica.
663 {
664 let mut data = self.data.write(next);
665 let mut f = |o: <D as Dispatch>::WriteOperation, i: usize| {
666 let resp = data.dispatch_mut(o);
667 if i == self.idx {
668 results.push(resp)
669 };
670 };
671 self.slog.exec(self.idx, &mut f);
672 }
673
674 // Return/Enqueue responses back into the appropriate thread context(s).
675 let (mut s, mut f) = (0, 0);
676 for i in 1..next {
677 if operations[i - 1] == 0 {
678 continue;
679 };
680
681 f += operations[i - 1];
682 self.contexts[i - 1].enqueue_resps(&results[s..f]);
683 s += operations[i - 1];
684 operations[i - 1] = 0;
685 }
686 }
687}
688
// Unit tests exercising Replica against a tiny dummy data structure.
#[cfg(test)]
mod test {
    extern crate std;

    use super::*;
    use std::vec;

    // Really dumb data structure to test against the Replica and shared log.
    #[derive(Default)]
    struct Data {
        junk: u64,
    }

    impl Dispatch for Data {
        type ReadOperation = u64;
        type WriteOperation = u64;
        type Response = Result<u64, ()>;

        // Reads return the current counter value.
        fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response {
            Ok(self.junk)
        }

        // Every write bumps the counter and reports the magic value 107.
        fn dispatch_mut(&mut self, _op: Self::WriteOperation) -> Self::Response {
            self.junk += 1;
            return Ok(107);
        }
    }

    // Tests whether we can construct a Replica given a log.
    #[test]
    fn test_replica_create() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
        let repl = Replica::<Data>::new(&slog);
        assert_eq!(repl.idx, 1);
        assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
        assert_eq!(repl.next.load(Ordering::SeqCst), 1);
        assert_eq!(repl.contexts.len(), MAX_THREADS_PER_REPLICA);
        assert_eq!(
            repl.buffer.borrow().capacity(),
            MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
        );
        assert_eq!(repl.inflight.borrow().len(), MAX_THREADS_PER_REPLICA);
        assert_eq!(
            repl.result.borrow().capacity(),
            MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
        );
        assert_eq!(repl.data.read(0).junk, 0);
    }

    // Tests whether we can register with this replica and receive an idx.
    #[test]
    fn test_replica_register() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
        let repl = Replica::<Data>::new(&slog);
        assert_eq!(repl.register(), Some(ReplicaToken(1)));
        assert_eq!(repl.next.load(Ordering::SeqCst), 2);
        repl.next.store(17, Ordering::SeqCst);
        assert_eq!(repl.register(), Some(ReplicaToken(17)));
        assert_eq!(repl.next.load(Ordering::SeqCst), 18);
    }

    // Tests whether registering more than the maximum limit of threads per replica is disallowed.
    #[test]
    fn test_replica_register_none() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
        let repl = Replica::<Data>::new(&slog);
        repl.next
            .store(MAX_THREADS_PER_REPLICA + 1, Ordering::SeqCst);
        assert!(repl.register().is_none());
    }

    // Tests that we can successfully allow operations to go pending on this replica.
    #[test]
    fn test_replica_make_pending() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
        let repl = Replica::<Data>::new(&slog);
        let mut o = vec![];

        assert!(repl.make_pending(121, 8));
        assert_eq!(repl.contexts[7].ops(&mut o), 1);
        assert_eq!(o.len(), 1);
        assert_eq!(o[0], 121);
    }

    // Tests that we can't pend operations on a context that is already full of operations.
    #[test]
    fn test_replica_make_pending_false() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
        let repl = Replica::<Data>::new(&slog);
        // Fill thread 1's context to capacity...
        for _i in 0..Context::<u64, Result<u64, ()>>::batch_size() {
            assert!(repl.make_pending(121, 1))
        }

        // ...then the next enqueue must be refused.
        assert!(!repl.make_pending(11, 1));
    }

    // Tests that we can append and execute operations using try_combine().
    #[test]
    fn test_replica_try_combine() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
        let repl = Replica::<Data>::new(&slog);
        let _idx = repl.register();

        repl.make_pending(121, 1);
        repl.try_combine(1);

        assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
        assert_eq!(repl.data.read(0).junk, 1);
        assert_eq!(repl.contexts[0].res(), Some(Ok(107)));
    }

    // Tests whether try_combine() also applies pending operations on other threads to the log.
    #[test]
    fn test_replica_try_combine_pending() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
        let repl = Replica::<Data>::new(&slog);

        repl.next.store(9, Ordering::SeqCst);
        repl.make_pending(121, 8);
        repl.try_combine(1);

        assert_eq!(repl.data.read(0).junk, 1);
        assert_eq!(repl.contexts[7].res(), Some(Ok(107)));
    }

    // Tests whether try_combine() fails if someone else is currently flat combining.
    #[test]
    fn test_replica_try_combine_fail() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
        let repl = Replica::<Data>::new(&slog);

        repl.next.store(9, Ordering::SeqCst);
        // Pretend thread 8 currently holds the combiner lock.
        repl.combiner.store(8, Ordering::SeqCst);
        repl.make_pending(121, 1);
        repl.try_combine(1);

        assert_eq!(repl.data.read(0).junk, 0);
        assert_eq!(repl.contexts[0].res(), None);
    }

    // Tests whether we can execute an operation against the log using execute_mut().
    #[test]
    fn test_replica_execute_combine() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
        let repl = Replica::<Data>::new(&slog);
        let idx = repl.register().unwrap();

        assert_eq!(Ok(107), repl.execute_mut(121, idx));
        assert_eq!(1, repl.data.read(0).junk);
    }

    // Tests whether get_response() retrieves a response to an operation that was executed
    // against a replica.
    #[test]
    fn test_replica_get_response() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
        let repl = Replica::<Data>::new(&slog);
        let _idx = repl.register();

        repl.make_pending(121, 1);

        assert_eq!(repl.get_response(1), Ok(107));
    }

    // Tests whether we can issue a read-only operation against the replica.
    #[test]
    fn test_replica_execute() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
        let repl = Replica::<Data>::new(&slog);
        let idx = repl.register().expect("Failed to register with replica.");

        assert_eq!(Ok(107), repl.execute_mut(121, idx));
        assert_eq!(Ok(1), repl.execute(11, idx));
    }

    // Tests that execute() syncs up the replica with the log before
    // executing the read against the data structure.
    #[test]
    fn test_replica_execute_not_synced() {
        let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
        let repl = Replica::<Data>::new(&slog);

        // Add in operations to the log off the side, not through the replica.
        let o = [121, 212];
        slog.append(&o, 2, |_o: u64, _i: usize| {});
        slog.exec(2, &mut |_o: u64, _i: usize| {});

        let t1 = repl.register().expect("Failed to register with replica.");
        assert_eq!(Ok(2), repl.execute(11, t1));
    }
}
879}