1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
//! [StateHashTable] solves the rendezvous problem.
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use sync42::state_hash_table::{Handle, Key, Value, StateHashTable};
//!
//! #[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
//! struct SampleKey {
//!     key: u64,
//! }
//!
//! impl SampleKey {
//!     const fn new(key: u64) -> Self {
//!         Self {
//!             key,
//!         }
//!     }
//! }
//!
//! impl Key for SampleKey {
//! }
//!
//! #[derive(Debug, Default)]
//! struct SampleValue {
//!     finished: AtomicBool,
//! }
//!
//! impl From<SampleKey> for SampleValue {
//!     fn from(key: SampleKey) -> Self {
//!         Self {
//!             finished: AtomicBool::default(),
//!         }
//!     }
//! }
//!
//! impl Value for SampleValue {
//!     fn finished(&self) -> bool { self.finished.load(Ordering::Relaxed) }
//! }
//!
//! // Create the state hash table.  This should be a global-ish structure.
//! let mut sht: StateHashTable<SampleKey, SampleValue> = StateHashTable::new();
//! // Everything revolves around the key.  We don't demonstrate this, but different keys are
//! // totally partitioned and do not interact except to contend on a shared lock.
//! const KEY: SampleKey = SampleKey::new(42);
//!
//! // There's nothing there until we create it.
//! assert!(sht.get_state(KEY).is_none());
//! let mut state1 = sht.create_state(KEY);
//! assert!(state1.is_some());
//! let mut state1 = state1.unwrap();
//!
//! // Attempts to create twice fail with None.
//! let mut state2 = sht.create_state(KEY);
//! assert!(state2.is_none());
//!
//! // But get_state will work.
//! let mut state3 = sht.get_state(KEY);
//! assert!(state3.is_some());
//! let mut state3 = state3.unwrap();
//!
//! // It is guaranteed that when two threads hold references to the same hash table and use [Eq]
//! // keys, their handles refer to the same underlying value.
//!
//! assert!(Handle::is_same(&state1, &state3));
//!
//! // It is also guaranteed that when every handle is dropped while the work is unfinished, the
//! // value persists in the table.  At that point no handle refers to the state, yet it remains
//! // in the table and can be retrieved again.
//! drop(state1);
//! drop(state3);
//!
//! // Notice that we use [get_state] here.  It uses the existing state.
//! let mut state4 = sht.get_state(KEY);
//! assert!(state4.is_some());
//! let mut state4 = state4.unwrap();
//! state4.finished.store(true, Ordering::Relaxed);
//!
//! // Drop the remaining references.
//! drop(state4);
//!
//! // Get state fails because we marked it finished and dropped all references.  Only when the
//! // last reference is dropped will the item be collected, even if the outcome of the
//! // [finished()] call changes.
//! let mut state5 = sht.get_state(KEY);
//! assert!(state5.is_none());
//! ```

use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

use biometrics::{Collector, Counter};

//////////////////////////////////////////// biometrics ////////////////////////////////////////////

/// Clicked each time `StateHashTable::new` runs.
static NEW_STATE_HASH_TABLE: Counter = Counter::new("sync42.state_hash_table.new");

/// Clicked whenever `create_state` or `get_or_create_state` inserts a new entry into the map.
static ENTRY_INSERTED: Counter = Counter::new("sync42.state_hash_table.inserted");
/// Clicked when dropping a handle removes its entry from the map.
static ENTRY_REMOVED: Counter = Counter::new("sync42.state_hash_table.removed");
/// Clicked on every call to `arbitary_key`, whether or not a key is returned.
static ARBITRARY_KEY: Counter = Counter::new("sync42.state_hash_table.arbitrary_key");

/// Register the biometrics for state hash table.
pub fn register_biometrics(collector: &Collector) {
    collector.register_counter(&ENTRY_INSERTED);
    collector.register_counter(&ENTRY_REMOVED);
    collector.register_counter(&ARBITRARY_KEY);
}

//////////////////////////////////////////////// Key ///////////////////////////////////////////////

/// Marker trait for types that may be used as keys of a state hash table.
///
/// The trait has no methods of its own; it only gathers the bounds a key must satisfy: it can be
/// hashed and compared for equality, cloned, and printed for debugging.
pub trait Key: Eq + PartialEq + Hash + Clone + Debug {}

impl Key for String {}
impl Key for u64 {}

/////////////////////////////////////////////// Value //////////////////////////////////////////////

/// A value for a state hash table.
///
/// Every value type must also implement `Default`.
pub trait Value: Default {
    /// True iff the value is at a quiescent/finished state.  This means it can be collected, not
    /// that it will be collected.  It is perfectly acceptable to pick up a handle to finished
    /// state and take a transition that leads to it being unfinished.  Consequently, finished
    /// should be evaluated under mutual exclusion.  The way we do this is to hold the map's lock
    /// and check that we hold the only dereferenceable copy (there's another in the map, but the
    /// lock prevents anyone else from reaching it).  Because it is called with the table's lock
    /// held, this should be a fast computation.
    fn finished(&self) -> bool;
}

////////////////////////////////////////////// Handle //////////////////////////////////////////////

/// A Handle holds a reference to a key-value pair in a table.  Two handles that come from the same
/// table and key are guaranteed to refer to the same piece of state.
///
/// A handle dereferences to the value it holds.  Dropping a handle takes the table's lock and may
/// remove the entry from the table if the value reports itself finished.
pub struct Handle<'a, K: Key, V: Value> {
    // The table this handle was obtained from; its lock is taken again when the handle drops.
    table: &'a StateHashTable<K, V>,
    // The key under which `value` is stored in `table`.
    key: K,
    // Shared ownership of the state; the table's map holds another clone of the same `Arc`.
    value: Arc<V>,
}

impl<'a, K: Key, V: Value> Handle<'a, K, V> {
    fn new(table: &'a StateHashTable<K, V>, key: K, value: Arc<V>) -> Self {
        Self { table, key, value }
    }

    /// True if and only if both handles point to the same table and state.
    pub fn is_same(lhs: &Self, rhs: &Self) -> bool {
        std::ptr::eq(lhs.table, rhs.table)
            && lhs.key == rhs.key
            && Arc::ptr_eq(&lhs.value, &rhs.value)
    }
}

impl<'a, K: Key, V: Value> Deref for Handle<'a, K, V> {
    type Target = V;

    /// Borrow the value this handle refers to.
    fn deref(&self) -> &Self::Target {
        self.value.as_ref()
    }
}

impl<'a, K: Key, V: Value> Drop for Handle<'a, K, V> {
    /// Release this handle, removing the entry from the table when this is the last handle and
    /// the value reports itself finished.
    fn drop(&mut self) {
        let mut map = self.table.entries.lock().unwrap();
        // A strong count of exactly two means the only references are this handle and the copy
        // stored in the map.  New handles are only minted under the map's lock, which we hold, so
        // nobody else can obtain the value while we decide.
        let last_handle = Arc::strong_count(&self.value) == 2;
        // finished() is deliberately called with the lock held; the Value contract asks for it to
        // be fast.  It is only consulted when we are the last handle.
        if last_handle && V::finished(&self.value) {
            ENTRY_REMOVED.click();
            map.remove(&self.key);
        }
        // Nothing else to do here: with a higher count another handle is still live and will make
        // this same check when it drops; an unfinished value intentionally stays in the table.
        //
        // NOTE(review): `self.value` itself is released only after this function returns, i.e.
        // after the lock guard above has been dropped.  Two handles dropped concurrently could
        // therefore both observe a count above two and leave a finished entry in the map until
        // the next handle for that key is dropped -- confirm whether that is acceptable.
    }
}

////////////////////////////////////////// StateHashTable //////////////////////////////////////////

/// StateHashTable is the main collection.
///
/// It maps keys to shared values and hands out [`Handle`]s to them.  All access to the map goes
/// through a single mutex.
pub struct StateHashTable<K: Key, V: Value> {
    // Every entry currently in the table.  Handles hold clones of the same `Arc`s, and the strong
    // count of an entry is inspected under this lock when a handle is dropped.
    entries: Mutex<HashMap<K, Arc<V>>>,
}

impl<K: Key, V: Value> StateHashTable<K, V> {
    /// Create a new, empty StateHashTable.  This should be an infrequent operation.
    pub fn new() -> Self {
        NEW_STATE_HASH_TABLE.click();
        let entries = Mutex::new(HashMap::new());
        Self { entries }
    }

    /// Return a clone of some key currently in the table, or None when the table is empty.  Which
    /// key is returned is whatever the map's iterator yields first, so callers should treat it as
    /// arbitrary.  This is meant to be used for draining a server of waiters.
    ///
    /// NOTE: the method name is misspelled ("arbitary"); it is kept as-is for API compatibility.
    pub fn arbitary_key(&self) -> Option<K> {
        ARBITRARY_KEY.click();
        let entries = self.entries.lock().unwrap();
        entries.keys().next().cloned()
    }

    /// Create the state for `key` and return a handle to it, or return None iff the table already
    /// holds state for `key`.
    ///
    /// The value is built with `V::from(key.clone())` before the table's lock is taken; when the
    /// key turns out to be present, that freshly built value is simply discarded.
    pub fn create_state<'a: 'b, 'b>(&'a self, key: K) -> Option<Handle<'a, K, V>>
    where
        V: From<K>,
    {
        let fresh = Arc::new(V::from(key.clone()));
        let mut entries = self.entries.lock().unwrap();
        if entries.contains_key(&key) {
            return None;
        }
        ENTRY_INSERTED.click();
        // One reference goes into the map, the other travels with the returned handle.
        entries.insert(key.clone(), Arc::clone(&fresh));
        Some(Handle::new(self, key, fresh))
    }

    /// Return an existing new piece of state, returning None iff there does not exist state for
    /// `key`.
    pub fn get_state<'a: 'b, 'b>(&'a self, key: K) -> Option<Handle<'b, K, V>> {
        let entries = self.entries.lock().unwrap();
        entries.get(&key).map(|value| Handle {
            table: self,
            key,
            value: Arc::clone(value),
        })
    }

    /// Fetch the state for `key`, creating and inserting it first when the table has none.  A
    /// handle is always returned.
    ///
    /// A new value is built with `V::from(key.clone())` only after a lookup has found no entry,
    /// and it is built while the table's lock is released.  The lookup is then repeated under the
    /// lock, so if the key has appeared in the meantime the existing value is returned and the
    /// freshly built one is discarded.
    pub fn get_or_create_state<'a: 'b, 'b>(&'a self, key: K) -> Handle<'b, K, V>
    where
        V: From<K>,
    {
        // A value built on a previous pass that is waiting to be inserted.
        let mut candidate: Option<Arc<V>> = None;

        loop {
            let mut entries = self.entries.lock().unwrap();
            if let Some(existing) = entries.get(&key) {
                let shared = Arc::clone(existing);
                return Handle::new(self, key, shared);
            }
            if let Some(fresh) = candidate.take() {
                ENTRY_INSERTED.click();
                entries.insert(key.clone(), Arc::clone(&fresh));
                return Handle::new(self, key, fresh);
            }
            // Nothing in the table and nothing built yet: unlock, construct, then look again.
            drop(entries);
            candidate = Some(Arc::new(V::from(key.clone())));
        }
    }
}

impl<K: Key, V: Value> Default for StateHashTable<K, V> {
    fn default() -> Self {
        Self::new()
    }
}