1use std::{
37 collections::HashMap,
38 pin::Pin,
39 sync::{Arc, Weak},
40 task::{Context, Poll},
41};
42
43use futures::stream::{FusedStream, Stream};
44use parking_lot::ReentrantMutex;
46use std::cell::RefCell;
47
48use crate::{
49 id_sequence::SeqID,
50 mpsc::{TracingUnboundedReceiver, TracingUnboundedSender},
51};
52
53#[cfg(test)]
54mod tests;
55
56pub trait Unsubscribe {
58 fn unsubscribe(&mut self, subs_id: SeqID);
60}
61
62pub trait Subscribe<K> {
64 fn subscribe(&mut self, subs_key: K, subs_id: SeqID);
66}
67
68pub trait Dispatch<M> {
70 type Item;
72 type Ret;
74
75 fn dispatch<F>(&mut self, message: M, dispatch: F) -> Self::Ret
83 where
84 F: FnMut(&SeqID, Self::Item);
85}
86
87#[derive(Debug)]
93pub struct Hub<M, R> {
94 tracing_key: &'static str,
95 shared: Arc<ReentrantMutex<RefCell<Shared<M, R>>>>,
96}
97
98#[derive(Debug)]
103pub struct Receiver<M, R>
104where
105 R: Unsubscribe,
106{
107 rx: TracingUnboundedReceiver<M>,
108
109 shared: Weak<ReentrantMutex<RefCell<Shared<M, R>>>>,
110 subs_id: SeqID,
111}
112
113#[derive(Debug)]
114struct Shared<M, R> {
115 id_sequence: crate::id_sequence::IDSequence,
116 registry: R,
117 sinks: HashMap<SeqID, TracingUnboundedSender<M>>,
118}
119
120impl<M, R> Hub<M, R>
121where
122 R: Unsubscribe,
123{
124 pub fn map_registry_for_tests<MapF, Ret>(&self, map: MapF) -> Ret
126 where
127 MapF: FnOnce(&R) -> Ret,
128 {
129 let shared_locked = self.shared.lock();
130 let shared_borrowed = shared_locked.borrow();
131 map(&shared_borrowed.registry)
132 }
133}
134
135impl<M, R> Drop for Receiver<M, R>
136where
137 R: Unsubscribe,
138{
139 fn drop(&mut self) {
140 if let Some(shared) = self.shared.upgrade() {
141 shared.lock().borrow_mut().unsubscribe(self.subs_id);
142 }
143 }
144}
145
146impl<M, R> Hub<M, R> {
147 pub fn new(tracing_key: &'static str) -> Self
149 where
150 R: Default,
151 {
152 Self::new_with_registry(tracing_key, Default::default())
153 }
154
155 pub fn new_with_registry(tracing_key: &'static str, registry: R) -> Self {
157 let shared =
158 Shared { registry, sinks: Default::default(), id_sequence: Default::default() };
159 let shared = Arc::new(ReentrantMutex::new(RefCell::new(shared)));
160 Self { tracing_key, shared }
161 }
162
163 pub fn subscribe<K>(&self, subs_key: K, queue_size_warning: usize) -> Receiver<M, R>
167 where
168 R: Subscribe<K> + Unsubscribe,
169 {
170 let shared_locked = self.shared.lock();
171 let mut shared_borrowed = shared_locked.borrow_mut();
172
173 let subs_id = shared_borrowed.id_sequence.next_id();
174
175 shared_borrowed.registry.subscribe(subs_key, subs_id);
179
180 let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning);
181 assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique.");
182
183 Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx }
184 }
185
186 pub fn send<Trigger>(&self, trigger: Trigger) -> <R as Dispatch<Trigger>>::Ret
190 where
191 R: Dispatch<Trigger, Item = M>,
192 {
193 let shared_locked = self.shared.lock();
194 let mut shared_borrowed = shared_locked.borrow_mut();
195 let (registry, sinks) = shared_borrowed.get_mut();
196
197 registry.dispatch(trigger, |subs_id, item| {
198 if let Some(tx) = sinks.get_mut(subs_id) {
199 if let Err(send_err) = tx.unbounded_send(item) {
200 log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::<R>(),
201 std::any::type_name::<Trigger>(),
202 std::any::type_name::<M>());
203 }
204 } else {
205 log::warn!(
206 "No Sink for SubsID = {} ({} as Dispatch<{}, Item = {}>::dispatch(...))",
207 subs_id,
208 std::any::type_name::<R>(),
209 std::any::type_name::<Trigger>(),
210 std::any::type_name::<M>(),
211 );
212 }
213 })
214 }
215}
216
217impl<M, R> Shared<M, R> {
218 fn get_mut(&mut self) -> (&mut R, &mut HashMap<SeqID, TracingUnboundedSender<M>>) {
219 (&mut self.registry, &mut self.sinks)
220 }
221
222 fn unsubscribe(&mut self, subs_id: SeqID)
223 where
224 R: Unsubscribe,
225 {
226 self.sinks.remove(&subs_id);
230 self.registry.unsubscribe(subs_id);
231 }
232}
233
234impl<M, R> Clone for Hub<M, R> {
235 fn clone(&self) -> Self {
236 Self { tracing_key: self.tracing_key, shared: self.shared.clone() }
237 }
238}
239
240impl<M, R> Unpin for Receiver<M, R> where R: Unsubscribe {}
241
242impl<M, R> Stream for Receiver<M, R>
243where
244 R: Unsubscribe,
245{
246 type Item = M;
247
248 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
249 Pin::new(&mut self.get_mut().rx).poll_next(cx)
250 }
251}
252
253impl<Ch, R> FusedStream for Receiver<Ch, R>
254where
255 R: Unsubscribe,
256{
257 fn is_terminated(&self) -> bool {
258 self.rx.is_terminated()
259 }
260}