§Factory actors
A factory is a manager of a pool of workers on the same node. This is helpful for job dispatch and load balancing when single-threaded execution of a single crate::Actor may not be sufficient. Factories have a set “Job” syntax which denotes a key and message payload for each action. Workers are effectively mindless agents of the factory’s will.
§Worker message routing mode
The factory has a series of dispatch modes, defined in RoutingMode, which control how the factory dispatches work to workers. Select the mode based on the intended workload. Some general guidance:
- If you need to process a sequence of operations on a given key (e.g. the Job key is a user, and there’s a sequential list of updates to that user), you want every job for that key to land on the same worker, so select RoutingMode::KeyPersistent or RoutingMode::StickyQueuer.
- If you don’t need a sequence of operations then RoutingMode::Queuer is likely a good choice.
- If your workers make remote calls to other services or actors, you probably want RoutingMode::Queuer or RoutingMode::StickyQueuer to avoid head-of-line blocking, where one slow call holds up the jobs queued behind it. Otherwise RoutingMode::KeyPersistent is sufficient.
- For some custom defined routing, you can define your own CustomHashFunction which will be used in conjunction with RoutingMode::CustomHashFunction to take the incoming job key and the space which should be hashed to (i.e. the number of workers).
- If you just want load balancing, there are also RoutingMode::RoundRobin and RoutingMode::Random for general one-off dispatching of jobs.
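To make the difference between key-based and key-agnostic routing concrete, here is a minimal, standard-library-only sketch. It is not ractor's implementation: `key_persistent_worker` and `RoundRobin` are hypothetical names invented for illustration, and the choice of `DefaultHasher` is an assumption. The idea is only that a key-persistent mode hashes the job key into the worker space (so equal keys always reach the same worker), while round-robin ignores the key entirely.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not part of ractor): map a job key onto one of
/// `worker_count` workers. Equal keys always produce the same index.
fn key_persistent_worker<K: Hash>(key: &K, worker_count: usize) -> usize {
    assert!(worker_count > 0, "need at least one worker");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // Reduce the 64-bit hash into the space [0, worker_count).
    (hasher.finish() % worker_count as u64) as usize
}

/// Hypothetical round-robin picker (not part of ractor): cycles through
/// worker indices without looking at the job key.
struct RoundRobin {
    next: usize,
    worker_count: usize,
}

impl RoundRobin {
    fn new(worker_count: usize) -> Self {
        assert!(worker_count > 0, "need at least one worker");
        RoundRobin { next: 0, worker_count }
    }

    /// Return the index of the worker that should receive the next job.
    fn pick(&mut self) -> usize {
        let chosen = self.next;
        self.next = (self.next + 1) % self.worker_count;
        chosen
    }
}
```

With five workers, every call to `key_persistent_worker(&"user-42", 5)` yields the same index, which is what preserves per-key ordering; `RoundRobin::pick` spreads jobs evenly but gives no such guarantee.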
§Worker lifecycle
A worker’s lifecycle is managed by the factory. If a worker dies or crashes, the factory replaces it with a new instance and continues processing the jobs assigned to that worker. The factory also maintains each worker’s message queue, so messages that were waiting in a crashed worker’s queue are not lost.
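The reason queued messages survive a crash is that the queue belongs to the factory rather than to the worker. The following is a toy, standard-library-only model of that ownership arrangement, not ractor's actual data structures: `MiniFactory`, `WorkerSlot`, and the `generation` counter are all hypothetical names introduced for illustration.

```rust
use std::collections::VecDeque;

/// Hypothetical per-worker bookkeeping held by the factory.
struct WorkerSlot {
    /// Incremented each time the worker in this slot is replaced.
    generation: u32,
    /// Pending jobs. Owned by the factory, so it outlives any one worker.
    queue: VecDeque<u64>,
}

/// Hypothetical toy factory: one slot per worker id.
struct MiniFactory {
    slots: Vec<WorkerSlot>,
}

impl MiniFactory {
    fn new(worker_count: usize) -> Self {
        let slots = (0..worker_count)
            .map(|_| WorkerSlot { generation: 0, queue: VecDeque::new() })
            .collect();
        MiniFactory { slots }
    }

    /// Queue a job for the worker with id `wid`.
    fn enqueue(&mut self, wid: usize, job: u64) {
        self.slots[wid].queue.push_back(job);
    }

    /// Simulate a crash: a new worker instance takes over the slot,
    /// while the pending queue is left untouched.
    fn replace_worker(&mut self, wid: usize) {
        self.slots[wid].generation += 1;
    }

    /// Hand the next pending job to whichever worker currently occupies `wid`.
    fn next_job(&mut self, wid: usize) -> Option<u64> {
        self.slots[wid].queue.pop_front()
    }
}
```

In this model, jobs enqueued before `replace_worker` are still returned by `next_job` afterwards, in their original order, which mirrors the guarantee described above.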
§Example Factory
use ractor::concurrency::Duration;
use ractor::factory::{
    Factory, FactoryMessage, Job, JobOptions, RoutingMode, WorkerBuilder, WorkerMessage,
    WorkerStartContext,
};
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};

#[derive(Debug)]
enum ExampleMessage {
    PrintValue(u64),
    EchoValue(u64, RpcReplyPort<u64>),
}

/// The worker's specification for the factory. This defines
/// the business logic for each message that will be done in parallel.
struct ExampleWorker;

#[ractor::async_trait]
impl Actor for ExampleWorker {
    type Msg = WorkerMessage<(), ExampleMessage>;
    type State = WorkerStartContext<(), ExampleMessage>;
    type Arguments = WorkerStartContext<(), ExampleMessage>;

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        startup_context: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(startup_context)
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            WorkerMessage::FactoryPing(time) => {
                // This is a message which all factory workers **must**
                // adhere to. It is a background processing message from the
                // factory which is used for (a) metrics and (b) detecting
                // stuck workers, i.e. workers which aren't making progress
                // processing their messages
                state
                    .factory
                    .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
            }
            WorkerMessage::Dispatch(job) => {
                // Actual business logic that we want to parallelize
                tracing::trace!("Worker {} received {:?}", state.wid, job.msg);
                match job.msg {
                    ExampleMessage::PrintValue(value) => {
                        tracing::info!("Worker {} printing value {value}", state.wid);
                    }
                    ExampleMessage::EchoValue(value, reply) => {
                        tracing::info!("Worker {} echoing value {value}", state.wid);
                        let _ = reply.send(value);
                    }
                }
                // job finished, on success or err we report back to the factory
                state
                    .factory
                    .cast(FactoryMessage::Finished(state.wid, job.key))?;
            }
        }
        Ok(())
    }
}

/// Used by the factory to build new [ExampleWorker]s.
struct ExampleWorkerBuilder;

impl WorkerBuilder<ExampleWorker> for ExampleWorkerBuilder {
    fn build(&self, _wid: usize) -> ExampleWorker {
        ExampleWorker
    }
}

#[tokio::main]
async fn main() {
    let factory_def = Factory::<(), ExampleMessage, ExampleWorker> {
        worker_count: 5,
        routing_mode: RoutingMode::<()>::Queuer,
        ..Default::default()
    };
    let (factory, handle) = Actor::spawn(None, factory_def, Box::new(ExampleWorkerBuilder))
        .await
        .expect("Failed to startup factory");
    for i in 0..99 {
        factory
            .cast(FactoryMessage::Dispatch(Job {
                key: (),
                msg: ExampleMessage::PrintValue(i),
                options: JobOptions::default(),
            }))
            .expect("Failed to send to factory");
    }
    let reply = factory
        .call(
            |prt| {
                FactoryMessage::Dispatch(Job {
                    key: (),
                    msg: ExampleMessage::EchoValue(123, prt),
                    options: JobOptions::default(),
                })
            },
            None,
        )
        .await
        .expect("Failed to send to factory")
        .expect("Failed to parse reply");
    assert_eq!(reply, 123);
    factory.stop(None);
    handle.await.unwrap();
}
Re-exports§
pub use job::Job;
pub use job::JobKey;
pub use job::JobOptions;
pub use routing_mode::CustomHashFunction;
pub use routing_mode::RoutingMode;
pub use worker::WorkerMessage;
pub use worker::WorkerProperties;
pub use worker::WorkerStartContext;
Modules§
- Factory’s default routing hash mechanism. Hashes a super::JobKey to a finite space
- Specification for a Job sent to a factory
- Factory routing mode
- Statistics management + collection for factories
- Factory worker properties
Structs§
- The configuration for the dead-man’s switch functionality
- A factory is a manager of a pool of worker actors used for job dispatching
- State of a factory (backlogged jobs, handler, etc)
Enums§
- Messages to a factory.
Traits§
- Trait defining the discard handler for a factory.
- Trait defining a builder of workers for a factory
Type Aliases§
- Identifier for a worker in a factory