rx_rust/observable/
observable_ext.rs

1use super::{Observable, boxed_observable::BoxedObservable};
2use crate::{
3    disposable::subscription::Subscription,
4    observer::{
5        Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
6    },
7    operators::{
8        combining::{
9            combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
10            merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
11        },
12        conditional_boolean::{
13            all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
14            sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
15            take_until::TakeUntil, take_while::TakeWhile,
16        },
17        connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
18        error_handling::{
19            catch::Catch,
20            retry::{Retry, RetryAction},
21        },
22        filtering::{
23            debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
24            element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
25            last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
26            take_last::TakeLast, throttle::Throttle,
27        },
28        mathematical_aggregate::{
29            average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
30        },
31        others::{
32            debug::{Debug, DebugEvent, DefaultPrintType},
33            hook_on_next::HookOnNext,
34            hook_on_subscription::HookOnSubscription,
35            hook_on_termination::HookOnTermination,
36            map_infallible_to_error::MapInfallibleToError,
37            map_infallible_to_value::MapInfallibleToValue,
38        },
39        transforming::{
40            buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
41            buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
42            flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
43            window::Window, window_with_count::WindowWithCount,
44        },
45        utility::{
46            delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
47            do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
48            do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
49            do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
50            do_before_termination::DoBeforeTermination, materialize::Materialize,
51            observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
52            timeout::Timeout, timestamp::Timestamp,
53        },
54    },
55    subject::{
56        async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
57    },
58    utils::types::NecessarySendSync,
59};
60use std::{fmt::Display, num::NonZeroUsize, time::Duration};
61#[cfg(feature = "futures")]
62use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};
63
64/// Extension trait that exposes the full suite of RxRust operators on any type that
65/// implements [`Observable`]. Each method forwards to the corresponding operator
66/// constructor, allowing a fluent, ergonomic style when composing observable pipelines.
67pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
68    /// Emits a single `bool` indicating whether every item satisfies the provided predicate.
69    fn all<F>(self, callback: F) -> All<T, Self, F>
70    where
71        F: FnMut(T) -> bool,
72    {
73        All::new(self, callback)
74    }
75
76    /// Competes two observables and mirrors whichever one produces an item or error first.
77    fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
78        Amb::new([self, other])
79    }
80
81    /// Calculates the arithmetic mean of all numeric items emitted by the source.
82    fn average(self) -> Average<T, Self> {
83        Average::new(self)
84    }
85
86    /// Collects the items emitted by the source into buffers delimited by another observable.
87    fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
88    where
89        OE1: Observable<'or, 'sub, (), E>,
90    {
91        Buffer::new(self, boundary)
92    }
93
94    /// Collects items into fixed-size buffers and emits each buffer as soon as it fills up.
95    fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
96        BufferWithCount::new(self, count)
97    }
98
99    /// Collects items into time-based buffers driven by the provided scheduler.
100    fn buffer_with_time<S>(
101        self,
102        time_span: Duration,
103        scheduler: S,
104        delay: Option<Duration>,
105    ) -> BufferWithTime<Self, S> {
106        BufferWithTime::new(self, time_span, scheduler, delay)
107    }
108
109    /// Collects items into buffers using both size and time boundaries whichever occurs first.
110    fn buffer_with_time_or_count<S>(
111        self,
112        count: NonZeroUsize,
113        time_span: Duration,
114        scheduler: S,
115        delay: Option<Duration>,
116    ) -> BufferWithTimeOrCount<Self, S> {
117        BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
118    }
119
120    /// Recovers from errors by switching to another observable yielded by the callback.
121    fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
122    where
123        OE1: Observable<'or, 'sub, T, E1>,
124        F: FnOnce(E) -> OE1,
125    {
126        Catch::new(self, callback)
127    }
128
129    /// Combines the latest values from both observables whenever either produces a new item.
130    fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
131    where
132        OE2: Observable<'or, 'sub, T1, E>,
133    {
134        CombineLatest::new(self, another_source)
135    }
136
137    /// Flattens an observable-of-observables by concatenating each inner observable sequentially.
138    fn concat_all<T1>(self) -> ConcatAll<Self, T>
139    where
140        T: Observable<'or, 'sub, T1, E>,
141    {
142        ConcatAll::new(self)
143    }
144
145    /// Maps each item to an observable and concatenates the resulting inner sequences.
146    fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
147    where
148        OE1: Observable<'or, 'sub, T1, E>,
149        F: FnMut(T) -> OE1,
150    {
151        ConcatMap::new(self, callback)
152    }
153
154    /// Concatenates the source with another observable, waiting for the first to complete.
155    fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
156    where
157        OE2: Observable<'or, 'sub, T, E>,
158    {
159        Concat::new(self, source_2)
160    }
161
162    /// Emits `true` if the sequence contains the provided item, `false` otherwise.
163    fn contains(self, item: T) -> Contains<T, Self> {
164        Contains::new(self, item)
165    }
166
167    /// Counts the number of items emitted and emits that count as a single value.
168    fn count(self) -> Count<T, Self> {
169        Count::new(self)
170    }
171
172    /// Emits an item from the source Observable only after a particular time span has passed without another source emission.
173    fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
174        Debounce::new(self, time_span, scheduler)
175    }
176
177    /// Attaches a label to the stream and logs lifecycle events for debugging purposes using the provided callback.
178    fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
179    where
180        F: Fn(C, DebugEvent<'_, T, E>),
181    {
182        Debug::new(self, context, callback)
183    }
184
185    /// Attaches a label to the stream and logs lifecycle events for debugging purposes using the default print.
186    fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
187    where
188        L: Display,
189        T: std::fmt::Debug,
190        E: std::fmt::Debug,
191    {
192        Debug::new_default_print(self, label)
193    }
194
195    /// Emits a default value if the source completes without emitting any items.
196    fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
197        DefaultIfEmpty::new(self, default_value)
198    }
199
200    /// Offsets the emission of items by the specified duration using the given scheduler.
201    fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
202        Delay::new(self, delay, scheduler)
203    }
204
205    /// Converts a stream of notifications back into a normal observable sequence.
206    fn dematerialize(self) -> Dematerialize<Self> {
207        Dematerialize::new(self)
208    }
209
210    /// Filters out duplicate items, keeping only the first occurrence of each value.
211    fn distinct(self) -> Distinct<Self, fn(&T) -> T>
212    where
213        T: Clone,
214    {
215        Distinct::new(self)
216    }
217
218    /// Filters out duplicates based on a key selector, keeping only unique keys.
219    fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
220    where
221        F: FnMut(&T) -> K,
222    {
223        Distinct::new_with_key_selector(self, key_selector)
224    }
225
226    /// Suppresses consecutive duplicate items, comparing the values directly.
227    fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
228    where
229        T: Clone,
230    {
231        DistinctUntilChanged::new(self)
232    }
233
234    /// Suppresses consecutive duplicate items using a custom key selector.
235    fn distinct_until_changed_with_key_selector<F, K>(
236        self,
237        key_selector: F,
238    ) -> DistinctUntilChanged<Self, F>
239    where
240        F: FnMut(&T) -> K,
241    {
242        DistinctUntilChanged::new_with_key_selector(self, key_selector)
243    }
244
245    /// Invokes a callback after the downstream subscription is disposed.
246    fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
247    where
248        F: FnOnce(),
249    {
250        DoAfterDisposal::new(self, callback)
251    }
252
253    /// Invokes a callback after each item is forwarded downstream.
254    fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
255    where
256        F: FnMut(T),
257    {
258        DoAfterNext::new(self, callback)
259    }
260
261    /// Invokes a callback after the observer subscribes to the source.
262    fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
263    where
264        F: FnOnce(),
265    {
266        DoAfterSubscription::new(self, callback)
267    }
268
269    /// Invokes a callback after the source terminates, regardless of completion or error.
270    fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
271    where
272        F: FnOnce(Termination<E>),
273    {
274        DoAfterTermination::new(self, callback)
275    }
276
277    /// Invokes a callback right before the downstream subscription is disposed.
278    fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
279    where
280        F: FnOnce(),
281    {
282        DoBeforeDisposal::new(self, callback)
283    }
284
285    /// Invokes a callback with a reference to each item before it is sent downstream.
286    fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
287    where
288        F: FnMut(&T),
289    {
290        DoBeforeNext::new(self, callback)
291    }
292
293    /// Invokes a callback just before the observer subscribes to the source.
294    fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
295    where
296        F: FnOnce(),
297    {
298        DoBeforeSubscription::new(self, callback)
299    }
300
301    /// Invokes a callback before the stream terminates, receiving the termination reason.
302    fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
303    where
304        F: FnOnce(&Termination<E>),
305    {
306        DoBeforeTermination::new(self, callback)
307    }
308
309    /// Emits only the item at the given zero-based index and then completes.
310    fn element_at(self, index: usize) -> ElementAt<Self> {
311        ElementAt::new(self, index)
312    }
313
314    /// Filters items using a predicate, forwarding only values that return `true`.
315    fn filter<F>(self, callback: F) -> Filter<Self, F>
316    where
317        F: FnMut(&T) -> bool,
318    {
319        Filter::new(self, callback)
320    }
321
322    /// Emits only the first item from the source, then completes.
323    fn first(self) -> First<Self> {
324        First::new(self)
325    }
326
327    /// Maps each item to an observable and merges the resulting inner sequences concurrently.
328    fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
329    where
330        OE1: Observable<'or, 'sub, T1, E>,
331        F: FnMut(T) -> OE1,
332    {
333        FlatMap::new(self, callback)
334    }
335
336    /// Groups items by key into multiple observable sequences.
337    fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
338    where
339        F: FnMut(T) -> K,
340    {
341        GroupBy::new(self, callback)
342    }
343
344    /// Hooks into the emission of items, allowing mutation of the downstream observer.
345    fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
346    where
347        F: FnMut(&mut dyn Observer<T, E>, T),
348    {
349        HookOnNext::new(self, callback)
350    }
351
352    /// Hooks into subscription, letting you override how the source subscribes observers.
353    fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
354    where
355        F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
356    {
357        HookOnSubscription::new(self, callback)
358    }
359
360    /// Hooks into termination, providing access to the observer and termination payload.
361    fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
362    where
363        F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
364    {
365        HookOnTermination::new(self, callback)
366    }
367
368    /// Ignores all items from the source, only relaying termination events.
369    fn ignore_elements(self) -> IgnoreElements<Self> {
370        IgnoreElements::new(self)
371    }
372
373    /// Boxes the observable, erasing its concrete type while preserving lifetime bounds.
374    fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
375    where
376        T: 'or,
377        E: 'or,
378        Self: NecessarySendSync + 'oe,
379    {
380        BoxedObservable::new(self)
381    }
382
383    #[cfg(feature = "futures")]
384    /// Converts the observable into an async stream.
385    fn into_stream(self) -> ObservableStream<'sub, T, Self>
386    where
387        Self: Observable<'or, 'sub, T, Infallible>,
388    {
389        ObservableStream::new(self)
390    }
391
392    /// Emits only the final item produced by the source before completion.
393    fn last(self) -> Last<Self> {
394        Last::new(self)
395    }
396
397    /// Transforms each item by applying a user-supplied mapping function.
398    fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
399    where
400        F: FnMut(T) -> T1,
401    {
402        Map::new(self, callback)
403    }
404
405    /// Maps an Observable with an `Infallible` error type to an Observable with a concrete error type.
406    fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
407        MapInfallibleToError::new(self)
408    }
409
410    /// Maps an Observable with an `Infallible` item type to an Observable with a concrete item type.
411    fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
412        MapInfallibleToValue::new(self)
413    }
414
415    /// Wraps each item into a notification, turning the stream into explicit events.
416    fn materialize(self) -> Materialize<Self> {
417        Materialize::new(self)
418    }
419
420    /// Emits the maximum item produced by the source according to the natural order.
421    fn max(self) -> Max<Self> {
422        Max::new(self)
423    }
424
425    /// Merges an observable-of-observables by interleaving items from inner streams.
426    fn merge_all<T1>(self) -> MergeAll<Self, T>
427    where
428        T: Observable<'or, 'sub, T1, E>,
429    {
430        MergeAll::new(self)
431    }
432
433    /// Merges the source with another observable, interleaving both streams concurrently.
434    fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
435    where
436        OE2: Observable<'or, 'sub, T, E>,
437    {
438        Merge::new(self, source_2)
439    }
440
441    /// Emits the minimum item produced by the source according to the natural order.
442    fn min(self) -> Min<Self> {
443        Min::new(self)
444    }
445
446    /// Converts the source into a connectable observable using a subject factory.
447    fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
448    where
449        F: FnOnce() -> S,
450    {
451        ConnectableObservable::new(self, subject_maker())
452    }
453
454    /// Schedules downstream observation on the provided scheduler.
455    fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
456        ObserveOn::new(self, scheduler)
457    }
458
459    /// Multicasts the source using a `PublishSubject`.
460    fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
461        self.multicast(PublishSubject::default)
462    }
463
464    /// Multicasts the source using an `AsyncSubject`, emitting only the last value.
465    fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
466        self.multicast(AsyncSubject::default)
467    }
468
469    /// Aggregates the sequence using an initial seed and an accumulator function.
470    fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
471    where
472        F: FnMut(T0, T) -> T0,
473    {
474        Reduce::new(self, initial_value, callback)
475    }
476
477    /// Multicasts the source using a `ReplaySubject` configured with the given buffer size.
478    fn replay(
479        self,
480        buffer_size: Option<usize>,
481    ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
482        self.multicast(|| ReplaySubject::new(buffer_size))
483    }
484
485    /// Re-subscribes to the source based on the retry strategy returned by the callback.
486    fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
487    where
488        OE1: Observable<'or, 'sub, T, E>,
489        F: FnMut(E) -> RetryAction<E, OE1>,
490    {
491        Retry::new(self, callback)
492    }
493
494    /// Samples the source whenever the sampler observable emits an event.
495    fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
496    where
497        OE1: Observable<'or, 'sub, (), E>,
498    {
499        Sample::new(self, sampler)
500    }
501
502    /// Accumulates values over time, emitting each intermediate result.
503    fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
504    where
505        F: FnMut(T0, T) -> T0,
506    {
507        Scan::new(self, initial_value, callback)
508    }
509
510    /// Compares two sequences element by element for equality.
511    fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
512    where
513        OE2: Observable<'or, 'sub, T, E>,
514    {
515        SequenceEqual::new(self, another_source)
516    }
517
518    /// Shares a single subscription to the source using `PublishSubject` semantics.
519    fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
520        self.publish().ref_count()
521    }
522
523    /// Shares a single subscription, replaying only the last item to new subscribers.
524    fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
525        self.publish_last().ref_count()
526    }
527
528    /// Shares a single subscription while replaying a bounded history to future subscribers.
529    fn share_replay(
530        self,
531        buffer_size: Option<usize>,
532    ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
533        self.replay(buffer_size).ref_count()
534    }
535
536    /// Skips the first `count` items before emitting the remainder of the sequence.
537    fn skip(self, count: usize) -> Skip<Self> {
538        Skip::new(self, count)
539    }
540
541    /// Skips the last `count` items emitted by the source.
542    fn skip_last(self, count: usize) -> SkipLast<Self> {
543        SkipLast::new(self, count)
544    }
545
546    /// Ignores items from the source until the notifier observable fires.
547    fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
548    where
549        OE1: Observable<'or, 'sub, (), E>,
550    {
551        SkipUntil::new(self, start)
552    }
553
554    /// Skips items while the predicate returns `true`, then emits the remaining items.
555    fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
556    where
557        F: FnMut(&T) -> bool,
558    {
559        SkipWhile::new(self, callback)
560    }
561
562    /// Pre-pends the provided values before the source starts emitting.
563    fn start_with<I>(self, values: I) -> StartWith<Self, I>
564    where
565        I: IntoIterator<Item = T>,
566    {
567        StartWith::new(self, values)
568    }
569
570    /// Subscribes to the source on the provided scheduler.
571    fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
572        SubscribeOn::new(self, scheduler)
573    }
574
575    /// Convenience helper for subscribing with plain callbacks instead of a full observer.
576    fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
577    where
578        T: 'or,
579        E: 'or,
580        FN: FnMut(T) + NecessarySendSync + 'or,
581        FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
582    {
583        self.subscribe(CallbackObserver::new(on_next, on_termination))
584    }
585
586    /// Sums all numeric items and emits the accumulated total.
587    fn sum(self) -> Sum<Self> {
588        Sum::new(self)
589    }
590
591    /// Switches to the most recent inner observable emitted by the source.
592    fn switch<T1>(self) -> Switch<Self, T>
593    where
594        T: Observable<'or, 'sub, T1, E>,
595    {
596        Switch::new(self)
597    }
598
599    /// Maps each item to an observable and switches to the latest inner sequence.
600    fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
601    where
602        OE1: Observable<'or, 'sub, T1, E>,
603        F: FnMut(T) -> OE1,
604    {
605        SwitchMap::new(self, callback)
606    }
607
608    /// Emits only the first `count` items from the source before completing.
609    fn take(self, count: usize) -> Take<Self> {
610        Take::new(self, count)
611    }
612
613    /// Emits only the last `count` items produced by the source.
614    fn take_last(self, count: usize) -> TakeLast<Self> {
615        TakeLast::new(self, count)
616    }
617
618    /// Relays items until the notifier observable emits, then completes.
619    fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
620    where
621        OE1: Observable<'or, 'sub, (), E>,
622    {
623        TakeUntil::new(self, stop)
624    }
625
626    /// Emits items while the predicate holds `true`, then completes.
627    fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
628    where
629        F: FnMut(&T) -> bool,
630    {
631        TakeWhile::new(self, callback)
632    }
633
634    /// Throttles emissions to at most one item per specified timespan.
635    fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
636        Throttle::new(self, time_span, scheduler)
637    }
638
639    /// Emits elapsed time between consecutive items as they flow through the stream.
640    fn time_interval(self) -> TimeInterval<Self> {
641        TimeInterval::new(self)
642    }
643
644    /// Errors if the next item does not arrive within the specified duration.
645    fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
646        Timeout::new(self, duration, scheduler)
647    }
648
649    /// Annotates each item with the current timestamp when it is emitted.
650    fn timestamp(self) -> Timestamp<Self> {
651        Timestamp::new(self)
652    }
653
654    /// Collects items into windows that are opened and closed by another observable.
655    fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
656    where
657        OE1: Observable<'or, 'sub, (), E>,
658    {
659        Window::new(self, boundary)
660    }
661
662    /// Collects items into windows containing a fixed number of elements.
663    fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
664        WindowWithCount::new(self, count)
665    }
666
667    /// Pairs items from both observables by index and emits tuples of corresponding values.
668    fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
669    where
670        OE2: Observable<'or, 'sub, T1, E>,
671    {
672        Zip::new(self, another_source)
673    }
674}
675
676impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
677{}