use std::sync::Arc;
use serde_json::Value;
use crate::{
api::ws::{
ParsedMessageHandler, RetryPolicy, WebSocketError, WsConnection, WsLogHook, build_ws_url,
},
model::notice::{NoticeCount, NoticeItem, NoticeList, NoticeMsg, NoticeMsgType, NoticeType},
utils::{build_http_path, error::Error, get},
};
/// Host name handed to `build_ws_url` when building the notice WebSocket URL.
const DOMAIN: &str = "fishpi.cn";
/// Event payload produced by `parse_notice_message` and handed to listeners.
#[derive(Clone, Debug)]
pub enum NoticeEventData {
/// A parsed notice message.
Msg(NoticeMsg),
}
/// Event key under which listeners are registered on, and removed from, the
/// handler's emitter.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NoticeEventType {
/// Key paired with `NoticeEventData::Msg`.
Msg,
}
/// Reference-counted, `Send + Sync` callback that receives a `NoticeEventData`.
pub type NoticeListener = Arc<dyn Fn(NoticeEventData) + Send + Sync + 'static>;
/// `ParsedMessageHandler` specialised to notice event keys and payloads.
pub type NoticeHandler = ParsedMessageHandler<NoticeEventType, NoticeEventData>;
#[allow(non_snake_case)]
fn parse_notice_message(data: &Value) -> Result<(NoticeEventType, NoticeEventData), Error> {
let command = data
.get("command")
.and_then(|v| v.as_str())
.or_else(|| data.get("data").and_then(|v| v.get("command")).and_then(|v| v.as_str()))
.ok_or_else(|| Error::Parse("Missing command field".to_string()))?;
if NoticeMsgType::values().contains(&command) {
let msg = NoticeMsg::from_value(data)
.or_else(|_| data.get("data").ok_or_else(|| Error::Parse("Missing data field".to_string())).and_then(NoticeMsg::from_value))?;
Ok((NoticeEventType::Msg, NoticeEventData::Msg(msg)))
} else {
Err(Error::Parse(format!("Unsupported command: {}", command)))
}
}
/// Client for the notice channel: a WebSocket connection plus the HTTP
/// notification endpoints, all authenticated with one API key.
pub struct Notice {
/// WebSocket connection used by `connect`, `reconnect` and `disconnect`.
connection: WsConnection,
/// Message handler whose emitter holds the registered listeners; a clone is
/// passed to the connection on every (re)connect.
handler: NoticeHandler,
/// API key sent as the `apiKey` parameter on every request.
api_key: String,
}
impl Notice {
pub fn new(api_key: String) -> Self {
Self {
connection: WsConnection::new(),
handler: NoticeHandler::new(parse_notice_message, None, "notice"),
api_key,
}
}
fn ws_url(&self) -> Result<String, WebSocketError> {
build_ws_url(
DOMAIN,
"user-channel",
&[("apiKey", self.api_key.clone())],
)
}
pub async fn connect(&mut self, reload: bool) -> Result<(), WebSocketError> {
let url = self.ws_url()?;
self.connection
.connect(reload, &url, self.handler.clone())
.await
}
pub async fn reconnect(&mut self) -> Result<(), WebSocketError> {
let url = self.ws_url()?;
self.connection.reconnect(&url, self.handler.clone()).await
}
pub fn set_reconnect_policy(&mut self, policy: RetryPolicy) {
self.connection.set_retry_policy(policy);
}
pub fn on_ws_log<F>(&mut self, hook: F)
where
F: Fn(&str) + Send + Sync + 'static,
{
let hook = Arc::new(hook) as WsLogHook;
self.connection.set_log_hook_arc(hook.clone());
self.handler.set_log_hook_arc(hook);
}
pub async fn off(&self, event_type: NoticeEventType) {
self.handler
.get_emitter()
.remove_listener(Some(event_type))
.await;
}
pub async fn on_notice<F>(&self, listener: F)
where
F: Fn(NoticeMsg) + Send + Sync + 'static,
{
self.add_listener(NoticeEventType::Msg, move |event: NoticeEventData| {
let NoticeEventData::Msg(msg) = event;
listener(msg);
}).await;
}
async fn add_listener<F>(&self, event: NoticeEventType, listener: F)
where
F: Fn(NoticeEventData) + Send + Sync + 'static,
{
self.handler.get_emitter().add_listener(event, listener).await;
}
pub fn disconnect(&mut self) {
self.connection.disconnect();
}
pub async fn count(&self) -> Result<NoticeCount, Error> {
let url = build_http_path(
"notifications/unread/count",
&[("apiKey", self.api_key.clone())],
);
let resp = get(&url).await?;
let count = NoticeCount::from_value(&resp)?;
Ok(count)
}
pub async fn list(&self, notice_type: NoticeType) -> Result<NoticeList, Error> {
let url = build_http_path(
"api/getNotifications",
&[
("apiKey", self.api_key.clone()),
("type", notice_type.as_str().to_string()),
],
);
let resp = get(&url).await?;
let data_array = resp["data"]
.as_array()
.ok_or_else(|| Error::Api("Data is not an array".to_string()))?;
let list: Vec<NoticeItem> = data_array
.iter()
.map(|item| NoticeItem::from_value(item, ¬ice_type))
.collect::<Result<Vec<_>, _>>()?;
Ok(list)
}
pub async fn make_read(&self, notice_type: NoticeType) -> Result<bool, Error> {
let url = build_http_path(
&format!("notifications/make-read/{}", notice_type.as_str()),
&[("apiKey", self.api_key.clone())],
);
let resp = get(&url).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
Ok(true)
}
pub async fn read_all(&self) -> Result<bool, Error> {
let url = build_http_path("notifications/all-read", &[("apiKey", self.api_key.clone())]);
let resp = get(&url).await?;
if let Some(code) = resp["code"].as_i64()
&& code != 0
{
return Err(Error::Api(
resp["msg"].as_str().unwrap_or("Api error").to_string(),
));
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
    use super::{NoticeEventData, NoticeEventType, parse_notice_message};
    use serde_json::json;

    /// A top-level `warnBroadcast` payload parses into a `Msg` event whose
    /// content is the broadcast text.
    #[test]
    fn parse_notice_warn_broadcast() {
        let raw = json!({
            "command": "warnBroadcast",
            "userId": "u1",
            "warnBroadcastText": "hello",
            "who": "system"
        });

        let parsed = parse_notice_message(&raw);
        // Single-variant enum, so the destructuring cannot fail.
        let (kind, NoticeEventData::Msg(msg)) = parsed.expect("should parse");

        assert_eq!(kind, NoticeEventType::Msg);
        assert_eq!(msg.content.as_deref(), Some("hello"));
    }

    /// A command outside the supported set is rejected.
    #[test]
    fn parse_notice_unsupported_command_fails() {
        let raw = json!({
            "command": "unknown",
            "userId": "u1"
        });

        let outcome = parse_notice_message(&raw);

        assert!(outcome.is_err());
    }
}