another_rxrust/operators/
map_to_any.rs1use std::any::Any;
2use std::sync::Arc;
3
4use crate::internals::stream_controller::*;
5use crate::prelude::*;
6
7#[derive(Clone)]
8pub struct MapToAny {}
9
10impl<'a> MapToAny {
11 pub fn new() -> MapToAny {
12 MapToAny {}
13 }
14 pub fn execute<Item>(
15 &self,
16 source: Observable<'a, Item>,
17 ) -> Observable<'a, Arc<Box<dyn Any + Send + Sync + 'static>>>
18 where
19 Item: Clone + Send + Sync + 'static,
20 {
21 Observable::<Arc<Box<dyn Any + Send + Sync + 'static>>>::create(move |s| {
22 let sctl = StreamController::new(s);
23 let sctl_next = sctl.clone();
24 let sctl_error = sctl.clone();
25 let sctl_complete = sctl.clone();
26
27 source.inner_subscribe(sctl.new_observer(
28 move |_, x| {
29 sctl_next.sink_next(Arc::new(Box::new(x)));
30 },
31 move |_, e| {
32 sctl_error.sink_error(e);
33 },
34 move |serial| sctl_complete.sink_complete(&serial),
35 ));
36 })
37 }
38}
39
40impl<'a, Item> Observable<'a, Item>
41where
42 Item: Clone + Send + Sync + 'static,
43{
44 pub fn map_to_any(
45 &self,
46 ) -> Observable<'a, Arc<Box<dyn Any + Send + Sync + 'static>>> {
47 MapToAny::new().execute(self.clone())
48 }
49}
50
51#[cfg(test)]
52mod test {
53 use crate::prelude::*;
54
55 #[test]
56 fn basic() {
57 observables::just(1).map_to_any().subscribe(
58 |x| {
59 println!("{}", x.downcast_ref::<i32>().unwrap());
60 },
61 print_error!(),
62 print_complete!(),
63 );
64 }
65
66 #[test]
67 fn combination_with_zip() {
68 let o = observables::range(0, 10);
69 o.map_to_any()
70 .zip(&[
71 o.map(|x| format!("string {}", x)).map_to_any(),
72 o.map(|x| x as f64 * 0.1).map_to_any(),
73 ])
74 .subscribe(
75 |x| {
76 println!(
77 "{} - {} - {}",
78 x[0].downcast_ref::<i64>().unwrap(),
79 x[1].downcast_ref::<String>().unwrap(),
80 x[2].downcast_ref::<f64>().unwrap(),
81 );
82 },
83 print_error!(),
84 print_complete!(),
85 );
86 }
87}