yewlish_synchi/
lib.rs

1use std::any::Any;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::rc::Rc;
6use yew::prelude::*;
7
8type SynchiStore = HashMap<&'static str, Rc<RefCell<HashMap<usize, DataStatus>>>>;
9type SynchiSubscriber = Callback<Box<dyn Any>>;
10
11enum DataStatus {
12    Free(Box<dyn Any>),
13    Claimed(SynchiSubscriber, Box<dyn Any>),
14}
15
16thread_local! {
17    static SYNCHI: Rc<RefCell<SynchiStore>> = Rc::new(RefCell::new(HashMap::new()));
18    static SYNCHI_COUNTERS: RefCell<HashMap<&'static str, usize>> = RefCell::new(HashMap::new());
19    static SYNCHI_SUBSCRIBERS: RefCell<HashMap<&'static str, Vec<SynchiSubscriber>>> =
20        RefCell::new(HashMap::new());
21}
22
23pub trait Merge {
24    fn merge(&self, other: &Self) -> Self;
25}
26
27fn register_channel<T>(name: &'static str, data: T) -> Option<usize>
28where
29    T: Any,
30{
31    SYNCHI.with(|store| {
32        let mut store = store.borrow_mut();
33
34        if let Some(existing_channel) = store.get(name) {
35            let mut channel = existing_channel.borrow_mut();
36
37            let index = SYNCHI_COUNTERS.with(|counters| {
38                let mut counters = counters.borrow_mut();
39
40                if let Some(counter) = counters.get_mut(name) {
41                    *counter += 1;
42                    *counter
43                } else {
44                    counters.insert(name, 0);
45                    0
46                }
47            });
48
49            channel.insert(index, DataStatus::Free(Box::new(data)));
50            return Some(index);
51        }
52
53        SYNCHI_COUNTERS.with(|counters| {
54            let mut counters = counters.borrow_mut();
55            counters.insert(name, 0);
56        });
57
58        let new_channel = Rc::new(RefCell::new(HashMap::<usize, DataStatus>::from_iter(vec![
59            (0, DataStatus::Free(Box::new(data))),
60        ])));
61
62        store.insert(name, new_channel);
63        Some(0)
64    })
65}
66
67fn unregister_channel(name: &'static str, index: usize) -> usize {
68    SYNCHI.with(|store| {
69        let store = store.borrow_mut();
70
71        if let Some(channel) = store.get(name) {
72            let mut channel = channel.borrow_mut();
73            channel.remove(&index);
74
75            channel.len()
76        } else {
77            panic!("Failed to get SYNCHI channel");
78        }
79    })
80}
81
82fn subscribe_to_channel<T>(name: &'static str, indexes: Vec<usize>, callback: SynchiSubscriber)
83where
84    T: Any + Clone + Debug + Default + Merge,
85{
86    let mut no_new_subscriber = false;
87
88    SYNCHI_SUBSCRIBERS.with(|subscribers| {
89        let mut subscribers = subscribers.borrow_mut();
90
91        if let Some(existing_subscribers) = subscribers.get_mut(name) {
92            for existing_subscriber in existing_subscribers.iter() {
93                if existing_subscriber == &callback {
94                    no_new_subscriber = true;
95                    return;
96                }
97            }
98
99            existing_subscribers.push(callback.clone());
100        } else {
101            subscribers.insert(name, vec![callback.clone()]);
102        }
103    });
104
105    if no_new_subscriber {
106        return;
107    }
108
109    SYNCHI.with(|store| {
110        let store = store.borrow_mut();
111
112        if let Some(channel) = store.get(name) {
113            let mut channel = channel.borrow_mut();
114            let mut merged_data = T::default();
115
116            for index in indexes {
117                if let Some(data_status) = channel.get_mut(&index) {
118                    match data_status {
119                        DataStatus::Free(data) => {
120                            if let Some(data) = data.downcast_ref::<T>() {
121                                merged_data = merged_data.merge(data);
122
123                                *data_status =
124                                    DataStatus::Claimed(callback.clone(), Box::new(data.clone()));
125                            }
126                        }
127                        DataStatus::Claimed(subscriber, data) => {
128                            if *subscriber == callback {
129                                if let Some(data) = data.downcast_ref::<T>() {
130                                    merged_data = merged_data.merge(data);
131                                }
132                            }
133                        }
134                    }
135                }
136            }
137
138            callback.emit(Box::new(merged_data));
139        }
140    });
141}
142
143fn unsubscribe_from_channel<T>(name: &'static str, callback: SynchiSubscriber)
144where
145    T: Any + Clone + Debug + Default + Merge,
146{
147    SYNCHI_SUBSCRIBERS.with(|subscribers| {
148        let mut subscribers = subscribers.borrow_mut();
149
150        if let Some(existing_subscribers) = subscribers.get_mut(name) {
151            existing_subscribers.retain(|subscriber| subscriber != &callback);
152        }
153    });
154
155    SYNCHI.with(|store| {
156        let store = store.borrow_mut();
157
158        if let Some(channel) = store.get(name) {
159            let mut channel = channel.borrow_mut();
160
161            for index in 0..channel.len() {
162                if let Some(data_status) = channel.get_mut(&index) {
163                    if let DataStatus::Claimed(subscriber, data) = data_status {
164                        if *subscriber == callback {
165                            if let Some(data) = data.downcast_ref::<T>() {
166                                *data_status = DataStatus::Free(Box::new(data.clone()));
167                            }
168                        }
169                    }
170                }
171            }
172        }
173    });
174}
175
176fn notify_subscriber<T>(name: &'static str, callback: SynchiSubscriber)
177where
178    T: Any + Clone + Debug + Default + Merge,
179{
180    SYNCHI.with(|store| {
181        let store = store.borrow_mut();
182
183        if let Some(channel) = store.get(name) {
184            let mut channel = channel.borrow_mut();
185            let mut merged_data = T::default();
186
187            for index in 0..channel.len() {
188                if let Some(DataStatus::Claimed(subscriber, data)) = channel.get_mut(&index) {
189                    if *subscriber == callback {
190                        if let Some(data) = data.downcast_ref::<T>() {
191                            merged_data = merged_data.merge(data);
192                        }
193                    }
194                }
195            }
196
197            callback.emit(Box::new(merged_data));
198        }
199    });
200}
201
202#[derive(Debug, Clone, PartialEq)]
203pub struct SynchiChannel<T>
204where
205    T: Any + Clone + Default,
206{
207    pub name: &'static str,
208    pub index: usize,
209    _marker: std::marker::PhantomData<T>,
210}
211
212impl<T> Drop for SynchiChannel<T>
213where
214    T: Any + Clone + Default,
215{
216    fn drop(&mut self) {
217        let channel_len = unregister_channel(self.name, self.index);
218
219        if channel_len == 0 {
220            SYNCHI.with(|store| {
221                let mut store = store.borrow_mut();
222
223                if let Some(channel) = store.remove(self.name) {
224                    drop(channel);
225                }
226            });
227
228            SYNCHI_COUNTERS.with(|counters| {
229                let mut counters = counters.borrow_mut();
230                counters.remove(self.name);
231            });
232
233            SYNCHI_SUBSCRIBERS.with(|subscribers| {
234                let mut subscribers = subscribers.borrow_mut();
235                subscribers.remove(self.name);
236            });
237        }
238    }
239}
240
241impl<T> SynchiChannel<T>
242where
243    T: Any + Clone + Debug + Merge + Default,
244{
245    pub fn new(name: &'static str) -> Self {
246        if let Some(index) = register_channel(name, T::default()) {
247            SynchiChannel {
248                name,
249                index,
250                _marker: std::marker::PhantomData,
251            }
252        } else {
253            panic!("Failed to register SYNCHI channel");
254        }
255    }
256
257    pub fn new_with_data(name: &'static str, data: T) -> Self {
258        if let Some(index) = register_channel(name, data) {
259            SynchiChannel {
260                name,
261                index,
262                _marker: std::marker::PhantomData,
263            }
264        } else {
265            panic!("Failed to register SYNCHI channel");
266        }
267    }
268
269    pub fn pull(&self) -> T {
270        SYNCHI.with(|store| {
271            let store = store.borrow();
272
273            if let Some(channel) = store.get(self.name) {
274                let channel = channel.borrow();
275
276                if let Some(data) = channel.get(&self.index) {
277                    match data {
278                        DataStatus::Free(data) => {
279                            let data = data.downcast_ref::<T>().unwrap();
280                            data.clone()
281                        }
282                        DataStatus::Claimed(_, data) => {
283                            let data = data.downcast_ref::<T>().unwrap();
284                            data.clone()
285                        }
286                    }
287                } else {
288                    panic!("Failed to get SYNCHI channel data");
289                }
290            } else {
291                panic!("Failed to get SYNCHI channel");
292            }
293        })
294    }
295
296    pub fn push(&self, data: T) {
297        let mut target_subscriber = None::<SynchiSubscriber>;
298
299        SYNCHI.with(|store| {
300            let store = store.borrow_mut();
301
302            if let Some(channel) = store.get(self.name) {
303                let mut channel = channel.borrow_mut();
304
305                if let Some(channel_data) = channel.get_mut(&self.index) {
306                    match channel_data {
307                        DataStatus::Free(_) => {
308                            *channel_data = DataStatus::Free(Box::new(data.clone()));
309                        }
310                        DataStatus::Claimed(subscriber, _) => {
311                            target_subscriber = subscriber.clone().into();
312
313                            *channel_data =
314                                DataStatus::Claimed(subscriber.clone(), Box::new(data.clone()));
315                        }
316                    }
317                } else {
318                    panic!("Failed to get SYNCHI channel data");
319                }
320            } else {
321                panic!("Failed to get SYNCHI channel");
322            }
323        });
324
325        if let Some(subscriber) = target_subscriber {
326            notify_subscriber::<T>(self.name, subscriber.clone());
327        }
328    }
329}
330
331#[hook]
332pub fn use_synchi_channel<T>(name: &'static str) -> Rc<RefCell<SynchiChannel<T>>>
333where
334    T: Any + Clone + Default + Merge + Debug,
335{
336    use_mut_ref(|| SynchiChannel::<T>::new(name))
337}
338
339#[hook]
340pub fn use_synchi_channel_with<T>(name: &'static str, data: T) -> Rc<RefCell<SynchiChannel<T>>>
341where
342    T: Any + Clone + Default + PartialEq + Merge + Debug,
343{
344    use_mut_ref(|| SynchiChannel::<T>::new_with_data(name, data.clone()))
345}
346
347#[hook]
348pub fn use_synchi_channel_subscribe<T>(name: &'static str, indexes: Vec<usize>) -> UseStateHandle<T>
349where
350    T: Any + Clone + Default + PartialEq + Debug + Merge,
351{
352    let data = use_state(|| T::default());
353
354    let subscriber = {
355        let data = data.clone();
356
357        use_callback((), move |next_data: Box<dyn Any>, _| {
358            if let Some(next_data) = next_data.downcast_ref::<T>() {
359                data.set(next_data.clone());
360            } else {
361                panic!("Failed to downcast SYNCHI channel \"{name}\" data: {next_data:?}");
362            }
363        })
364    };
365
366    subscribe_to_channel::<T>(name, indexes, subscriber.clone());
367
368    use_effect_with((), move |_| {
369        let subscriber = subscriber.clone();
370
371        move || {
372            unsubscribe_from_channel::<T>(name, subscriber);
373        }
374    });
375
376    data
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use serial_test::serial;
383
384    #[derive(Debug, Clone, PartialEq, Default)]
385    struct MergeInt(i32);
386
387    impl Merge for MergeInt {
388        fn merge(&self, other: &Self) -> Self {
389            MergeInt(self.0 + other.0)
390        }
391    }
392
393    #[test]
394    #[serial]
395    fn test_synchi_channel() {
396        let channel = SynchiChannel::<MergeInt>::new("test");
397        assert_eq!(channel.index, 0);
398    }
399
400    #[test]
401    #[serial]
402    fn test_several_values_for_synchi_channel() {
403        let channel = SynchiChannel::<MergeInt>::new("test");
404        assert_eq!(channel.index, 0);
405
406        let channel = SynchiChannel::<MergeInt>::new("test");
407        assert_eq!(channel.index, 1);
408    }
409
410    #[test]
411    #[serial]
412    fn test_pull_value_from_synchi_channel() {
413        let channel1 = SynchiChannel::<MergeInt>::new("test");
414        assert_eq!(channel1.index, 0);
415
416        let channel2 = SynchiChannel::<MergeInt>::new("test");
417        assert_eq!(channel2.index, 1);
418
419        let channel3 = SynchiChannel::<MergeInt>::new("test");
420        assert_eq!(channel3.index, 2);
421
422        assert_eq!(channel1.pull(), MergeInt(0));
423        assert_eq!(channel2.pull(), MergeInt(0));
424        assert_eq!(channel3.pull(), MergeInt(0));
425    }
426
427    #[test]
428    #[serial]
429    fn test_push_value_to_synchi_channel() {
430        let channel = SynchiChannel::<MergeInt>::new("test");
431        assert_eq!(channel.index, 0);
432
433        channel.push(MergeInt(42));
434        assert_eq!(channel.pull(), MergeInt(42));
435    }
436
437    #[test]
438    fn test_push_several_values_to_synchi_channel() {
439        let channel = SynchiChannel::<MergeInt>::new("test");
440        assert_eq!(channel.index, 0);
441
442        channel.push(MergeInt(42));
443        assert_eq!(channel.pull(), MergeInt(42));
444
445        channel.push(MergeInt(43));
446        assert_eq!(channel.pull(), MergeInt(43));
447    }
448
449    #[test]
450    #[serial]
451    fn test_push_several_values_from_several_channels_to_synchi_channel() {
452        let channel1 = SynchiChannel::<MergeInt>::new("test");
453        assert_eq!(channel1.index, 0);
454
455        let channel2 = SynchiChannel::<MergeInt>::new("test");
456        assert_eq!(channel2.index, 1);
457
458        let channel3 = SynchiChannel::<MergeInt>::new("test");
459        assert_eq!(channel3.index, 2);
460
461        channel1.push(MergeInt(42));
462        assert_eq!(channel1.pull(), MergeInt(42));
463
464        channel2.push(MergeInt(43));
465        assert_eq!(channel2.pull(), MergeInt(43));
466
467        channel3.push(MergeInt(44));
468        assert_eq!(channel3.pull(), MergeInt(44));
469
470        // Check that the values are still there
471        assert_eq!(channel1.pull(), MergeInt(42));
472        assert_eq!(channel2.pull(), MergeInt(43));
473        assert_eq!(channel3.pull(), MergeInt(44));
474    }
475
476    #[test]
477    #[serial]
478    fn test_if_one_subscriber_is_notified() {
479        let channel = SynchiChannel::<MergeInt>::new("test");
480        assert_eq!(channel.index, 0);
481
482        let received_data = Rc::new(RefCell::new(Vec::new()));
483
484        let subscriber = {
485            let received_data = received_data.clone();
486
487            Callback::from(move |data: Box<dyn Any>| {
488                if let Some(data) = data.downcast_ref::<MergeInt>() {
489                    received_data.borrow_mut().push(data.clone());
490                } else {
491                    panic!("Failed to downcast SYNCHI channel data");
492                }
493            })
494        };
495
496        // Subscribed before any data is pushed
497        subscribe_to_channel::<MergeInt>("test", vec![0], subscriber.clone());
498
499        // The subscriber should receive the initial value
500        assert_eq!(received_data.borrow().len(), 1);
501        assert_eq!(received_data.borrow()[0], MergeInt(0));
502
503        channel.push(MergeInt(42));
504        assert_eq!(channel.pull(), MergeInt(42));
505        assert_eq!(received_data.borrow().len(), 2);
506        assert_eq!(received_data.borrow()[1], MergeInt(42));
507
508        channel.push(MergeInt(43));
509        assert_eq!(channel.pull(), MergeInt(43));
510        assert_eq!(received_data.borrow().len(), 3);
511        assert_eq!(received_data.borrow()[2], MergeInt(43));
512
513        unsubscribe_from_channel::<MergeInt>("test", subscriber);
514
515        // No new data should be received
516        channel.push(MergeInt(44));
517        assert_eq!(channel.pull(), MergeInt(44));
518        assert_eq!(received_data.borrow().len(), 3);
519    }
520
521    #[test]
522    #[serial]
523    fn test_several_subscribers_for_one_channel() {
524        let channel = SynchiChannel::<MergeInt>::new("test");
525        assert_eq!(channel.index, 0);
526
527        let received_data_1 = Rc::new(RefCell::new(Vec::new()));
528
529        let subscriber_1 = {
530            let received_data = received_data_1.clone();
531
532            Callback::from(move |data: Box<dyn Any>| {
533                if let Some(data) = data.downcast_ref::<MergeInt>() {
534                    received_data.borrow_mut().push(data.clone());
535                } else {
536                    panic!("Failed to downcast SYNCHI channel data");
537                }
538            })
539        };
540
541        let received_data_2 = Rc::new(RefCell::new(Vec::new()));
542
543        let subscriber_2 = {
544            let received_data = received_data_2.clone();
545
546            Callback::from(move |data: Box<dyn Any>| {
547                if let Some(data) = data.downcast_ref::<MergeInt>() {
548                    received_data.borrow_mut().push(data.clone());
549                } else {
550                    panic!("Failed to downcast SYNCHI channel data");
551                }
552            })
553        };
554
555        // Subscribe each after another
556        subscribe_to_channel::<MergeInt>("test", vec![0], subscriber_1.clone());
557        subscribe_to_channel::<MergeInt>("test", vec![1], subscriber_2.clone());
558
559        // Both subscribers should receive the initial value
560        assert_eq!(received_data_1.borrow().len(), 1);
561        assert_eq!(received_data_1.borrow()[0], MergeInt(0));
562        assert_eq!(received_data_2.borrow().len(), 1);
563        assert_eq!(received_data_2.borrow()[0], MergeInt(0));
564
565        // And the first subscriber should receive the new data
566        channel.push(MergeInt(42));
567        assert_eq!(channel.pull(), MergeInt(42));
568        assert_eq!(received_data_1.borrow().len(), 2);
569        assert_eq!(received_data_1.borrow()[1], MergeInt(42));
570        assert_eq!(received_data_2.borrow().len(), 1);
571
572        // Free the first subscriber
573        unsubscribe_from_channel::<MergeInt>("test", subscriber_1);
574
575        // No new data should be received
576        channel.push(MergeInt(44));
577        assert_eq!(channel.pull(), MergeInt(44));
578        assert_eq!(received_data_1.borrow().len(), 2);
579
580        // The second subscriber do not receive the new data bc it wasn't claimed by it
581        assert_eq!(received_data_2.borrow().len(), 1);
582
583        // But the new subscriber should receive the new data
584        let received_data_3 = Rc::new(RefCell::new(Vec::new()));
585
586        let subscriber_3 = {
587            let received_data = received_data_3.clone();
588
589            Callback::from(move |data: Box<dyn Any>| {
590                if let Some(data) = data.downcast_ref::<MergeInt>() {
591                    received_data.borrow_mut().push(data.clone());
592                } else {
593                    panic!("Failed to downcast SYNCHI channel data");
594                }
595            })
596        };
597
598        subscribe_to_channel::<MergeInt>("test", vec![0], subscriber_3.clone());
599
600        assert_eq!(received_data_3.borrow().len(), 1);
601        assert_eq!(received_data_3.borrow()[0], MergeInt(44));
602
603        // And the second subscriber should still have the initial value
604        assert_eq!(received_data_2.borrow().len(), 1);
605        assert_eq!(received_data_2.borrow()[0], MergeInt(0));
606    }
607
608    #[test]
609    #[serial]
610    fn test_one_subscriber_for_multiple_subchannels() {
611        let channel_1 = SynchiChannel::<MergeInt>::new("test");
612        let channel_2 = SynchiChannel::<MergeInt>::new("test");
613
614        let received_data = Rc::new(RefCell::new(Vec::new()));
615
616        let subscriber = {
617            let received_data = received_data.clone();
618
619            Callback::from(move |data: Box<dyn Any>| {
620                if let Some(data) = data.downcast_ref::<MergeInt>() {
621                    received_data.borrow_mut().push(data.clone());
622                } else {
623                    panic!("Failed to downcast SYNCHI channel data");
624                }
625            })
626        };
627
628        subscribe_to_channel::<MergeInt>("test", vec![0, 1], subscriber.clone());
629
630        // Since the data will merged before being sent to the subscriber, the subscriber should
631        // receive the initial value
632        assert_eq!(received_data.borrow().len(), 1);
633        assert_eq!(received_data.borrow()[0], MergeInt(0));
634
635        channel_1.push(MergeInt(42));
636
637        assert_eq!(channel_1.pull(), MergeInt(42));
638        assert_eq!(received_data.borrow().len(), 2);
639        assert_eq!(received_data.borrow()[1], MergeInt(42));
640
641        channel_2.push(MergeInt(43));
642
643        assert_eq!(channel_2.pull(), MergeInt(43));
644        assert_eq!(received_data.borrow().len(), 3);
645        assert_eq!(received_data.borrow()[2], MergeInt(85));
646    }
647
648    #[test]
649    #[serial]
650    fn test_several_subscribers_for_several_channels() {
651        let channel_1 = SynchiChannel::<MergeInt>::new("test");
652        channel_1.push(MergeInt(42));
653        let received_data_1 = Rc::new(RefCell::new(Vec::new()));
654
655        let subscriber_1 = {
656            let received_data = received_data_1.clone();
657
658            Callback::from(move |data: Box<dyn Any>| {
659                if let Some(data) = data.downcast_ref::<MergeInt>() {
660                    received_data.borrow_mut().push(data.clone());
661                } else {
662                    panic!("Failed to downcast SYNCHI channel data");
663                }
664            })
665        };
666
667        subscribe_to_channel::<MergeInt>("test", vec![0], subscriber_1.clone());
668        assert!(received_data_1.borrow().len() == 1);
669        assert_eq!(received_data_1.borrow()[0], MergeInt(42));
670
671        let channel_2 = SynchiChannel::<MergeInt>::new("test");
672        channel_2.push(MergeInt(43));
673        let received_data_2 = Rc::new(RefCell::new(Vec::new()));
674
675        let subscriber_2 = {
676            let received_data = received_data_2.clone();
677
678            Callback::from(move |data: Box<dyn Any>| {
679                if let Some(data) = data.downcast_ref::<MergeInt>() {
680                    received_data.borrow_mut().push(data.clone());
681                } else {
682                    panic!("Failed to downcast SYNCHI channel data");
683                }
684            })
685        };
686
687        subscribe_to_channel::<MergeInt>("test", vec![1], subscriber_2.clone());
688        // The first subscriber still has a prev value from the first channel
689        assert_eq!(received_data_1.borrow().len(), 1);
690        assert_eq!(received_data_1.borrow()[0], MergeInt(42));
691
692        // The second subscriber should receive it's own value from the second channel
693        assert_eq!(received_data_2.borrow().len(), 1);
694        assert_eq!(received_data_2.borrow()[0], MergeInt(43));
695    }
696}