1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{
4 ChannelDef, ChannelDefBuilder, ChannelReceiverDef, ChannelReceiverDefBuilder, TetherAgent,
5 tether_compliant_topic::TetherOrCustomTopic,
6};
7
8#[derive(Args, Default)]
9pub struct ReceiveOptions {
10 #[arg(long = "channel.role")]
12 pub subscribe_role: Option<String>,
13
14 #[arg(long = "channel.id")]
16 pub subscribe_id: Option<String>,
17
18 #[arg(long = "channel.name")]
20 pub subscribe_channel_name: Option<String>,
21
22 #[arg(long = "topic")]
26 pub subscribe_topic: Option<String>,
27}
28
29pub fn receive(
30 options: &ReceiveOptions,
31 tether_agent: &mut TetherAgent,
32 on_message: fn(channel_name: String, topic: String, decoded: Option<String>),
33) {
34 info!("Tether Receive Utility");
35
36 let channel_def = build_receiver(options, tether_agent);
37
38 if let Some(client) = tether_agent.client_mut() {
47 client
48 .subscribe(channel_def.generated_topic(), channel_def.qos())
49 .expect("failed to subscribe");
50
51 info!(
52 "Subscribed to topic \"{}\" ...",
53 channel_def.generated_topic()
54 );
55
56 loop {
57 let mut did_work = false;
58 while let Some((topic, payload)) = tether_agent.check_messages() {
59 did_work = true;
60 let full_topic_string = topic.full_topic_string();
61 debug!("Received message on topic \"{}\"", &full_topic_string);
62 let channel_name = match topic {
63 TetherOrCustomTopic::Custom(_) => String::from("unknown"),
64 TetherOrCustomTopic::Tether(tpt) => String::from(tpt.channel_name()),
65 };
66
67 if payload.is_empty() {
68 debug!("Empty message payload");
69 on_message(channel_name, full_topic_string, None);
70 } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
71 let json = serde_json::to_string(&value).expect("failed to stringify JSON");
72 debug!("Decoded MessagePack payload: {}", json);
73 on_message(channel_name, full_topic_string, Some(json));
74 } else {
75 debug!("Failed to decode MessagePack payload");
76 if let Ok(s) = String::from_utf8(payload.to_vec()) {
77 warn!("String representation of payload: \"{}\"", s);
78 } else {
79 error!("Could not decode payload bytes as string, either");
80 }
81 on_message(channel_name, full_topic_string, None);
82 }
83 }
84 if !did_work {
85 std::thread::sleep(std::time::Duration::from_micros(100)); }
87 }
88 } else {
89 error!("Failed to subscribe via Client!");
90 }
91}
92
93fn build_receiver(options: &ReceiveOptions, tether_agent: &TetherAgent) -> ChannelReceiverDef {
94 if options.subscribe_id.is_some()
95 || options.subscribe_role.is_some()
96 || options.subscribe_channel_name.is_some()
97 {
98 debug!(
99 "TPT Overrides apply: {:?}, {:?}, {:?}",
100 &options.subscribe_id, &options.subscribe_role, &options.subscribe_channel_name
101 );
102 ChannelReceiverDefBuilder::new(match &options.subscribe_channel_name {
103 Some(provided_name) => provided_name,
104 None => "+",
105 })
106 .role(options.subscribe_role.as_deref())
107 .id(options.subscribe_id.as_deref())
108 .build(tether_agent)
109 } else {
110 debug!(
111 "Using custom override topic \"{:?}\"",
112 &options.subscribe_topic
113 );
114 ChannelReceiverDefBuilder::new("custom")
115 .override_topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
116 .build(tether_agent)
117 }
118}
119
#[cfg(test)]
mod tests {
    use tether_agent::{ChannelDef, TetherAgent, TetherAgentBuilder};

    use super::{build_receiver, ReceiveOptions};

    /// Builds the agent shared by all tests, with auto-connect disabled
    /// (presumably so that the tests do not need a running broker).
    fn offline_agent() -> TetherAgent {
        TetherAgentBuilder::new("tester")
            .auto_connect(false)
            .build()
            .expect("building an agent with auto-connect disabled should succeed")
    }

    #[test]
    fn default_options() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions::default();

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "custom");
        assert_eq!(receiver_def.generated_topic(), "#");
    }

    #[test]
    fn only_topic_custom() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_topic: Some("some/channel/special/fourpart".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "custom");
        assert_eq!(
            receiver_def.generated_topic(),
            "some/channel/special/fourpart"
        );
    }

    #[test]
    fn only_channel_name() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_channel_name: Some("something".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "something");
        assert_eq!(receiver_def.generated_topic(), "+/something/#");
    }

    #[test]
    fn only_role() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_role: Some("something".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "+");
        assert_eq!(receiver_def.generated_topic(), "something/+/#");
    }

    #[test]
    fn only_id() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_id: Some("something".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "+");
        assert_eq!(receiver_def.generated_topic(), "+/+/something");
    }

    #[test]
    fn role_and_id() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_id: Some("y".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "+");
        assert_eq!(receiver_def.generated_topic(), "x/+/y");
    }

    #[test]
    fn role_and_channel_name() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_channel_name: Some("z".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "z");
        assert_eq!(receiver_def.generated_topic(), "x/z/#");
    }

    #[test]
    fn spec_all_three() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_channel_name: Some("z".into()),
            subscribe_id: Some("y".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "z");
        assert_eq!(receiver_def.generated_topic(), "x/z/y");
    }

    #[test]
    fn redundant_but_valid() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_channel_name: Some("+".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "+");
        assert_eq!(receiver_def.generated_topic(), "+/+/#");
    }

    // Pins the current precedence: an explicit topic only takes effect when no
    // role, id or channel name is given.
    #[test]
    fn topic_ignored_when_role_given() {
        let tether_agent = offline_agent();

        let options = ReceiveOptions {
            subscribe_role: Some("something".into()),
            subscribe_topic: Some("some/channel/special/fourpart".into()),
            ..Default::default()
        };

        let receiver_def = build_receiver(&options, &tether_agent);

        assert_eq!(receiver_def.name(), "+");
        assert_eq!(receiver_def.generated_topic(), "something/+/#");
    }
}