1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3 disposable::subscription::Subscription,
4 observer::{
5 Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
6 },
7 operators::{
8 combining::{
9 combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
10 merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
11 },
12 conditional_boolean::{
13 all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
14 sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
15 take_until::TakeUntil, take_while::TakeWhile,
16 },
17 connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
18 error_handling::{
19 catch::Catch,
20 retry::{Retry, RetryAction},
21 },
22 filtering::{
23 debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
24 element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
25 last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
26 take_last::TakeLast, throttle::Throttle,
27 },
28 mathematical_aggregate::{
29 average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
30 },
31 others::{
32 debug::Debug, hook_on_next::HookOnNext, hook_on_subscription::HookOnSubscription,
33 hook_on_termination::HookOnTermination, map_infallible_to_error::MapInfallibleToError,
34 map_infallible_to_value::MapInfallibleToValue,
35 },
36 transforming::{
37 buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
38 buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
39 flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
40 window::Window, window_with_count::WindowWithCount,
41 },
42 utility::{
43 delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
44 do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
45 do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
46 do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
47 do_before_termination::DoBeforeTermination, materialize::Materialize,
48 observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
49 timeout::Timeout, timestamp::Timestamp,
50 },
51 },
52 subject::{
53 async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
54 },
55 utils::types::NecessarySend,
56};
57use std::{num::NonZeroUsize, time::Duration};
58#[cfg(feature = "futures")]
59use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
60
61pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
65 fn all<F>(self, callback: F) -> All<T, Self, F>
67 where
68 F: FnMut(T) -> bool,
69 {
70 All::new(self, callback)
71 }
72
73 fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
75 Amb::new([self, other])
76 }
77
78 fn average(self) -> Average<T, Self> {
80 Average::new(self)
81 }
82
83 fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
85 where
86 OE1: Observable<'or, 'sub, (), E>,
87 {
88 Buffer::new(self, boundary)
89 }
90
91 fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
93 BufferWithCount::new(self, count)
94 }
95
96 fn buffer_with_time<S>(
98 self,
99 time_span: Duration,
100 scheduler: S,
101 delay: Option<Duration>,
102 ) -> BufferWithTime<Self, S> {
103 BufferWithTime::new(self, time_span, scheduler, delay)
104 }
105
106 fn buffer_with_time_or_count<S>(
108 self,
109 count: NonZeroUsize,
110 time_span: Duration,
111 scheduler: S,
112 delay: Option<Duration>,
113 ) -> BufferWithTimeOrCount<Self, S> {
114 BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
115 }
116
117 fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
119 where
120 OE1: Observable<'or, 'sub, T, E1>,
121 F: FnOnce(E) -> OE1,
122 {
123 Catch::new(self, callback)
124 }
125
126 fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
128 where
129 OE2: Observable<'or, 'sub, T1, E>,
130 {
131 CombineLatest::new(self, another_source)
132 }
133
134 fn concat_all<T1>(self) -> ConcatAll<Self, T>
136 where
137 T: Observable<'or, 'sub, T1, E>,
138 {
139 ConcatAll::new(self)
140 }
141
142 fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
144 where
145 OE1: Observable<'or, 'sub, T1, E>,
146 F: FnMut(T) -> OE1,
147 {
148 ConcatMap::new(self, callback)
149 }
150
151 fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
153 where
154 OE2: Observable<'or, 'sub, T, E>,
155 {
156 Concat::new(self, source_2)
157 }
158
159 fn contains(self, item: T) -> Contains<T, Self> {
161 Contains::new(self, item)
162 }
163
164 fn count(self) -> Count<T, Self> {
166 Count::new(self)
167 }
168
169 fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
171 Debounce::new(self, time_span, scheduler)
172 }
173
174 fn debug<D>(self, label: D) -> Debug<Self, D> {
176 Debug::new(self, label)
177 }
178
179 fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
181 DefaultIfEmpty::new(self, default_value)
182 }
183
184 fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
186 Delay::new(self, delay, scheduler)
187 }
188
189 fn dematerialize(self) -> Dematerialize<Self> {
191 Dematerialize::new(self)
192 }
193
194 fn distinct(self) -> Distinct<Self, fn(&T) -> T>
196 where
197 T: Clone,
198 {
199 Distinct::new(self)
200 }
201
202 fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
204 where
205 F: FnMut(&T) -> K,
206 {
207 Distinct::new_with_key_selector(self, key_selector)
208 }
209
210 fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
212 where
213 T: Clone,
214 {
215 DistinctUntilChanged::new(self)
216 }
217
218 fn distinct_until_changed_with_key_selector<F, K>(
220 self,
221 key_selector: F,
222 ) -> DistinctUntilChanged<Self, F>
223 where
224 F: FnMut(&T) -> K,
225 {
226 DistinctUntilChanged::new_with_key_selector(self, key_selector)
227 }
228
229 fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
231 where
232 F: FnOnce(),
233 {
234 DoAfterDisposal::new(self, callback)
235 }
236
237 fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
239 where
240 F: FnMut(T),
241 {
242 DoAfterNext::new(self, callback)
243 }
244
245 fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
247 where
248 F: FnOnce(),
249 {
250 DoAfterSubscription::new(self, callback)
251 }
252
253 fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
255 where
256 F: FnOnce(Termination<E>),
257 {
258 DoAfterTermination::new(self, callback)
259 }
260
261 fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
263 where
264 F: FnOnce(),
265 {
266 DoBeforeDisposal::new(self, callback)
267 }
268
269 fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
271 where
272 F: FnMut(&T),
273 {
274 DoBeforeNext::new(self, callback)
275 }
276
277 fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
279 where
280 F: FnOnce(),
281 {
282 DoBeforeSubscription::new(self, callback)
283 }
284
285 fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
287 where
288 F: FnOnce(&Termination<E>),
289 {
290 DoBeforeTermination::new(self, callback)
291 }
292
293 fn element_at(self, index: usize) -> ElementAt<Self> {
295 ElementAt::new(self, index)
296 }
297
298 fn filter<F>(self, callback: F) -> Filter<Self, F>
300 where
301 F: FnMut(&T) -> bool,
302 {
303 Filter::new(self, callback)
304 }
305
306 fn first(self) -> First<Self> {
308 First::new(self)
309 }
310
311 fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
313 where
314 OE1: Observable<'or, 'sub, T1, E>,
315 F: FnMut(T) -> OE1,
316 {
317 FlatMap::new(self, callback)
318 }
319
320 fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
322 where
323 F: FnMut(T) -> K,
324 {
325 GroupBy::new(self, callback)
326 }
327
328 fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
330 where
331 F: FnMut(&mut dyn Observer<T, E>, T),
332 {
333 HookOnNext::new(self, callback)
334 }
335
336 fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
338 where
339 F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
340 {
341 HookOnSubscription::new(self, callback)
342 }
343
344 fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
346 where
347 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
348 {
349 HookOnTermination::new(self, callback)
350 }
351
352 fn ignore_elements(self) -> IgnoreElements<Self> {
354 IgnoreElements::new(self)
355 }
356
357 fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
359 where
360 T: 'or,
361 E: 'or,
362 Self: NecessarySend + 'oe,
363 {
364 BoxedObservable::new(self)
365 }
366
367 #[cfg(feature = "futures")]
368 fn into_stream(self) -> ObservableStream<'sub, T, Self>
370 where
371 Self: Observable<'or, 'sub, T, Infallible>,
372 {
373 ObservableStream::new(self)
374 }
375
376 fn last(self) -> Last<Self> {
378 Last::new(self)
379 }
380
381 fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
383 where
384 F: FnMut(T) -> T1,
385 {
386 Map::new(self, callback)
387 }
388
389 fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
391 MapInfallibleToError::new(self)
392 }
393
394 fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
396 MapInfallibleToValue::new(self)
397 }
398
399 fn materialize(self) -> Materialize<Self> {
401 Materialize::new(self)
402 }
403
404 fn max(self) -> Max<Self> {
406 Max::new(self)
407 }
408
409 fn merge_all<T1>(self) -> MergeAll<Self, T>
411 where
412 T: Observable<'or, 'sub, T1, E>,
413 {
414 MergeAll::new(self)
415 }
416
417 fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
419 where
420 OE2: Observable<'or, 'sub, T, E>,
421 {
422 Merge::new(self, source_2)
423 }
424
425 fn min(self) -> Min<Self> {
427 Min::new(self)
428 }
429
430 fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
432 where
433 F: FnOnce() -> S,
434 {
435 ConnectableObservable::new(self, subject_maker())
436 }
437
438 fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
440 ObserveOn::new(self, scheduler)
441 }
442
443 fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
445 self.multicast(PublishSubject::default)
446 }
447
448 fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
450 self.multicast(AsyncSubject::default)
451 }
452
453 fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
455 where
456 F: FnMut(T0, T) -> T0,
457 {
458 Reduce::new(self, initial_value, callback)
459 }
460
461 fn replay(
463 self,
464 buffer_size: Option<usize>,
465 ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
466 self.multicast(|| ReplaySubject::new(buffer_size))
467 }
468
469 fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
471 where
472 OE1: Observable<'or, 'sub, T, E>,
473 F: FnMut(E) -> RetryAction<E, OE1>,
474 {
475 Retry::new(self, callback)
476 }
477
478 fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
480 where
481 OE1: Observable<'or, 'sub, (), E>,
482 {
483 Sample::new(self, sampler)
484 }
485
486 fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
488 where
489 F: FnMut(T0, T) -> T0,
490 {
491 Scan::new(self, initial_value, callback)
492 }
493
494 fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
496 where
497 OE2: Observable<'or, 'sub, T, E>,
498 {
499 SequenceEqual::new(self, another_source)
500 }
501
502 fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
504 self.publish().ref_count()
505 }
506
507 fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
509 self.publish_last().ref_count()
510 }
511
512 fn share_replay(
514 self,
515 buffer_size: Option<usize>,
516 ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
517 self.replay(buffer_size).ref_count()
518 }
519
520 fn skip(self, count: usize) -> Skip<Self> {
522 Skip::new(self, count)
523 }
524
525 fn skip_last(self, count: usize) -> SkipLast<Self> {
527 SkipLast::new(self, count)
528 }
529
530 fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
532 where
533 OE1: Observable<'or, 'sub, (), E>,
534 {
535 SkipUntil::new(self, start)
536 }
537
538 fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
540 where
541 F: FnMut(&T) -> bool,
542 {
543 SkipWhile::new(self, callback)
544 }
545
546 fn start_with<I>(self, values: I) -> StartWith<Self, I>
548 where
549 I: IntoIterator<Item = T>,
550 {
551 StartWith::new(self, values)
552 }
553
554 fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
556 SubscribeOn::new(self, scheduler)
557 }
558
559 fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
561 where
562 T: 'or,
563 E: 'or,
564 FN: FnMut(T) + NecessarySend + 'or,
565 FT: FnOnce(Termination<E>) + NecessarySend + 'or,
566 {
567 self.subscribe(CallbackObserver::new(on_next, on_termination))
568 }
569
570 fn sum(self) -> Sum<Self> {
572 Sum::new(self)
573 }
574
575 fn switch<T1>(self) -> Switch<Self, T>
577 where
578 T: Observable<'or, 'sub, T1, E>,
579 {
580 Switch::new(self)
581 }
582
583 fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
585 where
586 OE1: Observable<'or, 'sub, T1, E>,
587 F: FnMut(T) -> OE1,
588 {
589 SwitchMap::new(self, callback)
590 }
591
592 fn take(self, count: usize) -> Take<Self> {
594 Take::new(self, count)
595 }
596
597 fn take_last(self, count: usize) -> TakeLast<Self> {
599 TakeLast::new(self, count)
600 }
601
602 fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
604 where
605 OE1: Observable<'or, 'sub, (), E>,
606 {
607 TakeUntil::new(self, stop)
608 }
609
610 fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
612 where
613 F: FnMut(&T) -> bool,
614 {
615 TakeWhile::new(self, callback)
616 }
617
618 fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
620 Throttle::new(self, time_span, scheduler)
621 }
622
623 fn time_interval(self) -> TimeInterval<Self> {
625 TimeInterval::new(self)
626 }
627
628 fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
630 Timeout::new(self, duration, scheduler)
631 }
632
633 fn timestamp(self) -> Timestamp<Self> {
635 Timestamp::new(self)
636 }
637
638 fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
640 where
641 OE1: Observable<'or, 'sub, (), E>,
642 {
643 Window::new(self, boundary)
644 }
645
646 fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
648 WindowWithCount::new(self, count)
649 }
650
651 fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
653 where
654 OE2: Observable<'or, 'sub, T1, E>,
655 {
656 Zip::new(self, another_source)
657 }
658}
659
660impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
661{}