fluxion_stream/
types.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Common types and type aliases used throughout the fluxion-stream crate.
6//!
7//! This module centralizes shared types to reduce duplication and improve maintainability.
8
9use fluxion_core::{HasTimestamp, Timestamped};
10use std::fmt::Debug;
11
/// A stream item bundled together with the item that preceded it.
///
/// Emitted by [`CombineWithPreviousExt`](crate::CombineWithPreviousExt) so that
/// the current and the previous value can be inspected side by side.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WithPrevious<T> {
    /// The value that came before `current`, if there is one
    pub previous: Option<T>,
    /// The most recent value in the stream
    pub current: T,
}

impl<T> WithPrevious<T> {
    /// Builds a `WithPrevious` from an optional previous value and the current value.
    pub fn new(previous: Option<T>, current: T) -> Self {
        WithPrevious { previous, current }
    }

    /// Reports whether a previous value is present.
    pub fn has_previous(&self) -> bool {
        self.as_pair().is_some()
    }

    /// Borrows both values as `(previous, current)`.
    ///
    /// Yields `None` when no previous value is stored.
    pub fn as_pair(&self) -> Option<(&T, &T)> {
        let earlier = self.previous.as_ref()?;
        Some((earlier, &self.current))
    }
}
40
41impl<T: Timestamped> HasTimestamp for WithPrevious<T> {
42    type Inner = T::Inner;
43    type Timestamp = T::Timestamp;
44
45    fn timestamp(&self) -> Self::Timestamp {
46        self.current.timestamp()
47    }
48}
49
50impl<T: Timestamped> Timestamped for WithPrevious<T> {
51    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
52        Self {
53            previous: None,
54            current: T::with_timestamp(value, timestamp),
55        }
56    }
57
58    fn with_fresh_timestamp(value: Self::Inner) -> Self {
59        Self {
60            previous: None,
61            current: T::with_fresh_timestamp(value),
62        }
63    }
64
65    fn into_inner(self) -> Self::Inner {
66        self.current.into_inner()
67    }
68}
69
/// State container holding the latest values from multiple combined streams.
///
/// Used by operators that combine multiple streams such as [`combine_latest`](crate::CombineLatestExt::combine_latest),
/// [`with_latest_from`](crate::WithLatestFromExt::with_latest_from), and
/// [`emit_when`](crate::EmitWhenExt::emit_when).
///
/// The type itself and its inherent methods place no trait bounds on `V` or
/// `TS`. The derived traits (`Clone`, `Debug`, `PartialEq`, `Eq`, `PartialOrd`,
/// `Ord`) are each available whenever both parameters implement the
/// corresponding trait.
///
/// # Examples
///
/// ```
/// use fluxion_stream::CombinedState;
///
/// let state = CombinedState::new(vec![1, 2, 3], 0);
/// assert_eq!(state.values().len(), 3);
/// assert_eq!(state.values()[0], 1);
/// ```
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CombinedState<V, TS = u64> {
    // Field order matters: the derived ordering compares `state` before `timestamp`.
    state: Vec<V>,
    timestamp: TS,
}

// None of these methods rely on `Clone`, `Debug` or `Ord`, so the impl is left
// unbounded; trait impls that need extra capabilities declare their own bounds.
impl<V, TS> CombinedState<V, TS> {
    /// Creates a new `CombinedState` with the given vector of values and timestamp.
    pub fn new(state: Vec<V>, timestamp: TS) -> Self {
        Self { state, timestamp }
    }

    /// Returns a reference to the internal values vector.
    pub fn values(&self) -> &Vec<V> {
        &self.state
    }

    /// Returns the number of values held in the combined state.
    pub fn len(&self) -> usize {
        self.state.len()
    }

    /// Returns `true` if the combined state holds no values.
    pub fn is_empty(&self) -> bool {
        self.state.is_empty()
    }
}
120
121impl<V, TS> HasTimestamp for CombinedState<V, TS>
122where
123    V: Clone + Debug + Ord,
124    TS: Clone + Debug + Ord + Copy + Send + Sync,
125{
126    type Inner = Self;
127    type Timestamp = TS;
128
129    fn timestamp(&self) -> Self::Timestamp {
130        self.timestamp
131    }
132}
133
134impl<V, TS> Timestamped for CombinedState<V, TS>
135where
136    V: Clone + Debug + Ord,
137    TS: Clone + Debug + Ord + Copy + Send + Sync,
138{
139    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
140        Self {
141            state: value.state,
142            timestamp,
143        }
144    }
145
146    fn with_fresh_timestamp(value: Self::Inner) -> Self {
147        // For now, recycle the timestamp from the value itself
148        // Later we can discuss whether to create a fresh one or use one from aggregated events
149        value
150    }
151
152    fn into_inner(self) -> Self::Inner {
153        self
154    }
155}