Skip to main content

palladium_runtime/engine/
handle.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9    ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10    RemoteMessage, SendError, StopReason,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14    InProcessTransport, MailboxMessage, MailboxSender, QuicTransport, QuicTransportConfig,
15    TcpTransport, TcpTransportConfig, TransportError, TransportRegistry, TypeRegistry,
16};
17
18use crate::addr::{make_ask_fn, make_send_fn};
19use crate::common::LifecycleSignal;
20use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
21use crate::reactor::{Reactor, TokioReactor};
22use crate::registry::ActorRegistry;
23use crate::responses::ResponseRegistry;
24
25/// Handle for interacting with the engine from external (test) code.
26#[derive(Clone)]
27pub struct EngineHandle<
28    R: Reactor = TokioReactor,
29    N: Network = TokioNetwork,
30    F: FileSystem = TokioFileSystem,
31> {
32    pub(crate) transport: Arc<InProcessTransport>,
33    pub(crate) transport_registry: Arc<TransportRegistry>,
34    pub(crate) type_registry: TypeRegistry,
35    pub(crate) responses: Arc<ResponseRegistry>,
36    pub(crate) registry: Arc<ActorRegistry<R>>,
37    pub(crate) ask_timeout: Duration,
38    pub(crate) reactor: R,
39    pub(crate) network: N,
40    pub(crate) _fs: F,
41    pub(crate) start_time: Instant,
42    pub(crate) source_addr: AddrHash,
43    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
44    /// Cache of direct `MailboxSender`s for local actors, keyed by `AddrHash`.
45    ///
46    /// Populated on the first `send_to` to a local actor; avoids the DashMap
47    /// lookup in `TransportRegistry::try_route` on subsequent sends.
48    /// Evicted when a send returns `ActorStopped` (receiver dropped).
49    /// Shared across handle clones via `Arc` so all clones benefit.
50    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
51}
52
53impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
54    pub fn type_registry(&self) -> TypeRegistry {
55        self.type_registry.clone()
56    }
57
58    /// Send a message to an actor by its `AddrHash`.
59    ///
60    /// **Fast path**: after the first send to a local actor, the `MailboxSender`
61    /// is cached in `send_cache`, bypassing the `TransportRegistry` DashMap on
62    /// subsequent sends.  Stale entries are evicted on `ActorStopped`.
63    pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
64        let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
65        let payload = MessagePayload::local(msg);
66
67        // Fast path: use cached MailboxSender to avoid DashMap in TransportRegistry.
68        if let Some(sender) = self.send_cache.get(&target) {
69            return match sender.try_send(MailboxMessage { envelope, payload }) {
70                Ok(()) => Ok(()),
71                Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
72                Err(_) => {
73                    // Receiver dropped — evict stale cache entry.
74                    drop(sender); // release DashMap shard lock before remove
75                    self.send_cache.remove(&target);
76                    Err(SendError::ActorStopped)
77                }
78            };
79        }
80
81        // Cache miss: check if this is a local actor, populate cache, and send.
82        if let Some(sender) = self.transport_registry.get_local_sender(target) {
83            let result = sender.try_send(MailboxMessage { envelope, payload });
84            match result {
85                Ok(()) => {
86                    self.send_cache.insert(target, sender);
87                    return Ok(());
88                }
89                Err(TransportError::MailboxFull) => {
90                    // Actor is alive but full; cache for next send.
91                    self.send_cache.insert(target, sender);
92                    return Err(SendError::MailboxFull);
93                }
94                Err(_) => return Err(SendError::ActorStopped),
95            }
96        }
97
98        // Non-local destination (remote, QUIC, TCP, etc.): full routing.
99        self.transport_registry
100            .try_route(envelope, payload)
101            .map_err(|e| match e {
102                TransportError::MailboxFull => SendError::MailboxFull,
103                TransportError::ConnectionFailed => SendError::ConnectionFailed,
104                TransportError::Unroutable(_) => SendError::Unroutable,
105                TransportError::SerializationRequired | TransportError::SerializationError(_) => {
106                    SendError::SerializationFailed
107                }
108                _ => SendError::ActorStopped,
109            })
110    }
111
112    /// Send up to `n` messages to one target using a producer-side batch fast path.
113    ///
114    /// For local destinations this uses `MailboxSender::try_send_batch_owned`,
115    /// reducing wake/schedule overhead vs repeated `send_to` calls.
116    ///
117    /// Returns the number of messages successfully enqueued. On local mailbox
118    /// backpressure this may be less than `n` without returning an error.
119    pub fn send_many<M: Message + Clone>(
120        &self,
121        target: AddrHash,
122        msg: M,
123        n: usize,
124    ) -> Result<usize, SendError> {
125        if n == 0 {
126            return Ok(0);
127        }
128
129        let make_batch = || {
130            let mut batch = Vec::with_capacity(n);
131            for _ in 0..n {
132                batch.push(MailboxMessage {
133                    envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
134                    payload: MessagePayload::local(msg.clone()),
135                });
136            }
137            batch
138        };
139
140        // Fast path: use cached MailboxSender.
141        if let Some(sender) = self.send_cache.get(&target) {
142            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
143            return match err {
144                None => Ok(sent),
145                Some(TransportError::MailboxFull) => Ok(sent),
146                _ => {
147                    drop(sender);
148                    self.send_cache.remove(&target);
149                    Err(SendError::ActorStopped)
150                }
151            };
152        }
153
154        // Cache miss: local actor lookup and batch send.
155        if let Some(sender) = self.transport_registry.get_local_sender(target) {
156            let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
157            match err {
158                None => {
159                    self.send_cache.insert(target, sender);
160                    return Ok(sent);
161                }
162                Some(TransportError::MailboxFull) => {
163                    self.send_cache.insert(target, sender);
164                    return Ok(sent);
165                }
166                _ => return Err(SendError::ActorStopped),
167            }
168        }
169
170        // Non-local fallback: preserve send_to semantics.
171        let mut sent = 0usize;
172        for _ in 0..n {
173            match self.send_to::<M>(target, msg.clone()) {
174                Ok(()) => sent += 1,
175                Err(SendError::MailboxFull) => return Ok(sent),
176                Err(e) => return Err(e),
177            }
178        }
179        Ok(sent)
180    }
181
182    /// Create an `Addr<M>` that sends to `target` via this engine's transport.
183    pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
184    where
185        R: Send + Sync,
186    {
187        let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
188        let ask_fn = make_ask_fn::<M, R>(
189            target,
190            self.source_addr,
191            self.transport.clone(),
192            self.transport_registry.clone(),
193            self.responses.clone(),
194            self.ask_timeout,
195            self.reactor.clone(),
196        );
197        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
198    }
199
200    /// Create a remote `Addr<M>` that always serializes payloads before routing.
201    ///
202    /// This is intended for explicit remote sends; it rejects local destinations
203    /// to avoid accidental local delivery of serialized payloads.
204    pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
205        let registry = self.transport_registry.clone();
206        let source = self.source_addr;
207        let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
208            if registry.is_local(target) {
209                return Err(SendError::PolicyViolation);
210            }
211            let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
212            let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
213            registry
214                .try_route(envelope, MessagePayload::serialized(bytes))
215                .map_err(|e| match e {
216                    TransportError::MailboxFull => SendError::MailboxFull,
217                    TransportError::ConnectionFailed => SendError::ConnectionFailed,
218                    TransportError::Unroutable(_) => SendError::Unroutable,
219                    TransportError::SerializationRequired
220                    | TransportError::SerializationError(_) => SendError::SerializationFailed,
221                    _ => SendError::ActorStopped,
222                })
223        });
224        let ask_fn: palladium_actor::AskFn<M> =
225            Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
226                Box::pin(async { Err(AskError::Unbound) })
227            });
228        palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
229    }
230
231    pub fn transport(&self) -> &Arc<InProcessTransport> {
232        &self.transport
233    }
234
235    pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
236        &self.transport_registry
237    }
238
239    pub async fn attach_quic_transport(
240        &self,
241        config: QuicTransportConfig,
242        peers: HashMap<EngineId, SocketAddr>,
243    ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
244    where
245        R: Clone,
246        N: Clone,
247    {
248        let transport = QuicTransport::bind(
249            config,
250            peers,
251            self.transport_registry.clone(),
252            self.type_registry.clone(),
253            self.reactor.clone(),
254            self.network.clone(),
255        )
256        .await?;
257        self.transport_registry.add_transport(transport.clone())?;
258        Ok(transport)
259    }
260
261    pub async fn attach_tcp_transport(
262        &self,
263        config: TcpTransportConfig,
264        peers: HashMap<EngineId, SocketAddr>,
265    ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
266    where
267        R: Clone,
268        N: Clone,
269    {
270        let transport = TcpTransport::bind(
271            config,
272            peers,
273            self.transport_registry.clone(),
274            self.type_registry.clone(),
275            self.reactor.clone(),
276            self.network.clone(),
277        )
278        .await?;
279        self.transport_registry.add_transport(transport.clone())?;
280        Ok(transport)
281    }
282
283    // ── Introspection (REQ-085) ───────────────────────────────────────────────
284
285    /// List actors, optionally filtered by `query`.
286    pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
287        self.registry.snapshot(&query, 0)
288    }
289
290    /// Get info for a single actor by path.
291    pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
292        self.registry.actor_info_by_path(path, 0)
293    }
294
295    /// System-level snapshot: all actors + uptime.
296    pub fn snapshot(&self) -> EngineSnapshot {
297        let actors = self.actor_list(ActorQuery::default());
298        let uptime_secs = self.start_time.elapsed().as_secs();
299        EngineSnapshot {
300            num_cores: 1,
301            uptime_secs,
302            actors,
303        }
304    }
305
306    /// Lookup an actor's address by its path.
307    pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
308        self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
309            self.federated_routing
310                .as_ref()
311                .and_then(|routing| routing.resolve_optional(path))
312        })
313    }
314
315    /// Resolve an actor's address by path with routing errors.
316    pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
317        if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
318            return Ok(local);
319        }
320        if let Some(routing) = &self.federated_routing {
321            return routing.resolve(path);
322        }
323        Err(SendError::Unroutable)
324    }
325
326    /// Send a hard-stop signal to an actor by path.
327    ///
328    /// Returns `true` if the actor was found and the signal was sent. The
329    /// actor may still be running until the signal is processed by the
330    /// reactor. Used by `FaultInjector::crash_actor`.
331    pub fn stop_actor(&self, path: &ActorPath) -> bool {
332        if let Some(slot) = self.registry.get_by_path(path) {
333            slot.ctrl_tx
334                .send(LifecycleSignal::Stop(StopReason::Supervisor))
335                .ok();
336            true
337        } else {
338            false
339        }
340    }
341
342    /// Fill an actor's mailbox to capacity by sending dummy messages.
343    ///
344    /// Returns the number of messages that were successfully enqueued.
345    /// Subsequent sends to the same actor will return `SendError::MailboxFull`
346    /// until the actor drains messages. Used by `FaultInjector::fill_mailbox`.
347    pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
348        let addr = self.registry.get_by_path(path).map(|s| s.addr);
349        let Some(addr) = addr else { return 0 };
350        let src = AddrHash::synthetic(b"fault-fill-mailbox");
351        let mut count = 0;
352        loop {
353            let env = Envelope::new(src, addr, 0, 0);
354            match self.transport.try_deliver(env, MessagePayload::local(())) {
355                Ok(()) => count += 1,
356                Err(_) => break,
357            }
358        }
359        count
360    }
361
362    /// Dynamically spawn a user actor by sending a signal to the root /user supervisor.
363    pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
364        let user_path = ActorPath::parse("/user").unwrap();
365        if let Some(slot) = self.registry.get_by_path(&user_path) {
366            slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
367        }
368    }
369}