use std::net::SocketAddr;
use std::io;
use futures::{Async, Future, Poll, Stream};
use tokio_core::reactor::Handle;
use BufClientStreamHandle;
use client::ClientStreamHandle;
use udp::UdpStream;
#[must_use = "futures do nothing unless polled"]
pub struct UdpClientStream {
name_server: SocketAddr,
udp_stream: UdpStream,
}
impl UdpClientStream {
pub fn new
(name_server: SocketAddr,
loop_handle: &Handle)
-> (Box<Future<Item = UdpClientStream, Error = io::Error>>, Box<ClientStreamHandle>) {
let (stream_future, sender) = UdpStream::new(name_server, loop_handle);
let new_future: Box<Future<Item = UdpClientStream, Error = io::Error>> =
Box::new(stream_future.map(move |udp_stream| {
UdpClientStream {
name_server: name_server,
udp_stream: udp_stream,
}
}));
let sender = Box::new(BufClientStreamHandle {
name_server: name_server,
sender: sender,
});
(new_future, sender)
}
}
impl Stream for UdpClientStream {
type Item = Vec<u8>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.udp_stream.poll()) {
Some((buffer, src_addr)) => {
if src_addr != self.name_server {
debug!("{} does not match name_server: {}",
src_addr,
self.name_server)
}
Ok(Async::Ready(Some(buffer)))
}
None => Ok(Async::Ready(None)),
}
}
}
#[cfg(test)]
use std::net::{IpAddr, Ipv4Addr};
#[cfg(not(target_os = "linux"))]
#[cfg(test)]
use std::net::Ipv6Addr;
#[test]
fn test_udp_client_stream_ipv4() {
udp_client_stream_test(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
}
#[test]
#[cfg(not(target_os = "linux"))] fn test_udp_client_stream_ipv6() {
udp_client_stream_test(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
}
#[cfg(test)]
fn udp_client_stream_test(server_addr: IpAddr) {
use tokio_core::reactor::Core;
use log::LogLevel;
use logger::TrustDnsLogger;
TrustDnsLogger::enable_logging(LogLevel::Debug);
use std;
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let succeeded_clone = succeeded.clone();
std::thread::Builder::new()
.name("thread_killer".to_string())
.spawn(move || {
let succeeded = succeeded_clone.clone();
for _ in 0..15 {
std::thread::sleep(std::time::Duration::from_secs(1));
if succeeded.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
}
panic!("timeout");
})
.unwrap();
let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
server.set_read_timeout(Some(std::time::Duration::from_secs(5))).unwrap(); server.set_write_timeout(Some(std::time::Duration::from_secs(5))).unwrap(); let server_addr = server.local_addr().unwrap();
let test_bytes: &'static [u8; 8] = b"DEADBEEF";
let send_recv_times = 4;
let server_handle = std::thread::Builder::new()
.name("test_udp_client_stream_ipv4:server".to_string())
.spawn(move || {
let mut buffer = [0_u8; 512];
for _ in 0..send_recv_times {
let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
assert_eq!(&buffer[0..len], test_bytes);
assert_eq!(server.send_to(&buffer[0..len], addr).expect("send failed"),
len);
}
})
.unwrap();
let mut io_loop = Core::new().unwrap();
let (stream, mut sender) = UdpClientStream::new(server_addr, &io_loop.handle());
let mut stream: UdpClientStream = io_loop.run(stream).ok().unwrap();
for _ in 0..send_recv_times {
sender.send(test_bytes.to_vec()).unwrap();
let (buffer, stream_tmp) = io_loop.run(stream.into_future()).ok().unwrap();
stream = stream_tmp;
assert_eq!(&buffer.expect("no buffer received"), test_bytes);
}
succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
server_handle.join().expect("server thread failed");
}