uv_once_map/
lib.rs

1use std::borrow::Borrow;
2use std::hash::{BuildHasher, Hash, RandomState};
3use std::pin::pin;
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use tokio::sync::Notify;
8
9/// Run tasks only once and store the results in a parallel hash map.
10///
11/// We often have jobs `Fn(K) -> V` that we only want to run once and memoize, e.g. network
12/// requests for metadata. When multiple tasks start the same query in parallel, e.g. through source
13/// dist builds, we want to wait until the other task is done and get a reference to the same
14/// result.
15///
16/// Note that this always clones the value out of the underlying map. Because
17/// of this, it's common to wrap the `V` in an `Arc<V>` to make cloning cheap.
18pub struct OnceMap<K, V, S = RandomState> {
19    items: DashMap<K, Value<V>, S>,
20}
21
22impl<K: Eq + Hash, V: Clone, H: BuildHasher + Clone> OnceMap<K, V, H> {
23    /// Create a [`OnceMap`] with the specified hasher.
24    pub fn with_hasher(hasher: H) -> Self {
25        Self {
26            items: DashMap::with_hasher(hasher),
27        }
28    }
29
30    /// Create a [`OnceMap`] with the specified capacity and hasher.
31    pub fn with_capacity_and_hasher(capacity: usize, hasher: H) -> Self {
32        Self {
33            items: DashMap::with_capacity_and_hasher(capacity, hasher),
34        }
35    }
36
37    /// Register that you want to start a job.
38    ///
39    /// If this method returns `true`, you need to start a job and call [`OnceMap::done`] eventually
40    /// or other tasks will hang. If it returns `false`, this job is already in progress and you
41    /// can [`OnceMap::wait`] for the result.
42    pub fn register(&self, key: K) -> bool {
43        let entry = self.items.entry(key);
44        match entry {
45            dashmap::mapref::entry::Entry::Occupied(_) => false,
46            dashmap::mapref::entry::Entry::Vacant(entry) => {
47                entry.insert(Value::Waiting(Arc::new(Notify::new())));
48                true
49            }
50        }
51    }
52
53    /// Submit the result of a job you registered.
54    pub fn done(&self, key: K, value: V) {
55        if let Some(Value::Waiting(notify)) = self.items.insert(key, Value::Filled(value)) {
56            notify.notify_waiters();
57        }
58    }
59
60    /// Wait for the result of a job that is running.
61    ///
62    /// Will hang if [`OnceMap::done`] isn't called for this key.
63    pub async fn wait(&self, key: &K) -> Option<V> {
64        let notify = {
65            let entry = self.items.get(key)?;
66            match entry.value() {
67                Value::Filled(value) => return Some(value.clone()),
68                Value::Waiting(notify) => notify.clone(),
69            }
70        };
71
72        // Register the waiter for calls to `notify_waiters`.
73        let notification = pin!(notify.notified());
74
75        // Make sure the value wasn't inserted in-between us checking the map and registering the waiter.
76        if let Value::Filled(value) = self.items.get(key).expect("map is append-only").value() {
77            return Some(value.clone());
78        }
79
80        // Wait until the value is inserted.
81        notification.await;
82
83        let entry = self.items.get(key).expect("map is append-only");
84        match entry.value() {
85            Value::Filled(value) => Some(value.clone()),
86            Value::Waiting(_) => unreachable!("notify was called"),
87        }
88    }
89
90    /// Wait for the result of a job that is running, in a blocking context.
91    ///
92    /// Will hang if [`OnceMap::done`] isn't called for this key.
93    pub fn wait_blocking(&self, key: &K) -> Option<V> {
94        futures::executor::block_on(self.wait(key))
95    }
96
97    /// Return the result of a previous job, if any.
98    pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
99    where
100        K: Borrow<Q>,
101    {
102        let entry = self.items.get(key)?;
103        match entry.value() {
104            Value::Filled(value) => Some(value.clone()),
105            Value::Waiting(_) => None,
106        }
107    }
108
109    /// Remove the result of a previous job, if any.
110    pub fn remove<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
111    where
112        K: Borrow<Q>,
113    {
114        let entry = self.items.remove(key)?;
115        match entry {
116            (_, Value::Filled(value)) => Some(value),
117            (_, Value::Waiting(_)) => None,
118        }
119    }
120}
121
122impl<K: Eq + Hash + Clone, V, H: Default + BuildHasher + Clone> Default for OnceMap<K, V, H> {
123    fn default() -> Self {
124        Self {
125            items: DashMap::with_hasher(H::default()),
126        }
127    }
128}
129
130impl<K, V, H> FromIterator<(K, V)> for OnceMap<K, V, H>
131where
132    K: Eq + Hash,
133    H: Default + Clone + BuildHasher,
134{
135    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
136        Self {
137            items: iter
138                .into_iter()
139                .map(|(k, v)| (k, Value::Filled(v)))
140                .collect(),
141        }
142    }
143}
144
145enum Value<V> {
146    Waiting(Arc<Notify>),
147    Filled(V),
148}