use async_std::net::{ToSocketAddrs, UdpSocket};
use async_std::stream::Stream;
use futures_lite::ready;
use rosc::OscPacket;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::Error;
use crate::prelude::IntoOscPacket;
use crate::udp::UdpSocketStream;
/// A UDP socket that sends and receives OSC packets.
///
/// Outgoing packets are encoded with `rosc` before being written to the
/// socket. Incoming datagrams are decoded and yielded through this type's
/// `Stream` implementation together with the address they came from.
#[derive(Debug)]
pub struct OscSocket {
// Stream wrapper around the underlying UDP socket; polled for incoming datagrams.
socket: UdpSocketStream,
}
impl OscSocket {
    /// Wraps an existing [`UdpSocket`] in an `OscSocket`.
    pub fn new(socket: UdpSocket) -> Self {
        Self {
            socket: UdpSocketStream::new(socket),
        }
    }

    /// Binds a new UDP socket to `addr` and wraps it in an `OscSocket`.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the UDP socket fails.
    pub async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
        Ok(Self::new(UdpSocket::bind(addr).await?))
    }

    /// Connects the underlying UDP socket to `addrs`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying `connect` call fails.
    pub async fn connect<A: ToSocketAddrs>(&self, addrs: A) -> Result<(), Error> {
        Ok(self.socket().connect(addrs).await?)
    }

    /// Encodes `packet` and sends it as a single datagram to `addrs`.
    ///
    /// # Errors
    ///
    /// Returns an error if the packet cannot be encoded, if the socket write
    /// fails, or if fewer bytes were written than the encoded packet contains.
    pub async fn send_to<A: ToSocketAddrs, P: IntoOscPacket>(
        &self,
        packet: P,
        addrs: A,
    ) -> Result<(), Error> {
        let encoded = rosc::encoder::encode(&packet.into_osc_packet())?;
        let sent = self.socket().send_to(encoded.as_slice(), addrs).await?;
        check_len(encoded.as_slice(), sent)
    }

    /// Encodes `packet` and sends it as a single datagram using the socket's
    /// `send` method (no explicit destination address is passed).
    ///
    /// # Errors
    ///
    /// Returns an error if the packet cannot be encoded, if the socket write
    /// fails, or if fewer bytes were written than the encoded packet contains.
    pub async fn send<P: IntoOscPacket>(&self, packet: P) -> Result<(), Error> {
        let encoded = rosc::encoder::encode(&packet.into_osc_packet())?;
        let sent = self.socket().send(encoded.as_slice()).await?;
        check_len(encoded.as_slice(), sent)
    }

    /// Creates an [`OscSender`] from the socket handle returned by the inner
    /// stream's `clone_inner`.
    pub fn sender(&self) -> OscSender {
        let shared = self.socket.clone_inner();
        OscSender::new(shared)
    }

    /// Returns a reference to the underlying [`UdpSocket`].
    pub fn socket(&self) -> &UdpSocket {
        self.socket.get_ref()
    }

    /// Returns the local address reported by the underlying socket.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket cannot report its local address.
    pub fn local_addr(&self) -> Result<SocketAddr, Error> {
        self.socket().local_addr().map_err(Into::into)
    }
}
/// Receives OSC packets from the socket as an asynchronous stream.
///
/// Each item is either a decoded packet paired with the address it was
/// received from, or an [`Error`] describing why receiving or decoding failed.
impl Stream for OscSocket {
    type Item = Result<(OscPacket, SocketAddr), Error>;

    /// Polls the inner datagram stream and decodes the next datagram, if any.
    ///
    /// Returns `Poll::Pending` while the inner stream is pending,
    /// `Poll::Ready(None)` when the inner stream has ended, and otherwise
    /// `Poll::Ready(Some(..))` holding the decoded packet and peer address or
    /// the receive/decode error converted into [`Error`].
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `ready!` returns early with `Poll::Pending` if no datagram is available yet.
        let datagram = ready!(Pin::new(&mut self.socket).poll_next(cx));

        // `Option::map` keeps end-of-stream (`None`) untouched; the closure
        // converts a receive error or decodes the payload.
        let item = datagram.map(|received| -> Self::Item {
            received
                .map_err(Into::into)
                .and_then(|(buf, peer_addr)| {
                    rosc::decoder::decode(&buf[..])
                        .map(|packet| (packet, peer_addr))
                        .map_err(Into::into)
                })
        });

        Poll::Ready(item)
    }
}
/// A cloneable handle for sending OSC packets over a UDP socket.
///
/// The socket is held behind an `Arc`, so cloning a sender copies the
/// reference-counted handle rather than the socket itself. Instances are
/// created by `OscSocket::sender`.
// NOTE(review): presumably the `Arc` refers to the same socket the originating
// `OscSocket` receives on — confirm against `UdpSocketStream::clone_inner`.
#[derive(Clone, Debug)]
pub struct OscSender {
// Reference-counted handle to the UDP socket used for sending.
socket: Arc<UdpSocket>,
}
impl OscSender {
fn new(socket: Arc<UdpSocket>) -> Self {
Self { socket }
}
pub async fn send_to<A: ToSocketAddrs, P: IntoOscPacket>(
&self,
packet: P,
addrs: A,
) -> Result<(), Error> {
let buf = rosc::encoder::encode(&packet.into_osc_packet())?;
let n = self.socket().send_to(&buf[..], addrs).await?;
check_len(&buf[..], n)
}
pub async fn send<P: IntoOscPacket>(&self, packet: P) -> Result<(), Error> {
let buf = rosc::encoder::encode(&packet.into_osc_packet())?;
let n = self.socket().send(&buf[..]).await?;
check_len(&buf[..], n)
}
pub fn socket(&self) -> &UdpSocket {
&*self.socket
}
}
/// Verifies that a send wrote the whole buffer.
///
/// `buf` is the datagram payload that was handed to the socket and `len` is
/// the byte count the socket reported as written.
///
/// # Errors
///
/// Returns an I/O error of kind `Interrupted` (converted into [`Error`]) when
/// `len` differs from `buf.len()`.
fn check_len(buf: &[u8], len: usize) -> Result<(), Error> {
    if len == buf.len() {
        return Ok(());
    }
    let short_write = io::Error::new(io::ErrorKind::Interrupted, "UDP packet not fully sent");
    Err(short_write.into())
}