use crate::api::ws::{MessageHandler, WebSocketClient, WebSocketError};
use crate::model::MuteItem;
use crate::model::chatroom::{
BarragerCost, BarragerMsg, ChatContentType, ChatRoomMessageMode, ChatRoomMessageType,
ChatRoomMsg, ClientType, CustomMsg, OnlineInfo, RevokeMsg,
};
use crate::model::redpacket::RedPacketStatusMsg;
use crate::utils::get_text;
use crate::utils::{delete, error::Error, get, post};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use url::Url;
/// Body of the `chat-room/node/get` response, deserialized as a whole by
/// `ChatRoom::get_node`.
///
/// Field names are used as the JSON keys (there are no serde rename
/// attributes), which is why some of them are not snake_case.
#[derive(Debug, Clone, serde::Deserialize)]
#[allow(non_snake_case)]
pub struct ChatRoomNodeResponse {
/// Message text accompanying the response.
pub msg: String,
/// Status code; `ChatRoom::get_node` only deserializes this struct when it is `0`.
pub code: i32,
/// Address that `ChatRoom::get_ws_url` parses as the WebSocket URL to connect to.
pub data: String,
/// NOTE(review): presumably echoes the API key used for the request — confirm.
pub apiKey: String,
/// List of candidate nodes.
/// NOTE(review): the spelling "avaliable" presumably mirrors the server's
/// JSON key; renaming the field would change the key serde looks for.
pub avaliable: Vec<ChatRoomAvailableNode>,
}
/// One entry of `ChatRoomNodeResponse::avaliable`.
///
/// NOTE(review): the field meanings below are inferred from their names only;
/// confirm them against the server API.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ChatRoomAvailableNode {
/// Address of the node (presumably a WebSocket URL).
pub node: String,
/// Display name of the node.
pub name: String,
/// Presumably a load-balancing weight.
pub weight: u32,
/// Presumably the number of users currently connected to the node.
pub online: u32,
}
/// Payload handed to chat-room listeners; one variant per kind of event that
/// `parse_chatroom_message` can produce.
#[derive(Debug, Clone)]
pub enum ChatRoomEventData {
/// Users parsed from the `users` array of an online message.
Online(Vec<OnlineInfo>),
/// New topic, taken from the `newDiscuss` field.
DiscussChanged(String),
/// `oId` of the message that was revoked.
Revoke(String),
/// Ordinary chat message.
Msg(ChatRoomMsg),
/// Barrage message.
Barrager(BarragerMsg),
/// Red-packet message.
RedPacket(ChatRoomMsg<Value>),
/// Red-packet status update.
RedPacketStatus(RedPacketStatusMsg),
/// Chat message whose object content has `msgType == "music"`.
Music(ChatRoomMsg<Value>),
/// Chat message whose object content has `msgType == "weather"`.
Weather(ChatRoomMsg<Value>),
/// Custom message carrying the `message` field.
Custom(CustomMsg),
}
/// Key under which listeners are registered in the handler's listener map.
///
/// Every variant except `All` corresponds to the `ChatRoomEventData` variant
/// of the same name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ChatRoomEventType {
Online,
DiscussChanged,
Revoke,
Msg,
Barrager,
RedPacket,
RedPacketStatus,
Music,
Weather,
Custom,
/// Catch-all key: listeners registered here are invoked for every emitted event.
All,
}
/// Shared, thread-safe callback invoked with the data of an emitted event.
pub type ChatRoomListener = Arc<dyn Fn(ChatRoomEventData) + Send + Sync + 'static>;
/// WebSocket message handler that parses incoming text frames and dispatches
/// the resulting events to registered listeners.
pub struct ChatRoomHandler {
// Listener registry keyed by event type; wrapped in `Arc` so clones of the
// handler share the same registry.
emitter: Arc<Mutex<HashMap<ChatRoomEventType, Vec<ChatRoomListener>>>>,
}
impl Default for ChatRoomHandler {
fn default() -> Self {
Self {
emitter: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl ChatRoomHandler {
    /// Creates a handler with no registered listeners.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a shared handle to the listener registry.
    pub fn get_emitter(&self) -> Arc<Mutex<HashMap<ChatRoomEventType, Vec<ChatRoomListener>>>> {
        Arc::clone(&self.emitter)
    }

    /// Dispatches `event` to the listeners registered for `event_type` and,
    /// unless `event_type` is already `All`, to the `All` listeners as well.
    ///
    /// Each listener runs in its own spawned task and receives its own clone
    /// of the event, so no ordering between listeners is guaranteed.
    async fn emit_event(
        emitter: &Arc<Mutex<HashMap<ChatRoomEventType, Vec<ChatRoomListener>>>>,
        event_type: ChatRoomEventType,
        event: ChatRoomEventData,
    ) {
        // Snapshot both listener lists under a single lock acquisition so they
        // reflect the same registry state, and release the lock before any
        // listener is scheduled.
        let listeners: Vec<ChatRoomListener> = {
            let registry = emitter.lock().await;
            let catch_all = if event_type == ChatRoomEventType::All {
                None
            } else {
                registry.get(&ChatRoomEventType::All)
            };
            // Type-specific listeners first, then the catch-all ones.
            registry
                .get(&event_type)
                .into_iter()
                .chain(catch_all)
                .flatten()
                .cloned()
                .collect()
        };
        for listener in listeners {
            let event = event.clone();
            tokio::spawn(async move { listener(event) });
        }
    }
}
impl MessageHandler for ChatRoomHandler {
    /// Handles one text frame: frames that are not valid JSON are ignored;
    /// anything else is parsed and emitted from a spawned task.
    fn handle_message(&self, text: String) {
        let Ok(json) = serde_json::from_str::<Value>(&text) else {
            return;
        };
        let emitter = self.get_emitter();
        tokio::spawn(async move {
            match parse_chatroom_message(&json) {
                Ok((event_type, event)) => Self::emit_event(&emitter, event_type, event).await,
                // Parse failures are reported on stderr and otherwise dropped.
                Err(e) => eprintln!("解析聊天室消息失败: {}", e),
            }
        });
    }
}
#[allow(non_snake_case)]
fn parse_chatroom_message(json: &Value) -> Result<(ChatRoomEventType, ChatRoomEventData), Error> {
let type_str = json["type"]
.as_str()
.ok_or_else(|| Error::Parse("Missing type in message".to_string()))?;
let r#type = ChatRoomMessageType::from_str(type_str)
.map_err(|_| Error::Parse(format!("Unknown message type: {}", type_str)))?;
match r#type {
ChatRoomMessageType::Online => {
if let Some(users) = json["users"].as_array() {
let online_info: Vec<OnlineInfo> = users
.iter()
.filter_map(|u| {
Some(OnlineInfo {
homePage: u["homePage"].as_str()?.to_string(),
userAvatarURL: u["userAvatarURL"].as_str()?.to_string(),
userName: u["userName"].as_str()?.to_string(),
})
})
.collect();
Ok((
ChatRoomEventType::Online,
ChatRoomEventData::Online(online_info),
))
} else {
Err(Error::Parse("Missing users in online message".to_string()))
}
}
ChatRoomMessageType::DiscussChanged => {
let new_discuss = json["newDiscuss"]
.as_str()
.ok_or_else(|| Error::Parse("Missing newDiscuss".to_string()))?
.to_string();
Ok((
ChatRoomEventType::DiscussChanged,
ChatRoomEventData::DiscussChanged(new_discuss),
))
}
ChatRoomMessageType::Revoke => {
let o_id = json["oId"]
.as_str()
.ok_or_else(|| Error::Parse("Missing oId in revoke".to_string()))?
.to_string();
Ok((ChatRoomEventType::Revoke, ChatRoomEventData::Revoke(o_id)))
}
ChatRoomMessageType::Msg => {
let chat_msg = ChatRoomMsg::from_value(json)?;
if let Value::Object(ref obj) = chat_msg.content {
if obj.get("msgType").and_then(|v| v.as_str()) == Some("music") {
Ok((ChatRoomEventType::Music, ChatRoomEventData::Music(chat_msg)))
} else if obj.get("msgType").and_then(|v| v.as_str()) == Some("weather") {
Ok((
ChatRoomEventType::Weather,
ChatRoomEventData::Weather(chat_msg),
))
} else {
Ok((ChatRoomEventType::Msg, ChatRoomEventData::Msg(chat_msg)))
}
} else {
Ok((ChatRoomEventType::Msg, ChatRoomEventData::Msg(chat_msg)))
}
}
ChatRoomMessageType::RedPacket => {
let redpacket_msg = ChatRoomMsg::from_value(json)?;
Ok((
ChatRoomEventType::RedPacket,
ChatRoomEventData::RedPacket(redpacket_msg),
))
}
ChatRoomMessageType::Barrager => {
let barrager = BarragerMsg::from_value(json)?;
Ok((
ChatRoomEventType::Barrager,
ChatRoomEventData::Barrager(barrager),
))
}
ChatRoomMessageType::Custom => {
let message = json["message"]
.as_str()
.ok_or_else(|| Error::Parse("Missing message in custom".to_string()))?
.to_string();
Ok((
ChatRoomEventType::Custom,
ChatRoomEventData::Custom(CustomMsg { message }),
))
}
ChatRoomMessageType::RedPacketStatus => {
let redpacket_status = RedPacketStatusMsg::from_value(json)?;
Ok((
ChatRoomEventType::RedPacketStatus,
ChatRoomEventData::RedPacketStatus(redpacket_status),
))
}
}
}
impl Clone for ChatRoomHandler {
fn clone(&self) -> Self {
Self {
emitter: self.emitter.clone(),
}
}
}
/// Chat-room client: owns the WebSocket connection, the listener registry and
/// the HTTP helpers for the chat-room endpoints.
pub struct ChatRoom {
// Active WebSocket client; `None` until `connect` succeeds and after `disconnect`.
ws: Option<WebSocketClient>,
// Handler passed (as a clone) to the WebSocket client; holds the listeners.
handler: ChatRoomHandler,
// Sending half of a channel created in `connect`; cleared in `disconnect`.
sender: Option<mpsc::UnboundedSender<String>>,
// API key added to requests and to the fallback WebSocket URL.
api_key: String,
// Last topic seen by a listener registered through `on_discuss`.
discuss: Arc<Mutex<String>>,
// Last online list seen by a listener registered through `on_online`.
onlines: Arc<Mutex<Vec<OnlineInfo>>>,
// Client kind and version, sent together as "client/version" by `send`.
client: ClientType,
version: String,
}
impl ChatRoom {
/// Creates a disconnected chat-room client for `api_key`.
///
/// The client type defaults to `ClientType::Rust` and the version to this
/// crate's `CARGO_PKG_VERSION`.
pub fn new(api_key: String) -> Self {
    let version = env!("CARGO_PKG_VERSION").to_string();
    let discuss = Arc::new(Mutex::new(String::new()));
    let onlines = Arc::new(Mutex::new(Vec::new()));
    Self {
        ws: None,
        handler: ChatRoomHandler::new(),
        sender: None,
        api_key,
        discuss,
        onlines,
        client: ClientType::Rust,
        version,
    }
}
/// Requests the chat-room node information from `chat-room/node/get`.
///
/// # Errors
///
/// Returns `WebSocketError::Other` when the request fails, when the response
/// `code` is missing or non-zero, or when the body cannot be deserialized
/// into `ChatRoomNodeResponse`.
pub async fn get_node(&self) -> Result<ChatRoomNodeResponse, WebSocketError> {
    let url = format!("chat-room/node/get?apiKey={}", self.api_key);
    let response: Value = get(&url)
        .await
        .map_err(|e| WebSocketError::Other(format!("请求失败:{}", e)))?;
    // Compare as i64: narrowing with `as i32` could wrap a non-zero status
    // (a multiple of 2^32) to 0 and report a failure as success.
    let code = response["code"].as_i64().unwrap_or(-1);
    if code != 0 {
        let msg = response["msg"].as_str().unwrap_or("未知错误");
        return Err(WebSocketError::Other(format!("获取节点失败:{}", msg)));
    }
    serde_json::from_value(response)
        .map_err(|e| WebSocketError::Other(format!("解析节点信息失败:{}", e)))
}
/// Resolves the WebSocket URL to connect to.
///
/// Uses the address returned by `get_node`, giving it a `/` path when it has
/// none. If `get_node` fails, falls back to the fixed `fishpi.cn` channel URL
/// carrying the API key.
///
/// # Errors
///
/// Returns `WebSocketError::Other` when the node address is not a valid URL.
pub async fn get_ws_url(&self) -> Result<String, WebSocketError> {
    let Ok(node_response) = self.get_node().await else {
        return Ok(format!(
            "wss://fishpi.cn/chat-room-channel?apiKey={}",
            self.api_key
        ));
    };
    let mut node_url = Url::parse(&node_response.data)
        .map_err(|e| WebSocketError::Other(format!("URL parse error: {}", e)))?;
    if node_url.path().is_empty() {
        node_url.set_path("/");
    }
    Ok(node_url.to_string())
}
/// Opens the chat-room WebSocket connection.
///
/// Does nothing when a connection already exists and `reload` is `false`.
/// With `reload == true` a new connection is opened and the one it replaces
/// is disconnected explicitly.
///
/// # Errors
///
/// Propagates failures from resolving the URL or opening the connection; in
/// that case an existing connection is left in place.
pub async fn connect(&mut self, reload: bool) -> Result<(), WebSocketError> {
    if self.ws.is_some() && !reload {
        return Ok(());
    }
    let url = self.get_ws_url().await?;
    // NOTE(review): the receiving half is dropped right here, so nothing can
    // ever be received from `sender` — confirm whether this channel is still needed.
    let (tx_send, _) = mpsc::unbounded_channel::<String>();
    self.sender = Some(tx_send);
    let ws = WebSocketClient::connect(&url, self.handler.clone()).await?;
    // Close the client being replaced, mirroring what `disconnect` does,
    // instead of merely overwriting it.
    if let Some(previous) = self.ws.replace(ws) {
        previous.disconnect();
    }
    Ok(())
}
/// Forces a fresh connection by calling `connect` with `reload == true`.
///
/// # Errors
///
/// Returns whatever `connect` returns.
pub async fn reconnect(&mut self) -> Result<(), WebSocketError> {
self.connect(true).await
}
/// Registers `listener` for online-list events.
///
/// Before `listener` runs, the received list is also stored as the cached
/// online list read by `get_online_count`. The cache is updated with
/// `try_lock`, so the update is skipped when the lock is currently held.
pub async fn on_online<F>(&self, listener: F)
where
    F: Fn(Vec<OnlineInfo>) + Send + Sync + 'static,
{
    let cache = Arc::clone(&self.onlines);
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Online(users) = event else {
            return;
        };
        if let Ok(mut cached) = cache.try_lock() {
            *cached = users.clone();
        }
        listener(users);
    };
    self.add_listener(ChatRoomEventType::Online, forward).await;
}
/// Registers `listener` for topic-change events.
///
/// Before `listener` runs, the new topic is also stored as the cached topic
/// returned by `get_discuss`. The cache is updated with `try_lock`, so the
/// update is skipped when the lock is currently held.
pub async fn on_discuss<F>(&self, listener: F)
where
    F: Fn(String) + Send + Sync + 'static,
{
    let cache = Arc::clone(&self.discuss);
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::DiscussChanged(topic) = event else {
            return;
        };
        if let Ok(mut cached) = cache.try_lock() {
            *cached = topic.clone();
        }
        listener(topic);
    };
    self.add_listener(ChatRoomEventType::DiscussChanged, forward)
        .await;
}
/// Registers `listener` for revoke events; it receives the revoked message id.
pub async fn on_revoke<F>(&self, listener: F)
where
    F: Fn(String) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Revoke(msg_id) = event else {
            return;
        };
        listener(msg_id);
    };
    self.add_listener(ChatRoomEventType::Revoke, forward).await;
}
/// Registers `listener` for ordinary chat messages.
pub async fn on_msg<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Msg(msg) = event else {
            return;
        };
        listener(msg);
    };
    self.add_listener(ChatRoomEventType::Msg, forward).await;
}
/// Registers `listener` for barrage messages.
pub async fn on_barrager<F>(&self, listener: F)
where
    F: Fn(BarragerMsg) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Barrager(barrager) = event else {
            return;
        };
        listener(barrager);
    };
    self.add_listener(ChatRoomEventType::Barrager, forward).await;
}
/// Registers `listener` for red-packet messages.
pub async fn on_redpacket<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg<Value>) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::RedPacket(red_packet) = event else {
            return;
        };
        listener(red_packet);
    };
    self.add_listener(ChatRoomEventType::RedPacket, forward).await;
}
/// Registers `listener` for red-packet status updates.
pub async fn on_redpacketstatus<F>(&self, listener: F)
where
    F: Fn(RedPacketStatusMsg) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::RedPacketStatus(status) = event else {
            return;
        };
        listener(status);
    };
    self.add_listener(ChatRoomEventType::RedPacketStatus, forward)
        .await;
}
/// Registers `listener` for music messages.
pub async fn on_music<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg<Value>) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Music(music) = event else {
            return;
        };
        listener(music);
    };
    self.add_listener(ChatRoomEventType::Music, forward).await;
}
/// Registers `listener` for weather messages.
pub async fn on_weather<F>(&self, listener: F)
where
    F: Fn(ChatRoomMsg<Value>) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Weather(weather) = event else {
            return;
        };
        listener(weather);
    };
    self.add_listener(ChatRoomEventType::Weather, forward).await;
}
/// Registers `listener` for custom messages.
pub async fn on_custom<F>(&self, listener: F)
where
    F: Fn(CustomMsg) + Send + Sync + 'static,
{
    let forward = move |event: ChatRoomEventData| {
        let ChatRoomEventData::Custom(custom) = event else {
            return;
        };
        listener(custom);
    };
    self.add_listener(ChatRoomEventType::Custom, forward).await;
}
/// Registers `listener` under `ChatRoomEventType::All`; it receives the
/// unfiltered `ChatRoomEventData` of each event.
pub async fn on_all<F>(&self, listener: F)
where
F: Fn(ChatRoomEventData) + Send + Sync + 'static,
{
self.add_listener(ChatRoomEventType::All, listener).await;
}
/// Appends `listener` to the list registered for `event`, creating the list
/// on first use.
async fn add_listener<F>(&self, event: ChatRoomEventType, listener: F)
where
    F: Fn(ChatRoomEventData) + Send + Sync + 'static,
{
    let shared: ChatRoomListener = Arc::new(listener);
    let mut registry = self.handler.emitter.lock().await;
    registry.entry(event).or_default().push(shared);
}
/// Removes every listener registered for `event`.
pub async fn off(&self, event: ChatRoomEventType) {
    self.handler.emitter.lock().await.remove(&event);
}
/// Disconnects the WebSocket client, if any, and clears the connection state.
pub fn disconnect(&mut self) {
    if let Some(ws) = self.ws.take() {
        ws.disconnect();
    }
    self.sender = None;
}
pub async fn send(&self, msg: String) -> Result<(), Error> {
let client = format!("{}/{}", self.client.as_str(), self.version);
let data = json!({
"content": msg,
"client": client,
"apiKey": self.api_key,
});
let resp = post("chat-room/send", Some(data)).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("发送失败").to_string(),
));
}
Ok(())
}
/// Returns a copy of the cached topic (empty until an `on_discuss` listener
/// has stored one).
pub async fn get_discuss(&self) -> String {
    self.discuss.lock().await.clone()
}
/// Asks the server to change the topic by sending a
/// `[setdiscuss]…[/setdiscuss]` chat message.
///
/// This is best-effort: the method returns nothing, so a failed send is
/// reported on stderr instead of being dropped silently.
pub async fn set_discuss(&self, discuss: String) {
    let command = format!("[setdiscuss]{}[/setdiscuss]", discuss);
    if let Err(e) = self.send(command).await {
        eprintln!("设置话题失败: {}", e);
    }
}
/// Returns the length of the cached online list (zero until an `on_online`
/// listener has stored one).
pub async fn get_online_count(&self) -> usize {
    self.onlines.lock().await.len()
}
/// Replaces the stored API key; only requests and URLs built after this call
/// use the new key.
pub fn set_api_key(&mut self, api_key: String) {
self.api_key = api_key;
}
/// Sets the client kind and version reported by `send`; a missing version
/// becomes `"last"`.
pub fn set_client_type(&mut self, client: ClientType, version: Option<String>) {
    self.version = match version {
        Some(explicit) => explicit,
        None => "last".to_string(),
    };
    self.client = client;
}
pub async fn history(
&self,
page: u32,
type_: ChatContentType,
) -> Result<Vec<ChatRoomMsg>, Error> {
let resp = get(&format!(
"chat-room/more?page={}&type={}&apiKey={}",
page,
type_.as_str(),
self.api_key
))
.await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
let messages: Vec<ChatRoomMsg> = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(ChatRoomMsg::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
pub async fn get_msg_around(
&self,
o_id: &str,
mode: ChatRoomMessageMode,
size: u32,
type_: ChatContentType,
) -> Result<Vec<ChatRoomMsg>, Error> {
let resp = get(&format!(
"chat-room/getMessage?oId={}&mode={}&size={}&type={}&apiKey={}",
o_id, mode, size, type_, self.api_key
))
.await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
let messages: Vec<ChatRoomMsg> = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(ChatRoomMsg::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
pub async fn revoke(&self, o_id: &str) -> Result<RevokeMsg, Error> {
let data = json!({
"apiKey": self.api_key,
});
let resp = delete(&format!("chat-room/revoke/{}", o_id), Some(data)).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
Ok(RevokeMsg {
msg: resp["msg"].as_str().unwrap_or("").to_string(),
})
}
/// Sends a barrage message with the given text and color (default `#ffffff`).
///
/// Returns the server's `msg` field on success, or a default success text
/// when it is absent.
///
/// # Errors
///
/// Propagates request failures; returns `Error::Api` with the server message
/// when the response carries a non-zero `code`.
pub async fn barrager(&self, msg: String, color: Option<String>) -> Result<String, Error> {
    let color = color.unwrap_or_else(|| "#ffffff".to_string());
    // Build the inner payload with serde_json rather than string
    // interpolation so quotes and backslashes in `msg` or `color` are escaped
    // instead of corrupting the JSON. Output for plain text is unchanged.
    let payload = json!({
        "color": color,
        "content": msg,
    });
    let data = json!({
        "content": format!("[barrager]{}[/barrager]", payload),
        "apiKey": self.api_key,
    });
    let resp = post("chat-room/send", Some(data)).await?;
    if let Some(code) = resp["code"].as_i64()
        && code != 0
    {
        return Err(Error::Api(
            resp["msg"].as_str().unwrap_or("弹幕发送失败").to_string(),
        ));
    }
    Ok(resp["msg"].as_str().unwrap_or("弹幕发送成功").to_string())
}
pub async fn barrage_cost(&self) -> Result<BarragerCost, Error> {
let resp = get(&format!("chat-room/barrager/get?apiKey={}", self.api_key)).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"]
.as_str()
.unwrap_or("获取弹幕花费失败")
.to_string(),
));
}
Ok(BarragerCost::from_value(&resp["data"]))
}
pub async fn mutes(&self) -> Result<Vec<MuteItem>, Error> {
let resp = get("chat-room/si-guo-list").await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"]
.as_str()
.unwrap_or("获取禁言成员列表失败")
.to_string(),
));
}
let messages: Vec<MuteItem> = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?
.iter()
.map(MuteItem::from_value)
.collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
/// Fetches the raw text of message `o_id` from `cr/raw/{o_id}`.
///
/// Everything from the first `<!--` onward is discarded and the rest is
/// trimmed.
///
/// # Errors
///
/// Propagates request failures.
pub async fn get_raw_message(&self, o_id: &str) -> Result<String, Error> {
    let path = format!("cr/raw/{}", o_id);
    let body = get_text(&path).await?;
    let before_comment = body.split("<!--").next().unwrap_or("");
    Ok(before_comment.trim().to_string())
}
}