use crate::message::ActorMessage;
use crate::prelude::*;
use futures::{FutureExt, Stream};
use log::{debug, error, trace, warn};
use secc::*;
use serde::de::Deserializer;
use serde::ser::Serializer;
use serde::{Deserialize, Serialize};
use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::marker::{Send, Sync};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use uuid::Uuid;
/// Status an actor's processor reports after handling one message.
///
/// `ActorStream::handle_result` translates each variant into an operation on the
/// actor's message channel.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Status {
/// The message was handled; `handle_result` pops it from the channel.
Done,
/// The message was not handled now; `handle_result` calls `skip()` on the channel.
Skip,
/// The message was handled; `handle_result` pops it and then calls `reset_skip()`.
Reset,
/// The message was handled and the actor asks to stop; `handle_result` pops it and
/// asks the system to stop the actor.
Stop,
}
impl Status {
pub fn done<T>(state: T) -> (T, Status) {
(state, Status::Done)
}
pub fn skip<T>(state: T) -> (T, Status) {
(state, Status::Skip)
}
pub fn reset<T>(state: T) -> (T, Status) {
(state, Status::Reset)
}
pub fn stop<T>(state: T) -> (T, Status) {
(state, Status::Stop)
}
}
/// Errors reported by operations on an [`Aid`].
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AidError {
/// NOTE(review): not produced in this file; presumably returned when a message cannot
/// be serialized to bincode — confirm against `ActorMessage::to_bincode`.
CantConvertToBincode,
/// NOTE(review): not produced in this file; presumably returned when a message cannot
/// be deserialized from bincode — confirm against `ActorMessage::from_bincode`.
CantConvertFromBincode,
/// Returned by `Aid::send` and `Aid::send_after` when the local actor's `stopped`
/// flag is already set.
ActorAlreadyStopped,
/// Returned by `Aid::sent`, `Aid::received` and `Aid::stop` when the `Aid` refers to a
/// remote actor.
AidNotLocal,
/// Returned by `Aid::send` when `send_await_timeout` on the local channel fails;
/// carries a clone of the target `Aid`.
SendTimedOut(Aid),
/// NOTE(review): not produced in this file — confirm where scheduling failures are
/// reported.
UnableToSchedule,
}
impl std::fmt::Display for AidError {
    /// Writes the error using its derived `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
// Marks `AidError` as a standard error type; all trait methods use their defaults.
impl std::error::Error for AidError {}
/// The channel an [`Aid`] uses to deliver messages, depending on where the actor lives.
enum ActorSender {
/// The actor belongs to the local actor system.
Local {
// Set to `true` by `Aid::stop`; checked by `Aid::send` and `Aid::send_after`.
stopped: AtomicBool,
// Sending half of the actor's message channel.
sender: SeccSender<Message>,
// System used for its config (`send_timeout`), `schedule` and `send_after`.
system: ActorSystem,
},
/// The actor belongs to another system; messages are wrapped in a `WireMessage`
/// and pushed through this sender.
Remote { sender: SeccSender<WireMessage> },
}
impl std::fmt::Debug for ActorSender {
    /// Writes only the variant name; the channel and system fields are not printed.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ActorSender::Local { .. } => "ActorSender::Local",
            ActorSender::Remote { .. } => "ActorSender::Remote",
        };
        formatter.write_str(label)
    }
}
/// Shared payload behind an [`Aid`]; every clone of an `Aid` points at the same `AidData`.
struct AidData {
// Unique id of the actor.
uuid: Uuid,
// Id of the actor system the actor belongs to.
system_uuid: Uuid,
// Optional human-readable name of the actor.
name: Option<String>,
// Channel used to deliver messages (local or remote).
sender: ActorSender,
}
/// Serialized representation of an [`Aid`]: the identifying fields only, without the
/// sender. `Aid`'s `Serialize` / `Deserialize` impls go through this type.
#[derive(Serialize, Deserialize)]
struct AidSerializedForm {
// Unique id of the actor.
uuid: Uuid,
// Id of the actor system the actor belongs to.
system_uuid: Uuid,
// Optional name of the actor.
name: Option<String>,
}
/// Actor id: a cloneable handle used to send messages to an actor.
///
/// Cloning copies only the `Arc`, so all clones share one `AidData`.
#[derive(Clone)]
pub struct Aid {
// Shared identifying data and message sender.
data: Arc<AidData>,
}
impl Serialize for Aid {
    /// Serializes the actor's uuid, system uuid and name via [`AidSerializedForm`];
    /// the sender is not part of the serialized form.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        AidSerializedForm {
            uuid: self.uuid(),
            system_uuid: self.system_uuid(),
            name: self.name(),
        }
        .serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for Aid {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let serialized_form = AidSerializedForm::deserialize(deserializer)?;
let system = ActorSystem::current();
match system.find_aid_by_uuid(&serialized_form.uuid) {
Some(aid) => Ok(aid.clone()),
None => {
if serialized_form.system_uuid == system.uuid() {
Err(serde::de::Error::custom(format!(
"{:?}:{} system uuid matches but the uuid was not found.",
serialized_form.name, serialized_form.uuid,
)))
} else if let Some(sender) = system.remote_sender(&serialized_form.system_uuid) {
Ok(Aid {
data: Arc::new(AidData {
uuid: serialized_form.uuid,
system_uuid: serialized_form.system_uuid,
name: serialized_form.name,
sender: ActorSender::Remote { sender: sender },
}),
})
} else {
Err(serde::de::Error::custom(format!(
"{:?}:{} Unable to find a connection for remote system.",
serialized_form.name, serialized_form.uuid,
)))
}
}
}
}
}
impl std::cmp::PartialEq for Aid {
    /// Two `Aid`s are equal when both the actor uuid and the system uuid match; the
    /// name and the sender are not compared.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&*self.data, &*other.data);
        if lhs.uuid != rhs.uuid {
            return false;
        }
        lhs.system_uuid == rhs.system_uuid
    }
}
// Equality on `Aid` compares two uuid pairs (see `PartialEq`), so it is a full
// equivalence relation.
impl std::cmp::Eq for Aid {}
impl std::cmp::PartialOrd for Aid {
    /// Always returns `Some`, delegating to the total order defined by [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl std::cmp::Ord for Aid {
    /// Total order over `Aid`s.
    ///
    /// Compares, in this order:
    /// 1. the name (`None` sorts before any `Some`, names compare as strings),
    /// 2. the system uuid,
    /// 3. the actor uuid.
    ///
    /// The ordering logic lives here and `partial_cmp` delegates to it, so there is no
    /// `Option` to unwrap and therefore no panic path.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (&*self.data, &*other.data);
        lhs.name
            .cmp(&rhs.name)
            .then_with(|| lhs.system_uuid.cmp(&rhs.system_uuid))
            .then_with(|| lhs.uuid.cmp(&rhs.uuid))
    }
}
impl Aid {
    /// Sends `message` to the actor this `Aid` refers to.
    ///
    /// # Local actors
    /// Returns `Err(AidError::ActorAlreadyStopped)` if the actor's `stopped` flag is set.
    /// Otherwise the message is pushed onto the actor's channel with the system's
    /// configured `send_timeout`.
    ///
    /// # Remote actors
    /// The message is wrapped in `WireMessage::ActorMessage` and pushed to the remote
    /// sender with `send_await`.
    ///
    /// # Errors
    /// * `AidError::ActorAlreadyStopped` — local actor already stopped.
    /// * `AidError::SendTimedOut` — `send_await_timeout` on the local channel failed.
    ///
    /// # Panics
    /// Panics if `send_await` on a remote sender returns an error.
    pub fn send(&self, message: Message) -> Result<(), AidError> {
        match &self.data.sender {
            ActorSender::Local {
                stopped,
                sender,
                system,
            } => {
                if stopped.load(Ordering::Relaxed) {
                    return Err(AidError::ActorAlreadyStopped);
                }
                match sender.send_await_timeout(message, system.config().send_timeout) {
                    Ok(_) => {
                        // Schedule only when the channel reports exactly one receivable
                        // message. NOTE(review): presumably an actor with more messages
                        // waiting has already been scheduled — confirm with the scheduler.
                        if sender.receivable() == 1 {
                            system.schedule(self.clone());
                        }
                        Ok(())
                    }
                    Err(_) => Err(AidError::SendTimedOut(self.clone())),
                }
            }
            ActorSender::Remote { sender } => {
                sender
                    .send_await(WireMessage::ActorMessage {
                        actor_uuid: self.data.uuid,
                        system_uuid: self.data.system_uuid,
                        message,
                    })
                    .unwrap();
                Ok(())
            }
        }
    }

    /// Wraps `value` with `Message::from_arc` and sends it with [`Aid::send`].
    pub fn send_arc<T>(&self, value: Arc<T>) -> Result<(), AidError>
    where
        T: 'static + ActorMessage,
    {
        self.send(Message::from_arc(value))
    }

    /// Wraps `value` with `Message::new` and sends it with [`Aid::send`].
    pub fn send_new<T>(&self, value: T) -> Result<(), AidError>
    where
        T: 'static + ActorMessage,
    {
        self.send(Message::new(value))
    }

    /// Requests delivery of `message` to this actor after `duration`.
    ///
    /// For a local actor the request is handed to `ActorSystem::send_after`; for a
    /// remote actor a `WireMessage::DelayedActorMessage` is pushed to the remote sender.
    ///
    /// # Errors
    /// Returns `AidError::ActorAlreadyStopped` if the local actor's `stopped` flag is
    /// set. Errors from the remote sender are ignored and `Ok(())` is returned.
    pub fn send_after(&self, message: Message, duration: Duration) -> Result<(), AidError> {
        match &self.data.sender {
            ActorSender::Local {
                stopped, system, ..
            } => {
                if stopped.load(Ordering::Relaxed) {
                    return Err(AidError::ActorAlreadyStopped);
                }
                system.send_after(message, self.clone(), duration);
                Ok(())
            }
            ActorSender::Remote { sender } => {
                if let Err(err) = sender.send_await(WireMessage::DelayedActorMessage {
                    duration,
                    actor_uuid: self.data.uuid,
                    system_uuid: self.data.system_uuid,
                    message,
                }) {
                    // The match has no wildcard arm, so adding a variant to `SeccErrors`
                    // will fail to compile here and force this policy to be revisited.
                    return match err {
                        SeccErrors::Full(_) | SeccErrors::Empty => Ok(()),
                    };
                }
                Ok(())
            }
        }
    }

    /// Wraps `value` with `Message::from_arc` and sends it with [`Aid::send_after`].
    pub fn send_arc_after<T>(&self, value: Arc<T>, duration: Duration) -> Result<(), AidError>
    where
        T: 'static + ActorMessage,
    {
        self.send_after(Message::from_arc(value), duration)
    }

    /// Wraps `value` with `Message::new` and sends it with [`Aid::send_after`].
    pub fn send_new_after<T>(&self, value: T, duration: Duration) -> Result<(), AidError>
    where
        T: 'static + ActorMessage,
    {
        self.send_after(Message::new(value), duration)
    }

    /// Returns the uuid of the actor.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        // `Uuid` is `Copy`, so no explicit clone is needed.
        self.data.uuid
    }

    /// Returns the uuid of the actor system the actor belongs to.
    #[inline]
    pub fn system_uuid(&self) -> Uuid {
        self.data.system_uuid
    }

    /// Returns a copy of the actor's name, if it has one.
    #[inline]
    pub fn name(&self) -> Option<String> {
        self.data.name.clone()
    }

    /// Returns the actor's name, or its uuid rendered as a string when it has no name.
    #[inline]
    pub fn name_or_uuid(&self) -> String {
        match &self.data.name {
            Some(value) => value.to_string(),
            None => self.data.uuid.to_string(),
        }
    }

    /// Returns `true` when this `Aid` holds a local sender, `false` for a remote one.
    #[inline]
    pub fn is_local(&self) -> bool {
        match self.data.sender {
            ActorSender::Local { .. } => true,
            ActorSender::Remote { .. } => false,
        }
    }

    /// Returns the value of `sent()` on the local channel's sender.
    ///
    /// # Errors
    /// Returns `AidError::AidNotLocal` for a remote `Aid`.
    pub fn sent(&self) -> Result<usize, AidError> {
        match &self.data.sender {
            ActorSender::Local { sender, .. } => Ok(sender.sent()),
            ActorSender::Remote { .. } => Err(AidError::AidNotLocal),
        }
    }

    /// Returns the value of `received()` on the local channel's sender.
    ///
    /// # Errors
    /// Returns `AidError::AidNotLocal` for a remote `Aid`.
    pub fn received(&self) -> Result<usize, AidError> {
        match &self.data.sender {
            ActorSender::Local { sender, .. } => Ok(sender.received()),
            ActorSender::Remote { .. } => Err(AidError::AidNotLocal),
        }
    }

    /// Sets the local actor's `stopped` flag so that later `send` / `send_after` calls
    /// fail with `AidError::ActorAlreadyStopped`.
    ///
    /// # Errors
    /// Returns `AidError::AidNotLocal` for a remote `Aid`.
    pub(crate) fn stop(&self) -> Result<(), AidError> {
        match &self.data.sender {
            ActorSender::Local { stopped, .. } => {
                trace!("Stopping local Actor");
                stopped.fetch_or(true, Ordering::AcqRel);
                Ok(())
            }
            ActorSender::Remote { .. } => Err(AidError::AidNotLocal),
        }
    }

    /// Returns `true` when both `Aid`s share the same underlying `Arc` allocation.
    pub fn ptr_eq(left: &Aid, right: &Aid) -> bool {
        Arc::ptr_eq(&left.data, &right.data)
    }
}
impl std::fmt::Debug for Aid {
    /// Writes the actor uuid, system uuid, name and whether the `Aid` is local.
    ///
    /// The uuids are formatted directly through their `Display` impls rather than
    /// through temporary `String`s, which produces the same text without allocating.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            formatter,
            "Aid{{id: {}, system_uuid: {}, name: {:?}, is_local: {}}}",
            self.data.uuid,
            self.data.system_uuid,
            self.data.name,
            self.is_local()
        )
    }
}
impl std::fmt::Display for Aid {
    /// Writes `name:uuid` for a named actor and just `uuid` for an unnamed one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let uuid = &self.data.uuid;
        if let Some(name) = &self.data.name {
            write!(f, "{}:{}", name, uuid)
        } else {
            write!(f, "{}", uuid)
        }
    }
}
impl Hash for Aid {
fn hash<H: Hasher>(&self, state: &'_ mut H) {
self.data.uuid.hash(state);
self.data.system_uuid.hash(state);
}
}
/// Context handed to an actor's processor together with each message.
#[derive(Clone, Debug)]
pub struct Context {
/// The `Aid` of the actor processing the message.
pub aid: Aid,
/// The actor system the actor is registered with.
pub system: ActorSystem,
}
impl std::fmt::Display for Context {
    /// Writes the uuid of the actor and the uuid of the system.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let aid_uuid = self.aid.uuid();
        let system_uuid = self.system.uuid();
        write!(f, "Context{{aid: {}, system: {}}}", aid_uuid, system_uuid)
    }
}
/// Message processor of an actor: a `Send + Sync` callable that takes the actor's
/// state `S`, its [`Context`] and a `Message`, and returns a future `R` resolving to
/// `ActorResult<S>`.
///
/// The trait has no methods of its own; it only names this set of bounds.
pub trait Processor<S: Send + Sync, R: Future<Output = ActorResult<S>> + Send + 'static>:
(FnMut(S, Context, Message) -> R) + Send + Sync
{
}
// Blanket impl: every `'static` closure or function with the right signature and
// `Send + Sync` bounds is a `Processor`.
impl<F, S, R> Processor<S, R> for F
where
S: Send + Sync,
R: Future<Output = ActorResult<S>> + Send + 'static,
F: (FnMut(S, Context, Message) -> R) + Send + Sync + 'static,
{
}
/// Boxed, pinned future returned by a [`Handler`]; resolves to the `Status` reported
/// by the actor or to an error.
pub(crate) type HandlerFuture =
Pin<Box<dyn Future<Output = Result<Status, StdError>> + Send + 'static>>;
/// State-erased form of a processor: takes only the [`Context`] and the `Message` and
/// returns a [`HandlerFuture`]. `Actor::new` builds one around a `Processor` and its
/// state.
pub(crate) trait Handler:
(FnMut(Context, Message) -> HandlerFuture) + Send + Sync + 'static
{
}
// Blanket impl: any callable with the matching signature and bounds is a `Handler`.
impl<F> Handler for F where F: (FnMut(Context, Message) -> HandlerFuture) + Send + Sync + 'static {}
/// Builder collecting the options for a new actor before it is created with
/// `ActorBuilder::with`.
pub struct ActorBuilder {
// System the actor will be registered with.
pub(crate) system: ActorSystem,
/// Optional name for the actor.
pub name: Option<String>,
/// Optional size of the actor's message channel; when `None`, `Actor::new` uses the
/// system config's `message_channel_size`.
pub channel_size: Option<u16>,
}
impl ActorBuilder {
    /// Creates the actor from the initial `state` and the `processor`, then registers
    /// it with the system and returns the result of that registration.
    pub fn with<F, S, R>(self, state: S, processor: F) -> Result<Aid, SystemError>
    where
        S: Send + Sync + 'static,
        R: Future<Output = ActorResult<S>> + Send + 'static,
        F: Processor<S, R> + 'static,
    {
        let system = self.system.clone();
        let (actor, stream) = Actor::new(system, &self, state, processor);
        debug!("Actor created: {}", actor.context.aid.uuid());
        self.system.register_actor(actor, stream)
    }

    /// Sets the name of the actor to be created.
    pub fn name(self, name: impl Into<String>) -> Self {
        let mut builder = self;
        builder.name = Some(name.into());
        builder
    }

    /// Sets the size of the actor's message channel.
    ///
    /// # Panics
    /// Panics if `size` is zero.
    pub fn channel_size(self, size: u16) -> Self {
        assert!(size > 0);
        let mut builder = self;
        builder.channel_size = Some(size);
        builder
    }
}
/// Stream that drives one actor: each poll peeks the next message on the channel, runs
/// the handler on it, and yields the handler's result.
pub(crate) struct ActorStream {
/// Context (aid and system) cloned into every handler call.
pub context: Context,
// Receiving half of the actor's message channel.
receiver: SeccReceiver<Message>,
// State-erased processor built in `Actor::new`.
handler: Box<dyn Handler>,
// Handler future that returned `Poll::Pending` and must be polled again.
pending: Option<HandlerFuture>,
// Set once a `SystemMsg::Stop` has been peeked; forces successful results to
// `Status::Stop` (see `overwrite_on_stop`).
stopping: bool,
}
/// Handle to a created actor; holds only the actor's [`Context`].
pub(crate) struct Actor {
/// Context (aid and system) of the actor.
pub context: Context,
}
/// Raw-pointer wrapper that is unconditionally declared `Send + Sync`.
/// Used in `Actor::new` to carry a pointer to the actor's state into the handler future.
#[repr(transparent)]
struct SendSyncPointer<T>(*mut T);
/// `UnsafeCell` wrapper that is unconditionally declared `Send + Sync`.
/// Used in `Actor::new` to hold the actor's state inside the handler closure.
#[repr(transparent)]
struct SendSyncUnsafeCell<T>(UnsafeCell<T>);
// SAFETY (NOTE(review)): these impls carry no bound on `T` and the compiler cannot check
// them. Soundness depends on every user guaranteeing that the wrapped value is never
// accessed from two threads at once — confirm for each use site.
unsafe impl<T> Send for SendSyncPointer<T> {}
unsafe impl<T> Sync for SendSyncPointer<T> {}
unsafe impl<T> Send for SendSyncUnsafeCell<T> {}
unsafe impl<T> Sync for SendSyncUnsafeCell<T> {}
impl Actor {
/// Creates a new local actor and the stream that drives it.
///
/// Builds the message channel, a fresh `Aid` (random v4 uuid, the system's uuid, the
/// builder's name) and a state-erased handler that owns `state` and `processor`.
/// Returns the `Actor` handle and the `ActorStream`, which share the same `Context`.
///
/// The handler catches panics both from calling `processor` and from polling the
/// future it returns, and converts them into errors.
pub(crate) fn new<F, S, R>(
system: ActorSystem,
builder: &ActorBuilder,
state: S,
mut processor: F,
) -> (Arc<Actor>, ActorStream)
where
S: Send + Sync + 'static,
R: Future<Output = ActorResult<S>> + Send + 'static,
F: Processor<S, R> + 'static,
{
// Channel capacity: the builder's override, else the system-wide default.
// NOTE(review): the 10 ms argument is presumably the channel's poll/wait interval —
// confirm against `secc::create`.
let (sender, receiver) = secc::create::<Message>(
builder
.channel_size
.unwrap_or(system.config().message_channel_size),
Duration::from_millis(10),
);
let aid = Aid {
data: Arc::new(AidData {
uuid: Uuid::new_v4(),
system_uuid: system.uuid(),
name: builder.name.clone(),
sender: ActorSender::Local {
system: system.clone(),
stopped: AtomicBool::new(false),
sender,
},
}),
};
// The state lives in an `Option` inside an `UnsafeCell` owned by the handler closure:
// it is taken out for each message and written back when processing succeeds.
let state_box = SendSyncUnsafeCell(UnsafeCell::new(Some(state)));
let handler = Box::new(move |ctx: Context, msg: Message| {
let state = SendSyncPointer(state_box.0.get());
// SAFETY (NOTE(review)): relies on the handler not being re-entered while a previous
// future is still outstanding and on not being called from two threads at once.
// `ActorStream::poll_next` only calls the handler when `pending` is `None` — confirm
// there is no other caller.
let s = unsafe { (*state.0).take() }.expect("State cell was empty");
// First panic guard: the call that creates the future.
let future = catch_unwind(AssertUnwindSafe(|| (processor)(s, ctx, msg)));
async move {
match future {
// Second panic guard: polling the future to completion.
Ok(future) => match AssertUnwindSafe(future).catch_unwind().await {
Ok(x) => x,
Err(panic) => {
warn!("Actor panicked! Catching as error");
Err(Panic::from(panic).into())
}
},
Err(err) => {
warn!("Actor panicked! Catching as error");
Err(Panic::from(err).into())
}
}
// The state is written back only on success. After an error or a panic the cell
// stays `None`, so a later call of the handler would hit the `expect` above.
.map(|(s, status)| {
// SAFETY (NOTE(review)): the pointer targets the cell owned by the handler closure,
// so this assumes the closure outlives the future — confirm. `ptr::write` does not
// drop the previous value, which is the `None` left by `take()`.
unsafe { ptr::write(state.0, Some(s)) };
status
})
}
.boxed()
});
let context = Context { aid, system };
let actor = Actor {
context: context.clone(),
};
let stream = ActorStream {
context,
receiver,
handler,
pending: None,
stopping: false,
};
(Arc::new(actor), stream)
}
}
impl ActorStream {
pub(crate) fn handle_result(&self, result: Result<Status, StdError>) -> bool {
let mut stopping = false;
match result {
Ok(Status::Done) => {
trace!(
"Actor {} finished processing a message",
self.context.aid.uuid()
);
self.receiver.pop().unwrap()
}
Ok(Status::Skip) => {
trace!(
"Actor {} skipped processing a message",
self.context.aid.uuid()
);
self.receiver.skip().unwrap()
}
Ok(Status::Reset) => {
trace!(
"Actor {} finished processing a message and reset the cursor",
self.context.aid.uuid()
);
self.receiver.pop().unwrap();
self.receiver.reset_skip().unwrap();
}
Ok(Status::Stop) => {
debug!("Actor \"{}\" stopping", self.context.aid.name_or_uuid());
self.receiver.pop().unwrap();
self.context
.system
.internal_stop_actor(&self.context.aid, None);
stopping = true;
}
Err(e) => {
self.receiver.pop().unwrap();
error!(
"[{}] returned an error when processing: {}",
self.context.aid, &e
);
self.context
.system
.internal_stop_actor(&self.context.aid, e);
stopping = true;
}
}
stopping
}
fn overwrite_on_stop(&self, result: Result<Status, StdError>) -> Result<Status, StdError> {
match self.stopping {
true => result.map(|_| Status::Stop),
false => result,
}
}
}
impl Stream for ActorStream {
/// Result of processing one message: the `Status` reported by the handler or an error.
type Item = Result<Status, StdError>;
/// Processes at most one message per call.
///
/// * If a handler future from an earlier poll is still stored, it is polled again; when
///   it completes, it is cleared and its result is yielded.
/// * Otherwise the next message is peeked (not removed) from the channel, the handler
///   is called with it and the new future is polled once; if it is not ready it is
///   stored for the next poll.
/// * If the channel has no message to peek, the stream yields `None`.
///
/// Every yielded result goes through `overwrite_on_stop`.
///
/// # Panics
/// Panics if polled with no pending future after `stopping` has been set.
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
trace!("Actor {} is being polled", self.context.aid.name_or_uuid());
if let Some(pending) = self.pending.as_mut() {
// Resume the handler future left over from an earlier poll.
let poll = pending
.as_mut()
.poll(cx)
.map(|r| Some(self.overwrite_on_stop(r)));
if let &Poll::Pending = &poll {
trace!("Actor {} is pending", self.context.aid.uuid());
} else {
// The future completed: clear the slot so the next poll peeks a new message.
drop(self.pending.take());
}
poll
} else {
if self.stopping {
panic!("Stopped ActorStream was polled after stopping. Please open a bug report.")
}
// The message is only peeked here; removing it from the channel is left to
// `handle_result`.
match self.receiver.peek() {
Ok(msg) => {
// Remember that a stop was requested before the handler runs, so that whatever
// status the handler returns is overwritten with `Status::Stop`.
if let Some(m) = msg.content_as::<SystemMsg>() {
if let SystemMsg::Stop = *m {
trace!("Actor {} received stop message", self.context.aid.uuid());
self.stopping = true;
}
}
let ctx = self.context.clone();
let mut future = (&mut self.handler)(ctx, msg.clone());
// Poll once right away; many handlers complete without suspending.
match future.as_mut().poll(cx) {
Poll::Ready(r) => Poll::Ready(Some(self.overwrite_on_stop(r))),
Poll::Pending => {
trace!("Actor {} is pending", self.context.aid.uuid());
self.pending = Some(future);
Poll::Pending
}
}
}
// Both error variants are treated as "nothing to process" and end the stream.
Err(err) => match err {
SeccErrors::Empty | SeccErrors::Full(_) => {
trace!(
"Actor `{}` has no more messages, return to sleep",
self.context.aid.name_or_uuid()
);
Poll::Ready(None)
}
},
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::*;
    use log::*;
    use std::thread;
    use std::time::Instant;

    /// Polls `system` until `aid` is no longer alive, sleeping between checks.
    ///
    /// Panics if the actor is still alive after `max` has elapsed.
    fn await_actor_stopped(system: &ActorSystem, aid: &Aid, max: Duration) {
        let start = Instant::now();
        while system.is_actor_alive(aid) {
            if max < start.elapsed() {
                panic!("Timed out waiting for actor to stop!");
            }
            sleep(1);
        }
    }

    /// Sends an `i32` to an actor that triggers a system shutdown when it receives one,
    /// then waits for that shutdown.
    #[test]
    fn test_send_examples() {
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let aid = system
            .spawn()
            .with((), |_: (), context: Context, message: Message| {
                async move {
                    if message.content_as::<i32>().is_some() {
                        context.system.trigger_shutdown();
                    }
                    Ok(Status::done(()))
                }
            })
            .unwrap();
        match aid.send(Message::new(11)) {
            Ok(_) => info!("OK Then!"),
            Err(e) => info!("Ooops {:?}", e),
        }
        system.await_shutdown(None);
    }

    /// A message type whose bincode conversions fail can still be sent to a local actor.
    #[test]
    fn test_send_unserializable() {
        use std::time::Duration;
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));

        struct Foo {}
        impl ActorMessage for Foo {}

        assert!(Foo {}.to_bincode().is_err());
        assert!(Foo::from_bincode(&vec![1, 2, 3]).is_err());

        let aid = system
            .spawn()
            .with((), move |_state: (), context: Context, message: Message| {
                async move {
                    if message.content_as::<Foo>().is_some() {
                        context.system.trigger_shutdown();
                    }
                    Ok(Status::done(()))
                }
            })
            .unwrap();
        aid.send(Message::new(Foo {})).unwrap();
        await_received(&aid, 2, 1000).unwrap();
        system.await_shutdown(Duration::from_millis(1000));
    }

    /// The accessors of an unnamed `Aid` agree with its underlying data.
    #[test]
    fn test_basic_info_unnamed() {
        init_test_log();
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let aid = system.spawn().with((), simple_handler).unwrap();
        await_received(&aid, 1, 1000).unwrap();
        assert_eq!(system.uuid(), aid.data.system_uuid);
        assert_eq!(aid.data.system_uuid, aid.system_uuid());
        assert_eq!(aid.data.uuid, aid.uuid());
        assert_eq!(None, aid.data.name);
        assert_eq!(aid.data.name, aid.name());
        system.trigger_and_await_shutdown(None);
    }

    /// The accessors of a named `Aid` agree with its underlying data.
    #[test]
    fn test_basic_info_named() {
        init_test_log();
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let aid = system.spawn().name("A").with((), simple_handler).unwrap();
        await_received(&aid, 1, 1000).unwrap();
        assert_eq!(system.uuid(), aid.data.system_uuid);
        assert_eq!(aid.data.system_uuid, aid.system_uuid());
        assert_eq!(aid.data.uuid, aid.uuid());
        assert_eq!(Some("A".to_string()), aid.data.name);
        assert_eq!(aid.data.name, aid.name());
        system.trigger_and_await_shutdown(None);
    }

    /// Round-trips `Aid`s through bincode:
    /// * a live local aid deserializes to the very same `Aid`,
    /// * a stopped local aid fails to deserialize,
    /// * on a connected second system the aid deserializes as a remote aid,
    /// * after disconnecting, deserialization fails.
    #[test]
    fn test_aid_serialization() {
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let aid1 = system.spawn().with((), simple_handler).unwrap();
        system.init_current();

        match aid1.data.sender {
            ActorSender::Local { .. } => (),
            _ => panic!("The sender should be `Local`"),
        }

        let aid1_serialized = bincode::serialize(&aid1).unwrap();
        let aid1_deserialized: Aid = bincode::deserialize(&aid1_serialized).unwrap();
        assert!(Aid::ptr_eq(&aid1, &aid1_deserialized));

        let aid2 = system.spawn().with((), simple_handler).unwrap();
        let aid2_serialized = bincode::serialize(&aid2).unwrap();
        system.stop_actor(&aid2);
        let aid2_deserialized = bincode::deserialize::<Aid>(&aid2_serialized);
        assert!(aid2_deserialized.is_err());

        // The second system is created on its own thread and made current there.
        let handle = thread::spawn(move || {
            let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
            system2.init_current();
            ActorSystem::connect_with_channels(&system, &system2);

            let deserialized: Aid = bincode::deserialize(&aid1_serialized).unwrap();
            match deserialized.data.sender {
                ActorSender::Remote { .. } => {
                    assert_eq!(aid1.uuid(), deserialized.uuid());
                    assert_eq!(aid1.system_uuid(), deserialized.system_uuid());
                    assert_eq!(aid1.name(), deserialized.name());
                }
                // Report the sender that was actually checked, not the original aid's.
                _ => panic!(
                    "The sender should be `Remote` but was {:?}",
                    deserialized.data.sender
                ),
            }

            system2.disconnect(aid1.system_uuid()).unwrap();
            let aid1_deserialized = bincode::deserialize::<Aid>(&aid1_serialized);
            assert!(aid1_deserialized.is_err());
        });
        handle.join().unwrap();
    }

    /// An `Aid` sent as a message, directly or inside an enum, arrives as the same `Aid`.
    #[test]
    fn test_aid_as_message() {
        init_test_log();
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let tracker = AssertCollect::new();
        let t = tracker.clone();

        #[derive(Serialize, Deserialize)]
        enum Op {
            Aid(Aid),
        }

        let aid = system
            .spawn()
            .with(t, |t: AssertCollect, context: Context, message: Message| {
                async move {
                    if let Some(msg) = message.content_as::<Aid>() {
                        t.assert(Aid::ptr_eq(&context.aid, &msg), "Aid mutated in transit");
                    } else if let Some(msg) = message.content_as::<Op>() {
                        match &*msg {
                            Op::Aid(a) => {
                                t.assert(Aid::ptr_eq(&context.aid, &a), "Aid mutated in transit")
                            }
                        }
                    }
                    Ok(Status::done(t))
                }
            })
            .unwrap();

        aid.send_new(aid.clone()).unwrap();
        aid.send_new(Op::Aid(aid.clone())).unwrap();
        await_received(&aid, 2, 1000).unwrap();
        system.trigger_and_await_shutdown(None);
        tracker.collect();
    }

    /// Sending to an actor that has been stopped fails with `ActorAlreadyStopped`.
    #[test]
    fn test_cant_send_to_stopped() {
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let aid = system.spawn().with((), simple_handler).unwrap();
        system.stop_actor(&aid);
        assert!(!system.is_actor_alive(&aid));
        match aid.send(Message::new(42_i32)) {
            Err(AidError::ActorAlreadyStopped) => (),
            Ok(_) => panic!("Expected the actor to be shut down!"),
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    /// An actor that returns `Status::Stop` is stopped by the system.
    #[test]
    fn test_actor_returns_stop() {
        init_test_log();
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let tracker = AssertCollect::new();
        let t = tracker.clone();
        let aid = system
            .spawn()
            .with(t, |t: AssertCollect, _: Context, message: Message| {
                async move {
                    if let Some(_msg) = message.content_as::<i32>() {
                        Ok(Status::stop(t))
                    } else if let Some(msg) = message.content_as::<SystemMsg>() {
                        match &*msg {
                            SystemMsg::Start => Ok(Status::done(t)),
                            m => t.panic(format!("unexpected message: {:?}", m)),
                        }
                    } else {
                        t.panic("Unknown Message received")
                    }
                }
            })
            .unwrap();

        assert!(system.is_actor_alive(&aid));
        aid.send_new(11_i32).unwrap();
        await_received(&aid, 2, 1000).unwrap();
        await_actor_stopped(&system, &aid, Duration::from_millis(200));

        system.trigger_and_await_shutdown(None);
        tracker.collect();
    }

    /// An actor that answers `SystemMsg::Stop` with `Status::Done` is stopped anyway.
    #[test]
    fn test_actor_cannot_override_stop() {
        init_test_log();
        let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
        let tracker = AssertCollect::new();
        let t = tracker.clone();
        let aid = system
            .spawn()
            .with(t, |t: AssertCollect, _: Context, message: Message| {
                async move {
                    if let Some(msg) = message.content_as::<SystemMsg>() {
                        match &*msg {
                            SystemMsg::Start => Ok(Status::done(t)),
                            SystemMsg::Stop => Ok(Status::done(t)),
                            m => t.panic(format!("unexpected message: {:?}", m)),
                        }
                    } else {
                        t.panic("Unknown Message received")
                    }
                }
            })
            .unwrap();

        assert!(system.is_actor_alive(&aid));
        aid.send_new(SystemMsg::Stop).unwrap();
        await_received(&aid, 2, 1000).unwrap();
        await_actor_stopped(&system, &aid, Duration::from_millis(200));

        system.trigger_and_await_shutdown(None);
        tracker.collect();
    }
}