waverave_hackrf/
tx.rs

1use crate::{Buffer, Error, HackRf, consts::TransceiverMode, error::StateChangeError};
2
3/// A HackRF operating in transmit mode.
4///
5/// To send data, first take a HackRF peripheral and call
6/// [`HackRf::start_tx`], or use [`Transmit::new`] with it. Provide the maximum
7/// block transfer size, in samples, that will be used for the duration of the
8/// transmit.
9///
10/// Next, call [`get_buffer`][Transmit::get_buffer] to allocate a buffer, and
11/// then fill that buffer with the next block of samples to transmit, up to the
12/// maximum block transfer size chosen. Set up around half a million samples
13/// worth of buffers before submitting any, ideally, or gaps in the transmit
14/// sequence may occur.
15///
/// Next, start calling [`submit`][Transmit::submit] to submit the blocks.
/// Continue the buffer filling and submitting process as long as needed,
/// preferably pausing with [`next_complete`][Transmit::next_complete] when
/// there are more than half a million samples worth of buffers queued up. The
/// number of queued buffers can be checked with [`pending`][Transmit::pending].
/// Once buffers are queued up, the process becomes one of completing a
/// transfer, grabbing a buffer, and submitting it, in a loop until finished.
23///
24/// When all buffers have been submitted, call [`flush`][Transmit::flush] to
25/// queue up a final set of zero-filled buffers to flush out any remaining data
26/// in the HackRF's internal buffers. Then continue calling
27/// [`next_complete`][Transmit::next_complete] until
28/// [`pending`][Transmit::pending] returns 0. Finally, exit transmit mode with
29/// [`stop`][Transmit::stop]. Transmit mode can be exited without doing this
30/// sequence, but not all samples queued up will necessarily be sent otherwise.
31///
32/// Putting it all together, here's an example program that transmits all the
33/// data in a file:
34///
35/// ```no_run
36/// use anyhow::Result;
37/// use tokio::io::AsyncReadExt;
38/// use waverave_hackrf::Buffer;
39/// #[tokio::main]
40/// async fn main() -> Result<()> {
41///     let hackrf = waverave_hackrf::open_hackrf()?;
42///
43///     // Configure: 20MHz sample rate, turn on RF amp, set TX IF gain to 16 dB,
44///     // and tune to 915 MHz.
45///     hackrf.set_sample_rate(20e6).await?;
46///     hackrf.set_txvga_gain(16).await?;
47///     hackrf.set_freq(915_000_000).await?;
48///     hackrf.set_amp_enable(true).await?;
49///
50///     // Open up a file for buffered reading.
51///     let mut args = std::env::args();
52///     args.next();
53///     let file_name = args.next().unwrap_or_else(|| String::from("./tx.bin"));
54///     let mut file = tokio::fs::File::open(&file_name).await?;
55///
56///     // Start transmitting, in bursts of 8192 samples
57///     let mut hackrf_tx = hackrf.start_tx(8192).await.map_err(|e| e.err)?;
58///
59///     // Set up an asynchronous process that fills buffers and sends them on to
60///     // the transmitter.
61///     let (buf_send, mut buf_recv) = tokio::sync::mpsc::channel::<Buffer>(4);
62///     let (data_send, mut data_recv) = tokio::sync::mpsc::channel::<Buffer>(4);
63///     tokio::spawn(async move {
64///         loop {
65///             let Some(mut buf) = buf_recv.recv().await else {
66///                 break;
67///             };
68///             buf.extend_zeros(buf.remaining_capacity());
69///             file.read_exact(buf.bytes_mut()).await?;
70///             let Ok(_) = data_send.send(buf).await else {
71///                 break;
72///             };
73///         }
74///         Ok::<(), anyhow::Error>(())
75///     });
76///
77///     // Start filling up queue
78///     let mut start = Vec::with_capacity(64);
79///     while start.len() < 64 {
80///         while buf_send.try_send(hackrf_tx.get_buffer()).is_ok() {}
81///         let Some(buf) = data_recv.recv().await else {
82///             break;
83///         };
84///         start.push(buf);
85///     }
86///
87///     // Submit the whole starting queue in one go
88///     for buf in start {
89///         hackrf_tx.submit(buf);
90///     }
91///
92///     // Continue filling the queue and submitting samples as often as possible
93///     loop {
94///         while buf_send.try_send(hackrf_tx.get_buffer()).is_ok() {}
95///         hackrf_tx.next_complete().await?;
96///         let Some(buf) = data_recv.recv().await else {
97///             break;
98///         };
99///         hackrf_tx.submit(buf);
100///     }
101///
102///     // Flush the remainder
103///     hackrf_tx.flush();
104///     while hackrf_tx.pending() > 0 {
105///         hackrf_tx.next_complete().await?;
106///     }
107///
108///     // Stop transmitting
109///     hackrf_tx.stop().await?;
110///
111///     Ok(())
112/// }
113/// ```
pub struct Transmit {
    /// The device being driven while in transmit mode. `stop` hands it back
    /// to the caller.
    rf: HackRf,
    /// Per-buffer sample limit, as stored by `new` after rounding the
    /// requested size up to a multiple of 256 samples.
    max_transfer_size: usize,
}
118
119impl Transmit {
120    /// Switch a HackRF into transmit mode, with a set maximum number of samples
121    /// per buffer block.
122    ///
123    /// Buffers are reused across transmit operations, provided that the
124    /// `max_transfer_size` is always the same.
125    pub async fn new(rf: HackRf, max_transfer_size: usize) -> Result<Self, StateChangeError> {
126        if let Err(err) = rf.set_transceiver_mode(TransceiverMode::Transmit).await {
127            return Err(StateChangeError { err, rf });
128        }
129        // Round up to nearest 256-sample increment
130        let max_transfer_size = (max_transfer_size.max(1) + 0xFF) & !0xFF;
131        Ok(Self {
132            rf,
133            max_transfer_size,
134        })
135    }
136
137    /// Get a buffer for holding transmit data.
138    ///
139    /// All buffers have the same capacity, set when transmit is initialized.
140    /// Actual transmitted data is rounded up to the nearest 256 samples,
141    /// zero-filling as needed.
142    pub fn get_buffer(&self) -> Buffer {
143        if let Ok(mut buf) = self.rf.tx.buf_pool.try_recv() {
144            buf.clear();
145            buf.reserve_exact(self.max_transfer_size * 2);
146            Buffer::new(buf, self.rf.tx.buf_pool_send.clone())
147        } else {
148            Buffer::new(
149                Vec::with_capacity(self.max_transfer_size * 2),
150                self.rf.tx.buf_pool_send.clone(),
151            )
152        }
153    }
154
155    /// The maximum number of samples that can be queued within a single buffer.
156    pub fn max_transfer_size(&self) -> usize {
157        self.max_transfer_size
158    }
159
160    /// Queue up a transmit transfer.
161    ///
162    /// This will pull from a reusable buffer pool first, and allocate a new
163    /// buffer if none are available in the pool.
164    ///
165    /// The buffer pool will grow so long as completed buffers aren't dropped.
166    pub fn submit(&mut self, tx: Buffer) {
167        let mut tx = tx.into_vec();
168        // Round up to nearest 512-byte block and zero-fill remaining
169        let new_len = (tx.len() + 0x1ff) & !0x1ff;
170        tx.resize(new_len, 0);
171        self.rf.tx.queue.submit(tx);
172    }
173
174    /// Flush whatever remaining samples are in the HackRF internal buffer.
175    ///
176    /// This will generate additional pending operations.
177    ///
178    /// Additional pending operations go up by `8192.div_ceil(max_transfer_size)`.
179    pub fn flush(&mut self) {
180        // HackRF buffer depth, in samples (0x8000 bytes)
181        const BUFFER_DEPTH: usize = 0x4000;
182        let mut total_size = 0;
183        while total_size < BUFFER_DEPTH {
184            let buf_size = self.max_transfer_size.min(BUFFER_DEPTH - total_size);
185            let mut buf = self.get_buffer();
186            buf.extend_zeros(buf_size);
187            self.submit(buf);
188            total_size += buf_size;
189        }
190    }
191
192    /// Wait for a transmit operation to complete.
193    ///
194    /// This future is cancel-safe, so feel free to use it alongside a timeout
195    /// or a `select!`-type pattern.
196    pub async fn next_complete(&mut self) -> Result<(), Error> {
197        let result = self.rf.tx.queue.next_complete().await;
198        let _ = self.rf.tx.buf_pool_send.send(result.data.reuse());
199        Ok(result.status?)
200    }
201
202    /// Get the number of pending requests.
203    pub fn pending(&self) -> usize {
204        self.rf.tx.queue.pending()
205    }
206
207    /// Halt receiving and return to idle mode.
208    ///
209    /// This attempts to cancel all transfers and then complete whatever is
210    /// left. Transfer errors are ignored.
211    pub async fn stop(mut self) -> Result<HackRf, Error> {
212        self.rf.tx.queue.cancel_all();
213        while self.pending() > 0 {
214            let _ = self.next_complete().await;
215        }
216        self.rf.set_transceiver_mode(TransceiverMode::Off).await?;
217        Ok(self.rf)
218    }
219}