use std::fmt::{Debug};
use std::clone::{Clone};
use std::net::SocketAddr;
use tokio::prelude::*;
use tokio::spawn;
use tokio_codec::{Encoder, Decoder};
use tokio_tcp::{TcpListener, TcpStream};
use crate::server::Server;
use crate::connection::Connection;
use crate::error::Error;
/// A [`Server`] specialised for TCP: sockets are [`TcpStream`]s and the value
/// attached to each bound socket is a [`TcpInfo`].
pub type TcpServer<C> = Server<TcpStream, C, TcpInfo>;
/// A [`Connection`] specialised for TCP: the underlying socket is a [`TcpStream`].
pub type TcpConnection<C> = Connection<TcpStream, C>;
impl <C> TcpConnection<C>
where
C: Encoder + Decoder + Clone + Send + 'static,
<C as Decoder>::Item: Send,
<C as Decoder>::Error: Send + Debug,
{
pub fn new(addr: &SocketAddr, codec: C) -> impl Future<Item=Connection<TcpStream, C>, Error=Error> {
debug!("[connector] creating connection (tcp address: {})", addr);
TcpStream::connect(&addr).map(move |s| {
Connection::from_socket(s, codec)
}).map_err(|e| e.into() )
}
pub fn close(self) {
self.shutdown();
}
}
/// Information attached to each socket bound to a [`TcpServer`].
#[derive(Clone, Debug)]
pub struct TcpInfo {
    /// Address of the remote end of the socket: the peer address for accepted
    /// connections, or the target address for connections made with `connect`.
    pub address: SocketAddr,
}
impl<C> TcpServer<C>
where
C: Encoder + Decoder + Clone + Send + 'static,
<C as Decoder>::Item: Clone + Send + Debug,
<C as Decoder>::Error: Send + Debug,
<C as Encoder>::Item: Clone + Send + Debug,
<C as Encoder>::Error: Send + Debug,
{
pub fn new(address: &SocketAddr, codec: C) -> Result<TcpServer<C>, Error> {
let server = Server::base(codec);
let socket = TcpListener::bind(&address)?;
let exit_rx = server.exit_rx.lock().unwrap().take();
let mut server_int = server.clone();
let tokio_server = socket
.incoming()
.for_each(move |s| {
debug!("[server] accept connection: {:?}", s);
let info = TcpInfo{address: s.peer_addr().unwrap()};
server_int.bind(info, s);
Ok(())
})
.map_err(|err| {
error!("[server] accept error: {:?}", err);
})
.select2(exit_rx)
.then(|_| {
debug!("[server] closing listener");
Ok(())
});
spawn(tokio_server);
Ok(server)
}
pub fn connect(&mut self, address: SocketAddr) -> impl Future<Item=(), Error=Error> {
let mut s = self.clone();
TcpStream::connect(&address).map(move |socket| {
let info = TcpInfo{address: address.clone()};
s.bind(info, socket);
()
}).map_err(|e| e.into() )
}
}