use crate::prelude::*;
#[macro_export]
macro_rules! of_sequence {
( $( $item:expr ),* ) => {
{
$crate::observable::create(|mut s| {
$(
s.next($item);
)*
s.complete();
})
}
}
}
pub fn of<Item>(v: Item) -> ObservableBase<OfEmitter<Item>> {
ObservableBase::new(OfEmitter(v))
}
#[derive(Clone)]
pub struct OfEmitter<Item>(pub(crate) Item);
#[doc(hidden)]
macro_rules! of_emitter {
($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
where
O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
{
subscriber.next(self.0);
subscriber.complete();
}
}
}
impl<Item> Emitter for OfEmitter<Item> {
type Item = Item;
type Err = ();
}
impl<'a, Item> LocalEmitter<'a> for OfEmitter<Item> {
of_emitter!(LocalSubscription, 'a);
}
impl<Item> SharedEmitter for OfEmitter<Item> {
of_emitter!(SharedSubscription, Send + Sync + 'static);
}
pub fn of_result<Item, Err>(
r: Result<Item, Err>,
) -> ObservableBase<ResultEmitter<Item, Err>> {
ObservableBase::new(ResultEmitter(r))
}
#[doc(hidden)]
macro_rules! of_result_emitter {
($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
where
O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
{
match self.0 {
Ok(v) => subscriber.next(v),
Err(e) => subscriber.error(e),
};
subscriber.complete();
}
}
}
#[derive(Clone)]
pub struct ResultEmitter<Item, Err>(pub(crate) Result<Item, Err>);
impl<Item, Err> Emitter for ResultEmitter<Item, Err> {
type Item = Item;
type Err = Err;
}
impl<'a, Item, Err> LocalEmitter<'a> for ResultEmitter<Item, Err> {
of_result_emitter!(LocalSubscription, 'a);
}
impl<Item, Err> SharedEmitter for ResultEmitter<Item, Err> {
of_result_emitter!(SharedSubscription, Send + Sync + 'static);
}
pub fn of_option<Item>(o: Option<Item>) -> ObservableBase<OptionEmitter<Item>> {
ObservableBase::new(OptionEmitter(o))
}
#[derive(Clone)]
pub struct OptionEmitter<Item>(pub(crate) Option<Item>);
#[doc(hidden)]
macro_rules! of_option_emitter {
($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
where
O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
{
if let Some(v) = self.0 {
subscriber.next(v)
}
subscriber.complete();
}
}
}
impl<Item> Emitter for OptionEmitter<Item> {
type Item = Item;
type Err = ();
}
impl<'a, Item> LocalEmitter<'a> for OptionEmitter<Item> {
of_option_emitter!(LocalSubscription, 'a);
}
impl<Item> SharedEmitter for OptionEmitter<Item> {
of_option_emitter!(SharedSubscription, Send + Sync + 'static);
}
pub fn of_fn<F, Item>(f: F) -> ObservableBase<CallableEmitter<F>>
where
F: FnOnce() -> Item,
{
ObservableBase::new(CallableEmitter(f))
}
#[derive(Clone)]
pub struct CallableEmitter<F>(F);
#[doc(hidden)]
macro_rules! of_fn_emitter {
($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
fn emit<O>(self, mut subscriber: Subscriber<O, $subscription>)
where
O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf
{
subscriber.next((self.0)());
subscriber.complete();
}
}
}
impl<Item, F> Emitter for CallableEmitter<F>
where
F: FnOnce() -> Item,
{
type Item = Item;
type Err = ();
}
impl<'a, Item, F> LocalEmitter<'a> for CallableEmitter<F>
where
F: FnOnce() -> Item,
{
of_fn_emitter!(LocalSubscription, 'a);
}
impl<Item, F> SharedEmitter for CallableEmitter<F>
where
F: FnOnce() -> Item,
{
of_fn_emitter!(SharedSubscription, Send + Sync + 'static);
}
#[cfg(test)]
mod test {
use crate::prelude::*;
#[test]
fn from_fn() {
let mut value = 0;
let mut completed = false;
let callable = || 123;
observable::of_fn(callable).subscribe_complete(
|v| {
value = v;
},
|| completed = true,
);
assert_eq!(value, 123);
assert!(completed);
}
#[test]
fn of_option() {
let mut value1 = 0;
let mut completed1 = false;
observable::of_option(Some(123)).subscribe_complete(
|v| {
value1 = v;
},
|| completed1 = true,
);
assert_eq!(value1, 123);
assert!(completed1);
let mut value2 = 0;
let mut completed2 = false;
observable::of_option(None).subscribe_complete(
|v| {
value2 = v;
},
|| completed2 = true,
);
assert_eq!(value2, 0);
assert!(completed2);
}
#[test]
fn of_result() {
let mut value1 = 0;
let mut completed1 = false;
let r: Result<i32, &str> = Ok(123);
observable::of_result(r).subscribe_all(
|v| {
value1 = v;
},
|_| {},
|| completed1 = true,
);
assert_eq!(value1, 123);
assert!(completed1);
let mut value2 = 0;
let mut error_reported = false;
let r: Result<i32, &str> = Err("error");
observable::of_result(r)
.subscribe_err(|_| value2 = 123, |_| error_reported = true);
assert_eq!(value2, 0);
assert!(error_reported);
}
#[test]
fn of() {
let mut value = 0;
let mut completed = false;
observable::of(100).subscribe_complete(|v| value = v, || completed = true);
assert_eq!(value, 100);
assert!(completed);
}
#[test]
fn of_macros() {
let mut value = 0;
of_sequence!(1, 2, 3).subscribe(|v| value += v);
assert_eq!(value, 6);
}
#[test]
fn bench() { do_bench(); }
benchmark_group!(do_bench, bench_of);
fn bench_of(b: &mut bencher::Bencher) { b.iter(of); }
}