Skip to main content

sim_lib_web_bridge/
fixture.rs

//! The fixture transport: a deterministic in-memory runtime.
//!
//! The fixture maps the bus onto an in-memory value store, so a session can be
//! driven end to end without a server or browser. It also models connection
//! loss and reconnection so the UI's session-status handling is testable.
6
7use std::collections::BTreeMap;
8
9use sim_kernel::{Error, Expr, Result, Symbol};
10use sim_lib_stream_core::{
11    PushResult, StreamEnvelope, StreamInspectorSnapshot, StreamItem, StreamMetadata, StreamPacket,
12    StreamStats, StreamValue, TransportProfile, stream_inspector_route_local_symbol,
13};
14
15use crate::transport::{
16    BrowserStreamStatus, ChangeEvent, SessionStatus, StreamInspectorRecord, Transport,
17    TransportKind,
18};
19
20/// An in-memory transport for deterministic sessions and replay.
21pub struct FixtureTransport {
22    store: BTreeMap<Symbol, Expr>,
23    streams: BTreeMap<Symbol, FixtureStream>,
24    events: Vec<ChangeEvent>,
25    status: SessionStatus,
26}
27
28struct FixtureStream {
29    stream: StreamValue,
30    buffered: usize,
31    status: BrowserStreamStatus,
32    diagnostics: Vec<Symbol>,
33}
34
35impl Default for FixtureTransport {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl FixtureTransport {
42    /// A connected, empty fixture.
43    pub fn new() -> Self {
44        Self {
45            store: BTreeMap::new(),
46            streams: BTreeMap::new(),
47            events: Vec::new(),
48            status: SessionStatus::Connected,
49        }
50    }
51
52    /// Seed a resource value (builder form).
53    pub fn with(mut self, resource: Symbol, value: Expr) -> Self {
54        self.store.insert(resource, value);
55        self
56    }
57
58    /// Seed or replace a resource value.
59    pub fn set(&mut self, resource: Symbol, value: Expr) {
60        self.store.insert(resource, value);
61    }
62
63    /// Seed a deterministic finite stream.
64    pub fn with_finite_stream(mut self, metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
65        self.set_finite_stream(metadata, items);
66        self
67    }
68
69    /// Seed a push stream.
70    pub fn with_push_stream(mut self, metadata: StreamMetadata) -> Self {
71        self.set_push_stream(metadata);
72        self
73    }
74
75    /// Seed or replace a deterministic finite stream.
76    pub fn set_finite_stream(&mut self, metadata: StreamMetadata, items: Vec<StreamItem>) {
77        self.streams.insert(
78            metadata.id().clone(),
79            FixtureStream {
80                buffered: items.len(),
81                stream: StreamValue::pull(metadata, items),
82                status: BrowserStreamStatus::Live,
83                diagnostics: Vec::new(),
84            },
85        );
86    }
87
88    /// Seed or replace a push stream.
89    pub fn set_push_stream(&mut self, metadata: StreamMetadata) {
90        self.streams.insert(
91            metadata.id().clone(),
92            FixtureStream {
93                stream: StreamValue::push(metadata),
94                buffered: 0,
95                status: BrowserStreamStatus::Live,
96                diagnostics: Vec::new(),
97            },
98        );
99    }
100
101    /// Mark a stream as refused after a profile diagnostic.
102    pub fn mark_stream_refused(&mut self, stream_id: &Symbol, diagnostic: Symbol) -> Result<()> {
103        let stream = self.stream_mut(stream_id)?;
104        stream.status = BrowserStreamStatus::RefusedProfile;
105        stream.diagnostics.push(diagnostic);
106        Ok(())
107    }
108
109    /// Simulate connection loss.
110    pub fn disconnect(&mut self) {
111        self.status = SessionStatus::Disconnected;
112    }
113
114    /// Simulate a reconnecting transport.
115    pub fn begin_reconnect(&mut self) {
116        self.status = SessionStatus::Reconnecting;
117    }
118
119    /// Simulate a restored connection.
120    pub fn reconnect(&mut self) {
121        self.status = SessionStatus::Connected;
122    }
123
124    fn ensure_live(&self) -> Result<()> {
125        if self.status.is_live() {
126            Ok(())
127        } else {
128            Err(Error::HostError(format!(
129                "fixture session is {:?}; no traffic can flow",
130                self.status
131            )))
132        }
133    }
134
135    fn stream_ref(&self, stream_id: &Symbol) -> Result<&FixtureStream> {
136        self.streams
137            .get(stream_id)
138            .ok_or_else(|| Error::UnknownSymbol {
139                symbol: stream_id.clone(),
140            })
141    }
142
143    fn stream_mut(&mut self, stream_id: &Symbol) -> Result<&mut FixtureStream> {
144        self.streams
145            .get_mut(stream_id)
146            .ok_or_else(|| Error::UnknownSymbol {
147                symbol: stream_id.clone(),
148            })
149    }
150
151    fn visible_stream_status(&self, stream: &FixtureStream) -> BrowserStreamStatus {
152        match self.status {
153            SessionStatus::Disconnected => BrowserStreamStatus::Disconnected,
154            SessionStatus::Reconnecting => BrowserStreamStatus::Reconnecting,
155            _ => stream.status,
156        }
157    }
158
159    fn inspector(&self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
160        let stream = self.stream_ref(stream_id)?;
161        let stats = stream.stream.stats()?;
162        let status = self.visible_stream_status(stream);
163        let queue_depth = stream.stream.queue_depth()?;
164        let observed = stats
165            .accepted
166            .max(stats.yielded.saturating_add(queue_depth as u64));
167        let snapshot = StreamInspectorSnapshot::new(
168            stream.stream.metadata(),
169            stream_inspector_route_local_symbol(),
170            TransportProfile::memory_local().name().clone(),
171            status.inspector_status(),
172            queue_depth,
173            &stats,
174            observed.checked_sub(1),
175            stream.diagnostics.clone(),
176        );
177        Ok(StreamInspectorRecord {
178            stream_id: stream_id.clone(),
179            status,
180            buffered: stream.buffered,
181            stats,
182            diagnostics: stream.diagnostics.clone(),
183            snapshot,
184        })
185    }
186}
187
188impl Transport for FixtureTransport {
189    fn kind(&self) -> TransportKind {
190        TransportKind::Fixture
191    }
192
193    fn status(&self) -> SessionStatus {
194        self.status
195    }
196
197    fn read(&self, resource: &Symbol) -> Result<Expr> {
198        self.ensure_live()?;
199        self.store
200            .get(resource)
201            .cloned()
202            .ok_or_else(|| Error::UnknownSymbol {
203                symbol: resource.clone(),
204            })
205    }
206
207    fn realize(&mut self, resource: &Symbol, operation: &Expr) -> Result<Expr> {
208        self.ensure_live()?;
209        let new_value = apply_operation(self.store.get(resource), operation)?;
210        self.store.insert(resource.clone(), new_value.clone());
211        self.events.push(ChangeEvent {
212            resource: resource.clone(),
213        });
214        Ok(new_value)
215    }
216
217    fn drain_events(&mut self) -> Vec<ChangeEvent> {
218        std::mem::take(&mut self.events)
219    }
220
221    fn stream_subscribe(&mut self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
222        self.ensure_live()?;
223        self.inspector(stream_id)
224    }
225
226    fn stream_read(&mut self, stream_id: &Symbol, limit: usize) -> Result<Vec<StreamItem>> {
227        self.ensure_live()?;
228        let stream = self.stream_mut(stream_id)?;
229        let items = stream.stream.take_packets(limit)?;
230        stream.buffered = stream.buffered.saturating_sub(items.len());
231        if stream.stream.stats()?.cancelled {
232            stream.status = BrowserStreamStatus::Cancelled;
233        } else if stream.stream.is_done()? {
234            stream.status = BrowserStreamStatus::Ended;
235        }
236        Ok(items)
237    }
238
239    fn stream_push(&mut self, stream_id: &Symbol, envelope: StreamEnvelope) -> Result<PushResult> {
240        self.ensure_live()?;
241        if envelope.stream_id() != stream_id {
242            return Err(Error::HostError(format!(
243                "stream push envelope id {} does not match target {}",
244                envelope.stream_id(),
245                stream_id
246            )));
247        }
248        let item = StreamItem::with_ticks(envelope.packet().clone(), envelope.ticks().to_vec())?;
249        let stream = self.stream_mut(stream_id)?;
250        let result = stream.stream.push_packet(item)?;
251        match &result {
252            PushResult::Accepted => {
253                stream.buffered = stream.buffered.saturating_add(1);
254                stream.status = BrowserStreamStatus::Live;
255            }
256            PushResult::DroppedNewest(item) | PushResult::DroppedOldest(item) => {
257                stream.status = BrowserStreamStatus::BufferOverflow;
258                if matches!(item.packet(), StreamPacket::Diagnostic(_)) {
259                    stream
260                        .diagnostics
261                        .push(Symbol::qualified("stream/browser", "buffer-overflow"));
262                }
263            }
264            PushResult::Rejected(_) => {
265                stream.status = BrowserStreamStatus::BufferOverflow;
266            }
267            PushResult::Closed(_) => {
268                stream.status = BrowserStreamStatus::Cancelled;
269            }
270        }
271        Ok(result)
272    }
273
274    fn stream_cancel(&mut self, stream_id: &Symbol) -> Result<()> {
275        self.ensure_live()?;
276        let stream = self.stream_mut(stream_id)?;
277        stream.stream.cancel()?;
278        stream.buffered = 0;
279        stream.status = BrowserStreamStatus::Cancelled;
280        Ok(())
281    }
282
283    fn stream_stats(&self, stream_id: &Symbol) -> Result<StreamStats> {
284        self.stream_ref(stream_id)?.stream.stats()
285    }
286
287    fn stream_inspector(&self, stream_id: &Symbol) -> Result<StreamInspectorRecord> {
288        self.inspector(stream_id)
289    }
290}
291
292/// Interpret a checked operation against the current value. The fixture
293/// understands the universal editor's `set-value` operation; unknown operations
294/// fail closed.
295fn apply_operation(current: Option<&Expr>, operation: &Expr) -> Result<Expr> {
296    let Expr::Map(entries) = operation else {
297        return Err(Error::HostError("operation is not a map".to_owned()));
298    };
299    let op_name = entries.iter().find_map(|(key, value)| {
300        let is_op = matches!(key, Expr::Symbol(symbol) if &*symbol.name == "op");
301        match value {
302            Expr::Symbol(symbol) if is_op => Some(symbol.name.to_string()),
303            _ => None,
304        }
305    });
306    match op_name.as_deref() {
307        Some("set-value") => entries
308            .iter()
309            .find_map(|(key, value)| {
310                matches!(key, Expr::Symbol(symbol) if &*symbol.name == "value").then_some(value)
311            })
312            .cloned()
313            .ok_or_else(|| Error::HostError("set-value operation is missing a 'value'".to_owned())),
314        Some(other) => Err(Error::HostError(format!(
315            "fixture transport cannot realize operation '{other}'"
316        ))),
317        None => {
318            let _ = current;
319            Err(Error::HostError(
320                "operation is missing an 'op' tag".to_owned(),
321            ))
322        }
323    }
324}