1use std::collections::BTreeMap;
8
9use sim_kernel::{Error, Expr, Result, Symbol};
10use sim_lib_stream_core::{
11 PushResult, StreamEnvelope, StreamInspectorSnapshot, StreamItem, StreamMetadata, StreamPacket,
12 StreamStats, StreamValue, TransportProfile, stream_inspector_route_local_symbol,
13};
14
15use crate::transport::{
16 BrowserStreamStatus, ChangeEvent, SessionStatus, StreamInspectorRecord, Transport,
17 TransportKind,
18};
19
20pub struct FixtureTransport {
22 store: BTreeMap<Symbol, Expr>,
23 streams: BTreeMap<Symbol, FixtureStream>,
24 events: Vec<ChangeEvent>,
25 status: SessionStatus,
26}
27
28struct FixtureStream {
29 stream: StreamValue,
30 buffered: usize,
31 status: BrowserStreamStatus,
32 diagnostics: Vec<Symbol>,
33}
34
35impl Default for FixtureTransport {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl FixtureTransport {
42 pub fn new() -> Self {
44 Self {
45 store: BTreeMap::new(),
46 streams: BTreeMap::new(),
47 events: Vec::new(),
48 status: SessionStatus::Connected,
49 }
50 }
51
52 pub fn with(mut self, resource: Symbol, value: Expr) -> Self {
54 self.store.insert(resource, value);
55 self
56 }
57
58 pub fn set(&mut self, resource: Symbol, value: Expr) {
60 self.store.insert(resource, value);
61 }
62
63 pub fn with_finite_stream(mut self, metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
65 self.set_finite_stream(metadata, items);
66 self
67 }
68
69 pub fn with_push_stream(mut self, metadata: StreamMetadata) -> Self {
71 self.set_push_stream(metadata);
72 self
73 }
74
75 pub fn set_finite_stream(&mut self, metadata: StreamMetadata, items: Vec<StreamItem>) {
77 self.streams.insert(
78 metadata.id().clone(),
79 FixtureStream {
80 buffered: items.len(),
81 stream: StreamValue::pull(metadata, items),
82 status: BrowserStreamStatus::Live,
83 diagnostics: Vec::new(),
84 },
85 );
86 }
87
88 pub fn set_push_stream(&mut self, metadata: StreamMetadata) {
90 self.streams.insert(
91 metadata.id().clone(),
92 FixtureStream {
93 stream: StreamValue::push(metadata),
94 buffered: 0,
95 status: BrowserStreamStatus::Live,
96 diagnostics: Vec::new(),
97 },
98 );
99 }
100
101 pub fn mark_stream_refused(&mut self, stream_id: &Symbol, diagnostic: Symbol) -> Result<()> {
103 let stream = self.stream_mut(stream_id)?;
104 stream.status = BrowserStreamStatus::RefusedProfile;
105 stream.diagnostics.push(diagnostic);
106 Ok(())
107 }
108
109 pub fn disconnect(&mut self) {
111 self.status = SessionStatus::Disconnected;
112 }
113
114 pub fn begin_reconnect(&mut self) {
116 self.status = SessionStatus::Reconnecting;
117 }
118
119 pub fn reconnect(&mut self) {
121 self.status = SessionStatus::Connected;
122 }
123
124 fn ensure_live(&self) -> Result<()> {
125 if self.status.is_live() {
126 Ok(())
127 } else {
128 Err(Error::HostError(format!(
129 "fixture session is {:?}; no traffic can flow",
130 self.status
131 )))
132 }
133 }
134
135 fn stream_ref(&self, stream_id: &Symbol) -> Result<&FixtureStream> {
136 self.streams
137 .get(stream_id)
138 .ok_or_else(|| Error::UnknownSymbol {
139 symbol: stream_id.clone(),
140 })
141 }
142
143 fn stream_mut(&mut self, stream_id: &Symbol) -> Result<&mut FixtureStream> {
144 self.streams
145 .get_mut(stream_id)
146 .ok_or_else(|| Error::UnknownSymbol {
147 symbol: stream_id.clone(),
148 })
149 }
150
151 fn visible_stream_status(&self, stream: &FixtureStream) -> BrowserStreamStatus {
152 match self.status {
153 SessionStatus::Disconnected => BrowserStreamStatus::Disconnected,
154 SessionStatus::Reconnecting => BrowserStreamStatus::Reconnecting,
155 _ => stream.status,
156 }
157 }
158
159 fn inspector(&self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
160 let stream = self.stream_ref(stream_id)?;
161 let stats = stream.stream.stats()?;
162 let status = self.visible_stream_status(stream);
163 let queue_depth = stream.stream.queue_depth()?;
164 let observed = stats
165 .accepted
166 .max(stats.yielded.saturating_add(queue_depth as u64));
167 let snapshot = StreamInspectorSnapshot::new(
168 stream.stream.metadata(),
169 stream_inspector_route_local_symbol(),
170 TransportProfile::memory_local().name().clone(),
171 status.inspector_status(),
172 queue_depth,
173 &stats,
174 observed.checked_sub(1),
175 stream.diagnostics.clone(),
176 );
177 Ok(StreamInspectorRecord {
178 stream_id: stream_id.clone(),
179 status,
180 buffered: stream.buffered,
181 stats,
182 diagnostics: stream.diagnostics.clone(),
183 snapshot,
184 })
185 }
186}
187
188impl Transport for FixtureTransport {
189 fn kind(&self) -> TransportKind {
190 TransportKind::Fixture
191 }
192
193 fn status(&self) -> SessionStatus {
194 self.status
195 }
196
197 fn read(&self, resource: &Symbol) -> Result<Expr> {
198 self.ensure_live()?;
199 self.store
200 .get(resource)
201 .cloned()
202 .ok_or_else(|| Error::UnknownSymbol {
203 symbol: resource.clone(),
204 })
205 }
206
207 fn realize(&mut self, resource: &Symbol, operation: &Expr) -> Result<Expr> {
208 self.ensure_live()?;
209 let new_value = apply_operation(self.store.get(resource), operation)?;
210 self.store.insert(resource.clone(), new_value.clone());
211 self.events.push(ChangeEvent {
212 resource: resource.clone(),
213 });
214 Ok(new_value)
215 }
216
217 fn drain_events(&mut self) -> Vec<ChangeEvent> {
218 std::mem::take(&mut self.events)
219 }
220
221 fn stream_subscribe(&mut self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
222 self.ensure_live()?;
223 self.inspector(stream_id)
224 }
225
226 fn stream_read(&mut self, stream_id: &Symbol, limit: usize) -> Result<Vec<StreamItem>> {
227 self.ensure_live()?;
228 let stream = self.stream_mut(stream_id)?;
229 let items = stream.stream.take_packets(limit)?;
230 stream.buffered = stream.buffered.saturating_sub(items.len());
231 if stream.stream.stats()?.cancelled {
232 stream.status = BrowserStreamStatus::Cancelled;
233 } else if stream.stream.is_done()? {
234 stream.status = BrowserStreamStatus::Ended;
235 }
236 Ok(items)
237 }
238
239 fn stream_push(&mut self, stream_id: &Symbol, envelope: StreamEnvelope) -> Result<PushResult> {
240 self.ensure_live()?;
241 if envelope.stream_id() != stream_id {
242 return Err(Error::HostError(format!(
243 "stream push envelope id {} does not match target {}",
244 envelope.stream_id(),
245 stream_id
246 )));
247 }
248 let item = StreamItem::with_ticks(envelope.packet().clone(), envelope.ticks().to_vec())?;
249 let stream = self.stream_mut(stream_id)?;
250 let result = stream.stream.push_packet(item)?;
251 match &result {
252 PushResult::Accepted => {
253 stream.buffered = stream.buffered.saturating_add(1);
254 stream.status = BrowserStreamStatus::Live;
255 }
256 PushResult::DroppedNewest(item) | PushResult::DroppedOldest(item) => {
257 stream.status = BrowserStreamStatus::BufferOverflow;
258 if matches!(item.packet(), StreamPacket::Diagnostic(_)) {
259 stream
260 .diagnostics
261 .push(Symbol::qualified("stream/browser", "buffer-overflow"));
262 }
263 }
264 PushResult::Rejected(_) => {
265 stream.status = BrowserStreamStatus::BufferOverflow;
266 }
267 PushResult::Closed(_) => {
268 stream.status = BrowserStreamStatus::Cancelled;
269 }
270 }
271 Ok(result)
272 }
273
274 fn stream_cancel(&mut self, stream_id: &Symbol) -> Result<()> {
275 self.ensure_live()?;
276 let stream = self.stream_mut(stream_id)?;
277 stream.stream.cancel()?;
278 stream.buffered = 0;
279 stream.status = BrowserStreamStatus::Cancelled;
280 Ok(())
281 }
282
283 fn stream_stats(&self, stream_id: &Symbol) -> Result<StreamStats> {
284 self.stream_ref(stream_id)?.stream.stats()
285 }
286
287 fn stream_inspector(&self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
288 self.inspector(stream_id)
289 }
290}
291
292fn apply_operation(current: Option<&Expr>, operation: &Expr) -> Result<Expr> {
296 let Expr::Map(entries) = operation else {
297 return Err(Error::HostError("operation is not a map".to_owned()));
298 };
299 let op_name = entries.iter().find_map(|(key, value)| {
300 let is_op = matches!(key, Expr::Symbol(symbol) if &*symbol.name == "op");
301 match value {
302 Expr::Symbol(symbol) if is_op => Some(symbol.name.to_string()),
303 _ => None,
304 }
305 });
306 match op_name.as_deref() {
307 Some("set-value") => entries
308 .iter()
309 .find_map(|(key, value)| {
310 matches!(key, Expr::Symbol(symbol) if &*symbol.name == "value").then_some(value)
311 })
312 .cloned()
313 .ok_or_else(|| Error::HostError("set-value operation is missing a 'value'".to_owned())),
314 Some(other) => Err(Error::HostError(format!(
315 "fixture transport cannot realize operation '{other}'"
316 ))),
317 None => {
318 let _ = current;
319 Err(Error::HostError(
320 "operation is missing an 'op' tag".to_owned(),
321 ))
322 }
323 }
324}