use std::any::Any;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use crate::actor::{Actor, ActorContext, ReduceHandler, Handler, ExpandHandler, TransformHandler};
use crate::errors::RuntimeError;
use crate::interceptor::{Disposition, SendMode};
use crate::message::Message;
use crate::stream::{StreamReceiver, StreamSender};
/// A type-erased unit of work that can be run against an actor of type `A`.
///
/// Implementations in this file wrap the different send modes (tell, ask,
/// expand, reduce, transform) behind one object-safe interface so they can be
/// stored as a [`BoxedDispatch`].
#[async_trait]
pub trait Dispatch<A>: Send
where
    A: Actor,
{
    /// Consumes the dispatch and runs it against `actor`, returning a
    /// [`DispatchResult`] describing any reply that still has to be delivered.
    async fn dispatch(self: Box<Self>, actor: &mut A, ctx: &mut ActorContext) -> DispatchResult;

    /// Borrows the carried message as `&dyn Any`.
    ///
    /// The stream-only implementations in this file have no message value and
    /// return a reference to `()` instead.
    fn message_any(&self) -> &dyn Any;

    /// The [`SendMode`] this dispatch represents.
    fn send_mode(&self) -> SendMode;

    /// A static type name identifying what this dispatch carries.
    fn message_type_name(&self) -> &'static str;

    /// Consumes the dispatch without running it, given the `disposition`
    /// produced by the interceptor named `interceptor_name`.
    ///
    /// Implementations that own a reply channel report the rejection through
    /// it; the others in this file simply drop themselves.
    fn reject(self: Box<Self>, disposition: Disposition, interceptor_name: &str);

    /// Consumes the dispatch without running it because it was cancelled.
    fn cancel(self: Box<Self>);

    /// The cancellation token attached to this dispatch, if any.
    fn cancel_token(&self) -> Option<CancellationToken>;
}
/// A heap-allocated, type-erased [`Dispatch`] targeting an actor of type `A`.
pub type BoxedDispatch<A> = Box<dyn Dispatch<A>>;
/// One-shot callback that takes ownership of a type-erased reply and delivers it.
pub type ReplySender = Box<dyn FnOnce(Box<dyn Any + Send>) + Send>;

/// Outcome of running a dispatch: an optional type-erased reply together with
/// the callback that knows how to deliver it.
pub struct DispatchResult {
    /// The reply value, if the dispatch produced one.
    pub reply: Option<Box<dyn Any + Send>>,
    /// The callback used to deliver `reply`, if a reply is expected.
    pub reply_sender: Option<ReplySender>,
}

impl DispatchResult {
    /// Result for a dispatch that produces no reply: both fields are `None`.
    pub fn tell() -> Self {
        DispatchResult { reply: None, reply_sender: None }
    }

    /// Hands the reply to the reply sender.
    ///
    /// The sender is invoked only when both a reply and a sender are present;
    /// in every other case whatever is present is dropped without being used.
    pub fn send_reply(self) {
        let DispatchResult { reply, reply_sender } = self;
        if let Some((payload, deliver)) = reply.zip(reply_sender) {
            deliver(payload);
        }
    }
}
/// Dispatch wrapper that carries only a message and no reply channel.
pub struct TypedDispatch<M>
where
    M: Message,
{
    /// The message to hand to the actor's handler.
    pub msg: M,
}
#[async_trait]
impl<A, M> Dispatch<A> for TypedDispatch<M>
where
A: Handler<M>,
M: Message<Reply = ()>,
{
async fn dispatch(self: Box<Self>, actor: &mut A, ctx: &mut ActorContext) -> DispatchResult {
actor.handle(self.msg, ctx).await;
DispatchResult::tell()
}
fn message_any(&self) -> &dyn Any {
&self.msg
}
fn send_mode(&self) -> SendMode {
SendMode::Tell
}
fn message_type_name(&self) -> &'static str {
std::any::type_name::<M>()
}
fn reject(self: Box<Self>, _: Disposition, _: &str) {}
fn cancel(self: Box<Self>) {}
fn cancel_token(&self) -> Option<CancellationToken> {
None
}
}
/// Dispatch wrapper for a message whose reply is sent back over a oneshot channel.
pub struct AskDispatch<M>
where
    M: Message,
{
    /// The message to hand to the actor's handler.
    pub msg: M,
    /// Channel on which the reply, or a [`RuntimeError`], is sent.
    pub reply_tx: tokio::sync::oneshot::Sender<Result<M::Reply, RuntimeError>>,
    /// Optional token associated with this request for cancellation.
    pub cancel: Option<CancellationToken>,
}
/// Request/reply dispatch: runs the handler and arranges for its reply to be
/// sent on `reply_tx`.
#[async_trait]
impl<A, M> Dispatch<A> for AskDispatch<M>
where
    A: Handler<M>,
    M: Message,
{
    /// Runs the actor's handler and returns its reply in type-erased form.
    ///
    /// The reply is not sent here. The returned [`DispatchResult`] holds the
    /// boxed reply plus a closure that downcasts it back to `M::Reply` and
    /// sends it on `reply_tx`.
    async fn dispatch(self: Box<Self>, actor: &mut A, ctx: &mut ActorContext) -> DispatchResult {
        let reply = actor.handle(self.msg, ctx).await;
        let reply_any: Box<dyn Any + Send> = Box::new(reply);
        let reply_tx = self.reply_tx;
        DispatchResult {
            reply: Some(reply_any),
            reply_sender: Some(Box::new(move |boxed_reply: Box<dyn Any + Send>| {
                match boxed_reply.downcast::<M::Reply>() {
                    Ok(reply) => {
                        // A failed send means the receiving half is already gone.
                        if reply_tx.send(Ok(*reply)).is_err() {
                            tracing::debug!(
                                "reply dropped — caller may have timed out or been cancelled"
                            );
                        }
                    }
                    Err(_) => {
                        // `DispatchResult::reply` is a public field, so the value handed
                        // to this closure may not be the one produced above. Without
                        // this log the caller's channel would close with no trace.
                        tracing::warn!(
                            "reply for {} is not a {} — dropping reply channel without responding",
                            std::any::type_name::<M>(),
                            std::any::type_name::<M::Reply>()
                        );
                    }
                }
            })),
        }
    }

    /// Borrows the wrapped message as `&dyn Any`.
    fn message_any(&self) -> &dyn Any {
        &self.msg
    }

    /// Always [`SendMode::Ask`].
    fn send_mode(&self) -> SendMode {
        SendMode::Ask
    }

    /// The type name of the message type `M`.
    fn message_type_name(&self) -> &'static str {
        std::any::type_name::<M>()
    }

    /// Converts an interceptor's `disposition` into a [`RuntimeError`] and
    /// sends it to the caller instead of running the handler.
    ///
    /// Dispositions other than `Reject`, `Retry` and `Drop` send nothing; the
    /// reply channel is dropped when `self` goes out of scope.
    fn reject(self: Box<Self>, disposition: Disposition, interceptor_name: &str) {
        let error = match disposition {
            Disposition::Reject(reason) => RuntimeError::Rejected {
                interceptor: interceptor_name.to_string(),
                reason,
            },
            Disposition::Retry(retry_after) => RuntimeError::RetryAfter {
                interceptor: interceptor_name.to_string(),
                retry_after,
            },
            Disposition::Drop => RuntimeError::ActorNotFound(format!(
                "dropped by interceptor '{}'",
                interceptor_name
            )),
            _ => return,
        };
        // Best effort: the send result is deliberately ignored.
        let _ = self.reply_tx.send(Err(error));
    }

    /// Sends [`RuntimeError::Cancelled`] to the caller; the send result is ignored.
    fn cancel(self: Box<Self>) {
        let _ = self.reply_tx.send(Err(RuntimeError::Cancelled));
    }

    /// A clone of the cancellation token attached to this request, if any.
    fn cancel_token(&self) -> Option<CancellationToken> {
        self.cancel.clone()
    }
}
/// Dispatch wrapper for a message whose handler writes `OutputItem`s to a stream.
pub struct ExpandDispatch<M, OutputItem>
where
    M: Send + 'static,
    OutputItem: Send + 'static,
{
    /// The message to hand to the actor's expand handler.
    pub msg: M,
    /// Stream sender passed to the handler for emitting output items.
    pub sender: StreamSender<OutputItem>,
    /// Optional token associated with this request for cancellation.
    pub cancel: Option<CancellationToken>,
}
#[async_trait]
impl<A, M, OutputItem> Dispatch<A> for ExpandDispatch<M, OutputItem>
where
A: ExpandHandler<M, OutputItem>,
M: Send + 'static,
OutputItem: Send + 'static,
{
async fn dispatch(self: Box<Self>, actor: &mut A, ctx: &mut ActorContext) -> DispatchResult {
actor.handle_expand(self.msg, self.sender, ctx).await;
DispatchResult::tell()
}
fn message_any(&self) -> &dyn Any {
&self.msg
}
fn send_mode(&self) -> SendMode {
SendMode::Expand
}
fn message_type_name(&self) -> &'static str {
std::any::type_name::<M>()
}
fn reject(self: Box<Self>, _: Disposition, _: &str) {}
fn cancel(self: Box<Self>) {}
fn cancel_token(&self) -> Option<CancellationToken> {
self.cancel.clone()
}
}
/// Dispatch wrapper for a stream of `InputItem`s that the handler reduces to a
/// single `Reply`.
pub struct ReduceDispatch<InputItem, Reply>
where
    InputItem: Send + 'static,
    Reply: Send + 'static,
{
    /// Stream of input items passed to the actor's reduce handler.
    pub receiver: StreamReceiver<InputItem>,
    /// Channel on which the reply, or a [`RuntimeError`], is sent.
    pub reply_tx: tokio::sync::oneshot::Sender<Result<Reply, RuntimeError>>,
    /// Optional token associated with this request for cancellation.
    pub cancel: Option<CancellationToken>,
}
/// Reduce dispatch: a stream of `InputItem`s in, a single `Reply` out via
/// `reply_tx`.
#[async_trait]
impl<A, InputItem, Reply> Dispatch<A> for ReduceDispatch<InputItem, Reply>
where
    A: ReduceHandler<InputItem, Reply>,
    InputItem: Send + 'static,
    Reply: Send + 'static,
{
    /// Runs the actor's reduce handler over the input stream and returns its
    /// reply in type-erased form.
    ///
    /// The reply is not sent here. The returned [`DispatchResult`] holds the
    /// boxed reply plus a closure that downcasts it back to `Reply` and sends
    /// it on `reply_tx`.
    async fn dispatch(self: Box<Self>, actor: &mut A, ctx: &mut ActorContext) -> DispatchResult {
        let reply = actor.handle_reduce(self.receiver, ctx).await;
        let reply_any: Box<dyn Any + Send> = Box::new(reply);
        let reply_tx = self.reply_tx;
        DispatchResult {
            reply: Some(reply_any),
            reply_sender: Some(Box::new(move |boxed_reply: Box<dyn Any + Send>| {
                match boxed_reply.downcast::<Reply>() {
                    Ok(reply) => {
                        // A failed send means the receiving half is already gone.
                        if reply_tx.send(Ok(*reply)).is_err() {
                            tracing::debug!(
                                "reply dropped — caller may have timed out or been cancelled"
                            );
                        }
                    }
                    Err(_) => {
                        // `DispatchResult::reply` is a public field, so the value handed
                        // to this closure may not be the one produced above. Without
                        // this log the caller's channel would close with no trace.
                        tracing::warn!(
                            "reduce reply over {} is not a {} — dropping reply channel without responding",
                            std::any::type_name::<InputItem>(),
                            std::any::type_name::<Reply>()
                        );
                    }
                }
            })),
        }
    }

    /// There is no message value for a reduce, only a stream, so this returns
    /// a reference to `()`.
    fn message_any(&self) -> &dyn Any {
        &()
    }

    /// Always [`SendMode::Reduce`].
    fn send_mode(&self) -> SendMode {
        SendMode::Reduce
    }

    /// The type name of the stream's item type `InputItem`.
    fn message_type_name(&self) -> &'static str {
        std::any::type_name::<InputItem>()
    }

    /// Converts an interceptor's `disposition` into a [`RuntimeError`] and
    /// sends it to the caller instead of running the handler.
    ///
    /// Dispositions other than `Reject`, `Retry` and `Drop` send nothing; the
    /// reply channel is dropped when `self` goes out of scope.
    fn reject(self: Box<Self>, disposition: Disposition, interceptor_name: &str) {
        let error = match disposition {
            Disposition::Reject(reason) => RuntimeError::Rejected {
                interceptor: interceptor_name.to_string(),
                reason,
            },
            Disposition::Retry(retry_after) => RuntimeError::RetryAfter {
                interceptor: interceptor_name.to_string(),
                retry_after,
            },
            Disposition::Drop => RuntimeError::ActorNotFound(format!(
                "dropped by interceptor '{}'",
                interceptor_name
            )),
            _ => return,
        };
        // Best effort: the send result is deliberately ignored.
        let _ = self.reply_tx.send(Err(error));
    }

    /// Sends [`RuntimeError::Cancelled`] to the caller; the send result is ignored.
    fn cancel(self: Box<Self>) {
        let _ = self.reply_tx.send(Err(RuntimeError::Cancelled));
    }

    /// A clone of the cancellation token attached to this request, if any.
    fn cancel_token(&self) -> Option<CancellationToken> {
        self.cancel.clone()
    }
}
/// Dispatch wrapper for a stream-to-stream transformation performed by an
/// actor of type `A`.
///
/// `_phantom` is private, so values are built through [`TransformDispatch::new`].
pub struct TransformDispatch<
    A: TransformHandler<InputItem, OutputItem>,
    InputItem: Send + 'static,
    OutputItem: Send + 'static,
> {
    /// Stream of input items fed to the actor one at a time.
    pub receiver: StreamReceiver<InputItem>,
    /// Stream sender lent to the actor for emitting output items.
    pub sender: StreamSender<OutputItem>,
    /// Optional token checked between items to stop the transformation early.
    pub cancel: Option<CancellationToken>,
    // Ties the struct to the actor type `A` without storing a value of it.
    _phantom: std::marker::PhantomData<fn() -> A>,
}
impl<A, InputItem, OutputItem> TransformDispatch<A, InputItem, OutputItem>
where
A: TransformHandler<InputItem, OutputItem>,
InputItem: Send + 'static,
OutputItem: Send + 'static,
{
pub fn new(
receiver: StreamReceiver<InputItem>,
sender: StreamSender<OutputItem>,
cancel: Option<CancellationToken>,
) -> Self {
Self {
receiver,
sender,
cancel,
_phantom: std::marker::PhantomData,
}
}
}
#[async_trait]
impl<A, InputItem, OutputItem> Dispatch<A> for TransformDispatch<A, InputItem, OutputItem>
where
A: TransformHandler<InputItem, OutputItem>,
InputItem: Send + 'static,
OutputItem: Send + 'static,
{
async fn dispatch(self: Box<Self>, actor: &mut A, ctx: &mut ActorContext) -> DispatchResult {
let mut receiver = self.receiver;
let sender = self.sender;
let cancel = self.cancel;
let mut cancelled = false;
while let Some(item) = receiver.recv().await {
if let Some(ref token) = cancel {
if token.is_cancelled() {
cancelled = true;
break;
}
}
actor.handle_transform(item, &sender, ctx).await;
}
if !cancelled {
actor.on_transform_complete(&sender, ctx).await;
}
DispatchResult::tell()
}
fn message_any(&self) -> &dyn Any {
&()
}
fn send_mode(&self) -> SendMode {
SendMode::Transform
}
fn message_type_name(&self) -> &'static str {
std::any::type_name::<InputItem>()
}
fn reject(self: Box<Self>, _: Disposition, _: &str) {}
fn cancel(self: Box<Self>) {}
fn cancel_token(&self) -> Option<CancellationToken> {
self.cancel.clone()
}
}