Skip to main content

sim_lib_web_bridge/
remote.rs

1//! Network transports (wasm, local server, remote server).
2//!
3//! These share the [`Transport`] contract with the fixture so they are
4//! interchangeable behind the session bridge. Each connects its runtime through
5//! `realize`/`EvalFabric` (HTTP bootstrap plus a WebSocket live channel for the
6//! server transports, the in-process fabric for wasm). Disconnected transports
7//! fail closed, so the session degrades to a visible state rather than a crash.
8
9use sim_kernel::{Cx, Error, Expr, Result, Symbol};
10use sim_lib_server::{FrameEnvelope, ServerFrame};
11use sim_lib_stream_core::{
12    BufferPolicy, ClockDomain, PushResult, StreamDirection, StreamEnvelope,
13    StreamInspectorSnapshot, StreamItem, StreamMedia, StreamMetadata, StreamStats,
14    TransportProfile, stream_inspector_route_local_symbol,
15};
16use sim_lib_stream_fabric::{StreamControl, stream_control_frame_from_control};
17
18use crate::transport::{
19    BrowserStreamStatus, ChangeEvent, SessionStatus, StreamInspectorRecord, Transport,
20    TransportKind,
21};
22
23/// A network-backed transport that connects a runtime over `realize`.
24pub struct RemoteTransport {
25    kind: TransportKind,
26    status: SessionStatus,
27    endpoint: String,
28}
29
30impl RemoteTransport {
31    /// A wasm transport targeting an in-browser runtime.
32    pub fn wasm() -> Self {
33        Self::new(TransportKind::Wasm, "wasm:local")
34    }
35
36    /// A local-server transport (HTTP bootstrap + WebSocket live).
37    pub fn local_server(endpoint: impl Into<String>) -> Self {
38        Self::new(TransportKind::LocalServer, endpoint)
39    }
40
41    /// A remote-server transport (HTTP bootstrap + WebSocket live).
42    pub fn remote_server(endpoint: impl Into<String>) -> Self {
43        Self::new(TransportKind::RemoteServer, endpoint)
44    }
45
46    fn new(kind: TransportKind, endpoint: impl Into<String>) -> Self {
47        Self {
48            kind,
49            status: SessionStatus::Disconnected,
50            endpoint: endpoint.into(),
51        }
52    }
53
54    /// The configured endpoint.
55    pub fn endpoint(&self) -> &str {
56        &self.endpoint
57    }
58
59    /// Mark the remote channel connected.
60    pub fn connect(&mut self) {
61        self.status = SessionStatus::Connected;
62    }
63
64    /// Mark the remote channel disconnected.
65    pub fn disconnect(&mut self) {
66        self.status = SessionStatus::Disconnected;
67    }
68
69    /// Mark the remote channel reconnecting.
70    pub fn begin_reconnect(&mut self) {
71        self.status = SessionStatus::Reconnecting;
72    }
73
74    /// Encode a stream-fabric control frame for server-backed transports.
75    pub fn stream_control_frame(
76        &self,
77        cx: &mut Cx,
78        codec: Symbol,
79        control: &StreamControl,
80    ) -> Result<ServerFrame> {
81        match self.kind {
82            TransportKind::LocalServer | TransportKind::RemoteServer => {
83                stream_control_frame_from_control(cx, codec, control, FrameEnvelope::default())
84            }
85            TransportKind::Fixture | TransportKind::Wasm | TransportKind::Fabric => {
86                Err(Error::HostError(format!(
87                    "{:?} transport does not use server stream-fabric frames",
88                    self.kind
89                )))
90            }
91        }
92    }
93
94    fn not_connected(&self) -> Error {
95        Error::HostError(format!(
96            "{:?} transport to {} is not connected (live channel unavailable)",
97            self.kind, self.endpoint
98        ))
99    }
100}
101
102impl Transport for RemoteTransport {
103    fn kind(&self) -> TransportKind {
104        self.kind
105    }
106
107    fn status(&self) -> SessionStatus {
108        self.status
109    }
110
111    fn read(&self, _resource: &Symbol) -> Result<Expr> {
112        Err(self.not_connected())
113    }
114
115    fn realize(&mut self, _resource: &Symbol, _operation: &Expr) -> Result<Expr> {
116        Err(self.not_connected())
117    }
118
119    fn drain_events(&mut self) -> Vec<ChangeEvent> {
120        Vec::new()
121    }
122
123    fn stream_subscribe(&mut self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
124        Err(Error::HostError(format!(
125            "cannot subscribe to stream {stream_id}: {}",
126            self.not_connected()
127        )))
128    }
129
130    fn stream_read(&mut self, stream_id: &Symbol, _limit: usize) -> Result<Vec<StreamItem>> {
131        Err(Error::HostError(format!(
132            "cannot read stream {stream_id}: {}",
133            self.not_connected()
134        )))
135    }
136
137    fn stream_push(&mut self, stream_id: &Symbol, _envelope: StreamEnvelope) -> Result<PushResult> {
138        Err(Error::HostError(format!(
139            "cannot push stream {stream_id}: {}",
140            self.not_connected()
141        )))
142    }
143
144    fn stream_cancel(&mut self, stream_id: &Symbol) -> Result<()> {
145        Err(Error::HostError(format!(
146            "cannot cancel stream {stream_id}: {}",
147            self.not_connected()
148        )))
149    }
150
151    fn stream_stats(&self, stream_id: &Symbol) -> Result<StreamStats> {
152        Err(Error::HostError(format!(
153            "cannot inspect stream stats {stream_id}: {}",
154            self.not_connected()
155        )))
156    }
157
158    fn stream_inspector(&self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
159        let status = match self.status {
160            SessionStatus::Disconnected => BrowserStreamStatus::Disconnected,
161            SessionStatus::Reconnecting => BrowserStreamStatus::Reconnecting,
162            SessionStatus::Closed => BrowserStreamStatus::Cancelled,
163            _ => BrowserStreamStatus::Live,
164        };
165        Ok(StreamInspectorRecord {
166            stream_id: stream_id.clone(),
167            status,
168            buffered: 0,
169            stats: StreamStats::default(),
170            diagnostics: Vec::new(),
171            snapshot: StreamInspectorSnapshot::new(
172                &StreamMetadata::new(
173                    stream_id.clone(),
174                    StreamMedia::Data,
175                    StreamDirection::Source,
176                    ClockDomain::ServerFrame.symbol(),
177                    BufferPolicy::bounded(1)?,
178                ),
179                stream_inspector_route_local_symbol(),
180                TransportProfile::remote_stream_fabric().name().clone(),
181                status.inspector_status(),
182                0,
183                &StreamStats::default(),
184                None,
185                Vec::new(),
186            ),
187        })
188    }
189}