use futures::{Async, Future, Poll};
use mio::net::UdpSocket as MioUdpSocket;
use std::fmt;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use super::{into_io_error, Bind};
use io::poll::{EventedHandle, Interest};
use sync::oneshot::Monitor;
/// A UDP socket built on top of a mio UDP socket.
///
/// The only field is an `Arc`, so the derived `Clone` produces another
/// handle that refers to the same underlying `EventedHandle`.
#[derive(Clone)]
pub struct UdpSocket {
// Reference-counted evented handle that wraps the mio UDP socket.
handle: Arc<EventedHandle<MioUdpSocket>>,
}
impl UdpSocket {
    /// Creates a future that resolves to a `UdpSocket` for `addr`.
    ///
    /// The address is stored together with `mio::net::UdpSocket::bind` in a
    /// `Bind::Bind` state; nothing else happens in this call itself.
    pub fn bind(addr: SocketAddr) -> UdpSocketBind {
        let binder: fn(&SocketAddr) -> io::Result<MioUdpSocket> = MioUdpSocket::bind;
        UdpSocketBind(Bind::Bind(addr, binder))
    }

    /// Creates a future that sends the bytes of `buf` to `target`.
    ///
    /// The socket and the buffer are moved into the future and are handed
    /// back through both its `Item` and its `Error`.
    pub fn send_to<B: AsRef<[u8]>>(self, buf: B, target: SocketAddr) -> SendTo<B> {
        let state = SendToInner {
            socket: self,
            buf,
            target,
            monitor: None,
        };
        SendTo(Some(state))
    }

    /// Creates a future that receives a datagram into `buf`.
    ///
    /// The socket and the buffer are moved into the future and are handed
    /// back through both its `Item` and its `Error`.
    pub fn recv_from<B: AsMut<[u8]>>(self, buf: B) -> RecvFrom<B> {
        let state = RecvFromInner {
            socket: self,
            buf,
            monitor: None,
        };
        RecvFrom(Some(state))
    }

    /// Returns the result of `local_addr()` on the underlying mio socket.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.with_inner(|socket| socket.local_addr())
    }

    /// Returns the result of `take_error()` on the underlying mio socket.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.with_inner(|socket| socket.take_error())
    }

    /// Calls `f` with a reference to the underlying mio socket and returns
    /// whatever `f` returns.
    pub fn with_inner<F, T>(&self, f: F) -> T
    where
        F: FnOnce(&MioUdpSocket) -> T,
    {
        let inner = self.handle.inner();
        f(&*inner)
    }
}
/// Hand-written `Debug` output of the form `UdpSocket { local_addr:<addr>, .. }`.
///
/// The `local_addr` part is left out when the address lookup fails.
impl fmt::Debug for UdpSocket {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "UdpSocket {{ ")?;
        match self.local_addr() {
            Ok(addr) => write!(f, "local_addr:{:?}, ", addr)?,
            // A failed lookup is not an error for formatting purposes.
            Err(_) => {}
        }
        write!(f, ".. }}")
    }
}
/// Future returned by `UdpSocket::bind`.
///
/// It wraps a `Bind` state parameterised with the mio bind function type and
/// resolves to a `UdpSocket`, or fails with an `io::Error`.
#[derive(Debug)]
pub struct UdpSocketBind(Bind<fn(&SocketAddr) -> io::Result<MioUdpSocket>, MioUdpSocket>);
impl Future for UdpSocketBind {
type Item = UdpSocket;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(self.0.poll()?.map(|handle| UdpSocket { handle }))
}
}
/// Future returned by `UdpSocket::send_to`.
///
/// The in-flight state lives in the inner `Option`. `poll` takes it out and
/// only puts it back when it returns `NotReady`, so polling again after the
/// future has completed (successfully or not) panics.
#[derive(Debug)]
pub struct SendTo<B>(Option<SendToInner<B>>);
impl<B: AsRef<[u8]>> Future for SendTo<B> {
    /// The socket, the caller's buffer and the size returned by the send call.
    type Item = (UdpSocket, B, usize);
    /// The socket, the caller's buffer and the error that ended the operation.
    type Error = (UdpSocket, B, io::Error);

    /// Tries to send the buffer, waiting on a write monitor whenever the
    /// socket reports `WouldBlock`.
    ///
    /// # Panics
    ///
    /// Panics with "Cannot poll SendTo twice" when polled again after it has
    /// already returned `Ready` or an error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut state = self.0.take().expect("Cannot poll SendTo twice");
        loop {
            match state.monitor.take() {
                Some(mut pending) => match pending.poll() {
                    // The monitor completed: fall through and retry the send
                    // on the next loop iteration.
                    Ok(Async::Ready(())) => {}
                    Ok(Async::NotReady) => {
                        // Keep both the monitor and the state for the next poll.
                        state.monitor = Some(pending);
                        self.0 = Some(state);
                        return Ok(Async::NotReady);
                    }
                    Err(e) => return Err((state.socket, state.buf, into_io_error(e))),
                },
                None => {
                    let outcome = state
                        .socket
                        .handle
                        .inner()
                        .send_to(state.buf.as_ref(), &state.target);
                    match outcome {
                        Ok(size) => return Ok(Async::Ready((state.socket, state.buf, size))),
                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                            // Not writable right now: obtain a write monitor
                            // and wait on it in the next loop iteration.
                            state.monitor = Some(state.socket.handle.monitor(Interest::Write));
                        }
                        Err(e) => return Err((state.socket, state.buf, e)),
                    }
                }
            }
        }
    }
}
/// State carried by a `SendTo` future between polls.
#[derive(Debug)]
struct SendToInner<B> {
// Socket used for sending; returned to the caller when the future completes.
socket: UdpSocket,
// Caller-supplied buffer whose bytes are sent; also returned on completion.
buf: B,
// Destination address passed to the send call.
target: SocketAddr,
// Set after the send call reported `WouldBlock`; polled before retrying.
monitor: Option<Monitor<(), io::Error>>,
}
/// Future returned by `UdpSocket::recv_from`.
///
/// The in-flight state lives in the inner `Option`. `poll` takes it out and
/// only puts it back when it returns `NotReady`, so polling again after the
/// future has completed (successfully or not) panics.
#[derive(Debug)]
pub struct RecvFrom<B>(Option<RecvFromInner<B>>);
impl<B: AsMut<[u8]>> Future for RecvFrom<B> {
    /// The socket, the caller's buffer, and the size and address returned by
    /// the receive call.
    type Item = (UdpSocket, B, usize, SocketAddr);
    /// The socket, the caller's buffer and the error that ended the operation.
    type Error = (UdpSocket, B, io::Error);

    /// Tries to receive into the buffer, waiting on a read monitor whenever
    /// the socket reports `WouldBlock`.
    ///
    /// # Panics
    ///
    /// Panics with "Cannot poll RecvFrom twice" when polled again after it
    /// has already returned `Ready` or an error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut state = self.0.take().expect("Cannot poll RecvFrom twice");
        loop {
            match state.monitor.take() {
                Some(mut pending) => match pending.poll() {
                    // The monitor completed: fall through and retry the
                    // receive on the next loop iteration.
                    Ok(Async::Ready(())) => {}
                    Ok(Async::NotReady) => {
                        // Keep both the monitor and the state for the next poll.
                        state.monitor = Some(pending);
                        self.0 = Some(state);
                        return Ok(Async::NotReady);
                    }
                    Err(e) => return Err((state.socket, state.buf, into_io_error(e))),
                },
                None => {
                    // `socket` and `buf` are distinct fields, so they can be
                    // borrowed at the same time.
                    let outcome = state.socket.handle.inner().recv_from(state.buf.as_mut());
                    match outcome {
                        Ok((size, addr)) => {
                            return Ok(Async::Ready((state.socket, state.buf, size, addr)));
                        }
                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                            // Nothing to read right now: obtain a read monitor
                            // and wait on it in the next loop iteration.
                            state.monitor = Some(state.socket.handle.monitor(Interest::Read));
                        }
                        Err(e) => return Err((state.socket, state.buf, e)),
                    }
                }
            }
        }
    }
}
/// State carried by a `RecvFrom` future between polls.
#[derive(Debug)]
struct RecvFromInner<B> {
// Socket used for receiving; returned to the caller when the future completes.
socket: UdpSocket,
// Caller-supplied buffer that receives the data; also returned on completion.
buf: B,
// Set after the receive call reported `WouldBlock`; polled before retrying.
monitor: Option<Monitor<(), io::Error>>,
}