queen/
port.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicBool, Ordering}
4};
5use std::thread;
6use std::net::ToSocketAddrs;
7use std::time::Duration;
8use std::io::Write;
9
10use queen_io::net::tcp::TcpStream;
11use queen_io::queue::mpsc::Queue;
12
13use nson::{Message, MessageId};
14
15use crate::net::{NetWork, Packet, CryptoOptions, Codec, KeepAlive};
16use crate::Wire;
17use crate::crypto::Crypto;
18use crate::dict::*;
19use crate::error::{Result, Error, Code};
20use crate::util::message::read_block;
21
22#[derive(Clone)]
23pub struct Port<C: Codec> {
24    inner: Arc<PortInner<C>>
25}
26
27struct PortInner<C: Codec> {
28    queue: Queue<Packet<C>>,
29    run: AtomicBool,
30    keep_alive: KeepAlive
31}
32
33impl<C: Codec> Port<C> {
34    pub fn new(keep_alive: KeepAlive) -> Result<Self> {
35        let port = Port {
36            inner: Arc::new(PortInner {
37                queue: Queue::new()?,
38                run: AtomicBool::new(true),
39                keep_alive
40            })
41        };
42
43        let mut net_work = NetWork::<C>::new(
44            port.inner.queue.clone(),
45            port.inner.keep_alive.clone()
46        )?;
47
48        let inner = port.inner.clone();
49
50        thread::Builder::new().name("port_net".to_string()).spawn(move || {
51            let ret = net_work.run();
52            if ret.is_err() {
53                log::error!("net thread exit: {:?}", ret);
54            } else {
55                log::debug!("net thread exit");
56            }
57
58            inner.run.store(false, Ordering::Relaxed);
59        }).unwrap();
60
61        Ok(port)
62    }
63
64    pub fn stop(&self) {
65        self.inner.run.store(false, Ordering::Relaxed);
66        self.inner.queue.push(Packet::Close);
67    }
68
69    pub fn running(&self) -> bool {
70        self.inner.run.load(Ordering::Relaxed)
71    }
72
73    pub fn connect<A: ToSocketAddrs>(
74        &self,
75        addr: A,
76        slot_id: MessageId,
77        root: bool,
78        mut attr: Message,
79        crypto_options: Option<CryptoOptions>,
80        capacity: Option<usize>
81    ) -> Result<Wire<Message>> {
82
83        if !self.running() {
84            return Err(Error::ConnectionAborted("port is not run!".to_string()))
85        }
86
87        let mut stream = TcpStream::connect(addr)?;
88
89        stream.set_nodelay(true)?;
90        // 握手开始
91        stream.set_nonblocking(false)?;
92        stream.set_read_timeout(Some(Duration::from_secs(10)))?;
93        stream.set_write_timeout(Some(Duration::from_secs(10)))?;
94
95        attr.insert(CHAN, HAND);
96        attr.insert(ADDR, stream.peer_addr()?.to_string());
97        attr.insert(SECURE, false);
98        attr.insert(SLOT_ID, slot_id);
99        attr.insert(ROOT, root);
100
101        let crypto = crypto_options.map(|options| {
102            attr.insert(SECURE, true);
103            attr.insert(METHOD, options.method.as_str());
104
105            Crypto::new(&options.method, options.secret.as_bytes())
106        });
107
108        let mut codec = C::new();
109
110        let bytes = codec.encode(&None, attr)?;
111
112        stream.write_all(&bytes)?;
113
114        // 握手时的消息,不能超过 1024 字节
115        let bytes = read_block(&mut stream, Some(1024))?;
116        let mut message = codec.decode(&None, bytes)?;
117
118        if let Some(code) = Code::get(&message) {
119            if code == Code::Ok {
120                message.remove(CHAN);
121                message.remove(CODE);
122
123                stream.set_nonblocking(true)?;
124                stream.set_read_timeout(None)?;
125                stream.set_write_timeout(None)?;
126                // 握手结束
127
128                // 握手消息可以被对端修改,这里将修改后的出入,以便能够携带一些自定义数据
129                let (wire1, wire2) = Wire::pipe(capacity.unwrap_or(64), message)?;
130
131                self.inner.queue.push(Packet::NewConn {
132                    wire: wire1,
133                    stream,
134                    codec,
135                    crypto
136                });
137
138                return Ok(wire2)
139            } else {
140                return Err(Error::ErrorCode(code))
141            }
142        }
143
144        Err(Error::InvalidData(format!("{}", message)))
145    }
146}
147
148impl<C: Codec> Drop for Port<C> {
149    fn drop(&mut self) {
150        if Arc::strong_count(&self.inner) <= 2 {
151            self.stop()
152        }
153    }
154}