use futures_core::Stream;
use crate::filter_stateful_sync;
pub fn filter_distinct_until_changed<T>(last_item: &mut Option<T>, next_item: &T) -> bool
where
T: Clone + PartialEq,
{
if let Some(last_item) = last_item {
if last_item == next_item {
return false;
}
}
*last_item = Some(next_item.clone());
true
}
pub fn distinct_until_changed<S>(stream: S) -> impl Stream<Item = S::Item>
where
S: Stream,
S::Item: Clone + PartialEq,
{
filter_stateful_sync(stream, None, filter_distinct_until_changed)
}
pub fn distinct_until_changed_map<S, T, F>(
stream: S,
initial_value: T,
mut map_fn: F,
) -> impl Stream<Item = S::Item>
where
S: Stream,
F: FnMut(&S::Item) -> T,
T: PartialEq,
{
filter_stateful_sync(stream, initial_value, move |last_value, next_item| {
let next_value = map_fn(next_item);
if *last_value == next_value {
return false;
}
*last_value = next_value;
true
})
}
pub fn filter_distinct_until_changed_ok_result<T, E>(
last_ok: &mut Option<T>,
next_result: &Result<T, E>,
) -> bool
where
T: Clone + PartialEq,
{
if let Ok(next_ok) = &next_result {
if let Some(last_ok) = &last_ok {
if last_ok == next_ok {
return false;
}
}
*last_ok = Some(next_ok.clone());
} else {
*last_ok = None;
}
true
}
pub fn distinct_until_changed_ok_result<S, T, E>(stream: S) -> impl Stream<Item = S::Item>
where
S: Stream<Item = Result<T, E>>,
T: Clone + PartialEq,
{
filter_stateful_sync(stream, None, filter_distinct_until_changed_ok_result)
}
pub fn filter_distinct_until_changed_err_result<T, E>(
last_err: &mut Option<E>,
next_result: &Result<T, E>,
) -> bool
where
E: Clone + PartialEq,
{
if let Err(next_err) = &next_result {
if let Some(last_err) = &last_err {
if last_err == next_err {
return false;
}
}
*last_err = Some(next_err.clone());
} else {
*last_err = None;
}
true
}
pub fn distinct_until_changed_err_result<S, T, E>(stream: S) -> impl Stream<Item = S::Item>
where
S: Stream<Item = Result<T, E>>,
E: Clone + PartialEq,
{
filter_stateful_sync(stream, None, filter_distinct_until_changed_err_result)
}
#[cfg(test)]
mod tests;