rmp_ipc/ipc/
mod.rs

1use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
2use crate::events::event_handler::EventHandler;
3use crate::namespaces::namespace::Namespace;
4use crate::prelude::*;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::net::tcp::OwnedReadHalf;
8
9pub mod builder;
10pub mod client;
11pub mod context;
12pub mod server;
13pub mod stream_emitter;
14
15/// Handles listening to a connection and triggering the corresponding event functions
16async fn handle_connection(
17    namespaces: Arc<HashMap<String, Namespace>>,
18    handler: Arc<EventHandler>,
19    mut read_half: OwnedReadHalf,
20    ctx: Context,
21) {
22    while let Ok(event) = Event::from_async_read(&mut read_half).await {
23        tracing::trace!(
24            "event.name = {:?}, event.namespace = {:?}, event.reference_id = {:?}",
25            event.name(),
26            event.namespace(),
27            event.reference_id()
28        );
29        // check if the event is a reply
30        if let Some(ref_id) = event.reference_id() {
31            tracing::trace!("Event has reference id. Passing to reply listener");
32            // get the listener for replies
33            if let Some(sender) = ctx.get_reply_sender(ref_id).await {
34                // try sending the event to the listener for replies
35                if let Err(event) = sender.send(event) {
36                    handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
37                }
38                continue;
39            }
40            tracing::trace!("No response listener found for event. Passing to regular listener.");
41        }
42        if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) {
43            tracing::trace!("Passing event to namespace listener");
44            let handler = Arc::clone(&namespace.handler);
45            handle_event(Context::clone(&ctx), handler, event);
46        } else {
47            tracing::trace!("Passing event to global listener");
48            handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
49        }
50    }
51    tracing::debug!("Connection closed.");
52}
53
54/// Handles a single event in a different tokio context
55fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
56    tokio::spawn(async move {
57        let id = event.id();
58        if let Err(e) = handler.handle_event(&ctx, event).await {
59            // emit an error event
60            if let Err(e) = ctx
61                .emitter
62                .emit_response(
63                    id,
64                    ERROR_EVENT_NAME,
65                    ErrorEventData {
66                        message: format!("{:?}", e),
67                        code: 500,
68                    },
69                )
70                .await
71            {
72                tracing::error!("Error occurred when sending error response: {:?}", e);
73            }
74            tracing::error!("Failed to handle event: {:?}", e);
75        }
76    });
77}