1#![allow(unused)]
13
14use crate::channel::*;
15use std::collections::HashMap;
16use std::hash::Hash;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::sync::Mutex;
20
/// Outcome of a single lookup: the resolved value `V` on success, or the
/// error `E` reported by whoever performed the lookup.
///
/// This is a plain alias for the standard `Result`; it exists so the channel
/// payload type can be named in one place.
pub type LookupResult<V, E> = std::result::Result<V, E>;
23pub enum RequestType<V, E> {
24 New(Receiver<LookupResult<V, E>>),
25 Pending(Receiver<LookupResult<V, E>>),
26}
27
28pub type SenderList<V, E> = Vec<Sender<LookupResult<V, E>>>;
30
31pub struct LookupHandler<K, V, E> {
65 pub map: Arc<Mutex<HashMap<K, SenderList<V, E>>>>,
66 pending: AtomicUsize,
67}
68
69impl<K, V, E> Default for LookupHandler<K, V, E>
71where
72 V: Clone,
73 K: Clone + Eq + Hash + std::fmt::Debug,
74 E: Clone,
75{
76 fn default() -> Self {
77 LookupHandler::new()
78 }
79}
80
81impl<K, V, E> LookupHandler<K, V, E>
82where
83 V: Clone,
84 K: Clone + Eq + Hash + std::fmt::Debug,
85 E: Clone,
86{
87 pub fn new() -> Self {
89 LookupHandler {
90 map: Arc::new(Mutex::new(HashMap::new())),
91 pending: AtomicUsize::new(0),
92 }
93 }
94
95 pub fn pending(&self) -> usize {
97 self.pending.load(Ordering::SeqCst)
98 }
99
100 pub async fn queue(&self, key: &K) -> RequestType<V, E> {
106 let mut pending = self.map.lock().unwrap();
107 let (sender, receiver) = oneshot::<LookupResult<V, E>>();
108
109 if let Some(list) = pending.get_mut(key) {
110 list.push(sender);
111 RequestType::Pending(receiver)
112 } else {
113 pending.insert(key.clone(), vec![sender]);
114 self.pending.fetch_add(1, Ordering::Relaxed);
115 RequestType::New(receiver)
116 }
117 }
118
119 pub async fn complete(&self, key: &K, result: LookupResult<V, E>) {
122 let list = { self.map.lock().unwrap().remove(key) };
123
124 if let Some(list) = list {
125 self.pending.fetch_sub(1, Ordering::Relaxed);
126 for sender in list {
127 sender
128 .send(result.clone())
129 .await
130 .expect("Unable to complete lookup result");
131 }
132 } else {
133 panic!("Lookup handler failure while processing key {key:?}")
134 }
135 }
136}
137
/// Exercises `LookupHandler` by issuing three concurrent lookups for one key
/// and checking that only the first is treated as new.
#[cfg(not(target_arch = "bpf"))]
#[cfg(any(test, feature = "test"))]
mod tests {
    use super::{LookupHandler, RequestType};
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex, PoisonError};
    use std::time::Duration;

    use crate::task::sleep;
    use futures::join;
    use workflow_core::channel::RecvError;

    /// Error type plugged in as the handler's `E` parameter for the test.
    #[derive(thiserror::Error, Debug, Clone)]
    pub enum Error {
        #[error("{0}")]
        String(String),
    }

    impl<T> From<PoisonError<T>> for Error {
        fn from(_: PoisonError<T>) -> Self {
            Self::String("PoisonError".to_string())
        }
    }

    impl From<RecvError> for Error {
        fn from(_: RecvError) -> Self {
            Self::String("RecvError".to_string())
        }
    }

    type Result<T> = std::result::Result<T, Error>;

    /// Payload-free mirror of `RequestType`, so the observed sequence of
    /// request kinds can be compared with `assert_eq!`.
    #[derive(Debug, Eq, PartialEq)]
    enum RequestTypeTest {
        New = 0,
        Pending = 1,
    }

    /// Test fixture: the handler under test, a map standing in for the remote
    /// data source, and a log of the request kinds handed out by the handler.
    struct LookupHandlerTest {
        pub lookup_handler: LookupHandler<u32, Option<u32>, Error>,
        pub map: Arc<Mutex<HashMap<u32, u32>>>,
        pub request_types: Arc<Mutex<Vec<RequestTypeTest>>>,
    }

    impl LookupHandlerTest {
        pub fn new() -> Self {
            Self {
                lookup_handler: LookupHandler::new(),
                map: Arc::default(),
                request_types: Arc::default(),
            }
        }

        /// Stores `value` under `key` in the stand-in data source.
        pub fn insert(self: &Arc<Self>, key: u32, value: u32) -> Result<()> {
            self.map.lock()?.insert(key, value);
            Ok(())
        }

        /// Simulates a slow remote lookup: waits 100 ms, then reads the map.
        pub async fn lookup_remote_impl(self: &Arc<Self>, key: &u32) -> Result<Option<u32>> {
            sleep(Duration::from_millis(100)).await;
            let entries = self.map.lock()?;
            Ok(entries.get(key).copied())
        }

        /// Appends `kind` to the log of observed request kinds. The guard is
        /// released before returning, so it is never held across an await.
        fn record(&self, kind: RequestTypeTest) {
            self.request_types.lock().unwrap().push(kind);
        }

        /// Looks `key` up through the handler. Only the caller that receives
        /// `RequestType::New` performs the remote lookup and completes the
        /// request; every caller then reads the result from its receiver.
        pub async fn lookup_handler_request(self: &Arc<Self>, key: &u32) -> Result<Option<u32>> {
            let receiver = match self.lookup_handler.queue(key).await {
                RequestType::New(receiver) => {
                    self.record(RequestTypeTest::New);
                    let response = self.lookup_remote_impl(key).await;
                    self.lookup_handler.complete(key, response).await;
                    receiver
                }
                RequestType::Pending(receiver) => {
                    self.record(RequestTypeTest::Pending);
                    receiver
                }
            };
            receiver.recv().await?
        }
    }

    /// Runs three concurrent lookups of one key and checks that all resolve
    /// to the stored value, with one `New` request followed by two `Pending`.
    pub async fn lookup_handler_test() -> Result<()> {
        let fixture = Arc::new(LookupHandlerTest::new());
        fixture.insert(0xc0fee, 0xdecaf)?;

        let first = fixture.lookup_handler_request(&0xc0fee);
        let second = fixture.lookup_handler_request(&0xc0fee);
        let third = fixture.lookup_handler_request(&0xc0fee);
        let results = join!(first, second, third);

        println!("[lh] results: {:?}", results);
        let (a, b, c) = results;
        let values = (
            a.unwrap().unwrap(),
            b.unwrap().unwrap(),
            c.unwrap().unwrap(),
        );
        assert_eq!(values, (0xdecaf, 0xdecaf, 0xdecaf));

        let request_types = fixture.request_types.lock().unwrap();
        println!("[lh] request types: {:?}", request_types);
        let expected = [
            RequestTypeTest::New,
            RequestTypeTest::Pending,
            RequestTypeTest::Pending,
        ];
        assert_eq!(request_types[..], expected);
        println!("all looks good ... 😎");

        Ok(())
    }

    #[cfg(not(any(target_arch = "wasm32", target_arch = "bpf")))]
    #[cfg(test)]
    mod native_tests {
        use super::*;

        #[tokio::test]
        pub async fn lookup_handler_test() -> Result<()> {
            super::lookup_handler_test().await
        }
    }
}