use std::io::{self};
use std::result::{Result};
use std::thread::{self};
use std::sync::{Arc};
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError, RecvError, Iter};
use std::sync::atomic::{AtomicBool, Ordering};
use std::net::{UdpSocket, SocketAddr};
use std::time::Duration;
use {SSDPResult};
use net::packet::{PacketReceiver};
/// Types that can be decoded from the raw bytes of a received packet.
pub trait FromRawSSDP: Sized {
/// Attempts to construct `Self` from `bytes`, reporting failure through `SSDPResult`.
fn raw_ssdp(bytes: &[u8]) -> SSDPResult<Self>;
}
/// Iterator that owns an `SSDPReceiver` and yields the `(message, sender address)`
/// pairs obtained from it.
pub struct SSDPIter<T> {
// Receiver that every call to `next` pulls from.
recv: SSDPReceiver<T>
}
impl<T> SSDPIter<T> {
    /// Wraps `receiver` so it can be consumed through the `Iterator` interface.
    fn new(receiver: SSDPReceiver<T>) -> Self {
        SSDPIter { recv: receiver }
    }
}
impl<T> Iterator for SSDPIter<T> {
    type Item = (T, SocketAddr);

    /// Returns the next `(message, sender address)` pair from the wrapped
    /// receiver, or `None` as soon as its `recv` call reports an error.
    fn next(&mut self) -> Option<Self::Item> {
        match self.recv.recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}
/// Receives decoded messages of type `T`, together with the address of their
/// sender, from background threads that read a set of UDP sockets.
pub struct SSDPReceiver<T> {
// Receiving half of the channel that the worker threads send decoded messages into.
recvr: Receiver<(T, SocketAddr)>,
// The sockets handed to `new`; used on drop to send the wake-up datagrams.
socks: Vec<UdpSocket>,
// Local address of each entry in `socks`, in the same order.
addrs: Vec<SocketAddr>,
// Shutdown flag shared with the worker threads (and the optional timer thread).
kill: Arc<AtomicBool>
}
impl<T> SSDPReceiver<T> where T: FromRawSSDP + Send + 'static {
pub fn new(socks: Vec<UdpSocket>, time: Option<Duration>) -> io::Result<SSDPReceiver<T>> {
let (send, recv) = mpsc::channel();
let send_socks = try!(clone_socks(&socks[..]));
let recv_addrs = try!(clone_addrs(&socks[..]));
let self_kill = Arc::new(AtomicBool::new(false));
spawn_receivers(send_socks, self_kill.clone(), send);
let spawn_result = maybe_spawn_timer(time, self_kill.clone(), &socks[..]);
if let Err(e) = spawn_result {
syncronize_kill(&*self_kill, &socks[..], &recv_addrs[..]);
return Err(e)
}
Ok(SSDPReceiver{ recvr: recv, socks: socks, addrs: recv_addrs, kill: self_kill })
}
}
/// Produces an independent handle for every socket in `socks`, preserving order.
///
/// Stops at, and returns, the first error reported by `UdpSocket::try_clone`.
fn clone_socks(socks: &[UdpSocket]) -> io::Result<Vec<UdpSocket>> {
    socks.iter().map(|sock| sock.try_clone()).collect()
}
/// Collects the local address of every socket in `socks`, preserving order.
///
/// Stops at, and returns, the first error reported by `UdpSocket::local_addr`.
fn clone_addrs(socks: &[UdpSocket]) -> io::Result<Vec<SocketAddr>> {
    socks.iter().map(|sock| sock.local_addr()).collect()
}
/// Starts one detached thread per socket, each running `receive_packets`
/// with its own clone of the kill flag and of the channel sender.
fn spawn_receivers<T>(socks: Vec<UdpSocket>, kill_flag: Arc<AtomicBool>, sender: Sender<(T, SocketAddr)>)
    where T: FromRawSSDP + Send + 'static
{
    for sock in socks {
        let packets = PacketReceiver::new(sock);
        let thread_kill = kill_flag.clone();
        let thread_sender = sender.clone();

        thread::spawn(move || receive_packets(packets, thread_kill, thread_sender));
    }
}
fn maybe_spawn_timer(time: Option<Duration>, kill: Arc<AtomicBool>, socks: &[UdpSocket]) -> io::Result<()> {
match time {
Some(n) => {
let timer_socks = try!(clone_socks(socks));
let timer_addrs = try!(clone_addrs(socks));
thread::spawn(move || {
kill_timer(n, kill, timer_socks, timer_addrs);
});
},
None => ()
};
Ok(())
}
impl<T> SSDPReceiver<T> {
    /// Returns the next queued `(message, sender address)` pair without
    /// blocking, forwarding the result of the channel's `try_recv`.
    pub fn try_recv(&self) -> Result<(T, SocketAddr), TryRecvError> {
        let channel = &self.recvr;
        channel.try_recv()
    }

    /// Blocks for the next `(message, sender address)` pair, forwarding the
    /// result of the channel's `recv`.
    pub fn recv(&self) -> Result<(T, SocketAddr), RecvError> {
        let channel = &self.recvr;
        channel.recv()
    }
}
impl<'a, T> IntoIterator for &'a SSDPReceiver<T> {
    type Item = (T, SocketAddr);
    type IntoIter = Iter<'a, (T, SocketAddr)>;

    /// Iterates over received messages by borrowing the underlying channel.
    fn into_iter(self) -> Self::IntoIter {
        let channel = &self.recvr;
        channel.iter()
    }
}
impl<'a, T> IntoIterator for &'a mut SSDPReceiver<T> {
    type Item = (T, SocketAddr);
    type IntoIter = Iter<'a, (T, SocketAddr)>;

    /// Iterates over received messages by borrowing the underlying channel;
    /// the mutable reference is only used for shared access.
    fn into_iter(self) -> Self::IntoIter {
        let channel = &self.recvr;
        channel.iter()
    }
}
impl<T> IntoIterator for SSDPReceiver<T> {
type Item = (T, SocketAddr);
type IntoIter = SSDPIter<T>;
fn into_iter(self) -> Self::IntoIter {
SSDPIter::new(self)
}
}
impl<T> Drop for SSDPReceiver<T> {
fn drop(&mut self) {
syncronize_kill(&*self.kill, &self.socks[..], &self.addrs[..]);
}
}
/// Worker loop: reads packets from `recv`, decodes each into `T` with
/// `FromRawSSDP::raw_ssdp`, and forwards successfully decoded messages,
/// paired with the sender's address, over `send`.
///
/// The loop ends when either
/// * `kill` is observed to be set after a read returns (successfully or not), or
/// * the receiving half of the channel has been dropped, so nobody is left to
///   consume messages.
///
/// Packets that fail to read or fail to decode are skipped.
fn receive_packets<T>(recv: PacketReceiver, kill: Arc<AtomicBool>, send: Sender<(T, SocketAddr)>)
    where T: FromRawSSDP + Send
{
    loop {
        let received = recv.recv_pckt();

        // Check the flag after every return from the read, including failed
        // reads, so a socket that keeps erroring cannot keep this thread alive
        // after shutdown was requested.
        if kill.load(Ordering::Acquire) {
            return;
        }

        let (msg_bytes, addr) = match received {
            Ok((bytes, addr)) => (bytes, addr),
            Err(_) => continue,
        };

        match T::raw_ssdp(&msg_bytes[..]) {
            Ok(message) => {
                // `send` only fails once the receiver has been dropped. That can
                // race with the kill check above, so exit quietly rather than
                // panicking the thread.
                if send.send((message, addr)).is_err() {
                    return;
                }
            }
            Err(_) => continue,
        }
    }
}
/// Body of the timer thread: sleeps for `time`, then sets `kill` and sends a
/// datagram to each socket's local address through `syncronize_kill`.
fn kill_timer(time: Duration, kill: Arc<AtomicBool>, socks: Vec<UdpSocket>, addrs: Vec<SocketAddr>) {
    thread::sleep(time);

    let flag: &AtomicBool = &kill;
    syncronize_kill(flag, &socks, &addrs);
}
/// Sets `kill` and then sends a single zero byte from each socket to the
/// address paired with it in `local_addrs`.
///
/// Sockets and addresses are paired positionally; surplus entries in the longer
/// slice are ignored. Send failures are deliberately discarded.
fn syncronize_kill(kill: &AtomicBool, socks: &[UdpSocket], local_addrs: &[SocketAddr]) {
    kill.store(true, Ordering::SeqCst);

    for (sock, addr) in socks.iter().zip(local_addrs) {
        // Best effort: nothing useful can be done if this datagram cannot be sent.
        let _ = sock.send_to(&[0], addr);
    }
}