use std::future::Future;
use crate::concurrency::JoinHandle;
use crate::Actor as SendActor;
use crate::ActorCell;
use crate::ActorName;
use crate::ActorProcessingErr;
use crate::ActorRef;
use crate::Message;
use crate::RpcReplyPort;
use crate::SpawnErr;
use crate::State;
use crate::SupervisionEvent;
// Provides `ThreadLocalActorRuntime`, which the `spawn*` helpers on
// `ThreadLocalActor` delegate to.
mod inner;
// Compiled only for test builds.
#[cfg(test)]
mod supervision_tests;
// Compiled only for test builds.
#[cfg(test)]
mod tests;
/// Actor behavior whose lifecycle hooks return futures without a `Send` bound.
///
/// Every hook returns a bare `impl Future`, and the associated `State` type
/// carries no trait bounds. Instances are started with the `spawn*` associated
/// functions, all of which take a [`ThreadLocalActorSpawner`] and delegate to
/// `inner::ThreadLocalActorRuntime`.
///
/// NOTE(review): when and in which order the runtime invokes each hook is
/// decided in the `inner` module, which is not visible from this definition.
pub trait ThreadLocalActor: Default + Sized + 'static {
    /// Message type accepted by this actor.
    type Msg: Message;

    /// Mutable state threaded through the hooks. No bounds are required here.
    type State;

    /// Startup arguments consumed by [`ThreadLocalActor::pre_start`].
    type Arguments: State;

    /// Builds the actor's initial [`Self::State`] from the startup `args`.
    ///
    /// This is the only hook without a default body.
    ///
    /// # Errors
    ///
    /// Implementations report failures as an [`ActorProcessingErr`].
    fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> impl Future<Output = Result<Self::State, ActorProcessingErr>>;

    /// Optional start-up hook with mutable access to the state.
    ///
    /// The default implementation does nothing and resolves to `Ok(())`.
    #[allow(unused_variables)]
    fn post_start(
        &self,
        myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> {
        async { Ok(()) }
    }

    /// Optional shutdown hook with mutable access to the state.
    ///
    /// The default implementation does nothing and resolves to `Ok(())`.
    #[allow(unused_variables)]
    fn post_stop(
        &self,
        myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> {
        async { Ok(()) }
    }

    /// Handles one incoming `message`.
    ///
    /// The default implementation ignores the message and resolves to
    /// `Ok(())`.
    #[allow(unused_variables)]
    fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> {
        async { Ok(()) }
    }

    /// Handles one incoming serialized message (only with the `cluster`
    /// feature).
    ///
    /// The default implementation ignores the message and resolves to
    /// `Ok(())`.
    #[allow(unused_variables)]
    #[cfg(feature = "cluster")]
    fn handle_serialized(
        &self,
        myself: ActorRef<Self::Msg>,
        message: crate::message::SerializedMessage,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> {
        async { Ok(()) }
    }

    /// Handles a [`SupervisionEvent`].
    ///
    /// The default implementation calls `myself.stop(None)` when the event is
    /// `ActorTerminated` or `ActorFailed`, ignores every other event, and
    /// always resolves to `Ok(())`.
    #[allow(unused_variables)]
    fn handle_supervisor_evt(
        &self,
        myself: ActorRef<Self::Msg>,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> {
        async move {
            // Only termination and failure notifications trigger a stop; the
            // payload of the event is not inspected.
            let should_stop = matches!(
                message,
                SupervisionEvent::ActorTerminated(..) | SupervisionEvent::ActorFailed(..)
            );
            if should_stop {
                myself.stop(None);
            }
            Ok(())
        }
    }

    /// Spawns this actor through `spawner`.
    ///
    /// Resolves to the actor's [`ActorRef`] together with a [`JoinHandle`], or
    /// to a [`SpawnErr`]. Delegates to `inner::ThreadLocalActorRuntime::spawn`.
    fn spawn(
        name: Option<ActorName>,
        startup_args: Self::Arguments,
        spawner: ThreadLocalActorSpawner,
    ) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> {
        inner::ThreadLocalActorRuntime::<Self>::spawn(name, startup_args, spawner)
    }

    /// Non-async variant of [`ThreadLocalActor::spawn`].
    ///
    /// Returns the [`ActorRef`] directly, plus a [`JoinHandle`] that itself
    /// resolves to either the actor's join handle or a [`SpawnErr`].
    /// Delegates to `inner::ThreadLocalActorRuntime::spawn_instant`.
    #[allow(clippy::type_complexity)]
    fn spawn_instant(
        name: Option<ActorName>,
        startup_args: Self::Arguments,
        spawner: ThreadLocalActorSpawner,
    ) -> Result<
        (
            ActorRef<Self::Msg>,
            JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
        ),
        SpawnErr,
    > {
        inner::ThreadLocalActorRuntime::<Self>::spawn_instant(name, startup_args, spawner)
    }

    /// Like [`ThreadLocalActor::spawn`], additionally passing `supervisor` to
    /// the runtime.
    ///
    /// Delegates to `inner::ThreadLocalActorRuntime::spawn_linked`; note that
    /// the runtime takes `spawner` before `supervisor`.
    fn spawn_linked(
        name: Option<ActorName>,
        startup_args: Self::Arguments,
        supervisor: ActorCell,
        spawner: ThreadLocalActorSpawner,
    ) -> impl Future<Output = Result<(ActorRef<Self::Msg>, JoinHandle<()>), SpawnErr>> {
        inner::ThreadLocalActorRuntime::<Self>::spawn_linked(
            name,
            startup_args,
            spawner,
            supervisor,
        )
    }

    /// Like [`ThreadLocalActor::spawn_instant`], additionally passing
    /// `supervisor` to the runtime.
    ///
    /// Delegates to `inner::ThreadLocalActorRuntime::spawn_linked_instant`;
    /// note that the runtime takes `spawner` before `supervisor`.
    #[allow(clippy::type_complexity)]
    fn spawn_linked_instant(
        name: Option<ActorName>,
        startup_args: Self::Arguments,
        supervisor: ActorCell,
        spawner: ThreadLocalActorSpawner,
    ) -> Result<
        (
            ActorRef<Self::Msg>,
            JoinHandle<Result<JoinHandle<()>, SpawnErr>>,
        ),
        SpawnErr,
    > {
        inner::ThreadLocalActorRuntime::<Self>::spawn_linked_instant(
            name,
            startup_args,
            spawner,
            supervisor,
        )
    }
}
/// Blanket adapter that lets any `crate::Actor` (imported here as `SendActor`)
/// which is also `Default` be used as a [`ThreadLocalActor`].
///
/// The associated types are taken from the `SendActor` implementation, and
/// every lifecycle hook forwards to the `SendActor` method of the same name.
/// `post_stop` must be forwarded explicitly: otherwise the trait's no-op
/// default would be used and the wrapped actor's shutdown logic would be
/// skipped.
impl<T> ThreadLocalActor for T
where
    T: SendActor + Default,
{
    type Msg = <T as SendActor>::Msg;
    type State = <T as SendActor>::State;
    type Arguments = <T as SendActor>::Arguments;

    /// Forwards to `SendActor::pre_start`.
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        <Self as SendActor>::pre_start(self, myself, args).await
    }

    /// Forwards to `SendActor::post_start`.
    async fn post_start(
        &self,
        myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as SendActor>::post_start(self, myself, state).await
    }

    /// Forwards to `SendActor::post_stop` so the wrapped actor's shutdown
    /// logic runs instead of the trait's no-op default.
    async fn post_stop(
        &self,
        myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as SendActor>::post_stop(self, myself, state).await
    }

    /// Forwards to `SendActor::handle`.
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as SendActor>::handle(self, myself, message, state).await
    }

    /// Forwards to `SendActor::handle_serialized` (only with the `cluster`
    /// feature).
    #[cfg(feature = "cluster")]
    async fn handle_serialized(
        &self,
        myself: ActorRef<Self::Msg>,
        message: crate::message::SerializedMessage,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as SendActor>::handle_serialized(self, myself, message, state).await
    }

    /// Forwards to `SendActor::handle_supervisor_evt`.
    async fn handle_supervisor_evt(
        &self,
        myself: ActorRef<Self::Msg>,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as SendActor>::handle_supervisor_evt(self, myself, message, state).await
    }
}
/// Boxed, pinned future created by a spawn builder. It resolves to the spawned
/// actor's [`JoinHandle`] or a [`SpawnErr`], and is not required to be `Send`.
type LocalSpawnFuture = std::pin::Pin<Box<dyn Future<Output = Result<JoinHandle<()>, SpawnErr>>>>;

/// One-shot closure that creates a [`LocalSpawnFuture`]. The closure itself is
/// `Send`, so it can cross the spawner channel.
type LocalSpawnBuilder = Box<dyn FnOnce() -> LocalSpawnFuture + Send>;

/// A single spawn request travelling over the [`ThreadLocalActorSpawner`]
/// channel.
#[allow(clippy::type_complexity)]
struct SpawnArgs {
    /// Closure invoked by the receiver loop to create the future to spawn.
    builder: LocalSpawnBuilder,
    /// Port on which the receiver loop sends back the handle of the spawned
    /// task.
    reply: RpcReplyPort<JoinHandle<Result<JoinHandle<()>, SpawnErr>>>,
    /// Optional name; used as the task name only when building with
    /// `tokio_unstable`.
    name: Option<String>,
}
/// Cloneable handle for requesting actor spawns.
///
/// Wraps the sending half of an unbounded channel of `SpawnArgs`; the matching
/// receiver loop is created in `ThreadLocalActorSpawner::new`.
#[derive(Clone)]
pub struct ThreadLocalActorSpawner {
// Sender half of the spawn-request channel; `spawn` pushes `SpawnArgs` through it.
send: crate::concurrency::MpscUnboundedSender<SpawnArgs>,
}
/// Opaque `Debug` representation: only the type name is printed, never the
/// channel held inside.
impl std::fmt::Debug for ThreadLocalActorSpawner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ThreadLocalActorSpawner")
    }
}
impl Default for ThreadLocalActorSpawner {
fn default() -> Self {
Self::new()
}
}
impl ThreadLocalActorSpawner {
    /// Creates a spawner backed by a dedicated OS thread (Tokio build).
    ///
    /// A current-thread Tokio runtime is built on the calling thread and moved
    /// into a new `std::thread`. That thread runs a `LocalSet` containing a
    /// receiver loop: for every `SpawnArgs` received it invokes the builder,
    /// spawns the resulting future locally, and sends the task handle back on
    /// the request's reply port. The loop ends once `recv()` yields `None`.
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be built. With `tokio_unstable`, the
    /// spawner thread panics if the named task cannot be spawned.
    #[cfg(all(not(feature = "async-std"), not(target_arch = "wasm32")))]
    pub fn new() -> Self {
        let (tx, mut rx) = crate::concurrency::mpsc_unbounded();
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        std::thread::spawn(move || {
            let local_tasks = tokio::task::LocalSet::new();
            local_tasks.spawn_local(async move {
                while let Some(request) = rx.recv().await {
                    let SpawnArgs {
                        builder,
                        reply,
                        name,
                    } = request;
                    let actor_future = builder();
                    // With `tokio_unstable` the task carries the requested
                    // name (empty when none was given).
                    #[cfg(tokio_unstable)]
                    let handle = tokio::task::Builder::new()
                        .name(name.unwrap_or_default().as_str())
                        .spawn_local(actor_future)
                        .expect("Tokio task spawn failed");
                    // Without it the name is intentionally unused.
                    #[cfg(not(tokio_unstable))]
                    let handle = {
                        _ = name;
                        crate::concurrency::spawn_local(actor_future)
                    };
                    // A failed reply is ignored on purpose (best effort).
                    _ = reply.send(handle);
                }
            });
            // Keeps this thread alive driving the local tasks.
            runtime.block_on(local_tasks);
        });
        Self { send: tx }
    }

    /// Creates a spawner backed by a dedicated OS thread (`async-std` build).
    ///
    /// The new thread blocks on a locally spawned receiver loop: for every
    /// `SpawnArgs` received it invokes the builder, spawns the resulting
    /// future locally, and sends the task handle back on the reply port. The
    /// request's name is not used.
    #[cfg(all(feature = "async-std", not(target_arch = "wasm32")))]
    pub fn new() -> Self {
        let (tx, mut rx) = crate::concurrency::mpsc_unbounded();
        std::thread::spawn(move || {
            let dispatcher = async_std::task::spawn_local(async move {
                while let Some(request) = rx.recv().await {
                    let SpawnArgs {
                        builder,
                        reply,
                        name,
                    } = request;
                    let actor_future = builder();
                    _ = name;
                    let handle = crate::concurrency::spawn_local(actor_future);
                    // A failed reply is ignored on purpose (best effort).
                    _ = reply.send(handle);
                }
            });
            async_std::task::block_on(dispatcher);
        });
        Self { send: tx }
    }

    /// Creates a spawner for `wasm32` targets.
    ///
    /// No thread is started; the receiver loop is spawned as a local task
    /// instead. For every `SpawnArgs` received it invokes the builder, spawns
    /// the resulting future locally, and sends the task handle back on the
    /// reply port. The request's name is not used.
    #[cfg(target_arch = "wasm32")]
    pub fn new() -> Self {
        let (tx, mut rx) = crate::concurrency::mpsc_unbounded();
        crate::concurrency::spawn_local(async move {
            while let Some(request) = rx.recv().await {
                let SpawnArgs {
                    builder,
                    reply,
                    name: _name,
                } = request;
                let handle = crate::concurrency::spawn_local(builder());
                // A failed reply is ignored on purpose (best effort).
                _ = reply.send(handle);
            }
        });
        Self { send: tx }
    }

    /// Submits `builder` to the receiver loop and waits for its outcome.
    ///
    /// The request is sent over the channel, the reply carrying the spawned
    /// task's handle is awaited, and then that task itself is awaited.
    ///
    /// # Errors
    ///
    /// Returns `SpawnErr::StartupFailed` when the channel is closed
    /// (`"Spawner dead"`), when the reply is never delivered, or when awaiting
    /// the spawned task fails. Otherwise returns whatever the builder's future
    /// produced.
    #[allow(clippy::type_complexity)]
    async fn spawn(
        &self,
        builder: Box<
            dyn FnOnce()
                    -> std::pin::Pin<Box<dyn Future<Output = Result<JoinHandle<()>, SpawnErr>>>>
                + Send,
        >,
        name: Option<String>,
    ) -> Result<JoinHandle<()>, SpawnErr> {
        let (reply_tx, reply_rx) = crate::concurrency::oneshot();
        let request = SpawnArgs {
            builder,
            reply: reply_tx.into(),
            name,
        };
        self.send
            .send(request)
            .map_err(|_| SpawnErr::StartupFailed("Spawner dead".into()))?;

        // First await: the receiver loop hands back the spawned task's handle.
        let spawn_task = reply_rx
            .await
            .map_err(|recv_err| SpawnErr::StartupFailed(recv_err.into()))?;
        // Second await: the spawned task runs the builder's future to completion.
        let joined = spawn_task.await;

        #[cfg(not(feature = "async-std"))]
        {
            joined.map_err(|join_err| SpawnErr::StartupFailed(join_err.into()))?
        }
        #[cfg(feature = "async-std")]
        {
            joined.map_err(|()| SpawnErr::StartupFailed("receive failed".into()))?
        }
    }
}
impl ActorCell {
    /// Spawns a [`ThreadLocalActor`] of type `T` with this cell as its
    /// supervisor.
    ///
    /// A clone of `self` is passed as the `supervisor` argument of
    /// [`ThreadLocalActor::spawn_linked`], and that call's result is returned
    /// unchanged.
    pub async fn spawn_local_linked<T: ThreadLocalActor>(
        &self,
        name: Option<String>,
        startup_args: T::Arguments,
        spawner: ThreadLocalActorSpawner,
    ) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
        let supervisor = self.clone();
        <T as ThreadLocalActor>::spawn_linked(name, startup_args, supervisor, spawner).await
    }
}