use std;
use std::result::Result;
use std::io;
use std::net::SocketAddr;
use std::sync::mpsc;
use futures::{self, sync};
use tokio_core::reactor::Remote;
use tokio_core::net::{UdpSocket, UdpCodec};
use slog::Logger;
use serde_json;
use super::{GameMessage, WireMessage, SendWireMessage, RecvWireMessage};
struct Codec<G> {
log: Logger,
_phantom_game_message: std::marker::PhantomData<G>,
}
impl<G: GameMessage> UdpCodec for Codec<G> {
type In = RecvWireMessage<G>;
type Out = SendWireMessage<G>;
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<RecvWireMessage<G>> {
serde_json::from_slice::<WireMessage<G>>(buf)
.map(|message| {
RecvWireMessage {
src: *src,
message: Result::Ok(message)
}
})
.or_else(|error| {
warn!(self.log, "Got a bad message from peer"; "peer_addr" => format!("{:?}", src), "message" => format!("{:?}", buf), "error" => format!("{:?}", error));
Ok(RecvWireMessage {
src: *src,
message: Result::Err(())
})
})
}
fn encode(&mut self, message: SendWireMessage<G>, buf: &mut Vec<u8>) -> SocketAddr {
serde_json::to_writer(buf, &message.message).expect("Error encoding message");
message.dest
}
}
pub fn start_udp_server<G: GameMessage, MaybePort>(
parent_log: &Logger,
recv_system_sender: mpsc::Sender<RecvWireMessage<G>>,
send_system_udp_receiver: sync::mpsc::Receiver<SendWireMessage<G>>,
remote: Remote,
port: MaybePort
) -> u16
where MaybePort: Into<Option<u16>>
{
use futures::{Future, Stream, Sink};
let (actual_port_tx, actual_port_rx) = std::sync::mpsc::channel::<u16>();
let addr = format!("0.0.0.0:{}", port.into().unwrap_or(0));
let addr = addr.parse::<SocketAddr>().unwrap();
let server_log = parent_log.new(o!());
let server_error_log = server_log.new(o!());
let sink_error_log = server_log.new(o!());
let codec_log = parent_log.new(o!());
remote.spawn(move |handle| {
let socket = UdpSocket::bind(&addr, &handle).expect("Failed to bind server socket");
let actual_addr = socket.local_addr().expect("Socket isn't bound");
info!(server_log, "UDP server listening"; "addr" => format!("{}", actual_addr));
actual_port_tx.send(actual_addr.port()).expect("Receiver hung up");
let codec = Codec::<G>{
log: codec_log,
_phantom_game_message: std::marker::PhantomData,
};
let (sink, stream) = socket.framed(codec).split();
let sink = sink.sink_map_err(move |err| {
error!(sink_error_log, "Unexpected error in sending to sink"; "err" => format!("{}", err));
()
});
let tx_f = sink.send_all(send_system_udp_receiver).map(|_| ());
handle.spawn(tx_f);
let rx_f = stream
.filter(|recv_wire_message| {
match recv_wire_message.message {
Result::Err(_) => {
println!("Got a bad message from peer");
false
}
_ => true,
}
})
.for_each(move |recv_wire_message| {
trace!(server_log, "Got recv_wire_message"; "recv_wire_message" => format!("{:?}", recv_wire_message));
recv_system_sender.send(recv_wire_message).expect("Receiver hung up?");
futures::future::ok(())
}).or_else(move |error| {
info!(server_error_log, "Something broke in listening for connections"; "error" => format!("{}", error));
futures::future::ok(())
});
rx_f
});
actual_port_rx.recv().expect("Sender hung up")
}
#[cfg(test)]
mod tests {
use super::*;
use std;
use std::thread;
use std::time::Duration;
use futures::Future;
use tokio_core::reactor::{Core, Timeout};
use tokio_core::net::UdpSocket;
use slog;
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
struct TestMessage {}
impl GameMessage for TestMessage{}
#[test]
fn receive_corrupt_message() {
let (remote_tx, remote_rx) = mpsc::channel::<Remote>();
thread::Builder::new()
.name("tcp_server".to_string())
.spawn(move || {
let mut reactor = Core::new().expect("Failed to create reactor for network server");
remote_tx.send(reactor.remote()).expect("Receiver hung up");
reactor.run(futures::future::empty::<(), ()>()).expect("Network server reactor failed");
}).expect("Failed to spawn server thread");
let remote = remote_rx.recv().expect("Sender hung up");
let drain = slog::Discard;
let log = slog::Logger::root(drain, o!("pk_version" => env!("CARGO_PKG_VERSION")));
let (recv_tx, recv_rx) = mpsc::channel::<RecvWireMessage<TestMessage>>();
let (_send_tx, send_rx) = sync::mpsc::channel::<SendWireMessage<TestMessage>>(10);
let server_port = start_udp_server(&log, recv_tx, send_rx, remote, None);
let addr = "0.0.0.0:0".to_string();
let addr = addr.parse::<SocketAddr>().unwrap();
let mut reactor = Core::new().expect("Failed to create reactor");
let handle = reactor.handle();
let socket = UdpSocket::bind(&addr, &handle).expect("Failed to bind socket");
let dest_addr = format!("127.0.0.1:{}", server_port);
let dest_addr: SocketAddr = dest_addr.parse().unwrap();
let f = socket.send_dgram(b"\"hello\"", dest_addr).and_then(
|(socket2, _buf)| {
Timeout::new(Duration::from_millis(10), &handle).expect("Failed to set timeout").and_then(
move |_| {
socket2.send_dgram(b"{\"Game\":{}}", dest_addr)
}
)
},
);
reactor.run(f).expect("Test reactor failed");
std::thread::sleep(std::time::Duration::from_millis(100));
let recv_wire_message = recv_rx.recv().expect("Should have been something on the channel");
assert_eq!(recv_wire_message.message, Ok(WireMessage::Game(TestMessage{})));
assert_eq!(recv_rx.try_recv(), Err(mpsc::TryRecvError::Empty));
}
}