#![cfg(feature = "async")]
#![cfg_attr(docsrs, doc(cfg(feature = "async")))]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::{Crazyradio, Error, Result, UsbCommand};
/// A single packet captured while the radio is in sniffer mode.
///
/// Instances are built by the background receive loop from the packet
/// metadata reported by the radio and handed to the application through
/// [`SnifferReceiver::recv`].
#[derive(Debug, Clone)]
pub struct ReceivedSnifferPacket {
/// Signal strength reported for this packet (dBm, per the field name).
pub rssi_dbm: i16,
/// Pipe number reported by the radio for this packet.
pub pipe: u8,
/// Timestamp reported by the radio (presumably microseconds, per the
/// field name; the time base is defined by the radio, not by this module).
pub timestamp_us: u32,
/// Packet payload, copied out of the 63-byte receive buffer and truncated
/// to the length reported by the radio.
pub payload: Vec<u8>,
}
/// Receiving half of a sniffer session.
///
/// Yields captured packets via [`SnifferReceiver::recv`] and gives the
/// [`Crazyradio`] back through [`SnifferReceiver::close`].
pub struct SnifferReceiver {
// Packets (or a receive error) produced by the background receive loop.
// Taken and dropped by `close` so the loop sees the channel as disconnected.
packet_rx: Option<flume::Receiver<Result<ReceivedSnifferPacket>>>,
// Used by `close` to ask the background receive loop to stop.
close_tx: Option<flume::Sender<()>>,
// Delivers the radio (or the error from leaving sniffer mode) once the
// background receive loop has finished.
radio_rx: Option<flume::Receiver<Result<Crazyradio>>>,
// Shared with every `SnifferSender` of this session; cleared by `close`.
session_active: Arc<AtomicBool>,
}
/// Sending half of a sniffer session.
///
/// Cheap to clone: clones share the same USB device handle and the same
/// session flag. Every operation is rejected with
/// `Error::SnifferSessionClosed` once the session flag has been cleared.
#[derive(Clone)]
pub struct SnifferSender {
// USB handle shared with the radio that owns the sniffer session.
device_handle: Arc<rusb::DeviceHandle<rusb::GlobalContext>>,
// Set while the session is open; cleared by `SnifferReceiver::close`.
session_active: Arc<AtomicBool>,
// Radio channel captured when the session was opened; only used to
// annotate transmitted packets for packet capture.
#[cfg(feature = "packet_capture")]
channel: u8,
// Radio serial captured when the session was opened; only used to
// annotate transmitted packets for packet capture.
#[cfg(feature = "packet_capture")]
serial: String,
}
impl SnifferReceiver {
pub async fn recv(&self) -> Option<Result<ReceivedSnifferPacket>> {
let rx = self.packet_rx.as_ref()?;
match rx.recv_async().await {
Ok(result) => Some(result),
Err(_) => None, }
}
pub async fn close(mut self) -> Result<Crazyradio> {
self.session_active.store(false, Ordering::Relaxed);
if let Some(close_tx) = self.close_tx.take() {
let _ = close_tx.send(());
}
if let Some(packet_rx) = self.packet_rx.take() {
drop(packet_rx);
}
if let Some(radio_rx) = self.radio_rx.take() {
match radio_rx.recv_async().await {
Ok(result) => result,
Err(_) => Err(Error::SnifferSessionClosed),
}
} else {
Err(Error::SnifferSessionClosed)
}
}
}
impl SnifferSender {
pub async fn send_broadcast(&self, address: &[u8; 5], data: &[u8]) -> Result<()> {
if !self.session_active.load(Ordering::Relaxed) {
return Err(Error::SnifferSessionClosed);
}
if data.is_empty() || data.len() > 63 {
return Err(Error::InvalidArgument);
}
let handle = self.device_handle.clone();
let mut buf = Vec::with_capacity(5 + data.len());
buf.extend_from_slice(address);
buf.extend_from_slice(data);
#[cfg(feature = "packet_capture")]
let (channel, serial) = (self.channel, self.serial.clone());
#[cfg(feature = "packet_capture")]
let capture_address = *address;
let (tx, rx) = flume::bounded(1);
std::thread::spawn(move || {
#[cfg(feature = "packet_capture")]
crate::capture::capture_packet(
crate::capture::DIRECTION_TX,
channel,
&capture_address,
&serial,
&buf[5..],
);
let result = handle
.write_bulk(0x01, &buf, Duration::from_secs(1))
.map(|_| ())
.map_err(Error::from);
let _ = tx.send(result);
});
rx.recv_async().await.unwrap()
}
pub async fn get_drop_count(&self) -> Result<u32> {
if !self.session_active.load(Ordering::Relaxed) {
return Err(Error::SnifferSessionClosed);
}
let handle = self.device_handle.clone();
let (tx, rx) = flume::bounded(1);
std::thread::spawn(move || {
let mut buf = [0u8; 4];
let result = handle
.read_control(
0xC0,
UsbCommand::GetSnifferDropCount as u8,
0,
0,
&mut buf,
Duration::from_secs(1),
)
.map(|_| u32::from_le_bytes(buf))
.map_err(Error::from);
let _ = tx.send(result);
});
rx.recv_async().await.unwrap()
}
}
/// Blocking receive loop of a sniffer session; runs on its own thread.
///
/// Repeatedly polls the radio for sniffer packets and forwards each one on
/// `packet_tx`. The loop ends when
///
/// * a close signal arrives on `close_rx`,
/// * every receiver of `packet_tx` has been dropped,
/// * the radio reports a receive error (forwarded on `packet_tx` first), or
/// * forwarding a packet fails.
///
/// After the loop the radio is taken out of sniffer mode and the radio (or
/// the error from leaving sniffer mode) is sent on `radio_tx`.
fn sniffer_rx_loop(
    mut cr: Crazyradio,
    packet_tx: flume::Sender<Result<ReceivedSnifferPacket>>,
    close_rx: flume::Receiver<()>,
    radio_tx: flume::Sender<Result<Crazyradio>>,
) {
    // Upper bound on a single poll, so the stop conditions are re-checked
    // regularly even when no traffic is received.
    const RX_TIMEOUT: Duration = Duration::from_secs(1);

    while close_rx.try_recv().is_err() && !packet_tx.is_disconnected() {
        let mut payload_buf = [0u8; 63];
        let pkt = match cr.receive_sniffer_packet(&mut payload_buf, RX_TIMEOUT) {
            Ok(Some(pkt)) => pkt,
            // Nothing received this round (presumably a timeout); poll again.
            Ok(None) => continue,
            Err(e) => {
                let _ = packet_tx.send(Err(e));
                break;
            }
        };

        let payload = payload_buf[..pkt.length].to_vec();
        let delivered = packet_tx.send(Ok(ReceivedSnifferPacket {
            rssi_dbm: pkt.rssi_dbm,
            pipe: pkt.pipe,
            timestamp_us: pkt.timestamp_us,
            payload,
        }));
        if delivered.is_err() {
            break;
        }
    }

    // Give the radio back only if it left sniffer mode successfully.
    let handback = cr.exit_sniffer_mode().map(|_| cr);
    let _ = radio_tx.send(handback);
}
pub(crate) async fn enter_sniffer_mode_async(
mut cr: Crazyradio,
) -> Result<(SnifferReceiver, SnifferSender)> {
let device_handle = cr.device_handle.clone();
#[cfg(feature = "packet_capture")]
let channel: u8 = cr.channel.into();
#[cfg(feature = "packet_capture")]
let serial = cr.serial.clone();
let (setup_tx, setup_rx) = flume::bounded(1);
std::thread::spawn(move || {
match cr.enter_sniffer_mode() {
Ok(()) => {
let _ = setup_tx.send(Ok(cr));
}
Err(e) => {
let _ = setup_tx.send(Err(e));
}
}
});
let cr = setup_rx.recv_async().await.unwrap()?;
let (packet_tx, packet_rx) = flume::unbounded();
let (close_tx, close_rx) = flume::bounded(1);
let (radio_tx, radio_rx) = flume::bounded(1);
let session_active = Arc::new(AtomicBool::new(true));
std::thread::spawn(move || {
sniffer_rx_loop(cr, packet_tx, close_rx, radio_tx);
});
let receiver = SnifferReceiver {
packet_rx: Some(packet_rx),
close_tx: Some(close_tx),
radio_rx: Some(radio_rx),
session_active: session_active.clone(),
};
let sender = SnifferSender {
device_handle,
session_active,
#[cfg(feature = "packet_capture")]
channel,
#[cfg(feature = "packet_capture")]
serial,
};
Ok((receiver, sender))
}