1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
/// Extension trait providing the `distinct_until_changed` operator for streams.
///
/// This operator filters out consecutive duplicate values, emitting only when
/// the value changes from the previous emission.
///
/// Call [`DistinctUntilChangedExt::distinct_until_changed`] on a stream to apply this operator.
///
/// # Behavior
///
/// - First value is always emitted (no previous value to compare)
/// - Subsequent values are compared to the last emitted value
/// - Only values where `current != previous` are emitted
/// - Timestamps are preserved from the original incoming values
/// - Errors are always propagated immediately
///
/// # Examples
///
/// ## Basic Deduplication
///
/// ```rust
/// use fluxion_stream::{DistinctUntilChangedExt, IntoFluxionStream};
/// use fluxion_test_utils::Sequenced;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, rx) = async_channel::unbounded();
/// let stream = rx.into_fluxion_stream();
///
/// let mut distinct = stream.distinct_until_changed();
///
/// // Send: 1, 1, 2, 2, 2, 3, 2
/// tx.try_send(Sequenced::new(1)).unwrap();
/// tx.try_send(Sequenced::new(1)).unwrap(); // Filtered
/// tx.try_send(Sequenced::new(2)).unwrap();
/// tx.try_send(Sequenced::new(2)).unwrap(); // Filtered
/// tx.try_send(Sequenced::new(2)).unwrap(); // Filtered
/// tx.try_send(Sequenced::new(3)).unwrap();
/// tx.try_send(Sequenced::new(2)).unwrap(); // Emitted (different from 3)
///
/// // Output: 1, 2, 3, 2
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner(), 1);
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner(), 2);
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner(), 3);
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner(), 2);
/// # }
/// ```
///
/// ## Toggle/State Change Detection
///
/// Detect when a boolean state changes:
///
/// ```rust
/// use fluxion_stream::{DistinctUntilChangedExt, IntoFluxionStream};
/// use fluxion_test_utils::Sequenced;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, rx) = async_channel::unbounded();
/// let stream = rx.into_fluxion_stream();
///
/// let mut changes = stream.distinct_until_changed();
///
/// // Simulate toggle events
/// tx.try_send(Sequenced::new(false)).unwrap(); // Initial state
/// tx.try_send(Sequenced::new(false)).unwrap(); // No change
/// tx.try_send(Sequenced::new(true)).unwrap(); // Changed!
/// tx.try_send(Sequenced::new(true)).unwrap(); // No change
/// tx.try_send(Sequenced::new(false)).unwrap(); // Changed!
///
/// // Only state transitions are emitted
/// assert!(!changes.next().await.unwrap().unwrap().into_inner());
/// assert!(changes.next().await.unwrap().unwrap().into_inner());
/// assert!(!changes.next().await.unwrap().unwrap().into_inner());
/// # }
/// ```
///
/// ## Timestamp Preservation
///
/// Timestamps are preserved from the original incoming values:
///
/// ```rust
/// use fluxion_stream::{DistinctUntilChangedExt, IntoFluxionStream};
/// use fluxion_test_utils::Sequenced;
/// use fluxion_core::HasTimestamp;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, rx) = async_channel::unbounded();
/// let stream = rx.into_fluxion_stream();
///
/// let mut distinct = stream.distinct_until_changed();
///
/// tx.try_send(Sequenced::new(1)).unwrap();
/// let first = distinct.next().await.unwrap().unwrap();
/// let ts1 = first.timestamp();
///
/// tx.try_send(Sequenced::new(1)).unwrap(); // Filtered (duplicate)
/// tx.try_send(Sequenced::new(2)).unwrap(); // Emitted with its original timestamp
///
/// let second = distinct.next().await.unwrap().unwrap();
/// let ts2 = second.timestamp();
///
/// // Timestamps come from the original Sequenced values
/// assert!(ts2 > ts1);
/// # }
/// ```
///
/// # Use Cases
///
/// - Deduplicating repeated sensor readings
/// - Detecting state transitions (on/off, connected/disconnected)
/// - Filtering redundant UI updates
/// - Change detection in data streams
/// - Reducing downstream emissions while values remain unchanged
///
/// # Performance
///
/// - O(1) time complexity per item
/// - Stores only the last emitted value
/// - No buffering or lookahead required
///
/// # See Also
///
/// - [`filter_ordered`](crate::FilterOrderedExt::filter_ordered) - General filtering
/// - [`take_while_with`](crate::TakeWhileExt::take_while_with) - Conditional stream termination
// NOTE(review): the identical re-export appears twice below with no module path or
// `cfg` gate visible in this view — confirm whether each line was meant to be gated
// to a different implementation module, or whether one of them is redundant.
pub use DistinctUntilChangedExt;
pub use DistinctUntilChangedExt;