Skip to main content

librtlsdr_rs/device/
reader.rs

1//! Streaming-focused handle that runs concurrently with control.
2//!
3//! See [`RtlSdrReader`] and [`RtlSdrDevice::reader`].
4
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7
8use crate::error::RtlSdrError;
9
// Brought into scope only so this file's intra-doc links
// (`[`RtlSdrDevice::reader`]`, etc.) resolve under
// `cargo doc -D warnings`. The type is not referenced by name in
// any module-level code path here, so rustc's `unused_imports`
// lint (which `cargo clippy` also reports) flags the import
// without the explicit allow. Per Code Rabbit on #23.
15#[allow(unused_imports)]
16use super::RtlSdrDevice;
17
/// RAII guard for the per-device reader-busy flag. Acquiring sets
/// the flag to `true` via `compare_exchange`; dropping clears it.
///
/// Used to ensure at most one bulk-read activity (sync read,
/// blocking iterator, async stream) is in flight on USB endpoint
/// 0x81 at a time. Concurrent bulk reads on the same endpoint
/// silently split the contiguous IQ stream between callers — each
/// thread sees valid bytes for its own transfer, but neither has
/// the complete signal. Per #7.
///
/// Constructed via [`Self::try_acquire`]; never instantiated
/// directly.
//
// No `dead_code` allow is needed any more: the guard is acquired by
// the bulk-read entry points on `RtlSdrReader` in this file.
#[must_use = "the reader-busy flag is released as soon as the guard is dropped"]
#[derive(Debug)]
pub(crate) struct ReaderBusyGuard {
    /// Shared busy flag; `true` for as long as a guard created from
    /// it is alive.
    flag: Arc<AtomicBool>,
}
39
40#[allow(dead_code)]
41impl ReaderBusyGuard {
42    /// Try to acquire the reader-busy flag. Returns
43    /// `Err(RtlSdrError::DeviceBusy)` if another bulk-read activity
44    /// is already in flight on this device.
45    ///
46    /// `Acquire` ordering on success and on the failure-load path is
47    /// sufficient: the only invariant being synchronized is "another
48    /// caller holds the flag," and the matching `Release` in `Drop`
49    /// happens-before the next successful acquire.
50    pub(crate) fn try_acquire(flag: Arc<AtomicBool>) -> Result<Self, RtlSdrError> {
51        flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
52            .map_err(|_| RtlSdrError::DeviceBusy)?;
53        Ok(Self { flag })
54    }
55}
56
57impl Drop for ReaderBusyGuard {
58    fn drop(&mut self) {
59        // Release ordering pairs with the Acquire on the next
60        // try_acquire — any writes performed while the guard was
61        // held are observable to the next acquirer.
62        self.flag.store(false, Ordering::Release);
63    }
64}
65
/// Streaming-focused handle. Acquired via [`RtlSdrDevice::reader`].
///
/// `RtlSdrReader` exists to resolve the design tension between
/// Rust's ownership model (control methods like
/// [`RtlSdrDevice::set_center_freq`] take `&mut self`; concurrent
/// streaming would require holding `self` for the duration) and
/// the underlying USB protocol's reality (bulk reads use endpoint
/// 0x81; control transfers use endpoint 0x00 — different
/// endpoints, no conflict on real hardware).
///
/// The reader internally clones the device's
/// `Arc<rusb::DeviceHandle>`, then exposes the streaming surface
/// (sync iterator + per-runtime async streams) by consuming the
/// reader. The parent retains the [`RtlSdrDevice`] for control:
///
/// ```no_run
/// # use librtlsdr_rs::{RtlSdrDevice, RtlSdrError};
/// # fn example() -> Result<(), RtlSdrError> {
/// let mut device = RtlSdrDevice::open(0)?;
/// device.set_sample_rate(2_400_000)?;
/// device.set_center_freq(100_000_000)?;
/// device.reset_buffer()?;
///
/// // Hand a reader to a worker thread.
/// let reader = device.reader();
/// let thread = std::thread::spawn(move || {
///     for chunk in reader.iter_samples(262_144) {
///         match chunk {
///             Ok(buf) => { /* push to ring / DSP */ let _ = buf; }
///             Err(e) => { eprintln!("read error: {e}"); break; }
///         }
///     }
/// });
///
/// // Parent thread retains control of the device while the reader
/// // streams — separate USB endpoints, no rusb-level conflict.
/// device.set_center_freq(101_000_000)?;
/// device.set_tuner_gain(150)?;
/// # let _ = thread;
/// # Ok(())
/// # }
/// ```
///
/// # Concurrency safety
///
/// The shared-handle pattern (one `Arc<DeviceHandle>` referenced by
/// both the parent device and any reader) is what upstream
/// `librtlsdr`'s reference implementations have used for years.
/// Bulk reads on endpoint 0x81 don't interfere with control
/// transfers on endpoint 0x00 at the libusb level on real hardware.
///
/// **However**, libusb's documentation does not formally
/// guarantee that concurrent bulk and control transfers on a
/// single device handle are safe. The shared-handle pattern is a
/// practical convention rather than a documented promise. If you
/// need strict by-the-book safety, sequence the operations from
/// a single thread (e.g. fully drop the reader before retuning,
/// then build a new reader). For the typical "stream while
/// retuning the satellite at AOS" pattern this works reliably on
/// the dongles in active use; verify against your specific
/// hardware in production.
///
/// # Single active streaming session
///
/// At most one bulk-read activity may be in flight on the device
/// at a time — across `RtlSdrDevice::{read_sync, iter_samples,
/// read_async_blocking}` and `RtlSdrReader::{read_sync,
/// iter_samples, stream_samples_tokio, stream_samples_smol}`.
/// Concurrent attempts return [`RtlSdrError::DeviceBusy`].
///
/// This invariant exists because libusb permits concurrent submits
/// on the same endpoint, but the resulting transfer responses
/// interleave non-deterministically — each thread sees valid
/// bytes for its own libusb transfer, but neither has the
/// complete IQ stream. The runtime guard makes the contention
/// observable as a typed error rather than as silent
/// sample-stream corruption.
///
/// `RtlSdrDevice::usb_handle()` is the documented escape hatch and
/// is *not* gated; bypassing the typed reader path lets you
/// re-create the corruption hazard. See its method docs.
///
/// # Cheap clone via the device
///
/// A [`RtlSdrReader`] holds nothing but `Arc` clones: the device's
/// USB handle, the per-device busy flag, and the per-device
/// disconnect flag. Build one via [`RtlSdrDevice::reader`] any time
/// you need a fresh streaming handle. Cloning an existing reader
/// costs three atomic increments (one per `Arc` field). Cloning is
/// allowed (and cheap), but only one clone may have an active
/// streaming session at a time; the rest get
/// [`RtlSdrError::DeviceBusy`].
#[derive(Clone)]
pub struct RtlSdrReader {
    /// Shared USB device handle. Passed to
    /// `super::streaming::bulk_read` for every bulk read issued
    /// through this reader or an iterator built from it.
    pub(crate) handle: Arc<rusb::DeviceHandle<rusb::GlobalContext>>,
    /// Per-device reader-busy flag (cloned from the parent
    /// [`RtlSdrDevice::reader_busy`]). Acquired via
    /// [`ReaderBusyGuard::try_acquire`] at the top of every
    /// bulk-read entry point on this reader to enforce single-active-
    /// reader. Per #7.
    pub(crate) busy: Arc<AtomicBool>,
    /// Per-device disconnect flag (cloned from the parent
    /// [`RtlSdrDevice::dev_lost`]). Set by
    /// `super::streaming::bulk_read` on the
    /// `NoDevice → DeviceLost` translation so the parent
    /// device's `Drop` skips cleanup against a vanished handle.
    /// Per audit pass-2 #40.
    pub(crate) dev_lost: Arc<AtomicBool>,
}
172
173impl RtlSdrReader {
174    /// Synchronous bulk read into a caller-owned buffer.
175    ///
176    /// Mirror of [`RtlSdrDevice::read_sync`] with the same
177    /// semantics, exposed on the Reader so streaming code that
178    /// already has a Reader doesn't need to round-trip through
179    /// the device.
180    ///
181    /// # Errors
182    ///
183    /// - [`RtlSdrError::DeviceLost`] if the dongle was
184    ///   disconnected.
185    /// - [`RtlSdrError::DeviceBusy`] if another bulk-read activity
186    ///   (sync read, blocking iterator, async stream) is already
187    ///   in flight on this device. Per #7.
188    /// - [`RtlSdrError::Usb`] for any other rusb transport
189    ///   error.
190    pub fn read_sync(&self, buf: &mut [u8]) -> Result<usize, RtlSdrError> {
191        let _guard = ReaderBusyGuard::try_acquire(Arc::clone(&self.busy))?;
192        super::streaming::bulk_read(&self.handle, &self.dev_lost, buf)
193    }
194
195    /// Sync iterator over IQ-sample buffers, consuming the
196    /// reader.
197    ///
198    /// Each [`Iterator::next`] performs one [`Self::read_sync`]
199    /// into a freshly-allocated `Vec<u8>` and yields it. Same
200    /// fuse-on-error semantics as [`RtlSdrDevice::iter_samples`]:
201    /// returns `None` permanently after the first error or
202    /// zero-length read.
203    ///
204    /// Consumes the reader so the iterator owns the
205    /// `Arc<DeviceHandle>` clone — usable across thread
206    /// boundaries (`'static`-friendly, sendable).
207    ///
208    /// # Buffer size
209    ///
210    /// Same guidance as [`RtlSdrDevice::iter_samples`] — 256 KB
211    /// (`262_144`) is the librtlsdr-equivalent default. Passing
212    /// `0` selects the default.
213    #[must_use]
214    pub fn iter_samples(self, buffer_size: usize) -> ReaderIter {
215        let buffer_size = if buffer_size == 0 {
216            crate::constants::DEFAULT_BUF_LENGTH as usize
217        } else {
218            buffer_size
219        };
220        // Acquire the reader-busy guard for the iterator's lifetime.
221        // On contention, `pending_error` carries the `DeviceBusy`
222        // to yield on first `next()` (then fuse) — matches the
223        // existing fuse-on-error contract documented on
224        // `ReaderIter`. Per #7.
225        let (guard, pending_error) = match ReaderBusyGuard::try_acquire(Arc::clone(&self.busy)) {
226            Ok(g) => (Some(g), None),
227            Err(e) => (None, Some(e)),
228        };
229        ReaderIter {
230            reader: Some(self),
231            buffer_size,
232            _guard: guard,
233            pending_error,
234        }
235    }
236}
237
/// Owned, sendable iterator over IQ-sample buffers, returned by
/// [`RtlSdrReader::iter_samples`].
///
/// Differs from [`crate::SampleIter`] in that it owns the reader
/// (and thus the underlying `Arc<DeviceHandle>` clone) rather
/// than borrowing the device — so it satisfies `'static` and can
/// be sent to other threads / async runtimes. Same
/// `FusedIterator` contract: `None` permanently after the first
/// error or zero read.
pub struct ReaderIter {
    /// Reader whose handle is used for every bulk read. `None` once
    /// the iterator has fused.
    reader: Option<RtlSdrReader>,
    /// Length in bytes of the `Vec<u8>` allocated for each read.
    buffer_size: usize,
    /// Reader-busy guard held for the iterator's lifetime. Acquired
    /// at construction (`iter_samples`); released when the iterator
    /// is dropped. `None` only if construction failed to acquire (in
    /// which case `pending_error` carries the `DeviceBusy` to yield
    /// on first `next()`). `next()` never clears this field, so the
    /// guard stays held after the iterator fuses, until the iterator
    /// itself is dropped.
    /// NOTE(review): confirm that keeping the busy flag set on a
    /// fused-but-alive iterator is intended. Per #7.
    _guard: Option<ReaderBusyGuard>,
    /// Construction-time guard-acquire failure to yield on the next
    /// (= first) `next()` call. Cleared after yielding; the
    /// iterator fuses normally afterward via `reader = None`.
    pending_error: Option<RtlSdrError>,
}
263
264impl Iterator for ReaderIter {
265    type Item = Result<Vec<u8>, RtlSdrError>;
266
267    fn next(&mut self) -> Option<Self::Item> {
268        // Yield any deferred construction error first, then fuse.
269        if let Some(e) = self.pending_error.take() {
270            self.reader = None;
271            return Some(Err(e));
272        }
273        let reader = self.reader.as_ref()?;
274        let mut buf = vec![0u8; self.buffer_size];
275        // Bypass `reader.read_sync` (which would re-acquire its own
276        // guard per call) — the iterator already holds the guard
277        // for its lifetime via `_guard`. Per #7.
278        match super::streaming::bulk_read(&reader.handle, &reader.dev_lost, &mut buf) {
279            Ok(0) => {
280                self.reader = None;
281                None
282            }
283            Ok(n) => {
284                buf.truncate(n);
285                Some(Ok(buf))
286            }
287            Err(e) => {
288                self.reader = None;
289                Some(Err(e))
290            }
291        }
292    }
293}
294
295impl std::iter::FusedIterator for ReaderIter {}
296
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time contract check: `ReaderIter` must be an
    // `Iterator`, a `FusedIterator`, and `Send`; `RtlSdrReader` must
    // be `Send`. Being able to move both between threads / async
    // runtimes is the reason the reader was split from the device.
    const _: fn() = || {
        fn requires_iterator<T: Iterator>() {}
        fn requires_fused<T: std::iter::FusedIterator>() {}
        fn requires_send<T: Send>() {}
        requires_iterator::<ReaderIter>();
        requires_fused::<ReaderIter>();
        requires_send::<ReaderIter>();
        requires_send::<RtlSdrReader>();
    };

    /// Builds a busy flag in the released (`false`) state.
    fn free_flag() -> Arc<AtomicBool> {
        Arc::new(AtomicBool::new(false))
    }

    /// Per #7: while one guard is alive, a second `try_acquire` on
    /// the same flag must fail with [`RtlSdrError::DeviceBusy`].
    #[test]
    fn busy_guard_first_acquire_succeeds_second_returns_device_busy() {
        let flag = free_flag();
        let _held = ReaderBusyGuard::try_acquire(Arc::clone(&flag))
            .expect("first acquire on a free flag must succeed");

        let result = ReaderBusyGuard::try_acquire(Arc::clone(&flag));

        assert!(
            matches!(result, Err(RtlSdrError::DeviceBusy)),
            "expected DeviceBusy on contended acquire, got {result:?}",
        );
    }

    /// Per #7: dropping the guard must clear the flag so a later
    /// acquire succeeds.
    #[test]
    fn busy_guard_drop_releases_flag() {
        let flag = free_flag();

        let guard = ReaderBusyGuard::try_acquire(Arc::clone(&flag))
            .expect("first acquire must succeed");
        assert!(
            flag.load(Ordering::Acquire),
            "flag must be set while guard is alive",
        );
        drop(guard);

        assert!(
            !flag.load(Ordering::Acquire),
            "flag must be cleared after guard drop",
        );
        let _reacquired = ReaderBusyGuard::try_acquire(Arc::clone(&flag))
            .expect("acquire after drop must succeed");
    }
}
349}