dcl_rpc/server.rs
1//! This module contains all the types needed to have a running [`RpcServer`].
2//
3use crate::{
4 messages_handlers::ServerMessagesHandler,
5 rpc_protocol::{
6 fill_remote_error,
7 parse::{build_message_identifier, parse_header},
8 server_ready_message, CreatePort, CreatePortResponse, DestroyPort, ModuleProcedure,
9 RemoteError, RemoteErrorResponse, Request, RequestModule, RequestModuleResponse,
10 RpcMessageTypes,
11 },
12 service_module_definition::{ProcedureContext, ProcedureDefinition, ServiceModuleDefinition},
13 stream_protocol::StreamProtocol,
14 transports::{Transport, TransportError, TransportMessage},
15};
16use log::{debug, error};
17use prost::{alloc::vec::Vec, Message};
18use std::{collections::HashMap, sync::Arc, u8};
19use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
20
21/// Handler that runs each time that a port is created
22type PortHandlerFn<Context> = dyn Fn(&mut RpcServerPort<Context>) + Send + Sync + 'static;
23
24type TransportHandler<Transport> = dyn Fn(Arc<Transport>, TransportID) + Send + Sync + 'static;
25
26/// Handler that runs each time that a transport was closed
27type OnTransportClosesHandler<Transport> = TransportHandler<Transport>;
28
29/// Handler that run each time that a transport is put to run
30type OnTransportConnected<Transport> = TransportHandler<Transport>;
31
32/// Error returned by a server function could be an error which it's possible and useful to communicate or not.
33#[derive(Debug)]
34pub enum ServerResultError {
35 External(ServerError),
36 Internal(ServerInternalError),
37}
38
39/// Result type for all [`RpcServer`] functions
40pub type ServerResult<T> = Result<T, ServerResultError>;
41
/// Errors which should be exposed to the client and turned into a [`crate::rpc_protocol::RemoteError`]
#[derive(Debug)]
pub enum ServerError {
    /// Error on decoding bytes (`Vec<u8>`) into a given type using
    /// [`crate::rpc_protocol::parse::parse_protocol_message`] or using [`Message::decode`]
    ProtocolError,
    /// The port was not found in the server state, possibly because it was never created
    PortNotFound(u32),
    /// Error on loading a module, unlikely to happen
    LoadModuleError,
    /// The module was not found because it is not registered in the server
    ModuleNotFound(String),
    /// The given procedure's ID was not found
    ProcedureNotFound(u32),
    /// Unexpected error while responding back, or error on sending the original procedure response
    ///
    /// This error should be used as a "re-try" when a [`Transport::send`] failed.
    UnexpectedErrorOnTransport,
}
60
61impl RemoteErrorResponse for ServerError {
62 fn error_code(&self) -> u32 {
63 match self {
64 Self::ProtocolError => 1,
65 Self::PortNotFound(_) => 2,
66 Self::ModuleNotFound(_) => 3,
67 Self::ProcedureNotFound(_) => 4,
68 Self::UnexpectedErrorOnTransport => 5,
69 Self::LoadModuleError => 0, // it's unlikely to happen
70 }
71 }
72
73 fn error_message(&self) -> String {
74 match self {
75 Self::ProtocolError => "Error on parsing a message. The content seems to be corrupted and not to meet the protocol requirements".to_string(),
76 Self::PortNotFound(id) => format!("The given Port's ID: {id} was not found"),
77 Self::LoadModuleError => "Error on loading a module".to_string(),
78 Self::ModuleNotFound(module_name) => format!("Module wasn't found on the server, check the name: {module_name}"),
79 Self::ProcedureNotFound(id) => format!("Procedure's ID: {id} wasn't found on the server"),
80 Self::UnexpectedErrorOnTransport => "Error on the transport while sending the original procedure response".to_string()
81 }
82 }
83}
84
/// Errors which are internal to the server or make no sense to be exposed to the client
#[derive(Debug)]
pub enum ServerInternalError {
    /// The event could not be sent to the server through its events channel
    UnableToNofifyServer,
    /// Sending a message through a transport failed
    TransportError,
    /// The transport is not known by the server, or the server could not be told to put it to run
    TransportNotAttached,
    /// NOTE(review): not produced anywhere in this file; presumably reserved for header parsing failures
    InvalidHeader,
    /// NOTE(review): not produced anywhere in this file; presumably reserved for closed transports
    TransportWasClosed,
}
94
/// Identifier that the server assigns to every attached transport
type TransportID = u32;
/// Identifier that the server assigns to every created port
type PortID = u32;

/// Pair made of the origin of an event and the message it carries
type TransportEvent<Origin, Msg> = (Origin, Msg);
99
100/// Events that the [`RpcServer`] has to react to
101enum ServerEvents<T: Transport + ?Sized> {
102 AttachTransport(Arc<T>),
103 NewTransport(TransportID, Arc<T>),
104}
105
106/// Notifications about Transports connected to the [`RpcServer`]
107enum TransportNotification<T: Transport + ?Sized> {
108 /// New message received from a transport
109 NewMessage(TransportEvent<(Arc<T>, TransportID), TransportMessage>),
110 /// A Notification for when a `ServerEvents::AttachTransport` is received in order to attach a transport to the server [`RpcServer`](#method.RpcServer.attach_transport) and make it run to receive messages
111 MustAttachTransport(Arc<T>),
112 /// Close Transport Notification in order to remove it from the [`RpcServer`] state
113 CloseTransport(TransportID),
114}
115
116/// Structure to send events to the server from outside. It's a wrapper for a [`tokio::sync::mpsc::UnboundedSender`] from a channel so that we can send events from another thread e.g for a Websocket listener.
117pub struct ServerEventsSender<T: Transport + ?Sized>(UnboundedSender<ServerEvents<T>>);
118
119impl<T: Transport + ?Sized> ServerEventsSender<T> {
120 /// Sends a [`ServerEvents::AttachTransport`] to the [`RpcServer`]
121 ///
122 /// This allows you to notify the server that has to attach a new transport so after that it can make it run to listen for new messages
123 ///
124 /// This is equivalent to `RpcServer::attach_transport` but it can be used to attach a transport to the [`RpcServer`] from another spawned thread (or background task)
125 ///
126 /// This allows you to listen on a port in a background taskĀ for external connections and attach multiple transports that want to connect to the server
127 ///
128 /// It receives the `Transport` inside an `Arc` because it must be sharable.
129 ///
130 pub fn send_attach_transport(&self, transport: Arc<T>) -> ServerResult<()> {
131 if self
132 .0
133 .send(ServerEvents::AttachTransport(transport))
134 .is_err()
135 {
136 return Err(ServerResultError::Internal(
137 ServerInternalError::UnableToNofifyServer,
138 ));
139 }
140 Ok(())
141 }
142
143 /// Sends a [`ServerEvents::NewTransport`] to the [`RpcServer`]
144 ///
145 /// This allows you to notify the server that has to put to run a new transport
146 ///
147 /// It receives the [`Transport`] inside an `Arc` because it must be sharable.
148 ///
149 fn send_new_transport(&self, id: TransportID, transport: Arc<T>) -> ServerResult<()> {
150 if self
151 .0
152 .send(ServerEvents::NewTransport(id, transport))
153 .is_err()
154 {
155 error!("> RpcServer > Error on notifying the new transport {id}");
156 return Err(ServerResultError::Internal(
157 ServerInternalError::TransportNotAttached,
158 ));
159 }
160 Ok(())
161 }
162}
163
164impl<T: Transport + ?Sized> Clone for ServerEventsSender<T> {
165 fn clone(&self) -> Self {
166 Self(self.0.clone())
167 }
168}
169
170/// RpcServer receives and process different requests from the RpcClient
171///
172/// Once a RpcServer is inited, you should attach a transport and handler
173/// for the port creation.
174pub struct RpcServer<Context, T: Transport + ?Sized> {
175 /// The Transport used for the communication between `RpcClient` and [`RpcServer`]
176 transports: HashMap<TransportID, Arc<T>>,
177 /// The handler executed when a new port is created
178 port_creation_handler: Option<Box<PortHandlerFn<Context>>>,
179 /// The handler is executed when a transport is closed.
180 ///
181 /// It works for cleaning resources that may be tied to or depends on the transport's connection.
182 on_transport_closes_handler: Option<Box<OnTransportClosesHandler<T>>>,
183 /// The handler is executed when a transport is put to run.
184 ///
185 /// It works for executing a function which receives the Transport ID assigned by the server to a new running transport
186 on_transport_connected_handler: Option<Box<OnTransportConnected<T>>>,
187 /// Ports registered in the [`RpcServer`]
188 ports: HashMap<PortID, RpcServerPort<Context>>,
189 ports_by_transport_id: HashMap<TransportID, Vec<PortID>>,
190 /// RpcServer Context
191 context: Arc<Context>,
192 /// Handler in charge of handling every request<>response.
193 ///
194 /// It's stored inside an `Arc` because it'll be shared between threads
195 messages_handler: Arc<ServerMessagesHandler>,
196 /// `ServerEventsSender` structure that contains the sender half of a channel to send `ServerEvents` to the [`RpcServer`]
197 server_events_sender: ServerEventsSender<T>,
198 /// The receiver half of a channel that receives `ServerEvents` which the [`RpcServer`] has to react to
199 ///
200 /// It's an Option so that we can take ownership of it and remove it from the [`RpcServer`], and make it run in a background task
201 server_events_receiver: Option<UnboundedReceiver<ServerEvents<T>>>,
202 /// The id that will be assigned if a new transport is a attached
203 next_transport_id: u32,
204 /// THe id that will be assigned to a port when it's created.
205 next_port_id: u32,
206}
207impl<Context: Send + Sync + 'static, T: Transport + ?Sized + 'static> RpcServer<Context, T> {
208 pub fn create(ctx: Context) -> Self {
209 let channel = unbounded_channel();
210 Self {
211 transports: HashMap::new(),
212 port_creation_handler: None,
213 on_transport_connected_handler: None,
214 on_transport_closes_handler: None,
215 ports: HashMap::new(),
216 ports_by_transport_id: HashMap::new(),
217 context: Arc::new(ctx),
218 messages_handler: Arc::new(ServerMessagesHandler::new()),
219 next_transport_id: 1,
220 next_port_id: 1,
221 server_events_sender: ServerEventsSender(channel.0),
222 server_events_receiver: Some(channel.1),
223 }
224 }
225
226 /// Get a `ServerEventsSender` to send allowed server events from outside
227 pub fn get_server_events_sender(&self) -> ServerEventsSender<T> {
228 self.server_events_sender.clone()
229 }
230
231 /// Attaches the server half of the transport for Client<>Server communications
232 ///
233 /// It differs from sending the `ServerEvents::AtacchTransport` because it can only be used to attach transport from the current thread where the [`RpcServer`] was initalized due to the mutably borrow
234 ///
235 /// It receives the `Transport` inside an `Arc` because it must be sharable.
236 ///
237 pub async fn attach_transport(&mut self, transport: Arc<T>) -> ServerResult<()> {
238 self.new_transport_attached(transport).await
239 }
240
241 /// Sends the `ServerEvents::NewTransport` in order to make this new transport run in backround to receive its messages
242 ///
243 /// This function is used when a transport is attached with`RpcServer::attach_transport` and with the `ServerEventsSender::send_attach_transport`
244 ///
245 /// It receives the `Transport` inside an `Arc` because it must be sharable.
246 ///
247 async fn new_transport_attached(&mut self, transport: Arc<T>) -> ServerResult<()> {
248 let current_id = self.next_transport_id;
249 if let Err(error) = transport.send(server_ready_message().encode_to_vec()).await {
250 error!("> RpcServer > new_transport_attached > Error while sending server ready message: {error:?}");
251 if matches!(error, TransportError::Closed) {
252 return Err(ServerResultError::Internal(
253 ServerInternalError::TransportError,
254 ));
255 } else {
256 transport.close().await;
257 return Err(ServerResultError::Internal(
258 ServerInternalError::TransportError,
259 ));
260 }
261 }
262 self.server_events_sender
263 .send_new_transport(current_id, transport.clone())?;
264 if let Some(handler) = &self.on_transport_connected_handler {
265 handler(transport.clone(), current_id);
266 }
267 self.transports.insert(current_id, transport);
268 self.next_transport_id += 1;
269 Ok(())
270 }
271
272 /// Start processing `ServerEvent` events and listening on a channel for new `TransportNotification` that are sent by all the attached transports that are running in background tasks.
273 pub async fn run(&mut self) {
274 // create transports notifier. This channel will be in charge of sending all messages (and errors) that all the transports attached to server receieve
275 // We use async_channel crate for this channel because we want our receiver to be cloned so that we can close it when no more transports are open
276 // And after that, our server can exit because it knows that it wont receive more notifications
277 let (transports_notifier, mut transports_notification_receiver) =
278 unbounded_channel::<TransportNotification<T>>();
279 // Spawn a task to process ServerEvents in background
280 self.process_server_events(transports_notifier);
281 // loop on transports_notifier
282 loop {
283 // A transport here is the equivalent to a new connection in a common HTTP server
284 match transports_notification_receiver.recv().await {
285 Some(notification) => match notification {
286 TransportNotification::NewMessage(((transport, transport_id), event)) => {
287 match parse_header(&event) {
288 Some((message_type, message_number)) => {
289 match self
290 .handle_message(
291 transport_id,
292 event,
293 message_type,
294 message_number,
295 )
296 .await
297 {
298 Ok(_) => debug!("> RpcServer > Transport message handled!"),
299 Err(server_error) => match server_error {
300 ServerResultError::External(server_external_error) => {
301 error!("> RpcServer > Server External Error {server_external_error:?}");
302 // If a server error is external, we should send it back to the client
303 tokio::spawn(async move {
304 let mut remote_error: RemoteError =
305 server_external_error.into();
306 fill_remote_error(
307 &mut remote_error,
308 message_number,
309 );
310 if transport
311 .send(remote_error.encode_to_vec())
312 .await
313 .is_err()
314 {
315 error!("> RpcServer > Error on sending the a RemoteError to the client {remote_error:?}")
316 }
317 });
318 }
319 ServerResultError::Internal(server_internal_error) => {
320 error!("> RpcServer > Server Internal Error: {server_internal_error:?}")
321 }
322 },
323 }
324 }
325 None => {
326 error!("> RpcServer > A Invalid Header was sent by the client, message ignored");
327 continue;
328 }
329 }
330 }
331 TransportNotification::MustAttachTransport(transport) => {
332 if let Err(error) = self.new_transport_attached(transport).await {
333 error!("> RpcServer > Error on attaching transport to the server in order to receive message from it: {error:?}");
334 continue;
335 }
336 }
337 TransportNotification::CloseTransport(id) => {
338 if let Some(transport) = self.transports.remove(&id) {
339 if let Some(on_close_handler) = &self.on_transport_closes_handler {
340 on_close_handler(transport, id);
341 }
342 // Get port ids to drop ports
343 if let Some(port_ids) = self.ports_by_transport_id.remove(&id) {
344 for id in port_ids {
345 // Drop port
346 self.ports.remove(&id);
347 }
348 }
349 }
350 }
351 },
352 None => {
353 error!("> RpcServer > Transport notification receiver error");
354 break;
355 }
356 }
357 }
358 }
359
360 /// Process `ServerEvent` that are sent through the events channel.
361 ///
362 /// It spawns a background task to listen on the channel for new events and executes different actions depending on the event.
363 ///
364 /// # Events
365 /// - `ServerEvent::NewTransport` : Spawns a background task to listen on the transport for new `TransportEvent` and then it sends that new event to the [`RpcServer`]
366 /// - `ServerEvent::TransportFinished` : Collect in memory the amount of transports that already finished and when the amount is equal to the total running transport, it emits `ServerEvents::Terminated`
367 /// - `ServerEvent::Terminated` : Close the [`RpcServer`] transports notfier (channel) and events channel
368 ///
369 /// # Arguments
370 /// * `transports_notifier` - The channel which works as a notifier about events in each transport. It's cloned for each new spawned transport
371 ///
372 fn process_server_events(
373 &mut self,
374 transports_notifier: UnboundedSender<TransportNotification<T>>,
375 ) {
376 let mut events_receiver = if let Some(events_receiver) = self.server_events_receiver.take()
377 {
378 events_receiver
379 } else {
380 panic!("> RpcServer > process_server_events > misuse of process_server_events, seems to be called more than one time")
381 };
382
383 tokio::spawn(async move {
384 while let Some(event) = events_receiver.recv().await {
385 match event {
386 ServerEvents::NewTransport(id, transport) => {
387 let tx_cloned = transports_notifier.clone();
388 tokio::spawn(async move {
389 loop {
390 match transport.receive().await {
391 Ok(event) => {
392 if tx_cloned
393 .send(TransportNotification::NewMessage((
394 (transport.clone(), id),
395 event,
396 )))
397 .is_err()
398 {
399 error!("> From a Transport > Error while sending new message from transport to server via notifier");
400 break;
401 }
402 }
403 Err(error) => {
404 if matches!(error, TransportError::Closed) {
405 error!(
406 "> From a Transport > Transport is already closed. Breaking..."
407 );
408 if tx_cloned
409 .send(TransportNotification::CloseTransport(id))
410 .is_err()
411 {
412 error!("> From a Transport > Error while sending new message from transport to server via notifier");
413 break;
414 }
415 break;
416 }
417 error!("> From a Transport > Error on receiving {error:?}");
418 }
419 }
420 }
421 });
422 }
423 ServerEvents::AttachTransport(transport) => {
424 if transports_notifier
425 .send(TransportNotification::MustAttachTransport(transport))
426 .is_err()
427 {
428 error!("> From a Transport > Error while notifying the server to attach a new transport");
429 continue;
430 };
431 }
432 }
433 }
434 });
435 }
436
437 /// Set a handler for the port creation
438 ///
439 /// When a port is created, a service should be registered
440 /// for the port.
441 pub fn set_module_registrator_handler<H>(&mut self, handler: H)
442 where
443 H: Fn(&mut RpcServerPort<Context>) + Send + Sync + 'static,
444 {
445 self.port_creation_handler = Some(Box::new(handler));
446 }
447
448 /// Set a handler to be executed when a transport was closed
449 ///
450 /// When a transport closes its connection, the closure will be executed.
451 ///
452 /// This could be useful when there are resources that may be tied to or depends on a transport's connection
453 pub fn set_on_transport_closes_handler<H>(&mut self, handler: H)
454 where
455 H: Fn(Arc<T>, TransportID) + Send + Sync + 'static,
456 {
457 self.on_transport_closes_handler = Some(Box::new(handler));
458 }
459
460 /// Set a handler is executed when a transport is put to run.
461 ///
462 /// It works for executing a function which receives the Transport ID assigned by the server to a new running transport
463 pub fn set_on_transport_connected_handler<H>(&mut self, handler: H)
464 where
465 H: Fn(Arc<T>, TransportID) + Send + Sync + 'static,
466 {
467 self.on_transport_connected_handler = Some(Box::new(handler));
468 }
469
470 /// Handle the requests for a procedure call
471 ///
472 /// # Arguments
473 ///
474 /// * `transport` - The transport which sent the procedure request
475 /// * `message_number` - A 32-bit unsigned number created by `build_message_identifier` in `protocol/parse.rs`
476 /// * `payload` - Slice of bytes containing the request payload encoded with protobuf
477 async fn handle_request(
478 &self,
479 transport: Arc<T>,
480 transport_id: TransportID,
481 message_number: u32,
482 payload: Vec<u8>,
483 ) -> ServerResult<()> {
484 let request = Request::decode(payload.as_slice())
485 .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
486
487 match self.ports.get(&request.port_id) {
488 Some(port) => {
489 let transport_cloned = transport.clone();
490 let procedure_handler = port.get_procedure(request.procedure_id)?;
491 let procedure_ctx = ProcedureContext {
492 server_context: self.context.clone(),
493 transport_id,
494 };
495
496 match procedure_handler {
497 ProcedureDefinition::Unary(procedure_handler) => {
498 self.messages_handler.process_unary_request(
499 transport_cloned,
500 message_number,
501 procedure_handler(request.payload, procedure_ctx),
502 );
503 }
504 ProcedureDefinition::ServerStreams(procedure_handler) => {
505 self.messages_handler
506 // Cloned because the receiver of the function is an Arc. It'll be spawned in other thread and it needs to modify its state
507 .clone()
508 .process_server_streams_request(
509 transport_cloned,
510 message_number,
511 request.port_id,
512 procedure_handler(request.payload, procedure_ctx),
513 )
514 }
515 ProcedureDefinition::ClientStreams(procedure_handler) => {
516 let client_stream_id = request.client_stream;
517 let stream_protocol = StreamProtocol::new(
518 transport.clone(),
519 request.port_id,
520 request.client_stream,
521 );
522
523 let msg_handler = self.messages_handler.clone();
524 match stream_protocol
525 .start_processing(move || async move {
526 msg_handler.unregister_listener(client_stream_id).await
527 })
528 .await
529 {
530 Ok(listener) => {
531 self.messages_handler
532 .clone()
533 .process_client_streams_request(
534 transport_cloned,
535 message_number,
536 client_stream_id,
537 procedure_handler(
538 stream_protocol.to_generator(Some),
539 procedure_ctx,
540 ),
541 listener,
542 );
543 }
544 Err(_) => {
545 return Err(ServerResultError::Internal(
546 ServerInternalError::TransportError,
547 ))
548 }
549 }
550 }
551 ProcedureDefinition::BiStreams(procedure_handler) => {
552 let client_stream_id = request.client_stream;
553 let stream_protocol = StreamProtocol::new(
554 transport.clone(),
555 request.port_id,
556 request.client_stream,
557 );
558
559 let msg_handler = self.messages_handler.clone();
560 match stream_protocol
561 .start_processing(move || async move {
562 msg_handler.unregister_listener(client_stream_id).await
563 })
564 .await
565 {
566 Ok(listener) => {
567 self.messages_handler.clone().process_bidir_streams_request(
568 transport_cloned,
569 message_number,
570 request.port_id,
571 client_stream_id,
572 listener,
573 procedure_handler(
574 stream_protocol.to_generator(Some),
575 procedure_ctx,
576 ),
577 );
578 }
579 Err(_) => {
580 return Err(ServerResultError::Internal(
581 ServerInternalError::TransportError,
582 ))
583 }
584 }
585 }
586 }
587
588 Ok(())
589 }
590 _ => Err(ServerResultError::External(ServerError::PortNotFound(
591 request.port_id,
592 ))),
593 }
594 }
595
596 /// Handle the requests when a client wants to load a specific registered module and then starts calling the procedures
597 ///
598 /// # Arguments
599 ///
600 /// * `transport` - The transport which is requesting the module
601 /// * `message_number` - A 32-bit unsigned number created by `build_message_identifier` in `protocol/parse.rs`
602 /// * `payload` - Slice of bytes containing the request payload encoded with protobuf
603 async fn handle_request_module(
604 &mut self,
605 transport: Arc<T>,
606 message_number: u32,
607 payload: Vec<u8>,
608 ) -> ServerResult<()> {
609 let request_module = RequestModule::decode(payload.as_slice())
610 .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
611 if let Some(port) = self.ports.get_mut(&request_module.port_id) {
612 if let Ok(server_module_declaration) = port.load_module(request_module.module_name) {
613 let mut procedures: Vec<ModuleProcedure> = Vec::default();
614 for procedure in &server_module_declaration.procedures {
615 let module_procedure = ModuleProcedure {
616 procedure_name: procedure.procedure_name.clone(),
617 procedure_id: procedure.procedure_id,
618 };
619 procedures.push(module_procedure)
620 }
621
622 let response = RequestModuleResponse {
623 port_id: request_module.port_id,
624 message_identifier: build_message_identifier(
625 RpcMessageTypes::RequestModuleResponse as u32,
626 message_number,
627 ),
628 procedures,
629 };
630 let response = response.encode_to_vec();
631 transport
632 .send(response)
633 .await
634 .map_err(|_| ServerResultError::Internal(ServerInternalError::TransportError))?
635 } else {
636 return Err(ServerResultError::External(ServerError::LoadModuleError));
637 }
638 } else {
639 return Err(ServerResultError::External(ServerError::PortNotFound(
640 request_module.port_id,
641 )));
642 }
643
644 Ok(())
645 }
646
647 /// Handle the requests when a client wants to create a port.
648 ///
649 /// The `handler` registered with `set_handler` function is called here.
650 ///
651 /// # Arguments
652 ///
653 /// * `transport` - The transport which sent the request to create a port
654 /// * `message_number` - A 32-bit unsigned number created by `build_message_identifier` in `protocol/parse.rs`
655 /// * `payload` - Slice of bytes containing the request payload encoded with protobuf
656 async fn handle_create_port(
657 &mut self,
658 transport: Arc<T>,
659 transport_id: TransportID,
660 message_number: u32,
661 payload: Vec<u8>,
662 ) -> ServerResult<()> {
663 let port_id = self.next_port_id;
664 let create_port = CreatePort::decode(payload.as_slice())
665 .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
666 let port_name = create_port.port_name;
667 let mut port = RpcServerPort::new(port_name.clone());
668
669 if let Some(handler) = &self.port_creation_handler {
670 handler(&mut port);
671 }
672
673 let response = CreatePortResponse {
674 message_identifier: build_message_identifier(
675 RpcMessageTypes::CreatePortResponse as u32,
676 message_number,
677 ),
678 port_id,
679 };
680 let response = response.encode_to_vec();
681
682 transport
683 .send(response)
684 .await
685 .map_err(|_| ServerResultError::Internal(ServerInternalError::TransportError))?;
686
687 self.next_port_id += 1;
688 self.ports.insert(port_id, port);
689 self.ports_by_transport_id
690 .entry(transport_id)
691 .and_modify(|ports| ports.push(port_id))
692 .or_insert_with(|| vec![port_id]);
693
694 Ok(())
695 }
696
697 /// Handle the requests when a client wants to destroy a port because no longer needed
698 ///
699 /// # Arguments
700 ///
701 /// * `payload` - Vec of bytes containing the request payload encoded with protobuf
702 fn handle_destroy_port(&mut self, payload: Vec<u8>) -> ServerResult<()> {
703 let destroy_port = DestroyPort::decode(payload.as_slice())
704 .map_err(|_| ServerResultError::External(ServerError::ProtocolError))?;
705
706 self.ports.remove(&destroy_port.port_id);
707 Ok(())
708 }
709
710 /// Handle every request from the client.
711 ///
712 /// Then, parse the "header" that contains the `message_type` and `message_identifier`
713 ///
714 /// This allows us know which function should finially handle the request
715 ///
716 /// # Arguments
717 ///
718 /// * `transport_id` - The transport ID which sent a new message to be processed
719 /// * `payload` - Vec of bytes containing the request payload encoded with protobuf
720 /// * `message_type` - [`RpcMessageTypes`] the protocol type of the message
721 /// * `message_number` - the number of the message derivided from the `message_identifier` in the [`crate::rpc_protocol::RpcMessageHeader`]
722 async fn handle_message(
723 &mut self,
724 transport_id: TransportID,
725 payload: Vec<u8>,
726 message_type: RpcMessageTypes,
727 message_number: u32,
728 ) -> ServerResult<()> {
729 let transport = self
730 .transports
731 .get(&transport_id)
732 .ok_or(ServerResultError::Internal(
733 ServerInternalError::TransportNotAttached,
734 ))?
735 .clone();
736 match message_type {
737 RpcMessageTypes::Request => {
738 self.handle_request(transport, transport_id, message_number, payload)
739 .await?
740 }
741 RpcMessageTypes::RequestModule => {
742 self.handle_request_module(transport, message_number, payload)
743 .await?
744 }
745 RpcMessageTypes::CreatePort => {
746 self.handle_create_port(transport, transport_id, message_number, payload)
747 .await?
748 }
749 RpcMessageTypes::DestroyPort => self.handle_destroy_port(payload)?,
750 RpcMessageTypes::StreamAck => {
751 // Client akcnowledged a stream message sent by Server
752 // and we should notify the waiter for the ack in order to
753 // continue sending streams to Client
754 self.messages_handler
755 .streams_handler
756 .clone()
757 .message_acknowledged_by_peer(message_number, payload)
758 }
759 RpcMessageTypes::StreamMessage => {
760 // Client has a client stream request type opened and we should
761 // notify our listener for the client message id that we have a new message to process
762 self.messages_handler
763 .clone()
764 .notify_new_client_stream(message_number, payload)
765 }
766 _ => {
767 debug!("Unknown message");
768 }
769 };
770
771 Ok(())
772 }
773}
774
775/// RpcServerPort is what a RpcServer contains to handle different services/modules
776pub struct RpcServerPort<Context> {
777 /// RpcServer name
778 pub name: String,
779 /// Registered modules contains the name and module/service definition
780 ///
781 /// A module can be registered but not loaded
782 registered_modules: HashMap<String, ServiceModuleDefinition<Context>>,
783 /// Loaded modules contains the name and a collection of procedures with id and the name for each one
784 ///
785 /// A module is loaded when the client requests to.
786 loaded_modules: HashMap<String, ServerModuleDeclaration>,
787 /// Procedures contains the id and the handler for each procedure
788 procedures: HashMap<u32, ProcedureDefinition<Context>>,
789 /// Global Procedure ID
790 next_procedure_id: u32,
791}
792
793impl<Context> RpcServerPort<Context> {
794 fn new(name: String) -> Self {
795 RpcServerPort {
796 name,
797 registered_modules: HashMap::new(),
798 loaded_modules: HashMap::new(),
799 procedures: HashMap::new(),
800 next_procedure_id: 1,
801 }
802 }
803
804 /// Just register the module in the port
805 pub fn register_module(
806 &mut self,
807 module_name: String,
808 service_definition: ServiceModuleDefinition<Context>,
809 ) {
810 self.registered_modules
811 .insert(module_name, service_definition);
812 }
813
814 /// It checks if the module is already loaded and return it.
815 ///
816 /// Otherwise, it will get the module definition from the `registered_modules` and load it
817 fn load_module(&mut self, module_name: String) -> ServerResult<&ServerModuleDeclaration> {
818 if self.loaded_modules.contains_key(&module_name) {
819 Ok(self
820 .loaded_modules
821 .get(&module_name)
822 .expect("Already checked."))
823 } else {
824 match self.registered_modules.get(&module_name) {
825 None => Err(ServerResultError::External(ServerError::ModuleNotFound(
826 module_name,
827 ))),
828 Some(module_generator) => {
829 let mut server_module_declaration = ServerModuleDeclaration {
830 procedures: Vec::new(),
831 };
832
833 let definitions = module_generator.get_definitions();
834
835 for (procedure_name, procedure_definition) in definitions {
836 let current_id = self.next_procedure_id;
837 self.procedures
838 .insert(current_id, procedure_definition.clone());
839 server_module_declaration
840 .procedures
841 .push(ServerModuleProcedure {
842 procedure_name: procedure_name.clone(),
843 procedure_id: current_id,
844 });
845 self.next_procedure_id += 1;
846 }
847
848 self.loaded_modules
849 .insert(module_name.clone(), server_module_declaration);
850
851 let module_definition = self
852 .loaded_modules
853 .get(&module_name)
854 .ok_or(ServerResultError::External(ServerError::LoadModuleError))?;
855 Ok(module_definition)
856 }
857 }
858 }
859 }
860
861 /// It will look up the procedure id in the port's `procedures` and return the procedure's handler
862 fn get_procedure(&self, procedure_id: u32) -> ServerResult<ProcedureDefinition<Context>> {
863 match self.procedures.get(&procedure_id) {
864 Some(procedure_definition) => Ok(procedure_definition.clone()),
865 _ => Err(ServerResultError::External(ServerError::ProcedureNotFound(
866 procedure_id,
867 ))),
868 }
869 }
870}
871
/// Name and ID of a procedure of a loaded module
#[derive(Debug)]
pub struct ServerModuleProcedure {
    /// Name of the procedure
    pub procedure_name: String,
    /// ID assigned to the procedure when its module was loaded
    pub procedure_id: u32,
}
877
878/// Used to store all the procedures in the `loaded_modules` fields inside [`RpcServerPort`]
879pub struct ServerModuleDeclaration {
880 /// Array with all the module's (service) procedures
881 pub procedures: Vec<ServerModuleProcedure>,
882}