Window

Struct Window 

Source
pub struct Window<OE, OE1> { /* private fields */ }
Expand description

Periodically subdivides items from an Observable into Observable windows, each window being emitted when a boundary Observable emits an item. See https://reactivex.io/documentation/operators/window.html

§Examples

use rx_rust::{
    disposable::subscription::Subscription,
    observable::observable_ext::ObservableExt,
    observer::{Observer, Termination},
    operators::transforming::window::Window,
    subject::publish_subject::PublishSubject,
};
use std::{convert::Infallible, sync::{Arc, Mutex}};

// Shared state inspected by the assertions at the end of the example.
// `windows` holds one inner Vec per emitted window, filled with that window's items.
let windows = Arc::new(Mutex::new(Vec::<Vec<i32>>::new()));
// Terminations delivered to the outer (window-emitting) observer.
let terminations = Arc::new(Mutex::new(Vec::new()));
// Subscriptions to the individual windows. They are stored here so they outlive the
// callback that creates them; they are dropped explicitly further down.
let inner_subscriptions = Arc::new(Mutex::new(Vec::<Subscription>::new()));

// `source` supplies the items; `boundary` is the second argument to `Window::new`.
let mut source: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
let mut boundary: PublishSubject<'_, (), Infallible> = PublishSubject::default();
// Clones of the shared handles that are moved into the callbacks below.
let windows_observer = Arc::clone(&windows);
let terminations_observer = Arc::clone(&terminations);
let inner_subscriptions_observer = Arc::clone(&inner_subscriptions);

let subscription = Window::new(source.clone(), boundary.clone()).subscribe_with_callback(
    // Called once per emitted window.
    move |window| {
        // Reserve a slot for this window's items and remember its index.
        // The lock is released at the end of this block, before subscribing to the window.
        let index = {
            let mut windows = windows_observer.lock().unwrap();
            windows.push(Vec::new());
            windows.len() - 1
        };
        let windows_for_values = Arc::clone(&windows_observer);
        let sub = window.subscribe_with_callback(
            // Record every item of this window in the slot reserved above.
            move |value| {
                windows_for_values.lock().unwrap()[index].push(value);
            },
            // The termination of an individual window is ignored in this example.
            |_| {},
        );
        // Keep the inner subscription instead of letting it drop at the end of the callback.
        inner_subscriptions_observer.lock().unwrap().push(sub);
    },
    // Record the termination of the outer stream.
    move |termination| terminations_observer
        .lock()
        .unwrap()
        .push(termination),
);

// 1 and 2 are emitted before the boundary fires, 3 after it; the assertion below
// expects them to be split into two windows accordingly.
source.on_next(1);
source.on_next(2);
boundary.on_next(());
source.on_next(3);
source.on_termination(Termination::Completed);
// Release the outer subscription and then every stored window subscription.
drop(subscription);
inner_subscriptions.lock().unwrap().drain(..).for_each(drop);

// Two windows were observed: one with the items before the boundary, one with the item after.
assert_eq!(
    &*windows.lock().unwrap(),
    &[vec![1, 2], vec![3]]
);
// The outer observer received exactly one termination: the source's completion.
assert_eq!(
    &*terminations.lock().unwrap(),
    &[Termination::Completed]
);

Implementations§

Source§

impl<OE, OE1> Window<OE, OE1>

Source

pub fn new<'or, 'sub, T, E>(source: OE, boundary: OE1) -> Self
where OE: Observable<'or, 'sub, T, E>, OE1: Observable<'or, 'sub, (), E>,

Trait Implementations§

Source§

impl<OE, OE1> Clone for Window<OE, OE1>
where OE: Clone, OE1: Clone,

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<OE, OE1> Debug for Window<OE, OE1>
where OE: Debug, OE1: Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, SubjectObservable<PublishSubject<'or, T, E>>, E> for Window<OE, OE1>
where T: Clone + NecessarySendSync + 'or, E: Clone + NecessarySendSync + 'or, OE: Observable<'or, 'sub, T, E>, OE1: Observable<'or, 'sub, (), E>, 'sub: 'or,

Source§

fn subscribe( self, observer: impl Observer<SubjectObservable<PublishSubject<'or, T, E>>, E> + NecessarySendSync + 'or, ) -> Subscription<'sub>

Subscribes an observer to this observable. Once subscribed, the observer starts receiving events from the observable. The subscribe method returns a Subscription, which can be used to unsubscribe the observer from the observable. A concrete Subscription struct is returned instead of a trait such as impl Cancellable because the subscription must be cancelled when the Subscription is dropped, and it is not possible to implement Drop for a trait object.

Auto Trait Implementations§

§

impl<OE, OE1> Freeze for Window<OE, OE1>
where OE: Freeze, OE1: Freeze,

§

impl<OE, OE1> RefUnwindSafe for Window<OE, OE1>
where OE: RefUnwindSafe, OE1: RefUnwindSafe,

§

impl<OE, OE1> Send for Window<OE, OE1>
where OE: Send, OE1: Send,

§

impl<OE, OE1> Sync for Window<OE, OE1>
where OE: Sync, OE1: Sync,

§

impl<OE, OE1> Unpin for Window<OE, OE1>
where OE: Unpin, OE1: Unpin,

§

impl<OE, OE1> UnwindSafe for Window<OE, OE1>
where OE: UnwindSafe, OE1: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<'or, 'sub, T, E, OE> ObservableExt<'or, 'sub, T, E> for OE
where OE: Observable<'or, 'sub, T, E>,

Source§

fn all<F>(self, callback: F) -> All<T, Self, F>
where F: FnMut(T) -> bool,

Emits a single bool indicating whether every item satisfies the provided predicate.
Source§

fn amb_with(self, other: Self) -> Amb<[Self; 2]>

Races two observables and mirrors whichever one produces an item or error first.
Source§

fn average(self) -> Average<T, Self>

Calculates the arithmetic mean of all numeric items emitted by the source.
Source§

fn buffer<OE1>(self, boundary: OE1) -> Buffer<Self, OE1>
where OE1: Observable<'or, 'sub, (), E>,

Collects the items emitted by the source into buffers delimited by another observable.
Source§

fn buffer_with_count(self, count: NonZeroUsize) -> BufferWithCount<Self>

Collects items into fixed-size buffers and emits each buffer as soon as it fills up.
Source§

fn buffer_with_time<S>( self, time_span: Duration, scheduler: S, delay: Option<Duration>, ) -> BufferWithTime<Self, S>

Collects items into time-based buffers driven by the provided scheduler.
Source§

fn buffer_with_time_or_count<S>( self, count: NonZeroUsize, time_span: Duration, scheduler: S, delay: Option<Duration>, ) -> BufferWithTimeOrCount<Self, S>

Collects items into buffers using both size and time boundaries, whichever occurs first.
Source§

fn catch<E1, OE1, F>(self, callback: F) -> Catch<E, Self, F>
where OE1: Observable<'or, 'sub, T, E1>, F: FnOnce(E) -> OE1,

Recovers from errors by switching to another observable yielded by the callback.
Source§

fn combine_latest<T1, OE2>( self, another_source: OE2, ) -> CombineLatest<Self, OE2>
where OE2: Observable<'or, 'sub, T1, E>,

Combines the latest values from both observables whenever either produces a new item.
Source§

fn concat_all<T1>(self) -> ConcatAll<Self, T>
where T: Observable<'or, 'sub, T1, E>,

Flattens an observable-of-observables by concatenating each inner observable sequentially.
Source§

fn concat_map<T1, OE1, F>(self, callback: F) -> ConcatMap<T, Self, OE1, F>
where OE1: Observable<'or, 'sub, T1, E>, F: FnMut(T) -> OE1,

Maps each item to an observable and concatenates the resulting inner sequences.
Source§

fn concat_with<OE2>(self, source_2: OE2) -> Concat<Self, OE2>
where OE2: Observable<'or, 'sub, T, E>,

Concatenates the source with another observable, waiting for the first to complete.
Source§

fn contains(self, item: T) -> Contains<T, Self>

Emits true if the sequence contains the provided item, false otherwise.
Source§

fn count(self) -> Count<T, Self>

Counts the number of items emitted and emits that count as a single value.
Source§

fn debounce<S>(self, time_span: Duration, scheduler: S) -> Debounce<Self, S>

Emits an item from the source Observable only after a particular time span has passed without another source emission.
Source§

fn debug<C, F>(self, context: C, callback: F) -> Debug<Self, C, F>
where F: Fn(C, DebugEvent<'_, T, E>),

Attaches a label to the stream and logs lifecycle events for debugging purposes using the provided callback.
Source§

fn debug_default_print<L>( self, label: L, ) -> Debug<Self, L, DefaultPrintType<L, T, E>>
where L: Display, T: Debug, E: Debug,

Attaches a label to the stream and logs lifecycle events for debugging purposes using the default print.
Source§

fn default_if_empty(self, default_value: T) -> DefaultIfEmpty<T, Self>

Emits a default value if the source completes without emitting any items.
Source§

fn delay<S>(self, delay: Duration, scheduler: S) -> Delay<Self, S>

Offsets the emission of items by the specified duration using the given scheduler.
Source§

fn dematerialize(self) -> Dematerialize<Self>

Converts a stream of notifications back into a normal observable sequence.
Source§

fn distinct(self) -> Distinct<Self, fn(&T) -> T>
where T: Clone,

Filters out duplicate items, keeping only the first occurrence of each value.
Source§

fn distinct_with_key_selector<F, K>(self, key_selector: F) -> Distinct<Self, F>
where F: FnMut(&T) -> K,

Filters out duplicates based on a key selector, keeping only unique keys.
Source§

fn distinct_until_changed(self) -> DistinctUntilChanged<Self, fn(&T) -> T>
where T: Clone,

Suppresses consecutive duplicate items, comparing the values directly.
Source§

fn distinct_until_changed_with_key_selector<F, K>( self, key_selector: F, ) -> DistinctUntilChanged<Self, F>
where F: FnMut(&T) -> K,

Suppresses consecutive duplicate items using a custom key selector.
Source§

fn do_after_disposal<F>(self, callback: F) -> DoAfterDisposal<Self, F>
where F: FnOnce(),

Invokes a callback after the downstream subscription is disposed.
Source§

fn do_after_next<F>(self, callback: F) -> DoAfterNext<Self, F>
where F: FnMut(T),

Invokes a callback after each item is forwarded downstream.
Source§

fn do_after_subscription<F>(self, callback: F) -> DoAfterSubscription<Self, F>
where F: FnOnce(),

Invokes a callback after the observer subscribes to the source.
Source§

fn do_after_termination<F>(self, callback: F) -> DoAfterTermination<Self, F>
where F: FnOnce(Termination<E>),

Invokes a callback after the source terminates, regardless of completion or error.
Source§

fn do_before_disposal<F>(self, callback: F) -> DoBeforeDisposal<Self, F>
where F: FnOnce(),

Invokes a callback right before the downstream subscription is disposed.
Source§

fn do_before_next<F>(self, callback: F) -> DoBeforeNext<Self, F>
where F: FnMut(&T),

Invokes a callback with a reference to each item before it is sent downstream.
Source§

fn do_before_subscription<F>(self, callback: F) -> DoBeforeSubscription<Self, F>
where F: FnOnce(),

Invokes a callback just before the observer subscribes to the source.
Source§

fn do_before_termination<F>(self, callback: F) -> DoBeforeTermination<Self, F>
where F: FnOnce(&Termination<E>),

Invokes a callback before the stream terminates, receiving the termination reason.
Source§

fn element_at(self, index: usize) -> ElementAt<Self>

Emits only the item at the given zero-based index and then completes.
Source§

fn filter<F>(self, callback: F) -> Filter<Self, F>
where F: FnMut(&T) -> bool,

Filters items using a predicate, forwarding only values that return true.
Source§

fn first(self) -> First<Self>

Emits only the first item from the source, then completes.
Source§

fn flat_map<T1, OE1, F>(self, callback: F) -> FlatMap<T, Self, OE1, F>
where OE1: Observable<'or, 'sub, T1, E>, F: FnMut(T) -> OE1,

Maps each item to an observable and merges the resulting inner sequences concurrently.
Source§

fn group_by<F, K>(self, callback: F) -> GroupBy<Self, F, K>
where F: FnMut(T) -> K,

Groups items by key into multiple observable sequences.
Source§

fn hook_on_next<F>(self, callback: F) -> HookOnNext<Self, F>
where F: FnMut(&mut dyn Observer<T, E>, T),

Hooks into the emission of items, allowing mutation of the downstream observer.
Source§

fn hook_on_subscription<F>(self, callback: F) -> HookOnSubscription<Self, F>
where F: FnOnce(Self, BoxedObserver<'or, T, E>) -> Subscription<'sub>,

Hooks into subscription, letting you override how the source subscribes observers.
Source§

fn hook_on_termination<F>(self, callback: F) -> HookOnTermination<Self, F>
where F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),

Hooks into termination, providing access to the observer and termination payload.
Source§

fn ignore_elements(self) -> IgnoreElements<Self>

Ignores all items from the source, only relaying termination events.
Source§

fn into_boxed<'oe>(self) -> BoxedObservable<'or, 'sub, 'oe, T, E>
where T: 'or, E: 'or, Self: NecessarySendSync + 'oe,

Boxes the observable, erasing its concrete type while preserving lifetime bounds.
Source§

fn last(self) -> Last<Self>

Emits only the final item produced by the source before completion.
Source§

fn map<T1, F>(self, callback: F) -> Map<T, Self, F>
where F: FnMut(T) -> T1,

Transforms each item by applying a user-supplied mapping function.
Source§

fn map_infallible_to_error<E1>(self) -> MapInfallibleToError<E1, Self>

Maps an Observable with an Infallible error type to an Observable with a concrete error type.
Source§

fn map_infallible_to_value<V1>(self) -> MapInfallibleToValue<V1, Self>

Maps an Observable with an Infallible item type to an Observable with a concrete item type.
Source§

fn materialize(self) -> Materialize<Self>

Wraps each item into a notification, turning the stream into explicit events.
Source§

fn max(self) -> Max<Self>

Emits the maximum item produced by the source according to the natural order.
Source§

fn merge_all<T1>(self) -> MergeAll<Self, T>
where T: Observable<'or, 'sub, T1, E>,

Merges an observable-of-observables by interleaving items from inner streams.
Source§

fn merge_with<OE2>(self, source_2: OE2) -> Merge<Self, OE2>
where OE2: Observable<'or, 'sub, T, E>,

Merges the source with another observable, interleaving both streams concurrently.
Source§

fn min(self) -> Min<Self>

Emits the minimum item produced by the source according to the natural order.
Source§

fn multicast<S, F>(self, subject_maker: F) -> ConnectableObservable<Self, S>
where F: FnOnce() -> S,

Converts the source into a connectable observable using a subject factory.
Source§

fn observe_on<S>(self, scheduler: S) -> ObserveOn<Self, S>

Schedules downstream observation on the provided scheduler.
Source§

fn on_backpressure<F>(self, receiving_strategy: F) -> OnBackpressure<Self, F>
where F: FnMut(&mut Vec<T>, T),

Source§

fn on_backpressure_buffer(self) -> OnBackpressureBuffer<Self>

Source§

fn on_backpressure_latest(self) -> OnBackpressureLatest<Self>

Source§

fn publish(self) -> ConnectableObservable<Self, PublishSubject<'or, T, E>>

Multicasts the source using a PublishSubject.
Source§

fn publish_last(self) -> ConnectableObservable<Self, AsyncSubject<'or, T, E>>

Multicasts the source using an AsyncSubject, emitting only the last value.
Source§

fn reduce<T0, F>(self, initial_value: T0, callback: F) -> Reduce<T0, T, Self, F>
where F: FnMut(T0, T) -> T0,

Aggregates the sequence using an initial seed and an accumulator function.
Source§

fn replay( self, buffer_size: Option<usize>, ) -> ConnectableObservable<Self, ReplaySubject<'or, T, E>>

Multicasts the source using a ReplaySubject configured with the given buffer size.
Source§

fn retry<OE1, F>(self, callback: F) -> Retry<Self, F>
where OE1: Observable<'or, 'sub, T, E>, F: FnMut(E) -> RetryAction<E, OE1>,

Re-subscribes to the source based on the retry strategy returned by the callback.
Source§

fn sample<OE1>(self, sampler: OE1) -> Sample<Self, OE1>
where OE1: Observable<'or, 'sub, (), E>,

Samples the source whenever the sampler observable emits an event.
Source§

fn scan<T0, F>(self, initial_value: T0, callback: F) -> Scan<T0, T, Self, F>
where F: FnMut(T0, T) -> T0,

Accumulates values over time, emitting each intermediate result.
Source§

fn sequence_equal<OE2>(self, another_source: OE2) -> SequenceEqual<T, Self, OE2>
where OE2: Observable<'or, 'sub, T, E>,

Compares two sequences element by element for equality.
Source§

fn share(self) -> RefCount<'sub, Self, PublishSubject<'or, T, E>>

Shares a single subscription to the source using PublishSubject semantics.
Source§

fn share_last(self) -> RefCount<'sub, Self, AsyncSubject<'or, T, E>>

Shares a single subscription, replaying only the last item to new subscribers.
Source§

fn share_replay( self, buffer_size: Option<usize>, ) -> RefCount<'sub, Self, ReplaySubject<'or, T, E>>

Shares a single subscription while replaying a bounded history to future subscribers.
Source§

fn skip(self, count: usize) -> Skip<Self>

Skips the first count items before emitting the remainder of the sequence.
Source§

fn skip_last(self, count: usize) -> SkipLast<Self>

Skips the last count items emitted by the source.
Source§

fn skip_until<OE1>(self, start: OE1) -> SkipUntil<Self, OE1>
where OE1: Observable<'or, 'sub, (), E>,

Ignores items from the source until the notifier observable fires.
Source§

fn skip_while<F>(self, callback: F) -> SkipWhile<Self, F>
where F: FnMut(&T) -> bool,

Skips items while the predicate returns true, then emits the remaining items.
Source§

fn start_with<I>(self, values: I) -> StartWith<Self, I>
where I: IntoIterator<Item = T>,

Prepends the provided values before the source starts emitting.
Source§

fn subscribe_on<S>(self, scheduler: S) -> SubscribeOn<Self, S>

Subscribes to the source on the provided scheduler.
Source§

fn subscribe_with_callback<FN, FT>( self, on_next: FN, on_termination: FT, ) -> Subscription<'sub>
where T: 'or, E: 'or, FN: FnMut(T) + NecessarySendSync + 'or, FT: FnOnce(Termination<E>) + NecessarySendSync + 'or,

Convenience helper for subscribing with plain callbacks instead of a full observer.
Source§

fn sum(self) -> Sum<Self>

Sums all numeric items and emits the accumulated total.
Source§

fn switch<T1>(self) -> Switch<Self, T>
where T: Observable<'or, 'sub, T1, E>,

Switches to the most recent inner observable emitted by the source.
Source§

fn switch_map<T1, OE1, F>(self, callback: F) -> SwitchMap<T, Self, OE1, F>
where OE1: Observable<'or, 'sub, T1, E>, F: FnMut(T) -> OE1,

Maps each item to an observable and switches to the latest inner sequence.
Source§

fn take(self, count: usize) -> Take<Self>

Emits only the first count items from the source before completing.
Source§

fn take_last(self, count: usize) -> TakeLast<Self>

Emits only the last count items produced by the source.
Source§

fn take_until<OE1>(self, stop: OE1) -> TakeUntil<Self, OE1>
where OE1: Observable<'or, 'sub, (), E>,

Relays items until the notifier observable emits, then completes.
Source§

fn take_while<F>(self, callback: F) -> TakeWhile<Self, F>
where F: FnMut(&T) -> bool,

Emits items while the predicate holds true, then completes.
Source§

fn throttle<S>(self, time_span: Duration, scheduler: S) -> Throttle<Self, S>

Throttles emissions to at most one item per specified timespan.
Source§

fn time_interval(self) -> TimeInterval<Self>

Emits elapsed time between consecutive items as they flow through the stream.
Source§

fn timeout<S>(self, duration: Duration, scheduler: S) -> Timeout<Self, S>

Errors if the next item does not arrive within the specified duration.
Source§

fn timestamp(self) -> Timestamp<Self>

Annotates each item with the current timestamp when it is emitted.
Source§

fn window<OE1>(self, boundary: OE1) -> Window<Self, OE1>
where OE1: Observable<'or, 'sub, (), E>,

Collects items into windows that are opened and closed by another observable.
Source§

fn window_with_count(self, count: NonZeroUsize) -> WindowWithCount<Self>

Collects items into windows containing a fixed number of elements.
Source§

fn zip<T1, OE2>(self, another_source: OE2) -> Zip<Self, OE2>
where OE2: Observable<'or, 'sub, T1, E>,

Pairs items from both observables by index and emits tuples of corresponding values.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> NecessarySendSync for T
where T: Send + Sync,