1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use super::spi::Mono;
use crate::spi::{Publisher, Subscriber, Subscription};
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// A single-value source backed by a generator closure.
///
/// The closure `g` returns `Ok(T)` for the value to emit or `Err(E)` for the
/// error to signal. It is only stored here; nothing in this type calls it.
pub struct MonoCreate<T, G, E>
where
  G: Fn() -> Result<T, E>,
{
  g: G,
  // `T` and `E` occur only in the bound on `G`, which the compiler does not
  // count as a use (error E0392). The `fn() -> ...` form records that values
  // of these types are produced, not owned, so auto traits and drop checking
  // are unaffected by the marker.
  _marker: PhantomData<fn() -> Result<T, E>>,
}

impl<T, G, E> MonoCreate<T, G, E>
where
  G: Fn() -> Result<T, E>,
{
  /// Wraps the generator `gen` without invoking it.
  pub(crate) fn new(gen: G) -> MonoCreate<T, G, E> {
    MonoCreate {
      g: gen,
      _marker: PhantomData,
    }
  }
}

/// Marks `MonoCreate` as a `Mono<T, E>`.
///
/// The body is empty, so the impl supplies no items of its own.
impl<T, G, E> Mono<T, E> for MonoCreate<T, G, E>
where
  G: Fn() -> Result<T, E>,
{
}

impl<T, G, E> Publisher for MonoCreate<T, G, E>
where
  G: Fn() -> Result<T, E>,
{
  type Item = T;
  type Error = E;

  fn subscribe(self, subscriber: impl Subscriber<Item = T, Error = E>) {
    let sub = Rc::new(subscriber);
    let subs = CreateSubscription::new(self.g, sub.clone());
    sub.on_subscribe(subs);
  }
}

struct CreateSubscription<T, G, S, E>
where
  G: Fn() -> Result<T, E>,
  S: Subscriber<Item = T, Error = E>,
{
  g: G,
  actual: Rc<S>,
  requested: Arc<AtomicBool>,
}

impl<T, G, S, E> CreateSubscription<T, G, S, E>
where
  G: Fn() -> Result<T, E>,
  S: Subscriber<Item = T, Error = E>,
{
  fn new(g: G, actual: Rc<S>) -> CreateSubscription<T, G, S, E> {
    CreateSubscription {
      g,
      actual,
      requested: Arc::new(AtomicBool::new(false)),
    }
  }
}

/// `Subscription` implementation that emits at most once.
impl<T, G, S, E> Subscription for CreateSubscription<T, G, S, E>
where
  G: Fn() -> Result<T, E>,
  S: Subscriber<Item = T, Error = E>,
{
  /// Runs the generator on the first call and forwards its outcome.
  ///
  /// `Ok(v)` is delivered as `on_next(v)` followed by `on_complete()`.
  /// `Err(e)` is delivered as `on_error(e)`. Every later call only logs a
  /// warning. The requested amount `_n` is ignored.
  fn request(&self, _n: usize) {
    // `swap` sets the flag and returns its previous value in one atomic
    // step. The earlier `fetch_and(true, ..)` could never set a flag that
    // starts out `false` (`false & true == false`), so the guard never
    // fired and every call re-ran the generator.
    if self.requested.swap(true, Ordering::SeqCst) {
      warn!("subscription has been requested already!");
      return;
    }
    match (self.g)() {
      Ok(v) => {
        self.actual.on_next(v);
        self.actual.on_complete();
      }
      Err(e) => {
        self.actual.on_error(e);
      }
    }
  }

  /// Not implemented yet.
  ///
  /// # Panics
  ///
  /// Always panics via `unimplemented!()`.
  fn cancel(&self) {
    unimplemented!()
  }
}