A small and lightweight crate to hide most of the burden of setting up long-living threads.
§Philosophy
This crate sees threads as unique entities called Workers, which may (or may not) live as long as the program lives. Workers are spawned in Runtimes, which manage them.
§Usage
Here’s a small example that spawns a worker that prints “Hello, World!” every 100ms for 1 second.
    use employees::{ControlFlow, Runtime, Worker};
    use std::time::Duration;

    struct WorkerThatPrints;

    impl Worker for WorkerThatPrints {
        fn on_update(&mut self) -> ControlFlow {
            println!("Hello, World!");
            std::thread::sleep(Duration::from_millis(100));
            ControlFlow::Continue
        }
    }

    let mut runtime = Runtime::new();
    runtime.launch(WorkerThatPrints);
    // Keep the main thread alive for 1 second while the worker prints.
    std::thread::sleep(Duration::from_secs(1));
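To make the model concrete, here is a minimal std-only sketch of what launching a worker could boil down to: a dedicated thread that calls on_update in a loop until the worker asks to stop. The ControlFlow and Worker definitions below are simplified local stand-ins, and the Break variant, CountingWorker, and the launch function are assumptions made for illustration, not the crate's actual implementation.

```rust
use std::thread::{self, JoinHandle};

/// Simplified stand-in for the crate's `ControlFlow`
/// (the `Break` variant is an assumption).
#[derive(Debug, PartialEq, Eq)]
enum ControlFlow {
    Continue,
    Break,
}

/// Simplified stand-in for the crate's `Worker` trait.
trait Worker: Send + 'static {
    fn on_update(&mut self) -> ControlFlow;
}

/// Hypothetical worker that counts its updates and stops after `limit` of them.
struct CountingWorker {
    updates: u32,
    limit: u32,
}

impl Worker for CountingWorker {
    fn on_update(&mut self) -> ControlFlow {
        self.updates += 1;
        if self.updates >= self.limit {
            ControlFlow::Break
        } else {
            ControlFlow::Continue
        }
    }
}

/// Hypothetical launch function: runs the update loop on a dedicated thread
/// and hands the worker back once it stops.
fn launch<W: Worker>(mut worker: W) -> JoinHandle<W> {
    thread::spawn(move || {
        while worker.on_update() == ControlFlow::Continue {}
        worker
    })
}
```

Joining the handle returned by launch(CountingWorker { updates: 0, limit: 5 }) yields the worker back after exactly five updates.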
§Features
Runtimes and Workers come with a set of features and helpers to set up and run them.
§Runtimes and non-'static things
Runtimes need workers with a 'static lifetime, so the following example won’t compile.
    struct WorkerThatPrints<'a>(&'a str);

    impl Worker for WorkerThatPrints<'_> {
        fn on_update(&mut self) -> ControlFlow {
            println!("Hello, {}!", self.0);
            ControlFlow::Continue
        }
    }

    let name = String::from("Alice");
    let worker = WorkerThatPrints(&name);
    let mut runtime = Runtime::new();
    runtime.launch(worker); // worker isn't 'static!
Fortunately, this crate provides ScopedRuntimes. They mirror classic Runtimes one-to-one, except that they need a scope.
    let name = String::from("Alice");
    let worker = WorkerThatPrints(&name);

    std::thread::scope(|scope| {
        // Let's create a scoped runtime
        let mut runtime = ScopedRuntime::new(scope);
        runtime.launch(worker); // Now, that works!
    });
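The reason a scope lifts the 'static requirement can be seen with the standard library alone: threads spawned inside std::thread::scope are guaranteed to be joined before the scope returns, so they may borrow local data. The sketch below illustrates that mechanism only. The greet_all function is a hypothetical helper, and nothing here is taken from ScopedRuntime itself.

```rust
use std::thread;

/// Builds one greeting per name on its own scoped thread, borrowing `names`
/// instead of moving or cloning it.
fn greet_all(names: &[String]) -> Vec<String> {
    thread::scope(|scope| {
        // Spawn every thread first so they can run concurrently.
        let handles: Vec<_> = names
            .iter()
            .map(|name| scope.spawn(move || format!("Hello, {name}!")))
            .collect();
        // Join in spawn order, so the output order matches the input order.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("a scoped thread panicked"))
            .collect()
    })
}
```

Because every scoped thread is joined before greet_all returns, the caller still owns the names afterwards and can keep using them.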
§Configuring the threads
Worker threads can be configured via the Settings type, which lets users set a thread’s stack size and name. Passing the settings alongside the worker to the Runtime::launch_with_settings function spawns the thread with those settings.
Users can also set the thread’s affinity by passing a list of CPU IDs to the Runtime::launch_pinned function.
The Runtime::launch_pinned_with_settings function does both.
    let mut runtime = Runtime::new();
    let settings = Settings::new().name("worker");
    // The thread will be named "worker" and pinned to the CPUs #0 and #1
    runtime.launch_pinned_with_settings(WorkerThatPrints, [0, 1], settings);
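For comparison, the standard library exposes the same two knobs (name and stack size) through std::thread::Builder. The sketch below shows them in isolation. It is only an illustration of the underlying std API, with spawn_named as a hypothetical helper, and it makes no claim about how Settings is implemented. CPU pinning has no std equivalent and is not covered here.

```rust
use std::io;
use std::thread;

/// Spawns a thread with the given name and stack size (in bytes), and returns
/// the name the thread observes for itself.
fn spawn_named(name: &str, stack_size: usize) -> io::Result<Option<String>> {
    let handle = thread::Builder::new()
        .name(name.to_owned())
        .stack_size(stack_size)
        // Unlike `thread::spawn`, `Builder::spawn` reports OS errors as `io::Result`.
        .spawn(|| thread::current().name().map(str::to_owned))?;
    Ok(handle.join().expect("the thread panicked"))
}
```

A named thread is easier to spot in debuggers and panic messages, which is the main practical benefit of setting a name on long-lived workers.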
§Contexts
Users may want to configure their workers via a builder pattern before spawning them in a runtime, somewhat like how the std::process::Command type works. This can be done by passing a type that implements the Context trait to the Runtime::launch_from_context function. Contexts also provide thread settings.
    struct WorkerContext {
        name: String,
    }

    impl Context for WorkerContext {
        type Target = WorkerThatPrints;

        fn into_worker(self) -> Result<Self::Target, Error> {
            Ok(WorkerThatPrints(self.name))
        }

        fn settings(&self) -> Settings {
            Settings::new().name("worker")
        }

        fn core_pinning(&self) -> Option<Vec<usize>> {
            Some(vec![0, 1])
        }
    }

    let mut runtime = Runtime::new();
    let context = WorkerContext { name: String::from("Alice") };
    runtime.launch_from_context(context);
§Respawning workers that panicked
Runtimes can respawn workers that panicked. This is achieved by implementing the RespawnableContext trait for a type and passing that type to the Runtime::launch_respawnable function.
When the Runtime::health_check function is called, the runtime checks every respawnable worker and respawns each one that has panicked, using its respective context.
    // A worker that panics some time after being spawned...
    struct PanickingWorker;

    impl Worker for PanickingWorker {
        fn on_update(&mut self) -> ControlFlow {
            std::thread::sleep(Duration::from_secs(1));
            panic!("panicking!")
        }
    }

    // ... and its context.
    struct WorkerContext;

    impl RespawnableContext<'_> for WorkerContext {
        fn boxed_worker(&self) -> Result<Box<dyn Worker>, Error> {
            Ok(Box::new(PanickingWorker))
        }
    }

    let mut runtime = Runtime::new();
    runtime.launch_respawnable(WorkerContext);
    std::thread::sleep(Duration::from_secs(1));
    runtime.health_check();
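The respawn-on-health-check idea can be sketched with the standard library alone: keep a factory that can rebuild the job, and on each check, join any finished thread and restart it if the join reports a panic. The Supervised type and its methods below are hypothetical names invented for this illustration. They are not the crate's API, and the crate may detect panics differently.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical supervisor: keeps a factory so the job can be rebuilt after a panic.
struct Supervised<F: Fn() + Send + Clone + 'static> {
    factory: F,
    handle: Option<JoinHandle<()>>,
    respawns: u32,
}

impl<F: Fn() + Send + Clone + 'static> Supervised<F> {
    /// Starts the job on a new thread and remembers how to start it again.
    fn launch(factory: F) -> Self {
        let handle = Some(thread::spawn(factory.clone()));
        Self { factory, handle, respawns: 0 }
    }

    /// Non-blocking check: respawns the thread only if it stopped because of a panic.
    fn health_check(&mut self) {
        let finished = self.handle.as_ref().is_some_and(|h| h.is_finished());
        if !finished {
            return;
        }
        let handle = self.handle.take().expect("a finished handle is present");
        // `join` returns `Err` when the thread panicked.
        if handle.join().is_err() {
            self.handle = Some(thread::spawn(self.factory.clone()));
            self.respawns += 1;
        }
    }
}
```

As in the example above, nothing is respawned until health_check is called, so a caller would typically invoke it periodically from the thread that owns the supervisor.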
§Inter-worker communication
This crate exposes two traits, Register and Connect, that enable communication between two workers.
These traits are channel- and direction-agnostic: virtually anything that can send or receive data or share state can be used as a Register::Endpoint.
    use std::sync::mpsc::{self, Receiver, Sender};

    // A producer worker.
    #[derive(Default)]
    struct Producer {
        sender: Option<Sender<u8>>,
    }

    impl Worker for Producer {
        fn on_update(&mut self) -> ControlFlow {
            std::thread::sleep(Duration::from_millis(100));
            if let Some(sender) = self.sender.as_ref() {
                sender.send(1).unwrap();
            }
            ControlFlow::Continue
        }
    }

    impl Connect<Consumer> for Producer {
        fn on_connection(&mut self, endpoint: Sender<u8>) {
            self.sender = Some(endpoint)
        }
    }

    // A consumer worker.
    struct Consumer {
        sender: Sender<u8>,
        recver: Receiver<u8>,
        count: u8,
    }

    impl Consumer {
        fn new() -> Self {
            let (sender, recver) = mpsc::channel();
            Self {
                sender,
                recver,
                count: 0,
            }
        }
    }

    impl Worker for Consumer {
        fn on_update(&mut self) -> ControlFlow {
            let val = self.recver.recv().unwrap();
            self.count += val;
            ControlFlow::Continue
        }
    }

    impl Register for Consumer {
        type Endpoint = Sender<u8>;

        fn register(&mut self, other: &mut impl Connect<Self>) {
            // Hand each connecting producer its own clone of the sender.
            other.on_connection(self.sender.clone())
        }
    }

    let mut consumer = Consumer::new();
    let mut prod1 = Producer::default();
    let mut prod2 = Producer::default();
    let mut prod3 = Producer::default();

    // Let's connect everything
    consumer.register(&mut prod1);
    consumer.register(&mut prod2);
    consumer.register(&mut prod3);

    // Launch the workers in a runtime
    let mut runtime = Runtime::new();
    runtime.launch(consumer);
    runtime.launch(prod1);
    runtime.launch(prod2);
    runtime.launch(prod3);
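Stripped of the traits, the example above is a fan-in over a std::sync::mpsc channel: each producer holds a clone of the same Sender, and one consumer drains the Receiver. The std-only sketch below shows that pattern in a form that terminates. The fan_in function is a hypothetical helper written for this illustration and is not part of the crate.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` ones to a single
/// consumer, and returns the total the consumer received.
fn fan_in(producers: usize, per_producer: usize) -> usize {
    let (sender, receiver) = mpsc::channel::<usize>();
    let handles: Vec<_> = (0..producers)
        .map(|_| {
            // Each producer gets its own clone of the sending endpoint.
            let sender = sender.clone();
            thread::spawn(move || {
                for _ in 0..per_producer {
                    sender.send(1).expect("the consumer hung up");
                }
            })
        })
        .collect();
    // Drop the original sender so the channel closes once every producer is done.
    drop(sender);
    // `iter` blocks for new messages and ends when all senders are dropped.
    let total: usize = receiver.iter().sum();
    for handle in handles {
        handle.join().expect("a producer panicked");
    }
    total
}
```

Dropping the original sender matters: while any Sender is alive, the receiving iterator keeps waiting, which is also why the Consumer in the example above, which stores its own Sender, never sees the channel close.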
§Timers
This crate re-exports all types from the minuteurs crate and implements the Worker and Register traits on the Timer type, allowing it to be used in runtimes.
Requires the timing feature.
    use employees::{Runtime, Worker, ControlFlow, Error, Register, Connect, Context};
    use employees::minuteurs::{Timer, Watcher};
    use std::time::Duration;

    // A worker that prints "Hello!" each time the timer ticks...
    struct WorkerThatPrints {
        watcher: Watcher,
    }

    impl Worker for WorkerThatPrints {
        fn on_update(&mut self) -> ControlFlow {
            if self.watcher.has_ticked() {
                println!("Hello!");
            }
            ControlFlow::Continue
        }
    }

    // ... and its context.
    struct WorkerThatPrintsContext {
        watcher: Option<Watcher>,
    }

    impl WorkerThatPrintsContext {
        fn new() -> Self {
            Self { watcher: None }
        }
    }

    impl Context for WorkerThatPrintsContext {
        type Target = WorkerThatPrints;

        fn into_worker(self) -> Result<Self::Target, Error> {
            Ok(WorkerThatPrints { watcher: self.watcher.expect("the context must be connected") })
        }
    }

    impl Connect<Timer> for WorkerThatPrintsContext {
        fn on_connection(&mut self, endpoint: Watcher) {
            let _ = self.watcher.insert(endpoint);
        }
    }

    let mut runtime = Runtime::new();
    // Set a timer that ticks every 100ms
    let mut timer = Timer::new(Duration::from_millis(100));
    let mut worker_ctx = WorkerThatPrintsContext::new();

    // Connect the worker (or rather, its context) to the timer
    timer.register(&mut worker_ctx);

    runtime.launch(timer).expect("failed to launch the timer");
    runtime.launch_from_context(worker_ctx).expect("failed to launch the worker");
Structs§
- A runtime that manages Worker threads.
- A runtime that manages scoped Worker threads.
- Used to configure the properties of a worker’s thread.
- Describes the running status of a Runtime.
Enums§
- Defines the control flow of Workers.
- Kinds of errors that might happen.
Traits§
- A type implementing this trait can be connected to some Register.
- Allows building a worker before actually launching it with the Runtime::launch_from_context function.
- A type that implements this trait can open a communication channel between itself and another type that implements the Connect trait for it.
- This trait is very similar to the Context trait but is needed for workers to be respawned.
- A worker represents a thread that runs for the lifetime of the program (or at least for a very long time).