1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
10use palladium_transport::network::{Network, TokioNetwork};
11use palladium_transport::{
12 QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport, TransportError,
13};
14use rustls::pki_types::ServerName;
15use serde_json::{json, Value};
16use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
17use tokio_rustls::TlsConnector;
18
19#[derive(Clone)]
20pub struct BoundedClusterConfig {
21 pub transport: BoundedTransportConfig,
22 pub peers: Vec<PeerSpec>,
23 pub declared_actors: Vec<DeclaredRemoteActor>,
24 pub roles: Vec<BoundedClusterRole>,
25}
26
27#[derive(Clone)]
28pub enum BoundedTransportConfig {
29 Tcp(TcpTransportConfig),
30 Quic(QuicTransportConfig),
31}
32
33impl BoundedTransportConfig {
34 pub fn engine_id(&self) -> &EngineId {
35 match self {
36 Self::Tcp(config) => &config.engine_id,
37 Self::Quic(config) => &config.engine_id,
38 }
39 }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct PeerSpec {
44 pub engine_id: EngineId,
45 pub addr: SocketAddr,
46 pub control_plane_addr: Option<SocketAddr>,
47 pub server_name: Option<String>,
48}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub struct DeclaredRemoteActor {
52 pub path: ActorPath,
53 pub owner: EngineId,
54}
55
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct BoundedClusterRole {
58 pub name: String,
59 pub owner: EngineId,
60}
61
62#[derive(Clone, Debug, PartialEq, Eq)]
63pub enum ClusterError {
64 DuplicatePeer(EngineId),
65 LocalPeerDeclared(EngineId),
66 DuplicateDeclaredActor(ActorPath),
67 LocalActorDeclaredRemote(ActorPath),
68 UnknownPeerOwner { path: ActorPath, owner: EngineId },
69 DuplicateRole(String),
70 UnknownRoleOwner { role: String, owner: EngineId },
71 Transport(TransportError),
72}
73
74#[derive(Clone, Debug, PartialEq)]
75pub struct RemoteSpawnSpec {
76 pub path: ActorPath,
77 pub type_name: String,
78 pub config: Option<Value>,
79}
80
81#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum RemoteSpawnError {
83 UnknownPeer(EngineId),
84 UnknownRole(String),
85 LocalPathCollision(ActorPath),
86 RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
87 RemotePathAlreadyRunning(ActorPath),
88 MissingControlPlaneEndpoint(EngineId),
89 InvalidServerName(String),
90 ControlPlane(String),
91 SpawnTimedOut(ActorPath),
92}
93
/// Wire protocol used to reach a peer's control plane.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    /// TLS over TCP.
    Tcp,
    /// QUIC.
    Quic,
}
99
100#[derive(Clone)]
101pub(crate) struct ControlPlaneClientConfig {
102 pub(crate) protocol: ControlPlaneProtocol,
103 pub(crate) tls: palladium_transport::TlsConfig,
104 pub(crate) wait_timeout: Duration,
105}
106
/// Lower bound (five seconds) applied by `remote_spawn_wait_timeout`.
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_millis(5_000);
108
109impl From<TransportError> for ClusterError {
110 fn from(value: TransportError) -> Self {
111 Self::Transport(value)
112 }
113}
114
115impl From<TransportError> for RemoteSpawnError {
116 fn from(value: TransportError) -> Self {
117 Self::ControlPlane(format!("transport route install failed: {value:?}"))
118 }
119}
120
121#[derive(Clone)]
122pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
123 Tcp(Arc<TcpTransport<R, N>>),
124 Quic(Arc<QuicTransport<R, N>>),
125}
126
127impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
128 pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
129 match self {
130 Self::Tcp(transport) => Some(transport),
131 Self::Quic(_) => None,
132 }
133 }
134
135 pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
136 match self {
137 Self::Tcp(_) => None,
138 Self::Quic(transport) => Some(transport),
139 }
140 }
141
142 pub fn add_peer(&self, peer: &PeerSpec) {
143 match self {
144 Self::Tcp(transport) => {
145 if let Some(server_name) = &peer.server_name {
146 transport.add_peer_with_server_name(
147 peer.engine_id.clone(),
148 peer.addr,
149 server_name.clone(),
150 );
151 } else {
152 transport.add_peer(peer.engine_id.clone(), peer.addr);
153 }
154 }
155 Self::Quic(transport) => {
156 if let Some(server_name) = &peer.server_name {
157 transport.add_peer_with_server_name(
158 peer.engine_id.clone(),
159 peer.addr,
160 server_name.clone(),
161 );
162 } else {
163 transport.add_peer(peer.engine_id.clone(), peer.addr);
164 }
165 }
166 }
167 }
168
169 pub fn declare_remote_path(
170 &self,
171 owner: EngineId,
172 path: &ActorPath,
173 ) -> Result<(), TransportError> {
174 match self {
175 Self::Tcp(transport) => transport.declare_remote_path(owner, path),
176 Self::Quic(transport) => transport.declare_remote_path(owner, path),
177 }
178 }
179
180 pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
181 match self {
182 Self::Tcp(transport) => transport.undeclare_remote_path(path),
183 Self::Quic(transport) => transport.undeclare_remote_path(path),
184 }
185 }
186
187 pub fn can_route(&self, destination: AddrHash) -> bool {
188 match self {
189 Self::Tcp(transport) => transport.can_route(destination),
190 Self::Quic(transport) => transport.can_route(destination),
191 }
192 }
193}
194
195#[derive(Clone)]
196pub struct BoundedClusterHandle<
197 R: Reactor = TokioReactor,
198 N: Network = TokioNetwork,
199 F: FileSystem = TokioFileSystem,
200> {
201 engine: EngineHandle<R, N, F>,
202 local_engine_id: EngineId,
203 control_plane: ControlPlaneClientConfig,
204 transport: BoundedTransportHandle<R, N>,
205 peers: Vec<PeerSpec>,
206 declared_actors: Vec<DeclaredRemoteActor>,
207 roles: Vec<BoundedClusterRole>,
208 registry: Arc<ActorRegistry<R>>,
209}
210
211impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
212 pub(crate) fn new(
213 engine: EngineHandle<R, N, F>,
214 local_engine_id: EngineId,
215 control_plane: ControlPlaneClientConfig,
216 transport: BoundedTransportHandle<R, N>,
217 peers: Vec<PeerSpec>,
218 roles: Vec<BoundedClusterRole>,
219 registry: Arc<ActorRegistry<R>>,
220 ) -> Self {
221 Self {
222 engine,
223 local_engine_id,
224 control_plane,
225 transport,
226 peers,
227 declared_actors: Vec::new(),
228 roles,
229 registry,
230 }
231 }
232
233 pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
234 &self.transport
235 }
236
237 pub fn peers(&self) -> &[PeerSpec] {
238 &self.peers
239 }
240
241 pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
242 &self.declared_actors
243 }
244
245 pub fn roles(&self) -> &[BoundedClusterRole] {
246 &self.roles
247 }
248
249 pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
250 validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
251 self.transport.add_peer(&peer);
252 self.peers.push(peer);
253 Ok(())
254 }
255
256 pub fn declare_remote_actor(
257 &mut self,
258 owner: EngineId,
259 path: ActorPath,
260 ) -> Result<(), ClusterError> {
261 validate_declared_actor(
262 &path,
263 &owner,
264 &self.local_engine_id,
265 &self.peers,
266 &self.declared_actors,
267 &self.registry,
268 )?;
269 self.transport.declare_remote_path(owner.clone(), &path)?;
270 self.declared_actors
271 .push(DeclaredRemoteActor { path, owner });
272 Ok(())
273 }
274
275 pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
276 self.transport.undeclare_remote_path(path)?;
277 self.declared_actors
278 .retain(|declared| &declared.path != path);
279 Ok(())
280 }
281
282 pub fn can_route(&self, path: &ActorPath) -> bool {
283 self.transport.can_route(AddrHash::new(path, 0))
284 }
285
286 pub async fn spawn_remote_on_role<M>(
287 &self,
288 role: &str,
289 spec: RemoteSpawnSpec,
290 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
291 where
292 M: RemoteMessage,
293 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
294 R: Send + Sync,
295 {
296 let owner = self
297 .roles
298 .iter()
299 .find(|declared| declared.name == role)
300 .map(|declared| declared.owner.clone())
301 .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
302 self.spawn_remote::<M>(owner, spec).await
303 }
304
305 pub async fn spawn_remote<M>(
306 &self,
307 target: EngineId,
308 spec: RemoteSpawnSpec,
309 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
310 where
311 M: RemoteMessage,
312 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
313 R: Send + Sync,
314 {
315 validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;
316
317 let peer = self
318 .peers
319 .iter()
320 .find(|peer| peer.engine_id == target)
321 .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
322 let control_plane_addr = peer
323 .control_plane_addr
324 .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
325 let server_name = peer
326 .server_name
327 .clone()
328 .unwrap_or_else(|| target.as_str().to_string());
329
330 call_actor_spawn(&self.control_plane, control_plane_addr, &server_name, &spec).await?;
331
332 let deadline = Instant::now() + self.control_plane.wait_timeout;
333 while Instant::now() <= deadline {
334 if actor_is_spawned(
335 &self.control_plane,
336 control_plane_addr,
337 &server_name,
338 &spec.path,
339 )
340 .await?
341 {
342 self.transport
343 .declare_remote_path(target.clone(), &spec.path)
344 .map_err(RemoteSpawnError::from)?;
345 return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
346 }
347 tokio::time::sleep(Duration::from_millis(20)).await;
348 }
349
350 Err(RemoteSpawnError::SpawnTimedOut(spec.path))
351 }
352}
353
354async fn call_actor_spawn(
355 client: &ControlPlaneClientConfig,
356 addr: SocketAddr,
357 server_name: &str,
358 spec: &RemoteSpawnSpec,
359) -> Result<(), RemoteSpawnError> {
360 let params = match &spec.config {
361 Some(config) => json!({
362 "path": spec.path.as_str(),
363 "type_name": spec.type_name,
364 "config": config,
365 }),
366 None => json!({
367 "path": spec.path.as_str(),
368 "type_name": spec.type_name,
369 }),
370 };
371 match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
372 Ok(_) => {}
373 Err(RemoteSpawnError::ControlPlane(message))
374 if message.contains("actor already running at") =>
375 {
376 return Err(RemoteSpawnError::RemotePathAlreadyRunning(
377 spec.path.clone(),
378 ));
379 }
380 Err(err) => return Err(err),
381 }
382 Ok(())
383}
384
385fn validate_remote_spawn_request<R: Reactor>(
386 path: &ActorPath,
387 owner: &EngineId,
388 declared_actors: &[DeclaredRemoteActor],
389 registry: &Arc<ActorRegistry<R>>,
390) -> Result<(), RemoteSpawnError> {
391 if registry.get_by_path(path).is_some() {
392 return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
393 }
394
395 if let Some(declared) = declared_actors
396 .iter()
397 .find(|declared| declared.path == *path && declared.owner != *owner)
398 {
399 return Err(RemoteSpawnError::RemoteOwnershipConflict {
400 path: path.clone(),
401 owner: declared.owner.clone(),
402 });
403 }
404
405 Ok(())
406}
407
408async fn actor_is_spawned(
409 client: &ControlPlaneClientConfig,
410 addr: SocketAddr,
411 server_name: &str,
412 path: &ActorPath,
413) -> Result<bool, RemoteSpawnError> {
414 match call_control_plane(
415 client,
416 addr,
417 server_name,
418 "actor.info",
419 json!({ "path": path.as_str() }),
420 )
421 .await
422 {
423 Ok(_) => Ok(true),
424 Err(RemoteSpawnError::ControlPlane(message))
425 if message.contains("actor not found:")
426 || message.contains("actor not found")
427 || message.contains("User supervisor not available") =>
428 {
429 Ok(false)
430 }
431 Err(err) => Err(err),
432 }
433}
434
435async fn call_control_plane(
436 client: &ControlPlaneClientConfig,
437 addr: SocketAddr,
438 server_name: &str,
439 method: &str,
440 params: Value,
441) -> Result<Value, RemoteSpawnError> {
442 let request = json!({
443 "id": 1,
444 "method": method,
445 "params": params,
446 });
447
448 match client.protocol {
449 ControlPlaneProtocol::Tcp => {
450 call_control_plane_tcp(addr, server_name, &client.tls, &request).await
451 }
452 ControlPlaneProtocol::Quic => {
453 call_control_plane_quic(addr, server_name, &client.tls, &request).await
454 }
455 }
456}
457
458async fn call_control_plane_tcp(
459 addr: SocketAddr,
460 server_name: &str,
461 tls: &palladium_transport::TlsConfig,
462 request: &Value,
463) -> Result<Value, RemoteSpawnError> {
464 let config = tls
465 .client_config(server_name)
466 .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
467 let name = ServerName::try_from(server_name.to_string())
468 .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
469 let stream = tokio::net::TcpStream::connect(addr)
470 .await
471 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
472 let connector = TlsConnector::from(config);
473 let tls_stream = connector
474 .connect(name, stream)
475 .await
476 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
477
478 let (reader, mut writer) = tokio::io::split(tls_stream);
479 let mut body = request.to_string();
480 body.push('\n');
481 writer
482 .write_all(body.as_bytes())
483 .await
484 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
485 let mut line = String::new();
486 let mut reader = BufReader::new(reader);
487 reader
488 .read_line(&mut line)
489 .await
490 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
491 parse_control_plane_response(&line)
492}
493
494async fn call_control_plane_quic(
495 addr: SocketAddr,
496 server_name: &str,
497 tls: &palladium_transport::TlsConfig,
498 request: &Value,
499) -> Result<Value, RemoteSpawnError> {
500 ensure_rustls_provider();
501 let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
502 RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
503 })?)
504 .clone();
505 client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
506 let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
507
508 let client = s2n_quic::Client::builder()
509 .with_tls(client_tls)
510 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
511 .with_io("0.0.0.0:0")
512 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
513 .start()
514 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
515
516 let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
517 let mut conn = client
518 .connect(connect)
519 .await
520 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
521 let mut stream = conn
522 .open_bidirectional_stream()
523 .await
524 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
525
526 let mut body = request.to_string();
527 body.push('\n');
528 stream
529 .write_all(body.as_bytes())
530 .await
531 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
532 stream
533 .close()
534 .await
535 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
536
537 let mut buf = Vec::new();
538 stream
539 .read_to_end(&mut buf)
540 .await
541 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
542 let line = String::from_utf8_lossy(&buf);
543 parse_control_plane_response(&line)
544}
545
546fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
547 let response: Value = serde_json::from_str(line.trim())
548 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
549 if let Some(error) = response.get("error") {
550 let message = error
551 .get("message")
552 .and_then(Value::as_str)
553 .unwrap_or("unknown control-plane error");
554 return Err(RemoteSpawnError::ControlPlane(message.to_string()));
555 }
556 Ok(response.get("result").cloned().unwrap_or(Value::Null))
557}
558
/// Installs a process-wide rustls crypto provider at most once.
///
/// Which provider is installed depends on the enabled cargo features:
/// `aws-lc-rs` wins over `ring`; with neither feature nothing is installed.
/// The result of the installation attempt is deliberately ignored.
fn ensure_rustls_provider() {
    static PROVIDER_SETUP: std::sync::Once = std::sync::Once::new();

    PROVIDER_SETUP.call_once(|| {
        #[cfg(feature = "aws-lc-rs")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        let _ = rustls::crypto::ring::default_provider().install_default();
    });
}
568
569pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
570 ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
571}
572
573pub(crate) fn validate_config<R: Reactor>(
574 config: &BoundedClusterConfig,
575 registry: &Arc<ActorRegistry<R>>,
576) -> Result<(), ClusterError> {
577 let local_engine_id = config.transport.engine_id();
578 let mut seen_peers = Vec::new();
579 for peer in &config.peers {
580 validate_peer_spec(peer, local_engine_id, &seen_peers)?;
581 seen_peers.push(peer.clone());
582 }
583
584 let mut seen_declared = Vec::new();
585 for declared in &config.declared_actors {
586 validate_declared_actor(
587 &declared.path,
588 &declared.owner,
589 local_engine_id,
590 &config.peers,
591 &seen_declared,
592 registry,
593 )?;
594 seen_declared.push(declared.clone());
595 }
596
597 let mut seen_roles = Vec::new();
598 for role in &config.roles {
599 validate_role(role, &config.peers, &seen_roles)?;
600 seen_roles.push(role.clone());
601 }
602
603 Ok(())
604}
605
606fn validate_peer_spec(
607 peer: &PeerSpec,
608 local_engine_id: &EngineId,
609 existing: &[PeerSpec],
610) -> Result<(), ClusterError> {
611 if &peer.engine_id == local_engine_id {
612 return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
613 }
614 if existing
615 .iter()
616 .any(|existing| existing.engine_id == peer.engine_id)
617 {
618 return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
619 }
620 Ok(())
621}
622
623fn validate_declared_actor<R: Reactor>(
624 path: &ActorPath,
625 owner: &EngineId,
626 local_engine_id: &EngineId,
627 peers: &[PeerSpec],
628 existing: &[DeclaredRemoteActor],
629 registry: &Arc<ActorRegistry<R>>,
630) -> Result<(), ClusterError> {
631 if owner == local_engine_id || registry.get_by_path(path).is_some() {
632 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
633 }
634 if existing.iter().any(|existing| existing.path == *path) {
635 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
636 }
637 if !peers.iter().any(|peer| peer.engine_id == *owner) {
638 return Err(ClusterError::UnknownPeerOwner {
639 path: path.clone(),
640 owner: owner.clone(),
641 });
642 }
643 Ok(())
644}
645
646fn validate_role(
647 role: &BoundedClusterRole,
648 peers: &[PeerSpec],
649 existing: &[BoundedClusterRole],
650) -> Result<(), ClusterError> {
651 if existing.iter().any(|existing| existing.name == role.name) {
652 return Err(ClusterError::DuplicateRole(role.name.clone()));
653 }
654 if !peers.iter().any(|peer| peer.engine_id == role.owner) {
655 return Err(ClusterError::UnknownRoleOwner {
656 role: role.name.clone(),
657 owner: role.owner.clone(),
658 });
659 }
660 Ok(())
661}
662
663#[cfg(test)]
664mod tests {
665 use super::{
666 validate_config, BoundedClusterConfig, BoundedClusterRole, BoundedTransportConfig,
667 ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError, RemoteSpawnSpec,
668 };
669 use crate::{Engine, EngineConfig};
670 use palladium_actor::{
671 Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
672 Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
673 };
674 use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
675 use rcgen::{
676 BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
677 KeyUsagePurpose,
678 };
679 use std::net::{SocketAddr, TcpListener, UdpSocket};
680 use std::sync::Arc;
681 use std::time::{Duration, Instant};
682
683 fn make_ca() -> (Certificate, KeyPair) {
684 let mut params = CertificateParams::default();
685 params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
686 params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
687 let ca_key = KeyPair::generate().expect("ca keypair");
688 let cert = params.self_signed(&ca_key).expect("ca cert");
689 (cert, ca_key)
690 }
691
692 fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
693 let keypair = KeyPair::generate().expect("leaf keypair");
694 let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
695 let mut params = params;
696 params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
697 params.extended_key_usages = vec![
698 ExtendedKeyUsagePurpose::ServerAuth,
699 ExtendedKeyUsagePurpose::ClientAuth,
700 ];
701 let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
702 let cert_der = cert.der().to_vec();
703 let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
704 let ca_der = ca.der().to_vec();
705 TlsConfig {
706 cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
707 private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
708 trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
709 }
710 }
711
712 fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
713 TcpTransportConfig {
714 engine_id: EngineId::new(engine_id),
715 listen_addr: "127.0.0.1:0".parse().unwrap(),
716 max_connections_per_peer: 1,
717 idle_timeout: Duration::from_secs(1),
718 send_buffer_size: 8,
719 nodelay: true,
720 tls,
721 send_delay: Duration::from_millis(0),
722 }
723 }
724
725 fn reserve_tcp_addr() -> SocketAddr {
726 let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
727 let addr = listener.local_addr().expect("tcp local addr");
728 drop(listener);
729 addr
730 }
731
732 fn reserve_udp_addr() -> SocketAddr {
733 let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
734 let addr = socket.local_addr().expect("udp local addr");
735 drop(socket);
736 addr
737 }
738
739 #[test]
740 fn bounded_cluster_config_rejects_duplicate_peers() {
741 let (ca, ca_key) = make_ca();
742 let engine = Engine::with_config(EngineConfig {
743 engine_id: EngineId::new("engine-a.example"),
744 ..Default::default()
745 });
746 let config = BoundedClusterConfig {
747 transport: BoundedTransportConfig::Tcp(tcp_config(
748 "engine-a.example",
749 make_tls_config("engine-a.example", &ca, &ca_key),
750 )),
751 peers: vec![
752 PeerSpec {
753 engine_id: EngineId::new("engine-b.example"),
754 addr: "127.0.0.1:4100".parse().unwrap(),
755 control_plane_addr: None,
756 server_name: None,
757 },
758 PeerSpec {
759 engine_id: EngineId::new("engine-b.example"),
760 addr: "127.0.0.1:4200".parse().unwrap(),
761 control_plane_addr: None,
762 server_name: None,
763 },
764 ],
765 declared_actors: Vec::new(),
766 roles: Vec::new(),
767 };
768
769 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
770 assert_eq!(
771 err,
772 ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
773 );
774 }
775
776 #[test]
777 fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
778 let (ca, ca_key) = make_ca();
779 let engine = Engine::with_config(EngineConfig {
780 engine_id: EngineId::new("engine-a.example"),
781 ..Default::default()
782 });
783 let config = BoundedClusterConfig {
784 transport: BoundedTransportConfig::Tcp(tcp_config(
785 "engine-a.example",
786 make_tls_config("engine-a.example", &ca, &ca_key),
787 )),
788 peers: vec![PeerSpec {
789 engine_id: EngineId::new("engine-b.example"),
790 addr: "127.0.0.1:4100".parse().unwrap(),
791 control_plane_addr: None,
792 server_name: None,
793 }],
794 declared_actors: vec![DeclaredRemoteActor {
795 path: ActorPath::parse("/user/missing-owner").unwrap(),
796 owner: EngineId::new("engine-c.example"),
797 }],
798 roles: Vec::new(),
799 };
800
801 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
802 assert_eq!(
803 err,
804 ClusterError::UnknownPeerOwner {
805 path: ActorPath::parse("/user/missing-owner").unwrap(),
806 owner: EngineId::new("engine-c.example"),
807 }
808 );
809 }
810
811 #[test]
812 fn bounded_cluster_config_rejects_duplicate_roles() {
813 let (ca, ca_key) = make_ca();
814 let engine = Engine::with_config(EngineConfig {
815 engine_id: EngineId::new("engine-a.example"),
816 ..Default::default()
817 });
818 let config = BoundedClusterConfig {
819 transport: BoundedTransportConfig::Tcp(tcp_config(
820 "engine-a.example",
821 make_tls_config("engine-a.example", &ca, &ca_key),
822 )),
823 peers: vec![PeerSpec {
824 engine_id: EngineId::new("engine-b.example"),
825 addr: "127.0.0.1:4100".parse().unwrap(),
826 control_plane_addr: None,
827 server_name: None,
828 }],
829 declared_actors: Vec::new(),
830 roles: vec![
831 BoundedClusterRole {
832 name: "context-service".to_string(),
833 owner: EngineId::new("engine-b.example"),
834 },
835 BoundedClusterRole {
836 name: "context-service".to_string(),
837 owner: EngineId::new("engine-b.example"),
838 },
839 ],
840 };
841
842 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
843 assert_eq!(
844 err,
845 ClusterError::DuplicateRole("context-service".to_string())
846 );
847 }
848
849 #[test]
850 fn bounded_cluster_config_rejects_unknown_role_owner() {
851 let (ca, ca_key) = make_ca();
852 let engine = Engine::with_config(EngineConfig {
853 engine_id: EngineId::new("engine-a.example"),
854 ..Default::default()
855 });
856 let config = BoundedClusterConfig {
857 transport: BoundedTransportConfig::Tcp(tcp_config(
858 "engine-a.example",
859 make_tls_config("engine-a.example", &ca, &ca_key),
860 )),
861 peers: vec![PeerSpec {
862 engine_id: EngineId::new("engine-b.example"),
863 addr: "127.0.0.1:4100".parse().unwrap(),
864 control_plane_addr: None,
865 server_name: None,
866 }],
867 declared_actors: Vec::new(),
868 roles: vec![BoundedClusterRole {
869 name: "gateway".to_string(),
870 owner: EngineId::new("engine-c.example"),
871 }],
872 };
873
874 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
875 assert_eq!(
876 err,
877 ClusterError::UnknownRoleOwner {
878 role: "gateway".to_string(),
879 owner: EngineId::new("engine-c.example"),
880 }
881 );
882 }
883
884 #[test]
885 fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
886 assert_eq!(
887 super::remote_spawn_wait_timeout(Duration::from_millis(25)),
888 Duration::from_secs(5)
889 );
890 assert_eq!(
891 super::remote_spawn_wait_timeout(Duration::from_secs(8)),
892 Duration::from_secs(8)
893 );
894 }
895
896 fn tcp_tests_available() -> bool {
897 if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
898 return false;
899 }
900 std::net::TcpListener::bind("127.0.0.1:0").is_ok()
901 }
902
903 fn quic_tests_available() -> bool {
904 rustls::crypto::CryptoProvider::get_default().is_some()
905 }
906
907 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
908 struct EchoRequest {
909 value: u64,
910 }
911
912 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
913 struct EchoResponse {
914 doubled: u64,
915 }
916
917 impl Message for EchoRequest {
918 type Response = EchoResponse;
919 const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
920 }
921
922 struct EchoActor;
923
924 impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
925 fn on_message(
926 &mut self,
927 ctx: &mut ActorContext<R>,
928 envelope: &Envelope,
929 payload: MessagePayload,
930 ) -> Result<(), ActorError> {
931 let req = payload
932 .extract::<EchoRequest>()
933 .map_err(|_| ActorError::Handler)?;
934 let resp = EchoResponse {
935 doubled: req.value * 2,
936 };
937 let resp_env = envelope.response(0);
938 ctx.send_raw(resp_env, MessagePayload::local(resp))
939 .map_err(|_| ActorError::Handler)
940 }
941 }
942
943 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
944 struct CrashRequest;
945
946 impl Message for CrashRequest {
947 type Response = ();
948 const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
949 }
950
951 struct SpawnedEchoActor;
952
953 impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
954 fn on_message(
955 &mut self,
956 ctx: &mut ActorContext<R>,
957 envelope: &Envelope,
958 payload: MessagePayload,
959 ) -> Result<(), ActorError> {
960 match envelope.type_tag {
961 EchoRequest::TYPE_TAG => {
962 let req = payload
963 .extract::<EchoRequest>()
964 .map_err(|_| ActorError::Handler)?;
965 let resp = EchoResponse {
966 doubled: req.value * 2,
967 };
968 let resp_env = envelope.response(0);
969 ctx.send_raw(resp_env, MessagePayload::local(resp))
970 .map_err(|_| ActorError::Handler)
971 }
972 CrashRequest::TYPE_TAG => Err(ActorError::Handler),
973 _ => Err(ActorError::Handler),
974 }
975 }
976 }
977
978 fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
979 Arc::new(|type_name: &str, _config: &[u8]| match type_name {
980 "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
981 _ => Err(format!("unknown remote spawn type: {type_name}")),
982 })
983 }
984
985 #[tokio::test]
986 async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
987 if !tcp_tests_available() {
988 eprintln!("skipping tcp integration tests: network bind not permitted");
989 return;
990 }
991
992 let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
993 let (ca, ca_key) = make_ca();
994 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
995 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
996
997 let mut engine_a = Engine::with_config(EngineConfig {
998 engine_id: EngineId::new("engine-a.example"),
999 ..Default::default()
1000 });
1001 let engine_b = Engine::with_config(EngineConfig {
1002 engine_id: EngineId::new("engine-b.example"),
1003 ..Default::default()
1004 });
1005
1006 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1007 engine_a.add_user_actor(ChildSpec::new(
1008 "cluster-echo",
1009 RestartPolicy::Permanent,
1010 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1011 ns,
1012 move || Box::new(EchoActor),
1013 ));
1014
1015 let handle_a = engine_a.handle();
1016 let handle_b = engine_b.handle();
1017 handle_a
1018 .type_registry()
1019 .register_remote_ask::<EchoRequest>();
1020 handle_b
1021 .type_registry()
1022 .register_remote_ask::<EchoRequest>();
1023
1024 let cluster_a = handle_a
1025 .attach_bounded_cluster(BoundedClusterConfig {
1026 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1027 peers: Vec::new(),
1028 declared_actors: Vec::new(),
1029 roles: Vec::new(),
1030 })
1031 .await
1032 .expect("attach bounded cluster a");
1033 let tcp_a = cluster_a
1034 .transport()
1035 .as_tcp()
1036 .cloned()
1037 .expect("tcp transport a");
1038
1039 let cluster_b = handle_b
1040 .attach_bounded_cluster(BoundedClusterConfig {
1041 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1042 peers: vec![PeerSpec {
1043 engine_id: EngineId::new("engine-a.example"),
1044 addr: tcp_a.local_addr(),
1045 control_plane_addr: None,
1046 server_name: None,
1047 }],
1048 declared_actors: Vec::new(),
1049 roles: Vec::new(),
1050 })
1051 .await
1052 .expect("attach bounded cluster b");
1053 let tcp_b = cluster_b
1054 .transport()
1055 .as_tcp()
1056 .cloned()
1057 .expect("tcp transport b");
1058
1059 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1060 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1061 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1062
1063 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1064 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1065
1066 let canonical = AddrHash::new(&actor_path, 0);
1067 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1068 while !cluster_b.can_route(&actor_path) {
1069 if std::time::Instant::now() > deadline {
1070 panic!("bounded cluster route did not propagate");
1071 }
1072 tokio::time::sleep(Duration::from_millis(20)).await;
1073 }
1074 assert!(tcp_b.can_route(canonical));
1075
1076 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1077 let response = remote
1078 .ask(EchoRequest { value: 12 })
1079 .await
1080 .expect("bounded cluster ask");
1081 assert_eq!(response, EchoResponse { doubled: 24 });
1082
1083 shutdown_a_tx.send(()).ok();
1084 shutdown_b_tx.send(()).ok();
1085 }
1086
1087 #[tokio::test]
1088 async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1089 if !tcp_tests_available() {
1090 eprintln!("skipping tcp integration tests: network bind not permitted");
1091 return;
1092 }
1093
1094 let (ca, ca_key) = make_ca();
1095 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1096 let engine = Engine::with_config(EngineConfig {
1097 engine_id: EngineId::new("engine-a.example"),
1098 ..Default::default()
1099 });
1100 let handle = engine.handle();
1101
1102 let mut cluster = handle
1103 .attach_bounded_cluster(BoundedClusterConfig {
1104 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1105 peers: vec![PeerSpec {
1106 engine_id: EngineId::new("engine-b.example"),
1107 addr: "127.0.0.1:4100".parse().unwrap(),
1108 control_plane_addr: None,
1109 server_name: None,
1110 }],
1111 declared_actors: Vec::new(),
1112 roles: Vec::new(),
1113 })
1114 .await
1115 .expect("attach bounded cluster");
1116
1117 let path = ActorPath::parse("/user/declared-remote").unwrap();
1118 cluster
1119 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1120 .expect("declare remote actor");
1121
1122 assert!(cluster.can_route(&path));
1123 assert!(cluster
1124 .transport()
1125 .as_tcp()
1126 .expect("tcp transport")
1127 .can_route(AddrHash::new(&path, 0)));
1128 }
1129
1130 #[tokio::test]
1131 async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
1132 if !tcp_tests_available() {
1133 eprintln!("skipping tcp integration tests: network bind not permitted");
1134 return;
1135 }
1136
1137 let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
1138 let (ca, ca_key) = make_ca();
1139 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1140 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1141 let control_plane_b = reserve_tcp_addr();
1142
1143 let mut engine_a = Engine::with_config(EngineConfig {
1144 engine_id: EngineId::new("engine-a.example"),
1145 ..Default::default()
1146 });
1147 let engine_b = Engine::with_config(EngineConfig {
1148 engine_id: EngineId::new("engine-b.example"),
1149 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1150 control_plane_tls: Some(tls_b.clone()),
1151 actor_spawn: Some(bounded_spawn_handler()),
1152 ..Default::default()
1153 });
1154
1155 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1156 engine_a.add_user_actor(ChildSpec::new(
1157 "local-remote-collision",
1158 RestartPolicy::Permanent,
1159 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1160 ns,
1161 move || Box::new(EchoActor),
1162 ));
1163
1164 let handle_a = engine_a.handle();
1165 let handle_b = engine_b.handle();
1166
1167 let cluster_b = handle_b
1168 .attach_bounded_cluster(BoundedClusterConfig {
1169 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1170 peers: Vec::new(),
1171 declared_actors: Vec::new(),
1172 roles: Vec::new(),
1173 })
1174 .await
1175 .expect("attach bounded cluster b");
1176 let tcp_b = cluster_b
1177 .transport()
1178 .as_tcp()
1179 .cloned()
1180 .expect("tcp transport b");
1181
1182 let cluster_a = handle_a
1183 .attach_bounded_cluster(BoundedClusterConfig {
1184 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1185 peers: vec![PeerSpec {
1186 engine_id: EngineId::new("engine-b.example"),
1187 addr: tcp_b.local_addr(),
1188 control_plane_addr: Some(control_plane_b),
1189 server_name: None,
1190 }],
1191 declared_actors: Vec::new(),
1192 roles: vec![BoundedClusterRole {
1193 name: "context-service".to_string(),
1194 owner: EngineId::new("engine-b.example"),
1195 }],
1196 })
1197 .await
1198 .expect("attach bounded cluster a");
1199
1200 let err = cluster_a
1201 .spawn_remote_on_role::<EchoRequest>(
1202 "context-service",
1203 RemoteSpawnSpec {
1204 path: actor_path.clone(),
1205 type_name: "bounded-echo".to_string(),
1206 config: None,
1207 },
1208 )
1209 .await
1210 .expect_err("local path collision should fail");
1211 assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));
1212 }
1213
1214 #[tokio::test]
1215 async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1216 if !tcp_tests_available() {
1217 eprintln!("skipping tcp integration tests: network bind not permitted");
1218 return;
1219 }
1220
1221 let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1222 let (ca, ca_key) = make_ca();
1223 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1224 let engine = Engine::with_config(EngineConfig {
1225 engine_id: EngineId::new("engine-a.example"),
1226 ..Default::default()
1227 });
1228 let handle = engine.handle();
1229
1230 let cluster = handle
1231 .attach_bounded_cluster(BoundedClusterConfig {
1232 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1233 peers: vec![
1234 PeerSpec {
1235 engine_id: EngineId::new("engine-b.example"),
1236 addr: "127.0.0.1:4100".parse().unwrap(),
1237 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1238 server_name: None,
1239 },
1240 PeerSpec {
1241 engine_id: EngineId::new("engine-c.example"),
1242 addr: "127.0.0.1:4200".parse().unwrap(),
1243 control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1244 server_name: None,
1245 },
1246 ],
1247 declared_actors: vec![DeclaredRemoteActor {
1248 path: actor_path.clone(),
1249 owner: EngineId::new("engine-b.example"),
1250 }],
1251 roles: Vec::new(),
1252 })
1253 .await
1254 .expect("attach bounded cluster");
1255
1256 let err = cluster
1257 .spawn_remote::<EchoRequest>(
1258 EngineId::new("engine-c.example"),
1259 RemoteSpawnSpec {
1260 path: actor_path.clone(),
1261 type_name: "bounded-echo".to_string(),
1262 config: None,
1263 },
1264 )
1265 .await
1266 .expect_err("conflicting declared owner should fail");
1267 assert_eq!(
1268 err,
1269 RemoteSpawnError::RemoteOwnershipConflict {
1270 path: actor_path,
1271 owner: EngineId::new("engine-b.example"),
1272 }
1273 );
1274 }
1275
1276 #[tokio::test]
1277 async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1278 if !tcp_tests_available() {
1279 eprintln!("skipping tcp integration tests: network bind not permitted");
1280 return;
1281 }
1282
1283 let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1284 let (ca, ca_key) = make_ca();
1285 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1286 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1287 let control_plane_b = reserve_tcp_addr();
1288
1289 let engine_a = Engine::with_config(EngineConfig {
1290 engine_id: EngineId::new("engine-a.example"),
1291 ..Default::default()
1292 });
1293 let engine_b = Engine::with_config(EngineConfig {
1294 engine_id: EngineId::new("engine-b.example"),
1295 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1296 control_plane_tls: Some(tls_b.clone()),
1297 actor_spawn: Some(bounded_spawn_handler()),
1298 ..Default::default()
1299 });
1300
1301 let handle_a = engine_a.handle();
1302 let handle_b = engine_b.handle();
1303 handle_a
1304 .type_registry()
1305 .register_remote_ask::<EchoRequest>();
1306 handle_b
1307 .type_registry()
1308 .register_remote_ask::<EchoRequest>();
1309
1310 let cluster_a = handle_a
1311 .attach_bounded_cluster(BoundedClusterConfig {
1312 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1313 peers: Vec::new(),
1314 declared_actors: Vec::new(),
1315 roles: Vec::new(),
1316 })
1317 .await
1318 .expect("attach bounded cluster a");
1319 let tcp_a = cluster_a
1320 .transport()
1321 .as_tcp()
1322 .cloned()
1323 .expect("tcp transport a");
1324
1325 let cluster_b = handle_b
1326 .attach_bounded_cluster(BoundedClusterConfig {
1327 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1328 peers: vec![PeerSpec {
1329 engine_id: EngineId::new("engine-a.example"),
1330 addr: tcp_a.local_addr(),
1331 control_plane_addr: None,
1332 server_name: None,
1333 }],
1334 declared_actors: Vec::new(),
1335 roles: Vec::new(),
1336 })
1337 .await
1338 .expect("attach bounded cluster b");
1339 let tcp_b = cluster_b
1340 .transport()
1341 .as_tcp()
1342 .cloned()
1343 .expect("tcp transport b");
1344
1345 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1346 let mut cluster_a = cluster_a;
1347 cluster_a
1348 .add_peer(PeerSpec {
1349 engine_id: EngineId::new("engine-b.example"),
1350 addr: tcp_b.local_addr(),
1351 control_plane_addr: Some(control_plane_b),
1352 server_name: None,
1353 })
1354 .expect("add peer with control plane");
1355
1356 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1357 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1358
1359 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1360 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1361 tokio::time::sleep(Duration::from_millis(150)).await;
1362
1363 let remote = cluster_a
1364 .spawn_remote::<EchoRequest>(
1365 EngineId::new("engine-b.example"),
1366 RemoteSpawnSpec {
1367 path: actor_path.clone(),
1368 type_name: "bounded-echo".to_string(),
1369 config: None,
1370 },
1371 )
1372 .await
1373 .expect("spawn remote actor");
1374
1375 let response = remote
1376 .ask(EchoRequest { value: 21 })
1377 .await
1378 .expect("bounded remote spawn ask");
1379 assert_eq!(response, EchoResponse { doubled: 42 });
1380
1381 let duplicate = cluster_a
1382 .spawn_remote::<EchoRequest>(
1383 EngineId::new("engine-b.example"),
1384 RemoteSpawnSpec {
1385 path: actor_path.clone(),
1386 type_name: "bounded-echo".to_string(),
1387 config: None,
1388 },
1389 )
1390 .await
1391 .expect_err("duplicate remote spawn should fail");
1392 assert_eq!(
1393 duplicate,
1394 RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
1395 );
1396
1397 let slot = handle_b
1398 .registry
1399 .get_by_path(&actor_path)
1400 .expect("spawned actor registered on host");
1401 handle_b.registry.increment_restart_count(slot.addr);
1402 slot.ctrl_tx
1403 .send(crate::common::LifecycleSignal::Stop(
1404 palladium_actor::StopReason::Killed,
1405 ))
1406 .ok();
1407
1408 let restart_deadline = Instant::now() + Duration::from_secs(5);
1409 loop {
1410 match remote.ask(EchoRequest { value: 7 }).await {
1411 Ok(response) => {
1412 assert_eq!(response, EchoResponse { doubled: 14 });
1413 break;
1414 }
1415 Err(_) if Instant::now() <= restart_deadline => {
1416 tokio::time::sleep(Duration::from_millis(25)).await;
1417 }
1418 Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1419 }
1420 }
1421
1422 let unknown = cluster_a
1423 .spawn_remote::<EchoRequest>(
1424 EngineId::new("engine-c.example"),
1425 RemoteSpawnSpec {
1426 path: ActorPath::parse("/user/unknown-target").unwrap(),
1427 type_name: "bounded-echo".to_string(),
1428 config: None,
1429 },
1430 )
1431 .await
1432 .expect_err("unknown target should fail");
1433 assert_eq!(
1434 unknown,
1435 RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1436 );
1437
1438 shutdown_a_tx.send(()).ok();
1439 shutdown_b_tx.send(()).ok();
1440 }
1441
1442 #[tokio::test]
1443 async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1444 if !tcp_tests_available() {
1445 eprintln!("skipping tcp integration tests: network bind not permitted");
1446 return;
1447 }
1448
1449 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1450 let (ca, ca_key) = make_ca();
1451 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1452 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1453 let control_plane_b = reserve_tcp_addr();
1454
1455 let engine_a = Engine::with_config(EngineConfig {
1456 engine_id: EngineId::new("engine-a.example"),
1457 ..Default::default()
1458 });
1459 let engine_b = Engine::with_config(EngineConfig {
1460 engine_id: EngineId::new("engine-b.example"),
1461 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1462 control_plane_tls: Some(tls_b.clone()),
1463 actor_spawn: Some(bounded_spawn_handler()),
1464 ..Default::default()
1465 });
1466
1467 let handle_a = engine_a.handle();
1468 let handle_b = engine_b.handle();
1469 handle_a
1470 .type_registry()
1471 .register_remote_ask::<EchoRequest>();
1472 handle_b
1473 .type_registry()
1474 .register_remote_ask::<EchoRequest>();
1475
1476 let cluster_b = handle_b
1477 .attach_bounded_cluster(BoundedClusterConfig {
1478 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1479 peers: Vec::new(),
1480 declared_actors: Vec::new(),
1481 roles: Vec::new(),
1482 })
1483 .await
1484 .expect("attach bounded cluster b");
1485 let tcp_b = cluster_b
1486 .transport()
1487 .as_tcp()
1488 .cloned()
1489 .expect("tcp transport b");
1490
1491 let cluster_a = handle_a
1492 .attach_bounded_cluster(BoundedClusterConfig {
1493 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1494 peers: vec![PeerSpec {
1495 engine_id: EngineId::new("engine-b.example"),
1496 addr: tcp_b.local_addr(),
1497 control_plane_addr: Some(control_plane_b),
1498 server_name: None,
1499 }],
1500 declared_actors: Vec::new(),
1501 roles: vec![BoundedClusterRole {
1502 name: "context-service".to_string(),
1503 owner: EngineId::new("engine-b.example"),
1504 }],
1505 })
1506 .await
1507 .expect("attach bounded cluster a");
1508 let tcp_a = cluster_a
1509 .transport()
1510 .as_tcp()
1511 .cloned()
1512 .expect("tcp transport a");
1513
1514 let mut cluster_b = cluster_b;
1515 cluster_b
1516 .add_peer(PeerSpec {
1517 engine_id: EngineId::new("engine-a.example"),
1518 addr: tcp_a.local_addr(),
1519 control_plane_addr: None,
1520 server_name: None,
1521 })
1522 .expect("add peer a to cluster b");
1523
1524 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1525 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1526
1527 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1528 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1529 tokio::time::sleep(Duration::from_millis(150)).await;
1530
1531 let remote = cluster_a
1532 .spawn_remote_on_role::<EchoRequest>(
1533 "context-service",
1534 RemoteSpawnSpec {
1535 path: actor_path.clone(),
1536 type_name: "bounded-echo".to_string(),
1537 config: None,
1538 },
1539 )
1540 .await
1541 .expect("spawn remote actor by role over tcp");
1542
1543 let response = remote
1544 .ask(EchoRequest { value: 11 })
1545 .await
1546 .expect("bounded remote spawn ask over tcp by role");
1547 assert_eq!(response, EchoResponse { doubled: 22 });
1548
1549 let unknown_role = cluster_a
1550 .spawn_remote_on_role::<EchoRequest>(
1551 "missing-role",
1552 RemoteSpawnSpec {
1553 path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
1554 type_name: "bounded-echo".to_string(),
1555 config: None,
1556 },
1557 )
1558 .await
1559 .expect_err("unknown role should fail");
1560 assert_eq!(
1561 unknown_role,
1562 RemoteSpawnError::UnknownRole("missing-role".to_string())
1563 );
1564
1565 shutdown_a_tx.send(()).ok();
1566 shutdown_b_tx.send(()).ok();
1567 }
1568
1569 #[tokio::test]
1570 async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
1571 if !tcp_tests_available() {
1572 eprintln!("skipping tcp integration tests: network bind not permitted");
1573 return;
1574 }
1575
1576 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
1577 let (ca, ca_key) = make_ca();
1578 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1579 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1580 let control_plane_b = reserve_tcp_addr();
1581
1582 let engine_a = Engine::with_config(EngineConfig {
1583 engine_id: EngineId::new("engine-a.example"),
1584 ..Default::default()
1585 });
1586 let engine_b = Engine::with_config(EngineConfig {
1587 engine_id: EngineId::new("engine-b.example"),
1588 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1589 control_plane_tls: Some(tls_b.clone()),
1590 actor_spawn: Some(bounded_spawn_handler()),
1591 ..Default::default()
1592 });
1593
1594 let handle_a = engine_a.handle();
1595 let handle_b = engine_b.handle();
1596 handle_a
1597 .type_registry()
1598 .register_remote_ask::<EchoRequest>();
1599 handle_b
1600 .type_registry()
1601 .register_remote_ask::<EchoRequest>();
1602
1603 let cluster_b = handle_b
1604 .attach_bounded_cluster(BoundedClusterConfig {
1605 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1606 peers: Vec::new(),
1607 declared_actors: Vec::new(),
1608 roles: Vec::new(),
1609 })
1610 .await
1611 .expect("attach bounded cluster b");
1612 let tcp_b = cluster_b
1613 .transport()
1614 .as_tcp()
1615 .cloned()
1616 .expect("tcp transport b");
1617
1618 let cluster_a = handle_a
1619 .attach_bounded_cluster(BoundedClusterConfig {
1620 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1621 peers: vec![PeerSpec {
1622 engine_id: EngineId::new("engine-b.example"),
1623 addr: tcp_b.local_addr(),
1624 control_plane_addr: Some(control_plane_b),
1625 server_name: None,
1626 }],
1627 declared_actors: Vec::new(),
1628 roles: vec![BoundedClusterRole {
1629 name: "context-service".to_string(),
1630 owner: EngineId::new("engine-b.example"),
1631 }],
1632 })
1633 .await
1634 .expect("attach bounded cluster a");
1635 let tcp_a = cluster_a
1636 .transport()
1637 .as_tcp()
1638 .cloned()
1639 .expect("tcp transport a");
1640
1641 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1642 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1643
1644 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1645 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1646
1647 let _ = tcp_a;
1651
1652 let remote = cluster_a
1653 .spawn_remote_on_role::<EchoRequest>(
1654 "context-service",
1655 RemoteSpawnSpec {
1656 path: actor_path.clone(),
1657 type_name: "bounded-echo".to_string(),
1658 config: None,
1659 },
1660 )
1661 .await
1662 .expect("spawn remote actor by role over tcp without reverse declaration");
1663
1664 let response = remote
1665 .ask(EchoRequest { value: 13 })
1666 .await
1667 .expect("bounded remote spawn ask over tcp without reverse declaration");
1668 assert_eq!(response, EchoResponse { doubled: 26 });
1669
1670 shutdown_a_tx.send(()).ok();
1671 shutdown_b_tx.send(()).ok();
1672 }
1673
1674 fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
1675 QuicTransportConfig {
1676 engine_id: EngineId::new(engine_id),
1677 listen_addr: "127.0.0.1:0".parse().unwrap(),
1678 send_buffer_size: 1024,
1679 idle_timeout: Duration::from_secs(2),
1680 tls,
1681 }
1682 }
1683
1684 #[tokio::test]
1685 async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
1686 if !quic_tests_available() {
1687 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1688 return;
1689 }
1690
1691 let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
1692 let (ca, ca_key) = make_ca();
1693 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1694 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1695
1696 let mut engine_a = Engine::with_config(EngineConfig {
1697 engine_id: EngineId::new("engine-a.example"),
1698 ..Default::default()
1699 });
1700 let engine_b = Engine::with_config(EngineConfig {
1701 engine_id: EngineId::new("engine-b.example"),
1702 ..Default::default()
1703 });
1704
1705 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1706 engine_a.add_user_actor(ChildSpec::new(
1707 "cluster-echo-quic",
1708 RestartPolicy::Permanent,
1709 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1710 ns,
1711 move || Box::new(EchoActor),
1712 ));
1713
1714 let handle_a = engine_a.handle();
1715 let handle_b = engine_b.handle();
1716 handle_a
1717 .type_registry()
1718 .register_remote_ask::<EchoRequest>();
1719 handle_b
1720 .type_registry()
1721 .register_remote_ask::<EchoRequest>();
1722
1723 let cluster_a = match handle_a
1724 .attach_bounded_cluster(BoundedClusterConfig {
1725 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1726 peers: Vec::new(),
1727 declared_actors: Vec::new(),
1728 roles: vec![BoundedClusterRole {
1729 name: "context-service".to_string(),
1730 owner: EngineId::new("engine-b.example"),
1731 }],
1732 })
1733 .await
1734 {
1735 Ok(cluster) => cluster,
1736 Err(ClusterError::Transport(err)) => {
1737 eprintln!("skipping quic bounded-cluster test: {err:?}");
1738 return;
1739 }
1740 Err(err) => panic!("attach bounded cluster a: {err:?}"),
1741 };
1742 let quic_a = cluster_a
1743 .transport()
1744 .as_quic()
1745 .cloned()
1746 .expect("quic transport a");
1747
1748 let cluster_b = match handle_b
1749 .attach_bounded_cluster(BoundedClusterConfig {
1750 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1751 peers: vec![PeerSpec {
1752 engine_id: EngineId::new("engine-a.example"),
1753 addr: quic_a.local_addr(),
1754 control_plane_addr: None,
1755 server_name: None,
1756 }],
1757 declared_actors: Vec::new(),
1758 roles: Vec::new(),
1759 })
1760 .await
1761 {
1762 Ok(cluster) => cluster,
1763 Err(ClusterError::Transport(err)) => {
1764 eprintln!("skipping quic bounded-cluster test: {err:?}");
1765 return;
1766 }
1767 Err(err) => panic!("attach bounded cluster b: {err:?}"),
1768 };
1769 let quic_b = cluster_b
1770 .transport()
1771 .as_quic()
1772 .cloned()
1773 .expect("quic transport b");
1774
1775 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1776 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1777 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1778
1779 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1780 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1781
1782 let canonical = AddrHash::new(&actor_path, 0);
1783 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1784 while !cluster_b.can_route(&actor_path) {
1785 if std::time::Instant::now() > deadline {
1786 panic!("bounded quic cluster route did not propagate");
1787 }
1788 tokio::time::sleep(Duration::from_millis(20)).await;
1789 }
1790 assert!(quic_b.can_route(canonical));
1791
1792 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1793 let response = remote
1794 .ask(EchoRequest { value: 15 })
1795 .await
1796 .expect("bounded cluster ask over quic");
1797 assert_eq!(response, EchoResponse { doubled: 30 });
1798
1799 shutdown_a_tx.send(()).ok();
1800 shutdown_b_tx.send(()).ok();
1801 }
1802
1803 #[tokio::test]
1804 async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
1805 if !quic_tests_available() {
1806 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1807 return;
1808 }
1809
1810 let (ca, ca_key) = make_ca();
1811 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1812 let engine = Engine::with_config(EngineConfig {
1813 engine_id: EngineId::new("engine-a.example"),
1814 ..Default::default()
1815 });
1816 let handle = engine.handle();
1817
1818 let mut cluster = match handle
1819 .attach_bounded_cluster(BoundedClusterConfig {
1820 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
1821 peers: vec![PeerSpec {
1822 engine_id: EngineId::new("engine-b.example"),
1823 addr: "127.0.0.1:4100".parse().unwrap(),
1824 control_plane_addr: None,
1825 server_name: None,
1826 }],
1827 declared_actors: Vec::new(),
1828 roles: Vec::new(),
1829 })
1830 .await
1831 {
1832 Ok(cluster) => cluster,
1833 Err(ClusterError::Transport(err)) => {
1834 eprintln!("skipping quic bounded-cluster test: {err:?}");
1835 return;
1836 }
1837 Err(err) => panic!("attach bounded cluster: {err:?}"),
1838 };
1839
1840 let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
1841 cluster
1842 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1843 .expect("declare remote actor");
1844
1845 assert!(cluster.can_route(&path));
1846 assert!(cluster
1847 .transport()
1848 .as_quic()
1849 .expect("quic transport")
1850 .can_route(AddrHash::new(&path, 0)));
1851 }
1852
1853 #[tokio::test]
1854 async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
1855 if !quic_tests_available() {
1856 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
1857 return;
1858 }
1859
1860 let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
1861 let (ca, ca_key) = make_ca();
1862 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1863 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1864 let control_plane_b = reserve_udp_addr();
1865
1866 let engine_a = Engine::with_config(EngineConfig {
1867 engine_id: EngineId::new("engine-a.example"),
1868 ..Default::default()
1869 });
1870 let engine_b = Engine::with_config(EngineConfig {
1871 engine_id: EngineId::new("engine-b.example"),
1872 control_plane_quic_addr: Some(control_plane_b.to_string()),
1873 control_plane_tls: Some(tls_b.clone()),
1874 actor_spawn: Some(bounded_spawn_handler()),
1875 ..Default::default()
1876 });
1877
1878 let handle_a = engine_a.handle();
1879 let handle_b = engine_b.handle();
1880 handle_a
1881 .type_registry()
1882 .register_remote_ask::<EchoRequest>();
1883 handle_b
1884 .type_registry()
1885 .register_remote_ask::<EchoRequest>();
1886
1887 let cluster_a = match handle_a
1888 .attach_bounded_cluster(BoundedClusterConfig {
1889 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
1890 peers: Vec::new(),
1891 declared_actors: Vec::new(),
1892 roles: Vec::new(),
1893 })
1894 .await
1895 {
1896 Ok(cluster) => cluster,
1897 Err(ClusterError::Transport(err)) => {
1898 eprintln!("skipping quic bounded-cluster test: {err:?}");
1899 return;
1900 }
1901 Err(err) => panic!("attach bounded cluster a: {err:?}"),
1902 };
1903 let quic_a = cluster_a
1904 .transport()
1905 .as_quic()
1906 .cloned()
1907 .expect("quic transport a");
1908
1909 let cluster_b = match handle_b
1910 .attach_bounded_cluster(BoundedClusterConfig {
1911 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
1912 peers: vec![PeerSpec {
1913 engine_id: EngineId::new("engine-a.example"),
1914 addr: quic_a.local_addr(),
1915 control_plane_addr: None,
1916 server_name: None,
1917 }],
1918 declared_actors: Vec::new(),
1919 roles: Vec::new(),
1920 })
1921 .await
1922 {
1923 Ok(cluster) => cluster,
1924 Err(ClusterError::Transport(err)) => {
1925 eprintln!("skipping quic bounded-cluster test: {err:?}");
1926 return;
1927 }
1928 Err(err) => panic!("attach bounded cluster b: {err:?}"),
1929 };
1930 let quic_b = cluster_b
1931 .transport()
1932 .as_quic()
1933 .cloned()
1934 .expect("quic transport b");
1935
1936 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
1937 let mut cluster_a = cluster_a;
1938 cluster_a
1939 .add_peer(PeerSpec {
1940 engine_id: EngineId::new("engine-b.example"),
1941 addr: quic_b.local_addr(),
1942 control_plane_addr: Some(control_plane_b),
1943 server_name: None,
1944 })
1945 .expect("add quic peer with control plane");
1946
1947 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1948 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1949
1950 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1951 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1952 tokio::time::sleep(Duration::from_millis(150)).await;
1953
1954 let remote = cluster_a
1955 .spawn_remote_on_role::<EchoRequest>(
1956 "context-service",
1957 RemoteSpawnSpec {
1958 path: actor_path.clone(),
1959 type_name: "bounded-echo".to_string(),
1960 config: None,
1961 },
1962 )
1963 .await
1964 .expect("spawn remote actor over quic by role");
1965
1966 let response = remote
1967 .ask(EchoRequest { value: 9 })
1968 .await
1969 .expect("bounded remote spawn ask over quic by role");
1970 assert_eq!(response, EchoResponse { doubled: 18 });
1971
1972 let unknown_role = cluster_a
1973 .spawn_remote_on_role::<EchoRequest>(
1974 "missing-role",
1975 RemoteSpawnSpec {
1976 path: ActorPath::parse("/user/missing-role").unwrap(),
1977 type_name: "bounded-echo".to_string(),
1978 config: None,
1979 },
1980 )
1981 .await
1982 .expect_err("unknown role should fail");
1983 assert_eq!(
1984 unknown_role,
1985 RemoteSpawnError::UnknownRole("missing-role".to_string())
1986 );
1987
1988 shutdown_a_tx.send(()).ok();
1989 shutdown_b_tx.send(()).ok();
1990 }
1991}