1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use crate::{impl_local_shared_both, prelude::*};

/// Creates an observable from a one-shot producer.
///
/// `func` is not invoked here; it is boxed and stored in the returned
/// [`StartObservable`]. On subscription the stored closure is called, its
/// return value is emitted as the single item, and the stream then completes.
///
/// The `'static` bound is required because the closure is kept as a boxed
/// trait object inside the returned observable.
pub fn start<Item>(
  func: impl FnOnce() -> Item + 'static,
) -> StartObservable<Item> {
  // Erase the concrete closure type so the observable is generic over `Item` only.
  let func: Box<dyn FnOnce() -> Item> = Box::new(func);
  StartObservable { func }
}

/// Observable returned by [`start`].
///
/// It owns the boxed producer closure whose return value becomes the emitted
/// item when the observable is subscribed.
pub struct StartObservable<Item> {
  /// Type-erased producer. Being `FnOnce`, it can be invoked at most once and
  /// is consumed by the call.
  func: Box<dyn FnOnce() -> Item>,
}

/// Declares the item and error types of the stream produced by
/// [`StartObservable`].
impl<Item> Observable for StartObservable<Item> {
  /// The value type returned by the wrapped closure.
  type Item = Item;
  /// Unit error type; the subscribe logic in this file only calls `next` and
  /// `complete` on the observer.
  type Err = ();
}

// Subscribe logic for `StartObservable`, supplied through the project macro
// `impl_local_shared_both!` (defined elsewhere; presumably it expands this
// body into both the local and the shared subscribe implementations —
// confirm against the macro definition).
impl_local_shared_both! {
  impl<Item> StartObservable<Item>;
  type Unsub = SingleSubscription;
  macro method($self: ident, $observer: ident, $ctx: ident) {
    // Run the stored one-shot closure and forward its return value as the
    // only item of the stream.
    $observer.next(($self.func)());
    // Nothing else will ever be emitted, so signal completion immediately.
    $observer.complete();
    // All work already happened synchronously above; hand back a
    // default-constructed subscription handle.
    SingleSubscription::default()
  }
}

#[cfg(test)]
mod tests {
  use crate::prelude::*;
  #[cfg(not(target_arch = "wasm32"))]
  use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
  #[cfg(not(target_arch = "wasm32"))]
  use std::sync::Arc;

  /// Non-trivial fixture value used to check that a whole struct travels
  /// through the observable unchanged.
  #[derive(PartialEq, Debug, Default)]
  struct S {
    i: i32,
    f: f32,
    s: String,
  }

  /// Plain `fn` item used as the producer, to show that `start` accepts
  /// named functions as well as closures.
  fn function() -> S {
    S {
      i: 1,
      f: 2.5,
      s: "aString".to_owned(),
    }
  }

  /// A closure's return value is delivered to `next`, followed by `complete`.
  #[test]
  fn it_shall_emit_closure_value() {
    let mut received = 0;
    let mut completed = false;

    observable::start(|| 123).subscribe_all(
      |value| received = value,
      |_| {},
      || completed = true,
    );

    assert_eq!(123, received);
    assert!(completed);
  }

  /// Same contract as above, exercised through the shared variant of the
  /// observable with a blocking subscription.
  #[cfg(not(target_arch = "wasm32"))]
  #[test]
  fn it_shall_emit_closure_value_shared() {
    let received = Arc::new(AtomicI32::new(0));
    let completed = Arc::new(AtomicBool::new(false));

    // The callbacks take ownership of their own handles; the originals stay
    // behind for the assertions.
    let received_sink = Arc::clone(&received);
    let completed_sink = Arc::clone(&completed);
    observable::start(|| 123)
      .into_shared()
      .subscribe_blocking_all(
        move |value| received_sink.store(value, Ordering::Relaxed),
        |_| {},
        move || completed_sink.store(true, Ordering::Relaxed),
      );

    assert_eq!(123, received.load(Ordering::Relaxed));
    assert!(completed.load(Ordering::Relaxed));
  }

  /// A named function's return value is delivered to `next`, followed by
  /// `complete`.
  #[test]
  fn it_shall_emit_function_value() {
    let want = S {
      i: 1,
      f: 2.5,
      s: "aString".to_owned(),
    };
    // Starts out as the all-zero / empty value so a missing emission is
    // detected by the equality check below.
    let mut got = S::default();
    let mut completed = false;

    observable::start(function).subscribe_all(
      |item| got = item,
      |_| {},
      || completed = true,
    );

    assert_eq!(want, got);
    assert!(completed);
  }
}