use super::{Observable, boxed_observable::BoxedObservable};
use crate::{
disposable::subscription::Subscription,
observable::cloneable_boxed_observable::CloneableBoxedObservable,
observer::{
Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
},
operators::{
backpressure::{
on_backpressure::OnBackpressure, on_backpressure_buffer::OnBackpressureBuffer,
on_backpressure_latest::OnBackpressureLatest,
},
combining::{
combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
},
conditional_boolean::{
all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
take_until::TakeUntil, take_while::TakeWhile,
},
connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
error_handling::{
catch::Catch,
retry::{Retry, RetryAction},
},
filtering::{
debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
take_last::TakeLast, throttle::Throttle,
},
mathematical_aggregate::{
average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
},
others::{
debug::{Debug, DebugEvent, DefaultPrintType},
hook_on_next::HookOnNext,
hook_on_subscription::HookOnSubscription,
hook_on_termination::HookOnTermination,
map_infallible_to_error::MapInfallibleToError,
map_infallible_to_value::MapInfallibleToValue,
},
transforming::{
buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
window::Window, window_with_count::WindowWithCount,
},
utility::{
delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
do_before_termination::DoBeforeTermination, materialize::Materialize,
observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
timeout::Timeout, timestamp::Timestamp,
},
},
subject::{
async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
},
utils::types::NecessarySendSync,
};
use std::{fmt::Display, num::NonZeroUsize, time::Duration};
#[cfg(feature = "futures")]
use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
/// Extension trait that adds operator-style builder methods to any [`Observable`].
///
/// Every method consumes `self`. Most are one-line constructors: they hand `self` and their
/// arguments to the `new` function of the operator type named in the return type and return
/// the result. What an operator does once it is subscribed to is defined by that operator
/// type, not by this trait.
///
/// `'or`, `'sub`, `T` and `E` are forwarded unchanged to the [`Observable`] supertrait bound.
/// Judging by how they are used below, `T` is the item type and `E` is the error type carried
/// by [`Termination`] — confirm against the `Observable` definition.
pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
/// Builds an [`All`] operator from this observable and the predicate `callback`
/// (`FnMut(T) -> bool`).
fn all<F>(self, callback: F) -> All<T, Self, F>
where
F: FnMut(T) -> bool,
{
All::new(self, callback)
}
/// Builds an [`Amb`] operator over the two-element array `[self, other]`.
///
/// Because the sources are stored in an array, `other` must have the same concrete type as
/// `self`.
fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
Amb::new([self, other])
}
/// Builds an [`Average`] operator over this observable.
fn average(self) -> Average<T, Self> {
Average::new(self)
}
/// Builds a [`Buffer`] operator from this observable and `boundary`.
///
/// `boundary` must be an observable of `()` with the same error type `E`.
fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
where
OE1: Observable<'or, 'sub, (), E>,
{
Buffer::new(self, boundary)
}
/// Builds a [`BufferWithCount`] operator from this observable and `count`.
///
/// `count` is a `NonZeroUsize`, so a count of zero cannot be expressed.
fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
BufferWithCount::new(self, count)
}
/// Builds a [`BufferWithTime`] operator from this observable, `time_span`, `scheduler` and
/// an optional `delay`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn buffer_with_time<S>(
self,
time_span: Duration,
scheduler: S,
delay: Option<Duration>,
) -> BufferWithTime<Self, S> {
BufferWithTime::new(self, time_span, scheduler, delay)
}
/// Builds a [`BufferWithTimeOrCount`] operator from this observable, a non-zero `count`,
/// `time_span`, `scheduler` and an optional `delay`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn buffer_with_time_or_count<S>(
self,
count: NonZeroUsize,
time_span: Duration,
scheduler: S,
delay: Option<Duration>,
) -> BufferWithTimeOrCount<Self, S> {
BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
}
/// Builds a [`Catch`] operator from this observable and `callback`.
///
/// `callback` is a one-shot function from an error `E` to another observable `OE1`, which
/// emits the same item type `T` but may use a different error type `E1`.
fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
where
OE1: Observable<'or, 'sub, T, E1>,
F: FnOnce(E) -> OE1,
{
Catch::new(self, callback)
}
/// Builds a [`CombineLatest`] operator from this observable and `another_source`.
///
/// `another_source` may emit a different item type `T1` but must share the error type `E`.
fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
where
OE2: Observable<'or, 'sub, T1, E>,
{
CombineLatest::new(self, another_source)
}
/// Builds a [`ConcatAll`] operator over this observable.
///
/// Only callable when the item type `T` is itself an observable (of `T1`, with the same
/// error type `E`).
fn concat_all<T1>(self) -> ConcatAll<Self, T>
where
T: Observable<'or, 'sub, T1, E>,
{
ConcatAll::new(self)
}
/// Builds a [`ConcatMap`] operator from this observable and `callback`.
///
/// `callback` maps a `T` to an observable `OE1` of `T1` with the same error type `E`.
fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
where
OE1: Observable<'or, 'sub, T1, E>,
F: FnMut(T) -> OE1,
{
ConcatMap::new(self, callback)
}
/// Builds a [`Concat`] operator from this observable and `source_2`.
///
/// `source_2` must have the same item type `T` and error type `E`.
fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
where
OE2: Observable<'or, 'sub, T, E>,
{
Concat::new(self, source_2)
}
/// Builds a [`Contains`] operator from this observable and the value `item`.
fn contains(self, item: T) -> Contains<T, Self> {
Contains::new(self, item)
}
/// Builds a [`Count`] operator over this observable.
fn count(self) -> Count<T, Self> {
Count::new(self)
}
/// Builds a [`Debounce`] operator from this observable, `time_span` and `scheduler`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
Debounce::new(self, time_span, scheduler)
}
/// Builds a [`Debug`] operator from this observable, a caller-supplied `context` and
/// `callback`.
///
/// `callback` receives the context value together with a [`DebugEvent`].
fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
where
F: Fn(C, DebugEvent<'_, T, E>),
{
Debug::new(self, context, callback)
}
/// Builds a [`Debug`] operator through `Debug::new_default_print`, with `label` in the
/// context position.
///
/// Requires `label` to implement `Display` and both `T` and `E` to implement
/// `std::fmt::Debug`.
fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
where
L: Display,
T: std::fmt::Debug,
E: std::fmt::Debug,
{
Debug::new_default_print(self, label)
}
/// Builds a [`DefaultIfEmpty`] operator from this observable and `default_value`.
fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
DefaultIfEmpty::new(self, default_value)
}
/// Builds a [`Delay`] operator from this observable, `delay` and `scheduler`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
Delay::new(self, delay, scheduler)
}
/// Builds a [`Dematerialize`] operator over this observable.
fn dematerialize(self) -> Dematerialize<Self> {
Dematerialize::new(self)
}
/// Builds a [`Distinct`] operator over this observable using `Distinct::new`.
///
/// The key-selector slot of the returned type is `fn(&T) -> T` and `T: Clone` is required;
/// presumably the item itself is cloned and used as the key — confirm in `Distinct::new`.
fn distinct(self) -> Distinct<Self, fn(&T) -> T>
where
T: Clone,
{
Distinct::new(self)
}
/// Builds a [`Distinct`] operator through `Distinct::new_with_key_selector`.
///
/// `key_selector` derives a key of type `K` from a borrowed item.
fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
where
F: FnMut(&T) -> K,
{
Distinct::new_with_key_selector(self, key_selector)
}
/// Builds a [`DistinctUntilChanged`] operator over this observable using
/// `DistinctUntilChanged::new`.
///
/// The key-selector slot of the returned type is `fn(&T) -> T` and `T: Clone` is required;
/// presumably the item itself is cloned and used as the key — confirm in
/// `DistinctUntilChanged::new`.
fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
where
T: Clone,
{
DistinctUntilChanged::new(self)
}
/// Builds a [`DistinctUntilChanged`] operator through
/// `DistinctUntilChanged::new_with_key_selector`.
///
/// `key_selector` derives a key of type `K` from a borrowed item.
fn distinct_until_changed_with_key_selector<F, K>(
self,
key_selector: F,
) -> DistinctUntilChanged<Self, F>
where
F: FnMut(&T) -> K,
{
DistinctUntilChanged::new_with_key_selector(self, key_selector)
}
/// Builds a [`DoAfterDisposal`] operator from this observable and a one-shot, zero-argument
/// `callback`.
fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
where
F: FnOnce(),
{
DoAfterDisposal::new(self, callback)
}
/// Builds a [`DoAfterNext`] operator from this observable and `callback`.
///
/// Unlike [`do_before_next`](Self::do_before_next), the callback here takes the item `T` by
/// value.
fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
where
F: FnMut(T),
{
DoAfterNext::new(self, callback)
}
/// Builds a [`DoAfterSubscription`] operator from this observable and a one-shot,
/// zero-argument `callback`.
fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
where
F: FnOnce(),
{
DoAfterSubscription::new(self, callback)
}
/// Builds a [`DoAfterTermination`] operator from this observable and `callback`.
///
/// `callback` is one-shot and takes the [`Termination`] by value.
fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
where
F: FnOnce(Termination<E>),
{
DoAfterTermination::new(self, callback)
}
/// Builds a [`DoBeforeDisposal`] operator from this observable and a one-shot,
/// zero-argument `callback`.
fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
where
F: FnOnce(),
{
DoBeforeDisposal::new(self, callback)
}
/// Builds a [`DoBeforeNext`] operator from this observable and `callback`.
///
/// `callback` only borrows the item (`&T`).
fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
where
F: FnMut(&T),
{
DoBeforeNext::new(self, callback)
}
/// Builds a [`DoBeforeSubscription`] operator from this observable and a one-shot,
/// zero-argument `callback`.
fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
where
F: FnOnce(),
{
DoBeforeSubscription::new(self, callback)
}
/// Builds a [`DoBeforeTermination`] operator from this observable and `callback`.
///
/// `callback` is one-shot and only borrows the [`Termination`].
fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
where
F: FnOnce(&Termination<E>),
{
DoBeforeTermination::new(self, callback)
}
/// Builds an [`ElementAt`] operator from this observable and `index`.
fn element_at(self, index: usize) -> ElementAt<Self> {
ElementAt::new(self, index)
}
/// Builds a [`Filter`] operator from this observable and the predicate `callback`, which
/// inspects a borrowed item and returns a `bool`.
fn filter<F>(self, callback: F) -> Filter<Self, F>
where
F: FnMut(&T) -> bool,
{
Filter::new(self, callback)
}
/// Builds a [`First`] operator over this observable.
fn first(self) -> First<Self> {
First::new(self)
}
/// Builds a [`FlatMap`] operator from this observable and `callback`.
///
/// `callback` maps a `T` to an observable `OE1` of `T1` with the same error type `E`.
fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
where
OE1: Observable<'or, 'sub, T1, E>,
F: FnMut(T) -> OE1,
{
FlatMap::new(self, callback)
}
/// Builds a [`GroupBy`] operator from this observable and `callback`, which maps a `T` to a
/// key of type `K`.
fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
where
F: FnMut(T) -> K,
{
GroupBy::new(self, callback)
}
/// Builds a [`HookOnNext`] operator from this observable and `callback`.
///
/// `callback` receives a mutable reference to an [`Observer`] trait object together with
/// the item `T`.
fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
where
F: FnMut(&mut dyn Observer<T, E>, T),
{
HookOnNext::new(self, callback)
}
/// Builds a [`HookOnSubscription`] operator from this observable and `callback`.
///
/// `callback` is one-shot; it receives a value of this observable's type plus a
/// [`BoxedObserver`] and must return a [`Subscription`].
fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
where
F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
{
HookOnSubscription::new(self, callback)
}
/// Builds a [`HookOnTermination`] operator from this observable and `callback`.
///
/// `callback` is one-shot and receives a [`BoxedObserver`] together with the
/// [`Termination`].
fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
where
F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
{
HookOnTermination::new(self, callback)
}
/// Builds an [`IgnoreElements`] operator over this observable.
fn ignore_elements(self) -> IgnoreElements<Self> {
IgnoreElements::new(self)
}
/// Wraps this observable in a [`BoxedObservable`].
///
/// Requires `T: 'or`, `E: 'or` and `Self: NecessarySendSync + 'oe`.
fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
where
T: 'or,
E: 'or,
Self: NecessarySendSync + 'oe,
{
BoxedObservable::new(self)
}
/// Wraps this observable in a [`CloneableBoxedObservable`].
///
/// Same bounds as [`into_boxed`](Self::into_boxed), plus `Self: Clone`.
fn into_cloneable_boxed<'oe>(self) -> CloneableBoxedObservable<'or, 'sub, 'oe, T, E>
where
T: 'or,
E: 'or,
Self: NecessarySendSync + Clone + 'oe,
{
CloneableBoxedObservable::new(self)
}
/// Wraps this observable in an `ObservableStream`.
///
/// Only compiled when the `futures` feature is enabled, and only callable when this
/// observable's error type is `Infallible`.
#[cfg(feature = "futures")]
fn into_stream(self) -> ObservableStream<'sub, T, Self>
where
Self: Observable<'or, 'sub, T, Infallible>,
{
ObservableStream::new(self)
}
/// Builds a [`Last`] operator over this observable.
fn last(self) -> Last<Self> {
Last::new(self)
}
/// Builds a [`Map`] operator from this observable and `callback`, which maps a `T` to a
/// `T1`.
fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
where
F: FnMut(T) -> T1,
{
Map::new(self, callback)
}
/// Builds a [`MapInfallibleToError`] operator parameterised by the error type `E1`.
///
/// NOTE(review): the name suggests this is meant for sources whose error type is
/// `Infallible`; no such bound appears in this signature — confirm on the operator.
fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
MapInfallibleToError::new(self)
}
/// Builds a [`MapInfallibleToValue`] operator parameterised by the value type `V1`.
///
/// NOTE(review): the name suggests this is meant for sources whose item type is
/// `Infallible`; no such bound appears in this signature — confirm on the operator.
fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
MapInfallibleToValue::new(self)
}
/// Builds a [`Materialize`] operator over this observable.
fn materialize(self) -> Materialize<Self> {
Materialize::new(self)
}
/// Builds a [`Max`] operator over this observable.
fn max(self) -> Max<Self> {
Max::new(self)
}
/// Builds a [`MergeAll`] operator over this observable.
///
/// Only callable when the item type `T` is itself an observable (of `T1`, with the same
/// error type `E`).
fn merge_all<T1>(self) -> MergeAll<Self, T>
where
T: Observable<'or, 'sub, T1, E>,
{
MergeAll::new(self)
}
/// Builds a [`Merge`] operator from this observable and `source_2`.
///
/// `source_2` must have the same item type `T` and error type `E`.
fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
where
OE2: Observable<'or, 'sub, T, E>,
{
Merge::new(self, source_2)
}
/// Builds a [`Min`] operator over this observable.
fn min(self) -> Min<Self> {
Min::new(self)
}
/// Builds a [`ConnectableObservable`] from this observable and the subject produced by
/// `subject_maker`.
///
/// `subject_maker` is invoked immediately, inside this call, not deferred to a later point.
fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
where
F: FnOnce() -> S,
{
ConnectableObservable::new(self, subject_maker())
}
/// Builds an [`ObserveOn`] operator from this observable and `scheduler`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
ObserveOn::new(self, scheduler)
}
/// Builds an [`OnBackpressure`] operator from this observable and `receiving_strategy`.
///
/// `receiving_strategy` is given a `&mut Vec<T>` and a `T`; presumably these are the pending
/// buffer and the incoming item — confirm on the operator.
fn on_backpressure<F>(self, receiving_strategy: F) -> OnBackpressure<Self, F>
where
F: FnMut(&mut Vec<T>, T),
{
OnBackpressure::new(self, receiving_strategy)
}
/// Builds an [`OnBackpressureBuffer`] operator over this observable.
fn on_backpressure_buffer(self) -> OnBackpressureBuffer<Self> {
OnBackpressureBuffer::new(self)
}
/// Builds an [`OnBackpressureLatest`] operator over this observable.
fn on_backpressure_latest(self) -> OnBackpressureLatest<Self> {
OnBackpressureLatest::new(self)
}
/// Shorthand for [`multicast`](Self::multicast) with a default-constructed
/// [`PublishSubject`].
fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
self.multicast(PublishSubject::default)
}
/// Shorthand for [`multicast`](Self::multicast) with a default-constructed
/// [`AsyncSubject`].
fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
self.multicast(AsyncSubject::default)
}
/// Builds a [`Reduce`] operator from this observable, `initial_value` and `callback`.
///
/// `callback` combines an accumulator of type `T0` with an item `T` and returns a new `T0`.
fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
where
F: FnMut(T0, T) -> T0,
{
Reduce::new(self, initial_value, callback)
}
/// Shorthand for [`multicast`](Self::multicast) with a [`ReplaySubject`] created by
/// `ReplaySubject::new(buffer_size)`.
///
/// The meaning of `None` for `buffer_size` is decided by `ReplaySubject::new`.
fn replay(
self,
buffer_size: Option<usize>,
) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
self.multicast(|| ReplaySubject::new(buffer_size))
}
/// Builds a [`Retry`] operator from this observable and `callback`.
///
/// `callback` maps an error `E` to a [`RetryAction`] parameterised by `E` and an observable
/// type `OE1` that has the same item and error types as this one.
fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
where
OE1: Observable<'or, 'sub, T, E>,
F: FnMut(E) -> RetryAction<E, OE1>,
{
Retry::new(self, callback)
}
/// Builds a [`Sample`] operator from this observable and `sampler`.
///
/// `sampler` must be an observable of `()` with the same error type `E`.
fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
where
OE1: Observable<'or, 'sub, (), E>,
{
Sample::new(self, sampler)
}
/// Builds a [`Scan`] operator from this observable, `initial_value` and `callback`.
///
/// `callback` combines an accumulator of type `T0` with an item `T` and returns a new `T0`.
fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
where
F: FnMut(T0, T) -> T0,
{
Scan::new(self, initial_value, callback)
}
/// Builds a [`SequenceEqual`] operator from this observable and `another_source`.
///
/// `another_source` must have the same item type `T` and error type `E`.
fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
where
OE2: Observable<'or, 'sub, T, E>,
{
SequenceEqual::new(self, another_source)
}
/// Shorthand for `self.publish().ref_count()`, yielding a [`RefCount`] over a
/// [`PublishSubject`].
fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
self.publish().ref_count()
}
/// Shorthand for `self.publish_last().ref_count()`, yielding a [`RefCount`] over an
/// [`AsyncSubject`].
fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
self.publish_last().ref_count()
}
/// Shorthand for `self.replay(buffer_size).ref_count()`, yielding a [`RefCount`] over a
/// [`ReplaySubject`].
fn share_replay(
self,
buffer_size: Option<usize>,
) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
self.replay(buffer_size).ref_count()
}
/// Builds a [`Skip`] operator from this observable and `count`.
fn skip(self, count: usize) -> Skip<Self> {
Skip::new(self, count)
}
/// Builds a [`SkipLast`] operator from this observable and `count`.
fn skip_last(self, count: usize) -> SkipLast<Self> {
SkipLast::new(self, count)
}
/// Builds a [`SkipUntil`] operator from this observable and `start`.
///
/// `start` must be an observable of `()` with the same error type `E`.
fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
where
OE1: Observable<'or, 'sub, (), E>,
{
SkipUntil::new(self, start)
}
/// Builds a [`SkipWhile`] operator from this observable and the predicate `callback`, which
/// inspects a borrowed item and returns a `bool`.
fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
where
F: FnMut(&T) -> bool,
{
SkipWhile::new(self, callback)
}
/// Builds a [`StartWith`] operator from this observable and `values`, any `IntoIterator`
/// whose items are of type `T`.
fn start_with<I>(self, values: I) -> StartWith<Self, I>
where
I: IntoIterator<Item = T>,
{
StartWith::new(self, values)
}
/// Builds a [`SubscribeOn`] operator from this observable and `scheduler`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
SubscribeOn::new(self, scheduler)
}
/// Subscribes to this observable right away with a [`CallbackObserver`] built from
/// `on_next` and `on_termination`, and returns the resulting [`Subscription`].
///
/// Unlike the other methods of this trait, this one does not return an operator. Both
/// callbacks must satisfy `NecessarySendSync + 'or`; `on_termination` is one-shot.
fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
where
T: 'or,
E: 'or,
FN: FnMut(T) + NecessarySendSync + 'or,
FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
{
self.subscribe(CallbackObserver::new(on_next, on_termination))
}
/// Builds a [`Sum`] operator over this observable.
fn sum(self) -> Sum<Self> {
Sum::new(self)
}
/// Builds a [`Switch`] operator over this observable.
///
/// Only callable when the item type `T` is itself an observable (of `T1`, with the same
/// error type `E`).
fn switch<T1>(self) -> Switch<Self, T>
where
T: Observable<'or, 'sub, T1, E>,
{
Switch::new(self)
}
/// Builds a [`SwitchMap`] operator from this observable and `callback`.
///
/// `callback` maps a `T` to an observable `OE1` of `T1` with the same error type `E`.
fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
where
OE1: Observable<'or, 'sub, T1, E>,
F: FnMut(T) -> OE1,
{
SwitchMap::new(self, callback)
}
/// Builds a [`Take`] operator from this observable and `count`.
fn take(self, count: usize) -> Take<Self> {
Take::new(self, count)
}
/// Builds a [`TakeLast`] operator from this observable and `count`.
fn take_last(self, count: usize) -> TakeLast<Self> {
TakeLast::new(self, count)
}
/// Builds a [`TakeUntil`] operator from this observable and `stop`.
///
/// `stop` must be an observable of `()` with the same error type `E`.
fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
where
OE1: Observable<'or, 'sub, (), E>,
{
TakeUntil::new(self, stop)
}
/// Builds a [`TakeWhile`] operator from this observable and the predicate `callback`, which
/// inspects a borrowed item and returns a `bool`.
fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
where
F: FnMut(&T) -> bool,
{
TakeWhile::new(self, callback)
}
/// Builds a [`Throttle`] operator from this observable, `time_span` and `scheduler`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
Throttle::new(self, time_span, scheduler)
}
/// Builds a [`TimeInterval`] operator over this observable.
fn time_interval(self) -> TimeInterval<Self> {
TimeInterval::new(self)
}
/// Builds a [`Timeout`] operator from this observable, `duration` and `scheduler`.
///
/// No bound is placed on the scheduler type `S` in this signature.
fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
Timeout::new(self, duration, scheduler)
}
/// Builds a [`Timestamp`] operator over this observable.
fn timestamp(self) -> Timestamp<Self> {
Timestamp::new(self)
}
/// Builds a [`Window`] operator from this observable and `boundary`.
///
/// `boundary` must be an observable of `()` with the same error type `E`.
fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
where
OE1: Observable<'or, 'sub, (), E>,
{
Window::new(self, boundary)
}
/// Builds a [`WindowWithCount`] operator from this observable and `count`.
///
/// `count` is a `NonZeroUsize`, so a count of zero cannot be expressed.
fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
WindowWithCount::new(self, count)
}
/// Builds a [`Zip`] operator from this observable and `another_source`.
///
/// `another_source` may emit a different item type `T1` but must share the error type `E`.
fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
where
OE2: Observable<'or, 'sub, T1, E>,
{
Zip::new(self, another_source)
}
}
// Blanket implementation: every type that implements `Observable` picks up all of the
// `ObservableExt` methods automatically, so the trait never has to be implemented by hand.
impl<'or, 'sub, T, E, O> ObservableExt<'or, 'sub, T, E> for O
where
    O: Observable<'or, 'sub, T, E>,
{
}