use std::{collections::HashMap, sync::Arc};
use serde_json::Value;
use tokio::sync::{Mutex, mpsc};
use crate::{
api::ws::{MessageHandler, WebSocketClient, WebSocketError},
model::notice::{NoticeCount, NoticeItem, NoticeList, NoticeMsg, NoticeMsgType, NoticeType},
utils::{error::Error, get},
};
/// Host name of the service; interpolated into the `wss://` URL built by `Notice::connect`.
const DOMAIN: &str = "fishpi.cn";
/// Payload handed to notice listeners when an event is emitted.
///
/// Each listener receives its own clone of the payload.
#[derive(Debug, Clone)]
pub enum NoticeEventData {
    /// A notice message built by `NoticeMsg::from_value` from an incoming JSON message.
    Msg(NoticeMsg),
}
/// Key under which listeners are registered in the handler's listener map.
///
/// `Eq` + `Hash` are required because the value is used as a `HashMap` key.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum NoticeEventType {
    /// Events carrying a notice message (`NoticeEventData::Msg`).
    Msg,
}
/// Shared, thread-safe callback that is invoked with a clone of every emitted
/// [`NoticeEventData`] for the event type it was registered under.
pub type NoticeListener = Arc<dyn Fn(NoticeEventData) + Send + Sync + 'static>;
/// Message handler that parses incoming notice messages and forwards them to
/// registered listeners.
///
/// The listener map lives behind an `Arc`, so clones of the handler share one registry.
pub struct NoticeHandler {
/// Listeners grouped by event type, guarded by an async (`tokio`) mutex.
emitter: Arc<Mutex<HashMap<NoticeEventType, Vec<NoticeListener>>>>,
}
impl Default for NoticeHandler {
    /// Builds a handler whose listener registry is empty.
    fn default() -> Self {
        let registry = HashMap::new();
        Self {
            emitter: Arc::new(Mutex::new(registry)),
        }
    }
}
impl NoticeHandler {
    /// Creates a handler with no registered listeners.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns another handle to the shared listener registry.
    pub fn get_emitter(&self) -> Arc<Mutex<HashMap<NoticeEventType, Vec<NoticeListener>>>> {
        Arc::clone(&self.emitter)
    }

    /// Delivers `event` to every listener registered for `event_type`.
    ///
    /// Each listener is called inside its own spawned Tokio task and receives its
    /// own clone of `event`. When no listener is registered nothing happens.
    async fn emit_event(
        emitter: &Arc<Mutex<HashMap<NoticeEventType, Vec<NoticeListener>>>>,
        event_type: NoticeEventType,
        event: NoticeEventData,
    ) {
        // Copy the listener list while holding the lock, then release the lock
        // before any listener runs.
        let snapshot: Vec<NoticeListener> = {
            let registry = emitter.lock().await;
            match registry.get(&event_type) {
                Some(registered) => registered.clone(),
                None => Vec::new(),
            }
        };

        for callback in snapshot {
            let payload = event.clone();
            tokio::spawn(async move { callback(payload) });
        }
    }
}
impl MessageHandler for NoticeHandler {
    /// Handles one text message received from the WebSocket client.
    ///
    /// The text is parsed as JSON; on success a Tokio task is spawned that turns
    /// the JSON into a notice event and emits it to the registered listeners.
    /// A message that is valid JSON but not a supported notice is reported on
    /// stderr.
    fn handle_message(&self, text: String) {
        // Text that is not valid JSON is skipped without logging, as before.
        let Ok(json) = serde_json::from_str::<Value>(&text) else {
            return;
        };

        let emitter = self.get_emitter();
        tokio::spawn(async move {
            match parse_notice_message(&json) {
                Ok((event_type, event)) => {
                    Self::emit_event(&emitter, event_type, event).await;
                }
                Err(e) => {
                    // This is the notice channel; the log text used to say "chat message".
                    eprintln!("Failed to parse notice message: {}", e);
                }
            }
        });
    }
}
#[allow(non_snake_case)]
fn parse_notice_message(data: &Value) -> Result<(NoticeEventType, NoticeEventData), Error> {
let command = data["command"]
.as_str()
.ok_or_else(|| Error::Parse("Missing command field".to_string()))?;
if NoticeMsgType::values().contains(&command) {
let msg = NoticeMsg::from_value(data)?;
Ok((NoticeEventType::Msg, NoticeEventData::Msg(msg)))
} else {
Err(Error::Parse(format!("Unsupported command: {}", command)))
}
}
impl Clone for NoticeHandler {
fn clone(&self) -> Self {
Self {
emitter: self.emitter.clone(),
}
}
}
/// Client for user notices: a WebSocket channel for pushed notice messages plus
/// request helpers (built on `utils::get`) for counting, listing and marking
/// notices as read.
pub struct Notice {
/// Current WebSocket connection; `None` before `connect` succeeds and after `disconnect`.
ws: Option<WebSocketClient>,
/// Handler whose clone is given to the WebSocket client; holds the listener registry.
handler: NoticeHandler,
/// Sending half of a channel created in `connect`.
/// NOTE(review): `connect` discards the receiving half and nothing visible in this
/// file sends on it — confirm whether this field is still needed.
sender: Option<mpsc::UnboundedSender<String>>,
/// API key appended to every request URL as the `apiKey` query parameter.
api_key: String,
}
impl Notice {
pub fn new(api_key: String) -> Self {
Self {
ws: None,
handler: NoticeHandler::new(),
sender: None,
api_key,
}
}
pub async fn connect(&mut self, reload: bool) -> Result<(), WebSocketError> {
if self.ws.is_some() && !reload {
return Ok(());
}
let url = format!("wss://{}/user-channel?apiKey={}", DOMAIN, self.api_key);
let (tx_send, _) = mpsc::unbounded_channel::<String>();
self.sender = Some(tx_send);
let ws = WebSocketClient::connect(&url, self.handler.clone()).await?;
self.ws = Some(ws);
Ok(())
}
pub async fn reconnect(&mut self) -> Result<(), WebSocketError> {
self.connect(true).await
}
pub async fn off(&self, event_type: NoticeEventType) {
let mut listeners = self.handler.emitter.lock().await;
listeners.remove(&event_type);
}
pub async fn on_notice<F>(&self, listener: F)
where
F: Fn(NoticeMsg) + Send + Sync + 'static,
{
self.add_listener(NoticeEventType::Msg, move |event: NoticeEventData| {
let NoticeEventData::Msg(msg) = event;
listener(msg);
}).await;
}
async fn add_listener<F>(&self, event: NoticeEventType, listener: F)
where
F: Fn(NoticeEventData) + Send + Sync + 'static,
{
let wrapped_listener: NoticeListener = Arc::new(listener);
let mut emitter = self.handler.emitter.lock().await;
emitter
.entry(event)
.or_insert_with(Vec::new)
.push(wrapped_listener);
}
pub fn disconnect(&mut self) {
if let Some(ws) = &mut self.ws {
ws.disconnect();
}
self.ws = None;
self.sender = None;
}
pub async fn count(&self) -> Result<NoticeCount, Error> {
let url = format!("notifications/unread/count?apiKey={}", self.api_key);
let resp = get(&url).await?;
let count = NoticeCount::from_value(&resp)?;
Ok(count)
}
pub async fn list(&self, notice_type: NoticeType) -> Result<NoticeList, Error> {
let url = format!(
"api/getNotifications?apiKey={}&type={}",
self.api_key,
notice_type.as_str()
);
let resp = get(&url).await?;
let data_array = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?;
let list: Vec<NoticeItem> = data_array
.iter()
.map(|item| NoticeItem::from_value(item, ¬ice_type))
.collect::<Result<Vec<_>, _>>()?;
Ok(list)
}
pub async fn make_read(&self, notice_type: NoticeType) -> Result<bool, Error> {
let url = format!(
"notifications/make-read/{}?apiKey={}",
notice_type.as_str(),
self.api_key
);
let resp = get(&url).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
Ok(true)
}
pub async fn read_all(&self) -> Result<bool, Error> {
let url = format!("notifications/all-read?apiKey={}", self.api_key);
let resp = get(&url).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
Ok(true)
}
}