Skip to main content

palladium_runtime/
bounded_cluster.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use dashmap::DashMap;
10use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
11use palladium_transport::network::{Network, TokioNetwork};
12use palladium_transport::{
13    mailbox, QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport,
14    TransportError,
15};
16use rustls::pki_types::ServerName;
17use serde_json::{json, Value};
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio_rustls::TlsConnector;
20
/// Object-safe hook for adding a peer to a transport.
///
/// Implementations are stored as `Arc<dyn SpawnPeerInstaller>` in the
/// process-wide installer map, which lets callers install a peer without
/// naming the transport's reactor/network type parameters.
trait SpawnPeerInstaller: Send + Sync {
    /// Makes `engine_id` known as a peer at `addr`, optionally with the
    /// `server_name` to associate with it.
    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>);
}
24
25impl<R: Reactor + Clone, N: Network + Clone> SpawnPeerInstaller for BoundedTransportHandle<R, N> {
26    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>) {
27        let source_addr = AddrHash::synthetic(engine_id.as_str().as_bytes());
28        match server_name {
29            Some(server_name) => self.add_peer(&PeerSpec {
30                engine_id: engine_id.clone(),
31                addr,
32                control_plane_addr: None,
33                server_name: Some(server_name),
34            }),
35            None => self.add_peer(&PeerSpec {
36                engine_id: engine_id.clone(),
37                addr,
38                control_plane_addr: None,
39                server_name: None,
40            }),
41        }
42        match self {
43            Self::Tcp(transport) => {
44                let _ = transport.register_remote_source_addr(engine_id, source_addr);
45            }
46            Self::Quic(transport) => {
47                let _ = transport.register_remote_source_addr(engine_id, source_addr);
48            }
49        }
50    }
51}
52
53fn spawn_peer_installers() -> &'static DashMap<String, Arc<dyn SpawnPeerInstaller>> {
54    static INSTALLERS: std::sync::OnceLock<DashMap<String, Arc<dyn SpawnPeerInstaller>>> =
55        std::sync::OnceLock::new();
56    INSTALLERS.get_or_init(DashMap::new)
57}
58
59pub(crate) fn register_spawn_peer_installer<R: Reactor + Clone, N: Network + Clone>(
60    engine_id: &EngineId,
61    transport: &BoundedTransportHandle<R, N>,
62) {
63    spawn_peer_installers().insert(
64        engine_id.as_str().to_string(),
65        Arc::new(transport.clone()) as Arc<dyn SpawnPeerInstaller>,
66    );
67}
68
69pub(crate) fn maybe_register_spawn_caller_peer(
70    host_engine_id: &EngineId,
71    caller_engine_id: &EngineId,
72    caller_transport_addr: SocketAddr,
73    caller_server_name: Option<String>,
74) {
75    if let Some(installer) = spawn_peer_installers().get(host_engine_id.as_str()) {
76        installer.ensure_peer(
77            caller_engine_id.clone(),
78            caller_transport_addr,
79            caller_server_name,
80        );
81    }
82}
83
/// Static description of a bounded cluster as seen from one engine.
#[derive(Clone)]
pub struct BoundedClusterConfig {
    /// Transport settings; also the source of the local engine id.
    pub transport: BoundedTransportConfig,
    /// Remote engines this engine may talk to.
    pub peers: Vec<PeerSpec>,
    /// Actor paths declared as owned by a peer.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles, each mapped to the peer that owns it.
    pub roles: Vec<BoundedClusterRole>,
}
91
impl BoundedClusterConfig {
    /// Checks the configuration for internal consistency (peers, declared
    /// actors and roles) without consulting any actor registry.
    ///
    /// # Errors
    ///
    /// Returns the first [`ClusterError`] reported by the static checks.
    pub fn validate(&self) -> Result<(), ClusterError> {
        validate_static_config(self)
    }
}
97
/// Transport selection for a bounded cluster, carrying the matching
/// transport-specific configuration.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    /// Use the TCP transport with this configuration.
    Tcp(TcpTransportConfig),
    /// Use the QUIC transport with this configuration.
    Quic(QuicTransportConfig),
}
103
104impl BoundedTransportConfig {
105    pub fn engine_id(&self) -> &EngineId {
106        match self {
107            Self::Tcp(config) => &config.engine_id,
108            Self::Quic(config) => &config.engine_id,
109        }
110    }
111}
112
/// Addressing information for one remote engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    /// Identifier of the remote engine.
    pub engine_id: EngineId,
    /// Address passed to the transport when the peer is added.
    pub addr: SocketAddr,
    /// Control-plane endpoint of the peer; required for remote spawns.
    pub control_plane_addr: Option<SocketAddr>,
    /// Server name to use for the peer. When absent, remote spawn falls
    /// back to the engine id string.
    pub server_name: Option<String>,
}
120
/// Declaration that the actor at `path` is owned by the engine `owner`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    /// Path of the remote actor.
    pub path: ActorPath,
    /// Engine that owns the actor.
    pub owner: EngineId,
}
126
/// Named role resolved to the engine that owns it; used to pick the target
/// of a role-based remote spawn.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    /// Role name looked up by callers.
    pub name: String,
    /// Engine that fulfils the role.
    pub owner: EngineId,
}
132
/// Errors reported while validating or mutating cluster topology.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// A peer with this engine id is already present.
    DuplicatePeer(EngineId),
    /// The peer's engine id equals the local engine id.
    LocalPeerDeclared(EngineId),
    /// The path has already been declared as a remote actor.
    DuplicateDeclaredActor(ActorPath),
    /// The declared owner is the local engine, or the path already exists in
    /// the local registry.
    LocalActorDeclaredRemote(ActorPath),
    /// The declared owner is not among the known peers.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// A role with this name is already present.
    DuplicateRole(String),
    /// The role's owner is not among the known peers.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// The role's owner is a known peer but has no control-plane address.
    RoleOwnerMissingControlPlaneEndpoint { role: String, owner: EngineId },
    /// An error returned by the underlying transport.
    Transport(TransportError),
}
145
/// Parameters of a remote spawn request sent over the control plane.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    /// Path the new actor should be spawned at.
    pub path: ActorPath,
    /// Sent to the peer as the `type_name` field of the spawn request.
    pub type_name: String,
    /// Optional JSON configuration; omitted from the request when `None`.
    pub config: Option<Value>,
}
152
/// Errors reported by remote spawn operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// The target engine is not among the known peers.
    UnknownPeer(EngineId),
    /// No role with this name is configured.
    UnknownRole(String),
    /// The path already exists in the local actor registry.
    LocalPathCollision(ActorPath),
    /// The path is declared as owned by a different engine; `owner` is the
    /// engine it is currently declared for.
    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
    /// The peer reported that an actor is already running at the path.
    RemotePathAlreadyRunning(ActorPath),
    /// The target peer has no control-plane address configured.
    MissingControlPlaneEndpoint(EngineId),
    /// The server name could not be converted into a TLS server name.
    InvalidServerName(String),
    /// A control-plane call failed; carries the error message.
    ControlPlane(String),
    /// The actor was not observed on the peer before the wait deadline.
    SpawnTimedOut(ActorPath),
}
165
/// Wire protocol used for control-plane client calls.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    /// Newline-delimited JSON over TLS on TCP.
    Tcp,
    /// JSON over a bidirectional QUIC stream.
    Quic,
}
171
/// Settings for issuing control-plane requests to peers.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    /// Protocol used to reach the peer's control plane.
    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material from which the per-call client configuration is built.
    pub(crate) tls: palladium_transport::TlsConfig,
    /// Time budget for a remote spawn (spawn retries and readiness polling);
    /// also passed as the timeout when waiting for the peer connection.
    pub(crate) wait_timeout: Duration,
}
178
/// Smallest wait timeout that `remote_spawn_wait_timeout` will return.
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
180
181impl From<TransportError> for ClusterError {
182    fn from(value: TransportError) -> Self {
183        Self::Transport(value)
184    }
185}
186
187impl From<TransportError> for RemoteSpawnError {
188    fn from(value: TransportError) -> Self {
189        Self::ControlPlane(format!("transport route install failed: {value:?}"))
190    }
191}
192
/// Shared handle to the transport in use, either TCP or QUIC.
///
/// Cloning the handle clones the inner `Arc`, not the transport itself.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    /// Handle to a TCP transport.
    Tcp(Arc<TcpTransport<R, N>>),
    /// Handle to a QUIC transport.
    Quic(Arc<QuicTransport<R, N>>),
}
198
199impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
200    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
201        match self {
202            Self::Tcp(transport) => Some(transport),
203            Self::Quic(_) => None,
204        }
205    }
206
207    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
208        match self {
209            Self::Tcp(_) => None,
210            Self::Quic(transport) => Some(transport),
211        }
212    }
213
214    pub fn add_peer(&self, peer: &PeerSpec) {
215        match self {
216            Self::Tcp(transport) => {
217                if let Some(server_name) = &peer.server_name {
218                    transport.add_peer_with_server_name(
219                        peer.engine_id.clone(),
220                        peer.addr,
221                        server_name.clone(),
222                    );
223                } else {
224                    transport.add_peer(peer.engine_id.clone(), peer.addr);
225                }
226            }
227            Self::Quic(transport) => {
228                if let Some(server_name) = &peer.server_name {
229                    transport.add_peer_with_server_name(
230                        peer.engine_id.clone(),
231                        peer.addr,
232                        server_name.clone(),
233                    );
234                } else {
235                    transport.add_peer(peer.engine_id.clone(), peer.addr);
236                }
237            }
238        }
239    }
240
241    pub fn declare_remote_path(
242        &self,
243        owner: EngineId,
244        path: &ActorPath,
245    ) -> Result<(), TransportError> {
246        match self {
247            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
248            Self::Quic(transport) => transport.declare_remote_path(owner, path),
249        }
250    }
251
252    pub fn advertise_path(&self, addr: AddrHash, path: &ActorPath) -> Result<(), TransportError> {
253        match self {
254            Self::Tcp(transport) => transport.advertise_path(addr, path),
255            Self::Quic(transport) => transport.advertise_path(addr, path),
256        }
257    }
258
259    pub fn register_source_addr(&self, addr: AddrHash) -> Result<(), TransportError> {
260        match self {
261            Self::Tcp(transport) => transport.register(addr, mailbox(1).0),
262            Self::Quic(transport) => transport.register(addr, mailbox(1).0),
263        }
264    }
265
266    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
267        match self {
268            Self::Tcp(transport) => transport.undeclare_remote_path(path),
269            Self::Quic(transport) => transport.undeclare_remote_path(path),
270        }
271    }
272
273    pub fn can_route(&self, destination: AddrHash) -> bool {
274        match self {
275            Self::Tcp(transport) => transport.can_route(destination),
276            Self::Quic(transport) => transport.can_route(destination),
277        }
278    }
279
280    pub async fn wait_for_peer_connection(
281        &self,
282        engine_id: &EngineId,
283        timeout: Duration,
284    ) -> Result<(), TransportError> {
285        match self {
286            Self::Tcp(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
287            Self::Quic(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
288        }
289    }
290}
291
/// Runtime handle for managing peers, remote actor declarations and remote
/// spawns of a bounded cluster.
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    /// Engine used for its source address and to build remote addresses.
    engine: EngineHandle<R, N, F>,
    /// Identifier of the local engine.
    local_engine_id: EngineId,
    /// Client settings for control-plane calls to peers.
    control_plane: ControlPlaneClientConfig,
    /// Transport through which peers and routes are installed.
    transport: BoundedTransportHandle<R, N>,
    /// Peers known to this handle.
    peers: Vec<PeerSpec>,
    /// Remote actors declared through this handle.
    declared_actors: Vec<DeclaredRemoteActor>,
    /// Roles known to this handle.
    roles: Vec<BoundedClusterRole>,
    /// Local actor registry, consulted for path collisions and for
    /// advertising running actors.
    registry: Arc<ActorRegistry<R>>,
}
307
308impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
309    pub(crate) fn new(
310        engine: EngineHandle<R, N, F>,
311        local_engine_id: EngineId,
312        control_plane: ControlPlaneClientConfig,
313        transport: BoundedTransportHandle<R, N>,
314        peers: Vec<PeerSpec>,
315        roles: Vec<BoundedClusterRole>,
316        registry: Arc<ActorRegistry<R>>,
317    ) -> Self {
318        Self {
319            engine,
320            local_engine_id,
321            control_plane,
322            transport,
323            peers,
324            declared_actors: Vec::new(),
325            roles,
326            registry,
327        }
328    }
329
330    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
331        &self.transport
332    }
333
334    pub fn peers(&self) -> &[PeerSpec] {
335        &self.peers
336    }
337
338    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
339        &self.declared_actors
340    }
341
342    pub fn roles(&self) -> &[BoundedClusterRole] {
343        &self.roles
344    }
345
346    pub(crate) fn advertise_existing_local_routes(&self) -> Result<(), ClusterError> {
347        self.transport
348            .register_source_addr(self.engine.source_addr)
349            .map_err(ClusterError::Transport)?;
350
351        for info in self
352            .registry
353            .snapshot(&crate::introspection::ActorQuery::default(), 0)
354        {
355            if info.state != crate::introspection::ActorState::Running {
356                continue;
357            }
358            if let Some(slot) = self.registry.get_by_path(&info.path) {
359                self.transport
360                    .advertise_path(slot.addr, &info.path)
361                    .map_err(ClusterError::Transport)?;
362            }
363        }
364        Ok(())
365    }
366
367    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
368        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
369        self.transport.add_peer(&peer);
370        self.advertise_existing_local_routes()?;
371        self.peers.push(peer);
372        Ok(())
373    }
374
375    pub fn declare_remote_actor(
376        &mut self,
377        owner: EngineId,
378        path: ActorPath,
379    ) -> Result<(), ClusterError> {
380        validate_declared_actor_topology(
381            &path,
382            &owner,
383            &self.local_engine_id,
384            &self.peers,
385            &self.declared_actors,
386        )?;
387        validate_declared_actor_registry(
388            &path,
389            &owner,
390            &self.local_engine_id,
391            &self.declared_actors,
392            &self.registry,
393        )?;
394        self.transport.declare_remote_path(owner.clone(), &path)?;
395        self.declared_actors
396            .push(DeclaredRemoteActor { path, owner });
397        Ok(())
398    }
399
400    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
401        self.transport.undeclare_remote_path(path)?;
402        self.declared_actors
403            .retain(|declared| &declared.path != path);
404        Ok(())
405    }
406
407    pub fn can_route(&self, path: &ActorPath) -> bool {
408        self.transport.can_route(AddrHash::new(path, 0))
409    }
410
411    pub async fn spawn_remote_on_role<M>(
412        &self,
413        role: &str,
414        spec: RemoteSpawnSpec,
415    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
416    where
417        M: RemoteMessage,
418        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
419        R: Send + Sync,
420    {
421        let owner = self
422            .roles
423            .iter()
424            .find(|declared| declared.name == role)
425            .map(|declared| declared.owner.clone())
426            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
427        self.spawn_remote::<M>(owner, spec).await
428    }
429
430    pub async fn spawn_remote<M>(
431        &self,
432        target: EngineId,
433        spec: RemoteSpawnSpec,
434    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
435    where
436        M: RemoteMessage,
437        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
438        R: Send + Sync,
439    {
440        validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;
441
442        let peer = self
443            .peers
444            .iter()
445            .find(|peer| peer.engine_id == target)
446            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
447        let control_plane_addr = peer
448            .control_plane_addr
449            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
450        let server_name = peer
451            .server_name
452            .clone()
453            .unwrap_or_else(|| target.as_str().to_string());
454        let caller_server_name = self.local_engine_id.as_str().to_string();
455
456        let deadline = Instant::now() + self.control_plane.wait_timeout;
457        loop {
458            match call_actor_spawn(
459                &self.control_plane,
460                control_plane_addr,
461                &server_name,
462                &self.local_engine_id,
463                transport_listen_addr(&self.transport),
464                &caller_server_name,
465                &spec,
466            )
467            .await
468            {
469                Ok(()) => break,
470                Err(RemoteSpawnError::ControlPlane(message))
471                    if is_retryable_control_plane_error(&message) && Instant::now() <= deadline =>
472                {
473                    tokio::time::sleep(Duration::from_millis(20)).await;
474                }
475                Err(err) => return Err(err),
476            }
477        }
478
479        while Instant::now() <= deadline {
480            if actor_is_spawned(
481                &self.control_plane,
482                control_plane_addr,
483                &server_name,
484                &spec.path,
485            )
486            .await?
487            {
488                self.transport
489                    .declare_remote_path(target.clone(), &spec.path)
490                    .map_err(RemoteSpawnError::from)?;
491                self.transport
492                    .wait_for_peer_connection(&target, self.control_plane.wait_timeout)
493                    .await
494                    .map_err(RemoteSpawnError::from)?;
495                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
496            }
497            tokio::time::sleep(Duration::from_millis(20)).await;
498        }
499
500        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
501    }
502}
503
504async fn call_actor_spawn(
505    client: &ControlPlaneClientConfig,
506    addr: SocketAddr,
507    server_name: &str,
508    caller_engine_id: &EngineId,
509    caller_transport_addr: SocketAddr,
510    caller_server_name: &str,
511    spec: &RemoteSpawnSpec,
512) -> Result<(), RemoteSpawnError> {
513    let params = match &spec.config {
514        Some(config) => json!({
515            "path": spec.path.as_str(),
516            "type_name": spec.type_name,
517            "config": config,
518            "caller_engine_id": caller_engine_id.as_str(),
519            "caller_transport_addr": caller_transport_addr,
520            "caller_server_name": caller_server_name,
521        }),
522        None => json!({
523            "path": spec.path.as_str(),
524            "type_name": spec.type_name,
525            "caller_engine_id": caller_engine_id.as_str(),
526            "caller_transport_addr": caller_transport_addr,
527            "caller_server_name": caller_server_name,
528        }),
529    };
530    match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
531        Ok(_) => {}
532        Err(RemoteSpawnError::ControlPlane(message))
533            if message.contains("actor already running at") =>
534        {
535            return Err(RemoteSpawnError::RemotePathAlreadyRunning(
536                spec.path.clone(),
537            ));
538        }
539        Err(err) => return Err(err),
540    }
541    Ok(())
542}
543
544fn transport_listen_addr<R: Reactor + Clone, N: Network + Clone>(
545    transport: &BoundedTransportHandle<R, N>,
546) -> SocketAddr {
547    match transport {
548        BoundedTransportHandle::Tcp(transport) => transport.local_addr(),
549        BoundedTransportHandle::Quic(transport) => transport.local_addr(),
550    }
551}
552
/// Reports whether a control-plane error message describes a transient
/// connection problem worth retrying.
///
/// Matching is a case-insensitive (ASCII) substring search.
fn is_retryable_control_plane_error(message: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 5] = [
        "connection refused",
        "connection reset",
        "timed out",
        "broken pipe",
        "not connected",
    ];

    let lowered = message.to_ascii_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|marker| lowered.contains(marker))
}
561
562fn validate_remote_spawn_request<R: Reactor>(
563    path: &ActorPath,
564    owner: &EngineId,
565    declared_actors: &[DeclaredRemoteActor],
566    registry: &Arc<ActorRegistry<R>>,
567) -> Result<(), RemoteSpawnError> {
568    if registry.get_by_path(path).is_some() {
569        return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
570    }
571
572    if let Some(declared) = declared_actors
573        .iter()
574        .find(|declared| declared.path == *path && declared.owner != *owner)
575    {
576        return Err(RemoteSpawnError::RemoteOwnershipConflict {
577            path: path.clone(),
578            owner: declared.owner.clone(),
579        });
580    }
581
582    Ok(())
583}
584
585async fn actor_is_spawned(
586    client: &ControlPlaneClientConfig,
587    addr: SocketAddr,
588    server_name: &str,
589    path: &ActorPath,
590) -> Result<bool, RemoteSpawnError> {
591    match call_control_plane(
592        client,
593        addr,
594        server_name,
595        "actor.info",
596        json!({ "path": path.as_str() }),
597    )
598    .await
599    {
600        Ok(_) => Ok(true),
601        Err(RemoteSpawnError::ControlPlane(message))
602            if message.contains("actor not found:")
603                || message.contains("actor not found")
604                || message.contains("User supervisor not available") =>
605        {
606            Ok(false)
607        }
608        Err(err) => Err(err),
609    }
610}
611
612async fn call_control_plane(
613    client: &ControlPlaneClientConfig,
614    addr: SocketAddr,
615    server_name: &str,
616    method: &str,
617    params: Value,
618) -> Result<Value, RemoteSpawnError> {
619    let request = json!({
620        "id": 1,
621        "method": method,
622        "params": params,
623    });
624
625    match client.protocol {
626        ControlPlaneProtocol::Tcp => {
627            call_control_plane_tcp(addr, server_name, &client.tls, &request).await
628        }
629        ControlPlaneProtocol::Quic => {
630            call_control_plane_quic(addr, server_name, &client.tls, &request).await
631        }
632    }
633}
634
635async fn call_control_plane_tcp(
636    addr: SocketAddr,
637    server_name: &str,
638    tls: &palladium_transport::TlsConfig,
639    request: &Value,
640) -> Result<Value, RemoteSpawnError> {
641    let config = tls
642        .client_config(server_name)
643        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
644    let name = ServerName::try_from(server_name.to_string())
645        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
646    let stream = tokio::net::TcpStream::connect(addr)
647        .await
648        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
649    let connector = TlsConnector::from(config);
650    let tls_stream = connector
651        .connect(name, stream)
652        .await
653        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
654
655    let (reader, mut writer) = tokio::io::split(tls_stream);
656    let mut body = request.to_string();
657    body.push('\n');
658    writer
659        .write_all(body.as_bytes())
660        .await
661        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
662    let mut line = String::new();
663    let mut reader = BufReader::new(reader);
664    reader
665        .read_line(&mut line)
666        .await
667        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
668    parse_control_plane_response(&line)
669}
670
671async fn call_control_plane_quic(
672    addr: SocketAddr,
673    server_name: &str,
674    tls: &palladium_transport::TlsConfig,
675    request: &Value,
676) -> Result<Value, RemoteSpawnError> {
677    ensure_rustls_provider();
678    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
679        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
680    })?)
681    .clone();
682    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
683    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
684
685    let client = s2n_quic::Client::builder()
686        .with_tls(client_tls)
687        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
688        .with_io("0.0.0.0:0")
689        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
690        .start()
691        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
692
693    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
694    let mut conn = client
695        .connect(connect)
696        .await
697        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
698    let mut stream = conn
699        .open_bidirectional_stream()
700        .await
701        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
702
703    let mut body = request.to_string();
704    body.push('\n');
705    stream
706        .write_all(body.as_bytes())
707        .await
708        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
709    stream
710        .close()
711        .await
712        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
713
714    let mut buf = Vec::new();
715    stream
716        .read_to_end(&mut buf)
717        .await
718        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
719    let line = String::from_utf8_lossy(&buf);
720    parse_control_plane_response(&line)
721}
722
723fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
724    let response: Value = serde_json::from_str(line.trim())
725        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
726    if let Some(error) = response.get("error") {
727        let message = error
728            .get("message")
729            .and_then(Value::as_str)
730            .unwrap_or("unknown control-plane error");
731        return Err(RemoteSpawnError::ControlPlane(message.to_string()));
732    }
733    Ok(response.get("result").cloned().unwrap_or(Value::Null))
734}
735
/// Installs a default rustls crypto provider once per process.
///
/// Uses `aws-lc-rs` when that feature is enabled, otherwise `ring` when that
/// feature is enabled. The installation result is ignored, and nothing is
/// installed when neither feature is on.
fn ensure_rustls_provider() {
    use std::sync::Once;

    static INSTALL_ONCE: Once = Once::new();
    INSTALL_ONCE.call_once(|| {
        #[cfg(feature = "aws-lc-rs")]
        {
            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        }
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        {
            let _ = rustls::crypto::ring::default_provider().install_default();
        }
    });
}
745
746pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
747    ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
748}
749
750pub(crate) fn validate_config<R: Reactor>(
751    config: &BoundedClusterConfig,
752    registry: &Arc<ActorRegistry<R>>,
753) -> Result<(), ClusterError> {
754    validate_static_config(config)?;
755
756    let local_engine_id = config.transport.engine_id();
757    let mut seen_declared = Vec::new();
758    for declared in &config.declared_actors {
759        validate_declared_actor_registry(
760            &declared.path,
761            &declared.owner,
762            local_engine_id,
763            &seen_declared,
764            registry,
765        )?;
766        seen_declared.push(declared.clone());
767    }
768
769    Ok(())
770}
771
772fn validate_static_config(config: &BoundedClusterConfig) -> Result<(), ClusterError> {
773    let local_engine_id = config.transport.engine_id();
774    let mut seen_peers = Vec::new();
775    for peer in &config.peers {
776        validate_peer_spec(peer, local_engine_id, &seen_peers)?;
777        seen_peers.push(peer.clone());
778    }
779
780    let mut seen_declared = Vec::new();
781    for declared in &config.declared_actors {
782        validate_declared_actor_topology(
783            &declared.path,
784            &declared.owner,
785            local_engine_id,
786            &config.peers,
787            &seen_declared,
788        )?;
789        seen_declared.push(declared.clone());
790    }
791
792    let mut seen_roles = Vec::new();
793    for role in &config.roles {
794        validate_role(role, &config.peers, &seen_roles)?;
795        seen_roles.push(role.clone());
796    }
797
798    Ok(())
799}
800
801fn validate_peer_spec(
802    peer: &PeerSpec,
803    local_engine_id: &EngineId,
804    existing: &[PeerSpec],
805) -> Result<(), ClusterError> {
806    if &peer.engine_id == local_engine_id {
807        return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
808    }
809    if existing
810        .iter()
811        .any(|existing| existing.engine_id == peer.engine_id)
812    {
813        return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
814    }
815    Ok(())
816}
817
818fn validate_declared_actor_topology(
819    path: &ActorPath,
820    owner: &EngineId,
821    local_engine_id: &EngineId,
822    peers: &[PeerSpec],
823    existing: &[DeclaredRemoteActor],
824) -> Result<(), ClusterError> {
825    if owner == local_engine_id {
826        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
827    }
828    if existing.iter().any(|existing| existing.path == *path) {
829        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
830    }
831    if !peers.iter().any(|peer| peer.engine_id == *owner) {
832        return Err(ClusterError::UnknownPeerOwner {
833            path: path.clone(),
834            owner: owner.clone(),
835        });
836    }
837    Ok(())
838}
839
840fn validate_declared_actor_registry<R: Reactor>(
841    path: &ActorPath,
842    owner: &EngineId,
843    local_engine_id: &EngineId,
844    existing: &[DeclaredRemoteActor],
845    registry: &Arc<ActorRegistry<R>>,
846) -> Result<(), ClusterError> {
847    if owner == local_engine_id || registry.get_by_path(path).is_some() {
848        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
849    }
850    if existing.iter().any(|existing| existing.path == *path) {
851        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
852    }
853    Ok(())
854}
855
856fn validate_role(
857    role: &BoundedClusterRole,
858    peers: &[PeerSpec],
859    existing: &[BoundedClusterRole],
860) -> Result<(), ClusterError> {
861    if existing.iter().any(|existing| existing.name == role.name) {
862        return Err(ClusterError::DuplicateRole(role.name.clone()));
863    }
864    if !peers.iter().any(|peer| peer.engine_id == role.owner) {
865        return Err(ClusterError::UnknownRoleOwner {
866            role: role.name.clone(),
867            owner: role.owner.clone(),
868        });
869    }
870    if !peers
871        .iter()
872        .any(|peer| peer.engine_id == role.owner && peer.control_plane_addr.is_some())
873    {
874        return Err(ClusterError::RoleOwnerMissingControlPlaneEndpoint {
875            role: role.name.clone(),
876            owner: role.owner.clone(),
877        });
878    }
879    Ok(())
880}
881
882#[cfg(test)]
883mod tests {
884    use super::{
885        call_control_plane, validate_config, BoundedClusterConfig, BoundedClusterRole,
886        BoundedTransportConfig, ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError,
887        RemoteSpawnSpec,
888    };
889    use crate::{Engine, EngineConfig};
890    use palladium_actor::{
891        Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
892        Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
893    };
894    use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
895    use rcgen::{
896        BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
897        KeyUsagePurpose,
898    };
899    use serde_json::json;
900    use std::net::{SocketAddr, TcpListener, UdpSocket};
901    use std::sync::Arc;
902    use std::time::{Duration, Instant};
903
904    fn make_ca() -> (Certificate, KeyPair) {
905        let mut params = CertificateParams::default();
906        params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
907        params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
908        let ca_key = KeyPair::generate().expect("ca keypair");
909        let cert = params.self_signed(&ca_key).expect("ca cert");
910        (cert, ca_key)
911    }
912
913    fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
914        let keypair = KeyPair::generate().expect("leaf keypair");
915        let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
916        let mut params = params;
917        params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
918        params.extended_key_usages = vec![
919            ExtendedKeyUsagePurpose::ServerAuth,
920            ExtendedKeyUsagePurpose::ClientAuth,
921        ];
922        let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
923        let cert_der = cert.der().to_vec();
924        let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
925        let ca_der = ca.der().to_vec();
926        TlsConfig {
927            cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
928            private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
929            trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
930        }
931    }
932
933    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
934        TcpTransportConfig {
935            engine_id: EngineId::new(engine_id),
936            listen_addr: "127.0.0.1:0".parse().unwrap(),
937            max_connections_per_peer: 1,
938            idle_timeout: Duration::from_secs(1),
939            send_buffer_size: 8,
940            nodelay: true,
941            tls,
942            send_delay: Duration::from_millis(0),
943        }
944    }
945
946    fn reserve_tcp_addr() -> SocketAddr {
947        let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
948        let addr = listener.local_addr().expect("tcp local addr");
949        drop(listener);
950        addr
951    }
952
953    fn reserve_udp_addr() -> SocketAddr {
954        let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
955        let addr = socket.local_addr().expect("udp local addr");
956        drop(socket);
957        addr
958    }
959
960    #[test]
961    fn bounded_cluster_config_rejects_duplicate_peers() {
962        let (ca, ca_key) = make_ca();
963        let engine = Engine::with_config(EngineConfig {
964            engine_id: EngineId::new("engine-a.example"),
965            ..Default::default()
966        });
967        let config = BoundedClusterConfig {
968            transport: BoundedTransportConfig::Tcp(tcp_config(
969                "engine-a.example",
970                make_tls_config("engine-a.example", &ca, &ca_key),
971            )),
972            peers: vec![
973                PeerSpec {
974                    engine_id: EngineId::new("engine-b.example"),
975                    addr: "127.0.0.1:4100".parse().unwrap(),
976                    control_plane_addr: None,
977                    server_name: None,
978                },
979                PeerSpec {
980                    engine_id: EngineId::new("engine-b.example"),
981                    addr: "127.0.0.1:4200".parse().unwrap(),
982                    control_plane_addr: None,
983                    server_name: None,
984                },
985            ],
986            declared_actors: Vec::new(),
987            roles: Vec::new(),
988        };
989
990        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
991        assert_eq!(
992            err,
993            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
994        );
995    }
996
997    #[test]
998    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
999        let (ca, ca_key) = make_ca();
1000        let engine = Engine::with_config(EngineConfig {
1001            engine_id: EngineId::new("engine-a.example"),
1002            ..Default::default()
1003        });
1004        let config = BoundedClusterConfig {
1005            transport: BoundedTransportConfig::Tcp(tcp_config(
1006                "engine-a.example",
1007                make_tls_config("engine-a.example", &ca, &ca_key),
1008            )),
1009            peers: vec![PeerSpec {
1010                engine_id: EngineId::new("engine-b.example"),
1011                addr: "127.0.0.1:4100".parse().unwrap(),
1012                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1013                server_name: None,
1014            }],
1015            declared_actors: vec![DeclaredRemoteActor {
1016                path: ActorPath::parse("/user/missing-owner").unwrap(),
1017                owner: EngineId::new("engine-c.example"),
1018            }],
1019            roles: Vec::new(),
1020        };
1021
1022        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1023        assert_eq!(
1024            err,
1025            ClusterError::UnknownPeerOwner {
1026                path: ActorPath::parse("/user/missing-owner").unwrap(),
1027                owner: EngineId::new("engine-c.example"),
1028            }
1029        );
1030    }
1031
1032    #[test]
1033    fn bounded_cluster_config_rejects_duplicate_roles() {
1034        let (ca, ca_key) = make_ca();
1035        let engine = Engine::with_config(EngineConfig {
1036            engine_id: EngineId::new("engine-a.example"),
1037            ..Default::default()
1038        });
1039        let config = BoundedClusterConfig {
1040            transport: BoundedTransportConfig::Tcp(tcp_config(
1041                "engine-a.example",
1042                make_tls_config("engine-a.example", &ca, &ca_key),
1043            )),
1044            peers: vec![PeerSpec {
1045                engine_id: EngineId::new("engine-b.example"),
1046                addr: "127.0.0.1:4100".parse().unwrap(),
1047                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1048                server_name: None,
1049            }],
1050            declared_actors: Vec::new(),
1051            roles: vec![
1052                BoundedClusterRole {
1053                    name: "context-service".to_string(),
1054                    owner: EngineId::new("engine-b.example"),
1055                },
1056                BoundedClusterRole {
1057                    name: "context-service".to_string(),
1058                    owner: EngineId::new("engine-b.example"),
1059                },
1060            ],
1061        };
1062
1063        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1064        assert_eq!(
1065            err,
1066            ClusterError::DuplicateRole("context-service".to_string())
1067        );
1068    }
1069
1070    #[test]
1071    fn bounded_cluster_config_rejects_unknown_role_owner() {
1072        let (ca, ca_key) = make_ca();
1073        let engine = Engine::with_config(EngineConfig {
1074            engine_id: EngineId::new("engine-a.example"),
1075            ..Default::default()
1076        });
1077        let config = BoundedClusterConfig {
1078            transport: BoundedTransportConfig::Tcp(tcp_config(
1079                "engine-a.example",
1080                make_tls_config("engine-a.example", &ca, &ca_key),
1081            )),
1082            peers: vec![PeerSpec {
1083                engine_id: EngineId::new("engine-b.example"),
1084                addr: "127.0.0.1:4100".parse().unwrap(),
1085                control_plane_addr: None,
1086                server_name: None,
1087            }],
1088            declared_actors: Vec::new(),
1089            roles: vec![BoundedClusterRole {
1090                name: "gateway".to_string(),
1091                owner: EngineId::new("engine-c.example"),
1092            }],
1093        };
1094
1095        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1096        assert_eq!(
1097            err,
1098            ClusterError::UnknownRoleOwner {
1099                role: "gateway".to_string(),
1100                owner: EngineId::new("engine-c.example"),
1101            }
1102        );
1103    }
1104
1105    #[test]
1106    fn bounded_cluster_config_rejects_role_owner_without_control_plane() {
1107        let (ca, ca_key) = make_ca();
1108        let config = BoundedClusterConfig {
1109            transport: BoundedTransportConfig::Tcp(tcp_config(
1110                "engine-a.example",
1111                make_tls_config("engine-a.example", &ca, &ca_key),
1112            )),
1113            peers: vec![PeerSpec {
1114                engine_id: EngineId::new("engine-b.example"),
1115                addr: "127.0.0.1:4100".parse().unwrap(),
1116                control_plane_addr: None,
1117                server_name: None,
1118            }],
1119            declared_actors: Vec::new(),
1120            roles: vec![BoundedClusterRole {
1121                name: "session-host".to_string(),
1122                owner: EngineId::new("engine-b.example"),
1123            }],
1124        };
1125
1126        let err = config.validate().unwrap_err();
1127        assert_eq!(
1128            err,
1129            ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1130                role: "session-host".to_string(),
1131                owner: EngineId::new("engine-b.example"),
1132            }
1133        );
1134    }
1135
1136    #[tokio::test]
1137    async fn attach_bounded_cluster_rejects_role_owner_without_control_plane() {
1138        let (ca, ca_key) = make_ca();
1139        let engine = Engine::with_config(EngineConfig {
1140            engine_id: EngineId::new("engine-a.example"),
1141            ..Default::default()
1142        });
1143
1144        let err = match engine
1145            .handle()
1146            .attach_bounded_cluster(BoundedClusterConfig {
1147                transport: BoundedTransportConfig::Tcp(tcp_config(
1148                    "engine-a.example",
1149                    make_tls_config("engine-a.example", &ca, &ca_key),
1150                )),
1151                peers: vec![PeerSpec {
1152                    engine_id: EngineId::new("engine-b.example"),
1153                    addr: "127.0.0.1:4100".parse().unwrap(),
1154                    control_plane_addr: None,
1155                    server_name: None,
1156                }],
1157                declared_actors: Vec::new(),
1158                roles: vec![BoundedClusterRole {
1159                    name: "session-host".to_string(),
1160                    owner: EngineId::new("engine-b.example"),
1161                }],
1162            })
1163            .await
1164        {
1165            Ok(_) => panic!("expected bounded cluster attach to reject missing control plane"),
1166            Err(err) => err,
1167        };
1168
1169        assert_eq!(
1170            err,
1171            ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1172                role: "session-host".to_string(),
1173                owner: EngineId::new("engine-b.example"),
1174            }
1175        );
1176    }
1177
1178    #[test]
1179    fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
1180        assert_eq!(
1181            super::remote_spawn_wait_timeout(Duration::from_millis(25)),
1182            Duration::from_secs(5)
1183        );
1184        assert_eq!(
1185            super::remote_spawn_wait_timeout(Duration::from_secs(8)),
1186            Duration::from_secs(8)
1187        );
1188    }
1189
1190    fn tcp_tests_available() -> bool {
1191        if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
1192            return false;
1193        }
1194        std::net::TcpListener::bind("127.0.0.1:0").is_ok()
1195    }
1196
1197    fn quic_tests_available() -> bool {
1198        super::ensure_rustls_provider();
1199        rustls::crypto::CryptoProvider::get_default().is_some()
1200    }
1201
1202    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1203    struct EchoRequest {
1204        value: u64,
1205    }
1206
1207    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1208    struct EchoResponse {
1209        doubled: u64,
1210    }
1211
1212    impl Message for EchoRequest {
1213        type Response = EchoResponse;
1214        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
1215    }
1216
1217    struct EchoActor;
1218
1219    impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
1220        fn on_message(
1221            &mut self,
1222            ctx: &mut ActorContext<R>,
1223            envelope: &Envelope,
1224            payload: MessagePayload,
1225        ) -> Result<(), ActorError> {
1226            let req = payload
1227                .extract::<EchoRequest>()
1228                .map_err(|_| ActorError::Handler)?;
1229            let resp = EchoResponse {
1230                doubled: req.value * 2,
1231            };
1232            let resp_env = envelope.response(0);
1233            ctx.send_raw(resp_env, MessagePayload::local(resp))
1234                .map_err(|_| ActorError::Handler)
1235        }
1236    }
1237
1238    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1239    struct CrashRequest;
1240
1241    impl Message for CrashRequest {
1242        type Response = ();
1243        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
1244    }
1245
1246    struct SpawnedEchoActor;
1247
1248    impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
1249        fn on_message(
1250            &mut self,
1251            ctx: &mut ActorContext<R>,
1252            envelope: &Envelope,
1253            payload: MessagePayload,
1254        ) -> Result<(), ActorError> {
1255            match envelope.type_tag {
1256                EchoRequest::TYPE_TAG => {
1257                    let req = payload
1258                        .extract::<EchoRequest>()
1259                        .map_err(|_| ActorError::Handler)?;
1260                    let resp = EchoResponse {
1261                        doubled: req.value * 2,
1262                    };
1263                    let resp_env = envelope.response(0);
1264                    ctx.send_raw(resp_env, MessagePayload::local(resp))
1265                        .map_err(|_| ActorError::Handler)
1266                }
1267                CrashRequest::TYPE_TAG => Err(ActorError::Handler),
1268                _ => Err(ActorError::Handler),
1269            }
1270        }
1271    }
1272
1273    fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
1274        Arc::new(|type_name: &str, _config: &[u8]| match type_name {
1275            "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
1276            _ => Err(format!("unknown remote spawn type: {type_name}")),
1277        })
1278    }
1279
1280    #[tokio::test]
1281    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
1282        if !tcp_tests_available() {
1283            eprintln!("skipping tcp integration tests: network bind not permitted");
1284            return;
1285        }
1286
1287        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
1288        let (ca, ca_key) = make_ca();
1289        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1290        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1291
1292        let mut engine_a = Engine::with_config(EngineConfig {
1293            engine_id: EngineId::new("engine-a.example"),
1294            ..Default::default()
1295        });
1296        let engine_b = Engine::with_config(EngineConfig {
1297            engine_id: EngineId::new("engine-b.example"),
1298            ..Default::default()
1299        });
1300
1301        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1302        engine_a.add_user_actor(ChildSpec::new(
1303            "cluster-echo",
1304            RestartPolicy::Permanent,
1305            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1306            ns,
1307            move || Box::new(EchoActor),
1308        ));
1309
1310        let handle_a = engine_a.handle();
1311        let handle_b = engine_b.handle();
1312        handle_a
1313            .type_registry()
1314            .register_remote_ask::<EchoRequest>();
1315        handle_b
1316            .type_registry()
1317            .register_remote_ask::<EchoRequest>();
1318
1319        let cluster_a = handle_a
1320            .attach_bounded_cluster(BoundedClusterConfig {
1321                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1322                peers: Vec::new(),
1323                declared_actors: Vec::new(),
1324                roles: Vec::new(),
1325            })
1326            .await
1327            .expect("attach bounded cluster a");
1328        let tcp_a = cluster_a
1329            .transport()
1330            .as_tcp()
1331            .cloned()
1332            .expect("tcp transport a");
1333
1334        let cluster_b = handle_b
1335            .attach_bounded_cluster(BoundedClusterConfig {
1336                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1337                peers: vec![PeerSpec {
1338                    engine_id: EngineId::new("engine-a.example"),
1339                    addr: tcp_a.local_addr(),
1340                    control_plane_addr: None,
1341                    server_name: None,
1342                }],
1343                declared_actors: Vec::new(),
1344                roles: Vec::new(),
1345            })
1346            .await
1347            .expect("attach bounded cluster b");
1348        let tcp_b = cluster_b
1349            .transport()
1350            .as_tcp()
1351            .cloned()
1352            .expect("tcp transport b");
1353
1354        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1355        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1356        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1357
1358        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1359        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1360
1361        let canonical = AddrHash::new(&actor_path, 0);
1362        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1363        while !cluster_b.can_route(&actor_path) {
1364            if std::time::Instant::now() > deadline {
1365                panic!("bounded cluster route did not propagate");
1366            }
1367            tokio::time::sleep(Duration::from_millis(20)).await;
1368        }
1369        assert!(tcp_b.can_route(canonical));
1370
1371        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1372        let response = remote
1373            .ask(EchoRequest { value: 12 })
1374            .await
1375            .expect("bounded cluster ask");
1376        assert_eq!(response, EchoResponse { doubled: 24 });
1377
1378        shutdown_a_tx.send(()).ok();
1379        shutdown_b_tx.send(()).ok();
1380    }
1381
1382    #[tokio::test]
1383    async fn bounded_cluster_add_peer_after_start_backfills_existing_tcp_routes() {
1384        if !tcp_tests_available() {
1385            eprintln!("skipping tcp integration tests: network bind not permitted");
1386            return;
1387        }
1388
1389        let actor_path = ActorPath::parse("/user/cluster-echo-add-peer").unwrap();
1390        let (ca, ca_key) = make_ca();
1391        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1392        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1393
1394        let mut engine_a = Engine::with_config(EngineConfig {
1395            engine_id: EngineId::new("engine-a.example"),
1396            ..Default::default()
1397        });
1398        let engine_b = Engine::with_config(EngineConfig {
1399            engine_id: EngineId::new("engine-b.example"),
1400            ..Default::default()
1401        });
1402
1403        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1404        engine_a.add_user_actor(ChildSpec::new(
1405            "cluster-echo-add-peer",
1406            RestartPolicy::Permanent,
1407            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1408            ns,
1409            move || Box::new(EchoActor),
1410        ));
1411
1412        let handle_a = engine_a.handle();
1413        let handle_b = engine_b.handle();
1414        handle_a
1415            .type_registry()
1416            .register_remote_ask::<EchoRequest>();
1417        handle_b
1418            .type_registry()
1419            .register_remote_ask::<EchoRequest>();
1420
1421        let mut cluster_a = handle_a
1422            .attach_bounded_cluster(BoundedClusterConfig {
1423                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1424                peers: Vec::new(),
1425                declared_actors: Vec::new(),
1426                roles: Vec::new(),
1427            })
1428            .await
1429            .expect("attach bounded cluster a");
1430        let tcp_a = cluster_a
1431            .transport()
1432            .as_tcp()
1433            .cloned()
1434            .expect("tcp transport a");
1435
1436        let cluster_b = handle_b
1437            .attach_bounded_cluster(BoundedClusterConfig {
1438                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1439                peers: vec![PeerSpec {
1440                    engine_id: EngineId::new("engine-a.example"),
1441                    addr: tcp_a.local_addr(),
1442                    control_plane_addr: None,
1443                    server_name: None,
1444                }],
1445                declared_actors: Vec::new(),
1446                roles: Vec::new(),
1447            })
1448            .await
1449            .expect("attach bounded cluster b");
1450        let tcp_b = cluster_b
1451            .transport()
1452            .as_tcp()
1453            .cloned()
1454            .expect("tcp transport b");
1455
1456        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1457        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1458
1459        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1460        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1461
1462        let local_ready_deadline = std::time::Instant::now() + Duration::from_secs(5);
1463        while handle_a.registry.get_by_path(&actor_path).is_none() {
1464            if std::time::Instant::now() > local_ready_deadline {
1465                panic!("local actor did not start before peer add");
1466            }
1467            tokio::time::sleep(Duration::from_millis(20)).await;
1468        }
1469
1470        cluster_a
1471            .add_peer(PeerSpec {
1472                engine_id: EngineId::new("engine-b.example"),
1473                addr: tcp_b.local_addr(),
1474                control_plane_addr: None,
1475                server_name: None,
1476            })
1477            .expect("add peer b to cluster a after start");
1478
1479        let canonical = AddrHash::new(&actor_path, 0);
1480        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1481        while !cluster_b.can_route(&actor_path) {
1482            if std::time::Instant::now() > deadline {
1483                panic!("bounded cluster route did not propagate after add_peer");
1484            }
1485            tokio::time::sleep(Duration::from_millis(20)).await;
1486        }
1487        assert!(tcp_b.can_route(canonical));
1488
1489        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1490        let response = remote
1491            .ask(EchoRequest { value: 15 })
1492            .await
1493            .expect("bounded cluster ask after add_peer");
1494        assert_eq!(response, EchoResponse { doubled: 30 });
1495
1496        shutdown_a_tx.send(()).ok();
1497        shutdown_b_tx.send(()).ok();
1498    }
1499
1500    #[tokio::test]
1501    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1502        if !tcp_tests_available() {
1503            eprintln!("skipping tcp integration tests: network bind not permitted");
1504            return;
1505        }
1506
1507        let (ca, ca_key) = make_ca();
1508        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1509        let engine = Engine::with_config(EngineConfig {
1510            engine_id: EngineId::new("engine-a.example"),
1511            ..Default::default()
1512        });
1513        let handle = engine.handle();
1514
1515        let mut cluster = handle
1516            .attach_bounded_cluster(BoundedClusterConfig {
1517                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1518                peers: vec![PeerSpec {
1519                    engine_id: EngineId::new("engine-b.example"),
1520                    addr: "127.0.0.1:4100".parse().unwrap(),
1521                    control_plane_addr: None,
1522                    server_name: None,
1523                }],
1524                declared_actors: Vec::new(),
1525                roles: Vec::new(),
1526            })
1527            .await
1528            .expect("attach bounded cluster");
1529
1530        let path = ActorPath::parse("/user/declared-remote").unwrap();
1531        cluster
1532            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1533            .expect("declare remote actor");
1534
1535        assert!(cluster.can_route(&path));
1536        assert!(cluster
1537            .transport()
1538            .as_tcp()
1539            .expect("tcp transport")
1540            .can_route(AddrHash::new(&path, 0)));
1541    }
1542
1543    #[tokio::test]
1544    async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
1545        if !tcp_tests_available() {
1546            eprintln!("skipping tcp integration tests: network bind not permitted");
1547            return;
1548        }
1549
1550        let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
1551        let (ca, ca_key) = make_ca();
1552        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1553        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1554        let control_plane_b = reserve_tcp_addr();
1555
1556        let mut engine_a = Engine::with_config(EngineConfig {
1557            engine_id: EngineId::new("engine-a.example"),
1558            ..Default::default()
1559        });
1560        let engine_b = Engine::with_config(EngineConfig {
1561            engine_id: EngineId::new("engine-b.example"),
1562            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1563            control_plane_tls: Some(tls_b.clone()),
1564            actor_spawn: Some(bounded_spawn_handler()),
1565            ..Default::default()
1566        });
1567
1568        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1569        engine_a.add_user_actor(ChildSpec::new(
1570            "local-remote-collision",
1571            RestartPolicy::Permanent,
1572            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1573            ns,
1574            move || Box::new(EchoActor),
1575        ));
1576
1577        let handle_a = engine_a.handle();
1578        let handle_b = engine_b.handle();
1579
1580        let cluster_b = handle_b
1581            .attach_bounded_cluster(BoundedClusterConfig {
1582                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1583                peers: Vec::new(),
1584                declared_actors: Vec::new(),
1585                roles: Vec::new(),
1586            })
1587            .await
1588            .expect("attach bounded cluster b");
1589        let tcp_b = cluster_b
1590            .transport()
1591            .as_tcp()
1592            .cloned()
1593            .expect("tcp transport b");
1594
1595        let cluster_a = handle_a
1596            .attach_bounded_cluster(BoundedClusterConfig {
1597                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1598                peers: vec![PeerSpec {
1599                    engine_id: EngineId::new("engine-b.example"),
1600                    addr: tcp_b.local_addr(),
1601                    control_plane_addr: Some(control_plane_b),
1602                    server_name: None,
1603                }],
1604                declared_actors: Vec::new(),
1605                roles: vec![BoundedClusterRole {
1606                    name: "context-service".to_string(),
1607                    owner: EngineId::new("engine-b.example"),
1608                }],
1609            })
1610            .await
1611            .expect("attach bounded cluster a");
1612
1613        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1614        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1615
1616        let deadline = Instant::now() + Duration::from_secs(5);
1617        while handle_a.registry.get_by_path(&actor_path).is_none() {
1618            if Instant::now() > deadline {
1619                panic!("local collision actor did not start");
1620            }
1621            tokio::time::sleep(Duration::from_millis(20)).await;
1622        }
1623
1624        let err = cluster_a
1625            .spawn_remote_on_role::<EchoRequest>(
1626                "context-service",
1627                RemoteSpawnSpec {
1628                    path: actor_path.clone(),
1629                    type_name: "bounded-echo".to_string(),
1630                    config: None,
1631                },
1632            )
1633            .await
1634            .expect_err("local path collision should fail");
1635        assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));
1636
1637        shutdown_a_tx.send(()).ok();
1638    }
1639
1640    #[tokio::test]
1641    async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1642        if !tcp_tests_available() {
1643            eprintln!("skipping tcp integration tests: network bind not permitted");
1644            return;
1645        }
1646
1647        let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1648        let (ca, ca_key) = make_ca();
1649        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1650        let engine = Engine::with_config(EngineConfig {
1651            engine_id: EngineId::new("engine-a.example"),
1652            ..Default::default()
1653        });
1654        let handle = engine.handle();
1655
1656        let cluster = handle
1657            .attach_bounded_cluster(BoundedClusterConfig {
1658                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1659                peers: vec![
1660                    PeerSpec {
1661                        engine_id: EngineId::new("engine-b.example"),
1662                        addr: "127.0.0.1:4100".parse().unwrap(),
1663                        control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1664                        server_name: None,
1665                    },
1666                    PeerSpec {
1667                        engine_id: EngineId::new("engine-c.example"),
1668                        addr: "127.0.0.1:4200".parse().unwrap(),
1669                        control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1670                        server_name: None,
1671                    },
1672                ],
1673                declared_actors: vec![DeclaredRemoteActor {
1674                    path: actor_path.clone(),
1675                    owner: EngineId::new("engine-b.example"),
1676                }],
1677                roles: Vec::new(),
1678            })
1679            .await
1680            .expect("attach bounded cluster");
1681
1682        let err = cluster
1683            .spawn_remote::<EchoRequest>(
1684                EngineId::new("engine-c.example"),
1685                RemoteSpawnSpec {
1686                    path: actor_path.clone(),
1687                    type_name: "bounded-echo".to_string(),
1688                    config: None,
1689                },
1690            )
1691            .await
1692            .expect_err("conflicting declared owner should fail");
1693        assert_eq!(
1694            err,
1695            RemoteSpawnError::RemoteOwnershipConflict {
1696                path: actor_path,
1697                owner: EngineId::new("engine-b.example"),
1698            }
1699        );
1700    }
1701
1702    #[tokio::test]
1703    async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1704        if !tcp_tests_available() {
1705            eprintln!("skipping tcp integration tests: network bind not permitted");
1706            return;
1707        }
1708
1709        let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1710        let (ca, ca_key) = make_ca();
1711        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1712        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1713        let control_plane_b = reserve_tcp_addr();
1714
1715        let engine_a = Engine::with_config(EngineConfig {
1716            engine_id: EngineId::new("engine-a.example"),
1717            ..Default::default()
1718        });
1719        let engine_b = Engine::with_config(EngineConfig {
1720            engine_id: EngineId::new("engine-b.example"),
1721            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1722            control_plane_tls: Some(tls_b.clone()),
1723            actor_spawn: Some(bounded_spawn_handler()),
1724            ..Default::default()
1725        });
1726
1727        let handle_a = engine_a.handle();
1728        let handle_b = engine_b.handle();
1729        handle_a
1730            .type_registry()
1731            .register_remote_ask::<EchoRequest>();
1732        handle_b
1733            .type_registry()
1734            .register_remote_ask::<EchoRequest>();
1735
1736        let cluster_a = handle_a
1737            .attach_bounded_cluster(BoundedClusterConfig {
1738                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1739                peers: Vec::new(),
1740                declared_actors: Vec::new(),
1741                roles: Vec::new(),
1742            })
1743            .await
1744            .expect("attach bounded cluster a");
1745        let tcp_a = cluster_a
1746            .transport()
1747            .as_tcp()
1748            .cloned()
1749            .expect("tcp transport a");
1750
1751        let cluster_b = handle_b
1752            .attach_bounded_cluster(BoundedClusterConfig {
1753                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1754                peers: vec![PeerSpec {
1755                    engine_id: EngineId::new("engine-a.example"),
1756                    addr: tcp_a.local_addr(),
1757                    control_plane_addr: None,
1758                    server_name: None,
1759                }],
1760                declared_actors: Vec::new(),
1761                roles: Vec::new(),
1762            })
1763            .await
1764            .expect("attach bounded cluster b");
1765        let tcp_b = cluster_b
1766            .transport()
1767            .as_tcp()
1768            .cloned()
1769            .expect("tcp transport b");
1770
1771        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1772        let mut cluster_a = cluster_a;
1773        cluster_a
1774            .add_peer(PeerSpec {
1775                engine_id: EngineId::new("engine-b.example"),
1776                addr: tcp_b.local_addr(),
1777                control_plane_addr: Some(control_plane_b),
1778                server_name: None,
1779            })
1780            .expect("add peer with control plane");
1781
1782        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1783        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1784
1785        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1786        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1787        tokio::time::sleep(Duration::from_millis(150)).await;
1788
1789        let remote = cluster_a
1790            .spawn_remote::<EchoRequest>(
1791                EngineId::new("engine-b.example"),
1792                RemoteSpawnSpec {
1793                    path: actor_path.clone(),
1794                    type_name: "bounded-echo".to_string(),
1795                    config: None,
1796                },
1797            )
1798            .await
1799            .expect("spawn remote actor");
1800
1801        let response = remote
1802            .ask(EchoRequest { value: 21 })
1803            .await
1804            .expect("bounded remote spawn ask");
1805        assert_eq!(response, EchoResponse { doubled: 42 });
1806
1807        let duplicate = cluster_a
1808            .spawn_remote::<EchoRequest>(
1809                EngineId::new("engine-b.example"),
1810                RemoteSpawnSpec {
1811                    path: actor_path.clone(),
1812                    type_name: "bounded-echo".to_string(),
1813                    config: None,
1814                },
1815            )
1816            .await
1817            .expect_err("duplicate remote spawn should fail");
1818        assert_eq!(
1819            duplicate,
1820            RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
1821        );
1822
1823        let old_host_addr = handle_b
1824            .registry
1825            .get_by_path(&actor_path)
1826            .expect("spawned actor registered on host")
1827            .addr;
1828        call_control_plane(
1829            &cluster_a.control_plane,
1830            control_plane_b,
1831            "engine-b.example",
1832            "actor.restart",
1833            json!({ "path": actor_path.as_str() }),
1834        )
1835        .await
1836        .expect("restart remote actor on host");
1837
1838        let restart_deadline = Instant::now() + Duration::from_secs(5);
1839        loop {
1840            if handle_b
1841                .registry
1842                .get_by_path(&actor_path)
1843                .map(|slot| {
1844                    slot.running.load(std::sync::atomic::Ordering::Relaxed)
1845                        && slot.addr != old_host_addr
1846                        && slot
1847                            .restart_count
1848                            .load(std::sync::atomic::Ordering::Relaxed)
1849                            > 0
1850                })
1851                .unwrap_or(false)
1852            {
1853                break;
1854            }
1855            if Instant::now() > restart_deadline {
1856                panic!("remote actor did not restart on host");
1857            }
1858            tokio::time::sleep(Duration::from_millis(25)).await;
1859        }
1860
1861        let restart_deadline = Instant::now() + Duration::from_secs(5);
1862        loop {
1863            if cluster_a.can_route(&actor_path) {
1864                break;
1865            }
1866            if Instant::now() > restart_deadline {
1867                panic!("remote actor route did not return after restart");
1868            }
1869            tokio::time::sleep(Duration::from_millis(25)).await;
1870        }
1871
1872        let restart_deadline = Instant::now() + Duration::from_secs(5);
1873        loop {
1874            match remote.ask(EchoRequest { value: 7 }).await {
1875                Ok(response) => {
1876                    assert_eq!(response, EchoResponse { doubled: 14 });
1877                    break;
1878                }
1879                Err(_) if Instant::now() <= restart_deadline => {
1880                    tokio::time::sleep(Duration::from_millis(25)).await;
1881                }
1882                Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1883            }
1884        }
1885
1886        let unknown = cluster_a
1887            .spawn_remote::<EchoRequest>(
1888                EngineId::new("engine-c.example"),
1889                RemoteSpawnSpec {
1890                    path: ActorPath::parse("/user/unknown-target").unwrap(),
1891                    type_name: "bounded-echo".to_string(),
1892                    config: None,
1893                },
1894            )
1895            .await
1896            .expect_err("unknown target should fail");
1897        assert_eq!(
1898            unknown,
1899            RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1900        );
1901
1902        shutdown_a_tx.send(()).ok();
1903        shutdown_b_tx.send(()).ok();
1904    }
1905
1906    #[tokio::test]
1907    async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1908        if !tcp_tests_available() {
1909            eprintln!("skipping tcp integration tests: network bind not permitted");
1910            return;
1911        }
1912
1913        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1914        let (ca, ca_key) = make_ca();
1915        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1916        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1917        let control_plane_b = reserve_tcp_addr();
1918
1919        let engine_a = Engine::with_config(EngineConfig {
1920            engine_id: EngineId::new("engine-a.example"),
1921            ..Default::default()
1922        });
1923        let engine_b = Engine::with_config(EngineConfig {
1924            engine_id: EngineId::new("engine-b.example"),
1925            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1926            control_plane_tls: Some(tls_b.clone()),
1927            actor_spawn: Some(bounded_spawn_handler()),
1928            ..Default::default()
1929        });
1930
1931        let handle_a = engine_a.handle();
1932        let handle_b = engine_b.handle();
1933        handle_a
1934            .type_registry()
1935            .register_remote_ask::<EchoRequest>();
1936        handle_b
1937            .type_registry()
1938            .register_remote_ask::<EchoRequest>();
1939
1940        let cluster_b = handle_b
1941            .attach_bounded_cluster(BoundedClusterConfig {
1942                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1943                peers: Vec::new(),
1944                declared_actors: Vec::new(),
1945                roles: Vec::new(),
1946            })
1947            .await
1948            .expect("attach bounded cluster b");
1949        let tcp_b = cluster_b
1950            .transport()
1951            .as_tcp()
1952            .cloned()
1953            .expect("tcp transport b");
1954
1955        let cluster_a = handle_a
1956            .attach_bounded_cluster(BoundedClusterConfig {
1957                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1958                peers: vec![PeerSpec {
1959                    engine_id: EngineId::new("engine-b.example"),
1960                    addr: tcp_b.local_addr(),
1961                    control_plane_addr: Some(control_plane_b),
1962                    server_name: None,
1963                }],
1964                declared_actors: Vec::new(),
1965                roles: vec![BoundedClusterRole {
1966                    name: "context-service".to_string(),
1967                    owner: EngineId::new("engine-b.example"),
1968                }],
1969            })
1970            .await
1971            .expect("attach bounded cluster a");
1972        let tcp_a = cluster_a
1973            .transport()
1974            .as_tcp()
1975            .cloned()
1976            .expect("tcp transport a");
1977
1978        let mut cluster_b = cluster_b;
1979        cluster_b
1980            .add_peer(PeerSpec {
1981                engine_id: EngineId::new("engine-a.example"),
1982                addr: tcp_a.local_addr(),
1983                control_plane_addr: None,
1984                server_name: None,
1985            })
1986            .expect("add peer a to cluster b");
1987
1988        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1989        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1990
1991        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1992        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1993        tokio::time::sleep(Duration::from_millis(150)).await;
1994
1995        let remote = cluster_a
1996            .spawn_remote_on_role::<EchoRequest>(
1997                "context-service",
1998                RemoteSpawnSpec {
1999                    path: actor_path.clone(),
2000                    type_name: "bounded-echo".to_string(),
2001                    config: None,
2002                },
2003            )
2004            .await
2005            .expect("spawn remote actor by role over tcp");
2006
2007        let response = remote
2008            .ask(EchoRequest { value: 11 })
2009            .await
2010            .expect("bounded remote spawn ask over tcp by role");
2011        assert_eq!(response, EchoResponse { doubled: 22 });
2012
2013        let unknown_role = cluster_a
2014            .spawn_remote_on_role::<EchoRequest>(
2015                "missing-role",
2016                RemoteSpawnSpec {
2017                    path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
2018                    type_name: "bounded-echo".to_string(),
2019                    config: None,
2020                },
2021            )
2022            .await
2023            .expect_err("unknown role should fail");
2024        assert_eq!(
2025            unknown_role,
2026            RemoteSpawnError::UnknownRole("missing-role".to_string())
2027        );
2028
2029        shutdown_a_tx.send(()).ok();
2030        shutdown_b_tx.send(()).ok();
2031    }
2032
2033    #[tokio::test]
2034    async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
2035        if !tcp_tests_available() {
2036            eprintln!("skipping tcp integration tests: network bind not permitted");
2037            return;
2038        }
2039
2040        let engine_a_id = EngineId::new("engine-a-norev.example");
2041        let engine_b_id = EngineId::new("engine-b-norev.example");
2042        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
2043        let (ca, ca_key) = make_ca();
2044        let tls_a = make_tls_config(engine_a_id.as_str(), &ca, &ca_key);
2045        let tls_b = make_tls_config(engine_b_id.as_str(), &ca, &ca_key);
2046        let control_plane_b = reserve_tcp_addr();
2047
2048        let engine_a = Engine::with_config(EngineConfig {
2049            engine_id: engine_a_id.clone(),
2050            ..Default::default()
2051        });
2052        let engine_b = Engine::with_config(EngineConfig {
2053            engine_id: engine_b_id.clone(),
2054            control_plane_tcp_addr: Some(control_plane_b.to_string()),
2055            control_plane_tls: Some(tls_b.clone()),
2056            actor_spawn: Some(bounded_spawn_handler()),
2057            ..Default::default()
2058        });
2059
2060        let handle_a = engine_a.handle();
2061        let handle_b = engine_b.handle();
2062        handle_a
2063            .type_registry()
2064            .register_remote_ask::<EchoRequest>();
2065        handle_b
2066            .type_registry()
2067            .register_remote_ask::<EchoRequest>();
2068
2069        let cluster_b = handle_b
2070            .attach_bounded_cluster(BoundedClusterConfig {
2071                transport: BoundedTransportConfig::Tcp(tcp_config(engine_b_id.as_str(), tls_b)),
2072                peers: Vec::new(),
2073                declared_actors: Vec::new(),
2074                roles: Vec::new(),
2075            })
2076            .await
2077            .expect("attach bounded cluster b");
2078        let tcp_b = cluster_b
2079            .transport()
2080            .as_tcp()
2081            .cloned()
2082            .expect("tcp transport b");
2083
2084        let cluster_a = handle_a
2085            .attach_bounded_cluster(BoundedClusterConfig {
2086                transport: BoundedTransportConfig::Tcp(tcp_config(engine_a_id.as_str(), tls_a)),
2087                peers: vec![PeerSpec {
2088                    engine_id: engine_b_id.clone(),
2089                    addr: tcp_b.local_addr(),
2090                    control_plane_addr: Some(control_plane_b),
2091                    server_name: None,
2092                }],
2093                declared_actors: Vec::new(),
2094                roles: vec![BoundedClusterRole {
2095                    name: "context-service".to_string(),
2096                    owner: engine_b_id.clone(),
2097                }],
2098            })
2099            .await
2100            .expect("attach bounded cluster a");
2101        let tcp_a = cluster_a
2102            .transport()
2103            .as_tcp()
2104            .cloned()
2105            .expect("tcp transport a");
2106
2107        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2108        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2109
2110        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2111        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2112
2113        // Caller knows the static owner already. Remote spawn should not require
2114        // the host engine to predeclare a reverse peer just to install the
2115        // caller's canonical path route.
2116        let _ = tcp_a;
2117
2118        let remote = cluster_a
2119            .spawn_remote_on_role::<EchoRequest>(
2120                "context-service",
2121                RemoteSpawnSpec {
2122                    path: actor_path.clone(),
2123                    type_name: "bounded-echo".to_string(),
2124                    config: None,
2125                },
2126            )
2127            .await
2128            .expect("spawn remote actor by role over tcp without reverse declaration");
2129
2130        let response = remote
2131            .ask(EchoRequest { value: 13 })
2132            .await
2133            .expect("bounded remote spawn ask over tcp without reverse declaration");
2134        assert_eq!(response, EchoResponse { doubled: 26 });
2135
2136        shutdown_a_tx.send(()).ok();
2137        shutdown_b_tx.send(()).ok();
2138    }
2139
2140    fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
2141        QuicTransportConfig {
2142            engine_id: EngineId::new(engine_id),
2143            listen_addr: "127.0.0.1:0".parse().unwrap(),
2144            send_buffer_size: 1024,
2145            idle_timeout: Duration::from_secs(2),
2146            tls,
2147        }
2148    }
2149
2150    #[tokio::test]
2151    async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
2152        if !quic_tests_available() {
2153            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2154            return;
2155        }
2156
2157        let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
2158        let (ca, ca_key) = make_ca();
2159        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2160        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2161
2162        let mut engine_a = Engine::with_config(EngineConfig {
2163            engine_id: EngineId::new("engine-a.example"),
2164            ..Default::default()
2165        });
2166        let engine_b = Engine::with_config(EngineConfig {
2167            engine_id: EngineId::new("engine-b.example"),
2168            ..Default::default()
2169        });
2170
2171        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
2172        engine_a.add_user_actor(ChildSpec::new(
2173            "cluster-echo-quic",
2174            RestartPolicy::Permanent,
2175            ShutdownPolicy::Timeout(Duration::from_secs(1)),
2176            ns,
2177            move || Box::new(EchoActor),
2178        ));
2179
2180        let handle_a = engine_a.handle();
2181        let handle_b = engine_b.handle();
2182        handle_a
2183            .type_registry()
2184            .register_remote_ask::<EchoRequest>();
2185        handle_b
2186            .type_registry()
2187            .register_remote_ask::<EchoRequest>();
2188
2189        let cluster_a = match handle_a
2190            .attach_bounded_cluster(BoundedClusterConfig {
2191                transport: BoundedTransportConfig::Quic(quic_config(
2192                    "engine-a.example",
2193                    tls_a.clone(),
2194                )),
2195                peers: Vec::new(),
2196                declared_actors: Vec::new(),
2197                roles: Vec::new(),
2198            })
2199            .await
2200        {
2201            Ok(cluster) => cluster,
2202            Err(ClusterError::Transport(err)) => {
2203                eprintln!("skipping quic bounded-cluster test: {err:?}");
2204                return;
2205            }
2206            Err(err) => panic!("attach bounded cluster a: {err:?}"),
2207        };
2208        let quic_a = cluster_a
2209            .transport()
2210            .as_quic()
2211            .cloned()
2212            .expect("quic transport a");
2213
2214        let cluster_b = match handle_b
2215            .attach_bounded_cluster(BoundedClusterConfig {
2216                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2217                peers: vec![PeerSpec {
2218                    engine_id: EngineId::new("engine-a.example"),
2219                    addr: quic_a.local_addr(),
2220                    control_plane_addr: None,
2221                    server_name: None,
2222                }],
2223                declared_actors: Vec::new(),
2224                roles: Vec::new(),
2225            })
2226            .await
2227        {
2228            Ok(cluster) => cluster,
2229            Err(ClusterError::Transport(err)) => {
2230                eprintln!("skipping quic bounded-cluster test: {err:?}");
2231                return;
2232            }
2233            Err(err) => panic!("attach bounded cluster b: {err:?}"),
2234        };
2235        let quic_b = cluster_b
2236            .transport()
2237            .as_quic()
2238            .cloned()
2239            .expect("quic transport b");
2240
2241        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
2242        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2243        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2244
2245        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2246        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2247
2248        let canonical = AddrHash::new(&actor_path, 0);
2249        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2250        while !cluster_b.can_route(&actor_path) {
2251            if std::time::Instant::now() > deadline {
2252                panic!("bounded quic cluster route did not propagate");
2253            }
2254            tokio::time::sleep(Duration::from_millis(20)).await;
2255        }
2256        assert!(quic_b.can_route(canonical));
2257
2258        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
2259        let ask_deadline = Instant::now() + Duration::from_secs(2);
2260        loop {
2261            match remote.ask(EchoRequest { value: 15 }).await {
2262                Ok(response) => {
2263                    assert_eq!(response, EchoResponse { doubled: 30 });
2264                    break;
2265                }
2266                Err(palladium_actor::AskError::Send(
2267                    palladium_actor::SendError::ConnectionFailed,
2268                )) if Instant::now() <= ask_deadline => {
2269                    tokio::time::sleep(Duration::from_millis(20)).await;
2270                }
2271                Err(err) => panic!("bounded cluster ask over quic: {err:?}"),
2272            }
2273        }
2274
2275        shutdown_a_tx.send(()).ok();
2276        shutdown_b_tx.send(()).ok();
2277    }
2278
2279    #[tokio::test]
2280    async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
2281        if !quic_tests_available() {
2282            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2283            return;
2284        }
2285
2286        let (ca, ca_key) = make_ca();
2287        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
2288        let engine = Engine::with_config(EngineConfig {
2289            engine_id: EngineId::new("engine-a.example"),
2290            ..Default::default()
2291        });
2292        let handle = engine.handle();
2293
2294        let mut cluster = match handle
2295            .attach_bounded_cluster(BoundedClusterConfig {
2296                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
2297                peers: vec![PeerSpec {
2298                    engine_id: EngineId::new("engine-b.example"),
2299                    addr: "127.0.0.1:4100".parse().unwrap(),
2300                    control_plane_addr: None,
2301                    server_name: None,
2302                }],
2303                declared_actors: Vec::new(),
2304                roles: Vec::new(),
2305            })
2306            .await
2307        {
2308            Ok(cluster) => cluster,
2309            Err(ClusterError::Transport(err)) => {
2310                eprintln!("skipping quic bounded-cluster test: {err:?}");
2311                return;
2312            }
2313            Err(err) => panic!("attach bounded cluster: {err:?}"),
2314        };
2315
2316        let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
2317        cluster
2318            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
2319            .expect("declare remote actor");
2320
2321        assert!(cluster.can_route(&path));
2322        assert!(cluster
2323            .transport()
2324            .as_quic()
2325            .expect("quic transport")
2326            .can_route(AddrHash::new(&path, 0)));
2327    }
2328
2329    #[tokio::test]
2330    async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
2331        if !quic_tests_available() {
2332            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2333            return;
2334        }
2335
2336        let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
2337        let (ca, ca_key) = make_ca();
2338        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2339        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2340        let control_plane_b = reserve_udp_addr();
2341
2342        let engine_a = Engine::with_config(EngineConfig {
2343            engine_id: EngineId::new("engine-a.example"),
2344            ..Default::default()
2345        });
2346        let engine_b = Engine::with_config(EngineConfig {
2347            engine_id: EngineId::new("engine-b.example"),
2348            control_plane_quic_addr: Some(control_plane_b.to_string()),
2349            control_plane_tls: Some(tls_b.clone()),
2350            actor_spawn: Some(bounded_spawn_handler()),
2351            ..Default::default()
2352        });
2353
2354        let handle_a = engine_a.handle();
2355        let handle_b = engine_b.handle();
2356        handle_a
2357            .type_registry()
2358            .register_remote_ask::<EchoRequest>();
2359        handle_b
2360            .type_registry()
2361            .register_remote_ask::<EchoRequest>();
2362
2363        let cluster_b = match handle_b
2364            .attach_bounded_cluster(BoundedClusterConfig {
2365                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2366                peers: Vec::new(),
2367                declared_actors: Vec::new(),
2368                roles: Vec::new(),
2369            })
2370            .await
2371        {
2372            Ok(cluster) => cluster,
2373            Err(ClusterError::Transport(err)) => {
2374                eprintln!("skipping quic bounded-cluster test: {err:?}");
2375                return;
2376            }
2377            Err(err) => panic!("attach bounded cluster b: {err:?}"),
2378        };
2379        let quic_b = cluster_b
2380            .transport()
2381            .as_quic()
2382            .cloned()
2383            .expect("quic transport b");
2384
2385        let cluster_a = match handle_a
2386            .attach_bounded_cluster(BoundedClusterConfig {
2387                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
2388                peers: vec![PeerSpec {
2389                    engine_id: EngineId::new("engine-b.example"),
2390                    addr: quic_b.local_addr(),
2391                    control_plane_addr: Some(control_plane_b),
2392                    server_name: None,
2393                }],
2394                declared_actors: Vec::new(),
2395                roles: vec![BoundedClusterRole {
2396                    name: "context-service".to_string(),
2397                    owner: EngineId::new("engine-b.example"),
2398                }],
2399            })
2400            .await
2401        {
2402            Ok(cluster) => cluster,
2403            Err(ClusterError::Transport(err)) => {
2404                eprintln!("skipping quic bounded-cluster test: {err:?}");
2405                return;
2406            }
2407            Err(err) => panic!("attach bounded cluster a: {err:?}"),
2408        };
2409        let quic_a = cluster_a
2410            .transport()
2411            .as_quic()
2412            .cloned()
2413            .expect("quic transport a");
2414
2415        let mut cluster_b = cluster_b;
2416        cluster_b
2417            .add_peer(PeerSpec {
2418                engine_id: EngineId::new("engine-a.example"),
2419                addr: quic_a.local_addr(),
2420                control_plane_addr: None,
2421                server_name: None,
2422            })
2423            .expect("add quic peer a to cluster b");
2424
2425        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2426        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2427
2428        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2429        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2430        tokio::time::sleep(Duration::from_millis(150)).await;
2431
2432        let remote = cluster_a
2433            .spawn_remote_on_role::<EchoRequest>(
2434                "context-service",
2435                RemoteSpawnSpec {
2436                    path: actor_path.clone(),
2437                    type_name: "bounded-echo".to_string(),
2438                    config: None,
2439                },
2440            )
2441            .await
2442            .expect("spawn remote actor over quic by role");
2443
2444        let response = remote
2445            .ask(EchoRequest { value: 9 })
2446            .await
2447            .expect("bounded remote spawn ask over quic by role");
2448        assert_eq!(response, EchoResponse { doubled: 18 });
2449
2450        let unknown_role = cluster_a
2451            .spawn_remote_on_role::<EchoRequest>(
2452                "missing-role",
2453                RemoteSpawnSpec {
2454                    path: ActorPath::parse("/user/missing-role").unwrap(),
2455                    type_name: "bounded-echo".to_string(),
2456                    config: None,
2457                },
2458            )
2459            .await
2460            .expect_err("unknown role should fail");
2461        assert_eq!(
2462            unknown_role,
2463            RemoteSpawnError::UnknownRole("missing-role".to_string())
2464        );
2465
2466        shutdown_a_tx.send(()).ok();
2467        shutdown_b_tx.send(()).ok();
2468    }
2469}