evmap/
write.rs

1use super::{Operation, Predicate, ShallowCopy};
2use crate::inner::Inner;
3use crate::read::ReadHandle;
4use crate::values::Values;
5
6use std::collections::hash_map::RandomState;
7use std::hash::{BuildHasher, Hash};
8use std::mem::ManuallyDrop;
9use std::sync::atomic;
10use std::sync::{Arc, MutexGuard};
11use std::{fmt, mem, thread};
12
13#[cfg(feature = "indexed")]
14use indexmap::map::Entry;
15#[cfg(not(feature = "indexed"))]
16use std::collections::hash_map::Entry;
17
18/// A handle that may be used to modify the eventually consistent map.
19///
20/// Note that any changes made to the map will not be made visible to readers until `refresh()` is
21/// called.
22///
23/// When the `WriteHandle` is dropped, the map is immediately (but safely) taken away from all
24/// readers, causing all future lookups to return `None`.
25///
26/// # Examples
27/// ```
28/// let x = ('x', 42);
29///
30/// let (r, mut w) = evmap::new();
31///
32/// // the map is uninitialized, so all lookups should return None
33/// assert_eq!(r.get(&x.0).map(|rs| rs.len()), None);
34///
35/// w.refresh();
36///
37/// // after the first refresh, it is empty, but ready
38/// assert_eq!(r.get(&x.0).map(|rs| rs.len()), None);
39///
40/// w.insert(x.0, x);
41///
42/// // it is empty even after an add (we haven't refresh yet)
43/// assert_eq!(r.get(&x.0).map(|rs| rs.len()), None);
44///
45/// w.refresh();
46///
47/// // but after the swap, the record is there!
48/// assert_eq!(r.get(&x.0).map(|rs| rs.len()), Some(1));
49/// assert_eq!(r.get(&x.0).map(|rs| rs.iter().any(|v| v.0 == x.0 && v.1 == x.1)), Some(true));
50/// ```
51pub struct WriteHandle<K, V, M = (), S = RandomState>
52where
53    K: Eq + Hash + Clone,
54    S: BuildHasher + Clone,
55    V: Eq + Hash + ShallowCopy,
56    M: 'static + Clone,
57{
58    epochs: crate::Epochs,
59    w_handle: Option<Box<Inner<K, ManuallyDrop<V>, M, S>>>,
60    oplog: Vec<Operation<K, V>>,
61    swap_index: usize,
62    r_handle: ReadHandle<K, V, M, S>,
63    last_epochs: Vec<usize>,
64    meta: M,
65    first: bool,
66    second: bool,
67}
68
69impl<K, V, M, S> fmt::Debug for WriteHandle<K, V, M, S>
70where
71    K: Eq + Hash + Clone + fmt::Debug,
72    S: BuildHasher + Clone,
73    V: Eq + Hash + ShallowCopy + fmt::Debug,
74    M: 'static + Clone + fmt::Debug,
75{
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.debug_struct("WriteHandle")
78            .field("epochs", &self.epochs)
79            .field("w_handle", &self.w_handle)
80            .field("oplog", &self.oplog)
81            .field("swap_index", &self.swap_index)
82            .field("r_handle", &self.r_handle)
83            .field("meta", &self.meta)
84            .field("first", &self.first)
85            .field("second", &self.second)
86            .finish()
87    }
88}
89
90pub(crate) fn new<K, V, M, S>(
91    w_handle: Inner<K, ManuallyDrop<V>, M, S>,
92    epochs: crate::Epochs,
93    r_handle: ReadHandle<K, V, M, S>,
94) -> WriteHandle<K, V, M, S>
95where
96    K: Eq + Hash + Clone,
97    S: BuildHasher + Clone,
98    V: Eq + Hash + ShallowCopy,
99    M: 'static + Clone,
100{
101    let m = w_handle.meta.clone();
102    WriteHandle {
103        epochs,
104        w_handle: Some(Box::new(w_handle)),
105        oplog: Vec::new(),
106        swap_index: 0,
107        r_handle,
108        last_epochs: Vec::new(),
109        meta: m,
110        first: true,
111        second: false,
112    }
113}
114
115impl<K, V, M, S> Drop for WriteHandle<K, V, M, S>
116where
117    K: Eq + Hash + Clone,
118    S: BuildHasher + Clone,
119    V: Eq + Hash + ShallowCopy,
120    M: 'static + Clone,
121{
122    fn drop(&mut self) {
123        use std::ptr;
124
125        // first, ensure both maps are up to date
126        // (otherwise safely dropping deduplicated rows is a pain)
127        if !self.oplog.is_empty() {
128            self.refresh();
129        }
130        if !self.oplog.is_empty() {
131            self.refresh();
132        }
133        assert!(self.oplog.is_empty());
134
135        // next, grab the read handle and set it to NULL
136        let r_handle = self
137            .r_handle
138            .inner
139            .swap(ptr::null_mut(), atomic::Ordering::Release);
140
141        // now, wait for all readers to depart
142        let epochs = Arc::clone(&self.epochs);
143        let mut epochs = epochs.lock().unwrap();
144        self.wait(&mut epochs);
145
146        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
147        atomic::fence(atomic::Ordering::SeqCst);
148
149        let w_handle = &mut self.w_handle.as_mut().unwrap().data;
150
151        // all readers have now observed the NULL, so we own both handles.
152        // all records are duplicated between w_handle and r_handle.
153        // since the two maps are exactly equal, we need to make sure that we *don't* call the
154        // destructors of any of the values that are in our map, as they'll all be called when the
155        // last read handle goes out of scope. to do so, we first clear w_handle, which won't drop
156        // any elements since its values are kept as ManuallyDrop:
157        w_handle.clear();
158
159        // then we transmute r_handle to remove the ManuallyDrop, and then drop it, which will free
160        // all the records. this is safe, since we know that no readers are using this pointer
161        // anymore (due to the .wait() following swapping the pointer with NULL).
162        drop(unsafe { Box::from_raw(r_handle as *mut Inner<K, V, M, S>) });
163    }
164}
165
166impl<K, V, M, S> WriteHandle<K, V, M, S>
167where
168    K: Eq + Hash + Clone,
169    S: BuildHasher + Clone,
170    V: Eq + Hash + ShallowCopy,
171    M: 'static + Clone,
172{
173    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<atomic::AtomicUsize>>>) {
174        let mut iter = 0;
175        let mut starti = 0;
176        let high_bit = 1usize << (mem::size_of::<usize>() * 8 - 1);
177        // we're over-estimating here, but slab doesn't expose its max index
178        self.last_epochs.resize(epochs.capacity(), 0);
179        'retry: loop {
180            // read all and see if all have changed (which is likely)
181            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
182                // note that `ri` _may_ have been re-used since we last read into last_epochs.
183                // this is okay though, as a change still implies that the new reader must have
184                // arrived _after_ we did the atomic swap, and thus must also have seen the new
185                // pointer.
186                if self.last_epochs[ri] & high_bit != 0 {
187                    // reader was not active right after last swap
188                    // and therefore *must* only see new pointer
189                    continue;
190                }
191
192                let now = epoch.load(atomic::Ordering::Acquire);
193                if (now != self.last_epochs[ri]) | (now & high_bit != 0) | (now == 0) {
194                    // reader must have seen last swap
195                } else {
196                    // reader may not have seen swap
197                    // continue from this reader's epoch
198                    starti = ii;
199
200                    // how eagerly should we retry?
201                    if iter != 20 {
202                        iter += 1;
203                    } else {
204                        thread::yield_now();
205                    }
206
207                    continue 'retry;
208                }
209            }
210            break;
211        }
212    }
213
214    /// Refresh the handle used by readers so that pending writes are made visible.
215    ///
216    /// This method needs to wait for all readers to move to the new handle so that it can replay
217    /// the operational log onto the stale map copy the readers used to use. This can take some
218    /// time, especially if readers are executing slow operations, or if there are many of them.
219    pub fn refresh(&mut self) -> &mut Self {
220        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
221        // flag has been observed to be on for two subsequent iterations (there still may be some
222        // readers present since we did the previous refresh)
223        //
224        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
225        // only block on pre-existing readers, and they are never waiting to push onto epochs
226        // unless they have finished reading.
227        let epochs = Arc::clone(&self.epochs);
228        let mut epochs = epochs.lock().unwrap();
229
230        self.wait(&mut epochs);
231
232        {
233            // all the readers have left!
234            // we can safely bring the w_handle up to date.
235            let w_handle = self.w_handle.as_mut().unwrap();
236
237            if self.second {
238                // before the first refresh, all writes went directly to w_handle. then, at the
239                // first refresh, r_handle and w_handle were swapped. thus, the w_handle we
240                // have now is empty, *and* none of the writes in r_handle are in the oplog.
241                // we therefore have to first clone the entire state of the current r_handle
242                // and make that w_handle, and *then* replay the oplog (which holds writes
243                // following the first refresh).
244                //
245                // this may seem unnecessarily complex, but it has the major advantage that it
246                // is relatively efficient to do lots of writes to the evmap at startup to
247                // populate it, and then refresh().
248                let r_handle = unsafe {
249                    self.r_handle
250                        .inner
251                        .load(atomic::Ordering::Relaxed)
252                        .as_mut()
253                        .unwrap()
254                };
255
256                // XXX: it really is too bad that we can't just .clone() the data here and save
257                // ourselves a lot of re-hashing, re-bucketization, etc.
258                w_handle.data.extend(r_handle.data.iter().map(|(k, vs)| {
259                    (
260                        k.clone(),
261                        Values::from_iter(
262                            vs.iter().map(|v| unsafe { (&**v).shallow_copy() }),
263                            r_handle.data.hasher(),
264                        ),
265                    )
266                }));
267            }
268
269            // safety: we will not swap while we hold this reference
270            let r_hasher = unsafe { self.r_handle.hasher() };
271
272            // the w_handle map has not seen any of the writes in the oplog
273            // the r_handle map has not seen any of the writes following swap_index
274            if self.swap_index != 0 {
275                // we can drain out the operations that only the w_handle map needs
276                //
277                // NOTE: the if above is because drain(0..0) would remove 0
278                //
279                // NOTE: the distinction between apply_first and apply_second is the reason why our
280                // use of shallow_copy is safe. we apply each op in the oplog twice, first with
281                // apply_first, and then with apply_second. on apply_first, no destructors are
282                // called for removed values (since those values all still exist in the other map),
283                // and all new values are shallow copied in (since we need the original for the
284                // other map). on apply_second, we call the destructor for anything that's been
285                // removed (since those removals have already happened on the other map, and
286                // without calling their destructor).
287                for op in self.oplog.drain(0..self.swap_index) {
288                    // because we are applying second, we _do_ want to perform drops
289                    Self::apply_second(unsafe { w_handle.do_drop() }, op, r_hasher);
290                }
291            }
292            // the rest have to be cloned because they'll also be needed by the r_handle map
293            for op in self.oplog.iter_mut() {
294                Self::apply_first(w_handle, op, r_hasher);
295            }
296            // the w_handle map is about to become the r_handle, and can ignore the oplog
297            self.swap_index = self.oplog.len();
298            // ensure meta-information is up to date
299            w_handle.meta = self.meta.clone();
300            w_handle.mark_ready();
301
302            // w_handle (the old r_handle) is now fully up to date!
303        }
304
305        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
306        // writes. the stale r_handle is accessed by readers through an Arc clone of atomic pointer
307        // inside the ReadHandle. oplog contains all the changes that are in w_handle, but not in
308        // r_handle.
309        //
310        // it's now time for us to swap the maps so that readers see up-to-date results from
311        // w_handle.
312
313        // prepare w_handle
314        let w_handle = self.w_handle.take().unwrap();
315        let w_handle = Box::into_raw(w_handle);
316
317        // swap in our w_handle, and get r_handle in return
318        let r_handle = self
319            .r_handle
320            .inner
321            .swap(w_handle, atomic::Ordering::Release);
322        let r_handle = unsafe { Box::from_raw(r_handle) };
323
324        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
325        atomic::fence(atomic::Ordering::SeqCst);
326
327        for (ri, epoch) in epochs.iter() {
328            self.last_epochs[ri] = epoch.load(atomic::Ordering::Acquire);
329        }
330
331        // NOTE: at this point, there are likely still readers using the w_handle we got
332        self.w_handle = Some(r_handle);
333        self.second = self.first;
334        self.first = false;
335
336        self
337    }
338
339    /// Gives the sequence of operations that have not yet been applied.
340    ///
341    /// Note that until the *first* call to `refresh`, the sequence of operations is always empty.
342    ///
343    /// ```
344    /// # use evmap::Operation;
345    /// let x = ('x', 42);
346    ///
347    /// let (r, mut w) = evmap::new();
348    ///
349    /// // before the first refresh, no oplog is kept
350    /// w.refresh();
351    ///
352    /// assert_eq!(w.pending(), &[]);
353    /// w.insert(x.0, x);
354    /// assert_eq!(w.pending(), &[Operation::Add(x.0, x)]);
355    /// w.refresh();
356    /// w.remove(x.0, x);
357    /// assert_eq!(w.pending(), &[Operation::Remove(x.0, x)]);
358    /// w.refresh();
359    /// assert_eq!(w.pending(), &[]);
360    /// ```
361    pub fn pending(&self) -> &[Operation<K, V>] {
362        &self.oplog[self.swap_index..]
363    }
364
365    /// Refresh as necessary to ensure that all operations are visible to readers.
366    ///
367    /// `WriteHandle::refresh` will *always* wait for old readers to depart and swap the maps.
368    /// This method will only do so if there are pending operations.
369    pub fn flush(&mut self) -> &mut Self {
370        if !self.pending().is_empty() {
371            self.refresh();
372        }
373
374        self
375    }
376
377    /// Set the metadata.
378    ///
379    /// Will only be visible to readers after the next call to `refresh()`.
380    pub fn set_meta(&mut self, mut meta: M) -> M {
381        mem::swap(&mut self.meta, &mut meta);
382        meta
383    }
384
385    fn add_op(&mut self, op: Operation<K, V>) -> &mut Self {
386        if !self.first {
387            self.oplog.push(op);
388        } else {
389            // we know there are no outstanding w_handle readers, so we can modify it directly!
390            let w_inner = self.w_handle.as_mut().unwrap();
391            let r_hasher = unsafe { self.r_handle.hasher() };
392            // because we are applying second, we _do_ want to perform drops
393            Self::apply_second(unsafe { w_inner.do_drop() }, op, r_hasher);
394            // NOTE: since we didn't record this in the oplog, r_handle *must* clone w_handle
395        }
396
397        self
398    }
399
400    /// Add the given value to the value-bag of the given key.
401    ///
402    /// The updated value-bag will only be visible to readers after the next call to `refresh()`.
403    pub fn insert(&mut self, k: K, v: V) -> &mut Self {
404        self.add_op(Operation::Add(k, v))
405    }
406
407    /// Replace the value-bag of the given key with the given value.
408    ///
409    /// Replacing the value will automatically deallocate any heap storage and place the new value
410    /// back into the `SmallVec` inline storage. This can improve cache locality for common
411    /// cases where the value-bag is only ever a single element.
412    ///
413    /// See [the doc section on this](./index.html#small-vector-optimization) for more information.
414    ///
415    /// The new value will only be visible to readers after the next call to `refresh()`.
416    pub fn update(&mut self, k: K, v: V) -> &mut Self {
417        self.add_op(Operation::Replace(k, v))
418    }
419
420    /// Clear the value-bag of the given key, without removing it.
421    ///
422    /// If a value-bag already exists, this will clear it but leave the
423    /// allocated memory intact for reuse, or if no associated value-bag exists
424    /// an empty value-bag will be created for the given key.
425    ///
426    /// The new value will only be visible to readers after the next call to `refresh()`.
427    pub fn clear(&mut self, k: K) -> &mut Self {
428        self.add_op(Operation::Clear(k))
429    }
430
431    /// Remove the given value from the value-bag of the given key.
432    ///
433    /// The updated value-bag will only be visible to readers after the next call to `refresh()`.
434    pub fn remove(&mut self, k: K, v: V) -> &mut Self {
435        self.add_op(Operation::Remove(k, v))
436    }
437
438    /// Remove the value-bag for the given key.
439    ///
440    /// The value-bag will only disappear from readers after the next call to `refresh()`.
441    pub fn empty(&mut self, k: K) -> &mut Self {
442        self.add_op(Operation::Empty(k))
443    }
444
445    /// Purge all value-bags from the map.
446    ///
447    /// The map will only appear empty to readers after the next call to `refresh()`.
448    ///
449    /// Note that this will iterate once over all the keys internally.
450    pub fn purge(&mut self) -> &mut Self {
451        self.add_op(Operation::Purge)
452    }
453
454    /// Retain elements for the given key using the provided predicate function.
455    ///
456    /// The remaining value-bag will only be visible to readers after the next call to `refresh()`
457    ///
458    /// # Safety
459    ///
460    /// The given closure is called _twice_ for each element, once when called, and once
461    /// on swap. It _must_ retain the same elements each time, otherwise a value may exist in one
462    /// map, but not the other, leaving the two maps permanently out-of-sync. This is _really_ bad,
463    /// as values are aliased between the maps, and are assumed safe to free when they leave the
464    /// map during a `refresh`. Returning `true` when `retain` is first called for a value, and
465    /// `false` the second time would free the value, but leave an aliased pointer to it in the
466    /// other side of the map.
467    ///
468    /// The arguments to the predicate function are the current value in the value-bag, and `true`
469    /// if this is the first value in the value-bag on the second map, or `false` otherwise. Use
470    /// the second argument to know when to reset any closure-local state to ensure deterministic
471    /// operation.
472    ///
473    /// So, stated plainly, the given closure _must_ return the same order of true/false for each
474    /// of the two iterations over the value-bag. That is, the sequence of returned booleans before
475    /// the second argument is true must be exactly equal to the sequence of returned booleans
476    /// at and beyond when the second argument is true.
477    pub unsafe fn retain<F>(&mut self, k: K, f: F) -> &mut Self
478    where
479        F: FnMut(&V, bool) -> bool + 'static + Send,
480    {
481        self.add_op(Operation::Retain(k, Predicate(Box::new(f))))
482    }
483
484    /// Shrinks a value-bag to it's minimum necessary size, freeing memory
485    /// and potentially improving cache locality by switching to inline storage.
486    ///
487    /// The optimized value-bag will only be visible to readers after the next call to `refresh()`
488    pub fn fit(&mut self, k: K) -> &mut Self {
489        self.add_op(Operation::Fit(Some(k)))
490    }
491
492    /// Like [`WriteHandle::fit`](#method.fit), but shrinks <b>all</b> value-bags in the map.
493    ///
494    /// The optimized value-bags will only be visible to readers after the next call to `refresh()`
495    pub fn fit_all(&mut self) -> &mut Self {
496        self.add_op(Operation::Fit(None))
497    }
498
499    /// Reserves capacity for some number of additional elements in a value-bag,
500    /// or creates an empty value-bag for this key with the given capacity if
501    /// it doesn't already exist.
502    ///
503    /// Readers are unaffected by this operation, but it can improve performance
504    /// by pre-allocating space for large value-bags.
505    pub fn reserve(&mut self, k: K, additional: usize) -> &mut Self {
506        self.add_op(Operation::Reserve(k, additional))
507    }
508
509    #[cfg(feature = "indexed")]
510    /// Remove the value-bag for a key at a specified index.
511    ///
512    /// This is effectively random removal.
513    /// The value-bag will only disappear from readers after the next call to `refresh()`.
514    ///
515    /// Note that this does a _swap-remove_, so use it carefully.
516    pub fn empty_at_index(&mut self, index: usize) -> Option<(&K, &Values<V, S>)> {
517        self.add_op(Operation::EmptyRandom(index));
518        // the actual emptying won't happen until refresh(), which needs &mut self
519        // so it's okay for us to return the references here
520
521        // NOTE to future zealots intent on removing the unsafe code: it's **NOT** okay to use
522        // self.w_handle here, since that does not have all pending operations applied to it yet.
523        // Specifically, there may be an *eviction* pending that already has been applied to the
524        // r_handle side, but not to the w_handle (will be applied on the next swap). We must make
525        // sure that we read the most up to date map here.
526        let inner = self.r_handle.inner.load(atomic::Ordering::SeqCst);
527        unsafe { (*inner).data.get_index(index) }.map(|(k, vs)| (k, vs.user_friendly()))
528    }
529
530    /// Apply ops in such a way that no values are dropped, only forgotten
531    fn apply_first(
532        inner: &mut Inner<K, ManuallyDrop<V>, M, S>,
533        op: &mut Operation<K, V>,
534        hasher: &S,
535    ) {
536        match *op {
537            Operation::Replace(ref key, ref mut value) => {
538                let vs = inner.data.entry(key.clone()).or_insert_with(Values::new);
539
540                // truncate vector
541                vs.clear();
542
543                // implicit shrink_to_fit on replace op
544                // so it will switch back to inline allocation for the subsequent push.
545                vs.shrink_to_fit();
546
547                vs.push(unsafe { value.shallow_copy() }, hasher);
548            }
549            Operation::Clear(ref key) => {
550                inner
551                    .data
552                    .entry(key.clone())
553                    .or_insert_with(Values::new)
554                    .clear();
555            }
556            Operation::Add(ref key, ref mut value) => {
557                inner
558                    .data
559                    .entry(key.clone())
560                    .or_insert_with(Values::new)
561                    .push(unsafe { value.shallow_copy() }, hasher);
562            }
563            Operation::Empty(ref key) => {
564                #[cfg(not(feature = "indexed"))]
565                inner.data.remove(key);
566                #[cfg(feature = "indexed")]
567                inner.data.swap_remove(key);
568            }
569            Operation::Purge => {
570                inner.data.clear();
571            }
572            #[cfg(feature = "indexed")]
573            Operation::EmptyRandom(index) => {
574                inner.data.swap_remove_index(index);
575            }
576            Operation::Remove(ref key, ref value) => {
577                if let Some(e) = inner.data.get_mut(key) {
578                    // remove a matching value from the value set
579                    // safety: this is fine
580                    e.swap_remove(unsafe { &*(value as *const _ as *const ManuallyDrop<V>) });
581                }
582            }
583            Operation::Retain(ref key, ref mut predicate) => {
584                if let Some(e) = inner.data.get_mut(key) {
585                    let mut first = true;
586                    e.retain(move |v| {
587                        let retain = predicate.eval(v, first);
588                        first = false;
589                        retain
590                    });
591                }
592            }
593            Operation::Fit(ref key) => match key {
594                Some(ref key) => {
595                    if let Some(e) = inner.data.get_mut(key) {
596                        e.shrink_to_fit();
597                    }
598                }
599                None => {
600                    for value_set in inner.data.values_mut() {
601                        value_set.shrink_to_fit();
602                    }
603                }
604            },
605            Operation::Reserve(ref key, additional) => match inner.data.entry(key.clone()) {
606                Entry::Occupied(mut entry) => {
607                    entry.get_mut().reserve(additional, hasher);
608                }
609                Entry::Vacant(entry) => {
610                    entry.insert(Values::with_capacity_and_hasher(additional, hasher));
611                }
612            },
613        }
614    }
615
616    /// Apply operations while allowing dropping of values
617    fn apply_second(inner: &mut Inner<K, V, M, S>, op: Operation<K, V>, hasher: &S) {
618        match op {
619            Operation::Replace(key, value) => {
620                let v = inner.data.entry(key).or_insert_with(Values::new);
621
622                // we are going second, so we should drop!
623                v.clear();
624
625                v.shrink_to_fit();
626
627                v.push(value, hasher);
628            }
629            Operation::Clear(key) => {
630                inner.data.entry(key).or_insert_with(Values::new).clear();
631            }
632            Operation::Add(key, value) => {
633                inner
634                    .data
635                    .entry(key)
636                    .or_insert_with(Values::new)
637                    .push(value, hasher);
638            }
639            Operation::Empty(key) => {
640                #[cfg(not(feature = "indexed"))]
641                inner.data.remove(&key);
642                #[cfg(feature = "indexed")]
643                inner.data.swap_remove(&key);
644            }
645            Operation::Purge => {
646                inner.data.clear();
647            }
648            #[cfg(feature = "indexed")]
649            Operation::EmptyRandom(index) => {
650                inner.data.swap_remove_index(index);
651            }
652            Operation::Remove(key, value) => {
653                if let Some(e) = inner.data.get_mut(&key) {
654                    // find the first entry that matches all fields
655                    e.swap_remove(&value);
656                }
657            }
658            Operation::Retain(key, mut predicate) => {
659                if let Some(e) = inner.data.get_mut(&key) {
660                    let mut first = true;
661                    e.retain(move |v| {
662                        let retain = predicate.eval(v, first);
663                        first = false;
664                        retain
665                    });
666                }
667            }
668            Operation::Fit(key) => match key {
669                Some(ref key) => {
670                    if let Some(e) = inner.data.get_mut(key) {
671                        e.shrink_to_fit();
672                    }
673                }
674                None => {
675                    for value_set in inner.data.values_mut() {
676                        value_set.shrink_to_fit();
677                    }
678                }
679            },
680            Operation::Reserve(key, additional) => match inner.data.entry(key) {
681                Entry::Occupied(mut entry) => {
682                    entry.get_mut().reserve(additional, hasher);
683                }
684                Entry::Vacant(entry) => {
685                    entry.insert(Values::with_capacity_and_hasher(additional, hasher));
686                }
687            },
688        }
689    }
690}
691
692impl<K, V, M, S> Extend<(K, V)> for WriteHandle<K, V, M, S>
693where
694    K: Eq + Hash + Clone,
695    S: BuildHasher + Clone,
696    V: Eq + Hash + ShallowCopy,
697    M: 'static + Clone,
698{
699    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
700        for (k, v) in iter {
701            self.insert(k, v);
702        }
703    }
704}
705
706// allow using write handle for reads
707use std::ops::Deref;
708impl<K, V, M, S> Deref for WriteHandle<K, V, M, S>
709where
710    K: Eq + Hash + Clone,
711    S: BuildHasher + Clone,
712    V: Eq + Hash + ShallowCopy,
713    M: 'static + Clone,
714{
715    type Target = ReadHandle<K, V, M, S>;
716    fn deref(&self) -> &Self::Target {
717        &self.r_handle
718    }
719}