use std::io;
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crate::devicediscovery::DeviceInfo;
use crate::protocol::{
PACKET_SIZE, PKT_TYPE_POSE_DATA, POSE_KEEPALIVE_INTERVAL_SECS, POSE_PORT,
encode_pose_subscribe, encode_pose_unsubscribe, parse_packet,
};
use crate::marker_pose::MarkerPose;
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(200);
const BLOCK_TIMEOUT: Duration = Duration::from_secs(3);
const RECV_BUF_SIZE: usize = 65536;
pub struct PoseStream {
socket: Arc<UdpSocket>,
connected: bool,
thread: Option<ThreadState>,
last_keepalive: Instant,
}
struct ThreadState {
buffered: Arc<(Mutex<Vec<MarkerPose>>, Condvar)>,
stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}
impl PoseStream {
pub fn from_ip(ip: IpAddr, create_thread: bool) -> io::Result<Self> {
if ip.is_ipv6() {
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"the Enpose API supports IPv4 only",
));
}
Self::connect_to(SocketAddr::new(ip, POSE_PORT), create_thread)
}
pub fn from_device(device: &DeviceInfo, create_thread: bool) -> io::Result<Self> {
Self::from_ip(device.ip, create_thread)
}
#[cfg(test)]
pub(crate) fn with_target(addr: SocketAddr, create_thread: bool) -> io::Result<Self> {
Self::connect_to(addr, create_thread)
}
fn connect_to(addr: SocketAddr, create_thread: bool) -> io::Result<Self> {
let socket = UdpSocket::bind((IpAddr::from([0, 0, 0, 0]), 0))?;
socket.connect(addr)?;
let socket = Arc::new(socket);
socket.send(&encode_pose_subscribe())?;
let thread = if create_thread {
Some(Self::spawn_receiver(socket.clone()))
} else {
None
};
Ok(Self {
socket,
connected: true,
thread,
last_keepalive: Instant::now(),
})
}
pub fn receive_pose_updates(&mut self, block: bool) -> io::Result<Vec<MarkerPose>> {
if let Some(thread) = &self.thread {
let (buffer, available) = &*thread.buffered;
let mut buffer = buffer.lock().unwrap();
if block {
(buffer, _) = available
.wait_timeout_while(buffer, BLOCK_TIMEOUT, |b| b.is_empty())
.unwrap();
}
return Ok(std::mem::take(&mut *buffer));
}
self.send_keepalive_if_due()?;
let mut poses = Vec::new();
self.drain_available(&mut poses)?;
if block && poses.is_empty() {
self.block_for_pose(&mut poses)?;
self.drain_available(&mut poses)?;
}
Ok(poses)
}
fn send_keepalive_if_due(&mut self) -> io::Result<()> {
if self.last_keepalive.elapsed() >= Duration::from_secs(POSE_KEEPALIVE_INTERVAL_SECS) {
self.socket.send(&encode_pose_subscribe())?;
self.last_keepalive = Instant::now();
}
Ok(())
}
fn drain_available(&self, poses: &mut Vec<MarkerPose>) -> io::Result<()> {
self.socket.set_nonblocking(true)?;
let mut buf = [0u8; RECV_BUF_SIZE];
loop {
match self.socket.recv(&mut buf) {
Ok(n) => {
if let Some(batch) = parse_pose_packet(&buf[..n]) {
poses.extend(batch);
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => return Err(e),
}
}
Ok(())
}
fn block_for_pose(&mut self, poses: &mut Vec<MarkerPose>) -> io::Result<()> {
self.socket.set_nonblocking(false)?;
let deadline = Instant::now() + BLOCK_TIMEOUT;
let mut buf = [0u8; RECV_BUF_SIZE];
while poses.is_empty() {
let remaining = match deadline.checked_duration_since(Instant::now()) {
Some(d) if !d.is_zero() => d,
_ => break,
};
self.socket.set_read_timeout(Some(remaining.min(THREAD_RECV_TIMEOUT)))?;
match self.socket.recv(&mut buf) {
Ok(n) => {
if let Some(batch) = parse_pose_packet(&buf[..n]) {
poses.extend(batch);
}
}
Err(e)
if e.kind() == io::ErrorKind::WouldBlock
|| e.kind() == io::ErrorKind::TimedOut =>
{
self.send_keepalive_if_due()?;
}
Err(e) => return Err(e),
}
}
Ok(())
}
pub fn disconnect(&mut self) {
if !self.connected {
return;
}
self.connected = false;
if let Some(thread) = &mut self.thread {
thread.stop.store(true, Ordering::Relaxed);
if let Some(handle) = thread.handle.take() {
let _ = handle.join();
}
}
let _ = self.socket.send(&encode_pose_unsubscribe());
}
fn spawn_receiver(socket: Arc<UdpSocket>) -> ThreadState {
let buffered = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
let stop = Arc::new(AtomicBool::new(false));
let thread_buffered = buffered.clone();
let thread_stop = stop.clone();
let handle =
thread::spawn(move || Self::receiver_thread(socket, thread_buffered, thread_stop));
ThreadState {
buffered,
stop,
handle: Some(handle),
}
}
fn receiver_thread(
socket: Arc<UdpSocket>,
buffered: Arc<(Mutex<Vec<MarkerPose>>, Condvar)>,
stop: Arc<AtomicBool>,
) {
let _ = socket.set_read_timeout(Some(THREAD_RECV_TIMEOUT));
let mut last_keepalive = Instant::now();
let mut buf = [0u8; RECV_BUF_SIZE];
let (buffer, available) = &*buffered;
while !stop.load(Ordering::Relaxed) {
if last_keepalive.elapsed() >= Duration::from_secs(POSE_KEEPALIVE_INTERVAL_SECS) {
let _ = socket.send(&encode_pose_subscribe());
last_keepalive = Instant::now();
}
if let Ok(n) = socket.recv(&mut buf)
&& let Some(batch) = parse_pose_packet(&buf[..n])
&& !batch.is_empty()
{
buffer.lock().unwrap().extend(batch);
available.notify_one();
}
}
}
}
impl Drop for PoseStream {
fn drop(&mut self) {
self.disconnect();
}
}
fn parse_pose_packet(data: &[u8]) -> Option<Vec<MarkerPose>> {
let parsed = parse_packet(data)?;
if parsed.pkt_type != PKT_TYPE_POSE_DATA {
return None;
}
rmp_serde::from_slice::<Vec<MarkerPose>>(&data[PACKET_SIZE..]).ok()
}
#[cfg(test)]
#[path = "posestream_tests.rs"]
mod tests;