msg_transmitter/
lib.rs

1//! # msg-transmitter
2//! ## Overview
//! This library implements a single-server, multiple-clients model. Its main purpose is to
//! help users focus on communication logic instead of low-level networking design.
//! Users can transmit any struct between the server and the clients.
6//!
//! Users can choose either a TCP-based or a UDS-based connection. Note that a TCP-based
//! connection supports both Windows and *nix, but a UDS-based connection supports only *nix.
9//!
//! ## Dependencies
//! - The main networking architecture is implemented with the asynchronous
//! framework [tokio](https://github.com/tokio-rs/tokio) and
//! [futures](https://github.com/rust-lang-nursery/futures-rs).
14//!
//! - User data is converted to bytes by the serialization framework
//! [serde](https://github.com/serde-rs/serde) and the binary encoder/decoder
//! crate [bincode](https://github.com/TyOverby/bincode).
18//!
19//! ## example
20//! Examples can be found [here](https://github.com/ChijinZ/msg-transmitter/tree/master/examples).
21//!
22//! ## Design
23//! Design can be found [here](https://github.com/ChijinZ/msg-transmitter/blob/dev/readme.md)
24//!
25//! This crate is created by ChijinZ(tlock.chijin@gmail.com).
26
27//#![deny(warnings, missing_debug_implementations)]
28
29
30extern crate tokio;
31extern crate futures;
32extern crate tokio_codec;
33extern crate serde;
34extern crate bincode;
35extern crate bytes;
36
37use tokio::net;
38use tokio::prelude::*;
39use bincode::{deserialize, serialize};
40use tokio::io;
41use tokio_codec::*;
42use bytes::{BufMut, BytesMut};
43use std::marker::PhantomData;
44use std::sync::{Arc, Mutex};
45use std::collections::HashMap;
46use futures::sync::mpsc;
47
/// Width, in bytes, of the length header that precedes every frame on the wire.
/// The header holds values below 2^32, i.e. exactly one `u32`.
const DATA_SIZE: usize = std::mem::size_of::<u32>();
50
/// Codec plugged into a tokio `Framed` transport: it turns messages into bytes and bytes back
/// into messages.
///
/// `T` is the user-defined message type. It is never stored, only tracked through
/// `PhantomData`.
#[derive(Debug)]
struct MessageCodec<T> {
    /// Name of the local endpoint; the encoder sends it as the payload of a register frame.
    name: String,
    /// Zero-sized marker binding the codec to the message type `T`.
    phantom: PhantomData<T>,
}

impl<T> MessageCodec<T> {
    /// Builds a codec for the endpoint called `name`.
    pub fn new(name: String) -> Self {
        Self { name, phantom: PhantomData }
    }
}
64
65// A u64 to Vec<u8> function to convert decimal to 256 hexadecimal.
66pub fn number_to_four_vecu8(num: u64) -> Vec<u8> {
67    assert!(num < (1 << 32));
68    let mut result: Vec<u8> = vec![];
69    let mut x = num;
70    loop {
71        if x / 256 > 0 {
72            result.push((x % 256) as u8);
73            x = x / 256;
74        } else {
75            result.push((x % 256) as u8);
76            break;
77        }
78    }
79    for _ in 0..(DATA_SIZE - result.len()) {
80        result.push(0);
81    }
82    result.reverse();
83    return result;
84}
85
86// A Vec<u8> to u64 function to convert 256 hexadecimal to decimal.
87pub fn four_vecu8_to_number(vec: Vec<u8>) -> u64 {
88    assert_eq!(vec.len(), DATA_SIZE);
89    let num = vec[0] as u64 * 256 * 256 * 256 + vec[1] as u64 * 256 * 256
90        + vec[2] as u64 * 256 + vec[3] as u64;
91    return num;
92}
93
94impl<T> Encoder for MessageCodec<T> where T: serde::Serialize {
95    type Item = Option<T>;
96    type Error = io::Error;
97    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
98        let mut data: Vec<u8> = vec![];
99        match item {
100            // If None, it is a register information, so state is 0 and data is register
101            // information (i.e. name).
102            None => {
103                data.push(0 as u8);
104                let mut name = self.name.clone().into_bytes();
105                data.append(&mut name);
106            }
107            // If not None, it is user's message, so state is 1.
108            Some(v) => {
109                data.push(1 as u8);
110                data.append(&mut serialize(&v).unwrap());
111            }
112        }
113        let mut encoder: Vec<u8> = number_to_four_vecu8(data.len() as u64);
114        encoder.append(&mut data);
115        dst.reserve(encoder.len());
116        dst.put(encoder);
117        Ok(())
118    }
119}
120
121impl<T> Decoder for MessageCodec<T> where T: serde::de::DeserializeOwned {
122    type Item = (Option<String>, Option<T>);
123    type Error = io::Error;
124    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
125        if src.len() < DATA_SIZE {
126            Ok(None)
127        } else {
128            let mut vec: Vec<u8> = src.to_vec();
129            let mut truth_data = vec.split_off(DATA_SIZE);
130            let vec_length = four_vecu8_to_number(vec);
131            if truth_data.len() == vec_length as usize {
132                let msg_data = truth_data.split_off(1);
133                src.clear();
134                match truth_data[0] {
135                    // Deserialize it is register information or user's message.
136                    0 => {
137                        Ok(Some((Some(String::from_utf8(msg_data).unwrap()), None)))
138                    }
139                    1 => {
140                        let msg: T = deserialize(&msg_data).unwrap();
141                        Ok(Some((None, Some(msg))))
142                    }
143                    _ => {
144                        panic!("unexpected message");
145                    }
146                }
147            } else {
148                Ok(None)
149            }
150        }
151    }
152}
153
154//T is user message type
155//F is the closure of control logic
156//U is to abstract tcp and uds
157fn start_server<T, F, U>(incoming: U, first_msg: T,
158                         process_function: F,
159                         server_name: String,
160                         connections_outer: Arc<Mutex<HashMap<String, mpsc::Sender<Option<T>>>>>)
161                         -> Box<Future<Item=(), Error=()> + Send + 'static>
162    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone,
163          F: FnMut(String, T) -> Vec<(String, T)> + Send + Sync + 'static + Clone,
164          U: Stream + Send + Sync + 'static,
165          <U as futures::Stream>::Item: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync,
166          <U as futures::Stream>::Error: std::fmt::Debug + 'static + Send + Sync
167{
168    Box::new(
169        incoming
170            .for_each(move |stream| {
171                let process_function_outer = process_function.clone();
172                let server_name = server_name.clone();
173                let first_msg_inner = first_msg.clone();
174                let connections = connections_outer.clone();
175
176                // Create a mpsc::channel in order to build a bridge between sender task and receiver
177                // task.
178                let (tx, rx): (mpsc::Sender<Option<T>>, mpsc::Receiver<Option<T>>) = mpsc::channel(0);
179                let rx: Box<Stream<Item=Option<T>, Error=io::Error> + Send> = Box::new(rx.map_err(|_| panic!()));
180
181                // Split tcp_stream to sink and stream. Sink responses to send messages to this
182                // client, stream responses to receive messages from this client.
183                let (sink, stream) = MessageCodec::new(server_name).framed(stream).split();
184
185                // Spawn a sender task.
186                let send_to_client = rx.forward(sink).then(|result| {
187                    if let Err(e) = result {
188                        panic!("failed to write to socket: {}", e)
189                    }
190                    Ok(())
191                });
192                tokio::spawn(send_to_client);
193
194                // To record the client_name
195                let mut client_name = String::new();
196
197                // Spawn a receiver task.
198                let receive_and_process = stream.for_each(move |(name, msg): (Option<String>, Option<T>)| {
199                    let connections_inner = connections.clone();
200                    match name {
201                        // If it is a register information, register to connections.
202                        Some(register_name) => {
203                            client_name = register_name.clone();
204                            let mut tx_inner = tx.clone();
205                            tx_inner.try_send(Some(first_msg_inner.clone())).unwrap();
206                            connections_inner.lock().unwrap().insert(register_name, tx_inner);
207                        }
208                        // If it is a user's message, process it.
209                        None => {
210                            let msg = msg.unwrap();
211                            let mut process_function_inner = process_function_outer.clone();
212                            let dest_and_msg = process_function_inner(client_name.clone(), msg);
213                            for (dest, msg) in dest_and_msg {
214                                if dest == "" {
215                                    let mut tx_inner = tx.clone();
216                                    tx_inner.try_send(Some(msg)).unwrap();
217                                } else {
218                                    if connections_inner.lock().unwrap().contains_key(&dest) {
219                                        connections_inner.lock().unwrap().get_mut(&dest).unwrap()
220                                            .try_send(Some(msg)).unwrap();
221                                    } else {
222                                        println!("{} doesn't register", dest);
223                                    }
224                                }
225                            }
226                        }
227                    }
228                    Ok(())
229                }).map_err(move |_| {
230                    println!("closed connection");
231                });
232
233                tokio::spawn(receive_and_process);
234                Ok(())
235            }).map_err(|e| { println!("{:?}", e); })
236    )
237}
238
239//T is user message type
240//F is the closure of control logic
241//U is to abstract tcp and uds
242fn start_client<T, F, U>(connect: U,
243                         client_name: String,
244                         mut process_function: F)
245                         -> Box<Future<Item=(), Error=()> + Send + 'static>
246    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone,
247          F: FnMut(T) -> Vec<T> + Send + Sync + 'static,
248          U: Future + Send + Sync + 'static,
249          <U as futures::Future>::Item: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync,
250          <U as futures::Future>::Error: std::fmt::Debug + 'static + Send + Sync
251{
252    // Create a mpsc::channel in order to build a bridge between sender task and receiver
253    // task.
254    let (mut tx, rx): (mpsc::Sender<Option<T>>, mpsc::Receiver<Option<T>>) = mpsc::channel(0);
255    let rx: Box<Stream<Item=Option<T>, Error=io::Error> + Send> = Box::new(rx.map_err(|_| panic!()));
256
257    Box::new(
258        connect.and_then(move |mut tcp_stream| {
259            let mut message_codec: MessageCodec<T> = MessageCodec::new(client_name);
260
261            // Send register information to server.
262            let register_msg = None;
263            let mut buf = BytesMut::new();
264            let _ = message_codec.encode(register_msg, &mut buf);
265            let _ = tcp_stream.write_all(&buf);
266
267
268            let (sink, stream) = message_codec.framed(tcp_stream).split();
269
270            // Spawn a sender task.
271            let send_to_server = rx.forward(sink).then(|result| {
272                if let Err(e) = result {
273                    panic!("failed to write to socket: {}", e)
274                }
275                Ok(())
276            });
277            tokio::spawn(send_to_server);
278
279            // Spawn a receiver task.
280            let receive_and_process = stream.for_each(move |(name, msg): (Option<String>, Option<T>)| {
281                match name {
282                    Some(_) => {
283                        panic!("client received unexpected message");
284                    }
285                    None => {
286                        let msg = msg.unwrap();
287                        let msgs = process_function(msg);
288                        for msg in msgs {
289                            tx.try_send(Some(msg)).unwrap();
290                        }
291                    }
292                }
293                Ok(())
294            }).map_err(move |_| { println!("server closed"); });
295            tokio::spawn(receive_and_process);
296            Ok(())
297        }).map_err(|e| { println!("{:?}", e); })
298    )
299}
300
301#[allow(dead_code)]
302pub mod tcp;
303
304#[allow(dead_code)]
305pub mod uds;
306
/// Constructs a `tcp::TCPMsgServer<T>` by forwarding `addr` and `server_name` to
/// `tcp::TCPMsgServer::new`.
///
/// `T` is the user message type. The accepted format of `addr` is defined by that
/// constructor, which is not visible in this file.
pub fn create_tcp_server<T>(addr: &str, server_name: &str) -> tcp::TCPMsgServer<T>
    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
{
    tcp::TCPMsgServer::new(addr, server_name)
}
312
/// Constructs a `tcp::TCPMsgClient<T>` by forwarding `addr` and `client_name` to
/// `tcp::TCPMsgClient::new`.
///
/// `T` is the user message type. The accepted format of `addr` is defined by that
/// constructor, which is not visible in this file.
pub fn create_tcp_client<T>(addr: &str, client_name: &str) -> tcp::TCPMsgClient<T>
    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
{
    tcp::TCPMsgClient::new(addr, client_name)
}
318
/// Constructs a `uds::UDSMsgServer<T>` by forwarding `addr` and `server_name` to
/// `uds::UDSMsgServer::new`.
///
/// `T` is the user message type. `addr` is presumably a socket path; confirm against the
/// `uds` module, which is not visible in this file.
pub fn create_uds_server<T>(addr: &str, server_name: &str) -> uds::UDSMsgServer<T>
    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
{
    uds::UDSMsgServer::new(addr, server_name)
}
324
/// Constructs a `uds::UDSMsgClient<T>` by forwarding `addr` and `client_name` to
/// `uds::UDSMsgClient::new`.
///
/// `T` is the user message type. `addr` is presumably a socket path; confirm against the
/// `uds` module, which is not visible in this file.
pub fn create_uds_client<T>(addr: &str, client_name: &str) -> uds::UDSMsgClient<T>
    where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
{
    uds::UDSMsgClient::new(addr, client_name)
}