1extern crate tokio;
31extern crate futures;
32extern crate tokio_codec;
33extern crate serde;
34extern crate bincode;
35extern crate bytes;
36
37use tokio::net;
38use tokio::prelude::*;
39use bincode::{deserialize, serialize};
40use tokio::io;
41use tokio_codec::*;
42use bytes::{BufMut, BytesMut};
43use std::marker::PhantomData;
44use std::sync::{Arc, Mutex};
45use std::collections::HashMap;
46use futures::sync::mpsc;
47
/// Width, in bytes, of the big-endian length prefix that precedes every frame
/// handled by `MessageCodec` (see `number_to_four_vecu8` / `four_vecu8_to_number`).
const DATA_SIZE: usize = 4;
50
/// Codec state shared by the `Encoder` and `Decoder` implementations for
/// messages of type `T`.
#[derive(Debug)]
struct MessageCodec<T> {
    /// Name emitted as the payload of the registration frame (the frame the
    /// encoder produces for a `None` item).
    name: String,
    /// Zero-sized marker binding the codec to its message type `T`.
    phantom: PhantomData<T>,
}

impl<T> MessageCodec<T> {
    /// Creates a codec that identifies itself as `name`.
    pub fn new(name: String) -> Self {
        Self {
            name,
            phantom: PhantomData,
        }
    }
}
64
65pub fn number_to_four_vecu8(num: u64) -> Vec<u8> {
67 assert!(num < (1 << 32));
68 let mut result: Vec<u8> = vec![];
69 let mut x = num;
70 loop {
71 if x / 256 > 0 {
72 result.push((x % 256) as u8);
73 x = x / 256;
74 } else {
75 result.push((x % 256) as u8);
76 break;
77 }
78 }
79 for _ in 0..(DATA_SIZE - result.len()) {
80 result.push(0);
81 }
82 result.reverse();
83 return result;
84}
85
86pub fn four_vecu8_to_number(vec: Vec<u8>) -> u64 {
88 assert_eq!(vec.len(), DATA_SIZE);
89 let num = vec[0] as u64 * 256 * 256 * 256 + vec[1] as u64 * 256 * 256
90 + vec[2] as u64 * 256 + vec[3] as u64;
91 return num;
92}
93
94impl<T> Encoder for MessageCodec<T> where T: serde::Serialize {
95 type Item = Option<T>;
96 type Error = io::Error;
97 fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
98 let mut data: Vec<u8> = vec![];
99 match item {
100 None => {
103 data.push(0 as u8);
104 let mut name = self.name.clone().into_bytes();
105 data.append(&mut name);
106 }
107 Some(v) => {
109 data.push(1 as u8);
110 data.append(&mut serialize(&v).unwrap());
111 }
112 }
113 let mut encoder: Vec<u8> = number_to_four_vecu8(data.len() as u64);
114 encoder.append(&mut data);
115 dst.reserve(encoder.len());
116 dst.put(encoder);
117 Ok(())
118 }
119}
120
121impl<T> Decoder for MessageCodec<T> where T: serde::de::DeserializeOwned {
122 type Item = (Option<String>, Option<T>);
123 type Error = io::Error;
124 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
125 if src.len() < DATA_SIZE {
126 Ok(None)
127 } else {
128 let mut vec: Vec<u8> = src.to_vec();
129 let mut truth_data = vec.split_off(DATA_SIZE);
130 let vec_length = four_vecu8_to_number(vec);
131 if truth_data.len() == vec_length as usize {
132 let msg_data = truth_data.split_off(1);
133 src.clear();
134 match truth_data[0] {
135 0 => {
137 Ok(Some((Some(String::from_utf8(msg_data).unwrap()), None)))
138 }
139 1 => {
140 let msg: T = deserialize(&msg_data).unwrap();
141 Ok(Some((None, Some(msg))))
142 }
143 _ => {
144 panic!("unexpected message");
145 }
146 }
147 } else {
148 Ok(None)
149 }
150 }
151 }
152}
153
154fn start_server<T, F, U>(incoming: U, first_msg: T,
158 process_function: F,
159 server_name: String,
160 connections_outer: Arc<Mutex<HashMap<String, mpsc::Sender<Option<T>>>>>)
161 -> Box<Future<Item=(), Error=()> + Send + 'static>
162 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone,
163 F: FnMut(String, T) -> Vec<(String, T)> + Send + Sync + 'static + Clone,
164 U: Stream + Send + Sync + 'static,
165 <U as futures::Stream>::Item: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync,
166 <U as futures::Stream>::Error: std::fmt::Debug + 'static + Send + Sync
167{
168 Box::new(
169 incoming
170 .for_each(move |stream| {
171 let process_function_outer = process_function.clone();
172 let server_name = server_name.clone();
173 let first_msg_inner = first_msg.clone();
174 let connections = connections_outer.clone();
175
176 let (tx, rx): (mpsc::Sender<Option<T>>, mpsc::Receiver<Option<T>>) = mpsc::channel(0);
179 let rx: Box<Stream<Item=Option<T>, Error=io::Error> + Send> = Box::new(rx.map_err(|_| panic!()));
180
181 let (sink, stream) = MessageCodec::new(server_name).framed(stream).split();
184
185 let send_to_client = rx.forward(sink).then(|result| {
187 if let Err(e) = result {
188 panic!("failed to write to socket: {}", e)
189 }
190 Ok(())
191 });
192 tokio::spawn(send_to_client);
193
194 let mut client_name = String::new();
196
197 let receive_and_process = stream.for_each(move |(name, msg): (Option<String>, Option<T>)| {
199 let connections_inner = connections.clone();
200 match name {
201 Some(register_name) => {
203 client_name = register_name.clone();
204 let mut tx_inner = tx.clone();
205 tx_inner.try_send(Some(first_msg_inner.clone())).unwrap();
206 connections_inner.lock().unwrap().insert(register_name, tx_inner);
207 }
208 None => {
210 let msg = msg.unwrap();
211 let mut process_function_inner = process_function_outer.clone();
212 let dest_and_msg = process_function_inner(client_name.clone(), msg);
213 for (dest, msg) in dest_and_msg {
214 if dest == "" {
215 let mut tx_inner = tx.clone();
216 tx_inner.try_send(Some(msg)).unwrap();
217 } else {
218 if connections_inner.lock().unwrap().contains_key(&dest) {
219 connections_inner.lock().unwrap().get_mut(&dest).unwrap()
220 .try_send(Some(msg)).unwrap();
221 } else {
222 println!("{} doesn't register", dest);
223 }
224 }
225 }
226 }
227 }
228 Ok(())
229 }).map_err(move |_| {
230 println!("closed connection");
231 });
232
233 tokio::spawn(receive_and_process);
234 Ok(())
235 }).map_err(|e| { println!("{:?}", e); })
236 )
237}
238
239fn start_client<T, F, U>(connect: U,
243 client_name: String,
244 mut process_function: F)
245 -> Box<Future<Item=(), Error=()> + Send + 'static>
246 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone,
247 F: FnMut(T) -> Vec<T> + Send + Sync + 'static,
248 U: Future + Send + Sync + 'static,
249 <U as futures::Future>::Item: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync,
250 <U as futures::Future>::Error: std::fmt::Debug + 'static + Send + Sync
251{
252 let (mut tx, rx): (mpsc::Sender<Option<T>>, mpsc::Receiver<Option<T>>) = mpsc::channel(0);
255 let rx: Box<Stream<Item=Option<T>, Error=io::Error> + Send> = Box::new(rx.map_err(|_| panic!()));
256
257 Box::new(
258 connect.and_then(move |mut tcp_stream| {
259 let mut message_codec: MessageCodec<T> = MessageCodec::new(client_name);
260
261 let register_msg = None;
263 let mut buf = BytesMut::new();
264 let _ = message_codec.encode(register_msg, &mut buf);
265 let _ = tcp_stream.write_all(&buf);
266
267
268 let (sink, stream) = message_codec.framed(tcp_stream).split();
269
270 let send_to_server = rx.forward(sink).then(|result| {
272 if let Err(e) = result {
273 panic!("failed to write to socket: {}", e)
274 }
275 Ok(())
276 });
277 tokio::spawn(send_to_server);
278
279 let receive_and_process = stream.for_each(move |(name, msg): (Option<String>, Option<T>)| {
281 match name {
282 Some(_) => {
283 panic!("client received unexpected message");
284 }
285 None => {
286 let msg = msg.unwrap();
287 let msgs = process_function(msg);
288 for msg in msgs {
289 tx.try_send(Some(msg)).unwrap();
290 }
291 }
292 }
293 Ok(())
294 }).map_err(move |_| { println!("server closed"); });
295 tokio::spawn(receive_and_process);
296 Ok(())
297 }).map_err(|e| { println!("{:?}", e); })
298 )
299}
300
/// Module providing `TCPMsgServer` and `TCPMsgClient`, the types returned by
/// `create_tcp_server` and `create_tcp_client`.
// NOTE(review): the reason `dead_code` is allowed for this module is not
// visible from this file — confirm it is still needed.
#[allow(dead_code)]
pub mod tcp;

/// Module providing `UDSMsgServer` and `UDSMsgClient`, the types returned by
/// `create_uds_server` and `create_uds_client` (presumably Unix domain
/// sockets — confirm against the module).
// NOTE(review): the reason `dead_code` is allowed for this module is not
// visible from this file — confirm it is still needed.
#[allow(dead_code)]
pub mod uds;
306
307pub fn create_tcp_server<T>(addr: &str, server_name: &str) -> tcp::TCPMsgServer<T>
308 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
309{
310 tcp::TCPMsgServer::new(addr, server_name)
311}
312
313pub fn create_tcp_client<T>(addr: &str, client_name: &str) -> tcp::TCPMsgClient<T>
314 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
315{
316 tcp::TCPMsgClient::new(addr, client_name)
317}
318
319pub fn create_uds_server<T>(addr: &str, server_name: &str) -> uds::UDSMsgServer<T>
320 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
321{
322 uds::UDSMsgServer::new(addr, server_name)
323}
324
325pub fn create_uds_client<T>(addr: &str, client_name: &str) -> uds::UDSMsgClient<T>
326 where T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static + Clone
327{
328 uds::UDSMsgClient::new(addr, client_name)
329}