1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::fs::{FileSystem, TokioFileSystem};
7use dashmap::DashMap;
8use palladium_actor::{
9 Actor, ActorPath, AddrHash, AskError, ChildSpec, EngineId, Envelope, Message, MessagePayload,
10 PoolConfig, RemoteMessage, SendError, StableAddr, StopReason, WorkerPool,
11};
12use palladium_transport::network::{Network, TokioNetwork};
13use palladium_transport::{
14 InProcessTransport, MailboxMessage, MailboxReceiver, MailboxSender, QuicTransport,
15 QuicTransportConfig, TcpTransport, TcpTransportConfig, TransportError, TransportRegistry,
16 TypeRegistry,
17};
18
19use crate::addr::{make_ask_fn, make_remote_ask_fn, make_remote_send_fn, make_send_fn};
20use crate::bounded_cluster::{
21 validate_config, BoundedClusterConfig, BoundedClusterHandle, BoundedTransportConfig,
22 BoundedTransportHandle, ClusterError, ControlPlaneClientConfig, ControlPlaneProtocol,
23};
24use crate::common::LifecycleSignal;
25use crate::introspection::{ActorInfo, ActorQuery, EngineSnapshot};
26use crate::reactor::{Reactor, TokioReactor};
27use crate::registry::ActorRegistry;
28use crate::responses::ResponseRegistry;
29
30#[derive(Clone)]
32pub struct EngineHandle<
33 R: Reactor = TokioReactor,
34 N: Network = TokioNetwork,
35 F: FileSystem = TokioFileSystem,
36> {
37 pub(crate) transport: Arc<InProcessTransport>,
38 pub(crate) transport_registry: Arc<TransportRegistry>,
39 pub(crate) type_registry: TypeRegistry,
40 pub(crate) responses: Arc<ResponseRegistry>,
41 pub(crate) registry: Arc<ActorRegistry<R>>,
42 pub(crate) ask_timeout: Duration,
43 pub(crate) reactor: R,
44 pub(crate) network: N,
45 pub(crate) _fs: F,
46 pub(crate) start_time: Instant,
47 pub(crate) source_addr: AddrHash,
48 pub(crate) federated_routing: Option<Arc<crate::federation::FederatedRouting>>,
49 pub(crate) send_cache: Arc<DashMap<AddrHash, MailboxSender>>,
56 pub(crate) pump_rx: Arc<std::sync::Mutex<Option<MailboxReceiver>>>,
59}
60
61impl<R: Reactor, N: Network, F: FileSystem> EngineHandle<R, N, F> {
62 fn advertise_existing_actors<T: palladium_transport::Transport>(
63 &self,
64 transport: &Arc<T>,
65 ) -> Result<(), TransportError> {
66 for info in self.registry.snapshot(&ActorQuery::default(), 0) {
67 if info.state != crate::introspection::ActorState::Running {
68 continue;
69 }
70 if let Some(slot) = self.registry.get_by_path(&info.path) {
71 transport.advertise_path(slot.addr, &info.path)?;
72 }
73 }
74 Ok(())
75 }
76
77 pub fn type_registry(&self) -> TypeRegistry {
78 self.type_registry.clone()
79 }
80
81 pub fn send_to<M: Message>(&self, target: AddrHash, msg: M) -> Result<(), SendError> {
87 let envelope = Envelope::new(self.source_addr, target, M::TYPE_TAG, 0);
88 let payload = MessagePayload::local(msg);
89
90 if let Some(sender) = self.send_cache.get(&target) {
92 return match sender.try_send(MailboxMessage { envelope, payload }) {
93 Ok(()) => Ok(()),
94 Err(TransportError::MailboxFull) => Err(SendError::MailboxFull),
95 Err(_) => {
96 drop(sender); self.send_cache.remove(&target);
99 Err(SendError::ActorStopped)
100 }
101 };
102 }
103
104 if let Some(sender) = self.transport_registry.get_local_sender(target) {
106 let result = sender.try_send(MailboxMessage { envelope, payload });
107 match result {
108 Ok(()) => {
109 self.send_cache.insert(target, sender);
110 return Ok(());
111 }
112 Err(TransportError::MailboxFull) => {
113 self.send_cache.insert(target, sender);
115 return Err(SendError::MailboxFull);
116 }
117 Err(_) => return Err(SendError::ActorStopped),
118 }
119 }
120
121 self.transport_registry
123 .try_route(envelope, payload)
124 .map_err(|e| match e {
125 TransportError::MailboxFull => SendError::MailboxFull,
126 TransportError::ConnectionFailed => SendError::ConnectionFailed,
127 TransportError::Unroutable(_) => SendError::Unroutable,
128 TransportError::SerializationRequired
129 | TransportError::SerializationError(_)
130 | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
131 _ => SendError::ActorStopped,
132 })
133 }
134
135 pub fn send_many<M: Message + Clone>(
143 &self,
144 target: AddrHash,
145 msg: M,
146 n: usize,
147 ) -> Result<usize, SendError> {
148 if n == 0 {
149 return Ok(0);
150 }
151
152 let make_batch = || {
153 let mut batch = Vec::with_capacity(n);
154 for _ in 0..n {
155 batch.push(MailboxMessage {
156 envelope: Envelope::new(self.source_addr, target, M::TYPE_TAG, 0),
157 payload: MessagePayload::local(msg.clone()),
158 });
159 }
160 batch
161 };
162
163 if let Some(sender) = self.send_cache.get(&target) {
165 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
166 return match err {
167 None => Ok(sent),
168 Some(TransportError::MailboxFull) => Ok(sent),
169 _ => {
170 drop(sender);
171 self.send_cache.remove(&target);
172 Err(SendError::ActorStopped)
173 }
174 };
175 }
176
177 if let Some(sender) = self.transport_registry.get_local_sender(target) {
179 let (sent, err, _remaining) = sender.try_send_batch_owned(make_batch());
180 match err {
181 None => {
182 self.send_cache.insert(target, sender);
183 return Ok(sent);
184 }
185 Some(TransportError::MailboxFull) => {
186 self.send_cache.insert(target, sender);
187 return Ok(sent);
188 }
189 _ => return Err(SendError::ActorStopped),
190 }
191 }
192
193 let mut sent = 0usize;
195 for _ in 0..n {
196 match self.send_to::<M>(target, msg.clone()) {
197 Ok(()) => sent += 1,
198 Err(SendError::MailboxFull) => return Ok(sent),
199 Err(e) => return Err(e),
200 }
201 }
202 Ok(sent)
203 }
204
205 pub fn addr_for<M: Message>(&self, target: AddrHash) -> palladium_actor::Addr<M>
207 where
208 R: Send + Sync,
209 {
210 let send_fn = make_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
211 let ask_fn = make_ask_fn::<M, R>(
212 target,
213 self.source_addr,
214 self.transport.clone(),
215 self.transport_registry.clone(),
216 self.responses.clone(),
217 self.ask_timeout,
218 self.reactor.clone(),
219 );
220 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
221 }
222
223 pub fn remote_addr<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M> {
228 let registry = self.transport_registry.clone();
229 let source = self.source_addr;
230 let send_fn = Arc::new(move |msg: M| -> Result<(), SendError> {
231 if registry.is_local(target) {
232 return Err(SendError::PolicyViolation);
233 }
234 let bytes = bincode::serialize(&msg).map_err(|_| SendError::SerializationFailed)?;
235 let envelope = Envelope::new(source, target, M::TYPE_TAG, bytes.len() as u32);
236 registry
237 .try_route(envelope, MessagePayload::serialized(bytes))
238 .map_err(|e| match e {
239 TransportError::MailboxFull => SendError::MailboxFull,
240 TransportError::ConnectionFailed => SendError::ConnectionFailed,
241 TransportError::Unroutable(_) => SendError::Unroutable,
242 TransportError::SerializationRequired
243 | TransportError::SerializationError(_)
244 | TransportError::UnknownTypeTag(_) => SendError::SerializationFailed,
245 _ => SendError::ActorStopped,
246 })
247 });
248 let ask_fn: palladium_actor::AskFn<M> =
249 Arc::new(|_msg: M| -> palladium_actor::AskFuture<M> {
250 Box::pin(async { Err(AskError::Unbound) })
251 });
252 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
253 }
254
255 pub fn remote_addr_for<M: RemoteMessage>(&self, target: AddrHash) -> palladium_actor::Addr<M>
269 where
270 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
271 R: Clone + Send + Sync,
272 {
273 let send_fn =
274 make_remote_send_fn::<M>(target, self.source_addr, self.transport_registry.clone());
275 let ask_fn = make_remote_ask_fn::<M, R>(
276 target,
277 self.source_addr,
278 self.transport_registry.clone(),
279 self.responses.clone(),
280 self.ask_timeout,
281 self.reactor.clone(),
282 Arc::clone(&self.pump_rx),
283 );
284 palladium_actor::Addr::with_handlers(target, send_fn, ask_fn)
285 }
286
287 pub fn transport(&self) -> &Arc<InProcessTransport> {
288 &self.transport
289 }
290
291 pub fn transport_registry(&self) -> &Arc<TransportRegistry> {
292 &self.transport_registry
293 }
294
295 pub async fn attach_quic_transport(
296 &self,
297 config: QuicTransportConfig,
298 peers: HashMap<EngineId, SocketAddr>,
299 ) -> Result<Arc<QuicTransport<R, N>>, TransportError>
300 where
301 R: Clone,
302 N: Clone,
303 {
304 let transport = QuicTransport::bind(
305 config,
306 peers,
307 self.transport_registry.clone(),
308 self.type_registry.clone(),
309 self.reactor.clone(),
310 self.network.clone(),
311 )
312 .await?;
313 self.transport_registry.add_transport(transport.clone())?;
314 self.advertise_existing_actors(&transport)?;
315 Ok(transport)
316 }
317
318 pub async fn attach_tcp_transport(
319 &self,
320 config: TcpTransportConfig,
321 peers: HashMap<EngineId, SocketAddr>,
322 ) -> Result<Arc<TcpTransport<R, N>>, TransportError>
323 where
324 R: Clone,
325 N: Clone,
326 {
327 let transport = TcpTransport::bind(
328 config,
329 peers,
330 self.transport_registry.clone(),
331 self.type_registry.clone(),
332 self.reactor.clone(),
333 self.network.clone(),
334 )
335 .await?;
336 self.transport_registry.add_transport(transport.clone())?;
337 self.advertise_existing_actors(&transport)?;
338 Ok(transport)
339 }
340
341 pub async fn attach_bounded_cluster(
342 &self,
343 config: BoundedClusterConfig,
344 ) -> Result<BoundedClusterHandle<R, N, F>, ClusterError>
345 where
346 R: Clone,
347 N: Clone,
348 {
349 validate_config(&config, &self.registry)?;
350
351 let local_engine_id = config.transport.engine_id().clone();
352 let peers = config.peers;
353 let declared_actors = config.declared_actors;
354 let roles = config.roles;
355
356 let control_plane = match &config.transport {
357 BoundedTransportConfig::Tcp(config) => ControlPlaneClientConfig {
358 protocol: ControlPlaneProtocol::Tcp,
359 tls: config.tls.clone(),
360 wait_timeout: self.ask_timeout,
361 },
362 BoundedTransportConfig::Quic(config) => ControlPlaneClientConfig {
363 protocol: ControlPlaneProtocol::Quic,
364 tls: config.tls.clone(),
365 wait_timeout: self.ask_timeout,
366 },
367 };
368
369 let transport = match config.transport {
370 BoundedTransportConfig::Tcp(config) => {
371 let transport = self
372 .attach_tcp_transport(config, HashMap::new())
373 .await
374 .map_err(ClusterError::Transport)?;
375 for peer in &peers {
376 if let Some(server_name) = &peer.server_name {
377 transport.add_peer_with_server_name(
378 peer.engine_id.clone(),
379 peer.addr,
380 server_name.clone(),
381 );
382 } else {
383 transport.add_peer(peer.engine_id.clone(), peer.addr);
384 }
385 }
386 self.advertise_existing_actors(&transport)
387 .map_err(ClusterError::Transport)?;
388 BoundedTransportHandle::Tcp(transport)
389 }
390 BoundedTransportConfig::Quic(config) => {
391 let transport = self
392 .attach_quic_transport(config, HashMap::new())
393 .await
394 .map_err(ClusterError::Transport)?;
395 for peer in &peers {
396 if let Some(server_name) = &peer.server_name {
397 transport.add_peer_with_server_name(
398 peer.engine_id.clone(),
399 peer.addr,
400 server_name.clone(),
401 );
402 } else {
403 transport.add_peer(peer.engine_id.clone(), peer.addr);
404 }
405 }
406 self.advertise_existing_actors(&transport)
407 .map_err(ClusterError::Transport)?;
408 BoundedTransportHandle::Quic(transport)
409 }
410 };
411
412 let mut cluster = BoundedClusterHandle::new(
413 self.clone(),
414 local_engine_id,
415 control_plane,
416 transport,
417 peers,
418 roles,
419 self.registry.clone(),
420 );
421 for declared in declared_actors {
422 cluster.declare_remote_actor(declared.owner, declared.path)?;
423 }
424 Ok(cluster)
425 }
426
427 pub fn remote_addr_for_path<M: RemoteMessage>(
432 &self,
433 path: &ActorPath,
434 ) -> palladium_actor::Addr<M>
435 where
436 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
437 R: Clone + Send + Sync,
438 {
439 self.remote_addr_for::<M>(AddrHash::new(path, 0))
440 }
441
442 pub fn remote_stable_addr_for_path<M: RemoteMessage>(&self, path: &ActorPath) -> StableAddr<M>
450 where
451 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
452 R: Clone + Send + Sync,
453 {
454 StableAddr::from_addr(self.remote_addr_for_path::<M>(path))
455 }
456
457 pub fn actor_list(&self, query: ActorQuery) -> Vec<ActorInfo> {
461 self.registry.snapshot(&query, 0)
462 }
463
464 pub fn actor_info(&self, path: &ActorPath) -> Option<ActorInfo> {
466 self.registry.actor_info_by_path(path, 0)
467 }
468
469 pub fn snapshot(&self) -> EngineSnapshot {
471 let actors = self.actor_list(ActorQuery::default());
472 let uptime_secs = self.start_time.elapsed().as_secs();
473 EngineSnapshot {
474 num_cores: 1,
475 uptime_secs,
476 actors,
477 }
478 }
479
480 pub fn lookup_path(&self, path: &ActorPath) -> Option<AddrHash> {
482 self.registry.get_by_path(path).map(|s| s.addr).or_else(|| {
483 self.federated_routing
484 .as_ref()
485 .and_then(|routing| routing.resolve_optional(path))
486 })
487 }
488
489 pub fn resolve_path(&self, path: &ActorPath) -> Result<AddrHash, SendError> {
491 if let Some(local) = self.registry.get_by_path(path).map(|s| s.addr) {
492 return Ok(local);
493 }
494 if let Some(routing) = &self.federated_routing {
495 return routing.resolve(path);
496 }
497 Err(SendError::Unroutable)
498 }
499
500 pub fn stop_actor(&self, path: &ActorPath) -> bool {
506 if let Some(slot) = self.registry.get_by_path(path) {
507 slot.ctrl_tx
508 .send(LifecycleSignal::Stop(StopReason::Supervisor))
509 .ok();
510 true
511 } else {
512 false
513 }
514 }
515
516 pub fn fill_mailbox(&self, path: &ActorPath) -> usize {
522 let addr = self.registry.get_by_path(path).map(|s| s.addr);
523 let Some(addr) = addr else { return 0 };
524 let src = AddrHash::synthetic(b"fault-fill-mailbox");
525 let mut count = 0;
526 loop {
527 let env = Envelope::new(src, addr, 0, 0);
528 match self.transport.try_deliver(env, MessagePayload::local(())) {
529 Ok(()) => count += 1,
530 Err(_) => break,
531 }
532 }
533 count
534 }
535
536 pub fn spawn_user_actor(&self, spec: ChildSpec<R>) {
538 let user_path = ActorPath::parse("/user").unwrap();
539 if let Some(slot) = self.registry.get_by_path(&user_path) {
540 slot.ctrl_tx.send(LifecycleSignal::SpawnChild(spec)).ok();
541 }
542 }
543
544 pub fn spawn_worker_pool<M, G>(&self, config: &PoolConfig, factory: G) -> WorkerPool<M>
562 where
563 M: Message,
564 R: Clone + Send + Sync + 'static,
565 G: Fn() -> Box<dyn Actor<R>> + Send + Sync + Clone + 'static,
566 {
567 let user_path = ActorPath::parse("/user").unwrap();
568 let handle = self.clone();
569 crate::worker_pool::spawn_worker_pool(
570 &user_path,
571 config,
572 factory,
573 self.source_addr,
574 &self.transport,
575 &self.transport_registry,
576 &self.responses,
577 &self.reactor,
578 move |spec| handle.spawn_user_actor(spec),
579 )
580 }
581}