broadcaster/
broadcaster.rs1use dispatchers::{Broadcaster, MessageType, Observer};
2use tokio;
3
4#[derive(Default, Debug, Clone)]
5struct Message {
6 pub value: i32,
7 pub message_type: String,
8}
9
10impl MessageType for Message {
11 fn message_type(&self) -> &str {
12 &self.message_type
13 }
14}
15
16impl Message {
17 pub fn update(value: i32) -> Self {
18 Self {
19 value,
20 message_type: "update".to_owned(),
21 }
22 }
23 pub fn exit() -> Self {
24 Self {
25 value: 0,
26 message_type: "exit".to_owned(),
27 }
28 }
29}
30
31struct Handler<F>
32where
33 F: Fn(&Message) + Send,
34{
35 fun: F,
36}
37
38impl<F> Handler<F>
39where
40 F: Fn(&Message) + Send,
41{
42 pub fn new(fun: F) -> Box<Self> {
43 Box::new(Self { fun })
44 }
45}
46
47impl<F> Observer<Message> for Handler<F>
48where
49 F: Fn(&Message) + Send,
50{
51 fn call(&self, message: &Message) {
52 (self.fun)(message)
53 }
54}
55
56#[tokio::main]
57async fn main() {
58 let mut input_dispatcher = Broadcaster::<Message>::default();
59 let output_dispatcher = Broadcaster::<Message>::default();
60 input_dispatcher.register_handler(
61 "update",
62 Handler::new(|message| {
63 println!("input update: {}", message.value);
64 }),
65 "tag1",
66 );
67 let input_shared = input_dispatcher.clone();
68 let output_shared = output_dispatcher.clone();
69 tokio::spawn(async move {
70 let mut receiver = output_shared.receiver();
72 loop {
73 match receiver.recv().await {
74 Ok(message) => {
75 if message.message_type == "exit" {
76 input_shared.send(Message::exit()).unwrap();
77 break;
78 } else {
79 println!("output update: {}", message.value);
80 input_shared.send(message).unwrap();
81 }
82 }
83 Err(_err) => {
84 break;
85 }
86 };
87 }
88 });
89
90 let mut input_receiver = input_dispatcher.receiver();
91 let mut counter = 0;
92 loop {
93 tokio::select! {
94 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
95 counter += 1;
96 if counter < 6 {
97 output_dispatcher.dispatch(Message::update(counter)).unwrap();
98 } else if counter == 6 {
99 output_dispatcher.dispatch(Message::exit()).unwrap();
100 }
101 }
102
103 Ok(message) = input_receiver.recv() => {
104 input_dispatcher.dispatch_local(&message).unwrap();
105 if message.message_type == "exit" {
106 println!("exit");
107 break;
108 }
109 }
110 }
111 }
112}