1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering}
};

use std::thread;
use std::net::ToSocketAddrs;

use crate::net::{NetWork, Packet, CryptoOptions};
use crate::net::tcp_ext::TcpExt;
use crate::Stream;
use crate::dict::*;
use crate::nson::{msg, Message};
use crate::error::Result;

use queen_io::tcp::TcpStream;
use queen_io::queue::mpsc::Queue;

#[derive(Clone)]
pub struct Port {
    queue: Queue<Packet>,
    pub tcp_keep_alive: bool,
    pub tcp_keep_idle: u32,
    pub tcp_keep_intvl: u32,
    pub tcp_keep_cnt: u32,
    run: Arc<AtomicBool>
}

impl Port {
    pub fn new() -> Result<Port> {
        let port = Port {
            queue: Queue::new()?,
            tcp_keep_alive: true,
            tcp_keep_idle: 30,
            tcp_keep_intvl: 5,
            tcp_keep_cnt: 3,
            run: Arc::new(AtomicBool::new(true))
        };

        let mut net_work = NetWork::new(port.queue.clone(), port.run.clone())?;

        thread::Builder::new().name("port_net".to_string()).spawn(move || {
            let ret = net_work.run();
            if ret.is_err() {
                log::error!("net thread exit: {:?}", ret);
            } else {
                log::debug!("net thread exit");
            }

            net_work.run.store(false, Ordering::Relaxed);
        }).unwrap();

        Ok(port)
    }

    pub fn stop(&self) {
        self.run.store(false, Ordering::Relaxed);
    }

    pub fn is_run(&self) -> bool {
        self.run.load(Ordering::Relaxed)
    }

    pub fn connect<A: ToSocketAddrs>(
        &self,
        addr: A,
        attr: Message,
        crypto_options: Option<CryptoOptions>,
        capacity: Option<usize>
    ) -> Result<Stream<Message>> {
        let socket = TcpStream::connect(addr)?;

        socket.set_keep_alive(self.tcp_keep_alive)?;
        socket.set_keep_idle(self.tcp_keep_idle as i32)?;
        socket.set_keep_intvl(self.tcp_keep_intvl as i32)?;
        socket.set_keep_cnt(self.tcp_keep_cnt as i32)?;

        let mut attr2 = msg!{
            ADDR: socket.peer_addr()?.to_string(),
            SECURE: crypto_options.is_some()
        };

        attr2.extend(attr);

        let (stream1, stream2) = Stream::pipe(capacity.unwrap_or(64), attr2).unwrap();

        self.queue.push(Packet::NewConn {
            net_stream: socket,
            stream: stream1,
            options: crypto_options
        });

        Ok(stream2)
    }
}