#![allow(clippy::needless_doctest_main)]
mod background_unsubscribe;
pub mod multicast;
pub mod timestamp;
use std::{
collections::VecDeque,
error::Error,
sync::{mpsc::RecvTimeoutError, Arc, Mutex},
time::Duration,
};
use crate::{observer::Observer, subscribe::Fuse, subscription::subscribe::UnsubscribeLogic};
use crate::{
subscribe::SubscriptionCollection,
subscription::subscribe::{
Subscribeable, Subscriber, Subscription, SubscriptionHandle, Unsubscribeable,
},
TimestampedEmit,
};
use self::{background_unsubscribe::setup_unsubscribe_channel, multicast::Connectable};
/// A single notification captured from an observable so it can be queued and
/// inspected later (the `zip` operator in this file stores these per input).
enum EmittedValue<T> {
/// A value that arrived through the `next` callback.
Success(T),
/// The completion notification.
Complete,
/// The error notification together with the shared error object.
Error(Arc<dyn std::error::Error + Send + Sync>),
}
/// Anything that can be turned into an [`Observable`] to be used as a notifier.
///
/// Implemented in this file for `Observable<T>` itself and for closures that
/// return an `Observable<T>`.
pub trait Notifier {
/// Type of the values emitted by the resulting observable.
type Item;
/// Consumes `self` and produces the notifier observable.
fn into_observable(self) -> Observable<Self::Item>;
}
/// An observable is already usable as a notifier, so the conversion is the identity.
impl<T> Notifier for Observable<T> {
    type Item = T;

    /// Returns the observable unchanged.
    fn into_observable(self) -> Observable<T> {
        self
    }
}
/// A closure producing an observable can stand in for the observable itself;
/// the closure is invoked when the conversion happens.
impl<T, F: FnOnce() -> Observable<T>> Notifier for F {
    type Item = T;

    /// Calls the closure once and returns the observable it builds.
    fn into_observable(self) -> Observable<T> {
        let build = self;
        build()
    }
}
/// Error value used when a sequence ends without having produced an element
/// (the `first` operator in this file reports it when nothing matched and no
/// default value was supplied).
#[derive(Debug, Clone)]
pub struct EmptyError;

impl Error for EmptyError {}

impl std::fmt::Display for EmptyError {
    /// Writes the fixed, human-readable message; formatting flags are ignored.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("no elements in sequence")
    }
}
/// Boxed subscribe callback: receives the downstream `Subscriber` and returns
/// the `Subscription` for that subscriber.
type SubscribeFn<T> = Box<dyn FnMut(Subscriber<T>) -> Subscription + Send + Sync>;
/// Queue of observables paired with the subscriber each one is associated with.
// NOTE(review): not used in the visible part of the file — presumably consumed by
// operators defined further down; confirm.
type PendingObservables<T> = VecDeque<(Observable<T>, Subscriber<T>)>;
/// Declares `$name` as `Arc<Mutex<_>>` wrapping `$val` (optionally with an
/// explicit type annotation) and then declares one additional `Arc` clone for
/// every `@ident` that follows.
///
/// ```ignore
/// arc_mutex_clone!(0_u32 => counter, @counter_a, @counter_b);
/// ```
///
/// `Arc` and `Mutex` are spelled with absolute paths because `macro_rules!`
/// resolves item paths at the call site; this keeps the expansion working even
/// where the caller has not imported them.
macro_rules! arc_mutex_clone {
    ($val:expr => $name:ident$(: $explicit:ty)?, $(@$name_cl:ident),+) => {
        let $name$(: $explicit)? = ::std::sync::Arc::new(::std::sync::Mutex::new($val));
        $(let $name_cl = ::std::sync::Arc::clone(&$name);)+
    };
}
/// A cloneable handle around a subscribe callback plus a few behaviour flags.
///
/// The callback lives behind `Arc<Mutex<..>>`, so clones of an `Observable`
/// share the same callback.
#[derive(Clone)]
pub struct Observable<T> {
// Shared, mutex-guarded subscribe callback.
subscribe_fn: Arc<Mutex<SubscribeFn<T>>>,
// Set to `true` by `fuse()` and to `false` by `defuse()`.
fused: bool,
// Set to `true` by `defuse()` and to `false` by `fuse()`.
defused: bool,
// Subject indicator; the constructors in this file initialise it to `false`.
pub(crate) subject: bool,
}
impl<T> Observable<T> {
    /// Creates an observable from the subscribe callback `sf`.
    ///
    /// The callback is boxed and stored behind `Arc<Mutex<..>>`; all flags
    /// (`fused`, `defused`, `subject`) start out as `false`.
    pub fn new(sf: impl FnMut(Subscriber<T>) -> Subscription + Send + Sync + 'static) -> Self {
        Self {
            subscribe_fn: Arc::new(Mutex::new(Box::new(sf))),
            fused: false,
            defused: false,
            subject: false,
        }
    }

    /// Creates an observable whose subscribe callback ignores the subscriber
    /// and immediately returns a `Subscription` built from the `Nil`
    /// unsubscribe logic and the `Nil` handle.
    #[must_use]
    pub fn empty() -> Self {
        Self::new(|_| Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil))
    }

    /// Marks the observable as fused and clears the defused flag.
    #[must_use]
    pub fn fuse(mut self) -> Self {
        (self.fused, self.defused) = (true, false);
        self
    }

    /// Marks the observable as defused and clears the fused flag.
    #[must_use]
    pub fn defuse(mut self) -> Self {
        (self.fused, self.defused) = (false, true);
        self
    }
}
#[allow(clippy::module_name_repetitions)]
pub trait ObservableExt<T: 'static>: Subscribeable<ObsType = T> {
fn map<U, F>(mut self, f: F) -> Observable<U>
where
Self: Sized + Send + Sync + 'static,
F: FnOnce(T) -> U + Copy + Send + Sync + 'static,
U: 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut u = Subscriber::new(
move |v| {
let t = f(v);
o_shared.lock().unwrap().next(t);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn filter<P>(mut self, predicate: P) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut u = Subscriber::new(
move |v| {
if predicate(&v) {
o_shared.lock().unwrap().next(v);
}
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn skip(mut self, n: usize) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut n = n;
let mut u = Subscriber::new(
move |v| {
if n > 0 {
n -= 1;
return;
}
o_shared.lock().unwrap().next(v);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Collects values from `self` into a `Vec` and emits the collected batch
/// according to signals from `closing_notifier`.
///
/// Behaviour visible in this implementation:
/// * While the notifier has not emitted, incoming values are appended to the
///   buffer.
/// * After the notifier emitted (or completed), the next source value is
///   appended and the whole buffer is emitted as one `Vec`.
/// * A notifier error is forwarded downstream and stops further buffering.
/// * When the source completes, the notifier subscription is released, the
///   remaining buffer is emitted and downstream is completed.
/// * A source error releases the notifier subscription and is forwarded.
fn buffer<U, N>(mut self, closing_notifier: N) -> Observable<Vec<T>>
where
    Self: Sized + Send + Sync + 'static,
    N: Notifier<Item = U>,
    T: Send,
    U: 'static,
{
    // State shared between the notifier callbacks and the source callbacks.
    enum Signal {
        // Nothing pending: keep buffering.
        Idle,
        // Notifier emitted: flush on the next source value.
        Notified,
        // Notifier (or source) completed.
        Complete,
        // Terminal state: ignore further source values.
        Done,
    }
    let subject = self.is_subject();
    let (fused, defused) = self.get_fused();
    let mut notifier_observable = closing_notifier.into_observable();
    let mut observable = Observable::new(move |o| {
        let fused = o.fused;
        let defused = o.defused;
        let take_wrapped = o.take_wrapped;
        arc_mutex_clone!(Signal::Idle => signal,
            @signal_cl_c, @signal_cl_e, @signal_cl_n, @signal_cl_cn, @signal_cl_en
        );
        arc_mutex_clone!(o => o_shared, @o_cloned_c, @o_cloned_cn, @o_cloned_e, @o_cloned_en);
        let notifier_observer = Subscriber::new(
            move |_: U| {
                let mut signal = signal_cl_n.lock().unwrap();
                if let Signal::Complete | Signal::Done = *signal {
                    return;
                }
                *signal = Signal::Notified;
            },
            move |notifier_observable_error| {
                *signal_cl_en.lock().unwrap() = Signal::Done;
                o_cloned_en.lock().unwrap().error(notifier_observable_error);
            },
            move || {
                let mut signal = signal_cl_cn.lock().unwrap();
                if let Signal::Complete | Signal::Done = *signal {
                    return;
                }
                *signal = Signal::Complete;
                o_cloned_cn.lock().unwrap().complete();
            },
        );
        // Kept in an `Option` so whichever terminal callback runs first can take
        // the subscription and unsubscribe exactly once.
        let notifier_unsubscriber = Arc::new(Mutex::new(Some(
            notifier_observable.subscribe(notifier_observer),
        )));
        let notifier_unsubscriber_cl = Arc::clone(&notifier_unsubscriber);
        arc_mutex_clone!(Vec::with_capacity(10) => buf, @buf_cl);
        let mut u = Subscriber::new(
            move |v| {
                let mut signal = signal.lock().unwrap();
                let mut buf = buf.lock().unwrap();
                match *signal {
                    Signal::Idle => buf.push(v),
                    ref s @ (Signal::Notified | Signal::Complete) => {
                        buf.push(v);
                        // Move the contents out while keeping `buf` reusable.
                        let mut buf_temp = Vec::with_capacity(buf.len());
                        buf_temp.append(&mut buf);
                        o_shared.lock().unwrap().next(buf_temp);
                        match s {
                            Signal::Notified => *signal = Signal::Idle,
                            Signal::Complete => *signal = Signal::Done,
                            _ => {}
                        }
                    }
                    Signal::Done => (),
                }
            },
            move |observable_error| {
                if let Some(unsubscriber) = notifier_unsubscriber_cl.lock().unwrap().take() {
                    unsubscriber.unsubscribe();
                };
                *signal_cl_e.lock().unwrap() = Signal::Done;
                o_cloned_e.lock().unwrap().error(observable_error);
            },
            move || {
                let mut signal = signal_cl_c.lock().unwrap();
                if let Some(unsubscriber) = notifier_unsubscriber.lock().unwrap().take() {
                    unsubscriber.unsubscribe();
                };
                if let Signal::Done = *signal {
                    return;
                }
                *signal = Signal::Complete;
                // Flush whatever is still buffered before completing.
                let mut buf = buf_cl.lock().unwrap();
                let mut buf_temp = Vec::with_capacity(buf.len());
                buf_temp.append(&mut buf);
                o_cloned_c.lock().unwrap().next(buf_temp);
                o_cloned_c.lock().unwrap().complete();
            },
        );
        u.take_wrapped = take_wrapped;
        self.set_fused(fused, defused);
        self.subscribe(u)
    });
    observable.set_subject_indicator(subject);
    observable.set_fused(fused, defused);
    observable
}
fn debounce(mut self, due_time: Duration) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
T: Send,
{
struct LastValue<T> {
value: T,
}
enum Signal {
Interrupt,
}
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let last_value: Arc<Mutex<Option<LastValue<T>>>> = Arc::new(Mutex::new(None));
let last_value_cl = Arc::clone(&last_value);
let last_value_cl_emit = Arc::clone(&last_value);
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
loop {
match rx.recv_timeout(due_time) {
Ok(Signal::Interrupt) => {
}
Err(RecvTimeoutError::Timeout) => {
if let Some(last_value) = last_value_cl_emit.lock().unwrap().take() {
o_shared.lock().unwrap().next(last_value.value);
}
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
});
let mut u = Subscriber::new(
move |v| {
*last_value.lock().unwrap() = Some(LastValue { value: v });
let _ = tx.send(Signal::Interrupt);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
if let Some(lv) = last_value_cl.lock().unwrap().take() {
o_cloned_c.lock().unwrap().next(lv.value);
}
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Debounces `self` using a per-value "duration" observable.
///
/// For every source value, `duration_selector` is called with a reference to
/// the value and the returned observable is subscribed. When that duration
/// observable emits, the stored value is forwarded downstream unless a newer
/// source value has replaced it. A new source value first unsubscribes the
/// previous duration observable. On completion a still-stored value is emitted
/// before downstream is completed; on error the error is forwarded. Both
/// terminal paths unsubscribe the current duration observable.
fn debounce_map<R: 'static, F>(mut self, duration_selector: F) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
F: FnMut(&T) -> Observable<R> + Sync + Send + 'static,
T: Send,
{
// Takes the stored duration subscription (if any) and unsubscribes it.
fn unsubscribe_duration_observable(
duration_subscription: &Arc<Mutex<Option<Subscription>>>,
) {
if let Some(subscription) = duration_subscription.lock().unwrap().take() {
subscription.unsubscribe();
};
}
// Most recent source value together with the id of the duration window it
// belongs to.
struct LastValue<T> {
value: T,
id: u64,
}
let duration_selector = Arc::new(Mutex::new(duration_selector));
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let duration_selector = Arc::clone(&duration_selector);
let last_value: Arc<Mutex<Option<LastValue<T>>>> = Arc::new(Mutex::new(None));
let last_value_cl = Arc::clone(&last_value);
let duration_subscription: Arc<Mutex<Option<Subscription>>> =
Arc::new(Mutex::new(None));
let duration_subscription_cl_c = Arc::clone(&duration_subscription);
let duration_subscription_cl_e = Arc::clone(&duration_subscription);
// Incremented for each source value; identifies the current duration window.
let mut duration_id = 0_u64;
let mut u = Subscriber::new(
move |v| {
duration_id = duration_id.wrapping_add(1);
// Per-value clones that are moved into the duration subscriber below.
let o_shared = Arc::clone(&o_shared);
let o_cloned_e = Arc::clone(&o_shared);
let duration_selector = Arc::clone(&duration_selector);
let last_value_cl = Arc::clone(&last_value);
let duration_subscription_cl = Arc::clone(&duration_subscription);
// Cancel the previous duration window before opening a new one.
unsubscribe_duration_observable(&duration_subscription);
let mut duration_observable = duration_selector.lock().unwrap()(&v);
*last_value.lock().unwrap() = Some(LastValue {
value: v,
id: duration_id,
});
drop(duration_selector);
let duration_subscriber = Subscriber::new(
move |_| {
let is_stale;
// Copy of the id this duration window was opened with.
let duration_id = duration_id;
// `try_lock`: if the value slot is currently locked elsewhere this
// notification is skipped instead of blocking.
if let Ok(mut glv) = last_value_cl.try_lock() {
// Stale when no value is stored or the stored value has a larger id
// than this window.
is_stale = glv.as_ref().map_or_else(
|| true,
|lv| {
if lv.id > duration_id {
return true;
};
false
},
);
if is_stale {
return;
}
if let Some(lv) = glv.take() {
o_shared.lock().unwrap().next(lv.value);
}
unsubscribe_duration_observable(&duration_subscription_cl);
}
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {},
);
let s = duration_observable.subscribe(duration_subscriber);
*duration_subscription.lock().unwrap() = Some(s);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
unsubscribe_duration_observable(&duration_subscription_cl_e);
},
move || {
// Flush the value that is still waiting for its duration window.
if let Some(lv) = last_value_cl.lock().unwrap().take() {
o_cloned_c.lock().unwrap().next(lv.value);
}
o_cloned_c.lock().unwrap().complete();
unsubscribe_duration_observable(&duration_subscription_cl_c);
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn delay(mut self, num_of_ms: u64) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut u = Subscriber::new(
move |v| {
std::thread::sleep(Duration::from_millis(num_of_ms));
o_shared.lock().unwrap().next(v);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Returns an observable that folds the source values with `acc` and emits
/// the accumulator after every source value.
///
/// * `acc` – combines the current accumulator with the next source value.
/// * `seed` – initial accumulator; when `None`, the first source value is
///   converted with `U::from` and used (and emitted) as the accumulator.
///
/// Each subscription starts from its own clone of `seed`. Error and
/// completion notifications are forwarded unchanged.
fn scan<U, F>(mut self, acc: F, seed: Option<U>) -> Observable<U>
where
    Self: Sized + Send + Sync + 'static,
    F: FnOnce(U, T) -> U + Copy + Sync + Send + 'static,
    U: From<T> + Clone + Send + Sync + 'static,
{
    let subject = self.is_subject();
    let (fused, defused) = self.get_fused();
    let mut observable = Observable::new(move |o| {
        let state = Arc::new(Mutex::new(seed.clone()));
        let fused = o.fused;
        let defused = o.defused;
        let take_wrapped = o.take_wrapped;
        let o_shared = Arc::new(Mutex::new(o));
        let o_cloned_e = Arc::clone(&o_shared);
        let o_cloned_c = Arc::clone(&o_shared);
        let state_cl = Arc::clone(&state);
        let mut u = Subscriber::new(
            move |v: T| {
                // A poisoned state mutex silently drops the value, as before.
                if let Ok(mut state) = state_cl.lock() {
                    // Move the accumulator out instead of cloning it into `acc`;
                    // only the emitted copy needs a clone.
                    let next_state = match state.take() {
                        Some(current) => acc(current, v),
                        None => v.into(),
                    };
                    let emitted = next_state.clone();
                    *state = Some(next_state);
                    // The state lock stays held while emitting, as before.
                    o_shared.lock().unwrap().next(emitted);
                }
            },
            move |observable_error| {
                o_cloned_e.lock().unwrap().error(observable_error);
            },
            move || {
                o_cloned_c.lock().unwrap().complete();
            },
        );
        u.take_wrapped = take_wrapped;
        self.set_fused(fused, defused);
        self.subscribe(u)
    });
    observable.set_subject_indicator(subject);
    observable.set_fused(fused, defused);
    observable
}
/// Wraps `self` in a [`Connectable`], carrying over the subject indicator and
/// the fused/defused flags read from `self`.
fn connectable(self) -> Connectable<T>
where
    Self: Send + Sync + Sized + 'static,
    T: Send + Sync + Clone,
{
    // Read the flags first: `self` is moved into the shared wrapper below.
    let is_subject = self.is_subject();
    let (is_fused, is_defused) = self.get_fused();

    let shared_source = Arc::new(Mutex::new(self));
    let mut connectable = Connectable::new(shared_source);
    connectable.set_subject_indicator(is_subject);
    connectable.set_fused(is_fused, is_defused);
    connectable
}
fn first<F>(mut self, predicate: F, default_value: Option<T>) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
F: FnOnce(T, usize) -> bool + Copy + Send + Sync + 'static,
T: Clone + Send + Sync,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let mut default_value = default_value.clone();
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut signal_sent = false;
let emitted = Arc::new(Mutex::new(false));
let emitted_cl = Arc::clone(&emitted);
let (tx, rx) = setup_unsubscribe_channel();
let mut index = 0;
let mut u = Subscriber::new(
move |v: T| {
if !signal_sent && predicate(v.clone(), index) {
o_shared.lock().unwrap().next(v);
signal_sent = true;
*emitted.lock().unwrap() = true;
tx.send_unsubscribe_signal();
}
index += 1;
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
if let (Ok(mut observer), Ok(emitted)) = (o_cloned_c.lock(), emitted_cl.lock())
{
if !*emitted {
if let Some(v) = default_value.take() {
observer.next(v);
observer.complete();
} else {
observer.error(Arc::new(EmptyError));
}
return;
}
observer.complete();
}
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
let unsubscriber = self.subscribe(u);
rx.unsubscribe_background_emissions(&self, unsubscriber)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Combines `self` with `observable_inputs`, emitting one `Vec` per source
/// value that contains that value followed by one value from each input, in
/// input order.
///
/// Every input is subscribed immediately and its notifications are queued per
/// input index. For each value from `self`, the queues are polled in order
/// (sleeping 1 ms per polling round) until every input has supplied a value.
/// If an input's next queued notification is a completion or an error, no
/// `Vec` is emitted, the stored subscriptions are unsubscribed and later
/// source values are ignored; an input error is also forwarded downstream.
/// Source errors and completion are forwarded and unsubscribe the inputs.
#[allow(clippy::too_many_lines)]
fn zip(mut self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
where
Self: Clone + Sized + Send + Sync + 'static,
T: Clone + Send,
{
use std::collections::HashMap;
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
use std::task::Poll;
// Unsubscribes everything in the store. For subjects the most recently
// pushed subscription is popped first and dropped without unsubscribing.
// NOTE(review): the last pushed entry is normally the outer subscription
// (pushed after `self.subscribe`) — confirm that this is the intent.
#[allow(clippy::needless_pass_by_value)]
fn unsubscribe_stored_subscriptions(
subscriptions_store: Arc<Mutex<Vec<Subscription>>>,
is_subject: bool,
) {
if is_subject {
if let Ok(mut s) = subscriptions_store.lock() {
s.pop();
}
}
if let Ok(mut s) = subscriptions_store.lock() {
while let Some(u) = s.pop() {
u.unsubscribe();
}
}
}
let is_subject = self.is_subject();
// Each subscription works on its own clone of the input list.
let mut observable_inputs: VecDeque<Observable<T>> = observable_inputs.clone().into();
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let input_len = observable_inputs.len();
// Map from input index to the queue of notifications received from that input.
let all_emits_collect = Arc::new(Mutex::new(HashMap::with_capacity(input_len)));
let subscriptions_store = Arc::new(Mutex::new(Vec::with_capacity(input_len)));
let subscriptions_store_cl = Arc::clone(&subscriptions_store);
let subscriptions_store_cl2 = Arc::clone(&subscriptions_store);
let subscriptions_store_cl3 = Arc::clone(&subscriptions_store);
// Captured at subscription time; decides which unsubscribe variant is returned.
let tokio_handle = tokio::runtime::Handle::try_current();
let mut idx = 0;
// Subscribe to every input and record its notifications in its own queue.
while let Some(mut input) = observable_inputs.pop_front() {
let inner_emits_collect = VecDeque::with_capacity(16);
all_emits_collect
.lock()
.unwrap()
.insert(idx, inner_emits_collect);
let all_emits_collect_cl = Arc::clone(&all_emits_collect);
let all_emits_collect_cl2 = Arc::clone(&all_emits_collect);
let all_emits_collect_cl3 = Arc::clone(&all_emits_collect);
let inner_subscriber = Subscriber::new(
move |v: T| {
let all_emits_collect_cl = Arc::clone(&all_emits_collect_cl);
if let Some(inner_emits) =
all_emits_collect_cl.lock().unwrap().get_mut(&idx)
{
inner_emits.push_back(EmittedValue::Success(v));
};
},
move |e| {
if let Some(inner_emits) =
all_emits_collect_cl2.lock().unwrap().get_mut(&idx)
{
inner_emits.push_back(EmittedValue::Error(e));
}
},
move || {
if let Some(inner_emits) =
all_emits_collect_cl3.lock().unwrap().get_mut(&idx)
{
inner_emits.push_back(EmittedValue::Complete);
}
},
);
let subscriptions_store = Arc::clone(&subscriptions_store);
let subscription = input.subscribe(inner_subscriber);
subscriptions_store.lock().unwrap().push(subscription);
idx += 1;
}
// Set once an input completed or errored; later source values are ignored.
let mut unsubscribed = false;
let mut u = Subscriber::new(
move |v| {
if unsubscribed {
return;
}
let mut values = Vec::with_capacity(input_len);
values.push(v);
let mut unsub = false;
// Index of the input whose value is awaited next.
let mut i = 0;
loop {
std::thread::sleep(Duration::from_millis(1));
if let Some(s) = all_emits_collect.lock().unwrap().get_mut(&i) {
match s.pop_front() {
Some(EmittedValue::Success(e)) => {
values.push(e);
i += 1;
}
Some(EmittedValue::Complete) => {
unsub = true;
break;
}
Some(EmittedValue::Error(e)) => {
unsub = true;
o_shared.lock().unwrap().error(e);
break;
}
// Nothing queued yet for this input: poll again.
None => (),
}
}
if i == input_len {
break;
}
// NOTE(review): spawns a trivial task whenever a Tokio runtime is current —
// presumably to give the runtime a chance to make progress; confirm.
if tokio::runtime::Handle::try_current().is_ok() {
let ftr = std::future::poll_fn(|cx| {
cx.waker().wake_by_ref();
Poll::Ready::<()>(())
});
tokio::task::spawn(async {
ftr.await;
});
}
}
if unsub {
unsubscribe_stored_subscriptions(
subscriptions_store_cl.clone(),
is_subject,
);
unsubscribed = true;
return;
}
o_shared.lock().unwrap().next(values);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
unsubscribe_stored_subscriptions(subscriptions_store_cl2.clone(), is_subject);
},
move || {
o_cloned_c.lock().unwrap().complete();
unsubscribe_stored_subscriptions(subscriptions_store_cl3.clone(), is_subject);
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
let mut outer_subscription = self.subscribe(u);
// Move the outer handle into the returned subscription; the stored copy keeps
// a `Nil` handle.
let handle = outer_subscription.subscription_future;
outer_subscription.subscription_future = SubscriptionHandle::Nil;
subscriptions_store.lock().unwrap().push(outer_subscription);
// With a Tokio runtime available at subscription time the unsubscribe logic is
// provided as a future, otherwise as a plain closure.
if tokio_handle.is_ok() {
return Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
unsubscribe_stored_subscriptions(subscriptions_store, false);
})),
handle,
);
}
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
unsubscribe_stored_subscriptions(subscriptions_store, false);
})),
handle,
)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn take(mut self, n: usize) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut i = 0;
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let (tx, rx) = setup_unsubscribe_channel();
let mut signal_sent = false;
if self.is_subject() {
signal_sent = true;
}
let mut u = Subscriber::new(
move |v| {
if i < n {
i += 1;
o_shared.lock().unwrap().next(v);
} else if !signal_sent {
signal_sent = true;
tx.send_unsubscribe_signal();
}
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = true;
self.set_fused(fused, defused);
let unsubscriber = self.subscribe(u);
rx.unsubscribe_background_emissions(&self, unsubscriber)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Returns an observable that forwards source values until `notifier` emits.
///
/// The notifier is subscribed on a spawned Tokio task when the unsubscribe
/// channel reports that Tokio is in use, otherwise on a new OS thread. Once
/// the notifier has emitted, source values are no longer forwarded and a
/// single unsubscribe signal is sent on the background unsubscribe channel
/// (never for subjects). When `unsubscribe_notifier` is `true`, the notifier
/// subscription is released as well. Error and completion notifications from
/// the source are forwarded unchanged.
fn take_until<U: 'static>(
    mut self,
    notifier: Observable<U>,
    unsubscribe_notifier: bool,
) -> Observable<T>
where
    Self: Sized + Send + Sync + 'static,
{
    let notifier = Arc::new(Mutex::new(notifier));
    let subject = self.is_subject();
    let (fused, defused) = self.get_fused();
    let mut observable = Observable::new(move |o| {
        let fused = o.fused;
        let defused = o.defused;
        let o_shared = Arc::new(Mutex::new(o));
        let o_cloned_e = Arc::clone(&o_shared);
        let o_cloned_c = Arc::clone(&o_shared);
        let (tx, rx) = setup_unsubscribe_channel();
        let mut signal_sent = false;
        // Flipped to `true` by the notifier's first emission.
        let notifier_next_called = Arc::new(Mutex::new(false));
        let notifier_next_called_cl = Arc::clone(&notifier_next_called);
        let observer =
            Subscriber::on_next(move |_: U| *notifier_next_called_cl.lock().unwrap() = true);
        let notifier = Arc::clone(&notifier);
        // Filled in by the spawned task/thread once the notifier is subscribed.
        let subscription = Arc::new(Mutex::new(None));
        let subscription_cl = Arc::clone(&subscription);
        if tx.is_tokio_used() {
            tokio::task::spawn(async move {
                let subscription = notifier.lock().unwrap().subscribe(observer);
                *subscription_cl.lock().unwrap() = Some(subscription);
            });
        } else {
            std::thread::spawn(move || {
                let subscription = notifier.lock().unwrap().subscribe(observer);
                *subscription_cl.lock().unwrap() = Some(subscription);
            });
        }
        // Subjects never get an unsubscribe signal from this operator.
        if self.is_subject() {
            signal_sent = true;
        }
        let mut u = Subscriber::new(
            move |v| {
                if !(*notifier_next_called.lock().unwrap()) {
                    o_shared.lock().unwrap().next(v);
                    return;
                }
                // The notifier has fired: signal once, then release the notifier
                // subscription if requested and still stored.
                if !signal_sent {
                    signal_sent = true;
                    tx.send_unsubscribe_signal();
                }
                if unsubscribe_notifier {
                    if let Some(s) = subscription.lock().unwrap().take() {
                        s.unsubscribe();
                    }
                }
            },
            move |observable_error| {
                o_cloned_e.lock().unwrap().error(observable_error);
            },
            move || {
                o_cloned_c.lock().unwrap().complete();
            },
        );
        u.take_wrapped = true;
        self.set_fused(fused, defused);
        let unsubscriber = self.subscribe(u);
        rx.unsubscribe_background_emissions(&self, unsubscriber)
    });
    observable.set_subject_indicator(subject);
    observable.set_fused(fused, defused);
    observable
}
fn take_while<P>(mut self, predicate: P) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let (tx, rx) = setup_unsubscribe_channel();
let mut signal_sent = false;
if self.is_subject() {
signal_sent = true;
}
let mut u = Subscriber::new(
move |v| {
if predicate(&v) {
o_shared.lock().unwrap().next(v);
} else if !signal_sent {
signal_sent = true;
tx.send_unsubscribe_signal();
}
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = true;
self.set_fused(fused, defused);
let unsubscriber = self.subscribe(u);
rx.unsubscribe_background_emissions(&self, unsubscriber)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Returns an observable that emits only the last `count` source values, all
/// at once when the source completes, followed by completion.
///
/// Values are kept in a sliding window of at most `count` elements; with
/// `count == 0` nothing is stored and only completion is forwarded. Source
/// errors are forwarded unchanged. The upstream subscriber is marked
/// `take_wrapped`.
fn take_last(mut self, count: usize) -> Observable<T>
where
    Self: Sized + Send + Sync + 'static,
    T: Send,
{
    let subject = self.is_subject();
    let (fused, defused) = self.get_fused();
    let mut observable = Observable::new(move |o| {
        let last_values_buffer = Arc::new(Mutex::new(VecDeque::with_capacity(count)));
        let last_values_buffer_cl = Arc::clone(&last_values_buffer);
        let fused = o.fused;
        let defused = o.defused;
        let o_shared = Arc::new(Mutex::new(o));
        let o_cloned_e = Arc::clone(&o_shared);
        let mut u = Subscriber::new(
            move |v| {
                if count == 0 {
                    return;
                }
                if let Ok(mut window) = last_values_buffer_cl.lock() {
                    // The window length already says whether the oldest value must
                    // go, so no separate (overflow-prone) counter is needed.
                    if window.len() == count {
                        window.pop_front();
                    }
                    window.push_back(v);
                }
            },
            move |observable_error| {
                o_cloned_e.lock().unwrap().error(observable_error);
            },
            move || {
                if let (Ok(mut o), Ok(mut window)) =
                    (o_shared.lock(), last_values_buffer.lock())
                {
                    // Draining leaves the window empty for any later reuse.
                    while let Some(item) = window.pop_front() {
                        o.next(item);
                    }
                    o.complete();
                }
            },
        );
        u.take_wrapped = true;
        self.set_fused(fused, defused);
        self.subscribe(u)
    });
    observable.set_subject_indicator(subject);
    observable.set_fused(fused, defused);
    observable
}
fn tap(mut self, observer: Subscriber<T>) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
T: Clone,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let observer = Arc::new(Mutex::new(observer));
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let observer = Arc::clone(&observer);
let observer_cl = Arc::clone(&observer);
let observer_e = Arc::clone(&observer);
let mut u = Subscriber::new(
move |v: T| {
if let Ok(mut s) = observer.lock() {
s.next(v.clone());
}
o_shared.lock().unwrap().next(v);
},
move |observable_error| {
if let Ok(mut s) = observer_e.lock() {
s.error(observable_error.clone());
}
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
if let Ok(mut s) = observer_cl.lock() {
s.complete();
}
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn timestamp(mut self) -> Observable<TimestampedEmit<T>>
where
Self: Sized + Send + Sync + 'static,
{
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut u = Subscriber::new(
move |v| {
let timestamped_emit = TimestampedEmit::new(v);
o_shared.lock().unwrap().next(timestamped_emit);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Merges `self` with every observable in `sources`: all of them are
/// subscribed with subscribers that forward to the same downstream subscriber.
///
/// The returned `Subscription` unsubscribes every inner subscription and
/// carries a `JoinSubscriptions` handle over the same collection.
fn merge(mut self, mut sources: Vec<Observable<T>>) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
{
// Builds a subscriber that forwards next/error/complete to the shared
// subscriber `s` and carries the given fused/defused/take_wrapped flags.
fn wrap_subscriber<S: 'static>(
s: Arc<Mutex<Subscriber<S>>>,
is_fused: bool,
is_defused: bool,
is_take_wrapped: bool,
) -> Subscriber<S> {
let s_complete = s.clone();
let s_error = s.clone();
let mut s = Subscriber::new(
move |v| {
s.lock().unwrap().next(v);
},
move |e| {
s_error.lock().unwrap().error(e);
},
move || {
s_complete.lock().unwrap().complete();
},
);
s.take_wrapped = is_take_wrapped;
s.set_fused(is_fused, is_defused);
s
}
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let take_wrapped = o.take_wrapped;
let fused = o.fused;
let defused = o.defused;
let o = Arc::new(Mutex::new(o));
let mut subscriptions = Vec::with_capacity(sources.len());
// Becomes `true` when any inner subscription uses future-based unsubscribe logic.
let mut use_tokio_task = false;
self.set_fused(fused, defused);
// Only the subscriber for `self` inherits `take_wrapped`; the ones for
// `sources` are created with `false`.
let s = self.subscribe(wrap_subscriber(o.clone(), fused, defused, take_wrapped));
if let UnsubscribeLogic::Future(_) = &s.unsubscribe_logic {
use_tokio_task = true;
}
for source in &mut sources {
let wrapped = wrap_subscriber(o.clone(), fused, defused, false);
let subscription = source.subscribe(wrapped);
if let UnsubscribeLogic::Future(_) = &subscription.unsubscribe_logic {
use_tokio_task = true;
}
subscriptions.push(subscription);
}
// A current-thread runtime on the first subscription forces the closure-based
// variant regardless of what was detected above.
if let Ok(handle) = s.runtime_handle.as_ref() {
if let tokio::runtime::RuntimeFlavor::CurrentThread = handle.runtime_flavor() {
use_tokio_task = false;
}
}
subscriptions.push(s);
// `Option` so the collection can be taken exactly once on unsubscribe.
let subscriptions = Arc::new(Mutex::new(Some(subscriptions)));
let sc = Arc::clone(&subscriptions);
if use_tokio_task {
return Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
let subscriptions = subscriptions.lock().unwrap().take();
if let Some(subscriptions) = subscriptions {
for subscription in subscriptions {
subscription.unsubscribe();
}
}
})),
SubscriptionHandle::JoinSubscriptions(SubscriptionCollection::new(sc, true)),
);
}
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
let subscriptions = subscriptions.lock().unwrap().take();
if let Some(subscriptions) = subscriptions {
for subscription in subscriptions {
subscription.unsubscribe();
}
}
})),
SubscriptionHandle::JoinSubscriptions(SubscriptionCollection::new(sc, false)),
)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Merges `self` with a single other observable: both are subscribed with
/// subscribers that forward to the same downstream subscriber.
///
/// The returned `Subscription` unsubscribes both inner subscriptions and
/// carries a `JoinSubscriptions` handle over the same pair.
fn merge_one(mut self, mut source: Observable<T>) -> Observable<T>
where
Self: Sized + Send + Sync + 'static,
{
// Builds a subscriber that forwards next/error/complete to the shared
// subscriber `s` and carries the given fused/defused/take_wrapped flags.
fn wrap_subscriber<S: 'static>(
s: Arc<Mutex<Subscriber<S>>>,
is_fused: bool,
is_defused: bool,
is_take_wrapped: bool,
) -> Subscriber<S> {
let s_complete = s.clone();
let s_error = s.clone();
let mut s = Subscriber::new(
move |v| {
s.lock().unwrap().next(v);
},
move |e| {
s_error.lock().unwrap().error(e);
},
move || {
s_complete.lock().unwrap().complete();
},
);
s.take_wrapped = is_take_wrapped;
s.set_fused(is_fused, is_defused);
s
}
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let take_wrapped = o.take_wrapped;
let fused = o.fused;
let defused = o.defused;
let o = Arc::new(Mutex::new(o));
// Only the subscriber for `self` inherits `take_wrapped`.
let wrapped = wrap_subscriber(o.clone(), fused, defused, take_wrapped);
let wrapped2 = wrap_subscriber(o, fused, defused, false);
// Becomes `true` when either subscription uses future-based unsubscribe logic.
let mut use_tokio_task = false;
self.set_fused(fused, defused);
let s1 = self.subscribe(wrapped);
let s2 = source.subscribe(wrapped2);
match (&s1.unsubscribe_logic, &s2.unsubscribe_logic) {
(UnsubscribeLogic::Future(_), _) | (_, UnsubscribeLogic::Future(_)) => {
use_tokio_task = true;
}
_ => (),
}
// A current-thread runtime on the first subscription forces the closure-based
// variant regardless of what was detected above.
if let Ok(handle) = s1.runtime_handle.as_ref() {
if let tokio::runtime::RuntimeFlavor::CurrentThread = handle.runtime_flavor() {
use_tokio_task = false;
}
}
let subscriptions = vec![s1, s2];
// `Option` so the pair can be taken exactly once on unsubscribe.
let subscriptions = Arc::new(Mutex::new(Some(subscriptions)));
let sc = Arc::clone(&subscriptions);
if use_tokio_task {
return Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
let subscriptions = subscriptions.lock().unwrap().take();
if let Some(subscriptions) = subscriptions {
for subscription in subscriptions {
subscription.unsubscribe();
}
}
})),
SubscriptionHandle::JoinSubscriptions(SubscriptionCollection::new(sc, true)),
);
}
Subscription::new(
UnsubscribeLogic::Logic(Box::new(move || {
let subscriptions = subscriptions.lock().unwrap().take();
if let Some(subscriptions) = subscriptions {
for subscription in subscriptions {
subscription.unsubscribe();
}
}
})),
SubscriptionHandle::JoinSubscriptions(SubscriptionCollection::new(sc, false)),
)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn switch_map<R: 'static, F>(mut self, project: F) -> Observable<R>
where
Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static,
{
let project = Arc::new(Mutex::new(project));
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let project = Arc::clone(&project);
let mut current_subscription: Option<Subscription> = None;
let mut u = Subscriber::new(
move |v| {
let o_shared = Arc::clone(&o_shared);
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let project = Arc::clone(&project);
let mut inner_observable = project.lock().unwrap()(v);
drop(project);
let inner_subscriber = Subscriber::new(
move |k| {
o_shared.lock().unwrap().next(k);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
if let Some(subscription) = current_subscription.take() {
subscription.unsubscribe();
};
let s = inner_observable.subscribe(inner_subscriber);
current_subscription = Some(s);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
fn merge_map<R: 'static, F>(mut self, project: F) -> Observable<R>
where
Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static,
{
let project = Arc::new(Mutex::new(project));
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let project = Arc::clone(&project);
let mut u = Subscriber::new(
move |v| {
let o_shared = Arc::clone(&o_shared);
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let project = Arc::clone(&project);
let mut inner_observable = project.lock().unwrap()(v);
drop(project);
let inner_subscriber = Subscriber::new(
move |k| {
o_shared.lock().unwrap().next(k);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
inner_observable.subscribe(inner_subscriber);
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
/// Maps every source value to an inner observable with `project` and
/// subscribes to the inner observables one after another.
///
/// While an inner observable is running, newly projected inner observables
/// are queued together with their subscribers. When the running inner
/// observable completes, the next queued one is subscribed. If nothing is
/// queued at that moment, the operator becomes idle again and the next
/// projected inner observable is subscribed immediately.
///
/// Errors and completions of the source and of every inner observable are
/// forwarded to the downstream subscriber.
fn concat_map<R: 'static, F>(mut self, project: F) -> Observable<R>
where
    Self: Sized + Send + Sync + 'static,
    F: FnMut(T) -> Observable<R> + Sync + Send + 'static,
{
    let project = Arc::new(Mutex::new(project));
    let subject = self.is_subject();
    let (fused, defused) = self.get_fused();
    let mut observable = Observable::new(move |o| {
        let fused = o.fused;
        let defused = o.defused;
        let take_wrapped = o.take_wrapped;
        let o_shared = Arc::new(Mutex::new(o));
        let o_cloned_e = Arc::clone(&o_shared);
        let o_cloned_c = Arc::clone(&o_shared);
        let project = Arc::clone(&project);

        // `(inner_active, queue)`: whether an inner observable is currently
        // subscribed, and the inner observables waiting for their turn. Both
        // live behind one mutex so "is something running?" and "enqueue /
        // dequeue" are always decided atomically.
        let state: Arc<Mutex<(bool, PendingObservables<R>)>> =
            Arc::new(Mutex::new((false, VecDeque::new())));

        let mut u = Subscriber::new(
            move |v| {
                let o_shared = Arc::clone(&o_shared);
                let o_cloned_e = Arc::clone(&o_shared);
                let o_cloned_c = Arc::clone(&o_shared);
                let state_on_complete = Arc::clone(&state);

                // The lock on `project` lasts only for this statement.
                let inner_observable = project.lock().unwrap()(v);

                let inner_subscriber = Subscriber::new(
                    move |k| o_shared.lock().unwrap().next(k),
                    move |observable_error| {
                        o_cloned_e.lock().unwrap().error(observable_error);
                    },
                    move || {
                        // NOTE(review): completion is forwarded downstream after
                        // every inner observable, as before; confirm this is intended.
                        o_cloned_c.lock().unwrap().complete();

                        // Pick the successor under the lock, but release the
                        // lock BEFORE subscribing: an inner observable that
                        // completes synchronously re-enters this handler and
                        // would otherwise re-lock the same mutex.
                        let next_inner = {
                            let mut guard = state_on_complete.lock().unwrap();
                            let (inner_active, queue) = &mut *guard;
                            let next_inner = queue.pop_front();
                            // Nothing queued: go idle so the next source value
                            // starts its inner observable right away.
                            *inner_active = next_inner.is_some();
                            next_inner
                        };
                        if let Some((mut io, is)) = next_inner {
                            io.subscribe(is);
                        }
                    },
                );

                // Either claim the "running" slot or queue behind the inner
                // observable that currently holds it.
                let start_now = {
                    let mut guard = state.lock().unwrap();
                    let (inner_active, queue) = &mut *guard;
                    if *inner_active {
                        queue.push_back((inner_observable, inner_subscriber));
                        None
                    } else {
                        *inner_active = true;
                        Some((inner_observable, inner_subscriber))
                    }
                };
                // Subscribed outside the lock for the same re-entrancy reason.
                if let Some((mut io, is)) = start_now {
                    io.subscribe(is);
                }
            },
            move |observable_error| {
                o_cloned_e.lock().unwrap().error(observable_error);
            },
            move || {
                o_cloned_c.lock().unwrap().complete();
            },
        );
        u.take_wrapped = take_wrapped;
        self.set_fused(fused, defused);
        self.subscribe(u)
    });
    observable.set_subject_indicator(subject);
    observable.set_fused(fused, defused);
    observable
}
fn exhaust_map<R: 'static, F>(mut self, project: F) -> Observable<R>
where
Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static,
{
let project = Arc::new(Mutex::new(project));
let subject = self.is_subject();
let (fused, defused) = self.get_fused();
let mut observable = Observable::new(move |o| {
let fused = o.fused;
let defused = o.defused;
let take_wrapped = o.take_wrapped;
let o_shared = Arc::new(Mutex::new(o));
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let project = Arc::clone(&project);
let active_subscription = Arc::new(Mutex::new(false));
let guard = Arc::new(Mutex::new(true));
let mut u = Subscriber::new(
move |v| {
let as_cloned = Arc::clone(&active_subscription);
let as_cloned2 = Arc::clone(&active_subscription);
let project = Arc::clone(&project);
let _guard = guard.lock().unwrap();
let is_previous_subscription_active = *as_cloned.lock().unwrap();
let o_shared = Arc::clone(&o_shared);
let o_cloned_e = Arc::clone(&o_shared);
let o_cloned_c = Arc::clone(&o_shared);
let mut inner_observable = project.lock().unwrap()(v);
drop(project);
let inner_subscriber = Subscriber::new(
move |k| o_shared.lock().unwrap().next(k),
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
*as_cloned2.lock().unwrap() = false;
},
);
if !is_previous_subscription_active {
*as_cloned.lock().unwrap() = true;
inner_observable.subscribe(inner_subscriber);
}
},
move |observable_error| {
o_cloned_e.lock().unwrap().error(observable_error);
},
move || {
o_cloned_c.lock().unwrap().complete();
},
);
u.take_wrapped = take_wrapped;
self.set_fused(fused, defused);
self.subscribe(u)
});
observable.set_subject_indicator(subject);
observable.set_fused(fused, defused);
observable
}
}
/// Accessors for the `fused` / `defused` flags stored on an `Observable`.
impl<T> crate::subscription::subscribe::Fuse for Observable<T> {
    /// Overwrites both flags with the given values.
    fn set_fused(&mut self, fused: bool, defused: bool) {
        self.defused = defused;
        self.fused = fused;
    }

    /// Returns the current flags as `(fused, defused)`.
    fn get_fused(&self) -> (bool, bool) {
        let Self { fused, defused, .. } = self;
        (*fused, *defused)
    }
}
impl<T: 'static> Subscribeable for Observable<T> {
    type ObsType = T;

    /// Reconciles the fuse flags of the observable and the subscriber, then
    /// runs the stored subscribe function with the subscriber.
    ///
    /// The observable adopts the subscriber's flags when the subscriber is
    /// defused, or when it is fused while the observable is not. Otherwise
    /// the subscriber receives the observable's flags.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the subscribe function is poisoned.
    fn subscribe(&mut self, mut v: Subscriber<Self::ObsType>) -> Subscription {
        let adopt_subscriber_flags = match v.get_fused() {
            (_, true) => true,
            (true, false) => !self.fused,
            (false, false) => false,
        };
        if adopt_subscriber_flags {
            self.fused = v.fused;
            self.defused = v.defused;
        } else {
            v.set_fused(self.fused, self.defused);
        }
        (self.subscribe_fn.lock().unwrap())(v)
    }

    /// Returns the stored subject indicator.
    fn is_subject(&self) -> bool {
        self.subject
    }

    /// Stores the subject indicator.
    fn set_subject_indicator(&mut self, s: bool) {
        self.subject = s;
    }
}
/// Blanket implementation: every `Subscribeable` type automatically gets the
/// `ObservableExt` methods for its item type.
impl<O, T> ObservableExt<T> for O
where
    T: 'static,
    O: Subscribeable<ObsType = T>,
{
}