Skip to main content

dingtalk_stream/
handlers.rs

1//! Handlers module for DingTalk Stream SDK
2//!
3//! Provides trait-based handlers for different message types
4
5use crate::frames::down_message::callback_message::CallbackMessage;
6use crate::frames::down_message::event_message::EventMessage;
7use crate::frames::down_message::system_message::SystemMessage;
8use crate::frames::down_message::MessageTopic;
9use crate::frames::up_message::callback_message::WebhookMessage;
10use crate::DingTalkStream;
11use async_trait::async_trait;
12use std::fmt::{Display, Formatter};
13use tokio::sync::mpsc::Sender;
14use tokio_tungstenite::tungstenite::Message;
15
/// Callback handler trait for handling callback messages.
///
/// Implementors must be `Send + Sync`. The trait is declared through
/// `#[async_trait]`, which is what allows `process` to be an `async fn`.
#[async_trait]
pub trait CallbackHandler: Send + Sync {
    /// Process a callback message.
    ///
    /// * `client` - the [`DingTalkStream`] handed in by the caller.
    /// * `message` - the callback message to handle.
    /// * `cb_webhook_msg_sender` - optional channel sender for
    ///   [`WebhookMessage`] values; presumably lets the handler push webhook
    ///   replies (TODO confirm against the caller).
    ///
    /// Returns a [`Resp`] on success or an [`Error`] on failure.
    async fn process(
        &self,
        client: &DingTalkStream,
        message: &CallbackMessage,
        cb_webhook_msg_sender: Option<Sender<WebhookMessage>>,
    ) -> Result<Resp, Error>;

    /// Pre-start hook. The default implementation does nothing.
    fn pre_start(&self) {}

    /// Get the topic this handler handles.
    fn topic(&self) -> &MessageTopic;
}
33
/// Event handler trait for handling event messages.
///
/// Implementors must be `Send + Sync`. The trait is declared through
/// `#[async_trait]`, which is what allows `process` to be an `async fn`.
#[async_trait]
pub trait EventHandler: Send + Sync {
    /// Process an event message.
    ///
    /// Returns a [`Resp`] on success or an [`Error`] on failure.
    async fn process(&self, message: &EventMessage) -> Result<Resp, Error>;

    /// Pre-start hook. The default implementation does nothing.
    fn pre_start(&self) {}
}
43
/// System handler trait for handling system messages.
///
/// Implementors must be `Send + Sync`. The trait is declared through
/// `#[async_trait]`, which is what allows `process` to be an `async fn`.
#[async_trait]
pub trait SystemHandler: Send + Sync {
    /// Process a system message.
    ///
    /// Returns a [`Resp`] on success or an [`Error`] on failure.
    async fn process(&self, message: &SystemMessage) -> Result<Resp, Error>;

    /// Pre-start hook. The default implementation does nothing.
    fn pre_start(&self) {}
}
53
54#[derive(Debug, Clone)]
55pub enum Resp {
56    Text(String),
57    Json(serde_json::Value),
58}
59
60impl Display for Resp {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        match self {
63            Resp::Text(text) => write!(f, "Text: {}", text),
64            Resp::Json(json) => write!(f, "JSON: {}", json),
65        }
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct Error {
71    pub msg: String,
72    pub code: ErrorCode,
73}
74
75impl Display for Error {
76    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77        f.write_str(&format!("Error: {} (Code: {})", self.msg, self.code))
78    }
79}
80
81impl std::error::Error for Error {}
82
/// Numeric status codes attached to a handler error.
///
/// The enum is `#[repr(i32)]`, so `code as i32` yields the number listed
/// next to each variant.
#[derive(Debug, Clone, Copy)]
#[repr(i32)]
pub enum ErrorCode {
    BadRequest = 400,
    Unauthorized = 401,
    Forbidden = 403,
    NotFound = 404,
    MethodNotAllowed = 405,
    TooManyRequests = 429,
    InternalServerError = 500,
    BadGateway = 502,
    ServiceUnavailable = 503,
    GatewayTimeout = 504,
}

impl Display for ErrorCode {
    /// Renders the code as its decimal number, e.g. `404`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Write straight into the formatter instead of building a temporary
        // `String` with `format!` first.
        write!(f, "{}", *self as i32)
    }
}
103
104/// Default no-op callback handler
105pub struct DefaultCallbackHandler {
106    pub topic: MessageTopic,
107}
108
109impl DefaultCallbackHandler {
110    pub fn new(topic: &str) -> Self {
111        Self {
112            topic: MessageTopic::Callback(topic.to_string()),
113        }
114    }
115}
116
/// Event value handed to `LifecycleListener::on_event`.
///
/// Each variant is built by the matching `LifecycleListener::on_*` default
/// method. All payloads are borrowed for `'a`, so the enum stays `Copy`.
#[derive(Debug, Copy, Clone)]
pub enum LifecycleEvent<'a> {
    /// Built by `on_start`.
    Start,
    /// Built by `on_connecting`.
    Connecting {
        /// URL passed to `on_connecting`.
        websocket_url: &'a str,
    },
    /// Built by `on_connected`.
    Connected {
        /// URL passed to `on_connected`.
        websocket_url: &'a str,
    },
    /// Built by `on_websocket_write`.
    WebsocketWrite {
        /// Payload passed to `on_websocket_write`.
        payload: &'a str,
        /// Outcome passed to `on_websocket_write`.
        result: &'a crate::Result<()>,
    },
    /// Built by `on_websocket_write_with_retry`.
    WebsocketWriteWithRetry {
        /// Payload passed to `on_websocket_write_with_retry`.
        payload: &'a str,
        /// Counter passed by the caller; presumably the retry/attempt
        /// number (TODO confirm against the caller).
        cnt: u8,
        /// Outcome passed to `on_websocket_write_with_retry`.
        result: &'a crate::Result<()>,
    },
    /// Built by `on_websocket_read`.
    WebsocketRead {
        /// Outcome of the read: the received [`Message`] or the error.
        result: &'a crate::Result<Message>,
    },
    /// Built by `on_keepalive`.
    Keepalive {
        /// Payload passed to `on_keepalive`.
        payload: &'a str,
        /// Outcome passed to `on_keepalive`.
        result: &'a crate::Result<()>,
    },
    /// Built by `on_disconnected`.
    Disconnected {
        /// Outcome passed to `on_disconnected`.
        result: &'a crate::Result<()>,
    },
    /// Built by `on_stopped`.
    Stopped,
}
147#[allow(unused)]
148#[async_trait]
149pub trait LifecycleListener: Send + Sync {
150    async fn on_event<'a>(&self, client: &DingTalkStream, event: LifecycleEvent<'a>) {}
151
152    async fn on_start(&self, client: &DingTalkStream) {
153        let _ = self.on_event(client, LifecycleEvent::Start).await;
154    }
155
156    async fn on_connecting(&self, client: &DingTalkStream, websocket_url: &str) {
157        let _ = self
158            .on_event(client, LifecycleEvent::Connecting { websocket_url })
159            .await;
160    }
161
162    async fn on_connected(&self, client: &DingTalkStream, websocket_url: &str) {
163        let _ = self
164            .on_event(client, LifecycleEvent::Connected { websocket_url })
165            .await;
166    }
167
168    async fn on_websocket_write(
169        &self,
170        client: &DingTalkStream,
171        payload: &str,
172        result: &crate::Result<()>,
173    ) {
174        let _ = self
175            .on_event(client, LifecycleEvent::WebsocketWrite { payload, result })
176            .await;
177    }
178
179    async fn on_websocket_write_with_retry(
180        &self,
181        client: &DingTalkStream,
182        payload: &str,
183        cnt: u8,
184        result: &crate::Result<()>,
185    ) {
186        let _ = self
187            .on_event(
188                client,
189                LifecycleEvent::WebsocketWriteWithRetry {
190                    payload,
191                    cnt,
192                    result,
193                },
194            )
195            .await;
196    }
197
198    async fn on_websocket_read(&self, client: &DingTalkStream, result: &crate::Result<Message>) {
199        let _ = self
200            .on_event(client, LifecycleEvent::WebsocketRead { result })
201            .await;
202    }
203
204    async fn on_keepalive(
205        &self,
206        client: &DingTalkStream,
207        payload: &str,
208        result: &crate::Result<()>,
209    ) {
210        let _ = self
211            .on_event(client, LifecycleEvent::Keepalive { payload, result })
212            .await;
213    }
214
215    async fn on_disconnected(&self, client: &DingTalkStream, result: &crate::Result<()>) {
216        let _ = self
217            .on_event(client, LifecycleEvent::Disconnected { result })
218            .await;
219    }
220
221    async fn on_stopped(&self, client: &DingTalkStream) {
222        let _ = self.on_event(client, LifecycleEvent::Stopped).await;
223    }
224}
225
/// Crate-internal listener that overrides nothing: every hook uses the
/// default method bodies of [`LifecycleListener`].
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct DefaultLifecycleListener;

// Empty on purpose: all trait methods have default bodies.
impl LifecycleListener for DefaultLifecycleListener {}