1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::{collections::HashMap, fmt::Debug};
use tokio::sync::mpsc;
use crate::{message, ResponseManagerHandle};
/// The single message currently parked for one request ID.
///
/// Whichever half of an exchange reaches the manager first is stored in one
/// of these variants until the matching half arrives.
#[derive(Debug)]
enum ResponseState<R> {
    /// A response that arrived before the corresponding request.
    Received(message::Receive<R>),
    /// A request that is still waiting for its response.
    Requested(message::Request<R>),
}
/// Matches requests with responses that carry the same request ID.
///
/// A message whose counterpart has not been seen yet is kept in `response`
/// until the other half shows up.
pub struct ResponseManager<R: Send + Debug> {
    /// Unmatched messages, keyed by their `request_id`.
    response: HashMap<u64, ResponseState<R>>,
}
impl<R> ResponseManager<R>
where
    R: Send + Debug + 'static,
{
    /// Starts a manager on a Tokio task and returns a handle to it.
    ///
    /// The handle is built from the spawned task and the sending half of an
    /// unbounded channel; the task itself owns the receiving half and runs
    /// [`Self::listen`] until that returns.
    pub fn spawn() -> ResponseManagerHandle<R> {
        let (sender, receiver) = mpsc::unbounded_channel();
        // `async move` makes it explicit that the task owns the receiver.
        let task = tokio::spawn(async move {
            Self::new().listen(receiver).await;
        });
        ResponseManagerHandle::new(task, sender)
    }

    /// Creates a manager with nothing parked.
    fn new() -> Self {
        Self {
            response: HashMap::new(),
        }
    }

    /// Handles incoming messages one at a time until `recv` yields `None`.
    ///
    /// Always returns `None`; the `Option<()>` return type is kept so existing
    /// callers are unaffected.
    async fn listen(
        &mut self,
        mut receiver: mpsc::UnboundedReceiver<message::Message<R>>,
    ) -> Option<()> {
        while let Some(message) = receiver.recv().await {
            self.handle_message(message);
        }
        None
    }

    /// Routes a message to the handler for its variant.
    fn handle_message(&mut self, message: message::Message<R>) {
        match message {
            message::Message::Request(request) => self.on_request(request),
            message::Message::Receive(store) => self.on_receive(store),
        }
    }

    /// Handles a request: answers it right away if its response is already
    /// parked, otherwise parks the request.
    ///
    /// # Panics
    ///
    /// Panics if another request with the same ID is already parked.
    fn on_request(&mut self, request: message::Request<R>) {
        let query = self.response.remove(&request.request_id);
        match query {
            None => self.store_request(request),
            Some(ResponseState::Received(receive)) => Self::fulfill_request(request, receive),
            Some(ResponseState::Requested(_)) => {
                panic!("requested two requests for the same ID")
            }
        }
    }

    /// Handles a response: delivers it right away if its request is already
    /// parked, otherwise parks the response.
    ///
    /// # Panics
    ///
    /// Panics if another response with the same ID is already parked.
    fn on_receive(&mut self, receive: message::Receive<R>) {
        let query = self.response.remove(&receive.request_id);
        match query {
            None => self.store_received(receive),
            Some(ResponseState::Requested(request)) => Self::fulfill_request(request, receive),
            Some(ResponseState::Received(_)) => {
                panic!("received two response for the same ID")
            }
        }
    }

    /// Parks a request under its ID until the response arrives.
    fn store_request(&mut self, request: message::Request<R>) {
        let key = request.request_id;
        self.response.insert(key, ResponseState::Requested(request));
    }

    /// Parks a response under its ID until the request arrives.
    fn store_received(&mut self, received: message::Receive<R>) {
        let key = received.request_id;
        self.response.insert(key, ResponseState::Received(received));
    }

    /// Hands the response's `signal` to the request's `resolve` sender.
    ///
    /// A failed send is ignored on purpose: unwrapping it would panic the
    /// manager task and take every other parked message down with it.
    /// Presumably a failure means the requester stopped waiting (dropped its
    /// receiving half) - confirm against the type of `resolve`.
    fn fulfill_request(request: message::Request<R>, receive: message::Receive<R>) {
        let _ = request.resolve.send(receive.signal);
    }
}