use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use super::actor::{Actor, ActorCtx, TransitionCtx, TransitionError};
use super::context::RequestCtx;
use super::resource::{Resource, ResourceCtx, ResourceError, ResourceSnapshot, TransitionOutcome};
use super::transition::TransitionInput;
enum Command {
Transition {
name: String,
input: TransitionInput,
ctx: Box<TransitionCtx>,
reply: oneshot::Sender<Result<TransitionOutcome, TransitionError>>,
},
Snapshot {
ctx: ResourceCtx,
reply: oneshot::Sender<Result<ResourceSnapshot, ResourceError>>,
},
Stop { reply: oneshot::Sender<()> },
}
#[derive(Clone)]
pub struct ActorHandle {
tx: mpsc::Sender<Command>,
}
pub(crate) struct ActorSlot {
pub handle: ActorHandle,
pub task: JoinHandle<()>,
}
pub struct PendingTransition {
rx: oneshot::Receiver<Result<TransitionOutcome, TransitionError>>,
}
impl PendingTransition {
pub async fn await_outcome(self) -> Result<TransitionOutcome, TransitionError> {
match self.rx.await {
Ok(result) => result,
Err(_) => Err(TransitionError::Internal(
"actor task dropped reply slot".into(),
)),
}
}
}
impl ActorHandle {
pub fn spawn<A: Actor>(actor: A, capacity: usize) -> Self {
let (handle, _task) = Self::spawn_with_task(actor, capacity, ActorCtx::default());
handle
}
pub(crate) fn spawn_with_task<A: Actor>(
actor: A,
capacity: usize,
actor_ctx: ActorCtx,
) -> (Self, JoinHandle<()>) {
let capacity = capacity.max(1);
let (tx, mut rx) = mpsc::channel::<Command>(capacity);
let task = tokio::spawn(async move {
let mut actor = actor;
let _ = actor.on_start(actor_ctx.clone()).await;
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Transition {
name,
input,
ctx,
reply,
} => {
let enriched = (*ctx).with_actor(actor_ctx.clone());
let outcome = actor.transition(enriched, &name, input).await;
let _ = reply.send(outcome);
}
Command::Snapshot { ctx, reply } => {
let snap = Resource::snapshot(&actor, ctx).await;
let _ = reply.send(snap);
}
Command::Stop { reply } => {
let _ = actor.on_stop(actor_ctx.clone()).await;
let _ = reply.send(());
break;
}
}
}
});
(ActorHandle { tx }, task)
}
pub async fn transition(
&self,
name: &str,
input: TransitionInput,
) -> Result<TransitionOutcome, TransitionError> {
self.transition_with_ctx(
TransitionCtx::new(RequestCtx::default(), "actor"),
name,
input,
)
.await
}
pub async fn transition_with_ctx(
&self,
ctx: TransitionCtx,
name: &str,
input: TransitionInput,
) -> Result<TransitionOutcome, TransitionError> {
let (rtx, rrx) = oneshot::channel();
self.tx
.send(Command::Transition {
name: name.to_string(),
input,
ctx: Box::new(ctx),
reply: rtx,
})
.await
.map_err(|_| TransitionError::Internal("actor task terminated".into()))?;
match rrx.await {
Ok(result) => result,
Err(_) => Err(TransitionError::Internal(
"actor task dropped reply slot".into(),
)),
}
}
pub(crate) async fn snapshot(
&self,
ctx: ResourceCtx,
) -> Result<ResourceSnapshot, ResourceError> {
let (rtx, rrx) = oneshot::channel();
if self
.tx
.send(Command::Snapshot { ctx, reply: rtx })
.await
.is_err()
{
return Err(ResourceError::Unavailable("actor task terminated".into()));
}
match rrx.await {
Ok(result) => result,
Err(_) => Err(ResourceError::Internal(
"actor task dropped reply slot".into(),
)),
}
}
pub(crate) async fn shutdown(&self, within: Duration) -> bool {
let (rtx, rrx) = oneshot::channel();
if self.tx.send(Command::Stop { reply: rtx }).await.is_err() {
return false;
}
matches!(tokio::time::timeout(within, rrx).await, Ok(Ok(())))
}
pub fn try_transition(
&self,
name: &str,
input: TransitionInput,
) -> Result<PendingTransition, TransitionError> {
let (rtx, rrx) = oneshot::channel();
let ctx = TransitionCtx::new(RequestCtx::default(), "actor");
self.tx
.try_send(Command::Transition {
name: name.to_string(),
input,
ctx: Box::new(ctx),
reply: rtx,
})
.map_err(|e| match e {
TrySendError::Full(_) => TransitionError::Busy,
TrySendError::Closed(_) => {
TransitionError::Internal("actor task terminated".into())
}
})?;
Ok(PendingTransition { rx: rrx })
}
}
#[derive(Clone, Debug)]
pub struct NodePolicy {
pub actor_queue_capacity: usize,
}
impl Default for NodePolicy {
fn default() -> Self {
Self {
actor_queue_capacity: 32,
}
}
}