tether_utils/
tether_receive.rs

1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{
4    ChannelDef, ChannelDefBuilder, ChannelReceiverDef, ChannelReceiverDefBuilder, TetherAgent,
5    tether_compliant_topic::TetherOrCustomTopic,
6};
7
8#[derive(Args, Default)]
9pub struct ReceiveOptions {
10    /// Specify a ROLE (instead of wildcard +)
11    #[arg(long = "channel.role")]
12    pub subscribe_role: Option<String>,
13
14    /// Specify an ID (instead of wildcard #)
15    #[arg(long = "channel.id")]
16    pub subscribe_id: Option<String>,
17
18    /// Specify a CHANNEL NAME part for the topic (instead of wildcard +)
19    #[arg(long = "channel.name")]
20    pub subscribe_channel_name: Option<String>,
21
22    /// Override topic to subscribe; setting this will
23    /// ignore any `channel.` options you may have set, since the
24    /// topic is built manually.
25    #[arg(long = "topic")]
26    pub subscribe_topic: Option<String>,
27}
28
29pub fn receive(
30    options: &ReceiveOptions,
31    tether_agent: &mut TetherAgent,
32    on_message: fn(channel_name: String, topic: String, decoded: Option<String>),
33) {
34    info!("Tether Receive Utility");
35
36    let channel_def = build_receiver(options, tether_agent);
37
38    // let channel = channel_options
39    //     .build(tether_agent)
40    //     .expect("failed to create Channel Receiver");
41
42    // let channel = tether_agent
43    //     .create_receiver_with_def(channel_def)
44    //     .expect("failed to create Receiver");
45
46    if let Some(client) = tether_agent.client_mut() {
47        client
48            .subscribe(channel_def.generated_topic(), channel_def.qos())
49            .expect("failed to subscribe");
50
51        info!(
52            "Subscribed to topic \"{}\" ...",
53            channel_def.generated_topic()
54        );
55
56        loop {
57            let mut did_work = false;
58            while let Some((topic, payload)) = tether_agent.check_messages() {
59                did_work = true;
60                let full_topic_string = topic.full_topic_string();
61                debug!("Received message on topic \"{}\"", &full_topic_string);
62                let channel_name = match topic {
63                    TetherOrCustomTopic::Custom(_) => String::from("unknown"),
64                    TetherOrCustomTopic::Tether(tpt) => String::from(tpt.channel_name()),
65                };
66
67                if payload.is_empty() {
68                    debug!("Empty message payload");
69                    on_message(channel_name, full_topic_string, None);
70                } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
71                    let json = serde_json::to_string(&value).expect("failed to stringify JSON");
72                    debug!("Decoded MessagePack payload: {}", json);
73                    on_message(channel_name, full_topic_string, Some(json));
74                } else {
75                    debug!("Failed to decode MessagePack payload");
76                    if let Ok(s) = String::from_utf8(payload.to_vec()) {
77                        warn!("String representation of payload: \"{}\"", s);
78                    } else {
79                        error!("Could not decode payload bytes as string, either");
80                    }
81                    on_message(channel_name, full_topic_string, None);
82                }
83            }
84            if !did_work {
85                std::thread::sleep(std::time::Duration::from_micros(100)); //0.1 ms
86            }
87        }
88    } else {
89        error!("Failed to subscribe via Client!");
90    }
91}
92
93fn build_receiver(options: &ReceiveOptions, tether_agent: &TetherAgent) -> ChannelReceiverDef {
94    if options.subscribe_id.is_some()
95        || options.subscribe_role.is_some()
96        || options.subscribe_channel_name.is_some()
97    {
98        debug!(
99            "TPT Overrides apply: {:?}, {:?}, {:?}",
100            &options.subscribe_id, &options.subscribe_role, &options.subscribe_channel_name
101        );
102        ChannelReceiverDefBuilder::new(match &options.subscribe_channel_name {
103            Some(provided_name) => provided_name,
104            None => "+",
105        })
106        .role(options.subscribe_role.as_deref())
107        .id(options.subscribe_id.as_deref())
108        .build(tether_agent)
109    } else {
110        debug!(
111            "Using custom override topic \"{:?}\"",
112            &options.subscribe_topic
113        );
114        ChannelReceiverDefBuilder::new("custom")
115            .override_topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
116            .build(tether_agent)
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use tether_agent::{ChannelDef, TetherAgentBuilder};
123
124    use crate::tether_receive::build_receiver;
125
126    use super::ReceiveOptions;
127
128    #[test]
129    fn default_options() {
130        let tether_agent = TetherAgentBuilder::new("tester")
131            .auto_connect(false)
132            .build()
133            .unwrap();
134
135        let options = ReceiveOptions::default();
136
137        let receiver_def = build_receiver(&options, &tether_agent);
138
139        assert_eq!(receiver_def.name(), "custom");
140        assert_eq!(receiver_def.generated_topic(), "#");
141    }
142
143    #[test]
144    fn only_topic_custom() {
145        let tether_agent = TetherAgentBuilder::new("tester")
146            .auto_connect(false)
147            .build()
148            .unwrap();
149
150        let options = ReceiveOptions {
151            subscribe_role: None,
152            subscribe_id: None,
153            subscribe_channel_name: None,
154            subscribe_topic: Some("some/channel/special/fourpart".into()),
155        };
156
157        let receiver_def = build_receiver(&options, &tether_agent);
158
159        assert_eq!(receiver_def.name(), "custom");
160        assert_eq!(
161            receiver_def.generated_topic(),
162            "some/channel/special/fourpart"
163        );
164    }
165
166    #[test]
167    fn only_chanel_name() {
168        let tether_agent = TetherAgentBuilder::new("tester")
169            .auto_connect(false)
170            .build()
171            .unwrap();
172
173        let options = ReceiveOptions {
174            subscribe_role: None,
175            subscribe_id: None,
176            subscribe_channel_name: Some("something".into()),
177            subscribe_topic: None,
178        };
179
180        let receiver_def = build_receiver(&options, &tether_agent);
181
182        assert_eq!(receiver_def.name(), "something");
183        assert_eq!(receiver_def.generated_topic(), "+/something/#");
184    }
185
186    #[test]
187    fn only_role() {
188        let tether_agent = TetherAgentBuilder::new("tester")
189            .auto_connect(false)
190            .build()
191            .unwrap();
192
193        let options = ReceiveOptions {
194            subscribe_role: Some("something".into()),
195            subscribe_id: None,
196            subscribe_channel_name: None,
197            subscribe_topic: None,
198        };
199
200        let receiver_def = build_receiver(&options, &tether_agent);
201
202        assert_eq!(receiver_def.name(), "+");
203        assert_eq!(receiver_def.generated_topic(), "something/+/#");
204    }
205
206    #[test]
207    fn only_id() {
208        let tether_agent = TetherAgentBuilder::new("tester")
209            .auto_connect(false)
210            .build()
211            .unwrap();
212
213        let options = ReceiveOptions {
214            subscribe_role: None,
215            subscribe_id: Some("something".into()),
216            subscribe_channel_name: None,
217            subscribe_topic: None,
218        };
219
220        let receiver_def = build_receiver(&options, &tether_agent);
221
222        assert_eq!(receiver_def.name(), "+");
223        assert_eq!(receiver_def.generated_topic(), "+/+/something");
224    }
225
226    #[test]
227    fn role_and_id() {
228        let tether_agent = TetherAgentBuilder::new("tester")
229            .auto_connect(false)
230            .build()
231            .unwrap();
232
233        let options = ReceiveOptions {
234            subscribe_role: Some("x".into()),
235            subscribe_id: Some("y".into()),
236            subscribe_channel_name: None,
237            subscribe_topic: None,
238        };
239
240        let receiver_def = build_receiver(&options, &tether_agent);
241
242        assert_eq!(receiver_def.name(), "+");
243        assert_eq!(receiver_def.generated_topic(), "x/+/y");
244    }
245
246    #[test]
247    fn role_and_channel_name() {
248        let tether_agent = TetherAgentBuilder::new("tester")
249            .auto_connect(false)
250            .build()
251            .unwrap();
252
253        let options = ReceiveOptions {
254            subscribe_role: Some("x".into()),
255            subscribe_id: None,
256            subscribe_channel_name: Some("z".into()),
257            subscribe_topic: None,
258        };
259
260        let receiver_def = build_receiver(&options, &tether_agent);
261
262        assert_eq!(receiver_def.name(), "z");
263        assert_eq!(receiver_def.generated_topic(), "x/z/#");
264    }
265
266    #[test]
267    fn spec_all_three() {
268        let tether_agent = TetherAgentBuilder::new("tester")
269            .auto_connect(false)
270            .build()
271            .unwrap();
272
273        let options = ReceiveOptions {
274            subscribe_role: Some("x".into()),
275            subscribe_channel_name: Some("z".into()),
276            subscribe_id: Some("y".into()),
277            subscribe_topic: None,
278        };
279
280        let receiver_def = build_receiver(&options, &tether_agent);
281
282        assert_eq!(receiver_def.name(), "z");
283        assert_eq!(receiver_def.generated_topic(), "x/z/y");
284    }
285
286    #[test]
287    fn redundant_but_valid() {
288        let tether_agent = TetherAgentBuilder::new("tester")
289            .auto_connect(false)
290            .build()
291            .unwrap();
292
293        let options = ReceiveOptions {
294            subscribe_role: None,
295            subscribe_id: None,
296            subscribe_channel_name: Some("+".into()),
297            subscribe_topic: None,
298        };
299
300        let receiver_def = build_receiver(&options, &tether_agent);
301
302        assert_eq!(receiver_def.name(), "+");
303        assert_eq!(receiver_def.generated_topic(), "+/+/#");
304    }
305}