1use std::sync::Arc;
13
14use actr_framework::{Context, ErrorCategory, ErrorEvent, MessageDispatcher, Workload};
15use actr_protocol::{Acl, ActorResult, ActrError, ActrId, RpcEnvelope};
16use bytes::Bytes;
17use futures_util::FutureExt as _;
18
19use crate::acl::check_acl_permission;
20
21pub struct ActrDispatch<W: Workload> {
26 workload: Arc<W>,
27 acl: Option<Acl>,
28}
29
30impl<W: Workload> ActrDispatch<W> {
31 pub fn new(workload: Arc<W>, acl: Option<Acl>) -> Self {
37 Self { workload, acl }
38 }
39
40 pub fn workload(&self) -> &W {
42 &self.workload
43 }
44
45 pub async fn on_start<C: Context>(&self, ctx: &C) -> ActorResult<()> {
51 self.workload.on_start(ctx).await
52 }
53
54 pub async fn on_stop<C: Context>(&self, ctx: &C) -> ActorResult<()> {
56 self.workload.on_stop(ctx).await
57 }
58
59 pub async fn dispatch<C: Context>(
74 &self,
75 self_id: &ActrId,
76 caller_id: Option<&ActrId>,
77 envelope: RpcEnvelope,
78 ctx: &C,
79 ) -> ActorResult<Bytes> {
80 let allowed = check_acl_permission(caller_id, self_id, self.acl.as_ref())
82 .map_err(|e| ActrError::Internal(format!("ACL check failed: {e}")))?;
83
84 if !allowed {
85 tracing::warn!(
86 severity = 5,
87 error_category = "acl_denied",
88 request_id = %envelope.request_id,
89 route_key = %envelope.route_key,
90 "ACL: permission denied",
91 );
92 return Err(ActrError::PermissionDenied(format!(
93 "ACL denied: {} -> {}",
94 caller_id
95 .map(|c| c.to_string_repr())
96 .unwrap_or_else(|| "<unknown>".into()),
97 self_id.to_string_repr(),
98 )));
99 }
100
101 self.do_dispatch(envelope, ctx).await
103 }
104
105 async fn do_dispatch<C: Context>(&self, envelope: RpcEnvelope, ctx: &C) -> ActorResult<Bytes> {
107 let route_key = envelope.route_key.clone();
108 let request_id = envelope.request_id.clone();
109
110 let result =
111 std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(&self.workload, envelope, ctx))
112 .catch_unwind()
113 .await;
114
115 match result {
116 Ok(r) => r,
117 Err(panic_payload) => {
118 let info = extract_panic_info(panic_payload);
119 tracing::error!(
120 severity = 8,
121 error_category = "handler_panic",
122 route_key = %route_key,
123 request_id = %request_id,
124 "handler panicked: {}", info,
125 );
126 let event = ErrorEvent::now(
128 ActrError::Internal(format!("handler panicked: {info}")),
129 ErrorCategory::HandlerPanic,
130 format!("route_key={route_key} request_id={request_id}"),
131 );
132 let _ = self.workload.on_error(ctx, &event).await;
133 Err(ActrError::DecodeFailure(format!(
134 "handler panicked: {info}"
135 )))
136 }
137 }
138 }
139}
140
/// Turns a panic payload into a human-readable message.
///
/// Payloads that are a `String` or a `&'static str` yield their text; any
/// other payload type yields the fixed placeholder `"<non-string panic>"`.
fn extract_panic_info(payload: Box<dyn std::any::Any + Send>) -> String {
    // An owned `String` payload can be moved out of the box directly.
    match payload.downcast::<String>() {
        Ok(owned) => *owned,
        Err(other) => match other.downcast_ref::<&str>() {
            Some(text) => (*text).to_owned(),
            None => String::from("<non-string panic>"),
        },
    }
}
155
156impl<W: Workload> Clone for ActrDispatch<W> {
161 fn clone(&self) -> Self {
162 Self {
163 workload: Arc::clone(&self.workload),
164 acl: self.acl.clone(),
165 }
166 }
167}
168
169impl<W: Workload> std::fmt::Debug for ActrDispatch<W> {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("ActrDispatch")
172 .field("has_acl", &self.acl.is_some())
173 .finish()
174 }
175}