use crate::channel::Channel;
use crate::message::{Indication, Request, Response};
use crate::transport::StunTransport;
use crate::{Error, Result};
use fibers::sync::{mpsc, oneshot};
use fibers::Spawn;
use futures::stream::Fuse;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use std::fmt;
use std::marker::PhantomData;
use stun_codec::Attribute;
#[derive(Debug, Clone)]
pub struct Client<A, T>
where
A: Attribute,
T: StunTransport<A>,
{
command_tx: mpsc::Sender<Command<A, T::PeerAddr>>,
_phantom: PhantomData<T>,
}
impl<A, T> Client<A, T>
where
A: Attribute + Send + 'static,
T: StunTransport<A> + Send + 'static,
T::PeerAddr: Send + 'static,
{
pub fn new<S>(spawner: &S, channel: Channel<A, T>) -> Self
where
S: Spawn + Clone + Send + 'static,
{
let (command_tx, command_rx) = mpsc::channel();
let channel_driver = ChannelDriver {
spawner: spawner.clone(),
channel: Ok(channel),
command_rx: command_rx.fuse(),
};
spawner.spawn(channel_driver);
Client {
command_tx,
_phantom: PhantomData,
}
}
pub fn call(
&self,
peer: T::PeerAddr,
request: Request<A>,
) -> impl Future<Item = Response<A>, Error = Error> {
let (tx, rx) = oneshot::monitor();
let command = Command::Call(peer, request, tx);
track!(self.command_tx.send(command).map_err(Error::from))
.into_future()
.and_then(move |()| rx.map_err(|e| track!(Error::from(e))))
}
pub fn cast(&self, peer: T::PeerAddr, indication: Indication<A>) -> Result<()> {
let command = Command::Cast(peer, indication);
track!(self.command_tx.send(command).map_err(Error::from))
}
}
enum Command<A, P> {
Call(P, Request<A>, oneshot::Monitored<Response<A>, Error>),
Cast(P, Indication<A>),
}
impl<A, P> fmt::Debug for Command<A, P> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Command::Call(..) => write!(f, "Call(..)"),
Command::Cast(..) => write!(f, "Cast(..)"),
}
}
}
struct ChannelDriver<S, A, T>
where
A: Attribute,
T: StunTransport<A>,
{
spawner: S,
channel: Result<Channel<A, T>>,
command_rx: Fuse<mpsc::Receiver<Command<A, T::PeerAddr>>>,
}
impl<S, A, T> ChannelDriver<S, A, T>
where
S: Spawn,
A: Attribute + Send + 'static,
T: StunTransport<A> + Send + 'static,
{
fn handle_command(&mut self, command: Command<A, T::PeerAddr>) {
match command {
Command::Cast(peer, indication) => {
if let Ok(channel) = self.channel.as_mut() {
let _ = channel.cast(peer, indication);
}
}
Command::Call(peer, request, reply) => match self.channel {
Err(ref e) => {
reply.exit(Err(track!(e.clone())));
}
Ok(ref mut channel) => {
let future =
channel
.call(peer, request)
.map_err(Error::from)
.then(move |result| {
reply.exit(track!(result));
Ok(())
});
self.spawner.spawn(future);
}
},
}
}
}
impl<S, A, T> Future for ChannelDriver<S, A, T>
where
S: Spawn,
A: Attribute + Send + 'static,
T: StunTransport<A> + Send + 'static,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while let Async::Ready(command) = self.command_rx.poll().expect("never fails") {
if let Some(command) = command {
self.handle_command(command);
} else {
let outstanding_transactions = self
.channel
.as_mut()
.ok()
.map_or(0, |c| c.outstanding_transactions());
if outstanding_transactions == 0 {
return Ok(Async::Ready(()));
} else {
break;
}
}
}
while self.channel.is_ok() {
match track!(self.channel.as_mut().expect("never fails").poll_recv()) {
Err(e) => {
self.channel = Err(e);
break;
}
Ok(Async::NotReady) => {}
Ok(Async::Ready(_message)) => {
continue;
}
}
if let Err(e) = track!(self.channel.as_mut().expect("never fails").poll_send()) {
self.channel = Err(e);
}
break;
}
Ok(Async::NotReady)
}
}