ruststream_nats/
request_reply.rs1use async_nats::Request;
4use bytes::Bytes;
5use ruststream::{OutgoingMessage, RequestReply};
6use std::time::Duration;
7
8use crate::{
9 convert::headers_to_nats,
10 error::NatsError,
11 message::{CoreMessage, NatsMessage},
12 publisher::NatsPublisher,
13};
14
15impl RequestReply for NatsPublisher {
16 type Reply = NatsMessage;
17
18 async fn request(
19 &self,
20 msg: OutgoingMessage<'_>,
21 timeout: Duration,
22 ) -> Result<Self::Reply, Self::Error> {
23 let client = self.client_clone()?;
24 let subject = msg.name().to_owned();
25 let payload = Bytes::copy_from_slice(msg.payload());
26 let headers_owned = headers_to_nats(msg.headers());
27
28 let fut = async {
29 let request = match headers_owned {
30 Some(headers) => Request::new().payload(payload).headers(headers),
31 None => Request::new().payload(payload),
32 };
33 client
34 .send_request(subject, request)
35 .await
36 .map_err(|err| NatsError::Publish(Box::new(err)))
37 };
38
39 let response = tokio::time::timeout(timeout, fut)
40 .await
41 .map_err(|_| NatsError::RequestTimeout)??;
42 Ok(NatsMessage::Core(Box::new(CoreMessage::new(response))))
43 }
44}