Skip to main content

sim_kernel/
seq.rs

1//! The [`Sequence`] contract: pull-based iteration over runtime values.
2//!
3//! The kernel defines the sequence protocol and its next/close/peek surface,
4//! including bridging events into sequence items; libraries supply the concrete
5//! sequence sources.
6
7use std::{
8    collections::VecDeque,
9    sync::{Arc, Mutex},
10};
11
12use crate::{
13    env::Cx,
14    error::{Error, Result},
15    event::{Event, EventKind, EventSource, Tick, validate_ticks},
16    expr::Expr,
17    id::{CORE_SEQUENCE_CLASS_ID, Symbol},
18    list::ListValue,
19    object::{ClassRef, Object},
20    ref_id::Ref,
21    ref_resolver::value_from_ref,
22    stream::Stream,
23    term::Term,
24    value::Value,
25};
26
/// One item produced by a sequence: a value plus its clock ticks.
#[derive(Clone, Debug)]
pub struct SequenceItem {
    /// The value carried by this item.
    value: Value,
    /// Clock ticks attached to the value; left empty by `SequenceItem::new`.
    ticks: Vec<Tick>,
}
33
34impl SequenceItem {
35    /// Builds an item carrying a value with no ticks.
36    pub fn new(value: Value) -> Self {
37        Self {
38            value,
39            ticks: Vec::new(),
40        }
41    }
42
43    /// Builds an item with ticks, validating their clock ordering.
44    pub fn with_ticks(value: Value, ticks: Vec<Tick>) -> Result<Self> {
45        validate_ticks(&ticks)?;
46        Ok(Self { value, ticks })
47    }
48
49    /// Borrows the carried value.
50    pub fn value(&self) -> &Value {
51        &self.value
52    }
53
54    /// Borrows the carried clock ticks.
55    pub fn ticks(&self) -> &[Tick] {
56        &self.ticks
57    }
58
59    /// Projects the item to a runtime value, folding in any ticks.
60    pub fn into_value(self, cx: &mut Cx) -> Result<Value> {
61        sequence_item_value(cx, self)
62    }
63}
64
65/// Pull-based iteration protocol over runtime values.
66pub trait Sequence: Send + Sync {
67    /// Pulls the next item, or `Ok(None)` once exhausted.
68    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>>;
69
70    /// Releases resources backing the sequence; idempotent by default.
71    fn close(&self, _cx: &mut Cx) -> Result<()> {
72        Ok(())
73    }
74
75    /// Peeks the next item without consuming it; unsupported by default.
76    fn peek_item(&self, _cx: &mut Cx) -> Result<Option<SequenceItem>> {
77        Err(Error::Eval("sequence does not support peek".to_owned()))
78    }
79
80    /// Whether the sequence is known to be exhausted; `false` by default.
81    fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
82        Ok(false)
83    }
84}
85
86/// Pulls the next item from a sequence.
87pub fn seq_next(cx: &mut Cx, sequence: &dyn Sequence) -> Result<Option<SequenceItem>> {
88    sequence.next_item(cx)
89}
90
91/// Closes a sequence.
92pub fn seq_close(cx: &mut Cx, sequence: &dyn Sequence) -> Result<()> {
93    sequence.close(cx)
94}
95
96/// Peeks the next item of a sequence without consuming it.
97pub fn seq_peek(cx: &mut Cx, sequence: &dyn Sequence) -> Result<Option<SequenceItem>> {
98    sequence.peek_item(cx)
99}
100
101/// Reports whether a sequence is exhausted.
102pub fn seq_is_done(cx: &mut Cx, sequence: &dyn Sequence) -> Result<bool> {
103    sequence.is_done(cx)
104}
105
106/// Pulls the next item from a value that is a sequence or a stream.
107pub fn seq_next_value(cx: &mut Cx, target: &Value) -> Result<Option<SequenceItem>> {
108    if let Some(sequence) = target.object().as_sequence() {
109        return seq_next(cx, sequence);
110    }
111    if let Some(stream) = target.object().as_stream() {
112        return stream.next(cx).map(|item| item.map(SequenceItem::new));
113    }
114    Err(Error::TypeMismatch {
115        expected: "sequence",
116        found: "non-sequence",
117    })
118}
119
120/// Closes a value that is a sequence or a stream.
121pub fn seq_close_value(cx: &mut Cx, target: &Value) -> Result<()> {
122    if let Some(sequence) = target.object().as_sequence() {
123        return seq_close(cx, sequence);
124    }
125    if let Some(stream) = target.object().as_stream() {
126        return stream.close(cx);
127    }
128    Err(Error::TypeMismatch {
129        expected: "sequence",
130        found: "non-sequence",
131    })
132}
133
134/// Projects a sequence item to a value, wrapping it in a payload/ticks table
135/// when ticks are present.
136pub fn sequence_item_value(cx: &mut Cx, item: SequenceItem) -> Result<Value> {
137    if item.ticks.is_empty() {
138        return Ok(item.value);
139    }
140
141    let ticks = item
142        .ticks
143        .into_iter()
144        .map(|tick| tick_value(cx, tick))
145        .collect::<Result<Vec<_>>>()?;
146    let ticks = cx.factory().list(ticks)?;
147    cx.factory().table(vec![
148        (Symbol::new("payload"), item.value),
149        (Symbol::new("ticks"), ticks),
150    ])
151}
152
153/// Maps an event into a sequence item, or `Ok(None)` for non-payload events;
154/// failure events are surfaced as errors.
155pub fn sequence_item_from_event(cx: &mut Cx, event: Event) -> Result<Option<SequenceItem>> {
156    match event.kind {
157        EventKind::Chunk { payload } | EventKind::Final(payload) => {
158            SequenceItem::with_ticks(value_from_ref(cx, &payload)?, event.ticks).map(Some)
159        }
160        EventKind::Failed(error) => Err(error_from_failed_ref(cx, &error)),
161        EventKind::Started { .. }
162        | EventKind::Claim { .. }
163        | EventKind::Diagnostic(_)
164        | EventKind::Trace(_)
165        | EventKind::EffectRequested { .. }
166        | EventKind::EffectResolved { .. }
167        | EventKind::Capture { .. }
168        | EventKind::Card { .. }
169        | EventKind::Done => Ok(None),
170    }
171}
172
// sim-non-citizen(reason = "live event-source sequence adapter; descriptor data is the underlying event source", kind = "handle", descriptor = "core/EventSource")
/// Sequence adapter that drains an [`EventSource`] into sequence items.
pub struct EventSourceSequence {
    /// Event source that is polled for events and closed when the sequence is closed.
    source: Arc<dyn EventSource>,
    /// Buffered events and the end-of-source flag, guarded by a mutex.
    state: Mutex<EventSourceSequenceState>,
}
179
/// Mutable bookkeeping kept behind the `EventSourceSequence` state mutex.
#[derive(Debug, Default)]
struct EventSourceSequenceState {
    /// Events buffered while peeking; consumed front-first by later pulls.
    pending: VecDeque<Event>,
    /// Set once the source returned no event, a `Done` event was seen, or the
    /// sequence was closed.
    done: bool,
}
185
186impl EventSourceSequence {
187    /// Wraps an event source as a sequence.
188    pub fn new(source: Arc<dyn EventSource>) -> Self {
189        Self {
190            source,
191            state: Mutex::new(EventSourceSequenceState::default()),
192        }
193    }
194}
195
196impl Object for EventSourceSequence {
197    fn display(&self, _cx: &mut Cx) -> Result<String> {
198        Ok("#<event-source-sequence>".to_owned())
199    }
200
201    fn as_any(&self) -> &dyn std::any::Any {
202        self
203    }
204}
205
206impl crate::ObjectCompat for EventSourceSequence {
207    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
208        cx.factory().class_stub(
209            CORE_SEQUENCE_CLASS_ID,
210            Symbol::qualified("core", "EventSourceSequence"),
211        )
212    }
213    fn as_sequence(&self) -> Option<&dyn Sequence> {
214        Some(self)
215    }
216}
217
218impl Sequence for EventSourceSequence {
219    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
220        while let Some(event) = self.next_event(cx)? {
221            let done = matches!(event.kind, EventKind::Done);
222            if let Some(item) = sequence_item_from_event(cx, event)? {
223                return Ok(Some(item));
224            }
225            if done {
226                self.mark_done()?;
227                return Ok(None);
228            }
229        }
230        Ok(None)
231    }
232
233    fn close(&self, cx: &mut Cx) -> Result<()> {
234        {
235            let mut state = self
236                .state
237                .lock()
238                .map_err(|_| Error::PoisonedLock("event source sequence"))?;
239            state.pending.clear();
240            state.done = true;
241        }
242        self.source.close(cx)
243    }
244
245    fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
246        loop {
247            if let Some(item) = self.pending_item(cx)? {
248                return Ok(Some(item));
249            }
250            if self.is_done(cx)? {
251                return Ok(None);
252            }
253            let Some(event) = self.source.next(cx)? else {
254                self.mark_done()?;
255                return Ok(None);
256            };
257            let done = matches!(event.kind, EventKind::Done);
258            self.push_pending(event)?;
259            if done {
260                self.mark_done()?;
261            }
262        }
263    }
264
265    fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
266        let state = self
267            .state
268            .lock()
269            .map_err(|_| Error::PoisonedLock("event source sequence"))?;
270        Ok(state.done && state.pending.is_empty())
271    }
272}
273
274impl EventSourceSequence {
275    fn next_event(&self, cx: &mut Cx) -> Result<Option<Event>> {
276        {
277            let mut state = self
278                .state
279                .lock()
280                .map_err(|_| Error::PoisonedLock("event source sequence"))?;
281            if let Some(event) = state.pending.pop_front() {
282                return Ok(Some(event));
283            }
284            if state.done {
285                return Ok(None);
286            }
287        }
288
289        match self.source.next(cx)? {
290            Some(event) => Ok(Some(event)),
291            None => {
292                self.mark_done()?;
293                Ok(None)
294            }
295        }
296    }
297
298    fn pending_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
299        let events = self
300            .state
301            .lock()
302            .map_err(|_| Error::PoisonedLock("event source sequence"))?
303            .pending
304            .iter()
305            .cloned()
306            .collect::<Vec<_>>();
307        for event in events {
308            if let Some(item) = sequence_item_from_event(cx, event)? {
309                return Ok(Some(item));
310            }
311        }
312        Ok(None)
313    }
314
315    fn push_pending(&self, event: Event) -> Result<()> {
316        let mut state = self
317            .state
318            .lock()
319            .map_err(|_| Error::PoisonedLock("event source sequence"))?;
320        state.pending.push_back(event);
321        Ok(())
322    }
323
324    fn mark_done(&self) -> Result<()> {
325        let mut state = self
326            .state
327            .lock()
328            .map_err(|_| Error::PoisonedLock("event source sequence"))?;
329        state.done = true;
330        Ok(())
331    }
332}
333
// sim-non-citizen(reason = "live list sequence cursor; canonical list data remains the source list value", kind = "handle", descriptor = "core/List")
/// Sequence cursor that walks the spine of a kernel list value.
#[derive(Debug)]
pub struct ListSequence {
    /// Cursor position and closed flag, guarded by a mutex.
    state: Mutex<ListSequenceState>,
}
340
/// Mutable cursor state kept behind the `ListSequence` mutex.
#[derive(Debug)]
struct ListSequenceState {
    /// List node the cursor reads next; `None` once the walk has ended or the
    /// sequence was closed.
    next: Option<Value>,
    /// Set once the cursor is exhausted or explicitly closed.
    closed: bool,
}
346
347impl ListSequence {
348    /// Builds a sequence that iterates the given list value.
349    pub fn new(list: Value) -> Self {
350        Self {
351            state: Mutex::new(ListSequenceState {
352                next: Some(list),
353                closed: false,
354            }),
355        }
356    }
357}
358
359impl Object for ListSequence {
360    fn display(&self, _cx: &mut Cx) -> Result<String> {
361        Ok("#<list-sequence>".to_owned())
362    }
363
364    fn as_any(&self) -> &dyn std::any::Any {
365        self
366    }
367}
368
369impl crate::ObjectCompat for ListSequence {
370    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
371        cx.factory().class_stub(
372            CORE_SEQUENCE_CLASS_ID,
373            Symbol::qualified("core", "Sequence"),
374        )
375    }
376    fn as_sequence(&self) -> Option<&dyn Sequence> {
377        Some(self)
378    }
379}
380
381impl Sequence for ListSequence {
382    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
383        let mut state = self
384            .state
385            .lock()
386            .map_err(|_| Error::PoisonedLock("list sequence"))?;
387        if state.closed {
388            return Ok(None);
389        }
390
391        let Some(node) = state.next.clone() else {
392            state.closed = true;
393            return Ok(None);
394        };
395        let list = list_from_value(&node)?;
396        if list.is_empty(cx)? {
397            state.next = None;
398            state.closed = true;
399            return Ok(None);
400        }
401
402        let value = list
403            .car(cx)?
404            .ok_or_else(|| Error::Eval("list car missing for non-empty list".to_owned()))?;
405        state.next = list.cdr(cx)?;
406        Ok(Some(SequenceItem::new(value)))
407    }
408
409    fn close(&self, _cx: &mut Cx) -> Result<()> {
410        let mut state = self
411            .state
412            .lock()
413            .map_err(|_| Error::PoisonedLock("list sequence"))?;
414        state.next = None;
415        state.closed = true;
416        Ok(())
417    }
418
419    fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
420        let mut state = self
421            .state
422            .lock()
423            .map_err(|_| Error::PoisonedLock("list sequence"))?;
424        if state.closed {
425            return Ok(None);
426        }
427
428        let Some(node) = state.next.as_ref() else {
429            state.closed = true;
430            return Ok(None);
431        };
432        let list = list_from_value(node)?;
433        if list.is_empty(cx)? {
434            state.next = None;
435            state.closed = true;
436            return Ok(None);
437        }
438        list.car(cx).map(|item| item.map(SequenceItem::new))
439    }
440
441    fn is_done(&self, cx: &mut Cx) -> Result<bool> {
442        let mut state = self
443            .state
444            .lock()
445            .map_err(|_| Error::PoisonedLock("list sequence"))?;
446        if state.closed {
447            return Ok(true);
448        }
449
450        let Some(node) = state.next.as_ref() else {
451            state.closed = true;
452            return Ok(true);
453        };
454        let done = list_from_value(node)?.is_empty(cx)?;
455        if done {
456            state.next = None;
457            state.closed = true;
458        }
459        Ok(done)
460    }
461}
462
463impl<T> Sequence for T
464where
465    T: Stream + ?Sized,
466{
467    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
468        self.next(cx).map(|item| item.map(SequenceItem::new))
469    }
470
471    fn close(&self, cx: &mut Cx) -> Result<()> {
472        Stream::close(self, cx)
473    }
474}
475
476fn list_from_value(value: &Value) -> Result<&dyn ListValue> {
477    value.object().as_list().ok_or(Error::TypeMismatch {
478        expected: "list",
479        found: "non-list",
480    })
481}
482
483fn tick_value(cx: &mut Cx, tick: Tick) -> Result<Value> {
484    let clock = cx.factory().symbol(tick.clock)?;
485    let index = ref_value(cx, tick.index)?;
486    cx.factory().table(vec![
487        (Symbol::new("clock"), clock),
488        (Symbol::new("index"), index),
489    ])
490}
491
492fn ref_value(cx: &mut Cx, reference: Ref) -> Result<Value> {
493    cx.factory().expr(Expr::from(Term::Ref(reference)))
494}
495
496fn error_from_failed_ref(cx: &mut Cx, reference: &Ref) -> Error {
497    match value_from_ref(cx, reference) {
498        Ok(value) => match value.object().display(cx) {
499            Ok(message) => Error::Eval(message),
500            Err(err) => err,
501        },
502        Err(err) => err,
503    }
504}