// palladium_runtime/bounded_cluster.rs
//
// Bounded-cluster runtime: static peer topology, control-plane driven remote
// actor spawning, and remote supervision probing over TCP or QUIC transports.

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use dashmap::DashMap;
10use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
11use palladium_transport::network::{Network, TokioNetwork};
12use palladium_transport::{
13    mailbox, QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport,
14    TransportError,
15};
16use rustls::pki_types::ServerName;
17use serde_json::{json, Value};
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio_rustls::TlsConnector;
20
/// Installs a calling engine as a peer on an already-running transport.
/// Implementations are registered per host engine (see `spawn_peer_installers`)
/// so that an inbound remote-spawn request can make its caller routable.
trait SpawnPeerInstaller: Send + Sync {
    // `server_name` overrides the TLS server name for the peer; `None` lets the
    // transport fall back to its default naming.
    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>);
}
24
25impl<R: Reactor + Clone, N: Network + Clone> SpawnPeerInstaller for BoundedTransportHandle<R, N> {
26    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>) {
27        let source_addr = AddrHash::synthetic(engine_id.as_str().as_bytes());
28        match server_name {
29            Some(server_name) => self.add_peer(&PeerSpec {
30                engine_id: engine_id.clone(),
31                addr,
32                control_plane_addr: None,
33                server_name: Some(server_name),
34            }),
35            None => self.add_peer(&PeerSpec {
36                engine_id: engine_id.clone(),
37                addr,
38                control_plane_addr: None,
39                server_name: None,
40            }),
41        }
42        match self {
43            Self::Tcp(transport) => {
44                let _ = transport.register_remote_source_addr(engine_id, source_addr);
45            }
46            Self::Quic(transport) => {
47                let _ = transport.register_remote_source_addr(engine_id, source_addr);
48            }
49        }
50    }
51}
52
53fn spawn_peer_installers() -> &'static DashMap<String, Arc<dyn SpawnPeerInstaller>> {
54    static INSTALLERS: std::sync::OnceLock<DashMap<String, Arc<dyn SpawnPeerInstaller>>> =
55        std::sync::OnceLock::new();
56    INSTALLERS.get_or_init(DashMap::new)
57}
58
59pub(crate) fn register_spawn_peer_installer<R: Reactor + Clone, N: Network + Clone>(
60    engine_id: &EngineId,
61    transport: &BoundedTransportHandle<R, N>,
62) {
63    spawn_peer_installers().insert(
64        engine_id.as_str().to_string(),
65        Arc::new(transport.clone()) as Arc<dyn SpawnPeerInstaller>,
66    );
67}
68
69pub(crate) fn maybe_register_spawn_caller_peer(
70    host_engine_id: &EngineId,
71    caller_engine_id: &EngineId,
72    caller_transport_addr: SocketAddr,
73    caller_server_name: Option<String>,
74) {
75    if let Some(installer) = spawn_peer_installers().get(host_engine_id.as_str()) {
76        installer.ensure_peer(
77            caller_engine_id.clone(),
78            caller_transport_addr,
79            caller_server_name,
80        );
81    }
82}
83
/// Static description of a bounded cluster: the local transport plus declared
/// peers, remote actors, roles, and class-to-role placement policies.
#[derive(Clone)]
pub struct BoundedClusterConfig {
    /// Local transport settings (TCP or QUIC) including this engine's id.
    pub transport: BoundedTransportConfig,
    /// Remote engines this node may talk to.
    pub peers: Vec<PeerSpec>,
    /// Actors known to live on remote engines.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles, each owned by one peer engine.
    pub roles: Vec<BoundedClusterRole>,
    /// Mapping of actor classes to the role they should be placed on.
    pub placement_policies: Vec<BoundedPlacementPolicy>,
}
92
impl BoundedClusterConfig {
    /// Validates the statically-declarable parts of the config (peers,
    /// declared actors, roles, placement policies) without consulting the
    /// live actor registry.
    pub fn validate(&self) -> Result<(), ClusterError> {
        validate_static_config(self)
    }
}
98
/// Transport selection for the cluster: plain TLS-over-TCP or QUIC.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    Tcp(TcpTransportConfig),
    Quic(QuicTransportConfig),
}
104
impl BoundedTransportConfig {
    /// The local engine id carried by whichever transport variant is active.
    pub fn engine_id(&self) -> &EngineId {
        match self {
            Self::Tcp(config) => &config.engine_id,
            Self::Quic(config) => &config.engine_id,
        }
    }
}
113
/// Connection details for one remote peer engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    pub engine_id: EngineId,
    /// Data-plane (transport) address of the peer.
    pub addr: SocketAddr,
    /// Control-plane RPC endpoint; required for remote spawn/supervision.
    pub control_plane_addr: Option<SocketAddr>,
    /// TLS server name override; defaults to the engine id when `None`.
    pub server_name: Option<String>,
}
121
/// A remote actor path together with the engine declared to own it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    pub path: ActorPath,
    pub owner: EngineId,
}
127
/// A named role assigned to exactly one owning peer engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    pub name: String,
    pub owner: EngineId,
}
133
/// Placement rule: actors of `actor_class` are spawned on the engine that
/// owns `role`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedPlacementPolicy {
    pub actor_class: String,
    pub role: String,
}
139
/// Errors raised while validating or mutating cluster topology.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// A peer engine id appears more than once.
    DuplicatePeer(EngineId),
    /// A peer entry names the local engine itself.
    LocalPeerDeclared(EngineId),
    /// The same remote actor path was declared twice.
    DuplicateDeclaredActor(ActorPath),
    /// A locally-hosted path was declared as remote.
    LocalActorDeclaredRemote(ActorPath),
    /// A declared actor's owner is not in the peer list.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// A role name appears more than once.
    DuplicateRole(String),
    /// A role's owner is not in the peer list.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// A role owner has no control-plane endpoint, so placement on it cannot work.
    RoleOwnerMissingControlPlaneEndpoint { role: String, owner: EngineId },
    /// An actor class appears in more than one placement policy.
    DuplicatePlacementClass(String),
    /// A placement policy references an undeclared role.
    UnknownPlacementRole { actor_class: String, role: String },
    /// A transport-layer operation failed.
    Transport(TransportError),
}
154
/// Request payload for spawning an actor on a remote engine.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    /// Path the new actor should be registered under.
    pub path: ActorPath,
    /// Type name sent to the remote control plane as `type_name`.
    pub type_name: String,
    /// Optional JSON configuration forwarded verbatim as `config`.
    pub config: Option<Value>,
}
161
/// Failure modes of a remote spawn attempt.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// Target engine is not in the peer list.
    UnknownPeer(EngineId),
    /// No role with the requested name is declared.
    UnknownRole(String),
    /// No placement policy covers the requested actor class.
    UnknownPlacementClass(String),
    /// The path is already registered in the local registry.
    LocalPathCollision(ActorPath),
    /// The path is already declared as owned by a different engine.
    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
    /// The remote engine reports the path is already running.
    RemotePathAlreadyRunning(ActorPath),
    /// The target peer has no control-plane endpoint configured.
    MissingControlPlaneEndpoint(EngineId),
    /// The configured TLS server name is not a valid `ServerName`.
    InvalidServerName(String),
    /// Any control-plane transport or protocol failure, as a message string.
    ControlPlane(String),
    /// The spawn did not complete before the wait deadline.
    SpawnTimedOut(ActorPath),
}
175
/// Observed state of a remotely supervised actor.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSupervisionState {
    /// Actor reported alive and its peer connection is up.
    Running { restart_count: u32 },
    /// Remote reported the actor "Stopped" (treated as mid-restart).
    Restarting { restart_count: u32 },
    /// Supervisor reachable but the actor is not found there.
    ActorUnavailable,
    /// The supervising engine's control plane could not be reached.
    SupervisorUnavailable,
    /// Control plane answered but the data-plane peer connection is down.
    TransportUnavailable,
}
184
/// Result of probing a remote actor: which engine supervises it and the
/// state observed at probe time.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemoteSupervisionStatus {
    pub path: ActorPath,
    pub supervisor: EngineId,
    pub state: RemoteSupervisionState,
}
191
/// Reasons a supervision probe could not even be attempted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSupervisionError {
    /// Path is neither tracked in `supervised_owners` nor declared.
    UnknownRemoteActor(ActorPath),
    /// The supervising engine is not in the peer list.
    UnknownPeer(EngineId),
    /// The supervising peer has no control-plane endpoint configured.
    MissingControlPlaneEndpoint(EngineId),
}
198
/// Wire protocol used for control-plane RPCs.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    Tcp,
    Quic,
}
204
/// Client-side settings for talking to peer control planes.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    /// Whether requests go over TLS-on-TCP or QUIC.
    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material used to authenticate connections.
    pub(crate) tls: palladium_transport::TlsConfig,
    /// Budget shared by spawn retries, completion polling, and peer waits.
    pub(crate) wait_timeout: Duration,
}
211
/// Validated topology slices handed from config construction to the
/// cluster handle.
#[derive(Clone)]
pub(crate) struct BoundedClusterTopology {
    pub(crate) peers: Vec<PeerSpec>,
    pub(crate) roles: Vec<BoundedClusterRole>,
    pub(crate) placement_policies: Vec<BoundedPlacementPolicy>,
}
218
/// Floor applied to remote-spawn wait deadlines; see `remote_spawn_wait_timeout`.
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `remote_supervision_status` waits for a live peer connection
/// before reporting `TransportUnavailable`.
const REMOTE_SUPERVISION_PROBE_TIMEOUT: Duration = Duration::from_millis(150);
221
222impl From<TransportError> for ClusterError {
223    fn from(value: TransportError) -> Self {
224        Self::Transport(value)
225    }
226}
227
228impl From<TransportError> for RemoteSpawnError {
229    fn from(value: TransportError) -> Self {
230        Self::ControlPlane(format!("transport route install failed: {value:?}"))
231    }
232}
233
/// Cheaply-clonable handle over the concrete data-plane transport in use.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    Tcp(Arc<TcpTransport<R, N>>),
    Quic(Arc<QuicTransport<R, N>>),
}
239
impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
    /// Borrows the inner TCP transport, or `None` when this handle wraps QUIC.
    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
        match self {
            Self::Tcp(transport) => Some(transport),
            Self::Quic(_) => None,
        }
    }

    /// Borrows the inner QUIC transport, or `None` when this handle wraps TCP.
    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
        match self {
            Self::Tcp(_) => None,
            Self::Quic(transport) => Some(transport),
        }
    }

    /// Registers `peer` with the underlying transport, using the explicit TLS
    /// server name when given and the transport default otherwise.
    // The Tcp/Quic arms are structurally identical but operate on distinct
    // concrete transport types, so the duplication cannot be factored out
    // without another trait bound.
    pub fn add_peer(&self, peer: &PeerSpec) {
        match self {
            Self::Tcp(transport) => {
                if let Some(server_name) = &peer.server_name {
                    transport.add_peer_with_server_name(
                        peer.engine_id.clone(),
                        peer.addr,
                        server_name.clone(),
                    );
                } else {
                    transport.add_peer(peer.engine_id.clone(), peer.addr);
                }
            }
            Self::Quic(transport) => {
                if let Some(server_name) = &peer.server_name {
                    transport.add_peer_with_server_name(
                        peer.engine_id.clone(),
                        peer.addr,
                        server_name.clone(),
                    );
                } else {
                    transport.add_peer(peer.engine_id.clone(), peer.addr);
                }
            }
        }
    }

    /// Records that `path` lives on engine `owner`, enabling routing to it.
    pub fn declare_remote_path(
        &self,
        owner: EngineId,
        path: &ActorPath,
    ) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
            Self::Quic(transport) => transport.declare_remote_path(owner, path),
        }
    }

    /// Advertises a locally-hosted `path` under `addr` to connected peers.
    pub fn advertise_path(&self, addr: AddrHash, path: &ActorPath) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.advertise_path(addr, path),
            Self::Quic(transport) => transport.advertise_path(addr, path),
        }
    }

    /// Registers `addr` as a local source address on the transport.
    // NOTE(review): a capacity-1 mailbox is created and its receiver half is
    // dropped immediately — presumably only the routing-table registration
    // matters, not delivery to this address; confirm against
    // `Transport::register` semantics.
    pub fn register_source_addr(&self, addr: AddrHash) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.register(addr, mailbox(1).0),
            Self::Quic(transport) => transport.register(addr, mailbox(1).0),
        }
    }

    /// Removes a previously declared remote `path` from the routing table.
    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.undeclare_remote_path(path),
            Self::Quic(transport) => transport.undeclare_remote_path(path),
        }
    }

    /// Whether the transport currently has a route for `destination`.
    pub fn can_route(&self, destination: AddrHash) -> bool {
        match self {
            Self::Tcp(transport) => transport.can_route(destination),
            Self::Quic(transport) => transport.can_route(destination),
        }
    }

    /// Waits up to `timeout` for a live connection to `engine_id`.
    pub async fn wait_for_peer_connection(
        &self,
        engine_id: &EngineId,
        timeout: Duration,
    ) -> Result<(), TransportError> {
        match self {
            Self::Tcp(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
            Self::Quic(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
        }
    }
}
332
/// Runtime handle for a bounded cluster: owns the peer topology, performs
/// remote spawns over the control plane, and tracks remote actor ownership.
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    // Local engine handle: supplies the source address and builds remote Addrs.
    engine: EngineHandle<R, N, F>,
    // Identity of this engine; peers must never redeclare it.
    local_engine_id: EngineId,
    // How to reach peer control planes (protocol, TLS, timeouts).
    control_plane: ControlPlaneClientConfig,
    // Underlying TCP or QUIC data-plane transport.
    transport: BoundedTransportHandle<R, N>,
    peers: Vec<PeerSpec>,
    declared_actors: Vec<DeclaredRemoteActor>,
    roles: Vec<BoundedClusterRole>,
    placement_policies: Vec<BoundedPlacementPolicy>,
    // Actor-path string -> engine currently supervising that actor.
    supervised_owners: Arc<DashMap<String, EngineId>>,
    // Local registry, consulted to detect path collisions and advertise routes.
    registry: Arc<ActorRegistry<R>>,
}
350
impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
    /// Builds a handle from an engine, its transport, and a validated topology.
    // NOTE(review): `declared_actors` intentionally starts empty here — the
    // topology struct carries no declarations; presumably they are installed
    // afterwards via `declare_remote_actor`. Confirm against the constructor's
    // caller.
    pub(crate) fn new(
        engine: EngineHandle<R, N, F>,
        local_engine_id: EngineId,
        control_plane: ControlPlaneClientConfig,
        transport: BoundedTransportHandle<R, N>,
        topology: BoundedClusterTopology,
        registry: Arc<ActorRegistry<R>>,
    ) -> Self {
        Self {
            engine,
            local_engine_id,
            control_plane,
            transport,
            peers: topology.peers,
            declared_actors: Vec::new(),
            roles: topology.roles,
            placement_policies: topology.placement_policies,
            supervised_owners: Arc::new(DashMap::new()),
            registry,
        }
    }

    /// The underlying data-plane transport handle.
    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
        &self.transport
    }

    /// Currently known peer engines.
    pub fn peers(&self) -> &[PeerSpec] {
        &self.peers
    }

    /// Remote actors declared (or spawned) so far.
    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
        &self.declared_actors
    }

    /// Declared cluster roles.
    pub fn roles(&self) -> &[BoundedClusterRole] {
        &self.roles
    }

    /// Declared class-to-role placement policies.
    pub fn placement_policies(&self) -> &[BoundedPlacementPolicy] {
        &self.placement_policies
    }

    /// Registers the engine's source address and re-advertises every actor that
    /// is currently `Running` in the local registry, so (new) peers can route
    /// to actors that were spawned before they connected.
    pub(crate) fn advertise_existing_local_routes(&self) -> Result<(), ClusterError> {
        self.transport
            .register_source_addr(self.engine.source_addr)
            .map_err(ClusterError::Transport)?;

        for info in self
            .registry
            .snapshot(&crate::introspection::ActorQuery::default(), 0)
        {
            // Only live actors are advertised; stopped ones have no route.
            if info.state != crate::introspection::ActorState::Running {
                continue;
            }
            if let Some(slot) = self.registry.get_by_path(&info.path) {
                self.transport
                    .advertise_path(slot.addr, &info.path)
                    .map_err(ClusterError::Transport)?;
            }
        }
        Ok(())
    }

    /// Validates and installs a new peer at runtime, then re-advertises local
    /// routes so the new peer learns about existing actors.
    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
        self.transport.add_peer(&peer);
        self.advertise_existing_local_routes()?;
        self.peers.push(peer);
        Ok(())
    }

    /// Declares that `path` lives on peer `owner`, after checking topology and
    /// registry consistency, and records the ownership for supervision lookups.
    pub fn declare_remote_actor(
        &mut self,
        owner: EngineId,
        path: ActorPath,
    ) -> Result<(), ClusterError> {
        validate_declared_actor_topology(
            &path,
            &owner,
            &self.local_engine_id,
            &self.peers,
            &self.declared_actors,
        )?;
        validate_declared_actor_registry(
            &path,
            &owner,
            &self.local_engine_id,
            &self.declared_actors,
            &self.registry,
        )?;
        self.transport.declare_remote_path(owner.clone(), &path)?;
        self.supervised_owners
            .insert(path.as_str().to_string(), owner.clone());
        self.declared_actors
            .push(DeclaredRemoteActor { path, owner });
        Ok(())
    }

    /// Removes a remote-actor declaration: route, ownership record, and the
    /// declaration list entry.
    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
        self.transport.undeclare_remote_path(path)?;
        self.supervised_owners.remove(path.as_str());
        self.declared_actors
            .retain(|declared| &declared.path != path);
        Ok(())
    }

    /// Whether the transport can currently route to `path`.
    // NOTE(review): instance index 0 is hard-coded — presumably the canonical
    // instance of the path; confirm against `AddrHash::new` semantics.
    pub fn can_route(&self, path: &ActorPath) -> bool {
        self.transport.can_route(AddrHash::new(path, 0))
    }

    /// Reports the supervision state of a remote actor by querying its owning
    /// engine's control plane.
    ///
    /// Supervisor resolution order: the live `supervised_owners` map first,
    /// then the static `declared_actors` list.
    pub async fn remote_supervision_status(
        &self,
        path: &ActorPath,
    ) -> Result<RemoteSupervisionStatus, RemoteSupervisionError> {
        let supervisor = self
            .supervised_owners
            .get(path.as_str())
            .map(|entry| entry.clone())
            .or_else(|| {
                self.declared_actors
                    .iter()
                    .find(|declared| declared.path == *path)
                    .map(|declared| declared.owner.clone())
            })
            .ok_or_else(|| RemoteSupervisionError::UnknownRemoteActor(path.clone()))?;

        let peer = self
            .peers
            .iter()
            .find(|peer| peer.engine_id == supervisor)
            .ok_or_else(|| RemoteSupervisionError::UnknownPeer(supervisor.clone()))?;
        let control_plane_addr = peer.control_plane_addr.ok_or_else(|| {
            RemoteSupervisionError::MissingControlPlaneEndpoint(supervisor.clone())
        })?;
        // TLS server name defaults to the supervisor's engine id.
        let server_name = peer
            .server_name
            .clone()
            .unwrap_or_else(|| supervisor.as_str().to_string());

        // If even a bare `engine.status` call fails, the supervisor itself is
        // unreachable — report that instead of an actor-level state.
        if call_control_plane(
            &self.control_plane,
            control_plane_addr,
            &server_name,
            "engine.status",
            json!({}),
        )
        .await
        .is_err()
        {
            return Ok(RemoteSupervisionStatus {
                path: path.clone(),
                supervisor,
                state: RemoteSupervisionState::SupervisorUnavailable,
            });
        }

        match call_control_plane(
            &self.control_plane,
            control_plane_addr,
            &server_name,
            "actor.info",
            json!({ "path": path.as_str() }),
        )
        .await
        {
            Ok(info) => {
                let restart_count = info
                    .get("restart_count")
                    .and_then(Value::as_u64)
                    .unwrap_or(0) as u32;
                let state = match info.get("state").and_then(Value::as_str) {
                    // NOTE(review): "Stopped" maps to Restarting — presumably
                    // the remote supervisor restarts stopped actors
                    // automatically; confirm against the control-plane server.
                    Some("Stopped") => RemoteSupervisionState::Restarting { restart_count },
                    _ => match self
                        .transport
                        .wait_for_peer_connection(&supervisor, REMOTE_SUPERVISION_PROBE_TIMEOUT)
                        .await
                    {
                        Ok(()) => RemoteSupervisionState::Running { restart_count },
                        Err(_) => RemoteSupervisionState::TransportUnavailable,
                    },
                };
                Ok(RemoteSupervisionStatus {
                    path: path.clone(),
                    supervisor,
                    state,
                })
            }
            Err(RemoteSpawnError::ControlPlane(message))
                if message.contains("actor not found:") || message.contains("actor not found") =>
            {
                Ok(RemoteSupervisionStatus {
                    path: path.clone(),
                    supervisor,
                    state: RemoteSupervisionState::ActorUnavailable,
                })
            }
            Err(_) => Ok(RemoteSupervisionStatus {
                path: path.clone(),
                supervisor,
                state: RemoteSupervisionState::SupervisorUnavailable,
            }),
        }
    }

    /// Spawns `spec` on the engine that owns `role`.
    pub async fn spawn_remote_on_role<M>(
        &self,
        role: &str,
        spec: RemoteSpawnSpec,
    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
    where
        M: RemoteMessage,
        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
        R: Send + Sync,
    {
        let owner = self
            .roles
            .iter()
            .find(|declared| declared.name == role)
            .map(|declared| declared.owner.clone())
            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
        self.spawn_remote::<M>(owner, spec).await
    }

    /// Spawns `spec` on the role mapped to `actor_class` by placement policy.
    pub async fn spawn_remote_for_class<M>(
        &self,
        actor_class: &str,
        spec: RemoteSpawnSpec,
    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
    where
        M: RemoteMessage,
        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
        R: Send + Sync,
    {
        let role = self
            .placement_policies
            .iter()
            .find(|policy| policy.actor_class == actor_class)
            .map(|policy| policy.role.as_str())
            .ok_or_else(|| RemoteSpawnError::UnknownPlacementClass(actor_class.to_string()))?;
        self.spawn_remote_on_role::<M>(role, spec).await
    }

    /// Spawns an actor on `target` via its control plane, then waits until the
    /// actor is visible remotely and a data-plane route to it exists.
    ///
    /// Returns a typed remote `Addr` on success. Both the retry loop for the
    /// spawn call and the completion-polling loop share a single deadline
    /// derived from `control_plane.wait_timeout`.
    pub async fn spawn_remote<M>(
        &self,
        target: EngineId,
        spec: RemoteSpawnSpec,
    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
    where
        M: RemoteMessage,
        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
        R: Send + Sync,
    {
        validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;

        let peer = self
            .peers
            .iter()
            .find(|peer| peer.engine_id == target)
            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
        let control_plane_addr = peer
            .control_plane_addr
            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
        // TLS server name defaults to the target's engine id.
        let server_name = peer
            .server_name
            .clone()
            .unwrap_or_else(|| target.as_str().to_string());
        let caller_server_name = self.local_engine_id.as_str().to_string();

        // Retry transient connection errors until the deadline expires.
        let deadline = Instant::now() + self.control_plane.wait_timeout;
        loop {
            match call_actor_spawn(
                &self.control_plane,
                control_plane_addr,
                &server_name,
                &self.local_engine_id,
                transport_listen_addr(&self.transport),
                &caller_server_name,
                &spec,
            )
            .await
            {
                Ok(()) => break,
                Err(RemoteSpawnError::ControlPlane(message))
                    if is_retryable_control_plane_error(&message) && Instant::now() <= deadline =>
                {
                    tokio::time::sleep(Duration::from_millis(20)).await;
                }
                Err(err) => return Err(err),
            }
        }

        // Poll until the remote reports the actor exists, then install the
        // route, record ownership, and wait for a live peer connection.
        while Instant::now() <= deadline {
            if actor_is_spawned(
                &self.control_plane,
                control_plane_addr,
                &server_name,
                &spec.path,
            )
            .await?
            {
                self.transport
                    .declare_remote_path(target.clone(), &spec.path)
                    .map_err(RemoteSpawnError::from)?;
                self.supervised_owners
                    .insert(spec.path.as_str().to_string(), target.clone());
                self.transport
                    .wait_for_peer_connection(&target, self.control_plane.wait_timeout)
                    .await
                    .map_err(RemoteSpawnError::from)?;
                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
    }
}
669
670async fn call_actor_spawn(
671    client: &ControlPlaneClientConfig,
672    addr: SocketAddr,
673    server_name: &str,
674    caller_engine_id: &EngineId,
675    caller_transport_addr: SocketAddr,
676    caller_server_name: &str,
677    spec: &RemoteSpawnSpec,
678) -> Result<(), RemoteSpawnError> {
679    let params = match &spec.config {
680        Some(config) => json!({
681            "path": spec.path.as_str(),
682            "type_name": spec.type_name,
683            "config": config,
684            "caller_engine_id": caller_engine_id.as_str(),
685            "caller_transport_addr": caller_transport_addr,
686            "caller_server_name": caller_server_name,
687        }),
688        None => json!({
689            "path": spec.path.as_str(),
690            "type_name": spec.type_name,
691            "caller_engine_id": caller_engine_id.as_str(),
692            "caller_transport_addr": caller_transport_addr,
693            "caller_server_name": caller_server_name,
694        }),
695    };
696    match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
697        Ok(_) => {}
698        Err(RemoteSpawnError::ControlPlane(message))
699            if message.contains("actor already running at") =>
700        {
701            return Err(RemoteSpawnError::RemotePathAlreadyRunning(
702                spec.path.clone(),
703            ));
704        }
705        Err(err) => return Err(err),
706    }
707    Ok(())
708}
709
710fn transport_listen_addr<R: Reactor + Clone, N: Network + Clone>(
711    transport: &BoundedTransportHandle<R, N>,
712) -> SocketAddr {
713    match transport {
714        BoundedTransportHandle::Tcp(transport) => transport.local_addr(),
715        BoundedTransportHandle::Quic(transport) => transport.local_addr(),
716    }
717}
718
/// True when a control-plane error message describes a transient connection
/// failure worth retrying (matched case-insensitively by substring).
fn is_retryable_control_plane_error(message: &str) -> bool {
    const RETRYABLE: [&str; 5] = [
        "connection refused",
        "connection reset",
        "timed out",
        "broken pipe",
        "not connected",
    ];
    let lowered = message.to_ascii_lowercase();
    RETRYABLE.iter().any(|needle| lowered.contains(needle))
}
727
728fn validate_remote_spawn_request<R: Reactor>(
729    path: &ActorPath,
730    owner: &EngineId,
731    declared_actors: &[DeclaredRemoteActor],
732    registry: &Arc<ActorRegistry<R>>,
733) -> Result<(), RemoteSpawnError> {
734    if registry.get_by_path(path).is_some() {
735        return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
736    }
737
738    if let Some(declared) = declared_actors
739        .iter()
740        .find(|declared| declared.path == *path && declared.owner != *owner)
741    {
742        return Err(RemoteSpawnError::RemoteOwnershipConflict {
743            path: path.clone(),
744            owner: declared.owner.clone(),
745        });
746    }
747
748    Ok(())
749}
750
751async fn actor_is_spawned(
752    client: &ControlPlaneClientConfig,
753    addr: SocketAddr,
754    server_name: &str,
755    path: &ActorPath,
756) -> Result<bool, RemoteSpawnError> {
757    match call_control_plane(
758        client,
759        addr,
760        server_name,
761        "actor.info",
762        json!({ "path": path.as_str() }),
763    )
764    .await
765    {
766        Ok(_) => Ok(true),
767        Err(RemoteSpawnError::ControlPlane(message))
768            if message.contains("actor not found:")
769                || message.contains("actor not found")
770                || message.contains("User supervisor not available") =>
771        {
772            Ok(false)
773        }
774        Err(err) => Err(err),
775    }
776}
777
778async fn call_control_plane(
779    client: &ControlPlaneClientConfig,
780    addr: SocketAddr,
781    server_name: &str,
782    method: &str,
783    params: Value,
784) -> Result<Value, RemoteSpawnError> {
785    let request = json!({
786        "id": 1,
787        "method": method,
788        "params": params,
789    });
790
791    match client.protocol {
792        ControlPlaneProtocol::Tcp => {
793            call_control_plane_tcp(addr, server_name, &client.tls, &request).await
794        }
795        ControlPlaneProtocol::Quic => {
796            call_control_plane_quic(addr, server_name, &client.tls, &request).await
797        }
798    }
799}
800
/// Performs one newline-delimited JSON request/response over TLS-on-TCP:
/// connect, handshake, write the request line, read one reply line, parse.
async fn call_control_plane_tcp(
    addr: SocketAddr,
    server_name: &str,
    tls: &palladium_transport::TlsConfig,
    request: &Value,
) -> Result<Value, RemoteSpawnError> {
    // Build a rustls client config pinned to the expected server name.
    let config = tls
        .client_config(server_name)
        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
    let name = ServerName::try_from(server_name.to_string())
        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
    let stream = tokio::net::TcpStream::connect(addr)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let connector = TlsConnector::from(config);
    let tls_stream = connector
        .connect(name, stream)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    // One request line out, one response line back.
    let (reader, mut writer) = tokio::io::split(tls_stream);
    let mut body = request.to_string();
    body.push('\n');
    writer
        .write_all(body.as_bytes())
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let mut line = String::new();
    let mut reader = BufReader::new(reader);
    reader
        .read_line(&mut line)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    parse_control_plane_response(&line)
}
836
/// Performs one JSON request/response over a fresh QUIC connection: a client
/// is built per call, a bidirectional stream carries the request, and the
/// response is read to end-of-stream after closing the send side.
async fn call_control_plane_quic(
    addr: SocketAddr,
    server_name: &str,
    tls: &palladium_transport::TlsConfig,
    request: &Value,
) -> Result<Value, RemoteSpawnError> {
    // A rustls crypto provider must be installed before building QUIC TLS.
    ensure_rustls_provider();
    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
    })?)
    .clone();
    // The control plane negotiates its own ALPN protocol id.
    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();

    let client = s2n_quic::Client::builder()
        .with_tls(client_tls)
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
        .with_io("0.0.0.0:0")
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
        .start()
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
    let mut conn = client
        .connect(connect)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let mut stream = conn
        .open_bidirectional_stream()
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    // Write the request line, then close the send half so the server sees EOF.
    let mut body = request.to_string();
    body.push('\n');
    stream
        .write_all(body.as_bytes())
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    stream
        .close()
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;

    // The whole response is the remainder of the stream.
    let mut buf = Vec::new();
    stream
        .read_to_end(&mut buf)
        .await
        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
    let line = String::from_utf8_lossy(&buf);
    parse_control_plane_response(&line)
}
888
889fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
890    let response: Value = serde_json::from_str(line.trim())
891        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
892    if let Some(error) = response.get("error") {
893        let message = error
894            .get("message")
895            .and_then(Value::as_str)
896            .unwrap_or("unknown control-plane error");
897        return Err(RemoteSpawnError::ControlPlane(message.to_string()));
898    }
899    Ok(response.get("result").cloned().unwrap_or(Value::Null))
900}
901
/// Install a process-wide rustls crypto provider exactly once.
///
/// Prefers aws-lc-rs when that cargo feature is on, falling back to ring;
/// the result of `install_default` is ignored because another component may
/// have installed a provider first.
fn ensure_rustls_provider() {
    static PROVIDER_INIT: std::sync::Once = std::sync::Once::new();
    PROVIDER_INIT.call_once(|| {
        #[cfg(feature = "aws-lc-rs")]
        {
            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        }
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        {
            let _ = rustls::crypto::ring::default_provider().install_default();
        }
    });
}
911
912pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
913    ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
914}
915
916pub(crate) fn validate_config<R: Reactor>(
917    config: &BoundedClusterConfig,
918    registry: &Arc<ActorRegistry<R>>,
919) -> Result<(), ClusterError> {
920    validate_static_config(config)?;
921
922    let local_engine_id = config.transport.engine_id();
923    let mut seen_declared = Vec::new();
924    for declared in &config.declared_actors {
925        validate_declared_actor_registry(
926            &declared.path,
927            &declared.owner,
928            local_engine_id,
929            &seen_declared,
930            registry,
931        )?;
932        seen_declared.push(declared.clone());
933    }
934
935    Ok(())
936}
937
938fn validate_static_config(config: &BoundedClusterConfig) -> Result<(), ClusterError> {
939    let local_engine_id = config.transport.engine_id();
940    let mut seen_peers = Vec::new();
941    for peer in &config.peers {
942        validate_peer_spec(peer, local_engine_id, &seen_peers)?;
943        seen_peers.push(peer.clone());
944    }
945
946    let mut seen_declared = Vec::new();
947    for declared in &config.declared_actors {
948        validate_declared_actor_topology(
949            &declared.path,
950            &declared.owner,
951            local_engine_id,
952            &config.peers,
953            &seen_declared,
954        )?;
955        seen_declared.push(declared.clone());
956    }
957
958    let mut seen_roles = Vec::new();
959    for role in &config.roles {
960        validate_role(role, &config.peers, &seen_roles)?;
961        seen_roles.push(role.clone());
962    }
963
964    let mut seen_placement_policies = Vec::new();
965    for policy in &config.placement_policies {
966        validate_placement_policy(policy, &config.roles, &seen_placement_policies)?;
967        seen_placement_policies.push(policy.clone());
968    }
969
970    Ok(())
971}
972
973fn validate_peer_spec(
974    peer: &PeerSpec,
975    local_engine_id: &EngineId,
976    existing: &[PeerSpec],
977) -> Result<(), ClusterError> {
978    if &peer.engine_id == local_engine_id {
979        return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
980    }
981    if existing
982        .iter()
983        .any(|existing| existing.engine_id == peer.engine_id)
984    {
985        return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
986    }
987    Ok(())
988}
989
990fn validate_declared_actor_topology(
991    path: &ActorPath,
992    owner: &EngineId,
993    local_engine_id: &EngineId,
994    peers: &[PeerSpec],
995    existing: &[DeclaredRemoteActor],
996) -> Result<(), ClusterError> {
997    if owner == local_engine_id {
998        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
999    }
1000    if existing.iter().any(|existing| existing.path == *path) {
1001        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
1002    }
1003    if !peers.iter().any(|peer| peer.engine_id == *owner) {
1004        return Err(ClusterError::UnknownPeerOwner {
1005            path: path.clone(),
1006            owner: owner.clone(),
1007        });
1008    }
1009    Ok(())
1010}
1011
1012fn validate_declared_actor_registry<R: Reactor>(
1013    path: &ActorPath,
1014    owner: &EngineId,
1015    local_engine_id: &EngineId,
1016    existing: &[DeclaredRemoteActor],
1017    registry: &Arc<ActorRegistry<R>>,
1018) -> Result<(), ClusterError> {
1019    if owner == local_engine_id || registry.get_by_path(path).is_some() {
1020        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
1021    }
1022    if existing.iter().any(|existing| existing.path == *path) {
1023        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
1024    }
1025    Ok(())
1026}
1027
1028fn validate_role(
1029    role: &BoundedClusterRole,
1030    peers: &[PeerSpec],
1031    existing: &[BoundedClusterRole],
1032) -> Result<(), ClusterError> {
1033    if existing.iter().any(|existing| existing.name == role.name) {
1034        return Err(ClusterError::DuplicateRole(role.name.clone()));
1035    }
1036    if !peers.iter().any(|peer| peer.engine_id == role.owner) {
1037        return Err(ClusterError::UnknownRoleOwner {
1038            role: role.name.clone(),
1039            owner: role.owner.clone(),
1040        });
1041    }
1042    if !peers
1043        .iter()
1044        .any(|peer| peer.engine_id == role.owner && peer.control_plane_addr.is_some())
1045    {
1046        return Err(ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1047            role: role.name.clone(),
1048            owner: role.owner.clone(),
1049        });
1050    }
1051    Ok(())
1052}
1053
1054fn validate_placement_policy(
1055    policy: &BoundedPlacementPolicy,
1056    roles: &[BoundedClusterRole],
1057    existing: &[BoundedPlacementPolicy],
1058) -> Result<(), ClusterError> {
1059    if existing
1060        .iter()
1061        .any(|existing| existing.actor_class == policy.actor_class)
1062    {
1063        return Err(ClusterError::DuplicatePlacementClass(
1064            policy.actor_class.clone(),
1065        ));
1066    }
1067    if !roles.iter().any(|role| role.name == policy.role) {
1068        return Err(ClusterError::UnknownPlacementRole {
1069            actor_class: policy.actor_class.clone(),
1070            role: policy.role.clone(),
1071        });
1072    }
1073    Ok(())
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078    use super::{
1079        call_control_plane, validate_config, BoundedClusterConfig, BoundedClusterRole,
1080        BoundedPlacementPolicy, BoundedTransportConfig, ClusterError, DeclaredRemoteActor,
1081        PeerSpec, RemoteSpawnError, RemoteSpawnSpec, RemoteSupervisionState,
1082    };
1083    use crate::{Engine, EngineConfig};
1084    use palladium_actor::{
1085        Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
1086        Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
1087    };
1088    use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
1089    use rcgen::{
1090        BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
1091        KeyUsagePurpose,
1092    };
1093    use serde_json::json;
1094    use std::net::{SocketAddr, TcpListener, UdpSocket};
1095    use std::sync::Arc;
1096    use std::time::{Duration, Instant};
1097
1098    fn make_ca() -> (Certificate, KeyPair) {
1099        let mut params = CertificateParams::default();
1100        params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
1101        params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
1102        let ca_key = KeyPair::generate().expect("ca keypair");
1103        let cert = params.self_signed(&ca_key).expect("ca cert");
1104        (cert, ca_key)
1105    }
1106
1107    fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
1108        let keypair = KeyPair::generate().expect("leaf keypair");
1109        let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
1110        let mut params = params;
1111        params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
1112        params.extended_key_usages = vec![
1113            ExtendedKeyUsagePurpose::ServerAuth,
1114            ExtendedKeyUsagePurpose::ClientAuth,
1115        ];
1116        let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
1117        let cert_der = cert.der().to_vec();
1118        let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
1119        let ca_der = ca.der().to_vec();
1120        TlsConfig {
1121            cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
1122            private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
1123            trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
1124        }
1125    }
1126
1127    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
1128        TcpTransportConfig {
1129            engine_id: EngineId::new(engine_id),
1130            listen_addr: "127.0.0.1:0".parse().unwrap(),
1131            max_connections_per_peer: 1,
1132            idle_timeout: Duration::from_secs(1),
1133            send_buffer_size: 8,
1134            nodelay: true,
1135            tls,
1136            send_delay: Duration::from_millis(0),
1137        }
1138    }
1139
1140    fn reserve_tcp_addr() -> SocketAddr {
1141        let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
1142        let addr = listener.local_addr().expect("tcp local addr");
1143        drop(listener);
1144        addr
1145    }
1146
1147    fn reserve_udp_addr() -> SocketAddr {
1148        let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
1149        let addr = socket.local_addr().expect("udp local addr");
1150        drop(socket);
1151        addr
1152    }
1153
1154    #[test]
1155    fn bounded_cluster_config_rejects_duplicate_peers() {
1156        let (ca, ca_key) = make_ca();
1157        let engine = Engine::with_config(EngineConfig {
1158            engine_id: EngineId::new("engine-a.example"),
1159            ..Default::default()
1160        });
1161        let config = BoundedClusterConfig {
1162            transport: BoundedTransportConfig::Tcp(tcp_config(
1163                "engine-a.example",
1164                make_tls_config("engine-a.example", &ca, &ca_key),
1165            )),
1166            peers: vec![
1167                PeerSpec {
1168                    engine_id: EngineId::new("engine-b.example"),
1169                    addr: "127.0.0.1:4100".parse().unwrap(),
1170                    control_plane_addr: None,
1171                    server_name: None,
1172                },
1173                PeerSpec {
1174                    engine_id: EngineId::new("engine-b.example"),
1175                    addr: "127.0.0.1:4200".parse().unwrap(),
1176                    control_plane_addr: None,
1177                    server_name: None,
1178                },
1179            ],
1180            declared_actors: Vec::new(),
1181            roles: Vec::new(),
1182            placement_policies: Vec::new(),
1183        };
1184
1185        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1186        assert_eq!(
1187            err,
1188            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
1189        );
1190    }
1191
1192    #[test]
1193    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
1194        let (ca, ca_key) = make_ca();
1195        let engine = Engine::with_config(EngineConfig {
1196            engine_id: EngineId::new("engine-a.example"),
1197            ..Default::default()
1198        });
1199        let config = BoundedClusterConfig {
1200            transport: BoundedTransportConfig::Tcp(tcp_config(
1201                "engine-a.example",
1202                make_tls_config("engine-a.example", &ca, &ca_key),
1203            )),
1204            peers: vec![PeerSpec {
1205                engine_id: EngineId::new("engine-b.example"),
1206                addr: "127.0.0.1:4100".parse().unwrap(),
1207                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1208                server_name: None,
1209            }],
1210            declared_actors: vec![DeclaredRemoteActor {
1211                path: ActorPath::parse("/user/missing-owner").unwrap(),
1212                owner: EngineId::new("engine-c.example"),
1213            }],
1214            roles: Vec::new(),
1215            placement_policies: Vec::new(),
1216        };
1217
1218        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1219        assert_eq!(
1220            err,
1221            ClusterError::UnknownPeerOwner {
1222                path: ActorPath::parse("/user/missing-owner").unwrap(),
1223                owner: EngineId::new("engine-c.example"),
1224            }
1225        );
1226    }
1227
1228    #[test]
1229    fn bounded_cluster_config_rejects_duplicate_roles() {
1230        let (ca, ca_key) = make_ca();
1231        let engine = Engine::with_config(EngineConfig {
1232            engine_id: EngineId::new("engine-a.example"),
1233            ..Default::default()
1234        });
1235        let config = BoundedClusterConfig {
1236            transport: BoundedTransportConfig::Tcp(tcp_config(
1237                "engine-a.example",
1238                make_tls_config("engine-a.example", &ca, &ca_key),
1239            )),
1240            peers: vec![PeerSpec {
1241                engine_id: EngineId::new("engine-b.example"),
1242                addr: "127.0.0.1:4100".parse().unwrap(),
1243                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1244                server_name: None,
1245            }],
1246            declared_actors: Vec::new(),
1247            roles: vec![
1248                BoundedClusterRole {
1249                    name: "context-service".to_string(),
1250                    owner: EngineId::new("engine-b.example"),
1251                },
1252                BoundedClusterRole {
1253                    name: "context-service".to_string(),
1254                    owner: EngineId::new("engine-b.example"),
1255                },
1256            ],
1257            placement_policies: Vec::new(),
1258        };
1259
1260        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1261        assert_eq!(
1262            err,
1263            ClusterError::DuplicateRole("context-service".to_string())
1264        );
1265    }
1266
1267    #[test]
1268    fn bounded_cluster_config_rejects_unknown_role_owner() {
1269        let (ca, ca_key) = make_ca();
1270        let engine = Engine::with_config(EngineConfig {
1271            engine_id: EngineId::new("engine-a.example"),
1272            ..Default::default()
1273        });
1274        let config = BoundedClusterConfig {
1275            transport: BoundedTransportConfig::Tcp(tcp_config(
1276                "engine-a.example",
1277                make_tls_config("engine-a.example", &ca, &ca_key),
1278            )),
1279            peers: vec![PeerSpec {
1280                engine_id: EngineId::new("engine-b.example"),
1281                addr: "127.0.0.1:4100".parse().unwrap(),
1282                control_plane_addr: None,
1283                server_name: None,
1284            }],
1285            declared_actors: Vec::new(),
1286            roles: vec![BoundedClusterRole {
1287                name: "gateway".to_string(),
1288                owner: EngineId::new("engine-c.example"),
1289            }],
1290            placement_policies: Vec::new(),
1291        };
1292
1293        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1294        assert_eq!(
1295            err,
1296            ClusterError::UnknownRoleOwner {
1297                role: "gateway".to_string(),
1298                owner: EngineId::new("engine-c.example"),
1299            }
1300        );
1301    }
1302
1303    #[test]
1304    fn bounded_cluster_config_rejects_role_owner_without_control_plane() {
1305        let (ca, ca_key) = make_ca();
1306        let config = BoundedClusterConfig {
1307            transport: BoundedTransportConfig::Tcp(tcp_config(
1308                "engine-a.example",
1309                make_tls_config("engine-a.example", &ca, &ca_key),
1310            )),
1311            peers: vec![PeerSpec {
1312                engine_id: EngineId::new("engine-b.example"),
1313                addr: "127.0.0.1:4100".parse().unwrap(),
1314                control_plane_addr: None,
1315                server_name: None,
1316            }],
1317            declared_actors: Vec::new(),
1318            roles: vec![BoundedClusterRole {
1319                name: "session-host".to_string(),
1320                owner: EngineId::new("engine-b.example"),
1321            }],
1322            placement_policies: Vec::new(),
1323        };
1324
1325        let err = config.validate().unwrap_err();
1326        assert_eq!(
1327            err,
1328            ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1329                role: "session-host".to_string(),
1330                owner: EngineId::new("engine-b.example"),
1331            }
1332        );
1333    }
1334
1335    #[test]
1336    fn bounded_cluster_config_rejects_duplicate_placement_classes() {
1337        let (ca, ca_key) = make_ca();
1338        let config = BoundedClusterConfig {
1339            transport: BoundedTransportConfig::Tcp(tcp_config(
1340                "engine-a.example",
1341                make_tls_config("engine-a.example", &ca, &ca_key),
1342            )),
1343            peers: vec![PeerSpec {
1344                engine_id: EngineId::new("engine-b.example"),
1345                addr: "127.0.0.1:4100".parse().unwrap(),
1346                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1347                server_name: None,
1348            }],
1349            declared_actors: Vec::new(),
1350            roles: vec![BoundedClusterRole {
1351                name: "context-service".to_string(),
1352                owner: EngineId::new("engine-b.example"),
1353            }],
1354            placement_policies: vec![
1355                BoundedPlacementPolicy {
1356                    actor_class: "SessionActor".to_string(),
1357                    role: "context-service".to_string(),
1358                },
1359                BoundedPlacementPolicy {
1360                    actor_class: "SessionActor".to_string(),
1361                    role: "context-service".to_string(),
1362                },
1363            ],
1364        };
1365
1366        let err = config.validate().unwrap_err();
1367        assert_eq!(
1368            err,
1369            ClusterError::DuplicatePlacementClass("SessionActor".to_string())
1370        );
1371    }
1372
1373    #[test]
1374    fn bounded_cluster_config_rejects_unknown_placement_role() {
1375        let (ca, ca_key) = make_ca();
1376        let config = BoundedClusterConfig {
1377            transport: BoundedTransportConfig::Tcp(tcp_config(
1378                "engine-a.example",
1379                make_tls_config("engine-a.example", &ca, &ca_key),
1380            )),
1381            peers: vec![PeerSpec {
1382                engine_id: EngineId::new("engine-b.example"),
1383                addr: "127.0.0.1:4100".parse().unwrap(),
1384                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1385                server_name: None,
1386            }],
1387            declared_actors: Vec::new(),
1388            roles: vec![BoundedClusterRole {
1389                name: "context-service".to_string(),
1390                owner: EngineId::new("engine-b.example"),
1391            }],
1392            placement_policies: vec![BoundedPlacementPolicy {
1393                actor_class: "SessionActor".to_string(),
1394                role: "missing-role".to_string(),
1395            }],
1396        };
1397
1398        let err = config.validate().unwrap_err();
1399        assert_eq!(
1400            err,
1401            ClusterError::UnknownPlacementRole {
1402                actor_class: "SessionActor".to_string(),
1403                role: "missing-role".to_string(),
1404            }
1405        );
1406    }
1407
    /// End-to-end check that `attach_bounded_cluster` itself enforces the
    /// role-owner control-plane requirement: attaching a config whose role is
    /// owned by a peer with no control-plane endpoint must fail.
    #[tokio::test]
    async fn attach_bounded_cluster_rejects_role_owner_without_control_plane() {
        let (ca, ca_key) = make_ca();
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });

        // A `match` is used rather than `expect_err` so the success value
        // never needs to implement Debug.
        let err = match engine
            .handle()
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config(
                    "engine-a.example",
                    make_tls_config("engine-a.example", &ca, &ca_key),
                )),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    // No control-plane endpoint — the role below makes this fatal.
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "session-host".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
                placement_policies: Vec::new(),
            })
            .await
        {
            Ok(_) => panic!("expected bounded cluster attach to reject missing control plane"),
            Err(err) => err,
        };

        assert_eq!(
            err,
            ClusterError::RoleOwnerMissingControlPlaneEndpoint {
                role: "session-host".to_string(),
                owner: EngineId::new("engine-b.example"),
            }
        );
    }
1450
1451    #[test]
1452    fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
1453        assert_eq!(
1454            super::remote_spawn_wait_timeout(Duration::from_millis(25)),
1455            Duration::from_secs(5)
1456        );
1457        assert_eq!(
1458            super::remote_spawn_wait_timeout(Duration::from_secs(8)),
1459            Duration::from_secs(8)
1460        );
1461    }
1462
1463    fn tcp_tests_available() -> bool {
1464        if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
1465            return false;
1466        }
1467        std::net::TcpListener::bind("127.0.0.1:0").is_ok()
1468    }
1469
    /// QUIC tests need a process-wide rustls crypto provider; install one via
    /// the runtime helper, then report whether any provider is available.
    fn quic_tests_available() -> bool {
        super::ensure_rustls_provider();
        rustls::crypto::CryptoProvider::get_default().is_some()
    }
1474
1475    async fn wait_for_supervision_state<R, N, F>(
1476        cluster: &super::BoundedClusterHandle<R, N, F>,
1477        path: &ActorPath,
1478        expected: RemoteSupervisionState,
1479    ) where
1480        R: crate::reactor::Reactor + Clone + Send + Sync,
1481        N: palladium_transport::network::Network + Clone,
1482        F: crate::fs::FileSystem + Clone,
1483    {
1484        let deadline = Instant::now() + Duration::from_secs(5);
1485        loop {
1486            let status = cluster
1487                .remote_supervision_status(path)
1488                .await
1489                .expect("supervision status");
1490            if status.state == expected {
1491                return;
1492            }
1493            if Instant::now() > deadline {
1494                panic!(
1495                    "supervision status did not reach expected state: got {:?}, expected {:?}",
1496                    status.state, expected
1497                );
1498            }
1499            tokio::time::sleep(Duration::from_millis(25)).await;
1500        }
1501    }
1502
    /// Ask payload for the echo actors; handlers double `value` in the reply.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoRequest {
        value: u64,
    }
1507
    /// Reply to `EchoRequest`; carries the doubled value.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoResponse {
        doubled: u64,
    }
1512
    // Wire contract: asking EchoRequest yields EchoResponse under a stable
    // FNV-1a type tag so both engines agree on the message identity.
    impl Message for EchoRequest {
        type Response = EchoResponse;
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
    }
1517
    /// Stateless actor that answers `EchoRequest` with the doubled value.
    struct EchoActor;
1519
1520    impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
1521        fn on_message(
1522            &mut self,
1523            ctx: &mut ActorContext<R>,
1524            envelope: &Envelope,
1525            payload: MessagePayload,
1526        ) -> Result<(), ActorError> {
1527            let req = payload
1528                .extract::<EchoRequest>()
1529                .map_err(|_| ActorError::Handler)?;
1530            let resp = EchoResponse {
1531                doubled: req.value * 2,
1532            };
1533            let resp_env = envelope.response(0);
1534            ctx.send_raw(resp_env, MessagePayload::local(resp))
1535                .map_err(|_| ActorError::Handler)
1536        }
1537    }
1538
    /// Marker message that makes `SpawnedEchoActor` fail its handler on purpose.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct CrashRequest;
1541
    // CrashRequest expects no meaningful reply; the tag only needs to be
    // distinct from the echo tag so dispatch can tell them apart.
    impl Message for CrashRequest {
        type Response = ();
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
    }
1546
    /// Remotely-spawnable actor: echoes like `EchoActor` but also fails on
    /// `CrashRequest` so supervision behavior can be exercised.
    struct SpawnedEchoActor;
1548
1549    impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
1550        fn on_message(
1551            &mut self,
1552            ctx: &mut ActorContext<R>,
1553            envelope: &Envelope,
1554            payload: MessagePayload,
1555        ) -> Result<(), ActorError> {
1556            match envelope.type_tag {
1557                EchoRequest::TYPE_TAG => {
1558                    let req = payload
1559                        .extract::<EchoRequest>()
1560                        .map_err(|_| ActorError::Handler)?;
1561                    let resp = EchoResponse {
1562                        doubled: req.value * 2,
1563                    };
1564                    let resp_env = envelope.response(0);
1565                    ctx.send_raw(resp_env, MessagePayload::local(resp))
1566                        .map_err(|_| ActorError::Handler)
1567                }
1568                CrashRequest::TYPE_TAG => Err(ActorError::Handler),
1569                _ => Err(ActorError::Handler),
1570            }
1571        }
1572    }
1573
1574    fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
1575        Arc::new(|type_name: &str, _config: &[u8]| match type_name {
1576            "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
1577            _ => Err(format!("unknown remote spawn type: {type_name}")),
1578        })
1579    }
1580
    /// Two-engine integration test over TCP: engine A hosts the echo actor,
    /// engine B declares A as a peer, and an ask from B must round-trip
    /// through the bounded-cluster transport.
    #[tokio::test]
    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
        // Opt-in only: requires PD_ENABLE_TCP_TESTS and a bindable loopback.
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        // Both engines get leaf certs from the same test CA so the mutual
        // TLS handshake verifies in both directions.
        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        // The echo actor lives only on engine A.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        // Both sides must register the ask type for remote dispatch.
        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // A attaches with no peers; its ephemeral listen address is read
        // back so B can dial it.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        // B attaches with A pre-declared as its only peer.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // Wire A back to B so the link is bidirectional, then run both
        // engines on background threads until the oneshot shutdowns fire.
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Poll until B learns a route to the actor, bounded at 5 seconds.
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded cluster route did not propagate");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(tcp_b.can_route(canonical));

        // The actual cross-engine ask: EchoActor doubles 12 to 24.
        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 12 })
            .await
            .expect("bounded cluster ask");
        assert_eq!(response, EchoResponse { doubled: 24 });

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1684
1685    #[tokio::test]
1686    async fn bounded_cluster_add_peer_after_start_backfills_existing_tcp_routes() {
1687        if !tcp_tests_available() {
1688            eprintln!("skipping tcp integration tests: network bind not permitted");
1689            return;
1690        }
1691
1692        let actor_path = ActorPath::parse("/user/cluster-echo-add-peer").unwrap();
1693        let (ca, ca_key) = make_ca();
1694        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1695        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1696
1697        let mut engine_a = Engine::with_config(EngineConfig {
1698            engine_id: EngineId::new("engine-a.example"),
1699            ..Default::default()
1700        });
1701        let engine_b = Engine::with_config(EngineConfig {
1702            engine_id: EngineId::new("engine-b.example"),
1703            ..Default::default()
1704        });
1705
1706        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1707        engine_a.add_user_actor(ChildSpec::new(
1708            "cluster-echo-add-peer",
1709            RestartPolicy::Permanent,
1710            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1711            ns,
1712            move || Box::new(EchoActor),
1713        ));
1714
1715        let handle_a = engine_a.handle();
1716        let handle_b = engine_b.handle();
1717        handle_a
1718            .type_registry()
1719            .register_remote_ask::<EchoRequest>();
1720        handle_b
1721            .type_registry()
1722            .register_remote_ask::<EchoRequest>();
1723
1724        let mut cluster_a = handle_a
1725            .attach_bounded_cluster(BoundedClusterConfig {
1726                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1727                peers: Vec::new(),
1728                declared_actors: Vec::new(),
1729                roles: Vec::new(),
1730                placement_policies: Vec::new(),
1731            })
1732            .await
1733            .expect("attach bounded cluster a");
1734        let tcp_a = cluster_a
1735            .transport()
1736            .as_tcp()
1737            .cloned()
1738            .expect("tcp transport a");
1739
1740        let cluster_b = handle_b
1741            .attach_bounded_cluster(BoundedClusterConfig {
1742                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1743                peers: vec![PeerSpec {
1744                    engine_id: EngineId::new("engine-a.example"),
1745                    addr: tcp_a.local_addr(),
1746                    control_plane_addr: None,
1747                    server_name: None,
1748                }],
1749                declared_actors: Vec::new(),
1750                roles: Vec::new(),
1751                placement_policies: Vec::new(),
1752            })
1753            .await
1754            .expect("attach bounded cluster b");
1755        let tcp_b = cluster_b
1756            .transport()
1757            .as_tcp()
1758            .cloned()
1759            .expect("tcp transport b");
1760
1761        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1762        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1763
1764        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1765        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1766
1767        let local_ready_deadline = std::time::Instant::now() + Duration::from_secs(5);
1768        while handle_a.registry.get_by_path(&actor_path).is_none() {
1769            if std::time::Instant::now() > local_ready_deadline {
1770                panic!("local actor did not start before peer add");
1771            }
1772            tokio::time::sleep(Duration::from_millis(20)).await;
1773        }
1774
1775        cluster_a
1776            .add_peer(PeerSpec {
1777                engine_id: EngineId::new("engine-b.example"),
1778                addr: tcp_b.local_addr(),
1779                control_plane_addr: None,
1780                server_name: None,
1781            })
1782            .expect("add peer b to cluster a after start");
1783
1784        let canonical = AddrHash::new(&actor_path, 0);
1785        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1786        while !cluster_b.can_route(&actor_path) {
1787            if std::time::Instant::now() > deadline {
1788                panic!("bounded cluster route did not propagate after add_peer");
1789            }
1790            tokio::time::sleep(Duration::from_millis(20)).await;
1791        }
1792        assert!(tcp_b.can_route(canonical));
1793
1794        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1795        let response = remote
1796            .ask(EchoRequest { value: 15 })
1797            .await
1798            .expect("bounded cluster ask after add_peer");
1799        assert_eq!(response, EchoResponse { doubled: 30 });
1800
1801        shutdown_a_tx.send(()).ok();
1802        shutdown_b_tx.send(()).ok();
1803    }
1804
1805    #[tokio::test]
1806    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1807        if !tcp_tests_available() {
1808            eprintln!("skipping tcp integration tests: network bind not permitted");
1809            return;
1810        }
1811
1812        let (ca, ca_key) = make_ca();
1813        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1814        let engine = Engine::with_config(EngineConfig {
1815            engine_id: EngineId::new("engine-a.example"),
1816            ..Default::default()
1817        });
1818        let handle = engine.handle();
1819
1820        let mut cluster = handle
1821            .attach_bounded_cluster(BoundedClusterConfig {
1822                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1823                peers: vec![PeerSpec {
1824                    engine_id: EngineId::new("engine-b.example"),
1825                    addr: "127.0.0.1:4100".parse().unwrap(),
1826                    control_plane_addr: None,
1827                    server_name: None,
1828                }],
1829                declared_actors: Vec::new(),
1830                roles: Vec::new(),
1831                placement_policies: Vec::new(),
1832            })
1833            .await
1834            .expect("attach bounded cluster");
1835
1836        let path = ActorPath::parse("/user/declared-remote").unwrap();
1837        cluster
1838            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1839            .expect("declare remote actor");
1840
1841        assert!(cluster.can_route(&path));
1842        assert!(cluster
1843            .transport()
1844            .as_tcp()
1845            .expect("tcp transport")
1846            .can_route(AddrHash::new(&path, 0)));
1847    }
1848
    /// Spawning on a role must be rejected with `LocalPathCollision` when the
    /// requested path is already occupied by an actor running on the LOCAL
    /// engine. Engine B is configured with a control plane but is never run:
    /// the collision check happens on engine A before any remote call is needed.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
        // Shared test CA so both engines' TLS configs verify against each other.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // B carries the control plane + spawn handler a real role owner would
        // need, but this test never actually starts it (see doc comment above).
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        // Occupy the target path locally on A; this is the collision source.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "local-remote-collision",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();

        // B's cluster is attached first only so A can learn B's transport address.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // A knows B as the owner of the "context-service" role.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");

        // Only engine A runs; the collision must be caught locally.
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));

        // Wait for the local actor to register so the path is actually occupied
        // before we attempt the conflicting remote spawn.
        let deadline = Instant::now() + Duration::from_secs(5);
        while handle_a.registry.get_by_path(&actor_path).is_none() {
            if Instant::now() > deadline {
                panic!("local collision actor did not start");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let err = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("local path collision should fail");
        assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));

        shutdown_a_tx.send(()).ok();
    }
1947
1948    #[tokio::test]
1949    async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1950        if !tcp_tests_available() {
1951            eprintln!("skipping tcp integration tests: network bind not permitted");
1952            return;
1953        }
1954
1955        let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1956        let (ca, ca_key) = make_ca();
1957        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1958        let engine = Engine::with_config(EngineConfig {
1959            engine_id: EngineId::new("engine-a.example"),
1960            ..Default::default()
1961        });
1962        let handle = engine.handle();
1963
1964        let cluster = handle
1965            .attach_bounded_cluster(BoundedClusterConfig {
1966                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1967                peers: vec![
1968                    PeerSpec {
1969                        engine_id: EngineId::new("engine-b.example"),
1970                        addr: "127.0.0.1:4100".parse().unwrap(),
1971                        control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1972                        server_name: None,
1973                    },
1974                    PeerSpec {
1975                        engine_id: EngineId::new("engine-c.example"),
1976                        addr: "127.0.0.1:4200".parse().unwrap(),
1977                        control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1978                        server_name: None,
1979                    },
1980                ],
1981                declared_actors: vec![DeclaredRemoteActor {
1982                    path: actor_path.clone(),
1983                    owner: EngineId::new("engine-b.example"),
1984                }],
1985                roles: Vec::new(),
1986                placement_policies: Vec::new(),
1987            })
1988            .await
1989            .expect("attach bounded cluster");
1990
1991        let err = cluster
1992            .spawn_remote::<EchoRequest>(
1993                EngineId::new("engine-c.example"),
1994                RemoteSpawnSpec {
1995                    path: actor_path.clone(),
1996                    type_name: "bounded-echo".to_string(),
1997                    config: None,
1998                },
1999            )
2000            .await
2001            .expect_err("conflicting declared owner should fail");
2002        assert_eq!(
2003            err,
2004            RemoteSpawnError::RemoteOwnershipConflict {
2005                path: actor_path,
2006                owner: EngineId::new("engine-b.example"),
2007            }
2008        );
2009    }
2010
    /// Full remote-spawn lifecycle over TCP: spawn on a peer and get a typed
    /// handle, verify supervision status, reject a duplicate spawn, restart the
    /// actor via the control plane, and confirm routing/asks recover with the
    /// restart count reflected in the supervision state. Also checks that an
    /// unknown peer target is rejected.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
        // Shared test CA; B additionally exposes a TLS control plane and a
        // spawn handler so A can spawn/restart actors on it.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // A attaches first with no peers; its transport address seeds B's config.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // Wire A -> B both at the raw transport level and at the cluster level;
        // the cluster-level peer carries B's control-plane address for spawns.
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let mut cluster_a = cluster_a;
        cluster_a
            .add_peer(PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: Some(control_plane_b),
                server_name: None,
            })
            .expect("add peer with control plane");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Engines run on dedicated OS threads; give them a moment to come up.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Spawn the echo actor on B from A and get a typed remote handle back.
        let remote = cluster_a
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-b.example"),
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor");

        // Supervision must report B as supervisor, running, zero restarts.
        let status = cluster_a
            .remote_supervision_status(&actor_path)
            .await
            .expect("running supervision status");
        assert_eq!(status.supervisor, EngineId::new("engine-b.example"));
        assert_eq!(
            status.state,
            RemoteSupervisionState::Running { restart_count: 0 }
        );

        let response = remote
            .ask(EchoRequest { value: 21 })
            .await
            .expect("bounded remote spawn ask");
        assert_eq!(response, EchoResponse { doubled: 42 });

        // A second spawn on the same path must be rejected, not silently reused.
        let duplicate = cluster_a
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-b.example"),
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("duplicate remote spawn should fail")
        ;
        assert_eq!(
            duplicate,
            RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
        );

        // Capture the actor's current address on the host so we can tell a
        // genuine restart (new addr) from the original incarnation.
        let old_host_addr = handle_b
            .registry
            .get_by_path(&actor_path)
            .expect("spawned actor registered on host")
            .addr;
        call_control_plane(
            &cluster_a.control_plane,
            control_plane_b,
            "engine-b.example",
            "actor.restart",
            json!({ "path": actor_path.as_str() }),
        )
        .await
        .expect("restart remote actor on host");

        // Phase 1: supervision state transitions to Restarting { restart_count: 1 }.
        let restarting_deadline = Instant::now() + Duration::from_secs(5);
        loop {
            match cluster_a.remote_supervision_status(&actor_path).await {
                Ok(status)
                    if status.supervisor == EngineId::new("engine-b.example")
                        && matches!(
                            status.state,
                            RemoteSupervisionState::Restarting { restart_count: 1 }
                        ) =>
                {
                    break;
                }
                // Anything else (wrong state or transient query failure) is
                // retried until the deadline; the guard covers both patterns.
                Ok(_) | Err(_) if Instant::now() <= restarting_deadline => {
                    tokio::time::sleep(Duration::from_millis(25)).await;
                }
                Ok(status) => panic!("remote actor did not enter restarting state: {status:?}"),
                Err(err) => panic!("remote actor restart status query failed: {err:?}"),
            }
        }

        // Phase 2: the host registry shows a running incarnation at a NEW
        // address with a non-zero restart count.
        let restart_deadline = Instant::now() + Duration::from_secs(5);
        loop {
            if handle_b
                .registry
                .get_by_path(&actor_path)
                .map(|slot| {
                    slot.running.load(std::sync::atomic::Ordering::Relaxed)
                        && slot.addr != old_host_addr
                        && slot
                            .restart_count
                            .load(std::sync::atomic::Ordering::Relaxed)
                            > 0
                })
                .unwrap_or(false)
            {
                break;
            }
            if Instant::now() > restart_deadline {
                panic!("remote actor did not restart on host");
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }

        // Phase 3: the cluster route to the actor comes back after the restart.
        let restart_deadline = Instant::now() + Duration::from_secs(5);
        loop {
            if cluster_a.can_route(&actor_path) {
                break;
            }
            if Instant::now() > restart_deadline {
                panic!("remote actor route did not return after restart");
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }

        // Phase 4: the original typed handle works again post-restart.
        let restart_deadline = Instant::now() + Duration::from_secs(5);
        loop {
            match remote.ask(EchoRequest { value: 7 }).await {
                Ok(response) => {
                    assert_eq!(response, EchoResponse { doubled: 14 });
                    break;
                }
                Err(_) if Instant::now() <= restart_deadline => {
                    tokio::time::sleep(Duration::from_millis(25)).await;
                }
                Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
            }
        }

        // Final supervision state reflects exactly one completed restart.
        let restart_status = cluster_a
            .remote_supervision_status(&actor_path)
            .await
            .expect("restart supervision status");
        assert_eq!(restart_status.supervisor, EngineId::new("engine-b.example"));
        assert_eq!(
            restart_status.state,
            RemoteSupervisionState::Running { restart_count: 1 }
        );

        // Spawning on a peer the cluster has never heard of must fail fast.
        let unknown = cluster_a
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-c.example"),
                RemoteSpawnSpec {
                    path: ActorPath::parse("/user/unknown-target").unwrap(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("unknown target should fail");
        assert_eq!(
            unknown,
            RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
        );

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
2256
    /// Role-based remote spawn over TCP: A resolves the "context-service" role
    /// to its owner (engine B), spawns the actor there, and gets a working
    /// typed handle back. A spawn on a role nobody declared must be rejected
    /// with `UnknownRole`.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
        // Shared test CA; B also runs a TLS control plane + spawn handler so it
        // can host spawns issued by A.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // B attaches first (no peers) so A's config can reference B's address.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // A maps the "context-service" role to owner engine-b, and carries B's
        // control-plane address so role spawns can be issued.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        // Complete the mesh: B now learns A's address (B needed no control plane
        // for A — spawns only flow A -> B in this test).
        let mut cluster_b = cluster_b;
        cluster_b
            .add_peer(PeerSpec {
                engine_id: EngineId::new("engine-a.example"),
                addr: tcp_a.local_addr(),
                control_plane_addr: None,
                server_name: None,
            })
            .expect("add peer a to cluster b");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Engines run on dedicated OS threads; brief pause for startup.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Spawn by role name rather than an explicit engine id.
        let remote = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor by role over tcp");

        // The returned typed handle must be immediately usable for asks.
        let response = remote
            .ask(EchoRequest { value: 11 })
            .await
            .expect("bounded remote spawn ask over tcp by role")
        ;
        assert_eq!(response, EchoResponse { doubled: 22 });

        // A role that was never declared must fail with UnknownRole.
        let unknown_role = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "missing-role",
                RemoteSpawnSpec {
                    path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("unknown role should fail");
        assert_eq!(
            unknown_role,
            RemoteSpawnError::UnknownRole("missing-role".to_string())
        );

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
2385
2386    #[tokio::test]
2387    async fn bounded_cluster_spawn_remote_for_class_over_tcp_returns_typed_handle() {
2388        if !tcp_tests_available() {
2389            eprintln!("skipping tcp integration tests: network bind not permitted");
2390            return;
2391        }
2392
2393        let actor_path = ActorPath::parse("/user/remote-spawn-class-tcp").unwrap();
2394        let (ca, ca_key) = make_ca();
2395        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2396        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2397        let control_plane_b = reserve_tcp_addr();
2398
2399        let engine_a = Engine::with_config(EngineConfig {
2400            engine_id: EngineId::new("engine-a.example"),
2401            ..Default::default()
2402        });
2403        let engine_b = Engine::with_config(EngineConfig {
2404            engine_id: EngineId::new("engine-b.example"),
2405            control_plane_tcp_addr: Some(control_plane_b.to_string()),
2406            control_plane_tls: Some(tls_b.clone()),
2407            actor_spawn: Some(bounded_spawn_handler()),
2408            ..Default::default()
2409        });
2410
2411        let handle_a = engine_a.handle();
2412        let handle_b = engine_b.handle();
2413        handle_a
2414            .type_registry()
2415            .register_remote_ask::<EchoRequest>();
2416        handle_b
2417            .type_registry()
2418            .register_remote_ask::<EchoRequest>();
2419
2420        let cluster_b = handle_b
2421            .attach_bounded_cluster(BoundedClusterConfig {
2422                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
2423                peers: Vec::new(),
2424                declared_actors: Vec::new(),
2425                roles: Vec::new(),
2426                placement_policies: Vec::new(),
2427            })
2428            .await
2429            .expect("attach bounded cluster b");
2430        let tcp_b = cluster_b
2431            .transport()
2432            .as_tcp()
2433            .cloned()
2434            .expect("tcp transport b");
2435
2436        let cluster_a = handle_a
2437            .attach_bounded_cluster(BoundedClusterConfig {
2438                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
2439                peers: vec![PeerSpec {
2440                    engine_id: EngineId::new("engine-b.example"),
2441                    addr: tcp_b.local_addr(),
2442                    control_plane_addr: Some(control_plane_b),
2443                    server_name: None,
2444                }],
2445                declared_actors: Vec::new(),
2446                roles: vec![BoundedClusterRole {
2447                    name: "context-service".to_string(),
2448                    owner: EngineId::new("engine-b.example"),
2449                }],
2450                placement_policies: vec![BoundedPlacementPolicy {
2451                    actor_class: "SessionActor".to_string(),
2452                    role: "context-service".to_string(),
2453                }],
2454            })
2455            .await
2456            .expect("attach bounded cluster a");
2457        let tcp_a = cluster_a
2458            .transport()
2459            .as_tcp()
2460            .cloned()
2461            .expect("tcp transport a");
2462
2463        let mut cluster_b = cluster_b;
2464        cluster_b
2465            .add_peer(PeerSpec {
2466                engine_id: EngineId::new("engine-a.example"),
2467                addr: tcp_a.local_addr(),
2468                control_plane_addr: None,
2469                server_name: None,
2470            })
2471            .expect("add peer a to cluster b");
2472
2473        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2474        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2475
2476        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2477        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2478        tokio::time::sleep(Duration::from_millis(150)).await;
2479
2480        let remote = cluster_a
2481            .spawn_remote_for_class::<EchoRequest>(
2482                "SessionActor",
2483                RemoteSpawnSpec {
2484                    path: actor_path.clone(),
2485                    type_name: "bounded-echo".to_string(),
2486                    config: None,
2487                },
2488            )
2489            .await
2490            .expect("spawn remote actor by placement class over tcp");
2491
2492        let response = remote
2493            .ask(EchoRequest { value: 17 })
2494            .await
2495            .expect("bounded remote spawn ask over tcp by placement class");
2496        assert_eq!(response, EchoResponse { doubled: 34 });
2497
2498        let status = cluster_a
2499            .remote_supervision_status(&actor_path)
2500            .await
2501            .expect("placement supervision status");
2502        assert_eq!(status.supervisor, EngineId::new("engine-b.example"));
2503        assert_eq!(
2504            status.state,
2505            RemoteSupervisionState::Running { restart_count: 0 }
2506        );
2507
2508        call_control_plane(
2509            &cluster_a.control_plane,
2510            control_plane_b,
2511            "engine-b.example",
2512            "actor.restart",
2513            json!({ "path": actor_path.as_str() }),
2514        )
2515        .await
2516        .expect("restart placement actor on host");
2517
2518        let restarting_deadline = Instant::now() + Duration::from_secs(5);
2519        loop {
2520            match cluster_a.remote_supervision_status(&actor_path).await {
2521                Ok(status)
2522                    if matches!(
2523                        status.state,
2524                        RemoteSupervisionState::Restarting { restart_count: 1 }
2525                    ) =>
2526                {
2527                    assert_eq!(status.supervisor, EngineId::new("engine-b.example"));
2528                    break;
2529                }
2530                Ok(_) | Err(_) if Instant::now() <= restarting_deadline => {
2531                    tokio::time::sleep(Duration::from_millis(25)).await;
2532                }
2533                Ok(status) => {
2534                    panic!("placement-driven actor did not enter restarting state: {status:?}")
2535                }
2536                Err(err) => panic!("placement restart status query failed: {err:?}"),
2537            }
2538        }
2539
2540        wait_for_supervision_state(
2541            &cluster_a,
2542            &actor_path,
2543            RemoteSupervisionState::Running { restart_count: 1 },
2544        )
2545        .await;
2546
2547        let unknown_class = cluster_a
2548            .spawn_remote_for_class::<EchoRequest>(
2549                "MissingActorClass",
2550                RemoteSpawnSpec {
2551                    path: ActorPath::parse("/user/missing-class-tcp").unwrap(),
2552                    type_name: "bounded-echo".to_string(),
2553                    config: None,
2554                },
2555            )
2556            .await
2557            .expect_err("unknown placement class should fail");
2558        assert_eq!(
2559            unknown_class,
2560            RemoteSpawnError::UnknownPlacementClass("MissingActorClass".to_string())
2561        );
2562
2563        shutdown_a_tx.send(()).ok();
2564        shutdown_b_tx.send(()).ok();
2565    }
2566
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
        // Regression test: spawning onto a role's static owner must succeed even
        // though the host engine (B) never predeclares the caller (A) as a peer.
        // Only cluster A carries a PeerSpec; cluster B attaches with no peers.
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let engine_a_id = EngineId::new("engine-a-norev.example");
        let engine_b_id = EngineId::new("engine-b-norev.example");
        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
        // One shared test CA; each engine gets a certificate for its own id.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config(engine_a_id.as_str(), &ca, &ca_key);
        let tls_b = make_tls_config(engine_b_id.as_str(), &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: engine_a_id.clone(),
            ..Default::default()
        });
        // Engine B hosts the control plane and the spawn handler that will
        // actually create the requested actor.
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: engine_b_id.clone(),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        // Both sides need the EchoRequest ask type registered for remote asks.
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // Attach B first (with an empty peer list) so A can point its PeerSpec
        // at B's bound TCP address.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config(engine_b_id.as_str(), tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // Cluster A declares B as a peer and the "context-service" role with B
        // as its static owner; no placement policies are involved here.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config(engine_a_id.as_str(), tls_a)),
                peers: vec![PeerSpec {
                    engine_id: engine_b_id.clone(),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: engine_b_id.clone(),
                }],
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Each engine runs on its own OS thread; the oneshot senders stop them.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Caller knows the static owner already. Remote spawn should not require
        // the host engine to predeclare a reverse peer just to install the
        // caller's canonical path route.
        let _ = tcp_a;

        // NOTE(review): unlike the sibling tests, no settling sleep precedes
        // the spawn — presumably spawn_remote_on_role tolerates a cold
        // connection ("slow" path in the actor name); confirm that is intended.
        let remote = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor by role over tcp without reverse declaration");

        // The spawned echo actor doubles the asked value: 13 -> 26.
        let response = remote
            .ask(EchoRequest { value: 13 })
            .await
            .expect("bounded remote spawn ask over tcp without reverse declaration");
        assert_eq!(response, EchoResponse { doubled: 26 });

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
2675
    #[tokio::test]
    async fn bounded_cluster_remote_supervision_reports_actor_unavailable() {
        // A Temporary actor is not restarted after being killed, so the remote
        // supervision view on the caller side should end in ActorUnavailable.
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/supervision-actor-unavailable").unwrap();
        // Shared test CA; per-engine certificates for mutual TLS.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // Engine B hosts the actor and exposes a TLS control plane so A can
        // issue actor.kill against it.
        let mut engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            ..Default::default()
        });
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        // RestartPolicy::Temporary is the point of this test: once killed, the
        // supervisor must not bring the actor back.
        engine_b.add_user_actor(ChildSpec::new(
            "supervision-actor-unavailable",
            RestartPolicy::Temporary,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // Attach B first (no peers) so A can peer against its bound address.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        let mut cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        // Install the canonical route for B's actor on cluster A.
        cluster_a
            .declare_remote_actor(EngineId::new("engine-b.example"), actor_path.clone())
            .expect("declare remote actor");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        // Give both engines a moment to bind and connect.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Sanity check: the actor answers (9 -> 18) before being killed, and
        // supervision reports Running with no restarts.
        let remote = handle_a.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 9 })
            .await
            .expect("initial remote ask");
        assert_eq!(response, EchoResponse { doubled: 18 });
        wait_for_supervision_state(
            &cluster_a,
            &actor_path,
            RemoteSupervisionState::Running { restart_count: 0 },
        )
        .await;

        // Kill the actor via B's control plane; Temporary means no restart.
        call_control_plane(
            &cluster_a.control_plane,
            control_plane_b,
            "engine-b.example",
            "actor.kill",
            json!({ "path": actor_path.as_str() }),
        )
        .await
        .expect("kill remote actor on host");

        // The caller-side view must eventually observe ActorUnavailable.
        wait_for_supervision_state(
            &cluster_a,
            &actor_path,
            RemoteSupervisionState::ActorUnavailable,
        )
        .await;

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
2791
    #[tokio::test]
    async fn bounded_cluster_remote_supervision_reports_supervisor_unavailable() {
        // When the hosting engine itself shuts down, the caller's supervision
        // view should transition to SupervisorUnavailable (as opposed to
        // ActorUnavailable, which covers a dead actor on a live host).
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/supervision-host-unavailable").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // Engine B hosts the actor plus a TLS control plane.
        let mut engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            ..Default::default()
        });
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        // Permanent restart policy: the actor itself would be restarted — the
        // unavailability observed below comes from the engine going away.
        engine_b.add_user_actor(ChildSpec::new(
            "supervision-host-unavailable",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // Attach B first (no peers) so A can peer against its bound address.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        let mut cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        // Install the canonical route for B's actor on cluster A.
        cluster_a
            .declare_remote_actor(EngineId::new("engine-b.example"), actor_path.clone())
            .expect("declare remote actor");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        // Give both engines a moment to bind and connect.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Sanity check: the actor answers (3 -> 6) while B is up, and
        // supervision reports Running with no restarts.
        let remote = handle_a.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 3 })
            .await
            .expect("initial remote ask");
        assert_eq!(response, EchoResponse { doubled: 6 });
        wait_for_supervision_state(
            &cluster_a,
            &actor_path,
            RemoteSupervisionState::Running { restart_count: 0 },
        )
        .await;

        // Shut down engine B entirely; A must observe SupervisorUnavailable.
        shutdown_b_tx.send(()).ok();
        wait_for_supervision_state(
            &cluster_a,
            &actor_path,
            RemoteSupervisionState::SupervisorUnavailable,
        )
        .await;

        shutdown_a_tx.send(()).ok();
    }
2897
    #[tokio::test]
    async fn bounded_cluster_remote_supervision_reports_transport_unavailable() {
        // Cluster A's PeerSpec points at a transport address nothing listens
        // on (engine B never attaches a cluster in this test), so supervision
        // should report TransportUnavailable.
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/supervision-transport-unavailable").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();
        // Reserved but never bound by anything below — the deliberately dead
        // transport endpoint.
        let wrong_transport_addr = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // Engine B runs with a live control plane, but its actor is never
        // reachable over the (wrong) transport address A is given.
        let mut engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b),
            ..Default::default()
        });
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_b.add_user_actor(ChildSpec::new(
            "supervision-transport-unavailable",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let mut cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: wrong_transport_addr,
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        cluster_a
            .declare_remote_actor(EngineId::new("engine-b.example"), actor_path.clone())
            .expect("declare remote actor");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        // Give both engines a moment to start.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // No ask is attempted: the transport can never connect, so the caller
        // side must settle on TransportUnavailable.
        wait_for_supervision_state(
            &cluster_a,
            &actor_path,
            RemoteSupervisionState::TransportUnavailable,
        )
        .await;

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
2967
2968    fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
2969        QuicTransportConfig {
2970            engine_id: EngineId::new(engine_id),
2971            listen_addr: "127.0.0.1:0".parse().unwrap(),
2972            send_buffer_size: 1024,
2973            idle_timeout: Duration::from_secs(2),
2974            tls,
2975        }
2976    }
2977
2978    #[tokio::test]
2979    async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
2980        if !quic_tests_available() {
2981            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2982            return;
2983        }
2984
2985        let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
2986        let (ca, ca_key) = make_ca();
2987        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2988        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2989
2990        let mut engine_a = Engine::with_config(EngineConfig {
2991            engine_id: EngineId::new("engine-a.example"),
2992            ..Default::default()
2993        });
2994        let engine_b = Engine::with_config(EngineConfig {
2995            engine_id: EngineId::new("engine-b.example"),
2996            ..Default::default()
2997        });
2998
2999        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
3000        engine_a.add_user_actor(ChildSpec::new(
3001            "cluster-echo-quic",
3002            RestartPolicy::Permanent,
3003            ShutdownPolicy::Timeout(Duration::from_secs(1)),
3004            ns,
3005            move || Box::new(EchoActor),
3006        ));
3007
3008        let handle_a = engine_a.handle();
3009        let handle_b = engine_b.handle();
3010        handle_a
3011            .type_registry()
3012            .register_remote_ask::<EchoRequest>();
3013        handle_b
3014            .type_registry()
3015            .register_remote_ask::<EchoRequest>();
3016
3017        let cluster_a = match handle_a
3018            .attach_bounded_cluster(BoundedClusterConfig {
3019                transport: BoundedTransportConfig::Quic(quic_config(
3020                    "engine-a.example",
3021                    tls_a.clone(),
3022                )),
3023                peers: Vec::new(),
3024                declared_actors: Vec::new(),
3025                roles: Vec::new(),
3026                placement_policies: Vec::new(),
3027            })
3028            .await
3029        {
3030            Ok(cluster) => cluster,
3031            Err(ClusterError::Transport(err)) => {
3032                eprintln!("skipping quic bounded-cluster test: {err:?}");
3033                return;
3034            }
3035            Err(err) => panic!("attach bounded cluster a: {err:?}"),
3036        };
3037        let quic_a = cluster_a
3038            .transport()
3039            .as_quic()
3040            .cloned()
3041            .expect("quic transport a");
3042
3043        let cluster_b = match handle_b
3044            .attach_bounded_cluster(BoundedClusterConfig {
3045                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
3046                peers: vec![PeerSpec {
3047                    engine_id: EngineId::new("engine-a.example"),
3048                    addr: quic_a.local_addr(),
3049                    control_plane_addr: None,
3050                    server_name: None,
3051                }],
3052                declared_actors: Vec::new(),
3053                roles: Vec::new(),
3054                placement_policies: Vec::new(),
3055            })
3056            .await
3057        {
3058            Ok(cluster) => cluster,
3059            Err(ClusterError::Transport(err)) => {
3060                eprintln!("skipping quic bounded-cluster test: {err:?}");
3061                return;
3062            }
3063            Err(err) => panic!("attach bounded cluster b: {err:?}"),
3064        };
3065        let quic_b = cluster_b
3066            .transport()
3067            .as_quic()
3068            .cloned()
3069            .expect("quic transport b");
3070
3071        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
3072        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
3073        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
3074
3075        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
3076        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
3077
3078        let canonical = AddrHash::new(&actor_path, 0);
3079        let deadline = std::time::Instant::now() + Duration::from_secs(5);
3080        while !cluster_b.can_route(&actor_path) {
3081            if std::time::Instant::now() > deadline {
3082                panic!("bounded quic cluster route did not propagate");
3083            }
3084            tokio::time::sleep(Duration::from_millis(20)).await;
3085        }
3086        assert!(quic_b.can_route(canonical));
3087
3088        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
3089        let ask_deadline = Instant::now() + Duration::from_secs(2);
3090        loop {
3091            match remote.ask(EchoRequest { value: 15 }).await {
3092                Ok(response) => {
3093                    assert_eq!(response, EchoResponse { doubled: 30 });
3094                    break;
3095                }
3096                Err(palladium_actor::AskError::Send(
3097                    palladium_actor::SendError::ConnectionFailed,
3098                )) if Instant::now() <= ask_deadline => {
3099                    tokio::time::sleep(Duration::from_millis(20)).await;
3100                }
3101                Err(err) => panic!("bounded cluster ask over quic: {err:?}"),
3102            }
3103        }
3104
3105        shutdown_a_tx.send(()).ok();
3106        shutdown_b_tx.send(()).ok();
3107    }
3108
    #[tokio::test]
    async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
        // Declaring a remote actor must install the canonical route both at
        // the cluster layer and in the underlying QUIC transport. No remote
        // engine is started; the peer is configured at a fixed address only.
        if !quic_tests_available() {
            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
            return;
        }

        let (ca, ca_key) = make_ca();
        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let handle = engine.handle();

        // QUIC attach can fail in restricted environments; skip on transport
        // errors instead of failing.
        let mut cluster = match handle
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
                placement_policies: Vec::new(),
            })
            .await
        {
            Ok(cluster) => cluster,
            Err(ClusterError::Transport(err)) => {
                eprintln!("skipping quic bounded-cluster test: {err:?}");
                return;
            }
            Err(err) => panic!("attach bounded cluster: {err:?}"),
        };

        let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
        cluster
            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
            .expect("declare remote actor");

        // Route must be visible at the cluster layer and, via the canonical
        // AddrHash (incarnation 0), at the transport layer.
        assert!(cluster.can_route(&path));
        assert!(cluster
            .transport()
            .as_quic()
            .expect("quic transport")
            .can_route(AddrHash::new(&path, 0)));
    }
3159
3160    #[tokio::test]
3161    async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
3162        if !quic_tests_available() {
3163            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
3164            return;
3165        }
3166
3167        let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
3168        let (ca, ca_key) = make_ca();
3169        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
3170        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
3171        let control_plane_b = reserve_udp_addr();
3172
3173        let engine_a = Engine::with_config(EngineConfig {
3174            engine_id: EngineId::new("engine-a.example"),
3175            ..Default::default()
3176        });
3177        let engine_b = Engine::with_config(EngineConfig {
3178            engine_id: EngineId::new("engine-b.example"),
3179            control_plane_quic_addr: Some(control_plane_b.to_string()),
3180            control_plane_tls: Some(tls_b.clone()),
3181            actor_spawn: Some(bounded_spawn_handler()),
3182            ..Default::default()
3183        });
3184
3185        let handle_a = engine_a.handle();
3186        let handle_b = engine_b.handle();
3187        handle_a
3188            .type_registry()
3189            .register_remote_ask::<EchoRequest>();
3190        handle_b
3191            .type_registry()
3192            .register_remote_ask::<EchoRequest>();
3193
3194        let cluster_b = match handle_b
3195            .attach_bounded_cluster(BoundedClusterConfig {
3196                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
3197                peers: Vec::new(),
3198                declared_actors: Vec::new(),
3199                roles: Vec::new(),
3200                placement_policies: Vec::new(),
3201            })
3202            .await
3203        {
3204            Ok(cluster) => cluster,
3205            Err(ClusterError::Transport(err)) => {
3206                eprintln!("skipping quic bounded-cluster test: {err:?}");
3207                return;
3208            }
3209            Err(err) => panic!("attach bounded cluster b: {err:?}"),
3210        };
3211        let quic_b = cluster_b
3212            .transport()
3213            .as_quic()
3214            .cloned()
3215            .expect("quic transport b");
3216
3217        let cluster_a = match handle_a
3218            .attach_bounded_cluster(BoundedClusterConfig {
3219                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
3220                peers: vec![PeerSpec {
3221                    engine_id: EngineId::new("engine-b.example"),
3222                    addr: quic_b.local_addr(),
3223                    control_plane_addr: Some(control_plane_b),
3224                    server_name: None,
3225                }],
3226                declared_actors: Vec::new(),
3227                roles: vec![BoundedClusterRole {
3228                    name: "context-service".to_string(),
3229                    owner: EngineId::new("engine-b.example"),
3230                }],
3231                placement_policies: Vec::new(),
3232            })
3233            .await
3234        {
3235            Ok(cluster) => cluster,
3236            Err(ClusterError::Transport(err)) => {
3237                eprintln!("skipping quic bounded-cluster test: {err:?}");
3238                return;
3239            }
3240            Err(err) => panic!("attach bounded cluster a: {err:?}"),
3241        };
3242        let quic_a = cluster_a
3243            .transport()
3244            .as_quic()
3245            .cloned()
3246            .expect("quic transport a");
3247
3248        let mut cluster_b = cluster_b;
3249        cluster_b
3250            .add_peer(PeerSpec {
3251                engine_id: EngineId::new("engine-a.example"),
3252                addr: quic_a.local_addr(),
3253                control_plane_addr: None,
3254                server_name: None,
3255            })
3256            .expect("add quic peer a to cluster b");
3257
3258        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
3259        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
3260
3261        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
3262        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
3263        tokio::time::sleep(Duration::from_millis(150)).await;
3264
3265        let remote = cluster_a
3266            .spawn_remote_on_role::<EchoRequest>(
3267                "context-service",
3268                RemoteSpawnSpec {
3269                    path: actor_path.clone(),
3270                    type_name: "bounded-echo".to_string(),
3271                    config: None,
3272                },
3273            )
3274            .await
3275            .expect("spawn remote actor over quic by role");
3276
3277        let response = remote
3278            .ask(EchoRequest { value: 9 })
3279            .await
3280            .expect("bounded remote spawn ask over quic by role");
3281        assert_eq!(response, EchoResponse { doubled: 18 });
3282
3283        let unknown_role = cluster_a
3284            .spawn_remote_on_role::<EchoRequest>(
3285                "missing-role",
3286                RemoteSpawnSpec {
3287                    path: ActorPath::parse("/user/missing-role").unwrap(),
3288                    type_name: "bounded-echo".to_string(),
3289                    config: None,
3290                },
3291            )
3292            .await
3293            .expect_err("unknown role should fail");
3294        assert_eq!(
3295            unknown_role,
3296            RemoteSpawnError::UnknownRole("missing-role".to_string())
3297        );
3298
3299        shutdown_a_tx.send(()).ok();
3300        shutdown_b_tx.send(()).ok();
3301    }
3302}