1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{
4 tether_compliant_topic::TetherOrCustomTopic, ChannelOptionsBuilder, TetherAgent,
5};
6
// Command-line options that select what the receive utility subscribes to.
//
// `build_receiver` (below) turns these into a `ChannelOptionsBuilder`:
//   * if any of role / id / channel name is set, those parts are used and
//     `subscribe_topic` is not consulted;
//   * otherwise `subscribe_topic` is used as the subscription topic, falling
//     back to "#" when it is unset as well.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into help text, so adding them would change `--help` output.
#[derive(Args, Default)]
pub struct ReceiveOptions {
    // `--channel.role`: passed to the builder's `.role(..)` when any of the
    // `channel.*` options is set.
    #[arg(long = "channel.role")]
    pub subscribe_role: Option<String>,

    // `--channel.id`: passed to the builder's `.id(..)` when any of the
    // `channel.*` options is set.
    #[arg(long = "channel.id")]
    pub subscribe_id: Option<String>,

    // `--channel.name`: used as the receiver name. A missing value or a literal
    // "+" makes `build_receiver` name the receiver "any" and override the name
    // part with "+".
    #[arg(long = "channel.name")]
    pub subscribe_channel_name: Option<String>,

    // `--topic`: custom topic string. Only read by `build_receiver` when none
    // of the `channel.*` options above is set; defaults to "#" when unset.
    #[arg(long = "topic")]
    pub subscribe_topic: Option<String>,
}
27
28pub fn receive(
29 options: &ReceiveOptions,
30 tether_agent: &mut TetherAgent,
31 on_message: fn(channel_name: String, topic: String, decoded: Option<String>),
32) {
33 info!("Tether Receive Utility");
34
35 let channel_options = build_receiver(options);
36
37 let channel = channel_options
38 .build(tether_agent)
39 .expect("failed to create Channel Receiver");
40
41 info!("Subscribed to topic \"{}\" ...", channel.generated_topic());
42
43 loop {
44 let mut did_work = false;
45 while let Some((topic, payload)) = tether_agent.check_messages() {
46 did_work = true;
47 let full_topic_string = topic.full_topic_string();
48 debug!("Received message on topic \"{}\"", &full_topic_string);
49 let channel_name = match topic {
50 TetherOrCustomTopic::Custom(_) => String::from("unknown"),
51 TetherOrCustomTopic::Tether(tpt) => String::from(tpt.channel_name()),
52 };
53
54 if payload.is_empty() {
55 debug!("Empty message payload");
56 on_message(channel_name, full_topic_string, None);
57 } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
58 let json = serde_json::to_string(&value).expect("failed to stringify JSON");
59 debug!("Decoded MessagePack payload: {}", json);
60 on_message(channel_name, full_topic_string, Some(json));
61 } else {
62 debug!("Failed to decode MessagePack payload");
63 if let Ok(s) = String::from_utf8(payload.to_vec()) {
64 warn!("String representation of payload: \"{}\"", s);
65 } else {
66 error!("Could not decode payload bytes as string, either");
67 }
68 on_message(channel_name, full_topic_string, None);
69 }
70 }
71 if !did_work {
72 std::thread::sleep(std::time::Duration::from_micros(100)); }
74 }
75}
76
77fn build_receiver(options: &ReceiveOptions) -> ChannelOptionsBuilder {
78 if options.subscribe_id.is_some()
79 || options.subscribe_role.is_some()
80 || options.subscribe_channel_name.is_some()
81 {
82 debug!(
83 "TPT Overrides apply: {:?}, {:?}, {:?}",
84 &options.subscribe_id, &options.subscribe_role, &options.subscribe_channel_name
85 );
86 ChannelOptionsBuilder::create_receiver(match &options.subscribe_channel_name {
87 Some(provided_name) => {
88 if provided_name.as_str() == "+" {
89 "any"
90 } else {
91 provided_name
92 }
93 }
94 None => "any",
95 })
96 .role(options.subscribe_role.as_deref())
97 .id(options.subscribe_id.as_deref())
98 .name(match &options.subscribe_channel_name {
99 Some(provided_name_part) => {
100 if provided_name_part.as_str() == "+" {
101 Some("+")
102 } else {
103 None
104 }
105 }
106 None => {
107 if options.subscribe_id.is_some() || options.subscribe_role.is_some() {
108 Some("+")
112 } else {
113 None
117 }
118 }
119 })
120 } else {
121 debug!(
122 "Using custom override topic \"{:?}\"",
123 &options.subscribe_topic
124 );
125 ChannelOptionsBuilder::create_receiver("custom")
126 .topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
127 }
128}
129
#[cfg(test)]
mod tests {
    use tether_agent::TetherAgentOptionsBuilder;

    use super::{build_receiver, ReceiveOptions};

    /// Builds a receiver from `options` against an agent that does not
    /// auto-connect, then checks the receiver's name and generated topic.
    ///
    /// `#[track_caller]` makes a failing assertion point at the calling test.
    #[track_caller]
    fn assert_receiver(options: ReceiveOptions, expected_name: &str, expected_topic: &str) {
        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
            .auto_connect(false)
            .build()
            .unwrap();

        let receiver = build_receiver(&options)
            .build(&mut tether_agent)
            .expect("build failed");

        assert_eq!(receiver.name(), expected_name);
        assert_eq!(receiver.generated_topic(), expected_topic);
    }

    #[test]
    fn default_options() {
        assert_receiver(ReceiveOptions::default(), "custom", "#");
    }

    #[test]
    fn only_topic_custom() {
        assert_receiver(
            ReceiveOptions {
                subscribe_topic: Some("some/channel/special/fourpart".into()),
                ..ReceiveOptions::default()
            },
            "custom",
            "some/channel/special/fourpart",
        );
    }

    #[test]
    fn only_channel_name() {
        assert_receiver(
            ReceiveOptions {
                subscribe_channel_name: Some("something".into()),
                ..ReceiveOptions::default()
            },
            "something",
            "+/something/#",
        );
    }

    #[test]
    fn only_role() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("something".into()),
                ..ReceiveOptions::default()
            },
            "any",
            "something/+/#",
        );
    }

    #[test]
    fn only_id() {
        assert_receiver(
            ReceiveOptions {
                subscribe_id: Some("something".into()),
                ..ReceiveOptions::default()
            },
            "any",
            "+/+/something",
        );
    }

    #[test]
    fn role_and_id() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_id: Some("y".into()),
                ..ReceiveOptions::default()
            },
            "any",
            "x/+/y",
        );
    }

    #[test]
    fn role_and_channel_name() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_channel_name: Some("z".into()),
                ..ReceiveOptions::default()
            },
            "z",
            "x/z/#",
        );
    }

    #[test]
    fn spec_all_three() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_channel_name: Some("z".into()),
                subscribe_id: Some("y".into()),
                subscribe_topic: None,
            },
            "z",
            "x/z/y",
        );
    }

    /// Passing "+" as the channel name is the same as leaving it out.
    #[test]
    fn redundant_but_valid() {
        assert_receiver(
            ReceiveOptions {
                subscribe_channel_name: Some("+".into()),
                ..ReceiveOptions::default()
            },
            "any",
            "+/+/#",
        );
    }
}