palladium_actor/actor.rs
1use crate::bridge::{CachedSendFn, RuntimeBridge};
2use crate::envelope::Envelope;
3use crate::errors::{ActorError, SendError};
4use crate::message::MessagePayload;
5use crate::path::{ActorPath, AddrHash};
6use crate::policy::NamespacePolicy;
7use crate::spec::ChildSpec;
8use std::collections::HashMap;
9use std::fmt;
10use std::sync::Arc;
11use std::time::Duration;
12
13// ── AddrCache ─────────────────────────────────────────────────────────────────
14
15/// Per-actor cache of direct `MailboxSender` wrappers for frequently-contacted
16/// destinations.
17///
18/// Populated on the first successful `send_raw` to a local actor; evicted when
19/// delivery returns `SendError::ActorStopped`. Cloned contexts receive a fresh
20/// empty cache so parent/child actors don't share stale entries.
21#[derive(Default)]
22pub(crate) struct AddrCache {
23 entries: HashMap<AddrHash, CachedSendFn>,
24}
25
26impl Clone for AddrCache {
27 fn clone(&self) -> Self {
28 Self::default()
29 }
30}
31
32impl AddrCache {
33 pub(crate) fn get(&self, addr: AddrHash) -> Option<&CachedSendFn> {
34 self.entries.get(&addr)
35 }
36
37 pub(crate) fn insert(&mut self, addr: AddrHash, f: CachedSendFn) {
38 self.entries.insert(addr, f);
39 }
40
41 pub(crate) fn remove(&mut self, addr: AddrHash) {
42 self.entries.remove(&addr);
43 }
44}
45
46// ── StopReason ────────────────────────────────────────────────────────────────
47
48/// The reason supplied to [`Actor::on_stop`].
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum StopReason {
51 /// Actor exited cleanly under normal conditions.
52 Normal,
53 /// Stop was requested explicitly (e.g., `ActorContext::stop`).
54 Requested,
55 /// Stop was initiated by the actor's supervisor.
56 Supervisor,
57 /// Actor was stopped because its parent stopped (REQ-004).
58 Hierarchical,
59 /// Engine is shutting down.
60 Shutdown,
61 /// Actor was force-killed (e.g., `ShutdownPolicy::Brutal`).
62 Killed,
63 /// Actor failed with an error that caused it to stop.
64 Error(ActorError),
65}
66
67// ── Actor ─────────────────────────────────────────────────────────────────────
68
69/// The core actor trait.
70///
71/// Implementations provide lifecycle hooks. The runtime calls `on_start`
72/// before the first `on_message`, and `on_stop` after the last.
73pub trait Actor<R: crate::bridge::Reactor>: Send + 'static {
74 fn on_start(&mut self, _ctx: &mut ActorContext<R>) -> Result<(), ActorError> {
75 Ok(())
76 }
77
78 fn on_message(
79 &mut self,
80 _ctx: &mut ActorContext<R>,
81 _envelope: &Envelope,
82 _payload: MessagePayload,
83 ) -> Result<(), ActorError>;
84
85 fn on_stop(&mut self, _ctx: &mut ActorContext<R>, _reason: StopReason) {}
86}
87
88// ── ActorContext ──────────────────────────────────────────────────────────────
89
90/// Runtime handle provided to each lifecycle hook.
91///
92/// `bridge` is `None` in test/stub contexts (Phase 1). `pd-runtime` injects
93/// a real bridge via `ActorContext::with_bridge` when spawning actors so that
94/// `spawn`, `stop`, `send_raw`, and `send_after` are fully functional.
95#[derive(Clone)]
96pub struct ActorContext<R: crate::bridge::Reactor> {
97 pub(crate) path: ActorPath,
98 pub(crate) addr_hash: AddrHash,
99 pub(crate) bridge: Option<Arc<dyn RuntimeBridge<R>>>,
100 /// Optional namespace policy enforced on every `send_raw` call.
101 ///
102 /// `None` means no enforcement (test/stub contexts; actors spawned without
103 /// a ChildSpec). When `Some`, destinations whose path violates the policy
104 /// cause `send_raw` to return `Err(SendError::PolicyViolation)`.
105 pub(crate) namespace_policy: Option<NamespacePolicy>,
106 /// Per-actor cache of direct mailbox senders for hot destinations.
107 ///
108 /// Bypasses `TransportRegistry::try_route` (DashMap lookup) for locally
109 /// known actors. See [`AddrCache`] for eviction rules.
110 pub(crate) addr_cache: AddrCache,
111 pub(crate) _phantom: std::marker::PhantomData<R>,
112}
113
114impl<R: crate::bridge::Reactor> fmt::Debug for ActorContext<R> {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 f.debug_struct("ActorContext")
117 .field("path", &self.path)
118 .field("addr_hash", &self.addr_hash)
119 .field("bridge", &self.bridge.as_ref().map(|_| "<bridge>"))
120 .finish()
121 }
122}
123
124impl<R: crate::bridge::Reactor> ActorContext<R> {
125 /// Create a context without a runtime bridge (Phase 1 / test use).
126 pub fn new(path: ActorPath, addr_hash: AddrHash) -> Self {
127 Self {
128 path,
129 addr_hash,
130 bridge: None,
131 namespace_policy: None,
132 addr_cache: AddrCache::default(),
133 _phantom: std::marker::PhantomData,
134 }
135 }
136
137 /// Create a context with a runtime bridge. Called by `pd-runtime` at
138 /// spawn time.
139 pub fn with_bridge(
140 path: ActorPath,
141 addr_hash: AddrHash,
142 bridge: Arc<dyn RuntimeBridge<R>>,
143 ) -> Self {
144 Self {
145 path,
146 addr_hash,
147 bridge: Some(bridge),
148 namespace_policy: None,
149 addr_cache: AddrCache::default(),
150 _phantom: std::marker::PhantomData,
151 }
152 }
153
154 /// Builder-style method to attach a namespace policy.
155 ///
156 /// Called by the supervisor when spawning actors from a `ChildSpec`.
157 pub fn with_namespace_policy(mut self, policy: NamespacePolicy) -> Self {
158 self.namespace_policy = Some(policy);
159 self
160 }
161
162 pub fn path(&self) -> &ActorPath {
163 &self.path
164 }
165
166 pub fn addr_hash(&self) -> AddrHash {
167 self.addr_hash
168 }
169
170 /// Spawn a child actor under this actor's supervision tree.
171 ///
172 /// Returns the child's `AddrHash` (derived immediately from path);
173 /// the runtime processes the actual spawn asynchronously.
174 ///
175 /// # Panics
176 /// Panics when called without a runtime bridge (Phase 1 / stub contexts).
177 pub fn spawn(&mut self, spec: ChildSpec<R>) -> AddrHash {
178 self.bridge
179 .as_ref()
180 .expect("ActorContext::spawn requires a runtime bridge")
181 .spawn_child(spec, &self.path)
182 }
183
184 /// Send a pre-built envelope through the local transport layer.
185 ///
186 /// If a namespace policy is set, the destination is resolved to an
187 /// `ActorPath` and checked against the policy. Unknown destinations
188 /// (hash not found in the registry) are allowed through — the runtime
189 /// will deliver or drop as appropriate.
190 ///
191 /// **Fast path**: On the second and subsequent sends to the same local
192 /// actor, the cached `CachedSendFn` is used directly — bypassing the
193 /// global `TransportRegistry` DashMap lookup. The cache is populated on
194 /// the first successful delivery and evicted on `ActorStopped`.
195 ///
196 /// Response envelopes (`FLAG_RESPONSE` set) always bypass the cache and go
197 /// through `RuntimeBridge::route` so the `ResponseRegistry` can complete
198 /// pending `ask()` futures.
199 ///
200 /// Returns `Err(SendError::PolicyViolation)` if the destination path
201 /// is not permitted by the current namespace policy.
202 /// Returns `Err(SendError::ActorStopped)` if called without a bridge.
203 pub fn send_raw(
204 &mut self,
205 envelope: Envelope,
206 payload: MessagePayload,
207 ) -> Result<(), SendError> {
208 // Policy check before routing.
209 if let (Some(policy), Some(bridge)) = (&self.namespace_policy, &self.bridge) {
210 if let Some(dest_path) = bridge.resolve_addr(envelope.destination) {
211 if !policy.allows(&dest_path) {
212 return Err(SendError::PolicyViolation);
213 }
214 }
215 }
216
217 // Response envelopes complete a pending ask() — must go through route()
218 // so the ResponseRegistry sees them, not the destination's mailbox.
219 if envelope.is_response() {
220 return match &self.bridge {
221 Some(b) => b.route(envelope, payload),
222 None => Err(SendError::ActorStopped),
223 };
224 }
225
226 let dest = envelope.destination;
227
228 // Fast path: cached direct sender (no DashMap lookup).
229 if let Some(cached) = self.addr_cache.get(dest).cloned() {
230 let result = cached(envelope, payload);
231 if matches!(result, Err(SendError::ActorStopped)) {
232 self.addr_cache.remove(dest);
233 }
234 return result;
235 }
236
237 // Cache miss: try to get a direct sender from the bridge.
238 let cached_opt = self.bridge.as_ref().and_then(|b| b.cached_sender(dest));
239 match cached_opt {
240 Some(cached_fn) => {
241 let result = cached_fn(envelope, payload);
242 if result.is_ok() {
243 self.addr_cache.insert(dest, cached_fn);
244 }
245 result
246 }
247 None => {
248 // No local mailbox (remote / federated / no bridge).
249 match &self.bridge {
250 Some(b) => b.route(envelope, payload),
251 None => Err(SendError::ActorStopped),
252 }
253 }
254 }
255 }
256
257 /// Request that this actor stop after the current message handler returns.
258 ///
259 /// # Panics
260 /// Panics when called without a runtime bridge (Phase 1 / stub contexts).
261 pub fn stop(&mut self) {
262 self.bridge
263 .as_ref()
264 .expect("ActorContext::stop requires a runtime bridge")
265 .stop_self();
266 }
267
268 /// Access the reactor for spawning, time, and async primitives.
269 ///
270 /// # Panics
271 /// Panics when called without a runtime bridge (Phase 1 / stub contexts).
272 pub fn reactor(&self) -> &R {
273 self.bridge
274 .as_ref()
275 .expect("ActorContext::reactor requires a runtime bridge")
276 .reactor()
277 }
278
279 /// Return the current monotonic time.
280 ///
281 /// Phase 3 routes this through the `Reactor` trait for deterministic
282 /// simulation.
283 pub fn now(&self) -> std::time::Instant {
284 match &self.bridge {
285 Some(b) => b.now(),
286 None => std::time::Instant::now(),
287 }
288 }
289
290 /// Schedule a message to be delivered after `delay`.
291 ///
292 /// # Panics
293 /// Panics when called without a runtime bridge.
294 pub fn send_after(&self, delay: Duration, envelope: Envelope, payload: MessagePayload) {
295 self.bridge
296 .as_ref()
297 .expect("ActorContext::send_after requires a runtime bridge")
298 .send_after(delay, envelope, payload);
299 }
300
301 /// Resolve an actor's current address by its path.
302 ///
303 /// Returns `None` if the actor is not currently running or if no bridge
304 /// is available (test/stub contexts).
305 pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
306 self.bridge.as_ref()?.lookup_path(path)
307 }
308
309 /// Return a cached direct sender for `addr`, populating the cache on first call.
310 ///
311 /// Checks `addr_cache` first; on a miss, delegates to the bridge's
312 /// `cached_sender()` and inserts the result. Returns `None` when the
313 /// destination is not local (remote / federated / no bridge).
314 ///
315 /// Used by `FfiBridge::cached_sender` to wire the fresh FFI context's
316 /// addr_cache through to the real `RuntimeBridgeImpl`.
317 pub fn cached_sender_for(&mut self, addr: AddrHash) -> Option<CachedSendFn> {
318 if let Some(cached) = self.addr_cache.get(addr) {
319 return Some(cached.clone());
320 }
321 let cached_fn = self.bridge.as_ref()?.cached_sender(addr)?;
322 self.addr_cache.insert(addr, cached_fn.clone());
323 Some(cached_fn)
324 }
325}