sim-lib-stream-core 0.1.0

Core stream metadata, packets, envelopes, and buffer values.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
//! The stream spine: the runtime-visible stream value and its base combinators.
//!
//! A [`StreamValue`] is the homogeneous stream object the runtime hands around.
//! It pairs immutable [`StreamMetadata`] with one of two internal spines -- a
//! pull spine over a fixed buffer of pre-built items, or a push spine whose
//! bounded queue is fed by an external producer. Both are driven through one
//! base combinator surface: `next`/`peek`/`take`/`run`/`cancel`/`done?`/
//! `metadata`/`stats`, exposed both as Rust methods on [`StreamValue`] and as
//! free `stream_*_bang` verb helpers paired with `stream_*_symbol` helpers that
//! name the corresponding kernel [`Symbol`].
//!
//! The unit of flow is a [`StreamItem`]: a [`StreamPacket`] plus the clock-
//! domain [`Tick`]s observed at it. The kernel defines the protocol contracts
//! ([`Sequence`], [`Object`], [`Event`], [`Symbol`]); this module supplies the
//! concrete streaming-fabric behavior over them.

mod event_source;
mod queue;

use std::{sync::Arc, time::Duration};

use sim_citizen_derive::non_citizen;
use sim_kernel::{
    CORE_SEQUENCE_CLASS_ID, ClassRef, Cx, Error, Event, Object, ObjectCompat, Ref, Result,
    Sequence, SequenceItem, Symbol, Tick, Value, stream_surface::stream_packet_event,
    validate_ticks,
};

use crate::{StreamMetadata, StreamPacket, publish_metadata_claims};

pub use event_source::StreamEventSource;
use queue::{PullSpine, PushSpine};
pub use queue::{PushResult, StreamStats};

/// One unit of flow through the spine: a packet plus its clock-domain ticks.
///
/// Couples a [`StreamPacket`] payload with the [`Tick`]s observed at it, so a
/// packet carries its clock-domain context as it moves through the stream. An
/// item can be projected into a runtime [`Value`], a [`SequenceItem`], or a
/// sequenced packet [`Event`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamItem {
    packet: StreamPacket,
    ticks: Vec<Tick>,
}

impl StreamItem {
    /// Creates an item carrying `packet` with no ticks.
    pub fn new(packet: StreamPacket) -> Self {
        Self {
            packet,
            ticks: Vec::new(),
        }
    }

    /// Creates an item carrying `packet` with `ticks`, validating the ticks.
    ///
    /// Returns an error if `ticks` is not a valid clock-domain reading.
    pub fn with_ticks(packet: StreamPacket, ticks: Vec<Tick>) -> Result<Self> {
        validate_ticks(&ticks)?;
        Ok(Self { packet, ticks })
    }

    /// Returns the packet payload.
    pub fn packet(&self) -> &StreamPacket {
        &self.packet
    }

    /// Returns the clock-domain ticks observed at this item.
    pub fn ticks(&self) -> &[Tick] {
        &self.ticks
    }

    /// Materializes the packet payload as a runtime [`Value`].
    pub fn packet_value(&self, cx: &mut Cx) -> Result<Value> {
        cx.factory().expr(self.packet.to_expr())
    }

    /// Projects this item into a [`SequenceItem`], preserving its ticks.
    pub fn sequence_item(&self, cx: &mut Cx) -> Result<SequenceItem> {
        SequenceItem::with_ticks(self.packet_value(cx)?, self.ticks.clone())
    }

    /// Builds a sequenced packet [`Event`] for `run` at sequence number `seq`.
    pub fn chunk_event(&self, cx: &mut Cx, run: Ref, seq: u64) -> Result<Event> {
        let payload = self.packet.intern_ref(cx)?;
        stream_packet_event(run, seq, self.ticks.clone(), payload)
    }
}

/// The runtime-visible stream value: metadata plus a live spine.
///
/// A `StreamValue` is the homogeneous stream object the runtime passes around.
/// It is a non-citizen live handle (it is not serialized directly; consumers
/// reconstruct the `stream/Packet` and `stream/Metadata` descriptors and
/// realize them separately). Build a finite, replayable stream with
/// [`StreamValue::pull`] or a producer-fed stream with [`StreamValue::push`],
/// then drive it through the base combinators ([`next_packet`], [`peek_packet`],
/// [`take_packets`], [`run_events`], [`cancel`], [`is_done`]).
///
/// As a kernel [`Object`] it presents as a [`Sequence`], so it interoperates
/// with sequence-consuming operations directly.
///
/// [`next_packet`]: StreamValue::next_packet
/// [`peek_packet`]: StreamValue::peek_packet
/// [`take_packets`]: StreamValue::take_packets
/// [`run_events`]: StreamValue::run_events
/// [`cancel`]: StreamValue::cancel
/// [`is_done`]: StreamValue::is_done
#[non_citizen(
    reason = "live stream spine; reconstruct stream/Packet and stream/Metadata descriptors then realize separately",
    kind = "handle",
    descriptor = "stream/Packet"
)]
pub struct StreamValue {
    metadata: StreamMetadata,
    spine: StreamSpine,
}

enum StreamSpine {
    Pull(PullSpine),
    Push(PushSpine),
}

impl StreamValue {
    /// Builds a pull stream that yields the given pre-built `items` in order.
    ///
    /// A pull stream is finite and self-draining: it serves `items` until
    /// exhausted, then reports done. It rejects pushed packets.
    pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
        Self {
            metadata,
            spine: StreamSpine::Pull(PullSpine::new(items)),
        }
    }

    /// Builds a push stream fed by an external producer.
    ///
    /// The stream starts empty; producers call [`StreamValue::push_packet`] to
    /// enqueue items under the buffer policy carried by `metadata`, and
    /// consumers pull them out.
    pub fn push(metadata: StreamMetadata) -> Self {
        Self {
            spine: StreamSpine::Push(PushSpine::new(metadata.buffer().clone())),
            metadata,
        }
    }

    /// Returns the stream's immutable metadata.
    pub fn metadata(&self) -> &StreamMetadata {
        &self.metadata
    }

    /// Publishes this stream's metadata as claims about `subject` into `cx`.
    pub fn publish_claims(&self, cx: &mut Cx, subject: Ref) -> Result<()> {
        publish_metadata_claims(cx, subject, &self.metadata)
    }

    /// Pushes a packet into a push stream, returning the backpressure outcome.
    ///
    /// Returns an error when called on a pull stream, which accepts no input.
    pub fn push_packet(&self, item: StreamItem) -> Result<PushResult> {
        match &self.spine {
            StreamSpine::Pull(_) => Err(Error::Eval(
                "cannot push packets into a pull stream".to_owned(),
            )),
            StreamSpine::Push(spine) => spine.push(item),
        }
    }

    /// Closes the stream to further input, leaving buffered items to drain.
    pub fn close_push(&self) -> Result<()> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.close(),
            StreamSpine::Push(spine) => spine.close(),
        }
    }

    /// Pulls the next packet, or `None` if the stream is currently empty or
    /// exhausted.
    ///
    /// Does not block; on a push stream an empty-but-open queue yields `None`.
    pub fn next_packet(&self) -> Result<Option<StreamItem>> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.next(),
            StreamSpine::Push(spine) => spine.next(),
        }
    }

    /// Pulls the next packet, blocking up to `timeout` for one to arrive.
    ///
    /// On a pull stream this is equivalent to [`StreamValue::next_packet`]; on a
    /// push stream it waits for a producer up to `timeout` before yielding
    /// `None`.
    pub fn next_packet_timeout(&self, timeout: Duration) -> Result<Option<StreamItem>> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.next(),
            StreamSpine::Push(spine) => spine.next_timeout(timeout),
        }
    }

    /// Returns a clone of the next packet without consuming it.
    pub fn peek_packet(&self) -> Result<Option<StreamItem>> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.peek(),
            StreamSpine::Push(spine) => spine.peek(),
        }
    }

    /// Reports whether the stream is exhausted and will yield no more packets.
    pub fn is_done(&self) -> Result<bool> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.is_done(),
            StreamSpine::Push(spine) => spine.is_done(),
        }
    }

    /// Pulls up to `limit` packets, stopping early when the stream runs dry.
    pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
        let mut out = Vec::new();
        for _ in 0..limit {
            let Some(item) = self.next_packet()? else {
                break;
            };
            out.push(item);
        }
        Ok(out)
    }

    /// Drains the stream into a vector of sequenced packet events.
    ///
    /// Pulls every currently available packet, emitting one packet [`Event`] per
    /// item numbered from `start_seq`, and appends a terminal `done` event if
    /// the stream is exhausted. Events are attributed to `run`.
    pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
        let mut seq = start_seq;
        let mut out = Vec::new();
        while let Some(item) = self.next_packet()? {
            out.push(item.chunk_event(cx, run.clone(), seq)?);
            seq = seq.saturating_add(1);
        }
        if self.is_done()? {
            out.push(Event::done(run, seq)?);
        }
        Ok(out)
    }

    /// Cancels the stream: closes it and discards any buffered packets.
    pub fn cancel(&self) -> Result<()> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.cancel(),
            StreamSpine::Push(spine) => spine.cancel(),
        }
    }

    /// Returns a snapshot of the stream's lifetime [`StreamStats`].
    pub fn stats(&self) -> Result<StreamStats> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.stats(),
            StreamSpine::Push(spine) => spine.stats(),
        }
    }

    /// Returns the number of packets currently buffered in the spine.
    pub fn queue_depth(&self) -> Result<usize> {
        match &self.spine {
            StreamSpine::Pull(spine) => spine.depth(),
            StreamSpine::Push(spine) => spine.depth(),
        }
    }

    /// Builds an [`StreamEventSource`] that feeds this stream's packets into the
    /// run ledger as sequenced events numbered from `start_seq`.
    pub fn event_source(self: &Arc<Self>, run: Ref, start_seq: u64) -> Arc<StreamEventSource> {
        Arc::new(StreamEventSource::new(Arc::clone(self), run, start_seq))
    }
}

impl Object for StreamValue {
    fn display(&self, _cx: &mut Cx) -> Result<String> {
        Ok(format!("#<stream {}>", self.metadata.id()))
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

impl ObjectCompat for StreamValue {
    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
        cx.factory().class_stub(
            CORE_SEQUENCE_CLASS_ID,
            Symbol::qualified("stream", "Stream"),
        )
    }

    fn as_sequence(&self) -> Option<&dyn Sequence> {
        Some(self)
    }
}

impl Sequence for StreamValue {
    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
        self.next_packet()?
            .map(|item| item.sequence_item(cx))
            .transpose()
    }

    fn close(&self, _cx: &mut Cx) -> Result<()> {
        self.cancel()
    }

    fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
        self.peek_packet()?
            .map(|item| item.sequence_item(cx))
            .transpose()
    }

    fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
        self.is_done()
    }
}

/// `stream/next!`: pulls and consumes the next packet from `stream`.
pub fn stream_next_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
    stream.next_packet()
}

/// `stream/peek!`: returns the next packet of `stream` without consuming it.
pub fn stream_peek_bang(stream: &StreamValue) -> Result<Option<StreamItem>> {
    stream.peek_packet()
}

/// `stream/done?`: reports whether `stream` is exhausted.
pub fn stream_done_q(stream: &StreamValue) -> Result<bool> {
    stream.is_done()
}

/// `stream/take`: pulls up to `limit` packets from `stream`.
pub fn stream_take(stream: &StreamValue, limit: usize) -> Result<Vec<StreamItem>> {
    stream.take_packets(limit)
}

/// `stream/run!`: drains `stream` into sequenced events numbered from
/// `start_seq`.
pub fn stream_run_bang(
    stream: &StreamValue,
    cx: &mut Cx,
    run: Ref,
    start_seq: u64,
) -> Result<Vec<Event>> {
    stream.run_events(cx, run, start_seq)
}

/// `stream/cancel!`: cancels `stream`, discarding any buffered packets.
pub fn stream_cancel_bang(stream: &StreamValue) -> Result<()> {
    stream.cancel()
}

/// `stream/stats`: returns a snapshot of `stream`'s lifetime counters.
pub fn stream_stats(stream: &StreamValue) -> Result<StreamStats> {
    stream.stats()
}

/// `stream/metadata`: returns `stream`'s immutable metadata.
pub fn stream_metadata(stream: &StreamValue) -> &StreamMetadata {
    stream.metadata()
}

/// The kernel [`Symbol`] naming the `stream/next!` operation.
pub fn stream_next_symbol() -> Symbol {
    Symbol::qualified("stream", "next!")
}

/// The kernel [`Symbol`] naming the `stream/peek!` operation.
pub fn stream_peek_symbol() -> Symbol {
    Symbol::qualified("stream", "peek!")
}

/// The kernel [`Symbol`] naming the `stream/done?` operation.
pub fn stream_done_symbol() -> Symbol {
    Symbol::qualified("stream", "done?")
}

/// The kernel [`Symbol`] naming the `stream/take` operation.
pub fn stream_take_symbol() -> Symbol {
    Symbol::qualified("stream", "take")
}

/// The kernel [`Symbol`] naming the `stream/run!` operation.
pub fn stream_run_symbol() -> Symbol {
    Symbol::qualified("stream", "run!")
}

/// The kernel [`Symbol`] naming the `stream/cancel!` operation.
pub fn stream_cancel_symbol() -> Symbol {
    Symbol::qualified("stream", "cancel!")
}

/// The kernel [`Symbol`] naming the `stream/stats` operation.
pub fn stream_stats_symbol() -> Symbol {
    Symbol::qualified("stream", "stats")
}

/// The kernel [`Symbol`] naming the `stream/metadata` operation.
pub fn stream_metadata_symbol() -> Symbol {
    Symbol::qualified("stream", "metadata")
}