use super::{decode, rosc, CommunicationError, Connected, Packet, Unconnected};
use std;
use std::net::{SocketAddr, SocketAddrV4, ToSocketAddrs, UdpSocket};
use std::sync::atomic::{self, AtomicBool};
use std::sync::Mutex;
pub const DEFAULT_MTU: usize = rosc::decoder::MTU;
pub const DEFAULT_NON_BLOCKING: bool = false;
pub struct Receiver<M = Unconnected> {
buffer: Mutex<Vec<u8>>,
socket: UdpSocket,
non_blocking: AtomicBool,
mode: M,
}
pub struct Iter<'a, M = Unconnected>
where
M: 'a,
{
receiver: &'a Receiver<M>,
}
pub struct TryIter<'a, M = Unconnected>
where
M: 'a,
{
receiver: &'a Receiver<M>,
}
impl<M> Receiver<M> {
pub fn local_addr(&self) -> Result<SocketAddr, std::io::Error> {
self.socket.local_addr()
}
fn switch_to_blocking(&self) -> Result<(), std::io::Error> {
if self.non_blocking.load(atomic::Ordering::Relaxed) {
self.socket.set_nonblocking(false)?;
self.non_blocking.store(false, atomic::Ordering::Relaxed);
}
Ok(())
}
fn switch_to_non_blocking(&self) -> Result<(), std::io::Error> {
if !self.non_blocking.load(atomic::Ordering::Relaxed) {
self.socket.set_nonblocking(true)?;
self.non_blocking.store(true, atomic::Ordering::Relaxed);
}
Ok(())
}
}
impl Receiver<Unconnected> {
pub fn bind_to<A>(addr: A) -> Result<Self, std::io::Error>
where
A: ToSocketAddrs,
{
Self::bind_to_with_mtu(addr, DEFAULT_MTU)
}
pub fn bind_to_with_mtu<A>(addr: A, mtu: usize) -> Result<Self, std::io::Error>
where
A: ToSocketAddrs,
{
let buffer = Mutex::new(vec![0; mtu]);
let socket = UdpSocket::bind(addr)?;
let non_blocking = AtomicBool::new(DEFAULT_NON_BLOCKING);
let mode = Unconnected;
let receiver = Receiver {
buffer,
socket,
non_blocking,
mode,
};
Ok(receiver)
}
pub fn bind(port: u16) -> Result<Self, std::io::Error> {
Self::bind_to(SocketAddrV4::new(super::default_ipv4_addr(), port))
}
pub fn bind_with_mtu(port: u16, mtu: usize) -> Result<Self, std::io::Error> {
Self::bind_to_with_mtu(SocketAddrV4::new(super::default_ipv4_addr(), port), mtu)
}
pub fn connect<A>(self, addr: A) -> Result<Receiver<Connected>, std::io::Error>
where
A: ToSocketAddrs,
{
let Receiver {
buffer,
socket,
non_blocking,
..
} = self;
let mut addrs = addr.to_socket_addrs()?;
let addr = addrs.next().expect("could not resolve any `SocketAddr`s");
socket.connect(addr)?;
let mode = Connected { addr };
Ok(Receiver {
buffer,
socket,
non_blocking,
mode,
})
}
pub fn recv(&self) -> Result<(Packet, SocketAddr), CommunicationError> {
self.switch_to_blocking()?;
let mut buffer = self.buffer.lock()?;
let (len, addr) = self.socket.recv_from(&mut buffer)?;
let packet = decode(&buffer[..len])?;
Ok((packet, addr))
}
pub fn try_recv(&self) -> Result<Option<(Packet, SocketAddr)>, CommunicationError> {
self.switch_to_non_blocking()?;
let mut buffer = self.buffer.lock()?;
let (len, addr) = match self.socket.recv_from(&mut buffer) {
Ok(tuple) => tuple,
Err(_) => return Ok(None),
};
let packet = decode(&buffer[..len])?;
Ok(Some((packet, addr)))
}
pub fn iter(&self) -> Iter<Unconnected> {
Iter { receiver: self }
}
pub fn try_iter(&self) -> TryIter<Unconnected> {
TryIter { receiver: self }
}
}
impl Receiver<Connected> {
pub fn remote_addr(&self) -> SocketAddr {
self.mode.addr
}
pub fn recv(&self) -> Result<Packet, CommunicationError> {
self.switch_to_blocking()?;
let mut buffer = self.buffer.lock()?;
let len = self.socket.recv(&mut buffer)?;
let packet = decode(&buffer[..len])?;
Ok(packet)
}
pub fn try_recv(&self) -> Result<Option<Packet>, CommunicationError> {
self.switch_to_non_blocking()?;
let mut buffer = self.buffer.lock()?;
let len = match self.socket.recv(&mut buffer) {
Ok(len) => len,
Err(_) => return Ok(None),
};
let packet = decode(&buffer[..len])?;
Ok(Some(packet))
}
pub fn iter(&self) -> Iter<Connected> {
Iter { receiver: self }
}
pub fn try_iter(&self) -> TryIter<Connected> {
TryIter { receiver: self }
}
}
impl<'a> Iterator for Iter<'a, Connected> {
type Item = Packet;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
impl<'a> Iterator for Iter<'a, Unconnected> {
type Item = (Packet, SocketAddr);
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
impl<'a> Iterator for TryIter<'a, Connected> {
type Item = Packet;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.try_recv().ok().and_then(|p| p)
}
}
impl<'a> Iterator for TryIter<'a, Unconnected> {
type Item = (Packet, SocketAddr);
fn next(&mut self) -> Option<Self::Item> {
self.receiver.try_recv().ok().and_then(|p| p)
}
}