1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
10use palladium_transport::network::{Network, TokioNetwork};
11use palladium_transport::{
12 QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport, TransportError,
13};
14use rustls::pki_types::ServerName;
15use serde_json::{json, Value};
16use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
17use tokio_rustls::TlsConnector;
18
/// Configuration used to attach an engine to a bounded cluster.
///
/// Checked by `validate_config` before use: peers must be unique and must not
/// be the local engine; declared actors and roles must name a configured peer
/// as their owner; declared paths and role names must be unique.
#[derive(Clone)]
pub struct BoundedClusterConfig {
    /// Local transport settings; its `engine_id` is treated as the local engine id.
    pub transport: BoundedTransportConfig,
    /// Remote engines this engine is configured to talk to.
    pub peers: Vec<PeerSpec>,
    /// Remote actor paths together with the peer that owns each one.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles, each mapped to the peer engine that owns it.
    pub roles: Vec<BoundedClusterRole>,
}
26
/// Selects which transport the bounded cluster runs over.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    /// TCP transport settings.
    Tcp(TcpTransportConfig),
    /// QUIC transport settings.
    Quic(QuicTransportConfig),
}
32
33impl BoundedTransportConfig {
34 pub fn engine_id(&self) -> &EngineId {
35 match self {
36 Self::Tcp(config) => &config.engine_id,
37 Self::Quic(config) => &config.engine_id,
38 }
39 }
40}
41
/// A remote engine this engine can exchange messages with.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    /// Engine id of the peer; validation rejects the local engine id and duplicates.
    pub engine_id: EngineId,
    /// Transport address the peer is registered under.
    pub addr: SocketAddr,
    /// Control-plane endpoint of the peer; `spawn_remote` fails with
    /// `MissingControlPlaneEndpoint` when this is `None`.
    pub control_plane_addr: Option<SocketAddr>,
    /// Optional TLS server name for the peer. When `None`, the transport peer
    /// is added without an explicit name and control-plane calls fall back to
    /// the engine id string.
    pub server_name: Option<String>,
}
49
/// An actor path that is hosted on a remote peer rather than on this engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    /// Path of the remote actor; must not be registered in the local registry.
    pub path: ActorPath,
    /// Peer engine hosting the actor; must be one of the configured peers.
    pub owner: EngineId,
}
55
/// A named role mapped to the peer engine that owns it.
///
/// `spawn_remote_on_role` resolves a role name to `owner` and spawns there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    /// Role name; must be unique within a configuration.
    pub name: String,
    /// Peer engine owning the role; must be one of the configured peers.
    pub owner: EngineId,
}
61
/// Errors raised while validating or mutating bounded-cluster membership.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// Two peers share the same engine id.
    DuplicatePeer(EngineId),
    /// A peer was declared with the local engine's own id.
    LocalPeerDeclared(EngineId),
    /// The same actor path was declared remote more than once.
    DuplicateDeclaredActor(ActorPath),
    /// A "remote" actor is owned by the local engine or already registered locally.
    LocalActorDeclaredRemote(ActorPath),
    /// A declared actor names an owner that is not a configured peer.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// Two roles share the same name.
    DuplicateRole(String),
    /// A role names an owner that is not a configured peer.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// The underlying transport rejected an operation.
    Transport(TransportError),
}
73
/// Describes an actor to spawn on a remote engine via its control plane.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    /// Path the new actor is spawned at; also the path polled for routability afterwards.
    pub path: ActorPath,
    /// Type name forwarded as `type_name` in the `actor.spawn` request.
    pub type_name: String,
    /// Optional JSON config; the `config` param is only sent when this is `Some`.
    pub config: Option<Value>,
}
80
/// Errors returned by `spawn_remote` / `spawn_remote_on_role`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// The target engine is not a known peer.
    UnknownPeer(EngineId),
    /// No role with the given name is configured.
    UnknownRole(String),
    /// The target peer has no `control_plane_addr`.
    MissingControlPlaneEndpoint(EngineId),
    /// The TLS server name could not be parsed.
    InvalidServerName(String),
    /// Connecting to, talking to, or an error reported by the remote control plane.
    ControlPlane(String),
    /// The spawn call succeeded but no route to the path appeared in time.
    SpawnTimedOut(ActorPath),
}
90
/// Protocol the control-plane client uses to reach a peer: TLS over TCP, or QUIC.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    Tcp,
    Quic,
}
96
/// Settings for outgoing control-plane calls made by a bounded-cluster handle.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    /// Which protocol to dial the peer's control plane with.
    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material used to build the client config for control-plane connections.
    pub(crate) tls: palladium_transport::TlsConfig,
    /// How long `spawn_remote` waits for a route after a successful spawn call.
    pub(crate) wait_timeout: Duration,
}
103
104impl From<TransportError> for ClusterError {
105 fn from(value: TransportError) -> Self {
106 Self::Transport(value)
107 }
108}
109
/// Shared handle to the running cluster transport, either TCP or QUIC.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    Tcp(Arc<TcpTransport<R, N>>),
    Quic(Arc<QuicTransport<R, N>>),
}
115
116impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
117 pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
118 match self {
119 Self::Tcp(transport) => Some(transport),
120 Self::Quic(_) => None,
121 }
122 }
123
124 pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
125 match self {
126 Self::Tcp(_) => None,
127 Self::Quic(transport) => Some(transport),
128 }
129 }
130
131 pub fn add_peer(&self, peer: &PeerSpec) {
132 match self {
133 Self::Tcp(transport) => {
134 if let Some(server_name) = &peer.server_name {
135 transport.add_peer_with_server_name(
136 peer.engine_id.clone(),
137 peer.addr,
138 server_name.clone(),
139 );
140 } else {
141 transport.add_peer(peer.engine_id.clone(), peer.addr);
142 }
143 }
144 Self::Quic(transport) => {
145 if let Some(server_name) = &peer.server_name {
146 transport.add_peer_with_server_name(
147 peer.engine_id.clone(),
148 peer.addr,
149 server_name.clone(),
150 );
151 } else {
152 transport.add_peer(peer.engine_id.clone(), peer.addr);
153 }
154 }
155 }
156 }
157
158 pub fn declare_remote_path(
159 &self,
160 owner: EngineId,
161 path: &ActorPath,
162 ) -> Result<(), TransportError> {
163 match self {
164 Self::Tcp(transport) => transport.declare_remote_path(owner, path),
165 Self::Quic(transport) => transport.declare_remote_path(owner, path),
166 }
167 }
168
169 pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
170 match self {
171 Self::Tcp(transport) => transport.undeclare_remote_path(path),
172 Self::Quic(transport) => transport.undeclare_remote_path(path),
173 }
174 }
175
176 pub fn can_route(&self, destination: AddrHash) -> bool {
177 match self {
178 Self::Tcp(transport) => transport.can_route(destination),
179 Self::Quic(transport) => transport.can_route(destination),
180 }
181 }
182}
183
/// Handle to an engine attached to a bounded cluster.
///
/// Tracks known peers, remote actors declared through this handle, and named
/// roles, and drives remote spawns over the peers' control planes.
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    /// Engine used to mint typed remote addresses after a spawn.
    engine: EngineHandle<R, N, F>,
    /// Id of the local engine; used to reject self-peering and local "remote" actors.
    local_engine_id: EngineId,
    /// Settings for outgoing control-plane calls.
    control_plane: ControlPlaneClientConfig,
    /// Transport used for peer registration and routing.
    transport: BoundedTransportHandle<R, N>,
    /// Known peers, in insertion order.
    peers: Vec<PeerSpec>,
    /// Remote actors declared through this handle.
    declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles and their owning peers.
    roles: Vec<BoundedClusterRole>,
    /// Local actor registry; a path registered here cannot be declared remote.
    registry: Arc<ActorRegistry<R>>,
}
199
200impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
201 pub(crate) fn new(
202 engine: EngineHandle<R, N, F>,
203 local_engine_id: EngineId,
204 control_plane: ControlPlaneClientConfig,
205 transport: BoundedTransportHandle<R, N>,
206 peers: Vec<PeerSpec>,
207 roles: Vec<BoundedClusterRole>,
208 registry: Arc<ActorRegistry<R>>,
209 ) -> Self {
210 Self {
211 engine,
212 local_engine_id,
213 control_plane,
214 transport,
215 peers,
216 declared_actors: Vec::new(),
217 roles,
218 registry,
219 }
220 }
221
222 pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
223 &self.transport
224 }
225
226 pub fn peers(&self) -> &[PeerSpec] {
227 &self.peers
228 }
229
230 pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
231 &self.declared_actors
232 }
233
234 pub fn roles(&self) -> &[BoundedClusterRole] {
235 &self.roles
236 }
237
238 pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
239 validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
240 self.transport.add_peer(&peer);
241 self.peers.push(peer);
242 Ok(())
243 }
244
245 pub fn declare_remote_actor(
246 &mut self,
247 owner: EngineId,
248 path: ActorPath,
249 ) -> Result<(), ClusterError> {
250 validate_declared_actor(
251 &path,
252 &owner,
253 &self.local_engine_id,
254 &self.peers,
255 &self.declared_actors,
256 &self.registry,
257 )?;
258 self.transport.declare_remote_path(owner.clone(), &path)?;
259 self.declared_actors
260 .push(DeclaredRemoteActor { path, owner });
261 Ok(())
262 }
263
264 pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
265 self.transport.undeclare_remote_path(path)?;
266 self.declared_actors
267 .retain(|declared| &declared.path != path);
268 Ok(())
269 }
270
271 pub fn can_route(&self, path: &ActorPath) -> bool {
272 self.transport.can_route(AddrHash::new(path, 0))
273 }
274
275 pub async fn spawn_remote_on_role<M>(
276 &self,
277 role: &str,
278 spec: RemoteSpawnSpec,
279 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
280 where
281 M: RemoteMessage,
282 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
283 R: Send + Sync,
284 {
285 let owner = self
286 .roles
287 .iter()
288 .find(|declared| declared.name == role)
289 .map(|declared| declared.owner.clone())
290 .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
291 self.spawn_remote::<M>(owner, spec).await
292 }
293
294 pub async fn spawn_remote<M>(
295 &self,
296 target: EngineId,
297 spec: RemoteSpawnSpec,
298 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
299 where
300 M: RemoteMessage,
301 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
302 R: Send + Sync,
303 {
304 let peer = self
305 .peers
306 .iter()
307 .find(|peer| peer.engine_id == target)
308 .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
309 let control_plane_addr = peer
310 .control_plane_addr
311 .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
312 let server_name = peer
313 .server_name
314 .clone()
315 .unwrap_or_else(|| target.as_str().to_string());
316
317 call_actor_spawn(&self.control_plane, control_plane_addr, &server_name, &spec).await?;
318
319 let deadline = Instant::now() + self.control_plane.wait_timeout;
320 while Instant::now() <= deadline {
321 if self.can_route(&spec.path) {
322 return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
323 }
324 tokio::time::sleep(Duration::from_millis(20)).await;
325 }
326
327 Err(RemoteSpawnError::SpawnTimedOut(spec.path))
328 }
329}
330
331async fn call_actor_spawn(
332 client: &ControlPlaneClientConfig,
333 addr: SocketAddr,
334 server_name: &str,
335 spec: &RemoteSpawnSpec,
336) -> Result<(), RemoteSpawnError> {
337 let params = match &spec.config {
338 Some(config) => json!({
339 "path": spec.path.as_str(),
340 "type_name": spec.type_name,
341 "config": config,
342 }),
343 None => json!({
344 "path": spec.path.as_str(),
345 "type_name": spec.type_name,
346 }),
347 };
348 let request = json!({
349 "id": 1,
350 "method": "actor.spawn",
351 "params": params,
352 });
353
354 match client.protocol {
355 ControlPlaneProtocol::Tcp => {
356 call_control_plane_tcp(addr, server_name, &client.tls, &request).await?
357 }
358 ControlPlaneProtocol::Quic => {
359 call_control_plane_quic(addr, server_name, &client.tls, &request).await?
360 }
361 };
362 Ok(())
363}
364
365async fn call_control_plane_tcp(
366 addr: SocketAddr,
367 server_name: &str,
368 tls: &palladium_transport::TlsConfig,
369 request: &Value,
370) -> Result<Value, RemoteSpawnError> {
371 let config = tls
372 .client_config(server_name)
373 .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
374 let name = ServerName::try_from(server_name.to_string())
375 .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
376 let stream = tokio::net::TcpStream::connect(addr)
377 .await
378 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
379 let connector = TlsConnector::from(config);
380 let tls_stream = connector
381 .connect(name, stream)
382 .await
383 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
384
385 let (reader, mut writer) = tokio::io::split(tls_stream);
386 let mut body = request.to_string();
387 body.push('\n');
388 writer
389 .write_all(body.as_bytes())
390 .await
391 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
392 let mut line = String::new();
393 let mut reader = BufReader::new(reader);
394 reader
395 .read_line(&mut line)
396 .await
397 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
398 parse_control_plane_response(&line)
399}
400
401async fn call_control_plane_quic(
402 addr: SocketAddr,
403 server_name: &str,
404 tls: &palladium_transport::TlsConfig,
405 request: &Value,
406) -> Result<Value, RemoteSpawnError> {
407 ensure_rustls_provider();
408 let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
409 RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
410 })?)
411 .clone();
412 client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
413 let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
414
415 let client = s2n_quic::Client::builder()
416 .with_tls(client_tls)
417 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
418 .with_io("0.0.0.0:0")
419 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
420 .start()
421 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
422
423 let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
424 let mut conn = client
425 .connect(connect)
426 .await
427 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
428 let mut stream = conn
429 .open_bidirectional_stream()
430 .await
431 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
432
433 let mut body = request.to_string();
434 body.push('\n');
435 stream
436 .write_all(body.as_bytes())
437 .await
438 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
439 stream
440 .close()
441 .await
442 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
443
444 let mut buf = Vec::new();
445 stream
446 .read_to_end(&mut buf)
447 .await
448 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
449 let line = String::from_utf8_lossy(&buf);
450 parse_control_plane_response(&line)
451}
452
453fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
454 let response: Value = serde_json::from_str(line.trim())
455 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
456 if let Some(error) = response.get("error") {
457 let message = error
458 .get("message")
459 .and_then(Value::as_str)
460 .unwrap_or("unknown control-plane error");
461 return Err(RemoteSpawnError::ControlPlane(message.to_string()));
462 }
463 Ok(response.get("result").cloned().unwrap_or(Value::Null))
464}
465
/// Installs a process-wide default rustls crypto provider, at most once per process.
///
/// Uses aws-lc-rs when the `aws-lc-rs` feature is enabled, otherwise ring when
/// the `ring` feature is enabled; with neither feature it does nothing. The
/// `Result` returned by `install_default` is deliberately discarded.
fn ensure_rustls_provider() {
    fn install_feature_provider() {
        #[cfg(feature = "aws-lc-rs")]
        {
            let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        }
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        {
            let _ = rustls::crypto::ring::default_provider().install_default();
        }
    }

    static PROVIDER_ONCE: std::sync::Once = std::sync::Once::new();
    PROVIDER_ONCE.call_once(install_feature_provider);
}
475
476pub(crate) fn validate_config<R: Reactor>(
477 config: &BoundedClusterConfig,
478 registry: &Arc<ActorRegistry<R>>,
479) -> Result<(), ClusterError> {
480 let local_engine_id = config.transport.engine_id();
481 let mut seen_peers = Vec::new();
482 for peer in &config.peers {
483 validate_peer_spec(peer, local_engine_id, &seen_peers)?;
484 seen_peers.push(peer.clone());
485 }
486
487 let mut seen_declared = Vec::new();
488 for declared in &config.declared_actors {
489 validate_declared_actor(
490 &declared.path,
491 &declared.owner,
492 local_engine_id,
493 &config.peers,
494 &seen_declared,
495 registry,
496 )?;
497 seen_declared.push(declared.clone());
498 }
499
500 let mut seen_roles = Vec::new();
501 for role in &config.roles {
502 validate_role(role, &config.peers, &seen_roles)?;
503 seen_roles.push(role.clone());
504 }
505
506 Ok(())
507}
508
509fn validate_peer_spec(
510 peer: &PeerSpec,
511 local_engine_id: &EngineId,
512 existing: &[PeerSpec],
513) -> Result<(), ClusterError> {
514 if &peer.engine_id == local_engine_id {
515 return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
516 }
517 if existing
518 .iter()
519 .any(|existing| existing.engine_id == peer.engine_id)
520 {
521 return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
522 }
523 Ok(())
524}
525
526fn validate_declared_actor<R: Reactor>(
527 path: &ActorPath,
528 owner: &EngineId,
529 local_engine_id: &EngineId,
530 peers: &[PeerSpec],
531 existing: &[DeclaredRemoteActor],
532 registry: &Arc<ActorRegistry<R>>,
533) -> Result<(), ClusterError> {
534 if owner == local_engine_id || registry.get_by_path(path).is_some() {
535 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
536 }
537 if existing.iter().any(|existing| existing.path == *path) {
538 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
539 }
540 if !peers.iter().any(|peer| peer.engine_id == *owner) {
541 return Err(ClusterError::UnknownPeerOwner {
542 path: path.clone(),
543 owner: owner.clone(),
544 });
545 }
546 Ok(())
547}
548
549fn validate_role(
550 role: &BoundedClusterRole,
551 peers: &[PeerSpec],
552 existing: &[BoundedClusterRole],
553) -> Result<(), ClusterError> {
554 if existing.iter().any(|existing| existing.name == role.name) {
555 return Err(ClusterError::DuplicateRole(role.name.clone()));
556 }
557 if !peers.iter().any(|peer| peer.engine_id == role.owner) {
558 return Err(ClusterError::UnknownRoleOwner {
559 role: role.name.clone(),
560 owner: role.owner.clone(),
561 });
562 }
563 Ok(())
564}
565
566#[cfg(test)]
567mod tests {
568 use super::{
569 validate_config, BoundedClusterConfig, BoundedClusterRole, BoundedTransportConfig,
570 ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError, RemoteSpawnSpec,
571 };
572 use crate::{Engine, EngineConfig};
573 use palladium_actor::{
574 Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
575 Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
576 };
577 use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
578 use rcgen::{
579 BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
580 KeyUsagePurpose,
581 };
582 use std::net::{SocketAddr, TcpListener, UdpSocket};
583 use std::sync::Arc;
584 use std::time::{Duration, Instant};
585
586 fn make_ca() -> (Certificate, KeyPair) {
587 let mut params = CertificateParams::default();
588 params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
589 params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
590 let ca_key = KeyPair::generate().expect("ca keypair");
591 let cert = params.self_signed(&ca_key).expect("ca cert");
592 (cert, ca_key)
593 }
594
595 fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
596 let keypair = KeyPair::generate().expect("leaf keypair");
597 let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
598 let mut params = params;
599 params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
600 params.extended_key_usages = vec![
601 ExtendedKeyUsagePurpose::ServerAuth,
602 ExtendedKeyUsagePurpose::ClientAuth,
603 ];
604 let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
605 let cert_der = cert.der().to_vec();
606 let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
607 let ca_der = ca.der().to_vec();
608 TlsConfig {
609 cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
610 private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
611 trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
612 }
613 }
614
615 fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
616 TcpTransportConfig {
617 engine_id: EngineId::new(engine_id),
618 listen_addr: "127.0.0.1:0".parse().unwrap(),
619 max_connections_per_peer: 1,
620 idle_timeout: Duration::from_secs(1),
621 send_buffer_size: 8,
622 nodelay: true,
623 tls,
624 send_delay: Duration::from_millis(0),
625 }
626 }
627
628 fn reserve_tcp_addr() -> SocketAddr {
629 let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
630 let addr = listener.local_addr().expect("tcp local addr");
631 drop(listener);
632 addr
633 }
634
635 fn reserve_udp_addr() -> SocketAddr {
636 let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
637 let addr = socket.local_addr().expect("udp local addr");
638 drop(socket);
639 addr
640 }
641
642 #[test]
643 fn bounded_cluster_config_rejects_duplicate_peers() {
644 let (ca, ca_key) = make_ca();
645 let engine = Engine::with_config(EngineConfig {
646 engine_id: EngineId::new("engine-a.example"),
647 ..Default::default()
648 });
649 let config = BoundedClusterConfig {
650 transport: BoundedTransportConfig::Tcp(tcp_config(
651 "engine-a.example",
652 make_tls_config("engine-a.example", &ca, &ca_key),
653 )),
654 peers: vec![
655 PeerSpec {
656 engine_id: EngineId::new("engine-b.example"),
657 addr: "127.0.0.1:4100".parse().unwrap(),
658 control_plane_addr: None,
659 server_name: None,
660 },
661 PeerSpec {
662 engine_id: EngineId::new("engine-b.example"),
663 addr: "127.0.0.1:4200".parse().unwrap(),
664 control_plane_addr: None,
665 server_name: None,
666 },
667 ],
668 declared_actors: Vec::new(),
669 roles: Vec::new(),
670 };
671
672 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
673 assert_eq!(
674 err,
675 ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
676 );
677 }
678
679 #[test]
680 fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
681 let (ca, ca_key) = make_ca();
682 let engine = Engine::with_config(EngineConfig {
683 engine_id: EngineId::new("engine-a.example"),
684 ..Default::default()
685 });
686 let config = BoundedClusterConfig {
687 transport: BoundedTransportConfig::Tcp(tcp_config(
688 "engine-a.example",
689 make_tls_config("engine-a.example", &ca, &ca_key),
690 )),
691 peers: vec![PeerSpec {
692 engine_id: EngineId::new("engine-b.example"),
693 addr: "127.0.0.1:4100".parse().unwrap(),
694 control_plane_addr: None,
695 server_name: None,
696 }],
697 declared_actors: vec![DeclaredRemoteActor {
698 path: ActorPath::parse("/user/missing-owner").unwrap(),
699 owner: EngineId::new("engine-c.example"),
700 }],
701 roles: Vec::new(),
702 };
703
704 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
705 assert_eq!(
706 err,
707 ClusterError::UnknownPeerOwner {
708 path: ActorPath::parse("/user/missing-owner").unwrap(),
709 owner: EngineId::new("engine-c.example"),
710 }
711 );
712 }
713
714 #[test]
715 fn bounded_cluster_config_rejects_duplicate_roles() {
716 let (ca, ca_key) = make_ca();
717 let engine = Engine::with_config(EngineConfig {
718 engine_id: EngineId::new("engine-a.example"),
719 ..Default::default()
720 });
721 let config = BoundedClusterConfig {
722 transport: BoundedTransportConfig::Tcp(tcp_config(
723 "engine-a.example",
724 make_tls_config("engine-a.example", &ca, &ca_key),
725 )),
726 peers: vec![PeerSpec {
727 engine_id: EngineId::new("engine-b.example"),
728 addr: "127.0.0.1:4100".parse().unwrap(),
729 control_plane_addr: None,
730 server_name: None,
731 }],
732 declared_actors: Vec::new(),
733 roles: vec![
734 BoundedClusterRole {
735 name: "context-service".to_string(),
736 owner: EngineId::new("engine-b.example"),
737 },
738 BoundedClusterRole {
739 name: "context-service".to_string(),
740 owner: EngineId::new("engine-b.example"),
741 },
742 ],
743 };
744
745 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
746 assert_eq!(
747 err,
748 ClusterError::DuplicateRole("context-service".to_string())
749 );
750 }
751
752 #[test]
753 fn bounded_cluster_config_rejects_unknown_role_owner() {
754 let (ca, ca_key) = make_ca();
755 let engine = Engine::with_config(EngineConfig {
756 engine_id: EngineId::new("engine-a.example"),
757 ..Default::default()
758 });
759 let config = BoundedClusterConfig {
760 transport: BoundedTransportConfig::Tcp(tcp_config(
761 "engine-a.example",
762 make_tls_config("engine-a.example", &ca, &ca_key),
763 )),
764 peers: vec![PeerSpec {
765 engine_id: EngineId::new("engine-b.example"),
766 addr: "127.0.0.1:4100".parse().unwrap(),
767 control_plane_addr: None,
768 server_name: None,
769 }],
770 declared_actors: Vec::new(),
771 roles: vec![BoundedClusterRole {
772 name: "gateway".to_string(),
773 owner: EngineId::new("engine-c.example"),
774 }],
775 };
776
777 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
778 assert_eq!(
779 err,
780 ClusterError::UnknownRoleOwner {
781 role: "gateway".to_string(),
782 owner: EngineId::new("engine-c.example"),
783 }
784 );
785 }
786
787 fn tcp_tests_available() -> bool {
788 if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
789 return false;
790 }
791 std::net::TcpListener::bind("127.0.0.1:0").is_ok()
792 }
793
794 fn quic_tests_available() -> bool {
795 rustls::crypto::CryptoProvider::get_default().is_some()
796 }
797
798 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
799 struct EchoRequest {
800 value: u64,
801 }
802
803 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
804 struct EchoResponse {
805 doubled: u64,
806 }
807
808 impl Message for EchoRequest {
809 type Response = EchoResponse;
810 const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
811 }
812
813 struct EchoActor;
814
815 impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
816 fn on_message(
817 &mut self,
818 ctx: &mut ActorContext<R>,
819 envelope: &Envelope,
820 payload: MessagePayload,
821 ) -> Result<(), ActorError> {
822 let req = payload
823 .extract::<EchoRequest>()
824 .map_err(|_| ActorError::Handler)?;
825 let resp = EchoResponse {
826 doubled: req.value * 2,
827 };
828 let resp_env = envelope.response(0);
829 ctx.send_raw(resp_env, MessagePayload::local(resp))
830 .map_err(|_| ActorError::Handler)
831 }
832 }
833
834 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
835 struct CrashRequest;
836
837 impl Message for CrashRequest {
838 type Response = ();
839 const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
840 }
841
842 struct SpawnedEchoActor;
843
844 impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
845 fn on_message(
846 &mut self,
847 ctx: &mut ActorContext<R>,
848 envelope: &Envelope,
849 payload: MessagePayload,
850 ) -> Result<(), ActorError> {
851 match envelope.type_tag {
852 EchoRequest::TYPE_TAG => {
853 let req = payload
854 .extract::<EchoRequest>()
855 .map_err(|_| ActorError::Handler)?;
856 let resp = EchoResponse {
857 doubled: req.value * 2,
858 };
859 let resp_env = envelope.response(0);
860 ctx.send_raw(resp_env, MessagePayload::local(resp))
861 .map_err(|_| ActorError::Handler)
862 }
863 CrashRequest::TYPE_TAG => Err(ActorError::Handler),
864 _ => Err(ActorError::Handler),
865 }
866 }
867 }
868
869 fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
870 Arc::new(|type_name: &str, _config: &[u8]| match type_name {
871 "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
872 _ => Err(format!("unknown remote spawn type: {type_name}")),
873 })
874 }
875
876 #[tokio::test]
877 async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
878 if !tcp_tests_available() {
879 eprintln!("skipping tcp integration tests: network bind not permitted");
880 return;
881 }
882
883 let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
884 let (ca, ca_key) = make_ca();
885 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
886 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
887
888 let mut engine_a = Engine::with_config(EngineConfig {
889 engine_id: EngineId::new("engine-a.example"),
890 ..Default::default()
891 });
892 let engine_b = Engine::with_config(EngineConfig {
893 engine_id: EngineId::new("engine-b.example"),
894 ..Default::default()
895 });
896
897 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
898 engine_a.add_user_actor(ChildSpec::new(
899 "cluster-echo",
900 RestartPolicy::Permanent,
901 ShutdownPolicy::Timeout(Duration::from_secs(1)),
902 ns,
903 move || Box::new(EchoActor),
904 ));
905
906 let handle_a = engine_a.handle();
907 let handle_b = engine_b.handle();
908 handle_a
909 .type_registry()
910 .register_remote_ask::<EchoRequest>();
911 handle_b
912 .type_registry()
913 .register_remote_ask::<EchoRequest>();
914
915 let cluster_a = handle_a
916 .attach_bounded_cluster(BoundedClusterConfig {
917 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
918 peers: Vec::new(),
919 declared_actors: Vec::new(),
920 roles: Vec::new(),
921 })
922 .await
923 .expect("attach bounded cluster a");
924 let tcp_a = cluster_a
925 .transport()
926 .as_tcp()
927 .cloned()
928 .expect("tcp transport a");
929
930 let cluster_b = handle_b
931 .attach_bounded_cluster(BoundedClusterConfig {
932 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
933 peers: vec![PeerSpec {
934 engine_id: EngineId::new("engine-a.example"),
935 addr: tcp_a.local_addr(),
936 control_plane_addr: None,
937 server_name: None,
938 }],
939 declared_actors: Vec::new(),
940 roles: Vec::new(),
941 })
942 .await
943 .expect("attach bounded cluster b");
944 let tcp_b = cluster_b
945 .transport()
946 .as_tcp()
947 .cloned()
948 .expect("tcp transport b");
949
950 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
951 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
952 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
953
954 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
955 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
956
957 let canonical = AddrHash::new(&actor_path, 0);
958 let deadline = std::time::Instant::now() + Duration::from_secs(5);
959 while !cluster_b.can_route(&actor_path) {
960 if std::time::Instant::now() > deadline {
961 panic!("bounded cluster route did not propagate");
962 }
963 tokio::time::sleep(Duration::from_millis(20)).await;
964 }
965 assert!(tcp_b.can_route(canonical));
966
967 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
968 let response = remote
969 .ask(EchoRequest { value: 12 })
970 .await
971 .expect("bounded cluster ask");
972 assert_eq!(response, EchoResponse { doubled: 24 });
973
974 shutdown_a_tx.send(()).ok();
975 shutdown_b_tx.send(()).ok();
976 }
977
978 #[tokio::test]
979 async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
980 if !tcp_tests_available() {
981 eprintln!("skipping tcp integration tests: network bind not permitted");
982 return;
983 }
984
985 let (ca, ca_key) = make_ca();
986 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
987 let engine = Engine::with_config(EngineConfig {
988 engine_id: EngineId::new("engine-a.example"),
989 ..Default::default()
990 });
991 let handle = engine.handle();
992
993 let mut cluster = handle
994 .attach_bounded_cluster(BoundedClusterConfig {
995 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
996 peers: vec![PeerSpec {
997 engine_id: EngineId::new("engine-b.example"),
998 addr: "127.0.0.1:4100".parse().unwrap(),
999 control_plane_addr: None,
1000 server_name: None,
1001 }],
1002 declared_actors: Vec::new(),
1003 roles: Vec::new(),
1004 })
1005 .await
1006 .expect("attach bounded cluster");
1007
1008 let path = ActorPath::parse("/user/declared-remote").unwrap();
1009 cluster
1010 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1011 .expect("declare remote actor");
1012
1013 assert!(cluster.can_route(&path));
1014 assert!(cluster
1015 .transport()
1016 .as_tcp()
1017 .expect("tcp transport")
1018 .can_route(AddrHash::new(&path, 0)));
1019 }
1020
1021 #[tokio::test]
1022 async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1023 if !tcp_tests_available() {
1024 eprintln!("skipping tcp integration tests: network bind not permitted");
1025 return;
1026 }
1027
1028 let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1029 let (ca, ca_key) = make_ca();
1030 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1031 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1032 let control_plane_b = reserve_tcp_addr();
1033
1034 let engine_a = Engine::with_config(EngineConfig {
1035 engine_id: EngineId::new("engine-a.example"),
1036 ..Default::default()
1037 });
1038 let engine_b = Engine::with_config(EngineConfig {
1039 engine_id: EngineId::new("engine-b.example"),
1040 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1041 control_plane_tls: Some(tls_b.clone()),
1042 actor_spawn: Some(bounded_spawn_handler()),
1043 ..Default::default()
1044 });
1045
1046 let handle_a = engine_a.handle();
1047 let handle_b = engine_b.handle();
1048 handle_a
1049 .type_registry()
1050 .register_remote_ask::<EchoRequest>();
1051 handle_b
1052 .type_registry()
1053 .register_remote_ask::<EchoRequest>();
1054
1055 let cluster_a = handle_a
1056 .attach_bounded_cluster(BoundedClusterConfig {
1057 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1058 peers: Vec::new(),
1059 declared_actors: Vec::new(),
1060 roles: Vec::new(),
1061 })
1062 .await
1063 .expect("attach bounded cluster a");
1064 let tcp_a = cluster_a
1065 .transport()
1066 .as_tcp()
1067 .cloned()
1068 .expect("tcp transport a");
1069
1070 let cluster_b = handle_b
1071 .attach_bounded_cluster(BoundedClusterConfig {
1072 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1073 peers: vec![PeerSpec {
1074 engine_id: EngineId::new("engine-a.example"),
1075 addr: tcp_a.local_addr(),
1076 control_plane_addr: None,
1077 server_name: None,
1078 }],
1079 declared_actors: Vec::new(),
1080 roles: Vec::new(),
1081 })
1082 .await
1083 .expect("attach bounded cluster b");
1084 let tcp_b = cluster_b
1085 .transport()
1086 .as_tcp()
1087 .cloned()
1088 .expect("tcp transport b");
1089
1090 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1091 let mut cluster_a = cluster_a;
1092 cluster_a
1093 .add_peer(PeerSpec {
1094 engine_id: EngineId::new("engine-b.example"),
1095 addr: tcp_b.local_addr(),
1096 control_plane_addr: Some(control_plane_b),
1097 server_name: None,
1098 })
1099 .expect("add peer with control plane");
1100
1101 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1102 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1103
1104 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1105 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1106 tokio::time::sleep(Duration::from_millis(150)).await;
1107
1108 let remote = cluster_a
1109 .spawn_remote::<EchoRequest>(
1110 EngineId::new("engine-b.example"),
1111 RemoteSpawnSpec {
1112 path: actor_path.clone(),
1113 type_name: "bounded-echo".to_string(),
1114 config: None,
1115 },
1116 )
1117 .await
1118 .expect("spawn remote actor");
1119
1120 let response = remote
1121 .ask(EchoRequest { value: 21 })
1122 .await
1123 .expect("bounded remote spawn ask");
1124 assert_eq!(response, EchoResponse { doubled: 42 });
1125
1126 let duplicate = cluster_a
1127 .spawn_remote::<EchoRequest>(
1128 EngineId::new("engine-b.example"),
1129 RemoteSpawnSpec {
1130 path: actor_path.clone(),
1131 type_name: "bounded-echo".to_string(),
1132 config: None,
1133 },
1134 )
1135 .await
1136 .expect_err("duplicate remote spawn should fail");
1137 assert!(
1138 matches!(duplicate, RemoteSpawnError::ControlPlane(ref message) if message.contains("actor already running")),
1139 "unexpected duplicate error: {duplicate:?}"
1140 );
1141
1142 let slot = handle_b
1143 .registry
1144 .get_by_path(&actor_path)
1145 .expect("spawned actor registered on host");
1146 handle_b.registry.increment_restart_count(slot.addr);
1147 slot.ctrl_tx
1148 .send(crate::common::LifecycleSignal::Stop(
1149 palladium_actor::StopReason::Killed,
1150 ))
1151 .ok();
1152
1153 let restart_deadline = Instant::now() + Duration::from_secs(5);
1154 loop {
1155 match remote.ask(EchoRequest { value: 7 }).await {
1156 Ok(response) => {
1157 assert_eq!(response, EchoResponse { doubled: 14 });
1158 break;
1159 }
1160 Err(_) if Instant::now() <= restart_deadline => {
1161 tokio::time::sleep(Duration::from_millis(25)).await;
1162 }
1163 Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1164 }
1165 }
1166
1167 let unknown = cluster_a
1168 .spawn_remote::<EchoRequest>(
1169 EngineId::new("engine-c.example"),
1170 RemoteSpawnSpec {
1171 path: ActorPath::parse("/user/unknown-target").unwrap(),
1172 type_name: "bounded-echo".to_string(),
1173 config: None,
1174 },
1175 )
1176 .await
1177 .expect_err("unknown target should fail");
1178 assert_eq!(
1179 unknown,
1180 RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1181 );
1182
1183 shutdown_a_tx.send(()).ok();
1184 shutdown_b_tx.send(()).ok();
1185 }
1186
1187 #[tokio::test]
1188 async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1189 if !tcp_tests_available() {
1190 eprintln!("skipping tcp integration tests: network bind not permitted");
1191 return;
1192 }
1193
1194 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1195 let (ca, ca_key) = make_ca();
1196 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1197 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1198 let control_plane_b = reserve_tcp_addr();
1199
1200 let engine_a = Engine::with_config(EngineConfig {
1201 engine_id: EngineId::new("engine-a.example"),
1202 ..Default::default()
1203 });
1204 let engine_b = Engine::with_config(EngineConfig {
1205 engine_id: EngineId::new("engine-b.example"),
1206 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1207 control_plane_tls: Some(tls_b.clone()),
1208 actor_spawn: Some(bounded_spawn_handler()),
1209 ..Default::default()
1210 });
1211
1212 let handle_a = engine_a.handle();
1213 let handle_b = engine_b.handle();
1214 handle_a
1215 .type_registry()
1216 .register_remote_ask::<EchoRequest>();
1217 handle_b
1218 .type_registry()
1219 .register_remote_ask::<EchoRequest>();
1220
1221 let cluster_b = handle_b
1222 .attach_bounded_cluster(BoundedClusterConfig {
1223 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1224 peers: Vec::new(),
1225 declared_actors: Vec::new(),
1226 roles: Vec::new(),
1227 })
1228 .await
1229 .expect("attach bounded cluster b");
1230 let tcp_b = cluster_b
1231 .transport()
1232 .as_tcp()
1233 .cloned()
1234 .expect("tcp transport b");
1235
1236 let cluster_a = handle_a
1237 .attach_bounded_cluster(BoundedClusterConfig {
1238 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1239 peers: vec![PeerSpec {
1240 engine_id: EngineId::new("engine-b.example"),
1241 addr: tcp_b.local_addr(),
1242 control_plane_addr: Some(control_plane_b),
1243 server_name: None,
1244 }],
1245 declared_actors: Vec::new(),
1246 roles: vec![BoundedClusterRole {
1247 name: "context-service".to_string(),
1248 owner: EngineId::new("engine-b.example"),
1249 }],
1250 })
1251 .await
1252 .expect("attach bounded cluster a");
1253 let tcp_a = cluster_a
1254 .transport()
1255 .as_tcp()
1256 .cloned()
1257 .expect("tcp transport a");
1258
1259 let mut cluster_b = cluster_b;
1260 cluster_b
1261 .add_peer(PeerSpec {
1262 engine_id: EngineId::new("engine-a.example"),
1263 addr: tcp_a.local_addr(),
1264 control_plane_addr: None,
1265 server_name: None,
1266 })
1267 .expect("add peer a to cluster b");
1268
1269 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1270 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1271
1272 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1273 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1274 tokio::time::sleep(Duration::from_millis(150)).await;
1275
1276 let remote = cluster_a
1277 .spawn_remote_on_role::<EchoRequest>(
1278 "context-service",
1279 RemoteSpawnSpec {
1280 path: actor_path.clone(),
1281 type_name: "bounded-echo".to_string(),
1282 config: None,
1283 },
1284 )
1285 .await
1286 .expect("spawn remote actor by role over tcp");
1287
1288 let response = remote
1289 .ask(EchoRequest { value: 11 })
1290 .await
1291 .expect("bounded remote spawn ask over tcp by role");
1292 assert_eq!(response, EchoResponse { doubled: 22 });
1293
1294 let unknown_role = cluster_a
1295 .spawn_remote_on_role::<EchoRequest>(
1296 "missing-role",
1297 RemoteSpawnSpec {
1298 path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
1299 type_name: "bounded-echo".to_string(),
1300 config: None,
1301 },
1302 )
1303 .await
1304 .expect_err("unknown role should fail");
1305 assert_eq!(
1306 unknown_role,
1307 RemoteSpawnError::UnknownRole("missing-role".to_string())
1308 );
1309
1310 shutdown_a_tx.send(()).ok();
1311 shutdown_b_tx.send(()).ok();
1312 }
1313
1314 fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
1315 QuicTransportConfig {
1316 engine_id: EngineId::new(engine_id),
1317 listen_addr: "127.0.0.1:0".parse().unwrap(),
1318 send_buffer_size: 1024,
1319 idle_timeout: Duration::from_secs(2),
1320 tls,
1321 }
1322 }
1323
1324 #[tokio::test]
1325 async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
1326 if !quic_tests_available() {
1327 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1328 return;
1329 }
1330
1331 let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
1332 let (ca, ca_key) = make_ca();
1333 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1334 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1335
1336 let mut engine_a = Engine::with_config(EngineConfig {
1337 engine_id: EngineId::new("engine-a.example"),
1338 ..Default::default()
1339 });
1340 let engine_b = Engine::with_config(EngineConfig {
1341 engine_id: EngineId::new("engine-b.example"),
1342 ..Default::default()
1343 });
1344
1345 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1346 engine_a.add_user_actor(ChildSpec::new(
1347 "cluster-echo-quic",
1348 RestartPolicy::Permanent,
1349 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1350 ns,
1351 move || Box::new(EchoActor),
1352 ));
1353
1354 let handle_a = engine_a.handle();
1355 let handle_b = engine_b.handle();
1356 handle_a
1357 .type_registry()
1358 .register_remote_ask::<EchoRequest>();
1359 handle_b
1360 .type_registry()
1361 .register_remote_ask::<EchoRequest>();
1362
1363 let cluster_a = match handle_a
1364 .attach_bounded_cluster(BoundedClusterConfig {
1365 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1366 peers: Vec::new(),
1367 declared_actors: Vec::new(),
1368 roles: vec![BoundedClusterRole {
1369 name: "context-service".to_string(),
1370 owner: EngineId::new("engine-b.example"),
1371 }],
1372 })
1373 .await
1374 {
1375 Ok(cluster) => cluster,
1376 Err(ClusterError::Transport(err)) => {
1377 eprintln!("skipping quic bounded-cluster test: {err:?}");
1378 return;
1379 }
1380 Err(err) => panic!("attach bounded cluster a: {err:?}"),
1381 };
1382 let quic_a = cluster_a
1383 .transport()
1384 .as_quic()
1385 .cloned()
1386 .expect("quic transport a");
1387
1388 let cluster_b = match handle_b
1389 .attach_bounded_cluster(BoundedClusterConfig {
1390 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1391 peers: vec![PeerSpec {
1392 engine_id: EngineId::new("engine-a.example"),
1393 addr: quic_a.local_addr(),
1394 control_plane_addr: None,
1395 server_name: None,
1396 }],
1397 declared_actors: Vec::new(),
1398 roles: Vec::new(),
1399 })
1400 .await
1401 {
1402 Ok(cluster) => cluster,
1403 Err(ClusterError::Transport(err)) => {
1404 eprintln!("skipping quic bounded-cluster test: {err:?}");
1405 return;
1406 }
1407 Err(err) => panic!("attach bounded cluster b: {err:?}"),
1408 };
1409 let quic_b = cluster_b
1410 .transport()
1411 .as_quic()
1412 .cloned()
1413 .expect("quic transport b");
1414
1415 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1416 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1417 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1418
1419 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1420 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1421
1422 let canonical = AddrHash::new(&actor_path, 0);
1423 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1424 while !cluster_b.can_route(&actor_path) {
1425 if std::time::Instant::now() > deadline {
1426 panic!("bounded quic cluster route did not propagate");
1427 }
1428 tokio::time::sleep(Duration::from_millis(20)).await;
1429 }
1430 assert!(quic_b.can_route(canonical));
1431
1432 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1433 let response = remote
1434 .ask(EchoRequest { value: 15 })
1435 .await
1436 .expect("bounded cluster ask over quic");
1437 assert_eq!(response, EchoResponse { doubled: 30 });
1438
1439 shutdown_a_tx.send(()).ok();
1440 shutdown_b_tx.send(()).ok();
1441 }
1442
1443 #[tokio::test]
1444 async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
1445 if !quic_tests_available() {
1446 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1447 return;
1448 }
1449
1450 let (ca, ca_key) = make_ca();
1451 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1452 let engine = Engine::with_config(EngineConfig {
1453 engine_id: EngineId::new("engine-a.example"),
1454 ..Default::default()
1455 });
1456 let handle = engine.handle();
1457
1458 let mut cluster = match handle
1459 .attach_bounded_cluster(BoundedClusterConfig {
1460 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
1461 peers: vec![PeerSpec {
1462 engine_id: EngineId::new("engine-b.example"),
1463 addr: "127.0.0.1:4100".parse().unwrap(),
1464 control_plane_addr: None,
1465 server_name: None,
1466 }],
1467 declared_actors: Vec::new(),
1468 roles: Vec::new(),
1469 })
1470 .await
1471 {
1472 Ok(cluster) => cluster,
1473 Err(ClusterError::Transport(err)) => {
1474 eprintln!("skipping quic bounded-cluster test: {err:?}");
1475 return;
1476 }
1477 Err(err) => panic!("attach bounded cluster: {err:?}"),
1478 };
1479
1480 let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
1481 cluster
1482 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1483 .expect("declare remote actor");
1484
1485 assert!(cluster.can_route(&path));
1486 assert!(cluster
1487 .transport()
1488 .as_quic()
1489 .expect("quic transport")
1490 .can_route(AddrHash::new(&path, 0)));
1491 }
1492
1493 #[tokio::test]
1494 async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
1495 if !quic_tests_available() {
1496 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1497 return;
1498 }
1499
1500 let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
1501 let (ca, ca_key) = make_ca();
1502 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1503 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1504 let control_plane_b = reserve_udp_addr();
1505
1506 let engine_a = Engine::with_config(EngineConfig {
1507 engine_id: EngineId::new("engine-a.example"),
1508 ..Default::default()
1509 });
1510 let engine_b = Engine::with_config(EngineConfig {
1511 engine_id: EngineId::new("engine-b.example"),
1512 control_plane_quic_addr: Some(control_plane_b.to_string()),
1513 control_plane_tls: Some(tls_b.clone()),
1514 actor_spawn: Some(bounded_spawn_handler()),
1515 ..Default::default()
1516 });
1517
1518 let handle_a = engine_a.handle();
1519 let handle_b = engine_b.handle();
1520 handle_a
1521 .type_registry()
1522 .register_remote_ask::<EchoRequest>();
1523 handle_b
1524 .type_registry()
1525 .register_remote_ask::<EchoRequest>();
1526
1527 let cluster_a = match handle_a
1528 .attach_bounded_cluster(BoundedClusterConfig {
1529 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1530 peers: Vec::new(),
1531 declared_actors: Vec::new(),
1532 roles: Vec::new(),
1533 })
1534 .await
1535 {
1536 Ok(cluster) => cluster,
1537 Err(ClusterError::Transport(err)) => {
1538 eprintln!("skipping quic bounded-cluster test: {err:?}");
1539 return;
1540 }
1541 Err(err) => panic!("attach bounded cluster a: {err:?}"),
1542 };
1543 let quic_a = cluster_a
1544 .transport()
1545 .as_quic()
1546 .cloned()
1547 .expect("quic transport a");
1548
1549 let cluster_b = match handle_b
1550 .attach_bounded_cluster(BoundedClusterConfig {
1551 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1552 peers: vec![PeerSpec {
1553 engine_id: EngineId::new("engine-a.example"),
1554 addr: quic_a.local_addr(),
1555 control_plane_addr: None,
1556 server_name: None,
1557 }],
1558 declared_actors: Vec::new(),
1559 roles: Vec::new(),
1560 })
1561 .await
1562 {
1563 Ok(cluster) => cluster,
1564 Err(ClusterError::Transport(err)) => {
1565 eprintln!("skipping quic bounded-cluster test: {err:?}");
1566 return;
1567 }
1568 Err(err) => panic!("attach bounded cluster b: {err:?}"),
1569 };
1570 let quic_b = cluster_b
1571 .transport()
1572 .as_quic()
1573 .cloned()
1574 .expect("quic transport b");
1575
1576 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1577 let mut cluster_a = cluster_a;
1578 cluster_a
1579 .add_peer(PeerSpec {
1580 engine_id: EngineId::new("engine-b.example"),
1581 addr: quic_b.local_addr(),
1582 control_plane_addr: Some(control_plane_b),
1583 server_name: None,
1584 })
1585 .expect("add quic peer with control plane");
1586
1587 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1588 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1589
1590 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1591 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1592 tokio::time::sleep(Duration::from_millis(150)).await;
1593
1594 let remote = cluster_a
1595 .spawn_remote_on_role::<EchoRequest>(
1596 "context-service",
1597 RemoteSpawnSpec {
1598 path: actor_path.clone(),
1599 type_name: "bounded-echo".to_string(),
1600 config: None,
1601 },
1602 )
1603 .await
1604 .expect("spawn remote actor over quic by role");
1605
1606 let response = remote
1607 .ask(EchoRequest { value: 9 })
1608 .await
1609 .expect("bounded remote spawn ask over quic by role");
1610 assert_eq!(response, EchoResponse { doubled: 18 });
1611
1612 let unknown_role = cluster_a
1613 .spawn_remote_on_role::<EchoRequest>(
1614 "missing-role",
1615 RemoteSpawnSpec {
1616 path: ActorPath::parse("/user/missing-role").unwrap(),
1617 type_name: "bounded-echo".to_string(),
1618 config: None,
1619 },
1620 )
1621 .await
1622 .expect_err("unknown role should fail");
1623 assert_eq!(
1624 unknown_role,
1625 RemoteSpawnError::UnknownRole("missing-role".to_string())
1626 );
1627
1628 shutdown_a_tx.send(()).ok();
1629 shutdown_b_tx.send(()).ok();
1630 }
1631}