async_session_types/multiplexing/mod.rs
1use std::collections::BTreeMap;
2
3use crate::{unbounded_channel, Receiver, Repr, Sender, SessionError};
4
5pub mod incoming;
6pub mod outgoing;
7
8/// A multiplexed message, consisting of the ID of the protocol
9/// and dynamic content that only the corresponding session will
10/// know how to handle, and check if it is indeed the next
11/// expected message.
12///
13/// A message like this is coming from or going to a specific
14/// network connection. It is assumed that there is no need to
15/// identify the remote party, since the protocol would only
16/// be talking to one party at a time that already got determined
17/// at connection time.
18///
19/// We can probably assume that there will be only one instance
20/// of each protocol per remote party, so `P` will be a simple
21/// static identifier for the protocol. If that's not the case,
22/// it could be more complex, like a unique session ID.
23///
24/// It is up to some higher level component to handle the wire
25/// format of the message, including turning the payload and
26/// the protocol ID into binary or JSON for example. This is
27/// what the `Repr` allows us to do, e.g. to have each message
28/// implement `serde`, wrap then in an enum, that gets serialised
29/// at the edge of the system to/from JSON.
30///
31/// A network connection will be either incoming or outgoing,
32/// i.e. for Alice to follow Bob and for Bob to follow Alice
33/// they will need 2 TCP connections, one from Alice to Bob
34/// and another from Bob to Alice. If it wasn't so we would
35/// need a way to differentiate between two instances of the
36/// same protocol, a unique session ID for each message.
37///
38/// Another way of doing it would be to add two flags to the message:
39/// 1) indicate the direction of the connection, incoming or outgoing
40/// 2) indicate whether the protocol was initiated by the source or
41/// the target of the connection.
42///
43/// This would facilitate all communication over a single TCP connection,
44/// however we would need to start with protocol negotiation to find out
45/// if the other side even supports serving certain protocols. It would
46/// also make it more difficult to tell if a connection can be closed,
47/// because we wouldn't know if the other side has interest in keeping it
48/// open even if currently there are no sessions.
49#[derive(Debug)]
50pub struct MultiMessage<P, R> {
51 /// The protocol ID features explicitly in the message, rather than
52 /// as a type class, because for example with `DynMessage` we don't
53 /// have a unique mapping between a message type and its protocol.
54 /// For example multiple protocols can expect a `String`.
55 pub protocol_id: P,
56
57 /// The payload is the `Repr` representation of the message,
58 /// to be (un)wrapped by the session.
59 pub payload: R,
60}
61
62impl<P, R> MultiMessage<P, R> {
63 /// Wrap a raw message that has a `Repr` into a `MultiMessage`.
64 pub fn new<T: Send + 'static>(protocol_id: P, msg: T) -> Self
65 where
66 R: Repr<T>,
67 {
68 Self {
69 protocol_id,
70 payload: Repr::from(msg),
71 }
72 }
73}
74
75/// The multiplexer takes messages from multiple channels that use
76/// `DynMessage` for payload (each associated with a session),
77/// attaches the protocol ID and relays the message into a common
78/// outgoing channel using `MultiMessage`.
79///
80/// A multiplexer instance is unique to one connected party.
81///
82/// ```text
83/// Protocol 1 ---> | \
84/// | \
85/// Protocol 2 ---> |mux| --> MultiMessage 1|2|3
86/// | /
87/// Protocol 3 ---> | /
88/// ```
89struct Multiplexer<P, R> {
90 tx: Sender<MultiMessage<P, R>>,
91 // NOTE: There used to be an `rxs` here similar to the `txs` in the `Demultiplexer`,
92 // however since switching to `tokio`, waiting on multiple channels has to work
93 // with a local `FuturesUnordered` in the runner loops that own the receives.
94}
95
96impl<P, R> Multiplexer<P, R> {
97 pub fn new(tx: Sender<MultiMessage<P, R>>) -> Self {
98 Self { tx }
99 }
100
101 /// Wrap and send a multiplexed message.
102 ///
103 /// Return `false` if the outgoing channel has already been closed.
104 pub fn send(&self, protocol_id: P, payload: R) -> bool {
105 self.tx
106 .send(MultiMessage {
107 protocol_id,
108 payload,
109 })
110 .is_ok()
111 }
112}
113
114/// The demultiplexer reads `MultiMessage` from an incoming channel
115/// that is associated with a remote party (e.g. a TCP connection)
116/// and dispatches the messages to protocol specific channels, each
117/// associated with a different session with the same party.
118///
119/// ```text
120/// / | ---> Protocol 1
121/// / |
122/// MultiMessage 1|2|3 ---> |demux| ---> Protocol 2
123/// \ |
124/// \ | ---> Protocol 3
125/// ```
126struct Demultiplexer<P, R> {
127 rx: Receiver<MultiMessage<P, R>>,
128 txs: BTreeMap<P, Sender<R>>,
129}
130
131impl<P, R> Demultiplexer<P, R> {
132 pub fn new(rx: Receiver<MultiMessage<P, R>>) -> Self {
133 Self {
134 rx,
135 txs: BTreeMap::new(),
136 }
137 }
138
139 pub async fn recv(&mut self) -> Option<(P, R)> {
140 self.rx.recv().await.map(|mm| (mm.protocol_id, mm.payload))
141 }
142}
143
144/// A helper struct to hold on to both sides of a channel.
145struct Chan<T> {
146 rx: Receiver<T>,
147 tx: Sender<T>,
148}
149
150impl<T> Chan<T> {
151 pub fn new() -> Self {
152 let (tx, rx) = unbounded_channel();
153 Self { tx, rx }
154 }
155}
156
157/// Channel we an use to allow protocols to signal the need to abort the
158/// connection with the remote party.
159///
160/// Just by abandoning the loop that processes incoming messages from the
161/// connection we should see the channel closing down, which the network
162/// handler will be able to detect and close the physical connection.
163type ErrorChan = Chan<SessionError>;
164
165/// Channel we can use to register newly started sessions that want to talk to a remote party.
166type AddChan<P, R> = Chan<AddMsg<P, R>>;
167
168/// A message we can send to over the `AddChan` to get a newly created session registered.
169type AddMsg<P, R> = (P, Sender<R>, Receiver<R>);