Skip to main content

fishpi_sdk/api/
chat.rs

1//! 私聊 API 模块
2//!
3//! 这个模块提供了与私聊相关的 API 操作,包括连接私聊 WebSocket、监听私聊事件、获取消息列表、历史消息、标记已读、撤回消息等功能。
4//! 主要结构体是 `Chat`,用于管理私聊的 WebSocket 连接和事件监听。
5//! 事件通过 `ChatEventData` 枚举表示,支持通知、数据、撤回等类型。
6//!
7//! # 主要组件
8//!
9//! - [`Chat`] - 私聊客户端结构体,负责连接、发送消息和管理监听器。
10//! - [`ChatHandler`] - 私聊消息处理器,实现 `MessageHandler` trait,处理 WebSocket 消息并发射事件。
11//! - [`ChatEventData`] - 私聊事件数据枚举,包装所有事件类型(如通知、数据、撤回等)。
12//! - [`ChatEventType`] - 私聊事件类型枚举,用于标识事件种类。
13//! - [`ChatListener`] - 私聊事件监听器类型别名,定义监听器函数的签名。
14//!
15//! # 方法列表
16//!
17//! - [`Chat::new`] - 创建新的私聊客户端实例。
18//! - [`Chat::connect`] - 连接私聊 WebSocket。
19//! - [`Chat::reconnect`] - 重连私聊 WebSocket。
20//! - [`Chat::on_notice`] - 监听通知消息事件。
21//! - [`Chat::on_data`] - 监听普通消息事件。
22//! - [`Chat::on_revoke`] - 监听消息撤回事件。
23//! - [`Chat::off`] - 移除事件监听器。
24//! - [`Chat::disconnect`] - 断开连接。
25//! - [`Chat::list`] - 获取有私聊用户列表第一条消息。
26//! - [`Chat::history`] - 获取用户私聊历史消息。
27//! - [`Chat::mark_as_read`] - 标记用户消息已读。
28//! - [`Chat::unread`] - 获取未读消息。
29//! - [`Chat::revoke`] - 撤回私聊消息。
30//!
31//! # 示例
32//!
33//! ```rust,no_run
34//! use fishpi_sdk::api::chat::Chat;
35//! use fishpi_sdk::model::chat::{ChatData, ChatNotice, ChatRevoke};
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//!     let mut chat = Chat::new("your_api_key".to_string());
40//!
41//!     // 监听通知消息(直接传递 ChatNotice,无需 match)
42//!     chat.on_notice(|notice: ChatNotice| {
43//!         println!("Notice: {}", notice.preview);
44//!     }).await;
45//!
46//!     // 监听普通消息
47//!     chat.on_data(|data: ChatData| {
48//!         println!("Message: {}", data.content);
49//!     }).await;
50//!
51//!     // 监听撤回消息
52//!     chat.on_revoke(|revoke: ChatRevoke| {
53//!         println!("Revoked: {}", revoke.data);
54//!     }).await;
55//!
56//!     // 连接私聊
57//!     chat.connect(false, Some("target_user".to_string())).await?;
58//!
59//!     // 获取历史消息
60//!     let history = chat.history("target_user".to_string(), 1, 20, true).await?;
61//!     for msg in history {
62//!         println!("History: {}", msg.content);
63//!     }
64//!
65//!     Ok(())
66//! }
67//! ```
68//!
69//! # 事件类型
70//!
71//! 私聊支持以下事件类型(通过特定 `on_*` 方法监听):
72//!
73//! - `Notice` - 通知消息。
74//! - `Data` - 普通消息。
75//! - `Revoke` - 消息撤回。
76
77use crate::{
78    api::ws::{
79        ParsedMessageHandler, RetryPolicy, WebSocketError, WsConnection, WsLogHook, build_ws_url,
80    },
81    model::chat::{ChatData, ChatMsgType, ChatNotice, ChatRevoke},
82    utils::{build_http_path, error::Error, get},
83};
84use serde_json::Value;
85use std::{str::FromStr, sync::Arc};
86
87const DOMAIN: &str = "fishpi.cn";
88
89#[derive(Clone, Debug)]
90pub enum ChatEventData {
91    Notice(ChatNotice),
92    Data(ChatData),
93    Revoke(ChatRevoke),
94}
95
96/// 私聊事件类型枚举
97#[derive(Debug, Clone, PartialEq, Eq, Hash)]
98pub enum ChatEventType {
99    Notice,
100    Data,
101    Revoke,
102}
103pub type ChatListener = Arc<dyn Fn(ChatEventData) + Send + Sync + 'static>;
104
105/// 消息处理器
106pub type ChatHandler = ParsedMessageHandler<ChatEventType, ChatEventData>;
107
108/// 解析私聊消息,返回(事件类型,事件数据)
109#[allow(non_snake_case)]
110fn parse_chat_message(json: &Value) -> Result<(ChatEventType, ChatEventData), Error> {
111    let event_type = detect_chat_msg_type(json)?;
112    let payload = json.get("data").filter(|v| !v.is_null()).unwrap_or(json);
113
114    match event_type {
115        ChatMsgType::Notice => {
116            let notice = ChatNotice::from_value(payload).or_else(|_| ChatNotice::from_value(json))?;
117            Ok((ChatEventType::Notice, ChatEventData::Notice(notice)))
118        }
119        ChatMsgType::Data => {
120            let data = ChatData::from_value(payload).or_else(|_| ChatData::from_value(json))?;
121            Ok((ChatEventType::Data, ChatEventData::Data(data)))
122        }
123        ChatMsgType::Revoke => {
124            let revoke = ChatRevoke::from_value(payload).or_else(|_| ChatRevoke::from_value(json))?;
125            Ok((ChatEventType::Revoke, ChatEventData::Revoke(revoke)))
126        }
127    }
128}
129
130fn detect_chat_msg_type(json: &Value) -> Result<ChatMsgType, Error> {
131    let candidates = [
132        json.get("type"),
133        json.get("command"),
134        json.get("data").and_then(|v| v.get("type")),
135        json.get("data").and_then(|v| v.get("command")),
136    ];
137
138    for candidate in candidates {
139        if let Some(raw) = candidate.and_then(|v| v.as_str())
140            && let Ok(t) = ChatMsgType::from_str(raw)
141        {
142            return Ok(t);
143        }
144    }
145
146    let payload = json.get("data").filter(|v| v.is_object()).unwrap_or(json);
147
148    // 兼容部分节点的无 type/command 推送格式
149    if payload.get("senderUserName").is_some() && payload.get("receiverUserName").is_some() {
150        return Ok(ChatMsgType::Data);
151    }
152    if payload.get("userId").is_some() && payload.get("preview").is_some() {
153        return Ok(ChatMsgType::Notice);
154    }
155    if payload.get("data").and_then(|v| v.as_str()).is_some() {
156        return Ok(ChatMsgType::Revoke);
157    }
158
159    Err(Error::Parse("Missing type/command field".to_string()))
160}
161
162/// 私聊客户端
163pub struct Chat {
164    connection: WsConnection,
165    handler: ChatHandler,
166    api_key: String,
167}
168
169impl Chat {
170    pub fn new(api_key: String) -> Self {
171        Self {
172            connection: WsConnection::new(),
173            handler: ChatHandler::new(parse_chat_message, None, "chat"),
174            api_key,
175        }
176    }
177
178    fn ws_url(&self, user: Option<&str>) -> Result<String, WebSocketError> {
179        let mut params = vec![("apiKey", self.api_key.clone())];
180        let path = if let Some(user) = user {
181            params.push(("toUser", user.to_string()));
182            "chat-channel"
183        } else {
184            "user-channel"
185        };
186
187        build_ws_url(DOMAIN, path, &params)
188    }
189
190    pub async fn connect(
191        &mut self,
192        reload: bool,
193        user: Option<String>,
194    ) -> Result<(), WebSocketError> {
195        let url = self.ws_url(user.as_deref())?;
196
197        self.connection
198            .connect(reload, &url, self.handler.clone())
199            .await
200    }
201
202    /// 重连
203    pub async fn reconnect(&mut self, user: Option<String>) -> Result<(), WebSocketError> {
204        let url = self.ws_url(user.as_deref())?;
205
206        self.connection.reconnect(&url, self.handler.clone()).await
207    }
208
209    pub fn set_reconnect_policy(&mut self, policy: RetryPolicy) {
210        self.connection.set_retry_policy(policy);
211    }
212
213    pub fn on_ws_log<F>(&mut self, hook: F)
214    where
215        F: Fn(&str) + Send + Sync + 'static,
216    {
217        let hook = Arc::new(hook) as WsLogHook;
218        self.connection.set_log_hook_arc(hook.clone());
219        self.handler.set_log_hook_arc(hook);
220    }
221
222    /// 监听通知消息事件
223    pub async fn on_notice<F>(&self, listener: F)
224    where
225        F: Fn(ChatNotice) + Send + Sync + 'static,
226    {
227        self.add_listener(ChatEventType::Notice, move |event: ChatEventData| {
228            if let ChatEventData::Notice(notice) = event {
229                listener(notice);
230            }
231        }).await;
232    }
233
234    /// 监听普通消息事件
235    pub async fn on_data<F>(&self, listener: F)
236    where
237        F: Fn(ChatData) + Send + Sync + 'static,
238    {
239        self.add_listener(ChatEventType::Data, move |event: ChatEventData| {
240            if let ChatEventData::Data(data) = event {
241                listener(data);
242            }
243        }).await;
244    }
245
246    /// 监听消息撤回事件
247    pub async fn on_revoke<F>(&self, listener: F)
248    where
249        F: Fn(ChatRevoke) + Send + Sync + 'static,
250    {
251        self.add_listener(ChatEventType::Revoke, move |event: ChatEventData| {
252            if let ChatEventData::Revoke(revoke) = event {
253                listener(revoke);
254            }
255        }).await;
256    }
257
258    async fn add_listener<F>(&self, event: ChatEventType, listener: F)
259    where
260        F: Fn(ChatEventData) + Send + Sync + 'static,
261    {
262        self.handler.get_emitter().add_listener(event, listener).await;
263    }
264
265    /// 移除监听
266    pub async fn off(&self, event: ChatEventType) {
267        self.handler
268            .get_emitter()
269            .remove_listener(Some(event))
270            .await;
271    }
272
273    /// 断开连接
274    pub fn disconnect(&mut self) {
275        self.connection.disconnect();
276    }
277
278    /// 通过已连接的 chat-channel 发送私聊消息
279    ///
280    /// 该方法要求先调用 `connect(..., Some(to_user))` 建立目标会话连接。
281    pub fn send_ws(&self, content: &str) -> Result<(), Error> {
282        self.connection
283            .send_text(content)
284            .map_err(|e| Error::Api(format!("WS send failed: {}", e)))
285    }
286
287    /// 获取有私聊用户列表第一条消息
288    ///
289    /// 返回 私聊消息列表
290    pub async fn list(&self) -> Result<Vec<ChatData>, Error> {
291        let url = build_http_path("chat/get-list", &[("apiKey", self.api_key.clone())]);
292
293        let resp = get(&url).await?;
294
295        if let Some(code) = resp.get("code").and_then(|c| c.as_i64())
296            && code != 0
297        {
298            return Err(Error::Api(
299                resp["msg"].as_str().unwrap_or("API error").to_string(),
300            ));
301        }
302
303        let mut chat_list = Vec::new();
304        if let Some(list) = resp["data"].as_array() {
305            for item in list {
306                let chat_data = ChatData::from_value(item)?;
307                chat_list.push(chat_data);
308            }
309        }
310
311        Ok(chat_list)
312    }
313
314    /// 获取用户私聊历史消息
315    ///
316    /// * `page` 页数
317    /// * `size` 每页消息数量
318    /// * `autoread` 是否自动标记为已读
319    ///
320    /// 返回 私聊消息列表
321    pub async fn history(
322        &self,
323        user: String,
324        page: u32,
325        size: u32,
326        autoread: bool,
327    ) -> Result<Vec<ChatData>, Error> {
328        let url = build_http_path(
329            "chat/get-message",
330            &[
331                ("apiKey", self.api_key.clone()),
332                ("page", page.to_string()),
333                ("pageSize", size.to_string()),
334                ("toUser", user.clone()),
335            ],
336        );
337        let resp = get(&url).await?;
338        if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
339            && code != 0
340        {
341            return Err(Error::Api(
342                resp["msg"].as_str().unwrap_or("API error").to_string(),
343            ));
344        }
345        let mut chat_list = Vec::new();
346        if let Some(list) = resp["data"].as_array() {
347            for item in list {
348                let chat_data = ChatData::from_value(item)?;
349                chat_list.push(chat_data);
350            }
351        }
352        if autoread {
353            self.mark_as_read(user).await?;
354        }
355        Ok(chat_list)
356    }
357
358    /// 标记用户消息已读
359    ///
360    /// - `user` 用户名
361    ///
362    /// 返回 执行结果
363    pub async fn mark_as_read(&self, user: String) -> Result<bool, Error> {
364        let to_user_url = build_http_path(
365            "chat/mark-as-read",
366            &[("toUser", user.clone()), ("apiKey", self.api_key.clone())],
367        );
368        let first = get(&to_user_url).await;
369        match first {
370            Ok(resp) => {
371                if let Some(code) = resp.get("result").and_then(|c| c.as_i64()) {
372                    if code == 0 {
373                        return Ok(true);
374                    }
375                    let msg = resp["msg"].as_str().unwrap_or("API error").to_string();
376                    let need_from_user_retry =
377                        msg.contains("fromUserJSON") || msg.contains("Cannot invoke");
378                    if !need_from_user_retry {
379                        return Err(Error::Api(msg));
380                    }
381
382                    // Some backend nodes require fromUser for mark-as-read.
383                    let from_user_url = build_http_path(
384                        "chat/mark-as-read",
385                        &[("fromUser", user), ("apiKey", self.api_key.clone())],
386                    );
387                    let resp = get(&from_user_url).await?;
388                    if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
389                        && code != 0
390                    {
391                        return Err(Error::Api(
392                            resp["msg"].as_str().unwrap_or("API error").to_string(),
393                        ));
394                    }
395                    return Ok(true);
396                }
397                Ok(false)
398            }
399            Err(err) => {
400                let err_text = err.to_string();
401                if !(err_text.contains("fromUserJSON") || err_text.contains("Cannot invoke")) {
402                    return Err(err);
403                }
404
405                // Some backend nodes require fromUser for mark-as-read.
406                let from_user_url = build_http_path(
407                    "chat/mark-as-read",
408                    &[("fromUser", user), ("apiKey", self.api_key.clone())],
409                );
410                let resp = get(&from_user_url).await?;
411                if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
412                    && code != 0
413                {
414                    return Err(Error::Api(
415                        resp["msg"].as_str().unwrap_or("API error").to_string(),
416                    ));
417                }
418                Ok(true)
419            }
420        }
421    }
422
423    /// 获取未读消息
424    ///
425    /// 返回 未读消息列表
426    pub async fn unread(&self) -> Result<Vec<ChatData>, Error> {
427        let url = build_http_path("chat/has-unread", &[("apiKey", self.api_key.clone())]);
428        let resp = get(&url).await?;
429
430        let unread_len = resp["result"].as_i64().unwrap_or(0);
431        if unread_len == 0 {
432            return Ok(Vec::new());
433        }
434
435        let chat_list = resp["data"]
436            .as_array()
437            .ok_or_else(|| Error::Api("Data is not an array".to_string()))?
438            .iter()
439            .map(ChatData::from_value)
440            .collect::<Result<Vec<_>, _>>()?;
441
442        Ok(chat_list)
443    }
444
445    /// 撤回私聊消息
446    ///
447    /// - `msgId` 消息 ID
448    ///
449    /// 返回 执行结果
450    pub async fn revoke(&self, msg_id: &str) -> Result<bool, Error> {
451        let url = build_http_path(
452            "chat/revoke",
453            &[("apiKey", self.api_key.clone()), ("oId", msg_id.to_string())],
454        );
455        let resp = get(&url).await?;
456
457        if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
458            && code != 0
459        {
460            return Err(Error::Api(
461                resp["msg"].as_str().unwrap_or("API error").to_string(),
462            ));
463        }
464
465        Ok(true)
466    }
467}
468
469#[cfg(test)]
470mod tests {
471    use super::{ChatEventData, ChatEventType, parse_chat_message};
472    use serde_json::json;
473
474    #[test]
475    fn parse_chat_notice_message() {
476        let payload = json!({
477            "type": "notice",
478            "data": {
479                "command": "notice",
480                "userId": "u1",
481                "preview": "hi",
482                "senderAvatar": "a",
483                "senderUserName": "bob"
484            }
485        });
486
487        let (event_type, event) = parse_chat_message(&payload).expect("should parse");
488        assert!(matches!(event_type, ChatEventType::Notice));
489        match event {
490            ChatEventData::Notice(n) => assert_eq!(n.preview, "hi"),
491            _ => panic!("unexpected event variant"),
492        }
493    }
494
495    #[test]
496    fn parse_chat_invalid_type_fails() {
497        let payload = json!({
498            "type": "unknown",
499            "data": {}
500        });
501
502        assert!(parse_chat_message(&payload).is_err());
503    }
504
505    #[test]
506    fn parse_chat_notice_without_type_field() {
507        let payload = json!({
508            "command": "notice",
509            "data": {
510                "command": "notice",
511                "userId": "u1",
512                "preview": "hello",
513                "senderAvatar": "a",
514                "senderUserName": "alice"
515            }
516        });
517
518        let (event_type, event) = parse_chat_message(&payload).expect("should parse");
519        assert!(matches!(event_type, ChatEventType::Notice));
520        match event {
521            ChatEventData::Notice(n) => assert_eq!(n.preview, "hello"),
522            _ => panic!("unexpected event variant"),
523        }
524    }
525}