// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT
use core::cell::RefCell;
use core::hint::spin_loop;
#[cfg(not(loom))]
use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(loom)]
use loom::sync::atomic::{AtomicUsize, Ordering};
#[cfg(not(loom))]
use alloc::sync::Arc;
#[cfg(loom)]
use loom::sync::Arc;
use alloc::vec::Vec;
use crossbeam_utils::CachePadded;
use super::context::Context;
use super::log::Log;
use super::rwlock::RwLock;
use super::Dispatch;
/// A token handed out to threads registered with replicas.
///
/// # Note
/// Ideally this would be an affine type and returned again by
/// `execute_mut` and `execute`. However, it feels like this would
/// hurt API ergonomics a lot.
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct ReplicaToken(usize);
/// To make it harder to use the same ReplicaToken on multiple threads.
#[cfg(feature = "unstable")]
impl !Send for ReplicaToken {}
impl ReplicaToken {
/// Creates a new ReplicaToken
///
/// # Safety
/// This method should only be used by the benchmark harness to create
/// additional fake replica implementations. If we had a means to declare
/// this function as non-public, we would do that instead.
#[doc(hidden)]
pub unsafe fn new(ident: usize) -> Self {
ReplicaToken(ident)
}
/// Returns the identifier of this token.
pub fn id(&self) -> usize {
self.0
}
}
/// The maximum number of threads that can be registered with a replica. If more than
/// this number of threads try to register, the `register()` function will return `None`.
#[cfg(not(loom))]
pub const MAX_THREADS_PER_REPLICA: usize = 256;
#[cfg(loom)]
pub const MAX_THREADS_PER_REPLICA: usize = 2;
const_assert!(
MAX_THREADS_PER_REPLICA >= 1 && (MAX_THREADS_PER_REPLICA & (MAX_THREADS_PER_REPLICA - 1) == 0)
);
/// An instance of a replicated data structure. Uses a shared log to scale
/// operations on the data structure across cores and processors.
///
/// Takes in one type argument: `D` represents the underlying sequential data
/// structure. `D` must implement the `Dispatch` trait.
///
/// A thread can be registered against the replica by calling `register()`. A
/// mutable operation can be issued by calling `execute_mut()` (immutable uses
/// `execute`). A mutable operation will eventually be executed against the replica
/// along with any operations that were received on other replicas that share
/// the same underlying log.
pub struct Replica<'a, D>
where
D: Sized + Dispatch + Sync,
{
/// A replica-identifier received when the replica is registered against
/// the shared-log. Required when consuming operations from the log.
idx: usize,
/// Thread idx of the thread currently responsible for flat combining. Zero
/// if there isn't any thread actively performing flat combining on the log.
/// This also doubles up as the combiner lock.
combiner: CachePadded<AtomicUsize>,
/// Idx that will be handed out to the next thread that registers with the replica.
next: CachePadded<AtomicUsize>,
/// List of per-thread contexts. Threads buffer write operations in here when they
/// cannot perform flat combining (because another thread might be doing so).
///
/// The vector is initialized with `MAX_THREADS_PER_REPLICA` elements.
contexts: Vec<Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>>,
/// A buffer of operations for flat combining. The combiner stages operations in
/// here and then batch appends them into the shared log. This helps amortize
/// the cost of the compare_and_swap() on the tail of the log.
buffer: RefCell<Vec<<D as Dispatch>::WriteOperation>>,
/// Number of operations collected by the combiner from each thread at any
/// given point of time. Index `i` holds the number of operations collected from
/// thread with identifier `i + 1`.
inflight: RefCell<[usize; MAX_THREADS_PER_REPLICA]>,
/// A buffer of results collected after flat combining. With the help of `inflight`,
/// the combiner enqueues these results into the appropriate thread context.
result: RefCell<Vec<<D as Dispatch>::Response>>,
/// Reference to the shared log that operations will be appended to and the
/// data structure will be updated from.
slog: Arc<Log<'a, <D as Dispatch>::WriteOperation>>,
/// The underlying replicated data structure. Shared between threads registered
/// with this replica. Each replica maintains its own.
data: CachePadded<RwLock<D>>,
}
/// The Replica is Sync. Member variables are protected by a CAS on `combiner`.
/// Contexts are thread-safe.
unsafe impl<'a, D> Sync for Replica<'a, D> where D: Sized + Sync + Dispatch {}
impl<'a, D> core::fmt::Debug for Replica<'a, D>
where
D: Sized + Sync + Dispatch,
{
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(f, "Replica")
}
}
impl<'a, D> Replica<'a, D>
where
D: Sized + Default + Dispatch + Sync,
{
/// Constructs an instance of a replicated data structure.
///
/// Takes a reference to the shared log as an argument. The Log is assumed to
/// outlive the replica. The replica is bound to the log's lifetime.
///
/// # Example
///
/// ```
/// use node_replication::Dispatch;
/// use node_replication::Log;
/// use node_replication::Replica;
///
/// use std::sync::Arc;
///
/// // The data structure we want replicated.
/// #[derive(Default)]
/// struct Data {
/// junk: u64,
/// }
///
/// // This trait allows the `Data` to be used with node-replication.
/// impl Dispatch for Data {
/// type ReadOperation = ();
/// type WriteOperation = u64;
/// type Response = Option<u64>;
///
/// // A read returns the underlying u64.
/// fn dispatch(
/// &self,
/// _op: Self::ReadOperation,
/// ) -> Self::Response {
/// Some(self.junk)
/// }
///
/// // A write updates the underlying u64.
/// fn dispatch_mut(
/// &mut self,
/// op: Self::WriteOperation,
/// ) -> Self::Response {
/// self.junk = op;
/// None
/// }
/// }
///
/// // First create a shared log.
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
///
/// // Create a replica that uses the above log.
/// let replica = Replica::<Data>::new(&log);
/// ```
pub fn new<'b>(log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>) -> Arc<Replica<'b, D>> {
Replica::with_data(log, Default::default())
}
}
impl<'a, D> Replica<'a, D>
where
D: Sized + Dispatch + Sync,
{
/// Similar to [`Replica<D>::new`], but we pass a pre-initialized
/// data-structure as an argument (`d`) rather than relying on the
/// [Default](core::default::Default) trait to create one.
///
/// # Note
/// [`Replica<D>::new`] should be the preferred method to create a Replica.
/// If `with_data` is used, care must be taken that the same initial state is
/// passed to every Replica object. Otherwise, operations executed against
/// different replicas may not produce deterministic results.
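///
/// # Example
///
/// A minimal sketch (reusing the `Data` type from the [`Replica<D>::new`]
/// example); note that every replica sharing the log receives an identical
/// copy of the initial state:
///
/// ```
/// # use node_replication::Dispatch;
/// # use node_replication::Log;
/// # use node_replication::Replica;
/// # use std::sync::Arc;
/// # #[derive(Default)]
/// # struct Data { junk: u64 }
/// # impl Dispatch for Data {
/// #     type ReadOperation = ();
/// #     type WriteOperation = u64;
/// #     type Response = Option<u64>;
/// #     fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response { Some(self.junk) }
/// #     fn dispatch_mut(&mut self, op: Self::WriteOperation) -> Self::Response {
/// #         self.junk = op;
/// #         None
/// #     }
/// # }
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
///
/// // Both replicas share the log and must start from the same state.
/// let replica1 = Replica::<Data>::with_data(&log, Data { junk: 10 });
/// let replica2 = Replica::<Data>::with_data(&log, Data { junk: 10 });
/// ```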
#[cfg(not(feature = "unstable"))]
pub fn with_data<'b>(
log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
d: D,
) -> Arc<Replica<'b, D>> {
let mut contexts = Vec::with_capacity(MAX_THREADS_PER_REPLICA);
// Add `MAX_THREADS_PER_REPLICA` contexts
for _idx in 0..MAX_THREADS_PER_REPLICA {
contexts.push(Default::default());
}
Arc::new(
Replica {
idx: log.register().unwrap(),
combiner: CachePadded::new(AtomicUsize::new(0)),
next: CachePadded::new(AtomicUsize::new(1)),
contexts,
buffer:
RefCell::new(
Vec::with_capacity(
MAX_THREADS_PER_REPLICA
* Context::<
<D as Dispatch>::WriteOperation,
<D as Dispatch>::Response,
>::batch_size(),
),
),
inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
result:
RefCell::new(
Vec::with_capacity(
MAX_THREADS_PER_REPLICA
* Context::<
<D as Dispatch>::WriteOperation,
<D as Dispatch>::Response,
>::batch_size(),
),
),
slog: log.clone(),
data: CachePadded::new(RwLock::<D>::new(d)),
},
)
}
/// See the documentation of `with_data` (the non-unstable variant) above.
#[cfg(feature = "unstable")]
pub fn with_data<'b>(
log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
d: D,
) -> Arc<Replica<'b, D>> {
use core::mem::MaybeUninit;
let mut uninit_replica: Arc<MaybeUninit<Replica<D>>> = Arc::new_zeroed();
// This is the preferred (but unsafe) mode of initialization as it avoids
// putting the big Replica object on the stack first.
unsafe {
let uninit_ptr = Arc::get_mut_unchecked(&mut uninit_replica).as_mut_ptr();
uninit_ptr.write(Replica {
idx: log.register().unwrap(),
combiner: CachePadded::new(AtomicUsize::new(0)),
next: CachePadded::new(AtomicUsize::new(1)),
contexts: Vec::with_capacity(MAX_THREADS_PER_REPLICA),
buffer:
RefCell::new(
Vec::with_capacity(
MAX_THREADS_PER_REPLICA
* Context::<
<D as Dispatch>::WriteOperation,
<D as Dispatch>::Response,
>::batch_size(),
),
),
inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
result:
RefCell::new(
Vec::with_capacity(
MAX_THREADS_PER_REPLICA
* Context::<
<D as Dispatch>::WriteOperation,
<D as Dispatch>::Response,
>::batch_size(),
),
),
slog: log.clone(),
data: CachePadded::new(RwLock::<D>::new(d)),
});
let mut replica = uninit_replica.assume_init();
// Add `MAX_THREADS_PER_REPLICA` contexts
for _idx in 0..MAX_THREADS_PER_REPLICA {
Arc::get_mut(&mut replica)
.unwrap()
.contexts
.push(Default::default());
}
replica
}
}
/// Registers a thread with this replica. Returns an idx inside an `Option` if
/// the registration was successful, and `None` if the registration failed.
///
/// # Example
///
/// ```
/// use node_replication::Dispatch;
/// use node_replication::Log;
/// use node_replication::Replica;
///
/// use std::sync::Arc;
///
/// #[derive(Default)]
/// struct Data {
/// junk: u64,
/// }
///
/// impl Dispatch for Data {
/// type ReadOperation = ();
/// type WriteOperation = u64;
/// type Response = Option<u64>;
///
/// fn dispatch(
/// &self,
/// _op: Self::ReadOperation,
/// ) -> Self::Response {
/// Some(self.junk)
/// }
///
/// fn dispatch_mut(
/// &mut self,
/// op: Self::WriteOperation,
/// ) -> Self::Response {
/// self.junk = op;
/// None
/// }
/// }
///
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
/// let replica = Replica::<Data>::new(&log);
///
/// // Calling register() returns an idx that can be used to execute
/// // operations against the replica.
/// let idx = replica.register().expect("Failed to register with replica.");
/// ```
pub fn register(&self) -> Option<ReplicaToken> {
// Loop until we either run out of identifiers or we manage to increment `next`.
loop {
let idx = self.next.load(Ordering::SeqCst);
if idx > MAX_THREADS_PER_REPLICA {
return None;
};
if self
.next
.compare_exchange_weak(idx, idx + 1, Ordering::SeqCst, Ordering::SeqCst)
!= Ok(idx)
{
continue;
};
return Some(ReplicaToken(idx));
}
}
/// Executes a mutable operation against this replica and returns a response.
/// `idx` is an identifier for the thread performing the execute operation.
///
/// # Example
///
/// ```
/// use node_replication::Dispatch;
/// use node_replication::Log;
/// use node_replication::Replica;
///
/// use std::sync::Arc;
///
/// #[derive(Default)]
/// struct Data {
/// junk: u64,
/// }
///
/// impl Dispatch for Data {
/// type ReadOperation = ();
/// type WriteOperation = u64;
/// type Response = Option<u64>;
///
/// fn dispatch(
/// &self,
/// _op: Self::ReadOperation,
/// ) -> Self::Response {
/// Some(self.junk)
/// }
///
/// fn dispatch_mut(
/// &mut self,
/// op: Self::WriteOperation,
/// ) -> Self::Response {
/// self.junk = op;
/// None
/// }
/// }
///
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
/// let replica = Replica::<Data>::new(&log);
/// let idx = replica.register().expect("Failed to register with replica.");
///
/// // execute_mut() can be used to write to the replicated data structure.
/// let res = replica.execute_mut(100, idx);
/// assert_eq!(None, res);
/// ```
pub fn execute_mut(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken,
) -> <D as Dispatch>::Response {
// Enqueue the operation onto the thread local batch and then try to flat combine.
while !self.make_pending(op.clone(), idx.0) {}
self.try_combine(idx.0);
// Return the response to the caller function.
self.get_response(idx.0)
}
/// Executes a read-only operation against this replica and returns a response.
/// `idx` is an identifier for the thread performing the execute operation.
///
/// # Example
///
/// ```
/// use node_replication::Dispatch;
/// use node_replication::Log;
/// use node_replication::Replica;
///
/// use std::sync::Arc;
///
/// #[derive(Default)]
/// struct Data {
/// junk: u64,
/// }
///
/// impl Dispatch for Data {
/// type ReadOperation = ();
/// type WriteOperation = u64;
/// type Response = Option<u64>;
///
/// fn dispatch(
/// &self,
/// _op: Self::ReadOperation,
/// ) -> Self::Response {
/// Some(self.junk)
/// }
///
/// fn dispatch_mut(
/// &mut self,
/// op: Self::WriteOperation,
/// ) -> Self::Response {
/// self.junk = op;
/// None
/// }
/// }
///
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
/// let replica = Replica::<Data>::new(&log);
/// let idx = replica.register().expect("Failed to register with replica.");
/// let _wr = replica.execute_mut(100, idx);
///
/// // execute() can be used to read from the replicated data structure.
/// let res = replica.execute((), idx);
/// assert_eq!(Some(100), res);
/// ```
pub fn execute(
&self,
op: <D as Dispatch>::ReadOperation,
idx: ReplicaToken,
) -> <D as Dispatch>::Response {
self.read_only(op, idx.0)
}
/// Busy waits until a response is available within the thread's context.
/// `idx` identifies this thread.
fn get_response(&self, idx: usize) -> <D as Dispatch>::Response {
let mut iter = 0;
let interval = 1 << 29;
// Keep trying to retrieve a response from the thread context. After trying `interval`
// times with no luck, try to perform flat combining to make some progress.
loop {
let r = self.contexts[idx - 1].res();
if let Some(resp) = r {
return resp;
}
iter += 1;
if iter == interval {
self.try_combine(idx);
iter = 0;
}
}
}
/// Executes a passed in closure against the replica's underlying data
/// structure. Useful for unit testing; can be used to verify certain
/// properties of the data structure after issuing a bunch of operations
/// against it.
///
/// # Note
/// There is probably no need for a regular client to ever call this function.
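///
/// # Example
///
/// A minimal sketch (reusing the `Data` type from the [`Replica<D>::new`]
/// example) that inspects the underlying state after a write:
///
/// ```
/// # use node_replication::Dispatch;
/// # use node_replication::Log;
/// # use node_replication::Replica;
/// # use std::sync::Arc;
/// # #[derive(Default)]
/// # struct Data { junk: u64 }
/// # impl Dispatch for Data {
/// #     type ReadOperation = ();
/// #     type WriteOperation = u64;
/// #     type Response = Option<u64>;
/// #     fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response { Some(self.junk) }
/// #     fn dispatch_mut(&mut self, op: Self::WriteOperation) -> Self::Response {
/// #         self.junk = op;
/// #         None
/// #     }
/// # }
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
/// let replica = Replica::<Data>::new(&log);
/// let idx = replica.register().expect("Failed to register with replica.");
/// let _ = replica.execute_mut(100, idx);
///
/// // The closure runs directly against the underlying data structure.
/// replica.verify(|d| assert_eq!(d.junk, 100));
/// ```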
#[doc(hidden)]
pub fn verify<F: FnMut(&D)>(&self, mut v: F) {
// Acquire the combiner lock before attempting anything on the data structure.
// Use an idx greater than the maximum that can be allocated.
while self.combiner.compare_exchange_weak(
0,
MAX_THREADS_PER_REPLICA + 2,
Ordering::Acquire,
Ordering::Acquire,
) != Ok(0)
{
spin_loop();
}
let mut data = self.data.write(self.next.load(Ordering::Relaxed));
let mut f = |o: <D as Dispatch>::WriteOperation, _i: usize| {
data.dispatch_mut(o);
};
self.slog.exec(self.idx, &mut f);
v(&data);
self.combiner.store(0, Ordering::Release);
}
/// This method is useful when a replica stops making progress while threads
/// on another replica are still active. The active replica will use up all the
/// entries in the log and won't be able to perform garbage collection because
/// of the inactive replica. This method syncs the replica up against the
/// underlying log.
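///
/// # Example
///
/// A minimal sketch (reusing the `Data` type from the [`Replica<D>::new`]
/// example); after `sync` returns, this replica has applied the operations
/// that were in the log when it was called:
///
/// ```
/// # use node_replication::Dispatch;
/// # use node_replication::Log;
/// # use node_replication::Replica;
/// # use std::sync::Arc;
/// # #[derive(Default)]
/// # struct Data { junk: u64 }
/// # impl Dispatch for Data {
/// #     type ReadOperation = ();
/// #     type WriteOperation = u64;
/// #     type Response = Option<u64>;
/// #     fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response { Some(self.junk) }
/// #     fn dispatch_mut(&mut self, op: Self::WriteOperation) -> Self::Response {
/// #         self.junk = op;
/// #         None
/// #     }
/// # }
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
/// let replica = Replica::<Data>::new(&log);
/// let idx = replica.register().expect("Failed to register with replica.");
///
/// // Bring this replica up to date with the shared log.
/// replica.sync(idx);
/// ```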
pub fn sync(&self, idx: ReplicaToken) {
let ctail = self.slog.get_ctail();
while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
self.try_combine(idx.0);
spin_loop();
}
}
/// Issues a read-only operation against the replica and returns a response.
/// Makes sure the replica is synced up against the log before doing so.
fn read_only(
&self,
op: <D as Dispatch>::ReadOperation,
tid: usize,
) -> <D as Dispatch>::Response {
// We can perform the read only if our replica is synced up against
// the shared log. If it isn't, then try to combine until it is synced up.
let ctail = self.slog.get_ctail();
while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
self.try_combine(tid);
spin_loop();
}
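// `tid - 1` indexes this thread's slot in the distributed readers-writer lock.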
return self.data.read(tid - 1).dispatch(op);
}
/// Enqueues an operation inside a thread local context. Returns a boolean
/// indicating whether the operation was enqueued (true) or not (false).
#[inline(always)]
fn make_pending(&self, op: <D as Dispatch>::WriteOperation, idx: usize) -> bool {
self.contexts[idx - 1].enqueue(op)
}
/// Attempts to perform flat combining, which appends pending operations to the
/// shared log and executes them. Accepts a thread `tid` as an argument; required
/// to acquire the combiner lock.
fn try_combine(&self, tid: usize) {
// First, check if there already is an active flat combiner. If so, just return;
// otherwise, try to acquire the combiner lock below.
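// A plain (volatile) read is used for this check because, unlike a
// compare_exchange, it does not pull the `combiner` cache line into
// exclusive mode while we only want to observe its value.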
for _i in 0..4 {
#[cfg(not(loom))]
if unsafe {
core::ptr::read_volatile(
&self.combiner
as *const crossbeam_utils::CachePadded<core::sync::atomic::AtomicUsize>
as *const usize,
)
} != 0
{
return;
}
#[cfg(loom)]
{
if self.combiner.load(Ordering::Relaxed) != 0 {
loom::thread::yield_now();
return;
}
}
}
// Try to become the combiner here. If this fails, then simply return.
if self
.combiner
.compare_exchange_weak(0, tid, Ordering::Acquire, Ordering::Acquire)
!= Ok(0)
{
#[cfg(loom)]
loom::thread::yield_now();
return;
}
// Successfully became the combiner; perform one round of flat combining.
self.combine();
// Allow other threads to perform flat combining once we have finished all our work.
// At this point, we've dropped all mutable references to thread contexts and to
// the staging buffer as well.
self.combiner.store(0, Ordering::Release);
}
/// Performs one round of flat combining. Collects, appends and executes operations.
#[inline(always)]
fn combine(&self) {
let mut buffer = self.buffer.borrow_mut();
let mut operations = self.inflight.borrow_mut();
let mut results = self.result.borrow_mut();
buffer.clear();
results.clear();
let next = self.next.load(Ordering::Relaxed);
// Collect operations from each thread registered with this replica.
for i in 1..next {
operations[i - 1] = self.contexts[i - 1].ops(&mut buffer);
}
// Append all collected operations into the shared log. We pass a closure
// in here because operations on the log might need to be consumed for GC.
{
let mut data = self.data.write(next);
let f = |o: <D as Dispatch>::WriteOperation, i: usize| {
    let resp = data.dispatch_mut(o);
    if i == self.idx {
        results.push(resp);
    }
};
self.slog.append(&buffer, self.idx, f);
}
// Execute any operations on the shared log against this replica.
{
let mut data = self.data.write(next);
let mut f = |o: <D as Dispatch>::WriteOperation, i: usize| {
let resp = data.dispatch_mut(o);
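// Only operations appended by this replica have a thread waiting locally
// for a response; operations from other replicas are applied purely to
// keep this copy of the data structure in sync.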
if i == self.idx {
results.push(resp)
};
};
self.slog.exec(self.idx, &mut f);
}
// Return/Enqueue responses back into the appropriate thread context(s).
let (mut s, mut f) = (0, 0);
for i in 1..next {
if operations[i - 1] == 0 {
continue;
};
f += operations[i - 1];
self.contexts[i - 1].enqueue_resps(&results[s..f]);
s += operations[i - 1];
operations[i - 1] = 0;
}
}
}
#[cfg(test)]
mod test {
extern crate std;
use super::*;
use std::vec;
// Really dumb data structure to test against the Replica and shared log.
#[derive(Default)]
struct Data {
junk: u64,
}
impl Dispatch for Data {
type ReadOperation = u64;
type WriteOperation = u64;
type Response = Result<u64, ()>;
fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response {
Ok(self.junk)
}
fn dispatch_mut(&mut self, _op: Self::WriteOperation) -> Self::Response {
self.junk += 1;
return Ok(107);
}
}
// Tests whether we can construct a Replica given a log.
#[test]
fn test_replica_create() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
assert_eq!(repl.idx, 1);
assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
assert_eq!(repl.next.load(Ordering::SeqCst), 1);
assert_eq!(repl.contexts.len(), MAX_THREADS_PER_REPLICA);
assert_eq!(
repl.buffer.borrow().capacity(),
MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
);
assert_eq!(repl.inflight.borrow().len(), MAX_THREADS_PER_REPLICA);
assert_eq!(
repl.result.borrow().capacity(),
MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
);
assert_eq!(repl.data.read(0).junk, 0);
}
// Tests whether we can register with this replica and receive an idx.
#[test]
fn test_replica_register() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
assert_eq!(repl.register(), Some(ReplicaToken(1)));
assert_eq!(repl.next.load(Ordering::SeqCst), 2);
repl.next.store(17, Ordering::SeqCst);
assert_eq!(repl.register(), Some(ReplicaToken(17)));
assert_eq!(repl.next.load(Ordering::SeqCst), 18);
}
// Tests whether registering more than the maximum limit of threads per replica is disallowed.
#[test]
fn test_replica_register_none() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
repl.next
.store(MAX_THREADS_PER_REPLICA + 1, Ordering::SeqCst);
assert!(repl.register().is_none());
}
// Tests that we can successfully allow operations to go pending on this replica.
#[test]
fn test_replica_make_pending() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
let mut o = vec![];
assert!(repl.make_pending(121, 8));
assert_eq!(repl.contexts[7].ops(&mut o), 1);
assert_eq!(o.len(), 1);
assert_eq!(o[0], 121);
}
// Tests that we can't enqueue operations on a context that is already full.
#[test]
fn test_replica_make_pending_false() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
for _i in 0..Context::<u64, Result<u64, ()>>::batch_size() {
assert!(repl.make_pending(121, 1))
}
assert!(!repl.make_pending(11, 1));
}
// Tests that we can append and execute operations using try_combine().
#[test]
fn test_replica_try_combine() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let _idx = repl.register();
repl.make_pending(121, 1);
repl.try_combine(1);
assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
assert_eq!(repl.data.read(0).junk, 1);
assert_eq!(repl.contexts[0].res(), Some(Ok(107)));
}
// Tests whether try_combine() also applies pending operations on other threads to the log.
#[test]
fn test_replica_try_combine_pending() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
repl.next.store(9, Ordering::SeqCst);
repl.make_pending(121, 8);
repl.try_combine(1);
assert_eq!(repl.data.read(0).junk, 1);
assert_eq!(repl.contexts[7].res(), Some(Ok(107)));
}
// Tests whether try_combine() fails if someone else is currently flat combining.
#[test]
fn test_replica_try_combine_fail() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
repl.next.store(9, Ordering::SeqCst);
repl.combiner.store(8, Ordering::SeqCst);
repl.make_pending(121, 1);
repl.try_combine(1);
assert_eq!(repl.data.read(0).junk, 0);
assert_eq!(repl.contexts[0].res(), None);
}
// Tests whether we can execute an operation against the log using execute_mut().
#[test]
fn test_replica_execute_combine() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let idx = repl.register().unwrap();
assert_eq!(Ok(107), repl.execute_mut(121, idx));
assert_eq!(1, repl.data.read(0).junk);
}
// Tests whether get_response() retrieves a response to an operation that was executed
// against a replica.
#[test]
fn test_replica_get_response() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let _idx = repl.register();
repl.make_pending(121, 1);
assert_eq!(repl.get_response(1), Ok(107));
}
// Tests whether we can issue a read-only operation against the replica.
#[test]
fn test_replica_execute() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let idx = repl.register().expect("Failed to register with replica.");
assert_eq!(Ok(107), repl.execute_mut(121, idx));
assert_eq!(Ok(1), repl.execute(11, idx));
}
// Tests that execute() syncs up the replica with the log before
// executing the read against the data structure.
#[test]
fn test_replica_execute_not_synced() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
// Append operations to the log directly, bypassing the replica.
let o = [121, 212];
slog.append(&o, 2, |_o: u64, _i: usize| {});
slog.exec(2, &mut |_o: u64, _i: usize| {});
let t1 = repl.register().expect("Failed to register with replica.");
assert_eq!(Ok(2), repl.execute(11, t1));
}
}