1use std::sync::{
2 Arc,
3 atomic::{AtomicBool, Ordering}
4};
5use std::thread;
6use std::net::ToSocketAddrs;
7use std::time::Duration;
8use std::io::Write;
9
10use queen_io::net::tcp::TcpStream;
11use queen_io::queue::mpsc::Queue;
12
13use nson::{Message, MessageId};
14
15use crate::net::{NetWork, Packet, CryptoOptions, Codec, KeepAlive};
16use crate::Wire;
17use crate::crypto::Crypto;
18use crate::dict::*;
19use crate::error::{Result, Error, Code};
20use crate::util::message::read_block;
21
22#[derive(Clone)]
23pub struct Port<C: Codec> {
24 inner: Arc<PortInner<C>>
25}
26
27struct PortInner<C: Codec> {
28 queue: Queue<Packet<C>>,
29 run: AtomicBool,
30 keep_alive: KeepAlive
31}
32
33impl<C: Codec> Port<C> {
34 pub fn new(keep_alive: KeepAlive) -> Result<Self> {
35 let port = Port {
36 inner: Arc::new(PortInner {
37 queue: Queue::new()?,
38 run: AtomicBool::new(true),
39 keep_alive
40 })
41 };
42
43 let mut net_work = NetWork::<C>::new(
44 port.inner.queue.clone(),
45 port.inner.keep_alive.clone()
46 )?;
47
48 let inner = port.inner.clone();
49
50 thread::Builder::new().name("port_net".to_string()).spawn(move || {
51 let ret = net_work.run();
52 if ret.is_err() {
53 log::error!("net thread exit: {:?}", ret);
54 } else {
55 log::debug!("net thread exit");
56 }
57
58 inner.run.store(false, Ordering::Relaxed);
59 }).unwrap();
60
61 Ok(port)
62 }
63
64 pub fn stop(&self) {
65 self.inner.run.store(false, Ordering::Relaxed);
66 self.inner.queue.push(Packet::Close);
67 }
68
69 pub fn running(&self) -> bool {
70 self.inner.run.load(Ordering::Relaxed)
71 }
72
73 pub fn connect<A: ToSocketAddrs>(
74 &self,
75 addr: A,
76 slot_id: MessageId,
77 root: bool,
78 mut attr: Message,
79 crypto_options: Option<CryptoOptions>,
80 capacity: Option<usize>
81 ) -> Result<Wire<Message>> {
82
83 if !self.running() {
84 return Err(Error::ConnectionAborted("port is not run!".to_string()))
85 }
86
87 let mut stream = TcpStream::connect(addr)?;
88
89 stream.set_nodelay(true)?;
90 stream.set_nonblocking(false)?;
92 stream.set_read_timeout(Some(Duration::from_secs(10)))?;
93 stream.set_write_timeout(Some(Duration::from_secs(10)))?;
94
95 attr.insert(CHAN, HAND);
96 attr.insert(ADDR, stream.peer_addr()?.to_string());
97 attr.insert(SECURE, false);
98 attr.insert(SLOT_ID, slot_id);
99 attr.insert(ROOT, root);
100
101 let crypto = crypto_options.map(|options| {
102 attr.insert(SECURE, true);
103 attr.insert(METHOD, options.method.as_str());
104
105 Crypto::new(&options.method, options.secret.as_bytes())
106 });
107
108 let mut codec = C::new();
109
110 let bytes = codec.encode(&None, attr)?;
111
112 stream.write_all(&bytes)?;
113
114 let bytes = read_block(&mut stream, Some(1024))?;
116 let mut message = codec.decode(&None, bytes)?;
117
118 if let Some(code) = Code::get(&message) {
119 if code == Code::Ok {
120 message.remove(CHAN);
121 message.remove(CODE);
122
123 stream.set_nonblocking(true)?;
124 stream.set_read_timeout(None)?;
125 stream.set_write_timeout(None)?;
126 let (wire1, wire2) = Wire::pipe(capacity.unwrap_or(64), message)?;
130
131 self.inner.queue.push(Packet::NewConn {
132 wire: wire1,
133 stream,
134 codec,
135 crypto
136 });
137
138 return Ok(wire2)
139 } else {
140 return Err(Error::ErrorCode(code))
141 }
142 }
143
144 Err(Error::InvalidData(format!("{}", message)))
145 }
146}
147
148impl<C: Codec> Drop for Port<C> {
149 fn drop(&mut self) {
150 if Arc::strong_count(&self.inner) <= 2 {
151 self.stop()
152 }
153 }
154}