use crate::sync::{AtomicUsize, Ordering};
use crate::LogError;
use std::cell::UnsafeCell;
use std::sync::Arc;
use cache_padded::CachePadded;
/// Storage for a single log entry: the value plus its publication flag.
#[derive(Debug)]
struct LogSlot<T> {
    /// `LogSlot::VACANT` until the owning `push` has stored `value`,
    /// `LogSlot::PUBLISHED` afterwards. Never reset.
    state: AtomicUsize,
    /// The entry itself; written once, by the thread that claimed this slot.
    value: UnsafeCell<Option<T>>,
}

impl<T> LogSlot<T> {
    /// The slot holds no readable value (yet).
    const VACANT: usize = 0;
    /// The value has been fully written and may be shared with readers.
    const PUBLISHED: usize = 1;

    /// Creates an empty, unpublished slot.
    fn vacant() -> Self {
        Self {
            state: AtomicUsize::new(Self::VACANT),
            value: UnsafeCell::new(None),
        }
    }
}

/// A fixed-capacity, append-only log that can be written and read through `&self`.
///
/// Entries are never moved, overwritten or removed once pushed, so references
/// returned by [`Log::get`] stay valid for as long as the log is borrowed.
#[derive(Debug)]
pub struct Log<T> {
    /// Number of slots handed out to `push` callers (may briefly exceed `capacity`).
    len: CachePadded<AtomicUsize>,
    /// Maximum number of entries; always at least 1.
    capacity: usize,
    /// Exactly `capacity` pre-allocated slots.
    data: Vec<LogSlot<T>>,
}

impl<T> Log<T> {
    /// Creates an empty log able to hold `capacity` entries.
    ///
    /// A `capacity` of 0 is raised to 1.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        let data = (0..capacity).map(|_| LogSlot::vacant()).collect();
        Self {
            capacity,
            len: CachePadded::new(AtomicUsize::new(0)),
            data,
        }
    }

    /// Returns the number of slots claimed so far, never more than [`Log::capacity`].
    ///
    /// A slot counted here may still be in the middle of being written by
    /// another thread; [`Log::get`] returns `None` for it until it is published.
    #[inline]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed).min(self.capacity())
    }

    /// Returns the maximum number of entries the log can hold.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns `true` when no slot has been claimed yet.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the entry stored at `index`.
    ///
    /// Yields `None` when `index` is out of bounds, has not been claimed by a
    /// `push`, or was claimed but its value is not published yet.
    pub fn get(&self, index: usize) -> Option<&T> {
        let slot = self.data.get(index)?;
        // Pairs with the store in `push`: observing PUBLISHED guarantees that
        // the value write is visible to this thread.
        if slot.state.load(Ordering::SeqCst) != LogSlot::<T>::PUBLISHED {
            return None;
        }
        // SAFETY: PUBLISHED is stored only after the value has been written,
        // and a published slot is never written again, so no mutable access
        // can overlap with the shared reference handed out here.
        unsafe { (*slot.value.get()).as_ref() }
    }

    /// Appends `value` and returns the index it was stored at.
    ///
    /// # Errors
    ///
    /// Returns `LogError::LogCapacityExceeded` carrying `value` back to the
    /// caller when every slot has already been claimed.
    pub fn push(&self, value: T) -> Result<usize, LogError<T>> {
        // Reject early once full. Skipping the increment keeps the counter
        // bounded instead of growing with every failed push, so it cannot
        // wrap around and hand out an index that was already written.
        if self.len.load(Ordering::Relaxed) >= self.capacity {
            return Err(LogError::LogCapacityExceeded(value));
        }
        // Relaxed is enough here: the counter only hands out unique indices;
        // visibility of the value is established through the slot state below.
        let token = self.len.fetch_add(1, Ordering::Relaxed);
        let slot = match self.data.get(token) {
            Some(slot) => slot,
            // Lost the race for the last slot against a concurrent push.
            None => return Err(LogError::LogCapacityExceeded(value)),
        };
        // SAFETY: `fetch_add` returns each index to exactly one caller, so this
        // thread is the only writer of the slot, and readers do not touch the
        // cell until the state below says PUBLISHED.
        unsafe {
            *slot.value.get() = Some(value);
        }
        // SeqCst (rather than Release/Acquire) so that two threads which each
        // publish an entry and then read the other's slot cannot both miss
        // each other's entry.
        slot.state.store(LogSlot::<T>::PUBLISHED, Ordering::SeqCst);
        Ok(token)
    }
}
// SAFETY: `push` hands every slot index to exactly one caller through an
// atomic `fetch_add`, so no two threads ever write the same cell. `T: Send`
// is required because values pushed on one thread end up owned by the log.
// NOTE(review): soundness also requires that `get` never reads a slot while
// `push` is still writing it -- confirm that invariant holds in `push`/`get`.
unsafe impl<T: Sync + Send> Send for Log<T> {}
// SAFETY: shared access only goes through `push` (unique slot per call, see
// above) and `get`, which hands out `&T` to any thread -- hence `T: Sync`.
// NOTE(review): same reader/writer invariant as for `Send` -- confirm.
unsafe impl<T: Sync + Send> Sync for Log<T> {}
impl<T> Log<T> {
    /// Wraps this shared log in a [`Sender`] handle.
    pub fn into_sender(self: Arc<Self>) -> Sender<T> {
        let log = self;
        Sender { log }
    }

    /// Wraps this shared log in a [`Receiver`] handle.
    pub fn into_receiver(self: Arc<Self>) -> Receiver<T> {
        let log = self;
        Receiver { log }
    }

    /// Returns an iterator over the log's entries, starting at index 0.
    pub fn iter(&self) -> LogReaderIterator<T> {
        let log = self;
        LogReaderIterator { log, idx: 0 }
    }
}
/// Creates a new log and returns a sender/receiver pair backed by it.
///
/// `capacity` is forwarded to `Log::new`; both handles refer to the same log.
pub fn open<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Log::new(capacity));
    let sender = Sender {
        log: Arc::clone(&shared),
    };
    let receiver = Receiver { log: shared };
    (sender, receiver)
}
/// Handle used to append entries to a shared [`Log`].
#[derive(Debug)]
pub struct Sender<T> {
    /// The log every clone of this handle appends to.
    log: Arc<Log<T>>,
}

// Implemented by hand: `#[derive(Clone)]` would add a `T: Clone` bound even
// though cloning the handle only bumps the `Arc` reference count.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self {
            log: Arc::clone(&self.log),
        }
    }
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<usize, LogError<T>> {
self.log.push(value)
}
pub fn into_inner(self) -> Arc<Log<T>> {
self.log
}
}
/// Handle used to read entries from a shared [`Log`].
#[derive(Debug)]
pub struct Receiver<T> {
    /// The log every clone of this handle reads from.
    log: Arc<Log<T>>,
}

// Implemented by hand: `#[derive(Clone)]` would add a `T: Clone` bound even
// though cloning the handle only bumps the `Arc` reference count.
impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        Self {
            log: Arc::clone(&self.log),
        }
    }
}
impl<T> Receiver<T> {
pub fn recv(&self, index: usize) -> Option<&T> {
self.log.get(index)
}
pub fn into_inner(self) -> Arc<Log<T>> {
self.log
}
}
/// Iterator over the entries of a borrowed [`Log`], in index order.
#[derive(Debug)]
pub struct LogReaderIterator<'a, T> {
    /// Index of the next entry to request from the log.
    idx: usize,
    /// The log being read.
    log: &'a Log<T>,
}
impl<'a, T> Iterator for LogReaderIterator<'a, T> {
    type Item = &'a T;

    /// Returns the next entry, or `None` when the log has nothing at the
    /// current position.
    ///
    /// The cursor only moves forward when an entry is returned, so calling
    /// `next` again after more entries were pushed resumes at the first
    /// entry not yet yielded instead of skipping it.
    fn next(&mut self) -> Option<Self::Item> {
        let entry = self.log.get(self.idx)?;
        self.idx += 1;
        Some(entry)
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use log::debug;
    use crate::sync::thread;
    use super::*;

    /// Installs the test logger; errors from repeated initialisation are ignored.
    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Runs every test body of this module under `loom::model`.
    #[test]
    #[cfg(loom)]
    fn test_loom() {
        loom::model(test_log_capacity);
        loom::model(test_log_capacity_excess);
        loom::model(test_log_capacity_excess_len);
        loom::model(test_log_immutable_entries);
        loom::model(test_basic_log);
        loom::model(test_log_iter);
        loom::model(test_send_recv);
        loom::model(test_eventual_consistency);
    }

    /// A requested capacity of 0 is raised to 1.
    #[test]
    fn test_log_capacity() {
        init();
        let log: Log<u32> = Log::new(0);
        assert_eq!(log.capacity(), 1);
    }

    /// Pushing into a full log fails.
    #[test]
    fn test_log_capacity_excess() {
        init();
        let log = Log::new(1);
        log.push(0).unwrap();
        assert!(log.push(1).is_err());
    }

    /// Failed pushes do not inflate the reported length.
    #[test]
    fn test_log_capacity_excess_len() {
        init();
        let log = Log::new(1);
        log.push(0).unwrap();
        log.push(1).unwrap_err();
        log.push(2).unwrap_err();
        log.push(3).unwrap_err();
        log.push(4).unwrap_err();
        assert_eq!(log.len(), 1);
    }

    /// An entry keeps its value while further entries are appended.
    #[test]
    fn test_log_immutable_entries() {
        init();
        let log = Log::new(200);
        log.push(0).unwrap();
        log.push(42).unwrap();
        assert_eq!(log.get(1).copied(), Some(42));
        for i in 0..100 {
            log.push(i).unwrap();
        }
        assert_eq!(log.get(1).copied(), Some(42));
    }

    /// Entries are readable by index in push order; out-of-range reads yield `None`.
    #[test]
    fn test_basic_log() {
        init();
        let log = Log::new(3);
        log.push(1).unwrap();
        log.push(2).unwrap();
        log.push(3).unwrap();
        assert_eq!(log.get(0), Some(&1));
        assert_eq!(log.get(1), Some(&2));
        assert_eq!(log.get(2), Some(&3));
        assert_eq!(log.get(3), None);
    }

    /// The iterator yields entries in push order and then `None`.
    #[test]
    fn test_log_iter() {
        init();
        let log = Log::new(3);
        log.push(1).unwrap();
        log.push(2).unwrap();
        log.push(3).unwrap();
        let mut iter = log.iter();
        assert_eq!(iter.next(), Some(&1));
        assert_eq!(iter.next(), Some(&2));
        assert_eq!(iter.next(), Some(&3));
        assert_eq!(iter.next(), None);
    }

    /// Values sent through the sender are visible through the receiver.
    #[test]
    fn test_send_recv() {
        init();
        let (tx, rx) = open(4);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        tx.send(3).unwrap();
        assert_eq!(rx.recv(0), Some(&1));
        assert_eq!(rx.recv(1), Some(&2));
        assert_eq!(rx.recv(2), Some(&3));
        assert_eq!(rx.recv(3), None);
        tx.into_inner().push(4).unwrap();
        assert_eq!(rx.recv(3), Some(&4));
    }

    /// Two threads each push one entry and then read both slots.
    ///
    /// The observations of both threads (`a`/`b` for thread 1, `c`/`d` for
    /// thread 2) are checked against each other and against the final state.
    #[test]
    fn test_eventual_consistency() {
        init();
        let vec = Arc::new(Log::new(2));
        let v1 = vec.clone();
        let v2 = vec.clone();
        let h1 = thread::spawn(move || {
            v1.push('a').unwrap();
            let x0 = v1.get(0);
            let x1 = v1.get(1);
            (x0.cloned(), x1.cloned())
        });
        let h2 = thread::spawn(move || {
            v2.push('b').unwrap();
            let x0 = v2.get(0);
            let x1 = v2.get(1);
            (x0.cloned(), x1.cloned())
        });
        let (x0h1, x1h1) = h1.join().unwrap();
        let (x0h2, x1h2) = h2.join().unwrap();
        // Final state, read after both writers have finished.
        let (x0, x1) = (vec.get(0).cloned(), vec.get(1).cloned());
        debug!(
            "0: h1(a) {:<10} h2(c) {:<10} f {:<10}",
            format!("{:?}", x0h1),
            format!("{:?}", x0h2),
            format!("{:?}", x0)
        );
        debug!(
            "1: h1(b) {:<10} h2(d) {:<10} f {:<10}",
            format!("{:?}", x1h1),
            format!("{:?}", x1h2),
            format!("{:?}", x1)
        );
        debug!("");
        match (x0h1, x1h1, x0h2, x1h2) {
            (None, None, _, _) | (_, _, None, None) => {
                panic!("1|2: (Read your own write)");
            }
            (None, Some(_), None, Some(_)) => {
                panic!("1: (Read your own write)");
            }
            (Some(_), None, Some(_), None) => {
                panic!("2: (Read your own write)");
            }
            (None, Some(_), Some(_), None) => {
                panic!("(Observed state are global)");
            }
            (Some(a), None, None, Some(d)) => {
                assert_eq!(Some(a), x0, "a == x0 (Observed state are immutable)");
                assert_eq!(Some(d), x1, "d == x1 (Observed state are immutable)");
            }
            (None, Some(b), Some(c), Some(d)) => {
                assert_eq!(b, d, "b == d (Observed state are in-order)");
                assert_eq!(Some(b), x1, "b == x1 (Observed state are immutable)");
                assert_eq!(Some(c), x0, "c == x0 (Observed state are immutable)");
                assert_eq!(Some(d), x1, "d == x1 (Observed state are immutable)");
            }
            (Some(a), None, Some(c), Some(d)) => {
                assert_eq!(a, c, "a == c (Observed state are in-order)");
                assert_eq!(Some(a), x0, "a == x0 (Observed state are immutable)");
                assert_eq!(Some(c), x0, "c == x0 (Observed state are immutable)");
                assert_eq!(Some(d), x1, "d == x1 (Observed state are immutable)");
            }
            (Some(a), Some(b), Some(c), None) => {
                assert_eq!(a, c, "a == c (Observed state are in-order)");
                assert_eq!(Some(a), x0, "a == x0 (Observed state are immutable)");
                assert_eq!(Some(b), x1, "b == x1 (Observed state are immutable)");
                assert_eq!(Some(c), x0, "c == x0 (Observed state are immutable)");
            }
            (Some(a), Some(b), None, Some(d)) => {
                assert_eq!(b, d, "b == d (Observed state are in-order)");
                assert_eq!(Some(a), x0, "a == x0 (Observed state are immutable)");
                assert_eq!(Some(b), x1, "b == x1 (Observed state are immutable)");
                assert_eq!(Some(d), x1, "d == x1 (Observed state are immutable)");
            }
            (Some(a), Some(b), Some(c), Some(d)) => {
                assert_eq!(a, c, "a == c");
                assert_eq!(b, d, "b == d");
                assert_eq!(Some(a), x0, "a == x0 (Observed state are immutable)");
                assert_eq!(Some(b), x1, "b == x1 (Observed state are immutable)");
                assert_eq!(Some(c), x0, "c == x0 (Observed state are immutable)");
                assert_eq!(Some(d), x1, "d == x1 (Observed state are immutable)");
            }
        }
        // Whatever the interleaving, both entries must be present at the end.
        let pair = [x0, x1];
        assert!(
            pair == [Some('a'), Some('b')] || pair == [Some('b'), Some('a')],
            "final state is always complete."
        );
    }
}