pub struct TetherAgent { /* private fields */ }

Implementations§

source§

impl TetherAgent

source

pub fn client(&self) -> &Client

source

pub fn is_connected(&self) -> bool

source

pub fn role(&self) -> &str

source

pub fn id(&self) -> &str

source

pub fn description(&self) -> (&str, &str, &str)

Returns a tuple of the Agent Role, the ID (group) and the Broker URI, in that order

Examples found in repository?
examples/custom_options.rs (line 29)
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/// Example: customise agent and plug options through the builders.
///
/// Builds an agent with an explicit host, port and credentials, then builds one output
/// plug and two input plugs with overridden role, QoS, retain flag, topic and ID, and
/// prints each of them.
fn main() {
    let tether_agent = TetherAgentOptionsBuilder::new("example")
        .id(None)
        .host(Some("localhost"))
        .port(Some(1883))
        .username(Some("tether"))
        .password(Some("sp_ceB0ss!"))
        .build()
        .expect("failed to create Tether Agent");

    // Output plug that overrides the role, QoS and retain flag.
    let output_plug = PlugOptionsBuilder::create_output("anOutput")
        .role(Some("pretendingToBeSomethingElse"))
        .qos(Some(2))
        .retain(Some(true))
        .build(&tether_agent);

    // Input plug with an explicit topic instead of one derived from role/ID/name.
    let input_wildcard_plug = PlugOptionsBuilder::create_input("everything")
        .topic(Some("#"))
        .build(&tether_agent);

    let input_customid_plug = PlugOptionsBuilder::create_input("someData")
        .role(None) // i.e., just use default
        .id(Some("specificIDonly"))
        .build(&tether_agent);

    println!("Agent looks like this: {:?}", tether_agent.description());
    let (role, id, _) = tether_agent.description();
    assert_eq!(role, "example");
    assert_eq!(id, "any"); // because we set None

    if let PlugDefinition::OutputPlug(p) = output_plug.unwrap() {
        println!("output plug: {:?}", p);
        // The overridden role replaces the agent role in the generated topic.
        assert_eq!(p.topic_str(), "pretendingToBeSomethingElse/any/anOutput");
    }

    println!("wildcard input plug: {:?}", input_wildcard_plug);
    println!("specific ID input plug: {:?}", input_customid_plug);
}
More examples
Hide additional examples
examples/username_password.rs (line 28)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/// Example: connect with a username and password, then publish ten rounds of messages.
///
/// Each round sends an empty message, a one-element boolean payload and an encoded
/// `CustomStruct`, then pauses for one second.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Initialise logging straight from the builder; "info" is the fallback filter.
    Builder::from_env(Env::default().default_filter_or("info")).init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .host(Some("10.112.10.10"))
        .username(Some("connected.space"))
        .password(Some("connected.space"))
        .build()
        .expect("Failed to initialise and connect");

    // Only the first two elements of the description are logged.
    let description = agent.description();
    info!("Created agent OK: {}, {}", description.0, description.1);

    let empty_output = PlugOptionsBuilder::create_output("nothing")
        .build(&agent)
        .expect("failed to create output");
    let boolean_output = PlugOptionsBuilder::create_output("one")
        .build(&agent)
        .expect("failed to create output");
    let struct_output = PlugOptionsBuilder::create_output("two")
        .build(&agent)
        .expect("failed to create output");

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_output, None).unwrap();

        info!("#{i}: Sending boolean message...");
        // True on even rounds, false on odd ones.
        let is_even = i % 2 == 0;
        agent
            .publish(&boolean_output, Some(&[is_even.into()]))
            .unwrap();

        agent
            .encode_and_publish(
                &struct_output,
                CustomStruct {
                    foo: "hello".into(),
                    bar: 0.42,
                },
            )
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
examples/publish.rs (line 25)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/// Example: publish several kinds of message with a default-configured agent.
///
/// Creates five output plugs (two of them sharing the ID "groupMessages", one with a
/// fully custom topic) and publishes on all of them ten times, one second apart.
fn main() {
    println!("Rust Tether Agent publish example");

    // Initialise logging straight from the builder; "info" is the fallback filter.
    Builder::from_env(Env::default().default_filter_or("info")).init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .build()
        .expect("failed to connect Tether");

    // Only the first two elements of the description are logged.
    let description = agent.description();
    info!("Created agent OK: {}, {}", description.0, description.1);

    let empty_output = PlugOptionsBuilder::create_output("nothing")
        .build(&agent)
        .expect("failed to create output");
    let boolean_output = PlugOptionsBuilder::create_output("one")
        .build(&agent)
        .expect("failed to create output");
    let struct_output = PlugOptionsBuilder::create_output("two")
        .topic(Some("custom/custom/two".into()))
        .build(&agent)
        .expect("failed to create output");
    let first_grouped_output = PlugOptionsBuilder::create_output("one")
        .id(Some("groupMessages"))
        .build(&agent)
        .expect("failed to create output");
    let second_grouped_output = PlugOptionsBuilder::create_output("two")
        .id(Some("groupMessages"))
        .build(&agent)
        .expect("failed to create output");

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_output, None).unwrap();

        info!("#{i}: Sending boolean message...");
        // True on even rounds, false on odd ones.
        let is_even = i % 2 == 0;
        agent
            .publish(&boolean_output, Some(&[is_even.into()]))
            .unwrap();

        info!("#{i}: Sending custom struct message...");
        agent
            .encode_and_publish(
                &struct_output,
                CustomStruct {
                    id: i,
                    name: "hello".into(),
                },
            )
            .unwrap();

        info!("#{i}: Sending grouped messages...");
        // Same order as before: grouped output "one" first, then "two".
        for grouped_output in [&first_grouped_output, &second_grouped_output] {
            agent.publish(grouped_output, None).unwrap();
        }

        thread::sleep(Duration::from_secs(1));
    }
}
source

pub fn broker_uri(&self) -> &str

Return the URI (protocol, IP address, port, path) that was used to connect to the MQTT broker

source

pub fn set_role(&mut self, role: &str)

source

pub fn set_id(&mut self, id: &str)

source

pub fn connect(&self) -> Result<(), Error>

source

pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Message)>

If a message is waiting, returns `Some((topic, message))`; otherwise returns `None`. Messages received on topics that cannot be parsed as Tether Three Part Topics are returned with the complete topic string instead.

Examples found in repository?
examples/subscribe_threaded.rs (line 58)
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/// Example: check for incoming messages on a background thread and report them to the
/// main thread over a channel.
///
/// The agent is shared between threads as `Arc<Mutex<_>>`. The spawned thread loops
/// forever; the process ends when the main thread has counted ten reports and calls
/// `std::process::exit(0)`.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let tether_agent = Arc::new(Mutex::new(
        TetherAgentOptionsBuilder::new("RustDemoAgent")
            .id(Some("example"))
            .build()
            .expect("failed to init/connect"),
    ));

    // Blocking lock() is used for the one-off setup of the input plug.
    match tether_agent.lock() {
        Ok(a) => {
            // NOTE(review): the build result is discarded, so a failure to create the
            // input plug goes unnoticed here — confirm this is intended.
            let _input_plug = PlugOptionsBuilder::create_input("one").build(&a);
        }
        Err(e) => {
            panic!("Failed to acquire lock for Tether Agent setup: {}", e);
        }
    };

    // Channel carrying one formatted string per received message.
    let (tx, rx) = mpsc::channel();

    let receiver_agent = Arc::clone(&tether_agent);
    thread::spawn(move || {
        // NOTE(review): this message says "every 1s", but the loop below sleeps for
        // 1 millisecond between checks — confirm which one is intended.
        println!("Checking messages every 1s, until 10 messages received...");

        let mut message_count = 0;
        // let mut i = 0;

        loop {
            // i += 1;
            // println!("#{i}: Checking messages...");
            // try_lock() returns immediately instead of waiting for the lock.
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        message_count += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic:?}",);
                        tx.send(format!("received message #{message_count}"))
                            .expect("failed to send message via channel");
                    }
                }
                Err(e) => {
                    println!("Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_millis(1));
        }
    });

    let mut main_thread_received_count = 0;

    loop {
        println!("Main thread sleep...");
        // Drain whatever is queued right now without blocking; the loop variable
        // shadows the receiver only inside this `for` body.
        for rx in rx.try_iter() {
            main_thread_received_count += 1;
            println!(
                "<<<<<<<< MAIN THREAD: received {} (count: {})",
                rx, main_thread_received_count
            );
        }
        if main_thread_received_count >= 10 {
            println!("We're done!");
            // Exits the whole process, which also ends the checking thread.
            std::process::exit(0);
        }
        std::thread::sleep(Duration::from_secs(1));
    }
}
More examples
Hide additional examples
examples/subscribe_publish_threaded.rs (line 78)
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/// Example: publish from the main thread while a background thread checks for
/// incoming messages, sharing one agent through `Arc<Mutex<_>>`.
///
/// The main thread makes `publish_count_target` send attempts, `publish_interval`
/// seconds apart; the spawned thread checks for messages every `check_interval` seconds.
fn main() {
    // Intervals are in seconds (they are passed to Duration::from_secs_f32 below).
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    let tether_agent = Arc::new(Mutex::new(
        TetherAgentOptionsBuilder::new("RustDemoAgent")
            .id(Some("example"))
            .build()
            .expect("failed to init/connect"),
    ));

    println!("Set up tether agent OK");

    // Declared outside the lock scope so the plug remains usable after the guard is
    // released; the initial None is always overwritten or followed by a panic.
    #[allow(unused_assignments)]
    let mut output_plug = None;

    // Here we call .lock() because it is OK to block while "setting up", connecting
    if let Ok(a) = tether_agent.lock() {
        // NOTE(review): the input plug build result is discarded, so a failure to
        // create it goes unnoticed here — confirm this is intended.
        let _input_plug = PlugOptionsBuilder::create_input("one").build(&a);
        output_plug = Some(
            PlugOptionsBuilder::create_output("one")
                .build(&a)
                .expect("failed to create output plug"),
        );
    } else {
        panic!("Error setting up Tether Agent!");
    }

    let receiver_agent = Arc::clone(&tether_agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");

        let mut i = 0;
        let mut count_messages_received = 0;

        /*
         Infinite loop. But because we never join the threads, this thread will terminate
         as soon as the main thread does.
        */
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");

            /*
              Here we call try_lock() because we do not want to block
              if the Agent is currently locked by another thread.
              Just print a message, wait and try again later.
            */
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic:?}; Now has {count_messages_received} messages");
                    }
                }
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&tether_agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );
    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        /*
          In this particular case, lock() is preferable to try_lock() because
          we are not doing anything else on this thread. Waiting (blocking)
          to acquire the lock
          is fine; the other thread will let it go soon.
        */
        match sending_agent.lock() {
            Ok(a) => {
                // The counter is incremented before the publish call is made.
                count_messages_sent += 1;
                if let Some(plug) = &output_plug {
                    a.publish(plug, Some(&[0])).expect("Failed to publish");
                    println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
                }
            }
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
        }
        // The lock guard from the match above is already dropped before sleeping.
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
examples/subscribe.rs (line 67)
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/// Example: subscribe with several input plugs and dispatch incoming messages.
///
/// Creates five input plugs (default, role override, empty-message, wildcard topic and
/// ID-only), then checks for messages ten times, one second apart. Every waiting message
/// is drained on each check and compared against each plug with `matches`.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
    builder.init();

    debug!("Debugging is enabled; could be verbose");

    let tether_agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .id(Some("example"))
        .build()
        .expect("failed to init Tether agent");

    let input_one = PlugOptionsBuilder::create_input("one")
        .build(&tether_agent)
        .expect("failed to create input");
    debug!("input one {} = {}", input_one.name(), input_one.topic());
    let input_two = PlugOptionsBuilder::create_input("two")
        .role(Some("specific"))
        .build(&tether_agent)
        .expect("failed to create input");
    debug!("input two {} = {}", input_two.name(), input_two.topic());
    let input_empty = PlugOptionsBuilder::create_input("nothing")
        .build(&tether_agent)
        .expect("failed to create input");

    let input_everything = PlugOptionsBuilder::create_input("everything")
        .topic(Some("#"))
        .build(&tether_agent)
        .expect("failed to create input");

    // The ID must be "groupMessages": the log message and the assertion in the
    // ID-match branch below both expect that ID.
    let input_specify_id = PlugOptionsBuilder::create_input("groupMessages")
        .id(Some("groupMessages"))
        .name(None)
        .build(&tether_agent)
        .expect("failed to create input");

    debug!(
        "input everything {} = {}",
        input_everything.name(),
        input_everything.topic()
    );

    info!("Checking messages every 1s, 10x...");

    // Inclusive range so the loop really runs 10 times, as the log line above states.
    for i in 1..=10 {
        info!("#{i}: Checking for messages...");
        // Drain every message that is currently waiting before sleeping again.
        while let Some((topic, message)) = tether_agent.check_messages() {
            debug!(
                "........ Received a message topic {} => topic parts {:?}",
                message.topic(),
                topic
            );

            if input_one.matches(&topic) {
                info!(
                    "******** INPUT ONE:\n Received a message for plug named \"{}\" on topic {} with length {} bytes",
                    input_one.name(),
                    message.topic(),
                    message.payload().len()
                );
                assert_eq!(parse_plug_name(message.topic()), Some("one"));
            }
            if input_two.matches(&topic) {
                info!(
                    "******** INPUT TWO:\n Received a message for plug named \"{}\" on topic {} with length {} bytes",
                    input_two.name(),
                    message.topic(),
                    message.payload().len()
                );
                assert_eq!(parse_plug_name(message.topic()), Some("two"));
                assert_ne!(parse_plug_name(message.topic()), Some("one"));

                // Notice how you must give the from_slice function a type so it knows what to expect
                let decoded = from_slice::<CustomMessage>(message.payload());
                match decoded {
                    Ok(d) => {
                        info!("Yes, we decoded the MessagePack payload as: {:?}", d);
                        let CustomMessage { name, id } = d;
                        debug!("Name is {} and ID is {}", name, id);
                    }
                    Err(e) => {
                        warn!("Failed to decode the payload: {}", e)
                    }
                };
            }
            if input_empty.matches(&topic) {
                info!(
                    "******** EMPTY MESSAGE:\n Received a message for plug named \"{}\" on topic {} with length {} bytes",
                    input_empty.name(),
                    message.topic(),
                    message.payload().len()
                );
                assert_eq!(parse_plug_name(message.topic()), Some("nothing"));
            }
            if input_everything.matches(&topic) {
                info!(
                    "******** EVERYTHING MATCHES HERE:\n Received a message for plug named \"{}\" on topic {} with length {} bytes",
                    input_everything.name(),
                    message.topic(),
                    message.payload().len()
                );
            }
            if input_specify_id.matches(&topic) {
                info!("******** ID MATCH:\n Should match any role and plug name, but only messages with ID \"groupMessages\"");
                info!(
                    "\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_specify_id.name(),
                    message.topic(),
                    message.payload().len()
                );
                assert_eq!(parse_agent_id(message.topic()), Some("groupMessages"));
            }
        }

        thread::sleep(Duration::from_millis(1000))
    }
}
source

pub fn publish( &self, plug_definition: &PlugDefinition, payload: Option<&[u8]> ) -> Result<()>

Given a plug definition and a raw (u8 buffer) payload, generate a message on an appropriate topic and with the QOS specified in the Plug Definition

Examples found in repository?
examples/username_password.rs (line 43)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/// Example: connect with a username and password, then publish ten rounds of messages.
///
/// Each round sends an empty message, a one-element boolean payload and an encoded
/// `CustomStruct`, then pauses for one second.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Initialise logging straight from the builder; "info" is the fallback filter.
    Builder::from_env(Env::default().default_filter_or("info")).init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .host(Some("10.112.10.10"))
        .username(Some("connected.space"))
        .password(Some("connected.space"))
        .build()
        .expect("Failed to initialise and connect");

    // Only the first two elements of the description are logged.
    let description = agent.description();
    info!("Created agent OK: {}, {}", description.0, description.1);

    let empty_output = PlugOptionsBuilder::create_output("nothing")
        .build(&agent)
        .expect("failed to create output");
    let boolean_output = PlugOptionsBuilder::create_output("one")
        .build(&agent)
        .expect("failed to create output");
    let struct_output = PlugOptionsBuilder::create_output("two")
        .build(&agent)
        .expect("failed to create output");

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_output, None).unwrap();

        info!("#{i}: Sending boolean message...");
        // True on even rounds, false on odd ones.
        let is_even = i % 2 == 0;
        agent
            .publish(&boolean_output, Some(&[is_even.into()]))
            .unwrap();

        agent
            .encode_and_publish(
                &struct_output,
                CustomStruct {
                    foo: "hello".into(),
                    bar: 0.42,
                },
            )
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
More examples
Hide additional examples
examples/publish.rs (line 49)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/// Example: publish several kinds of message with a default-configured agent.
///
/// Creates five output plugs (two of them sharing the ID "groupMessages", one with a
/// fully custom topic) and publishes on all of them ten times, one second apart.
fn main() {
    println!("Rust Tether Agent publish example");

    // Initialise logging straight from the builder; "info" is the fallback filter.
    Builder::from_env(Env::default().default_filter_or("info")).init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .build()
        .expect("failed to connect Tether");

    // Only the first two elements of the description are logged.
    let description = agent.description();
    info!("Created agent OK: {}, {}", description.0, description.1);

    let empty_output = PlugOptionsBuilder::create_output("nothing")
        .build(&agent)
        .expect("failed to create output");
    let boolean_output = PlugOptionsBuilder::create_output("one")
        .build(&agent)
        .expect("failed to create output");
    let struct_output = PlugOptionsBuilder::create_output("two")
        .topic(Some("custom/custom/two".into()))
        .build(&agent)
        .expect("failed to create output");
    let first_grouped_output = PlugOptionsBuilder::create_output("one")
        .id(Some("groupMessages"))
        .build(&agent)
        .expect("failed to create output");
    let second_grouped_output = PlugOptionsBuilder::create_output("two")
        .id(Some("groupMessages"))
        .build(&agent)
        .expect("failed to create output");

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_output, None).unwrap();

        info!("#{i}: Sending boolean message...");
        // True on even rounds, false on odd ones.
        let is_even = i % 2 == 0;
        agent
            .publish(&boolean_output, Some(&[is_even.into()]))
            .unwrap();

        info!("#{i}: Sending custom struct message...");
        agent
            .encode_and_publish(
                &struct_output,
                CustomStruct {
                    id: i,
                    name: "hello".into(),
                },
            )
            .unwrap();

        info!("#{i}: Sending grouped messages...");
        // Same order as before: grouped output "one" first, then "two".
        for grouped_output in [&first_grouped_output, &second_grouped_output] {
            agent.publish(grouped_output, None).unwrap();
        }

        thread::sleep(Duration::from_secs(1));
    }
}
examples/error_handling.rs (line 65)
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/// Example: exercise the error paths of the agent and plug builders.
///
/// Each step either expects an `Err` (and logs it as a warning) or panics when an
/// operation that should fail succeeds: connecting with bad credentials, publishing and
/// subscribing on a disconnected agent, and subscribing to a malformed topic.
fn main() {
    let mut builder = Builder::from_env(Env::default().default_filter_or("info"));
    builder.init();

    debug!("Debugging is enabled; could be verbose");

    // Connecting with these credentials is expected to fail.
    let bad_tether_agent = TetherAgentOptionsBuilder::new("tester")
        .host(Some("tether-io.dev"))
        .username(Some("bla"))
        .password(Some("bla"))
        .build();
    match bad_tether_agent {
        Ok(_agent) => {
            panic!("Connection: This shouldn't work!");
        }
        Err(e) => warn!("Got a connection error as expected: {e:?}"),
    }

    // With auto_connect(false) the agent is built without connecting.
    let disconnected = TetherAgentOptionsBuilder::new("tester")
        .host(Some("tether-io.dev"))
        .auto_connect(false)
        .build()
        .expect("this ought to initialise but not connect");

    let output = PlugOptionsBuilder::create_output("values")
        .build(&disconnected)
        .expect("this output should be valid always");

    let an_array = &vec![0, 1, 2, 3];
    match disconnected.encode_and_publish(&output, an_array) {
        Ok(()) => panic!("Publish on disconnected agent: This shouldn't work!"),
        Err(e) => warn!("Got a not-connected error as expected: {e:?}"),
    }

    let input_on_disconnected = PlugOptionsBuilder::create_input("something");
    match input_on_disconnected.build(&disconnected) {
        Ok(_) => panic!("Input plug subscribe on disconnected client: This shouldn't work!"),
        Err(e) => warn!("Got a subscribe failure error as expected: {e:?}"),
    }

    // Rust's type-checking kind of prevents this happening at all!
    // let bad_payload: &[u8; 9] = &[0x87, 0xA3, 0x69, 0x6E, 0x74, 0x01, 0xA5, 0x66, 0x6C];
    // match working_tether_agent.encode_and_publish::<CustomStruct>(&output, bad_payload) {
    //     Ok(()) => panic!("Encoding: This shouldn't work!"),
    //     Err(e) => warn!("Got an encoding error as expected: {e:?}"),
    // }

    let working_tether_agent = TetherAgentOptionsBuilder::new("tester")
        .build()
        .expect("this should connect to local server");

    // Raw bytes are published as-is; this call is expected to succeed.
    let bad_payload: &[u8; 9] = &[0x87, 0xA3, 0x69, 0x6E, 0x74, 0x01, 0xA5, 0x66, 0x6C];
    working_tether_agent
        .publish(&output, Some(bad_payload))
        .expect("This will produce an error when DECODING, but not checked by library");

    let bad_topic_input = PlugOptionsBuilder::create_input("something").topic(Some("*/#/house+"));
    match bad_topic_input.build(&working_tether_agent) {
        Ok(_) => panic!("Weird topic: This shouldn't work!"),
        Err(e) => warn!("Got a subscribe error (bad topic) as expected: {e:?}"),
    }
}
examples/subscribe_publish_threaded.rs (line 109)
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/// Example: publish from the main thread while a background thread checks for
/// incoming messages, sharing one agent through `Arc<Mutex<_>>`.
///
/// The main thread makes `publish_count_target` send attempts, `publish_interval`
/// seconds apart; the spawned thread checks for messages every `check_interval` seconds.
fn main() {
    // Intervals are in seconds (they are passed to Duration::from_secs_f32 below).
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    let tether_agent = Arc::new(Mutex::new(
        TetherAgentOptionsBuilder::new("RustDemoAgent")
            .id(Some("example"))
            .build()
            .expect("failed to init/connect"),
    ));

    println!("Set up tether agent OK");

    // Declared outside the lock scope so the plug remains usable after the guard is
    // released; the initial None is always overwritten or followed by a panic.
    #[allow(unused_assignments)]
    let mut output_plug = None;

    // Here we call .lock() because it is OK to block while "setting up", connecting
    if let Ok(a) = tether_agent.lock() {
        // NOTE(review): the input plug build result is discarded, so a failure to
        // create it goes unnoticed here — confirm this is intended.
        let _input_plug = PlugOptionsBuilder::create_input("one").build(&a);
        output_plug = Some(
            PlugOptionsBuilder::create_output("one")
                .build(&a)
                .expect("failed to create output plug"),
        );
    } else {
        panic!("Error setting up Tether Agent!");
    }

    let receiver_agent = Arc::clone(&tether_agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");

        let mut i = 0;
        let mut count_messages_received = 0;

        /*
         Infinite loop. But because we never join the threads, this thread will terminate
         as soon as the main thread does.
        */
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");

            /*
              Here we call try_lock() because we do not want to block
              if the Agent is currently locked by another thread.
              Just print a message, wait and try again later.
            */
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic:?}; Now has {count_messages_received} messages");
                    }
                }
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&tether_agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );
    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        /*
          In this particular case, lock() is preferable to try_lock() because
          we are not doing anything else on this thread. Waiting (blocking)
          to acquire the lock
          is fine; the other thread will let it go soon.
        */
        match sending_agent.lock() {
            Ok(a) => {
                // The counter is incremented before the publish call is made.
                count_messages_sent += 1;
                if let Some(plug) = &output_plug {
                    a.publish(plug, Some(&[0])).expect("Failed to publish");
                    println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
                }
            }
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
        }
        // The lock guard from the match above is already dropped before sleeping.
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
source

pub fn encode_and_publish<T: Serialize>( &self, plug_definition: &PlugDefinition, data: T ) -> Result<()>

Similar to publish but serializes the data automatically before sending

Examples found in repository?
examples/username_password.rs (line 56)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/// Example: connect with a username and password, then publish ten rounds of messages.
///
/// Each round sends an empty message, a one-element boolean payload and an encoded
/// `CustomStruct`, then pauses for one second.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Initialise logging straight from the builder; "info" is the fallback filter.
    Builder::from_env(Env::default().default_filter_or("info")).init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .host(Some("10.112.10.10"))
        .username(Some("connected.space"))
        .password(Some("connected.space"))
        .build()
        .expect("Failed to initialise and connect");

    // Only the first two elements of the description are logged.
    let description = agent.description();
    info!("Created agent OK: {}, {}", description.0, description.1);

    let empty_output = PlugOptionsBuilder::create_output("nothing")
        .build(&agent)
        .expect("failed to create output");
    let boolean_output = PlugOptionsBuilder::create_output("one")
        .build(&agent)
        .expect("failed to create output");
    let struct_output = PlugOptionsBuilder::create_output("two")
        .build(&agent)
        .expect("failed to create output");

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_output, None).unwrap();

        info!("#{i}: Sending boolean message...");
        // True on even rounds, false on odd ones.
        let is_even = i % 2 == 0;
        agent
            .publish(&boolean_output, Some(&[is_even.into()]))
            .unwrap();

        agent
            .encode_and_publish(
                &struct_output,
                CustomStruct {
                    foo: "hello".into(),
                    bar: 0.42,
                },
            )
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
More examples
Hide additional examples
examples/publish.rs (line 63)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/// Example: build a Tether Agent with default options, create several output
/// plugs (default, custom-topic and custom-id), then publish ten rounds of
/// messages with a 1000 ms pause after each round.
fn main() {
    println!("Rust Tether Agent publish example");

    // Initialise logging; "info" is passed as the default filter.
    Builder::from_env(Env::default().default_filter_or("info")).init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgentOptionsBuilder::new("RustDemoAgent")
        .build()
        .expect("failed to connect Tether");

    // description() yields (role, id, broker URI); the URI is not needed here.
    let (agent_role, agent_id, _) = agent.description();
    info!("Created agent OK: {}, {}", agent_role, agent_id);

    // Plugs that rely entirely on the builder's defaults.
    let empty_plug = PlugOptionsBuilder::create_output("nothing")
        .build(&agent)
        .expect("failed to create output");
    let boolean_plug = PlugOptionsBuilder::create_output("one")
        .build(&agent)
        .expect("failed to create output");

    // Plug with an explicitly supplied topic string.
    let custom_plug = PlugOptionsBuilder::create_output("two")
        .topic(Some("custom/custom/two".into()))
        .build(&agent)
        .expect("failed to create output");

    // Two plugs that share the same explicit id.
    let grouped_plug_one = PlugOptionsBuilder::create_output("one")
        .id(Some("groupMessages"))
        .build(&agent)
        .expect("failed to create output");
    let grouped_plug_two = PlugOptionsBuilder::create_output("two")
        .id(Some("groupMessages"))
        .build(&agent)
        .expect("failed to create output");

    let pause = Duration::from_millis(1000);

    for i in 1..=10 {
        // 1) A message with no payload at all.
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_plug, None).unwrap();

        // 2) A single-element payload: true on even rounds, false on odd ones.
        info!("#{i}: Sending boolean message...");
        let is_even = i % 2 == 0;
        agent
            .publish(&boolean_plug, Some(&[is_even.into()]))
            .unwrap();

        // 3) A struct carrying the round number, encoded by the agent.
        info!("#{i}: Sending custom struct message...");
        agent
            .encode_and_publish(
                &custom_plug,
                CustomStruct {
                    id: i,
                    name: "hello".into(),
                },
            )
            .unwrap();

        // 4) Empty messages on both plugs that share the "groupMessages" id.
        info!("#{i}: Sending grouped messages...");
        for grouped_plug in [&grouped_plug_one, &grouped_plug_two] {
            agent.publish(grouped_plug, None).unwrap();
        }

        thread::sleep(pause);
    }
}
examples/error_handling.rs (line 41)
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/// Example: walks through several operations that are expected to fail and
/// checks that each one returns an `Err` (logged as a warning) rather than
/// succeeding. Any unexpected success panics.
fn main() {
    // Initialise logging; "info" is passed as the default filter.
    let mut builder = Builder::from_env(Env::default().default_filter_or("info"));
    builder.init();

    debug!("Debugging is enabled; could be verbose");

    // Scenario 1: building (and connecting) with these host/credentials is
    // expected to fail.
    let bad_tether_agent = TetherAgentOptionsBuilder::new("tester")
        .host(Some("tether-io.dev"))
        .username(Some("bla"))
        .password(Some("bla"))
        .build();
    match bad_tether_agent {
        Ok(_agent) => {
            panic!("Connection: This shouldn't work!");
        }
        Err(e) => warn!("Got a connection error as expected: {e:?}"),
    }

    // Scenario 2: with auto_connect(false) the build itself is expected to
    // succeed, leaving an agent that has not connected.
    let disconnected = TetherAgentOptionsBuilder::new("tester")
        .host(Some("tether-io.dev"))
        .auto_connect(false)
        .build()
        .expect("this ought to initialise but not connect");

    // Building an output plug is expected to succeed even without a connection.
    let output = PlugOptionsBuilder::create_output("values")
        .build(&disconnected)
        .expect("this output should be valid always");

    // Publishing through the unconnected agent must fail.
    let an_array = &vec![0, 1, 2, 3];
    match disconnected.encode_and_publish(&output, an_array) {
        Ok(()) => panic!("Publish on disconnected agent: This shouldn't work!"),
        Err(e) => warn!("Got a not-connected error as expected: {e:?}"),
    }

    // Building an input plug against the unconnected agent must fail too
    // (the messages below attribute this to the subscribe step).
    let input_on_disconnected = PlugOptionsBuilder::create_input("something");
    match input_on_disconnected.build(&disconnected) {
        Ok(_) => panic!("Input plug subscribe on disconnected client: This shouldn't work!"),
        Err(e) => warn!("Got a subscribe failure error as expected: {e:?}"),
    }

    // Rust's type-checking kind of prevents this happening at all!
    // let bad_payload: &[u8; 9] = &[0x87, 0xA3, 0x69, 0x6E, 0x74, 0x01, 0xA5, 0x66, 0x6C];
    // match working_tether_agent.encode_and_publish::<CustomStruct>(&output, bad_payload) {
    //     Ok(()) => panic!("Encoding: This shouldn't work!"),
    //     Err(e) => warn!("Got an encoding error as expected: {e:?}"),
    // }

    // Scenario 3: an agent built with default options, expected to connect.
    let working_tether_agent = TetherAgentOptionsBuilder::new("tester")
        .build()
        .expect("this should connect to local server");

    // Raw bytes are published as-is; this call is expected to succeed even
    // though the payload is deliberately bad (any failure would only show up
    // when a receiver tries to decode it).
    let bad_payload: &[u8; 9] = &[0x87, 0xA3, 0x69, 0x6E, 0x74, 0x01, 0xA5, 0x66, 0x6C];
    working_tether_agent
        .publish(&output, Some(bad_payload))
        .expect("This will produce an error when DECODING, but not checked by library");

    // Scenario 4: an input plug with a malformed topic must be rejected.
    let bad_topic_input = PlugOptionsBuilder::create_input("something").topic(Some("*/#/house+"));
    match bad_topic_input.build(&working_tether_agent) {
        Ok(_) => panic!("Weird topic: This shouldn't work!"),
        Err(e) => warn!("Got a subscribe error (bad topic) as expected: {e:?}"),
    }
}
source

pub fn publish_raw(&self, topic: &str, payload: &[u8], qos: Option<i32>, retained: Option<bool>) -> Result<()>

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.