Skip to main content

palladium_runtime/engine/
handle.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9    Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10    PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14    mailbox, InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15    QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16    TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21    register_spawn_peer_installer, remote_spawn_wait_timeout, validate_config,
22    BoundedClusterConfig, BoundedClusterHandle, BoundedClusterTopology, BoundedTransportConfig,
23    BoundedTransportHandle, ClusterError, ControlPlaneClientConfig, ControlPlaneProtocol,
24};
25use crate::common::LifecycleSignal;
26use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
27use crate::reactor::{Reactor, TokioReactor};
28use crate::registry::ActorRegistry;
29use crate::responses::ResponseRegistry;
30
31/// Handle for interacting with the engine from external (test) code.
32#[derive(Clone)]
33pub struct EngineHandle<
34    R: Reactor = TokioReactor,
35    N: Network = TokioNetwork,
36    F: FileSystem = TokioFileSystem,
37> {
38    pub(crate) transport: Arc<InProcessTransport>,
39    pub(crate) transport_registry: Arc<TransportRegistry>,
40    pub(crate) type_registry: TypeRegistry,
41    pub(crate) responses: Arc<ResponseRegistry>,
42    pub(crate) registry: Arc<ActorRegistry<R>>,
43    pub(crate) ask_timeout: Duration,
44    pub(crate) reactor: R,
45    pub(crate) network: N,
46    pub(crate) _fs: F,
47    pub(crate) start_time: Instant,
48    pub(crate) source_addr: AddrHash,
49    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
50    /// Cache of direct `MailboxSender`s for local actors, keyed by `AddrHash`.
51    ///
52    /// Populated on the first `send_to` to a local actor; avoids the DashMap
53    /// lookup in `TransportRegistry::try_route` on subsequent sends.
54    /// Evicted when a send returns `ActorStopped` (receiver dropped).
55    /// Shared across handle clones via `Arc` so all clones benefit.
56    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
57    /// Receiving end of the response pump mailbox.  Consumed by the first
58    /// `remote_addr_for` ask, which spawns a long-lived pump task.
59    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
60}
61
62impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
63    fn advertise_existing_actors<T: palladium_transport::Transport>(
64        &self,
65        transport: &Arc<T>,
66    ) -> Result<(), TransportError> {
67        transport.register(self.source_addr, mailbox(1).0)?;
68        for info in self.registry.snapshot(&ActorQuery::default(), 0) {
69            if info.state != crate::introspection::ActorState::Running {
70                continue;
71            }
72            if let Some(slot) = self.registry.get_by_path(&info.path) {
73                transport.advertise_path(slot.addr, &info.path)?;
74            }
75        }
76        Ok(())
77    }
78
79    pub fn type_registry(&self) -> TypeRegistry {
80        self.type_registry.clone()
81    }
82
83    /// Send a message to an actor by its `AddrHash`.
84    ///
85    /// **Fast path**: after the first send to a local actor, the `MailboxSender`
86    /// is cached in `send_cache`, bypassing the `TransportRegistry` DashMap on
87    /// subsequent sends.  Stale entries are evicted on `ActorStopped`.
88    pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
89        let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
90        let payload = MessagePayload::local(msg);
91
92        // Fast path: use cached MailboxSender to avoid DashMap in TransportRegistry.
93        if let Some(sender) = self.send_cache.get(&target) {
94            return match sender.try_send(MailboxMessage { envelope, payload }) {
95                Ok(()) => Ok(()),
96                Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
97                Err(_) => {
98                    // Receiver dropped — evict stale cache entry.
99                    drop(sender); // release DashMap shard lock before remove
100                    self.send_cache.remove(&target);
101                    Err(SendError::ActorStopped)
102                }
103            };
104        }
105
106        // Cache miss: check if this is a local actor, populate cache, and send.
107        if let Some(sender) = self.transport_registry.get_local_sender(target) {
108            let result = sender.try_send(MailboxMessage { envelope, payload });
109            match result {
110                Ok(()) => {
111                    self.send_cache.insert(target, sender);
112                    return Ok(());
113                }
114                Err(TransportError::MailboxFull) => {
115                    // Actor is alive but full; cache for next send.
116                    self.send_cache.insert(target, sender);
117                    return Err(SendError::MailboxFull);
118                }
119                Err(_) => return Err(SendError::ActorStopped),
120            }
121        }
122
123        // Non-local destination (remote, QUIC, TCP, etc.): full routing.
124        self.transport_registry
125            .try_route(envelope, payload)
126            .map_err(|e| match e {
127                TransportError::MailboxFull => SendError::MailboxFull,
128                TransportError::ConnectionFailed => SendError::ConnectionFailed,
129                TransportError::Unroutable(_) => SendError::Unroutable,
130                TransportError::SerializationRequired
131                | TransportError::SerializationError(_)
132                | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
133                _ => SendError::ActorStopped,
134            })
135    }
136
137    /// Send up to `n` messages to one target using a producer-side batch fast path.
138    ///
139    /// For local destinations this uses `MailboxSender::try_send_batch_owned`,
140    /// reducing wake/schedule overhead vs repeated `send_to` calls.
141    ///
142    /// Returns the number of messages successfully enqueued. On local mailbox
143    /// backpressure this may be less than `n` without returning an error.
144    pub fn send_many<M: Message + Clone>(
145        &self,
146        target: AddrHash,
147        msg: M,
148        n: usize,
149    ) -> Result<usize, SendError> {
150        if n == 0 {
151            return Ok(0);
152        }
153
154        let make_batch = || {
155            let mut batch = Vec::with_capacity(n);
156            for _ in 0..n {
157                batch.push(MailboxMessage {
158                    envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
159                    payload: MessagePayload::local(msg.clone()),
160                });
161            }
162            batch
163        };
164
165        // Fast path: use cached MailboxSender.
166        if let Some(sender) = self.send_cache.get(&target) {
167            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
168            return match err {
169                None => Ok(sent),
170                Some(TransportError::MailboxFull) => Ok(sent),
171                _ => {
172                    drop(sender);
173                    self.send_cache.remove(&target);
174                    Err(SendError::ActorStopped)
175                }
176            };
177        }
178
179        // Cache miss: local actor lookup and batch send.
180        if let Some(sender) = self.transport_registry.get_local_sender(target) {
181            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
182            match err {
183                None => {
184                    self.send_cache.insert(target, sender);
185                    return Ok(sent);
186                }
187                Some(TransportError::MailboxFull) => {
188                    self.send_cache.insert(target, sender);
189                    return Ok(sent);
190                }
191                _ => return Err(SendError::ActorStopped),
192            }
193        }
194
195        // Non-local fallback: preserve send_to semantics.
196        let mut sent = 0usize;
197        for _ in 0..n {
198            match self.send_to::<M>(target, msg.clone()) {
199                Ok(()) => sent += 1,
200                Err(SendError::MailboxFull) => return Ok(sent),
201                Err(e) => return Err(e),
202            }
203        }
204        Ok(sent)
205    }
206
207    /// Create an `Addr<M>` that sends to `target` via this engine's transport.
208    pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
209    where
210        R: Send + Sync,
211    {
212        let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
213        let ask_fn = make_ask_fn::<M, R>(
214            target,
215            self.source_addr,
216            self.transport.clone(),
217            self.transport_registry.clone(),
218            self.responses.clone(),
219            self.ask_timeout,
220            self.reactor.clone(),
221        );
222        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
223    }
224
225    /// Create a remote `Addr<M>` that always serializes payloads before routing.
226    ///
227    /// This is intended for explicit remote sends; it rejects local destinations
228    /// to avoid accidental local delivery of serialized payloads.
229    pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
230        let registry = self.transport_registry.clone();
231        let source = self.source_addr;
232        let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
233            if registry.is_local(target) {
234                return Err(SendError::PolicyViolation);
235            }
236            let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
237            let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
238            registry
239                .try_route(envelope, MessagePayload::serialized(bytes))
240                .map_err(|e| match e {
241                    TransportError::MailboxFull => SendError::MailboxFull,
242                    TransportError::ConnectionFailed => SendError::ConnectionFailed,
243                    TransportError::Unroutable(_) => SendError::Unroutable,
244                    TransportError::SerializationRequired
245                    | TransportError::SerializationError(_)
246                    | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
247                    _ => SendError::ActorStopped,
248                })
249        });
250        let ask_fn: palladium_actor::AskFn<M> =
251            Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
252                Box::pin(async { Err(AskError::Unbound) })
253            });
254        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
255    }
256
257    /// Create a typed remote `Addr<M>` that supports both `send()` and `ask()`.
258    ///
259    /// Both request and response payloads are serialized for transport delivery.
260    /// Local destinations are rejected with `AskError::Send(PolicyViolation)`.
261    ///
262    /// Before calling, register the message type on both engines:
263    /// ```ignore
264    /// handle.type_registry().register_remote_ask::<MyMsg>();
265    /// ```
266    ///
267    /// The first `ask()` on the returned address lazily starts the engine's
268    /// response pump task (a lightweight background task that routes incoming
269    /// serialized response frames into the local response registry).
270    pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
271    where
272        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
273        R: Clone + Send + Sync,
274    {
275        let send_fn =
276            make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
277        let ask_fn = make_remote_ask_fn::<M, R>(
278            target,
279            self.source_addr,
280            self.transport_registry.clone(),
281            self.responses.clone(),
282            self.ask_timeout,
283            self.reactor.clone(),
284            Arc::clone(&self.pump_rx),
285        );
286        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
287    }
288
289    pub fn transport(&self) -> &Arc<InProcessTransport> {
290        &self.transport
291    }
292
293    pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
294        &self.transport_registry
295    }
296
297    pub async fn attach_quic_transport(
298        &self,
299        config: QuicTransportConfig,
300        peers: HashMap<EngineId, SocketAddr>,
301    ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
302    where
303        R: Clone,
304        N: Clone,
305    {
306        let transport = QuicTransport::bind(
307            config,
308            peers,
309            self.transport_registry.clone(),
310            self.type_registry.clone(),
311            self.reactor.clone(),
312            self.network.clone(),
313        )
314        .await?;
315        self.transport_registry.add_transport(transport.clone())?;
316        self.advertise_existing_actors(&transport)?;
317        Ok(transport)
318    }
319
320    pub async fn attach_tcp_transport(
321        &self,
322        config: TcpTransportConfig,
323        peers: HashMap<EngineId, SocketAddr>,
324    ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
325    where
326        R: Clone,
327        N: Clone,
328    {
329        let transport = TcpTransport::bind(
330            config,
331            peers,
332            self.transport_registry.clone(),
333            self.type_registry.clone(),
334            self.reactor.clone(),
335            self.network.clone(),
336        )
337        .await?;
338        self.transport_registry.add_transport(transport.clone())?;
339        self.advertise_existing_actors(&transport)?;
340        Ok(transport)
341    }
342
343    pub async fn attach_bounded_cluster(
344        &self,
345        config: BoundedClusterConfig,
346    ) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
347    where
348        R: Clone,
349        N: Clone,
350    {
351        validate_config(&config, &self.registry)?;
352
353        let local_engine_id = config.transport.engine_id().clone();
354        let peers = config.peers;
355        let declared_actors = config.declared_actors;
356        let roles = config.roles;
357        let placement_policies = config.placement_policies;
358
359        let control_plane = match &config.transport {
360            BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
361                protocol: ControlPlaneProtocol::Tcp,
362                tls: config.tls.clone(),
363                wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
364            },
365            BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
366                protocol: ControlPlaneProtocol::Quic,
367                tls: config.tls.clone(),
368                wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
369            },
370        };
371
372        let transport = match config.transport {
373            BoundedTransportConfig::Tcp(config) => {
374                let transport = self
375                    .attach_tcp_transport(config, HashMap::new())
376                    .await
377                    .map_err(ClusterError::Transport)?;
378                for peer in &peers {
379                    if let Some(server_name) = &peer.server_name {
380                        transport.add_peer_with_server_name(
381                            peer.engine_id.clone(),
382                            peer.addr,
383                            server_name.clone(),
384                        );
385                    } else {
386                        transport.add_peer(peer.engine_id.clone(), peer.addr);
387                    }
388                }
389                self.advertise_existing_actors(&transport)
390                    .map_err(ClusterError::Transport)?;
391                BoundedTransportHandle::Tcp(transport)
392            }
393            BoundedTransportConfig::Quic(config) => {
394                let transport = self
395                    .attach_quic_transport(config, HashMap::new())
396                    .await
397                    .map_err(ClusterError::Transport)?;
398                for peer in &peers {
399                    if let Some(server_name) = &peer.server_name {
400                        transport.add_peer_with_server_name(
401                            peer.engine_id.clone(),
402                            peer.addr,
403                            server_name.clone(),
404                        );
405                    } else {
406                        transport.add_peer(peer.engine_id.clone(), peer.addr);
407                    }
408                }
409                self.advertise_existing_actors(&transport)
410                    .map_err(ClusterError::Transport)?;
411                BoundedTransportHandle::Quic(transport)
412            }
413        };
414
415        let cluster_engine_id = local_engine_id.clone();
416        let mut cluster = BoundedClusterHandle::new(
417            self.clone(),
418            local_engine_id,
419            control_plane,
420            transport,
421            BoundedClusterTopology {
422                peers,
423                roles,
424                placement_policies,
425            },
426            self.registry.clone(),
427        );
428        register_spawn_peer_installer(&cluster_engine_id, cluster.transport());
429        for declared in declared_actors {
430            cluster.declare_remote_actor(declared.owner, declared.path)?;
431        }
432        cluster.advertise_existing_local_routes()?;
433        Ok(cluster)
434    }
435
436    /// Create a typed remote `Addr<M>` from a canonical actor path.
437    ///
438    /// The cross-engine routing key is `AddrHash::new(path, 0)`, which is the
439    /// canonical distributed identity for a logical actor path.
440    pub fn remote_addr_for_path<M: RemoteMessage>(
441        &self,
442        path: &ActorPath,
443    ) -> palladium_actor::Addr<M>
444    where
445        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
446        R: Clone + Send + Sync,
447    {
448        self.remote_addr_for::<M>(AddrHash::new(path, 0))
449    }
450
451    /// Create a stable typed remote handle from a canonical actor path.
452    ///
453    /// This v1 API makes the logical remote identity stable by path. It does
454    /// not provide remote restart notifications or continuous liveness
455    /// tracking. During remote absence or re-advertisement windows, callers may
456    /// still observe the same transient failures as `remote_addr_for_path()`,
457    /// such as `Unroutable`, `ConnectionFailed`, or `Timeout`.
458    pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
459    where
460        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
461        R: Clone + Send + Sync,
462    {
463        StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
464    }
465
466    // ── Introspection (REQ-085) ───────────────────────────────────────────────
467
468    /// List actors, optionally filtered by `query`.
469    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
470        self.registry.snapshot(&query, 0)
471    }
472
473    /// Get info for a single actor by path.
474    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
475        self.registry.actor_info_by_path(path, 0)
476    }
477
478    /// System-level snapshot: all actors + uptime.
479    pub fn snapshot(&self) -> EngineSnapshot {
480        let actors = self.actor_list(ActorQuery::default());
481        let uptime_secs = self.start_time.elapsed().as_secs();
482        EngineSnapshot {
483            num_cores: 1,
484            uptime_secs,
485            actors,
486        }
487    }
488
489    /// Lookup an actor's address by its path.
490    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
491        self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
492            self.federated_routing
493                .as_ref()
494                .and_then(|routing| routing.resolve_optional(path))
495        })
496    }
497
498    /// Resolve an actor's address by path with routing errors.
499    pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
500        if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
501            return Ok(local);
502        }
503        if let Some(routing) = &self.federated_routing {
504            return routing.resolve(path);
505        }
506        Err(SendError::Unroutable)
507    }
508
509    /// Send a hard-stop signal to an actor by path.
510    ///
511    /// Returns `true` if the actor was found and the signal was sent. The
512    /// actor may still be running until the signal is processed by the
513    /// reactor. Used by `FaultInjector::crash_actor`.
514    pub fn stop_actor(&self, path: &ActorPath) -> bool {
515        if let Some(slot) = self.registry.get_by_path(path) {
516            slot.ctrl_tx
517                .send(LifecycleSignal::Stop(StopReason::Supervisor))
518                .ok();
519            true
520        } else {
521            false
522        }
523    }
524
525    /// Fill an actor's mailbox to capacity by sending dummy messages.
526    ///
527    /// Returns the number of messages that were successfully enqueued.
528    /// Subsequent sends to the same actor will return `SendError::MailboxFull`
529    /// until the actor drains messages. Used by `FaultInjector::fill_mailbox`.
530    pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
531        let addr = self.registry.get_by_path(path).map(|s| s.addr);
532        let Some(addr) = addr else { return 0 };
533        let src = AddrHash::synthetic(b"fault-fill-mailbox");
534        let mut count = 0;
535        loop {
536            let env = Envelope::new(src, addr, 0, 0);
537            match self.transport.try_deliver(env, MessagePayload::local(())) {
538                Ok(()) => count += 1,
539                Err(_) => break,
540            }
541        }
542        count
543    }
544
545    /// Dynamically spawn a user actor by sending a signal to the root /user supervisor.
546    pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
547        let user_path = ActorPath::parse("/user").unwrap();
548        if let Some(slot) = self.registry.get_by_path(&user_path) {
549            slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
550        }
551    }
552
553    /// Spawn a pool of N identical worker actors under the `/user` supervisor
554    /// and return a [`WorkerPool<M>`] routing handle.
555    ///
556    /// Each worker gets a [`StableAddr<M>`] backed by an [`AddrRefresher`] so
557    /// that the pool transparently routes to new incarnations after restarts.
558    ///
559    /// # Ordering
560    ///
561    /// Spawn requests are sent asynchronously to the `/user` supervisor.  The
562    /// returned pool's `StableAddr`s are initially `None`; they are populated
563    /// once the supervisor task processes each `SpawnChild` signal.  Yield at
564    /// least once (e.g. `tokio::task::yield_now().await`) before sending
565    /// messages through the pool.
566    ///
567    /// # Panics
568    ///
569    /// Panics if `config.size` is 0 (no workers requested).
570    pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
571    where
572        M: Message,
573        R: Clone + Send + Sync + 'static,
574        G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
575    {
576        let user_path = ActorPath::parse("/user").unwrap();
577        let handle = self.clone();
578        crate::worker_pool::spawn_worker_pool(
579            &user_path,
580            config,
581            factory,
582            self.source_addr,
583            &self.transport,
584            &self.transport_registry,
585            &self.responses,
586            &self.reactor,
587            move |spec| handle.spawn_user_actor(spec),
588        )
589    }
590}