another_rxrust/internals/
stream_controller.rs1use crate::prelude::*;
2use std::{
3 collections::HashMap,
4 sync::{Arc, RwLock},
5};
6
7use super::function_wrapper::FunctionWrapper;
8
9#[derive(Clone)]
10pub struct StreamController<'a, Item>
11where
12 Item: Clone + Send + Sync + 'a,
13{
14 serial: Arc<RwLock<i32>>,
15 subscriber: Observer<'a, Item>,
16 unscribers: Arc<RwLock<HashMap<i32, FunctionWrapper<'a, (), ()>>>>,
17 on_finalize: Arc<RwLock<Option<FunctionWrapper<'a, (), ()>>>>,
18}
19
20impl<'a, Item> StreamController<'a, Item>
21where
22 Item: Clone + Send + Sync,
23{
24 pub fn new(subscriber: Observer<Item>) -> StreamController<Item> {
25 let subscriber_ = subscriber.clone();
26 let sctl = StreamController {
27 serial: Arc::new(RwLock::new(0)),
28 subscriber,
29 unscribers: Arc::new(RwLock::new(HashMap::new())),
30 on_finalize: Arc::new(RwLock::new(None)),
31 };
32 {
33 let sctl = sctl.clone();
34 subscriber_.set_on_unsubscribe(move || sctl.finalize());
35 }
36 sctl
37 }
38
39 pub fn set_on_finalize<F>(&self, f: F)
40 where
41 F: Fn() + Send + Sync + 'a,
42 {
43 *self.on_finalize.write().unwrap() =
44 Some(FunctionWrapper::new(move |_| f()));
45 }
46
47 pub fn new_observer<XItem, Next, Error, Complete>(
48 &self,
49 next: Next,
50 error: Error,
51 complete: Complete,
52 ) -> Observer<'a, XItem>
53 where
54 XItem: Clone + Send + Sync + 'a,
55 Next: Fn(i32, XItem) + Send + Sync + 'a,
56 Error: Fn(i32, RxError) + Send + Sync + 'a,
57 Complete: Fn(i32) -> () + Send + Sync + 'a,
58 {
59 let serial = {
60 let mut x = self.serial.write().unwrap();
61 let ret = *x;
62 *x += 1;
63 ret
64 };
65
66 let serial_next = serial.clone();
67 let serial_error = serial.clone();
68 let serial_complete = serial.clone();
69 let observer = Observer::new(
70 move |x| next(serial_next, x),
71 move |e| error(serial_error, e),
72 move || complete(serial_complete),
73 );
74 let o_unsub = observer.clone();
75
76 let mut unsubscribers = self.unscribers.write().unwrap();
77 unsubscribers.insert(
78 serial.clone(),
79 FunctionWrapper::new(move |_| o_unsub.unsubscribe()),
80 );
81 observer
82 }
83
84 pub fn sink_next(&self, x: Item) {
85 if self.subscriber.is_subscribed() {
86 self.subscriber.next(x);
87 } else {
88 self.finalize();
89 }
90 }
91
92 pub fn sink_error(&self, e: RxError) {
93 if self.subscriber.is_subscribed() {
94 self.subscriber.error(e);
95 self.finalize();
96 } else {
97 self.finalize();
98 }
99 }
100
101 pub fn sink_complete(&self, serial: &i32) {
102 if self.subscriber.is_subscribed() {
103 let done_all = {
104 let mut observers = self.unscribers.write().unwrap();
105 observers.remove(serial);
106 observers.len() == 0
107 };
108 if done_all {
109 self.subscriber.complete();
110 self.finalize();
111 }
112 } else {
113 self.finalize();
114 }
115 }
116
117 pub fn sink_complete_force(&self) {
118 if self.subscriber.is_subscribed() {
119 self.subscriber.complete();
120 }
121 self.finalize();
122 }
123
124 pub fn upstream_abort_observe(&self, serial: &i32) {
125 let mut observers = self.unscribers.write().unwrap();
126 let o = observers.remove(serial);
127 if let Some(o) = o {
128 o.call(());
129 }
130 }
131
132 pub fn finalize(&self) {
133 self.unscribers.read().unwrap().iter().for_each(|x| {
134 x.1.call(());
135 });
136 self.unscribers.write().unwrap().clear();
137 if self.subscriber.is_subscribed() {
138 self.subscriber.unsubscribe();
139 }
140 let on_finalize = &mut *self.on_finalize.write().unwrap();
141 if let Some(f) = on_finalize {
142 f.call(());
143 *on_finalize = None;
144 }
145 }
146
147 pub fn is_subscribed(&self) -> bool {
148 self.subscriber.is_subscribed()
149 }
150}