waverave_hackrf/tx.rs
1use crate::{Buffer, Error, HackRf, consts::TransceiverMode, error::StateChangeError};
2
3/// A HackRF operating in transmit mode.
4///
5/// To send data, first take a HackRF peripheral and call
6/// [`HackRf::start_tx`], or use [`Transmit::new`] with it. Provide the maximum
7/// block transfer size, in samples, that will be used for the duration of the
8/// transmit.
9///
10/// Next, call [`get_buffer`][Transmit::get_buffer] to allocate a buffer, and
11/// then fill that buffer with the next block of samples to transmit, up to the
12/// maximum block transfer size chosen. Set up around half a million samples
13/// worth of buffers before submitting any, ideally, or gaps in the transmit
14/// sequence may occur.
15///
16/// Next, start calling [`submit`][Transmit::submit] to submit the blocks.
17/// Continue the buffer filling and submitting process as long as needed,
18/// preferrably pausing with [`next_complete`][Transmit::next_complete] when
19/// there a more than half a million samples worth of buffers queued up. The
20/// number of queued buffers can be checked with [`pending`][Transmit::pending].
21/// Once buffers are queued up, the process of completing, grabbing a buffer,
22/// and submitting it in a loop until finished.
23///
24/// When all buffers have been submitted, call [`flush`][Transmit::flush] to
25/// queue up a final set of zero-filled buffers to flush out any remaining data
26/// in the HackRF's internal buffers. Then continue calling
27/// [`next_complete`][Transmit::next_complete] until
28/// [`pending`][Transmit::pending] returns 0. Finally, exit transmit mode with
29/// [`stop`][Transmit::stop]. Transmit mode can be exited without doing this
30/// sequence, but not all samples queued up will necessarily be sent otherwise.
31///
32/// Putting it all together, here's an example program that transmits all the
33/// data in a file:
34///
35/// ```no_run
36/// use anyhow::Result;
37/// use tokio::io::AsyncReadExt;
38/// use waverave_hackrf::Buffer;
39/// #[tokio::main]
40/// async fn main() -> Result<()> {
41/// let hackrf = waverave_hackrf::open_hackrf()?;
42///
43/// // Configure: 20MHz sample rate, turn on RF amp, set TX IF gain to 16 dB,
44/// // and tune to 915 MHz.
45/// hackrf.set_sample_rate(20e6).await?;
46/// hackrf.set_txvga_gain(16).await?;
47/// hackrf.set_freq(915_000_000).await?;
48/// hackrf.set_amp_enable(true).await?;
49///
50/// // Open up a file for buffered reading.
51/// let mut args = std::env::args();
52/// args.next();
53/// let file_name = args.next().unwrap_or_else(|| String::from("./tx.bin"));
54/// let mut file = tokio::fs::File::open(&file_name).await?;
55///
56/// // Start transmitting, in bursts of 8192 samples
57/// let mut hackrf_tx = hackrf.start_tx(8192).await.map_err(|e| e.err)?;
58///
59/// // Set up an asynchronous process that fills buffers and sends them on to
60/// // the transmitter.
61/// let (buf_send, mut buf_recv) = tokio::sync::mpsc::channel::<Buffer>(4);
62/// let (data_send, mut data_recv) = tokio::sync::mpsc::channel::<Buffer>(4);
63/// tokio::spawn(async move {
64/// loop {
65/// let Some(mut buf) = buf_recv.recv().await else {
66/// break;
67/// };
68/// buf.extend_zeros(buf.remaining_capacity());
69/// file.read_exact(buf.bytes_mut()).await?;
70/// let Ok(_) = data_send.send(buf).await else {
71/// break;
72/// };
73/// }
74/// Ok::<(), anyhow::Error>(())
75/// });
76///
77/// // Start filling up queue
78/// let mut start = Vec::with_capacity(64);
79/// while start.len() < 64 {
80/// while buf_send.try_send(hackrf_tx.get_buffer()).is_ok() {}
81/// let Some(buf) = data_recv.recv().await else {
82/// break;
83/// };
84/// start.push(buf);
85/// }
86///
87/// // Submit the whole starting queue in one go
88/// for buf in start {
89/// hackrf_tx.submit(buf);
90/// }
91///
92/// // Continue filling the queue and submitting samples as often as possible
93/// loop {
94/// while buf_send.try_send(hackrf_tx.get_buffer()).is_ok() {}
95/// hackrf_tx.next_complete().await?;
96/// let Some(buf) = data_recv.recv().await else {
97/// break;
98/// };
99/// hackrf_tx.submit(buf);
100/// }
101///
102/// // Flush the remainder
103/// hackrf_tx.flush();
104/// while hackrf_tx.pending() > 0 {
105/// hackrf_tx.next_complete().await?;
106/// }
107///
108/// // Stop transmitting
109/// hackrf_tx.stop().await?;
110///
111/// Ok(())
112/// }
113/// ```
114pub struct Transmit {
115 rf: HackRf,
116 max_transfer_size: usize,
117}
118
119impl Transmit {
120 /// Switch a HackRF into transmit mode, with a set maximum number of samples
121 /// per buffer block.
122 ///
123 /// Buffers are reused across transmit operations, provided that the
124 /// `max_transfer_size` is always the same.
125 pub async fn new(rf: HackRf, max_transfer_size: usize) -> Result<Self, StateChangeError> {
126 if let Err(err) = rf.set_transceiver_mode(TransceiverMode::Transmit).await {
127 return Err(StateChangeError { err, rf });
128 }
129 // Round up to nearest 256-sample increment
130 let max_transfer_size = (max_transfer_size.max(1) + 0xFF) & !0xFF;
131 Ok(Self {
132 rf,
133 max_transfer_size,
134 })
135 }
136
137 /// Get a buffer for holding transmit data.
138 ///
139 /// All buffers have the same capacity, set when transmit is initialized.
140 /// Actual transmitted data is rounded up to the nearest 256 samples,
141 /// zero-filling as needed.
142 pub fn get_buffer(&self) -> Buffer {
143 if let Ok(mut buf) = self.rf.tx.buf_pool.try_recv() {
144 buf.clear();
145 buf.reserve_exact(self.max_transfer_size * 2);
146 Buffer::new(buf, self.rf.tx.buf_pool_send.clone())
147 } else {
148 Buffer::new(
149 Vec::with_capacity(self.max_transfer_size * 2),
150 self.rf.tx.buf_pool_send.clone(),
151 )
152 }
153 }
154
155 /// The maximum number of samples that can be queued within a single buffer.
156 pub fn max_transfer_size(&self) -> usize {
157 self.max_transfer_size
158 }
159
160 /// Queue up a transmit transfer.
161 ///
162 /// This will pull from a reusable buffer pool first, and allocate a new
163 /// buffer if none are available in the pool.
164 ///
165 /// The buffer pool will grow so long as completed buffers aren't dropped.
166 pub fn submit(&mut self, tx: Buffer) {
167 let mut tx = tx.into_vec();
168 // Round up to nearest 512-byte block and zero-fill remaining
169 let new_len = (tx.len() + 0x1ff) & !0x1ff;
170 tx.resize(new_len, 0);
171 self.rf.tx.queue.submit(tx);
172 }
173
174 /// Flush whatever remaining samples are in the HackRF internal buffer.
175 ///
176 /// This will generate additional pending operations.
177 ///
178 /// Additional pending operations go up by `8192.div_ceil(max_transfer_size)`.
179 pub fn flush(&mut self) {
180 // HackRF buffer depth, in samples (0x8000 bytes)
181 const BUFFER_DEPTH: usize = 0x4000;
182 let mut total_size = 0;
183 while total_size < BUFFER_DEPTH {
184 let buf_size = self.max_transfer_size.min(BUFFER_DEPTH - total_size);
185 let mut buf = self.get_buffer();
186 buf.extend_zeros(buf_size);
187 self.submit(buf);
188 total_size += buf_size;
189 }
190 }
191
192 /// Wait for a transmit operation to complete.
193 ///
194 /// This future is cancel-safe, so feel free to use it alongside a timeout
195 /// or a `select!`-type pattern.
196 pub async fn next_complete(&mut self) -> Result<(), Error> {
197 let result = self.rf.tx.queue.next_complete().await;
198 let _ = self.rf.tx.buf_pool_send.send(result.data.reuse());
199 Ok(result.status?)
200 }
201
202 /// Get the number of pending requests.
203 pub fn pending(&self) -> usize {
204 self.rf.tx.queue.pending()
205 }
206
207 /// Halt receiving and return to idle mode.
208 ///
209 /// This attempts to cancel all transfers and then complete whatever is
210 /// left. Transfer errors are ignored.
211 pub async fn stop(mut self) -> Result<HackRf, StateChangeError> {
212 self.rf.tx.queue.cancel_all();
213 while self.pending() > 0 {
214 let _ = self.next_complete().await;
215 }
216 match self.rf.set_transceiver_mode(TransceiverMode::Off).await {
217 Ok(_) => Ok(self.rf),
218 Err(err) => Err(StateChangeError { err, rf: self.rf }),
219 }
220 }
221}