1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
use super::spi::Mono;
use crate::spi::{Publisher, Subscriber, Subscription};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// A [`Mono`] whose single signal is produced by calling a generator closure.
///
/// The closure is stored at construction time and is only invoked from
/// [`Subscription::request`] on the subscription handed to the subscriber.
pub struct MonoCreate<T, G, E>
where
    G: Fn() -> Result<T, E>,
{
    /// Generator that yields the value (`Ok`) or the error (`Err`) to emit.
    g: G,
}

impl<T, G, E> MonoCreate<T, G, E>
where
    G: Fn() -> Result<T, E>,
{
    /// Wraps `gen` in a `MonoCreate`; the closure is not called here.
    pub(crate) fn new(gen: G) -> MonoCreate<T, G, E> {
        MonoCreate { g: gen }
    }
}

impl<T, G, E> Mono<T, E> for MonoCreate<T, G, E> where G: Fn() -> Result<T, E> {}

impl<T, G, E> Publisher for MonoCreate<T, G, E>
where
    G: Fn() -> Result<T, E>,
{
    type Item = T;
    type Error = E;

    /// Wraps `subscriber` in an `Rc`, builds a `CreateSubscription` that shares
    /// it, and passes that subscription to the subscriber's `on_subscribe`.
    fn subscribe(self, subscriber: impl Subscriber<Item = T, Error = E>) {
        let actual = Rc::new(subscriber);
        let subscription = CreateSubscription::new(self.g, Rc::clone(&actual));
        actual.on_subscribe(subscription);
    }
}

/// Subscription handed to the subscriber of a [`MonoCreate`].
///
/// It owns the generator and a one-shot flag guaranteeing the generator is
/// invoked at most once, no matter how many times `request` is called.
struct CreateSubscription<T, G, S, E>
where
    G: Fn() -> Result<T, E>,
    S: Subscriber<Item = T, Error = E>,
{
    /// Generator producing the single signal.
    g: G,
    /// Downstream subscriber receiving the signals.
    actual: Rc<S>,
    /// `true` once the subscription has been requested or cancelled.
    requested: Arc<AtomicBool>,
}

impl<T, G, S, E> CreateSubscription<T, G, S, E>
where
    G: Fn() -> Result<T, E>,
    S: Subscriber<Item = T, Error = E>,
{
    /// Creates a subscription that has not been requested yet.
    fn new(g: G, actual: Rc<S>) -> CreateSubscription<T, G, S, E> {
        CreateSubscription {
            g,
            actual,
            requested: Arc::new(AtomicBool::new(false)),
        }
    }
}

impl<T, G, S, E> Subscription for CreateSubscription<T, G, S, E>
where
    G: Fn() -> Result<T, E>,
    S: Subscriber<Item = T, Error = E>,
{
    /// Runs the generator on the first call and forwards its result:
    /// `on_next` followed by `on_complete` for `Ok`, `on_error` for `Err`.
    ///
    /// The demand `_n` is ignored because at most one item is ever emitted.
    /// Any later call (or a call after `cancel`) only logs a warning.
    fn request(&self, _n: usize) {
        // `swap` stores `true` and returns the previous value, so only the
        // first caller observes `false`. The former `fetch_and(true, ..)` left
        // the flag unchanged (`x & true == x`), so the guard never triggered
        // and the generator ran again on every request.
        if self.requested.swap(true, Ordering::SeqCst) {
            warn!("subscription has been requested already!");
            return;
        }
        match (self.g)() {
            Ok(v) => {
                self.actual.on_next(v);
                self.actual.on_complete();
            }
            Err(e) => {
                self.actual.on_error(e);
            }
        }
    }

    /// Marks the subscription as finished so the generator is never invoked
    /// by a later `request`.
    ///
    /// Emission happens synchronously inside `request`, so there is no
    /// in-flight work to interrupt; flipping the one-shot flag is sufficient.
    /// This replaces the previous `unimplemented!()`, which panicked.
    fn cancel(&self) {
        self.requested.store(true, Ordering::SeqCst);
    }
}