librtlsdr_rs/device/reader.rs
1//! Streaming-focused handle that runs concurrently with control.
2//!
3//! See [`RtlSdrReader`] and [`RtlSdrDevice::reader`].
4
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7
8use crate::error::RtlSdrError;
9
// Brought into scope only so this file's intra-doc links
// (`[`RtlSdrDevice::reader`]`, etc.) resolve under
// `cargo doc -D warnings`. The type is not referenced by name in
// any code path in this module, so rustc's `unused_imports` lint
// (also surfaced by `cargo clippy`) fires without the explicit
// allow. Per Code Rabbit on #23.
15#[allow(unused_imports)]
16use super::RtlSdrDevice;
17
/// RAII guard for the per-device reader-busy flag. Acquiring sets
/// the flag to `true` via `compare_exchange`; dropping clears it.
///
/// Used to ensure at most one bulk-read activity (sync read,
/// blocking iterator, async stream) is in flight on USB endpoint
/// 0x81 at a time. Concurrent bulk reads on the same endpoint
/// silently split the contiguous IQ stream between callers — each
/// thread sees valid bytes for its own transfer, but neither has
/// the complete signal. Per #7.
///
/// Constructed via [`Self::try_acquire`]; never instantiated
/// directly.
///
/// The guard only protects anything while it is alive, so it must
/// be bound to a named variable (or stored in a struct) for the
/// whole duration of the bulk-read activity. `#[must_use]` turns an
/// accidentally discarded guard — which would release the flag
/// immediately — into a compiler warning.
#[must_use = "the busy flag is released as soon as the guard is dropped"]
#[derive(Debug)]
pub(crate) struct ReaderBusyGuard {
    /// Shared per-device busy flag; `true` while some guard is alive.
    flag: Arc<AtomicBool>,
}
39
40#[allow(dead_code)]
41impl ReaderBusyGuard {
42 /// Try to acquire the reader-busy flag. Returns
43 /// `Err(RtlSdrError::DeviceBusy)` if another bulk-read activity
44 /// is already in flight on this device.
45 ///
46 /// `Acquire` ordering on success and on the failure-load path is
47 /// sufficient: the only invariant being synchronized is "another
48 /// caller holds the flag," and the matching `Release` in `Drop`
49 /// happens-before the next successful acquire.
50 pub(crate) fn try_acquire(flag: Arc<AtomicBool>) -> Result<Self, RtlSdrError> {
51 flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
52 .map_err(|_| RtlSdrError::DeviceBusy)?;
53 Ok(Self { flag })
54 }
55}
56
57impl Drop for ReaderBusyGuard {
58 fn drop(&mut self) {
59 // Release ordering pairs with the Acquire on the next
60 // try_acquire — any writes performed while the guard was
61 // held are observable to the next acquirer.
62 self.flag.store(false, Ordering::Release);
63 }
64}
65
66/// Streaming-focused handle. Acquired via [`RtlSdrDevice::reader`].
67///
68/// `RtlSdrReader` exists to resolve the design tension between
69/// Rust's ownership model (control methods like
70/// [`RtlSdrDevice::set_center_freq`] take `&mut self`; concurrent
71/// streaming would require holding `self` for the duration) and
72/// the underlying USB protocol's reality (bulk reads use endpoint
73/// 0x81; control transfers use endpoint 0x00 — different
74/// endpoints, no conflict on real hardware).
75///
76/// The reader internally clones the device's
77/// `Arc<rusb::DeviceHandle>`, then exposes the streaming surface
78/// (sync iterator + per-runtime async streams) by consuming the
79/// reader. The parent retains the [`RtlSdrDevice`] for control:
80///
81/// ```no_run
82/// # use librtlsdr_rs::{RtlSdrDevice, RtlSdrError};
83/// # fn example() -> Result<(), RtlSdrError> {
84/// let mut device = RtlSdrDevice::open(0)?;
85/// device.set_sample_rate(2_400_000)?;
86/// device.set_center_freq(100_000_000)?;
87/// device.reset_buffer()?;
88///
89/// // Hand a reader to a worker thread.
90/// let reader = device.reader();
91/// let thread = std::thread::spawn(move || {
92/// for chunk in reader.iter_samples(262_144) {
93/// match chunk {
94/// Ok(buf) => { /* push to ring / DSP */ let _ = buf; }
95/// Err(e) => { eprintln!("read error: {e}"); break; }
96/// }
97/// }
98/// });
99///
100/// // Parent thread retains control of the device while the reader
101/// // streams — separate USB endpoints, no rusb-level conflict.
102/// device.set_center_freq(101_000_000)?;
103/// device.set_tuner_gain(150)?;
104/// # let _ = thread;
105/// # Ok(())
106/// # }
107/// ```
108///
109/// # Concurrency safety
110///
111/// The shared-handle pattern (one `Arc<DeviceHandle>` reffed by
112/// both the parent device and any reader) is what upstream
113/// `librtlsdr`'s reference implementations have used for years.
114/// Bulk reads on endpoint 0x81 don't interfere with control
115/// transfers on endpoint 0x00 at the libusb level on real hardware.
116///
117/// **However**, libusb's documentation does not formally
118/// guarantee that concurrent bulk and control transfers on a
119/// single device handle are safe. The shared-handle pattern is a
120/// practical convention rather than a documented promise. If you
121/// need strict by-the-book safety, sequence the operations from
122/// a single thread (e.g. fully drop the reader before retuning,
123/// then build a new reader). For the typical "stream while
124/// retuning the satellite at AOS" pattern this works reliably on
125/// the dongles in active use; verify against your specific
126/// hardware in production.
127///
128/// # Single active streaming session
129///
130/// At most one bulk-read activity may be in flight on the device
131/// at a time — across `RtlSdrDevice::{read_sync, iter_samples,
132/// read_async_blocking}` and `RtlSdrReader::{read_sync,
133/// iter_samples, stream_samples_tokio, stream_samples_smol}`.
134/// Concurrent attempts return [`RtlSdrError::DeviceBusy`].
135///
136/// This invariant exists because libusb permits concurrent submits
137/// on the same endpoint, but the resulting transfer responses
138/// interleave non-deterministically — each thread sees valid
139/// bytes for its own libusb transfer, but neither has the
140/// complete IQ stream. The runtime guard makes the contention
141/// observable as a typed error rather than as silent
142/// sample-stream corruption.
143///
144/// `RtlSdrDevice::usb_handle()` is the documented escape hatch and
145/// is *not* gated; bypassing the typed reader path lets you
146/// re-create the corruption hazard. See its method docs.
147///
148/// # Cheap clone via the device
149///
/// A [`RtlSdrReader`] is just an `Arc` clone of the device's USB
/// handle plus the per-device busy and disconnect flags. Build one
/// via [`RtlSdrDevice::reader`] any time you need a fresh streaming
/// handle — the cost is three atomic increments (one per `Arc`).
/// Cloning is allowed (and cheap), but only one clone may have an
/// active streaming session at a time; the rest get
/// [`RtlSdrError::DeviceBusy`].
#[derive(Clone)]
pub struct RtlSdrReader {
    /// Shared USB device handle. Every bulk read issued through this
    /// reader (and through iterators built from it) goes through
    /// this handle.
    pub(crate) handle: Arc<rusb::DeviceHandle<rusb::GlobalContext>>,
    /// Per-device reader-busy flag (cloned from the parent
    /// [`RtlSdrDevice::reader_busy`]). Acquired via
    /// [`ReaderBusyGuard::try_acquire`] at the top of every
    /// bulk-read entry point on this reader to enforce single-active-
    /// reader. Per #7.
    pub(crate) busy: Arc<AtomicBool>,
    /// Per-device disconnect flag (cloned from the parent
    /// [`RtlSdrDevice::dev_lost`]). Passed to
    /// `super::streaming::bulk_read`, which sets it on the
    /// `NoDevice → DeviceLost` translation so the parent
    /// device's `Drop` skips cleanup against a vanished handle.
    /// Per audit pass-2 #40.
    pub(crate) dev_lost: Arc<AtomicBool>,
}
172
173impl RtlSdrReader {
174 /// Synchronous bulk read into a caller-owned buffer.
175 ///
176 /// Mirror of [`RtlSdrDevice::read_sync`] with the same
177 /// semantics, exposed on the Reader so streaming code that
178 /// already has a Reader doesn't need to round-trip through
179 /// the device.
180 ///
181 /// # Errors
182 ///
183 /// - [`RtlSdrError::DeviceLost`] if the dongle was
184 /// disconnected.
185 /// - [`RtlSdrError::DeviceBusy`] if another bulk-read activity
186 /// (sync read, blocking iterator, async stream) is already
187 /// in flight on this device. Per #7.
188 /// - [`RtlSdrError::Usb`] for any other rusb transport
189 /// error.
190 pub fn read_sync(&self, buf: &mut [u8]) -> Result<usize, RtlSdrError> {
191 let _guard = ReaderBusyGuard::try_acquire(Arc::clone(&self.busy))?;
192 super::streaming::bulk_read(&self.handle, &self.dev_lost, buf)
193 }
194
195 /// Sync iterator over IQ-sample buffers, consuming the
196 /// reader.
197 ///
198 /// Each [`Iterator::next`] performs one [`Self::read_sync`]
199 /// into a freshly-allocated `Vec<u8>` and yields it. Same
200 /// fuse-on-error semantics as [`RtlSdrDevice::iter_samples`]:
201 /// returns `None` permanently after the first error or
202 /// zero-length read.
203 ///
204 /// Consumes the reader so the iterator owns the
205 /// `Arc<DeviceHandle>` clone — usable across thread
206 /// boundaries (`'static`-friendly, sendable).
207 ///
208 /// # Buffer size
209 ///
210 /// Same guidance as [`RtlSdrDevice::iter_samples`] — 256 KB
211 /// (`262_144`) is the librtlsdr-equivalent default. Passing
212 /// `0` selects the default.
213 #[must_use]
214 pub fn iter_samples(self, buffer_size: usize) -> ReaderIter {
215 let buffer_size = if buffer_size == 0 {
216 crate::constants::DEFAULT_BUF_LENGTH as usize
217 } else {
218 buffer_size
219 };
220 // Acquire the reader-busy guard for the iterator's lifetime.
221 // On contention, `pending_error` carries the `DeviceBusy`
222 // to yield on first `next()` (then fuse) — matches the
223 // existing fuse-on-error contract documented on
224 // `ReaderIter`. Per #7.
225 let (guard, pending_error) = match ReaderBusyGuard::try_acquire(Arc::clone(&self.busy)) {
226 Ok(g) => (Some(g), None),
227 Err(e) => (None, Some(e)),
228 };
229 ReaderIter {
230 reader: Some(self),
231 buffer_size,
232 _guard: guard,
233 pending_error,
234 }
235 }
236}
237
238/// Owned, sendable iterator over IQ-sample buffers, returned by
239/// [`RtlSdrReader::iter_samples`].
240///
241/// Differs from [`crate::SampleIter`] in that it owns the reader
242/// (and thus the underlying `Arc<DeviceHandle>` clone) rather
243/// than borrowing the device — so it satisfies `'static` and can
244/// be sent to other threads / async runtimes. Same
245/// `FusedIterator` contract: `None` permanently after the first
246/// error or zero read.
247pub struct ReaderIter {
248 /// `None` once the iterator has fused.
249 reader: Option<RtlSdrReader>,
250 buffer_size: usize,
251 /// Reader-busy guard held for the iterator's lifetime. Acquired
252 /// at construction (`iter_samples`); released on Drop. `None` if
253 /// construction failed to acquire (in which case
254 /// `pending_error` carries the `DeviceBusy` to yield on first
255 /// `next()`) — also `None` after the iterator drops itself.
256 /// Per #7.
257 _guard: Option<ReaderBusyGuard>,
258 /// Construction-time guard-acquire failure to yield on the next
259 /// (= first) `next()` call. Cleared after yielding; the
260 /// iterator fuses normally afterward via `reader = None`.
261 pending_error: Option<RtlSdrError>,
262}
263
264impl Iterator for ReaderIter {
265 type Item = Result<Vec<u8>, RtlSdrError>;
266
267 fn next(&mut self) -> Option<Self::Item> {
268 // Yield any deferred construction error first, then fuse.
269 if let Some(e) = self.pending_error.take() {
270 self.reader = None;
271 return Some(Err(e));
272 }
273 let reader = self.reader.as_ref()?;
274 let mut buf = vec![0u8; self.buffer_size];
275 // Bypass `reader.read_sync` (which would re-acquire its own
276 // guard per call) — the iterator already holds the guard
277 // for its lifetime via `_guard`. Per #7.
278 match super::streaming::bulk_read(&reader.handle, &reader.dev_lost, &mut buf) {
279 Ok(0) => {
280 self.reader = None;
281 None
282 }
283 Ok(n) => {
284 buf.truncate(n);
285 Some(Ok(buf))
286 }
287 Err(e) => {
288 self.reader = None;
289 Some(Err(e))
290 }
291 }
292 }
293}
294
295impl std::iter::FusedIterator for ReaderIter {}
296
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time contract check, never executed: `ReaderIter`
    // must be `Iterator + FusedIterator + Send`, and `RtlSdrReader`
    // must be `Send`. Being able to move the iterator between
    // threads / async runtimes is the reason the reader split
    // exists.
    const _: fn() = || {
        fn assert_iter<T: Iterator>() {}
        fn assert_fused<T: std::iter::FusedIterator>() {}
        fn assert_send<T: Send>() {}
        assert_iter::<ReaderIter>();
        assert_fused::<ReaderIter>();
        assert_send::<ReaderIter>();
        assert_send::<RtlSdrReader>();
    };

    /// Builds a fresh, un-held busy flag.
    fn free_flag() -> Arc<AtomicBool> {
        Arc::new(AtomicBool::new(false))
    }

    /// Per #7: while one guard is alive, a second `try_acquire` on
    /// the same flag must fail with [`RtlSdrError::DeviceBusy`].
    #[test]
    fn busy_guard_first_acquire_succeeds_second_returns_device_busy() {
        let flag = free_flag();
        let first = ReaderBusyGuard::try_acquire(Arc::clone(&flag));
        let _guard1 = first.expect("first acquire on a free flag must succeed");

        let result = ReaderBusyGuard::try_acquire(Arc::clone(&flag));
        assert!(
            matches!(result, Err(RtlSdrError::DeviceBusy)),
            "expected DeviceBusy on contended acquire, got {result:?}",
        );
    }

    /// Per #7: dropping the guard must clear the flag so that a
    /// later acquire succeeds.
    #[test]
    fn busy_guard_drop_releases_flag() {
        let flag = free_flag();

        let guard =
            ReaderBusyGuard::try_acquire(Arc::clone(&flag)).expect("first acquire must succeed");
        let held = flag.load(Ordering::Acquire);
        assert!(held, "flag must be set while guard is alive");
        drop(guard);

        let still_held = flag.load(Ordering::Acquire);
        assert!(!still_held, "flag must be cleared after guard drop");

        let _guard2 = ReaderBusyGuard::try_acquire(Arc::clone(&flag))
            .expect("acquire after drop must succeed");
    }
}