librtlsdr_rs/device/streaming.rs
1//! Data streaming — buffer reset, sync read, async read.
2//!
3//! Ports `rtlsdr_reset_buffer`, `rtlsdr_read_sync`,
4//! `rtlsdr_read_async`, `rtlsdr_cancel_async`.
5//!
6//! Note: The C implementation uses libusb's async transfer API with multiple
7//! pre-submitted bulk transfers. The Rust implementation uses a blocking
8//! read loop that checks a shared cancellation flag. True async support
9//! will be added when the pipeline is wired up with worker threads.
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::time::Duration;
13
14use crate::constants::{BULK_TIMEOUT, DEFAULT_BUF_LENGTH};
15use crate::error::RtlSdrError;
16use crate::reg::Block;
17use crate::usb;
18
19use super::RtlSdrDevice;
20use super::reader::ReaderBusyGuard;
21
/// Boxed callback handed to [`RtlSdrDevice::read_async_blocking`].
/// The read loop invokes it once per completed bulk transfer, passing
/// that transfer's IQ bytes as a slice.
///
/// # Callback contract (per audit pass-2 #71)
///
/// The callback executes on the thread that called
/// [`RtlSdrDevice::read_async_blocking`], from inside the read loop,
/// with the reader-busy flag held. For the loop to stay responsive to
/// the cancel flag and keep up with the device's bulk-read cadence:
///
/// - **Don't block on shared state held outside the callback.**
///   Taking a mutex that the cancel-flag setter also touches will
///   deadlock the cancel.
/// - **Don't perform long synchronous I/O.** The next bulk read is
///   not issued until the callback returns, so blocking on the
///   consumer side shows up as a drop in bytes per second.
/// - **Don't panic under `panic = "abort"`.** During a normal unwind
///   the [`super::reader::ReaderBusyGuard`]'s `Drop` releases the
///   busy flag, but `abort` skips `Drop` and leaves the flag stuck
///   at `true` for the device's lifetime. The default
///   `panic = "unwind"` is safe because RAII performs the release.
///
/// In short: use the callback as a buffer-handoff site, not a
/// processing site. Push the bytes into a queue / channel /
/// ringbuffer and do the real work somewhere else.
pub type ReadAsyncCb = Box<dyn FnMut(&[u8]) + Send>;

/// Upper bound on the buffer length accepted for async reads (16 MB).
const MAX_BUF_LENGTH: u32 = 16 << 20;

/// Alignment, in bytes, that USB bulk transfer lengths must satisfy.
const BULK_ALIGNMENT: u32 = 512;

/// Per-read timeout used by the async read loop; it doubles as the
/// cadence at which the cancel flag is polled.
const ASYNC_POLL_TIMEOUT: Duration = Duration::from_millis(1_000);

/// Number of back-to-back `Ok(0)` reads that
/// [`RtlSdrDevice::read_async_blocking`] tolerates before the loop
/// fuses.
///
/// A healthy RTL-SDR answers with either `Ok(n > 0)` bytes or
/// `Err(Timeout)`. `Ok(0)` is a sentinel: the USB transfer completed
/// with no payload (typically a ZLP from a misbehaving / stalling
/// device). Before #12 the loop retried `Ok(0)` without limit, the
/// same as the C upstream, so a degenerate device emitting ZLPs
/// forever would pin the callback consumer in a tight retry loop with
/// no diagnostic.
///
/// 100 reads × the `ASYNC_POLL_TIMEOUT` upper bound (1 s) gives
/// roughly 100 s worst-case before the loop fuses. A healthy device
/// resets the counter with its first `Ok(n > 0)`.
///
/// **Caveat (per audit pass-2 #69):** only `Ok(0)` increments the
/// counter; `Err(Timeout)` does NOT. A degenerate device alternating
/// `Ok(0), Timeout, Ok(0), Timeout, ...` therefore still fuses after
/// 100 `Ok(0)` reads, but the interleaved Timeouts (each up to 1 s)
/// are not counted towards the bound. The "100 × 1 s ≈ 100 s
/// worst-case" figure only holds for pure-`Ok(0)` streams;
/// interleaved Timeouts can stretch it indefinitely. Callers that
/// need a strict cancel deadline should wrap this loop in their own
/// outer timeout (e.g. `tokio::time::timeout`). Per audit issue #12 /
/// "Reconcile Ok(0) semantics."
const MAX_CONSECUTIVE_ZERO_READS: u32 = 100;
87
88/// Internal bulk-read helper shared by [`RtlSdrDevice::read_sync`]
89/// and [`SampleIter::next`] (and, in `reader.rs`, also by
90/// [`super::reader::RtlSdrReader::read_sync`] / `ReaderIter::next`).
91/// Does NOT acquire the reader-busy guard — callers are responsible
92/// for that contract; this function only does the actual USB
93/// bulk-IN transfer + `NoDevice` translation.
94///
95/// `dev_lost` is set to `true` on the `NoDevice → DeviceLost`
96/// translation path. The flag is shared with the parent
97/// [`RtlSdrDevice`] so its [`Drop`] impl can skip cleanup against
98/// a vanished handle (avoids a stream of cryptic
99/// "register access failed" lines from cleanup writes that would
100/// every return `NoDevice`). Per audit pass-2 #40.
101pub(crate) fn bulk_read(
102 handle: &rusb::DeviceHandle<rusb::GlobalContext>,
103 dev_lost: &AtomicBool,
104 buf: &mut [u8],
105) -> Result<usize, RtlSdrError> {
106 // BULK_TIMEOUT = 0 selects the streaming-friendly 5 s default
107 // (NOT libusb's "no timeout" convention) so drop-cancellation
108 // is observable within at most one bulk-read cycle. See the
109 // constant's docs for the full rationale. Per audit pass-2 #47.
110 let timeout = if BULK_TIMEOUT == 0 {
111 Duration::from_secs(5)
112 } else {
113 Duration::from_millis(BULK_TIMEOUT)
114 };
115 translate_bulk_result(
116 handle.read_bulk(crate::constants::BULK_ENDPOINT, buf, timeout),
117 dev_lost,
118 )
119}
120
121/// Translate a rusb bulk-read result into the crate's typed shape,
122/// side-effecting the `dev_lost` flag on disconnect.
123///
124/// Three rusb variants count as disconnect:
125/// - [`rusb::Error::NoDevice`] — libusb's authoritative signal.
126/// - [`rusb::Error::Pipe`] — endpoint stall; on Linux this is
127/// the common mid-flight-disconnect surrogate before libusb
128/// downgrades subsequent calls to `NoDevice`.
129/// - [`rusb::Error::Io`] — generic transport I/O failure; same
130/// Linux mid-flight surrogate.
131///
132/// All three set `dev_lost` AND normalize to [`RtlSdrError::DeviceLost`]
133/// so the bulk-read surface and [`RtlSdrError::is_disconnected`]
134/// agree (CodeRabbit on PR #80 caught the earlier asymmetry —
135/// pre-fix, `Pipe`/`Io` looked disconnected to callers via
136/// `is_disconnected()` but didn't trigger the `dev_lost` flag,
137/// so `Drop` ran cleanup against a dead handle).
138///
139/// Pulled out of [`bulk_read`] so the disconnect-detection
140/// behavior can be unit-tested without a real USB handle.
141/// Per audit pass-2 #40 + #43.
142fn translate_bulk_result(
143 result: rusb::Result<usize>,
144 dev_lost: &AtomicBool,
145) -> Result<usize, RtlSdrError> {
146 match result {
147 Ok(n) => Ok(n),
148 Err(rusb::Error::NoDevice | rusb::Error::Pipe | rusb::Error::Io) => {
149 // `Release` pairs with the `Drop` impl's `Acquire`
150 // load — any cleanup-skipping decision must observe
151 // a flag set by a happens-before bulk-read failure.
152 dev_lost.store(true, Ordering::Release);
153 Err(RtlSdrError::DeviceLost)
154 }
155 Err(e) => Err(e.into()),
156 }
157}
158
159impl RtlSdrDevice {
160 /// Reset the USB endpoint buffer.
161 ///
162 /// Ports `rtlsdr_reset_buffer`.
163 pub fn reset_buffer(&self) -> Result<(), RtlSdrError> {
164 usb::write_reg(
165 &self.handle,
166 Block::Usb,
167 crate::reg::usb_reg::USB_EPA_CTL,
168 0x1002,
169 2,
170 )?;
171 usb::write_reg(
172 &self.handle,
173 Block::Usb,
174 crate::reg::usb_reg::USB_EPA_CTL,
175 0x0000,
176 2,
177 )
178 }
179
180 /// Get a shared reference to the USB handle for spawning a reader thread.
181 ///
182 /// The returned Arc can be sent to another thread for concurrent bulk reads
183 /// while the main thread retains access for control transfers.
184 ///
185 /// # Concurrency hazard
186 ///
187 /// This is the *escape hatch* around the single-active-stream
188 /// guard that [`Self::read_sync`], [`Self::iter_samples`],
189 /// [`Self::read_async_blocking`], and the
190 /// [`super::RtlSdrReader`] streaming methods enforce via
191 /// [`RtlSdrError::DeviceBusy`]. Doing your own
192 /// `read_bulk(BULK_ENDPOINT, ...)` on this handle bypasses
193 /// that guard entirely.
194 ///
195 /// libusb permits concurrent bulk submits on the same
196 /// endpoint, but the responses interleave non-deterministically
197 /// — each thread sees valid bytes for its own libusb transfer,
198 /// but neither has the complete IQ stream. Only use this
199 /// escape hatch if you serialize bulk reads yourself (one
200 /// worker thread at a time on endpoint 0x81). Per #7.
201 ///
202 /// # Disconnect detection
203 ///
204 /// Bypassing the typed surface also bypasses the
205 /// `NoDevice → DeviceLost` translation that
206 /// [`Self::read_sync`] / [`super::RtlSdrReader::read_sync`]
207 /// perform internally — your raw `read_bulk` calls will
208 /// surface `rusb::Error::NoDevice` (and on Linux,
209 /// `rusb::Error::Pipe` / `Io` mid-flight) directly. Use
210 /// [`crate::RtlSdrError::is_disconnected`] to classify
211 /// rusb errors against the same disconnect-set the typed
212 /// surface uses; pre-#43 (0.2.1) consumers had to maintain
213 /// their own classifier. Per audit pass-2 #67.
214 pub fn usb_handle(&self) -> std::sync::Arc<rusb::DeviceHandle<rusb::GlobalContext>> {
215 std::sync::Arc::clone(&self.handle)
216 }
217
218 /// Synchronous (blocking) read of IQ samples.
219 ///
220 /// Ports `rtlsdr_read_sync`. Returns the number of bytes read.
221 ///
222 /// # Errors
223 ///
224 /// Returns [`RtlSdrError::DeviceBusy`] if another bulk-read
225 /// activity (sync read, blocking iterator, async stream) is
226 /// already in flight on this device. Per #7.
227 pub fn read_sync(&self, buf: &mut [u8]) -> Result<usize, RtlSdrError> {
228 let _guard = ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy))?;
229 bulk_read(&self.handle, &self.dev_lost, buf)
230 }
231
232 /// Iterate IQ samples as a sequence of owned byte buffers.
233 ///
234 /// Returns an `Iterator` whose [`Iterator::next`] blocks the
235 /// calling thread until one buffer's worth of samples is ready
236 /// (a single `read_sync` underneath), then yields a freshly-
237 /// allocated `Vec<u8>` of the actual byte count read. Each
238 /// item is `Result<Vec<u8>, RtlSdrError>` so transport errors
239 /// surface in-band; the iterator fuses (returns `None` from
240 /// then on) after the first error or a zero-length read.
241 ///
242 /// This is the foundation for both sync streaming (use
243 /// directly) and async streaming wrappers (the per-runtime
244 /// `stream_samples_*` methods drive this iterator inside a
245 /// blocking task).
246 ///
247 /// # Buffer size
248 ///
249 /// `buffer_size` is the bytes-per-yield target. The librtlsdr
250 /// default is 256 KB (16 × 32 × 512). Smaller buffers give
251 /// lower per-item latency but more allocator traffic; larger
252 /// buffers amortise USB overhead but increase per-buffer
253 /// latency. The size doesn't have to be a multiple of the USB
254 /// 512-byte packet — `read_sync` returns the actual byte count
255 /// — but multiples of 512 avoid short final transfers.
256 ///
257 /// Passing `0` selects the librtlsdr-equivalent default
258 /// (256 KB) rather than requesting a zero-length buffer —
259 /// matches the upstream "pass 0 for the default" ergonomic
260 /// and prevents a typo from silently fusing the iterator on
261 /// the first call (which would look like EOF).
262 ///
263 /// # Allocation
264 ///
265 /// Each yielded `Vec<u8>` is a fresh allocation. At the
266 /// 256 KB / 65 ms cadence of typical RTL-SDR rates this is
267 /// negligible (~15 allocs/sec). Smaller buffers scale
268 /// linearly: a 4 KB buffer at 2 Msps is ~1000 allocs/sec
269 /// (still acceptable on desktop), but at 512 bytes you're at
270 /// ~7800 allocs/sec and an arena/pool starts to matter.
271 /// For tight loops or embedded use prefer [`Self::read_sync`]
272 /// directly with a reused caller-owned buffer. Per #20.
273 ///
274 /// ```no_run
275 /// # use librtlsdr_rs::{RtlSdrDevice, RtlSdrError};
276 /// # fn main() -> Result<(), RtlSdrError> {
277 /// let dev = RtlSdrDevice::open(0)?;
278 /// dev.reset_buffer()?;
279 /// // Take the first 10 buffers — each ~65 ms at 2 Msps.
280 /// for chunk in dev.iter_samples(262_144).take(10) {
281 /// let bytes = chunk?;
282 /// // process `bytes`...
283 /// # let _ = bytes;
284 /// }
285 /// # Ok(())
286 /// # }
287 /// ```
288 pub fn iter_samples(&self, buffer_size: usize) -> SampleIter<'_> {
289 // Normalise zero to the librtlsdr-equivalent default
290 // (256 KB). A `buffer_size == 0` typo would otherwise
291 // hand `read_sync` an empty slice, which the USB
292 // backend treats as an immediate zero-length read —
293 // the iterator's zero-fuse path triggers, and the
294 // caller sees an empty `for chunk in iter { … }` that
295 // looks like EOF rather than a configuration mistake.
296 // Per #632 CR round 1.
297 let buffer_size = if buffer_size == 0 {
298 DEFAULT_BUF_LENGTH as usize
299 } else {
300 buffer_size
301 };
302 // Acquire the reader-busy guard for the iterator's lifetime.
303 // On contention, store the error in `pending_error` and yield
304 // it on first `next()` (then fuse) — matches the existing
305 // fuse-on-error contract documented on `SampleIter`. Per #7.
306 let (guard, pending_error) =
307 match ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy)) {
308 Ok(g) => (Some(g), None),
309 Err(e) => (None, Some(e)),
310 };
311 SampleIter {
312 device: Some(self),
313 buffer_size,
314 _guard: guard,
315 pending_error,
316 }
317 }
318
319 /// Read IQ samples in a blocking loop, calling the callback for each buffer.
320 ///
321 /// This is a simplified port of `rtlsdr_read_async`. It blocks the calling
322 /// thread and reads bulk data, calling `cb` for each completed buffer.
323 /// Use `cancel_flag` to signal cancellation from another thread.
324 ///
325 /// - `cb`: callback called with each buffer of IQ data
326 /// - `cancel_flag`: set to `true` from another thread to stop reading
327 /// - `buf_len`: buffer length in bytes (0 = default, must be multiple of 512)
328 ///
329 /// # Termination
330 ///
331 /// Returns when any of the following:
332 /// - `cancel_flag` becomes `true` (caller-initiated; returns `Ok(())`)
333 /// - The underlying USB read returns `NoDevice` (returns
334 /// `Err(DeviceLost)`) or any other transport error
335 /// - 100 consecutive `Ok(0)` (zero-length) reads have been
336 /// observed (returns `Ok(())` with a `tracing::warn!`). A
337 /// healthy device returns either `Ok(n > 0)` or
338 /// `Err(Timeout)`; sustained `Ok(0)` indicates a degenerate
339 /// device that the C upstream would loop on forever.
340 /// Brings this path into rough parity with `iter_samples`'s
341 /// defensive `Ok(0)` fuse. Per audit issue #12.
342 ///
343 /// # Cancellation latency
344 ///
345 /// The cancel flag is checked between bulk reads. Each bulk
346 /// read uses a 1-second timeout (the polling cadence), so
347 /// worst-case observation latency from
348 /// `cancel_flag.store(true, …)` to the function returning is
349 /// ~1 second on an idle device, plus up to one bulk-read time
350 /// (~65 ms typical at 2 Msps) on an actively-streaming device.
351 /// True in-flight cancellation needs libusb's async-submit +
352 /// cancel API and is tracked as #633. Per audit #20.
353 pub fn read_async_blocking(
354 &self,
355 mut cb: ReadAsyncCb,
356 cancel_flag: &AtomicBool,
357 buf_len: u32,
358 ) -> Result<(), RtlSdrError> {
359 // Acquire the reader-busy flag for the entire callback loop.
360 // Released on Drop when this function returns. Per #7.
361 let _guard = ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy))?;
362
363 let actual_buf_len = if buf_len == 0 {
364 DEFAULT_BUF_LENGTH as usize
365 } else if !buf_len.is_multiple_of(BULK_ALIGNMENT) || buf_len > MAX_BUF_LENGTH {
366 return Err(RtlSdrError::InvalidParameter(format!(
367 "buf_len must be a multiple of {BULK_ALIGNMENT} and <= {MAX_BUF_LENGTH}, got {buf_len}"
368 )));
369 } else {
370 buf_len as usize
371 };
372
373 let timeout = ASYNC_POLL_TIMEOUT;
374 let mut buf = vec![0u8; actual_buf_len];
375 let mut consecutive_zero_reads: u32 = 0;
376
377 // Relaxed ordering is sufficient: there's no other state
378 // being synchronized through this flag — the worst-case
379 // visibility latency is one extra bulk-read iteration
380 // (one ASYNC_POLL_TIMEOUT). Don't "upgrade" to SeqCst on
381 // a hot loop without a concrete invariant requiring it.
382 // Per audit issue #20.
383 while !cancel_flag.load(Ordering::Relaxed) {
384 match self
385 .handle
386 .read_bulk(crate::constants::BULK_ENDPOINT, &mut buf, timeout)
387 {
388 Ok(n) if n > 0 => {
389 consecutive_zero_reads = 0;
390 cb(&buf[..n]);
391 }
392 Ok(_) => {
393 // Zero-length read. A healthy device shouldn't
394 // produce these; a degenerate device producing
395 // ZLPs forever would lock the consumer in a
396 // tight retry loop. Fuse after a documented
397 // bound. Per audit issue #12.
398 consecutive_zero_reads += 1;
399 if consecutive_zero_reads >= MAX_CONSECUTIVE_ZERO_READS {
400 tracing::warn!(
401 "read_async_blocking: {MAX_CONSECUTIVE_ZERO_READS} consecutive \
402 zero-length reads — fusing the loop (degenerate device?)"
403 );
404 return Ok(());
405 }
406 }
407 Err(rusb::Error::Timeout) => {
408 // Timeout doesn't reset the counter (it carries
409 // no signal about whether the device is producing
410 // ZLPs) but doesn't increment it either —
411 // distinct from Ok(0).
412 }
413 Err(rusb::Error::NoDevice | rusb::Error::Pipe | rusb::Error::Io) => {
414 // Mirror `bulk_read`'s `dev_lost` side effect
415 // so `Drop` can skip cleanup against a
416 // vanished handle. Treats Linux hot-unplug
417 // surrogates (`Pipe`/`Io`) same as
418 // `NoDevice` — see `translate_bulk_result`
419 // doc for why. Per audit pass-2 #40 + #43.
420 self.dev_lost.store(true, Ordering::Release);
421 return Err(RtlSdrError::DeviceLost);
422 }
423 Err(e) => {
424 tracing::error!("bulk read error: {e}");
425 return Err(RtlSdrError::Usb(e));
426 }
427 }
428 }
429
430 Ok(())
431 }
432}
433
434/// Blocking iterator over IQ-sample buffers, returned by
435/// [`RtlSdrDevice::iter_samples`].
436///
437/// Each [`Iterator::next`] call performs one [`RtlSdrDevice::read_sync`]
438/// into a freshly-allocated `Vec<u8>` and yields it. The iterator
439/// fuses on the first error or zero-length read — once `next`
440/// returns `Some(Err(_))` (or `None` from a zero read), all
441/// subsequent calls return `None` so callers can use the standard
442/// `for chunk in iter { let chunk = chunk?; ... }` shape without
443/// worrying about post-error state.
444pub struct SampleIter<'a> {
445 /// `None` once the iterator has fused (error or zero read).
446 /// Borrows the device shared (`&`) because [`RtlSdrDevice::read_sync`]
447 /// is `&self` — the underlying USB bulk transfer doesn't need
448 /// mutable access.
449 device: Option<&'a RtlSdrDevice>,
450 buffer_size: usize,
451 /// Reader-busy guard held for the iterator's lifetime. Acquired
452 /// at construction (`iter_samples`); released on Drop. `None` if
453 /// construction failed to acquire (in which case
454 /// `pending_error` carries the `DeviceBusy` to yield on first
455 /// `next()`) — also `None` after the iterator drops itself.
456 /// Per #7.
457 _guard: Option<ReaderBusyGuard>,
458 /// Construction-time guard-acquire failure to yield on the next
459 /// (= first) `next()` call. Cleared after yielding; the
460 /// iterator fuses normally afterward via `device = None`.
461 pending_error: Option<RtlSdrError>,
462}
463
464impl Iterator for SampleIter<'_> {
465 type Item = Result<Vec<u8>, RtlSdrError>;
466
467 fn next(&mut self) -> Option<Self::Item> {
468 // Yield any deferred construction error first, then fuse.
469 if let Some(e) = self.pending_error.take() {
470 self.device = None;
471 return Some(Err(e));
472 }
473 let device = self.device?;
474 let mut buf = vec![0u8; self.buffer_size];
475 // Bypass `device.read_sync` (which would re-acquire its own
476 // guard per call) — the iterator already holds the guard
477 // for its lifetime via `_guard`. Per #7.
478 match bulk_read(&device.handle, &device.dev_lost, &mut buf) {
479 Ok(0) => {
480 // Zero-length read — treat as end-of-stream so
481 // callers using `.take(N)` / `for ... in iter`
482 // don't spin forever on a degenerate device.
483 self.device = None;
484 None
485 }
486 Ok(n) => {
487 buf.truncate(n);
488 Some(Ok(buf))
489 }
490 Err(e) => {
491 // Fuse after first error so subsequent calls
492 // return `None` rather than re-yielding the
493 // same error indefinitely.
494 self.device = None;
495 Some(Err(e))
496 }
497 }
498 }
499}
500
501impl std::iter::FusedIterator for SampleIter<'_> {}
502
#[cfg(test)]
mod tests {
    use super::*;

    // Pin the trait-impl contract documented on `SampleIter` —
    // standard `Iterator` + `FusedIterator` so consumers can rely
    // on `for x in iter` shape AND on the post-fuse-returns-None
    // contract without empirical testing. If a refactor ever
    // changes the iterator shape, this fires at compile time.
    const _: fn() = || {
        fn assert_iter<T: Iterator>() {}
        fn assert_fused<T: std::iter::FusedIterator>() {}
        assert_iter::<SampleIter<'_>>();
        assert_fused::<SampleIter<'_>>();
    };

    // Pin `SampleIter: !Send` — the borrowed iterator is the
    // single-thread surface (the owned `ReaderIter` is the
    // sendable one). `SampleIter<'a>` borrows `&'a RtlSdrDevice`
    // and `RtlSdrDevice: !Sync`, so `&RtlSdrDevice: !Send`,
    // making `SampleIter: !Send` transitively. If a future field
    // change ever made `RtlSdrDevice: Sync`, `SampleIter` would
    // silently become Sendable — and a downstream consumer might
    // inadvertently move it across threads, violating the
    // documented "single-threaded sync iteration" contract.
    // Per audit issue #20.
    static_assertions::assert_not_impl_any!(SampleIter<'static>: Send);

    /// Per audit pass-2 #40: the `NoDevice → DeviceLost`
    /// translation must side-effect the shared `dev_lost` flag
    /// so the parent device's `Drop` can skip cleanup against a
    /// vanished handle.
    #[test]
    fn translate_no_device_sets_dev_lost_flag() {
        let flag = AtomicBool::new(false);
        let result = translate_bulk_result(Err(rusb::Error::NoDevice), &flag);
        assert!(matches!(result, Err(RtlSdrError::DeviceLost)));
        assert!(flag.load(Ordering::Acquire), "dev_lost should be set");
    }

    /// A successful read passes its byte count through unchanged and
    /// leaves the disconnect flag alone.
    #[test]
    fn translate_ok_does_not_touch_dev_lost_flag() {
        let flag = AtomicBool::new(false);
        let result = translate_bulk_result(Ok(42), &flag);
        assert!(matches!(result, Ok(42)));
        assert!(!flag.load(Ordering::Acquire));
    }

    /// A zero-length read is still a success at this layer: it must
    /// come back as `Ok(0)` and must not be mistaken for a disconnect.
    /// Deciding what to do with it is the caller's job.
    #[test]
    fn translate_ok_zero_passes_through() {
        let flag = AtomicBool::new(false);
        let result = translate_bulk_result(Ok(0), &flag);
        assert!(matches!(result, Ok(0)));
        assert!(!flag.load(Ordering::Acquire));
    }

    /// Per CodeRabbit on PR #80: `Pipe` and `Io` are the Linux
    /// hot-unplug surrogates surfaced before libusb downgrades
    /// to `NoDevice`. They must trigger the same side effect
    /// (set `dev_lost`) and normalize to the same error
    /// (`DeviceLost`) as `NoDevice` itself, otherwise the
    /// bulk-read surface and `is_disconnected()` disagree and
    /// `Drop` runs cleanup against a dead handle.
    #[test]
    fn translate_pipe_and_io_treated_as_disconnect() {
        for kind in [rusb::Error::Pipe, rusb::Error::Io] {
            let flag = AtomicBool::new(false);
            let result = translate_bulk_result(Err(kind), &flag);
            assert!(
                matches!(result, Err(RtlSdrError::DeviceLost)),
                "{kind:?} should normalize to DeviceLost"
            );
            assert!(
                flag.load(Ordering::Acquire),
                "dev_lost should be set for {kind:?}"
            );
        }
    }

    /// Pin that genuinely-transient or unrelated transport
    /// errors do NOT trip the disconnect flag. `Timeout` is the
    /// most important — a slow stream is healthy, not lost.
    /// `Access` and `Overflow` mirror the exclusion set the
    /// `is_disconnected` test pins (CodeRabbit on PR #80) so
    /// the two surfaces stay in sync — a future widening of
    /// either set fires both tests.
    ///
    /// The translated result is checked too: these errors must
    /// still surface as errors, and must not be normalized to
    /// `DeviceLost`.
    #[test]
    fn translate_other_errors_do_not_touch_dev_lost_flag() {
        for kind in [
            rusb::Error::Timeout,
            rusb::Error::Overflow,
            rusb::Error::Access,
        ] {
            let flag = AtomicBool::new(false);
            let result = translate_bulk_result(Err(kind), &flag);
            assert!(result.is_err(), "{kind:?} should still be an error");
            assert!(
                !matches!(result, Err(RtlSdrError::DeviceLost)),
                "{kind:?} should not normalize to DeviceLost"
            );
            assert!(
                !flag.load(Ordering::Acquire),
                "dev_lost should not fire for {kind:?}"
            );
        }
    }

    /// Pin idempotence: a second `NoDevice` after the flag is
    /// already set must be a no-op (still sets, still returns
    /// `DeviceLost`, no panic). Real bulk-read paths might
    /// retry once after the flag is set.
    #[test]
    fn translate_no_device_is_idempotent() {
        let flag = AtomicBool::new(true);
        let result = translate_bulk_result(Err(rusb::Error::NoDevice), &flag);
        assert!(matches!(result, Err(RtlSdrError::DeviceLost)));
        assert!(flag.load(Ordering::Acquire));
    }
}