enpose_api/posestream.rs
1//! Client-side pose streaming from a single Enpose tracker device.
2//!
3//! A [`PoseStream`] delivers a live stream of [`MarkerPose`] updates from
4//! one device. Construct it from a [`DeviceInfo`] returned by
5//! [`crate::DeviceDiscovery`] or directly from an [`IpAddr`], then poll it
6//! with [`PoseStream::receive_pose_updates`] to get the poses that have
7//! arrived since your last call.
8//!
9//! The `create_thread` constructor argument selects between two modes:
10//!
11//! * **Threaded** (`true`) — a background thread continuously receives and
12//! buffers incoming poses, so none are missed regardless of how often you
13//! poll. [`PoseStream::receive_pose_updates`] returns the buffered poses.
14//! Use this when you cannot guarantee a regular polling cadence.
15//!
16//! * **Single-threaded** (`false`) — no thread is spawned; poses are
17//! collected when you call [`PoseStream::receive_pose_updates`]. Use this
18//! when you poll on your own regular cadence and want the stream to use
19//! only your thread. Poll often enough that updates are not missed between
20//! calls.
21//!
22//! In either mode, [`PoseStream::receive_pose_updates`] takes a `block` flag:
23//! pass `false` to return whatever has arrived (possibly nothing) without
24//! waiting, or `true` to wait for at least one pose update. A blocking call
25//! waits at most 3 seconds, then returns an empty result if none arrived.
26//!
27//! The connection is closed automatically when the [`PoseStream`] is
28//! dropped.
29
30use std::io;
31use std::net::{IpAddr, SocketAddr, UdpSocket};
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Condvar, Mutex};
34use std::thread::{self, JoinHandle};
35use std::time::{Duration, Instant};
36
37use crate::devicediscovery::DeviceInfo;
38use crate::protocol::{
39 PACKET_SIZE, PKT_TYPE_POSE_DATA, POSE_KEEPALIVE_INTERVAL_SECS, POSE_PORT,
40 encode_pose_subscribe, encode_pose_unsubscribe, parse_packet,
41};
42use crate::marker_pose::MarkerPose;
43
/// How long the background receive thread may sit in a single `recv` call.
/// Keeping this short lets the thread come back around regularly to send
/// keep-alives and to see a stop request without a long delay.
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(200);

/// Upper bound on how long [`PoseStream::receive_pose_updates`] waits when it
/// is called with `block = true`. Once this much time has passed without a
/// pose update, the call gives up and returns an empty result.
const BLOCK_TIMEOUT: Duration = Duration::from_secs(3);

/// Size in bytes (64 KiB) of the buffer each datagram is received into; it
/// has to hold a whole pose datagram, i.e. the fixed header followed by the
/// MessagePack-encoded poses of one frame.
const RECV_BUF_SIZE: usize = 64 * 1024;
57
58/// A live pose stream from one Enpose tracker device.
59///
60/// Dropping a `PoseStream` automatically disconnects it (see
61/// [`PoseStream::disconnect`]).
62pub struct PoseStream {
63 /// Socket connected to the device's [`POSE_PORT`]. Shared with the
64 /// background thread in threaded mode.
65 socket: Arc<UdpSocket>,
66 /// `false` once [`Self::disconnect`] has run, so `Drop` does not send a
67 /// second disconnect packet or join an already-joined thread.
68 connected: bool,
69 /// Background receiver state, present only in threaded mode.
70 thread: Option<ThreadState>,
71 /// When the last keep-alive was sent, used only in single-threaded mode
72 /// to decide when the next one is due.
73 last_keepalive: Instant,
74}
75
76/// State owned by the background receive thread in threaded mode.
77struct ThreadState {
78 /// Poses received since the last [`PoseStream::receive_pose_updates`]
79 /// call, in arrival order, paired with a condvar the receiver signals
80 /// whenever it appends — so a blocking receive can wait for data.
81 buffered: Arc<(Mutex<Vec<MarkerPose>>, Condvar)>,
82 /// Set to request the thread to exit.
83 stop: Arc<AtomicBool>,
84 /// Handle to join the thread on disconnect.
85 handle: Option<JoinHandle<()>>,
86}
87
88impl PoseStream {
89 /// Connect a pose stream to the device at `ip`.
90 ///
91 /// `ip` must be IPv4 — the Enpose API is IPv4-only. An IPv6 address is
92 /// rejected with [`io::ErrorKind::Unsupported`].
93 ///
94 /// When `create_thread` is `true`, spawns the background receiver thread
95 /// described in the [module docs](crate::posestream).
96 ///
97 /// # Errors
98 ///
99 /// Returns an [`io::Error`] if `ip` is not IPv4, or if the connection to
100 /// the device cannot be established.
101 pub fn from_ip(ip: IpAddr, create_thread: bool) -> io::Result<Self> {
102 if ip.is_ipv6() {
103 return Err(io::Error::new(
104 io::ErrorKind::Unsupported,
105 "the Enpose API supports IPv4 only",
106 ));
107 }
108 Self::connect_to(SocketAddr::new(ip, POSE_PORT), create_thread)
109 }
110
111 /// Connect a pose stream to a device discovered via
112 /// [`crate::DeviceDiscovery`].
113 ///
114 /// Convenience wrapper around [`Self::from_ip`] using [`DeviceInfo::ip`].
115 pub fn from_device(device: &DeviceInfo, create_thread: bool) -> io::Result<Self> {
116 Self::from_ip(device.ip, create_thread)
117 }
118
/// Test-only constructor taking a complete socket address rather than an
/// IP combined with the fixed [`POSE_PORT`]. This lets unit tests point the
/// stream at a fake device listening on an ephemeral loopback port.
#[cfg(test)]
pub(crate) fn with_target(addr: SocketAddr, create_thread: bool) -> io::Result<Self> {
    let stream = Self::connect_to(addr, create_thread)?;
    Ok(stream)
}
126
127 /// Bind an ephemeral local socket, connect it to `addr`, send the
128 /// initial subscribe packet, and optionally spawn the receiver thread.
129 fn connect_to(addr: SocketAddr, create_thread: bool) -> io::Result<Self> {
130 let socket = UdpSocket::bind((IpAddr::from([0, 0, 0, 0]), 0))?;
131 socket.connect(addr)?;
132 let socket = Arc::new(socket);
133
134 // Subscribe immediately so poses start flowing without waiting for
135 // the first keep-alive interval.
136 socket.send(&encode_pose_subscribe())?;
137
138 let thread = if create_thread {
139 Some(Self::spawn_receiver(socket.clone()))
140 } else {
141 None
142 };
143
144 Ok(Self {
145 socket,
146 connected: true,
147 thread,
148 last_keepalive: Instant::now(),
149 })
150 }
151
152 /// Return the marker poses received from the stream.
153 ///
154 /// When `block` is `false`, returns all poses that have arrived since the
155 /// previous call, or an empty vector if none have — it never waits. When
156 /// `block` is `true`, it waits for a pose update and returns the poses
157 /// gathered so far; the wait is bounded by a 3-second timeout, so a
158 /// blocking call still returns an empty vector if no update arrives within
159 /// that window.
160 ///
161 /// Call it repeatedly to keep receiving updates. In threaded mode the
162 /// poses are those gathered by the background thread.
163 ///
164 /// # Errors
165 ///
166 /// Returns an [`io::Error`] only for an unrecoverable communication
167 /// failure. In threaded mode this never returns an error.
168 pub fn receive_pose_updates(&mut self, block: bool) -> io::Result<Vec<MarkerPose>> {
169 if let Some(thread) = &self.thread {
170 let (buffer, available) = &*thread.buffered;
171 let mut buffer = buffer.lock().unwrap();
172 if block {
173 // Wait until the background thread buffers a pose, or until the
174 // timeout elapses (handles spurious wakeups internally).
175 (buffer, _) = available
176 .wait_timeout_while(buffer, BLOCK_TIMEOUT, |b| b.is_empty())
177 .unwrap();
178 }
179 return Ok(std::mem::take(&mut *buffer));
180 }
181
182 // Single-threaded: send keep-alives and drain the socket ourselves.
183 self.send_keepalive_if_due()?;
184
185 let mut poses = Vec::new();
186 self.drain_available(&mut poses)?;
187 if block && poses.is_empty() {
188 self.block_for_pose(&mut poses)?;
189 self.drain_available(&mut poses)?;
190 }
191 Ok(poses)
192 }
193
194 /// Send a keep-alive subscribe packet if the keep-alive interval has
195 /// elapsed since the previous one. Single-threaded mode only.
196 fn send_keepalive_if_due(&mut self) -> io::Result<()> {
197 if self.last_keepalive.elapsed() >= Duration::from_secs(POSE_KEEPALIVE_INTERVAL_SECS) {
198 self.socket.send(&encode_pose_subscribe())?;
199 self.last_keepalive = Instant::now();
200 }
201 Ok(())
202 }
203
204 /// Drain every pose datagram already waiting on the socket into `poses`,
205 /// without blocking.
206 fn drain_available(&self, poses: &mut Vec<MarkerPose>) -> io::Result<()> {
207 self.socket.set_nonblocking(true)?;
208 let mut buf = [0u8; RECV_BUF_SIZE];
209 loop {
210 match self.socket.recv(&mut buf) {
211 Ok(n) => {
212 if let Some(batch) = parse_pose_packet(&buf[..n]) {
213 poses.extend(batch);
214 }
215 }
216 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
217 Err(e) => return Err(e),
218 }
219 }
220 Ok(())
221 }
222
223 /// Block until at least one pose datagram arrives or [`BLOCK_TIMEOUT`]
224 /// elapses, appending any poses to `poses` while keeping the connection
225 /// alive. Returns with `poses` still empty if the timeout is reached.
226 fn block_for_pose(&mut self, poses: &mut Vec<MarkerPose>) -> io::Result<()> {
227 self.socket.set_nonblocking(false)?;
228 let deadline = Instant::now() + BLOCK_TIMEOUT;
229 let mut buf = [0u8; RECV_BUF_SIZE];
230 while poses.is_empty() {
231 // Stop once the deadline passes. Otherwise wake at least once per
232 // keep-alive tick, but never wait past the deadline.
233 let remaining = match deadline.checked_duration_since(Instant::now()) {
234 Some(d) if !d.is_zero() => d,
235 _ => break,
236 };
237 self.socket.set_read_timeout(Some(remaining.min(THREAD_RECV_TIMEOUT)))?;
238 match self.socket.recv(&mut buf) {
239 Ok(n) => {
240 if let Some(batch) = parse_pose_packet(&buf[..n]) {
241 poses.extend(batch);
242 }
243 }
244 // The read timeout fired (reported as WouldBlock or TimedOut
245 // depending on the platform): no data yet, so refresh the
246 // keep-alive and keep waiting until the deadline.
247 Err(e)
248 if e.kind() == io::ErrorKind::WouldBlock
249 || e.kind() == io::ErrorKind::TimedOut =>
250 {
251 self.send_keepalive_if_due()?;
252 }
253 Err(e) => return Err(e),
254 }
255 }
256 Ok(())
257 }
258
259 /// Disconnect the stream, closing the connection to the device.
260 ///
261 /// Idempotent, and called automatically when the [`PoseStream`] is
262 /// dropped, so you only need to call it explicitly to disconnect early.
263 pub fn disconnect(&mut self) {
264 if !self.connected {
265 return;
266 }
267 self.connected = false;
268
269 if let Some(thread) = &mut self.thread {
270 thread.stop.store(true, Ordering::Relaxed);
271 if let Some(handle) = thread.handle.take() {
272 let _ = handle.join();
273 }
274 }
275
276 // Best-effort: the device times the connection out regardless.
277 let _ = self.socket.send(&encode_pose_unsubscribe());
278 }
279
280 /// Spawn the background receiver thread and return its shared state.
281 fn spawn_receiver(socket: Arc<UdpSocket>) -> ThreadState {
282 let buffered = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
283 let stop = Arc::new(AtomicBool::new(false));
284 let thread_buffered = buffered.clone();
285 let thread_stop = stop.clone();
286 let handle =
287 thread::spawn(move || Self::receiver_thread(socket, thread_buffered, thread_stop));
288 ThreadState {
289 buffered,
290 stop,
291 handle: Some(handle),
292 }
293 }
294
295 /// Background thread body: periodically sends keep-alives and buffers
296 /// incoming pose datagrams until asked to stop, signalling the condvar so
297 /// a blocking receive wakes when new poses land.
298 fn receiver_thread(
299 socket: Arc<UdpSocket>,
300 buffered: Arc<(Mutex<Vec<MarkerPose>>, Condvar)>,
301 stop: Arc<AtomicBool>,
302 ) {
303 let _ = socket.set_read_timeout(Some(THREAD_RECV_TIMEOUT));
304 let mut last_keepalive = Instant::now();
305 let mut buf = [0u8; RECV_BUF_SIZE];
306 let (buffer, available) = &*buffered;
307
308 while !stop.load(Ordering::Relaxed) {
309 if last_keepalive.elapsed() >= Duration::from_secs(POSE_KEEPALIVE_INTERVAL_SECS) {
310 // Best-effort: a transient send error should not kill the
311 // stream; the next tick retries.
312 let _ = socket.send(&encode_pose_subscribe());
313 last_keepalive = Instant::now();
314 }
315
316 // A recv error is either the expected read timeout (which bounds
317 // the loop) or a transient failure retried next iteration; only a
318 // successful read carries poses. Append them and wake any blocking
319 // receive waiting on the condvar.
320 if let Ok(n) = socket.recv(&mut buf)
321 && let Some(batch) = parse_pose_packet(&buf[..n])
322 && !batch.is_empty()
323 {
324 buffer.lock().unwrap().extend(batch);
325 available.notify_one();
326 }
327 }
328 }
329}
330
331impl Drop for PoseStream {
332 fn drop(&mut self) {
333 self.disconnect();
334 }
335}
336
337/// Parse a [`PKT_TYPE_POSE_DATA`] datagram into its poses, or return
338/// `None` if the buffer is not a valid pose-data packet (wrong magic,
339/// wrong type, or undecodable payload).
340fn parse_pose_packet(data: &[u8]) -> Option<Vec<MarkerPose>> {
341 let parsed = parse_packet(data)?;
342 if parsed.pkt_type != PKT_TYPE_POSE_DATA {
343 return None;
344 }
345 rmp_serde::from_slice::<Vec<MarkerPose>>(&data[PACKET_SIZE..]).ok()
346}
347
348#[cfg(test)]
349#[path = "posestream_tests.rs"]
350mod tests;