Skip to main content

palladium_runtime/
bounded_cluster.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
10use palladium_transport::network::{Network, TokioNetwork};
11use palladium_transport::{
12    QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport, TransportError,
13};
14use rustls::pki_types::ServerName;
15use serde_json::{json, Value};
16use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
17use tokio_rustls::TlsConnector;
18
19#[derive(Clone)]
20pub struct BoundedClusterConfig {
21    pub transport: BoundedTransportConfig,
22    pub peers: Vec<PeerSpec>,
23    pub declared_actors: Vec<DeclaredRemoteActor>,
24    pub roles: Vec<BoundedClusterRole>,
25}
26
27#[derive(Clone)]
28pub enum BoundedTransportConfig {
29    Tcp(TcpTransportConfig),
30    Quic(QuicTransportConfig),
31}
32
33impl BoundedTransportConfig {
34    pub fn engine_id(&self) -> &EngineId {
35        match self {
36            Self::Tcp(config) => &config.engine_id,
37            Self::Quic(config) => &config.engine_id,
38        }
39    }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct PeerSpec {
44    pub engine_id: EngineId,
45    pub addr: SocketAddr,
46    pub control_plane_addr: Option<SocketAddr>,
47    pub server_name: Option<String>,
48}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub struct DeclaredRemoteActor {
52    pub path: ActorPath,
53    pub owner: EngineId,
54}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct BoundedClusterRole {
58    pub name: String,
59    pub owner: EngineId,
60}
61
62#[derive(Clone, Debug, PartialEq, Eq)]
63pub enum ClusterError {
64    DuplicatePeer(EngineId),
65    LocalPeerDeclared(EngineId),
66    DuplicateDeclaredActor(ActorPath),
67    LocalActorDeclaredRemote(ActorPath),
68    UnknownPeerOwner { path: ActorPath, owner: EngineId },
69    DuplicateRole(String),
70    UnknownRoleOwner { role: String, owner: EngineId },
71    Transport(TransportError),
72}
73
74#[derive(Clone, Debug, PartialEq)]
75pub struct RemoteSpawnSpec {
76    pub path: ActorPath,
77    pub type_name: String,
78    pub config: Option<Value>,
79}
80
81#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum RemoteSpawnError {
83    UnknownPeer(EngineId),
84    UnknownRole(String),
85    MissingControlPlaneEndpoint(EngineId),
86    InvalidServerName(String),
87    ControlPlane(String),
88    SpawnTimedOut(ActorPath),
89}
90
91#[derive(Clone)]
92pub(crate) enum ControlPlaneProtocol {
93    Tcp,
94    Quic,
95}
96
97#[derive(Clone)]
98pub(crate) struct ControlPlaneClientConfig {
99    pub(crate) protocol: ControlPlaneProtocol,
100    pub(crate) tls: palladium_transport::TlsConfig,
101    pub(crate) wait_timeout: Duration,
102}
103
104const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
105
106impl From<TransportError> for ClusterError {
107    fn from(value: TransportError) -> Self {
108        Self::Transport(value)
109    }
110}
111
112impl From<TransportError> for RemoteSpawnError {
113    fn from(value: TransportError) -> Self {
114        Self::ControlPlane(format!("transport route install failed: {value:?}"))
115    }
116}
117
118#[derive(Clone)]
119pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
120    Tcp(Arc<TcpTransport<R, N>>),
121    Quic(Arc<QuicTransport<R, N>>),
122}
123
124impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
125    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
126        match self {
127            Self::Tcp(transport) => Some(transport),
128            Self::Quic(_) => None,
129        }
130    }
131
132    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
133        match self {
134            Self::Tcp(_) => None,
135            Self::Quic(transport) => Some(transport),
136        }
137    }
138
139    pub fn add_peer(&self, peer: &PeerSpec) {
140        match self {
141            Self::Tcp(transport) => {
142                if let Some(server_name) = &peer.server_name {
143                    transport.add_peer_with_server_name(
144                        peer.engine_id.clone(),
145                        peer.addr,
146                        server_name.clone(),
147                    );
148                } else {
149                    transport.add_peer(peer.engine_id.clone(), peer.addr);
150                }
151            }
152            Self::Quic(transport) => {
153                if let Some(server_name) = &peer.server_name {
154                    transport.add_peer_with_server_name(
155                        peer.engine_id.clone(),
156                        peer.addr,
157                        server_name.clone(),
158                    );
159                } else {
160                    transport.add_peer(peer.engine_id.clone(), peer.addr);
161                }
162            }
163        }
164    }
165
166    pub fn declare_remote_path(
167        &self,
168        owner: EngineId,
169        path: &ActorPath,
170    ) -> Result<(), TransportError> {
171        match self {
172            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
173            Self::Quic(transport) => transport.declare_remote_path(owner, path),
174        }
175    }
176
177    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
178        match self {
179            Self::Tcp(transport) => transport.undeclare_remote_path(path),
180            Self::Quic(transport) => transport.undeclare_remote_path(path),
181        }
182    }
183
184    pub fn can_route(&self, destination: AddrHash) -> bool {
185        match self {
186            Self::Tcp(transport) => transport.can_route(destination),
187            Self::Quic(transport) => transport.can_route(destination),
188        }
189    }
190}
191
192#[derive(Clone)]
193pub struct BoundedClusterHandle<
194    R: Reactor = TokioReactor,
195    N: Network = TokioNetwork,
196    F: FileSystem = TokioFileSystem,
197> {
198    engine: EngineHandle<R, N, F>,
199    local_engine_id: EngineId,
200    control_plane: ControlPlaneClientConfig,
201    transport: BoundedTransportHandle<R, N>,
202    peers: Vec<PeerSpec>,
203    declared_actors: Vec<DeclaredRemoteActor>,
204    roles: Vec<BoundedClusterRole>,
205    registry: Arc<ActorRegistry<R>>,
206}
207
208impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
209    pub(crate) fn new(
210        engine: EngineHandle<R, N, F>,
211        local_engine_id: EngineId,
212        control_plane: ControlPlaneClientConfig,
213        transport: BoundedTransportHandle<R, N>,
214        peers: Vec<PeerSpec>,
215        roles: Vec<BoundedClusterRole>,
216        registry: Arc<ActorRegistry<R>>,
217    ) -> Self {
218        Self {
219            engine,
220            local_engine_id,
221            control_plane,
222            transport,
223            peers,
224            declared_actors: Vec::new(),
225            roles,
226            registry,
227        }
228    }
229
230    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
231        &self.transport
232    }
233
234    pub fn peers(&self) -> &[PeerSpec] {
235        &self.peers
236    }
237
238    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
239        &self.declared_actors
240    }
241
242    pub fn roles(&self) -> &[BoundedClusterRole] {
243        &self.roles
244    }
245
246    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
247        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
248        self.transport.add_peer(&peer);
249        self.peers.push(peer);
250        Ok(())
251    }
252
253    pub fn declare_remote_actor(
254        &mut self,
255        owner: EngineId,
256        path: ActorPath,
257    ) -> Result<(), ClusterError> {
258        validate_declared_actor(
259            &path,
260            &owner,
261            &self.local_engine_id,
262            &self.peers,
263            &self.declared_actors,
264            &self.registry,
265        )?;
266        self.transport.declare_remote_path(owner.clone(), &path)?;
267        self.declared_actors
268            .push(DeclaredRemoteActor { path, owner });
269        Ok(())
270    }
271
272    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
273        self.transport.undeclare_remote_path(path)?;
274        self.declared_actors
275            .retain(|declared| &declared.path != path);
276        Ok(())
277    }
278
279    pub fn can_route(&self, path: &ActorPath) -> bool {
280        self.transport.can_route(AddrHash::new(path, 0))
281    }
282
283    pub async fn spawn_remote_on_role<M>(
284        &self,
285        role: &str,
286        spec: RemoteSpawnSpec,
287    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
288    where
289        M: RemoteMessage,
290        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
291        R: Send + Sync,
292    {
293        let owner = self
294            .roles
295            .iter()
296            .find(|declared| declared.name == role)
297            .map(|declared| declared.owner.clone())
298            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
299        self.spawn_remote::<M>(owner, spec).await
300    }
301
302    pub async fn spawn_remote<M>(
303        &self,
304        target: EngineId,
305        spec: RemoteSpawnSpec,
306    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
307    where
308        M: RemoteMessage,
309        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
310        R: Send + Sync,
311    {
312        let peer = self
313            .peers
314            .iter()
315            .find(|peer| peer.engine_id == target)
316            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
317        let control_plane_addr = peer
318            .control_plane_addr
319            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
320        let server_name = peer
321            .server_name
322            .clone()
323            .unwrap_or_else(|| target.as_str().to_string());
324
325        call_actor_spawn(&self.control_plane, control_plane_addr, &server_name, &spec).await?;
326
327        let deadline = Instant::now() + self.control_plane.wait_timeout;
328        while Instant::now() <= deadline {
329            if actor_is_spawned(
330                &self.control_plane,
331                control_plane_addr,
332                &server_name,
333                &spec.path,
334            )
335            .await?
336            {
337                self.transport
338                    .declare_remote_path(target.clone(), &spec.path)
339                    .map_err(RemoteSpawnError::from)?;
340                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
341            }
342            tokio::time::sleep(Duration::from_millis(20)).await;
343        }
344
345        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
346    }
347}
348
349async fn call_actor_spawn(
350    client: &ControlPlaneClientConfig,
351    addr: SocketAddr,
352    server_name: &str,
353    spec: &RemoteSpawnSpec,
354) -> Result<(), RemoteSpawnError> {
355    let params = match &spec.config {
356        Some(config) => json!({
357            "path": spec.path.as_str(),
358            "type_name": spec.type_name,
359            "config": config,
360        }),
361        None => json!({
362            "path": spec.path.as_str(),
363            "type_name": spec.type_name,
364        }),
365    };
366    call_control_plane(client, addr, server_name, "actor.spawn", params).await?;
367    Ok(())
368}
369
370async fn actor_is_spawned(
371    client: &ControlPlaneClientConfig,
372    addr: SocketAddr,
373    server_name: &str,
374    path: &ActorPath,
375) -> Result<bool, RemoteSpawnError> {
376    match call_control_plane(
377        client,
378        addr,
379        server_name,
380        "actor.info",
381        json!({ "path": path.as_str() }),
382    )
383    .await
384    {
385        Ok(_) => Ok(true),
386        Err(RemoteSpawnError::ControlPlane(message))
387            if message.contains("actor not found:")
388                || message.contains("actor not found")
389                || message.contains("User supervisor not available") =>
390        {
391            Ok(false)
392        }
393        Err(err) => Err(err),
394    }
395}
396
397async fn call_control_plane(
398    client: &ControlPlaneClientConfig,
399    addr: SocketAddr,
400    server_name: &str,
401    method: &str,
402    params: Value,
403) -> Result<Value, RemoteSpawnError> {
404    let request = json!({
405        "id": 1,
406        "method": method,
407        "params": params,
408    });
409
410    match client.protocol {
411        ControlPlaneProtocol::Tcp => {
412            call_control_plane_tcp(addr, server_name, &client.tls, &request).await
413        }
414        ControlPlaneProtocol::Quic => {
415            call_control_plane_quic(addr, server_name, &client.tls, &request).await
416        }
417    }
418}
419
420async fn call_control_plane_tcp(
421    addr: SocketAddr,
422    server_name: &str,
423    tls: &palladium_transport::TlsConfig,
424    request: &Value,
425) -> Result<Value, RemoteSpawnError> {
426    let config = tls
427        .client_config(server_name)
428        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
429    let name = ServerName::try_from(server_name.to_string())
430        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
431    let stream = tokio::net::TcpStream::connect(addr)
432        .await
433        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
434    let connector = TlsConnector::from(config);
435    let tls_stream = connector
436        .connect(name, stream)
437        .await
438        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
439
440    let (reader, mut writer) = tokio::io::split(tls_stream);
441    let mut body = request.to_string();
442    body.push('\n');
443    writer
444        .write_all(body.as_bytes())
445        .await
446        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
447    let mut line = String::new();
448    let mut reader = BufReader::new(reader);
449    reader
450        .read_line(&mut line)
451        .await
452        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
453    parse_control_plane_response(&line)
454}
455
456async fn call_control_plane_quic(
457    addr: SocketAddr,
458    server_name: &str,
459    tls: &palladium_transport::TlsConfig,
460    request: &Value,
461) -> Result<Value, RemoteSpawnError> {
462    ensure_rustls_provider();
463    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
464        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
465    })?)
466    .clone();
467    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
468    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
469
470    let client = s2n_quic::Client::builder()
471        .with_tls(client_tls)
472        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
473        .with_io("0.0.0.0:0")
474        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
475        .start()
476        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
477
478    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
479    let mut conn = client
480        .connect(connect)
481        .await
482        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
483    let mut stream = conn
484        .open_bidirectional_stream()
485        .await
486        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
487
488    let mut body = request.to_string();
489    body.push('\n');
490    stream
491        .write_all(body.as_bytes())
492        .await
493        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
494    stream
495        .close()
496        .await
497        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
498
499    let mut buf = Vec::new();
500    stream
501        .read_to_end(&mut buf)
502        .await
503        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
504    let line = String::from_utf8_lossy(&buf);
505    parse_control_plane_response(&line)
506}
507
508fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
509    let response: Value = serde_json::from_str(line.trim())
510        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
511    if let Some(error) = response.get("error") {
512        let message = error
513            .get("message")
514            .and_then(Value::as_str)
515            .unwrap_or("unknown control-plane error");
516        return Err(RemoteSpawnError::ControlPlane(message.to_string()));
517    }
518    Ok(response.get("result").cloned().unwrap_or(Value::Null))
519}
520
/// Installs a process-wide default rustls crypto provider, at most once.
///
/// Uses aws-lc-rs when the `aws-lc-rs` feature is enabled. Otherwise it uses
/// ring when the `ring` feature is enabled. With neither feature it does
/// nothing. An error from `install_default` is ignored.
fn ensure_rustls_provider() {
    fn install_default_provider() {
        #[cfg(feature = "aws-lc-rs")]
        {
            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        }
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        {
            let _ = rustls::crypto::ring::default_provider().install_default();
        }
    }

    static PROVIDER_INIT: std::sync::Once = std::sync::Once::new();
    PROVIDER_INIT.call_once(install_default_provider);
}
530
531pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
532    ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
533}
534
535pub(crate) fn validate_config<R: Reactor>(
536    config: &BoundedClusterConfig,
537    registry: &Arc<ActorRegistry<R>>,
538) -> Result<(), ClusterError> {
539    let local_engine_id = config.transport.engine_id();
540    let mut seen_peers = Vec::new();
541    for peer in &config.peers {
542        validate_peer_spec(peer, local_engine_id, &seen_peers)?;
543        seen_peers.push(peer.clone());
544    }
545
546    let mut seen_declared = Vec::new();
547    for declared in &config.declared_actors {
548        validate_declared_actor(
549            &declared.path,
550            &declared.owner,
551            local_engine_id,
552            &config.peers,
553            &seen_declared,
554            registry,
555        )?;
556        seen_declared.push(declared.clone());
557    }
558
559    let mut seen_roles = Vec::new();
560    for role in &config.roles {
561        validate_role(role, &config.peers, &seen_roles)?;
562        seen_roles.push(role.clone());
563    }
564
565    Ok(())
566}
567
568fn validate_peer_spec(
569    peer: &PeerSpec,
570    local_engine_id: &EngineId,
571    existing: &[PeerSpec],
572) -> Result<(), ClusterError> {
573    if &peer.engine_id == local_engine_id {
574        return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
575    }
576    if existing
577        .iter()
578        .any(|existing| existing.engine_id == peer.engine_id)
579    {
580        return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
581    }
582    Ok(())
583}
584
585fn validate_declared_actor<R: Reactor>(
586    path: &ActorPath,
587    owner: &EngineId,
588    local_engine_id: &EngineId,
589    peers: &[PeerSpec],
590    existing: &[DeclaredRemoteActor],
591    registry: &Arc<ActorRegistry<R>>,
592) -> Result<(), ClusterError> {
593    if owner == local_engine_id || registry.get_by_path(path).is_some() {
594        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
595    }
596    if existing.iter().any(|existing| existing.path == *path) {
597        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
598    }
599    if !peers.iter().any(|peer| peer.engine_id == *owner) {
600        return Err(ClusterError::UnknownPeerOwner {
601            path: path.clone(),
602            owner: owner.clone(),
603        });
604    }
605    Ok(())
606}
607
608fn validate_role(
609    role: &BoundedClusterRole,
610    peers: &[PeerSpec],
611    existing: &[BoundedClusterRole],
612) -> Result<(), ClusterError> {
613    if existing.iter().any(|existing| existing.name == role.name) {
614        return Err(ClusterError::DuplicateRole(role.name.clone()));
615    }
616    if !peers.iter().any(|peer| peer.engine_id == role.owner) {
617        return Err(ClusterError::UnknownRoleOwner {
618            role: role.name.clone(),
619            owner: role.owner.clone(),
620        });
621    }
622    Ok(())
623}
624
625#[cfg(test)]
626mod tests {
627    use super::{
628        validate_config, BoundedClusterConfig, BoundedClusterRole, BoundedTransportConfig,
629        ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError, RemoteSpawnSpec,
630    };
631    use crate::{Engine, EngineConfig};
632    use palladium_actor::{
633        Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
634        Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
635    };
636    use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
637    use rcgen::{
638        BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
639        KeyUsagePurpose,
640    };
641    use std::net::{SocketAddr, TcpListener, UdpSocket};
642    use std::sync::Arc;
643    use std::time::{Duration, Instant};
644
645    fn make_ca() -> (Certificate, KeyPair) {
646        let mut params = CertificateParams::default();
647        params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
648        params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
649        let ca_key = KeyPair::generate().expect("ca keypair");
650        let cert = params.self_signed(&ca_key).expect("ca cert");
651        (cert, ca_key)
652    }
653
654    fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
655        let keypair = KeyPair::generate().expect("leaf keypair");
656        let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
657        let mut params = params;
658        params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
659        params.extended_key_usages = vec![
660            ExtendedKeyUsagePurpose::ServerAuth,
661            ExtendedKeyUsagePurpose::ClientAuth,
662        ];
663        let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
664        let cert_der = cert.der().to_vec();
665        let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
666        let ca_der = ca.der().to_vec();
667        TlsConfig {
668            cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
669            private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
670            trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
671        }
672    }
673
674    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
675        TcpTransportConfig {
676            engine_id: EngineId::new(engine_id),
677            listen_addr: "127.0.0.1:0".parse().unwrap(),
678            max_connections_per_peer: 1,
679            idle_timeout: Duration::from_secs(1),
680            send_buffer_size: 8,
681            nodelay: true,
682            tls,
683            send_delay: Duration::from_millis(0),
684        }
685    }
686
687    fn reserve_tcp_addr() -> SocketAddr {
688        let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
689        let addr = listener.local_addr().expect("tcp local addr");
690        drop(listener);
691        addr
692    }
693
694    fn reserve_udp_addr() -> SocketAddr {
695        let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
696        let addr = socket.local_addr().expect("udp local addr");
697        drop(socket);
698        addr
699    }
700
701    #[test]
702    fn bounded_cluster_config_rejects_duplicate_peers() {
703        let (ca, ca_key) = make_ca();
704        let engine = Engine::with_config(EngineConfig {
705            engine_id: EngineId::new("engine-a.example"),
706            ..Default::default()
707        });
708        let config = BoundedClusterConfig {
709            transport: BoundedTransportConfig::Tcp(tcp_config(
710                "engine-a.example",
711                make_tls_config("engine-a.example", &ca, &ca_key),
712            )),
713            peers: vec![
714                PeerSpec {
715                    engine_id: EngineId::new("engine-b.example"),
716                    addr: "127.0.0.1:4100".parse().unwrap(),
717                    control_plane_addr: None,
718                    server_name: None,
719                },
720                PeerSpec {
721                    engine_id: EngineId::new("engine-b.example"),
722                    addr: "127.0.0.1:4200".parse().unwrap(),
723                    control_plane_addr: None,
724                    server_name: None,
725                },
726            ],
727            declared_actors: Vec::new(),
728            roles: Vec::new(),
729        };
730
731        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
732        assert_eq!(
733            err,
734            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
735        );
736    }
737
738    #[test]
739    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
740        let (ca, ca_key) = make_ca();
741        let engine = Engine::with_config(EngineConfig {
742            engine_id: EngineId::new("engine-a.example"),
743            ..Default::default()
744        });
745        let config = BoundedClusterConfig {
746            transport: BoundedTransportConfig::Tcp(tcp_config(
747                "engine-a.example",
748                make_tls_config("engine-a.example", &ca, &ca_key),
749            )),
750            peers: vec![PeerSpec {
751                engine_id: EngineId::new("engine-b.example"),
752                addr: "127.0.0.1:4100".parse().unwrap(),
753                control_plane_addr: None,
754                server_name: None,
755            }],
756            declared_actors: vec![DeclaredRemoteActor {
757                path: ActorPath::parse("/user/missing-owner").unwrap(),
758                owner: EngineId::new("engine-c.example"),
759            }],
760            roles: Vec::new(),
761        };
762
763        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
764        assert_eq!(
765            err,
766            ClusterError::UnknownPeerOwner {
767                path: ActorPath::parse("/user/missing-owner").unwrap(),
768                owner: EngineId::new("engine-c.example"),
769            }
770        );
771    }
772
773    #[test]
774    fn bounded_cluster_config_rejects_duplicate_roles() {
775        let (ca, ca_key) = make_ca();
776        let engine = Engine::with_config(EngineConfig {
777            engine_id: EngineId::new("engine-a.example"),
778            ..Default::default()
779        });
780        let config = BoundedClusterConfig {
781            transport: BoundedTransportConfig::Tcp(tcp_config(
782                "engine-a.example",
783                make_tls_config("engine-a.example", &ca, &ca_key),
784            )),
785            peers: vec![PeerSpec {
786                engine_id: EngineId::new("engine-b.example"),
787                addr: "127.0.0.1:4100".parse().unwrap(),
788                control_plane_addr: None,
789                server_name: None,
790            }],
791            declared_actors: Vec::new(),
792            roles: vec![
793                BoundedClusterRole {
794                    name: "context-service".to_string(),
795                    owner: EngineId::new("engine-b.example"),
796                },
797                BoundedClusterRole {
798                    name: "context-service".to_string(),
799                    owner: EngineId::new("engine-b.example"),
800                },
801            ],
802        };
803
804        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
805        assert_eq!(
806            err,
807            ClusterError::DuplicateRole("context-service".to_string())
808        );
809    }
810
811    #[test]
812    fn bounded_cluster_config_rejects_unknown_role_owner() {
813        let (ca, ca_key) = make_ca();
814        let engine = Engine::with_config(EngineConfig {
815            engine_id: EngineId::new("engine-a.example"),
816            ..Default::default()
817        });
818        let config = BoundedClusterConfig {
819            transport: BoundedTransportConfig::Tcp(tcp_config(
820                "engine-a.example",
821                make_tls_config("engine-a.example", &ca, &ca_key),
822            )),
823            peers: vec![PeerSpec {
824                engine_id: EngineId::new("engine-b.example"),
825                addr: "127.0.0.1:4100".parse().unwrap(),
826                control_plane_addr: None,
827                server_name: None,
828            }],
829            declared_actors: Vec::new(),
830            roles: vec![BoundedClusterRole {
831                name: "gateway".to_string(),
832                owner: EngineId::new("engine-c.example"),
833            }],
834        };
835
836        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
837        assert_eq!(
838            err,
839            ClusterError::UnknownRoleOwner {
840                role: "gateway".to_string(),
841                owner: EngineId::new("engine-c.example"),
842            }
843        );
844    }
845
846    #[test]
847    fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
848        assert_eq!(
849            super::remote_spawn_wait_timeout(Duration::from_millis(25)),
850            Duration::from_secs(5)
851        );
852        assert_eq!(
853            super::remote_spawn_wait_timeout(Duration::from_secs(8)),
854            Duration::from_secs(8)
855        );
856    }
857
858    fn tcp_tests_available() -> bool {
859        if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
860            return false;
861        }
862        std::net::TcpListener::bind("127.0.0.1:0").is_ok()
863    }
864
865    fn quic_tests_available() -> bool {
866        rustls::crypto::CryptoProvider::get_default().is_some()
867    }
868
869    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
870    struct EchoRequest {
871        value: u64,
872    }
873
874    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
875    struct EchoResponse {
876        doubled: u64,
877    }
878
879    impl Message for EchoRequest {
880        type Response = EchoResponse;
881        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
882    }
883
884    struct EchoActor;
885
886    impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
887        fn on_message(
888            &mut self,
889            ctx: &mut ActorContext<R>,
890            envelope: &Envelope,
891            payload: MessagePayload,
892        ) -> Result<(), ActorError> {
893            let req = payload
894                .extract::<EchoRequest>()
895                .map_err(|_| ActorError::Handler)?;
896            let resp = EchoResponse {
897                doubled: req.value * 2,
898            };
899            let resp_env = envelope.response(0);
900            ctx.send_raw(resp_env, MessagePayload::local(resp))
901                .map_err(|_| ActorError::Handler)
902        }
903    }
904
905    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
906    struct CrashRequest;
907
908    impl Message for CrashRequest {
909        type Response = ();
910        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
911    }
912
913    struct SpawnedEchoActor;
914
915    impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
916        fn on_message(
917            &mut self,
918            ctx: &mut ActorContext<R>,
919            envelope: &Envelope,
920            payload: MessagePayload,
921        ) -> Result<(), ActorError> {
922            match envelope.type_tag {
923                EchoRequest::TYPE_TAG => {
924                    let req = payload
925                        .extract::<EchoRequest>()
926                        .map_err(|_| ActorError::Handler)?;
927                    let resp = EchoResponse {
928                        doubled: req.value * 2,
929                    };
930                    let resp_env = envelope.response(0);
931                    ctx.send_raw(resp_env, MessagePayload::local(resp))
932                        .map_err(|_| ActorError::Handler)
933                }
934                CrashRequest::TYPE_TAG => Err(ActorError::Handler),
935                _ => Err(ActorError::Handler),
936            }
937        }
938    }
939
940    fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
941        Arc::new(|type_name: &str, _config: &[u8]| match type_name {
942            "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
943            _ => Err(format!("unknown remote spawn type: {type_name}")),
944        })
945    }
946
    /// End-to-end TCP check.
    ///
    /// Engine A hosts `/user/cluster-echo`. Engine B attaches with A as its
    /// configured peer and waits until it can route that path. It then asks
    /// the actor through `remote_addr_for_path`.
    #[tokio::test]
    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
        // `tcp_tests_available` also returns false when PD_ENABLE_TCP_TESTS is
        // unset, not only when a bind fails, so the message below may not
        // describe the actual reason for skipping.
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
        // Both engines get TLS configs issued by the same test CA.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        // Engine A hosts the echo actor at /user/cluster-echo.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        // Register the ask type with both engines' type registries.
        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // A attaches with no peers; B (below) lists A's listen address as a peer.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // Tell A's transport where B listens; presumably needed so A can
        // reach B as well.
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Each engine's run loop gets its own OS thread until shutdown is signalled.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Poll every 20ms, for up to 5s, until B's cluster can route A's actor
        // path. Then confirm B's transport also routes
        // `AddrHash::new(&actor_path, 0)`.
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded cluster route did not propagate");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(tcp_b.can_route(canonical));

        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 12 })
            .await
            .expect("bounded cluster ask");
        assert_eq!(response, EchoResponse { doubled: 24 });

        // Signal both engine threads to stop. A send error only means the
        // receiver is already gone, so it is ignored.
        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1048
1049    #[tokio::test]
1050    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1051        if !tcp_tests_available() {
1052            eprintln!("skipping tcp integration tests: network bind not permitted");
1053            return;
1054        }
1055
1056        let (ca, ca_key) = make_ca();
1057        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1058        let engine = Engine::with_config(EngineConfig {
1059            engine_id: EngineId::new("engine-a.example"),
1060            ..Default::default()
1061        });
1062        let handle = engine.handle();
1063
1064        let mut cluster = handle
1065            .attach_bounded_cluster(BoundedClusterConfig {
1066                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1067                peers: vec![PeerSpec {
1068                    engine_id: EngineId::new("engine-b.example"),
1069                    addr: "127.0.0.1:4100".parse().unwrap(),
1070                    control_plane_addr: None,
1071                    server_name: None,
1072                }],
1073                declared_actors: Vec::new(),
1074                roles: Vec::new(),
1075            })
1076            .await
1077            .expect("attach bounded cluster");
1078
1079        let path = ActorPath::parse("/user/declared-remote").unwrap();
1080        cluster
1081            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1082            .expect("declare remote actor");
1083
1084        assert!(cluster.can_route(&path));
1085        assert!(cluster
1086            .transport()
1087            .as_tcp()
1088            .expect("tcp transport")
1089            .can_route(AddrHash::new(&path, 0)));
1090    }
1091
    /// Remote spawn over TCP. Engine A asks engine B's control plane to spawn
    /// `bounded-echo` at `/user/remote-spawn-tcp`, then checks that:
    /// - the returned typed handle answers asks;
    /// - a duplicate spawn is rejected;
    /// - the handle answers again after the hosted actor is stopped;
    /// - a target engine outside A's peer set is rejected with `UnknownPeer`.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        // Local address for B's control-plane listener, chosen up front
        // (presumably a free port picked by the helper — TODO confirm).
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // Only B gets a TLS control plane and a spawn factory, so B is the spawn host.
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // B is registered with A twice:
        // - on A's transport directly;
        // - on A's cluster, together with B's control-plane address, which
        //   spawn_remote presumably uses to reach B.
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let mut cluster_a = cluster_a;
        cluster_a
            .add_peer(PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: tcp_b.local_addr(),
                control_plane_addr: Some(control_plane_b),
                server_name: None,
            })
            .expect("add peer with control plane");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        // NOTE(review): fixed 150ms grace period for the engine threads to start.
        // It is timing-based and could be flaky on a loaded machine.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let remote = cluster_a
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-b.example"),
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor");

        let response = remote
            .ask(EchoRequest { value: 21 })
            .await
            .expect("bounded remote spawn ask");
        assert_eq!(response, EchoResponse { doubled: 42 });

        // Spawning again at the same path must surface B's control-plane
        // "actor already running" error.
        let duplicate = cluster_a
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-b.example"),
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("duplicate remote spawn should fail");
        assert!(
            matches!(duplicate, RemoteSpawnError::ControlPlane(ref message) if message.contains("actor already running")),
            "unexpected duplicate error: {duplicate:?}"
        );

        // Stop the hosted actor directly on B: bump its restart count, then
        // send it a Killed stop signal. NOTE(review): the restart-count bump
        // presumably mirrors supervisor bookkeeping — confirm intent.
        let slot = handle_b
            .registry
            .get_by_path(&actor_path)
            .expect("spawned actor registered on host");
        handle_b.registry.increment_restart_count(slot.addr);
        slot.ctrl_tx
            .send(crate::common::LifecycleSignal::Stop(
                palladium_actor::StopReason::Killed,
            ))
            .ok();

        // Retry the same handle every 25ms for up to 5s until it answers again.
        let restart_deadline = Instant::now() + Duration::from_secs(5);
        loop {
            match remote.ask(EchoRequest { value: 7 }).await {
                Ok(response) => {
                    assert_eq!(response, EchoResponse { doubled: 14 });
                    break;
                }
                Err(_) if Instant::now() <= restart_deadline => {
                    tokio::time::sleep(Duration::from_millis(25)).await;
                }
                Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
            }
        }

        // engine-c is not among A's peers, so the spawn is rejected as UnknownPeer.
        let unknown = cluster_a
            .spawn_remote::<EchoRequest>(
                EngineId::new("engine-c.example"),
                RemoteSpawnSpec {
                    path: ActorPath::parse("/user/unknown-target").unwrap(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("unknown target should fail");
        assert_eq!(
            unknown,
            RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
        );

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1257
    /// Remote spawn by role over TCP.
    ///
    /// Engine A maps the role `context-service` to engine B and spawns through
    /// that role. It checks that the returned handle answers asks, and that an
    /// unmapped role yields `UnknownRole`.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // B hosts spawns: it exposes a TLS control plane and the spawn factory.
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // B attaches first so A's config can reference B's listen address.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // A knows B as a peer (including its control plane) and owner of the
        // "context-service" role.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        // B learns A's transport address after attach (no control plane for A).
        let mut cluster_b = cluster_b;
        cluster_b
            .add_peer(PeerSpec {
                engine_id: EngineId::new("engine-a.example"),
                addr: tcp_a.local_addr(),
                control_plane_addr: None,
                server_name: None,
            })
            .expect("add peer a to cluster b");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        // NOTE(review): fixed startup grace period; timing-based.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let remote = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor by role over tcp");

        let response = remote
            .ask(EchoRequest { value: 11 })
            .await
            .expect("bounded remote spawn ask over tcp by role");
        assert_eq!(response, EchoResponse { doubled: 22 });

        // A role absent from A's config is rejected with UnknownRole.
        let unknown_role = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "missing-role",
                RemoteSpawnSpec {
                    path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("unknown role should fail");
        assert_eq!(
            unknown_role,
            RemoteSpawnError::UnknownRole("missing-role".to_string())
        );

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1384
    /// Role-based remote spawn over TCP where the host engine B is never told
    /// about the caller A as a peer.
    ///
    /// The spawn and the follow-up ask must still succeed. Unlike the sibling
    /// role test, there is no startup sleep before spawning.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // B hosts spawns: it exposes a TLS control plane and the spawn factory.
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // B has no peers at all, and none are added later.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // The caller already knows the static owner. Remote spawn should not
        // require the host engine to predeclare a reverse peer just to install
        // the caller's canonical path route. So A's transport address is
        // deliberately never handed to cluster B. `tcp_a` exists only to
        // assert that A uses TCP; it is discarded here to silence the unused
        // binding.
        let _ = tcp_a;

        let remote = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor by role over tcp without reverse declaration");

        let response = remote
            .ask(EchoRequest { value: 13 })
            .await
            .expect("bounded remote spawn ask over tcp without reverse declaration");
        assert_eq!(response, EchoResponse { doubled: 26 });

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1489
1490    fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
1491        QuicTransportConfig {
1492            engine_id: EngineId::new(engine_id),
1493            listen_addr: "127.0.0.1:0".parse().unwrap(),
1494            send_buffer_size: 1024,
1495            idle_timeout: Duration::from_secs(2),
1496            tls,
1497        }
1498    }
1499
1500    #[tokio::test]
1501    async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
1502        if !quic_tests_available() {
1503            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1504            return;
1505        }
1506
1507        let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
1508        let (ca, ca_key) = make_ca();
1509        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1510        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1511
1512        let mut engine_a = Engine::with_config(EngineConfig {
1513            engine_id: EngineId::new("engine-a.example"),
1514            ..Default::default()
1515        });
1516        let engine_b = Engine::with_config(EngineConfig {
1517            engine_id: EngineId::new("engine-b.example"),
1518            ..Default::default()
1519        });
1520
1521        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1522        engine_a.add_user_actor(ChildSpec::new(
1523            "cluster-echo-quic",
1524            RestartPolicy::Permanent,
1525            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1526            ns,
1527            move || Box::new(EchoActor),
1528        ));
1529
1530        let handle_a = engine_a.handle();
1531        let handle_b = engine_b.handle();
1532        handle_a
1533            .type_registry()
1534            .register_remote_ask::<EchoRequest>();
1535        handle_b
1536            .type_registry()
1537            .register_remote_ask::<EchoRequest>();
1538
1539        let cluster_a = match handle_a
1540            .attach_bounded_cluster(BoundedClusterConfig {
1541                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1542                peers: Vec::new(),
1543                declared_actors: Vec::new(),
1544                roles: vec![BoundedClusterRole {
1545                    name: "context-service".to_string(),
1546                    owner: EngineId::new("engine-b.example"),
1547                }],
1548            })
1549            .await
1550        {
1551            Ok(cluster) => cluster,
1552            Err(ClusterError::Transport(err)) => {
1553                eprintln!("skipping quic bounded-cluster test: {err:?}");
1554                return;
1555            }
1556            Err(err) => panic!("attach bounded cluster a: {err:?}"),
1557        };
1558        let quic_a = cluster_a
1559            .transport()
1560            .as_quic()
1561            .cloned()
1562            .expect("quic transport a");
1563
1564        let cluster_b = match handle_b
1565            .attach_bounded_cluster(BoundedClusterConfig {
1566                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1567                peers: vec![PeerSpec {
1568                    engine_id: EngineId::new("engine-a.example"),
1569                    addr: quic_a.local_addr(),
1570                    control_plane_addr: None,
1571                    server_name: None,
1572                }],
1573                declared_actors: Vec::new(),
1574                roles: Vec::new(),
1575            })
1576            .await
1577        {
1578            Ok(cluster) => cluster,
1579            Err(ClusterError::Transport(err)) => {
1580                eprintln!("skipping quic bounded-cluster test: {err:?}");
1581                return;
1582            }
1583            Err(err) => panic!("attach bounded cluster b: {err:?}"),
1584        };
1585        let quic_b = cluster_b
1586            .transport()
1587            .as_quic()
1588            .cloned()
1589            .expect("quic transport b");
1590
1591        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1592        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1593        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1594
1595        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1596        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1597
1598        let canonical = AddrHash::new(&actor_path, 0);
1599        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1600        while !cluster_b.can_route(&actor_path) {
1601            if std::time::Instant::now() > deadline {
1602                panic!("bounded quic cluster route did not propagate");
1603            }
1604            tokio::time::sleep(Duration::from_millis(20)).await;
1605        }
1606        assert!(quic_b.can_route(canonical));
1607
1608        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1609        let response = remote
1610            .ask(EchoRequest { value: 15 })
1611            .await
1612            .expect("bounded cluster ask over quic");
1613        assert_eq!(response, EchoResponse { doubled: 30 });
1614
1615        shutdown_a_tx.send(()).ok();
1616        shutdown_b_tx.send(()).ok();
1617    }
1618
1619    #[tokio::test]
1620    async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
1621        if !quic_tests_available() {
1622            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1623            return;
1624        }
1625
1626        let (ca, ca_key) = make_ca();
1627        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1628        let engine = Engine::with_config(EngineConfig {
1629            engine_id: EngineId::new("engine-a.example"),
1630            ..Default::default()
1631        });
1632        let handle = engine.handle();
1633
1634        let mut cluster = match handle
1635            .attach_bounded_cluster(BoundedClusterConfig {
1636                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
1637                peers: vec![PeerSpec {
1638                    engine_id: EngineId::new("engine-b.example"),
1639                    addr: "127.0.0.1:4100".parse().unwrap(),
1640                    control_plane_addr: None,
1641                    server_name: None,
1642                }],
1643                declared_actors: Vec::new(),
1644                roles: Vec::new(),
1645            })
1646            .await
1647        {
1648            Ok(cluster) => cluster,
1649            Err(ClusterError::Transport(err)) => {
1650                eprintln!("skipping quic bounded-cluster test: {err:?}");
1651                return;
1652            }
1653            Err(err) => panic!("attach bounded cluster: {err:?}"),
1654        };
1655
1656        let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
1657        cluster
1658            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1659            .expect("declare remote actor");
1660
1661        assert!(cluster.can_route(&path));
1662        assert!(cluster
1663            .transport()
1664            .as_quic()
1665            .expect("quic transport")
1666            .can_route(AddrHash::new(&path, 0)));
1667    }
1668
1669    #[tokio::test]
1670    async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
1671        if !quic_tests_available() {
1672            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1673            return;
1674        }
1675
1676        let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
1677        let (ca, ca_key) = make_ca();
1678        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1679        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1680        let control_plane_b = reserve_udp_addr();
1681
1682        let engine_a = Engine::with_config(EngineConfig {
1683            engine_id: EngineId::new("engine-a.example"),
1684            ..Default::default()
1685        });
1686        let engine_b = Engine::with_config(EngineConfig {
1687            engine_id: EngineId::new("engine-b.example"),
1688            control_plane_quic_addr: Some(control_plane_b.to_string()),
1689            control_plane_tls: Some(tls_b.clone()),
1690            actor_spawn: Some(bounded_spawn_handler()),
1691            ..Default::default()
1692        });
1693
1694        let handle_a = engine_a.handle();
1695        let handle_b = engine_b.handle();
1696        handle_a
1697            .type_registry()
1698            .register_remote_ask::<EchoRequest>();
1699        handle_b
1700            .type_registry()
1701            .register_remote_ask::<EchoRequest>();
1702
1703        let cluster_a = match handle_a
1704            .attach_bounded_cluster(BoundedClusterConfig {
1705                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1706                peers: Vec::new(),
1707                declared_actors: Vec::new(),
1708                roles: Vec::new(),
1709            })
1710            .await
1711        {
1712            Ok(cluster) => cluster,
1713            Err(ClusterError::Transport(err)) => {
1714                eprintln!("skipping quic bounded-cluster test: {err:?}");
1715                return;
1716            }
1717            Err(err) => panic!("attach bounded cluster a: {err:?}"),
1718        };
1719        let quic_a = cluster_a
1720            .transport()
1721            .as_quic()
1722            .cloned()
1723            .expect("quic transport a");
1724
1725        let cluster_b = match handle_b
1726            .attach_bounded_cluster(BoundedClusterConfig {
1727                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1728                peers: vec![PeerSpec {
1729                    engine_id: EngineId::new("engine-a.example"),
1730                    addr: quic_a.local_addr(),
1731                    control_plane_addr: None,
1732                    server_name: None,
1733                }],
1734                declared_actors: Vec::new(),
1735                roles: Vec::new(),
1736            })
1737            .await
1738        {
1739            Ok(cluster) => cluster,
1740            Err(ClusterError::Transport(err)) => {
1741                eprintln!("skipping quic bounded-cluster test: {err:?}");
1742                return;
1743            }
1744            Err(err) => panic!("attach bounded cluster b: {err:?}"),
1745        };
1746        let quic_b = cluster_b
1747            .transport()
1748            .as_quic()
1749            .cloned()
1750            .expect("quic transport b");
1751
1752        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1753        let mut cluster_a = cluster_a;
1754        cluster_a
1755            .add_peer(PeerSpec {
1756                engine_id: EngineId::new("engine-b.example"),
1757                addr: quic_b.local_addr(),
1758                control_plane_addr: Some(control_plane_b),
1759                server_name: None,
1760            })
1761            .expect("add quic peer with control plane");
1762
1763        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1764        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1765
1766        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1767        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1768        tokio::time::sleep(Duration::from_millis(150)).await;
1769
1770        let remote = cluster_a
1771            .spawn_remote_on_role::<EchoRequest>(
1772                "context-service",
1773                RemoteSpawnSpec {
1774                    path: actor_path.clone(),
1775                    type_name: "bounded-echo".to_string(),
1776                    config: None,
1777                },
1778            )
1779            .await
1780            .expect("spawn remote actor over quic by role");
1781
1782        let response = remote
1783            .ask(EchoRequest { value: 9 })
1784            .await
1785            .expect("bounded remote spawn ask over quic by role");
1786        assert_eq!(response, EchoResponse { doubled: 18 });
1787
1788        let unknown_role = cluster_a
1789            .spawn_remote_on_role::<EchoRequest>(
1790                "missing-role",
1791                RemoteSpawnSpec {
1792                    path: ActorPath::parse("/user/missing-role").unwrap(),
1793                    type_name: "bounded-echo".to_string(),
1794                    config: None,
1795                },
1796            )
1797            .await
1798            .expect_err("unknown role should fail");
1799        assert_eq!(
1800            unknown_role,
1801            RemoteSpawnError::UnknownRole("missing-role".to_string())
1802        );
1803
1804        shutdown_a_tx.send(()).ok();
1805        shutdown_b_tx.send(()).ok();
1806    }
1807}