desperado 0.4.1

Iterate and stream I/Q samples from stdin, files, TCP streams and SDR devices
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
//! RTL-SDR I/Q Data Source Module
//! (requires the `rtlsdr` feature)
//!
//! Provides synchronous and asynchronous I/Q readers for RTL-SDR devices,
//! using the `rs_rtl` crate (pure-Rust nusb backend) for hardware access.
//!
//! The rs_rtl driver keeps multiple USB bulk transfers in-flight simultaneously
//! via nusb's endpoint queue, eliminating the inter-transfer gap that causes
//! RTL2832U FIFO overflow at high sample rates.

use futures::Stream;
use num_complex::Complex;

use crate::{Gain, IqFormat, error};

/// Tokio-side bridge queue depth between the USB reader thread and the async consumer.
///
/// The inner `rs_rtl` streaming queue (RECOMMENDED_QUEUE_DEPTH = 32 chunks) is the
/// primary buffer; this bridges the sync reader thread → tokio async consumer.
///
/// Sized to absorb bursts when the tokio event loop is busy with OFDM/DSP work.
/// At DAB sample rates (2048 kSPS CU8, i.e. 2 bytes per I/Q sample ≈ 4 MB/s), each
/// 16 KB chunk is ~4 ms, so 32 chunks ≈ 128 ms of headroom.
const BRIDGE_QUEUE_DEPTH: usize = 32;

/// Selects which RTL-SDR device to open.
#[derive(Debug, Clone, PartialEq)]
pub enum DeviceSelector {
    /// Pick the device at this enumeration index (0 is the first device).
    Index(usize),
    /// Pick a device by its manufacturer, product and serial strings.
    /// Every filter that is `Some` has to match; `None` filters are ignored.
    Filter {
        manufacturer: Option<String>,
        product: Option<String>,
        serial: Option<String>,
    },
}

impl Default for DeviceSelector {
    /// The default selector addresses the first device, i.e. `Index(0)`.
    fn default() -> Self {
        Self::Index(0)
    }
}

/// RTL-SDR configuration.
///
/// Describes which device to open and the tuner settings to apply to it
/// before streaming starts. The constructors on this type initialise
/// `bias_tee` to `false` and `freq_correction_ppm` to `0`; set the public
/// fields directly to override them.
#[derive(Debug, Clone, PartialEq)]
pub struct RtlSdrConfig {
    /// Device selector (index or filters)
    pub device: DeviceSelector,
    /// Center frequency in Hz
    pub center_freq: u32,
    /// Sample rate in Hz
    pub sample_rate: u32,
    /// Tuner gain (Auto or Manual in dB)
    pub gain: Gain,
    /// Enable bias tee (default: false)
    pub bias_tee: bool,
    /// Frequency correction in PPM (default: 0)
    pub freq_correction_ppm: i32,
}

impl RtlSdrConfig {
    /// Create a new RTL-SDR configuration with device index
    pub fn new(device_index: usize, center_freq: u32, sample_rate: u32, gain: Gain) -> Self {
        Self {
            device: DeviceSelector::Index(device_index),
            center_freq,
            sample_rate,
            gain,
            bias_tee: false,
            freq_correction_ppm: 0,
        }
    }

    /// Create a new RTL-SDR configuration with device filters
    pub fn new_with_filters(
        manufacturer: Option<String>,
        product: Option<String>,
        serial: Option<String>,
        center_freq: u32,
        sample_rate: u32,
        gain: Gain,
    ) -> Self {
        Self {
            device: DeviceSelector::Filter {
                manufacturer,
                product,
                serial,
            },
            center_freq,
            sample_rate,
            gain,
            bias_tee: false,
            freq_correction_ppm: 0,
        }
    }
}

/// Control message for dynamic RTL-SDR parameter adjustment.
///
/// Passed to `AsyncRtlSdrReader::adjust` to change device parameters while
/// streaming. `Frequency` and `Gain` (auto or manual) are forwarded to the
/// device; `SampleRate` and `FreqCorrection` are logged and ignored, as are
/// element-based gain requests.
#[derive(Debug, Clone)]
pub enum RtlSdrMessage {
    /// Retune to center frequency (Hz)
    Frequency(u32),
    /// Change sample rate (Hz); not supported during streaming (logged and ignored)
    SampleRate(u32),
    /// Change tuner gain; auto and manual gain are applied during streaming
    Gain(Gain),
    /// Change frequency correction (PPM); not supported during streaming (logged and ignored)
    FreqCorrection(i32),
}

/// Device information for RTL-SDR devices.
///
/// Produced by `list_devices`, which substitutes an empty string for any
/// descriptor string the driver does not report.
#[derive(Debug, Clone, PartialEq)]
pub struct RtlSdrDeviceInfo {
    /// Device index (0-based position in the enumeration)
    pub index: usize,
    /// Manufacturer name (empty when not reported)
    pub manufacturer: String,
    /// Product name (empty when not reported)
    pub product: String,
    /// Serial number (empty when not reported)
    pub serial: String,
}

/// List all available RTL-SDR devices.
pub fn list_devices() -> error::Result<Vec<RtlSdrDeviceInfo>> {
    let devices = rs_rtl::list_devices()
        .map_err(|e| error::Error::device(format!("Failed to enumerate devices: {e}")))?;
    Ok(devices
        .iter()
        .enumerate()
        .map(|(i, d)| RtlSdrDeviceInfo {
            index: i,
            manufacturer: d.manufacturer.clone().unwrap_or_default(),
            product: d.product.clone().unwrap_or_default(),
            serial: d.serial.clone().unwrap_or_default(),
        })
        .collect())
}

/// Resolve a DeviceSelector to a device index.
fn resolve_device_index(selector: &DeviceSelector) -> error::Result<usize> {
    match selector {
        DeviceSelector::Index(idx) => Ok(*idx),
        DeviceSelector::Filter {
            manufacturer,
            product,
            serial,
        } => {
            let devices = rs_rtl::list_devices()
                .map_err(|e| error::Error::device(format!("Failed to enumerate devices: {e}")))?;

            let matching = devices.iter().enumerate().find(|(_, d)| {
                manufacturer
                    .as_ref()
                    .is_none_or(|m| d.manufacturer.as_deref() == Some(m.as_str()))
                    && product
                        .as_ref()
                        .is_none_or(|p| d.product.as_deref() == Some(p.as_str()))
                    && serial
                        .as_ref()
                        .is_none_or(|s| d.serial.as_deref() == Some(s.as_str()))
            });

            match matching {
                Some((idx, _)) => Ok(idx),
                None => Err(error::Error::device(format!(
                    "No RTL-SDR device found matching filters: \
                     manufacturer={manufacturer:?}, product={product:?}, serial={serial:?}"
                ))),
            }
        }
    }
}

/// Open and configure an RTL-SDR device from a `RtlSdrConfig`.
///
/// Steps, in order: resolve the selector to an index, open the device, set the
/// sample rate, set the tuner bandwidth to the sample rate (best effort), set
/// the center frequency, apply the gain mode, and set the bias tee (best effort).
///
/// A non-zero `freq_correction_ppm` is not applied; it is only reported with a
/// warning. `Gain::Elements` falls back to automatic gain with a warning.
///
/// # Errors
/// Returns an error when the device cannot be resolved or opened, or when
/// setting the sample rate, center frequency or gain fails. Bandwidth and
/// bias-tee failures are logged as warnings and do not abort configuration.
fn open_and_configure(config: &RtlSdrConfig) -> error::Result<rs_rtl::RtlSdr> {
    let idx = resolve_device_index(&config.device)?;
    let mut sdr = rs_rtl::RtlSdr::open(idx)?;

    sdr.set_sample_rate(config.sample_rate)?;
    // Best effort: a failure here is not fatal, but it must not vanish silently.
    if let Err(e) = sdr.set_bandwidth(config.sample_rate) {
        tracing::warn!(
            error = %e,
            bandwidth = config.sample_rate,
            "Failed to set RTL-SDR tuner bandwidth; continuing"
        );
    }

    if config.freq_correction_ppm != 0 {
        tracing::warn!(
            ppm = config.freq_correction_ppm,
            "rs-rtl does not support frequency correction PPM; ignoring"
        );
    }

    sdr.set_center_freq(config.center_freq)?;

    match config.gain {
        Gain::Manual(gain_db) => {
            // The driver takes tenths of a dB. Round instead of truncating so a
            // product that lands just below an integer (floating-point error)
            // does not lose a whole tenth.
            let gain_tenths = (gain_db * 10.0).round() as i32;
            tracing::info!(gain_db, gain_tenths, "Setting manual tuner gain");
            sdr.set_gain_manual(gain_tenths)?;
        }
        Gain::Auto => {
            tracing::info!("Setting automatic tuner gain");
            sdr.set_gain_auto()?;
        }
        Gain::Elements(_) => {
            tracing::warn!("RTL-SDR does not support element-based gain control, using auto gain");
            sdr.set_gain_auto()?;
        }
    }

    // Best effort as well; surface the failure so a requested bias tee that
    // could not be switched is visible in the logs.
    if let Err(e) = sdr.set_bias_t(config.bias_tee) {
        tracing::warn!(
            error = %e,
            bias_tee = config.bias_tee,
            "Failed to set RTL-SDR bias tee; continuing"
        );
    }

    Ok(sdr)
}

/// Synchronous RTL-SDR I/Q reader.
///
/// Implements `Iterator`, yielding blocks of `Complex<f32>` samples. The device
/// is not opened at construction time: the first call to `next()` spawns a
/// background thread that opens and configures the device from `config` and
/// then feeds raw bytes back through a bounded channel.
pub struct RtlSdrReader {
    /// Settings used to open the device when streaming starts.
    config: RtlSdrConfig,
    /// Background streaming handle (lazily initialized on first `next()` call).
    bg_rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, String>>>,
}

impl RtlSdrReader {
    pub fn new(config: &RtlSdrConfig) -> error::Result<Self> {
        Ok(Self {
            config: config.clone(),
            bg_rx: None,
        })
    }

    fn start_reader_thread(&mut self) -> error::Result<()> {
        let (tx, rx) = std::sync::mpsc::sync_channel::<Result<Vec<u8>, String>>(64);
        let (tx_init, rx_init) = std::sync::mpsc::sync_channel::<Result<(), String>>(1);

        let config = self.config.clone();

        std::thread::spawn(move || {
            let mut sdr = match open_and_configure(&config) {
                Ok(dev) => dev,
                Err(e) => {
                    let _ = tx_init.send(Err(e.to_string()));
                    return;
                }
            };

            let reader = match sdr.start_streaming() {
                Ok(r) => r,
                Err(e) => {
                    let _ = tx_init.send(Err(e.to_string()));
                    return;
                }
            };
            let _ = tx_init.send(Ok(()));

            while let Some(data) = reader.recv() {
                if data.is_empty() {
                    continue;
                }
                // Use blocking send for backpressure (lesson 20.1)
                if tx.send(Ok(data)).is_err() {
                    break;
                }
            }
        });

        match rx_init.recv() {
            Ok(Ok(())) => {
                self.bg_rx = Some(rx);
                Ok(())
            }
            Ok(Err(msg)) => Err(error::Error::Other(msg)),
            Err(_) => Err(error::Error::device(
                "Failed to initialize RTL-SDR reader thread",
            )),
        }
    }

    /// Retune the reader to a new center frequency.
    pub fn tune(&mut self, center_freq: u32) -> error::Result<()> {
        if self.bg_rx.is_none() {
            self.config.center_freq = center_freq;
        }
        Ok(())
    }

    /// Change tuner gain mode/value.
    pub fn set_gain(&mut self, gain: Gain) -> error::Result<()> {
        if self.bg_rx.is_none() {
            self.config.gain = gain;
        }
        Ok(())
    }
}

impl Iterator for RtlSdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.bg_rx.is_none()
            && let Err(e) = self.start_reader_thread()
        {
            return Some(Err(e));
        }

        if let Some(ref rx) = self.bg_rx {
            return match rx.recv() {
                Ok(Ok(bytes)) => {
                    let samples = crate::convert_bytes_to_complex(IqFormat::Cu8, &bytes);
                    Some(Ok(samples))
                }
                Ok(Err(msg)) => Some(Err(error::Error::Other(msg))),
                Err(_) => None,
            };
        }
        None
    }
}

/// Asynchronous RTL-SDR I/Q Reader.
///
/// Uses `rs_rtl::RtlSdr::start_streaming()` which runs multi-transfer USB I/O
/// on a dedicated OS thread with a bounded `sync_channel`. A thin bridge thread
/// converts raw bytes to `Complex<f32>` and forwards to a tokio channel,
/// keeping the tokio worker pool free for decode work.
///
/// Architecture:
/// ```text
///   USB streaming thread  ──sync_channel(32)──▶  bridge thread  ──tokio::mpsc(32)──▶  decode loop
///   (nusb multi-transfer)                         (bytes→complex)                      (IqAsyncSource)
/// ```
///
/// The streaming thread uses nusb's endpoint queue with 15 transfers in-flight
/// simultaneously, eliminating inter-transfer gaps. Delivery uses blocking
/// `send()` for backpressure — the USB thread pauses when the queue is full,
/// preventing data loss.
///
/// The type implements `Stream`; each item is one converted block of samples.
pub struct AsyncRtlSdrReader {
    /// Control handle used by `adjust` to retune and change gain while streaming.
    control: rs_rtl::AsyncReadControlHandle,
    /// Async receiver for the decode loop; fed by the bridge thread.
    samples_rx: tokio::sync::mpsc::Receiver<error::Result<Vec<Complex<f32>>>>,
}

impl AsyncRtlSdrReader {
    /// Create a new async RTL-SDR reader.
    ///
    /// Opens and configures the device, then starts multi-transfer streaming.
    /// A bridge thread converts raw IQ bytes and forwards them to the tokio channel.
    ///
    /// # Errors
    /// Returns an error when the device cannot be opened or configured, when
    /// streaming cannot be started, or when the bridge thread cannot be spawned.
    pub fn new(config: &RtlSdrConfig) -> error::Result<Self> {
        let mut sdr = open_and_configure(config)?;

        let reader = sdr
            .start_streaming()
            .map_err(|e| error::Error::device(format!("Failed to start streaming: {e}")))?;
        let control = reader.control_handle();

        let (samples_tx, samples_rx) = tokio::sync::mpsc::channel(BRIDGE_QUEUE_DEPTH);

        // Bridge thread: receives raw bytes from the USB streaming thread,
        // converts to complex, sends to tokio channel.
        // Exits when the consumer drops `samples_rx`.
        std::thread::Builder::new()
            .name("rtlsdr-bridge".into())
            .spawn(move || {
                // Keep sdr alive so the device is not dropped while streaming
                let _sdr = sdr;
                while let Some(bytes) = reader.recv() {
                    // Skip empty chunks, as the synchronous reader does, so the
                    // consumer never receives an empty sample block.
                    if bytes.is_empty() {
                        continue;
                    }
                    let samples = Ok(crate::convert_bytes_to_complex(IqFormat::Cu8, &bytes));
                    if samples_tx.blocking_send(samples).is_err() {
                        break;
                    }
                }
            })
            .map_err(|e| error::Error::device(format!("Failed to spawn bridge thread: {e}")))?;

        Ok(Self {
            control,
            samples_rx,
        })
    }

    /// Apply a control message to the running device.
    ///
    /// `Frequency` retunes; `Gain(Auto)` and `Gain(Manual)` change the tuner
    /// gain. `Gain(Elements)`, `SampleRate` and `FreqCorrection` are not
    /// supported while streaming: they are logged and `Ok(())` is returned.
    ///
    /// # Errors
    /// Returns a device error when the retune or gain change is rejected.
    pub fn adjust(&self, message: RtlSdrMessage) -> error::Result<()> {
        match message {
            RtlSdrMessage::Frequency(freq) => self
                .control
                .tune(freq)
                .map_err(|e| error::Error::device(format!("RTL-SDR tune failed: {e}"))),
            RtlSdrMessage::Gain(gain) => match gain {
                Gain::Auto => self.control.set_gain_auto().map_err(|e| {
                    error::Error::device(format!("RTL-SDR set auto gain failed: {e}"))
                }),
                Gain::Manual(db) => {
                    // The driver takes tenths of a dB. Round instead of
                    // truncating so floating-point error in the product cannot
                    // drop a whole tenth.
                    let gain_tenths = (db * 10.0).round() as i32;
                    self.control
                        .set_gain(gain_tenths)
                        .map_err(|e| error::Error::device(format!("RTL-SDR set gain failed: {e}")))
                }
                Gain::Elements(_) => {
                    tracing::warn!("Element-based gain not supported for RTL-SDR; ignoring");
                    Ok(())
                }
            },
            RtlSdrMessage::SampleRate(_rate) => {
                tracing::warn!(
                    "RTL-SDR live sample rate change not supported during streaming; ignoring"
                );
                Ok(())
            }
            RtlSdrMessage::FreqCorrection(ppm) => {
                tracing::warn!(
                    ppm,
                    "RTL-SDR live frequency correction change not supported during streaming; ignoring"
                );
                Ok(())
            }
        }
    }

    /// Retune to a specific center frequency (Hz).
    ///
    /// Shorthand for `adjust(RtlSdrMessage::Frequency(center_freq))`.
    pub fn tune(&self, center_freq: u32) -> error::Result<()> {
        self.adjust(RtlSdrMessage::Frequency(center_freq))
    }
}

impl Stream for AsyncRtlSdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    /// Poll for the next sample block by delegating to the tokio receiver's `poll_recv`.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.samples_rx.poll_recv(cx)
    }
}

/// Index of the first available RTL-SDR device.
///
/// This is a fixed value (0); no device enumeration is performed.
pub fn get_first_device_index() -> usize {
    const FIRST_DEVICE_INDEX: usize = 0;
    FIRST_DEVICE_INDEX
}