1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{
4 ChannelDef, ChannelDefBuilder, ChannelReceiverDef, ChannelReceiverDefBuilder, TetherAgent,
5 tether_compliant_topic::TetherOrCustomTopic,
6};
7
8#[derive(Args, Default)]
9pub struct ReceiveOptions {
10 #[arg(long = "channel.role")]
12 pub subscribe_role: Option<String>,
13
14 #[arg(long = "channel.id")]
16 pub subscribe_id: Option<String>,
17
18 #[arg(long = "channel.name")]
20 pub subscribe_channel_name: Option<String>,
21
22 #[arg(long = "topic")]
26 pub subscribe_topic: Option<String>,
27}
28
29pub fn receive(
30 options: &ReceiveOptions,
31 tether_agent: &mut TetherAgent,
32 on_message: fn(channel_name: String, topic: String, decoded: Option<String>),
33) {
34 info!("Tether Receive Utility");
35
36 let channel_def = build_receiver(options, tether_agent);
37
38 if let Some(client) = tether_agent.client_mut() {
47 client
48 .subscribe(channel_def.generated_topic(), channel_def.qos())
49 .expect("failed to subscribe");
50
51 info!(
52 "Subscribed to topic \"{}\" ...",
53 channel_def.generated_topic()
54 );
55
56 loop {
57 let mut did_work = false;
58 while let Some((topic, payload)) = tether_agent.check_messages() {
59 did_work = true;
60 let full_topic_string = topic.full_topic_string();
61 debug!("Received message on topic \"{}\"", &full_topic_string);
62 let channel_name = match topic {
63 TetherOrCustomTopic::Custom(_) => String::from("unknown"),
64 TetherOrCustomTopic::Tether(tpt) => String::from(tpt.channel_name()),
65 };
66
67 if payload.is_empty() {
68 debug!("Empty message payload");
69 on_message(channel_name, full_topic_string, None);
70 } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
71 let json = serde_json::to_string(&value).expect("failed to stringify JSON");
72 debug!("Decoded MessagePack payload: {}", json);
73 on_message(channel_name, full_topic_string, Some(json));
74 } else {
75 debug!("Failed to decode MessagePack payload");
76 if let Ok(s) = String::from_utf8(payload.to_vec()) {
77 warn!("String representation of payload: \"{}\"", s);
78 } else {
79 error!("Could not decode payload bytes as string, either");
80 }
81 on_message(channel_name, full_topic_string, None);
82 }
83 }
84 if !did_work {
85 std::thread::sleep(std::time::Duration::from_micros(100)); }
87 }
88 } else {
89 error!("Failed to subscribe via Client!");
90 }
91}
92
93fn build_receiver(options: &ReceiveOptions, tether_agent: &TetherAgent) -> ChannelReceiverDef {
94 if options.subscribe_id.is_some()
95 || options.subscribe_role.is_some()
96 || options.subscribe_channel_name.is_some()
97 {
98 debug!(
99 "TPT Overrides apply: {:?}, {:?}, {:?}",
100 &options.subscribe_id, &options.subscribe_role, &options.subscribe_channel_name
101 );
102 ChannelReceiverDefBuilder::new(match &options.subscribe_channel_name {
103 Some(provided_name) => {
104 if provided_name.as_str() == "+" {
105 "any"
106 } else {
107 provided_name
108 }
109 }
110 None => "any",
111 })
112 .role(options.subscribe_role.as_deref())
113 .id(options.subscribe_id.as_deref())
114 .build(tether_agent)
115 } else {
116 debug!(
117 "Using custom override topic \"{:?}\"",
118 &options.subscribe_topic
119 );
120 ChannelReceiverDefBuilder::new("custom")
121 .override_topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
122 .build(tether_agent)
123 }
124}
125
#[cfg(test)]
mod tests {
    use tether_agent::{ChannelDef, TetherAgentBuilder};

    use super::{build_receiver, ReceiveOptions};

    /// Builds a receiver for `options` against an unconnected agent and checks the resulting
    /// channel name and subscription topic.
    ///
    /// `#[track_caller]` makes a failed assertion point at the calling test.
    #[track_caller]
    fn assert_receiver(options: ReceiveOptions, expected_name: &str, expected_topic: &str) {
        let tether_agent = TetherAgentBuilder::new("tester")
            .auto_connect(false)
            .build()
            .unwrap();

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), expected_name);
        assert_eq!(receiver_def.generated_topic(), expected_topic);
    }

    #[test]
    fn default_options() {
        assert_receiver(ReceiveOptions::default(), "custom", "#");
    }

    #[test]
    fn only_topic_custom() {
        assert_receiver(
            ReceiveOptions {
                subscribe_topic: Some("some/channel/special/fourpart".into()),
                ..Default::default()
            },
            "custom",
            "some/channel/special/fourpart",
        );
    }

    #[test]
    fn only_channel_name() {
        assert_receiver(
            ReceiveOptions {
                subscribe_channel_name: Some("something".into()),
                ..Default::default()
            },
            "something",
            "+/something/#",
        );
    }

    #[test]
    fn only_role() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("something".into()),
                ..Default::default()
            },
            "any",
            "something/+/#",
        );
    }

    #[test]
    fn only_id() {
        assert_receiver(
            ReceiveOptions {
                subscribe_id: Some("something".into()),
                ..Default::default()
            },
            "any",
            "+/+/something",
        );
    }

    #[test]
    fn role_and_id() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_id: Some("y".into()),
                ..Default::default()
            },
            "any",
            "x/+/y",
        );
    }

    #[test]
    fn role_and_channel_name() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_channel_name: Some("z".into()),
                ..Default::default()
            },
            "z",
            "x/z/#",
        );
    }

    #[test]
    fn spec_all_three() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_channel_name: Some("z".into()),
                subscribe_id: Some("y".into()),
                ..Default::default()
            },
            "z",
            "x/z/y",
        );
    }

    #[test]
    fn redundant_but_valid() {
        assert_receiver(
            ReceiveOptions {
                subscribe_channel_name: Some("+".into()),
                ..Default::default()
            },
            "any",
            "+/+/#",
        );
    }

    /// A custom topic is only honoured when no role, ID or channel name is supplied.
    #[test]
    fn topic_ignored_when_role_given() {
        assert_receiver(
            ReceiveOptions {
                subscribe_role: Some("x".into()),
                subscribe_topic: Some("some/custom/topic".into()),
                ..Default::default()
            },
            "any",
            "x/+/#",
        );
    }
}