Bus

Struct Bus 

Source
pub struct Bus { /* private fields */ }
Expand description

A bus for passing messages across threads.

Nysa buses are fully thread-safe, and thus, can be stored in static variables. The library provides a “default” bus in the module crate::global.

Implementations§

Source§

impl Bus

Source

pub fn new() -> Self

Creates a new bus.

Examples found in repository?
examples/usage.rs (line 22)
19fn local_bus() {
20   // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21   // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22   let bus = Arc::new(Bus::new());
23
24   let adder = {
25      // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26      // the thread, while still preserving our original instance.
27      let bus = Arc::clone(&bus);
28      std::thread::spawn(move || loop {
29         // The thread will wait indefinitely until it receives an Add message. It's good practice
30         // to specify which message you want to receive explicitly, as type inference can hide
31         // that information away.
32         match bus.wait_for::<Add>().consume() {
33            Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34            Add::Quit => break,
35         }
36      })
37   };
38
39   // Now that the adder thread is spinned up and ready to go, we'll send it some requests to
40   // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41   // indeed block until new messages arrive.
42   bus.push(Add::Two(1, 2));
43   std::thread::sleep(Duration::from_secs(1));
44   bus.push(Add::Two(4, 5));
45   std::thread::sleep(Duration::from_secs(1));
46   // After all these requests, we'll send a final shutdown request, and wait until the thread
47   // finishes execution.
48   bus.push(Add::Quit);
49   adder.join().unwrap();
50
51   // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52   // is performing its job of draining the message queue, the bus for the given type of messages
53   // is locked and new messages will not be pushed until the loop finishes.
54   for message in &bus.retrieve_all::<AdditionResult>() {
55      let AdditionResult(x) = message.consume();
56      println!("{}", x);
57   }
58}
Source

pub fn push<T>(&self, message_data: T)
where T: 'static + Send,

Pushes a message with the given data onto the bus.

Examples found in repository?
examples/usage.rs (line 33)
19fn local_bus() {
20   // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21   // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22   let bus = Arc::new(Bus::new());
23
24   let adder = {
25      // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26      // the thread, while still preserving our original instance.
27      let bus = Arc::clone(&bus);
28      std::thread::spawn(move || loop {
29         // The thread will wait indefinitely until it receives an Add message. It's good practice
30         // to specify which message you want to receive explicitly, as type inference can hide
31         // that information away.
32         match bus.wait_for::<Add>().consume() {
33            Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34            Add::Quit => break,
35         }
36      })
37   };
38
39   // Now that the adder thread is spinned up and ready to go, we'll send it some requests to
40   // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41   // indeed block until new messages arrive.
42   bus.push(Add::Two(1, 2));
43   std::thread::sleep(Duration::from_secs(1));
44   bus.push(Add::Two(4, 5));
45   std::thread::sleep(Duration::from_secs(1));
46   // After all these requests, we'll send a final shutdown request, and wait until the thread
47   // finishes execution.
48   bus.push(Add::Quit);
49   adder.join().unwrap();
50
51   // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52   // is performing its job of draining the message queue, the bus for the given type of messages
53   // is locked and new messages will not be pushed until the loop finishes.
54   for message in &bus.retrieve_all::<AdditionResult>() {
55      let AdditionResult(x) = message.consume();
56      println!("{}", x);
57   }
58}
Source

pub fn retrieve_all<'bus, T>(&'bus self) -> RetrieveAllRef<'bus, T>
where T: 'static + Send,

Retrieves all messages of the given type from the bus.

Note that the bus for the given message type is locked for the entire duration of this loop.

§See also
Examples found in repository?
examples/usage.rs (line 54)
19fn local_bus() {
20   // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21   // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22   let bus = Arc::new(Bus::new());
23
24   let adder = {
25      // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26      // the thread, while still preserving our original instance.
27      let bus = Arc::clone(&bus);
28      std::thread::spawn(move || loop {
29         // The thread will wait indefinitely until it receives an Add message. It's good practice
30         // to specify which message you want to receive explicitly, as type inference can hide
31         // that information away.
32         match bus.wait_for::<Add>().consume() {
33            Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34            Add::Quit => break,
35         }
36      })
37   };
38
39   // Now that the adder thread is spinned up and ready to go, we'll send it some requests to
40   // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41   // indeed block until new messages arrive.
42   bus.push(Add::Two(1, 2));
43   std::thread::sleep(Duration::from_secs(1));
44   bus.push(Add::Two(4, 5));
45   std::thread::sleep(Duration::from_secs(1));
46   // After all these requests, we'll send a final shutdown request, and wait until the thread
47   // finishes execution.
48   bus.push(Add::Quit);
49   adder.join().unwrap();
50
51   // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52   // is performing its job of draining the message queue, the bus for the given type of messages
53   // is locked and new messages will not be pushed until the loop finishes.
54   for message in &bus.retrieve_all::<AdditionResult>() {
55      let AdditionResult(x) = message.consume();
56      println!("{}", x);
57   }
58}
Source

pub fn wait_for<'bus, T>(&'bus self) -> Message<'bus, T>
where T: 'static + Send,

Blocks execution in the current thread indefinitely until a message of the provided type arrives on the bus.

Although the Rust style guidelines say otherwise, to prevent bugs and aid readability it’s best to always use the turbofish syntax over implicit type inference with this function, such that it’s immediately visible what type of message is being waited for.

§See also
Examples found in repository?
examples/usage.rs (line 32)
19fn local_bus() {
20   // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21   // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22   let bus = Arc::new(Bus::new());
23
24   let adder = {
25      // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26      // the thread, while still preserving our original instance.
27      let bus = Arc::clone(&bus);
28      std::thread::spawn(move || loop {
29         // The thread will wait indefinitely until it receives an Add message. It's good practice
30         // to specify which message you want to receive explicitly, as type inference can hide
31         // that information away.
32         match bus.wait_for::<Add>().consume() {
33            Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34            Add::Quit => break,
35         }
36      })
37   };
38
39   // Now that the adder thread is spinned up and ready to go, we'll send it some requests to
40   // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41   // indeed block until new messages arrive.
42   bus.push(Add::Two(1, 2));
43   std::thread::sleep(Duration::from_secs(1));
44   bus.push(Add::Two(4, 5));
45   std::thread::sleep(Duration::from_secs(1));
46   // After all these requests, we'll send a final shutdown request, and wait until the thread
47   // finishes execution.
48   bus.push(Add::Quit);
49   adder.join().unwrap();
50
51   // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52   // is performing its job of draining the message queue, the bus for the given type of messages
53   // is locked and new messages will not be pushed until the loop finishes.
54   for message in &bus.retrieve_all::<AdditionResult>() {
55      let AdditionResult(x) = message.consume();
56      println!("{}", x);
57   }
58}
Source

pub fn wait_for_timeout<'bus, T>( &'bus self, timeout: Duration, ) -> Option<Message<'bus, T>>
where T: 'static + Send,

Blocks execution in the current thread until a message of the provided type arrives on the bus, or the given timeout is reached.

This function will block for the specified amount of time at most and resume execution normally. Otherwise it will block indefinitely.

Returns Some(data) if data was successfully fetched, or None if the timeout was reached.

§See also

Auto Trait Implementations§

§

impl !Freeze for Bus

§

impl RefUnwindSafe for Bus

§

impl Send for Bus

§

impl Sync for Bus

§

impl Unpin for Bus

§

impl UnwindSafe for Bus

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.