usage/usage.rs
1// The following examples show how to use nysa buses for simple task orchestration between threads.
2
3use std::{sync::Arc, time::Duration};
4
5use nysa::Bus;
6
7// First, let's define some types.
8
// `Add` is the request type understood by the adder thread: it either asks for two numbers to be
// added, or tells the thread to stop.
//
// The standard traits are derived so that requests can be printed with `{:?}`, copied, and
// compared in assertions; all of them are trivially cheap for a type made only of `i32`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Add {
    // Add the two operands together.
    Two(i32, i32),
    // Leave the processing loop, letting the adder thread finish.
    Quit,
}
14
// An `AdditionResult` carries the sum computed for one `Add::Two` request. The adder thread
// pushes these onto the bus as its output.
//
// The standard traits are derived so that results can be printed with `{:?}`, copied, and
// compared in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AdditionResult(i32);
17
18// This example demonstrates how to use an explicitly-created bus.
19fn local_bus() {
20 // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21 // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22 let bus = Arc::new(Bus::new());
23
24 let adder = {
25 // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26 // the thread, while still preserving our original instance.
27 let bus = Arc::clone(&bus);
28 std::thread::spawn(move || loop {
29 // The thread will wait indefinitely until it receives an Add message. It's good practice
30 // to specify which message you want to receive explicitly, as type inference can hide
31 // that information away.
32 match bus.wait_for::<Add>().consume() {
33 Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34 Add::Quit => break,
35 }
36 })
37 };
38
39 // Now that the adder thread is spinned up and ready to go, we'll send it some requests to
40 // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41 // indeed block until new messages arrive.
42 bus.push(Add::Two(1, 2));
43 std::thread::sleep(Duration::from_secs(1));
44 bus.push(Add::Two(4, 5));
45 std::thread::sleep(Duration::from_secs(1));
46 // After all these requests, we'll send a final shutdown request, and wait until the thread
47 // finishes execution.
48 bus.push(Add::Quit);
49 adder.join().unwrap();
50
51 // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52 // is performing its job of draining the message queue, the bus for the given type of messages
53 // is locked and new messages will not be pushed until the loop finishes.
54 for message in &bus.retrieve_all::<AdditionResult>() {
55 let AdditionResult(x) = message.consume();
56 println!("{}", x);
57 }
58}
59
60// Now, let's go over the same example, but using a global bus.
61fn global_bus() {
62 use nysa::global as bus;
63
64 let adder = std::thread::spawn(move || loop {
65 // We use `nysa::global::wait_for` to retrieve messages from the global bus.
66 match bus::wait_for::<Add>().consume() {
67 Add::Two(a, b) => bus::push(AdditionResult(a + b)),
68 Add::Quit => break,
69 }
70 });
71
72 // We use `nysa::global::push` to push messages to the global bus.
73 bus::push(Add::Two(1, 2));
74 std::thread::sleep(Duration::from_secs(1));
75 bus::push(Add::Two(4, 5));
76 std::thread::sleep(Duration::from_secs(1));
77 bus::push(Add::Quit);
78 adder.join().unwrap();
79
80 // We use `nysa::global::retrieve_all` to retrieve all messages of a given type from the bus.
81 for message in &bus::retrieve_all::<AdditionResult>() {
82 let AdditionResult(x) = message.consume();
83 println!("{}", x);
84 }
85}
86
87fn main() {
88 local_bus();
89 global_bus();
90}