1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{three_part_topic::TetherOrCustomTopic, PlugOptionsBuilder, TetherAgent};
4
5#[derive(Args, Default)]
6pub struct ReceiveOptions {
7 #[arg(long = "plug.role")]
9 pub subscribe_role: Option<String>,
10
11 #[arg(long = "plug.id")]
13 pub subscribe_id: Option<String>,
14
15 #[arg(long = "plug.name")]
17 pub subscribe_plug_name: Option<String>,
18
19 #[arg(long = "topic")]
23 pub subscribe_topic: Option<String>,
24}
25
26pub fn receive(
27 options: &ReceiveOptions,
28 tether_agent: &mut TetherAgent,
29 on_message: fn(plug_name: String, topic: String, decoded: Option<String>),
30) {
31 info!("Tether Receive Utility");
32
33 let input_def = build_receiver_plug(options);
34
35 let input = input_def
36 .build(tether_agent)
37 .expect("failed to create input plug");
38
39 info!("Subscribed to topic \"{}\" ...", input.topic());
40
41 loop {
42 let mut did_work = false;
43 while let Some((topic, payload)) = tether_agent.check_messages() {
44 did_work = true;
45 let full_topic_string = topic.full_topic_string();
46 debug!("Received message on topic \"{}\"", &full_topic_string);
47 let plug_name = match topic {
48 TetherOrCustomTopic::Custom(_) => String::from("unknown"),
49 TetherOrCustomTopic::Tether(tpt) => String::from(tpt.plug_name()),
50 };
51
52 if payload.is_empty() {
53 debug!("Empty message payload");
54 on_message(plug_name, full_topic_string, None);
55 } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
56 let json = serde_json::to_string(&value).expect("failed to stringify JSON");
57 debug!("Decoded MessagePack payload: {}", json);
58 on_message(plug_name, full_topic_string, Some(json));
59 } else {
60 debug!("Failed to decode MessagePack payload");
61 if let Ok(s) = String::from_utf8(payload.to_vec()) {
62 warn!("String representation of payload: \"{}\"", s);
63 } else {
64 error!("Could not decode payload bytes as string, either");
65 }
66 on_message(plug_name, full_topic_string, None);
67 }
68 }
69 if !did_work {
70 std::thread::sleep(std::time::Duration::from_micros(100)); }
72 }
73}
74
75fn build_receiver_plug(options: &ReceiveOptions) -> PlugOptionsBuilder {
76 if options.subscribe_id.is_some()
77 || options.subscribe_role.is_some()
78 || options.subscribe_plug_name.is_some()
79 {
80 debug!(
81 "TPT Overrides apply: {:?}, {:?}, {:?}",
82 &options.subscribe_id, &options.subscribe_role, &options.subscribe_plug_name
83 );
84 PlugOptionsBuilder::create_input(match &options.subscribe_plug_name {
85 Some(provided_name) => {
86 if provided_name.as_str() == "+" {
87 "any"
88 } else {
89 provided_name
90 }
91 }
92 None => "any",
93 })
94 .role(options.subscribe_role.as_deref())
95 .id(options.subscribe_id.as_deref())
96 .name(match &options.subscribe_plug_name {
97 Some(provided_name_part) => {
98 if provided_name_part.as_str() == "+" {
99 Some("+")
100 } else {
101 None
102 }
103 }
104 None => {
105 if options.subscribe_id.is_some() || options.subscribe_role.is_some() {
106 Some("+")
110 } else {
111 None
115 }
116 }
117 })
118 } else {
119 debug!(
120 "Using custom override topic \"{:?}\"",
121 &options.subscribe_topic
122 );
123 PlugOptionsBuilder::create_input("custom")
124 .topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
125 }
126}
127
#[cfg(test)]
mod tests {
    use tether_agent::TetherAgentOptionsBuilder;

    use crate::tether_receive::build_receiver_plug;

    use super::ReceiveOptions;

    /// Builds the receiver plug for `options` against a freshly connected
    /// agent and checks the resulting plug name and subscription topic.
    ///
    /// These tests need a broker reachable with the agent's default settings.
    fn assert_plug(options: ReceiveOptions, expected_name: &str, expected_topic: &str) {
        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
            .build()
            .expect("sorry, these tests require working localhost Broker");

        let receive_plug = build_receiver_plug(&options)
            .build(&mut tether_agent)
            .expect("build failed");

        assert_eq!(receive_plug.name(), expected_name);
        assert_eq!(receive_plug.topic(), expected_topic);
    }

    #[test]
    fn default_options() {
        assert_plug(ReceiveOptions::default(), "custom", "#");
    }

    #[test]
    fn only_topic_custom() {
        assert_plug(
            ReceiveOptions {
                subscribe_topic: Some("some/special/plug".into()),
                ..Default::default()
            },
            "custom",
            "some/special/plug",
        );
    }

    #[test]
    fn only_plug_name() {
        assert_plug(
            ReceiveOptions {
                subscribe_plug_name: Some("something".into()),
                ..Default::default()
            },
            "something",
            "+/+/something",
        );
    }

    #[test]
    fn only_role() {
        assert_plug(
            ReceiveOptions {
                subscribe_role: Some("something".into()),
                ..Default::default()
            },
            "any",
            "something/+/+",
        );
    }

    #[test]
    fn only_id() {
        assert_plug(
            ReceiveOptions {
                subscribe_id: Some("something".into()),
                ..Default::default()
            },
            "any",
            "+/something/+",
        );
    }

    #[test]
    fn role_and_id() {
        assert_plug(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_id: Some("y".into()),
                ..Default::default()
            },
            "any",
            "x/y/+",
        );
    }

    #[test]
    fn role_and_plug_name() {
        assert_plug(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_plug_name: Some("z".into()),
                ..Default::default()
            },
            "z",
            "x/+/z",
        );
    }

    #[test]
    fn spec_all_three() {
        assert_plug(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_id: Some("y".into()),
                subscribe_plug_name: Some("z".into()),
                ..Default::default()
            },
            "z",
            "x/y/z",
        );
    }

    #[test]
    fn redundant_but_valid() {
        assert_plug(
            ReceiveOptions {
                subscribe_plug_name: Some("+".into()),
                ..Default::default()
            },
            "any",
            "+/+/+",
        );
    }
}