usage/
usage.rs

1// The following examples show how to use nysa buses for simple task orchestration between threads.
2
3use std::{sync::Arc, time::Duration};
4
5use nysa::Bus;
6
7// First, let's define some types.
8
9// Add will be our request to add two numbers, or end the adder thread.
10enum Add {
11   Two(i32, i32),
12   Quit,
13}
14
15// AdditionResults will be pushed from the adder thread to the bus, as a result of addition.
16struct AdditionResult(i32);
17
18// This example demonstrates how to use an explicitly-created bus.
19fn local_bus() {
20   // The bus needs to be behind an `Arc`, such that we can share its instance between threads.
21   // Access to buses is inherently gated behind a mutex, so all we need to do is share the data.
22   let bus = Arc::new(Bus::new());
23
24   let adder = {
25      // Now we can spawn the adder thread. We need to clone the Arc such that we can send it into
26      // the thread, while still preserving our original instance.
27      let bus = Arc::clone(&bus);
28      std::thread::spawn(move || loop {
29         // The thread will wait indefinitely until it receives an Add message. It's good practice
30         // to specify which message you want to receive explicitly, as type inference can hide
31         // that information away.
32         match bus.wait_for::<Add>().consume() {
33            Add::Two(a, b) => bus.push(AdditionResult(a + b)),
34            Add::Quit => break,
35         }
36      })
37   };
38
39   // Now that the adder thread is spinned up and ready to go, we'll send it some requests to
40   // add numbers. The requests will be delayed by a second, to demonstrate that the thread will
41   // indeed block until new messages arrive.
42   bus.push(Add::Two(1, 2));
43   std::thread::sleep(Duration::from_secs(1));
44   bus.push(Add::Two(4, 5));
45   std::thread::sleep(Duration::from_secs(1));
46   // After all these requests, we'll send a final shutdown request, and wait until the thread
47   // finishes execution.
48   bus.push(Add::Quit);
49   adder.join().unwrap();
50
51   // Now we can take a look at our results and print them all out. Note that while `retrieve_all`
52   // is performing its job of draining the message queue, the bus for the given type of messages
53   // is locked and new messages will not be pushed until the loop finishes.
54   for message in &bus.retrieve_all::<AdditionResult>() {
55      let AdditionResult(x) = message.consume();
56      println!("{}", x);
57   }
58}
59
60// Now, let's go over the same example, but using a global bus.
61fn global_bus() {
62   use nysa::global as bus;
63
64   let adder = std::thread::spawn(move || loop {
65      // We use `nysa::global::wait_for` to retrieve messages from the global bus.
66      match bus::wait_for::<Add>().consume() {
67         Add::Two(a, b) => bus::push(AdditionResult(a + b)),
68         Add::Quit => break,
69      }
70   });
71
72   // We use `nysa::global::push` to push messages to the global bus.
73   bus::push(Add::Two(1, 2));
74   std::thread::sleep(Duration::from_secs(1));
75   bus::push(Add::Two(4, 5));
76   std::thread::sleep(Duration::from_secs(1));
77   bus::push(Add::Quit);
78   adder.join().unwrap();
79
80   // We use `nysa::global::retrieve_all` to retrieve all messages of a given type from the bus.
81   for message in &bus::retrieve_all::<AdditionResult>() {
82      let AdditionResult(x) = message.consume();
83      println!("{}", x);
84   }
85}
86
87fn main() {
88   local_bus();
89   global_bus();
90}