use std::any::TypeId;
use std::sync::Arc;
#[cfg(feature = "async-std")]
use futures::FutureExt;
use super::actor_properties::MuxedMessage;
use super::messages::Signal;
use super::messages::StopMessage;
use super::SupervisionEvent;
use crate::actor::actor_properties::ActorProperties;
use crate::concurrency::JoinHandle;
use crate::concurrency::MpscUnboundedReceiver as InputPortReceiver;
use crate::concurrency::OneshotReceiver;
use crate::errors::MessagingErr;
#[cfg(feature = "cluster")]
use crate::message::SerializedMessage;
use crate::Actor;
use crate::ActorId;
use crate::ActorName;
use crate::ActorRef;
use crate::Message;
use crate::RactorErr;
use crate::SpawnErr;
/// Lifecycle status of an actor, stored as a `u8`.
///
/// Variants are declared in ascending discriminant order, so the derived
/// `PartialOrd`/`Ord` comparisons agree with comparing the raw `u8` values
/// (e.g. `status < ActorStatus::Stopping`).
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ActorStatus {
    /// Not yet started (discriminant 0).
    Unstarted = 0,
    /// Starting up (discriminant 1).
    Starting = 1,
    /// Running (discriminant 2).
    Running = 2,
    /// Upgrading (discriminant 3).
    Upgrading = 3,
    /// Draining (discriminant 4).
    Draining = 4,
    /// Stopping (discriminant 5).
    Stopping = 5,
    /// Stopped (discriminant 6).
    Stopped = 6,
}
/// The three statuses `Starting`, `Running` and `Upgrading`.
///
/// NOTE(review): presumably the set of statuses in which an actor can still
/// be interacted with; `Draining` is not included — confirm against callers.
pub const ACTIVE_STATES: [ActorStatus; 3] = [
ActorStatus::Starting,
ActorStatus::Running,
ActorStatus::Upgrading,
];
/// The receiving halves of an actor's four input channels.
///
/// Built by `ActorCell::new` / `ActorCell::new_remote` from the receivers
/// returned by `ActorProperties`.
pub(crate) struct ActorPortSet {
/// One-shot receiver for a [`Signal`].
pub(crate) signal_rx: OneshotReceiver<Signal>,
/// One-shot receiver for a [`StopMessage`].
pub(crate) stop_rx: OneshotReceiver<StopMessage>,
/// Unbounded receiver for [`SupervisionEvent`]s.
pub(crate) supervisor_rx: InputPortReceiver<SupervisionEvent>,
/// Unbounded receiver for regular (muxed) messages.
pub(crate) message_rx: InputPortReceiver<MuxedMessage>,
}
/// On drop, close every port and then discard anything still queued.
impl Drop for ActorPortSet {
fn drop(&mut self) {
// Step 1: close all four receivers first.
self.signal_rx.close();
self.stop_rx.close();
self.supervisor_rx.close();
self.message_rx.close();
// Step 2: drain each port; every loop ends at the first `try_recv`
// error, and the received values are simply dropped.
while self.signal_rx.try_recv().is_ok() {}
while self.stop_rx.try_recv().is_ok() {}
while self.supervisor_rx.try_recv().is_ok() {}
while self.message_rx.try_recv().is_ok() {}
}
}
/// A single item received from one of the ports of an [`ActorPortSet`],
/// tagged with the port it came from. Produced by
/// `ActorPortSet::listen_in_priority`.
pub(crate) enum ActorPortMessage {
/// Received on the signal port.
Signal(Signal),
/// Received on the stop port.
Stop(StopMessage),
/// Received on the supervision port.
Supervision(SupervisionEvent),
/// Received on the message port.
Message(MuxedMessage),
}
impl ActorPortSet {
/// Run `future` to completion unless something arrives on the signal port
/// first.
///
/// Returns `Ok(output)` when `future` finishes first, or `Err(signal)` when
/// the signal port resolves first. If the signal port resolves with an
/// error instead of a value, that is reported as `Signal::Kill`.
///
/// The body is duplicated per runtime: under the `async-std` feature both
/// futures are wrapped with `.fuse()` before being handed to `select!`.
pub(crate) async fn run_with_signal<TState>(
&mut self,
future: impl std::future::Future<Output = TState>,
) -> Result<TState, Signal>
where
TState: crate::State,
{
#[cfg(feature = "async-std")]
{
crate::concurrency::select! {
signal = (&mut self.signal_rx).fuse() => {
Err(signal.unwrap_or(Signal::Kill))
}
new_state = future.fuse() => {
Ok(new_state)
}
}
}
#[cfg(not(feature = "async-std"))]
{
crate::concurrency::select! {
signal = &mut self.signal_rx => {
Err(signal.unwrap_or(Signal::Kill))
}
new_state = future => {
Ok(new_state)
}
}
}
}
/// Wait on all four ports at once and return the first item that arrives,
/// wrapped in the matching [`ActorPortMessage`] variant.
///
/// Returns `Err(MessagingErr::ChannelClosed)` when the port that resolved
/// reports an error / `None` instead of a value.
///
/// NOTE(review): the branches are listed signal, stop, supervision, message,
/// which matches the "priority" in the name; whether that order is actually
/// honoured depends on the `select!` macro re-exported by
/// `crate::concurrency` — confirm.
pub(crate) async fn listen_in_priority(
&mut self,
) -> Result<ActorPortMessage, MessagingErr<()>> {
#[cfg(feature = "async-std")]
{
crate::concurrency::select! {
signal = (&mut self.signal_rx).fuse() => {
signal.map(ActorPortMessage::Signal).map_err(|_| MessagingErr::ChannelClosed)
}
stop = (&mut self.stop_rx).fuse() => {
stop.map(ActorPortMessage::Stop).map_err(|_| MessagingErr::ChannelClosed)
}
supervision = self.supervisor_rx.recv().fuse() => {
supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed)
}
message = self.message_rx.recv().fuse() => {
message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed)
}
}
}
#[cfg(not(feature = "async-std"))]
{
crate::concurrency::select! {
signal = &mut self.signal_rx => {
signal.map(ActorPortMessage::Signal).map_err(|_| MessagingErr::ChannelClosed)
}
stop = &mut self.stop_rx => {
stop.map(ActorPortMessage::Stop).map_err(|_| MessagingErr::ChannelClosed)
}
supervision = self.supervisor_rx.recv() => {
supervision.map(ActorPortMessage::Supervision).ok_or(MessagingErr::ChannelClosed)
}
message = self.message_rx.recv() => {
message.map(ActorPortMessage::Message).ok_or(MessagingErr::ChannelClosed)
}
}
}
}
}
/// A cloneable handle to an actor.
///
/// All clones share the same `ActorProperties` through an `Arc`, so cloning
/// only bumps a reference count.
#[derive(Clone)]
pub struct ActorCell {
/// Shared actor properties.
pub(crate) inner: Arc<ActorProperties>,
}
/// Debug output shows only the actor's name and id, under the struct name
/// `Actor`.
impl std::fmt::Debug for ActorCell {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Actor");
        builder.field("name", &self.get_name());
        builder.field("id", &self.get_id());
        builder.finish()
    }
}
/// Two cells are equal when their actor ids are equal; no other state is
/// compared.
impl PartialEq for ActorCell {
fn eq(&self, other: &Self) -> bool {
other.get_id() == self.get_id()
}
}
// Marker impl: equality on `ActorCell` (by actor id) is declared total.
impl Eq for ActorCell {}
/// Hashes only the actor id, the same value `PartialEq` compares.
impl std::hash::Hash for ActorCell {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let id = self.get_id();
        std::hash::Hash::hash(&id, state)
    }
}
impl ActorCell {
pub(crate) fn new<TActor>(name: Option<ActorName>) -> Result<(Self, ActorPortSet), SpawnErr>
where
TActor: Actor,
{
let (props, rx1, rx2, rx3, rx4) = ActorProperties::new::<TActor>(name.clone());
let cell = Self {
inner: Arc::new(props),
};
#[cfg(feature = "cluster")]
{
crate::registry::pid_registry::register_pid(cell.get_id(), cell.clone())?;
}
if let Some(r_name) = name {
crate::registry::register(r_name, cell.clone())?;
}
Ok((
cell,
ActorPortSet {
signal_rx: rx1,
stop_rx: rx2,
supervisor_rx: rx3,
message_rx: rx4,
},
))
}
/// Create a cell (plus its port receivers) for a remote actor with a
/// pre-assigned id.
///
/// * `name` - optional actor name, forwarded to `ActorProperties::new_remote`.
/// * `id` - the actor id; must not be a local id.
///
/// Unlike `new`, this performs no registry registration.
///
/// # Errors
/// Returns `SpawnErr::StartupFailed` when `id.is_local()` is true.
#[cfg(feature = "cluster")]
pub(crate) fn new_remote<TActor>(
    name: Option<ActorName>,
    id: ActorId,
) -> Result<(Self, ActorPortSet), SpawnErr>
where
    TActor: Actor,
{
    if id.is_local() {
        return Err(SpawnErr::StartupFailed(From::from("Cannot create a new remote actor handler without the actor id being marked as a remote actor!")));
    }

    let (props, signal_rx, stop_rx, supervisor_rx, message_rx) =
        ActorProperties::new_remote::<TActor>(name, id);
    let ports = ActorPortSet {
        signal_rx,
        stop_rx,
        supervisor_rx,
        message_rx,
    };
    let cell = Self {
        inner: Arc::new(props),
    };
    Ok((cell, ports))
}
/// Return this actor's id.
pub fn get_id(&self) -> ActorId {
self.inner.id
}
/// Return a clone of this actor's name, or `None` if it has no name.
pub fn get_name(&self) -> Option<ActorName> {
self.inner.name.clone()
}
/// Return the actor's current [`ActorStatus`] as reported by the shared
/// properties.
pub fn get_status(&self) -> ActorStatus {
self.inner.get_status()
}
/// Return the `supports_remoting` flag stored in the actor's properties.
/// Only available with the `cluster` feature.
#[cfg(feature = "cluster")]
pub fn supports_remoting(&self) -> bool {
self.inner.supports_remoting
}
/// Record a new lifecycle status for this actor.
///
/// * On the first transition into `Stopping` or `Stopped` (i.e. while the
///   current status is still below `Stopping`) the actor is removed from the
///   registries and from all process groups. The guard keeps that cleanup
///   from running a second time when `Stopped` follows `Stopping`.
/// * When the new status is `Stopped`, the stop listener is notified; this
///   happens before the new status is stored.
pub(crate) fn set_status(&self, status: ActorStatus) {
    let is_shutdown_status = matches!(status, ActorStatus::Stopped | ActorStatus::Stopping);
    if is_shutdown_status && self.get_status() < ActorStatus::Stopping {
        let id = self.get_id();
        #[cfg(feature = "cluster")]
        {
            crate::registry::pid_registry::demonitor(id);
            crate::registry::pid_registry::unregister_pid(id);
        }
        if let Some(registered_name) = self.get_name() {
            crate::registry::unregister(registered_name);
        }
        crate::pg::demonitor_all(id);
        crate::pg::leave_all(id);
    }

    if matches!(status, ActorStatus::Stopped) {
        self.inner.notify_stop_listener();
    }

    self.inner.set_status(status)
}
/// Terminate this actor and its children.
///
/// A kill signal is sent to this actor only while its status is at or below
/// `Upgrading`; children are told to terminate regardless of this actor's
/// own status.
pub(crate) fn terminate(&self) {
    // `ActorStatus` derives `Ord` in discriminant order, so this is the same
    // comparison as on the raw `u8` values.
    let not_yet_winding_down = self.get_status() <= ActorStatus::Upgrading;
    if not_yet_winding_down {
        self.kill();
    }

    self.inner.tree.terminate_all_children();
}
/// Link this actor under `supervisor`.
///
/// Registers a clone of this cell as a child in the supervisor's tree, then
/// stores `supervisor` as this actor's supervisor.
pub fn link(&self, supervisor: ActorCell) {
supervisor.inner.tree.insert_child(self.clone());
self.inner.tree.set_supervisor(supervisor);
}
/// Undo a link to `supervisor`.
///
/// Does nothing unless this actor is currently a child of `supervisor`;
/// otherwise removes this actor from the supervisor's children and clears
/// this actor's supervisor.
pub fn unlink(&self, supervisor: ActorCell) {
    if !self.inner.tree.is_child_of(supervisor.get_id()) {
        return;
    }
    supervisor.inner.tree.remove_child(self.get_id());
    self.inner.tree.clear_supervisor();
}
/// Clear this actor's supervisor without touching the supervisor's own
/// child list.
pub(crate) fn clear_supervisor(&self) {
self.inner.tree.clear_supervisor();
}
/// Register this actor as a monitor on `who` (a clone of this cell is stored
/// in `who`'s tree). Only available with the `monitors` feature.
#[cfg(feature = "monitors")]
pub fn monitor(&self, who: ActorCell) {
who.inner.tree.set_monitor(self.clone());
}
/// Remove this actor (looked up by id) from `who`'s monitors. Only
/// available with the `monitors` feature.
#[cfg(feature = "monitors")]
pub fn unmonitor(&self, who: ActorCell) {
who.inner.tree.remove_monitor(self.get_id());
}
/// Send `Signal::Kill` to this actor without waiting.
///
/// The result of the send is deliberately discarded.
pub fn kill(&self) {
let _ = self.inner.send_signal(Signal::Kill);
}
/// Send `Signal::Kill` and wait for the send-and-wait operation to finish.
///
/// * `timeout` - optional upper bound on the wait; `None` waits
///   indefinitely.
///
/// # Errors
/// `RactorErr::Timeout` when the timeout elapses first; otherwise any error
/// from the underlying send-and-wait, converted into a `RactorErr`.
pub async fn kill_and_wait(
    &self,
    timeout: Option<crate::concurrency::Duration>,
) -> Result<(), RactorErr<()>> {
    let kill_and_join = self.inner.send_signal_and_wait(Signal::Kill);
    match timeout {
        Some(limit) => match crate::concurrency::timeout(limit, kill_and_join).await {
            Ok(Ok(_)) => Ok(()),
            Ok(Err(send_err)) => Err(send_err.into()),
            Err(_) => Err(RactorErr::Timeout),
        },
        None => Ok(kill_and_join.await?),
    }
}
/// Send a stop request with an optional `reason`, without waiting.
///
/// The result of the send is deliberately discarded.
pub fn stop(&self, reason: Option<String>) {
let _ = self.inner.send_stop(reason);
}
/// Send a stop request and wait for the send-and-wait operation to finish.
///
/// * `reason` - optional stop reason forwarded with the request.
/// * `timeout` - optional upper bound on the wait; `None` waits
///   indefinitely.
///
/// # Errors
/// `RactorErr::Timeout` when the timeout elapses first; otherwise any error
/// from the underlying send-and-wait, converted into a `RactorErr`.
pub async fn stop_and_wait(
    &self,
    reason: Option<String>,
    timeout: Option<crate::concurrency::Duration>,
) -> Result<(), RactorErr<StopMessage>> {
    let stop_and_join = self.inner.send_stop_and_wait(reason);
    match timeout {
        Some(limit) => match crate::concurrency::timeout(limit, stop_and_join).await {
            Ok(Ok(_)) => Ok(()),
            Ok(Err(send_err)) => Err(send_err.into()),
            Err(_) => Err(RactorErr::Timeout),
        },
        None => Ok(stop_and_join.await?),
    }
}
/// Wait on the actor's `wait()` future, optionally bounded by `timeout`.
///
/// Returns `Ok(())` once the wait completes, or the runtime's `Timeout`
/// error when a timeout was given and it elapsed first.
pub async fn wait(
    &self,
    timeout: Option<crate::concurrency::Duration>,
) -> Result<(), crate::concurrency::Timeout> {
    match timeout {
        Some(limit) => crate::concurrency::timeout(limit, self.inner.wait()).await,
        None => {
            self.inner.wait().await;
            Ok(())
        }
    }
}
/// Send a supervision event to this actor.
///
/// Delegates to the shared properties and returns its result unchanged.
pub(crate) fn send_supervisor_evt(
&self,
message: SupervisionEvent,
) -> Result<(), MessagingErr<SupervisionEvent>> {
self.inner.send_supervisor_evt(message)
}
/// Send a message of type `TMessage` to this actor.
///
/// Delegates to the shared properties and returns its result unchanged; on
/// failure the error type carries `TMessage`.
pub fn send_message<TMessage>(&self, message: TMessage) -> Result<(), MessagingErr<TMessage>>
where
TMessage: Message,
{
self.inner.send_message::<TMessage>(message)
}
/// Request that this actor drain, without waiting.
///
/// Delegates to the shared properties and returns its result unchanged.
pub fn drain(&self) -> Result<(), MessagingErr<()>> {
self.inner.drain()
}
/// Request a drain and wait for the drain-and-wait operation to finish.
///
/// * `timeout` - optional upper bound on the wait; `None` waits
///   indefinitely.
///
/// # Errors
/// `RactorErr::Timeout` when the timeout elapses first; otherwise any error
/// from the underlying drain-and-wait, converted into a `RactorErr`.
pub async fn drain_and_wait(
    &self,
    timeout: Option<crate::concurrency::Duration>,
) -> Result<(), RactorErr<()>> {
    let drain_and_join = self.inner.drain_and_wait();
    match timeout {
        Some(limit) => match crate::concurrency::timeout(limit, drain_and_join).await {
            Ok(Ok(_)) => Ok(()),
            Ok(Err(drain_err)) => Err(drain_err.into()),
            Err(_) => Err(RactorErr::Timeout),
        },
        None => Ok(drain_and_join.await?),
    }
}
/// Send an already-serialized message to this actor. Only available with
/// the `cluster` feature.
///
/// Delegates to the shared properties; the error is boxed.
#[cfg(feature = "cluster")]
pub fn send_serialized(
&self,
message: SerializedMessage,
) -> Result<(), Box<MessagingErr<SerializedMessage>>> {
self.inner.send_serialized(message)
}
/// Forward a supervision event through this actor's supervision tree.
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
self.inner.tree.notify_supervisor(evt)
}
/// Ask all children of this actor to stop, passing along the optional
/// `reason`. Does not wait.
pub fn stop_children(&self, reason: Option<String>) {
self.inner.tree.stop_all_children(reason);
}
/// Return this actor's supervisor, if the supervision tree has one.
pub fn try_get_supervisor(&self) -> Option<ActorCell> {
self.inner.tree.try_get_supervisor()
}
/// Ask all children to stop and await the tree's stop-and-wait operation.
///
/// * `reason` - optional stop reason forwarded to the children.
/// * `timeout` - optional timeout forwarded to the tree.
///
/// Returns nothing, so no outcome (including a timeout) is reported to the
/// caller.
pub async fn stop_children_and_wait(
&self,
reason: Option<String>,
timeout: Option<crate::concurrency::Duration>,
) {
self.inner
.tree
.stop_all_children_and_wait(reason, timeout)
.await
}
/// Ask all children of this actor to drain. Does not wait.
pub fn drain_children(&self) {
self.inner.tree.drain_all_children();
}
/// Ask all children to drain and await the tree's drain-and-wait operation,
/// forwarding the optional `timeout`. Returns nothing, so no outcome is
/// reported to the caller.
pub async fn drain_children_and_wait(&self, timeout: Option<crate::concurrency::Duration>) {
self.inner.tree.drain_all_children_and_wait(timeout).await
}
/// Return the cells of this actor's children as reported by the supervision
/// tree.
pub fn get_children(&self) -> Vec<ActorCell> {
self.inner.tree.get_children()
}
/// Return the `TypeId` stored in the actor's properties.
///
/// `is_message_type_of` compares this value against the `TypeId` of a
/// message type.
pub fn get_type_id(&self) -> TypeId {
self.inner.type_id
}
/// Check whether `TMessage` is this actor's message type.
///
/// Returns `Some(true/false)` for a local actor, based on comparing the
/// stored `TypeId` with that of `TMessage`, and `None` when the actor id is
/// not local (no answer is given in that case).
pub fn is_message_type_of<TMessage: Message>(&self) -> Option<bool> {
    self.get_id()
        .is_local()
        .then(|| self.get_type_id() == std::any::TypeId::of::<TMessage>())
}
/// Spawn a new actor with a clone of this cell passed as its supervisor.
///
/// * `name` - optional name for the new actor.
/// * `handler` - the actor implementation.
/// * `startup_args` - arguments passed to the actor at startup.
///
/// Returns the new actor's reference and join handle, or the `SpawnErr`
/// from `ActorRuntime::spawn_linked`.
pub async fn spawn_linked<T: Actor>(
&self,
name: Option<String>,
handler: T,
startup_args: T::Arguments,
) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
crate::actor::ActorRuntime::spawn_linked(name, handler, startup_args, self.clone()).await
}
/// Test-only: number of children reported by the supervision tree.
#[cfg(test)]
pub(crate) fn get_num_children(&self) -> usize {
self.inner.tree.get_num_children()
}
/// Test-only: number of parents reported by the supervision tree.
#[cfg(test)]
pub(crate) fn get_num_parents(&self) -> usize {
self.inner.tree.get_num_parents()
}
}