1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
use super::Operation;
use inner::Inner;
use read::ReadHandle;

use std::sync;
use std::sync::atomic;
use std::hash::{Hash, BuildHasher};

/// A handle that may be used to modify the eventually consistent map.
///
/// Note that any changes made to the map will not be made visible to readers until `refresh()` is
/// called.
///
/// # Examples
/// ```
/// let x = ('x', 42);
///
/// let (r, mut w) = evmap::new();
///
/// // the map is uninitialized, so all lookups should return None
/// assert_eq!(r.get_and(&x.0, |rs| rs.len()), None);
///
/// w.refresh();
///
/// // after the first refresh, it is empty, but ready
/// assert_eq!(r.get_and(&x.0, |rs| rs.len()), None);
///
/// w.insert(x.0, x);
///
/// // it is empty even after an add (we haven't refreshed yet)
/// assert_eq!(r.get_and(&x.0, |rs| rs.len()), None);
///
/// w.refresh();
///
/// // but after the swap, the record is there!
/// assert_eq!(r.get_and(&x.0, |rs| rs.len()), Some(1));
/// assert_eq!(r.get_and(&x.0, |rs| rs.iter().any(|v| v.0 == x.0 && v.1 == x.1)), Some(true));
/// ```
pub struct WriteHandle<K, V, M, S>
    where K: Eq + Hash,
          S: BuildHasher
{
    /// The writer's own copy of the map. It is boxed so that `refresh` can publish it to readers
    /// as a raw pointer (`Box::into_raw`). It is wrapped in `Option` only so that `refresh` can
    /// `take()` it while the swap is in progress; outside of `refresh` it is always `Some`.
    w_handle: Option<Box<sync::Arc<Inner<K, V, M, S>>>>,
    /// Operations already applied to `w_handle` but not yet to the copy readers currently see.
    /// The next `refresh` replays them onto that copy. Nothing is logged before the first
    /// `refresh`.
    oplog: Vec<Operation<K, V>>,
    /// The reader-side handle. `refresh` atomically swaps its inner pointer to publish the
    /// writer's copy.
    r_handle: ReadHandle<K, V, M, S>,
    /// `true` until the first `refresh`. While set, writes are not logged, because the first
    /// refresh clones the whole map instead of replaying an oplog.
    first: bool,
}

/// Build a `WriteHandle` around a writer-side map copy and the reader handle it publishes to.
///
/// `w_handle` becomes the writer's private copy. `r_handle` is the reader handle whose pointer
/// `refresh()` swaps. The handle starts with an empty operation log and in "first" mode. In that
/// mode the first `refresh()` clones the whole map instead of replaying individual operations.
pub fn new<K, V, M, S>(w_handle: Inner<K, V, M, S>,
                       r_handle: ReadHandle<K, V, M, S>)
                       -> WriteHandle<K, V, M, S>
    where K: Eq + Hash,
          S: BuildHasher
{
    let writer_copy = Box::new(sync::Arc::new(w_handle));
    WriteHandle {
        first: true,
        r_handle: r_handle,
        oplog: Vec::new(),
        w_handle: Some(writer_copy),
    }
}

impl<K, V, M, S> WriteHandle<K, V, M, S>
    where K: Eq + Hash + Clone,
          S: BuildHasher + Clone,
          V: Eq + Clone,
          M: 'static + Clone
{
    /// Refresh the handle used by readers so that pending writes are made visible.
    ///
    /// This method needs to wait for all readers to move to the new handle so that it can replay
    /// the operational log onto the stale map copy the readers used to use. This can take some
    /// time, especially if readers are executing slow operations, or if there are many of them.
    pub fn refresh(&mut self) {
        use std::thread;

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all writes.
        // r_handle is reached by readers through the atomic pointer in `self.r_handle.inner`
        // (readers then clone the Arc it points to), and has old data.
        // `self.oplog` contains all the changes that are in w_handle, but not in r_handle. It is
        // always empty before the first refresh, because then the whole map is cloned instead.
        //
        // we're going to do the following:
        //
        //  - atomically swap a pointer to the current w_handle into `self.r_handle.inner`,
        //    letting readers see new and updated state
        //  - store r_handle as our new w_handle
        //  - wait until we have exclusive access to this new w_handle
        //  - replay the oplog (or the first-refresh clone) onto w_handle

        // prepare w_handle
        let w_handle = self.w_handle
            .take()
            .expect("w_handle is only None while refresh() is running");
        let meta = w_handle.meta.clone();
        let w_handle_clone = if self.first {
            // this is the *first* swap: clone the current w_handle instead of inserting all the
            // rows one-by-one (add_op did not log anything while `first` was set)
            self.first = false;
            Some(w_handle.data.clone())
        } else {
            None
        };
        let w_handle: *mut sync::Arc<_> = Box::into_raw(w_handle);

        // swap in our w_handle, and get r_handle in return
        let r_handle = self.r_handle.inner.swap(w_handle, atomic::Ordering::SeqCst);
        // SAFETY: the pointer we store here comes from `Box::into_raw` just above, and the swap
        // hands the previously stored pointer back to us alone, so we may re-box it. This relies
        // on the *initial* pointer in `r_handle.inner` also having been produced by
        // `Box::into_raw` (NOTE(review): confirm where the ReadHandle is constructed).
        self.w_handle = Some(unsafe { Box::from_raw(r_handle) });

        // let readers go so they will be done with the old read Arc
        thread::yield_now();

        // now, wait for all existing readers to go away
        loop {
            if let Some(w_handle) = sync::Arc::get_mut(
                &mut *self.w_handle.as_mut().expect("w_handle was set right after the swap"),
            ) {
                // they're all gone
                // OR ARE THEY?
                // some poor reader could have *read* the pointer right before we swapped it, *but
                // not yet cloned the Arc*. we then check that there's only one strong reference,
                // *which there is*. *then* that reader upgrades their Arc => Uh-oh. *however*,
                // because of the second pointer read in readers, we know that a reader will detect
                // this case, and simply refuse to use the Arc it cloned. therefore, it is safe for
                // us to start mutating here

                // put in all the updates the read store hasn't seen
                if let Some(old_w_handle) = w_handle_clone {
                    w_handle.data = old_w_handle;
                } else {
                    for op in self.oplog.drain(..) {
                        Self::apply_op(w_handle, op);
                    }
                }
                w_handle.meta = meta;
                w_handle.mark_ready();

                // w_handle (the old r_handle) is now fully up to date!
                break;
            } else {
                thread::yield_now();
            }
        }
    }

    /// Set the metadata, returning the previous value.
    ///
    /// Will only be visible to readers after the next call to `refresh()`.
    pub fn set_meta(&mut self, meta: M) -> M {
        self.with_mut(move |inner| {
            use std::mem;
            mem::replace(&mut inner.meta, meta)
        })
    }

    /// Apply `op` to the writer's copy and, after the first refresh, log it so the next
    /// `refresh()` can replay it onto the copy readers are currently using.
    fn add_op(&mut self, op: Operation<K, V>) {
        if self.first {
            // before the first swap, we'd rather just clone the entire map,
            // so no need to maintain an oplog (or to clone op)
            self.with_mut(|inner| { Self::apply_op(inner, op); });
        } else {
            self.with_mut(|inner| { Self::apply_op(inner, op.clone()); });
            // and also log it to later apply to the reads
            self.oplog.push(op);
        }
    }

    /// Add the given value to the value-set of the given key.
    ///
    /// The updated value-set will only be visible to readers after the next call to `refresh()`.
    pub fn insert(&mut self, k: K, v: V) {
        self.add_op(Operation::Add(k, v));
    }

    /// Replace the value-set of the given key with the given value.
    ///
    /// The new value will only be visible to readers after the next call to `refresh()`.
    pub fn update(&mut self, k: K, v: V) {
        self.add_op(Operation::Replace(k, v));
    }

    /// Remove the given value from the value-set of the given key.
    ///
    /// Only the first value equal to `v` is removed, and the order of the remaining values may
    /// change. If the value-set becomes empty, the key itself is removed.
    ///
    /// The updated value-set will only be visible to readers after the next call to `refresh()`.
    pub fn remove(&mut self, k: K, v: V) {
        self.add_op(Operation::Remove(k, v));
    }

    /// Remove the value-set for the given key.
    ///
    /// The value-set will only disappear from readers after the next call to `refresh()`.
    pub fn empty(&mut self, k: K) {
        self.add_op(Operation::Empty(k));
    }

    /// Run `f` with exclusive mutable access to the writer's copy of the map, spinning (with
    /// `yield_now`) until the writer's `Arc` is uniquely owned.
    fn with_mut<F, T>(&mut self, f: F) -> T
        where F: FnOnce(&mut Inner<K, V, M, S>) -> T
    {
        let arc: &mut sync::Arc<_> = &mut *self.w_handle
            .as_mut()
            .expect("w_handle is only None while refresh() is running");
        loop {
            if let Some(s) = sync::Arc::get_mut(arc) {
                return f(s);
            } else {
                // writer should always be sole owner outside of refresh
                // *however*, there may have been a reader who read the arc pointer before the
                // atomic pointer swap, and cloned *after* the Arc::get_mut call in refresh(), so
                // we could still end up here. we know that the reader will detect its mistake and
                // drop that Arc (without using it), so we eventually end up with a unique Arc. In
                // fact, because we know the reader will never use the Arc in that case, we *could*
                // just start mutating straight away, but unfortunately Arc doesn't provide an API
                // to force this.
                use std::thread;
                thread::yield_now();
            }
        }
    }

    /// Apply a single logged operation to a map copy.
    fn apply_op(inner: &mut Inner<K, V, M, S>, op: Operation<K, V>) {
        match op {
            Operation::Replace(key, value) => {
                let v = inner.data.entry(key).or_insert_with(Vec::new);
                v.clear();
                v.push(value);
            }
            Operation::Add(key, value) => {
                inner.data.entry(key).or_insert_with(Vec::new).push(value);
            }
            Operation::Empty(key) => {
                inner.data.remove(&key);
            }
            Operation::Remove(key, value) => {
                let mut now_empty = false;
                if let Some(e) = inner.data.get_mut(&key) {
                    // find the first value equal to `value`; order is not preserved
                    if let Some(i) = e.iter().position(|v| v == &value) {
                        e.swap_remove(i);
                        now_empty = e.is_empty();
                    }
                }
                if now_empty {
                    // no more entries for this key -- free up some space in the map
                    inner.data.remove(&key);
                }
            }
        }
    }
}

impl<K, V, M, S> Extend<(K, V)> for WriteHandle<K, V, M, S>
    where K: Eq + Hash + Clone,
          S: BuildHasher + Clone,
          V: Eq + Clone,
          M: 'static + Clone
{
    /// Insert every `(key, value)` pair produced by `iter`, in order, exactly as repeated calls
    /// to `insert` would.
    ///
    /// As with `insert`, the new values only become visible to readers after `refresh()`.
    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
        iter.into_iter()
            .for_each(|(key, value)| self.insert(key, value));
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<K, V, M, S> Deref for WriteHandle<K, V, M, S>
    where K: Eq + Hash + Clone,
          S: BuildHasher + Clone,
          V: Eq + Clone,
          M: 'static + Clone
{
    type Target = ReadHandle<K, V, M, S>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}