subscribe/
subscribe.rs

1use std::{thread, time::Duration};
2
3use env_logger::{Builder, Env};
4use log::{debug, info, warn};
5use rmp_serde::from_slice;
6use serde::Deserialize;
7use tether_agent::{PlugOptionsBuilder, TetherAgentOptionsBuilder};
8
9#[derive(Deserialize, Debug)]
10struct CustomMessage {
11    id: usize,
12    name: String,
13}
14// Test this by sending a message like
15// tether send --topic specific/any/two --message '{"id":1,"name":"boo"}'
16
17fn main() {
18    println!("Rust Tether Agent subscribe example");
19
20    let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
21    builder.filter_module("tether_agent", log::LevelFilter::Warn);
22    builder.filter_module("rumqttc", log::LevelFilter::Warn);
23    builder.init();
24
25    debug!("Debugging is enabled; could be verbose");
26
27    let mut tether_agent = TetherAgentOptionsBuilder::new("RustDemo")
28        .id(Some("example"))
29        .build()
30        .expect("failed to init Tether agent");
31
32    let input_one = PlugOptionsBuilder::create_input("one")
33        .build(&mut tether_agent)
34        .expect("failed to create input");
35    info!("input one {} = {}", input_one.name(), input_one.topic());
36    let input_two = PlugOptionsBuilder::create_input("two")
37        .role(Some("specific"))
38        .build(&mut tether_agent)
39        .expect("failed to create input");
40    info!("input two {} = {}", input_two.name(), input_two.topic());
41    let input_empty = PlugOptionsBuilder::create_input("nothing")
42        .build(&mut tether_agent)
43        .expect("failed to create input");
44
45    let input_everything = PlugOptionsBuilder::create_input("everything")
46        .topic(Some("#"))
47        .build(&mut tether_agent)
48        .expect("failed to create input");
49
50    let input_specify_id = PlugOptionsBuilder::create_input("groupMessages")
51        .id(Some("someGroup"))
52        .name(None)
53        .build(&mut tether_agent)
54        .expect("failed to create input");
55
56    debug!(
57        "input everything {} = {}",
58        input_everything.name(),
59        input_everything.topic()
60    );
61
62    info!("Checking messages every 1s, 10x...");
63
64    loop {
65        debug!("Checking for messages...");
66        while let Some((topic, payload)) = tether_agent.check_messages() {
67            // debug!(
68            //     "........ Received a message topic {:?} => topic parts {:?}",
69            //     topic, topic
70            // );
71
72            if input_one.matches(&topic) {
73                info!(
74                            "******** INPUT ONE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
75                            input_one.name(),
76                            topic,
77                            payload.len()
78                        );
79                // assert_eq!(parse_plug_name(topic.un), Some("one"));
80            }
81            if input_two.matches(&topic) {
82                info!(
83                        "******** INPUT TWO:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
84                        input_two.name(),
85                        topic,
86                        payload.len()
87                    );
88                // assert_eq!(parse_plug_name(message.topic()), Some("two"));
89                // assert_ne!(parse_plug_name(message.topic()), Some("one"));
90
91                // Notice how you must give the from_slice function a type so it knows what to expect
92                let decoded = from_slice::<CustomMessage>(&payload);
93                match decoded {
94                    Ok(d) => {
95                        info!("Yes, we decoded the MessagePack payload as: {:?}", d);
96                        let CustomMessage { name, id } = d;
97                        debug!("Name is {} and ID is {}", name, id);
98                    }
99                    Err(e) => {
100                        warn!("Failed to decode the payload: {}", e)
101                    }
102                };
103            }
104            if input_empty.matches(&topic) {
105                info!(
106                        "******** EMPTY MESSAGE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
107                        input_empty.name(),
108                        topic,
109                       payload.len()
110                    );
111                // assert_eq!(parse_plug_name(topic), Some("nothing"));
112            }
113            if input_everything.matches(&topic) {
114                info!(
115                    "******** EVERYTHING MATCHES HERE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
116                    input_everything.name(),
117                    topic,
118                   payload.len()
119                );
120            }
121            if input_specify_id.matches(&topic) {
122                info!("******** ID MATCH:\n Should match any role and plug name, but only messages with ID \"groupMessages\"");
123                info!(
124                    "\n Received a message from plug named \"{}\" on topic {:?} with length {} bytes",
125                    input_specify_id.name(),
126                    topic,
127                    payload.len()
128                );
129                // assert_eq!(parse_agent_id(message.topic()), Some("groupMessages"));
130            }
131        }
132
133        thread::sleep(Duration::from_millis(1000))
134    }
135}