use super::{Observable, boxed_observable::BoxedObservable};
use crate::{
    disposable::subscription::Subscription,
    observer::{
        Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
    },
    operators::{
        combining::{
            combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
            merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
        },
        conditional_boolean::{
            all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
            sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
            take_until::TakeUntil, take_while::TakeWhile,
        },
        connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
        error_handling::{
            catch::Catch,
            retry::{Retry, RetryAction},
        },
        filtering::{
            debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
            element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
            last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
            take_last::TakeLast, throttle::Throttle,
        },
        mathematical_aggregate::{
            average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
        },
        others::{
            debug::{Debug, DefaultPrintType},
            hook_on_next::HookOnNext,
            hook_on_subscription::HookOnSubscription,
            hook_on_termination::HookOnTermination,
            map_infallible_to_error::MapInfallibleToError,
            map_infallible_to_value::MapInfallibleToValue,
        },
        transforming::{
            buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
            buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
            flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
            window::Window, window_with_count::WindowWithCount,
        },
        utility::{
            delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
            do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
            do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
            do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
            do_before_termination::DoBeforeTermination, materialize::Materialize,
            observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
            timeout::Timeout, timestamp::Timestamp,
        },
    },
    subject::{
        async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
    },
    utils::types::NecessarySendSync,
};
use std::{fmt::Display, num::NonZeroUsize, time::Duration};
#[cfg(feature = "futures")]
use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
63
64pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
68 fn all<F>(self, callback: F) -> All<T, Self, F>
70 where
71 F: FnMut(T) -> bool,
72 {
73 All::new(self, callback)
74 }
75
76 fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
78 Amb::new([self, other])
79 }
80
81 fn average(self) -> Average<T, Self> {
83 Average::new(self)
84 }
85
86 fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
88 where
89 OE1: Observable<'or, 'sub, (), E>,
90 {
91 Buffer::new(self, boundary)
92 }
93
94 fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
96 BufferWithCount::new(self, count)
97 }
98
99 fn buffer_with_time<S>(
101 self,
102 time_span: Duration,
103 scheduler: S,
104 delay: Option<Duration>,
105 ) -> BufferWithTime<Self, S> {
106 BufferWithTime::new(self, time_span, scheduler, delay)
107 }
108
109 fn buffer_with_time_or_count<S>(
111 self,
112 count: NonZeroUsize,
113 time_span: Duration,
114 scheduler: S,
115 delay: Option<Duration>,
116 ) -> BufferWithTimeOrCount<Self, S> {
117 BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
118 }
119
120 fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
122 where
123 OE1: Observable<'or, 'sub, T, E1>,
124 F: FnOnce(E) -> OE1,
125 {
126 Catch::new(self, callback)
127 }
128
129 fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
131 where
132 OE2: Observable<'or, 'sub, T1, E>,
133 {
134 CombineLatest::new(self, another_source)
135 }
136
137 fn concat_all<T1>(self) -> ConcatAll<Self, T>
139 where
140 T: Observable<'or, 'sub, T1, E>,
141 {
142 ConcatAll::new(self)
143 }
144
145 fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
147 where
148 OE1: Observable<'or, 'sub, T1, E>,
149 F: FnMut(T) -> OE1,
150 {
151 ConcatMap::new(self, callback)
152 }
153
154 fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
156 where
157 OE2: Observable<'or, 'sub, T, E>,
158 {
159 Concat::new(self, source_2)
160 }
161
162 fn contains(self, item: T) -> Contains<T, Self> {
164 Contains::new(self, item)
165 }
166
167 fn count(self) -> Count<T, Self> {
169 Count::new(self)
170 }
171
172 fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
174 Debounce::new(self, time_span, scheduler)
175 }
176
177 fn debug<L, F>(self, label: L, callback: F) -> Debug<Self, L, F> {
179 Debug::new(self, label, callback)
180 }
181
182 fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
184 where
185 L: Display,
186 T: std::fmt::Debug,
187 E: std::fmt::Debug,
188 {
189 Debug::new_default_print(self, label)
190 }
191
192 fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
194 DefaultIfEmpty::new(self, default_value)
195 }
196
197 fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
199 Delay::new(self, delay, scheduler)
200 }
201
202 fn dematerialize(self) -> Dematerialize<Self> {
204 Dematerialize::new(self)
205 }
206
207 fn distinct(self) -> Distinct<Self, fn(&T) -> T>
209 where
210 T: Clone,
211 {
212 Distinct::new(self)
213 }
214
215 fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
217 where
218 F: FnMut(&T) -> K,
219 {
220 Distinct::new_with_key_selector(self, key_selector)
221 }
222
223 fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
225 where
226 T: Clone,
227 {
228 DistinctUntilChanged::new(self)
229 }
230
231 fn distinct_until_changed_with_key_selector<F, K>(
233 self,
234 key_selector: F,
235 ) -> DistinctUntilChanged<Self, F>
236 where
237 F: FnMut(&T) -> K,
238 {
239 DistinctUntilChanged::new_with_key_selector(self, key_selector)
240 }
241
242 fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
244 where
245 F: FnOnce(),
246 {
247 DoAfterDisposal::new(self, callback)
248 }
249
250 fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
252 where
253 F: FnMut(T),
254 {
255 DoAfterNext::new(self, callback)
256 }
257
258 fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
260 where
261 F: FnOnce(),
262 {
263 DoAfterSubscription::new(self, callback)
264 }
265
266 fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
268 where
269 F: FnOnce(Termination<E>),
270 {
271 DoAfterTermination::new(self, callback)
272 }
273
274 fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
276 where
277 F: FnOnce(),
278 {
279 DoBeforeDisposal::new(self, callback)
280 }
281
282 fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
284 where
285 F: FnMut(&T),
286 {
287 DoBeforeNext::new(self, callback)
288 }
289
290 fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
292 where
293 F: FnOnce(),
294 {
295 DoBeforeSubscription::new(self, callback)
296 }
297
298 fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
300 where
301 F: FnOnce(&Termination<E>),
302 {
303 DoBeforeTermination::new(self, callback)
304 }
305
306 fn element_at(self, index: usize) -> ElementAt<Self> {
308 ElementAt::new(self, index)
309 }
310
311 fn filter<F>(self, callback: F) -> Filter<Self, F>
313 where
314 F: FnMut(&T) -> bool,
315 {
316 Filter::new(self, callback)
317 }
318
319 fn first(self) -> First<Self> {
321 First::new(self)
322 }
323
324 fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
326 where
327 OE1: Observable<'or, 'sub, T1, E>,
328 F: FnMut(T) -> OE1,
329 {
330 FlatMap::new(self, callback)
331 }
332
333 fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
335 where
336 F: FnMut(T) -> K,
337 {
338 GroupBy::new(self, callback)
339 }
340
341 fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
343 where
344 F: FnMut(&mut dyn Observer<T, E>, T),
345 {
346 HookOnNext::new(self, callback)
347 }
348
349 fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
351 where
352 F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
353 {
354 HookOnSubscription::new(self, callback)
355 }
356
357 fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
359 where
360 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
361 {
362 HookOnTermination::new(self, callback)
363 }
364
365 fn ignore_elements(self) -> IgnoreElements<Self> {
367 IgnoreElements::new(self)
368 }
369
370 fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
372 where
373 T: 'or,
374 E: 'or,
375 Self: NecessarySendSync + 'oe,
376 {
377 BoxedObservable::new(self)
378 }
379
380 #[cfg(feature = "futures")]
381 fn into_stream(self) -> ObservableStream<'sub, T, Self>
383 where
384 Self: Observable<'or, 'sub, T, Infallible>,
385 {
386 ObservableStream::new(self)
387 }
388
389 fn last(self) -> Last<Self> {
391 Last::new(self)
392 }
393
394 fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
396 where
397 F: FnMut(T) -> T1,
398 {
399 Map::new(self, callback)
400 }
401
402 fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
404 MapInfallibleToError::new(self)
405 }
406
407 fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
409 MapInfallibleToValue::new(self)
410 }
411
412 fn materialize(self) -> Materialize<Self> {
414 Materialize::new(self)
415 }
416
417 fn max(self) -> Max<Self> {
419 Max::new(self)
420 }
421
422 fn merge_all<T1>(self) -> MergeAll<Self, T>
424 where
425 T: Observable<'or, 'sub, T1, E>,
426 {
427 MergeAll::new(self)
428 }
429
430 fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
432 where
433 OE2: Observable<'or, 'sub, T, E>,
434 {
435 Merge::new(self, source_2)
436 }
437
438 fn min(self) -> Min<Self> {
440 Min::new(self)
441 }
442
443 fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
445 where
446 F: FnOnce() -> S,
447 {
448 ConnectableObservable::new(self, subject_maker())
449 }
450
451 fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
453 ObserveOn::new(self, scheduler)
454 }
455
456 fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
458 self.multicast(PublishSubject::default)
459 }
460
461 fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
463 self.multicast(AsyncSubject::default)
464 }
465
466 fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
468 where
469 F: FnMut(T0, T) -> T0,
470 {
471 Reduce::new(self, initial_value, callback)
472 }
473
474 fn replay(
476 self,
477 buffer_size: Option<usize>,
478 ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
479 self.multicast(|| ReplaySubject::new(buffer_size))
480 }
481
482 fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
484 where
485 OE1: Observable<'or, 'sub, T, E>,
486 F: FnMut(E) -> RetryAction<E, OE1>,
487 {
488 Retry::new(self, callback)
489 }
490
491 fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
493 where
494 OE1: Observable<'or, 'sub, (), E>,
495 {
496 Sample::new(self, sampler)
497 }
498
499 fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
501 where
502 F: FnMut(T0, T) -> T0,
503 {
504 Scan::new(self, initial_value, callback)
505 }
506
507 fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
509 where
510 OE2: Observable<'or, 'sub, T, E>,
511 {
512 SequenceEqual::new(self, another_source)
513 }
514
515 fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
517 self.publish().ref_count()
518 }
519
520 fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
522 self.publish_last().ref_count()
523 }
524
525 fn share_replay(
527 self,
528 buffer_size: Option<usize>,
529 ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
530 self.replay(buffer_size).ref_count()
531 }
532
533 fn skip(self, count: usize) -> Skip<Self> {
535 Skip::new(self, count)
536 }
537
538 fn skip_last(self, count: usize) -> SkipLast<Self> {
540 SkipLast::new(self, count)
541 }
542
543 fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
545 where
546 OE1: Observable<'or, 'sub, (), E>,
547 {
548 SkipUntil::new(self, start)
549 }
550
551 fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
553 where
554 F: FnMut(&T) -> bool,
555 {
556 SkipWhile::new(self, callback)
557 }
558
559 fn start_with<I>(self, values: I) -> StartWith<Self, I>
561 where
562 I: IntoIterator<Item = T>,
563 {
564 StartWith::new(self, values)
565 }
566
567 fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
569 SubscribeOn::new(self, scheduler)
570 }
571
572 fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
574 where
575 T: 'or,
576 E: 'or,
577 FN: FnMut(T) + NecessarySendSync + 'or,
578 FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
579 {
580 self.subscribe(CallbackObserver::new(on_next, on_termination))
581 }
582
583 fn sum(self) -> Sum<Self> {
585 Sum::new(self)
586 }
587
588 fn switch<T1>(self) -> Switch<Self, T>
590 where
591 T: Observable<'or, 'sub, T1, E>,
592 {
593 Switch::new(self)
594 }
595
596 fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
598 where
599 OE1: Observable<'or, 'sub, T1, E>,
600 F: FnMut(T) -> OE1,
601 {
602 SwitchMap::new(self, callback)
603 }
604
605 fn take(self, count: usize) -> Take<Self> {
607 Take::new(self, count)
608 }
609
610 fn take_last(self, count: usize) -> TakeLast<Self> {
612 TakeLast::new(self, count)
613 }
614
615 fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
617 where
618 OE1: Observable<'or, 'sub, (), E>,
619 {
620 TakeUntil::new(self, stop)
621 }
622
623 fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
625 where
626 F: FnMut(&T) -> bool,
627 {
628 TakeWhile::new(self, callback)
629 }
630
631 fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
633 Throttle::new(self, time_span, scheduler)
634 }
635
636 fn time_interval(self) -> TimeInterval<Self> {
638 TimeInterval::new(self)
639 }
640
641 fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
643 Timeout::new(self, duration, scheduler)
644 }
645
646 fn timestamp(self) -> Timestamp<Self> {
648 Timestamp::new(self)
649 }
650
651 fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
653 where
654 OE1: Observable<'or, 'sub, (), E>,
655 {
656 Window::new(self, boundary)
657 }
658
659 fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
661 WindowWithCount::new(self, count)
662 }
663
664 fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
666 where
667 OE2: Observable<'or, 'sub, T1, E>,
668 {
669 Zip::new(self, another_source)
670 }
671}
672
673impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
674{}