tether_utils/
tether_receive.rs

1use clap::Args;
2use log::{debug, error, info, warn};
3use tether_agent::{three_part_topic::TetherOrCustomTopic, PlugOptionsBuilder, TetherAgent};
4
5#[derive(Args, Default)]
6pub struct ReceiveOptions {
7    /// Specify a ROLE (instead of wildcard +)
8    #[arg(long = "plug.role")]
9    pub subscribe_role: Option<String>,
10
11    /// Specify an ID (instead of wildcard +)
12    #[arg(long = "plug.id")]
13    pub subscribe_id: Option<String>,
14
15    /// Specify a PLUG NAME part for the topic (instead of wildcard +)
16    #[arg(long = "plug.name")]
17    pub subscribe_plug_name: Option<String>,
18
19    /// Override topic to subscribe; setting this will
20    /// ignore any `plug.` options you may have set, since the
21    /// topic is built manually.
22    #[arg(long = "topic")]
23    pub subscribe_topic: Option<String>,
24}
25
26pub fn receive(
27    options: &ReceiveOptions,
28    tether_agent: &mut TetherAgent,
29    on_message: fn(plug_name: String, topic: String, decoded: Option<String>),
30) {
31    info!("Tether Receive Utility");
32
33    let input_def = build_receiver_plug(options);
34
35    let input = input_def
36        .build(tether_agent)
37        .expect("failed to create input plug");
38
39    info!("Subscribed to topic \"{}\" ...", input.topic());
40
41    loop {
42        let mut did_work = false;
43        while let Some((topic, payload)) = tether_agent.check_messages() {
44            did_work = true;
45            let full_topic_string = topic.full_topic_string();
46            debug!("Received message on topic \"{}\"", &full_topic_string);
47            let plug_name = match topic {
48                TetherOrCustomTopic::Custom(_) => String::from("unknown"),
49                TetherOrCustomTopic::Tether(tpt) => String::from(tpt.plug_name()),
50            };
51
52            if payload.is_empty() {
53                debug!("Empty message payload");
54                on_message(plug_name, full_topic_string, None);
55            } else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(&payload) {
56                let json = serde_json::to_string(&value).expect("failed to stringify JSON");
57                debug!("Decoded MessagePack payload: {}", json);
58                on_message(plug_name, full_topic_string, Some(json));
59            } else {
60                debug!("Failed to decode MessagePack payload");
61                if let Ok(s) = String::from_utf8(payload.to_vec()) {
62                    warn!("String representation of payload: \"{}\"", s);
63                } else {
64                    error!("Could not decode payload bytes as string, either");
65                }
66                on_message(plug_name, full_topic_string, None);
67            }
68        }
69        if !did_work {
70            std::thread::sleep(std::time::Duration::from_micros(100)); //0.1 ms
71        }
72    }
73}
74
75fn build_receiver_plug(options: &ReceiveOptions) -> PlugOptionsBuilder {
76    if options.subscribe_id.is_some()
77        || options.subscribe_role.is_some()
78        || options.subscribe_plug_name.is_some()
79    {
80        debug!(
81            "TPT Overrides apply: {:?}, {:?}, {:?}",
82            &options.subscribe_id, &options.subscribe_role, &options.subscribe_plug_name
83        );
84        PlugOptionsBuilder::create_input(match &options.subscribe_plug_name {
85            Some(provided_name) => {
86                if provided_name.as_str() == "+" {
87                    "any"
88                } else {
89                    provided_name
90                }
91            }
92            None => "any",
93        })
94        .role(options.subscribe_role.as_deref())
95        .id(options.subscribe_id.as_deref())
96        .name(match &options.subscribe_plug_name {
97            Some(provided_name_part) => {
98                if provided_name_part.as_str() == "+" {
99                    Some("+")
100                } else {
101                    None
102                }
103            }
104            None => {
105                if options.subscribe_id.is_some() || options.subscribe_role.is_some() {
106                    // No plug name part was supplied, but other parts were; therefore
107                    // in this case Tether Receive should subscribr to all messages
108                    // matching or both of the specified Agent and/or Role
109                    Some("+")
110                } else {
111                    // No plug name part was supplied, but neither was anything else
112                    // Logically, we shouldn't reach this point because of the outer condition
113                    // but it must be provided here for completeness
114                    None
115                }
116            }
117        })
118    } else {
119        debug!(
120            "Using custom override topic \"{:?}\"",
121            &options.subscribe_topic
122        );
123        PlugOptionsBuilder::create_input("custom")
124            .topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")))
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use tether_agent::TetherAgentOptionsBuilder;
131
132    use crate::tether_receive::build_receiver_plug;
133
134    use super::ReceiveOptions;
135
136    #[test]
137    fn default_options() {
138        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
139            .build()
140            .expect("sorry, these tests require working localhost Broker");
141
142        let options = ReceiveOptions::default();
143
144        let receive_plug = build_receiver_plug(&options)
145            .build(&mut tether_agent)
146            .expect("build failed");
147
148        assert_eq!(receive_plug.name(), "custom");
149        assert_eq!(receive_plug.topic(), "#");
150    }
151
152    #[test]
153    fn only_topic_custom() {
154        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
155            .build()
156            .expect("sorry, these tests require working localhost Broker");
157
158        let options = ReceiveOptions {
159            subscribe_role: None,
160            subscribe_id: None,
161            subscribe_plug_name: None,
162            subscribe_topic: Some("some/special/plug".into()),
163        };
164
165        let receive_plug = build_receiver_plug(&options)
166            .build(&mut tether_agent)
167            .expect("build failed");
168
169        assert_eq!(receive_plug.name(), "custom");
170        assert_eq!(receive_plug.topic(), "some/special/plug");
171    }
172
173    #[test]
174    fn only_plug_name() {
175        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
176            .build()
177            .expect("sorry, these tests require working localhost Broker");
178
179        let options = ReceiveOptions {
180            subscribe_role: None,
181            subscribe_id: None,
182            subscribe_plug_name: Some("something".into()),
183            subscribe_topic: None,
184        };
185
186        let receive_plug = build_receiver_plug(&options)
187            .build(&mut tether_agent)
188            .expect("build failed");
189
190        assert_eq!(receive_plug.name(), "something");
191        assert_eq!(receive_plug.topic(), "+/+/something");
192    }
193
194    #[test]
195    fn only_role() {
196        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
197            .build()
198            .expect("sorry, these tests require working localhost Broker");
199
200        let options = ReceiveOptions {
201            subscribe_role: Some("something".into()),
202            subscribe_id: None,
203            subscribe_plug_name: None,
204            subscribe_topic: None,
205        };
206
207        let receive_plug = build_receiver_plug(&options)
208            .build(&mut tether_agent)
209            .expect("build failed");
210
211        assert_eq!(receive_plug.name(), "any");
212        assert_eq!(receive_plug.topic(), "something/+/+");
213    }
214
215    #[test]
216    fn only_id() {
217        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
218            .build()
219            .expect("sorry, these tests require working localhost Broker");
220
221        let options = ReceiveOptions {
222            subscribe_role: None,
223            subscribe_id: Some("something".into()),
224            subscribe_plug_name: None,
225            subscribe_topic: None,
226        };
227
228        let receive_plug = build_receiver_plug(&options)
229            .build(&mut tether_agent)
230            .expect("build failed");
231
232        assert_eq!(receive_plug.name(), "any");
233        assert_eq!(receive_plug.topic(), "+/something/+");
234    }
235
236    #[test]
237    fn role_and_id() {
238        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
239            .build()
240            .expect("sorry, these tests require working localhost Broker");
241
242        let options = ReceiveOptions {
243            subscribe_role: Some("x".into()),
244            subscribe_id: Some("y".into()),
245            subscribe_plug_name: None,
246            subscribe_topic: None,
247        };
248
249        let receive_plug = build_receiver_plug(&options)
250            .build(&mut tether_agent)
251            .expect("build failed");
252
253        assert_eq!(receive_plug.name(), "any");
254        assert_eq!(receive_plug.topic(), "x/y/+");
255    }
256
257    #[test]
258    fn role_and_plug_name() {
259        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
260            .build()
261            .expect("sorry, these tests require working localhost Broker");
262
263        let options = ReceiveOptions {
264            subscribe_role: Some("x".into()),
265            subscribe_id: None,
266            subscribe_plug_name: Some("z".into()),
267            subscribe_topic: None,
268        };
269
270        let receive_plug = build_receiver_plug(&options)
271            .build(&mut tether_agent)
272            .expect("build failed");
273
274        assert_eq!(receive_plug.name(), "z");
275        assert_eq!(receive_plug.topic(), "x/+/z");
276    }
277
278    #[test]
279    fn spec_all_three() {
280        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
281            .build()
282            .expect("sorry, these tests require working localhost Broker");
283
284        let options = ReceiveOptions {
285            subscribe_role: Some("x".into()),
286            subscribe_id: Some("y".into()),
287            subscribe_plug_name: Some("z".into()),
288            subscribe_topic: None,
289        };
290
291        let receive_plug = build_receiver_plug(&options)
292            .build(&mut tether_agent)
293            .expect("build failed");
294
295        assert_eq!(receive_plug.name(), "z");
296        assert_eq!(receive_plug.topic(), "x/y/z");
297    }
298
299    #[test]
300    fn redundant_but_valid() {
301        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
302            .build()
303            .expect("sorry, these tests require working localhost Broker");
304
305        let options = ReceiveOptions {
306            subscribe_role: None,
307            subscribe_id: None,
308            subscribe_plug_name: Some("+".into()),
309            subscribe_topic: None,
310        };
311
312        let receive_plug = build_receiver_plug(&options)
313            .build(&mut tether_agent)
314            .expect("build failed");
315
316        assert_eq!(receive_plug.name(), "any");
317        assert_eq!(receive_plug.topic(), "+/+/+");
318    }
319}