Coerce Actor Runtime
An Actor, at a high level, is a primitive that describes a unit of computation.
Internally, it can have mutable state, receive messages and perform actions. The only way
to communicate with an actor is by delivering messages to its mailbox.
Actors can only process one message at a time. This is useful because it can remove the need for thread synchronisation, which is usually achieved by locking (using Mutex, RwLock etc).
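To make the "one message at a time" point concrete, here is a minimal std-only sketch. It is not Coerce's API: the names `CounterMsg` and `run_counter` are hypothetical, and a plain thread with `std::sync::mpsc` stands in for an actor and its mailbox. The state is owned by a single thread, so it is mutated without a Mutex even though several threads send to it.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch; not part of Coerce.
enum CounterMsg {
    Add(u64),
    Stop,
}

// Spawns an "actor" thread that owns `total` exclusively, sends it messages
// from several producer threads, and returns the final total.
fn run_counter(producers: u64, adds_per_producer: u64) -> u64 {
    let (tx, rx) = mpsc::channel::<CounterMsg>();

    // The actor: messages are handled strictly one at a time, so plain
    // mutable state is enough and no lock is required.
    let actor = thread::spawn(move || {
        let mut total = 0u64;
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => total += n,
                CounterMsg::Stop => break,
            }
        }
        total
    });

    // Producers only share a cloned sender, never the state itself.
    let handles: Vec<_> = (0..producers)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..adds_per_producer {
                    tx.send(CounterMsg::Add(1)).unwrap();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Every Add is already queued, so Stop is the last message processed.
    tx.send(CounterMsg::Stop).unwrap();
    actor.join().unwrap()
}
```

Calling `run_counter(4, 100)` sends 400 `Add(1)` messages from four threads and returns the total accumulated by the single owning thread.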
How is this achieved in Coerce?
Under the hood, Coerce uses tokio’s mpsc and oneshot channels for message communication.
When an actor is created, an mpsc channel is created and used as the Actor’s mailbox.
A task is spawned that listens for messages on the receiver, handles each one, and emits the result
of the message to the sender’s oneshot channel (if applicable).
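The mailbox-plus-reply-channel pattern described above can be sketched with the standard library alone. This is an illustration under assumptions, not Coerce's implementation: `KvMsg`, `spawn_kv_actor` and `ask` are hypothetical names, a thread replaces the tokio task, and a fresh `std::sync::mpsc` channel per request stands in for the oneshot channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical messages; `Get` carries a reply sender, standing in for the
// oneshot channel used to return a result to the caller.
enum KvMsg {
    Put(String, i32),
    Get(String, Sender<Option<i32>>),
}

// Starts the mailbox loop and returns the sending half of the mailbox.
fn spawn_kv_actor() -> Sender<KvMsg> {
    let (mailbox_tx, mailbox_rx) = mpsc::channel::<KvMsg>();
    thread::spawn(move || {
        let mut store: HashMap<String, i32> = HashMap::new();
        // The loop ends once every sender has been dropped.
        for msg in mailbox_rx {
            match msg {
                KvMsg::Put(key, value) => {
                    store.insert(key, value);
                }
                KvMsg::Get(key, reply_to) => {
                    // Ignore the error if the requester stopped waiting.
                    let _ = reply_to.send(store.get(&key).copied());
                }
            }
        }
    });
    mailbox_tx
}

// Sends a `Get` and blocks until the actor replies.
fn ask(actor: &Sender<KvMsg>, key: &str) -> Option<i32> {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor.send(KvMsg::Get(key.to_string(), reply_tx)).ok()?;
    reply_rx.recv().ok()?
}
```

A `Put` is fire-and-forget, so it needs no reply channel, which mirrors the "(if applicable)" qualifier above. Because the mailbox preserves order, an `ask` issued after a `Put` from the same sender observes the stored value.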
Every reference (LocalActorRef) to an Actor: Handler<M> holds a Sender for that actor's mailbox,
which can be cheaply cloned.
Whilst message handlers are async, the actor will always wait for handler completion
before moving on to subsequent messages in the mailbox. If the actor needs to defer work and
return a result faster, an asynchronous task should be spawned.
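The following std-only sketch illustrates why deferring work keeps the mailbox responsive. It rests on assumptions: `Job`, `spawn_deferring_actor` and `reply_order` are hypothetical names, threads stand in for tokio tasks, and the "slow" work is simulated by waiting on a gate channel rather than by real computation.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Hypothetical messages for this sketch.
enum Job {
    // Slow work: blocks until `gate` is signalled, then replies.
    Slow { gate: Receiver<()>, reply_to: Sender<&'static str> },
    // Cheap work handled inline by the mailbox loop.
    Fast { reply_to: Sender<&'static str> },
}

fn spawn_deferring_actor() -> Sender<Job> {
    let (tx, rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        for job in rx {
            match job {
                Job::Slow { gate, reply_to } => {
                    // Defer: hand the slow part to another thread and return
                    // to the mailbox immediately.
                    thread::spawn(move || {
                        let _ = gate.recv();
                        let _ = reply_to.send("slow");
                    });
                }
                Job::Fast { reply_to } => {
                    let _ = reply_to.send("fast");
                }
            }
        }
    });
    tx
}

// Sends a slow job followed by a fast one and reports the reply order.
fn reply_order() -> Vec<&'static str> {
    let actor = spawn_deferring_actor();
    let (reply_tx, reply_rx) = mpsc::channel();
    let (gate_tx, gate_rx) = mpsc::channel();

    actor
        .send(Job::Slow { gate: gate_rx, reply_to: reply_tx.clone() })
        .unwrap();
    actor.send(Job::Fast { reply_to: reply_tx }).unwrap();

    // The fast reply arrives while the slow job is still waiting on its gate.
    let first = reply_rx.recv().unwrap();
    gate_tx.send(()).unwrap();
    let second = reply_rx.recv().unwrap();
    vec![first, second]
}
```

Had the `Slow` branch waited on the gate inline, the `Fast` message would never have been reached and `reply_order` would block. That is the situation the advice above is meant to avoid.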
General lifecycle of an Actor:
- ActorContext is created
- Actor started (Actor::started)
- Actor begins processing messages
- Actor reference registered with the ActorScheduler (if actor is ActorType::Tracked)
- Actor reference sent back to the creator
- Stop request is received, or system is shutting down
- Actor stopping (Actor::stopped)
- Actor reference deregistered from the ActorScheduler (if actor is ActorType::Tracked)
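The ordering of the started, processing and stopped steps can be sketched as a small std-only loop. This is a simplified stand-in, not Coerce's ActorLoop: the `Lifecycle` trait, `Envelope`, `Echo` and the helper functions are hypothetical, and registration with the ActorScheduler and handing the reference back to the creator are left out.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical, simplified stand-in for an actor trait with lifecycle hooks.
trait Lifecycle: Send + 'static {
    fn started(&mut self, log: &mut Vec<String>);
    fn handle(&mut self, msg: String, log: &mut Vec<String>);
    fn stopped(&mut self, log: &mut Vec<String>);
}

// Hypothetical mailbox envelope: a user message or a stop request.
enum Envelope {
    User(String),
    Stop,
}

// Runs: started -> process messages until a stop request -> stopped.
// The returned handle yields the recorded event log when joined.
fn spawn_with_lifecycle<A: Lifecycle>(
    mut actor: A,
) -> (Sender<Envelope>, JoinHandle<Vec<String>>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut log = Vec::new();
        actor.started(&mut log);
        for envelope in rx {
            match envelope {
                Envelope::User(msg) => actor.handle(msg, &mut log),
                Envelope::Stop => break,
            }
        }
        actor.stopped(&mut log);
        log
    });
    (tx, handle)
}

struct Echo;

impl Lifecycle for Echo {
    fn started(&mut self, log: &mut Vec<String>) {
        log.push("started".to_string());
    }
    fn handle(&mut self, msg: String, log: &mut Vec<String>) {
        log.push(format!("handled {}", msg));
    }
    fn stopped(&mut self, log: &mut Vec<String>) {
        log.push("stopped".to_string());
    }
}

// Drives one Echo actor through its whole lifecycle and returns the log.
fn lifecycle_events() -> Vec<String> {
    let (mailbox, handle) = spawn_with_lifecycle(Echo);
    mailbox.send(Envelope::User("ping".to_string())).unwrap();
    mailbox.send(Envelope::Stop).unwrap();
    handle.join().unwrap()
}
```

In this sketch the stop request travels through the same mailbox as user messages, so anything queued before it is still handled. Whether Coerce orders stop requests the same way is not stated here and should not be inferred from the sketch.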
Actor Example
The example below demonstrates how to create an actor that spawns n child actors, waits for them to do work, and then stops once they have all completed.
use coerce::actor::{ActorId, IntoActor, IntoActorId, Actor};
use coerce::actor::context::ActorContext;
use coerce::actor::message::{Message, Handler};
use coerce::actor::system::ActorSystem;

use async_trait::async_trait;
use tokio::sync::oneshot::{channel, Sender};

struct ParentActor {
    child_count: usize,
    completed_actors: usize,
    // Used to tell `main` how many children completed.
    on_work_completed: Option<Sender<usize>>,
}

struct ChildActor;

#[tokio::main]
pub async fn main() {
    let system = ActorSystem::new();
    let (tx, rx) = channel();

    const TOTAL_CHILD_ACTORS: usize = 10;

    let _actor = ParentActor {
        child_count: TOTAL_CHILD_ACTORS,
        completed_actors: 0,
        on_work_completed: Some(tx),
    }
    .into_actor(Some("parent"), &system)
    .await
    .unwrap();

    // Resolves once the parent has seen every child stop.
    let completed_actors = rx.await.ok();
    assert_eq!(completed_actors, Some(TOTAL_CHILD_ACTORS));
}

#[async_trait]
impl Actor for ParentActor {
    async fn started(&mut self, ctx: &mut ActorContext) {
        // Spawn the children as supervised actors of this parent.
        for i in 0..self.child_count {
            ctx.spawn(format!("child-{}", i).into_actor_id(), ChildActor).await.unwrap();
        }
    }

    async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) {
        // Log the id of the child that stopped, not the parent's own id.
        println!("child actor (id={}) stopped", id);

        self.completed_actors += 1;

        if ctx.supervised_count() == 0 && self.completed_actors == self.child_count {
            println!("all child actors finished, stopping ParentActor");

            if let Some(on_work_completed) = self.on_work_completed.take() {
                let _ = on_work_completed.send(self.completed_actors);
            }

            ctx.stop(None);
        }
    }
}

struct Finished;

impl Message for Finished {
    type Result = ();
}

#[async_trait]
impl Actor for ChildActor {
    async fn started(&mut self, ctx: &mut ActorContext) {
        println!("child actor (id={}) running", ctx.id());

        // simulate some work that takes 5 milliseconds
        let _ = self.actor_ref(ctx)
            .scheduled_notify(Finished, std::time::Duration::from_millis(5));
    }

    async fn stopped(&mut self, ctx: &mut ActorContext) {
        println!("child actor (id={}) finished", ctx.id());
    }
}

#[async_trait]
impl Handler<Finished> for ChildActor {
    async fn handle(&mut self, _message: Finished, ctx: &mut ActorContext) {
        // The simulated work is done, so the child stops itself.
        ctx.stop(None);
    }
}
Re-exports
pub use refs::*;
Modules
- Blocking Actor creation and communication APIs
- Actor Context
- Interrogate the ActorSystem by using Describe to analyse the actor hierarchy
- Actor lifecycle and ActorLoop implementation
- Actor Messaging primitives
- Actor Metrics
- Actor Scheduling and ActorRef registry
- Actor supervision and child spawning
- Actor System
- Actor watching allows one actor to watch another, ensuring that when the subject stops, the watching actor will be notified immediately.
- Workers are a collection of actors that can work as a pool of Worker actors, distributing incoming tasks amongst them
Structs
- Allows type-omission of Actor types but still allowing statically-typed message transmission
- A handle to a scheduled notification, which can be created via scheduled_notify.
Enums
- The error returned when a factory has failed to create the requested actor.
- An Actor can return self-defined tags, which can be retrieved by sending a Describe to the target Actor or by calling describe() on the target Actor’s reference.
Traits
- Actor definition, with specific lifecycle hooks.
- Trait that defines how a specific Actor implementation can be created, allowing things like distributed sharding and remoting to initialise Actors from a pre-defined ActorRecipe.
- Trait that defines the arguments used to initialise specific Actor implementations
- Trait allowing the creation of an Actor directly from itself
- Trait allowing the conversion of a type into ActorId, by consuming the input
- Trait allowing the conversion of a type into ActorPath, by consuming the input
- Trait allowing the creation of a supervised Actor directly from itself
- Trait allowing the type-omission of Actor types but still allowing statically-typed message transmission
- Trait helping us to clone MessageReceiver trait objects.
- Trait allowing the conversion of a type into ActorId, by borrowing the input
Functions
- Gets an actor reference from the global ActorSystem
- Creates an actor using the global ActorSystem
- Creates a new random ActorId
Type Aliases
- A reference to a string-based ActorId
- A reference to a string-based ActorPath
- A reference to a string-based ActorTag