Skip to main content

palladium_runtime/engine/
handle.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9    Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10    PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14    InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15    QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16    TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21    remote_spawn_wait_timeout, validate_config, BoundedClusterConfig, BoundedClusterHandle,
22    BoundedTransportConfig, BoundedTransportHandle, ClusterError, ControlPlaneClientConfig,
23    ControlPlaneProtocol,
24};
25use crate::common::LifecycleSignal;
26use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
27use crate::reactor::{Reactor, TokioReactor};
28use crate::registry::ActorRegistry;
29use crate::responses::ResponseRegistry;
30
31/// Handle for interacting with the engine from external (test) code.
32#[derive(Clone)]
33pub struct EngineHandle<
34    R: Reactor = TokioReactor,
35    N: Network = TokioNetwork,
36    F: FileSystem = TokioFileSystem,
37> {
38    pub(crate) transport: Arc<InProcessTransport>,
39    pub(crate) transport_registry: Arc<TransportRegistry>,
40    pub(crate) type_registry: TypeRegistry,
41    pub(crate) responses: Arc<ResponseRegistry>,
42    pub(crate) registry: Arc<ActorRegistry<R>>,
43    pub(crate) ask_timeout: Duration,
44    pub(crate) reactor: R,
45    pub(crate) network: N,
46    pub(crate) _fs: F,
47    pub(crate) start_time: Instant,
48    pub(crate) source_addr: AddrHash,
49    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
50    /// Cache of direct `MailboxSender`s for local actors, keyed by `AddrHash`.
51    ///
52    /// Populated on the first `send_to` to a local actor; avoids the DashMap
53    /// lookup in `TransportRegistry::try_route` on subsequent sends.
54    /// Evicted when a send returns `ActorStopped` (receiver dropped).
55    /// Shared across handle clones via `Arc` so all clones benefit.
56    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
57    /// Receiving end of the response pump mailbox.  Consumed by the first
58    /// `remote_addr_for` ask, which spawns a long-lived pump task.
59    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
60}
61
62impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
63    fn advertise_existing_actors<T: palladium_transport::Transport>(
64        &self,
65        transport: &Arc<T>,
66    ) -> Result<(), TransportError> {
67        for info in self.registry.snapshot(&ActorQuery::default(), 0) {
68            if info.state != crate::introspection::ActorState::Running {
69                continue;
70            }
71            if let Some(slot) = self.registry.get_by_path(&info.path) {
72                transport.advertise_path(slot.addr, &info.path)?;
73            }
74        }
75        Ok(())
76    }
77
78    pub fn type_registry(&self) -> TypeRegistry {
79        self.type_registry.clone()
80    }
81
82    /// Send a message to an actor by its `AddrHash`.
83    ///
84    /// **Fast path**: after the first send to a local actor, the `MailboxSender`
85    /// is cached in `send_cache`, bypassing the `TransportRegistry` DashMap on
86    /// subsequent sends.  Stale entries are evicted on `ActorStopped`.
87    pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
88        let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
89        let payload = MessagePayload::local(msg);
90
91        // Fast path: use cached MailboxSender to avoid DashMap in TransportRegistry.
92        if let Some(sender) = self.send_cache.get(&target) {
93            return match sender.try_send(MailboxMessage { envelope, payload }) {
94                Ok(()) => Ok(()),
95                Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
96                Err(_) => {
97                    // Receiver dropped — evict stale cache entry.
98                    drop(sender); // release DashMap shard lock before remove
99                    self.send_cache.remove(&target);
100                    Err(SendError::ActorStopped)
101                }
102            };
103        }
104
105        // Cache miss: check if this is a local actor, populate cache, and send.
106        if let Some(sender) = self.transport_registry.get_local_sender(target) {
107            let result = sender.try_send(MailboxMessage { envelope, payload });
108            match result {
109                Ok(()) => {
110                    self.send_cache.insert(target, sender);
111                    return Ok(());
112                }
113                Err(TransportError::MailboxFull) => {
114                    // Actor is alive but full; cache for next send.
115                    self.send_cache.insert(target, sender);
116                    return Err(SendError::MailboxFull);
117                }
118                Err(_) => return Err(SendError::ActorStopped),
119            }
120        }
121
122        // Non-local destination (remote, QUIC, TCP, etc.): full routing.
123        self.transport_registry
124            .try_route(envelope, payload)
125            .map_err(|e| match e {
126                TransportError::MailboxFull => SendError::MailboxFull,
127                TransportError::ConnectionFailed => SendError::ConnectionFailed,
128                TransportError::Unroutable(_) => SendError::Unroutable,
129                TransportError::SerializationRequired
130                | TransportError::SerializationError(_)
131                | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
132                _ => SendError::ActorStopped,
133            })
134    }
135
136    /// Send up to `n` messages to one target using a producer-side batch fast path.
137    ///
138    /// For local destinations this uses `MailboxSender::try_send_batch_owned`,
139    /// reducing wake/schedule overhead vs repeated `send_to` calls.
140    ///
141    /// Returns the number of messages successfully enqueued. On local mailbox
142    /// backpressure this may be less than `n` without returning an error.
143    pub fn send_many<M: Message + Clone>(
144        &self,
145        target: AddrHash,
146        msg: M,
147        n: usize,
148    ) -> Result<usize, SendError> {
149        if n == 0 {
150            return Ok(0);
151        }
152
153        let make_batch = || {
154            let mut batch = Vec::with_capacity(n);
155            for _ in 0..n {
156                batch.push(MailboxMessage {
157                    envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
158                    payload: MessagePayload::local(msg.clone()),
159                });
160            }
161            batch
162        };
163
164        // Fast path: use cached MailboxSender.
165        if let Some(sender) = self.send_cache.get(&target) {
166            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
167            return match err {
168                None => Ok(sent),
169                Some(TransportError::MailboxFull) => Ok(sent),
170                _ => {
171                    drop(sender);
172                    self.send_cache.remove(&target);
173                    Err(SendError::ActorStopped)
174                }
175            };
176        }
177
178        // Cache miss: local actor lookup and batch send.
179        if let Some(sender) = self.transport_registry.get_local_sender(target) {
180            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
181            match err {
182                None => {
183                    self.send_cache.insert(target, sender);
184                    return Ok(sent);
185                }
186                Some(TransportError::MailboxFull) => {
187                    self.send_cache.insert(target, sender);
188                    return Ok(sent);
189                }
190                _ => return Err(SendError::ActorStopped),
191            }
192        }
193
194        // Non-local fallback: preserve send_to semantics.
195        let mut sent = 0usize;
196        for _ in 0..n {
197            match self.send_to::<M>(target, msg.clone()) {
198                Ok(()) => sent += 1,
199                Err(SendError::MailboxFull) => return Ok(sent),
200                Err(e) => return Err(e),
201            }
202        }
203        Ok(sent)
204    }
205
206    /// Create an `Addr<M>` that sends to `target` via this engine's transport.
207    pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
208    where
209        R: Send + Sync,
210    {
211        let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
212        let ask_fn = make_ask_fn::<M, R>(
213            target,
214            self.source_addr,
215            self.transport.clone(),
216            self.transport_registry.clone(),
217            self.responses.clone(),
218            self.ask_timeout,
219            self.reactor.clone(),
220        );
221        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
222    }
223
224    /// Create a remote `Addr<M>` that always serializes payloads before routing.
225    ///
226    /// This is intended for explicit remote sends; it rejects local destinations
227    /// to avoid accidental local delivery of serialized payloads.
228    pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
229        let registry = self.transport_registry.clone();
230        let source = self.source_addr;
231        let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
232            if registry.is_local(target) {
233                return Err(SendError::PolicyViolation);
234            }
235            let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
236            let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
237            registry
238                .try_route(envelope, MessagePayload::serialized(bytes))
239                .map_err(|e| match e {
240                    TransportError::MailboxFull => SendError::MailboxFull,
241                    TransportError::ConnectionFailed => SendError::ConnectionFailed,
242                    TransportError::Unroutable(_) => SendError::Unroutable,
243                    TransportError::SerializationRequired
244                    | TransportError::SerializationError(_)
245                    | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
246                    _ => SendError::ActorStopped,
247                })
248        });
249        let ask_fn: palladium_actor::AskFn<M> =
250            Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
251                Box::pin(async { Err(AskError::Unbound) })
252            });
253        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
254    }
255
256    /// Create a typed remote `Addr<M>` that supports both `send()` and `ask()`.
257    ///
258    /// Both request and response payloads are serialized for transport delivery.
259    /// Local destinations are rejected with `AskError::Send(PolicyViolation)`.
260    ///
261    /// Before calling, register the message type on both engines:
262    /// ```ignore
263    /// handle.type_registry().register_remote_ask::<MyMsg>();
264    /// ```
265    ///
266    /// The first `ask()` on the returned address lazily starts the engine's
267    /// response pump task (a lightweight background task that routes incoming
268    /// serialized response frames into the local response registry).
269    pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
270    where
271        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
272        R: Clone + Send + Sync,
273    {
274        let send_fn =
275            make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
276        let ask_fn = make_remote_ask_fn::<M, R>(
277            target,
278            self.source_addr,
279            self.transport_registry.clone(),
280            self.responses.clone(),
281            self.ask_timeout,
282            self.reactor.clone(),
283            Arc::clone(&self.pump_rx),
284        );
285        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
286    }
287
288    pub fn transport(&self) -> &Arc<InProcessTransport> {
289        &self.transport
290    }
291
292    pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
293        &self.transport_registry
294    }
295
296    pub async fn attach_quic_transport(
297        &self,
298        config: QuicTransportConfig,
299        peers: HashMap<EngineId, SocketAddr>,
300    ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
301    where
302        R: Clone,
303        N: Clone,
304    {
305        let transport = QuicTransport::bind(
306            config,
307            peers,
308            self.transport_registry.clone(),
309            self.type_registry.clone(),
310            self.reactor.clone(),
311            self.network.clone(),
312        )
313        .await?;
314        self.transport_registry.add_transport(transport.clone())?;
315        self.advertise_existing_actors(&transport)?;
316        Ok(transport)
317    }
318
319    pub async fn attach_tcp_transport(
320        &self,
321        config: TcpTransportConfig,
322        peers: HashMap<EngineId, SocketAddr>,
323    ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
324    where
325        R: Clone,
326        N: Clone,
327    {
328        let transport = TcpTransport::bind(
329            config,
330            peers,
331            self.transport_registry.clone(),
332            self.type_registry.clone(),
333            self.reactor.clone(),
334            self.network.clone(),
335        )
336        .await?;
337        self.transport_registry.add_transport(transport.clone())?;
338        self.advertise_existing_actors(&transport)?;
339        Ok(transport)
340    }
341
342    pub async fn attach_bounded_cluster(
343        &self,
344        config: BoundedClusterConfig,
345    ) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
346    where
347        R: Clone,
348        N: Clone,
349    {
350        validate_config(&config, &self.registry)?;
351
352        let local_engine_id = config.transport.engine_id().clone();
353        let peers = config.peers;
354        let declared_actors = config.declared_actors;
355        let roles = config.roles;
356
357        let control_plane = match &config.transport {
358            BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
359                protocol: ControlPlaneProtocol::Tcp,
360                tls: config.tls.clone(),
361                wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
362            },
363            BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
364                protocol: ControlPlaneProtocol::Quic,
365                tls: config.tls.clone(),
366                wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
367            },
368        };
369
370        let transport = match config.transport {
371            BoundedTransportConfig::Tcp(config) => {
372                let transport = self
373                    .attach_tcp_transport(config, HashMap::new())
374                    .await
375                    .map_err(ClusterError::Transport)?;
376                for peer in &peers {
377                    if let Some(server_name) = &peer.server_name {
378                        transport.add_peer_with_server_name(
379                            peer.engine_id.clone(),
380                            peer.addr,
381                            server_name.clone(),
382                        );
383                    } else {
384                        transport.add_peer(peer.engine_id.clone(), peer.addr);
385                    }
386                }
387                self.advertise_existing_actors(&transport)
388                    .map_err(ClusterError::Transport)?;
389                BoundedTransportHandle::Tcp(transport)
390            }
391            BoundedTransportConfig::Quic(config) => {
392                let transport = self
393                    .attach_quic_transport(config, HashMap::new())
394                    .await
395                    .map_err(ClusterError::Transport)?;
396                for peer in &peers {
397                    if let Some(server_name) = &peer.server_name {
398                        transport.add_peer_with_server_name(
399                            peer.engine_id.clone(),
400                            peer.addr,
401                            server_name.clone(),
402                        );
403                    } else {
404                        transport.add_peer(peer.engine_id.clone(), peer.addr);
405                    }
406                }
407                self.advertise_existing_actors(&transport)
408                    .map_err(ClusterError::Transport)?;
409                BoundedTransportHandle::Quic(transport)
410            }
411        };
412
413        let mut cluster = BoundedClusterHandle::new(
414            self.clone(),
415            local_engine_id,
416            control_plane,
417            transport,
418            peers,
419            roles,
420            self.registry.clone(),
421        );
422        for declared in declared_actors {
423            cluster.declare_remote_actor(declared.owner, declared.path)?;
424        }
425        Ok(cluster)
426    }
427
428    /// Create a typed remote `Addr<M>` from a canonical actor path.
429    ///
430    /// The cross-engine routing key is `AddrHash::new(path, 0)`, which is the
431    /// canonical distributed identity for a logical actor path.
432    pub fn remote_addr_for_path<M: RemoteMessage>(
433        &self,
434        path: &ActorPath,
435    ) -> palladium_actor::Addr<M>
436    where
437        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
438        R: Clone + Send + Sync,
439    {
440        self.remote_addr_for::<M>(AddrHash::new(path, 0))
441    }
442
443    /// Create a stable typed remote handle from a canonical actor path.
444    ///
445    /// This v1 API makes the logical remote identity stable by path. It does
446    /// not provide remote restart notifications or continuous liveness
447    /// tracking. During remote absence or re-advertisement windows, callers may
448    /// still observe the same transient failures as `remote_addr_for_path()`,
449    /// such as `Unroutable`, `ConnectionFailed`, or `Timeout`.
450    pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
451    where
452        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
453        R: Clone + Send + Sync,
454    {
455        StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
456    }
457
458    // ── Introspection (REQ-085) ───────────────────────────────────────────────
459
460    /// List actors, optionally filtered by `query`.
461    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
462        self.registry.snapshot(&query, 0)
463    }
464
465    /// Get info for a single actor by path.
466    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
467        self.registry.actor_info_by_path(path, 0)
468    }
469
470    /// System-level snapshot: all actors + uptime.
471    pub fn snapshot(&self) -> EngineSnapshot {
472        let actors = self.actor_list(ActorQuery::default());
473        let uptime_secs = self.start_time.elapsed().as_secs();
474        EngineSnapshot {
475            num_cores: 1,
476            uptime_secs,
477            actors,
478        }
479    }
480
481    /// Lookup an actor's address by its path.
482    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
483        self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
484            self.federated_routing
485                .as_ref()
486                .and_then(|routing| routing.resolve_optional(path))
487        })
488    }
489
490    /// Resolve an actor's address by path with routing errors.
491    pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
492        if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
493            return Ok(local);
494        }
495        if let Some(routing) = &self.federated_routing {
496            return routing.resolve(path);
497        }
498        Err(SendError::Unroutable)
499    }
500
501    /// Send a hard-stop signal to an actor by path.
502    ///
503    /// Returns `true` if the actor was found and the signal was sent. The
504    /// actor may still be running until the signal is processed by the
505    /// reactor. Used by `FaultInjector::crash_actor`.
506    pub fn stop_actor(&self, path: &ActorPath) -> bool {
507        if let Some(slot) = self.registry.get_by_path(path) {
508            slot.ctrl_tx
509                .send(LifecycleSignal::Stop(StopReason::Supervisor))
510                .ok();
511            true
512        } else {
513            false
514        }
515    }
516
517    /// Fill an actor's mailbox to capacity by sending dummy messages.
518    ///
519    /// Returns the number of messages that were successfully enqueued.
520    /// Subsequent sends to the same actor will return `SendError::MailboxFull`
521    /// until the actor drains messages. Used by `FaultInjector::fill_mailbox`.
522    pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
523        let addr = self.registry.get_by_path(path).map(|s| s.addr);
524        let Some(addr) = addr else { return 0 };
525        let src = AddrHash::synthetic(b"fault-fill-mailbox");
526        let mut count = 0;
527        loop {
528            let env = Envelope::new(src, addr, 0, 0);
529            match self.transport.try_deliver(env, MessagePayload::local(())) {
530                Ok(()) => count += 1,
531                Err(_) => break,
532            }
533        }
534        count
535    }
536
537    /// Dynamically spawn a user actor by sending a signal to the root /user supervisor.
538    pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
539        let user_path = ActorPath::parse("/user").unwrap();
540        if let Some(slot) = self.registry.get_by_path(&user_path) {
541            slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
542        }
543    }
544
545    /// Spawn a pool of N identical worker actors under the `/user` supervisor
546    /// and return a [`WorkerPool<M>`] routing handle.
547    ///
548    /// Each worker gets a [`StableAddr<M>`] backed by an [`AddrRefresher`] so
549    /// that the pool transparently routes to new incarnations after restarts.
550    ///
551    /// # Ordering
552    ///
553    /// Spawn requests are sent asynchronously to the `/user` supervisor.  The
554    /// returned pool's `StableAddr`s are initially `None`; they are populated
555    /// once the supervisor task processes each `SpawnChild` signal.  Yield at
556    /// least once (e.g. `tokio::task::yield_now().await`) before sending
557    /// messages through the pool.
558    ///
559    /// # Panics
560    ///
561    /// Panics if `config.size` is 0 (no workers requested).
562    pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
563    where
564        M: Message,
565        R: Clone + Send + Sync + 'static,
566        G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
567    {
568        let user_path = ActorPath::parse("/user").unwrap();
569        let handle = self.clone();
570        crate::worker_pool::spawn_worker_pool(
571            &user_path,
572            config,
573            factory,
574            self.source_addr,
575            &self.transport,
576            &self.transport_registry,
577            &self.responses,
578            &self.reactor,
579            move |spec| handle.spawn_user_actor(spec),
580        )
581    }
582}