1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3 disposable::subscription::Subscription,
4 observer::{
5 Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
6 },
7 operators::{
8 combining::{
9 combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
10 merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
11 },
12 conditional_boolean::{
13 all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
14 sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
15 take_until::TakeUntil, take_while::TakeWhile,
16 },
17 connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
18 error_handling::{
19 catch::Catch,
20 retry::{Retry, RetryAction},
21 },
22 filtering::{
23 debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
24 element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
25 last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
26 take_last::TakeLast, throttle::Throttle,
27 },
28 mathematical_aggregate::{
29 average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
30 },
31 others::{
32 debug::{Debug, DebugEvent, DefaultPrintType},
33 hook_on_next::HookOnNext,
34 hook_on_subscription::HookOnSubscription,
35 hook_on_termination::HookOnTermination,
36 map_infallible_to_error::MapInfallibleToError,
37 map_infallible_to_value::MapInfallibleToValue,
38 },
39 transforming::{
40 buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
41 buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
42 flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
43 window::Window, window_with_count::WindowWithCount,
44 },
45 utility::{
46 delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
47 do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
48 do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
49 do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
50 do_before_termination::DoBeforeTermination, materialize::Materialize,
51 observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
52 timeout::Timeout, timestamp::Timestamp,
53 },
54 },
55 subject::{
56 async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
57 },
58 utils::types::NecessarySendSync,
59};
60use std::{fmt::Display, num::NonZeroUsize, time::Duration};
61#[cfg(feature = "futures")]
62use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
63
64pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
68 fn all<F>(self, callback: F) -> All<T, Self, F>
70 where
71 F: FnMut(T) -> bool,
72 {
73 All::new(self, callback)
74 }
75
76 fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
78 Amb::new([self, other])
79 }
80
81 fn average(self) -> Average<T, Self> {
83 Average::new(self)
84 }
85
86 fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
88 where
89 OE1: Observable<'or, 'sub, (), E>,
90 {
91 Buffer::new(self, boundary)
92 }
93
94 fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
96 BufferWithCount::new(self, count)
97 }
98
99 fn buffer_with_time<S>(
101 self,
102 time_span: Duration,
103 scheduler: S,
104 delay: Option<Duration>,
105 ) -> BufferWithTime<Self, S> {
106 BufferWithTime::new(self, time_span, scheduler, delay)
107 }
108
109 fn buffer_with_time_or_count<S>(
111 self,
112 count: NonZeroUsize,
113 time_span: Duration,
114 scheduler: S,
115 delay: Option<Duration>,
116 ) -> BufferWithTimeOrCount<Self, S> {
117 BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
118 }
119
120 fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
122 where
123 OE1: Observable<'or, 'sub, T, E1>,
124 F: FnOnce(E) -> OE1,
125 {
126 Catch::new(self, callback)
127 }
128
129 fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
131 where
132 OE2: Observable<'or, 'sub, T1, E>,
133 {
134 CombineLatest::new(self, another_source)
135 }
136
137 fn concat_all<T1>(self) -> ConcatAll<Self, T>
139 where
140 T: Observable<'or, 'sub, T1, E>,
141 {
142 ConcatAll::new(self)
143 }
144
145 fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
147 where
148 OE1: Observable<'or, 'sub, T1, E>,
149 F: FnMut(T) -> OE1,
150 {
151 ConcatMap::new(self, callback)
152 }
153
154 fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
156 where
157 OE2: Observable<'or, 'sub, T, E>,
158 {
159 Concat::new(self, source_2)
160 }
161
162 fn contains(self, item: T) -> Contains<T, Self> {
164 Contains::new(self, item)
165 }
166
167 fn count(self) -> Count<T, Self> {
169 Count::new(self)
170 }
171
172 fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
174 Debounce::new(self, time_span, scheduler)
175 }
176
177 fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
179 where
180 F: Fn(C, DebugEvent<'_, T, E>),
181 {
182 Debug::new(self, context, callback)
183 }
184
185 fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
187 where
188 L: Display,
189 T: std::fmt::Debug,
190 E: std::fmt::Debug,
191 {
192 Debug::new_default_print(self, label)
193 }
194
195 fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
197 DefaultIfEmpty::new(self, default_value)
198 }
199
200 fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
202 Delay::new(self, delay, scheduler)
203 }
204
205 fn dematerialize(self) -> Dematerialize<Self> {
207 Dematerialize::new(self)
208 }
209
210 fn distinct(self) -> Distinct<Self, fn(&T) -> T>
212 where
213 T: Clone,
214 {
215 Distinct::new(self)
216 }
217
218 fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
220 where
221 F: FnMut(&T) -> K,
222 {
223 Distinct::new_with_key_selector(self, key_selector)
224 }
225
226 fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
228 where
229 T: Clone,
230 {
231 DistinctUntilChanged::new(self)
232 }
233
234 fn distinct_until_changed_with_key_selector<F, K>(
236 self,
237 key_selector: F,
238 ) -> DistinctUntilChanged<Self, F>
239 where
240 F: FnMut(&T) -> K,
241 {
242 DistinctUntilChanged::new_with_key_selector(self, key_selector)
243 }
244
245 fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
247 where
248 F: FnOnce(),
249 {
250 DoAfterDisposal::new(self, callback)
251 }
252
253 fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
255 where
256 F: FnMut(T),
257 {
258 DoAfterNext::new(self, callback)
259 }
260
261 fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
263 where
264 F: FnOnce(),
265 {
266 DoAfterSubscription::new(self, callback)
267 }
268
269 fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
271 where
272 F: FnOnce(Termination<E>),
273 {
274 DoAfterTermination::new(self, callback)
275 }
276
277 fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
279 where
280 F: FnOnce(),
281 {
282 DoBeforeDisposal::new(self, callback)
283 }
284
285 fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
287 where
288 F: FnMut(&T),
289 {
290 DoBeforeNext::new(self, callback)
291 }
292
293 fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
295 where
296 F: FnOnce(),
297 {
298 DoBeforeSubscription::new(self, callback)
299 }
300
301 fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
303 where
304 F: FnOnce(&Termination<E>),
305 {
306 DoBeforeTermination::new(self, callback)
307 }
308
309 fn element_at(self, index: usize) -> ElementAt<Self> {
311 ElementAt::new(self, index)
312 }
313
314 fn filter<F>(self, callback: F) -> Filter<Self, F>
316 where
317 F: FnMut(&T) -> bool,
318 {
319 Filter::new(self, callback)
320 }
321
322 fn first(self) -> First<Self> {
324 First::new(self)
325 }
326
327 fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
329 where
330 OE1: Observable<'or, 'sub, T1, E>,
331 F: FnMut(T) -> OE1,
332 {
333 FlatMap::new(self, callback)
334 }
335
336 fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
338 where
339 F: FnMut(T) -> K,
340 {
341 GroupBy::new(self, callback)
342 }
343
344 fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
346 where
347 F: FnMut(&mut dyn Observer<T, E>, T),
348 {
349 HookOnNext::new(self, callback)
350 }
351
352 fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
354 where
355 F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
356 {
357 HookOnSubscription::new(self, callback)
358 }
359
360 fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
362 where
363 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
364 {
365 HookOnTermination::new(self, callback)
366 }
367
368 fn ignore_elements(self) -> IgnoreElements<Self> {
370 IgnoreElements::new(self)
371 }
372
373 fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
375 where
376 T: 'or,
377 E: 'or,
378 Self: NecessarySendSync + 'oe,
379 {
380 BoxedObservable::new(self)
381 }
382
383 #[cfg(feature = "futures")]
384 fn into_stream(self) -> ObservableStream<'sub, T, Self>
386 where
387 Self: Observable<'or, 'sub, T, Infallible>,
388 {
389 ObservableStream::new(self)
390 }
391
392 fn last(self) -> Last<Self> {
394 Last::new(self)
395 }
396
397 fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
399 where
400 F: FnMut(T) -> T1,
401 {
402 Map::new(self, callback)
403 }
404
405 fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
407 MapInfallibleToError::new(self)
408 }
409
410 fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
412 MapInfallibleToValue::new(self)
413 }
414
415 fn materialize(self) -> Materialize<Self> {
417 Materialize::new(self)
418 }
419
420 fn max(self) -> Max<Self> {
422 Max::new(self)
423 }
424
425 fn merge_all<T1>(self) -> MergeAll<Self, T>
427 where
428 T: Observable<'or, 'sub, T1, E>,
429 {
430 MergeAll::new(self)
431 }
432
433 fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
435 where
436 OE2: Observable<'or, 'sub, T, E>,
437 {
438 Merge::new(self, source_2)
439 }
440
441 fn min(self) -> Min<Self> {
443 Min::new(self)
444 }
445
446 fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
448 where
449 F: FnOnce() -> S,
450 {
451 ConnectableObservable::new(self, subject_maker())
452 }
453
454 fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
456 ObserveOn::new(self, scheduler)
457 }
458
459 fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
461 self.multicast(PublishSubject::default)
462 }
463
464 fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
466 self.multicast(AsyncSubject::default)
467 }
468
469 fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
471 where
472 F: FnMut(T0, T) -> T0,
473 {
474 Reduce::new(self, initial_value, callback)
475 }
476
477 fn replay(
479 self,
480 buffer_size: Option<usize>,
481 ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
482 self.multicast(|| ReplaySubject::new(buffer_size))
483 }
484
485 fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
487 where
488 OE1: Observable<'or, 'sub, T, E>,
489 F: FnMut(E) -> RetryAction<E, OE1>,
490 {
491 Retry::new(self, callback)
492 }
493
494 fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
496 where
497 OE1: Observable<'or, 'sub, (), E>,
498 {
499 Sample::new(self, sampler)
500 }
501
502 fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
504 where
505 F: FnMut(T0, T) -> T0,
506 {
507 Scan::new(self, initial_value, callback)
508 }
509
510 fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
512 where
513 OE2: Observable<'or, 'sub, T, E>,
514 {
515 SequenceEqual::new(self, another_source)
516 }
517
518 fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
520 self.publish().ref_count()
521 }
522
523 fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
525 self.publish_last().ref_count()
526 }
527
528 fn share_replay(
530 self,
531 buffer_size: Option<usize>,
532 ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
533 self.replay(buffer_size).ref_count()
534 }
535
536 fn skip(self, count: usize) -> Skip<Self> {
538 Skip::new(self, count)
539 }
540
541 fn skip_last(self, count: usize) -> SkipLast<Self> {
543 SkipLast::new(self, count)
544 }
545
546 fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
548 where
549 OE1: Observable<'or, 'sub, (), E>,
550 {
551 SkipUntil::new(self, start)
552 }
553
554 fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
556 where
557 F: FnMut(&T) -> bool,
558 {
559 SkipWhile::new(self, callback)
560 }
561
562 fn start_with<I>(self, values: I) -> StartWith<Self, I>
564 where
565 I: IntoIterator<Item = T>,
566 {
567 StartWith::new(self, values)
568 }
569
570 fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
572 SubscribeOn::new(self, scheduler)
573 }
574
575 fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
577 where
578 T: 'or,
579 E: 'or,
580 FN: FnMut(T) + NecessarySendSync + 'or,
581 FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
582 {
583 self.subscribe(CallbackObserver::new(on_next, on_termination))
584 }
585
586 fn sum(self) -> Sum<Self> {
588 Sum::new(self)
589 }
590
591 fn switch<T1>(self) -> Switch<Self, T>
593 where
594 T: Observable<'or, 'sub, T1, E>,
595 {
596 Switch::new(self)
597 }
598
599 fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
601 where
602 OE1: Observable<'or, 'sub, T1, E>,
603 F: FnMut(T) -> OE1,
604 {
605 SwitchMap::new(self, callback)
606 }
607
608 fn take(self, count: usize) -> Take<Self> {
610 Take::new(self, count)
611 }
612
613 fn take_last(self, count: usize) -> TakeLast<Self> {
615 TakeLast::new(self, count)
616 }
617
618 fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
620 where
621 OE1: Observable<'or, 'sub, (), E>,
622 {
623 TakeUntil::new(self, stop)
624 }
625
626 fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
628 where
629 F: FnMut(&T) -> bool,
630 {
631 TakeWhile::new(self, callback)
632 }
633
634 fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
636 Throttle::new(self, time_span, scheduler)
637 }
638
639 fn time_interval(self) -> TimeInterval<Self> {
641 TimeInterval::new(self)
642 }
643
644 fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
646 Timeout::new(self, duration, scheduler)
647 }
648
649 fn timestamp(self) -> Timestamp<Self> {
651 Timestamp::new(self)
652 }
653
654 fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
656 where
657 OE1: Observable<'or, 'sub, (), E>,
658 {
659 Window::new(self, boundary)
660 }
661
662 fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
664 WindowWithCount::new(self, count)
665 }
666
667 fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
669 where
670 OE2: Observable<'or, 'sub, T1, E>,
671 {
672 Zip::new(self, another_source)
673 }
674}
675
676impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
677{}