Skip to main content

sim_lib_sequence/
transducer.rs

1use std::sync::Arc;
2
3use sim_kernel::{
4    CORE_SEQUENCE_CLASS_ID, ClassRef, Cx, Error, Object, ObjectCompat, Result, Sequence,
5    SequenceItem, Symbol, Value, seq_next_value,
6};
7
8type ValueMapper = Arc<dyn Fn(&mut Cx, Value) -> Result<Value> + Send + Sync + 'static>;
9type ValuePredicate = Arc<dyn Fn(&mut Cx, &Value) -> Result<bool> + Send + Sync + 'static>;
10type ValueReducer = Arc<dyn Fn(&mut Cx, Value, Value) -> Result<Value> + Send + Sync + 'static>;
11type ValueVisitor = Arc<dyn Fn(&mut Cx, Value) -> Result<()> + Send + Sync + 'static>;
12
13/// A single stage of a [`TransducerPipeline`].
14#[derive(Clone)]
15pub enum TransducerStep {
16    /// Transform each element with the given mapper.
17    Map(ValueMapper),
18    /// Keep only elements satisfying the given predicate.
19    Filter(ValuePredicate),
20}
21
22/// An ordered chain of [`TransducerStep`]s applied per element in one pass.
23///
24/// The composable core of the sequence organ's map/filter/reduce surface:
25/// steps are fused so a source is traversed once, returning `None` for dropped
26/// elements.
27#[derive(Clone, Default)]
28pub struct TransducerPipeline {
29    steps: Vec<TransducerStep>,
30}
31
32impl TransducerPipeline {
33    /// Start an empty pipeline.
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Append a mapping step, consuming and returning the pipeline.
39    pub fn map(mut self, mapper: ValueMapper) -> Self {
40        self.steps.push(TransducerStep::Map(mapper));
41        self
42    }
43
44    /// Append a filtering step, consuming and returning the pipeline.
45    pub fn filter(mut self, predicate: ValuePredicate) -> Self {
46        self.steps.push(TransducerStep::Filter(predicate));
47        self
48    }
49
50    /// Run one element through the pipeline.
51    ///
52    /// Returns `Some(value)` if the element survives every step, or `None` if a
53    /// filter step drops it.
54    pub fn apply(&self, cx: &mut Cx, mut value: Value) -> Result<Option<Value>> {
55        for step in &self.steps {
56            match step {
57                TransducerStep::Map(mapper) => value = mapper(cx, value)?,
58                TransducerStep::Filter(predicate) => {
59                    if !predicate(cx, &value)? {
60                        return Ok(None);
61                    }
62                }
63            }
64        }
65        Ok(Some(value))
66    }
67}
68
69#[sim_citizen_derive::non_citizen(
70    reason = "transduced sequence adapter; reconstruct from the source sequence and transducer pipeline descriptor",
71    kind = "handle",
72    descriptor = "core/Sequence"
73)]
74pub struct TransducedSequence {
75    source: Value,
76    pipeline: TransducerPipeline,
77}
78
79impl TransducedSequence {
80    pub fn new(source: Value, pipeline: TransducerPipeline) -> Self {
81        Self { source, pipeline }
82    }
83}
84
85impl Object for TransducedSequence {
86    fn display(&self, _cx: &mut Cx) -> Result<String> {
87        Ok("#<transduced-sequence>".to_owned())
88    }
89
90    fn as_any(&self) -> &dyn std::any::Any {
91        self
92    }
93}
94
95impl ObjectCompat for TransducedSequence {
96    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
97        cx.factory().class_stub(
98            CORE_SEQUENCE_CLASS_ID,
99            Symbol::qualified("core", "Sequence"),
100        )
101    }
102
103    fn as_sequence(&self) -> Option<&dyn Sequence> {
104        Some(self)
105    }
106}
107
108impl Sequence for TransducedSequence {
109    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
110        while let Some(item) = seq_next_value(cx, &self.source)? {
111            let value = item.into_value(cx)?;
112            if let Some(value) = self.pipeline.apply(cx, value)? {
113                return Ok(Some(SequenceItem::new(value)));
114            }
115        }
116        Ok(None)
117    }
118
119    fn close(&self, cx: &mut Cx) -> Result<()> {
120        if let Some(sequence) = self.source.object().as_sequence() {
121            sequence.close(cx)
122        } else {
123            Ok(())
124        }
125    }
126}
127
128/// Return a lazy sequence that maps each element of `source`.
129///
130/// Fails if `source` is not a sequence.
131pub fn map_sequence(cx: &mut Cx, source: Value, mapper: ValueMapper) -> Result<Value> {
132    transduced_sequence_value(cx, source, TransducerPipeline::new().map(mapper))
133}
134
135/// Return a lazy sequence that keeps elements of `source` passing `predicate`.
136///
137/// Fails if `source` is not a sequence.
138pub fn filter_sequence(cx: &mut Cx, source: Value, predicate: ValuePredicate) -> Result<Value> {
139    transduced_sequence_value(cx, source, TransducerPipeline::new().filter(predicate))
140}
141
142/// Fold `source` into a single value with `reducer`, starting from `init`.
143pub fn reduce_sequence(
144    cx: &mut Cx,
145    source: &Value,
146    init: Value,
147    reducer: ValueReducer,
148) -> Result<Value> {
149    transduce(cx, source, TransducerPipeline::new(), init, reducer)
150}
151
152/// Apply `visitor` to each element of `source` for its effect.
153pub fn for_each_sequence(cx: &mut Cx, source: &Value, visitor: ValueVisitor) -> Result<()> {
154    while let Some(item) = seq_next_value(cx, source)? {
155        let value = item.into_value(cx)?;
156        visitor(cx, value)?;
157    }
158    Ok(())
159}
160
161/// Stream `source` through `pipeline` and fold survivors into a single value.
162///
163/// The one-pass core combining transformation and reduction: each element is
164/// run through `pipeline`, and surviving elements are folded with `reducer`
165/// from `init`.
166///
167/// # Examples
168///
169/// ```
170/// use std::sync::Arc;
171/// use sim_kernel::{Cx, DefaultFactory, Expr, NoopEvalPolicy, NumberLiteral, Symbol, Value};
172/// use sim_lib_sequence::{lazy_sequence_value, transduce, TransducerPipeline};
173///
174/// fn n(cx: &mut Cx, v: u64) -> Value {
175///     cx.factory().number_literal(Symbol::qualified("test", "u64"), v.to_string()).unwrap()
176/// }
177/// fn read(cx: &mut Cx, v: &Value) -> u64 {
178///     let Expr::Number(NumberLiteral { canonical, .. }) = v.object().as_expr(cx).unwrap() else {
179///         panic!("number");
180///     };
181///     canonical.parse().unwrap()
182/// }
183///
184/// let mut cx = Cx::new(Arc::new(NoopEvalPolicy), Arc::new(DefaultFactory));
185/// // Source yields 0, 1, 2 then ends.
186/// let source = lazy_sequence_value(&mut cx, Arc::new(|cx: &mut Cx, i| {
187///     if i >= 3 { return Ok(None); }
188///     Ok(Some(n(cx, i as u64)))
189/// }))?;
190///
191/// let zero = n(&mut cx, 0);
192/// let sum = transduce(
193///     &mut cx,
194///     &source,
195///     TransducerPipeline::new(),
196///     zero,
197///     Arc::new(|cx, acc, value| {
198///         let total = read(cx, &acc) + read(cx, &value);
199///         Ok(n(cx, total))
200///     }),
201/// )?;
202/// assert_eq!(read(&mut cx, &sum), 3);
203/// # Ok::<(), sim_kernel::Error>(())
204/// ```
205pub fn transduce(
206    cx: &mut Cx,
207    source: &Value,
208    pipeline: TransducerPipeline,
209    mut init: Value,
210    reducer: ValueReducer,
211) -> Result<Value> {
212    while let Some(item) = seq_next_value(cx, source)? {
213        let value = item.into_value(cx)?;
214        if let Some(value) = pipeline.apply(cx, value)? {
215            init = reducer(cx, init, value)?;
216        }
217    }
218    Ok(init)
219}
220
221fn transduced_sequence_value(
222    cx: &mut Cx,
223    source: Value,
224    pipeline: TransducerPipeline,
225) -> Result<Value> {
226    if source.object().as_sequence().is_none() {
227        return Err(Error::TypeMismatch {
228            expected: "sequence",
229            found: "non-sequence",
230        });
231    }
232    cx.factory()
233        .opaque(Arc::new(TransducedSequence::new(source, pipeline)))
234}