1use std::{collections::HashMap, future::Future, pin::Pin};
2
3use crate::{ClientEndpoint, ClientNode, ClientNodeError, ClientReceivedMessage, MessageAckHandle};
4use asteroid_mq_model::{
5 event::{Event, EventAttribute, EventCodec, Handler},
6 MessageAckExpectKind, Subject, TopicCode, WaitAckSuccess,
7};
8use tracing::Instrument;
9
10impl ClientNode {
11 pub async fn send_event<E: Event>(
12 &self,
13 topic: TopicCode,
14 event: E,
15 ) -> Result<MessageAckHandle, ClientNodeError> {
16 let handle = self.send_message(event.into_edge_message(topic)).await?;
17 Ok(handle)
18 }
19 pub async fn send_event_and_wait<E: Event>(
20 &self,
21 topic: TopicCode,
22 event: E,
23 ) -> Result<WaitAckSuccess, ClientNodeError> {
24 let handle = self.send_event(topic, event).await?;
25 let success = handle.wait().await?;
26 Ok(success)
27 }
28}
29
30type InnerEventHandler = dyn Fn(
31 ClientReceivedMessage,
32 ) -> Pin<Box<dyn Future<Output = Result<(), crate::error::ClientNodeError>> + Send>>
33 + Send;
34
35pub struct HandleEventLoop {
36 ep: ClientEndpoint,
37 handlers: HashMap<Subject, Box<InnerEventHandler>>,
38}
39
40impl HandleEventLoop {
41 pub fn new(ep: ClientEndpoint) -> Self {
42 Self {
43 ep,
44 handlers: Default::default(),
45 }
46 }
47 pub fn with_handler<A>(mut self, handler: impl Handler<A>) -> Self {
48 self.register_handler(handler);
49 self
50 }
51
52 pub fn register_handler<H, A>(&mut self, handler: H)
53 where
54 H: Handler<A>,
55 {
56 let subject = H::Event::SUBJECT;
57 tracing::debug!(?subject, "register event handler");
58 let inner_handler = Box::new(move |message: ClientReceivedMessage| {
59 let handler = handler.clone();
60 Box::pin(async move {
61 if H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Received
62 || H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Processed
63 {
64 message.ack_received().await?;
65 }
66 let event = match H::Event::from_bytes(message.payload.clone().into_inner()) {
67 Some(msg) => msg,
68 None => {
69 tracing::debug!("failed to decode event, this message will be ignored");
70 if H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Processed {
71 message.ack_failed().await?;
72 }
73 return Ok(());
74 }
75 };
76 let handle_result = handler.handle(event).await;
77 if H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Processed {
78 if let Err(e) = handle_result {
79 tracing::warn!("failed to handle event: {:?}", e);
80 message.ack_failed().await?;
81 } else {
82 message.ack_processed().await?;
83 }
84 }
85
86 Ok(())
87 })
88 as Pin<Box<dyn Future<Output = Result<(), crate::error::ClientNodeError>> + Send>>
89 });
90 self.handlers.insert(subject, inner_handler);
91 }
92
93 pub async fn run(mut self) {
94 loop {
95 let Ok(message) = self.ep.next_message_and_auto_respawn().await else {
96 break;
97 };
98 tracing::trace!(?message, "handle message");
99 let subjects = message.header.subjects.clone();
100 for subject in subjects.iter() {
101 let Some(handler) = self.handlers.get(subject) else {
102 tracing::debug!(?subject, "no handler found");
103 continue;
104 };
105 let fut = (handler)(message.clone());
106 let subject = subject.clone();
107 tokio::spawn(async move {
108 let result = fut
109 .instrument(tracing::info_span!("event_handler", %subject))
110 .await;
111 if let Err(e) = result {
112 tracing::warn!(error=%e, "handler process error")
113 }
114 });
115 }
116 }
117 }
118
119 pub fn spawn(self) -> tokio::task::JoinHandle<()> {
120 tokio::spawn(self.run())
121 }
122}
123
124impl ClientEndpoint {
125 pub fn into_event_loop(self) -> HandleEventLoop {
126 HandleEventLoop::new(self)
127 }
128}