Struct tether_agent::TetherAgent
source · pub struct TetherAgent { /* private fields */ }
Implementations§
source§impl TetherAgent
impl TetherAgent
pub fn is_connected(&self) -> bool
source · pub fn description(&self) -> (&str, &str)
pub fn description(&self) -> (&str, &str)
Returns the Agent Role and ID (group)
Examples found in repository?
examples/publish.rs (line 23)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Publish example.
///
/// Creates an agent pointed at a broker on `localhost`, connects without
/// credentials, registers three output plugs and then, ten times at
/// one-second intervals, publishes an empty message, a boolean message and a
/// `CustomStruct` message.
fn main() {
    println!("Rust Tether Agent publish example");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let broker_host = Some(String::from("localhost"));
    let agent = TetherAgent::new("RustDemoAgent", None, broker_host);
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    agent.connect(None, None).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            id: i,
            name: "hello".into(),
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
More examples
examples/username_password.rs (line 23)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/// Username/password example.
///
/// Same publishing sequence as the plain publish example, but the agent is
/// pointed at the broker host `10.112.10.10` and `connect` is given an
/// explicit user name and password.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some("10.112.10.10".into()));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    // `connect` takes the user name first and the password second.
    let user: Option<String> = Some("connected.space".into());
    let password: Option<String> = Some("connected.space".into());
    agent.connect(user, password).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            foo: "hello".into(),
            bar: 0.42,
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
sourcepub fn broker_uri(&self) -> &str
pub fn broker_uri(&self) -> &str
Return the URI (protocol, IP address, port, path) that was used to connect to the MQTT broker
pub fn set_role(&mut self, role: &str)
pub fn set_id(&mut self, id: &str)
source · pub fn new(role: &str, id: Option<&str>, broker_host: Option<String>) -> Self
pub fn new(role: &str, id: Option<&str>, broker_host: Option<String>) -> Self
Examples found in repository?
examples/publish.rs (line 22)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Publish example.
///
/// Creates an agent pointed at a broker on `localhost`, connects without
/// credentials, registers three output plugs and then, ten times at
/// one-second intervals, publishes an empty message, a boolean message and a
/// `CustomStruct` message.
fn main() {
    println!("Rust Tether Agent publish example");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let broker_host = Some(String::from("localhost"));
    let agent = TetherAgent::new("RustDemoAgent", None, broker_host);
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    agent.connect(None, None).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            id: i,
            name: "hello".into(),
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
More examples
examples/username_password.rs (line 22)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/// Username/password example.
///
/// Same publishing sequence as the plain publish example, but the agent is
/// pointed at the broker host `10.112.10.10` and `connect` is given an
/// explicit user name and password.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some("10.112.10.10".into()));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    // `connect` takes the user name first and the password second.
    let user: Option<String> = Some("connected.space".into());
    let password: Option<String> = Some("connected.space".into());
    agent.connect(user, password).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            foo: "hello".into(),
            bar: 0.42,
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
examples/subscribe_threaded.rs (lines 28-32)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Threaded subscribe example.
///
/// Shares one agent between two threads behind `Arc<Mutex<_>>`. A background
/// thread polls the agent for incoming messages and reports each one over an
/// `mpsc` channel; the main thread drains the channel once per second and
/// exits the process after ten reports.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Blocking on the lock is fine here: the polling thread is not spawned yet.
    match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
        }
        Err(e) => {
            panic!("Failed to acquire lock for Tether Agent setup: {}", e);
        }
    };

    let (tx, rx) = mpsc::channel();
    let receiver_agent = Arc::clone(&agent);

    // Single source of truth for the polling period, so the startup message
    // below cannot drift from the actual sleep.
    let check_interval = Duration::from_millis(1);

    thread::spawn(move || {
        println!("Checking messages every {:?}...", check_interval);
        let mut message_count = 0;

        // Runs until the process exits; there is no break.
        loop {
            // try_lock() so this thread never blocks on the agent; on failure
            // it just reports and retries after the next sleep.
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        message_count += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}");
                        tx.send(format!("received message #{message_count}"))
                            .expect("failed to send message via channel");
                    }
                }
                Err(e) => {
                    println!("Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(check_interval);
        }
    });

    let mut main_thread_received_count = 0;
    loop {
        println!("Main thread sleep...");

        // Drain whatever has queued up since the last pass, without blocking.
        for report in rx.try_iter() {
            main_thread_received_count += 1;
            println!(
                "<<<<<<<< MAIN THREAD: received {} (count: {})",
                report, main_thread_received_count
            );
        }

        if main_thread_received_count >= 10 {
            println!("We're done!");
            // The polling thread never returns, so end the whole process.
            std::process::exit(0);
        }

        std::thread::sleep(Duration::from_secs(1));
    }
}
examples/subscribe.rs (line 25)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/// Subscribe example.
///
/// Connects an agent, registers three input plugs and then checks for an
/// incoming message once per second, ten times. A message is matched to a
/// plug by name; a message for plug "two" is additionally decoded as a
/// `CustomMessage`.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
    builder.init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", Some("example"), None);

    agent.connect(None, None).expect("Failed to connect");

    let input_one = agent.create_input_plug("one", None, None).unwrap();
    let input_two = agent.create_input_plug("two", None, None).unwrap();
    let input_empty = agent.create_input_plug("nothing", None, None).unwrap();

    info!("Checking messages every 1s, 10x...");

    // Inclusive range: the message above promises ten checks, and `1..10`
    // would only perform nine.
    for i in 1..=10 {
        info!("#{i}: Checking for messages...");
        if let Some((plug_name, message)) = agent.check_messages() {
            if &input_one.name == plug_name.as_str() {
                info!(
                    "******** INPUT ONE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_one.name,
                    message.topic(),
                    message.payload().len()
                );
            }
            if &input_two.name == plug_name.as_str() {
                info!(
                    "******** INPUT TWO:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_two.name,
                    message.topic(),
                    message.payload().len()
                );
                // Notice how you must give the from_slice function a type so it knows what to expect
                let decoded = from_slice::<CustomMessage>(&message.payload());
                match decoded {
                    Ok(d) => {
                        info!("Yes, we decoded the MessagePack payload as: {:?}", d);
                        let CustomMessage { name, id } = d;
                        debug!("Name is {} and ID is {}", name, id);
                    }
                    Err(e) => {
                        // A payload that does not decode is logged and skipped.
                        warn!("Failed to decode the payload: {}", e)
                    }
                };
            }
            if &input_empty.name == plug_name.as_str() {
                info!(
                    "******** EMPTY MESSAGE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_empty.name,
                    message.topic(),
                    message.payload().len()
                );
            }
        }

        thread::sleep(Duration::from_millis(1000))
    }
}
examples/subscribe_publish_threaded.rs (lines 32-36)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Threaded publish-while-consuming example.
///
/// Shares one agent between two threads behind `Arc<Mutex<_>>`. A background
/// thread polls for incoming messages every `check_interval` seconds while
/// the main thread publishes `publish_count_target` messages, one every
/// `publish_interval` seconds, and then returns.
fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Here we call .lock() because it is OK to block while "setting up", connecting.
    // The output plug is bound directly from the setup result, so no Option
    // placeholder is needed and the publish site cannot silently skip sending.
    let output_plug = match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
            a.create_output_plug("one", None, None)
                .expect("failed to create Output Plug")
        }
        Err(_) => panic!("Error setting up Tether Agent!"),
    };

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");

        let mut i = 0;
        let mut count_messages_received = 0;

        /*
        Infinite loop. But because we never join the threads, this thread will terminate
        as soon as the main thread does.
        */
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");
            /*
            Here we call try_lock() because we do not want to block
            if the Agent is currently locked by another thread.
            Just print a message, wait and try again later.
            */
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );

    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        /*
        In this particular case, lock() is preferable to try_lock() because
        we are not doing anything else on this thread. Waiting (blocking)
        to acquire the lock
        is fine; the other thread will let it go soon.
        */
        match sending_agent.lock() {
            Ok(a) => {
                a.publish(&output_plug, Some(&[0]))
                    .expect("Failed to publish");
                // Count only after the publish call has succeeded.
                count_messages_sent += 1;
                println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
            }
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
        }
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
sourcepub fn connect(
&self,
user: Option<String>,
password: Option<String>
) -> Result<(), Error>
pub fn connect( &self, user: Option<String>, password: Option<String> ) -> Result<(), Error>
Examples found in repository?
examples/publish.rs (line 26)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Publish example.
///
/// Creates an agent pointed at a broker on `localhost`, connects without
/// credentials, registers three output plugs and then, ten times at
/// one-second intervals, publishes an empty message, a boolean message and a
/// `CustomStruct` message.
fn main() {
    println!("Rust Tether Agent publish example");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let broker_host = Some(String::from("localhost"));
    let agent = TetherAgent::new("RustDemoAgent", None, broker_host);
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    agent.connect(None, None).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            id: i,
            name: "hello".into(),
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
More examples
examples/username_password.rs (lines 27-30)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/// Username/password example.
///
/// Same publishing sequence as the plain publish example, but the agent is
/// pointed at the broker host `10.112.10.10` and `connect` is given an
/// explicit user name and password.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some("10.112.10.10".into()));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    // `connect` takes the user name first and the password second.
    let user: Option<String> = Some("connected.space".into());
    let password: Option<String> = Some("connected.space".into());
    agent.connect(user, password).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            foo: "hello".into(),
            bar: 0.42,
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
examples/subscribe_threaded.rs (line 36)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Threaded subscribe example.
///
/// Shares one agent between two threads behind `Arc<Mutex<_>>`. A background
/// thread polls the agent for incoming messages and reports each one over an
/// `mpsc` channel; the main thread drains the channel once per second and
/// exits the process after ten reports.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Blocking on the lock is fine here: the polling thread is not spawned yet.
    match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
        }
        Err(e) => {
            panic!("Failed to acquire lock for Tether Agent setup: {}", e);
        }
    };

    let (tx, rx) = mpsc::channel();
    let receiver_agent = Arc::clone(&agent);

    // Single source of truth for the polling period, so the startup message
    // below cannot drift from the actual sleep.
    let check_interval = Duration::from_millis(1);

    thread::spawn(move || {
        println!("Checking messages every {:?}...", check_interval);
        let mut message_count = 0;

        // Runs until the process exits; there is no break.
        loop {
            // try_lock() so this thread never blocks on the agent; on failure
            // it just reports and retries after the next sleep.
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        message_count += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}");
                        tx.send(format!("received message #{message_count}"))
                            .expect("failed to send message via channel");
                    }
                }
                Err(e) => {
                    println!("Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(check_interval);
        }
    });

    let mut main_thread_received_count = 0;
    loop {
        println!("Main thread sleep...");

        // Drain whatever has queued up since the last pass, without blocking.
        for report in rx.try_iter() {
            main_thread_received_count += 1;
            println!(
                "<<<<<<<< MAIN THREAD: received {} (count: {})",
                report, main_thread_received_count
            );
        }

        if main_thread_received_count >= 10 {
            println!("We're done!");
            // The polling thread never returns, so end the whole process.
            std::process::exit(0);
        }

        std::thread::sleep(Duration::from_secs(1));
    }
}
examples/subscribe.rs (line 27)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/// Subscribe example.
///
/// Connects an agent, registers three input plugs and then checks for an
/// incoming message once per second, ten times. A message is matched to a
/// plug by name; a message for plug "two" is additionally decoded as a
/// `CustomMessage`.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
    builder.init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", Some("example"), None);

    agent.connect(None, None).expect("Failed to connect");

    let input_one = agent.create_input_plug("one", None, None).unwrap();
    let input_two = agent.create_input_plug("two", None, None).unwrap();
    let input_empty = agent.create_input_plug("nothing", None, None).unwrap();

    info!("Checking messages every 1s, 10x...");

    // Inclusive range: the message above promises ten checks, and `1..10`
    // would only perform nine.
    for i in 1..=10 {
        info!("#{i}: Checking for messages...");
        if let Some((plug_name, message)) = agent.check_messages() {
            if &input_one.name == plug_name.as_str() {
                info!(
                    "******** INPUT ONE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_one.name,
                    message.topic(),
                    message.payload().len()
                );
            }
            if &input_two.name == plug_name.as_str() {
                info!(
                    "******** INPUT TWO:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_two.name,
                    message.topic(),
                    message.payload().len()
                );
                // Notice how you must give the from_slice function a type so it knows what to expect
                let decoded = from_slice::<CustomMessage>(&message.payload());
                match decoded {
                    Ok(d) => {
                        info!("Yes, we decoded the MessagePack payload as: {:?}", d);
                        let CustomMessage { name, id } = d;
                        debug!("Name is {} and ID is {}", name, id);
                    }
                    Err(e) => {
                        // A payload that does not decode is logged and skipped.
                        warn!("Failed to decode the payload: {}", e)
                    }
                };
            }
            if &input_empty.name == plug_name.as_str() {
                info!(
                    "******** EMPTY MESSAGE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_empty.name,
                    message.topic(),
                    message.payload().len()
                );
            }
        }

        thread::sleep(Duration::from_millis(1000))
    }
}
examples/subscribe_publish_threaded.rs (line 42)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Threaded publish-while-consuming example.
///
/// Shares one agent between two threads behind `Arc<Mutex<_>>`. A background
/// thread polls for incoming messages every `check_interval` seconds while
/// the main thread publishes `publish_count_target` messages, one every
/// `publish_interval` seconds, and then returns.
fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Here we call .lock() because it is OK to block while "setting up", connecting.
    // The output plug is bound directly from the setup result, so no Option
    // placeholder is needed and the publish site cannot silently skip sending.
    let output_plug = match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
            a.create_output_plug("one", None, None)
                .expect("failed to create Output Plug")
        }
        Err(_) => panic!("Error setting up Tether Agent!"),
    };

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");

        let mut i = 0;
        let mut count_messages_received = 0;

        /*
        Infinite loop. But because we never join the threads, this thread will terminate
        as soon as the main thread does.
        */
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");
            /*
            Here we call try_lock() because we do not want to block
            if the Agent is currently locked by another thread.
            Just print a message, wait and try again later.
            */
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );

    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        /*
        In this particular case, lock() is preferable to try_lock() because
        we are not doing anything else on this thread. Waiting (blocking)
        to acquire the lock
        is fine; the other thread will let it go soon.
        */
        match sending_agent.lock() {
            Ok(a) => {
                a.publish(&output_plug, Some(&[0]))
                    .expect("Failed to publish");
                // Count only after the publish call has succeeded.
                count_messages_sent += 1;
                println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
            }
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
        }
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
sourcepub fn create_input_plug(
&self,
name: &str,
qos: Option<i32>,
override_topic: Option<&str>
) -> Result<PlugDefinition, ()>
pub fn create_input_plug( &self, name: &str, qos: Option<i32>, override_topic: Option<&str> ) -> Result<PlugDefinition, ()>
Examples found in repository?
examples/subscribe_threaded.rs (line 37)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Threaded subscribe example.
///
/// Shares one agent between two threads behind `Arc<Mutex<_>>`. A background
/// thread polls the agent for incoming messages and reports each one over an
/// `mpsc` channel; the main thread drains the channel once per second and
/// exits the process after ten reports.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Blocking on the lock is fine here: the polling thread is not spawned yet.
    match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
        }
        Err(e) => {
            panic!("Failed to acquire lock for Tether Agent setup: {}", e);
        }
    };

    let (tx, rx) = mpsc::channel();
    let receiver_agent = Arc::clone(&agent);

    // Single source of truth for the polling period, so the startup message
    // below cannot drift from the actual sleep.
    let check_interval = Duration::from_millis(1);

    thread::spawn(move || {
        println!("Checking messages every {:?}...", check_interval);
        let mut message_count = 0;

        // Runs until the process exits; there is no break.
        loop {
            // try_lock() so this thread never blocks on the agent; on failure
            // it just reports and retries after the next sleep.
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        message_count += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}");
                        tx.send(format!("received message #{message_count}"))
                            .expect("failed to send message via channel");
                    }
                }
                Err(e) => {
                    println!("Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(check_interval);
        }
    });

    let mut main_thread_received_count = 0;
    loop {
        println!("Main thread sleep...");

        // Drain whatever has queued up since the last pass, without blocking.
        for report in rx.try_iter() {
            main_thread_received_count += 1;
            println!(
                "<<<<<<<< MAIN THREAD: received {} (count: {})",
                report, main_thread_received_count
            );
        }

        if main_thread_received_count >= 10 {
            println!("We're done!");
            // The polling thread never returns, so end the whole process.
            std::process::exit(0);
        }

        std::thread::sleep(Duration::from_secs(1));
    }
}
More examples
examples/subscribe.rs (line 29)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/// Subscribe example.
///
/// Connects an agent, registers three input plugs and then checks for an
/// incoming message once per second, ten times. A message is matched to a
/// plug by name; a message for plug "two" is additionally decoded as a
/// `CustomMessage`.
fn main() {
    println!("Rust Tether Agent subscribe example");

    let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
    builder.init();

    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", Some("example"), None);

    agent.connect(None, None).expect("Failed to connect");

    let input_one = agent.create_input_plug("one", None, None).unwrap();
    let input_two = agent.create_input_plug("two", None, None).unwrap();
    let input_empty = agent.create_input_plug("nothing", None, None).unwrap();

    info!("Checking messages every 1s, 10x...");

    // Inclusive range: the message above promises ten checks, and `1..10`
    // would only perform nine.
    for i in 1..=10 {
        info!("#{i}: Checking for messages...");
        if let Some((plug_name, message)) = agent.check_messages() {
            if &input_one.name == plug_name.as_str() {
                info!(
                    "******** INPUT ONE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_one.name,
                    message.topic(),
                    message.payload().len()
                );
            }
            if &input_two.name == plug_name.as_str() {
                info!(
                    "******** INPUT TWO:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_two.name,
                    message.topic(),
                    message.payload().len()
                );
                // Notice how you must give the from_slice function a type so it knows what to expect
                let decoded = from_slice::<CustomMessage>(&message.payload());
                match decoded {
                    Ok(d) => {
                        info!("Yes, we decoded the MessagePack payload as: {:?}", d);
                        let CustomMessage { name, id } = d;
                        debug!("Name is {} and ID is {}", name, id);
                    }
                    Err(e) => {
                        // A payload that does not decode is logged and skipped.
                        warn!("Failed to decode the payload: {}", e)
                    }
                };
            }
            if &input_empty.name == plug_name.as_str() {
                info!(
                    "******** EMPTY MESSAGE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_empty.name,
                    message.topic(),
                    message.payload().len()
                );
            }
        }

        thread::sleep(Duration::from_millis(1000))
    }
}
examples/subscribe_publish_threaded.rs (line 43)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Threaded publish-while-consuming example.
///
/// Shares one agent between two threads behind `Arc<Mutex<_>>`. A background
/// thread polls for incoming messages every `check_interval` seconds while
/// the main thread publishes `publish_count_target` messages, one every
/// `publish_interval` seconds, and then returns.
fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Here we call .lock() because it is OK to block while "setting up", connecting.
    // The output plug is bound directly from the setup result, so no Option
    // placeholder is needed and the publish site cannot silently skip sending.
    let output_plug = match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
            a.create_output_plug("one", None, None)
                .expect("failed to create Output Plug")
        }
        Err(_) => panic!("Error setting up Tether Agent!"),
    };

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");

        let mut i = 0;
        let mut count_messages_received = 0;

        /*
        Infinite loop. But because we never join the threads, this thread will terminate
        as soon as the main thread does.
        */
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");
            /*
            Here we call try_lock() because we do not want to block
            if the Agent is currently locked by another thread.
            Just print a message, wait and try again later.
            */
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );

    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        /*
        In this particular case, lock() is preferable to try_lock() because
        we are not doing anything else on this thread. Waiting (blocking)
        to acquire the lock
        is fine; the other thread will let it go soon.
        */
        match sending_agent.lock() {
            Ok(a) => {
                a.publish(&output_plug, Some(&[0]))
                    .expect("Failed to publish");
                // Count only after the publish call has succeeded.
                count_messages_sent += 1;
                println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
            }
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
        }
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
sourcepub fn create_output_plug(
&self,
name: &str,
qos: Option<i32>,
override_topic: Option<&str>
) -> Result<PlugDefinition, ()>
pub fn create_output_plug( &self, name: &str, qos: Option<i32>, override_topic: Option<&str> ) -> Result<PlugDefinition, ()>
Examples found in repository?
examples/publish.rs (line 29)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Publish example.
///
/// Creates an agent pointed at a broker on `localhost`, connects without
/// credentials, registers three output plugs and then, ten times at
/// one-second intervals, publishes an empty message, a boolean message and a
/// `CustomStruct` message.
fn main() {
    println!("Rust Tether Agent publish example");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let broker_host = Some(String::from("localhost"));
    let agent = TetherAgent::new("RustDemoAgent", None, broker_host);
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    agent.connect(None, None).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            id: i,
            name: "hello".into(),
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
More examples
examples/username_password.rs (line 34)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/// Username/password example.
///
/// Same publishing sequence as the plain publish example, but the agent is
/// pointed at the broker host `10.112.10.10` and `connect` is given an
/// explicit user name and password.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Log at "info" unless the environment asks for something else.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some("10.112.10.10".into()));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    // `connect` takes the user name first and the password second.
    let user: Option<String> = Some("connected.space".into());
    let password: Option<String> = Some("connected.space".into());
    agent.connect(user, password).expect("Failed to connect");

    // One plug per kind of payload sent below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload alternates: true on even rounds.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            foo: "hello".into(),
            bar: 0.42,
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_secs(1));
    }
}
examples/subscribe_publish_threaded.rs (line 46)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Example entry point: a spawned thread polls the shared agent for incoming
/// messages while the main thread publishes `publish_count_target` messages.
fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    // The agent is wrapped in Arc<Mutex<..>> so that both threads can use it.
    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Setup phase: a blocking lock() is acceptable here because the second
    // thread has not been spawned yet.
    let output_plug = match agent.lock() {
        Ok(guard) => {
            guard.connect(None, None).expect("failed to connect");
            guard
                .create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
            let plug = guard
                .create_output_plug("one", None, None)
                .expect("failed to create Output Plug");
            Some(plug)
        }
        Err(_) => panic!("Error setting up Tether Agent!"),
    };

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");
        let mut i = 0;
        let mut count_messages_received = 0;
        // This loop never exits by itself. The thread is never joined, so it
        // simply stops when the main thread finishes.
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");
            // try_lock() instead of lock(): if the other thread currently
            // holds the agent, report it and retry after the sleep below
            // rather than blocking.
            match receiver_agent.try_lock() {
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
                Ok(guard) => {
                    if let Some((topic, _message)) = guard.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );
    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        // lock() instead of try_lock(): this thread has nothing else to do,
        // so blocking until the polling thread releases the agent is fine.
        match sending_agent.lock() {
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
            Ok(guard) => {
                count_messages_sent += 1;
                if let Some(plug) = &output_plug {
                    guard.publish(plug, Some(&[0])).expect("Failed to publish");
                    println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
                }
            }
        }
        // The guard has been dropped by now, so the agent is free during the sleep.
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
sourcepub fn check_messages(&self) -> Option<(String, Message)>
pub fn check_messages(&self) -> Option<(String, Message)>
If a message is waiting, returns the Plug Name and the Message as a `(String, Message)` tuple.
Examples found in repository?
examples/subscribe_threaded.rs (line 59)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Example entry point: a background thread polls the shared agent for
/// messages and reports each one over a channel; the main thread drains the
/// channel once per second and exits the process once ten reports have
/// arrived.
fn main() {
    println!("Rust Tether Agent subscribe example");

    // The agent is wrapped in Arc<Mutex<..>> so the polling thread can use it.
    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Setup phase: a blocking lock() is fine, the polling thread does not
    // exist yet.
    match agent.lock() {
        Ok(a) => {
            a.connect(None, None).expect("failed to connect");
            a.create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
        }
        Err(e) => {
            panic!("Failed to acquire lock for Tether Agent setup: {}", e);
        }
    };

    let (tx, rx) = mpsc::channel();
    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        // The announcement matches what the loop does: it polls every
        // millisecond and never stops on its own; the process ends when the
        // main thread has counted ten reports.
        println!("Checking messages every 1ms until 10 have been received...");
        let mut message_count = 0;
        loop {
            // try_lock() so a busy agent is reported and retried, not waited on.
            match receiver_agent.try_lock() {
                Ok(a) => {
                    if let Some((topic, _message)) = a.check_messages() {
                        message_count += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}");
                        tx.send(format!("received message #{message_count}"))
                            .expect("failed to send message via channel");
                    }
                }
                Err(e) => {
                    println!("Failed to acquire lock: {}", e);
                }
            }
            thread::sleep(Duration::from_millis(1));
        }
    });

    let mut main_thread_received_count = 0;
    loop {
        println!("Main thread sleep...");
        // try_iter() yields only what is already queued and never blocks.
        for report in rx.try_iter() {
            main_thread_received_count += 1;
            println!(
                "<<<<<<<< MAIN THREAD: received {} (count: {})",
                report, main_thread_received_count
            );
        }
        if main_thread_received_count >= 10 {
            println!("We're done!");
            // Exiting the process also ends the polling thread, which is never joined.
            std::process::exit(0);
        }
        std::thread::sleep(Duration::from_secs(1));
    }
}
More examples
examples/subscribe.rs (line 37)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/// Example entry point: creates three input plugs and checks for a waiting
/// message ten times, one second apart, logging which plug each message
/// arrived on. Messages on plug "two" are additionally decoded as a
/// `CustomMessage`.
fn main() {
    println!("Rust Tether Agent subscribe example");

    // Logger setup; "debug" is the filter passed to `default_filter_or`.
    let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
    builder.init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", Some("example"), None);
    agent.connect(None, None).expect("Failed to connect");

    let input_one = agent.create_input_plug("one", None, None).unwrap();
    let input_two = agent.create_input_plug("two", None, None).unwrap();
    let input_empty = agent.create_input_plug("nothing", None, None).unwrap();

    info!("Checking messages every 1s, 10x...");
    // Inclusive range so that the loop really performs the ten checks that
    // the log line above announces (`1..10` would stop after nine).
    for i in 1..=10 {
        info!("#{i}: Checking for messages...");
        if let Some((plug_name, message)) = agent.check_messages() {
            // The returned name is compared against each plug in turn.
            if &input_one.name == plug_name.as_str() {
                info!(
                    "******** INPUT ONE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_one.name,
                    message.topic(),
                    message.payload().len()
                );
            }
            if &input_two.name == plug_name.as_str() {
                info!(
                    "******** INPUT TWO:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_two.name,
                    message.topic(),
                    message.payload().len()
                );
                // from_slice needs the target type spelled out so it knows
                // what to decode the payload into.
                let decoded = from_slice::<CustomMessage>(&message.payload());
                match decoded {
                    Ok(d) => {
                        info!("Yes, we decoded the MessagePack payload as: {:?}", d);
                        let CustomMessage { name, id } = d;
                        debug!("Name is {} and ID is {}", name, id);
                    }
                    Err(e) => {
                        warn!("Failed to decode the payload: {}", e)
                    }
                };
            }
            if &input_empty.name == plug_name.as_str() {
                info!(
                    "******** EMPTY MESSAGE:\n Received a message from plug named \"{}\" on topic {} with length {} bytes",
                    input_empty.name,
                    message.topic(),
                    message.payload().len()
                );
            }
        }
        thread::sleep(Duration::from_millis(1000));
    }
}
examples/subscribe_publish_threaded.rs (line 75)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Example entry point: a spawned thread polls the shared agent for incoming
/// messages while the main thread publishes `publish_count_target` messages.
fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    // The agent is wrapped in Arc<Mutex<..>> so that both threads can use it.
    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Setup phase: a blocking lock() is acceptable here because the second
    // thread has not been spawned yet.
    let output_plug = match agent.lock() {
        Ok(guard) => {
            guard.connect(None, None).expect("failed to connect");
            guard
                .create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
            let plug = guard
                .create_output_plug("one", None, None)
                .expect("failed to create Output Plug");
            Some(plug)
        }
        Err(_) => panic!("Error setting up Tether Agent!"),
    };

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");
        let mut i = 0;
        let mut count_messages_received = 0;
        // This loop never exits by itself. The thread is never joined, so it
        // simply stops when the main thread finishes.
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");
            // try_lock() instead of lock(): if the other thread currently
            // holds the agent, report it and retry after the sleep below
            // rather than blocking.
            match receiver_agent.try_lock() {
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
                Ok(guard) => {
                    if let Some((topic, _message)) = guard.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );
    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        // lock() instead of try_lock(): this thread has nothing else to do,
        // so blocking until the polling thread releases the agent is fine.
        match sending_agent.lock() {
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
            Ok(guard) => {
                count_messages_sent += 1;
                if let Some(plug) = &output_plug {
                    guard.publish(plug, Some(&[0])).expect("Failed to publish");
                    println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
                }
            }
        }
        // The guard has been dropped by now, so the agent is free during the sleep.
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
sourcepub fn publish(
&self,
plug: &PlugDefinition,
payload: Option<&[u8]>
) -> Result<(), ()>
pub fn publish( &self, plug: &PlugDefinition, payload: Option<&[u8]> ) -> Result<(), ()>
Given a plug definition and a raw (u8 buffer) payload, generates a message on an appropriate topic, with the QoS specified in the Plug Definition.
Examples found in repository?
examples/publish.rs (line 35)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Example entry point: creates an agent, calls `connect(None, None)`, then
/// publishes an empty, a boolean and a custom-struct message ten times,
/// pausing one second after each round.
fn main() {
    println!("Rust Tether Agent publish example");

    // Logger setup; "info" is the filter passed to `default_filter_or`.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some(String::from("localhost")));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);
    agent.connect(None, None).expect("Failed to connect");

    // One output plug per kind of message published in the loop below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload is a single element derived from the parity of `i`.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            id: i,
            name: "hello".into(),
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_millis(1000));
    }
}
More examples
examples/username_password.rs (line 40)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/// Example entry point: creates an agent, connects with two credential-style
/// arguments, then publishes an empty, a boolean and a custom-struct message
/// ten times, pausing one second after each round.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Logger setup; "info" is the filter passed to `default_filter_or`.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some("10.112.10.10".into()));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    // Both `connect` arguments receive the same string here; presumably they
    // are the username and the password, in that order (confirm against the
    // `connect` signature).
    agent
        .connect(
            Some("connected.space".into()),
            Some("connected.space".into()),
        )
        .expect("Failed to connect");

    // One output plug per kind of message published in the loop below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload is a single element derived from the parity of `i`.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            foo: "hello".into(),
            bar: 0.42,
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_millis(1000));
    }
}
examples/subscribe_publish_threaded.rs (line 106)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Example entry point: a spawned thread polls the shared agent for incoming
/// messages while the main thread publishes `publish_count_target` messages.
fn main() {
    let check_interval = 0.01;
    let publish_count_target = 100;
    let publish_interval = 0.1;

    println!("Rust Tether Agent threaded publish-while-consuming example");

    // The agent is wrapped in Arc<Mutex<..>> so that both threads can use it.
    let agent = Arc::new(Mutex::new(TetherAgent::new(
        "RustDemoAgent",
        Some("example"),
        None,
    )));

    // Setup phase: a blocking lock() is acceptable here because the second
    // thread has not been spawned yet.
    let output_plug = match agent.lock() {
        Ok(guard) => {
            guard.connect(None, None).expect("failed to connect");
            guard
                .create_input_plug("one", None, None)
                .expect("failed to create Input Plug");
            let plug = guard
                .create_output_plug("one", None, None)
                .expect("failed to create Output Plug");
            Some(plug)
        }
        Err(_) => panic!("Error setting up Tether Agent!"),
    };

    let receiver_agent = Arc::clone(&agent);
    thread::spawn(move || {
        println!("Checking messages every {check_interval}s...");
        let mut i = 0;
        let mut count_messages_received = 0;
        // This loop never exits by itself. The thread is never joined, so it
        // simply stops when the main thread finishes.
        loop {
            i += 1;
            println!("CHECKING LOOP: Checking messages attempt #{i}...");
            // try_lock() instead of lock(): if the other thread currently
            // holds the agent, report it and retry after the sleep below
            // rather than blocking.
            match receiver_agent.try_lock() {
                Err(e) => {
                    println!("CHECKING LOOP: Failed to acquire lock: {}", e);
                }
                Ok(guard) => {
                    if let Some((topic, _message)) = guard.check_messages() {
                        count_messages_received += 1;
                        println!("<<<<<<<< CHECKING LOOP: Received a message on topic {topic}; Now has {count_messages_received} messages");
                    }
                }
            }
            thread::sleep(Duration::from_secs_f32(check_interval));
        }
    });

    let sending_agent = Arc::clone(&agent);
    println!(
        "Sending a message, every {}s, exactly {}x times...",
        publish_interval, publish_count_target
    );
    let mut count_messages_sent = 0;
    for i in 1..=publish_count_target {
        println!("MAIN THREAD LOOP: Send attempt #{i}");
        // lock() instead of try_lock(): this thread has nothing else to do,
        // so blocking until the polling thread releases the agent is fine.
        match sending_agent.lock() {
            Err(e) => {
                panic!("MAIN THREAD LOOP: Failed to acquire lock: {}", e);
            }
            Ok(guard) => {
                count_messages_sent += 1;
                if let Some(plug) = &output_plug {
                    guard.publish(plug, Some(&[0])).expect("Failed to publish");
                    println!(">>>>>>>> MAIN THREAD LOOP: sent {count_messages_sent} messages");
                }
            }
        }
        // The guard has been dropped by now, so the agent is free during the sleep.
        thread::sleep(Duration::from_secs_f32(publish_interval));
    }
}
sourcepub fn encode_and_publish<T: Serialize>(
&self,
plug: &PlugDefinition,
data: T
) -> Result<(), ()>
pub fn encode_and_publish<T: Serialize>( &self, plug: &PlugDefinition, data: T ) -> Result<(), ()>
Similar to `publish`, but serializes the data automatically before sending.
Examples found in repository?
examples/publish.rs (line 48)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Example entry point: creates an agent, calls `connect(None, None)`, then
/// publishes an empty, a boolean and a custom-struct message ten times,
/// pausing one second after each round.
fn main() {
    println!("Rust Tether Agent publish example");

    // Logger setup; "info" is the filter passed to `default_filter_or`.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some(String::from("localhost")));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);
    agent.connect(None, None).expect("Failed to connect");

    // One output plug per kind of message published in the loop below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload is a single element derived from the parity of `i`.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            id: i,
            name: "hello".into(),
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_millis(1000));
    }
}
More examples
examples/username_password.rs (line 53)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/// Example entry point: creates an agent, connects with two credential-style
/// arguments, then publishes an empty, a boolean and a custom-struct message
/// ten times, pausing one second after each round.
fn main() {
    println!("Rust Tether Agent: with username and password");

    // Logger setup; "info" is the filter passed to `default_filter_or`.
    Builder::from_env(Env::default().default_filter_or("info")).init();
    debug!("Debugging is enabled; could be verbose");

    let agent = TetherAgent::new("RustDemoAgent", None, Some("10.112.10.10".into()));
    let (role, id) = agent.description();
    info!("Created agent OK: {}, {}", role, id);

    // Both `connect` arguments receive the same string here; presumably they
    // are the username and the password, in that order (confirm against the
    // `connect` signature).
    agent
        .connect(
            Some("connected.space".into()),
            Some("connected.space".into()),
        )
        .expect("Failed to connect");

    // One output plug per kind of message published in the loop below.
    let empty_message_output: tether_agent::PlugDefinition =
        agent.create_output_plug("nothing", None, None).unwrap();
    let boolean_message_output = agent.create_output_plug("one", None, None).unwrap();
    let custom_output = agent.create_output_plug("two", None, None).unwrap();

    for i in 1..=10 {
        info!("#{i}: Sending empty message...");
        agent.publish(&empty_message_output, None).unwrap();

        // The boolean payload is a single element derived from the parity of `i`.
        let is_even = i % 2 == 0;
        info!("#{i}: Sending boolean message...");
        agent
            .publish(&boolean_message_output, Some(&[is_even.into()]))
            .unwrap();

        let custom_message = CustomStruct {
            foo: "hello".into(),
            bar: 0.42,
        };
        agent
            .encode_and_publish(&custom_output, custom_message)
            .unwrap();

        thread::sleep(Duration::from_millis(1000));
    }
}
Auto Trait Implementations§
impl !RefUnwindSafe for TetherAgent
impl Send for TetherAgent
impl Sync for TetherAgent
impl Unpin for TetherAgent
impl !UnwindSafe for TetherAgent
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more