exocore_transport/http/
requests.rs1use std::{
2 collections::HashMap,
3 sync::{
4 atomic::{AtomicU64, Ordering},
5 Arc, Weak,
6 },
7 time::Duration,
8};
9
10use exocore_core::futures::{block_on, sleep};
11use futures::{channel::oneshot, lock::Mutex, FutureExt};
12
13use super::config::HttpTransportConfig;
14use crate::OutMessage;
15
/// Identifier assigned to a tracked request.
pub type RequestId = u64;
17
18pub struct RequestTracker {
21 requests: Mutex<HashMap<RequestId, oneshot::Sender<OutMessage>>>,
22 next_id: AtomicU64,
23 config: HttpTransportConfig,
24}
25
26impl RequestTracker {
27 pub fn new(config: HttpTransportConfig) -> RequestTracker {
28 RequestTracker {
29 requests: Mutex::new(HashMap::new()),
30 next_id: AtomicU64::new(0),
31 config,
32 }
33 }
34
35 pub async fn push(self: Arc<Self>) -> TrackedRequest {
37 let mut requests = self.requests.lock().await;
38
39 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
40
41 let (sender, receiver) = oneshot::channel();
42 let request = TrackedRequest {
43 id,
44 requests: Arc::downgrade(&self),
45 receiver: Some(receiver),
46 receive_timeout: self.config.request_timeout,
47 };
48
49 requests.insert(request.id, sender);
50
51 request
52 }
53
54 pub async fn reply(&self, request_id: RequestId, message: OutMessage) {
56 let sender = {
57 let mut requests = self.requests.lock().await;
58 requests.remove(&request_id)
59 };
60
61 if let Some(sender) = sender {
62 if sender.send(message).is_err() {
63 warn!(
64 "Error replying message to request {}. Channel got dropped.",
65 request_id
66 );
67 }
68 } else {
69 warn!(
70 "Tried to reply to request {}, but wasn't there anymore (timed-out?)",
71 request_id
72 );
73 }
74 }
75
76 pub async fn remove(&self, request_id: RequestId) {
77 let mut requests = self.requests.lock().await;
78 requests.remove(&request_id);
79 }
80}
81
82pub struct TrackedRequest {
85 id: RequestId,
86 requests: Weak<RequestTracker>,
87 receiver: Option<oneshot::Receiver<OutMessage>>,
88 receive_timeout: Duration,
89}
90
91impl TrackedRequest {
92 pub fn id(&self) -> RequestId {
93 self.id
94 }
95
96 pub async fn get_response_or_timeout(mut self) -> Result<OutMessage, ()> {
97 let receiver = self.receiver.take().ok_or(())?;
98 let timeout = sleep(self.receive_timeout);
99
100 futures::select! {
101 resp = receiver.fuse() => {
102 resp.map_err(|_| ())
103 },
104 _ = timeout.fuse() => {
105 Err(())
106 },
107 }
108 }
109}
110
111impl Drop for TrackedRequest {
112 fn drop(&mut self) {
113 if let Some(requests) = self.requests.upgrade() {
114 block_on(requests.remove(self.id));
115 }
116 }
117}