hermes_runtime_components/traits/channel.rs
/*!
   Support for use of abstract communication channels within a runtime context.

   This module defines the abstract traits that can be implemented by a runtime
   context to support message-passing concurrency. This provides functionality
   similar to the Rust channel types defined in
   [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html).
*/
9
10use cgp::prelude::*;
11
12use crate::traits::stream::HasStreamType;
13
14/**
15   Provides the abstract `Sender` and `Receiver` types for messsage-passing.
16
17   The `Sender` and `Receiver` types are parameterized by an arbitrary payload
18   type `T` using generic associated types. Given any payload type `T` that
19   implements [`Async`], a runtime context that implements `HasChannelTypes`
20   should be able to provide the concrete types `Sender<T>` and `Receiver<T>`,
21   where messages of type `T` can be sent over to the `Sender<T>` end and
22   received from the `Receiver<T>` end.
23
24   The abstract `Sender` and `Receiver` types need to support the
25   message-passing passing asynchronously, i.e. inside async functions.
26   As a result, although it work similar to the Rust channel provided in
27   the standard library,
28   [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html),
29   the standard Rust channels are not suited for use here, as they could block
30   the entire thread running the async tasks.
31
32   Instead, there are async equivalent of the channel types offered by async
33   libraries such as Tokio's
34   [tokio::sync::mpsc::channel](https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html)
35   or async-std's [async_std::channel](https://docs.rs/async-std/latest/async_std/channel/index.html).
36
37   A main difference between the channel types defined here and the common
38   MPSC (multiple producer, single consumer) channels in the stated libraries
39   is that we allow multiple consumers to use the same receiver. This is to
40   avoid the use of `&mut` references, which would require the entire context
41   containing a receiver to be mutable. Instead, concrete types can wrap a
42   single-consumer receiver as `Arc<Mutex<Receiver<T>>>` in the concrete
43   type definition, so that it can be used as a multi-consumer receiver.
44
45   The methods for using the abstract channel types are available in separate
46   traits, [`CanCreateChannels`] and [`CanUseChannels`]. This is because code
47   that needs to create new channels do not necessary need to use the channels,
48   and vice versa. Having separate traits makes it clear which capability a
49   component needs from the runtime.
50
51   There is also a similar trait
52   [`HasChannelOnceTypes`](super::channel_once::HasChannelOnceTypes),
53   which defines abstract one-shot channel types that allow at most one message
54   to be sent over.
55*/
56#[derive_component(ChannelTypeComponent, ProvideChannelType<Runtime>)]
57pub trait HasChannelTypes: Async {
58    /**
59       The sender end of a channel with payload type `T`.
60    */
61    type Sender<T>: Async
62    where
63        T: Async;
64
65    /**
66       The receiver end of a channel with payload type `T`.
67    */
68    type Receiver<T>: Async
69    where
70        T: Async;
71}
72
73/**
74   Allow the creation of new sender-receiver pairs for the channel types
75   defined in [`HasChannelTypes`].
76*/
77#[derive_component(ChannelCreatorComponent, ChannelCreator<Runtime>)]
78pub trait CanCreateChannels: HasChannelTypes {
79    /**
80       Given a generic payload type `T`, create a
81       [`Sender<T>`](HasChannelTypes::Sender<T>) and
82       [`Receiver<T>`](HasChannelTypes::Receiver<T>) pair that are connected.
83
84       The returned sender-receiver pair is expected to satisfy the following
85       invariants:
86
87         - Messages sent from the returned sender are expected to be received
88           via the returned receiver.
89
90         - Messages sent from mismatch sender should never be received by the
91           given receiver.
92
93       More invariants may be added in the future to better specify the
94       requirements of the abstract channel. For now, we assume that mainstream
95       implementation of Rust channels can all satisfy the same requirements.
96    */
97    fn new_channel<T>() -> (Self::Sender<T>, Self::Receiver<T>)
98    where
99        T: Async;
100}
101
102/**
103   Allow the sending and receiving of message payloads over the
104   [`Sender`](HasChannelTypes::Sender<T>) and
105   [`Receiver`](HasChannelTypes::Receiver<T>) ends of a channel.
106*/
107#[derive_component(ChannelUserComponent, ChannelUser<Runtime>)]
108#[async_trait]
109pub trait CanUseChannels: HasChannelTypes + HasErrorType {
110    /**
111       Given a reference to [`Sender<T>`](HasChannelTypes::Sender<T>),
112       send a message payload of type `T` over the sender.
113
114       If the receiver side of the channel has been dropped, the sending would
115       fail and an error will be returned.
116
117       The sending operation is _synchronous_. This ensures the payload is
118       guaranteed to be in the channel queue after `send()` is called.
119
120       The receiving operation is expected to be _asynchronous_. This means
121       that any operation after `receive()` is called on the receiving end
122       should _never_ execute within `send()`.
123    */
124    async fn send<T>(sender: &Self::Sender<T>, value: T) -> Result<(), Self::Error>
125    where
126        T: Async;
127
128    /**
129       Given a reference to [`Receiver<T>`](HasChannelTypes::Receiver<T>),
130       asynchronously receive a message payload of type `T` that is sent
131       over the sender end.
132
133       If the sender end is dropped before any value is sent, this would result
134       in an error in `receive()`
135    */
136    async fn receive<T>(receiver: &mut Self::Receiver<T>) -> Result<T, Self::Error>
137    where
138        T: Async;
139
140    fn try_receive<T>(receiver: &mut Self::Receiver<T>) -> Result<Option<T>, Self::Error>
141    where
142        T: Async;
143}
144
145#[derive_component(ReceiverStreamerComponent, ReceiverStreamer<Runtime>)]
146pub trait CanStreamReceiver: HasChannelTypes + HasStreamType {
147    fn receiver_to_stream<T>(receiver: Self::Receiver<T>) -> Self::Stream<T>
148    where
149        T: Async;
150}
151
152#[derive_component(SenderClonerComponent, SenderCloner<Runtime>)]
153pub trait CanCloneSender: HasChannelTypes {
154    fn clone_sender<T>(sender: &Self::Sender<T>) -> Self::Sender<T>
155    where
156        T: Async;
157}