msg_transmitter/
uds.rs

1extern crate tokio_uds;
2extern crate tokio;
3extern crate futures;
4extern crate tokio_codec;
5extern crate serde;
6extern crate bincode;
7extern crate bytes;
8
9use super::*;
10use std::path::Path;
11use self::tokio_uds::{UnixStream, UnixListener};
12
13#[derive(Debug)]
14pub struct UDSMsgServer<T> {
15    // path is the Path bounded by UnixListener.
16    // connetions is used to map client's name to sender of channel.
17    pub path_name: String,
18    pub name: String,
19    pub connections: Arc<Mutex<HashMap<String, mpsc::Sender<Option<T>>>>>,
20}
21
22impl<T> UDSMsgServer<T>
23    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
24{
25    /// *addr* is socket address. like: 127.0.0.1:6666.
26    /// *name* is the server's name, to identity which server it is.
27    pub fn new(path_name: &str, server_name: &str) -> UDSMsgServer<T> {
28        UDSMsgServer {
29            path_name: String::from(path_name),
30            name: String::from(server_name),
31            connections: Arc::new(Mutex::new(HashMap::new())),
32        }
33    }
34
35    /// *first_msg* is the first message that server send to the client which just
36    /// connect to server.
37    /// *process_fuction* receive a tuple of <client_name, message>, and return
38    /// a series of tuple of (client_name,message) indicating which message will be
39    /// sent to which client. Note that if you want to send a message to current
40    /// client, you should set client_name as a string with 0 length, i.e. ""
41    pub fn start_server<F>(&self, first_msg: T,
42                           process_function: F)
43                           -> Box<Future<Item=(), Error=()> + Send + 'static>
44        where F: FnMut(String, T) -> Vec<(String, T)> + Send + Sync + 'static + Clone
45    {
46        let path = Path::new(&self.path_name);
47        let listener = UnixListener::bind(path)
48            .expect("unable to bind Unix listener");
49        start_server(listener.incoming(),
50                     first_msg,
51                     process_function,
52                     self.name.clone(),
53                     self.connections.clone())
54    }
55}
56
57#[derive(Debug)]
58pub struct UDSMsgClient<T> {
59    // path_name is the Path connected by client.
60    // phantom is just used to avoid compile error.
61    path_name: String,
62    name: String,
63    phantom: PhantomData<T>,
64}
65
66impl<T> UDSMsgClient<T>
67    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
68{
69    /// *addr* is socket address. like: 127.0.0.1:6666.
70    /// *name* is the client's name, to identity which client it is.
71    pub fn new(path_name: &str, client_name: &str) -> UDSMsgClient<T> {
72        UDSMsgClient {
73            path_name: String::from(path_name),
74            name: String::from(client_name),
75            phantom: PhantomData,
76        }
77    }
78
79    /// process_function receive a message from server and send a series
80    /// of messages to server
81    pub fn start_client<F>(&self, process_function: F) -> Box<Future<Item=(), Error=()> + Send + 'static>
82        where F: FnMut(T) -> Vec<T> + Send + Sync + 'static
83    {
84        start_client(UnixStream::connect(Path::new(&self.path_name)),
85                     self.name.clone(),
86                     process_function)
87    }
88}