Skip to main content

harn_vm/vm/
iter.rs

1//! Lazy iterator protocol for the Harn VM.
2//!
3//! `VmIter` is the backing enum for `VmValue::Iter`. It's a single-pass, fused
4//! iterator; once `next` returns `None` the variant is replaced with
5//! `Exhausted`. Step (a) only introduces source variants (Vec, Dict, Chars,
6//! Gen, Chan, Exhausted) and wires them into the for-loop driver. Combinator
7//! variants (`Map`, `Filter`, `Take`, ...) and sink builtins land in later
8//! steps per the plan.
9
10use std::cell::RefCell;
11use std::collections::{BTreeMap, VecDeque};
12use std::rc::Rc;
13
14use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
15
16/// Backing enum for `VmValue::Iter`. See module docs.
17#[derive(Debug)]
18pub enum VmIter {
19    /// Step through a lazy integer range without materializing.
20    /// `next` is the value to emit on the next call; `stop` is the
21    /// first value that terminates the iteration (one past the end).
22    Range { next: i64, stop: i64 },
23    /// Snapshot over a shared list / set backing store.
24    Vec { items: Rc<Vec<VmValue>>, idx: usize },
25    /// Snapshot over a dict; yields one-key `{key, value}` dicts for now.
26    /// Step (b) swaps these for `VmValue::Pair` when the Pair variant lands.
27    Dict {
28        entries: Rc<BTreeMap<String, VmValue>>,
29        keys: Vec<String>,
30        idx: usize,
31    },
32    /// Unicode scalar iteration over a string.
33    Chars { s: Rc<str>, byte_idx: usize },
34    /// Drains a generator's yield channel.
35    Gen { gen: VmGenerator },
36    /// Drains a stream's emit channel.
37    Stream { stream: VmStream },
38    /// Reads from a channel handle.
39    Chan { handle: VmChannelHandle },
40    /// Maps each item through a closure.
41    Map {
42        inner: Rc<RefCell<VmIter>>,
43        f: VmValue,
44    },
45    /// Keeps only items for which the predicate is truthy.
46    Filter {
47        inner: Rc<RefCell<VmIter>>,
48        p: VmValue,
49    },
50    /// Maps each item to an iterable and flattens one level.
51    FlatMap {
52        inner: Rc<RefCell<VmIter>>,
53        f: VmValue,
54        cur: Option<Rc<RefCell<VmIter>>>,
55    },
56    /// Yields up to `remaining` items from `inner`, then becomes Exhausted.
57    Take {
58        inner: Rc<RefCell<VmIter>>,
59        remaining: usize,
60    },
61    /// Skips the first `remaining` items from `inner` on the first call, then
62    /// forwards. `remaining == 0` is the sentinel for "already primed".
63    Skip {
64        inner: Rc<RefCell<VmIter>>,
65        remaining: usize,
66    },
67    /// Yields items from `inner` while the predicate is truthy; after the
68    /// first falsy predicate or inner exhaustion, becomes Exhausted.
69    TakeWhile {
70        inner: Rc<RefCell<VmIter>>,
71        p: VmValue,
72        done: bool,
73    },
74    /// Discards items while the predicate is truthy; after the first falsy
75    /// item, forwards that item and all subsequent items from `inner`.
76    SkipWhile {
77        inner: Rc<RefCell<VmIter>>,
78        p: VmValue,
79        primed: bool,
80    },
81    /// Advances two inner iters in lockstep; yields `Pair(a, b)` until either
82    /// side is exhausted.
83    Zip {
84        a: Rc<RefCell<VmIter>>,
85        b: Rc<RefCell<VmIter>>,
86    },
87    /// Yields `Pair(i, item)` starting at `i = 0`.
88    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
89    /// Concatenates two iters: drains `a` first, then `b`.
90    Chain {
91        a: Rc<RefCell<VmIter>>,
92        b: Rc<RefCell<VmIter>>,
93        on_a: bool,
94    },
95    /// Yields `VmValue::List` batches of up to `n` items from `inner`.
96    /// The final batch may be shorter; empty input yields no batches.
97    Chunks {
98        inner: Rc<RefCell<VmIter>>,
99        n: usize,
100    },
101    /// Yields sliding windows of exactly `n` items from `inner` as `VmValue::List`.
102    /// If the input has fewer than `n` items total, no windows are yielded.
103    Windows {
104        inner: Rc<RefCell<VmIter>>,
105        n: usize,
106        buf: VecDeque<VmValue>,
107    },
108    /// Terminal state: `next` always returns `None`.
109    Exhausted,
110}
111
112impl VmIter {
113    /// Produce the next value, or `None` when exhausted.
114    ///
115    /// Combinator variants (`Map`, `Filter`, `FlatMap`) invoke user-provided
116    /// closures through the `vm` parameter.
117    pub fn next<'a>(
118        &'a mut self,
119        vm: &'a mut crate::vm::Vm,
120    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
121    {
122        Box::pin(async move { self.next_impl(vm).await })
123    }
124
125    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
126        match self {
127            VmIter::Exhausted => Ok(None),
128            VmIter::Range { next, stop } => {
129                if *next < *stop {
130                    let v = *next;
131                    *next += 1;
132                    Ok(Some(VmValue::Int(v)))
133                } else {
134                    *self = VmIter::Exhausted;
135                    Ok(None)
136                }
137            }
138            VmIter::Vec { items, idx } => {
139                if *idx < items.len() {
140                    let v = items[*idx].clone();
141                    *idx += 1;
142                    Ok(Some(v))
143                } else {
144                    *self = VmIter::Exhausted;
145                    Ok(None)
146                }
147            }
148            VmIter::Dict { entries, keys, idx } => {
149                if *idx < keys.len() {
150                    let k = &keys[*idx];
151                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
152                    *idx += 1;
153                    Ok(Some(VmValue::Pair(Rc::new((
154                        VmValue::String(Rc::from(k.as_str())),
155                        v,
156                    )))))
157                } else {
158                    *self = VmIter::Exhausted;
159                    Ok(None)
160                }
161            }
162            VmIter::Chars { s, byte_idx } => {
163                if *byte_idx >= s.len() {
164                    *self = VmIter::Exhausted;
165                    return Ok(None);
166                }
167                let rest = &s[*byte_idx..];
168                if let Some(c) = rest.chars().next() {
169                    *byte_idx += c.len_utf8();
170                    Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
171                } else {
172                    *self = VmIter::Exhausted;
173                    Ok(None)
174                }
175            }
176            VmIter::Gen { gen } => {
177                if gen.done.get() {
178                    *self = VmIter::Exhausted;
179                    return Ok(None);
180                }
181                let rx = gen.receiver.clone();
182                let mut guard = rx.lock().await;
183                match guard.recv().await {
184                    Some(Ok(v)) => Ok(Some(v)),
185                    Some(Err(error)) => {
186                        gen.done.set(true);
187                        drop(guard);
188                        *self = VmIter::Exhausted;
189                        Err(error)
190                    }
191                    None => {
192                        gen.done.set(true);
193                        drop(guard);
194                        *self = VmIter::Exhausted;
195                        Ok(None)
196                    }
197                }
198            }
199            VmIter::Stream { stream } => {
200                if stream.done.get() {
201                    *self = VmIter::Exhausted;
202                    return Ok(None);
203                }
204                let rx = stream.receiver.clone();
205                let mut guard = rx.lock().await;
206                match guard.recv().await {
207                    Some(Ok(v)) => Ok(Some(v)),
208                    Some(Err(error)) => {
209                        stream.done.set(true);
210                        drop(guard);
211                        *self = VmIter::Exhausted;
212                        Err(error)
213                    }
214                    None => {
215                        stream.done.set(true);
216                        drop(guard);
217                        *self = VmIter::Exhausted;
218                        Ok(None)
219                    }
220                }
221            }
222            VmIter::Map { inner, f } => {
223                let f = f.clone();
224                let item = next_handle(inner, vm).await?;
225                match item {
226                    None => {
227                        *self = VmIter::Exhausted;
228                        Ok(None)
229                    }
230                    Some(v) => {
231                        let out = vm.call_callable_value(&f, &[v]).await?;
232                        Ok(Some(out))
233                    }
234                }
235            }
236            VmIter::Filter { inner, p } => {
237                let p = p.clone();
238                loop {
239                    let item = next_handle(inner, vm).await?;
240                    match item {
241                        None => {
242                            *self = VmIter::Exhausted;
243                            return Ok(None);
244                        }
245                        Some(v) => {
246                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
247                            if keep.is_truthy() {
248                                return Ok(Some(v));
249                            }
250                        }
251                    }
252                }
253            }
254            VmIter::FlatMap { inner, f, cur } => {
255                let f = f.clone();
256                loop {
257                    if let Some(cur_iter) = cur.clone() {
258                        let item = next_handle(&cur_iter, vm).await?;
259                        if let Some(v) = item {
260                            return Ok(Some(v));
261                        }
262                        *cur = None;
263                    }
264                    let item = next_handle(inner, vm).await?;
265                    match item {
266                        None => {
267                            *self = VmIter::Exhausted;
268                            return Ok(None);
269                        }
270                        Some(v) => {
271                            let result = vm.call_callable_value(&f, &[v]).await?;
272                            let lifted = iter_from_value(result)?;
273                            if let VmValue::Iter(h) = lifted {
274                                *cur = Some(h);
275                            } else {
276                                return Err(VmError::TypeError(
277                                    "flat_map: expected iterable result".to_string(),
278                                ));
279                            }
280                        }
281                    }
282                }
283            }
284            VmIter::Take { inner, remaining } => {
285                if *remaining == 0 {
286                    *self = VmIter::Exhausted;
287                    return Ok(None);
288                }
289                let item = next_handle(inner, vm).await?;
290                match item {
291                    None => {
292                        *self = VmIter::Exhausted;
293                        Ok(None)
294                    }
295                    Some(v) => {
296                        *remaining -= 1;
297                        if *remaining == 0 {
298                            *self = VmIter::Exhausted;
299                        }
300                        Ok(Some(v))
301                    }
302                }
303            }
304            VmIter::Skip { inner, remaining } => {
305                while *remaining > 0 {
306                    let item = next_handle(inner, vm).await?;
307                    match item {
308                        None => {
309                            *self = VmIter::Exhausted;
310                            return Ok(None);
311                        }
312                        Some(_) => {
313                            *remaining -= 1;
314                        }
315                    }
316                }
317                let item = next_handle(inner, vm).await?;
318                match item {
319                    None => {
320                        *self = VmIter::Exhausted;
321                        Ok(None)
322                    }
323                    Some(v) => Ok(Some(v)),
324                }
325            }
326            VmIter::TakeWhile { inner, p, done } => {
327                if *done {
328                    return Ok(None);
329                }
330                let p = p.clone();
331                let item = next_handle(inner, vm).await?;
332                match item {
333                    None => {
334                        *self = VmIter::Exhausted;
335                        Ok(None)
336                    }
337                    Some(v) => {
338                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
339                        if keep.is_truthy() {
340                            Ok(Some(v))
341                        } else {
342                            *self = VmIter::Exhausted;
343                            Ok(None)
344                        }
345                    }
346                }
347            }
348            VmIter::SkipWhile { inner, p, primed } => {
349                if *primed {
350                    let item = next_handle(inner, vm).await?;
351                    return match item {
352                        None => {
353                            *self = VmIter::Exhausted;
354                            Ok(None)
355                        }
356                        Some(v) => Ok(Some(v)),
357                    };
358                }
359                let p = p.clone();
360                loop {
361                    let item = next_handle(inner, vm).await?;
362                    match item {
363                        None => {
364                            *self = VmIter::Exhausted;
365                            return Ok(None);
366                        }
367                        Some(v) => {
368                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
369                            if !drop_it.is_truthy() {
370                                *primed = true;
371                                return Ok(Some(v));
372                            }
373                        }
374                    }
375                }
376            }
377            VmIter::Zip { a, b } => {
378                let ia = next_handle(a, vm).await?;
379                let x = match ia {
380                    None => {
381                        *self = VmIter::Exhausted;
382                        return Ok(None);
383                    }
384                    Some(v) => v,
385                };
386                let ib = next_handle(b, vm).await?;
387                let y = match ib {
388                    None => {
389                        *self = VmIter::Exhausted;
390                        return Ok(None);
391                    }
392                    Some(v) => v,
393                };
394                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
395            }
396            VmIter::Enumerate { inner, i } => {
397                let item = next_handle(inner, vm).await?;
398                match item {
399                    None => {
400                        *self = VmIter::Exhausted;
401                        Ok(None)
402                    }
403                    Some(v) => {
404                        let idx = *i;
405                        *i += 1;
406                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
407                    }
408                }
409            }
410            VmIter::Chain { a, b, on_a } => {
411                if *on_a {
412                    let item = next_handle(a, vm).await?;
413                    if let Some(v) = item {
414                        return Ok(Some(v));
415                    }
416                    *on_a = false;
417                }
418                let item = next_handle(b, vm).await?;
419                match item {
420                    None => {
421                        *self = VmIter::Exhausted;
422                        Ok(None)
423                    }
424                    Some(v) => Ok(Some(v)),
425                }
426            }
427            VmIter::Chunks { inner, n } => {
428                let n = *n;
429                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
430                for _ in 0..n {
431                    let item = next_handle(inner, vm).await?;
432                    match item {
433                        Some(v) => batch.push(v),
434                        None => break,
435                    }
436                }
437                if batch.is_empty() {
438                    *self = VmIter::Exhausted;
439                    Ok(None)
440                } else {
441                    Ok(Some(VmValue::List(Rc::new(batch))))
442                }
443            }
444            VmIter::Windows { inner, n, buf } => {
445                let n = *n;
446                if buf.is_empty() {
447                    while buf.len() < n {
448                        let item = next_handle(inner, vm).await?;
449                        match item {
450                            Some(v) => buf.push_back(v),
451                            None => {
452                                *self = VmIter::Exhausted;
453                                return Ok(None);
454                            }
455                        }
456                    }
457                } else {
458                    let item = next_handle(inner, vm).await?;
459                    match item {
460                        Some(v) => {
461                            buf.pop_front();
462                            buf.push_back(v);
463                        }
464                        None => {
465                            *self = VmIter::Exhausted;
466                            return Ok(None);
467                        }
468                    }
469                }
470                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
471                Ok(Some(VmValue::List(Rc::new(snapshot))))
472            }
473            VmIter::Chan { handle } => {
474                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
475                let rx = handle.receiver.clone();
476                let mut guard = rx.lock().await;
477                let item = if is_closed {
478                    guard.try_recv().ok()
479                } else {
480                    guard.recv().await
481                };
482                match item {
483                    Some(v) => Ok(Some(v)),
484                    None => {
485                        drop(guard);
486                        *self = VmIter::Exhausted;
487                        Ok(None)
488                    }
489                }
490            }
491        }
492    }
493}
494
495/// Advance a handle without holding a `RefCell` borrow across the await.
496///
497/// Swaps the iter state out into a local owned value (replacing it with
498/// `Exhausted`), runs `next` on the owned state, then swaps it back. This
499/// avoids `clippy::await_holding_refcell_ref` while preserving single-pass
500/// semantics: a nested `next` call on the same handle during the await would
501/// see `Exhausted` (the iter protocol doesn't permit re-entrant stepping of
502/// the same handle anyway).
503pub async fn next_handle(
504    handle: &Rc<RefCell<VmIter>>,
505    vm: &mut crate::vm::Vm,
506) -> Result<Option<VmValue>, VmError> {
507    let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
508    let result = state.next(vm).await;
509    // Restore state unless the inner call replaced it with Exhausted.
510    *handle.borrow_mut() = state;
511    result
512}
513
514/// Fully consume an iter handle into a Vec of values.
515pub async fn drain(
516    handle: &Rc<RefCell<VmIter>>,
517    vm: &mut crate::vm::Vm,
518) -> Result<Vec<VmValue>, VmError> {
519    let mut out = Vec::new();
520    loop {
521        let v = next_handle(handle, vm).await?;
522        match v {
523            Some(v) => out.push(v),
524            None => break,
525        }
526    }
527    Ok(out)
528}
529
530/// Convenience: wrap a source value into a `VmValue::Iter`. Used by the
531/// `iter()` builtin and by combinator/sink implementations in later steps.
532pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
533    let inner = match v {
534        VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
535        VmValue::Range(r) => {
536            let stop = if r.inclusive {
537                r.end.saturating_add(1)
538            } else {
539                r.end
540            };
541            VmIter::Range {
542                next: r.start,
543                stop,
544            }
545        }
546        VmValue::List(items) => VmIter::Vec { items, idx: 0 },
547        VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
548        VmValue::Dict(entries) => {
549            let keys: Vec<String> = entries.keys().cloned().collect();
550            VmIter::Dict {
551                entries,
552                keys,
553                idx: 0,
554            }
555        }
556        VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
557        VmValue::Generator(gen) => VmIter::Gen { gen },
558        VmValue::Stream(stream) => VmIter::Stream { stream },
559        VmValue::Channel(handle) => VmIter::Chan { handle },
560        other => {
561            return Err(VmError::TypeError(format!(
562                "iter: value of type {} is not iterable",
563                other.type_name()
564            )))
565        }
566    };
567    Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
568}