Skip to main content

sim_lib_stream_core/
spine.rs

1//! The stream spine: the runtime-visible stream value and its base combinators.
2//!
3//! A [`StreamValue`] is the homogeneous stream object the runtime hands around.
4//! It pairs immutable [`StreamMetadata`] with one of two internal spines -- a
5//! pull spine over a fixed buffer of pre-built items, or a push spine whose
6//! bounded queue is fed by an external producer. Both are driven through one
7//! base combinator surface: `next`/`peek`/`take`/`run`/`cancel`/`done?`/
8//! `metadata`/`stats`, exposed both as Rust methods on [`StreamValue`] and as
9//! free `stream_*_bang` verb helpers paired with `stream_*_symbol` helpers that
10//! name the corresponding kernel [`Symbol`].
11//!
12//! The unit of flow is a [`StreamItem`]: a [`StreamPacket`] plus the clock-
13//! domain [`Tick`]s observed at it. The kernel defines the protocol contracts
14//! ([`Sequence`], [`Object`], [`Event`], [`Symbol`]); this module supplies the
15//! concrete streaming-fabric behavior over them.
16
17mod event_source;
18mod queue;
19
20use std::{sync::Arc, time::Duration};
21
22use sim_citizen_derive::non_citizen;
23use sim_kernel::{
24    CORE_SEQUENCE_CLASS_ID, ClassRef, Cx, Error, Event, Object, ObjectCompat, Ref, Result,
25    Sequence, SequenceItem, Symbol, Tick, Value, stream_surface::stream_packet_event,
26    validate_ticks,
27};
28
29use crate::{StreamMetadata, StreamPacket, publish_metadata_claims};
30
31pub use event_source::StreamEventSource;
32use queue::{PullSpine, PushSpine};
33pub use queue::{PushResult, StreamStats};
34
35/// One unit of flow through the spine: a packet plus its clock-domain ticks.
36///
37/// Couples a [`StreamPacket`] payload with the [`Tick`]s observed at it, so a
38/// packet carries its clock-domain context as it moves through the stream. An
39/// item can be projected into a runtime [`Value`], a [`SequenceItem`], or a
40/// sequenced packet [`Event`].
41#[derive(Clone, Debug, PartialEq, Eq)]
42pub struct StreamItem {
43    packet: StreamPacket,
44    ticks: Vec<Tick>,
45}
46
47impl StreamItem {
48    /// Creates an item carrying `packet` with no ticks.
49    pub fn new(packet: StreamPacket) -> Self {
50        Self {
51            packet,
52            ticks: Vec::new(),
53        }
54    }
55
56    /// Creates an item carrying `packet` with `ticks`, validating the ticks.
57    ///
58    /// Returns an error if `ticks` is not a valid clock-domain reading.
59    pub fn with_ticks(packet: StreamPacket, ticks: Vec<Tick>) -> Result<Self> {
60        validate_ticks(&ticks)?;
61        Ok(Self { packet, ticks })
62    }
63
64    /// Returns the packet payload.
65    pub fn packet(&self) -> &StreamPacket {
66        &self.packet
67    }
68
69    /// Returns the clock-domain ticks observed at this item.
70    pub fn ticks(&self) -> &[Tick] {
71        &self.ticks
72    }
73
74    /// Materializes the packet payload as a runtime [`Value`].
75    pub fn packet_value(&self, cx: &mut Cx) -> Result<Value> {
76        cx.factory().expr(self.packet.to_expr())
77    }
78
79    /// Projects this item into a [`SequenceItem`], preserving its ticks.
80    pub fn sequence_item(&self, cx: &mut Cx) -> Result<SequenceItem> {
81        SequenceItem::with_ticks(self.packet_value(cx)?, self.ticks.clone())
82    }
83
84    /// Builds a sequenced packet [`Event`] for `run` at sequence number `seq`.
85    pub fn chunk_event(&self, cx: &mut Cx, run: Ref, seq: u64) -> Result<Event> {
86        let payload = self.packet.intern_ref(cx)?;
87        stream_packet_event(run, seq, self.ticks.clone(), payload)
88    }
89}
90
91/// The runtime-visible stream value: metadata plus a live spine.
92///
93/// A `StreamValue` is the homogeneous stream object the runtime passes around.
94/// It is a non-citizen live handle (it is not serialized directly; consumers
95/// reconstruct the `stream/Packet` and `stream/Metadata` descriptors and
96/// realize them separately). Build a finite, replayable stream with
97/// [`StreamValue::pull`] or a producer-fed stream with [`StreamValue::push`],
98/// then drive it through the base combinators ([`next_packet`], [`peek_packet`],
99/// [`take_packets`], [`run_events`], [`cancel`], [`is_done`]).
100///
101/// As a kernel [`Object`] it presents as a [`Sequence`], so it interoperates
102/// with sequence-consuming operations directly.
103///
104/// [`next_packet`]: StreamValue::next_packet
105/// [`peek_packet`]: StreamValue::peek_packet
106/// [`take_packets`]: StreamValue::take_packets
107/// [`run_events`]: StreamValue::run_events
108/// [`cancel`]: StreamValue::cancel
109/// [`is_done`]: StreamValue::is_done
110#[non_citizen(
111    reason = "live stream spine; reconstruct stream/Packet and stream/Metadata descriptors then realize separately",
112    kind = "handle",
113    descriptor = "stream/Packet"
114)]
115pub struct StreamValue {
116    metadata: StreamMetadata,
117    spine: StreamSpine,
118}
119
120enum StreamSpine {
121    Pull(PullSpine),
122    Push(PushSpine),
123}
124
125impl StreamValue {
126    /// Builds a pull stream that yields the given pre-built `items` in order.
127    ///
128    /// A pull stream is finite and self-draining: it serves `items` until
129    /// exhausted, then reports done. It rejects pushed packets.
130    pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
131        Self {
132            metadata,
133            spine: StreamSpine::Pull(PullSpine::new(items)),
134        }
135    }
136
137    /// Builds a push stream fed by an external producer.
138    ///
139    /// The stream starts empty; producers call [`StreamValue::push_packet`] to
140    /// enqueue items under the buffer policy carried by `metadata`, and
141    /// consumers pull them out.
142    pub fn push(metadata: StreamMetadata) -> Self {
143        Self {
144            spine: StreamSpine::Push(PushSpine::new(metadata.buffer().clone())),
145            metadata,
146        }
147    }
148
149    /// Returns the stream's immutable metadata.
150    pub fn metadata(&self) -> &StreamMetadata {
151        &self.metadata
152    }
153
154    /// Publishes this stream's metadata as claims about `subject` into `cx`.
155    pub fn publish_claims(&self, cx: &mut Cx, subject: Ref) -> Result<()> {
156        publish_metadata_claims(cx, subject, &self.metadata)
157    }
158
159    /// Pushes a packet into a push stream, returning the backpressure outcome.
160    ///
161    /// Returns an error when called on a pull stream, which accepts no input.
162    pub fn push_packet(&self, item: StreamItem) -> Result<PushResult> {
163        match &self.spine {
164            StreamSpine::Pull(_) => Err(Error::Eval(
165                "cannot push packets into a pull stream".to_owned(),
166            )),
167            StreamSpine::Push(spine) => spine.push(item),
168        }
169    }
170
171    /// Closes the stream to further input, leaving buffered items to drain.
172    pub fn close_push(&self) -> Result<()> {
173        match &self.spine {
174            StreamSpine::Pull(spine) => spine.close(),
175            StreamSpine::Push(spine) => spine.close(),
176        }
177    }
178
179    /// Pulls the next packet, or `None` if the stream is currently empty or
180    /// exhausted.
181    ///
182    /// Does not block; on a push stream an empty-but-open queue yields `None`.
183    pub fn next_packet(&self) -> Result<Option<StreamItem>> {
184        match &self.spine {
185            StreamSpine::Pull(spine) => spine.next(),
186            StreamSpine::Push(spine) => spine.next(),
187        }
188    }
189
190    /// Pulls the next packet, blocking up to `timeout` for one to arrive.
191    ///
192    /// On a pull stream this is equivalent to [`StreamValue::next_packet`]; on a
193    /// push stream it waits for a producer up to `timeout` before yielding
194    /// `None`.
195    pub fn next_packet_timeout(&self, timeout: Duration) -> Result<Option<StreamItem>> {
196        match &self.spine {
197            StreamSpine::Pull(spine) => spine.next(),
198            StreamSpine::Push(spine) => spine.next_timeout(timeout),
199        }
200    }
201
202    /// Returns a clone of the next packet without consuming it.
203    pub fn peek_packet(&self) -> Result<Option<StreamItem>> {
204        match &self.spine {
205            StreamSpine::Pull(spine) => spine.peek(),
206            StreamSpine::Push(spine) => spine.peek(),
207        }
208    }
209
210    /// Reports whether the stream is exhausted and will yield no more packets.
211    pub fn is_done(&self) -> Result<bool> {
212        match &self.spine {
213            StreamSpine::Pull(spine) => spine.is_done(),
214            StreamSpine::Push(spine) => spine.is_done(),
215        }
216    }
217
218    /// Pulls up to `limit` packets, stopping early when the stream runs dry.
219    pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
220        let mut out = Vec::new();
221        for _ in 0..limit {
222            let Some(item) = self.next_packet()? else {
223                break;
224            };
225            out.push(item);
226        }
227        Ok(out)
228    }
229
230    /// Drains the stream into a vector of sequenced packet events.
231    ///
232    /// Pulls every currently available packet, emitting one packet [`Event`] per
233    /// item numbered from `start_seq`, and appends a terminal `done` event if
234    /// the stream is exhausted. Events are attributed to `run`.
235    pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
236        let mut seq = start_seq;
237        let mut out = Vec::new();
238        while let Some(item) = self.next_packet()? {
239            out.push(item.chunk_event(cx, run.clone(), seq)?);
240            seq = seq.saturating_add(1);
241        }
242        if self.is_done()? {
243            out.push(Event::done(run, seq)?);
244        }
245        Ok(out)
246    }
247
248    /// Cancels the stream: closes it and discards any buffered packets.
249    pub fn cancel(&self) -> Result<()> {
250        match &self.spine {
251            StreamSpine::Pull(spine) => spine.cancel(),
252            StreamSpine::Push(spine) => spine.cancel(),
253        }
254    }
255
256    /// Returns a snapshot of the stream's lifetime [`StreamStats`].
257    pub fn stats(&self) -> Result<StreamStats> {
258        match &self.spine {
259            StreamSpine::Pull(spine) => spine.stats(),
260            StreamSpine::Push(spine) => spine.stats(),
261        }
262    }
263
264    /// Returns the number of packets currently buffered in the spine.
265    pub fn queue_depth(&self) -> Result<usize> {
266        match &self.spine {
267            StreamSpine::Pull(spine) => spine.depth(),
268            StreamSpine::Push(spine) => spine.depth(),
269        }
270    }
271
272    /// Builds an [`StreamEventSource`] that feeds this stream's packets into the
273    /// run ledger as sequenced events numbered from `start_seq`.
274    pub fn event_source(self: &Arc<Self>, run: Ref, start_seq: u64) -> Arc<StreamEventSource> {
275        Arc::new(StreamEventSource::new(Arc::clone(self), run, start_seq))
276    }
277}
278
279impl Object for StreamValue {
280    fn display(&self, _cx: &mut Cx) -> Result<String> {
281        Ok(format!("#<stream {}>", self.metadata.id()))
282    }
283
284    fn as_any(&self) -> &dyn std::any::Any {
285        self
286    }
287}
288
289impl ObjectCompat for StreamValue {
290    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
291        cx.factory().class_stub(
292            CORE_SEQUENCE_CLASS_ID,
293            Symbol::qualified("stream", "Stream"),
294        )
295    }
296
297    fn as_sequence(&self) -> Option<&dyn Sequence> {
298        Some(self)
299    }
300}
301
302impl Sequence for StreamValue {
303    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
304        self.next_packet()?
305            .map(|item| item.sequence_item(cx))
306            .transpose()
307    }
308
309    fn close(&self, _cx: &mut Cx) -> Result<()> {
310        self.cancel()
311    }
312
313    fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
314        self.peek_packet()?
315            .map(|item| item.sequence_item(cx))
316            .transpose()
317    }
318
319    fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
320        self.is_done()
321    }
322}
323
324/// `stream/next!`: pulls and consumes the next packet from `stream`.
325pub fn stream_next_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
326    stream.next_packet()
327}
328
329/// `stream/peek!`: returns the next packet of `stream` without consuming it.
330pub fn stream_peek_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
331    stream.peek_packet()
332}
333
334/// `stream/done?`: reports whether `stream` is exhausted.
335pub fn stream_done_q(stream: &StreamValue) -> Result<bool> {
336    stream.is_done()
337}
338
339/// `stream/take`: pulls up to `limit` packets from `stream`.
340pub fn stream_take(stream: &StreamValue, limit: usize) -> Result<Vec<StreamItem>> {
341    stream.take_packets(limit)
342}
343
344/// `stream/run!`: drains `stream` into sequenced events numbered from
345/// `start_seq`.
346pub fn stream_run_bang(
347    stream: &StreamValue,
348    cx: &mut Cx,
349    run: Ref,
350    start_seq: u64,
351) -> Result<Vec<Event>> {
352    stream.run_events(cx, run, start_seq)
353}
354
355/// `stream/cancel!`: cancels `stream`, discarding any buffered packets.
356pub fn stream_cancel_bang(stream: &StreamValue) -> Result<()> {
357    stream.cancel()
358}
359
360/// `stream/stats`: returns a snapshot of `stream`'s lifetime counters.
361pub fn stream_stats(stream: &StreamValue) -> Result<StreamStats> {
362    stream.stats()
363}
364
365/// `stream/metadata`: returns `stream`'s immutable metadata.
366pub fn stream_metadata(stream: &StreamValue) -> &StreamMetadata {
367    stream.metadata()
368}
369
370/// The kernel [`Symbol`] naming the `stream/next!` operation.
371pub fn stream_next_symbol() -> Symbol {
372    Symbol::qualified("stream", "next!")
373}
374
375/// The kernel [`Symbol`] naming the `stream/peek!` operation.
376pub fn stream_peek_symbol() -> Symbol {
377    Symbol::qualified("stream", "peek!")
378}
379
380/// The kernel [`Symbol`] naming the `stream/done?` operation.
381pub fn stream_done_symbol() -> Symbol {
382    Symbol::qualified("stream", "done?")
383}
384
385/// The kernel [`Symbol`] naming the `stream/take` operation.
386pub fn stream_take_symbol() -> Symbol {
387    Symbol::qualified("stream", "take")
388}
389
390/// The kernel [`Symbol`] naming the `stream/run!` operation.
391pub fn stream_run_symbol() -> Symbol {
392    Symbol::qualified("stream", "run!")
393}
394
395/// The kernel [`Symbol`] naming the `stream/cancel!` operation.
396pub fn stream_cancel_symbol() -> Symbol {
397    Symbol::qualified("stream", "cancel!")
398}
399
400/// The kernel [`Symbol`] naming the `stream/stats` operation.
401pub fn stream_stats_symbol() -> Symbol {
402    Symbol::qualified("stream", "stats")
403}
404
405/// The kernel [`Symbol`] naming the `stream/metadata` operation.
406pub fn stream_metadata_symbol() -> Symbol {
407    Symbol::qualified("stream", "metadata")
408}