1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//! # Publish and Subscribe on separate threads
//!
//! Demonstrates checking for messages on one (spawned) thread while publishing messages on another
//! thread (in this, case the main thread). The main thread loops exactly `publish_count_target times`,
//! while the spawned thread - the one checking for messages - loops "inifitely".
//!
//! You may notice that the CHECKING LOOP occasionally throws an error from try_lock: this is
//! because both threads may attempt to access (lock) the reference to the Tether Agent at the same
//! time. This is **not** a problem because the CHECKING LOOP will simply find another opportunity to
//! check the messages again later.
//!
//! When the program ends, you should see `publish_count_target times` messages published and the same
//! number received, proving that nothing was "lost" between the threads and no deadlock situations
//! occurred.
//!

use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

use tether_agent::TetherAgent;

fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent subscribe example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    let mut output_plug = None;

    // Here we call .lock() because it is OK to block while "setting up", connecting
    if let Ok(a) = agent.lock() {
        a.connect(None, None).expect("failed to connect");
        a.create_input_plug("one", None, None)
            .expect("failed to create Input Plug");
        let plug = a
            .create_output_plug("one", None, None)
            .expect("failed to create Output Plug");
        output_plug = Some(plug);
    } else {
        panic!("Error setting up Tether Agent!");
    }

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");

        let mut i = 0;
        let mut count_messages_received = 0;

        /*
         Infinite loop. But because we never join the threads, this thread will terminate
         as soon as the main thread does.
        */
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");

            /*
              Here we call try_lock() because we do not want to block
              if the Agent is currently locked by another thread.
              Just print a message, wait and try again later.
            */
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );
    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        /*
          In this particular case, lock() is preferable to try_lock() because
          we are not doing anything else on this thread. Waiting (blocking)
          to acquire the lock
          is fine; the other thread will let it go soon.
        */
        match sending_agent.lock() {
            Ok(a) => {
                count_messages_sent += 1;
                if let Some(plug) = &output_plug {
                    a.publish(plug, Some(&[0])).expect("Failed to publish");
                    println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
                }
            }
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
        }
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}