1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
//! A generic, thread-safe, in-memory key-value data store.
//!
//! As indicated by the name, the `Library` struct is the main export of
//! this module. Some supporting types and traits are also exported.
//!
//! The `Library` structure supports two operations: `get` and `insert`, which fill similar roles to
//! their counterparts in the `HashMap` collection.
//! `get` accepts a key and returns the value stored under it, if any.
//! `insert` updates the provided key in the data store with the provided value.
//!
//! The thread safety property of the `Library` is the result of wrapping multiple concurrency
//! primitives: `Arc`, `ArcCell`, and `Mutex`.
//!
//! `Mutex` and `Arc` are part of the standard library. We use the mutex to prevent multiple writers
//! from adding a new key simultaneously. Note: this is subtly, but significantly, different from
//! preventing multiple insertions of the same key.
//!
//! ## Insertion
//!
//! There are two different scenarios to consider: inserting a new value under a new key, and
//! inserting a new value for an existing key.
//!
//! ### New value under new key
//!
//! Inserting a value with a new key requires allocating additional space in the `HashMap` and
//! potentially rearranging the underlying data. To prevent consistency errors the `Library` has an
//! internal `Mutex` (`Library.insert_mutex`) which must be obtained before inserting a key.
//!
//! ```
//! use concurrent_kv::Library;
//!
//! let lib: Library<String, i64> = Library::new();
//! lib.insert("qwerty".into(), 12345);
//! let val0 = lib.get("qwerty").unwrap();
//! lib.insert("asdfgh".into(), 67890);
//! let val1 = lib.get("asdfgh").unwrap();
//! assert_eq!(val0, 12345.into());
//! assert_eq!(val1, 67890.into());
//! ```
//!
//! ### New value under existing key
//!
//! Since the key already exists the `HashMap` does not need to allocate any additional storage (we
//! are just swapping the contents of an ArcCell). So we can short-circuit the insertion process,
//! and thus skip lock acquisition, by obtaining a reference to the ArcCell and swapping directly.
//!
//! This tradeoff for performance is what introduces the "Last Writer Wins" behavior for multiple
//! insertions to the same key.
//!
//! ```
//! use concurrent_kv::Library;
//! use std::sync::Arc;
//!
//! let lib0: Arc<Library<String, i64>> = Library::new().into();
//! let val0 = lib0.get("abc");
//! assert_eq!(val0, None.into());
//! let lib1 = lib0.clone();
//! lib0.insert("abc".into(), 123);
//! let val123 = lib1.get("abc");
//! assert_eq!(val123, lib0.get("abc"));
//! assert_eq!(val0, None.into());
//! lib1.insert("abc".into(), 456);
//! let val456 = lib1.get("abc");
//! assert_eq!(val456, Some(456.into()));
//! assert_eq!(val123, Some(123.into()));
//! ```
//!
//! ## `ArcCell`
//!
//! [`ArcCell`](https://github.com/aturon/crossbeam/blob/master/src/sync/arc_cell.rs) is provided by
//! the crossbeam crate. The naming and documentation are atrocious, so we attempt to provide an
//! explanation here.
//!
//! As the figure below attempts to depict, the defining feature of this type is the ability to swap
//! out the contents of the heap allocated value (i.e. `Cell`) atomically. So a more accurate name
//! would be `AtomicCell`.
//!
//! ```text
//!          A ----> N
//!   A -\
//!   A ---> A ----> O
//!   A --/      /
//!            A-
//!
//!          A -\ /- N
//!   A -\       X
//!   A ---> A -/ \- > O
//!   A --/       /
//!             A-
//!
//! see: https://internals.rust-lang.org/t/atomic-arc-swap/3588
//! ```
//!
//! ## Caveats
//!
//! It is up to the user of `Library` to ensure that at most one update to any individual key is in
//! progress at a time. Otherwise the `Library` falls back to the "Last Writer Wins" conflict
//! resolution strategy (hardly ever the desired behavior from an end user perspective).

extern crate crossbeam;

use std::collections::hash_map::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::borrow::Borrow;
use std::clone::Clone;
use crossbeam::sync::ArcCell;

/// The map type backing a `Library`: every key maps to a shared `ArcCell` (`Arc<ArcCell<V>>`)
/// that holds the value currently stored under that key.
pub type LibraryStore<K, V> = HashMap<K, Arc<ArcCell<V>>>;

/// Marker trait for types that can be used as keys of a `Library`.
///
/// It declares no methods of its own; it only bundles the `Eq`, `Hash` and `Clone` bounds
/// under a single name.
pub trait LibraryKey: Clone + ::std::hash::Hash + Eq {}

/// Blanket implementation: any type that is `Eq + Hash + Clone` is automatically a `LibraryKey`.
impl<K> LibraryKey for K where K: Clone + ::std::hash::Hash + Eq {}

/// A key-value store that is read and updated through shared (`&self`) references.
///
/// Keys must satisfy `LibraryKey`; values are handed out to readers as `Arc<V>`.
pub struct Library<K: LibraryKey, V> {
    /// Swappable handle to the current `LibraryStore`; replaced as a whole when a new key is added.
    internal_data: ArcCell<LibraryStore<K, V>>,
    /// Held by `insert` while a previously unknown key is being added. It guards no data itself.
    insert_mutex: Mutex<()>,
}

impl<K, V> ::std::default::Default for Library<K, V> where K: LibraryKey {
    fn default() -> Library<K, V> {
        Library {
            internal_data: ArcCell::new(HashMap::new().into()),
            insert_mutex: Mutex::new(()),
        }
    }
}

impl<K, V> Library<K, V> where K: LibraryKey {
    /// Creates an empty `Library` (same as `Library::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty `Library` whose initial backing map is built with
    /// `HashMap::with_capacity(capacity)`.
    pub fn with_capacity(capacity: usize) -> Self {
        Library {
            internal_data: ArcCell::new(Arc::new(HashMap::with_capacity(capacity))),
            insert_mutex: Mutex::new(()),
        }
    }

    /// Returns a handle to the backing store as it is right now.
    fn internal_data(&self) -> Arc<LibraryStore<K, V>> {
        self.internal_data.get()
    }

    /// Looks up `key` and returns the value currently stored under it, or `None` if the key has
    /// never been inserted.
    ///
    /// Like `HashMap::get`, the key may be any borrowed form of `K` (e.g. `&str` for `String`).
    pub fn get<Q: ?Sized>(&self, key: &Q) -> Option<Arc<V>>
    where K: Borrow<Q>, Q: ::std::hash::Hash + Eq {
        self.internal_data().get(key).map(|cell| cell.get())
    }

    /// Swaps `value` into the cell stored under `key`, if `store` already contains that key.
    ///
    /// Returns `Ok(())` when the value was stored. When the key is absent the untouched value is
    /// handed back as `Err(value)` so the caller can insert it through the slow path.
    fn replace_existing(store: &LibraryStore<K, V>, key: &K, value: V) -> Result<(), V> {
        match store.get(key) {
            Some(cell) => {
                cell.set(Arc::new(value));
                Ok(())
            }
            None => Err(value),
        }
    }

    /// Stores `value` under `key`, replacing any value previously stored there.
    ///
    /// * Existing key: the value is swapped directly into the key's `ArcCell` without taking the
    ///   insertion lock. Concurrent writers to the same key are not coordinated with each other.
    /// * New key: the insertion lock is taken, the current map is cloned, the new entry is added
    ///   to the clone, and the clone replaces the current map.
    ///
    /// A poisoned insertion lock is recovered rather than propagated as a panic: the mutex guards
    /// no data, and the backing map is only ever replaced by the single `set` call at the end of
    /// this function, so a panic in another inserter cannot leave a half-updated map behind.
    pub fn insert(&self, key: K, value: V) {
        // Fast path: short circuit if the key already exists.
        let value = match Self::replace_existing(&self.internal_data(), &key, value) {
            Ok(()) => return,
            Err(value) => value,
        };

        // Obtain the lock (released at end of function scope).
        let _guard = self.insert_mutex
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // Re-read the store under the lock: another writer may have added `key` between the
        // fast-path check above and the lock acquisition.
        let store = self.internal_data();
        let value = match Self::replace_existing(&store, &key, value) {
            Ok(()) => return,
            Err(value) => value,
        };

        // Note: Potential room for future optimization if HashMap ever provides a custom
        // implementation of `clone_from` that re-uses the allocated memory of `self`.
        //
        // If it did, we could create `new_hash` prior to taking the lock:
        //
        //     let mut new_hash: LibraryStore<K, V> = LibraryStore::with_capacity(store.capacity());
        //     // Ensure new_hash can have an additional element without resizing
        //     new_hash.reserve(1);
        //
        // and then, after the lock, avoid having to allocate memory (unless a bajillion elements
        // were added between when we grabbed the hash map and when we locked the Library):
        //
        //     new_hash.clone_from(&store);
        //     new_hash.insert(...);
        //
        let mut new_hash: LibraryStore<K, V> = (*store).clone();
        new_hash.insert(key, Arc::new(ArcCell::new(Arc::new(value))));

        // Publish the extended map.
        self.internal_data.set(Arc::new(new_hash));
    }

    /// Calls `closure` with every key and its current value, and concatenates the returned
    /// strings in the iteration order of the backing `HashMap`.
    pub fn map_concat<F>(&self, closure: F) -> String // TODO? Make generic over "things that can be concatenated/appended".
        where F : Fn(&K, &V) -> String {
        let mut s = String::new();
        for (key, val) in self.internal_data().iter() {
            s.push_str(&closure(key, val.get().deref()));
        }
        s
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ::std::sync::Arc;

    // A freshly inserted value can be read back.
    #[test]
    fn basic() {
        let library: Arc<Library<String, i64>> = Arc::new(Library::new());
        library.insert(String::from("abc"), 123);

        let found = library.get(&String::from("abc"));
        assert_eq!(found, Some(Arc::new(123)));
    }

    // Values handed out before an update keep their old contents after the key is overwritten.
    #[test]
    fn old_value_replace_same_key() {
        let first_handle: Arc<Library<String, i64>> = Arc::new(Library::new());
        let before_insert = first_handle.get("abc");
        assert_eq!(before_insert, None);

        let second_handle = Arc::clone(&first_handle);
        first_handle.insert(String::from("abc"), 123);
        let after_first = second_handle.get("abc");
        assert_eq!(after_first, first_handle.get("abc"));
        assert_eq!(before_insert, None);

        second_handle.insert(String::from("abc"), 456);
        let after_second = second_handle.get("abc");
        assert_eq!(after_second, Some(Arc::new(456)));
        assert_eq!(after_first, Some(Arc::new(123)));
    }

    // Values handed out earlier stay valid after a different key is added.
    #[test]
    fn old_value_insert_new_key() {
        let library: Library<String, i64> = Library::new();

        library.insert(String::from("qwerty"), 12345);
        let first = library.get("qwerty").unwrap();

        library.insert(String::from("asdfgh"), 67890);
        let second = library.get("asdfgh").unwrap();

        assert_eq!(first, Arc::new(12345));
        assert_eq!(second, Arc::new(67890));
    }

    // `map_concat` visits every entry; the order is unspecified, so only containment is checked.
    #[test]
    fn render_all_values() {
        let library: Library<String, String> = Library::new();

        library.insert(String::from("I"), String::from("hope"));
        library.insert(String::from("this"), String::from("works"));
        library.insert(String::from("!"), String::from("!!"));

        let rendered = library.map_concat(|k, v| format!("\"{}\": \"{}\"\n", k, v));

        assert!(rendered.contains("\"I\": \"hope\""));
        assert!(rendered.contains("\"!\": \"!!\""));
        assert!(rendered.contains("\"this\": \"works\""));
    }
}