1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//! WebSocket server side of the unmp link layer.

use std::sync::{Arc, Mutex};
use std::thread;
use unmp::link::{Driver, Link, Recver};
use unmp::net;
use log::{trace, warn};
use websocket::OwnedMessage;

/// Link driver for the server side of a WebSocket connection.
///
/// One instance is created per accepted client and handed to the `Link`
/// built for that client.
struct Server {
    /// Write half of the client's WebSocket stream. It is wrapped in a `Mutex`
    /// because both `Driver::send` and the connection's receive loop write to it.
    socket: Mutex<websocket::sender::Writer<std::net::TcpStream>>,
}
impl Driver for Server {
    fn send(&self, buf: &[u8]) {
        trace!("websocket_server send: {:02X?}.", buf);
        let mut socket = self.socket.lock().unwrap();
        socket
            .send_message(&OwnedMessage::Binary(buf.into()))
            .expect("websocket_server send error");
    }
}

/// 创建一个链路实例,并监听指定端口
pub fn start(ip: &str, port: u16) {
    let server = websocket::sync::Server::bind((ip, port)).unwrap();

    thread::spawn(|| {
        for request in server.filter_map(Result::ok) {
            thread::spawn(|| {
                if !request.protocols().contains(&"unmp".to_string()) {
                    request.reject().unwrap();
                    return;
                }
                let client = request.use_protocol("unmp").accept().unwrap();
                let addr = client.peer_addr().unwrap();
                let (mut receiver, sender) = client.split().unwrap();
                let driver = Arc::new(Server {
                    socket: Mutex::new(sender),
                });
                let link = Link::new(
                    format!("WebSocket_Server({}:{})", addr.ip(), addr.port()),
                    driver.clone(),
                );

                let driver_tmp = driver.clone();
                let link_tmp = link.clone();
                for message in receiver.incoming_messages() {
                    let mut recver: Recver = Recver::new();
                    match message {
                        Ok(m) => match m {
                            OwnedMessage::Binary(buf) => {
                                let mut buf: &[u8] = &buf;
                                trace!("websocket_server recv: {:02X?}.", buf);

                                // 接收数据帧
                                while let Some(load) = {
                                    let load = recver.recv(buf);
                                    buf = &[];
                                    load
                                } {
                                    net::recv_handle(link_tmp.clone(), &load);
                                }
                            }
                            _ => {}
                        },
                        Err(e) => {
                            warn!("websocket_server recv error: {:?}", e);
                            let mut socket = driver_tmp.socket.lock().unwrap();
                            let _ = socket.send_message(&OwnedMessage::Close(None));
                            return;
                        }
                    };
                }
            });
        }
    });
}