1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::mem;
use std::sync::Arc;
use std::sync::atomic;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::thread;

use crate::{WeakEpoch, Epochs, Inner, OperationCache, USIZE_MSB};

/// A handle which allows accessing the inner data mutably through operations.
///
/// Operations passed to `write` are only queued in `ops`; they are applied when `refresh` is
/// called, or when the handle is dropped while operations are still pending.
pub struct WriteHandle<T: OperationCache> {
    /// Pointer to the copy that `refresh` mutates. It is exchanged with `readers_inner` on every
    /// refresh and the allocation it points to is freed when this handle is dropped.
    writers_inner: Arc<AtomicPtr<Inner<T>>>,
    /// Pointer to the other copy.
    /// NOTE(review): presumably the copy read handles load from — confirm against the reader
    /// implementation.
    readers_inner: Arc<AtomicPtr<Inner<T>>>,

    /// Shared, lock-protected list of weak references to the per-reader epoch counters.
    epochs: Epochs,
    /// Epoch of each reader as observed right after the most recent pointer swap. Kept
    /// index-aligned with the list behind `epochs`.
    last_epochs: Vec<usize>,

    /// Operations queued by `write` that have not been applied yet.
    ops: Vec<T::Operation>,
}

impl<T: OperationCache> WriteHandle<T> {
    /// Create a write handle from the two shared copy pointers and the shared reader epoch list.
    ///
    /// The new handle has no queued operations and has not observed any reader epoch yet.
    pub(crate) fn new(writers_inner: Arc<AtomicPtr<Inner<T>>>, readers_inner: Arc<AtomicPtr<Inner<T>>>, epochs: Epochs) -> Self {
        Self {
            writers_inner,
            readers_inner,

            epochs,
            last_epochs: Vec::new(),
            ops: Vec::new(),
        }
    }

    /// Mutate the inner data using an operation.
    ///
    /// The operation is only queued here; nothing is applied until `refresh` is called.
    pub fn write(&mut self, operation: T::Operation) {
        self.ops.push(operation);
    }

    /// Block until every reader has moved past the epoch recorded for it in `last_epochs`.
    ///
    /// `epochs` is the (already locked) list of weak reader epochs. Readers whose epoch counter
    /// can no longer be upgraded have been dropped; they are removed from both `epochs` and
    /// `last_epochs` so the two lists stay index-aligned.
    ///
    /// A reader is waited on only while its current epoch is still equal to the recorded one, is
    /// non-zero and does not have `USIZE_MSB` set.
    /// NOTE(review): presumably `USIZE_MSB` marks a reader that is not inside a read and `0` a
    /// reader that has never read — confirm against the reader implementation.
    fn wait(&mut self, epochs: &mut Vec<WeakEpoch>) {
        // Number of busy-spin retries before the thread starts yielding instead.
        const SPIN_LIMIT: usize = 32;

        let mut retry_count = 0;
        let mut index = 0;

        // Readers registered since the last call get a recorded epoch of 0, which is never
        // waited on.
        self.last_epochs.resize(epochs.len(), 0);

        while index < self.last_epochs.len() {
            let epoch = match epochs[index].upgrade() {
                Some(epoch) => epoch,
                None => {
                    // The reader has been dropped: forget it in both lists. The next reader
                    // shifts into `index`, so the index must not advance (and earlier, already
                    // checked readers do not need to be scanned again).
                    epochs.remove(index);
                    self.last_epochs.remove(index);
                    continue;
                }
            };

            let last_epoch = self.last_epochs[index];

            // A recorded epoch with the MSB set never needs to be waited on.
            if last_epoch & USIZE_MSB == 0 {
                let current_epoch = epoch.load(Ordering::Acquire);
                let still_reading = current_epoch == last_epoch
                    && current_epoch & USIZE_MSB == 0
                    && current_epoch != 0;

                if still_reading {
                    // Spin a bounded number of times, then start yielding to the scheduler;
                    // in both cases re-check the same reader.
                    if retry_count < SPIN_LIMIT {
                        retry_count += 1;
                    } else {
                        thread::yield_now();
                    }
                    continue;
                }
            }

            index += 1;
        }
    }

    /// Refresh the queued writes, making the changes visible to readers.
    ///
    /// The queued operations are applied to the writer copy, the writer and reader pointers are
    /// swapped, and the same operations are then replayed onto the copy that has just been
    /// retired so that both copies end up identical. The operation queue is empty afterwards.
    ///
    /// This call blocks until every reader that was in the middle of a read when the pointers
    /// were swapped has finished that read; mutating the retired copy any earlier would race
    /// with those readers.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the epoch list is poisoned, or if the writer pointer is
    /// null.
    pub fn refresh(&mut self) {
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        // Defensive re-check of the epochs recorded by the previous refresh. Normally this
        // returns immediately because that refresh already waited after its swap.
        self.wait(&mut epochs);

        let current_writer = self.writers_inner.load(Ordering::Relaxed);
        // SAFETY: only this handle dereferences `writers_inner` mutably (it requires
        // `&mut self`), and the wait above ensures no reader is still inside this copy.
        // Relies on readers only ever dereferencing `readers_inner` — confirm against the
        // reader implementation.
        let pending_copy = unsafe { current_writer.as_mut() }
            .expect("writers_inner must never be null");
        for operation in self.ops.iter().cloned() {
            pending_copy.value.apply_operation(operation);
        }

        // Swap the pointers.
        let published = self.writers_inner.swap(self.readers_inner.load(Ordering::Relaxed), Ordering::Release);
        self.readers_inner.store(published, Ordering::Release);

        atomic::fence(Ordering::SeqCst);

        // Record where every live reader is right after the swap. `wait` keeps both lists the
        // same length while the lock is held, so zipping them visits every reader.
        for (last_epoch, epoch) in self.last_epochs.iter_mut().zip(epochs.iter()) {
            if let Some(e) = epoch.upgrade() {
                *last_epoch = e.load(Ordering::Acquire);
            }
        }

        // Readers that started before the swap may still be reading the copy that has just
        // become the writer copy. Wait for them to leave before mutating it.
        self.wait(&mut epochs);

        let retired = self.writers_inner.load(Ordering::Relaxed);
        // SAFETY: same exclusivity argument as above; the wait directly above guarantees that
        // every reader which could still observe this copy has finished its read.
        let retired_copy = unsafe { retired.as_mut() }
            .expect("writers_inner must never be null");
        for operation in self.ops.drain(..) {
            retired_copy.value.apply_operation(operation);
        }
    }
}

impl<T: OperationCache> Drop for WriteHandle<T> {
    fn drop(&mut self) {
        if !self.ops.is_empty() {
            self.refresh();
        }
        assert!(self.ops.is_empty());

        let writers_inner = self.writers_inner.load(Ordering::Relaxed);
        mem::drop(unsafe { Box::from_raw(writers_inner) });

        // The readers should be able to continue reading after this writer has gone, and thus they
        // should be responsible for destroying their handle.
    }
}