// rx_rust/operators/creating/create.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9/// Creates an Observable from scratch by means of a producer function.
10/// See <https://reactivex.io/documentation/operators/create.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     disposable::subscription::Subscription,
16///     observable::observable_ext::ObservableExt,
17///     observer::{boxed_observer::BoxedObserver, Observer, Termination},
18///     operators::creating::create::Create,
19/// };
20///
21/// let mut values = Vec::new();
22/// let mut terminations = Vec::new();
23///
24/// let observable = Create::new(|mut observer: BoxedObserver<'_, i32, ()>| {
25///     observer.on_next(42);
26///     observer.on_termination(Termination::Completed);
27///     Subscription::default()
28/// });
29///
30/// observable.subscribe_with_callback(
31///     |value| values.push(value),
32///     |termination| terminations.push(termination),
33/// );
34///
35/// assert_eq!(values, vec![42]);
36/// assert_eq!(terminations, vec![Termination::Completed]);
37/// ```
38#[derive(Educe)]
39#[educe(Debug, Clone)]
40pub struct Create<F>(F);
41
42impl<F> Create<F> {
43    pub fn new<'or, 'sub, T, E>(builder: F) -> Self
44    where
45        // Using `Subscription` instead of FnOnce() to make `Create` more easy to wrap other observables. See more in `test_unsubscribe_wrap_observable`.
46        F: FnOnce(BoxedObserver<'or, T, E>) -> Subscription<'sub>,
47    {
48        Self(builder)
49    }
50}
51
52impl<'or, 'sub, T, E, F> Observable<'or, 'sub, T, E> for Create<F>
53where
54    F: FnOnce(BoxedObserver<'or, T, E>) -> Subscription<'sub>,
55{
56    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
57        self.0(BoxedObserver::new(observer))
58    }
59}