1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
use inner::Inner;

use std::sync;
use std::sync::atomic;
use std::sync::atomic::AtomicPtr;
use std::hash::{Hash, BuildHasher};
use std::collections::hash_map::RandomState;
use std::borrow::Borrow;
use std::iter::FromIterator;

/// A handle that may be used to read from the eventually consistent map.
///
/// Note that any changes made to the map will not be made visible until the writer calls
/// `refresh()`. In other words, all operations performed on a `ReadHandle` will *only* see writes
/// to the map that preceeded the last call to `refresh()`.
#[derive(Clone)]
pub struct ReadHandle<K, V, M = (), S = RandomState>
    where K: Eq + Hash,
          S: BuildHasher
{
    pub(crate) inner: sync::Arc<AtomicPtr<sync::Arc<Inner<K, V, M, S>>>>,
}

pub fn new<K, V, M, S>(inner: Inner<K, V, M, S>) -> ReadHandle<K, V, M, S>
    where K: Eq + Hash,
          S: BuildHasher
{
    let store = Box::into_raw(Box::new(sync::Arc::new(inner)));
    ReadHandle { inner: sync::Arc::new(AtomicPtr::new(store)) }
}

impl<K, V, M, S> ReadHandle<K, V, M, S>
    where K: Eq + Hash,
          S: BuildHasher,
          M: Clone
{
    fn with_handle<F, T>(&self, f: F) -> T
        where F: FnOnce(&Inner<K, V, M, S>) -> T
    {
        use std::mem;
        let r_handle = unsafe { Box::from_raw(self.inner.load(atomic::Ordering::SeqCst)) };
        let rs: sync::Arc<_> = (&*r_handle).clone();

        let r_handle_again = unsafe { Box::from_raw(self.inner.load(atomic::Ordering::SeqCst)) };
        #[allow(collapsible_if)]
        let res = if !sync::Arc::ptr_eq(&*r_handle, &*r_handle_again) {
            // a swap happened under us.
            //
            // let's first figure out where the writer can possibly be at this point. since we *do*
            // have a clone of an Arc, and a pointer swap has happened, we must be in one of the
            // following cases:
            //
            //  (a) we read and cloned, *then* a writer swapped, checked for uniqueness, and blocks
            //  (b) we read, then a writer swapped, checked for uniqueness, and continued
            //
            // in (a), we know that the writer that did the pointer swap is waiting on our Arc (rs)
            // we also know that we *ought* to be using a clone of the *new* pointer to read from.
            // since the writer won't do anything until we release rs, we can just do a new clone,
            // and *then* release the old Arc.
            //
            // in (b), we know that there is a writer that thinks it owns rs, and so it is not safe
            // to use rs. how about Arc(r_handle_again)? the only way that would be unsafe to use
            // would be if a writer has gone all the way through the pointer swap and Arc
            // uniqueness test *after* we read r_handle_again. can that have happened? no. if a
            // second writer came along after we read r_handle_again, and wanted to swap, it would
            // get r_handle from its atomic pointer swap. since we're still holding an
            // Arc(r_handle), it would fail the uniqueness test, and therefore block at that point.
            // thus, we know that no writer is currently modifying r_handle_again (and won't be for
            // as long as we hold rs).
            let rs2: sync::Arc<_> = (&*r_handle_again).clone();
            let res = f(&*rs2);
            drop(rs2);
            drop(rs);
            res
        } else {
            // are we actually safe in this case? could there not have been two swaps in a row,
            // making the value r_handle -> r_handle_again -> r_handle? well, there could, but that
            // implies that r_handle is safe to use. why? well, for this to have happened, we must
            // have read the pointer, then a writer went runs swap() (potentially multiple times).
            // then, at some point, we cloned(). then, we read r_handle_again to be equal to
            // r_handle.
            //
            // the moment we clone, we immediately prevent any writer from taking ownership of
            // r_handle. however, what if the current writer thinks that it owns r_handle? well, we
            // read r_handle_again == r_handle, which means that at some point *after the clone*, a
            // writer made r_handle read owned. since no writer can take ownership of r_handle
            // after we cloned it, we know that r_handle must still be owned by readers.
            let res = f(&*rs);
            drop(rs);
            res
        };
        mem::forget(r_handle); // don't free the Box!
        mem::forget(r_handle_again); // don't free the Box!
        res
    }

    /// Returns the number of non-empty keys present in the map.
    pub fn len(&self) -> usize {
        self.with_handle(|inner| inner.data.len())
    }

    /// Returns true if the map contains no elements.
    pub fn is_empty(&self) -> bool {
        self.with_handle(|inner| inner.data.is_empty())
    }

    /// Get the current meta value.
    pub fn meta(&self) -> M {
        self.with_handle(|inner| inner.meta.clone())
    }

    /// Applies a function to the values corresponding to the key, and returns the result.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and `Eq` on the borrowed
    /// form *must* match those for the key type.
    ///
    /// Note that not all writes will be included with this read -- only those that have been
    /// refreshed by the writer. If no refresh has happened, this function returns `None`.
    ///
    /// If no values exist for the given key, the function will not be called, and `None` will be
    /// returned.
    pub fn get_and<Q: ?Sized, F, T>(&self, key: &Q, then: F) -> Option<T>
        where F: FnOnce(&[V]) -> T,
              K: Borrow<Q>,
              Q: Hash + Eq
    {
        self.with_handle(move |inner| if !inner.is_ready() {
            None
        } else {
            inner.data.get(key).map(move |v| then(&**v))
        })
    }

    /// Applies a function to the values corresponding to the key, and returns the result alongside
    /// the meta information.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and `Eq` on the borrowed
    /// form *must* match those for the key type.
    ///
    /// Note that not all writes will be included with this read -- only those that have been
    /// refreshed by the writer. If no refresh has happened, this function returns `None`.
    ///
    /// If no values exist for the given key, the function will still be passed an empty list, and
    /// `Some` will be returned.
    pub fn meta_get_and<Q: ?Sized, F, T>(&self, key: &Q, then: F) -> Option<(T, M)>
        where F: FnOnce(&[V]) -> T,
              K: Borrow<Q>,
              Q: Hash + Eq
    {
        self.with_handle(move |inner| if !inner.is_ready() {
            None
        } else {
            let res = then(inner.data.get(key).map(|v| &**v).unwrap_or(&[]));
            let res = (res, inner.meta.clone());
            Some(res)
        })
    }

    /// Returns true if the map contains any values for the specified key.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and `Eq` on the borrowed
    /// form *must* match those for the key type.
    pub fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
        where K: Borrow<Q>,
              Q: Hash + Eq
    {
        self.with_handle(move |inner| inner.data.contains_key(key))
    }

    /// Read all values in the map, and transform them into a new collection.
    ///
    /// Be careful with this function! While the iteration is ongoing, any writer that tries to
    /// refresh will block waiting on this reader to finish.
    pub fn for_each<F>(&self, mut f: F)
        where F: FnMut(&K, &[V])
    {
        self.with_handle(move |inner| for (k, vs) in &inner.data {
            f(k, &vs[..])
        })
    }

    /// Read all values in the map, and transform them into a new collection.
    pub fn map_into<Map, Collector, Target>(&self, mut f: Map) -> Collector
        where Map: FnMut(&K, &[V]) -> Target,
              Collector: FromIterator<Target>
    {
        self.with_handle(move |inner| {
            Collector::from_iter(inner.data.iter().map(|(k, vs)| f(k, &vs[..])))
        })
    }
}