use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::stream::Stream;
use tokio::sync::mpsc;
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use aranet_types::CurrentReading;
use crate::device::Device;
use crate::error::Error;
#[derive(Debug, Clone)]
pub struct StreamOptions {
pub poll_interval: Duration,
pub buffer_size: usize,
pub include_errors: bool,
pub max_consecutive_failures: Option<u32>,
}
impl Default for StreamOptions {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
buffer_size: 16,
include_errors: false,
max_consecutive_failures: Some(10),
}
}
}
impl StreamOptions {
pub fn builder() -> StreamOptionsBuilder {
StreamOptionsBuilder::default()
}
pub fn with_interval(interval: Duration) -> Self {
Self {
poll_interval: interval,
..Default::default()
}
}
pub fn validate(&self) -> crate::error::Result<()> {
if self.buffer_size == 0 {
return Err(crate::error::Error::InvalidConfig(
"buffer_size must be > 0".to_string(),
));
}
if self.poll_interval.is_zero() {
return Err(crate::error::Error::InvalidConfig(
"poll_interval must be > 0".to_string(),
));
}
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct StreamOptionsBuilder {
options: StreamOptions,
}
impl StreamOptionsBuilder {
#[must_use]
pub fn poll_interval(mut self, interval: Duration) -> Self {
self.options.poll_interval = interval;
self
}
#[must_use]
pub fn buffer_size(mut self, size: usize) -> Self {
self.options.buffer_size = size;
self
}
#[must_use]
pub fn include_errors(mut self, include: bool) -> Self {
self.options.include_errors = include;
self
}
#[must_use]
pub fn max_consecutive_failures(mut self, max: u32) -> Self {
self.options.max_consecutive_failures = Some(max);
self
}
#[must_use]
pub fn build(self) -> StreamOptions {
self.options
}
}
pub struct ReadingStream {
receiver: mpsc::Receiver<ReadingResult>,
handle: tokio::task::JoinHandle<()>,
cancel_token: CancellationToken,
}
pub type ReadingResult = std::result::Result<CurrentReading, Error>;
impl ReadingStream {
pub fn new(device: Arc<Device>, options: StreamOptions) -> Self {
let options = if let Err(e) = options.validate() {
warn!("Invalid stream options ({e}), using defaults");
StreamOptions::default()
} else {
options
};
let (tx, rx) = mpsc::channel(options.buffer_size);
let cancel_token = CancellationToken::new();
let task_token = cancel_token.clone();
let max_failures = options.max_consecutive_failures;
let handle = tokio::spawn(async move {
let mut interval = interval(options.poll_interval);
let mut consecutive_failures: u32 = 0;
loop {
tokio::select! {
_ = task_token.cancelled() => {
debug!("Stream cancelled, stopping gracefully");
break;
}
_ = interval.tick() => {
match device.read_current().await {
Ok(reading) => {
consecutive_failures = 0;
if tx.send(Ok(reading)).await.is_err() {
debug!("Stream receiver dropped, stopping");
break;
}
}
Err(e) => {
consecutive_failures += 1;
warn!(
"Error reading from device (failure {}/{}): {}",
consecutive_failures,
max_failures.map_or("∞".to_string(), |n| n.to_string()),
e
);
if let Some(max) = max_failures
&& consecutive_failures >= max {
warn!(
"Max consecutive failures ({}) reached, auto-closing stream",
max
);
if options.include_errors {
let _ = tx.send(Err(e)).await;
}
break;
}
if options.include_errors && tx.send(Err(e)).await.is_err() {
debug!("Stream receiver dropped, stopping");
break;
}
}
}
}
}
}
});
Self {
receiver: rx,
handle,
cancel_token,
}
}
pub fn close(self) {
self.cancel_token.cancel();
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
pub fn is_active(&self) -> bool {
!self.handle.is_finished()
}
pub fn is_cancelled(&self) -> bool {
self.cancel_token.is_cancelled()
}
pub fn has_unexpectedly_stopped(&self) -> bool {
self.handle.is_finished() && !self.cancel_token.is_cancelled()
}
#[deprecated(
since = "0.2.0",
note = "Use has_unexpectedly_stopped() instead for clearer semantics"
)]
pub fn has_panicked(&self) -> bool {
self.has_unexpectedly_stopped()
}
}
impl Drop for ReadingStream {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}
impl Stream for ReadingStream {
type Item = ReadingResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_recv(cx)
}
}
pub trait DeviceStreamExt {
fn stream(self: Arc<Self>) -> ReadingStream;
fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream;
}
impl DeviceStreamExt for Device {
fn stream(self: Arc<Self>) -> ReadingStream {
ReadingStream::new(self, StreamOptions::default())
}
fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream {
ReadingStream::new(self, options)
}
}
pub fn from_device(device: Arc<Device>, poll_interval: Duration) -> ReadingStream {
ReadingStream::new(device, StreamOptions::with_interval(poll_interval))
}
pub fn from_device_default(device: Arc<Device>) -> ReadingStream {
ReadingStream::new(device, StreamOptions::default())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_options_default() {
let opts = StreamOptions::default();
assert_eq!(opts.poll_interval, Duration::from_secs(1));
assert_eq!(opts.buffer_size, 16);
assert!(!opts.include_errors);
}
#[test]
fn test_stream_options_with_interval() {
let opts = StreamOptions::with_interval(Duration::from_millis(500));
assert_eq!(opts.poll_interval, Duration::from_millis(500));
}
#[test]
fn test_stream_options_builder() {
let opts = StreamOptions::builder()
.poll_interval(Duration::from_secs(5))
.buffer_size(32)
.include_errors(true)
.build();
assert_eq!(opts.poll_interval, Duration::from_secs(5));
assert_eq!(opts.buffer_size, 32);
assert!(opts.include_errors);
}
#[test]
fn test_stream_options_builder_partial() {
let opts = StreamOptions::builder().include_errors(true).build();
assert_eq!(opts.poll_interval, Duration::from_secs(1)); assert_eq!(opts.buffer_size, 16); assert!(opts.include_errors); }
}