aranet_core/streaming.rs
1//! Real-time streaming of sensor readings via BLE notifications.
2//!
3//! This module provides functionality to subscribe to sensor readings
4//! and receive them as an async stream.
5//!
6//! The stream supports graceful shutdown via the [`ReadingStream::close`] method,
7//! which uses a cancellation token to cleanly stop the background polling task.
8
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13
14use futures::stream::Stream;
15use tokio::sync::mpsc;
16use tokio::time::interval;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, warn};
19
20use aranet_types::CurrentReading;
21
22use crate::device::Device;
23use crate::error::Error;
24
25/// Options for reading streams.
26///
27/// Use the builder pattern for convenient configuration:
28///
29/// ```ignore
30/// let options = StreamOptions::builder()
31/// .poll_interval(Duration::from_secs(5))
32/// .include_errors(true)
33/// .max_consecutive_failures(5)
34/// .build();
35/// ```
36#[derive(Debug, Clone)]
37pub struct StreamOptions {
38 /// Polling interval for devices that don't support notifications.
39 /// Default: 1 second.
40 pub poll_interval: Duration,
41 /// Buffer size for the reading channel.
42 /// Default: 16 readings.
43 pub buffer_size: usize,
44 /// Whether to include failed reads in the stream.
45 ///
46 /// When `false` (default), read errors are logged but not sent to the stream.
47 /// When `true`, errors are sent as `Err(Error)` items, allowing the consumer
48 /// to detect and handle connection issues.
49 ///
50 /// **Recommendation:** Set to `true` for applications that need to detect
51 /// disconnections or errors in real-time.
52 pub include_errors: bool,
53 /// Maximum consecutive failures before auto-closing the stream.
54 ///
55 /// When set to `Some(n)`, the stream will automatically close after `n`
56 /// consecutive read failures, indicating a likely disconnection.
57 /// When `None` (default), the stream will continue indefinitely regardless
58 /// of failures.
59 ///
60 /// **Recommendation:** Set to `Some(5)` or similar for production use to
61 /// prevent indefinite polling of a disconnected device.
62 pub max_consecutive_failures: Option<u32>,
63}
64
65impl Default for StreamOptions {
66 fn default() -> Self {
67 Self {
68 poll_interval: Duration::from_secs(1),
69 buffer_size: 16,
70 include_errors: false,
71 max_consecutive_failures: Some(10),
72 }
73 }
74}
75
76impl StreamOptions {
77 /// Create a new builder for StreamOptions.
78 pub fn builder() -> StreamOptionsBuilder {
79 StreamOptionsBuilder::default()
80 }
81
82 /// Create options with a specific poll interval.
83 pub fn with_interval(interval: Duration) -> Self {
84 Self {
85 poll_interval: interval,
86 ..Default::default()
87 }
88 }
89
90 /// Validate the options and return an error if invalid.
91 ///
92 /// Checks that:
93 /// - `buffer_size` is > 0
94 /// - `poll_interval` is > 0
95 pub fn validate(&self) -> crate::error::Result<()> {
96 if self.buffer_size == 0 {
97 return Err(crate::error::Error::InvalidConfig(
98 "buffer_size must be > 0".to_string(),
99 ));
100 }
101 if self.poll_interval.is_zero() {
102 return Err(crate::error::Error::InvalidConfig(
103 "poll_interval must be > 0".to_string(),
104 ));
105 }
106 Ok(())
107 }
108}
109
110/// Builder for StreamOptions.
111#[derive(Debug, Clone, Default)]
112pub struct StreamOptionsBuilder {
113 options: StreamOptions,
114}
115
116impl StreamOptionsBuilder {
117 /// Set the polling interval.
118 #[must_use]
119 pub fn poll_interval(mut self, interval: Duration) -> Self {
120 self.options.poll_interval = interval;
121 self
122 }
123
124 /// Set the buffer size.
125 #[must_use]
126 pub fn buffer_size(mut self, size: usize) -> Self {
127 self.options.buffer_size = size;
128 self
129 }
130
131 /// Set whether to include errors in the stream.
132 ///
133 /// When `true`, read errors are sent as `Err(Error)` items to the stream,
134 /// allowing consumers to detect disconnections and other issues.
135 #[must_use]
136 pub fn include_errors(mut self, include: bool) -> Self {
137 self.options.include_errors = include;
138 self
139 }
140
141 /// Set the maximum consecutive failures before auto-closing.
142 ///
143 /// When set, the stream will automatically close after this many
144 /// consecutive read failures, indicating a likely disconnection.
145 #[must_use]
146 pub fn max_consecutive_failures(mut self, max: u32) -> Self {
147 self.options.max_consecutive_failures = Some(max);
148 self
149 }
150
151 /// Build the StreamOptions.
152 #[must_use]
153 pub fn build(self) -> StreamOptions {
154 self.options
155 }
156}
157
158/// A stream of sensor readings from a device.
159///
160/// The stream polls the device at a configured interval and sends readings
161/// through a channel. It supports graceful shutdown via [`close`](Self::close).
162pub struct ReadingStream {
163 receiver: mpsc::Receiver<ReadingResult>,
164 handle: tokio::task::JoinHandle<()>,
165 cancel_token: CancellationToken,
166}
167
168/// Result type for stream items.
169pub type ReadingResult = std::result::Result<CurrentReading, Error>;
170
171impl ReadingStream {
172 /// Create a new reading stream from a connected device (takes Arc).
173 ///
174 /// This spawns a background task that polls the device at the configured
175 /// interval and sends readings to the stream.
176 ///
177 /// If `max_consecutive_failures` is set, the stream will automatically
178 /// close after that many consecutive read failures.
179 ///
180 /// Invalid options (zero buffer size, zero poll interval) are replaced
181 /// with defaults and a warning is logged.
182 pub fn new(device: Arc<Device>, options: StreamOptions) -> Self {
183 let options = if let Err(e) = options.validate() {
184 warn!("Invalid stream options ({e}), using defaults");
185 StreamOptions::default()
186 } else {
187 options
188 };
189 let (tx, rx) = mpsc::channel(options.buffer_size);
190 let cancel_token = CancellationToken::new();
191 let task_token = cancel_token.clone();
192 let max_failures = options.max_consecutive_failures;
193
194 let handle = tokio::spawn(async move {
195 let mut interval = interval(options.poll_interval);
196 let mut consecutive_failures: u32 = 0;
197
198 loop {
199 tokio::select! {
200 _ = task_token.cancelled() => {
201 debug!("Stream cancelled, stopping gracefully");
202 break;
203 }
204 _ = interval.tick() => {
205 match device.read_current().await {
206 Ok(reading) => {
207 // Reset failure counter on success
208 consecutive_failures = 0;
209 if tx.send(Ok(reading)).await.is_err() {
210 debug!("Stream receiver dropped, stopping");
211 break;
212 }
213 }
214 Err(e) => {
215 consecutive_failures += 1;
216 warn!(
217 "Error reading from device (failure {}/{}): {}",
218 consecutive_failures,
219 max_failures.map_or("∞".to_string(), |n| n.to_string()),
220 e
221 );
222
223 // Check if we've exceeded max consecutive failures
224 if let Some(max) = max_failures
225 && consecutive_failures >= max {
226 warn!(
227 "Max consecutive failures ({}) reached, auto-closing stream",
228 max
229 );
230 // Send final error if configured to include errors
231 if options.include_errors {
232 let _ = tx.send(Err(e)).await;
233 }
234 break;
235 }
236
237 if options.include_errors && tx.send(Err(e)).await.is_err() {
238 debug!("Stream receiver dropped, stopping");
239 break;
240 }
241 }
242 }
243 }
244 }
245 }
246 });
247
248 Self {
249 receiver: rx,
250 handle,
251 cancel_token,
252 }
253 }
254
255 /// Close the stream and stop the background polling task gracefully.
256 ///
257 /// This signals the background task to stop via a cancellation token,
258 /// allowing it to complete any in-progress operations before exiting.
259 /// This is preferred over aborting the task, which may leave resources
260 /// in an inconsistent state.
261 pub fn close(self) {
262 self.cancel_token.cancel();
263 // The handle will complete on its own; we don't need to await it
264 }
265
266 /// Get a cancellation token that can be used to cancel the stream externally.
267 ///
268 /// This allows multiple places to trigger cancellation of the stream.
269 pub fn cancellation_token(&self) -> CancellationToken {
270 self.cancel_token.clone()
271 }
272
273 /// Check if the stream is still active (background task running).
274 pub fn is_active(&self) -> bool {
275 !self.handle.is_finished()
276 }
277
278 /// Check if the stream has been cancelled.
279 pub fn is_cancelled(&self) -> bool {
280 self.cancel_token.is_cancelled()
281 }
282
283 /// Check if the stream stopped unexpectedly.
284 ///
285 /// Returns `true` if the background task has finished but was not explicitly
286 /// cancelled via [`close()`](Self::close) or by dropping the stream.
287 ///
288 /// This can indicate:
289 /// - A panic in the background task
290 /// - The stream auto-closed due to reaching `max_consecutive_failures`
291 /// - The receiver was dropped unexpectedly
292 ///
293 /// This can be useful for detecting and handling unexpected stream termination:
294 ///
295 /// ```ignore
296 /// if stream.has_unexpectedly_stopped() {
297 /// // Log the event and potentially restart the stream
298 /// log::warn!("Stream stopped unexpectedly - may need restart");
299 /// }
300 /// ```
301 ///
302 /// Note: To distinguish between auto-close due to failures vs actual panics,
303 /// you may need additional monitoring of the stream's error output.
304 pub fn has_unexpectedly_stopped(&self) -> bool {
305 self.handle.is_finished() && !self.cancel_token.is_cancelled()
306 }
307
308 /// Check if the background task has panicked.
309 ///
310 /// **Deprecated:** Use [`has_unexpectedly_stopped()`](Self::has_unexpectedly_stopped) instead,
311 /// which has clearer semantics. This method may return `true` even when the stream
312 /// stopped due to `max_consecutive_failures` being reached, not just panics.
313 #[deprecated(
314 since = "0.2.0",
315 note = "Use has_unexpectedly_stopped() instead for clearer semantics"
316 )]
317 pub fn has_panicked(&self) -> bool {
318 self.has_unexpectedly_stopped()
319 }
320}
321
322impl Drop for ReadingStream {
323 fn drop(&mut self) {
324 // Ensure the background task is cancelled when the stream is dropped.
325 // This prevents resource leaks if the stream is dropped without calling close().
326 self.cancel_token.cancel();
327 }
328}
329
330impl Stream for ReadingStream {
331 type Item = ReadingResult;
332
333 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
334 Pin::new(&mut self.receiver).poll_recv(cx)
335 }
336}
337
338/// Extension trait for Device to create reading streams.
339///
340/// **Note:** This trait requires `Arc<Self>` because the stream's background task
341/// needs to hold a reference to the device that outlives the method call.
342///
343/// # Example
344///
345/// ```ignore
346/// use std::sync::Arc;
347/// use aranet_core::{Device, DeviceStreamExt};
348/// use futures::StreamExt;
349///
350/// // Wrap device in Arc for streaming
351/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
352///
353/// // Create a stream and consume readings
354/// let mut stream = device.stream();
355/// while let Some(result) = stream.next().await {
356/// match result {
357/// Ok(reading) => println!("CO2: {} ppm", reading.co2),
358/// Err(e) => eprintln!("Error: {}", e),
359/// }
360/// }
361/// ```
362pub trait DeviceStreamExt {
363 /// Create a reading stream with default options.
364 ///
365 /// Polls the device every second and buffers up to 16 readings.
366 fn stream(self: Arc<Self>) -> ReadingStream;
367
368 /// Create a reading stream with custom options.
369 fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream;
370}
371
372impl DeviceStreamExt for Device {
373 fn stream(self: Arc<Self>) -> ReadingStream {
374 ReadingStream::new(self, StreamOptions::default())
375 }
376
377 fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream {
378 ReadingStream::new(self, options)
379 }
380}
381
382/// Create a stream from a device without needing the trait import.
383///
384/// This is a convenience function for creating a polling stream.
385///
386/// # Example
387///
388/// ```ignore
389/// use std::sync::Arc;
390/// use std::time::Duration;
391/// use aranet_core::{Device, streaming};
392///
393/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
394/// let stream = streaming::from_device(device, Duration::from_secs(5));
395/// ```
396pub fn from_device(device: Arc<Device>, poll_interval: Duration) -> ReadingStream {
397 ReadingStream::new(device, StreamOptions::with_interval(poll_interval))
398}
399
400/// Create a stream with default options from a device.
401///
402/// Convenience function that wraps `from_device` with a 1-second interval.
403pub fn from_device_default(device: Arc<Device>) -> ReadingStream {
404 ReadingStream::new(device, StreamOptions::default())
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410
411 #[test]
412 fn test_stream_options_default() {
413 let opts = StreamOptions::default();
414 assert_eq!(opts.poll_interval, Duration::from_secs(1));
415 assert_eq!(opts.buffer_size, 16);
416 assert!(!opts.include_errors);
417 }
418
419 #[test]
420 fn test_stream_options_with_interval() {
421 let opts = StreamOptions::with_interval(Duration::from_millis(500));
422 assert_eq!(opts.poll_interval, Duration::from_millis(500));
423 }
424
425 #[test]
426 fn test_stream_options_builder() {
427 let opts = StreamOptions::builder()
428 .poll_interval(Duration::from_secs(5))
429 .buffer_size(32)
430 .include_errors(true)
431 .build();
432
433 assert_eq!(opts.poll_interval, Duration::from_secs(5));
434 assert_eq!(opts.buffer_size, 32);
435 assert!(opts.include_errors);
436 }
437
438 #[test]
439 fn test_stream_options_builder_partial() {
440 // Only set some options, others should be defaults
441 let opts = StreamOptions::builder().include_errors(true).build();
442
443 assert_eq!(opts.poll_interval, Duration::from_secs(1)); // default
444 assert_eq!(opts.buffer_size, 16); // default
445 assert!(opts.include_errors); // set
446 }
447}