async_map/
non_locking_map.rs1use crate::single_writer_versioned::{DataUpdater, Versioned};
2use crate::{AsyncKey, AsyncMap, AsyncStorable, FactoryBorrow};
3use im;
4
5use futures::{FutureExt, TryFutureExt};
6use std::future::{ready, Future};
7use std::pin::Pin;
8
9#[derive(Clone)]
10pub struct NonLockingMap<K: AsyncKey, V: AsyncStorable> {
11 versioned: Versioned<im::HashMap<K, V>>,
12}
13
14impl<K: AsyncKey, V: AsyncStorable> NonLockingMap<K, V> {
15 pub fn new() -> NonLockingMap<K, V> {
16 NonLockingMap {
17 versioned: Versioned::from_initial(im::HashMap::new()).0, }
19 }
20}
21
22impl<K: AsyncKey, V: AsyncStorable> AsyncMap for NonLockingMap<K, V> {
23 type Key = K;
24 type Value = V;
25 fn get_if_present(&self, key: &K) -> Option<V> {
26 self.versioned
27 .with_latest(|map| map.get(key).map(|value| value.clone()))
28 }
29
30 fn get<'a, 'b, F: FactoryBorrow<K, V>>(
31 &'a self,
32 key: &'a K,
33 factory: F,
34 ) -> Pin<Box<(dyn Future<Output = V> + Send + 'b)>> {
35 match self.get_if_present(key) {
36 Some(value) => ready(value).boxed(),
37 None => {
38 let (sender, receiver) = tokio::sync::oneshot::channel();
39
40 let key = key.clone();
41 let updater: Box<dyn DataUpdater<im::HashMap<K, V>>> =
42 Box::new(move |map| match map.get(&key) {
43 Some(value) => {
44 sender.send(value.clone()).expect("Send failed!");
45 None
46 }
47 None => {
48 let new_value = (*factory.borrow())(&key);
49 let new_map = map.update(key, new_value.clone());
50 sender.send(new_value).expect("Send failed!");
51 Some(new_map)
52 }
53 });
54
55 if self.versioned.clone().update(updater).is_err() {
56 panic!("Update failed");
57 }
58
59 receiver
60 .unwrap_or_else(|_| panic!("Oneshot receive failed!"))
61 .boxed()
62 }
63 }
64 }
65}
66
67#[cfg(test)]
68mod test {
69
70 use super::NonLockingMap as VersionedMap;
71 use crate::{AsyncFactory, AsyncMap};
72 #[tokio::test]
73 async fn get_sync() {
74 let map = VersionedMap::<String, String>::new();
75
76 assert_eq!(None, map.get_if_present(&"foo".to_owned()));
77 }
78
79 fn hello_factory(key: &String) -> String {
80 format!("Hello, {}!", key)
81 }
82
83 #[tokio::test]
84 async fn get_sync2() {
85 let map = VersionedMap::<String, String>::new();
86
87 let key = "foo".to_owned();
88
89 let future = map.get(
90 &key,
91 Box::new(hello_factory) as Box<dyn AsyncFactory<String, String>>,
92 );
93
94 assert_eq!(None, map.get_if_present(&key));
95 let value = future.await;
96
97 assert_eq!("Hello, foo!", value);
98 assert_eq!("Hello, foo!", map.get_if_present(&key).unwrap());
99 }
100}