1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::engine::EngineHandle;
6use crate::fs::{FileSystem, TokioFileSystem};
7use crate::reactor::{Reactor, TokioReactor};
8use crate::registry::ActorRegistry;
9use dashmap::DashMap;
10use palladium_actor::{ActorPath, AddrHash, EngineId, RemoteMessage};
11use palladium_transport::network::{Network, TokioNetwork};
12use palladium_transport::{
13 mailbox, QuicTransport, QuicTransportConfig, TcpTransport, TcpTransportConfig, Transport,
14 TransportError,
15};
16use rustls::pki_types::ServerName;
17use serde_json::{json, Value};
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio_rustls::TlsConnector;
20
/// Type-erased hook for making sure a transport knows about a peer engine.
///
/// The `Send + Sync` supertraits allow implementations to be stored as
/// `Arc<dyn SpawnPeerInstaller>` in the process-wide installer map returned
/// by `spawn_peer_installers`.
trait SpawnPeerInstaller: Send + Sync {
    /// Makes the peer `engine_id` reachable at `addr`, optionally carrying the
    /// server name to use for that peer.
    fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>);
}
24
25impl<R: Reactor + Clone, N: Network + Clone> SpawnPeerInstaller for BoundedTransportHandle<R, N> {
26 fn ensure_peer(&self, engine_id: EngineId, addr: SocketAddr, server_name: Option<String>) {
27 let source_addr = AddrHash::synthetic(engine_id.as_str().as_bytes());
28 match server_name {
29 Some(server_name) => self.add_peer(&PeerSpec {
30 engine_id: engine_id.clone(),
31 addr,
32 control_plane_addr: None,
33 server_name: Some(server_name),
34 }),
35 None => self.add_peer(&PeerSpec {
36 engine_id: engine_id.clone(),
37 addr,
38 control_plane_addr: None,
39 server_name: None,
40 }),
41 }
42 match self {
43 Self::Tcp(transport) => {
44 let _ = transport.register_remote_source_addr(engine_id, source_addr);
45 }
46 Self::Quic(transport) => {
47 let _ = transport.register_remote_source_addr(engine_id, source_addr);
48 }
49 }
50 }
51}
52
53fn spawn_peer_installers() -> &'static DashMap<String, Arc<dyn SpawnPeerInstaller>> {
54 static INSTALLERS: std::sync::OnceLock<DashMap<String, Arc<dyn SpawnPeerInstaller>>> =
55 std::sync::OnceLock::new();
56 INSTALLERS.get_or_init(DashMap::new)
57}
58
59pub(crate) fn register_spawn_peer_installer<R: Reactor + Clone, N: Network + Clone>(
60 engine_id: &EngineId,
61 transport: &BoundedTransportHandle<R, N>,
62) {
63 spawn_peer_installers().insert(
64 engine_id.as_str().to_string(),
65 Arc::new(transport.clone()) as Arc<dyn SpawnPeerInstaller>,
66 );
67}
68
69pub(crate) fn maybe_register_spawn_caller_peer(
70 host_engine_id: &EngineId,
71 caller_engine_id: &EngineId,
72 caller_transport_addr: SocketAddr,
73 caller_server_name: Option<String>,
74) {
75 if let Some(installer) = spawn_peer_installers().get(host_engine_id.as_str()) {
76 installer.ensure_peer(
77 caller_engine_id.clone(),
78 caller_transport_addr,
79 caller_server_name,
80 );
81 }
82}
83
/// Static description of a bounded cluster as seen from the local engine.
///
/// `validate_config` checks the peers, declared actors and roles listed here.
#[derive(Clone)]
pub struct BoundedClusterConfig {
    /// Transport selection and settings; also supplies the local engine id.
    pub transport: BoundedTransportConfig,
    /// Remote engines this engine may talk to.
    pub peers: Vec<PeerSpec>,
    /// Actor paths declared as living on a peer.
    pub declared_actors: Vec<DeclaredRemoteActor>,
    /// Named roles, each owned by a peer.
    pub roles: Vec<BoundedClusterRole>,
}
91
/// Transport configuration for a bounded cluster: either TCP or QUIC.
#[derive(Clone)]
pub enum BoundedTransportConfig {
    /// Use the TCP transport with the given settings.
    Tcp(TcpTransportConfig),
    /// Use the QUIC transport with the given settings.
    Quic(QuicTransportConfig),
}
97
98impl BoundedTransportConfig {
99 pub fn engine_id(&self) -> &EngineId {
100 match self {
101 Self::Tcp(config) => &config.engine_id,
102 Self::Quic(config) => &config.engine_id,
103 }
104 }
105}
106
/// Describes one remote engine in the cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerSpec {
    /// Identity of the remote engine.
    pub engine_id: EngineId,
    /// Address handed to the transport when the peer is added.
    pub addr: SocketAddr,
    /// Control-plane endpoint; remote spawning fails with
    /// `RemoteSpawnError::MissingControlPlaneEndpoint` when this is `None`.
    pub control_plane_addr: Option<SocketAddr>,
    /// Server name passed to the transport for this peer. When absent,
    /// remote spawning uses the engine id string as the server name.
    pub server_name: Option<String>,
}
114
/// An actor path declared as being hosted by a remote engine.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeclaredRemoteActor {
    /// Path of the remote actor.
    pub path: ActorPath,
    /// Engine declared as hosting the actor.
    pub owner: EngineId,
}
120
/// A named role mapped to the engine that owns it.
///
/// `spawn_remote_on_role` resolves a role name to its `owner`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BoundedClusterRole {
    /// Role name used for lookup.
    pub name: String,
    /// Engine that owns the role.
    pub owner: EngineId,
}
126
/// Errors raised while validating or mutating the cluster description.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterError {
    /// A peer with this engine id is already present.
    DuplicatePeer(EngineId),
    /// The peer's engine id equals the local engine id.
    LocalPeerDeclared(EngineId),
    /// The path has already been declared as a remote actor.
    DuplicateDeclaredActor(ActorPath),
    /// The declared owner is the local engine, or the path already exists in
    /// the local registry.
    LocalActorDeclaredRemote(ActorPath),
    /// The declared owner of `path` is not among the known peers.
    UnknownPeerOwner { path: ActorPath, owner: EngineId },
    /// A role with this name is already present.
    DuplicateRole(String),
    /// The owner of `role` is not among the known peers.
    UnknownRoleOwner { role: String, owner: EngineId },
    /// A transport operation failed.
    Transport(TransportError),
}
138
/// Parameters for spawning an actor on a remote engine.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteSpawnSpec {
    /// Path the new actor should be spawned at.
    pub path: ActorPath,
    /// Sent to the control plane as the `type_name` request parameter.
    pub type_name: String,
    /// Optional JSON value sent as the `config` request parameter; the
    /// parameter is omitted when this is `None`.
    pub config: Option<Value>,
}
145
/// Errors returned by the remote-spawn operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteSpawnError {
    /// The target engine is not among the known peers.
    UnknownPeer(EngineId),
    /// No role with the requested name exists.
    UnknownRole(String),
    /// The requested path already exists in the local registry.
    LocalPathCollision(ActorPath),
    /// The path is declared for a different engine; `owner` is the engine it
    /// is declared for.
    RemoteOwnershipConflict { path: ActorPath, owner: EngineId },
    /// The control plane reported that an actor is already running at the path.
    RemotePathAlreadyRunning(ActorPath),
    /// The target peer has no control-plane address configured.
    MissingControlPlaneEndpoint(EngineId),
    /// The server name could not be converted into a TLS server name.
    InvalidServerName(String),
    /// A control-plane call failed; the string carries the failure message.
    ControlPlane(String),
    /// The spawned actor did not become visible before the deadline.
    SpawnTimedOut(ActorPath),
}
158
/// Wire protocol used to reach a peer's control plane.
#[derive(Clone)]
pub(crate) enum ControlPlaneProtocol {
    /// Dispatches to `call_control_plane_tcp`.
    Tcp,
    /// Dispatches to `call_control_plane_quic`.
    Quic,
}
164
/// Settings for the control-plane client used by remote spawning.
#[derive(Clone)]
pub(crate) struct ControlPlaneClientConfig {
    /// Which protocol to use for control-plane calls.
    pub(crate) protocol: ControlPlaneProtocol,
    /// TLS material from which the per-call client configuration is built.
    pub(crate) tls: palladium_transport::TlsConfig,
    /// Time budget used by `spawn_remote` for its retry/poll deadline and for
    /// waiting on the peer connection.
    pub(crate) wait_timeout: Duration,
}
171
/// Lower bound that `remote_spawn_wait_timeout` applies to the ask timeout.
const MIN_REMOTE_SPAWN_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
173
174impl From<TransportError> for ClusterError {
175 fn from(value: TransportError) -> Self {
176 Self::Transport(value)
177 }
178}
179
180impl From<TransportError> for RemoteSpawnError {
181 fn from(value: TransportError) -> Self {
182 Self::ControlPlane(format!("transport route install failed: {value:?}"))
183 }
184}
185
/// Shared handle to the running transport, either TCP or QUIC.
///
/// Cloning the handle clones the inner `Arc`.
#[derive(Clone)]
pub enum BoundedTransportHandle<R: Reactor = TokioReactor, N: Network = TokioNetwork> {
    /// Handle to a TCP transport.
    Tcp(Arc<TcpTransport<R, N>>),
    /// Handle to a QUIC transport.
    Quic(Arc<QuicTransport<R, N>>),
}
191
192impl<R: Reactor + Clone, N: Network + Clone> BoundedTransportHandle<R, N> {
193 pub fn as_tcp(&self) -> Option<&Arc<TcpTransport<R, N>>> {
194 match self {
195 Self::Tcp(transport) => Some(transport),
196 Self::Quic(_) => None,
197 }
198 }
199
200 pub fn as_quic(&self) -> Option<&Arc<QuicTransport<R, N>>> {
201 match self {
202 Self::Tcp(_) => None,
203 Self::Quic(transport) => Some(transport),
204 }
205 }
206
207 pub fn add_peer(&self, peer: &PeerSpec) {
208 match self {
209 Self::Tcp(transport) => {
210 if let Some(server_name) = &peer.server_name {
211 transport.add_peer_with_server_name(
212 peer.engine_id.clone(),
213 peer.addr,
214 server_name.clone(),
215 );
216 } else {
217 transport.add_peer(peer.engine_id.clone(), peer.addr);
218 }
219 }
220 Self::Quic(transport) => {
221 if let Some(server_name) = &peer.server_name {
222 transport.add_peer_with_server_name(
223 peer.engine_id.clone(),
224 peer.addr,
225 server_name.clone(),
226 );
227 } else {
228 transport.add_peer(peer.engine_id.clone(), peer.addr);
229 }
230 }
231 }
232 }
233
234 pub fn declare_remote_path(
235 &self,
236 owner: EngineId,
237 path: &ActorPath,
238 ) -> Result<(), TransportError> {
239 match self {
240 Self::Tcp(transport) => transport.declare_remote_path(owner, path),
241 Self::Quic(transport) => transport.declare_remote_path(owner, path),
242 }
243 }
244
245 pub fn advertise_path(&self, addr: AddrHash, path: &ActorPath) -> Result<(), TransportError> {
246 match self {
247 Self::Tcp(transport) => transport.advertise_path(addr, path),
248 Self::Quic(transport) => transport.advertise_path(addr, path),
249 }
250 }
251
252 pub fn register_source_addr(&self, addr: AddrHash) -> Result<(), TransportError> {
253 match self {
254 Self::Tcp(transport) => transport.register(addr, mailbox(1).0),
255 Self::Quic(transport) => transport.register(addr, mailbox(1).0),
256 }
257 }
258
259 pub fn undeclare_remote_path(&self, path: &ActorPath) -> Result<(), TransportError> {
260 match self {
261 Self::Tcp(transport) => transport.undeclare_remote_path(path),
262 Self::Quic(transport) => transport.undeclare_remote_path(path),
263 }
264 }
265
266 pub fn can_route(&self, destination: AddrHash) -> bool {
267 match self {
268 Self::Tcp(transport) => transport.can_route(destination),
269 Self::Quic(transport) => transport.can_route(destination),
270 }
271 }
272
273 pub async fn wait_for_peer_connection(
274 &self,
275 engine_id: &EngineId,
276 timeout: Duration,
277 ) -> Result<(), TransportError> {
278 match self {
279 Self::Tcp(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
280 Self::Quic(transport) => transport.wait_for_peer_connection(engine_id, timeout).await,
281 }
282 }
283}
284
/// Runtime handle for a bounded cluster: it tracks peers, declared remote
/// actors and roles, and drives the transport and control-plane calls.
#[derive(Clone)]
pub struct BoundedClusterHandle<
    R: Reactor = TokioReactor,
    N: Network = TokioNetwork,
    F: FileSystem = TokioFileSystem,
> {
    // Handle to the local engine; supplies the source address and remote addrs.
    engine: EngineHandle<R, N, F>,
    // Identity of the local engine.
    local_engine_id: EngineId,
    // Settings for control-plane calls made during remote spawning.
    control_plane: ControlPlaneClientConfig,
    // The running transport.
    transport: BoundedTransportHandle<R, N>,
    // Peers known to this handle.
    peers: Vec<PeerSpec>,
    // Remote actors declared through this handle.
    declared_actors: Vec<DeclaredRemoteActor>,
    // Roles known to this handle.
    roles: Vec<BoundedClusterRole>,
    // Local actor registry, used for collision checks and route advertisement.
    registry: Arc<ActorRegistry<R>>,
}
300
301impl<R: Reactor + Clone, N: Network + Clone, F: FileSystem + Clone> BoundedClusterHandle<R, N, F> {
302 pub(crate) fn new(
303 engine: EngineHandle<R, N, F>,
304 local_engine_id: EngineId,
305 control_plane: ControlPlaneClientConfig,
306 transport: BoundedTransportHandle<R, N>,
307 peers: Vec<PeerSpec>,
308 roles: Vec<BoundedClusterRole>,
309 registry: Arc<ActorRegistry<R>>,
310 ) -> Self {
311 Self {
312 engine,
313 local_engine_id,
314 control_plane,
315 transport,
316 peers,
317 declared_actors: Vec::new(),
318 roles,
319 registry,
320 }
321 }
322
323 pub fn transport(&self) -> &BoundedTransportHandle<R, N> {
324 &self.transport
325 }
326
327 pub fn peers(&self) -> &[PeerSpec] {
328 &self.peers
329 }
330
331 pub fn declared_actors(&self) -> &[DeclaredRemoteActor] {
332 &self.declared_actors
333 }
334
335 pub fn roles(&self) -> &[BoundedClusterRole] {
336 &self.roles
337 }
338
339 pub(crate) fn advertise_existing_local_routes(&self) -> Result<(), ClusterError> {
340 self.transport
341 .register_source_addr(self.engine.source_addr)
342 .map_err(ClusterError::Transport)?;
343
344 for info in self
345 .registry
346 .snapshot(&crate::introspection::ActorQuery::default(), 0)
347 {
348 if info.state != crate::introspection::ActorState::Running {
349 continue;
350 }
351 if let Some(slot) = self.registry.get_by_path(&info.path) {
352 self.transport
353 .advertise_path(slot.addr, &info.path)
354 .map_err(ClusterError::Transport)?;
355 }
356 }
357 Ok(())
358 }
359
360 pub fn add_peer(&mut self, peer: PeerSpec) -> Result<(), ClusterError> {
361 validate_peer_spec(&peer, &self.local_engine_id, &self.peers)?;
362 self.transport.add_peer(&peer);
363 self.advertise_existing_local_routes()?;
364 self.peers.push(peer);
365 Ok(())
366 }
367
368 pub fn declare_remote_actor(
369 &mut self,
370 owner: EngineId,
371 path: ActorPath,
372 ) -> Result<(), ClusterError> {
373 validate_declared_actor(
374 &path,
375 &owner,
376 &self.local_engine_id,
377 &self.peers,
378 &self.declared_actors,
379 &self.registry,
380 )?;
381 self.transport.declare_remote_path(owner.clone(), &path)?;
382 self.declared_actors
383 .push(DeclaredRemoteActor { path, owner });
384 Ok(())
385 }
386
387 pub fn undeclare_remote_actor(&mut self, path: &ActorPath) -> Result<(), ClusterError> {
388 self.transport.undeclare_remote_path(path)?;
389 self.declared_actors
390 .retain(|declared| &declared.path != path);
391 Ok(())
392 }
393
394 pub fn can_route(&self, path: &ActorPath) -> bool {
395 self.transport.can_route(AddrHash::new(path, 0))
396 }
397
398 pub async fn spawn_remote_on_role<M>(
399 &self,
400 role: &str,
401 spec: RemoteSpawnSpec,
402 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
403 where
404 M: RemoteMessage,
405 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
406 R: Send + Sync,
407 {
408 let owner = self
409 .roles
410 .iter()
411 .find(|declared| declared.name == role)
412 .map(|declared| declared.owner.clone())
413 .ok_or_else(|| RemoteSpawnError::UnknownRole(role.to_string()))?;
414 self.spawn_remote::<M>(owner, spec).await
415 }
416
417 pub async fn spawn_remote<M>(
418 &self,
419 target: EngineId,
420 spec: RemoteSpawnSpec,
421 ) -> Result<palladium_actor::Addr<M>, RemoteSpawnError>
422 where
423 M: RemoteMessage,
424 M::Response: serde::Serialize + for<'de> serde::Deserialize<'de> + Send + 'static,
425 R: Send + Sync,
426 {
427 validate_remote_spawn_request(&spec.path, &target, &self.declared_actors, &self.registry)?;
428
429 let peer = self
430 .peers
431 .iter()
432 .find(|peer| peer.engine_id == target)
433 .ok_or_else(|| RemoteSpawnError::UnknownPeer(target.clone()))?;
434 let control_plane_addr = peer
435 .control_plane_addr
436 .ok_or_else(|| RemoteSpawnError::MissingControlPlaneEndpoint(target.clone()))?;
437 let server_name = peer
438 .server_name
439 .clone()
440 .unwrap_or_else(|| target.as_str().to_string());
441 let caller_server_name = self.local_engine_id.as_str().to_string();
442
443 let deadline = Instant::now() + self.control_plane.wait_timeout;
444 loop {
445 match call_actor_spawn(
446 &self.control_plane,
447 control_plane_addr,
448 &server_name,
449 &self.local_engine_id,
450 transport_listen_addr(&self.transport),
451 &caller_server_name,
452 &spec,
453 )
454 .await
455 {
456 Ok(()) => break,
457 Err(RemoteSpawnError::ControlPlane(message))
458 if is_retryable_control_plane_error(&message) && Instant::now() <= deadline =>
459 {
460 tokio::time::sleep(Duration::from_millis(20)).await;
461 }
462 Err(err) => return Err(err),
463 }
464 }
465
466 while Instant::now() <= deadline {
467 if actor_is_spawned(
468 &self.control_plane,
469 control_plane_addr,
470 &server_name,
471 &spec.path,
472 )
473 .await?
474 {
475 self.transport
476 .declare_remote_path(target.clone(), &spec.path)
477 .map_err(RemoteSpawnError::from)?;
478 self.transport
479 .wait_for_peer_connection(&target, self.control_plane.wait_timeout)
480 .await
481 .map_err(RemoteSpawnError::from)?;
482 return Ok(self.engine.remote_addr_for_path::<M>(&spec.path));
483 }
484 tokio::time::sleep(Duration::from_millis(20)).await;
485 }
486
487 Err(RemoteSpawnError::SpawnTimedOut(spec.path))
488 }
489}
490
491async fn call_actor_spawn(
492 client: &ControlPlaneClientConfig,
493 addr: SocketAddr,
494 server_name: &str,
495 caller_engine_id: &EngineId,
496 caller_transport_addr: SocketAddr,
497 caller_server_name: &str,
498 spec: &RemoteSpawnSpec,
499) -> Result<(), RemoteSpawnError> {
500 let params = match &spec.config {
501 Some(config) => json!({
502 "path": spec.path.as_str(),
503 "type_name": spec.type_name,
504 "config": config,
505 "caller_engine_id": caller_engine_id.as_str(),
506 "caller_transport_addr": caller_transport_addr,
507 "caller_server_name": caller_server_name,
508 }),
509 None => json!({
510 "path": spec.path.as_str(),
511 "type_name": spec.type_name,
512 "caller_engine_id": caller_engine_id.as_str(),
513 "caller_transport_addr": caller_transport_addr,
514 "caller_server_name": caller_server_name,
515 }),
516 };
517 match call_control_plane(client, addr, server_name, "actor.spawn", params).await {
518 Ok(_) => {}
519 Err(RemoteSpawnError::ControlPlane(message))
520 if message.contains("actor already running at") =>
521 {
522 return Err(RemoteSpawnError::RemotePathAlreadyRunning(
523 spec.path.clone(),
524 ));
525 }
526 Err(err) => return Err(err),
527 }
528 Ok(())
529}
530
531fn transport_listen_addr<R: Reactor + Clone, N: Network + Clone>(
532 transport: &BoundedTransportHandle<R, N>,
533) -> SocketAddr {
534 match transport {
535 BoundedTransportHandle::Tcp(transport) => transport.local_addr(),
536 BoundedTransportHandle::Quic(transport) => transport.local_addr(),
537 }
538}
539
/// Reports whether a control-plane error message describes a transient
/// connection problem worth retrying.
///
/// The comparison ignores ASCII case and succeeds when the message contains
/// any of the known connection-failure phrases.
fn is_retryable_control_plane_error(message: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 5] = [
        "connection refused",
        "connection reset",
        "timed out",
        "broken pipe",
        "not connected",
    ];

    let lowered = message.to_ascii_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
548
549fn validate_remote_spawn_request<R: Reactor>(
550 path: &ActorPath,
551 owner: &EngineId,
552 declared_actors: &[DeclaredRemoteActor],
553 registry: &Arc<ActorRegistry<R>>,
554) -> Result<(), RemoteSpawnError> {
555 if registry.get_by_path(path).is_some() {
556 return Err(RemoteSpawnError::LocalPathCollision(path.clone()));
557 }
558
559 if let Some(declared) = declared_actors
560 .iter()
561 .find(|declared| declared.path == *path && declared.owner != *owner)
562 {
563 return Err(RemoteSpawnError::RemoteOwnershipConflict {
564 path: path.clone(),
565 owner: declared.owner.clone(),
566 });
567 }
568
569 Ok(())
570}
571
572async fn actor_is_spawned(
573 client: &ControlPlaneClientConfig,
574 addr: SocketAddr,
575 server_name: &str,
576 path: &ActorPath,
577) -> Result<bool, RemoteSpawnError> {
578 match call_control_plane(
579 client,
580 addr,
581 server_name,
582 "actor.info",
583 json!({ "path": path.as_str() }),
584 )
585 .await
586 {
587 Ok(_) => Ok(true),
588 Err(RemoteSpawnError::ControlPlane(message))
589 if message.contains("actor not found:")
590 || message.contains("actor not found")
591 || message.contains("User supervisor not available") =>
592 {
593 Ok(false)
594 }
595 Err(err) => Err(err),
596 }
597}
598
599async fn call_control_plane(
600 client: &ControlPlaneClientConfig,
601 addr: SocketAddr,
602 server_name: &str,
603 method: &str,
604 params: Value,
605) -> Result<Value, RemoteSpawnError> {
606 let request = json!({
607 "id": 1,
608 "method": method,
609 "params": params,
610 });
611
612 match client.protocol {
613 ControlPlaneProtocol::Tcp => {
614 call_control_plane_tcp(addr, server_name, &client.tls, &request).await
615 }
616 ControlPlaneProtocol::Quic => {
617 call_control_plane_quic(addr, server_name, &client.tls, &request).await
618 }
619 }
620}
621
622async fn call_control_plane_tcp(
623 addr: SocketAddr,
624 server_name: &str,
625 tls: &palladium_transport::TlsConfig,
626 request: &Value,
627) -> Result<Value, RemoteSpawnError> {
628 let config = tls
629 .client_config(server_name)
630 .map_err(|err| RemoteSpawnError::ControlPlane(format!("tcp tls config failed: {err:?}")))?;
631 let name = ServerName::try_from(server_name.to_string())
632 .map_err(|_| RemoteSpawnError::InvalidServerName(server_name.to_string()))?;
633 let stream = tokio::net::TcpStream::connect(addr)
634 .await
635 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
636 let connector = TlsConnector::from(config);
637 let tls_stream = connector
638 .connect(name, stream)
639 .await
640 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
641
642 let (reader, mut writer) = tokio::io::split(tls_stream);
643 let mut body = request.to_string();
644 body.push('\n');
645 writer
646 .write_all(body.as_bytes())
647 .await
648 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
649 let mut line = String::new();
650 let mut reader = BufReader::new(reader);
651 reader
652 .read_line(&mut line)
653 .await
654 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
655 parse_control_plane_response(&line)
656}
657
658async fn call_control_plane_quic(
659 addr: SocketAddr,
660 server_name: &str,
661 tls: &palladium_transport::TlsConfig,
662 request: &Value,
663) -> Result<Value, RemoteSpawnError> {
664 ensure_rustls_provider();
665 let mut client_crypto = (*tls.client_config(server_name).map_err(|err| {
666 RemoteSpawnError::ControlPlane(format!("quic tls config failed: {err:?}"))
667 })?)
668 .clone();
669 client_crypto.alpn_protocols = vec![b"pd-control".to_vec()];
670 let client_tls: s2n_quic::provider::tls::rustls::Client = client_crypto.into();
671
672 let client = s2n_quic::Client::builder()
673 .with_tls(client_tls)
674 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
675 .with_io("0.0.0.0:0")
676 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?
677 .start()
678 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
679
680 let connect = s2n_quic::client::Connect::new(addr).with_server_name(server_name.to_string());
681 let mut conn = client
682 .connect(connect)
683 .await
684 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
685 let mut stream = conn
686 .open_bidirectional_stream()
687 .await
688 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
689
690 let mut body = request.to_string();
691 body.push('\n');
692 stream
693 .write_all(body.as_bytes())
694 .await
695 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
696 stream
697 .close()
698 .await
699 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
700
701 let mut buf = Vec::new();
702 stream
703 .read_to_end(&mut buf)
704 .await
705 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
706 let line = String::from_utf8_lossy(&buf);
707 parse_control_plane_response(&line)
708}
709
710fn parse_control_plane_response(line: &str) -> Result<Value, RemoteSpawnError> {
711 let response: Value = serde_json::from_str(line.trim())
712 .map_err(|err| RemoteSpawnError::ControlPlane(err.to_string()))?;
713 if let Some(error) = response.get("error") {
714 let message = error
715 .get("message")
716 .and_then(Value::as_str)
717 .unwrap_or("unknown control-plane error");
718 return Err(RemoteSpawnError::ControlPlane(message.to_string()));
719 }
720 Ok(response.get("result").cloned().unwrap_or(Value::Null))
721}
722
/// Installs a process-default rustls crypto provider at most once.
///
/// With the `aws-lc-rs` feature the aws-lc-rs provider is installed;
/// otherwise, with the `ring` feature, the ring provider is. With neither
/// feature the function does nothing. The result of `install_default` is
/// ignored.
fn ensure_rustls_provider() {
    static INIT: std::sync::Once = std::sync::Once::new();
    INIT.call_once(|| {
        #[cfg(feature = "aws-lc-rs")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        #[cfg(all(not(feature = "aws-lc-rs"), feature = "ring"))]
        let _ = rustls::crypto::ring::default_provider().install_default();
    });
}
732
733pub(crate) fn remote_spawn_wait_timeout(ask_timeout: Duration) -> Duration {
734 ask_timeout.max(MIN_REMOTE_SPAWN_WAIT_TIMEOUT)
735}
736
737pub(crate) fn validate_config<R: Reactor>(
738 config: &BoundedClusterConfig,
739 registry: &Arc<ActorRegistry<R>>,
740) -> Result<(), ClusterError> {
741 let local_engine_id = config.transport.engine_id();
742 let mut seen_peers = Vec::new();
743 for peer in &config.peers {
744 validate_peer_spec(peer, local_engine_id, &seen_peers)?;
745 seen_peers.push(peer.clone());
746 }
747
748 let mut seen_declared = Vec::new();
749 for declared in &config.declared_actors {
750 validate_declared_actor(
751 &declared.path,
752 &declared.owner,
753 local_engine_id,
754 &config.peers,
755 &seen_declared,
756 registry,
757 )?;
758 seen_declared.push(declared.clone());
759 }
760
761 let mut seen_roles = Vec::new();
762 for role in &config.roles {
763 validate_role(role, &config.peers, &seen_roles)?;
764 seen_roles.push(role.clone());
765 }
766
767 Ok(())
768}
769
770fn validate_peer_spec(
771 peer: &PeerSpec,
772 local_engine_id: &EngineId,
773 existing: &[PeerSpec],
774) -> Result<(), ClusterError> {
775 if &peer.engine_id == local_engine_id {
776 return Err(ClusterError::LocalPeerDeclared(peer.engine_id.clone()));
777 }
778 if existing
779 .iter()
780 .any(|existing| existing.engine_id == peer.engine_id)
781 {
782 return Err(ClusterError::DuplicatePeer(peer.engine_id.clone()));
783 }
784 Ok(())
785}
786
787fn validate_declared_actor<R: Reactor>(
788 path: &ActorPath,
789 owner: &EngineId,
790 local_engine_id: &EngineId,
791 peers: &[PeerSpec],
792 existing: &[DeclaredRemoteActor],
793 registry: &Arc<ActorRegistry<R>>,
794) -> Result<(), ClusterError> {
795 if owner == local_engine_id || registry.get_by_path(path).is_some() {
796 return Err(ClusterError::LocalActorDeclaredRemote(path.clone()));
797 }
798 if existing.iter().any(|existing| existing.path == *path) {
799 return Err(ClusterError::DuplicateDeclaredActor(path.clone()));
800 }
801 if !peers.iter().any(|peer| peer.engine_id == *owner) {
802 return Err(ClusterError::UnknownPeerOwner {
803 path: path.clone(),
804 owner: owner.clone(),
805 });
806 }
807 Ok(())
808}
809
810fn validate_role(
811 role: &BoundedClusterRole,
812 peers: &[PeerSpec],
813 existing: &[BoundedClusterRole],
814) -> Result<(), ClusterError> {
815 if existing.iter().any(|existing| existing.name == role.name) {
816 return Err(ClusterError::DuplicateRole(role.name.clone()));
817 }
818 if !peers.iter().any(|peer| peer.engine_id == role.owner) {
819 return Err(ClusterError::UnknownRoleOwner {
820 role: role.name.clone(),
821 owner: role.owner.clone(),
822 });
823 }
824 Ok(())
825}
826
827#[cfg(test)]
828mod tests {
829 use super::{
830 call_control_plane, validate_config, BoundedClusterConfig, BoundedClusterRole,
831 BoundedTransportConfig, ClusterError, DeclaredRemoteActor, PeerSpec, RemoteSpawnError,
832 RemoteSpawnSpec,
833 };
834 use crate::{Engine, EngineConfig};
835 use palladium_actor::{
836 Actor, ActorContext, ActorError, ActorPath, AddrHash, ChildSpec, EngineId, Envelope,
837 Message, MessagePayload, NamespacePolicy, RestartPolicy, ShutdownPolicy,
838 };
839 use palladium_transport::{QuicTransportConfig, TcpTransportConfig, TlsConfig, Transport};
840 use rcgen::{
841 BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
842 KeyUsagePurpose,
843 };
844 use serde_json::json;
845 use std::net::{SocketAddr, TcpListener, UdpSocket};
846 use std::sync::Arc;
847 use std::time::{Duration, Instant};
848
849 fn make_ca() -> (Certificate, KeyPair) {
850 let mut params = CertificateParams::default();
851 params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
852 params.key_usages = vec![KeyUsagePurpose::KeyCertSign, KeyUsagePurpose::CrlSign];
853 let ca_key = KeyPair::generate().expect("ca keypair");
854 let cert = params.self_signed(&ca_key).expect("ca cert");
855 (cert, ca_key)
856 }
857
858 fn make_tls_config(common_name: &str, ca: &Certificate, ca_key: &KeyPair) -> TlsConfig {
859 let keypair = KeyPair::generate().expect("leaf keypair");
860 let params = CertificateParams::new(vec![common_name.to_string()]).expect("params");
861 let mut params = params;
862 params.key_usages = vec![KeyUsagePurpose::DigitalSignature];
863 params.extended_key_usages = vec![
864 ExtendedKeyUsagePurpose::ServerAuth,
865 ExtendedKeyUsagePurpose::ClientAuth,
866 ];
867 let cert = params.signed_by(&keypair, ca, ca_key).expect("leaf cert");
868 let cert_der = cert.der().to_vec();
869 let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(keypair.serialize_der());
870 let ca_der = ca.der().to_vec();
871 TlsConfig {
872 cert_chain: vec![rustls::pki_types::CertificateDer::from(cert_der)],
873 private_key: rustls::pki_types::PrivateKeyDer::from(key_der),
874 trusted_cas: vec![rustls::pki_types::CertificateDer::from(ca_der)],
875 }
876 }
877
878 fn tcp_config(engine_id: &str, tls: TlsConfig) -> TcpTransportConfig {
879 TcpTransportConfig {
880 engine_id: EngineId::new(engine_id),
881 listen_addr: "127.0.0.1:0".parse().unwrap(),
882 max_connections_per_peer: 1,
883 idle_timeout: Duration::from_secs(1),
884 send_buffer_size: 8,
885 nodelay: true,
886 tls,
887 send_delay: Duration::from_millis(0),
888 }
889 }
890
/// Binds a loopback TCP listener on an OS-assigned port, closes it again and
/// returns the address it was bound to.
fn reserve_tcp_addr() -> SocketAddr {
    let reserved = {
        let probe = TcpListener::bind("127.0.0.1:0").expect("reserve tcp addr");
        probe.local_addr().expect("tcp local addr")
        // `probe` is dropped here, closing the listener.
    };
    reserved
}
897
/// Binds a loopback UDP socket on an OS-assigned port, closes it again and
/// returns the address it was bound to.
fn reserve_udp_addr() -> SocketAddr {
    let reserved = {
        let probe = UdpSocket::bind("127.0.0.1:0").expect("reserve udp addr");
        probe.local_addr().expect("udp local addr")
        // `probe` is dropped here, closing the socket.
    };
    reserved
}
904
/// Two peers sharing one engine id must be rejected as a duplicate.
#[test]
fn bounded_cluster_config_rejects_duplicate_peers() {
    let local_id = "engine-a.example";
    let duplicated_id = EngineId::new("engine-b.example");

    let (ca, ca_key) = make_ca();
    let engine = Engine::with_config(EngineConfig {
        engine_id: EngineId::new(local_id),
        ..Default::default()
    });

    // Same engine id, different addresses.
    let peer_at = |addr: &str| PeerSpec {
        engine_id: duplicated_id.clone(),
        addr: addr.parse().unwrap(),
        control_plane_addr: None,
        server_name: None,
    };
    let config = BoundedClusterConfig {
        transport: BoundedTransportConfig::Tcp(tcp_config(
            local_id,
            make_tls_config(local_id, &ca, &ca_key),
        )),
        peers: vec![peer_at("127.0.0.1:4100"), peer_at("127.0.0.1:4200")],
        declared_actors: Vec::new(),
        roles: Vec::new(),
    };

    let outcome = validate_config(&config, &engine.handle().registry);
    assert_eq!(
        outcome.unwrap_err(),
        ClusterError::DuplicatePeer(duplicated_id)
    );
}
941
/// A declared actor whose owner is not a listed peer must be rejected.
#[test]
fn bounded_cluster_config_rejects_unknown_declared_actor_owner() {
    let local_id = "engine-a.example";

    let (ca, ca_key) = make_ca();
    let engine = Engine::with_config(EngineConfig {
        engine_id: EngineId::new(local_id),
        ..Default::default()
    });

    let orphan_path = ActorPath::parse("/user/missing-owner").unwrap();
    let absent_owner = EngineId::new("engine-c.example");
    let known_peer = PeerSpec {
        engine_id: EngineId::new("engine-b.example"),
        addr: "127.0.0.1:4100".parse().unwrap(),
        control_plane_addr: None,
        server_name: None,
    };
    let config = BoundedClusterConfig {
        transport: BoundedTransportConfig::Tcp(tcp_config(
            local_id,
            make_tls_config(local_id, &ca, &ca_key),
        )),
        peers: vec![known_peer],
        declared_actors: vec![DeclaredRemoteActor {
            path: orphan_path.clone(),
            owner: absent_owner.clone(),
        }],
        roles: Vec::new(),
    };

    let outcome = validate_config(&config, &engine.handle().registry);
    assert_eq!(
        outcome.unwrap_err(),
        ClusterError::UnknownPeerOwner {
            path: orphan_path,
            owner: absent_owner,
        }
    );
}
976
/// Two roles sharing one name must be rejected as a duplicate.
#[test]
fn bounded_cluster_config_rejects_duplicate_roles() {
    let local_id = "engine-a.example";
    let role_name = "context-service";

    let (ca, ca_key) = make_ca();
    let engine = Engine::with_config(EngineConfig {
        engine_id: EngineId::new(local_id),
        ..Default::default()
    });

    let role_owner = EngineId::new("engine-b.example");
    let role_entry = || BoundedClusterRole {
        name: role_name.to_string(),
        owner: role_owner.clone(),
    };
    let config = BoundedClusterConfig {
        transport: BoundedTransportConfig::Tcp(tcp_config(
            local_id,
            make_tls_config(local_id, &ca, &ca_key),
        )),
        peers: vec![PeerSpec {
            engine_id: role_owner.clone(),
            addr: "127.0.0.1:4100".parse().unwrap(),
            control_plane_addr: None,
            server_name: None,
        }],
        declared_actors: Vec::new(),
        roles: vec![role_entry(), role_entry()],
    };

    let outcome = validate_config(&config, &engine.handle().registry);
    assert_eq!(
        outcome.unwrap_err(),
        ClusterError::DuplicateRole(role_name.to_string())
    );
}
1014
/// A role whose owner is not a listed peer must be rejected.
#[test]
fn bounded_cluster_config_rejects_unknown_role_owner() {
    let local_id = "engine-a.example";

    let (ca, ca_key) = make_ca();
    let engine = Engine::with_config(EngineConfig {
        engine_id: EngineId::new(local_id),
        ..Default::default()
    });

    let absent_owner = EngineId::new("engine-c.example");
    let known_peer = PeerSpec {
        engine_id: EngineId::new("engine-b.example"),
        addr: "127.0.0.1:4100".parse().unwrap(),
        control_plane_addr: None,
        server_name: None,
    };
    let config = BoundedClusterConfig {
        transport: BoundedTransportConfig::Tcp(tcp_config(
            local_id,
            make_tls_config(local_id, &ca, &ca_key),
        )),
        peers: vec![known_peer],
        declared_actors: Vec::new(),
        roles: vec![BoundedClusterRole {
            name: "gateway".to_string(),
            owner: absent_owner.clone(),
        }],
    };

    let outcome = validate_config(&config, &engine.handle().registry);
    assert_eq!(
        outcome.unwrap_err(),
        ClusterError::UnknownRoleOwner {
            role: "gateway".to_string(),
            owner: absent_owner,
        }
    );
}
1049
/// Short ask timeouts are raised to five seconds; longer ones pass through.
#[test]
fn remote_spawn_wait_timeout_is_not_coupled_to_small_ask_timeout() {
    // (ask timeout, expected wait timeout)
    let cases = [
        (Duration::from_millis(25), Duration::from_secs(5)),
        (Duration::from_secs(8), Duration::from_secs(8)),
    ];

    for (ask_timeout, expected) in cases {
        assert_eq!(super::remote_spawn_wait_timeout(ask_timeout), expected);
    }
}
1061
/// Reports whether TCP-based tests may run: the `PD_ENABLE_TCP_TESTS`
/// environment variable must be readable and a loopback listener must bind.
fn tcp_tests_available() -> bool {
    let opted_in = std::env::var("PD_ENABLE_TCP_TESTS").is_ok();
    opted_in && std::net::TcpListener::bind("127.0.0.1:0").is_ok()
}
1068
1069 fn quic_tests_available() -> bool {
1070 super::ensure_rustls_provider();
1071 rustls::crypto::CryptoProvider::get_default().is_some()
1072 }
1073
1074 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1075 struct EchoRequest {
1076 value: u64,
1077 }
1078
1079 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1080 struct EchoResponse {
1081 doubled: u64,
1082 }
1083
1084 impl Message for EchoRequest {
1085 type Response = EchoResponse;
1086 const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedEcho");
1087 }
1088
1089 struct EchoActor;
1090
1091 impl<R: crate::reactor::Reactor> Actor<R> for EchoActor {
1092 fn on_message(
1093 &mut self,
1094 ctx: &mut ActorContext<R>,
1095 envelope: &Envelope,
1096 payload: MessagePayload,
1097 ) -> Result<(), ActorError> {
1098 let req = payload
1099 .extract::<EchoRequest>()
1100 .map_err(|_| ActorError::Handler)?;
1101 let resp = EchoResponse {
1102 doubled: req.value * 2,
1103 };
1104 let resp_env = envelope.response(0);
1105 ctx.send_raw(resp_env, MessagePayload::local(resp))
1106 .map_err(|_| ActorError::Handler)
1107 }
1108 }
1109
1110 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1111 struct CrashRequest;
1112
1113 impl Message for CrashRequest {
1114 type Response = ();
1115 const TYPE_TAG: u64 = palladium_actor::fnv1a_64("palladium_runtime.test.BoundedCrash");
1116 }
1117
1118 struct SpawnedEchoActor;
1119
1120 impl<R: crate::reactor::Reactor> Actor<R> for SpawnedEchoActor {
1121 fn on_message(
1122 &mut self,
1123 ctx: &mut ActorContext<R>,
1124 envelope: &Envelope,
1125 payload: MessagePayload,
1126 ) -> Result<(), ActorError> {
1127 match envelope.type_tag {
1128 EchoRequest::TYPE_TAG => {
1129 let req = payload
1130 .extract::<EchoRequest>()
1131 .map_err(|_| ActorError::Handler)?;
1132 let resp = EchoResponse {
1133 doubled: req.value * 2,
1134 };
1135 let resp_env = envelope.response(0);
1136 ctx.send_raw(resp_env, MessagePayload::local(resp))
1137 .map_err(|_| ActorError::Handler)
1138 }
1139 CrashRequest::TYPE_TAG => Err(ActorError::Handler),
1140 _ => Err(ActorError::Handler),
1141 }
1142 }
1143 }
1144
1145 fn bounded_spawn_handler() -> Arc<crate::engine::ActorSpawnFn<crate::TokioReactor>> {
1146 Arc::new(|type_name: &str, _config: &[u8]| match type_name {
1147 "bounded-echo" => Ok(Box::new(SpawnedEchoActor)),
1148 _ => Err(format!("unknown remote spawn type: {type_name}")),
1149 })
1150 }
1151
1152 #[tokio::test]
1153 async fn bounded_cluster_attach_reaches_remote_actor_over_tcp() {
1154 if !tcp_tests_available() {
1155 eprintln!("skipping tcp integration tests: network bind not permitted");
1156 return;
1157 }
1158
1159 let actor_path = ActorPath::parse("/user/cluster-echo").unwrap();
1160 let (ca, ca_key) = make_ca();
1161 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1162 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1163
1164 let mut engine_a = Engine::with_config(EngineConfig {
1165 engine_id: EngineId::new("engine-a.example"),
1166 ..Default::default()
1167 });
1168 let engine_b = Engine::with_config(EngineConfig {
1169 engine_id: EngineId::new("engine-b.example"),
1170 ..Default::default()
1171 });
1172
1173 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1174 engine_a.add_user_actor(ChildSpec::new(
1175 "cluster-echo",
1176 RestartPolicy::Permanent,
1177 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1178 ns,
1179 move || Box::new(EchoActor),
1180 ));
1181
1182 let handle_a = engine_a.handle();
1183 let handle_b = engine_b.handle();
1184 handle_a
1185 .type_registry()
1186 .register_remote_ask::<EchoRequest>();
1187 handle_b
1188 .type_registry()
1189 .register_remote_ask::<EchoRequest>();
1190
1191 let cluster_a = handle_a
1192 .attach_bounded_cluster(BoundedClusterConfig {
1193 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1194 peers: Vec::new(),
1195 declared_actors: Vec::new(),
1196 roles: Vec::new(),
1197 })
1198 .await
1199 .expect("attach bounded cluster a");
1200 let tcp_a = cluster_a
1201 .transport()
1202 .as_tcp()
1203 .cloned()
1204 .expect("tcp transport a");
1205
1206 let cluster_b = handle_b
1207 .attach_bounded_cluster(BoundedClusterConfig {
1208 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1209 peers: vec![PeerSpec {
1210 engine_id: EngineId::new("engine-a.example"),
1211 addr: tcp_a.local_addr(),
1212 control_plane_addr: None,
1213 server_name: None,
1214 }],
1215 declared_actors: Vec::new(),
1216 roles: Vec::new(),
1217 })
1218 .await
1219 .expect("attach bounded cluster b");
1220 let tcp_b = cluster_b
1221 .transport()
1222 .as_tcp()
1223 .cloned()
1224 .expect("tcp transport b");
1225
1226 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1227 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1228 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1229
1230 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1231 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1232
1233 let canonical = AddrHash::new(&actor_path, 0);
1234 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1235 while !cluster_b.can_route(&actor_path) {
1236 if std::time::Instant::now() > deadline {
1237 panic!("bounded cluster route did not propagate");
1238 }
1239 tokio::time::sleep(Duration::from_millis(20)).await;
1240 }
1241 assert!(tcp_b.can_route(canonical));
1242
1243 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1244 let response = remote
1245 .ask(EchoRequest { value: 12 })
1246 .await
1247 .expect("bounded cluster ask");
1248 assert_eq!(response, EchoResponse { doubled: 24 });
1249
1250 shutdown_a_tx.send(()).ok();
1251 shutdown_b_tx.send(()).ok();
1252 }
1253
1254 #[tokio::test]
1255 async fn bounded_cluster_add_peer_after_start_backfills_existing_tcp_routes() {
1256 if !tcp_tests_available() {
1257 eprintln!("skipping tcp integration tests: network bind not permitted");
1258 return;
1259 }
1260
1261 let actor_path = ActorPath::parse("/user/cluster-echo-add-peer").unwrap();
1262 let (ca, ca_key) = make_ca();
1263 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1264 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1265
1266 let mut engine_a = Engine::with_config(EngineConfig {
1267 engine_id: EngineId::new("engine-a.example"),
1268 ..Default::default()
1269 });
1270 let engine_b = Engine::with_config(EngineConfig {
1271 engine_id: EngineId::new("engine-b.example"),
1272 ..Default::default()
1273 });
1274
1275 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1276 engine_a.add_user_actor(ChildSpec::new(
1277 "cluster-echo-add-peer",
1278 RestartPolicy::Permanent,
1279 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1280 ns,
1281 move || Box::new(EchoActor),
1282 ));
1283
1284 let handle_a = engine_a.handle();
1285 let handle_b = engine_b.handle();
1286 handle_a
1287 .type_registry()
1288 .register_remote_ask::<EchoRequest>();
1289 handle_b
1290 .type_registry()
1291 .register_remote_ask::<EchoRequest>();
1292
1293 let mut cluster_a = handle_a
1294 .attach_bounded_cluster(BoundedClusterConfig {
1295 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1296 peers: Vec::new(),
1297 declared_actors: Vec::new(),
1298 roles: Vec::new(),
1299 })
1300 .await
1301 .expect("attach bounded cluster a");
1302 let tcp_a = cluster_a
1303 .transport()
1304 .as_tcp()
1305 .cloned()
1306 .expect("tcp transport a");
1307
1308 let cluster_b = handle_b
1309 .attach_bounded_cluster(BoundedClusterConfig {
1310 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1311 peers: vec![PeerSpec {
1312 engine_id: EngineId::new("engine-a.example"),
1313 addr: tcp_a.local_addr(),
1314 control_plane_addr: None,
1315 server_name: None,
1316 }],
1317 declared_actors: Vec::new(),
1318 roles: Vec::new(),
1319 })
1320 .await
1321 .expect("attach bounded cluster b");
1322 let tcp_b = cluster_b
1323 .transport()
1324 .as_tcp()
1325 .cloned()
1326 .expect("tcp transport b");
1327
1328 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1329 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1330
1331 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1332 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1333
1334 let local_ready_deadline = std::time::Instant::now() + Duration::from_secs(5);
1335 while handle_a.registry.get_by_path(&actor_path).is_none() {
1336 if std::time::Instant::now() > local_ready_deadline {
1337 panic!("local actor did not start before peer add");
1338 }
1339 tokio::time::sleep(Duration::from_millis(20)).await;
1340 }
1341
1342 cluster_a
1343 .add_peer(PeerSpec {
1344 engine_id: EngineId::new("engine-b.example"),
1345 addr: tcp_b.local_addr(),
1346 control_plane_addr: None,
1347 server_name: None,
1348 })
1349 .expect("add peer b to cluster a after start");
1350
1351 let canonical = AddrHash::new(&actor_path, 0);
1352 let deadline = std::time::Instant::now() + Duration::from_secs(5);
1353 while !cluster_b.can_route(&actor_path) {
1354 if std::time::Instant::now() > deadline {
1355 panic!("bounded cluster route did not propagate after add_peer");
1356 }
1357 tokio::time::sleep(Duration::from_millis(20)).await;
1358 }
1359 assert!(tcp_b.can_route(canonical));
1360
1361 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
1362 let response = remote
1363 .ask(EchoRequest { value: 15 })
1364 .await
1365 .expect("bounded cluster ask after add_peer");
1366 assert_eq!(response, EchoResponse { doubled: 30 });
1367
1368 shutdown_a_tx.send(()).ok();
1369 shutdown_b_tx.send(()).ok();
1370 }
1371
1372 #[tokio::test]
1373 async fn bounded_cluster_declared_remote_actor_installs_canonical_route() {
1374 if !tcp_tests_available() {
1375 eprintln!("skipping tcp integration tests: network bind not permitted");
1376 return;
1377 }
1378
1379 let (ca, ca_key) = make_ca();
1380 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1381 let engine = Engine::with_config(EngineConfig {
1382 engine_id: EngineId::new("engine-a.example"),
1383 ..Default::default()
1384 });
1385 let handle = engine.handle();
1386
1387 let mut cluster = handle
1388 .attach_bounded_cluster(BoundedClusterConfig {
1389 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1390 peers: vec![PeerSpec {
1391 engine_id: EngineId::new("engine-b.example"),
1392 addr: "127.0.0.1:4100".parse().unwrap(),
1393 control_plane_addr: None,
1394 server_name: None,
1395 }],
1396 declared_actors: Vec::new(),
1397 roles: Vec::new(),
1398 })
1399 .await
1400 .expect("attach bounded cluster");
1401
1402 let path = ActorPath::parse("/user/declared-remote").unwrap();
1403 cluster
1404 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
1405 .expect("declare remote actor");
1406
1407 assert!(cluster.can_route(&path));
1408 assert!(cluster
1409 .transport()
1410 .as_tcp()
1411 .expect("tcp transport")
1412 .can_route(AddrHash::new(&path, 0)));
1413 }
1414
1415 #[tokio::test]
1416 async fn bounded_cluster_spawn_remote_by_role_rejects_local_path_collision() {
1417 if !tcp_tests_available() {
1418 eprintln!("skipping tcp integration tests: network bind not permitted");
1419 return;
1420 }
1421
1422 let actor_path = ActorPath::parse("/user/local-remote-collision").unwrap();
1423 let (ca, ca_key) = make_ca();
1424 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1425 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1426 let control_plane_b = reserve_tcp_addr();
1427
1428 let mut engine_a = Engine::with_config(EngineConfig {
1429 engine_id: EngineId::new("engine-a.example"),
1430 ..Default::default()
1431 });
1432 let engine_b = Engine::with_config(EngineConfig {
1433 engine_id: EngineId::new("engine-b.example"),
1434 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1435 control_plane_tls: Some(tls_b.clone()),
1436 actor_spawn: Some(bounded_spawn_handler()),
1437 ..Default::default()
1438 });
1439
1440 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
1441 engine_a.add_user_actor(ChildSpec::new(
1442 "local-remote-collision",
1443 RestartPolicy::Permanent,
1444 ShutdownPolicy::Timeout(Duration::from_secs(1)),
1445 ns,
1446 move || Box::new(EchoActor),
1447 ));
1448
1449 let handle_a = engine_a.handle();
1450 let handle_b = engine_b.handle();
1451
1452 let cluster_b = handle_b
1453 .attach_bounded_cluster(BoundedClusterConfig {
1454 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1455 peers: Vec::new(),
1456 declared_actors: Vec::new(),
1457 roles: Vec::new(),
1458 })
1459 .await
1460 .expect("attach bounded cluster b");
1461 let tcp_b = cluster_b
1462 .transport()
1463 .as_tcp()
1464 .cloned()
1465 .expect("tcp transport b");
1466
1467 let cluster_a = handle_a
1468 .attach_bounded_cluster(BoundedClusterConfig {
1469 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1470 peers: vec![PeerSpec {
1471 engine_id: EngineId::new("engine-b.example"),
1472 addr: tcp_b.local_addr(),
1473 control_plane_addr: Some(control_plane_b),
1474 server_name: None,
1475 }],
1476 declared_actors: Vec::new(),
1477 roles: vec![BoundedClusterRole {
1478 name: "context-service".to_string(),
1479 owner: EngineId::new("engine-b.example"),
1480 }],
1481 })
1482 .await
1483 .expect("attach bounded cluster a");
1484
1485 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1486 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1487
1488 let deadline = Instant::now() + Duration::from_secs(5);
1489 while handle_a.registry.get_by_path(&actor_path).is_none() {
1490 if Instant::now() > deadline {
1491 panic!("local collision actor did not start");
1492 }
1493 tokio::time::sleep(Duration::from_millis(20)).await;
1494 }
1495
1496 let err = cluster_a
1497 .spawn_remote_on_role::<EchoRequest>(
1498 "context-service",
1499 RemoteSpawnSpec {
1500 path: actor_path.clone(),
1501 type_name: "bounded-echo".to_string(),
1502 config: None,
1503 },
1504 )
1505 .await
1506 .expect_err("local path collision should fail");
1507 assert_eq!(err, RemoteSpawnError::LocalPathCollision(actor_path));
1508
1509 shutdown_a_tx.send(()).ok();
1510 }
1511
1512 #[tokio::test]
1513 async fn bounded_cluster_spawn_remote_rejects_conflicting_declared_owner() {
1514 if !tcp_tests_available() {
1515 eprintln!("skipping tcp integration tests: network bind not permitted");
1516 return;
1517 }
1518
1519 let actor_path = ActorPath::parse("/user/conflicting-remote-owner").unwrap();
1520 let (ca, ca_key) = make_ca();
1521 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
1522 let engine = Engine::with_config(EngineConfig {
1523 engine_id: EngineId::new("engine-a.example"),
1524 ..Default::default()
1525 });
1526 let handle = engine.handle();
1527
1528 let cluster = handle
1529 .attach_bounded_cluster(BoundedClusterConfig {
1530 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls)),
1531 peers: vec![
1532 PeerSpec {
1533 engine_id: EngineId::new("engine-b.example"),
1534 addr: "127.0.0.1:4100".parse().unwrap(),
1535 control_plane_addr: Some("127.0.0.1:5100".parse().unwrap()),
1536 server_name: None,
1537 },
1538 PeerSpec {
1539 engine_id: EngineId::new("engine-c.example"),
1540 addr: "127.0.0.1:4200".parse().unwrap(),
1541 control_plane_addr: Some("127.0.0.1:5200".parse().unwrap()),
1542 server_name: None,
1543 },
1544 ],
1545 declared_actors: vec![DeclaredRemoteActor {
1546 path: actor_path.clone(),
1547 owner: EngineId::new("engine-b.example"),
1548 }],
1549 roles: Vec::new(),
1550 })
1551 .await
1552 .expect("attach bounded cluster");
1553
1554 let err = cluster
1555 .spawn_remote::<EchoRequest>(
1556 EngineId::new("engine-c.example"),
1557 RemoteSpawnSpec {
1558 path: actor_path.clone(),
1559 type_name: "bounded-echo".to_string(),
1560 config: None,
1561 },
1562 )
1563 .await
1564 .expect_err("conflicting declared owner should fail");
1565 assert_eq!(
1566 err,
1567 RemoteSpawnError::RemoteOwnershipConflict {
1568 path: actor_path,
1569 owner: EngineId::new("engine-b.example"),
1570 }
1571 );
1572 }
1573
1574 #[tokio::test]
1575 async fn bounded_cluster_spawn_remote_over_tcp_returns_typed_handle() {
1576 if !tcp_tests_available() {
1577 eprintln!("skipping tcp integration tests: network bind not permitted");
1578 return;
1579 }
1580
1581 let actor_path = ActorPath::parse("/user/remote-spawn-tcp").unwrap();
1582 let (ca, ca_key) = make_ca();
1583 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1584 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1585 let control_plane_b = reserve_tcp_addr();
1586
1587 let engine_a = Engine::with_config(EngineConfig {
1588 engine_id: EngineId::new("engine-a.example"),
1589 ..Default::default()
1590 });
1591 let engine_b = Engine::with_config(EngineConfig {
1592 engine_id: EngineId::new("engine-b.example"),
1593 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1594 control_plane_tls: Some(tls_b.clone()),
1595 actor_spawn: Some(bounded_spawn_handler()),
1596 ..Default::default()
1597 });
1598
1599 let handle_a = engine_a.handle();
1600 let handle_b = engine_b.handle();
1601 handle_a
1602 .type_registry()
1603 .register_remote_ask::<EchoRequest>();
1604 handle_b
1605 .type_registry()
1606 .register_remote_ask::<EchoRequest>();
1607
1608 let cluster_a = handle_a
1609 .attach_bounded_cluster(BoundedClusterConfig {
1610 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1611 peers: Vec::new(),
1612 declared_actors: Vec::new(),
1613 roles: Vec::new(),
1614 })
1615 .await
1616 .expect("attach bounded cluster a");
1617 let tcp_a = cluster_a
1618 .transport()
1619 .as_tcp()
1620 .cloned()
1621 .expect("tcp transport a");
1622
1623 let cluster_b = handle_b
1624 .attach_bounded_cluster(BoundedClusterConfig {
1625 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1626 peers: vec![PeerSpec {
1627 engine_id: EngineId::new("engine-a.example"),
1628 addr: tcp_a.local_addr(),
1629 control_plane_addr: None,
1630 server_name: None,
1631 }],
1632 declared_actors: Vec::new(),
1633 roles: Vec::new(),
1634 })
1635 .await
1636 .expect("attach bounded cluster b");
1637 let tcp_b = cluster_b
1638 .transport()
1639 .as_tcp()
1640 .cloned()
1641 .expect("tcp transport b");
1642
1643 tcp_a.add_peer(EngineId::new("engine-b.example"), tcp_b.local_addr());
1644 let mut cluster_a = cluster_a;
1645 cluster_a
1646 .add_peer(PeerSpec {
1647 engine_id: EngineId::new("engine-b.example"),
1648 addr: tcp_b.local_addr(),
1649 control_plane_addr: Some(control_plane_b),
1650 server_name: None,
1651 })
1652 .expect("add peer with control plane");
1653
1654 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1655 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1656
1657 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1658 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1659 tokio::time::sleep(Duration::from_millis(150)).await;
1660
1661 let remote = cluster_a
1662 .spawn_remote::<EchoRequest>(
1663 EngineId::new("engine-b.example"),
1664 RemoteSpawnSpec {
1665 path: actor_path.clone(),
1666 type_name: "bounded-echo".to_string(),
1667 config: None,
1668 },
1669 )
1670 .await
1671 .expect("spawn remote actor");
1672
1673 let response = remote
1674 .ask(EchoRequest { value: 21 })
1675 .await
1676 .expect("bounded remote spawn ask");
1677 assert_eq!(response, EchoResponse { doubled: 42 });
1678
1679 let duplicate = cluster_a
1680 .spawn_remote::<EchoRequest>(
1681 EngineId::new("engine-b.example"),
1682 RemoteSpawnSpec {
1683 path: actor_path.clone(),
1684 type_name: "bounded-echo".to_string(),
1685 config: None,
1686 },
1687 )
1688 .await
1689 .expect_err("duplicate remote spawn should fail");
1690 assert_eq!(
1691 duplicate,
1692 RemoteSpawnError::RemotePathAlreadyRunning(actor_path.clone())
1693 );
1694
1695 let old_host_addr = handle_b
1696 .registry
1697 .get_by_path(&actor_path)
1698 .expect("spawned actor registered on host")
1699 .addr;
1700 call_control_plane(
1701 &cluster_a.control_plane,
1702 control_plane_b,
1703 "engine-b.example",
1704 "actor.restart",
1705 json!({ "path": actor_path.as_str() }),
1706 )
1707 .await
1708 .expect("restart remote actor on host");
1709
1710 let restart_deadline = Instant::now() + Duration::from_secs(5);
1711 loop {
1712 if handle_b
1713 .registry
1714 .get_by_path(&actor_path)
1715 .map(|slot| {
1716 slot.running.load(std::sync::atomic::Ordering::Relaxed)
1717 && slot.addr != old_host_addr
1718 && slot
1719 .restart_count
1720 .load(std::sync::atomic::Ordering::Relaxed)
1721 > 0
1722 })
1723 .unwrap_or(false)
1724 {
1725 break;
1726 }
1727 if Instant::now() > restart_deadline {
1728 panic!("remote actor did not restart on host");
1729 }
1730 tokio::time::sleep(Duration::from_millis(25)).await;
1731 }
1732
1733 let restart_deadline = Instant::now() + Duration::from_secs(5);
1734 loop {
1735 if cluster_a.can_route(&actor_path) {
1736 break;
1737 }
1738 if Instant::now() > restart_deadline {
1739 panic!("remote actor route did not return after restart");
1740 }
1741 tokio::time::sleep(Duration::from_millis(25)).await;
1742 }
1743
1744 let restart_deadline = Instant::now() + Duration::from_secs(5);
1745 loop {
1746 match remote.ask(EchoRequest { value: 7 }).await {
1747 Ok(response) => {
1748 assert_eq!(response, EchoResponse { doubled: 14 });
1749 break;
1750 }
1751 Err(_) if Instant::now() <= restart_deadline => {
1752 tokio::time::sleep(Duration::from_millis(25)).await;
1753 }
1754 Err(err) => panic!("remote actor did not recover after restart: {err:?}"),
1755 }
1756 }
1757
1758 let unknown = cluster_a
1759 .spawn_remote::<EchoRequest>(
1760 EngineId::new("engine-c.example"),
1761 RemoteSpawnSpec {
1762 path: ActorPath::parse("/user/unknown-target").unwrap(),
1763 type_name: "bounded-echo".to_string(),
1764 config: None,
1765 },
1766 )
1767 .await
1768 .expect_err("unknown target should fail");
1769 assert_eq!(
1770 unknown,
1771 RemoteSpawnError::UnknownPeer(EngineId::new("engine-c.example"))
1772 );
1773
1774 shutdown_a_tx.send(()).ok();
1775 shutdown_b_tx.send(()).ok();
1776 }
1777
1778 #[tokio::test]
1779 async fn bounded_cluster_spawn_remote_over_tcp_by_role_returns_typed_handle() {
1780 if !tcp_tests_available() {
1781 eprintln!("skipping tcp integration tests: network bind not permitted");
1782 return;
1783 }
1784
1785 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp").unwrap();
1786 let (ca, ca_key) = make_ca();
1787 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
1788 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
1789 let control_plane_b = reserve_tcp_addr();
1790
1791 let engine_a = Engine::with_config(EngineConfig {
1792 engine_id: EngineId::new("engine-a.example"),
1793 ..Default::default()
1794 });
1795 let engine_b = Engine::with_config(EngineConfig {
1796 engine_id: EngineId::new("engine-b.example"),
1797 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1798 control_plane_tls: Some(tls_b.clone()),
1799 actor_spawn: Some(bounded_spawn_handler()),
1800 ..Default::default()
1801 });
1802
1803 let handle_a = engine_a.handle();
1804 let handle_b = engine_b.handle();
1805 handle_a
1806 .type_registry()
1807 .register_remote_ask::<EchoRequest>();
1808 handle_b
1809 .type_registry()
1810 .register_remote_ask::<EchoRequest>();
1811
1812 let cluster_b = handle_b
1813 .attach_bounded_cluster(BoundedClusterConfig {
1814 transport: BoundedTransportConfig::Tcp(tcp_config("engine-b.example", tls_b)),
1815 peers: Vec::new(),
1816 declared_actors: Vec::new(),
1817 roles: Vec::new(),
1818 })
1819 .await
1820 .expect("attach bounded cluster b");
1821 let tcp_b = cluster_b
1822 .transport()
1823 .as_tcp()
1824 .cloned()
1825 .expect("tcp transport b");
1826
1827 let cluster_a = handle_a
1828 .attach_bounded_cluster(BoundedClusterConfig {
1829 transport: BoundedTransportConfig::Tcp(tcp_config("engine-a.example", tls_a)),
1830 peers: vec![PeerSpec {
1831 engine_id: EngineId::new("engine-b.example"),
1832 addr: tcp_b.local_addr(),
1833 control_plane_addr: Some(control_plane_b),
1834 server_name: None,
1835 }],
1836 declared_actors: Vec::new(),
1837 roles: vec![BoundedClusterRole {
1838 name: "context-service".to_string(),
1839 owner: EngineId::new("engine-b.example"),
1840 }],
1841 })
1842 .await
1843 .expect("attach bounded cluster a");
1844 let tcp_a = cluster_a
1845 .transport()
1846 .as_tcp()
1847 .cloned()
1848 .expect("tcp transport a");
1849
1850 let mut cluster_b = cluster_b;
1851 cluster_b
1852 .add_peer(PeerSpec {
1853 engine_id: EngineId::new("engine-a.example"),
1854 addr: tcp_a.local_addr(),
1855 control_plane_addr: None,
1856 server_name: None,
1857 })
1858 .expect("add peer a to cluster b");
1859
1860 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1861 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1862
1863 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1864 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1865 tokio::time::sleep(Duration::from_millis(150)).await;
1866
1867 let remote = cluster_a
1868 .spawn_remote_on_role::<EchoRequest>(
1869 "context-service",
1870 RemoteSpawnSpec {
1871 path: actor_path.clone(),
1872 type_name: "bounded-echo".to_string(),
1873 config: None,
1874 },
1875 )
1876 .await
1877 .expect("spawn remote actor by role over tcp");
1878
1879 let response = remote
1880 .ask(EchoRequest { value: 11 })
1881 .await
1882 .expect("bounded remote spawn ask over tcp by role");
1883 assert_eq!(response, EchoResponse { doubled: 22 });
1884
1885 let unknown_role = cluster_a
1886 .spawn_remote_on_role::<EchoRequest>(
1887 "missing-role",
1888 RemoteSpawnSpec {
1889 path: ActorPath::parse("/user/missing-role-tcp").unwrap(),
1890 type_name: "bounded-echo".to_string(),
1891 config: None,
1892 },
1893 )
1894 .await
1895 .expect_err("unknown role should fail");
1896 assert_eq!(
1897 unknown_role,
1898 RemoteSpawnError::UnknownRole("missing-role".to_string())
1899 );
1900
1901 shutdown_a_tx.send(()).ok();
1902 shutdown_b_tx.send(()).ok();
1903 }
1904
1905 #[tokio::test]
1906 async fn bounded_cluster_spawn_remote_over_tcp_by_role_without_reverse_declaration() {
1907 if !tcp_tests_available() {
1908 eprintln!("skipping tcp integration tests: network bind not permitted");
1909 return;
1910 }
1911
1912 let engine_a_id = EngineId::new("engine-a-norev.example");
1913 let engine_b_id = EngineId::new("engine-b-norev.example");
1914 let actor_path = ActorPath::parse("/user/remote-spawn-role-tcp-slow").unwrap();
1915 let (ca, ca_key) = make_ca();
1916 let tls_a = make_tls_config(engine_a_id.as_str(), &ca, &ca_key);
1917 let tls_b = make_tls_config(engine_b_id.as_str(), &ca, &ca_key);
1918 let control_plane_b = reserve_tcp_addr();
1919
1920 let engine_a = Engine::with_config(EngineConfig {
1921 engine_id: engine_a_id.clone(),
1922 ..Default::default()
1923 });
1924 let engine_b = Engine::with_config(EngineConfig {
1925 engine_id: engine_b_id.clone(),
1926 control_plane_tcp_addr: Some(control_plane_b.to_string()),
1927 control_plane_tls: Some(tls_b.clone()),
1928 actor_spawn: Some(bounded_spawn_handler()),
1929 ..Default::default()
1930 });
1931
1932 let handle_a = engine_a.handle();
1933 let handle_b = engine_b.handle();
1934 handle_a
1935 .type_registry()
1936 .register_remote_ask::<EchoRequest>();
1937 handle_b
1938 .type_registry()
1939 .register_remote_ask::<EchoRequest>();
1940
1941 let cluster_b = handle_b
1942 .attach_bounded_cluster(BoundedClusterConfig {
1943 transport: BoundedTransportConfig::Tcp(tcp_config(engine_b_id.as_str(), tls_b)),
1944 peers: Vec::new(),
1945 declared_actors: Vec::new(),
1946 roles: Vec::new(),
1947 })
1948 .await
1949 .expect("attach bounded cluster b");
1950 let tcp_b = cluster_b
1951 .transport()
1952 .as_tcp()
1953 .cloned()
1954 .expect("tcp transport b");
1955
1956 let cluster_a = handle_a
1957 .attach_bounded_cluster(BoundedClusterConfig {
1958 transport: BoundedTransportConfig::Tcp(tcp_config(engine_a_id.as_str(), tls_a)),
1959 peers: vec![PeerSpec {
1960 engine_id: engine_b_id.clone(),
1961 addr: tcp_b.local_addr(),
1962 control_plane_addr: Some(control_plane_b),
1963 server_name: None,
1964 }],
1965 declared_actors: Vec::new(),
1966 roles: vec![BoundedClusterRole {
1967 name: "context-service".to_string(),
1968 owner: engine_b_id.clone(),
1969 }],
1970 })
1971 .await
1972 .expect("attach bounded cluster a");
1973 let tcp_a = cluster_a
1974 .transport()
1975 .as_tcp()
1976 .cloned()
1977 .expect("tcp transport a");
1978
1979 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
1980 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
1981
1982 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
1983 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
1984
1985 let _ = tcp_a;
1989
1990 let remote = cluster_a
1991 .spawn_remote_on_role::<EchoRequest>(
1992 "context-service",
1993 RemoteSpawnSpec {
1994 path: actor_path.clone(),
1995 type_name: "bounded-echo".to_string(),
1996 config: None,
1997 },
1998 )
1999 .await
2000 .expect("spawn remote actor by role over tcp without reverse declaration");
2001
2002 let response = remote
2003 .ask(EchoRequest { value: 13 })
2004 .await
2005 .expect("bounded remote spawn ask over tcp without reverse declaration");
2006 assert_eq!(response, EchoResponse { doubled: 26 });
2007
2008 shutdown_a_tx.send(()).ok();
2009 shutdown_b_tx.send(()).ok();
2010 }
2011
2012 fn quic_config(engine_id: &str, tls: TlsConfig) -> QuicTransportConfig {
2013 QuicTransportConfig {
2014 engine_id: EngineId::new(engine_id),
2015 listen_addr: "127.0.0.1:0".parse().unwrap(),
2016 send_buffer_size: 1024,
2017 idle_timeout: Duration::from_secs(2),
2018 tls,
2019 }
2020 }
2021
2022 #[tokio::test]
2023 async fn bounded_cluster_attach_reaches_remote_actor_over_quic() {
2024 if !quic_tests_available() {
2025 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2026 return;
2027 }
2028
2029 let actor_path = ActorPath::parse("/user/cluster-echo-quic").unwrap();
2030 let (ca, ca_key) = make_ca();
2031 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2032 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2033
2034 let mut engine_a = Engine::with_config(EngineConfig {
2035 engine_id: EngineId::new("engine-a.example"),
2036 ..Default::default()
2037 });
2038 let engine_b = Engine::with_config(EngineConfig {
2039 engine_id: EngineId::new("engine-b.example"),
2040 ..Default::default()
2041 });
2042
2043 let ns = NamespacePolicy::default_for(&actor_path).unwrap();
2044 engine_a.add_user_actor(ChildSpec::new(
2045 "cluster-echo-quic",
2046 RestartPolicy::Permanent,
2047 ShutdownPolicy::Timeout(Duration::from_secs(1)),
2048 ns,
2049 move || Box::new(EchoActor),
2050 ));
2051
2052 let handle_a = engine_a.handle();
2053 let handle_b = engine_b.handle();
2054 handle_a
2055 .type_registry()
2056 .register_remote_ask::<EchoRequest>();
2057 handle_b
2058 .type_registry()
2059 .register_remote_ask::<EchoRequest>();
2060
2061 let cluster_a = match handle_a
2062 .attach_bounded_cluster(BoundedClusterConfig {
2063 transport: BoundedTransportConfig::Quic(quic_config(
2064 "engine-a.example",
2065 tls_a.clone(),
2066 )),
2067 peers: Vec::new(),
2068 declared_actors: Vec::new(),
2069 roles: Vec::new(),
2070 })
2071 .await
2072 {
2073 Ok(cluster) => cluster,
2074 Err(ClusterError::Transport(err)) => {
2075 eprintln!("skipping quic bounded-cluster test: {err:?}");
2076 return;
2077 }
2078 Err(err) => panic!("attach bounded cluster a: {err:?}"),
2079 };
2080 let quic_a = cluster_a
2081 .transport()
2082 .as_quic()
2083 .cloned()
2084 .expect("quic transport a");
2085
2086 let cluster_b = match handle_b
2087 .attach_bounded_cluster(BoundedClusterConfig {
2088 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2089 peers: vec![PeerSpec {
2090 engine_id: EngineId::new("engine-a.example"),
2091 addr: quic_a.local_addr(),
2092 control_plane_addr: None,
2093 server_name: None,
2094 }],
2095 declared_actors: Vec::new(),
2096 roles: Vec::new(),
2097 })
2098 .await
2099 {
2100 Ok(cluster) => cluster,
2101 Err(ClusterError::Transport(err)) => {
2102 eprintln!("skipping quic bounded-cluster test: {err:?}");
2103 return;
2104 }
2105 Err(err) => panic!("attach bounded cluster b: {err:?}"),
2106 };
2107 let quic_b = cluster_b
2108 .transport()
2109 .as_quic()
2110 .cloned()
2111 .expect("quic transport b");
2112
2113 quic_a.add_peer(EngineId::new("engine-b.example"), quic_b.local_addr());
2114 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2115 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2116
2117 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2118 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2119
2120 let canonical = AddrHash::new(&actor_path, 0);
2121 let deadline = std::time::Instant::now() + Duration::from_secs(5);
2122 while !cluster_b.can_route(&actor_path) {
2123 if std::time::Instant::now() > deadline {
2124 panic!("bounded quic cluster route did not propagate");
2125 }
2126 tokio::time::sleep(Duration::from_millis(20)).await;
2127 }
2128 assert!(quic_b.can_route(canonical));
2129
2130 let remote = handle_b.remote_addr_for_path::<EchoRequest>(&actor_path);
2131 let ask_deadline = Instant::now() + Duration::from_secs(2);
2132 loop {
2133 match remote.ask(EchoRequest { value: 15 }).await {
2134 Ok(response) => {
2135 assert_eq!(response, EchoResponse { doubled: 30 });
2136 break;
2137 }
2138 Err(palladium_actor::AskError::Send(
2139 palladium_actor::SendError::ConnectionFailed,
2140 )) if Instant::now() <= ask_deadline => {
2141 tokio::time::sleep(Duration::from_millis(20)).await;
2142 }
2143 Err(err) => panic!("bounded cluster ask over quic: {err:?}"),
2144 }
2145 }
2146
2147 shutdown_a_tx.send(()).ok();
2148 shutdown_b_tx.send(()).ok();
2149 }
2150
2151 #[tokio::test]
2152 async fn bounded_cluster_declared_remote_actor_installs_canonical_quic_route() {
2153 if !quic_tests_available() {
2154 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2155 return;
2156 }
2157
2158 let (ca, ca_key) = make_ca();
2159 let tls = make_tls_config("engine-a.example", &ca, &ca_key);
2160 let engine = Engine::with_config(EngineConfig {
2161 engine_id: EngineId::new("engine-a.example"),
2162 ..Default::default()
2163 });
2164 let handle = engine.handle();
2165
2166 let mut cluster = match handle
2167 .attach_bounded_cluster(BoundedClusterConfig {
2168 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls)),
2169 peers: vec![PeerSpec {
2170 engine_id: EngineId::new("engine-b.example"),
2171 addr: "127.0.0.1:4100".parse().unwrap(),
2172 control_plane_addr: None,
2173 server_name: None,
2174 }],
2175 declared_actors: Vec::new(),
2176 roles: Vec::new(),
2177 })
2178 .await
2179 {
2180 Ok(cluster) => cluster,
2181 Err(ClusterError::Transport(err)) => {
2182 eprintln!("skipping quic bounded-cluster test: {err:?}");
2183 return;
2184 }
2185 Err(err) => panic!("attach bounded cluster: {err:?}"),
2186 };
2187
2188 let path = ActorPath::parse("/user/declared-remote-quic").unwrap();
2189 cluster
2190 .declare_remote_actor(EngineId::new("engine-b.example"), path.clone())
2191 .expect("declare remote actor");
2192
2193 assert!(cluster.can_route(&path));
2194 assert!(cluster
2195 .transport()
2196 .as_quic()
2197 .expect("quic transport")
2198 .can_route(AddrHash::new(&path, 0)));
2199 }
2200
2201 #[tokio::test]
2202 async fn bounded_cluster_spawn_remote_over_quic_returns_typed_handle() {
2203 if !quic_tests_available() {
2204 eprintln!("skipping quic bounded-cluster test: no rustls CryptoProvider configured");
2205 return;
2206 }
2207
2208 let actor_path = ActorPath::parse("/user/remote-spawn-quic").unwrap();
2209 let (ca, ca_key) = make_ca();
2210 let tls_a = make_tls_config("engine-a.example", &ca, &ca_key);
2211 let tls_b = make_tls_config("engine-b.example", &ca, &ca_key);
2212 let control_plane_b = reserve_udp_addr();
2213
2214 let engine_a = Engine::with_config(EngineConfig {
2215 engine_id: EngineId::new("engine-a.example"),
2216 ..Default::default()
2217 });
2218 let engine_b = Engine::with_config(EngineConfig {
2219 engine_id: EngineId::new("engine-b.example"),
2220 control_plane_quic_addr: Some(control_plane_b.to_string()),
2221 control_plane_tls: Some(tls_b.clone()),
2222 actor_spawn: Some(bounded_spawn_handler()),
2223 ..Default::default()
2224 });
2225
2226 let handle_a = engine_a.handle();
2227 let handle_b = engine_b.handle();
2228 handle_a
2229 .type_registry()
2230 .register_remote_ask::<EchoRequest>();
2231 handle_b
2232 .type_registry()
2233 .register_remote_ask::<EchoRequest>();
2234
2235 let cluster_b = match handle_b
2236 .attach_bounded_cluster(BoundedClusterConfig {
2237 transport: BoundedTransportConfig::Quic(quic_config("engine-b.example", tls_b)),
2238 peers: Vec::new(),
2239 declared_actors: Vec::new(),
2240 roles: Vec::new(),
2241 })
2242 .await
2243 {
2244 Ok(cluster) => cluster,
2245 Err(ClusterError::Transport(err)) => {
2246 eprintln!("skipping quic bounded-cluster test: {err:?}");
2247 return;
2248 }
2249 Err(err) => panic!("attach bounded cluster b: {err:?}"),
2250 };
2251 let quic_b = cluster_b
2252 .transport()
2253 .as_quic()
2254 .cloned()
2255 .expect("quic transport b");
2256
2257 let cluster_a = match handle_a
2258 .attach_bounded_cluster(BoundedClusterConfig {
2259 transport: BoundedTransportConfig::Quic(quic_config("engine-a.example", tls_a)),
2260 peers: vec![PeerSpec {
2261 engine_id: EngineId::new("engine-b.example"),
2262 addr: quic_b.local_addr(),
2263 control_plane_addr: Some(control_plane_b),
2264 server_name: None,
2265 }],
2266 declared_actors: Vec::new(),
2267 roles: vec![BoundedClusterRole {
2268 name: "context-service".to_string(),
2269 owner: EngineId::new("engine-b.example"),
2270 }],
2271 })
2272 .await
2273 {
2274 Ok(cluster) => cluster,
2275 Err(ClusterError::Transport(err)) => {
2276 eprintln!("skipping quic bounded-cluster test: {err:?}");
2277 return;
2278 }
2279 Err(err) => panic!("attach bounded cluster a: {err:?}"),
2280 };
2281 let quic_a = cluster_a
2282 .transport()
2283 .as_quic()
2284 .cloned()
2285 .expect("quic transport a");
2286
2287 let mut cluster_b = cluster_b;
2288 cluster_b
2289 .add_peer(PeerSpec {
2290 engine_id: EngineId::new("engine-a.example"),
2291 addr: quic_a.local_addr(),
2292 control_plane_addr: None,
2293 server_name: None,
2294 })
2295 .expect("add quic peer a to cluster b");
2296
2297 let (shutdown_a_tx, shutdown_a_rx) = tokio::sync::oneshot::channel::<()>();
2298 let (shutdown_b_tx, shutdown_b_rx) = tokio::sync::oneshot::channel::<()>();
2299
2300 std::thread::spawn(move || engine_a.run(shutdown_a_rx));
2301 std::thread::spawn(move || engine_b.run(shutdown_b_rx));
2302 tokio::time::sleep(Duration::from_millis(150)).await;
2303
2304 let remote = cluster_a
2305 .spawn_remote_on_role::<EchoRequest>(
2306 "context-service",
2307 RemoteSpawnSpec {
2308 path: actor_path.clone(),
2309 type_name: "bounded-echo".to_string(),
2310 config: None,
2311 },
2312 )
2313 .await
2314 .expect("spawn remote actor over quic by role");
2315
2316 let response = remote
2317 .ask(EchoRequest { value: 9 })
2318 .await
2319 .expect("bounded remote spawn ask over quic by role");
2320 assert_eq!(response, EchoResponse { doubled: 18 });
2321
2322 let unknown_role = cluster_a
2323 .spawn_remote_on_role::<EchoRequest>(
2324 "missing-role",
2325 RemoteSpawnSpec {
2326 path: ActorPath::parse("/user/missing-role").unwrap(),
2327 type_name: "bounded-echo".to_string(),
2328 config: None,
2329 },
2330 )
2331 .await
2332 .expect_err("unknown role should fail");
2333 assert_eq!(
2334 unknown_role,
2335 RemoteSpawnError::UnknownRole("missing-role".to_string())
2336 );
2337
2338 shutdown_a_tx.send(()).ok();
2339 shutdown_b_tx.send(()).ok();
2340 }
2341}