1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9 Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10 PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14 mailbox, InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15 QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16 TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21 register_spawn_peer_installer, remote_spawn_wait_timeout, validate_config,
22 BoundedClusterConfig, BoundedClusterHandle, BoundedTransportConfig, BoundedTransportHandle,
23 ClusterError, ControlPlaneClientConfig, ControlPlaneProtocol,
24};
25use crate::common::LifecycleSignal;
26use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
27use crate::reactor::{Reactor, TokioReactor};
28use crate::registry::ActorRegistry;
29use crate::responses::ResponseRegistry;
30
/// Cloneable handle onto an engine.
///
/// Bundles the transports, registries and runtime services that the methods
/// on this type use to send messages, build typed addresses, attach network
/// transports and inspect actors. The handle is generic over the reactor
/// (`R`), network (`N`) and file-system (`F`) implementations and defaults to
/// the Tokio-backed ones.
#[derive(Clone)]
pub struct EngineHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    /// In-process transport; used directly by `fill_mailbox` and handed to
    /// the local ask path and to worker pools.
    pub(crate) transport: Arc<InProcessTransport>,
    /// Registry of transports used to look up local senders, route envelopes
    /// and register newly attached network transports.
    pub(crate) transport_registry: Arc<TransportRegistry>,
    /// Type registry cloned into every network transport that is bound.
    pub(crate) type_registry: TypeRegistry,
    /// Response registry passed to the ask-function builders.
    pub(crate) responses: Arc<ResponseRegistry>,
    /// Actor registry used for path lookups and introspection snapshots.
    pub(crate) registry: Arc<ActorRegistry<R>>,
    /// Timeout passed to the ask-function builders; also the input from which
    /// the remote-spawn wait timeout is derived.
    pub(crate) ask_timeout: Duration,
    /// Reactor instance cloned into ask functions and network transports.
    pub(crate) reactor: R,
    /// Network implementation cloned into network transports when binding.
    pub(crate) network: N,
    /// File-system implementation; not read by any method in this section.
    pub(crate) _fs: F,
    /// Instant used as the reference point for `snapshot`'s uptime.
    pub(crate) start_time: Instant,
    /// Address stamped as the source of every envelope sent via this handle.
    pub(crate) source_addr: AddrHash,
    /// Optional federated routing consulted when a path is not found in the
    /// local actor registry.
    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
    /// Cache of mailbox senders keyed by target address, filled by `send_to`
    /// and `send_many` and evicted when a cached sender fails.
    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
    /// Shared, lockable slot holding an optional mailbox receiver; handed to
    /// `make_remote_ask_fn` by `remote_addr_for`.
    /// NOTE(review): presumably the receiver drained while waiting for remote
    /// ask responses — confirm against `make_remote_ask_fn`.
    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
}
61
62impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
63 fn advertise_existing_actors<T: palladium_transport::Transport>(
64 &self,
65 transport: &Arc<T>,
66 ) -> Result<(), TransportError> {
67 transport.register(self.source_addr, mailbox(1).0)?;
68 for info in self.registry.snapshot(&ActorQuery::default(), 0) {
69 if info.state != crate::introspection::ActorState::Running {
70 continue;
71 }
72 if let Some(slot) = self.registry.get_by_path(&info.path) {
73 transport.advertise_path(slot.addr, &info.path)?;
74 }
75 }
76 Ok(())
77 }
78
/// Returns a clone of the type registry held by this handle.
pub fn type_registry(&self) -> TypeRegistry {
    self.type_registry.clone()
}
82
83 pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
89 let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
90 let payload = MessagePayload::local(msg);
91
92 if let Some(sender) = self.send_cache.get(&target) {
94 return match sender.try_send(MailboxMessage { envelope, payload }) {
95 Ok(()) => Ok(()),
96 Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
97 Err(_) => {
98 drop(sender); self.send_cache.remove(&target);
101 Err(SendError::ActorStopped)
102 }
103 };
104 }
105
106 if let Some(sender) = self.transport_registry.get_local_sender(target) {
108 let result = sender.try_send(MailboxMessage { envelope, payload });
109 match result {
110 Ok(()) => {
111 self.send_cache.insert(target, sender);
112 return Ok(());
113 }
114 Err(TransportError::MailboxFull) => {
115 self.send_cache.insert(target, sender);
117 return Err(SendError::MailboxFull);
118 }
119 Err(_) => return Err(SendError::ActorStopped),
120 }
121 }
122
123 self.transport_registry
125 .try_route(envelope, payload)
126 .map_err(|e| match e {
127 TransportError::MailboxFull => SendError::MailboxFull,
128 TransportError::ConnectionFailed => SendError::ConnectionFailed,
129 TransportError::Unroutable(_) => SendError::Unroutable,
130 TransportError::SerializationRequired
131 | TransportError::SerializationError(_)
132 | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
133 _ => SendError::ActorStopped,
134 })
135 }
136
137 pub fn send_many<M: Message + Clone>(
145 &self,
146 target: AddrHash,
147 msg: M,
148 n: usize,
149 ) -> Result<usize, SendError> {
150 if n == 0 {
151 return Ok(0);
152 }
153
154 let make_batch = || {
155 let mut batch = Vec::with_capacity(n);
156 for _ in 0..n {
157 batch.push(MailboxMessage {
158 envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
159 payload: MessagePayload::local(msg.clone()),
160 });
161 }
162 batch
163 };
164
165 if let Some(sender) = self.send_cache.get(&target) {
167 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
168 return match err {
169 None => Ok(sent),
170 Some(TransportError::MailboxFull) => Ok(sent),
171 _ => {
172 drop(sender);
173 self.send_cache.remove(&target);
174 Err(SendError::ActorStopped)
175 }
176 };
177 }
178
179 if let Some(sender) = self.transport_registry.get_local_sender(target) {
181 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
182 match err {
183 None => {
184 self.send_cache.insert(target, sender);
185 return Ok(sent);
186 }
187 Some(TransportError::MailboxFull) => {
188 self.send_cache.insert(target, sender);
189 return Ok(sent);
190 }
191 _ => return Err(SendError::ActorStopped),
192 }
193 }
194
195 let mut sent = 0usize;
197 for _ in 0..n {
198 match self.send_to::<M>(target, msg.clone()) {
199 Ok(()) => sent += 1,
200 Err(SendError::MailboxFull) => return Ok(sent),
201 Err(e) => return Err(e),
202 }
203 }
204 Ok(sent)
205 }
206
207 pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
209 where
210 R: Send + Sync,
211 {
212 let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
213 let ask_fn = make_ask_fn::<M, R>(
214 target,
215 self.source_addr,
216 self.transport.clone(),
217 self.transport_registry.clone(),
218 self.responses.clone(),
219 self.ask_timeout,
220 self.reactor.clone(),
221 );
222 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
223 }
224
225 pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
230 let registry = self.transport_registry.clone();
231 let source = self.source_addr;
232 let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
233 if registry.is_local(target) {
234 return Err(SendError::PolicyViolation);
235 }
236 let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
237 let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
238 registry
239 .try_route(envelope, MessagePayload::serialized(bytes))
240 .map_err(|e| match e {
241 TransportError::MailboxFull => SendError::MailboxFull,
242 TransportError::ConnectionFailed => SendError::ConnectionFailed,
243 TransportError::Unroutable(_) => SendError::Unroutable,
244 TransportError::SerializationRequired
245 | TransportError::SerializationError(_)
246 | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
247 _ => SendError::ActorStopped,
248 })
249 });
250 let ask_fn: palladium_actor::AskFn<M> =
251 Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
252 Box::pin(async { Err(AskError::Unbound) })
253 });
254 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
255 }
256
257 pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
271 where
272 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
273 R: Clone + Send + Sync,
274 {
275 let send_fn =
276 make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
277 let ask_fn = make_remote_ask_fn::<M, R>(
278 target,
279 self.source_addr,
280 self.transport_registry.clone(),
281 self.responses.clone(),
282 self.ask_timeout,
283 self.reactor.clone(),
284 Arc::clone(&self.pump_rx),
285 );
286 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
287 }
288
/// Borrows the in-process transport held by this handle.
pub fn transport(&self) -> &Arc<InProcessTransport> {
    &self.transport
}
292
/// Borrows the transport registry held by this handle.
pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
    &self.transport_registry
}
296
297 pub async fn attach_quic_transport(
298 &self,
299 config: QuicTransportConfig,
300 peers: HashMap<EngineId, SocketAddr>,
301 ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
302 where
303 R: Clone,
304 N: Clone,
305 {
306 let transport = QuicTransport::bind(
307 config,
308 peers,
309 self.transport_registry.clone(),
310 self.type_registry.clone(),
311 self.reactor.clone(),
312 self.network.clone(),
313 )
314 .await?;
315 self.transport_registry.add_transport(transport.clone())?;
316 self.advertise_existing_actors(&transport)?;
317 Ok(transport)
318 }
319
320 pub async fn attach_tcp_transport(
321 &self,
322 config: TcpTransportConfig,
323 peers: HashMap<EngineId, SocketAddr>,
324 ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
325 where
326 R: Clone,
327 N: Clone,
328 {
329 let transport = TcpTransport::bind(
330 config,
331 peers,
332 self.transport_registry.clone(),
333 self.type_registry.clone(),
334 self.reactor.clone(),
335 self.network.clone(),
336 )
337 .await?;
338 self.transport_registry.add_transport(transport.clone())?;
339 self.advertise_existing_actors(&transport)?;
340 Ok(transport)
341 }
342
/// Attaches the transport described by `config` and wraps it in a
/// `BoundedClusterHandle`.
///
/// Steps, in order: validate the configuration, derive the control-plane
/// client settings from the transport config, bind the TCP or QUIC transport,
/// register every configured peer on it, advertise existing actors, build the
/// cluster handle, install the spawn-peer hook, declare the configured remote
/// actors and finally advertise existing local routes.
///
/// # Errors
/// - Whatever `validate_config` returns for an invalid configuration.
/// - `ClusterError::Transport` when binding the transport or advertising
///   existing actors fails.
/// - Any error from `declare_remote_actor` or
///   `advertise_existing_local_routes`.
pub async fn attach_bounded_cluster(
    &self,
    config: BoundedClusterConfig,
) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
where
    R: Clone,
    N: Clone,
{
    // Reject a bad configuration before any transport is bound.
    validate_config(&config, &self.registry)?;

    // Pull the pieces out of `config`; `config.transport` is moved last,
    // after it has been borrowed for the control-plane settings below.
    let local_engine_id = config.transport.engine_id().clone();
    let peers = config.peers;
    let declared_actors = config.declared_actors;
    let roles = config.roles;

    // Control-plane client settings mirror the data transport's protocol and
    // TLS configuration; the wait timeout is derived from `ask_timeout`.
    let control_plane = match &config.transport {
        BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
            protocol: ControlPlaneProtocol::Tcp,
            tls: config.tls.clone(),
            wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
        },
        BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
            protocol: ControlPlaneProtocol::Quic,
            tls: config.tls.clone(),
            wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
        },
    };

    // Bind with an empty peer map, then add each configured peer, using its
    // server name when one is given.
    let transport = match config.transport {
        BoundedTransportConfig::Tcp(config) => {
            let transport = self
                .attach_tcp_transport(config, HashMap::new())
                .await
                .map_err(ClusterError::Transport)?;
            for peer in &peers {
                if let Some(server_name) = &peer.server_name {
                    transport.add_peer_with_server_name(
                        peer.engine_id.clone(),
                        peer.addr,
                        server_name.clone(),
                    );
                } else {
                    transport.add_peer(peer.engine_id.clone(), peer.addr);
                }
            }
            // `attach_tcp_transport` already advertised once; this call runs
            // again now that the peers are registered.
            self.advertise_existing_actors(&transport)
                .map_err(ClusterError::Transport)?;
            BoundedTransportHandle::Tcp(transport)
        }
        BoundedTransportConfig::Quic(config) => {
            let transport = self
                .attach_quic_transport(config, HashMap::new())
                .await
                .map_err(ClusterError::Transport)?;
            for peer in &peers {
                if let Some(server_name) = &peer.server_name {
                    transport.add_peer_with_server_name(
                        peer.engine_id.clone(),
                        peer.addr,
                        server_name.clone(),
                    );
                } else {
                    transport.add_peer(peer.engine_id.clone(), peer.addr);
                }
            }
            // `attach_quic_transport` already advertised once; this call runs
            // again now that the peers are registered.
            self.advertise_existing_actors(&transport)
                .map_err(ClusterError::Transport)?;
            BoundedTransportHandle::Quic(transport)
        }
    };

    // Keep a copy of the engine id: the original is moved into the handle.
    let cluster_engine_id = local_engine_id.clone();
    let mut cluster = BoundedClusterHandle::new(
        self.clone(),
        local_engine_id,
        control_plane,
        transport,
        peers,
        roles,
        self.registry.clone(),
    );
    register_spawn_peer_installer(&cluster_engine_id, cluster.transport());
    // Declare every configured remote actor; the first failure aborts.
    for declared in declared_actors {
        cluster.declare_remote_actor(declared.owner, declared.path)?;
    }
    cluster.advertise_existing_local_routes()?;
    Ok(cluster)
}
431
432 pub fn remote_addr_for_path<M: RemoteMessage>(
437 &self,
438 path: &ActorPath,
439 ) -> palladium_actor::Addr<M>
440 where
441 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
442 R: Clone + Send + Sync,
443 {
444 self.remote_addr_for::<M>(AddrHash::new(path, 0))
445 }
446
447 pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
455 where
456 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
457 R: Clone + Send + Sync,
458 {
459 StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
460 }
461
/// Returns the actors that the registry snapshot reports for `query`.
// NOTE(review): the meaning of the second `snapshot` argument (`0`) is not
// visible in this file — confirm against `ActorRegistry::snapshot`.
pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
    self.registry.snapshot(&query, 0)
}
468
/// Returns the registry's information for the actor at `path`, or `None` when
/// the registry has nothing to report for it.
// NOTE(review): the meaning of the second argument (`0`) is not visible in
// this file — confirm against `ActorRegistry::actor_info_by_path`.
pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
    self.registry.actor_info_by_path(path, 0)
}
473
474 pub fn snapshot(&self) -> EngineSnapshot {
476 let actors = self.actor_list(ActorQuery::default());
477 let uptime_secs = self.start_time.elapsed().as_secs();
478 EngineSnapshot {
479 num_cores: 1,
480 uptime_secs,
481 actors,
482 }
483 }
484
485 pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
487 self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
488 self.federated_routing
489 .as_ref()
490 .and_then(|routing| routing.resolve_optional(path))
491 })
492 }
493
494 pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
496 if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
497 return Ok(local);
498 }
499 if let Some(routing) = &self.federated_routing {
500 return routing.resolve(path);
501 }
502 Err(SendError::Unroutable)
503 }
504
505 pub fn stop_actor(&self, path: &ActorPath) -> bool {
511 if let Some(slot) = self.registry.get_by_path(path) {
512 slot.ctrl_tx
513 .send(LifecycleSignal::Stop(StopReason::Supervisor))
514 .ok();
515 true
516 } else {
517 false
518 }
519 }
520
521 pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
527 let addr = self.registry.get_by_path(path).map(|s| s.addr);
528 let Some(addr) = addr else { return 0 };
529 let src = AddrHash::synthetic(b"fault-fill-mailbox");
530 let mut count = 0;
531 loop {
532 let env = Envelope::new(src, addr, 0, 0);
533 match self.transport.try_deliver(env, MessagePayload::local(())) {
534 Ok(()) => count += 1,
535 Err(_) => break,
536 }
537 }
538 count
539 }
540
541 pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
543 let user_path = ActorPath::parse("/user").unwrap();
544 if let Some(slot) = self.registry.get_by_path(&user_path) {
545 slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
546 }
547 }
548
549 pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
567 where
568 M: Message,
569 R: Clone + Send + Sync + 'static,
570 G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
571 {
572 let user_path = ActorPath::parse("/user").unwrap();
573 let handle = self.clone();
574 crate::worker_pool::spawn_worker_pool(
575 &user_path,
576 config,
577 factory,
578 self.source_addr,
579 &self.transport,
580 &self.transport_registry,
581 &self.responses,
582 &self.reactor,
583 move |spec| handle.spawn_user_actor(spec),
584 )
585 }
586}