Module ractor::factory

source ·
Expand description

§Factory actors

A factory is a manager of a pool of workers on the same node. This is helpful for job dispatch and load balancing when single-threaded execution of a since crate::Actor may not be sufficient. Factories have a set “Job” syntax which denotes a key and message payload for each action. Workers are effectively mindless agents of the factory’s will.

§Worker message routing mode

The factory has a series of dispatch modes which are defined in RoutingMode and control the way the factory dispatches work to workers. This should be selected based on the intended workload. Some general guidance:

  1. If you need to process a sequence of operations on a given key (i.e. the Job is a user, and there’s a sequential list of updates to that user). You then want the job to land on the same worker and should select RoutingMode::KeyPersistent or RoutingMode::StickyQueuer.
  2. If you don’t need a sequence of operations then RoutingMode::Queuer is likely a good choice.
  3. If your workers are making remote calls to other services/actors you probably want RoutingMode::Queuer or RoutingMode::StickyQueuer to prevent head-of-the-line contention. Otherwise RoutingMode::KeyPersistent is sufficient.
  4. For some custom defined routing, you can define your own CustomHashFunction which will be used in conjunction with RoutingMode::CustomHashFunction to take the incoming job key and the space which should be hashed to (i.e. the number of workers).
  5. If you just want load balancing there’s also RoutingMode::RoundRobin and RoutingMode::Random for general 1-off dispatching of jobs

§Worker lifecycle

A worker’s lifecycle is managed by the factory. If the worker dies or crashes, the factory will replace the worker with a new instance and continue processing jobs for that worker. The factory also maintains the worker’s message queue’s so messages won’t be lost which were in the “worker”’s queue.

§Example Factory

use ractor::concurrency::Duration;
use ractor::factory::{
    Factory, FactoryMessage, Job, JobOptions, RoutingMode, WorkerBuilder, WorkerMessage,
    WorkerStartContext,
};
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
#[derive(Debug)]
enum ExampleMessage {
    PrintValue(u64),
    EchoValue(u64, RpcReplyPort<u64>),
}
/// The worker's specification for the factory. This defines
/// the business logic for each message that will be done in parallel.
struct ExampleWorker;
#[ractor::async_trait]
impl Actor for ExampleWorker {
    type Msg = WorkerMessage<(), ExampleMessage>;
    type State = WorkerStartContext<(), ExampleMessage>;
    type Arguments = WorkerStartContext<(), ExampleMessage>;
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        startup_context: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(startup_context)
    }
    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            WorkerMessage::FactoryPing(time) => {
                // This is a message which all factory workers **must**
                // adhere to. It is a background processing message from the
                // factory which is used for (a) metrics and (b) detecting
                // stuck workers, i.e. workers which aren't making progress
                // processing their messages
                state
                    .factory
                    .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
            }
            WorkerMessage::Dispatch(job) => {
                // Actual business logic that we want to parallelize
                tracing::trace!("Worker {} received {:?}", state.wid, job.msg);
                match job.msg {
                    ExampleMessage::PrintValue(value) => {
                        tracing::info!("Worker {} printing value {value}", state.wid);
                    }
                    ExampleMessage::EchoValue(value, reply) => {
                        tracing::info!("Worker {} echoing value {value}", state.wid);
                        let _ = reply.send(value);
                    }
                }
                // job finished, on success or err we report back to the factory
                state
                    .factory
                    .cast(FactoryMessage::Finished(state.wid, job.key))?;
            }
        }
        Ok(())
    }
}
/// Used by the factory to build new [ExampleWorker]s.
struct ExampleWorkerBuilder;
impl WorkerBuilder<ExampleWorker> for ExampleWorkerBuilder {
    fn build(&self, _wid: usize) -> ExampleWorker {
        ExampleWorker
    }
}
#[tokio::main]
async fn main() {
    let factory_def = Factory::<(), ExampleMessage, ExampleWorker> {
        worker_count: 5,
        routing_mode: RoutingMode::<()>::Queuer,
        ..Default::default()
    };
    let (factory, handle) = Actor::spawn(None, factory_def, Box::new(ExampleWorkerBuilder))
        .await
        .expect("Failed to startup factory");
    for i in 0..99 {
        factory
            .cast(FactoryMessage::Dispatch(Job {
                key: (),
                msg: ExampleMessage::PrintValue(i),
                options: JobOptions::default(),
            }))
            .expect("Failed to send to factory");
    }
    let reply = factory
        .call(
            |prt| {
                FactoryMessage::Dispatch(Job {
                    key: (),
                    msg: ExampleMessage::EchoValue(123, prt),
                    options: JobOptions::default(),
                })
            },
            None,
        )
        .await
        .expect("Failed to send to factory")
        .expect("Failed to parse reply");
    assert_eq!(reply, 123);
    factory.stop(None);
    handle.await.unwrap();
}

Re-exports§

Modules§

  • Factory’s defaulting routing hash mechanism. Hashes a super::JobKey to a finite space
  • Specification for a Job sent to a factory
  • Factory routing mode
  • Statistics management + collection for factories
  • Factory worker properties

Structs§

Enums§

Traits§

Type Aliases§

  • Identifier for a worker in a factory