1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3 disposable::subscription::Subscription,
4 observer::{
5 Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
6 },
7 operators::{
8 backpressure::{
9 on_backpressure::OnBackpressure, on_backpressure_buffer::OnBackpressureBuffer,
10 on_backpressure_latest::OnBackpressureLatest,
11 },
12 combining::{
13 combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
14 merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
15 },
16 conditional_boolean::{
17 all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
18 sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
19 take_until::TakeUntil, take_while::TakeWhile,
20 },
21 connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
22 error_handling::{
23 catch::Catch,
24 retry::{Retry, RetryAction},
25 },
26 filtering::{
27 debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
28 element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
29 last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
30 take_last::TakeLast, throttle::Throttle,
31 },
32 mathematical_aggregate::{
33 average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
34 },
35 others::{
36 debug::{Debug, DebugEvent, DefaultPrintType},
37 hook_on_next::HookOnNext,
38 hook_on_subscription::HookOnSubscription,
39 hook_on_termination::HookOnTermination,
40 map_infallible_to_error::MapInfallibleToError,
41 map_infallible_to_value::MapInfallibleToValue,
42 },
43 transforming::{
44 buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
45 buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
46 flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
47 window::Window, window_with_count::WindowWithCount,
48 },
49 utility::{
50 delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
51 do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
52 do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
53 do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
54 do_before_termination::DoBeforeTermination, materialize::Materialize,
55 observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
56 timeout::Timeout, timestamp::Timestamp,
57 },
58 },
59 subject::{
60 async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
61 },
62 utils::types::NecessarySendSync,
63};
64use std::{fmt::Display, num::NonZeroUsize, time::Duration};
65#[cfg(feature = "futures")]
66use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
67
68pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
72 fn all<F>(self, callback: F) -> All<T, Self, F>
74 where
75 F: FnMut(T) -> bool,
76 {
77 All::new(self, callback)
78 }
79
80 fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
82 Amb::new([self, other])
83 }
84
85 fn average(self) -> Average<T, Self> {
87 Average::new(self)
88 }
89
90 fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
92 where
93 OE1: Observable<'or, 'sub, (), E>,
94 {
95 Buffer::new(self, boundary)
96 }
97
98 fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
100 BufferWithCount::new(self, count)
101 }
102
103 fn buffer_with_time<S>(
105 self,
106 time_span: Duration,
107 scheduler: S,
108 delay: Option<Duration>,
109 ) -> BufferWithTime<Self, S> {
110 BufferWithTime::new(self, time_span, scheduler, delay)
111 }
112
113 fn buffer_with_time_or_count<S>(
115 self,
116 count: NonZeroUsize,
117 time_span: Duration,
118 scheduler: S,
119 delay: Option<Duration>,
120 ) -> BufferWithTimeOrCount<Self, S> {
121 BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
122 }
123
124 fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
126 where
127 OE1: Observable<'or, 'sub, T, E1>,
128 F: FnOnce(E) -> OE1,
129 {
130 Catch::new(self, callback)
131 }
132
133 fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
135 where
136 OE2: Observable<'or, 'sub, T1, E>,
137 {
138 CombineLatest::new(self, another_source)
139 }
140
141 fn concat_all<T1>(self) -> ConcatAll<Self, T>
143 where
144 T: Observable<'or, 'sub, T1, E>,
145 {
146 ConcatAll::new(self)
147 }
148
149 fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
151 where
152 OE1: Observable<'or, 'sub, T1, E>,
153 F: FnMut(T) -> OE1,
154 {
155 ConcatMap::new(self, callback)
156 }
157
158 fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
160 where
161 OE2: Observable<'or, 'sub, T, E>,
162 {
163 Concat::new(self, source_2)
164 }
165
166 fn contains(self, item: T) -> Contains<T, Self> {
168 Contains::new(self, item)
169 }
170
171 fn count(self) -> Count<T, Self> {
173 Count::new(self)
174 }
175
176 fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
178 Debounce::new(self, time_span, scheduler)
179 }
180
181 fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
183 where
184 F: Fn(C, DebugEvent<'_, T, E>),
185 {
186 Debug::new(self, context, callback)
187 }
188
189 fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
191 where
192 L: Display,
193 T: std::fmt::Debug,
194 E: std::fmt::Debug,
195 {
196 Debug::new_default_print(self, label)
197 }
198
199 fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
201 DefaultIfEmpty::new(self, default_value)
202 }
203
204 fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
206 Delay::new(self, delay, scheduler)
207 }
208
209 fn dematerialize(self) -> Dematerialize<Self> {
211 Dematerialize::new(self)
212 }
213
214 fn distinct(self) -> Distinct<Self, fn(&T) -> T>
216 where
217 T: Clone,
218 {
219 Distinct::new(self)
220 }
221
222 fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
224 where
225 F: FnMut(&T) -> K,
226 {
227 Distinct::new_with_key_selector(self, key_selector)
228 }
229
230 fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
232 where
233 T: Clone,
234 {
235 DistinctUntilChanged::new(self)
236 }
237
238 fn distinct_until_changed_with_key_selector<F, K>(
240 self,
241 key_selector: F,
242 ) -> DistinctUntilChanged<Self, F>
243 where
244 F: FnMut(&T) -> K,
245 {
246 DistinctUntilChanged::new_with_key_selector(self, key_selector)
247 }
248
249 fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
251 where
252 F: FnOnce(),
253 {
254 DoAfterDisposal::new(self, callback)
255 }
256
257 fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
259 where
260 F: FnMut(T),
261 {
262 DoAfterNext::new(self, callback)
263 }
264
265 fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
267 where
268 F: FnOnce(),
269 {
270 DoAfterSubscription::new(self, callback)
271 }
272
273 fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
275 where
276 F: FnOnce(Termination<E>),
277 {
278 DoAfterTermination::new(self, callback)
279 }
280
281 fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
283 where
284 F: FnOnce(),
285 {
286 DoBeforeDisposal::new(self, callback)
287 }
288
289 fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
291 where
292 F: FnMut(&T),
293 {
294 DoBeforeNext::new(self, callback)
295 }
296
297 fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
299 where
300 F: FnOnce(),
301 {
302 DoBeforeSubscription::new(self, callback)
303 }
304
305 fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
307 where
308 F: FnOnce(&Termination<E>),
309 {
310 DoBeforeTermination::new(self, callback)
311 }
312
313 fn element_at(self, index: usize) -> ElementAt<Self> {
315 ElementAt::new(self, index)
316 }
317
318 fn filter<F>(self, callback: F) -> Filter<Self, F>
320 where
321 F: FnMut(&T) -> bool,
322 {
323 Filter::new(self, callback)
324 }
325
326 fn first(self) -> First<Self> {
328 First::new(self)
329 }
330
331 fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
333 where
334 OE1: Observable<'or, 'sub, T1, E>,
335 F: FnMut(T) -> OE1,
336 {
337 FlatMap::new(self, callback)
338 }
339
340 fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
342 where
343 F: FnMut(T) -> K,
344 {
345 GroupBy::new(self, callback)
346 }
347
348 fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
350 where
351 F: FnMut(&mut dyn Observer<T, E>, T),
352 {
353 HookOnNext::new(self, callback)
354 }
355
356 fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
358 where
359 F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
360 {
361 HookOnSubscription::new(self, callback)
362 }
363
364 fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
366 where
367 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
368 {
369 HookOnTermination::new(self, callback)
370 }
371
372 fn ignore_elements(self) -> IgnoreElements<Self> {
374 IgnoreElements::new(self)
375 }
376
377 fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
379 where
380 T: 'or,
381 E: 'or,
382 Self: NecessarySendSync + 'oe,
383 {
384 BoxedObservable::new(self)
385 }
386
387 #[cfg(feature = "futures")]
388 fn into_stream(self) -> ObservableStream<'sub, T, Self>
390 where
391 Self: Observable<'or, 'sub, T, Infallible>,
392 {
393 ObservableStream::new(self)
394 }
395
396 fn last(self) -> Last<Self> {
398 Last::new(self)
399 }
400
401 fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
403 where
404 F: FnMut(T) -> T1,
405 {
406 Map::new(self, callback)
407 }
408
409 fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
411 MapInfallibleToError::new(self)
412 }
413
414 fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
416 MapInfallibleToValue::new(self)
417 }
418
419 fn materialize(self) -> Materialize<Self> {
421 Materialize::new(self)
422 }
423
424 fn max(self) -> Max<Self> {
426 Max::new(self)
427 }
428
429 fn merge_all<T1>(self) -> MergeAll<Self, T>
431 where
432 T: Observable<'or, 'sub, T1, E>,
433 {
434 MergeAll::new(self)
435 }
436
437 fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
439 where
440 OE2: Observable<'or, 'sub, T, E>,
441 {
442 Merge::new(self, source_2)
443 }
444
445 fn min(self) -> Min<Self> {
447 Min::new(self)
448 }
449
450 fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
452 where
453 F: FnOnce() -> S,
454 {
455 ConnectableObservable::new(self, subject_maker())
456 }
457
458 fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
460 ObserveOn::new(self, scheduler)
461 }
462
463 fn on_backpressure<F>(self, receiving_strategy: F) -> OnBackpressure<Self, F>
464 where
465 F: FnMut(&mut Vec<T>, T),
466 {
467 OnBackpressure::new(self, receiving_strategy)
468 }
469
470 fn on_backpressure_buffer(self) -> OnBackpressureBuffer<Self> {
471 OnBackpressureBuffer::new(self)
472 }
473
474 fn on_backpressure_latest(self) -> OnBackpressureLatest<Self> {
475 OnBackpressureLatest::new(self)
476 }
477
478 fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
480 self.multicast(PublishSubject::default)
481 }
482
483 fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
485 self.multicast(AsyncSubject::default)
486 }
487
488 fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
490 where
491 F: FnMut(T0, T) -> T0,
492 {
493 Reduce::new(self, initial_value, callback)
494 }
495
496 fn replay(
498 self,
499 buffer_size: Option<usize>,
500 ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
501 self.multicast(|| ReplaySubject::new(buffer_size))
502 }
503
504 fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
506 where
507 OE1: Observable<'or, 'sub, T, E>,
508 F: FnMut(E) -> RetryAction<E, OE1>,
509 {
510 Retry::new(self, callback)
511 }
512
513 fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
515 where
516 OE1: Observable<'or, 'sub, (), E>,
517 {
518 Sample::new(self, sampler)
519 }
520
521 fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
523 where
524 F: FnMut(T0, T) -> T0,
525 {
526 Scan::new(self, initial_value, callback)
527 }
528
529 fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
531 where
532 OE2: Observable<'or, 'sub, T, E>,
533 {
534 SequenceEqual::new(self, another_source)
535 }
536
537 fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
539 self.publish().ref_count()
540 }
541
542 fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
544 self.publish_last().ref_count()
545 }
546
547 fn share_replay(
549 self,
550 buffer_size: Option<usize>,
551 ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
552 self.replay(buffer_size).ref_count()
553 }
554
555 fn skip(self, count: usize) -> Skip<Self> {
557 Skip::new(self, count)
558 }
559
560 fn skip_last(self, count: usize) -> SkipLast<Self> {
562 SkipLast::new(self, count)
563 }
564
565 fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
567 where
568 OE1: Observable<'or, 'sub, (), E>,
569 {
570 SkipUntil::new(self, start)
571 }
572
573 fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
575 where
576 F: FnMut(&T) -> bool,
577 {
578 SkipWhile::new(self, callback)
579 }
580
581 fn start_with<I>(self, values: I) -> StartWith<Self, I>
583 where
584 I: IntoIterator<Item = T>,
585 {
586 StartWith::new(self, values)
587 }
588
589 fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
591 SubscribeOn::new(self, scheduler)
592 }
593
594 fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
596 where
597 T: 'or,
598 E: 'or,
599 FN: FnMut(T) + NecessarySendSync + 'or,
600 FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
601 {
602 self.subscribe(CallbackObserver::new(on_next, on_termination))
603 }
604
605 fn sum(self) -> Sum<Self> {
607 Sum::new(self)
608 }
609
610 fn switch<T1>(self) -> Switch<Self, T>
612 where
613 T: Observable<'or, 'sub, T1, E>,
614 {
615 Switch::new(self)
616 }
617
618 fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
620 where
621 OE1: Observable<'or, 'sub, T1, E>,
622 F: FnMut(T) -> OE1,
623 {
624 SwitchMap::new(self, callback)
625 }
626
627 fn take(self, count: usize) -> Take<Self> {
629 Take::new(self, count)
630 }
631
632 fn take_last(self, count: usize) -> TakeLast<Self> {
634 TakeLast::new(self, count)
635 }
636
637 fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
639 where
640 OE1: Observable<'or, 'sub, (), E>,
641 {
642 TakeUntil::new(self, stop)
643 }
644
645 fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
647 where
648 F: FnMut(&T) -> bool,
649 {
650 TakeWhile::new(self, callback)
651 }
652
653 fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
655 Throttle::new(self, time_span, scheduler)
656 }
657
658 fn time_interval(self) -> TimeInterval<Self> {
660 TimeInterval::new(self)
661 }
662
663 fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
665 Timeout::new(self, duration, scheduler)
666 }
667
668 fn timestamp(self) -> Timestamp<Self> {
670 Timestamp::new(self)
671 }
672
673 fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
675 where
676 OE1: Observable<'or, 'sub, (), E>,
677 {
678 Window::new(self, boundary)
679 }
680
681 fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
683 WindowWithCount::new(self, count)
684 }
685
686 fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
688 where
689 OE2: Observable<'or, 'sub, T1, E>,
690 {
691 Zip::new(self, another_source)
692 }
693}
694
695impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
696{}