1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3 disposable::subscription::Subscription,
4 observable::cloneable_boxed_observable::CloneableBoxedObservable,
5 observer::{
6 Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
7 },
8 operators::{
9 backpressure::{
10 on_backpressure::OnBackpressure, on_backpressure_buffer::OnBackpressureBuffer,
11 on_backpressure_latest::OnBackpressureLatest,
12 },
13 combining::{
14 combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
15 merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
16 },
17 conditional_boolean::{
18 all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
19 sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
20 take_until::TakeUntil, take_while::TakeWhile,
21 },
22 connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
23 error_handling::{
24 catch::Catch,
25 retry::{Retry, RetryAction},
26 },
27 filtering::{
28 debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
29 element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
30 last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
31 take_last::TakeLast, throttle::Throttle,
32 },
33 mathematical_aggregate::{
34 average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
35 },
36 others::{
37 debug::{Debug, DebugEvent, DefaultPrintType},
38 hook_on_next::HookOnNext,
39 hook_on_subscription::HookOnSubscription,
40 hook_on_termination::HookOnTermination,
41 map_infallible_to_error::MapInfallibleToError,
42 map_infallible_to_value::MapInfallibleToValue,
43 },
44 transforming::{
45 buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
46 buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
47 flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
48 window::Window, window_with_count::WindowWithCount,
49 },
50 utility::{
51 delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
52 do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
53 do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
54 do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
55 do_before_termination::DoBeforeTermination, materialize::Materialize,
56 observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
57 timeout::Timeout, timestamp::Timestamp,
58 },
59 },
60 subject::{
61 async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
62 },
63 utils::types::NecessarySendSync,
64};
65use std::{fmt::Display, num::NonZeroUsize, time::Duration};
66#[cfg(feature = "futures")]
67use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
68
69pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
73 fn all<F>(self, callback: F) -> All<T, Self, F>
75 where
76 F: FnMut(T) -> bool,
77 {
78 All::new(self, callback)
79 }
80
81 fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
83 Amb::new([self, other])
84 }
85
86 fn average(self) -> Average<T, Self> {
88 Average::new(self)
89 }
90
91 fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
93 where
94 OE1: Observable<'or, 'sub, (), E>,
95 {
96 Buffer::new(self, boundary)
97 }
98
99 fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
101 BufferWithCount::new(self, count)
102 }
103
104 fn buffer_with_time<S>(
106 self,
107 time_span: Duration,
108 scheduler: S,
109 delay: Option<Duration>,
110 ) -> BufferWithTime<Self, S> {
111 BufferWithTime::new(self, time_span, scheduler, delay)
112 }
113
114 fn buffer_with_time_or_count<S>(
116 self,
117 count: NonZeroUsize,
118 time_span: Duration,
119 scheduler: S,
120 delay: Option<Duration>,
121 ) -> BufferWithTimeOrCount<Self, S> {
122 BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
123 }
124
125 fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
127 where
128 OE1: Observable<'or, 'sub, T, E1>,
129 F: FnOnce(E) -> OE1,
130 {
131 Catch::new(self, callback)
132 }
133
134 fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
136 where
137 OE2: Observable<'or, 'sub, T1, E>,
138 {
139 CombineLatest::new(self, another_source)
140 }
141
142 fn concat_all<T1>(self) -> ConcatAll<Self, T>
144 where
145 T: Observable<'or, 'sub, T1, E>,
146 {
147 ConcatAll::new(self)
148 }
149
150 fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
152 where
153 OE1: Observable<'or, 'sub, T1, E>,
154 F: FnMut(T) -> OE1,
155 {
156 ConcatMap::new(self, callback)
157 }
158
159 fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
161 where
162 OE2: Observable<'or, 'sub, T, E>,
163 {
164 Concat::new(self, source_2)
165 }
166
167 fn contains(self, item: T) -> Contains<T, Self> {
169 Contains::new(self, item)
170 }
171
172 fn count(self) -> Count<T, Self> {
174 Count::new(self)
175 }
176
177 fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
179 Debounce::new(self, time_span, scheduler)
180 }
181
182 fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
184 where
185 F: Fn(C, DebugEvent<'_, T, E>),
186 {
187 Debug::new(self, context, callback)
188 }
189
190 fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
192 where
193 L: Display,
194 T: std::fmt::Debug,
195 E: std::fmt::Debug,
196 {
197 Debug::new_default_print(self, label)
198 }
199
200 fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
202 DefaultIfEmpty::new(self, default_value)
203 }
204
205 fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
207 Delay::new(self, delay, scheduler)
208 }
209
210 fn dematerialize(self) -> Dematerialize<Self> {
212 Dematerialize::new(self)
213 }
214
215 fn distinct(self) -> Distinct<Self, fn(&T) -> T>
217 where
218 T: Clone,
219 {
220 Distinct::new(self)
221 }
222
223 fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
225 where
226 F: FnMut(&T) -> K,
227 {
228 Distinct::new_with_key_selector(self, key_selector)
229 }
230
231 fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
233 where
234 T: Clone,
235 {
236 DistinctUntilChanged::new(self)
237 }
238
239 fn distinct_until_changed_with_key_selector<F, K>(
241 self,
242 key_selector: F,
243 ) -> DistinctUntilChanged<Self, F>
244 where
245 F: FnMut(&T) -> K,
246 {
247 DistinctUntilChanged::new_with_key_selector(self, key_selector)
248 }
249
250 fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
252 where
253 F: FnOnce(),
254 {
255 DoAfterDisposal::new(self, callback)
256 }
257
258 fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
260 where
261 F: FnMut(T),
262 {
263 DoAfterNext::new(self, callback)
264 }
265
266 fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
268 where
269 F: FnOnce(),
270 {
271 DoAfterSubscription::new(self, callback)
272 }
273
274 fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
276 where
277 F: FnOnce(Termination<E>),
278 {
279 DoAfterTermination::new(self, callback)
280 }
281
282 fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
284 where
285 F: FnOnce(),
286 {
287 DoBeforeDisposal::new(self, callback)
288 }
289
290 fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
292 where
293 F: FnMut(&T),
294 {
295 DoBeforeNext::new(self, callback)
296 }
297
298 fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
300 where
301 F: FnOnce(),
302 {
303 DoBeforeSubscription::new(self, callback)
304 }
305
306 fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
308 where
309 F: FnOnce(&Termination<E>),
310 {
311 DoBeforeTermination::new(self, callback)
312 }
313
314 fn element_at(self, index: usize) -> ElementAt<Self> {
316 ElementAt::new(self, index)
317 }
318
319 fn filter<F>(self, callback: F) -> Filter<Self, F>
321 where
322 F: FnMut(&T) -> bool,
323 {
324 Filter::new(self, callback)
325 }
326
327 fn first(self) -> First<Self> {
329 First::new(self)
330 }
331
332 fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
334 where
335 OE1: Observable<'or, 'sub, T1, E>,
336 F: FnMut(T) -> OE1,
337 {
338 FlatMap::new(self, callback)
339 }
340
341 fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
343 where
344 F: FnMut(T) -> K,
345 {
346 GroupBy::new(self, callback)
347 }
348
349 fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
351 where
352 F: FnMut(&mut dyn Observer<T, E>, T),
353 {
354 HookOnNext::new(self, callback)
355 }
356
357 fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
359 where
360 F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
361 {
362 HookOnSubscription::new(self, callback)
363 }
364
365 fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
367 where
368 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
369 {
370 HookOnTermination::new(self, callback)
371 }
372
373 fn ignore_elements(self) -> IgnoreElements<Self> {
375 IgnoreElements::new(self)
376 }
377
378 fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
380 where
381 T: 'or,
382 E: 'or,
383 Self: NecessarySendSync + 'oe,
384 {
385 BoxedObservable::new(self)
386 }
387
388 fn into_cloneable_boxed<'oe>(self) -> CloneableBoxedObservable<'or, 'sub, 'oe, T, E>
390 where
391 T: 'or,
392 E: 'or,
393 Self: NecessarySendSync + Clone + 'oe,
394 {
395 CloneableBoxedObservable::new(self)
396 }
397
398 #[cfg(feature = "futures")]
399 fn into_stream(self) -> ObservableStream<'sub, T, Self>
401 where
402 Self: Observable<'or, 'sub, T, Infallible>,
403 {
404 ObservableStream::new(self)
405 }
406
407 fn last(self) -> Last<Self> {
409 Last::new(self)
410 }
411
412 fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
414 where
415 F: FnMut(T) -> T1,
416 {
417 Map::new(self, callback)
418 }
419
420 fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
422 MapInfallibleToError::new(self)
423 }
424
425 fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
427 MapInfallibleToValue::new(self)
428 }
429
430 fn materialize(self) -> Materialize<Self> {
432 Materialize::new(self)
433 }
434
435 fn max(self) -> Max<Self> {
437 Max::new(self)
438 }
439
440 fn merge_all<T1>(self) -> MergeAll<Self, T>
442 where
443 T: Observable<'or, 'sub, T1, E>,
444 {
445 MergeAll::new(self)
446 }
447
448 fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
450 where
451 OE2: Observable<'or, 'sub, T, E>,
452 {
453 Merge::new(self, source_2)
454 }
455
456 fn min(self) -> Min<Self> {
458 Min::new(self)
459 }
460
461 fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
463 where
464 F: FnOnce() -> S,
465 {
466 ConnectableObservable::new(self, subject_maker())
467 }
468
469 fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
471 ObserveOn::new(self, scheduler)
472 }
473
474 fn on_backpressure<F>(self, receiving_strategy: F) -> OnBackpressure<Self, F>
475 where
476 F: FnMut(&mut Vec<T>, T),
477 {
478 OnBackpressure::new(self, receiving_strategy)
479 }
480
481 fn on_backpressure_buffer(self) -> OnBackpressureBuffer<Self> {
482 OnBackpressureBuffer::new(self)
483 }
484
485 fn on_backpressure_latest(self) -> OnBackpressureLatest<Self> {
486 OnBackpressureLatest::new(self)
487 }
488
489 fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
491 self.multicast(PublishSubject::default)
492 }
493
494 fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
496 self.multicast(AsyncSubject::default)
497 }
498
499 fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
501 where
502 F: FnMut(T0, T) -> T0,
503 {
504 Reduce::new(self, initial_value, callback)
505 }
506
507 fn replay(
509 self,
510 buffer_size: Option<usize>,
511 ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
512 self.multicast(|| ReplaySubject::new(buffer_size))
513 }
514
515 fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
517 where
518 OE1: Observable<'or, 'sub, T, E>,
519 F: FnMut(E) -> RetryAction<E, OE1>,
520 {
521 Retry::new(self, callback)
522 }
523
524 fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
526 where
527 OE1: Observable<'or, 'sub, (), E>,
528 {
529 Sample::new(self, sampler)
530 }
531
532 fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
534 where
535 F: FnMut(T0, T) -> T0,
536 {
537 Scan::new(self, initial_value, callback)
538 }
539
540 fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
542 where
543 OE2: Observable<'or, 'sub, T, E>,
544 {
545 SequenceEqual::new(self, another_source)
546 }
547
548 fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
550 self.publish().ref_count()
551 }
552
553 fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
555 self.publish_last().ref_count()
556 }
557
558 fn share_replay(
560 self,
561 buffer_size: Option<usize>,
562 ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
563 self.replay(buffer_size).ref_count()
564 }
565
566 fn skip(self, count: usize) -> Skip<Self> {
568 Skip::new(self, count)
569 }
570
571 fn skip_last(self, count: usize) -> SkipLast<Self> {
573 SkipLast::new(self, count)
574 }
575
576 fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
578 where
579 OE1: Observable<'or, 'sub, (), E>,
580 {
581 SkipUntil::new(self, start)
582 }
583
584 fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
586 where
587 F: FnMut(&T) -> bool,
588 {
589 SkipWhile::new(self, callback)
590 }
591
592 fn start_with<I>(self, values: I) -> StartWith<Self, I>
594 where
595 I: IntoIterator<Item = T>,
596 {
597 StartWith::new(self, values)
598 }
599
600 fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
602 SubscribeOn::new(self, scheduler)
603 }
604
605 fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
607 where
608 T: 'or,
609 E: 'or,
610 FN: FnMut(T) + NecessarySendSync + 'or,
611 FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
612 {
613 self.subscribe(CallbackObserver::new(on_next, on_termination))
614 }
615
616 fn sum(self) -> Sum<Self> {
618 Sum::new(self)
619 }
620
621 fn switch<T1>(self) -> Switch<Self, T>
623 where
624 T: Observable<'or, 'sub, T1, E>,
625 {
626 Switch::new(self)
627 }
628
629 fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
631 where
632 OE1: Observable<'or, 'sub, T1, E>,
633 F: FnMut(T) -> OE1,
634 {
635 SwitchMap::new(self, callback)
636 }
637
638 fn take(self, count: usize) -> Take<Self> {
640 Take::new(self, count)
641 }
642
643 fn take_last(self, count: usize) -> TakeLast<Self> {
645 TakeLast::new(self, count)
646 }
647
648 fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
650 where
651 OE1: Observable<'or, 'sub, (), E>,
652 {
653 TakeUntil::new(self, stop)
654 }
655
656 fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
658 where
659 F: FnMut(&T) -> bool,
660 {
661 TakeWhile::new(self, callback)
662 }
663
664 fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
666 Throttle::new(self, time_span, scheduler)
667 }
668
669 fn time_interval(self) -> TimeInterval<Self> {
671 TimeInterval::new(self)
672 }
673
674 fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
676 Timeout::new(self, duration, scheduler)
677 }
678
679 fn timestamp(self) -> Timestamp<Self> {
681 Timestamp::new(self)
682 }
683
684 fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
686 where
687 OE1: Observable<'or, 'sub, (), E>,
688 {
689 Window::new(self, boundary)
690 }
691
692 fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
694 WindowWithCount::new(self, count)
695 }
696
697 fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
699 where
700 OE2: Observable<'or, 'sub, T1, E>,
701 {
702 Zip::new(self, another_source)
703 }
704}
705
706impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
707{}