sim_lib_stream_combinators/
recording.rs1use std::ops::RangeBounds;
2use std::sync::{Arc, Mutex};
3
4use sim_kernel::{
5 Cx, Error, Event, EventKind, EventLedger, Ref, Result, Severity, Symbol, Tick, value_from_ref,
6};
7use sim_lib_stream_core::{
8 StreamCassette, StreamDiagnostic, StreamItem, StreamMetadata, StreamPacket, StreamStats,
9 TransportProfile,
10};
11
12use crate::stream::{Stream, StreamNode};
13
14#[derive(Clone, Debug, PartialEq, Eq)]
48pub struct StreamRecording {
49 metadata: StreamMetadata,
50 items: Vec<StreamItem>,
51}
52
53impl StreamRecording {
54 pub fn new(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
56 Self { metadata, items }
57 }
58
59 pub fn metadata(&self) -> &StreamMetadata {
61 &self.metadata
62 }
63
64 pub fn items(&self) -> &[StreamItem] {
66 &self.items
67 }
68
69 pub fn len(&self) -> usize {
71 self.items.len()
72 }
73
74 pub fn is_empty(&self) -> bool {
76 self.items.is_empty()
77 }
78
79 pub fn replay(&self) -> Stream {
81 replay(self)
82 }
83
84 pub fn seek(&self, target: SeekTarget) -> Stream {
86 seek(self.replay(), target)
87 }
88
89 pub fn cassette(&self, profile: TransportProfile) -> Result<StreamCassette> {
91 StreamCassette::from_items(
92 self.metadata.clone(),
93 self.items.clone(),
94 profile,
95 StreamStats {
96 yielded: self.items.len() as u64,
97 ..StreamStats::default()
98 },
99 )
100 }
101}
102
103#[derive(Clone, Debug, PartialEq, Eq)]
114pub enum SeekTarget {
115 PacketIndex(usize),
117 ClockIndex {
119 clock: Symbol,
121 index: Ref,
123 },
124}
125
126impl SeekTarget {
127 pub fn packet_index(index: usize) -> Self {
129 Self::PacketIndex(index)
130 }
131
132 pub fn clock_index(clock: Symbol, index: Ref) -> Self {
134 Self::ClockIndex { clock, index }
135 }
136}
137
138pub fn record_bang(source: &Stream) -> Result<StreamRecording> {
142 let mut items = Vec::new();
143 while let Some(item) = source.next_packet()? {
144 items.push(item);
145 }
146 if !source.is_done()? {
147 return Err(Error::Eval(
148 "cannot record a stream that has not reached done".to_owned(),
149 ));
150 }
151 Ok(StreamRecording::new(source.metadata().clone(), items))
152}
153
154pub fn replay(recording: &StreamRecording) -> Stream {
156 Stream::pull(recording.metadata.clone(), recording.items.clone())
157}
158
159pub fn record_cassette_bang(source: &Stream, profile: TransportProfile) -> Result<StreamCassette> {
161 record_bang(source)?.cassette(profile)
162}
163
164pub fn replay_cassette(cassette: &StreamCassette) -> Result<Stream> {
166 Ok(Stream::from_value(Arc::new(
167 cassette.replay_stream_value()?,
168 )))
169}
170
171pub fn seek(source: Stream, target: SeekTarget) -> Stream {
176 Stream::new(SeekNode {
177 source,
178 target,
179 state: Mutex::new(SeekState::Pending),
180 })
181}
182
183pub fn record_ledger_run(
187 cx: &mut Cx,
188 metadata: StreamMetadata,
189 ledger: &EventLedger,
190 run: &Ref,
191) -> Result<StreamRecording> {
192 record_events(cx, metadata, ledger.events_for_run(run))
193}
194
195pub fn record_ledger_slice<R>(
200 cx: &mut Cx,
201 metadata: StreamMetadata,
202 ledger: &EventLedger,
203 run: &Ref,
204 seq_range: R,
205) -> Result<StreamRecording>
206where
207 R: RangeBounds<u64>,
208{
209 record_events(
210 cx,
211 metadata,
212 ledger
213 .events_for_run(run)
214 .iter()
215 .filter(|event| seq_range.contains(&event.seq)),
216 )
217}
218
219pub fn record_events<'a>(
225 cx: &mut Cx,
226 metadata: StreamMetadata,
227 events: impl IntoIterator<Item = &'a Event>,
228) -> Result<StreamRecording> {
229 let mut items = Vec::new();
230 for event in events {
231 match &event.kind {
232 EventKind::Chunk { payload } => {
233 items.push(item_from_payload(cx, payload, event.ticks.clone())?);
234 }
235 EventKind::Diagnostic(diagnostic) => {
236 items.push(StreamItem::new(StreamPacket::Diagnostic(
237 diagnostic_packet(diagnostic),
238 )));
239 }
240 EventKind::Done => break,
241 EventKind::Failed(_) => {
242 return Err(Error::Eval(
243 "cannot record a failed stream event slice".to_owned(),
244 ));
245 }
246 EventKind::Started { .. }
247 | EventKind::Claim { .. }
248 | EventKind::Trace(_)
249 | EventKind::EffectRequested { .. }
250 | EventKind::EffectResolved { .. }
251 | EventKind::Capture { .. }
252 | EventKind::Card { .. }
253 | EventKind::Final(_) => {}
254 }
255 }
256 Ok(StreamRecording::new(metadata, items))
257}
258
259fn item_from_payload(cx: &mut Cx, payload: &Ref, ticks: Vec<Tick>) -> Result<StreamItem> {
260 let value = value_from_ref(cx, payload)?;
261 let packet = StreamPacket::try_from(value.object().as_expr(cx)?)?;
262 StreamItem::with_ticks(packet, ticks)
263}
264
265fn diagnostic_packet(diagnostic: &sim_kernel::Diagnostic) -> StreamDiagnostic {
266 let kind = diagnostic
267 .code
268 .clone()
269 .unwrap_or_else(|| Symbol::qualified("stream/combinator", "Diagnostic"));
270 let prefix = match diagnostic.severity {
271 Severity::Error => "error",
272 Severity::Warning => "warning",
273 Severity::Info => "info",
274 Severity::Note => "note",
275 };
276 StreamDiagnostic::new(kind, format!("{prefix}: {}", diagnostic.message))
277}
278
279struct SeekNode {
280 source: Stream,
281 target: SeekTarget,
282 state: Mutex<SeekState>,
283}
284
285enum SeekState {
286 Pending,
287 Ready,
288 Drained,
289}
290
291impl StreamNode for SeekNode {
292 fn metadata(&self) -> &StreamMetadata {
293 self.source.metadata()
294 }
295
296 fn next_packet(&self) -> Result<Option<StreamItem>> {
297 let mut state = self
298 .state
299 .lock()
300 .map_err(|_| Error::PoisonedLock("seek stream"))?;
301 match *state {
302 SeekState::Ready => self.source.next_packet(),
303 SeekState::Drained => Ok(None),
304 SeekState::Pending => {
305 let item = seek_first(&self.source, &self.target)?;
306 *state = if item.is_some() {
307 SeekState::Ready
308 } else {
309 SeekState::Drained
310 };
311 Ok(item)
312 }
313 }
314 }
315
316 fn is_done(&self) -> Result<bool> {
317 let state = self
318 .state
319 .lock()
320 .map_err(|_| Error::PoisonedLock("seek stream"))?;
321 match *state {
322 SeekState::Drained => Ok(true),
323 SeekState::Pending | SeekState::Ready => self.source.is_done(),
324 }
325 }
326}
327
328fn seek_first(source: &Stream, target: &SeekTarget) -> Result<Option<StreamItem>> {
329 match target {
330 SeekTarget::PacketIndex(index) => {
331 for _ in 0..*index {
332 if source.next_packet()?.is_none() {
333 return Ok(None);
334 }
335 }
336 source.next_packet()
337 }
338 SeekTarget::ClockIndex { clock, index } => {
339 while let Some(item) = source.next_packet()? {
340 if item
341 .ticks()
342 .iter()
343 .any(|tick| &tick.clock == clock && &tick.index == index)
344 {
345 return Ok(Some(item));
346 }
347 }
348 Ok(None)
349 }
350 }
351}