msg_transmitter/
tcp.rs

1extern crate tokio;
2extern crate futures;
3extern crate tokio_codec;
4extern crate serde;
5extern crate bincode;
6extern crate bytes;
7
8use std::net::SocketAddr;
9
10use super::*;
11
/// Server side of the TCP message transmitter: the address to listen on,
/// the server's name, and the table of channels to connected clients.
#[derive(Debug)]
pub struct TCPMsgServer<T> {
    /// Socket address the server binds and listens on.
    pub addr: SocketAddr,
    /// Name used to identify this server.
    pub name: String,
    /// Maps each client's name to the sender half of that client's channel.
    pub connections: Arc<Mutex<HashMap<String, mpsc::Sender<Option<T>>>>>,
}
20
21impl<T> TCPMsgServer<T>
22    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
23{
24    /// *addr* is socket address. like: 127.0.0.1:6666.
25    /// *name* is the server's name, to identity which server it is.
26    pub fn new(addr: &str, server_name: &str) -> TCPMsgServer<T> {
27        let socket_addr = addr.parse::<SocketAddr>().unwrap();
28        TCPMsgServer {
29            addr: socket_addr,
30            name: String::from(server_name),
31            connections: Arc::new(Mutex::new(HashMap::new())),
32        }
33    }
34
35    /// *first_msg* is the first message that server send to the client which just
36    /// connect to server.
37    /// *process_fuction* receive a tuple of <client_name, message>, and return
38    /// a series of tuple of (client_name,message) indicating which message will be
39    /// sent to which client. Note that if you want to send a message to current
40    /// client, you should set client_name as a string with 0 length, i.e. "" .
41    pub fn start_server<F>(&self, first_msg: T,
42                           process_function: F)
43                           -> Box<Future<Item=(), Error=()> + Send + 'static>
44        where F: FnMut(String, T) -> Vec<(String, T)> + Send + Sync + 'static + Clone
45    {
46        let listener = net::TcpListener::bind(&self.addr)
47            .expect("unable to bind TCP listener");
48        start_server(listener.incoming(),
49                     first_msg,
50                     process_function,
51                     self.name.clone(),
52                     self.connections.clone())
53        //tokio::run(done);
54    }
55}
56
/// Client side of the TCP message transmitter: the server address to
/// connect to and the client's name.
#[derive(Debug)]
pub struct TCPMsgClient<T> {
    /// Socket address the client will connect to.
    connect_addr: SocketAddr,
    /// Name used to identify this client.
    name: String,
    /// Marker that ties the message type `T` to the struct; without it the
    /// otherwise unused type parameter would be a compile error.
    phantom: PhantomData<T>,
}
65
66impl<T> TCPMsgClient<T>
67    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
68{
69    /// *addr* is socket address. like: 127.0.0.1:6666.
70    /// *name* is the client's name, to identity which client it is.
71    pub fn new(addr: &str, client_name: &str) -> TCPMsgClient<T> {
72        let socket_addr = addr.parse::<SocketAddr>().unwrap();
73        TCPMsgClient {
74            connect_addr: socket_addr,
75            name: String::from(client_name),
76            phantom: PhantomData,
77        }
78    }
79
80    /// process_function receive a message from server and send a series
81    /// of messages to server
82    pub fn start_client<F>(&self, process_function: F) -> Box<Future<Item=(), Error=()> + Send + 'static>
83        where F: FnMut(T) -> Vec<T> + Send + Sync + 'static
84    {
85        start_client(net::TcpStream::connect(&self.connect_addr),
86                     self.name.clone(),
87                     process_function)
88
89        //tokio::run(done);
90    }
91}