1use anyhow::anyhow;
2use log::{debug, error, info, warn};
3
4use crate::{
5 definitions::ChannelDefinitionCommon, tether_compliant_topic::TetherCompliantTopic,
6 ChannelDefinition, TetherAgent,
7};
8
9use super::{
10 tether_compliant_topic::TetherOrCustomTopic, ChannelReceiverDefinition, ChannelSenderDefinition,
11};
12
13pub struct ChannelReceiverOptions {
14 channel_name: String,
15 qos: Option<i32>,
16 override_subscribe_role: Option<String>,
17 override_subscribe_id: Option<String>,
18 override_subscribe_channel_name: Option<String>,
19 override_topic: Option<String>,
20}
21
22pub struct ChannelSenderOptions {
23 channel_name: String,
24 qos: Option<i32>,
25 override_publish_role: Option<String>,
26 override_publish_id: Option<String>,
27 override_topic: Option<String>,
28 retain: Option<bool>,
29}
30
31pub enum ChannelOptionsBuilder {
36 ChannelReceiver(ChannelReceiverOptions),
37 ChannelSender(ChannelSenderOptions),
38}
39
40impl ChannelOptionsBuilder {
41 pub fn create_receiver(name: &str) -> ChannelOptionsBuilder {
42 ChannelOptionsBuilder::ChannelReceiver(ChannelReceiverOptions {
43 channel_name: String::from(name),
44 override_subscribe_id: None,
45 override_subscribe_role: None,
46 override_subscribe_channel_name: None,
47 override_topic: None,
48 qos: None,
49 })
50 }
51
52 pub fn create_sender(name: &str) -> ChannelOptionsBuilder {
53 ChannelOptionsBuilder::ChannelSender(ChannelSenderOptions {
54 channel_name: String::from(name),
55 override_publish_id: None,
56 override_publish_role: None,
57 override_topic: None,
58 qos: None,
59 retain: None,
60 })
61 }
62
63 pub fn qos(mut self, qos: Option<i32>) -> Self {
64 match &mut self {
65 ChannelOptionsBuilder::ChannelReceiver(s) => s.qos = qos,
66 ChannelOptionsBuilder::ChannelSender(s) => s.qos = qos,
67 };
68 self
69 }
70
71 pub fn role(mut self, role: Option<&str>) -> Self {
81 match &mut self {
82 ChannelOptionsBuilder::ChannelReceiver(s) => {
83 if s.override_topic.is_some() {
84 error!("Override topic was also provided; this will take precedence");
85 } else {
86 s.override_subscribe_role = role.map(|s| s.into());
87 }
88 }
89 ChannelOptionsBuilder::ChannelSender(s) => {
90 if s.override_topic.is_some() {
91 error!("Override topic was also provided; this will take precedence");
92 } else {
93 s.override_publish_role = role.map(|s| s.into());
94 }
95 }
96 };
97 self
98 }
99
100 pub fn id(mut self, id: Option<&str>) -> Self {
111 match &mut self {
112 ChannelOptionsBuilder::ChannelReceiver(s) => {
113 if s.override_topic.is_some() {
114 error!("Override topic was also provided; this will take precedence");
115 } else {
116 s.override_subscribe_id = id.map(|s| s.into());
117 }
118 }
119 ChannelOptionsBuilder::ChannelSender(s) => {
120 if s.override_topic.is_some() {
121 error!("Override topic was also provided; this will take precedence");
122 } else {
123 s.override_publish_id = id.map(|s| s.into());
124 }
125 }
126 };
127 self
128 }
129
130 pub fn name(mut self, override_channel_name: Option<&str>) -> Self {
140 match &mut self {
141 ChannelOptionsBuilder::ChannelReceiver(opt) => {
142 if opt.override_topic.is_some() {
143 error!("Override topic was also provided; this will take precedence");
144 }
145 if override_channel_name.is_some() {
146 opt.override_subscribe_channel_name = override_channel_name.map(|s| s.into());
147 } else {
148 debug!("Override Channel name set to None; will use original name \"{}\" given in ::create_receiver constructor", opt.channel_name);
149 }
150 }
151 ChannelOptionsBuilder::ChannelSender(_) => {
152 error!(
153 "Channel Senders cannot change their name part after ::create_sender constructor"
154 );
155 }
156 };
157 self
158 }
159
160 pub fn any_channel(mut self) -> Self {
170 match &mut self {
171 ChannelOptionsBuilder::ChannelReceiver(opt) => {
172 opt.override_subscribe_channel_name = Some("+".into());
173 }
174 ChannelOptionsBuilder::ChannelSender(_) => {
175 error!(
176 "Channel Senders cannot change their name part after ::create_sender constructor"
177 );
178 }
179 }
180 self
181 }
182
183 pub fn topic(mut self, override_topic: Option<&str>) -> Self {
192 match override_topic {
193 Some(t) => {
194 if TryInto::<TetherCompliantTopic>::try_into(t).is_ok() {
195 info!("Custom topic passes Tether Compliant Topic validation");
196 } else if t == "#" {
197 info!("Wildcard \"#\" custom topics are not Tether Compliant Topics but are valid");
198 } else {
199 warn!(
200 "Could not convert \"{}\" into Tether Compliant Topic; presumably you know what you're doing!",
201 t
202 );
203 }
204 match &mut self {
205 ChannelOptionsBuilder::ChannelReceiver(s) => s.override_topic = Some(t.into()),
206 ChannelOptionsBuilder::ChannelSender(s) => s.override_topic = Some(t.into()),
207 };
208 }
209 None => {
210 match &mut self {
211 ChannelOptionsBuilder::ChannelReceiver(s) => s.override_topic = None,
212 ChannelOptionsBuilder::ChannelSender(s) => s.override_topic = None,
213 };
214 }
215 }
216 self
217 }
218
219 pub fn retain(mut self, should_retain: Option<bool>) -> Self {
220 match &mut self {
221 Self::ChannelReceiver(_) => {
222 error!("Cannot set retain flag on Receiver / subscription");
223 }
224 Self::ChannelSender(s) => {
225 s.retain = should_retain;
226 }
227 }
228 self
229 }
230
231 pub fn build(self, tether_agent: &mut TetherAgent) -> anyhow::Result<ChannelDefinition> {
234 match self {
235 Self::ChannelReceiver(channel_options) => {
236 let tpt: TetherOrCustomTopic = match channel_options.override_topic {
237 Some(custom) => TetherOrCustomTopic::Custom(custom),
238 None => {
239 debug!("Not a custom topic; provided overrides: role = {:?}, id = {:?}, name = {:?}", channel_options.override_subscribe_role, channel_options.override_subscribe_id, channel_options.override_subscribe_channel_name);
240
241 TetherOrCustomTopic::Tether(TetherCompliantTopic::new_for_subscribe(
242 &channel_options
243 .override_subscribe_channel_name
244 .unwrap_or(channel_options.channel_name.clone()),
245 channel_options.override_subscribe_role.as_deref(),
246 channel_options.override_subscribe_id.as_deref(),
247 ))
248 }
249 };
250 let channel_definition = ChannelReceiverDefinition::new(
251 &channel_options.channel_name,
252 tpt,
253 channel_options.qos,
254 );
255
256 if !tether_agent.auto_connect_enabled() {
258 warn!("Auto-connect is disabled, skipping subscription");
259 return Ok(ChannelDefinition::ChannelReceiver(channel_definition));
260 }
261
262 if let Some(client) = &tether_agent.client {
263 match client.subscribe(
264 channel_definition.generated_topic(),
265 match channel_definition.qos() {
266 0 => rumqttc::QoS::AtMostOnce,
267 1 => rumqttc::QoS::AtLeastOnce,
268 2 => rumqttc::QoS::ExactlyOnce,
269 _ => rumqttc::QoS::AtLeastOnce,
270 },
271 ) {
272 Ok(res) => {
273 debug!(
274 "This topic was fine: \"{}\"",
275 channel_definition.generated_topic()
276 );
277 debug!("Server respond OK for subscribe: {res:?}");
278 Ok(ChannelDefinition::ChannelReceiver(channel_definition))
279 }
280 Err(_e) => Err(anyhow!("ClientError")),
281 }
282 } else {
283 Err(anyhow!("Client not available for subscription"))
284 }
285 }
286 Self::ChannelSender(channel_options) => {
287 let tpt: TetherOrCustomTopic = match channel_options.override_topic {
288 Some(custom) => {
289 warn!(
290 "Custom topic override: \"{}\" - all other options ignored",
291 custom
292 );
293 TetherOrCustomTopic::Custom(custom)
294 }
295 None => {
296 let optional_id_part = match channel_options.override_publish_id {
297 Some(id) => {
298 debug!("Publish ID was overriden at Channel options level. The Agent ID will be ignored.");
299 Some(id)
300 }
301 None => {
302 debug!("Publish ID was not overriden at Channel options level. The Agent ID will be used instead, if specified in Agent creation.");
303 tether_agent.id().map(String::from)
304 }
305 };
306
307 TetherOrCustomTopic::Tether(TetherCompliantTopic::new_for_publish(
308 tether_agent,
309 &channel_options.channel_name,
310 channel_options.override_publish_role.as_deref(),
311 optional_id_part.as_deref(),
312 ))
313 }
314 };
315
316 let channel_definition = ChannelSenderDefinition::new(
317 &channel_options.channel_name,
318 tpt,
319 channel_options.qos,
320 channel_options.retain,
321 );
322 Ok(ChannelDefinition::ChannelSender(channel_definition))
323 }
324 }
325 }
326}
327
328#[cfg(test)]
329mod tests {
330
331 use crate::{ChannelOptionsBuilder, TetherAgentOptionsBuilder};
332
333 #[test]
340 fn default_receiver_channel() {
341 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
343 .auto_connect(false)
344 .build()
345 .expect("sorry, these tests require working localhost Broker");
346 let receiver = ChannelOptionsBuilder::create_receiver("one")
347 .build(&mut tether_agent)
348 .unwrap();
349 assert_eq!(receiver.name(), "one");
350 assert_eq!(receiver.generated_topic(), "+/one/#");
351 }
352
353 #[test]
354 fn default_channel_receiver_with_agent_custom_id() {
359 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
361 .auto_connect(false)
362 .id(Some("verySpecialGroup"))
363 .build()
364 .expect("sorry, these tests require working localhost Broker");
365 let receiver = ChannelOptionsBuilder::create_receiver("one")
366 .build(&mut tether_agent)
367 .unwrap();
368 assert_eq!(receiver.name(), "one");
369 assert_eq!(receiver.generated_topic(), "+/one/#");
370 }
371
372 #[test]
373 fn default_channel_sender() {
374 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
375 .auto_connect(false)
376 .build()
377 .expect("sorry, these tests require working localhost Broker");
378 let channel = ChannelOptionsBuilder::create_sender("two")
379 .build(&mut tether_agent)
380 .unwrap();
381 assert_eq!(channel.name(), "two");
382 assert_eq!(channel.generated_topic(), "tester/two");
383 }
384
385 #[test]
386 fn sender_channel_default_but_agent_id_custom() {
390 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
391 .auto_connect(false)
392 .id(Some("specialCustomGrouping"))
393 .build()
394 .expect("sorry, these tests require working localhost Broker");
395 let channel = ChannelOptionsBuilder::create_sender("somethingStandard")
396 .build(&mut tether_agent)
397 .unwrap();
398 assert_eq!(channel.name(), "somethingStandard");
399 assert_eq!(
400 channel.generated_topic(),
401 "tester/somethingStandard/specialCustomGrouping"
402 );
403 }
404
405 #[test]
406 fn receiver_id_andor_role() {
407 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
408 .auto_connect(false)
409 .build()
410 .expect("sorry, these tests require working localhost Broker");
411
412 let receive_role_only = ChannelOptionsBuilder::create_receiver("theChannel")
413 .role(Some("specificRole"))
414 .build(&mut tether_agent)
415 .unwrap();
416 assert_eq!(receive_role_only.name(), "theChannel");
417 assert_eq!(
418 receive_role_only.generated_topic(),
419 "specificRole/theChannel/#"
420 );
421
422 let receiver_id_only = ChannelOptionsBuilder::create_receiver("theChannel")
423 .id(Some("specificID"))
424 .build(&mut tether_agent)
425 .unwrap();
426 assert_eq!(receiver_id_only.name(), "theChannel");
427 assert_eq!(
428 receiver_id_only.generated_topic(),
429 "+/theChannel/specificID"
430 );
431
432 let receiver_both_custom = ChannelOptionsBuilder::create_receiver("theChannel")
433 .id(Some("specificID"))
434 .role(Some("specificRole"))
435 .build(&mut tether_agent)
436 .unwrap();
437 assert_eq!(receiver_both_custom.name(), "theChannel");
438 assert_eq!(
439 receiver_both_custom.generated_topic(),
440 "specificRole/theChannel/specificID"
441 );
442 }
443
444 #[test]
445 fn receiver_specific_id_andor_role_with_channel_name() {
450 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
451 .auto_connect(false)
452 .build()
453 .expect("sorry, these tests require working localhost Broker");
454
455 let receiver_role_only = ChannelOptionsBuilder::create_receiver("theChannel")
456 .role(Some("specificRole"))
457 .build(&mut tether_agent)
458 .unwrap();
459 assert_eq!(receiver_role_only.name(), "theChannel");
460 assert_eq!(
461 receiver_role_only.generated_topic(),
462 "specificRole/theChannel/#"
463 );
464
465 let receiver_id_only = ChannelOptionsBuilder::create_receiver("theChannel")
466 .id(Some("specificID"))
467 .build(&mut tether_agent)
468 .unwrap();
469 assert_eq!(receiver_id_only.name(), "theChannel");
470 assert_eq!(
471 receiver_id_only.generated_topic(),
472 "+/theChannel/specificID"
473 );
474
475 let receiver_both = ChannelOptionsBuilder::create_receiver("theChannel")
476 .id(Some("specificID"))
477 .role(Some("specificRole"))
478 .build(&mut tether_agent)
479 .unwrap();
480 assert_eq!(receiver_both.name(), "theChannel");
481 assert_eq!(
482 receiver_both.generated_topic(),
483 "specificRole/theChannel/specificID"
484 );
485 }
486
487 #[test]
488 fn receiver_specific_id_andor_role_no_channel_name() {
493 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
494 .auto_connect(false)
495 .build()
496 .expect("sorry, these tests require working localhost Broker");
497
498 let receiver_only_chanel_name_none = ChannelOptionsBuilder::create_receiver("theChannel")
499 .name(Some("+"))
500 .build(&mut tether_agent)
501 .unwrap();
502 assert_eq!(receiver_only_chanel_name_none.name(), "theChannel");
503 assert_eq!(receiver_only_chanel_name_none.generated_topic(), "+/+/#");
504
505 let receiver_role_only = ChannelOptionsBuilder::create_receiver("theChannel")
506 .name(Some("+"))
507 .role(Some("specificRole"))
508 .build(&mut tether_agent)
509 .unwrap();
510 assert_eq!(receiver_role_only.name(), "theChannel");
511 assert_eq!(receiver_role_only.generated_topic(), "specificRole/+/#");
512
513 let receiver_id_only = ChannelOptionsBuilder::create_receiver("theChannel")
514 .any_channel() .id(Some("specificID"))
517 .build(&mut tether_agent)
518 .unwrap();
519 assert_eq!(receiver_id_only.name(), "theChannel");
520 assert_eq!(receiver_id_only.generated_topic(), "+/+/specificID");
521
522 let receiver_both = ChannelOptionsBuilder::create_receiver("theChannel")
523 .name(Some("+"))
524 .id(Some("specificID"))
525 .role(Some("specificRole"))
526 .build(&mut tether_agent)
527 .unwrap();
528 assert_eq!(receiver_both.name(), "theChannel");
529 assert_eq!(receiver_both.generated_topic(), "specificRole/+/specificID");
530 }
531
532 #[test]
533 fn any_name_but_specify_role() {
534 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
537 .auto_connect(false)
538 .build()
539 .expect("sorry, these tests require working localhost Broker");
540
541 let receiver_any_channel = ChannelOptionsBuilder::create_receiver("aTest")
542 .any_channel()
543 .build(&mut tether_agent)
544 .unwrap();
545
546 assert_eq!(receiver_any_channel.name(), "aTest");
547 assert_eq!(receiver_any_channel.generated_topic(), "+/+/#");
548
549 let receiver_specify_role = ChannelOptionsBuilder::create_receiver("aTest")
550 .any_channel()
551 .role(Some("brain"))
552 .build(&mut tether_agent)
553 .unwrap();
554
555 assert_eq!(receiver_specify_role.name(), "aTest");
556 assert_eq!(receiver_specify_role.generated_topic(), "brain/+/#");
557 }
558
559 #[test]
560 fn sender_custom() {
561 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
562 .auto_connect(false)
563 .build()
564 .expect("sorry, these tests require working localhost Broker");
565
566 let sender_custom_role = ChannelOptionsBuilder::create_sender("theChannelSender")
567 .role(Some("customRole"))
568 .build(&mut tether_agent)
569 .unwrap();
570 assert_eq!(sender_custom_role.name(), "theChannelSender");
571 assert_eq!(
572 sender_custom_role.generated_topic(),
573 "customRole/theChannelSender"
574 );
575
576 let sender_custom_id = ChannelOptionsBuilder::create_sender("theChannelSender")
577 .id(Some("customID"))
578 .build(&mut tether_agent)
579 .unwrap();
580 assert_eq!(sender_custom_id.name(), "theChannelSender");
581 assert_eq!(
582 sender_custom_id.generated_topic(),
583 "tester/theChannelSender/customID"
584 );
585
586 let sender_custom_both = ChannelOptionsBuilder::create_sender("theChannelSender")
587 .role(Some("customRole"))
588 .id(Some("customID"))
589 .build(&mut tether_agent)
590 .unwrap();
591 assert_eq!(sender_custom_both.name(), "theChannelSender");
592 assert_eq!(
593 sender_custom_both.generated_topic(),
594 "customRole/theChannelSender/customID"
595 );
596 }
597
598 #[test]
599 fn receiver_manual_topics() {
600 let mut tether_agent = TetherAgentOptionsBuilder::new("tester")
601 .auto_connect(false)
602 .build()
603 .expect("sorry, these tests require working localhost Broker");
604
605 let receiver_all = ChannelOptionsBuilder::create_receiver("everything")
606 .topic(Some("#"))
607 .build(&mut tether_agent)
608 .unwrap();
609 assert_eq!(receiver_all.name(), "everything");
610 assert_eq!(receiver_all.generated_topic(), "#");
611
612 let receiver_nontether = ChannelOptionsBuilder::create_receiver("weird")
613 .topic(Some("foo/bar/baz/one/two/three"))
614 .build(&mut tether_agent)
615 .unwrap();
616 assert_eq!(receiver_nontether.name(), "weird");
617 assert_eq!(
618 receiver_nontether.generated_topic(),
619 "foo/bar/baz/one/two/three"
620 );
621 }
622}