tether_utils/
tether_receive.rs

1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{
4    ChannelDef, ChannelDefBuilder, ChannelReceiverDef, ChannelReceiverDefBuilder, TetherAgent,
5    tether_compliant_topic::TetherOrCustomTopic,
6};
7
8#[derive(Args, Default)]
9pub struct ReceiveOptions {
10    /// Specify a ROLE (instead of wildcard +)
11    #[arg(long = "channel.role")]
12    pub subscribe_role: Option<String>,
13
14    /// Specify an ID (instead of wildcard #)
15    #[arg(long = "channel.id")]
16    pub subscribe_id: Option<String>,
17
18    /// Specify a CHANNEL NAME part for the topic (instead of wildcard +)
19    #[arg(long = "channel.name")]
20    pub subscribe_channel_name: Option<String>,
21
22    /// Override topic to subscribe; setting this will
23    /// ignore any `channel.` options you may have set, since the
24    /// topic is built manually.
25    #[arg(long = "topic")]
26    pub subscribe_topic: Option<String>,
27}
28
29pub fn receive(
30    options: &ReceiveOptions,
31    tether_agent: &mut TetherAgent,
32    on_message: fn(channel_name: String, topic: String, decoded: Option<String>),
33) {
34    info!("Tether Receive Utility");
35
36    let channel_def = build_receiver(options, tether_agent);
37
38    // let channel = channel_options
39    //     .build(tether_agent)
40    //     .expect("failed to create Channel Receiver");
41
42    // let channel = tether_agent
43    //     .create_receiver_with_def(channel_def)
44    //     .expect("failed to create Receiver");
45
46    if let Some(client) = tether_agent.client_mut() {
47        client
48            .subscribe(channel_def.generated_topic(), channel_def.qos())
49            .expect("failed to subscribe");
50
51        info!(
52            "Subscribed to topic \"{}\" ...",
53            channel_def.generated_topic()
54        );
55
56        loop {
57            let mut did_work = false;
58            while let Some((topic, payload)) = tether_agent.check_messages() {
59                did_work = true;
60                let full_topic_string = topic.full_topic_string();
61                debug!("Received message on topic \"{}\"", &full_topic_string);
62                let channel_name = match topic {
63                    TetherOrCustomTopic::Custom(_) => String::from("unknown"),
64                    TetherOrCustomTopic::Tether(tpt) => String::from(tpt.channel_name()),
65                };
66
67                if payload.is_empty() {
68                    debug!("Empty message payload");
69                    on_message(channel_name, full_topic_string, None);
70                } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
71                    let json = serde_json::to_string(&value).expect("failed to stringify JSON");
72                    debug!("Decoded MessagePack payload: {}", json);
73                    on_message(channel_name, full_topic_string, Some(json));
74                } else {
75                    debug!("Failed to decode MessagePack payload");
76                    if let Ok(s) = String::from_utf8(payload.to_vec()) {
77                        warn!("String representation of payload: \"{}\"", s);
78                    } else {
79                        error!("Could not decode payload bytes as string, either");
80                    }
81                    on_message(channel_name, full_topic_string, None);
82                }
83            }
84            if !did_work {
85                std::thread::sleep(std::time::Duration::from_micros(100)); //0.1 ms
86            }
87        }
88    } else {
89        error!("Failed to subscribe via Client!");
90    }
91}
92
93fn build_receiver(options: &ReceiveOptions, tether_agent: &TetherAgent) -> ChannelReceiverDef {
94    if options.subscribe_id.is_some()
95        || options.subscribe_role.is_some()
96        || options.subscribe_channel_name.is_some()
97    {
98        debug!(
99            "TPT Overrides apply: {:?}, {:?}, {:?}",
100            &options.subscribe_id, &options.subscribe_role, &options.subscribe_channel_name
101        );
102        ChannelReceiverDefBuilder::new(match &options.subscribe_channel_name {
103            Some(provided_name) => {
104                if provided_name.as_str() == "+" {
105                    "any"
106                } else {
107                    provided_name
108                }
109            }
110            None => "any",
111        })
112        .role(options.subscribe_role.as_deref())
113        .id(options.subscribe_id.as_deref())
114        .build(tether_agent)
115    } else {
116        debug!(
117            "Using custom override topic \"{:?}\"",
118            &options.subscribe_topic
119        );
120        ChannelReceiverDefBuilder::new("custom")
121            .override_topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
122            .build(tether_agent)
123    }
124}
125
#[cfg(test)]
mod tests {
    use tether_agent::{ChannelDef, TetherAgentBuilder};

    use super::{build_receiver, ReceiveOptions};

    /// Builds a receiver definition for `options` using an agent created with
    /// `auto_connect(false)`, then checks its name and generated topic.
    fn assert_receiver(options: ReceiveOptions, expected_name: &str, expected_topic: &str) {
        let offline_agent = TetherAgentBuilder::new("tester")
            .auto_connect(false)
            .build()
            .unwrap();

        let def = build_receiver(&options, &offline_agent);

        assert_eq!(def.name(), expected_name);
        assert_eq!(def.generated_topic(), expected_topic);
    }

    // No options at all: catch-all custom topic.
    #[test]
    fn default_options() {
        assert_receiver(ReceiveOptions::default(), "custom", "#");
    }

    // Only an explicit topic: used verbatim.
    #[test]
    fn only_topic_custom() {
        let options = ReceiveOptions {
            subscribe_topic: Some("some/channel/special/fourpart".into()),
            ..Default::default()
        };

        assert_receiver(options, "custom", "some/channel/special/fourpart");
    }

    // Only a channel name: role and ID stay wildcards.
    #[test]
    fn only_chanel_name() {
        let options = ReceiveOptions {
            subscribe_channel_name: Some("something".into()),
            ..Default::default()
        };

        assert_receiver(options, "something", "+/something/#");
    }

    // Only a role: channel name and ID stay wildcards.
    #[test]
    fn only_role() {
        let options = ReceiveOptions {
            subscribe_role: Some("something".into()),
            ..Default::default()
        };

        assert_receiver(options, "any", "something/+/#");
    }

    // Only an ID: role and channel name stay wildcards.
    #[test]
    fn only_id() {
        let options = ReceiveOptions {
            subscribe_id: Some("something".into()),
            ..Default::default()
        };

        assert_receiver(options, "any", "+/+/something");
    }

    #[test]
    fn role_and_id() {
        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_id: Some("y".into()),
            ..Default::default()
        };

        assert_receiver(options, "any", "x/+/y");
    }

    #[test]
    fn role_and_channel_name() {
        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_channel_name: Some("z".into()),
            ..Default::default()
        };

        assert_receiver(options, "z", "x/z/#");
    }

    // Role, channel name and ID all given: a fully specified topic.
    #[test]
    fn spec_all_three() {
        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_channel_name: Some("z".into()),
            subscribe_id: Some("y".into()),
            ..Default::default()
        };

        assert_receiver(options, "z", "x/z/y");
    }

    // Passing the wildcard "+" as the channel name behaves like "any".
    #[test]
    fn redundant_but_valid() {
        let options = ReceiveOptions {
            subscribe_channel_name: Some("+".into()),
            ..Default::default()
        };

        assert_receiver(options, "any", "+/+/#");
    }
}