Skip to main content

palladium_runtime/
bounded_cluster.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
10use palladium_transport::network::{Network, TokioNetwork};
11use palladium_transport::{
12    QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport, TransportError,
13};
14use rustls::pki_types::ServerName;
15use serde_json::{json, Value};
16use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
17use tokio_rustls::TlsConnector;
18
19#[derive(Clone)]
20pub struct BoundedClusterConfig {
21    pub transport: BoundedTransportConfig,
22    pub peers: Vec<PeerSpec>,
23    pub declared_actors: Vec<DeclaredRemoteActor>,
24    pub roles: Vec<BoundedClusterRole>,
25}
26
27#[derive(Clone)]
28pub enum BoundedTransportConfig {
29    Tcp(TcpTransportConfig),
30    Quic(QuicTransportConfig),
31}
32
33impl BoundedTransportConfig {
34    pub fn engine_id(&self) -> &EngineId {
35        match self {
36            Self::Tcp(config) => &config.engine_id,
37            Self::Quic(config) => &config.engine_id,
38        }
39    }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct PeerSpec {
44    pub engine_id: EngineId,
45    pub addr: SocketAddr,
46    pub control_plane_addr: Option<SocketAddr>,
47    pub server_name: Option<String>,
48}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub struct DeclaredRemoteActor {
52    pub path: ActorPath,
53    pub owner: EngineId,
54}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct BoundedClusterRole {
58    pub name: String,
59    pub owner: EngineId,
60}
61
62#[derive(Clone, Debug, PartialEq, Eq)]
63pub enum ClusterError {
64    DuplicatePeer(EngineId),
65    LocalPeerDeclared(EngineId),
66    DuplicateDeclaredActor(ActorPath),
67    LocalActorDeclaredRemote(ActorPath),
68    UnknownPeerOwner { path: ActorPath, owner: EngineId },
69    DuplicateRole(String),
70    UnknownRoleOwner { role: String, owner: EngineId },
71    Transport(TransportError),
72}
73
74#[derive(Clone, Debug, PartialEq)]
75pub struct RemoteSpawnSpec {
76    pub path: ActorPath,
77    pub type_name: String,
78    pub config: Option<Value>,
79}
80
81#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum RemoteSpawnError {
83    UnknownPeer(EngineId),
84    UnknownRole(String),
85    MissingControlPlaneEndpoint(EngineId),
86    InvalidServerName(String),
87    ControlPlane(String),
88    SpawnTimedOut(ActorPath),
89}
90
/// Wire protocol used to reach a peer's control plane.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    /// Newline-terminated JSON request/response over TLS on TCP.
    Tcp,
    /// Newline-terminated JSON request on a QUIC bidirectional stream (ALPN `pd-control`).
    Quic,
}
96
97#[derive(Clone)]
98pub(crate) struct ControlPlaneClientConfig {
99    pub(crate) protocol: ControlPlaneProtocol,
100    pub(crate) tls: palladium_transport::TlsConfig,
101    pub(crate) wait_timeout: Duration,
102}
103
104impl From<TransportError> for ClusterError {
105    fn from(value: TransportError) -> Self {
106        Self::Transport(value)
107    }
108}
109
110#[derive(Clone)]
111pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
112    Tcp(Arc<TcpTransport<R, N>>),
113    Quic(Arc<QuicTransport<R, N>>),
114}
115
116impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
117    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
118        match self {
119            Self::Tcp(transport) => Some(transport),
120            Self::Quic(_) => None,
121        }
122    }
123
124    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
125        match self {
126            Self::Tcp(_) => None,
127            Self::Quic(transport) => Some(transport),
128        }
129    }
130
131    pub fn add_peer(&self, peer: &PeerSpec) {
132        match self {
133            Self::Tcp(transport) => {
134                if let Some(server_name) = &peer.server_name {
135                    transport.add_peer_with_server_name(
136                        peer.engine_id.clone(),
137                        peer.addr,
138                        server_name.clone(),
139                    );
140                } else {
141                    transport.add_peer(peer.engine_id.clone(), peer.addr);
142                }
143            }
144            Self::Quic(transport) => {
145                if let Some(server_name) = &peer.server_name {
146                    transport.add_peer_with_server_name(
147                        peer.engine_id.clone(),
148                        peer.addr,
149                        server_name.clone(),
150                    );
151                } else {
152                    transport.add_peer(peer.engine_id.clone(), peer.addr);
153                }
154            }
155        }
156    }
157
158    pub fn declare_remote_path(
159        &self,
160        owner: EngineId,
161        path: &ActorPath,
162    ) -> Result<(), TransportError> {
163        match self {
164            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
165            Self::Quic(transport) => transport.declare_remote_path(owner, path),
166        }
167    }
168
169    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
170        match self {
171            Self::Tcp(transport) => transport.undeclare_remote_path(path),
172            Self::Quic(transport) => transport.undeclare_remote_path(path),
173        }
174    }
175
176    pub fn can_route(&self, destination: AddrHash) -> bool {
177        match self {
178            Self::Tcp(transport) => transport.can_route(destination),
179            Self::Quic(transport) => transport.can_route(destination),
180        }
181    }
182}
183
184#[derive(Clone)]
185pub struct BoundedClusterHandle<
186    R: Reactor = TokioReactor,
187    N: Network = TokioNetwork,
188    F: FileSystem = TokioFileSystem,
189> {
190    engine: EngineHandle<R, N, F>,
191    local_engine_id: EngineId,
192    control_plane: ControlPlaneClientConfig,
193    transport: BoundedTransportHandle<R, N>,
194    peers: Vec<PeerSpec>,
195    declared_actors: Vec<DeclaredRemoteActor>,
196    roles: Vec<BoundedClusterRole>,
197    registry: Arc<ActorRegistry<R>>,
198}
199
200impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
201    pub(crate) fn new(
202        engine: EngineHandle<R, N, F>,
203        local_engine_id: EngineId,
204        control_plane: ControlPlaneClientConfig,
205        transport: BoundedTransportHandle<R, N>,
206        peers: Vec<PeerSpec>,
207        roles: Vec<BoundedClusterRole>,
208        registry: Arc<ActorRegistry<R>>,
209    ) -> Self {
210        Self {
211            engine,
212            local_engine_id,
213            control_plane,
214            transport,
215            peers,
216            declared_actors: Vec::new(),
217            roles,
218            registry,
219        }
220    }
221
222    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
223        &self.transport
224    }
225
226    pub fn peers(&self) -> &[PeerSpec] {
227        &self.peers
228    }
229
230    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
231        &self.declared_actors
232    }
233
234    pub fn roles(&self) -> &[BoundedClusterRole] {
235        &self.roles
236    }
237
238    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
239        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
240        self.transport.add_peer(&peer);
241        self.peers.push(peer);
242        Ok(())
243    }
244
245    pub fn declare_remote_actor(
246        &mut self,
247        owner: EngineId,
248        path: ActorPath,
249    ) -> Result<(), ClusterError> {
250        validate_declared_actor(
251            &path,
252            &owner,
253            &self.local_engine_id,
254            &self.peers,
255            &self.declared_actors,
256            &self.registry,
257        )?;
258        self.transport.declare_remote_path(owner.clone(), &path)?;
259        self.declared_actors
260            .push(DeclaredRemoteActor { path, owner });
261        Ok(())
262    }
263
264    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
265        self.transport.undeclare_remote_path(path)?;
266        self.declared_actors
267            .retain(|declared| &declared.path != path);
268        Ok(())
269    }
270
271    pub fn can_route(&self, path: &ActorPath) -> bool {
272        self.transport.can_route(AddrHash::new(path, 0))
273    }
274
275    pub async fn spawn_remote_on_role<M>(
276        &self,
277        role: &str,
278        spec: RemoteSpawnSpec,
279    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
280    where
281        M: RemoteMessage,
282        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
283        R: Send + Sync,
284    {
285        let owner = self
286            .roles
287            .iter()
288            .find(|declared| declared.name == role)
289            .map(|declared| declared.owner.clone())
290            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
291        self.spawn_remote::<M>(owner, spec).await
292    }
293
294    pub async fn spawn_remote<M>(
295        &self,
296        target: EngineId,
297        spec: RemoteSpawnSpec,
298    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
299    where
300        M: RemoteMessage,
301        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
302        R: Send + Sync,
303    {
304        let peer = self
305            .peers
306            .iter()
307            .find(|peer| peer.engine_id == target)
308            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
309        let control_plane_addr = peer
310            .control_plane_addr
311            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
312        let server_name = peer
313            .server_name
314            .clone()
315            .unwrap_or_else(|| target.as_str().to_string());
316
317        call_actor_spawn(&self.control_plane, control_plane_addr, &server_name, &spec).await?;
318
319        let deadline = Instant::now() + self.control_plane.wait_timeout;
320        while Instant::now() <= deadline {
321            if self.can_route(&spec.path) {
322                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
323            }
324            tokio::time::sleep(Duration::from_millis(20)).await;
325        }
326
327        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
328    }
329}
330
331async fn call_actor_spawn(
332    client: &ControlPlaneClientConfig,
333    addr: SocketAddr,
334    server_name: &str,
335    spec: &RemoteSpawnSpec,
336) -> Result<(), RemoteSpawnError> {
337    let params = match &spec.config {
338        Some(config) => json!({
339            "path": spec.path.as_str(),
340            "type_name": spec.type_name,
341            "config": config,
342        }),
343        None => json!({
344            "path": spec.path.as_str(),
345            "type_name": spec.type_name,
346        }),
347    };
348    let request = json!({
349        "id": 1,
350        "method": "actor.spawn",
351        "params": params,
352    });
353
354    match client.protocol {
355        ControlPlaneProtocol::Tcp => {
356            call_control_plane_tcp(addr, server_name, &client.tls, &request).await?
357        }
358        ControlPlaneProtocol::Quic => {
359            call_control_plane_quic(addr, server_name, &client.tls, &request).await?
360        }
361    };
362    Ok(())
363}
364
365async fn call_control_plane_tcp(
366    addr: SocketAddr,
367    server_name: &str,
368    tls: &palladium_transport::TlsConfig,
369    request: &Value,
370) -> Result<Value, RemoteSpawnError> {
371    let config = tls
372        .client_config(server_name)
373        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
374    let name = ServerName::try_from(server_name.to_string())
375        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
376    let stream = tokio::net::TcpStream::connect(addr)
377        .await
378        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
379    let connector = TlsConnector::from(config);
380    let tls_stream = connector
381        .connect(name, stream)
382        .await
383        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
384
385    let (reader, mut writer) = tokio::io::split(tls_stream);
386    let mut body = request.to_string();
387    body.push('\n');
388    writer
389        .write_all(body.as_bytes())
390        .await
391        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
392    let mut line = String::new();
393    let mut reader = BufReader::new(reader);
394    reader
395        .read_line(&mut line)
396        .await
397        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
398    parse_control_plane_response(&line)
399}
400
401async fn call_control_plane_quic(
402    addr: SocketAddr,
403    server_name: &str,
404    tls: &palladium_transport::TlsConfig,
405    request: &Value,
406) -> Result<Value, RemoteSpawnError> {
407    ensure_rustls_provider();
408    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
409        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
410    })?)
411    .clone();
412    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
413    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
414
415    let client = s2n_quic::Client::builder()
416        .with_tls(client_tls)
417        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
418        .with_io("0.0.0.0:0")
419        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
420        .start()
421        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
422
423    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
424    let mut conn = client
425        .connect(connect)
426        .await
427        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
428    let mut stream = conn
429        .open_bidirectional_stream()
430        .await
431        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
432
433    let mut body = request.to_string();
434    body.push('\n');
435    stream
436        .write_all(body.as_bytes())
437        .await
438        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
439    stream
440        .close()
441        .await
442        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
443
444    let mut buf = Vec::new();
445    stream
446        .read_to_end(&mut buf)
447        .await
448        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
449    let line = String::from_utf8_lossy(&buf);
450    parse_control_plane_response(&line)
451}
452
453fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
454    let response: Value = serde_json::from_str(line.trim())
455        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
456    if let Some(error) = response.get("error") {
457        let message = error
458            .get("message")
459            .and_then(Value::as_str)
460            .unwrap_or("unknown control-plane error");
461        return Err(RemoteSpawnError::ControlPlane(message.to_string()));
462    }
463    Ok(response.get("result").cloned().unwrap_or(Value::Null))
464}
465
/// Installs the compiled-in rustls crypto provider as the process default,
/// at most once per process.
///
/// `aws-lc-rs` wins when that feature is enabled; otherwise `ring` is used if
/// enabled. With neither feature enabled this does nothing.
fn ensure_rustls_provider() {
    /// Attempts the installation; any error from `install_default` is
    /// deliberately ignored.
    fn install_compiled_provider() {
        #[cfg(feature = "aws-lc-rs")]
        {
            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        }
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        {
            let _ = rustls::crypto::ring::default_provider().install_default();
        }
    }

    static PROVIDER_INIT: std::sync::Once = std::sync::Once::new();
    PROVIDER_INIT.call_once(install_compiled_provider);
}
475
476pub(crate) fn validate_config<R: Reactor>(
477    config: &BoundedClusterConfig,
478    registry: &Arc<ActorRegistry<R>>,
479) -> Result<(), ClusterError> {
480    let local_engine_id = config.transport.engine_id();
481    let mut seen_peers = Vec::new();
482    for peer in &config.peers {
483        validate_peer_spec(peer, local_engine_id, &seen_peers)?;
484        seen_peers.push(peer.clone());
485    }
486
487    let mut seen_declared = Vec::new();
488    for declared in &config.declared_actors {
489        validate_declared_actor(
490            &declared.path,
491            &declared.owner,
492            local_engine_id,
493            &config.peers,
494            &seen_declared,
495            registry,
496        )?;
497        seen_declared.push(declared.clone());
498    }
499
500    let mut seen_roles = Vec::new();
501    for role in &config.roles {
502        validate_role(role, &config.peers, &seen_roles)?;
503        seen_roles.push(role.clone());
504    }
505
506    Ok(())
507}
508
509fn validate_peer_spec(
510    peer: &PeerSpec,
511    local_engine_id: &EngineId,
512    existing: &[PeerSpec],
513) -> Result<(), ClusterError> {
514    if &peer.engine_id == local_engine_id {
515        return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
516    }
517    if existing
518        .iter()
519        .any(|existing| existing.engine_id == peer.engine_id)
520    {
521        return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
522    }
523    Ok(())
524}
525
526fn validate_declared_actor<R: Reactor>(
527    path: &ActorPath,
528    owner: &EngineId,
529    local_engine_id: &EngineId,
530    peers: &[PeerSpec],
531    existing: &[DeclaredRemoteActor],
532    registry: &Arc<ActorRegistry<R>>,
533) -> Result<(), ClusterError> {
534    if owner == local_engine_id || registry.get_by_path(path).is_some() {
535        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
536    }
537    if existing.iter().any(|existing| existing.path == *path) {
538        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
539    }
540    if !peers.iter().any(|peer| peer.engine_id == *owner) {
541        return Err(ClusterError::UnknownPeerOwner {
542            path: path.clone(),
543            owner: owner.clone(),
544        });
545    }
546    Ok(())
547}
548
549fn validate_role(
550    role: &BoundedClusterRole,
551    peers: &[PeerSpec],
552    existing: &[BoundedClusterRole],
553) -> Result<(), ClusterError> {
554    if existing.iter().any(|existing| existing.name == role.name) {
555        return Err(ClusterError::DuplicateRole(role.name.clone()));
556    }
557    if !peers.iter().any(|peer| peer.engine_id == role.owner) {
558        return Err(ClusterError::UnknownRoleOwner {
559            role: role.name.clone(),
560            owner: role.owner.clone(),
561        });
562    }
563    Ok(())
564}
565
566#[cfg(test)]
567mod tests {
568    use super::{
569        validate_config, BoundedClusterConfig, BoundedClusterRole, BoundedTransportConfig,
570        ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError, RemoteSpawnSpec,
571    };
572    use crate::{Engine, EngineConfig};
573    use palladium_actor::{
574        Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
575        Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
576    };
577    use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
578    use rcgen::{
579        BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
580        KeyUsagePurpose,
581    };
582    use std::net::{SocketAddr, TcpListener, UdpSocket};
583    use std::sync::Arc;
584    use std::time::{Duration, Instant};
585
586    fn make_ca() -> (Certificate, KeyPair) {
587        let mut params = CertificateParams::default();
588        params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
589        params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
590        let ca_key = KeyPair::generate().expect("ca keypair");
591        let cert = params.self_signed(&ca_key).expect("ca cert");
592        (cert, ca_key)
593    }
594
595    fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
596        let keypair = KeyPair::generate().expect("leaf keypair");
597        let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
598        let mut params = params;
599        params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
600        params.extended_key_usages = vec![
601            ExtendedKeyUsagePurpose::ServerAuth,
602            ExtendedKeyUsagePurpose::ClientAuth,
603        ];
604        let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
605        let cert_der = cert.der().to_vec();
606        let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
607        let ca_der = ca.der().to_vec();
608        TlsConfig {
609            cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
610            private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
611            trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
612        }
613    }
614
615    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
616        TcpTransportConfig {
617            engine_id: EngineId::new(engine_id),
618            listen_addr: "127.0.0.1:0".parse().unwrap(),
619            max_connections_per_peer: 1,
620            idle_timeout: Duration::from_secs(1),
621            send_buffer_size: 8,
622            nodelay: true,
623            tls,
624            send_delay: Duration::from_millis(0),
625        }
626    }
627
628    fn reserve_tcp_addr() -> SocketAddr {
629        let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
630        let addr = listener.local_addr().expect("tcp local addr");
631        drop(listener);
632        addr
633    }
634
635    fn reserve_udp_addr() -> SocketAddr {
636        let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
637        let addr = socket.local_addr().expect("udp local addr");
638        drop(socket);
639        addr
640    }
641
642    #[test]
643    fn bounded_cluster_config_rejects_duplicate_peers() {
644        let (ca, ca_key) = make_ca();
645        let engine = Engine::with_config(EngineConfig {
646            engine_id: EngineId::new("engine-a.example"),
647            ..Default::default()
648        });
649        let config = BoundedClusterConfig {
650            transport: BoundedTransportConfig::Tcp(tcp_config(
651                "engine-a.example",
652                make_tls_config("engine-a.example", &ca, &ca_key),
653            )),
654            peers: vec![
655                PeerSpec {
656                    engine_id: EngineId::new("engine-b.example"),
657                    addr: "127.0.0.1:4100".parse().unwrap(),
658                    control_plane_addr: None,
659                    server_name: None,
660                },
661                PeerSpec {
662                    engine_id: EngineId::new("engine-b.example"),
663                    addr: "127.0.0.1:4200".parse().unwrap(),
664                    control_plane_addr: None,
665                    server_name: None,
666                },
667            ],
668            declared_actors: Vec::new(),
669            roles: Vec::new(),
670        };
671
672        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
673        assert_eq!(
674            err,
675            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
676        );
677    }
678
679    #[test]
680    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
681        let (ca, ca_key) = make_ca();
682        let engine = Engine::with_config(EngineConfig {
683            engine_id: EngineId::new("engine-a.example"),
684            ..Default::default()
685        });
686        let config = BoundedClusterConfig {
687            transport: BoundedTransportConfig::Tcp(tcp_config(
688                "engine-a.example",
689                make_tls_config("engine-a.example", &ca, &ca_key),
690            )),
691            peers: vec![PeerSpec {
692                engine_id: EngineId::new("engine-b.example"),
693                addr: "127.0.0.1:4100".parse().unwrap(),
694                control_plane_addr: None,
695                server_name: None,
696            }],
697            declared_actors: vec![DeclaredRemoteActor {
698                path: ActorPath::parse("/user/missing-owner").unwrap(),
699                owner: EngineId::new("engine-c.example"),
700            }],
701            roles: Vec::new(),
702        };
703
704        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
705        assert_eq!(
706            err,
707            ClusterError::UnknownPeerOwner {
708                path: ActorPath::parse("/user/missing-owner").unwrap(),
709                owner: EngineId::new("engine-c.example"),
710            }
711        );
712    }
713
714    #[test]
715    fn bounded_cluster_config_rejects_duplicate_roles() {
716        let (ca, ca_key) = make_ca();
717        let engine = Engine::with_config(EngineConfig {
718            engine_id: EngineId::new("engine-a.example"),
719            ..Default::default()
720        });
721        let config = BoundedClusterConfig {
722            transport: BoundedTransportConfig::Tcp(tcp_config(
723                "engine-a.example",
724                make_tls_config("engine-a.example", &ca, &ca_key),
725            )),
726            peers: vec![PeerSpec {
727                engine_id: EngineId::new("engine-b.example"),
728                addr: "127.0.0.1:4100".parse().unwrap(),
729                control_plane_addr: None,
730                server_name: None,
731            }],
732            declared_actors: Vec::new(),
733            roles: vec![
734                BoundedClusterRole {
735                    name: "context-service".to_string(),
736                    owner: EngineId::new("engine-b.example"),
737                },
738                BoundedClusterRole {
739                    name: "context-service".to_string(),
740                    owner: EngineId::new("engine-b.example"),
741                },
742            ],
743        };
744
745        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
746        assert_eq!(
747            err,
748            ClusterError::DuplicateRole("context-service".to_string())
749        );
750    }
751
752    #[test]
753    fn bounded_cluster_config_rejects_unknown_role_owner() {
754        let (ca, ca_key) = make_ca();
755        let engine = Engine::with_config(EngineConfig {
756            engine_id: EngineId::new("engine-a.example"),
757            ..Default::default()
758        });
759        let config = BoundedClusterConfig {
760            transport: BoundedTransportConfig::Tcp(tcp_config(
761                "engine-a.example",
762                make_tls_config("engine-a.example", &ca, &ca_key),
763            )),
764            peers: vec![PeerSpec {
765                engine_id: EngineId::new("engine-b.example"),
766                addr: "127.0.0.1:4100".parse().unwrap(),
767                control_plane_addr: None,
768                server_name: None,
769            }],
770            declared_actors: Vec::new(),
771            roles: vec![BoundedClusterRole {
772                name: "gateway".to_string(),
773                owner: EngineId::new("engine-c.example"),
774            }],
775        };
776
777        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
778        assert_eq!(
779            err,
780            ClusterError::UnknownRoleOwner {
781                role: "gateway".to_string(),
782                owner: EngineId::new("engine-c.example"),
783            }
784        );
785    }
786
787    fn tcp_tests_available() -> bool {
788        if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
789            return false;
790        }
791        std::net::TcpListener::bind("127.0.0.1:0").is_ok()
792    }
793
794    fn quic_tests_available() -> bool {
795        rustls::crypto::CryptoProvider::get_default().is_some()
796    }
797
798    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
799    struct EchoRequest {
800        value: u64,
801    }
802
803    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
804    struct EchoResponse {
805        doubled: u64,
806    }
807
808    impl Message for EchoRequest {
809        type Response = EchoResponse;
810        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
811    }
812
813    struct EchoActor;
814
815    impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
816        fn on_message(
817            &mut self,
818            ctx: &mut ActorContext<R>,
819            envelope: &Envelope,
820            payload: MessagePayload,
821        ) -> Result<(), ActorError> {
822            let req = payload
823                .extract::<EchoRequest>()
824                .map_err(|_| ActorError::Handler)?;
825            let resp = EchoResponse {
826                doubled: req.value * 2,
827            };
828            let resp_env = envelope.response(0);
829            ctx.send_raw(resp_env, MessagePayload::local(resp))
830                .map_err(|_| ActorError::Handler)
831        }
832    }
833
834    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
835    struct CrashRequest;
836
837    impl Message for CrashRequest {
838        type Response = ();
839        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
840    }
841
842    struct SpawnedEchoActor;
843
844    impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
845        fn on_message(
846            &mut self,
847            ctx: &mut ActorContext<R>,
848            envelope: &Envelope,
849            payload: MessagePayload,
850        ) -> Result<(), ActorError> {
851            match envelope.type_tag {
852                EchoRequest::TYPE_TAG => {
853                    let req = payload
854                        .extract::<EchoRequest>()
855                        .map_err(|_| ActorError::Handler)?;
856                    let resp = EchoResponse {
857                        doubled: req.value * 2,
858                    };
859                    let resp_env = envelope.response(0);
860                    ctx.send_raw(resp_env, MessagePayload::local(resp))
861                        .map_err(|_| ActorError::Handler)
862                }
863                CrashRequest::TYPE_TAG => Err(ActorError::Handler),
864                _ => Err(ActorError::Handler),
865            }
866        }
867    }
868
869    fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
870        Arc::new(|type_name: &str, _config: &[u8]| match type_name {
871            "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
872            _ => Err(format!("unknown remote spawn type: {type_name}")),
873        })
874    }
875
    /// End-to-end TCP check with two engines.
    ///
    /// Engine A runs `/user/cluster-echo` as a user actor. Engine B attaches with A as its
    /// only peer. B must then learn a route to the actor path and get a doubled echo back
    /// from `ask`.
    #[tokio::test]
    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
        // Environments that forbid binding sockets skip this test instead of failing it.
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
        // Both engines get TLS material issued from the same test CA.
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        // Engine A hosts the echo actor that B will reach through the cluster.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        // Both sides register EchoRequest as a remotely askable message type.
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // A starts with no peers. Its bound transport address is what B is configured with.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        // B is attached with A as its single peer.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // Wire the reverse direction (A -> B) directly on A's transport.
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Each engine runs on its own OS thread and holds the receiving end of its
        // shutdown channel.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Poll for up to 5s until B's cluster can route to the path. Then check that B's
        // transport also routes the path's canonical address (`AddrHash::new(path, 0)`).
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded cluster route did not propagate");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(tcp_b.can_route(canonical));

        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 12 })
            .await
            .expect("bounded cluster ask");
        assert_eq!(response, EchoResponse { doubled: 24 });

        // Best-effort shutdown; send failures are ignored.
        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
977
    /// Declaring a remote actor for a configured TCP peer makes its path routable right away.
    ///
    /// The route must be visible both through the cluster and through the TCP transport's
    /// canonical address for the path (`AddrHash::new(&path, 0)`). The engine is never run.
    /// The peer address is only configuration here; nothing in this test sends to it.
    #[tokio::test]
    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let (ca, ca_key) = make_ca();
        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let handle = engine.handle();

        // `mut` because `declare_remote_actor` is called on the cluster below.
        let mut cluster = handle
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster");

        let path = ActorPath::parse("/user/declared-remote").unwrap();
        cluster
            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
            .expect("declare remote actor");

        assert!(cluster.can_route(&path));
        assert!(cluster
            .transport()
            .as_tcp()
            .expect("tcp transport")
            .can_route(AddrHash::new(&path, 0)));
    }
1015            .transport()
1016            .as_tcp()
1017            .expect("tcp transport")
1018            .can_route(AddrHash::new(&path, 0)));
1019    }
1020
1021    #[tokio::test]
1022    async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1023        if !tcp_tests_available() {
1024            eprintln!("skipping tcp integration tests: network bind not permitted");
1025            return;
1026        }
1027
1028        let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1029        let (ca, ca_key) = make_ca();
1030        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1031        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1032        let control_plane_b = reserve_tcp_addr();
1033
1034        let engine_a = Engine::with_config(EngineConfig {
1035            engine_id: EngineId::new("engine-a.example"),
1036            ..Default::default()
1037        });
1038        let engine_b = Engine::with_config(EngineConfig {
1039            engine_id: EngineId::new("engine-b.example"),
1040            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1041            control_plane_tls: Some(tls_b.clone()),
1042            actor_spawn: Some(bounded_spawn_handler()),
1043            ..Default::default()
1044        });
1045
1046        let handle_a = engine_a.handle();
1047        let handle_b = engine_b.handle();
1048        handle_a
1049            .type_registry()
1050            .register_remote_ask::<EchoRequest>();
1051        handle_b
1052            .type_registry()
1053            .register_remote_ask::<EchoRequest>();
1054
1055        let cluster_a = handle_a
1056            .attach_bounded_cluster(BoundedClusterConfig {
1057                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1058                peers: Vec::new(),
1059                declared_actors: Vec::new(),
1060                roles: Vec::new(),
1061            })
1062            .await
1063            .expect("attach bounded cluster a");
1064        let tcp_a = cluster_a
1065            .transport()
1066            .as_tcp()
1067            .cloned()
1068            .expect("tcp transport a");
1069
1070        let cluster_b = handle_b
1071            .attach_bounded_cluster(BoundedClusterConfig {
1072                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1073                peers: vec![PeerSpec {
1074                    engine_id: EngineId::new("engine-a.example"),
1075                    addr: tcp_a.local_addr(),
1076                    control_plane_addr: None,
1077                    server_name: None,
1078                }],
1079                declared_actors: Vec::new(),
1080                roles: Vec::new(),
1081            })
1082            .await
1083            .expect("attach bounded cluster b");
1084        let tcp_b = cluster_b
1085            .transport()
1086            .as_tcp()
1087            .cloned()
1088            .expect("tcp transport b");
1089
1090        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1091        let mut cluster_a = cluster_a;
1092        cluster_a
1093            .add_peer(PeerSpec {
1094                engine_id: EngineId::new("engine-b.example"),
1095                addr: tcp_b.local_addr(),
1096                control_plane_addr: Some(control_plane_b),
1097                server_name: None,
1098            })
1099            .expect("add peer with control plane");
1100
1101        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1102        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1103
1104        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1105        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1106        tokio::time::sleep(Duration::from_millis(150)).await;
1107
1108        let remote = cluster_a
1109            .spawn_remote::<EchoRequest>(
1110                EngineId::new("engine-b.example"),
1111                RemoteSpawnSpec {
1112                    path: actor_path.clone(),
1113                    type_name: "bounded-echo".to_string(),
1114                    config: None,
1115                },
1116            )
1117            .await
1118            .expect("spawn remote actor");
1119
1120        let response = remote
1121            .ask(EchoRequest { value: 21 })
1122            .await
1123            .expect("bounded remote spawn ask");
1124        assert_eq!(response, EchoResponse { doubled: 42 });
1125
1126        let duplicate = cluster_a
1127            .spawn_remote::<EchoRequest>(
1128                EngineId::new("engine-b.example"),
1129                RemoteSpawnSpec {
1130                    path: actor_path.clone(),
1131                    type_name: "bounded-echo".to_string(),
1132                    config: None,
1133                },
1134            )
1135            .await
1136            .expect_err("duplicate remote spawn should fail");
1137        assert!(
1138            matches!(duplicate, RemoteSpawnError::ControlPlane(ref message) if message.contains("actor already running")),
1139            "unexpected duplicate error: {duplicate:?}"
1140        );
1141
1142        let slot = handle_b
1143            .registry
1144            .get_by_path(&actor_path)
1145            .expect("spawned actor registered on host");
1146        handle_b.registry.increment_restart_count(slot.addr);
1147        slot.ctrl_tx
1148            .send(crate::common::LifecycleSignal::Stop(
1149                palladium_actor::StopReason::Killed,
1150            ))
1151            .ok();
1152
1153        let restart_deadline = Instant::now() + Duration::from_secs(5);
1154        loop {
1155            match remote.ask(EchoRequest { value: 7 }).await {
1156                Ok(response) => {
1157                    assert_eq!(response, EchoResponse { doubled: 14 });
1158                    break;
1159                }
1160                Err(_) if Instant::now() <= restart_deadline => {
1161                    tokio::time::sleep(Duration::from_millis(25)).await;
1162                }
1163                Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1164            }
1165        }
1166
1167        let unknown = cluster_a
1168            .spawn_remote::<EchoRequest>(
1169                EngineId::new("engine-c.example"),
1170                RemoteSpawnSpec {
1171                    path: ActorPath::parse("/user/unknown-target").unwrap(),
1172                    type_name: "bounded-echo".to_string(),
1173                    config: None,
1174                },
1175            )
1176            .await
1177            .expect_err("unknown target should fail");
1178        assert_eq!(
1179            unknown,
1180            RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1181        );
1182
1183        shutdown_a_tx.send(()).ok();
1184        shutdown_b_tx.send(()).ok();
1185    }
1186
    /// Remote spawn by role over TCP.
    ///
    /// Engine A is attached with the `context-service` role owned by engine B, and with B
    /// (including its control-plane address) as a peer. Spawning on that role must return a
    /// working typed handle. Spawning on an undeclared role must fail with `UnknownRole`.
    #[tokio::test]
    async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
        let control_plane_b = reserve_tcp_addr();

        let engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        // Engine B hosts the control plane and the "bounded-echo" spawn handler.
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            control_plane_tcp_addr: Some(control_plane_b.to_string()),
            control_plane_tls: Some(tls_b.clone()),
            actor_spawn: Some(bounded_spawn_handler()),
            ..Default::default()
        });

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // B is attached first so its transport address is known when A is configured.
        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // A gets B as a peer (with control plane) and the role mapping it spawns on.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: tcp_b.local_addr(),
                    control_plane_addr: Some(control_plane_b),
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        // Complete the link in the other direction (B -> A); A exposes no control plane.
        let mut cluster_b = cluster_b;
        cluster_b
            .add_peer(PeerSpec {
                engine_id: EngineId::new("engine-a.example"),
                addr: tcp_a.local_addr(),
                control_plane_addr: None,
                server_name: None,
            })
            .expect("add peer a to cluster b");

        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
        // Fixed grace period, presumably for the engines' listeners to come up.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let remote = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "context-service",
                RemoteSpawnSpec {
                    path: actor_path.clone(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect("spawn remote actor by role over tcp");

        let response = remote
            .ask(EchoRequest { value: 11 })
            .await
            .expect("bounded remote spawn ask over tcp by role");
        assert_eq!(response, EchoResponse { doubled: 22 });

        // A role that was never declared is rejected with `UnknownRole`.
        let unknown_role = cluster_a
            .spawn_remote_on_role::<EchoRequest>(
                "missing-role",
                RemoteSpawnSpec {
                    path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
                    type_name: "bounded-echo".to_string(),
                    config: None,
                },
            )
            .await
            .expect_err("unknown role should fail");
        assert_eq!(
            unknown_role,
            RemoteSpawnError::UnknownRole("missing-role".to_string())
        );

        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1313
1314    fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
1315        QuicTransportConfig {
1316            engine_id: EngineId::new(engine_id),
1317            listen_addr: "127.0.0.1:0".parse().unwrap(),
1318            send_buffer_size: 1024,
1319            idle_timeout: Duration::from_secs(2),
1320            tls,
1321        }
1322    }
1323
    /// QUIC counterpart of the TCP attach test.
    ///
    /// Engine A runs `/user/cluster-echo-quic`, engine B attaches with A as its peer, and B
    /// must learn a route to the actor and get a doubled echo back. Transport errors during
    /// attach are treated as "QUIC unavailable here" and skip the test.
    #[tokio::test]
    async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
        if !quic_tests_available() {
            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
            return;
        }

        let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        // Engine A hosts the echo actor that B will reach over QUIC.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo-quic",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // NOTE(review): this test never uses the `context-service` role, and engine-b is not
        // among cluster A's peers. The TCP twin passes `roles: Vec::new()`. Confirm this role
        // is intentional rather than copied from a spawn-by-role test.
        let cluster_a = match handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: vec![BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                }],
            })
            .await
        {
            Ok(cluster) => cluster,
            // Transport-level failures skip the test; any other cluster error is a failure.
            Err(ClusterError::Transport(err)) => {
                eprintln!("skipping quic bounded-cluster test: {err:?}");
                return;
            }
            Err(err) => panic!("attach bounded cluster a: {err:?}"),
        };
        let quic_a = cluster_a
            .transport()
            .as_quic()
            .cloned()
            .expect("quic transport a");

        // B is attached with A as its single peer.
        let cluster_b = match handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: quic_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
        {
            Ok(cluster) => cluster,
            Err(ClusterError::Transport(err)) => {
                eprintln!("skipping quic bounded-cluster test: {err:?}");
                return;
            }
            Err(err) => panic!("attach bounded cluster b: {err:?}"),
        };
        let quic_b = cluster_b
            .transport()
            .as_quic()
            .cloned()
            .expect("quic transport b");

        // Wire the reverse direction (A -> B) directly on A's transport.
        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Poll for up to 5s until B can route to the path, then check the transport route
        // for the canonical address as well.
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded quic cluster route did not propagate");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(quic_b.can_route(canonical));

        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 15 })
            .await
            .expect("bounded cluster ask over quic");
        assert_eq!(response, EchoResponse { doubled: 30 });

        // Best-effort shutdown; send failures are ignored.
        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1442
    /// QUIC counterpart of the declared-remote-actor test.
    ///
    /// Declaring a remote actor for a configured peer must make the path routable through the
    /// cluster and through the QUIC transport's canonical address. The engine is never run,
    /// and nothing in this test sends to the configured peer address.
    #[tokio::test]
    async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
        if !quic_tests_available() {
            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
            return;
        }

        let (ca, ca_key) = make_ca();
        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let handle = engine.handle();

        let mut cluster = match handle
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
        {
            Ok(cluster) => cluster,
            // Transport-level failures skip the test; any other cluster error is a failure.
            Err(ClusterError::Transport(err)) => {
                eprintln!("skipping quic bounded-cluster test: {err:?}");
                return;
            }
            Err(err) => panic!("attach bounded cluster: {err:?}"),
        };

        let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
        cluster
            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
            .expect("declare remote actor");

        assert!(cluster.can_route(&path));
        assert!(cluster
            .transport()
            .as_quic()
            .expect("quic transport")
            .can_route(AddrHash::new(&path, 0)));
    }
1492
1493    #[tokio::test]
1494    async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
1495        if !quic_tests_available() {
1496            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1497            return;
1498        }
1499
1500        let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
1501        let (ca, ca_key) = make_ca();
1502        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1503        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1504        let control_plane_b = reserve_udp_addr();
1505
1506        let engine_a = Engine::with_config(EngineConfig {
1507            engine_id: EngineId::new("engine-a.example"),
1508            ..Default::default()
1509        });
1510        let engine_b = Engine::with_config(EngineConfig {
1511            engine_id: EngineId::new("engine-b.example"),
1512            control_plane_quic_addr: Some(control_plane_b.to_string()),
1513            control_plane_tls: Some(tls_b.clone()),
1514            actor_spawn: Some(bounded_spawn_handler()),
1515            ..Default::default()
1516        });
1517
1518        let handle_a = engine_a.handle();
1519        let handle_b = engine_b.handle();
1520        handle_a
1521            .type_registry()
1522            .register_remote_ask::<EchoRequest>();
1523        handle_b
1524            .type_registry()
1525            .register_remote_ask::<EchoRequest>();
1526
1527        let cluster_a = match handle_a
1528            .attach_bounded_cluster(BoundedClusterConfig {
1529                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1530                peers: Vec::new(),
1531                declared_actors: Vec::new(),
1532                roles: Vec::new(),
1533            })
1534            .await
1535        {
1536            Ok(cluster) => cluster,
1537            Err(ClusterError::Transport(err)) => {
1538                eprintln!("skipping quic bounded-cluster test: {err:?}");
1539                return;
1540            }
1541            Err(err) => panic!("attach bounded cluster a: {err:?}"),
1542        };
1543        let quic_a = cluster_a
1544            .transport()
1545            .as_quic()
1546            .cloned()
1547            .expect("quic transport a");
1548
1549        let cluster_b = match handle_b
1550            .attach_bounded_cluster(BoundedClusterConfig {
1551                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1552                peers: vec![PeerSpec {
1553                    engine_id: EngineId::new("engine-a.example"),
1554                    addr: quic_a.local_addr(),
1555                    control_plane_addr: None,
1556                    server_name: None,
1557                }],
1558                declared_actors: Vec::new(),
1559                roles: Vec::new(),
1560            })
1561            .await
1562        {
1563            Ok(cluster) => cluster,
1564            Err(ClusterError::Transport(err)) => {
1565                eprintln!("skipping quic bounded-cluster test: {err:?}");
1566                return;
1567            }
1568            Err(err) => panic!("attach bounded cluster b: {err:?}"),
1569        };
1570        let quic_b = cluster_b
1571            .transport()
1572            .as_quic()
1573            .cloned()
1574            .expect("quic transport b");
1575
1576        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1577        let mut cluster_a = cluster_a;
1578        cluster_a
1579            .add_peer(PeerSpec {
1580                engine_id: EngineId::new("engine-b.example"),
1581                addr: quic_b.local_addr(),
1582                control_plane_addr: Some(control_plane_b),
1583                server_name: None,
1584            })
1585            .expect("add quic peer with control plane");
1586
1587        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1588        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1589
1590        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1591        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1592        tokio::time::sleep(Duration::from_millis(150)).await;
1593
1594        let remote = cluster_a
1595            .spawn_remote_on_role::<EchoRequest>(
1596                "context-service",
1597                RemoteSpawnSpec {
1598                    path: actor_path.clone(),
1599                    type_name: "bounded-echo".to_string(),
1600                    config: None,
1601                },
1602            )
1603            .await
1604            .expect("spawn remote actor over quic by role");
1605
1606        let response = remote
1607            .ask(EchoRequest { value: 9 })
1608            .await
1609            .expect("bounded remote spawn ask over quic by role");
1610        assert_eq!(response, EchoResponse { doubled: 18 });
1611
1612        let unknown_role = cluster_a
1613            .spawn_remote_on_role::<EchoRequest>(
1614                "missing-role",
1615                RemoteSpawnSpec {
1616                    path: ActorPath::parse("/user/missing-role").unwrap(),
1617                    type_name: "bounded-echo".to_string(),
1618                    config: None,
1619                },
1620            )
1621            .await
1622            .expect_err("unknown role should fail");
1623        assert_eq!(
1624            unknown_role,
1625            RemoteSpawnError::UnknownRole("missing-role".to_string())
1626        );
1627
1628        shutdown_a_tx.send(()).ok();
1629        shutdown_b_tx.send(()).ok();
1630    }
1631}