biliapi/
ws_protocol.rs

1//! bilibili 直播间传回来的数据
2
3use std::fmt::{self, Debug, Display};
4
5use async_tungstenite::tungstenite::Message as WsMessage;
6use chrono::{DateTime, Local};
7use serde::{Deserialize, Serialize};
8
9/// 解析直播间数据时发生的错误
10#[derive(Debug, thiserror::Error)]
11pub enum ParseError {
12    #[error("Websocket packet type not supported: {0}")]
13    WsTypeNotSupported(String),
14
15    #[error("IO error while parsing: {0}")]
16    IO(#[from] std::io::Error),
17
18    #[error("Encoding error: {0}")]
19    Encoding(#[from] std::string::FromUtf8Error),
20}
21
22/// pure magic
23pub mod magic {
24
25    /// I    |      H    |  H  |  I  |   I
26    /// u32  |     u16   | u16 | u32 |  u32
27    /// size | head size | ver |  op | seq_id
28    pub const HEADER_SIZE: usize = 4 + 2 + 2 + 4 + 4;
29
30    pub const VER_ZLIB_COMPRESSED: u16 = 2;
31    pub const VER_NORMAL: u16 = 1;
32
33    /// 已知的操作
34    ///
35    /// 从泄露代码的 app/service/main/broadcast/model/operation.go 可以看到命名
36    #[enum_repr::EnumRepr(type = "u32")]
37    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize)]
38    pub enum KnownOperation {
39        Handshake = 0,
40        HandshakeReply = 1,
41        Heartbeat = 2,
42        HeartbeatReply = 3,
43        SendMsg = 4,
44        SendMsgReply = 5,
45        DisconnectReply = 6,
46        Auth = 7,
47        AuthReply = 8,
48        Raw = 9,
49        ProtoReady = 10,
50        ProtoFinish = 11,
51        ChangeRoom = 12,
52        ChangeRoomReply = 13,
53        Register = 14,
54        RegisterReply = 15,
55        Unregister = 16,
56        UnregisterReply = 17,
57    }
58}
59
60pub use magic::KnownOperation;
61
62/// 每个 packet 对应的 operation
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
64pub enum Operation {
65    Known(magic::KnownOperation),
66    Unknown(u32),
67}
68impl Display for Operation {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        match self {
71            Operation::Known(op) => f.write_fmt(format_args!("{:?}", op)),
72            Operation::Unknown(op) => f.write_fmt(format_args!("Unknown({})", op)),
73        }
74    }
75}
76impl Serialize for Operation {
77    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
78    where
79        S: serde::ser::Serializer,
80    {
81        serializer.serialize_str(&self.to_string())
82    }
83}
84impl<'de> Deserialize<'de> for Operation {
85    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
86    where
87        D: serde::de::Deserializer<'de>,
88    {
89        match KnownOperation::deserialize(deserializer) {
90            Ok(known) => Ok(Operation::Known(known)),
91            Err(e) => {
92                // TODO: implement
93                Err(e)
94            }
95        }
96    }
97}
98impl From<Operation> for u32 {
99    fn from(op: Operation) -> u32 {
100        match op {
101            Operation::Known(k) => k as u32,
102            Operation::Unknown(u) => u,
103        }
104    }
105}
106impl From<u32> for Operation {
107    fn from(u: u32) -> Operation {
108        match magic::KnownOperation::from_repr(u) {
109            Some(k) => Self::Known(k),
110            None => Self::Unknown(u),
111        }
112    }
113}
114
115/// 对应 websocket 返回的 packet,基本上没进行处理
116#[derive(Debug, Serialize, Deserialize, Clone)]
117pub struct Packet {
118    /// packet 对应的 operation,大部分应该都是 [`SendMsgReply`][`magic::KnownOperation::SendMsgReply`]
119    pub operation: Operation,
120    /// packet 对应的 数据,大部分都应该是 json string。这里不做任何解析
121    pub body: String,
122    /// 返回的包会带一个时间戳,表示收到时的时间戳,方便重放
123    pub time: DateTime<Local>,
124    /// 返回的包会带一个 room_id,表示收到的房间,方便重放
125    pub room_id: u64,
126}
127
128impl Packet {
129    /// 生成一个 auth 包
130    pub fn auth(room_id: u64, token: &str) -> Self {
131        let payload = serde_json::json!({
132            "uid": 0,
133            "roomid": room_id,
134            "protover": 2,
135            "platform": "web",
136            "clientver": "1.14.3",
137            "type": 2,
138            "key": token
139        });
140        let body = serde_json::to_string(&payload).unwrap();
141
142        Self {
143            operation: Operation::Known(magic::KnownOperation::Auth),
144            body,
145            time: Local::now(),
146            room_id,
147        }
148    }
149    /// 生成一个心跳包
150    pub fn heartbeat() -> Packet {
151        Packet {
152            operation: Operation::Known(magic::KnownOperation::Heartbeat),
153            body: "{}".to_string(),
154            time: Local::now(),
155            room_id: 0,
156        }
157    }
158
159    /// 从 bytes 解析出一堆 [`Packet`]
160    pub fn from_bytes(bytes: &[u8], room_id: u64) -> Result<Vec<Packet>, ParseError> {
161        use byteorder::{BigEndian, ReadBytesExt};
162        use std::io::Read;
163
164        let mut messages = vec![];
165        // parse bytes to messages
166        let mut buffer: &[u8] = bytes;
167        while !buffer.is_empty() {
168            // 见 magic::HEADER_SIZE
169            trace!("parsing header, buffer size = {:?} bytes", buffer.len());
170            if buffer.len() < magic::HEADER_SIZE {
171                debug!("header too small, ignore: {:2x?}", buffer);
172                break;
173            }
174            let total_size = buffer.read_u32::<BigEndian>()?;
175            let _raw_header_size = buffer.read_u16::<BigEndian>()?;
176            let ver = buffer.read_u16::<BigEndian>()?;
177            let operation = buffer.read_u32::<BigEndian>()?;
178            let operation = Operation::from(operation);
179            let seq_id = buffer.read_u32::<BigEndian>()?;
180            trace!("header parsed, seq_id = {}", seq_id);
181            // read rest data
182            let offset = total_size as usize - magic::HEADER_SIZE;
183
184            let body_buffer = &buffer[..offset];
185
186            match (operation, ver) {
187                (_, magic::VER_ZLIB_COMPRESSED) => {
188                    trace!(
189                        "ver = VER_ZLIB_COMPRESSED, op = {:?}, trying decompress",
190                        operation
191                    );
192                    let mut z = flate2::read::ZlibDecoder::new(body_buffer);
193                    let mut buffer = vec![];
194                    let bytes_read = z.read_to_end(&mut buffer)?;
195                    trace!("read {} bytes from zlib", bytes_read);
196                    // 居然还要递归
197                    let sub_messages = Self::from_bytes(&buffer, room_id).map_err(|e| match e {
198                        ParseError::Encoding(e) => {
199                            debug!("utf8 decoded error, raw bytes = {:?}", bytes);
200                            e.into()
201                        }
202                        e => e,
203                    })?;
204                    messages.extend(sub_messages);
205                }
206                (Operation::Known(magic::KnownOperation::HeartbeatReply), magic::VER_NORMAL) => {
207                    // 烦不烦,能不能统一返回 string
208                    let mut body_buffer = body_buffer;
209                    let popularity = body_buffer.read_u32::<BigEndian>()?;
210                    debug!("got a heartbeat response: {}", popularity);
211                    let message = Packet {
212                        operation,
213                        body: popularity.to_string(),
214                        time: Local::now(),
215                        room_id,
216                    };
217                    messages.push(message);
218                }
219                (operation, ver) => {
220                    let body = match String::from_utf8(body_buffer.to_vec()) {
221                        Ok(body) => body,
222                        Err(e) => {
223                            debug!("utf8 decoded error, raw bytes = {:?}", bytes);
224                            warn!(
225                                "Failed to parse body as utf8, op = {:?}, ver = {:?}",
226                                operation, ver
227                            );
228                            return Err(e.into());
229                        }
230                    };
231
232                    let message = Packet {
233                        operation,
234                        body,
235                        time: Local::now(),
236                        room_id,
237                    };
238                    messages.push(message);
239                }
240            }
241
242            buffer = &buffer[offset..];
243        }
244        Ok(messages)
245    }
246
247    /// 从 [`WsMessage`] 解析出一堆 [`Packet`]
248    pub fn from_ws_message(ws_message: WsMessage, room_id: u64) -> Result<Vec<Packet>, ParseError> {
249        match ws_message {
250            WsMessage::Binary(bytes) => Self::from_bytes(&bytes, room_id),
251            WsMessage::Ping(_) => {
252                debug!("received a ping message, ignore");
253                Ok(vec![])
254            }
255            ws_message => {
256                warn!("Unknown type of websocket message: {:?}", ws_message);
257                Err(ParseError::WsTypeNotSupported(ws_message.to_string()))
258            }
259        }
260    }
261}
262
263impl From<Packet> for WsMessage {
264    fn from(msg: Packet) -> WsMessage {
265        use byteorder::{BigEndian, WriteBytesExt};
266
267        let body_size = msg.body.len();
268        let total_size = magic::HEADER_SIZE + body_size;
269
270        let mut buffer = vec![0; magic::HEADER_SIZE];
271        buffer.extend_from_slice(msg.body.as_bytes());
272
273        let mut cursor = std::io::Cursor::new(buffer);
274
275        cursor.write_u32::<BigEndian>(total_size as u32).unwrap();
276        cursor
277            .write_u16::<BigEndian>(magic::HEADER_SIZE as u16)
278            .unwrap();
279        cursor.write_u16::<BigEndian>(1u16).unwrap();
280        cursor.write_u32::<BigEndian>(msg.operation.into()).unwrap();
281        cursor.write_u32::<BigEndian>(1u32).unwrap();
282
283        let bytes = cursor.into_inner();
284        WsMessage::Binary(bytes)
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wrapper that places an [`Operation`] in a JSON object field named `op`.
    #[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
    struct Test {
        op: Operation,
    }

    /// Serializes `op` inside a `{"op": ...}` JSON object.
    fn op_to_json(op: Operation) -> String {
        serde_json::to_string(&json!({ "op": op })).unwrap()
    }

    #[test]
    fn test_operation_serialize() {
        let known = op_to_json(Operation::Known(KnownOperation::SendMsgReply));
        assert_eq!(known, r#"{"op":"SendMsgReply"}"#);

        let unknown = op_to_json(Operation::Unknown(114514));
        assert_eq!(unknown, r#"{"op":"Unknown(114514)"}"#);
    }

    #[test]
    fn test_operation_deserialize_known() {
        let parsed: Test = serde_json::from_str(r#"{"op":"SendMsgReply"}"#).unwrap();
        let expected = Test {
            op: Operation::Known(KnownOperation::SendMsgReply),
        };
        assert_eq!(parsed, expected);
    }

    #[test]
    #[ignore = "not yet implemented"]
    fn test_operation_deserialize_unknown() {
        let parsed: Test = serde_json::from_str(r#"{"op":"Unknown(114514)"}"#).unwrap();
        let expected = Test {
            op: Operation::Unknown(114514),
        };
        assert_eq!(parsed, expected);
    }
}