1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use dashmap::DashMap;
10use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
11use palladium_transport::network::{Network, TokioNetwork};
12use palladium_transport::{
13 mailbox, QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport,
14 TransportError,
15};
16use rustls::pki_types::ServerName;
17use serde_json::{json, Value};
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio_rustls::TlsConnector;
20
/// Installs a spawn *caller* engine as a peer on a spawn *host* engine's
/// transport, so the host can route replies back to the engine that
/// requested the spawn. Implemented by `BoundedTransportHandle` and stored
/// type-erased in the global installer map (see `spawn_peer_installers`).
trait SpawnPeerInstaller: Send + Sync {
    /// Idempotently register `engine_id` as a peer reachable at `addr`,
    /// using `server_name` as the TLS SNI override when provided.
    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>);
}
24
25impl<R: Reactor + Clone, N: Network + Clone> SpawnPeerInstaller for BoundedTransportHandle<R, N> {
26 fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>) {
27 let source_addr = AddrHash::synthetic(engine_id.as_str().as_bytes());
28 match server_name {
29 Some(server_name) => self.add_peer(&PeerSpec {
30 engine_id: engine_id.clone(),
31 addr,
32 control_plane_addr: None,
33 server_name: Some(server_name),
34 }),
35 None => self.add_peer(&PeerSpec {
36 engine_id: engine_id.clone(),
37 addr,
38 control_plane_addr: None,
39 server_name: None,
40 }),
41 }
42 match self {
43 Self::Tcp(transport) => {
44 let _ = transport.register_remote_source_addr(engine_id, source_addr);
45 }
46 Self::Quic(transport) => {
47 let _ = transport.register_remote_source_addr(engine_id, source_addr);
48 }
49 }
50 }
51}
52
53fn spawn_peer_installers() -> &'static DashMap<String, Arc<dyn SpawnPeerInstaller>> {
54 static INSTALLERS: std::sync::OnceLock<DashMap<String, Arc<dyn SpawnPeerInstaller>>> =
55 std::sync::OnceLock::new();
56 INSTALLERS.get_or_init(DashMap::new)
57}
58
59pub(crate) fn register_spawn_peer_installer<R: Reactor + Clone, N: Network + Clone>(
60 engine_id: &EngineId,
61 transport: &BoundedTransportHandle<R, N>,
62) {
63 spawn_peer_installers().insert(
64 engine_id.as_str().to_string(),
65 Arc::new(transport.clone()) as Arc<dyn SpawnPeerInstaller>,
66 );
67}
68
69pub(crate) fn maybe_register_spawn_caller_peer(
70 host_engine_id: &EngineId,
71 caller_engine_id: &EngineId,
72 caller_transport_addr: SocketAddr,
73 caller_server_name: Option<String>,
74) {
75 if let Some(installer) = spawn_peer_installers().get(host_engine_id.as_str()) {
76 installer.ensure_peer(
77 caller_engine_id.clone(),
78 caller_transport_addr,
79 caller_server_name,
80 );
81 }
82}
83
/// Static configuration for a bounded cluster: the local transport plus the
/// declared topology (peers, remote actors, roles, placement policies).
#[derive(Clone)]
pub struct BoundedClusterConfig {
    // Local engine transport (TCP or QUIC) carrying the engine identity.
    pub transport: BoundedTransportConfig,
    // Remote engines this node may talk to.
    pub peers: Vec<PeerSpec>,
    // Actors known at startup to live on remote engines.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    // Named roles, each owned by a peer engine.
    pub roles: Vec<BoundedClusterRole>,
    // Mapping from actor class to the role that should host it.
    pub placement_policies: Vec<BoundedPlacementPolicy>,
}
92
impl BoundedClusterConfig {
    /// Check topology invariants (unique peers/actors/roles/policies with
    /// valid ownership) without consulting a live actor registry.
    pub fn validate(&self) -> Result<(), ClusterError> {
        validate_static_config(self)
    }
}
98
/// Transport selection (and its settings) for the local engine.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    Tcp(TcpTransportConfig),
    Quic(QuicTransportConfig),
}
104
impl BoundedTransportConfig {
    /// The local engine identity, carried by either transport variant.
    pub fn engine_id(&self) -> &EngineId {
        match self {
            Self::Tcp(config) => &config.engine_id,
            Self::Quic(config) => &config.engine_id,
        }
    }
}
113
/// A remote engine this node can connect to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    pub engine_id: EngineId,
    // Data-plane (actor message) address.
    pub addr: SocketAddr,
    // Optional control-plane (JSON line protocol) address; required for
    // remote spawn and supervision queries against this peer.
    pub control_plane_addr: Option<SocketAddr>,
    // TLS SNI override; control-plane callers fall back to the engine id
    // string when this is `None`.
    pub server_name: Option<String>,
}
121
/// An actor path declared to be hosted by a specific remote engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    pub path: ActorPath,
    // Engine that owns (hosts) the actor at `path`.
    pub owner: EngineId,
}
127
/// A named role assigned to a peer engine; spawn-by-role resolves the role
/// name to its owning engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    pub name: String,
    pub owner: EngineId,
}
133
/// Routes every actor of `actor_class` to the engine owning `role`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedPlacementPolicy {
    pub actor_class: String,
    pub role: String,
}
139
/// Validation and transport failures raised while building or mutating the
/// cluster topology.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// The same peer engine id appears more than once.
    DuplicatePeer(EngineId),
    /// The local engine was listed as its own peer.
    LocalPeerDeclared(EngineId),
    /// The same remote actor path was declared twice.
    DuplicateDeclaredActor(ActorPath),
    /// A path owned locally (or already registered here) was declared remote.
    LocalActorDeclaredRemote(ActorPath),
    /// A declared actor's owner is not among the configured peers.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// The same role name appears more than once.
    DuplicateRole(String),
    /// A role's owner is not among the configured peers.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// A role's owner exists but exposes no control-plane endpoint.
    RoleOwnerMissingControlPlaneEndpoint { role: String, owner: EngineId },
    /// The same placement actor class appears more than once.
    DuplicatePlacementClass(String),
    /// A placement policy references a role that is not configured.
    UnknownPlacementRole { actor_class: String, role: String },
    /// An underlying transport operation failed.
    Transport(TransportError),
}
154
/// Request to spawn an actor on a remote engine via its control plane.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    // Path the new actor will be registered under.
    pub path: ActorPath,
    // Type name sent as-is in the `actor.spawn` call; the remote engine
    // resolves it to a concrete actor implementation.
    pub type_name: String,
    // Optional JSON configuration forwarded verbatim to the remote spawn.
    pub config: Option<Value>,
}
161
/// Failures surfaced by the `spawn_remote*` family of calls.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// Target engine is not a configured peer.
    UnknownPeer(EngineId),
    /// Role name did not resolve to any configured role.
    UnknownRole(String),
    /// Actor class has no placement policy.
    UnknownPlacementClass(String),
    /// An actor with this path is already registered locally.
    LocalPathCollision(ActorPath),
    /// The path is declared as owned by a different engine.
    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
    /// The remote engine reports the path as already running.
    RemotePathAlreadyRunning(ActorPath),
    /// Target peer exposes no control-plane endpoint.
    MissingControlPlaneEndpoint(EngineId),
    /// TLS server name could not be parsed as a valid `ServerName`.
    InvalidServerName(String),
    /// Any other control-plane transport/protocol failure (message text).
    ControlPlane(String),
    /// The spawned actor did not become visible before the wait deadline.
    SpawnTimedOut(ActorPath),
}
175
/// Observed supervision state of a remote actor, derived from control-plane
/// probes plus a transport connectivity check.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSupervisionState {
    /// Actor reported running and the transport link is up.
    Running { restart_count: u32 },
    /// Actor reported "Stopped" — assumed to be mid-restart by its supervisor.
    Restarting { restart_count: u32 },
    /// Supervisor responds but knows no actor at the path.
    ActorUnavailable,
    /// The supervising engine's control plane could not be reached.
    SupervisorUnavailable,
    /// Actor looks alive but the data-plane connection is down.
    TransportUnavailable,
}
184
/// Result of `remote_supervision_status`: which engine supervises `path`
/// and what state it was observed in.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemoteSupervisionStatus {
    pub path: ActorPath,
    pub supervisor: EngineId,
    pub state: RemoteSupervisionState,
}
191
/// Failures to even attempt a supervision query (topology problems, as
/// opposed to the remote being down — that is a `RemoteSupervisionState`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSupervisionError {
    /// Path is neither tracked as supervised nor declared remote.
    UnknownRemoteActor(ActorPath),
    /// The supervising engine is not a configured peer.
    UnknownPeer(EngineId),
    /// The supervising peer exposes no control-plane endpoint.
    MissingControlPlaneEndpoint(EngineId),
}
198
/// Wire protocol used for control-plane requests to peers.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    Tcp,
    Quic,
}
204
/// Client-side settings for talking to peer control planes.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    pub(crate) protocol: ControlPlaneProtocol,
    // TLS identity/trust used for both the TCP and QUIC control connections.
    pub(crate) tls: palladium_transport::TlsConfig,
    // Budget for retrying spawn calls and awaiting spawn completion.
    pub(crate) wait_timeout: Duration,
}
211
/// Topology snapshot (peers, roles, placement policies) used to construct a
/// `BoundedClusterHandle`.
#[derive(Clone)]
pub(crate) struct BoundedClusterTopology {
    pub(crate) peers: Vec<PeerSpec>,
    pub(crate) roles: Vec<BoundedClusterRole>,
    pub(crate) placement_policies: Vec<BoundedPlacementPolicy>,
}
218
// Floor for how long spawn_remote waits for the remote actor to become
// visible, regardless of a shorter caller ask timeout (see
// `remote_spawn_wait_timeout`).
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
// Budget for the transport connectivity probe used when mapping a remote
// actor's supervision state.
const REMOTE_SUPERVISION_PROBE_TIMEOUT: Duration = Duration::from_millis(150);
221
222impl From<TransportError> for ClusterError {
223 fn from(value: TransportError) -> Self {
224 Self::Transport(value)
225 }
226}
227
228impl From<TransportError> for RemoteSpawnError {
229 fn from(value: TransportError) -> Self {
230 Self::ControlPlane(format!("transport route install failed: {value:?}"))
231 }
232}
233
/// Cheap-to-clone handle over the two supported data-plane transports;
/// methods dispatch to whichever variant is in use.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    Tcp(Arc<TcpTransport<R, N>>),
    Quic(Arc<QuicTransport<R, N>>),
}
239
impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
    /// Borrow the TCP transport, or `None` when this handle wraps QUIC.
    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
        match self {
            Self::Tcp(transport) => Some(transport),
            Self::Quic(_) => None,
        }
    }

    /// Borrow the QUIC transport, or `None` when this handle wraps TCP.
    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
        match self {
            Self::Tcp(_) => None,
            Self::Quic(transport) => Some(transport),
        }
    }

    /// Register `peer` with the underlying transport, applying the SNI
    /// override from `peer.server_name` when one is provided.
    pub fn add_peer(&self, peer: &PeerSpec) {
        match self {
            Self::Tcp(transport) => {
                if let Some(server_name) = &peer.server_name {
                    transport.add_peer_with_server_name(
                        peer.engine_id.clone(),
                        peer.addr,
                        server_name.clone(),
                    );
                } else {
                    transport.add_peer(peer.engine_id.clone(), peer.addr);
                }
            }
            Self::Quic(transport) => {
                if let Some(server_name) = &peer.server_name {
                    transport.add_peer_with_server_name(
                        peer.engine_id.clone(),
                        peer.addr,
                        server_name.clone(),
                    );
                } else {
                    transport.add_peer(peer.engine_id.clone(), peer.addr);
                }
            }
        }
    }

    /// Record that `path` is hosted by the remote engine `owner`.
    pub fn declare_remote_path(
        &self,
        owner: EngineId,
        path: &ActorPath,
    ) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
            Self::Quic(transport) => transport.declare_remote_path(owner, path),
        }
    }

    /// Advertise a locally hosted `path`, reachable at `addr`, to peers.
    pub fn advertise_path(&self, addr: AddrHash, path: &ActorPath) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.advertise_path(addr, path),
            Self::Quic(transport) => transport.advertise_path(addr, path),
        }
    }

    /// Register `addr` as a local source address.
    ///
    /// NOTE(review): the capacity-1 mailbox receiver is dropped immediately —
    /// presumably only the registration itself matters and inbound messages
    /// to this address are intentionally discarded; confirm.
    pub fn register_source_addr(&self, addr: AddrHash) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.register(addr, mailbox(1).0),
            Self::Quic(transport) => transport.register(addr, mailbox(1).0),
        }
    }

    /// Remove a previously declared remote `path`.
    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.undeclare_remote_path(path),
            Self::Quic(transport) => transport.undeclare_remote_path(path),
        }
    }

    /// Whether the transport currently knows a route for `destination`.
    pub fn can_route(&self, destination: AddrHash) -> bool {
        match self {
            Self::Tcp(transport) => transport.can_route(destination),
            Self::Quic(transport) => transport.can_route(destination),
        }
    }

    /// Wait until a connection to `engine_id` is established, failing after
    /// `timeout`.
    pub async fn wait_for_peer_connection(
        &self,
        engine_id: &EngineId,
        timeout: Duration,
    ) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
            Self::Quic(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
        }
    }
}
332
/// Runtime handle for cluster operations: peer management, remote actor
/// declaration, remote spawning, and supervision queries.
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    // Local engine used to mint remote Addr handles.
    engine: EngineHandle<R, N, F>,
    local_engine_id: EngineId,
    // Settings for calling peer control planes (protocol, TLS, timeouts).
    control_plane: ControlPlaneClientConfig,
    transport: BoundedTransportHandle<R, N>,
    // Known peers; mutated by add_peer.
    peers: Vec<PeerSpec>,
    // Remote actors declared via declare_remote_actor.
    declared_actors: Vec<DeclaredRemoteActor>,
    roles: Vec<BoundedClusterRole>,
    placement_policies: Vec<BoundedPlacementPolicy>,
    // path string -> supervising engine, for actors this handle declared or
    // spawned remotely; shared across clones.
    supervised_owners: Arc<DashMap<String, EngineId>>,
    // Local actor registry, consulted for path-collision checks and route
    // advertisement.
    registry: Arc<ActorRegistry<R>>,
}
350
impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
    /// Build a handle from an already-validated topology.
    ///
    /// NOTE(review): `declared_actors` starts empty here — the topology type
    /// carries no declared actors, so callers are expected to add them via
    /// `declare_remote_actor` afterwards; confirm that is the intended flow.
    pub(crate) fn new(
        engine: EngineHandle<R, N, F>,
        local_engine_id: EngineId,
        control_plane: ControlPlaneClientConfig,
        transport: BoundedTransportHandle<R, N>,
        topology: BoundedClusterTopology,
        registry: Arc<ActorRegistry<R>>,
    ) -> Self {
        Self {
            engine,
            local_engine_id,
            control_plane,
            transport,
            peers: topology.peers,
            declared_actors: Vec::new(),
            roles: topology.roles,
            placement_policies: topology.placement_policies,
            supervised_owners: Arc::new(DashMap::new()),
            registry,
        }
    }

    /// The underlying data-plane transport handle.
    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
        &self.transport
    }

    /// Currently configured peers.
    pub fn peers(&self) -> &[PeerSpec] {
        &self.peers
    }

    /// Remote actors declared on this handle.
    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
        &self.declared_actors
    }

    /// Configured roles.
    pub fn roles(&self) -> &[BoundedClusterRole] {
        &self.roles
    }

    /// Configured placement policies.
    pub fn placement_policies(&self) -> &[BoundedPlacementPolicy] {
        &self.placement_policies
    }

    /// Register the engine's source address and re-advertise every currently
    /// running local actor's path on the transport (used after peers change).
    pub(crate) fn advertise_existing_local_routes(&self) -> Result<(), ClusterError> {
        self.transport
            .register_source_addr(self.engine.source_addr)
            .map_err(ClusterError::Transport)?;

        // Walk a registry snapshot; only Running actors are advertised.
        for info in self
            .registry
            .snapshot(&crate::introspection::ActorQuery::default(), 0)
        {
            if info.state != crate::introspection::ActorState::Running {
                continue;
            }
            // The actor may have been removed between snapshot and lookup,
            // hence the second get_by_path check.
            if let Some(slot) = self.registry.get_by_path(&info.path) {
                self.transport
                    .advertise_path(slot.addr, &info.path)
                    .map_err(ClusterError::Transport)?;
            }
        }
        Ok(())
    }

    /// Validate and add a new peer at runtime, then re-advertise local
    /// routes so the new peer learns them.
    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
        self.transport.add_peer(&peer);
        self.advertise_existing_local_routes()?;
        self.peers.push(peer);
        Ok(())
    }

    /// Declare that `path` lives on peer `owner`, installing a transport
    /// route and recording the supervising engine.
    pub fn declare_remote_actor(
        &mut self,
        owner: EngineId,
        path: ActorPath,
    ) -> Result<(), ClusterError> {
        // Two validation passes: topology (owner is a known, non-local peer;
        // path not already declared) and registry (path not hosted locally).
        validate_declared_actor_topology(
            &path,
            &owner,
            &self.local_engine_id,
            &self.peers,
            &self.declared_actors,
        )?;
        validate_declared_actor_registry(
            &path,
            &owner,
            &self.local_engine_id,
            &self.declared_actors,
            &self.registry,
        )?;
        self.transport.declare_remote_path(owner.clone(), &path)?;
        self.supervised_owners
            .insert(path.as_str().to_string(), owner.clone());
        self.declared_actors
            .push(DeclaredRemoteActor { path, owner });
        Ok(())
    }

    /// Remove a remote declaration: drops the transport route, the
    /// supervision record, and the declared-actor entry.
    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
        self.transport.undeclare_remote_path(path)?;
        self.supervised_owners.remove(path.as_str());
        self.declared_actors
            .retain(|declared| &declared.path != path);
        Ok(())
    }

    /// Whether the transport can currently route to `path`.
    ///
    /// NOTE(review): the AddrHash is built with incarnation/sequence 0 —
    /// presumably the canonical value for path-level routing; confirm.
    pub fn can_route(&self, path: &ActorPath) -> bool {
        self.transport.can_route(AddrHash::new(path, 0))
    }

    /// Query the supervising engine for the state of a remote actor.
    ///
    /// Resolution order for the supervisor: the runtime supervision map
    /// first, then the static declarations. Control-plane failures map to
    /// `SupervisorUnavailable` / `ActorUnavailable` states rather than
    /// errors; only topology problems return `Err`.
    pub async fn remote_supervision_status(
        &self,
        path: &ActorPath,
    ) -> Result<RemoteSupervisionStatus, RemoteSupervisionError> {
        let supervisor = self
            .supervised_owners
            .get(path.as_str())
            .map(|entry| entry.clone())
            .or_else(|| {
                self.declared_actors
                    .iter()
                    .find(|declared| declared.path == *path)
                    .map(|declared| declared.owner.clone())
            })
            .ok_or_else(|| RemoteSupervisionError::UnknownRemoteActor(path.clone()))?;

        let peer = self
            .peers
            .iter()
            .find(|peer| peer.engine_id == supervisor)
            .ok_or_else(|| RemoteSupervisionError::UnknownPeer(supervisor.clone()))?;
        let control_plane_addr = peer.control_plane_addr.ok_or_else(|| {
            RemoteSupervisionError::MissingControlPlaneEndpoint(supervisor.clone())
        })?;
        // SNI defaults to the supervisor's engine id when no override is set.
        let server_name = peer
            .server_name
            .clone()
            .unwrap_or_else(|| supervisor.as_str().to_string());

        // Liveness probe of the supervising engine itself: if engine.status
        // fails, don't bother asking about the actor.
        if call_control_plane(
            &self.control_plane,
            control_plane_addr,
            &server_name,
            "engine.status",
            json!({}),
        )
        .await
        .is_err()
        {
            return Ok(RemoteSupervisionStatus {
                path: path.clone(),
                supervisor,
                state: RemoteSupervisionState::SupervisorUnavailable,
            });
        }

        match call_control_plane(
            &self.control_plane,
            control_plane_addr,
            &server_name,
            "actor.info",
            json!({ "path": path.as_str() }),
        )
        .await
        {
            Ok(info) => {
                let restart_count = info
                    .get("restart_count")
                    .and_then(Value::as_u64)
                    .unwrap_or(0) as u32;
                // "Stopped" is read as mid-restart; any other state is
                // Running, provided the data-plane link is actually up.
                let state = match info.get("state").and_then(Value::as_str) {
                    Some("Stopped") => RemoteSupervisionState::Restarting { restart_count },
                    _ => match self
                        .transport
                        .wait_for_peer_connection(&supervisor, REMOTE_SUPERVISION_PROBE_TIMEOUT)
                        .await
                    {
                        Ok(()) => RemoteSupervisionState::Running { restart_count },
                        Err(_) => RemoteSupervisionState::TransportUnavailable,
                    },
                };
                Ok(RemoteSupervisionStatus {
                    path: path.clone(),
                    supervisor,
                    state,
                })
            }
            // NOTE(review): the first contains() is subsumed by the second;
            // harmless but redundant.
            Err(RemoteSpawnError::ControlPlane(message))
                if message.contains("actor not found:") || message.contains("actor not found") =>
            {
                Ok(RemoteSupervisionStatus {
                    path: path.clone(),
                    supervisor,
                    state: RemoteSupervisionState::ActorUnavailable,
                })
            }
            Err(_) => Ok(RemoteSupervisionStatus {
                path: path.clone(),
                supervisor,
                state: RemoteSupervisionState::SupervisorUnavailable,
            }),
        }
    }

    /// Spawn on whichever engine owns `role`, delegating to `spawn_remote`.
    pub async fn spawn_remote_on_role<M>(
        &self,
        role: &str,
        spec: RemoteSpawnSpec,
    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
    where
        M: RemoteMessage,
        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
        R: Send + Sync,
    {
        let owner = self
            .roles
            .iter()
            .find(|declared| declared.name == role)
            .map(|declared| declared.owner.clone())
            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
        self.spawn_remote::<M>(owner, spec).await
    }

    /// Spawn according to the placement policy for `actor_class`, resolving
    /// class -> role -> owning engine.
    pub async fn spawn_remote_for_class<M>(
        &self,
        actor_class: &str,
        spec: RemoteSpawnSpec,
    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
    where
        M: RemoteMessage,
        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
        R: Send + Sync,
    {
        let role = self
            .placement_policies
            .iter()
            .find(|policy| policy.actor_class == actor_class)
            .map(|policy| policy.role.as_str())
            .ok_or_else(|| RemoteSpawnError::UnknownPlacementClass(actor_class.to_string()))?;
        self.spawn_remote_on_role::<M>(role, spec).await
    }

    /// Spawn an actor on peer `target` via its control plane and return a
    /// typed remote address for it.
    ///
    /// Both the spawn call (retried on transient connection errors) and the
    /// visibility poll share a single deadline of `wait_timeout` from now.
    pub async fn spawn_remote<M>(
        &self,
        target: EngineId,
        spec: RemoteSpawnSpec,
    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
    where
        M: RemoteMessage,
        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
        R: Send + Sync,
    {
        // Local collision / ownership checks before any network traffic.
        validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;

        let peer = self
            .peers
            .iter()
            .find(|peer| peer.engine_id == target)
            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
        let control_plane_addr = peer
            .control_plane_addr
            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
        let server_name = peer
            .server_name
            .clone()
            .unwrap_or_else(|| target.as_str().to_string());
        // SNI the target should use when dialing back to this engine.
        let caller_server_name = self.local_engine_id.as_str().to_string();

        let deadline = Instant::now() + self.control_plane.wait_timeout;
        // Phase 1: issue actor.spawn, retrying transient connection errors
        // until the deadline. A non-retryable error (or retryable one past
        // the deadline) aborts immediately.
        loop {
            match call_actor_spawn(
                &self.control_plane,
                control_plane_addr,
                &server_name,
                &self.local_engine_id,
                transport_listen_addr(&self.transport),
                &caller_server_name,
                &spec,
            )
            .await
            {
                Ok(()) => break,
                Err(RemoteSpawnError::ControlPlane(message))
                    if is_retryable_control_plane_error(&message) && Instant::now() <= deadline =>
                {
                    tokio::time::sleep(Duration::from_millis(20)).await;
                }
                Err(err) => return Err(err),
            }
        }

        // Phase 2: poll until the actor is visible remotely, then install the
        // local route and wait for the data-plane connection.
        while Instant::now() <= deadline {
            if actor_is_spawned(
                &self.control_plane,
                control_plane_addr,
                &server_name,
                &spec.path,
            )
            .await?
            {
                self.transport
                    .declare_remote_path(target.clone(), &spec.path)
                    .map_err(RemoteSpawnError::from)?;
                self.supervised_owners
                    .insert(spec.path.as_str().to_string(), target.clone());
                self.transport
                    .wait_for_peer_connection(&target, self.control_plane.wait_timeout)
                    .await
                    .map_err(RemoteSpawnError::from)?;
                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
    }
}
669
670async fn call_actor_spawn(
671 client: &ControlPlaneClientConfig,
672 addr: SocketAddr,
673 server_name: &str,
674 caller_engine_id: &EngineId,
675 caller_transport_addr: SocketAddr,
676 caller_server_name: &str,
677 spec: &RemoteSpawnSpec,
678) -> Result<(), RemoteSpawnError> {
679 let params = match &spec.config {
680 Some(config) => json!({
681 "path": spec.path.as_str(),
682 "type_name": spec.type_name,
683 "config": config,
684 "caller_engine_id": caller_engine_id.as_str(),
685 "caller_transport_addr": caller_transport_addr,
686 "caller_server_name": caller_server_name,
687 }),
688 None => json!({
689 "path": spec.path.as_str(),
690 "type_name": spec.type_name,
691 "caller_engine_id": caller_engine_id.as_str(),
692 "caller_transport_addr": caller_transport_addr,
693 "caller_server_name": caller_server_name,
694 }),
695 };
696 match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
697 Ok(_) => {}
698 Err(RemoteSpawnError::ControlPlane(message))
699 if message.contains("actor already running at") =>
700 {
701 return Err(RemoteSpawnError::RemotePathAlreadyRunning(
702 spec.path.clone(),
703 ));
704 }
705 Err(err) => return Err(err),
706 }
707 Ok(())
708}
709
710fn transport_listen_addr<R: Reactor + Clone, N: Network + Clone>(
711 transport: &BoundedTransportHandle<R, N>,
712) -> SocketAddr {
713 match transport {
714 BoundedTransportHandle::Tcp(transport) => transport.local_addr(),
715 BoundedTransportHandle::Quic(transport) => transport.local_addr(),
716 }
717}
718
/// Heuristic: does this control-plane error message describe a transient
/// network condition worth retrying? Matching is case-insensitive.
fn is_retryable_control_plane_error(message: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 5] = [
        "connection refused",
        "connection reset",
        "timed out",
        "broken pipe",
        "not connected",
    ];
    let lowered = message.to_ascii_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|marker| lowered.contains(marker))
}
727
728fn validate_remote_spawn_request<R: Reactor>(
729 path: &ActorPath,
730 owner: &EngineId,
731 declared_actors: &[DeclaredRemoteActor],
732 registry: &Arc<ActorRegistry<R>>,
733) -> Result<(), RemoteSpawnError> {
734 if registry.get_by_path(path).is_some() {
735 return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
736 }
737
738 if let Some(declared) = declared_actors
739 .iter()
740 .find(|declared| declared.path == *path && declared.owner != *owner)
741 {
742 return Err(RemoteSpawnError::RemoteOwnershipConflict {
743 path: path.clone(),
744 owner: declared.owner.clone(),
745 });
746 }
747
748 Ok(())
749}
750
751async fn actor_is_spawned(
752 client: &ControlPlaneClientConfig,
753 addr: SocketAddr,
754 server_name: &str,
755 path: &ActorPath,
756) -> Result<bool, RemoteSpawnError> {
757 match call_control_plane(
758 client,
759 addr,
760 server_name,
761 "actor.info",
762 json!({ "path": path.as_str() }),
763 )
764 .await
765 {
766 Ok(_) => Ok(true),
767 Err(RemoteSpawnError::ControlPlane(message))
768 if message.contains("actor not found:")
769 || message.contains("actor not found")
770 || message.contains("User supervisor not available") =>
771 {
772 Ok(false)
773 }
774 Err(err) => Err(err),
775 }
776}
777
778async fn call_control_plane(
779 client: &ControlPlaneClientConfig,
780 addr: SocketAddr,
781 server_name: &str,
782 method: &str,
783 params: Value,
784) -> Result<Value, RemoteSpawnError> {
785 let request = json!({
786 "id": 1,
787 "method": method,
788 "params": params,
789 });
790
791 match client.protocol {
792 ControlPlaneProtocol::Tcp => {
793 call_control_plane_tcp(addr, server_name, &client.tls, &request).await
794 }
795 ControlPlaneProtocol::Quic => {
796 call_control_plane_quic(addr, server_name, &client.tls, &request).await
797 }
798 }
799}
800
/// Perform one TLS-over-TCP control-plane round trip: write the JSON request
/// as a single newline-terminated line, then read one response line.
async fn call_control_plane_tcp(
    addr: SocketAddr,
    server_name: &str,
    tls: &palladium_transport::TlsConfig,
    request: &Value,
) -> Result<Value, RemoteSpawnError> {
    let config = tls
        .client_config(server_name)
        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
    // The SNI name must be syntactically valid; otherwise fail fast.
    let name = ServerName::try_from(server_name.to_string())
        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
    let stream = tokio::net::TcpStream::connect(addr)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let connector = TlsConnector::from(config);
    let tls_stream = connector
        .connect(name, stream)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    // Split so the read half can be wrapped in a BufReader for read_line.
    let (reader, mut writer) = tokio::io::split(tls_stream);
    let mut body = request.to_string();
    body.push('\n');
    writer
        .write_all(body.as_bytes())
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let mut line = String::new();
    let mut reader = BufReader::new(reader);
    reader
        .read_line(&mut line)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    parse_control_plane_response(&line)
}
836
/// Perform one QUIC control-plane round trip over a fresh client connection
/// and bidirectional stream, using the `pd-control` ALPN.
async fn call_control_plane_quic(
    addr: SocketAddr,
    server_name: &str,
    tls: &palladium_transport::TlsConfig,
    request: &Value,
) -> Result<Value, RemoteSpawnError> {
    // rustls needs a process-wide crypto provider before building configs.
    ensure_rustls_provider();
    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
    })?)
    .clone();
    // Dedicated ALPN keeps control traffic distinct from the data plane.
    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();

    // Ephemeral local port; a new client is built per request.
    let client = s2n_quic::Client::builder()
        .with_tls(client_tls)
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
        .with_io("0.0.0.0:0")
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
        .start()
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
    let mut conn = client
        .connect(connect)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let mut stream = conn
        .open_bidirectional_stream()
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    let mut body = request.to_string();
    body.push('\n');
    stream
        .write_all(body.as_bytes())
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    // Closing the send side signals end-of-request to the server.
    stream
        .close()
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    // Read until the server closes its side, then parse the single reply.
    let mut buf = Vec::new();
    stream
        .read_to_end(&mut buf)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let line = String::from_utf8_lossy(&buf);
    parse_control_plane_response(&line)
}
888
889fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
890 let response: Value = serde_json::from_str(line.trim())
891 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
892 if let Some(error) = response.get("error") {
893 let message = error
894 .get("message")
895 .and_then(Value::as_str)
896 .unwrap_or("unknown control-plane error");
897 return Err(RemoteSpawnError::ControlPlane(message.to_string()));
898 }
899 Ok(response.get("result").cloned().unwrap_or(Value::Null))
900}
901
/// Install a process-wide rustls crypto provider exactly once.
///
/// Provider choice is feature-gated (`aws-lc-rs` takes precedence over
/// `ring`); installation failures are ignored because another component may
/// already have installed a provider.
fn ensure_rustls_provider() {
    static INIT: std::sync::Once = std::sync::Once::new();
    INIT.call_once(|| {
        #[cfg(feature = "aws-lc-rs")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        let _ = rustls::crypto::ring::default_provider().install_default();
    });
}
911
912pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
913 ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
914}
915
916pub(crate) fn validate_config<R: Reactor>(
917 config: &BoundedClusterConfig,
918 registry: &Arc<ActorRegistry<R>>,
919) -> Result<(), ClusterError> {
920 validate_static_config(config)?;
921
922 let local_engine_id = config.transport.engine_id();
923 let mut seen_declared = Vec::new();
924 for declared in &config.declared_actors {
925 validate_declared_actor_registry(
926 &declared.path,
927 &declared.owner,
928 local_engine_id,
929 &seen_declared,
930 registry,
931 )?;
932 seen_declared.push(declared.clone());
933 }
934
935 Ok(())
936}
937
938fn validate_static_config(config: &BoundedClusterConfig) -> Result<(), ClusterError> {
939 let local_engine_id = config.transport.engine_id();
940 let mut seen_peers = Vec::new();
941 for peer in &config.peers {
942 validate_peer_spec(peer, local_engine_id, &seen_peers)?;
943 seen_peers.push(peer.clone());
944 }
945
946 let mut seen_declared = Vec::new();
947 for declared in &config.declared_actors {
948 validate_declared_actor_topology(
949 &declared.path,
950 &declared.owner,
951 local_engine_id,
952 &config.peers,
953 &seen_declared,
954 )?;
955 seen_declared.push(declared.clone());
956 }
957
958 let mut seen_roles = Vec::new();
959 for role in &config.roles {
960 validate_role(role, &config.peers, &seen_roles)?;
961 seen_roles.push(role.clone());
962 }
963
964 let mut seen_placement_policies = Vec::new();
965 for policy in &config.placement_policies {
966 validate_placement_policy(policy, &config.roles, &seen_placement_policies)?;
967 seen_placement_policies.push(policy.clone());
968 }
969
970 Ok(())
971}
972
973fn validate_peer_spec(
974 peer: &PeerSpec,
975 local_engine_id: &EngineId,
976 existing: &[PeerSpec],
977) -> Result<(), ClusterError> {
978 if &peer.engine_id == local_engine_id {
979 return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
980 }
981 if existing
982 .iter()
983 .any(|existing| existing.engine_id == peer.engine_id)
984 {
985 return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
986 }
987 Ok(())
988}
989
990fn validate_declared_actor_topology(
991 path: &ActorPath,
992 owner: &EngineId,
993 local_engine_id: &EngineId,
994 peers: &[PeerSpec],
995 existing: &[DeclaredRemoteActor],
996) -> Result<(), ClusterError> {
997 if owner == local_engine_id {
998 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
999 }
1000 if existing.iter().any(|existing| existing.path == *path) {
1001 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
1002 }
1003 if !peers.iter().any(|peer| peer.engine_id == *owner) {
1004 return Err(ClusterError::UnknownPeerOwner {
1005 path: path.clone(),
1006 owner: owner.clone(),
1007 });
1008 }
1009 Ok(())
1010}
1011
1012fn validate_declared_actor_registry<R: Reactor>(
1013 path: &ActorPath,
1014 owner: &EngineId,
1015 local_engine_id: &EngineId,
1016 existing: &[DeclaredRemoteActor],
1017 registry: &Arc<ActorRegistry<R>>,
1018) -> Result<(), ClusterError> {
1019 if owner == local_engine_id || registry.get_by_path(path).is_some() {
1020 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
1021 }
1022 if existing.iter().any(|existing| existing.path == *path) {
1023 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
1024 }
1025 Ok(())
1026}
1027
1028fn validate_role(
1029 role: &BoundedClusterRole,
1030 peers: &[PeerSpec],
1031 existing: &[BoundedClusterRole],
1032) -> Result<(), ClusterError> {
1033 if existing.iter().any(|existing| existing.name == role.name) {
1034 return Err(ClusterError::DuplicateRole(role.name.clone()));
1035 }
1036 if !peers.iter().any(|peer| peer.engine_id == role.owner) {
1037 return Err(ClusterError::UnknownRoleOwner {
1038 role: role.name.clone(),
1039 owner: role.owner.clone(),
1040 });
1041 }
1042 if !peers
1043 .iter()
1044 .any(|peer| peer.engine_id == role.owner && peer.control_plane_addr.is_some())
1045 {
1046 return Err(ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1047 role: role.name.clone(),
1048 owner: role.owner.clone(),
1049 });
1050 }
1051 Ok(())
1052}
1053
1054fn validate_placement_policy(
1055 policy: &BoundedPlacementPolicy,
1056 roles: &[BoundedClusterRole],
1057 existing: &[BoundedPlacementPolicy],
1058) -> Result<(), ClusterError> {
1059 if existing
1060 .iter()
1061 .any(|existing| existing.actor_class == policy.actor_class)
1062 {
1063 return Err(ClusterError::DuplicatePlacementClass(
1064 policy.actor_class.clone(),
1065 ));
1066 }
1067 if !roles.iter().any(|role| role.name == policy.role) {
1068 return Err(ClusterError::UnknownPlacementRole {
1069 actor_class: policy.actor_class.clone(),
1070 role: policy.role.clone(),
1071 });
1072 }
1073 Ok(())
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use super::{
1079 call_control_plane, validate_config, BoundedClusterConfig, BoundedClusterRole,
1080 BoundedPlacementPolicy, BoundedTransportConfig, ClusterError, DeclaredRemoteActor,
1081 PeerSpec, RemoteSpawnError, RemoteSpawnSpec, RemoteSupervisionState,
1082 };
1083 use crate::{Engine, EngineConfig};
1084 use palladium_actor::{
1085 Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
1086 Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
1087 };
1088 use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
1089 use rcgen::{
1090 BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
1091 KeyUsagePurpose,
1092 };
1093 use serde_json::json;
1094 use std::net::{SocketAddr, TcpListener, UdpSocket};
1095 use std::sync::Arc;
1096 use std::time::{Duration, Instant};
1097
1098 fn make_ca() -> (Certificate, KeyPair) {
1099 let mut params = CertificateParams::default();
1100 params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
1101 params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
1102 let ca_key = KeyPair::generate().expect("ca keypair");
1103 let cert = params.self_signed(&ca_key).expect("ca cert");
1104 (cert, ca_key)
1105 }
1106
1107 fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
1108 let keypair = KeyPair::generate().expect("leaf keypair");
1109 let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
1110 let mut params = params;
1111 params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
1112 params.extended_key_usages = vec![
1113 ExtendedKeyUsagePurpose::ServerAuth,
1114 ExtendedKeyUsagePurpose::ClientAuth,
1115 ];
1116 let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
1117 let cert_der = cert.der().to_vec();
1118 let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
1119 let ca_der = ca.der().to_vec();
1120 TlsConfig {
1121 cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
1122 private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
1123 trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
1124 }
1125 }
1126
1127 fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
1128 TcpTransportConfig {
1129 engine_id: EngineId::new(engine_id),
1130 listen_addr: "127.0.0.1:0".parse().unwrap(),
1131 max_connections_per_peer: 1,
1132 idle_timeout: Duration::from_secs(1),
1133 send_buffer_size: 8,
1134 nodelay: true,
1135 tls,
1136 send_delay: Duration::from_millis(0),
1137 }
1138 }
1139
1140 fn reserve_tcp_addr() -> SocketAddr {
1141 let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
1142 let addr = listener.local_addr().expect("tcp local addr");
1143 drop(listener);
1144 addr
1145 }
1146
1147 fn reserve_udp_addr() -> SocketAddr {
1148 let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
1149 let addr = socket.local_addr().expect("udp local addr");
1150 drop(socket);
1151 addr
1152 }
1153
1154 #[test]
1155 fn bounded_cluster_config_rejects_duplicate_peers() {
1156 let (ca, ca_key) = make_ca();
1157 let engine = Engine::with_config(EngineConfig {
1158 engine_id: EngineId::new("engine-a.example"),
1159 ..Default::default()
1160 });
1161 let config = BoundedClusterConfig {
1162 transport: BoundedTransportConfig::Tcp(tcp_config(
1163 "engine-a.example",
1164 make_tls_config("engine-a.example", &ca, &ca_key),
1165 )),
1166 peers: vec![
1167 PeerSpec {
1168 engine_id: EngineId::new("engine-b.example"),
1169 addr: "127.0.0.1:4100".parse().unwrap(),
1170 control_plane_addr: None,
1171 server_name: None,
1172 },
1173 PeerSpec {
1174 engine_id: EngineId::new("engine-b.example"),
1175 addr: "127.0.0.1:4200".parse().unwrap(),
1176 control_plane_addr: None,
1177 server_name: None,
1178 },
1179 ],
1180 declared_actors: Vec::new(),
1181 roles: Vec::new(),
1182 placement_policies: Vec::new(),
1183 };
1184
1185 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1186 assert_eq!(
1187 err,
1188 ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
1189 );
1190 }
1191
1192 #[test]
1193 fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
1194 let (ca, ca_key) = make_ca();
1195 let engine = Engine::with_config(EngineConfig {
1196 engine_id: EngineId::new("engine-a.example"),
1197 ..Default::default()
1198 });
1199 let config = BoundedClusterConfig {
1200 transport: BoundedTransportConfig::Tcp(tcp_config(
1201 "engine-a.example",
1202 make_tls_config("engine-a.example", &ca, &ca_key),
1203 )),
1204 peers: vec![PeerSpec {
1205 engine_id: EngineId::new("engine-b.example"),
1206 addr: "127.0.0.1:4100".parse().unwrap(),
1207 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1208 server_name: None,
1209 }],
1210 declared_actors: vec![DeclaredRemoteActor {
1211 path: ActorPath::parse("/user/missing-owner").unwrap(),
1212 owner: EngineId::new("engine-c.example"),
1213 }],
1214 roles: Vec::new(),
1215 placement_policies: Vec::new(),
1216 };
1217
1218 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1219 assert_eq!(
1220 err,
1221 ClusterError::UnknownPeerOwner {
1222 path: ActorPath::parse("/user/missing-owner").unwrap(),
1223 owner: EngineId::new("engine-c.example"),
1224 }
1225 );
1226 }
1227
1228 #[test]
1229 fn bounded_cluster_config_rejects_duplicate_roles() {
1230 let (ca, ca_key) = make_ca();
1231 let engine = Engine::with_config(EngineConfig {
1232 engine_id: EngineId::new("engine-a.example"),
1233 ..Default::default()
1234 });
1235 let config = BoundedClusterConfig {
1236 transport: BoundedTransportConfig::Tcp(tcp_config(
1237 "engine-a.example",
1238 make_tls_config("engine-a.example", &ca, &ca_key),
1239 )),
1240 peers: vec![PeerSpec {
1241 engine_id: EngineId::new("engine-b.example"),
1242 addr: "127.0.0.1:4100".parse().unwrap(),
1243 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1244 server_name: None,
1245 }],
1246 declared_actors: Vec::new(),
1247 roles: vec![
1248 BoundedClusterRole {
1249 name: "context-service".to_string(),
1250 owner: EngineId::new("engine-b.example"),
1251 },
1252 BoundedClusterRole {
1253 name: "context-service".to_string(),
1254 owner: EngineId::new("engine-b.example"),
1255 },
1256 ],
1257 placement_policies: Vec::new(),
1258 };
1259
1260 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1261 assert_eq!(
1262 err,
1263 ClusterError::DuplicateRole("context-service".to_string())
1264 );
1265 }
1266
1267 #[test]
1268 fn bounded_cluster_config_rejects_unknown_role_owner() {
1269 let (ca, ca_key) = make_ca();
1270 let engine = Engine::with_config(EngineConfig {
1271 engine_id: EngineId::new("engine-a.example"),
1272 ..Default::default()
1273 });
1274 let config = BoundedClusterConfig {
1275 transport: BoundedTransportConfig::Tcp(tcp_config(
1276 "engine-a.example",
1277 make_tls_config("engine-a.example", &ca, &ca_key),
1278 )),
1279 peers: vec![PeerSpec {
1280 engine_id: EngineId::new("engine-b.example"),
1281 addr: "127.0.0.1:4100".parse().unwrap(),
1282 control_plane_addr: None,
1283 server_name: None,
1284 }],
1285 declared_actors: Vec::new(),
1286 roles: vec![BoundedClusterRole {
1287 name: "gateway".to_string(),
1288 owner: EngineId::new("engine-c.example"),
1289 }],
1290 placement_policies: Vec::new(),
1291 };
1292
1293 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1294 assert_eq!(
1295 err,
1296 ClusterError::UnknownRoleOwner {
1297 role: "gateway".to_string(),
1298 owner: EngineId::new("engine-c.example"),
1299 }
1300 );
1301 }
1302
1303 #[test]
1304 fn bounded_cluster_config_rejects_role_owner_without_control_plane() {
1305 let (ca, ca_key) = make_ca();
1306 let config = BoundedClusterConfig {
1307 transport: BoundedTransportConfig::Tcp(tcp_config(
1308 "engine-a.example",
1309 make_tls_config("engine-a.example", &ca, &ca_key),
1310 )),
1311 peers: vec![PeerSpec {
1312 engine_id: EngineId::new("engine-b.example"),
1313 addr: "127.0.0.1:4100".parse().unwrap(),
1314 control_plane_addr: None,
1315 server_name: None,
1316 }],
1317 declared_actors: Vec::new(),
1318 roles: vec![BoundedClusterRole {
1319 name: "session-host".to_string(),
1320 owner: EngineId::new("engine-b.example"),
1321 }],
1322 placement_policies: Vec::new(),
1323 };
1324
1325 let err = config.validate().unwrap_err();
1326 assert_eq!(
1327 err,
1328 ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1329 role: "session-host".to_string(),
1330 owner: EngineId::new("engine-b.example"),
1331 }
1332 );
1333 }
1334
1335 #[test]
1336 fn bounded_cluster_config_rejects_duplicate_placement_classes() {
1337 let (ca, ca_key) = make_ca();
1338 let config = BoundedClusterConfig {
1339 transport: BoundedTransportConfig::Tcp(tcp_config(
1340 "engine-a.example",
1341 make_tls_config("engine-a.example", &ca, &ca_key),
1342 )),
1343 peers: vec![PeerSpec {
1344 engine_id: EngineId::new("engine-b.example"),
1345 addr: "127.0.0.1:4100".parse().unwrap(),
1346 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1347 server_name: None,
1348 }],
1349 declared_actors: Vec::new(),
1350 roles: vec![BoundedClusterRole {
1351 name: "context-service".to_string(),
1352 owner: EngineId::new("engine-b.example"),
1353 }],
1354 placement_policies: vec![
1355 BoundedPlacementPolicy {
1356 actor_class: "SessionActor".to_string(),
1357 role: "context-service".to_string(),
1358 },
1359 BoundedPlacementPolicy {
1360 actor_class: "SessionActor".to_string(),
1361 role: "context-service".to_string(),
1362 },
1363 ],
1364 };
1365
1366 let err = config.validate().unwrap_err();
1367 assert_eq!(
1368 err,
1369 ClusterError::DuplicatePlacementClass("SessionActor".to_string())
1370 );
1371 }
1372
1373 #[test]
1374 fn bounded_cluster_config_rejects_unknown_placement_role() {
1375 let (ca, ca_key) = make_ca();
1376 let config = BoundedClusterConfig {
1377 transport: BoundedTransportConfig::Tcp(tcp_config(
1378 "engine-a.example",
1379 make_tls_config("engine-a.example", &ca, &ca_key),
1380 )),
1381 peers: vec![PeerSpec {
1382 engine_id: EngineId::new("engine-b.example"),
1383 addr: "127.0.0.1:4100".parse().unwrap(),
1384 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1385 server_name: None,
1386 }],
1387 declared_actors: Vec::new(),
1388 roles: vec![BoundedClusterRole {
1389 name: "context-service".to_string(),
1390 owner: EngineId::new("engine-b.example"),
1391 }],
1392 placement_policies: vec![BoundedPlacementPolicy {
1393 actor_class: "SessionActor".to_string(),
1394 role: "missing-role".to_string(),
1395 }],
1396 };
1397
1398 let err = config.validate().unwrap_err();
1399 assert_eq!(
1400 err,
1401 ClusterError::UnknownPlacementRole {
1402 actor_class: "SessionActor".to_string(),
1403 role: "missing-role".to_string(),
1404 }
1405 );
1406 }
1407
1408 #[tokio::test]
1409 async fn attach_bounded_cluster_rejects_role_owner_without_control_plane() {
1410 let (ca, ca_key) = make_ca();
1411 let engine = Engine::with_config(EngineConfig {
1412 engine_id: EngineId::new("engine-a.example"),
1413 ..Default::default()
1414 });
1415
1416 let err = match engine
1417 .handle()
1418 .attach_bounded_cluster(BoundedClusterConfig {
1419 transport: BoundedTransportConfig::Tcp(tcp_config(
1420 "engine-a.example",
1421 make_tls_config("engine-a.example", &ca, &ca_key),
1422 )),
1423 peers: vec![PeerSpec {
1424 engine_id: EngineId::new("engine-b.example"),
1425 addr: "127.0.0.1:4100".parse().unwrap(),
1426 control_plane_addr: None,
1427 server_name: None,
1428 }],
1429 declared_actors: Vec::new(),
1430 roles: vec![BoundedClusterRole {
1431 name: "session-host".to_string(),
1432 owner: EngineId::new("engine-b.example"),
1433 }],
1434 placement_policies: Vec::new(),
1435 })
1436 .await
1437 {
1438 Ok(_) => panic!("expected bounded cluster attach to reject missing control plane"),
1439 Err(err) => err,
1440 };
1441
1442 assert_eq!(
1443 err,
1444 ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1445 role: "session-host".to_string(),
1446 owner: EngineId::new("engine-b.example"),
1447 }
1448 );
1449 }
1450
1451 #[test]
1452 fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
1453 assert_eq!(
1454 super::remote_spawn_wait_timeout(Duration::from_millis(25)),
1455 Duration::from_secs(5)
1456 );
1457 assert_eq!(
1458 super::remote_spawn_wait_timeout(Duration::from_secs(8)),
1459 Duration::from_secs(8)
1460 );
1461 }
1462
1463 fn tcp_tests_available() -> bool {
1464 if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
1465 return false;
1466 }
1467 std::net::TcpListener::bind("127.0.0.1:0").is_ok()
1468 }
1469
1470 fn quic_tests_available() -> bool {
1471 super::ensure_rustls_provider();
1472 rustls::crypto::CryptoProvider::get_default().is_some()
1473 }
1474
1475 async fn wait_for_supervision_state<R, N, F>(
1476 cluster: &super::BoundedClusterHandle<R, N, F>,
1477 path: &ActorPath,
1478 expected: RemoteSupervisionState,
1479 ) where
1480 R: crate::reactor::Reactor + Clone + Send + Sync,
1481 N: palladium_transport::network::Network + Clone,
1482 F: crate::fs::FileSystem + Clone,
1483 {
1484 let deadline = Instant::now() + Duration::from_secs(5);
1485 loop {
1486 let status = cluster
1487 .remote_supervision_status(path)
1488 .await
1489 .expect("supervision status");
1490 if status.state == expected {
1491 return;
1492 }
1493 if Instant::now() > deadline {
1494 panic!(
1495 "supervision status did not reach expected state: got {:?}, expected {:?}",
1496 status.state, expected
1497 );
1498 }
1499 tokio::time::sleep(Duration::from_millis(25)).await;
1500 }
1501 }
1502
    // Request/response pair used to exercise remote ask round-trips in the
    // bounded-cluster tests below: the responder replies with `value * 2`.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoRequest {
        value: u64,
    }

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoResponse {
        doubled: u64,
    }

    impl Message for EchoRequest {
        type Response = EchoResponse;
        // Wire tag derived from a namespaced string so both engines compute
        // the same constant for this test message type.
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
    }
1517
1518 struct EchoActor;
1519
1520 impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
1521 fn on_message(
1522 &mut self,
1523 ctx: &mut ActorContext<R>,
1524 envelope: &Envelope,
1525 payload: MessagePayload,
1526 ) -> Result<(), ActorError> {
1527 let req = payload
1528 .extract::<EchoRequest>()
1529 .map_err(|_| ActorError::Handler)?;
1530 let resp = EchoResponse {
1531 doubled: req.value * 2,
1532 };
1533 let resp_env = envelope.response(0);
1534 ctx.send_raw(resp_env, MessagePayload::local(resp))
1535 .map_err(|_| ActorError::Handler)
1536 }
1537 }
1538
    // Unit message whose handler deliberately fails; used to drive the remote
    // supervision/restart paths in the spawn tests.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct CrashRequest;

    impl Message for CrashRequest {
        type Response = ();
        // Distinct namespaced wire tag so it never collides with EchoRequest.
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
    }
1546
1547 struct SpawnedEchoActor;
1548
1549 impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
1550 fn on_message(
1551 &mut self,
1552 ctx: &mut ActorContext<R>,
1553 envelope: &Envelope,
1554 payload: MessagePayload,
1555 ) -> Result<(), ActorError> {
1556 match envelope.type_tag {
1557 EchoRequest::TYPE_TAG => {
1558 let req = payload
1559 .extract::<EchoRequest>()
1560 .map_err(|_| ActorError::Handler)?;
1561 let resp = EchoResponse {
1562 doubled: req.value * 2,
1563 };
1564 let resp_env = envelope.response(0);
1565 ctx.send_raw(resp_env, MessagePayload::local(resp))
1566 .map_err(|_| ActorError::Handler)
1567 }
1568 CrashRequest::TYPE_TAG => Err(ActorError::Handler),
1569 _ => Err(ActorError::Handler),
1570 }
1571 }
1572 }
1573
1574 fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
1575 Arc::new(|type_name: &str, _config: &[u8]| match type_name {
1576 "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
1577 _ => Err(format!("unknown remote spawn type: {type_name}")),
1578 })
1579 }
1580
    // End-to-end TCP integration: engine A hosts /user/cluster-echo, engine B
    // attaches a bounded cluster listing A as a peer, and a remote ask issued
    // from B must reach the actor on A and return the doubled value.
    #[tokio::test]
    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
        // Both engines get leaf certs signed by the same throwaway CA so the
        // mutual TLS handshake between them succeeds.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        // Register the ask type on both sides so request/response payloads can
        // cross the wire.
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // Cluster A starts with no peers; it only needs its listener up so B
        // can learn its ephemeral address below.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // A learns B's address at the transport layer (B's ephemeral port was
        // only known after B bound its own listener).
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Engines run on dedicated OS threads while the async test body drives
        // the clusters.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Poll (bounded by 5s) until B's cluster can route to the actor path,
        // then confirm routability of the canonical address at the transport.
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded cluster route did not propagate");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(tcp_b.can_route(canonical));

        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 12 })
            .await
            .expect("bounded cluster ask");
        assert_eq!(response, EchoResponse { doubled: 24 });

        // Best-effort shutdown; engines may already be gone.
        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1684
    // Verifies that routes for an actor that existed BEFORE a peer was added
    // are backfilled to that peer: cluster A starts peerless, its actor comes
    // up, and only then is B added via `add_peer`.
    #[tokio::test]
    async fn bounded_cluster_add_peer_after_start_backfills_existing_tcp_routes() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/cluster-echo-add-peer").unwrap();
        // Shared CA so the two engines trust each other's TLS certificates.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo-add-peer",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // Cluster A deliberately starts with NO peers; B is introduced later.
        let mut cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // The local actor must be registered BEFORE the peer is added so that
        // add_peer has an existing route to backfill.
        let local_ready_deadline = std::time::Instant::now() + Duration::from_secs(5);
        while handle_a.registry.get_by_path(&actor_path).is_none() {
            if std::time::Instant::now() > local_ready_deadline {
                panic!("local actor did not start before peer add");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        cluster_a
            .add_peer(PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: None,
                server_name: None,
            })
            .expect("add peer b to cluster a after start");

        // After add_peer, the pre-existing route must propagate to B.
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded cluster route did not propagate after add_peer");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(tcp_b.can_route(canonical));

        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 15 })
            .await
            .expect("bounded cluster ask after add_peer");
        assert_eq!(response, EchoResponse { doubled: 30 });

        // Best-effort shutdown; engines may already be gone.
        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1804
    // `declare_remote_actor` should install a canonical route for the declared
    // path both at the cluster layer and in the underlying TCP transport —
    // without the declared peer ever being started.
    #[tokio::test]
    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let (ca, ca_key) = make_ca();
        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let handle = engine.handle();

        // Peer B is configured but never launched; the declaration alone must
        // be enough to make the path routable.
        let mut cluster = handle
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster");

        let path = ActorPath::parse("/user/declared-remote").unwrap();
        cluster
            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
            .expect("declare remote actor");

        // The route must be visible at both the cluster and transport layers.
        assert!(cluster.can_route(&path));
        assert!(cluster
            .transport()
            .as_tcp()
            .expect("tcp transport")
            .can_route(AddrHash::new(&path, 0)));
    }
1848
    // A role-based remote spawn that targets a path already hosting a LOCAL
    // actor on the requesting engine must fail with `LocalPathCollision`.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // Engine B is the role owner: it is configured with a control plane
        // and a spawn handler, though the spawn should never reach it here.
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        // The colliding LOCAL actor lives on engine A at the very path the
        // remote spawn will later request.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "local-remote-collision",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");

        // Only engine A runs: the collision check happens locally on A.
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));

        // Wait for the local actor to register before issuing the spawn.
        let deadline = Instant::now() + Duration::from_secs(5);
        while handle_a.registry.get_by_path(&actor_path).is_none() {
            if Instant::now() > deadline {
                panic!("local collision actor did not start");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let err = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("local path collision should fail");
        assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));

        shutdown_a_tx.send(()).ok();
    }
1947
    // Spawning on engine C a path that the config declares as owned by engine
    // B must fail with `RemoteOwnershipConflict` naming the declared owner —
    // no engine needs to be running for this check.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
        let (ca, ca_key) = make_ca();
        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let handle = engine.handle();

        // Both peers expose control planes, but `declared_actors` pins the
        // path's ownership to engine B.
        let cluster = handle
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
                peers: vec![
                    PeerSpec {
                        engine_id: EngineId::new("engine-b.example"),
                        addr: "127.0.0.1:4100".parse().unwrap(),
                        control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
                        server_name: None,
                    },
                    PeerSpec {
                        engine_id: EngineId::new("engine-c.example"),
                        addr: "127.0.0.1:4200".parse().unwrap(),
                        control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
                        server_name: None,
                    },
                ],
                declared_actors: vec![DeclaredRemoteActor {
                    path: actor_path.clone(),
                    owner: EngineId::new("engine-b.example"),
                }],
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster");

        // Request the spawn on C — conflicting with the declared owner B.
        let err = cluster
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-c.example"),
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("conflicting declared owner should fail");
        assert_eq!(
            err,
            RemoteSpawnError::RemoteOwnershipConflict {
                path: actor_path,
                owner: EngineId::new("engine-b.example"),
            }
        );
    }
2010
2011 #[tokio::test]
2012 async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
2013 if !tcp_tests_available() {
2014 eprintln!("skipping tcp integration tests: network bind not permitted");
2015 return;
2016 }
2017
2018 let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
2019 let (ca, ca_key) = make_ca();
2020 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2021 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2022 let control_plane_b = reserve_tcp_addr();
2023
2024 let engine_a = Engine::with_config(EngineConfig {
2025 engine_id: EngineId::new("engine-a.example"),
2026 ..Default::default()
2027 });
2028 let engine_b = Engine::with_config(EngineConfig {
2029 engine_id: EngineId::new("engine-b.example"),
2030 control_plane_tcp_addr: Some(control_plane_b.to_string()),
2031 control_plane_tls: Some(tls_b.clone()),
2032 actor_spawn: Some(bounded_spawn_handler()),
2033 ..Default::default()
2034 });
2035
2036 let handle_a = engine_a.handle();
2037 let handle_b = engine_b.handle();
2038 handle_a
2039 .type_registry()
2040 .register_remote_ask::<EchoRequest>();
2041 handle_b
2042 .type_registry()
2043 .register_remote_ask::<EchoRequest>();
2044
2045 let cluster_a = handle_a
2046 .attach_bounded_cluster(BoundedClusterConfig {
2047 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
2048 peers: Vec::new(),
2049 declared_actors: Vec::new(),
2050 roles: Vec::new(),
2051 placement_policies: Vec::new(),
2052 })
2053 .await
2054 .expect("attach bounded cluster a");
2055 let tcp_a = cluster_a
2056 .transport()
2057 .as_tcp()
2058 .cloned()
2059 .expect("tcp transport a");
2060
2061 let cluster_b = handle_b
2062 .attach_bounded_cluster(BoundedClusterConfig {
2063 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
2064 peers: vec![PeerSpec {
2065 engine_id: EngineId::new("engine-a.example"),
2066 addr: tcp_a.local_addr(),
2067 control_plane_addr: None,
2068 server_name: None,
2069 }],
2070 declared_actors: Vec::new(),
2071 roles: Vec::new(),
2072 placement_policies: Vec::new(),
2073 })
2074 .await
2075 .expect("attach bounded cluster b");
2076 let tcp_b = cluster_b
2077 .transport()
2078 .as_tcp()
2079 .cloned()
2080 .expect("tcp transport b");
2081
2082 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
2083 let mut cluster_a = cluster_a;
2084 cluster_a
2085 .add_peer(PeerSpec {
2086 engine_id: EngineId::new("engine-b.example"),
2087 addr: tcp_b.local_addr(),
2088 control_plane_addr: Some(control_plane_b),
2089 server_name: None,
2090 })
2091 .expect("add peer with control plane");
2092
2093 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2094 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2095
2096 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2097 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2098 tokio::time::sleep(Duration::from_millis(150)).await;
2099
2100 let remote = cluster_a
2101 .spawn_remote::<EchoRequest>(
2102 EngineId::new("engine-b.example"),
2103 RemoteSpawnSpec {
2104 path: actor_path.clone(),
2105 type_name: "bounded-echo".to_string(),
2106 config: None,
2107 },
2108 )
2109 .await
2110 .expect("spawn remote actor");
2111
2112 let status = cluster_a
2113 .remote_supervision_status(&actor_path)
2114 .await
2115 .expect("running supervision status");
2116 assert_eq!(status.supervisor, EngineId::new("engine-b.example"));
2117 assert_eq!(
2118 status.state,
2119 RemoteSupervisionState::Running { restart_count: 0 }
2120 );
2121
2122 let response = remote
2123 .ask(EchoRequest { value: 21 })
2124 .await
2125 .expect("bounded remote spawn ask");
2126 assert_eq!(response, EchoResponse { doubled: 42 });
2127
2128 let duplicate = cluster_a
2129 .spawn_remote::<EchoRequest>(
2130 EngineId::new("engine-b.example"),
2131 RemoteSpawnSpec {
2132 path: actor_path.clone(),
2133 type_name: "bounded-echo".to_string(),
2134 config: None,
2135 },
2136 )
2137 .await
2138 .expect_err("duplicate remote spawn should fail");
2139 assert_eq!(
2140 duplicate,
2141 RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
2142 );
2143
2144 let old_host_addr = handle_b
2145 .registry
2146 .get_by_path(&actor_path)
2147 .expect("spawned actor registered on host")
2148 .addr;
2149 call_control_plane(
2150 &cluster_a.control_plane,
2151 control_plane_b,
2152 "engine-b.example",
2153 "actor.restart",
2154 json!({ "path": actor_path.as_str() }),
2155 )
2156 .await
2157 .expect("restart remote actor on host");
2158
2159 let restarting_deadline = Instant::now() + Duration::from_secs(5);
2160 loop {
2161 match cluster_a.remote_supervision_status(&actor_path).await {
2162 Ok(status)
2163 if status.supervisor == EngineId::new("engine-b.example")
2164 && matches!(
2165 status.state,
2166 RemoteSupervisionState::Restarting { restart_count: 1 }
2167 ) =>
2168 {
2169 break;
2170 }
2171 Ok(_) | Err(_) if Instant::now() <= restarting_deadline => {
2172 tokio::time::sleep(Duration::from_millis(25)).await;
2173 }
2174 Ok(status) => panic!("remote actor did not enter restarting state: {status:?}"),
2175 Err(err) => panic!("remote actor restart status query failed: {err:?}"),
2176 }
2177 }
2178
2179 let restart_deadline = Instant::now() + Duration::from_secs(5);
2180 loop {
2181 if handle_b
2182 .registry
2183 .get_by_path(&actor_path)
2184 .map(|slot| {
2185 slot.running.load(std::sync::atomic::Ordering::Relaxed)
2186 && slot.addr != old_host_addr
2187 && slot
2188 .restart_count
2189 .load(std::sync::atomic::Ordering::Relaxed)
2190 > 0
2191 })
2192 .unwrap_or(false)
2193 {
2194 break;
2195 }
2196 if Instant::now() > restart_deadline {
2197 panic!("remote actor did not restart on host");
2198 }
2199 tokio::time::sleep(Duration::from_millis(25)).await;
2200 }
2201
2202 let restart_deadline = Instant::now() + Duration::from_secs(5);
2203 loop {
2204 if cluster_a.can_route(&actor_path) {
2205 break;
2206 }
2207 if Instant::now() > restart_deadline {
2208 panic!("remote actor route did not return after restart");
2209 }
2210 tokio::time::sleep(Duration::from_millis(25)).await;
2211 }
2212
2213 let restart_deadline = Instant::now() + Duration::from_secs(5);
2214 loop {
2215 match remote.ask(EchoRequest { value: 7 }).await {
2216 Ok(response) => {
2217 assert_eq!(response, EchoResponse { doubled: 14 });
2218 break;
2219 }
2220 Err(_) if Instant::now() <= restart_deadline => {
2221 tokio::time::sleep(Duration::from_millis(25)).await;
2222 }
2223 Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
2224 }
2225 }
2226
2227 let restart_status = cluster_a
2228 .remote_supervision_status(&actor_path)
2229 .await
2230 .expect("restart supervision status");
2231 assert_eq!(restart_status.supervisor, EngineId::new("engine-b.example"));
2232 assert_eq!(
2233 restart_status.state,
2234 RemoteSupervisionState::Running { restart_count: 1 }
2235 );
2236
2237 let unknown = cluster_a
2238 .spawn_remote::<EchoRequest>(
2239 EngineId::new("engine-c.example"),
2240 RemoteSpawnSpec {
2241 path: ActorPath::parse("/user/unknown-target").unwrap(),
2242 type_name: "bounded-echo".to_string(),
2243 config: None,
2244 },
2245 )
2246 .await
2247 .expect_err("unknown target should fail");
2248 assert_eq!(
2249 unknown,
2250 RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
2251 );
2252
2253 shutdown_a_tx.send(()).ok();
2254 shutdown_b_tx.send(()).ok();
2255 }
2256
// End-to-end TCP test: cluster A spawns a typed remote actor on the peer that
// owns the "context-service" role (engine B), round-trips an ask through the
// returned handle, then checks that an undeclared role is rejected.
#[tokio::test]
async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
    // Some sandboxes forbid binding sockets; skip rather than fail there.
    if !tcp_tests_available() {
        eprintln!("skipping tcp integration tests: network bind not permitted");
        return;
    }

    let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
    // Shared CA so each engine's TLS certificate verifies against the other.
    let (ca, ca_key) = make_ca();
    let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
    let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
    let control_plane_b = reserve_tcp_addr();

    let engine_a = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-a.example"),
        ..Default::default()
    });
    // Engine B exposes a control plane and a spawn handler so it can host
    // remotely spawned actors.
    let engine_b = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-b.example"),
        control_plane_tcp_addr: Some(control_plane_b.to_string()),
        control_plane_tls: Some(tls_b.clone()),
        actor_spawn: Some(bounded_spawn_handler()),
        ..Default::default()
    });

    let handle_a = engine_a.handle();
    let handle_b = engine_b.handle();
    // Both sides must register the ask type for remote (de)serialization.
    handle_a
        .type_registry()
        .register_remote_ask::<EchoRequest>();
    handle_b
        .type_registry()
        .register_remote_ask::<EchoRequest>();

    // B attaches first with no peers; A needs B's bound transport address.
    let cluster_b = handle_b
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
            peers: Vec::new(),
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster b");
    let tcp_b = cluster_b
        .transport()
        .as_tcp()
        .cloned()
        .expect("tcp transport b");

    // A peers with B (including B's control-plane address for spawn RPCs) and
    // declares the "context-service" role as owned by engine B.
    let cluster_a = handle_a
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: Some(control_plane_b),
                server_name: None,
            }],
            declared_actors: Vec::new(),
            roles: vec![BoundedClusterRole {
                name: "context-service".to_string(),
                owner: EngineId::new("engine-b.example"),
            }],
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster a");
    let tcp_a = cluster_a
        .transport()
        .as_tcp()
        .cloned()
        .expect("tcp transport a");

    // Reverse peer entry so B can reach A as well.
    let mut cluster_b = cluster_b;
    cluster_b
        .add_peer(PeerSpec {
            engine_id: EngineId::new("engine-a.example"),
            addr: tcp_a.local_addr(),
            control_plane_addr: None,
            server_name: None,
        })
        .expect("add peer a to cluster b");

    let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
    let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

    // Engines run on dedicated OS threads; the sleep gives them time to start.
    std::thread::spawn(move || engine_a.run(shutdown_a_rx));
    std::thread::spawn(move || engine_b.run(shutdown_b_rx));
    tokio::time::sleep(Duration::from_millis(150)).await;

    // Role-based spawn should resolve "context-service" to engine B.
    let remote = cluster_a
        .spawn_remote_on_role::<EchoRequest>(
            "context-service",
            RemoteSpawnSpec {
                path: actor_path.clone(),
                type_name: "bounded-echo".to_string(),
                config: None,
            },
        )
        .await
        .expect("spawn remote actor by role over tcp");

    // A round-trip ask through the typed handle proves the actor is live.
    let response = remote
        .ask(EchoRequest { value: 11 })
        .await
        .expect("bounded remote spawn ask over tcp by role");
    assert_eq!(response, EchoResponse { doubled: 22 });

    // Spawning on a role nobody declared must fail with UnknownRole.
    let unknown_role = cluster_a
        .spawn_remote_on_role::<EchoRequest>(
            "missing-role",
            RemoteSpawnSpec {
                path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
                type_name: "bounded-echo".to_string(),
                config: None,
            },
        )
        .await
        .expect_err("unknown role should fail");
    assert_eq!(
        unknown_role,
        RemoteSpawnError::UnknownRole("missing-role".to_string())
    );

    shutdown_a_tx.send(()).ok();
    shutdown_b_tx.send(()).ok();
}
2385
// End-to-end TCP test for placement-class routing: actor class "SessionActor"
// maps (via a placement policy) to the "context-service" role owned by engine
// B. Also exercises a control-plane restart of the spawned actor and the
// unknown-class failure path.
#[tokio::test]
async fn bounded_cluster_spawn_remote_for_class_over_tcp_returns_typed_handle() {
    if !tcp_tests_available() {
        eprintln!("skipping tcp integration tests: network bind not permitted");
        return;
    }

    let actor_path = ActorPath::parse("/user/remote-spawn-class-tcp").unwrap();
    // Shared CA so each engine's TLS certificate verifies against the other.
    let (ca, ca_key) = make_ca();
    let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
    let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
    let control_plane_b = reserve_tcp_addr();

    let engine_a = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-a.example"),
        ..Default::default()
    });
    // Engine B hosts the control plane and the spawn handler for remote spawns.
    let engine_b = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-b.example"),
        control_plane_tcp_addr: Some(control_plane_b.to_string()),
        control_plane_tls: Some(tls_b.clone()),
        actor_spawn: Some(bounded_spawn_handler()),
        ..Default::default()
    });

    let handle_a = engine_a.handle();
    let handle_b = engine_b.handle();
    handle_a
        .type_registry()
        .register_remote_ask::<EchoRequest>();
    handle_b
        .type_registry()
        .register_remote_ask::<EchoRequest>();

    // B attaches first so A can peer against B's bound transport address.
    let cluster_b = handle_b
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
            peers: Vec::new(),
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster b");
    let tcp_b = cluster_b
        .transport()
        .as_tcp()
        .cloned()
        .expect("tcp transport b");

    // A configures the chain: class "SessionActor" -> role "context-service"
    // -> owner engine B.
    let cluster_a = handle_a
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: Some(control_plane_b),
                server_name: None,
            }],
            declared_actors: Vec::new(),
            roles: vec![BoundedClusterRole {
                name: "context-service".to_string(),
                owner: EngineId::new("engine-b.example"),
            }],
            placement_policies: vec![BoundedPlacementPolicy {
                actor_class: "SessionActor".to_string(),
                role: "context-service".to_string(),
            }],
        })
        .await
        .expect("attach bounded cluster a");
    let tcp_a = cluster_a
        .transport()
        .as_tcp()
        .cloned()
        .expect("tcp transport a");

    // Reverse peer entry so B can reach A.
    let mut cluster_b = cluster_b;
    cluster_b
        .add_peer(PeerSpec {
            engine_id: EngineId::new("engine-a.example"),
            addr: tcp_a.local_addr(),
            control_plane_addr: None,
            server_name: None,
        })
        .expect("add peer a to cluster b");

    let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
    let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

    std::thread::spawn(move || engine_a.run(shutdown_a_rx));
    std::thread::spawn(move || engine_b.run(shutdown_b_rx));
    tokio::time::sleep(Duration::from_millis(150)).await;

    // Class-based spawn should land the actor on engine B via the policy chain.
    let remote = cluster_a
        .spawn_remote_for_class::<EchoRequest>(
            "SessionActor",
            RemoteSpawnSpec {
                path: actor_path.clone(),
                type_name: "bounded-echo".to_string(),
                config: None,
            },
        )
        .await
        .expect("spawn remote actor by placement class over tcp");

    let response = remote
        .ask(EchoRequest { value: 17 })
        .await
        .expect("bounded remote spawn ask over tcp by placement class");
    assert_eq!(response, EchoResponse { doubled: 34 });

    // Supervision should report the actor running on B with zero restarts.
    let status = cluster_a
        .remote_supervision_status(&actor_path)
        .await
        .expect("placement supervision status");
    assert_eq!(status.supervisor, EngineId::new("engine-b.example"));
    assert_eq!(
        status.state,
        RemoteSupervisionState::Running { restart_count: 0 }
    );

    // Restart the actor on its host through B's control plane.
    call_control_plane(
        &cluster_a.control_plane,
        control_plane_b,
        "engine-b.example",
        "actor.restart",
        json!({ "path": actor_path.as_str() }),
    )
    .await
    .expect("restart placement actor on host");

    // Poll until the supervisor reports Restarting { restart_count: 1 },
    // bounded by a 5s deadline so the test fails instead of hanging.
    let restarting_deadline = Instant::now() + Duration::from_secs(5);
    loop {
        match cluster_a.remote_supervision_status(&actor_path).await {
            Ok(status)
                if matches!(
                    status.state,
                    RemoteSupervisionState::Restarting { restart_count: 1 }
                ) =>
            {
                assert_eq!(status.supervisor, EngineId::new("engine-b.example"));
                break;
            }
            // Transient errors / stale states are retried until the deadline.
            Ok(_) | Err(_) if Instant::now() <= restarting_deadline => {
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
            Ok(status) => {
                panic!("placement-driven actor did not enter restarting state: {status:?}")
            }
            Err(err) => panic!("placement restart status query failed: {err:?}"),
        }
    }

    // The actor must come back up with its restart count bumped to 1.
    wait_for_supervision_state(
        &cluster_a,
        &actor_path,
        RemoteSupervisionState::Running { restart_count: 1 },
    )
    .await;

    // A class with no placement policy must be rejected.
    let unknown_class = cluster_a
        .spawn_remote_for_class::<EchoRequest>(
            "MissingActorClass",
            RemoteSpawnSpec {
                path: ActorPath::parse("/user/missing-class-tcp").unwrap(),
                type_name: "bounded-echo".to_string(),
                config: None,
            },
        )
        .await
        .expect_err("unknown placement class should fail");
    assert_eq!(
        unknown_class,
        RemoteSpawnError::UnknownPlacementClass("MissingActorClass".to_string())
    );

    shutdown_a_tx.send(()).ok();
    shutdown_b_tx.send(()).ok();
}
2566
2567 #[tokio::test]
2568 async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
2569 if !tcp_tests_available() {
2570 eprintln!("skipping tcp integration tests: network bind not permitted");
2571 return;
2572 }
2573
2574 let engine_a_id = EngineId::new("engine-a-norev.example");
2575 let engine_b_id = EngineId::new("engine-b-norev.example");
2576 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
2577 let (ca, ca_key) = make_ca();
2578 let tls_a = make_tls_config(engine_a_id.as_str(), &ca, &ca_key);
2579 let tls_b = make_tls_config(engine_b_id.as_str(), &ca, &ca_key);
2580 let control_plane_b = reserve_tcp_addr();
2581
2582 let engine_a = Engine::with_config(EngineConfig {
2583 engine_id: engine_a_id.clone(),
2584 ..Default::default()
2585 });
2586 let engine_b = Engine::with_config(EngineConfig {
2587 engine_id: engine_b_id.clone(),
2588 control_plane_tcp_addr: Some(control_plane_b.to_string()),
2589 control_plane_tls: Some(tls_b.clone()),
2590 actor_spawn: Some(bounded_spawn_handler()),
2591 ..Default::default()
2592 });
2593
2594 let handle_a = engine_a.handle();
2595 let handle_b = engine_b.handle();
2596 handle_a
2597 .type_registry()
2598 .register_remote_ask::<EchoRequest>();
2599 handle_b
2600 .type_registry()
2601 .register_remote_ask::<EchoRequest>();
2602
2603 let cluster_b = handle_b
2604 .attach_bounded_cluster(BoundedClusterConfig {
2605 transport: BoundedTransportConfig::Tcp(tcp_config(engine_b_id.as_str(), tls_b)),
2606 peers: Vec::new(),
2607 declared_actors: Vec::new(),
2608 roles: Vec::new(),
2609 placement_policies: Vec::new(),
2610 })
2611 .await
2612 .expect("attach bounded cluster b");
2613 let tcp_b = cluster_b
2614 .transport()
2615 .as_tcp()
2616 .cloned()
2617 .expect("tcp transport b");
2618
2619 let cluster_a = handle_a
2620 .attach_bounded_cluster(BoundedClusterConfig {
2621 transport: BoundedTransportConfig::Tcp(tcp_config(engine_a_id.as_str(), tls_a)),
2622 peers: vec![PeerSpec {
2623 engine_id: engine_b_id.clone(),
2624 addr: tcp_b.local_addr(),
2625 control_plane_addr: Some(control_plane_b),
2626 server_name: None,
2627 }],
2628 declared_actors: Vec::new(),
2629 roles: vec![BoundedClusterRole {
2630 name: "context-service".to_string(),
2631 owner: engine_b_id.clone(),
2632 }],
2633 placement_policies: Vec::new(),
2634 })
2635 .await
2636 .expect("attach bounded cluster a");
2637 let tcp_a = cluster_a
2638 .transport()
2639 .as_tcp()
2640 .cloned()
2641 .expect("tcp transport a");
2642
2643 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2644 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2645
2646 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2647 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2648
2649 let _ = tcp_a;
2653
2654 let remote = cluster_a
2655 .spawn_remote_on_role::<EchoRequest>(
2656 "context-service",
2657 RemoteSpawnSpec {
2658 path: actor_path.clone(),
2659 type_name: "bounded-echo".to_string(),
2660 config: None,
2661 },
2662 )
2663 .await
2664 .expect("spawn remote actor by role over tcp without reverse declaration");
2665
2666 let response = remote
2667 .ask(EchoRequest { value: 13 })
2668 .await
2669 .expect("bounded remote spawn ask over tcp without reverse declaration");
2670 assert_eq!(response, EchoResponse { doubled: 26 });
2671
2672 shutdown_a_tx.send(()).ok();
2673 shutdown_b_tx.send(()).ok();
2674 }
2675
// Supervision over TCP: after the hosted actor is killed via the control
// plane, the observing cluster must report `ActorUnavailable` — the actor is
// registered with RestartPolicy::Temporary, so the host does not restart it.
#[tokio::test]
async fn bounded_cluster_remote_supervision_reports_actor_unavailable() {
    if !tcp_tests_available() {
        eprintln!("skipping tcp integration tests: network bind not permitted");
        return;
    }

    let actor_path = ActorPath::parse("/user/supervision-actor-unavailable").unwrap();
    // Shared CA so each engine's TLS certificate verifies against the other.
    let (ca, ca_key) = make_ca();
    let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
    let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
    let control_plane_b = reserve_tcp_addr();

    let engine_a = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-a.example"),
        ..Default::default()
    });
    let mut engine_b = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-b.example"),
        control_plane_tcp_addr: Some(control_plane_b.to_string()),
        control_plane_tls: Some(tls_b.clone()),
        ..Default::default()
    });
    // Host the echo actor locally on B; Temporary restart policy means a kill
    // is final and the actor will NOT be restarted.
    let ns = NamespacePolicy::default_for(&actor_path).unwrap();
    engine_b.add_user_actor(ChildSpec::new(
        "supervision-actor-unavailable",
        RestartPolicy::Temporary,
        ShutdownPolicy::Timeout(Duration::from_secs(1)),
        ns,
        move || Box::new(EchoActor),
    ));

    let handle_a = engine_a.handle();
    let handle_b = engine_b.handle();
    handle_a
        .type_registry()
        .register_remote_ask::<EchoRequest>();
    handle_b
        .type_registry()
        .register_remote_ask::<EchoRequest>();

    let cluster_b = handle_b
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
            peers: Vec::new(),
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster b");
    let tcp_b = cluster_b
        .transport()
        .as_tcp()
        .cloned()
        .expect("tcp transport b");

    let mut cluster_a = handle_a
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: Some(control_plane_b),
                server_name: None,
            }],
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster a");
    // Declare the remote actor so cluster A tracks its supervision state on B.
    cluster_a
        .declare_remote_actor(EngineId::new("engine-b.example"), actor_path.clone())
        .expect("declare remote actor");

    let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
    let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
    std::thread::spawn(move || engine_a.run(shutdown_a_rx));
    std::thread::spawn(move || engine_b.run(shutdown_b_rx));
    tokio::time::sleep(Duration::from_millis(150)).await;

    // Sanity check: the actor answers and is reported Running before the kill.
    let remote = handle_a.remote_addr_for_path::<EchoRequest>(&actor_path);
    let response = remote
        .ask(EchoRequest { value: 9 })
        .await
        .expect("initial remote ask");
    assert_eq!(response, EchoResponse { doubled: 18 });
    wait_for_supervision_state(
        &cluster_a,
        &actor_path,
        RemoteSupervisionState::Running { restart_count: 0 },
    )
    .await;

    // Kill the actor on its host via B's control plane.
    call_control_plane(
        &cluster_a.control_plane,
        control_plane_b,
        "engine-b.example",
        "actor.kill",
        json!({ "path": actor_path.as_str() }),
    )
    .await
    .expect("kill remote actor on host");

    // Temporary policy: no restart, so the state settles on ActorUnavailable.
    wait_for_supervision_state(
        &cluster_a,
        &actor_path,
        RemoteSupervisionState::ActorUnavailable,
    )
    .await;

    shutdown_a_tx.send(()).ok();
    shutdown_b_tx.send(()).ok();
}
2791
// Supervision over TCP: when the hosting engine (B) is shut down entirely,
// the observing cluster must report `SupervisorUnavailable` for the declared
// actor, even though the actor itself was healthy (Permanent restart policy).
#[tokio::test]
async fn bounded_cluster_remote_supervision_reports_supervisor_unavailable() {
    if !tcp_tests_available() {
        eprintln!("skipping tcp integration tests: network bind not permitted");
        return;
    }

    let actor_path = ActorPath::parse("/user/supervision-host-unavailable").unwrap();
    // Shared CA so each engine's TLS certificate verifies against the other.
    let (ca, ca_key) = make_ca();
    let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
    let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
    let control_plane_b = reserve_tcp_addr();

    let engine_a = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-a.example"),
        ..Default::default()
    });
    let mut engine_b = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-b.example"),
        control_plane_tcp_addr: Some(control_plane_b.to_string()),
        control_plane_tls: Some(tls_b.clone()),
        ..Default::default()
    });
    // Host the echo actor on B with a Permanent restart policy, so only the
    // engine going away (not the actor dying) can make it unreachable.
    let ns = NamespacePolicy::default_for(&actor_path).unwrap();
    engine_b.add_user_actor(ChildSpec::new(
        "supervision-host-unavailable",
        RestartPolicy::Permanent,
        ShutdownPolicy::Timeout(Duration::from_secs(1)),
        ns,
        move || Box::new(EchoActor),
    ));

    let handle_a = engine_a.handle();
    let handle_b = engine_b.handle();
    handle_a
        .type_registry()
        .register_remote_ask::<EchoRequest>();
    handle_b
        .type_registry()
        .register_remote_ask::<EchoRequest>();

    let cluster_b = handle_b
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
            peers: Vec::new(),
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster b");
    let tcp_b = cluster_b
        .transport()
        .as_tcp()
        .cloned()
        .expect("tcp transport b");

    let mut cluster_a = handle_a
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: Some(control_plane_b),
                server_name: None,
            }],
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
        .expect("attach bounded cluster a");
    // Declare the remote actor so cluster A tracks its supervision state on B.
    cluster_a
        .declare_remote_actor(EngineId::new("engine-b.example"), actor_path.clone())
        .expect("declare remote actor");

    let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
    let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
    std::thread::spawn(move || engine_a.run(shutdown_a_rx));
    std::thread::spawn(move || engine_b.run(shutdown_b_rx));
    tokio::time::sleep(Duration::from_millis(150)).await;

    // Sanity check: the actor answers and is reported Running before shutdown.
    let remote = handle_a.remote_addr_for_path::<EchoRequest>(&actor_path);
    let response = remote
        .ask(EchoRequest { value: 3 })
        .await
        .expect("initial remote ask");
    assert_eq!(response, EchoResponse { doubled: 6 });
    wait_for_supervision_state(
        &cluster_a,
        &actor_path,
        RemoteSupervisionState::Running { restart_count: 0 },
    )
    .await;

    // Shut down the hosting engine; A must observe SupervisorUnavailable.
    shutdown_b_tx.send(()).ok();
    wait_for_supervision_state(
        &cluster_a,
        &actor_path,
        RemoteSupervisionState::SupervisorUnavailable,
    )
    .await;

    shutdown_a_tx.send(()).ok();
}
2897
2898 #[tokio::test]
2899 async fn bounded_cluster_remote_supervision_reports_transport_unavailable() {
2900 if !tcp_tests_available() {
2901 eprintln!("skipping tcp integration tests: network bind not permitted");
2902 return;
2903 }
2904
2905 let actor_path = ActorPath::parse("/user/supervision-transport-unavailable").unwrap();
2906 let (ca, ca_key) = make_ca();
2907 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2908 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2909 let control_plane_b = reserve_tcp_addr();
2910 let wrong_transport_addr = reserve_tcp_addr();
2911
2912 let engine_a = Engine::with_config(EngineConfig {
2913 engine_id: EngineId::new("engine-a.example"),
2914 ..Default::default()
2915 });
2916 let mut engine_b = Engine::with_config(EngineConfig {
2917 engine_id: EngineId::new("engine-b.example"),
2918 control_plane_tcp_addr: Some(control_plane_b.to_string()),
2919 control_plane_tls: Some(tls_b),
2920 ..Default::default()
2921 });
2922 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
2923 engine_b.add_user_actor(ChildSpec::new(
2924 "supervision-transport-unavailable",
2925 RestartPolicy::Permanent,
2926 ShutdownPolicy::Timeout(Duration::from_secs(1)),
2927 ns,
2928 move || Box::new(EchoActor),
2929 ));
2930
2931 let handle_a = engine_a.handle();
2932 let mut cluster_a = handle_a
2933 .attach_bounded_cluster(BoundedClusterConfig {
2934 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
2935 peers: vec![PeerSpec {
2936 engine_id: EngineId::new("engine-b.example"),
2937 addr: wrong_transport_addr,
2938 control_plane_addr: Some(control_plane_b),
2939 server_name: None,
2940 }],
2941 declared_actors: Vec::new(),
2942 roles: Vec::new(),
2943 placement_policies: Vec::new(),
2944 })
2945 .await
2946 .expect("attach bounded cluster a");
2947 cluster_a
2948 .declare_remote_actor(EngineId::new("engine-b.example"), actor_path.clone())
2949 .expect("declare remote actor");
2950
2951 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2952 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2953 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2954 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2955 tokio::time::sleep(Duration::from_millis(150)).await;
2956
2957 wait_for_supervision_state(
2958 &cluster_a,
2959 &actor_path,
2960 RemoteSupervisionState::TransportUnavailable,
2961 )
2962 .await;
2963
2964 shutdown_a_tx.send(()).ok();
2965 shutdown_b_tx.send(()).ok();
2966 }
2967
2968 fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
2969 QuicTransportConfig {
2970 engine_id: EngineId::new(engine_id),
2971 listen_addr: "127.0.0.1:0".parse().unwrap(),
2972 send_buffer_size: 1024,
2973 idle_timeout: Duration::from_secs(2),
2974 tls,
2975 }
2976 }
2977
// QUIC variant of the attach test: engine A hosts an echo actor, engine B
// peers with A over QUIC, waits for the route to propagate, and round-trips
// a typed ask. The test skips itself when QUIC cannot be set up.
#[tokio::test]
async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
    if !quic_tests_available() {
        eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
        return;
    }

    let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
    // Shared CA so each engine's TLS certificate verifies against the other.
    let (ca, ca_key) = make_ca();
    let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
    let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

    // A is mutable because it hosts the echo actor (add_user_actor below).
    let mut engine_a = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-a.example"),
        ..Default::default()
    });
    let engine_b = Engine::with_config(EngineConfig {
        engine_id: EngineId::new("engine-b.example"),
        ..Default::default()
    });

    let ns = NamespacePolicy::default_for(&actor_path).unwrap();
    engine_a.add_user_actor(ChildSpec::new(
        "cluster-echo-quic",
        RestartPolicy::Permanent,
        ShutdownPolicy::Timeout(Duration::from_secs(1)),
        ns,
        move || Box::new(EchoActor),
    ));

    let handle_a = engine_a.handle();
    let handle_b = engine_b.handle();
    handle_a
        .type_registry()
        .register_remote_ask::<EchoRequest>();
    handle_b
        .type_registry()
        .register_remote_ask::<EchoRequest>();

    // A transport-level attach error (e.g. QUIC sockets unavailable in the
    // sandbox) downgrades the test to a skip; other errors still fail it.
    let cluster_a = match handle_a
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Quic(quic_config(
                "engine-a.example",
                tls_a.clone(),
            )),
            peers: Vec::new(),
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
    {
        Ok(cluster) => cluster,
        Err(ClusterError::Transport(err)) => {
            eprintln!("skipping quic bounded-cluster test: {err:?}");
            return;
        }
        Err(err) => panic!("attach bounded cluster a: {err:?}"),
    };
    let quic_a = cluster_a
        .transport()
        .as_quic()
        .cloned()
        .expect("quic transport a");

    let cluster_b = match handle_b
        .attach_bounded_cluster(BoundedClusterConfig {
            transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-a.example"),
                addr: quic_a.local_addr(),
                control_plane_addr: None,
                server_name: None,
            }],
            declared_actors: Vec::new(),
            roles: Vec::new(),
            placement_policies: Vec::new(),
        })
        .await
    {
        Ok(cluster) => cluster,
        Err(ClusterError::Transport(err)) => {
            eprintln!("skipping quic bounded-cluster test: {err:?}");
            return;
        }
        Err(err) => panic!("attach bounded cluster b: {err:?}"),
    };
    let quic_b = cluster_b
        .transport()
        .as_quic()
        .cloned()
        .expect("quic transport b");

    // Reverse peer entry at the transport layer so A can dial B directly.
    quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
    let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
    let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

    std::thread::spawn(move || engine_a.run(shutdown_a_rx));
    std::thread::spawn(move || engine_b.run(shutdown_b_rx));

    // Wait (bounded by 5s) until B learns a route to the actor hosted on A,
    // then confirm the transport can also route the canonical address hash.
    let canonical = AddrHash::new(&actor_path, 0);
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while !cluster_b.can_route(&actor_path) {
        if std::time::Instant::now() > deadline {
            panic!("bounded quic cluster route did not propagate");
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    assert!(quic_b.can_route(canonical));

    // Retry the first ask while the QUIC connection is still coming up; only
    // ConnectionFailed is retried, any other error fails the test immediately.
    let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
    let ask_deadline = Instant::now() + Duration::from_secs(2);
    loop {
        match remote.ask(EchoRequest { value: 15 }).await {
            Ok(response) => {
                assert_eq!(response, EchoResponse { doubled: 30 });
                break;
            }
            Err(palladium_actor::AskError::Send(
                palladium_actor::SendError::ConnectionFailed,
            )) if Instant::now() <= ask_deadline => {
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
            Err(err) => panic!("bounded cluster ask over quic: {err:?}"),
        }
    }

    shutdown_a_tx.send(()).ok();
    shutdown_b_tx.send(()).ok();
}
3108
3109 #[tokio::test]
3110 async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
3111 if !quic_tests_available() {
3112 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
3113 return;
3114 }
3115
3116 let (ca, ca_key) = make_ca();
3117 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
3118 let engine = Engine::with_config(EngineConfig {
3119 engine_id: EngineId::new("engine-a.example"),
3120 ..Default::default()
3121 });
3122 let handle = engine.handle();
3123
3124 let mut cluster = match handle
3125 .attach_bounded_cluster(BoundedClusterConfig {
3126 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
3127 peers: vec![PeerSpec {
3128 engine_id: EngineId::new("engine-b.example"),
3129 addr: "127.0.0.1:4100".parse().unwrap(),
3130 control_plane_addr: None,
3131 server_name: None,
3132 }],
3133 declared_actors: Vec::new(),
3134 roles: Vec::new(),
3135 placement_policies: Vec::new(),
3136 })
3137 .await
3138 {
3139 Ok(cluster) => cluster,
3140 Err(ClusterError::Transport(err)) => {
3141 eprintln!("skipping quic bounded-cluster test: {err:?}");
3142 return;
3143 }
3144 Err(err) => panic!("attach bounded cluster: {err:?}"),
3145 };
3146
3147 let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
3148 cluster
3149 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
3150 .expect("declare remote actor");
3151
3152 assert!(cluster.can_route(&path));
3153 assert!(cluster
3154 .transport()
3155 .as_quic()
3156 .expect("quic transport")
3157 .can_route(AddrHash::new(&path, 0)));
3158 }
3159
3160 #[tokio::test]
3161 async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
3162 if !quic_tests_available() {
3163 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
3164 return;
3165 }
3166
3167 let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
3168 let (ca, ca_key) = make_ca();
3169 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
3170 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
3171 let control_plane_b = reserve_udp_addr();
3172
3173 let engine_a = Engine::with_config(EngineConfig {
3174 engine_id: EngineId::new("engine-a.example"),
3175 ..Default::default()
3176 });
3177 let engine_b = Engine::with_config(EngineConfig {
3178 engine_id: EngineId::new("engine-b.example"),
3179 control_plane_quic_addr: Some(control_plane_b.to_string()),
3180 control_plane_tls: Some(tls_b.clone()),
3181 actor_spawn: Some(bounded_spawn_handler()),
3182 ..Default::default()
3183 });
3184
3185 let handle_a = engine_a.handle();
3186 let handle_b = engine_b.handle();
3187 handle_a
3188 .type_registry()
3189 .register_remote_ask::<EchoRequest>();
3190 handle_b
3191 .type_registry()
3192 .register_remote_ask::<EchoRequest>();
3193
3194 let cluster_b = match handle_b
3195 .attach_bounded_cluster(BoundedClusterConfig {
3196 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
3197 peers: Vec::new(),
3198 declared_actors: Vec::new(),
3199 roles: Vec::new(),
3200 placement_policies: Vec::new(),
3201 })
3202 .await
3203 {
3204 Ok(cluster) => cluster,
3205 Err(ClusterError::Transport(err)) => {
3206 eprintln!("skipping quic bounded-cluster test: {err:?}");
3207 return;
3208 }
3209 Err(err) => panic!("attach bounded cluster b: {err:?}"),
3210 };
3211 let quic_b = cluster_b
3212 .transport()
3213 .as_quic()
3214 .cloned()
3215 .expect("quic transport b");
3216
3217 let cluster_a = match handle_a
3218 .attach_bounded_cluster(BoundedClusterConfig {
3219 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
3220 peers: vec![PeerSpec {
3221 engine_id: EngineId::new("engine-b.example"),
3222 addr: quic_b.local_addr(),
3223 control_plane_addr: Some(control_plane_b),
3224 server_name: None,
3225 }],
3226 declared_actors: Vec::new(),
3227 roles: vec![BoundedClusterRole {
3228 name: "context-service".to_string(),
3229 owner: EngineId::new("engine-b.example"),
3230 }],
3231 placement_policies: Vec::new(),
3232 })
3233 .await
3234 {
3235 Ok(cluster) => cluster,
3236 Err(ClusterError::Transport(err)) => {
3237 eprintln!("skipping quic bounded-cluster test: {err:?}");
3238 return;
3239 }
3240 Err(err) => panic!("attach bounded cluster a: {err:?}"),
3241 };
3242 let quic_a = cluster_a
3243 .transport()
3244 .as_quic()
3245 .cloned()
3246 .expect("quic transport a");
3247
3248 let mut cluster_b = cluster_b;
3249 cluster_b
3250 .add_peer(PeerSpec {
3251 engine_id: EngineId::new("engine-a.example"),
3252 addr: quic_a.local_addr(),
3253 control_plane_addr: None,
3254 server_name: None,
3255 })
3256 .expect("add quic peer a to cluster b");
3257
3258 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
3259 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
3260
3261 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
3262 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
3263 tokio::time::sleep(Duration::from_millis(150)).await;
3264
3265 let remote = cluster_a
3266 .spawn_remote_on_role::<EchoRequest>(
3267 "context-service",
3268 RemoteSpawnSpec {
3269 path: actor_path.clone(),
3270 type_name: "bounded-echo".to_string(),
3271 config: None,
3272 },
3273 )
3274 .await
3275 .expect("spawn remote actor over quic by role");
3276
3277 let response = remote
3278 .ask(EchoRequest { value: 9 })
3279 .await
3280 .expect("bounded remote spawn ask over quic by role");
3281 assert_eq!(response, EchoResponse { doubled: 18 });
3282
3283 let unknown_role = cluster_a
3284 .spawn_remote_on_role::<EchoRequest>(
3285 "missing-role",
3286 RemoteSpawnSpec {
3287 path: ActorPath::parse("/user/missing-role").unwrap(),
3288 type_name: "bounded-echo".to_string(),
3289 config: None,
3290 },
3291 )
3292 .await
3293 .expect_err("unknown role should fail");
3294 assert_eq!(
3295 unknown_role,
3296 RemoteSpawnError::UnknownRole("missing-role".to_string())
3297 );
3298
3299 shutdown_a_tx.send(()).ok();
3300 shutdown_b_tx.send(()).ok();
3301 }
3302}