1use std::sync::mpsc;
2
3use crate::{Buffer, Error, HackRf, consts::TransceiverMode, error::StateChangeError};
4
5pub struct Transmit {
7 rf: HackRf,
8 queue: nusb::transfer::Queue<Vec<u8>>,
9 max_transfer_size: usize,
10 buf_pool: mpsc::Receiver<Vec<u8>>,
11 buf_pool_send: mpsc::Sender<Vec<u8>>,
12}
13
14impl Transmit {
15 pub async fn new(rf: HackRf, max_transfer_size: usize) -> Result<Self, StateChangeError> {
17 if let Err(err) = rf.set_transceiver_mode(TransceiverMode::Transmit).await {
19 return Err(StateChangeError { err, rf });
20 }
21 let max_transfer_size = (max_transfer_size.max(1) + 0x1FF) & !0x1FF;
22 let queue = rf.interface.bulk_out_queue(0x02);
23 let (buf_pool_send, buf_pool) = mpsc::channel();
24 Ok(Self {
25 rf,
26 queue,
27 max_transfer_size,
28 buf_pool,
29 buf_pool_send,
30 })
31 }
32
33 pub fn get_buffer(&self) -> Buffer {
39 if let Ok(mut buf) = self.buf_pool.try_recv() {
40 buf.clear();
41 Buffer::new(buf, self.buf_pool_send.clone())
42 } else {
43 Buffer::new(
44 Vec::with_capacity(self.max_transfer_size),
45 self.buf_pool_send.clone(),
46 )
47 }
48 }
49
50 pub fn submit(&mut self, tx: Buffer) {
57 let mut tx = tx.into_vec();
58 let new_len = (tx.len() + 0x1ff) & !0x1ff;
60 tx.resize(new_len, 0);
61 self.queue.submit(tx);
62 }
63
64 pub async fn next_complete(&mut self) -> Result<(), Error> {
66 let result = self.queue.next_complete().await;
67 let _ = self.buf_pool_send.send(result.data.reuse());
68 Ok(result.status?)
69 }
70
71 pub fn pending(&self) -> usize {
73 self.queue.pending()
74 }
75
76 pub async fn stop(mut self) -> Result<HackRf, Error> {
81 while self.pending() > 0 {
82 let _ = self.next_complete().await;
83 }
84 self.rf.set_transceiver_mode(TransceiverMode::Off).await?;
85 Ok(self.rf)
86 }
87}