exocore_transport/http/
requests.rs

1use std::{
2    collections::HashMap,
3    sync::{
4        atomic::{AtomicU64, Ordering},
5        Arc, Weak,
6    },
7    time::Duration,
8};
9
10use exocore_core::futures::{block_on, sleep};
11use futures::{channel::oneshot, lock::Mutex, FutureExt};
12
13use super::config::HttpTransportConfig;
14use crate::OutMessage;
15
16pub type RequestId = u64;
17
18/// Tracks incoming HTTP requests for which we are waiting a reply from a
19/// service.
20pub struct RequestTracker {
21    requests: Mutex<HashMap<RequestId, oneshot::Sender<OutMessage>>>,
22    next_id: AtomicU64,
23    config: HttpTransportConfig,
24}
25
26impl RequestTracker {
27    pub fn new(config: HttpTransportConfig) -> RequestTracker {
28        RequestTracker {
29            requests: Mutex::new(HashMap::new()),
30            next_id: AtomicU64::new(0),
31            config,
32        }
33    }
34
35    /// Pushes a new request for which we'll expect a reply from a service.
36    pub async fn push(self: Arc<Self>) -> TrackedRequest {
37        let mut requests = self.requests.lock().await;
38
39        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
40
41        let (sender, receiver) = oneshot::channel();
42        let request = TrackedRequest {
43            id,
44            requests: Arc::downgrade(&self),
45            receiver: Some(receiver),
46            receive_timeout: self.config.request_timeout,
47        };
48
49        requests.insert(request.id, sender);
50
51        request
52    }
53
54    /// Handles a reply from a service to be sent back to a request.
55    pub async fn reply(&self, request_id: RequestId, message: OutMessage) {
56        let sender = {
57            let mut requests = self.requests.lock().await;
58            requests.remove(&request_id)
59        };
60
61        if let Some(sender) = sender {
62            if sender.send(message).is_err() {
63                warn!(
64                    "Error replying message to request {}. Channel got dropped.",
65                    request_id
66                );
67            }
68        } else {
69            warn!(
70                "Tried to reply to request {}, but wasn't there anymore (timed-out?)",
71                request_id
72            );
73        }
74    }
75
76    pub async fn remove(&self, request_id: RequestId) {
77        let mut requests = self.requests.lock().await;
78        requests.remove(&request_id);
79    }
80}
81
82/// Receiving end of a the tracked request. This is used in the HTTP request
83/// handler to wait for a reply from a service.
84pub struct TrackedRequest {
85    id: RequestId,
86    requests: Weak<RequestTracker>,
87    receiver: Option<oneshot::Receiver<OutMessage>>,
88    receive_timeout: Duration,
89}
90
91impl TrackedRequest {
92    pub fn id(&self) -> RequestId {
93        self.id
94    }
95
96    pub async fn get_response_or_timeout(mut self) -> Result<OutMessage, ()> {
97        let receiver = self.receiver.take().ok_or(())?;
98        let timeout = sleep(self.receive_timeout);
99
100        futures::select! {
101            resp = receiver.fuse() => {
102                resp.map_err(|_| ())
103            },
104            _ = timeout.fuse() => {
105                Err(())
106            },
107        }
108    }
109}
110
111impl Drop for TrackedRequest {
112    fn drop(&mut self) {
113        if let Some(requests) = self.requests.upgrade() {
114            block_on(requests.remove(self.id));
115        }
116    }
117}