svc_agent/
request.rs

1use std::{collections::HashMap, sync::Mutex};
2
3use serde::{de::DeserializeOwned, ser::Serialize};
4use serde_json::Value as JsonValue;
5use tokio::sync::oneshot;
6
7use crate::{
8    mqtt::{Agent, IncomingResponse, OutgoingMessage, OutgoingRequest},
9    Error,
10};
11
12pub struct Dispatcher {
13    agent: Agent,
14    store: Mutex<HashMap<String, oneshot::Sender<IncomingResponse<JsonValue>>>>,
15}
16
17impl Dispatcher {
18    pub fn new(agent: &Agent) -> Self {
19        Self {
20            agent: agent.to_owned(),
21            store: Mutex::new(HashMap::new()),
22        }
23    }
24
25    pub async fn request<Req, Resp>(
26        &self,
27        req: OutgoingRequest<Req>,
28    ) -> Result<IncomingResponse<Resp>, Error>
29    where
30        Req: 'static + Serialize,
31        Resp: DeserializeOwned,
32    {
33        let corr_data = req.properties().correlation_data();
34        let rx = {
35            let mut store_lock = self.store.lock().expect("Dispatcher lock poisoned");
36
37            if store_lock.get(corr_data).is_some() {
38                let err = format!(
39                    "Already awaiting response with correlation data = '{}'",
40                    corr_data
41                );
42                return Err(Error::new(&err));
43            }
44
45            let (tx, rx) = oneshot::channel::<IncomingResponse<JsonValue>>();
46            store_lock.insert(corr_data.to_owned(), tx);
47            drop(store_lock);
48            rx
49        };
50
51        self.agent.clone().publish(OutgoingMessage::Request(req))?;
52
53        let resp = rx
54            .await
55            .map_err(|err| Error::new(&format!("Failed to receive response: {}", err)))?;
56
57        let props = resp.properties().to_owned();
58        let payload = serde_json::from_value::<Resp>(resp.payload().to_owned())
59            .map_err(|err| Error::new(&format!("Failed to parse response payload: {}", err)))?;
60
61        Ok(IncomingResponse::new(payload, props))
62    }
63
64    pub fn response(&self, resp: IncomingResponse<JsonValue>) -> Result<(), Error> {
65        let tx = {
66            let mut store_lock = self.store.lock().expect("Dispatcher lock poisoned");
67
68            let tx = store_lock
69                .remove(resp.properties().correlation_data())
70                .ok_or_else(|| {
71                    Error::new(&format!(
72                        "Failed to commit response with correlation data = '{}': not being awaited",
73                        resp.properties().correlation_data()
74                    ))
75                })?;
76
77            drop(store_lock);
78            tx
79        };
80
81        tx.send(resp).map_err(|resp| {
82            Error::new(&format!(
83                "Failed to commit response with correlation data = '{}': receiver has been dropped",
84                resp.properties().correlation_data(),
85            ))
86        })?;
87
88        Ok(())
89    }
90
91    pub fn cancel_request(&self, corr_data: &str) -> Result<(), Error> {
92        self.store
93            .lock()
94            .expect("Dispatcher lock poisoned")
95            .remove(corr_data)
96            .map(|_| ())
97            .ok_or_else(|| Error::new(&format!(
98                "Failed to cancel request; response with correlation data = '{}' is not being awaited",
99                corr_data
100            )))
101    }
102}