pub struct Bus { /* private fields */ }
A bus for passing messages across threads.
Nysa buses are fully thread-safe and can therefore be stored in static variables. The library
provides a “default” bus in the module crate::global.
Implementations§
Source§impl Bus
impl Bus
Sourcepub fn new() -> Self
pub fn new() -> Self
Creates a new bus.
Examples found in repository?
19fn local_bus() {
20 // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21 // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22 let bus = Arc::new(Bus::new());
23
24 let adder = {
25 // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26 // the thread, while still preserving our original instance.
27 let bus = Arc::clone(&bus);
28 std::thread::spawn(move || loop {
29 // The thread will wait indefinitely until it receives an Add message. It's good practice
30 // to specify which message you want to receive explicitly, as type inference can hide
31 // that information away.
32 match bus.wait_for::<Add>().consume() {
33 Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34 Add::Quit => break,
35 }
36 })
37 };
38
39 // Now that the adder thread is spun up and ready to go, we'll send it some requests to
40 // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41 // indeed block until new messages arrive.
42 bus.push(Add::Two(1, 2));
43 std::thread::sleep(Duration::from_secs(1));
44 bus.push(Add::Two(4, 5));
45 std::thread::sleep(Duration::from_secs(1));
46 // After all these requests, we'll send a final shutdown request, and wait until the thread
47 // finishes execution.
48 bus.push(Add::Quit);
49 adder.join().unwrap();
50
51 // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52 // is performing its job of draining the message queue, the bus for the given type of messages
53 // is locked and new messages will not be pushed until the loop finishes.
54 for message in &bus.retrieve_all::<AdditionResult>() {
55 let AdditionResult(x) = message.consume();
56 println!("{}", x);
57 }
58}Sourcepub fn push<T>(&self, message_data: T)where
T: 'static + Send,
pub fn push<T>(&self, message_data: T)where
T: 'static + Send,
Pushes a message with the given data onto the bus.
Examples found in repository?
19fn local_bus() {
20 // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21 // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22 let bus = Arc::new(Bus::new());
23
24 let adder = {
25 // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26 // the thread, while still preserving our original instance.
27 let bus = Arc::clone(&bus);
28 std::thread::spawn(move || loop {
29 // The thread will wait indefinitely until it receives an Add message. It's good practice
30 // to specify which message you want to receive explicitly, as type inference can hide
31 // that information away.
32 match bus.wait_for::<Add>().consume() {
33 Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34 Add::Quit => break,
35 }
36 })
37 };
38
39 // Now that the adder thread is spun up and ready to go, we'll send it some requests to
40 // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41 // indeed block until new messages arrive.
42 bus.push(Add::Two(1, 2));
43 std::thread::sleep(Duration::from_secs(1));
44 bus.push(Add::Two(4, 5));
45 std::thread::sleep(Duration::from_secs(1));
46 // After all these requests, we'll send a final shutdown request, and wait until the thread
47 // finishes execution.
48 bus.push(Add::Quit);
49 adder.join().unwrap();
50
51 // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52 // is performing its job of draining the message queue, the bus for the given type of messages
53 // is locked and new messages will not be pushed until the loop finishes.
54 for message in &bus.retrieve_all::<AdditionResult>() {
55 let AdditionResult(x) = message.consume();
56 println!("{}", x);
57 }
58}Sourcepub fn retrieve_all<'bus, T>(&'bus self) -> RetrieveAllRef<'bus, T>where
T: 'static + Send,
pub fn retrieve_all<'bus, T>(&'bus self) -> RetrieveAllRef<'bus, T>where
T: 'static + Send,
Retrieves all messages of the given type from the bus.
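Note that the bus for the given message type stays locked for the entire duration of the loop that iterates over the retrieved messages; new messages of that type will not be pushed until that loop finishes.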
§See also
Examples found in repository?
19fn local_bus() {
20 // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21 // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22 let bus = Arc::new(Bus::new());
23
24 let adder = {
25 // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26 // the thread, while still preserving our original instance.
27 let bus = Arc::clone(&bus);
28 std::thread::spawn(move || loop {
29 // The thread will wait indefinitely until it receives an Add message. It's good practice
30 // to specify which message you want to receive explicitly, as type inference can hide
31 // that information away.
32 match bus.wait_for::<Add>().consume() {
33 Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34 Add::Quit => break,
35 }
36 })
37 };
38
39 // Now that the adder thread is spun up and ready to go, we'll send it some requests to
40 // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41 // indeed block until new messages arrive.
42 bus.push(Add::Two(1, 2));
43 std::thread::sleep(Duration::from_secs(1));
44 bus.push(Add::Two(4, 5));
45 std::thread::sleep(Duration::from_secs(1));
46 // After all these requests, we'll send a final shutdown request, and wait until the thread
47 // finishes execution.
48 bus.push(Add::Quit);
49 adder.join().unwrap();
50
51 // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52 // is performing its job of draining the message queue, the bus for the given type of messages
53 // is locked and new messages will not be pushed until the loop finishes.
54 for message in &bus.retrieve_all::<AdditionResult>() {
55 let AdditionResult(x) = message.consume();
56 println!("{}", x);
57 }
58}Sourcepub fn wait_for<'bus, T>(&'bus self) -> Message<'bus, T>where
T: 'static + Send,
pub fn wait_for<'bus, T>(&'bus self) -> Message<'bus, T>where
T: 'static + Send,
Blocks execution in the current thread indefinitely until a message of the provided type arrives on the bus.
Although the Rust style guidelines say otherwise, to prevent bugs and aid readability it’s best to always use the turbofish syntax over implicit type inference with this function, so that it’s immediately visible what type of message is being waited for.
§See also
Examples found in repository?
19fn local_bus() {
20 // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21 // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22 let bus = Arc::new(Bus::new());
23
24 let adder = {
25 // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26 // the thread, while still preserving our original instance.
27 let bus = Arc::clone(&bus);
28 std::thread::spawn(move || loop {
29 // The thread will wait indefinitely until it receives an Add message. It's good practice
30 // to specify which message you want to receive explicitly, as type inference can hide
31 // that information away.
32 match bus.wait_for::<Add>().consume() {
33 Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34 Add::Quit => break,
35 }
36 })
37 };
38
39 // Now that the adder thread is spun up and ready to go, we'll send it some requests to
40 // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41 // indeed block until new messages arrive.
42 bus.push(Add::Two(1, 2));
43 std::thread::sleep(Duration::from_secs(1));
44 bus.push(Add::Two(4, 5));
45 std::thread::sleep(Duration::from_secs(1));
46 // After all these requests, we'll send a final shutdown request, and wait until the thread
47 // finishes execution.
48 bus.push(Add::Quit);
49 adder.join().unwrap();
50
51 // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52 // is performing its job of draining the message queue, the bus for the given type of messages
53 // is locked and new messages will not be pushed until the loop finishes.
54 for message in &bus.retrieve_all::<AdditionResult>() {
55 let AdditionResult(x) = message.consume();
56 println!("{}", x);
57 }
58}Sourcepub fn wait_for_timeout<'bus, T>(
&'bus self,
timeout: Duration,
) -> Option<Message<'bus, T>>where
T: 'static + Send,
pub fn wait_for_timeout<'bus, T>(
&'bus self,
timeout: Duration,
) -> Option<Message<'bus, T>>where
T: 'static + Send,
Blocks execution in the current thread until a message of the provided type arrives on the bus, or the given timeout is reached.
This function will block for at most the specified amount of time, after which execution resumes normally. To block indefinitely until a message arrives, use wait_for instead.
Returns Some(data) if data was successfully fetched, or None if the timeout was reached.