1use anyhow::anyhow;
2use log::{debug, error, info, warn};
3
4use crate::{
5 definitions::{InputPlugDefinition, OutputPlugDefinition, PlugDefinitionCommon},
6 three_part_topic::ThreePartTopic,
7 PlugDefinition, TetherAgent,
8};
9
10use super::three_part_topic::TetherOrCustomTopic;
11
/// Options collected for an Input Plug (a subscription) before it is built.
///
/// Values are filled in through [`PlugOptionsBuilder`]; the fields are private
/// so that the builder methods remain the only way to change them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputPlugOptions {
    /// Plug name given to `PlugOptionsBuilder::create_input`.
    plug_name: String,
    /// Requested QOS level; `None` leaves the choice to `InputPlugDefinition::new`.
    qos: Option<i32>,
    /// Role part to use in the subscribe topic instead of the default.
    override_subscribe_role: Option<String>,
    /// ID part to use in the subscribe topic instead of the default.
    override_subscribe_id: Option<String>,
    /// Plug-name part to use in the subscribe topic (e.g. the wildcard `"+"`);
    /// `plug_name` itself is not affected by this.
    override_subscribe_plug_name: Option<String>,
    /// Complete custom topic; when present it is used instead of a topic
    /// assembled from the role / ID / plug-name parts.
    override_topic: Option<String>,
}
20
/// Options collected for an Output Plug (a publisher) before it is built.
///
/// Values are filled in through [`PlugOptionsBuilder`]; the fields are private
/// so that the builder methods remain the only way to change them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutputPlugOptions {
    /// Plug name given to `PlugOptionsBuilder::create_output`.
    plug_name: String,
    /// Requested QOS level; `None` leaves the choice to `OutputPlugDefinition::new`.
    qos: Option<i32>,
    /// Role part to use in the publish topic instead of the default.
    override_publish_role: Option<String>,
    /// ID part to use in the publish topic instead of the default.
    override_publish_id: Option<String>,
    /// Complete custom topic; when present it is used instead of a topic
    /// assembled from the role / ID / plug-name parts.
    override_topic: Option<String>,
    /// Requested retain flag; `None` leaves the choice to `OutputPlugDefinition::new`.
    retain: Option<bool>,
}
29
30pub enum PlugOptionsBuilder {
35 InputPlugOptions(InputPlugOptions),
36 OutputPlugOptions(OutputPlugOptions),
37}
38
39impl PlugOptionsBuilder {
40 pub fn create_input(name: &str) -> PlugOptionsBuilder {
41 PlugOptionsBuilder::InputPlugOptions(InputPlugOptions {
42 plug_name: String::from(name),
43 override_subscribe_id: None,
44 override_subscribe_role: None,
45 override_subscribe_plug_name: None,
46 override_topic: None,
47 qos: None,
48 })
49 }
50
51 pub fn create_output(name: &str) -> PlugOptionsBuilder {
52 PlugOptionsBuilder::OutputPlugOptions(OutputPlugOptions {
53 plug_name: String::from(name),
54 override_publish_id: None,
55 override_publish_role: None,
56 override_topic: None,
57 qos: None,
58 retain: None,
59 })
60 }
61
62 pub fn qos(mut self, qos: Option<i32>) -> Self {
63 match &mut self {
64 PlugOptionsBuilder::InputPlugOptions(s) => s.qos = qos,
65 PlugOptionsBuilder::OutputPlugOptions(s) => s.qos = qos,
66 };
67 self
68 }
69
70 pub fn role(mut self, role: Option<&str>) -> Self {
80 match &mut self {
81 PlugOptionsBuilder::InputPlugOptions(s) => {
82 if s.override_topic.is_some() {
83 error!("Override topic was also provided; this will take precedence");
84 } else {
85 s.override_subscribe_role = role.map(|s| s.into());
86 }
87 }
88 PlugOptionsBuilder::OutputPlugOptions(s) => {
89 if s.override_topic.is_some() {
90 error!("Override topic was also provided; this will take precedence");
91 } else {
92 s.override_publish_role = role.map(|s| s.into());
93 }
94 }
95 };
96 self
97 }
98
99 pub fn id(mut self, id: Option<&str>) -> Self {
110 match &mut self {
111 PlugOptionsBuilder::InputPlugOptions(s) => {
112 if s.override_topic.is_some() {
113 error!("Override topic was also provided; this will take precedence");
114 } else {
115 s.override_subscribe_id = id.map(|s| s.into());
116 }
117 }
118 PlugOptionsBuilder::OutputPlugOptions(s) => {
119 if s.override_topic.is_some() {
120 error!("Override topic was also provided; this will take precedence");
121 } else {
122 s.override_publish_id = id.map(|s| s.into());
123 }
124 }
125 };
126 self
127 }
128
129 pub fn name(mut self, override_plug_name: Option<&str>) -> Self {
140 match &mut self {
141 PlugOptionsBuilder::InputPlugOptions(opt) => {
142 if opt.override_topic.is_some() {
143 error!("Override topic was also provided; this will take precedence");
144 }
145 if let Some(s) = override_plug_name {
146 if s.eq("+") {
147 info!(
148 "Plug Name part given is a wildcard; subscribe topic will use this but (internally) Plug Name will remain \"{}\"", &opt.plug_name
149 );
150 } else {
151 error!("Input Plugs cannot change their name after ::create_input constructor EXCEPT for wildcard \"+\"");
152 }
153 opt.override_subscribe_plug_name = override_plug_name.map(|s| s.into());
154 } else {
155 debug!("Override plug name set to None; will use original name \"{}\" given in ::create_input constructor", opt.plug_name);
156 }
157 }
158 PlugOptionsBuilder::OutputPlugOptions(_) => {
159 error!(
160 "Output Plugs cannot change their name part after ::create_output constructor"
161 );
162 }
163 };
164 self
165 }
166
167 pub fn any_plug(mut self) -> Self {
177 match &mut self {
178 PlugOptionsBuilder::InputPlugOptions(opt) => {
179 opt.override_subscribe_plug_name = Some("+".into());
180 }
181 PlugOptionsBuilder::OutputPlugOptions(_) => {
182 error!(
183 "Output Plugs cannot change their name part after ::create_output constructor"
184 );
185 }
186 }
187 self
188 }
189
190 pub fn topic(mut self, override_topic: Option<&str>) -> Self {
198 match override_topic {
199 Some(t) => {
200 if TryInto::<ThreePartTopic>::try_into(t).is_ok() {
201 info!("Custom topic passes Three Part Topic validation");
202 } else if t == "#" {
203 info!("Wildcard \"#\" custom topics are not Three Part Topics but are valid");
204 } else {
205 warn!(
206 "Could not convert \"{}\" into Tether 3 Part Topic; presumably you know what you're doing!",
207 t
208 );
209 }
210 match &mut self {
211 PlugOptionsBuilder::InputPlugOptions(s) => s.override_topic = Some(t.into()),
212 PlugOptionsBuilder::OutputPlugOptions(s) => s.override_topic = Some(t.into()),
213 };
214 }
215 None => {
216 match &mut self {
217 PlugOptionsBuilder::InputPlugOptions(s) => s.override_topic = None,
218 PlugOptionsBuilder::OutputPlugOptions(s) => s.override_topic = None,
219 };
220 }
221 }
222 self
223 }
224
225 pub fn retain(mut self, should_retain: Option<bool>) -> Self {
226 match &mut self {
227 Self::InputPlugOptions(_) => {
228 error!("Cannot set retain flag on Input Plug / subscription");
229 }
230 Self::OutputPlugOptions(s) => {
231 s.retain = should_retain;
232 }
233 }
234 self
235 }
236
237 pub fn build(self, tether_agent: &mut TetherAgent) -> anyhow::Result<PlugDefinition> {
240 match self {
241 Self::InputPlugOptions(plug_options) => {
242 let tpt: TetherOrCustomTopic = match plug_options.override_topic {
243 Some(custom) => TetherOrCustomTopic::Custom(custom),
244 None => {
245 debug!("Not a custom topic; provided overrides: role = {:?}, id = {:?}, name = {:?}", plug_options.override_subscribe_role, plug_options.override_subscribe_id, plug_options.override_subscribe_plug_name);
246
247 TetherOrCustomTopic::Tether(ThreePartTopic::new_for_subscribe(
248 &plug_options.plug_name,
249 plug_options.override_subscribe_role.as_deref(),
250 plug_options.override_subscribe_id.as_deref(),
251 plug_options.override_subscribe_plug_name.as_deref(),
252 ))
253 }
254 };
255 let plug_definition =
256 InputPlugDefinition::new(&plug_options.plug_name, tpt, plug_options.qos);
257 if let Some(client) = &tether_agent.client {
258 match client.subscribe(
259 plug_definition.topic_str(),
260 match plug_definition.qos() {
261 0 => rumqttc::QoS::AtMostOnce,
262 1 => rumqttc::QoS::AtLeastOnce,
263 2 => rumqttc::QoS::ExactlyOnce,
264 _ => rumqttc::QoS::AtLeastOnce,
265 },
266 ) {
267 Ok(res) => {
268 debug!("This topic was fine: \"{}\"", plug_definition.topic_str());
269 debug!("Server respond OK for subscribe: {res:?}");
270 Ok(PlugDefinition::InputPlug(plug_definition))
271 }
272 Err(_e) => Err(anyhow!("ClientError")),
273 }
274 } else {
275 Err(anyhow!("Client not available for subscription"))
276 }
277 }
278 Self::OutputPlugOptions(plug_options) => {
279 let tpt: TetherOrCustomTopic = match plug_options.override_topic {
280 Some(custom) => TetherOrCustomTopic::Custom(custom),
281 None => TetherOrCustomTopic::Tether(ThreePartTopic::new_for_publish(
282 plug_options.override_publish_role.as_deref(),
283 plug_options.override_publish_id.as_deref(),
284 &plug_options.plug_name,
285 tether_agent,
286 )),
287 };
288
289 let plug_definition = OutputPlugDefinition::new(
290 &plug_options.plug_name,
291 tpt,
292 plug_options.qos,
293 plug_options.retain,
294 );
295 Ok(PlugDefinition::OutputPlug(plug_definition))
296 }
297 }
298 }
299}
300
#[cfg(test)]
mod tests {

    use crate::{PlugOptionsBuilder, TetherAgentOptionsBuilder};

    /// Panic message used when no agent can be built for a test.
    const BROKER_REQUIRED: &str = "sorry, these tests require working localhost Broker";

    /// Builds an agent with role "tester" and otherwise default options.
    fn default_agent() -> crate::TetherAgent {
        TetherAgentOptionsBuilder::new("tester")
            .build()
            .expect(BROKER_REQUIRED)
    }

    /// Builds an agent with role "tester" and the given custom ID.
    fn agent_with_id(id: &str) -> crate::TetherAgent {
        TetherAgentOptionsBuilder::new("tester")
            .id(Some(id))
            .build()
            .expect(BROKER_REQUIRED)
    }

    /// An Input Plug without overrides subscribes to any role and any ID.
    #[test]
    fn default_input_plug() {
        let mut agent = default_agent();

        let plug = PlugOptionsBuilder::create_input("one")
            .build(&mut agent)
            .unwrap();

        assert_eq!(plug.name(), "one");
        assert_eq!(plug.topic(), "+/+/one");
    }

    /// A custom agent ID does not change the default subscribe topic.
    #[test]
    fn default_input_plug_with_agent_custom_id() {
        let mut agent = agent_with_id("verySpecialGroup");

        let plug = PlugOptionsBuilder::create_input("one")
            .build(&mut agent)
            .unwrap();

        assert_eq!(plug.name(), "one");
        assert_eq!(plug.topic(), "+/+/one");
    }

    /// An Output Plug without overrides publishes under the agent's role and "any".
    #[test]
    fn default_output_plug() {
        let mut agent = default_agent();

        let plug = PlugOptionsBuilder::create_output("two")
            .build(&mut agent)
            .unwrap();

        assert_eq!(plug.name(), "two");
        assert_eq!(plug.topic(), "tester/any/two");
    }

    /// A custom agent ID shows up in the default publish topic.
    #[test]
    fn output_plug_default_but_agent_id_custom() {
        let mut agent = agent_with_id("specialCustomGrouping");

        let plug = PlugOptionsBuilder::create_output("somethingStandard")
            .build(&mut agent)
            .unwrap();

        assert_eq!(plug.name(), "somethingStandard");
        assert_eq!(
            plug.topic(),
            "tester/specialCustomGrouping/somethingStandard"
        );
    }

    /// Role and/or ID overrides replace the matching wildcard of an Input Plug.
    #[test]
    fn input_id_andor_role() {
        let mut agent = default_agent();

        let with_role = PlugOptionsBuilder::create_input("thePlug")
            .role(Some("specificRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_role.name(), "thePlug");
        assert_eq!(with_role.topic(), "specificRole/+/thePlug");

        let with_id = PlugOptionsBuilder::create_input("thePlug")
            .id(Some("specificID"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_id.name(), "thePlug");
        assert_eq!(with_id.topic(), "+/specificID/thePlug");

        let with_both = PlugOptionsBuilder::create_input("thePlug")
            .id(Some("specificID"))
            .role(Some("specificRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_both.name(), "thePlug");
        assert_eq!(with_both.topic(), "specificRole/specificID/thePlug");
    }

    /// Same combinations as `input_id_andor_role`, keeping the plug-name part.
    #[test]
    fn input_specific_id_andor_role_with_plugname() {
        let mut agent = default_agent();

        let with_role = PlugOptionsBuilder::create_input("thePlug")
            .role(Some("specificRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_role.name(), "thePlug");
        assert_eq!(with_role.topic(), "specificRole/+/thePlug");

        let with_id = PlugOptionsBuilder::create_input("thePlug")
            .id(Some("specificID"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_id.name(), "thePlug");
        assert_eq!(with_id.topic(), "+/specificID/thePlug");

        let with_both = PlugOptionsBuilder::create_input("thePlug")
            .id(Some("specificID"))
            .role(Some("specificRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_both.name(), "thePlug");
        assert_eq!(with_both.topic(), "specificRole/specificID/thePlug");
    }

    /// With a wildcard plug-name part the Plug keeps its own name, but the
    /// topic ends in "+".
    #[test]
    fn input_specific_id_andor_role_no_plugname() {
        let mut agent = default_agent();

        let wildcard_only = PlugOptionsBuilder::create_input("thePlug")
            .name(Some("+"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(wildcard_only.name(), "thePlug");
        assert_eq!(wildcard_only.topic(), "+/+/+");

        let with_role = PlugOptionsBuilder::create_input("thePlug")
            .name(Some("+"))
            .role(Some("specificRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_role.name(), "thePlug");
        assert_eq!(with_role.topic(), "specificRole/+/+");

        // This case goes through `any_plug()` rather than `name(Some("+"))`.
        let with_id = PlugOptionsBuilder::create_input("thePlug")
            .any_plug()
            .id(Some("specificID"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_id.name(), "thePlug");
        assert_eq!(with_id.topic(), "+/specificID/+");

        let with_both = PlugOptionsBuilder::create_input("thePlug")
            .name(Some("+"))
            .id(Some("specificID"))
            .role(Some("specificRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_both.name(), "thePlug");
        assert_eq!(with_both.topic(), "specificRole/specificID/+");
    }

    /// Role and/or ID overrides replace the agent defaults of an Output Plug.
    #[test]
    fn output_custom() {
        let mut agent = default_agent();

        let with_role = PlugOptionsBuilder::create_output("theOutputPlug")
            .role(Some("customRole"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_role.name(), "theOutputPlug");
        assert_eq!(with_role.topic(), "customRole/any/theOutputPlug");

        let with_id = PlugOptionsBuilder::create_output("theOutputPlug")
            .id(Some("customID"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_id.name(), "theOutputPlug");
        assert_eq!(with_id.topic(), "tester/customID/theOutputPlug");

        let with_both = PlugOptionsBuilder::create_output("theOutputPlug")
            .role(Some("customRole"))
            .id(Some("customID"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(with_both.name(), "theOutputPlug");
        assert_eq!(with_both.topic(), "customRole/customID/theOutputPlug");
    }

    /// Custom topics are used verbatim, whether or not they have three parts.
    #[test]
    fn input_manual_topics() {
        let mut agent = default_agent();

        let catch_all = PlugOptionsBuilder::create_input("everything")
            .topic(Some("#"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(catch_all.name(), "everything");
        assert_eq!(catch_all.topic(), "#");

        let six_parts = PlugOptionsBuilder::create_input("weird")
            .topic(Some("foo/bar/baz/one/two/three"))
            .build(&mut agent)
            .unwrap();
        assert_eq!(six_parts.name(), "weird");
        assert_eq!(six_parts.topic(), "foo/bar/baz/one/two/three");
    }
}