use super::Absorb;
use crate::read::ReadHandle;
use std::collections::VecDeque;
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic;
use std::sync::{Arc, MutexGuard};
use std::{fmt, thread};
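/// A writer handle to a left-right data structure.
///
/// Writes are expressed as operations of type `O`. Each operation is eventually applied
/// to both copies of the data (`T`), but only becomes visible to readers after a call
/// to [`publish`](Self::publish).
///
/// A minimal usage sketch, assuming the `i32`/`CounterAddOp` pair used in this module's
/// tests:
///
/// ```rust,ignore
/// let (mut w, r) = crate::new::<i32, _>();
/// w.append(CounterAddOp(42));
/// w.publish();
/// assert_eq!(*r.enter().unwrap(), 42);
/// ```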
pub struct WriteHandle<T, O>
where
T: Absorb<O>,
{
epochs: crate::Epochs,
w_handle: NonNull<T>,
oplog: VecDeque<O>,
swap_index: usize,
r_handle: ReadHandle<T>,
last_epochs: Vec<usize>,
#[cfg(test)]
refreshes: usize,
first: bool,
second: bool,
}
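// Safety: the raw pointer is the only thing keeping `WriteHandle` from deriving `Send`.
// The handle owns the copy behind `w_handle`, so moving it to another thread is sound
// as long as the data, the queued operations, and the embedded read handle are all `Send`.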
unsafe impl<T, O> Send for WriteHandle<T, O>
where
T: Absorb<O>,
T: Send,
O: Send,
ReadHandle<T>: Send,
{
}
impl<T, O> fmt::Debug for WriteHandle<T, O>
where
T: Absorb<O> + fmt::Debug,
O: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WriteHandle")
.field("epochs", &self.epochs)
.field("w_handle", &self.w_handle)
.field("oplog", &self.oplog)
.field("swap_index", &self.swap_index)
.field("r_handle", &self.r_handle)
.field("first", &self.first)
.field("second", &self.second)
.finish()
}
}
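// Dropping the write handle tears the structure down: outstanding operations are
// published, readers are cut off via a null pointer and waited out, and both copies are
// released through `Absorb::drop_first`/`drop_second`.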
impl<T, O> Drop for WriteHandle<T, O>
where
T: Absorb<O>,
{
    fn drop(&mut self) {
        use std::ptr;
        // Ensure both copies are up to date. If this handle has never published,
        // operations were applied directly to the write copy only, so the other copy
        // must still be brought in sync before either can be dropped.
        if self.first || !self.oplog.is_empty() {
            self.publish();
        }
        // A single publish may leave operations applied to only one of the two copies;
        // publishing again drains the rest of the oplog.
        if !self.oplog.is_empty() {
            self.publish();
        }
        assert!(self.oplog.is_empty());
        // Cut readers off by swapping in a null pointer; they will see None from now on.
        let r_handle = self
            .r_handle
            .inner
            .swap(ptr::null_mut(), atomic::Ordering::Release);
        // Wait for any readers still in the old read copy to depart.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);
        // Ensure the epoch reads in wait() are not re-ordered to before the swap.
        atomic::fence(atomic::Ordering::SeqCst);
        // We now own both copies outright and no readers remain, so hand each copy to
        // the data structure for teardown; drop_first/drop_second let it handle any
        // state shared between the two copies asymmetrically.
        //
        // Safety: both pointers came from Box::into_raw and are no longer aliased.
        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });
        Absorb::drop_second(unsafe { Box::from_raw(r_handle) });
}
}
impl<T, O> WriteHandle<T, O>
where
T: Absorb<O>,
{
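    /// Construct a write handle that owns the given write copy, sharing the epoch list
    /// with the readers reachable through `r_handle`.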
pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
Self {
epochs,
w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
oplog: VecDeque::new(),
swap_index: 0,
r_handle,
last_epochs: Vec::new(),
#[cfg(test)]
refreshes: 0,
first: true,
second: true,
}
}
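    /// Block until no reader can still be accessing what is now the write copy, i.e.,
    /// until every reader whose epoch was odd (mid-read) at the last swap has bumped
    /// its epoch since.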
fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<atomic::AtomicUsize>>>) {
let mut iter = 0;
let mut starti = 0;
self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // If this reader's epoch was even when we last recorded it (just after
                // the swap), it was not mid-read, so any read it starts later must see
                // the new pointer. This holds even across wrap-around, since usize::MAX
                // is odd and usize::MAX + 1 == 0 is even.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }
                // The reader was mid-read at the swap. If its epoch has changed since,
                // it has finished that read and cannot still be in the old copy.
                let now = epoch.load(atomic::Ordering::Acquire);
                if now == self.last_epochs[ri] {
                    // The reader may still be in the old copy: remember where we got
                    // to, spin a few times, then start yielding between retries.
                    starti = ii;
                    if iter != 20 {
                        iter += 1;
                    } else {
                        thread::yield_now();
                    }
                    continue 'retry;
                }
            }
            break;
        }
}
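    /// Publish all operations appended since the last `publish`, making them visible
    /// to readers.
    ///
    /// This blocks until all readers that may still be in the copy about to be written
    /// have moved on, so it can stall if a reader holds its guard for a long time.
    ///
    /// A sketch of the visibility guarantee, assuming the `CounterAddOp` operation from
    /// this module's tests:
    ///
    /// ```rust,ignore
    /// let (mut w, r) = crate::new::<i32, _>();
    /// w.append(CounterAddOp(1));
    /// w.publish(); // the swap makes the write visible
    /// assert_eq!(*r.enter().unwrap(), 1);
    /// ```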
pub fn publish(&mut self) -> &mut Self {
        // It is safe to hold the epoch lock for the entire swap: readers only take this
        // lock to register new handles, never while mid-read, so we cannot deadlock
        // with the readers we are waiting for.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        // Wait out any readers that may still be in the copy we are about to mutate.
        self.wait(&mut epochs);
        if !self.first {
            // All readers have left the write copy.
            // Safety: the Box is still allocated, and no readers access w_handle now.
            let w_handle = unsafe { self.w_handle.as_mut() };
            // Safety: we do not swap while holding this reference.
            let r_handle = unsafe {
                self.r_handle
                    .inner
                    .load(atomic::Ordering::Acquire)
                    .as_ref()
                    .unwrap()
            };
            // On the second publish ever, give T a chance to bring this copy (which
            // has seen no operations yet) in sync with the one readers currently see.
            if self.second {
                Absorb::sync_with(w_handle, r_handle);
                self.second = false;
            }
            // Operations before swap_index were already applied to the current read
            // copy via absorb_first, so only this copy still needs them; they can be
            // passed by value to absorb_second and consumed.
            if self.swap_index != 0 {
                for op in self.oplog.drain(0..self.swap_index) {
                    T::absorb_second(w_handle, op, r_handle);
                }
            }
            // Newer operations will be needed again by the other copy after the swap,
            // so absorb_first only gets them by reference.
            for op in self.oplog.iter_mut() {
                T::absorb_first(w_handle, op, r_handle);
            }
            // Everything in the oplog has now been applied to the copy that is about
            // to become the read copy.
            self.swap_index = self.oplog.len();
        } else {
            self.first = false;
        }
        // The write copy is fully up to date: swap it in so readers see it, and take
        // over the old read copy (which readers may still be draining out of).
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), atomic::Ordering::Release);
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };
        // Ensure the epoch reads below are not re-ordered to before the swap.
        atomic::fence(atomic::Ordering::SeqCst);
        // Record each reader's epoch as of this swap; wait() compares against these on
        // the next publish.
        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(atomic::Ordering::Acquire);
        }
#[cfg(test)]
{
self.refreshes += 1;
}
self
}
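    /// Publish if, and only if, there are operations in the log that readers have not
    /// yet been given access to.
    ///
    /// Unlike [`publish`](Self::publish), which always waits for readers and swaps the
    /// copies, this is a no-op when nothing is pending.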
pub fn flush(&mut self) {
if self.has_pending_operations() {
self.publish();
}
}
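    /// Returns true if the operational log contains operations that have not yet been
    /// exposed to readers.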
pub fn has_pending_operations(&self) -> bool {
self.swap_index < self.oplog.len()
}
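    /// Append the given operation to the operational log.
    ///
    /// The operation is not visible to readers until the next call to
    /// [`publish`](Self::publish). Returns `&mut self`, so calls can be chained.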
pub fn append(&mut self, op: O) -> &mut Self {
self.extend(std::iter::once(op));
self
}
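    /// Returns a raw pointer to the write copy of the data (the one readers are *not*
    /// currently accessing).
    ///
    /// It is only safe to mutate through this pointer while no readers remain in that
    /// copy; note also that the pointer is invalidated by the swap in
    /// [`publish`](Self::publish).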
pub fn raw_write_handle(&mut self) -> NonNull<T> {
self.w_handle
}
}
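// A `WriteHandle` also allows reading: it dereferences to the `ReadHandle` it was
// constructed with.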
impl<T, O> Deref for WriteHandle<T, O>
where
T: Absorb<O>,
{
type Target = ReadHandle<T>;
fn deref(&self) -> &Self::Target {
&self.r_handle
}
}
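// Bulk-append operations, e.g. `w.extend(vec![CounterAddOp(1), CounterAddOp(2)])`
// (using the op type from this module's tests).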
impl<T, O> Extend<O> for WriteHandle<T, O>
where
T: Absorb<O>,
{
fn extend<I>(&mut self, ops: I)
where
I: IntoIterator<Item = O>,
{
        if self.first {
            // Safety: no publish has happened yet, so no reader has ever seen the
            // write copy, and we can modify it directly.
            let mut w_inner = self.raw_write_handle();
            let w_inner = unsafe { w_inner.as_mut() };
            let r_handle = self.enter().expect("map has not yet been destroyed");
            // Since we operate on the copy directly and nothing is aliased, the
            // operations should be consumed (and dropped): absorb_second is the hook
            // that takes them by value.
            for op in ops {
                Absorb::absorb_second(w_inner, op, &*r_handle);
            }
        } else {
            self.oplog.extend(ops);
        }
}
}
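// Placeholder type intended as a doc-test anchor for checking that `WriteHandle` is
// `Send` (and not `Sync`); the doc comment carrying those compile tests is omitted here.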
#[allow(dead_code)]
struct CheckWriteHandleSend;
#[cfg(test)]
mod tests {
use crate::CounterAddOp;
#[test]
fn flush_noblock() {
let (mut w, r) = crate::new::<i32, _>();
w.append(CounterAddOp(42));
w.publish();
assert_eq!(*r.enter().unwrap(), 42);
        // Pin the epoch by holding a read guard: a publish would now block until the
        // guard is dropped, but the pending-operations checks below must not.
        let _count = r.enter();
assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
assert!(!w.has_pending_operations());
}
#[test]
fn flush_no_refresh() {
let (mut w, _) = crate::new::<i32, _>();
        // Before the first publish, writes are applied directly to the write copy (no
        // reader can be in it yet), so they are never reported as pending.
        assert!(!w.has_pending_operations());
w.publish();
assert!(!w.has_pending_operations());
assert_eq!(w.refreshes, 1);
w.append(CounterAddOp(42));
assert!(w.has_pending_operations());
w.publish();
assert!(!w.has_pending_operations());
assert_eq!(w.refreshes, 2);
w.append(CounterAddOp(42));
assert!(w.has_pending_operations());
w.publish();
assert!(!w.has_pending_operations());
assert_eq!(w.refreshes, 3);
        // Publishing with an empty oplog still swaps the copies and counts as a refresh.
        assert!(!w.has_pending_operations());
w.publish();
assert_eq!(w.refreshes, 4);
}
}