tether_utils/
tether_receive.rs

1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{
4    tether_compliant_topic::TetherOrCustomTopic, ChannelOptionsBuilder, TetherAgent,
5};
6
7#[derive(Args, Default)]
8pub struct ReceiveOptions {
9    /// Specify a ROLE (instead of wildcard +)
10    #[arg(long = "channel.role")]
11    pub subscribe_role: Option<String>,
12
13    /// Specify an ID (instead of wildcard #)
14    #[arg(long = "channel.id")]
15    pub subscribe_id: Option<String>,
16
17    /// Specify a CHANNEL NAME part for the topic (instead of wildcard +)
18    #[arg(long = "channel.name")]
19    pub subscribe_channel_name: Option<String>,
20
21    /// Override topic to subscribe; setting this will
22    /// ignore any `channel.` options you may have set, since the
23    /// topic is built manually.
24    #[arg(long = "topic")]
25    pub subscribe_topic: Option<String>,
26}
27
28pub fn receive(
29    options: &ReceiveOptions,
30    tether_agent: &mut TetherAgent,
31    on_message: fn(channel_name: String, topic: String, decoded: Option<String>),
32) {
33    info!("Tether Receive Utility");
34
35    let channel_options = build_receiver(options);
36
37    let channel = channel_options
38        .build(tether_agent)
39        .expect("failed to create Channel Receiver");
40
41    info!("Subscribed to topic \"{}\" ...", channel.generated_topic());
42
43    loop {
44        let mut did_work = false;
45        while let Some((topic, payload)) = tether_agent.check_messages() {
46            did_work = true;
47            let full_topic_string = topic.full_topic_string();
48            debug!("Received message on topic \"{}\"", &full_topic_string);
49            let channel_name = match topic {
50                TetherOrCustomTopic::Custom(_) => String::from("unknown"),
51                TetherOrCustomTopic::Tether(tpt) => String::from(tpt.channel_name()),
52            };
53
54            if payload.is_empty() {
55                debug!("Empty message payload");
56                on_message(channel_name, full_topic_string, None);
57            } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
58                let json = serde_json::to_string(&value).expect("failed to stringify JSON");
59                debug!("Decoded MessagePack payload: {}", json);
60                on_message(channel_name, full_topic_string, Some(json));
61            } else {
62                debug!("Failed to decode MessagePack payload");
63                if let Ok(s) = String::from_utf8(payload.to_vec()) {
64                    warn!("String representation of payload: \"{}\"", s);
65                } else {
66                    error!("Could not decode payload bytes as string, either");
67                }
68                on_message(channel_name, full_topic_string, None);
69            }
70        }
71        if !did_work {
72            std::thread::sleep(std::time::Duration::from_micros(100)); //0.1 ms
73        }
74    }
75}
76
77fn build_receiver(options: &ReceiveOptions) -> ChannelOptionsBuilder {
78    if options.subscribe_id.is_some()
79        || options.subscribe_role.is_some()
80        || options.subscribe_channel_name.is_some()
81    {
82        debug!(
83            "TPT Overrides apply: {:?}, {:?}, {:?}",
84            &options.subscribe_id, &options.subscribe_role, &options.subscribe_channel_name
85        );
86        ChannelOptionsBuilder::create_receiver(match &options.subscribe_channel_name {
87            Some(provided_name) => {
88                if provided_name.as_str() == "+" {
89                    "any"
90                } else {
91                    provided_name
92                }
93            }
94            None => "any",
95        })
96        .role(options.subscribe_role.as_deref())
97        .id(options.subscribe_id.as_deref())
98        .name(match &options.subscribe_channel_name {
99            Some(provided_name_part) => {
100                if provided_name_part.as_str() == "+" {
101                    Some("+")
102                } else {
103                    None
104                }
105            }
106            None => {
107                if options.subscribe_id.is_some() || options.subscribe_role.is_some() {
108                    // No channel name part was supplied, but other parts were; therefore
109                    // in this case Tether Receive should subscribr to all messages
110                    // matching or both of the specified Agent and/or Role
111                    Some("+")
112                } else {
113                    // No channel name part was supplied, but neither was anything else
114                    // Logically, we shouldn't reach this point because of the outer condition
115                    // but it must be provided here for completeness
116                    None
117                }
118            }
119        })
120    } else {
121        debug!(
122            "Using custom override topic \"{:?}\"",
123            &options.subscribe_topic
124        );
125        ChannelOptionsBuilder::create_receiver("custom")
126            .topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use tether_agent::TetherAgentOptionsBuilder;
133
134    use crate::tether_receive::build_receiver;
135
136    use super::ReceiveOptions;
137
138    #[test]
139    fn default_options() {
140        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
141            .auto_connect(false)
142            .build()
143            .unwrap();
144
145        let options = ReceiveOptions::default();
146
147        let receiver = build_receiver(&options)
148            .build(&mut tether_agent)
149            .expect("build failed");
150
151        assert_eq!(receiver.name(), "custom");
152        assert_eq!(receiver.generated_topic(), "#");
153    }
154
155    #[test]
156    fn only_topic_custom() {
157        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
158            .auto_connect(false)
159            .build()
160            .unwrap();
161
162        let options = ReceiveOptions {
163            subscribe_role: None,
164            subscribe_id: None,
165            subscribe_channel_name: None,
166            subscribe_topic: Some("some/channel/special/fourpart".into()),
167        };
168
169        let receiver = build_receiver(&options)
170            .build(&mut tether_agent)
171            .expect("build failed");
172
173        assert_eq!(receiver.name(), "custom");
174        assert_eq!(receiver.generated_topic(), "some/channel/special/fourpart");
175    }
176
177    #[test]
178    fn only_chanel_name() {
179        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
180            .auto_connect(false)
181            .build()
182            .unwrap();
183
184        let options = ReceiveOptions {
185            subscribe_role: None,
186            subscribe_id: None,
187            subscribe_channel_name: Some("something".into()),
188            subscribe_topic: None,
189        };
190
191        let receiver = build_receiver(&options)
192            .build(&mut tether_agent)
193            .expect("build failed");
194
195        assert_eq!(receiver.name(), "something");
196        assert_eq!(receiver.generated_topic(), "+/something/#");
197    }
198
199    #[test]
200    fn only_role() {
201        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
202            .auto_connect(false)
203            .build()
204            .unwrap();
205
206        let options = ReceiveOptions {
207            subscribe_role: Some("something".into()),
208            subscribe_id: None,
209            subscribe_channel_name: None,
210            subscribe_topic: None,
211        };
212
213        let receiver = build_receiver(&options)
214            .build(&mut tether_agent)
215            .expect("build failed");
216
217        assert_eq!(receiver.name(), "any");
218        assert_eq!(receiver.generated_topic(), "something/+/#");
219    }
220
221    #[test]
222    fn only_id() {
223        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
224            .auto_connect(false)
225            .build()
226            .unwrap();
227
228        let options = ReceiveOptions {
229            subscribe_role: None,
230            subscribe_id: Some("something".into()),
231            subscribe_channel_name: None,
232            subscribe_topic: None,
233        };
234
235        let receiver = build_receiver(&options)
236            .build(&mut tether_agent)
237            .expect("build failed");
238
239        assert_eq!(receiver.name(), "any");
240        assert_eq!(receiver.generated_topic(), "+/+/something");
241    }
242
243    #[test]
244    fn role_and_id() {
245        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
246            .auto_connect(false)
247            .build()
248            .unwrap();
249
250        let options = ReceiveOptions {
251            subscribe_role: Some("x".into()),
252            subscribe_id: Some("y".into()),
253            subscribe_channel_name: None,
254            subscribe_topic: None,
255        };
256
257        let receiver = build_receiver(&options)
258            .build(&mut tether_agent)
259            .expect("build failed");
260
261        assert_eq!(receiver.name(), "any");
262        assert_eq!(receiver.generated_topic(), "x/+/y");
263    }
264
265    #[test]
266    fn role_and_channel_name() {
267        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
268            .auto_connect(false)
269            .build()
270            .unwrap();
271
272        let options = ReceiveOptions {
273            subscribe_role: Some("x".into()),
274            subscribe_id: None,
275            subscribe_channel_name: Some("z".into()),
276            subscribe_topic: None,
277        };
278
279        let receiver = build_receiver(&options)
280            .build(&mut tether_agent)
281            .expect("build failed");
282
283        assert_eq!(receiver.name(), "z");
284        assert_eq!(receiver.generated_topic(), "x/z/#");
285    }
286
287    #[test]
288    fn spec_all_three() {
289        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
290            .auto_connect(false)
291            .build()
292            .unwrap();
293
294        let options = ReceiveOptions {
295            subscribe_role: Some("x".into()),
296            subscribe_channel_name: Some("z".into()),
297            subscribe_id: Some("y".into()),
298            subscribe_topic: None,
299        };
300
301        let receiver = build_receiver(&options)
302            .build(&mut tether_agent)
303            .expect("build failed");
304
305        assert_eq!(receiver.name(), "z");
306        assert_eq!(receiver.generated_topic(), "x/z/y");
307    }
308
309    #[test]
310    fn redundant_but_valid() {
311        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
312            .auto_connect(false)
313            .build()
314            .unwrap();
315
316        let options = ReceiveOptions {
317            subscribe_role: None,
318            subscribe_id: None,
319            subscribe_channel_name: Some("+".into()),
320            subscribe_topic: None,
321        };
322
323        let receiver = build_receiver(&options)
324            .build(&mut tether_agent)
325            .expect("build failed");
326
327        assert_eq!(receiver.name(), "any");
328        assert_eq!(receiver.generated_topic(), "+/+/#");
329    }
330}