br_web_server/
websocket.rs

1use crate::request::Request;
2use crate::response::Response;
3use crate::{Handler, HttpError};
4use json::{object, JsonValue};
5use std::sync::mpsc::{channel, Sender};
6use std::sync::{Mutex};
7use std::{io, thread};
8use dashmap::DashMap;
9use log::{info};
10
11pub static USERS: std::sync::LazyLock<DashMap<String, Websocket>> = std::sync::LazyLock::new(DashMap::new);
12pub static WS_NOTICE: std::sync::LazyLock<Mutex<Vec<NoticeMsg>>> = std::sync::LazyLock::new(|| Mutex::new(Vec::new()));
13#[derive(Debug, Clone)]
14pub struct Websocket {
15    pub send: Option<Sender<Message>>,
16    /// 接收
17    pub key: String,
18    pub user_user: String,
19    pub org_org: String,
20    version: String,
21    request: Request,
22    response: Response,
23}
24
25
26impl Websocket {
27    #[must_use]
28    pub fn http(request: Request, response: Response) -> Self {
29        Self {
30            send: None,
31            request,
32            key: String::new(),
33            user_user: "".to_string(),
34            org_org: "".to_string(),
35            version: String::new(),
36            response,
37        }
38    }
39    pub fn new(request: Request, response: Response) -> Self {
40        Self {
41            send: None,
42            request,
43            key: String::new(),
44            user_user: "".to_string(),
45            org_org: String::new(),
46            version: String::new(),
47            response,
48        }
49    }
50    // 发送数据
51    pub fn send(&mut self, data: &JsonValue) {
52        let msg = Message {
53            mode: MessageMode::Server,
54            message_type: MessageType::Text,
55            payload: data.to_string().into_bytes(),
56            text: data.to_string(),
57            close: CloseCode::None,
58            error: ErrorCode::None,
59        };
60        match self.send.clone().unwrap().send(msg) {
61            Ok(()) => (),
62            Err(_) => self.on_error(ErrorCode::SendingDataFailed),
63        }
64    }
65    // 关闭连接
66    pub fn close(&mut self, code: CloseCode, reason: &str) {
67        let msg = Message {
68            mode: MessageMode::Server,
69            message_type: MessageType::Close,
70            payload: reason.as_bytes().to_vec(),
71            text: reason.to_string(),
72            close: code,
73            error: ErrorCode::None,
74        };
75        match self.send.clone().unwrap().send(msg) {
76            Ok(()) => (),
77            Err(_) => self.on_error(ErrorCode::SendingDataFailed),
78        }
79    }
80    ///// 发送给指定对象
81    //pub fn send_user(&mut self, user_user: &str, data: JsonValue) {
82    //    if USERS.get(user_user).is_some() {
83    //        for mut user in USERS.iter_mut() {
84    //            if user.user_user == user_user {
85    //                user.send(data.clone());
86    //                return;
87    //            }
88    //        }
89    //    }
90    //}
91    ///// 发送给指定的企业
92    //pub fn send_org(&mut self, org_org: &str, data: JsonValue) {
93    //    if USERS.get(org_org).is_some() {
94    //        for mut user in USERS.iter_mut() {
95    //            if user.org_org == org_org {
96    //                user.send(data.clone());
97    //                return;
98    //            }
99    //        }
100    //    }
101    //}
102    /// 在线人数
103    pub fn online_users(&mut self) -> usize {
104        USERS.len()
105    }
106    pub fn handle(&mut self) -> Result<(), HttpError> {
107        let (send, receive) = channel();
108        self.send = Some(send);
109        self.on_frame()?;
110        let mut factory = (self.response.factory)(self.clone());
111        USERS.insert(self.key.to_string(), self.clone());
112        factory.on_open()?;
113        let that = self.clone();
114        let scheme = self.response.request.scheme.clone();
115
116        let thr = thread::spawn(move || -> Result<(), HttpError> {
117            loop {
118                let msgs = scheme.lock().unwrap().read_ws_data();
119                let msg = match msgs {
120                    Ok(e) => e,
121                    Err(_) => return Ok(())
122                };
123                match msg.message_type {
124                    MessageType::TimeOut => continue,
125                    _ => {
126                        match that.send.clone().unwrap().send(msg) {
127                            Ok(()) => continue,
128                            Err(_) => return Ok(())
129                        }
130                    }
131                }
132            }
133        });
134        let that = self.clone();
135        let scheme = self.response.request.scheme.clone();
136        let key = self.key.clone();
137        thread::spawn(move || -> io::Result<()> {
138            let mut factory = (that.response.factory)(that.clone());
139            loop {
140                match receive.recv() {
141                    Ok(msg) => {
142                        match msg.message_type {
143                            MessageType::TimeOut => continue,
144                            MessageType::Close => {
145                                if USERS.get(&key).is_some() {
146                                    USERS.remove(&key);
147                                }
148                                factory.on_close(msg.close.clone(), &msg.text);
149                                if let Ok(()) = scheme.lock().unwrap().write_all(&Message::send_close(CloseCode::ServerClose, "客户退出关闭")) {};
150                                return Ok(());
151                            }
152                            MessageType::Pong => {
153                                info!("接收到一个Pong: {:?} {:?} {:?}", msg.mode, msg.message_type, msg.payload);
154                            }
155                            MessageType::Binary | MessageType::Text => {
156                                info!("接收到数据: {:?}", msg);
157                                if let Ok(()) = factory.on_message(msg) {};
158                            }
159                            _ => {
160                                info!("Client有数据: {:?} {:?} {}", msg.mode, msg.message_type, msg.text.clone());
161                                return Ok(());
162                            }
163                        }
164                    }
165                    Err(_) => return Ok(()),
166                }
167            }
168        });
169        let _ = thr.join().unwrap();
170        Ok(())
171    }
172}
173impl Handler for Websocket {
174    fn on_request(&mut self, _request: Request, _response: &mut Response) {}
175    fn on_frame(&mut self) -> Result<(), HttpError> {
176        self.key = self.request.header["sec-websocket-key"].as_str().unwrap_or("").to_string();
177        self.version = self.request.header["sec-websocket-version"].as_str().unwrap_or("").to_string();
178        self.response.header("Upgrade", "websocket");
179        self.response.header("Connection", "Upgrade");
180        let sec_websocket_accept = br_crypto::sha1::encrypt_base64(format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", self.key).as_bytes());
181        self.response.header("Sec-WebSocket-Accept", sec_websocket_accept.as_str());
182        self.response.status(101).send()?;
183        Ok(())
184    }
185}
186#[derive(Debug, Clone)]
187pub struct Message {
188    pub mode: MessageMode,
189    pub message_type: MessageType,
190    pub payload: Vec<u8>, // 消息载荷,以字节向量形式表示
191    pub text: String,
192    pub close: CloseCode,
193    pub error: ErrorCode,
194}
195
196impl Message {
197    #[must_use]
198    pub fn msg_error() -> Self {
199        Message {
200            mode: MessageMode::Client,
201            message_type: MessageType::Error,
202            payload: vec![],
203            text: "长度不够".to_string(),
204            close: CloseCode::None,
205            error: ErrorCode::SendingDataFailed,
206        }
207    }
208    // 解析WebSocket消息
209    pub fn parse_message(data: &mut Vec<u8>) -> Message {
210        println!("{data:?}");
211
212        // 检查数据是否足够长来包含消息类型和载荷长度
213        if data.len() < 2 {
214            return Message {
215                mode: MessageMode::Client,
216                message_type: MessageType::Error,
217                payload: vec![],
218                text: "长度不够".to_string(),
219                close: CloseCode::None,
220                error: ErrorCode::SendingDataFailed,
221            };
222        }
223
224        let header = data.drain(..2).collect::<Vec<u8>>();
225
226        // 解析帧头
227        let _fin = (header[0] & 0b1000_0000) != 0;
228        let opcode = header[0] & 0b0000_1111;
229        let masked = (header[1] & 0b1000_0000) != 0;
230        let len_flag = header[1] & 0b0111_1111;
231        let mut payload_data = Vec::new();
232        let message_tpye = MessageType::from(opcode);
233        println!("fin: {:#?} message_tpye: {:?} opcode: {} masked: {} len_flag: {}", _fin, message_tpye, opcode, masked, len_flag);
234        match message_tpye {
235            MessageType::Text => {
236                let payload_length = match len_flag {
237                    0..=125 => len_flag as usize,
238                    126 => {
239                        let ext = data.drain(..2).collect::<Vec<u8>>();
240                        u16::from_be_bytes([ext[0], ext[1]]) as usize
241                    }
242                    127 => {
243                        let ext = data.drain(..8).collect::<Vec<u8>>();
244                        u64::from_be_bytes([
245                            ext[0], ext[1], ext[2], ext[3],
246                            ext[4], ext[5], ext[6], ext[7],
247                        ]) as usize
248                    }
249                    _ => return Message {
250                        mode: MessageMode::Client,
251                        message_type: MessageType::Error,
252                        payload: vec![],
253                        text: "数据格式错误".to_string(),
254                        close: CloseCode::None,
255                        error: ErrorCode::SendingDataFailed,
256                    }
257                };
258                if masked {
259                    if data.len() < payload_length {
260                        return Message {
261                            mode: MessageMode::Client,
262                            message_type: message_tpye,
263                            payload: payload_data,
264                            text: "继续加载".to_string(),
265                            close: CloseCode::None,
266                            error: ErrorCode::None,
267                        };
268                    }
269                    let mask_key = data.drain(..4).collect::<Vec<u8>>();
270                    let payload = &data[..payload_length];
271                    for i in 0..payload.len() {
272                        payload_data.push(payload[i] ^ mask_key[i % 4]);
273                    }
274                } else {
275                    if data.len() < payload_length {
276                        return Message {
277                            mode: MessageMode::Client,
278                            message_type: message_tpye,
279                            payload: payload_data,
280                            text: "继续加载".to_string(),
281                            close: CloseCode::None,
282                            error: ErrorCode::None,
283                        };
284                    }
285                    let t = data.drain(..payload_length).collect::<Vec<u8>>();
286                    payload_data.extend_from_slice(&t);
287                }
288                let text = unsafe { String::from_utf8_unchecked(payload_data.clone()) };
289                Message {
290                    mode: MessageMode::Client,
291                    message_type: message_tpye,
292                    payload: payload_data,
293                    text: text.to_string(),
294                    close: CloseCode::None,
295                    error: ErrorCode::None,
296                }
297            }
298            MessageType::Binary => Message {
299                mode: MessageMode::Client,
300                message_type: message_tpye,
301                payload: payload_data,
302                text: String::new(),
303                close: CloseCode::None,
304                error: ErrorCode::None,
305            },
306            MessageType::Continuation => Message {
307                mode: MessageMode::Client,
308                message_type: message_tpye,
309                payload: payload_data,
310                text: "继续加载".to_string(),
311                close: CloseCode::None,
312                error: ErrorCode::None,
313            },
314            MessageType::Close => Message {
315                mode: MessageMode::Client,
316                message_type: message_tpye,
317                payload: payload_data,
318                text: "客户端关闭".to_string(),
319                close: CloseCode::ClientClose,
320                error: ErrorCode::None,
321            },
322            MessageType::Ping => Message {
323                mode: MessageMode::Client,
324                message_type: message_tpye,
325                payload: payload_data,
326                text: "Ping".to_string(),
327                close: CloseCode::None,
328                error: ErrorCode::None,
329            },
330            MessageType::Pong => Message {
331                mode: MessageMode::Client,
332                message_type: message_tpye,
333                payload: payload_data,
334                text: "Pong".to_string(),
335                close: CloseCode::None,
336                error: ErrorCode::None,
337            },
338            MessageType::Error => {
339                Message {
340                    mode: MessageMode::Client,
341                    message_type: message_tpye,
342                    payload: vec![],
343                    text: String::new(),
344                    close: CloseCode::None,
345                    error: ErrorCode::Unknown,
346                }
347            }
348            MessageType::None => Message {
349                mode: MessageMode::Client,
350                message_type: message_tpye,
351                payload: vec![],
352                text: String::new(),
353                close: CloseCode::None,
354                error: ErrorCode::None,
355            },
356            MessageType::TimeOut => Message {
357                mode: MessageMode::Client,
358                message_type: message_tpye,
359                payload: vec![],
360                text: String::new(),
361                close: CloseCode::None,
362                error: ErrorCode::TimeOut,
363            }
364        }
365    }
366    pub fn send_message(&mut self) -> Vec<u8> {
367        let mut frame = Vec::new();
368
369        // 第1字节:FIN + RSV1-3 + OPCODE
370        let opcode = self.clone().message_type.to_u8();
371        let mut byte1 = opcode & 0x0F;
372        byte1 |= 0x80; // FIN = 1
373
374        frame.push(byte1);
375
376        // 第2字节:MASK = 0(服务器发送),+ payload length
377        let payload_len = self.payload.len();
378        if payload_len < 126 {
379            frame.push(payload_len as u8);
380        } else if payload_len <= 65535 {
381            frame.push(126);
382            frame.extend_from_slice(&u16::try_from(payload_len).unwrap().to_be_bytes());
383        } else {
384            frame.push(127);
385            frame.extend_from_slice(&(payload_len as u64).to_be_bytes());
386        }
387        frame.extend_from_slice(&self.payload);
388        frame
389    }
390    #[must_use]
391    pub fn send_close(code: CloseCode, reason: &str) -> Vec<u8> {
392        let mut frame = Vec::new();
393        frame.push(0x88);
394        let payload_len = code.clone().to_u16().to_be_bytes().len() + reason.len();
395        frame.push(u8::try_from(payload_len).unwrap());
396        frame.extend(&code.to_u16().to_be_bytes());
397        frame.extend(reason.as_bytes());
398        frame
399    }
400}
401#[derive(Debug, Clone)]
402pub enum MessageType {
403    /// 文本
404    Text,
405    Continuation,
406    /// 客户端关闭
407    Close,
408    Binary,
409    Ping,
410    Pong,
411    None,
412    TimeOut,
413    Error,
414}
415
416impl MessageType {
417    #[must_use]
418    pub fn from(types: u8) -> Self {
419        match types {
420            0x0 => Self::Continuation,
421            0x1 => Self::Text,
422            0x2 => Self::Binary,
423            0x8 => Self::Close,
424            0x9 => Self::Ping,
425            0xa => Self::Pong,
426            _ => Self::None,
427        }
428    }
429    #[must_use]
430    pub fn to_u8(self) -> u8 {
431        match self {
432            MessageType::Text => 0x1,
433            MessageType::Continuation | MessageType::None | MessageType::Error | MessageType::TimeOut => 0x0,
434            MessageType::Close => 0x8,
435            MessageType::Binary => 0x2,
436            MessageType::Ping => 0x9,
437            MessageType::Pong => 0xa,
438        }
439    }
440}
441#[derive(Debug, Clone)]
442pub enum CloseCode {
443    /// 客户端主动关闭
444    ClientClose,
445    /// 服务端主动关闭
446    ServerClose,
447    /// 正常关闭
448    NormalClosure,
449    GoingAway,
450    /// 协议错误
451    ProtocolError,
452    /// 其它错误
453    Other,
454    None,
455}
456impl CloseCode {
457    #[must_use]
458    pub fn from_err(_err: ErrorCode) -> CloseCode {
459        CloseCode::None
460    }
461    #[must_use]
462    pub fn str(&self) -> String {
463        match self {
464            CloseCode::ClientClose => "客户端主动关闭",
465            CloseCode::ServerClose => "服务端主动关闭",
466            CloseCode::None => "未知关闭",
467            CloseCode::NormalClosure => "正常关闭",
468            CloseCode::GoingAway => "对方离开",
469            CloseCode::ProtocolError => "协议错误",
470            CloseCode::Other => "其它错误",
471        }.to_string()
472    }
473    #[must_use]
474    pub fn to_u16(self) -> u16 {
475        match self {
476            CloseCode::NormalClosure => 1000,
477            CloseCode::GoingAway => 1001,
478            CloseCode::ProtocolError => 1002,
479            CloseCode::ClientClose => 1003,
480            CloseCode::ServerClose => 1004,
481            CloseCode::Other => 1005,
482            CloseCode::None => 1006,
483        }
484    }
485}
486#[derive(Debug, Clone, Copy)]
487pub enum ErrorCode {
488    /// 发送数据失败
489    SendingDataFailed,
490    /// Unknown request error
491    Unknown,
492    /// 线程异常
493    ThreadException,
494    /// 超时
495    TimeOut,
496    None,
497}
498#[derive(Debug, Clone)]
499pub enum MessageMode {
500    Client,
501    Server,
502}
503
504pub struct NoticeMsg {
505    /// 消息类型
506    pub types: Types,
507    /// 消息实体内容
508    pub msg: JsonValue,
509    /// 消息创建时间
510    pub timestamp: i64,
511    /// 监听通道名称
512    pub channel: String,
513    pub user: String,
514    pub org: String,
515}
516impl NoticeMsg {
517    pub fn json(&mut self) -> JsonValue {
518        object! {
519            type:"notice",
520            channel: self.channel.clone(),
521            msg: self.msg.clone(),
522            timestamp: self.timestamp,
523        }
524    }
525}
526
527pub enum Types {
528    /// 全体消息
529    All,
530    /// 指定用户消息
531    User,
532    /// 指定企业消息
533    Org,
534}