uv_once_map/
lib.rs

1use std::borrow::Borrow;
2use std::fmt::{Debug, Formatter};
3use std::hash::{BuildHasher, Hash, RandomState};
4use std::pin::pin;
5use std::sync::Arc;
6
7use dashmap::DashMap;
8use tokio::sync::Notify;
9
10/// Run tasks only once and store the results in a parallel hash map.
11///
12/// We often have jobs `Fn(K) -> V` that we only want to run once and memoize, e.g. network
13/// requests for metadata. When multiple tasks start the same query in parallel, e.g. through source
14/// dist builds, we want to wait until the other task is done and get a reference to the same
15/// result.
16///
17/// Note that this always clones the value out of the underlying map. Because
18/// of this, it's common to wrap the `V` in an `Arc<V>` to make cloning cheap.
19pub struct OnceMap<K, V, S = RandomState> {
20    items: DashMap<K, Value<V>, S>,
21}
22
23impl<K: Eq + Hash + Debug, V: Debug, S: BuildHasher + Clone> Debug for OnceMap<K, V, S> {
24    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
25        Debug::fmt(&self.items, f)
26    }
27}
28
29impl<K: Eq + Hash, V: Clone, H: BuildHasher + Clone> OnceMap<K, V, H> {
30    /// Create a [`OnceMap`] with the specified hasher.
31    pub fn with_hasher(hasher: H) -> Self {
32        Self {
33            items: DashMap::with_hasher(hasher),
34        }
35    }
36
37    /// Create a [`OnceMap`] with the specified capacity and hasher.
38    pub fn with_capacity_and_hasher(capacity: usize, hasher: H) -> Self {
39        Self {
40            items: DashMap::with_capacity_and_hasher(capacity, hasher),
41        }
42    }
43
44    /// Register that you want to start a job.
45    ///
46    /// If this method returns `true`, you need to start a job and call [`OnceMap::done`] eventually
47    /// or other tasks will hang. If it returns `false`, this job is already in progress and you
48    /// can [`OnceMap::wait`] for the result.
49    pub fn register(&self, key: K) -> bool {
50        let entry = self.items.entry(key);
51        match entry {
52            dashmap::mapref::entry::Entry::Occupied(_) => false,
53            dashmap::mapref::entry::Entry::Vacant(entry) => {
54                entry.insert(Value::Waiting(Arc::new(Notify::new())));
55                true
56            }
57        }
58    }
59
60    /// Submit the result of a job you registered.
61    pub fn done(&self, key: K, value: V) {
62        if let Some(Value::Waiting(notify)) = self.items.insert(key, Value::Filled(value)) {
63            notify.notify_waiters();
64        }
65    }
66
67    /// Wait for the result of a job that is running.
68    ///
69    /// Will hang if [`OnceMap::done`] isn't called for this key.
70    pub async fn wait(&self, key: &K) -> Option<V> {
71        let notify = {
72            let entry = self.items.get(key)?;
73            match entry.value() {
74                Value::Filled(value) => return Some(value.clone()),
75                Value::Waiting(notify) => notify.clone(),
76            }
77        };
78
79        // Register the waiter for calls to `notify_waiters`.
80        let notification = pin!(notify.notified());
81
82        // Make sure the value wasn't inserted in-between us checking the map and registering the waiter.
83        if let Value::Filled(value) = self.items.get(key).expect("map is append-only").value() {
84            return Some(value.clone());
85        }
86
87        // Wait until the value is inserted.
88        notification.await;
89
90        let entry = self.items.get(key).expect("map is append-only");
91        match entry.value() {
92            Value::Filled(value) => Some(value.clone()),
93            Value::Waiting(_) => unreachable!("notify was called"),
94        }
95    }
96
97    /// Wait for the result of a job that is running, in a blocking context.
98    ///
99    /// Will hang if [`OnceMap::done`] isn't called for this key.
100    pub fn wait_blocking(&self, key: &K) -> Option<V> {
101        futures::executor::block_on(self.wait(key))
102    }
103
104    /// Return the result of a previous job, if any.
105    pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
106    where
107        K: Borrow<Q>,
108    {
109        let entry = self.items.get(key)?;
110        match entry.value() {
111            Value::Filled(value) => Some(value.clone()),
112            Value::Waiting(_) => None,
113        }
114    }
115
116    /// Remove the result of a previous job, if any.
117    pub fn remove<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
118    where
119        K: Borrow<Q>,
120    {
121        let entry = self.items.remove(key)?;
122        match entry {
123            (_, Value::Filled(value)) => Some(value),
124            (_, Value::Waiting(_)) => None,
125        }
126    }
127}
128
129impl<K: Eq + Hash + Clone, V, H: Default + BuildHasher + Clone> Default for OnceMap<K, V, H> {
130    fn default() -> Self {
131        Self {
132            items: DashMap::with_hasher(H::default()),
133        }
134    }
135}
136
137impl<K, V, H> FromIterator<(K, V)> for OnceMap<K, V, H>
138where
139    K: Eq + Hash,
140    H: Default + Clone + BuildHasher,
141{
142    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
143        Self {
144            items: iter
145                .into_iter()
146                .map(|(k, v)| (k, Value::Filled(v)))
147                .collect(),
148        }
149    }
150}
151
152#[derive(Debug)]
153enum Value<V> {
154    Waiting(Arc<Notify>),
155    Filled(V),
156}