use crate::{
context::Context,
observable::{CoreObservable, ObservableType},
observer::Observer,
};
#[derive(Clone)]
pub struct TakeWhile<S, P> {
pub source: S,
pub predicate: P,
pub inclusive: bool,
}
impl<S: ObservableType, P> ObservableType for TakeWhile<S, P> {
type Item<'a>
= S::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
pub struct TakeWhileObserver<O, P> {
observer: Option<O>,
predicate: P,
inclusive: bool,
}
impl<O, P, Item, Err> Observer<Item, Err> for TakeWhileObserver<O, P>
where
O: Observer<Item, Err>,
P: FnMut(&Item) -> bool,
{
fn next(&mut self, v: Item) {
if let Some(ref mut observer) = self.observer {
if (self.predicate)(&v) {
observer.next(v);
} else {
if self.inclusive {
observer.next(v);
}
if let Some(observer) = self.observer.take() {
observer.complete();
}
}
}
}
fn error(self, e: Err) {
if let Some(observer) = self.observer {
observer.error(e);
}
}
fn complete(self) {
if let Some(observer) = self.observer {
observer.complete();
}
}
fn is_closed(&self) -> bool {
self
.observer
.as_ref()
.is_none_or(|o| o.is_closed())
}
}
impl<S, P, C> CoreObservable<C> for TakeWhile<S, P>
where
C: Context,
S: CoreObservable<C::With<TakeWhileObserver<C::Inner, P>>>,
{
type Unsub = S::Unsub;
fn subscribe(self, context: C) -> Self::Unsub {
let TakeWhile { source, predicate, inclusive } = self;
let wrapped = context.transform(|observer| TakeWhileObserver {
observer: Some(observer),
predicate,
inclusive,
});
source.subscribe(wrapped)
}
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};
use crate::prelude::*;
#[rxrust_macro::test]
fn test_take_while_basic() {
let result = Rc::new(RefCell::new(Vec::new()));
let completed = Rc::new(RefCell::new(false));
let result_clone = result.clone();
let completed_clone = completed.clone();
Local::from_iter([1, 2, 3, 4, 5])
.take_while(|v| *v < 4)
.on_complete(move || {
*completed_clone.borrow_mut() = true;
})
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3]);
assert!(*completed.borrow());
}
#[rxrust_macro::test]
fn test_take_while_inclusive() {
let result = Rc::new(RefCell::new(Vec::new()));
let completed = Rc::new(RefCell::new(false));
let result_clone = result.clone();
let completed_clone = completed.clone();
Local::from_iter([1, 2, 3, 4, 5])
.take_while_inclusive(|v| *v < 4)
.on_complete(move || {
*completed_clone.borrow_mut() = true;
})
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3, 4]);
assert!(*completed.borrow());
}
#[rxrust_macro::test]
fn test_take_while_all_pass() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
Local::from_iter([1, 2, 3])
.take_while(|v| *v < 100)
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3]);
}
#[rxrust_macro::test]
fn test_take_while_none_pass() {
let result = Rc::new(RefCell::new(Vec::new()));
let completed = Rc::new(RefCell::new(false));
let result_clone = result.clone();
let completed_clone = completed.clone();
Local::from_iter([10, 20, 30])
.take_while(|v| *v < 5)
.on_complete(move || {
*completed_clone.borrow_mut() = true;
})
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), Vec::<i32>::new());
assert!(*completed.borrow());
}
#[rxrust_macro::test]
fn test_take_while_chaining() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
Local::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.take_while(|v| *v < 8)
.take_while(|v| *v < 5)
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![1, 2, 3, 4]);
}
#[rxrust_macro::test]
fn test_take_while_error_propagation() {
let error = Rc::new(RefCell::new(String::new()));
let error_clone = error.clone();
Local::throw_err("test error".to_string())
.take_while(|_: &()| true)
.on_error(move |e| {
*error_clone.borrow_mut() = e;
})
.subscribe(|_| {});
assert_eq!(*error.borrow(), "test error");
}
#[rxrust_macro::test]
fn test_take_while_with_map() {
let result = Rc::new(RefCell::new(Vec::new()));
let result_clone = result.clone();
Local::from_iter([1, 2, 3, 4, 5])
.map(|v| v * 2)
.take_while(|v| *v < 8)
.subscribe(move |v| {
result_clone.borrow_mut().push(v);
});
assert_eq!(*result.borrow(), vec![2, 4, 6]);
}
}