Module factory

§Factory actors

A factory is a manager of a pool of workers on the same node. This is helpful for job dispatch and load balancing when single-threaded execution of a single crate::Actor may not be sufficient. Factories use a fixed “Job” format, which pairs a key with a message payload for each unit of work. Workers are effectively mindless agents of the factory’s will.

§Worker message routing mode

The factory supports several dispatch modes, defined in the routing module, which control how the factory dispatches work to workers. The mode should be selected based on the intended workload. Some general guidance:

  1. If you need to process a sequence of operations on a given key (e.g. the Job key is a user, and there is a sequential list of updates to that user), you want every job for that key to land on the same worker; select routing::KeyPersistentRouting or routing::StickyQueuerRouting.
  2. If you don’t need operations on a key to be processed in sequence, routing::QueuerRouting is likely a good choice.
  3. If your workers make remote calls to other services/actors, you probably want routing::QueuerRouting or routing::StickyQueuerRouting to prevent head-of-line blocking. Otherwise routing::KeyPersistentRouting is sufficient.
  4. For custom routing, you can define your own routing::CustomHashFunction, which is used in conjunction with routing::CustomRouting to map the incoming job key into the hash space (i.e. the number of workers).
  5. If you just want load balancing, there’s also routing::RoundRobinRouting for general one-off dispatching of jobs.
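To illustrate the idea behind a custom hash function, the sketch below maps a job key into the worker-index space using the standard library's hasher. Note that `hash_to_worker` is a hypothetical stand-in for exposition, not the actual routing::CustomHashFunction trait from ractor; it only demonstrates the key-to-worker mapping such a function performs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a custom hash function: deterministically
/// map a job key into the worker-index space [0, num_workers).
fn hash_to_worker<K: Hash>(key: &K, num_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_workers
}

fn main() {
    let workers = 5;
    let idx = hash_to_worker(&"user-42", workers);
    // The same key always maps to the same worker index, which is what
    // gives key-persistent style routing its ordering guarantee.
    assert_eq!(idx, hash_to_worker(&"user-42", workers));
    assert!(idx < workers);
    println!("job for key \"user-42\" routes to worker {idx}");
}
```

Because the mapping is a pure function of the key, all jobs for one key serialize onto one worker; the trade-off is that a slow job for one key delays every later job that hashes to the same worker.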

§Factory queueing

The factory can also support factory-side or worker-side queueing of extra work messages based on the definition of the routing::Router and queues::Queue assigned to the factory.

The queueing protocols supported today for factory-side queueing are:

  1. Default, no-priority queueing: queues::DefaultQueue
  2. Priority-based queueing, with a constant number of priority levels: queues::PriorityQueue
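The semantics of priority-based queueing with a constant number of levels can be sketched in plain Rust. This is an illustrative model only, assuming nothing about ractor's actual queues::PriorityQueue implementation: jobs dequeue from the highest non-empty priority lane, FIFO within a lane.

```rust
use std::collections::VecDeque;

/// Illustrative sketch (not ractor's queues::PriorityQueue): a fixed
/// number of priority lanes, where 0 is the highest priority.
struct PriorityJobQueue<T, const LEVELS: usize> {
    lanes: [VecDeque<T>; LEVELS],
}

impl<T, const LEVELS: usize> PriorityJobQueue<T, LEVELS> {
    fn new() -> Self {
        Self { lanes: std::array::from_fn(|_| VecDeque::new()) }
    }

    /// Enqueue a job at the given priority level.
    fn push(&mut self, priority: usize, job: T) {
        self.lanes[priority].push_back(job);
    }

    /// Dequeue the oldest job from the highest non-empty priority lane.
    fn pop(&mut self) -> Option<T> {
        self.lanes.iter_mut().find_map(|lane| lane.pop_front())
    }
}

fn main() {
    let mut q: PriorityJobQueue<&str, 3> = PriorityJobQueue::new();
    q.push(2, "low");
    q.push(0, "urgent");
    q.push(2, "low-2");
    // Higher priority drains first; FIFO order holds within a level.
    assert_eq!(q.pop(), Some("urgent"));
    assert_eq!(q.pop(), Some("low"));
    assert_eq!(q.pop(), Some("low-2"));
}
```

One consequence of this scheme is that a steady stream of high-priority jobs can starve lower lanes, which is why discard/loadshedding settings matter alongside the queue choice.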

§Worker lifecycle

A worker’s lifecycle is managed by the factory. If a worker dies or crashes, the factory will replace it with a new instance and continue processing jobs for that worker. The factory also maintains each worker’s message queue, so messages that were in a failed worker’s queue won’t be lost.

§Example Factory

use ractor::concurrency::Duration;
use ractor::factory::*;
use ractor::Actor;
use ractor::ActorProcessingErr;
use ractor::ActorRef;
use ractor::RpcReplyPort;

#[derive(Debug)]
enum ExampleMessage {
    PrintValue(u64),
    EchoValue(u64, RpcReplyPort<u64>),
}

#[cfg(feature = "cluster")]
impl ractor::Message for ExampleMessage {}

/// The worker's specification for the factory. This defines
/// the business logic for each message that will be done in parallel.
struct ExampleWorker;
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Worker for ExampleWorker {
    type Key = ();
    type Message = ExampleMessage;
    type State = ();
    type Arguments = ();
    async fn pre_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
        startup_context: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(startup_context)
    }
    async fn handle(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
        Job { msg, key, .. }: Job<(), ExampleMessage>,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        // Actual business logic that we want to parallelize
        tracing::trace!("Worker {} received {:?}", wid, msg);
        match msg {
            ExampleMessage::PrintValue(value) => {
                tracing::info!("Worker {} printing value {value}", wid);
            }
            ExampleMessage::EchoValue(value, reply) => {
                tracing::info!("Worker {} echoing value {value}", wid);
                let _ = reply.send(value);
            }
        }
        Ok(key)
    }
}
/// Used by the factory to build new [ExampleWorker]s.
struct ExampleWorkerBuilder;
impl WorkerBuilder<ExampleWorker, ()> for ExampleWorkerBuilder {
    fn build(&mut self, _wid: usize) -> (ExampleWorker, ()) {
        (ExampleWorker, ())
    }
}
#[tokio::main]
async fn main() {
    let factory_def = Factory::<
        (),
        ExampleMessage,
        (),
        ExampleWorker,
        routing::QueuerRouting<(), ExampleMessage>,
        queues::DefaultQueue<(), ExampleMessage>,
    >::default();
    let factory_args = FactoryArguments::builder()
        .worker_builder(Box::new(ExampleWorkerBuilder))
        .queue(Default::default())
        .router(Default::default())
        .num_initial_workers(5)
        .build();

    let (factory, handle) = Actor::spawn(None, factory_def, factory_args)
        .await
        .expect("Failed to startup factory");
    for i in 0..99 {
        factory
            .cast(FactoryMessage::Dispatch(Job {
                key: (),
                msg: ExampleMessage::PrintValue(i),
                options: JobOptions::default(),
                accepted: None,
            }))
            .expect("Failed to send to factory");
    }
    let reply = factory
        .call(
            |prt| {
                FactoryMessage::Dispatch(Job {
                    key: (),
                    msg: ExampleMessage::EchoValue(123, prt),
                    options: JobOptions::default(),
                    accepted: None,
                })
            },
            None,
        )
        .await
        .expect("Failed to send to factory")
        .expect("Failed to parse reply");
    assert_eq!(reply, 123);
    factory.stop(None);
    handle.await.unwrap();
}

Modules§

discard
Discard handler managing when jobs are discarded
factoryimpl
Factory definition
hash
Factory’s default routing hash mechanism. Hashes a super::JobKey to a finite space
job
Specification for a Job sent to a factory
lifecycle
Lifecycle hooks support interjecting external logic into the factory’s lifecycle (startup/shutdown/etc) such that users can intercept and adjust factory functionality at key interjection points.
queues
Queue implementations for Factories
ratelim
Rate limiting protocols for factory routers
routing
Routing protocols for Factories
stats
Statistics management + collection for factories
worker
Factory worker properties

Structs§

DeadMansSwitchConfiguration
The configuration for the dead-man’s switch functionality
Factory
Factory definition.
FactoryArguments
Arguments for configuring and starting a Factory actor instance.
FactoryArgumentsBuilder
Use builder syntax to set the required parameters and finish by calling the method Self::build().
Job
Represents a job sent to a factory
JobOptions
Represents options for the specified job
LeakyBucketRateLimiter
A basic leaky-bucket rate limiter. This is a synchronous implementation with no interior locking since it’s only used by the RateLimitedRouter uniquely and doesn’t share its state
RateLimitedRouter
A generic struct which wraps the message router and adds support for a rate-limiting implementation to rate limit jobs processed by the factory. This handles the plumbing around wrapping a rate limited message router
RetriableMessage
A retriable message is a job message which will automatically be resubmitted to the factory in the event of a factory worker dropping the message due to failure (panic or unhandled error). This wraps the inner message in a struct which captures the drop, and if there’s still some retries left, will reschedule the work to the factory with captured state information.
UpdateSettingsRequest
The settings to change for an update request to the factory at runtime.
UpdateSettingsRequestBuilder
Use builder syntax to set the required parameters and finish by calling the method Self::build().
WorkerProperties
Properties of a worker
WorkerStartContext
Startup context data (Arguments) which are passed to a worker on start

Enums§

DiscardMode
The discard mode of a factory
DiscardReason
Reason for discarding a job
DiscardSettings
If a factory supports job discarding (loadshedding), it can have a few configurations which are defined in this enum
FactoryMessage
Messages to a factory.
MessageRetryStrategy
The retry strategy for a RetriableMessage.
WorkerMessage
Message to a worker

Traits§

DiscardHandler
Trait defining the discard handler for a factory.
DynamicDiscardController
Controls the dynamic concurrency level by receiving periodic snapshots of job statistics and emitting a new concurrency limit
FactoryLifecycleHooks
Hooks for crate::factory::Factory lifecycle events based on the underlying actor’s lifecycle.
JobKey
Represents a key to a job. Needs to be hashable for routing properties
RateLimiter
A basic trait which allows controlling rate limiting of message routing
Worker
A factory worker trait, which is a basic wrapper around actor logic, with predefined type information specific to workers
WorkerBuilder
The super::Factory is responsible for spawning workers and re-spawning workers under failure scenarios. This means that it needs to understand how to build workers. The WorkerBuilder trait is used by the factory to construct new workers when needed.
WorkerCapacityController
Controls the size of the worker pool by dynamically growing/shrinking the pool to requested size

Type Aliases§

WorkerId
Unique identifier of a distinct worker in the factory