anng/protocols/
reqrep0.rs

1//! Request/Reply pattern (REQ0/REP0 protocol).
2//!
3//! This module implements the Request/Reply messaging pattern, which provides
4//! reliable, synchronous request-response communication between clients and servers.
5//!
6//! # Socket Types
7//!
8//! - [`Req0`] - Clients that send requests and await replies
9//! - [`Rep0`] - Servers that receive requests and send replies
10//!
11//! # Protocol Overview
12//!
13//! The Request/Reply pattern enforces a strict state machine where request sockets
14//! must send before receiving, and reply sockets must receive before sending.
15//! Protocol violations are prevented at compile time by the type-safe bindings.
16//!
17//! # Key Features
18//!
19//! - **Automatic load balancing**: Requests distribute across available servers
20//! - **Resilience**: Automatic retry and reconnection on failure
21//!
22//! # Examples
23//!
24//! ## Simple client-server
25//!
26//! ```rust
27//! use anng::{protocols::reqrep0, Message};
28//! use std::io::Write;
29//!
30//! # #[tokio::main]
31//! # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
32//! // Server, in one task
33//! # tokio::spawn(async {
34//! let socket = reqrep0::Rep0::listen(c"inproc://demo").await?;
35//! let mut ctx = socket.context();
36//!
37//! let (request, responder) = ctx.receive().await?;
38//! println!("Got request: {:?}", request.as_slice());
39//!
40//! let mut reply = Message::with_capacity(100);
41//! write!(&mut reply, "Hello back!")?;
42//! // TODO: In production, handle error and retry with returned responder and message
43//! responder.reply(reply).await.unwrap();
44//! # Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
45//! # });
46//!
47//! // Client, in another (concurrent) task
48//! let socket = reqrep0::Req0::dial(c"inproc://demo").await?;
49//! let mut ctx = socket.context();
50//!
51//! let mut request = Message::with_capacity(100);
52//! write!(&mut request, "Hello server!")?;
53//!
54//! // TODO: In production, handle error and retry with returned message
55//! let reply_future = ctx.request(request).await.unwrap();
56//! let reply = reply_future.await?;
57//! println!("Got reply: {:?}", reply.as_slice());
58//! # Ok(())
59//! # }
60//! ```
61//!
62//! ## Concurrent server
63//!
64//! ```rust
65//! use anng::{protocols::reqrep0, Message};
66//! use std::{io::Write, sync::Arc};
67//!
68//! # #[tokio::main]
69//! # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
70//! let socket = Arc::new(reqrep0::Rep0::listen(c"inproc://reqrep-concurrent-demo").await?);
71//!
72//! // Spawn multiple workers for concurrent handling
73//! for worker_id in 0..4 {
74//!     let socket = Arc::clone(&socket);
75//!     tokio::spawn(async move {
76//!         let mut ctx = socket.context();
77//!         loop {
78//!             let (request, responder) = ctx.receive().await?;
79//!             println!("Worker {} handling request", worker_id);
80//!
81//!             // Process request...
82//!             let mut reply = Message::with_capacity(100);
83//!             write!(&mut reply, "Processed by worker {}", worker_id)?;
84//!             responder.reply(reply).await.map_err(|(_, err, _)| err)?;
85//!         }
86//!         Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
87//!     });
88//! }
89//! # Ok(())
90//! # }
91//! ```
92
93use super::SupportsContext;
94use crate::{ContextfulSocket, Socket, aio::AioError, message::Message};
95use core::ffi::CStr;
96use std::{future::Future, io};
97
98/// Request socket type for the client side of Request/Reply communication.
99///
100/// REQ sockets send requests and wait for replies. The protocol enforces that each
101/// request must receive exactly one reply before the next request can be sent.
102/// This ordering is maintained automatically by the NNG library and enforced at
103/// compile time by the type system.
104///
105/// # Connection patterns
106///
107/// - **Typical**: Dial to connect to reply servers (`Req0::dial()`)
108/// - **Reverse**: Listen for connections from reply servers (`Req0::listen()`)
109/// - **Multi-connection**: Connect to multiple servers for round-robin request distribution
110///
111/// # Usage
112///
113/// ```rust
114/// # use anng::protocols::reqrep0::{Req0, Rep0};
115/// # use std::io::Write;
116/// # #[tokio::main]
117/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
118/// # tokio::spawn(async {
119/// #     let socket = Rep0::listen(c"inproc://req0-usage-doctest").await?;
120/// #     let mut ctx = socket.context();
121/// #     loop {
122/// #         let (request, responder) = ctx.receive().await?;
123/// #         let reply = anng::Message::from(&b"Reply from server"[..]);
124/// #         responder.reply(reply).await.unwrap();
125/// #     }
126/// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
127/// # });
128/// // Connect to a server
129/// let socket = Req0::dial(c"inproc://req0-usage-doctest").await?;
130/// let mut ctx = socket.context();
131///
132/// // Send request and await reply
133/// let request = anng::Message::from(&b"Hello"[..]);
134/// // TODO: In production, handle error and retry with returned message
135/// let reply_future = ctx.request(request).await.unwrap();
136/// let reply = reply_future.await?;
137/// # Ok(())
138/// # }
139/// ```
140#[derive(Debug, Clone, Copy)]
141pub struct Req0;
142
143impl SupportsContext for Req0 {}
144
145impl Req0 {
146    /// Creates a new REQ0 socket without establishing connections.
147    ///
148    /// For simple cases, prefer [`Req0::dial`] or [`Req0::listen`] which create and connect in one step.
149    pub fn socket() -> io::Result<Socket<Req0>> {
150        // SAFETY: nng_req0_open is the correct initialization function for Req0 protocol.
151        unsafe { super::create_socket(nng_sys::nng_req0_open, Req0) }
152    }
153
154    /// Creates a REQ0 socket and connects to the specified URL.
155    ///
156    /// Convenience function that combines [`Req0::socket`] and [`Socket::dial`].
157    /// For multiple connections or advanced configuration, use [`Req0::socket`] directly.
158    pub async fn dial(url: impl AsRef<CStr>) -> io::Result<Socket<Req0>> {
159        let socket = Self::socket()?;
160        socket.dial(url.as_ref()).await?;
161        Ok(socket)
162    }
163
164    /// Creates a REQ0 socket and listens on the specified URL.
165    ///
166    /// Convenience function that combines [`Req0::socket`] and [`Socket::listen`].
167    /// For listening on multiple addresses or advanced configuration, use [`Req0::socket`] directly.
168    pub async fn listen(url: impl AsRef<CStr>) -> io::Result<Socket<Req0>> {
169        let socket = Self::socket()?;
170        socket.listen(url.as_ref()).await?;
171        Ok(socket)
172    }
173}
174
175impl<'socket> ContextfulSocket<'socket, Req0> {
176    /// Sends a request and returns a future that will resolve to the reply.
177    ///
178    /// This method implements the client side of the Request/Reply pattern.
179    /// It sends the provided message as a request and returns a future that
180    /// will resolve to the reply message when it arrives.
181    ///
182    /// # Two-Phase Operation
183    ///
184    /// The request operation is split into two phases:
185    /// 1. **Send phase**: Returns immediately with a future if the send succeeds
186    /// 2. **Reply phase**: The returned future awaits the reply
187    ///
188    /// This design allows you to handle send failures immediately while still
189    /// benefiting from async reply handling.
190    ///
191    /// # Cancellation safety
192    ///
193    /// This function is **cancellation safe**. If the send phase is cancelled, the message
194    /// may or may not be sent (depending on when cancellation occurs relative to NNG's
195    /// internal dispatch). The message will be dropped if not sent. If the reply future
196    /// is cancelled, any incoming reply will be discarded, and a new request can be sent
197    /// immediately.
198    ///
199    /// # Protocol state
200    ///
201    /// Each context maintains independent protocol state. You can have multiple
202    /// outstanding requests across different contexts, but each individual context
203    /// must complete its request-reply cycle before starting a new one.
204    /// This is enforced at compile-time.
205    ///
206    /// # Errors
207    ///
208    /// Returns `Err((error, message))` if the send operation fails, giving you back
209    /// the message for potential retry. The reply future can fail with
210    /// network errors, timeouts, or cancellation.
211    ///
212    /// # Examples
213    ///
214    /// ```rust
215    /// use anng::{protocols::reqrep0, Message, AioError};
216    /// use std::io::Write;
217    ///
218    /// # #[tokio::main]
219    /// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
220    /// # tokio::spawn(async {
221    /// #     let socket = reqrep0::Rep0::listen(c"inproc://request-doctest").await?;
222    /// #     let mut ctx = socket.context();
223    /// #     loop {
224    /// #         let (request, responder) = ctx.receive().await?;
225    /// #         let mut reply = Message::with_capacity(100);
226    /// #         write!(&mut reply, "Hello back!")?;
227    /// #         responder.reply(reply).await.unwrap();
228    /// #     }
229    /// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
230    /// # });
231    /// let socket = reqrep0::Req0::dial(c"inproc://request-doctest").await?;
232    /// let mut ctx = socket.context();
233    ///
234    /// let mut request = Message::with_capacity(100);
235    /// write!(&mut request, "Hello")?;
236    ///
237    /// match ctx.request(request).await {
238    ///     Ok(reply_future) => {
239    ///         match reply_future.await {
240    ///             Ok(reply) => println!("Got reply: {:?}", reply.as_slice()),
241    ///             Err(AioError::TimedOut) => println!("Request timed out"),
242    ///             Err(e) => println!("Request failed: {:?}", e),
243    ///         }
244    ///     }
245    ///     Err((error, msg)) => {
246    ///         println!("Send failed: {:?}", error);
247    ///         // Could retry with `msg`
248    ///     }
249    /// }
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub async fn request<'s>(
254        &'s mut self,
255        message: Message,
256    ) -> Result<
257        impl Future<Output = Result<Message, AioError>> + use<'s, 'socket>,
258        (AioError, Message),
259    > {
260        tracing::trace!("send request");
261        // NOTE(jon): it's fine if send_msg is cancelled, but eventually succeeds; the context will
262        // implicitly cancel the previous request as a result and discard replies to that old
263        // request.
264        if let Err((e, msg)) = self.context.send_msg(message, &mut self.aio).await {
265            tracing::error!(?e, "request failed");
266            Err((e, msg))
267        } else {
268            tracing::debug!("request succeeded; ready to await reply");
269            // NOTE(jon): here, too, it's fine if the user drops this reply future, because a new
270            // request will reset the context and cancel the old request (plus drop its responses).
271            Ok(async move {
272                tracing::trace!("awaiting reply");
273                let r = self.context.recv_msg(&mut self.aio).await;
274                tracing::debug!("reply arrived");
275                r
276            })
277        }
278    }
279}
280
281/// Reply socket type for the server side of Request/Reply communication.
282///
283/// REP sockets receive requests and send back replies. The protocol enforces that each
284/// received request must be replied to exactly once before the next request can be
285/// received. This ordering is maintained automatically by the NNG library.
286///
287/// # Connection patterns
288///
289/// - **Typical**: Listen to accept connections from request clients (`Rep0::listen()`)
290/// - **Reverse**: Dial to connect to request endpoints (`Rep0::dial()`)
291///
292/// # Concurrency
293///
294/// For concurrent request handling, use multiple contexts on the same socket.
295/// Each context maintains independent protocol state, allowing parallel processing
296/// of different requests.
297///
298/// # Usage
299///
300/// ```rust
301/// # use anng::protocols::reqrep0::{Rep0, Req0};
302/// # #[tokio::main]
303/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
304/// # tokio::spawn(async {
305/// #     let socket = Req0::dial(c"inproc://rep0-usage-demo").await?;
306/// #     let mut ctx = socket.context();
307/// #     let request = anng::Message::from(&b"Hello server"[..]);
308/// #     let reply_future = ctx.request(request).await.unwrap();
309/// #     let _reply = reply_future.await?;
310/// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
311/// # });
312/// // Listen for client connections
313/// let socket = Rep0::listen(c"inproc://rep0-usage-demo").await?;
314/// let mut ctx = socket.context();
315///
316/// // Handle request-reply cycle
317/// let (request, responder) = ctx.receive().await?;
318/// let reply = anng::Message::from(&b"Hello back"[..]);
319/// // TODO: In production, handle error and retry with returned responder and message
320/// responder.reply(reply).await.unwrap();
321/// # Ok(())
322/// # }
323/// ```
324#[derive(Debug, Clone, Copy)]
325pub struct Rep0;
326
327impl SupportsContext for Rep0 {}
328
329impl Rep0 {
330    /// Creates a new REP0 socket without establishing connections.
331    ///
332    /// For simple cases, prefer [`Rep0::listen`] or [`Rep0::dial`] which create and connect in one step.
333    pub fn socket() -> io::Result<Socket<Rep0>> {
334        // SAFETY: nng_rep0_open is the correct initialization function for Rep0 protocol.
335        unsafe { super::create_socket(nng_sys::nng_rep0_open, Rep0) }
336    }
337
338    /// Creates a REP0 socket and listens on the specified URL.
339    ///
340    /// Convenience function that combines [`Rep0::socket`] and [`Socket::listen`].
341    /// For listening on multiple addresses or advanced configuration, use [`Rep0::socket`] directly.
342    pub async fn listen(url: impl AsRef<CStr>) -> io::Result<Socket<Rep0>> {
343        let socket = Self::socket()?;
344        socket.listen(url.as_ref()).await?;
345        Ok(socket)
346    }
347
348    /// Creates a REP0 socket and connects to the specified URL.
349    ///
350    /// Convenience function that combines [`Rep0::socket`] and [`Socket::dial`].
351    /// For connecting to multiple addresses or advanced configuration, use [`Rep0::socket`] directly.
352    pub async fn dial(url: impl AsRef<CStr>) -> io::Result<Socket<Rep0>> {
353        let socket = Self::socket()?;
354        socket.dial(url.as_ref()).await?;
355        Ok(socket)
356    }
357}
358
359impl<'socket> ContextfulSocket<'socket, Rep0> {
360    /// Receives a request and returns the message along with a means to reply.
361    ///
362    /// This method implements the server side of the Request/Reply pattern.
363    /// It waits for an incoming request and returns both the request message
364    /// and a [`Responder`] that must be used to send the reply.
365    ///
366    /// # Responder Pattern
367    ///
368    /// The returned [`Responder`] ensures that:
369    /// - Each request gets exactly one reply
370    /// - Protocol state is maintained correctly
371    /// - Resources are cleaned up if the responder is dropped
372    ///
373    /// You **must** call [`Responder::reply`] to complete the request-reply cycle.
374    /// Dropping the responder without replying will lead to the client re-sending
375    /// their request.
376    ///
377    /// # Cancellation safety
378    ///
379    /// This function is **cancellation safe**. If cancelled after a request message has
380    /// been received, that message will be returned by the next call to `receive()` on
381    /// the same context. No request messages are lost due to cancellation. This ensures
382    /// that clients don't need to resend requests unnecessarily.
383    ///
384    /// # Protocol state
385    ///
386    /// Each context maintains independent protocol state. After receiving a
387    /// request, that context cannot receive another request until it has
388    /// sent a reply using the [`Responder`].
389    ///
390    /// # Examples
391    ///
392    /// ```rust
393    /// use anng::{protocols::reqrep0, Message};
394    /// use std::io::Write;
395    ///
396    /// # #[tokio::main]
397    /// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
398    /// let socket = reqrep0::Rep0::listen(c"inproc://receive-doctest-echo").await?;
399    /// let mut ctx = socket.context();
400    ///
401    /// # tokio::spawn(async {
402    /// #     let socket = reqrep0::Req0::dial(c"inproc://receive-doctest-echo").await?;
403    /// #     let mut ctx = socket.context();
404    /// #     let mut request = Message::with_capacity(100);
405    /// #     write!(&mut request, "Test message")?;
406    /// #     let reply_future = ctx.request(request).await.unwrap();
407    /// #     let _reply = reply_future.await?;
408    /// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
409    /// # });
410    /// loop {
411    ///     let (request, responder) = ctx.receive().await?;
412    ///
413    ///     // Echo the request back as a reply
414    ///     let mut reply = Message::with_capacity(request.len());
415    ///     reply.write(request.as_slice())?;
416    ///
417    ///     // TODO: In production, handle error and retry with returned responder and message
418    ///     responder.reply(reply).await.unwrap();
419    /// #     break;
420    /// }
421    /// # Ok(())
422    /// # }
423    /// ```
424    pub async fn receive<'s>(&'s mut self) -> Result<(Message, Responder<'s, 'socket>), AioError> {
425        // NOTE(jon): if `recv_msg` succeeds after being dropped, it saves its Message so that the
426        // next call to `receive` will return the same message (and thus we won't get `NNG_ESTATE`
427        // from trying to receive on a Rep0 socket that is expected to send next).
428        let message = self.context.recv_msg(&mut self.aio).await?;
429        // NOTE: Dropping the `Responder` without calling `reply()` means we'll call recv on a rep
430        // protocol socket twice in a row. Upstream has confirmed that this is acceptable behavior,
431        // and is effectively equivalent to a dropped reply (ie, the requester will retry).
432        let responder = Responder {
433            contextful: self,
434            sent: false,
435        };
436        Ok((message, responder))
437    }
438
439    /// Non-blocking variant of [`receive`](Self::receive).
440    ///
441    /// Returns `Ok(Some((message, responder)))` if immediately available, `Ok(None)` if no request is available.
442    pub fn try_receive<'s>(
443        &'s mut self,
444    ) -> Result<Option<(Message, Responder<'s, 'socket>)>, AioError> {
445        match self.context.try_recv_msg()? {
446            Some(message) => {
447                let responder = Responder {
448                    contextful: self,
449                    sent: false,
450                };
451                Ok(Some((message, responder)))
452            }
453            None => Ok(None),
454        }
455    }
456}
457
458/// A handle for sending a reply to a specific request.
459///
460/// `Responder` ensures that each request gets exactly one reply and maintains
461/// proper protocol state. It must be used to complete the request-reply cycle
462/// after receiving a request via [`ContextfulSocket::receive`].
463///
464/// # Must use
465///
466/// This type is marked `#[must_use]` because dropping it without calling [`Responder::reply`]
467/// means the client will have to retry the request.
468///
469/// If a `Responder` is dropped without sending a reply (e.g., due to an error or cancellation), a
470/// warning is logged to indicate that the peer will need to re-send their request.
471///
472/// # Examples
473///
474/// ```rust
475/// use anng::{protocols::reqrep0, Message};
476/// use std::io::Write;
477///
478/// # #[tokio::main]
479/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
480/// # tokio::spawn(async {
481/// #     let socket = reqrep0::Req0::dial(c"inproc://service").await?;
482/// #     let mut ctx = socket.context();
483/// #     let request = Message::from(&b"Test request"[..]);
484/// #     let reply_future = ctx.request(request).await.unwrap();
485/// #     let _reply = reply_future.await?;
486/// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
487/// # });
488/// let socket = reqrep0::Rep0::listen(c"inproc://service").await?;
489/// let mut ctx = socket.context();
490///
491/// let (request, responder) = ctx.receive().await?;
492///
493/// // Process the request...
494/// let mut reply = Message::with_capacity(100);
495/// write!(&mut reply, "Processed: {:?}", request.as_slice())?;
496///
497/// // Send the reply
498/// // TODO: In production, handle error and retry with returned responder and message
499/// responder.reply(reply).await.unwrap();
500/// # Ok(())
501/// # }
502/// ```
503#[derive(Debug)]
504#[must_use]
505pub struct Responder<'s, 'socket> {
506    contextful: &'s mut ContextfulSocket<'socket, Rep0>,
507    sent: bool,
508}
509
510impl Drop for Responder<'_, '_> {
511    fn drop(&mut self) {
512        if !self.sent {
513            tracing::warn!(
514                "REP0 Responder dropped (likely cancelled); \
515            peer will have to re-send the request"
516            );
517        }
518    }
519}
520
521impl Responder<'_, '_> {
522    /// Sends a reply message to complete the request-reply cycle.
523    ///
524    /// This method consumes the `Responder` and sends the provided message
525    /// as a reply to the original request. This completes the protocol
526    /// state machine and allows the context to receive new requests.
527    ///
528    /// # Cancellation safety
529    ///
530    /// This function is **cancellation safe**. If cancelled, the reply message may or may
531    /// not be sent (depending on when cancellation occurs). The message and responder will
532    /// be dropped if not sent. Note that if the reply is not sent, the client will
533    /// typically retry their request after a timeout, resulting in a new request to
534    /// `receive()`.
535    ///
536    /// # Errors
537    ///
538    /// Returns `Err((responder, error, message))` if the send operation fails.
539    /// This gives you back all the resources so you can handle the error
540    /// appropriately (retry, log, etc.).
541    ///
542    /// # Examples
543    ///
544    /// ```rust
545    /// use anng::{protocols::reqrep0, Message, AioError};
546    /// use std::io::Write;
547    ///
548    /// # #[tokio::main]
549    /// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
550    /// # tokio::spawn(async {
551    /// #     let socket = reqrep0::Req0::dial(c"inproc://replies").await?;
552    /// #     let mut ctx = socket.context();
553    /// #     let request = Message::from(&b"Test request"[..]);
554    /// #     let reply_future = ctx.request(request).await.unwrap();
555    /// #     let _reply = reply_future.await?;
556    /// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
557    /// # });
558    /// let socket = reqrep0::Rep0::listen(c"inproc://replies").await?;
559    /// let mut ctx = socket.context();
560    ///
561    /// let (request, responder) = ctx.receive().await?;
562    ///
563    /// let mut reply = Message::with_capacity(100);
564    /// write!(&mut reply, "Hello back!")?;
565    ///
566    /// match responder.reply(reply).await {
567    ///     Ok(()) => println!("Reply sent successfully"),
568    ///     Err((responder, error, message)) => {
569    ///         println!("Failed to send reply: {:?}", error);
570    ///         // Could retry with `responder.reply(message)`
571    ///     }
572    /// }
573    /// # Ok(())
574    /// # }
575    /// ```
576    pub async fn reply(mut self, message: Message) -> Result<(), (Self, AioError, Message)> {
577        // avoid the drop warning
578        self.sent = true;
579
580        if let Err((e, msg)) = self
581            .contextful
582            .context
583            .send_msg(message, &mut self.contextful.aio)
584            .await
585        {
586            Err((self, e, msg))
587        } else {
588            Ok(())
589        }
590    }
591}