1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9 Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10 PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14 InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15 QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16 TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21 remote_spawn_wait_timeout, validate_config, BoundedClusterConfig, BoundedClusterHandle,
22 BoundedTransportConfig, BoundedTransportHandle, ClusterError, ControlPlaneClientConfig,
23 ControlPlaneProtocol,
24};
25use crate::common::LifecycleSignal;
26use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
27use crate::reactor::{Reactor, TokioReactor};
28use crate::registry::ActorRegistry;
29use crate::responses::ResponseRegistry;
30
/// Cloneable handle onto an engine.
///
/// Through this handle callers can send messages to local and remote actors,
/// build typed addresses, attach network transports (TCP / QUIC / bounded
/// cluster), introspect the actor registry, and drive actor lifecycle
/// (stop, spawn, worker pools).
///
/// Generic over the reactor `R`, network backend `N` and file-system backend
/// `F`, defaulting to the Tokio-backed implementations.
#[derive(Clone)]
pub struct EngineHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    /// In-process transport; used directly for local delivery in `fill_mailbox`
    /// and handed to the ask / worker-pool helpers.
    pub(crate) transport: Arc<InProcessTransport>,
    /// Registry of attached transports; resolves local mailbox senders
    /// (`get_local_sender`) and routes envelopes (`try_route`).
    pub(crate) transport_registry: Arc<TransportRegistry>,
    /// Message type registry, cloned into TCP/QUIC transports when they are bound.
    pub(crate) type_registry: TypeRegistry,
    /// Registry of in-flight ask responses, shared with the ask helpers.
    pub(crate) responses: Arc<ResponseRegistry>,
    /// Registry of actors on this engine, queried by path and snapshotted for
    /// introspection.
    pub(crate) registry: Arc<ActorRegistry<R>>,
    /// Timeout given to asks built through this handle; also used to derive the
    /// remote-spawn wait timeout for bounded clusters.
    pub(crate) ask_timeout: Duration,
    /// Reactor passed to ask helpers, worker pools and network transports.
    pub(crate) reactor: R,
    /// Network backend used when binding TCP / QUIC transports.
    pub(crate) network: N,
    /// File-system backend. Not read by any method visible in this file
    /// (hence the leading underscore).
    /// NOTE(review): presumably kept so `F` stays part of the handle's type — confirm.
    pub(crate) _fs: F,
    /// Creation instant; `snapshot` reports `uptime_secs` relative to it.
    pub(crate) start_time: Instant,
    /// Source address stamped on every envelope sent through this handle.
    pub(crate) source_addr: AddrHash,
    /// Optional federated routing consulted when a path is not registered locally.
    pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
    /// Cache of local mailbox senders keyed by target address. Filled by
    /// `send_to` / `send_many` and evicted when a cached sender reports the
    /// actor is gone.
    pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
    /// Shared, optional mailbox receiver handed to `make_remote_ask_fn`.
    /// NOTE(review): how and when the receiver is taken is defined by that
    /// helper — confirm there.
    pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
}
61
62impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
63 fn advertise_existing_actors<T: palladium_transport::Transport>(
64 &self,
65 transport: &Arc<T>,
66 ) -> Result<(), TransportError> {
67 for info in self.registry.snapshot(&ActorQuery::default(), 0) {
68 if info.state != crate::introspection::ActorState::Running {
69 continue;
70 }
71 if let Some(slot) = self.registry.get_by_path(&info.path) {
72 transport.advertise_path(slot.addr, &info.path)?;
73 }
74 }
75 Ok(())
76 }
77
78 pub fn type_registry(&self) -> TypeRegistry {
79 self.type_registry.clone()
80 }
81
82 pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
88 let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
89 let payload = MessagePayload::local(msg);
90
91 if let Some(sender) = self.send_cache.get(&target) {
93 return match sender.try_send(MailboxMessage { envelope, payload }) {
94 Ok(()) => Ok(()),
95 Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
96 Err(_) => {
97 drop(sender); self.send_cache.remove(&target);
100 Err(SendError::ActorStopped)
101 }
102 };
103 }
104
105 if let Some(sender) = self.transport_registry.get_local_sender(target) {
107 let result = sender.try_send(MailboxMessage { envelope, payload });
108 match result {
109 Ok(()) => {
110 self.send_cache.insert(target, sender);
111 return Ok(());
112 }
113 Err(TransportError::MailboxFull) => {
114 self.send_cache.insert(target, sender);
116 return Err(SendError::MailboxFull);
117 }
118 Err(_) => return Err(SendError::ActorStopped),
119 }
120 }
121
122 self.transport_registry
124 .try_route(envelope, payload)
125 .map_err(|e| match e {
126 TransportError::MailboxFull => SendError::MailboxFull,
127 TransportError::ConnectionFailed => SendError::ConnectionFailed,
128 TransportError::Unroutable(_) => SendError::Unroutable,
129 TransportError::SerializationRequired
130 | TransportError::SerializationError(_)
131 | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
132 _ => SendError::ActorStopped,
133 })
134 }
135
136 pub fn send_many<M: Message + Clone>(
144 &self,
145 target: AddrHash,
146 msg: M,
147 n: usize,
148 ) -> Result<usize, SendError> {
149 if n == 0 {
150 return Ok(0);
151 }
152
153 let make_batch = || {
154 let mut batch = Vec::with_capacity(n);
155 for _ in 0..n {
156 batch.push(MailboxMessage {
157 envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
158 payload: MessagePayload::local(msg.clone()),
159 });
160 }
161 batch
162 };
163
164 if let Some(sender) = self.send_cache.get(&target) {
166 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
167 return match err {
168 None => Ok(sent),
169 Some(TransportError::MailboxFull) => Ok(sent),
170 _ => {
171 drop(sender);
172 self.send_cache.remove(&target);
173 Err(SendError::ActorStopped)
174 }
175 };
176 }
177
178 if let Some(sender) = self.transport_registry.get_local_sender(target) {
180 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
181 match err {
182 None => {
183 self.send_cache.insert(target, sender);
184 return Ok(sent);
185 }
186 Some(TransportError::MailboxFull) => {
187 self.send_cache.insert(target, sender);
188 return Ok(sent);
189 }
190 _ => return Err(SendError::ActorStopped),
191 }
192 }
193
194 let mut sent = 0usize;
196 for _ in 0..n {
197 match self.send_to::<M>(target, msg.clone()) {
198 Ok(()) => sent += 1,
199 Err(SendError::MailboxFull) => return Ok(sent),
200 Err(e) => return Err(e),
201 }
202 }
203 Ok(sent)
204 }
205
206 pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
208 where
209 R: Send + Sync,
210 {
211 let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
212 let ask_fn = make_ask_fn::<M, R>(
213 target,
214 self.source_addr,
215 self.transport.clone(),
216 self.transport_registry.clone(),
217 self.responses.clone(),
218 self.ask_timeout,
219 self.reactor.clone(),
220 );
221 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
222 }
223
224 pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
229 let registry = self.transport_registry.clone();
230 let source = self.source_addr;
231 let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
232 if registry.is_local(target) {
233 return Err(SendError::PolicyViolation);
234 }
235 let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
236 let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
237 registry
238 .try_route(envelope, MessagePayload::serialized(bytes))
239 .map_err(|e| match e {
240 TransportError::MailboxFull => SendError::MailboxFull,
241 TransportError::ConnectionFailed => SendError::ConnectionFailed,
242 TransportError::Unroutable(_) => SendError::Unroutable,
243 TransportError::SerializationRequired
244 | TransportError::SerializationError(_)
245 | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
246 _ => SendError::ActorStopped,
247 })
248 });
249 let ask_fn: palladium_actor::AskFn<M> =
250 Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
251 Box::pin(async { Err(AskError::Unbound) })
252 });
253 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
254 }
255
256 pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
270 where
271 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
272 R: Clone + Send + Sync,
273 {
274 let send_fn =
275 make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
276 let ask_fn = make_remote_ask_fn::<M, R>(
277 target,
278 self.source_addr,
279 self.transport_registry.clone(),
280 self.responses.clone(),
281 self.ask_timeout,
282 self.reactor.clone(),
283 Arc::clone(&self.pump_rx),
284 );
285 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
286 }
287
288 pub fn transport(&self) -> &Arc<InProcessTransport> {
289 &self.transport
290 }
291
292 pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
293 &self.transport_registry
294 }
295
296 pub async fn attach_quic_transport(
297 &self,
298 config: QuicTransportConfig,
299 peers: HashMap<EngineId, SocketAddr>,
300 ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
301 where
302 R: Clone,
303 N: Clone,
304 {
305 let transport = QuicTransport::bind(
306 config,
307 peers,
308 self.transport_registry.clone(),
309 self.type_registry.clone(),
310 self.reactor.clone(),
311 self.network.clone(),
312 )
313 .await?;
314 self.transport_registry.add_transport(transport.clone())?;
315 self.advertise_existing_actors(&transport)?;
316 Ok(transport)
317 }
318
319 pub async fn attach_tcp_transport(
320 &self,
321 config: TcpTransportConfig,
322 peers: HashMap<EngineId, SocketAddr>,
323 ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
324 where
325 R: Clone,
326 N: Clone,
327 {
328 let transport = TcpTransport::bind(
329 config,
330 peers,
331 self.transport_registry.clone(),
332 self.type_registry.clone(),
333 self.reactor.clone(),
334 self.network.clone(),
335 )
336 .await?;
337 self.transport_registry.add_transport(transport.clone())?;
338 self.advertise_existing_actors(&transport)?;
339 Ok(transport)
340 }
341
342 pub async fn attach_bounded_cluster(
343 &self,
344 config: BoundedClusterConfig,
345 ) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
346 where
347 R: Clone,
348 N: Clone,
349 {
350 validate_config(&config, &self.registry)?;
351
352 let local_engine_id = config.transport.engine_id().clone();
353 let peers = config.peers;
354 let declared_actors = config.declared_actors;
355 let roles = config.roles;
356
357 let control_plane = match &config.transport {
358 BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
359 protocol: ControlPlaneProtocol::Tcp,
360 tls: config.tls.clone(),
361 wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
362 },
363 BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
364 protocol: ControlPlaneProtocol::Quic,
365 tls: config.tls.clone(),
366 wait_timeout: remote_spawn_wait_timeout(self.ask_timeout),
367 },
368 };
369
370 let transport = match config.transport {
371 BoundedTransportConfig::Tcp(config) => {
372 let transport = self
373 .attach_tcp_transport(config, HashMap::new())
374 .await
375 .map_err(ClusterError::Transport)?;
376 for peer in &peers {
377 if let Some(server_name) = &peer.server_name {
378 transport.add_peer_with_server_name(
379 peer.engine_id.clone(),
380 peer.addr,
381 server_name.clone(),
382 );
383 } else {
384 transport.add_peer(peer.engine_id.clone(), peer.addr);
385 }
386 }
387 self.advertise_existing_actors(&transport)
388 .map_err(ClusterError::Transport)?;
389 BoundedTransportHandle::Tcp(transport)
390 }
391 BoundedTransportConfig::Quic(config) => {
392 let transport = self
393 .attach_quic_transport(config, HashMap::new())
394 .await
395 .map_err(ClusterError::Transport)?;
396 for peer in &peers {
397 if let Some(server_name) = &peer.server_name {
398 transport.add_peer_with_server_name(
399 peer.engine_id.clone(),
400 peer.addr,
401 server_name.clone(),
402 );
403 } else {
404 transport.add_peer(peer.engine_id.clone(), peer.addr);
405 }
406 }
407 self.advertise_existing_actors(&transport)
408 .map_err(ClusterError::Transport)?;
409 BoundedTransportHandle::Quic(transport)
410 }
411 };
412
413 let mut cluster = BoundedClusterHandle::new(
414 self.clone(),
415 local_engine_id,
416 control_plane,
417 transport,
418 peers,
419 roles,
420 self.registry.clone(),
421 );
422 for declared in declared_actors {
423 cluster.declare_remote_actor(declared.owner, declared.path)?;
424 }
425 Ok(cluster)
426 }
427
428 pub fn remote_addr_for_path<M: RemoteMessage>(
433 &self,
434 path: &ActorPath,
435 ) -> palladium_actor::Addr<M>
436 where
437 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
438 R: Clone + Send + Sync,
439 {
440 self.remote_addr_for::<M>(AddrHash::new(path, 0))
441 }
442
443 pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
451 where
452 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
453 R: Clone + Send + Sync,
454 {
455 StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
456 }
457
458 pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
462 self.registry.snapshot(&query, 0)
463 }
464
465 pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
467 self.registry.actor_info_by_path(path, 0)
468 }
469
470 pub fn snapshot(&self) -> EngineSnapshot {
472 let actors = self.actor_list(ActorQuery::default());
473 let uptime_secs = self.start_time.elapsed().as_secs();
474 EngineSnapshot {
475 num_cores: 1,
476 uptime_secs,
477 actors,
478 }
479 }
480
481 pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
483 self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
484 self.federated_routing
485 .as_ref()
486 .and_then(|routing| routing.resolve_optional(path))
487 })
488 }
489
490 pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
492 if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
493 return Ok(local);
494 }
495 if let Some(routing) = &self.federated_routing {
496 return routing.resolve(path);
497 }
498 Err(SendError::Unroutable)
499 }
500
501 pub fn stop_actor(&self, path: &ActorPath) -> bool {
507 if let Some(slot) = self.registry.get_by_path(path) {
508 slot.ctrl_tx
509 .send(LifecycleSignal::Stop(StopReason::Supervisor))
510 .ok();
511 true
512 } else {
513 false
514 }
515 }
516
517 pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
523 let addr = self.registry.get_by_path(path).map(|s| s.addr);
524 let Some(addr) = addr else { return 0 };
525 let src = AddrHash::synthetic(b"fault-fill-mailbox");
526 let mut count = 0;
527 loop {
528 let env = Envelope::new(src, addr, 0, 0);
529 match self.transport.try_deliver(env, MessagePayload::local(())) {
530 Ok(()) => count += 1,
531 Err(_) => break,
532 }
533 }
534 count
535 }
536
537 pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
539 let user_path = ActorPath::parse("/user").unwrap();
540 if let Some(slot) = self.registry.get_by_path(&user_path) {
541 slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
542 }
543 }
544
545 pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
563 where
564 M: Message,
565 R: Clone + Send + Sync + 'static,
566 G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
567 {
568 let user_path = ActorPath::parse("/user").unwrap();
569 let handle = self.clone();
570 crate::worker_pool::spawn_worker_pool(
571 &user_path,
572 config,
573 factory,
574 self.source_addr,
575 &self.transport,
576 &self.transport_registry,
577 &self.responses,
578 &self.reactor,
579 move |spec| handle.spawn_user_actor(spec),
580 )
581 }
582}