node_replication/log.rs
// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

use alloc::alloc::{alloc, dealloc, Layout};

use core::cell::Cell;
use core::default::Default;
use core::fmt;
use core::mem::{align_of, size_of};
use core::ops::{Drop, FnMut};
use core::slice::from_raw_parts_mut;

#[cfg(not(loom))]
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(loom)]
pub use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use crossbeam_utils::CachePadded;

use crate::context::MAX_PENDING_OPS;
use crate::replica::MAX_THREADS_PER_REPLICA;

/// The default size of the shared log in bytes. If constructed using the
/// default constructor, the log will be this many bytes in size. Currently
/// set to 32 MiB based on the ASPLOS 2017 paper.
const DEFAULT_LOG_BYTES: usize = 32 * 1024 * 1024;
const_assert!(DEFAULT_LOG_BYTES >= 1 && (DEFAULT_LOG_BYTES & (DEFAULT_LOG_BYTES - 1) == 0));

/// The maximum number of replicas that can be registered with the log.
#[cfg(not(loom))]
pub const MAX_REPLICAS_PER_LOG: usize = 192;
#[cfg(loom)] // Otherwise uses too much stack space, which crashes in loom...
pub const MAX_REPLICAS_PER_LOG: usize = 3;

/// Constant required for garbage collection. When the tail and the head are
/// this many entries apart on the circular buffer, garbage collection will
/// be performed by one of the replicas registered with the log.
///
/// For the GC algorithm to work, we need to ensure that we can support the
/// largest possible append after deciding to perform GC. This largest possible
/// append is when every thread within a replica has a full batch of writes
/// to be appended to the shared log.
const GC_FROM_HEAD: usize = MAX_PENDING_OPS * MAX_THREADS_PER_REPLICA;
const_assert!(GC_FROM_HEAD >= 1 && (GC_FROM_HEAD & (GC_FROM_HEAD - 1) == 0));
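// Note: the concrete value depends on MAX_PENDING_OPS and
// MAX_THREADS_PER_REPLICA (defined in context.rs and replica.rs); the
// assertion above only guarantees that the product is a power of two.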

/// Threshold of iterations after which we log a warning for busy-spinning loops.
///
/// This helps with debugging to figure out where things may end up blocking.
/// Should be a power of two to avoid divisions.
const WARN_THRESHOLD: usize = 1 << 28;

/// An entry that sits on the log. Each entry consists of three fields: the operation to
/// be performed when a thread reaches this entry on the log, the replica that appended
/// this operation, and a flag indicating whether this entry is valid.
///
/// `T` is the type of the operation - typically an enum containing opcodes as well as
/// arguments. It is required that this type be sized and cloneable.
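///
/// Each entry is cache-line aligned (64 bytes, via the `repr(align(64))`
/// attribute below) so that threads filling in adjacent reserved slots do
/// not false-share a cache line.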
#[derive(Default)]
#[repr(align(64))]
struct Entry<T>
where
    T: Sized + Clone,
{
    /// The operation that this entry represents.
    operation: Option<T>,

    /// Identifies the replica that issued the above operation.
    replica: usize,

    /// Indicates whether this entry represents a valid operation when on the log.
    alivef: AtomicBool,
}

/// A log of operations that is typically accessed by multiple
/// [Replica](struct.Replica.html).
///
/// Operations can be added to the log by calling the `append()` method and
/// providing a list of operations to be performed.
///
/// Operations already on the log can be executed by calling the `exec()` method
/// and providing a replica-id along with a closure. Newly added operations
/// since the replica last called `exec()` will be executed by invoking the
/// supplied closure for each one of them.
///
/// Accepts one generic type parameter; `T` defines the type of operations and
/// their arguments that will go on the log and would typically be an enum.
///
/// This struct is aligned to 64 bytes to optimize cache access.
///
/// # Note
/// As a client, typically there is no need to call any methods on the Log aside
/// from `new`. Only in the rare circumstance where someone implements their own
/// Replica would it be necessary to call any of the Log's methods.
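///
/// A minimal end-to-end sketch (mirroring the per-method examples below;
/// `register` and `exec` are crate-internal, so this only works from within
/// the crate):
///
/// ```ignore
/// use node_replication::Log;
///
/// #[derive(Clone)]
/// enum Op {
///     Incr,
/// }
///
/// let log = Log::<Op>::new(1024 * 1024);
/// let idx = log.register().unwrap();
/// log.append(&[Op::Incr], idx, |_op, _rid| {});
///
/// let mut count = 0;
/// log.exec(idx, &mut |_op, _rid| count += 1);
/// assert_eq!(count, 1);
/// ```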
#[repr(align(64))]
pub struct Log<'a, T>
where
    T: Sized + Clone,
{
    /// Raw pointer to the actual underlying log. Required for dealloc.
    rawp: *mut u8,

    /// Size of the underlying log in bytes. Required for dealloc.
    rawb: usize,

    /// The maximum number of entries that can be held inside the log.
    size: usize,

    /// A reference to the actual log. Nothing but a slice of entries.
    slog: &'a [Cell<Entry<T>>],

    /// Logical index into the above slice at which the log starts.
    head: CachePadded<AtomicUsize>,

    /// Logical index into the above slice at which the log ends.
    /// New appends go here.
    tail: CachePadded<AtomicUsize>,

    /// Completed tail maintains an index <= tail that points to a
    /// log entry after which there are no completed operations across
    /// all replicas registered against this log.
    ctail: CachePadded<AtomicUsize>,

    /// Array consisting of the local tail of each replica registered with the log.
    /// Required for garbage collection; since replicas make progress over the log
    /// independently, we want to make sure that we don't garbage collect operations
    /// that haven't been executed by all replicas.
    ltails: [CachePadded<AtomicUsize>; MAX_REPLICAS_PER_LOG],

    /// Identifier that will be allocated to the next replica that registers with
    /// this Log. Also required to correctly index into ltails above.
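    ///
    /// Replica identifiers returned by `register()` start at 1 (`next` is
    /// initialized to 1), which is why the methods below index with `idx - 1`.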
    next: CachePadded<AtomicUsize>,

    /// Array consisting of local alive masks for each registered replica. Required
    /// because replicas make independent progress over the log, so we need to
    /// track log wrap-arounds for each of them separately.
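    ///
    /// On a replica's first pass over the log a live entry has `alivef == true`;
    /// once that replica wraps around, the expected value flips to `false`, and
    /// so on for every subsequent wrap-around.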
    lmasks: [CachePadded<Cell<bool>>; MAX_REPLICAS_PER_LOG],
}

impl<'a, T> fmt::Debug for Log<'a, T>
where
    T: Sized + Clone,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Log")
            .field("head", &self.head)
            .field("tail", &self.tail)
            .field("size", &self.size)
            .finish()
    }
}

/// The Log is Send. The *mut u8 (`rawp`) is never dereferenced.
unsafe impl<'a, T> Send for Log<'a, T> where T: Sized + Clone {}

/// The Log is Sync. We know this because: `head` and `tail` are atomic variables, `append()`
/// reserves entries using a CAS, and `exec()` does not concurrently mutate entries on the log.
unsafe impl<'a, T> Sync for Log<'a, T> where T: Sized + Clone {}

impl<'a, T> Log<'a, T>
where
    T: Sized + Clone,
{
    /// Constructs and returns a log of size `bytes` bytes.
    /// A size between 1 and 2 MiB usually works well in most cases.
    ///
    /// # Example
    ///
    /// ```
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    ///     Invalid,
    /// }
    ///
    /// // Creates a 1 MiB log.
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// ```
    ///
    /// This method also allocates memory for the log upfront. No further allocations
    /// will be performed once this method returns.
    pub fn new<'b>(bytes: usize) -> Log<'b, T> {
        // Calculate the number of entries that will go into the log, and retrieve a
        // slice to it from the allocated region of memory.
        let mut num = bytes / Log::<T>::entry_size();

        // Make sure the log is large enough to allow for periodic garbage collection.
        if num < 2 * GC_FROM_HEAD {
            num = 2 * GC_FROM_HEAD;
        }

        // Round up to the next power of two if required. If we overflow, then set
        // the number of entries to the minimum required for GC. This is unlikely since
        // we'd need a log size > 2^63 entries for this to happen.
        if !num.is_power_of_two() {
            num = num.checked_next_power_of_two().unwrap_or(2 * GC_FROM_HEAD)
        };

        // Now that we have the actual number of entries, allocate the log.
        let b = num * Log::<T>::entry_size();
        let mem = unsafe {
            alloc(
                Layout::from_size_align(b, align_of::<Cell<Entry<T>>>())
                    .expect("Alignment error while allocating the shared log!"),
            )
        };
        if mem.is_null() {
            panic!("Failed to allocate memory for the shared log!");
        }
        let raw = unsafe { from_raw_parts_mut(mem as *mut Cell<Entry<T>>, num) };

        // Initialize all log entries by calling the default constructor.
        for e in raw.iter_mut() {
            unsafe {
                ::core::ptr::write(
                    e,
                    Cell::new(Entry {
                        operation: None,
                        replica: 0usize,
                        alivef: AtomicBool::new(false),
                    }),
                );
            }
        }

        #[allow(clippy::declare_interior_mutable_const)]
        const LMASK_DEFAULT: CachePadded<Cell<bool>> = CachePadded::new(Cell::new(true));

        #[cfg(not(loom))]
        {
            #[allow(clippy::declare_interior_mutable_const)]
            const LTAIL_DEFAULT: CachePadded<AtomicUsize> = CachePadded::new(AtomicUsize::new(0));

            Log {
                rawp: mem,
                rawb: b,
                size: num,
                slog: raw,
                head: CachePadded::new(AtomicUsize::new(0usize)),
                tail: CachePadded::new(AtomicUsize::new(0usize)),
                ctail: CachePadded::new(AtomicUsize::new(0usize)),
                ltails: [LTAIL_DEFAULT; MAX_REPLICAS_PER_LOG],
                next: CachePadded::new(AtomicUsize::new(1usize)),
                lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
            }
        }
        // AtomicUsize::new is not const in loom. This code block (including the arr
        // dependency) becomes redundant once
        // https://github.com/tokio-rs/loom/issues/170 is fixed:
        #[cfg(loom)]
        {
            use arr_macro::arr;
            Log {
                rawp: mem,
                rawb: b,
                size: num,
                slog: raw,
                head: CachePadded::new(AtomicUsize::new(0usize)),
                tail: CachePadded::new(AtomicUsize::new(0usize)),
                ctail: CachePadded::new(AtomicUsize::new(0usize)),
                ltails: arr![CachePadded::new(AtomicUsize::new(0)); 3], // MAX_REPLICAS_PER_LOG
                next: CachePadded::new(AtomicUsize::new(1usize)),
                lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
            }
        }
    }

    /// Returns the size of an entry in bytes.
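    ///
    /// Because `Entry` is 64-byte aligned, this returns 64 as long as the
    /// operation type fits alongside the `replica` and `alivef` fields (the
    /// unit tests below assert exactly this for a small enum).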
    fn entry_size() -> usize {
        size_of::<Cell<Entry<T>>>()
    }

    /// Registers a replica with the log. Returns an identifier that the replica
    /// can use to execute operations on the log.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    ///     Invalid,
    /// }
    ///
    /// // Creates a 1 MiB log.
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    ///
    /// // Registers against the log. `idx` can now be used to append operations
    /// // to the log, and execute these operations.
    /// let idx = l.register().expect("Failed to register with the Log.");
    /// ```
    pub(crate) fn register(&self) -> Option<usize> {
        // Loop until we either run out of identifiers or we manage to increment `next`.
        loop {
            let n = self.next.load(Ordering::Relaxed);

            // Check if we've exceeded the maximum number of replicas the log can support.
            if n >= MAX_REPLICAS_PER_LOG {
                return None;
            };

            if self
                .next
                .compare_exchange_weak(n, n + 1, Ordering::SeqCst, Ordering::SeqCst)
                != Ok(n)
            {
                continue;
            };

            return Some(n);
        }
    }

    /// Adds a batch of operations to the shared log.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    /// }
    ///
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// let idx = l.register().expect("Failed to register with the Log.");
    ///
    /// // The set of operations we would like to append. The order will
    /// // be preserved by the interface.
    /// let ops = [Operation::Write(100), Operation::Read];
    ///
    /// // `append()` might have to garbage collect the log. When doing so,
    /// // it might encounter operations added in by another replica/thread.
    /// // This closure allows us to consume those operations. `id` identifies
    /// // the replica that added in those operations.
    /// let f = |op: Operation, id: usize| {
    ///     match op {
    ///         Operation::Read => println!("Read by {}", id),
    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
    ///     }
    /// };
    ///
    /// // Append the operations. These operations will be marked with `idx`,
    /// // and will be linearized at the tail of the log.
    /// l.append(&ops, idx, f);
    /// ```
    ///
    /// If there isn't enough space to perform the append, this method busy
    /// waits until the head is advanced. Accepts a replica `idx`; all appended
    /// operations/entries will be marked with this replica-identifier. Also
    /// accepts a closure `s`; when waiting for GC, this closure is passed into
    /// exec() to ensure that this replica doesn't cause a deadlock.
    ///
    /// # Note
    /// Documentation for this function is hidden since `append` is currently not
    /// intended as a public interface. It is marked as public due to being
    /// used by the benchmarking code.
    #[inline(always)]
    #[doc(hidden)]
    pub fn append<F: FnMut(T, usize)>(&self, ops: &[T], idx: usize, mut s: F) {
        let nops = ops.len();
        let mut iteration = 1;
        let mut waitgc = 1;

        // Keep trying to reserve entries and add operations to the log until
        // we succeed in doing so.
        loop {
            if iteration % WARN_THRESHOLD == 0 {
                warn!(
                    "append(ops.len()={}, {}) takes too many iterations ({}) to complete...",
                    ops.len(),
                    idx,
                    iteration,
                );
            }
            iteration += 1;

            let tail = self.tail.load(Ordering::Relaxed);
            let head = self.head.load(Ordering::Relaxed);

            // If there are fewer than `GC_FROM_HEAD` free entries left on the log,
            // then just try again. The replica that reserved entry (h + self.size - GC_FROM_HEAD)
            // is currently trying to advance the head of the log. Keep refreshing the
            // replica against the log to make sure that it isn't deadlocking GC.
            if tail > head + self.size - GC_FROM_HEAD {
                if waitgc % WARN_THRESHOLD == 0 {
                    warn!(
                        "append(ops.len()={}, {}) takes too many iterations ({}) waiting for gc...",
                        ops.len(),
                        idx,
                        waitgc,
                    );
                }
                waitgc += 1;
                self.exec(idx, &mut s);

                #[cfg(loom)]
                loom::thread::yield_now();
                continue;
            }

            // If adding in the above entries would leave fewer than `GC_FROM_HEAD`
            // free entries on the log, then we need to advance the head of the log.
            let mut advance = false;
            if tail + nops > head + self.size - GC_FROM_HEAD {
                advance = true
            };

            // Try reserving slots for the operations. If that fails, then restart
            // from the beginning of this loop.
            if self.tail.compare_exchange_weak(
                tail,
                tail + nops,
                Ordering::Acquire,
                Ordering::Acquire,
            ) != Ok(tail)
            {
                continue;
            };

            // Successfully reserved entries on the shared log. Add the operations in.
            for (i, op) in ops.iter().enumerate().take(nops) {
                let e = self.slog[self.index(tail + i)].as_ptr();
                let mut m = self.lmasks[idx - 1].get();

                // This entry was just reserved so it should be dead (!= m). However, if
                // the log has wrapped around, then the alive mask has flipped. In this
                // case, we flip the mask we were originally going to write into the
                // allocated entry. We cannot flip lmasks[idx - 1] because this replica
                // might still need to execute a few entries before the wrap around.
                if unsafe { (*e).alivef.load(Ordering::Relaxed) == m } {
                    m = !m;
                }

                unsafe { (*e).operation = Some(op.clone()) };
                unsafe { (*e).replica = idx };
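                // The `Release` store below publishes `operation` and `replica`
                // to any thread that `Acquire`-loads `alivef` in `exec()`.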
                unsafe { (*e).alivef.store(m, Ordering::Release) };
            }

            // If needed, advance the head of the log forward to make room on the log.
            if advance {
                self.advance_head(idx, &mut s);
            }

            return;
        }
    }

    /// Executes a passed-in closure (`d`) on all operations starting from a
    /// replica's local tail on the shared log. The replica is identified through an
    /// `idx` passed in as an argument.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    /// }
    ///
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// let idx = l.register().expect("Failed to register with the Log.");
    /// let ops = [Operation::Write(100), Operation::Read];
    ///
    /// let f = |op: Operation, id: usize| {
    ///     match op {
    ///         Operation::Read => println!("Read by {}", id),
    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
    ///     }
    /// };
    /// l.append(&ops, idx, f);
    ///
    /// // This closure is executed on every operation appended to the log
    /// // since the last call to `exec()` by this replica/thread.
    /// let mut d = 0;
    /// let mut g = |op: Operation, id: usize| {
    ///     match op {
    ///         // The write happened before the read.
    ///         Operation::Read => assert_eq!(100, d),
    ///         Operation::Write(x) => d += 100,
    ///     }
    /// };
    /// l.exec(idx, &mut g);
    /// ```
    ///
    /// The passed-in closure is expected to take in two arguments: the operation
    /// from the shared log to be executed and the replica that issued it.
    #[inline(always)]
    pub(crate) fn exec<F: FnMut(T, usize)>(&self, idx: usize, d: &mut F) {
        // Load the logical log offset from which we must execute operations.
        let ltail = self.ltails[idx - 1].load(Ordering::Relaxed);

        // Check if we have any work to do by comparing our local tail with the log's
        // global tail. If they're equal, then we're done here and can simply return.
        let gtail = self.tail.load(Ordering::Relaxed);
        if ltail == gtail {
            return;
        }

        let h = self.head.load(Ordering::Relaxed);

        // Make sure we're within the shared log. If we aren't, then panic.
        if ltail > gtail || ltail < h {
            panic!("Local tail not within the shared log!")
        };

        // Execute all operations from the passed in offset to the shared log's tail. Check if
        // the entry is live first; we could have a replica that has reserved entries, but not
        // filled them into the log yet.
        for i in ltail..gtail {
            let mut iteration = 1;
            let e = self.slog[self.index(i)].as_ptr();

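            // Spin until the entry is alive under this replica's current mask;
            // the `Acquire` load below pairs with the `Release` store in
            // `append()`, making `operation` and `replica` visible to us.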
            while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[idx - 1].get() } {
                if iteration % WARN_THRESHOLD == 0 {
                    warn!(
                        "alivef not being set for self.index(i={}) = {} (self.lmasks[{}] is {})...",
                        i,
                        self.index(i),
                        idx - 1,
                        self.lmasks[idx - 1].get()
                    );
                }
                iteration += 1;

                #[cfg(loom)]
                loom::thread::yield_now();
            }

            unsafe { d((*e).operation.as_ref().unwrap().clone(), (*e).replica) };

            // Looks like we're going to wrap around now; flip this replica's local mask.
            if self.index(i) == self.size - 1 {
                self.lmasks[idx - 1].set(!self.lmasks[idx - 1].get());
                //trace!("idx: {} lmask: {}", idx, self.lmasks[idx - 1].get());
            }
        }

        // Update the completed tail after we've executed these operations.
        // Also update this replica's local tail.
        self.ctail.fetch_max(gtail, Ordering::Relaxed);
        self.ltails[idx - 1].store(gtail, Ordering::Relaxed);
    }

    /// Returns a physical index given a logical index into the shared log.
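    ///
    /// Since the number of entries is always a power of two, the modulo
    /// reduces to a single bitwise AND. A minimal sketch (sizes assumed for
    /// illustration only):
    ///
    /// ```ignore
    /// // With size = 8, logical index 9 wraps around to physical slot 1.
    /// assert_eq!(9 & (8 - 1), 1);
    /// ```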
    #[inline(always)]
    fn index(&self, logical: usize) -> usize {
        logical & (self.size - 1)
    }

    /// Advances the head of the log forward. If a replica has stopped making progress,
    /// then this method will never return. Accepts a closure that is passed into exec()
    /// to ensure that this replica does not deadlock GC.
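    ///
    /// A sketch of the post-condition (assuming all registered replicas keep
    /// making progress): on return, `head` equals the smallest local tail
    /// observed across all registered replicas during the final scan.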
    #[inline(always)]
    fn advance_head<F: FnMut(T, usize)>(&self, rid: usize, mut s: &mut F) {
        // Keep looping until we can advance the head and create some free space
        // on the log. If one of the replicas has stopped making progress, then
        // this method might never return.
        let mut iteration = 1;
        loop {
            let r = self.next.load(Ordering::Relaxed);
            let global_head = self.head.load(Ordering::Relaxed);
            let f = self.tail.load(Ordering::Relaxed);

            let mut min_local_tail = self.ltails[0].load(Ordering::Relaxed);

            // Find the smallest local tail across all replicas.
            for idx in 1..r {
                let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed);
                if min_local_tail > cur_local_tail {
                    min_local_tail = cur_local_tail
                };
            }

            // If we cannot advance the head further, then start
            // from the beginning of this loop again. Before doing so, try consuming
            // any new entries on the log to prevent deadlock.
            if min_local_tail == global_head {
                if iteration % WARN_THRESHOLD == 0 {
                    warn!("Spending a long time in `advance_head`, are we starving?");
                }
                iteration += 1;
                self.exec(rid, &mut s);

                #[cfg(loom)]
                loom::thread::yield_now();
                continue;
            }

            // There are entries that can be freed up; update the head offset.
            self.head.store(min_local_tail, Ordering::Relaxed);

            // Make sure that we freed up enough space so that threads waiting for
            // GC in append can make progress. Otherwise, consume any new entries
            // on the log and try advancing the head again.
            if f < min_local_tail + self.size - GC_FROM_HEAD {
                return;
            } else {
                self.exec(rid, &mut s);
            }
        }
    }

    /// Resets the log. Required for microbenchmarking the log; with this method, we
    /// can re-use the log across experimental runs without having to re-allocate the
    /// log over and over again.
    ///
    /// # Safety
    ///
    /// *To be used for testing/benchmarking only, hence marked unsafe*. Before calling
    /// this method, please make sure that there aren't any replicas/threads actively
    /// issuing/executing operations to/from this log.
    #[doc(hidden)]
    #[inline(always)]
    pub unsafe fn reset(&self) {
        // First, reset global metadata.
        self.head.store(0, Ordering::SeqCst);
        self.tail.store(0, Ordering::SeqCst);
        self.next.store(1, Ordering::SeqCst);

        // Next, reset replica-local metadata.
        for r in 0..MAX_REPLICAS_PER_LOG {
            self.ltails[r].store(0, Ordering::Relaxed);
            self.lmasks[r].set(true);
        }

        // Next, free up all log entries. Use pointers to avoid memcpy and speed up
        // the reset of the log here.
        for i in 0..self.size {
            let e = self.slog[self.index(i)].as_ptr();
            (*e).alivef.store(false, Ordering::Release);
        }
    }

    /// This method checks if the replica is in sync to execute a read-only operation
    /// right away. It does so by comparing the replica's local tail with the log's
    /// completed tail.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use node_replication::Log;
    ///
    /// // Operation type that will go onto the log.
    /// #[derive(Clone)]
    /// enum Operation {
    ///     Read,
    ///     Write(u64),
    /// }
    ///
    /// // We register two replicas here, `idx1` and `idx2`.
    /// let l = Log::<Operation>::new(1 * 1024 * 1024);
    /// let idx1 = l.register().expect("Failed to register with the Log.");
    /// let idx2 = l.register().expect("Failed to register with the Log.");
    /// let ops = [Operation::Write(100), Operation::Read];
    ///
    /// let f = |op: Operation, id: usize| {
    ///     match op {
    ///         Operation::Read => println!("Read by {}", id),
    ///         Operation::Write(x) => println!("Write({}) by {}", x, id),
    ///     }
    /// };
    /// l.append(&ops, idx2, f);
    ///
    /// let mut d = 0;
    /// let mut g = |op: Operation, id: usize| {
    ///     match op {
    ///         // The write happened before the read.
    ///         Operation::Read => assert_eq!(100, d),
    ///         Operation::Write(x) => d += 100,
    ///     }
    /// };
    /// l.exec(idx2, &mut g);
    ///
    /// // This check returns false because `idx1` has not executed operations
    /// // that were appended by `idx2`.
    /// assert_eq!(false, l.is_replica_synced_for_reads(idx1, l.get_ctail()));
    ///
    /// let mut e = 0;
    /// let mut g = |op: Operation, id: usize| {
    ///     match op {
    ///         // The write happened before the read.
    ///         Operation::Read => assert_eq!(100, e),
    ///         Operation::Write(x) => e += 100,
    ///     }
    /// };
    /// l.exec(idx1, &mut g);
    ///
    /// // `idx1` is all synced up now, so the check returns true.
    /// assert_eq!(true, l.is_replica_synced_for_reads(idx1, l.get_ctail()));
    /// ```
    #[inline(always)]
    pub(crate) fn is_replica_synced_for_reads(&self, idx: usize, ctail: usize) -> bool {
        self.ltails[idx - 1].load(Ordering::Relaxed) >= ctail
    }

    /// This method returns the current ctail value for the log.
    #[inline(always)]
    pub(crate) fn get_ctail(&self) -> usize {
        self.ctail.load(Ordering::Relaxed)
    }
}

impl<'a, T> Default for Log<'a, T>
where
    T: Sized + Clone,
{
    /// Default constructor for the shared log.
    fn default() -> Self {
        Log::new(DEFAULT_LOG_BYTES)
    }
}

impl<'a, T> Drop for Log<'a, T>
where
    T: Sized + Clone,
{
    /// Destructor for the shared log.
    fn drop(&mut self) {
        unsafe {
            dealloc(
                self.rawp,
                Layout::from_size_align(self.rawb, align_of::<Cell<Entry<T>>>())
                    .expect("Alignment error while deallocating the shared log!"),
            )
        };
    }
}

#[cfg(test)]
mod tests {
    // Import std so that we have an allocator for our unit tests.
    extern crate std;

    use super::*;
    use std::sync::Arc;

    // Define operations along with their arguments that go onto the log.
    #[derive(Clone)] // Traits required by the log interface.
    #[derive(Debug, PartialEq)] // Traits required for testing.
    enum Operation {
        Read,
        Write(u64),
        Invalid,
    }

    // Required so that we can unit test Entry.
    impl Default for Operation {
        fn default() -> Operation {
            Operation::Invalid
        }
    }

    // Test that we can default construct entries correctly.
    #[test]
    fn test_entry_create_default() {
        let e = Entry::<Operation>::default();
        assert_eq!(e.operation, None);
        assert_eq!(e.replica, 0);
        assert_eq!(e.alivef.load(Ordering::Relaxed), false);
    }

    // Test that our entry_size() method returns the correct size.
    #[test]
    fn test_log_entry_size() {
        assert_eq!(Log::<Operation>::entry_size(), 64);
    }

    // Tests if a small log can be correctly constructed.
    #[test]
    fn test_log_create() {
        let l = Log::<Operation>::new(1024 * 1024);
        let n = (1024 * 1024) / Log::<Operation>::entry_size();
        assert_eq!(l.rawb, 1024 * 1024);
        assert_eq!(l.size, n);
        assert_eq!(l.slog.len(), n);
        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 0);
        assert_eq!(l.next.load(Ordering::Relaxed), 1);
        assert_eq!(l.ctail.load(Ordering::Relaxed), 0);

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
        }

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.lmasks[i].get(), true);
        }
    }

    // Tests if the constructor allocates enough space for GC.
    #[test]
    fn test_log_min_size() {
        let l = Log::<Operation>::new(1024);
        assert_eq!(l.rawb, 2 * GC_FROM_HEAD * Log::<Operation>::entry_size());
        assert_eq!(l.size, 2 * GC_FROM_HEAD);
        assert_eq!(l.slog.len(), 2 * GC_FROM_HEAD);
    }

    // Tests that the constructor allocates a log whose number of entries
    // is a power of two.
    #[test]
    fn test_log_power_of_two() {
        let l = Log::<Operation>::new(524 * 1024);
        let n = ((524 * 1024) / Log::<Operation>::entry_size()).checked_next_power_of_two();
        assert_eq!(l.rawb, n.unwrap() * Log::<Operation>::entry_size());
        assert_eq!(l.size, n.unwrap());
        assert_eq!(l.slog.len(), n.unwrap());
    }

    // Tests if the log can be successfully default constructed.
    #[test]
    fn test_log_create_default() {
        let l = Log::<Operation>::default();
        let n = DEFAULT_LOG_BYTES / Log::<Operation>::entry_size();
        assert_eq!(l.rawb, DEFAULT_LOG_BYTES);
        assert_eq!(l.size, n);
        assert_eq!(l.slog.len(), n);
        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 0);
        assert_eq!(l.next.load(Ordering::Relaxed), 1);
        assert_eq!(l.ctail.load(Ordering::Relaxed), 0);

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
        }

        for i in 0..MAX_REPLICAS_PER_LOG {
            assert_eq!(l.lmasks[i].get(), true);
        }
    }

    // Tests if we can correctly index into the shared log.
    #[test]
    fn test_log_index() {
        let l = Log::<Operation>::new(2 * 1024 * 1024);
        assert_eq!(l.index(99000), 696);
    }

    // Tests if we can correctly register with the shared log.
    #[test]
    fn test_log_register() {
        let l = Log::<Operation>::new(1024);
        assert_eq!(l.register(), Some(1));
        assert_eq!(l.next.load(Ordering::Relaxed), 2);
    }

    // Tests that we cannot register more than the max replicas with the log.
    #[test]
    fn test_log_register_none() {
        let l = Log::<Operation>::new(1024);
        l.next.store(MAX_REPLICAS_PER_LOG, Ordering::Relaxed);
        assert!(l.register().is_none());
        assert_eq!(l.next.load(Ordering::Relaxed), MAX_REPLICAS_PER_LOG);
    }

    // Test that we can correctly append an entry into the log.
    #[test]
    fn test_log_append() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read];
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 1);
        let slog = l.slog[0].take();
        assert_eq!(slog.operation, Some(Operation::Read));
        assert_eq!(slog.replica, 1);
    }

    // Test that multiple entries can be appended to the log.
    #[test]
    fn test_log_append_multiple() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read, Operation::Write(119)];
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.head.load(Ordering::Relaxed), 0);
        assert_eq!(l.tail.load(Ordering::Relaxed), 2);
    }

    // Tests that we can advance the head of the log to the smallest of all replica-local tails.
    #[test]
    fn test_log_advance_head() {
        let l = Log::<Operation>::default();

        l.next.store(5, Ordering::Relaxed);
        l.ltails[0].store(1023, Ordering::Relaxed);
        l.ltails[1].store(224, Ordering::Relaxed);
        l.ltails[2].store(4096, Ordering::Relaxed);
        l.ltails[3].store(799, Ordering::Relaxed);

        l.advance_head(0, &mut |_o: Operation, _i: usize| {});
        assert_eq!(l.head.load(Ordering::Relaxed), 224);
    }

    // Tests that the head of the log is advanced when we're close to filling up the entire log.
    #[test]
    fn test_log_append_gc() {
        let l = Log::<Operation>::default();
        let o: [Operation; 4] = unsafe {
            let mut a: [Operation; 4] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };

        l.next.store(2, Ordering::Relaxed);
        l.tail.store(l.size - GC_FROM_HEAD - 1, Ordering::Relaxed);
        l.ltails[0].store(1024, Ordering::Relaxed);
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.head.load(Ordering::Relaxed), 1024);
        assert_eq!(l.tail.load(Ordering::Relaxed), l.size - GC_FROM_HEAD + 3);
    }

    // Tests that on log wrap around, the local mask stays
    // the same because entries have not been executed yet.
    #[test]
    fn test_log_append_wrap() {
        let l = Log::<Operation>::default();
        let o: [Operation; 1024] = unsafe {
            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };

        l.next.store(2, Ordering::Relaxed);
        l.head.store(2 * 8192, Ordering::Relaxed);
        l.tail.store(l.size - 10, Ordering::Relaxed);
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        assert_eq!(l.lmasks[0].get(), true);
        assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
    }

    // Test that we can execute operations appended to the log.
    #[test]
    fn test_log_exec() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read];
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.exec(1, &mut f);

        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ctail.load(Ordering::Relaxed)
        );
        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ltails[0].load(Ordering::Relaxed)
        );
    }

    // Test that exec() doesn't do anything when the log is empty.
    #[test]
    fn test_log_exec_empty() {
        let l = Log::<Operation>::default();
        let mut f = |_o: Operation, _i: usize| {
            assert!(false);
        };

        l.exec(1, &mut f);
    }

    // Test that exec() doesn't do anything if we're already up-to-date.
    #[test]
    fn test_log_exec_zero() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read];
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };
        let mut g = |_op: Operation, _i: usize| {
            assert!(false);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.exec(1, &mut f);
        l.exec(1, &mut g);
    }

    // Test that multiple entries on the log can be executed correctly.
    #[test]
    fn test_log_exec_multiple() {
        let l = Log::<Operation>::default();
        let o = [Operation::Read, Operation::Write(119)];
        let mut s = 0;
        let mut f = |op: Operation, _i: usize| match op {
            Operation::Read => s += 121,
            Operation::Write(v) => s += v,
            Operation::Invalid => assert!(false),
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.exec(1, &mut f);
        assert_eq!(s, 240);

        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ctail.load(Ordering::Relaxed)
        );
        assert_eq!(
            l.tail.load(Ordering::Relaxed),
            l.ltails[0].load(Ordering::Relaxed)
        );
    }

    // Test that the replica local mask is updated correctly when executing over
    // a wrapped around log.
    #[test]
    fn test_log_exec_wrap() {
        let l = Log::<Operation>::default();
        let o: [Operation; 1024] = unsafe {
            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {}); // Required for GC to work correctly.
        l.next.store(2, Ordering::SeqCst);
        l.head.store(2 * 8192, Ordering::SeqCst);
        l.tail.store(l.size - 10, Ordering::SeqCst);
        l.append(&o, 1, |_o: Operation, _i: usize| {});

        l.ltails[0].store(l.size - 10, Ordering::SeqCst);
        l.exec(1, &mut f);

        assert_eq!(l.lmasks[0].get(), false);
        assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
    }

    // Tests that exec() panics if the head of the log advances beyond the tail.
    #[test]
    #[should_panic]
    fn test_exec_panic() {
        let l = Log::<Operation>::default();
        let o: [Operation; 1024] = unsafe {
            let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
            for i in &mut a[..] {
                ::std::ptr::write(i, Operation::Read);
            }
            a
        };
        let mut f = |_op: Operation, _i: usize| {
            assert!(false);
        };

        l.append(&o, 1, |_o: Operation, _i: usize| {});
        l.head.store(8192, Ordering::SeqCst);

        l.exec(1, &mut f);
    }

    // Tests that operations are cloned when added to the log, and that
    // they are correctly dropped once overwritten.
    #[test]
    fn test_log_change_refcount() {
        let l = Log::<Arc<Operation>>::default();
        let o1 = [Arc::new(Operation::Read)];
        let o2 = [Arc::new(Operation::Read)];
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), 1);

        l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 2);
        l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 3);

        unsafe { l.reset() };

        // Over here, we overwrite entries that were written to by the two
        // previous appends. This decreases the refcount of o1 and increases
        // the refcount of o2.
        l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 2);
        assert_eq!(Arc::strong_count(&o2[0]), 2);
        l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), 3);
    }

    // Tests that operations are cloned when added to the log, and that
    // they are correctly dropped once overwritten after the GC.
    #[test]
    fn test_log_refcount_change_with_gc() {
        let entry_size = 64;
        let total_entries = 16384;

        assert_eq!(Log::<Arc<Operation>>::entry_size(), entry_size);
        let size: usize = total_entries * entry_size;
        let l = Log::<Arc<Operation>>::new(size);
        let o1 = [Arc::new(Operation::Read)];
        let o2 = [Arc::new(Operation::Read)];
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), 1);

        for i in 1..(total_entries + 1) {
            l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
            assert_eq!(Arc::strong_count(&o1[0]), i + 1);
        }
        assert_eq!(Arc::strong_count(&o1[0]), total_entries + 1);

        for i in 1..(total_entries + 1) {
            l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
            assert_eq!(Arc::strong_count(&o1[0]), (total_entries + 1) - i);
            assert_eq!(Arc::strong_count(&o2[0]), i + 1);
        }
        assert_eq!(Arc::strong_count(&o1[0]), 1);
        assert_eq!(Arc::strong_count(&o2[0]), total_entries + 1);
    }

    // Tests that is_replica_synced_for_reads() works correctly; it returns
    // false when a replica is not synced up and true when it is.
    #[test]
    fn test_replica_synced_for_read() {
        let l = Log::<Operation>::default();
        let one = l.register().unwrap();
        let two = l.register().unwrap();

        assert_eq!(one, 1);
        assert_eq!(two, 2);

        let o = [Operation::Read];
        let mut f = |op: Operation, i: usize| {
            assert_eq!(op, Operation::Read);
            assert_eq!(i, 1);
        };

        l.append(&o, one, |_o: Operation, _i: usize| {});
        l.exec(one, &mut f);
        assert_eq!(l.is_replica_synced_for_reads(one, l.get_ctail()), true);
        assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), false);

        l.exec(two, &mut f);
        assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), true);
    }
}
1168}