fluxion_stream/types.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Common types and type aliases used throughout the fluxion-stream crate.
6//!
7//! This module centralizes shared types to reduce duplication and improve maintainability.
8
9use alloc::vec::Vec;
10use core::fmt::Debug;
11use fluxion_core::{HasTimestamp, Timestamped};
12
/// A stream item bundled with the item that preceded it.
///
/// Produced by [`CombineWithPreviousExt`](crate::CombineWithPreviousExt) so that
/// consumers can inspect the current value alongside the one before it.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WithPrevious<T> {
    /// The value that came before `current`, or `None` when there is none.
    pub previous: Option<T>,
    /// The most recent value in the stream.
    pub current: T,
}

impl<T> WithPrevious<T> {
    /// Builds a `WithPrevious` from an optional predecessor and the current value.
    pub fn new(previous: Option<T>, current: T) -> Self {
        WithPrevious { previous, current }
    }

    /// Reports whether a previous value is stored.
    pub fn has_previous(&self) -> bool {
        self.as_pair().is_some()
    }

    /// Borrows both values as `(previous, current)`.
    ///
    /// Returns `None` when no previous value is stored.
    pub fn as_pair(&self) -> Option<(&T, &T)> {
        let earlier = self.previous.as_ref()?;
        Some((earlier, &self.current))
    }
}
41
42impl<T: Timestamped> HasTimestamp for WithPrevious<T> {
43 type Timestamp = T::Timestamp;
44
45 fn timestamp(&self) -> Self::Timestamp {
46 self.current.timestamp()
47 }
48}
49
50impl<T: Timestamped> Timestamped for WithPrevious<T> {
51 type Inner = T::Inner;
52
53 fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
54 Self {
55 previous: None,
56 current: T::with_timestamp(value, timestamp),
57 }
58 }
59
60 fn into_inner(self) -> Self::Inner {
61 self.current.into_inner()
62 }
63}
64
/// Snapshot of the latest values gathered from several combined streams.
///
/// Used by multi-stream operators such as
/// [`combine_latest`](crate::CombineLatestExt::combine_latest),
/// [`with_latest_from`](crate::WithLatestFromExt::with_latest_from), and
/// [`emit_when`](crate::EmitWhenExt::emit_when).
///
/// Every value keeps its own original timestamp, which makes it possible to detect
/// transient states when several subscribers of the same shared source are combined.
///
/// # Examples
///
/// ```
/// use fluxion_stream::CombinedState;
///
/// let state = CombinedState::new(vec![(1, 100u64), (2, 100u64), (3, 100u64)], 100u64);
/// assert_eq!(state.values().len(), 3);
/// assert_eq!(state.values()[0], 1);
/// // All timestamps match - this is a stable state
/// assert!(state.timestamps().iter().all(|ts| *ts == 100));
/// ```
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CombinedState<V, TS = u64>
where
    V: Clone + Debug + Ord,
    TS: Clone + Debug + Ord,
{
    /// One `(value, timestamp)` entry per combined stream.
    state: Vec<(V, TS)>,
    /// The maximum timestamp (kept for `Timestamped` trait compatibility).
    /// It is supplied by the caller and stored as given.
    timestamp: TS,
}

impl<V, TS> CombinedState<V, TS>
where
    V: Clone + Debug + Ord,
    TS: Clone + Debug + Ord,
{
    /// Creates a `CombinedState` from value-timestamp pairs and the overall (max) timestamp.
    ///
    /// Both arguments are stored as given; no validation is performed.
    pub fn new(state: Vec<(V, TS)>, timestamp: TS) -> Self {
        CombinedState { state, timestamp }
    }

    /// Returns a freshly allocated vector holding a clone of every value, in order.
    ///
    /// If you need access to individual timestamps, use [`pairs()`](Self::pairs) or
    /// [`timestamps()`](Self::timestamps) instead.
    pub fn values(&self) -> Vec<V> {
        let mut cloned = Vec::with_capacity(self.state.len());
        cloned.extend(self.state.iter().map(|entry| entry.0.clone()));
        cloned
    }

    /// Returns a freshly allocated vector holding a clone of every per-value
    /// timestamp, in the same order as [`values()`](Self::values).
    pub fn timestamps(&self) -> Vec<TS> {
        let mut cloned = Vec::with_capacity(self.state.len());
        cloned.extend(self.state.iter().map(|entry| entry.1.clone()));
        cloned
    }

    /// Borrows the raw value-timestamp pairs as a slice.
    pub fn pairs(&self) -> &[(V, TS)] {
        self.state.as_slice()
    }

    /// Returns the number of streams in the combined state.
    pub fn len(&self) -> usize {
        self.pairs().len()
    }

    /// Returns `true` if there are no streams in the combined state.
    pub fn is_empty(&self) -> bool {
        self.pairs().is_empty()
    }
}
136
137impl<V, TS> HasTimestamp for CombinedState<V, TS>
138where
139 V: Clone + Debug + Ord,
140 TS: Clone + Debug + Ord + Copy + Send + Sync,
141{
142 type Timestamp = TS;
143
144 fn timestamp(&self) -> Self::Timestamp {
145 self.timestamp
146 }
147}
148
149impl<V, TS> Timestamped for CombinedState<V, TS>
150where
151 V: Clone + Debug + Ord,
152 TS: Clone + Debug + Ord + Copy + Send + Sync,
153{
154 type Inner = Self;
155
156 fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
157 Self {
158 state: value.state,
159 timestamp,
160 }
161 }
162
163 fn into_inner(self) -> Self::Inner {
164 self
165 }
166}