rx_rust/observable/
observable_ext.rs

1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3    disposable::subscription::Subscription,
4    observer::{
5        Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
6    },
7    operators::{
8        combining::{
9            combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
10            merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
11        },
12        conditional_boolean::{
13            all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
14            sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
15            take_until::TakeUntil, take_while::TakeWhile,
16        },
17        connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
18        error_handling::{
19            catch::Catch,
20            retry::{Retry, RetryAction},
21        },
22        filtering::{
23            debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
24            element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
25            last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
26            take_last::TakeLast, throttle::Throttle,
27        },
28        mathematical_aggregate::{
29            average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
30        },
31        others::{
32            debug::{Debug, DefaultPrintType},
33            hook_on_next::HookOnNext,
34            hook_on_subscription::HookOnSubscription,
35            hook_on_termination::HookOnTermination,
36            map_infallible_to_error::MapInfallibleToError,
37            map_infallible_to_value::MapInfallibleToValue,
38        },
39        transforming::{
40            buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
41            buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
42            flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
43            window::Window, window_with_count::WindowWithCount,
44        },
45        utility::{
46            delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
47            do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
48            do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
49            do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
50            do_before_termination::DoBeforeTermination, materialize::Materialize,
51            observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
52            timeout::Timeout, timestamp::Timestamp,
53        },
54    },
55    subject::{
56        async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
57    },
58    utils::types::NecessarySendSync,
59};
60use std::{fmt::Display, num::NonZeroUsize, time::Duration};
61#[cfg(feature = "futures")]
62use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
63
64/// Extension trait that exposes the full suite of RxRust operators on any type that
65/// implements [`Observable`]. Each method forwards to the corresponding operator
66/// constructor, allowing a fluent, ergonomic style when composing observable pipelines.
67pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
68    /// Emits a single `bool` indicating whether every item satisfies the provided predicate.
69    fn all<F>(self, callback: F) -> All<T, Self, F>
70    where
71        F: FnMut(T) -> bool,
72    {
73        All::new(self, callback)
74    }
75
76    /// Competes two observables and mirrors whichever one produces an item or error first.
77    fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
78        Amb::new([self, other])
79    }
80
81    /// Calculates the arithmetic mean of all numeric items emitted by the source.
82    fn average(self) -> Average<T, Self> {
83        Average::new(self)
84    }
85
86    /// Collects the items emitted by the source into buffers delimited by another observable.
87    fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
88    where
89        OE1: Observable<'or, 'sub, (), E>,
90    {
91        Buffer::new(self, boundary)
92    }
93
94    /// Collects items into fixed-size buffers and emits each buffer as soon as it fills up.
95    fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
96        BufferWithCount::new(self, count)
97    }
98
99    /// Collects items into time-based buffers driven by the provided scheduler.
100    fn buffer_with_time<S>(
101        self,
102        time_span: Duration,
103        scheduler: S,
104        delay: Option<Duration>,
105    ) -> BufferWithTime<Self, S> {
106        BufferWithTime::new(self, time_span, scheduler, delay)
107    }
108
109    /// Collects items into buffers using both size and time boundaries whichever occurs first.
110    fn buffer_with_time_or_count<S>(
111        self,
112        count: NonZeroUsize,
113        time_span: Duration,
114        scheduler: S,
115        delay: Option<Duration>,
116    ) -> BufferWithTimeOrCount<Self, S> {
117        BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
118    }
119
120    /// Recovers from errors by switching to another observable yielded by the callback.
121    fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
122    where
123        OE1: Observable<'or, 'sub, T, E1>,
124        F: FnOnce(E) -> OE1,
125    {
126        Catch::new(self, callback)
127    }
128
129    /// Combines the latest values from both observables whenever either produces a new item.
130    fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
131    where
132        OE2: Observable<'or, 'sub, T1, E>,
133    {
134        CombineLatest::new(self, another_source)
135    }
136
137    /// Flattens an observable-of-observables by concatenating each inner observable sequentially.
138    fn concat_all<T1>(self) -> ConcatAll<Self, T>
139    where
140        T: Observable<'or, 'sub, T1, E>,
141    {
142        ConcatAll::new(self)
143    }
144
145    /// Maps each item to an observable and concatenates the resulting inner sequences.
146    fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
147    where
148        OE1: Observable<'or, 'sub, T1, E>,
149        F: FnMut(T) -> OE1,
150    {
151        ConcatMap::new(self, callback)
152    }
153
154    /// Concatenates the source with another observable, waiting for the first to complete.
155    fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
156    where
157        OE2: Observable<'or, 'sub, T, E>,
158    {
159        Concat::new(self, source_2)
160    }
161
162    /// Emits `true` if the sequence contains the provided item, `false` otherwise.
163    fn contains(self, item: T) -> Contains<T, Self> {
164        Contains::new(self, item)
165    }
166
167    /// Counts the number of items emitted and emits that count as a single value.
168    fn count(self) -> Count<T, Self> {
169        Count::new(self)
170    }
171
172    /// Emits an item from the source Observable only after a particular time span has passed without another source emission.
173    fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
174        Debounce::new(self, time_span, scheduler)
175    }
176
177    /// Attaches a label to the stream and logs lifecycle events for debugging purposes using the provided callback.
178    fn debug<L, F>(self, label: L, callback: F) -> Debug<Self, L, F> {
179        Debug::new(self, label, callback)
180    }
181
182    /// Attaches a label to the stream and logs lifecycle events for debugging purposes using the default print.
183    fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
184    where
185        L: Display,
186        T: std::fmt::Debug,
187        E: std::fmt::Debug,
188    {
189        Debug::new_default_print(self, label)
190    }
191
192    /// Emits a default value if the source completes without emitting any items.
193    fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
194        DefaultIfEmpty::new(self, default_value)
195    }
196
197    /// Offsets the emission of items by the specified duration using the given scheduler.
198    fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
199        Delay::new(self, delay, scheduler)
200    }
201
202    /// Converts a stream of notifications back into a normal observable sequence.
203    fn dematerialize(self) -> Dematerialize<Self> {
204        Dematerialize::new(self)
205    }
206
207    /// Filters out duplicate items, keeping only the first occurrence of each value.
208    fn distinct(self) -> Distinct<Self, fn(&T) -> T>
209    where
210        T: Clone,
211    {
212        Distinct::new(self)
213    }
214
215    /// Filters out duplicates based on a key selector, keeping only unique keys.
216    fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
217    where
218        F: FnMut(&T) -> K,
219    {
220        Distinct::new_with_key_selector(self, key_selector)
221    }
222
223    /// Suppresses consecutive duplicate items, comparing the values directly.
224    fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
225    where
226        T: Clone,
227    {
228        DistinctUntilChanged::new(self)
229    }
230
231    /// Suppresses consecutive duplicate items using a custom key selector.
232    fn distinct_until_changed_with_key_selector<F, K>(
233        self,
234        key_selector: F,
235    ) -> DistinctUntilChanged<Self, F>
236    where
237        F: FnMut(&T) -> K,
238    {
239        DistinctUntilChanged::new_with_key_selector(self, key_selector)
240    }
241
242    /// Invokes a callback after the downstream subscription is disposed.
243    fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
244    where
245        F: FnOnce(),
246    {
247        DoAfterDisposal::new(self, callback)
248    }
249
250    /// Invokes a callback after each item is forwarded downstream.
251    fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
252    where
253        F: FnMut(T),
254    {
255        DoAfterNext::new(self, callback)
256    }
257
258    /// Invokes a callback after the observer subscribes to the source.
259    fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
260    where
261        F: FnOnce(),
262    {
263        DoAfterSubscription::new(self, callback)
264    }
265
266    /// Invokes a callback after the source terminates, regardless of completion or error.
267    fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
268    where
269        F: FnOnce(Termination<E>),
270    {
271        DoAfterTermination::new(self, callback)
272    }
273
274    /// Invokes a callback right before the downstream subscription is disposed.
275    fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
276    where
277        F: FnOnce(),
278    {
279        DoBeforeDisposal::new(self, callback)
280    }
281
282    /// Invokes a callback with a reference to each item before it is sent downstream.
283    fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
284    where
285        F: FnMut(&T),
286    {
287        DoBeforeNext::new(self, callback)
288    }
289
290    /// Invokes a callback just before the observer subscribes to the source.
291    fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
292    where
293        F: FnOnce(),
294    {
295        DoBeforeSubscription::new(self, callback)
296    }
297
298    /// Invokes a callback before the stream terminates, receiving the termination reason.
299    fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
300    where
301        F: FnOnce(&Termination<E>),
302    {
303        DoBeforeTermination::new(self, callback)
304    }
305
306    /// Emits only the item at the given zero-based index and then completes.
307    fn element_at(self, index: usize) -> ElementAt<Self> {
308        ElementAt::new(self, index)
309    }
310
311    /// Filters items using a predicate, forwarding only values that return `true`.
312    fn filter<F>(self, callback: F) -> Filter<Self, F>
313    where
314        F: FnMut(&T) -> bool,
315    {
316        Filter::new(self, callback)
317    }
318
319    /// Emits only the first item from the source, then completes.
320    fn first(self) -> First<Self> {
321        First::new(self)
322    }
323
324    /// Maps each item to an observable and merges the resulting inner sequences concurrently.
325    fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
326    where
327        OE1: Observable<'or, 'sub, T1, E>,
328        F: FnMut(T) -> OE1,
329    {
330        FlatMap::new(self, callback)
331    }
332
333    /// Groups items by key into multiple observable sequences.
334    fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
335    where
336        F: FnMut(T) -> K,
337    {
338        GroupBy::new(self, callback)
339    }
340
341    /// Hooks into the emission of items, allowing mutation of the downstream observer.
342    fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
343    where
344        F: FnMut(&mut dyn Observer<T, E>, T),
345    {
346        HookOnNext::new(self, callback)
347    }
348
349    /// Hooks into subscription, letting you override how the source subscribes observers.
350    fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
351    where
352        F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
353    {
354        HookOnSubscription::new(self, callback)
355    }
356
357    /// Hooks into termination, providing access to the observer and termination payload.
358    fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
359    where
360        F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
361    {
362        HookOnTermination::new(self, callback)
363    }
364
365    /// Ignores all items from the source, only relaying termination events.
366    fn ignore_elements(self) -> IgnoreElements<Self> {
367        IgnoreElements::new(self)
368    }
369
370    /// Boxes the observable, erasing its concrete type while preserving lifetime bounds.
371    fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
372    where
373        T: 'or,
374        E: 'or,
375        Self: NecessarySendSync + 'oe,
376    {
377        BoxedObservable::new(self)
378    }
379
380    #[cfg(feature = "futures")]
381    /// Converts the observable into an async stream.
382    fn into_stream(self) -> ObservableStream<'sub, T, Self>
383    where
384        Self: Observable<'or, 'sub, T, Infallible>,
385    {
386        ObservableStream::new(self)
387    }
388
389    /// Emits only the final item produced by the source before completion.
390    fn last(self) -> Last<Self> {
391        Last::new(self)
392    }
393
394    /// Transforms each item by applying a user-supplied mapping function.
395    fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
396    where
397        F: FnMut(T) -> T1,
398    {
399        Map::new(self, callback)
400    }
401
402    /// Maps an Observable with an `Infallible` error type to an Observable with a concrete error type.
403    fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
404        MapInfallibleToError::new(self)
405    }
406
407    /// Maps an Observable with an `Infallible` item type to an Observable with a concrete item type.
408    fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
409        MapInfallibleToValue::new(self)
410    }
411
412    /// Wraps each item into a notification, turning the stream into explicit events.
413    fn materialize(self) -> Materialize<Self> {
414        Materialize::new(self)
415    }
416
417    /// Emits the maximum item produced by the source according to the natural order.
418    fn max(self) -> Max<Self> {
419        Max::new(self)
420    }
421
422    /// Merges an observable-of-observables by interleaving items from inner streams.
423    fn merge_all<T1>(self) -> MergeAll<Self, T>
424    where
425        T: Observable<'or, 'sub, T1, E>,
426    {
427        MergeAll::new(self)
428    }
429
430    /// Merges the source with another observable, interleaving both streams concurrently.
431    fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
432    where
433        OE2: Observable<'or, 'sub, T, E>,
434    {
435        Merge::new(self, source_2)
436    }
437
438    /// Emits the minimum item produced by the source according to the natural order.
439    fn min(self) -> Min<Self> {
440        Min::new(self)
441    }
442
443    /// Converts the source into a connectable observable using a subject factory.
444    fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
445    where
446        F: FnOnce() -> S,
447    {
448        ConnectableObservable::new(self, subject_maker())
449    }
450
451    /// Schedules downstream observation on the provided scheduler.
452    fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
453        ObserveOn::new(self, scheduler)
454    }
455
456    /// Multicasts the source using a `PublishSubject`.
457    fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
458        self.multicast(PublishSubject::default)
459    }
460
461    /// Multicasts the source using an `AsyncSubject`, emitting only the last value.
462    fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
463        self.multicast(AsyncSubject::default)
464    }
465
466    /// Aggregates the sequence using an initial seed and an accumulator function.
467    fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
468    where
469        F: FnMut(T0, T) -> T0,
470    {
471        Reduce::new(self, initial_value, callback)
472    }
473
474    /// Multicasts the source using a `ReplaySubject` configured with the given buffer size.
475    fn replay(
476        self,
477        buffer_size: Option<usize>,
478    ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
479        self.multicast(|| ReplaySubject::new(buffer_size))
480    }
481
482    /// Re-subscribes to the source based on the retry strategy returned by the callback.
483    fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
484    where
485        OE1: Observable<'or, 'sub, T, E>,
486        F: FnMut(E) -> RetryAction<E, OE1>,
487    {
488        Retry::new(self, callback)
489    }
490
491    /// Samples the source whenever the sampler observable emits an event.
492    fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
493    where
494        OE1: Observable<'or, 'sub, (), E>,
495    {
496        Sample::new(self, sampler)
497    }
498
499    /// Accumulates values over time, emitting each intermediate result.
500    fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
501    where
502        F: FnMut(T0, T) -> T0,
503    {
504        Scan::new(self, initial_value, callback)
505    }
506
507    /// Compares two sequences element by element for equality.
508    fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
509    where
510        OE2: Observable<'or, 'sub, T, E>,
511    {
512        SequenceEqual::new(self, another_source)
513    }
514
515    /// Shares a single subscription to the source using `PublishSubject` semantics.
516    fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
517        self.publish().ref_count()
518    }
519
520    /// Shares a single subscription, replaying only the last item to new subscribers.
521    fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
522        self.publish_last().ref_count()
523    }
524
525    /// Shares a single subscription while replaying a bounded history to future subscribers.
526    fn share_replay(
527        self,
528        buffer_size: Option<usize>,
529    ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
530        self.replay(buffer_size).ref_count()
531    }
532
533    /// Skips the first `count` items before emitting the remainder of the sequence.
534    fn skip(self, count: usize) -> Skip<Self> {
535        Skip::new(self, count)
536    }
537
538    /// Skips the last `count` items emitted by the source.
539    fn skip_last(self, count: usize) -> SkipLast<Self> {
540        SkipLast::new(self, count)
541    }
542
543    /// Ignores items from the source until the notifier observable fires.
544    fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
545    where
546        OE1: Observable<'or, 'sub, (), E>,
547    {
548        SkipUntil::new(self, start)
549    }
550
551    /// Skips items while the predicate returns `true`, then emits the remaining items.
552    fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
553    where
554        F: FnMut(&T) -> bool,
555    {
556        SkipWhile::new(self, callback)
557    }
558
559    /// Pre-pends the provided values before the source starts emitting.
560    fn start_with<I>(self, values: I) -> StartWith<Self, I>
561    where
562        I: IntoIterator<Item = T>,
563    {
564        StartWith::new(self, values)
565    }
566
567    /// Subscribes to the source on the provided scheduler.
568    fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
569        SubscribeOn::new(self, scheduler)
570    }
571
572    /// Convenience helper for subscribing with plain callbacks instead of a full observer.
573    fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
574    where
575        T: 'or,
576        E: 'or,
577        FN: FnMut(T) + NecessarySendSync + 'or,
578        FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
579    {
580        self.subscribe(CallbackObserver::new(on_next, on_termination))
581    }
582
583    /// Sums all numeric items and emits the accumulated total.
584    fn sum(self) -> Sum<Self> {
585        Sum::new(self)
586    }
587
588    /// Switches to the most recent inner observable emitted by the source.
589    fn switch<T1>(self) -> Switch<Self, T>
590    where
591        T: Observable<'or, 'sub, T1, E>,
592    {
593        Switch::new(self)
594    }
595
596    /// Maps each item to an observable and switches to the latest inner sequence.
597    fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
598    where
599        OE1: Observable<'or, 'sub, T1, E>,
600        F: FnMut(T) -> OE1,
601    {
602        SwitchMap::new(self, callback)
603    }
604
605    /// Emits only the first `count` items from the source before completing.
606    fn take(self, count: usize) -> Take<Self> {
607        Take::new(self, count)
608    }
609
610    /// Emits only the last `count` items produced by the source.
611    fn take_last(self, count: usize) -> TakeLast<Self> {
612        TakeLast::new(self, count)
613    }
614
615    /// Relays items until the notifier observable emits, then completes.
616    fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
617    where
618        OE1: Observable<'or, 'sub, (), E>,
619    {
620        TakeUntil::new(self, stop)
621    }
622
623    /// Emits items while the predicate holds `true`, then completes.
624    fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
625    where
626        F: FnMut(&T) -> bool,
627    {
628        TakeWhile::new(self, callback)
629    }
630
631    /// Throttles emissions to at most one item per specified timespan.
632    fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
633        Throttle::new(self, time_span, scheduler)
634    }
635
636    /// Emits elapsed time between consecutive items as they flow through the stream.
637    fn time_interval(self) -> TimeInterval<Self> {
638        TimeInterval::new(self)
639    }
640
641    /// Errors if the next item does not arrive within the specified duration.
642    fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
643        Timeout::new(self, duration, scheduler)
644    }
645
646    /// Annotates each item with the current timestamp when it is emitted.
647    fn timestamp(self) -> Timestamp<Self> {
648        Timestamp::new(self)
649    }
650
651    /// Collects items into windows that are opened and closed by another observable.
652    fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
653    where
654        OE1: Observable<'or, 'sub, (), E>,
655    {
656        Window::new(self, boundary)
657    }
658
659    /// Collects items into windows containing a fixed number of elements.
660    fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
661        WindowWithCount::new(self, count)
662    }
663
664    /// Pairs items from both observables by index and emits tuples of corresponding values.
665    fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
666    where
667        OE2: Observable<'or, 'sub, T1, E>,
668    {
669        Zip::new(self, another_source)
670    }
671}
672
673impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
674{}