use crate::{
context::Context, observable::CoreObservable, observer::Observer, prelude::ObservableType,
subscription::Subscription,
};
/// Operator value that wraps an upstream observable so that its items are
/// delivered as `(previous, current)` tuples.
///
/// The first upstream item only primes the operator; a tuple is produced for
/// every item after it.
#[derive(Clone)]
pub struct Pairwise<S> {
    /// The upstream observable whose consecutive items get paired.
    pub source: S,
}
/// Type-level description of what a `Pairwise` stream produces.
impl<S> ObservableType for Pairwise<S>
where
    S: ObservableType,
{
    /// Every emission is a 2-tuple built from two items of the source.
    type Item<'a>
        = (<S as ObservableType>::Item<'a>, <S as ObservableType>::Item<'a>)
    where
        Self: 'a;

    /// The error type is the source's error type, unchanged.
    type Err = <S as ObservableType>::Err;
}
/// Observer adapter that remembers the last item it received so that it can
/// hand `(previous, current)` tuples to the wrapped observer.
pub struct PairwiseObserver<O, Item> {
    /// Downstream observer that receives the tuples.
    observer: O,
    /// Most recently received item; `None` until the first item arrives.
    prev: Option<Item>,
}
/// Pairs each incoming item with the one received just before it and forwards
/// the tuple downstream; terminal events and the closed state are delegated
/// to the wrapped observer as-is.
impl<O, Item, Err> Observer<Item, Err> for PairwiseObserver<O, Item>
where
    Item: Clone,
    O: Observer<(Item, Item), Err>,
{
    /// Emits `(previous, value)` when a previous item is stored, then stores
    /// `value` as the new previous item.
    ///
    /// The very first call emits nothing because there is nothing to pair
    /// with yet.
    fn next(&mut self, value: Item) {
        // Clone only when there is something to pair with: the clone travels
        // downstream while the original stays behind as the next `prev`.
        let pair = self
            .prev
            .take()
            .map(|earlier| (earlier, value.clone()));

        if let Some(pair) = pair {
            self.observer.next(pair);
        }

        self.prev = Some(value);
    }

    /// Forwards the error to the wrapped observer.
    fn error(self, e: Err) {
        self.observer.error(e);
    }

    /// Forwards completion to the wrapped observer.
    fn complete(self) {
        self.observer.complete();
    }

    /// Reports whatever the wrapped observer reports.
    fn is_closed(&self) -> bool {
        self.observer.is_closed()
    }
}
impl<S, C, Unsub> CoreObservable<C> for Pairwise<S>
where
C: Context,
S: for<'a> CoreObservable<
C::With<PairwiseObserver<C::Inner, <S as ObservableType>::Item<'a>>>,
Unsub = Unsub,
>,
Unsub: Subscription,
{
type Unsub = Unsub;
fn subscribe(self, context: C) -> Self::Unsub {
let wrapped = context.transform(|observer| PairwiseObserver { observer, prev: None });
self.source.subscribe(wrapped)
}
}
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use crate::prelude::*;

    // Five inputs produce the four overlapping neighbour pairs, in order.
    #[rxrust_macro::test]
    fn test_pairwise_basic() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&pairs);

        Local::from_iter([1, 2, 3, 4, 5])
            .pairwise()
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![(1, 2), (2, 3), (3, 4), (4, 5)]);
    }

    // A lone item has no predecessor, so nothing is emitted.
    #[rxrust_macro::test]
    fn test_pairwise_single_value() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&pairs);

        Local::from_iter([42])
            .pairwise()
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![]);
    }

    // An empty source emits no pairs.
    #[rxrust_macro::test]
    fn test_pairwise_empty() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&pairs);

        Local::from_iter([] as [i32; 0])
            .pairwise()
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![]);
    }

    // Two items are the minimum needed for exactly one pair.
    #[rxrust_macro::test]
    fn test_pairwise_two_values() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&pairs);

        Local::from_iter([10, 20])
            .pairwise()
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![(10, 20)]);
    }

    // Works for non-`Copy` items: the middle string shows up in both pairs.
    #[rxrust_macro::test]
    fn test_pairwise_with_strings() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&pairs);

        Local::from_iter(["hello".to_string(), "world".to_string(), "test".to_string()])
            .pairwise()
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(
            *pairs.borrow(),
            vec![
                ("hello".to_string(), "world".to_string()),
                ("world".to_string(), "test".to_string())
            ]
        );
    }

    // An upstream error reaches the error handler and no pair is emitted.
    #[rxrust_macro::test]
    fn test_pairwise_error_propagation() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let seen_error = Rc::new(RefCell::new(String::new()));
        let sink = Rc::clone(&pairs);
        let error_slot = Rc::clone(&seen_error);

        Local::throw_err("test error".to_string())
            .pairwise()
            .on_error(move |e| {
                *error_slot.borrow_mut() = e;
            })
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![]);
        assert_eq!(*seen_error.borrow(), "test error");
    }

    // Completion of the source is passed through after the pairs.
    #[rxrust_macro::test]
    fn test_pairwise_completion() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let finished = Rc::new(RefCell::new(false));
        let sink = Rc::clone(&pairs);
        let finished_flag = Rc::clone(&finished);

        Local::from_iter([1, 2, 3])
            .pairwise()
            .on_complete(move || {
                *finished_flag.borrow_mut() = true;
            })
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![(1, 2), (2, 3)]);
        assert!(*finished.borrow());
    }

    // The tuples can be consumed by a following operator such as `map`.
    #[rxrust_macro::test]
    fn test_pairwise_chaining() {
        let sums = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&sums);

        Local::from_iter([1, 2, 3, 4])
            .pairwise()
            .map(|(a, b)| a + b)
            .subscribe(move |sum| {
                sink.borrow_mut().push(sum);
            });

        assert_eq!(*sums.borrow(), vec![3, 5, 7]);
    }

    // `take` placed after `pairwise` limits the number of pairs, not of inputs.
    #[rxrust_macro::test]
    fn test_pairwise_take_after_pairwise() {
        let pairs = Rc::new(RefCell::new(Vec::new()));
        let sink = Rc::clone(&pairs);

        Local::from_iter([1, 2, 3, 4, 5])
            .pairwise()
            .take(2)
            .subscribe(move |pair| {
                sink.borrow_mut().push(pair);
            });

        assert_eq!(*pairs.borrow(), vec![(1, 2), (2, 3)]);
    }
}