
Observable

Trait Observable 

Source
pub trait Observable: ObservableOutput + WithPrimaryCategory {
    type Subscription<Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError>>: 'static + SubscriptionWithTeardown + Drop + Send + Sync;

    // Required method
    fn subscribe<Destination>(
        &mut self,
        destination: Destination,
    ) -> Self::Subscription<<Destination as UpgradeableObserver>::Upgraded>
       where Destination: 'static + UpgradeableObserver<In = Self::Out, InError = Self::OutError> + Send + Sync;
}

§Observable

An observable is a signal-source-descriptor that can be subscribed to, allowing you to observe its signals.

Creating an observable instance does nothing by itself: an observable only defines how the subscriptions it creates on subscribe will behave.
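The laziness described above can be illustrated with a small self-contained model. This is only a sketch: `ToyObservable` and `lazy_demo` are hypothetical names invented for illustration and are not part of rx_bevy's API.

```rust
/// Hypothetical stand-in for an observable: it only stores a description
/// of what to emit (here, a list of values).
struct ToyObservable {
    values: Vec<i32>,
}

impl ToyObservable {
    /// Work only happens here, once a destination is supplied.
    fn subscribe(&mut self, mut destination: impl FnMut(i32)) {
        for &value in &self.values {
            destination(value);
        }
    }
}

/// Returns how many values were received before and after subscribing.
fn lazy_demo() -> (usize, usize) {
    let mut received: Vec<i32> = Vec::new();

    // Constructing the observable emits nothing.
    let mut observable = ToyObservable { values: vec![1, 2, 3] };
    let before = received.len();

    // Only subscribing makes the values flow to the destination.
    observable.subscribe(|value| received.push(value));
    (before, received.len())
}
```

Calling `lazy_demo()` reports zero received values before the subscribe call and three after it.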

§Signals

Anything an observable can push is a signal: not just the values you subscribe for, but errors, completions, and unsubscribes too. These are the actions an observable can take.
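One way to picture the four kinds of signal is as variants of a single enum. The sketch below is a hypothetical model, not how rx_bevy represents signals; `ToySignal` and `values_before_termination` are invented names, and treating everything except a value as terminal is an assumption borrowed from common Rx conventions.

```rust
/// Hypothetical model of the four kinds of signal named above.
#[derive(Debug, Clone, PartialEq)]
enum ToySignal<T, E> {
    Next(T),
    Error(E),
    Complete,
    Unsubscribe,
}

impl<T, E> ToySignal<T, E> {
    /// Assumption: every signal other than a value ends the stream.
    fn is_terminal(&self) -> bool {
        !matches!(self, ToySignal::Next(_))
    }
}

/// Collects the values that arrive before the first terminal signal.
fn values_before_termination<T: Clone, E>(signals: &[ToySignal<T, E>]) -> Vec<T> {
    signals
        .iter()
        .take_while(|signal| !signal.is_terminal())
        .filter_map(|signal| match signal {
            ToySignal::Next(value) => Some(value.clone()),
            _ => None,
        })
        .collect()
}
```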

§Subscribe

To subscribe to an observable, you must provide a destination (an observer) to which values and other observable signals will be forwarded.

Calling subscribe returns a Subscription containing one or more Teardowns that can be used to release the resources associated with this subscription. Once they have run, the subscription is always safe to drop, regardless of the kind of context (crate::SubscriptionContext) used.
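The relationship between a subscription and its teardowns can be sketched as a value that owns a list of cleanup closures and runs them once on unsubscribe. `TeardownSubscription`, `add_teardown`, and `teardown_demo` are hypothetical names for this illustration; the real Subscription and Teardown types in rx_bevy may be shaped differently.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical subscription that owns its teardown closures.
struct TeardownSubscription {
    teardowns: Vec<Box<dyn FnOnce()>>,
    closed: bool,
}

impl TeardownSubscription {
    fn new() -> Self {
        Self { teardowns: Vec::new(), closed: false }
    }

    /// Registers a cleanup action to run on unsubscribe.
    fn add_teardown(&mut self, teardown: impl FnOnce() + 'static) {
        self.teardowns.push(Box::new(teardown));
    }

    /// Runs every teardown exactly once; later calls do nothing.
    fn unsubscribe(&mut self) {
        if self.closed {
            return;
        }
        self.closed = true;
        for teardown in self.teardowns.drain(..) {
            teardown();
        }
    }
}

/// Returns the names of the teardowns in the order they ran.
fn teardown_demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut subscription = TeardownSubscription::new();
    for name in ["close socket", "free buffer"] {
        let log = Rc::clone(&log);
        subscription.add_teardown(move || {
            log.borrow_mut().push(name);
        });
    }
    subscription.unsubscribe();
    subscription.unsubscribe(); // already closed, so nothing runs again
    let result = log.borrow().clone();
    result
}
```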

§Contexts

Since everything is stored in the subscription, the subscription value is the unit of execution. But not everything can be stored there: in some environments, reacting to an observed signal requires a reference to something temporary. For example, if you store subscriptions in an ECS as components on entities, then releasing a subscription requires interacting with the ECS. Since subscriptions can only interact with what they contain, there are two options: store a reference to the ECS under a lock, which would inevitably result in deadlocks, or, because actions can only happen when something is pushed, pass in a context along with every pushed signal.

This is what contexts are for: providing temporary references to things that may only live for the instant in which you are pushing a signal.
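The idea of passing a short-lived context alongside each signal, instead of storing a locked reference, can be sketched as follows. `ToyContext` and `ToyObserver` are hypothetical stand-ins; in particular, the mutable log plays the role of whatever temporary environment (such as ECS access) a real context would expose.

```rust
/// Hypothetical context: a temporary borrow of the surrounding environment.
struct ToyContext<'a> {
    log: &'a mut Vec<String>,
}

/// Hypothetical observer that stores no reference to its environment.
struct ToyObserver {
    label: &'static str,
}

impl ToyObserver {
    /// The environment is reachable only through the context handed in
    /// together with the signal.
    fn next(&mut self, value: i32, context: &mut ToyContext<'_>) {
        context.log.push(format!("{}: {}", self.label, value));
    }
}

fn context_demo() -> Vec<String> {
    let mut log = Vec::new();
    let mut observer = ToyObserver { label: "score" };
    for value in [10, 20] {
        // The context is rebuilt for every push and lives only for that call,
        // so no lock or long-lived reference is needed.
        let mut context = ToyContext { log: &mut log };
        observer.next(value, &mut context);
    }
    log
}
```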

This is possible because in rx_bevy new values cannot be produced without explicit action, such as establishing a subscription or pushing a new signal into an observer. How, then, can timer-like observables work, such as the IntervalObservable, which emits a new value periodically? The answer is ticking. To signal the passage of time, a subscription must be “ticked”, which moves its internal clock forward. This can produce additional signals, which is why ticking also requires a context.
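Ticking can be modelled as a method that advances an internal clock by a time delta and emits whatever became due. The sketch below is an assumption-laden toy: `ToyIntervalSubscription` and its `tick` signature are invented for illustration, and a plain `Vec<u32>` stands in for the context that receives the emitted values.

```rust
use std::time::Duration;

/// Hypothetical interval subscription driven entirely by explicit ticks.
struct ToyIntervalSubscription {
    period: Duration,
    elapsed: Duration,
    next_index: u32,
}

impl ToyIntervalSubscription {
    fn new(period: Duration) -> Self {
        assert!(!period.is_zero(), "the period must be non-zero");
        Self { period, elapsed: Duration::ZERO, next_index: 0 }
    }

    /// Advances the internal clock by `delta` and pushes one value into
    /// `context` for every whole period that has elapsed.
    fn tick(&mut self, delta: Duration, context: &mut Vec<u32>) {
        self.elapsed += delta;
        while self.elapsed >= self.period {
            self.elapsed -= self.period;
            context.push(self.next_index);
            self.next_index += 1;
        }
    }
}

fn tick_demo() -> Vec<u32> {
    let mut emitted = Vec::new();
    let mut subscription = ToyIntervalSubscription::new(Duration::from_millis(100));
    // No value can appear unless someone ticks the subscription.
    for millis in [60, 60, 250] {
        subscription.tick(Duration::from_millis(millis), &mut emitted);
    }
    emitted
}
```

With a 100 ms period, the first 60 ms tick emits nothing, the second crosses one period, and the 250 ms tick crosses two more, so `tick_demo()` yields three values.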

§Dropping Subscriptions

Subscriptions that have not been unsubscribed by the time they are dropped will try to unsubscribe themselves. If you use a DropUnsafeSubscriptionContext (crate::DropUnsafeSubscriptionContext), that is, a context that cannot simply be created from the subscription itself the way the unit context () can, this will result in a panic. But do not worry: such contexts are only ever used explicitly, and usually in managed environments where you do not handle subscriptions directly, such as an ECS where everything is wrapped in components and events.

Note that not assigning the subscription to a variable (or assigning it with let _ =) causes it to be dropped immediately, which is why subscribe is #[must_use].
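The difference between `let _ =` and a named binding is plain Rust drop semantics and can be shown without rx_bevy. In this sketch, `LoggedSubscription` and the free function `subscribe` are hypothetical; only the drop timing is the point.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical subscription that unsubscribes itself when dropped.
struct LoggedSubscription {
    name: &'static str,
    closed: bool,
    log: Rc<RefCell<Vec<String>>>,
}

impl LoggedSubscription {
    fn unsubscribe(&mut self) {
        if !self.closed {
            self.closed = true;
            self.log.borrow_mut().push(format!("{} unsubscribed", self.name));
        }
    }
}

impl Drop for LoggedSubscription {
    fn drop(&mut self) {
        // Mirrors the behavior described above: unsubscribe on drop.
        self.unsubscribe();
    }
}

#[must_use = "dropping the subscription unsubscribes it immediately"]
fn subscribe(name: &'static str, log: &Rc<RefCell<Vec<String>>>) -> LoggedSubscription {
    LoggedSubscription { name, closed: false, log: Rc::clone(log) }
}

fn drop_demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));

    // `let _ =` binds nothing, so the value is dropped at the end of this statement.
    let _ = subscribe("discarded", &log);

    // A named binding keeps the subscription alive until it is dropped.
    let kept = subscribe("kept", &log);
    log.borrow_mut().push("work done".to_string());
    drop(kept);

    let result = log.borrow().clone();
    result
}
```

The log shows the discarded subscription unsubscribing before any work happens, while the kept one unsubscribes only after it is explicitly dropped.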

Required Associated Types§

Source

type Subscription<Destination: 'static + Subscriber<In = Self::Out, InError = Self::OutError>>: 'static + SubscriptionWithTeardown + Drop + Send + Sync

The subscription produced by this Observable. As this is the only kind of subscription that is handled directly by users, only here are subscriptions required to implement Drop to ensure resources are released when the subscription is dropped.

Required Methods§

Source

fn subscribe<Destination>( &mut self, destination: Destination, ) -> Self::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = Self::Out, InError = Self::OutError> + Send + Sync,

Create a Subscription for this Observable. This action allocates resources to execute the behavior this Observable defines, essentially creating an instance of it.

The returned Subscription can be used to release the allocated resources and stop the subscription by calling unsubscribe.

§Subscription Drop Behavior

If a subscription has not been unsubscribed manually, it will always attempt to unsubscribe itself on drop.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.
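In the signature above, subscribe is generic over Destination and Subscription is a generic associated type; either of these is enough to rule out `dyn Observable`. A common workaround for traits like this is a second, dyn-compatible trait that erases the generic parameter behind a box. The sketch below shows the pattern on a toy trait; `GenericSource`, `ErasedSource`, and `Range3` are hypothetical names and do not describe how rx_bevy performs erasure.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A generic method makes this trait not dyn compatible:
/// `Box<dyn GenericSource>` would be rejected by the compiler.
trait GenericSource {
    fn subscribe<D: FnMut(i32)>(&mut self, destination: D);
}

/// Dyn-compatible counterpart: the destination type is erased behind a box.
trait ErasedSource {
    fn subscribe_boxed(&mut self, destination: Box<dyn FnMut(i32)>);
}

/// Every generic source can be used through the erased interface.
impl<S: GenericSource> ErasedSource for S {
    fn subscribe_boxed(&mut self, destination: Box<dyn FnMut(i32)>) {
        // `Box<dyn FnMut(i32)>` itself implements `FnMut(i32)`.
        self.subscribe(destination);
    }
}

/// Hypothetical source that emits 1, 2, 3.
struct Range3;

impl GenericSource for Range3 {
    fn subscribe<D: FnMut(i32)>(&mut self, mut destination: D) {
        for value in 1..=3 {
            destination(value);
        }
    }
}

/// Sums everything emitted by two sources stored as trait objects.
fn erased_demo() -> i32 {
    let sum = Rc::new(Cell::new(0));
    let mut sources: Vec<Box<dyn ErasedSource>> = vec![Box::new(Range3), Box::new(Range3)];
    for source in sources.iter_mut() {
        let sum = Rc::clone(&sum);
        source.subscribe_boxed(Box::new(move |value: i32| sum.set(sum.get() + value)));
    }
    sum.get()
}
```

The trade-off is one allocation and dynamic dispatch per destination in exchange for being able to store heterogeneous sources in one collection.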

Implementations on Foreign Types§

Source§

impl<'o, Source> Observable for RetryObservable<'o, Source>
where 'o: 'static, Source: 'o + Observable + Send + Sync,

Source§

type Subscription<Destination: 'static + Subscriber<In = <RetryObservable<'o, Source> as ObservableOutput>::Out, InError = <RetryObservable<'o, Source> as ObservableOutput>::OutError>> = SharedSubscription

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <RetryObservable<'o, Source> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <RetryObservable<'o, Source> as ObservableOutput>::Out, InError = <RetryObservable<'o, Source> as ObservableOutput>::OutError> + Send + Sync,

Source§

impl<O> Observable for Option<O>
where O: Observable,

Source§

type Subscription<Destination: 'static + Subscriber<In = <Option<O> as ObservableOutput>::Out, InError = <Option<O> as ObservableOutput>::OutError>> = OptionSubscription<<O as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>>

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <Option<O> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <Option<O> as ObservableOutput>::Out, InError = <Option<O> as ObservableOutput>::OutError> + Send + Sync,

Source§

impl<O> Observable for Arc<Mutex<O>>
where O: Observable,

Source§

type Subscription<Destination: 'static + Subscriber<In = <Arc<Mutex<O>> as ObservableOutput>::Out, InError = <Arc<Mutex<O>> as ObservableOutput>::OutError>> = <O as Observable>::Subscription<Destination>

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <Arc<Mutex<O>> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <Arc<Mutex<O>> as ObservableOutput>::Out, InError = <Arc<Mutex<O>> as ObservableOutput>::OutError> + Send + Sync,

Source§

impl<O> Observable for Arc<RwLock<O>>
where O: Observable,

Source§

type Subscription<Destination: 'static + Subscriber<In = <Arc<RwLock<O>> as ObservableOutput>::Out, InError = <Arc<RwLock<O>> as ObservableOutput>::OutError>> = <O as Observable>::Subscription<Destination>

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <Arc<RwLock<O>> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <Arc<RwLock<O>> as ObservableOutput>::Out, InError = <Arc<RwLock<O>> as ObservableOutput>::OutError> + Send + Sync,

Source§

impl<O> Observable for Weak<Mutex<O>>
where O: Observable,

Source§

type Subscription<Destination: 'static + Subscriber<In = <Weak<Mutex<O>> as ObservableOutput>::Out, InError = <Weak<Mutex<O>> as ObservableOutput>::OutError>> = OptionSubscription<<O as Observable>::Subscription<Destination>>

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <Weak<Mutex<O>> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <Weak<Mutex<O>> as ObservableOutput>::Out, InError = <Weak<Mutex<O>> as ObservableOutput>::OutError> + Send + Sync,

Source§

impl<Source, ConnectorProvider> Observable for ShareObservable<Source, ConnectorProvider>
where Source: Observable, <Source as ObservableOutput>::Out: Clone, <Source as ObservableOutput>::OutError: Clone, ConnectorProvider: 'static + Provider, <ConnectorProvider as Provider>::Provided: SubjectLike<In = <Source as ObservableOutput>::Out, InError = <Source as ObservableOutput>::OutError> + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ShareObservable<Source, ConnectorProvider> as ObservableOutput>::Out, InError = <ShareObservable<Source, ConnectorProvider> as ObservableOutput>::OutError>> = <ConnectableObservable<Source, ConnectorProvider> as Observable>::Subscription<Destination>

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <ShareObservable<Source, ConnectorProvider> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <ShareObservable<Source, ConnectorProvider> as ObservableOutput>::Out, InError = <ShareObservable<Source, ConnectorProvider> as ObservableOutput>::OutError> + Send + Sync,

Source§

impl<Source, S> Observable for SubscribeOnObservable<Source, S>
where Source: 'static + Observable + Send + Sync, S: 'static + Scheduler + Send + Sync,

Source§

type Subscription<Destination: 'static + Subscriber<In = <SubscribeOnObservable<Source, S> as ObservableOutput>::Out, InError = <SubscribeOnObservable<Source, S> as ObservableOutput>::OutError>> = SubscribeOnSubscription<Destination, Source>

Source§

fn subscribe<Destination>( &mut self, destination: Destination, ) -> <SubscribeOnObservable<Source, S> as Observable>::Subscription<<Destination as UpgradeableObserver>::Upgraded>
where Destination: 'static + UpgradeableObserver<In = <SubscribeOnObservable<Source, S> as ObservableOutput>::Out, InError = <SubscribeOnObservable<Source, S> as ObservableOutput>::OutError> + Send + Sync,

Implementors§

Source§

impl Observable for ClosedObservable

Source§

impl Observable for EmptyObservable

Source§

impl Observable for NeverObservable

Source§

type Subscription<Destination: 'static + Subscriber<In = <NeverObservable as ObservableOutput>::Out, InError = <NeverObservable as ObservableOutput>::OutError>> = Subscription<Destination>

Source§

impl<F, Source> Observable for DeferredObservable<F, Source>
where Source: Observable, F: FnMut() -> Source,

Source§

type Subscription<Destination: 'static + Subscriber<In = <DeferredObservable<F, Source> as ObservableOutput>::Out, InError = <DeferredObservable<F, Source> as ObservableOutput>::OutError>> = <Source as Observable>::Subscription<Destination>

Source§

impl<In, InError> Observable for BehaviorSubject<In, InError>
where In: Signal + Clone, InError: Signal + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <BehaviorSubject<In, InError> as ObservableOutput>::Out, InError = <BehaviorSubject<In, InError> as ObservableOutput>::OutError>> = MulticastSubscription<In, InError>

Source§

impl<In, InError> Observable for PublishSubject<In, InError>
where In: Signal + Clone, InError: Signal + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <PublishSubject<In, InError> as ObservableOutput>::Out, InError = <PublishSubject<In, InError> as ObservableOutput>::OutError>> = MulticastSubscription<In, InError>

Source§

impl<In, InError, Reducer> Observable for AsyncSubject<In, InError, Reducer>
where Reducer: 'static + FnMut(In, In) -> In + Send + Sync, In: Signal + Clone, InError: Signal + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <AsyncSubject<In, InError, Reducer> as ObservableOutput>::Out, InError = <AsyncSubject<In, InError, Reducer> as ObservableOutput>::OutError>> = MulticastSubscription<In, InError>

Source§

impl<Iterator> Observable for IteratorObservable<Iterator>
where Iterator: Clone + IntoIterator, <Iterator as IntoIterator>::Item: Signal,

Source§

type Subscription<Destination: 'static + Subscriber<In = <IteratorObservable<Iterator> as ObservableOutput>::Out, InError = <IteratorObservable<Iterator> as ObservableOutput>::OutError>> = InertSubscription

Source§

impl<Iterator, S> Observable for IteratorOnTickObservable<Iterator, S>
where Iterator: 'static + Clone + IntoIterator, <Iterator as IntoIterator>::Item: Signal, <Iterator as IntoIterator>::IntoIter: Send + Sync, S: 'static + Scheduler + Send + Sync,

Source§

type Subscription<Destination: 'static + Subscriber<In = <IteratorOnTickObservable<Iterator, S> as ObservableOutput>::Out, InError = <IteratorOnTickObservable<Iterator, S> as ObservableOutput>::OutError>> = OnTickIteratorSubscription<Destination, Iterator, S>

Source§

impl<O1, O2> Observable for CombineChangesObservable<O1, O2>
where O1: 'static + Send + Sync + Observable, <O1 as ObservableOutput>::Out: Clone, O2: 'static + Send + Sync + Observable, <O2 as ObservableOutput>::Out: Clone, <O2 as ObservableOutput>::OutError: Into<<O1 as ObservableOutput>::OutError>,

Source§

impl<O1, O2> Observable for CombineLatestObservable<O1, O2>
where O1: 'static + Send + Sync + Observable, <O1 as ObservableOutput>::Out: Clone, O2: 'static + Send + Sync + Observable, <O2 as ObservableOutput>::Out: Clone, <O2 as ObservableOutput>::OutError: Into<<O1 as ObservableOutput>::OutError>,

Source§

impl<O1, O2> Observable for JoinObservable<O1, O2>
where O1: 'static + Send + Sync + Observable, <O1 as ObservableOutput>::Out: Clone, O2: 'static + Send + Sync + Observable, <O2 as ObservableOutput>::Out: Clone, <O2 as ObservableOutput>::OutError: Into<<O1 as ObservableOutput>::OutError>,

Source§

type Subscription<Destination: 'static + Subscriber<In = <JoinObservable<O1, O2> as ObservableOutput>::Out, InError = <JoinObservable<O1, O2> as ObservableOutput>::OutError>> = SharedSubscription

Source§

impl<O1, O2> Observable for ZipObservable<O1, O2>
where O1: 'static + Send + Sync + Observable, <O1 as ObservableOutput>::Out: Clone, O2: 'static + Send + Sync + Observable, <O2 as ObservableOutput>::Out: Clone, <O2 as ObservableOutput>::OutError: Into<<O1 as ObservableOutput>::OutError>,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ZipObservable<O1, O2> as ObservableOutput>::Out, InError = <ZipObservable<O1, O2> as ObservableOutput>::OutError>> = SharedSubscription

Source§

impl<Out> Observable for JustObservable<Out>
where Out: Signal + Clone,

Source§

impl<Out, OutError> Observable for ErasedObservable<Out, OutError>
where Out: Signal, OutError: Signal,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ErasedObservable<Out, OutError> as ObservableOutput>::Out, InError = <ErasedObservable<Out, OutError> as ObservableOutput>::OutError>> = SubscriptionData

Source§

impl<Out, OutError, const SIZE: usize> Observable for ConcatObservable<Out, OutError, SIZE>
where Out: Signal, OutError: Signal,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ConcatObservable<Out, OutError, SIZE> as ObservableOutput>::Out, InError = <ConcatObservable<Out, OutError, SIZE> as ObservableOutput>::OutError>> = SharedSubscription

Source§

impl<Out, OutError, const SIZE: usize> Observable for MergeObservable<Out, OutError, SIZE>
where Out: Signal, OutError: Signal,

Source§

type Subscription<Destination: 'static + Subscriber<In = <MergeObservable<Out, OutError, SIZE> as ObservableOutput>::Out, InError = <MergeObservable<Out, OutError, SIZE> as ObservableOutput>::OutError>> = SharedSubscription

Source§

impl<OutError> Observable for ThrowObservable<OutError>
where OutError: Signal + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ThrowObservable<OutError> as ObservableOutput>::Out, InError = <ThrowObservable<OutError> as ObservableOutput>::OutError>> = InertSubscription

Source§

impl<Producer, Out, OutError> Observable for CreateObservable<Producer, Out, OutError>
where Out: Signal, OutError: Signal, Producer: Clone + FnOnce(&mut dyn Subscriber<In = Out, InError = OutError>),

Source§

type Subscription<Destination: 'static + Subscriber<In = <CreateObservable<Producer, Out, OutError> as ObservableOutput>::Out, InError = <CreateObservable<Producer, Out, OutError> as ObservableOutput>::OutError>> = SharedSubscription

Source§

impl<Provenance, In, InError> Observable for ProvenanceSubject<Provenance, In, InError>
where Provenance: Signal + Clone + PartialEq, In: Signal + Clone, InError: Signal + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ProvenanceSubject<Provenance, In, InError> as ObservableOutput>::Out, InError = <ProvenanceSubject<Provenance, In, InError> as ObservableOutput>::OutError>> = MulticastSubscription<(In, Provenance), InError>

Source§

impl<S> Observable for IntervalObservable<S>
where S: 'static + Scheduler + Send + Sync,

Source§

type Subscription<Destination: 'static + Subscriber<In = <IntervalObservable<S> as ObservableOutput>::Out, InError = <IntervalObservable<S> as ObservableOutput>::OutError>> = IntervalSubscription<Destination, S>

Source§

impl<S> Observable for TimerObservable<S>
where S: 'static + Scheduler + Send + Sync,

Source§

type Subscription<Destination: 'static + Subscriber<In = <TimerObservable<S> as ObservableOutput>::Out, InError = <TimerObservable<S> as ObservableOutput>::OutError>> = TimerSubscription<Destination, S>

Source§

impl<Source, ConnectorProvider> Observable for ConnectableObservable<Source, ConnectorProvider>
where Source: Observable, ConnectorProvider: 'static + Provider, <ConnectorProvider as Provider>::Provided: SubjectLike<In = <Source as ObservableOutput>::Out, InError = <Source as ObservableOutput>::OutError> + Clone, <Source as Observable>::Subscription<ConnectionSubscriber<<ConnectorProvider as Provider>::Provided>>: 'static + TeardownCollection,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ConnectableObservable<Source, ConnectorProvider> as ObservableOutput>::Out, InError = <ConnectableObservable<Source, ConnectorProvider> as ObservableOutput>::OutError>> = <<ConnectorProvider as Provider>::Provided as Observable>::Subscription<Destination>

Source§

impl<Source, Op> Observable for Pipe<Source, Op>
where Source: Observable, Op: 'static + ComposableOperator<In = <Source as ObservableOutput>::Out, InError = <Source as ObservableOutput>::OutError>,

Source§

type Subscription<Destination: 'static + Subscriber<In = <Pipe<Source, Op> as ObservableOutput>::Out, InError = <Pipe<Source, Op> as ObservableOutput>::OutError>> = <Source as Observable>::Subscription<<Op as ComposableOperator>::Subscriber<<Destination as UpgradeableObserver>::Upgraded>>

Source§

impl<const CAPACITY: usize, In, InError> Observable for ReplaySubject<CAPACITY, In, InError>
where In: Signal + Clone, InError: Signal + Clone,

Source§

type Subscription<Destination: 'static + Subscriber<In = <ReplaySubject<CAPACITY, In, InError> as ObservableOutput>::Out, InError = <ReplaySubject<CAPACITY, In, InError> as ObservableOutput>::OutError>> = MulticastSubscription<In, InError>