anng/protocols/reqrep0.rs
1//! Request/Reply pattern (REQ0/REP0 protocol).
2//!
3//! This module implements the Request/Reply messaging pattern, which provides
4//! reliable, synchronous request-response communication between clients and servers.
5//!
6//! # Socket Types
7//!
8//! - [`Req0`] - Clients that send requests and await replies
9//! - [`Rep0`] - Servers that receive requests and send replies
10//!
11//! # Protocol Overview
12//!
13//! The Request/Reply pattern enforces a strict state machine where request sockets
14//! must send before receiving, and reply sockets must receive before sending.
15//! Protocol violations are prevented at compile time by the type-safe bindings.
16//!
17//! # Key Features
18//!
19//! - **Automatic load balancing**: Requests distribute across available servers
20//! - **Resilience**: Automatic retry and reconnection on failure
21//!
22//! # Examples
23//!
24//! ## Simple client-server
25//!
26//! ```rust
27//! use anng::{protocols::reqrep0, Message};
28//! use std::io::Write;
29//!
30//! # #[tokio::main]
31//! # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
32//! // Server, in one task
33//! # tokio::spawn(async {
34//! let socket = reqrep0::Rep0::listen(c"inproc://demo").await?;
35//! let mut ctx = socket.context();
36//!
37//! let (request, responder) = ctx.receive().await?;
38//! println!("Got request: {:?}", request.as_slice());
39//!
40//! let mut reply = Message::with_capacity(100);
41//! write!(&mut reply, "Hello back!")?;
42//! // TODO: In production, handle error and retry with returned responder and message
43//! responder.reply(reply).await.unwrap();
44//! # Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
45//! # });
46//!
47//! // Client, in another (concurrent) task
48//! let socket = reqrep0::Req0::dial(c"inproc://demo").await?;
49//! let mut ctx = socket.context();
50//!
51//! let mut request = Message::with_capacity(100);
52//! write!(&mut request, "Hello server!")?;
53//!
54//! // TODO: In production, handle error and retry with returned message
55//! let reply_future = ctx.request(request).await.unwrap();
56//! let reply = reply_future.await?;
57//! println!("Got reply: {:?}", reply.as_slice());
58//! # Ok(())
59//! # }
60//! ```
61//!
62//! ## Concurrent server
63//!
64//! ```rust
65//! use anng::{protocols::reqrep0, Message};
66//! use std::{io::Write, sync::Arc};
67//!
68//! # #[tokio::main]
69//! # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
70//! let socket = Arc::new(reqrep0::Rep0::listen(c"inproc://reqrep-concurrent-demo").await?);
71//!
72//! // Spawn multiple workers for concurrent handling
73//! for worker_id in 0..4 {
74//! let socket = Arc::clone(&socket);
75//! tokio::spawn(async move {
76//! let mut ctx = socket.context();
77//! loop {
78//! let (request, responder) = ctx.receive().await?;
79//! println!("Worker {} handling request", worker_id);
80//!
81//! // Process request...
82//! let mut reply = Message::with_capacity(100);
83//! write!(&mut reply, "Processed by worker {}", worker_id)?;
84//! responder.reply(reply).await.map_err(|(_, err, _)| err)?;
85//! }
86//! Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
87//! });
88//! }
89//! # Ok(())
90//! # }
91//! ```
92
93use super::SupportsContext;
94use crate::{ContextfulSocket, Socket, aio::AioError, message::Message};
95use core::ffi::CStr;
96use std::{future::Future, io};
97
/// Request socket type for the client side of Request/Reply communication.
///
/// `Req0` is a zero-sized marker type: it carries no data and only selects the REQ0
/// protocol for [`Socket<Req0>`](Socket) and [`ContextfulSocket<'_, Req0>`](ContextfulSocket).
///
/// REQ sockets send requests and wait for replies. The protocol enforces that each
/// request must receive exactly one reply before the next request can be sent.
/// This ordering is maintained automatically by the NNG library and enforced at
/// compile time by the type system.
///
/// Note that starting a new request on a context whose previous reply has not been
/// awaited appears to abandon the previous request (its replies are discarded) rather
/// than fail; see [`ContextfulSocket::request`] for the details.
///
/// # Connection patterns
///
/// - **Typical**: Dial to connect to reply servers ([`Req0::dial`])
/// - **Reverse**: Listen for connections from reply servers ([`Req0::listen`])
/// - **Multi-connection**: Connect to multiple servers for round-robin request distribution
///   (create the socket with [`Req0::socket`] and add connections to it)
///
/// # Usage
///
/// ```rust
/// # use anng::protocols::reqrep0::{Req0, Rep0};
/// # use std::io::Write;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// # tokio::spawn(async {
/// #     let socket = Rep0::listen(c"inproc://req0-usage-doctest").await?;
/// #     let mut ctx = socket.context();
/// #     loop {
/// #         let (request, responder) = ctx.receive().await?;
/// #         let reply = anng::Message::from(&b"Reply from server"[..]);
/// #         responder.reply(reply).await.unwrap();
/// #     }
/// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
/// # });
/// // Connect to a server
/// let socket = Req0::dial(c"inproc://req0-usage-doctest").await?;
/// let mut ctx = socket.context();
///
/// // Send request and await reply
/// let request = anng::Message::from(&b"Hello"[..]);
/// // TODO: In production, handle error and retry with returned message
/// let reply_future = ctx.request(request).await.unwrap();
/// let reply = reply_future.await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Req0;
142
// Marker impl with no items. The examples in this module call `socket.context()` on REQ0
// sockets; presumably this impl is what opts `Req0` into that API — confirm against the
// definition of `SupportsContext` in the parent module.
impl SupportsContext for Req0 {}
144
145impl Req0 {
146 /// Creates a new REQ0 socket without establishing connections.
147 ///
148 /// For simple cases, prefer [`Req0::dial`] or [`Req0::listen`] which create and connect in one step.
149 pub fn socket() -> io::Result<Socket<Req0>> {
150 // SAFETY: nng_req0_open is the correct initialization function for Req0 protocol.
151 unsafe { super::create_socket(nng_sys::nng_req0_open, Req0) }
152 }
153
154 /// Creates a REQ0 socket and connects to the specified URL.
155 ///
156 /// Convenience function that combines [`Req0::socket`] and [`Socket::dial`].
157 /// For multiple connections or advanced configuration, use [`Req0::socket`] directly.
158 pub async fn dial(url: impl AsRef<CStr>) -> io::Result<Socket<Req0>> {
159 let socket = Self::socket()?;
160 socket.dial(url.as_ref()).await?;
161 Ok(socket)
162 }
163
164 /// Creates a REQ0 socket and listens on the specified URL.
165 ///
166 /// Convenience function that combines [`Req0::socket`] and [`Socket::listen`].
167 /// For listening on multiple addresses or advanced configuration, use [`Req0::socket`] directly.
168 pub async fn listen(url: impl AsRef<CStr>) -> io::Result<Socket<Req0>> {
169 let socket = Self::socket()?;
170 socket.listen(url.as_ref()).await?;
171 Ok(socket)
172 }
173}
174
175impl<'socket> ContextfulSocket<'socket, Req0> {
176 /// Sends a request and returns a future that will resolve to the reply.
177 ///
178 /// This method implements the client side of the Request/Reply pattern.
179 /// It sends the provided message as a request and returns a future that
180 /// will resolve to the reply message when it arrives.
181 ///
182 /// # Two-Phase Operation
183 ///
184 /// The request operation is split into two phases:
185 /// 1. **Send phase**: Returns immediately with a future if the send succeeds
186 /// 2. **Reply phase**: The returned future awaits the reply
187 ///
188 /// This design allows you to handle send failures immediately while still
189 /// benefiting from async reply handling.
190 ///
191 /// # Cancellation safety
192 ///
193 /// This function is **cancellation safe**. If the send phase is cancelled, the message
194 /// may or may not be sent (depending on when cancellation occurs relative to NNG's
195 /// internal dispatch). The message will be dropped if not sent. If the reply future
196 /// is cancelled, any incoming reply will be discarded, and a new request can be sent
197 /// immediately.
198 ///
199 /// # Protocol state
200 ///
201 /// Each context maintains independent protocol state. You can have multiple
202 /// outstanding requests across different contexts, but each individual context
203 /// must complete its request-reply cycle before starting a new one.
204 /// This is enforced at compile-time.
205 ///
206 /// # Errors
207 ///
208 /// Returns `Err((error, message))` if the send operation fails, giving you back
209 /// the message for potential retry. The reply future can fail with
210 /// network errors, timeouts, or cancellation.
211 ///
212 /// # Examples
213 ///
214 /// ```rust
215 /// use anng::{protocols::reqrep0, Message, AioError};
216 /// use std::io::Write;
217 ///
218 /// # #[tokio::main]
219 /// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
220 /// # tokio::spawn(async {
221 /// # let socket = reqrep0::Rep0::listen(c"inproc://request-doctest").await?;
222 /// # let mut ctx = socket.context();
223 /// # loop {
224 /// # let (request, responder) = ctx.receive().await?;
225 /// # let mut reply = Message::with_capacity(100);
226 /// # write!(&mut reply, "Hello back!")?;
227 /// # responder.reply(reply).await.unwrap();
228 /// # }
229 /// # Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
230 /// # });
231 /// let socket = reqrep0::Req0::dial(c"inproc://request-doctest").await?;
232 /// let mut ctx = socket.context();
233 ///
234 /// let mut request = Message::with_capacity(100);
235 /// write!(&mut request, "Hello")?;
236 ///
237 /// match ctx.request(request).await {
238 /// Ok(reply_future) => {
239 /// match reply_future.await {
240 /// Ok(reply) => println!("Got reply: {:?}", reply.as_slice()),
241 /// Err(AioError::TimedOut) => println!("Request timed out"),
242 /// Err(e) => println!("Request failed: {:?}", e),
243 /// }
244 /// }
245 /// Err((error, msg)) => {
246 /// println!("Send failed: {:?}", error);
247 /// // Could retry with `msg`
248 /// }
249 /// }
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub async fn request<'s>(
254 &'s mut self,
255 message: Message,
256 ) -> Result<
257 impl Future<Output = Result<Message, AioError>> + use<'s, 'socket>,
258 (AioError, Message),
259 > {
260 tracing::trace!("send request");
261 // NOTE(jon): it's fine if send_msg is cancelled, but eventually succeeds; the context will
262 // implicitly cancel the previous request as a result and discard replies to that old
263 // request.
264 if let Err((e, msg)) = self.context.send_msg(message, &mut self.aio).await {
265 tracing::error!(?e, "request failed");
266 Err((e, msg))
267 } else {
268 tracing::debug!("request succeeded; ready to await reply");
269 // NOTE(jon): here, too, it's fine if the user drops this reply future, because a new
270 // request will reset the context and cancel the old request (plus drop its responses).
271 Ok(async move {
272 tracing::trace!("awaiting reply");
273 let r = self.context.recv_msg(&mut self.aio).await;
274 tracing::debug!("reply arrived");
275 r
276 })
277 }
278 }
279}
280
/// Reply socket type for the server side of Request/Reply communication.
///
/// `Rep0` is a zero-sized marker type: it carries no data and only selects the REP0
/// protocol for [`Socket<Rep0>`](Socket) and [`ContextfulSocket<'_, Rep0>`](ContextfulSocket).
///
/// REP sockets receive requests and send back replies. The protocol enforces that each
/// received request must be replied to exactly once before the next request can be
/// received. This ordering is maintained automatically by the NNG library.
///
/// # Connection patterns
///
/// - **Typical**: Listen to accept connections from request clients ([`Rep0::listen`])
/// - **Reverse**: Dial to connect to request endpoints ([`Rep0::dial`])
///
/// # Concurrency
///
/// For concurrent request handling, use multiple contexts on the same socket.
/// Each context maintains independent protocol state, allowing parallel processing
/// of different requests.
///
/// # Usage
///
/// ```rust
/// # use anng::protocols::reqrep0::{Rep0, Req0};
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// # tokio::spawn(async {
/// #     let socket = Req0::dial(c"inproc://rep0-usage-demo").await?;
/// #     let mut ctx = socket.context();
/// #     let request = anng::Message::from(&b"Hello server"[..]);
/// #     let reply_future = ctx.request(request).await.unwrap();
/// #     let _reply = reply_future.await?;
/// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
/// # });
/// // Listen for client connections
/// let socket = Rep0::listen(c"inproc://rep0-usage-demo").await?;
/// let mut ctx = socket.context();
///
/// // Handle request-reply cycle
/// let (request, responder) = ctx.receive().await?;
/// let reply = anng::Message::from(&b"Hello back"[..]);
/// // TODO: In production, handle error and retry with returned responder and message
/// responder.reply(reply).await.unwrap();
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Rep0;
326
// Marker impl with no items. The examples in this module call `socket.context()` on REP0
// sockets; presumably this impl is what opts `Rep0` into that API — confirm against the
// definition of `SupportsContext` in the parent module.
impl SupportsContext for Rep0 {}
328
329impl Rep0 {
330 /// Creates a new REP0 socket without establishing connections.
331 ///
332 /// For simple cases, prefer [`Rep0::listen`] or [`Rep0::dial`] which create and connect in one step.
333 pub fn socket() -> io::Result<Socket<Rep0>> {
334 // SAFETY: nng_rep0_open is the correct initialization function for Rep0 protocol.
335 unsafe { super::create_socket(nng_sys::nng_rep0_open, Rep0) }
336 }
337
338 /// Creates a REP0 socket and listens on the specified URL.
339 ///
340 /// Convenience function that combines [`Rep0::socket`] and [`Socket::listen`].
341 /// For listening on multiple addresses or advanced configuration, use [`Rep0::socket`] directly.
342 pub async fn listen(url: impl AsRef<CStr>) -> io::Result<Socket<Rep0>> {
343 let socket = Self::socket()?;
344 socket.listen(url.as_ref()).await?;
345 Ok(socket)
346 }
347
348 /// Creates a REP0 socket and connects to the specified URL.
349 ///
350 /// Convenience function that combines [`Rep0::socket`] and [`Socket::dial`].
351 /// For connecting to multiple addresses or advanced configuration, use [`Rep0::socket`] directly.
352 pub async fn dial(url: impl AsRef<CStr>) -> io::Result<Socket<Rep0>> {
353 let socket = Self::socket()?;
354 socket.dial(url.as_ref()).await?;
355 Ok(socket)
356 }
357}
358
359impl<'socket> ContextfulSocket<'socket, Rep0> {
360 /// Receives a request and returns the message along with a means to reply.
361 ///
362 /// This method implements the server side of the Request/Reply pattern.
363 /// It waits for an incoming request and returns both the request message
364 /// and a [`Responder`] that must be used to send the reply.
365 ///
366 /// # Responder Pattern
367 ///
368 /// The returned [`Responder`] ensures that:
369 /// - Each request gets exactly one reply
370 /// - Protocol state is maintained correctly
371 /// - Resources are cleaned up if the responder is dropped
372 ///
373 /// You **must** call [`Responder::reply`] to complete the request-reply cycle.
374 /// Dropping the responder without replying will lead to the client re-sending
375 /// their request.
376 ///
377 /// # Cancellation safety
378 ///
379 /// This function is **cancellation safe**. If cancelled after a request message has
380 /// been received, that message will be returned by the next call to `receive()` on
381 /// the same context. No request messages are lost due to cancellation. This ensures
382 /// that clients don't need to resend requests unnecessarily.
383 ///
384 /// # Protocol state
385 ///
386 /// Each context maintains independent protocol state. After receiving a
387 /// request, that context cannot receive another request until it has
388 /// sent a reply using the [`Responder`].
389 ///
390 /// # Examples
391 ///
392 /// ```rust
393 /// use anng::{protocols::reqrep0, Message};
394 /// use std::io::Write;
395 ///
396 /// # #[tokio::main]
397 /// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
398 /// let socket = reqrep0::Rep0::listen(c"inproc://receive-doctest-echo").await?;
399 /// let mut ctx = socket.context();
400 ///
401 /// # tokio::spawn(async {
402 /// # let socket = reqrep0::Req0::dial(c"inproc://receive-doctest-echo").await?;
403 /// # let mut ctx = socket.context();
404 /// # let mut request = Message::with_capacity(100);
405 /// # write!(&mut request, "Test message")?;
406 /// # let reply_future = ctx.request(request).await.unwrap();
407 /// # let _reply = reply_future.await?;
408 /// # Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
409 /// # });
410 /// loop {
411 /// let (request, responder) = ctx.receive().await?;
412 ///
413 /// // Echo the request back as a reply
414 /// let mut reply = Message::with_capacity(request.len());
415 /// reply.write(request.as_slice())?;
416 ///
417 /// // TODO: In production, handle error and retry with returned responder and message
418 /// responder.reply(reply).await.unwrap();
419 /// # break;
420 /// }
421 /// # Ok(())
422 /// # }
423 /// ```
424 pub async fn receive<'s>(&'s mut self) -> Result<(Message, Responder<'s, 'socket>), AioError> {
425 // NOTE(jon): if `recv_msg` succeeds after being dropped, it saves its Message so that the
426 // next call to `receive` will return the same message (and thus we won't get `NNG_ESTATE`
427 // from trying to receive on a Rep0 socket that is expected to send next).
428 let message = self.context.recv_msg(&mut self.aio).await?;
429 // NOTE: Dropping the `Responder` without calling `reply()` means we'll call recv on a rep
430 // protocol socket twice in a row. Upstream has confirmed that this is acceptable behavior,
431 // and is effectively equivalent to a dropped reply (ie, the requester will retry).
432 let responder = Responder {
433 contextful: self,
434 sent: false,
435 };
436 Ok((message, responder))
437 }
438
439 /// Non-blocking variant of [`receive`](Self::receive).
440 ///
441 /// Returns `Ok(Some((message, responder)))` if immediately available, `Ok(None)` if no request is available.
442 pub fn try_receive<'s>(
443 &'s mut self,
444 ) -> Result<Option<(Message, Responder<'s, 'socket>)>, AioError> {
445 match self.context.try_recv_msg()? {
446 Some(message) => {
447 let responder = Responder {
448 contextful: self,
449 sent: false,
450 };
451 Ok(Some((message, responder)))
452 }
453 None => Ok(None),
454 }
455 }
456}
457
/// A handle for sending a reply to a specific request.
///
/// `Responder` ensures that each request gets exactly one reply and maintains
/// proper protocol state. It must be used to complete the request-reply cycle
/// after receiving a request via [`ContextfulSocket::receive`] (or
/// [`ContextfulSocket::try_receive`]).
///
/// A `Responder` mutably borrows the context it was created from, so that context
/// cannot be used again until the responder has been consumed by
/// [`Responder::reply`] or dropped.
///
/// # Must use
///
/// This type is marked `#[must_use]` because dropping it without calling [`Responder::reply`]
/// means the client will have to retry the request.
///
/// If a `Responder` is dropped without sending a reply (e.g., due to an error or cancellation), a
/// warning is logged to indicate that the peer will need to re-send their request.
///
/// # Examples
///
/// ```rust
/// use anng::{protocols::reqrep0, Message};
/// use std::io::Write;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// # tokio::spawn(async {
/// #     let socket = reqrep0::Req0::dial(c"inproc://service").await?;
/// #     let mut ctx = socket.context();
/// #     let request = Message::from(&b"Test request"[..]);
/// #     let reply_future = ctx.request(request).await.unwrap();
/// #     let _reply = reply_future.await?;
/// #     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
/// # });
/// let socket = reqrep0::Rep0::listen(c"inproc://service").await?;
/// let mut ctx = socket.context();
///
/// let (request, responder) = ctx.receive().await?;
///
/// // Process the request...
/// let mut reply = Message::with_capacity(100);
/// write!(&mut reply, "Processed: {:?}", request.as_slice())?;
///
/// // Send the reply
/// // TODO: In production, handle error and retry with returned responder and message
/// responder.reply(reply).await.unwrap();
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
#[must_use]
pub struct Responder<'s, 'socket> {
    // Exclusive borrow of the REP0 context-bound socket the request was received on;
    // the reply is sent through its context and aio.
    contextful: &'s mut ContextfulSocket<'socket, Rep0>,
    // Starts out `false` and is set to `true` by `reply`; while it is `false`, dropping
    // the responder logs a warning (see the `Drop` impl).
    sent: bool,
}
509
510impl Drop for Responder<'_, '_> {
511 fn drop(&mut self) {
512 if !self.sent {
513 tracing::warn!(
514 "REP0 Responder dropped (likely cancelled); \
515 peer will have to re-send the request"
516 );
517 }
518 }
519}
520
521impl Responder<'_, '_> {
522 /// Sends a reply message to complete the request-reply cycle.
523 ///
524 /// This method consumes the `Responder` and sends the provided message
525 /// as a reply to the original request. This completes the protocol
526 /// state machine and allows the context to receive new requests.
527 ///
528 /// # Cancellation safety
529 ///
530 /// This function is **cancellation safe**. If cancelled, the reply message may or may
531 /// not be sent (depending on when cancellation occurs). The message and responder will
532 /// be dropped if not sent. Note that if the reply is not sent, the client will
533 /// typically retry their request after a timeout, resulting in a new request to
534 /// `receive()`.
535 ///
536 /// # Errors
537 ///
538 /// Returns `Err((responder, error, message))` if the send operation fails.
539 /// This gives you back all the resources so you can handle the error
540 /// appropriately (retry, log, etc.).
541 ///
542 /// # Examples
543 ///
544 /// ```rust
545 /// use anng::{protocols::reqrep0, Message, AioError};
546 /// use std::io::Write;
547 ///
548 /// # #[tokio::main]
549 /// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
550 /// # tokio::spawn(async {
551 /// # let socket = reqrep0::Req0::dial(c"inproc://replies").await?;
552 /// # let mut ctx = socket.context();
553 /// # let request = Message::from(&b"Test request"[..]);
554 /// # let reply_future = ctx.request(request).await.unwrap();
555 /// # let _reply = reply_future.await?;
556 /// # Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
557 /// # });
558 /// let socket = reqrep0::Rep0::listen(c"inproc://replies").await?;
559 /// let mut ctx = socket.context();
560 ///
561 /// let (request, responder) = ctx.receive().await?;
562 ///
563 /// let mut reply = Message::with_capacity(100);
564 /// write!(&mut reply, "Hello back!")?;
565 ///
566 /// match responder.reply(reply).await {
567 /// Ok(()) => println!("Reply sent successfully"),
568 /// Err((responder, error, message)) => {
569 /// println!("Failed to send reply: {:?}", error);
570 /// // Could retry with `responder.reply(message)`
571 /// }
572 /// }
573 /// # Ok(())
574 /// # }
575 /// ```
576 pub async fn reply(mut self, message: Message) -> Result<(), (Self, AioError, Message)> {
577 // avoid the drop warning
578 self.sent = true;
579
580 if let Err((e, msg)) = self
581 .contextful
582 .context
583 .send_msg(message, &mut self.contextful.aio)
584 .await
585 {
586 Err((self, e, msg))
587 } else {
588 Ok(())
589 }
590 }
591}