// palladium_runtime/engine/handle.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9    Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10    PoolConfig, RemoteMessage, SendError, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14    InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15    QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16    TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::common::LifecycleSignal;
21use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
22use crate::reactor::{Reactor, TokioReactor};
23use crate::registry::ActorRegistry;
24use crate::responses::ResponseRegistry;
25
26/// Handle for interacting with the engine from external (test) code.
27#[derive(Clone)]
28pub struct EngineHandle<
29    R: Reactor = TokioReactor,
30    N: Network = TokioNetwork,
31    F: FileSystem = TokioFileSystem,
32> {
33    pub(crate) transport: Arc<InProcessTransport>,
34    pub(crate) transport_registry: Arc<TransportRegistry>,
35    pub(crate) type_registry: TypeRegistry,
36    pub(crate) responses: Arc<ResponseRegistry>,
37    pub(crate) registry: Arc<ActorRegistry<R>>,
38    pub(crate) ask_timeout: Duration,
39    pub(crate) reactor: R,
40    pub(crate) network: N,
41    pub(crate) _fs: F,
42    pub(crate) start_time: Instant,
43    pub(crate) source_addr: AddrHash,
44    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
45    /// Cache of direct `MailboxSender`s for local actors, keyed by `AddrHash`.
46    ///
47    /// Populated on the first `send_to` to a local actor; avoids the DashMap
48    /// lookup in `TransportRegistry::try_route` on subsequent sends.
49    /// Evicted when a send returns `ActorStopped` (receiver dropped).
50    /// Shared across handle clones via `Arc` so all clones benefit.
51    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
52    /// Receiving end of the response pump mailbox.  Consumed by the first
53    /// `remote_addr_for` ask, which spawns a long-lived pump task.
54    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
55}
56
57impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
58    pub fn type_registry(&self) -> TypeRegistry {
59        self.type_registry.clone()
60    }
61
62    /// Send a message to an actor by its `AddrHash`.
63    ///
64    /// **Fast path**: after the first send to a local actor, the `MailboxSender`
65    /// is cached in `send_cache`, bypassing the `TransportRegistry` DashMap on
66    /// subsequent sends.  Stale entries are evicted on `ActorStopped`.
67    pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
68        let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
69        let payload = MessagePayload::local(msg);
70
71        // Fast path: use cached MailboxSender to avoid DashMap in TransportRegistry.
72        if let Some(sender) = self.send_cache.get(&target) {
73            return match sender.try_send(MailboxMessage { envelope, payload }) {
74                Ok(()) => Ok(()),
75                Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
76                Err(_) => {
77                    // Receiver dropped — evict stale cache entry.
78                    drop(sender); // release DashMap shard lock before remove
79                    self.send_cache.remove(&target);
80                    Err(SendError::ActorStopped)
81                }
82            };
83        }
84
85        // Cache miss: check if this is a local actor, populate cache, and send.
86        if let Some(sender) = self.transport_registry.get_local_sender(target) {
87            let result = sender.try_send(MailboxMessage { envelope, payload });
88            match result {
89                Ok(()) => {
90                    self.send_cache.insert(target, sender);
91                    return Ok(());
92                }
93                Err(TransportError::MailboxFull) => {
94                    // Actor is alive but full; cache for next send.
95                    self.send_cache.insert(target, sender);
96                    return Err(SendError::MailboxFull);
97                }
98                Err(_) => return Err(SendError::ActorStopped),
99            }
100        }
101
102        // Non-local destination (remote, QUIC, TCP, etc.): full routing.
103        self.transport_registry
104            .try_route(envelope, payload)
105            .map_err(|e| match e {
106                TransportError::MailboxFull => SendError::MailboxFull,
107                TransportError::ConnectionFailed => SendError::ConnectionFailed,
108                TransportError::Unroutable(_) => SendError::Unroutable,
109                TransportError::SerializationRequired | TransportError::SerializationError(_) => {
110                    SendError::SerializationFailed
111                }
112                _ => SendError::ActorStopped,
113            })
114    }
115
116    /// Send up to `n` messages to one target using a producer-side batch fast path.
117    ///
118    /// For local destinations this uses `MailboxSender::try_send_batch_owned`,
119    /// reducing wake/schedule overhead vs repeated `send_to` calls.
120    ///
121    /// Returns the number of messages successfully enqueued. On local mailbox
122    /// backpressure this may be less than `n` without returning an error.
123    pub fn send_many<M: Message + Clone>(
124        &self,
125        target: AddrHash,
126        msg: M,
127        n: usize,
128    ) -> Result<usize, SendError> {
129        if n == 0 {
130            return Ok(0);
131        }
132
133        let make_batch = || {
134            let mut batch = Vec::with_capacity(n);
135            for _ in 0..n {
136                batch.push(MailboxMessage {
137                    envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
138                    payload: MessagePayload::local(msg.clone()),
139                });
140            }
141            batch
142        };
143
144        // Fast path: use cached MailboxSender.
145        if let Some(sender) = self.send_cache.get(&target) {
146            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
147            return match err {
148                None => Ok(sent),
149                Some(TransportError::MailboxFull) => Ok(sent),
150                _ => {
151                    drop(sender);
152                    self.send_cache.remove(&target);
153                    Err(SendError::ActorStopped)
154                }
155            };
156        }
157
158        // Cache miss: local actor lookup and batch send.
159        if let Some(sender) = self.transport_registry.get_local_sender(target) {
160            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
161            match err {
162                None => {
163                    self.send_cache.insert(target, sender);
164                    return Ok(sent);
165                }
166                Some(TransportError::MailboxFull) => {
167                    self.send_cache.insert(target, sender);
168                    return Ok(sent);
169                }
170                _ => return Err(SendError::ActorStopped),
171            }
172        }
173
174        // Non-local fallback: preserve send_to semantics.
175        let mut sent = 0usize;
176        for _ in 0..n {
177            match self.send_to::<M>(target, msg.clone()) {
178                Ok(()) => sent += 1,
179                Err(SendError::MailboxFull) => return Ok(sent),
180                Err(e) => return Err(e),
181            }
182        }
183        Ok(sent)
184    }
185
186    /// Create an `Addr<M>` that sends to `target` via this engine's transport.
187    pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
188    where
189        R: Send + Sync,
190    {
191        let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
192        let ask_fn = make_ask_fn::<M, R>(
193            target,
194            self.source_addr,
195            self.transport.clone(),
196            self.transport_registry.clone(),
197            self.responses.clone(),
198            self.ask_timeout,
199            self.reactor.clone(),
200        );
201        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
202    }
203
204    /// Create a remote `Addr<M>` that always serializes payloads before routing.
205    ///
206    /// This is intended for explicit remote sends; it rejects local destinations
207    /// to avoid accidental local delivery of serialized payloads.
208    pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
209        let registry = self.transport_registry.clone();
210        let source = self.source_addr;
211        let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
212            if registry.is_local(target) {
213                return Err(SendError::PolicyViolation);
214            }
215            let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
216            let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
217            registry
218                .try_route(envelope, MessagePayload::serialized(bytes))
219                .map_err(|e| match e {
220                    TransportError::MailboxFull => SendError::MailboxFull,
221                    TransportError::ConnectionFailed => SendError::ConnectionFailed,
222                    TransportError::Unroutable(_) => SendError::Unroutable,
223                    TransportError::SerializationRequired
224                    | TransportError::SerializationError(_) => SendError::SerializationFailed,
225                    _ => SendError::ActorStopped,
226                })
227        });
228        let ask_fn: palladium_actor::AskFn<M> =
229            Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
230                Box::pin(async { Err(AskError::Unbound) })
231            });
232        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
233    }
234
235    /// Create a typed remote `Addr<M>` that supports both `send()` and `ask()`.
236    ///
237    /// Both request and response payloads are serialized for transport delivery.
238    /// Local destinations are rejected with `AskError::Send(PolicyViolation)`.
239    ///
240    /// Before calling, register the message type on both engines:
241    /// ```ignore
242    /// handle.type_registry().register_remote_ask::<MyMsg>();
243    /// ```
244    ///
245    /// The first `ask()` on the returned address lazily starts the engine's
246    /// response pump task (a lightweight background task that routes incoming
247    /// serialized response frames into the local response registry).
248    pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
249    where
250        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
251        R: Clone + Send + Sync,
252    {
253        let send_fn =
254            make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
255        let ask_fn = make_remote_ask_fn::<M, R>(
256            target,
257            self.source_addr,
258            self.transport_registry.clone(),
259            self.responses.clone(),
260            self.ask_timeout,
261            self.reactor.clone(),
262            Arc::clone(&self.pump_rx),
263        );
264        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
265    }
266
267    pub fn transport(&self) -> &Arc<InProcessTransport> {
268        &self.transport
269    }
270
271    pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
272        &self.transport_registry
273    }
274
275    pub async fn attach_quic_transport(
276        &self,
277        config: QuicTransportConfig,
278        peers: HashMap<EngineId, SocketAddr>,
279    ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
280    where
281        R: Clone,
282        N: Clone,
283    {
284        let transport = QuicTransport::bind(
285            config,
286            peers,
287            self.transport_registry.clone(),
288            self.type_registry.clone(),
289            self.reactor.clone(),
290            self.network.clone(),
291        )
292        .await?;
293        self.transport_registry.add_transport(transport.clone())?;
294        Ok(transport)
295    }
296
297    pub async fn attach_tcp_transport(
298        &self,
299        config: TcpTransportConfig,
300        peers: HashMap<EngineId, SocketAddr>,
301    ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
302    where
303        R: Clone,
304        N: Clone,
305    {
306        let transport = TcpTransport::bind(
307            config,
308            peers,
309            self.transport_registry.clone(),
310            self.type_registry.clone(),
311            self.reactor.clone(),
312            self.network.clone(),
313        )
314        .await?;
315        self.transport_registry.add_transport(transport.clone())?;
316        Ok(transport)
317    }
318
319    // ── Introspection (REQ-085) ───────────────────────────────────────────────
320
321    /// List actors, optionally filtered by `query`.
322    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
323        self.registry.snapshot(&query, 0)
324    }
325
326    /// Get info for a single actor by path.
327    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
328        self.registry.actor_info_by_path(path, 0)
329    }
330
331    /// System-level snapshot: all actors + uptime.
332    pub fn snapshot(&self) -> EngineSnapshot {
333        let actors = self.actor_list(ActorQuery::default());
334        let uptime_secs = self.start_time.elapsed().as_secs();
335        EngineSnapshot {
336            num_cores: 1,
337            uptime_secs,
338            actors,
339        }
340    }
341
342    /// Lookup an actor's address by its path.
343    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
344        self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
345            self.federated_routing
346                .as_ref()
347                .and_then(|routing| routing.resolve_optional(path))
348        })
349    }
350
351    /// Resolve an actor's address by path with routing errors.
352    pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
353        if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
354            return Ok(local);
355        }
356        if let Some(routing) = &self.federated_routing {
357            return routing.resolve(path);
358        }
359        Err(SendError::Unroutable)
360    }
361
362    /// Send a hard-stop signal to an actor by path.
363    ///
364    /// Returns `true` if the actor was found and the signal was sent. The
365    /// actor may still be running until the signal is processed by the
366    /// reactor. Used by `FaultInjector::crash_actor`.
367    pub fn stop_actor(&self, path: &ActorPath) -> bool {
368        if let Some(slot) = self.registry.get_by_path(path) {
369            slot.ctrl_tx
370                .send(LifecycleSignal::Stop(StopReason::Supervisor))
371                .ok();
372            true
373        } else {
374            false
375        }
376    }
377
378    /// Fill an actor's mailbox to capacity by sending dummy messages.
379    ///
380    /// Returns the number of messages that were successfully enqueued.
381    /// Subsequent sends to the same actor will return `SendError::MailboxFull`
382    /// until the actor drains messages. Used by `FaultInjector::fill_mailbox`.
383    pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
384        let addr = self.registry.get_by_path(path).map(|s| s.addr);
385        let Some(addr) = addr else { return 0 };
386        let src = AddrHash::synthetic(b"fault-fill-mailbox");
387        let mut count = 0;
388        loop {
389            let env = Envelope::new(src, addr, 0, 0);
390            match self.transport.try_deliver(env, MessagePayload::local(())) {
391                Ok(()) => count += 1,
392                Err(_) => break,
393            }
394        }
395        count
396    }
397
398    /// Dynamically spawn a user actor by sending a signal to the root /user supervisor.
399    pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
400        let user_path = ActorPath::parse("/user").unwrap();
401        if let Some(slot) = self.registry.get_by_path(&user_path) {
402            slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
403        }
404    }
405
406    /// Spawn a pool of N identical worker actors under the `/user` supervisor
407    /// and return a [`WorkerPool<M>`] routing handle.
408    ///
409    /// Each worker gets a [`StableAddr<M>`] backed by an [`AddrRefresher`] so
410    /// that the pool transparently routes to new incarnations after restarts.
411    ///
412    /// # Ordering
413    ///
414    /// Spawn requests are sent asynchronously to the `/user` supervisor.  The
415    /// returned pool's `StableAddr`s are initially `None`; they are populated
416    /// once the supervisor task processes each `SpawnChild` signal.  Yield at
417    /// least once (e.g. `tokio::task::yield_now().await`) before sending
418    /// messages through the pool.
419    ///
420    /// # Panics
421    ///
422    /// Panics if `config.size` is 0 (no workers requested).
423    pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
424    where
425        M: Message,
426        R: Clone + Send + Sync + 'static,
427        G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
428    {
429        let user_path = ActorPath::parse("/user").unwrap();
430        let handle = self.clone();
431        crate::worker_pool::spawn_worker_pool(
432            &user_path,
433            config,
434            factory,
435            self.source_addr,
436            &self.transport,
437            &self.transport_registry,
438            &self.responses,
439            &self.reactor,
440            move |spec| handle.spawn_user_actor(spec),
441        )
442    }
443}