// rx_rust/observable/observable_ext.rs

use super::{Observable, boxed_observable::BoxedObservable};
use crate::{
    disposable::subscription::Subscription,
    observer::{
        Observer, Termination, boxed_observer::BoxedObserver, callback_observer::CallbackObserver,
    },
    operators::{
        backpressure::{
            on_backpressure::OnBackpressure, on_backpressure_buffer::OnBackpressureBuffer,
            on_backpressure_latest::OnBackpressureLatest,
        },
        combining::{
            combine_latest::CombineLatest, concat::Concat, concat_all::ConcatAll, merge::Merge,
            merge_all::MergeAll, start_with::StartWith, switch::Switch, zip::Zip,
        },
        conditional_boolean::{
            all::All, amb::Amb, contains::Contains, default_if_empty::DefaultIfEmpty,
            sequence_equal::SequenceEqual, skip_until::SkipUntil, skip_while::SkipWhile,
            take_until::TakeUntil, take_while::TakeWhile,
        },
        connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
        error_handling::{
            catch::Catch,
            retry::{Retry, RetryAction},
        },
        filtering::{
            debounce::Debounce, distinct::Distinct, distinct_until_changed::DistinctUntilChanged,
            element_at::ElementAt, filter::Filter, first::First, ignore_elements::IgnoreElements,
            last::Last, sample::Sample, skip::Skip, skip_last::SkipLast, take::Take,
            take_last::TakeLast, throttle::Throttle,
        },
        mathematical_aggregate::{
            average::Average, count::Count, max::Max, min::Min, reduce::Reduce, sum::Sum,
        },
        others::{
            debug::{Debug, DebugEvent, DefaultPrintType},
            hook_on_next::HookOnNext,
            hook_on_subscription::HookOnSubscription,
            hook_on_termination::HookOnTermination,
            map_infallible_to_error::MapInfallibleToError,
            map_infallible_to_value::MapInfallibleToValue,
        },
        transforming::{
            buffer::Buffer, buffer_with_count::BufferWithCount, buffer_with_time::BufferWithTime,
            buffer_with_time_or_count::BufferWithTimeOrCount, concat_map::ConcatMap,
            flat_map::FlatMap, group_by::GroupBy, map::Map, scan::Scan, switch_map::SwitchMap,
            window::Window, window_with_count::WindowWithCount,
        },
        utility::{
            delay::Delay, dematerialize::Dematerialize, do_after_disposal::DoAfterDisposal,
            do_after_next::DoAfterNext, do_after_subscription::DoAfterSubscription,
            do_after_termination::DoAfterTermination, do_before_disposal::DoBeforeDisposal,
            do_before_next::DoBeforeNext, do_before_subscription::DoBeforeSubscription,
            do_before_termination::DoBeforeTermination, materialize::Materialize,
            observe_on::ObserveOn, subscribe_on::SubscribeOn, time_interval::TimeInterval,
            timeout::Timeout, timestamp::Timestamp,
        },
    },
    subject::{
        async_subject::AsyncSubject, publish_subject::PublishSubject, replay_subject::ReplaySubject,
    },
    utils::types::NecessarySendSync,
};
use std::{fmt::Display, num::NonZeroUsize, time::Duration};
#[cfg(feature = "futures")]
use {crate::operators::others::observable_stream::ObservableStream, std::convert::Infallible};

68/// Extension trait that exposes the full suite of RxRust operators on any type that
69/// implements [`Observable`]. Each method forwards to the corresponding operator
70/// constructor, allowing a fluent, ergonomic style when composing observable pipelines.
71pub trait ObservableExt<'or, 'sub, T, E>: Observable<'or, 'sub, T, E> + Sized {
72    /// Emits a single `bool` indicating whether every item satisfies the provided predicate.
73    fn all<F>(self, callback: F) -> All<T, Self, F>
74    where
75        F: FnMut(T) -> bool,
76    {
77        All::new(self, callback)
78    }
79
80    /// Competes two observables and mirrors whichever one produces an item or error first.
81    fn amb_with(self, other: Self) -> Amb<[Self; 2]> {
82        Amb::new([self, other])
83    }
84
85    /// Calculates the arithmetic mean of all numeric items emitted by the source.
86    fn average(self) -> Average<T, Self> {
87        Average::new(self)
88    }
89
90    /// Collects the items emitted by the source into buffers delimited by another observable.
91    fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
92    where
93        OE1: Observable<'or, 'sub, (), E>,
94    {
95        Buffer::new(self, boundary)
96    }
97
98    /// Collects items into fixed-size buffers and emits each buffer as soon as it fills up.
99    fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self> {
100        BufferWithCount::new(self, count)
101    }
102
103    /// Collects items into time-based buffers driven by the provided scheduler.
104    fn buffer_with_time<S>(
105        self,
106        time_span: Duration,
107        scheduler: S,
108        delay: Option<Duration>,
109    ) -> BufferWithTime<Self, S> {
110        BufferWithTime::new(self, time_span, scheduler, delay)
111    }
112
113    /// Collects items into buffers using both size and time boundaries whichever occurs first.
114    fn buffer_with_time_or_count<S>(
115        self,
116        count: NonZeroUsize,
117        time_span: Duration,
118        scheduler: S,
119        delay: Option<Duration>,
120    ) -> BufferWithTimeOrCount<Self, S> {
121        BufferWithTimeOrCount::new(self, count, time_span, scheduler, delay)
122    }
123
124    /// Recovers from errors by switching to another observable yielded by the callback.
125    fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
126    where
127        OE1: Observable<'or, 'sub, T, E1>,
128        F: FnOnce(E) -> OE1,
129    {
130        Catch::new(self, callback)
131    }
132
133    /// Combines the latest values from both observables whenever either produces a new item.
134    fn combine_latest<T1, OE2>(self, another_source: OE2) -> CombineLatest<Self, OE2>
135    where
136        OE2: Observable<'or, 'sub, T1, E>,
137    {
138        CombineLatest::new(self, another_source)
139    }
140
141    /// Flattens an observable-of-observables by concatenating each inner observable sequentially.
142    fn concat_all<T1>(self) -> ConcatAll<Self, T>
143    where
144        T: Observable<'or, 'sub, T1, E>,
145    {
146        ConcatAll::new(self)
147    }
148
149    /// Maps each item to an observable and concatenates the resulting inner sequences.
150    fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
151    where
152        OE1: Observable<'or, 'sub, T1, E>,
153        F: FnMut(T) -> OE1,
154    {
155        ConcatMap::new(self, callback)
156    }
157
158    /// Concatenates the source with another observable, waiting for the first to complete.
159    fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
160    where
161        OE2: Observable<'or, 'sub, T, E>,
162    {
163        Concat::new(self, source_2)
164    }
165
166    /// Emits `true` if the sequence contains the provided item, `false` otherwise.
167    fn contains(self, item: T) -> Contains<T, Self> {
168        Contains::new(self, item)
169    }
170
171    /// Counts the number of items emitted and emits that count as a single value.
172    fn count(self) -> Count<T, Self> {
173        Count::new(self)
174    }
175
176    /// Emits an item from the source Observable only after a particular time span has passed without another source emission.
177    fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S> {
178        Debounce::new(self, time_span, scheduler)
179    }
180
181    /// Attaches a label to the stream and logs lifecycle events for debugging purposes using the provided callback.
182    fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
183    where
184        F: Fn(C, DebugEvent<'_, T, E>),
185    {
186        Debug::new(self, context, callback)
187    }
188
189    /// Attaches a label to the stream and logs lifecycle events for debugging purposes using the default print.
190    fn debug_default_print<L>(self, label: L) -> Debug<Self, L, DefaultPrintType<L, T, E>>
191    where
192        L: Display,
193        T: std::fmt::Debug,
194        E: std::fmt::Debug,
195    {
196        Debug::new_default_print(self, label)
197    }
198
199    /// Emits a default value if the source completes without emitting any items.
200    fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self> {
201        DefaultIfEmpty::new(self, default_value)
202    }
203
204    /// Offsets the emission of items by the specified duration using the given scheduler.
205    fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S> {
206        Delay::new(self, delay, scheduler)
207    }
208
209    /// Converts a stream of notifications back into a normal observable sequence.
210    fn dematerialize(self) -> Dematerialize<Self> {
211        Dematerialize::new(self)
212    }
213
214    /// Filters out duplicate items, keeping only the first occurrence of each value.
215    fn distinct(self) -> Distinct<Self, fn(&T) -> T>
216    where
217        T: Clone,
218    {
219        Distinct::new(self)
220    }
221
222    /// Filters out duplicates based on a key selector, keeping only unique keys.
223    fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
224    where
225        F: FnMut(&T) -> K,
226    {
227        Distinct::new_with_key_selector(self, key_selector)
228    }
229
230    /// Suppresses consecutive duplicate items, comparing the values directly.
231    fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
232    where
233        T: Clone,
234    {
235        DistinctUntilChanged::new(self)
236    }
237
238    /// Suppresses consecutive duplicate items using a custom key selector.
239    fn distinct_until_changed_with_key_selector<F, K>(
240        self,
241        key_selector: F,
242    ) -> DistinctUntilChanged<Self, F>
243    where
244        F: FnMut(&T) -> K,
245    {
246        DistinctUntilChanged::new_with_key_selector(self, key_selector)
247    }
248
249    /// Invokes a callback after the downstream subscription is disposed.
250    fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
251    where
252        F: FnOnce(),
253    {
254        DoAfterDisposal::new(self, callback)
255    }
256
257    /// Invokes a callback after each item is forwarded downstream.
258    fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
259    where
260        F: FnMut(T),
261    {
262        DoAfterNext::new(self, callback)
263    }
264
265    /// Invokes a callback after the observer subscribes to the source.
266    fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
267    where
268        F: FnOnce(),
269    {
270        DoAfterSubscription::new(self, callback)
271    }
272
273    /// Invokes a callback after the source terminates, regardless of completion or error.
274    fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
275    where
276        F: FnOnce(Termination<E>),
277    {
278        DoAfterTermination::new(self, callback)
279    }
280
281    /// Invokes a callback right before the downstream subscription is disposed.
282    fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
283    where
284        F: FnOnce(),
285    {
286        DoBeforeDisposal::new(self, callback)
287    }
288
289    /// Invokes a callback with a reference to each item before it is sent downstream.
290    fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
291    where
292        F: FnMut(&T),
293    {
294        DoBeforeNext::new(self, callback)
295    }
296
297    /// Invokes a callback just before the observer subscribes to the source.
298    fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
299    where
300        F: FnOnce(),
301    {
302        DoBeforeSubscription::new(self, callback)
303    }
304
305    /// Invokes a callback before the stream terminates, receiving the termination reason.
306    fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
307    where
308        F: FnOnce(&Termination<E>),
309    {
310        DoBeforeTermination::new(self, callback)
311    }
312
313    /// Emits only the item at the given zero-based index and then completes.
314    fn element_at(self, index: usize) -> ElementAt<Self> {
315        ElementAt::new(self, index)
316    }
317
318    /// Filters items using a predicate, forwarding only values that return `true`.
319    fn filter<F>(self, callback: F) -> Filter<Self, F>
320    where
321        F: FnMut(&T) -> bool,
322    {
323        Filter::new(self, callback)
324    }
325
326    /// Emits only the first item from the source, then completes.
327    fn first(self) -> First<Self> {
328        First::new(self)
329    }
330
331    /// Maps each item to an observable and merges the resulting inner sequences concurrently.
332    fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
333    where
334        OE1: Observable<'or, 'sub, T1, E>,
335        F: FnMut(T) -> OE1,
336    {
337        FlatMap::new(self, callback)
338    }
339
340    /// Groups items by key into multiple observable sequences.
341    fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
342    where
343        F: FnMut(T) -> K,
344    {
345        GroupBy::new(self, callback)
346    }
347
348    /// Hooks into the emission of items, allowing mutation of the downstream observer.
349    fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
350    where
351        F: FnMut(&mut dyn Observer<T, E>, T),
352    {
353        HookOnNext::new(self, callback)
354    }
355
356    /// Hooks into subscription, letting you override how the source subscribes observers.
357    fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
358    where
359        F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
360    {
361        HookOnSubscription::new(self, callback)
362    }
363
364    /// Hooks into termination, providing access to the observer and termination payload.
365    fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
366    where
367        F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
368    {
369        HookOnTermination::new(self, callback)
370    }
371
372    /// Ignores all items from the source, only relaying termination events.
373    fn ignore_elements(self) -> IgnoreElements<Self> {
374        IgnoreElements::new(self)
375    }
376
377    /// Boxes the observable, erasing its concrete type while preserving lifetime bounds.
378    fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
379    where
380        T: 'or,
381        E: 'or,
382        Self: NecessarySendSync + 'oe,
383    {
384        BoxedObservable::new(self)
385    }
386
387    #[cfg(feature = "futures")]
388    /// Converts the observable into an async stream.
389    fn into_stream(self) -> ObservableStream<'sub, T, Self>
390    where
391        Self: Observable<'or, 'sub, T, Infallible>,
392    {
393        ObservableStream::new(self)
394    }
395
396    /// Emits only the final item produced by the source before completion.
397    fn last(self) -> Last<Self> {
398        Last::new(self)
399    }
400
401    /// Transforms each item by applying a user-supplied mapping function.
402    fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
403    where
404        F: FnMut(T) -> T1,
405    {
406        Map::new(self, callback)
407    }
408
409    /// Maps an Observable with an `Infallible` error type to an Observable with a concrete error type.
410    fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self> {
411        MapInfallibleToError::new(self)
412    }
413
414    /// Maps an Observable with an `Infallible` item type to an Observable with a concrete item type.
415    fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self> {
416        MapInfallibleToValue::new(self)
417    }
418
419    /// Wraps each item into a notification, turning the stream into explicit events.
420    fn materialize(self) -> Materialize<Self> {
421        Materialize::new(self)
422    }
423
424    /// Emits the maximum item produced by the source according to the natural order.
425    fn max(self) -> Max<Self> {
426        Max::new(self)
427    }
428
429    /// Merges an observable-of-observables by interleaving items from inner streams.
430    fn merge_all<T1>(self) -> MergeAll<Self, T>
431    where
432        T: Observable<'or, 'sub, T1, E>,
433    {
434        MergeAll::new(self)
435    }
436
437    /// Merges the source with another observable, interleaving both streams concurrently.
438    fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
439    where
440        OE2: Observable<'or, 'sub, T, E>,
441    {
442        Merge::new(self, source_2)
443    }
444
445    /// Emits the minimum item produced by the source according to the natural order.
446    fn min(self) -> Min<Self> {
447        Min::new(self)
448    }
449
450    /// Converts the source into a connectable observable using a subject factory.
451    fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
452    where
453        F: FnOnce() -> S,
454    {
455        ConnectableObservable::new(self, subject_maker())
456    }
457
458    /// Schedules downstream observation on the provided scheduler.
459    fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S> {
460        ObserveOn::new(self, scheduler)
461    }
462
463    fn on_backpressure<F>(self, receiving_strategy: F) -> OnBackpressure<Self, F>
464    where
465        F: FnMut(&mut Vec<T>, T),
466    {
467        OnBackpressure::new(self, receiving_strategy)
468    }
469
470    fn on_backpressure_buffer(self) -> OnBackpressureBuffer<Self> {
471        OnBackpressureBuffer::new(self)
472    }
473
474    fn on_backpressure_latest(self) -> OnBackpressureLatest<Self> {
475        OnBackpressureLatest::new(self)
476    }
477
478    /// Multicasts the source using a `PublishSubject`.
479    fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>> {
480        self.multicast(PublishSubject::default)
481    }
482
483    /// Multicasts the source using an `AsyncSubject`, emitting only the last value.
484    fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>> {
485        self.multicast(AsyncSubject::default)
486    }
487
488    /// Aggregates the sequence using an initial seed and an accumulator function.
489    fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
490    where
491        F: FnMut(T0, T) -> T0,
492    {
493        Reduce::new(self, initial_value, callback)
494    }
495
496    /// Multicasts the source using a `ReplaySubject` configured with the given buffer size.
497    fn replay(
498        self,
499        buffer_size: Option<usize>,
500    ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>> {
501        self.multicast(|| ReplaySubject::new(buffer_size))
502    }
503
504    /// Re-subscribes to the source based on the retry strategy returned by the callback.
505    fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
506    where
507        OE1: Observable<'or, 'sub, T, E>,
508        F: FnMut(E) -> RetryAction<E, OE1>,
509    {
510        Retry::new(self, callback)
511    }
512
513    /// Samples the source whenever the sampler observable emits an event.
514    fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
515    where
516        OE1: Observable<'or, 'sub, (), E>,
517    {
518        Sample::new(self, sampler)
519    }
520
521    /// Accumulates values over time, emitting each intermediate result.
522    fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
523    where
524        F: FnMut(T0, T) -> T0,
525    {
526        Scan::new(self, initial_value, callback)
527    }
528
529    /// Compares two sequences element by element for equality.
530    fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
531    where
532        OE2: Observable<'or, 'sub, T, E>,
533    {
534        SequenceEqual::new(self, another_source)
535    }
536
537    /// Shares a single subscription to the source using `PublishSubject` semantics.
538    fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>> {
539        self.publish().ref_count()
540    }
541
542    /// Shares a single subscription, replaying only the last item to new subscribers.
543    fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>> {
544        self.publish_last().ref_count()
545    }
546
547    /// Shares a single subscription while replaying a bounded history to future subscribers.
548    fn share_replay(
549        self,
550        buffer_size: Option<usize>,
551    ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>> {
552        self.replay(buffer_size).ref_count()
553    }
554
555    /// Skips the first `count` items before emitting the remainder of the sequence.
556    fn skip(self, count: usize) -> Skip<Self> {
557        Skip::new(self, count)
558    }
559
560    /// Skips the last `count` items emitted by the source.
561    fn skip_last(self, count: usize) -> SkipLast<Self> {
562        SkipLast::new(self, count)
563    }
564
565    /// Ignores items from the source until the notifier observable fires.
566    fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
567    where
568        OE1: Observable<'or, 'sub, (), E>,
569    {
570        SkipUntil::new(self, start)
571    }
572
573    /// Skips items while the predicate returns `true`, then emits the remaining items.
574    fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
575    where
576        F: FnMut(&T) -> bool,
577    {
578        SkipWhile::new(self, callback)
579    }
580
581    /// Pre-pends the provided values before the source starts emitting.
582    fn start_with<I>(self, values: I) -> StartWith<Self, I>
583    where
584        I: IntoIterator<Item = T>,
585    {
586        StartWith::new(self, values)
587    }
588
589    /// Subscribes to the source on the provided scheduler.
590    fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S> {
591        SubscribeOn::new(self, scheduler)
592    }
593
594    /// Convenience helper for subscribing with plain callbacks instead of a full observer.
595    fn subscribe_with_callback<FN, FT>(self, on_next: FN, on_termination: FT) -> Subscription<'sub>
596    where
597        T: 'or,
598        E: 'or,
599        FN: FnMut(T) + NecessarySendSync + 'or,
600        FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,
601    {
602        self.subscribe(CallbackObserver::new(on_next, on_termination))
603    }
604
605    /// Sums all numeric items and emits the accumulated total.
606    fn sum(self) -> Sum<Self> {
607        Sum::new(self)
608    }
609
610    /// Switches to the most recent inner observable emitted by the source.
611    fn switch<T1>(self) -> Switch<Self, T>
612    where
613        T: Observable<'or, 'sub, T1, E>,
614    {
615        Switch::new(self)
616    }
617
618    /// Maps each item to an observable and switches to the latest inner sequence.
619    fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
620    where
621        OE1: Observable<'or, 'sub, T1, E>,
622        F: FnMut(T) -> OE1,
623    {
624        SwitchMap::new(self, callback)
625    }
626
627    /// Emits only the first `count` items from the source before completing.
628    fn take(self, count: usize) -> Take<Self> {
629        Take::new(self, count)
630    }
631
632    /// Emits only the last `count` items produced by the source.
633    fn take_last(self, count: usize) -> TakeLast<Self> {
634        TakeLast::new(self, count)
635    }
636
637    /// Relays items until the notifier observable emits, then completes.
638    fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
639    where
640        OE1: Observable<'or, 'sub, (), E>,
641    {
642        TakeUntil::new(self, stop)
643    }
644
645    /// Emits items while the predicate holds `true`, then completes.
646    fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
647    where
648        F: FnMut(&T) -> bool,
649    {
650        TakeWhile::new(self, callback)
651    }
652
653    /// Throttles emissions to at most one item per specified timespan.
654    fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S> {
655        Throttle::new(self, time_span, scheduler)
656    }
657
658    /// Emits elapsed time between consecutive items as they flow through the stream.
659    fn time_interval(self) -> TimeInterval<Self> {
660        TimeInterval::new(self)
661    }
662
663    /// Errors if the next item does not arrive within the specified duration.
664    fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S> {
665        Timeout::new(self, duration, scheduler)
666    }
667
668    /// Annotates each item with the current timestamp when it is emitted.
669    fn timestamp(self) -> Timestamp<Self> {
670        Timestamp::new(self)
671    }
672
673    /// Collects items into windows that are opened and closed by another observable.
674    fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
675    where
676        OE1: Observable<'or, 'sub, (), E>,
677    {
678        Window::new(self, boundary)
679    }
680
681    /// Collects items into windows containing a fixed number of elements.
682    fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self> {
683        WindowWithCount::new(self, count)
684    }
685
686    /// Pairs items from both observables by index and emits tuples of corresponding values.
687    fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
688    where
689        OE2: Observable<'or, 'sub, T1, E>,
690    {
691        Zip::new(self, another_source)
692    }
693}
694
695impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE where OE: Observable<'or, 'sub, T, E>
696{}