remoc/rch/oneshot/
mod.rs

1//! A one-shot channel is used for sending a single message between asynchronous, remote tasks.
2//!
3//! The sender and receiver can both be sent to remote endpoints.
4//! The channel also works if both halves are local.
5//! Forwarding over multiple connections is supported.
6//!
//! This has functionality similar to [`tokio::sync::oneshot`], with the additional
//! ability to work over remote connections.
9//!
10//! # Example
11//!
12//! In the following example the client sends a number and a oneshot channel sender to the server.
13//! The server squares the received number and sends the result back over the oneshot channel.
14//!
15//! ```
16//! use remoc::prelude::*;
17//!
18//! #[derive(Debug, serde::Serialize, serde::Deserialize)]
19//! struct SquareReq {
20//!     number: u32,
21//!     result_tx: rch::oneshot::Sender<u32>,
22//! }
23//!
24//! // This would be run on the client.
25//! async fn client(mut tx: rch::base::Sender<SquareReq>) {
26//!     let (result_tx, result_rx) = rch::oneshot::channel();
27//!     tx.send(SquareReq { number: 4, result_tx }).await.unwrap();
28//!     let result = result_rx.await.unwrap();
29//!     assert_eq!(result, 16);
30//! }
31//!
32//! // This would be run on the server.
33//! async fn server(mut rx: rch::base::Receiver<SquareReq>) {
34//!     while let Some(req) = rx.recv().await.unwrap() {
35//!         req.result_tx.send(req.number * req.number).unwrap();
36//!     }
37//! }
38//! # tokio_test::block_on(remoc::doctest::client_server(client, server));
39//! ```
40//!
41
42use serde::{Serialize, de::DeserializeOwned};
43
44use super::mpsc;
45use crate::{RemoteSend, codec};
46
47mod receiver;
48mod sender;
49
50pub use receiver::{Receiver, RecvError, TryRecvError};
51pub use sender::{SendError, Sender};
52
53/// Create a new one-shot channel for sending single values across asynchronous tasks.
54///
55/// The sender and receiver may be sent to remote endpoints via channels.
56pub fn channel<T, Codec>() -> (Sender<T, Codec>, Receiver<T, Codec>)
57where
58    T: Serialize + DeserializeOwned + Send + 'static,
59    Codec: codec::Codec,
60{
61    let (tx, rx) = mpsc::channel(1);
62    let tx = tx.set_buffer();
63    let rx = rx.set_buffer();
64    (Sender(tx), Receiver(rx))
65}
66
67/// Extensions for oneshot channels.
68pub trait OneshotExt<T, Codec, const MAX_ITEM_SIZE: usize> {
69    /// Sets the maximum item size for the channel.
70    fn with_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(
71        self,
72    ) -> (Sender<T, Codec>, Receiver<T, Codec, NEW_MAX_ITEM_SIZE>);
73}
74
75impl<T, Codec, const MAX_ITEM_SIZE: usize> OneshotExt<T, Codec, MAX_ITEM_SIZE>
76    for (Sender<T, Codec>, Receiver<T, Codec, MAX_ITEM_SIZE>)
77where
78    T: RemoteSend,
79    Codec: codec::Codec,
80{
81    fn with_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(
82        self,
83    ) -> (Sender<T, Codec>, Receiver<T, Codec, NEW_MAX_ITEM_SIZE>) {
84        let (mut tx, rx) = self;
85        tx.set_max_item_size(NEW_MAX_ITEM_SIZE);
86        let rx = rx.set_max_item_size();
87        (tx, rx)
88    }
89}