Skip to main content

librtlsdr_rs/device/
streaming.rs

1//! Data streaming — buffer reset, sync read, async read.
2//!
3//! Ports `rtlsdr_reset_buffer`, `rtlsdr_read_sync`,
4//! `rtlsdr_read_async`, `rtlsdr_cancel_async`.
5//!
6//! Note: The C implementation uses libusb's async transfer API with multiple
7//! pre-submitted bulk transfers. The Rust implementation uses a blocking
8//! read loop that checks a shared cancellation flag. True async support
9//! will be added when the pipeline is wired up with worker threads.
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::time::Duration;
13
14use crate::constants::{BULK_TIMEOUT, DEFAULT_BUF_LENGTH};
15use crate::error::RtlSdrError;
16use crate::reg::Block;
17use crate::usb;
18
19use super::RtlSdrDevice;
20use super::reader::ReaderBusyGuard;
21
/// Callback type for async reading. Called with a byte slice of
/// IQ data for each completed bulk transfer inside
/// [`RtlSdrDevice::read_async_blocking`]'s loop.
///
/// The slice borrows the loop's single reusable read buffer, so
/// it is only valid for the duration of the call — the next bulk
/// read overwrites it. Copy the bytes out if they must outlive
/// the callback.
///
/// The alias is a boxed `FnMut`, so the callback may mutate its
/// captured state between invocations; the `Send` bound lets the
/// boxed closure be moved to whichever thread runs the loop.
///
/// # Callback contract (per audit pass-2 #71)
///
/// The callback runs on the thread that called
/// [`RtlSdrDevice::read_async_blocking`], inside the read loop,
/// while the reader-busy flag is held. To keep the loop
/// responsive to the cancel flag and the device's bulk-read
/// cadence:
///
/// - **Don't block on shared state held outside the callback.**
///   Acquiring a mutex the cancel-flag setter also touches will
///   deadlock the cancel.
/// - **Don't perform long synchronous I/O.** The next bulk read
///   doesn't fire until the callback returns; consumer-side
///   blocking shows up as bytes-per-second drops.
/// - **Don't panic under `panic = "abort"`.** The
///   [`super::reader::ReaderBusyGuard`]'s Drop releases the
///   busy-flag during normal unwind, but `abort` skips Drop
///   and leaves the flag stuck `true` for the device's
///   lifetime. Standard `panic = "unwind"` (the default) is
///   safe because Rust's RAII handles the release.
///   NOTE(review): `panic = "abort"` terminates the whole
///   process, so confirm whether a "stuck" flag is actually
///   observable or whether the real cost is skipped cleanup.
///
/// In short: treat the callback as a buffer-handoff site, not a
/// processing site. Push the bytes into a queue / channel /
/// ringbuffer and do the actual work elsewhere.
pub type ReadAsyncCb = Box<dyn FnMut(&[u8]) + Send>;
51
/// Maximum allowed buffer length for async reads (16 MiB).
///
/// [`RtlSdrDevice::read_async_blocking`] rejects any larger
/// `buf_len` with [`RtlSdrError::InvalidParameter`].
const MAX_BUF_LENGTH: u32 = 16 * 1024 * 1024;

/// USB bulk transfer alignment requirement (bytes).
///
/// A non-zero `buf_len` passed to
/// [`RtlSdrDevice::read_async_blocking`] must be a multiple of
/// this value, otherwise the call fails with
/// [`RtlSdrError::InvalidParameter`].
const BULK_ALIGNMENT: u32 = 512;

/// Async read loop timeout for cancel flag polling.
///
/// Used as the per-transfer timeout of every bulk read issued by
/// [`RtlSdrDevice::read_async_blocking`]; because the cancel flag
/// is only re-checked between reads, this bounds how long an idle
/// device can delay the loop from noticing a cancellation.
const ASYNC_POLL_TIMEOUT: Duration = Duration::from_secs(1);
60
/// Maximum number of `Ok(0)` reads tolerated by
/// [`RtlSdrDevice::read_async_blocking`] since the last non-empty
/// read before the loop fuses.
///
/// A healthy RTL-SDR returns either `Ok(n > 0)` bytes or
/// `Err(Timeout)` — `Ok(0)` is a sentinel meaning the USB transfer
/// completed with zero payload (typically a ZLP from a
/// misbehaving / stalling device). The pre-#12 loop retried
/// `Ok(0)` indefinitely, matching the C upstream — but a
/// degenerate device producing ZLPs forever would lock the
/// callback consumer in a tight retry loop with no diagnostic.
///
/// 100 reads × the `ASYNC_POLL_TIMEOUT` upper bound (1 s) =
/// ~100 s worst-case before the loop fuses on a pure-`Ok(0)`
/// stream. Only an `Ok(n > 0)` read resets the counter.
///
/// **Caveat (per audit pass-2 #69):** `Err(Timeout)` neither
/// increments nor resets the counter. "Consecutive" therefore
/// means "with no non-empty read in between", not strictly
/// back-to-back: a device alternating `Ok(0), Timeout, Ok(0),
/// Timeout, ...` still fuses on its 100th `Ok(0)`, but every
/// interleaved Timeout adds up to one `ASYNC_POLL_TIMEOUT` of
/// wall-clock time that the "100 × 1 s ≈ 100 s" estimate does
/// not account for, and a device that only ever times out never
/// trips this fuse at all. The ~100 s bound holds only for
/// pure-`Ok(0)` streams; interleaved Timeouts can stretch the
/// elapsed time indefinitely. Callers who care about a strict
/// deadline should enforce their own outer timeout on top of
/// this loop; because the call is blocking, that timeout has to
/// end up setting the loop's `cancel_flag`. Per audit issue
/// #12 / "Reconcile Ok(0) semantics."
const MAX_CONSECUTIVE_ZERO_READS: u32 = 100;
87
88/// Internal bulk-read helper shared by [`RtlSdrDevice::read_sync`]
89/// and [`SampleIter::next`] (and, in `reader.rs`, also by
90/// [`super::reader::RtlSdrReader::read_sync`] / `ReaderIter::next`).
91/// Does NOT acquire the reader-busy guard — callers are responsible
92/// for that contract; this function only does the actual USB
93/// bulk-IN transfer + `NoDevice` translation.
94///
95/// `dev_lost` is set to `true` on the `NoDevice → DeviceLost`
96/// translation path. The flag is shared with the parent
97/// [`RtlSdrDevice`] so its [`Drop`] impl can skip cleanup against
98/// a vanished handle (avoids a stream of cryptic
99/// "register access failed" lines from cleanup writes that would
100/// every return `NoDevice`). Per audit pass-2 #40.
101pub(crate) fn bulk_read(
102    handle: &rusb::DeviceHandle<rusb::GlobalContext>,
103    dev_lost: &AtomicBool,
104    buf: &mut [u8],
105) -> Result<usize, RtlSdrError> {
106    // BULK_TIMEOUT = 0 selects the streaming-friendly 5 s default
107    // (NOT libusb's "no timeout" convention) so drop-cancellation
108    // is observable within at most one bulk-read cycle. See the
109    // constant's docs for the full rationale. Per audit pass-2 #47.
110    let timeout = if BULK_TIMEOUT == 0 {
111        Duration::from_secs(5)
112    } else {
113        Duration::from_millis(BULK_TIMEOUT)
114    };
115    translate_bulk_result(
116        handle.read_bulk(crate::constants::BULK_ENDPOINT, buf, timeout),
117        dev_lost,
118    )
119}
120
121/// Translate a rusb bulk-read result into the crate's typed shape,
122/// side-effecting the `dev_lost` flag on disconnect.
123///
124/// Three rusb variants count as disconnect:
125/// - [`rusb::Error::NoDevice`] — libusb's authoritative signal.
126/// - [`rusb::Error::Pipe`] — endpoint stall; on Linux this is
127///   the common mid-flight-disconnect surrogate before libusb
128///   downgrades subsequent calls to `NoDevice`.
129/// - [`rusb::Error::Io`] — generic transport I/O failure; same
130///   Linux mid-flight surrogate.
131///
132/// All three set `dev_lost` AND normalize to [`RtlSdrError::DeviceLost`]
133/// so the bulk-read surface and [`RtlSdrError::is_disconnected`]
134/// agree (CodeRabbit on PR #80 caught the earlier asymmetry —
135/// pre-fix, `Pipe`/`Io` looked disconnected to callers via
136/// `is_disconnected()` but didn't trigger the `dev_lost` flag,
137/// so `Drop` ran cleanup against a dead handle).
138///
139/// Pulled out of [`bulk_read`] so the disconnect-detection
140/// behavior can be unit-tested without a real USB handle.
141/// Per audit pass-2 #40 + #43.
142fn translate_bulk_result(
143    result: rusb::Result<usize>,
144    dev_lost: &AtomicBool,
145) -> Result<usize, RtlSdrError> {
146    match result {
147        Ok(n) => Ok(n),
148        Err(rusb::Error::NoDevice | rusb::Error::Pipe | rusb::Error::Io) => {
149            // `Release` pairs with the `Drop` impl's `Acquire`
150            // load — any cleanup-skipping decision must observe
151            // a flag set by a happens-before bulk-read failure.
152            dev_lost.store(true, Ordering::Release);
153            Err(RtlSdrError::DeviceLost)
154        }
155        Err(e) => Err(e.into()),
156    }
157}
158
159impl RtlSdrDevice {
160    /// Reset the USB endpoint buffer.
161    ///
162    /// Ports `rtlsdr_reset_buffer`.
163    pub fn reset_buffer(&self) -> Result<(), RtlSdrError> {
164        usb::write_reg(
165            &self.handle,
166            Block::Usb,
167            crate::reg::usb_reg::USB_EPA_CTL,
168            0x1002,
169            2,
170        )?;
171        usb::write_reg(
172            &self.handle,
173            Block::Usb,
174            crate::reg::usb_reg::USB_EPA_CTL,
175            0x0000,
176            2,
177        )
178    }
179
180    /// Get a shared reference to the USB handle for spawning a reader thread.
181    ///
182    /// The returned Arc can be sent to another thread for concurrent bulk reads
183    /// while the main thread retains access for control transfers.
184    ///
185    /// # Concurrency hazard
186    ///
187    /// This is the *escape hatch* around the single-active-stream
188    /// guard that [`Self::read_sync`], [`Self::iter_samples`],
189    /// [`Self::read_async_blocking`], and the
190    /// [`super::RtlSdrReader`] streaming methods enforce via
191    /// [`RtlSdrError::DeviceBusy`]. Doing your own
192    /// `read_bulk(BULK_ENDPOINT, ...)` on this handle bypasses
193    /// that guard entirely.
194    ///
195    /// libusb permits concurrent bulk submits on the same
196    /// endpoint, but the responses interleave non-deterministically
197    /// — each thread sees valid bytes for its own libusb transfer,
198    /// but neither has the complete IQ stream. Only use this
199    /// escape hatch if you serialize bulk reads yourself (one
200    /// worker thread at a time on endpoint 0x81). Per #7.
201    ///
202    /// # Disconnect detection
203    ///
204    /// Bypassing the typed surface also bypasses the
205    /// `NoDevice → DeviceLost` translation that
206    /// [`Self::read_sync`] / [`super::RtlSdrReader::read_sync`]
207    /// perform internally — your raw `read_bulk` calls will
208    /// surface `rusb::Error::NoDevice` (and on Linux,
209    /// `rusb::Error::Pipe` / `Io` mid-flight) directly. Use
210    /// [`crate::RtlSdrError::is_disconnected`] to classify
211    /// rusb errors against the same disconnect-set the typed
212    /// surface uses; pre-#43 (0.2.1) consumers had to maintain
213    /// their own classifier. Per audit pass-2 #67.
214    pub fn usb_handle(&self) -> std::sync::Arc<rusb::DeviceHandle<rusb::GlobalContext>> {
215        std::sync::Arc::clone(&self.handle)
216    }
217
218    /// Synchronous (blocking) read of IQ samples.
219    ///
220    /// Ports `rtlsdr_read_sync`. Returns the number of bytes read.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`RtlSdrError::DeviceBusy`] if another bulk-read
225    /// activity (sync read, blocking iterator, async stream) is
226    /// already in flight on this device. Per #7.
227    pub fn read_sync(&self, buf: &mut [u8]) -> Result<usize, RtlSdrError> {
228        let _guard = ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy))?;
229        bulk_read(&self.handle, &self.dev_lost, buf)
230    }
231
232    /// Iterate IQ samples as a sequence of owned byte buffers.
233    ///
234    /// Returns an `Iterator` whose [`Iterator::next`] blocks the
235    /// calling thread until one buffer's worth of samples is ready
236    /// (a single `read_sync` underneath), then yields a freshly-
237    /// allocated `Vec<u8>` of the actual byte count read. Each
238    /// item is `Result<Vec<u8>, RtlSdrError>` so transport errors
239    /// surface in-band; the iterator fuses (returns `None` from
240    /// then on) after the first error or a zero-length read.
241    ///
242    /// This is the foundation for both sync streaming (use
243    /// directly) and async streaming wrappers (the per-runtime
244    /// `stream_samples_*` methods drive this iterator inside a
245    /// blocking task).
246    ///
247    /// # Buffer size
248    ///
249    /// `buffer_size` is the bytes-per-yield target. The librtlsdr
250    /// default is 256 KB (16 × 32 × 512). Smaller buffers give
251    /// lower per-item latency but more allocator traffic; larger
252    /// buffers amortise USB overhead but increase per-buffer
253    /// latency. The size doesn't have to be a multiple of the USB
254    /// 512-byte packet — `read_sync` returns the actual byte count
255    /// — but multiples of 512 avoid short final transfers.
256    ///
257    /// Passing `0` selects the librtlsdr-equivalent default
258    /// (256 KB) rather than requesting a zero-length buffer —
259    /// matches the upstream "pass 0 for the default" ergonomic
260    /// and prevents a typo from silently fusing the iterator on
261    /// the first call (which would look like EOF).
262    ///
263    /// # Allocation
264    ///
265    /// Each yielded `Vec<u8>` is a fresh allocation. At the
266    /// 256 KB / 65 ms cadence of typical RTL-SDR rates this is
267    /// negligible (~15 allocs/sec). Smaller buffers scale
268    /// linearly: a 4 KB buffer at 2 Msps is ~1000 allocs/sec
269    /// (still acceptable on desktop), but at 512 bytes you're at
270    /// ~7800 allocs/sec and an arena/pool starts to matter.
271    /// For tight loops or embedded use prefer [`Self::read_sync`]
272    /// directly with a reused caller-owned buffer. Per #20.
273    ///
274    /// ```no_run
275    /// # use librtlsdr_rs::{RtlSdrDevice, RtlSdrError};
276    /// # fn main() -> Result<(), RtlSdrError> {
277    /// let dev = RtlSdrDevice::open(0)?;
278    /// dev.reset_buffer()?;
279    /// // Take the first 10 buffers — each ~65 ms at 2 Msps.
280    /// for chunk in dev.iter_samples(262_144).take(10) {
281    ///     let bytes = chunk?;
282    ///     // process `bytes`...
283    ///     # let _ = bytes;
284    /// }
285    /// # Ok(())
286    /// # }
287    /// ```
288    pub fn iter_samples(&self, buffer_size: usize) -> SampleIter<'_> {
289        // Normalise zero to the librtlsdr-equivalent default
290        // (256 KB). A `buffer_size == 0` typo would otherwise
291        // hand `read_sync` an empty slice, which the USB
292        // backend treats as an immediate zero-length read —
293        // the iterator's zero-fuse path triggers, and the
294        // caller sees an empty `for chunk in iter { … }` that
295        // looks like EOF rather than a configuration mistake.
296        // Per #632 CR round 1.
297        let buffer_size = if buffer_size == 0 {
298            DEFAULT_BUF_LENGTH as usize
299        } else {
300            buffer_size
301        };
302        // Acquire the reader-busy guard for the iterator's lifetime.
303        // On contention, store the error in `pending_error` and yield
304        // it on first `next()` (then fuse) — matches the existing
305        // fuse-on-error contract documented on `SampleIter`. Per #7.
306        let (guard, pending_error) =
307            match ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy)) {
308                Ok(g) => (Some(g), None),
309                Err(e) => (None, Some(e)),
310            };
311        SampleIter {
312            device: Some(self),
313            buffer_size,
314            _guard: guard,
315            pending_error,
316        }
317    }
318
319    /// Read IQ samples in a blocking loop, calling the callback for each buffer.
320    ///
321    /// This is a simplified port of `rtlsdr_read_async`. It blocks the calling
322    /// thread and reads bulk data, calling `cb` for each completed buffer.
323    /// Use `cancel_flag` to signal cancellation from another thread.
324    ///
325    /// - `cb`: callback called with each buffer of IQ data
326    /// - `cancel_flag`: set to `true` from another thread to stop reading
327    /// - `buf_len`: buffer length in bytes (0 = default, must be multiple of 512)
328    ///
329    /// # Termination
330    ///
331    /// Returns when any of the following:
332    /// - `cancel_flag` becomes `true` (caller-initiated; returns `Ok(())`)
333    /// - The underlying USB read returns `NoDevice` (returns
334    ///   `Err(DeviceLost)`) or any other transport error
335    /// - 100 consecutive `Ok(0)` (zero-length) reads have been
336    ///   observed (returns `Ok(())` with a `tracing::warn!`). A
337    ///   healthy device returns either `Ok(n > 0)` or
338    ///   `Err(Timeout)`; sustained `Ok(0)` indicates a degenerate
339    ///   device that the C upstream would loop on forever.
340    ///   Brings this path into rough parity with `iter_samples`'s
341    ///   defensive `Ok(0)` fuse. Per audit issue #12.
342    ///
343    /// # Cancellation latency
344    ///
345    /// The cancel flag is checked between bulk reads. Each bulk
346    /// read uses a 1-second timeout (the polling cadence), so
347    /// worst-case observation latency from
348    /// `cancel_flag.store(true, …)` to the function returning is
349    /// ~1 second on an idle device, plus up to one bulk-read time
350    /// (~65 ms typical at 2 Msps) on an actively-streaming device.
351    /// True in-flight cancellation needs libusb's async-submit +
352    /// cancel API and is tracked as #633. Per audit #20.
353    pub fn read_async_blocking(
354        &self,
355        mut cb: ReadAsyncCb,
356        cancel_flag: &AtomicBool,
357        buf_len: u32,
358    ) -> Result<(), RtlSdrError> {
359        // Acquire the reader-busy flag for the entire callback loop.
360        // Released on Drop when this function returns. Per #7.
361        let _guard = ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy))?;
362
363        let actual_buf_len = if buf_len == 0 {
364            DEFAULT_BUF_LENGTH as usize
365        } else if !buf_len.is_multiple_of(BULK_ALIGNMENT) || buf_len > MAX_BUF_LENGTH {
366            return Err(RtlSdrError::InvalidParameter(format!(
367                "buf_len must be a multiple of {BULK_ALIGNMENT} and <= {MAX_BUF_LENGTH}, got {buf_len}"
368            )));
369        } else {
370            buf_len as usize
371        };
372
373        let timeout = ASYNC_POLL_TIMEOUT;
374        let mut buf = vec![0u8; actual_buf_len];
375        let mut consecutive_zero_reads: u32 = 0;
376
377        // Relaxed ordering is sufficient: there's no other state
378        // being synchronized through this flag — the worst-case
379        // visibility latency is one extra bulk-read iteration
380        // (one ASYNC_POLL_TIMEOUT). Don't "upgrade" to SeqCst on
381        // a hot loop without a concrete invariant requiring it.
382        // Per audit issue #20.
383        while !cancel_flag.load(Ordering::Relaxed) {
384            match self
385                .handle
386                .read_bulk(crate::constants::BULK_ENDPOINT, &mut buf, timeout)
387            {
388                Ok(n) if n > 0 => {
389                    consecutive_zero_reads = 0;
390                    cb(&buf[..n]);
391                }
392                Ok(_) => {
393                    // Zero-length read. A healthy device shouldn't
394                    // produce these; a degenerate device producing
395                    // ZLPs forever would lock the consumer in a
396                    // tight retry loop. Fuse after a documented
397                    // bound. Per audit issue #12.
398                    consecutive_zero_reads += 1;
399                    if consecutive_zero_reads >= MAX_CONSECUTIVE_ZERO_READS {
400                        tracing::warn!(
401                            "read_async_blocking: {MAX_CONSECUTIVE_ZERO_READS} consecutive \
402                             zero-length reads — fusing the loop (degenerate device?)"
403                        );
404                        return Ok(());
405                    }
406                }
407                Err(rusb::Error::Timeout) => {
408                    // Timeout doesn't reset the counter (it carries
409                    // no signal about whether the device is producing
410                    // ZLPs) but doesn't increment it either —
411                    // distinct from Ok(0).
412                }
413                Err(rusb::Error::NoDevice | rusb::Error::Pipe | rusb::Error::Io) => {
414                    // Mirror `bulk_read`'s `dev_lost` side effect
415                    // so `Drop` can skip cleanup against a
416                    // vanished handle. Treats Linux hot-unplug
417                    // surrogates (`Pipe`/`Io`) same as
418                    // `NoDevice` — see `translate_bulk_result`
419                    // doc for why. Per audit pass-2 #40 + #43.
420                    self.dev_lost.store(true, Ordering::Release);
421                    return Err(RtlSdrError::DeviceLost);
422                }
423                Err(e) => {
424                    tracing::error!("bulk read error: {e}");
425                    return Err(RtlSdrError::Usb(e));
426                }
427            }
428        }
429
430        Ok(())
431    }
432}
433
/// Blocking iterator over IQ-sample buffers, returned by
/// [`RtlSdrDevice::iter_samples`].
///
/// Each [`Iterator::next`] call performs one bulk read (the same
/// transfer [`RtlSdrDevice::read_sync`] issues, minus the per-call
/// guard acquisition) into a freshly-allocated `Vec<u8>` and yields
/// it. The iterator fuses on the first error or zero-length read —
/// once `next` returns `Some(Err(_))` (or `None` from a zero read),
/// all subsequent calls return `None` so callers can use the
/// standard `for chunk in iter { let chunk = chunk?; ... }` shape
/// without worrying about post-error state.
pub struct SampleIter<'a> {
    /// `None` once the iterator has fused (error or zero read).
    /// Borrows the device shared (`&`) because [`RtlSdrDevice::read_sync`]
    /// is `&self` — the underlying USB bulk transfer doesn't need
    /// mutable access.
    device: Option<&'a RtlSdrDevice>,
    /// Size in bytes of the buffer allocated for each `next()` call.
    /// `iter_samples` never stores zero here: it substitutes the
    /// default length for a zero request.
    buffer_size: usize,
    /// Reader-busy guard held for the iterator's lifetime. Acquired
    /// at construction (`iter_samples`); released on Drop. `None` if
    /// construction failed to acquire (in which case
    /// `pending_error` carries the `DeviceBusy` to yield on first
    /// `next()`). `next()` never clears this field, so a fused
    /// iterator keeps holding the guard until it is dropped.
    /// Per #7.
    _guard: Option<ReaderBusyGuard>,
    /// Construction-time guard-acquire failure to yield on the next
    /// (= first) `next()` call. Cleared after yielding; the
    /// iterator fuses normally afterward via `device = None`.
    pending_error: Option<RtlSdrError>,
}
463
464impl Iterator for SampleIter<'_> {
465    type Item = Result<Vec<u8>, RtlSdrError>;
466
467    fn next(&mut self) -> Option<Self::Item> {
468        // Yield any deferred construction error first, then fuse.
469        if let Some(e) = self.pending_error.take() {
470            self.device = None;
471            return Some(Err(e));
472        }
473        let device = self.device?;
474        let mut buf = vec![0u8; self.buffer_size];
475        // Bypass `device.read_sync` (which would re-acquire its own
476        // guard per call) — the iterator already holds the guard
477        // for its lifetime via `_guard`. Per #7.
478        match bulk_read(&device.handle, &device.dev_lost, &mut buf) {
479            Ok(0) => {
480                // Zero-length read — treat as end-of-stream so
481                // callers using `.take(N)` / `for ... in iter`
482                // don't spin forever on a degenerate device.
483                self.device = None;
484                None
485            }
486            Ok(n) => {
487                buf.truncate(n);
488                Some(Ok(buf))
489            }
490            Err(e) => {
491                // Fuse after first error so subsequent calls
492                // return `None` rather than re-yielding the
493                // same error indefinitely.
494                self.device = None;
495                Some(Err(e))
496            }
497        }
498    }
499}
500
501impl std::iter::FusedIterator for SampleIter<'_> {}
502
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time pin of the trait contract documented on
    // `SampleIter`: it is a standard `Iterator` and a
    // `FusedIterator`, so consumers may rely on the `for x in iter`
    // shape and on "returns None after fusing" without testing it
    // empirically. A refactor that changes the iterator shape
    // fails to compile here.
    const _: fn() = || {
        fn assert_iter<T: Iterator>() {}
        fn assert_fused<T: std::iter::FusedIterator>() {}
        assert_iter::<SampleIter<'_>>();
        assert_fused::<SampleIter<'_>>();
    };

    // Compile-time pin of `SampleIter: !Send`. The borrowed
    // iterator is the single-thread surface (the owned
    // `ReaderIter` is the sendable one): it holds
    // `&'a RtlSdrDevice`, `RtlSdrDevice` is `!Sync`, hence
    // `&RtlSdrDevice` is `!Send` and so is `SampleIter`. Should a
    // future field change make `RtlSdrDevice: Sync`, the iterator
    // would silently become sendable and a downstream consumer
    // could move it across threads, breaking the documented
    // "single-threaded sync iteration" contract.
    // Per audit issue #20.
    static_assertions::assert_not_impl_any!(SampleIter<'static>: Send);

    /// Feed `raw` through `translate_bulk_result` with a flag that
    /// starts at `initial`; return the translated result together
    /// with the flag's value afterwards.
    fn translate_with_flag(
        initial: bool,
        raw: rusb::Result<usize>,
    ) -> (Result<usize, RtlSdrError>, bool) {
        let flag = AtomicBool::new(initial);
        let translated = translate_bulk_result(raw, &flag);
        (translated, flag.load(Ordering::Acquire))
    }

    /// Per audit pass-2 #40: translating `NoDevice` into
    /// `DeviceLost` must also raise the shared `dev_lost` flag so
    /// the parent device's `Drop` can skip cleanup against a
    /// vanished handle.
    #[test]
    fn translate_no_device_sets_dev_lost_flag() {
        let (translated, lost) = translate_with_flag(false, Err(rusb::Error::NoDevice));
        assert!(matches!(translated, Err(RtlSdrError::DeviceLost)));
        assert!(lost, "dev_lost should be set");
    }

    /// A successful read passes through and leaves the flag clear.
    #[test]
    fn translate_ok_does_not_touch_dev_lost_flag() {
        let (translated, lost) = translate_with_flag(false, Ok(42));
        assert!(matches!(translated, Ok(42)));
        assert!(!lost);
    }

    /// Per CodeRabbit on PR #80: `Pipe` and `Io` are the Linux
    /// hot-unplug surrogates seen before libusb downgrades to
    /// `NoDevice`. They must have the same side effect (set
    /// `dev_lost`) and normalise to the same error (`DeviceLost`)
    /// as `NoDevice`; otherwise the bulk-read surface and
    /// `is_disconnected()` disagree and `Drop` runs cleanup
    /// against a dead handle.
    #[test]
    fn translate_pipe_and_io_treated_as_disconnect() {
        for kind in [rusb::Error::Pipe, rusb::Error::Io] {
            let (translated, lost) = translate_with_flag(false, Err(kind));
            assert!(
                matches!(translated, Err(RtlSdrError::DeviceLost)),
                "{kind:?} should normalize to DeviceLost"
            );
            assert!(lost, "dev_lost should be set for {kind:?}");
        }
    }

    /// Genuinely transient or unrelated transport errors must NOT
    /// trip the disconnect flag. `Timeout` matters most — a slow
    /// stream is healthy, not lost. `Access` and `Overflow` mirror
    /// the exclusion set pinned by the `is_disconnected` test
    /// (CodeRabbit on PR #80) so the two surfaces stay in sync: a
    /// future widening of either set fires both tests.
    #[test]
    fn translate_other_errors_do_not_touch_dev_lost_flag() {
        let unrelated = [
            rusb::Error::Timeout,
            rusb::Error::Overflow,
            rusb::Error::Access,
        ];
        for kind in unrelated {
            let (_, lost) = translate_with_flag(false, Err(kind));
            assert!(!lost, "dev_lost should not fire for {kind:?}");
        }
    }

    /// Idempotence: a second `NoDevice` arriving after the flag is
    /// already set is harmless (flag stays set, `DeviceLost` is
    /// still returned, nothing panics). Real bulk-read paths might
    /// retry once after the flag is set.
    #[test]
    fn translate_no_device_is_idempotent() {
        let (translated, lost) = translate_with_flag(true, Err(rusb::Error::NoDevice));
        assert!(matches!(translated, Err(RtlSdrError::DeviceLost)));
        assert!(lost);
    }
}