use super::{Operation, ShallowCopy};
use inner::{Inner, Values};
use read::ReadHandle;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};
use std::sync::atomic;
use std::sync::{Arc, MutexGuard};
use std::{mem, thread};
#[cfg(feature = "hashbrown")]
use hashbrown::hash_map::Entry;
#[cfg(not(feature = "hashbrown"))]
use std::collections::hash_map::Entry;
/// Write side of the map.
///
/// Holds one boxed `Inner` copy for itself (`w_handle`) and a `ReadHandle` whose atomic
/// `inner` pointer refers to the other copy. Until the first `refresh` completes, operations
/// are applied straight to the writer's copy; afterwards they are queued in `oplog` and
/// replayed by `refresh`, which then swaps the two copies.
pub struct WriteHandle<K, V, M = (), S = RandomState>
where
K: Eq + Hash + Clone,
S: BuildHasher + Clone,
V: Eq + ShallowCopy,
M: 'static + Clone,
{
/// The copy currently owned by the writer. `refresh` briefly `take()`s it and stores the
/// previously published copy back before returning.
w_handle: Option<Box<Inner<K, V, M, S>>>,
/// Logged operations that have not yet been applied to both copies.
oplog: Vec<Operation<K, V>>,
/// Number of leading `oplog` entries that were already applied once (with `apply_first`)
/// and still need `apply_second` on the other copy; `pending` returns the entries after it.
swap_index: usize,
/// Read handle whose `inner` pointer is swapped by `refresh`; also the `Deref` target.
r_handle: ReadHandle<K, V, M, S>,
/// Epoch values sampled right after the most recent swap, one per entry in the shared
/// epoch list; `wait` compares current epochs against these.
last_epochs: Vec<usize>,
/// Metadata value that `refresh` clones into the writer's copy before swapping.
meta: M,
/// `true` until the first `refresh` finishes; while set, `add_op` bypasses `oplog`.
first: bool,
/// `true` from the end of the first `refresh` to the end of the second; makes `refresh`
/// copy the published copy's contents into the writer's copy before replaying the log.
second: bool,
/// Set by `destroy` so that `Drop` does not call `refresh` again.
drop_dont_refresh: bool,
}
/// Assembles a fresh `WriteHandle` from an owned `Inner` and the matching `ReadHandle`.
///
/// The handle starts with an empty operation log, no recorded epochs, and `first` set, so
/// operations are applied directly to `w_handle` until the first `refresh`. The initial
/// metadata is cloned from `w_handle.meta`.
pub(crate) fn new<K, V, M, S>(
    w_handle: Inner<K, V, M, S>,
    r_handle: ReadHandle<K, V, M, S>,
) -> WriteHandle<K, V, M, S>
where
    K: Eq + Hash + Clone,
    S: BuildHasher + Clone,
    V: Eq + ShallowCopy,
    M: 'static + Clone,
{
    // Grab the metadata before the inner map is moved into its box.
    let meta = w_handle.meta.clone();
    let w_handle = Some(Box::new(w_handle));

    WriteHandle {
        w_handle,
        r_handle,
        meta,
        oplog: Vec::new(),
        last_epochs: Vec::new(),
        swap_index: 0,
        first: true,
        second: false,
        drop_dont_refresh: false,
    }
}
// Dropping the write handle first brings both copies of the map in sync (unless `destroy`
// already did so), then empties the writer's copy while suppressing the destructors of the
// values stored in it.
impl<K, V, M, S> Drop for WriteHandle<K, V, M, S>
where
K: Eq + Hash + Clone,
S: BuildHasher + Clone,
V: Eq + ShallowCopy,
M: 'static + Clone,
{
/// Flushes the operation log with up to two `refresh` calls, then forgets every value in
/// the writer's copy.
///
/// # Panics
///
/// Panics if the operation log is still non-empty after the refreshes (or if refreshing was
/// skipped via `drop_dont_refresh` while operations were still logged).
fn drop(&mut self) {
if !self.drop_dont_refresh {
// First refresh: replays the log onto the writer's copy and swaps. Entries that have now
// been applied to only one copy stay in the log.
if !self.oplog.is_empty() {
self.refresh();
}
// Second refresh: drains the remaining entries onto the other copy, leaving the log empty.
if !self.oplog.is_empty() {
self.refresh();
}
}
assert!(self.oplog.is_empty());
// Values are inserted into the second copy via `shallow_copy`, so both copies alias the same
// values. Forget the ones held here instead of dropping them; presumably the destructors
// run when the other copy is dropped — confirm against the `Inner`/`ReadHandle` teardown.
for (_, mut vs) in self.w_handle.as_mut().unwrap().data.drain() {
// `drain` takes a range argument unless the `smallvec` feature is enabled.
#[cfg(not(feature = "smallvec"))]
let drain = vs.drain(..);
#[cfg(feature = "smallvec")]
let drain = vs.drain();
for v in drain {
mem::forget(v);
}
}
}
}
impl<K, V, M, S> WriteHandle<K, V, M, S>
where
K: Eq + Hash + Clone,
S: BuildHasher + Clone,
V: Eq + ShallowCopy,
M: 'static + Clone,
{
/// Blocks (by spinning, then yielding) until every epoch in `epochs` shows that its reader
/// has moved on relative to the values recorded in `last_epochs`.
///
/// An epoch is accepted when the recorded value had its high bit set, or when the current
/// value differs from the recorded one, has its high bit set, or is zero.
/// NOTE(review): presumably the high bit marks a reader that is not inside a read section;
/// confirm against the reader-side epoch protocol.
fn wait(&mut self, epochs: &mut MutexGuard<Vec<Arc<atomic::AtomicUsize>>>) {
// Number of retries so far, saturating at 20.
let mut iter = 0;
// Index of the first epoch that has not been accepted yet; earlier ones are not re-checked.
let mut starti = 0;
// Mask selecting the most significant bit of a `usize`.
let high_bit = 1usize << (mem::size_of::<usize>() * 8 - 1);
// Epochs registered since the last call get a recorded value of 0.
self.last_epochs.resize(epochs.len(), 0);
'retry: loop {
for (i, epoch) in epochs.iter().enumerate().skip(starti) {
if self.last_epochs[i] & high_bit != 0 {
// The recorded value already had the high bit set; no need to look at the current one.
continue;
}
let now = epoch.load(atomic::Ordering::Acquire);
// Non-short-circuiting `|` is used, so all three comparisons are always evaluated.
if (now != self.last_epochs[i]) | (now & high_bit != 0) | (now == 0) {
// Accepted: move on to the next epoch.
} else {
// Unchanged since the last swap: resume the scan from this epoch on the next pass.
starti = i;
// Spin for the first 20 retries, then yield the thread on every further retry.
if iter != 20 {
iter += 1;
} else {
thread::yield_now();
}
continue 'retry;
}
}
// Reached only when a full pass from `starti` accepted every epoch.
break;
}
}
/// Publishes all logged operations to readers by bringing the writer's copy up to date and
/// swapping it with the copy behind `r_handle`.
///
/// Steps: lock the shared epoch list, `wait` for readers, replay the log onto the writer's
/// copy, swap the copies through the atomic pointer, record the current epochs, and keep the
/// previously published copy as the new writer's copy. The epoch lock is held for the whole
/// call.
///
/// # Panics
///
/// Panics if the epoch mutex is poisoned, if `w_handle` is `None`, or (on the second
/// refresh) if the published pointer is null.
pub fn refresh(&mut self) {
// Clone the `Arc` so the guard does not borrow from `self`, which `wait` needs mutably.
let epochs = Arc::clone(&self.w_handle.as_ref().unwrap().epochs);
let mut epochs = epochs.lock().unwrap();
self.wait(&mut epochs);
{
let w_handle = self.w_handle.as_mut().unwrap();
if self.second {
// Operations issued before the first refresh were applied directly by `add_op` and never
// logged, so they exist only in the copy that was published. Copy that state over before
// replaying the log.
// NOTE(review): obtains `&mut` to the published copy while readers may hold the same
// pointer; soundness relies on the reader protocol — confirm.
let r_handle = unsafe {
self.r_handle
.inner
.load(atomic::Ordering::Relaxed)
.as_mut()
.unwrap()
};
// Keys are cloned; values are aliased with `shallow_copy` rather than cloned.
w_handle
.data
.extend(r_handle.data.iter_mut().map(|(k, vs)| {
(
k.clone(),
vs.iter_mut().map(|v| unsafe { v.shallow_copy() }).collect(),
)
}));
}
if self.swap_index != 0 {
// The first `swap_index` entries were already applied to the other copy by `apply_first`;
// consume them here with `apply_second`.
for op in self.oplog.drain(0..self.swap_index) {
Self::apply_second(w_handle, op);
}
}
// The remaining entries stay in the log so the next refresh can apply them to the other copy.
for op in self.oplog.iter_mut() {
Self::apply_first(w_handle, op);
}
// Everything left in the log has now been applied exactly once.
self.swap_index = self.oplog.len();
w_handle.meta = self.meta.clone();
// `mark_ready` is defined on `Inner` outside this file's visible section.
w_handle.mark_ready();
}
// Hand the up-to-date copy to readers as a raw pointer and take back the old one.
let w_handle = self.w_handle.take().unwrap();
let w_handle = Box::into_raw(w_handle);
let r_handle = self
.r_handle
.inner
.swap(w_handle, atomic::Ordering::Release);
// Assumes the previously stored pointer came from `Box::into_raw` (as the one stored above
// does) — TODO confirm for the initial pointer set up outside this file.
let r_handle = unsafe { Box::from_raw(r_handle) };
// Full fence between the pointer swap and the epoch loads below.
atomic::fence(atomic::Ordering::SeqCst);
// Record each epoch as of the swap; the next `wait` compares against these values.
// `wait` above already resized `last_epochs` to `epochs.len()`, and the lock is still held.
for (i, epoch) in epochs.iter().enumerate() {
self.last_epochs[i] = epoch.load(atomic::Ordering::Acquire);
}
// The old published copy becomes the writer's copy; it is brought up to date by the next
// refresh, after `wait` has run again.
self.w_handle = Some(r_handle);
// `second` is true only for the refresh immediately following the first one.
self.second = self.first;
self.first = false;
}
/// Consumes the write handle, replaces the published pointer with null, and drops both
/// copies of the map.
///
/// # Panics
///
/// Panics under the same conditions as `refresh`, and if the epoch mutex is poisoned.
pub fn destroy(mut self) {
use std::ptr;
// Two refreshes apply every logged operation to both copies and leave the log empty.
self.refresh();
self.refresh();
// Unpublish the map: after this swap the shared pointer is null.
let r_handle = self
.r_handle
.inner
.swap(ptr::null_mut(), atomic::Ordering::Release);
// Assumes the stored pointer was produced by `Box::into_raw`, as in `refresh`.
let r_handle = unsafe { Box::from_raw(r_handle) };
// Wait for readers to move on before the copy they may still be using is freed.
let epochs = Arc::clone(&self.w_handle.as_ref().unwrap().epochs);
let mut epochs = epochs.lock().unwrap();
self.wait(&mut epochs);
atomic::fence(atomic::Ordering::SeqCst);
// The log is already empty, so tell `Drop` to skip its refreshes; `Drop` then forgets the
// values in the writer's copy without running their destructors.
self.drop_dont_refresh = true;
drop(self);
// Dropping the formerly published copy drops it normally, after the writer's copy was
// emptied by `Drop` above.
drop(r_handle);
}
/// Returns the logged operations that have not been applied to either copy of the map yet,
/// i.e. every `oplog` entry from `swap_index` onwards.
pub fn pending(&self) -> &[Operation<K, V>] {
    let first_unapplied = self.swap_index;
    &self.oplog.as_slice()[first_unapplied..]
}
/// Calls `refresh` only when `pending` reports at least one operation; otherwise does
/// nothing.
pub fn flush(&mut self) {
    if self.pending().is_empty() {
        return;
    }
    self.refresh();
}
/// Stores `meta` as the handle's metadata and returns the value it replaces.
///
/// The new value is only copied into the map by a later `refresh`.
pub fn set_meta(&mut self, meta: M) -> M {
    mem::replace(&mut self.meta, meta)
}
/// Routes a single operation: before the first `refresh` it is applied directly to the
/// writer's copy, afterwards it is appended to the operation log.
fn add_op(&mut self, op: Operation<K, V>) {
    if self.first {
        // Nothing has been swapped yet, so the writer's copy can be modified in place.
        let target = self.w_handle.as_mut().unwrap();
        Self::apply_second(target, op);
        return;
    }
    self.oplog.push(op);
}
/// Queues an `Operation::Add`, which appends `v` to the values stored under `k`.
pub fn insert(&mut self, k: K, v: V) {
    let op = Operation::Add(k, v);
    self.add_op(op);
}
/// Queues an `Operation::Replace`, which makes `v` the only value stored under `k`.
pub fn update(&mut self, k: K, v: V) {
    let op = Operation::Replace(k, v);
    self.add_op(op);
}
/// Queues an `Operation::Clear`, which removes all values under `k` but keeps the key
/// present with an empty value set.
pub fn clear(&mut self, k: K) {
    let op = Operation::Clear(k);
    self.add_op(op);
}
/// Queues an `Operation::Remove`, which deletes the first stored value under `k` that
/// compares equal to `v`, if there is one.
pub fn remove(&mut self, k: K, v: V) {
    let op = Operation::Remove(k, v);
    self.add_op(op);
}
/// Queues an `Operation::Empty`, which removes the key `k` together with all its values.
pub fn empty(&mut self, k: K) {
    let op = Operation::Empty(k);
    self.add_op(op);
}
/// Applies `op` to `inner` for the first of its two applications, leaving `op` itself intact
/// so it can later be consumed by `apply_second` on the other copy.
///
/// Values added here are shallow copies of the ones stored in `op`, and values removed here
/// are forgotten rather than dropped; `apply_second` is the counterpart that moves the
/// original value in and lets removed values drop.
fn apply_first(inner: &mut Inner<K, V, M, S>, op: &mut Operation<K, V>) {
match *op {
// Replace: empty the key's value set (creating it if absent), then store one aliased value.
Operation::Replace(ref key, ref mut value) => {
let vs = inner.data.entry(key.clone()).or_insert_with(Values::new);
{
// `drain` takes a range argument unless the `smallvec` feature is enabled.
#[cfg(not(feature = "smallvec"))]
let drain = vs.drain(..);
#[cfg(feature = "smallvec")]
let drain = vs.drain();
for v in drain {
// Skip the destructor on this first application.
mem::forget(v);
}
}
vs.push(unsafe { value.shallow_copy() });
}
// Clear: forget all values of an existing key, or insert an empty value set for a new key.
Operation::Clear(ref key) => {
match inner.data.entry(key.clone()) {
Entry::Occupied(mut occupied) => {
#[cfg(not(feature = "smallvec"))]
let drain = occupied.get_mut().drain(..);
#[cfg(feature = "smallvec")]
let drain = occupied.get_mut().drain();
for v in drain {
mem::forget(v);
}
}
Entry::Vacant(vacant) => {
vacant.insert(Values::new());
}
}
}
// Add: append an aliased value, creating the key's value set if needed.
Operation::Add(ref key, ref mut value) => {
inner
.data
.entry(key.clone())
.or_insert_with(Values::new)
.push(unsafe { value.shallow_copy() });
}
// Empty: remove the key entirely and forget whatever values it held.
Operation::Empty(ref key) => {
if let Some(mut vs) = inner.data.remove(key) {
#[cfg(not(feature = "smallvec"))]
let drain = vs.drain(..);
#[cfg(feature = "smallvec")]
let drain = vs.drain();
for v in drain {
mem::forget(v);
}
}
}
// Remove: take out the first value equal to `value` (order is not preserved) and forget it.
Operation::Remove(ref key, ref value) => {
if let Some(e) = inner.data.get_mut(key) {
if let Some(i) = e.iter().position(|v| v == value) {
let v = e.swap_remove(i);
mem::forget(v);
}
}
}
}
}
/// Applies `op` to `inner` by value: the operation's own key and value are moved into the
/// map, and any values it displaces are dropped normally.
///
/// Used directly by `add_op` before the first refresh, and by `refresh` for log entries
/// that were already applied once with `apply_first`.
fn apply_second(inner: &mut Inner<K, V, M, S>, op: Operation<K, V>) {
    let map = &mut inner.data;
    match op {
        // Append to the key's value set, creating it when missing.
        Operation::Add(key, value) => {
            let bucket = map.entry(key).or_insert_with(Values::new);
            bucket.push(value);
        }
        // Drop the existing values and store `value` as the sole entry.
        Operation::Replace(key, value) => {
            let bucket = map.entry(key).or_insert_with(Values::new);
            bucket.clear();
            bucket.push(value);
        }
        // Keep (or create) the key but leave it with no values.
        Operation::Clear(key) => {
            map.entry(key).or_insert_with(Values::new).clear();
        }
        // Remove the key and everything stored under it.
        Operation::Empty(key) => {
            map.remove(&key);
        }
        // Remove the first value equal to `value`; element order is not preserved.
        Operation::Remove(key, value) => {
            let bucket = match map.get_mut(&key) {
                Some(bucket) => bucket,
                None => return,
            };
            if let Some(idx) = bucket.iter().position(|candidate| *candidate == value) {
                bucket.swap_remove(idx);
            }
        }
    }
}
}
// Lets a `WriteHandle` be filled from any iterator of key/value pairs.
impl<K, V, M, S> Extend<(K, V)> for WriteHandle<K, V, M, S>
where
    K: Eq + Hash + Clone,
    S: BuildHasher + Clone,
    V: Eq + ShallowCopy,
    M: 'static + Clone,
{
    /// Calls `insert` once per `(key, value)` pair, in iteration order.
    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
        iter.into_iter()
            .for_each(|(key, value)| self.insert(key, value));
    }
}
use std::ops::Deref;
// Exposes the embedded `ReadHandle`, so read-side methods can be called on a `WriteHandle`.
impl<K, V, M, S> Deref for WriteHandle<K, V, M, S>
where
    K: Eq + Hash + Clone,
    S: BuildHasher + Clone,
    V: Eq + ShallowCopy,
    M: 'static + Clone,
{
    type Target = ReadHandle<K, V, M, S>;

    /// Borrows the read handle stored in this write handle.
    fn deref(&self) -> &ReadHandle<K, V, M, S> {
        let reader = &self.r_handle;
        reader
    }
}