Skip to main content

ruststream_nats/
request_reply.rs

1//! [`RequestReply`] capability for the NATS publisher.
2
3use async_nats::Request;
4use bytes::Bytes;
5use ruststream::{OutgoingMessage, RequestReply};
6use std::time::Duration;
7
8use crate::{
9    convert::headers_to_nats,
10    error::NatsError,
11    message::{CoreMessage, NatsMessage},
12    publisher::NatsPublisher,
13};
14
15impl RequestReply for NatsPublisher {
16    type Reply = NatsMessage;
17
18    async fn request(
19        &self,
20        msg: OutgoingMessage<'_>,
21        timeout: Duration,
22    ) -> Result<Self::Reply, Self::Error> {
23        let client = self.client_clone()?;
24        let subject = msg.name().to_owned();
25        let payload = Bytes::copy_from_slice(msg.payload());
26        let headers_owned = headers_to_nats(msg.headers());
27
28        let fut = async {
29            let request = match headers_owned {
30                Some(headers) => Request::new().payload(payload).headers(headers),
31                None => Request::new().payload(payload),
32            };
33            client
34                .send_request(subject, request)
35                .await
36                .map_err(|err| NatsError::Publish(Box::new(err)))
37        };
38
39        let response = tokio::time::timeout(timeout, fut)
40            .await
41            .map_err(|_| NatsError::RequestTimeout)??;
42        Ok(NatsMessage::Core(Box::new(CoreMessage::new(response))))
43    }
44}