Skip to main content

palladium_runtime/
bounded_cluster.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
10use palladium_transport::network::{Network, TokioNetwork};
11use palladium_transport::{
12    QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport, TransportError,
13};
14use rustls::pki_types::ServerName;
15use serde_json::{json, Value};
16use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
17use tokio_rustls::TlsConnector;
18
19#[derive(Clone)]
20pub struct BoundedClusterConfig {
21    pub transport: BoundedTransportConfig,
22    pub peers: Vec<PeerSpec>,
23    pub declared_actors: Vec<DeclaredRemoteActor>,
24    pub roles: Vec<BoundedClusterRole>,
25}
26
27#[derive(Clone)]
28pub enum BoundedTransportConfig {
29    Tcp(TcpTransportConfig),
30    Quic(QuicTransportConfig),
31}
32
33impl BoundedTransportConfig {
34    pub fn engine_id(&self) -> &EngineId {
35        match self {
36            Self::Tcp(config) => &config.engine_id,
37            Self::Quic(config) => &config.engine_id,
38        }
39    }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct PeerSpec {
44    pub engine_id: EngineId,
45    pub addr: SocketAddr,
46    pub control_plane_addr: Option<SocketAddr>,
47    pub server_name: Option<String>,
48}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub struct DeclaredRemoteActor {
52    pub path: ActorPath,
53    pub owner: EngineId,
54}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct BoundedClusterRole {
58    pub name: String,
59    pub owner: EngineId,
60}
61
62#[derive(Clone, Debug, PartialEq, Eq)]
63pub enum ClusterError {
64    DuplicatePeer(EngineId),
65    LocalPeerDeclared(EngineId),
66    DuplicateDeclaredActor(ActorPath),
67    LocalActorDeclaredRemote(ActorPath),
68    UnknownPeerOwner { path: ActorPath, owner: EngineId },
69    DuplicateRole(String),
70    UnknownRoleOwner { role: String, owner: EngineId },
71    Transport(TransportError),
72}
73
74#[derive(Clone, Debug, PartialEq)]
75pub struct RemoteSpawnSpec {
76    pub path: ActorPath,
77    pub type_name: String,
78    pub config: Option<Value>,
79}
80
81#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum RemoteSpawnError {
83    UnknownPeer(EngineId),
84    UnknownRole(String),
85    LocalPathCollision(ActorPath),
86    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
87    RemotePathAlreadyRunning(ActorPath),
88    MissingControlPlaneEndpoint(EngineId),
89    InvalidServerName(String),
90    ControlPlane(String),
91    SpawnTimedOut(ActorPath),
92}
93
94#[derive(Clone)]
95pub(crate) enum ControlPlaneProtocol {
96    Tcp,
97    Quic,
98}
99
100#[derive(Clone)]
101pub(crate) struct ControlPlaneClientConfig {
102    pub(crate) protocol: ControlPlaneProtocol,
103    pub(crate) tls: palladium_transport::TlsConfig,
104    pub(crate) wait_timeout: Duration,
105}
106
107const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
108
109impl From<TransportError> for ClusterError {
110    fn from(value: TransportError) -> Self {
111        Self::Transport(value)
112    }
113}
114
115impl From<TransportError> for RemoteSpawnError {
116    fn from(value: TransportError) -> Self {
117        Self::ControlPlane(format!("transport route install failed: {value:?}"))
118    }
119}
120
121#[derive(Clone)]
122pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
123    Tcp(Arc<TcpTransport<R, N>>),
124    Quic(Arc<QuicTransport<R, N>>),
125}
126
127impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
128    pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
129        match self {
130            Self::Tcp(transport) => Some(transport),
131            Self::Quic(_) => None,
132        }
133    }
134
135    pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
136        match self {
137            Self::Tcp(_) => None,
138            Self::Quic(transport) => Some(transport),
139        }
140    }
141
142    pub fn add_peer(&self, peer: &PeerSpec) {
143        match self {
144            Self::Tcp(transport) => {
145                if let Some(server_name) = &peer.server_name {
146                    transport.add_peer_with_server_name(
147                        peer.engine_id.clone(),
148                        peer.addr,
149                        server_name.clone(),
150                    );
151                } else {
152                    transport.add_peer(peer.engine_id.clone(), peer.addr);
153                }
154            }
155            Self::Quic(transport) => {
156                if let Some(server_name) = &peer.server_name {
157                    transport.add_peer_with_server_name(
158                        peer.engine_id.clone(),
159                        peer.addr,
160                        server_name.clone(),
161                    );
162                } else {
163                    transport.add_peer(peer.engine_id.clone(), peer.addr);
164                }
165            }
166        }
167    }
168
169    pub fn declare_remote_path(
170        &self,
171        owner: EngineId,
172        path: &ActorPath,
173    ) -> Result<(), TransportError> {
174        match self {
175            Self::Tcp(transport) => transport.declare_remote_path(owner, path),
176            Self::Quic(transport) => transport.declare_remote_path(owner, path),
177        }
178    }
179
180    pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
181        match self {
182            Self::Tcp(transport) => transport.undeclare_remote_path(path),
183            Self::Quic(transport) => transport.undeclare_remote_path(path),
184        }
185    }
186
187    pub fn can_route(&self, destination: AddrHash) -> bool {
188        match self {
189            Self::Tcp(transport) => transport.can_route(destination),
190            Self::Quic(transport) => transport.can_route(destination),
191        }
192    }
193}
194
195#[derive(Clone)]
196pub struct BoundedClusterHandle<
197    R: Reactor = TokioReactor,
198    N: Network = TokioNetwork,
199    F: FileSystem = TokioFileSystem,
200> {
201    engine: EngineHandle<R, N, F>,
202    local_engine_id: EngineId,
203    control_plane: ControlPlaneClientConfig,
204    transport: BoundedTransportHandle<R, N>,
205    peers: Vec<PeerSpec>,
206    declared_actors: Vec<DeclaredRemoteActor>,
207    roles: Vec<BoundedClusterRole>,
208    registry: Arc<ActorRegistry<R>>,
209}
210
211impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
212    pub(crate) fn new(
213        engine: EngineHandle<R, N, F>,
214        local_engine_id: EngineId,
215        control_plane: ControlPlaneClientConfig,
216        transport: BoundedTransportHandle<R, N>,
217        peers: Vec<PeerSpec>,
218        roles: Vec<BoundedClusterRole>,
219        registry: Arc<ActorRegistry<R>>,
220    ) -> Self {
221        Self {
222            engine,
223            local_engine_id,
224            control_plane,
225            transport,
226            peers,
227            declared_actors: Vec::new(),
228            roles,
229            registry,
230        }
231    }
232
233    pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
234        &self.transport
235    }
236
237    pub fn peers(&self) -> &[PeerSpec] {
238        &self.peers
239    }
240
241    pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
242        &self.declared_actors
243    }
244
245    pub fn roles(&self) -> &[BoundedClusterRole] {
246        &self.roles
247    }
248
249    pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
250        validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
251        self.transport.add_peer(&peer);
252        self.peers.push(peer);
253        Ok(())
254    }
255
256    pub fn declare_remote_actor(
257        &mut self,
258        owner: EngineId,
259        path: ActorPath,
260    ) -> Result<(), ClusterError> {
261        validate_declared_actor(
262            &path,
263            &owner,
264            &self.local_engine_id,
265            &self.peers,
266            &self.declared_actors,
267            &self.registry,
268        )?;
269        self.transport.declare_remote_path(owner.clone(), &path)?;
270        self.declared_actors
271            .push(DeclaredRemoteActor { path, owner });
272        Ok(())
273    }
274
275    pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
276        self.transport.undeclare_remote_path(path)?;
277        self.declared_actors
278            .retain(|declared| &declared.path != path);
279        Ok(())
280    }
281
282    pub fn can_route(&self, path: &ActorPath) -> bool {
283        self.transport.can_route(AddrHash::new(path, 0))
284    }
285
286    pub async fn spawn_remote_on_role<M>(
287        &self,
288        role: &str,
289        spec: RemoteSpawnSpec,
290    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
291    where
292        M: RemoteMessage,
293        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
294        R: Send + Sync,
295    {
296        let owner = self
297            .roles
298            .iter()
299            .find(|declared| declared.name == role)
300            .map(|declared| declared.owner.clone())
301            .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
302        self.spawn_remote::<M>(owner, spec).await
303    }
304
305    pub async fn spawn_remote<M>(
306        &self,
307        target: EngineId,
308        spec: RemoteSpawnSpec,
309    ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
310    where
311        M: RemoteMessage,
312        M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
313        R: Send + Sync,
314    {
315        validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;
316
317        let peer = self
318            .peers
319            .iter()
320            .find(|peer| peer.engine_id == target)
321            .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
322        let control_plane_addr = peer
323            .control_plane_addr
324            .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
325        let server_name = peer
326            .server_name
327            .clone()
328            .unwrap_or_else(|| target.as_str().to_string());
329
330        call_actor_spawn(&self.control_plane, control_plane_addr, &server_name, &spec).await?;
331
332        let deadline = Instant::now() + self.control_plane.wait_timeout;
333        while Instant::now() <= deadline {
334            if actor_is_spawned(
335                &self.control_plane,
336                control_plane_addr,
337                &server_name,
338                &spec.path,
339            )
340            .await?
341            {
342                self.transport
343                    .declare_remote_path(target.clone(), &spec.path)
344                    .map_err(RemoteSpawnError::from)?;
345                return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
346            }
347            tokio::time::sleep(Duration::from_millis(20)).await;
348        }
349
350        Err(RemoteSpawnError::SpawnTimedOut(spec.path))
351    }
352}
353
354async fn call_actor_spawn(
355    client: &ControlPlaneClientConfig,
356    addr: SocketAddr,
357    server_name: &str,
358    spec: &RemoteSpawnSpec,
359) -> Result<(), RemoteSpawnError> {
360    let params = match &spec.config {
361        Some(config) => json!({
362            "path": spec.path.as_str(),
363            "type_name": spec.type_name,
364            "config": config,
365        }),
366        None => json!({
367            "path": spec.path.as_str(),
368            "type_name": spec.type_name,
369        }),
370    };
371    match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
372        Ok(_) => {}
373        Err(RemoteSpawnError::ControlPlane(message))
374            if message.contains("actor already running at") =>
375        {
376            return Err(RemoteSpawnError::RemotePathAlreadyRunning(
377                spec.path.clone(),
378            ));
379        }
380        Err(err) => return Err(err),
381    }
382    Ok(())
383}
384
385fn validate_remote_spawn_request<R: Reactor>(
386    path: &ActorPath,
387    owner: &EngineId,
388    declared_actors: &[DeclaredRemoteActor],
389    registry: &Arc<ActorRegistry<R>>,
390) -> Result<(), RemoteSpawnError> {
391    if registry.get_by_path(path).is_some() {
392        return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
393    }
394
395    if let Some(declared) = declared_actors
396        .iter()
397        .find(|declared| declared.path == *path && declared.owner != *owner)
398    {
399        return Err(RemoteSpawnError::RemoteOwnershipConflict {
400            path: path.clone(),
401            owner: declared.owner.clone(),
402        });
403    }
404
405    Ok(())
406}
407
408async fn actor_is_spawned(
409    client: &ControlPlaneClientConfig,
410    addr: SocketAddr,
411    server_name: &str,
412    path: &ActorPath,
413) -> Result<bool, RemoteSpawnError> {
414    match call_control_plane(
415        client,
416        addr,
417        server_name,
418        "actor.info",
419        json!({ "path": path.as_str() }),
420    )
421    .await
422    {
423        Ok(_) => Ok(true),
424        Err(RemoteSpawnError::ControlPlane(message))
425            if message.contains("actor not found:")
426                || message.contains("actor not found")
427                || message.contains("User supervisor not available") =>
428        {
429            Ok(false)
430        }
431        Err(err) => Err(err),
432    }
433}
434
435async fn call_control_plane(
436    client: &ControlPlaneClientConfig,
437    addr: SocketAddr,
438    server_name: &str,
439    method: &str,
440    params: Value,
441) -> Result<Value, RemoteSpawnError> {
442    let request = json!({
443        "id": 1,
444        "method": method,
445        "params": params,
446    });
447
448    match client.protocol {
449        ControlPlaneProtocol::Tcp => {
450            call_control_plane_tcp(addr, server_name, &client.tls, &request).await
451        }
452        ControlPlaneProtocol::Quic => {
453            call_control_plane_quic(addr, server_name, &client.tls, &request).await
454        }
455    }
456}
457
458async fn call_control_plane_tcp(
459    addr: SocketAddr,
460    server_name: &str,
461    tls: &palladium_transport::TlsConfig,
462    request: &Value,
463) -> Result<Value, RemoteSpawnError> {
464    let config = tls
465        .client_config(server_name)
466        .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
467    let name = ServerName::try_from(server_name.to_string())
468        .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
469    let stream = tokio::net::TcpStream::connect(addr)
470        .await
471        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
472    let connector = TlsConnector::from(config);
473    let tls_stream = connector
474        .connect(name, stream)
475        .await
476        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
477
478    let (reader, mut writer) = tokio::io::split(tls_stream);
479    let mut body = request.to_string();
480    body.push('\n');
481    writer
482        .write_all(body.as_bytes())
483        .await
484        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
485    let mut line = String::new();
486    let mut reader = BufReader::new(reader);
487    reader
488        .read_line(&mut line)
489        .await
490        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
491    parse_control_plane_response(&line)
492}
493
494async fn call_control_plane_quic(
495    addr: SocketAddr,
496    server_name: &str,
497    tls: &palladium_transport::TlsConfig,
498    request: &Value,
499) -> Result<Value, RemoteSpawnError> {
500    ensure_rustls_provider();
501    let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
502        RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
503    })?)
504    .clone();
505    client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
506    let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
507
508    let client = s2n_quic::Client::builder()
509        .with_tls(client_tls)
510        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
511        .with_io("0.0.0.0:0")
512        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
513        .start()
514        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
515
516    let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
517    let mut conn = client
518        .connect(connect)
519        .await
520        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
521    let mut stream = conn
522        .open_bidirectional_stream()
523        .await
524        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
525
526    let mut body = request.to_string();
527    body.push('\n');
528    stream
529        .write_all(body.as_bytes())
530        .await
531        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
532    stream
533        .close()
534        .await
535        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
536
537    let mut buf = Vec::new();
538    stream
539        .read_to_end(&mut buf)
540        .await
541        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
542    let line = String::from_utf8_lossy(&buf);
543    parse_control_plane_response(&line)
544}
545
546fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
547    let response: Value = serde_json::from_str(line.trim())
548        .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
549    if let Some(error) = response.get("error") {
550        let message = error
551            .get("message")
552            .and_then(Value::as_str)
553            .unwrap_or("unknown control-plane error");
554        return Err(RemoteSpawnError::ControlPlane(message.to_string()));
555    }
556    Ok(response.get("result").cloned().unwrap_or(Value::Null))
557}
558
559fn ensure_rustls_provider() {
560    static INIT: std::sync::Once = std::sync::Once::new();
561    INIT.call_once(|| {
562        #[cfg(feature = "aws-lc-rs")]
563        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
564        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
565        let _ = rustls::crypto::ring::default_provider().install_default();
566    });
567}
568
569pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
570    ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
571}
572
573pub(crate) fn validate_config<R: Reactor>(
574    config: &BoundedClusterConfig,
575    registry: &Arc<ActorRegistry<R>>,
576) -> Result<(), ClusterError> {
577    let local_engine_id = config.transport.engine_id();
578    let mut seen_peers = Vec::new();
579    for peer in &config.peers {
580        validate_peer_spec(peer, local_engine_id, &seen_peers)?;
581        seen_peers.push(peer.clone());
582    }
583
584    let mut seen_declared = Vec::new();
585    for declared in &config.declared_actors {
586        validate_declared_actor(
587            &declared.path,
588            &declared.owner,
589            local_engine_id,
590            &config.peers,
591            &seen_declared,
592            registry,
593        )?;
594        seen_declared.push(declared.clone());
595    }
596
597    let mut seen_roles = Vec::new();
598    for role in &config.roles {
599        validate_role(role, &config.peers, &seen_roles)?;
600        seen_roles.push(role.clone());
601    }
602
603    Ok(())
604}
605
606fn validate_peer_spec(
607    peer: &PeerSpec,
608    local_engine_id: &EngineId,
609    existing: &[PeerSpec],
610) -> Result<(), ClusterError> {
611    if &peer.engine_id == local_engine_id {
612        return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
613    }
614    if existing
615        .iter()
616        .any(|existing| existing.engine_id == peer.engine_id)
617    {
618        return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
619    }
620    Ok(())
621}
622
623fn validate_declared_actor<R: Reactor>(
624    path: &ActorPath,
625    owner: &EngineId,
626    local_engine_id: &EngineId,
627    peers: &[PeerSpec],
628    existing: &[DeclaredRemoteActor],
629    registry: &Arc<ActorRegistry<R>>,
630) -> Result<(), ClusterError> {
631    if owner == local_engine_id || registry.get_by_path(path).is_some() {
632        return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
633    }
634    if existing.iter().any(|existing| existing.path == *path) {
635        return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
636    }
637    if !peers.iter().any(|peer| peer.engine_id == *owner) {
638        return Err(ClusterError::UnknownPeerOwner {
639            path: path.clone(),
640            owner: owner.clone(),
641        });
642    }
643    Ok(())
644}
645
646fn validate_role(
647    role: &BoundedClusterRole,
648    peers: &[PeerSpec],
649    existing: &[BoundedClusterRole],
650) -> Result<(), ClusterError> {
651    if existing.iter().any(|existing| existing.name == role.name) {
652        return Err(ClusterError::DuplicateRole(role.name.clone()));
653    }
654    if !peers.iter().any(|peer| peer.engine_id == role.owner) {
655        return Err(ClusterError::UnknownRoleOwner {
656            role: role.name.clone(),
657            owner: role.owner.clone(),
658        });
659    }
660    Ok(())
661}
662
663#[cfg(test)]
664mod tests {
665    use super::{
666        validate_config, BoundedClusterConfig, BoundedClusterRole, BoundedTransportConfig,
667        ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError, RemoteSpawnSpec,
668    };
669    use crate::{Engine, EngineConfig};
670    use palladium_actor::{
671        Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
672        Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
673    };
674    use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
675    use rcgen::{
676        BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
677        KeyUsagePurpose,
678    };
679    use std::net::{SocketAddr, TcpListener, UdpSocket};
680    use std::sync::Arc;
681    use std::time::{Duration, Instant};
682
683    fn make_ca() -> (Certificate, KeyPair) {
684        let mut params = CertificateParams::default();
685        params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
686        params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
687        let ca_key = KeyPair::generate().expect("ca keypair");
688        let cert = params.self_signed(&ca_key).expect("ca cert");
689        (cert, ca_key)
690    }
691
692    fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
693        let keypair = KeyPair::generate().expect("leaf keypair");
694        let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
695        let mut params = params;
696        params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
697        params.extended_key_usages = vec![
698            ExtendedKeyUsagePurpose::ServerAuth,
699            ExtendedKeyUsagePurpose::ClientAuth,
700        ];
701        let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
702        let cert_der = cert.der().to_vec();
703        let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
704        let ca_der = ca.der().to_vec();
705        TlsConfig {
706            cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
707            private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
708            trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
709        }
710    }
711
712    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
713        TcpTransportConfig {
714            engine_id: EngineId::new(engine_id),
715            listen_addr: "127.0.0.1:0".parse().unwrap(),
716            max_connections_per_peer: 1,
717            idle_timeout: Duration::from_secs(1),
718            send_buffer_size: 8,
719            nodelay: true,
720            tls,
721            send_delay: Duration::from_millis(0),
722        }
723    }
724
725    fn reserve_tcp_addr() -> SocketAddr {
726        let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
727        let addr = listener.local_addr().expect("tcp local addr");
728        drop(listener);
729        addr
730    }
731
732    fn reserve_udp_addr() -> SocketAddr {
733        let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
734        let addr = socket.local_addr().expect("udp local addr");
735        drop(socket);
736        addr
737    }
738
739    #[test]
740    fn bounded_cluster_config_rejects_duplicate_peers() {
741        let (ca, ca_key) = make_ca();
742        let engine = Engine::with_config(EngineConfig {
743            engine_id: EngineId::new("engine-a.example"),
744            ..Default::default()
745        });
746        let config = BoundedClusterConfig {
747            transport: BoundedTransportConfig::Tcp(tcp_config(
748                "engine-a.example",
749                make_tls_config("engine-a.example", &ca, &ca_key),
750            )),
751            peers: vec![
752                PeerSpec {
753                    engine_id: EngineId::new("engine-b.example"),
754                    addr: "127.0.0.1:4100".parse().unwrap(),
755                    control_plane_addr: None,
756                    server_name: None,
757                },
758                PeerSpec {
759                    engine_id: EngineId::new("engine-b.example"),
760                    addr: "127.0.0.1:4200".parse().unwrap(),
761                    control_plane_addr: None,
762                    server_name: None,
763                },
764            ],
765            declared_actors: Vec::new(),
766            roles: Vec::new(),
767        };
768
769        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
770        assert_eq!(
771            err,
772            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
773        );
774    }
775
776    #[test]
777    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
778        let (ca, ca_key) = make_ca();
779        let engine = Engine::with_config(EngineConfig {
780            engine_id: EngineId::new("engine-a.example"),
781            ..Default::default()
782        });
783        let config = BoundedClusterConfig {
784            transport: BoundedTransportConfig::Tcp(tcp_config(
785                "engine-a.example",
786                make_tls_config("engine-a.example", &ca, &ca_key),
787            )),
788            peers: vec![PeerSpec {
789                engine_id: EngineId::new("engine-b.example"),
790                addr: "127.0.0.1:4100".parse().unwrap(),
791                control_plane_addr: None,
792                server_name: None,
793            }],
794            declared_actors: vec![DeclaredRemoteActor {
795                path: ActorPath::parse("/user/missing-owner").unwrap(),
796                owner: EngineId::new("engine-c.example"),
797            }],
798            roles: Vec::new(),
799        };
800
801        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
802        assert_eq!(
803            err,
804            ClusterError::UnknownPeerOwner {
805                path: ActorPath::parse("/user/missing-owner").unwrap(),
806                owner: EngineId::new("engine-c.example"),
807            }
808        );
809    }
810
811    #[test]
812    fn bounded_cluster_config_rejects_duplicate_roles() {
813        let (ca, ca_key) = make_ca();
814        let engine = Engine::with_config(EngineConfig {
815            engine_id: EngineId::new("engine-a.example"),
816            ..Default::default()
817        });
818        let config = BoundedClusterConfig {
819            transport: BoundedTransportConfig::Tcp(tcp_config(
820                "engine-a.example",
821                make_tls_config("engine-a.example", &ca, &ca_key),
822            )),
823            peers: vec![PeerSpec {
824                engine_id: EngineId::new("engine-b.example"),
825                addr: "127.0.0.1:4100".parse().unwrap(),
826                control_plane_addr: None,
827                server_name: None,
828            }],
829            declared_actors: Vec::new(),
830            roles: vec![
831                BoundedClusterRole {
832                    name: "context-service".to_string(),
833                    owner: EngineId::new("engine-b.example"),
834                },
835                BoundedClusterRole {
836                    name: "context-service".to_string(),
837                    owner: EngineId::new("engine-b.example"),
838                },
839            ],
840        };
841
842        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
843        assert_eq!(
844            err,
845            ClusterError::DuplicateRole("context-service".to_string())
846        );
847    }
848
849    #[test]
850    fn bounded_cluster_config_rejects_unknown_role_owner() {
851        let (ca, ca_key) = make_ca();
852        let engine = Engine::with_config(EngineConfig {
853            engine_id: EngineId::new("engine-a.example"),
854            ..Default::default()
855        });
856        let config = BoundedClusterConfig {
857            transport: BoundedTransportConfig::Tcp(tcp_config(
858                "engine-a.example",
859                make_tls_config("engine-a.example", &ca, &ca_key),
860            )),
861            peers: vec![PeerSpec {
862                engine_id: EngineId::new("engine-b.example"),
863                addr: "127.0.0.1:4100".parse().unwrap(),
864                control_plane_addr: None,
865                server_name: None,
866            }],
867            declared_actors: Vec::new(),
868            roles: vec![BoundedClusterRole {
869                name: "gateway".to_string(),
870                owner: EngineId::new("engine-c.example"),
871            }],
872        };
873
874        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
875        assert_eq!(
876            err,
877            ClusterError::UnknownRoleOwner {
878                role: "gateway".to_string(),
879                owner: EngineId::new("engine-c.example"),
880            }
881        );
882    }
883
884    #[test]
885    fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
886        assert_eq!(
887            super::remote_spawn_wait_timeout(Duration::from_millis(25)),
888            Duration::from_secs(5)
889        );
890        assert_eq!(
891            super::remote_spawn_wait_timeout(Duration::from_secs(8)),
892            Duration::from_secs(8)
893        );
894    }
895
896    fn tcp_tests_available() -> bool {
897        if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
898            return false;
899        }
900        std::net::TcpListener::bind("127.0.0.1:0").is_ok()
901    }
902
903    fn quic_tests_available() -> bool {
904        rustls::crypto::CryptoProvider::get_default().is_some()
905    }
906
907    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
908    struct EchoRequest {
909        value: u64,
910    }
911
912    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
913    struct EchoResponse {
914        doubled: u64,
915    }
916
917    impl Message for EchoRequest {
918        type Response = EchoResponse;
919        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
920    }
921
922    struct EchoActor;
923
924    impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
925        fn on_message(
926            &mut self,
927            ctx: &mut ActorContext<R>,
928            envelope: &Envelope,
929            payload: MessagePayload,
930        ) -> Result<(), ActorError> {
931            let req = payload
932                .extract::<EchoRequest>()
933                .map_err(|_| ActorError::Handler)?;
934            let resp = EchoResponse {
935                doubled: req.value * 2,
936            };
937            let resp_env = envelope.response(0);
938            ctx.send_raw(resp_env, MessagePayload::local(resp))
939                .map_err(|_| ActorError::Handler)
940        }
941    }
942
943    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
944    struct CrashRequest;
945
946    impl Message for CrashRequest {
947        type Response = ();
948        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
949    }
950
951    struct SpawnedEchoActor;
952
953    impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
954        fn on_message(
955            &mut self,
956            ctx: &mut ActorContext<R>,
957            envelope: &Envelope,
958            payload: MessagePayload,
959        ) -> Result<(), ActorError> {
960            match envelope.type_tag {
961                EchoRequest::TYPE_TAG => {
962                    let req = payload
963                        .extract::<EchoRequest>()
964                        .map_err(|_| ActorError::Handler)?;
965                    let resp = EchoResponse {
966                        doubled: req.value * 2,
967                    };
968                    let resp_env = envelope.response(0);
969                    ctx.send_raw(resp_env, MessagePayload::local(resp))
970                        .map_err(|_| ActorError::Handler)
971                }
972                CrashRequest::TYPE_TAG => Err(ActorError::Handler),
973                _ => Err(ActorError::Handler),
974            }
975        }
976    }
977
978    fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
979        Arc::new(|type_name: &str, _config: &[u8]| match type_name {
980            "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
981            _ => Err(format!("unknown remote spawn type: {type_name}")),
982        })
983    }
984
985    #[tokio::test]
986    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
987        if !tcp_tests_available() {
988            eprintln!("skipping tcp integration tests: network bind not permitted");
989            return;
990        }
991
992        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
993        let (ca, ca_key) = make_ca();
994        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
995        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
996
997        let mut engine_a = Engine::with_config(EngineConfig {
998            engine_id: EngineId::new("engine-a.example"),
999            ..Default::default()
1000        });
1001        let engine_b = Engine::with_config(EngineConfig {
1002            engine_id: EngineId::new("engine-b.example"),
1003            ..Default::default()
1004        });
1005
1006        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1007        engine_a.add_user_actor(ChildSpec::new(
1008            "cluster-echo",
1009            RestartPolicy::Permanent,
1010            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1011            ns,
1012            move || Box::new(EchoActor),
1013        ));
1014
1015        let handle_a = engine_a.handle();
1016        let handle_b = engine_b.handle();
1017        handle_a
1018            .type_registry()
1019            .register_remote_ask::<EchoRequest>();
1020        handle_b
1021            .type_registry()
1022            .register_remote_ask::<EchoRequest>();
1023
1024        let cluster_a = handle_a
1025            .attach_bounded_cluster(BoundedClusterConfig {
1026                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1027                peers: Vec::new(),
1028                declared_actors: Vec::new(),
1029                roles: Vec::new(),
1030            })
1031            .await
1032            .expect("attach bounded cluster a");
1033        let tcp_a = cluster_a
1034            .transport()
1035            .as_tcp()
1036            .cloned()
1037            .expect("tcp transport a");
1038
1039        let cluster_b = handle_b
1040            .attach_bounded_cluster(BoundedClusterConfig {
1041                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1042                peers: vec![PeerSpec {
1043                    engine_id: EngineId::new("engine-a.example"),
1044                    addr: tcp_a.local_addr(),
1045                    control_plane_addr: None,
1046                    server_name: None,
1047                }],
1048                declared_actors: Vec::new(),
1049                roles: Vec::new(),
1050            })
1051            .await
1052            .expect("attach bounded cluster b");
1053        let tcp_b = cluster_b
1054            .transport()
1055            .as_tcp()
1056            .cloned()
1057            .expect("tcp transport b");
1058
1059        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1060        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1061        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1062
1063        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1064        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1065
1066        let canonical = AddrHash::new(&actor_path, 0);
1067        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1068        while !cluster_b.can_route(&actor_path) {
1069            if std::time::Instant::now() > deadline {
1070                panic!("bounded cluster route did not propagate");
1071            }
1072            tokio::time::sleep(Duration::from_millis(20)).await;
1073        }
1074        assert!(tcp_b.can_route(canonical));
1075
1076        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1077        let response = remote
1078            .ask(EchoRequest { value: 12 })
1079            .await
1080            .expect("bounded cluster ask");
1081        assert_eq!(response, EchoResponse { doubled: 24 });
1082
1083        shutdown_a_tx.send(()).ok();
1084        shutdown_b_tx.send(()).ok();
1085    }
1086
1087    #[tokio::test]
1088    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1089        if !tcp_tests_available() {
1090            eprintln!("skipping tcp integration tests: network bind not permitted");
1091            return;
1092        }
1093
1094        let (ca, ca_key) = make_ca();
1095        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1096        let engine = Engine::with_config(EngineConfig {
1097            engine_id: EngineId::new("engine-a.example"),
1098            ..Default::default()
1099        });
1100        let handle = engine.handle();
1101
1102        let mut cluster = handle
1103            .attach_bounded_cluster(BoundedClusterConfig {
1104                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1105                peers: vec![PeerSpec {
1106                    engine_id: EngineId::new("engine-b.example"),
1107                    addr: "127.0.0.1:4100".parse().unwrap(),
1108                    control_plane_addr: None,
1109                    server_name: None,
1110                }],
1111                declared_actors: Vec::new(),
1112                roles: Vec::new(),
1113            })
1114            .await
1115            .expect("attach bounded cluster");
1116
1117        let path = ActorPath::parse("/user/declared-remote").unwrap();
1118        cluster
1119            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1120            .expect("declare remote actor");
1121
1122        assert!(cluster.can_route(&path));
1123        assert!(cluster
1124            .transport()
1125            .as_tcp()
1126            .expect("tcp transport")
1127            .can_route(AddrHash::new(&path, 0)));
1128    }
1129
1130    #[tokio::test]
1131    async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
1132        if !tcp_tests_available() {
1133            eprintln!("skipping tcp integration tests: network bind not permitted");
1134            return;
1135        }
1136
1137        let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
1138        let (ca, ca_key) = make_ca();
1139        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1140        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1141        let control_plane_b = reserve_tcp_addr();
1142
1143        let mut engine_a = Engine::with_config(EngineConfig {
1144            engine_id: EngineId::new("engine-a.example"),
1145            ..Default::default()
1146        });
1147        let engine_b = Engine::with_config(EngineConfig {
1148            engine_id: EngineId::new("engine-b.example"),
1149            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1150            control_plane_tls: Some(tls_b.clone()),
1151            actor_spawn: Some(bounded_spawn_handler()),
1152            ..Default::default()
1153        });
1154
1155        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1156        engine_a.add_user_actor(ChildSpec::new(
1157            "local-remote-collision",
1158            RestartPolicy::Permanent,
1159            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1160            ns,
1161            move || Box::new(EchoActor),
1162        ));
1163
1164        let handle_a = engine_a.handle();
1165        let handle_b = engine_b.handle();
1166
1167        let cluster_b = handle_b
1168            .attach_bounded_cluster(BoundedClusterConfig {
1169                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1170                peers: Vec::new(),
1171                declared_actors: Vec::new(),
1172                roles: Vec::new(),
1173            })
1174            .await
1175            .expect("attach bounded cluster b");
1176        let tcp_b = cluster_b
1177            .transport()
1178            .as_tcp()
1179            .cloned()
1180            .expect("tcp transport b");
1181
1182        let cluster_a = handle_a
1183            .attach_bounded_cluster(BoundedClusterConfig {
1184                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1185                peers: vec![PeerSpec {
1186                    engine_id: EngineId::new("engine-b.example"),
1187                    addr: tcp_b.local_addr(),
1188                    control_plane_addr: Some(control_plane_b),
1189                    server_name: None,
1190                }],
1191                declared_actors: Vec::new(),
1192                roles: vec![BoundedClusterRole {
1193                    name: "context-service".to_string(),
1194                    owner: EngineId::new("engine-b.example"),
1195                }],
1196            })
1197            .await
1198            .expect("attach bounded cluster a");
1199
1200        let err = cluster_a
1201            .spawn_remote_on_role::<EchoRequest>(
1202                "context-service",
1203                RemoteSpawnSpec {
1204                    path: actor_path.clone(),
1205                    type_name: "bounded-echo".to_string(),
1206                    config: None,
1207                },
1208            )
1209            .await
1210            .expect_err("local path collision should fail");
1211        assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));
1212    }
1213
1214    #[tokio::test]
1215    async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1216        if !tcp_tests_available() {
1217            eprintln!("skipping tcp integration tests: network bind not permitted");
1218            return;
1219        }
1220
1221        let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1222        let (ca, ca_key) = make_ca();
1223        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1224        let engine = Engine::with_config(EngineConfig {
1225            engine_id: EngineId::new("engine-a.example"),
1226            ..Default::default()
1227        });
1228        let handle = engine.handle();
1229
1230        let cluster = handle
1231            .attach_bounded_cluster(BoundedClusterConfig {
1232                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1233                peers: vec![
1234                    PeerSpec {
1235                        engine_id: EngineId::new("engine-b.example"),
1236                        addr: "127.0.0.1:4100".parse().unwrap(),
1237                        control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1238                        server_name: None,
1239                    },
1240                    PeerSpec {
1241                        engine_id: EngineId::new("engine-c.example"),
1242                        addr: "127.0.0.1:4200".parse().unwrap(),
1243                        control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1244                        server_name: None,
1245                    },
1246                ],
1247                declared_actors: vec![DeclaredRemoteActor {
1248                    path: actor_path.clone(),
1249                    owner: EngineId::new("engine-b.example"),
1250                }],
1251                roles: Vec::new(),
1252            })
1253            .await
1254            .expect("attach bounded cluster");
1255
1256        let err = cluster
1257            .spawn_remote::<EchoRequest>(
1258                EngineId::new("engine-c.example"),
1259                RemoteSpawnSpec {
1260                    path: actor_path.clone(),
1261                    type_name: "bounded-echo".to_string(),
1262                    config: None,
1263                },
1264            )
1265            .await
1266            .expect_err("conflicting declared owner should fail");
1267        assert_eq!(
1268            err,
1269            RemoteSpawnError::RemoteOwnershipConflict {
1270                path: actor_path,
1271                owner: EngineId::new("engine-b.example"),
1272            }
1273        );
1274    }
1275
1276    #[tokio::test]
1277    async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1278        if !tcp_tests_available() {
1279            eprintln!("skipping tcp integration tests: network bind not permitted");
1280            return;
1281        }
1282
1283        let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1284        let (ca, ca_key) = make_ca();
1285        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1286        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1287        let control_plane_b = reserve_tcp_addr();
1288
1289        let engine_a = Engine::with_config(EngineConfig {
1290            engine_id: EngineId::new("engine-a.example"),
1291            ..Default::default()
1292        });
1293        let engine_b = Engine::with_config(EngineConfig {
1294            engine_id: EngineId::new("engine-b.example"),
1295            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1296            control_plane_tls: Some(tls_b.clone()),
1297            actor_spawn: Some(bounded_spawn_handler()),
1298            ..Default::default()
1299        });
1300
1301        let handle_a = engine_a.handle();
1302        let handle_b = engine_b.handle();
1303        handle_a
1304            .type_registry()
1305            .register_remote_ask::<EchoRequest>();
1306        handle_b
1307            .type_registry()
1308            .register_remote_ask::<EchoRequest>();
1309
1310        let cluster_a = handle_a
1311            .attach_bounded_cluster(BoundedClusterConfig {
1312                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1313                peers: Vec::new(),
1314                declared_actors: Vec::new(),
1315                roles: Vec::new(),
1316            })
1317            .await
1318            .expect("attach bounded cluster a");
1319        let tcp_a = cluster_a
1320            .transport()
1321            .as_tcp()
1322            .cloned()
1323            .expect("tcp transport a");
1324
1325        let cluster_b = handle_b
1326            .attach_bounded_cluster(BoundedClusterConfig {
1327                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1328                peers: vec![PeerSpec {
1329                    engine_id: EngineId::new("engine-a.example"),
1330                    addr: tcp_a.local_addr(),
1331                    control_plane_addr: None,
1332                    server_name: None,
1333                }],
1334                declared_actors: Vec::new(),
1335                roles: Vec::new(),
1336            })
1337            .await
1338            .expect("attach bounded cluster b");
1339        let tcp_b = cluster_b
1340            .transport()
1341            .as_tcp()
1342            .cloned()
1343            .expect("tcp transport b");
1344
1345        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1346        let mut cluster_a = cluster_a;
1347        cluster_a
1348            .add_peer(PeerSpec {
1349                engine_id: EngineId::new("engine-b.example"),
1350                addr: tcp_b.local_addr(),
1351                control_plane_addr: Some(control_plane_b),
1352                server_name: None,
1353            })
1354            .expect("add peer with control plane");
1355
1356        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1357        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1358
1359        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1360        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1361        tokio::time::sleep(Duration::from_millis(150)).await;
1362
1363        let remote = cluster_a
1364            .spawn_remote::<EchoRequest>(
1365                EngineId::new("engine-b.example"),
1366                RemoteSpawnSpec {
1367                    path: actor_path.clone(),
1368                    type_name: "bounded-echo".to_string(),
1369                    config: None,
1370                },
1371            )
1372            .await
1373            .expect("spawn remote actor");
1374
1375        let response = remote
1376            .ask(EchoRequest { value: 21 })
1377            .await
1378            .expect("bounded remote spawn ask");
1379        assert_eq!(response, EchoResponse { doubled: 42 });
1380
1381        let duplicate = cluster_a
1382            .spawn_remote::<EchoRequest>(
1383                EngineId::new("engine-b.example"),
1384                RemoteSpawnSpec {
1385                    path: actor_path.clone(),
1386                    type_name: "bounded-echo".to_string(),
1387                    config: None,
1388                },
1389            )
1390            .await
1391            .expect_err("duplicate remote spawn should fail");
1392        assert_eq!(
1393            duplicate,
1394            RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
1395        );
1396
1397        let slot = handle_b
1398            .registry
1399            .get_by_path(&actor_path)
1400            .expect("spawned actor registered on host");
1401        handle_b.registry.increment_restart_count(slot.addr);
1402        slot.ctrl_tx
1403            .send(crate::common::LifecycleSignal::Stop(
1404                palladium_actor::StopReason::Killed,
1405            ))
1406            .ok();
1407
1408        let restart_deadline = Instant::now() + Duration::from_secs(5);
1409        loop {
1410            match remote.ask(EchoRequest { value: 7 }).await {
1411                Ok(response) => {
1412                    assert_eq!(response, EchoResponse { doubled: 14 });
1413                    break;
1414                }
1415                Err(_) if Instant::now() <= restart_deadline => {
1416                    tokio::time::sleep(Duration::from_millis(25)).await;
1417                }
1418                Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1419            }
1420        }
1421
1422        let unknown = cluster_a
1423            .spawn_remote::<EchoRequest>(
1424                EngineId::new("engine-c.example"),
1425                RemoteSpawnSpec {
1426                    path: ActorPath::parse("/user/unknown-target").unwrap(),
1427                    type_name: "bounded-echo".to_string(),
1428                    config: None,
1429                },
1430            )
1431            .await
1432            .expect_err("unknown target should fail");
1433        assert_eq!(
1434            unknown,
1435            RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1436        );
1437
1438        shutdown_a_tx.send(()).ok();
1439        shutdown_b_tx.send(()).ok();
1440    }
1441
1442    #[tokio::test]
1443    async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1444        if !tcp_tests_available() {
1445            eprintln!("skipping tcp integration tests: network bind not permitted");
1446            return;
1447        }
1448
1449        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1450        let (ca, ca_key) = make_ca();
1451        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1452        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1453        let control_plane_b = reserve_tcp_addr();
1454
1455        let engine_a = Engine::with_config(EngineConfig {
1456            engine_id: EngineId::new("engine-a.example"),
1457            ..Default::default()
1458        });
1459        let engine_b = Engine::with_config(EngineConfig {
1460            engine_id: EngineId::new("engine-b.example"),
1461            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1462            control_plane_tls: Some(tls_b.clone()),
1463            actor_spawn: Some(bounded_spawn_handler()),
1464            ..Default::default()
1465        });
1466
1467        let handle_a = engine_a.handle();
1468        let handle_b = engine_b.handle();
1469        handle_a
1470            .type_registry()
1471            .register_remote_ask::<EchoRequest>();
1472        handle_b
1473            .type_registry()
1474            .register_remote_ask::<EchoRequest>();
1475
1476        let cluster_b = handle_b
1477            .attach_bounded_cluster(BoundedClusterConfig {
1478                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1479                peers: Vec::new(),
1480                declared_actors: Vec::new(),
1481                roles: Vec::new(),
1482            })
1483            .await
1484            .expect("attach bounded cluster b");
1485        let tcp_b = cluster_b
1486            .transport()
1487            .as_tcp()
1488            .cloned()
1489            .expect("tcp transport b");
1490
1491        let cluster_a = handle_a
1492            .attach_bounded_cluster(BoundedClusterConfig {
1493                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1494                peers: vec![PeerSpec {
1495                    engine_id: EngineId::new("engine-b.example"),
1496                    addr: tcp_b.local_addr(),
1497                    control_plane_addr: Some(control_plane_b),
1498                    server_name: None,
1499                }],
1500                declared_actors: Vec::new(),
1501                roles: vec![BoundedClusterRole {
1502                    name: "context-service".to_string(),
1503                    owner: EngineId::new("engine-b.example"),
1504                }],
1505            })
1506            .await
1507            .expect("attach bounded cluster a");
1508        let tcp_a = cluster_a
1509            .transport()
1510            .as_tcp()
1511            .cloned()
1512            .expect("tcp transport a");
1513
1514        let mut cluster_b = cluster_b;
1515        cluster_b
1516            .add_peer(PeerSpec {
1517                engine_id: EngineId::new("engine-a.example"),
1518                addr: tcp_a.local_addr(),
1519                control_plane_addr: None,
1520                server_name: None,
1521            })
1522            .expect("add peer a to cluster b");
1523
1524        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1525        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1526
1527        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1528        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1529        tokio::time::sleep(Duration::from_millis(150)).await;
1530
1531        let remote = cluster_a
1532            .spawn_remote_on_role::<EchoRequest>(
1533                "context-service",
1534                RemoteSpawnSpec {
1535                    path: actor_path.clone(),
1536                    type_name: "bounded-echo".to_string(),
1537                    config: None,
1538                },
1539            )
1540            .await
1541            .expect("spawn remote actor by role over tcp");
1542
1543        let response = remote
1544            .ask(EchoRequest { value: 11 })
1545            .await
1546            .expect("bounded remote spawn ask over tcp by role");
1547        assert_eq!(response, EchoResponse { doubled: 22 });
1548
1549        let unknown_role = cluster_a
1550            .spawn_remote_on_role::<EchoRequest>(
1551                "missing-role",
1552                RemoteSpawnSpec {
1553                    path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
1554                    type_name: "bounded-echo".to_string(),
1555                    config: None,
1556                },
1557            )
1558            .await
1559            .expect_err("unknown role should fail");
1560        assert_eq!(
1561            unknown_role,
1562            RemoteSpawnError::UnknownRole("missing-role".to_string())
1563        );
1564
1565        shutdown_a_tx.send(()).ok();
1566        shutdown_b_tx.send(()).ok();
1567    }
1568
1569    #[tokio::test]
1570    async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
1571        if !tcp_tests_available() {
1572            eprintln!("skipping tcp integration tests: network bind not permitted");
1573            return;
1574        }
1575
1576        let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
1577        let (ca, ca_key) = make_ca();
1578        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1579        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1580        let control_plane_b = reserve_tcp_addr();
1581
1582        let engine_a = Engine::with_config(EngineConfig {
1583            engine_id: EngineId::new("engine-a.example"),
1584            ..Default::default()
1585        });
1586        let engine_b = Engine::with_config(EngineConfig {
1587            engine_id: EngineId::new("engine-b.example"),
1588            control_plane_tcp_addr: Some(control_plane_b.to_string()),
1589            control_plane_tls: Some(tls_b.clone()),
1590            actor_spawn: Some(bounded_spawn_handler()),
1591            ..Default::default()
1592        });
1593
1594        let handle_a = engine_a.handle();
1595        let handle_b = engine_b.handle();
1596        handle_a
1597            .type_registry()
1598            .register_remote_ask::<EchoRequest>();
1599        handle_b
1600            .type_registry()
1601            .register_remote_ask::<EchoRequest>();
1602
1603        let cluster_b = handle_b
1604            .attach_bounded_cluster(BoundedClusterConfig {
1605                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1606                peers: Vec::new(),
1607                declared_actors: Vec::new(),
1608                roles: Vec::new(),
1609            })
1610            .await
1611            .expect("attach bounded cluster b");
1612        let tcp_b = cluster_b
1613            .transport()
1614            .as_tcp()
1615            .cloned()
1616            .expect("tcp transport b");
1617
1618        let cluster_a = handle_a
1619            .attach_bounded_cluster(BoundedClusterConfig {
1620                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1621                peers: vec![PeerSpec {
1622                    engine_id: EngineId::new("engine-b.example"),
1623                    addr: tcp_b.local_addr(),
1624                    control_plane_addr: Some(control_plane_b),
1625                    server_name: None,
1626                }],
1627                declared_actors: Vec::new(),
1628                roles: vec![BoundedClusterRole {
1629                    name: "context-service".to_string(),
1630                    owner: EngineId::new("engine-b.example"),
1631                }],
1632            })
1633            .await
1634            .expect("attach bounded cluster a");
1635        let tcp_a = cluster_a
1636            .transport()
1637            .as_tcp()
1638            .cloned()
1639            .expect("tcp transport a");
1640
1641        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1642        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1643
1644        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1645        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1646
1647        // Caller knows the static owner already. Remote spawn should not require
1648        // the host engine to predeclare a reverse peer just to install the
1649        // caller's canonical path route.
1650        let _ = tcp_a;
1651
1652        let remote = cluster_a
1653            .spawn_remote_on_role::<EchoRequest>(
1654                "context-service",
1655                RemoteSpawnSpec {
1656                    path: actor_path.clone(),
1657                    type_name: "bounded-echo".to_string(),
1658                    config: None,
1659                },
1660            )
1661            .await
1662            .expect("spawn remote actor by role over tcp without reverse declaration");
1663
1664        let response = remote
1665            .ask(EchoRequest { value: 13 })
1666            .await
1667            .expect("bounded remote spawn ask over tcp without reverse declaration");
1668        assert_eq!(response, EchoResponse { doubled: 26 });
1669
1670        shutdown_a_tx.send(()).ok();
1671        shutdown_b_tx.send(()).ok();
1672    }
1673
1674    fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
1675        QuicTransportConfig {
1676            engine_id: EngineId::new(engine_id),
1677            listen_addr: "127.0.0.1:0".parse().unwrap(),
1678            send_buffer_size: 1024,
1679            idle_timeout: Duration::from_secs(2),
1680            tls,
1681        }
1682    }
1683
1684    #[tokio::test]
1685    async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
1686        if !quic_tests_available() {
1687            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1688            return;
1689        }
1690
1691        let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
1692        let (ca, ca_key) = make_ca();
1693        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1694        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1695
1696        let mut engine_a = Engine::with_config(EngineConfig {
1697            engine_id: EngineId::new("engine-a.example"),
1698            ..Default::default()
1699        });
1700        let engine_b = Engine::with_config(EngineConfig {
1701            engine_id: EngineId::new("engine-b.example"),
1702            ..Default::default()
1703        });
1704
1705        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1706        engine_a.add_user_actor(ChildSpec::new(
1707            "cluster-echo-quic",
1708            RestartPolicy::Permanent,
1709            ShutdownPolicy::Timeout(Duration::from_secs(1)),
1710            ns,
1711            move || Box::new(EchoActor),
1712        ));
1713
1714        let handle_a = engine_a.handle();
1715        let handle_b = engine_b.handle();
1716        handle_a
1717            .type_registry()
1718            .register_remote_ask::<EchoRequest>();
1719        handle_b
1720            .type_registry()
1721            .register_remote_ask::<EchoRequest>();
1722
1723        let cluster_a = match handle_a
1724            .attach_bounded_cluster(BoundedClusterConfig {
1725                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1726                peers: Vec::new(),
1727                declared_actors: Vec::new(),
1728                roles: vec![BoundedClusterRole {
1729                    name: "context-service".to_string(),
1730                    owner: EngineId::new("engine-b.example"),
1731                }],
1732            })
1733            .await
1734        {
1735            Ok(cluster) => cluster,
1736            Err(ClusterError::Transport(err)) => {
1737                eprintln!("skipping quic bounded-cluster test: {err:?}");
1738                return;
1739            }
1740            Err(err) => panic!("attach bounded cluster a: {err:?}"),
1741        };
1742        let quic_a = cluster_a
1743            .transport()
1744            .as_quic()
1745            .cloned()
1746            .expect("quic transport a");
1747
1748        let cluster_b = match handle_b
1749            .attach_bounded_cluster(BoundedClusterConfig {
1750                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1751                peers: vec![PeerSpec {
1752                    engine_id: EngineId::new("engine-a.example"),
1753                    addr: quic_a.local_addr(),
1754                    control_plane_addr: None,
1755                    server_name: None,
1756                }],
1757                declared_actors: Vec::new(),
1758                roles: Vec::new(),
1759            })
1760            .await
1761        {
1762            Ok(cluster) => cluster,
1763            Err(ClusterError::Transport(err)) => {
1764                eprintln!("skipping quic bounded-cluster test: {err:?}");
1765                return;
1766            }
1767            Err(err) => panic!("attach bounded cluster b: {err:?}"),
1768        };
1769        let quic_b = cluster_b
1770            .transport()
1771            .as_quic()
1772            .cloned()
1773            .expect("quic transport b");
1774
1775        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1776        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1777        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1778
1779        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1780        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1781
1782        let canonical = AddrHash::new(&actor_path, 0);
1783        let deadline = std::time::Instant::now() + Duration::from_secs(5);
1784        while !cluster_b.can_route(&actor_path) {
1785            if std::time::Instant::now() > deadline {
1786                panic!("bounded quic cluster route did not propagate");
1787            }
1788            tokio::time::sleep(Duration::from_millis(20)).await;
1789        }
1790        assert!(quic_b.can_route(canonical));
1791
1792        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1793        let response = remote
1794            .ask(EchoRequest { value: 15 })
1795            .await
1796            .expect("bounded cluster ask over quic");
1797        assert_eq!(response, EchoResponse { doubled: 30 });
1798
1799        shutdown_a_tx.send(()).ok();
1800        shutdown_b_tx.send(()).ok();
1801    }
1802
1803    #[tokio::test]
1804    async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
1805        if !quic_tests_available() {
1806            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1807            return;
1808        }
1809
1810        let (ca, ca_key) = make_ca();
1811        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1812        let engine = Engine::with_config(EngineConfig {
1813            engine_id: EngineId::new("engine-a.example"),
1814            ..Default::default()
1815        });
1816        let handle = engine.handle();
1817
1818        let mut cluster = match handle
1819            .attach_bounded_cluster(BoundedClusterConfig {
1820                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
1821                peers: vec![PeerSpec {
1822                    engine_id: EngineId::new("engine-b.example"),
1823                    addr: "127.0.0.1:4100".parse().unwrap(),
1824                    control_plane_addr: None,
1825                    server_name: None,
1826                }],
1827                declared_actors: Vec::new(),
1828                roles: Vec::new(),
1829            })
1830            .await
1831        {
1832            Ok(cluster) => cluster,
1833            Err(ClusterError::Transport(err)) => {
1834                eprintln!("skipping quic bounded-cluster test: {err:?}");
1835                return;
1836            }
1837            Err(err) => panic!("attach bounded cluster: {err:?}"),
1838        };
1839
1840        let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
1841        cluster
1842            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1843            .expect("declare remote actor");
1844
1845        assert!(cluster.can_route(&path));
1846        assert!(cluster
1847            .transport()
1848            .as_quic()
1849            .expect("quic transport")
1850            .can_route(AddrHash::new(&path, 0)));
1851    }
1852
1853    #[tokio::test]
1854    async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
1855        if !quic_tests_available() {
1856            eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1857            return;
1858        }
1859
1860        let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
1861        let (ca, ca_key) = make_ca();
1862        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1863        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1864        let control_plane_b = reserve_udp_addr();
1865
1866        let engine_a = Engine::with_config(EngineConfig {
1867            engine_id: EngineId::new("engine-a.example"),
1868            ..Default::default()
1869        });
1870        let engine_b = Engine::with_config(EngineConfig {
1871            engine_id: EngineId::new("engine-b.example"),
1872            control_plane_quic_addr: Some(control_plane_b.to_string()),
1873            control_plane_tls: Some(tls_b.clone()),
1874            actor_spawn: Some(bounded_spawn_handler()),
1875            ..Default::default()
1876        });
1877
1878        let handle_a = engine_a.handle();
1879        let handle_b = engine_b.handle();
1880        handle_a
1881            .type_registry()
1882            .register_remote_ask::<EchoRequest>();
1883        handle_b
1884            .type_registry()
1885            .register_remote_ask::<EchoRequest>();
1886
1887        let cluster_a = match handle_a
1888            .attach_bounded_cluster(BoundedClusterConfig {
1889                transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1890                peers: Vec::new(),
1891                declared_actors: Vec::new(),
1892                roles: Vec::new(),
1893            })
1894            .await
1895        {
1896            Ok(cluster) => cluster,
1897            Err(ClusterError::Transport(err)) => {
1898                eprintln!("skipping quic bounded-cluster test: {err:?}");
1899                return;
1900            }
1901            Err(err) => panic!("attach bounded cluster a: {err:?}"),
1902        };
1903        let quic_a = cluster_a
1904            .transport()
1905            .as_quic()
1906            .cloned()
1907            .expect("quic transport a");
1908
1909        let cluster_b = match handle_b
1910            .attach_bounded_cluster(BoundedClusterConfig {
1911                transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1912                peers: vec![PeerSpec {
1913                    engine_id: EngineId::new("engine-a.example"),
1914                    addr: quic_a.local_addr(),
1915                    control_plane_addr: None,
1916                    server_name: None,
1917                }],
1918                declared_actors: Vec::new(),
1919                roles: Vec::new(),
1920            })
1921            .await
1922        {
1923            Ok(cluster) => cluster,
1924            Err(ClusterError::Transport(err)) => {
1925                eprintln!("skipping quic bounded-cluster test: {err:?}");
1926                return;
1927            }
1928            Err(err) => panic!("attach bounded cluster b: {err:?}"),
1929        };
1930        let quic_b = cluster_b
1931            .transport()
1932            .as_quic()
1933            .cloned()
1934            .expect("quic transport b");
1935
1936        quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1937        let mut cluster_a = cluster_a;
1938        cluster_a
1939            .add_peer(PeerSpec {
1940                engine_id: EngineId::new("engine-b.example"),
1941                addr: quic_b.local_addr(),
1942                control_plane_addr: Some(control_plane_b),
1943                server_name: None,
1944            })
1945            .expect("add quic peer with control plane");
1946
1947        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1948        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1949
1950        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1951        std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1952        tokio::time::sleep(Duration::from_millis(150)).await;
1953
1954        let remote = cluster_a
1955            .spawn_remote_on_role::<EchoRequest>(
1956                "context-service",
1957                RemoteSpawnSpec {
1958                    path: actor_path.clone(),
1959                    type_name: "bounded-echo".to_string(),
1960                    config: None,
1961                },
1962            )
1963            .await
1964            .expect("spawn remote actor over quic by role");
1965
1966        let response = remote
1967            .ask(EchoRequest { value: 9 })
1968            .await
1969            .expect("bounded remote spawn ask over quic by role");
1970        assert_eq!(response, EchoResponse { doubled: 18 });
1971
1972        let unknown_role = cluster_a
1973            .spawn_remote_on_role::<EchoRequest>(
1974                "missing-role",
1975                RemoteSpawnSpec {
1976                    path: ActorPath::parse("/user/missing-role").unwrap(),
1977                    type_name: "bounded-echo".to_string(),
1978                    config: None,
1979                },
1980            )
1981            .await
1982            .expect_err("unknown role should fail");
1983        assert_eq!(
1984            unknown_role,
1985            RemoteSpawnError::UnknownRole("missing-role".to_string())
1986        );
1987
1988        shutdown_a_tx.send(()).ok();
1989        shutdown_b_tx.send(()).ok();
1990    }
1991}