Module coerce::actor

Coerce Actor Runtime

An Actor, at a high level, is a primitive that describes a unit of computation. Internally, it can have mutable state, receive messages and perform actions. The only way to communicate with an actor is by delivering messages to its mailbox.

Actors can only process one message at a time. This is useful because it can remove the need for thread synchronisation, which is usually achieved by locking (using Mutex, RwLock etc).

How is this achieved in Coerce?

Under the hood, Coerce uses tokio’s mpsc and oneshot channels for message communication. When an actor is created, an mpsc channel is created and used as the Actor’s mailbox. A task is spawned that listens for messages on the receiver, handles each one and, if the sender expects a result, sends that result back over the sender’s oneshot channel.
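The mailbox pattern described above can be sketched without any dependencies. This is not Coerce's implementation: it substitutes std::sync::mpsc and an OS thread for tokio's mpsc channel and spawned task, and uses a second std channel per request as a stand-in for tokio's oneshot. The names CounterMsg, CounterRef and spawn_counter are hypothetical and exist only for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical message type: a variant that expects a result carries its
// own reply channel (a stand-in for tokio's oneshot sender).
enum CounterMsg {
    Add(u64),
    Get(Sender<u64>),
}

// Hypothetical actor reference: a cheaply cloneable handle to the mailbox.
#[derive(Clone)]
struct CounterRef {
    mailbox: Sender<CounterMsg>,
}

impl CounterRef {
    // Fire-and-forget: no reply channel is created.
    fn notify_add(&self, n: u64) {
        let _ = self.mailbox.send(CounterMsg::Add(n));
    }

    // Request/response: returns None if the actor has already stopped.
    fn get(&self) -> Option<u64> {
        let (reply_tx, reply_rx) = channel();
        self.mailbox.send(CounterMsg::Get(reply_tx)).ok()?;
        reply_rx.recv().ok()
    }
}

// Creates the mailbox and spawns the loop that owns the actor's state.
fn spawn_counter() -> CounterRef {
    let (mailbox, receiver) = channel();
    thread::spawn(move || {
        // Plain mutable state: only this loop ever touches it, so no Mutex.
        let mut total = 0u64;
        // Messages are handled strictly one at a time, in arrival order.
        // The loop ends once every CounterRef has been dropped.
        for msg in receiver {
            match msg {
                CounterMsg::Add(n) => total += n,
                CounterMsg::Get(reply) => {
                    let _ = reply.send(total);
                }
            }
        }
    });
    CounterRef { mailbox }
}
```

Calling spawn_counter() returns a handle that can be cloned and shared freely. Because the state lives inside the receiving loop, every clone mutates it only by sending a message, never by taking a lock.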

Every reference (LocalActorRef) holds a Sender to the actor's mailbox, which can be cheaply cloned. A message of type M can be sent through a reference wherever the actor implements Handler<M>.

Whilst message handlers are async, the actor will always wait for the current handler to complete before moving on to subsequent messages in the mailbox. If the actor needs to defer work and return a result faster, an asynchronous task should be spawned.
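A minimal sketch of deferring work from a handler follows. It is a dependency-free analogy, not Coerce code: std threads and std::sync::mpsc replace tokio tasks and channels. WorkerMsg and spawn_worker are hypothetical names, and the gate channel is an illustrative device that simulates slow work deterministically (the job finishes only once the gate fires).

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical messages for a worker actor.
enum WorkerMsg {
    // Long-running job: `gate` simulates slow work, `done` receives the result.
    SlowJob { input: u32, gate: Receiver<()>, done: Sender<u32> },
    // Cheap request that should not have to wait behind a slow job.
    Ping(Sender<&'static str>),
}

fn spawn_worker() -> Sender<WorkerMsg> {
    let (mailbox, receiver) = channel();
    thread::spawn(move || {
        // One message at a time: the next message is only taken from the
        // mailbox after the current match arm has returned.
        for msg in receiver {
            match msg {
                WorkerMsg::SlowJob { input, gate, done } => {
                    // Defer: hand the slow part to a separate thread so this
                    // arm returns immediately and the mailbox keeps moving.
                    thread::spawn(move || {
                        let _ = gate.recv(); // simulated long-running work
                        let _ = done.send(input * 2);
                    });
                }
                WorkerMsg::Ping(reply) => {
                    let _ = reply.send("pong");
                }
            }
        }
    });
    mailbox
}
```

Had the SlowJob arm waited on the gate directly, the Ping behind it would have stayed queued until the job finished. With the work deferred, the Ping is answered while the job is still pending. The trade-off is that deferred work no longer has exclusive access to the actor's state, so any result usually has to come back as a message.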

General lifecycle of an Actor:

  1. ActorContext is created
  2. Actor started (Actor::started)
  3. Actor begins processing messages
  4. Actor reference registered with the ActorScheduler (if actor is ActorType::Tracked)
  5. Actor reference sent back to the creator
  6. Stop request is received, or system is shutting down
  7. Actor stopping (Actor::stopped)
  8. Actor reference deregistered from the ActorScheduler (if actor is ActorType::Tracked)

Actor Example

The example below demonstrates how to create an actor that spawns {n} child actors, waits for them to do their work, and then stops once they have all completed.

 use coerce::actor::{ActorId, IntoActor, IntoActorId, Actor};
 use coerce::actor::context::ActorContext;
 use coerce::actor::message::{Message, Handler};
 use coerce::actor::system::ActorSystem;

 use async_trait::async_trait;
 use tokio::sync::oneshot::{channel, Sender};

 struct ParentActor {
    child_count: usize,
    completed_actors: usize,
    on_work_completed: Option<Sender<usize>>,
 }

 struct ChildActor;

 #[tokio::main]
 pub async fn main() {
    let system = ActorSystem::new();
    let (tx, rx) = channel();

    const TOTAL_CHILD_ACTORS: usize = 10;

    let actor = ParentActor {
        child_count: TOTAL_CHILD_ACTORS,
        completed_actors: 0,
        on_work_completed: Some(tx),
    }
    .into_actor(Some("parent"), &system)
    .await
    .unwrap();

    let completed_actors = rx.await.ok();
    assert_eq!(completed_actors, Some(TOTAL_CHILD_ACTORS));
 }

 #[async_trait]
 impl Actor for ParentActor {
    async fn started(&mut self, ctx: &mut ActorContext) {
        for i in 0..self.child_count {
            ctx.spawn(format!("child-{}", i).into_actor_id(), ChildActor).await.unwrap();
        }
    }

    async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) {
        // `id` identifies the child that stopped; `ctx.id()` would be the parent's own ID
        println!("child actor (id={}) stopped", id);

        self.completed_actors += 1;

        if ctx.supervised_count() == 0 && self.completed_actors == self.child_count {
            println!("all child actors finished, stopping ParentActor");

            if let Some(on_work_completed) = self.on_work_completed.take() {
                let _ = on_work_completed.send(self.completed_actors);
            }

            ctx.stop(None);
        }
    }
 }

 struct Finished;

 impl Message for Finished {
    type Result = ();
 }

 #[async_trait]
 impl Actor for ChildActor {
    async fn started(&mut self, ctx: &mut ActorContext) {
        println!("child actor (id={}) running", ctx.id());

        // simulate some work that takes 5 milliseconds
        let _ = self.actor_ref(ctx)
                    .scheduled_notify(Finished, std::time::Duration::from_millis(5));
    }

    async fn stopped(&mut self, ctx: &mut ActorContext) {
        println!("child actor (id={}) finished", ctx.id());
    }
 }

 #[async_trait]
 impl Handler<Finished> for ChildActor {
     async fn handle(&mut self, _message: Finished, ctx: &mut ActorContext) {
        ctx.stop(None);
     }
 }

Type Aliases

  • A reference to a string-based ActorId
  • A reference to a string-based ActorPath
  • A reference to a string-based ActorTag