1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
//! # Subscribe on one thread
//!
//! This example demonstrates checking for messages repeatedly on a separate, spawned thread.
//! If any messages are found, it will relay them to the other (main) thread via an
//! async Channel.
//!
//! To keep this example reasonably realistic, the message-checking thread loops more quickly
//! (1ms interval) while the main thread simulates being very busy with other things (1 second
//! interval).
//!
//! When the main thread comes to a non-empty "queue" of messages on the Receiver, it processes
//! them as quickly as possible. Once it has received at least 10 messages, it simply quits.
//!
//! **This example needs some published messages to be useful!** Try running the publish example
//! at the same time: `cargo run --example publish`
use std::{
sync::{mpsc, Arc, Mutex},
thread,
time::Duration,
};
use tether_agent::{PlugOptionsBuilder, TetherAgentOptionsBuilder};
fn main() {
println!("Rust Tether Agent subscribe example");
let tether_agent = Arc::new(Mutex::new(
TetherAgentOptionsBuilder::new("RustDemoAgent")
.id("example")
.build()
.expect("failed to init/connect"),
));
match tether_agent.lock() {
Ok(a) => {
let _input_plug = PlugOptionsBuilder::create_output("one").build(&a);
}
Err(e) => {
panic!("Failed to acquire lock for Tether Agent setup: {}", e);
}
};
let (tx, rx) = mpsc::channel();
let receiver_agent = Arc::clone(&tether_agent);
thread::spawn(move || {
println!("Checking messages every 1s, 10x...");
let mut message_count = 0;
// let mut i = 0;
loop {
// i += 1;
// println!("#{i}: Checking messages...");
match receiver_agent.try_lock() {
Ok(a) => {
if let Some((topic, _message)) = a.check_messages() {
message_count += 1;
println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}",);
tx.send(format!("received message #{message_count}"))
.expect("failed to send message via channel");
}
}
Err(e) => {
println!("Failed to acquire lock: {}", e);
}
}
thread::sleep(Duration::from_millis(1));
}
});
let mut main_thread_received_count = 0;
loop {
println!("Main thread sleep...");
for rx in rx.try_iter() {
main_thread_received_count += 1;
println!(
"<<<<<<<< MAIN THREAD: received {} (count: {})",
rx, main_thread_received_count
);
}
if main_thread_received_count >= 10 {
println!("We're done!");
std::process::exit(0);
}
std::thread::sleep(Duration::from_secs(1));
}
}