Skip to main content

palladium_runtime/engine/
handle.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9    Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10    PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14    mailbox, InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15    QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16    TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21    register_spawn_peer_installer, remote_spawn_wait_timeout, validate_config,
22    BoundedClusterConfig, BoundedClusterHandle, BoundedTransportConfig, BoundedTransportHandle,
23    ClusterError, ControlPlaneClientConfig, ControlPlaneProtocol,
24};
25use crate::common::LifecycleSignal;
26use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
27use crate::reactor::{Reactor, TokioReactor};
28use crate::registry::ActorRegistry;
29use crate::responses::ResponseRegistry;
30
/// Handle for interacting with the engine from external (test) code.
///
/// The handle derives `Clone`; fields wrapped in `Arc` are shared between
/// clones rather than duplicated.
#[derive(Clone)]
pub struct EngineHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    /// In-process transport. Exposed through `transport()`; `fill_mailbox`
    /// delivers through it directly.
    pub(crate) transport: Arc<InProcessTransport>,
    /// Registry used to route sends (`get_local_sender`, `try_route`) and to
    /// which QUIC/TCP transports are added by the `attach_*` methods.
    pub(crate) transport_registry: Arc<TransportRegistry>,
    /// Type registry returned by `type_registry()` and handed to QUIC/TCP
    /// transports when they are bound.
    pub(crate) type_registry: TypeRegistry,
    /// Response registry passed to the ask-function factories.
    pub(crate) responses: Arc<ResponseRegistry>,
    /// Actor registry used for path lookups and introspection snapshots.
    pub(crate) registry: Arc<ActorRegistry<R>>,
    /// Timeout passed to the ask-function factories and used to derive the
    /// bounded-cluster control-plane wait timeout.
    pub(crate) ask_timeout: Duration,
    /// Reactor instance cloned into ask functions and attached transports.
    pub(crate) reactor: R,
    /// Network implementation cloned into attached QUIC/TCP transports.
    pub(crate) network: N,
    /// File-system implementation. Not read by any method of the `impl`
    /// block in this file (presumably why it is underscore-prefixed).
    pub(crate) _fs: F,
    /// Reference instant from which `snapshot()` computes `uptime_secs`.
    pub(crate) start_time: Instant,
    /// Source address stamped on every envelope sent through this handle.
    pub(crate) source_addr: AddrHash,
    /// Optional federated routing, consulted by `lookup_path` and
    /// `resolve_path` when the local registry has no entry for a path.
    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
    /// Cache of direct `MailboxSender`s for local actors, keyed by `AddrHash`.
    ///
    /// Populated on the first `send_to` to a local actor; avoids the DashMap
    /// lookup in `TransportRegistry::try_route` on subsequent sends.
    /// Evicted when a send returns `ActorStopped` (receiver dropped).
    /// Shared across handle clones via `Arc` so all clones benefit.
    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
    /// Receiving end of the response pump mailbox. Consumed by the first
    /// `remote_addr_for` ask, which spawns a long-lived pump task.
    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
}
61
62impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
63    fn advertise_existing_actors<T: palladium_transport::Transport>(
64        &self,
65        transport: &Arc<T>,
66    ) -> Result<(), TransportError> {
67        transport.register(self.source_addr, mailbox(1).0)?;
68        for info in self.registry.snapshot(&ActorQuery::default(), 0) {
69            if info.state != crate::introspection::ActorState::Running {
70                continue;
71            }
72            if let Some(slot) = self.registry.get_by_path(&info.path) {
73                transport.advertise_path(slot.addr, &info.path)?;
74            }
75        }
76        Ok(())
77    }
78
79    pub fn type_registry(&self) -> TypeRegistry {
80        self.type_registry.clone()
81    }
82
83    /// Send a message to an actor by its `AddrHash`.
84    ///
85    /// **Fast path**: after the first send to a local actor, the `MailboxSender`
86    /// is cached in `send_cache`, bypassing the `TransportRegistry` DashMap on
87    /// subsequent sends.  Stale entries are evicted on `ActorStopped`.
88    pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
89        let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
90        let payload = MessagePayload::local(msg);
91
92        // Fast path: use cached MailboxSender to avoid DashMap in TransportRegistry.
93        if let Some(sender) = self.send_cache.get(&target) {
94            return match sender.try_send(MailboxMessage { envelope, payload }) {
95                Ok(()) => Ok(()),
96                Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
97                Err(_) => {
98                    // Receiver dropped — evict stale cache entry.
99                    drop(sender); // release DashMap shard lock before remove
100                    self.send_cache.remove(&target);
101                    Err(SendError::ActorStopped)
102                }
103            };
104        }
105
106        // Cache miss: check if this is a local actor, populate cache, and send.
107        if let Some(sender) = self.transport_registry.get_local_sender(target) {
108            let result = sender.try_send(MailboxMessage { envelope, payload });
109            match result {
110                Ok(()) => {
111                    self.send_cache.insert(target, sender);
112                    return Ok(());
113                }
114                Err(TransportError::MailboxFull) => {
115                    // Actor is alive but full; cache for next send.
116                    self.send_cache.insert(target, sender);
117                    return Err(SendError::MailboxFull);
118                }
119                Err(_) => return Err(SendError::ActorStopped),
120            }
121        }
122
123        // Non-local destination (remote, QUIC, TCP, etc.): full routing.
124        self.transport_registry
125            .try_route(envelope, payload)
126            .map_err(|e| match e {
127                TransportError::MailboxFull => SendError::MailboxFull,
128                TransportError::ConnectionFailed => SendError::ConnectionFailed,
129                TransportError::Unroutable(_) => SendError::Unroutable,
130                TransportError::SerializationRequired
131                | TransportError::SerializationError(_)
132                | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
133                _ => SendError::ActorStopped,
134            })
135    }
136
137    /// Send up to `n` messages to one target using a producer-side batch fast path.
138    ///
139    /// For local destinations this uses `MailboxSender::try_send_batch_owned`,
140    /// reducing wake/schedule overhead vs repeated `send_to` calls.
141    ///
142    /// Returns the number of messages successfully enqueued. On local mailbox
143    /// backpressure this may be less than `n` without returning an error.
144    pub fn send_many<M: Message + Clone>(
145        &self,
146        target: AddrHash,
147        msg: M,
148        n: usize,
149    ) -> Result<usize, SendError> {
150        if n == 0 {
151            return Ok(0);
152        }
153
154        let make_batch = || {
155            let mut batch = Vec::with_capacity(n);
156            for _ in 0..n {
157                batch.push(MailboxMessage {
158                    envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
159                    payload: MessagePayload::local(msg.clone()),
160                });
161            }
162            batch
163        };
164
165        // Fast path: use cached MailboxSender.
166        if let Some(sender) = self.send_cache.get(&target) {
167            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
168            return match err {
169                None => Ok(sent),
170                Some(TransportError::MailboxFull) => Ok(sent),
171                _ => {
172                    drop(sender);
173                    self.send_cache.remove(&target);
174                    Err(SendError::ActorStopped)
175                }
176            };
177        }
178
179        // Cache miss: local actor lookup and batch send.
180        if let Some(sender) = self.transport_registry.get_local_sender(target) {
181            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
182            match err {
183                None => {
184                    self.send_cache.insert(target, sender);
185                    return Ok(sent);
186                }
187                Some(TransportError::MailboxFull) => {
188                    self.send_cache.insert(target, sender);
189                    return Ok(sent);
190                }
191                _ => return Err(SendError::ActorStopped),
192            }
193        }
194
195        // Non-local fallback: preserve send_to semantics.
196        let mut sent = 0usize;
197        for _ in 0..n {
198            match self.send_to::<M>(target, msg.clone()) {
199                Ok(()) => sent += 1,
200                Err(SendError::MailboxFull) => return Ok(sent),
201                Err(e) => return Err(e),
202            }
203        }
204        Ok(sent)
205    }
206
207    /// Create an `Addr<M>` that sends to `target` via this engine's transport.
208    pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
209    where
210        R: Send + Sync,
211    {
212        let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
213        let ask_fn = make_ask_fn::<M, R>(
214            target,
215            self.source_addr,
216            self.transport.clone(),
217            self.transport_registry.clone(),
218            self.responses.clone(),
219            self.ask_timeout,
220            self.reactor.clone(),
221        );
222        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
223    }
224
225    /// Create a remote `Addr<M>` that always serializes payloads before routing.
226    ///
227    /// This is intended for explicit remote sends; it rejects local destinations
228    /// to avoid accidental local delivery of serialized payloads.
229    pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
230        let registry = self.transport_registry.clone();
231        let source = self.source_addr;
232        let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
233            if registry.is_local(target) {
234                return Err(SendError::PolicyViolation);
235            }
236            let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
237            let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
238            registry
239                .try_route(envelope, MessagePayload::serialized(bytes))
240                .map_err(|e| match e {
241                    TransportError::MailboxFull => SendError::MailboxFull,
242                    TransportError::ConnectionFailed => SendError::ConnectionFailed,
243                    TransportError::Unroutable(_) => SendError::Unroutable,
244                    TransportError::SerializationRequired
245                    | TransportError::SerializationError(_)
246                    | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
247                    _ => SendError::ActorStopped,
248                })
249        });
250        let ask_fn: palladium_actor::AskFn<M> =
251            Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
252                Box::pin(async { Err(AskError::Unbound) })
253            });
254        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
255    }
256
257    /// Create a typed remote `Addr<M>` that supports both `send()` and `ask()`.
258    ///
259    /// Both request and response payloads are serialized for transport delivery.
260    /// Local destinations are rejected with `AskError::Send(PolicyViolation)`.
261    ///
262    /// Before calling, register the message type on both engines:
263    /// ```ignore
264    /// handle.type_registry().register_remote_ask::<MyMsg>();
265    /// ```
266    ///
267    /// The first `ask()` on the returned address lazily starts the engine's
268    /// response pump task (a lightweight background task that routes incoming
269    /// serialized response frames into the local response registry).
270    pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
271    where
272        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
273        R: Clone + Send + Sync,
274    {
275        let send_fn =
276            make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
277        let ask_fn = make_remote_ask_fn::<M, R>(
278            target,
279            self.source_addr,
280            self.transport_registry.clone(),
281            self.responses.clone(),
282            self.ask_timeout,
283            self.reactor.clone(),
284            Arc::clone(&self.pump_rx),
285        );
286        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
287    }
288
    /// Borrow the in-process transport held by this handle.
    pub fn transport(&self) -> &Arc<InProcessTransport> {
        &self.transport
    }
292
    /// Borrow the transport registry this handle routes messages through.
    pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
        &self.transport_registry
    }
296
297    pub async fn attach_quic_transport(
298        &self,
299        config: QuicTransportConfig,
300        peers: HashMap<EngineId, SocketAddr>,
301    ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
302    where
303        R: Clone,
304        N: Clone,
305    {
306        let transport = QuicTransport::bind(
307            config,
308            peers,
309            self.transport_registry.clone(),
310            self.type_registry.clone(),
311            self.reactor.clone(),
312            self.network.clone(),
313        )
314        .await?;
315        self.transport_registry.add_transport(transport.clone())?;
316        self.advertise_existing_actors(&transport)?;
317        Ok(transport)
318    }
319
320    pub async fn attach_tcp_transport(
321        &self,
322        config: TcpTransportConfig,
323        peers: HashMap<EngineId, SocketAddr>,
324    ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
325    where
326        R: Clone,
327        N: Clone,
328    {
329        let transport = TcpTransport::bind(
330            config,
331            peers,
332            self.transport_registry.clone(),
333            self.type_registry.clone(),
334            self.reactor.clone(),
335            self.network.clone(),
336        )
337        .await?;
338        self.transport_registry.add_transport(transport.clone())?;
339        self.advertise_existing_actors(&transport)?;
340        Ok(transport)
341    }
342
343    pub async fn attach_bounded_cluster(
344        &self,
345        config: BoundedClusterConfig,
346    ) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
347    where
348        R: Clone,
349        N: Clone,
350    {
351        validate_config(&config, &self.registry)?;
352
353        let local_engine_id = config.transport.engine_id().clone();
354        let peers = config.peers;
355        let declared_actors = config.declared_actors;
356        let roles = config.roles;
357
358        let control_plane = match &config.transport {
359            BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
360                protocol: ControlPlaneProtocol::Tcp,
361                tls: config.tls.clone(),
362                wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
363            },
364            BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
365                protocol: ControlPlaneProtocol::Quic,
366                tls: config.tls.clone(),
367                wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
368            },
369        };
370
371        let transport = match config.transport {
372            BoundedTransportConfig::Tcp(config) => {
373                let transport = self
374                    .attach_tcp_transport(config, HashMap::new())
375                    .await
376                    .map_err(ClusterError::Transport)?;
377                for peer in &peers {
378                    if let Some(server_name) = &peer.server_name {
379                        transport.add_peer_with_server_name(
380                            peer.engine_id.clone(),
381                            peer.addr,
382                            server_name.clone(),
383                        );
384                    } else {
385                        transport.add_peer(peer.engine_id.clone(), peer.addr);
386                    }
387                }
388                self.advertise_existing_actors(&transport)
389                    .map_err(ClusterError::Transport)?;
390                BoundedTransportHandle::Tcp(transport)
391            }
392            BoundedTransportConfig::Quic(config) => {
393                let transport = self
394                    .attach_quic_transport(config, HashMap::new())
395                    .await
396                    .map_err(ClusterError::Transport)?;
397                for peer in &peers {
398                    if let Some(server_name) = &peer.server_name {
399                        transport.add_peer_with_server_name(
400                            peer.engine_id.clone(),
401                            peer.addr,
402                            server_name.clone(),
403                        );
404                    } else {
405                        transport.add_peer(peer.engine_id.clone(), peer.addr);
406                    }
407                }
408                self.advertise_existing_actors(&transport)
409                    .map_err(ClusterError::Transport)?;
410                BoundedTransportHandle::Quic(transport)
411            }
412        };
413
414        let cluster_engine_id = local_engine_id.clone();
415        let mut cluster = BoundedClusterHandle::new(
416            self.clone(),
417            local_engine_id,
418            control_plane,
419            transport,
420            peers,
421            roles,
422            self.registry.clone(),
423        );
424        register_spawn_peer_installer(&cluster_engine_id, cluster.transport());
425        for declared in declared_actors {
426            cluster.declare_remote_actor(declared.owner, declared.path)?;
427        }
428        cluster.advertise_existing_local_routes()?;
429        Ok(cluster)
430    }
431
432    /// Create a typed remote `Addr<M>` from a canonical actor path.
433    ///
434    /// The cross-engine routing key is `AddrHash::new(path, 0)`, which is the
435    /// canonical distributed identity for a logical actor path.
436    pub fn remote_addr_for_path<M: RemoteMessage>(
437        &self,
438        path: &ActorPath,
439    ) -> palladium_actor::Addr<M>
440    where
441        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
442        R: Clone + Send + Sync,
443    {
444        self.remote_addr_for::<M>(AddrHash::new(path, 0))
445    }
446
447    /// Create a stable typed remote handle from a canonical actor path.
448    ///
449    /// This v1 API makes the logical remote identity stable by path. It does
450    /// not provide remote restart notifications or continuous liveness
451    /// tracking. During remote absence or re-advertisement windows, callers may
452    /// still observe the same transient failures as `remote_addr_for_path()`,
453    /// such as `Unroutable`, `ConnectionFailed`, or `Timeout`.
454    pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
455    where
456        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
457        R: Clone + Send + Sync,
458    {
459        StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
460    }
461
462    // ── Introspection (REQ-085) ───────────────────────────────────────────────
463
    /// List actors, optionally filtered by `query`.
    ///
    /// Thin wrapper around `ActorRegistry::snapshot`.
    // NOTE(review): the meaning of the literal `0` argument is not visible
    // from this file — confirm against `ActorRegistry::snapshot`.
    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
        self.registry.snapshot(&query, 0)
    }
468
    /// Get info for a single actor by path.
    ///
    /// Returns whatever `ActorRegistry::actor_info_by_path` yields for `path`.
    // NOTE(review): the meaning of the literal `0` argument is not visible
    // from this file — confirm against `ActorRegistry::actor_info_by_path`.
    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
        self.registry.actor_info_by_path(path, 0)
    }
473
474    /// System-level snapshot: all actors + uptime.
475    pub fn snapshot(&self) -> EngineSnapshot {
476        let actors = self.actor_list(ActorQuery::default());
477        let uptime_secs = self.start_time.elapsed().as_secs();
478        EngineSnapshot {
479            num_cores: 1,
480            uptime_secs,
481            actors,
482        }
483    }
484
485    /// Lookup an actor's address by its path.
486    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
487        self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
488            self.federated_routing
489                .as_ref()
490                .and_then(|routing| routing.resolve_optional(path))
491        })
492    }
493
494    /// Resolve an actor's address by path with routing errors.
495    pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
496        if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
497            return Ok(local);
498        }
499        if let Some(routing) = &self.federated_routing {
500            return routing.resolve(path);
501        }
502        Err(SendError::Unroutable)
503    }
504
505    /// Send a hard-stop signal to an actor by path.
506    ///
507    /// Returns `true` if the actor was found and the signal was sent. The
508    /// actor may still be running until the signal is processed by the
509    /// reactor. Used by `FaultInjector::crash_actor`.
510    pub fn stop_actor(&self, path: &ActorPath) -> bool {
511        if let Some(slot) = self.registry.get_by_path(path) {
512            slot.ctrl_tx
513                .send(LifecycleSignal::Stop(StopReason::Supervisor))
514                .ok();
515            true
516        } else {
517            false
518        }
519    }
520
521    /// Fill an actor's mailbox to capacity by sending dummy messages.
522    ///
523    /// Returns the number of messages that were successfully enqueued.
524    /// Subsequent sends to the same actor will return `SendError::MailboxFull`
525    /// until the actor drains messages. Used by `FaultInjector::fill_mailbox`.
526    pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
527        let addr = self.registry.get_by_path(path).map(|s| s.addr);
528        let Some(addr) = addr else { return 0 };
529        let src = AddrHash::synthetic(b"fault-fill-mailbox");
530        let mut count = 0;
531        loop {
532            let env = Envelope::new(src, addr, 0, 0);
533            match self.transport.try_deliver(env, MessagePayload::local(())) {
534                Ok(()) => count += 1,
535                Err(_) => break,
536            }
537        }
538        count
539    }
540
541    /// Dynamically spawn a user actor by sending a signal to the root /user supervisor.
542    pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
543        let user_path = ActorPath::parse("/user").unwrap();
544        if let Some(slot) = self.registry.get_by_path(&user_path) {
545            slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
546        }
547    }
548
549    /// Spawn a pool of N identical worker actors under the `/user` supervisor
550    /// and return a [`WorkerPool<M>`] routing handle.
551    ///
552    /// Each worker gets a [`StableAddr<M>`] backed by an [`AddrRefresher`] so
553    /// that the pool transparently routes to new incarnations after restarts.
554    ///
555    /// # Ordering
556    ///
557    /// Spawn requests are sent asynchronously to the `/user` supervisor.  The
558    /// returned pool's `StableAddr`s are initially `None`; they are populated
559    /// once the supervisor task processes each `SpawnChild` signal.  Yield at
560    /// least once (e.g. `tokio::task::yield_now().await`) before sending
561    /// messages through the pool.
562    ///
563    /// # Panics
564    ///
565    /// Panics if `config.size` is 0 (no workers requested).
566    pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
567    where
568        M: Message,
569        R: Clone + Send + Sync + 'static,
570        G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
571    {
572        let user_path = ActorPath::parse("/user").unwrap();
573        let handle = self.clone();
574        crate::worker_pool::spawn_worker_pool(
575            &user_path,
576            config,
577            factory,
578            self.source_addr,
579            &self.transport,
580            &self.transport_registry,
581            &self.responses,
582            &self.reactor,
583            move |spec| handle.spawn_user_actor(spec),
584        )
585    }
586}