vantage_api_pool/matcher/
mod.rs1use std::{
2 collections::HashMap,
3 sync::{atomic::AtomicUsize, Arc},
4};
5
6use reqwest::{Request, Response};
7use tokio::sync::{mpsc, oneshot, Mutex};
8
9use crate::EventualRequest;
10
11pub struct EventualRequestMatcher<T: Send + Sync + Sized> {
12 thread_handle: tokio::task::JoinHandle<()>,
13 request_sender: mpsc::Sender<EventualRequest<T>>,
14 pending_requests: Arc<Mutex<HashMap<usize, oneshot::Sender<EventualRequest<T>>>>>,
15 seq_id: AtomicUsize,
16}
17
18impl<T: Send + Sync + Sized + 'static> EventualRequestMatcher<T> {
19 pub fn new(
20 request_sender: mpsc::Sender<EventualRequest<T>>,
21 response_receiver: mpsc::Receiver<EventualRequest<T>>,
22 ) -> Self {
23 let pending_requests = Arc::new(Mutex::new(HashMap::new()));
24 let thread_handle = tokio::spawn(Self::worker_thread(
25 pending_requests.clone(),
26 response_receiver,
27 ));
28
29 Self {
30 request_sender,
31 thread_handle,
32 pending_requests,
33 seq_id: AtomicUsize::new(0),
34 }
35 }
36
37 pub fn seq_id(&self) -> usize {
38 self.seq_id
39 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
40 }
41
42 pub async fn worker_thread(
43 ch: Arc<Mutex<HashMap<usize, oneshot::Sender<EventualRequest<T>>>>>,
44 mut response_receiver: mpsc::Receiver<EventualRequest<T>>,
45 ) {
46 loop {
47 let Some(response) = response_receiver.recv().await else {
48 eprintln!("EventualRequestMatcher: response channel closed");
49 break;
50 };
51
52 match ch.lock().await.remove(&response.get_id().unwrap()) {
53 Some(sender) => sender.send(response).unwrap_or_else(|response| {
54 eprintln!(
55 "EventualRequestMatcher: failed to send response for id {:?}",
56 response.get_id()
57 )
58 }),
59 None => eprintln!(
60 "EventualRequestMatcher: no pending request found for id {:?}",
61 response.get_id()
62 ),
63 }
64 }
65 }
66
67 pub async fn send(
68 &self,
69 request: impl Into<Request>,
70 metadata: Option<T>,
71 ) -> Result<Response, String> {
72 let mut request = EventualRequest::new(request, metadata);
73 let receiver = request
74 .register(self.seq_id(), self.pending_requests.clone())
75 .await;
76
77 self.request_sender
78 .send(request)
79 .await
80 .map_err(|e| e.to_string())?;
81
82 let mut result = receiver.await.map_err(|e| e.to_string())?;
83
84 result.response().ok_or("No response".to_string())
85 }
86
87 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
88 drop(self.request_sender);
90
91 self.thread_handle.await
93 }
94}