use std::fmt::{Debug};
use std::clone::{Clone};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use futures::sync::mpsc;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender, SendError};
use futures::sync::oneshot;
use tokio::prelude::*;
use tokio_codec::{Encoder, Decoder};
use tokio::spawn;
use tokio::net::udp::{UdpSocket, UdpFramed};
use crate::error::Error;
pub type IncomingRx<Req> = UnboundedReceiver<(Req, SocketAddr)>;
pub type OutgoingTx<Resp> = UnboundedSender<(Resp, SocketAddr)>;
#[derive(Clone)]
pub struct UdpConnection<Codec: Encoder + Decoder>
{
incoming_rx: Arc<Mutex<IncomingRx<<Codec as Decoder>::Item>>>,
outgoing_tx: Arc<Mutex<OutgoingTx<<Codec as Encoder>::Item>>>,
exit_tx: Arc<Mutex<Option<(oneshot::Sender<()>, oneshot::Sender<()>)>>>,
}
impl <Codec> UdpConnection<Codec>
where
Codec: Encoder + Decoder + Clone + Send + 'static,
<Codec as Encoder>::Item: Send + Debug,
<Codec as Encoder>::Error: Send + Debug,
<Codec as Decoder>::Item: Send + Debug,
<Codec as Decoder>::Error: Send + Debug,
{
pub fn new(addr: &SocketAddr, codec: Codec) -> impl Future<Item=UdpConnection<Codec>, Error=Error> {
debug!("[connector] creating connection (udp address: {})", addr);
let socket = match UdpSocket::bind(&addr) {
Ok(s) => s,
Err(e) => return futures::future::err(e.into()),
};
futures::future::ok(UdpConnection::from_socket(socket, codec))
}
fn from_socket(socket: UdpSocket, codec: Codec) -> UdpConnection<Codec> {
let framed = UdpFramed::new(socket, codec);
let (tx, rx) = framed.split();
let (incoming_tx, incoming_rx) = mpsc::unbounded::<_>();
let (incoming_exit_tx, incoming_exit_rx) = oneshot::channel::<()>();
let rx_handle = rx.for_each(move |(data, addr)| {
trace!("[udp connection] receive from: '{:?}' data: '{:?}'", addr, data);
incoming_tx.clone().send((data, addr)).map(|_| () ).map_err(|e| panic!("[udp connection] send error: {:?}", e))
})
.map_err(|e| panic!("[udp connection] error: {:?}", e))
.select2(incoming_exit_rx)
.then(|_| {
debug!("[udp connection] closing incoming handler");
Ok(())
});
spawn(rx_handle);
let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<_>();
let (outgoing_exit_tx, outgoing_exit_rx) = oneshot::channel::<()>();
let tx_handle = tx.send_all(outgoing_rx.map_err(|_| panic!() ))
.select2(outgoing_exit_rx)
.then(|_| {
debug!("[udp connection] closing outgoing handler");
Ok(())
});
spawn(tx_handle);
UdpConnection{
incoming_rx: Arc::new(Mutex::new(incoming_rx)),
outgoing_tx: Arc::new(Mutex::new(outgoing_tx)),
exit_tx: Arc::new(Mutex::new(Some((incoming_exit_tx, outgoing_exit_tx)))),
}
}
pub fn send(&mut self, addr: SocketAddr, data: <Codec as Encoder>::Item) {
let unlocked = self.outgoing_tx.lock().unwrap();
let _err = unlocked.unbounded_send((data, addr));
}
pub fn shutdown(self) {
let tx = self.exit_tx.lock().unwrap().take().unwrap();
let _ = tx.0.send(());
let _ = tx.1.send(());
}
}
unsafe impl<Codec> Send for UdpConnection<Codec>
where
Codec: Encoder + Decoder + Clone,
{}
impl<Codec> Sink for UdpConnection<Codec>
where
Codec: Encoder + Decoder + Clone,
{
type SinkItem = (<Codec as Encoder>::Item, SocketAddr);
type SinkError = SendError<(<Codec as Encoder>::Item, SocketAddr)>;
fn start_send(
&mut self,
item: Self::SinkItem,
) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
trace!("[connection] start send");
self.outgoing_tx.lock().unwrap().start_send(item)
}
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
trace!("[connection] send complete");
self.outgoing_tx.lock().unwrap().poll_complete()
}
}
impl<Codec> Stream for UdpConnection<Codec>
where
Codec: Encoder + Decoder + Clone,
{
type Item = (<Codec as Decoder>::Item, SocketAddr);
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
trace!("[connection] poll receive");
self.incoming_rx.lock().unwrap().poll()
}
}
#[derive(Clone, Debug)]
pub struct UdpInfo {
pub address: SocketAddr,
}