1use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
2use crate::events::event_handler::EventHandler;
3use crate::namespaces::namespace::Namespace;
4use crate::prelude::*;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::net::tcp::OwnedReadHalf;
8
9pub mod builder;
10pub mod client;
11pub mod context;
12pub mod server;
13pub mod stream_emitter;
14
15async fn handle_connection(
17 namespaces: Arc<HashMap<String, Namespace>>,
18 handler: Arc<EventHandler>,
19 mut read_half: OwnedReadHalf,
20 ctx: Context,
21) {
22 while let Ok(event) = Event::from_async_read(&mut read_half).await {
23 tracing::trace!(
24 "event.name = {:?}, event.namespace = {:?}, event.reference_id = {:?}",
25 event.name(),
26 event.namespace(),
27 event.reference_id()
28 );
29 if let Some(ref_id) = event.reference_id() {
31 tracing::trace!("Event has reference id. Passing to reply listener");
32 if let Some(sender) = ctx.get_reply_sender(ref_id).await {
34 if let Err(event) = sender.send(event) {
36 handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
37 }
38 continue;
39 }
40 tracing::trace!("No response listener found for event. Passing to regular listener.");
41 }
42 if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) {
43 tracing::trace!("Passing event to namespace listener");
44 let handler = Arc::clone(&namespace.handler);
45 handle_event(Context::clone(&ctx), handler, event);
46 } else {
47 tracing::trace!("Passing event to global listener");
48 handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
49 }
50 }
51 tracing::debug!("Connection closed.");
52}
53
54fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
56 tokio::spawn(async move {
57 let id = event.id();
58 if let Err(e) = handler.handle_event(&ctx, event).await {
59 if let Err(e) = ctx
61 .emitter
62 .emit_response(
63 id,
64 ERROR_EVENT_NAME,
65 ErrorEventData {
66 message: format!("{:?}", e),
67 code: 500,
68 },
69 )
70 .await
71 {
72 tracing::error!("Error occurred when sending error response: {:?}", e);
73 }
74 tracing::error!("Failed to handle event: {:?}", e);
75 }
76 });
77}