1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
/// Extension trait providing the `distinct_until_changed_by` operator for streams.
///
/// This operator filters out consecutive duplicate values using a custom comparison
/// function, emitting only when the value changes from the previous emission according
/// to the provided comparer.
///
/// Use [`DistinctUntilChangedByExt::distinct_until_changed_by`] to use this operator.
///
/// # Behavior
///
/// - First value is always emitted (no previous value to compare)
/// - Subsequent values are compared to the last emitted value using `compare`
/// - Only values where `compare(current, previous) == false` are emitted
/// - The comparer should return `true` if values are considered equal/same
/// - Timestamps are preserved from the original incoming values
/// - Errors are always propagated immediately
///
/// # Arguments
///
/// * `compare` - A function that takes two references to `T::Inner` and returns
/// `true` if they should be considered equal (and thus filtered), or `false`
/// if they are different (and should be emitted).
///
/// # Type Parameters
///
/// * `F` - The comparison function type
///
/// # Examples
///
/// ## Custom Equality by Field
///
/// ```rust
/// use fluxion_stream::{DistinctUntilChangedByExt, IntoFluxionStream};
/// use fluxion_test_utils::Sequenced;
/// use futures::StreamExt;
///
/// #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
/// struct User {
/// id: u32,
/// name: String,
/// }
///
/// # async fn example() {
/// let (tx, rx) = async_channel::unbounded();
/// let stream = rx.into_fluxion_stream();
///
/// // Only care about changes to user ID, ignore name changes
/// let mut distinct = stream.distinct_until_changed_by(|a: &User, b: &User| a.id == b.id);
///
/// tx.try_send(Sequenced::new(User { id: 1, name: "Alice".into() })).unwrap();
/// tx.try_send(Sequenced::new(User { id: 1, name: "Alice Updated".into() })).unwrap(); // Filtered
/// tx.try_send(Sequenced::new(User { id: 2, name: "Bob".into() })).unwrap(); // Emitted (ID changed)
///
/// let first = distinct.next().await.unwrap().unwrap();
/// assert_eq!(first.into_inner().id, 1);
///
/// let second = distinct.next().await.unwrap().unwrap();
/// assert_eq!(second.into_inner().id, 2);
/// # }
/// ```
///
/// ## Case-Insensitive String Comparison
///
/// ```rust
/// use fluxion_stream::{DistinctUntilChangedByExt, IntoFluxionStream};
/// use fluxion_test_utils::Sequenced;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, rx) = async_channel::unbounded();
/// let stream = rx.into_fluxion_stream();
///
/// // Case-insensitive comparison
/// let mut distinct = stream.distinct_until_changed_by(|a: &String, b: &String| {
/// a.to_lowercase() == b.to_lowercase()
/// });
///
/// tx.try_send(Sequenced::new("hello".to_string())).unwrap();
/// tx.try_send(Sequenced::new("HELLO".to_string())).unwrap(); // Filtered (same ignoring case)
/// tx.try_send(Sequenced::new("world".to_string())).unwrap(); // Emitted
/// tx.try_send(Sequenced::new("World".to_string())).unwrap(); // Filtered
///
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner(), "hello");
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner(), "world");
/// # }
/// ```
///
/// ## Approximate Numerical Comparison
///
/// ```rust
/// use fluxion_stream::{DistinctUntilChangedByExt, IntoFluxionStream};
/// use fluxion_test_utils::Sequenced;
/// use futures::StreamExt;
/// use std::cmp::Ordering;
///
/// // Wrapper to implement Ord for f64 (for testing purposes)
/// #[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
/// struct OrderedF64(f64);
///
/// impl Eq for OrderedF64 {}
///
/// impl Ord for OrderedF64 {
/// fn cmp(&self, other: &Self) -> Ordering {
/// self.0.partial_cmp(&other.0).unwrap_or(Ordering::Equal)
/// }
/// }
///
/// # async fn example() {
/// let (tx, rx) = async_channel::unbounded();
/// let stream = rx.into_fluxion_stream();
///
/// // Only emit if difference is >= 0.1
/// let mut distinct = stream.distinct_until_changed_by(|a: &OrderedF64, b: &OrderedF64| {
/// (a.0 - b.0).abs() < 0.1
/// });
///
/// tx.try_send(Sequenced::new(OrderedF64(1.0))).unwrap();
/// tx.try_send(Sequenced::new(OrderedF64(1.05))).unwrap(); // Filtered (diff < 0.1)
/// tx.try_send(Sequenced::new(OrderedF64(1.15))).unwrap(); // Emitted (diff >= 0.1)
/// tx.try_send(Sequenced::new(OrderedF64(1.18))).unwrap(); // Filtered
/// tx.try_send(Sequenced::new(OrderedF64(1.30))).unwrap(); // Emitted
///
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner().0, 1.0);
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner().0, 1.15);
/// assert_eq!(distinct.next().await.unwrap().unwrap().into_inner().0, 1.30);
/// # }
/// ```
///
/// # Use Cases
///
/// - Comparing complex types by specific fields
/// - Case-insensitive or fuzzy comparisons
/// - Threshold-based filtering (e.g., temperature within range)
/// - Custom domain-specific equality logic
/// - Working with types that don't implement `PartialEq`
///
/// # Performance
///
/// - O(1) time complexity per item (plus the cost of the comparer function)
/// - Stores only the last emitted value
/// - No buffering or lookahead required
///
/// # See Also
///
/// - [`distinct_until_changed`](crate::DistinctUntilChangedExt::distinct_until_changed) - Uses `PartialEq` for comparison
/// - [`filter_ordered`](crate::FilterOrderedExt::filter_ordered) - General filtering
pub use DistinctUntilChangedByExt;
pub use DistinctUntilChangedByExt;