workflow_core/
lookup.rs

1//!
2//! [`LookupHandler`] provides ability to queue multiple async requests for the same key
3//! into a group of futures that resolve upon request completion.
4//!
5//! This functionality is useful when a client may be making multiple requests
6//! for data that is not available and may need to be fetched over a transport
7//! that may take time (such as network I/O). Each async request for the same
8//! key will get queued into a set of futures all of which will resolve once
9//! the initial request is resolved.
10//!
11
12#![allow(unused)]
13
14use crate::channel::*;
15use std::collections::HashMap;
16use std::hash::Hash;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::sync::Mutex;
20
21/// Custom result type used by [`LookupHandler`]
22pub type LookupResult<V, E> = std::result::Result<V, E>;
23pub enum RequestType<V, E> {
24    New(Receiver<LookupResult<V, E>>),
25    Pending(Receiver<LookupResult<V, E>>),
26}
27
28/// List of channel senders awaiting for the same key lookup.
29pub type SenderList<V, E> = Vec<Sender<LookupResult<V, E>>>;
30
31///
32/// [`LookupHandler`] provides ability to queue multiple async requests for the same key
33/// into a group of futures that resolve upon request completion.
34///
35/// To use [`LookupHandler`], you need to create a custom lookup function. The example below
36/// declares a function `lookup()` that uses [`LookupHandler`] to queue requests
37/// and if there are no pending requests (request is new) performs the actual
38/// request by calling `lookup_impl()`. The [`LookupHandler::complete()`] will
39/// resolve all pending futures for the specific key.
40///
41/// Example:
42/// ```ignore
43/// ...
44/// pub lookup_handler : LookupHandler<Pubkey,Arc<Data>,Error>
45/// ...
46/// async fn lookup(&self, pubkey:&Pubkey) -> Result<Option<Arc<Data>>> {
47///     let request_type = self.lookup_handler.queue(pubkey).await;
48///     let result = match request_type {
49///         RequestType::New(receiver) => {
50///             // execute the actual lookup
51///             let response = self.lookup_impl(pubkey).await;
52///             // signal completion for all awaiting futures
53///             lookup_handler.complete(pubkey, response).await;
54///             // this request is queued like all the others
55///             // so wait for your own notification as well
56///             receiver.recv().await?
57///         },
58///         RequestType::Pending(receiver) => {
59///             receiver.recv().await?
60///         }
61///     }
62/// };
63/// ```
64pub struct LookupHandler<K, V, E> {
65    pub map: Arc<Mutex<HashMap<K, SenderList<V, E>>>>,
66    pending: AtomicUsize,
67}
68
69/// Default trait for the LookupHandler
70impl<K, V, E> Default for LookupHandler<K, V, E>
71where
72    V: Clone,
73    K: Clone + Eq + Hash + std::fmt::Debug,
74    E: Clone,
75{
76    fn default() -> Self {
77        LookupHandler::new()
78    }
79}
80
81impl<K, V, E> LookupHandler<K, V, E>
82where
83    V: Clone,
84    K: Clone + Eq + Hash + std::fmt::Debug,
85    E: Clone,
86{
87    /// Create a new instance of the LookupHandler
88    pub fn new() -> Self {
89        LookupHandler {
90            map: Arc::new(Mutex::new(HashMap::new())),
91            pending: AtomicUsize::new(0),
92        }
93    }
94
95    /// Returns the total number of pending requests
96    pub fn pending(&self) -> usize {
97        self.pending.load(Ordering::SeqCst)
98    }
99
100    /// Queue the request for key `K`. Returns [`RequestType::New`] if
101    /// no other requests for the same key are pending and [`RequestType::Pending`]
102    /// if there are pending requests. Both [`RequestType`] values contain a [`async_std::channel::Receiver`]
103    /// that can be listened to for lookup completion. Lookup completion
104    /// can be signaled by [`LookupHandler::complete()`]
105    pub async fn queue(&self, key: &K) -> RequestType<V, E> {
106        let mut pending = self.map.lock().unwrap();
107        let (sender, receiver) = oneshot::<LookupResult<V, E>>();
108
109        if let Some(list) = pending.get_mut(key) {
110            list.push(sender);
111            RequestType::Pending(receiver)
112        } else {
113            pending.insert(key.clone(), vec![sender]);
114            self.pending.fetch_add(1, Ordering::Relaxed);
115            RequestType::New(receiver)
116        }
117    }
118
119    /// Signal the lookup completion for key `K` by supplying a [`LookupResult`]
120    /// with a resulting value `V` or an error `E`.
121    pub async fn complete(&self, key: &K, result: LookupResult<V, E>) {
122        let list = { self.map.lock().unwrap().remove(key) };
123
124        if let Some(list) = list {
125            self.pending.fetch_sub(1, Ordering::Relaxed);
126            for sender in list {
127                sender
128                    .send(result.clone())
129                    .await
130                    .expect("Unable to complete lookup result");
131            }
132        } else {
133            panic!("Lookup handler failure while processing key {key:?}")
134        }
135    }
136}
137
138#[cfg(not(target_arch = "bpf"))]
139#[cfg(any(test, feature = "test"))]
140mod tests {
141    use super::LookupHandler;
142    use super::RequestType;
143    use std::sync::Arc;
144    use std::sync::Mutex;
145    use std::sync::PoisonError;
146    use std::time::Duration;
147
148    use crate::task::sleep;
149    use futures::join;
150    use std::collections::HashMap;
151    use workflow_core::channel::RecvError;
152
153    #[derive(thiserror::Error, Debug, Clone)]
154    pub enum Error {
155        #[error("{0}")]
156        String(String),
157    }
158
159    impl<T> From<PoisonError<T>> for Error {
160        fn from(_: PoisonError<T>) -> Self {
161            Error::String("PoisonError".to_string())
162        }
163    }
164
165    impl From<RecvError> for Error {
166        fn from(_: RecvError) -> Self {
167            Error::String("RecvError".to_string())
168        }
169    }
170
171    type Result<T> = std::result::Result<T, Error>;
172
173    #[derive(Debug, Eq, PartialEq)]
174    enum RequestTypeTest {
175        New = 0,
176        Pending = 1,
177    }
178
179    struct LookupHandlerTest {
180        pub lookup_handler: LookupHandler<u32, Option<u32>, Error>,
181        pub map: Arc<Mutex<HashMap<u32, u32>>>,
182        pub request_types: Arc<Mutex<Vec<RequestTypeTest>>>,
183    }
184
185    impl LookupHandlerTest {
186        pub fn new() -> Self {
187            Self {
188                lookup_handler: LookupHandler::new(),
189                map: Arc::new(Mutex::new(HashMap::new())),
190                request_types: Arc::new(Mutex::new(Vec::new())),
191            }
192        }
193
194        pub fn insert(self: &Arc<Self>, key: u32, value: u32) -> Result<()> {
195            let mut map = self.map.lock()?;
196            map.insert(key, value);
197            Ok(())
198        }
199
200        pub async fn lookup_remote_impl(self: &Arc<Self>, key: &u32) -> Result<Option<u32>> {
201            // println!("[lh] lookup sleep...");
202            sleep(Duration::from_millis(100)).await;
203            // println!("[lh] lookup wake...");
204            let map = self.map.lock()?;
205            Ok(map.get(key).cloned())
206        }
207
208        pub async fn lookup_handler_request(self: &Arc<Self>, key: &u32) -> Result<Option<u32>> {
209            let request_type = self.lookup_handler.queue(key).await;
210            match request_type {
211                RequestType::New(receiver) => {
212                    self.request_types
213                        .lock()
214                        .unwrap()
215                        .push(RequestTypeTest::New);
216                    // println!("[lh] new request");
217                    let response = self.lookup_remote_impl(key).await;
218                    // println!("[lh] completing initial request");
219                    self.lookup_handler.complete(key, response).await;
220                    receiver.recv().await?
221                }
222                RequestType::Pending(receiver) => {
223                    self.request_types
224                        .lock()
225                        .unwrap()
226                        .push(RequestTypeTest::Pending);
227                    // println!("[lh] pending request");
228                    receiver.recv().await?
229                }
230            }
231        }
232    }
233
234    pub async fn lookup_handler_test() -> Result<()> {
235        let lht = Arc::new(LookupHandlerTest::new());
236        lht.insert(0xc0fee, 0xdecaf)?;
237
238        let v0 = lht.lookup_handler_request(&0xc0fee);
239        let v1 = lht.lookup_handler_request(&0xc0fee);
240        let v2 = lht.lookup_handler_request(&0xc0fee);
241        let f = join!(v0, v1, v2);
242
243        println!("[lh] results: {:?}", f);
244        let f = (
245            f.0.unwrap().unwrap(),
246            f.1.unwrap().unwrap(),
247            f.2.unwrap().unwrap(),
248        );
249        assert_eq!(f, (0xdecaf, 0xdecaf, 0xdecaf));
250
251        let request_types = lht.request_types.lock().unwrap();
252        println!("[lh] request types: {:?}", request_types);
253        assert_eq!(
254            request_types[..],
255            [
256                RequestTypeTest::New,
257                RequestTypeTest::Pending,
258                RequestTypeTest::Pending
259            ]
260        );
261        println!("all looks good ... 😎");
262
263        Ok(())
264    }
265
266    #[cfg(not(any(target_arch = "wasm32", target_arch = "bpf")))]
267    #[cfg(test)]
268    mod native_tests {
269        use super::*;
270
271        #[tokio::test]
272        pub async fn lookup_handler_test() -> Result<()> {
273            super::lookup_handler_test().await
274        }
275    }
276}