1use std::{collections::HashMap, sync::Mutex};
2
3use serde::{de::DeserializeOwned, ser::Serialize};
4use serde_json::Value as JsonValue;
5use tokio::sync::oneshot;
6
7use crate::{
8 mqtt::{Agent, IncomingResponse, OutgoingMessage, OutgoingRequest},
9 Error,
10};
11
12pub struct Dispatcher {
13 agent: Agent,
14 store: Mutex<HashMap<String, oneshot::Sender<IncomingResponse<JsonValue>>>>,
15}
16
17impl Dispatcher {
18 pub fn new(agent: &Agent) -> Self {
19 Self {
20 agent: agent.to_owned(),
21 store: Mutex::new(HashMap::new()),
22 }
23 }
24
25 pub async fn request<Req, Resp>(
26 &self,
27 req: OutgoingRequest<Req>,
28 ) -> Result<IncomingResponse<Resp>, Error>
29 where
30 Req: 'static + Serialize,
31 Resp: DeserializeOwned,
32 {
33 let corr_data = req.properties().correlation_data();
34 let rx = {
35 let mut store_lock = self.store.lock().expect("Dispatcher lock poisoned");
36
37 if store_lock.get(corr_data).is_some() {
38 let err = format!(
39 "Already awaiting response with correlation data = '{}'",
40 corr_data
41 );
42 return Err(Error::new(&err));
43 }
44
45 let (tx, rx) = oneshot::channel::<IncomingResponse<JsonValue>>();
46 store_lock.insert(corr_data.to_owned(), tx);
47 drop(store_lock);
48 rx
49 };
50
51 self.agent.clone().publish(OutgoingMessage::Request(req))?;
52
53 let resp = rx
54 .await
55 .map_err(|err| Error::new(&format!("Failed to receive response: {}", err)))?;
56
57 let props = resp.properties().to_owned();
58 let payload = serde_json::from_value::<Resp>(resp.payload().to_owned())
59 .map_err(|err| Error::new(&format!("Failed to parse response payload: {}", err)))?;
60
61 Ok(IncomingResponse::new(payload, props))
62 }
63
64 pub fn response(&self, resp: IncomingResponse<JsonValue>) -> Result<(), Error> {
65 let tx = {
66 let mut store_lock = self.store.lock().expect("Dispatcher lock poisoned");
67
68 let tx = store_lock
69 .remove(resp.properties().correlation_data())
70 .ok_or_else(|| {
71 Error::new(&format!(
72 "Failed to commit response with correlation data = '{}': not being awaited",
73 resp.properties().correlation_data()
74 ))
75 })?;
76
77 drop(store_lock);
78 tx
79 };
80
81 tx.send(resp).map_err(|resp| {
82 Error::new(&format!(
83 "Failed to commit response with correlation data = '{}': receiver has been dropped",
84 resp.properties().correlation_data(),
85 ))
86 })?;
87
88 Ok(())
89 }
90
91 pub fn cancel_request(&self, corr_data: &str) -> Result<(), Error> {
92 self.store
93 .lock()
94 .expect("Dispatcher lock poisoned")
95 .remove(corr_data)
96 .map(|_| ())
97 .ok_or_else(|| Error::new(&format!(
98 "Failed to cancel request; response with correlation data = '{}' is not being awaited",
99 corr_data
100 )))
101 }
102}