1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
10use palladium_transport::network::{Network, TokioNetwork};
11use palladium_transport::{
12 QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport, TransportError,
13};
14use rustls::pki_types::ServerName;
15use serde_json::{json, Value};
16use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
17use tokio_rustls::TlsConnector;
18
/// Static description of a bounded cluster: the local transport plus the
/// peers, remotely owned actors and roles declared for it.
#[derive(Clone)]
pub struct BoundedClusterConfig {
    /// Transport configuration; it also carries the local engine id.
    pub transport: BoundedTransportConfig,
    /// Remote engines declared for this cluster.
    pub peers: Vec<PeerSpec>,
    /// Actor paths declared as owned by one of the peers.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles, each bound to an owning peer engine.
    pub roles: Vec<BoundedClusterRole>,
}
26
/// Transport selection for a bounded cluster, wrapping the configuration of
/// the chosen transport.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    /// TCP transport configuration.
    Tcp(TcpTransportConfig),
    /// QUIC transport configuration.
    Quic(QuicTransportConfig),
}
32
33impl BoundedTransportConfig {
34 pub fn engine_id(&self) -> &EngineId {
35 match self {
36 Self::Tcp(config) => &config.engine_id,
37 Self::Quic(config) => &config.engine_id,
38 }
39 }
40}
41
/// Description of one remote engine in the cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    /// Identity of the remote engine; must differ from the local engine id
    /// and be unique among the declared peers.
    pub engine_id: EngineId,
    /// Address handed to the transport when the peer is registered.
    pub addr: SocketAddr,
    /// Control-plane endpoint used for remote spawns; remote spawning fails
    /// with `MissingControlPlaneEndpoint` when this is `None`.
    pub control_plane_addr: Option<SocketAddr>,
    /// Optional TLS server name; remote spawns fall back to the engine id
    /// string when it is absent.
    pub server_name: Option<String>,
}
49
/// An actor path declared as living on a remote engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    /// Path of the remotely owned actor.
    pub path: ActorPath,
    /// Engine that owns the actor; must be one of the declared peers.
    pub owner: EngineId,
}
55
/// A named role bound to the engine that serves it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    /// Role name; must be unique within a cluster configuration.
    pub name: String,
    /// Engine that owns the role; must be one of the declared peers.
    pub owner: EngineId,
}
61
/// Failures raised while validating or mutating a bounded cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// The same peer engine id was declared more than once.
    DuplicatePeer(EngineId),
    /// A peer was declared with the local engine's own id.
    LocalPeerDeclared(EngineId),
    /// The same actor path was declared remote more than once.
    DuplicateDeclaredActor(ActorPath),
    /// The path is owned by the local engine or is already present in the
    /// local actor registry.
    LocalActorDeclaredRemote(ActorPath),
    /// The declared owner of an actor path is not a known peer.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// The same role name was declared more than once.
    DuplicateRole(String),
    /// The declared owner of a role is not a known peer.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// The underlying transport rejected the operation.
    Transport(TransportError),
}
73
/// Parameters of a remote spawn request sent to a peer's control plane.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    /// Path the new actor is spawned at; sent as the `path` parameter.
    pub path: ActorPath,
    /// Actor type to instantiate; sent as the `type_name` parameter.
    pub type_name: String,
    /// Optional JSON configuration; the `config` parameter is omitted from
    /// the request when this is `None`.
    pub config: Option<Value>,
}
80
/// Failures raised while spawning an actor on a remote engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// The target engine is not among the known peers.
    UnknownPeer(EngineId),
    /// No role with the requested name is declared.
    UnknownRole(String),
    /// The target peer has no control-plane address configured.
    MissingControlPlaneEndpoint(EngineId),
    /// The TLS server name could not be converted into a `ServerName`.
    InvalidServerName(String),
    /// Any TLS, connection, I/O, parsing or server-reported failure on the
    /// control plane, carried as a message string.
    ControlPlane(String),
    /// The actor was not reported as spawned before the wait timeout elapsed.
    SpawnTimedOut(ActorPath),
}
90
/// Wire protocol used to reach a peer's control plane.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    /// TLS over TCP, one newline-terminated JSON line per request and reply.
    Tcp,
    /// QUIC, one bidirectional stream per request.
    Quic,
}
96
/// Client-side settings for talking to peer control planes.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    /// Protocol used for every control-plane call.
    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material the per-call client configuration is derived from.
    pub(crate) tls: palladium_transport::TlsConfig,
    /// How long a remote spawn polls for the new actor before giving up.
    pub(crate) wait_timeout: Duration,
}
103
/// Floor for the remote-spawn wait timeout: five seconds.
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_millis(5_000);
105
106impl From<TransportError> for ClusterError {
107 fn from(value: TransportError) -> Self {
108 Self::Transport(value)
109 }
110}
111
112impl From<TransportError> for RemoteSpawnError {
113 fn from(value: TransportError) -> Self {
114 Self::ControlPlane(format!("transport route install failed: {value:?}"))
115 }
116}
117
/// Shared handle to the transport backing a bounded cluster.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    /// Reference-counted TCP transport.
    Tcp(Arc<TcpTransport<R, N>>),
    /// Reference-counted QUIC transport.
    Quic(Arc<QuicTransport<R, N>>),
}
123
124impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
125 pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
126 match self {
127 Self::Tcp(transport) => Some(transport),
128 Self::Quic(_) => None,
129 }
130 }
131
132 pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
133 match self {
134 Self::Tcp(_) => None,
135 Self::Quic(transport) => Some(transport),
136 }
137 }
138
139 pub fn add_peer(&self, peer: &PeerSpec) {
140 match self {
141 Self::Tcp(transport) => {
142 if let Some(server_name) = &peer.server_name {
143 transport.add_peer_with_server_name(
144 peer.engine_id.clone(),
145 peer.addr,
146 server_name.clone(),
147 );
148 } else {
149 transport.add_peer(peer.engine_id.clone(), peer.addr);
150 }
151 }
152 Self::Quic(transport) => {
153 if let Some(server_name) = &peer.server_name {
154 transport.add_peer_with_server_name(
155 peer.engine_id.clone(),
156 peer.addr,
157 server_name.clone(),
158 );
159 } else {
160 transport.add_peer(peer.engine_id.clone(), peer.addr);
161 }
162 }
163 }
164 }
165
166 pub fn declare_remote_path(
167 &self,
168 owner: EngineId,
169 path: &ActorPath,
170 ) -> Result<(), TransportError> {
171 match self {
172 Self::Tcp(transport) => transport.declare_remote_path(owner, path),
173 Self::Quic(transport) => transport.declare_remote_path(owner, path),
174 }
175 }
176
177 pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
178 match self {
179 Self::Tcp(transport) => transport.undeclare_remote_path(path),
180 Self::Quic(transport) => transport.undeclare_remote_path(path),
181 }
182 }
183
184 pub fn can_route(&self, destination: AddrHash) -> bool {
185 match self {
186 Self::Tcp(transport) => transport.can_route(destination),
187 Self::Quic(transport) => transport.can_route(destination),
188 }
189 }
190}
191
/// Handle to an attached bounded cluster: the transport plus the peers,
/// declared remote actors and roles tracked for it.
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    // Used to build typed remote addresses after a remote spawn.
    engine: EngineHandle<R, N, F>,
    // Id of the local engine; peers and remote actors must not use it.
    local_engine_id: EngineId,
    // Settings for calls to peer control planes.
    control_plane: ControlPlaneClientConfig,
    // Transport that peers and remote paths are registered with.
    transport: BoundedTransportHandle<R, N>,
    // Peers known to this handle.
    peers: Vec<PeerSpec>,
    // Remote actors declared through this handle.
    declared_actors: Vec<DeclaredRemoteActor>,
    // Roles known to this handle.
    roles: Vec<BoundedClusterRole>,
    // Local actor registry, consulted when validating remote declarations.
    registry: Arc<ActorRegistry<R>>,
}
207
208impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
209 pub(crate) fn new(
210 engine: EngineHandle<R, N, F>,
211 local_engine_id: EngineId,
212 control_plane: ControlPlaneClientConfig,
213 transport: BoundedTransportHandle<R, N>,
214 peers: Vec<PeerSpec>,
215 roles: Vec<BoundedClusterRole>,
216 registry: Arc<ActorRegistry<R>>,
217 ) -> Self {
218 Self {
219 engine,
220 local_engine_id,
221 control_plane,
222 transport,
223 peers,
224 declared_actors: Vec::new(),
225 roles,
226 registry,
227 }
228 }
229
230 pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
231 &self.transport
232 }
233
234 pub fn peers(&self) -> &[PeerSpec] {
235 &self.peers
236 }
237
238 pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
239 &self.declared_actors
240 }
241
242 pub fn roles(&self) -> &[BoundedClusterRole] {
243 &self.roles
244 }
245
246 pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
247 validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
248 self.transport.add_peer(&peer);
249 self.peers.push(peer);
250 Ok(())
251 }
252
253 pub fn declare_remote_actor(
254 &mut self,
255 owner: EngineId,
256 path: ActorPath,
257 ) -> Result<(), ClusterError> {
258 validate_declared_actor(
259 &path,
260 &owner,
261 &self.local_engine_id,
262 &self.peers,
263 &self.declared_actors,
264 &self.registry,
265 )?;
266 self.transport.declare_remote_path(owner.clone(), &path)?;
267 self.declared_actors
268 .push(DeclaredRemoteActor { path, owner });
269 Ok(())
270 }
271
272 pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
273 self.transport.undeclare_remote_path(path)?;
274 self.declared_actors
275 .retain(|declared| &declared.path != path);
276 Ok(())
277 }
278
279 pub fn can_route(&self, path: &ActorPath) -> bool {
280 self.transport.can_route(AddrHash::new(path, 0))
281 }
282
283 pub async fn spawn_remote_on_role<M>(
284 &self,
285 role: &str,
286 spec: RemoteSpawnSpec,
287 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
288 where
289 M: RemoteMessage,
290 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
291 R: Send + Sync,
292 {
293 let owner = self
294 .roles
295 .iter()
296 .find(|declared| declared.name == role)
297 .map(|declared| declared.owner.clone())
298 .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
299 self.spawn_remote::<M>(owner, spec).await
300 }
301
302 pub async fn spawn_remote<M>(
303 &self,
304 target: EngineId,
305 spec: RemoteSpawnSpec,
306 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
307 where
308 M: RemoteMessage,
309 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
310 R: Send + Sync,
311 {
312 let peer = self
313 .peers
314 .iter()
315 .find(|peer| peer.engine_id == target)
316 .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
317 let control_plane_addr = peer
318 .control_plane_addr
319 .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
320 let server_name = peer
321 .server_name
322 .clone()
323 .unwrap_or_else(|| target.as_str().to_string());
324
325 call_actor_spawn(&self.control_plane, control_plane_addr, &server_name, &spec).await?;
326
327 let deadline = Instant::now() + self.control_plane.wait_timeout;
328 while Instant::now() <= deadline {
329 if actor_is_spawned(
330 &self.control_plane,
331 control_plane_addr,
332 &server_name,
333 &spec.path,
334 )
335 .await?
336 {
337 self.transport
338 .declare_remote_path(target.clone(), &spec.path)
339 .map_err(RemoteSpawnError::from)?;
340 return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
341 }
342 tokio::time::sleep(Duration::from_millis(20)).await;
343 }
344
345 Err(RemoteSpawnError::SpawnTimedOut(spec.path))
346 }
347}
348
349async fn call_actor_spawn(
350 client: &ControlPlaneClientConfig,
351 addr: SocketAddr,
352 server_name: &str,
353 spec: &RemoteSpawnSpec,
354) -> Result<(), RemoteSpawnError> {
355 let params = match &spec.config {
356 Some(config) => json!({
357 "path": spec.path.as_str(),
358 "type_name": spec.type_name,
359 "config": config,
360 }),
361 None => json!({
362 "path": spec.path.as_str(),
363 "type_name": spec.type_name,
364 }),
365 };
366 call_control_plane(client, addr, server_name, "actor.spawn", params).await?;
367 Ok(())
368}
369
370async fn actor_is_spawned(
371 client: &ControlPlaneClientConfig,
372 addr: SocketAddr,
373 server_name: &str,
374 path: &ActorPath,
375) -> Result<bool, RemoteSpawnError> {
376 match call_control_plane(
377 client,
378 addr,
379 server_name,
380 "actor.info",
381 json!({ "path": path.as_str() }),
382 )
383 .await
384 {
385 Ok(_) => Ok(true),
386 Err(RemoteSpawnError::ControlPlane(message))
387 if message.contains("actor not found:")
388 || message.contains("actor not found")
389 || message.contains("User supervisor not available") =>
390 {
391 Ok(false)
392 }
393 Err(err) => Err(err),
394 }
395}
396
397async fn call_control_plane(
398 client: &ControlPlaneClientConfig,
399 addr: SocketAddr,
400 server_name: &str,
401 method: &str,
402 params: Value,
403) -> Result<Value, RemoteSpawnError> {
404 let request = json!({
405 "id": 1,
406 "method": method,
407 "params": params,
408 });
409
410 match client.protocol {
411 ControlPlaneProtocol::Tcp => {
412 call_control_plane_tcp(addr, server_name, &client.tls, &request).await
413 }
414 ControlPlaneProtocol::Quic => {
415 call_control_plane_quic(addr, server_name, &client.tls, &request).await
416 }
417 }
418}
419
420async fn call_control_plane_tcp(
421 addr: SocketAddr,
422 server_name: &str,
423 tls: &palladium_transport::TlsConfig,
424 request: &Value,
425) -> Result<Value, RemoteSpawnError> {
426 let config = tls
427 .client_config(server_name)
428 .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
429 let name = ServerName::try_from(server_name.to_string())
430 .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
431 let stream = tokio::net::TcpStream::connect(addr)
432 .await
433 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
434 let connector = TlsConnector::from(config);
435 let tls_stream = connector
436 .connect(name, stream)
437 .await
438 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
439
440 let (reader, mut writer) = tokio::io::split(tls_stream);
441 let mut body = request.to_string();
442 body.push('\n');
443 writer
444 .write_all(body.as_bytes())
445 .await
446 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
447 let mut line = String::new();
448 let mut reader = BufReader::new(reader);
449 reader
450 .read_line(&mut line)
451 .await
452 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
453 parse_control_plane_response(&line)
454}
455
456async fn call_control_plane_quic(
457 addr: SocketAddr,
458 server_name: &str,
459 tls: &palladium_transport::TlsConfig,
460 request: &Value,
461) -> Result<Value, RemoteSpawnError> {
462 ensure_rustls_provider();
463 let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
464 RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
465 })?)
466 .clone();
467 client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
468 let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
469
470 let client = s2n_quic::Client::builder()
471 .with_tls(client_tls)
472 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
473 .with_io("0.0.0.0:0")
474 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
475 .start()
476 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
477
478 let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
479 let mut conn = client
480 .connect(connect)
481 .await
482 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
483 let mut stream = conn
484 .open_bidirectional_stream()
485 .await
486 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
487
488 let mut body = request.to_string();
489 body.push('\n');
490 stream
491 .write_all(body.as_bytes())
492 .await
493 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
494 stream
495 .close()
496 .await
497 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
498
499 let mut buf = Vec::new();
500 stream
501 .read_to_end(&mut buf)
502 .await
503 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
504 let line = String::from_utf8_lossy(&buf);
505 parse_control_plane_response(&line)
506}
507
508fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
509 let response: Value = serde_json::from_str(line.trim())
510 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
511 if let Some(error) = response.get("error") {
512 let message = error
513 .get("message")
514 .and_then(Value::as_str)
515 .unwrap_or("unknown control-plane error");
516 return Err(RemoteSpawnError::ControlPlane(message.to_string()));
517 }
518 Ok(response.get("result").cloned().unwrap_or(Value::Null))
519}
520
/// Installs a process-wide rustls crypto provider at most once.
///
/// `aws-lc-rs` is used when that feature is enabled, `ring` when only that
/// one is; with neither feature nothing is installed. The result of the
/// installation attempt is ignored.
fn ensure_rustls_provider() {
    fn install_default_provider() {
        #[cfg(feature = "aws-lc-rs")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        let _ = rustls::crypto::ring::default_provider().install_default();
    }

    static INIT: std::sync::Once = std::sync::Once::new();
    INIT.call_once(install_default_provider);
}
530
531pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
532 ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
533}
534
535pub(crate) fn validate_config<R: Reactor>(
536 config: &BoundedClusterConfig,
537 registry: &Arc<ActorRegistry<R>>,
538) -> Result<(), ClusterError> {
539 let local_engine_id = config.transport.engine_id();
540 let mut seen_peers = Vec::new();
541 for peer in &config.peers {
542 validate_peer_spec(peer, local_engine_id, &seen_peers)?;
543 seen_peers.push(peer.clone());
544 }
545
546 let mut seen_declared = Vec::new();
547 for declared in &config.declared_actors {
548 validate_declared_actor(
549 &declared.path,
550 &declared.owner,
551 local_engine_id,
552 &config.peers,
553 &seen_declared,
554 registry,
555 )?;
556 seen_declared.push(declared.clone());
557 }
558
559 let mut seen_roles = Vec::new();
560 for role in &config.roles {
561 validate_role(role, &config.peers, &seen_roles)?;
562 seen_roles.push(role.clone());
563 }
564
565 Ok(())
566}
567
568fn validate_peer_spec(
569 peer: &PeerSpec,
570 local_engine_id: &EngineId,
571 existing: &[PeerSpec],
572) -> Result<(), ClusterError> {
573 if &peer.engine_id == local_engine_id {
574 return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
575 }
576 if existing
577 .iter()
578 .any(|existing| existing.engine_id == peer.engine_id)
579 {
580 return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
581 }
582 Ok(())
583}
584
585fn validate_declared_actor<R: Reactor>(
586 path: &ActorPath,
587 owner: &EngineId,
588 local_engine_id: &EngineId,
589 peers: &[PeerSpec],
590 existing: &[DeclaredRemoteActor],
591 registry: &Arc<ActorRegistry<R>>,
592) -> Result<(), ClusterError> {
593 if owner == local_engine_id || registry.get_by_path(path).is_some() {
594 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
595 }
596 if existing.iter().any(|existing| existing.path == *path) {
597 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
598 }
599 if !peers.iter().any(|peer| peer.engine_id == *owner) {
600 return Err(ClusterError::UnknownPeerOwner {
601 path: path.clone(),
602 owner: owner.clone(),
603 });
604 }
605 Ok(())
606}
607
608fn validate_role(
609 role: &BoundedClusterRole,
610 peers: &[PeerSpec],
611 existing: &[BoundedClusterRole],
612) -> Result<(), ClusterError> {
613 if existing.iter().any(|existing| existing.name == role.name) {
614 return Err(ClusterError::DuplicateRole(role.name.clone()));
615 }
616 if !peers.iter().any(|peer| peer.engine_id == role.owner) {
617 return Err(ClusterError::UnknownRoleOwner {
618 role: role.name.clone(),
619 owner: role.owner.clone(),
620 });
621 }
622 Ok(())
623}
624
625#[cfg(test)]
626mod tests {
627 use super::{
628 validate_config, BoundedClusterConfig, BoundedClusterRole, BoundedTransportConfig,
629 ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError, RemoteSpawnSpec,
630 };
631 use crate::{Engine, EngineConfig};
632 use palladium_actor::{
633 Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
634 Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
635 };
636 use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
637 use rcgen::{
638 BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
639 KeyUsagePurpose,
640 };
641 use std::net::{SocketAddr, TcpListener, UdpSocket};
642 use std::sync::Arc;
643 use std::time::{Duration, Instant};
644
645 fn make_ca() -> (Certificate, KeyPair) {
646 let mut params = CertificateParams::default();
647 params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
648 params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
649 let ca_key = KeyPair::generate().expect("ca keypair");
650 let cert = params.self_signed(&ca_key).expect("ca cert");
651 (cert, ca_key)
652 }
653
654 fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
655 let keypair = KeyPair::generate().expect("leaf keypair");
656 let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
657 let mut params = params;
658 params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
659 params.extended_key_usages = vec![
660 ExtendedKeyUsagePurpose::ServerAuth,
661 ExtendedKeyUsagePurpose::ClientAuth,
662 ];
663 let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
664 let cert_der = cert.der().to_vec();
665 let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
666 let ca_der = ca.der().to_vec();
667 TlsConfig {
668 cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
669 private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
670 trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
671 }
672 }
673
674 fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
675 TcpTransportConfig {
676 engine_id: EngineId::new(engine_id),
677 listen_addr: "127.0.0.1:0".parse().unwrap(),
678 max_connections_per_peer: 1,
679 idle_timeout: Duration::from_secs(1),
680 send_buffer_size: 8,
681 nodelay: true,
682 tls,
683 send_delay: Duration::from_millis(0),
684 }
685 }
686
687 fn reserve_tcp_addr() -> SocketAddr {
688 let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
689 let addr = listener.local_addr().expect("tcp local addr");
690 drop(listener);
691 addr
692 }
693
694 fn reserve_udp_addr() -> SocketAddr {
695 let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
696 let addr = socket.local_addr().expect("udp local addr");
697 drop(socket);
698 addr
699 }
700
701 #[test]
702 fn bounded_cluster_config_rejects_duplicate_peers() {
703 let (ca, ca_key) = make_ca();
704 let engine = Engine::with_config(EngineConfig {
705 engine_id: EngineId::new("engine-a.example"),
706 ..Default::default()
707 });
708 let config = BoundedClusterConfig {
709 transport: BoundedTransportConfig::Tcp(tcp_config(
710 "engine-a.example",
711 make_tls_config("engine-a.example", &ca, &ca_key),
712 )),
713 peers: vec![
714 PeerSpec {
715 engine_id: EngineId::new("engine-b.example"),
716 addr: "127.0.0.1:4100".parse().unwrap(),
717 control_plane_addr: None,
718 server_name: None,
719 },
720 PeerSpec {
721 engine_id: EngineId::new("engine-b.example"),
722 addr: "127.0.0.1:4200".parse().unwrap(),
723 control_plane_addr: None,
724 server_name: None,
725 },
726 ],
727 declared_actors: Vec::new(),
728 roles: Vec::new(),
729 };
730
731 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
732 assert_eq!(
733 err,
734 ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
735 );
736 }
737
738 #[test]
739 fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
740 let (ca, ca_key) = make_ca();
741 let engine = Engine::with_config(EngineConfig {
742 engine_id: EngineId::new("engine-a.example"),
743 ..Default::default()
744 });
745 let config = BoundedClusterConfig {
746 transport: BoundedTransportConfig::Tcp(tcp_config(
747 "engine-a.example",
748 make_tls_config("engine-a.example", &ca, &ca_key),
749 )),
750 peers: vec![PeerSpec {
751 engine_id: EngineId::new("engine-b.example"),
752 addr: "127.0.0.1:4100".parse().unwrap(),
753 control_plane_addr: None,
754 server_name: None,
755 }],
756 declared_actors: vec![DeclaredRemoteActor {
757 path: ActorPath::parse("/user/missing-owner").unwrap(),
758 owner: EngineId::new("engine-c.example"),
759 }],
760 roles: Vec::new(),
761 };
762
763 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
764 assert_eq!(
765 err,
766 ClusterError::UnknownPeerOwner {
767 path: ActorPath::parse("/user/missing-owner").unwrap(),
768 owner: EngineId::new("engine-c.example"),
769 }
770 );
771 }
772
773 #[test]
774 fn bounded_cluster_config_rejects_duplicate_roles() {
775 let (ca, ca_key) = make_ca();
776 let engine = Engine::with_config(EngineConfig {
777 engine_id: EngineId::new("engine-a.example"),
778 ..Default::default()
779 });
780 let config = BoundedClusterConfig {
781 transport: BoundedTransportConfig::Tcp(tcp_config(
782 "engine-a.example",
783 make_tls_config("engine-a.example", &ca, &ca_key),
784 )),
785 peers: vec![PeerSpec {
786 engine_id: EngineId::new("engine-b.example"),
787 addr: "127.0.0.1:4100".parse().unwrap(),
788 control_plane_addr: None,
789 server_name: None,
790 }],
791 declared_actors: Vec::new(),
792 roles: vec![
793 BoundedClusterRole {
794 name: "context-service".to_string(),
795 owner: EngineId::new("engine-b.example"),
796 },
797 BoundedClusterRole {
798 name: "context-service".to_string(),
799 owner: EngineId::new("engine-b.example"),
800 },
801 ],
802 };
803
804 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
805 assert_eq!(
806 err,
807 ClusterError::DuplicateRole("context-service".to_string())
808 );
809 }
810
811 #[test]
812 fn bounded_cluster_config_rejects_unknown_role_owner() {
813 let (ca, ca_key) = make_ca();
814 let engine = Engine::with_config(EngineConfig {
815 engine_id: EngineId::new("engine-a.example"),
816 ..Default::default()
817 });
818 let config = BoundedClusterConfig {
819 transport: BoundedTransportConfig::Tcp(tcp_config(
820 "engine-a.example",
821 make_tls_config("engine-a.example", &ca, &ca_key),
822 )),
823 peers: vec![PeerSpec {
824 engine_id: EngineId::new("engine-b.example"),
825 addr: "127.0.0.1:4100".parse().unwrap(),
826 control_plane_addr: None,
827 server_name: None,
828 }],
829 declared_actors: Vec::new(),
830 roles: vec![BoundedClusterRole {
831 name: "gateway".to_string(),
832 owner: EngineId::new("engine-c.example"),
833 }],
834 };
835
836 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
837 assert_eq!(
838 err,
839 ClusterError::UnknownRoleOwner {
840 role: "gateway".to_string(),
841 owner: EngineId::new("engine-c.example"),
842 }
843 );
844 }
845
846 #[test]
847 fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
848 assert_eq!(
849 super::remote_spawn_wait_timeout(Duration::from_millis(25)),
850 Duration::from_secs(5)
851 );
852 assert_eq!(
853 super::remote_spawn_wait_timeout(Duration::from_secs(8)),
854 Duration::from_secs(8)
855 );
856 }
857
858 fn tcp_tests_available() -> bool {
859 if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
860 return false;
861 }
862 std::net::TcpListener::bind("127.0.0.1:0").is_ok()
863 }
864
865 fn quic_tests_available() -> bool {
866 rustls::crypto::CryptoProvider::get_default().is_some()
867 }
868
    /// Test request carrying the number the echo actors double.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoRequest {
        value: u64,
    }
873
    /// Test response carrying twice the requested value.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoResponse {
        doubled: u64,
    }
878
    /// Pairs `EchoRequest` with `EchoResponse`; the type tag is the FNV-1a
    /// hash of a test-specific name.
    impl Message for EchoRequest {
        type Response = EchoResponse;
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
    }
883
884 struct EchoActor;
885
886 impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
887 fn on_message(
888 &mut self,
889 ctx: &mut ActorContext<R>,
890 envelope: &Envelope,
891 payload: MessagePayload,
892 ) -> Result<(), ActorError> {
893 let req = payload
894 .extract::<EchoRequest>()
895 .map_err(|_| ActorError::Handler)?;
896 let resp = EchoResponse {
897 doubled: req.value * 2,
898 };
899 let resp_env = envelope.response(0);
900 ctx.send_raw(resp_env, MessagePayload::local(resp))
901 .map_err(|_| ActorError::Handler)
902 }
903 }
904
    /// Test message that `SpawnedEchoActor` answers with a handler error.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct CrashRequest;

    /// `CrashRequest` has a unit response; the type tag is the FNV-1a hash
    /// of a test-specific name.
    impl Message for CrashRequest {
        type Response = ();
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
    }
912
913 struct SpawnedEchoActor;
914
915 impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
916 fn on_message(
917 &mut self,
918 ctx: &mut ActorContext<R>,
919 envelope: &Envelope,
920 payload: MessagePayload,
921 ) -> Result<(), ActorError> {
922 match envelope.type_tag {
923 EchoRequest::TYPE_TAG => {
924 let req = payload
925 .extract::<EchoRequest>()
926 .map_err(|_| ActorError::Handler)?;
927 let resp = EchoResponse {
928 doubled: req.value * 2,
929 };
930 let resp_env = envelope.response(0);
931 ctx.send_raw(resp_env, MessagePayload::local(resp))
932 .map_err(|_| ActorError::Handler)
933 }
934 CrashRequest::TYPE_TAG => Err(ActorError::Handler),
935 _ => Err(ActorError::Handler),
936 }
937 }
938 }
939
940 fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
941 Arc::new(|type_name: &str, _config: &[u8]| match type_name {
942 "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
943 _ => Err(format!("unknown remote spawn type: {type_name}")),
944 })
945 }
946
    /// End-to-end check over TCP: engine A hosts an echo actor, engine B is
    /// attached with A as its peer, and once B can route to the actor's path
    /// an ask sent from B must come back doubled.
    ///
    /// Skipped unless `tcp_tests_available()` returns true.
    #[tokio::test]
    async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
        let (ca, ca_key) = make_ca();
        let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
        let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);

        let mut engine_a = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let engine_b = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-b.example"),
            ..Default::default()
        });

        // Only engine A hosts the echo actor.
        let ns = NamespacePolicy::default_for(&actor_path).unwrap();
        engine_a.add_user_actor(ChildSpec::new(
            "cluster-echo",
            RestartPolicy::Permanent,
            ShutdownPolicy::Timeout(Duration::from_secs(1)),
            ns,
            move || Box::new(EchoActor),
        ));

        // Both engines register the request type for remote asks.
        let handle_a = engine_a.handle();
        let handle_b = engine_b.handle();
        handle_a
            .type_registry()
            .register_remote_ask::<EchoRequest>();
        handle_b
            .type_registry()
            .register_remote_ask::<EchoRequest>();

        // A is attached first, without peers, so its listen address is known
        // before B is configured.
        let cluster_a = handle_a
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
                peers: Vec::new(),
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster a");
        let tcp_a = cluster_a
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport a");

        let cluster_b = handle_b
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-a.example"),
                    addr: tcp_a.local_addr(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster b");
        let tcp_b = cluster_b
            .transport()
            .as_tcp()
            .cloned()
            .expect("tcp transport b");

        // Make the peering symmetric now that B's address is known.
        tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
        let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
        let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();

        // Each engine runs on its own thread until its shutdown signal.
        std::thread::spawn(move || engine_a.run(shutdown_a_rx));
        std::thread::spawn(move || engine_b.run(shutdown_b_rx));

        // Poll for up to five seconds until B can route to the actor.
        let canonical = AddrHash::new(&actor_path, 0);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !cluster_b.can_route(&actor_path) {
            if std::time::Instant::now() > deadline {
                panic!("bounded cluster route did not propagate");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(tcp_b.can_route(canonical));

        let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
        let response = remote
            .ask(EchoRequest { value: 12 })
            .await
            .expect("bounded cluster ask");
        assert_eq!(response, EchoResponse { doubled: 24 });

        // Send errors are ignored: an engine may already have stopped.
        shutdown_a_tx.send(()).ok();
        shutdown_b_tx.send(()).ok();
    }
1048
    /// Declaring a remote actor on an attached cluster must make its path
    /// routable, both through the cluster handle and through the TCP
    /// transport directly.
    ///
    /// Skipped unless `tcp_tests_available()` returns true.
    #[tokio::test]
    async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
        if !tcp_tests_available() {
            eprintln!("skipping tcp integration tests: network bind not permitted");
            return;
        }

        let (ca, ca_key) = make_ca();
        let tls = make_tls_config("engine-a.example", &ca, &ca_key);
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let handle = engine.handle();

        // The peer address is a fixed placeholder; this test never starts an
        // engine for engine-b.
        let mut cluster = handle
            .attach_bounded_cluster(BoundedClusterConfig {
                transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
                peers: vec![PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                }],
                declared_actors: Vec::new(),
                roles: Vec::new(),
            })
            .await
            .expect("attach bounded cluster");

        let path = ActorPath::parse("/user/declared-remote").unwrap();
        cluster
            .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
            .expect("declare remote actor");

        assert!(cluster.can_route(&path));
        assert!(cluster
            .transport()
            .as_tcp()
            .expect("tcp transport")
            .can_route(AddrHash::new(&path, 0)));
    }
1091
1092 #[tokio::test]
1093 async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1094 if !tcp_tests_available() {
1095 eprintln!("skipping tcp integration tests: network bind not permitted");
1096 return;
1097 }
1098
1099 let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1100 let (ca, ca_key) = make_ca();
1101 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1102 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1103 let control_plane_b = reserve_tcp_addr();
1104
1105 let engine_a = Engine::with_config(EngineConfig {
1106 engine_id: EngineId::new("engine-a.example"),
1107 ..Default::default()
1108 });
1109 let engine_b = Engine::with_config(EngineConfig {
1110 engine_id: EngineId::new("engine-b.example"),
1111 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1112 control_plane_tls: Some(tls_b.clone()),
1113 actor_spawn: Some(bounded_spawn_handler()),
1114 ..Default::default()
1115 });
1116
1117 let handle_a = engine_a.handle();
1118 let handle_b = engine_b.handle();
1119 handle_a
1120 .type_registry()
1121 .register_remote_ask::<EchoRequest>();
1122 handle_b
1123 .type_registry()
1124 .register_remote_ask::<EchoRequest>();
1125
1126 let cluster_a = handle_a
1127 .attach_bounded_cluster(BoundedClusterConfig {
1128 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1129 peers: Vec::new(),
1130 declared_actors: Vec::new(),
1131 roles: Vec::new(),
1132 })
1133 .await
1134 .expect("attach bounded cluster a");
1135 let tcp_a = cluster_a
1136 .transport()
1137 .as_tcp()
1138 .cloned()
1139 .expect("tcp transport a");
1140
1141 let cluster_b = handle_b
1142 .attach_bounded_cluster(BoundedClusterConfig {
1143 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1144 peers: vec![PeerSpec {
1145 engine_id: EngineId::new("engine-a.example"),
1146 addr: tcp_a.local_addr(),
1147 control_plane_addr: None,
1148 server_name: None,
1149 }],
1150 declared_actors: Vec::new(),
1151 roles: Vec::new(),
1152 })
1153 .await
1154 .expect("attach bounded cluster b");
1155 let tcp_b = cluster_b
1156 .transport()
1157 .as_tcp()
1158 .cloned()
1159 .expect("tcp transport b");
1160
1161 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1162 let mut cluster_a = cluster_a;
1163 cluster_a
1164 .add_peer(PeerSpec {
1165 engine_id: EngineId::new("engine-b.example"),
1166 addr: tcp_b.local_addr(),
1167 control_plane_addr: Some(control_plane_b),
1168 server_name: None,
1169 })
1170 .expect("add peer with control plane");
1171
1172 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1173 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1174
1175 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1176 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1177 tokio::time::sleep(Duration::from_millis(150)).await;
1178
1179 let remote = cluster_a
1180 .spawn_remote::<EchoRequest>(
1181 EngineId::new("engine-b.example"),
1182 RemoteSpawnSpec {
1183 path: actor_path.clone(),
1184 type_name: "bounded-echo".to_string(),
1185 config: None,
1186 },
1187 )
1188 .await
1189 .expect("spawn remote actor");
1190
1191 let response = remote
1192 .ask(EchoRequest { value: 21 })
1193 .await
1194 .expect("bounded remote spawn ask");
1195 assert_eq!(response, EchoResponse { doubled: 42 });
1196
1197 let duplicate = cluster_a
1198 .spawn_remote::<EchoRequest>(
1199 EngineId::new("engine-b.example"),
1200 RemoteSpawnSpec {
1201 path: actor_path.clone(),
1202 type_name: "bounded-echo".to_string(),
1203 config: None,
1204 },
1205 )
1206 .await
1207 .expect_err("duplicate remote spawn should fail");
1208 assert!(
1209 matches!(duplicate, RemoteSpawnError::ControlPlane(ref message) if message.contains("actor already running")),
1210 "unexpected duplicate error: {duplicate:?}"
1211 );
1212
1213 let slot = handle_b
1214 .registry
1215 .get_by_path(&actor_path)
1216 .expect("spawned actor registered on host");
1217 handle_b.registry.increment_restart_count(slot.addr);
1218 slot.ctrl_tx
1219 .send(crate::common::LifecycleSignal::Stop(
1220 palladium_actor::StopReason::Killed,
1221 ))
1222 .ok();
1223
1224 let restart_deadline = Instant::now() + Duration::from_secs(5);
1225 loop {
1226 match remote.ask(EchoRequest { value: 7 }).await {
1227 Ok(response) => {
1228 assert_eq!(response, EchoResponse { doubled: 14 });
1229 break;
1230 }
1231 Err(_) if Instant::now() <= restart_deadline => {
1232 tokio::time::sleep(Duration::from_millis(25)).await;
1233 }
1234 Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1235 }
1236 }
1237
1238 let unknown = cluster_a
1239 .spawn_remote::<EchoRequest>(
1240 EngineId::new("engine-c.example"),
1241 RemoteSpawnSpec {
1242 path: ActorPath::parse("/user/unknown-target").unwrap(),
1243 type_name: "bounded-echo".to_string(),
1244 config: None,
1245 },
1246 )
1247 .await
1248 .expect_err("unknown target should fail");
1249 assert_eq!(
1250 unknown,
1251 RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1252 );
1253
1254 shutdown_a_tx.send(()).ok();
1255 shutdown_b_tx.send(()).ok();
1256 }
1257
1258 #[tokio::test]
1259 async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1260 if !tcp_tests_available() {
1261 eprintln!("skipping tcp integration tests: network bind not permitted");
1262 return;
1263 }
1264
1265 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1266 let (ca, ca_key) = make_ca();
1267 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1268 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1269 let control_plane_b = reserve_tcp_addr();
1270
1271 let engine_a = Engine::with_config(EngineConfig {
1272 engine_id: EngineId::new("engine-a.example"),
1273 ..Default::default()
1274 });
1275 let engine_b = Engine::with_config(EngineConfig {
1276 engine_id: EngineId::new("engine-b.example"),
1277 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1278 control_plane_tls: Some(tls_b.clone()),
1279 actor_spawn: Some(bounded_spawn_handler()),
1280 ..Default::default()
1281 });
1282
1283 let handle_a = engine_a.handle();
1284 let handle_b = engine_b.handle();
1285 handle_a
1286 .type_registry()
1287 .register_remote_ask::<EchoRequest>();
1288 handle_b
1289 .type_registry()
1290 .register_remote_ask::<EchoRequest>();
1291
1292 let cluster_b = handle_b
1293 .attach_bounded_cluster(BoundedClusterConfig {
1294 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1295 peers: Vec::new(),
1296 declared_actors: Vec::new(),
1297 roles: Vec::new(),
1298 })
1299 .await
1300 .expect("attach bounded cluster b");
1301 let tcp_b = cluster_b
1302 .transport()
1303 .as_tcp()
1304 .cloned()
1305 .expect("tcp transport b");
1306
1307 let cluster_a = handle_a
1308 .attach_bounded_cluster(BoundedClusterConfig {
1309 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1310 peers: vec![PeerSpec {
1311 engine_id: EngineId::new("engine-b.example"),
1312 addr: tcp_b.local_addr(),
1313 control_plane_addr: Some(control_plane_b),
1314 server_name: None,
1315 }],
1316 declared_actors: Vec::new(),
1317 roles: vec![BoundedClusterRole {
1318 name: "context-service".to_string(),
1319 owner: EngineId::new("engine-b.example"),
1320 }],
1321 })
1322 .await
1323 .expect("attach bounded cluster a");
1324 let tcp_a = cluster_a
1325 .transport()
1326 .as_tcp()
1327 .cloned()
1328 .expect("tcp transport a");
1329
1330 let mut cluster_b = cluster_b;
1331 cluster_b
1332 .add_peer(PeerSpec {
1333 engine_id: EngineId::new("engine-a.example"),
1334 addr: tcp_a.local_addr(),
1335 control_plane_addr: None,
1336 server_name: None,
1337 })
1338 .expect("add peer a to cluster b");
1339
1340 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1341 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1342
1343 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1344 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1345 tokio::time::sleep(Duration::from_millis(150)).await;
1346
1347 let remote = cluster_a
1348 .spawn_remote_on_role::<EchoRequest>(
1349 "context-service",
1350 RemoteSpawnSpec {
1351 path: actor_path.clone(),
1352 type_name: "bounded-echo".to_string(),
1353 config: None,
1354 },
1355 )
1356 .await
1357 .expect("spawn remote actor by role over tcp");
1358
1359 let response = remote
1360 .ask(EchoRequest { value: 11 })
1361 .await
1362 .expect("bounded remote spawn ask over tcp by role");
1363 assert_eq!(response, EchoResponse { doubled: 22 });
1364
1365 let unknown_role = cluster_a
1366 .spawn_remote_on_role::<EchoRequest>(
1367 "missing-role",
1368 RemoteSpawnSpec {
1369 path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
1370 type_name: "bounded-echo".to_string(),
1371 config: None,
1372 },
1373 )
1374 .await
1375 .expect_err("unknown role should fail");
1376 assert_eq!(
1377 unknown_role,
1378 RemoteSpawnError::UnknownRole("missing-role".to_string())
1379 );
1380
1381 shutdown_a_tx.send(()).ok();
1382 shutdown_b_tx.send(()).ok();
1383 }
1384
1385 #[tokio::test]
1386 async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
1387 if !tcp_tests_available() {
1388 eprintln!("skipping tcp integration tests: network bind not permitted");
1389 return;
1390 }
1391
1392 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
1393 let (ca, ca_key) = make_ca();
1394 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1395 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1396 let control_plane_b = reserve_tcp_addr();
1397
1398 let engine_a = Engine::with_config(EngineConfig {
1399 engine_id: EngineId::new("engine-a.example"),
1400 ..Default::default()
1401 });
1402 let engine_b = Engine::with_config(EngineConfig {
1403 engine_id: EngineId::new("engine-b.example"),
1404 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1405 control_plane_tls: Some(tls_b.clone()),
1406 actor_spawn: Some(bounded_spawn_handler()),
1407 ..Default::default()
1408 });
1409
1410 let handle_a = engine_a.handle();
1411 let handle_b = engine_b.handle();
1412 handle_a
1413 .type_registry()
1414 .register_remote_ask::<EchoRequest>();
1415 handle_b
1416 .type_registry()
1417 .register_remote_ask::<EchoRequest>();
1418
1419 let cluster_b = handle_b
1420 .attach_bounded_cluster(BoundedClusterConfig {
1421 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1422 peers: Vec::new(),
1423 declared_actors: Vec::new(),
1424 roles: Vec::new(),
1425 })
1426 .await
1427 .expect("attach bounded cluster b");
1428 let tcp_b = cluster_b
1429 .transport()
1430 .as_tcp()
1431 .cloned()
1432 .expect("tcp transport b");
1433
1434 let cluster_a = handle_a
1435 .attach_bounded_cluster(BoundedClusterConfig {
1436 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1437 peers: vec![PeerSpec {
1438 engine_id: EngineId::new("engine-b.example"),
1439 addr: tcp_b.local_addr(),
1440 control_plane_addr: Some(control_plane_b),
1441 server_name: None,
1442 }],
1443 declared_actors: Vec::new(),
1444 roles: vec![BoundedClusterRole {
1445 name: "context-service".to_string(),
1446 owner: EngineId::new("engine-b.example"),
1447 }],
1448 })
1449 .await
1450 .expect("attach bounded cluster a");
1451 let tcp_a = cluster_a
1452 .transport()
1453 .as_tcp()
1454 .cloned()
1455 .expect("tcp transport a");
1456
1457 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1458 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1459
1460 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1461 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1462
1463 let _ = tcp_a;
1467
1468 let remote = cluster_a
1469 .spawn_remote_on_role::<EchoRequest>(
1470 "context-service",
1471 RemoteSpawnSpec {
1472 path: actor_path.clone(),
1473 type_name: "bounded-echo".to_string(),
1474 config: None,
1475 },
1476 )
1477 .await
1478 .expect("spawn remote actor by role over tcp without reverse declaration");
1479
1480 let response = remote
1481 .ask(EchoRequest { value: 13 })
1482 .await
1483 .expect("bounded remote spawn ask over tcp without reverse declaration");
1484 assert_eq!(response, EchoResponse { doubled: 26 });
1485
1486 shutdown_a_tx.send(()).ok();
1487 shutdown_b_tx.send(()).ok();
1488 }
1489
1490 fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
1491 QuicTransportConfig {
1492 engine_id: EngineId::new(engine_id),
1493 listen_addr: "127.0.0.1:0".parse().unwrap(),
1494 send_buffer_size: 1024,
1495 idle_timeout: Duration::from_secs(2),
1496 tls,
1497 }
1498 }
1499
1500 #[tokio::test]
1501 async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
1502 if !quic_tests_available() {
1503 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1504 return;
1505 }
1506
1507 let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
1508 let (ca, ca_key) = make_ca();
1509 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1510 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1511
1512 let mut engine_a = Engine::with_config(EngineConfig {
1513 engine_id: EngineId::new("engine-a.example"),
1514 ..Default::default()
1515 });
1516 let engine_b = Engine::with_config(EngineConfig {
1517 engine_id: EngineId::new("engine-b.example"),
1518 ..Default::default()
1519 });
1520
1521 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1522 engine_a.add_user_actor(ChildSpec::new(
1523 "cluster-echo-quic",
1524 RestartPolicy::Permanent,
1525 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1526 ns,
1527 move || Box::new(EchoActor),
1528 ));
1529
1530 let handle_a = engine_a.handle();
1531 let handle_b = engine_b.handle();
1532 handle_a
1533 .type_registry()
1534 .register_remote_ask::<EchoRequest>();
1535 handle_b
1536 .type_registry()
1537 .register_remote_ask::<EchoRequest>();
1538
1539 let cluster_a = match handle_a
1540 .attach_bounded_cluster(BoundedClusterConfig {
1541 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1542 peers: Vec::new(),
1543 declared_actors: Vec::new(),
1544 roles: vec![BoundedClusterRole {
1545 name: "context-service".to_string(),
1546 owner: EngineId::new("engine-b.example"),
1547 }],
1548 })
1549 .await
1550 {
1551 Ok(cluster) => cluster,
1552 Err(ClusterError::Transport(err)) => {
1553 eprintln!("skipping quic bounded-cluster test: {err:?}");
1554 return;
1555 }
1556 Err(err) => panic!("attach bounded cluster a: {err:?}"),
1557 };
1558 let quic_a = cluster_a
1559 .transport()
1560 .as_quic()
1561 .cloned()
1562 .expect("quic transport a");
1563
1564 let cluster_b = match handle_b
1565 .attach_bounded_cluster(BoundedClusterConfig {
1566 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1567 peers: vec![PeerSpec {
1568 engine_id: EngineId::new("engine-a.example"),
1569 addr: quic_a.local_addr(),
1570 control_plane_addr: None,
1571 server_name: None,
1572 }],
1573 declared_actors: Vec::new(),
1574 roles: Vec::new(),
1575 })
1576 .await
1577 {
1578 Ok(cluster) => cluster,
1579 Err(ClusterError::Transport(err)) => {
1580 eprintln!("skipping quic bounded-cluster test: {err:?}");
1581 return;
1582 }
1583 Err(err) => panic!("attach bounded cluster b: {err:?}"),
1584 };
1585 let quic_b = cluster_b
1586 .transport()
1587 .as_quic()
1588 .cloned()
1589 .expect("quic transport b");
1590
1591 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1592 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1593 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1594
1595 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1596 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1597
1598 let canonical = AddrHash::new(&actor_path, 0);
1599 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1600 while !cluster_b.can_route(&actor_path) {
1601 if std::time::Instant::now() > deadline {
1602 panic!("bounded quic cluster route did not propagate");
1603 }
1604 tokio::time::sleep(Duration::from_millis(20)).await;
1605 }
1606 assert!(quic_b.can_route(canonical));
1607
1608 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1609 let response = remote
1610 .ask(EchoRequest { value: 15 })
1611 .await
1612 .expect("bounded cluster ask over quic");
1613 assert_eq!(response, EchoResponse { doubled: 30 });
1614
1615 shutdown_a_tx.send(()).ok();
1616 shutdown_b_tx.send(()).ok();
1617 }
1618
1619 #[tokio::test]
1620 async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
1621 if !quic_tests_available() {
1622 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1623 return;
1624 }
1625
1626 let (ca, ca_key) = make_ca();
1627 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1628 let engine = Engine::with_config(EngineConfig {
1629 engine_id: EngineId::new("engine-a.example"),
1630 ..Default::default()
1631 });
1632 let handle = engine.handle();
1633
1634 let mut cluster = match handle
1635 .attach_bounded_cluster(BoundedClusterConfig {
1636 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
1637 peers: vec![PeerSpec {
1638 engine_id: EngineId::new("engine-b.example"),
1639 addr: "127.0.0.1:4100".parse().unwrap(),
1640 control_plane_addr: None,
1641 server_name: None,
1642 }],
1643 declared_actors: Vec::new(),
1644 roles: Vec::new(),
1645 })
1646 .await
1647 {
1648 Ok(cluster) => cluster,
1649 Err(ClusterError::Transport(err)) => {
1650 eprintln!("skipping quic bounded-cluster test: {err:?}");
1651 return;
1652 }
1653 Err(err) => panic!("attach bounded cluster: {err:?}"),
1654 };
1655
1656 let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
1657 cluster
1658 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1659 .expect("declare remote actor");
1660
1661 assert!(cluster.can_route(&path));
1662 assert!(cluster
1663 .transport()
1664 .as_quic()
1665 .expect("quic transport")
1666 .can_route(AddrHash::new(&path, 0)));
1667 }
1668
1669 #[tokio::test]
1670 async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
1671 if !quic_tests_available() {
1672 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1673 return;
1674 }
1675
1676 let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
1677 let (ca, ca_key) = make_ca();
1678 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1679 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1680 let control_plane_b = reserve_udp_addr();
1681
1682 let engine_a = Engine::with_config(EngineConfig {
1683 engine_id: EngineId::new("engine-a.example"),
1684 ..Default::default()
1685 });
1686 let engine_b = Engine::with_config(EngineConfig {
1687 engine_id: EngineId::new("engine-b.example"),
1688 control_plane_quic_addr: Some(control_plane_b.to_string()),
1689 control_plane_tls: Some(tls_b.clone()),
1690 actor_spawn: Some(bounded_spawn_handler()),
1691 ..Default::default()
1692 });
1693
1694 let handle_a = engine_a.handle();
1695 let handle_b = engine_b.handle();
1696 handle_a
1697 .type_registry()
1698 .register_remote_ask::<EchoRequest>();
1699 handle_b
1700 .type_registry()
1701 .register_remote_ask::<EchoRequest>();
1702
1703 let cluster_a = match handle_a
1704 .attach_bounded_cluster(BoundedClusterConfig {
1705 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1706 peers: Vec::new(),
1707 declared_actors: Vec::new(),
1708 roles: Vec::new(),
1709 })
1710 .await
1711 {
1712 Ok(cluster) => cluster,
1713 Err(ClusterError::Transport(err)) => {
1714 eprintln!("skipping quic bounded-cluster test: {err:?}");
1715 return;
1716 }
1717 Err(err) => panic!("attach bounded cluster a: {err:?}"),
1718 };
1719 let quic_a = cluster_a
1720 .transport()
1721 .as_quic()
1722 .cloned()
1723 .expect("quic transport a");
1724
1725 let cluster_b = match handle_b
1726 .attach_bounded_cluster(BoundedClusterConfig {
1727 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1728 peers: vec![PeerSpec {
1729 engine_id: EngineId::new("engine-a.example"),
1730 addr: quic_a.local_addr(),
1731 control_plane_addr: None,
1732 server_name: None,
1733 }],
1734 declared_actors: Vec::new(),
1735 roles: Vec::new(),
1736 })
1737 .await
1738 {
1739 Ok(cluster) => cluster,
1740 Err(ClusterError::Transport(err)) => {
1741 eprintln!("skipping quic bounded-cluster test: {err:?}");
1742 return;
1743 }
1744 Err(err) => panic!("attach bounded cluster b: {err:?}"),
1745 };
1746 let quic_b = cluster_b
1747 .transport()
1748 .as_quic()
1749 .cloned()
1750 .expect("quic transport b");
1751
1752 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1753 let mut cluster_a = cluster_a;
1754 cluster_a
1755 .add_peer(PeerSpec {
1756 engine_id: EngineId::new("engine-b.example"),
1757 addr: quic_b.local_addr(),
1758 control_plane_addr: Some(control_plane_b),
1759 server_name: None,
1760 })
1761 .expect("add quic peer with control plane");
1762
1763 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1764 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1765
1766 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1767 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1768 tokio::time::sleep(Duration::from_millis(150)).await;
1769
1770 let remote = cluster_a
1771 .spawn_remote_on_role::<EchoRequest>(
1772 "context-service",
1773 RemoteSpawnSpec {
1774 path: actor_path.clone(),
1775 type_name: "bounded-echo".to_string(),
1776 config: None,
1777 },
1778 )
1779 .await
1780 .expect("spawn remote actor over quic by role");
1781
1782 let response = remote
1783 .ask(EchoRequest { value: 9 })
1784 .await
1785 .expect("bounded remote spawn ask over quic by role");
1786 assert_eq!(response, EchoResponse { doubled: 18 });
1787
1788 let unknown_role = cluster_a
1789 .spawn_remote_on_role::<EchoRequest>(
1790 "missing-role",
1791 RemoteSpawnSpec {
1792 path: ActorPath::parse("/user/missing-role").unwrap(),
1793 type_name: "bounded-echo".to_string(),
1794 config: None,
1795 },
1796 )
1797 .await
1798 .expect_err("unknown role should fail");
1799 assert_eq!(
1800 unknown_role,
1801 RemoteSpawnError::UnknownRole("missing-role".to_string())
1802 );
1803
1804 shutdown_a_tx.send(()).ok();
1805 shutdown_b_tx.send(()).ok();
1806 }
1807}