Skip to main content

ruststream_nats/
request_reply.rs

1//! [`RequestReply`] capability for the NATS publisher.
2
3use std::time::Duration;
4
5use bytes::Bytes;
6use ruststream::{OutgoingMessage, RequestReply};
7
8use crate::{
9    convert::headers_to_nats,
10    error::NatsError,
11    message::{CoreMessage, NatsMessage},
12    publisher::NatsPublisher,
13};
14
15impl RequestReply for NatsPublisher {
16    type Reply = NatsMessage;
17
18    async fn request(
19        &self,
20        msg: OutgoingMessage<'_>,
21        timeout: Duration,
22    ) -> Result<Self::Reply, Self::Error> {
23        let client = self.client_for_request();
24        let subject = msg.name().to_owned();
25        let payload = Bytes::copy_from_slice(msg.payload());
26        let headers_owned = headers_to_nats(msg.headers());
27
28        let fut = async {
29            let request = match headers_owned {
30                Some(headers) => async_nats::Request::new().payload(payload).headers(headers),
31                None => async_nats::Request::new().payload(payload),
32            };
33            client
34                .send_request(subject, request)
35                .await
36                .map_err(|err| NatsError::Publish(Box::new(err)))
37        };
38
39        let response = tokio::time::timeout(timeout, fut)
40            .await
41            .map_err(|_| NatsError::RequestTimeout)??;
42        Ok(NatsMessage::Core(Box::new(CoreMessage::new(response))))
43    }
44}
45
46impl NatsPublisher {
47    fn client_for_request(&self) -> async_nats::Client {
48        self.client_clone()
49    }
50}