use crate::api::ws::{
ParsedMessageHandler, RetryPolicy, WebSocketError, WsConnection, WsLogHook, build_ws_url,
};
use crate::model::MuteItem;
use crate::model::chatroom::{
BarragerCost, BarragerMsg, ChatContentType, ChatRoomMessageMode, ChatRoomMessageType,
ChatRoomMsg, ClientType, CustomMsg, OnlineInfo, RevokeMsg,
};
use crate::model::redpacket::RedPacketStatusMsg;
use crate::utils::get_text;
use crate::utils::{build_http_path, delete, error::Error, get, post};
use serde_json::{Value, json};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use url::Url;
/// Response body of the `chat-room/node/get` endpoint, as deserialized by
/// `ChatRoom::get_node`.
#[derive(Debug, Clone, serde::Deserialize)]
// Field names are used verbatim as the JSON keys, hence the camelCase `apiKey`.
#[allow(non_snake_case)]
pub struct ChatRoomNodeResponse {
/// Human-readable message; `get_node` reports it when `code` is non-zero.
pub msg: String,
/// Status code; `get_node` treats any non-zero value as a failure.
pub code: i32,
/// Parsed as a URL by `get_ws_url` and used as the WebSocket endpoint.
pub data: String,
/// NOTE(review): presumably echoes the API key of the request — confirm against the server.
pub apiKey: String,
/// Nodes listed by the server. The spelling is the JSON key serde looks for,
/// so it must not be "corrected" without adding a rename attribute.
pub avaliable: Vec<ChatRoomAvailableNode>,
}
/// One entry of `ChatRoomNodeResponse::avaliable`.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ChatRoomAvailableNode {
/// NOTE(review): presumably the node's WebSocket address — confirm against the server.
pub node: String,
/// Name of the node.
pub name: String,
/// NOTE(review): presumably a load-balancing weight — confirm against the server.
pub weight: u32,
/// NOTE(review): presumably the number of users currently on this node — confirm.
pub online: u32,
}
/// Payload of a parsed chat-room event, produced by `parse_chatroom_message`.
#[derive(Debug, Clone)]
pub enum ChatRoomEventData {
/// Users taken from the `users` array of an online message.
Online(Vec<OnlineInfo>),
/// Value of the `newDiscuss` field.
DiscussChanged(String),
/// Value of the `oId` field of a revoke message.
Revoke(String),
/// A chat message that is neither a music nor a weather message.
Msg(ChatRoomMsg),
/// A barrager message.
Barrager(BarragerMsg),
/// A red-packet message.
RedPacket(ChatRoomMsg<Value>),
/// A red-packet status message.
RedPacketStatus(RedPacketStatusMsg),
/// A chat message whose content object has `msgType == "music"`.
Music(ChatRoomMsg<Value>),
/// A chat message whose content object has `msgType == "weather"`.
Weather(ChatRoomMsg<Value>),
/// A custom message carrying the `message` field.
Custom(CustomMsg),
}
/// Key under which chat-room listeners are registered. Every variant except
/// `All` corresponds to the `ChatRoomEventData` variant of the same name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ChatRoomEventType {
Online,
DiscussChanged,
Revoke,
Msg,
Barrager,
RedPacket,
RedPacketStatus,
Music,
Weather,
Custom,
/// Passed to the handler in `ChatRoom::new` and used by `ChatRoom::on_all`.
/// NOTE(review): presumably a catch-all key that receives every event — confirm
/// against `ParsedMessageHandler`.
All,
}
/// Shared, thread-safe callback that receives a parsed chat-room event.
pub type ChatRoomListener = Arc<dyn Fn(ChatRoomEventData) + Send + Sync + 'static>;
/// Message handler specialised for chat-room event types and payloads.
pub type ChatRoomHandler = ParsedMessageHandler<ChatRoomEventType, ChatRoomEventData>;
#[allow(non_snake_case)]
fn parse_chatroom_message(json: &Value) -> Result<(ChatRoomEventType, ChatRoomEventData), Error> {
let type_str = json["type"]
.as_str()
.ok_or_else(|| Error::Parse("Missing type in message".to_string()))?;
let r#type = ChatRoomMessageType::from_str(type_str)
.map_err(|_| Error::Parse(format!("Unknown message type: {}", type_str)))?;
match r#type {
ChatRoomMessageType::Online => {
if let Some(users) = json["users"].as_array() {
let online_info: Vec<OnlineInfo> = users
.iter()
.filter_map(|u| {
Some(OnlineInfo {
homePage: u["homePage"].as_str()?.to_string(),
userAvatarURL: u["userAvatarURL"].as_str()?.to_string(),
userName: u["userName"].as_str()?.to_string(),
})
})
.collect();
Ok((
ChatRoomEventType::Online,
ChatRoomEventData::Online(online_info),
))
} else {
Err(Error::Parse("Missing users in online message".to_string()))
}
}
ChatRoomMessageType::DiscussChanged => {
let new_discuss = json["newDiscuss"]
.as_str()
.ok_or_else(|| Error::Parse("Missing newDiscuss".to_string()))?
.to_string();
Ok((
ChatRoomEventType::DiscussChanged,
ChatRoomEventData::DiscussChanged(new_discuss),
))
}
ChatRoomMessageType::Revoke => {
let o_id = json["oId"]
.as_str()
.ok_or_else(|| Error::Parse("Missing oId in revoke".to_string()))?
.to_string();
Ok((ChatRoomEventType::Revoke, ChatRoomEventData::Revoke(o_id)))
}
ChatRoomMessageType::Msg => {
let chat_msg = ChatRoomMsg::from_value(json)?;
if let Value::Object(ref obj) = chat_msg.content {
if obj.get("msgType").and_then(|v| v.as_str()) == Some("music") {
Ok((ChatRoomEventType::Music, ChatRoomEventData::Music(chat_msg)))
} else if obj.get("msgType").and_then(|v| v.as_str()) == Some("weather") {
Ok((
ChatRoomEventType::Weather,
ChatRoomEventData::Weather(chat_msg),
))
} else {
Ok((ChatRoomEventType::Msg, ChatRoomEventData::Msg(chat_msg)))
}
} else {
Ok((ChatRoomEventType::Msg, ChatRoomEventData::Msg(chat_msg)))
}
}
ChatRoomMessageType::RedPacket => {
let redpacket_msg = ChatRoomMsg::from_value(json)?;
Ok((
ChatRoomEventType::RedPacket,
ChatRoomEventData::RedPacket(redpacket_msg),
))
}
ChatRoomMessageType::Barrager => {
let barrager = BarragerMsg::from_value(json)?;
Ok((
ChatRoomEventType::Barrager,
ChatRoomEventData::Barrager(barrager),
))
}
ChatRoomMessageType::Custom => {
let message = json["message"]
.as_str()
.ok_or_else(|| Error::Parse("Missing message in custom".to_string()))?
.to_string();
Ok((
ChatRoomEventType::Custom,
ChatRoomEventData::Custom(CustomMsg { message }),
))
}
ChatRoomMessageType::RedPacketStatus => {
let redpacket_status = RedPacketStatusMsg::from_value(json)?;
Ok((
ChatRoomEventType::RedPacketStatus,
ChatRoomEventData::RedPacketStatus(redpacket_status),
))
}
}
}
/// Chat-room client combining a WebSocket connection for live events with
/// HTTP calls for sending messages and querying history.
pub struct ChatRoom {
/// WebSocket connection used by `connect` / `reconnect` / `disconnect`.
connection: WsConnection,
/// Parses incoming messages and owns the emitter that listeners register on.
handler: ChatRoomHandler,
/// API key sent with the HTTP requests and the WebSocket URL.
api_key: String,
/// Last discuss topic seen by a listener registered through `on_discuss`.
discuss: Arc<Mutex<String>>,
/// Last online list seen by a listener registered through `on_online`.
onlines: Arc<Mutex<Vec<OnlineInfo>>>,
/// Client identifier reported when sending messages.
client: ClientType,
/// Client version reported when sending messages.
version: String,
}
impl ChatRoom {
/// Builds a chat-room client for `api_key`.
///
/// The client starts with an empty discuss topic and online list, reports
/// itself as `ClientType::Rust`, and uses this crate's package version.
/// Call `connect` to open the WebSocket.
pub fn new(api_key: String) -> Self {
    let connection = WsConnection::new();
    let handler = ChatRoomHandler::new(
        parse_chatroom_message,
        Some(ChatRoomEventType::All),
        "chatroom",
    );
    let discuss = Arc::new(Mutex::new(String::new()));
    let onlines = Arc::new(Mutex::new(Vec::new()));
    let version = env!("CARGO_PKG_VERSION").to_string();
    Self {
        connection,
        handler,
        api_key,
        discuss,
        onlines,
        client: ClientType::Rust,
        version,
    }
}
/// Fetches the chat-room node information from `chat-room/node/get`.
///
/// # Errors
/// Returns `WebSocketError::Other` when the request fails, when the
/// response `code` is missing, non-integer or non-zero, or when the body
/// cannot be deserialized into `ChatRoomNodeResponse`.
pub async fn get_node(&self) -> Result<ChatRoomNodeResponse, WebSocketError> {
    let url = build_http_path("chat-room/node/get", &[("apiKey", self.api_key.clone())]);
    let response: Value = get(&url)
        .await
        .map_err(|e| WebSocketError::Other(format!("请求失败:{}", e)))?;
    // Compare the full i64: narrowing with `as i32` could wrap a large
    // non-zero code to 0 and let a failed response pass as success.
    if response["code"].as_i64() != Some(0) {
        let msg = response["msg"].as_str().unwrap_or("未知错误");
        return Err(WebSocketError::Other(format!("获取节点失败:{}", msg)));
    }
    serde_json::from_value(response)
        .map_err(|e| WebSocketError::Other(format!("解析节点信息失败:{}", e)))
}
/// Resolves the WebSocket URL to connect to.
///
/// Uses the `data` URL returned by `get_node`, giving it a `/` path when it
/// has none. If `get_node` fails, its error is discarded and the default
/// `fishpi.cn` `chat-room-channel` URL is built instead.
///
/// # Errors
/// Returns `WebSocketError::Other` when the node URL cannot be parsed, or
/// whatever `build_ws_url` returns on the fallback path.
pub async fn get_ws_url(&self) -> Result<String, WebSocketError> {
    let Ok(node) = self.get_node().await else {
        return build_ws_url(
            "fishpi.cn",
            "chat-room-channel",
            &[("apiKey", self.api_key.clone())],
        );
    };
    let mut endpoint = Url::parse(&node.data)
        .map_err(|e| WebSocketError::Other(format!("URL parse error: {}", e)))?;
    if endpoint.path().is_empty() {
        endpoint.set_path("/");
    }
    Ok(endpoint.to_string())
}
/// Resolves the WebSocket URL and opens the connection with a clone of this
/// room's handler. `reload` is forwarded unchanged to the connection.
///
/// # Errors
/// Propagates failures from `get_ws_url` and from the underlying connection.
pub async fn connect(&mut self, reload: bool) -> Result<(), WebSocketError> {
    let endpoint = self.get_ws_url().await?;
    let handler = self.handler.clone();
    self.connection.connect(reload, &endpoint, handler).await
}
/// Resolves a fresh WebSocket URL and asks the connection to reconnect with
/// a clone of this room's handler.
///
/// # Errors
/// Propagates failures from `get_ws_url` and from the underlying connection.
pub async fn reconnect(&mut self) -> Result<(), WebSocketError> {
    let endpoint = self.get_ws_url().await?;
    let handler = self.handler.clone();
    self.connection.reconnect(&endpoint, handler).await
}
/// Forwards `policy` to the underlying connection as its retry policy.
pub fn set_reconnect_policy(&mut self, policy: RetryPolicy) {
self.connection.set_retry_policy(policy);
}
/// Installs `hook` as the log callback of both the connection and the
/// message handler; the two share a single reference-counted instance.
pub fn on_ws_log<F>(&mut self, hook: F)
where
    F: Fn(&str) + Send + Sync + 'static,
{
    let shared: WsLogHook = Arc::new(hook);
    self.connection.set_log_hook_arc(Arc::clone(&shared));
    self.handler.set_log_hook_arc(shared);
}
/// Registers `listener` for online-list updates.
///
/// Before the listener runs, the received list is copied into the cache
/// read by `get_online_count`. The callback is synchronous, so the cache is
/// taken with `try_lock`; if the lock is held at that moment the cache is
/// left unchanged, and the listener is still called.
pub async fn on_online<F>(&self, listener: F)
where
    F: Fn(Vec<OnlineInfo>) + Send + Sync + 'static,
{
    let cache = Arc::clone(&self.onlines);
    self.handler
        .get_emitter()
        .add_listener(ChatRoomEventType::Online, move |event: ChatRoomEventData| {
            let ChatRoomEventData::Online(users) = event else {
                return;
            };
            if let Ok(mut cached) = cache.try_lock() {
                *cached = users.clone();
            }
            listener(users);
        })
        .await;
}
/// Registers `listener` for discuss-topic changes.
///
/// Before the listener runs, the new topic is copied into the cache read by
/// `get_discuss`. The callback is synchronous, so the cache is taken with
/// `try_lock`; if the lock is held at that moment the cache is left
/// unchanged, and the listener is still called.
pub async fn on_discuss<F>(&self, listener: F)
where
    F: Fn(String) + Send + Sync + 'static,
{
    let cache = Arc::clone(&self.discuss);
    self.handler
        .get_emitter()
        .add_listener(
            ChatRoomEventType::DiscussChanged,
            move |event: ChatRoomEventData| {
                let ChatRoomEventData::DiscussChanged(topic) = event else {
                    return;
                };
                if let Ok(mut cached) = cache.try_lock() {
                    *cached = topic.clone();
                }
                listener(topic);
            },
        )
        .await;
}
/// Registers `listener` for revoke events; it receives the revoked
/// message's `oId`.
pub async fn on_revoke<F>(&self, listener: F)
where
    F: Fn(String) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Revoke(msg_id) = event else {
            return;
        };
        listener(msg_id);
    };
    self.add_listener(ChatRoomEventType::Revoke, on_event).await;
}
/// Registers `listener` for ordinary chat messages (events parsed as `Msg`).
pub async fn on_msg<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Msg(msg) = event else {
            return;
        };
        listener(msg);
    };
    self.add_listener(ChatRoomEventType::Msg, on_event).await;
}
/// Registers `listener` for barrager events.
pub async fn on_barrager<F>(&self, listener: F)
where
    F: Fn(BarragerMsg) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Barrager(barrager) = event else {
            return;
        };
        listener(barrager);
    };
    self.add_listener(ChatRoomEventType::Barrager, on_event).await;
}
/// Registers `listener` for red-packet messages.
pub async fn on_redpacket<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg<Value>) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::RedPacket(red_packet) = event else {
            return;
        };
        listener(red_packet);
    };
    self.add_listener(ChatRoomEventType::RedPacket, on_event).await;
}
/// Registers `listener` for red-packet status events.
pub async fn on_redpacketstatus<F>(&self, listener: F)
where
    F: Fn(RedPacketStatusMsg) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::RedPacketStatus(status) = event else {
            return;
        };
        listener(status);
    };
    self.add_listener(ChatRoomEventType::RedPacketStatus, on_event)
        .await;
}
/// Registers `listener` for chat messages classified as music.
pub async fn on_music<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg<Value>) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Music(music) = event else {
            return;
        };
        listener(music);
    };
    self.add_listener(ChatRoomEventType::Music, on_event).await;
}
/// Registers `listener` for chat messages classified as weather.
pub async fn on_weather<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg<Value>) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Weather(weather) = event else {
            return;
        };
        listener(weather);
    };
    self.add_listener(ChatRoomEventType::Weather, on_event).await;
}
/// Registers `listener` for custom messages.
pub async fn on_custom<F>(&self, listener: F)
where
    F: Fn(CustomMsg) + Send + Sync + 'static,
{
    let on_event = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Custom(custom) = event else {
            return;
        };
        listener(custom);
    };
    self.add_listener(ChatRoomEventType::Custom, on_event).await;
}
/// Registers `listener` under `ChatRoomEventType::All`; it receives the
/// event payload unfiltered.
pub async fn on_all<F>(&self, listener: F)
where
    F: Fn(ChatRoomEventData) + Send + Sync + 'static,
{
    self.handler
        .get_emitter()
        .add_listener(ChatRoomEventType::All, listener)
        .await;
}
/// Registers `listener` for `event` on the handler's emitter.
/// Shared by the typed `on_*` registration methods.
async fn add_listener<F>(&self, event: ChatRoomEventType, listener: F)
where
F: Fn(ChatRoomEventData) + Send + Sync + 'static,
{
self.handler.get_emitter().add_listener(event, listener).await;
}
/// Asks the emitter to remove the listeners registered for `event`.
pub async fn off(&self, event: ChatRoomEventType) {
    let target = Some(event);
    self.handler.get_emitter().remove_listener(target).await;
}
/// Disconnects the underlying WebSocket connection.
pub fn disconnect(&mut self) {
self.connection.disconnect();
}
pub async fn send(&self, msg: String) -> Result<(), Error> {
let client = format!("{}/{}", self.client.as_str(), self.version);
let data = json!({
"content": msg,
"client": client,
"apiKey": self.api_key,
});
let resp = post("chat-room/send", Some(data)).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("发送失败").to_string(),
));
}
Ok(())
}
/// Returns a copy of the cached discuss topic. The cache is only written
/// by listeners registered through `on_discuss`.
pub async fn get_discuss(&self) -> String {
    self.discuss.lock().await.clone()
}
/// Sends a `[setdiscuss]…[/setdiscuss]` message carrying `discuss`.
///
/// Best-effort: any error from `send` is discarded, so callers cannot tell
/// whether the request succeeded.
pub async fn set_discuss(&self, discuss: String) {
    let command = format!("[setdiscuss]{}[/setdiscuss]", discuss);
    let _ = self.send(command).await;
}
/// Returns the length of the cached online list. The cache is only written
/// by listeners registered through `on_online`.
pub async fn get_online_count(&self) -> usize {
    self.onlines.lock().await.len()
}
/// Replaces the stored API key. Only the field is updated here; nothing is
/// reconnected by this call.
pub fn set_api_key(&mut self, api_key: String) {
self.api_key = api_key;
}
/// Sets the client identifier and version reported by `send`. When
/// `version` is `None`, the version string becomes `"last"`.
pub fn set_client_type(&mut self, client: ClientType, version: Option<String>) {
    self.client = client;
    self.version = match version {
        Some(explicit) => explicit,
        None => "last".to_string(),
    };
}
pub async fn history(
&self,
page: u32,
type_: ChatContentType,
) -> Result<Vec<ChatRoomMsg>, Error> {
let resp = get(&build_http_path(
"chat-room/more",
&[
("page", page.to_string()),
("type", type_.as_str().to_string()),
("apiKey", self.api_key.clone()),
],
))
.await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
let messages: Vec<ChatRoomMsg> = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(ChatRoomMsg::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
pub async fn get_msg_around(
&self,
o_id: &str,
mode: ChatRoomMessageMode,
size: u32,
type_: ChatContentType,
) -> Result<Vec<ChatRoomMsg>, Error> {
let resp = get(&build_http_path(
"chat-room/getMessage",
&[
("oId", o_id.to_string()),
("mode", mode.to_string()),
("size", size.to_string()),
("type", type_.as_str().to_string()),
("apiKey", self.api_key.clone()),
],
))
.await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
let messages: Vec<ChatRoomMsg> = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(ChatRoomMsg::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
pub async fn revoke(&self, o_id: &str) -> Result<RevokeMsg, Error> {
let data = json!({
"apiKey": self.api_key,
});
let resp = delete(&format!("chat-room/revoke/{}", o_id), Some(data)).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
Ok(RevokeMsg {
msg: resp["msg"].as_str().unwrap_or("").to_string(),
})
}
/// Sends a barrager message with the given text and colour.
///
/// `color` defaults to `#ffffff`. The message is posted to
/// `chat-room/send` as `[barrager]{json}[/barrager]`, where the JSON object
/// holds `color` and `content`.
///
/// Returns the server's `msg` text on success.
///
/// # Errors
/// Propagates request errors; returns `Error::Api` when the response has a
/// non-zero `code`.
pub async fn barrager(&self, msg: String, color: Option<String>) -> Result<String, Error> {
    let color = color.unwrap_or_else(|| "#ffffff".to_string());
    // Serialize the inner object with serde_json rather than splicing strings,
    // so quotes, backslashes and control characters in `msg` or `color`
    // are escaped and cannot produce malformed JSON or inject extra fields.
    let barrage = json!({
        "color": color,
        "content": msg,
    });
    let data = json!({
        "content": format!("[barrager]{}[/barrager]", barrage),
        "apiKey": self.api_key,
    });
    let resp = post("chat-room/send", Some(data)).await?;
    if let Some(code) = resp["code"].as_i64()
        && code != 0
    {
        return Err(Error::Api(
            resp["msg"].as_str().unwrap_or("弹幕发送失败").to_string(),
        ));
    }
    Ok(resp["msg"].as_str().unwrap_or("弹幕发送成功").to_string())
}
pub async fn barrage_cost(&self) -> Result<BarragerCost, Error> {
let resp = get(&build_http_path(
"chat-room/barrager/get",
&[("apiKey", self.api_key.clone())],
))
.await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"]
.as_str()
.unwrap_or("获取弹幕花费失败")
.to_string(),
));
}
Ok(BarragerCost::from_value(&resp["data"]))
}
pub async fn mutes(&self) -> Result<Vec<MuteItem>, Error> {
let resp = get("chat-room/si-guo-list").await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"]
.as_str()
.unwrap_or("获取禁言成员列表失败")
.to_string(),
));
}
let messages: Vec<MuteItem> = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(MuteItem::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
/// Fetches the text served at `cr/raw/{o_id}` and returns the part before
/// the first `<!--`, with surrounding whitespace trimmed.
///
/// # Errors
/// Propagates errors from the text request.
pub async fn get_raw_message(&self, o_id: &str) -> Result<String, Error> {
    let path = format!("cr/raw/{}", o_id);
    let body = get_text(&path).await?;
    // Keep the whole body when no `<!--` marker is present.
    let cut = body.find("<!--").unwrap_or(body.len());
    Ok(body[..cut].trim().to_string())
}
}
#[cfg(test)]
mod tests {
    use super::{ChatRoomEventData, ChatRoomEventType, parse_chatroom_message};
    use serde_json::json;

    /// A `customMessage` payload parses into the `Custom` event carrying its text.
    #[test]
    fn parse_chatroom_custom_message() {
        let payload = json!({
            "type": "customMessage",
            "message": "user joined"
        });
        let parsed = parse_chatroom_message(&payload);
        let (event_type, event) = parsed.expect("should parse");
        assert_eq!(event_type, ChatRoomEventType::Custom);
        let ChatRoomEventData::Custom(msg) = event else {
            panic!("unexpected event variant");
        };
        assert_eq!(msg.message, "user joined");
    }

    /// An unrecognised `type` value is rejected.
    #[test]
    fn parse_chatroom_unknown_type_fails() {
        let payload = json!({
            "type": "nope"
        });
        let outcome = parse_chatroom_message(&payload);
        assert!(outcome.is_err());
    }
}