asteroid_mq_sdk/
event.rs

1use std::{collections::HashMap, future::Future, pin::Pin};
2
3use crate::{ClientEndpoint, ClientNode, ClientNodeError, ClientReceivedMessage, MessageAckHandle};
4use asteroid_mq_model::{
5    event::{Event, EventAttribute, EventCodec, Handler},
6    MessageAckExpectKind, Subject, TopicCode, WaitAckSuccess,
7};
8use tracing::Instrument;
9
10impl ClientNode {
11    pub async fn send_event<E: Event>(
12        &self,
13        topic: TopicCode,
14        event: E,
15    ) -> Result<MessageAckHandle, ClientNodeError> {
16        let handle = self.send_message(event.into_edge_message(topic)).await?;
17        Ok(handle)
18    }
19    pub async fn send_event_and_wait<E: Event>(
20        &self,
21        topic: TopicCode,
22        event: E,
23    ) -> Result<WaitAckSuccess, ClientNodeError> {
24        let handle = self.send_event(topic, event).await?;
25        let success = handle.wait().await?;
26        Ok(success)
27    }
28}
29
30type InnerEventHandler = dyn Fn(
31        ClientReceivedMessage,
32    ) -> Pin<Box<dyn Future<Output = Result<(), crate::error::ClientNodeError>> + Send>>
33    + Send;
34
35pub struct HandleEventLoop {
36    ep: ClientEndpoint,
37    handlers: HashMap<Subject, Box<InnerEventHandler>>,
38}
39
40impl HandleEventLoop {
41    pub fn new(ep: ClientEndpoint) -> Self {
42        Self {
43            ep,
44            handlers: Default::default(),
45        }
46    }
47    pub fn with_handler<A>(mut self, handler: impl Handler<A>) -> Self {
48        self.register_handler(handler);
49        self
50    }
51
52    pub fn register_handler<H, A>(&mut self, handler: H)
53    where
54        H: Handler<A>,
55    {
56        let subject = H::Event::SUBJECT;
57        tracing::debug!(?subject, "register event handler");
58        let inner_handler = Box::new(move |message: ClientReceivedMessage| {
59            let handler = handler.clone();
60            Box::pin(async move {
61                if H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Received
62                    || H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Processed
63                {
64                    message.ack_received().await?;
65                }
66                let event = match H::Event::from_bytes(message.payload.clone().into_inner()) {
67                    Some(msg) => msg,
68                    None => {
69                        tracing::debug!("failed to decode event, this message will be ignored");
70                        if H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Processed {
71                            message.ack_failed().await?;
72                        }
73                        return Ok(());
74                    }
75                };
76                let handle_result = handler.handle(event).await;
77                if H::Event::EXPECT_ACK_KIND == MessageAckExpectKind::Processed {
78                    if let Err(e) = handle_result {
79                        tracing::warn!("failed to handle event: {:?}", e);
80                        message.ack_failed().await?;
81                    } else {
82                        message.ack_processed().await?;
83                    }
84                }
85
86                Ok(())
87            })
88                as Pin<Box<dyn Future<Output = Result<(), crate::error::ClientNodeError>> + Send>>
89        });
90        self.handlers.insert(subject, inner_handler);
91    }
92
93    pub async fn run(mut self) {
94        loop {
95            let Ok(message) = self.ep.next_message_and_auto_respawn().await else {
96                break;
97            };
98            tracing::trace!(?message, "handle message");
99            let subjects = message.header.subjects.clone();
100            for subject in subjects.iter() {
101                let Some(handler) = self.handlers.get(subject) else {
102                    tracing::debug!(?subject, "no handler found");
103                    continue;
104                };
105                let fut = (handler)(message.clone());
106                let subject = subject.clone();
107                tokio::spawn(async move {
108                    let result = fut
109                        .instrument(tracing::info_span!("event_handler", %subject))
110                        .await;
111                    if let Err(e) = result {
112                        tracing::warn!(error=%e, "handler process error")
113                    }
114                });
115            }
116        }
117    }
118
119    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
120        tokio::spawn(self.run())
121    }
122}
123
124impl ClientEndpoint {
125    pub fn into_event_loop(self) -> HandleEventLoop {
126        HandleEventLoop::new(self)
127    }
128}