use std::{
fmt::{Debug, Formatter},
str::FromStr,
time::Duration,
};
use chrono::Utc;
use futures_util::{SinkExt, StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::{from_str, to_string, Value};
use tokio::{net::TcpStream, select, time::interval};
use tokio_tungstenite::{
connect_async,
tungstenite::{Error, Message},
MaybeTlsStream, WebSocketStream,
};
use url::Url;
use crate::{QQBotProtocol, QQResult};
pub use self::{
connect_event::{ConnectEvent, QQBotConnected},
emoji_event::EmojiEvent,
heartbeat_event::HeartbeatEvent,
message_event::{MessageAttachment, MessageEvent},
ready_event::LoginEvent,
subscription_mask::Subscription,
};
mod connect_event;
mod emoji_event;
mod heartbeat_event;
mod message_event;
mod ready_event;
mod subscription_mask;
pub struct QQBotWebsocket<T>
where
T: QQBotProtocol,
{
bot: T,
wss: WebSocketStream<MaybeTlsStream<TcpStream>>,
connected: QQBotConnected,
heartbeat_id: u32,
closed: bool,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct QQBotOperation {
op: u32,
#[serde(default)]
d: EventDispatcher,
#[serde(default)]
s: u32,
#[serde(default)]
t: String,
#[serde(default)]
id: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum EventDispatcher {
Connect(ConnectEvent),
LoginReady(LoginEvent),
Heartbeat(HeartbeatEvent),
Message(MessageEvent),
Emoji(EmojiEvent),
NeedBeat(u32),
MaybeFail(bool),
}
impl Default for EventDispatcher {
fn default() -> Self {
Self::MaybeFail(false)
}
}
impl Default for ConnectEvent {
fn default() -> Self {
Self { token: "".to_string(), intents: 0, shard: vec![] }
}
}
impl<T> Debug for QQBotWebsocket<T>
where
T: QQBotProtocol,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QQBotWebsocket")
.field("config", self.wss.get_config())
.field("connected", &self.connected)
.finish()
}
}
impl<T> QQBotWebsocket<T>
where
T: QQBotProtocol,
{
pub async fn link(bot: T) -> QQResult<Self> {
let url = Url::from_str("https://sandbox.api.sgroup.qq.com/gateway/bot")?;
let request = bot.build_request(Method::GET, url);
let connected: QQBotConnected = request.send().await?.json().await?;
let (wss, _) = connect_async(&connected.url).await?;
Ok(Self { wss, bot, connected, heartbeat_id: 0, closed: false })
}
pub async fn relink(&mut self) -> QQResult {
let url = Url::from_str("https://sandbox.api.sgroup.qq.com/gateway/bot")?;
let request = self.bot.build_request(Method::GET, url);
let _: QQBotConnected = request.send().await?.json().await?;
self.closed = false;
Ok(())
}
pub async fn run(&mut self) -> QQResult {
self.send_identify().await?;
let mut heartbeat = interval(Duration::from_secs_f32(42.0 * 0.9));
let mut saver = interval(Duration::from_secs_f32(30.0));
loop {
select! {
listen = self.next() => {
match listen {
Some(event) =>{
match self.dispatch(event).await {
Ok(_) => {}
Err(_) => {}
}
}
None => {
return Ok(())
}
}
},
_ = heartbeat.tick() => {
if self.closed {
return Ok(())
}
else {
self.send_heartbeat().await?;
}
},
_ = saver.tick() => {
self.bot.on_save().await?;
},
}
}
}
pub async fn next(&mut self) -> Option<Result<Message, Error>> {
self.wss.next().await
}
pub async fn dispatch(&mut self, event: Result<Message, Error>) -> QQResult {
let received: QQBotOperation = match event? {
Message::Text(s) => match from_str(&s) {
Ok(o) => o,
Err(e) => {
let json: Value = from_str(&s)?;
print!("未知错误 {:#?}", e);
panic!("{:#?}", json);
}
},
Message::Close(_) => {
self.closed = true;
println!("链接已关闭");
return Ok(());
}
_ => unreachable!(),
};
match received.op {
0 => match received.d {
EventDispatcher::Connect(v) => {
println!(" 鉴权成功, 登陆为 {:?}", v);
}
EventDispatcher::Message(msg) => self.bot.on_message(msg).await?,
EventDispatcher::LoginReady(msg) => self.bot.on_login_success(msg).await?,
EventDispatcher::Emoji(msg) => self.bot.on_emoji(msg).await?,
_ => unreachable!(),
},
7 => self.relink().await?,
9 => self.bot.on_login_failure().await?,
10 => match received.d {
EventDispatcher::Heartbeat(time) => {
self.heartbeat_id = received.s;
self.bot.on_connected(time).await?;
}
_ => unreachable!(),
},
11 => {}
_ => {
println!("[{}] 协议 {}", Utc::now().format("%F %H:%M:%S"), received.op);
println!("未知协议 {:#?}", received);
}
};
Ok(())
}
pub async fn send(&mut self, operator: &QQBotOperation) -> QQResult<()> {
self.wss.send(Message::Text(to_string(&operator)?)).await?;
Ok(())
}
pub async fn send_heartbeat(&mut self) -> QQResult<()> {
let protocol = QQBotOperation {
op: 1,
d: EventDispatcher::NeedBeat(self.heartbeat_id),
s: 0,
t: "".to_string(),
id: "".to_string(),
};
self.send(&protocol).await?;
self.bot.on_heartbeat(self.heartbeat_id).await?;
Ok(())
}
pub async fn send_identify(&mut self) -> QQResult<()> {
println!("[{}] 协议 2", Utc::now().format("%F %H:%M:%S"));
let intents = self.bot.subscription().bits();
let protocol = QQBotOperation {
op: 2,
s: 0,
t: "".to_string(),
d: EventDispatcher::Connect(ConnectEvent {
token: self.bot.build_bot_token(),
intents,
shard: vec![0, 1],
..Default::default()
}),
id: "".to_string(),
};
self.wss.send(Message::Text(to_string(&protocol)?)).await?;
println!(" 首次连接鉴权");
println!(" 监听掩码 {:0X}", intents);
Ok(())
}
}