use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use crate::constants::{BULK_TIMEOUT, DEFAULT_BUF_LENGTH};
use crate::error::RtlSdrError;
use crate::reg::Block;
use crate::usb;
use super::RtlSdrDevice;
use super::reader::ReaderBusyGuard;
/// Boxed sample callback accepted by `RtlSdrDevice::read_async_blocking`.
///
/// It is invoked with a slice holding exactly the bytes delivered by one
/// non-empty bulk read. The `Send` bound lets the boxed closure be moved to
/// another thread before the read loop is started.
pub type ReadAsyncCb = Box<dyn FnMut(&[u8]) + Send>;
/// Largest `buf_len` (in bytes, 16 MiB) that `read_async_blocking` accepts.
const MAX_BUF_LENGTH: u32 = 16 * 1024 * 1024;
/// A non-zero `buf_len` passed to `read_async_blocking` must be a multiple of
/// this many bytes.
// NOTE(review): 512 presumably matches the USB high-speed bulk packet size — confirm.
const BULK_ALIGNMENT: u32 = 512;
/// Timeout of each individual bulk read issued by `read_async_blocking`.
///
/// The cancel flag is only examined between reads, so this is also the longest
/// the loop waits before re-checking it when no data arrives.
const ASYNC_POLL_TIMEOUT: Duration = Duration::from_secs(1);
/// Number of back-to-back zero-length reads after which `read_async_blocking`
/// logs a warning and returns instead of continuing to poll.
const MAX_CONSECUTIVE_ZERO_READS: u32 = 100;
pub(crate) fn bulk_read(
handle: &rusb::DeviceHandle<rusb::GlobalContext>,
buf: &mut [u8],
) -> Result<usize, RtlSdrError> {
let timeout = if BULK_TIMEOUT == 0 {
Duration::from_secs(5)
} else {
Duration::from_millis(BULK_TIMEOUT)
};
match handle.read_bulk(crate::constants::BULK_ENDPOINT, buf, timeout) {
Ok(n) => Ok(n),
Err(rusb::Error::NoDevice) => Err(RtlSdrError::DeviceLost),
Err(e) => Err(e.into()),
}
}
impl RtlSdrDevice {
pub fn reset_buffer(&self) -> Result<(), RtlSdrError> {
usb::write_reg(
&self.handle,
Block::Usb,
crate::reg::usb_reg::USB_EPA_CTL,
0x1002,
2,
)?;
usb::write_reg(
&self.handle,
Block::Usb,
crate::reg::usb_reg::USB_EPA_CTL,
0x0000,
2,
)
}
pub fn usb_handle(&self) -> std::sync::Arc<rusb::DeviceHandle<rusb::GlobalContext>> {
std::sync::Arc::clone(&self.handle)
}
pub fn read_sync(&self, buf: &mut [u8]) -> Result<usize, RtlSdrError> {
let _guard = ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy))?;
bulk_read(&self.handle, buf)
}
pub fn iter_samples(&self, buffer_size: usize) -> SampleIter<'_> {
let buffer_size = if buffer_size == 0 {
DEFAULT_BUF_LENGTH as usize
} else {
buffer_size
};
let (guard, pending_error) =
match ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy)) {
Ok(g) => (Some(g), None),
Err(e) => (None, Some(e)),
};
SampleIter {
device: Some(self),
buffer_size,
_guard: guard,
pending_error,
}
}
pub fn read_async_blocking(
&self,
mut cb: ReadAsyncCb,
cancel_flag: &AtomicBool,
buf_len: u32,
) -> Result<(), RtlSdrError> {
let _guard = ReaderBusyGuard::try_acquire(std::sync::Arc::clone(&self.reader_busy))?;
let actual_buf_len = if buf_len == 0 {
DEFAULT_BUF_LENGTH as usize
} else if !buf_len.is_multiple_of(BULK_ALIGNMENT) || buf_len > MAX_BUF_LENGTH {
return Err(RtlSdrError::InvalidParameter(format!(
"buf_len must be a multiple of {BULK_ALIGNMENT} and <= {MAX_BUF_LENGTH}, got {buf_len}"
)));
} else {
buf_len as usize
};
let timeout = ASYNC_POLL_TIMEOUT;
let mut buf = vec![0u8; actual_buf_len];
let mut consecutive_zero_reads: u32 = 0;
while !cancel_flag.load(Ordering::Relaxed) {
match self
.handle
.read_bulk(crate::constants::BULK_ENDPOINT, &mut buf, timeout)
{
Ok(n) if n > 0 => {
consecutive_zero_reads = 0;
cb(&buf[..n]);
}
Ok(_) => {
consecutive_zero_reads += 1;
if consecutive_zero_reads >= MAX_CONSECUTIVE_ZERO_READS {
tracing::warn!(
"read_async_blocking: {MAX_CONSECUTIVE_ZERO_READS} consecutive \
zero-length reads — fusing the loop (degenerate device?)"
);
return Ok(());
}
}
Err(rusb::Error::Timeout) => {
}
Err(rusb::Error::NoDevice) => {
return Err(RtlSdrError::DeviceLost);
}
Err(e) => {
tracing::error!("bulk read error: {e}");
return Err(RtlSdrError::Usb(e));
}
}
}
Ok(())
}
}
/// Blocking iterator over sample buffers, created by
/// `RtlSdrDevice::iter_samples`.
///
/// Each call to `next` performs one bulk read into a newly allocated
/// `Vec<u8>`. The iterator ends after the first error or zero-length read.
pub struct SampleIter<'a> {
/// Device to read from; set to `None` once the iterator has finished so that
/// later calls to `next` return `None` without touching the device.
device: Option<&'a RtlSdrDevice>,
/// Size in bytes of the buffer allocated for every read.
buffer_size: usize,
/// Reader-busy guard obtained when the iterator was created, or `None` if
/// acquisition failed. It is never read; it is only kept so that it lives as
/// long as the iterator value does.
_guard: Option<ReaderBusyGuard>,
/// Guard-acquisition error saved at construction time, yielded as the first
/// item and then cleared.
pending_error: Option<RtlSdrError>,
}
impl Iterator for SampleIter<'_> {
    type Item = Result<Vec<u8>, RtlSdrError>;

    /// Performs one bulk read and yields the bytes received.
    ///
    /// * If guard acquisition failed when the iterator was built, that error
    ///   is yielded once and the iterator is finished.
    /// * A non-empty read yields `Ok(buffer)` truncated to the received
    ///   length; the iterator stays live.
    /// * A zero-length read ends the iterator with `None`.
    /// * A read error is yielded once, after which the iterator is finished.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(deferred) = self.pending_error.take() {
            self.device = None;
            return Some(Err(deferred));
        }

        let device = self.device?;
        let mut chunk = vec![0u8; self.buffer_size];

        let last_item = match bulk_read(&device.handle, &mut chunk) {
            Ok(len) if len > 0 => {
                chunk.truncate(len);
                return Some(Ok(chunk));
            }
            Ok(_) => None,
            Err(e) => Some(Err(e)),
        };

        // Every path that reaches this point is terminal: drop the device
        // reference so all further calls short-circuit to `None`.
        self.device = None;
        last_item
    }
}

/// Once `next` has returned `None` the device reference has been cleared, so
/// every later call returns `None` as well.
impl std::iter::FusedIterator for SampleIter<'_> {}
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time check that `SampleIter` implements both `Iterator` and
    // `FusedIterator`. Nothing runs at test time; a missing impl is a build
    // error.
    const _: () = {
        const fn requires_iterator<T: Iterator>() {}
        const fn requires_fused<T: std::iter::FusedIterator>() {}
        requires_iterator::<SampleIter<'static>>();
        requires_fused::<SampleIter<'static>>();
    };

    // `SampleIter` must not be `Send`; the build fails if it ever becomes so.
    static_assertions::assert_not_impl_any!(SampleIter<'static>: Send);
}