use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_core::Stream;
use crate::error::RtlSdrError;
use crate::constants::STREAM_BACKPRESSURE_DEPTH;
use super::RtlSdrReader;
use super::reader::ReaderBusyGuard;
impl RtlSdrReader {
    /// Turns this reader into a [`TokioSampleStream`] fed by a blocking worker.
    ///
    /// The reader is consumed and moved onto a `tokio::task::spawn_blocking`
    /// worker, which repeatedly performs a bulk read into a freshly allocated
    /// buffer and forwards each result through a bounded channel of capacity
    /// `STREAM_BACKPRESSURE_DEPTH`. When the channel is full the worker blocks
    /// in `blocking_send` until the consumer catches up.
    ///
    /// `buffer_size` is the length in bytes of each read buffer; `0` selects
    /// `crate::constants::DEFAULT_BUF_LENGTH`. Yielded buffers are truncated to
    /// the number of bytes the read reported.
    ///
    /// The worker stops when any of the following happens:
    /// - the returned stream has been dropped (seen via `is_closed` before a
    ///   read, or via a failed send afterwards),
    /// - a read reports zero bytes,
    /// - a read fails; the error is forwarded as the final item when possible.
    ///
    /// # Errors
    ///
    /// On failure the reader is handed back together with the error so the
    /// caller keeps ownership of it:
    /// - [`RtlSdrError::InvalidParameter`] when no Tokio runtime is current on
    ///   the calling thread,
    /// - whatever `ReaderBusyGuard::try_acquire` reports (presumably that the
    ///   reader is already in use; confirm against `ReaderBusyGuard`).
    pub fn stream_samples_tokio(
        self,
        buffer_size: usize,
    ) -> Result<TokioSampleStream, Box<(RtlSdrError, Self)>> {
        // `spawn_blocking` needs a runtime context; report its absence as an
        // error and give the reader back.
        if tokio::runtime::Handle::try_current().is_err() {
            let no_runtime = RtlSdrError::InvalidParameter(
                "stream_samples_tokio must be called from within a Tokio runtime".to_string(),
            );
            return Err(Box::new((no_runtime, self)));
        }

        let busy_guard = match ReaderBusyGuard::try_acquire(Arc::clone(&self.busy)) {
            Ok(acquired) => acquired,
            Err(busy_err) => return Err(Box::new((busy_err, self))),
        };

        let chunk_len = match buffer_size {
            0 => crate::constants::DEFAULT_BUF_LENGTH as usize,
            requested => requested,
        };

        let (sender, receiver) = tokio::sync::mpsc::channel(STREAM_BACKPRESSURE_DEPTH);

        tokio::task::spawn_blocking(move || {
            // Declared guard-first so the reader is dropped before the guard
            // when the worker finishes.
            let _busy_guard = busy_guard;
            let device = self;

            while !sender.is_closed() {
                // A new allocation per iteration: ownership of the buffer is
                // transferred to the consumer with each item.
                let mut chunk = vec![0u8; chunk_len];
                let outcome =
                    super::streaming::bulk_read(&device.handle, &device.dev_lost, &mut chunk);

                let filled = match outcome {
                    Ok(0) => break,
                    Ok(count) => count,
                    Err(read_err) => {
                        if let Err(unsent) = sender.blocking_send(Err(read_err)) {
                            tracing::debug!(
                                "tokio stream worker exiting with unobserved error: {:?}",
                                unsent.0
                            );
                        }
                        break;
                    }
                };

                chunk.truncate(filled);
                if sender.blocking_send(Ok(chunk)).is_err() {
                    // Receiver is gone; nobody is listening any more.
                    break;
                }
            }
        });

        Ok(TokioSampleStream { rx: receiver })
    }
}
/// Asynchronous stream of sample buffers returned by
/// `RtlSdrReader::stream_samples_tokio`.
///
/// Each item is either `Ok` with the bytes obtained by one read, or `Err`
/// with the error that made the background worker stop.
pub struct TokioSampleStream {
/// Receiving half of the bounded channel that the blocking read worker
/// sends its results into.
rx: tokio::sync::mpsc::Receiver<Result<Vec<u8>, RtlSdrError>>,
}
impl Stream for TokioSampleStream {
    type Item = Result<Vec<u8>, RtlSdrError>;

    /// Polls the underlying channel receiver for the next item.
    ///
    /// Returns whatever `Receiver::poll_recv` yields, so the stream ends
    /// (`None`) once the channel is closed and drained.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The type is `Unpin`, so the pin can be unwrapped into a plain
        // mutable reference.
        let stream = self.get_mut();
        stream.rx.poll_recv(cx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time trait checks: the closure is never called, it only has to
    // type-check. A failure shows up as a build error, not a failing test.
    const _: fn() = || {
        fn requires_stream<S: Stream>() {}
        fn requires_send<T: Send>() {}
        fn requires_unpin<T: Unpin>() {}

        // The stream type must be usable as a `Stream` and movable across
        // threads, and so must the items it yields.
        requires_stream::<TokioSampleStream>();
        requires_send::<TokioSampleStream>();
        requires_send::<<TokioSampleStream as Stream>::Item>();

        // `poll_next` relies on the type being `Unpin`.
        requires_unpin::<TokioSampleStream>();
    };
}