Crate employees


A small and lightweight crate that hides most of the burden of setting up long-lived threads.

§Philosophy

This crate sees threads as unique entities called Workers which may (or may not) live as long as the program lives. Workers are spawned in Runtimes that manage them.
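
As a rough mental model (not the crate's actual implementation), a worker can be pictured as a value whose update method is called in a loop on a dedicated thread, and a runtime as the owner of those threads. The sketch below uses only the standard library; `Flow`, `MiniWorker`, `MiniRuntime`, `Countdown` and `run_countdown` are hypothetical stand-ins invented for illustration, and the `Break` variant is an assumption about how a worker would ask to stop.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for the crate's `ControlFlow`.
enum Flow {
    Continue,
    Break,
}

// Hypothetical stand-in for the crate's `Worker` trait.
trait MiniWorker: Send + 'static {
    fn on_update(&mut self) -> Flow;
}

// Hypothetical stand-in for a runtime: it keeps the handles of its threads.
#[derive(Default)]
struct MiniRuntime {
    handles: Vec<JoinHandle<()>>,
}

impl MiniRuntime {
    // Spawns a thread that calls `on_update` until the worker asks to stop.
    fn launch<W: MiniWorker>(&mut self, mut worker: W) {
        let handle = thread::spawn(move || {
            while let Flow::Continue = worker.on_update() {}
        });
        self.handles.push(handle);
    }

    // Blocks until every launched worker has returned.
    fn join_all(&mut self) {
        for handle in self.handles.drain(..) {
            handle.join().expect("a worker panicked");
        }
    }
}

// A worker that records a fixed number of updates, then stops.
struct Countdown {
    remaining: u32,
    ticks: Arc<AtomicU32>,
}

impl MiniWorker for Countdown {
    fn on_update(&mut self) -> Flow {
        if self.remaining == 0 {
            return Flow::Break;
        }
        self.remaining -= 1;
        self.ticks.fetch_add(1, Ordering::SeqCst);
        Flow::Continue
    }
}

// Runs one `Countdown` worker to completion and returns the number of updates it recorded.
fn run_countdown(updates: u32) -> u32 {
    let ticks = Arc::new(AtomicU32::new(0));
    let mut runtime = MiniRuntime::default();
    runtime.launch(Countdown { remaining: updates, ticks: Arc::clone(&ticks) });
    runtime.join_all();
    ticks.load(Ordering::SeqCst)
}
```

Calling `run_countdown(5)` launches one worker, waits for it to stop, and reports the five updates it performed.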

§Usage

Here’s a small example that spawns a worker that prints “Hello, World!” every 100ms for 1 second.

use employees::{ControlFlow, Runtime, Worker};
use std::time::Duration;

struct WorkerThatPrints;
impl Worker for WorkerThatPrints {
    fn on_update(&mut self) -> ControlFlow {
        println!("Hello, World!");
        std::thread::sleep(Duration::from_millis(100));
        ControlFlow::Continue
    }
}

let mut runtime = Runtime::new();

runtime.launch(WorkerThatPrints);
std::thread::sleep(Duration::from_secs(1));

§Features

Runtimes and Workers come with a set of features and helpers to set them up and run them.

§Runtimes and non-'static things

Runtimes require their workers to be 'static, so the following example won’t compile.

struct WorkerThatPrints<'a>(&'a str);
impl Worker for WorkerThatPrints<'_> {
    fn on_update(&mut self) -> ControlFlow {
        println!("Hello, {}!", self.0);
        ControlFlow::Continue
    }
}

let name = String::from("Alice");
let worker = WorkerThatPrints(&name);

let mut runtime = Runtime::new();
runtime.launch(worker); // worker isn't 'static!

Fortunately, this crate provides ScopedRuntimes. They mirror regular Runtimes one to one, except that they must be created from a thread scope.

let name = String::from("Alice");
let worker = WorkerThatPrints(&name);

std::thread::scope(|scope| {
    // Let's create a scoped runtime
    let mut runtime = ScopedRuntime::new(scope);

    runtime.launch(worker); // Now, that works!
})
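
The reason a scope lifts the 'static requirement can be seen with the standard library alone: threads spawned through `std::thread::scope` are guaranteed to be joined before the scope returns, so they may borrow data owned by the caller. The following is a minimal sketch of that mechanism; `greet_all` is a hypothetical helper and does not use this crate's API.

```rust
use std::thread;

// Spawns one scoped thread per name. Each thread borrows its name from the
// caller instead of owning it, which a plain `thread::spawn` would reject.
fn greet_all(names: &[String]) -> Vec<String> {
    thread::scope(|scope| {
        let handles: Vec<_> = names
            .iter()
            .map(|name| scope.spawn(move || format!("Hello, {name}!")))
            .collect();

        // Every thread is joined before the scope ends, so the borrows stay valid.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("a scoped thread panicked"))
            .collect()
    })
}
```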

§Configuring the threads

Worker threads can be configured via the Settings type, which lets users set a thread’s stack size and name. Passing the settings alongside the worker to the Runtime::launch_with_settings function spawns the thread with those settings.

Users can also set the thread’s affinity by passing a list of CPU IDs to the Runtime::launch_pinned function.

The Runtime::launch_pinned_with_settings function does both.

let mut runtime = Runtime::new();
let settings = Settings::new().name("worker");

// The thread will be named "worker" and pinned to the CPUs #0 and #1
runtime.launch_pinned_with_settings(WorkerThatPrints, [0,1], settings);
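
For reference, a thread's name and stack size are the same two knobs that `std::thread::Builder` exposes. Whether Settings is built on top of it is an assumption; the sketch below only illustrates what those options do at the standard-library level. `spawn_named` is a hypothetical helper, and CPU pinning is left out because the standard library has no API for it.

```rust
use std::thread;

// Spawns a thread with the given name and stack size (in bytes) and returns
// the name the thread observes for itself.
fn spawn_named(name: &str, stack_size: usize) -> Option<String> {
    let handle = thread::Builder::new()
        .name(name.to_owned())
        .stack_size(stack_size)
        .spawn(|| thread::current().name().map(str::to_owned))
        .expect("failed to spawn the thread");

    handle.join().expect("the thread panicked")
}
```

The name is visible from inside the thread through `thread::current().name()`, and it also appears in panic messages, which is useful when many workers run at once.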

§Contexts

Users may want to configure their workers via a builder pattern before spawning them in a runtime, somewhat like how the std::process::Command type works. This is done by passing a type that implements the Context trait to the Runtime::launch_from_context function. Contexts can also provide the thread settings and core pinning.

// This example assumes a worker that owns its name:
// struct WorkerThatPrints(String);
struct WorkerContext {
    name: String,
}

impl Context for WorkerContext {
    type Target = WorkerThatPrints;

    fn into_worker(self) -> Result<Self::Target, Error> {
        Ok(WorkerThatPrints(self.name))
    }

    fn settings(&self) -> Settings {
        Settings::new().name("worker")
    }

    fn core_pinning(&self) -> Option<Vec<usize>> {
        Some(vec![0,1])
    }
}

let mut runtime = Runtime::new();
let context = WorkerContext { name: String::from("Alice") };

runtime.launch_from_context(context);
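
The builder flavour mentioned above can be sketched as follows: chained setters collect the configuration, and the final conversion validates it and produces the worker. This is an illustration under stated assumptions, not the crate's API: `MiniContext` is a hypothetical stand-in for Context (with a plain `String` as error type), and `Greeter` and `GreeterContext` are invented names.

```rust
// Hypothetical stand-in for the crate's `Context` trait.
trait MiniContext {
    type Target;
    fn into_worker(self) -> Result<Self::Target, String>;
}

// The worker that the context eventually builds.
#[derive(Debug, PartialEq)]
struct Greeter {
    name: String,
    repeat: u32,
}

// Builder-style context: each setter consumes and returns `self`,
// so calls can be chained like with `std::process::Command`.
struct GreeterContext {
    name: Option<String>,
    repeat: u32,
}

impl GreeterContext {
    fn new() -> Self {
        Self { name: None, repeat: 1 }
    }

    fn name(mut self, name: &str) -> Self {
        self.name = Some(name.to_owned());
        self
    }

    fn repeat(mut self, repeat: u32) -> Self {
        self.repeat = repeat;
        self
    }
}

impl MiniContext for GreeterContext {
    type Target = Greeter;

    // Validation happens once, at conversion time, instead of in every setter.
    fn into_worker(self) -> Result<Greeter, String> {
        let name = self.name.ok_or_else(|| String::from("a name is required"))?;
        Ok(Greeter { name, repeat: self.repeat })
    }
}
```

With this shape, `GreeterContext::new().name("Alice").repeat(3).into_worker()` yields a configured worker, while omitting the name yields an error before any thread is spawned.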

§Respawning workers that panicked

Runtimes can respawn workers that panicked. This is achieved by implementing the RespawnableContext trait for a type and passing that type to the Runtime::launch_respawnable function.

When the Runtime::health_check function is called, the runtime checks every respawnable worker and respawns those that panicked, using their respective contexts.

// A worker that panics some time after being spawned...
struct PanickingWorker;
impl Worker for PanickingWorker {
    fn on_update(&mut self) -> ControlFlow {
        std::thread::sleep(Duration::from_secs(1));
        panic!("panicking!")
    }
}

// ... and its context.
struct WorkerContext;
impl RespawnableContext<'_> for WorkerContext {
    fn boxed_worker(&self) -> Result<Box<dyn Worker>, Error> {
        Ok(Box::new(PanickingWorker))
    }
}

let mut runtime = Runtime::new();
runtime.launch_respawnable(WorkerContext);

std::thread::sleep(Duration::from_secs(1));
runtime.health_check();
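
The idea behind a health check can be sketched with the standard library: keep a factory next to the thread handle, and when the thread has ended with a panic, build a fresh job from the factory and spawn it again. This is a simplified model and makes no claim about how the crate implements it; `Job`, `Supervised` and `demo_respawn` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical supervisor: the factory plays the role of a respawnable context.
struct Supervised {
    factory: Box<dyn Fn() -> Job>,
    handle: Option<JoinHandle<()>>,
}

impl Supervised {
    fn launch(factory: impl Fn() -> Job + 'static) -> Self {
        let handle = Some(thread::spawn(factory()));
        Self { factory: Box::new(factory), handle }
    }

    // Respawns the job if its thread ended with a panic; returns whether it did.
    fn health_check(&mut self) -> bool {
        match self.handle.take() {
            Some(handle) if handle.is_finished() => {
                let panicked = handle.join().is_err();
                if panicked {
                    self.handle = Some(thread::spawn((self.factory)()));
                }
                panicked
            }
            // Still running (or nothing left to watch): put the handle back.
            other => {
                self.handle = other;
                false
            }
        }
    }

    // Waits for the current job, if any, to finish.
    fn join(mut self) -> thread::Result<()> {
        match self.handle.take() {
            Some(handle) => handle.join(),
            None => Ok(()),
        }
    }
}

// The job panics on its first run only. Returns whether a respawn happened
// and how many times the job ran in total.
fn demo_respawn() -> (bool, u32) {
    let runs = Arc::new(AtomicU32::new(0));
    let counter = Arc::clone(&runs);
    let mut supervised = Supervised::launch(move || {
        let counter = Arc::clone(&counter);
        Box::new(move || {
            if counter.fetch_add(1, Ordering::SeqCst) == 0 {
                panic!("first run fails on purpose");
            }
        })
    });

    // Poll until the health check notices the panic and respawns the job.
    let mut respawned = false;
    while !respawned {
        thread::yield_now();
        respawned = supervised.health_check();
    }
    supervised.join().expect("the respawned job should not panic");
    (respawned, runs.load(Ordering::SeqCst))
}
```

Note that the check is a poll: a panic is only noticed the next time `health_check` runs, so the caller decides how often to call it.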

§Inter-workers communication

This crate exposes two traits, Register and Connect, that enable communication between two workers. These traits are channel- and direction-agnostic: virtually anything that can send or receive data, or share state, can be used as a Register::Endpoint.

use std::sync::mpsc::{self, Receiver, Sender};

// A producer worker.
#[derive(Default)]
struct Producer {
    sender: Option<Sender<u8>>,
}

impl Worker for Producer {
    fn on_update(&mut self) -> ControlFlow {
        std::thread::sleep(Duration::from_millis(100));

        if let Some(sender) = self.sender.as_ref() {
            sender.send(1).unwrap();
        }

        ControlFlow::Continue
    }
}

impl Connect<Consumer> for Producer {
    fn on_connection(&mut self, endpoint: Sender<u8>) {
        self.sender = Some(endpoint)
    }
}

// A consumer worker
struct Consumer {
    sender: Sender<u8>,
    recver: Receiver<u8>,
    count: u8,
}

impl Consumer {
    fn new() -> Self {
        let (sender, recver) = mpsc::channel();

        Self {
            sender,
            recver,
            count: 0,
        }
    }
}

impl Worker for Consumer {
    fn on_update(&mut self) -> ControlFlow {
        let val = self.recver.recv().unwrap();

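        // Wrap on overflow so that a long-running consumer doesn't panic in debug builds.
        self.count = self.count.wrapping_add(val);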
        ControlFlow::Continue
    }
}

impl Register for Consumer {
    type Endpoint = Sender<u8>;

    fn register(&mut self, other: &mut impl Connect<Self>) {
        other.on_connection(self.sender.clone())
    }
}

let mut consumer = Consumer::new();
let mut prod1 = Producer::default();
let mut prod2 = Producer::default();
let mut prod3 = Producer::default();

// Let's connect everything
consumer.register(&mut prod1);
consumer.register(&mut prod2);
consumer.register(&mut prod3);

// Launch the workers in a runtime
let mut runtime = Runtime::new();
runtime.launch(consumer);
runtime.launch(prod1);
runtime.launch(prod2);
runtime.launch(prod3);
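
Since an endpoint does not have to be a channel, the same pattern works with shared state. The sketch below uses an `Arc<AtomicU32>` as the endpoint. To stay self-contained it defines `MiniRegister` and `MiniConnect`, hypothetical stand-ins that mirror the shape of Register and Connect as used in the example above; `Board`, `Player` and `demo_shared_state` are invented names as well.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-ins mirroring the shape of `Register` and `Connect`.
trait MiniRegister: Sized {
    type Endpoint;
    fn register(&mut self, other: &mut impl MiniConnect<Self>);
}

trait MiniConnect<R: MiniRegister> {
    fn on_connection(&mut self, endpoint: R::Endpoint);
}

// Owns a shared counter and hands out clones of it as endpoints.
#[derive(Default)]
struct Board {
    hits: Arc<AtomicU32>,
}

impl MiniRegister for Board {
    type Endpoint = Arc<AtomicU32>;

    fn register(&mut self, other: &mut impl MiniConnect<Self>) {
        other.on_connection(Arc::clone(&self.hits));
    }
}

// Holds the shared counter once it has been connected to a board.
#[derive(Default)]
struct Player {
    hits: Option<Arc<AtomicU32>>,
}

impl MiniConnect<Board> for Player {
    fn on_connection(&mut self, endpoint: Arc<AtomicU32>) {
        self.hits = Some(endpoint);
    }
}

impl Player {
    // Bumps the shared counter; does nothing if the player is not connected.
    fn hit(&self) {
        if let Some(hits) = &self.hits {
            hits.fetch_add(1, Ordering::SeqCst);
        }
    }
}

// Connects two players to one board, lets each hit 100 times from its own
// thread, and returns the total seen by the board.
fn demo_shared_state() -> u32 {
    let mut board = Board::default();
    let mut players = vec![Player::default(), Player::default()];
    for player in &mut players {
        board.register(player);
    }

    let handles: Vec<_> = players
        .into_iter()
        .map(|player| {
            thread::spawn(move || {
                for _ in 0..100 {
                    player.hit();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("a player thread panicked");
    }

    board.hits.load(Ordering::SeqCst)
}
```

The registering side decides what the endpoint is, and the connecting side only stores it, which is why swapping a channel for shared state leaves the wiring code unchanged.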

§Timers

This crate re-exports all types from the minuteurs crate and implements the Worker and Register traits on the Timer type, allowing it to be used in runtimes.

Requires the timing feature.

use employees::{Runtime, Worker, ControlFlow, Error, Register, Connect, Context};
use employees::minuteurs::{Timer, Watcher};
use std::time::Duration;

// A worker that prints "Hello!" each time the timer ticks...
struct WorkerThatPrints {
    watcher: Watcher,
}

impl Worker for WorkerThatPrints {
    fn on_update(&mut self) -> ControlFlow {
        if self.watcher.has_ticked() {
            println!("Hello!");
        }

        ControlFlow::Continue
    }
}

// ... and its context.
struct WorkerThatPrintsContext {
    watcher: Option<Watcher>,
}

impl WorkerThatPrintsContext {
    fn new() -> Self {
        Self { watcher: None }
    }
}

impl Context for WorkerThatPrintsContext {
    type Target = WorkerThatPrints;
    fn into_worker(self) -> Result<Self::Target, Error> {
        Ok(WorkerThatPrints { watcher: self.watcher.expect("the context must be connected") })
    }
}

impl Connect<Timer> for WorkerThatPrintsContext {
    fn on_connection(&mut self, endpoint: Watcher) {
        self.watcher = Some(endpoint);
    }
}

let mut runtime = Runtime::new();

// Set a timer that ticks every 100ms
let mut timer = Timer::new(Duration::from_millis(100));
let mut worker_ctx = WorkerThatPrintsContext::new();

// Connect the worker (or rather, its context) to the timer
timer.register(&mut worker_ctx);

runtime.launch(timer).expect("failed to launch the timer");
runtime.launch_from_context(worker_ctx).expect("failed to launch the worker");

Structs§

Enums§

Traits§

  • Connect: A type implementing this trait can be connected to some Register.
  • Context: Allows building a worker before actually launching it with the Runtime::launch_from_context function.
  • Register: A type that implements this trait can open a communication channel between itself and any other type that implements the Connect trait for it.
  • RespawnableContext: This trait is very similar to Context but is needed for workers to be respawned.
  • Worker: A worker represents a thread that runs for the lifetime of the program (or at least for a very long time).