Skip to main content

palladium_runtime/engine/
handle.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9    Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10    PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14    InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15    QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16    TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21    validate_config, BoundedClusterConfig, BoundedClusterHandle, BoundedTransportConfig,
22    BoundedTransportHandle, ClusterError, ControlPlaneClientConfig, ControlPlaneProtocol,
23};
24use crate::common::LifecycleSignal;
25use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
26use crate::reactor::{Reactor, TokioReactor};
27use crate::registry::ActorRegistry;
28use crate::responses::ResponseRegistry;
29
/// Handle for interacting with the engine from external (test) code.
///
/// The `Arc`-wrapped fields (transports, registries, response registry, send
/// cache, response-pump receiver) are shared between clones of the handle.
#[derive(Clone)]
pub struct EngineHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    /// In-process transport; used directly by `fill_mailbox` and handed to the
    /// local ask handlers built by `addr_for`.
    pub(crate) transport: Arc<InProcessTransport>,
    /// Registry of attached transports. `attach_*_transport` adds TCP/QUIC
    /// transports to it, and every send path in this handle routes through it.
    pub(crate) transport_registry: Arc<TransportRegistry>,
    /// Message type registry; returned by `type_registry()` and passed to
    /// network transports when they are bound.
    pub(crate) type_registry: TypeRegistry,
    /// Response registry passed to every ask handler this handle builds.
    pub(crate) responses: Arc<ResponseRegistry>,
    /// Registry of local actors, queried by path for introspection and control.
    pub(crate) registry: Arc<ActorRegistry<R>>,
    /// Timeout given to ask handlers; also used as the control-plane
    /// `wait_timeout` in `attach_bounded_cluster`.
    pub(crate) ask_timeout: Duration,
    /// Reactor handed to ask handlers and to network transports.
    pub(crate) reactor: R,
    /// Network abstraction handed to TCP/QUIC transports when binding.
    pub(crate) network: N,
    /// File-system abstraction carried by the handle; not read by the methods
    /// in this impl block.
    pub(crate) _fs: F,
    /// Reference instant from which `snapshot` computes `uptime_secs`.
    pub(crate) start_time: Instant,
    /// Source address stamped on envelopes sent through this handle.
    pub(crate) source_addr: AddrHash,
    /// Optional federated routing, consulted by `lookup_path` / `resolve_path`
    /// when a path is not registered locally.
    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
    /// Cache of direct `MailboxSender`s for local actors, keyed by `AddrHash`.
    ///
    /// Populated on the first `send_to` to a local actor; avoids the DashMap
    /// lookup in `TransportRegistry::try_route` on subsequent sends.
    /// Evicted when a send returns `ActorStopped` (receiver dropped).
    /// Shared across handle clones via `Arc` so all clones benefit.
    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
    /// Receiving end of the response pump mailbox.  Consumed by the first
    /// `remote_addr_for` ask, which spawns a long-lived pump task.
    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
}
60
61impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
62    fn advertise_existing_actors<T: palladium_transport::Transport>(
63        &self,
64        transport: &Arc<T>,
65    ) -> Result<(), TransportError> {
66        for info in self.registry.snapshot(&ActorQuery::default(), 0) {
67            if info.state != crate::introspection::ActorState::Running {
68                continue;
69            }
70            if let Some(slot) = self.registry.get_by_path(&info.path) {
71                transport.advertise_path(slot.addr, &info.path)?;
72            }
73        }
74        Ok(())
75    }
76
77    pub fn type_registry(&self) -> TypeRegistry {
78        self.type_registry.clone()
79    }
80
81    /// Send a message to an actor by its `AddrHash`.
82    ///
83    /// **Fast path**: after the first send to a local actor, the `MailboxSender`
84    /// is cached in `send_cache`, bypassing the `TransportRegistry` DashMap on
85    /// subsequent sends.  Stale entries are evicted on `ActorStopped`.
86    pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
87        let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
88        let payload = MessagePayload::local(msg);
89
90        // Fast path: use cached MailboxSender to avoid DashMap in TransportRegistry.
91        if let Some(sender) = self.send_cache.get(&target) {
92            return match sender.try_send(MailboxMessage { envelope, payload }) {
93                Ok(()) => Ok(()),
94                Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
95                Err(_) => {
96                    // Receiver dropped — evict stale cache entry.
97                    drop(sender); // release DashMap shard lock before remove
98                    self.send_cache.remove(&target);
99                    Err(SendError::ActorStopped)
100                }
101            };
102        }
103
104        // Cache miss: check if this is a local actor, populate cache, and send.
105        if let Some(sender) = self.transport_registry.get_local_sender(target) {
106            let result = sender.try_send(MailboxMessage { envelope, payload });
107            match result {
108                Ok(()) => {
109                    self.send_cache.insert(target, sender);
110                    return Ok(());
111                }
112                Err(TransportError::MailboxFull) => {
113                    // Actor is alive but full; cache for next send.
114                    self.send_cache.insert(target, sender);
115                    return Err(SendError::MailboxFull);
116                }
117                Err(_) => return Err(SendError::ActorStopped),
118            }
119        }
120
121        // Non-local destination (remote, QUIC, TCP, etc.): full routing.
122        self.transport_registry
123            .try_route(envelope, payload)
124            .map_err(|e| match e {
125                TransportError::MailboxFull => SendError::MailboxFull,
126                TransportError::ConnectionFailed => SendError::ConnectionFailed,
127                TransportError::Unroutable(_) => SendError::Unroutable,
128                TransportError::SerializationRequired
129                | TransportError::SerializationError(_)
130                | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
131                _ => SendError::ActorStopped,
132            })
133    }
134
135    /// Send up to `n` messages to one target using a producer-side batch fast path.
136    ///
137    /// For local destinations this uses `MailboxSender::try_send_batch_owned`,
138    /// reducing wake/schedule overhead vs repeated `send_to` calls.
139    ///
140    /// Returns the number of messages successfully enqueued. On local mailbox
141    /// backpressure this may be less than `n` without returning an error.
142    pub fn send_many<M: Message + Clone>(
143        &self,
144        target: AddrHash,
145        msg: M,
146        n: usize,
147    ) -> Result<usize, SendError> {
148        if n == 0 {
149            return Ok(0);
150        }
151
152        let make_batch = || {
153            let mut batch = Vec::with_capacity(n);
154            for _ in 0..n {
155                batch.push(MailboxMessage {
156                    envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
157                    payload: MessagePayload::local(msg.clone()),
158                });
159            }
160            batch
161        };
162
163        // Fast path: use cached MailboxSender.
164        if let Some(sender) = self.send_cache.get(&target) {
165            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
166            return match err {
167                None => Ok(sent),
168                Some(TransportError::MailboxFull) => Ok(sent),
169                _ => {
170                    drop(sender);
171                    self.send_cache.remove(&target);
172                    Err(SendError::ActorStopped)
173                }
174            };
175        }
176
177        // Cache miss: local actor lookup and batch send.
178        if let Some(sender) = self.transport_registry.get_local_sender(target) {
179            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
180            match err {
181                None => {
182                    self.send_cache.insert(target, sender);
183                    return Ok(sent);
184                }
185                Some(TransportError::MailboxFull) => {
186                    self.send_cache.insert(target, sender);
187                    return Ok(sent);
188                }
189                _ => return Err(SendError::ActorStopped),
190            }
191        }
192
193        // Non-local fallback: preserve send_to semantics.
194        let mut sent = 0usize;
195        for _ in 0..n {
196            match self.send_to::<M>(target, msg.clone()) {
197                Ok(()) => sent += 1,
198                Err(SendError::MailboxFull) => return Ok(sent),
199                Err(e) => return Err(e),
200            }
201        }
202        Ok(sent)
203    }
204
205    /// Create an `Addr<M>` that sends to `target` via this engine's transport.
206    pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
207    where
208        R: Send + Sync,
209    {
210        let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
211        let ask_fn = make_ask_fn::<M, R>(
212            target,
213            self.source_addr,
214            self.transport.clone(),
215            self.transport_registry.clone(),
216            self.responses.clone(),
217            self.ask_timeout,
218            self.reactor.clone(),
219        );
220        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
221    }
222
223    /// Create a remote `Addr<M>` that always serializes payloads before routing.
224    ///
225    /// This is intended for explicit remote sends; it rejects local destinations
226    /// to avoid accidental local delivery of serialized payloads.
227    pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
228        let registry = self.transport_registry.clone();
229        let source = self.source_addr;
230        let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
231            if registry.is_local(target) {
232                return Err(SendError::PolicyViolation);
233            }
234            let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
235            let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
236            registry
237                .try_route(envelope, MessagePayload::serialized(bytes))
238                .map_err(|e| match e {
239                    TransportError::MailboxFull => SendError::MailboxFull,
240                    TransportError::ConnectionFailed => SendError::ConnectionFailed,
241                    TransportError::Unroutable(_) => SendError::Unroutable,
242                    TransportError::SerializationRequired
243                    | TransportError::SerializationError(_)
244                    | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
245                    _ => SendError::ActorStopped,
246                })
247        });
248        let ask_fn: palladium_actor::AskFn<M> =
249            Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
250                Box::pin(async { Err(AskError::Unbound) })
251            });
252        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
253    }
254
255    /// Create a typed remote `Addr<M>` that supports both `send()` and `ask()`.
256    ///
257    /// Both request and response payloads are serialized for transport delivery.
258    /// Local destinations are rejected with `AskError::Send(PolicyViolation)`.
259    ///
260    /// Before calling, register the message type on both engines:
261    /// ```ignore
262    /// handle.type_registry().register_remote_ask::<MyMsg>();
263    /// ```
264    ///
265    /// The first `ask()` on the returned address lazily starts the engine's
266    /// response pump task (a lightweight background task that routes incoming
267    /// serialized response frames into the local response registry).
268    pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
269    where
270        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
271        R: Clone + Send + Sync,
272    {
273        let send_fn =
274            make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
275        let ask_fn = make_remote_ask_fn::<M, R>(
276            target,
277            self.source_addr,
278            self.transport_registry.clone(),
279            self.responses.clone(),
280            self.ask_timeout,
281            self.reactor.clone(),
282            Arc::clone(&self.pump_rx),
283        );
284        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
285    }
286
287    pub fn transport(&self) -> &Arc<InProcessTransport> {
288        &self.transport
289    }
290
291    pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
292        &self.transport_registry
293    }
294
295    pub async fn attach_quic_transport(
296        &self,
297        config: QuicTransportConfig,
298        peers: HashMap<EngineId, SocketAddr>,
299    ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
300    where
301        R: Clone,
302        N: Clone,
303    {
304        let transport = QuicTransport::bind(
305            config,
306            peers,
307            self.transport_registry.clone(),
308            self.type_registry.clone(),
309            self.reactor.clone(),
310            self.network.clone(),
311        )
312        .await?;
313        self.transport_registry.add_transport(transport.clone())?;
314        self.advertise_existing_actors(&transport)?;
315        Ok(transport)
316    }
317
318    pub async fn attach_tcp_transport(
319        &self,
320        config: TcpTransportConfig,
321        peers: HashMap<EngineId, SocketAddr>,
322    ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
323    where
324        R: Clone,
325        N: Clone,
326    {
327        let transport = TcpTransport::bind(
328            config,
329            peers,
330            self.transport_registry.clone(),
331            self.type_registry.clone(),
332            self.reactor.clone(),
333            self.network.clone(),
334        )
335        .await?;
336        self.transport_registry.add_transport(transport.clone())?;
337        self.advertise_existing_actors(&transport)?;
338        Ok(transport)
339    }
340
341    pub async fn attach_bounded_cluster(
342        &self,
343        config: BoundedClusterConfig,
344    ) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
345    where
346        R: Clone,
347        N: Clone,
348    {
349        validate_config(&config, &self.registry)?;
350
351        let local_engine_id = config.transport.engine_id().clone();
352        let peers = config.peers;
353        let declared_actors = config.declared_actors;
354        let roles = config.roles;
355
356        let control_plane = match &config.transport {
357            BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
358                protocol: ControlPlaneProtocol::Tcp,
359                tls: config.tls.clone(),
360                wait_timeout: self.ask_timeout,
361            },
362            BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
363                protocol: ControlPlaneProtocol::Quic,
364                tls: config.tls.clone(),
365                wait_timeout: self.ask_timeout,
366            },
367        };
368
369        let transport = match config.transport {
370            BoundedTransportConfig::Tcp(config) => {
371                let transport = self
372                    .attach_tcp_transport(config, HashMap::new())
373                    .await
374                    .map_err(ClusterError::Transport)?;
375                for peer in &peers {
376                    if let Some(server_name) = &peer.server_name {
377                        transport.add_peer_with_server_name(
378                            peer.engine_id.clone(),
379                            peer.addr,
380                            server_name.clone(),
381                        );
382                    } else {
383                        transport.add_peer(peer.engine_id.clone(), peer.addr);
384                    }
385                }
386                self.advertise_existing_actors(&transport)
387                    .map_err(ClusterError::Transport)?;
388                BoundedTransportHandle::Tcp(transport)
389            }
390            BoundedTransportConfig::Quic(config) => {
391                let transport = self
392                    .attach_quic_transport(config, HashMap::new())
393                    .await
394                    .map_err(ClusterError::Transport)?;
395                for peer in &peers {
396                    if let Some(server_name) = &peer.server_name {
397                        transport.add_peer_with_server_name(
398                            peer.engine_id.clone(),
399                            peer.addr,
400                            server_name.clone(),
401                        );
402                    } else {
403                        transport.add_peer(peer.engine_id.clone(), peer.addr);
404                    }
405                }
406                self.advertise_existing_actors(&transport)
407                    .map_err(ClusterError::Transport)?;
408                BoundedTransportHandle::Quic(transport)
409            }
410        };
411
412        let mut cluster = BoundedClusterHandle::new(
413            self.clone(),
414            local_engine_id,
415            control_plane,
416            transport,
417            peers,
418            roles,
419            self.registry.clone(),
420        );
421        for declared in declared_actors {
422            cluster.declare_remote_actor(declared.owner, declared.path)?;
423        }
424        Ok(cluster)
425    }
426
427    /// Create a typed remote `Addr<M>` from a canonical actor path.
428    ///
429    /// The cross-engine routing key is `AddrHash::new(path, 0)`, which is the
430    /// canonical distributed identity for a logical actor path.
431    pub fn remote_addr_for_path<M: RemoteMessage>(
432        &self,
433        path: &ActorPath,
434    ) -> palladium_actor::Addr<M>
435    where
436        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
437        R: Clone + Send + Sync,
438    {
439        self.remote_addr_for::<M>(AddrHash::new(path, 0))
440    }
441
442    /// Create a stable typed remote handle from a canonical actor path.
443    ///
444    /// This v1 API makes the logical remote identity stable by path. It does
445    /// not provide remote restart notifications or continuous liveness
446    /// tracking. During remote absence or re-advertisement windows, callers may
447    /// still observe the same transient failures as `remote_addr_for_path()`,
448    /// such as `Unroutable`, `ConnectionFailed`, or `Timeout`.
449    pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
450    where
451        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
452        R: Clone + Send + Sync,
453    {
454        StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
455    }
456
457    // ── Introspection (REQ-085) ───────────────────────────────────────────────
458
459    /// List actors, optionally filtered by `query`.
460    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
461        self.registry.snapshot(&query, 0)
462    }
463
464    /// Get info for a single actor by path.
465    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
466        self.registry.actor_info_by_path(path, 0)
467    }
468
469    /// System-level snapshot: all actors + uptime.
470    pub fn snapshot(&self) -> EngineSnapshot {
471        let actors = self.actor_list(ActorQuery::default());
472        let uptime_secs = self.start_time.elapsed().as_secs();
473        EngineSnapshot {
474            num_cores: 1,
475            uptime_secs,
476            actors,
477        }
478    }
479
480    /// Lookup an actor's address by its path.
481    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
482        self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
483            self.federated_routing
484                .as_ref()
485                .and_then(|routing| routing.resolve_optional(path))
486        })
487    }
488
489    /// Resolve an actor's address by path with routing errors.
490    pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
491        if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
492            return Ok(local);
493        }
494        if let Some(routing) = &self.federated_routing {
495            return routing.resolve(path);
496        }
497        Err(SendError::Unroutable)
498    }
499
500    /// Send a hard-stop signal to an actor by path.
501    ///
502    /// Returns `true` if the actor was found and the signal was sent. The
503    /// actor may still be running until the signal is processed by the
504    /// reactor. Used by `FaultInjector::crash_actor`.
505    pub fn stop_actor(&self, path: &ActorPath) -> bool {
506        if let Some(slot) = self.registry.get_by_path(path) {
507            slot.ctrl_tx
508                .send(LifecycleSignal::Stop(StopReason::Supervisor))
509                .ok();
510            true
511        } else {
512            false
513        }
514    }
515
516    /// Fill an actor's mailbox to capacity by sending dummy messages.
517    ///
518    /// Returns the number of messages that were successfully enqueued.
519    /// Subsequent sends to the same actor will return `SendError::MailboxFull`
520    /// until the actor drains messages. Used by `FaultInjector::fill_mailbox`.
521    pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
522        let addr = self.registry.get_by_path(path).map(|s| s.addr);
523        let Some(addr) = addr else { return 0 };
524        let src = AddrHash::synthetic(b"fault-fill-mailbox");
525        let mut count = 0;
526        loop {
527            let env = Envelope::new(src, addr, 0, 0);
528            match self.transport.try_deliver(env, MessagePayload::local(())) {
529                Ok(()) => count += 1,
530                Err(_) => break,
531            }
532        }
533        count
534    }
535
536    /// Dynamically spawn a user actor by sending a signal to the root /user supervisor.
537    pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
538        let user_path = ActorPath::parse("/user").unwrap();
539        if let Some(slot) = self.registry.get_by_path(&user_path) {
540            slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
541        }
542    }
543
544    /// Spawn a pool of N identical worker actors under the `/user` supervisor
545    /// and return a [`WorkerPool<M>`] routing handle.
546    ///
547    /// Each worker gets a [`StableAddr<M>`] backed by an [`AddrRefresher`] so
548    /// that the pool transparently routes to new incarnations after restarts.
549    ///
550    /// # Ordering
551    ///
552    /// Spawn requests are sent asynchronously to the `/user` supervisor.  The
553    /// returned pool's `StableAddr`s are initially `None`; they are populated
554    /// once the supervisor task processes each `SpawnChild` signal.  Yield at
555    /// least once (e.g. `tokio::task::yield_now().await`) before sending
556    /// messages through the pool.
557    ///
558    /// # Panics
559    ///
560    /// Panics if `config.size` is 0 (no workers requested).
561    pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
562    where
563        M: Message,
564        R: Clone + Send + Sync + 'static,
565        G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
566    {
567        let user_path = ActorPath::parse("/user").unwrap();
568        let handle = self.clone();
569        crate::worker_pool::spawn_worker_pool(
570            &user_path,
571            config,
572            factory,
573            self.source_addr,
574            &self.transport,
575            &self.transport_registry,
576            &self.responses,
577            &self.reactor,
578            move |spec| handle.spawn_user_actor(spec),
579        )
580    }
581}