Skip to main content

aranet_core/
streaming.rs

//! Real-time streaming of sensor readings from a connected device.
//!
//! This module provides functionality to subscribe to sensor readings
//! and receive them as an async stream. The streams created here obtain
//! readings by polling the device at a configurable interval from a
//! background task.
//!
//! The stream supports graceful shutdown via the [`ReadingStream::close`] method,
//! which uses a cancellation token to cleanly stop the background polling task.
8
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13
14use futures::stream::Stream;
15use tokio::sync::mpsc;
16use tokio::time::interval;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, warn};
19
20use aranet_types::CurrentReading;
21
22use crate::device::Device;
23use crate::error::Error;
24
/// Options for reading streams.
///
/// Use the builder pattern for convenient configuration:
///
/// ```ignore
/// let options = StreamOptions::builder()
///     .poll_interval(Duration::from_secs(5))
///     .include_errors(true)
///     .max_consecutive_failures(5)
///     .build();
/// ```
#[derive(Debug, Clone)]
pub struct StreamOptions {
    /// Polling interval for devices that don't support notifications.
    ///
    /// Must be greater than zero (see [`StreamOptions::validate`]).
    /// Default: 1 second.
    pub poll_interval: Duration,
    /// Buffer size for the reading channel.
    ///
    /// Must be greater than zero (see [`StreamOptions::validate`]).
    /// Default: 16 readings.
    pub buffer_size: usize,
    /// Whether to include failed reads in the stream.
    ///
    /// When `false` (default), read errors are logged but not sent to the stream.
    /// When `true`, errors are sent as `Err(Error)` items, allowing the consumer
    /// to detect and handle connection issues.
    ///
    /// **Recommendation:** Set to `true` for applications that need to detect
    /// disconnections or errors in real-time.
    pub include_errors: bool,
    /// Maximum consecutive failures before auto-closing the stream.
    ///
    /// When set to `Some(n)`, the stream will automatically close after `n`
    /// consecutive read failures, indicating a likely disconnection.
    /// When `None`, the stream will continue indefinitely regardless of
    /// failures.
    ///
    /// Default: `Some(10)`.
    ///
    /// **Recommendation:** Keep a finite limit for production use to prevent
    /// indefinite polling of a disconnected device.
    pub max_consecutive_failures: Option<u32>,
}
64
65impl Default for StreamOptions {
66    fn default() -> Self {
67        Self {
68            poll_interval: Duration::from_secs(1),
69            buffer_size: 16,
70            include_errors: false,
71            max_consecutive_failures: Some(10),
72        }
73    }
74}
75
76impl StreamOptions {
77    /// Create a new builder for StreamOptions.
78    pub fn builder() -> StreamOptionsBuilder {
79        StreamOptionsBuilder::default()
80    }
81
82    /// Create options with a specific poll interval.
83    pub fn with_interval(interval: Duration) -> Self {
84        Self {
85            poll_interval: interval,
86            ..Default::default()
87        }
88    }
89
90    /// Validate the options and return an error if invalid.
91    ///
92    /// Checks that:
93    /// - `buffer_size` is > 0
94    /// - `poll_interval` is > 0
95    pub fn validate(&self) -> crate::error::Result<()> {
96        if self.buffer_size == 0 {
97            return Err(crate::error::Error::InvalidConfig(
98                "buffer_size must be > 0".to_string(),
99            ));
100        }
101        if self.poll_interval.is_zero() {
102            return Err(crate::error::Error::InvalidConfig(
103                "poll_interval must be > 0".to_string(),
104            ));
105        }
106        Ok(())
107    }
108}
109
110/// Builder for StreamOptions.
111#[derive(Debug, Clone, Default)]
112pub struct StreamOptionsBuilder {
113    options: StreamOptions,
114}
115
116impl StreamOptionsBuilder {
117    /// Set the polling interval.
118    #[must_use]
119    pub fn poll_interval(mut self, interval: Duration) -> Self {
120        self.options.poll_interval = interval;
121        self
122    }
123
124    /// Set the buffer size.
125    #[must_use]
126    pub fn buffer_size(mut self, size: usize) -> Self {
127        self.options.buffer_size = size;
128        self
129    }
130
131    /// Set whether to include errors in the stream.
132    ///
133    /// When `true`, read errors are sent as `Err(Error)` items to the stream,
134    /// allowing consumers to detect disconnections and other issues.
135    #[must_use]
136    pub fn include_errors(mut self, include: bool) -> Self {
137        self.options.include_errors = include;
138        self
139    }
140
141    /// Set the maximum consecutive failures before auto-closing.
142    ///
143    /// When set, the stream will automatically close after this many
144    /// consecutive read failures, indicating a likely disconnection.
145    #[must_use]
146    pub fn max_consecutive_failures(mut self, max: u32) -> Self {
147        self.options.max_consecutive_failures = Some(max);
148        self
149    }
150
151    /// Build the StreamOptions.
152    #[must_use]
153    pub fn build(self) -> StreamOptions {
154        self.options
155    }
156}
157
/// A stream of sensor readings from a device.
///
/// The stream polls the device at a configured interval and sends readings
/// through a channel. It supports graceful shutdown via [`close`](Self::close).
/// Dropping the stream also cancels the background polling task.
pub struct ReadingStream {
    // Receiving half of the channel that the background task sends items into.
    receiver: mpsc::Receiver<ReadingResult>,
    // Handle of the spawned polling task; used to check whether it has finished.
    handle: tokio::task::JoinHandle<()>,
    // Token cancelled by `close()` and on drop to stop the polling task.
    cancel_token: CancellationToken,
}

/// Result type for stream items: a successful reading or the read error.
pub type ReadingResult = std::result::Result<CurrentReading, Error>;
170
171impl ReadingStream {
172    /// Create a new reading stream from a connected device (takes Arc).
173    ///
174    /// This spawns a background task that polls the device at the configured
175    /// interval and sends readings to the stream.
176    ///
177    /// If `max_consecutive_failures` is set, the stream will automatically
178    /// close after that many consecutive read failures.
179    ///
180    /// Invalid options (zero buffer size, zero poll interval) are replaced
181    /// with defaults and a warning is logged.
182    pub fn new(device: Arc<Device>, options: StreamOptions) -> Self {
183        let options = if let Err(e) = options.validate() {
184            warn!("Invalid stream options ({e}), using defaults");
185            StreamOptions::default()
186        } else {
187            options
188        };
189        let (tx, rx) = mpsc::channel(options.buffer_size);
190        let cancel_token = CancellationToken::new();
191        let task_token = cancel_token.clone();
192        let max_failures = options.max_consecutive_failures;
193
194        let handle = tokio::spawn(async move {
195            let mut interval = interval(options.poll_interval);
196            let mut consecutive_failures: u32 = 0;
197
198            loop {
199                tokio::select! {
200                    _ = task_token.cancelled() => {
201                        debug!("Stream cancelled, stopping gracefully");
202                        break;
203                    }
204                    _ = interval.tick() => {
205                        match device.read_current().await {
206                            Ok(reading) => {
207                                // Reset failure counter on success
208                                consecutive_failures = 0;
209                                if tx.send(Ok(reading)).await.is_err() {
210                                    debug!("Stream receiver dropped, stopping");
211                                    break;
212                                }
213                            }
214                            Err(e) => {
215                                consecutive_failures += 1;
216                                warn!(
217                                    "Error reading from device (failure {}/{}): {}",
218                                    consecutive_failures,
219                                    max_failures.map_or("∞".to_string(), |n| n.to_string()),
220                                    e
221                                );
222
223                                // Check if we've exceeded max consecutive failures
224                                if let Some(max) = max_failures
225                                    && consecutive_failures >= max {
226                                        warn!(
227                                            "Max consecutive failures ({}) reached, auto-closing stream",
228                                            max
229                                        );
230                                        // Send final error if configured to include errors
231                                        if options.include_errors {
232                                            let _ = tx.send(Err(e)).await;
233                                        }
234                                        break;
235                                    }
236
237                                if options.include_errors && tx.send(Err(e)).await.is_err() {
238                                    debug!("Stream receiver dropped, stopping");
239                                    break;
240                                }
241                            }
242                        }
243                    }
244                }
245            }
246        });
247
248        Self {
249            receiver: rx,
250            handle,
251            cancel_token,
252        }
253    }
254
255    /// Close the stream and stop the background polling task gracefully.
256    ///
257    /// This signals the background task to stop via a cancellation token,
258    /// allowing it to complete any in-progress operations before exiting.
259    /// This is preferred over aborting the task, which may leave resources
260    /// in an inconsistent state.
261    pub fn close(self) {
262        self.cancel_token.cancel();
263        // The handle will complete on its own; we don't need to await it
264    }
265
266    /// Get a cancellation token that can be used to cancel the stream externally.
267    ///
268    /// This allows multiple places to trigger cancellation of the stream.
269    pub fn cancellation_token(&self) -> CancellationToken {
270        self.cancel_token.clone()
271    }
272
273    /// Check if the stream is still active (background task running).
274    pub fn is_active(&self) -> bool {
275        !self.handle.is_finished()
276    }
277
278    /// Check if the stream has been cancelled.
279    pub fn is_cancelled(&self) -> bool {
280        self.cancel_token.is_cancelled()
281    }
282
283    /// Check if the stream stopped unexpectedly.
284    ///
285    /// Returns `true` if the background task has finished but was not explicitly
286    /// cancelled via [`close()`](Self::close) or by dropping the stream.
287    ///
288    /// This can indicate:
289    /// - A panic in the background task
290    /// - The stream auto-closed due to reaching `max_consecutive_failures`
291    /// - The receiver was dropped unexpectedly
292    ///
293    /// This can be useful for detecting and handling unexpected stream termination:
294    ///
295    /// ```ignore
296    /// if stream.has_unexpectedly_stopped() {
297    ///     // Log the event and potentially restart the stream
298    ///     log::warn!("Stream stopped unexpectedly - may need restart");
299    /// }
300    /// ```
301    ///
302    /// Note: To distinguish between auto-close due to failures vs actual panics,
303    /// you may need additional monitoring of the stream's error output.
304    pub fn has_unexpectedly_stopped(&self) -> bool {
305        self.handle.is_finished() && !self.cancel_token.is_cancelled()
306    }
307
308    /// Check if the background task has panicked.
309    ///
310    /// **Deprecated:** Use [`has_unexpectedly_stopped()`](Self::has_unexpectedly_stopped) instead,
311    /// which has clearer semantics. This method may return `true` even when the stream
312    /// stopped due to `max_consecutive_failures` being reached, not just panics.
313    #[deprecated(
314        since = "0.2.0",
315        note = "Use has_unexpectedly_stopped() instead for clearer semantics"
316    )]
317    pub fn has_panicked(&self) -> bool {
318        self.has_unexpectedly_stopped()
319    }
320}
321
322impl Drop for ReadingStream {
323    fn drop(&mut self) {
324        // Ensure the background task is cancelled when the stream is dropped.
325        // This prevents resource leaks if the stream is dropped without calling close().
326        self.cancel_token.cancel();
327    }
328}
329
330impl Stream for ReadingStream {
331    type Item = ReadingResult;
332
333    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
334        Pin::new(&mut self.receiver).poll_recv(cx)
335    }
336}
337
/// Extension trait for Device to create reading streams.
///
/// **Note:** This trait requires `Arc<Self>` because the stream's background task
/// needs to hold a reference to the device that outlives the method call.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
/// use aranet_core::{Device, DeviceStreamExt};
/// use futures::StreamExt;
///
/// // Wrap device in Arc for streaming
/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
///
/// // Create a stream and consume readings
/// let mut stream = device.stream();
/// while let Some(result) = stream.next().await {
///     match result {
///         Ok(reading) => println!("CO2: {} ppm", reading.co2),
///         Err(e) => eprintln!("Error: {}", e),
///     }
/// }
/// ```
pub trait DeviceStreamExt {
    /// Create a reading stream with default options.
    ///
    /// With `StreamOptions::default()` this polls the device every second and
    /// buffers up to 16 readings.
    fn stream(self: Arc<Self>) -> ReadingStream;

    /// Create a reading stream with custom options.
    ///
    /// `options` controls the poll interval, channel buffer size, error
    /// forwarding and the consecutive-failure limit.
    fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream;
}
371
372impl DeviceStreamExt for Device {
373    fn stream(self: Arc<Self>) -> ReadingStream {
374        ReadingStream::new(self, StreamOptions::default())
375    }
376
377    fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream {
378        ReadingStream::new(self, options)
379    }
380}
381
382/// Create a stream from a device without needing the trait import.
383///
384/// This is a convenience function for creating a polling stream.
385///
386/// # Example
387///
388/// ```ignore
389/// use std::sync::Arc;
390/// use std::time::Duration;
391/// use aranet_core::{Device, streaming};
392///
393/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
394/// let stream = streaming::from_device(device, Duration::from_secs(5));
395/// ```
396pub fn from_device(device: Arc<Device>, poll_interval: Duration) -> ReadingStream {
397    ReadingStream::new(device, StreamOptions::with_interval(poll_interval))
398}
399
400/// Create a stream with default options from a device.
401///
402/// Convenience function that wraps `from_device` with a 1-second interval.
403pub fn from_device_default(device: Arc<Device>) -> ReadingStream {
404    ReadingStream::new(device, StreamOptions::default())
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: 1 s interval, 16-item buffer, errors hidden, limit of 10 failures.
    #[test]
    fn test_stream_options_default() {
        let opts = StreamOptions::default();
        assert_eq!(opts.poll_interval, Duration::from_secs(1));
        assert_eq!(opts.buffer_size, 16);
        assert!(!opts.include_errors);
        assert_eq!(opts.max_consecutive_failures, Some(10));
        // The defaults themselves must always be valid.
        assert!(opts.validate().is_ok());
    }

    /// `with_interval` overrides only the poll interval.
    #[test]
    fn test_stream_options_with_interval() {
        let opts = StreamOptions::with_interval(Duration::from_millis(500));
        assert_eq!(opts.poll_interval, Duration::from_millis(500));
        assert_eq!(opts.buffer_size, 16);
        assert!(!opts.include_errors);
    }

    #[test]
    fn test_stream_options_builder() {
        let opts = StreamOptions::builder()
            .poll_interval(Duration::from_secs(5))
            .buffer_size(32)
            .include_errors(true)
            .build();

        assert_eq!(opts.poll_interval, Duration::from_secs(5));
        assert_eq!(opts.buffer_size, 32);
        assert!(opts.include_errors);
    }

    #[test]
    fn test_stream_options_builder_partial() {
        // Only set some options, others should be defaults
        let opts = StreamOptions::builder().include_errors(true).build();

        assert_eq!(opts.poll_interval, Duration::from_secs(1)); // default
        assert_eq!(opts.buffer_size, 16); // default
        assert!(opts.include_errors); // set
    }

    /// The builder wraps the failure limit in `Some`.
    #[test]
    fn test_stream_options_builder_max_consecutive_failures() {
        let opts = StreamOptions::builder().max_consecutive_failures(5).build();
        assert_eq!(opts.max_consecutive_failures, Some(5));
    }

    #[test]
    fn test_validate_rejects_zero_buffer_size() {
        let opts = StreamOptions::builder().buffer_size(0).build();
        assert!(opts.validate().is_err());
    }

    #[test]
    fn test_validate_rejects_zero_poll_interval() {
        let opts = StreamOptions::with_interval(Duration::ZERO);
        assert!(opts.validate().is_err());
    }
}
443        assert_eq!(opts.poll_interval, Duration::from_secs(1)); // default
444        assert_eq!(opts.buffer_size, 16); // default
445        assert!(opts.include_errors); // set
446    }
447}