use core::{
any::TypeId,
sync::atomic::{AtomicUsize, Ordering},
};
use crossbus::prelude::*;
const ACTOR_NUM: usize = 6;
const MSG_NUM: usize = 7;
struct Station {
id: ActorId,
pass: usize,
send_to: Option<Sender<Msg>>,
current: Option<Sender<Msg>>,
}
#[derive(Debug, Message)]
struct Msg(usize);
impl Actor for Station {
type Message = Msg;
fn create(_: &mut Context<Self>) -> Self {
Self {
id: 0,
pass: 0,
send_to: None,
current: None,
}
}
fn action(&mut self, mut msg: Self::Message, ctx: &mut Context<Self>) {
static STARTER: AtomicUsize = AtomicUsize::new(0);
if self.id == 0 {
self.id = ctx.id();
}
let len = Register::as_ref()
.iter()
.filter(|en| en.type_id() == TypeId::of::<Self>())
.collect::<Vec<_>>()
.len();
log::debug!("current number of actors: {}", len);
if len < ACTOR_NUM {
let new_addr = Station::start();
self.send_to = Some(new_addr.0.sender());
log::info!("create a new actor {} by actor {}", new_addr.1, self.id);
}
if len == 1 && self.pass == 0 {
STARTER.store(self.id, Ordering::Relaxed);
log::info!("mark the first actor");
} else if len == ACTOR_NUM && self.send_to.is_none() && self.pass == 0 {
let starter_id = STARTER.load(Ordering::Relaxed);
let act = Register::as_ref()
.iter()
.find(|en| en.downcast_ref::<Self>().unwrap().id == starter_id)
.and_then(|en| en.downcast_ref::<Self>());
assert!(act.is_some());
self.send_to = act.unwrap().current.clone();
log::info!("mark the last actor");
}
msg.0 += 1;
self.pass += 1;
self.current = Some(ctx.sender());
if self.pass < MSG_NUM {
let sender = self.send_to.as_ref().unwrap();
log::info!(
"actor: {} {}-th time send msg: {:?} ",
self.id,
self.pass,
msg,
);
sender.send(msg).unwrap();
} else if self.pass == MSG_NUM {
let sender = self.send_to.take().unwrap();
if msg.0 != MSG_NUM * ACTOR_NUM {
log::info!(
"last round, actor: {} {}-th time send msg: {:?} ",
self.id,
self.pass,
msg,
);
sender.send(msg).unwrap();
} else {
log::info!(
"last round and last time, actor: {} {}-th time send msg: {:?} ",
self.id,
self.pass,
msg,
);
self.send_to = None;
}
log::info!("dorpping sender to exit actor {}", self.id);
self.current = None;
assert!(self.send_to.is_none());
assert!(self.current.is_none());
}
}
fn stopped(&mut self, _: &mut Context<Self>) {
log::info!("actor {} stopped, pass: {}, ", self.id, self.pass);
}
}
#[cfg(feature = "async-std")]
#[crossbus::main(runtime = async-std)]
async fn main() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
let (addr, _) = Station::start();
let sender = addr.sender();
sender.send(Msg(0)).unwrap();
}
#[cfg(feature = "tokio")]
#[crossbus::main(runtime = tokio)]
async fn main() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
let (addr, _) = Station::start();
let sender = addr.sender();
sender.send(Msg(0)).unwrap();
}
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
fn main() {}