fluxion_stream/
types.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Common types and type aliases used throughout the fluxion-stream crate.
6//!
7//! This module centralizes shared types to reduce duplication and improve maintainability.
8
9use alloc::vec::Vec;
10use core::fmt::Debug;
11use fluxion_core::{HasTimestamp, Timestamped};
12
13/// Represents a value paired with its previous value in the stream.
14///
15/// Used by [`CombineWithPreviousExt`](crate::CombineWithPreviousExt) to provide
16/// both current and previous values.
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
18pub struct WithPrevious<T> {
19    /// The previous value in the stream, if any
20    pub previous: Option<T>,
21    /// The current value in the stream
22    pub current: T,
23}
24
25impl<T> WithPrevious<T> {
26    /// Creates a new WithPrevious with the given previous and current values.
27    pub fn new(previous: Option<T>, current: T) -> Self {
28        Self { previous, current }
29    }
30
31    /// Returns true if there is a previous value.
32    pub fn has_previous(&self) -> bool {
33        self.previous.is_some()
34    }
35
36    /// Returns a tuple of references to (previous, current) if previous exists.
37    pub fn as_pair(&self) -> Option<(&T, &T)> {
38        self.previous.as_ref().map(|prev| (prev, &self.current))
39    }
40}
41
42impl<T: Timestamped> HasTimestamp for WithPrevious<T> {
43    type Timestamp = T::Timestamp;
44
45    fn timestamp(&self) -> Self::Timestamp {
46        self.current.timestamp()
47    }
48}
49
50impl<T: Timestamped> Timestamped for WithPrevious<T> {
51    type Inner = T::Inner;
52
53    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
54        Self {
55            previous: None,
56            current: T::with_timestamp(value, timestamp),
57        }
58    }
59
60    fn into_inner(self) -> Self::Inner {
61        self.current.into_inner()
62    }
63}
64
65/// State container holding the latest values from multiple combined streams.
66///
67/// Used by operators that combine multiple streams such as [`combine_latest`](crate::CombineLatestExt::combine_latest),
68/// [`with_latest_from`](crate::WithLatestFromExt::with_latest_from), and
69/// [`emit_when`](crate::EmitWhenExt::emit_when).
70///
71/// Each value is paired with its original timestamp, enabling detection of
72/// transient states when combining multiple subscribers from the same shared source.
73///
74/// # Examples
75///
76/// ```
77/// use fluxion_stream::CombinedState;
78///
79/// let state = CombinedState::new(vec![(1, 100u64), (2, 100u64), (3, 100u64)], 100u64);
80/// assert_eq!(state.values().len(), 3);
81/// assert_eq!(state.values()[0], 1);
82/// // All timestamps match - this is a stable state
83/// assert!(state.timestamps().iter().all(|ts| *ts == 100));
84/// ```
85#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
86pub struct CombinedState<V, TS = u64>
87where
88    V: Clone + Debug + Ord,
89    TS: Clone + Debug + Ord,
90{
91    /// Values paired with their individual timestamps
92    state: Vec<(V, TS)>,
93    /// The maximum timestamp (for Timestamped trait compatibility)
94    timestamp: TS,
95}
96
97impl<V, TS> CombinedState<V, TS>
98where
99    V: Clone + Debug + Ord,
100    TS: Clone + Debug + Ord,
101{
102    /// Creates a new CombinedState with the given vector of value-timestamp pairs and max timestamp.
103    pub fn new(state: Vec<(V, TS)>, timestamp: TS) -> Self {
104        Self { state, timestamp }
105    }
106
107    /// Returns the values as a vector.
108    ///
109    /// If you need access to individual timestamps, use [`pairs()`](Self::pairs) or
110    /// [`timestamps()`](Self::timestamps) instead.
111    pub fn values(&self) -> Vec<V> {
112        self.state.iter().map(|(v, _)| v.clone()).collect()
113    }
114
115    /// Returns the values as a vector of timestamps.
116    ///
117    pub fn timestamps(&self) -> Vec<TS> {
118        self.state.iter().map(|(_, ts)| ts.clone()).collect()
119    }
120
121    /// Returns a slice of the raw value-timestamp pairs.
122    pub fn pairs(&self) -> &[(V, TS)] {
123        &self.state
124    }
125
126    /// Returns the number of streams in the combined state.
127    pub fn len(&self) -> usize {
128        self.state.len()
129    }
130
131    /// Returns true if there are no streams in the combined state.
132    pub fn is_empty(&self) -> bool {
133        self.state.is_empty()
134    }
135}
136
137impl<V, TS> HasTimestamp for CombinedState<V, TS>
138where
139    V: Clone + Debug + Ord,
140    TS: Clone + Debug + Ord + Copy + Send + Sync,
141{
142    type Timestamp = TS;
143
144    fn timestamp(&self) -> Self::Timestamp {
145        self.timestamp
146    }
147}
148
149impl<V, TS> Timestamped for CombinedState<V, TS>
150where
151    V: Clone + Debug + Ord,
152    TS: Clone + Debug + Ord + Copy + Send + Sync,
153{
154    type Inner = Self;
155
156    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
157        Self {
158            state: value.state,
159            timestamp,
160        }
161    }
162
163    fn into_inner(self) -> Self::Inner {
164        self
165    }
166}