node_replication/
log.rs

1// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4use alloc::alloc::{alloc, dealloc, Layout};
5
6use core::cell::Cell;
7use core::default::Default;
8use core::fmt;
9use core::mem::{align_of, size_of};
10use core::ops::{Drop, FnMut};
11use core::slice::from_raw_parts_mut;
12
13#[cfg(not(loom))]
14use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15#[cfg(loom)]
16pub use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
17
18use crossbeam_utils::CachePadded;
19
20use crate::context::MAX_PENDING_OPS;
21use crate::replica::MAX_THREADS_PER_REPLICA;
22
23/// The default size of the shared log in bytes. If constructed using the
24/// default constructor, the log will be these many bytes in size. Currently
25/// set to 32 MiB based on the ASPLOS 2017 paper.
26const DEFAULT_LOG_BYTES: usize = 32 * 1024 * 1024;
27const_assert!(DEFAULT_LOG_BYTES >= 1 && (DEFAULT_LOG_BYTES & (DEFAULT_LOG_BYTES - 1) == 0));
28
29/// The maximum number of replicas that can be registered with the log.
30#[cfg(not(loom))]
31pub const MAX_REPLICAS_PER_LOG: usize = 192;
32#[cfg(loom)] // Otherwise uses too much stack space wich crashes in loom...
33pub const MAX_REPLICAS_PER_LOG: usize = 3;
34
35/// Constant required for garbage collection. When the tail and the head are
36/// these many entries apart on the circular buffer, garbage collection will
37/// be performed by one of the replicas registered with the log.
38///
39/// For the GC algorithm to work, we need to ensure that we can support the
40/// largest possible append after deciding to perform GC. This largest possible
41/// append is when every thread within a replica has a full batch of writes
42/// to be appended to the shared log.
43const GC_FROM_HEAD: usize = MAX_PENDING_OPS * MAX_THREADS_PER_REPLICA;
44const_assert!(GC_FROM_HEAD >= 1 && (GC_FROM_HEAD & (GC_FROM_HEAD - 1) == 0));
45
46/// Threshold after how many iterations we log a warning for busy spinning loops.
47///
48/// This helps with debugging to figure out where things may end up blocking.
49/// Should be a power of two to avoid divisions.
50const WARN_THRESHOLD: usize = 1 << 28;
51
52/// An entry that sits on the log. Each entry consists of three fields: The operation to
53/// be performed when a thread reaches this entry on the log, the replica that appended
54/// this operation, and a flag indicating whether this entry is valid.
55///
56/// `T` is the type on the operation - typically an enum class containing opcodes as well as
57/// arguments. It is required that this type be sized and cloneable.
58#[derive(Default)]
59#[repr(align(64))]
60struct Entry<T>
61where
62    T: Sized + Clone,
63{
64    /// The operation that this entry represents.
65    operation: Option<T>,
66
67    /// Identifies the replica that issued the above operation.
68    replica: usize,
69
70    /// Indicates whether this entry represents a valid operation when on the log.
71    alivef: AtomicBool,
72}
73
74/// A log of operations that is typically accessed by multiple
75/// [Replica](struct.Replica.html).
76///
77/// Operations can be added to the log by calling the `append()` method and
78/// providing a list of operations to be performed.
79///
80/// Operations already on the log can be executed by calling the `exec()` method
81/// and providing a replica-id along with a closure. Newly added operations
82/// since the replica last called `exec()` will be executed by invoking the
83/// supplied closure for each one of them.
84///
85/// Accepts one generic type parameter; `T` defines the type of operations and
86/// their arguments that will go on the log and would typically be an enum
87/// class.
88///
89/// This struct is aligned to 64 bytes optimizing cache access.\
90///
91/// # Note
92/// As a client, typically there is no need to call any methods on the Log aside
93/// from `new`. Only in the rare circumstance someone would implement their own
94/// Replica would it be necessary to call any of the Log's methods.
95#[repr(align(64))]
96pub struct Log<'a, T>
97where
98    T: Sized + Clone,
99{
100    /// Raw pointer to the actual underlying log. Required for dealloc.
101    rawp: *mut u8,
102
103    /// Size of the underlying log in bytes. Required for dealloc.
104    rawb: usize,
105
106    /// The maximum number of entries that can be held inside the log.
107    size: usize,
108
109    /// A reference to the actual log. Nothing but a slice of entries.
110    slog: &'a [Cell<Entry<T>>],
111
112    /// Logical index into the above slice at which the log starts.
113    head: CachePadded<AtomicUsize>,
114
115    /// Logical index into the above slice at which the log ends.
116    /// New appends go here.
117    tail: CachePadded<AtomicUsize>,
118
119    /// Completed tail maintains an index <= tail that points to a
120    /// log entry after which there are no completed operations across
121    /// all replicas registered against this log.
122    ctail: CachePadded<AtomicUsize>,
123
124    /// Array consisting of the local tail of each replica registered with the log.
125    /// Required for garbage collection; since replicas make progress over the log
126    /// independently, we want to make sure that we don't garbage collect operations
127    /// that haven't been executed by all replicas.
128    ltails: [CachePadded<AtomicUsize>; MAX_REPLICAS_PER_LOG],
129
130    /// Identifier that will be allocated to the next replica that registers with
131    /// this Log. Also required to correctly index into ltails above.
132    next: CachePadded<AtomicUsize>,
133
134    /// Array consisting of local alive masks for each registered replica. Required
135    /// because replicas make independent progress over the log, so we need to
136    /// track log wrap-arounds for each of them separately.
137    lmasks: [CachePadded<Cell<bool>>; MAX_REPLICAS_PER_LOG],
138}
139
140impl<'a, T> fmt::Debug for Log<'a, T>
141where
142    T: Sized + Clone,
143{
144    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
145        fmt.debug_struct("Log")
146            .field("head", &self.tail)
147            .field("tail", &self.head)
148            .field("size", &self.size)
149            .finish()
150    }
151}
152
153/// The Log is Send. The *mut u8 (`rawp`) is never dereferenced.
154unsafe impl<'a, T> Send for Log<'a, T> where T: Sized + Clone {}
155
156/// The Log is Sync. We know this because: `head` and `tail` are atomic variables, `append()`
157/// reserves entries using a CAS, and exec() does not concurrently mutate entries on the log.
158unsafe impl<'a, T> Sync for Log<'a, T> where T: Sized + Clone {}
159
160impl<'a, T> Log<'a, T>
161where
162    T: Sized + Clone,
163{
164    /// Constructs and returns a log of size `bytes` bytes.
165    /// A size between 1-2 MiB usually works well in most cases.
166    ///
167    /// # Example
168    ///
169    /// ```
170    /// use node_replication::Log;
171    ///
172    /// // Operation type that will go onto the log.
173    /// #[derive(Clone)]
174    /// enum Operation {
175    ///     Read,
176    ///     Write(u64),
177    ///     Invalid,
178    /// }
179    ///
180    /// // Creates a 1 Mega Byte sized log.
181    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
182    /// ```
183    ///
184    /// This method also allocates memory for the log upfront. No further allocations
185    /// will be performed once this method returns.
186    pub fn new<'b>(bytes: usize) -> Log<'b, T> {
187        // Calculate the number of entries that will go into the log, and retrieve a
188        // slice to it from the allocated region of memory.
189        let mut num = bytes / Log::<T>::entry_size();
190
191        // Make sure the log is large enough to allow for periodic garbage collection.
192        if num < 2 * GC_FROM_HEAD {
193            num = 2 * GC_FROM_HEAD;
194        }
195
196        // Round off to the next power of two if required. If we overflow, then set
197        // the number of entries to the minimum required for GC. This is unlikely since
198        // we'd need a log size > 2^63 entries for this to happen.
199        if !num.is_power_of_two() {
200            num = num.checked_next_power_of_two().unwrap_or(2 * GC_FROM_HEAD)
201        };
202
203        // Now that we have the actual number of entries, allocate the log.
204        let b = num * Log::<T>::entry_size();
205        let mem = unsafe {
206            alloc(
207                Layout::from_size_align(b, align_of::<Cell<Entry<T>>>())
208                    .expect("Alignment error while allocating the shared log!"),
209            )
210        };
211        if mem.is_null() {
212            panic!("Failed to allocate memory for the shared log!");
213        }
214        let raw = unsafe { from_raw_parts_mut(mem as *mut Cell<Entry<T>>, num) };
215
216        // Initialize all log entries by calling the default constructor.
217        for e in raw.iter_mut() {
218            unsafe {
219                ::core::ptr::write(
220                    e,
221                    Cell::new(Entry {
222                        operation: None,
223                        replica: 0usize,
224                        alivef: AtomicBool::new(false),
225                    }),
226                );
227            }
228        }
229
230        #[allow(clippy::declare_interior_mutable_const)]
231        const LMASK_DEFAULT: CachePadded<Cell<bool>> = CachePadded::new(Cell::new(true));
232
233        #[cfg(not(loom))]
234        {
235            #[allow(clippy::declare_interior_mutable_const)]
236            const LTAIL_DEFAULT: CachePadded<AtomicUsize> = CachePadded::new(AtomicUsize::new(0));
237
238            Log {
239                rawp: mem,
240                rawb: b,
241                size: num,
242                slog: raw,
243                head: CachePadded::new(AtomicUsize::new(0usize)),
244                tail: CachePadded::new(AtomicUsize::new(0usize)),
245                ctail: CachePadded::new(AtomicUsize::new(0usize)),
246                ltails: [LTAIL_DEFAULT; MAX_REPLICAS_PER_LOG],
247                next: CachePadded::new(AtomicUsize::new(1usize)),
248                lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
249            }
250        }
251        // AtomicUsize::new is not const in loom. This code block (including arr
252        // dependency) becomes redundant once
253        // https://github.com/tokio-rs/loom/issues/170 is fixed:
254        #[cfg(loom)]
255        {
256            use arr_macro::arr;
257            Log {
258                rawp: mem,
259                rawb: b,
260                size: num,
261                slog: raw,
262                head: CachePadded::new(AtomicUsize::new(0usize)),
263                tail: CachePadded::new(AtomicUsize::new(0usize)),
264                ctail: CachePadded::new(AtomicUsize::new(0usize)),
265                ltails: arr![CachePadded::new(AtomicUsize::new(0)); 3], // MAX_REPLICAS_PER_LOG
266                next: CachePadded::new(AtomicUsize::new(1usize)),
267                lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
268            }
269        }
270    }
271
272    /// Returns the size of an entry in bytes.
273    fn entry_size() -> usize {
274        size_of::<Cell<Entry<T>>>()
275    }
276
277    /// Registers a replica with the log. Returns an identifier that the replica
278    /// can use to execute operations on the log.
279    ///
280    /// # Example
281    ///
282    /// ```ignore
283    /// use node_replication::Log;
284    ///
285    /// // Operation type that will go onto the log.
286    /// #[derive(Clone)]
287    /// enum Operation {
288    ///    Read,
289    ///    Write(u64),
290    ///    Invalid,
291    /// }
292    ///
293    /// // Creates a 1 Mega Byte sized log.
294    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
295    ///
296    /// // Registers against the log. `idx` can now be used to append operations
297    /// // to the log, and execute these operations.
298    /// let idx = l.register().expect("Failed to register with the Log.");
299    /// ```
300    pub(crate) fn register(&self) -> Option<usize> {
301        // Loop until we either run out of identifiers or we manage to increment `next`.
302        loop {
303            let n = self.next.load(Ordering::Relaxed);
304
305            // Check if we've exceeded the maximum number of replicas the log can support.
306            if n >= MAX_REPLICAS_PER_LOG {
307                return None;
308            };
309
310            if self
311                .next
312                .compare_exchange_weak(n, n + 1, Ordering::SeqCst, Ordering::SeqCst)
313                != Ok(n)
314            {
315                continue;
316            };
317
318            return Some(n);
319        }
320    }
321
322    /// Adds a batch of operations to the shared log.
323    ///
324    /// # Example
325    ///
326    /// ```ignore
327    /// use node_replication::Log;
328    ///
329    /// // Operation type that will go onto the log.
330    /// #[derive(Clone)]
331    /// enum Operation {
332    ///     Read,
333    ///     Write(u64),
334    /// }
335    ///
336    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
337    /// let idx = l.register().expect("Failed to register with the Log.");
338    ///
339    /// // The set of operations we would like to append. The order will
340    /// // be preserved by the interface.
341    /// let ops = [Operation::Write(100), Operation::Read];
342    ///
343    /// // `append()` might have to garbage collect the log. When doing so,
344    /// // it might encounter operations added in by another replica/thread.
345    /// // This closure allows us to consume those operations. `id` identifies
346    /// // the replica that added in those operations.
347    /// let f = |op: Operation, id: usize| {
348    ///     match(op) {
349    ///         Operation::Read => println!("Read by {}", id),
350    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
351    ///     }
352    /// };
353    ///
354    /// // Append the operations. These operations will be marked with `idx`,
355    /// // and will be linearized at the tail of the log.
356    /// l.append(&ops, idx, f);
357    /// ```
358    ///
359    /// If there isn't enough space to perform the append, this method busy
360    /// waits until the head is advanced. Accepts a replica `idx`; all appended
361    /// operations/entries will be marked with this replica-identifier. Also
362    /// accepts a closure `s`; when waiting for GC, this closure is passed into
363    /// exec() to ensure that this replica does'nt cause a deadlock.
364    ///
365    /// # Note
366    /// Documentation for this function is hidden since `append` is currently not
367    /// intended as a public interface. It is marked as public due to being
368    /// used by the benchmarking code.
369    #[inline(always)]
370    #[doc(hidden)]
371    pub fn append<F: FnMut(T, usize)>(&self, ops: &[T], idx: usize, mut s: F) {
372        let nops = ops.len();
373        let mut iteration = 1;
374        let mut waitgc = 1;
375
376        // Keep trying to reserve entries and add operations to the log until
377        // we succeed in doing so.
378        loop {
379            if iteration % WARN_THRESHOLD == 0 {
380                warn!(
381                    "append(ops.len()={}, {}) takes too many iterations ({}) to complete...",
382                    ops.len(),
383                    idx,
384                    iteration,
385                );
386            }
387            iteration += 1;
388
389            let tail = self.tail.load(Ordering::Relaxed);
390            let head = self.head.load(Ordering::Relaxed);
391
392            // If there are fewer than `GC_FROM_HEAD` entries on the log, then just
393            // try again. The replica that reserved entry (h + self.size - GC_FROM_HEAD)
394            // is currently trying to advance the head of the log. Keep refreshing the
395            // replica against the log to make sure that it isn't deadlocking GC.
396            if tail > head + self.size - GC_FROM_HEAD {
397                if waitgc % WARN_THRESHOLD == 0 {
398                    warn!(
399                        "append(ops.len()={}, {}) takes too many iterations ({}) waiting for gc...",
400                        ops.len(),
401                        idx,
402                        waitgc,
403                    );
404                }
405                waitgc += 1;
406                self.exec(idx, &mut s);
407
408                #[cfg(loom)]
409                loom::thread::yield_now();
410                continue;
411            }
412
413            // If on adding in the above entries there would be fewer than `GC_FROM_HEAD`
414            // entries left on the log, then we need to advance the head of the log.
415            let mut advance = false;
416            if tail + nops > head + self.size - GC_FROM_HEAD {
417                advance = true
418            };
419
420            // Try reserving slots for the operations. If that fails, then restart
421            // from the beginning of this loop.
422            if self.tail.compare_exchange_weak(
423                tail,
424                tail + nops,
425                Ordering::Acquire,
426                Ordering::Acquire,
427            ) != Ok(tail)
428            {
429                continue;
430            };
431
432            // Successfully reserved entries on the shared log. Add the operations in.
433            for (i, op) in ops.iter().enumerate().take(nops) {
434                let e = self.slog[self.index(tail + i)].as_ptr();
435                let mut m = self.lmasks[idx - 1].get();
436
437                // This entry was just reserved so it should be dead (!= m). However, if
438                // the log has wrapped around, then the alive mask has flipped. In this
439                // case, we flip the mask we were originally going to write into the
440                // allocated entry. We cannot flip lmasks[idx - 1] because this replica
441                // might still need to execute a few entries before the wrap around.
442                if unsafe { (*e).alivef.load(Ordering::Relaxed) == m } {
443                    m = !m;
444                }
445
446                unsafe { (*e).operation = Some(op.clone()) };
447                unsafe { (*e).replica = idx };
448                unsafe { (*e).alivef.store(m, Ordering::Release) };
449            }
450
451            // If needed, advance the head of the log forward to make room on the log.
452            if advance {
453                self.advance_head(idx, &mut s);
454            }
455
456            return;
457        }
458    }
459
460    /// Executes a passed in closure (`d`) on all operations starting from
461    /// a replica's local tail on the shared log. The replica is identified through an
462    /// `idx` passed in as an argument.
463    ///
464    /// # Example
465    ///
466    /// ```ignore
467    /// use node_replication::Log;
468    ///
469    /// // Operation type that will go onto the log.
470    /// #[derive(Clone)]
471    /// enum Operation {
472    ///     Read,
473    ///     Write(u64),
474    /// }
475    ///
476    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
477    /// let idx = l.register().expect("Failed to register with the Log.");
478    /// let ops = [Operation::Write(100), Operation::Read];
479    ///
480    /// let f = |op: Operation, id: usize| {
481    ///     match(op) {
482    ///         Operation::Read => println!("Read by {}", id),
483    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
484    ///     }
485    /// };
486    /// l.append(&ops, idx, f);
487    ///
488    /// // This closure is executed on every operation appended to the
489    /// // since the last call to `exec()` by this replica/thread.
490    /// let mut d = 0;
491    /// let mut g = |op: Operation, id: usize| {
492    ///     match(op) {
493    ///         // The write happened before the read.
494    ///         Operation::Read => assert_eq!(100, d),
495    ///         Operation::Write(x) => d += 100,
496    ///     }
497    /// };
498    /// l.exec(idx, &mut g);
499    /// ```
500    ///
501    /// The passed in closure is expected to take in two arguments: The operation
502    /// from the shared log to be executed and the replica that issued it.
503    #[inline(always)]
504    pub(crate) fn exec<F: FnMut(T, usize)>(&self, idx: usize, d: &mut F) {
505        // Load the logical log offset from which we must execute operations.
506        let ltail = self.ltails[idx - 1].load(Ordering::Relaxed);
507
508        // Check if we have any work to do by comparing our local tail with the log's
509        // global tail. If they're equal, then we're done here and can simply return.
510        let gtail = self.tail.load(Ordering::Relaxed);
511        if ltail == gtail {
512            return;
513        }
514
515        let h = self.head.load(Ordering::Relaxed);
516
517        // Make sure we're within the shared log. If we aren't, then panic.
518        if ltail > gtail || ltail < h {
519            panic!("Local tail not within the shared log!")
520        };
521
522        // Execute all operations from the passed in offset to the shared log's tail. Check if
523        // the entry is live first; we could have a replica that has reserved entries, but not
524        // filled them into the log yet.
525        for i in ltail..gtail {
526            let mut iteration = 1;
527            let e = self.slog[self.index(i)].as_ptr();
528
529            while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[idx - 1].get() } {
530                if iteration % WARN_THRESHOLD == 0 {
531                    warn!(
532                        "alivef not being set for self.index(i={}) = {} (self.lmasks[{}] is {})...",
533                        i,
534                        self.index(i),
535                        idx - 1,
536                        self.lmasks[idx - 1].get()
537                    );
538                }
539                iteration += 1;
540
541                #[cfg(loom)]
542                loom::thread::yield_now();
543            }
544
545            unsafe { d((*e).operation.as_ref().unwrap().clone(), (*e).replica) };
546
547            // Looks like we're going to wrap around now; flip this replica's local mask.
548            if self.index(i) == self.size - 1 {
549                self.lmasks[idx - 1].set(!self.lmasks[idx - 1].get());
550                //trace!("idx: {} lmask: {}", idx, self.lmasks[idx - 1].get());
551            }
552        }
553
554        // Update the completed tail after we've executed these operations.
555        // Also update this replica's local tail.
556        self.ctail.fetch_max(gtail, Ordering::Relaxed);
557        self.ltails[idx - 1].store(gtail, Ordering::Relaxed);
558    }
559
560    /// Returns a physical index given a logical index into the shared log.
561    #[inline(always)]
562    fn index(&self, logical: usize) -> usize {
563        logical & (self.size - 1)
564    }
565
566    /// Advances the head of the log forward. If a replica has stopped making progress,
567    /// then this method will never return. Accepts a closure that is passed into exec()
568    /// to ensure that this replica does not deadlock GC.
569    #[inline(always)]
570    fn advance_head<F: FnMut(T, usize)>(&self, rid: usize, mut s: &mut F) {
571        // Keep looping until we can advance the head and create some free space
572        // on the log. If one of the replicas has stopped making progress, then
573        // this method might never return.
574        let mut iteration = 1;
575        loop {
576            let r = self.next.load(Ordering::Relaxed);
577            let global_head = self.head.load(Ordering::Relaxed);
578            let f = self.tail.load(Ordering::Relaxed);
579
580            let mut min_local_tail = self.ltails[0].load(Ordering::Relaxed);
581
582            // Find the smallest local tail across all replicas.
583            for idx in 1..r {
584                let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed);
585                if min_local_tail > cur_local_tail {
586                    min_local_tail = cur_local_tail
587                };
588            }
589
590            // If we cannot advance the head further, then start
591            // from the beginning of this loop again. Before doing so, try consuming
592            // any new entries on the log to prevent deadlock.
593            if min_local_tail == global_head {
594                if iteration % WARN_THRESHOLD == 0 {
595                    warn!("Spending a long time in `advance_head`, are we starving?");
596                }
597                iteration += 1;
598                self.exec(rid, &mut s);
599
600                #[cfg(loom)]
601                loom::thread::yield_now();
602                continue;
603            }
604
605            // There are entries that can be freed up; update the head offset.
606            self.head.store(min_local_tail, Ordering::Relaxed);
607
608            // Make sure that we freed up enough space so that threads waiting for
609            // GC in append can make progress. Otherwise, try to make progress again.
610            // If we're making progress again, then try consuming entries on the log.
611            if f < min_local_tail + self.size - GC_FROM_HEAD {
612                return;
613            } else {
614                self.exec(rid, &mut s);
615            }
616        }
617    }
618
619    /// Resets the log. Required for microbenchmarking the log; with this method, we
620    /// can re-use the log across experimental runs without having to re-allocate the
621    /// log over and over again.
622    ///
623    /// # Safety
624    ///
625    /// *To be used for testing/benchmarking only, hence marked unsafe*. Before calling
626    /// this method, please make sure that there aren't any replicas/threads actively
627    /// issuing/executing operations to/from this log.
628    #[doc(hidden)]
629    #[inline(always)]
630    pub unsafe fn reset(&self) {
631        // First, reset global metadata.
632        self.head.store(0, Ordering::SeqCst);
633        self.tail.store(0, Ordering::SeqCst);
634        self.next.store(1, Ordering::SeqCst);
635
636        // Next, reset replica-local metadata.
637        for r in 0..MAX_REPLICAS_PER_LOG {
638            self.ltails[r].store(0, Ordering::Relaxed);
639            self.lmasks[r].set(true);
640        }
641
642        // Next, free up all log entries. Use pointers to avoid memcpy and speed up
643        // the reset of the log here.
644        for i in 0..self.size {
645            let e = self.slog[self.index(i)].as_ptr();
646            (*e).alivef.store(false, Ordering::Release);
647        }
648    }
649
650    /// This method checks if the replica is in sync to execute a read-only operation
651    /// right away. It does so by comparing the replica's local tail with the log's
652    /// completed tail.
653    ///
654    /// # Example
655    ///
656    /// ```ignore
657    /// use node_replication::Log;
658    ///
659    /// // Operation type that will go onto the log.
660    /// #[derive(Clone)]
661    /// enum Operation {
662    ///     Read,
663    ///     Write(u64),
664    /// }
665    ///
666    /// // We register two replicas here, `idx1` and `idx2`.
667    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
668    /// let idx1 = l.register().expect("Failed to register with the Log.");
669    /// let idx2 = l.register().expect("Failed to register with the Log.");
670    /// let ops = [Operation::Write(100), Operation::Read];
671    ///
672    /// let f = |op: Operation, id: usize| {
673    ///     match(op) {
674    ///         Operation::Read => println!("Read by {}", id),
675    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
676    ///     }
677    /// };
678    /// l.append(&ops, idx2, f);
679    ///
680    /// let mut d = 0;
681    /// let mut g = |op: Operation, id: usize| {
682    ///     match(op) {
683    ///         // The write happened before the read.
684    ///         Operation::Read => assert_eq!(100, d),
685    ///         Operation::Write(x) => d += 100,
686    ///     }
687    /// };
688    /// l.exec(idx2, &mut g);
689    ///
690    /// // This assertion fails because `idx1` has not executed operations
691    /// // that were appended by `idx2`.
692    /// assert_eq!(false, l.is_replica_synced_for_reads(idx1, l.get_ctail()));
693    ///
694    /// let mut e = 0;
695    /// let mut g = |op: Operation, id: usize| {
696    ///     match(op) {
697    ///         // The write happened before the read.
698    ///         Operation::Read => assert_eq!(100, e),
699    ///         Operation::Write(x) => e += 100,
700    ///     }
701    /// };
702    /// l.exec(idx1, &mut g);
703    ///
704    /// // `idx1` is all synced up, so this assertion passes.
705    /// assert_eq!(true, l.is_replica_synced_for_reads(idx1, l.get_ctail()));
706    /// ```
707    #[inline(always)]
708    pub(crate) fn is_replica_synced_for_reads(&self, idx: usize, ctail: usize) -> bool {
709        self.ltails[idx - 1].load(Ordering::Relaxed) >= ctail
710    }
711
712    /// This method returns the current ctail value for the log.
713    #[inline(always)]
714    pub(crate) fn get_ctail(&self) -> usize {
715        self.ctail.load(Ordering::Relaxed)
716    }
717}
718
719impl<'a, T> Default for Log<'a, T>
720where
721    T: Sized + Clone,
722{
723    /// Default constructor for the shared log.
724    fn default() -> Self {
725        Log::new(DEFAULT_LOG_BYTES)
726    }
727}
728
729impl<'a, T> Drop for Log<'a, T>
730where
731    T: Sized + Clone,
732{
733    /// Destructor for the shared log.
734    fn drop(&mut self) {
735        unsafe {
736            dealloc(
737                self.rawp,
738                Layout::from_size_align(self.rawb, align_of::<Cell<Entry<T>>>())
739                    .expect("Alignment error while deallocating the shared log!"),
740            )
741        };
742    }
743}
744
745#[cfg(test)]
746mod tests {
747    // Import std so that we have an allocator for our unit tests.
748    extern crate std;
749
750    use super::*;
751    use std::sync::Arc;
752
753    // Define operations along with their arguments that go onto the log.
754    #[derive(Clone)] // Traits required by the log interface.
755    #[derive(Debug, PartialEq)] // Traits required for testing.
756    enum Operation {
757        Read,
758        Write(u64),
759        Invalid,
760    }
761
762    // Required so that we can unit test Entry.
763    impl Default for Operation {
764        fn default() -> Operation {
765            Operation::Invalid
766        }
767    }
768
769    // Test that we can default construct entries correctly.
770    #[test]
771    fn test_entry_create_default() {
772        let e = Entry::<Operation>::default();
773        assert_eq!(e.operation, None);
774        assert_eq!(e.replica, 0);
775        assert_eq!(e.alivef.load(Ordering::Relaxed), false);
776    }
777
778    // Test that our entry_size() method returns the correct size.
779    #[test]
780    fn test_log_entry_size() {
781        assert_eq!(Log::<Operation>::entry_size(), 64);
782    }
783
784    // Tests if a small log can be correctly constructed.
785    #[test]
786    fn test_log_create() {
787        let l = Log::<Operation>::new(1024 * 1024);
788        let n = (1024 * 1024) / Log::<Operation>::entry_size();
789        assert_eq!(l.rawb, 1024 * 1024);
790        assert_eq!(l.size, n);
791        assert_eq!(l.slog.len(), n);
792        assert_eq!(l.head.load(Ordering::Relaxed), 0);
793        assert_eq!(l.tail.load(Ordering::Relaxed), 0);
794        assert_eq!(l.next.load(Ordering::Relaxed), 1);
795        assert_eq!(l.ctail.load(Ordering::Relaxed), 0);
796
797        for i in 0..MAX_REPLICAS_PER_LOG {
798            assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
799        }
800
801        for i in 0..MAX_REPLICAS_PER_LOG {
802            assert_eq!(l.lmasks[i].get(), true);
803        }
804    }
805
806    // Tests if the constructor allocates enough space for GC.
807    #[test]
808    fn test_log_min_size() {
809        let l = Log::<Operation>::new(1024);
810        assert_eq!(l.rawb, 2 * GC_FROM_HEAD * Log::<Operation>::entry_size());
811        assert_eq!(l.size, 2 * GC_FROM_HEAD);
812        assert_eq!(l.slog.len(), 2 * GC_FROM_HEAD);
813    }
814
815    // Tests that the constructor allocates a log whose number of entries
816    // are a power of two.
817    #[test]
818    fn test_log_power_of_two() {
819        let l = Log::<Operation>::new(524 * 1024);
820        let n = ((524 * 1024) / Log::<Operation>::entry_size()).checked_next_power_of_two();
821        assert_eq!(l.rawb, n.unwrap() * Log::<Operation>::entry_size());
822        assert_eq!(l.size, n.unwrap());
823        assert_eq!(l.slog.len(), n.unwrap());
824    }
825
826    // Tests if the log can be successfully default constructed.
827    #[test]
828    fn test_log_create_default() {
829        let l = Log::<Operation>::default();
830        let n = DEFAULT_LOG_BYTES / Log::<Operation>::entry_size();
831        assert_eq!(l.rawb, DEFAULT_LOG_BYTES);
832        assert_eq!(l.size, n);
833        assert_eq!(l.slog.len(), n);
834        assert_eq!(l.head.load(Ordering::Relaxed), 0);
835        assert_eq!(l.tail.load(Ordering::Relaxed), 0);
836        assert_eq!(l.next.load(Ordering::Relaxed), 1);
837        assert_eq!(l.ctail.load(Ordering::Relaxed), 0);
838
839        for i in 0..MAX_REPLICAS_PER_LOG {
840            assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
841        }
842
843        for i in 0..MAX_REPLICAS_PER_LOG {
844            assert_eq!(l.lmasks[i].get(), true);
845        }
846    }
847
848    // Tests if we can correctly index into the shared log.
849    #[test]
850    fn test_log_index() {
851        let l = Log::<Operation>::new(2 * 1024 * 1024);
852        assert_eq!(l.index(99000), 696);
853    }
854
855    // Tests if we can correctly register with the shared log.
856    #[test]
857    fn test_log_register() {
858        let l = Log::<Operation>::new(1024);
859        assert_eq!(l.register(), Some(1));
860        assert_eq!(l.next.load(Ordering::Relaxed), 2);
861    }
862
863    // Tests that we cannot register more than the max replicas with the log.
864    #[test]
865    fn test_log_register_none() {
866        let l = Log::<Operation>::new(1024);
867        l.next.store(MAX_REPLICAS_PER_LOG, Ordering::Relaxed);
868        assert!(l.register().is_none());
869        assert_eq!(l.next.load(Ordering::Relaxed), MAX_REPLICAS_PER_LOG);
870    }
871
872    // Test that we can correctly append an entry into the log.
873    #[test]
874    fn test_log_append() {
875        let l = Log::<Operation>::default();
876        let o = [Operation::Read];
877        l.append(&o, 1, |_o: Operation, _i: usize| {});
878
879        assert_eq!(l.head.load(Ordering::Relaxed), 0);
880        assert_eq!(l.tail.load(Ordering::Relaxed), 1);
881        let slog = l.slog[0].take();
882        assert_eq!(slog.operation, Some(Operation::Read));
883        assert_eq!(slog.replica, 1);
884    }
885
886    // Test that multiple entries can be appended to the log.
887    #[test]
888    fn test_log_append_multiple() {
889        let l = Log::<Operation>::default();
890        let o = [Operation::Read, Operation::Write(119)];
891        l.append(&o, 1, |_o: Operation, _i: usize| {});
892
893        assert_eq!(l.head.load(Ordering::Relaxed), 0);
894        assert_eq!(l.tail.load(Ordering::Relaxed), 2);
895    }
896
897    // Tests that we can advance the head of the log to the smallest of all replica-local tails.
898    #[test]
899    fn test_log_advance_head() {
900        let l = Log::<Operation>::default();
901
902        l.next.store(5, Ordering::Relaxed);
903        l.ltails[0].store(1023, Ordering::Relaxed);
904        l.ltails[1].store(224, Ordering::Relaxed);
905        l.ltails[2].store(4096, Ordering::Relaxed);
906        l.ltails[3].store(799, Ordering::Relaxed);
907
908        l.advance_head(0, &mut |_o: Operation, _i: usize| {});
909        assert_eq!(l.head.load(Ordering::Relaxed), 224);
910    }
911
912    // Tests that the head of the log is advanced when we're close to filling up the entire log.
913    #[test]
914    fn test_log_append_gc() {
915        let l = Log::<Operation>::default();
916        let o: [Operation; 4] = unsafe {
917            let mut a: [Operation; 4] = ::std::mem::MaybeUninit::zeroed().assume_init();
918            for i in &mut a[..] {
919                ::std::ptr::write(i, Operation::Read);
920            }
921            a
922        };
923
924        l.next.store(2, Ordering::Relaxed);
925        l.tail.store(l.size - GC_FROM_HEAD - 1, Ordering::Relaxed);
926        l.ltails[0].store(1024, Ordering::Relaxed);
927        l.append(&o, 1, |_o: Operation, _i: usize| {});
928
929        assert_eq!(l.head.load(Ordering::Relaxed), 1024);
930        assert_eq!(l.tail.load(Ordering::Relaxed), l.size - GC_FROM_HEAD + 3);
931    }
932
933    // Tests that on log wrap around, the local mask stays
934    // the same because entries have not been executed yet.
935    #[test]
936    fn test_log_append_wrap() {
937        let l = Log::<Operation>::default();
938        let o: [Operation; 1024] = unsafe {
939            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
940            for i in &mut a[..] {
941                ::std::ptr::write(i, Operation::Read);
942            }
943            a
944        };
945
946        l.next.store(2, Ordering::Relaxed);
947        l.head.store(2 * 8192, Ordering::Relaxed);
948        l.tail.store(l.size - 10, Ordering::Relaxed);
949        l.append(&o, 1, |_o: Operation, _i: usize| {});
950
951        assert_eq!(l.lmasks[0].get(), true);
952        assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
953    }
954
955    // Test that we can execute operations appended to the log.
956    #[test]
957    fn test_log_exec() {
958        let l = Log::<Operation>::default();
959        let o = [Operation::Read];
960        let mut f = |op: Operation, i: usize| {
961            assert_eq!(op, Operation::Read);
962            assert_eq!(i, 1);
963        };
964
965        l.append(&o, 1, |_o: Operation, _i: usize| {});
966        l.exec(1, &mut f);
967
968        assert_eq!(
969            l.tail.load(Ordering::Relaxed),
970            l.ctail.load(Ordering::Relaxed)
971        );
972        assert_eq!(
973            l.tail.load(Ordering::Relaxed),
974            l.ltails[0].load(Ordering::Relaxed)
975        );
976    }
977
978    // Test that exec() doesn't do anything when the log is empty.
979    #[test]
980    fn test_log_exec_empty() {
981        let l = Log::<Operation>::default();
982        let mut f = |_o: Operation, _i: usize| {
983            assert!(false);
984        };
985
986        l.exec(1, &mut f);
987    }
988
989    // Test that exec() doesn't do anything if we're already up-to-date.
990    #[test]
991    fn test_log_exec_zero() {
992        let l = Log::<Operation>::default();
993        let o = [Operation::Read];
994        let mut f = |op: Operation, i: usize| {
995            assert_eq!(op, Operation::Read);
996            assert_eq!(i, 1);
997        };
998        let mut g = |_op: Operation, _i: usize| {
999            assert!(false);
1000        };
1001
1002        l.append(&o, 1, |_o: Operation, _i: usize| {});
1003        l.exec(1, &mut f);
1004        l.exec(1, &mut g);
1005    }
1006
1007    // Test that multiple entries on the log can be executed correctly.
1008    #[test]
1009    fn test_log_exec_multiple() {
1010        let l = Log::<Operation>::default();
1011        let o = [Operation::Read, Operation::Write(119)];
1012        let mut s = 0;
1013        let mut f = |op: Operation, _i: usize| match op {
1014            Operation::Read => s += 121,
1015            Operation::Write(v) => s += v,
1016            Operation::Invalid => assert!(false),
1017        };
1018
1019        l.append(&o, 1, |_o: Operation, _i: usize| {});
1020        l.exec(1, &mut f);
1021        assert_eq!(s, 240);
1022
1023        assert_eq!(
1024            l.tail.load(Ordering::Relaxed),
1025            l.ctail.load(Ordering::Relaxed)
1026        );
1027        assert_eq!(
1028            l.tail.load(Ordering::Relaxed),
1029            l.ltails[0].load(Ordering::Relaxed)
1030        );
1031    }
1032
1033    // Test that the replica local mask is updated correctly when executing over
1034    // a wrapped around log.
1035    #[test]
1036    fn test_log_exec_wrap() {
1037        let l = Log::<Operation>::default();
1038        let o: [Operation; 1024] = unsafe {
1039            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
1040            for i in &mut a[..] {
1041                ::std::ptr::write(i, Operation::Read);
1042            }
1043            a
1044        };
1045        let mut f = |op: Operation, i: usize| {
1046            assert_eq!(op, Operation::Read);
1047            assert_eq!(i, 1);
1048        };
1049
1050        l.append(&o, 1, |_o: Operation, _i: usize| {}); // Required for GC to work correctly.
1051        l.next.store(2, Ordering::SeqCst);
1052        l.head.store(2 * 8192, Ordering::SeqCst);
1053        l.tail.store(l.size - 10, Ordering::SeqCst);
1054        l.append(&o, 1, |_o: Operation, _i: usize| {});
1055
1056        l.ltails[0].store(l.size - 10, Ordering::SeqCst);
1057        l.exec(1, &mut f);
1058
1059        assert_eq!(l.lmasks[0].get(), false);
1060        assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
1061    }
1062
1063    // Tests that exec() panics if the head of the log advances beyond the tail.
1064    #[test]
1065    #[should_panic]
1066    fn test_exec_panic() {
1067        let l = Log::<Operation>::default();
1068        let o: [Operation; 1024] = unsafe {
1069            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
1070            for i in &mut a[..] {
1071                ::std::ptr::write(i, Operation::Read);
1072            }
1073            a
1074        };
1075        let mut f = |_op: Operation, _i: usize| {
1076            assert!(false);
1077        };
1078
1079        l.append(&o, 1, |_o: Operation, _i: usize| {});
1080        l.head.store(8192, Ordering::SeqCst);
1081
1082        l.exec(1, &mut f);
1083    }
1084
1085    // Tests that operations are cloned when added to the log, and that
1086    // they are correctly dropped once overwritten.
1087    #[test]
1088    fn test_log_change_refcount() {
1089        let l = Log::<Arc<Operation>>::default();
1090        let o1 = [Arc::new(Operation::Read)];
1091        let o2 = [Arc::new(Operation::Read)];
1092        assert_eq!(Arc::strong_count(&o1[0]), 1);
1093        assert_eq!(Arc::strong_count(&o2[0]), 1);
1094
1095        l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
1096        assert_eq!(Arc::strong_count(&o1[0]), 2);
1097        l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
1098        assert_eq!(Arc::strong_count(&o1[0]), 3);
1099
1100        unsafe { l.reset() };
1101
1102        // Over here, we overwrite entries that were written to by the two
1103        // previous appends. This decreases the refcount of o1 and increases
1104        // the refcount of o2.
1105        l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
1106        assert_eq!(Arc::strong_count(&o1[0]), 2);
1107        assert_eq!(Arc::strong_count(&o2[0]), 2);
1108        l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
1109        assert_eq!(Arc::strong_count(&o1[0]), 1);
1110        assert_eq!(Arc::strong_count(&o2[0]), 3);
1111    }
1112
1113    // Tests that operations are cloned when added to the log, and that
1114    // they are correctly dropped once overwritten after the GC.
1115    #[test]
1116    fn test_log_refcount_change_with_gc() {
1117        let entry_size = 64;
1118        let total_entries = 16384;
1119
1120        assert_eq!(Log::<Arc<Operation>>::entry_size(), entry_size);
1121        let size: usize = total_entries * entry_size;
1122        let l = Log::<Arc<Operation>>::new(size);
1123        let o1 = [Arc::new(Operation::Read)];
1124        let o2 = [Arc::new(Operation::Read)];
1125        assert_eq!(Arc::strong_count(&o1[0]), 1);
1126        assert_eq!(Arc::strong_count(&o2[0]), 1);
1127
1128        for i in 1..(total_entries + 1) {
1129            l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
1130            assert_eq!(Arc::strong_count(&o1[0]), i + 1);
1131        }
1132        assert_eq!(Arc::strong_count(&o1[0]), total_entries + 1);
1133
1134        for i in 1..(total_entries + 1) {
1135            l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
1136            assert_eq!(Arc::strong_count(&o1[0]), (total_entries + 1) - i);
1137            assert_eq!(Arc::strong_count(&o2[0]), i + 1);
1138        }
1139        assert_eq!(Arc::strong_count(&o1[0]), 1);
1140        assert_eq!(Arc::strong_count(&o2[0]), total_entries + 1);
1141    }
1142
1143    // Tests that is_replica_synced_for_read() works correctly; it returns
1144    // false when a replica is not synced up and true when it is.
1145    #[test]
1146    fn test_replica_synced_for_read() {
1147        let l = Log::<Operation>::default();
1148        let one = l.register().unwrap();
1149        let two = l.register().unwrap();
1150
1151        assert_eq!(one, 1);
1152        assert_eq!(two, 2);
1153
1154        let o = [Operation::Read];
1155        let mut f = |op: Operation, i: usize| {
1156            assert_eq!(op, Operation::Read);
1157            assert_eq!(i, 1);
1158        };
1159
1160        l.append(&o, one, |_o: Operation, _i: usize| {});
1161        l.exec(one, &mut f);
1162        assert_eq!(l.is_replica_synced_for_reads(one, l.get_ctail()), true);
1163        assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), false);
1164
1165        l.exec(two, &mut f);
1166        assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), true);
1167    }
1168}