Skip to main content

palladium_actor/
actor.rs

1use crate::bridge::{CachedSendFn, RuntimeBridge};
2use crate::envelope::Envelope;
3use crate::errors::{ActorError, SendError};
4use crate::message::MessagePayload;
5use crate::path::{ActorPath, AddrHash};
6use crate::policy::NamespacePolicy;
7use crate::spec::ChildSpec;
8use std::collections::HashMap;
9use std::fmt;
10use std::sync::Arc;
11use std::time::Duration;
12
13// ── AddrCache ─────────────────────────────────────────────────────────────────
14
15/// Per-actor cache of direct `MailboxSender` wrappers for frequently-contacted
16/// destinations.
17///
18/// Populated on the first successful `send_raw` to a local actor; evicted when
19/// delivery returns `SendError::ActorStopped`.  Cloned contexts receive a fresh
20/// empty cache so parent/child actors don't share stale entries.
21#[derive(Default)]
22pub(crate) struct AddrCache {
23    entries: HashMap<AddrHash, CachedSendFn>,
24}
25
26impl Clone for AddrCache {
27    fn clone(&self) -> Self {
28        Self::default()
29    }
30}
31
32impl AddrCache {
33    pub(crate) fn get(&self, addr: AddrHash) -> Option<&CachedSendFn> {
34        self.entries.get(&addr)
35    }
36
37    pub(crate) fn insert(&mut self, addr: AddrHash, f: CachedSendFn) {
38        self.entries.insert(addr, f);
39    }
40
41    pub(crate) fn remove(&mut self, addr: AddrHash) {
42        self.entries.remove(&addr);
43    }
44}
45
46// ── StopReason ────────────────────────────────────────────────────────────────
47
48/// The reason supplied to [`Actor::on_stop`].
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum StopReason {
51    /// Actor exited cleanly under normal conditions.
52    Normal,
53    /// Stop was requested explicitly (e.g., `ActorContext::stop`).
54    Requested,
55    /// Stop was initiated by the actor's supervisor.
56    Supervisor,
57    /// Actor was stopped because its parent stopped (REQ-004).
58    Hierarchical,
59    /// Engine is shutting down.
60    Shutdown,
61    /// Actor was force-killed (e.g., `ShutdownPolicy::Brutal`).
62    Killed,
63    /// Actor failed with an error that caused it to stop.
64    Error(ActorError),
65}
66
67// ── Actor ─────────────────────────────────────────────────────────────────────
68
69/// The core actor trait.
70///
71/// Implementations provide lifecycle hooks.  The runtime calls `on_start`
72/// before the first `on_message`, and `on_stop` after the last.
73pub trait Actor<R: crate::bridge::Reactor>: Send + 'static {
74    fn on_start(&mut self, _ctx: &mut ActorContext<R>) -> Result<(), ActorError> {
75        Ok(())
76    }
77
78    fn on_message(
79        &mut self,
80        _ctx: &mut ActorContext<R>,
81        _envelope: &Envelope,
82        _payload: MessagePayload,
83    ) -> Result<(), ActorError>;
84
85    fn on_stop(&mut self, _ctx: &mut ActorContext<R>, _reason: StopReason) {}
86}
87
88// ── ActorContext ──────────────────────────────────────────────────────────────
89
90/// Runtime handle provided to each lifecycle hook.
91///
92/// `bridge` is `None` in test/stub contexts (Phase 1).  `pd-runtime` injects
93/// a real bridge via `ActorContext::with_bridge` when spawning actors so that
94/// `spawn`, `stop`, `send_raw`, and `send_after` are fully functional.
95#[derive(Clone)]
96pub struct ActorContext<R: crate::bridge::Reactor> {
97    pub(crate) path: ActorPath,
98    pub(crate) addr_hash: AddrHash,
99    pub(crate) bridge: Option<Arc<dyn RuntimeBridge<R>>>,
100    /// Optional namespace policy enforced on every `send_raw` call.
101    ///
102    /// `None` means no enforcement (test/stub contexts; actors spawned without
103    /// a ChildSpec).  When `Some`, destinations whose path violates the policy
104    /// cause `send_raw` to return `Err(SendError::PolicyViolation)`.
105    pub(crate) namespace_policy: Option<NamespacePolicy>,
106    /// Per-actor cache of direct mailbox senders for hot destinations.
107    ///
108    /// Bypasses `TransportRegistry::try_route` (DashMap lookup) for locally
109    /// known actors.  See [`AddrCache`] for eviction rules.
110    pub(crate) addr_cache: AddrCache,
111    pub(crate) _phantom: std::marker::PhantomData<R>,
112}
113
114impl<R: crate::bridge::Reactor> fmt::Debug for ActorContext<R> {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_struct("ActorContext")
117            .field("path", &self.path)
118            .field("addr_hash", &self.addr_hash)
119            .field("bridge", &self.bridge.as_ref().map(|_| "<bridge>"))
120            .finish()
121    }
122}
123
124impl<R: crate::bridge::Reactor> ActorContext<R> {
125    /// Create a context without a runtime bridge (Phase 1 / test use).
126    pub fn new(path: ActorPath, addr_hash: AddrHash) -> Self {
127        Self {
128            path,
129            addr_hash,
130            bridge: None,
131            namespace_policy: None,
132            addr_cache: AddrCache::default(),
133            _phantom: std::marker::PhantomData,
134        }
135    }
136
137    /// Create a context with a runtime bridge.  Called by `pd-runtime` at
138    /// spawn time.
139    pub fn with_bridge(
140        path: ActorPath,
141        addr_hash: AddrHash,
142        bridge: Arc<dyn RuntimeBridge<R>>,
143    ) -> Self {
144        Self {
145            path,
146            addr_hash,
147            bridge: Some(bridge),
148            namespace_policy: None,
149            addr_cache: AddrCache::default(),
150            _phantom: std::marker::PhantomData,
151        }
152    }
153
154    /// Builder-style method to attach a namespace policy.
155    ///
156    /// Called by the supervisor when spawning actors from a `ChildSpec`.
157    pub fn with_namespace_policy(mut self, policy: NamespacePolicy) -> Self {
158        self.namespace_policy = Some(policy);
159        self
160    }
161
162    pub fn path(&self) -> &ActorPath {
163        &self.path
164    }
165
166    pub fn addr_hash(&self) -> AddrHash {
167        self.addr_hash
168    }
169
170    /// Spawn a child actor under this actor's supervision tree.
171    ///
172    /// Returns the child's `AddrHash` (derived immediately from path);
173    /// the runtime processes the actual spawn asynchronously.
174    ///
175    /// # Panics
176    /// Panics when called without a runtime bridge (Phase 1 / stub contexts).
177    pub fn spawn(&mut self, spec: ChildSpec<R>) -> AddrHash {
178        self.bridge
179            .as_ref()
180            .expect("ActorContext::spawn requires a runtime bridge")
181            .spawn_child(spec, &self.path)
182    }
183
184    /// Send a pre-built envelope through the local transport layer.
185    ///
186    /// If a namespace policy is set, the destination is resolved to an
187    /// `ActorPath` and checked against the policy.  Unknown destinations
188    /// (hash not found in the registry) are allowed through — the runtime
189    /// will deliver or drop as appropriate.
190    ///
191    /// **Fast path**: On the second and subsequent sends to the same local
192    /// actor, the cached `CachedSendFn` is used directly — bypassing the
193    /// global `TransportRegistry` DashMap lookup.  The cache is populated on
194    /// the first successful delivery and evicted on `ActorStopped`.
195    ///
196    /// Response envelopes (`FLAG_RESPONSE` set) always bypass the cache and go
197    /// through `RuntimeBridge::route` so the `ResponseRegistry` can complete
198    /// pending `ask()` futures.
199    ///
200    /// Returns `Err(SendError::PolicyViolation)` if the destination path
201    /// is not permitted by the current namespace policy.
202    /// Returns `Err(SendError::ActorStopped)` if called without a bridge.
203    pub fn send_raw(
204        &mut self,
205        envelope: Envelope,
206        payload: MessagePayload,
207    ) -> Result<(), SendError> {
208        // Policy check before routing.
209        if let (Some(policy), Some(bridge)) = (&self.namespace_policy, &self.bridge) {
210            if let Some(dest_path) = bridge.resolve_addr(envelope.destination) {
211                if !policy.allows(&dest_path) {
212                    return Err(SendError::PolicyViolation);
213                }
214            }
215        }
216
217        // Response envelopes complete a pending ask() — must go through route()
218        // so the ResponseRegistry sees them, not the destination's mailbox.
219        if envelope.is_response() {
220            return match &self.bridge {
221                Some(b) => b.route(envelope, payload),
222                None => Err(SendError::ActorStopped),
223            };
224        }
225
226        let dest = envelope.destination;
227
228        // Fast path: cached direct sender (no DashMap lookup).
229        if let Some(cached) = self.addr_cache.get(dest).cloned() {
230            let result = cached(envelope, payload);
231            if matches!(result, Err(SendError::ActorStopped)) {
232                self.addr_cache.remove(dest);
233            }
234            return result;
235        }
236
237        // Cache miss: try to get a direct sender from the bridge.
238        let cached_opt = self.bridge.as_ref().and_then(|b| b.cached_sender(dest));
239        match cached_opt {
240            Some(cached_fn) => {
241                let result = cached_fn(envelope, payload);
242                if result.is_ok() {
243                    self.addr_cache.insert(dest, cached_fn);
244                }
245                result
246            }
247            None => {
248                // No local mailbox (remote / federated / no bridge).
249                match &self.bridge {
250                    Some(b) => b.route(envelope, payload),
251                    None => Err(SendError::ActorStopped),
252                }
253            }
254        }
255    }
256
257    /// Request that this actor stop after the current message handler returns.
258    ///
259    /// # Panics
260    /// Panics when called without a runtime bridge (Phase 1 / stub contexts).
261    pub fn stop(&mut self) {
262        self.bridge
263            .as_ref()
264            .expect("ActorContext::stop requires a runtime bridge")
265            .stop_self();
266    }
267
268    /// Access the reactor for spawning, time, and async primitives.
269    ///
270    /// # Panics
271    /// Panics when called without a runtime bridge (Phase 1 / stub contexts).
272    pub fn reactor(&self) -> &R {
273        self.bridge
274            .as_ref()
275            .expect("ActorContext::reactor requires a runtime bridge")
276            .reactor()
277    }
278
279    /// Return the current monotonic time.
280    ///
281    /// Phase 3 routes this through the `Reactor` trait for deterministic
282    /// simulation.
283    pub fn now(&self) -> std::time::Instant {
284        match &self.bridge {
285            Some(b) => b.now(),
286            None => std::time::Instant::now(),
287        }
288    }
289
290    /// Schedule a message to be delivered after `delay`.
291    ///
292    /// # Panics
293    /// Panics when called without a runtime bridge.
294    pub fn send_after(&self, delay: Duration, envelope: Envelope, payload: MessagePayload) {
295        self.bridge
296            .as_ref()
297            .expect("ActorContext::send_after requires a runtime bridge")
298            .send_after(delay, envelope, payload);
299    }
300
301    /// Resolve an actor's current address by its path.
302    ///
303    /// Returns `None` if the actor is not currently running or if no bridge
304    /// is available (test/stub contexts).
305    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
306        self.bridge.as_ref()?.lookup_path(path)
307    }
308
309    /// Return a cached direct sender for `addr`, populating the cache on first call.
310    ///
311    /// Checks `addr_cache` first; on a miss, delegates to the bridge's
312    /// `cached_sender()` and inserts the result.  Returns `None` when the
313    /// destination is not local (remote / federated / no bridge).
314    ///
315    /// Used by `FfiBridge::cached_sender` to wire the fresh FFI context's
316    /// addr_cache through to the real `RuntimeBridgeImpl`.
317    pub fn cached_sender_for(&mut self, addr: AddrHash) -> Option<CachedSendFn> {
318        if let Some(cached) = self.addr_cache.get(addr) {
319            return Some(cached.clone());
320        }
321        let cached_fn = self.bridge.as_ref()?.cached_sender(addr)?;
322        self.addr_cache.insert(addr, cached_fn.clone());
323        Some(cached_fn)
324    }
325}