1use std::sync::mpsc;
2
3use nusb::transfer::RequestBuffer;
4
5use crate::{Buffer, Error, HackRf, consts::TransceiverMode, error::StateChangeError};
6
7pub struct Receive {
9 rf: HackRf,
10 queue: nusb::transfer::Queue<nusb::transfer::RequestBuffer>,
11 transfer_size: usize,
12 buf_pool: mpsc::Receiver<Vec<u8>>,
13 buf_pool_send: mpsc::Sender<Vec<u8>>,
14}
15
16impl Receive {
17 pub async fn new(rf: HackRf, transfer_size: usize) -> Result<Self, StateChangeError> {
19 if let Err(err) = rf.set_transceiver_mode(TransceiverMode::Receive).await {
21 return Err(StateChangeError { err, rf });
22 }
23 let transfer_size = (transfer_size.max(1) + 0x1FF) & !0x1FF;
24 let queue = rf.interface.bulk_in_queue(0x81);
25 let (buf_pool_send, buf_pool) = mpsc::channel();
26 Ok(Self {
27 rf,
28 queue,
29 transfer_size,
30 buf_pool,
31 buf_pool_send,
32 })
33 }
34
35 pub fn submit(&mut self) {
42 let req = if let Ok(buf) = self.buf_pool.try_recv() {
43 RequestBuffer::reuse(buf, self.transfer_size)
44 } else {
45 RequestBuffer::new(self.transfer_size)
46 };
47 self.queue.submit(req);
48 }
49
50 pub async fn next_complete(&mut self) -> Result<Buffer, Error> {
52 let buf = self.queue.next_complete().await.into_result()?;
53 Ok(Buffer::new(buf, self.buf_pool_send.clone()))
54 }
55
56 pub fn pending(&self) -> usize {
58 self.queue.pending()
59 }
60
61 pub async fn stop(mut self) -> Result<HackRf, Error> {
66 while self.pending() > 0 {
67 let _ = self.next_complete().await;
68 }
69 self.rf.set_transceiver_mode(TransceiverMode::Off).await?;
70 Ok(self.rf)
71 }
72}