use drumbeat::event::observable::Observable;
use drumbeat::event::ops::*;
use drumbeat::event::scheduler::SchedulerType;
use drumbeat::event::subject::{BasicSubject, BasicSubjectBuilder, Subject};
use drumbeat::utils::testing;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::{Arc, Mutex};
/// Pushes one value through a `BasicSubject` and checks that the subscriber
/// receives it, and that the `finalize` callback has not run while the
/// subscription is still in scope but has run once that scope has ended.
#[test]
fn simple_event_test() {
    testing::async_context(|| {
        let basic = BasicSubject::new();
        let finalize = Arc::new(AtomicBool::new(false));
        let (tx, rx) = std::sync::mpsc::channel();
        // Move the sender straight into the mutex; no extra clone is needed.
        let tx = Mutex::new(tx);
        {
            let cloned = Arc::clone(&finalize);
            let _subscription = basic
                .observe()
                .pipe()
                .subscribe(move |x| {
                    // Forward every received value to the test thread.
                    tx.lock().unwrap().send(x).unwrap();
                })
                .finalize(move || {
                    cloned.store(true, Ordering::Relaxed);
                });
            basic.next("test".to_owned());
            assert_eq!(rx.recv().unwrap(), "test");
            // `_subscription` is still in scope, so the flag must be unset.
            assert!(!finalize.load(Ordering::Relaxed));
        }
        // `_subscription` has gone out of scope; the flag is expected to be set.
        assert!(finalize.load(Ordering::Relaxed));
    });
}
/// Builds an observable from `vec![1, 2, 3]`, pipes it through `take(3)`,
/// sums the values in the subscriber, and has the `finalize` callback send
/// the accumulated sum over a channel. The test expects that sum to be 6.
#[test]
fn take_observable_of_test() {
    testing::async_context(|| {
        let sum = Arc::new(AtomicI32::new(0));
        let subscribe_sum = Arc::clone(&sum);
        let finalize_sum = Arc::clone(&sum);
        let (tx, rx) = std::sync::mpsc::channel();
        // Move the sender straight into the mutex; no extra clone is needed.
        let tx = Mutex::new(tx);
        let observable = Observable::of(vec![1, 2, 3]);
        {
            let _subscription = observable
                .pipe()
                .take(3)
                .subscribe(move |x| {
                    subscribe_sum.fetch_add(x, Ordering::Relaxed);
                })
                .finalize(move || {
                    // Report whatever has been accumulated when finalize runs.
                    let total = finalize_sum.load(Ordering::Relaxed);
                    tx.lock().unwrap().send(total).unwrap();
                });
            assert_eq!(rx.recv().unwrap(), 6);
        }
    });
}
/// Subscribes to a `Blocking`-scheduled subject through `first()`, pushes the
/// value 5, and expects the `finalize` callback to report a sum of 5 over the
/// channel after the subscription scope has ended.
#[test]
fn first_subject_test() {
    testing::async_context(|| {
        let sum = Arc::new(AtomicI32::new(0));
        let subscribe_sum = Arc::clone(&sum);
        let finalize_sum = Arc::clone(&sum);
        let (tx, rx) = std::sync::mpsc::channel();
        // Move the sender into the mutex without cloning. `rx.recv()` below
        // runs after the subscription scope, and a leftover `Sender` in this
        // scope would keep the channel connected, so a finalize callback that
        // never sent would make the test hang instead of fail.
        let tx = Mutex::new(tx);
        let subject = BasicSubjectBuilder::new().scheduler(SchedulerType::Blocking).build();
        {
            let _subscription = subject
                .observe()
                .pipe()
                .first()
                .subscribe(move |x| {
                    subscribe_sum.fetch_add(x, Ordering::Relaxed);
                })
                .finalize(move || {
                    // Report whatever has been accumulated when finalize runs.
                    let total = finalize_sum.load(Ordering::Relaxed);
                    tx.lock().unwrap().send(total).unwrap();
                });
            subject.next(5);
        }
        assert_eq!(rx.recv().unwrap(), 5);
    });
}
/// Feeds three string slices into a `BasicSubject` whose pipeline takes three
/// items, appends `_` to each one, and collects them. The collected batch is
/// sorted before it is compared with the expected values.
#[test]
fn map_event_to_string_test() {
    testing::async_context(|| {
        let subject = BasicSubject::new();
        let collected_rx = subject
            .observe()
            .pipe()
            .take(3)
            .map(|item| format!("{}_", item))
            .collect();

        // Emit the inputs in the same order as listed.
        for word in &["abc", "def", "ghi"] {
            subject.next(*word);
        }

        let mut suffixed = collected_rx.recv().unwrap();
        // Sort first so the comparison does not depend on arrival order.
        suffixed.sort();
        assert_eq!(suffixed, ["abc_", "def_", "ghi_"]);
    });
}
/// Chains `take(3)`, `filter` (length greater than 1), `map` (append `_`),
/// `skip(1)` and `collect` on a `Worker`-scheduled `String` subject, pushes
/// "A", "BBB" and "CCC", and expects the collected output to be `["CCC_"]`.
#[test]
fn complex_pipe_test() {
    testing::async_context(|| {
        let subject = BasicSubjectBuilder::new()
            .scheduler(SchedulerType::Worker)
            .build::<String>();
        let collected_rx = subject
            .observe()
            .pipe()
            .take(3)
            .filter(|text| text.len() > 1)
            .map(|text| format!("{}_", text))
            .skip(1)
            .collect();

        // Emit the inputs in the same order as listed.
        for raw in &["A", "BBB", "CCC"] {
            subject.next((*raw).to_owned());
        }

        let received = collected_rx.recv().unwrap();
        let expected = ["CCC_"];
        assert_eq!(received, expected);
    });
}
/// Pipes a `Worker`-scheduled subject through `take_until(done.observe())`,
/// where a `tap` stage pushes `()` into `done` when it sees "C". Six letters
/// are emitted; once the `finalize` callback signals over the channel, the
/// sorted list of collected values is expected to be `["A+", "B+", "C+"]`.
#[test]
fn multi_observable_test() {
    testing::async_context(|| {
        let subject = BasicSubjectBuilder::new().scheduler(SchedulerType::Worker).build();
        let done = BasicSubjectBuilder::new().scheduler(SchedulerType::Blocking).build();

        // Shared buffer: the subscriber pushes into `sink`, the test reads `list`.
        let list = Arc::new(Mutex::new(Vec::new()));
        let sink = list.clone();

        // Channel used by the finalize callback to tell the test it has run.
        let (finished_tx, finished_rx) = std::sync::mpsc::channel();
        let finished_tx = Mutex::new(finished_tx);

        let stop_signal = done.clone();
        let _subscription = subject
            .observe()
            .pipe()
            .take_until(done.observe())
            .tap(move |value| {
                if value == "C" {
                    stop_signal.next(());
                }
            })
            .map(|value| format!("{}+", value))
            .subscribe(move |value| {
                sink.lock().unwrap().push(value);
            })
            .finalize(move || {
                finished_tx.lock().unwrap().send(()).unwrap();
            });

        // Emit the inputs in the same order as listed.
        for letter in &["A", "B", "C", "D", "E", "F"] {
            subject.next((*letter).to_owned());
        }

        // Block until the finalize callback has sent its signal.
        finished_rx.recv().unwrap();
        // Sort first so the comparison does not depend on arrival order.
        list.lock().unwrap().sort();
        assert_eq!(*list.lock().unwrap(), ["A+", "B+", "C+"]);
    });
}