// mbarc_map/lib.rs

//! Implementation of a [Minimally-blocking, Atomic Reference Counted Map](MbarcMap).
//!
//! To break that down, the map at the heart of this crate achieves the following core goals:
//! - Minimally-blocking: a user should never need to wrap this map in a mutex; all internal mutexes are held for as short a duration as possible, and there are no deadlock cases.  Users only need to manually take locks on individual elements.
//! - Atomic Reference Counted: all data stored within the map is reference counted in a thread-safe manner, and it is safe to hold these references indefinitely.

7pub use fixed_address_continuous_allocation::*;
8
9pub use data_reference::*;
10pub use data_reference_generic::*;
11pub use minimally_blocking_atomic_reference_counted_map::*;
12
13mod fixed_address_continuous_allocation;
14
15mod data_holder;
16mod data_reference;
17mod data_reference_generic;
18
19mod minimally_blocking_atomic_reference_counted_map;
20
21#[cfg(test)]
22mod tests {
23	use rand::prelude::*;
24	use rand_chacha::ChaCha8Rng;
25	use rayon::prelude::*;
26	use std::any::TypeId;
27	use std::collections::HashMap;
28	use std::mem::size_of;
29	use std::ops::Deref;
30	use std::sync::{Arc, Mutex, MutexGuard};
31	use std::thread;
32
33	use super::*;
34
	/// Fixed-size, boxed array of `(key, value)` pairs used to pre-seed tests deterministically.
	type PreSeed<const N: usize> = Box<[(i64, i64); N]>;

	/// Seed shared by all tests so the reference `HashMap` and the `MbarcMap` see identical data.
	const FIXED_SEED: u64 = 0xDEADBEEF;
38
39	fn make_data_pairs<const N: usize>(seed: u64) -> PreSeed<N> {
40		let mut rng = ChaCha8Rng::seed_from_u64(seed);
41
42		let mut pairs = Box::new([(0i64, 0i64); N]);
43		for i in 0..N {
44			let a = rng.random_range(i64::MIN..i64::MAX);
45			let b = rng.random_range(i64::MIN..i64::MAX);
46			pairs[i] = (a, b);
47		}
48
49		pairs
50	}
51
52	#[test]
53	fn test_use_element_after_drop_one_value() {
54		let concurrent_hash = Arc::new(MbarcMap::new());
55
56		let key: i64 = 2;
57		let value: &str = "Hi";
58		concurrent_hash.insert(key, value);
59
60		let first_value: Option<DataReference<&str>> = concurrent_hash.get(&key);
61		drop(concurrent_hash);
62
63		let first_value: DataReference<&str> = first_value.unwrap();
64
65		assert_eq!(first_value.ref_count(), 1);
66		assert!(first_value.is_marked_deleted());
67
68		let first_value_lock: MutexGuard<&str> = first_value.lock().unwrap();
69		assert_eq!(*first_value_lock, "Hi");
70	}
71
72	#[test]
73	fn test_safe_to_use_element_after_map_is_dropped() {
74		const N: usize = 1000;
75
76		let source_data = make_data_pairs::<N>(FIXED_SEED);
77		let concurrent_hash = Arc::new(MbarcMap::new());
78
79		insert_several_threaded(&source_data, &concurrent_hash);
80
81		let first_value = concurrent_hash.get(&source_data[0].0);
82		assert!(first_value.is_some());
83
84		drop(concurrent_hash);
85		let first_value = first_value.unwrap();
86
87		//let raw_data = first_value.raw_data();
88		assert_eq!(first_value.ref_count(), 1);
89		assert!(first_value.is_marked_deleted());
90
91		let first_value_lock = first_value.lock().unwrap();
92		assert_eq!(*first_value_lock, source_data[0].1);
93	}
94
	/// Replays an insert-then-remove workload against both a sequential
	/// `HashMap` and a concurrently-mutated `MbarcMap`, then checks they agree.
	#[test]
	fn test_insert_remove_insert() {
		const STEP_SIZE: usize = 30000;
		// From this index onward, every insert is paired with removal of an older key.
		const START_REMOVING_INDEX: usize = 2 * STEP_SIZE;
		const N: usize = 3 * STEP_SIZE;

		let source_data = make_data_pairs::<N>(FIXED_SEED);
		let mut base_hash = Box::new(HashMap::new());
		let concurrent_hash = Arc::new(MbarcMap::new());

		// Reference run: sequentially insert all N pairs, removing the
		// (i - START_REMOVING_INDEX)-th key alongside each late insert.
		source_data.iter().enumerate().for_each(|(i, (k, v))| {
			if i >= START_REMOVING_INDEX {
				let removal_index = i - START_REMOVING_INDEX;
				let remove_key = source_data.get(removal_index).unwrap().0;

				base_hash.remove(&remove_key);
			}

			base_hash.insert(*k, *v);
		});

		let (initial_insertions, parallel_inserted_while_removing) =
			source_data.split_at(START_REMOVING_INDEX);

		// Concurrent run: first the plain inserts, sequentially...
		for (k, v) in initial_insertions {
			concurrent_hash.insert(*k, *v);
		}

		// ...then the paired remove+insert steps, executed from rayon workers.
		// Removed keys come from the first third of source_data and inserted keys
		// from the last third, so (with random i64 keys, collisions negligible)
		// parallel ordering does not affect the final contents.
		parallel_inserted_while_removing
			.par_iter()
			.enumerate()
			.for_each(|(i, (k, v))| {
				let remove_key = source_data.get(i).unwrap().0;
				concurrent_hash.remove(&remove_key);

				concurrent_hash.insert(*k, *v);
			});

		assert_hash_contents_equal(&base_hash, concurrent_hash)
	}
135
136	fn insert_several<const N: usize>(from: &PreSeed<N>, to: &mut HashMap<i64, i64>) {
137		for (k, v) in from.iter() {
138			to.insert(*k, *v);
139		}
140	}
141
142	fn insert_several_threaded<const N: usize>(from: &PreSeed<N>, to: &Arc<MbarcMap<i64, i64>>) {
143		from.par_iter().for_each(|(k, v)| {
144			to.insert(*k, *v);
145		});
146	}
147
148	#[test]
149	fn test_insert_only() {
150		const N: usize = 100000;
151
152		let source_data = make_data_pairs::<N>(FIXED_SEED);
153		let mut base_hash = Box::new(HashMap::new());
154		let concurrent_hash = Arc::new(MbarcMap::new());
155
156		insert_several(&source_data, &mut base_hash);
157		insert_several_threaded(&source_data, &concurrent_hash);
158
159		//println!("Confirming length after insert");
160		assert_eq!(base_hash.len(), N);
161		assert_eq!(concurrent_hash.len(), N);
162
163		assert_hash_contents_equal(&base_hash, concurrent_hash);
164
165		println!("Insert test done");
166	}
167
168	fn assert_hash_contents_equal(
169		base_hash: &HashMap<i64, i64>,
170		concurrent_hash: Arc<MbarcMap<i64, i64>>,
171	) {
172		//println!("Comparing values after insert");
173		for (k, v) in base_hash.iter() {
174			//println!("Checking for {} and {}",k,v);
175			assert!(concurrent_hash.contains(k));
176			//println!("Key found, comparing value");
177
178			let expected_value: i64 = *v;
179
180			//println!("Fetching from map");
181			let data_from_map = concurrent_hash.get(k).unwrap();
182
183			//println!("Checking inner data");
184			//let raw_data = data_from_map.raw_data();
185			let current_ref_count = data_from_map.ref_count();
186			let is_raw_deleted = data_from_map.is_marked_deleted();
187			assert_eq!(current_ref_count, 2);
188			assert!(!is_raw_deleted);
189
190			//println!("making sure lock's ok");
191			let data_mutex_poisoned = data_from_map.is_poisoned();
192			assert!(!data_mutex_poisoned);
193
194			//println!("Taking lock on inner data");
195			//let data_lock=data_from_map.lock().unwrap();
196			let data_lock = data_from_map.try_lock();
197			let data_lock_ok = data_lock.is_ok();
198			assert!(data_lock_ok);
199
200			//println!("Assigning value");
201			let true_lock = data_lock.unwrap();
202			let actual_value = *true_lock;
203
204			assert_eq!(expected_value, actual_value);
205			//println!("Pair {}, {} passed!",k,v);
206
207			//drop(true_lock);
208		}
209	}
210
211	#[test]
212	fn test_key_iterator() {
213		const N: usize = 100000;
214
215		let source_data = make_data_pairs::<N>(FIXED_SEED);
216		let mut base_hash = Box::new(HashMap::new());
217		let concurrent_hash = Arc::new(MbarcMap::new());
218
219		insert_several(&source_data, &mut base_hash);
220		insert_several_threaded(&source_data, &concurrent_hash);
221
222		for (k, v) in concurrent_hash.iter_copied_keys() {
223			assert!(base_hash.contains_key(&k));
224
225			let base_val = base_hash.remove(&k).unwrap();
226			assert_eq!(base_val, *v.lock().unwrap());
227		}
228
229		assert_eq!(base_hash.len(), 0);
230	}
231
232	#[test]
233	fn test_value_iterator_preserves_insert_order_when_no_removal() {
234		const N: usize = 100000;
235
236		let source_data = make_data_pairs::<N>(FIXED_SEED);
237		let base_hash = MbarcMap::new();
238
239		source_data.iter().for_each(|(k, v)| {
240			base_hash.insert(*k, *v);
241		});
242
243		for (i, value) in base_hash.iter_copied_values_ordered().enumerate() {
244			assert_eq!(source_data[i].1, *value.lock().unwrap());
245		}
246	}
247
248	#[test]
249	fn test_locked_value_iterator_preserves_insert_order_when_no_removal() {
250		const N: usize = 100000;
251
252		let source_data = make_data_pairs::<N>(FIXED_SEED);
253		let base_hash = MbarcMap::new();
254
255		source_data.iter().for_each(|(k, v)| {
256			base_hash.insert(*k, *v);
257		});
258
259		//TODO: this proves enumeration is correct, but does not prove that the map itself is being locked during iteration (such as in test_locked_iteration)
260		for (i, value) in base_hash.iter_values_exclusive().iter().enumerate() {
261			assert_eq!(source_data[i].1, *value.lock().unwrap());
262		}
263	}
264
265	#[test]
266	fn test_drop() {
267		const N: usize = 100000;
268
269		let source_data = make_data_pairs::<N>(FIXED_SEED);
270		let concurrent_hash = Arc::new(MbarcMap::new());
271
272		insert_several_threaded(&source_data, &concurrent_hash);
273
274		let iter = concurrent_hash.iter();
275		drop(concurrent_hash);
276
277		for v in iter {
278			assert!(v.is_marked_deleted());
279			assert_eq!(v.ref_count(), 1);
280		}
281	}
282
	/// Helper trait used to exercise `dyn Trait` access through the map's `Deref`.
	trait TestTrait {
		// Default implementation; `TestType` overrides this with `TEST_TYPE_VALUE`.
		fn get(&self) -> u64 {
			5
		}
	}
288
	/// Value returned by `TestType::get`, distinct from the trait default (5).
	const TEST_TYPE_VALUE: u64 = 2;

	/// Zero-sized implementor of `TestTrait`; its size is asserted in `test_mutate_deref`.
	struct TestType {}

	impl TestTrait for TestType {
		fn get(&self) -> u64 {
			TEST_TYPE_VALUE
		}
	}
298
	/// Checks that a stored element can be used through its trait-object form,
	/// both via the element's lock and via `Deref` to the underlying `Mutex`.
	#[test]
	fn test_mutate_deref() {
		// TestType has no fields, so it must be zero-sized.
		assert_eq!(size_of::<TestType>(), 0);

		let map = MbarcMap::<usize, TestType>::new();
		map.insert(0, TestType {});

		let item = map.get(&0).unwrap();
		assert_eq!(item.lock().unwrap().deref().get(), TEST_TYPE_VALUE);

		// Unsized coercion: the reference derefs to &Mutex<TestType>, which is
		// usable as &Mutex<dyn TestTrait>.
		let raw: &Mutex<dyn TestTrait> = item.deref();
		assert_eq!(raw.lock().unwrap().get(), TEST_TYPE_VALUE);
	}
312
	/// Two threads each take an exclusive iterator and record every (key, value)
	/// pair; the final assertions require each thread's output to form one
	/// complete, uninterleaved snapshot of the map.
	#[test]
	fn test_locked_iteration() {
		const N: usize = 1000;

		let source_data = make_data_pairs::<N>(FIXED_SEED);
		let concurrent_hash = Arc::new(MbarcMap::new());

		insert_several_threaded(&source_data, &concurrent_hash);

		let result = thread::scope(|scope| {
			let v: Arc<Mutex<Vec<(i64, DataReference<i64>)>>> = Default::default();

			for _ in 0..2 {
				let my_hash = concurrent_hash.clone();
				let my_vec = v.clone();
				scope.spawn(move || {
					// The test relies on iter_exclusive keeping this thread's
					// pushes contiguous in my_vec — i.e. the other thread cannot
					// iterate concurrently.
					for (k, v) in my_hash.iter_exclusive().iter() {
						my_vec.lock().unwrap().push((*k, v.clone()))
					}
				});
			}

			v
		});

		// thread::scope joins both workers before returning, so our Arc is the
		// last handle and try_unwrap cannot fail.
		let result = match Arc::try_unwrap(result) {
			Ok(r) => r.into_inner().unwrap(),
			Err(_) => {
				unreachable!()
			}
		};

		assert_eq!(result.len(), 2 * N);

		// The two N-sized runs must agree pairwise: same keys in the same order,
		// with matching values behind the locks.
		for i in 0..N {
			let (k1, v1) = &result[i];
			let (k2, v2) = &result[i + N];

			assert_eq!(*k1, *k2);

			let v1 = *v1.lock().unwrap();
			let v2 = *v2.lock().unwrap();
			assert_eq!(v1, v2);
		}
	}
358
	/// Fixture holding two maps with different value types, used to exercise
	/// type-erased references (`DataReferenceGeneric`).
	struct GenericRefTestType {
		// Single-entry map with a u32 value.
		a: MbarcMap<usize, u32>,
		// Single-entry map with a u64 value.
		b: MbarcMap<usize, u64>,
	}
363
364	impl GenericRefTestType {
365		const A_ITEM_KEY: usize = 0;
366		const B_ITEM_KEY: usize = 0;
367
368		fn new(a_val: u32, b_val: u64) -> Self {
369			let a = MbarcMap::new();
370			let b = MbarcMap::new();
371
372			a.insert(Self::A_ITEM_KEY, a_val);
373			b.insert(Self::B_ITEM_KEY, b_val);
374
375			Self { a, b }
376		}
377
378		fn get_from_a(&self) -> DataReferenceGeneric {
379			DataReferenceGeneric::from(self.a.get(&Self::A_ITEM_KEY).unwrap())
380		}
381
382		fn get_from_b(&self) -> DataReferenceGeneric {
383			DataReferenceGeneric::from(self.b.get(&Self::B_ITEM_KEY).unwrap())
384		}
385
386		fn a_ref_count(&self) -> usize {
387			//number of refs, minus the temporary one we just created
388			self.a.get(&Self::A_ITEM_KEY).unwrap().ref_count() - 1
389		}
390
391		fn b_ref_count(&self) -> usize {
392			//number of refs, minus the temporary one we just created
393			self.b.get(&Self::B_ITEM_KEY).unwrap().ref_count() - 1
394		}
395
396		fn set_a(&self, value: u32) {
397			*self.a.get(&Self::A_ITEM_KEY).unwrap().lock().unwrap() = value;
398		}
399
400		fn set_b(&self, value: u64) {
401			*self.b.get(&Self::B_ITEM_KEY).unwrap().lock().unwrap() = value;
402		}
403	}
404
	/// Walks a `DataReferenceGeneric` through its whole lifecycle: type ids,
	/// failed and successful downcasts, and the element ref count at each step.
	#[test]
	fn test_generic_morphing() {
		const A_VALUE: u32 = 0;
		const B_VALUE: u64 = 0;

		let tester = GenericRefTestType::new(A_VALUE, B_VALUE);

		// Only the maps themselves hold their elements.
		assert_eq!(tester.a_ref_count(), 1);
		assert_eq!(tester.b_ref_count(), 1);

		let a_generic = tester.get_from_a();
		let b_generic = tester.get_from_b();

		// Each generic wrapper holds one additional strong reference.
		assert_eq!(tester.a_ref_count(), 2);
		assert_eq!(tester.b_ref_count(), 2);

		// The erased references still report both the wrapper and inner type ids.
		assert_eq!(a_generic.type_id(), TypeId::of::<DataReference<u32>>());
		assert_eq!(a_generic.inner_type_id(), TypeId::of::<u32>());

		assert_eq!(b_generic.type_id(), TypeId::of::<DataReference<u64>>());
		assert_eq!(b_generic.inner_type_id(), TypeId::of::<u64>());

		// Downcasting to the wrong type yields None and must not leak a reference.
		let not_a = a_generic.to_typed::<u128>();
		let not_b = b_generic.to_typed::<u128>();

		assert!(not_a.is_none());
		assert!(not_b.is_none());

		assert_eq!(tester.a_ref_count(), 2);
		assert_eq!(tester.b_ref_count(), 2);

		// Correct downcasts succeed and add one more reference each.
		let actually_a = a_generic.to_typed::<u32>();
		let actually_b = b_generic.to_typed::<u64>();

		assert!(actually_a.is_some());
		assert!(actually_b.is_some());

		assert_eq!(tester.a_ref_count(), 3);
		assert_eq!(tester.b_ref_count(), 3);

		// Dropping the generics releases their references; the typed copies remain.
		drop(a_generic);
		assert_eq!(tester.a_ref_count(), 2);

		drop(b_generic);
		assert_eq!(tester.b_ref_count(), 2);

		let actually_a = actually_a.unwrap();
		let actually_b = actually_b.unwrap();

		assert_eq!(*actually_a.lock().unwrap(), A_VALUE);
		assert_eq!(*actually_b.lock().unwrap(), B_VALUE);

		const NEW_A: u32 = 11;
		const NEW_B: u64 = 12;

		// Writes performed through the maps are visible through the downcast references.
		tester.set_a(NEW_A);
		tester.set_b(NEW_B);

		assert_eq!(*actually_a.lock().unwrap(), NEW_A);
		assert_eq!(*actually_b.lock().unwrap(), NEW_B);
	}
466
	/// Like `test_generic_morphing`, but drops the owning maps before downcasting:
	/// generic references must keep elements alive and still downcast correctly.
	#[test]
	fn test_generic_early_drop() {
		const A_VALUE: u32 = 0;
		const B_VALUE: u64 = 0;

		let tester = GenericRefTestType::new(A_VALUE, B_VALUE);

		assert_eq!(tester.a_ref_count(), 1);
		assert_eq!(tester.b_ref_count(), 1);

		let a_generic = tester.get_from_a();
		let b_generic = tester.get_from_b();

		assert_eq!(tester.a_ref_count(), 2);
		assert_eq!(tester.b_ref_count(), 2);

		// Drop the maps; only the generic references keep the elements alive now.
		drop(tester);

		let actually_a = a_generic.to_typed::<u32>();
		let actually_b = b_generic.to_typed::<u64>();

		assert!(actually_a.is_some());
		assert!(actually_b.is_some());

		let actually_a = actually_a.unwrap();
		let actually_b = actually_b.unwrap();

		// One generic plus one typed reference per element.
		assert_eq!(actually_a.ref_count(), 2);
		assert_eq!(actually_b.ref_count(), 2);

		drop(a_generic);
		assert_eq!(actually_a.ref_count(), 1);

		drop(b_generic);
		assert_eq!(actually_b.ref_count(), 1);

		// The values remain readable even though the maps are long gone.
		assert_eq!(*actually_a.lock().unwrap(), A_VALUE);
		assert_eq!(*actually_b.lock().unwrap(), B_VALUE);
	}
506}