Skip to main content

actr_runtime/
dispatch.rs

1//! Business message dispatch layer
2//!
3//! `ActrDispatch` is the core struct of the new actr-runtime, responsible for:
4//! 1. ACL permission checking
5//! 2. Route key -> Workload handler static dispatch
6//! 3. Handler panic capture and reporting
7//! 4. Lifecycle hook delegation (on_start / on_stop)
8//!
9//! This module contains **no** IO, network, or transport logic,
10//! and can be compiled and run on both native and wasm32 targets.
11
12use std::sync::Arc;
13
14use actr_framework::{Context, ErrorCategory, ErrorEvent, MessageDispatcher, Workload};
15use actr_protocol::{Acl, ActorResult, ActrError, ActrId, RpcEnvelope};
16use bytes::Bytes;
17use futures_util::FutureExt as _;
18
19use crate::acl::check_acl_permission;
20
21/// Pure business dispatcher
22///
23/// Holds an `Arc<W>` workload instance and optional ACL rules,
24/// exposing `dispatch()` and lifecycle methods.
25pub struct ActrDispatch<W: Workload> {
26    workload: Arc<W>,
27    acl: Option<Acl>,
28}
29
30impl<W: Workload> ActrDispatch<W> {
31    /// Create a dispatcher
32    ///
33    /// # Arguments
34    /// - `workload`: Business Workload instance (wrapped in `Arc`)
35    /// - `acl`: Optional ACL rule set; `None` means all calls are allowed by default
36    pub fn new(workload: Arc<W>, acl: Option<Acl>) -> Self {
37        Self { workload, acl }
38    }
39
40    /// Get Workload reference
41    pub fn workload(&self) -> &W {
42        &self.workload
43    }
44
45    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
46    // Lifecycle
47    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
48
49    /// Forward on_start lifecycle hook
50    pub async fn on_start<C: Context>(&self, ctx: &C) -> ActorResult<()> {
51        self.workload.on_start(ctx).await
52    }
53
54    /// Forward on_stop lifecycle hook
55    pub async fn on_stop<C: Context>(&self, ctx: &C) -> ActorResult<()> {
56        self.workload.on_stop(ctx).await
57    }
58
59    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
60    // Message dispatch
61    // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
62
63    /// Dispatch inbound message: ACL check -> routing -> handler execution
64    ///
65    /// # Arguments
66    /// - `self_id`: Current Actor's ID
67    /// - `caller_id`: Caller ID (`None` for local calls)
68    /// - `envelope`: RPC envelope (contains route_key and payload)
69    /// - `ctx`: Execution context (generic, provided by upper layer)
70    ///
71    /// # Returns
72    /// Serialized response bytes, or `ActrError`
73    pub async fn dispatch<C: Context>(
74        &self,
75        self_id: &ActrId,
76        caller_id: Option<&ActrId>,
77        envelope: RpcEnvelope,
78        ctx: &C,
79    ) -> ActorResult<Bytes> {
80        // -- ACL check --
81        let allowed = check_acl_permission(caller_id, self_id, self.acl.as_ref())
82            .map_err(|e| ActrError::Internal(format!("ACL check failed: {e}")))?;
83
84        if !allowed {
85            tracing::warn!(
86                severity = 5,
87                error_category = "acl_denied",
88                request_id = %envelope.request_id,
89                route_key = %envelope.route_key,
90                "ACL: permission denied",
91            );
92            return Err(ActrError::PermissionDenied(format!(
93                "ACL denied: {} -> {}",
94                caller_id
95                    .map(|c| c.to_string_repr())
96                    .unwrap_or_else(|| "<unknown>".into()),
97                self_id.to_string_repr(),
98            )));
99        }
100
101        // -- Static dispatch + panic capture --
102        self.do_dispatch(envelope, ctx).await
103    }
104
105    /// Internal dispatch: call `MessageDispatcher::dispatch`, capture handler panics
106    async fn do_dispatch<C: Context>(&self, envelope: RpcEnvelope, ctx: &C) -> ActorResult<Bytes> {
107        let route_key = envelope.route_key.clone();
108        let request_id = envelope.request_id.clone();
109
110        let result =
111            std::panic::AssertUnwindSafe(W::Dispatcher::dispatch(&self.workload, envelope, ctx))
112                .catch_unwind()
113                .await;
114
115        match result {
116            Ok(r) => r,
117            Err(panic_payload) => {
118                let info = extract_panic_info(panic_payload);
119                tracing::error!(
120                    severity = 8,
121                    error_category = "handler_panic",
122                    route_key = %route_key,
123                    request_id = %request_id,
124                    "handler panicked: {}", info,
125                );
126                // Notify workload's on_error hook
127                let event = ErrorEvent::now(
128                    ActrError::Internal(format!("handler panicked: {info}")),
129                    ErrorCategory::HandlerPanic,
130                    format!("route_key={route_key} request_id={request_id}"),
131                );
132                let _ = self.workload.on_error(ctx, &event).await;
133                Err(ActrError::DecodeFailure(format!(
134                    "handler panicked: {info}"
135                )))
136            }
137        }
138    }
139}
140
141// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
142// Utility functions
143// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
144
/// Turn a captured panic payload into a human-readable message.
///
/// Payloads that are a `&str` or a `String` yield their text; any other
/// payload type yields the placeholder `"<non-string panic>"`.
fn extract_panic_info(payload: Box<dyn std::any::Any + Send>) -> String {
    let from_literal = payload.downcast_ref::<&str>().map(|msg| (*msg).to_owned());
    // Only probed when the payload is not a string slice.
    let from_owned = || payload.downcast_ref::<String>().cloned();

    from_literal
        .or_else(from_owned)
        .unwrap_or_else(|| "<non-string panic>".to_owned())
}
155
156// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
157// trait impls
158// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
159
160impl<W: Workload> Clone for ActrDispatch<W> {
161    fn clone(&self) -> Self {
162        Self {
163            workload: Arc::clone(&self.workload),
164            acl: self.acl.clone(),
165        }
166    }
167}
168
169impl<W: Workload> std::fmt::Debug for ActrDispatch<W> {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("ActrDispatch")
172            .field("has_acl", &self.acl.is_some())
173            .finish()
174    }
175}