waverave_hackrf/
rx.rs

1use nusb::transfer::RequestBuffer;
2
3use crate::{Buffer, Error, HackRf, consts::TransceiverMode, error::StateChangeError};
4
5/// A HackRF operating in receive mode.
6///
7/// To receive data, first take a HackRF peripheral and call
8/// [`HackRf::start_rx`], or use [`Receive::new`] with it.
9///
10/// Next, call [`submit`][Receive::submit] to queue up requests, stopping when
11/// there are enough pending requests. `libhackrf` queues up 1 MiB of data, or
12/// 524288 samples. You'll probably want something similar, with the number of
13/// pending requests informed by your chosen transfer block size.
14///
15/// Actual reception is done with [`next_complete`][Receive::next_complete],
16/// which will panic if there are no pending requests. The number of pending
17/// requests can always be checked with [`pending`][Receive::pending].
18///
19/// When finished receiving, call [`stop`][Receive::stop] to cancel all
20/// remaining transactions and switch the HackRF off again.
21///
22/// Putting it all together, here's an example receive program that writes all
23/// data to a file:
24///
25///
26/// ```no_run
27/// use std::sync::{Arc, atomic};
28///
29/// use anyhow::Result;
30/// use tokio::io::AsyncWriteExt;
31/// use waverave_hackrf::Buffer;
32/// #[tokio::main]
33/// async fn main() -> Result<()> {
34///     // Set up the ctrl-c handler
35///     let ctrlc_rx = Arc::new(atomic::AtomicBool::new(false));
36///     let ctrlc_tx = ctrlc_rx.clone();
37///     tokio::spawn(async move {
38///         tokio::signal::ctrl_c().await.unwrap();
39///         ctrlc_tx.store(true, atomic::Ordering::Release);
40///     });
41///
42///     // Open up a file for buffered writing.
43///     let mut args = std::env::args();
44///     args.next();
45///     let file_name = args.next().unwrap_or_else(|| String::from("./rx.bin"));
46///     let mut file = tokio::fs::File::create(&file_name).await?;
47///
48///     // Open up the HackRF
49///     let hackrf = waverave_hackrf::open_hackrf()?;
50///
51///     // Configure: 20MHz sample rate, turn on the RF amp, set IF & BB gains to 16 dB,
52///     // and tune to 915 MHz.
53///     hackrf.set_amp_enable(true).await?;
54///     hackrf.set_sample_rate(20e6).await?;
55///     hackrf.set_lna_gain(16).await?;
56///     hackrf.set_vga_gain(16).await?;
57///     hackrf.set_freq(915_000_000).await?;
58///
59///     // Start receiving, in bursts of 8192 samples
60///     let mut hackrf_rx = hackrf.start_rx(8192).await.map_err(|e| e.err)?;
61///
62///     // Separate the file writer from the sample reader with a separate task
63///     let (data_send, mut data_recv) = tokio::sync::mpsc::unbounded_channel::<Buffer>();
64///     let file_writer = tokio::spawn(async move {
65///         loop {
66///             let Some(buf) = data_recv.recv().await else {
67///                 break;
68///             };
69///             file.write_all_buf(&mut buf.bytes()).await?;
70///         }
71///         file.flush().await?;
72///         Ok::<(), anyhow::Error>(())
73///     });
74///
75///     // Queue up 64 transfers immediately, then start retrieving them until we
76///     // get a ctrl-c.
77///     for _ in 0..64 {
78///         hackrf_rx.submit();
79///     }
80///     loop {
81///         if ctrlc_rx.load(atomic::Ordering::Acquire) {
82///             break;
83///         }
84///         if data_send.send(hackrf_rx.next_complete().await?).is_err() {
85///             break;
86///         }
87///         hackrf_rx.submit();
88///     }
89///
90///     // Stop receiving
91///     hackrf_rx.stop().await?;
92///     drop(data_send);
93///
94///     // Wait for file writer task to close up shop
95///     file_writer.await??;
96///
97///     Ok(())
98/// }
99/// ```
100pub struct Receive {
101    rf: HackRf,
102    transfer_size: usize,
103}
104
105impl Receive {
106    /// Switch a HackRF into receive mode, getting `transfer_size` samples at a
107    /// time. The transfer size is always rounded up to the nearest 256-sample
108    /// block increment; it's recommended to be 8192 samples but can be smaller
109    /// or larger as needed.
110    pub async fn new(rf: HackRf, transfer_size: usize) -> Result<Self, StateChangeError> {
111        if let Err(err) = rf.set_transceiver_mode(TransceiverMode::Receive).await {
112            return Err(StateChangeError { err, rf });
113        }
114        // Go from samples to bytes, and round up to nearest 256-sample increment
115        let transfer_size = (transfer_size.max(1) + 0xFF) & !0xFF;
116        Ok(Self { rf, transfer_size })
117    }
118
119    /// Get the chosen transfer size, in samples.
120    pub fn transfer_size(&self) -> usize {
121        self.transfer_size
122    }
123
124    /// Queue up a receive transfer.
125    ///
126    /// This will pull from a reusable buffer pool first, and allocate a new
127    /// buffer if none are available in the pool.
128    ///
129    /// The buffer pool will grow so long as completed buffers aren't dropped.
130    pub fn submit(&mut self) {
131        let req = if let Ok(buf) = self.rf.rx.buf_pool.try_recv() {
132            RequestBuffer::reuse(buf, self.transfer_size * 2)
133        } else {
134            RequestBuffer::new(self.transfer_size * 2)
135        };
136        self.rf.rx.queue.submit(req);
137    }
138
139    /// Retrieve the next chunk of receive data.
140    ///
141    /// This future is cancel-safe, so feel free to use it alongside a timeout
142    /// or a `select!`-type pattern.
143    pub async fn next_complete(&mut self) -> Result<Buffer, Error> {
144        let result = self.rf.rx.queue.next_complete().await;
145        match result.status {
146            Ok(_) => Ok(Buffer::new(result.data, self.rf.rx.buf_pool_send.clone())),
147            Err(e) => {
148                // Reuse the buffer even in the event of an error.
149                let _ = self.rf.rx.buf_pool_send.send(result.data);
150                Err(e.into())
151            }
152        }
153    }
154
155    /// Get the number of pending requests.
156    pub fn pending(&self) -> usize {
157        self.rf.rx.queue.pending()
158    }
159
160    /// Halt receiving and return to idle mode.
161    ///
162    /// This attempts to cancel all transfers and then complete whatever is
163    /// left. Transfer errors are ignored.
164    pub async fn stop(mut self) -> Result<HackRf, Error> {
165        self.rf.rx.queue.cancel_all();
166        while self.pending() > 0 {
167            let _ = self.next_complete().await;
168        }
169        self.rf.set_transceiver_mode(TransceiverMode::Off).await?;
170        Ok(self.rf)
171    }
172}