waverave_hackrf/rx.rs
1use nusb::transfer::RequestBuffer;
2
3use crate::{Buffer, Error, HackRf, consts::TransceiverMode, error::StateChangeError};
4
5/// A HackRF operating in receive mode.
6///
7/// To receive data, first take a HackRF peripheral and call
8/// [`HackRf::start_rx`], or use [`Receive::new`] with it.
9///
10/// Next, call [`submit`][Receive::submit] to queue up requests, stopping when
11/// there are enough pending requests. `libhackrf` queues up 1 MiB of data, or
12/// 524288 samples. You'll probably want something similar, with the number of
13/// pending requests informed by your chosen transfer block size.
14///
15/// Actual reception is done with [`next_complete`][Receive::next_complete],
16/// which will panic if there are no pending requests. The number of pending
17/// requests can always be checked with [`pending`][Receive::pending].
18///
19/// When finished receiving, call [`stop`][Receive::stop] to cancel all
20/// remaining transactions and switch the HackRF off again.
21///
22/// Putting it all together, here's an example receive program that writes all
23/// data to a file:
24///
25///
26/// ```no_run
27/// use std::sync::{Arc, atomic};
28///
29/// use anyhow::Result;
30/// use tokio::io::AsyncWriteExt;
31/// use waverave_hackrf::Buffer;
32/// #[tokio::main]
33/// async fn main() -> Result<()> {
34/// // Set up the ctrl-c handler
35/// let ctrlc_rx = Arc::new(atomic::AtomicBool::new(false));
36/// let ctrlc_tx = ctrlc_rx.clone();
37/// tokio::spawn(async move {
38/// tokio::signal::ctrl_c().await.unwrap();
39/// ctrlc_tx.store(true, atomic::Ordering::Release);
40/// });
41///
42/// // Open up a file for buffered writing.
43/// let mut args = std::env::args();
44/// args.next();
45/// let file_name = args.next().unwrap_or_else(|| String::from("./rx.bin"));
46/// let mut file = tokio::fs::File::create(&file_name).await?;
47///
48/// // Open up the HackRF
49/// let hackrf = waverave_hackrf::open_hackrf()?;
50///
51/// // Configure: 20MHz sample rate, turn on the RF amp, set IF & BB gains to 16 dB,
52/// // and tune to 915 MHz.
53/// hackrf.set_amp_enable(true).await?;
54/// hackrf.set_sample_rate(20e6).await?;
55/// hackrf.set_lna_gain(16).await?;
56/// hackrf.set_vga_gain(16).await?;
57/// hackrf.set_freq(915_000_000).await?;
58///
59/// // Start receiving, in bursts of 8192 samples
60/// let mut hackrf_rx = hackrf.start_rx(8192).await.map_err(|e| e.err)?;
61///
62/// // Separate the file writer from the sample reader with a separate task
63/// let (data_send, mut data_recv) = tokio::sync::mpsc::unbounded_channel::<Buffer>();
64/// let file_writer = tokio::spawn(async move {
65/// loop {
66/// let Some(buf) = data_recv.recv().await else {
67/// break;
68/// };
69/// file.write_all_buf(&mut buf.bytes()).await?;
70/// }
71/// file.flush().await?;
72/// Ok::<(), anyhow::Error>(())
73/// });
74///
75/// // Queue up 64 transfers immediately, then start retrieving them until we
76/// // get a ctrl-c.
77/// for _ in 0..64 {
78/// hackrf_rx.submit();
79/// }
80/// loop {
81/// if ctrlc_rx.load(atomic::Ordering::Acquire) {
82/// break;
83/// }
84/// if data_send.send(hackrf_rx.next_complete().await?).is_err() {
85/// break;
86/// }
87/// hackrf_rx.submit();
88/// }
89///
90/// // Stop receiving
91/// hackrf_rx.stop().await?;
92/// drop(data_send);
93///
94/// // Wait for file writer task to close up shop
95/// file_writer.await??;
96///
97/// Ok(())
98/// }
99/// ```
100pub struct Receive {
101 rf: HackRf,
102 transfer_size: usize,
103}
104
105impl Receive {
106 /// Switch a HackRF into receive mode, getting `transfer_size` samples at a
107 /// time. The transfer size is always rounded up to the nearest 256-sample
108 /// block increment; it's recommended to be 8192 samples but can be smaller
109 /// or larger as needed.
110 pub async fn new(rf: HackRf, transfer_size: usize) -> Result<Self, StateChangeError> {
111 if let Err(err) = rf.set_transceiver_mode(TransceiverMode::Receive).await {
112 return Err(StateChangeError { err, rf });
113 }
114 // Go from samples to bytes, and round up to nearest 256-sample increment
115 let transfer_size = (transfer_size.max(1) + 0xFF) & !0xFF;
116 Ok(Self { rf, transfer_size })
117 }
118
119 /// Get the chosen transfer size, in samples.
120 pub fn transfer_size(&self) -> usize {
121 self.transfer_size
122 }
123
124 /// Queue up a receive transfer.
125 ///
126 /// This will pull from a reusable buffer pool first, and allocate a new
127 /// buffer if none are available in the pool.
128 ///
129 /// The buffer pool will grow so long as completed buffers aren't dropped.
130 pub fn submit(&mut self) {
131 let req = if let Ok(buf) = self.rf.rx.buf_pool.try_recv() {
132 RequestBuffer::reuse(buf, self.transfer_size * 2)
133 } else {
134 RequestBuffer::new(self.transfer_size * 2)
135 };
136 self.rf.rx.queue.submit(req);
137 }
138
139 /// Retrieve the next chunk of receive data.
140 ///
141 /// This future is cancel-safe, so feel free to use it alongside a timeout
142 /// or a `select!`-type pattern.
143 pub async fn next_complete(&mut self) -> Result<Buffer, Error> {
144 let result = self.rf.rx.queue.next_complete().await;
145 match result.status {
146 Ok(_) => Ok(Buffer::new(result.data, self.rf.rx.buf_pool_send.clone())),
147 Err(e) => {
148 // Reuse the buffer even in the event of an error.
149 let _ = self.rf.rx.buf_pool_send.send(result.data);
150 Err(e.into())
151 }
152 }
153 }
154
155 /// Get the number of pending requests.
156 pub fn pending(&self) -> usize {
157 self.rf.rx.queue.pending()
158 }
159
160 /// Halt receiving and return to idle mode.
161 ///
162 /// This attempts to cancel all transfers and then complete whatever is
163 /// left. Transfer errors are ignored.
164 pub async fn stop(mut self) -> Result<HackRf, Error> {
165 self.rf.rx.queue.cancel_all();
166 while self.pending() > 0 {
167 let _ = self.next_complete().await;
168 }
169 self.rf.set_transceiver_mode(TransceiverMode::Off).await?;
170 Ok(self.rf)
171 }
172}