use std::sync::Arc;
use actr_framework::{Context, ErrorCategory, ErrorEvent, MessageDispatcher, Workload};
use actr_protocol::{Acl, ActorResult, ActrError, ActrId, RpcEnvelope};
use bytes::Bytes;
use futures_util::FutureExt as _;
use crate::acl::check_acl_permission;
/// Pairs a workload with an optional ACL and routes incoming RPC envelopes to it.
///
/// `Clone` and `Debug` are implemented by hand elsewhere in this file rather
/// than derived.
pub struct ActrDispatch<W: Workload> {
/// Shared handle to the workload that receives lifecycle calls and dispatched envelopes.
workload: Arc<W>,
/// ACL passed to `check_acl_permission` on every `dispatch` call. How `None`
/// is treated is decided by that helper, which is defined outside this file.
acl: Option<Acl>,
}
impl<W: Workload> ActrDispatch<W> {
    /// Creates a dispatcher around `workload`, optionally guarded by `acl`.
    pub fn new(workload: Arc<W>, acl: Option<Acl>) -> Self {
        Self { workload, acl }
    }

    /// Returns a reference to the wrapped workload.
    pub fn workload(&self) -> &W {
        &self.workload
    }

    /// Forwards to the workload's `on_start` hook and returns its result unchanged.
    pub async fn on_start<C: Context>(&self, ctx: &C) -> ActorResult<()> {
        self.workload.on_start(ctx).await
    }

    /// Forwards to the workload's `on_stop` hook and returns its result unchanged.
    pub async fn on_stop<C: Context>(&self, ctx: &C) -> ActorResult<()> {
        self.workload.on_stop(ctx).await
    }

    /// Checks the ACL for `caller_id -> self_id` and, if allowed, routes
    /// `envelope` to the workload's dispatcher.
    ///
    /// # Errors
    ///
    /// * [`ActrError::Internal`] if the ACL check itself fails.
    /// * [`ActrError::PermissionDenied`] if the ACL check reports that the
    ///   caller is not allowed; a warning is logged first.
    /// * [`ActrError::Internal`] if the handler panics.
    /// * Whatever error the handler itself returns.
    pub async fn dispatch<C: Context>(
        &self,
        self_id: &ActrId,
        caller_id: Option<&ActrId>,
        envelope: RpcEnvelope,
        ctx: &C,
    ) -> ActorResult<Bytes> {
        let allowed = check_acl_permission(caller_id, self_id, self.acl.as_ref())
            .map_err(|e| ActrError::Internal(format!("ACL check failed: {e}")))?;

        if !allowed {
            tracing::warn!(
                severity = 5,
                error_category = "acl_denied",
                request_id = %envelope.request_id,
                route_key = %envelope.route_key,
                "ACL: permission denied",
            );
            return Err(ActrError::PermissionDenied(format!(
                "ACL denied: {} -> {}",
                caller_id
                    .map(|c| c.to_string_repr())
                    .unwrap_or_else(|| "<unknown>".into()),
                self_id.to_string_repr(),
            )));
        }

        self.do_dispatch(envelope, ctx).await
    }

    /// Runs the workload's dispatcher for `envelope` behind a panic barrier.
    ///
    /// A handler panic is logged, reported to the workload's `on_error` hook
    /// as an [`ErrorCategory::HandlerPanic`] event, and converted into an
    /// [`ActrError::Internal`] return value.
    async fn do_dispatch<C: Context>(&self, envelope: RpcEnvelope, ctx: &C) -> ActorResult<Bytes> {
        // Copied out up front: `envelope` is moved into the handler below, but
        // both values are still needed for diagnostics if the handler panics.
        let route_key = envelope.route_key.clone();
        let request_id = envelope.request_id.clone();

        let outcome =
            std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(&self.workload, envelope, ctx))
                .catch_unwind()
                .await;

        let panic_payload = match outcome {
            Ok(handler_result) => return handler_result,
            Err(payload) => payload,
        };

        let info = extract_panic_info(panic_payload);
        tracing::error!(
            severity = 8,
            error_category = "handler_panic",
            route_key = %route_key,
            request_id = %request_id,
            "handler panicked: {}", info,
        );

        let event = ErrorEvent::now(
            ActrError::Internal(format!("handler panicked: {info}")),
            ErrorCategory::HandlerPanic,
            format!("route_key={route_key} request_id={request_id}"),
        );

        // The hook is user code as well: keep a panic inside it from escaping
        // the barrier, so the caller still receives the error below. The hook's
        // own return value stays best-effort and is intentionally ignored.
        let hook_outcome = std::panic::AssertUnwindSafe(self.workload.on_error(ctx, &event))
            .catch_unwind()
            .await;
        if let Err(hook_payload) = hook_outcome {
            tracing::error!(
                severity = 8,
                error_category = "error_hook_panic",
                route_key = %route_key,
                request_id = %request_id,
                "on_error hook panicked: {}", extract_panic_info(hook_payload),
            );
        }

        // Same variant as the event reported above: a handler panic is an
        // internal failure, not a payload decoding problem.
        Err(ActrError::Internal(format!("handler panicked: {info}")))
    }
}
/// Turns a panic payload into a printable message.
///
/// Returns the payload's text when it is a `&str` or a `String`, and the
/// placeholder `"<non-string panic>"` for any other payload type.
fn extract_panic_info(payload: Box<dyn std::any::Any + Send>) -> String {
    match payload.downcast::<String>() {
        // Owned message: move it out of the box.
        Ok(owned) => *owned,
        // Not a `String`; the box is handed back so `&str` can still be tried.
        Err(other) => match other.downcast_ref::<&str>() {
            Some(literal) => (*literal).to_owned(),
            None => "<non-string panic>".to_owned(),
        },
    }
}
impl<W: Workload> Clone for ActrDispatch<W> {
    /// Produces a dispatcher that shares the same workload `Arc` and carries
    /// a clone of the ACL. Written by hand so that `W` need not be `Clone`.
    fn clone(&self) -> Self {
        let workload = Arc::clone(&self.workload);
        let acl = self.acl.clone();
        Self { workload, acl }
    }
}
impl<W: Workload> std::fmt::Debug for ActrDispatch<W> {
    /// Formats as `ActrDispatch { has_acl: <bool> }`; the workload and the ACL
    /// contents are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_acl = self.acl.is_some();
        let mut out = f.debug_struct("ActrDispatch");
        out.field("has_acl", &has_acl);
        out.finish()
    }
}