// sim_lib_stream_combinators/recording.rs

1use std::ops::RangeBounds;
2use std::sync::{Arc, Mutex};
3
4use sim_kernel::{
5    Cx, Error, Event, EventKind, EventLedger, Ref, Result, Severity, Symbol, Tick, value_from_ref,
6};
7use sim_lib_stream_core::{
8    StreamCassette, StreamDiagnostic, StreamItem, StreamMetadata, StreamPacket, StreamStats,
9    TransportProfile,
10};
11
12use crate::stream::{Stream, StreamNode};
13
14/// A fully captured stream: its metadata plus every packet it produced.
15///
16/// A recording is the materialized, replayable form of a finished stream. It is
17/// produced by draining a [`Stream`] to `done` and can be replayed any number
18/// of times, seeked into, or serialized to a transport cassette.
19///
20/// # Examples
21///
22/// ```
23/// use sim_kernel::{Expr, Symbol};
24/// use sim_lib_stream_core::{
25///     BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamItem, StreamMedia,
26///     StreamMetadata, StreamPacket,
27/// };
28/// use sim_lib_stream_combinators::{record_bang, Stream};
29///
30/// let metadata = StreamMetadata::new(
31///     Symbol::qualified("stream", "doc"),
32///     StreamMedia::Data,
33///     StreamDirection::Source,
34///     Symbol::qualified("clock", "doc"),
35///     BufferPolicy::bounded_with_overflow(8, BufferOverflowPolicy::DropNewest).unwrap(),
36/// );
37/// let item = StreamItem::new(StreamPacket::data(
38///     Symbol::qualified("stream/data", "model-event"),
39///     Expr::Nil,
40/// ));
41/// let stream = Stream::pull(metadata, vec![item.clone()]);
42///
43/// let recording = record_bang(&stream).unwrap();
44/// assert_eq!(recording.len(), 1);
45/// assert_eq!(recording.replay().take_packets(8).unwrap(), vec![item]);
46/// ```
47#[derive(Clone, Debug, PartialEq, Eq)]
48pub struct StreamRecording {
49    metadata: StreamMetadata,
50    items: Vec<StreamItem>,
51}
52
53impl StreamRecording {
54    /// Builds a recording from explicit metadata and captured packets.
55    pub fn new(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
56        Self { metadata, items }
57    }
58
59    /// Returns the metadata of the recorded stream.
60    pub fn metadata(&self) -> &StreamMetadata {
61        &self.metadata
62    }
63
64    /// Returns the captured packets in their recorded order.
65    pub fn items(&self) -> &[StreamItem] {
66        &self.items
67    }
68
69    /// Returns the number of captured packets.
70    pub fn len(&self) -> usize {
71        self.items.len()
72    }
73
74    /// Reports whether the recording captured no packets.
75    pub fn is_empty(&self) -> bool {
76        self.items.is_empty()
77    }
78
79    /// Returns a fresh stream that replays the captured packets.
80    pub fn replay(&self) -> Stream {
81        replay(self)
82    }
83
84    /// Replays the recording from the first packet matching `target`.
85    pub fn seek(&self, target: SeekTarget) -> Stream {
86        seek(self.replay(), target)
87    }
88
89    /// Serializes the recording into a transport cassette for `profile`.
90    pub fn cassette(&self, profile: TransportProfile) -> Result<StreamCassette> {
91        StreamCassette::from_items(
92            self.metadata.clone(),
93            self.items.clone(),
94            profile,
95            StreamStats {
96                yielded: self.items.len() as u64,
97                ..StreamStats::default()
98            },
99        )
100    }
101}
102
103/// Where a [`seek`] should begin replaying within a recorded stream.
104///
105/// # Examples
106///
107/// ```
108/// use sim_lib_stream_combinators::SeekTarget;
109///
110/// let by_index = SeekTarget::packet_index(2);
111/// assert_eq!(by_index, SeekTarget::PacketIndex(2));
112/// ```
113#[derive(Clone, Debug, PartialEq, Eq)]
114pub enum SeekTarget {
115    /// Start at the packet at this zero-based position in the stream.
116    PacketIndex(usize),
117    /// Start at the first packet bearing `index` on the named `clock`.
118    ClockIndex {
119        /// The clock whose tick index is matched.
120        clock: Symbol,
121        /// The tick index on `clock` to seek to.
122        index: Ref,
123    },
124}
125
126impl SeekTarget {
127    /// Builds a [`SeekTarget::PacketIndex`] for the given position.
128    pub fn packet_index(index: usize) -> Self {
129        Self::PacketIndex(index)
130    }
131
132    /// Builds a [`SeekTarget::ClockIndex`] for the given clock and tick index.
133    pub fn clock_index(clock: Symbol, index: Ref) -> Self {
134        Self::ClockIndex { clock, index }
135    }
136}
137
138/// Drains `source` to `done` and captures it as a [`StreamRecording`].
139///
140/// Errors if the stream is exhausted without reaching its terminal `done`.
141pub fn record_bang(source: &Stream) -> Result<StreamRecording> {
142    let mut items = Vec::new();
143    while let Some(item) = source.next_packet()? {
144        items.push(item);
145    }
146    if !source.is_done()? {
147        return Err(Error::Eval(
148            "cannot record a stream that has not reached done".to_owned(),
149        ));
150    }
151    Ok(StreamRecording::new(source.metadata().clone(), items))
152}
153
154/// Returns a fresh stream replaying every packet of `recording`.
155pub fn replay(recording: &StreamRecording) -> Stream {
156    Stream::pull(recording.metadata.clone(), recording.items.clone())
157}
158
159/// Records `source` to completion and serializes it to a cassette for `profile`.
160pub fn record_cassette_bang(source: &Stream, profile: TransportProfile) -> Result<StreamCassette> {
161    record_bang(source)?.cassette(profile)
162}
163
164/// Rebuilds a replayable stream from a serialized transport `cassette`.
165pub fn replay_cassette(cassette: &StreamCassette) -> Result<Stream> {
166    Ok(Stream::from_value(Arc::new(
167        cassette.replay_stream_value()?,
168    )))
169}
170
171/// Returns a stream that skips ahead in `source` to the first packet at `target`.
172///
173/// The stream then continues from that packet; if no packet matches, it is
174/// empty.
175pub fn seek(source: Stream, target: SeekTarget) -> Stream {
176    Stream::new(SeekNode {
177        source,
178        target,
179        state: Mutex::new(SeekState::Pending),
180    })
181}
182
183/// Reconstructs a recording from all of `run`'s events in `ledger`.
184///
185/// Convenience wrapper over [`record_events`] for an entire run.
186pub fn record_ledger_run(
187    cx: &mut Cx,
188    metadata: StreamMetadata,
189    ledger: &EventLedger,
190    run: &Ref,
191) -> Result<StreamRecording> {
192    record_events(cx, metadata, ledger.events_for_run(run))
193}
194
195/// Reconstructs a recording from the events of `run` within `seq_range`.
196///
197/// Like [`record_ledger_run`] but limited to events whose sequence number
198/// falls inside `seq_range`.
199pub fn record_ledger_slice<R>(
200    cx: &mut Cx,
201    metadata: StreamMetadata,
202    ledger: &EventLedger,
203    run: &Ref,
204    seq_range: R,
205) -> Result<StreamRecording>
206where
207    R: RangeBounds<u64>,
208{
209    record_events(
210        cx,
211        metadata,
212        ledger
213            .events_for_run(run)
214            .iter()
215            .filter(|event| seq_range.contains(&event.seq)),
216    )
217}
218
219/// Reconstructs a recording from an arbitrary sequence of kernel `events`.
220///
221/// Chunk events are decoded back into stream packets and diagnostic events into
222/// diagnostic packets; a `done` event ends capture, a `failed` event errors,
223/// and other event kinds are ignored.
224pub fn record_events<'a>(
225    cx: &mut Cx,
226    metadata: StreamMetadata,
227    events: impl IntoIterator<Item = &'a Event>,
228) -> Result<StreamRecording> {
229    let mut items = Vec::new();
230    for event in events {
231        match &event.kind {
232            EventKind::Chunk { payload } => {
233                items.push(item_from_payload(cx, payload, event.ticks.clone())?);
234            }
235            EventKind::Diagnostic(diagnostic) => {
236                items.push(StreamItem::new(StreamPacket::Diagnostic(
237                    diagnostic_packet(diagnostic),
238                )));
239            }
240            EventKind::Done => break,
241            EventKind::Failed(_) => {
242                return Err(Error::Eval(
243                    "cannot record a failed stream event slice".to_owned(),
244                ));
245            }
246            EventKind::Started { .. }
247            | EventKind::Claim { .. }
248            | EventKind::Trace(_)
249            | EventKind::EffectRequested { .. }
250            | EventKind::EffectResolved { .. }
251            | EventKind::Capture { .. }
252            | EventKind::Card { .. }
253            | EventKind::Final(_) => {}
254        }
255    }
256    Ok(StreamRecording::new(metadata, items))
257}
258
259fn item_from_payload(cx: &mut Cx, payload: &Ref, ticks: Vec<Tick>) -> Result<StreamItem> {
260    let value = value_from_ref(cx, payload)?;
261    let packet = StreamPacket::try_from(value.object().as_expr(cx)?)?;
262    StreamItem::with_ticks(packet, ticks)
263}
264
265fn diagnostic_packet(diagnostic: &sim_kernel::Diagnostic) -> StreamDiagnostic {
266    let kind = diagnostic
267        .code
268        .clone()
269        .unwrap_or_else(|| Symbol::qualified("stream/combinator", "Diagnostic"));
270    let prefix = match diagnostic.severity {
271        Severity::Error => "error",
272        Severity::Warning => "warning",
273        Severity::Info => "info",
274        Severity::Note => "note",
275    };
276    StreamDiagnostic::new(kind, format!("{prefix}: {}", diagnostic.message))
277}
278
279struct SeekNode {
280    source: Stream,
281    target: SeekTarget,
282    state: Mutex<SeekState>,
283}
284
/// Progress of a `SeekNode` through its one-time skip to the seek target.
///
/// The derives keep this small private state machine printable in debug
/// output and directly comparable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SeekState {
    /// The skip to the target has not been attempted yet.
    Pending,
    /// The target packet was found; later packets come straight from the source.
    Ready,
    /// The source ran out before the target was reached; nothing more is yielded.
    Drained,
}
290
291impl StreamNode for SeekNode {
292    fn metadata(&self) -> &StreamMetadata {
293        self.source.metadata()
294    }
295
296    fn next_packet(&self) -> Result<Option<StreamItem>> {
297        let mut state = self
298            .state
299            .lock()
300            .map_err(|_| Error::PoisonedLock("seek stream"))?;
301        match *state {
302            SeekState::Ready => self.source.next_packet(),
303            SeekState::Drained => Ok(None),
304            SeekState::Pending => {
305                let item = seek_first(&self.source, &self.target)?;
306                *state = if item.is_some() {
307                    SeekState::Ready
308                } else {
309                    SeekState::Drained
310                };
311                Ok(item)
312            }
313        }
314    }
315
316    fn is_done(&self) -> Result<bool> {
317        let state = self
318            .state
319            .lock()
320            .map_err(|_| Error::PoisonedLock("seek stream"))?;
321        match *state {
322            SeekState::Drained => Ok(true),
323            SeekState::Pending | SeekState::Ready => self.source.is_done(),
324        }
325    }
326}
327
328fn seek_first(source: &Stream, target: &SeekTarget) -> Result<Option<StreamItem>> {
329    match target {
330        SeekTarget::PacketIndex(index) => {
331            for _ in 0..*index {
332                if source.next_packet()?.is_none() {
333                    return Ok(None);
334                }
335            }
336            source.next_packet()
337        }
338        SeekTarget::ClockIndex { clock, index } => {
339            while let Some(item) = source.next_packet()? {
340                if item
341                    .ticks()
342                    .iter()
343                    .any(|tick| &tick.clock == clock && &tick.index == index)
344                {
345                    return Ok(Some(item));
346                }
347            }
348            Ok(None)
349        }
350    }
351}