dcl_rpc/stream_protocol.rs
1//! This contains all the types and logic needed when a stream procedure is opened in whatever of the two sides (client or server).
2//!
3//! It contains the [`Generator`] type used when a server or a client wants to consume the stream opened and its messages.
4//!
5use std::{future::Future, sync::Arc};
6
7use log::debug;
8use prost::Message;
9
10use tokio::{
11 select,
12 sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
13};
14
15use async_channel::{unbounded, Receiver as AsyncChannelReceiver, Sender as AsyncChannelSender};
16
17use tokio_util::sync::CancellationToken;
18
19use crate::{
20 rpc_protocol::{parse::build_message_identifier, RpcMessageTypes, StreamMessage},
21 transports::{Transport, TransportError},
22 CommonError,
23};
24
25/// It knows how to process all the stream messages for client streams, server streams and bidirectional streams.
26///
27/// And it should be used to consume the stream messages when a stream procedure is executed
28///
29pub struct StreamProtocol<T: Transport + ?Sized> {
30 /// the ID of Port
31 port_id: u32,
32 /// ID of the message that inited the streaming
33 message_number: u32,
34 /// The last sequence id received in a [`StreamMessage`]
35 last_received_sequence_id: u32,
36 /// Flag to know if the remote half is closed or closed the stream
37 is_remote_closed: bool,
38 /// Flag to know if the remote half was open in the last message
39 was_open: bool,
40 /// Generator field in charge of both half of a generator
41 ///
42 /// [`Generator`] in charge of waiting for the next stream message
43 ///
44 /// [`GeneratorYielder`] in charge of sending the next stream message to the [`Generator`]
45 ///
46 generator: (Generator<Vec<u8>>, GeneratorYielder<Vec<u8>>),
47 /// The transport used for the communications
48 transport: Arc<T>,
49 /// Cancellation token to cancel the background task listening for new messages
50 process_cancellation_token: CancellationToken,
51}
52
53impl<T: Transport + ?Sized + 'static> StreamProtocol<T> {
54 pub(crate) fn new(transport: Arc<T>, port_id: u32, message_number: u32) -> Self {
55 Self {
56 last_received_sequence_id: 0,
57 is_remote_closed: false,
58 was_open: false,
59 generator: Generator::create(),
60 transport,
61 message_number,
62 port_id,
63 process_cancellation_token: CancellationToken::new(),
64 }
65 }
66
67 /// Get the next received stream message
68 ///
69 /// It'll be sent by the [`GeneratorYielder`] but actually the message comes from the other half ([`RpcServer`](crate::server::RpcServer) or [`RpcClient`](crate::client::RpcClient) )
70 ///
71 /// You won't use this function direcly, you should turn the [`StreamProtocol`] into a [`Generator`] using [`to_generator`](#method.to_generator)
72 ///
73 async fn next(&mut self) -> Option<Vec<u8>> {
74 select! {
75 _ = self.process_cancellation_token.cancelled() => {
76 self.generator.0.close();
77 self.is_remote_closed = true;
78 None
79 }
80 message = self.generator.0.next() => {
81 match message {
82 Some(msg) => {
83 self.last_received_sequence_id += 1;
84 self.was_open = true;
85 // Ack Message to let know the peer that we are ready to receive another message
86 let stream_message = StreamMessage {
87 port_id: self.port_id,
88 sequence_id: self.last_received_sequence_id,
89 message_identifier: build_message_identifier(
90 RpcMessageTypes::StreamAck as u32,
91 self.message_number,
92 ),
93 payload: vec![],
94 closed: false,
95 ack: true,
96 };
97
98 if let Err(err) = self.transport
99 .send(stream_message.encode_to_vec())
100 .await {
101 log::error!("> StreamProtocol > next > Error while sending the ACK StreamMessage through the transport: {err:?}");
102 // If the message cannot be ack, we should close the stream
103 self.is_remote_closed = true;
104 return None;
105 }
106
107
108 Some(msg)
109 }
110 None => {
111 self.is_remote_closed = true;
112 None
113 }
114 }
115 }
116 }
117 }
118
119 /// It consumes the [`StreamProtocol`] and returns a [`Generator`].
120 ///
121 /// It allows to pass a closure to transform the values that the internal [`Generator`] has.
122 ///
123 /// The transform closure has to return an Option, this allows the user to return None if something failed in the closure or it depends on some condition.
124 ///
125 /// It transforms the values and sends it to the returned [`Generator`] in a background task for a immediate response.
126 ///
127 pub fn to_generator<
128 O: Send + Sync + 'static,
129 F: Fn(Vec<u8>) -> Option<O> + Send + Sync + 'static,
130 >(
131 mut self,
132 transformer: F,
133 ) -> Generator<O> {
134 let (generator, generator_yielder) = Generator::create();
135 tokio::spawn(async move {
136 while let Some(item) = self.next().await {
137 if let Some(item_decoded) = transformer(item) {
138 if generator_yielder.r#yield(item_decoded).await.is_err() {
139 log::error!("> StreamProtocol > to_generator > Generator > error r#yield, probably it's closed");
140 log::debug!("> StreamProtocol > to_generator > Generator > breaking to stop yielding");
141 break; // channel is closed
142 }
143 }
144 }
145 });
146
147 generator
148 }
149
150 /// It sends an ACK stream message through the transport given to [`StreamProtocol`] to let the other half know that the strem is opened
151 async fn acknowledge_open(&self) -> Result<(), TransportError> {
152 let stream_message = StreamMessage {
153 port_id: self.port_id,
154 sequence_id: self.last_received_sequence_id,
155 message_identifier: build_message_identifier(
156 RpcMessageTypes::StreamAck as u32,
157 self.message_number,
158 ),
159 payload: vec![],
160 closed: false,
161 ack: true,
162 };
163
164 self.transport.send(stream_message.encode_to_vec()).await
165 }
166
167 /// Finishs the stream processing, closes all generators and cancels all background tasks
168 pub fn close(&mut self) {
169 self.generator.0.close();
170 self.process_cancellation_token.cancel();
171 }
172
173 /// Spawns a background task to start processing the stream messages and returns a listener ([`AsyncChannelSender`])
174 ///
175 /// The returned listener will be registered in [`crate::messages_handlers::ServerMessagesHandler`] or [`crate::messages_handlers::ClientMessagesHandler`] that the [`RpcServer`](crate::server::RpcServer) and [`RpcClient`](crate::client::RpcClient) contains
176 ///
177 /// Each new message that either of both structs receives will be handled by the messages handler and sent to the returned listener if it corresponds
178 ///
179 /// The callback expected in params, it should be to remove the listener from the messages handler when the the processing finishes
180 ///
181 pub(crate) async fn start_processing<
182 F: Future + Send,
183 Callback: FnOnce() -> F + Send + 'static,
184 >(
185 &self,
186 callback: Callback,
187 ) -> Result<AsyncChannelSender<(RpcMessageTypes, u32, StreamMessage)>, CommonError> {
188 if self.acknowledge_open().await.is_err() {
189 return Err(CommonError::TransportError);
190 }
191 let token = self.process_cancellation_token.clone();
192 let (messages_listener, messages_processor) = unbounded();
193 let internal_channel = self.generator.1.clone();
194 tokio::spawn(async move {
195 let token_cloned = token.clone();
196 select! {
197 _ = token.cancelled() => {
198 debug!("> StreamProtocol cancelled!");
199 callback().await;
200 // TOOD: Communicate error
201 },
202 _ = Self::process_messages(messages_processor, internal_channel, token_cloned) => {
203 callback().await;
204 }
205 }
206 });
207
208 Ok(messages_listener)
209 }
210
211 /// Processes stream messages, it's called by the background task spawned in [`start_processing`](#method.start_processing)
212 ///
213 /// It receives each [`StreamMessage`] and decides what to do with the stream messages processing, if it has to continue yielding messages or close the stream processing
214 ///
215 async fn process_messages(
216 messages_processor: AsyncChannelReceiver<(RpcMessageTypes, u32, StreamMessage)>,
217 internal_channel_sender: GeneratorYielder<Vec<u8>>,
218 cancellation_token: CancellationToken,
219 ) {
220 while let Ok((message_type, _, stream_message)) = messages_processor.recv().await {
221 if matches!(message_type, RpcMessageTypes::StreamMessage) {
222 if stream_message.closed {
223 cancellation_token.cancel();
224 messages_processor.close();
225 } else if internal_channel_sender
226 .r#yield(stream_message.payload)
227 .await
228 .is_err()
229 {
230 log::error!("> StreamProtocol > process_messages > Error on sending through the Generator, seems to be dropped")
231 }
232 } else if matches!(message_type, RpcMessageTypes::RemoteErrorResponse) {
233 cancellation_token.cancel();
234 messages_processor.close();
235 // TOOD: Communicate error
236 }
237 }
238 }
239}
240
241/// Errors related with [`Generator`] and [`GeneratorYielder`]
242#[derive(Debug)]
243pub enum GeneratorError {
244 /// The underlying channel is closed, if it was unable to insert.
245 ///
246 /// So if this error appears, it's due to a closed channel. We must stop doing [`GeneratorYielder::r#yield`]
247 ///
248 UnableToInsert,
249}
250
251/// Generator struct contains only one field which it's an unbounded receiver from unounded channel from [`tokio`] crate
252///
253/// The other half of the unbounded channel is given to the [`GeneratorYielder`]
254///
255pub struct Generator<M>(UnboundedReceiver<M>);
256
257impl<M: Send + Sync + 'static> Generator<M> {
258 /// Creates an unbounded channel and returns a [`Generator`] and [`GeneratorYielder`]
259 pub fn create() -> (Self, GeneratorYielder<M>) {
260 let channel = unbounded_channel();
261 (Self(channel.1), GeneratorYielder::new(channel.0))
262 }
263
264 // TODO: could it be a trait and reuse for other structs?
265 /// This functions takes a owned [`Generator`] to turn the items in that generator into another ones
266 /// but keeping the items in a generator.
267 ///
268 /// It allows to pass a closure to transform the values in the old [`Generator`].
269 ///
270 /// The transform closure has to return an Option, this allows the user to return None if something failed in the closure or it depends on some condition.
271 ///
272 /// It transforms the values and sends it to the returned [`Generator`] in a background task for a immediate response.
273 pub fn from_generator<
274 O: Send + Sync + 'static,
275 F: Fn(M) -> Option<O> + Send + Sync + 'static,
276 >(
277 mut old_generator: Generator<M>,
278 transformer: F,
279 ) -> Generator<O> {
280 let (generator, generator_yielder) = Generator::create();
281 tokio::spawn(async move {
282 while let Some(item) = old_generator.next().await {
283 if let Some(new_item) = transformer(item) {
284 if generator_yielder.r#yield(new_item).await.is_err() {
285 log::error!("> Generator > error r#yield, probably it's closed");
286 log::debug!("> Generator > breaking to stop yielding");
287 break; // channel is closed
288 }
289 }
290 }
291 });
292
293 generator
294 }
295
296 pub async fn next(&mut self) -> Option<M> {
297 self.0.recv().await
298 }
299
300 pub fn close(&mut self) {
301 self.0.close()
302 }
303}
304
305/// The other half for a [`Generator`]. It contains an only one field which it's an unbounded sender from a unbounded channel from [`tokio`] crate
306///
307/// It's in charge of sendin the values to the [`Generator`]
308///
309pub struct GeneratorYielder<M>(UnboundedSender<M>);
310
311impl<M> GeneratorYielder<M> {
312 fn new(sender: UnboundedSender<M>) -> Self {
313 Self(sender)
314 }
315
316 pub async fn r#yield(&self, item: M) -> Result<(), GeneratorError> {
317 match self.0.send(item) {
318 Ok(_) => Ok(()),
319 Err(_) => Err(GeneratorError::UnableToInsert),
320 }
321 }
322}
323
324impl<M> Clone for GeneratorYielder<M> {
325 fn clone(&self) -> Self {
326 Self(self.0.clone())
327 }
328}