another_rxrust/internals/
stream_controller.rs

1use crate::prelude::*;
2use std::{
3  collections::HashMap,
4  sync::{Arc, RwLock},
5};
6
7use super::function_wrapper::FunctionWrapper;
8
9#[derive(Clone)]
10pub struct StreamController<'a, Item>
11where
12  Item: Clone + Send + Sync + 'a,
13{
14  serial: Arc<RwLock<i32>>,
15  subscriber: Observer<'a, Item>,
16  unscribers: Arc<RwLock<HashMap<i32, FunctionWrapper<'a, (), ()>>>>,
17  on_finalize: Arc<RwLock<Option<FunctionWrapper<'a, (), ()>>>>,
18}
19
20impl<'a, Item> StreamController<'a, Item>
21where
22  Item: Clone + Send + Sync,
23{
24  pub fn new(subscriber: Observer<Item>) -> StreamController<Item> {
25    let subscriber_ = subscriber.clone();
26    let sctl = StreamController {
27      serial: Arc::new(RwLock::new(0)),
28      subscriber,
29      unscribers: Arc::new(RwLock::new(HashMap::new())),
30      on_finalize: Arc::new(RwLock::new(None)),
31    };
32    {
33      let sctl = sctl.clone();
34      subscriber_.set_on_unsubscribe(move || sctl.finalize());
35    }
36    sctl
37  }
38
39  pub fn set_on_finalize<F>(&self, f: F)
40  where
41    F: Fn() + Send + Sync + 'a,
42  {
43    *self.on_finalize.write().unwrap() =
44      Some(FunctionWrapper::new(move |_| f()));
45  }
46
47  pub fn new_observer<XItem, Next, Error, Complete>(
48    &self,
49    next: Next,
50    error: Error,
51    complete: Complete,
52  ) -> Observer<'a, XItem>
53  where
54    XItem: Clone + Send + Sync + 'a,
55    Next: Fn(i32, XItem) + Send + Sync + 'a,
56    Error: Fn(i32, RxError) + Send + Sync + 'a,
57    Complete: Fn(i32) -> () + Send + Sync + 'a,
58  {
59    let serial = {
60      let mut x = self.serial.write().unwrap();
61      let ret = *x;
62      *x += 1;
63      ret
64    };
65
66    let serial_next = serial.clone();
67    let serial_error = serial.clone();
68    let serial_complete = serial.clone();
69    let observer = Observer::new(
70      move |x| next(serial_next, x),
71      move |e| error(serial_error, e),
72      move || complete(serial_complete),
73    );
74    let o_unsub = observer.clone();
75
76    let mut unsubscribers = self.unscribers.write().unwrap();
77    unsubscribers.insert(
78      serial.clone(),
79      FunctionWrapper::new(move |_| o_unsub.unsubscribe()),
80    );
81    observer
82  }
83
84  pub fn sink_next(&self, x: Item) {
85    if self.subscriber.is_subscribed() {
86      self.subscriber.next(x);
87    } else {
88      self.finalize();
89    }
90  }
91
92  pub fn sink_error(&self, e: RxError) {
93    if self.subscriber.is_subscribed() {
94      self.subscriber.error(e);
95      self.finalize();
96    } else {
97      self.finalize();
98    }
99  }
100
101  pub fn sink_complete(&self, serial: &i32) {
102    if self.subscriber.is_subscribed() {
103      let done_all = {
104        let mut observers = self.unscribers.write().unwrap();
105        observers.remove(serial);
106        observers.len() == 0
107      };
108      if done_all {
109        self.subscriber.complete();
110        self.finalize();
111      }
112    } else {
113      self.finalize();
114    }
115  }
116
117  pub fn sink_complete_force(&self) {
118    if self.subscriber.is_subscribed() {
119      self.subscriber.complete();
120    }
121    self.finalize();
122  }
123
124  pub fn upstream_abort_observe(&self, serial: &i32) {
125    let mut observers = self.unscribers.write().unwrap();
126    let o = observers.remove(serial);
127    if let Some(o) = o {
128      o.call(());
129    }
130  }
131
132  pub fn finalize(&self) {
133    self.unscribers.read().unwrap().iter().for_each(|x| {
134      x.1.call(());
135    });
136    self.unscribers.write().unwrap().clear();
137    if self.subscriber.is_subscribed() {
138      self.subscriber.unsubscribe();
139    }
140    let on_finalize = &mut *self.on_finalize.write().unwrap();
141    if let Some(f) = on_finalize {
142      f.call(());
143      *on_finalize = None;
144    }
145  }
146
147  pub fn is_subscribed(&self) -> bool {
148    self.subscriber.is_subscribed()
149  }
150}