hermes_runtime_components/traits/channel.rs
1/*!
2 Support for use of abstract communication channels within a runtime context.
3
4 This module define the abstract traits that can be implemented by a runtime
5 context to support message-passing concurrency. This provides similar
6 functionalities as the Rust channel types defined in
7 [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html).
8*/
9
10use cgp::prelude::*;
11
12use crate::traits::stream::HasStreamType;
13
14/**
15 Provides the abstract `Sender` and `Receiver` types for messsage-passing.
16
17 The `Sender` and `Receiver` types are parameterized by an arbitrary payload
18 type `T` using generic associated types. Given any payload type `T` that
19 implements [`Async`], a runtime context that implements `HasChannelTypes`
20 should be able to provide the concrete types `Sender<T>` and `Receiver<T>`,
21 where messages of type `T` can be sent over to the `Sender<T>` end and
22 received from the `Receiver<T>` end.
23
24 The abstract `Sender` and `Receiver` types need to support the
25 message-passing passing asynchronously, i.e. inside async functions.
26 As a result, although it work similar to the Rust channel provided in
27 the standard library,
28 [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html),
29 the standard Rust channels are not suited for use here, as they could block
30 the entire thread running the async tasks.
31
32 Instead, there are async equivalent of the channel types offered by async
33 libraries such as Tokio's
34 [tokio::sync::mpsc::channel](https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html)
35 or async-std's [async_std::channel](https://docs.rs/async-std/latest/async_std/channel/index.html).
36
37 A main difference between the channel types defined here and the common
38 MPSC (multiple producer, single consumer) channels in the stated libraries
39 is that we allow multiple consumers to use the same receiver. This is to
40 avoid the use of `&mut` references, which would require the entire context
41 containing a receiver to be mutable. Instead, concrete types can wrap a
42 single-consumer receiver as `Arc<Mutex<Receiver<T>>>` in the concrete
43 type definition, so that it can be used as a multi-consumer receiver.
44
45 The methods for using the abstract channel types are available in separate
46 traits, [`CanCreateChannels`] and [`CanUseChannels`]. This is because code
47 that needs to create new channels do not necessary need to use the channels,
48 and vice versa. Having separate traits makes it clear which capability a
49 component needs from the runtime.
50
51 There is also a similar trait
52 [`HasChannelOnceTypes`](super::channel_once::HasChannelOnceTypes),
53 which defines abstract one-shot channel types that allow at most one message
54 to be sent over.
55*/
56#[derive_component(ChannelTypeComponent, ProvideChannelType<Runtime>)]
57pub trait HasChannelTypes: Async {
58 /**
59 The sender end of a channel with payload type `T`.
60 */
61 type Sender<T>: Async
62 where
63 T: Async;
64
65 /**
66 The receiver end of a channel with payload type `T`.
67 */
68 type Receiver<T>: Async
69 where
70 T: Async;
71}
72
73/**
74 Allow the creation of new sender-receiver pairs for the channel types
75 defined in [`HasChannelTypes`].
76*/
77#[derive_component(ChannelCreatorComponent, ChannelCreator<Runtime>)]
78pub trait CanCreateChannels: HasChannelTypes {
79 /**
80 Given a generic payload type `T`, create a
81 [`Sender<T>`](HasChannelTypes::Sender<T>) and
82 [`Receiver<T>`](HasChannelTypes::Receiver<T>) pair that are connected.
83
84 The returned sender-receiver pair is expected to satisfy the following
85 invariants:
86
87 - Messages sent from the returned sender are expected to be received
88 via the returned receiver.
89
90 - Messages sent from mismatch sender should never be received by the
91 given receiver.
92
93 More invariants may be added in the future to better specify the
94 requirements of the abstract channel. For now, we assume that mainstream
95 implementation of Rust channels can all satisfy the same requirements.
96 */
97 fn new_channel<T>() -> (Self::Sender<T>, Self::Receiver<T>)
98 where
99 T: Async;
100}
101
102/**
103 Allow the sending and receiving of message payloads over the
104 [`Sender`](HasChannelTypes::Sender<T>) and
105 [`Receiver`](HasChannelTypes::Receiver<T>) ends of a channel.
106*/
107#[derive_component(ChannelUserComponent, ChannelUser<Runtime>)]
108#[async_trait]
109pub trait CanUseChannels: HasChannelTypes + HasErrorType {
110 /**
111 Given a reference to [`Sender<T>`](HasChannelTypes::Sender<T>),
112 send a message payload of type `T` over the sender.
113
114 If the receiver side of the channel has been dropped, the sending would
115 fail and an error will be returned.
116
117 The sending operation is _synchronous_. This ensures the payload is
118 guaranteed to be in the channel queue after `send()` is called.
119
120 The receiving operation is expected to be _asynchronous_. This means
121 that any operation after `receive()` is called on the receiving end
122 should _never_ execute within `send()`.
123 */
124 async fn send<T>(sender: &Self::Sender<T>, value: T) -> Result<(), Self::Error>
125 where
126 T: Async;
127
128 /**
129 Given a reference to [`Receiver<T>`](HasChannelTypes::Receiver<T>),
130 asynchronously receive a message payload of type `T` that is sent
131 over the sender end.
132
133 If the sender end is dropped before any value is sent, this would result
134 in an error in `receive()`
135 */
136 async fn receive<T>(receiver: &mut Self::Receiver<T>) -> Result<T, Self::Error>
137 where
138 T: Async;
139
140 fn try_receive<T>(receiver: &mut Self::Receiver<T>) -> Result<Option<T>, Self::Error>
141 where
142 T: Async;
143}
144
145#[derive_component(ReceiverStreamerComponent, ReceiverStreamer<Runtime>)]
146pub trait CanStreamReceiver: HasChannelTypes + HasStreamType {
147 fn receiver_to_stream<T>(receiver: Self::Receiver<T>) -> Self::Stream<T>
148 where
149 T: Async;
150}
151
152#[derive_component(SenderClonerComponent, SenderCloner<Runtime>)]
153pub trait CanCloneSender: HasChannelTypes {
154 fn clone_sender<T>(sender: &Self::Sender<T>) -> Self::Sender<T>
155 where
156 T: Async;
157}