another_rxrust/operators/
materialize.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
5#[derive(Clone)]
6pub struct Materialize<Item>
7where
8  Item: Clone + Send + Sync,
9{
10  _item: PhantomData<Item>,
11}
12
13impl<'a, Item> Materialize<Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new() -> Materialize<Item> {
18    Materialize { _item: PhantomData }
19  }
20  pub fn execute(
21    &self,
22    source: Observable<'a, Item>,
23  ) -> Observable<'a, Material<Item>> {
24    Observable::create(move |s| {
25      let source = source.clone();
26
27      let sctl = StreamController::new(s);
28      let sctl_next = sctl.clone();
29      let sctl_error = sctl.clone();
30      let sctl_complete = sctl.clone();
31
32      source.inner_subscribe(sctl.new_observer(
33        move |_, x| {
34          sctl_next.sink_next(Material::Next(x));
35        },
36        move |serial, e| {
37          sctl_error.sink_next(Material::Error(e));
38          sctl_error.sink_complete(&serial);
39        },
40        move |serial| {
41          sctl_complete.sink_next(Material::Complete);
42          sctl_complete.sink_complete(&serial)
43        },
44      ));
45    })
46  }
47}
48
49impl<'a, Item> Observable<'a, Item>
50where
51  Item: Clone + Send + Sync,
52{
53  pub fn materialize(&self) -> Observable<'a, Material<Item>> {
54    Materialize::new().execute(self.clone())
55  }
56}
57
58#[cfg(test)]
59mod test {
60  use crate::prelude::*;
61
62  #[test]
63  fn basic() {
64    observables::from_iter(0..10).materialize().subscribe(
65      print_next_fmt!("{:?}"),
66      print_error!(),
67      print_complete!(),
68    );
69  }
70}