1use std::{thread, time::Duration};
2
3use env_logger::{Builder, Env};
4use log::{debug, info, warn};
5use rmp_serde::from_slice;
6use serde::Deserialize;
7use tether_agent::{PlugOptionsBuilder, TetherAgentOptionsBuilder};
8
9#[derive(Deserialize, Debug)]
10struct CustomMessage {
11 id: usize,
12 name: String,
13}
14fn main() {
18 println!("Rust Tether Agent subscribe example");
19
20 let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
21 builder.filter_module("tether_agent", log::LevelFilter::Warn);
22 builder.filter_module("rumqttc", log::LevelFilter::Warn);
23 builder.init();
24
25 debug!("Debugging is enabled; could be verbose");
26
27 let mut tether_agent = TetherAgentOptionsBuilder::new("RustDemo")
28 .id(Some("example"))
29 .build()
30 .expect("failed to init Tether agent");
31
32 let input_one = PlugOptionsBuilder::create_input("one")
33 .build(&mut tether_agent)
34 .expect("failed to create input");
35 info!("input one {} = {}", input_one.name(), input_one.topic());
36 let input_two = PlugOptionsBuilder::create_input("two")
37 .role(Some("specific"))
38 .build(&mut tether_agent)
39 .expect("failed to create input");
40 info!("input two {} = {}", input_two.name(), input_two.topic());
41 let input_empty = PlugOptionsBuilder::create_input("nothing")
42 .build(&mut tether_agent)
43 .expect("failed to create input");
44
45 let input_everything = PlugOptionsBuilder::create_input("everything")
46 .topic(Some("#"))
47 .build(&mut tether_agent)
48 .expect("failed to create input");
49
50 let input_specify_id = PlugOptionsBuilder::create_input("groupMessages")
51 .id(Some("someGroup"))
52 .name(None)
53 .build(&mut tether_agent)
54 .expect("failed to create input");
55
56 debug!(
57 "input everything {} = {}",
58 input_everything.name(),
59 input_everything.topic()
60 );
61
62 info!("Checking messages every 1s, 10x...");
63
64 loop {
65 debug!("Checking for messages...");
66 while let Some((topic, payload)) = tether_agent.check_messages() {
67 if input_one.matches(&topic) {
73 info!(
74 "******** INPUT ONE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
75 input_one.name(),
76 topic,
77 payload.len()
78 );
79 }
81 if input_two.matches(&topic) {
82 info!(
83 "******** INPUT TWO:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
84 input_two.name(),
85 topic,
86 payload.len()
87 );
88 let decoded = from_slice::<CustomMessage>(&payload);
93 match decoded {
94 Ok(d) => {
95 info!("Yes, we decoded the MessagePack payload as: {:?}", d);
96 let CustomMessage { name, id } = d;
97 debug!("Name is {} and ID is {}", name, id);
98 }
99 Err(e) => {
100 warn!("Failed to decode the payload: {}", e)
101 }
102 };
103 }
104 if input_empty.matches(&topic) {
105 info!(
106 "******** EMPTY MESSAGE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
107 input_empty.name(),
108 topic,
109 payload.len()
110 );
111 }
113 if input_everything.matches(&topic) {
114 info!(
115 "******** EVERYTHING MATCHES HERE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
116 input_everything.name(),
117 topic,
118 payload.len()
119 );
120 }
121 if input_specify_id.matches(&topic) {
122 info!("******** ID MATCH:\n Should match any role and plug name, but only messages with ID \"groupMessages\"");
123 info!(
124 "\n Received a message from plug named \"{}\" on topic {:?} with length {} bytes",
125 input_specify_id.name(),
126 topic,
127 payload.len()
128 );
129 }
131 }
132
133 thread::sleep(Duration::from_millis(1000))
134 }
135}