dcl_rpc/
server.rs

1//! This module contains all the types needed to have a running [`RpcServer`].
2//
3use crate::{
4    messages_handlers::ServerMessagesHandler,
5    rpc_protocol::{
6        fill_remote_error,
7        parse::{build_message_identifier, parse_header},
8        server_ready_message, CreatePort, CreatePortResponse, DestroyPort, ModuleProcedure,
9        RemoteError, RemoteErrorResponse, Request, RequestModule, RequestModuleResponse,
10        RpcMessageTypes,
11    },
12    service_module_definition::{ProcedureContext, ProcedureDefinition, ServiceModuleDefinition},
13    stream_protocol::StreamProtocol,
14    transports::{Transport, TransportError, TransportMessage},
15};
16use log::{debug, error};
17use prost::{alloc::vec::Vec, Message};
18use std::{collections::HashMap, sync::Arc, u8};
19use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
20
21/// Handler that runs each time that a port is created
22type PortHandlerFn<Context> = dyn Fn(&mut RpcServerPort<Context>) + Send + Sync + 'static;
23
24type TransportHandler<Transport> = dyn Fn(Arc<Transport>, TransportID) + Send + Sync + 'static;
25
26/// Handler that runs each time that a transport was closed
27type OnTransportClosesHandler<Transport> = TransportHandler<Transport>;
28
29/// Handler that run each time that a transport is put to run
30type OnTransportConnected<Transport> = TransportHandler<Transport>;
31
32/// Error returned by a server function could be an error which it's possible and useful to communicate or not.
33#[derive(Debug)]
34pub enum ServerResultError {
35    External(ServerError),
36    Internal(ServerInternalError),
37}
38
39/// Result type for all [`RpcServer`] functions
40pub type ServerResult<T> = Result<T, ServerResultError>;
41
42/// Enum of errors which should be exposed to the client and turned into a [`crate::rpc_protocol::RemoteError`]
43#[derive(Debug)]
44pub enum ServerError {
45    /// Error on decoding bytes (`Vec<u8>`) into a given type using [`crate::rpc_protocol::parse::parse_protocol_message`] or using the [`Message::decode`]
46    ProtocolError,
47    /// Port was not found in the server state, possibly not created
48    PortNotFound(u32),
49    /// Error on loading a Module, unlikely to happen
50    LoadModuleError,
51    /// Module was not found, not registered in the server
52    ModuleNotFound(String),
53    /// Given procedure's ID was not found
54    ProcedureNotFound(u32),
55    /// Unexpexted Error while responding back or Error on sending the original procedure response
56    ///
57    /// This error should be use as a "re-try" when a [`Transport::send`] failed.
58    UnexpectedErrorOnTransport,
59}
60
61impl RemoteErrorResponse for ServerError {
62    fn error_code(&self) -> u32 {
63        match self {
64            Self::ProtocolError => 1,
65            Self::PortNotFound(_) => 2,
66            Self::ModuleNotFound(_) => 3,
67            Self::ProcedureNotFound(_) => 4,
68            Self::UnexpectedErrorOnTransport => 5,
69            Self::LoadModuleError => 0, // it's unlikely to happen
70        }
71    }
72
73    fn error_message(&self) -> String {
74        match self {
75            Self::ProtocolError => "Error on parsing a message. The content seems to be corrupted and not to meet the protocol requirements".to_string(),
76            Self::PortNotFound(id) => format!("The given Port's ID: {id} was not found"),
77            Self::LoadModuleError => "Error on loading a module".to_string(),
78            Self::ModuleNotFound(module_name) => format!("Module wasn't found on the server, check the name: {module_name}"),
79            Self::ProcedureNotFound(id) => format!("Procedure's ID: {id} wasn't found on the server"),
80            Self::UnexpectedErrorOnTransport => "Error on the transport while sending the original procedure response".to_string()
81        }
82    }
83}
84
85/// Enum of errors which are internal or have no sense to be exposed to the client
86#[derive(Debug)]
87pub enum ServerInternalError {
88    UnableToNofifyServer,
89    TransportError,
90    TransportNotAttached,
91    InvalidHeader,
92    TransportWasClosed,
93}
94
95type TransportID = u32;
96type PortID = u32;
97
98type TransportEvent<T, M> = (T, M);
99
100/// Events that the [`RpcServer`] has to react to
101enum ServerEvents<T: Transport + ?Sized> {
102    AttachTransport(Arc<T>),
103    NewTransport(TransportID, Arc<T>),
104}
105
106/// Notifications about Transports connected to the [`RpcServer`]
107enum TransportNotification<T: Transport + ?Sized> {
108    /// New message received from a transport
109    NewMessage(TransportEvent<(Arc<T>, TransportID), TransportMessage>),
110    /// A Notification for when a `ServerEvents::AttachTransport` is received in order to attach a transport to the server [`RpcServer`](#method.RpcServer.attach_transport) and make it run to receive messages
111    MustAttachTransport(Arc<T>),
112    /// Close Transport Notification in order to remove it from the [`RpcServer`] state
113    CloseTransport(TransportID),
114}
115
116/// Structure to send events to the server from outside. It's a wrapper for a [`tokio::sync::mpsc::UnboundedSender`] from a channel so that we can send events from another thread e.g for a Websocket listener.
117pub struct ServerEventsSender<T: Transport + ?Sized>(UnboundedSender<ServerEvents<T>>);
118
119impl<T: Transport + ?Sized> ServerEventsSender<T> {
120    /// Sends a [`ServerEvents::AttachTransport`] to the [`RpcServer`]
121    ///
122    /// This allows you to notify the server that has to attach a new transport so after that it can make it run to listen for new messages
123    ///
124    /// This is equivalent to `RpcServer::attach_transport` but it can be used to attach a transport to the [`RpcServer`] from another spawned thread (or background task)
125    ///
126    /// This allows you to listen on a port in a background taskĀ for external connections and attach multiple transports that want to connect to the server
127    ///
128    /// It receives the `Transport` inside an `Arc` because it must be sharable.
129    ///
130    pub fn send_attach_transport(&self, transport: Arc<T>) -> ServerResult<()> {
131        if self
132            .0
133            .send(ServerEvents::AttachTransport(transport))
134            .is_err()
135        {
136            return Err(ServerResultError::Internal(
137                ServerInternalError::UnableToNofifyServer,
138            ));
139        }
140        Ok(())
141    }
142
143    /// Sends a [`ServerEvents::NewTransport`] to the [`RpcServer`]
144    ///
145    /// This allows you to notify the server that has to put to run a new transport
146    ///
147    /// It receives the [`Transport`] inside an `Arc` because it must be sharable.
148    ///
149    fn send_new_transport(&self, id: TransportID, transport: Arc<T>) -> ServerResult<()> {
150        if self
151            .0
152            .send(ServerEvents::NewTransport(id, transport))
153            .is_err()
154        {
155            error!("> RpcServer > Error on notifying the new transport {id}");
156            return Err(ServerResultError::Internal(
157                ServerInternalError::TransportNotAttached,
158            ));
159        }
160        Ok(())
161    }
162}
163
164impl<T: Transport + ?Sized> Clone for ServerEventsSender<T> {
165    fn clone(&self) -> Self {
166        Self(self.0.clone())
167    }
168}
169
170/// RpcServer receives and process different requests from the RpcClient
171///
172/// Once a RpcServer is inited, you should attach a transport and handler
173/// for the port creation.
174pub struct RpcServer<Context, T: Transport + ?Sized> {
175    /// The Transport used for the communication between `RpcClient` and [`RpcServer`]
176    transports: HashMap<TransportID, Arc<T>>,
177    /// The handler executed when a new port is created
178    port_creation_handler: Option<Box<PortHandlerFn<Context>>>,
179    /// The handler is executed when a transport is closed.
180    ///
181    /// It works for cleaning resources that may be tied to or depends on the transport's connection.
182    on_transport_closes_handler: Option<Box<OnTransportClosesHandler<T>>>,
183    /// The handler is executed when a transport is put to run.
184    ///
185    /// It works for executing a function which receives the Transport ID assigned by the server to a new running transport
186    on_transport_connected_handler: Option<Box<OnTransportConnected<T>>>,
187    /// Ports registered in the [`RpcServer`]
188    ports: HashMap<PortID, RpcServerPort<Context>>,
189    ports_by_transport_id: HashMap<TransportID, Vec<PortID>>,
190    /// RpcServer Context
191    context: Arc<Context>,
192    /// Handler in charge of handling every request<>response.
193    ///
194    /// It's stored inside an `Arc` because it'll be shared between threads
195    messages_handler: Arc<ServerMessagesHandler>,
196    /// `ServerEventsSender` structure that contains the sender half of a channel to send `ServerEvents` to the [`RpcServer`]
197    server_events_sender: ServerEventsSender<T>,
198    /// The receiver half of a channel that receives `ServerEvents` which the [`RpcServer`] has to react to
199    ///
200    /// It's an Option so that we can take ownership of it and remove it from the [`RpcServer`], and make it run in a background task
201    server_events_receiver: Option<UnboundedReceiver<ServerEvents<T>>>,
202    /// The id that will be assigned if a new transport is a attached
203    next_transport_id: u32,
204    /// THe id that will be assigned to a port when it's created.
205    next_port_id: u32,
206}
207impl<Context: Send + Sync + 'static, T: Transport + ?Sized + 'static> RpcServer<Context, T> {
208    pub fn create(ctx: Context) -> Self {
209        let channel = unbounded_channel();
210        Self {
211            transports: HashMap::new(),
212            port_creation_handler: None,
213            on_transport_connected_handler: None,
214            on_transport_closes_handler: None,
215            ports: HashMap::new(),
216            ports_by_transport_id: HashMap::new(),
217            context: Arc::new(ctx),
218            messages_handler: Arc::new(ServerMessagesHandler::new()),
219            next_transport_id: 1,
220            next_port_id: 1,
221            server_events_sender: ServerEventsSender(channel.0),
222            server_events_receiver: Some(channel.1),
223        }
224    }
225
226    /// Get a `ServerEventsSender` to send allowed server events from outside
227    pub fn get_server_events_sender(&self) -> ServerEventsSender<T> {
228        self.server_events_sender.clone()
229    }
230
231    /// Attaches the server half of the transport for Client<>Server communications
232    ///
233    /// It differs from sending the `ServerEvents::AtacchTransport` because it can only be used to attach transport from the current thread where the [`RpcServer`] was initalized due to the mutably borrow
234    ///
235    /// It receives the `Transport` inside an `Arc` because it must be sharable.
236    ///
237    pub async fn attach_transport(&mut self, transport: Arc<T>) -> ServerResult<()> {
238        self.new_transport_attached(transport).await
239    }
240
241    /// Sends the `ServerEvents::NewTransport` in order to make this new transport run in backround to receive its messages
242    ///
243    /// This function is used when a transport is attached with`RpcServer::attach_transport` and with the `ServerEventsSender::send_attach_transport`
244    ///
245    /// It receives the `Transport` inside an `Arc` because it must be sharable.
246    ///
247    async fn new_transport_attached(&mut self, transport: Arc<T>) -> ServerResult<()> {
248        let current_id = self.next_transport_id;
249        if let Err(error) = transport.send(server_ready_message().encode_to_vec()).await {
250            error!("> RpcServer > new_transport_attached > Error while sending server ready message: {error:?}");
251            if matches!(error, TransportError::Closed) {
252                return Err(ServerResultError::Internal(
253                    ServerInternalError::TransportError,
254                ));
255            } else {
256                transport.close().await;
257                return Err(ServerResultError::Internal(
258                    ServerInternalError::TransportError,
259                ));
260            }
261        }
262        self.server_events_sender
263            .send_new_transport(current_id, transport.clone())?;
264        if let Some(handler) = &self.on_transport_connected_handler {
265            handler(transport.clone(), current_id);
266        }
267        self.transports.insert(current_id, transport);
268        self.next_transport_id += 1;
269        Ok(())
270    }
271
272    /// Start processing `ServerEvent` events and listening on a channel for new `TransportNotification` that are sent by all the attached transports that are running in background tasks.
273    pub async fn run(&mut self) {
274        // create transports notifier. This channel will be in charge of sending all messages (and errors) that all the transports attached to server receieve
275        // We use async_channel crate for this channel because we want our receiver to be cloned so that we can close it when no more transports are open
276        // And after that, our server can exit because it knows that it wont receive more notifications
277        let (transports_notifier, mut transports_notification_receiver) =
278            unbounded_channel::<TransportNotification<T>>();
279        // Spawn a task to process ServerEvents in background
280        self.process_server_events(transports_notifier);
281        // loop on transports_notifier
282        loop {
283            // A transport here is the equivalent to a new connection in a common HTTP server
284            match transports_notification_receiver.recv().await {
285                Some(notification) => match notification {
286                    TransportNotification::NewMessage(((transport, transport_id), event)) => {
287                        match parse_header(&event) {
288                            Some((message_type, message_number)) => {
289                                match self
290                                    .handle_message(
291                                        transport_id,
292                                        event,
293                                        message_type,
294                                        message_number,
295                                    )
296                                    .await
297                                {
298                                    Ok(_) => debug!("> RpcServer > Transport message handled!"),
299                                    Err(server_error) => match server_error {
300                                        ServerResultError::External(server_external_error) => {
301                                            error!("> RpcServer > Server External Error {server_external_error:?}");
302                                            // If a server error is external, we should send it back to the client
303                                            tokio::spawn(async move {
304                                                let mut remote_error: RemoteError =
305                                                    server_external_error.into();
306                                                fill_remote_error(
307                                                    &mut remote_error,
308                                                    message_number,
309                                                );
310                                                if transport
311                                                    .send(remote_error.encode_to_vec())
312                                                    .await
313                                                    .is_err()
314                                                {
315                                                    error!("> RpcServer > Error on sending the a RemoteError to the client {remote_error:?}")
316                                                }
317                                            });
318                                        }
319                                        ServerResultError::Internal(server_internal_error) => {
320                                            error!("> RpcServer > Server Internal Error: {server_internal_error:?}")
321                                        }
322                                    },
323                                }
324                            }
325                            None => {
326                                error!("> RpcServer > A Invalid Header was sent by the client, message ignored");
327                                continue;
328                            }
329                        }
330                    }
331                    TransportNotification::MustAttachTransport(transport) => {
332                        if let Err(error) = self.new_transport_attached(transport).await {
333                            error!("> RpcServer > Error on attaching transport to the server in order to receive message from it: {error:?}");
334                            continue;
335                        }
336                    }
337                    TransportNotification::CloseTransport(id) => {
338                        if let Some(transport) = self.transports.remove(&id) {
339                            if let Some(on_close_handler) = &self.on_transport_closes_handler {
340                                on_close_handler(transport, id);
341                            }
342                            // Get port ids to drop ports
343                            if let Some(port_ids) = self.ports_by_transport_id.remove(&id) {
344                                for id in port_ids {
345                                    // Drop port
346                                    self.ports.remove(&id);
347                                }
348                            }
349                        }
350                    }
351                },
352                None => {
353                    error!("> RpcServer > Transport notification receiver error");
354                    break;
355                }
356            }
357        }
358    }
359
360    /// Process `ServerEvent` that are sent through the events channel.
361    ///
362    /// It spawns a background task to listen on the channel for new events and executes different actions depending on the event.
363    ///
364    /// # Events
365    /// - `ServerEvent::NewTransport` : Spawns a background task to listen on the transport for new `TransportEvent` and then it sends that new event to the [`RpcServer`]
366    /// - `ServerEvent::TransportFinished` : Collect in memory the amount of transports that already finished and when the amount is equal to the total running transport, it emits `ServerEvents::Terminated`
367    /// - `ServerEvent::Terminated` : Close the [`RpcServer`] transports notfier (channel) and events channel
368    ///
369    /// # Arguments
370    /// * `transports_notifier` - The channel which works as a notifier about events in each transport. It's cloned for each new spawned transport
371    ///
372    fn process_server_events(
373        &mut self,
374        transports_notifier: UnboundedSender<TransportNotification<T>>,
375    ) {
376        let mut events_receiver = if let Some(events_receiver) = self.server_events_receiver.take()
377        {
378            events_receiver
379        } else {
380            panic!("> RpcServer > process_server_events > misuse of process_server_events, seems to be called more than one time")
381        };
382
383        tokio::spawn(async move {
384            while let Some(event) = events_receiver.recv().await {
385                match event {
386                    ServerEvents::NewTransport(id, transport) => {
387                        let tx_cloned = transports_notifier.clone();
388                        tokio::spawn(async move {
389                            loop {
390                                match transport.receive().await {
391                                    Ok(event) => {
392                                        if tx_cloned
393                                            .send(TransportNotification::NewMessage((
394                                                (transport.clone(), id),
395                                                event,
396                                            )))
397                                            .is_err()
398                                        {
399                                            error!("> From a Transport > Error while sending new message from transport to server via notifier");
400                                            break;
401                                        }
402                                    }
403                                    Err(error) => {
404                                        if matches!(error, TransportError::Closed) {
405                                            error!(
406                                                "> From a Transport > Transport is already closed. Breaking..."
407                                            );
408                                            if tx_cloned
409                                                .send(TransportNotification::CloseTransport(id))
410                                                .is_err()
411                                            {
412                                                error!("> From a Transport > Error while sending new message from transport to server via notifier");
413                                                break;
414                                            }
415                                            break;
416                                        }
417                                        error!("> From a Transport > Error on receiving {error:?}");
418                                    }
419                                }
420                            }
421                        });
422                    }
423                    ServerEvents::AttachTransport(transport) => {
424                        if transports_notifier
425                            .send(TransportNotification::MustAttachTransport(transport))
426                            .is_err()
427                        {
428                            error!("> From a Transport > Error while notifying the server to attach a new transport");
429                            continue;
430                        };
431                    }
432                }
433            }
434        });
435    }
436
437    /// Set a handler for the port creation
438    ///
439    /// When a port is created, a service should be registered
440    /// for the port.
441    pub fn set_module_registrator_handler<H>(&mut self, handler: H)
442    where
443        H: Fn(&mut RpcServerPort<Context>) + Send + Sync + 'static,
444    {
445        self.port_creation_handler = Some(Box::new(handler));
446    }
447
448    /// Set a handler to be executed when a transport was closed
449    ///
450    /// When a transport closes its connection, the closure will be executed.
451    ///
452    /// This could be useful when there are resources that may be tied to or depends on a transport's connection
453    pub fn set_on_transport_closes_handler<H>(&mut self, handler: H)
454    where
455        H: Fn(Arc<T>, TransportID) + Send + Sync + 'static,
456    {
457        self.on_transport_closes_handler = Some(Box::new(handler));
458    }
459
460    /// Set a handler is executed when a transport is put to run.
461    ///
462    /// It works for executing a function which receives the Transport ID assigned by the server to a new running transport
463    pub fn set_on_transport_connected_handler<H>(&mut self, handler: H)
464    where
465        H: Fn(Arc<T>, TransportID) + Send + Sync + 'static,
466    {
467        self.on_transport_connected_handler = Some(Box::new(handler));
468    }
469
470    /// Handle the requests for a procedure call
471    ///
472    /// # Arguments
473    ///
474    /// * `transport` - The transport which sent the procedure request
475    /// * `message_number` - A 32-bit unsigned number created by `build_message_identifier` in `protocol/parse.rs`
476    /// * `payload` - Slice of bytes containing the request payload encoded with protobuf
477    async fn handle_request(
478        &self,
479        transport: Arc<T>,
480        transport_id: TransportID,
481        message_number: u32,
482        payload: Vec<u8>,
483    ) -> ServerResult<()> {
484        let request = Request::decode(payload.as_slice())
485            .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
486
487        match self.ports.get(&request.port_id) {
488            Some(port) => {
489                let transport_cloned = transport.clone();
490                let procedure_handler = port.get_procedure(request.procedure_id)?;
491                let procedure_ctx = ProcedureContext {
492                    server_context: self.context.clone(),
493                    transport_id,
494                };
495
496                match procedure_handler {
497                    ProcedureDefinition::Unary(procedure_handler) => {
498                        self.messages_handler.process_unary_request(
499                            transport_cloned,
500                            message_number,
501                            procedure_handler(request.payload, procedure_ctx),
502                        );
503                    }
504                    ProcedureDefinition::ServerStreams(procedure_handler) => {
505                        self.messages_handler
506                            // Cloned because the receiver of the function is an Arc. It'll be spawned in other thread and it needs to modify its state
507                            .clone()
508                            .process_server_streams_request(
509                                transport_cloned,
510                                message_number,
511                                request.port_id,
512                                procedure_handler(request.payload, procedure_ctx),
513                            )
514                    }
515                    ProcedureDefinition::ClientStreams(procedure_handler) => {
516                        let client_stream_id = request.client_stream;
517                        let stream_protocol = StreamProtocol::new(
518                            transport.clone(),
519                            request.port_id,
520                            request.client_stream,
521                        );
522
523                        let msg_handler = self.messages_handler.clone();
524                        match stream_protocol
525                            .start_processing(move || async move {
526                                msg_handler.unregister_listener(client_stream_id).await
527                            })
528                            .await
529                        {
530                            Ok(listener) => {
531                                self.messages_handler
532                                    .clone()
533                                    .process_client_streams_request(
534                                        transport_cloned,
535                                        message_number,
536                                        client_stream_id,
537                                        procedure_handler(
538                                            stream_protocol.to_generator(Some),
539                                            procedure_ctx,
540                                        ),
541                                        listener,
542                                    );
543                            }
544                            Err(_) => {
545                                return Err(ServerResultError::Internal(
546                                    ServerInternalError::TransportError,
547                                ))
548                            }
549                        }
550                    }
551                    ProcedureDefinition::BiStreams(procedure_handler) => {
552                        let client_stream_id = request.client_stream;
553                        let stream_protocol = StreamProtocol::new(
554                            transport.clone(),
555                            request.port_id,
556                            request.client_stream,
557                        );
558
559                        let msg_handler = self.messages_handler.clone();
560                        match stream_protocol
561                            .start_processing(move || async move {
562                                msg_handler.unregister_listener(client_stream_id).await
563                            })
564                            .await
565                        {
566                            Ok(listener) => {
567                                self.messages_handler.clone().process_bidir_streams_request(
568                                    transport_cloned,
569                                    message_number,
570                                    request.port_id,
571                                    client_stream_id,
572                                    listener,
573                                    procedure_handler(
574                                        stream_protocol.to_generator(Some),
575                                        procedure_ctx,
576                                    ),
577                                );
578                            }
579                            Err(_) => {
580                                return Err(ServerResultError::Internal(
581                                    ServerInternalError::TransportError,
582                                ))
583                            }
584                        }
585                    }
586                }
587
588                Ok(())
589            }
590            _ => Err(ServerResultError::External(ServerError::PortNotFound(
591                request.port_id,
592            ))),
593        }
594    }
595
596    /// Handle the requests when a client wants to load a specific registered module and then starts calling the procedures
597    ///
598    /// # Arguments
599    ///
600    /// * `transport` - The transport which is requesting the module
601    /// * `message_number` - A 32-bit unsigned number created by `build_message_identifier` in `protocol/parse.rs`
602    /// * `payload` - Slice of bytes containing the request payload encoded with protobuf
603    async fn handle_request_module(
604        &mut self,
605        transport: Arc<T>,
606        message_number: u32,
607        payload: Vec<u8>,
608    ) -> ServerResult<()> {
609        let request_module = RequestModule::decode(payload.as_slice())
610            .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
611        if let Some(port) = self.ports.get_mut(&request_module.port_id) {
612            if let Ok(server_module_declaration) = port.load_module(request_module.module_name) {
613                let mut procedures: Vec<ModuleProcedure> = Vec::default();
614                for procedure in &server_module_declaration.procedures {
615                    let module_procedure = ModuleProcedure {
616                        procedure_name: procedure.procedure_name.clone(),
617                        procedure_id: procedure.procedure_id,
618                    };
619                    procedures.push(module_procedure)
620                }
621
622                let response = RequestModuleResponse {
623                    port_id: request_module.port_id,
624                    message_identifier: build_message_identifier(
625                        RpcMessageTypes::RequestModuleResponse as u32,
626                        message_number,
627                    ),
628                    procedures,
629                };
630                let response = response.encode_to_vec();
631                transport
632                    .send(response)
633                    .await
634                    .map_err(|_| ServerResultError::Internal(ServerInternalError::TransportError))?
635            } else {
636                return Err(ServerResultError::External(ServerError::LoadModuleError));
637            }
638        } else {
639            return Err(ServerResultError::External(ServerError::PortNotFound(
640                request_module.port_id,
641            )));
642        }
643
644        Ok(())
645    }
646
647    /// Handle the requests when a client wants to create a port.
648    ///
649    /// The `handler` registered with `set_handler` function is called here.
650    ///
651    /// # Arguments
652    ///
653    /// * `transport` - The transport which sent the request to create a port
654    /// * `message_number` - A 32-bit unsigned number created by `build_message_identifier` in `protocol/parse.rs`
655    /// * `payload` - Slice of bytes containing the request payload encoded with protobuf
656    async fn handle_create_port(
657        &mut self,
658        transport: Arc<T>,
659        transport_id: TransportID,
660        message_number: u32,
661        payload: Vec<u8>,
662    ) -> ServerResult<()> {
663        let port_id = self.next_port_id;
664        let create_port = CreatePort::decode(payload.as_slice())
665            .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
666        let port_name = create_port.port_name;
667        let mut port = RpcServerPort::new(port_name.clone());
668
669        if let Some(handler) = &self.port_creation_handler {
670            handler(&mut port);
671        }
672
673        let response = CreatePortResponse {
674            message_identifier: build_message_identifier(
675                RpcMessageTypes::CreatePortResponse as u32,
676                message_number,
677            ),
678            port_id,
679        };
680        let response = response.encode_to_vec();
681
682        transport
683            .send(response)
684            .await
685            .map_err(|_| ServerResultError::Internal(ServerInternalError::TransportError))?;
686
687        self.next_port_id += 1;
688        self.ports.insert(port_id, port);
689        self.ports_by_transport_id
690            .entry(transport_id)
691            .and_modify(|ports| ports.push(port_id))
692            .or_insert_with(|| vec![port_id]);
693
694        Ok(())
695    }
696
697    /// Handle the requests when a client wants to destroy a port because no longer needed
698    ///
699    /// # Arguments
700    ///
701    /// * `payload` - Vec of bytes containing the request payload encoded with protobuf
702    fn handle_destroy_port(&mut self, payload: Vec<u8>) -> ServerResult<()> {
703        let destroy_port = DestroyPort::decode(payload.as_slice())
704            .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
705
706        self.ports.remove(&destroy_port.port_id);
707        Ok(())
708    }
709
710    /// Handle every request from the client.
711    ///
712    /// Then, parse the "header" that contains the `message_type` and `message_identifier`
713    ///
714    /// This allows us know which function should finially handle the request
715    ///
716    /// # Arguments
717    ///
718    /// * `transport_id` - The transport ID which sent a new message to be processed
719    /// * `payload` - Vec of bytes containing the request payload encoded with protobuf
720    /// * `message_type` - [`RpcMessageTypes`] the protocol type of the message
721    /// * `message_number` - the number of the message derivided from the `message_identifier` in the [`crate::rpc_protocol::RpcMessageHeader`]
722    async fn handle_message(
723        &mut self,
724        transport_id: TransportID,
725        payload: Vec<u8>,
726        message_type: RpcMessageTypes,
727        message_number: u32,
728    ) -> ServerResult<()> {
729        let transport = self
730            .transports
731            .get(&transport_id)
732            .ok_or(ServerResultError::Internal(
733                ServerInternalError::TransportNotAttached,
734            ))?
735            .clone();
736        match message_type {
737            RpcMessageTypes::Request => {
738                self.handle_request(transport, transport_id, message_number, payload)
739                    .await?
740            }
741            RpcMessageTypes::RequestModule => {
742                self.handle_request_module(transport, message_number, payload)
743                    .await?
744            }
745            RpcMessageTypes::CreatePort => {
746                self.handle_create_port(transport, transport_id, message_number, payload)
747                    .await?
748            }
749            RpcMessageTypes::DestroyPort => self.handle_destroy_port(payload)?,
750            RpcMessageTypes::StreamAck => {
751                // Client akcnowledged a stream message sent by Server
752                // and we should notify the waiter for the ack in order to
753                // continue sending streams to Client
754                self.messages_handler
755                    .streams_handler
756                    .clone()
757                    .message_acknowledged_by_peer(message_number, payload)
758            }
759            RpcMessageTypes::StreamMessage => {
760                // Client has a client stream request type opened and we should
761                // notify our listener for the client message id that we have a new message to process
762                self.messages_handler
763                    .clone()
764                    .notify_new_client_stream(message_number, payload)
765            }
766            _ => {
767                debug!("Unknown message");
768            }
769        };
770
771        Ok(())
772    }
773}
774
775/// RpcServerPort is what a RpcServer contains to handle different services/modules
776pub struct RpcServerPort<Context> {
777    /// RpcServer name
778    pub name: String,
779    /// Registered modules contains the name and module/service definition
780    ///
781    /// A module can be registered but not loaded
782    registered_modules: HashMap<String, ServiceModuleDefinition<Context>>,
783    /// Loaded modules contains the name and a collection of procedures with id and the name for each one
784    ///
785    /// A module is loaded when the client requests to.
786    loaded_modules: HashMap<String, ServerModuleDeclaration>,
787    /// Procedures contains the id and the handler for each procedure
788    procedures: HashMap<u32, ProcedureDefinition<Context>>,
789    /// Global Procedure ID
790    next_procedure_id: u32,
791}
792
793impl<Context> RpcServerPort<Context> {
794    fn new(name: String) -> Self {
795        RpcServerPort {
796            name,
797            registered_modules: HashMap::new(),
798            loaded_modules: HashMap::new(),
799            procedures: HashMap::new(),
800            next_procedure_id: 1,
801        }
802    }
803
804    /// Just register the module in the port
805    pub fn register_module(
806        &mut self,
807        module_name: String,
808        service_definition: ServiceModuleDefinition<Context>,
809    ) {
810        self.registered_modules
811            .insert(module_name, service_definition);
812    }
813
814    /// It checks if the module is already loaded and return it.
815    ///
816    /// Otherwise, it will get the module definition from the `registered_modules` and load it
817    fn load_module(&mut self, module_name: String) -> ServerResult<&ServerModuleDeclaration> {
818        if self.loaded_modules.contains_key(&module_name) {
819            Ok(self
820                .loaded_modules
821                .get(&module_name)
822                .expect("Already checked."))
823        } else {
824            match self.registered_modules.get(&module_name) {
825                None => Err(ServerResultError::External(ServerError::ModuleNotFound(
826                    module_name,
827                ))),
828                Some(module_generator) => {
829                    let mut server_module_declaration = ServerModuleDeclaration {
830                        procedures: Vec::new(),
831                    };
832
833                    let definitions = module_generator.get_definitions();
834
835                    for (procedure_name, procedure_definition) in definitions {
836                        let current_id = self.next_procedure_id;
837                        self.procedures
838                            .insert(current_id, procedure_definition.clone());
839                        server_module_declaration
840                            .procedures
841                            .push(ServerModuleProcedure {
842                                procedure_name: procedure_name.clone(),
843                                procedure_id: current_id,
844                            });
845                        self.next_procedure_id += 1;
846                    }
847
848                    self.loaded_modules
849                        .insert(module_name.clone(), server_module_declaration);
850
851                    let module_definition = self
852                        .loaded_modules
853                        .get(&module_name)
854                        .ok_or(ServerResultError::External(ServerError::LoadModuleError))?;
855                    Ok(module_definition)
856                }
857            }
858        }
859    }
860
861    /// It will look up the procedure id in the port's `procedures` and return the procedure's handler
862    fn get_procedure(&self, procedure_id: u32) -> ServerResult<ProcedureDefinition<Context>> {
863        match self.procedures.get(&procedure_id) {
864            Some(procedure_definition) => Ok(procedure_definition.clone()),
865            _ => Err(ServerResultError::External(ServerError::ProcedureNotFound(
866                procedure_id,
867            ))),
868        }
869    }
870}
871
872#[derive(Debug)]
873pub struct ServerModuleProcedure {
874    pub procedure_name: String,
875    pub procedure_id: u32,
876}
877
878/// Used to store all the procedures in the `loaded_modules` fields inside [`RpcServerPort`]
879pub struct ServerModuleDeclaration {
880    /// Array with all the module's (service) procedures
881    pub procedures: Vec<ServerModuleProcedure>,
882}