1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use dashmap::DashMap;
10use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
11use palladium_transport::network::{Network, TokioNetwork};
12use palladium_transport::{
13 mailbox, QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport,
14 TransportError,
15};
16use rustls::pki_types::ServerName;
17use serde_json::{json, Value};
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio_rustls::TlsConnector;
20
/// Object-safe hook for making a peer known to a transport.
///
/// Implementations are stored as `Arc<dyn SpawnPeerInstaller>` in the
/// process-wide installer map, which hides the transport's reactor/network
/// type parameters from callers.
trait SpawnPeerInstaller: Send + Sync {
    /// Installs the peer `engine_id` at `addr`, with an optional explicit
    /// `server_name` for that peer.
    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>);
}
24
25impl<R: Reactor + Clone, N: Network + Clone> SpawnPeerInstaller for BoundedTransportHandle<R, N> {
26 fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>) {
27 let source_addr = AddrHash::synthetic(engine_id.as_str().as_bytes());
28 match server_name {
29 Some(server_name) => self.add_peer(&PeerSpec {
30 engine_id: engine_id.clone(),
31 addr,
32 control_plane_addr: None,
33 server_name: Some(server_name),
34 }),
35 None => self.add_peer(&PeerSpec {
36 engine_id: engine_id.clone(),
37 addr,
38 control_plane_addr: None,
39 server_name: None,
40 }),
41 }
42 match self {
43 Self::Tcp(transport) => {
44 let _ = transport.register_remote_source_addr(engine_id, source_addr);
45 }
46 Self::Quic(transport) => {
47 let _ = transport.register_remote_source_addr(engine_id, source_addr);
48 }
49 }
50 }
51}
52
53fn spawn_peer_installers() -> &'static DashMap<String, Arc<dyn SpawnPeerInstaller>> {
54 static INSTALLERS: std::sync::OnceLock<DashMap<String, Arc<dyn SpawnPeerInstaller>>> =
55 std::sync::OnceLock::new();
56 INSTALLERS.get_or_init(DashMap::new)
57}
58
59pub(crate) fn register_spawn_peer_installer<R: Reactor + Clone, N: Network + Clone>(
60 engine_id: &EngineId,
61 transport: &BoundedTransportHandle<R, N>,
62) {
63 spawn_peer_installers().insert(
64 engine_id.as_str().to_string(),
65 Arc::new(transport.clone()) as Arc<dyn SpawnPeerInstaller>,
66 );
67}
68
69pub(crate) fn maybe_register_spawn_caller_peer(
70 host_engine_id: &EngineId,
71 caller_engine_id: &EngineId,
72 caller_transport_addr: SocketAddr,
73 caller_server_name: Option<String>,
74) {
75 if let Some(installer) = spawn_peer_installers().get(host_engine_id.as_str()) {
76 installer.ensure_peer(
77 caller_engine_id.clone(),
78 caller_transport_addr,
79 caller_server_name,
80 );
81 }
82}
83
/// Static description of a bounded cluster: the local transport plus the
/// peers, remotely-owned actors, and roles that are known up front.
#[derive(Clone)]
pub struct BoundedClusterConfig {
    /// Transport configuration; it also carries the local engine id.
    pub transport: BoundedTransportConfig,
    /// Remote engines this engine knows about.
    pub peers: Vec<PeerSpec>,
    /// Actor paths declared as owned by a peer.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles, each mapped to an owning engine.
    pub roles: Vec<BoundedClusterRole>,
}
91
impl BoundedClusterConfig {
    /// Checks peers, declared actors, and roles for internal consistency.
    ///
    /// Delegates to `validate_static_config`, which looks only at the
    /// configuration itself and does not consult an actor registry.
    pub fn validate(&self) -> Result<(), ClusterError> {
        validate_static_config(self)
    }
}
97
/// Transport configuration for a bounded cluster, one variant per supported
/// transport.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    /// Configuration for the TCP transport.
    Tcp(TcpTransportConfig),
    /// Configuration for the QUIC transport.
    Quic(QuicTransportConfig),
}
103
104impl BoundedTransportConfig {
105 pub fn engine_id(&self) -> &EngineId {
106 match self {
107 Self::Tcp(config) => &config.engine_id,
108 Self::Quic(config) => &config.engine_id,
109 }
110 }
111}
112
/// Describes one remote engine in the cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    /// Identity of the remote engine.
    pub engine_id: EngineId,
    /// Address handed to the transport when the peer is added.
    pub addr: SocketAddr,
    /// Control-plane endpoint of the peer. Remote spawns and role ownership
    /// require it to be set.
    pub control_plane_addr: Option<SocketAddr>,
    /// Explicit server name for the peer. When absent, control-plane calls
    /// fall back to the engine id's string form.
    pub server_name: Option<String>,
}
120
/// Declares that the actor at `path` is owned by the remote engine `owner`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    /// Path of the remotely-owned actor.
    pub path: ActorPath,
    /// Engine that owns the actor.
    pub owner: EngineId,
}
126
/// Maps a role name to the engine that fulfils it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    /// Role name; it is used to look up the owner when spawning on a role.
    pub name: String,
    /// Engine that owns the role.
    pub owner: EngineId,
}
132
/// Errors raised while validating or mutating the cluster topology.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// A peer with this engine id is already present.
    DuplicatePeer(EngineId),
    /// A peer entry uses the local engine's own id.
    LocalPeerDeclared(EngineId),
    /// The path has already been declared as a remote actor.
    DuplicateDeclaredActor(ActorPath),
    /// The declared owner is the local engine, or the path already exists in
    /// the local actor registry.
    LocalActorDeclaredRemote(ActorPath),
    /// The declared owner of `path` is not among the known peers.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// A role with this name is already present.
    DuplicateRole(String),
    /// The role's owner is not among the known peers.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// The role's owner is a known peer but has no control-plane address.
    RoleOwnerMissingControlPlaneEndpoint { role: String, owner: EngineId },
    /// An error reported by the underlying transport.
    Transport(TransportError),
}
145
/// Request describing an actor to spawn on a remote engine.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    /// Path the new actor should live at.
    pub path: ActorPath,
    /// Sent to the remote control plane as the `type_name` request parameter.
    pub type_name: String,
    /// Optional JSON value sent as the `config` request parameter; the
    /// parameter is omitted when this is `None`.
    pub config: Option<Value>,
}
152
/// Errors returned by the remote-spawn operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// The target engine is not among the known peers.
    UnknownPeer(EngineId),
    /// No role with this name is configured.
    UnknownRole(String),
    /// The requested path already exists in the local actor registry.
    LocalPathCollision(ActorPath),
    /// The path is already declared as owned by a different engine; `owner`
    /// is the engine holding that declaration.
    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
    /// The remote control plane reported an actor already running at the path.
    RemotePathAlreadyRunning(ActorPath),
    /// The target peer has no control-plane address configured.
    MissingControlPlaneEndpoint(EngineId),
    /// The server name could not be converted into a TLS server name.
    InvalidServerName(String),
    /// A control-plane call failed; the payload is the failure message.
    ControlPlane(String),
    /// The actor was not observed as spawned before the wait deadline.
    SpawnTimedOut(ActorPath),
}
165
/// Wire protocol used to reach a peer's control plane.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    /// TLS over TCP.
    Tcp,
    /// QUIC.
    Quic,
}
171
/// Client-side settings for talking to a peer's control plane.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    /// Which client implementation a control-plane call uses.
    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material from which the per-call client configuration is derived.
    pub(crate) tls: palladium_transport::TlsConfig,
    /// Time budget used by remote spawn for its deadline and for waiting on
    /// the peer connection.
    pub(crate) wait_timeout: Duration,
}
178
/// Lower bound that `remote_spawn_wait_timeout` applies to the remote-spawn
/// wait budget.
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
180
impl From<TransportError> for ClusterError {
    /// Wraps the transport error unchanged in `ClusterError::Transport`.
    fn from(value: TransportError) -> Self {
        Self::Transport(value)
    }
}
186
impl From<TransportError> for RemoteSpawnError {
    /// Converts a transport error into `RemoteSpawnError::ControlPlane`,
    /// embedding the error's `Debug` rendering in the message.
    fn from(value: TransportError) -> Self {
        Self::ControlPlane(format!("transport route install failed: {value:?}"))
    }
}
192
/// Shared handle to the running transport, one variant per supported
/// transport. Cloning the handle clones the inner `Arc`.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    /// Handle to a TCP transport.
    Tcp(Arc<TcpTransport<R, N>>),
    /// Handle to a QUIC transport.
    Quic(Arc<QuicTransport<R, N>>),
}
198
199impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
200 pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
201 match self {
202 Self::Tcp(transport) => Some(transport),
203 Self::Quic(_) => None,
204 }
205 }
206
207 pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
208 match self {
209 Self::Tcp(_) => None,
210 Self::Quic(transport) => Some(transport),
211 }
212 }
213
214 pub fn add_peer(&self, peer: &PeerSpec) {
215 match self {
216 Self::Tcp(transport) => {
217 if let Some(server_name) = &peer.server_name {
218 transport.add_peer_with_server_name(
219 peer.engine_id.clone(),
220 peer.addr,
221 server_name.clone(),
222 );
223 } else {
224 transport.add_peer(peer.engine_id.clone(), peer.addr);
225 }
226 }
227 Self::Quic(transport) => {
228 if let Some(server_name) = &peer.server_name {
229 transport.add_peer_with_server_name(
230 peer.engine_id.clone(),
231 peer.addr,
232 server_name.clone(),
233 );
234 } else {
235 transport.add_peer(peer.engine_id.clone(), peer.addr);
236 }
237 }
238 }
239 }
240
241 pub fn declare_remote_path(
242 &self,
243 owner: EngineId,
244 path: &ActorPath,
245 ) -> Result<(), TransportError> {
246 match self {
247 Self::Tcp(transport) => transport.declare_remote_path(owner, path),
248 Self::Quic(transport) => transport.declare_remote_path(owner, path),
249 }
250 }
251
252 pub fn advertise_path(&self, addr: AddrHash, path: &ActorPath) -> Result<(), TransportError> {
253 match self {
254 Self::Tcp(transport) => transport.advertise_path(addr, path),
255 Self::Quic(transport) => transport.advertise_path(addr, path),
256 }
257 }
258
259 pub fn register_source_addr(&self, addr: AddrHash) -> Result<(), TransportError> {
260 match self {
261 Self::Tcp(transport) => transport.register(addr, mailbox(1).0),
262 Self::Quic(transport) => transport.register(addr, mailbox(1).0),
263 }
264 }
265
266 pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
267 match self {
268 Self::Tcp(transport) => transport.undeclare_remote_path(path),
269 Self::Quic(transport) => transport.undeclare_remote_path(path),
270 }
271 }
272
273 pub fn can_route(&self, destination: AddrHash) -> bool {
274 match self {
275 Self::Tcp(transport) => transport.can_route(destination),
276 Self::Quic(transport) => transport.can_route(destination),
277 }
278 }
279
280 pub async fn wait_for_peer_connection(
281 &self,
282 engine_id: &EngineId,
283 timeout: Duration,
284 ) -> Result<(), TransportError> {
285 match self {
286 Self::Tcp(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
287 Self::Quic(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
288 }
289 }
290}
291
/// Runtime handle for a bounded cluster: the local engine, its transport, and
/// the currently known topology (peers, declared remote actors, roles).
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    // Local engine; supplies the source address and builds remote addresses.
    engine: EngineHandle<R, N, F>,
    // Identity of the local engine; used in validation and spawn requests.
    local_engine_id: EngineId,
    // Settings for control-plane calls made during remote spawn.
    control_plane: ControlPlaneClientConfig,
    // Transport used for peer, route, and connection management.
    transport: BoundedTransportHandle<R, N>,
    // Peers known to this handle.
    peers: Vec<PeerSpec>,
    // Remote actors declared through this handle.
    declared_actors: Vec<DeclaredRemoteActor>,
    // Roles known to this handle.
    roles: Vec<BoundedClusterRole>,
    // Local actor registry, consulted for path collisions and advertising.
    registry: Arc<ActorRegistry<R>>,
}
307
308impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
309 pub(crate) fn new(
310 engine: EngineHandle<R, N, F>,
311 local_engine_id: EngineId,
312 control_plane: ControlPlaneClientConfig,
313 transport: BoundedTransportHandle<R, N>,
314 peers: Vec<PeerSpec>,
315 roles: Vec<BoundedClusterRole>,
316 registry: Arc<ActorRegistry<R>>,
317 ) -> Self {
318 Self {
319 engine,
320 local_engine_id,
321 control_plane,
322 transport,
323 peers,
324 declared_actors: Vec::new(),
325 roles,
326 registry,
327 }
328 }
329
330 pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
331 &self.transport
332 }
333
334 pub fn peers(&self) -> &[PeerSpec] {
335 &self.peers
336 }
337
338 pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
339 &self.declared_actors
340 }
341
342 pub fn roles(&self) -> &[BoundedClusterRole] {
343 &self.roles
344 }
345
346 pub(crate) fn advertise_existing_local_routes(&self) -> Result<(), ClusterError> {
347 self.transport
348 .register_source_addr(self.engine.source_addr)
349 .map_err(ClusterError::Transport)?;
350
351 for info in self
352 .registry
353 .snapshot(&crate::introspection::ActorQuery::default(), 0)
354 {
355 if info.state != crate::introspection::ActorState::Running {
356 continue;
357 }
358 if let Some(slot) = self.registry.get_by_path(&info.path) {
359 self.transport
360 .advertise_path(slot.addr, &info.path)
361 .map_err(ClusterError::Transport)?;
362 }
363 }
364 Ok(())
365 }
366
367 pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
368 validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
369 self.transport.add_peer(&peer);
370 self.advertise_existing_local_routes()?;
371 self.peers.push(peer);
372 Ok(())
373 }
374
375 pub fn declare_remote_actor(
376 &mut self,
377 owner: EngineId,
378 path: ActorPath,
379 ) -> Result<(), ClusterError> {
380 validate_declared_actor_topology(
381 &path,
382 &owner,
383 &self.local_engine_id,
384 &self.peers,
385 &self.declared_actors,
386 )?;
387 validate_declared_actor_registry(
388 &path,
389 &owner,
390 &self.local_engine_id,
391 &self.declared_actors,
392 &self.registry,
393 )?;
394 self.transport.declare_remote_path(owner.clone(), &path)?;
395 self.declared_actors
396 .push(DeclaredRemoteActor { path, owner });
397 Ok(())
398 }
399
400 pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
401 self.transport.undeclare_remote_path(path)?;
402 self.declared_actors
403 .retain(|declared| &declared.path != path);
404 Ok(())
405 }
406
407 pub fn can_route(&self, path: &ActorPath) -> bool {
408 self.transport.can_route(AddrHash::new(path, 0))
409 }
410
411 pub async fn spawn_remote_on_role<M>(
412 &self,
413 role: &str,
414 spec: RemoteSpawnSpec,
415 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
416 where
417 M: RemoteMessage,
418 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
419 R: Send + Sync,
420 {
421 let owner = self
422 .roles
423 .iter()
424 .find(|declared| declared.name == role)
425 .map(|declared| declared.owner.clone())
426 .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
427 self.spawn_remote::<M>(owner, spec).await
428 }
429
430 pub async fn spawn_remote<M>(
431 &self,
432 target: EngineId,
433 spec: RemoteSpawnSpec,
434 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
435 where
436 M: RemoteMessage,
437 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
438 R: Send + Sync,
439 {
440 validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;
441
442 let peer = self
443 .peers
444 .iter()
445 .find(|peer| peer.engine_id == target)
446 .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
447 let control_plane_addr = peer
448 .control_plane_addr
449 .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
450 let server_name = peer
451 .server_name
452 .clone()
453 .unwrap_or_else(|| target.as_str().to_string());
454 let caller_server_name = self.local_engine_id.as_str().to_string();
455
456 let deadline = Instant::now() + self.control_plane.wait_timeout;
457 loop {
458 match call_actor_spawn(
459 &self.control_plane,
460 control_plane_addr,
461 &server_name,
462 &self.local_engine_id,
463 transport_listen_addr(&self.transport),
464 &caller_server_name,
465 &spec,
466 )
467 .await
468 {
469 Ok(()) => break,
470 Err(RemoteSpawnError::ControlPlane(message))
471 if is_retryable_control_plane_error(&message) && Instant::now() <= deadline =>
472 {
473 tokio::time::sleep(Duration::from_millis(20)).await;
474 }
475 Err(err) => return Err(err),
476 }
477 }
478
479 while Instant::now() <= deadline {
480 if actor_is_spawned(
481 &self.control_plane,
482 control_plane_addr,
483 &server_name,
484 &spec.path,
485 )
486 .await?
487 {
488 self.transport
489 .declare_remote_path(target.clone(), &spec.path)
490 .map_err(RemoteSpawnError::from)?;
491 self.transport
492 .wait_for_peer_connection(&target, self.control_plane.wait_timeout)
493 .await
494 .map_err(RemoteSpawnError::from)?;
495 return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
496 }
497 tokio::time::sleep(Duration::from_millis(20)).await;
498 }
499
500 Err(RemoteSpawnError::SpawnTimedOut(spec.path))
501 }
502}
503
504async fn call_actor_spawn(
505 client: &ControlPlaneClientConfig,
506 addr: SocketAddr,
507 server_name: &str,
508 caller_engine_id: &EngineId,
509 caller_transport_addr: SocketAddr,
510 caller_server_name: &str,
511 spec: &RemoteSpawnSpec,
512) -> Result<(), RemoteSpawnError> {
513 let params = match &spec.config {
514 Some(config) => json!({
515 "path": spec.path.as_str(),
516 "type_name": spec.type_name,
517 "config": config,
518 "caller_engine_id": caller_engine_id.as_str(),
519 "caller_transport_addr": caller_transport_addr,
520 "caller_server_name": caller_server_name,
521 }),
522 None => json!({
523 "path": spec.path.as_str(),
524 "type_name": spec.type_name,
525 "caller_engine_id": caller_engine_id.as_str(),
526 "caller_transport_addr": caller_transport_addr,
527 "caller_server_name": caller_server_name,
528 }),
529 };
530 match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
531 Ok(_) => {}
532 Err(RemoteSpawnError::ControlPlane(message))
533 if message.contains("actor already running at") =>
534 {
535 return Err(RemoteSpawnError::RemotePathAlreadyRunning(
536 spec.path.clone(),
537 ));
538 }
539 Err(err) => return Err(err),
540 }
541 Ok(())
542}
543
544fn transport_listen_addr<R: Reactor + Clone, N: Network + Clone>(
545 transport: &BoundedTransportHandle<R, N>,
546) -> SocketAddr {
547 match transport {
548 BoundedTransportHandle::Tcp(transport) => transport.local_addr(),
549 BoundedTransportHandle::Quic(transport) => transport.local_addr(),
550 }
551}
552
/// Returns `true` when `message` contains, ignoring ASCII case, one of the
/// markers this module treats as a retryable control-plane failure.
fn is_retryable_control_plane_error(message: &str) -> bool {
    const RETRYABLE_MARKERS: [&str; 5] = [
        "connection refused",
        "connection reset",
        "timed out",
        "broken pipe",
        "not connected",
    ];

    let lowered = message.to_ascii_lowercase();
    RETRYABLE_MARKERS
        .iter()
        .any(|&marker| lowered.contains(marker))
}
561
562fn validate_remote_spawn_request<R: Reactor>(
563 path: &ActorPath,
564 owner: &EngineId,
565 declared_actors: &[DeclaredRemoteActor],
566 registry: &Arc<ActorRegistry<R>>,
567) -> Result<(), RemoteSpawnError> {
568 if registry.get_by_path(path).is_some() {
569 return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
570 }
571
572 if let Some(declared) = declared_actors
573 .iter()
574 .find(|declared| declared.path == *path && declared.owner != *owner)
575 {
576 return Err(RemoteSpawnError::RemoteOwnershipConflict {
577 path: path.clone(),
578 owner: declared.owner.clone(),
579 });
580 }
581
582 Ok(())
583}
584
585async fn actor_is_spawned(
586 client: &ControlPlaneClientConfig,
587 addr: SocketAddr,
588 server_name: &str,
589 path: &ActorPath,
590) -> Result<bool, RemoteSpawnError> {
591 match call_control_plane(
592 client,
593 addr,
594 server_name,
595 "actor.info",
596 json!({ "path": path.as_str() }),
597 )
598 .await
599 {
600 Ok(_) => Ok(true),
601 Err(RemoteSpawnError::ControlPlane(message))
602 if message.contains("actor not found:")
603 || message.contains("actor not found")
604 || message.contains("User supervisor not available") =>
605 {
606 Ok(false)
607 }
608 Err(err) => Err(err),
609 }
610}
611
612async fn call_control_plane(
613 client: &ControlPlaneClientConfig,
614 addr: SocketAddr,
615 server_name: &str,
616 method: &str,
617 params: Value,
618) -> Result<Value, RemoteSpawnError> {
619 let request = json!({
620 "id": 1,
621 "method": method,
622 "params": params,
623 });
624
625 match client.protocol {
626 ControlPlaneProtocol::Tcp => {
627 call_control_plane_tcp(addr, server_name, &client.tls, &request).await
628 }
629 ControlPlaneProtocol::Quic => {
630 call_control_plane_quic(addr, server_name, &client.tls, &request).await
631 }
632 }
633}
634
635async fn call_control_plane_tcp(
636 addr: SocketAddr,
637 server_name: &str,
638 tls: &palladium_transport::TlsConfig,
639 request: &Value,
640) -> Result<Value, RemoteSpawnError> {
641 let config = tls
642 .client_config(server_name)
643 .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
644 let name = ServerName::try_from(server_name.to_string())
645 .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
646 let stream = tokio::net::TcpStream::connect(addr)
647 .await
648 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
649 let connector = TlsConnector::from(config);
650 let tls_stream = connector
651 .connect(name, stream)
652 .await
653 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
654
655 let (reader, mut writer) = tokio::io::split(tls_stream);
656 let mut body = request.to_string();
657 body.push('\n');
658 writer
659 .write_all(body.as_bytes())
660 .await
661 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
662 let mut line = String::new();
663 let mut reader = BufReader::new(reader);
664 reader
665 .read_line(&mut line)
666 .await
667 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
668 parse_control_plane_response(&line)
669}
670
671async fn call_control_plane_quic(
672 addr: SocketAddr,
673 server_name: &str,
674 tls: &palladium_transport::TlsConfig,
675 request: &Value,
676) -> Result<Value, RemoteSpawnError> {
677 ensure_rustls_provider();
678 let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
679 RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
680 })?)
681 .clone();
682 client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
683 let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
684
685 let client = s2n_quic::Client::builder()
686 .with_tls(client_tls)
687 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
688 .with_io("0.0.0.0:0")
689 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
690 .start()
691 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
692
693 let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
694 let mut conn = client
695 .connect(connect)
696 .await
697 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
698 let mut stream = conn
699 .open_bidirectional_stream()
700 .await
701 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
702
703 let mut body = request.to_string();
704 body.push('\n');
705 stream
706 .write_all(body.as_bytes())
707 .await
708 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
709 stream
710 .close()
711 .await
712 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
713
714 let mut buf = Vec::new();
715 stream
716 .read_to_end(&mut buf)
717 .await
718 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
719 let line = String::from_utf8_lossy(&buf);
720 parse_control_plane_response(&line)
721}
722
723fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
724 let response: Value = serde_json::from_str(line.trim())
725 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
726 if let Some(error) = response.get("error") {
727 let message = error
728 .get("message")
729 .and_then(Value::as_str)
730 .unwrap_or("unknown control-plane error");
731 return Err(RemoteSpawnError::ControlPlane(message.to_string()));
732 }
733 Ok(response.get("result").cloned().unwrap_or(Value::Null))
734}
735
/// Installs a default rustls crypto provider, running the installation at
/// most once per process.
///
/// The provider depends on the enabled cargo features: `aws-lc-rs` is used
/// when enabled, otherwise `ring` when that is enabled. With neither feature
/// the call does nothing. The result of `install_default` is ignored.
fn ensure_rustls_provider() {
    static INIT: std::sync::Once = std::sync::Once::new();
    INIT.call_once(|| {
        #[cfg(feature = "aws-lc-rs")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        let _ = rustls::crypto::ring::default_provider().install_default();
    });
}
745
746pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
747 ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
748}
749
750pub(crate) fn validate_config<R: Reactor>(
751 config: &BoundedClusterConfig,
752 registry: &Arc<ActorRegistry<R>>,
753) -> Result<(), ClusterError> {
754 validate_static_config(config)?;
755
756 let local_engine_id = config.transport.engine_id();
757 let mut seen_declared = Vec::new();
758 for declared in &config.declared_actors {
759 validate_declared_actor_registry(
760 &declared.path,
761 &declared.owner,
762 local_engine_id,
763 &seen_declared,
764 registry,
765 )?;
766 seen_declared.push(declared.clone());
767 }
768
769 Ok(())
770}
771
772fn validate_static_config(config: &BoundedClusterConfig) -> Result<(), ClusterError> {
773 let local_engine_id = config.transport.engine_id();
774 let mut seen_peers = Vec::new();
775 for peer in &config.peers {
776 validate_peer_spec(peer, local_engine_id, &seen_peers)?;
777 seen_peers.push(peer.clone());
778 }
779
780 let mut seen_declared = Vec::new();
781 for declared in &config.declared_actors {
782 validate_declared_actor_topology(
783 &declared.path,
784 &declared.owner,
785 local_engine_id,
786 &config.peers,
787 &seen_declared,
788 )?;
789 seen_declared.push(declared.clone());
790 }
791
792 let mut seen_roles = Vec::new();
793 for role in &config.roles {
794 validate_role(role, &config.peers, &seen_roles)?;
795 seen_roles.push(role.clone());
796 }
797
798 Ok(())
799}
800
801fn validate_peer_spec(
802 peer: &PeerSpec,
803 local_engine_id: &EngineId,
804 existing: &[PeerSpec],
805) -> Result<(), ClusterError> {
806 if &peer.engine_id == local_engine_id {
807 return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
808 }
809 if existing
810 .iter()
811 .any(|existing| existing.engine_id == peer.engine_id)
812 {
813 return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
814 }
815 Ok(())
816}
817
818fn validate_declared_actor_topology(
819 path: &ActorPath,
820 owner: &EngineId,
821 local_engine_id: &EngineId,
822 peers: &[PeerSpec],
823 existing: &[DeclaredRemoteActor],
824) -> Result<(), ClusterError> {
825 if owner == local_engine_id {
826 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
827 }
828 if existing.iter().any(|existing| existing.path == *path) {
829 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
830 }
831 if !peers.iter().any(|peer| peer.engine_id == *owner) {
832 return Err(ClusterError::UnknownPeerOwner {
833 path: path.clone(),
834 owner: owner.clone(),
835 });
836 }
837 Ok(())
838}
839
840fn validate_declared_actor_registry<R: Reactor>(
841 path: &ActorPath,
842 owner: &EngineId,
843 local_engine_id: &EngineId,
844 existing: &[DeclaredRemoteActor],
845 registry: &Arc<ActorRegistry<R>>,
846) -> Result<(), ClusterError> {
847 if owner == local_engine_id || registry.get_by_path(path).is_some() {
848 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
849 }
850 if existing.iter().any(|existing| existing.path == *path) {
851 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
852 }
853 Ok(())
854}
855
856fn validate_role(
857 role: &BoundedClusterRole,
858 peers: &[PeerSpec],
859 existing: &[BoundedClusterRole],
860) -> Result<(), ClusterError> {
861 if existing.iter().any(|existing| existing.name == role.name) {
862 return Err(ClusterError::DuplicateRole(role.name.clone()));
863 }
864 if !peers.iter().any(|peer| peer.engine_id == role.owner) {
865 return Err(ClusterError::UnknownRoleOwner {
866 role: role.name.clone(),
867 owner: role.owner.clone(),
868 });
869 }
870 if !peers
871 .iter()
872 .any(|peer| peer.engine_id == role.owner && peer.control_plane_addr.is_some())
873 {
874 return Err(ClusterError::RoleOwnerMissingControlPlaneEndpoint {
875 role: role.name.clone(),
876 owner: role.owner.clone(),
877 });
878 }
879 Ok(())
880}
881
882#[cfg(test)]
883mod tests {
884 use super::{
885 call_control_plane, validate_config, BoundedClusterConfig, BoundedClusterRole,
886 BoundedTransportConfig, ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError,
887 RemoteSpawnSpec,
888 };
889 use crate::{Engine, EngineConfig};
890 use palladium_actor::{
891 Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
892 Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
893 };
894 use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
895 use rcgen::{
896 BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
897 KeyUsagePurpose,
898 };
899 use serde_json::json;
900 use std::net::{SocketAddr, TcpListener, UdpSocket};
901 use std::sync::Arc;
902 use std::time::{Duration, Instant};
903
904 fn make_ca() -> (Certificate, KeyPair) {
905 let mut params = CertificateParams::default();
906 params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
907 params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
908 let ca_key = KeyPair::generate().expect("ca keypair");
909 let cert = params.self_signed(&ca_key).expect("ca cert");
910 (cert, ca_key)
911 }
912
913 fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
914 let keypair = KeyPair::generate().expect("leaf keypair");
915 let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
916 let mut params = params;
917 params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
918 params.extended_key_usages = vec![
919 ExtendedKeyUsagePurpose::ServerAuth,
920 ExtendedKeyUsagePurpose::ClientAuth,
921 ];
922 let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
923 let cert_der = cert.der().to_vec();
924 let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
925 let ca_der = ca.der().to_vec();
926 TlsConfig {
927 cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
928 private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
929 trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
930 }
931 }
932
    /// Builds a TCP transport configuration for tests: the given engine id
    /// and TLS material, an OS-assigned loopback port, and small fixed limits.
    fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
        TcpTransportConfig {
            engine_id: EngineId::new(engine_id),
            // Port 0 asks the OS to pick a free port.
            listen_addr: "127.0.0.1:0".parse().unwrap(),
            max_connections_per_peer: 1,
            idle_timeout: Duration::from_secs(1),
            send_buffer_size: 8,
            nodelay: true,
            tls,
            send_delay: Duration::from_millis(0),
        }
    }
945
946 fn reserve_tcp_addr() -> SocketAddr {
947 let listener = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
948 let addr = listener.local_addr().expect("tcp local addr");
949 drop(listener);
950 addr
951 }
952
953 fn reserve_udp_addr() -> SocketAddr {
954 let socket = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
955 let addr = socket.local_addr().expect("udp local addr");
956 drop(socket);
957 addr
958 }
959
    /// Two peers sharing one engine id must be rejected with `DuplicatePeer`.
    #[test]
    fn bounded_cluster_config_rejects_duplicate_peers() {
        let (ca, ca_key) = make_ca();
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let config = BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config(
                "engine-a.example",
                make_tls_config("engine-a.example", &ca, &ca_key),
            )),
            // Same engine id twice, differing only in address.
            peers: vec![
                PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4100".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                },
                PeerSpec {
                    engine_id: EngineId::new("engine-b.example"),
                    addr: "127.0.0.1:4200".parse().unwrap(),
                    control_plane_addr: None,
                    server_name: None,
                },
            ],
            declared_actors: Vec::new(),
            roles: Vec::new(),
        };

        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
        assert_eq!(
            err,
            ClusterError::DuplicatePeer(EngineId::new("engine-b.example"))
        );
    }
996
    /// A declared actor whose owner is not a configured peer must be rejected
    /// with `UnknownPeerOwner`.
    #[test]
    fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
        let (ca, ca_key) = make_ca();
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let config = BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config(
                "engine-a.example",
                make_tls_config("engine-a.example", &ca, &ca_key),
            )),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: "127.0.0.1:4100".parse().unwrap(),
                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
                server_name: None,
            }],
            // The owner "engine-c.example" is not in `peers`.
            declared_actors: vec![DeclaredRemoteActor {
                path: ActorPath::parse("/user/missing-owner").unwrap(),
                owner: EngineId::new("engine-c.example"),
            }],
            roles: Vec::new(),
        };

        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
        assert_eq!(
            err,
            ClusterError::UnknownPeerOwner {
                path: ActorPath::parse("/user/missing-owner").unwrap(),
                owner: EngineId::new("engine-c.example"),
            }
        );
    }
1031
    /// Two roles with the same name must be rejected with `DuplicateRole`.
    #[test]
    fn bounded_cluster_config_rejects_duplicate_roles() {
        let (ca, ca_key) = make_ca();
        let engine = Engine::with_config(EngineConfig {
            engine_id: EngineId::new("engine-a.example"),
            ..Default::default()
        });
        let config = BoundedClusterConfig {
            transport: BoundedTransportConfig::Tcp(tcp_config(
                "engine-a.example",
                make_tls_config("engine-a.example", &ca, &ca_key),
            )),
            peers: vec![PeerSpec {
                engine_id: EngineId::new("engine-b.example"),
                addr: "127.0.0.1:4100".parse().unwrap(),
                control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
                server_name: None,
            }],
            declared_actors: Vec::new(),
            // Same role name twice, both owned by a valid peer.
            roles: vec![
                BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                },
                BoundedClusterRole {
                    name: "context-service".to_string(),
                    owner: EngineId::new("engine-b.example"),
                },
            ],
        };

        let err = validate_config(&config, &engine.handle().registry).unwrap_err();
        assert_eq!(
            err,
            ClusterError::DuplicateRole("context-service".to_string())
        );
    }
1069
1070 #[test]
1071 fn bounded_cluster_config_rejects_unknown_role_owner() {
1072 let (ca, ca_key) = make_ca();
1073 let engine = Engine::with_config(EngineConfig {
1074 engine_id: EngineId::new("engine-a.example"),
1075 ..Default::default()
1076 });
1077 let config = BoundedClusterConfig {
1078 transport: BoundedTransportConfig::Tcp(tcp_config(
1079 "engine-a.example",
1080 make_tls_config("engine-a.example", &ca, &ca_key),
1081 )),
1082 peers: vec![PeerSpec {
1083 engine_id: EngineId::new("engine-b.example"),
1084 addr: "127.0.0.1:4100".parse().unwrap(),
1085 control_plane_addr: None,
1086 server_name: None,
1087 }],
1088 declared_actors: Vec::new(),
1089 roles: vec![BoundedClusterRole {
1090 name: "gateway".to_string(),
1091 owner: EngineId::new("engine-c.example"),
1092 }],
1093 };
1094
1095 let err = validate_config(&config, &engine.handle().registry).unwrap_err();
1096 assert_eq!(
1097 err,
1098 ClusterError::UnknownRoleOwner {
1099 role: "gateway".to_string(),
1100 owner: EngineId::new("engine-c.example"),
1101 }
1102 );
1103 }
1104
1105 #[test]
1106 fn bounded_cluster_config_rejects_role_owner_without_control_plane() {
1107 let (ca, ca_key) = make_ca();
1108 let config = BoundedClusterConfig {
1109 transport: BoundedTransportConfig::Tcp(tcp_config(
1110 "engine-a.example",
1111 make_tls_config("engine-a.example", &ca, &ca_key),
1112 )),
1113 peers: vec![PeerSpec {
1114 engine_id: EngineId::new("engine-b.example"),
1115 addr: "127.0.0.1:4100".parse().unwrap(),
1116 control_plane_addr: None,
1117 server_name: None,
1118 }],
1119 declared_actors: Vec::new(),
1120 roles: vec![BoundedClusterRole {
1121 name: "session-host".to_string(),
1122 owner: EngineId::new("engine-b.example"),
1123 }],
1124 };
1125
1126 let err = config.validate().unwrap_err();
1127 assert_eq!(
1128 err,
1129 ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1130 role: "session-host".to_string(),
1131 owner: EngineId::new("engine-b.example"),
1132 }
1133 );
1134 }
1135
1136 #[tokio::test]
1137 async fn attach_bounded_cluster_rejects_role_owner_without_control_plane() {
1138 let (ca, ca_key) = make_ca();
1139 let engine = Engine::with_config(EngineConfig {
1140 engine_id: EngineId::new("engine-a.example"),
1141 ..Default::default()
1142 });
1143
1144 let err = match engine
1145 .handle()
1146 .attach_bounded_cluster(BoundedClusterConfig {
1147 transport: BoundedTransportConfig::Tcp(tcp_config(
1148 "engine-a.example",
1149 make_tls_config("engine-a.example", &ca, &ca_key),
1150 )),
1151 peers: vec![PeerSpec {
1152 engine_id: EngineId::new("engine-b.example"),
1153 addr: "127.0.0.1:4100".parse().unwrap(),
1154 control_plane_addr: None,
1155 server_name: None,
1156 }],
1157 declared_actors: Vec::new(),
1158 roles: vec![BoundedClusterRole {
1159 name: "session-host".to_string(),
1160 owner: EngineId::new("engine-b.example"),
1161 }],
1162 })
1163 .await
1164 {
1165 Ok(_) => panic!("expected bounded cluster attach to reject missing control plane"),
1166 Err(err) => err,
1167 };
1168
1169 assert_eq!(
1170 err,
1171 ClusterError::RoleOwnerMissingControlPlaneEndpoint {
1172 role: "session-host".to_string(),
1173 owner: EngineId::new("engine-b.example"),
1174 }
1175 );
1176 }
1177
1178 #[test]
1179 fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
1180 assert_eq!(
1181 super::remote_spawn_wait_timeout(Duration::from_millis(25)),
1182 Duration::from_secs(5)
1183 );
1184 assert_eq!(
1185 super::remote_spawn_wait_timeout(Duration::from_secs(8)),
1186 Duration::from_secs(8)
1187 );
1188 }
1189
1190 fn tcp_tests_available() -> bool {
1191 if std::env::var("PD_ENABLE_TCP_TESTS").is_err() {
1192 return false;
1193 }
1194 std::net::TcpListener::bind("127.0.0.1:0").is_ok()
1195 }
1196
1197 fn quic_tests_available() -> bool {
1198 super::ensure_rustls_provider();
1199 rustls::crypto::CryptoProvider::get_default().is_some()
1200 }
1201
    /// Request message for the echo test actors; carries the number to be doubled.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoRequest {
        /// Input value; the echo actors reply with `value * 2`.
        value: u64,
    }
1206
    /// Reply to an `EchoRequest`.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct EchoResponse {
        /// Twice the `value` of the request being answered.
        doubled: u64,
    }
1211
    /// Makes `EchoRequest` a message whose reply type is `EchoResponse`.
    impl Message for EchoRequest {
        type Response = EchoResponse;
        // Tag computed by `palladium_actor::fnv1a_64` from a test-specific name.
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
    }
1216
1217 struct EchoActor;
1218
1219 impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
1220 fn on_message(
1221 &mut self,
1222 ctx: &mut ActorContext<R>,
1223 envelope: &Envelope,
1224 payload: MessagePayload,
1225 ) -> Result<(), ActorError> {
1226 let req = payload
1227 .extract::<EchoRequest>()
1228 .map_err(|_| ActorError::Handler)?;
1229 let resp = EchoResponse {
1230 doubled: req.value * 2,
1231 };
1232 let resp_env = envelope.response(0);
1233 ctx.send_raw(resp_env, MessagePayload::local(resp))
1234 .map_err(|_| ActorError::Handler)
1235 }
1236 }
1237
    /// Unit message that `SpawnedEchoActor` answers with `ActorError::Handler`.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
    struct CrashRequest;

    /// Makes `CrashRequest` a message with an empty (`()`) reply type.
    impl Message for CrashRequest {
        type Response = ();
        // Tag computed by `palladium_actor::fnv1a_64` from a test-specific name.
        const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
    }
1245
1246 struct SpawnedEchoActor;
1247
1248 impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
1249 fn on_message(
1250 &mut self,
1251 ctx: &mut ActorContext<R>,
1252 envelope: &Envelope,
1253 payload: MessagePayload,
1254 ) -> Result<(), ActorError> {
1255 match envelope.type_tag {
1256 EchoRequest::TYPE_TAG => {
1257 let req = payload
1258 .extract::<EchoRequest>()
1259 .map_err(|_| ActorError::Handler)?;
1260 let resp = EchoResponse {
1261 doubled: req.value * 2,
1262 };
1263 let resp_env = envelope.response(0);
1264 ctx.send_raw(resp_env, MessagePayload::local(resp))
1265 .map_err(|_| ActorError::Handler)
1266 }
1267 CrashRequest::TYPE_TAG => Err(ActorError::Handler),
1268 _ => Err(ActorError::Handler),
1269 }
1270 }
1271 }
1272
1273 fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
1274 Arc::new(|type_name: &str, _config: &[u8]| match type_name {
1275 "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
1276 _ => Err(format!("unknown remote spawn type: {type_name}")),
1277 })
1278 }
1279
1280 #[tokio::test]
1281 async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
1282 if !tcp_tests_available() {
1283 eprintln!("skipping tcp integration tests: network bind not permitted");
1284 return;
1285 }
1286
1287 let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
1288 let (ca, ca_key) = make_ca();
1289 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1290 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1291
1292 let mut engine_a = Engine::with_config(EngineConfig {
1293 engine_id: EngineId::new("engine-a.example"),
1294 ..Default::default()
1295 });
1296 let engine_b = Engine::with_config(EngineConfig {
1297 engine_id: EngineId::new("engine-b.example"),
1298 ..Default::default()
1299 });
1300
1301 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1302 engine_a.add_user_actor(ChildSpec::new(
1303 "cluster-echo",
1304 RestartPolicy::Permanent,
1305 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1306 ns,
1307 move || Box::new(EchoActor),
1308 ));
1309
1310 let handle_a = engine_a.handle();
1311 let handle_b = engine_b.handle();
1312 handle_a
1313 .type_registry()
1314 .register_remote_ask::<EchoRequest>();
1315 handle_b
1316 .type_registry()
1317 .register_remote_ask::<EchoRequest>();
1318
1319 let cluster_a = handle_a
1320 .attach_bounded_cluster(BoundedClusterConfig {
1321 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1322 peers: Vec::new(),
1323 declared_actors: Vec::new(),
1324 roles: Vec::new(),
1325 })
1326 .await
1327 .expect("attach bounded cluster a");
1328 let tcp_a = cluster_a
1329 .transport()
1330 .as_tcp()
1331 .cloned()
1332 .expect("tcp transport a");
1333
1334 let cluster_b = handle_b
1335 .attach_bounded_cluster(BoundedClusterConfig {
1336 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1337 peers: vec![PeerSpec {
1338 engine_id: EngineId::new("engine-a.example"),
1339 addr: tcp_a.local_addr(),
1340 control_plane_addr: None,
1341 server_name: None,
1342 }],
1343 declared_actors: Vec::new(),
1344 roles: Vec::new(),
1345 })
1346 .await
1347 .expect("attach bounded cluster b");
1348 let tcp_b = cluster_b
1349 .transport()
1350 .as_tcp()
1351 .cloned()
1352 .expect("tcp transport b");
1353
1354 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1355 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1356 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1357
1358 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1359 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1360
1361 let canonical = AddrHash::new(&actor_path, 0);
1362 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1363 while !cluster_b.can_route(&actor_path) {
1364 if std::time::Instant::now() > deadline {
1365 panic!("bounded cluster route did not propagate");
1366 }
1367 tokio::time::sleep(Duration::from_millis(20)).await;
1368 }
1369 assert!(tcp_b.can_route(canonical));
1370
1371 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1372 let response = remote
1373 .ask(EchoRequest { value: 12 })
1374 .await
1375 .expect("bounded cluster ask");
1376 assert_eq!(response, EchoResponse { doubled: 24 });
1377
1378 shutdown_a_tx.send(()).ok();
1379 shutdown_b_tx.send(()).ok();
1380 }
1381
1382 #[tokio::test]
1383 async fn bounded_cluster_add_peer_after_start_backfills_existing_tcp_routes() {
1384 if !tcp_tests_available() {
1385 eprintln!("skipping tcp integration tests: network bind not permitted");
1386 return;
1387 }
1388
1389 let actor_path = ActorPath::parse("/user/cluster-echo-add-peer").unwrap();
1390 let (ca, ca_key) = make_ca();
1391 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1392 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1393
1394 let mut engine_a = Engine::with_config(EngineConfig {
1395 engine_id: EngineId::new("engine-a.example"),
1396 ..Default::default()
1397 });
1398 let engine_b = Engine::with_config(EngineConfig {
1399 engine_id: EngineId::new("engine-b.example"),
1400 ..Default::default()
1401 });
1402
1403 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1404 engine_a.add_user_actor(ChildSpec::new(
1405 "cluster-echo-add-peer",
1406 RestartPolicy::Permanent,
1407 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1408 ns,
1409 move || Box::new(EchoActor),
1410 ));
1411
1412 let handle_a = engine_a.handle();
1413 let handle_b = engine_b.handle();
1414 handle_a
1415 .type_registry()
1416 .register_remote_ask::<EchoRequest>();
1417 handle_b
1418 .type_registry()
1419 .register_remote_ask::<EchoRequest>();
1420
1421 let mut cluster_a = handle_a
1422 .attach_bounded_cluster(BoundedClusterConfig {
1423 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1424 peers: Vec::new(),
1425 declared_actors: Vec::new(),
1426 roles: Vec::new(),
1427 })
1428 .await
1429 .expect("attach bounded cluster a");
1430 let tcp_a = cluster_a
1431 .transport()
1432 .as_tcp()
1433 .cloned()
1434 .expect("tcp transport a");
1435
1436 let cluster_b = handle_b
1437 .attach_bounded_cluster(BoundedClusterConfig {
1438 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1439 peers: vec![PeerSpec {
1440 engine_id: EngineId::new("engine-a.example"),
1441 addr: tcp_a.local_addr(),
1442 control_plane_addr: None,
1443 server_name: None,
1444 }],
1445 declared_actors: Vec::new(),
1446 roles: Vec::new(),
1447 })
1448 .await
1449 .expect("attach bounded cluster b");
1450 let tcp_b = cluster_b
1451 .transport()
1452 .as_tcp()
1453 .cloned()
1454 .expect("tcp transport b");
1455
1456 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1457 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1458
1459 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1460 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1461
1462 let local_ready_deadline = std::time::Instant::now() + Duration::from_secs(5);
1463 while handle_a.registry.get_by_path(&actor_path).is_none() {
1464 if std::time::Instant::now() > local_ready_deadline {
1465 panic!("local actor did not start before peer add");
1466 }
1467 tokio::time::sleep(Duration::from_millis(20)).await;
1468 }
1469
1470 cluster_a
1471 .add_peer(PeerSpec {
1472 engine_id: EngineId::new("engine-b.example"),
1473 addr: tcp_b.local_addr(),
1474 control_plane_addr: None,
1475 server_name: None,
1476 })
1477 .expect("add peer b to cluster a after start");
1478
1479 let canonical = AddrHash::new(&actor_path, 0);
1480 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1481 while !cluster_b.can_route(&actor_path) {
1482 if std::time::Instant::now() > deadline {
1483 panic!("bounded cluster route did not propagate after add_peer");
1484 }
1485 tokio::time::sleep(Duration::from_millis(20)).await;
1486 }
1487 assert!(tcp_b.can_route(canonical));
1488
1489 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1490 let response = remote
1491 .ask(EchoRequest { value: 15 })
1492 .await
1493 .expect("bounded cluster ask after add_peer");
1494 assert_eq!(response, EchoResponse { doubled: 30 });
1495
1496 shutdown_a_tx.send(()).ok();
1497 shutdown_b_tx.send(()).ok();
1498 }
1499
1500 #[tokio::test]
1501 async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1502 if !tcp_tests_available() {
1503 eprintln!("skipping tcp integration tests: network bind not permitted");
1504 return;
1505 }
1506
1507 let (ca, ca_key) = make_ca();
1508 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1509 let engine = Engine::with_config(EngineConfig {
1510 engine_id: EngineId::new("engine-a.example"),
1511 ..Default::default()
1512 });
1513 let handle = engine.handle();
1514
1515 let mut cluster = handle
1516 .attach_bounded_cluster(BoundedClusterConfig {
1517 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1518 peers: vec![PeerSpec {
1519 engine_id: EngineId::new("engine-b.example"),
1520 addr: "127.0.0.1:4100".parse().unwrap(),
1521 control_plane_addr: None,
1522 server_name: None,
1523 }],
1524 declared_actors: Vec::new(),
1525 roles: Vec::new(),
1526 })
1527 .await
1528 .expect("attach bounded cluster");
1529
1530 let path = ActorPath::parse("/user/declared-remote").unwrap();
1531 cluster
1532 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1533 .expect("declare remote actor");
1534
1535 assert!(cluster.can_route(&path));
1536 assert!(cluster
1537 .transport()
1538 .as_tcp()
1539 .expect("tcp transport")
1540 .can_route(AddrHash::new(&path, 0)));
1541 }
1542
1543 #[tokio::test]
1544 async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
1545 if !tcp_tests_available() {
1546 eprintln!("skipping tcp integration tests: network bind not permitted");
1547 return;
1548 }
1549
1550 let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
1551 let (ca, ca_key) = make_ca();
1552 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1553 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1554 let control_plane_b = reserve_tcp_addr();
1555
1556 let mut engine_a = Engine::with_config(EngineConfig {
1557 engine_id: EngineId::new("engine-a.example"),
1558 ..Default::default()
1559 });
1560 let engine_b = Engine::with_config(EngineConfig {
1561 engine_id: EngineId::new("engine-b.example"),
1562 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1563 control_plane_tls: Some(tls_b.clone()),
1564 actor_spawn: Some(bounded_spawn_handler()),
1565 ..Default::default()
1566 });
1567
1568 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1569 engine_a.add_user_actor(ChildSpec::new(
1570 "local-remote-collision",
1571 RestartPolicy::Permanent,
1572 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1573 ns,
1574 move || Box::new(EchoActor),
1575 ));
1576
1577 let handle_a = engine_a.handle();
1578 let handle_b = engine_b.handle();
1579
1580 let cluster_b = handle_b
1581 .attach_bounded_cluster(BoundedClusterConfig {
1582 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1583 peers: Vec::new(),
1584 declared_actors: Vec::new(),
1585 roles: Vec::new(),
1586 })
1587 .await
1588 .expect("attach bounded cluster b");
1589 let tcp_b = cluster_b
1590 .transport()
1591 .as_tcp()
1592 .cloned()
1593 .expect("tcp transport b");
1594
1595 let cluster_a = handle_a
1596 .attach_bounded_cluster(BoundedClusterConfig {
1597 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1598 peers: vec![PeerSpec {
1599 engine_id: EngineId::new("engine-b.example"),
1600 addr: tcp_b.local_addr(),
1601 control_plane_addr: Some(control_plane_b),
1602 server_name: None,
1603 }],
1604 declared_actors: Vec::new(),
1605 roles: vec![BoundedClusterRole {
1606 name: "context-service".to_string(),
1607 owner: EngineId::new("engine-b.example"),
1608 }],
1609 })
1610 .await
1611 .expect("attach bounded cluster a");
1612
1613 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1614 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1615
1616 let deadline = Instant::now() + Duration::from_secs(5);
1617 while handle_a.registry.get_by_path(&actor_path).is_none() {
1618 if Instant::now() > deadline {
1619 panic!("local collision actor did not start");
1620 }
1621 tokio::time::sleep(Duration::from_millis(20)).await;
1622 }
1623
1624 let err = cluster_a
1625 .spawn_remote_on_role::<EchoRequest>(
1626 "context-service",
1627 RemoteSpawnSpec {
1628 path: actor_path.clone(),
1629 type_name: "bounded-echo".to_string(),
1630 config: None,
1631 },
1632 )
1633 .await
1634 .expect_err("local path collision should fail");
1635 assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));
1636
1637 shutdown_a_tx.send(()).ok();
1638 }
1639
1640 #[tokio::test]
1641 async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1642 if !tcp_tests_available() {
1643 eprintln!("skipping tcp integration tests: network bind not permitted");
1644 return;
1645 }
1646
1647 let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1648 let (ca, ca_key) = make_ca();
1649 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1650 let engine = Engine::with_config(EngineConfig {
1651 engine_id: EngineId::new("engine-a.example"),
1652 ..Default::default()
1653 });
1654 let handle = engine.handle();
1655
1656 let cluster = handle
1657 .attach_bounded_cluster(BoundedClusterConfig {
1658 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1659 peers: vec![
1660 PeerSpec {
1661 engine_id: EngineId::new("engine-b.example"),
1662 addr: "127.0.0.1:4100".parse().unwrap(),
1663 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1664 server_name: None,
1665 },
1666 PeerSpec {
1667 engine_id: EngineId::new("engine-c.example"),
1668 addr: "127.0.0.1:4200".parse().unwrap(),
1669 control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1670 server_name: None,
1671 },
1672 ],
1673 declared_actors: vec![DeclaredRemoteActor {
1674 path: actor_path.clone(),
1675 owner: EngineId::new("engine-b.example"),
1676 }],
1677 roles: Vec::new(),
1678 })
1679 .await
1680 .expect("attach bounded cluster");
1681
1682 let err = cluster
1683 .spawn_remote::<EchoRequest>(
1684 EngineId::new("engine-c.example"),
1685 RemoteSpawnSpec {
1686 path: actor_path.clone(),
1687 type_name: "bounded-echo".to_string(),
1688 config: None,
1689 },
1690 )
1691 .await
1692 .expect_err("conflicting declared owner should fail");
1693 assert_eq!(
1694 err,
1695 RemoteSpawnError::RemoteOwnershipConflict {
1696 path: actor_path,
1697 owner: EngineId::new("engine-b.example"),
1698 }
1699 );
1700 }
1701
1702 #[tokio::test]
1703 async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1704 if !tcp_tests_available() {
1705 eprintln!("skipping tcp integration tests: network bind not permitted");
1706 return;
1707 }
1708
1709 let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1710 let (ca, ca_key) = make_ca();
1711 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1712 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1713 let control_plane_b = reserve_tcp_addr();
1714
1715 let engine_a = Engine::with_config(EngineConfig {
1716 engine_id: EngineId::new("engine-a.example"),
1717 ..Default::default()
1718 });
1719 let engine_b = Engine::with_config(EngineConfig {
1720 engine_id: EngineId::new("engine-b.example"),
1721 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1722 control_plane_tls: Some(tls_b.clone()),
1723 actor_spawn: Some(bounded_spawn_handler()),
1724 ..Default::default()
1725 });
1726
1727 let handle_a = engine_a.handle();
1728 let handle_b = engine_b.handle();
1729 handle_a
1730 .type_registry()
1731 .register_remote_ask::<EchoRequest>();
1732 handle_b
1733 .type_registry()
1734 .register_remote_ask::<EchoRequest>();
1735
1736 let cluster_a = handle_a
1737 .attach_bounded_cluster(BoundedClusterConfig {
1738 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1739 peers: Vec::new(),
1740 declared_actors: Vec::new(),
1741 roles: Vec::new(),
1742 })
1743 .await
1744 .expect("attach bounded cluster a");
1745 let tcp_a = cluster_a
1746 .transport()
1747 .as_tcp()
1748 .cloned()
1749 .expect("tcp transport a");
1750
1751 let cluster_b = handle_b
1752 .attach_bounded_cluster(BoundedClusterConfig {
1753 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1754 peers: vec![PeerSpec {
1755 engine_id: EngineId::new("engine-a.example"),
1756 addr: tcp_a.local_addr(),
1757 control_plane_addr: None,
1758 server_name: None,
1759 }],
1760 declared_actors: Vec::new(),
1761 roles: Vec::new(),
1762 })
1763 .await
1764 .expect("attach bounded cluster b");
1765 let tcp_b = cluster_b
1766 .transport()
1767 .as_tcp()
1768 .cloned()
1769 .expect("tcp transport b");
1770
1771 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1772 let mut cluster_a = cluster_a;
1773 cluster_a
1774 .add_peer(PeerSpec {
1775 engine_id: EngineId::new("engine-b.example"),
1776 addr: tcp_b.local_addr(),
1777 control_plane_addr: Some(control_plane_b),
1778 server_name: None,
1779 })
1780 .expect("add peer with control plane");
1781
1782 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1783 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1784
1785 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1786 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1787 tokio::time::sleep(Duration::from_millis(150)).await;
1788
1789 let remote = cluster_a
1790 .spawn_remote::<EchoRequest>(
1791 EngineId::new("engine-b.example"),
1792 RemoteSpawnSpec {
1793 path: actor_path.clone(),
1794 type_name: "bounded-echo".to_string(),
1795 config: None,
1796 },
1797 )
1798 .await
1799 .expect("spawn remote actor");
1800
1801 let response = remote
1802 .ask(EchoRequest { value: 21 })
1803 .await
1804 .expect("bounded remote spawn ask");
1805 assert_eq!(response, EchoResponse { doubled: 42 });
1806
1807 let duplicate = cluster_a
1808 .spawn_remote::<EchoRequest>(
1809 EngineId::new("engine-b.example"),
1810 RemoteSpawnSpec {
1811 path: actor_path.clone(),
1812 type_name: "bounded-echo".to_string(),
1813 config: None,
1814 },
1815 )
1816 .await
1817 .expect_err("duplicate remote spawn should fail");
1818 assert_eq!(
1819 duplicate,
1820 RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
1821 );
1822
1823 let old_host_addr = handle_b
1824 .registry
1825 .get_by_path(&actor_path)
1826 .expect("spawned actor registered on host")
1827 .addr;
1828 call_control_plane(
1829 &cluster_a.control_plane,
1830 control_plane_b,
1831 "engine-b.example",
1832 "actor.restart",
1833 json!({ "path": actor_path.as_str() }),
1834 )
1835 .await
1836 .expect("restart remote actor on host");
1837
1838 let restart_deadline = Instant::now() + Duration::from_secs(5);
1839 loop {
1840 if handle_b
1841 .registry
1842 .get_by_path(&actor_path)
1843 .map(|slot| {
1844 slot.running.load(std::sync::atomic::Ordering::Relaxed)
1845 && slot.addr != old_host_addr
1846 && slot
1847 .restart_count
1848 .load(std::sync::atomic::Ordering::Relaxed)
1849 > 0
1850 })
1851 .unwrap_or(false)
1852 {
1853 break;
1854 }
1855 if Instant::now() > restart_deadline {
1856 panic!("remote actor did not restart on host");
1857 }
1858 tokio::time::sleep(Duration::from_millis(25)).await;
1859 }
1860
1861 let restart_deadline = Instant::now() + Duration::from_secs(5);
1862 loop {
1863 if cluster_a.can_route(&actor_path) {
1864 break;
1865 }
1866 if Instant::now() > restart_deadline {
1867 panic!("remote actor route did not return after restart");
1868 }
1869 tokio::time::sleep(Duration::from_millis(25)).await;
1870 }
1871
1872 let restart_deadline = Instant::now() + Duration::from_secs(5);
1873 loop {
1874 match remote.ask(EchoRequest { value: 7 }).await {
1875 Ok(response) => {
1876 assert_eq!(response, EchoResponse { doubled: 14 });
1877 break;
1878 }
1879 Err(_) if Instant::now() <= restart_deadline => {
1880 tokio::time::sleep(Duration::from_millis(25)).await;
1881 }
1882 Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1883 }
1884 }
1885
1886 let unknown = cluster_a
1887 .spawn_remote::<EchoRequest>(
1888 EngineId::new("engine-c.example"),
1889 RemoteSpawnSpec {
1890 path: ActorPath::parse("/user/unknown-target").unwrap(),
1891 type_name: "bounded-echo".to_string(),
1892 config: None,
1893 },
1894 )
1895 .await
1896 .expect_err("unknown target should fail");
1897 assert_eq!(
1898 unknown,
1899 RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1900 );
1901
1902 shutdown_a_tx.send(()).ok();
1903 shutdown_b_tx.send(()).ok();
1904 }
1905
1906 #[tokio::test]
1907 async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1908 if !tcp_tests_available() {
1909 eprintln!("skipping tcp integration tests: network bind not permitted");
1910 return;
1911 }
1912
1913 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1914 let (ca, ca_key) = make_ca();
1915 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1916 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1917 let control_plane_b = reserve_tcp_addr();
1918
1919 let engine_a = Engine::with_config(EngineConfig {
1920 engine_id: EngineId::new("engine-a.example"),
1921 ..Default::default()
1922 });
1923 let engine_b = Engine::with_config(EngineConfig {
1924 engine_id: EngineId::new("engine-b.example"),
1925 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1926 control_plane_tls: Some(tls_b.clone()),
1927 actor_spawn: Some(bounded_spawn_handler()),
1928 ..Default::default()
1929 });
1930
1931 let handle_a = engine_a.handle();
1932 let handle_b = engine_b.handle();
1933 handle_a
1934 .type_registry()
1935 .register_remote_ask::<EchoRequest>();
1936 handle_b
1937 .type_registry()
1938 .register_remote_ask::<EchoRequest>();
1939
1940 let cluster_b = handle_b
1941 .attach_bounded_cluster(BoundedClusterConfig {
1942 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1943 peers: Vec::new(),
1944 declared_actors: Vec::new(),
1945 roles: Vec::new(),
1946 })
1947 .await
1948 .expect("attach bounded cluster b");
1949 let tcp_b = cluster_b
1950 .transport()
1951 .as_tcp()
1952 .cloned()
1953 .expect("tcp transport b");
1954
1955 let cluster_a = handle_a
1956 .attach_bounded_cluster(BoundedClusterConfig {
1957 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1958 peers: vec![PeerSpec {
1959 engine_id: EngineId::new("engine-b.example"),
1960 addr: tcp_b.local_addr(),
1961 control_plane_addr: Some(control_plane_b),
1962 server_name: None,
1963 }],
1964 declared_actors: Vec::new(),
1965 roles: vec![BoundedClusterRole {
1966 name: "context-service".to_string(),
1967 owner: EngineId::new("engine-b.example"),
1968 }],
1969 })
1970 .await
1971 .expect("attach bounded cluster a");
1972 let tcp_a = cluster_a
1973 .transport()
1974 .as_tcp()
1975 .cloned()
1976 .expect("tcp transport a");
1977
1978 let mut cluster_b = cluster_b;
1979 cluster_b
1980 .add_peer(PeerSpec {
1981 engine_id: EngineId::new("engine-a.example"),
1982 addr: tcp_a.local_addr(),
1983 control_plane_addr: None,
1984 server_name: None,
1985 })
1986 .expect("add peer a to cluster b");
1987
1988 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1989 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1990
1991 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1992 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1993 tokio::time::sleep(Duration::from_millis(150)).await;
1994
1995 let remote = cluster_a
1996 .spawn_remote_on_role::<EchoRequest>(
1997 "context-service",
1998 RemoteSpawnSpec {
1999 path: actor_path.clone(),
2000 type_name: "bounded-echo".to_string(),
2001 config: None,
2002 },
2003 )
2004 .await
2005 .expect("spawn remote actor by role over tcp");
2006
2007 let response = remote
2008 .ask(EchoRequest { value: 11 })
2009 .await
2010 .expect("bounded remote spawn ask over tcp by role");
2011 assert_eq!(response, EchoResponse { doubled: 22 });
2012
2013 let unknown_role = cluster_a
2014 .spawn_remote_on_role::<EchoRequest>(
2015 "missing-role",
2016 RemoteSpawnSpec {
2017 path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
2018 type_name: "bounded-echo".to_string(),
2019 config: None,
2020 },
2021 )
2022 .await
2023 .expect_err("unknown role should fail");
2024 assert_eq!(
2025 unknown_role,
2026 RemoteSpawnError::UnknownRole("missing-role".to_string())
2027 );
2028
2029 shutdown_a_tx.send(()).ok();
2030 shutdown_b_tx.send(()).ok();
2031 }
2032
2033 #[tokio::test]
2034 async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
2035 if !tcp_tests_available() {
2036 eprintln!("skipping tcp integration tests: network bind not permitted");
2037 return;
2038 }
2039
2040 let engine_a_id = EngineId::new("engine-a-norev.example");
2041 let engine_b_id = EngineId::new("engine-b-norev.example");
2042 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
2043 let (ca, ca_key) = make_ca();
2044 let tls_a = make_tls_config(engine_a_id.as_str(), &ca, &ca_key);
2045 let tls_b = make_tls_config(engine_b_id.as_str(), &ca, &ca_key);
2046 let control_plane_b = reserve_tcp_addr();
2047
2048 let engine_a = Engine::with_config(EngineConfig {
2049 engine_id: engine_a_id.clone(),
2050 ..Default::default()
2051 });
2052 let engine_b = Engine::with_config(EngineConfig {
2053 engine_id: engine_b_id.clone(),
2054 control_plane_tcp_addr: Some(control_plane_b.to_string()),
2055 control_plane_tls: Some(tls_b.clone()),
2056 actor_spawn: Some(bounded_spawn_handler()),
2057 ..Default::default()
2058 });
2059
2060 let handle_a = engine_a.handle();
2061 let handle_b = engine_b.handle();
2062 handle_a
2063 .type_registry()
2064 .register_remote_ask::<EchoRequest>();
2065 handle_b
2066 .type_registry()
2067 .register_remote_ask::<EchoRequest>();
2068
2069 let cluster_b = handle_b
2070 .attach_bounded_cluster(BoundedClusterConfig {
2071 transport: BoundedTransportConfig::Tcp(tcp_config(engine_b_id.as_str(), tls_b)),
2072 peers: Vec::new(),
2073 declared_actors: Vec::new(),
2074 roles: Vec::new(),
2075 })
2076 .await
2077 .expect("attach bounded cluster b");
2078 let tcp_b = cluster_b
2079 .transport()
2080 .as_tcp()
2081 .cloned()
2082 .expect("tcp transport b");
2083
2084 let cluster_a = handle_a
2085 .attach_bounded_cluster(BoundedClusterConfig {
2086 transport: BoundedTransportConfig::Tcp(tcp_config(engine_a_id.as_str(), tls_a)),
2087 peers: vec![PeerSpec {
2088 engine_id: engine_b_id.clone(),
2089 addr: tcp_b.local_addr(),
2090 control_plane_addr: Some(control_plane_b),
2091 server_name: None,
2092 }],
2093 declared_actors: Vec::new(),
2094 roles: vec![BoundedClusterRole {
2095 name: "context-service".to_string(),
2096 owner: engine_b_id.clone(),
2097 }],
2098 })
2099 .await
2100 .expect("attach bounded cluster a");
2101 let tcp_a = cluster_a
2102 .transport()
2103 .as_tcp()
2104 .cloned()
2105 .expect("tcp transport a");
2106
2107 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2108 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2109
2110 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2111 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2112
2113 let _ = tcp_a;
2117
2118 let remote = cluster_a
2119 .spawn_remote_on_role::<EchoRequest>(
2120 "context-service",
2121 RemoteSpawnSpec {
2122 path: actor_path.clone(),
2123 type_name: "bounded-echo".to_string(),
2124 config: None,
2125 },
2126 )
2127 .await
2128 .expect("spawn remote actor by role over tcp without reverse declaration");
2129
2130 let response = remote
2131 .ask(EchoRequest { value: 13 })
2132 .await
2133 .expect("bounded remote spawn ask over tcp without reverse declaration");
2134 assert_eq!(response, EchoResponse { doubled: 26 });
2135
2136 shutdown_a_tx.send(()).ok();
2137 shutdown_b_tx.send(()).ok();
2138 }
2139
2140 fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
2141 QuicTransportConfig {
2142 engine_id: EngineId::new(engine_id),
2143 listen_addr: "127.0.0.1:0".parse().unwrap(),
2144 send_buffer_size: 1024,
2145 idle_timeout: Duration::from_secs(2),
2146 tls,
2147 }
2148 }
2149
2150 #[tokio::test]
2151 async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
2152 if !quic_tests_available() {
2153 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2154 return;
2155 }
2156
2157 let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
2158 let (ca, ca_key) = make_ca();
2159 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2160 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2161
2162 let mut engine_a = Engine::with_config(EngineConfig {
2163 engine_id: EngineId::new("engine-a.example"),
2164 ..Default::default()
2165 });
2166 let engine_b = Engine::with_config(EngineConfig {
2167 engine_id: EngineId::new("engine-b.example"),
2168 ..Default::default()
2169 });
2170
2171 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
2172 engine_a.add_user_actor(ChildSpec::new(
2173 "cluster-echo-quic",
2174 RestartPolicy::Permanent,
2175 ShutdownPolicy::Timeout(Duration::from_secs(1)),
2176 ns,
2177 move || Box::new(EchoActor),
2178 ));
2179
2180 let handle_a = engine_a.handle();
2181 let handle_b = engine_b.handle();
2182 handle_a
2183 .type_registry()
2184 .register_remote_ask::<EchoRequest>();
2185 handle_b
2186 .type_registry()
2187 .register_remote_ask::<EchoRequest>();
2188
2189 let cluster_a = match handle_a
2190 .attach_bounded_cluster(BoundedClusterConfig {
2191 transport: BoundedTransportConfig::Quic(quic_config(
2192 "engine-a.example",
2193 tls_a.clone(),
2194 )),
2195 peers: Vec::new(),
2196 declared_actors: Vec::new(),
2197 roles: Vec::new(),
2198 })
2199 .await
2200 {
2201 Ok(cluster) => cluster,
2202 Err(ClusterError::Transport(err)) => {
2203 eprintln!("skipping quic bounded-cluster test: {err:?}");
2204 return;
2205 }
2206 Err(err) => panic!("attach bounded cluster a: {err:?}"),
2207 };
2208 let quic_a = cluster_a
2209 .transport()
2210 .as_quic()
2211 .cloned()
2212 .expect("quic transport a");
2213
2214 let cluster_b = match handle_b
2215 .attach_bounded_cluster(BoundedClusterConfig {
2216 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2217 peers: vec![PeerSpec {
2218 engine_id: EngineId::new("engine-a.example"),
2219 addr: quic_a.local_addr(),
2220 control_plane_addr: None,
2221 server_name: None,
2222 }],
2223 declared_actors: Vec::new(),
2224 roles: Vec::new(),
2225 })
2226 .await
2227 {
2228 Ok(cluster) => cluster,
2229 Err(ClusterError::Transport(err)) => {
2230 eprintln!("skipping quic bounded-cluster test: {err:?}");
2231 return;
2232 }
2233 Err(err) => panic!("attach bounded cluster b: {err:?}"),
2234 };
2235 let quic_b = cluster_b
2236 .transport()
2237 .as_quic()
2238 .cloned()
2239 .expect("quic transport b");
2240
2241 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
2242 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2243 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2244
2245 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2246 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2247
2248 let canonical = AddrHash::new(&actor_path, 0);
2249 let deadline = std::time::Instant::now() + Duration::from_secs(5);
2250 while !cluster_b.can_route(&actor_path) {
2251 if std::time::Instant::now() > deadline {
2252 panic!("bounded quic cluster route did not propagate");
2253 }
2254 tokio::time::sleep(Duration::from_millis(20)).await;
2255 }
2256 assert!(quic_b.can_route(canonical));
2257
2258 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
2259 let ask_deadline = Instant::now() + Duration::from_secs(2);
2260 loop {
2261 match remote.ask(EchoRequest { value: 15 }).await {
2262 Ok(response) => {
2263 assert_eq!(response, EchoResponse { doubled: 30 });
2264 break;
2265 }
2266 Err(palladium_actor::AskError::Send(
2267 palladium_actor::SendError::ConnectionFailed,
2268 )) if Instant::now() <= ask_deadline => {
2269 tokio::time::sleep(Duration::from_millis(20)).await;
2270 }
2271 Err(err) => panic!("bounded cluster ask over quic: {err:?}"),
2272 }
2273 }
2274
2275 shutdown_a_tx.send(()).ok();
2276 shutdown_b_tx.send(()).ok();
2277 }
2278
2279 #[tokio::test]
2280 async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
2281 if !quic_tests_available() {
2282 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2283 return;
2284 }
2285
2286 let (ca, ca_key) = make_ca();
2287 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
2288 let engine = Engine::with_config(EngineConfig {
2289 engine_id: EngineId::new("engine-a.example"),
2290 ..Default::default()
2291 });
2292 let handle = engine.handle();
2293
2294 let mut cluster = match handle
2295 .attach_bounded_cluster(BoundedClusterConfig {
2296 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
2297 peers: vec![PeerSpec {
2298 engine_id: EngineId::new("engine-b.example"),
2299 addr: "127.0.0.1:4100".parse().unwrap(),
2300 control_plane_addr: None,
2301 server_name: None,
2302 }],
2303 declared_actors: Vec::new(),
2304 roles: Vec::new(),
2305 })
2306 .await
2307 {
2308 Ok(cluster) => cluster,
2309 Err(ClusterError::Transport(err)) => {
2310 eprintln!("skipping quic bounded-cluster test: {err:?}");
2311 return;
2312 }
2313 Err(err) => panic!("attach bounded cluster: {err:?}"),
2314 };
2315
2316 let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
2317 cluster
2318 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
2319 .expect("declare remote actor");
2320
2321 assert!(cluster.can_route(&path));
2322 assert!(cluster
2323 .transport()
2324 .as_quic()
2325 .expect("quic transport")
2326 .can_route(AddrHash::new(&path, 0)));
2327 }
2328
2329 #[tokio::test]
2330 async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
2331 if !quic_tests_available() {
2332 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2333 return;
2334 }
2335
2336 let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
2337 let (ca, ca_key) = make_ca();
2338 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2339 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2340 let control_plane_b = reserve_udp_addr();
2341
2342 let engine_a = Engine::with_config(EngineConfig {
2343 engine_id: EngineId::new("engine-a.example"),
2344 ..Default::default()
2345 });
2346 let engine_b = Engine::with_config(EngineConfig {
2347 engine_id: EngineId::new("engine-b.example"),
2348 control_plane_quic_addr: Some(control_plane_b.to_string()),
2349 control_plane_tls: Some(tls_b.clone()),
2350 actor_spawn: Some(bounded_spawn_handler()),
2351 ..Default::default()
2352 });
2353
2354 let handle_a = engine_a.handle();
2355 let handle_b = engine_b.handle();
2356 handle_a
2357 .type_registry()
2358 .register_remote_ask::<EchoRequest>();
2359 handle_b
2360 .type_registry()
2361 .register_remote_ask::<EchoRequest>();
2362
2363 let cluster_b = match handle_b
2364 .attach_bounded_cluster(BoundedClusterConfig {
2365 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2366 peers: Vec::new(),
2367 declared_actors: Vec::new(),
2368 roles: Vec::new(),
2369 })
2370 .await
2371 {
2372 Ok(cluster) => cluster,
2373 Err(ClusterError::Transport(err)) => {
2374 eprintln!("skipping quic bounded-cluster test: {err:?}");
2375 return;
2376 }
2377 Err(err) => panic!("attach bounded cluster b: {err:?}"),
2378 };
2379 let quic_b = cluster_b
2380 .transport()
2381 .as_quic()
2382 .cloned()
2383 .expect("quic transport b");
2384
2385 let cluster_a = match handle_a
2386 .attach_bounded_cluster(BoundedClusterConfig {
2387 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
2388 peers: vec![PeerSpec {
2389 engine_id: EngineId::new("engine-b.example"),
2390 addr: quic_b.local_addr(),
2391 control_plane_addr: Some(control_plane_b),
2392 server_name: None,
2393 }],
2394 declared_actors: Vec::new(),
2395 roles: vec![BoundedClusterRole {
2396 name: "context-service".to_string(),
2397 owner: EngineId::new("engine-b.example"),
2398 }],
2399 })
2400 .await
2401 {
2402 Ok(cluster) => cluster,
2403 Err(ClusterError::Transport(err)) => {
2404 eprintln!("skipping quic bounded-cluster test: {err:?}");
2405 return;
2406 }
2407 Err(err) => panic!("attach bounded cluster a: {err:?}"),
2408 };
2409 let quic_a = cluster_a
2410 .transport()
2411 .as_quic()
2412 .cloned()
2413 .expect("quic transport a");
2414
2415 let mut cluster_b = cluster_b;
2416 cluster_b
2417 .add_peer(PeerSpec {
2418 engine_id: EngineId::new("engine-a.example"),
2419 addr: quic_a.local_addr(),
2420 control_plane_addr: None,
2421 server_name: None,
2422 })
2423 .expect("add quic peer a to cluster b");
2424
2425 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2426 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2427
2428 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2429 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2430 tokio::time::sleep(Duration::from_millis(150)).await;
2431
2432 let remote = cluster_a
2433 .spawn_remote_on_role::<EchoRequest>(
2434 "context-service",
2435 RemoteSpawnSpec {
2436 path: actor_path.clone(),
2437 type_name: "bounded-echo".to_string(),
2438 config: None,
2439 },
2440 )
2441 .await
2442 .expect("spawn remote actor over quic by role");
2443
2444 let response = remote
2445 .ask(EchoRequest { value: 9 })
2446 .await
2447 .expect("bounded remote spawn ask over quic by role");
2448 assert_eq!(response, EchoResponse { doubled: 18 });
2449
2450 let unknown_role = cluster_a
2451 .spawn_remote_on_role::<EchoRequest>(
2452 "missing-role",
2453 RemoteSpawnSpec {
2454 path: ActorPath::parse("/user/missing-role").unwrap(),
2455 type_name: "bounded-echo".to_string(),
2456 config: None,
2457 },
2458 )
2459 .await
2460 .expect_err("unknown role should fail");
2461 assert_eq!(
2462 unknown_role,
2463 RemoteSpawnError::UnknownRole("missing-role".to_string())
2464 );
2465
2466 shutdown_a_tx.send(()).ok();
2467 shutdown_b_tx.send(()).ok();
2468 }
2469}