Expand description
§Factory actors
A factory is a manager of a pool of workers on the same node. This is helpful for job dispatch and load balancing when single-threaded execution of a single crate::Actor may not be sufficient. Factories have a set “Job” syntax which denotes a key and message payload for each action. Workers are effectively mindless agents of the factory’s will.
§Worker message routing mode
The factory has a series of dispatch modes which are defined in the routing module and control the way the factory dispatches work to workers. This should be selected based on the intended workload. Some general guidance:
- If you need to process a sequence of operations on a given key (i.e. the Job is a user, and there’s a sequential list of updates to that user). You then want the job to land on the same worker and should select routing::KeyPersistentRouting or routing::StickyQueuerRouting.
- If you don’t need a sequence of operations then routing::QueuerRouting is likely a good choice.
- If your workers are making remote calls to other services/actors you probably want routing::QueuerRouting or routing::StickyQueuerRouting to prevent head-of-the-line contention. Otherwise routing::KeyPersistentRouting is sufficient.
- For some custom defined routing, you can define your own routing::CustomHashFunction which will be used in conjunction with routing::CustomRouting to take the incoming job key and the space which should be hashed to (i.e. the number of workers).
- If you just want load balancing there’s also routing::RoundRobinRouting for general 1-off dispatching of jobs
§Factory queueing
The factory can also support factory-side or worker-side queueing of extra work messages based on the definition of the routing::Router and queues::Queue assigned to the factory.
Supported queueing protocols today for factory-side queueing is
- Default, no-priority, queueing: queues::DefaultQueue
- Priority-based queuing, based on a constant number of priorities queues::PriorityQueue
§Worker lifecycle
A worker’s lifecycle is managed by the factory. If the worker dies or crashes, the factory will replace the worker with a new instance and continue processing jobs for that worker. The factory also maintains the worker’s message queue’s so messages won’t be lost which were in the “worker”’s queue.
§Example Factory
use ractor::concurrency::Duration;
use ractor::factory::*;
use ractor::Actor;
use ractor::ActorProcessingErr;
use ractor::ActorRef;
use ractor::RpcReplyPort;
#[derive(Debug)]
enum ExampleMessage {
PrintValue(u64),
EchoValue(u64, RpcReplyPort<u64>),
}
#[cfg(feature = "cluster")]
impl ractor::Message for ExampleMessage {}
/// The worker's specification for the factory. This defines
/// the business logic for each message that will be done in parallel.
struct ExampleWorker;
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Worker for ExampleWorker {
type Key = ();
type Message = ExampleMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
wid: WorkerId,
factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}
async fn handle(
&self,
wid: WorkerId,
factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
Job { msg, key, .. }: Job<(), ExampleMessage>,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
// Actual business logic that we want to parallelize
tracing::trace!("Worker {} received {:?}", wid, msg);
match msg {
ExampleMessage::PrintValue(value) => {
tracing::info!("Worker {} printing value {value}", wid);
}
ExampleMessage::EchoValue(value, reply) => {
tracing::info!("Worker {} echoing value {value}", wid);
let _ = reply.send(value);
}
}
Ok(key)
}
}
/// Used by the factory to build new [ExampleWorker]s.
struct ExampleWorkerBuilder;
impl WorkerBuilder<ExampleWorker, ()> for ExampleWorkerBuilder {
fn build(&mut self, _wid: usize) -> (ExampleWorker, ()) {
(ExampleWorker, ())
}
}
#[tokio::main]
async fn main() {
let factory_def = Factory::<
(),
ExampleMessage,
(),
ExampleWorker,
routing::QueuerRouting<(), ExampleMessage>,
queues::DefaultQueue<(), ExampleMessage>,
>::default();
let factory_args = FactoryArguments::builder()
.worker_builder(Box::new(ExampleWorkerBuilder))
.queue(Default::default())
.router(Default::default())
.num_initial_workers(5)
.build();
let (factory, handle) = Actor::spawn(None, factory_def, factory_args)
.await
.expect("Failed to startup factory");
for i in 0..99 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: (),
msg: ExampleMessage::PrintValue(i),
options: JobOptions::default(),
accepted: None,
}))
.expect("Failed to send to factory");
}
let reply = factory
.call(
|prt| {
FactoryMessage::Dispatch(Job {
key: (),
msg: ExampleMessage::EchoValue(123, prt),
options: JobOptions::default(),
accepted: None,
})
},
None,
)
.await
.expect("Failed to send to factory")
.expect("Failed to parse reply");
assert_eq!(reply, 123);
factory.stop(None);
handle.await.unwrap();
}Modules§
- discard
- Discard handler managing when jobs are discarded
- factoryimpl
- Factory definition
- hash
- Factory’s defaulting routing hash mechanism. Hashes a super::JobKey to a finite space
- job
- Specification for a Job sent to a factory
- lifecycle
- Lifecycle hooks support interjecting external logic into the factory’s lifecycle (startup/shutdown/etc) such that users can intercept and adjust factory functionality at key interjection points.
- queues
- Queue implementations for Factories
- ratelim
- Rate limiting protocols for factory routers
- routing
- Routing protocols for Factories
- stats
- Statistics management + collection for factories
- worker
- Factory worker properties
Structs§
- Dead
Mans Switch Configuration - The configuration for the dead-man’s switch functionality
- Factory
- Factory definition.
- Factory
Arguments - Arguments for configuring and starting a Factory actor instance.
- Factory
Arguments Builder - Use builder syntax to set the required parameters and finish by calling the method
Self::build(). - Job
- Represents a job sent to a factory
- JobOptions
- Represents options for the specified job
- Leaky
Bucket Rate Limiter - A basic leaky-bucket rate limiter. This is a synchronous implementation with no interior locking since it’s only used by the RateLimitedRouter uniquely and doesn’t share its state
- Rate
Limited Router - A generic struct which wraps the message router and adds support for a rate-limiting implementation to rate limit jobs processed by the factory. This handles the plubming around wrapping a rate limited message router
- Retriable
Message - A retriable message is a job message which will automatically be resubmitted to the factory in the event of a factory worker dropping the message due to failure (panic or unhandled error). This wraps the inner message in a struct which captures the drop, and if there’s still some retries left, will reschedule the work to the factory with captured state information.
- Update
Settings Request - The settings to change for an update request to the factory at runtime.
- Update
Settings Request Builder - Use builder syntax to set the required parameters and finish by calling the method
Self::build(). - Worker
Properties - Properties of a worker
- Worker
Start Context - Startup context data (
Arguments) which are passed to a worker on start
Enums§
- Discard
Mode - The discard mode of a factory
- Discard
Reason - Reason for discarding a job
- Discard
Settings - If a factory supports job discarding (loadshedding) it can have a few configurations which are defined in this enum. There is
- Factory
Message - Messages to a factory.
- Message
Retry Strategy - The retry strategy for a RetriableMessage.
- Worker
Message - Message to a worker
Traits§
- Discard
Handler - Trait defining the discard handler for a factory.
- Dynamic
Discard Controller - Controls the dynamic concurrency level by receiving periodic snapshots of job statistics and emitting a new concurrency limit
- Factory
Lifecycle Hooks - Hooks for crate::factory::Factory lifecycle events based on the underlying actor’s lifecycle.
- JobKey
- Represents a key to a job. Needs to be hashable for routing properties
- Rate
Limiter - A basic trait which allows controlling rate limiting of message routing
- Worker
- A factory worker trait, which is a basic wrapper around actor logic, with predefined type information specific to workers
- Worker
Builder - The super::Factory is responsible for spawning workers and re-spawning workers under failure scenarios. This means that it needs to understand how to build workers. The WorkerBuilder trait is used by the factory to construct new workers when needed.
- Worker
Capacity Controller - Controls the size of the worker pool by dynamically growing/shrinking the pool to requested size
Type Aliases§
- Worker
Id - Unique identifier of a disctinct worker in the factory