sim_lib_web_bridge/
remote.rs1use sim_kernel::{Cx, Error, Expr, Result, Symbol};
10use sim_lib_server::{FrameEnvelope, ServerFrame};
11use sim_lib_stream_core::{
12 BufferPolicy, ClockDomain, PushResult, StreamDirection, StreamEnvelope,
13 StreamInspectorSnapshot, StreamItem, StreamMedia, StreamMetadata, StreamStats,
14 TransportProfile, stream_inspector_route_local_symbol,
15};
16use sim_lib_stream_fabric::{StreamControl, stream_control_frame_from_control};
17
18use crate::transport::{
19 BrowserStreamStatus, ChangeEvent, SessionStatus, StreamInspectorRecord, Transport,
20 TransportKind,
21};
22
23pub struct RemoteTransport {
25 kind: TransportKind,
26 status: SessionStatus,
27 endpoint: String,
28}
29
30impl RemoteTransport {
31 pub fn wasm() -> Self {
33 Self::new(TransportKind::Wasm, "wasm:local")
34 }
35
36 pub fn local_server(endpoint: impl Into<String>) -> Self {
38 Self::new(TransportKind::LocalServer, endpoint)
39 }
40
41 pub fn remote_server(endpoint: impl Into<String>) -> Self {
43 Self::new(TransportKind::RemoteServer, endpoint)
44 }
45
46 fn new(kind: TransportKind, endpoint: impl Into<String>) -> Self {
47 Self {
48 kind,
49 status: SessionStatus::Disconnected,
50 endpoint: endpoint.into(),
51 }
52 }
53
54 pub fn endpoint(&self) -> &str {
56 &self.endpoint
57 }
58
59 pub fn connect(&mut self) {
61 self.status = SessionStatus::Connected;
62 }
63
64 pub fn disconnect(&mut self) {
66 self.status = SessionStatus::Disconnected;
67 }
68
69 pub fn begin_reconnect(&mut self) {
71 self.status = SessionStatus::Reconnecting;
72 }
73
74 pub fn stream_control_frame(
76 &self,
77 cx: &mut Cx,
78 codec: Symbol,
79 control: &StreamControl,
80 ) -> Result<ServerFrame> {
81 match self.kind {
82 TransportKind::LocalServer | TransportKind::RemoteServer => {
83 stream_control_frame_from_control(cx, codec, control, FrameEnvelope::default())
84 }
85 TransportKind::Fixture | TransportKind::Wasm | TransportKind::Fabric => {
86 Err(Error::HostError(format!(
87 "{:?} transport does not use server stream-fabric frames",
88 self.kind
89 )))
90 }
91 }
92 }
93
94 fn not_connected(&self) -> Error {
95 Error::HostError(format!(
96 "{:?} transport to {} is not connected (live channel unavailable)",
97 self.kind, self.endpoint
98 ))
99 }
100}
101
102impl Transport for RemoteTransport {
103 fn kind(&self) -> TransportKind {
104 self.kind
105 }
106
107 fn status(&self) -> SessionStatus {
108 self.status
109 }
110
111 fn read(&self, _resource: &Symbol) -> Result<Expr> {
112 Err(self.not_connected())
113 }
114
115 fn realize(&mut self, _resource: &Symbol, _operation: &Expr) -> Result<Expr> {
116 Err(self.not_connected())
117 }
118
119 fn drain_events(&mut self) -> Vec<ChangeEvent> {
120 Vec::new()
121 }
122
123 fn stream_subscribe(&mut self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
124 Err(Error::HostError(format!(
125 "cannot subscribe to stream {stream_id}: {}",
126 self.not_connected()
127 )))
128 }
129
130 fn stream_read(&mut self, stream_id: &Symbol, _limit: usize) -> Result<Vec<StreamItem>> {
131 Err(Error::HostError(format!(
132 "cannot read stream {stream_id}: {}",
133 self.not_connected()
134 )))
135 }
136
137 fn stream_push(&mut self, stream_id: &Symbol, _envelope: StreamEnvelope) -> Result<PushResult> {
138 Err(Error::HostError(format!(
139 "cannot push stream {stream_id}: {}",
140 self.not_connected()
141 )))
142 }
143
144 fn stream_cancel(&mut self, stream_id: &Symbol) -> Result<()> {
145 Err(Error::HostError(format!(
146 "cannot cancel stream {stream_id}: {}",
147 self.not_connected()
148 )))
149 }
150
151 fn stream_stats(&self, stream_id: &Symbol) -> Result<StreamStats> {
152 Err(Error::HostError(format!(
153 "cannot inspect stream stats {stream_id}: {}",
154 self.not_connected()
155 )))
156 }
157
158 fn stream_inspector(&self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
159 let status = match self.status {
160 SessionStatus::Disconnected => BrowserStreamStatus::Disconnected,
161 SessionStatus::Reconnecting => BrowserStreamStatus::Reconnecting,
162 SessionStatus::Closed => BrowserStreamStatus::Cancelled,
163 _ => BrowserStreamStatus::Live,
164 };
165 Ok(StreamInspectorRecord {
166 stream_id: stream_id.clone(),
167 status,
168 buffered: 0,
169 stats: StreamStats::default(),
170 diagnostics: Vec::new(),
171 snapshot: StreamInspectorSnapshot::new(
172 &StreamMetadata::new(
173 stream_id.clone(),
174 StreamMedia::Data,
175 StreamDirection::Source,
176 ClockDomain::ServerFrame.symbol(),
177 BufferPolicy::bounded(1)?,
178 ),
179 stream_inspector_route_local_symbol(),
180 TransportProfile::remote_stream_fabric().name().clone(),
181 status.inspector_status(),
182 0,
183 &StreamStats::default(),
184 None,
185 Vec::new(),
186 ),
187 })
188 }
189}