remoc/rch/oneshot/mod.rs
1//! A one-shot channel is used for sending a single message between asynchronous, remote tasks.
2//!
3//! The sender and receiver can both be sent to remote endpoints.
4//! The channel also works if both halves are local.
5//! Forwarding over multiple connections is supported.
6//!
7//! This has similar functionality as [tokio::sync::oneshot] with the additional
8//! ability to work over remote connections.
9//!
10//! # Example
11//!
12//! In the following example the client sends a number and a oneshot channel sender to the server.
13//! The server squares the received number and sends the result back over the oneshot channel.
14//!
15//! ```
16//! use remoc::prelude::*;
17//!
18//! #[derive(Debug, serde::Serialize, serde::Deserialize)]
19//! struct SquareReq {
20//! number: u32,
21//! result_tx: rch::oneshot::Sender<u32>,
22//! }
23//!
24//! // This would be run on the client.
25//! async fn client(mut tx: rch::base::Sender<SquareReq>) {
26//! let (result_tx, result_rx) = rch::oneshot::channel();
27//! tx.send(SquareReq { number: 4, result_tx }).await.unwrap();
28//! let result = result_rx.await.unwrap();
29//! assert_eq!(result, 16);
30//! }
31//!
32//! // This would be run on the server.
33//! async fn server(mut rx: rch::base::Receiver<SquareReq>) {
34//! while let Some(req) = rx.recv().await.unwrap() {
35//! req.result_tx.send(req.number * req.number).unwrap();
36//! }
37//! }
38//! # tokio_test::block_on(remoc::doctest::client_server(client, server));
39//! ```
40//!
41
42use serde::{Serialize, de::DeserializeOwned};
43
44use super::mpsc;
45use crate::{RemoteSend, codec};
46
47mod receiver;
48mod sender;
49
50pub use receiver::{Receiver, RecvError, TryRecvError};
51pub use sender::{SendError, Sender};
52
53/// Create a new one-shot channel for sending single values across asynchronous tasks.
54///
55/// The sender and receiver may be sent to remote endpoints via channels.
56pub fn channel<T, Codec>() -> (Sender<T, Codec>, Receiver<T, Codec>)
57where
58 T: Serialize + DeserializeOwned + Send + 'static,
59 Codec: codec::Codec,
60{
61 let (tx, rx) = mpsc::channel(1);
62 let tx = tx.set_buffer();
63 let rx = rx.set_buffer();
64 (Sender(tx), Receiver(rx))
65}
66
67/// Extensions for oneshot channels.
68pub trait OneshotExt<T, Codec, const MAX_ITEM_SIZE: usize> {
69 /// Sets the maximum item size for the channel.
70 fn with_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(
71 self,
72 ) -> (Sender<T, Codec>, Receiver<T, Codec, NEW_MAX_ITEM_SIZE>);
73}
74
75impl<T, Codec, const MAX_ITEM_SIZE: usize> OneshotExt<T, Codec, MAX_ITEM_SIZE>
76 for (Sender<T, Codec>, Receiver<T, Codec, MAX_ITEM_SIZE>)
77where
78 T: RemoteSend,
79 Codec: codec::Codec,
80{
81 fn with_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(
82 self,
83 ) -> (Sender<T, Codec>, Receiver<T, Codec, NEW_MAX_ITEM_SIZE>) {
84 let (mut tx, rx) = self;
85 tx.set_max_item_size(NEW_MAX_ITEM_SIZE);
86 let rx = rx.set_max_item_size();
87 (tx, rx)
88 }
89}