use crate::{
api::ws::{
ParsedMessageHandler, RetryPolicy, WebSocketError, WsConnection, WsLogHook, build_ws_url,
},
model::chat::{ChatData, ChatMsgType, ChatNotice, ChatRevoke},
utils::{build_http_path, error::Error, get},
};
use serde_json::Value;
use std::{str::FromStr, sync::Arc};
/// Host name handed to `build_ws_url` when building the chat WebSocket URLs.
const DOMAIN: &str = "fishpi.cn";
/// Typed payload of a parsed chat WebSocket message, one variant per event kind.
///
/// Values are produced by `parse_chat_message` and handed to the listeners
/// registered through `Chat::on_notice`, `Chat::on_data` and `Chat::on_revoke`.
#[derive(Clone, Debug)]
pub enum ChatEventData {
/// Payload of a notice event.
Notice(ChatNotice),
/// Payload of a chat data event.
Data(ChatData),
/// Payload of a revoke event.
Revoke(ChatRevoke),
}
/// Key used to register and remove chat event listeners.
///
/// Each variant corresponds to the [`ChatEventData`] variant of the same name.
/// The enum is fieldless, so it is `Copy` and can be passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChatEventType {
    /// Selects listeners for notice events.
    Notice,
    /// Selects listeners for chat data events.
    Data,
    /// Selects listeners for revoke events.
    Revoke,
}
/// Shared callback that receives a chat event payload; `Send + Sync` so it can
/// be stored behind an `Arc` and used from other threads.
pub type ChatListener = Arc<dyn Fn(ChatEventData) + Send + Sync + 'static>;
/// `ParsedMessageHandler` specialised for chat: events are keyed by
/// `ChatEventType` and carry a `ChatEventData` payload.
pub type ChatHandler = ParsedMessageHandler<ChatEventType, ChatEventData>;
/// Parses a raw chat WebSocket message into its event key and typed payload.
///
/// The message kind is resolved by `detect_chat_msg_type`. The payload is then
/// decoded from the nested `data` value when that value exists and is not
/// `null`; if that decode fails, the whole message is decoded instead, so both
/// wrapped and flat message layouts are accepted.
///
/// # Errors
///
/// Returns the error from `detect_chat_msg_type` when the kind cannot be
/// determined, or the error of the fallback decode when neither the nested
/// payload nor the whole message matches the expected model.
fn parse_chat_message(json: &Value) -> Result<(ChatEventType, ChatEventData), Error> {
    let event_type = detect_chat_msg_type(json)?;

    // Prefer the nested `data` value; a missing or null `data` means the
    // message itself is the payload.
    let payload = match json.get("data") {
        Some(inner) if !inner.is_null() => inner,
        _ => json,
    };

    let parsed = match event_type {
        ChatMsgType::Notice => {
            let notice =
                ChatNotice::from_value(payload).or_else(|_| ChatNotice::from_value(json))?;
            (ChatEventType::Notice, ChatEventData::Notice(notice))
        }
        ChatMsgType::Data => {
            let data = ChatData::from_value(payload).or_else(|_| ChatData::from_value(json))?;
            (ChatEventType::Data, ChatEventData::Data(data))
        }
        ChatMsgType::Revoke => {
            let revoke =
                ChatRevoke::from_value(payload).or_else(|_| ChatRevoke::from_value(json))?;
            (ChatEventType::Revoke, ChatEventData::Revoke(revoke))
        }
    };
    Ok(parsed)
}
fn detect_chat_msg_type(json: &Value) -> Result<ChatMsgType, Error> {
let candidates = [
json.get("type"),
json.get("command"),
json.get("data").and_then(|v| v.get("type")),
json.get("data").and_then(|v| v.get("command")),
];
for candidate in candidates {
if let Some(raw) = candidate.and_then(|v| v.as_str())
&& let Ok(t) = ChatMsgType::from_str(raw)
{
return Ok(t);
}
}
let payload = json.get("data").filter(|v| v.is_object()).unwrap_or(json);
if payload.get("senderUserName").is_some() && payload.get("receiverUserName").is_some() {
return Ok(ChatMsgType::Data);
}
if payload.get("userId").is_some() && payload.get("preview").is_some() {
return Ok(ChatMsgType::Notice);
}
if payload.get("data").and_then(|v| v.as_str()).is_some() {
return Ok(ChatMsgType::Revoke);
}
Err(Error::Parse("Missing type/command field".to_string()))
}
/// Chat client combining a WebSocket connection for live events with the
/// HTTP chat endpoints.
pub struct Chat {
// WebSocket connection used by `connect`, `reconnect`, `disconnect` and `send_ws`.
connection: WsConnection,
// Parses incoming messages with `parse_chat_message` and owns the listener emitter.
handler: ChatHandler,
// Sent as the `apiKey` query parameter on every WebSocket and HTTP request.
api_key: String,
}
impl Chat {
pub fn new(api_key: String) -> Self {
Self {
connection: WsConnection::new(),
handler: ChatHandler::new(parse_chat_message, None, "chat"),
api_key,
}
}
fn ws_url(&self, user: Option<&str>) -> Result<String, WebSocketError> {
let mut params = vec![("apiKey", self.api_key.clone())];
let path = if let Some(user) = user {
params.push(("toUser", user.to_string()));
"chat-channel"
} else {
"user-channel"
};
build_ws_url(DOMAIN, path, ¶ms)
}
pub async fn connect(
&mut self,
reload: bool,
user: Option<String>,
) -> Result<(), WebSocketError> {
let url = self.ws_url(user.as_deref())?;
self.connection
.connect(reload, &url, self.handler.clone())
.await
}
pub async fn reconnect(&mut self, user: Option<String>) -> Result<(), WebSocketError> {
let url = self.ws_url(user.as_deref())?;
self.connection.reconnect(&url, self.handler.clone()).await
}
pub fn set_reconnect_policy(&mut self, policy: RetryPolicy) {
self.connection.set_retry_policy(policy);
}
pub fn on_ws_log<F>(&mut self, hook: F)
where
F: Fn(&str) + Send + Sync + 'static,
{
let hook = Arc::new(hook) as WsLogHook;
self.connection.set_log_hook_arc(hook.clone());
self.handler.set_log_hook_arc(hook);
}
pub async fn on_notice<F>(&self, listener: F)
where
F: Fn(ChatNotice) + Send + Sync + 'static,
{
self.add_listener(ChatEventType::Notice, move |event: ChatEventData| {
if let ChatEventData::Notice(notice) = event {
listener(notice);
}
}).await;
}
pub async fn on_data<F>(&self, listener: F)
where
F: Fn(ChatData) + Send + Sync + 'static,
{
self.add_listener(ChatEventType::Data, move |event: ChatEventData| {
if let ChatEventData::Data(data) = event {
listener(data);
}
}).await;
}
pub async fn on_revoke<F>(&self, listener: F)
where
F: Fn(ChatRevoke) + Send + Sync + 'static,
{
self.add_listener(ChatEventType::Revoke, move |event: ChatEventData| {
if let ChatEventData::Revoke(revoke) = event {
listener(revoke);
}
}).await;
}
async fn add_listener<F>(&self, event: ChatEventType, listener: F)
where
F: Fn(ChatEventData) + Send + Sync + 'static,
{
self.handler.get_emitter().add_listener(event, listener).await;
}
pub async fn off(&self, event: ChatEventType) {
self.handler
.get_emitter()
.remove_listener(Some(event))
.await;
}
pub fn disconnect(&mut self) {
self.connection.disconnect();
}
pub fn send_ws(&self, content: &str) -> Result<(), Error> {
self.connection
.send_text(content)
.map_err(|e| Error::Api(format!("WS send failed: {}", e)))
}
pub async fn list(&self) -> Result<Vec<ChatData>, Error> {
let url = build_http_path("chat/get-list", &[("apiKey", self.api_key.clone())]);
let resp = get(&url).await?;
if let Some(code) = resp.get("code").and_then(|c| c.as_i64())
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("API error").to_string(),
));
}
let mut chat_list = Vec::new();
if let Some(list) = resp["data"].as_array() {
for item in list {
let chat_data = ChatData::from_value(item)?;
chat_list.push(chat_data);
}
}
Ok(chat_list)
}
pub async fn history(
&self,
user: String,
page: u32,
size: u32,
autoread: bool,
) -> Result<Vec<ChatData>, Error> {
let url = build_http_path(
"chat/get-message",
&[
("apiKey", self.api_key.clone()),
("page", page.to_string()),
("pageSize", size.to_string()),
("toUser", user.clone()),
],
);
let resp = get(&url).await?;
if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("API error").to_string(),
));
}
let mut chat_list = Vec::new();
if let Some(list) = resp["data"].as_array() {
for item in list {
let chat_data = ChatData::from_value(item)?;
chat_list.push(chat_data);
}
}
if autoread {
self.mark_as_read(user).await?;
}
Ok(chat_list)
}
pub async fn mark_as_read(&self, user: String) -> Result<bool, Error> {
let to_user_url = build_http_path(
"chat/mark-as-read",
&[("toUser", user.clone()), ("apiKey", self.api_key.clone())],
);
let first = get(&to_user_url).await;
match first {
Ok(resp) => {
if let Some(code) = resp.get("result").and_then(|c| c.as_i64()) {
if code == 0 {
return Ok(true);
}
let msg = resp["msg"].as_str().unwrap_or("API error").to_string();
let need_from_user_retry =
msg.contains("fromUserJSON") || msg.contains("Cannot invoke");
if !need_from_user_retry {
return Err(Error::Api(msg));
}
let from_user_url = build_http_path(
"chat/mark-as-read",
&[("fromUser", user), ("apiKey", self.api_key.clone())],
);
let resp = get(&from_user_url).await?;
if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("API error").to_string(),
));
}
return Ok(true);
}
Ok(false)
}
Err(err) => {
let err_text = err.to_string();
if !(err_text.contains("fromUserJSON") || err_text.contains("Cannot invoke")) {
return Err(err);
}
let from_user_url = build_http_path(
"chat/mark-as-read",
&[("fromUser", user), ("apiKey", self.api_key.clone())],
);
let resp = get(&from_user_url).await?;
if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("API error").to_string(),
));
}
Ok(true)
}
}
}
pub async fn unread(&self) -> Result<Vec<ChatData>, Error> {
let url = build_http_path("chat/has-unread", &[("apiKey", self.api_key.clone())]);
let resp = get(&url).await?;
let unread_len = resp["result"].as_i64().unwrap_or(0);
if unread_len == 0 {
return Ok(Vec::new());
}
let chat_list = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(ChatData::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(chat_list)
}
pub async fn revoke(&self, msg_id: &str) -> Result<bool, Error> {
let url = build_http_path(
"chat/revoke",
&[("apiKey", self.api_key.clone()), ("oId", msg_id.to_string())],
);
let resp = get(&url).await?;
if let Some(code) = resp.get("result").and_then(|c| c.as_i64())
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("API error").to_string(),
));
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
    use super::{ChatEventData, ChatEventType, parse_chat_message};
    use serde_json::json;

    /// Builds the nested `data` object shared by the notice fixtures.
    fn notice_data(preview: &str, sender: &str) -> serde_json::Value {
        json!({
            "command": "notice",
            "userId": "u1",
            "preview": preview,
            "senderAvatar": "a",
            "senderUserName": sender
        })
    }

    /// Parses `message` and checks that it is a notice with the given preview.
    fn assert_notice_preview(message: &serde_json::Value, expected: &str) {
        let (kind, event) = parse_chat_message(message).expect("should parse");
        assert!(matches!(kind, ChatEventType::Notice));
        let ChatEventData::Notice(notice) = event else {
            panic!("unexpected event variant");
        };
        assert_eq!(notice.preview, expected);
    }

    #[test]
    fn parse_chat_notice_message() {
        // Kind announced through the top-level `type` field.
        let message = json!({
            "type": "notice",
            "data": notice_data("hi", "bob")
        });
        assert_notice_preview(&message, "hi");
    }

    #[test]
    fn parse_chat_invalid_type_fails() {
        // Unknown marker and an empty payload: nothing identifies the kind.
        let message = json!({
            "type": "unknown",
            "data": {}
        });
        assert!(parse_chat_message(&message).is_err());
    }

    #[test]
    fn parse_chat_notice_without_type_field() {
        // Kind announced through `command` instead of `type`.
        let message = json!({
            "command": "notice",
            "data": notice_data("hello", "alice")
        });
        assert_notice_preview(&message, "hello");
    }
}