1use std::{
2 fmt::{Debug, Formatter},
3 str::FromStr,
4 time::Duration,
5};
6
7use chrono::Utc;
8use futures_util::{SinkExt, StreamExt};
9use reqwest::Method;
10use serde::{Deserialize, Serialize};
11use serde_json::{from_str, to_string, Value};
12use tokio::{net::TcpStream, select, time::interval};
13use tokio_tungstenite::{
14 connect_async,
15 tungstenite::{Error, Message},
16 MaybeTlsStream, WebSocketStream,
17};
18use url::Url;
19
20use crate::{QQBotProtocol, QQResult};
21
22pub use self::{
23 connect_event::{ConnectEvent, QQBotConnected},
24 emoji_event::EmojiEvent,
25 heartbeat_event::HeartbeatEvent,
26 message_event::{MessageAttachment, MessageEvent},
27 ready_event::LoginEvent,
28 subscription_mask::Subscription,
29};
30
31mod connect_event;
32mod emoji_event;
33mod heartbeat_event;
34mod message_event;
35mod ready_event;
36mod subscription_mask;
37
38pub struct QQBotWebsocket<T>
39where
40 T: QQBotProtocol,
41{
42 bot: T,
43 wss: WebSocketStream<MaybeTlsStream<TcpStream>>,
44 connected: QQBotConnected,
45 heartbeat_id: u32,
46 closed: bool,
47}
48
49#[derive(Serialize, Deserialize, Debug)]
50pub struct QQBotOperation {
51 op: u32,
52 #[serde(default)]
53 d: EventDispatcher,
54 #[serde(default)]
55 s: u32,
56 #[serde(default)]
57 t: String,
58 #[serde(default)]
59 id: String,
60}
61
62#[derive(Serialize, Deserialize, Debug)]
63#[serde(untagged)]
64pub enum EventDispatcher {
65 Connect(ConnectEvent),
66 LoginReady(LoginEvent),
67 Heartbeat(HeartbeatEvent),
68 Message(MessageEvent),
69 Emoji(EmojiEvent),
70 NeedBeat(u32),
71 MaybeFail(bool),
72}
73
74impl Default for EventDispatcher {
75 fn default() -> Self {
76 Self::MaybeFail(false)
77 }
78}
79
80impl Default for ConnectEvent {
81 fn default() -> Self {
82 Self { token: "".to_string(), intents: 0, shard: vec![] }
83 }
84}
85
86impl<T> Debug for QQBotWebsocket<T>
87where
88 T: QQBotProtocol,
89{
90 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("QQBotWebsocket")
97 .field("config", self.wss.get_config())
98 .field("connected", &self.connected)
100 .finish()
101 }
102}
103
104impl<T> QQBotWebsocket<T>
105where
106 T: QQBotProtocol,
107{
108 pub async fn link(bot: T) -> QQResult<Self> {
109 let url = Url::from_str("https://sandbox.api.sgroup.qq.com/gateway/bot")?;
110 let request = bot.build_request(Method::GET, url);
111 let connected: QQBotConnected = request.send().await?.json().await?;
112 let (wss, _) = connect_async(&connected.url).await?;
113 Ok(Self { wss, bot, connected, heartbeat_id: 0, closed: false })
114 }
115 pub async fn relink(&mut self) -> QQResult {
116 let url = Url::from_str("https://sandbox.api.sgroup.qq.com/gateway/bot")?;
117 let request = self.bot.build_request(Method::GET, url);
118 let _: QQBotConnected = request.send().await?.json().await?;
119 self.closed = false;
122 Ok(())
123 }
124 pub async fn run(&mut self) -> QQResult {
125 self.send_identify().await?;
126 let mut heartbeat = interval(Duration::from_secs_f32(42.0 * 0.9));
127 let mut saver = interval(Duration::from_secs_f32(30.0));
128 loop {
129 select! {
130 listen = self.next() => {
131 match listen {
132 Some(event) =>{
133 match self.dispatch(event).await {
134 Ok(_) => {}
135 Err(_) => {}
136 }
137 }
138 None => {
139 return Ok(())
140 }
141 }
142 },
143 _ = heartbeat.tick() => {
144 if self.closed {
145 return Ok(())
146 }
147 else {
148 self.send_heartbeat().await?;
149 }
150 },
151 _ = saver.tick() => {
152 self.bot.on_save().await?;
153 },
154 }
155 }
156 }
157
158 pub async fn next(&mut self) -> Option<Result<Message, Error>> {
159 self.wss.next().await
160 }
161 pub async fn dispatch(&mut self, event: Result<Message, Error>) -> QQResult {
162 let received: QQBotOperation = match event? {
163 Message::Text(s) => match from_str(&s) {
164 Ok(o) => o,
165 Err(e) => {
166 let json: Value = from_str(&s)?;
167 print!("未知错误 {:#?}", e);
168 panic!("{:#?}", json);
169 }
170 },
171 Message::Close(_) => {
172 self.closed = true;
173 println!("链接已关闭");
174 return Ok(());
175 }
176 _ => unreachable!(),
177 };
178 match received.op {
179 0 => match received.d {
180 EventDispatcher::Connect(v) => {
181 println!(" 鉴权成功, 登陆为 {:?}", v);
182 }
183 EventDispatcher::Message(msg) => self.bot.on_message(msg).await?,
184 EventDispatcher::LoginReady(msg) => self.bot.on_login_success(msg).await?,
185 EventDispatcher::Emoji(msg) => self.bot.on_emoji(msg).await?,
186 _ => unreachable!(),
187 },
188 7 => self.relink().await?,
190 9 => self.bot.on_login_failure().await?,
191 10 => match received.d {
192 EventDispatcher::Heartbeat(time) => {
193 self.heartbeat_id = received.s;
194 self.bot.on_connected(time).await?;
195 }
196 _ => unreachable!(),
197 },
198 11 => {}
200 _ => {
201 println!("[{}] 协议 {}", Utc::now().format("%F %H:%M:%S"), received.op);
202 println!("未知协议 {:#?}", received);
203 }
204 };
205 Ok(())
206 }
207 pub async fn send(&mut self, operator: &QQBotOperation) -> QQResult<()> {
208 self.wss.send(Message::Text(to_string(&operator)?)).await?;
209 Ok(())
210 }
211 pub async fn send_heartbeat(&mut self) -> QQResult<()> {
212 let protocol = QQBotOperation {
213 op: 1,
214 d: EventDispatcher::NeedBeat(self.heartbeat_id),
215 s: 0,
216 t: "".to_string(),
217 id: "".to_string(),
218 };
219 self.send(&protocol).await?;
220 self.bot.on_heartbeat(self.heartbeat_id).await?;
221 Ok(())
222 }
223 pub async fn send_identify(&mut self) -> QQResult<()> {
224 println!("[{}] 协议 2", Utc::now().format("%F %H:%M:%S"));
225 let intents = self.bot.subscription().bits();
226 let protocol = QQBotOperation {
227 op: 2,
228 s: 0,
229 t: "".to_string(),
230 d: EventDispatcher::Connect(ConnectEvent {
231 token: self.bot.build_bot_token(),
232 intents,
233 shard: vec![0, 1],
234 ..Default::default()
235 }),
236 id: "".to_string(),
237 };
238 self.wss.send(Message::Text(to_string(&protocol)?)).await?;
239 println!(" 首次连接鉴权");
240 println!(" 监听掩码 {:0X}", intents);
241 Ok(())
242 }
243}