node-replication 0.1.1

An operation-log based approach that transform single-threaded data structures into concurrent, replicated structures.
Documentation
// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

use alloc::alloc::{alloc, dealloc, Layout};

use core::cell::Cell;
use core::default::Default;
use core::fmt;
use core::mem::{align_of, size_of};
use core::ops::{Drop, FnMut};
use core::slice::from_raw_parts_mut;

#[cfg(not(loom))]
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(loom)]
pub use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use crossbeam_utils::CachePadded;

use crate::context::MAX_PENDING_OPS;
use crate::replica::MAX_THREADS_PER_REPLICA;

/// The default size of the shared log in bytes. If constructed using the
/// default constructor, the log will be these many bytes in size. Currently
/// set to 32 MiB based on the ASPLOS 2017 paper.
const DEFAULT_LOG_BYTES: usize = 32 * 1024 * 1024;
const_assert!(DEFAULT_LOG_BYTES >= 1 && (DEFAULT_LOG_BYTES & (DEFAULT_LOG_BYTES - 1) == 0));

/// The maximum number of replicas that can be registered with the log.
#[cfg(not(loom))]
pub const MAX_REPLICAS_PER_LOG: usize = 192;
#[cfg(loom)] // Otherwise uses too much stack space wich crashes in loom...
pub const MAX_REPLICAS_PER_LOG: usize = 3;

/// Constant required for garbage collection. When the tail and the head are
/// these many entries apart on the circular buffer, garbage collection will
/// be performed by one of the replicas registered with the log.
///
/// For the GC algorithm to work, we need to ensure that we can support the
/// largest possible append after deciding to perform GC. This largest possible
/// append is when every thread within a replica has a full batch of writes
/// to be appended to the shared log.
const GC_FROM_HEAD: usize = MAX_PENDING_OPS * MAX_THREADS_PER_REPLICA;
const_assert!(GC_FROM_HEAD >= 1 && (GC_FROM_HEAD & (GC_FROM_HEAD - 1) == 0));

/// Threshold after how many iterations we log a warning for busy spinning loops.
///
/// This helps with debugging to figure out where things may end up blocking.
/// Should be a power of two to avoid divisions.
const WARN_THRESHOLD: usize = 1 << 28;

/// An entry that sits on the log. Each entry consists of three fields: The operation to
/// be performed when a thread reaches this entry on the log, the replica that appended
/// this operation, and a flag indicating whether this entry is valid.
///
/// `T` is the type on the operation - typically an enum class containing opcodes as well as
/// arguments. It is required that this type be sized and cloneable.
#[derive(Default)]
#[repr(align(64))]
struct Entry<T>
where
    T: Sized + Clone,
{
    /// The operation that this entry represents.
    operation: Option<T>,

    /// Identifies the replica that issued the above operation.
    replica: usize,

    /// Indicates whether this entry represents a valid operation when on the log.
    alivef: AtomicBool,
}

/// A log of operations that is typically accessed by multiple
/// [Replica](struct.Replica.html).
///
/// Operations can be added to the log by calling the `append()` method and
/// providing a list of operations to be performed.
///
/// Operations already on the log can be executed by calling the `exec()` method
/// and providing a replica-id along with a closure. Newly added operations
/// since the replica last called `exec()` will be executed by invoking the
/// supplied closure for each one of them.
///
/// Accepts one generic type parameter; `T` defines the type of operations and
/// their arguments that will go on the log and would typically be an enum
/// class.
///
/// This struct is aligned to 64 bytes optimizing cache access.\
///
/// # Note
/// As a client, typically there is no need to call any methods on the Log aside
/// from `new`. Only in the rare circumstance someone would implement their own
/// Replica would it be necessary to call any of the Log's methods.
#[repr(align(64))]
pub struct Log<'a, T>
where
    T: Sized + Clone,
{
    /// Raw pointer to the actual underlying log. Required for dealloc.
    rawp: *mut u8,

    /// Size of the underlying log in bytes. Required for dealloc.
    rawb: usize,

    /// The maximum number of entries that can be held inside the log.
    size: usize,

    /// A reference to the actual log. Nothing but a slice of entries.
    slog: &'a [Cell<Entry<T>>],

    /// Logical index into the above slice at which the log starts.
    head: CachePadded<AtomicUsize>,

    /// Logical index into the above slice at which the log ends.
    /// New appends go here.
    tail: CachePadded<AtomicUsize>,

    /// Completed tail maintains an index <= tail that points to a
    /// log entry after which there are no completed operations across
    /// all replicas registered against this log.
    ctail: CachePadded<AtomicUsize>,

    /// Array consisting of the local tail of each replica registered with the log.
    /// Required for garbage collection; since replicas make progress over the log
    /// independently, we want to make sure that we don't garbage collect operations
    /// that haven't been executed by all replicas.
    ltails: [CachePadded<AtomicUsize>; MAX_REPLICAS_PER_LOG],

    /// Identifier that will be allocated to the next replica that registers with
    /// this Log. Also required to correctly index into ltails above.
    next: CachePadded<AtomicUsize>,

    /// Array consisting of local alive masks for each registered replica. Required
    /// because replicas make independent progress over the log, so we need to
    /// track log wrap-arounds for each of them separately.
    lmasks: [CachePadded<Cell<bool>>; MAX_REPLICAS_PER_LOG],
}

impl<'a, T> fmt::Debug for Log<'a, T>
where
    T: Sized + Clone,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Log")
            .field("head", &self.tail)
            .field("tail", &self.head)
            .field("size", &self.size)
            .finish()
    }
}

/// The Log is Send. The *mut u8 (`rawp`) is never dereferenced.
unsafe impl<'a, T> Send for Log<'a, T> where T: Sized + Clone {}

/// The Log is Sync. We know this because: `head` and `tail` are atomic variables, `append()`
/// reserves entries using a CAS, and exec() does not concurrently mutate entries on the log.
unsafe impl<'a, T> Sync for Log<'a, T> where T: Sized + Clone {}

impl<'a, T> Log<'a, T>
where
    T: Sized + Clone,
{
    /// Constructs and returns a log of size `bytes` bytes.
    /// A size between 1-2 MiB usually works well in most cases.
    ///
    /// # Example
    ///
    /// ```
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    ///     Invalid,
    /// }
    ///
    /// // Creates a 1 Mega Byte sized log.
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// ```
    ///
    /// This method also allocates memory for the log upfront. No further allocations
    /// will be performed once this method returns.
    pub fn new<'b>(bytes: usize) -> Log<'b, T> {
        // Calculate the number of entries that will go into the log, and retrieve a
        // slice to it from the allocated region of memory.
        let mut num = bytes / Log::<T>::entry_size();

        // Make sure the log is large enough to allow for periodic garbage collection.
        if num < 2 * GC_FROM_HEAD {
            num = 2 * GC_FROM_HEAD;
        }

        // Round off to the next power of two if required. If we overflow, then set
        // the number of entries to the minimum required for GC. This is unlikely since
        // we'd need a log size > 2^63 entries for this to happen.
        if !num.is_power_of_two() {
            num = num.checked_next_power_of_two().unwrap_or(2 * GC_FROM_HEAD)
        };

        // Now that we have the actual number of entries, allocate the log.
        let b = num * Log::<T>::entry_size();
        let mem = unsafe {
            alloc(
                Layout::from_size_align(b, align_of::<Cell<Entry<T>>>())
                    .expect("Alignment error while allocating the shared log!"),
            )
        };
        if mem.is_null() {
            panic!("Failed to allocate memory for the shared log!");
        }
        let raw = unsafe { from_raw_parts_mut(mem as *mut Cell<Entry<T>>, num) };

        // Initialize all log entries by calling the default constructor.
        for e in raw.iter_mut() {
            unsafe {
                ::core::ptr::write(
                    e,
                    Cell::new(Entry {
                        operation: None,
                        replica: 0usize,
                        alivef: AtomicBool::new(false),
                    }),
                );
            }
        }

        #[allow(clippy::declare_interior_mutable_const)]
        const LMASK_DEFAULT: CachePadded<Cell<bool>> = CachePadded::new(Cell::new(true));

        #[cfg(not(loom))]
        {
            #[allow(clippy::declare_interior_mutable_const)]
            const LTAIL_DEFAULT: CachePadded<AtomicUsize> = CachePadded::new(AtomicUsize::new(0));

            Log {
                rawp: mem,
                rawb: b,
                size: num,
                slog: raw,
                head: CachePadded::new(AtomicUsize::new(0usize)),
                tail: CachePadded::new(AtomicUsize::new(0usize)),
                ctail: CachePadded::new(AtomicUsize::new(0usize)),
                ltails: [LTAIL_DEFAULT; MAX_REPLICAS_PER_LOG],
                next: CachePadded::new(AtomicUsize::new(1usize)),
                lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
            }
        }
        // AtomicUsize::new is not const in loom. This code block (including arr
        // dependency) becomes redundant once
        // https://github.com/tokio-rs/loom/issues/170 is fixed:
        #[cfg(loom)]
        {
            use arr_macro::arr;
            Log {
                rawp: mem,
                rawb: b,
                size: num,
                slog: raw,
                head: CachePadded::new(AtomicUsize::new(0usize)),
                tail: CachePadded::new(AtomicUsize::new(0usize)),
                ctail: CachePadded::new(AtomicUsize::new(0usize)),
                ltails: arr![CachePadded::new(AtomicUsize::new(0)); 3], // MAX_REPLICAS_PER_LOG
                next: CachePadded::new(AtomicUsize::new(1usize)),
                lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
            }
        }
    }

    /// Returns the size of an entry in bytes.
    fn entry_size() -> usize {
        size_of::<Cell<Entry<T>>>()
    }

    /// Registers a replica with the log. Returns an identifier that the replica
    /// can use to execute operations on the log.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///    Read,
    ///    Write(u64),
    ///    Invalid,
    /// }
    ///
    /// // Creates a 1 Mega Byte sized log.
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    ///
    /// // Registers against the log. `idx` can now be used to append operations
    /// // to the log, and execute these operations.
    /// let idx = l.register().expect("Failed to register with the Log.");
    /// ```
    pub(crate) fn register(&self) -> Option<usize> {
        // Loop until we either run out of identifiers or we manage to increment `next`.
        loop {
            let n = self.next.load(Ordering::Relaxed);

            // Check if we've exceeded the maximum number of replicas the log can support.
            if n >= MAX_REPLICAS_PER_LOG {
                return None;
            };

            if self
                .next
                .compare_exchange_weak(n, n + 1, Ordering::SeqCst, Ordering::SeqCst)
                != Ok(n)
            {
                continue;
            };

            return Some(n);
        }
    }

    /// Adds a batch of operations to the shared log.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    /// }
    ///
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// let idx = l.register().expect("Failed to register with the Log.");
    ///
    /// // The set of operations we would like to append. The order will
    /// // be preserved by the interface.
    /// let ops = [Operation::Write(100), Operation::Read];
    ///
    /// // `append()` might have to garbage collect the log. When doing so,
    /// // it might encounter operations added in by another replica/thread.
    /// // This closure allows us to consume those operations. `id` identifies
    /// // the replica that added in those operations.
    /// let f = |op: Operation, id: usize| {
    ///     match(op) {
    ///         Operation::Read => println!("Read by {}", id),
    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
    ///     }
    /// };
    ///
    /// // Append the operations. These operations will be marked with `idx`,
    /// // and will be linearized at the tail of the log.
    /// l.append(&ops, idx, f);
    /// ```
    ///
    /// If there isn't enough space to perform the append, this method busy
    /// waits until the head is advanced. Accepts a replica `idx`; all appended
    /// operations/entries will be marked with this replica-identifier. Also
    /// accepts a closure `s`; when waiting for GC, this closure is passed into
    /// exec() to ensure that this replica does'nt cause a deadlock.
    ///
    /// # Note
    /// Documentation for this function is hidden since `append` is currently not
    /// intended as a public interface. It is marked as public due to being
    /// used by the benchmarking code.
    #[inline(always)]
    #[doc(hidden)]
    pub fn append<F: FnMut(T, usize)>(&self, ops: &[T], idx: usize, mut s: F) {
        let nops = ops.len();
        let mut iteration = 1;
        let mut waitgc = 1;

        // Keep trying to reserve entries and add operations to the log until
        // we succeed in doing so.
        loop {
            if iteration % WARN_THRESHOLD == 0 {
                warn!(
                    "append(ops.len()={}, {}) takes too many iterations ({}) to complete...",
                    ops.len(),
                    idx,
                    iteration,
                );
            }
            iteration += 1;

            let tail = self.tail.load(Ordering::Relaxed);
            let head = self.head.load(Ordering::Relaxed);

            // If there are fewer than `GC_FROM_HEAD` entries on the log, then just
            // try again. The replica that reserved entry (h + self.size - GC_FROM_HEAD)
            // is currently trying to advance the head of the log. Keep refreshing the
            // replica against the log to make sure that it isn't deadlocking GC.
            if tail > head + self.size - GC_FROM_HEAD {
                if waitgc % WARN_THRESHOLD == 0 {
                    warn!(
                        "append(ops.len()={}, {}) takes too many iterations ({}) waiting for gc...",
                        ops.len(),
                        idx,
                        waitgc,
                    );
                }
                waitgc += 1;
                self.exec(idx, &mut s);

                #[cfg(loom)]
                loom::thread::yield_now();
                continue;
            }

            // If on adding in the above entries there would be fewer than `GC_FROM_HEAD`
            // entries left on the log, then we need to advance the head of the log.
            let mut advance = false;
            if tail + nops > head + self.size - GC_FROM_HEAD {
                advance = true
            };

            // Try reserving slots for the operations. If that fails, then restart
            // from the beginning of this loop.
            if self.tail.compare_exchange_weak(
                tail,
                tail + nops,
                Ordering::Acquire,
                Ordering::Acquire,
            ) != Ok(tail)
            {
                continue;
            };

            // Successfully reserved entries on the shared log. Add the operations in.
            for (i, op) in ops.iter().enumerate().take(nops) {
                let e = self.slog[self.index(tail + i)].as_ptr();
                let mut m = self.lmasks[idx - 1].get();

                // This entry was just reserved so it should be dead (!= m). However, if
                // the log has wrapped around, then the alive mask has flipped. In this
                // case, we flip the mask we were originally going to write into the
                // allocated entry. We cannot flip lmasks[idx - 1] because this replica
                // might still need to execute a few entries before the wrap around.
                if unsafe { (*e).alivef.load(Ordering::Relaxed) == m } {
                    m = !m;
                }

                unsafe { (*e).operation = Some(op.clone()) };
                unsafe { (*e).replica = idx };
                unsafe { (*e).alivef.store(m, Ordering::Release) };
            }

            // If needed, advance the head of the log forward to make room on the log.
            if advance {
                self.advance_head(idx, &mut s);
            }

            return;
        }
    }

    /// Executes a passed in closure (`d`) on all operations starting from
    /// a replica's local tail on the shared log. The replica is identified through an
    /// `idx` passed in as an argument.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    /// }
    ///
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// let idx = l.register().expect("Failed to register with the Log.");
    /// let ops = [Operation::Write(100), Operation::Read];
    ///
    /// let f = |op: Operation, id: usize| {
    ///     match(op) {
    ///         Operation::Read => println!("Read by {}", id),
    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
    ///     }
    /// };
    /// l.append(&ops, idx, f);
    ///
    /// // This closure is executed on every operation appended to the
    /// // since the last call to `exec()` by this replica/thread.
    /// let mut d = 0;
    /// let mut g = |op: Operation, id: usize| {
    ///     match(op) {
    ///         // The write happened before the read.
    ///         Operation::Read => assert_eq!(100, d),
    ///         Operation::Write(x) => d += 100,
    ///     }
    /// };
    /// l.exec(idx, &mut g);
    /// ```
    ///
    /// The passed in closure is expected to take in two arguments: The operation
    /// from the shared log to be executed and the replica that issued it.
    #[inline(always)]
    pub(crate) fn exec<F: FnMut(T, usize)>(&self, idx: usize, d: &mut F) {
        // Load the logical log offset from which we must execute operations.
        let ltail = self.ltails[idx - 1].load(Ordering::Relaxed);

        // Check if we have any work to do by comparing our local tail with the log's
        // global tail. If they're equal, then we're done here and can simply return.
        let gtail = self.tail.load(Ordering::Relaxed);
        if ltail == gtail {
            return;
        }

        let h = self.head.load(Ordering::Relaxed);

        // Make sure we're within the shared log. If we aren't, then panic.
        if ltail > gtail || ltail < h {
            panic!("Local tail not within the shared log!")
        };

        // Execute all operations from the passed in offset to the shared log's tail. Check if
        // the entry is live first; we could have a replica that has reserved entries, but not
        // filled them into the log yet.
        for i in ltail..gtail {
            let mut iteration = 1;
            let e = self.slog[self.index(i)].as_ptr();

            while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[idx - 1].get() } {
                if iteration % WARN_THRESHOLD == 0 {
                    warn!(
                        "alivef not being set for self.index(i={}) = {} (self.lmasks[{}] is {})...",
                        i,
                        self.index(i),
                        idx - 1,
                        self.lmasks[idx - 1].get()
                    );
                }
                iteration += 1;

                #[cfg(loom)]
                loom::thread::yield_now();
            }

            unsafe { d((*e).operation.as_ref().unwrap().clone(), (*e).replica) };

            // Looks like we're going to wrap around now; flip this replica's local mask.
            if self.index(i) == self.size - 1 {
                self.lmasks[idx - 1].set(!self.lmasks[idx - 1].get());
                //trace!("idx: {} lmask: {}", idx, self.lmasks[idx - 1].get());
            }
        }

        // Update the completed tail after we've executed these operations.
        // Also update this replica's local tail.
        self.ctail.fetch_max(gtail, Ordering::Relaxed);
        self.ltails[idx - 1].store(gtail, Ordering::Relaxed);
    }

    /// Returns a physical index given a logical index into the shared log.
    #[inline(always)]
    fn index(&self, logical: usize) -> usize {
        logical & (self.size - 1)
    }

    /// Advances the head of the log forward. If a replica has stopped making progress,
    /// then this method will never return. Accepts a closure that is passed into exec()
    /// to ensure that this replica does not deadlock GC.
    #[inline(always)]
    fn advance_head<F: FnMut(T, usize)>(&self, rid: usize, mut s: &mut F) {
        // Keep looping until we can advance the head and create some free space
        // on the log. If one of the replicas has stopped making progress, then
        // this method might never return.
        let mut iteration = 1;
        loop {
            let r = self.next.load(Ordering::Relaxed);
            let global_head = self.head.load(Ordering::Relaxed);
            let f = self.tail.load(Ordering::Relaxed);

            let mut min_local_tail = self.ltails[0].load(Ordering::Relaxed);

            // Find the smallest local tail across all replicas.
            for idx in 1..r {
                let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed);
                if min_local_tail > cur_local_tail {
                    min_local_tail = cur_local_tail
                };
            }

            // If we cannot advance the head further, then start
            // from the beginning of this loop again. Before doing so, try consuming
            // any new entries on the log to prevent deadlock.
            if min_local_tail == global_head {
                if iteration % WARN_THRESHOLD == 0 {
                    warn!("Spending a long time in `advance_head`, are we starving?");
                }
                iteration += 1;
                self.exec(rid, &mut s);

                #[cfg(loom)]
                loom::thread::yield_now();
                continue;
            }

            // There are entries that can be freed up; update the head offset.
            self.head.store(min_local_tail, Ordering::Relaxed);

            // Make sure that we freed up enough space so that threads waiting for
            // GC in append can make progress. Otherwise, try to make progress again.
            // If we're making progress again, then try consuming entries on the log.
            if f < min_local_tail + self.size - GC_FROM_HEAD {
                return;
            } else {
                self.exec(rid, &mut s);
            }
        }
    }

    /// Resets the log. Required for microbenchmarking the log; with this method, we
    /// can re-use the log across experimental runs without having to re-allocate the
    /// log over and over again.
    ///
    /// # Safety
    ///
    /// *To be used for testing/benchmarking only, hence marked unsafe*. Before calling
    /// this method, please make sure that there aren't any replicas/threads actively
    /// issuing/executing operations to/from this log.
    #[doc(hidden)]
    #[inline(always)]
    pub unsafe fn reset(&self) {
        // First, reset global metadata.
        self.head.store(0, Ordering::SeqCst);
        self.tail.store(0, Ordering::SeqCst);
        self.next.store(1, Ordering::SeqCst);

        // Next, reset replica-local metadata.
        for r in 0..MAX_REPLICAS_PER_LOG {
            self.ltails[r].store(0, Ordering::Relaxed);
            self.lmasks[r].set(true);
        }

        // Next, free up all log entries. Use pointers to avoid memcpy and speed up
        // the reset of the log here.
        for i in 0..self.size {
            let e = self.slog[self.index(i)].as_ptr();
            (*e).alivef.store(false, Ordering::Release);
        }
    }

    /// This method checks if the replica is in sync to execute a read-only operation
    /// right away. It does so by comparing the replica's local tail with the log's
    /// completed tail.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    /// }
    ///
    /// // We register two replicas here, `idx1` and `idx2`.
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// let idx1 = l.register().expect("Failed to register with the Log.");
    /// let idx2 = l.register().expect("Failed to register with the Log.");
    /// let ops = [Operation::Write(100), Operation::Read];
    ///
    /// let f = |op: Operation, id: usize| {
    ///     match(op) {
    ///         Operation::Read => println!("Read by {}", id),
    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
    ///     }
    /// };
    /// l.append(&ops, idx2, f);
    ///
    /// let mut d = 0;
    /// let mut g = |op: Operation, id: usize| {
    ///     match(op) {
    ///         // The write happened before the read.
    ///         Operation::Read => assert_eq!(100, d),
    ///         Operation::Write(x) => d += 100,
    ///     }
    /// };
    /// l.exec(idx2, &mut g);
    ///
    /// // This assertion fails because `idx1` has not executed operations
    /// // that were appended by `idx2`.
    /// assert_eq!(false, l.is_replica_synced_for_reads(idx1, l.get_ctail()));
    ///
    /// let mut e = 0;
    /// let mut g = |op: Operation, id: usize| {
    ///     match(op) {
    ///         // The write happened before the read.
    ///         Operation::Read => assert_eq!(100, e),
    ///         Operation::Write(x) => e += 100,
    ///     }
    /// };
    /// l.exec(idx1, &mut g);
    ///
    /// // `idx1` is all synced up, so this assertion passes.
    /// assert_eq!(true, l.is_replica_synced_for_reads(idx1, l.get_ctail()));
    /// ```
    #[inline(always)]
    pub(crate) fn is_replica_synced_for_reads(&self, idx: usize, ctail: usize) -> bool {
        self.ltails[idx - 1].load(Ordering::Relaxed) >= ctail
    }

    /// This method returns the current ctail value for the log.
    #[inline(always)]
    pub(crate) fn get_ctail(&self) -> usize {
        self.ctail.load(Ordering::Relaxed)
    }
}

impl<'a, T> Default for Log<'a, T>
where
    T: Sized + Clone,
{
    /// Default constructor for the shared log.
    fn default() -> Self {
        Log::new(DEFAULT_LOG_BYTES)
    }
}

impl<'a, T> Drop for Log<'a, T>
where
    T: Sized + Clone,
{
    /// Destructor for the shared log.
    fn drop(&mut self) {
        unsafe {
            dealloc(
                self.rawp,
                Layout::from_size_align(self.rawb, align_of::<Cell<Entry<T>>>())
                    .expect("Alignment error while deallocating the shared log!"),
            )
        };
    }
}

#[cfg(test)]
mod tests {
    // Import std so that we have an allocator for our unit tests.
    extern crate std;

    use super::*;
    use std::sync::Arc;

    // Define operations along with their arguments that go onto the log.
    #[derive(Clone)] // Traits required by the log interface.
    #[derive(Debug, PartialEq)] // Traits required for testing.
    enum Operation {
        Read,
        Write(u64),
        Invalid,
    }

    // Required so that we can unit test Entry.
    impl Default for Operation {
        fn default() -> Operation {
            Operation::Invalid
        }
    }

    // Test that we can default construct entries correctly.
    #[test]
    fn test_entry_create_default() {
        let e = Entry::<Operation>::default();
        assert_eq!(e.operation, None);
        assert_eq!(e.replica, 0);
        assert_eq!(e.alivef.load(Ordering::Relaxed), false);
    }

    // Test that our entry_size() method returns the correct size.
    #[test]
    fn test_log_entry_size() {
        assert_eq!(Log::<Operation>::entry_size(), 64);
    }

    // Tests if a small log can be correctly constructed.
    #[test]
    fn test_log_create() {
        let l = Log::<Operation>::new(1024 * 1024);
        let n = (1024 * 1024) / Log::<Operation>::entry_size();
        assert_eq!(l.rawb, 1024 * 1024);
        assert_eq!(l.size, n);
        assert_eq!(l.slog.len(), n);
        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 0);
        assert_eq!(l.next.load(Ordering::Relaxed), 1);
        assert_eq!(l.ctail.load(Ordering::Relaxed), 0);

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
        }

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.lmasks[i].get(), true);
        }
    }

    // Tests if the constructor allocates enough space for GC.
    #[test]
    fn test_log_min_size() {
        let l = Log::<Operation>::new(1024);
        assert_eq!(l.rawb, 2 * GC_FROM_HEAD * Log::<Operation>::entry_size());
        assert_eq!(l.size, 2 * GC_FROM_HEAD);
        assert_eq!(l.slog.len(), 2 * GC_FROM_HEAD);
    }

    // Tests that the constructor allocates a log whose number of entries
    // are a power of two.
    #[test]
    fn test_log_power_of_two() {
        let l = Log::<Operation>::new(524 * 1024);
        let n = ((524 * 1024) / Log::<Operation>::entry_size()).checked_next_power_of_two();
        assert_eq!(l.rawb, n.unwrap() * Log::<Operation>::entry_size());
        assert_eq!(l.size, n.unwrap());
        assert_eq!(l.slog.len(), n.unwrap());
    }

    // Tests if the log can be successfully default constructed.
    #[test]
    fn test_log_create_default() {
        let l = Log::<Operation>::default();
        let n = DEFAULT_LOG_BYTES / Log::<Operation>::entry_size();
        assert_eq!(l.rawb, DEFAULT_LOG_BYTES);
        assert_eq!(l.size, n);
        assert_eq!(l.slog.len(), n);
        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 0);
        assert_eq!(l.next.load(Ordering::Relaxed), 1);
        assert_eq!(l.ctail.load(Ordering::Relaxed), 0);

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
        }

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.lmasks[i].get(), true);
        }
    }

    // Tests if we can correctly index into the shared log.
    #[test]
    fn test_log_index() {
        let l = Log::<Operation>::new(2 * 1024 * 1024);
        assert_eq!(l.index(99000), 696);
    }

    // Tests if we can correctly register with the shared log.
    #[test]
    fn test_log_register() {
        let l = Log::<Operation>::new(1024);
        assert_eq!(l.register(), Some(1));
        assert_eq!(l.next.load(Ordering::Relaxed), 2);
    }

    // Tests that we cannot register more than the max replicas with the log.
    #[test]
    fn test_log_register_none() {
        let l = Log::<Operation>::new(1024);
        l.next.store(MAX_REPLICAS_PER_LOG, Ordering::Relaxed);
        assert!(l.register().is_none());
        assert_eq!(l.next.load(Ordering::Relaxed), MAX_REPLICAS_PER_LOG);
    }

    // Test that we can correctly append an entry into the log.
    #[test]
    fn test_log_append() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read];
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 1);
        let slog = l.slog[0].take();
        assert_eq!(slog.operation, Some(Operation::Read));
        assert_eq!(slog.replica, 1);
    }

    // Test that multiple entries can be appended to the log.
    #[test]
    fn test_log_append_multiple() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read, Operation::Write(119)];
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 2);
    }

    // Tests that we can advance the head of the log to the smallest of all replica-local tails.
    #[test]
    fn test_log_advance_head() {
        let l = Log::<Operation>::default();

        l.next.store(5, Ordering::Relaxed);
        l.ltails[0].store(1023, Ordering::Relaxed);
        l.ltails[1].store(224, Ordering::Relaxed);
        l.ltails[2].store(4096, Ordering::Relaxed);
        l.ltails[3].store(799, Ordering::Relaxed);

        l.advance_head(0, &mut |_o: Operation, _i: usize| {});
        assert_eq!(l.head.load(Ordering::Relaxed), 224);
    }

    // Tests that the head of the log is advanced when we're close to filling up the entire log.
    #[test]
    fn test_log_append_gc() {
        let l = Log::<Operation>::default();
        let o: [Operation; 4] = unsafe {
            let mut a: [Operation; 4] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };

        l.next.store(2, Ordering::Relaxed);
        l.tail.store(l.size - GC_FROM_HEAD - 1, Ordering::Relaxed);
        l.ltails[0].store(1024, Ordering::Relaxed);
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.head.load(Ordering::Relaxed), 1024);
        assert_eq!(l.tail.load(Ordering::Relaxed), l.size - GC_FROM_HEAD + 3);
    }

    // Tests that on log wrap around, the local mask stays
    // the same because entries have not been executed yet.
    #[test]
    fn test_log_append_wrap() {
        let l = Log::<Operation>::default();
        let o: [Operation; 1024] = unsafe {
            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };

        l.next.store(2, Ordering::Relaxed);
        l.head.store(2 * 8192, Ordering::Relaxed);
        l.tail.store(l.size - 10, Ordering::Relaxed);
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.lmasks[0].get(), true);
        assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
    }

    // Test that we can execute operations appended to the log.
    #[test]
    fn test_log_exec() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read];
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.exec(1, &mut f);

        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ctail.load(Ordering::Relaxed)
        );
        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ltails[0].load(Ordering::Relaxed)
        );
    }

    // Test that exec() doesn't do anything when the log is empty.
    #[test]
    fn test_log_exec_empty() {
        let l = Log::<Operation>::default();
        let mut f = |_o: Operation, _i: usize| {
            assert!(false);
        };

        l.exec(1, &mut f);
    }

    // Test that exec() doesn't do anything if we're already up-to-date.
    #[test]
    fn test_log_exec_zero() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read];
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };
        let mut g = |_op: Operation, _i: usize| {
            assert!(false);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.exec(1, &mut f);
        l.exec(1, &mut g);
    }

    // Test that multiple entries on the log can be executed correctly.
    #[test]
    fn test_log_exec_multiple() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read, Operation::Write(119)];
        let mut s = 0;
        let mut f = |op: Operation, _i: usize| match op {
            Operation::Read => s += 121,
            Operation::Write(v) => s += v,
            Operation::Invalid => assert!(false),
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.exec(1, &mut f);
        assert_eq!(s, 240);

        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ctail.load(Ordering::Relaxed)
        );
        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ltails[0].load(Ordering::Relaxed)
        );
    }

    // Test that the replica local mask is updated correctly when executing over
    // a wrapped around log.
    #[test]
    fn test_log_exec_wrap() {
        let l = Log::<Operation>::default();
        let o: [Operation; 1024] = unsafe {
            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {}); // Required for GC to work correctly.
        l.next.store(2, Ordering::SeqCst);
        l.head.store(2 * 8192, Ordering::SeqCst);
        l.tail.store(l.size - 10, Ordering::SeqCst);
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        l.ltails[0].store(l.size - 10, Ordering::SeqCst);
        l.exec(1, &mut f);

        assert_eq!(l.lmasks[0].get(), false);
        assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
    }

    // Tests that exec() panics if the head of the log advances beyond the tail.
    #[test]
    #[should_panic]
    fn test_exec_panic() {
        let l = Log::<Operation>::default();
        let o: [Operation; 1024] = unsafe {
            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };
        let mut f = |_op: Operation, _i: usize| {
            assert!(false);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.head.store(8192, Ordering::SeqCst);

        l.exec(1, &mut f);
    }

    // Tests that operations are cloned when added to the log, and that
    // they are correctly dropped once overwritten.
    #[test]
    fn test_log_change_refcount() {
        let l = Log::<Arc<Operation>>::default();
        let o1 = [Arc::new(Operation::Read)];
        let o2 = [Arc::new(Operation::Read)];
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), 1);

        l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 2);
        l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 3);

        unsafe { l.reset() };

        // Over here, we overwrite entries that were written to by the two
        // previous appends. This decreases the refcount of o1 and increases
        // the refcount of o2.
        l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 2);
        assert_eq!(Arc::strong_count(&o2[0]), 2);
        l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), 3);
    }

    // Tests that operations are cloned when added to the log, and that
    // they are correctly dropped once overwritten after the GC.
    #[test]
    fn test_log_refcount_change_with_gc() {
        let entry_size = 64;
        let total_entries = 16384;

        assert_eq!(Log::<Arc<Operation>>::entry_size(), entry_size);
        let size: usize = total_entries * entry_size;
        let l = Log::<Arc<Operation>>::new(size);
        let o1 = [Arc::new(Operation::Read)];
        let o2 = [Arc::new(Operation::Read)];
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), 1);

        for i in 1..(total_entries + 1) {
            l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
            assert_eq!(Arc::strong_count(&o1[0]), i + 1);
        }
        assert_eq!(Arc::strong_count(&o1[0]), total_entries + 1);

        for i in 1..(total_entries + 1) {
            l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
            assert_eq!(Arc::strong_count(&o1[0]), (total_entries + 1) - i);
            assert_eq!(Arc::strong_count(&o2[0]), i + 1);
        }
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), total_entries + 1);
    }

    // Tests that is_replica_synced_for_read() works correctly; it returns
    // false when a replica is not synced up and true when it is.
    #[test]
    fn test_replica_synced_for_read() {
        let l = Log::<Operation>::default();
        let one = l.register().unwrap();
        let two = l.register().unwrap();

        assert_eq!(one, 1);
        assert_eq!(two, 2);

        let o = [Operation::Read];
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };

        l.append(&o, one, |_o: Operation, _i: usize| {});
        l.exec(one, &mut f);
        assert_eq!(l.is_replica_synced_for_reads(one, l.get_ctail()), true);
        assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), false);

        l.exec(two, &mut f);
        assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), true);
    }
}