1use super::AuraEffectSystem;
2use async_trait::async_trait;
3use aura_core::effects::time::PhysicalTimeEffects;
4use aura_core::effects::transport::{
5 TransportEnvelope, TransportError, TransportStats, MAX_TRANSPORT_SIGNATURE_BYTES,
6};
7use aura_core::effects::TransportEffects;
8use aura_core::service::{LinkEndpoint, LinkProtocol, Route};
9#[cfg(not(target_arch = "wasm32"))]
10use aura_core::{execute_with_timeout_budget, TimeoutBudget, TimeoutRunError};
11use aura_core::{AuthorityId, ContextId};
12#[cfg(not(target_arch = "wasm32"))]
13use aura_effects::time::PhysicalTimeHandler;
14#[cfg(not(target_arch = "wasm32"))]
15use aura_effects::transport::TransportConfig;
16#[cfg(target_arch = "wasm32")]
17use base64::{engine::general_purpose::STANDARD, Engine};
18use cfg_if::cfg_if;
19#[cfg(not(target_arch = "wasm32"))]
20use futures::SinkExt;
21#[cfg(target_arch = "wasm32")]
22use futures::SinkExt;
23#[cfg(target_arch = "wasm32")]
24use gloo_net::websocket::{futures::WebSocket, Message};
25#[cfg(target_arch = "wasm32")]
26use serde::Serialize;
27#[cfg(target_arch = "wasm32")]
28use std::future::Future;
29#[cfg(not(target_arch = "wasm32"))]
30use std::net::SocketAddr;
31#[cfg(not(target_arch = "wasm32"))]
32use tokio::io::AsyncWriteExt;
33#[cfg(not(target_arch = "wasm32"))]
34use tokio::net::TcpStream;
35#[cfg(not(target_arch = "wasm32"))]
36use tokio_tungstenite::{connect_async, tungstenite::Message as TungsteniteMessage};
37#[cfg(not(target_arch = "wasm32"))]
38async fn execute_transport_timeout<F, Fut, T>(
39 timeout: std::time::Duration,
40 timeout_reason: impl Fn() -> TransportError + Copy,
41 f: F,
42) -> Result<T, TransportError>
43where
44 F: FnOnce() -> Fut,
45 Fut: std::future::Future<Output = Result<T, TransportError>>,
46{
47 let time = PhysicalTimeHandler::new();
48 let started_at = time.physical_time().await.map_err(|_| timeout_reason())?;
49 let budget = TimeoutBudget::from_start_and_timeout(&started_at, timeout)
50 .map_err(|_| timeout_reason())?;
51 execute_with_timeout_budget(&time, &budget, f)
52 .await
53 .map_err(|error| match error {
54 TimeoutRunError::Timeout(_) => timeout_reason(),
55 TimeoutRunError::Operation(error) => error,
56 })
57}
58#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
60#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
61impl TransportEffects for AuraEffectSystem {
62 async fn send_envelope(&self, envelope: TransportEnvelope) -> Result<(), TransportError> {
63 let now_ms = self
64 .time_effects()
65 .physical_time()
66 .await
67 .map(|time| time.ts_ms)
68 .unwrap_or(0);
69
70 let route = resolve_move_route(self, envelope.context, envelope.destination)
71 .await
72 .unwrap_or_else(|| fallback_direct_route(&envelope));
73
74 if let Some(move_manager) = self.move_manager() {
75 let batch = move_manager
76 .enqueue_for_delivery(envelope, route, now_ms, self)
77 .await
78 .map_err(|error| TransportError::ProtocolError {
79 details: error.to_string(),
80 })?;
81
82 for plan in batch {
83 let payload_len = plan.envelope.payload.len();
84 let context = plan.envelope.context;
85 let destination = plan.envelope.destination;
86 match send_planned_envelope(self, plan.envelope, &plan.route).await {
87 Ok(()) => {
88 self.transport.record_send(payload_len);
89 move_manager
90 .record_delivery_result(
91 plan.replay_marker,
92 context,
93 destination,
94 true,
95 now_ms,
96 )
97 .await;
98 }
99 Err(error) => {
100 self.transport.record_send_failure();
101 move_manager
102 .record_delivery_result(
103 plan.replay_marker,
104 context,
105 destination,
106 false,
107 now_ms,
108 )
109 .await;
110 return Err(error);
111 }
112 }
113 }
114 return Ok(());
115 }
116
117 let payload_len = envelope.payload.len();
118 let fallback_route = fallback_direct_route(&envelope);
119 match send_planned_envelope(self, envelope, &fallback_route).await {
120 Ok(()) => {
121 self.transport.record_send(payload_len);
122 Ok(())
123 }
124 Err(err) => {
125 self.transport.record_send_failure();
126 Err(err)
127 }
128 }
129 }
130
131 async fn receive_envelope(&self) -> Result<TransportEnvelope, TransportError> {
132 let self_device_id = self.config.device_id.to_string();
133 let inbox = self.transport.inbox();
134 let maybe = {
135 let mut inbox = inbox.write();
136 inbox
138 .iter()
139 .position(|env| {
140 let device_match = env
141 .metadata
142 .get("aura-destination-device-id")
143 .is_some_and(|dst| dst == &self_device_id);
144
145 if env.destination == self.authority_id {
146 return match env.metadata.get("aura-destination-device-id") {
147 Some(dst) => dst == &self_device_id,
148 None => true,
149 };
150 }
151
152 device_match
154 })
155 .map(|pos| inbox.remove(pos))
156 };
157
158 match maybe {
159 Some(env) => {
160 validate_inbound_transport_receipt(&env)?;
161 self.transport.record_receive();
162 Ok(env)
163 }
164 None => Err(TransportError::NoMessage),
165 }
166 }
167
168 async fn receive_envelope_from(
169 &self,
170 source: AuthorityId,
171 context: ContextId,
172 ) -> Result<TransportEnvelope, TransportError> {
173 let self_device_id = self.config.device_id.to_string();
174 let inbox = self.transport.inbox();
175 let maybe = {
176 let mut inbox = inbox.write();
177 inbox
179 .iter()
180 .position(|env| {
181 let device_match = env
182 .metadata
183 .get("aura-destination-device-id")
184 .is_some_and(|dst| dst == &self_device_id);
185
186 if env.destination == self.authority_id {
187 env.source == source
188 && env.context == context
189 && match env.metadata.get("aura-destination-device-id") {
190 Some(dst) => dst == &self_device_id,
191 None => true,
192 }
193 } else {
194 env.source == source && env.context == context && device_match
195 }
196 })
197 .map(|pos| inbox.remove(pos))
198 };
199
200 match maybe {
201 Some(env) => {
202 validate_inbound_transport_receipt(&env)?;
203 self.transport.record_receive();
204 Ok(env)
205 }
206 None => Err(TransportError::NoMessage),
207 }
208 }
209
210 async fn is_channel_established(&self, context: ContextId, peer: AuthorityId) -> bool {
211 if let Some(shared) = self.transport.shared_transport() {
212 return shared.is_peer_online(peer);
213 }
214 if let Some(manager) = self.rendezvous_manager() {
215 return manager.get_descriptor(context, peer).await.is_some();
216 }
217 false
218 }
219
220 async fn get_transport_stats(&self) -> TransportStats {
221 let mut stats = self.transport.stats_snapshot();
222
223 if let Some(shared) = self.transport.shared_transport() {
224 let active = shared.connected_peer_count(self.authority_id) as u32;
225 self.transport.set_active_channels(active);
226 stats.active_channels = active;
227 }
228
229 stats
230 }
231}
232
233async fn resolve_peer_addr(
234 effects: &AuraEffectSystem,
235 context: ContextId,
236 peer: AuthorityId,
237) -> Option<String> {
238 resolve_move_route(effects, context, peer)
239 .await
240 .and_then(|route| route_destination_addr(&route.destination))
241}
242
243async fn resolve_move_route(
244 effects: &AuraEffectSystem,
245 context: ContextId,
246 peer: AuthorityId,
247) -> Option<Route> {
248 let manager = effects.rendezvous_manager()?;
249 let descriptor = manager.get_descriptor(context, peer).await?;
250 descriptor
251 .advertised_move_paths()
252 .into_iter()
253 .map(|path| path.route)
254 .next()
255}
256
257async fn send_planned_envelope(
258 effects: &AuraEffectSystem,
259 envelope: TransportEnvelope,
260 _route: &Route,
261) -> Result<(), TransportError> {
262 if let Some(shared) = effects.transport.shared_transport() {
263 shared.route_envelope(envelope);
264 return Ok(());
265 }
266
267 let self_device_id = effects.config.device_id.to_string();
268 let destination_device_id = envelope.metadata.get("aura-destination-device-id");
269 let is_local = if envelope.destination == effects.authority_id {
270 match destination_device_id {
271 Some(dst) => dst == &self_device_id,
272 None => true,
273 }
274 } else {
275 destination_device_id.is_some_and(|dst| dst == &self_device_id)
276 };
277 if is_local {
278 effects.queue_runtime_envelope(envelope);
279 return Ok(());
280 }
281
282 #[cfg(target_arch = "wasm32")]
283 if let Some(url) = current_browser_harness_enqueue_url() {
284 send_harness_browser_envelope(&url, &envelope)?;
285 return Ok(());
286 }
287
288 let addr = resolve_peer_addr(effects, envelope.context, envelope.destination)
289 .await
290 .ok_or(TransportError::DestinationUnreachable {
291 destination: envelope.destination,
292 })?;
293 send_envelope_tcp(&addr, &envelope).await
294}
295
296fn fallback_direct_route(envelope: &TransportEnvelope) -> Route {
297 Route::direct(LinkEndpoint::direct(
298 LinkProtocol::Tcp,
299 format!("runtime://{}", envelope.destination),
300 ))
301}
302
303fn validate_inbound_transport_receipt(envelope: &TransportEnvelope) -> Result<(), TransportError> {
304 let Some(receipt) = envelope.receipt.as_ref() else {
305 return Ok(());
306 };
307
308 if receipt.context != envelope.context {
309 return Err(TransportError::ReceiptValidationFailed {
310 reason: "receipt context does not match envelope context".to_string(),
311 });
312 }
313 if receipt.src != envelope.source {
314 return Err(TransportError::ReceiptValidationFailed {
315 reason: "receipt source does not match envelope source".to_string(),
316 });
317 }
318 if receipt.dst != envelope.destination {
319 return Err(TransportError::ReceiptValidationFailed {
320 reason: "receipt destination does not match envelope destination".to_string(),
321 });
322 }
323 if receipt.sig.is_empty() {
324 return Err(TransportError::ReceiptValidationFailed {
325 reason: "receipt signature is missing".to_string(),
326 });
327 }
328 if receipt.sig.len() > MAX_TRANSPORT_SIGNATURE_BYTES {
329 return Err(TransportError::ReceiptValidationFailed {
330 reason: "receipt signature exceeds transport bound".to_string(),
331 });
332 }
333
334 Ok(())
335}
336
337fn route_destination_addr(endpoint: &LinkEndpoint) -> Option<String> {
338 match endpoint.protocol {
339 LinkProtocol::Tcp | LinkProtocol::WebSocket => {}
340 _ => return None,
341 }
342
343 #[cfg(target_arch = "wasm32")]
344 if endpoint.protocol == LinkProtocol::WebSocket {
345 return endpoint.address.clone();
346 }
347
348 #[cfg(not(target_arch = "wasm32"))]
349 if endpoint.protocol == LinkProtocol::WebSocket {
350 return endpoint
351 .address
352 .as_ref()
353 .map(|addr| format!("ws://{}", addr));
354 }
355
356 endpoint.address.clone()
357}
358
359async fn send_envelope_tcp(addr: &str, envelope: &TransportEnvelope) -> Result<(), TransportError> {
360 cfg_if! {
361 if #[cfg(target_arch = "wasm32")] {
362 let payload = aura_core::util::serialization::to_vec(envelope).map_err(|e| {
363 TransportError::SendFailed {
364 destination: envelope.destination,
365 reason: format!("Envelope serialization failed: {e}"),
366 }
367 })?;
368 let (url, use_harness_transport) = resolve_browser_transport_target(addr);
369 log_harness_mailbox_send(envelope, &url, use_harness_transport);
370 let wrapped_payload = if use_harness_transport {
371 Some(
372 serde_json::to_string(
373 &HarnessBrowserTransportEnvelope::from_parts(envelope, &payload),
374 )
375 .map_err(|e| TransportError::SendFailed {
376 destination: envelope.destination,
377 reason: format!("Harness browser transport encode failed: {e}"),
378 })?,
379 )
380 } else {
381 None
382 };
383
384 if let Some(wrapped) = wrapped_payload {
385 let window = web_sys::window().ok_or_else(|| TransportError::SendFailed {
386 destination: envelope.destination,
387 reason: "browser window unavailable for harness transport enqueue".to_string(),
388 })?;
389 let init = web_sys::RequestInit::new();
390 init.set_method("POST");
391 let body_value = wrapped.into();
392 init.set_body(&body_value);
393 let request = web_sys::Request::new_with_str_and_init(&url, &init).map_err(
394 |error| TransportError::SendFailed {
395 destination: envelope.destination,
396 reason: format!(
397 "Harness browser transport build failed ({url}): {error:?}"
398 ),
399 },
400 )?;
401 request
402 .headers()
403 .set("Content-Type", "application/json; charset=utf-8")
404 .map_err(|error| TransportError::SendFailed {
405 destination: envelope.destination,
406 reason: format!(
407 "Harness browser transport header failed ({url}): {error:?}"
408 ),
409 })?;
410 let _ = window.fetch_with_request(&request);
411 return Ok(());
412 }
413
414 run_local_ws(move || async move {
415 let mut ws = WebSocket::open(&url)
416 .map_err(|e| format!("WebSocket open failed ({url}): {e}"))?;
417 ws.send(Message::Bytes(payload))
418 .await
419 .map_err(|e| format!("WebSocket send failed ({url}): {e}"))?;
420 Ok(())
421 })
422 .await
423 .map_err(|reason| TransportError::SendFailed {
424 destination: envelope.destination,
425 reason,
426 })
427 } else {
428 let config = TransportConfig::default();
429 if addr.starts_with("ws://") || addr.starts_with("wss://") {
430 let (mut ws, _) = execute_transport_timeout(
431 config.connect_timeout.get(),
432 || TransportError::SendFailed {
433 destination: envelope.destination,
434 reason: "WebSocket connect timeout".to_string(),
435 },
436 || async {
437 connect_async(addr).await.map_err(|e| TransportError::SendFailed {
438 destination: envelope.destination,
439 reason: format!("WebSocket connect failed: {e}"),
440 })
441 },
442 )
443 .await?;
444
445 let payload = aura_core::util::serialization::to_vec(envelope).map_err(|e| {
446 TransportError::SendFailed {
447 destination: envelope.destination,
448 reason: format!("Envelope serialization failed: {e}"),
449 }
450 })?;
451
452 execute_transport_timeout(
453 config.write_timeout.get(),
454 || TransportError::SendFailed {
455 destination: envelope.destination,
456 reason: "WebSocket write timeout".to_string(),
457 },
458 || async {
459 ws.send(TungsteniteMessage::Binary(payload))
460 .await
461 .map_err(|e| TransportError::SendFailed {
462 destination: envelope.destination,
463 reason: format!("WebSocket send failed: {e}"),
464 })
465 },
466 )
467 .await?;
468
469 return Ok(());
470 }
471
472 let socket_addr: SocketAddr = addr.parse().map_err(|e| TransportError::SendFailed {
473 destination: envelope.destination,
474 reason: format!("Invalid transport address '{addr}': {e}"),
475 })?;
476
477 let mut stream = execute_transport_timeout(
478 config.connect_timeout.get(),
479 || TransportError::SendFailed {
480 destination: envelope.destination,
481 reason: "TCP connect timeout".to_string(),
482 },
483 || async {
484 TcpStream::connect(socket_addr)
485 .await
486 .map_err(|e| TransportError::SendFailed {
487 destination: envelope.destination,
488 reason: format!("TCP connect failed: {e}"),
489 })
490 },
491 )
492 .await?;
493
494 let payload = aura_core::util::serialization::to_vec(envelope).map_err(|e| {
495 TransportError::SendFailed {
496 destination: envelope.destination,
497 reason: format!("Envelope serialization failed: {e}"),
498 }
499 })?;
500
501 let len = (payload.len() as u32).to_be_bytes();
502 execute_transport_timeout(
503 config.write_timeout.get(),
504 || TransportError::SendFailed {
505 destination: envelope.destination,
506 reason: "TCP write timeout".to_string(),
507 },
508 || async {
509 stream
510 .write_all(&len)
511 .await
512 .map_err(|e| TransportError::SendFailed {
513 destination: envelope.destination,
514 reason: format!("TCP write failed: {e}"),
515 })
516 },
517 )
518 .await?;
519 execute_transport_timeout(
520 config.write_timeout.get(),
521 || TransportError::SendFailed {
522 destination: envelope.destination,
523 reason: "TCP write timeout".to_string(),
524 },
525 || async {
526 stream
527 .write_all(&payload)
528 .await
529 .map_err(|e| TransportError::SendFailed {
530 destination: envelope.destination,
531 reason: format!("TCP write failed: {e}"),
532 })
533 },
534 )
535 .await?;
536 execute_transport_timeout(
537 config.write_timeout.get(),
538 || TransportError::SendFailed {
539 destination: envelope.destination,
540 reason: "TCP flush timeout".to_string(),
541 },
542 || async {
543 stream.flush().await.map_err(|e| TransportError::SendFailed {
544 destination: envelope.destination,
545 reason: format!("TCP flush failed: {e}"),
546 })
547 },
548 )
549 .await?;
550
551 Ok(())
552 }
553 }
554}
555
/// JSON wrapper used to hand a serialized transport envelope to the browser
/// harness (wasm32 only). Built by `from_parts` and encoded with `serde_json`.
#[cfg(target_arch = "wasm32")]
#[derive(Serialize)]
struct HarnessBrowserTransportEnvelope<'a> {
    /// Message discriminator; `from_parts` always sets it to `"transport_envelope"`.
    kind: &'static str,
    /// String form of the envelope's destination authority.
    destination: String,
    /// Value of the envelope's `aura-destination-device-id` metadata entry, if
    /// present; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    destination_device_id: Option<&'a str>,
    /// Standard-alphabet base64 encoding of the serialized envelope bytes.
    envelope_b64: String,
}
565
/// URL path appended to the page origin to form the harness enqueue endpoint
/// that wrapped envelopes are POSTed to in harness mode.
#[cfg(target_arch = "wasm32")]
const HARNESS_TRANSPORT_ENQUEUE_PATH: &str = "/__aura_harness_transport__/enqueue";
568
#[cfg(target_arch = "wasm32")]
impl<'a> HarnessBrowserTransportEnvelope<'a> {
    /// Wraps an envelope and its already-serialized `payload` bytes for the
    /// browser harness.
    ///
    /// The destination device id is borrowed from the envelope's
    /// `aura-destination-device-id` metadata entry when present; the payload is
    /// base64-encoded with the standard alphabet.
    fn from_parts(envelope: &'a TransportEnvelope, payload: &[u8]) -> Self {
        let destination_device_id = envelope
            .metadata
            .get("aura-destination-device-id")
            .map(|device_id| device_id.as_str());

        Self {
            kind: "transport_envelope",
            destination: envelope.destination.to_string(),
            destination_device_id,
            envelope_b64: STANDARD.encode(payload),
        }
    }
}
583
/// Returns `addr` as a WebSocket URL: addresses that already start with
/// `ws://` or `wss://` are returned unchanged, anything else is prefixed with
/// `ws://`.
#[cfg(any(test, target_arch = "wasm32"))]
fn normalize_ws_url(addr: &str) -> String {
    let has_ws_scheme = ["ws://", "wss://"]
        .iter()
        .any(|scheme| addr.starts_with(*scheme));

    match has_ws_scheme {
        true => addr.to_owned(),
        false => format!("ws://{addr}"),
    }
}
592
/// Returns the WebSocket URL of the page host for harness transport, or
/// `None` when harness mode is off or `current_host` is empty.
#[cfg(any(test, target_arch = "wasm32"))]
fn harness_browser_transport_ws_url(current_host: &str, harness_mode: bool) -> Option<String> {
    let usable = harness_mode && !current_host.is_empty();
    usable.then(|| normalize_ws_url(current_host))
}
601
/// Reads the page's host and origin and detects harness mode.
///
/// Harness mode is on when the query string contains a
/// `__aura_harness_instance=<value>` pair with a non-empty value. Returns
/// `(host, origin, harness_mode)`, or `None` when the window or any of the
/// location properties cannot be read.
#[cfg(target_arch = "wasm32")]
fn current_browser_location_and_harness_mode() -> Option<(String, String, bool)> {
    let location = web_sys::window()?.location();
    let search = location.search().ok()?;
    let host = location.host().ok()?;
    let origin = location.origin().ok()?;

    let query = search.strip_prefix('?').unwrap_or(&search);
    let harness_mode = query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .any(|(key, value)| key == "__aura_harness_instance" && !value.is_empty());

    Some((host, origin, harness_mode))
}
615
/// Returns the harness enqueue URL (`<origin>` + `HARNESS_TRANSPORT_ENQUEUE_PATH`)
/// when the page runs in harness mode with a non-empty origin, else `None`.
#[cfg(target_arch = "wasm32")]
fn current_browser_harness_enqueue_url() -> Option<String> {
    let (_, origin, harness_mode) = current_browser_location_and_harness_mode()?;
    let enabled = harness_mode && !origin.is_empty();
    enabled.then(|| format!("{origin}{HARNESS_TRANSPORT_ENQUEUE_PATH}"))
}
624
/// POSTs `envelope` to the browser harness endpoint at `url`.
///
/// The envelope is serialized, wrapped in a `HarnessBrowserTransportEnvelope`
/// JSON document, and sent with `fetch` as `application/json`. The request is
/// fire-and-forget: the promise returned by `fetch` is not awaited, so only
/// failures while building the request are reported (as
/// `TransportError::SendFailed`).
#[cfg(target_arch = "wasm32")]
fn send_harness_browser_envelope(
    url: &str,
    envelope: &TransportEnvelope,
) -> Result<(), TransportError> {
    // Every failure in this function maps to the same error variant.
    let send_failed = |reason: String| TransportError::SendFailed {
        destination: envelope.destination,
        reason,
    };

    let payload = aura_core::util::serialization::to_vec(envelope)
        .map_err(|e| send_failed(format!("Envelope serialization failed: {e}")))?;
    let body_json = serde_json::to_string(&HarnessBrowserTransportEnvelope::from_parts(
        envelope, &payload,
    ))
    .map_err(|e| send_failed(format!("Harness browser transport encode failed: {e}")))?;

    let window = match web_sys::window() {
        Some(window) => window,
        None => {
            return Err(send_failed(
                "browser window unavailable for harness transport enqueue".to_string(),
            ))
        }
    };

    let init = web_sys::RequestInit::new();
    init.set_method("POST");
    let body_value = body_json.into();
    init.set_body(&body_value);

    let request = web_sys::Request::new_with_str_and_init(url, &init).map_err(|error| {
        send_failed(format!(
            "Harness browser transport build failed ({url}): {error:?}"
        ))
    })?;
    request
        .headers()
        .set("Content-Type", "application/json; charset=utf-8")
        .map_err(|error| {
            send_failed(format!(
                "Harness browser transport header failed ({url}): {error:?}"
            ))
        })?;

    log_harness_mailbox_send(envelope, url, true);
    // Best effort: the fetch promise is intentionally dropped.
    let _ = window.fetch_with_request(&request);
    Ok(())
}
668
/// Logs invitation-related envelopes sent through the harness transport to
/// the browser console.
///
/// Nothing is logged unless `use_harness_transport` is set and the envelope's
/// `content-type` metadata is one of the two invitation content types.
#[cfg(target_arch = "wasm32")]
fn log_harness_mailbox_send(envelope: &TransportEnvelope, url: &str, use_harness_transport: bool) {
    if !use_harness_transport {
        return;
    }

    let content_type = match envelope.metadata.get("content-type") {
        Some(value) => value.as_str(),
        None => "<missing>",
    };
    let is_invitation_traffic = matches!(
        content_type,
        "application/aura-invitation" | "application/aura-invitation-acceptance+json"
    );
    if !is_invitation_traffic {
        return;
    }

    let line = format!(
        "[web-harness-transport] mailbox_send destination={} context={} content_type={} via={}",
        envelope.destination, envelope.context, content_type, url
    );
    web_sys::console::log_1(&line.into());
}
694
/// Builds the future with `make_fut` and drives it to completion on the
/// current task, returning its result unchanged.
#[cfg(target_arch = "wasm32")]
async fn run_local_ws<Mk, Fut>(make_fut: Mk) -> Result<(), String>
where
    Mk: FnOnce() -> Fut + 'static,
    Fut: Future<Output = Result<(), String>> + 'static,
{
    let pending = make_fut();
    pending.await
}
703
/// Chooses where a browser build should send an envelope destined for `addr`.
///
/// Returns `(url, true)` when a harness target applies — the harness enqueue
/// URL if available, otherwise the page-host WebSocket URL in harness mode —
/// and `(normalized WebSocket URL of addr, false)` otherwise.
#[cfg(target_arch = "wasm32")]
fn resolve_browser_transport_target(addr: &str) -> (String, bool) {
    let harness_target =
        current_browser_location_and_harness_mode().and_then(|(host, _origin, harness_mode)| {
            current_browser_harness_enqueue_url()
                .or_else(|| harness_browser_transport_ws_url(&host, harness_mode))
        });

    match harness_target {
        Some(url) => (url, true),
        None => (normalize_ws_url(addr), false),
    }
}
717
// Unit tests for URL selection, peer address resolution, move-manager
// passthrough, and inbound receipt validation.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::default_context_id_for_authority;
    use crate::core::AgentConfig;
    use crate::runtime::services::{
        MoveManager, MoveManagerConfig, RendezvousManager, RendezvousManagerConfig, RuntimeService,
        RuntimeServiceContext, ServiceRegistry,
    };
    use crate::runtime::TaskSupervisor;
    use aura_core::effects::transport::TransportEnvelope;
    use aura_core::effects::TransportEffects;
    use aura_rendezvous::{RendezvousDescriptor, TransportHint};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a rendezvous descriptor for `authority_id` in `context_id` with
    /// the given transport hints; all remaining fields are fixed placeholders.
    fn descriptor(
        authority_id: AuthorityId,
        context_id: ContextId,
        transport_hints: Vec<TransportHint>,
    ) -> RendezvousDescriptor {
        RendezvousDescriptor {
            authority_id,
            device_id: None,
            context_id,
            transport_hints,
            handshake_psk_commitment: [0u8; 32],
            public_key: [0u8; 32],
            valid_from: 1,
            valid_until: u64::MAX,
            nonce: [0u8; 32],
            nickname_suggestion: None,
        }
    }

    /// The harness WebSocket URL is derived from the page host only when
    /// harness mode is enabled and the host is non-empty.
    #[test]
    fn harness_browser_transport_uses_page_host_when_enabled() {
        assert_eq!(
            harness_browser_transport_ws_url("127.0.0.1:4173", true).as_deref(),
            Some("ws://127.0.0.1:4173")
        );
        assert_eq!(
            harness_browser_transport_ws_url("127.0.0.1:4173", false),
            None
        );
        assert_eq!(harness_browser_transport_ws_url("", true), None);
    }

    /// Resolving a peer address for one context must not pick up a descriptor
    /// cached for the same peer under a different context.
    #[tokio::test]
    async fn resolve_peer_addr_does_not_fall_back_across_contexts() {
        let authority = AuthorityId::new_from_entropy([210u8; 32]);
        let peer = AuthorityId::new_from_entropy([211u8; 32]);
        let primary_context = ContextId::new_from_entropy([212u8; 32]);
        let fallback_context = default_context_id_for_authority(peer);

        let config = AgentConfig::default();
        let effects =
            AuraEffectSystem::simulation_for_test_for_authority(&config, authority).unwrap();
        let manager = RendezvousManager::new_with_default_udp(
            authority,
            RendezvousManagerConfig::default(),
            Arc::new(effects.time_effects().clone()),
        );
        effects.attach_rendezvous_manager(manager.clone());
        let service_context = RuntimeServiceContext::new(
            Arc::new(TaskSupervisor::new()),
            Arc::new(effects.time_effects().clone()),
        );
        RuntimeService::start(&manager, &service_context)
            .await
            .unwrap();

        // Primary context: only a QUIC hint is advertised.
        manager
            .cache_descriptor(descriptor(
                peer,
                primary_context,
                vec![TransportHint::quic_direct("127.0.0.1:55001").unwrap()],
            ))
            .await
            .unwrap();

        // Peer's default context: a TCP hint that must not be used for the
        // primary-context lookup below.
        manager
            .cache_descriptor(descriptor(
                peer,
                fallback_context,
                vec![TransportHint::tcp_direct("127.0.0.1:55002").unwrap()],
            ))
            .await
            .unwrap();

        // Presumably the QUIC hint yields no TCP/WebSocket address, so the
        // lookup returns None instead of the other context's TCP address.
        let resolved = resolve_peer_addr(&effects, primary_context, peer).await;
        assert!(resolved.is_none());
        RuntimeService::stop(&manager).await.unwrap();
    }

    /// Sending through an attached move manager must deliver the same envelope
    /// contents as sending without one, and leave no queued or replay state.
    #[tokio::test]
    async fn move_passthrough_preserves_opaque_envelope_delivery() {
        let config = AgentConfig::default();
        let sender = AuthorityId::new_from_entropy([220u8; 32]);
        let receiver = AuthorityId::new_from_entropy([221u8; 32]);
        let context = ContextId::new_from_entropy([222u8; 32]);

        // Baseline pair: shared transport, no move manager.
        let plain_shared = crate::runtime::SharedTransport::new();
        let plain_sender =
            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
                &config,
                sender,
                plain_shared.clone(),
            )
            .unwrap();
        let plain_receiver =
            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
                &config,
                receiver,
                plain_shared,
            )
            .unwrap();

        let mut metadata = HashMap::new();
        metadata.insert(
            "content-type".to_string(),
            "application/aura-opaque-object".to_string(),
        );
        let envelope = TransportEnvelope {
            destination: receiver,
            source: sender,
            context,
            payload: vec![9, 4, 2, 7, 1],
            metadata,
            receipt: None,
        };

        plain_sender.send_envelope(envelope.clone()).await.unwrap();
        let baseline = plain_receiver.receive_envelope().await.unwrap();

        // Second pair: same setup, but the sender has a move manager attached.
        let move_shared = crate::runtime::SharedTransport::new();
        let move_sender =
            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
                &config,
                sender,
                move_shared.clone(),
            )
            .unwrap();
        let move_receiver =
            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
                &config,
                receiver,
                move_shared,
            )
            .unwrap();
        let move_manager = MoveManager::new(
            MoveManagerConfig::for_testing(),
            Arc::new(ServiceRegistry::new()),
        );
        move_sender.attach_move_manager(move_manager.clone());

        move_sender.send_envelope(envelope.clone()).await.unwrap();
        let migrated = move_receiver.receive_envelope().await.unwrap();

        // The baseline delivery matches what was sent.
        assert_eq!(baseline.destination, envelope.destination);
        assert_eq!(baseline.source, envelope.source);
        assert_eq!(baseline.context, envelope.context);
        assert_eq!(baseline.payload, envelope.payload);
        assert_eq!(baseline.metadata, envelope.metadata);
        assert!(baseline.receipt.is_none());

        // The move-managed delivery matches the baseline field by field.
        assert_eq!(migrated.destination, baseline.destination);
        assert_eq!(migrated.source, baseline.source);
        assert_eq!(migrated.context, baseline.context);
        assert_eq!(migrated.payload, baseline.payload);
        assert_eq!(migrated.metadata, baseline.metadata);
        assert!(migrated.receipt.is_none());

        // Nothing should remain queued or tracked after a successful delivery.
        let projection = move_manager.projection().await;
        assert_eq!(projection.queued_envelopes, 0);
        assert_eq!(projection.replay_window_entries, 0);
    }

    /// An envelope whose receipt names a different context than the envelope
    /// itself must be rejected on receive.
    #[tokio::test]
    async fn receive_envelope_rejects_receipt_binding_mismatch() {
        let config = AgentConfig::default();
        let sender = AuthorityId::new_from_entropy([230u8; 32]);
        let receiver = AuthorityId::new_from_entropy([231u8; 32]);
        let context = ContextId::new_from_entropy([232u8; 32]);

        let shared = crate::runtime::SharedTransport::new();
        let sender_effects =
            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
                &config,
                sender,
                shared.clone(),
            )
            .unwrap();
        let receiver_effects =
            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
                &config, receiver, shared,
            )
            .unwrap();

        let envelope = TransportEnvelope {
            destination: receiver,
            source: sender,
            context,
            payload: vec![1, 2, 3],
            metadata: HashMap::new(),
            receipt: Some(aura_core::effects::transport::TransportReceipt {
                // Deliberately different from the envelope's `context`.
                context: ContextId::new_from_entropy([233u8; 32]),
                src: sender,
                dst: receiver,
                epoch: 1,
                cost: 1,
                nonce: 7,
                prev: [0u8; 32],
                sig: vec![0xAA],
            }),
        };

        sender_effects.send_envelope(envelope).await.unwrap();
        let error = receiver_effects
            .receive_envelope()
            .await
            .expect_err("receipt binding mismatch must fail closed");
        assert!(matches!(
            error,
            TransportError::ReceiptValidationFailed { .. }
        ));
    }
}