dcl_rpc/
stream_protocol.rs

1//! This contains all the types and logic needed when a stream procedure is opened in whatever of the two sides (client or server).
2//!
3//! It contains the [`Generator`] type used when a server or a client wants to consume the stream opened and its messages.
4//!
5use std::{future::Future, sync::Arc};
6
7use log::debug;
8use prost::Message;
9
10use tokio::{
11    select,
12    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
13};
14
15use async_channel::{unbounded, Receiver as AsyncChannelReceiver, Sender as AsyncChannelSender};
16
17use tokio_util::sync::CancellationToken;
18
19use crate::{
20    rpc_protocol::{parse::build_message_identifier, RpcMessageTypes, StreamMessage},
21    transports::{Transport, TransportError},
22    CommonError,
23};
24
25/// It knows how to process all the stream messages for client streams, server streams and bidirectional streams.
26///
27/// And it should be used to consume the stream messages when a stream procedure is executed
28///
29pub struct StreamProtocol<T: Transport + ?Sized> {
30    /// the ID of Port
31    port_id: u32,
32    /// ID of the message that inited the streaming
33    message_number: u32,
34    /// The last sequence id received in a [`StreamMessage`]
35    last_received_sequence_id: u32,
36    /// Flag to know if the remote half is closed or closed the stream
37    is_remote_closed: bool,
38    /// Flag to know if the remote half was open in the last message
39    was_open: bool,
40    /// Generator field in charge of both half of a generator
41    ///
42    /// [`Generator`] in charge of waiting for the next stream message
43    ///
44    /// [`GeneratorYielder`] in charge of sending the next stream message to the [`Generator`]
45    ///
46    generator: (Generator<Vec<u8>>, GeneratorYielder<Vec<u8>>),
47    /// The transport used for the communications
48    transport: Arc<T>,
49    /// Cancellation token to cancel the background task listening for new messages
50    process_cancellation_token: CancellationToken,
51}
52
53impl<T: Transport + ?Sized + 'static> StreamProtocol<T> {
54    pub(crate) fn new(transport: Arc<T>, port_id: u32, message_number: u32) -> Self {
55        Self {
56            last_received_sequence_id: 0,
57            is_remote_closed: false,
58            was_open: false,
59            generator: Generator::create(),
60            transport,
61            message_number,
62            port_id,
63            process_cancellation_token: CancellationToken::new(),
64        }
65    }
66
67    /// Get the next received stream message
68    ///
69    /// It'll be sent by the [`GeneratorYielder`] but actually the message comes from the other half ([`RpcServer`](crate::server::RpcServer)  or [`RpcClient`](crate::client::RpcClient) )
70    ///
71    /// You won't use this function direcly, you should turn the [`StreamProtocol`] into a [`Generator`] using [`to_generator`](#method.to_generator)
72    ///
73    async fn next(&mut self) -> Option<Vec<u8>> {
74        select! {
75            _ = self.process_cancellation_token.cancelled() =>  {
76                self.generator.0.close();
77                self.is_remote_closed = true;
78                None
79            }
80            message = self.generator.0.next() => {
81                match message {
82                    Some(msg) => {
83                        self.last_received_sequence_id += 1;
84                        self.was_open = true;
85                        // Ack Message to let know the peer that we are ready to receive another message
86                        let stream_message = StreamMessage {
87                            port_id: self.port_id,
88                            sequence_id: self.last_received_sequence_id,
89                            message_identifier: build_message_identifier(
90                                RpcMessageTypes::StreamAck as u32,
91                                self.message_number,
92                            ),
93                            payload: vec![],
94                            closed: false,
95                            ack: true,
96                        };
97
98                        if let Err(err) = self.transport
99                            .send(stream_message.encode_to_vec())
100                            .await {
101                            log::error!("> StreamProtocol > next > Error while sending the ACK StreamMessage through the transport: {err:?}");
102                            // If the message cannot be ack, we should close the stream
103                            self.is_remote_closed = true;
104                            return None;
105                        }
106
107
108                        Some(msg)
109                    }
110                    None => {
111                        self.is_remote_closed = true;
112                        None
113                    }
114                }
115            }
116        }
117    }
118
119    /// It consumes the [`StreamProtocol`] and returns a [`Generator`].
120    ///
121    /// It allows to pass a closure to transform the values that the internal [`Generator`] has.
122    ///
123    /// The transform closure has to return an Option, this allows the user to return None if something failed in the closure or it depends on some condition.
124    ///
125    /// It transforms the values and sends it to the returned [`Generator`] in a background task for a immediate response.
126    ///
127    pub fn to_generator<
128        O: Send + Sync + 'static,
129        F: Fn(Vec<u8>) -> Option<O> + Send + Sync + 'static,
130    >(
131        mut self,
132        transformer: F,
133    ) -> Generator<O> {
134        let (generator, generator_yielder) = Generator::create();
135        tokio::spawn(async move {
136            while let Some(item) = self.next().await {
137                if let Some(item_decoded) = transformer(item) {
138                    if generator_yielder.r#yield(item_decoded).await.is_err() {
139                        log::error!("> StreamProtocol > to_generator > Generator > error r#yield, probably it's closed");
140                        log::debug!("> StreamProtocol > to_generator > Generator > breaking to stop yielding");
141                        break; // channel is closed
142                    }
143                }
144            }
145        });
146
147        generator
148    }
149
150    /// It sends an ACK stream message through the transport given to [`StreamProtocol`] to let the other half know that the strem is opened
151    async fn acknowledge_open(&self) -> Result<(), TransportError> {
152        let stream_message = StreamMessage {
153            port_id: self.port_id,
154            sequence_id: self.last_received_sequence_id,
155            message_identifier: build_message_identifier(
156                RpcMessageTypes::StreamAck as u32,
157                self.message_number,
158            ),
159            payload: vec![],
160            closed: false,
161            ack: true,
162        };
163
164        self.transport.send(stream_message.encode_to_vec()).await
165    }
166
167    /// Finishs the stream processing, closes all generators and cancels all background tasks
168    pub fn close(&mut self) {
169        self.generator.0.close();
170        self.process_cancellation_token.cancel();
171    }
172
173    /// Spawns a background task to start processing the stream messages and returns a listener ([`AsyncChannelSender`])
174    ///
175    /// The returned listener will be registered in [`crate::messages_handlers::ServerMessagesHandler`] or [`crate::messages_handlers::ClientMessagesHandler`] that the [`RpcServer`](crate::server::RpcServer)  and [`RpcClient`](crate::client::RpcClient) contains
176    ///
177    /// Each new message that either of both structs receives will be handled by the messages handler and sent to the returned listener if it corresponds
178    ///
179    /// The callback expected in params, it should be to remove the listener from the messages handler when the the processing finishes
180    ///
181    pub(crate) async fn start_processing<
182        F: Future + Send,
183        Callback: FnOnce() -> F + Send + 'static,
184    >(
185        &self,
186        callback: Callback,
187    ) -> Result<AsyncChannelSender<(RpcMessageTypes, u32, StreamMessage)>, CommonError> {
188        if self.acknowledge_open().await.is_err() {
189            return Err(CommonError::TransportError);
190        }
191        let token = self.process_cancellation_token.clone();
192        let (messages_listener, messages_processor) = unbounded();
193        let internal_channel = self.generator.1.clone();
194        tokio::spawn(async move {
195            let token_cloned = token.clone();
196            select! {
197                _ = token.cancelled() => {
198                    debug!("> StreamProtocol cancelled!");
199                    callback().await;
200                    // TOOD: Communicate error
201                },
202                _ = Self::process_messages(messages_processor, internal_channel, token_cloned) => {
203                    callback().await;
204                }
205            }
206        });
207
208        Ok(messages_listener)
209    }
210
211    /// Processes stream messages, it's called by the background task spawned in [`start_processing`](#method.start_processing)
212    ///
213    /// It receives each [`StreamMessage`] and decides what to do with the stream messages processing, if it has to continue yielding messages or close the stream processing
214    ///
215    async fn process_messages(
216        messages_processor: AsyncChannelReceiver<(RpcMessageTypes, u32, StreamMessage)>,
217        internal_channel_sender: GeneratorYielder<Vec<u8>>,
218        cancellation_token: CancellationToken,
219    ) {
220        while let Ok((message_type, _, stream_message)) = messages_processor.recv().await {
221            if matches!(message_type, RpcMessageTypes::StreamMessage) {
222                if stream_message.closed {
223                    cancellation_token.cancel();
224                    messages_processor.close();
225                } else if internal_channel_sender
226                    .r#yield(stream_message.payload)
227                    .await
228                    .is_err()
229                {
230                    log::error!("> StreamProtocol > process_messages > Error on sending through the Generator, seems to be dropped")
231                }
232            } else if matches!(message_type, RpcMessageTypes::RemoteErrorResponse) {
233                cancellation_token.cancel();
234                messages_processor.close();
235                // TOOD: Communicate error
236            }
237        }
238    }
239}
240
241/// Errors related with [`Generator`] and [`GeneratorYielder`]
242#[derive(Debug)]
243pub enum GeneratorError {
244    /// The underlying channel is closed, if it was unable to insert.
245    ///
246    /// So if this error appears, it's due to a closed channel. We must stop doing [`GeneratorYielder::r#yield`]
247    ///
248    UnableToInsert,
249}
250
251/// Generator struct contains only one field which it's an unbounded receiver from unounded channel from [`tokio`] crate
252///
253/// The other half of the unbounded channel is given to the [`GeneratorYielder`]
254///
255pub struct Generator<M>(UnboundedReceiver<M>);
256
257impl<M: Send + Sync + 'static> Generator<M> {
258    /// Creates an unbounded channel and returns a [`Generator`] and [`GeneratorYielder`]
259    pub fn create() -> (Self, GeneratorYielder<M>) {
260        let channel = unbounded_channel();
261        (Self(channel.1), GeneratorYielder::new(channel.0))
262    }
263
264    // TODO: could it be a trait and reuse for other structs?
265    /// This functions takes a owned [`Generator`] to turn the items in that generator into another ones
266    /// but keeping the items in a generator.
267    ///
268    /// It allows to pass a closure to transform the values in the old [`Generator`].
269    ///
270    /// The transform closure has to return an Option, this allows the user to return None if something failed in the closure or it depends on some condition.
271    ///
272    /// It transforms the values and sends it to the returned [`Generator`] in a background task for a immediate response.
273    pub fn from_generator<
274        O: Send + Sync + 'static,
275        F: Fn(M) -> Option<O> + Send + Sync + 'static,
276    >(
277        mut old_generator: Generator<M>,
278        transformer: F,
279    ) -> Generator<O> {
280        let (generator, generator_yielder) = Generator::create();
281        tokio::spawn(async move {
282            while let Some(item) = old_generator.next().await {
283                if let Some(new_item) = transformer(item) {
284                    if generator_yielder.r#yield(new_item).await.is_err() {
285                        log::error!("> Generator > error r#yield, probably it's closed");
286                        log::debug!("> Generator > breaking to stop yielding");
287                        break; // channel is closed
288                    }
289                }
290            }
291        });
292
293        generator
294    }
295
296    pub async fn next(&mut self) -> Option<M> {
297        self.0.recv().await
298    }
299
300    pub fn close(&mut self) {
301        self.0.close()
302    }
303}
304
305/// The other half for a [`Generator`]. It contains an only one field which it's an unbounded sender from a unbounded channel from [`tokio`] crate
306///
307/// It's in charge of sendin the values to the [`Generator`]
308///
309pub struct GeneratorYielder<M>(UnboundedSender<M>);
310
311impl<M> GeneratorYielder<M> {
312    fn new(sender: UnboundedSender<M>) -> Self {
313        Self(sender)
314    }
315
316    pub async fn r#yield(&self, item: M) -> Result<(), GeneratorError> {
317        match self.0.send(item) {
318            Ok(_) => Ok(()),
319            Err(_) => Err(GeneratorError::UnableToInsert),
320        }
321    }
322}
323
324impl<M> Clone for GeneratorYielder<M> {
325    fn clone(&self) -> Self {
326        Self(self.0.clone())
327    }
328}