async_map/
non_locking_map.rs

1use crate::single_writer_versioned::{DataUpdater, Versioned};
2use crate::{AsyncKey, AsyncMap, AsyncStorable, FactoryBorrow};
3use im;
4
5use futures::{FutureExt, TryFutureExt};
6use std::future::{ready, Future};
7use std::pin::Pin;
8
9#[derive(Clone)]
10pub struct NonLockingMap<K: AsyncKey, V: AsyncStorable> {
11    versioned: Versioned<im::HashMap<K, V>>,
12}
13
14impl<K: AsyncKey, V: AsyncStorable> NonLockingMap<K, V> {
15    pub fn new() -> NonLockingMap<K, V> {
16        NonLockingMap {
17            versioned: Versioned::from_initial(im::HashMap::new()).0, // No quitting!
18        }
19    }
20}
21
22impl<K: AsyncKey, V: AsyncStorable> AsyncMap for NonLockingMap<K, V> {
23    type Key = K;
24    type Value = V;
25    fn get_if_present(&self, key: &K) -> Option<V> {
26        self.versioned
27            .with_latest(|map| map.get(key).map(|value| value.clone()))
28    }
29
30    fn get<'a, 'b, F: FactoryBorrow<K, V>>(
31        &'a self,
32        key: &'a K,
33        factory: F,
34    ) -> Pin<Box<(dyn Future<Output = V> + Send + 'b)>> {
35        match self.get_if_present(key) {
36            Some(value) => ready(value).boxed(),
37            None => {
38                let (sender, receiver) = tokio::sync::oneshot::channel();
39
40                let key = key.clone();
41                let updater: Box<dyn DataUpdater<im::HashMap<K, V>>> =
42                    Box::new(move |map| match map.get(&key) {
43                        Some(value) => {
44                            sender.send(value.clone()).expect("Send failed!");
45                            None
46                        }
47                        None => {
48                            let new_value = (*factory.borrow())(&key);
49                            let new_map = map.update(key, new_value.clone());
50                            sender.send(new_value).expect("Send failed!");
51                            Some(new_map)
52                        }
53                    });
54
55                if self.versioned.clone().update(updater).is_err() {
56                    panic!("Update failed");
57                }
58
59                receiver
60                    .unwrap_or_else(|_| panic!("Oneshot receive failed!"))
61                    .boxed()
62            }
63        }
64    }
65}
66
67#[cfg(test)]
68mod test {
69
70    use super::NonLockingMap as VersionedMap;
71    use crate::{AsyncFactory, AsyncMap};
72    #[tokio::test]
73    async fn get_sync() {
74        let map = VersionedMap::<String, String>::new();
75
76        assert_eq!(None, map.get_if_present(&"foo".to_owned()));
77    }
78
79    fn hello_factory(key: &String) -> String {
80        format!("Hello, {}!", key)
81    }
82
83    #[tokio::test]
84    async fn get_sync2() {
85        let map = VersionedMap::<String, String>::new();
86
87        let key = "foo".to_owned();
88
89        let future = map.get(
90            &key,
91            Box::new(hello_factory) as Box<dyn AsyncFactory<String, String>>,
92        );
93
94        assert_eq!(None, map.get_if_present(&key));
95        let value = future.await;
96
97        assert_eq!("Hello, foo!", value);
98        assert_eq!("Hello, foo!", map.get_if_present(&key).unwrap());
99    }
100}