1use std::{
26 collections::HashMap,
27 pin::Pin,
28 sync::{Arc, Weak},
29 task::{Context, Poll},
30};
31
32use futures::stream::{FusedStream, Stream};
33use parking_lot::ReentrantMutex;
35use std::cell::RefCell;
36
37use crate::utils::{
38 id_sequence::SeqID,
39 mpsc::{TracingUnboundedReceiver, TracingUnboundedSender},
40};
41
42#[cfg(test)]
43mod tests;
44
45pub trait Unsubscribe {
47 fn unsubscribe(&mut self, subs_id: SeqID);
49}
50
51pub trait Subscribe<K> {
53 fn subscribe(&mut self, subs_key: K, subs_id: SeqID);
55}
56
57pub trait Dispatch<M> {
59 type Item;
61 type Ret;
63
64 fn dispatch<F>(&mut self, message: M, dispatch: F) -> Self::Ret
72 where
73 F: FnMut(&SeqID, Self::Item);
74}
75
76#[derive(Debug)]
82pub struct Hub<M, R> {
83 tracing_key: &'static str,
84 shared: Arc<ReentrantMutex<RefCell<Shared<M, R>>>>,
85}
86
87#[derive(Debug)]
92pub struct Receiver<M, R>
93where
94 R: Unsubscribe,
95{
96 rx: TracingUnboundedReceiver<M>,
97
98 shared: Weak<ReentrantMutex<RefCell<Shared<M, R>>>>,
99 subs_id: SeqID,
100}
101
102#[derive(Debug)]
103struct Shared<M, R> {
104 id_sequence: crate::utils::id_sequence::IDSequence,
105 registry: R,
106 sinks: HashMap<SeqID, TracingUnboundedSender<M>>,
107}
108
109impl<M, R> Hub<M, R>
110where
111 R: Unsubscribe,
112{
113 pub fn map_registry_for_tests<MapF, Ret>(&self, map: MapF) -> Ret
115 where
116 MapF: FnOnce(&R) -> Ret,
117 {
118 let shared_locked = self.shared.lock();
119 let shared_borrowed = shared_locked.borrow();
120 map(&shared_borrowed.registry)
121 }
122}
123
124impl<M, R> Drop for Receiver<M, R>
125where
126 R: Unsubscribe,
127{
128 fn drop(&mut self) {
129 if let Some(shared) = self.shared.upgrade() {
130 shared.lock().borrow_mut().unsubscribe(self.subs_id);
131 }
132 }
133}
134
135impl<M, R> Hub<M, R> {
136 pub fn new(tracing_key: &'static str) -> Self
138 where
139 R: Default,
140 {
141 Self::new_with_registry(tracing_key, Default::default())
142 }
143
144 pub fn new_with_registry(tracing_key: &'static str, registry: R) -> Self {
146 let shared =
147 Shared { registry, sinks: Default::default(), id_sequence: Default::default() };
148 let shared = Arc::new(ReentrantMutex::new(RefCell::new(shared)));
149 Self { tracing_key, shared }
150 }
151
152 pub fn subscribe<K>(&self, subs_key: K, queue_size_warning: usize) -> Receiver<M, R>
156 where
157 R: Subscribe<K> + Unsubscribe,
158 {
159 let shared_locked = self.shared.lock();
160 let mut shared_borrowed = shared_locked.borrow_mut();
161
162 let subs_id = shared_borrowed.id_sequence.next_id();
163
164 shared_borrowed.registry.subscribe(subs_key, subs_id);
168
169 let (tx, rx) = crate::utils::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning);
170 assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique.");
171
172 Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx }
173 }
174
175 pub fn send<Trigger>(&self, trigger: Trigger) -> <R as Dispatch<Trigger>>::Ret
179 where
180 R: Dispatch<Trigger, Item = M>,
181 {
182 let shared_locked = self.shared.lock();
183 let mut shared_borrowed = shared_locked.borrow_mut();
184 let (registry, sinks) = shared_borrowed.get_mut();
185
186 registry.dispatch(trigger, |subs_id, item| {
187 if let Some(tx) = sinks.get_mut(subs_id) {
188 if let Err(send_err) = tx.unbounded_send(item) {
189 log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::<R>(),
190 std::any::type_name::<Trigger>(),
191 std::any::type_name::<M>());
192 }
193 } else {
194 log::warn!(
195 "No Sink for SubsID = {} ({} as Dispatch<{}, Item = {}>::dispatch(...))",
196 subs_id,
197 std::any::type_name::<R>(),
198 std::any::type_name::<Trigger>(),
199 std::any::type_name::<M>(),
200 );
201 }
202 })
203 }
204}
205
206impl<M, R> Shared<M, R> {
207 fn get_mut(&mut self) -> (&mut R, &mut HashMap<SeqID, TracingUnboundedSender<M>>) {
208 (&mut self.registry, &mut self.sinks)
209 }
210
211 fn unsubscribe(&mut self, subs_id: SeqID)
212 where
213 R: Unsubscribe,
214 {
215 self.sinks.remove(&subs_id);
219 self.registry.unsubscribe(subs_id);
220 }
221}
222
223impl<M, R> Clone for Hub<M, R> {
224 fn clone(&self) -> Self {
225 Self { tracing_key: self.tracing_key, shared: self.shared.clone() }
226 }
227}
228
229impl<M, R> Unpin for Receiver<M, R> where R: Unsubscribe {}
230
231impl<M, R> Stream for Receiver<M, R>
232where
233 R: Unsubscribe,
234{
235 type Item = M;
236
237 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
238 Pin::new(&mut self.get_mut().rx).poll_next(cx)
239 }
240}
241
242impl<Ch, R> FusedStream for Receiver<Ch, R>
243where
244 R: Unsubscribe,
245{
246 fn is_terminated(&self) -> bool {
247 self.rx.is_terminated()
248 }
249}