another_rxrust/operators/
materialize.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
5#[derive(Clone)]
6pub struct Materialize<Item>
7where
8 Item: Clone + Send + Sync,
9{
10 _item: PhantomData<Item>,
11}
12
13impl<'a, Item> Materialize<Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new() -> Materialize<Item> {
18 Materialize { _item: PhantomData }
19 }
20 pub fn execute(
21 &self,
22 source: Observable<'a, Item>,
23 ) -> Observable<'a, Material<Item>> {
24 Observable::create(move |s| {
25 let source = source.clone();
26
27 let sctl = StreamController::new(s);
28 let sctl_next = sctl.clone();
29 let sctl_error = sctl.clone();
30 let sctl_complete = sctl.clone();
31
32 source.inner_subscribe(sctl.new_observer(
33 move |_, x| {
34 sctl_next.sink_next(Material::Next(x));
35 },
36 move |serial, e| {
37 sctl_error.sink_next(Material::Error(e));
38 sctl_error.sink_complete(&serial);
39 },
40 move |serial| {
41 sctl_complete.sink_next(Material::Complete);
42 sctl_complete.sink_complete(&serial)
43 },
44 ));
45 })
46 }
47}
48
49impl<'a, Item> Observable<'a, Item>
50where
51 Item: Clone + Send + Sync,
52{
53 pub fn materialize(&self) -> Observable<'a, Material<Item>> {
54 Materialize::new().execute(self.clone())
55 }
56}
57
58#[cfg(test)]
59mod test {
60 use crate::prelude::*;
61
62 #[test]
63 fn basic() {
64 observables::from_iter(0..10).materialize().subscribe(
65 print_next_fmt!("{:?}"),
66 print_error!(),
67 print_complete!(),
68 );
69 }
70}