// vantage_api_pool/matcher/mod.rs

1use std::{
2    collections::HashMap,
3    sync::{atomic::AtomicUsize, Arc},
4};
5
6use reqwest::{Request, Response};
7use tokio::sync::{mpsc, oneshot, Mutex};
8
9use crate::EventualRequest;
10
11pub struct EventualRequestMatcher<T: Send + Sync + Sized> {
12    thread_handle: tokio::task::JoinHandle<()>,
13    request_sender: mpsc::Sender<EventualRequest<T>>,
14    pending_requests: Arc<Mutex<HashMap<usize, oneshot::Sender<EventualRequest<T>>>>>,
15    seq_id: AtomicUsize,
16}
17
18impl<T: Send + Sync + Sized + 'static> EventualRequestMatcher<T> {
19    pub fn new(
20        request_sender: mpsc::Sender<EventualRequest<T>>,
21        response_receiver: mpsc::Receiver<EventualRequest<T>>,
22    ) -> Self {
23        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
24        let thread_handle = tokio::spawn(Self::worker_thread(
25            pending_requests.clone(),
26            response_receiver,
27        ));
28
29        Self {
30            request_sender,
31            thread_handle,
32            pending_requests,
33            seq_id: AtomicUsize::new(0),
34        }
35    }
36
37    pub fn seq_id(&self) -> usize {
38        self.seq_id
39            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
40    }
41
42    pub async fn worker_thread(
43        ch: Arc<Mutex<HashMap<usize, oneshot::Sender<EventualRequest<T>>>>>,
44        mut response_receiver: mpsc::Receiver<EventualRequest<T>>,
45    ) {
46        loop {
47            let Some(response) = response_receiver.recv().await else {
48                eprintln!("EventualRequestMatcher: response channel closed");
49                break;
50            };
51
52            match ch.lock().await.remove(&response.get_id().unwrap()) {
53                Some(sender) => sender.send(response).unwrap_or_else(|response| {
54                    eprintln!(
55                        "EventualRequestMatcher: failed to send response for id {:?}",
56                        response.get_id()
57                    )
58                }),
59                None => eprintln!(
60                    "EventualRequestMatcher: no pending request found for id {:?}",
61                    response.get_id()
62                ),
63            }
64        }
65    }
66
67    pub async fn send(
68        &self,
69        request: impl Into<Request>,
70        metadata: Option<T>,
71    ) -> Result<Response, String> {
72        let mut request = EventualRequest::new(request, metadata);
73        let receiver = request
74            .register(self.seq_id(), self.pending_requests.clone())
75            .await;
76
77        self.request_sender
78            .send(request)
79            .await
80            .map_err(|e| e.to_string())?;
81
82        let mut result = receiver.await.map_err(|e| e.to_string())?;
83
84        result.response().ok_or("No response".to_string())
85    }
86
87    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
88        // Close channel, this will cause worker to finish
89        drop(self.request_sender);
90
91        // wait for worker to finish
92        self.thread_handle.await
93    }
94}