use toxcore::tcp::client::connection::*;
use toxcore::tcp::packet::*;
use toxcore::io_tokio::IoFuture;
use futures::prelude::*;
use futures::sync::mpsc;
use std::io::{Error, ErrorKind};
/// Bundle of channel endpoints and the driving future produced by
/// `ClientProcessor::new`.
///
/// The `*_tx` fields are the inputs of the processor and the `*_rx` fields
/// are its outputs. Nothing is processed until `processor` is polled.
pub struct ClientProcessor {
    /// Sender for `OutgoingPacket`s; packets sent here are passed to
    /// `Connection::handle_from_client` by `processor`.
    pub from_client_tx: mpsc::UnboundedSender<OutgoingPacket>,
    /// Receiver for `IncomingPacket`s; the matching sender is owned by the
    /// `Connection` created in `new`.
    pub to_client_rx: mpsc::UnboundedReceiver<IncomingPacket>,
    /// Sender for `Packet`s; packets sent here are passed to
    /// `Connection::handle_from_server` by `processor`.
    pub from_server_tx: mpsc::UnboundedSender<Packet>,
    /// Receiver for `Packet`s; the matching sender is owned by the
    /// `Connection` created in `new`.
    pub to_server_rx: mpsc::UnboundedReceiver<Packet>,
    /// Future that drives both handling loops. It resolves as soon as either
    /// loop finishes and must be run on an executor for packets to be handled.
    pub processor: IoFuture<()>,
}
impl ClientProcessor {
    /// Create a new `ClientProcessor`.
    ///
    /// Four unbounded channels are created. The sending halves of the
    /// "to server" and "to client" channels are handed to a new `Connection`;
    /// the receiving halves of the "from server" and "from client" channels are
    /// turned into two handling loops that forward every received packet to
    /// `Connection::handle_from_server` and `Connection::handle_from_client`
    /// respectively.
    ///
    /// The returned `processor` future combines both loops with `select`, so
    /// it resolves as soon as either loop finishes (successfully or with an
    /// error); the remaining loop is then discarded. The channels' `()` error
    /// is mapped to `ErrorKind::UnexpectedEof`.
    pub fn new() -> ClientProcessor {
        let (from_client_tx, from_client_rx) = mpsc::unbounded();
        let (to_client_tx, to_client_rx) = mpsc::unbounded();
        let (from_server_tx, from_server_rx) = mpsc::unbounded();
        let (to_server_tx, to_server_rx) = mpsc::unbounded();

        // Neither sender is used again in this function, so both are moved
        // into the connection instead of being cloned.
        let connection = Connection::new(to_server_tx, to_client_tx);

        // The server loop gets its own handle; the original handle is moved
        // into the client loop below.
        let server_connection = connection.clone();
        let process_messages_from_server = from_server_rx
            .map_err(|()| Error::from(ErrorKind::UnexpectedEof))
            .for_each(move |packet| -> IoFuture<()> {
                debug!("Handle packet from server: {:?}", packet);
                server_connection.handle_from_server(packet)
            })
            .then(|res| {
                debug!("process_messages_from_server ended with {:?}", res);
                res
            });

        let client_connection = connection;
        let process_messages_from_client = from_client_rx
            .map_err(|()| Error::from(ErrorKind::UnexpectedEof))
            .for_each(move |packet| -> IoFuture<()> {
                debug!("Handle packet from client: {:?}", packet);
                client_connection.handle_from_client(packet)
            })
            .then(|res| {
                debug!("process_messages_from_client ended with {:?}", res);
                res
            });

        // `select` yields the result of whichever loop completes first along
        // with the unfinished one; only the result itself is kept.
        let processor = process_messages_from_server
            .select(process_messages_from_client)
            .map(|_| ())
            .map_err(|(err, _select_next)| err);
        let processor = Box::new(processor);

        ClientProcessor { to_client_rx, from_client_tx, from_server_tx, to_server_rx, processor }
    }
}
#[cfg(test)]
mod tests {
    use toxcore::tcp::client::*;
    use futures::Future;
    use tokio;

    /// Dropping both client-side channel ends while the server-side ends stay
    /// alive is expected to let the processor future finish. The test passes
    /// when `tokio::run` returns.
    #[test]
    fn client_processor_shutdown_client() {
        // The server-side ends are bound to underscore names so they stay
        // alive until the end of the test.
        let ClientProcessor {
            from_client_tx,
            to_client_rx,
            from_server_tx: _from_server_tx,
            to_server_rx: _to_server_rx,
            processor,
        } = ClientProcessor::new();

        // Close the client side before the processor is run.
        drop(from_client_tx);
        drop(to_client_rx);

        tokio::run(processor.map_err(|_| ()));
    }

    /// Dropping both server-side channel ends while the client-side ends stay
    /// alive is expected to let the processor future finish. The test passes
    /// when `tokio::run` returns.
    #[test]
    fn client_processor_shutdown_server() {
        // The client-side ends are bound to underscore names so they stay
        // alive until the end of the test.
        let ClientProcessor {
            from_client_tx: _from_client_tx,
            to_client_rx: _to_client_rx,
            from_server_tx,
            to_server_rx,
            processor,
        } = ClientProcessor::new();

        // Close the server side before the processor is run.
        drop(from_server_tx);
        drop(to_server_rx);

        tokio::run(processor.map_err(|_| ()));
    }
}