// broadcaster/broadcaster.rs

use dispatchers::{Broadcaster, MessageType, Observer};
use tokio;
use tokio::time::{interval_at, Duration, Instant};
3
/// Payload exchanged between the broadcasters in this example.
///
/// `message_type` is the string returned by this type's `MessageType` impl; the
/// constructors in this file only ever set it to `"update"` or `"exit"`.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
struct Message {
    /// Numeric payload; `Message::exit` sets it to `0`.
    pub value: i32,
    /// Kind tag of the message, e.g. `"update"` or `"exit"`.
    pub message_type: String,
}
9
10impl MessageType for Message {
11    fn message_type(&self) -> &str {
12        &self.message_type
13    }
14}
15
16impl Message {
17    pub fn update(value: i32) -> Self {
18        Self {
19            value,
20            message_type: "update".to_owned(),
21        }
22    }
23    pub fn exit() -> Self {
24        Self {
25            value: 0,
26            message_type: "exit".to_owned(),
27        }
28    }
29}
30
/// Wraps a callback so it can be stored and passed around as a single value.
///
/// The struct itself places no bound on `F`; each `impl` block states the
/// bounds it actually needs.
struct Handler<F> {
    /// The wrapped callback.
    fun: F,
}
37
38impl<F> Handler<F>
39where
40    F: Fn(&Message) + Send,
41{
42    pub fn new(fun: F) -> Box<Self> {
43        Box::new(Self { fun })
44    }
45}
46
47impl<F> Observer<Message> for Handler<F>
48where
49    F: Fn(&Message) + Send,
50{
51    fn call(&self, message: &Message) {
52        (self.fun)(message)
53    }
54}
55
56#[tokio::main]
57async fn main() {
58    let mut input_dispatcher = Broadcaster::<Message>::default();
59    let output_dispatcher = Broadcaster::<Message>::default();
60    input_dispatcher.register_handler(
61        "update",
62        Handler::new(|message| {
63            println!("input update: {}", message.value);
64        }),
65        "tag1",
66    );
67    let input_shared = input_dispatcher.clone();
68    let output_shared = output_dispatcher.clone();
69    tokio::spawn(async move {
70        // start loop for receiving messages from other threads
71        let mut receiver = output_shared.receiver();
72        loop {
73            match receiver.recv().await {
74                Ok(message) => {
75                    if message.message_type == "exit" {
76                        input_shared.send(Message::exit()).unwrap();
77                        break;
78                    } else {
79                        println!("output update: {}", message.value);
80                        input_shared.send(message).unwrap();
81                    }
82                }
83                Err(_err) => {
84                    break;
85                }
86            };
87        }
88    });
89
90    let mut input_receiver = input_dispatcher.receiver();
91    let mut counter = 0;
92    loop {
93        tokio::select! {
94            _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
95                counter += 1;
96                if counter < 6 {
97                    output_dispatcher.dispatch(Message::update(counter)).unwrap();
98                } else if counter == 6 {
99                    output_dispatcher.dispatch(Message::exit()).unwrap();
100                }
101            }
102
103            Ok(message) = input_receiver.recv() => {
104                input_dispatcher.dispatch_local(&message).unwrap();
105                if message.message_type == "exit" {
106                    println!("exit");
107                    break;
108                }
109            }
110        }
111    }
112}