coerce 0.8.11

Async actor runtime and distributed systems framework
Documentation
use coerce::actor::context::ActorContext;
use coerce::actor::describe::Describe;
use coerce::actor::message::{Handler, Message};
use coerce::actor::system::ActorSystem;
use coerce::actor::{Actor, ActorId, CoreActorRef, IntoActor, IntoActorId};

use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;

pub mod util;

#[macro_use]
extern crate tracing;

#[macro_use]
extern crate async_trait;

#[macro_use]
extern crate serde;

struct TestSupervisor {
    count: usize,
    max_depth: usize,
    all_child_actors_stopped: Option<oneshot::Sender<ActorId>>,
}

struct StopAll;

impl Message for StopAll {
    type Result = ();
}

#[async_trait]
impl Actor for TestSupervisor {
    async fn started(&mut self, ctx: &mut ActorContext) {
        for i in 0..self.count {
            let _child = ctx
                .spawn(
                    format!("spawned-{i}").into_actor_id(),
                    SpawnedActor {
                        depth: 1,
                        max_depth: self.max_depth,
                    },
                )
                .await
                .unwrap();
        }
    }

    async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) {
        info!("child stopped (id={})", &id);

        if ctx.supervised_count() == 0 {
            if let Some(cb) = self.all_child_actors_stopped.take() {
                let _ = cb.send(id.clone());
            }
        }
    }
}

#[async_trait]
impl Handler<StopAll> for TestSupervisor {
    async fn handle(&mut self, _: StopAll, ctx: &mut ActorContext) {
        if let Some(supervised) = ctx.supervised() {
            for child in &supervised.children {
                let _ = child.1.actor_ref().stop().await;
            }
        }
    }
}

struct SpawnedActor {
    depth: usize,
    max_depth: usize,
}

#[async_trait]
impl Actor for SpawnedActor {
    async fn started(&mut self, ctx: &mut ActorContext) {
        if self.depth > self.max_depth {
            return;
        }

        let _child = ctx
            .spawn(
                format!("spawned-{}", self.depth).into_actor_id(),
                SpawnedActor {
                    depth: self.depth + 1,
                    max_depth: self.max_depth,
                },
            )
            .await
            .unwrap();
    }
}

#[tokio::test]
pub async fn test_actor_child_spawn_and_stop() {
    util::create_trace_logger();

    const EXPECTED_ACTOR_COUNT: usize = 10;
    const EXPECTED_DEPTH: usize = 10;
    let system = ActorSystem::new();
    let actor_id = "actor".to_string();

    let (all_child_actors_stopped, on_all_child_actors_stopped) = oneshot::channel();

    let test_actor = TestSupervisor {
        count: EXPECTED_ACTOR_COUNT,
        max_depth: EXPECTED_DEPTH,
        all_child_actors_stopped: Some(all_child_actors_stopped),
    }
    .into_actor(Some(actor_id), &system)
    .await
    .expect("create actor");

    let (tx, rx) = oneshot::channel();
    let _ = test_actor.describe(Describe {
        sender: Some(tx),
        current_depth: 0,
        ..Default::default()
    });

    let actor_description = rx.await.unwrap();
    info!("{:#?}", actor_description);

    let actor_count = actor_description
        .supervised
        .map_or(0, |s| s.actors.iter().filter(|n| n.is_ok()).count());

    assert_eq!(EXPECTED_ACTOR_COUNT, actor_count);

    let _ = test_actor.notify(StopAll);

    timeout(Duration::from_secs(5), async move {
        on_all_child_actors_stopped.await
    })
    .await
    .expect("parent didn't receive the child-terminated notification");

    let (tx, rx) = oneshot::channel();
    let _ = test_actor.describe(Describe {
        sender: Some(tx),
        current_depth: 0,
        ..Default::default()
    });

    let x = rx.await;
    info!("{:#?}", x);

    let actor_count = x
        .unwrap()
        .supervised
        .map_or(0, |s| s.actors.iter().filter(|n| n.is_ok()).count());

    assert_eq!(0, actor_count);

    system.shutdown().await;
}