tether_agent/channels/
options.rs

1use anyhow::anyhow;
2use log::{debug, error, info, warn};
3
4use crate::{
5    definitions::ChannelDefinitionCommon, tether_compliant_topic::TetherCompliantTopic,
6    ChannelDefinition, TetherAgent,
7};
8
9use super::{
10    tether_compliant_topic::TetherOrCustomTopic, ChannelReceiverDefinition, ChannelSenderDefinition,
11};
12
/// Options collected for a Channel Receiver (a subscription) before it is
/// turned into a definition by [`ChannelOptionsBuilder::build`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChannelReceiverOptions {
    /// Name given to `create_receiver`; this remains the definition's name
    /// even when the topic parts are overridden.
    channel_name: String,
    /// Requested QoS level, if any was set via `.qos(...)`.
    qos: Option<i32>,
    /// Replacement for the Role part of the subscription topic.
    override_subscribe_role: Option<String>,
    /// Replacement for the ID part of the subscription topic.
    override_subscribe_id: Option<String>,
    /// Replacement for the Channel Name part of the subscription topic
    /// (for example the `+` wildcard).
    override_subscribe_channel_name: Option<String>,
    /// Complete custom topic; when present it is used instead of a topic
    /// generated from the individual parts.
    override_topic: Option<String>,
}

/// Options collected for a Channel Sender (a publisher) before it is turned
/// into a definition by [`ChannelOptionsBuilder::build`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChannelSenderOptions {
    /// Name given to `create_sender`.
    channel_name: String,
    /// Requested QoS level, if any was set via `.qos(...)`.
    qos: Option<i32>,
    /// Replacement for the Role part of the publish topic.
    override_publish_role: Option<String>,
    /// Replacement for the ID part of the publish topic.
    override_publish_id: Option<String>,
    /// Complete custom topic; when present it is used instead of a topic
    /// generated from the individual parts.
    override_topic: Option<String>,
    /// Requested retain flag, if any was set via `.retain(...)`.
    retain: Option<bool>,
}

/// This is the definition of a Channel Receiver or Sender.
///
/// You typically don't use an instance of this directly; call `.build()` at the
/// end of the chain to get a usable **ChannelDefinition**
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelOptionsBuilder {
    /// Options for a Channel that subscribes.
    ChannelReceiver(ChannelReceiverOptions),
    /// Options for a Channel that publishes.
    ChannelSender(ChannelSenderOptions),
}
39
40impl ChannelOptionsBuilder {
41    pub fn create_receiver(name: &str) -> ChannelOptionsBuilder {
42        ChannelOptionsBuilder::ChannelReceiver(ChannelReceiverOptions {
43            channel_name: String::from(name),
44            override_subscribe_id: None,
45            override_subscribe_role: None,
46            override_subscribe_channel_name: None,
47            override_topic: None,
48            qos: None,
49        })
50    }
51
52    pub fn create_sender(name: &str) -> ChannelOptionsBuilder {
53        ChannelOptionsBuilder::ChannelSender(ChannelSenderOptions {
54            channel_name: String::from(name),
55            override_publish_id: None,
56            override_publish_role: None,
57            override_topic: None,
58            qos: None,
59            retain: None,
60        })
61    }
62
63    pub fn qos(mut self, qos: Option<i32>) -> Self {
64        match &mut self {
65            ChannelOptionsBuilder::ChannelReceiver(s) => s.qos = qos,
66            ChannelOptionsBuilder::ChannelSender(s) => s.qos = qos,
67        };
68        self
69    }
70
71    /**
72    Override the "role" part of the topic that gets generated for this Channel.
73    - For Channel Receivers, this means you want to be specific about the Role part
74      of the topic, instead of using the default wildcard `+` at this location
75    - For Channel Senders, this means you want to override the Role part instead
76      of using your Agent's "own" Role with which you created the Tether Agent
77
78    If you override the entire topic using `.topic` this will be ignored.
79    */
80    pub fn role(mut self, role: Option<&str>) -> Self {
81        match &mut self {
82            ChannelOptionsBuilder::ChannelReceiver(s) => {
83                if s.override_topic.is_some() {
84                    error!("Override topic was also provided; this will take precedence");
85                } else {
86                    s.override_subscribe_role = role.map(|s| s.into());
87                }
88            }
89            ChannelOptionsBuilder::ChannelSender(s) => {
90                if s.override_topic.is_some() {
91                    error!("Override topic was also provided; this will take precedence");
92                } else {
93                    s.override_publish_role = role.map(|s| s.into());
94                }
95            }
96        };
97        self
98    }
99
100    /**
101    Override the "id" part of the topic that gets generated for this Channel.
102    - For Channel Receivers, this means you want to be specific about the ID part
103      of the topic, instead of using the default wildcard `+` at this location
104    - For Channel Senders, this means you want to override the ID part instead
105      of using your Agent's "own" ID which you specified (or left blank, i.e. "any")
106      when creating the Tether Agent
107
108    If you override the entire topic using `.topic` this will be ignored.
109    */
110    pub fn id(mut self, id: Option<&str>) -> Self {
111        match &mut self {
112            ChannelOptionsBuilder::ChannelReceiver(s) => {
113                if s.override_topic.is_some() {
114                    error!("Override topic was also provided; this will take precedence");
115                } else {
116                    s.override_subscribe_id = id.map(|s| s.into());
117                }
118            }
119            ChannelOptionsBuilder::ChannelSender(s) => {
120                if s.override_topic.is_some() {
121                    error!("Override topic was also provided; this will take precedence");
122                } else {
123                    s.override_publish_id = id.map(|s| s.into());
124                }
125            }
126        };
127        self
128    }
129
130    /// Override the "name" part of the topic that gets generated for this Channel.
131    /// This is mainly to facilitate wildcard subscriptions such as
132    /// `someRole/+` instead of `someRole/originalChannelName`.
133    ///
134    /// In the case of Receiver Topics, a wildcard `+` can be used to substitute
135    /// the last part of the topic as in `role/id/+`
136    ///
137    /// Channel Senders will ignore (with an error) any attempt to change the name after
138    /// instantiation.
139    pub fn name(mut self, override_channel_name: Option<&str>) -> Self {
140        match &mut self {
141            ChannelOptionsBuilder::ChannelReceiver(opt) => {
142                if opt.override_topic.is_some() {
143                    error!("Override topic was also provided; this will take precedence");
144                }
145                if override_channel_name.is_some() {
146                    opt.override_subscribe_channel_name = override_channel_name.map(|s| s.into());
147                } else {
148                    debug!("Override Channel name set to None; will use original name \"{}\" given in ::create_receiver constructor", opt.channel_name);
149                }
150            }
151            ChannelOptionsBuilder::ChannelSender(_) => {
152                error!(
153                    "Channel Senders cannot change their name part after ::create_sender constructor"
154                );
155            }
156        };
157        self
158    }
159
160    /// Call this if you would like your Channel Receiver to match **any channel**.
161    /// This is equivalent to `.name(Some("+"))` but is provided for convenience
162    /// since it does not require you to remember the wildcard string.
163    ///
164    /// This also does not prevent you from further restricting the topic
165    /// subscription match by Role and/or ID. So, for example, if you are
166    /// interested in **all messages** from an Agent with the role `"brain"`,
167    /// it is valid to create a channel with `.role("brain").any_channel()` and this
168    /// will subscribe to `"brain/+/#"` as expected.
169    pub fn any_channel(mut self) -> Self {
170        match &mut self {
171            ChannelOptionsBuilder::ChannelReceiver(opt) => {
172                opt.override_subscribe_channel_name = Some("+".into());
173            }
174            ChannelOptionsBuilder::ChannelSender(_) => {
175                error!(
176                    "Channel Senders cannot change their name part after ::create_sender constructor"
177                );
178            }
179        }
180        self
181    }
182
183    /// Override the final topic to use for publishing or subscribing. The provided topic **will** be checked
184    /// against the Tether Compliant Topic (TCT) convention, but the function **will not** reject topic strings - just
185    /// produce a warning. It's therefore valid to use a wildcard such as "#", for Receivers (subscribing).
186    ///
187    /// Any customisations specified using `.role(...)` or `.id(...)` will be ignored if this function is called
188    /// after these.
189    ///
190    /// By default, the override_topic is None, but you can specify None explicitly using this function.
191    pub fn topic(mut self, override_topic: Option<&str>) -> Self {
192        match override_topic {
193            Some(t) => {
194                if TryInto::<TetherCompliantTopic>::try_into(t).is_ok() {
195                    info!("Custom topic passes Tether Compliant Topic validation");
196                } else if t == "#" {
197                    info!("Wildcard \"#\" custom topics are not Tether Compliant Topics but are valid");
198                } else {
199                    warn!(
200                        "Could not convert \"{}\" into Tether Compliant Topic; presumably you know what you're doing!",
201                        t
202                    );
203                }
204                match &mut self {
205                    ChannelOptionsBuilder::ChannelReceiver(s) => s.override_topic = Some(t.into()),
206                    ChannelOptionsBuilder::ChannelSender(s) => s.override_topic = Some(t.into()),
207                };
208            }
209            None => {
210                match &mut self {
211                    ChannelOptionsBuilder::ChannelReceiver(s) => s.override_topic = None,
212                    ChannelOptionsBuilder::ChannelSender(s) => s.override_topic = None,
213                };
214            }
215        }
216        self
217    }
218
219    pub fn retain(mut self, should_retain: Option<bool>) -> Self {
220        match &mut self {
221            Self::ChannelReceiver(_) => {
222                error!("Cannot set retain flag on Receiver / subscription");
223            }
224            Self::ChannelSender(s) => {
225                s.retain = should_retain;
226            }
227        }
228        self
229    }
230
231    /// Finalise the options (substituting suitable defaults if no custom values have been
232    /// provided) and return a valid ChannelDefinition that you can actually use.
233    pub fn build(self, tether_agent: &mut TetherAgent) -> anyhow::Result<ChannelDefinition> {
234        match self {
235            Self::ChannelReceiver(channel_options) => {
236                let tpt: TetherOrCustomTopic = match channel_options.override_topic {
237                    Some(custom) => TetherOrCustomTopic::Custom(custom),
238                    None => {
239                        debug!("Not a custom topic; provided overrides: role = {:?}, id = {:?}, name = {:?}", channel_options.override_subscribe_role, channel_options.override_subscribe_id, channel_options.override_subscribe_channel_name);
240
241                        TetherOrCustomTopic::Tether(TetherCompliantTopic::new_for_subscribe(
242                            &channel_options
243                                .override_subscribe_channel_name
244                                .unwrap_or(channel_options.channel_name.clone()),
245                            channel_options.override_subscribe_role.as_deref(),
246                            channel_options.override_subscribe_id.as_deref(),
247                        ))
248                    }
249                };
250                let channel_definition = ChannelReceiverDefinition::new(
251                    &channel_options.channel_name,
252                    tpt,
253                    channel_options.qos,
254                );
255
256                // This is really only useful for testing purposes.
257                if !tether_agent.auto_connect_enabled() {
258                    warn!("Auto-connect is disabled, skipping subscription");
259                    return Ok(ChannelDefinition::ChannelReceiver(channel_definition));
260                }
261
262                if let Some(client) = &tether_agent.client {
263                    match client.subscribe(
264                        channel_definition.generated_topic(),
265                        match channel_definition.qos() {
266                            0 => rumqttc::QoS::AtMostOnce,
267                            1 => rumqttc::QoS::AtLeastOnce,
268                            2 => rumqttc::QoS::ExactlyOnce,
269                            _ => rumqttc::QoS::AtLeastOnce,
270                        },
271                    ) {
272                        Ok(res) => {
273                            debug!(
274                                "This topic was fine: \"{}\"",
275                                channel_definition.generated_topic()
276                            );
277                            debug!("Server respond OK for subscribe: {res:?}");
278                            Ok(ChannelDefinition::ChannelReceiver(channel_definition))
279                        }
280                        Err(_e) => Err(anyhow!("ClientError")),
281                    }
282                } else {
283                    Err(anyhow!("Client not available for subscription"))
284                }
285            }
286            Self::ChannelSender(channel_options) => {
287                let tpt: TetherOrCustomTopic = match channel_options.override_topic {
288                    Some(custom) => {
289                        warn!(
290                            "Custom topic override: \"{}\" - all other options ignored",
291                            custom
292                        );
293                        TetherOrCustomTopic::Custom(custom)
294                    }
295                    None => {
296                        let optional_id_part = match channel_options.override_publish_id {
297                            Some(id) => {
298                                debug!("Publish ID was overriden at Channel options level. The Agent ID will be ignored.");
299                                Some(id)
300                            }
301                            None => {
302                                debug!("Publish ID was not overriden at Channel options level. The Agent ID will be used instead, if specified in Agent creation.");
303                                tether_agent.id().map(String::from)
304                            }
305                        };
306
307                        TetherOrCustomTopic::Tether(TetherCompliantTopic::new_for_publish(
308                            tether_agent,
309                            &channel_options.channel_name,
310                            channel_options.override_publish_role.as_deref(),
311                            optional_id_part.as_deref(),
312                        ))
313                    }
314                };
315
316                let channel_definition = ChannelSenderDefinition::new(
317                    &channel_options.channel_name,
318                    tpt,
319                    channel_options.qos,
320                    channel_options.retain,
321                );
322                Ok(ChannelDefinition::ChannelSender(channel_definition))
323            }
324        }
325    }
326}
327
328#[cfg(test)]
329mod tests {
330
331    use crate::{ChannelOptionsBuilder, TetherAgentOptionsBuilder};
332
333    // fn verbose_logging() {
334    //     use env_logger::{Builder, Env};
335    //     let mut logger_builder = Builder::from_env(Env::default().default_filter_or("debug"));
336    //     logger_builder.init();
337    // }
338
339    #[test]
340    fn default_receiver_channel() {
341        // verbose_logging();
342        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
343            .auto_connect(false)
344            .build()
345            .expect("sorry, these tests require working localhost Broker");
346        let receiver = ChannelOptionsBuilder::create_receiver("one")
347            .build(&mut tether_agent)
348            .unwrap();
349        assert_eq!(receiver.name(), "one");
350        assert_eq!(receiver.generated_topic(), "+/one/#");
351    }
352
353    #[test]
354    /// This is a fairly trivial example, but contrast with the test
355    /// `sender_channel_default_but_agent_id_custom`: although a custom ID was set for the
356    /// Agent, this does not affect the Topic for a Channel Receiver created without any
357    /// explicit overrides.
358    fn default_channel_receiver_with_agent_custom_id() {
359        // verbose_logging();
360        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
361            .auto_connect(false)
362            .id(Some("verySpecialGroup"))
363            .build()
364            .expect("sorry, these tests require working localhost Broker");
365        let receiver = ChannelOptionsBuilder::create_receiver("one")
366            .build(&mut tether_agent)
367            .unwrap();
368        assert_eq!(receiver.name(), "one");
369        assert_eq!(receiver.generated_topic(), "+/one/#");
370    }
371
372    #[test]
373    fn default_channel_sender() {
374        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
375            .auto_connect(false)
376            .build()
377            .expect("sorry, these tests require working localhost Broker");
378        let channel = ChannelOptionsBuilder::create_sender("two")
379            .build(&mut tether_agent)
380            .unwrap();
381        assert_eq!(channel.name(), "two");
382        assert_eq!(channel.generated_topic(), "tester/two");
383    }
384
385    #[test]
386    /// This is identical to the case in which a Channel Sender is created with defaults (no overrides),
387    /// BUT the Agent had a custom ID set, which means that the final topic includes this custom
388    /// ID/Group value.
389    fn sender_channel_default_but_agent_id_custom() {
390        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
391            .auto_connect(false)
392            .id(Some("specialCustomGrouping"))
393            .build()
394            .expect("sorry, these tests require working localhost Broker");
395        let channel = ChannelOptionsBuilder::create_sender("somethingStandard")
396            .build(&mut tether_agent)
397            .unwrap();
398        assert_eq!(channel.name(), "somethingStandard");
399        assert_eq!(
400            channel.generated_topic(),
401            "tester/somethingStandard/specialCustomGrouping"
402        );
403    }
404
405    #[test]
406    fn receiver_id_andor_role() {
407        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
408            .auto_connect(false)
409            .build()
410            .expect("sorry, these tests require working localhost Broker");
411
412        let receive_role_only = ChannelOptionsBuilder::create_receiver("theChannel")
413            .role(Some("specificRole"))
414            .build(&mut tether_agent)
415            .unwrap();
416        assert_eq!(receive_role_only.name(), "theChannel");
417        assert_eq!(
418            receive_role_only.generated_topic(),
419            "specificRole/theChannel/#"
420        );
421
422        let receiver_id_only = ChannelOptionsBuilder::create_receiver("theChannel")
423            .id(Some("specificID"))
424            .build(&mut tether_agent)
425            .unwrap();
426        assert_eq!(receiver_id_only.name(), "theChannel");
427        assert_eq!(
428            receiver_id_only.generated_topic(),
429            "+/theChannel/specificID"
430        );
431
432        let receiver_both_custom = ChannelOptionsBuilder::create_receiver("theChannel")
433            .id(Some("specificID"))
434            .role(Some("specificRole"))
435            .build(&mut tether_agent)
436            .unwrap();
437        assert_eq!(receiver_both_custom.name(), "theChannel");
438        assert_eq!(
439            receiver_both_custom.generated_topic(),
440            "specificRole/theChannel/specificID"
441        );
442    }
443
444    #[test]
445    /// If the end-user implicitly specifies the chanel name part (does not set it to Some(_)
446    /// or None) then the ID and/or Role parts will change but the Channel Name part will
447    /// remain the "original" / default
448    /// Contrast with receiver_specific_id_andor_role_no_chanel_name below.
449    fn receiver_specific_id_andor_role_with_channel_name() {
450        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
451            .auto_connect(false)
452            .build()
453            .expect("sorry, these tests require working localhost Broker");
454
455        let receiver_role_only = ChannelOptionsBuilder::create_receiver("theChannel")
456            .role(Some("specificRole"))
457            .build(&mut tether_agent)
458            .unwrap();
459        assert_eq!(receiver_role_only.name(), "theChannel");
460        assert_eq!(
461            receiver_role_only.generated_topic(),
462            "specificRole/theChannel/#"
463        );
464
465        let receiver_id_only = ChannelOptionsBuilder::create_receiver("theChannel")
466            .id(Some("specificID"))
467            .build(&mut tether_agent)
468            .unwrap();
469        assert_eq!(receiver_id_only.name(), "theChannel");
470        assert_eq!(
471            receiver_id_only.generated_topic(),
472            "+/theChannel/specificID"
473        );
474
475        let receiver_both = ChannelOptionsBuilder::create_receiver("theChannel")
476            .id(Some("specificID"))
477            .role(Some("specificRole"))
478            .build(&mut tether_agent)
479            .unwrap();
480        assert_eq!(receiver_both.name(), "theChannel");
481        assert_eq!(
482            receiver_both.generated_topic(),
483            "specificRole/theChannel/specificID"
484        );
485    }
486
487    #[test]
488    /// Unlike receiver_specific_id_andor_role_with_channel_name, this tests the situation where
489    /// the end-user (possibly) specifies the ID and/or Role, but also explicitly
490    /// sets the Channel Name to Some("+"), ie. "use a wildcard at this
491    /// position instead" - and NOT the original channel name.
492    fn receiver_specific_id_andor_role_no_channel_name() {
493        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
494            .auto_connect(false)
495            .build()
496            .expect("sorry, these tests require working localhost Broker");
497
498        let receiver_only_chanel_name_none = ChannelOptionsBuilder::create_receiver("theChannel")
499            .name(Some("+"))
500            .build(&mut tether_agent)
501            .unwrap();
502        assert_eq!(receiver_only_chanel_name_none.name(), "theChannel");
503        assert_eq!(receiver_only_chanel_name_none.generated_topic(), "+/+/#");
504
505        let receiver_role_only = ChannelOptionsBuilder::create_receiver("theChannel")
506            .name(Some("+"))
507            .role(Some("specificRole"))
508            .build(&mut tether_agent)
509            .unwrap();
510        assert_eq!(receiver_role_only.name(), "theChannel");
511        assert_eq!(receiver_role_only.generated_topic(), "specificRole/+/#");
512
513        let receiver_id_only = ChannelOptionsBuilder::create_receiver("theChannel")
514            // .name(Some("+"))
515            .any_channel() // equivalent to Some("+")
516            .id(Some("specificID"))
517            .build(&mut tether_agent)
518            .unwrap();
519        assert_eq!(receiver_id_only.name(), "theChannel");
520        assert_eq!(receiver_id_only.generated_topic(), "+/+/specificID");
521
522        let receiver_both = ChannelOptionsBuilder::create_receiver("theChannel")
523            .name(Some("+"))
524            .id(Some("specificID"))
525            .role(Some("specificRole"))
526            .build(&mut tether_agent)
527            .unwrap();
528        assert_eq!(receiver_both.name(), "theChannel");
529        assert_eq!(receiver_both.generated_topic(), "specificRole/+/specificID");
530    }
531
532    #[test]
533    fn any_name_but_specify_role() {
534        // Some fairly niche cases here
535
536        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
537            .auto_connect(false)
538            .build()
539            .expect("sorry, these tests require working localhost Broker");
540
541        let receiver_any_channel = ChannelOptionsBuilder::create_receiver("aTest")
542            .any_channel()
543            .build(&mut tether_agent)
544            .unwrap();
545
546        assert_eq!(receiver_any_channel.name(), "aTest");
547        assert_eq!(receiver_any_channel.generated_topic(), "+/+/#");
548
549        let receiver_specify_role = ChannelOptionsBuilder::create_receiver("aTest")
550            .any_channel()
551            .role(Some("brain"))
552            .build(&mut tether_agent)
553            .unwrap();
554
555        assert_eq!(receiver_specify_role.name(), "aTest");
556        assert_eq!(receiver_specify_role.generated_topic(), "brain/+/#");
557    }
558
559    #[test]
560    fn sender_custom() {
561        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
562            .auto_connect(false)
563            .build()
564            .expect("sorry, these tests require working localhost Broker");
565
566        let sender_custom_role = ChannelOptionsBuilder::create_sender("theChannelSender")
567            .role(Some("customRole"))
568            .build(&mut tether_agent)
569            .unwrap();
570        assert_eq!(sender_custom_role.name(), "theChannelSender");
571        assert_eq!(
572            sender_custom_role.generated_topic(),
573            "customRole/theChannelSender"
574        );
575
576        let sender_custom_id = ChannelOptionsBuilder::create_sender("theChannelSender")
577            .id(Some("customID"))
578            .build(&mut tether_agent)
579            .unwrap();
580        assert_eq!(sender_custom_id.name(), "theChannelSender");
581        assert_eq!(
582            sender_custom_id.generated_topic(),
583            "tester/theChannelSender/customID"
584        );
585
586        let sender_custom_both = ChannelOptionsBuilder::create_sender("theChannelSender")
587            .role(Some("customRole"))
588            .id(Some("customID"))
589            .build(&mut tether_agent)
590            .unwrap();
591        assert_eq!(sender_custom_both.name(), "theChannelSender");
592        assert_eq!(
593            sender_custom_both.generated_topic(),
594            "customRole/theChannelSender/customID"
595        );
596    }
597
598    #[test]
599    fn receiver_manual_topics() {
600        let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
601            .auto_connect(false)
602            .build()
603            .expect("sorry, these tests require working localhost Broker");
604
605        let receiver_all = ChannelOptionsBuilder::create_receiver("everything")
606            .topic(Some("#"))
607            .build(&mut tether_agent)
608            .unwrap();
609        assert_eq!(receiver_all.name(), "everything");
610        assert_eq!(receiver_all.generated_topic(), "#");
611
612        let receiver_nontether = ChannelOptionsBuilder::create_receiver("weird")
613            .topic(Some("foo/bar/baz/one/two/three"))
614            .build(&mut tether_agent)
615            .unwrap();
616        assert_eq!(receiver_nontether.name(), "weird");
617        assert_eq!(
618            receiver_nontether.generated_topic(),
619            "foo/bar/baz/one/two/three"
620        );
621    }
622}