use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_core::Stream;
use crate::error::RtlSdrError;
use crate::constants::STREAM_BACKPRESSURE_DEPTH;
use super::RtlSdrReader;
use super::reader::ReaderBusyGuard;
type BoxedReceiver = Pin<Box<async_channel::Receiver<Result<Vec<u8>, RtlSdrError>>>>;
impl RtlSdrReader {
pub fn stream_samples_smol(
self,
buffer_size: usize,
) -> Result<SmolSampleStream, Box<(RtlSdrError, Self)>> {
let guard = match ReaderBusyGuard::try_acquire(Arc::clone(&self.busy)) {
Ok(g) => g,
Err(e) => return Err(Box::new((e, self))),
};
let buffer_size = if buffer_size == 0 {
crate::constants::DEFAULT_BUF_LENGTH as usize
} else {
buffer_size
};
let (tx, rx) = async_channel::bounded(STREAM_BACKPRESSURE_DEPTH);
blocking::unblock(move || {
let _guard = guard;
let reader = self;
loop {
if tx.is_closed() {
return;
}
let mut buf = vec![0u8; buffer_size];
match super::streaming::bulk_read(&reader.handle, &reader.dev_lost, &mut buf) {
Ok(0) => return, Ok(n) => {
buf.truncate(n);
if tx.send_blocking(Ok(buf)).is_err() {
return;
}
}
Err(e) => {
if let Err(unsent) = tx.send_blocking(Err(e)) {
tracing::debug!(
"smol stream worker exiting with unobserved error: {:?}",
unsent.into_inner()
);
}
return;
}
}
}
})
.detach();
Ok(SmolSampleStream { rx: Box::pin(rx) })
}
}
pub struct SmolSampleStream {
rx: BoxedReceiver,
}
impl Stream for SmolSampleStream {
type Item = Result<Vec<u8>, RtlSdrError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.as_mut().poll_next(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
const _: fn() = || {
fn assert_stream<T: Stream>() {}
fn assert_send<T: Send>() {}
fn assert_unpin<T: Unpin>() {}
assert_stream::<SmolSampleStream>();
assert_send::<SmolSampleStream>();
assert_send::<<SmolSampleStream as Stream>::Item>();
assert_unpin::<SmolSampleStream>();
};
static_assertions::assert_not_impl_any!(
async_channel::Receiver<()>: Unpin
);
}