use std::io::{Error, ErrorKind};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::mpsc::TryRecvError;
use std::collections::VecDeque;
use shared::stats::{Stats, StatsCollector};
use shared::ticker::Ticker;
use super::{
Config,
Connection, ConnectionEvent,
RateLimiter, PacketModifier, Socket
};
#[derive(Debug, PartialEq)]
pub enum ClientEvent {
Connection,
ConnectionFailed,
ConnectionLost(bool),
ConnectionClosed(bool),
Message(Vec<u8>),
PacketLost(Vec<u8>),
ConnectionCongestionStateChanged(bool)
}
#[derive(Debug)]
pub struct Client<S: Socket, R: RateLimiter, M: PacketModifier> {
config: Config,
socket: Option<S>,
connection: Option<Connection<R, M>>,
ticker: Ticker,
peer_address: Option<SocketAddr>,
local_address: Option<SocketAddr>,
events: VecDeque<ClientEvent>,
should_receive: bool,
stats_collector: StatsCollector,
stats: Stats
}
impl<S: Socket, R: RateLimiter, M: PacketModifier> Client<S, R, M> {
pub fn new(config: Config) -> Client<S, R, M> {
Client {
config: config,
socket: None,
connection: None,
ticker: Ticker::new(config),
peer_address: None,
local_address: None,
events: VecDeque::new(),
should_receive: false,
stats_collector: StatsCollector::new(config),
stats: Stats {
bytes_sent: 0,
bytes_received: 0
}
}
}
pub fn bytes_sent(&self) -> u32 {
self.stats.bytes_sent
}
pub fn bytes_received(&self) -> u32 {
self.stats.bytes_received
}
pub fn peer_addr(&self) -> Result<SocketAddr, Error> {
self.peer_address.ok_or_else(|| Error::new(ErrorKind::AddrNotAvailable, ""))
}
pub fn local_addr(&self) -> Result<SocketAddr, Error> {
self.local_address.ok_or_else(|| Error::new(ErrorKind::AddrNotAvailable, ""))
}
pub fn connection(&mut self) -> Result<&mut Connection<R, M>, Error> {
if let Some(connection) = self.connection.as_mut() {
Ok(connection)
} else {
Err(Error::new(ErrorKind::NotConnected, ""))
}
}
pub fn socket(&mut self) -> Result<&mut S, Error> {
if let Some(socket) = self.socket.as_mut() {
Ok(socket)
} else {
Err(Error::new(ErrorKind::NotConnected, ""))
}
}
pub fn config(&self) -> Config {
self.config
}
pub fn set_config(&mut self, config: Config) {
self.config = config;
self.ticker.set_config(config);
self.stats_collector.set_config(config);
if let Some(connection) = self.connection.as_mut() {
connection.set_config(config);
}
}
pub fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> Result<(), Error> {
if self.socket.is_none() {
let socket = try!(S::new(
"0.0.0.0:0",
self.config.packet_max_size
));
let peer_addr = try!(addr.to_socket_addrs()).nth(0).unwrap();
let local_addr = try!(socket.local_addr());
self.socket = Some(socket);
self.peer_address = Some(peer_addr);
self.local_address = Some(local_addr);
self.connection = Some(Connection::new(
self.config,
local_addr,
peer_addr,
R::new(self.config),
M::new(self.config)
));
self.should_receive = true;
Ok(())
} else {
Err(Error::new(ErrorKind::AlreadyExists, ""))
}
}
pub fn receive(&mut self) -> Result<ClientEvent, TryRecvError> {
if self.socket.is_none() {
Err(TryRecvError::Disconnected)
} else {
if self.should_receive {
self.ticker.begin_tick();
let peer_address = self.peer_address.unwrap();
let mut bytes_received = 0;
while let Ok((addr, packet)) = self.socket.as_mut().unwrap().try_recv() {
if addr == peer_address {
bytes_received += packet.len();
self.connection.as_mut().unwrap().receive_packet(packet);
}
}
self.stats_collector.set_bytes_received(bytes_received as u32);
for e in self.connection.as_mut().unwrap().events() {
self.events.push_back(match e {
ConnectionEvent::Connected => ClientEvent::Connection,
ConnectionEvent::FailedToConnect => ClientEvent::ConnectionFailed,
ConnectionEvent::Lost(by_remote) => ClientEvent::ConnectionLost(by_remote),
ConnectionEvent::Closed(by_remote) => ClientEvent::ConnectionClosed(by_remote),
ConnectionEvent::Message(payload) => ClientEvent::Message(payload),
ConnectionEvent::CongestionStateChanged(c) => ClientEvent::ConnectionCongestionStateChanged(c),
ConnectionEvent::PacketLost(payload) => ClientEvent::PacketLost(payload)
});
}
self.should_receive = false;
}
if let Some(event) = self.events.pop_front() {
Ok(event)
} else {
Err(TryRecvError::Empty)
}
}
}
pub fn send(&mut self, auto_tick: bool) -> Result<(), Error> {
if self.socket.is_some() {
let peer_address = self.peer_address.unwrap();
let bytes_sent = self.connection.as_mut().unwrap().send_packet(
self.socket.as_mut().unwrap(),
&peer_address
);
self.stats_collector.set_bytes_sent(bytes_sent);
self.stats_collector.tick();
self.stats = self.stats_collector.average();
self.should_receive = true;
if auto_tick {
self.ticker.end_tick();
}
Ok(())
} else {
Err(Error::new(ErrorKind::NotConnected, ""))
}
}
pub fn reset(&mut self) -> Result<(), Error> {
if self.socket.is_some() {
self.connection.as_mut().unwrap().reset();
self.stats_collector.reset();
self.stats.reset();
self.events.clear();
self.ticker.reset();
Ok(())
} else {
Err(Error::new(ErrorKind::NotConnected, ""))
}
}
pub fn disconnect(&mut self) -> Result<(), Error> {
if self.socket.is_some() {
self.reset().ok();
self.should_receive = false;
self.peer_address = None;
self.local_address = None;
self.connection = None;
self.socket = None;
Ok(())
} else {
Err(Error::new(ErrorKind::NotConnected, ""))
}
}
}