qq_bot/wss/
mod.rs

1use std::{
2    fmt::{Debug, Formatter},
3    str::FromStr,
4    time::Duration,
5};
6
7use chrono::Utc;
8use futures_util::{SinkExt, StreamExt};
9use reqwest::Method;
10use serde::{Deserialize, Serialize};
11use serde_json::{from_str, to_string, Value};
12use tokio::{net::TcpStream, select, time::interval};
13use tokio_tungstenite::{
14    connect_async,
15    tungstenite::{Error, Message},
16    MaybeTlsStream, WebSocketStream,
17};
18use url::Url;
19
20use crate::{QQBotProtocol, QQResult};
21
22pub use self::{
23    connect_event::{ConnectEvent, QQBotConnected},
24    emoji_event::EmojiEvent,
25    heartbeat_event::HeartbeatEvent,
26    message_event::{MessageAttachment, MessageEvent},
27    ready_event::LoginEvent,
28    subscription_mask::Subscription,
29};
30
31mod connect_event;
32mod emoji_event;
33mod heartbeat_event;
34mod message_event;
35mod ready_event;
36mod subscription_mask;
37
38pub struct QQBotWebsocket<T>
39where
40    T: QQBotProtocol,
41{
42    bot: T,
43    wss: WebSocketStream<MaybeTlsStream<TcpStream>>,
44    connected: QQBotConnected,
45    heartbeat_id: u32,
46    closed: bool,
47}
48
49#[derive(Serialize, Deserialize, Debug)]
50pub struct QQBotOperation {
51    op: u32,
52    #[serde(default)]
53    d: EventDispatcher,
54    #[serde(default)]
55    s: u32,
56    #[serde(default)]
57    t: String,
58    #[serde(default)]
59    id: String,
60}
61
62#[derive(Serialize, Deserialize, Debug)]
63#[serde(untagged)]
64pub enum EventDispatcher {
65    Connect(ConnectEvent),
66    LoginReady(LoginEvent),
67    Heartbeat(HeartbeatEvent),
68    Message(MessageEvent),
69    Emoji(EmojiEvent),
70    NeedBeat(u32),
71    MaybeFail(bool),
72}
73
74impl Default for EventDispatcher {
75    fn default() -> Self {
76        Self::MaybeFail(false)
77    }
78}
79
80impl Default for ConnectEvent {
81    fn default() -> Self {
82        Self { token: "".to_string(), intents: 0, shard: vec![] }
83    }
84}
85
86impl<T> Debug for QQBotWebsocket<T>
87where
88    T: QQBotProtocol,
89{
90    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91        // let tcp_stream = match self.wss.get_ref() {
92        //     MaybeTlsStream::Plain(s) => s.peer_addr().unwrap(),
93        //     MaybeTlsStream::NativeTls(t) => t.get_ref().get_ref().get_ref().peer_addr().unwrap(),
94        //     _ => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
95        // };
96        f.debug_struct("QQBotWebsocket")
97            .field("config", self.wss.get_config())
98            // .field("socket", &tcp_stream)
99            .field("connected", &self.connected)
100            .finish()
101    }
102}
103
104impl<T> QQBotWebsocket<T>
105where
106    T: QQBotProtocol,
107{
108    pub async fn link(bot: T) -> QQResult<Self> {
109        let url = Url::from_str("https://sandbox.api.sgroup.qq.com/gateway/bot")?;
110        let request = bot.build_request(Method::GET, url);
111        let connected: QQBotConnected = request.send().await?.json().await?;
112        let (wss, _) = connect_async(&connected.url).await?;
113        Ok(Self { wss, bot, connected, heartbeat_id: 0, closed: false })
114    }
115    pub async fn relink(&mut self) -> QQResult {
116        let url = Url::from_str("https://sandbox.api.sgroup.qq.com/gateway/bot")?;
117        let request = self.bot.build_request(Method::GET, url);
118        let _: QQBotConnected = request.send().await?.json().await?;
119        // let (wss, _) = connect_async(&connected.url).await?;
120        // self.wss = wss;
121        self.closed = false;
122        Ok(())
123    }
124    pub async fn run(&mut self) -> QQResult {
125        self.send_identify().await?;
126        let mut heartbeat = interval(Duration::from_secs_f32(42.0 * 0.9));
127        let mut saver = interval(Duration::from_secs_f32(30.0));
128        loop {
129            select! {
130                listen = self.next() => {
131                    match listen {
132                        Some(event) =>{
133                            match self.dispatch(event).await {
134                                Ok(_) => {}
135                                Err(_) => {}
136                            }
137                        }
138                        None => {
139                            return Ok(())
140                        }
141                    }
142                },
143                _ = heartbeat.tick() => {
144                     if self.closed {
145                        return Ok(())
146                     }
147                     else {
148                        self.send_heartbeat().await?;
149                    }
150                },
151                _ = saver.tick() => {
152                     self.bot.on_save().await?;
153                },
154            }
155        }
156    }
157
158    pub async fn next(&mut self) -> Option<Result<Message, Error>> {
159        self.wss.next().await
160    }
161    pub async fn dispatch(&mut self, event: Result<Message, Error>) -> QQResult {
162        let received: QQBotOperation = match event? {
163            Message::Text(s) => match from_str(&s) {
164                Ok(o) => o,
165                Err(e) => {
166                    let json: Value = from_str(&s)?;
167                    print!("未知错误 {:#?}", e);
168                    panic!("{:#?}", json);
169                }
170            },
171            Message::Close(_) => {
172                self.closed = true;
173                println!("链接已关闭");
174                return Ok(());
175            }
176            _ => unreachable!(),
177        };
178        match received.op {
179            0 => match received.d {
180                EventDispatcher::Connect(v) => {
181                    println!("    鉴权成功, 登陆为 {:?}", v);
182                }
183                EventDispatcher::Message(msg) => self.bot.on_message(msg).await?,
184                EventDispatcher::LoginReady(msg) => self.bot.on_login_success(msg).await?,
185                EventDispatcher::Emoji(msg) => self.bot.on_emoji(msg).await?,
186                _ => unreachable!(),
187            },
188            // 要求重新链接
189            7 => self.relink().await?,
190            9 => self.bot.on_login_failure().await?,
191            10 => match received.d {
192                EventDispatcher::Heartbeat(time) => {
193                    self.heartbeat_id = received.s;
194                    self.bot.on_connected(time).await?;
195                }
196                _ => unreachable!(),
197            },
198            // 接收到心跳包, 无参数
199            11 => {}
200            _ => {
201                println!("[{}] 协议 {}", Utc::now().format("%F %H:%M:%S"), received.op);
202                println!("未知协议 {:#?}", received);
203            }
204        };
205        Ok(())
206    }
207    pub async fn send(&mut self, operator: &QQBotOperation) -> QQResult<()> {
208        self.wss.send(Message::Text(to_string(&operator)?)).await?;
209        Ok(())
210    }
211    pub async fn send_heartbeat(&mut self) -> QQResult<()> {
212        let protocol = QQBotOperation {
213            op: 1,
214            d: EventDispatcher::NeedBeat(self.heartbeat_id),
215            s: 0,
216            t: "".to_string(),
217            id: "".to_string(),
218        };
219        self.send(&protocol).await?;
220        self.bot.on_heartbeat(self.heartbeat_id).await?;
221        Ok(())
222    }
223    pub async fn send_identify(&mut self) -> QQResult<()> {
224        println!("[{}] 协议 2", Utc::now().format("%F %H:%M:%S"));
225        let intents = self.bot.subscription().bits();
226        let protocol = QQBotOperation {
227            op: 2,
228            s: 0,
229            t: "".to_string(),
230            d: EventDispatcher::Connect(ConnectEvent {
231                token: self.bot.build_bot_token(),
232                intents,
233                shard: vec![0, 1],
234                ..Default::default()
235            }),
236            id: "".to_string(),
237        };
238        self.wss.send(Message::Text(to_string(&protocol)?)).await?;
239        println!("    首次连接鉴权");
240        println!("    监听掩码 {:0X}", intents);
241        Ok(())
242    }
243}