1use crate::request::Request;
2use crate::response::Response;
3use crate::{Handler, HttpError};
4use json::{object, JsonValue};
5use std::sync::mpsc::{channel, Sender};
6use std::sync::{Mutex};
7use std::{io, thread};
8use dashmap::DashMap;
9use log::{info};
10
/// Process-wide registry of WebSocket connections, keyed by the connection's
/// handshake key. `Websocket::handle` inserts an entry for each connection and
/// the entry is removed when a `Close` message is processed for it.
pub static USERS: std::sync::LazyLock<DashMap<String, Websocket>> = std::sync::LazyLock::new(DashMap::new);
/// Mutex-guarded list of [`NoticeMsg`] values, created empty on first access.
/// Nothing in this part of the file reads or writes it; presumably a queue of
/// pending notifications — confirm against the code that fills and drains it.
pub static WS_NOTICE: std::sync::LazyLock<Mutex<Vec<NoticeMsg>>> = std::sync::LazyLock::new(|| Mutex::new(Vec::new()));
/// State of one WebSocket connection: the HTTP request/response pair it was
/// upgraded from plus the channel used to feed messages to its worker loop.
#[derive(Debug, Clone)]
pub struct Websocket {
    /// Sending half of the channel created by `handle`; `None` until `handle`
    /// has run.
    pub send: Option<Sender<Message>>,
    /// Value of the `sec-websocket-key` request header, stored by `on_frame`.
    /// `handle` also uses it as this connection's key in `USERS`.
    pub key: String,
    /// Starts out empty and is not assigned anywhere in this part of the file.
    /// Presumably the id of the connected user — confirm against the code that
    /// sets it.
    pub user_user: String,
    /// Starts out empty and is not assigned anywhere in this part of the file.
    /// Presumably the id of the user's organisation — confirm against the code
    /// that sets it.
    pub org_org: String,
    /// Value of the `sec-websocket-version` request header, stored by
    /// `on_frame`.
    version: String,
    /// The HTTP request that asked for the upgrade.
    request: Request,
    /// The response object used to send the `101` handshake reply; it also
    /// provides the handler factory and the connection stream used by `handle`.
    response: Response,
}
24
25
26impl Websocket {
27 #[must_use]
28 pub fn http(request: Request, response: Response) -> Self {
29 Self {
30 send: None,
31 request,
32 key: String::new(),
33 user_user: "".to_string(),
34 org_org: "".to_string(),
35 version: String::new(),
36 response,
37 }
38 }
39 pub fn new(request: Request, response: Response) -> Self {
40 Self {
41 send: None,
42 request,
43 key: String::new(),
44 user_user: "".to_string(),
45 org_org: String::new(),
46 version: String::new(),
47 response,
48 }
49 }
50 pub fn send(&mut self, data: &JsonValue) {
52 let msg = Message {
53 mode: MessageMode::Server,
54 message_type: MessageType::Text,
55 payload: data.to_string().into_bytes(),
56 text: data.to_string(),
57 close: CloseCode::None,
58 error: ErrorCode::None,
59 };
60 match self.send.clone().unwrap().send(msg) {
61 Ok(()) => (),
62 Err(_) => self.on_error(ErrorCode::SendingDataFailed),
63 }
64 }
65 pub fn close(&mut self, code: CloseCode, reason: &str) {
67 let msg = Message {
68 mode: MessageMode::Server,
69 message_type: MessageType::Close,
70 payload: reason.as_bytes().to_vec(),
71 text: reason.to_string(),
72 close: code,
73 error: ErrorCode::None,
74 };
75 match self.send.clone().unwrap().send(msg) {
76 Ok(()) => (),
77 Err(_) => self.on_error(ErrorCode::SendingDataFailed),
78 }
79 }
80 pub fn online_users(&mut self) -> usize {
104 USERS.len()
105 }
106 pub fn handle(&mut self) -> Result<(), HttpError> {
107 let (send, receive) = channel();
108 self.send = Some(send);
109 self.on_frame()?;
110 let mut factory = (self.response.factory)(self.clone());
111 USERS.insert(self.key.to_string(), self.clone());
112 factory.on_open()?;
113 let that = self.clone();
114 let scheme = self.response.request.scheme.clone();
115
116 let thr = thread::spawn(move || -> Result<(), HttpError> {
117 loop {
118 let msgs = scheme.lock().unwrap().read_ws_data();
119 let msg = match msgs {
120 Ok(e) => e,
121 Err(_) => return Ok(())
122 };
123 match msg.message_type {
124 MessageType::TimeOut => continue,
125 _ => {
126 match that.send.clone().unwrap().send(msg) {
127 Ok(()) => continue,
128 Err(_) => return Ok(())
129 }
130 }
131 }
132 }
133 });
134 let that = self.clone();
135 let scheme = self.response.request.scheme.clone();
136 let key = self.key.clone();
137 thread::spawn(move || -> io::Result<()> {
138 let mut factory = (that.response.factory)(that.clone());
139 loop {
140 match receive.recv() {
141 Ok(msg) => {
142 match msg.message_type {
143 MessageType::TimeOut => continue,
144 MessageType::Close => {
145 if USERS.get(&key).is_some() {
146 USERS.remove(&key);
147 }
148 factory.on_close(msg.close.clone(), &msg.text);
149 if let Ok(()) = scheme.lock().unwrap().write_all(&Message::send_close(CloseCode::ServerClose, "客户退出关闭")) {};
150 return Ok(());
151 }
152 MessageType::Pong => {
153 info!("接收到一个Pong: {:?} {:?} {:?}", msg.mode, msg.message_type, msg.payload);
154 }
155 MessageType::Binary | MessageType::Text => {
156 info!("接收到数据: {:?}", msg);
157 if let Ok(()) = factory.on_message(msg) {};
158 }
159 _ => {
160 info!("Client有数据: {:?} {:?} {}", msg.mode, msg.message_type, msg.text.clone());
161 return Ok(());
162 }
163 }
164 }
165 Err(_) => return Ok(()),
166 }
167 }
168 });
169 let _ = thr.join().unwrap();
170 Ok(())
171 }
172}
173impl Handler for Websocket {
174 fn on_request(&mut self, _request: Request, _response: &mut Response) {}
175 fn on_frame(&mut self) -> Result<(), HttpError> {
176 self.key = self.request.header["sec-websocket-key"].as_str().unwrap_or("").to_string();
177 self.version = self.request.header["sec-websocket-version"].as_str().unwrap_or("").to_string();
178 self.response.header("Upgrade", "websocket");
179 self.response.header("Connection", "Upgrade");
180 let sec_websocket_accept = br_crypto::sha1::encrypt_base64(format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", self.key).as_bytes());
181 self.response.header("Sec-WebSocket-Accept", sec_websocket_accept.as_str());
182 self.response.status(101).send()?;
183 Ok(())
184 }
185}
/// One WebSocket message as passed through a connection's channel, whether it
/// was parsed from a client frame or created by the server.
#[derive(Debug, Clone)]
pub struct Message {
    /// Which side produced the message.
    pub mode: MessageMode,
    /// Kind of message (text, binary, close, ...).
    pub message_type: MessageType,
    /// Raw payload bytes.
    pub payload: Vec<u8>,
    /// Text form of the message: the decoded payload for text frames, otherwise
    /// a fixed status string or empty.
    pub text: String,
    /// Close code attached to the message; `CloseCode::None` when not closing.
    pub close: CloseCode,
    /// Error attached to the message; `ErrorCode::None` when there is none.
    pub error: ErrorCode,
}
195
196impl Message {
197 #[must_use]
198 pub fn msg_error() -> Self {
199 Message {
200 mode: MessageMode::Client,
201 message_type: MessageType::Error,
202 payload: vec![],
203 text: "长度不够".to_string(),
204 close: CloseCode::None,
205 error: ErrorCode::SendingDataFailed,
206 }
207 }
208 pub fn parse_message(data: &mut Vec<u8>) -> Message {
210 println!("{data:?}");
211
212 if data.len() < 2 {
214 return Message {
215 mode: MessageMode::Client,
216 message_type: MessageType::Error,
217 payload: vec![],
218 text: "长度不够".to_string(),
219 close: CloseCode::None,
220 error: ErrorCode::SendingDataFailed,
221 };
222 }
223
224 let header = data.drain(..2).collect::<Vec<u8>>();
225
226 let _fin = (header[0] & 0b1000_0000) != 0;
228 let opcode = header[0] & 0b0000_1111;
229 let masked = (header[1] & 0b1000_0000) != 0;
230 let len_flag = header[1] & 0b0111_1111;
231 let mut payload_data = Vec::new();
232 let message_tpye = MessageType::from(opcode);
233 println!("fin: {:#?} message_tpye: {:?} opcode: {} masked: {} len_flag: {}", _fin, message_tpye, opcode, masked, len_flag);
234 match message_tpye {
235 MessageType::Text => {
236 let payload_length = match len_flag {
237 0..=125 => len_flag as usize,
238 126 => {
239 let ext = data.drain(..2).collect::<Vec<u8>>();
240 u16::from_be_bytes([ext[0], ext[1]]) as usize
241 }
242 127 => {
243 let ext = data.drain(..8).collect::<Vec<u8>>();
244 u64::from_be_bytes([
245 ext[0], ext[1], ext[2], ext[3],
246 ext[4], ext[5], ext[6], ext[7],
247 ]) as usize
248 }
249 _ => return Message {
250 mode: MessageMode::Client,
251 message_type: MessageType::Error,
252 payload: vec![],
253 text: "数据格式错误".to_string(),
254 close: CloseCode::None,
255 error: ErrorCode::SendingDataFailed,
256 }
257 };
258 if masked {
259 if data.len() < payload_length {
260 return Message {
261 mode: MessageMode::Client,
262 message_type: message_tpye,
263 payload: payload_data,
264 text: "继续加载".to_string(),
265 close: CloseCode::None,
266 error: ErrorCode::None,
267 };
268 }
269 let mask_key = data.drain(..4).collect::<Vec<u8>>();
270 let payload = &data[..payload_length];
271 for i in 0..payload.len() {
272 payload_data.push(payload[i] ^ mask_key[i % 4]);
273 }
274 } else {
275 if data.len() < payload_length {
276 return Message {
277 mode: MessageMode::Client,
278 message_type: message_tpye,
279 payload: payload_data,
280 text: "继续加载".to_string(),
281 close: CloseCode::None,
282 error: ErrorCode::None,
283 };
284 }
285 let t = data.drain(..payload_length).collect::<Vec<u8>>();
286 payload_data.extend_from_slice(&t);
287 }
288 let text = unsafe { String::from_utf8_unchecked(payload_data.clone()) };
289 Message {
290 mode: MessageMode::Client,
291 message_type: message_tpye,
292 payload: payload_data,
293 text: text.to_string(),
294 close: CloseCode::None,
295 error: ErrorCode::None,
296 }
297 }
298 MessageType::Binary => Message {
299 mode: MessageMode::Client,
300 message_type: message_tpye,
301 payload: payload_data,
302 text: String::new(),
303 close: CloseCode::None,
304 error: ErrorCode::None,
305 },
306 MessageType::Continuation => Message {
307 mode: MessageMode::Client,
308 message_type: message_tpye,
309 payload: payload_data,
310 text: "继续加载".to_string(),
311 close: CloseCode::None,
312 error: ErrorCode::None,
313 },
314 MessageType::Close => Message {
315 mode: MessageMode::Client,
316 message_type: message_tpye,
317 payload: payload_data,
318 text: "客户端关闭".to_string(),
319 close: CloseCode::ClientClose,
320 error: ErrorCode::None,
321 },
322 MessageType::Ping => Message {
323 mode: MessageMode::Client,
324 message_type: message_tpye,
325 payload: payload_data,
326 text: "Ping".to_string(),
327 close: CloseCode::None,
328 error: ErrorCode::None,
329 },
330 MessageType::Pong => Message {
331 mode: MessageMode::Client,
332 message_type: message_tpye,
333 payload: payload_data,
334 text: "Pong".to_string(),
335 close: CloseCode::None,
336 error: ErrorCode::None,
337 },
338 MessageType::Error => {
339 Message {
340 mode: MessageMode::Client,
341 message_type: message_tpye,
342 payload: vec![],
343 text: String::new(),
344 close: CloseCode::None,
345 error: ErrorCode::Unknown,
346 }
347 }
348 MessageType::None => Message {
349 mode: MessageMode::Client,
350 message_type: message_tpye,
351 payload: vec![],
352 text: String::new(),
353 close: CloseCode::None,
354 error: ErrorCode::None,
355 },
356 MessageType::TimeOut => Message {
357 mode: MessageMode::Client,
358 message_type: message_tpye,
359 payload: vec![],
360 text: String::new(),
361 close: CloseCode::None,
362 error: ErrorCode::TimeOut,
363 }
364 }
365 }
366 pub fn send_message(&mut self) -> Vec<u8> {
367 let mut frame = Vec::new();
368
369 let opcode = self.clone().message_type.to_u8();
371 let mut byte1 = opcode & 0x0F;
372 byte1 |= 0x80; frame.push(byte1);
375
376 let payload_len = self.payload.len();
378 if payload_len < 126 {
379 frame.push(payload_len as u8);
380 } else if payload_len <= 65535 {
381 frame.push(126);
382 frame.extend_from_slice(&u16::try_from(payload_len).unwrap().to_be_bytes());
383 } else {
384 frame.push(127);
385 frame.extend_from_slice(&(payload_len as u64).to_be_bytes());
386 }
387 frame.extend_from_slice(&self.payload);
388 frame
389 }
390 #[must_use]
391 pub fn send_close(code: CloseCode, reason: &str) -> Vec<u8> {
392 let mut frame = Vec::new();
393 frame.push(0x88);
394 let payload_len = code.clone().to_u16().to_be_bytes().len() + reason.len();
395 frame.push(u8::try_from(payload_len).unwrap());
396 frame.extend(&code.to_u16().to_be_bytes());
397 frame.extend(reason.as_bytes());
398 frame
399 }
400}
/// Kind of a WebSocket message.
///
/// `Text`, `Continuation`, `Close`, `Binary`, `Ping` and `Pong` correspond to
/// frame opcodes; `None`, `TimeOut` and `Error` are internal markers that have
/// no opcode of their own.
///
/// The enum is `Copy` so that by-value methods such as [`MessageType::to_u8`]
/// can be called without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    /// Text frame (opcode `0x1`).
    Text,
    /// Continuation frame (opcode `0x0`).
    Continuation,
    /// Close frame (opcode `0x8`).
    Close,
    /// Binary frame (opcode `0x2`).
    Binary,
    /// Ping frame (opcode `0x9`).
    Ping,
    /// Pong frame (opcode `0xa`).
    Pong,
    /// Unrecognised opcode.
    None,
    /// Internal marker for a timed-out read.
    TimeOut,
    /// Internal marker for a message that could not be parsed.
    Error,
}

impl MessageType {
    /// Maps a frame opcode to its message type; any opcode not listed here
    /// becomes [`MessageType::None`].
    #[must_use]
    pub fn from(types: u8) -> Self {
        match types {
            0x0 => Self::Continuation,
            0x1 => Self::Text,
            0x2 => Self::Binary,
            0x8 => Self::Close,
            0x9 => Self::Ping,
            0xa => Self::Pong,
            _ => Self::None,
        }
    }

    /// Returns the frame opcode for this type. The internal markers (`None`,
    /// `Error`, `TimeOut`) share opcode `0x0` with `Continuation`.
    #[must_use]
    pub fn to_u8(self) -> u8 {
        match self {
            MessageType::Continuation
            | MessageType::None
            | MessageType::Error
            | MessageType::TimeOut => 0x0,
            MessageType::Text => 0x1,
            MessageType::Binary => 0x2,
            MessageType::Close => 0x8,
            MessageType::Ping => 0x9,
            MessageType::Pong => 0xa,
        }
    }
}
441#[derive(Debug, Clone)]
442pub enum CloseCode {
443 ClientClose,
445 ServerClose,
447 NormalClosure,
449 GoingAway,
450 ProtocolError,
452 Other,
454 None,
455}
456impl CloseCode {
457 #[must_use]
458 pub fn from_err(_err: ErrorCode) -> CloseCode {
459 CloseCode::None
460 }
461 #[must_use]
462 pub fn str(&self) -> String {
463 match self {
464 CloseCode::ClientClose => "客户端主动关闭",
465 CloseCode::ServerClose => "服务端主动关闭",
466 CloseCode::None => "未知关闭",
467 CloseCode::NormalClosure => "正常关闭",
468 CloseCode::GoingAway => "对方离开",
469 CloseCode::ProtocolError => "协议错误",
470 CloseCode::Other => "其它错误",
471 }.to_string()
472 }
473 #[must_use]
474 pub fn to_u16(self) -> u16 {
475 match self {
476 CloseCode::NormalClosure => 1000,
477 CloseCode::GoingAway => 1001,
478 CloseCode::ProtocolError => 1002,
479 CloseCode::ClientClose => 1003,
480 CloseCode::ServerClose => 1004,
481 CloseCode::Other => 1005,
482 CloseCode::None => 1006,
483 }
484 }
485}
/// Error attached to a [`Message`] or reported through `on_error`.
#[derive(Debug, Clone, Copy)]
pub enum ErrorCode {
    /// A message could not be delivered; also used by the parser for input
    /// that is too short to be a frame.
    SendingDataFailed,
    /// Unspecified error.
    Unknown,
    /// Presumably a failure of one of the connection threads — confirm against
    /// the code that reports it.
    ThreadException,
    /// A read timed out.
    TimeOut,
    /// No error.
    None,
}
/// Origin of a [`Message`].
#[derive(Debug, Clone)]
pub enum MessageMode {
    /// The message was parsed from data sent by the client.
    Client,
    /// The message was created on the server side.
    Server,
}
503
/// A notification held in `WS_NOTICE`.
pub struct NoticeMsg {
    /// Kind of notice. Presumably selects who should receive it (everyone, one
    /// user, one organisation) — confirm against the code that dispatches it.
    pub types: Types,
    /// JSON body of the notice; emitted as `msg` by `json`.
    pub msg: JsonValue,
    /// Timestamp emitted as `timestamp` by `json`; the unit is not visible
    /// here — confirm with the producer.
    pub timestamp: i64,
    /// Channel name emitted as `channel` by `json`.
    pub channel: String,
    /// Not used in this part of the file. Presumably the target user for
    /// `Types::User` notices — confirm.
    pub user: String,
    /// Not used in this part of the file. Presumably the target organisation
    /// for `Types::Org` notices — confirm.
    pub org: String,
}
516impl NoticeMsg {
517 pub fn json(&mut self) -> JsonValue {
518 object! {
519 type:"notice",
520 channel: self.channel.clone(),
521 msg: self.msg.clone(),
522 timestamp: self.timestamp,
523 }
524 }
525}
526
/// Kind of a [`NoticeMsg`]. The variants are not interpreted in this part of
/// the file; the descriptions below are inferred from their names — confirm
/// against the code that dispatches notices.
pub enum Types {
    /// Presumably a notice for every connection.
    All,
    /// Presumably a notice for a single user.
    User,
    /// Presumably a notice for a single organisation.
    Org,
}