1extern crate tokio;
2extern crate futures;
3extern crate tokio_codec;
4extern crate serde;
5extern crate bincode;
6extern crate bytes;
7
8use std::net::SocketAddr;
9
10use super::*;
11
/// Configuration and shared state for a TCP message server exchanging
/// messages of type `T`.
///
/// The struct only holds data; the listener is created when
/// `start_server` is called on it.
#[derive(Debug)]
pub struct TCPMsgServer<T> {
    /// Socket address the listener is bound to.
    pub addr: SocketAddr,
    /// Name given to this server at construction time.
    pub name: String,
    /// Shared, mutex-guarded table of channel senders carrying `Option<T>`.
    ///
    /// NOTE(review): the `String` key is presumably the peer/client name and
    /// `None` presumably a control signal -- confirm against the module-level
    /// `start_server` that populates this map.
    pub connections: Arc<Mutex<HashMap<String, mpsc::Sender<Option<T>>>>>,
}
20
21impl<T> TCPMsgServer<T>
22 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
23{
24 pub fn new(addr: &str, server_name: &str) -> TCPMsgServer<T> {
27 let socket_addr = addr.parse::<SocketAddr>().unwrap();
28 TCPMsgServer {
29 addr: socket_addr,
30 name: String::from(server_name),
31 connections: Arc::new(Mutex::new(HashMap::new())),
32 }
33 }
34
35 pub fn start_server<F>(&self, first_msg: T,
42 process_function: F)
43 -> Box<Future<Item=(), Error=()> + Send + 'static>
44 where F: FnMut(String, T) -> Vec<(String, T)> + Send + Sync + 'static + Clone
45 {
46 let listener = net::TcpListener::bind(&self.addr)
47 .expect("unable to bind TCP listener");
48 start_server(listener.incoming(),
49 first_msg,
50 process_function,
51 self.name.clone(),
52 self.connections.clone())
53 }
55}
56
/// Configuration for a TCP message client exchanging messages of type `T`.
///
/// The struct only holds data; the connection is initiated when
/// `start_client` is called on it.
#[derive(Debug)]
pub struct TCPMsgClient<T> {
    /// Socket address the client connects to.
    connect_addr: SocketAddr,
    /// Name given to this client at construction time.
    name: String,
    /// Marker tying the client to its message type; no `T` value is stored.
    phantom: PhantomData<T>,
}
65
66impl<T> TCPMsgClient<T>
67 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
68{
69 pub fn new(addr: &str, client_name: &str) -> TCPMsgClient<T> {
72 let socket_addr = addr.parse::<SocketAddr>().unwrap();
73 TCPMsgClient {
74 connect_addr: socket_addr,
75 name: String::from(client_name),
76 phantom: PhantomData,
77 }
78 }
79
80 pub fn start_client<F>(&self, process_function: F) -> Box<Future<Item=(), Error=()> + Send + 'static>
83 where F: FnMut(T) -> Vec<T> + Send + Sync + 'static
84 {
85 start_client(net::TcpStream::connect(&self.connect_addr),
86 self.name.clone(),
87 process_function)
88
89 }
91}