rx_rust/observable/
observable_ext.rs

1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3    disposable::subscription::Subscription,
4    observer::{
5        Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
6    },
7    operators::{
8        combining::{
9            combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
10            merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
11        },
12        conditional_boolean::{
13            all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
14            sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
15            take_until::TakeUntil, take_while::TakeWhile,
16        },
17        connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
18        error_handling::{
19            catch::Catch,
20            retry::{Retry, RetryAction},
21        },
22        filtering::{
23            debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
24            element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
25            last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
26            take_last::TakeLast, throttle::Throttle,
27        },
28        mathematical_aggregate::{
29            average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
30        },
31        others::{
32            debug::Debug, hook_on_next::HookOnNext, hook_on_subscription::HookOnSubscription,
33            hook_on_termination::HookOnTermination, map_infallible_to_error::MapInfallibleToError,
34            map_infallible_to_value::MapInfallibleToValue,
35        },
36        transforming::{
37            buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
38            buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
39            flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
40            window::Window, window_with_count::WindowWithCount,
41        },
42        utility::{
43            delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
44            do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
45            do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
46            do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
47            do_before_termination::DoBeforeTermination, materialize::Materialize,
48            observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
49            timeout::Timeout, timestamp::Timestamp,
50        },
51    },
52    subject::{
53        async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
54    },
55    utils::types::NecessarySendSync,
56};
57use std::{num::NonZeroUsize, time::Duration};
58#[cfg(feature = "futures")]
59use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
60
61/// Extension trait that exposes the full suite of RxRust operators on any type that
62/// implements [`Observable`]. Each method forwards to the corresponding operator
63/// constructor, allowing a fluent, ergonomic style when composing observable pipelines.
64pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
65    /// Emits a single `bool` indicating whether every item satisfies the provided predicate.
66    fn all<F>(self, callback: F) -> All<T, Self, F>
67    where
68        F: FnMut(T) -> bool,
69    {
70        All::new(self, callback)
71    }
72
73    /// Competes two observables and mirrors whichever one produces an item or error first.
74    fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
75        Amb::new([self, other])
76    }
77
78    /// Calculates the arithmetic mean of all numeric items emitted by the source.
79    fn average(self) -> Average<T, Self> {
80        Average::new(self)
81    }
82
83    /// Collects the items emitted by the source into buffers delimited by another observable.
84    fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
85    where
86        OE1: Observable<'or, 'sub, (), E>,
87    {
88        Buffer::new(self, boundary)
89    }
90
91    /// Collects items into fixed-size buffers and emits each buffer as soon as it fills up.
92    fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
93        BufferWithCount::new(self, count)
94    }
95
96    /// Collects items into time-based buffers driven by the provided scheduler.
97    fn buffer_with_time<S>(
98        self,
99        time_span: Duration,
100        scheduler: S,
101        delay: Option<Duration>,
102    ) -> BufferWithTime<Self, S> {
103        BufferWithTime::new(self, time_span, scheduler, delay)
104    }
105
106    /// Collects items into buffers using both size and time boundaries whichever occurs first.
107    fn buffer_with_time_or_count<S>(
108        self,
109        count: NonZeroUsize,
110        time_span: Duration,
111        scheduler: S,
112        delay: Option<Duration>,
113    ) -> BufferWithTimeOrCount<Self, S> {
114        BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
115    }
116
117    /// Recovers from errors by switching to another observable yielded by the callback.
118    fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
119    where
120        OE1: Observable<'or, 'sub, T, E1>,
121        F: FnOnce(E) -> OE1,
122    {
123        Catch::new(self, callback)
124    }
125
126    /// Combines the latest values from both observables whenever either produces a new item.
127    fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
128    where
129        OE2: Observable<'or, 'sub, T1, E>,
130    {
131        CombineLatest::new(self, another_source)
132    }
133
134    /// Flattens an observable-of-observables by concatenating each inner observable sequentially.
135    fn concat_all<T1>(self) -> ConcatAll<Self, T>
136    where
137        T: Observable<'or, 'sub, T1, E>,
138    {
139        ConcatAll::new(self)
140    }
141
142    /// Maps each item to an observable and concatenates the resulting inner sequences.
143    fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
144    where
145        OE1: Observable<'or, 'sub, T1, E>,
146        F: FnMut(T) -> OE1,
147    {
148        ConcatMap::new(self, callback)
149    }
150
151    /// Concatenates the source with another observable, waiting for the first to complete.
152    fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
153    where
154        OE2: Observable<'or, 'sub, T, E>,
155    {
156        Concat::new(self, source_2)
157    }
158
159    /// Emits `true` if the sequence contains the provided item, `false` otherwise.
160    fn contains(self, item: T) -> Contains<T, Self> {
161        Contains::new(self, item)
162    }
163
164    /// Counts the number of items emitted and emits that count as a single value.
165    fn count(self) -> Count<T, Self> {
166        Count::new(self)
167    }
168
169    /// Emits an item from the source Observable only after a particular time span has passed without another source emission.
170    fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
171        Debounce::new(self, time_span, scheduler)
172    }
173
174    /// Attaches a label to the stream and logs lifecycle events for debugging purposes.
175    fn debug<D>(self, label: D) -> Debug<Self, D> {
176        Debug::new(self, label)
177    }
178
179    /// Emits a default value if the source completes without emitting any items.
180    fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
181        DefaultIfEmpty::new(self, default_value)
182    }
183
184    /// Offsets the emission of items by the specified duration using the given scheduler.
185    fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
186        Delay::new(self, delay, scheduler)
187    }
188
189    /// Converts a stream of notifications back into a normal observable sequence.
190    fn dematerialize(self) -> Dematerialize<Self> {
191        Dematerialize::new(self)
192    }
193
194    /// Filters out duplicate items, keeping only the first occurrence of each value.
195    fn distinct(self) -> Distinct<Self, fn(&T) -> T>
196    where
197        T: Clone,
198    {
199        Distinct::new(self)
200    }
201
202    /// Filters out duplicates based on a key selector, keeping only unique keys.
203    fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
204    where
205        F: FnMut(&T) -> K,
206    {
207        Distinct::new_with_key_selector(self, key_selector)
208    }
209
210    /// Suppresses consecutive duplicate items, comparing the values directly.
211    fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
212    where
213        T: Clone,
214    {
215        DistinctUntilChanged::new(self)
216    }
217
218    /// Suppresses consecutive duplicate items using a custom key selector.
219    fn distinct_until_changed_with_key_selector<F, K>(
220        self,
221        key_selector: F,
222    ) -> DistinctUntilChanged<Self, F>
223    where
224        F: FnMut(&T) -> K,
225    {
226        DistinctUntilChanged::new_with_key_selector(self, key_selector)
227    }
228
229    /// Invokes a callback after the downstream subscription is disposed.
230    fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
231    where
232        F: FnOnce(),
233    {
234        DoAfterDisposal::new(self, callback)
235    }
236
237    /// Invokes a callback after each item is forwarded downstream.
238    fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
239    where
240        F: FnMut(T),
241    {
242        DoAfterNext::new(self, callback)
243    }
244
245    /// Invokes a callback after the observer subscribes to the source.
246    fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
247    where
248        F: FnOnce(),
249    {
250        DoAfterSubscription::new(self, callback)
251    }
252
253    /// Invokes a callback after the source terminates, regardless of completion or error.
254    fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
255    where
256        F: FnOnce(Termination<E>),
257    {
258        DoAfterTermination::new(self, callback)
259    }
260
261    /// Invokes a callback right before the downstream subscription is disposed.
262    fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
263    where
264        F: FnOnce(),
265    {
266        DoBeforeDisposal::new(self, callback)
267    }
268
269    /// Invokes a callback with a reference to each item before it is sent downstream.
270    fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
271    where
272        F: FnMut(&T),
273    {
274        DoBeforeNext::new(self, callback)
275    }
276
277    /// Invokes a callback just before the observer subscribes to the source.
278    fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
279    where
280        F: FnOnce(),
281    {
282        DoBeforeSubscription::new(self, callback)
283    }
284
285    /// Invokes a callback before the stream terminates, receiving the termination reason.
286    fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
287    where
288        F: FnOnce(&Termination<E>),
289    {
290        DoBeforeTermination::new(self, callback)
291    }
292
293    /// Emits only the item at the given zero-based index and then completes.
294    fn element_at(self, index: usize) -> ElementAt<Self> {
295        ElementAt::new(self, index)
296    }
297
298    /// Filters items using a predicate, forwarding only values that return `true`.
299    fn filter<F>(self, callback: F) -> Filter<Self, F>
300    where
301        F: FnMut(&T) -> bool,
302    {
303        Filter::new(self, callback)
304    }
305
306    /// Emits only the first item from the source, then completes.
307    fn first(self) -> First<Self> {
308        First::new(self)
309    }
310
311    /// Maps each item to an observable and merges the resulting inner sequences concurrently.
312    fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
313    where
314        OE1: Observable<'or, 'sub, T1, E>,
315        F: FnMut(T) -> OE1,
316    {
317        FlatMap::new(self, callback)
318    }
319
320    /// Groups items by key into multiple observable sequences.
321    fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
322    where
323        F: FnMut(T) -> K,
324    {
325        GroupBy::new(self, callback)
326    }
327
328    /// Hooks into the emission of items, allowing mutation of the downstream observer.
329    fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
330    where
331        F: FnMut(&mut dyn Observer<T, E>, T),
332    {
333        HookOnNext::new(self, callback)
334    }
335
336    /// Hooks into subscription, letting you override how the source subscribes observers.
337    fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
338    where
339        F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
340    {
341        HookOnSubscription::new(self, callback)
342    }
343
344    /// Hooks into termination, providing access to the observer and termination payload.
345    fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
346    where
347        F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
348    {
349        HookOnTermination::new(self, callback)
350    }
351
352    /// Ignores all items from the source, only relaying termination events.
353    fn ignore_elements(self) -> IgnoreElements<Self> {
354        IgnoreElements::new(self)
355    }
356
357    /// Boxes the observable, erasing its concrete type while preserving lifetime bounds.
358    fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
359    where
360        T: 'or,
361        E: 'or,
362        Self: NecessarySendSync + 'oe,
363    {
364        BoxedObservable::new(self)
365    }
366
367    #[cfg(feature = "futures")]
368    /// Converts the observable into an async stream.
369    fn into_stream(self) -> ObservableStream<'sub, T, Self>
370    where
371        Self: Observable<'or, 'sub, T, Infallible>,
372    {
373        ObservableStream::new(self)
374    }
375
376    /// Emits only the final item produced by the source before completion.
377    fn last(self) -> Last<Self> {
378        Last::new(self)
379    }
380
381    /// Transforms each item by applying a user-supplied mapping function.
382    fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
383    where
384        F: FnMut(T) -> T1,
385    {
386        Map::new(self, callback)
387    }
388
389    /// Maps an Observable with an `Infallible` error type to an Observable with a concrete error type.
390    fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
391        MapInfallibleToError::new(self)
392    }
393
394    /// Maps an Observable with an `Infallible` item type to an Observable with a concrete item type.
395    fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
396        MapInfallibleToValue::new(self)
397    }
398
399    /// Wraps each item into a notification, turning the stream into explicit events.
400    fn materialize(self) -> Materialize<Self> {
401        Materialize::new(self)
402    }
403
404    /// Emits the maximum item produced by the source according to the natural order.
405    fn max(self) -> Max<Self> {
406        Max::new(self)
407    }
408
409    /// Merges an observable-of-observables by interleaving items from inner streams.
410    fn merge_all<T1>(self) -> MergeAll<Self, T>
411    where
412        T: Observable<'or, 'sub, T1, E>,
413    {
414        MergeAll::new(self)
415    }
416
417    /// Merges the source with another observable, interleaving both streams concurrently.
418    fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
419    where
420        OE2: Observable<'or, 'sub, T, E>,
421    {
422        Merge::new(self, source_2)
423    }
424
425    /// Emits the minimum item produced by the source according to the natural order.
426    fn min(self) -> Min<Self> {
427        Min::new(self)
428    }
429
430    /// Converts the source into a connectable observable using a subject factory.
431    fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
432    where
433        F: FnOnce() -> S,
434    {
435        ConnectableObservable::new(self, subject_maker())
436    }
437
438    /// Schedules downstream observation on the provided scheduler.
439    fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
440        ObserveOn::new(self, scheduler)
441    }
442
443    /// Multicasts the source using a `PublishSubject`.
444    fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
445        self.multicast(PublishSubject::default)
446    }
447
448    /// Multicasts the source using an `AsyncSubject`, emitting only the last value.
449    fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
450        self.multicast(AsyncSubject::default)
451    }
452
453    /// Aggregates the sequence using an initial seed and an accumulator function.
454    fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
455    where
456        F: FnMut(T0, T) -> T0,
457    {
458        Reduce::new(self, initial_value, callback)
459    }
460
461    /// Multicasts the source using a `ReplaySubject` configured with the given buffer size.
462    fn replay(
463        self,
464        buffer_size: Option<usize>,
465    ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
466        self.multicast(|| ReplaySubject::new(buffer_size))
467    }
468
469    /// Re-subscribes to the source based on the retry strategy returned by the callback.
470    fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
471    where
472        OE1: Observable<'or, 'sub, T, E>,
473        F: FnMut(E) -> RetryAction<E, OE1>,
474    {
475        Retry::new(self, callback)
476    }
477
478    /// Samples the source whenever the sampler observable emits an event.
479    fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
480    where
481        OE1: Observable<'or, 'sub, (), E>,
482    {
483        Sample::new(self, sampler)
484    }
485
486    /// Accumulates values over time, emitting each intermediate result.
487    fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
488    where
489        F: FnMut(T0, T) -> T0,
490    {
491        Scan::new(self, initial_value, callback)
492    }
493
494    /// Compares two sequences element by element for equality.
495    fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
496    where
497        OE2: Observable<'or, 'sub, T, E>,
498    {
499        SequenceEqual::new(self, another_source)
500    }
501
502    /// Shares a single subscription to the source using `PublishSubject` semantics.
503    fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
504        self.publish().ref_count()
505    }
506
507    /// Shares a single subscription, replaying only the last item to new subscribers.
508    fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
509        self.publish_last().ref_count()
510    }
511
512    /// Shares a single subscription while replaying a bounded history to future subscribers.
513    fn share_replay(
514        self,
515        buffer_size: Option<usize>,
516    ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
517        self.replay(buffer_size).ref_count()
518    }
519
520    /// Skips the first `count` items before emitting the remainder of the sequence.
521    fn skip(self, count: usize) -> Skip<Self> {
522        Skip::new(self, count)
523    }
524
525    /// Skips the last `count` items emitted by the source.
526    fn skip_last(self, count: usize) -> SkipLast<Self> {
527        SkipLast::new(self, count)
528    }
529
530    /// Ignores items from the source until the notifier observable fires.
531    fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
532    where
533        OE1: Observable<'or, 'sub, (), E>,
534    {
535        SkipUntil::new(self, start)
536    }
537
538    /// Skips items while the predicate returns `true`, then emits the remaining items.
539    fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
540    where
541        F: FnMut(&T) -> bool,
542    {
543        SkipWhile::new(self, callback)
544    }
545
546    /// Pre-pends the provided values before the source starts emitting.
547    fn start_with<I>(self, values: I) -> StartWith<Self, I>
548    where
549        I: IntoIterator<Item = T>,
550    {
551        StartWith::new(self, values)
552    }
553
554    /// Subscribes to the source on the provided scheduler.
555    fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
556        SubscribeOn::new(self, scheduler)
557    }
558
559    /// Convenience helper for subscribing with plain callbacks instead of a full observer.
560    fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
561    where
562        T: 'or,
563        E: 'or,
564        FN: FnMut(T) + NecessarySendSync + 'or,
565        FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
566    {
567        self.subscribe(CallbackObserver::new(on_next, on_termination))
568    }
569
570    /// Sums all numeric items and emits the accumulated total.
571    fn sum(self) -> Sum<Self> {
572        Sum::new(self)
573    }
574
575    /// Switches to the most recent inner observable emitted by the source.
576    fn switch<T1>(self) -> Switch<Self, T>
577    where
578        T: Observable<'or, 'sub, T1, E>,
579    {
580        Switch::new(self)
581    }
582
583    /// Maps each item to an observable and switches to the latest inner sequence.
584    fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
585    where
586        OE1: Observable<'or, 'sub, T1, E>,
587        F: FnMut(T) -> OE1,
588    {
589        SwitchMap::new(self, callback)
590    }
591
592    /// Emits only the first `count` items from the source before completing.
593    fn take(self, count: usize) -> Take<Self> {
594        Take::new(self, count)
595    }
596
597    /// Emits only the last `count` items produced by the source.
598    fn take_last(self, count: usize) -> TakeLast<Self> {
599        TakeLast::new(self, count)
600    }
601
602    /// Relays items until the notifier observable emits, then completes.
603    fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
604    where
605        OE1: Observable<'or, 'sub, (), E>,
606    {
607        TakeUntil::new(self, stop)
608    }
609
610    /// Emits items while the predicate holds `true`, then completes.
611    fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
612    where
613        F: FnMut(&T) -> bool,
614    {
615        TakeWhile::new(self, callback)
616    }
617
618    /// Throttles emissions to at most one item per specified timespan.
619    fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
620        Throttle::new(self, time_span, scheduler)
621    }
622
623    /// Emits elapsed time between consecutive items as they flow through the stream.
624    fn time_interval(self) -> TimeInterval<Self> {
625        TimeInterval::new(self)
626    }
627
628    /// Errors if the next item does not arrive within the specified duration.
629    fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
630        Timeout::new(self, duration, scheduler)
631    }
632
633    /// Annotates each item with the current timestamp when it is emitted.
634    fn timestamp(self) -> Timestamp<Self> {
635        Timestamp::new(self)
636    }
637
638    /// Collects items into windows that are opened and closed by another observable.
639    fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
640    where
641        OE1: Observable<'or, 'sub, (), E>,
642    {
643        Window::new(self, boundary)
644    }
645
646    /// Collects items into windows containing a fixed number of elements.
647    fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
648        WindowWithCount::new(self, count)
649    }
650
651    /// Pairs items from both observables by index and emits tuples of corresponding values.
652    fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
653    where
654        OE2: Observable<'or, 'sub, T1, E>,
655    {
656        Zip::new(self, another_source)
657    }
658}
659
660impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
661{}