rx_rust/operators/creating/create.rs
1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9/// Creates an Observable from scratch by means of a producer function.
10/// See <https://reactivex.io/documentation/operators/create.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15/// disposable::subscription::Subscription,
16/// observable::observable_ext::ObservableExt,
17/// observer::{boxed_observer::BoxedObserver, Observer, Termination},
18/// operators::creating::create::Create,
19/// };
20///
21/// let mut values = Vec::new();
22/// let mut terminations = Vec::new();
23///
24/// let observable = Create::new(|mut observer: BoxedObserver<'_, i32, ()>| {
25/// observer.on_next(42);
26/// observer.on_termination(Termination::Completed);
27/// Subscription::default()
28/// });
29///
30/// observable.subscribe_with_callback(
31/// |value| values.push(value),
32/// |termination| terminations.push(termination),
33/// );
34///
35/// assert_eq!(values, vec![42]);
36/// assert_eq!(terminations, vec![Termination::Completed]);
37/// ```
38#[derive(Educe)]
39#[educe(Debug, Clone)]
40pub struct Create<F>(F);
41
42impl<F> Create<F> {
43 pub fn new<'or, 'sub, T, E>(builder: F) -> Self
44 where
45 // Using `Subscription` instead of FnOnce() to make `Create` more easy to wrap other observables. See more in `test_unsubscribe_wrap_observable`.
46 F: FnOnce(BoxedObserver<'or, T, E>) -> Subscription<'sub>,
47 {
48 Self(builder)
49 }
50}
51
52impl<'or, 'sub, T, E, F> Observable<'or, 'sub, T, E> for Create<F>
53where
54 F: FnOnce(BoxedObserver<'or, T, E>) -> Subscription<'sub>,
55{
56 fn subscribe(
57 self,
58 observer: impl Observer<T, E> + NecessarySendSync + 'or,
59 ) -> Subscription<'sub> {
60 self.0(BoxedObserver::new(observer))
61 }
62}