1use crate::{
78 api::ws::{
79 ParsedMessageHandler, RetryPolicy, WebSocketError, WsConnection, WsLogHook, build_ws_url,
80 },
81 model::chat::{ChatData, ChatMsgType, ChatNotice, ChatRevoke},
82 utils::{build_http_path, error::Error, get},
83};
84use serde_json::Value;
85use std::{str::FromStr, sync::Arc};
86
87const DOMAIN: &str = "fishpi.cn";
88
89#[derive(Clone, Debug)]
90pub enum ChatEventData {
91 Notice(ChatNotice),
92 Data(ChatData),
93 Revoke(ChatRevoke),
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Hash)]
98pub enum ChatEventType {
99 Notice,
100 Data,
101 Revoke,
102}
103pub type ChatListener = Arc<dyn Fn(ChatEventData) + Send + Sync + 'static>;
104
105pub type ChatHandler = ParsedMessageHandler<ChatEventType, ChatEventData>;
107
108#[allow(non_snake_case)]
110fn parse_chat_message(json: &Value) -> Result<(ChatEventType, ChatEventData), Error> {
111 let event_type = detect_chat_msg_type(json)?;
112 let payload = json.get("data").filter(|v| !v.is_null()).unwrap_or(json);
113
114 match event_type {
115 ChatMsgType::Notice => {
116 let notice = ChatNotice::from_value(payload).or_else(|_| ChatNotice::from_value(json))?;
117 Ok((ChatEventType::Notice, ChatEventData::Notice(notice)))
118 }
119 ChatMsgType::Data => {
120 let data = ChatData::from_value(payload).or_else(|_| ChatData::from_value(json))?;
121 Ok((ChatEventType::Data, ChatEventData::Data(data)))
122 }
123 ChatMsgType::Revoke => {
124 let revoke = ChatRevoke::from_value(payload).or_else(|_| ChatRevoke::from_value(json))?;
125 Ok((ChatEventType::Revoke, ChatEventData::Revoke(revoke)))
126 }
127 }
128}
129
130fn detect_chat_msg_type(json: &Value) -> Result<ChatMsgType, Error> {
131 let candidates = [
132 json.get("type"),
133 json.get("command"),
134 json.get("data").and_then(|v| v.get("type")),
135 json.get("data").and_then(|v| v.get("command")),
136 ];
137
138 for candidate in candidates {
139 if let Some(raw) = candidate.and_then(|v| v.as_str())
140 && let Ok(t) = ChatMsgType::from_str(raw)
141 {
142 return Ok(t);
143 }
144 }
145
146 let payload = json.get("data").filter(|v| v.is_object()).unwrap_or(json);
147
148 if payload.get("senderUserName").is_some() && payload.get("receiverUserName").is_some() {
150 return Ok(ChatMsgType::Data);
151 }
152 if payload.get("userId").is_some() && payload.get("preview").is_some() {
153 return Ok(ChatMsgType::Notice);
154 }
155 if payload.get("data").and_then(|v| v.as_str()).is_some() {
156 return Ok(ChatMsgType::Revoke);
157 }
158
159 Err(Error::Parse("Missing type/command field".to_string()))
160}
161
162pub struct Chat {
164 connection: WsConnection,
165 handler: ChatHandler,
166 api_key: String,
167}
168
169impl Chat {
170 pub fn new(api_key: String) -> Self {
171 Self {
172 connection: WsConnection::new(),
173 handler: ChatHandler::new(parse_chat_message, None, "chat"),
174 api_key,
175 }
176 }
177
178 fn ws_url(&self, user: Option<&str>) -> Result<String, WebSocketError> {
179 let mut params = vec![("apiKey", self.api_key.clone())];
180 let path = if let Some(user) = user {
181 params.push(("toUser", user.to_string()));
182 "chat-channel"
183 } else {
184 "user-channel"
185 };
186
187 build_ws_url(DOMAIN, path, ¶ms)
188 }
189
190 pub async fn connect(
191 &mut self,
192 reload: bool,
193 user: Option<String>,
194 ) -> Result<(), WebSocketError> {
195 let url = self.ws_url(user.as_deref())?;
196
197 self.connection
198 .connect(reload, &url, self.handler.clone())
199 .await
200 }
201
202 pub async fn reconnect(&mut self, user: Option<String>) -> Result<(), WebSocketError> {
204 let url = self.ws_url(user.as_deref())?;
205
206 self.connection.reconnect(&url, self.handler.clone()).await
207 }
208
209 pub fn set_reconnect_policy(&mut self, policy: RetryPolicy) {
210 self.connection.set_retry_policy(policy);
211 }
212
213 pub fn on_ws_log<F>(&mut self, hook: F)
214 where
215 F: Fn(&str) + Send + Sync + 'static,
216 {
217 let hook = Arc::new(hook) as WsLogHook;
218 self.connection.set_log_hook_arc(hook.clone());
219 self.handler.set_log_hook_arc(hook);
220 }
221
222 pub async fn on_notice<F>(&self, listener: F)
224 where
225 F: Fn(ChatNotice) + Send + Sync + 'static,
226 {
227 self.add_listener(ChatEventType::Notice, move |event: ChatEventData| {
228 if let ChatEventData::Notice(notice) = event {
229 listener(notice);
230 }
231 }).await;
232 }
233
234 pub async fn on_data<F>(&self, listener: F)
236 where
237 F: Fn(ChatData) + Send + Sync + 'static,
238 {
239 self.add_listener(ChatEventType::Data, move |event: ChatEventData| {
240 if let ChatEventData::Data(data) = event {
241 listener(data);
242 }
243 }).await;
244 }
245
246 pub async fn on_revoke<F>(&self, listener: F)
248 where
249 F: Fn(ChatRevoke) + Send + Sync + 'static,
250 {
251 self.add_listener(ChatEventType::Revoke, move |event: ChatEventData| {
252 if let ChatEventData::Revoke(revoke) = event {
253 listener(revoke);
254 }
255 }).await;
256 }
257
258 async fn add_listener<F>(&self, event: ChatEventType, listener: F)
259 where
260 F: Fn(ChatEventData) + Send + Sync + 'static,
261 {
262 self.handler.get_emitter().add_listener(event, listener).await;
263 }
264
265 pub async fn off(&self, event: ChatEventType) {
267 self.handler
268 .get_emitter()
269 .remove_listener(Some(event))
270 .await;
271 }
272
273 pub fn disconnect(&mut self) {
275 self.connection.disconnect();
276 }
277
278 pub fn send_ws(&self, content: &str) -> Result<(), Error> {
282 self.connection
283 .send_text(content)
284 .map_err(|e| Error::Api(format!("WS send failed: {}", e)))
285 }
286
287 pub async fn list(&self) -> Result<Vec<ChatData>, Error> {
291 let url = build_http_path("chat/get-list", &[("apiKey", self.api_key.clone())]);
292
293 let resp = get(&url).await?;
294
295 if let Some(code) = resp.get("code").and_then(|c| c.as_i64())
296 && code != 0
297 {
298 return Err(Error::Api(
299 resp["msg"].as_str().unwrap_or("API error").to_string(),
300 ));
301 }
302
303 let mut chat_list = Vec::new();
304 if let Some(list) = resp["data"].as_array() {
305 for item in list {
306 let chat_data = ChatData::from_value(item)?;
307 chat_list.push(chat_data);
308 }
309 }
310
311 Ok(chat_list)
312 }
313
314 pub async fn history(
322 &self,
323 user: String,
324 page: u32,
325 size: u32,
326 autoread: bool,
327 ) -> Result<Vec<ChatData>, Error> {
328 let url = build_http_path(
329 "chat/get-message",
330 &[
331 ("apiKey", self.api_key.clone()),
332 ("page", page.to_string()),
333 ("pageSize", size.to_string()),
334 ("toUser", user.clone()),
335 ],
336 );
337 let resp = get(&url).await?;
338 if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
339 && code != 0
340 {
341 return Err(Error::Api(
342 resp["msg"].as_str().unwrap_or("API error").to_string(),
343 ));
344 }
345 let mut chat_list = Vec::new();
346 if let Some(list) = resp["data"].as_array() {
347 for item in list {
348 let chat_data = ChatData::from_value(item)?;
349 chat_list.push(chat_data);
350 }
351 }
352 if autoread {
353 self.mark_as_read(user).await?;
354 }
355 Ok(chat_list)
356 }
357
358 pub async fn mark_as_read(&self, user: String) -> Result<bool, Error> {
364 let to_user_url = build_http_path(
365 "chat/mark-as-read",
366 &[("toUser", user.clone()), ("apiKey", self.api_key.clone())],
367 );
368 let first = get(&to_user_url).await;
369 match first {
370 Ok(resp) => {
371 if let Some(code) = resp.get("result").and_then(|c| c.as_i64()) {
372 if code == 0 {
373 return Ok(true);
374 }
375 let msg = resp["msg"].as_str().unwrap_or("API error").to_string();
376 let need_from_user_retry =
377 msg.contains("fromUserJSON") || msg.contains("Cannot invoke");
378 if !need_from_user_retry {
379 return Err(Error::Api(msg));
380 }
381
382 let from_user_url = build_http_path(
384 "chat/mark-as-read",
385 &[("fromUser", user), ("apiKey", self.api_key.clone())],
386 );
387 let resp = get(&from_user_url).await?;
388 if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
389 && code != 0
390 {
391 return Err(Error::Api(
392 resp["msg"].as_str().unwrap_or("API error").to_string(),
393 ));
394 }
395 return Ok(true);
396 }
397 Ok(false)
398 }
399 Err(err) => {
400 let err_text = err.to_string();
401 if !(err_text.contains("fromUserJSON") || err_text.contains("Cannot invoke")) {
402 return Err(err);
403 }
404
405 let from_user_url = build_http_path(
407 "chat/mark-as-read",
408 &[("fromUser", user), ("apiKey", self.api_key.clone())],
409 );
410 let resp = get(&from_user_url).await?;
411 if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
412 && code != 0
413 {
414 return Err(Error::Api(
415 resp["msg"].as_str().unwrap_or("API error").to_string(),
416 ));
417 }
418 Ok(true)
419 }
420 }
421 }
422
423 pub async fn unread(&self) -> Result<Vec<ChatData>, Error> {
427 let url = build_http_path("chat/has-unread", &[("apiKey", self.api_key.clone())]);
428 let resp = get(&url).await?;
429
430 let unread_len = resp["result"].as_i64().unwrap_or(0);
431 if unread_len == 0 {
432 return Ok(Vec::new());
433 }
434
435 let chat_list = resp["data"]
436 .as_array()
437 .ok_or_else(|| Error::Api("Data is not an array".to_string()))?
438 .iter()
439 .map(ChatData::from_value)
440 .collect::<Result<Vec<_>, _>>()?;
441
442 Ok(chat_list)
443 }
444
445 pub async fn revoke(&self, msg_id: &str) -> Result<bool, Error> {
451 let url = build_http_path(
452 "chat/revoke",
453 &[("apiKey", self.api_key.clone()), ("oId", msg_id.to_string())],
454 );
455 let resp = get(&url).await?;
456
457 if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
458 && code != 0
459 {
460 return Err(Error::Api(
461 resp["msg"].as_str().unwrap_or("API error").to_string(),
462 ));
463 }
464
465 Ok(true)
466 }
467}
468
469#[cfg(test)]
470mod tests {
471 use super::{ChatEventData, ChatEventType, parse_chat_message};
472 use serde_json::json;
473
474 #[test]
475 fn parse_chat_notice_message() {
476 let payload = json!({
477 "type": "notice",
478 "data": {
479 "command": "notice",
480 "userId": "u1",
481 "preview": "hi",
482 "senderAvatar": "a",
483 "senderUserName": "bob"
484 }
485 });
486
487 let (event_type, event) = parse_chat_message(&payload).expect("should parse");
488 assert!(matches!(event_type, ChatEventType::Notice));
489 match event {
490 ChatEventData::Notice(n) => assert_eq!(n.preview, "hi"),
491 _ => panic!("unexpected event variant"),
492 }
493 }
494
495 #[test]
496 fn parse_chat_invalid_type_fails() {
497 let payload = json!({
498 "type": "unknown",
499 "data": {}
500 });
501
502 assert!(parse_chat_message(&payload).is_err());
503 }
504
505 #[test]
506 fn parse_chat_notice_without_type_field() {
507 let payload = json!({
508 "command": "notice",
509 "data": {
510 "command": "notice",
511 "userId": "u1",
512 "preview": "hello",
513 "senderAvatar": "a",
514 "senderUserName": "alice"
515 }
516 });
517
518 let (event_type, event) = parse_chat_message(&payload).expect("should parse");
519 assert!(matches!(event_type, ChatEventType::Notice));
520 match event {
521 ChatEventData::Notice(n) => assert_eq!(n.preview, "hello"),
522 _ => panic!("unexpected event variant"),
523 }
524 }
525}