use std::io;
use std::result::Result;
use std::thread;
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError, RecvError, Iter};
use std::net::{UdpSocket, SocketAddr};
use std::time::Duration;
use SSDPResult;
use net::packet::PacketReceiver;
/// Conversion from the raw bytes of a received packet into a typed message.
///
/// The receiver threads in this module call `raw_ssdp` on every packet they
/// read and forward only the values for which it returns `Ok`.
pub trait FromRawSSDP: Sized {
    /// Attempts to build `Self` from `bytes`.
    ///
    /// Returns an error when the bytes cannot be interpreted as `Self`.
    fn raw_ssdp(bytes: &[u8]) -> SSDPResult<Self>;
}
/// Owning iterator over the messages delivered to an `SSDPReceiver`.
///
/// Produced by consuming an `SSDPReceiver<T>` through `IntoIterator`.
pub struct SSDPIter<T> {
    /// The receiver being drained; owned for the lifetime of the iterator.
    recv: SSDPReceiver<T>,
}
impl<T> SSDPIter<T> {
    /// Wraps `recv` so that it can be driven through the `Iterator` interface.
    fn new(recv: SSDPReceiver<T>) -> SSDPIter<T> {
        SSDPIter::<T> { recv: recv }
    }
}
impl<T> Iterator for SSDPIter<T> {
    type Item = (T, SocketAddr);

    /// Waits for the next message from the wrapped receiver.
    ///
    /// Yields `None` as soon as the receiver reports an error, which ends
    /// the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        match self.recv.recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}
/// Receiving end for messages of type `T`, each paired with the address
/// it was received from.
pub struct SSDPReceiver<T> {
    /// Channel endpoint from which `(message, address)` pairs are read.
    recvr: Receiver<(T, SocketAddr)>,
}
impl<T> SSDPReceiver<T>
    where T: FromRawSSDP + Send + 'static
{
    /// Creates a receiver that collects messages from all of `socks`.
    ///
    /// `time` is applied to every socket via `UdpSocket::set_read_timeout`.
    /// One background thread per socket is then started to read packets and
    /// forward the successfully parsed ones to the returned receiver.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` from the first socket whose read timeout
    /// could not be set; no thread has been started at that point.
    pub fn new(socks: Vec<UdpSocket>, time: Option<Duration>) -> io::Result<SSDPReceiver<T>> {
        // Configure every socket first so a failure leaves no threads behind.
        for socket in &socks {
            try!(socket.set_read_timeout(time));
        }

        let (packet_tx, packet_rx) = mpsc::channel();
        spawn_receivers(socks, packet_tx);

        Ok(SSDPReceiver { recvr: packet_rx })
    }
}
/// Starts one detached thread per socket in `socks`.
///
/// Each thread owns a `PacketReceiver` wrapping its socket and its own clone
/// of `sender`, and runs `receive_packets` with them. The original `sender`
/// is dropped when this function returns.
fn spawn_receivers<T>(socks: Vec<UdpSocket>, sender: Sender<(T, SocketAddr)>)
    where T: FromRawSSDP + Send + 'static
{
    for socket in socks {
        let thread_sender = sender.clone();
        let packet_receiver = PacketReceiver::new(socket);

        // The join handle is discarded: the thread runs detached.
        thread::spawn(move || receive_packets(packet_receiver, thread_sender));
    }
}
impl<T> SSDPReceiver<T> {
    /// Returns the next pending message without blocking.
    ///
    /// Fails with `TryRecvError::Empty` when nothing is queued and with
    /// `TryRecvError::Disconnected` when every sender has gone away.
    pub fn try_recv(&self) -> Result<(T, SocketAddr), TryRecvError> {
        let channel = &self.recvr;
        channel.try_recv()
    }

    /// Blocks until a message arrives.
    ///
    /// Fails with `RecvError` once every sender has gone away and the queue
    /// is empty.
    pub fn recv(&self) -> Result<(T, SocketAddr), RecvError> {
        let channel = &self.recvr;
        channel.recv()
    }
}
impl<'a, T> IntoIterator for &'a SSDPReceiver<T> {
    type Item = (T, SocketAddr);
    type IntoIter = Iter<'a, (T, SocketAddr)>;

    /// Borrows the receiver and iterates over incoming messages.
    ///
    /// The iterator blocks while waiting for a message and finishes once
    /// the channel has been closed by all senders.
    fn into_iter(self) -> Self::IntoIter {
        let channel: &'a Receiver<(T, SocketAddr)> = &self.recvr;
        channel.iter()
    }
}
impl<'a, T> IntoIterator for &'a mut SSDPReceiver<T> {
type Item = (T, SocketAddr);
type IntoIter = Iter<'a, (T, SocketAddr)>;
fn into_iter(self) -> Self::IntoIter {
self.recvr.iter()
}
}
impl<T> IntoIterator for SSDPReceiver<T> {
    type Item = (T, SocketAddr);
    type IntoIter = SSDPIter<T>;

    /// Consumes the receiver, turning it into an owning `SSDPIter`.
    fn into_iter(self) -> Self::IntoIter {
        let owning_iter: SSDPIter<T> = SSDPIter::new(self);
        owning_iter
    }
}
/// Reads packets from `recv` in a loop, parses each one with `T::raw_ssdp`
/// and forwards the successfully parsed messages, together with the sender's
/// address, over `send`.
///
/// The loop ends when:
///
/// * a read fails with `WouldBlock` or `TimedOut`, i.e. the read timed out, or
/// * the receiving half of `send` has been dropped, so nobody is left to
///   consume messages.
///
/// Any other read error, and any packet that fails to parse, is skipped.
fn receive_packets<T>(recv: PacketReceiver, send: Sender<(T, SocketAddr)>)
    where T: FromRawSSDP + Send
{
    loop {
        trace!("Waiting on packet at {}...", recv);

        let (msg_bytes, addr) = match recv.recv_pckt() {
            Ok((bytes, addr)) => (bytes, addr),
            // A socket read timeout is reported as either kind depending on
            // the platform (see `UdpSocket::set_read_timeout`).
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock ||
                            err.kind() == io::ErrorKind::TimedOut => {
                trace!("Receiver at {} timed out", recv);
                return;
            }
            // NOTE(review): other errors are retried immediately; a
            // persistent socket error would make this loop spin.
            Err(_) => continue,
        };

        trace!("Received packet with {} bytes", msg_bytes.len());

        match T::raw_ssdp(&msg_bytes[..]) {
            Ok(msg) => {
                // `send` only fails once the receiving side has been dropped.
                // Stop quietly instead of panicking: a panic here would print
                // a message from a detached thread and abort the process
                // when built with `panic = "abort"`.
                if send.send((msg, addr)).is_err() {
                    return;
                }
            }
            // Not a message we understand; wait for the next packet.
            Err(_) => continue,
        }
    }
}