1use std::any::Any;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::rc::Rc;
6use yew::prelude::*;
7
8type SynchiStore = HashMap<&'static str, Rc<RefCell<HashMap<usize, DataStatus>>>>;
9type SynchiSubscriber = Callback<Box<dyn Any>>;
10
11enum DataStatus {
12 Free(Box<dyn Any>),
13 Claimed(SynchiSubscriber, Box<dyn Any>),
14}
15
16thread_local! {
17 static SYNCHI: Rc<RefCell<SynchiStore>> = Rc::new(RefCell::new(HashMap::new()));
18 static SYNCHI_COUNTERS: RefCell<HashMap<&'static str, usize>> = RefCell::new(HashMap::new());
19 static SYNCHI_SUBSCRIBERS: RefCell<HashMap<&'static str, Vec<SynchiSubscriber>>> =
20 RefCell::new(HashMap::new());
21}
22
23pub trait Merge {
24 fn merge(&self, other: &Self) -> Self;
25}
26
27fn register_channel<T>(name: &'static str, data: T) -> Option<usize>
28where
29 T: Any,
30{
31 SYNCHI.with(|store| {
32 let mut store = store.borrow_mut();
33
34 if let Some(existing_channel) = store.get(name) {
35 let mut channel = existing_channel.borrow_mut();
36
37 let index = SYNCHI_COUNTERS.with(|counters| {
38 let mut counters = counters.borrow_mut();
39
40 if let Some(counter) = counters.get_mut(name) {
41 *counter += 1;
42 *counter
43 } else {
44 counters.insert(name, 0);
45 0
46 }
47 });
48
49 channel.insert(index, DataStatus::Free(Box::new(data)));
50 return Some(index);
51 }
52
53 SYNCHI_COUNTERS.with(|counters| {
54 let mut counters = counters.borrow_mut();
55 counters.insert(name, 0);
56 });
57
58 let new_channel = Rc::new(RefCell::new(HashMap::<usize, DataStatus>::from_iter(vec![
59 (0, DataStatus::Free(Box::new(data))),
60 ])));
61
62 store.insert(name, new_channel);
63 Some(0)
64 })
65}
66
67fn unregister_channel(name: &'static str, index: usize) -> usize {
68 SYNCHI.with(|store| {
69 let store = store.borrow_mut();
70
71 if let Some(channel) = store.get(name) {
72 let mut channel = channel.borrow_mut();
73 channel.remove(&index);
74
75 channel.len()
76 } else {
77 panic!("Failed to get SYNCHI channel");
78 }
79 })
80}
81
82fn subscribe_to_channel<T>(name: &'static str, indexes: Vec<usize>, callback: SynchiSubscriber)
83where
84 T: Any + Clone + Debug + Default + Merge,
85{
86 let mut no_new_subscriber = false;
87
88 SYNCHI_SUBSCRIBERS.with(|subscribers| {
89 let mut subscribers = subscribers.borrow_mut();
90
91 if let Some(existing_subscribers) = subscribers.get_mut(name) {
92 for existing_subscriber in existing_subscribers.iter() {
93 if existing_subscriber == &callback {
94 no_new_subscriber = true;
95 return;
96 }
97 }
98
99 existing_subscribers.push(callback.clone());
100 } else {
101 subscribers.insert(name, vec![callback.clone()]);
102 }
103 });
104
105 if no_new_subscriber {
106 return;
107 }
108
109 SYNCHI.with(|store| {
110 let store = store.borrow_mut();
111
112 if let Some(channel) = store.get(name) {
113 let mut channel = channel.borrow_mut();
114 let mut merged_data = T::default();
115
116 for index in indexes {
117 if let Some(data_status) = channel.get_mut(&index) {
118 match data_status {
119 DataStatus::Free(data) => {
120 if let Some(data) = data.downcast_ref::<T>() {
121 merged_data = merged_data.merge(data);
122
123 *data_status =
124 DataStatus::Claimed(callback.clone(), Box::new(data.clone()));
125 }
126 }
127 DataStatus::Claimed(subscriber, data) => {
128 if *subscriber == callback {
129 if let Some(data) = data.downcast_ref::<T>() {
130 merged_data = merged_data.merge(data);
131 }
132 }
133 }
134 }
135 }
136 }
137
138 callback.emit(Box::new(merged_data));
139 }
140 });
141}
142
143fn unsubscribe_from_channel<T>(name: &'static str, callback: SynchiSubscriber)
144where
145 T: Any + Clone + Debug + Default + Merge,
146{
147 SYNCHI_SUBSCRIBERS.with(|subscribers| {
148 let mut subscribers = subscribers.borrow_mut();
149
150 if let Some(existing_subscribers) = subscribers.get_mut(name) {
151 existing_subscribers.retain(|subscriber| subscriber != &callback);
152 }
153 });
154
155 SYNCHI.with(|store| {
156 let store = store.borrow_mut();
157
158 if let Some(channel) = store.get(name) {
159 let mut channel = channel.borrow_mut();
160
161 for index in 0..channel.len() {
162 if let Some(data_status) = channel.get_mut(&index) {
163 if let DataStatus::Claimed(subscriber, data) = data_status {
164 if *subscriber == callback {
165 if let Some(data) = data.downcast_ref::<T>() {
166 *data_status = DataStatus::Free(Box::new(data.clone()));
167 }
168 }
169 }
170 }
171 }
172 }
173 });
174}
175
176fn notify_subscriber<T>(name: &'static str, callback: SynchiSubscriber)
177where
178 T: Any + Clone + Debug + Default + Merge,
179{
180 SYNCHI.with(|store| {
181 let store = store.borrow_mut();
182
183 if let Some(channel) = store.get(name) {
184 let mut channel = channel.borrow_mut();
185 let mut merged_data = T::default();
186
187 for index in 0..channel.len() {
188 if let Some(DataStatus::Claimed(subscriber, data)) = channel.get_mut(&index) {
189 if *subscriber == callback {
190 if let Some(data) = data.downcast_ref::<T>() {
191 merged_data = merged_data.merge(data);
192 }
193 }
194 }
195 }
196
197 callback.emit(Box::new(merged_data));
198 }
199 });
200}
201
202#[derive(Debug, Clone, PartialEq)]
203pub struct SynchiChannel<T>
204where
205 T: Any + Clone + Default,
206{
207 pub name: &'static str,
208 pub index: usize,
209 _marker: std::marker::PhantomData<T>,
210}
211
212impl<T> Drop for SynchiChannel<T>
213where
214 T: Any + Clone + Default,
215{
216 fn drop(&mut self) {
217 let channel_len = unregister_channel(self.name, self.index);
218
219 if channel_len == 0 {
220 SYNCHI.with(|store| {
221 let mut store = store.borrow_mut();
222
223 if let Some(channel) = store.remove(self.name) {
224 drop(channel);
225 }
226 });
227
228 SYNCHI_COUNTERS.with(|counters| {
229 let mut counters = counters.borrow_mut();
230 counters.remove(self.name);
231 });
232
233 SYNCHI_SUBSCRIBERS.with(|subscribers| {
234 let mut subscribers = subscribers.borrow_mut();
235 subscribers.remove(self.name);
236 });
237 }
238 }
239}
240
241impl<T> SynchiChannel<T>
242where
243 T: Any + Clone + Debug + Merge + Default,
244{
245 pub fn new(name: &'static str) -> Self {
246 if let Some(index) = register_channel(name, T::default()) {
247 SynchiChannel {
248 name,
249 index,
250 _marker: std::marker::PhantomData,
251 }
252 } else {
253 panic!("Failed to register SYNCHI channel");
254 }
255 }
256
257 pub fn new_with_data(name: &'static str, data: T) -> Self {
258 if let Some(index) = register_channel(name, data) {
259 SynchiChannel {
260 name,
261 index,
262 _marker: std::marker::PhantomData,
263 }
264 } else {
265 panic!("Failed to register SYNCHI channel");
266 }
267 }
268
269 pub fn pull(&self) -> T {
270 SYNCHI.with(|store| {
271 let store = store.borrow();
272
273 if let Some(channel) = store.get(self.name) {
274 let channel = channel.borrow();
275
276 if let Some(data) = channel.get(&self.index) {
277 match data {
278 DataStatus::Free(data) => {
279 let data = data.downcast_ref::<T>().unwrap();
280 data.clone()
281 }
282 DataStatus::Claimed(_, data) => {
283 let data = data.downcast_ref::<T>().unwrap();
284 data.clone()
285 }
286 }
287 } else {
288 panic!("Failed to get SYNCHI channel data");
289 }
290 } else {
291 panic!("Failed to get SYNCHI channel");
292 }
293 })
294 }
295
296 pub fn push(&self, data: T) {
297 let mut target_subscriber = None::<SynchiSubscriber>;
298
299 SYNCHI.with(|store| {
300 let store = store.borrow_mut();
301
302 if let Some(channel) = store.get(self.name) {
303 let mut channel = channel.borrow_mut();
304
305 if let Some(channel_data) = channel.get_mut(&self.index) {
306 match channel_data {
307 DataStatus::Free(_) => {
308 *channel_data = DataStatus::Free(Box::new(data.clone()));
309 }
310 DataStatus::Claimed(subscriber, _) => {
311 target_subscriber = subscriber.clone().into();
312
313 *channel_data =
314 DataStatus::Claimed(subscriber.clone(), Box::new(data.clone()));
315 }
316 }
317 } else {
318 panic!("Failed to get SYNCHI channel data");
319 }
320 } else {
321 panic!("Failed to get SYNCHI channel");
322 }
323 });
324
325 if let Some(subscriber) = target_subscriber {
326 notify_subscriber::<T>(self.name, subscriber.clone());
327 }
328 }
329}
330
331#[hook]
332pub fn use_synchi_channel<T>(name: &'static str) -> Rc<RefCell<SynchiChannel<T>>>
333where
334 T: Any + Clone + Default + Merge + Debug,
335{
336 use_mut_ref(|| SynchiChannel::<T>::new(name))
337}
338
339#[hook]
340pub fn use_synchi_channel_with<T>(name: &'static str, data: T) -> Rc<RefCell<SynchiChannel<T>>>
341where
342 T: Any + Clone + Default + PartialEq + Merge + Debug,
343{
344 use_mut_ref(|| SynchiChannel::<T>::new_with_data(name, data.clone()))
345}
346
347#[hook]
348pub fn use_synchi_channel_subscribe<T>(name: &'static str, indexes: Vec<usize>) -> UseStateHandle<T>
349where
350 T: Any + Clone + Default + PartialEq + Debug + Merge,
351{
352 let data = use_state(|| T::default());
353
354 let subscriber = {
355 let data = data.clone();
356
357 use_callback((), move |next_data: Box<dyn Any>, _| {
358 if let Some(next_data) = next_data.downcast_ref::<T>() {
359 data.set(next_data.clone());
360 } else {
361 panic!("Failed to downcast SYNCHI channel \"{name}\" data: {next_data:?}");
362 }
363 })
364 };
365
366 subscribe_to_channel::<T>(name, indexes, subscriber.clone());
367
368 use_effect_with((), move |_| {
369 let subscriber = subscriber.clone();
370
371 move || {
372 unsubscribe_from_channel::<T>(name, subscriber);
373 }
374 });
375
376 data
377}
378
379#[cfg(test)]
380mod tests {
381 use super::*;
382 use serial_test::serial;
383
384 #[derive(Debug, Clone, PartialEq, Default)]
385 struct MergeInt(i32);
386
387 impl Merge for MergeInt {
388 fn merge(&self, other: &Self) -> Self {
389 MergeInt(self.0 + other.0)
390 }
391 }
392
393 #[test]
394 #[serial]
395 fn test_synchi_channel() {
396 let channel = SynchiChannel::<MergeInt>::new("test");
397 assert_eq!(channel.index, 0);
398 }
399
400 #[test]
401 #[serial]
402 fn test_several_values_for_synchi_channel() {
403 let channel = SynchiChannel::<MergeInt>::new("test");
404 assert_eq!(channel.index, 0);
405
406 let channel = SynchiChannel::<MergeInt>::new("test");
407 assert_eq!(channel.index, 1);
408 }
409
410 #[test]
411 #[serial]
412 fn test_pull_value_from_synchi_channel() {
413 let channel1 = SynchiChannel::<MergeInt>::new("test");
414 assert_eq!(channel1.index, 0);
415
416 let channel2 = SynchiChannel::<MergeInt>::new("test");
417 assert_eq!(channel2.index, 1);
418
419 let channel3 = SynchiChannel::<MergeInt>::new("test");
420 assert_eq!(channel3.index, 2);
421
422 assert_eq!(channel1.pull(), MergeInt(0));
423 assert_eq!(channel2.pull(), MergeInt(0));
424 assert_eq!(channel3.pull(), MergeInt(0));
425 }
426
427 #[test]
428 #[serial]
429 fn test_push_value_to_synchi_channel() {
430 let channel = SynchiChannel::<MergeInt>::new("test");
431 assert_eq!(channel.index, 0);
432
433 channel.push(MergeInt(42));
434 assert_eq!(channel.pull(), MergeInt(42));
435 }
436
437 #[test]
438 fn test_push_several_values_to_synchi_channel() {
439 let channel = SynchiChannel::<MergeInt>::new("test");
440 assert_eq!(channel.index, 0);
441
442 channel.push(MergeInt(42));
443 assert_eq!(channel.pull(), MergeInt(42));
444
445 channel.push(MergeInt(43));
446 assert_eq!(channel.pull(), MergeInt(43));
447 }
448
449 #[test]
450 #[serial]
451 fn test_push_several_values_from_several_channels_to_synchi_channel() {
452 let channel1 = SynchiChannel::<MergeInt>::new("test");
453 assert_eq!(channel1.index, 0);
454
455 let channel2 = SynchiChannel::<MergeInt>::new("test");
456 assert_eq!(channel2.index, 1);
457
458 let channel3 = SynchiChannel::<MergeInt>::new("test");
459 assert_eq!(channel3.index, 2);
460
461 channel1.push(MergeInt(42));
462 assert_eq!(channel1.pull(), MergeInt(42));
463
464 channel2.push(MergeInt(43));
465 assert_eq!(channel2.pull(), MergeInt(43));
466
467 channel3.push(MergeInt(44));
468 assert_eq!(channel3.pull(), MergeInt(44));
469
470 assert_eq!(channel1.pull(), MergeInt(42));
472 assert_eq!(channel2.pull(), MergeInt(43));
473 assert_eq!(channel3.pull(), MergeInt(44));
474 }
475
476 #[test]
477 #[serial]
478 fn test_if_one_subscriber_is_notified() {
479 let channel = SynchiChannel::<MergeInt>::new("test");
480 assert_eq!(channel.index, 0);
481
482 let received_data = Rc::new(RefCell::new(Vec::new()));
483
484 let subscriber = {
485 let received_data = received_data.clone();
486
487 Callback::from(move |data: Box<dyn Any>| {
488 if let Some(data) = data.downcast_ref::<MergeInt>() {
489 received_data.borrow_mut().push(data.clone());
490 } else {
491 panic!("Failed to downcast SYNCHI channel data");
492 }
493 })
494 };
495
496 subscribe_to_channel::<MergeInt>("test", vec![0], subscriber.clone());
498
499 assert_eq!(received_data.borrow().len(), 1);
501 assert_eq!(received_data.borrow()[0], MergeInt(0));
502
503 channel.push(MergeInt(42));
504 assert_eq!(channel.pull(), MergeInt(42));
505 assert_eq!(received_data.borrow().len(), 2);
506 assert_eq!(received_data.borrow()[1], MergeInt(42));
507
508 channel.push(MergeInt(43));
509 assert_eq!(channel.pull(), MergeInt(43));
510 assert_eq!(received_data.borrow().len(), 3);
511 assert_eq!(received_data.borrow()[2], MergeInt(43));
512
513 unsubscribe_from_channel::<MergeInt>("test", subscriber);
514
515 channel.push(MergeInt(44));
517 assert_eq!(channel.pull(), MergeInt(44));
518 assert_eq!(received_data.borrow().len(), 3);
519 }
520
521 #[test]
522 #[serial]
523 fn test_several_subscribers_for_one_channel() {
524 let channel = SynchiChannel::<MergeInt>::new("test");
525 assert_eq!(channel.index, 0);
526
527 let received_data_1 = Rc::new(RefCell::new(Vec::new()));
528
529 let subscriber_1 = {
530 let received_data = received_data_1.clone();
531
532 Callback::from(move |data: Box<dyn Any>| {
533 if let Some(data) = data.downcast_ref::<MergeInt>() {
534 received_data.borrow_mut().push(data.clone());
535 } else {
536 panic!("Failed to downcast SYNCHI channel data");
537 }
538 })
539 };
540
541 let received_data_2 = Rc::new(RefCell::new(Vec::new()));
542
543 let subscriber_2 = {
544 let received_data = received_data_2.clone();
545
546 Callback::from(move |data: Box<dyn Any>| {
547 if let Some(data) = data.downcast_ref::<MergeInt>() {
548 received_data.borrow_mut().push(data.clone());
549 } else {
550 panic!("Failed to downcast SYNCHI channel data");
551 }
552 })
553 };
554
555 subscribe_to_channel::<MergeInt>("test", vec![0], subscriber_1.clone());
557 subscribe_to_channel::<MergeInt>("test", vec![1], subscriber_2.clone());
558
559 assert_eq!(received_data_1.borrow().len(), 1);
561 assert_eq!(received_data_1.borrow()[0], MergeInt(0));
562 assert_eq!(received_data_2.borrow().len(), 1);
563 assert_eq!(received_data_2.borrow()[0], MergeInt(0));
564
565 channel.push(MergeInt(42));
567 assert_eq!(channel.pull(), MergeInt(42));
568 assert_eq!(received_data_1.borrow().len(), 2);
569 assert_eq!(received_data_1.borrow()[1], MergeInt(42));
570 assert_eq!(received_data_2.borrow().len(), 1);
571
572 unsubscribe_from_channel::<MergeInt>("test", subscriber_1);
574
575 channel.push(MergeInt(44));
577 assert_eq!(channel.pull(), MergeInt(44));
578 assert_eq!(received_data_1.borrow().len(), 2);
579
580 assert_eq!(received_data_2.borrow().len(), 1);
582
583 let received_data_3 = Rc::new(RefCell::new(Vec::new()));
585
586 let subscriber_3 = {
587 let received_data = received_data_3.clone();
588
589 Callback::from(move |data: Box<dyn Any>| {
590 if let Some(data) = data.downcast_ref::<MergeInt>() {
591 received_data.borrow_mut().push(data.clone());
592 } else {
593 panic!("Failed to downcast SYNCHI channel data");
594 }
595 })
596 };
597
598 subscribe_to_channel::<MergeInt>("test", vec![0], subscriber_3.clone());
599
600 assert_eq!(received_data_3.borrow().len(), 1);
601 assert_eq!(received_data_3.borrow()[0], MergeInt(44));
602
603 assert_eq!(received_data_2.borrow().len(), 1);
605 assert_eq!(received_data_2.borrow()[0], MergeInt(0));
606 }
607
608 #[test]
609 #[serial]
610 fn test_one_subscriber_for_multiple_subchannels() {
611 let channel_1 = SynchiChannel::<MergeInt>::new("test");
612 let channel_2 = SynchiChannel::<MergeInt>::new("test");
613
614 let received_data = Rc::new(RefCell::new(Vec::new()));
615
616 let subscriber = {
617 let received_data = received_data.clone();
618
619 Callback::from(move |data: Box<dyn Any>| {
620 if let Some(data) = data.downcast_ref::<MergeInt>() {
621 received_data.borrow_mut().push(data.clone());
622 } else {
623 panic!("Failed to downcast SYNCHI channel data");
624 }
625 })
626 };
627
628 subscribe_to_channel::<MergeInt>("test", vec![0, 1], subscriber.clone());
629
630 assert_eq!(received_data.borrow().len(), 1);
633 assert_eq!(received_data.borrow()[0], MergeInt(0));
634
635 channel_1.push(MergeInt(42));
636
637 assert_eq!(channel_1.pull(), MergeInt(42));
638 assert_eq!(received_data.borrow().len(), 2);
639 assert_eq!(received_data.borrow()[1], MergeInt(42));
640
641 channel_2.push(MergeInt(43));
642
643 assert_eq!(channel_2.pull(), MergeInt(43));
644 assert_eq!(received_data.borrow().len(), 3);
645 assert_eq!(received_data.borrow()[2], MergeInt(85));
646 }
647
648 #[test]
649 #[serial]
650 fn test_several_subscribers_for_several_channels() {
651 let channel_1 = SynchiChannel::<MergeInt>::new("test");
652 channel_1.push(MergeInt(42));
653 let received_data_1 = Rc::new(RefCell::new(Vec::new()));
654
655 let subscriber_1 = {
656 let received_data = received_data_1.clone();
657
658 Callback::from(move |data: Box<dyn Any>| {
659 if let Some(data) = data.downcast_ref::<MergeInt>() {
660 received_data.borrow_mut().push(data.clone());
661 } else {
662 panic!("Failed to downcast SYNCHI channel data");
663 }
664 })
665 };
666
667 subscribe_to_channel::<MergeInt>("test", vec![0], subscriber_1.clone());
668 assert!(received_data_1.borrow().len() == 1);
669 assert_eq!(received_data_1.borrow()[0], MergeInt(42));
670
671 let channel_2 = SynchiChannel::<MergeInt>::new("test");
672 channel_2.push(MergeInt(43));
673 let received_data_2 = Rc::new(RefCell::new(Vec::new()));
674
675 let subscriber_2 = {
676 let received_data = received_data_2.clone();
677
678 Callback::from(move |data: Box<dyn Any>| {
679 if let Some(data) = data.downcast_ref::<MergeInt>() {
680 received_data.borrow_mut().push(data.clone());
681 } else {
682 panic!("Failed to downcast SYNCHI channel data");
683 }
684 })
685 };
686
687 subscribe_to_channel::<MergeInt>("test", vec![1], subscriber_2.clone());
688 assert_eq!(received_data_1.borrow().len(), 1);
690 assert_eq!(received_data_1.borrow()[0], MergeInt(42));
691
692 assert_eq!(received_data_2.borrow().len(), 1);
694 assert_eq!(received_data_2.borrow()[0], MergeInt(43));
695 }
696}