Skip to main content

palladium_runtime/
bounded_cluster.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use dashmap::DashMap;
10use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
11use palladium_transport::network::{Network, TokioNetwork};
12use palladium_transport::{
13    mailbox, QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport,
14    TransportError,
15};
16use rustls::pki_types::ServerName;
17use serde_json::{json, Value};
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio_rustls::TlsConnector;
20
/// Object-safe hook for installing a peer on a transport.
///
/// In this file it is implemented for `BoundedTransportHandle` and stored as
/// `Arc<dyn SpawnPeerInstaller>` in the map returned by `spawn_peer_installers`.
21trait SpawnPeerInstaller: Send + Sync {
    /// Makes sure `engine_id` is known at `addr`, with an optional explicit server name.
22    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>);
23}
24
25impl<R: Reactor + Clone, N: Network + Clone> SpawnPeerInstaller for BoundedTransportHandle<R, N> {
26    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>) {
27        let source_addr = AddrHash::synthetic(engine_id.as_str().as_bytes());
28        match server_name {
29            Some(server_name) => self.add_peer(&PeerSpec {
30                engine_id: engine_id.clone(),
31                addr,
32                control_plane_addr: None,
33                server_name: Some(server_name),
34            }),
35            None => self.add_peer(&PeerSpec {
36                engine_id: engine_id.clone(),
37                addr,
38                control_plane_addr: None,
39                server_name: None,
40            }),
41        }
42        match self {
43            Self::Tcp(transport) => {
44                let _ = transport.register_remote_source_addr(engine_id, source_addr);
45            }
46            Self::Quic(transport) => {
47                let _ = transport.register_remote_source_addr(engine_id, source_addr);
48            }
49        }
50    }
51}
52
53fn spawn_peer_installers() -> &'static DashMap<String, Arc<dyn SpawnPeerInstaller>> {
54    static INSTALLERS: std::sync::OnceLock<DashMap<String, Arc<dyn SpawnPeerInstaller>>> =
55        std::sync::OnceLock::new();
56    INSTALLERS.get_or_init(DashMap::new)
57}
58
59pub(crate) fn register_spawn_peer_installer<R: Reactor + Clone, N: Network + Clone>(
60    engine_id: &EngineId,
61    transport: &BoundedTransportHandle<R, N>,
62) {
63    spawn_peer_installers().insert(
64        engine_id.as_str().to_string(),
65        Arc::new(transport.clone()) as Arc<dyn SpawnPeerInstaller>,
66    );
67}
68
69pub(crate) fn maybe_register_spawn_caller_peer(
70    host_engine_id: &EngineId,
71    caller_engine_id: &EngineId,
72    caller_transport_addr: SocketAddr,
73    caller_server_name: Option<String>,
74) {
75    if let Some(installer) = spawn_peer_installers().get(host_engine_id.as_str()) {
76        installer.ensure_peer(
77            caller_engine_id.clone(),
78            caller_transport_addr,
79            caller_server_name,
80        );
81    }
82}
83
/// Up-front description of a bounded cluster, checked by `validate_config`.
84#[derive(Clone)]
85pub struct BoundedClusterConfig {
    /// Transport settings; also the source of the local engine id.
86    pub transport: BoundedTransportConfig,
    /// Remote engines. Each id must be unique and differ from the local engine id.
87    pub peers: Vec<PeerSpec>,
    /// Actor paths owned by peers. Each path must be unique and its owner a listed peer.
88    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles. Each name must be unique and its owner a listed peer.
89    pub roles: Vec<BoundedClusterRole>,
90}
91
/// Transport selection for a bounded cluster, carrying the matching transport configuration.
92#[derive(Clone)]
93pub enum BoundedTransportConfig {
    /// Use the TCP transport with this configuration.
94    Tcp(TcpTransportConfig),
    /// Use the QUIC transport with this configuration.
95    Quic(QuicTransportConfig),
96}
97
98impl BoundedTransportConfig {
99    pub fn engine_id(&self) -> &EngineId {
100        match self {
101            Self::Tcp(config) => &config.engine_id,
102            Self::Quic(config) => &config.engine_id,
103        }
104    }
105}
106
/// Describes one remote engine of the cluster.
107#[derive(Clone, Debug, PartialEq, Eq)]
108pub struct PeerSpec {
    /// Identity of the remote engine.
109    pub engine_id: EngineId,
    /// Address handed to the transport when the peer is added.
110    pub addr: SocketAddr,
    /// Control-plane endpoint; `spawn_remote` fails with
    /// `MissingControlPlaneEndpoint` when this is `None`.
111    pub control_plane_addr: Option<SocketAddr>,
    /// Optional server name; `spawn_remote` falls back to the engine id string when absent.
112    pub server_name: Option<String>,
113}
114
/// Records that the actor at `path` is owned by the engine `owner`.
115#[derive(Clone, Debug, PartialEq, Eq)]
116pub struct DeclaredRemoteActor {
    /// Path of the remotely owned actor.
117    pub path: ActorPath,
    /// Engine that owns the actor.
118    pub owner: EngineId,
119}
120
/// Maps a role name to the engine that serves it; looked up by `spawn_remote_on_role`.
121#[derive(Clone, Debug, PartialEq, Eq)]
122pub struct BoundedClusterRole {
    /// Role name used for lookup.
123    pub name: String,
    /// Engine that spawn requests for this role are sent to.
124    pub owner: EngineId,
125}
126
/// Errors reported while validating or mutating the cluster description.
127#[derive(Clone, Debug, PartialEq, Eq)]
128pub enum ClusterError {
    /// A peer with this engine id is already present.
129    DuplicatePeer(EngineId),
    /// The peer's engine id equals the local engine id.
130    LocalPeerDeclared(EngineId),
    /// The path has already been declared as a remote actor.
131    DuplicateDeclaredActor(ActorPath),
    /// The declared owner is the local engine, or the path is already in the local registry.
132    LocalActorDeclaredRemote(ActorPath),
    /// The declared owner is not one of the known peers.
133    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// A role with this name is already present.
134    DuplicateRole(String),
    /// The role's owner is not one of the known peers.
135    UnknownRoleOwner { role: String, owner: EngineId },
    /// Wraps an error returned by the transport.
136    Transport(TransportError),
137}
138
/// Parameters of a remote spawn request, sent as the `actor.spawn` call parameters.
139#[derive(Clone, Debug, PartialEq)]
140pub struct RemoteSpawnSpec {
    /// Path the new actor should be spawned at.
141    pub path: ActorPath,
    /// Sent as the `type_name` parameter of the request.
142    pub type_name: String,
    /// Optional JSON sent as the `config` parameter; the key is omitted when `None`.
143    pub config: Option<Value>,
144}
145
/// Errors returned by `spawn_remote` and `spawn_remote_on_role`.
146#[derive(Clone, Debug, PartialEq, Eq)]
147pub enum RemoteSpawnError {
    /// The target engine is not in the peer list.
148    UnknownPeer(EngineId),
    /// No role with this name is configured.
149    UnknownRole(String),
    /// The requested path already exists in the local registry.
150    LocalPathCollision(ActorPath),
    /// The path is already declared for a different engine; `owner` is that engine.
151    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
    /// The control plane answered that an actor is already running at the path.
152    RemotePathAlreadyRunning(ActorPath),
    /// The target peer has no control-plane address configured.
153    MissingControlPlaneEndpoint(EngineId),
    /// The server name could not be converted into a `ServerName`.
154    InvalidServerName(String),
    /// Any other control-plane or transport failure, carried as a message string.
    /// Callers in this file classify these by substring match on the message.
155    ControlPlane(String),
    /// The actor was not reported as spawned before the wait deadline passed.
156    SpawnTimedOut(ActorPath),
157}
158
/// Selects which client `call_control_plane` uses to reach a peer's control plane.
159#[derive(Clone)]
160pub(crate) enum ControlPlaneProtocol {
    /// Dispatch to `call_control_plane_tcp`.
161    Tcp,
    /// Dispatch to `call_control_plane_quic`.
162    Quic,
163}
164
/// Client-side settings for control-plane calls made during remote spawns.
165#[derive(Clone)]
166pub(crate) struct ControlPlaneClientConfig {
    /// Which protocol the control-plane request is sent over.
167    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material from which the per-call client configuration is built.
168    pub(crate) tls: palladium_transport::TlsConfig,
    /// Budget used by `spawn_remote` for its retry/poll deadline and for the
    /// peer-connection wait that follows.
169    pub(crate) wait_timeout: Duration,
170}
171
/// Lower bound that `remote_spawn_wait_timeout` applies to the remote-spawn wait budget.
172const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
173
174impl From<TransportError> for ClusterError {
175    fn from(value: TransportError) -> Self {
176        Self::Transport(value)
177    }
178}
179
180impl From<TransportError> for RemoteSpawnError {
181    fn from(value: TransportError) -> Self {
182        Self::ControlPlane(format!("transport route install failed: {value:?}"))
183    }
184}
185
/// Shared handle to the transport in use, either TCP or QUIC.
///
/// Cloning the handle clones the inner `Arc`.
186#[derive(Clone)]
187pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    /// Handle to a TCP transport.
188    Tcp(Arc<TcpTransport<R, N>>),
    /// Handle to a QUIC transport.
189    Quic(Arc<QuicTransport<R, N>>),
190}
191
192impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
193    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
194        match self {
195            Self::Tcp(transport) => Some(transport),
196            Self::Quic(_) => None,
197        }
198    }
199
200    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
201        match self {
202            Self::Tcp(_) => None,
203            Self::Quic(transport) => Some(transport),
204        }
205    }
206
207    pub fn add_peer(&self, peer: &PeerSpec) {
208        match self {
209            Self::Tcp(transport) => {
210                if let Some(server_name) = &peer.server_name {
211                    transport.add_peer_with_server_name(
212                        peer.engine_id.clone(),
213                        peer.addr,
214                        server_name.clone(),
215                    );
216                } else {
217                    transport.add_peer(peer.engine_id.clone(), peer.addr);
218                }
219            }
220            Self::Quic(transport) => {
221                if let Some(server_name) = &peer.server_name {
222                    transport.add_peer_with_server_name(
223                        peer.engine_id.clone(),
224                        peer.addr,
225                        server_name.clone(),
226                    );
227                } else {
228                    transport.add_peer(peer.engine_id.clone(), peer.addr);
229                }
230            }
231        }
232    }
233
234    pub fn declare_remote_path(
235        &self,
236        owner: EngineId,
237        path: &ActorPath,
238    ) -> Result<(), TransportError> {
239        match self {
240            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
241            Self::Quic(transport) => transport.declare_remote_path(owner, path),
242        }
243    }
244
245    pub fn advertise_path(&self, addr: AddrHash, path: &ActorPath) -> Result<(), TransportError> {
246        match self {
247            Self::Tcp(transport) => transport.advertise_path(addr, path),
248            Self::Quic(transport) => transport.advertise_path(addr, path),
249        }
250    }
251
252    pub fn register_source_addr(&self, addr: AddrHash) -> Result<(), TransportError> {
253        match self {
254            Self::Tcp(transport) => transport.register(addr, mailbox(1).0),
255            Self::Quic(transport) => transport.register(addr, mailbox(1).0),
256        }
257    }
258
259    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
260        match self {
261            Self::Tcp(transport) => transport.undeclare_remote_path(path),
262            Self::Quic(transport) => transport.undeclare_remote_path(path),
263        }
264    }
265
266    pub fn can_route(&self, destination: AddrHash) -> bool {
267        match self {
268            Self::Tcp(transport) => transport.can_route(destination),
269            Self::Quic(transport) => transport.can_route(destination),
270        }
271    }
272
273    pub async fn wait_for_peer_connection(
274        &self,
275        engine_id: &EngineId,
276        timeout: Duration,
277    ) -> Result<(), TransportError> {
278        match self {
279            Self::Tcp(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
280            Self::Quic(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
281        }
282    }
283}
284
/// Handle for managing the peers, remote actor declarations, and roles of a
/// bounded cluster, and for spawning actors on remote engines.
285#[derive(Clone)]
286pub struct BoundedClusterHandle<
287    R: Reactor = TokioReactor,
288    N: Network = TokioNetwork,
289    F: FileSystem = TokioFileSystem,
290> {
    // Local engine; supplies the source address and typed remote addresses.
291    engine: EngineHandle<R, N, F>,
    // Identity of the local engine, sent to peers as the caller id.
292    local_engine_id: EngineId,
    // Client settings for control-plane calls.
293    control_plane: ControlPlaneClientConfig,
    // Transport that peers and routes are installed on.
294    transport: BoundedTransportHandle<R, N>,
    // Peers known to this handle.
295    peers: Vec<PeerSpec>,
    // Remote actor declarations made through this handle (starts empty in `new`).
296    declared_actors: Vec<DeclaredRemoteActor>,
    // Role-to-engine mappings.
297    roles: Vec<BoundedClusterRole>,
    // Local actor registry, consulted for path collisions and route advertisement.
298    registry: Arc<ActorRegistry<R>>,
299}
300
301impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
302    pub(crate) fn new(
303        engine: EngineHandle<R, N, F>,
304        local_engine_id: EngineId,
305        control_plane: ControlPlaneClientConfig,
306        transport: BoundedTransportHandle<R, N>,
307        peers: Vec<PeerSpec>,
308        roles: Vec<BoundedClusterRole>,
309        registry: Arc<ActorRegistry<R>>,
310    ) -> Self {
311        Self {
312            engine,
313            local_engine_id,
314            control_plane,
315            transport,
316            peers,
317            declared_actors: Vec::new(),
318            roles,
319            registry,
320        }
321    }
322
323    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
324        &self.transport
325    }
326
327    pub fn peers(&self) -> &[PeerSpec] {
328        &self.peers
329    }
330
331    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
332        &self.declared_actors
333    }
334
335    pub fn roles(&self) -> &[BoundedClusterRole] {
336        &self.roles
337    }
338
339    pub(crate) fn advertise_existing_local_routes(&self) -> Result<(), ClusterError> {
340        self.transport
341            .register_source_addr(self.engine.source_addr)
342            .map_err(ClusterError::Transport)?;
343
344        for info in self
345            .registry
346            .snapshot(&crate::introspection::ActorQuery::default(), 0)
347        {
348            if info.state != crate::introspection::ActorState::Running {
349                continue;
350            }
351            if let Some(slot) = self.registry.get_by_path(&info.path) {
352                self.transport
353                    .advertise_path(slot.addr, &info.path)
354                    .map_err(ClusterError::Transport)?;
355            }
356        }
357        Ok(())
358    }
359
360    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
361        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
362        self.transport.add_peer(&peer);
363        self.advertise_existing_local_routes()?;
364        self.peers.push(peer);
365        Ok(())
366    }
367
368    pub fn declare_remote_actor(
369        &mut self,
370        owner: EngineId,
371        path: ActorPath,
372    ) -> Result<(), ClusterError> {
373        validate_declared_actor(
374            &path,
375            &owner,
376            &self.local_engine_id,
377            &self.peers,
378            &self.declared_actors,
379            &self.registry,
380        )?;
381        self.transport.declare_remote_path(owner.clone(), &path)?;
382        self.declared_actors
383            .push(DeclaredRemoteActor { path, owner });
384        Ok(())
385    }
386
387    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
388        self.transport.undeclare_remote_path(path)?;
389        self.declared_actors
390            .retain(|declared| &declared.path != path);
391        Ok(())
392    }
393
394    pub fn can_route(&self, path: &ActorPath) -> bool {
395        self.transport.can_route(AddrHash::new(path, 0))
396    }
397
398    pub async fn spawn_remote_on_role<M>(
399        &self,
400        role: &str,
401        spec: RemoteSpawnSpec,
402    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
403    where
404        M: RemoteMessage,
405        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
406        R: Send + Sync,
407    {
408        let owner = self
409            .roles
410            .iter()
411            .find(|declared| declared.name == role)
412            .map(|declared| declared.owner.clone())
413            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
414        self.spawn_remote::<M>(owner, spec).await
415    }
416
417    pub async fn spawn_remote<M>(
418        &self,
419        target: EngineId,
420        spec: RemoteSpawnSpec,
421    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
422    where
423        M: RemoteMessage,
424        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
425        R: Send + Sync,
426    {
427        validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;
428
429        let peer = self
430            .peers
431            .iter()
432            .find(|peer| peer.engine_id == target)
433            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
434        let control_plane_addr = peer
435            .control_plane_addr
436            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
437        let server_name = peer
438            .server_name
439            .clone()
440            .unwrap_or_else(|| target.as_str().to_string());
441        let caller_server_name = self.local_engine_id.as_str().to_string();
442
443        let deadline = Instant::now() + self.control_plane.wait_timeout;
444        loop {
445            match call_actor_spawn(
446                &self.control_plane,
447                control_plane_addr,
448                &server_name,
449                &self.local_engine_id,
450                transport_listen_addr(&self.transport),
451                &caller_server_name,
452                &spec,
453            )
454            .await
455            {
456                Ok(()) => break,
457                Err(RemoteSpawnError::ControlPlane(message))
458                    if is_retryable_control_plane_error(&message) && Instant::now() <= deadline =>
459                {
460                    tokio::time::sleep(Duration::from_millis(20)).await;
461                }
462                Err(err) => return Err(err),
463            }
464        }
465
466        while Instant::now() <= deadline {
467            if actor_is_spawned(
468                &self.control_plane,
469                control_plane_addr,
470                &server_name,
471                &spec.path,
472            )
473            .await?
474            {
475                self.transport
476                    .declare_remote_path(target.clone(), &spec.path)
477                    .map_err(RemoteSpawnError::from)?;
478                self.transport
479                    .wait_for_peer_connection(&target, self.control_plane.wait_timeout)
480                    .await
481                    .map_err(RemoteSpawnError::from)?;
482                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
483            }
484            tokio::time::sleep(Duration::from_millis(20)).await;
485        }
486
487        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
488    }
489}
490
491async fn call_actor_spawn(
492    client: &ControlPlaneClientConfig,
493    addr: SocketAddr,
494    server_name: &str,
495    caller_engine_id: &EngineId,
496    caller_transport_addr: SocketAddr,
497    caller_server_name: &str,
498    spec: &RemoteSpawnSpec,
499) -> Result<(), RemoteSpawnError> {
500    let params = match &spec.config {
501        Some(config) => json!({
502            "path": spec.path.as_str(),
503            "type_name": spec.type_name,
504            "config": config,
505            "caller_engine_id": caller_engine_id.as_str(),
506            "caller_transport_addr": caller_transport_addr,
507            "caller_server_name": caller_server_name,
508        }),
509        None => json!({
510            "path": spec.path.as_str(),
511            "type_name": spec.type_name,
512            "caller_engine_id": caller_engine_id.as_str(),
513            "caller_transport_addr": caller_transport_addr,
514            "caller_server_name": caller_server_name,
515        }),
516    };
517    match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
518        Ok(_) => {}
519        Err(RemoteSpawnError::ControlPlane(message))
520            if message.contains("actor already running at") =>
521        {
522            return Err(RemoteSpawnError::RemotePathAlreadyRunning(
523                spec.path.clone(),
524            ));
525        }
526        Err(err) => return Err(err),
527    }
528    Ok(())
529}
530
531fn transport_listen_addr<R: Reactor + Clone, N: Network + Clone>(
532    transport: &BoundedTransportHandle<R, N>,
533) -> SocketAddr {
534    match transport {
535        BoundedTransportHandle::Tcp(transport) => transport.local_addr(),
536        BoundedTransportHandle::Quic(transport) => transport.local_addr(),
537    }
538}
539
/// Decides whether a control-plane error message describes a transient
/// connection problem worth retrying.
///
/// The check is a case-insensitive (ASCII) substring match against a fixed
/// list of markers.
fn is_retryable_control_plane_error(message: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 5] = [
        "connection refused",
        "connection reset",
        "timed out",
        "broken pipe",
        "not connected",
    ];

    let lowered = message.to_ascii_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
548
549fn validate_remote_spawn_request<R: Reactor>(
550    path: &ActorPath,
551    owner: &EngineId,
552    declared_actors: &[DeclaredRemoteActor],
553    registry: &Arc<ActorRegistry<R>>,
554) -> Result<(), RemoteSpawnError> {
555    if registry.get_by_path(path).is_some() {
556        return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
557    }
558
559    if let Some(declared) = declared_actors
560        .iter()
561        .find(|declared| declared.path == *path && declared.owner != *owner)
562    {
563        return Err(RemoteSpawnError::RemoteOwnershipConflict {
564            path: path.clone(),
565            owner: declared.owner.clone(),
566        });
567    }
568
569    Ok(())
570}
571
572async fn actor_is_spawned(
573    client: &ControlPlaneClientConfig,
574    addr: SocketAddr,
575    server_name: &str,
576    path: &ActorPath,
577) -> Result<bool, RemoteSpawnError> {
578    match call_control_plane(
579        client,
580        addr,
581        server_name,
582        "actor.info",
583        json!({ "path": path.as_str() }),
584    )
585    .await
586    {
587        Ok(_) => Ok(true),
588        Err(RemoteSpawnError::ControlPlane(message))
589            if message.contains("actor not found:")
590                || message.contains("actor not found")
591                || message.contains("User supervisor not available") =>
592        {
593            Ok(false)
594        }
595        Err(err) => Err(err),
596    }
597}
598
599async fn call_control_plane(
600    client: &ControlPlaneClientConfig,
601    addr: SocketAddr,
602    server_name: &str,
603    method: &str,
604    params: Value,
605) -> Result<Value, RemoteSpawnError> {
606    let request = json!({
607        "id": 1,
608        "method": method,
609        "params": params,
610    });
611
612    match client.protocol {
613        ControlPlaneProtocol::Tcp => {
614            call_control_plane_tcp(addr, server_name, &client.tls, &request).await
615        }
616        ControlPlaneProtocol::Quic => {
617            call_control_plane_quic(addr, server_name, &client.tls, &request).await
618        }
619    }
620}
621
622async fn call_control_plane_tcp(
623    addr: SocketAddr,
624    server_name: &str,
625    tls: &palladium_transport::TlsConfig,
626    request: &Value,
627) -> Result<Value, RemoteSpawnError> {
628    let config = tls
629        .client_config(server_name)
630        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
631    let name = ServerName::try_from(server_name.to_string())
632        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
633    let stream = tokio::net::TcpStream::connect(addr)
634        .await
635        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
636    let connector = TlsConnector::from(config);
637    let tls_stream = connector
638        .connect(name, stream)
639        .await
640        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
641
642    let (reader, mut writer) = tokio::io::split(tls_stream);
643    let mut body = request.to_string();
644    body.push('\n');
645    writer
646        .write_all(body.as_bytes())
647        .await
648        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
649    let mut line = String::new();
650    let mut reader = BufReader::new(reader);
651    reader
652        .read_line(&mut line)
653        .await
654        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
655    parse_control_plane_response(&line)
656}
657
658async fn call_control_plane_quic(
659    addr: SocketAddr,
660    server_name: &str,
661    tls: &palladium_transport::TlsConfig,
662    request: &Value,
663) -> Result<Value, RemoteSpawnError> {
664    ensure_rustls_provider();
665    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
666        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
667    })?)
668    .clone();
669    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
670    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
671
672    let client = s2n_quic::Client::builder()
673        .with_tls(client_tls)
674        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
675        .with_io("0.0.0.0:0")
676        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
677        .start()
678        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
679
680    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
681    let mut conn = client
682        .connect(connect)
683        .await
684        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
685    let mut stream = conn
686        .open_bidirectional_stream()
687        .await
688        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
689
690    let mut body = request.to_string();
691    body.push('\n');
692    stream
693        .write_all(body.as_bytes())
694        .await
695        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
696    stream
697        .close()
698        .await
699        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
700
701    let mut buf = Vec::new();
702    stream
703        .read_to_end(&mut buf)
704        .await
705        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
706    let line = String::from_utf8_lossy(&buf);
707    parse_control_plane_response(&line)
708}
709
710fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
711    let response: Value = serde_json::from_str(line.trim())
712        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
713    if let Some(error) = response.get("error") {
714        let message = error
715            .get("message")
716            .and_then(Value::as_str)
717            .unwrap_or("unknown control-plane error");
718        return Err(RemoteSpawnError::ControlPlane(message.to_string()));
719    }
720    Ok(response.get("result").cloned().unwrap_or(Value::Null))
721}
722
/// Installs the default rustls crypto provider at most once per process.
///
/// `aws-lc-rs` is used when that feature is enabled, otherwise `ring` when
/// that feature is enabled. With neither feature nothing is installed. The
/// result of the installation attempt is ignored.
fn ensure_rustls_provider() {
    fn install_default_provider() {
        #[cfg(feature = "aws-lc-rs")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        let _ = rustls::crypto::ring::default_provider().install_default();
    }

    static PROVIDER_INIT: std::sync::Once = std::sync::Once::new();
    PROVIDER_INIT.call_once(install_default_provider);
}
732
733pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
734    ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
735}
736
737pub(crate) fn validate_config<R: Reactor>(
738    config: &BoundedClusterConfig,
739    registry: &Arc<ActorRegistry<R>>,
740) -> Result<(), ClusterError> {
741    let local_engine_id = config.transport.engine_id();
742    let mut seen_peers = Vec::new();
743    for peer in &config.peers {
744        validate_peer_spec(peer, local_engine_id, &seen_peers)?;
745        seen_peers.push(peer.clone());
746    }
747
748    let mut seen_declared = Vec::new();
749    for declared in &config.declared_actors {
750        validate_declared_actor(
751            &declared.path,
752            &declared.owner,
753            local_engine_id,
754            &config.peers,
755            &seen_declared,
756            registry,
757        )?;
758        seen_declared.push(declared.clone());
759    }
760
761    let mut seen_roles = Vec::new();
762    for role in &config.roles {
763        validate_role(role, &config.peers, &seen_roles)?;
764        seen_roles.push(role.clone());
765    }
766
767    Ok(())
768}
769
770fn validate_peer_spec(
771    peer: &PeerSpec,
772    local_engine_id: &EngineId,
773    existing: &[PeerSpec],
774) -> Result<(), ClusterError> {
775    if &peer.engine_id == local_engine_id {
776        return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
777    }
778    if existing
779        .iter()
780        .any(|existing| existing.engine_id == peer.engine_id)
781    {
782        return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
783    }
784    Ok(())
785}
786
787fn validate_declared_actor<R: Reactor>(
788    path: &ActorPath,
789    owner: &EngineId,
790    local_engine_id: &EngineId,
791    peers: &[PeerSpec],
792    existing: &[DeclaredRemoteActor],
793    registry: &Arc<ActorRegistry<R>>,
794) -> Result<(), ClusterError> {
795    if owner == local_engine_id || registry.get_by_path(path).is_some() {
796        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
797    }
798    if existing.iter().any(|existing| existing.path == *path) {
799        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
800    }
801    if !peers.iter().any(|peer| peer.engine_id == *owner) {
802        return Err(ClusterError::UnknownPeerOwner {
803            path: path.clone(),
804            owner: owner.clone(),
805        });
806    }
807    Ok(())
808}
809
810fn validate_role(
811    role: &BoundedClusterRole,
812    peers: &[PeerSpec],
813    existing: &[BoundedClusterRole],
814) -> Result<(), ClusterError> {
815    if existing.iter().any(|existing| existing.name == role.name) {
816        return Err(ClusterError::DuplicateRole(role.name.clone()));
817    }
818    if !peers.iter().any(|peer| peer.engine_id == role.owner) {
819        return Err(ClusterError::UnknownRoleOwner {
820            role: role.name.clone(),
821            owner: role.owner.clone(),
822        });
823    }
824    Ok(())
825}
826
827#[cfg(test)]
828mod tests {
829    use super::{
830        call_control_plane, validate_config, BoundedClusterConfig, BoundedClusterRole,
831        BoundedTransportConfig, ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError,
832        RemoteSpawnSpec,
833    };
834    use crate::{Engine, EngineConfig};
835    use palladium_actor::{
836        Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
837        Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
838    };
839    use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
840    use rcgen::{
841        BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
842        KeyUsagePurpose,
843    };
844    use serde_json::json;
845    use std::net::{SocketAddr, TcpListener, UdpSocket};
846    use std::sync::Arc;
847    use std::time::{Duration, Instant};
848
849    fn make_ca() -> (Certificate, KeyPair) {
850        let mut params = CertificateParams::default();
851        params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
852        params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
853        let ca_key = KeyPair::generate().expect("ca keypair");
854        let cert = params.self_signed(&ca_key).expect("ca cert");
855        (cert, ca_key)
856    }
857
858    fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
859        let keypair = KeyPair::generate().expect("leaf keypair");
860        let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
861        let mut params = params;
862        params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
863        params.extended_key_usages = vec![
864            ExtendedKeyUsagePurpose::ServerAuth,
865            ExtendedKeyUsagePurpose::ClientAuth,
866        ];
867        let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
868        let cert_der = cert.der().to_vec();
869        let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
870        let ca_der = ca.der().to_vec();
871        TlsConfig {
872            cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
873            private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
874            trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
875        }
876    }
877
878    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
879        TcpTransportConfig {
880            engine_id: EngineId::new(engine_id),
881            listen_addr: "127.0.0.1:0".parse().unwrap(),
882            max_connections_per_peer: 1,
883            idle_timeout: Duration::from_secs(1),
884            send_buffer_size: 8,
885            nodelay: true,
886            tls,
887            send_delay: Duration::from_millis(0),
888        }
889    }
890
891    fn reserve_tcp_addr() -> SocketAddr {
892        let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
893        let addr = listener.local_addr().expect("tcp local addr");
894        drop(listener);
895        addr
896    }
897
898    fn reserve_udp_addr() -> SocketAddr {
899        let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
900        let addr = socket.local_addr().expect("udp local addr");
901        drop(socket);
902        addr
903    }
904
905    #[test]
906    fn bounded_cluster_config_rejects_duplicate_peers() {
907        let (ca, ca_key) = make_ca();
908        let engine = Engine::with_config(EngineConfig {
909            engine_id: EngineId::new("engine-a.example"),
910            ..Default::default()
911        });
912        let config = BoundedClusterConfig {
913            transport: BoundedTransportConfig::Tcp(tcp_config(
914                "engine-a.example",
915                make_tls_config("engine-a.example", &ca, &ca_key),
916            )),
917            peers: vec![
918                PeerSpec {
919                    engine_id: EngineId::new("engine-b.example"),
920                    addr: "127.0.0.1:4100".parse().unwrap(),
921                    control_plane_addr: None,
922                    server_name: None,
923                },
924                PeerSpec {
925                    engine_id: EngineId::new("engine-b.example"),
926                    addr: "127.0.0.1:4200".parse().unwrap(),
927                    control_plane_addr: None,
928                    server_name: None,
929                },
930            ],
931            declared_actors: Vec::new(),
932            roles: Vec::new(),
933        };
934
935        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
936        assert_eq!(
937            err,
938            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
939        );
940    }
941
942    #[test]
943    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
944        let (ca, ca_key) = make_ca();
945        let engine = Engine::with_config(EngineConfig {
946            engine_id: EngineId::new("engine-a.example"),
947            ..Default::default()
948        });
949        let config = BoundedClusterConfig {
950            transport: BoundedTransportConfig::Tcp(tcp_config(
951                "engine-a.example",
952                make_tls_config("engine-a.example", &ca, &ca_key),
953            )),
954            peers: vec![PeerSpec {
955                engine_id: EngineId::new("engine-b.example"),
956                addr: "127.0.0.1:4100".parse().unwrap(),
957                control_plane_addr: None,
958                server_name: None,
959            }],
960            declared_actors: vec![DeclaredRemoteActor {
961                path: ActorPath::parse("/user/missing-owner").unwrap(),
962                owner: EngineId::new("engine-c.example"),
963            }],
964            roles: Vec::new(),
965        };
966
967        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
968        assert_eq!(
969            err,
970            ClusterError::UnknownPeerOwner {
971                path: ActorPath::parse("/user/missing-owner").unwrap(),
972                owner: EngineId::new("engine-c.example"),
973            }
974        );
975    }
976
977    #[test]
978    fn bounded_cluster_config_rejects_duplicate_roles() {
979        let (ca, ca_key) = make_ca();
980        let engine = Engine::with_config(EngineConfig {
981            engine_id: EngineId::new("engine-a.example"),
982            ..Default::default()
983        });
984        let config = BoundedClusterConfig {
985            transport: BoundedTransportConfig::Tcp(tcp_config(
986                "engine-a.example",
987                make_tls_config("engine-a.example", &ca, &ca_key),
988            )),
989            peers: vec![PeerSpec {
990                engine_id: EngineId::new("engine-b.example"),
991                addr: "127.0.0.1:4100".parse().unwrap(),
992                control_plane_addr: None,
993                server_name: None,
994            }],
995            declared_actors: Vec::new(),
996            roles: vec![
997                BoundedClusterRole {
998                    name: "context-service".to_string(),
999                    owner: EngineId::new("engine-b.example"),
1000                },
1001                BoundedClusterRole {
1002                    name: "context-service".to_string(),
1003                    owner: EngineId::new("engine-b.example"),
1004                },
1005            ],
1006        };
1007
1008        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1009        assert_eq!(
1010            err,
1011            ClusterError::DuplicateRole("context-service".to_string())
1012        );
1013    }
1014
1015    #[test]
1016    fn bounded_cluster_config_rejects_unknown_role_owner() {
1017        let (ca, ca_key) = make_ca();
1018        let engine = Engine::with_config(EngineConfig {
1019            engine_id: EngineId::new("engine-a.example"),
1020            ..Default::default()
1021        });
1022        let config = BoundedClusterConfig {
1023            transport: BoundedTransportConfig::Tcp(tcp_config(
1024                "engine-a.example",
1025                make_tls_config("engine-a.example", &ca, &ca_key),
1026            )),
1027            peers: vec![PeerSpec {
1028                engine_id: EngineId::new("engine-b.example"),
1029                addr: "127.0.0.1:4100".parse().unwrap(),
1030                control_plane_addr: None,
1031                server_name: None,
1032            }],
1033            declared_actors: Vec::new(),
1034            roles: vec![BoundedClusterRole {
1035                name: "gateway".to_string(),
1036                owner: EngineId::new("engine-c.example"),
1037            }],
1038        };
1039
1040        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1041        assert_eq!(
1042            err,
1043            ClusterError::UnknownRoleOwner {
1044                role: "gateway".to_string(),
1045                owner: EngineId::new("engine-c.example"),
1046            }
1047        );
1048    }
1049
1050    #[test]
1051    fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
1052        assert_eq!(
1053            super::remote_spawn_wait_timeout(Duration::from_millis(25)),
1054            Duration::from_secs(5)
1055        );
1056        assert_eq!(
1057            super::remote_spawn_wait_timeout(Duration::from_secs(8)),
1058            Duration::from_secs(8)
1059        );
1060    }
1061
1062    fn tcp_tests_available() -> bool {
1063        if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
1064            return false;
1065        }
1066        std::net::TcpListener::bind("127.0.0.1:0").is_ok()
1067    }
1068
1069    fn quic_tests_available() -> bool {
1070        super::ensure_rustls_provider();
1071        rustls::crypto::CryptoProvider::get_default().is_some()
1072    }
1073
1074    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1075    struct EchoRequest {
1076        value: u64,
1077    }
1078
1079    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1080    struct EchoResponse {
1081        doubled: u64,
1082    }
1083
1084    impl Message for EchoRequest {
1085        type Response = EchoResponse;
1086        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
1087    }
1088
1089    struct EchoActor;
1090
1091    impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
1092        fn on_message(
1093            &mut self,
1094            ctx: &mut ActorContext<R>,
1095            envelope: &Envelope,
1096            payload: MessagePayload,
1097        ) -> Result<(), ActorError> {
1098            let req = payload
1099                .extract::<EchoRequest>()
1100                .map_err(|_| ActorError::Handler)?;
1101            let resp = EchoResponse {
1102                doubled: req.value * 2,
1103            };
1104            let resp_env = envelope.response(0);
1105            ctx.send_raw(resp_env, MessagePayload::local(resp))
1106                .map_err(|_| ActorError::Handler)
1107        }
1108    }
1109
1110    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1111    struct CrashRequest;
1112
1113    impl Message for CrashRequest {
1114        type Response = ();
1115        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
1116    }
1117
1118    struct SpawnedEchoActor;
1119
1120    impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
1121        fn on_message(
1122            &mut self,
1123            ctx: &mut ActorContext<R>,
1124            envelope: &Envelope,
1125            payload: MessagePayload,
1126        ) -> Result<(), ActorError> {
1127            match envelope.type_tag {
1128                EchoRequest::TYPE_TAG => {
1129                    let req = payload
1130                        .extract::<EchoRequest>()
1131                        .map_err(|_| ActorError::Handler)?;
1132                    let resp = EchoResponse {
1133                        doubled: req.value * 2,
1134                    };
1135                    let resp_env = envelope.response(0);
1136                    ctx.send_raw(resp_env, MessagePayload::local(resp))
1137                        .map_err(|_| ActorError::Handler)
1138                }
1139                CrashRequest::TYPE_TAG => Err(ActorError::Handler),
1140                _ => Err(ActorError::Handler),
1141            }
1142        }
1143    }
1144
1145    fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
1146        Arc::new(|type_name: &str, _config: &[u8]| match type_name {
1147            "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
1148            _ => Err(format!("unknown remote spawn type: {type_name}")),
1149        })
1150    }
1151
1152    #[tokio::test]
1153    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
1154        if !tcp_tests_available() {
1155            eprintln!("skipping tcp integration tests: network bind not permitted");
1156            return;
1157        }
1158
1159        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
1160        let (ca, ca_key) = make_ca();
1161        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1162        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1163
1164        let mut engine_a = Engine::with_config(EngineConfig {
1165            engine_id: EngineId::new("engine-a.example"),
1166            ..Default::default()
1167        });
1168        let engine_b = Engine::with_config(EngineConfig {
1169            engine_id: EngineId::new("engine-b.example"),
1170            ..Default::default()
1171        });
1172
1173        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1174        engine_a.add_user_actor(ChildSpec::new(
1175            "cluster-echo",
1176            RestartPolicy::Permanent,
1177            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1178            ns,
1179            move || Box::new(EchoActor),
1180        ));
1181
1182        let handle_a = engine_a.handle();
1183        let handle_b = engine_b.handle();
1184        handle_a
1185            .type_registry()
1186            .register_remote_ask::<EchoRequest>();
1187        handle_b
1188            .type_registry()
1189            .register_remote_ask::<EchoRequest>();
1190
1191        let cluster_a = handle_a
1192            .attach_bounded_cluster(BoundedClusterConfig {
1193                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1194                peers: Vec::new(),
1195                declared_actors: Vec::new(),
1196                roles: Vec::new(),
1197            })
1198            .await
1199            .expect("attach bounded cluster a");
1200        let tcp_a = cluster_a
1201            .transport()
1202            .as_tcp()
1203            .cloned()
1204            .expect("tcp transport a");
1205
1206        let cluster_b = handle_b
1207            .attach_bounded_cluster(BoundedClusterConfig {
1208                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1209                peers: vec![PeerSpec {
1210                    engine_id: EngineId::new("engine-a.example"),
1211                    addr: tcp_a.local_addr(),
1212                    control_plane_addr: None,
1213                    server_name: None,
1214                }],
1215                declared_actors: Vec::new(),
1216                roles: Vec::new(),
1217            })
1218            .await
1219            .expect("attach bounded cluster b");
1220        let tcp_b = cluster_b
1221            .transport()
1222            .as_tcp()
1223            .cloned()
1224            .expect("tcp transport b");
1225
1226        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1227        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1228        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1229
1230        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1231        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1232
1233        let canonical = AddrHash::new(&actor_path, 0);
1234        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1235        while !cluster_b.can_route(&actor_path) {
1236            if std::time::Instant::now() > deadline {
1237                panic!("bounded cluster route did not propagate");
1238            }
1239            tokio::time::sleep(Duration::from_millis(20)).await;
1240        }
1241        assert!(tcp_b.can_route(canonical));
1242
1243        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1244        let response = remote
1245            .ask(EchoRequest { value: 12 })
1246            .await
1247            .expect("bounded cluster ask");
1248        assert_eq!(response, EchoResponse { doubled: 24 });
1249
1250        shutdown_a_tx.send(()).ok();
1251        shutdown_b_tx.send(()).ok();
1252    }
1253
1254    #[tokio::test]
1255    async fn bounded_cluster_add_peer_after_start_backfills_existing_tcp_routes() {
1256        if !tcp_tests_available() {
1257            eprintln!("skipping tcp integration tests: network bind not permitted");
1258            return;
1259        }
1260
1261        let actor_path = ActorPath::parse("/user/cluster-echo-add-peer").unwrap();
1262        let (ca, ca_key) = make_ca();
1263        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1264        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1265
1266        let mut engine_a = Engine::with_config(EngineConfig {
1267            engine_id: EngineId::new("engine-a.example"),
1268            ..Default::default()
1269        });
1270        let engine_b = Engine::with_config(EngineConfig {
1271            engine_id: EngineId::new("engine-b.example"),
1272            ..Default::default()
1273        });
1274
1275        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1276        engine_a.add_user_actor(ChildSpec::new(
1277            "cluster-echo-add-peer",
1278            RestartPolicy::Permanent,
1279            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1280            ns,
1281            move || Box::new(EchoActor),
1282        ));
1283
1284        let handle_a = engine_a.handle();
1285        let handle_b = engine_b.handle();
1286        handle_a
1287            .type_registry()
1288            .register_remote_ask::<EchoRequest>();
1289        handle_b
1290            .type_registry()
1291            .register_remote_ask::<EchoRequest>();
1292
1293        let mut cluster_a = handle_a
1294            .attach_bounded_cluster(BoundedClusterConfig {
1295                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1296                peers: Vec::new(),
1297                declared_actors: Vec::new(),
1298                roles: Vec::new(),
1299            })
1300            .await
1301            .expect("attach bounded cluster a");
1302        let tcp_a = cluster_a
1303            .transport()
1304            .as_tcp()
1305            .cloned()
1306            .expect("tcp transport a");
1307
1308        let cluster_b = handle_b
1309            .attach_bounded_cluster(BoundedClusterConfig {
1310                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1311                peers: vec![PeerSpec {
1312                    engine_id: EngineId::new("engine-a.example"),
1313                    addr: tcp_a.local_addr(),
1314                    control_plane_addr: None,
1315                    server_name: None,
1316                }],
1317                declared_actors: Vec::new(),
1318                roles: Vec::new(),
1319            })
1320            .await
1321            .expect("attach bounded cluster b");
1322        let tcp_b = cluster_b
1323            .transport()
1324            .as_tcp()
1325            .cloned()
1326            .expect("tcp transport b");
1327
1328        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1329        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1330
1331        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1332        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1333
1334        let local_ready_deadline = std::time::Instant::now() + Duration::from_secs(5);
1335        while handle_a.registry.get_by_path(&actor_path).is_none() {
1336            if std::time::Instant::now() > local_ready_deadline {
1337                panic!("local actor did not start before peer add");
1338            }
1339            tokio::time::sleep(Duration::from_millis(20)).await;
1340        }
1341
1342        cluster_a
1343            .add_peer(PeerSpec {
1344                engine_id: EngineId::new("engine-b.example"),
1345                addr: tcp_b.local_addr(),
1346                control_plane_addr: None,
1347                server_name: None,
1348            })
1349            .expect("add peer b to cluster a after start");
1350
1351        let canonical = AddrHash::new(&actor_path, 0);
1352        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1353        while !cluster_b.can_route(&actor_path) {
1354            if std::time::Instant::now() > deadline {
1355                panic!("bounded cluster route did not propagate after add_peer");
1356            }
1357            tokio::time::sleep(Duration::from_millis(20)).await;
1358        }
1359        assert!(tcp_b.can_route(canonical));
1360
1361        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1362        let response = remote
1363            .ask(EchoRequest { value: 15 })
1364            .await
1365            .expect("bounded cluster ask after add_peer");
1366        assert_eq!(response, EchoResponse { doubled: 30 });
1367
1368        shutdown_a_tx.send(()).ok();
1369        shutdown_b_tx.send(()).ok();
1370    }
1371
1372    #[tokio::test]
1373    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1374        if !tcp_tests_available() {
1375            eprintln!("skipping tcp integration tests: network bind not permitted");
1376            return;
1377        }
1378
1379        let (ca, ca_key) = make_ca();
1380        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1381        let engine = Engine::with_config(EngineConfig {
1382            engine_id: EngineId::new("engine-a.example"),
1383            ..Default::default()
1384        });
1385        let handle = engine.handle();
1386
1387        let mut cluster = handle
1388            .attach_bounded_cluster(BoundedClusterConfig {
1389                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1390                peers: vec![PeerSpec {
1391                    engine_id: EngineId::new("engine-b.example"),
1392                    addr: "127.0.0.1:4100".parse().unwrap(),
1393                    control_plane_addr: None,
1394                    server_name: None,
1395                }],
1396                declared_actors: Vec::new(),
1397                roles: Vec::new(),
1398            })
1399            .await
1400            .expect("attach bounded cluster");
1401
1402        let path = ActorPath::parse("/user/declared-remote").unwrap();
1403        cluster
1404            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1405            .expect("declare remote actor");
1406
1407        assert!(cluster.can_route(&path));
1408        assert!(cluster
1409            .transport()
1410            .as_tcp()
1411            .expect("tcp transport")
1412            .can_route(AddrHash::new(&path, 0)));
1413    }
1414
1415    #[tokio::test]
1416    async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
1417        if !tcp_tests_available() {
1418            eprintln!("skipping tcp integration tests: network bind not permitted");
1419            return;
1420        }
1421
1422        let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
1423        let (ca, ca_key) = make_ca();
1424        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1425        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1426        let control_plane_b = reserve_tcp_addr();
1427
1428        let mut engine_a = Engine::with_config(EngineConfig {
1429            engine_id: EngineId::new("engine-a.example"),
1430            ..Default::default()
1431        });
1432        let engine_b = Engine::with_config(EngineConfig {
1433            engine_id: EngineId::new("engine-b.example"),
1434            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1435            control_plane_tls: Some(tls_b.clone()),
1436            actor_spawn: Some(bounded_spawn_handler()),
1437            ..Default::default()
1438        });
1439
1440        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1441        engine_a.add_user_actor(ChildSpec::new(
1442            "local-remote-collision",
1443            RestartPolicy::Permanent,
1444            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1445            ns,
1446            move || Box::new(EchoActor),
1447        ));
1448
1449        let handle_a = engine_a.handle();
1450        let handle_b = engine_b.handle();
1451
1452        let cluster_b = handle_b
1453            .attach_bounded_cluster(BoundedClusterConfig {
1454                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1455                peers: Vec::new(),
1456                declared_actors: Vec::new(),
1457                roles: Vec::new(),
1458            })
1459            .await
1460            .expect("attach bounded cluster b");
1461        let tcp_b = cluster_b
1462            .transport()
1463            .as_tcp()
1464            .cloned()
1465            .expect("tcp transport b");
1466
1467        let cluster_a = handle_a
1468            .attach_bounded_cluster(BoundedClusterConfig {
1469                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1470                peers: vec![PeerSpec {
1471                    engine_id: EngineId::new("engine-b.example"),
1472                    addr: tcp_b.local_addr(),
1473                    control_plane_addr: Some(control_plane_b),
1474                    server_name: None,
1475                }],
1476                declared_actors: Vec::new(),
1477                roles: vec![BoundedClusterRole {
1478                    name: "context-service".to_string(),
1479                    owner: EngineId::new("engine-b.example"),
1480                }],
1481            })
1482            .await
1483            .expect("attach bounded cluster a");
1484
1485        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1486        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1487
1488        let deadline = Instant::now() + Duration::from_secs(5);
1489        while handle_a.registry.get_by_path(&actor_path).is_none() {
1490            if Instant::now() > deadline {
1491                panic!("local collision actor did not start");
1492            }
1493            tokio::time::sleep(Duration::from_millis(20)).await;
1494        }
1495
1496        let err = cluster_a
1497            .spawn_remote_on_role::<EchoRequest>(
1498                "context-service",
1499                RemoteSpawnSpec {
1500                    path: actor_path.clone(),
1501                    type_name: "bounded-echo".to_string(),
1502                    config: None,
1503                },
1504            )
1505            .await
1506            .expect_err("local path collision should fail");
1507        assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));
1508
1509        shutdown_a_tx.send(()).ok();
1510    }
1511
1512    #[tokio::test]
1513    async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1514        if !tcp_tests_available() {
1515            eprintln!("skipping tcp integration tests: network bind not permitted");
1516            return;
1517        }
1518
1519        let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1520        let (ca, ca_key) = make_ca();
1521        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1522        let engine = Engine::with_config(EngineConfig {
1523            engine_id: EngineId::new("engine-a.example"),
1524            ..Default::default()
1525        });
1526        let handle = engine.handle();
1527
1528        let cluster = handle
1529            .attach_bounded_cluster(BoundedClusterConfig {
1530                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1531                peers: vec![
1532                    PeerSpec {
1533                        engine_id: EngineId::new("engine-b.example"),
1534                        addr: "127.0.0.1:4100".parse().unwrap(),
1535                        control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1536                        server_name: None,
1537                    },
1538                    PeerSpec {
1539                        engine_id: EngineId::new("engine-c.example"),
1540                        addr: "127.0.0.1:4200".parse().unwrap(),
1541                        control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1542                        server_name: None,
1543                    },
1544                ],
1545                declared_actors: vec![DeclaredRemoteActor {
1546                    path: actor_path.clone(),
1547                    owner: EngineId::new("engine-b.example"),
1548                }],
1549                roles: Vec::new(),
1550            })
1551            .await
1552            .expect("attach bounded cluster");
1553
1554        let err = cluster
1555            .spawn_remote::<EchoRequest>(
1556                EngineId::new("engine-c.example"),
1557                RemoteSpawnSpec {
1558                    path: actor_path.clone(),
1559                    type_name: "bounded-echo".to_string(),
1560                    config: None,
1561                },
1562            )
1563            .await
1564            .expect_err("conflicting declared owner should fail");
1565        assert_eq!(
1566            err,
1567            RemoteSpawnError::RemoteOwnershipConflict {
1568                path: actor_path,
1569                owner: EngineId::new("engine-b.example"),
1570            }
1571        );
1572    }
1573
1574    #[tokio::test]
1575    async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1576        if !tcp_tests_available() {
1577            eprintln!("skipping tcp integration tests: network bind not permitted");
1578            return;
1579        }
1580
1581        let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1582        let (ca, ca_key) = make_ca();
1583        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1584        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1585        let control_plane_b = reserve_tcp_addr();
1586
1587        let engine_a = Engine::with_config(EngineConfig {
1588            engine_id: EngineId::new("engine-a.example"),
1589            ..Default::default()
1590        });
1591        let engine_b = Engine::with_config(EngineConfig {
1592            engine_id: EngineId::new("engine-b.example"),
1593            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1594            control_plane_tls: Some(tls_b.clone()),
1595            actor_spawn: Some(bounded_spawn_handler()),
1596            ..Default::default()
1597        });
1598
1599        let handle_a = engine_a.handle();
1600        let handle_b = engine_b.handle();
1601        handle_a
1602            .type_registry()
1603            .register_remote_ask::<EchoRequest>();
1604        handle_b
1605            .type_registry()
1606            .register_remote_ask::<EchoRequest>();
1607
1608        let cluster_a = handle_a
1609            .attach_bounded_cluster(BoundedClusterConfig {
1610                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1611                peers: Vec::new(),
1612                declared_actors: Vec::new(),
1613                roles: Vec::new(),
1614            })
1615            .await
1616            .expect("attach bounded cluster a");
1617        let tcp_a = cluster_a
1618            .transport()
1619            .as_tcp()
1620            .cloned()
1621            .expect("tcp transport a");
1622
1623        let cluster_b = handle_b
1624            .attach_bounded_cluster(BoundedClusterConfig {
1625                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1626                peers: vec![PeerSpec {
1627                    engine_id: EngineId::new("engine-a.example"),
1628                    addr: tcp_a.local_addr(),
1629                    control_plane_addr: None,
1630                    server_name: None,
1631                }],
1632                declared_actors: Vec::new(),
1633                roles: Vec::new(),
1634            })
1635            .await
1636            .expect("attach bounded cluster b");
1637        let tcp_b = cluster_b
1638            .transport()
1639            .as_tcp()
1640            .cloned()
1641            .expect("tcp transport b");
1642
1643        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1644        let mut cluster_a = cluster_a;
1645        cluster_a
1646            .add_peer(PeerSpec {
1647                engine_id: EngineId::new("engine-b.example"),
1648                addr: tcp_b.local_addr(),
1649                control_plane_addr: Some(control_plane_b),
1650                server_name: None,
1651            })
1652            .expect("add peer with control plane");
1653
1654        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1655        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1656
1657        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1658        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1659        tokio::time::sleep(Duration::from_millis(150)).await;
1660
1661        let remote = cluster_a
1662            .spawn_remote::<EchoRequest>(
1663                EngineId::new("engine-b.example"),
1664                RemoteSpawnSpec {
1665                    path: actor_path.clone(),
1666                    type_name: "bounded-echo".to_string(),
1667                    config: None,
1668                },
1669            )
1670            .await
1671            .expect("spawn remote actor");
1672
1673        let response = remote
1674            .ask(EchoRequest { value: 21 })
1675            .await
1676            .expect("bounded remote spawn ask");
1677        assert_eq!(response, EchoResponse { doubled: 42 });
1678
1679        let duplicate = cluster_a
1680            .spawn_remote::<EchoRequest>(
1681                EngineId::new("engine-b.example"),
1682                RemoteSpawnSpec {
1683                    path: actor_path.clone(),
1684                    type_name: "bounded-echo".to_string(),
1685                    config: None,
1686                },
1687            )
1688            .await
1689            .expect_err("duplicate remote spawn should fail");
1690        assert_eq!(
1691            duplicate,
1692            RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
1693        );
1694
1695        let old_host_addr = handle_b
1696            .registry
1697            .get_by_path(&actor_path)
1698            .expect("spawned actor registered on host")
1699            .addr;
1700        call_control_plane(
1701            &cluster_a.control_plane,
1702            control_plane_b,
1703            "engine-b.example",
1704            "actor.restart",
1705            json!({ "path": actor_path.as_str() }),
1706        )
1707        .await
1708        .expect("restart remote actor on host");
1709
1710        let restart_deadline = Instant::now() + Duration::from_secs(5);
1711        loop {
1712            if handle_b
1713                .registry
1714                .get_by_path(&actor_path)
1715                .map(|slot| {
1716                    slot.running.load(std::sync::atomic::Ordering::Relaxed)
1717                        && slot.addr != old_host_addr
1718                        && slot
1719                            .restart_count
1720                            .load(std::sync::atomic::Ordering::Relaxed)
1721                            > 0
1722                })
1723                .unwrap_or(false)
1724            {
1725                break;
1726            }
1727            if Instant::now() > restart_deadline {
1728                panic!("remote actor did not restart on host");
1729            }
1730            tokio::time::sleep(Duration::from_millis(25)).await;
1731        }
1732
1733        let restart_deadline = Instant::now() + Duration::from_secs(5);
1734        loop {
1735            if cluster_a.can_route(&actor_path) {
1736                break;
1737            }
1738            if Instant::now() > restart_deadline {
1739                panic!("remote actor route did not return after restart");
1740            }
1741            tokio::time::sleep(Duration::from_millis(25)).await;
1742        }
1743
1744        let restart_deadline = Instant::now() + Duration::from_secs(5);
1745        loop {
1746            match remote.ask(EchoRequest { value: 7 }).await {
1747                Ok(response) => {
1748                    assert_eq!(response, EchoResponse { doubled: 14 });
1749                    break;
1750                }
1751                Err(_) if Instant::now() <= restart_deadline => {
1752                    tokio::time::sleep(Duration::from_millis(25)).await;
1753                }
1754                Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1755            }
1756        }
1757
1758        let unknown = cluster_a
1759            .spawn_remote::<EchoRequest>(
1760                EngineId::new("engine-c.example"),
1761                RemoteSpawnSpec {
1762                    path: ActorPath::parse("/user/unknown-target").unwrap(),
1763                    type_name: "bounded-echo".to_string(),
1764                    config: None,
1765                },
1766            )
1767            .await
1768            .expect_err("unknown target should fail");
1769        assert_eq!(
1770            unknown,
1771            RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1772        );
1773
1774        shutdown_a_tx.send(()).ok();
1775        shutdown_b_tx.send(()).ok();
1776    }
1777
1778    #[tokio::test]
1779    async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1780        if !tcp_tests_available() {
1781            eprintln!("skipping tcp integration tests: network bind not permitted");
1782            return;
1783        }
1784
1785        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1786        let (ca, ca_key) = make_ca();
1787        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1788        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1789        let control_plane_b = reserve_tcp_addr();
1790
1791        let engine_a = Engine::with_config(EngineConfig {
1792            engine_id: EngineId::new("engine-a.example"),
1793            ..Default::default()
1794        });
1795        let engine_b = Engine::with_config(EngineConfig {
1796            engine_id: EngineId::new("engine-b.example"),
1797            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1798            control_plane_tls: Some(tls_b.clone()),
1799            actor_spawn: Some(bounded_spawn_handler()),
1800            ..Default::default()
1801        });
1802
1803        let handle_a = engine_a.handle();
1804        let handle_b = engine_b.handle();
1805        handle_a
1806            .type_registry()
1807            .register_remote_ask::<EchoRequest>();
1808        handle_b
1809            .type_registry()
1810            .register_remote_ask::<EchoRequest>();
1811
1812        let cluster_b = handle_b
1813            .attach_bounded_cluster(BoundedClusterConfig {
1814                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1815                peers: Vec::new(),
1816                declared_actors: Vec::new(),
1817                roles: Vec::new(),
1818            })
1819            .await
1820            .expect("attach bounded cluster b");
1821        let tcp_b = cluster_b
1822            .transport()
1823            .as_tcp()
1824            .cloned()
1825            .expect("tcp transport b");
1826
1827        let cluster_a = handle_a
1828            .attach_bounded_cluster(BoundedClusterConfig {
1829                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1830                peers: vec![PeerSpec {
1831                    engine_id: EngineId::new("engine-b.example"),
1832                    addr: tcp_b.local_addr(),
1833                    control_plane_addr: Some(control_plane_b),
1834                    server_name: None,
1835                }],
1836                declared_actors: Vec::new(),
1837                roles: vec![BoundedClusterRole {
1838                    name: "context-service".to_string(),
1839                    owner: EngineId::new("engine-b.example"),
1840                }],
1841            })
1842            .await
1843            .expect("attach bounded cluster a");
1844        let tcp_a = cluster_a
1845            .transport()
1846            .as_tcp()
1847            .cloned()
1848            .expect("tcp transport a");
1849
1850        let mut cluster_b = cluster_b;
1851        cluster_b
1852            .add_peer(PeerSpec {
1853                engine_id: EngineId::new("engine-a.example"),
1854                addr: tcp_a.local_addr(),
1855                control_plane_addr: None,
1856                server_name: None,
1857            })
1858            .expect("add peer a to cluster b");
1859
1860        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1861        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1862
1863        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1864        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1865        tokio::time::sleep(Duration::from_millis(150)).await;
1866
1867        let remote = cluster_a
1868            .spawn_remote_on_role::<EchoRequest>(
1869                "context-service",
1870                RemoteSpawnSpec {
1871                    path: actor_path.clone(),
1872                    type_name: "bounded-echo".to_string(),
1873                    config: None,
1874                },
1875            )
1876            .await
1877            .expect("spawn remote actor by role over tcp");
1878
1879        let response = remote
1880            .ask(EchoRequest { value: 11 })
1881            .await
1882            .expect("bounded remote spawn ask over tcp by role");
1883        assert_eq!(response, EchoResponse { doubled: 22 });
1884
1885        let unknown_role = cluster_a
1886            .spawn_remote_on_role::<EchoRequest>(
1887                "missing-role",
1888                RemoteSpawnSpec {
1889                    path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
1890                    type_name: "bounded-echo".to_string(),
1891                    config: None,
1892                },
1893            )
1894            .await
1895            .expect_err("unknown role should fail");
1896        assert_eq!(
1897            unknown_role,
1898            RemoteSpawnError::UnknownRole("missing-role".to_string())
1899        );
1900
1901        shutdown_a_tx.send(()).ok();
1902        shutdown_b_tx.send(()).ok();
1903    }
1904
1905    #[tokio::test]
1906    async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
1907        if !tcp_tests_available() {
1908            eprintln!("skipping tcp integration tests: network bind not permitted");
1909            return;
1910        }
1911
1912        let engine_a_id = EngineId::new("engine-a-norev.example");
1913        let engine_b_id = EngineId::new("engine-b-norev.example");
1914        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
1915        let (ca, ca_key) = make_ca();
1916        let tls_a = make_tls_config(engine_a_id.as_str(), &ca, &ca_key);
1917        let tls_b = make_tls_config(engine_b_id.as_str(), &ca, &ca_key);
1918        let control_plane_b = reserve_tcp_addr();
1919
1920        let engine_a = Engine::with_config(EngineConfig {
1921            engine_id: engine_a_id.clone(),
1922            ..Default::default()
1923        });
1924        let engine_b = Engine::with_config(EngineConfig {
1925            engine_id: engine_b_id.clone(),
1926            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1927            control_plane_tls: Some(tls_b.clone()),
1928            actor_spawn: Some(bounded_spawn_handler()),
1929            ..Default::default()
1930        });
1931
1932        let handle_a = engine_a.handle();
1933        let handle_b = engine_b.handle();
1934        handle_a
1935            .type_registry()
1936            .register_remote_ask::<EchoRequest>();
1937        handle_b
1938            .type_registry()
1939            .register_remote_ask::<EchoRequest>();
1940
1941        let cluster_b = handle_b
1942            .attach_bounded_cluster(BoundedClusterConfig {
1943                transport: BoundedTransportConfig::Tcp(tcp_config(engine_b_id.as_str(), tls_b)),
1944                peers: Vec::new(),
1945                declared_actors: Vec::new(),
1946                roles: Vec::new(),
1947            })
1948            .await
1949            .expect("attach bounded cluster b");
1950        let tcp_b = cluster_b
1951            .transport()
1952            .as_tcp()
1953            .cloned()
1954            .expect("tcp transport b");
1955
1956        let cluster_a = handle_a
1957            .attach_bounded_cluster(BoundedClusterConfig {
1958                transport: BoundedTransportConfig::Tcp(tcp_config(engine_a_id.as_str(), tls_a)),
1959                peers: vec![PeerSpec {
1960                    engine_id: engine_b_id.clone(),
1961                    addr: tcp_b.local_addr(),
1962                    control_plane_addr: Some(control_plane_b),
1963                    server_name: None,
1964                }],
1965                declared_actors: Vec::new(),
1966                roles: vec![BoundedClusterRole {
1967                    name: "context-service".to_string(),
1968                    owner: engine_b_id.clone(),
1969                }],
1970            })
1971            .await
1972            .expect("attach bounded cluster a");
1973        let tcp_a = cluster_a
1974            .transport()
1975            .as_tcp()
1976            .cloned()
1977            .expect("tcp transport a");
1978
1979        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1980        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1981
1982        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1983        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1984
1985        // Caller knows the static owner already. Remote spawn should not require
1986        // the host engine to predeclare a reverse peer just to install the
1987        // caller's canonical path route.
1988        let _ = tcp_a;
1989
1990        let remote = cluster_a
1991            .spawn_remote_on_role::<EchoRequest>(
1992                "context-service",
1993                RemoteSpawnSpec {
1994                    path: actor_path.clone(),
1995                    type_name: "bounded-echo".to_string(),
1996                    config: None,
1997                },
1998            )
1999            .await
2000            .expect("spawn remote actor by role over tcp without reverse declaration");
2001
2002        let response = remote
2003            .ask(EchoRequest { value: 13 })
2004            .await
2005            .expect("bounded remote spawn ask over tcp without reverse declaration");
2006        assert_eq!(response, EchoResponse { doubled: 26 });
2007
2008        shutdown_a_tx.send(()).ok();
2009        shutdown_b_tx.send(()).ok();
2010    }
2011
2012    fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
2013        QuicTransportConfig {
2014            engine_id: EngineId::new(engine_id),
2015            listen_addr: "127.0.0.1:0".parse().unwrap(),
2016            send_buffer_size: 1024,
2017            idle_timeout: Duration::from_secs(2),
2018            tls,
2019        }
2020    }
2021
2022    #[tokio::test]
2023    async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
2024        if !quic_tests_available() {
2025            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2026            return;
2027        }
2028
2029        let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
2030        let (ca, ca_key) = make_ca();
2031        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2032        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2033
2034        let mut engine_a = Engine::with_config(EngineConfig {
2035            engine_id: EngineId::new("engine-a.example"),
2036            ..Default::default()
2037        });
2038        let engine_b = Engine::with_config(EngineConfig {
2039            engine_id: EngineId::new("engine-b.example"),
2040            ..Default::default()
2041        });
2042
2043        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
2044        engine_a.add_user_actor(ChildSpec::new(
2045            "cluster-echo-quic",
2046            RestartPolicy::Permanent,
2047            ShutdownPolicy::Timeout(Duration::from_secs(1)),
2048            ns,
2049            move || Box::new(EchoActor),
2050        ));
2051
2052        let handle_a = engine_a.handle();
2053        let handle_b = engine_b.handle();
2054        handle_a
2055            .type_registry()
2056            .register_remote_ask::<EchoRequest>();
2057        handle_b
2058            .type_registry()
2059            .register_remote_ask::<EchoRequest>();
2060
2061        let cluster_a = match handle_a
2062            .attach_bounded_cluster(BoundedClusterConfig {
2063                transport: BoundedTransportConfig::Quic(quic_config(
2064                    "engine-a.example",
2065                    tls_a.clone(),
2066                )),
2067                peers: Vec::new(),
2068                declared_actors: Vec::new(),
2069                roles: Vec::new(),
2070            })
2071            .await
2072        {
2073            Ok(cluster) => cluster,
2074            Err(ClusterError::Transport(err)) => {
2075                eprintln!("skipping quic bounded-cluster test: {err:?}");
2076                return;
2077            }
2078            Err(err) => panic!("attach bounded cluster a: {err:?}"),
2079        };
2080        let quic_a = cluster_a
2081            .transport()
2082            .as_quic()
2083            .cloned()
2084            .expect("quic transport a");
2085
2086        let cluster_b = match handle_b
2087            .attach_bounded_cluster(BoundedClusterConfig {
2088                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2089                peers: vec![PeerSpec {
2090                    engine_id: EngineId::new("engine-a.example"),
2091                    addr: quic_a.local_addr(),
2092                    control_plane_addr: None,
2093                    server_name: None,
2094                }],
2095                declared_actors: Vec::new(),
2096                roles: Vec::new(),
2097            })
2098            .await
2099        {
2100            Ok(cluster) => cluster,
2101            Err(ClusterError::Transport(err)) => {
2102                eprintln!("skipping quic bounded-cluster test: {err:?}");
2103                return;
2104            }
2105            Err(err) => panic!("attach bounded cluster b: {err:?}"),
2106        };
2107        let quic_b = cluster_b
2108            .transport()
2109            .as_quic()
2110            .cloned()
2111            .expect("quic transport b");
2112
2113        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
2114        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2115        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2116
2117        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2118        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2119
2120        let canonical = AddrHash::new(&actor_path, 0);
2121        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2122        while !cluster_b.can_route(&actor_path) {
2123            if std::time::Instant::now() > deadline {
2124                panic!("bounded quic cluster route did not propagate");
2125            }
2126            tokio::time::sleep(Duration::from_millis(20)).await;
2127        }
2128        assert!(quic_b.can_route(canonical));
2129
2130        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
2131        let ask_deadline = Instant::now() + Duration::from_secs(2);
2132        loop {
2133            match remote.ask(EchoRequest { value: 15 }).await {
2134                Ok(response) => {
2135                    assert_eq!(response, EchoResponse { doubled: 30 });
2136                    break;
2137                }
2138                Err(palladium_actor::AskError::Send(
2139                    palladium_actor::SendError::ConnectionFailed,
2140                )) if Instant::now() <= ask_deadline => {
2141                    tokio::time::sleep(Duration::from_millis(20)).await;
2142                }
2143                Err(err) => panic!("bounded cluster ask over quic: {err:?}"),
2144            }
2145        }
2146
2147        shutdown_a_tx.send(()).ok();
2148        shutdown_b_tx.send(()).ok();
2149    }
2150
2151    #[tokio::test]
2152    async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
2153        if !quic_tests_available() {
2154            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2155            return;
2156        }
2157
2158        let (ca, ca_key) = make_ca();
2159        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
2160        let engine = Engine::with_config(EngineConfig {
2161            engine_id: EngineId::new("engine-a.example"),
2162            ..Default::default()
2163        });
2164        let handle = engine.handle();
2165
2166        let mut cluster = match handle
2167            .attach_bounded_cluster(BoundedClusterConfig {
2168                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
2169                peers: vec![PeerSpec {
2170                    engine_id: EngineId::new("engine-b.example"),
2171                    addr: "127.0.0.1:4100".parse().unwrap(),
2172                    control_plane_addr: None,
2173                    server_name: None,
2174                }],
2175                declared_actors: Vec::new(),
2176                roles: Vec::new(),
2177            })
2178            .await
2179        {
2180            Ok(cluster) => cluster,
2181            Err(ClusterError::Transport(err)) => {
2182                eprintln!("skipping quic bounded-cluster test: {err:?}");
2183                return;
2184            }
2185            Err(err) => panic!("attach bounded cluster: {err:?}"),
2186        };
2187
2188        let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
2189        cluster
2190            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
2191            .expect("declare remote actor");
2192
2193        assert!(cluster.can_route(&path));
2194        assert!(cluster
2195            .transport()
2196            .as_quic()
2197            .expect("quic transport")
2198            .can_route(AddrHash::new(&path, 0)));
2199    }
2200
2201    #[tokio::test]
2202    async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
2203        if !quic_tests_available() {
2204            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2205            return;
2206        }
2207
2208        let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
2209        let (ca, ca_key) = make_ca();
2210        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2211        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2212        let control_plane_b = reserve_udp_addr();
2213
2214        let engine_a = Engine::with_config(EngineConfig {
2215            engine_id: EngineId::new("engine-a.example"),
2216            ..Default::default()
2217        });
2218        let engine_b = Engine::with_config(EngineConfig {
2219            engine_id: EngineId::new("engine-b.example"),
2220            control_plane_quic_addr: Some(control_plane_b.to_string()),
2221            control_plane_tls: Some(tls_b.clone()),
2222            actor_spawn: Some(bounded_spawn_handler()),
2223            ..Default::default()
2224        });
2225
2226        let handle_a = engine_a.handle();
2227        let handle_b = engine_b.handle();
2228        handle_a
2229            .type_registry()
2230            .register_remote_ask::<EchoRequest>();
2231        handle_b
2232            .type_registry()
2233            .register_remote_ask::<EchoRequest>();
2234
2235        let cluster_b = match handle_b
2236            .attach_bounded_cluster(BoundedClusterConfig {
2237                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2238                peers: Vec::new(),
2239                declared_actors: Vec::new(),
2240                roles: Vec::new(),
2241            })
2242            .await
2243        {
2244            Ok(cluster) => cluster,
2245            Err(ClusterError::Transport(err)) => {
2246                eprintln!("skipping quic bounded-cluster test: {err:?}");
2247                return;
2248            }
2249            Err(err) => panic!("attach bounded cluster b: {err:?}"),
2250        };
2251        let quic_b = cluster_b
2252            .transport()
2253            .as_quic()
2254            .cloned()
2255            .expect("quic transport b");
2256
2257        let cluster_a = match handle_a
2258            .attach_bounded_cluster(BoundedClusterConfig {
2259                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
2260                peers: vec![PeerSpec {
2261                    engine_id: EngineId::new("engine-b.example"),
2262                    addr: quic_b.local_addr(),
2263                    control_plane_addr: Some(control_plane_b),
2264                    server_name: None,
2265                }],
2266                declared_actors: Vec::new(),
2267                roles: vec![BoundedClusterRole {
2268                    name: "context-service".to_string(),
2269                    owner: EngineId::new("engine-b.example"),
2270                }],
2271            })
2272            .await
2273        {
2274            Ok(cluster) => cluster,
2275            Err(ClusterError::Transport(err)) => {
2276                eprintln!("skipping quic bounded-cluster test: {err:?}");
2277                return;
2278            }
2279            Err(err) => panic!("attach bounded cluster a: {err:?}"),
2280        };
2281        let quic_a = cluster_a
2282            .transport()
2283            .as_quic()
2284            .cloned()
2285            .expect("quic transport a");
2286
2287        let mut cluster_b = cluster_b;
2288        cluster_b
2289            .add_peer(PeerSpec {
2290                engine_id: EngineId::new("engine-a.example"),
2291                addr: quic_a.local_addr(),
2292                control_plane_addr: None,
2293                server_name: None,
2294            })
2295            .expect("add quic peer a to cluster b");
2296
2297        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2298        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2299
2300        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2301        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2302        tokio::time::sleep(Duration::from_millis(150)).await;
2303
2304        let remote = cluster_a
2305            .spawn_remote_on_role::<EchoRequest>(
2306                "context-service",
2307                RemoteSpawnSpec {
2308                    path: actor_path.clone(),
2309                    type_name: "bounded-echo".to_string(),
2310                    config: None,
2311                },
2312            )
2313            .await
2314            .expect("spawn remote actor over quic by role");
2315
2316        let response = remote
2317            .ask(EchoRequest { value: 9 })
2318            .await
2319            .expect("bounded remote spawn ask over quic by role");
2320        assert_eq!(response, EchoResponse { doubled: 18 });
2321
2322        let unknown_role = cluster_a
2323            .spawn_remote_on_role::<EchoRequest>(
2324                "missing-role",
2325                RemoteSpawnSpec {
2326                    path: ActorPath::parse("/user/missing-role").unwrap(),
2327                    type_name: "bounded-echo".to_string(),
2328                    config: None,
2329                },
2330            )
2331            .await
2332            .expect_err("unknown role should fail");
2333        assert_eq!(
2334            unknown_role,
2335            RemoteSpawnError::UnknownRole("missing-role".to_string())
2336        );
2337
2338        shutdown_a_tx.send(()).ok();
2339        shutdown_b_tx.send(()).ok();
2340    }
2341}