Skip to main content

aura_agent/runtime/effects/
transport.rs

1use super::AuraEffectSystem;
2use async_trait::async_trait;
3use aura_core::effects::time::PhysicalTimeEffects;
4use aura_core::effects::transport::{
5    TransportEnvelope, TransportError, TransportStats, MAX_TRANSPORT_SIGNATURE_BYTES,
6};
7use aura_core::effects::TransportEffects;
8use aura_core::service::{LinkEndpoint, LinkProtocol, Route};
9#[cfg(not(target_arch = "wasm32"))]
10use aura_core::{execute_with_timeout_budget, TimeoutBudget, TimeoutRunError};
11use aura_core::{AuthorityId, ContextId};
12#[cfg(not(target_arch = "wasm32"))]
13use aura_effects::time::PhysicalTimeHandler;
14#[cfg(not(target_arch = "wasm32"))]
15use aura_effects::transport::TransportConfig;
16#[cfg(target_arch = "wasm32")]
17use base64::{engine::general_purpose::STANDARD, Engine};
18use cfg_if::cfg_if;
19#[cfg(not(target_arch = "wasm32"))]
20use futures::SinkExt;
21#[cfg(target_arch = "wasm32")]
22use futures::SinkExt;
23#[cfg(target_arch = "wasm32")]
24use gloo_net::websocket::{futures::WebSocket, Message};
25#[cfg(target_arch = "wasm32")]
26use serde::Serialize;
27#[cfg(target_arch = "wasm32")]
28use std::future::Future;
29#[cfg(not(target_arch = "wasm32"))]
30use std::net::SocketAddr;
31#[cfg(not(target_arch = "wasm32"))]
32use tokio::io::AsyncWriteExt;
33#[cfg(not(target_arch = "wasm32"))]
34use tokio::net::TcpStream;
35#[cfg(not(target_arch = "wasm32"))]
36use tokio_tungstenite::{connect_async, tungstenite::Message as TungsteniteMessage};
37#[cfg(not(target_arch = "wasm32"))]
38async fn execute_transport_timeout<F, Fut, T>(
39    timeout: std::time::Duration,
40    timeout_reason: impl Fn() -> TransportError + Copy,
41    f: F,
42) -> Result<T, TransportError>
43where
44    F: FnOnce() -> Fut,
45    Fut: std::future::Future<Output = Result<T, TransportError>>,
46{
47    let time = PhysicalTimeHandler::new();
48    let started_at = time.physical_time().await.map_err(|_| timeout_reason())?;
49    let budget = TimeoutBudget::from_start_and_timeout(&started_at, timeout)
50        .map_err(|_| timeout_reason())?;
51    execute_with_timeout_budget(&time, &budget, f)
52        .await
53        .map_err(|error| match error {
54            TimeoutRunError::Timeout(_) => timeout_reason(),
55            TimeoutRunError::Operation(error) => error,
56        })
57}
58// Implementation of TransportEffects
59#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
60#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
61impl TransportEffects for AuraEffectSystem {
62    async fn send_envelope(&self, envelope: TransportEnvelope) -> Result<(), TransportError> {
63        let now_ms = self
64            .time_effects()
65            .physical_time()
66            .await
67            .map(|time| time.ts_ms)
68            .unwrap_or(0);
69
70        let route = resolve_move_route(self, envelope.context, envelope.destination)
71            .await
72            .unwrap_or_else(|| fallback_direct_route(&envelope));
73
74        if let Some(move_manager) = self.move_manager() {
75            let batch = move_manager
76                .enqueue_for_delivery(envelope, route, now_ms, self)
77                .await
78                .map_err(|error| TransportError::ProtocolError {
79                    details: error.to_string(),
80                })?;
81
82            for plan in batch {
83                let payload_len = plan.envelope.payload.len();
84                let context = plan.envelope.context;
85                let destination = plan.envelope.destination;
86                match send_planned_envelope(self, plan.envelope, &plan.route).await {
87                    Ok(()) => {
88                        self.transport.record_send(payload_len);
89                        move_manager
90                            .record_delivery_result(
91                                plan.replay_marker,
92                                context,
93                                destination,
94                                true,
95                                now_ms,
96                            )
97                            .await;
98                    }
99                    Err(error) => {
100                        self.transport.record_send_failure();
101                        move_manager
102                            .record_delivery_result(
103                                plan.replay_marker,
104                                context,
105                                destination,
106                                false,
107                                now_ms,
108                            )
109                            .await;
110                        return Err(error);
111                    }
112                }
113            }
114            return Ok(());
115        }
116
117        let payload_len = envelope.payload.len();
118        let fallback_route = fallback_direct_route(&envelope);
119        match send_planned_envelope(self, envelope, &fallback_route).await {
120            Ok(()) => {
121                self.transport.record_send(payload_len);
122                Ok(())
123            }
124            Err(err) => {
125                self.transport.record_send_failure();
126                Err(err)
127            }
128        }
129    }
130
131    async fn receive_envelope(&self) -> Result<TransportEnvelope, TransportError> {
132        let self_device_id = self.config.device_id.to_string();
133        let inbox = self.transport.inbox();
134        let maybe = {
135            let mut inbox = inbox.write();
136            // In shared transport mode, filter by destination (this agent's authority ID)
137            inbox
138                .iter()
139                .position(|env| {
140                    let device_match = env
141                        .metadata
142                        .get("aura-destination-device-id")
143                        .is_some_and(|dst| dst == &self_device_id);
144
145                    if env.destination == self.authority_id {
146                        return match env.metadata.get("aura-destination-device-id") {
147                            Some(dst) => dst == &self_device_id,
148                            None => true,
149                        };
150                    }
151
152                    // Allow device-targeted envelopes for other authorities (multi-authority devices).
153                    device_match
154                })
155                .map(|pos| inbox.remove(pos))
156        };
157
158        match maybe {
159            Some(env) => {
160                validate_inbound_transport_receipt(&env)?;
161                self.transport.record_receive();
162                Ok(env)
163            }
164            None => Err(TransportError::NoMessage),
165        }
166    }
167
168    async fn receive_envelope_from(
169        &self,
170        source: AuthorityId,
171        context: ContextId,
172    ) -> Result<TransportEnvelope, TransportError> {
173        let self_device_id = self.config.device_id.to_string();
174        let inbox = self.transport.inbox();
175        let maybe = {
176            let mut inbox = inbox.write();
177            // In shared transport mode, filter by destination AND source/context
178            inbox
179                .iter()
180                .position(|env| {
181                    let device_match = env
182                        .metadata
183                        .get("aura-destination-device-id")
184                        .is_some_and(|dst| dst == &self_device_id);
185
186                    if env.destination == self.authority_id {
187                        env.source == source
188                            && env.context == context
189                            && match env.metadata.get("aura-destination-device-id") {
190                                Some(dst) => dst == &self_device_id,
191                                None => true,
192                            }
193                    } else {
194                        env.source == source && env.context == context && device_match
195                    }
196                })
197                .map(|pos| inbox.remove(pos))
198        };
199
200        match maybe {
201            Some(env) => {
202                validate_inbound_transport_receipt(&env)?;
203                self.transport.record_receive();
204                Ok(env)
205            }
206            None => Err(TransportError::NoMessage),
207        }
208    }
209
210    async fn is_channel_established(&self, context: ContextId, peer: AuthorityId) -> bool {
211        if let Some(shared) = self.transport.shared_transport() {
212            return shared.is_peer_online(peer);
213        }
214        if let Some(manager) = self.rendezvous_manager() {
215            return manager.get_descriptor(context, peer).await.is_some();
216        }
217        false
218    }
219
220    async fn get_transport_stats(&self) -> TransportStats {
221        let mut stats = self.transport.stats_snapshot();
222
223        if let Some(shared) = self.transport.shared_transport() {
224            let active = shared.connected_peer_count(self.authority_id) as u32;
225            self.transport.set_active_channels(active);
226            stats.active_channels = active;
227        }
228
229        stats
230    }
231}
232
233async fn resolve_peer_addr(
234    effects: &AuraEffectSystem,
235    context: ContextId,
236    peer: AuthorityId,
237) -> Option<String> {
238    resolve_move_route(effects, context, peer)
239        .await
240        .and_then(|route| route_destination_addr(&route.destination))
241}
242
243async fn resolve_move_route(
244    effects: &AuraEffectSystem,
245    context: ContextId,
246    peer: AuthorityId,
247) -> Option<Route> {
248    let manager = effects.rendezvous_manager()?;
249    let descriptor = manager.get_descriptor(context, peer).await?;
250    descriptor
251        .advertised_move_paths()
252        .into_iter()
253        .map(|path| path.route)
254        .next()
255}
256
257async fn send_planned_envelope(
258    effects: &AuraEffectSystem,
259    envelope: TransportEnvelope,
260    _route: &Route,
261) -> Result<(), TransportError> {
262    if let Some(shared) = effects.transport.shared_transport() {
263        shared.route_envelope(envelope);
264        return Ok(());
265    }
266
267    let self_device_id = effects.config.device_id.to_string();
268    let destination_device_id = envelope.metadata.get("aura-destination-device-id");
269    let is_local = if envelope.destination == effects.authority_id {
270        match destination_device_id {
271            Some(dst) => dst == &self_device_id,
272            None => true,
273        }
274    } else {
275        destination_device_id.is_some_and(|dst| dst == &self_device_id)
276    };
277    if is_local {
278        effects.queue_runtime_envelope(envelope);
279        return Ok(());
280    }
281
282    #[cfg(target_arch = "wasm32")]
283    if let Some(url) = current_browser_harness_enqueue_url() {
284        send_harness_browser_envelope(&url, &envelope)?;
285        return Ok(());
286    }
287
288    let addr = resolve_peer_addr(effects, envelope.context, envelope.destination)
289        .await
290        .ok_or(TransportError::DestinationUnreachable {
291            destination: envelope.destination,
292        })?;
293    send_envelope_tcp(&addr, &envelope).await
294}
295
296fn fallback_direct_route(envelope: &TransportEnvelope) -> Route {
297    Route::direct(LinkEndpoint::direct(
298        LinkProtocol::Tcp,
299        format!("runtime://{}", envelope.destination),
300    ))
301}
302
303fn validate_inbound_transport_receipt(envelope: &TransportEnvelope) -> Result<(), TransportError> {
304    let Some(receipt) = envelope.receipt.as_ref() else {
305        return Ok(());
306    };
307
308    if receipt.context != envelope.context {
309        return Err(TransportError::ReceiptValidationFailed {
310            reason: "receipt context does not match envelope context".to_string(),
311        });
312    }
313    if receipt.src != envelope.source {
314        return Err(TransportError::ReceiptValidationFailed {
315            reason: "receipt source does not match envelope source".to_string(),
316        });
317    }
318    if receipt.dst != envelope.destination {
319        return Err(TransportError::ReceiptValidationFailed {
320            reason: "receipt destination does not match envelope destination".to_string(),
321        });
322    }
323    if receipt.sig.is_empty() {
324        return Err(TransportError::ReceiptValidationFailed {
325            reason: "receipt signature is missing".to_string(),
326        });
327    }
328    if receipt.sig.len() > MAX_TRANSPORT_SIGNATURE_BYTES {
329        return Err(TransportError::ReceiptValidationFailed {
330            reason: "receipt signature exceeds transport bound".to_string(),
331        });
332    }
333
334    Ok(())
335}
336
337fn route_destination_addr(endpoint: &LinkEndpoint) -> Option<String> {
338    match endpoint.protocol {
339        LinkProtocol::Tcp | LinkProtocol::WebSocket => {}
340        _ => return None,
341    }
342
343    #[cfg(target_arch = "wasm32")]
344    if endpoint.protocol == LinkProtocol::WebSocket {
345        return endpoint.address.clone();
346    }
347
348    #[cfg(not(target_arch = "wasm32"))]
349    if endpoint.protocol == LinkProtocol::WebSocket {
350        return endpoint
351            .address
352            .as_ref()
353            .map(|addr| format!("ws://{}", addr));
354    }
355
356    endpoint.address.clone()
357}
358
359async fn send_envelope_tcp(addr: &str, envelope: &TransportEnvelope) -> Result<(), TransportError> {
360    cfg_if! {
361        if #[cfg(target_arch = "wasm32")] {
362            let payload = aura_core::util::serialization::to_vec(envelope).map_err(|e| {
363                TransportError::SendFailed {
364                    destination: envelope.destination,
365                    reason: format!("Envelope serialization failed: {e}"),
366                }
367            })?;
368            let (url, use_harness_transport) = resolve_browser_transport_target(addr);
369            log_harness_mailbox_send(envelope, &url, use_harness_transport);
370            let wrapped_payload = if use_harness_transport {
371                Some(
372                    serde_json::to_string(
373                        &HarnessBrowserTransportEnvelope::from_parts(envelope, &payload),
374                    )
375                    .map_err(|e| TransportError::SendFailed {
376                        destination: envelope.destination,
377                        reason: format!("Harness browser transport encode failed: {e}"),
378                    })?,
379                )
380            } else {
381                None
382            };
383
384            if let Some(wrapped) = wrapped_payload {
385                let window = web_sys::window().ok_or_else(|| TransportError::SendFailed {
386                    destination: envelope.destination,
387                    reason: "browser window unavailable for harness transport enqueue".to_string(),
388                })?;
389                let init = web_sys::RequestInit::new();
390                init.set_method("POST");
391                let body_value = wrapped.into();
392                init.set_body(&body_value);
393                let request = web_sys::Request::new_with_str_and_init(&url, &init).map_err(
394                    |error| TransportError::SendFailed {
395                        destination: envelope.destination,
396                        reason: format!(
397                            "Harness browser transport build failed ({url}): {error:?}"
398                        ),
399                    },
400                )?;
401                request
402                    .headers()
403                    .set("Content-Type", "application/json; charset=utf-8")
404                    .map_err(|error| TransportError::SendFailed {
405                        destination: envelope.destination,
406                        reason: format!(
407                            "Harness browser transport header failed ({url}): {error:?}"
408                        ),
409                    })?;
410                let _ = window.fetch_with_request(&request);
411                return Ok(());
412            }
413
414            run_local_ws(move || async move {
415                let mut ws = WebSocket::open(&url)
416                    .map_err(|e| format!("WebSocket open failed ({url}): {e}"))?;
417                ws.send(Message::Bytes(payload))
418                    .await
419                    .map_err(|e| format!("WebSocket send failed ({url}): {e}"))?;
420                Ok(())
421            })
422            .await
423            .map_err(|reason| TransportError::SendFailed {
424                destination: envelope.destination,
425                reason,
426            })
427        } else {
428            let config = TransportConfig::default();
429            if addr.starts_with("ws://") || addr.starts_with("wss://") {
430                let (mut ws, _) = execute_transport_timeout(
431                    config.connect_timeout.get(),
432                    || TransportError::SendFailed {
433                        destination: envelope.destination,
434                        reason: "WebSocket connect timeout".to_string(),
435                    },
436                    || async {
437                        connect_async(addr).await.map_err(|e| TransportError::SendFailed {
438                            destination: envelope.destination,
439                            reason: format!("WebSocket connect failed: {e}"),
440                        })
441                    },
442                )
443                .await?;
444
445                let payload = aura_core::util::serialization::to_vec(envelope).map_err(|e| {
446                    TransportError::SendFailed {
447                        destination: envelope.destination,
448                        reason: format!("Envelope serialization failed: {e}"),
449                    }
450                })?;
451
452                execute_transport_timeout(
453                    config.write_timeout.get(),
454                    || TransportError::SendFailed {
455                        destination: envelope.destination,
456                        reason: "WebSocket write timeout".to_string(),
457                    },
458                    || async {
459                        ws.send(TungsteniteMessage::Binary(payload))
460                            .await
461                            .map_err(|e| TransportError::SendFailed {
462                                destination: envelope.destination,
463                                reason: format!("WebSocket send failed: {e}"),
464                            })
465                    },
466                )
467                .await?;
468
469                return Ok(());
470            }
471
472            let socket_addr: SocketAddr = addr.parse().map_err(|e| TransportError::SendFailed {
473                destination: envelope.destination,
474                reason: format!("Invalid transport address '{addr}': {e}"),
475            })?;
476
477            let mut stream = execute_transport_timeout(
478                config.connect_timeout.get(),
479                || TransportError::SendFailed {
480                    destination: envelope.destination,
481                    reason: "TCP connect timeout".to_string(),
482                },
483                || async {
484                    TcpStream::connect(socket_addr)
485                        .await
486                        .map_err(|e| TransportError::SendFailed {
487                            destination: envelope.destination,
488                            reason: format!("TCP connect failed: {e}"),
489                        })
490                },
491            )
492            .await?;
493
494            let payload = aura_core::util::serialization::to_vec(envelope).map_err(|e| {
495                TransportError::SendFailed {
496                    destination: envelope.destination,
497                    reason: format!("Envelope serialization failed: {e}"),
498                }
499            })?;
500
501            let len = (payload.len() as u32).to_be_bytes();
502            execute_transport_timeout(
503                config.write_timeout.get(),
504                || TransportError::SendFailed {
505                    destination: envelope.destination,
506                    reason: "TCP write timeout".to_string(),
507                },
508                || async {
509                    stream
510                        .write_all(&len)
511                        .await
512                        .map_err(|e| TransportError::SendFailed {
513                            destination: envelope.destination,
514                            reason: format!("TCP write failed: {e}"),
515                        })
516                },
517            )
518            .await?;
519            execute_transport_timeout(
520                config.write_timeout.get(),
521                || TransportError::SendFailed {
522                    destination: envelope.destination,
523                    reason: "TCP write timeout".to_string(),
524                },
525                || async {
526                    stream
527                        .write_all(&payload)
528                        .await
529                        .map_err(|e| TransportError::SendFailed {
530                            destination: envelope.destination,
531                            reason: format!("TCP write failed: {e}"),
532                        })
533                },
534            )
535            .await?;
536            execute_transport_timeout(
537                config.write_timeout.get(),
538                || TransportError::SendFailed {
539                    destination: envelope.destination,
540                    reason: "TCP flush timeout".to_string(),
541                },
542                || async {
543                    stream.flush().await.map_err(|e| TransportError::SendFailed {
544                        destination: envelope.destination,
545                        reason: format!("TCP flush failed: {e}"),
546                    })
547                },
548            )
549            .await?;
550
551            Ok(())
552        }
553    }
554}
555
/// JSON body posted to the browser harness transport enqueue endpoint.
#[cfg(target_arch = "wasm32")]
#[derive(Serialize)]
struct HarnessBrowserTransportEnvelope<'a> {
    /// Message discriminator; `from_parts` always sets `"transport_envelope"`.
    kind: &'static str,
    /// String form of the envelope's destination.
    destination: String,
    /// Value of the `aura-destination-device-id` metadata entry, omitted from
    /// the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    destination_device_id: Option<&'a str>,
    /// Serialized envelope bytes, base64-encoded with the standard engine.
    envelope_b64: String,
}

/// Path, relative to the page origin, of the harness enqueue endpoint.
#[cfg(target_arch = "wasm32")]
const HARNESS_TRANSPORT_ENQUEUE_PATH: &str = "/__aura_harness_transport__/enqueue";

#[cfg(target_arch = "wasm32")]
impl<'a> HarnessBrowserTransportEnvelope<'a> {
    /// Wraps `envelope` and its already-serialized `payload` for the harness.
    fn from_parts(envelope: &'a TransportEnvelope, payload: &[u8]) -> Self {
        let destination_device_id = envelope
            .metadata
            .get("aura-destination-device-id")
            .map(String::as_str);
        let envelope_b64 = STANDARD.encode(payload);

        Self {
            kind: "transport_envelope",
            destination: envelope.destination.to_string(),
            destination_device_id,
            envelope_b64,
        }
    }
}
583
/// Returns `addr` as a WebSocket URL.
///
/// Addresses that already start with `ws://` or `wss://` are returned
/// unchanged; anything else is prefixed with `ws://`.
#[cfg(any(test, target_arch = "wasm32"))]
fn normalize_ws_url(addr: &str) -> String {
    let has_scheme = ["ws://", "wss://"]
        .iter()
        .any(|scheme| addr.starts_with(*scheme));

    if has_scheme {
        addr.to_owned()
    } else {
        format!("ws://{addr}")
    }
}
592
/// Returns the WebSocket URL of the page host when harness mode is enabled.
///
/// Yields `None` when `harness_mode` is off or `current_host` is empty.
#[cfg(any(test, target_arch = "wasm32"))]
fn harness_browser_transport_ws_url(current_host: &str, harness_mode: bool) -> Option<String> {
    let usable = harness_mode && !current_host.is_empty();
    usable.then(|| normalize_ws_url(current_host))
}
601
/// Reads the page location and detects harness mode.
///
/// Returns `(host, origin, harness_mode)`, where `harness_mode` is true when
/// the query string contains a `__aura_harness_instance` parameter with a
/// non-empty value. Returns `None` when the window or any of the location
/// parts cannot be read.
#[cfg(target_arch = "wasm32")]
fn current_browser_location_and_harness_mode() -> Option<(String, String, bool)> {
    let location = web_sys::window()?.location();
    let search = location.search().ok()?;
    let host = location.host().ok()?;
    let origin = location.origin().ok()?;

    // The search string may or may not carry its leading '?'.
    let query = search.strip_prefix('?').unwrap_or(&search);
    let harness_mode = query
        .split('&')
        .filter_map(|pair: &str| pair.split_once('='))
        .any(|(key, value)| key == "__aura_harness_instance" && !value.is_empty());

    Some((host, origin, harness_mode))
}
615
/// Returns the harness enqueue URL for the current page.
///
/// The URL is the page origin followed by `HARNESS_TRANSPORT_ENQUEUE_PATH`.
/// Yields `None` when the location cannot be read, harness mode is off, or the
/// origin is empty.
#[cfg(target_arch = "wasm32")]
fn current_browser_harness_enqueue_url() -> Option<String> {
    let (_host, origin, harness_mode) = current_browser_location_and_harness_mode()?;
    let enabled = harness_mode && !origin.is_empty();
    enabled.then(|| format!("{origin}{HARNESS_TRANSPORT_ENQUEUE_PATH}"))
}
624
/// Posts `envelope` to the browser harness enqueue endpoint at `url`.
///
/// The envelope is serialized, wrapped in a `HarnessBrowserTransportEnvelope`
/// JSON document and sent as the body of a `POST` request with a JSON content
/// type. The value returned by `fetch_with_request` is discarded, so the
/// outcome of the request itself is not observed here.
///
/// # Errors
///
/// Returns `TransportError::SendFailed` when serialization, JSON encoding,
/// window lookup, request construction or header assignment fails.
#[cfg(target_arch = "wasm32")]
fn send_harness_browser_envelope(
    url: &str,
    envelope: &TransportEnvelope,
) -> Result<(), TransportError> {
    let failure = |reason: String| TransportError::SendFailed {
        destination: envelope.destination,
        reason,
    };

    let payload = aura_core::util::serialization::to_vec(envelope)
        .map_err(|e| failure(format!("Envelope serialization failed: {e}")))?;
    let document = HarnessBrowserTransportEnvelope::from_parts(envelope, &payload);
    let wrapped = serde_json::to_string(&document)
        .map_err(|e| failure(format!("Harness browser transport encode failed: {e}")))?;

    let Some(window) = web_sys::window() else {
        return Err(failure(
            "browser window unavailable for harness transport enqueue".to_string(),
        ));
    };

    let init = web_sys::RequestInit::new();
    init.set_method("POST");
    let body_value = wrapped.into();
    init.set_body(&body_value);

    let request = web_sys::Request::new_with_str_and_init(url, &init).map_err(|error| {
        failure(format!(
            "Harness browser transport build failed ({url}): {error:?}"
        ))
    })?;
    request
        .headers()
        .set("Content-Type", "application/json; charset=utf-8")
        .map_err(|error| {
            failure(format!(
                "Harness browser transport header failed ({url}): {error:?}"
            ))
        })?;

    log_harness_mailbox_send(envelope, url, true);
    let _ = window.fetch_with_request(&request);
    Ok(())
}
668
/// Logs invitation-related sends made through the harness transport.
///
/// Nothing is logged unless `use_harness_transport` is set and the envelope's
/// `content-type` metadata is one of the two invitation content types.
#[cfg(target_arch = "wasm32")]
fn log_harness_mailbox_send(envelope: &TransportEnvelope, url: &str, use_harness_transport: bool) {
    if !use_harness_transport {
        return;
    }

    let content_type = envelope
        .metadata
        .get("content-type")
        .map_or("<missing>", String::as_str);
    let is_invitation_traffic = matches!(
        content_type,
        "application/aura-invitation" | "application/aura-invitation-acceptance+json"
    );
    if !is_invitation_traffic {
        return;
    }

    let line = format!(
        "[web-harness-transport] mailbox_send destination={} context={} content_type={} via={}",
        envelope.destination, envelope.context, content_type, url
    );
    web_sys::console::log_1(&line.into());
}
694
/// Builds the future with `make_fut` and awaits it on the current task.
#[cfg(target_arch = "wasm32")]
async fn run_local_ws<Mk, Fut>(make_fut: Mk) -> Result<(), String>
where
    Mk: FnOnce() -> Fut + 'static,
    Fut: Future<Output = Result<(), String>> + 'static,
{
    let operation = make_fut();
    operation.await
}
703
/// Chooses the URL a browser send should target.
///
/// Returns `(url, true)` when a harness target applies: the harness enqueue
/// URL if available, otherwise the harness WebSocket URL derived from the page
/// host. When the page location cannot be read or no harness target applies,
/// returns `(normalize_ws_url(addr), false)`.
#[cfg(target_arch = "wasm32")]
fn resolve_browser_transport_target(addr: &str) -> (String, bool) {
    let harness_target =
        current_browser_location_and_harness_mode().and_then(|(host, _origin, harness_mode)| {
            current_browser_harness_enqueue_url()
                .or_else(|| harness_browser_transport_ws_url(&host, harness_mode))
        });

    match harness_target {
        Some(url) => (url, true),
        None => (normalize_ws_url(addr), false),
    }
}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use crate::core::default_context_id_for_authority;
722    use crate::core::AgentConfig;
723    use crate::runtime::services::{
724        MoveManager, MoveManagerConfig, RendezvousManager, RendezvousManagerConfig, RuntimeService,
725        RuntimeServiceContext, ServiceRegistry,
726    };
727    use crate::runtime::TaskSupervisor;
728    use aura_core::effects::transport::TransportEnvelope;
729    use aura_core::effects::TransportEffects;
730    use aura_rendezvous::{RendezvousDescriptor, TransportHint};
731    use std::collections::HashMap;
732    use std::sync::Arc;
733
734    fn descriptor(
735        authority_id: AuthorityId,
736        context_id: ContextId,
737        transport_hints: Vec<TransportHint>,
738    ) -> RendezvousDescriptor {
739        RendezvousDescriptor {
740            authority_id,
741            device_id: None,
742            context_id,
743            transport_hints,
744            handshake_psk_commitment: [0u8; 32],
745            public_key: [0u8; 32],
746            valid_from: 1,
747            valid_until: u64::MAX,
748            nonce: [0u8; 32],
749            nickname_suggestion: None,
750        }
751    }
752
753    #[test]
754    fn harness_browser_transport_uses_page_host_when_enabled() {
755        assert_eq!(
756            harness_browser_transport_ws_url("127.0.0.1:4173", true).as_deref(),
757            Some("ws://127.0.0.1:4173")
758        );
759        assert_eq!(
760            harness_browser_transport_ws_url("127.0.0.1:4173", false),
761            None
762        );
763        assert_eq!(harness_browser_transport_ws_url("", true), None);
764    }
765
766    #[tokio::test]
767    async fn resolve_peer_addr_does_not_fall_back_across_contexts() {
768        let authority = AuthorityId::new_from_entropy([210u8; 32]);
769        let peer = AuthorityId::new_from_entropy([211u8; 32]);
770        let primary_context = ContextId::new_from_entropy([212u8; 32]);
771        let fallback_context = default_context_id_for_authority(peer);
772
773        let config = AgentConfig::default();
774        let effects =
775            AuraEffectSystem::simulation_for_test_for_authority(&config, authority).unwrap();
776        let manager = RendezvousManager::new_with_default_udp(
777            authority,
778            RendezvousManagerConfig::default(),
779            Arc::new(effects.time_effects().clone()),
780        );
781        effects.attach_rendezvous_manager(manager.clone());
782        let service_context = RuntimeServiceContext::new(
783            Arc::new(TaskSupervisor::new()),
784            Arc::new(effects.time_effects().clone()),
785        );
786        RuntimeService::start(&manager, &service_context)
787            .await
788            .unwrap();
789
790        manager
791            .cache_descriptor(descriptor(
792                peer,
793                primary_context,
794                vec![TransportHint::quic_direct("127.0.0.1:55001").unwrap()],
795            ))
796            .await
797            .unwrap();
798
799        manager
800            .cache_descriptor(descriptor(
801                peer,
802                fallback_context,
803                vec![TransportHint::tcp_direct("127.0.0.1:55002").unwrap()],
804            ))
805            .await
806            .unwrap();
807
808        let resolved = resolve_peer_addr(&effects, primary_context, peer).await;
809        assert!(resolved.is_none());
810        RuntimeService::stop(&manager).await.unwrap();
811    }
812
813    #[tokio::test]
814    async fn move_passthrough_preserves_opaque_envelope_delivery() {
815        let config = AgentConfig::default();
816        let sender = AuthorityId::new_from_entropy([220u8; 32]);
817        let receiver = AuthorityId::new_from_entropy([221u8; 32]);
818        let context = ContextId::new_from_entropy([222u8; 32]);
819
820        let plain_shared = crate::runtime::SharedTransport::new();
821        let plain_sender =
822            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
823                &config,
824                sender,
825                plain_shared.clone(),
826            )
827            .unwrap();
828        let plain_receiver =
829            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
830                &config,
831                receiver,
832                plain_shared,
833            )
834            .unwrap();
835
836        let mut metadata = HashMap::new();
837        metadata.insert(
838            "content-type".to_string(),
839            "application/aura-opaque-object".to_string(),
840        );
841        let envelope = TransportEnvelope {
842            destination: receiver,
843            source: sender,
844            context,
845            payload: vec![9, 4, 2, 7, 1],
846            metadata,
847            receipt: None,
848        };
849
850        plain_sender.send_envelope(envelope.clone()).await.unwrap();
851        let baseline = plain_receiver.receive_envelope().await.unwrap();
852
853        let move_shared = crate::runtime::SharedTransport::new();
854        let move_sender =
855            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
856                &config,
857                sender,
858                move_shared.clone(),
859            )
860            .unwrap();
861        let move_receiver =
862            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
863                &config,
864                receiver,
865                move_shared,
866            )
867            .unwrap();
868        let move_manager = MoveManager::new(
869            MoveManagerConfig::for_testing(),
870            Arc::new(ServiceRegistry::new()),
871        );
872        move_sender.attach_move_manager(move_manager.clone());
873
874        move_sender.send_envelope(envelope.clone()).await.unwrap();
875        let migrated = move_receiver.receive_envelope().await.unwrap();
876
877        assert_eq!(baseline.destination, envelope.destination);
878        assert_eq!(baseline.source, envelope.source);
879        assert_eq!(baseline.context, envelope.context);
880        assert_eq!(baseline.payload, envelope.payload);
881        assert_eq!(baseline.metadata, envelope.metadata);
882        assert!(baseline.receipt.is_none());
883
884        assert_eq!(migrated.destination, baseline.destination);
885        assert_eq!(migrated.source, baseline.source);
886        assert_eq!(migrated.context, baseline.context);
887        assert_eq!(migrated.payload, baseline.payload);
888        assert_eq!(migrated.metadata, baseline.metadata);
889        assert!(migrated.receipt.is_none());
890
891        let projection = move_manager.projection().await;
892        assert_eq!(projection.queued_envelopes, 0);
893        assert_eq!(projection.replay_window_entries, 0);
894    }
895
896    #[tokio::test]
897    async fn receive_envelope_rejects_receipt_binding_mismatch() {
898        let config = AgentConfig::default();
899        let sender = AuthorityId::new_from_entropy([230u8; 32]);
900        let receiver = AuthorityId::new_from_entropy([231u8; 32]);
901        let context = ContextId::new_from_entropy([232u8; 32]);
902
903        let shared = crate::runtime::SharedTransport::new();
904        let sender_effects =
905            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
906                &config,
907                sender,
908                shared.clone(),
909            )
910            .unwrap();
911        let receiver_effects =
912            AuraEffectSystem::simulation_for_test_with_shared_transport_for_authority(
913                &config, receiver, shared,
914            )
915            .unwrap();
916
917        let envelope = TransportEnvelope {
918            destination: receiver,
919            source: sender,
920            context,
921            payload: vec![1, 2, 3],
922            metadata: HashMap::new(),
923            receipt: Some(aura_core::effects::transport::TransportReceipt {
924                context: ContextId::new_from_entropy([233u8; 32]),
925                src: sender,
926                dst: receiver,
927                epoch: 1,
928                cost: 1,
929                nonce: 7,
930                prev: [0u8; 32],
931                sig: vec![0xAA],
932            }),
933        };
934
935        sender_effects.send_envelope(envelope).await.unwrap();
936        let error = receiver_effects
937            .receive_envelope()
938            .await
939            .expect_err("receipt binding mismatch must fail closed");
940        assert!(matches!(
941            error,
942            TransportError::ReceiptValidationFailed { .. }
943        ));
944    }
945}