Skip to main content

harn_vm/vm/
iter.rs

//! Lazy iterator protocol for the Harn VM.
//!
//! `VmIter` is the backing enum for `VmValue::Iter`. It's a single-pass, fused
//! iterator; once `next` returns `None` the variant is replaced with
//! `Exhausted`. Source variants (`Range`, `Vec`, `Dict`, `Chars`, `Gen`,
//! `Stream`, `Chan`) produce values directly, while combinator variants
//! (`Map`, `Filter`, `Take`, ...) wrap one or more inner iterators and step
//! them through `next_handle`.
9
10use std::cell::RefCell;
11use std::collections::{BTreeMap, VecDeque};
12use std::rc::Rc;
13
14use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
15
16#[derive(Debug)]
17pub struct VmBroadcastState {
18    source: Rc<RefCell<VmIter>>,
19    buffer: Vec<VmValue>,
20    exhausted: bool,
21}
22
23/// Backing enum for `VmValue::Iter`. See module docs.
24#[derive(Debug)]
25pub enum VmIter {
26    /// Step through a lazy integer range without materializing.
27    /// `next` is the value to emit on the next call; `stop` is the
28    /// first value that terminates the iteration (one past the end).
29    Range { next: i64, stop: i64 },
30    /// Snapshot over a shared list / set backing store.
31    Vec { items: Rc<Vec<VmValue>>, idx: usize },
32    /// Snapshot over a dict; yields one-key `{key, value}` dicts for now.
33    /// Step (b) swaps these for `VmValue::Pair` when the Pair variant lands.
34    Dict {
35        entries: Rc<BTreeMap<String, VmValue>>,
36        keys: Vec<String>,
37        idx: usize,
38    },
39    /// Unicode scalar iteration over a string.
40    Chars { s: Rc<str>, byte_idx: usize },
41    /// Drains a generator's yield channel.
42    Gen { gen: VmGenerator },
43    /// Drains a stream's emit channel.
44    Stream { stream: VmStream },
45    /// Reads from a channel handle.
46    Chan { handle: VmChannelHandle },
47    /// Maps each item through a closure.
48    Map {
49        inner: Rc<RefCell<VmIter>>,
50        f: VmValue,
51    },
52    /// Runs a callback for side effects, then yields the original item.
53    Tap {
54        inner: Rc<RefCell<VmIter>>,
55        f: VmValue,
56    },
57    /// Keeps only items for which the predicate is truthy.
58    Filter {
59        inner: Rc<RefCell<VmIter>>,
60        p: VmValue,
61    },
62    /// Running fold that yields each accumulator.
63    Scan {
64        inner: Rc<RefCell<VmIter>>,
65        acc: VmValue,
66        f: VmValue,
67    },
68    /// Maps each item to an iterable and flattens one level.
69    FlatMap {
70        inner: Rc<RefCell<VmIter>>,
71        f: VmValue,
72        cur: Option<Rc<RefCell<VmIter>>>,
73    },
74    /// Yields up to `remaining` items from `inner`, then becomes Exhausted.
75    Take {
76        inner: Rc<RefCell<VmIter>>,
77        remaining: usize,
78    },
79    /// Skips the first `remaining` items from `inner` on the first call, then
80    /// forwards. `remaining == 0` is the sentinel for "already primed".
81    Skip {
82        inner: Rc<RefCell<VmIter>>,
83        remaining: usize,
84    },
85    /// Yields items from `inner` while the predicate is truthy; after the
86    /// first falsy predicate or inner exhaustion, becomes Exhausted.
87    TakeWhile {
88        inner: Rc<RefCell<VmIter>>,
89        p: VmValue,
90        done: bool,
91    },
92    /// Yields items until the predicate is truthy. The matching sentinel item
93    /// is consumed but not yielded.
94    TakeUntil {
95        inner: Rc<RefCell<VmIter>>,
96        p: VmValue,
97    },
98    /// Discards items while the predicate is truthy; after the first falsy
99    /// item, forwards that item and all subsequent items from `inner`.
100    SkipWhile {
101        inner: Rc<RefCell<VmIter>>,
102        p: VmValue,
103        primed: bool,
104    },
105    /// Advances two inner iters in lockstep; yields `Pair(a, b)` until either
106    /// side is exhausted.
107    Zip {
108        a: Rc<RefCell<VmIter>>,
109        b: Rc<RefCell<VmIter>>,
110    },
111    /// Yields `Pair(i, item)` starting at `i = 0`.
112    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
113    /// Concatenates two iters: drains `a` first, then `b`.
114    Chain {
115        a: Rc<RefCell<VmIter>>,
116        b: Rc<RefCell<VmIter>>,
117        on_a: bool,
118    },
119    /// Drains any non-exhausted source in rotating order.
120    Merge {
121        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
122        cursor: usize,
123    },
124    /// Strict round-robin over non-exhausted sources.
125    Interleave {
126        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
127        cursor: usize,
128    },
129    /// First source to yield wins; subsequent pulls only read that source.
130    Race {
131        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
132        winner: Option<Rc<RefCell<VmIter>>>,
133    },
134    /// One source fanned out into several single-pass branches.
135    Broadcast {
136        shared: Rc<RefCell<VmBroadcastState>>,
137        branch: usize,
138        index: usize,
139    },
140    /// Sleeps between emissions after the first item.
141    Throttle {
142        inner: Rc<RefCell<VmIter>>,
143        interval_ms: u64,
144        next_ready: Option<tokio::time::Instant>,
145    },
146    /// Coalesces immediately available bursts and emits the last item seen
147    /// after the quiet window.
148    Debounce {
149        inner: Rc<RefCell<VmIter>>,
150        window_ms: u64,
151    },
152    /// Yields `VmValue::List` batches of up to `n` items from `inner`.
153    /// The final batch may be shorter; empty input yields no batches.
154    Chunks {
155        inner: Rc<RefCell<VmIter>>,
156        n: usize,
157    },
158    /// Yields sliding windows of exactly `n` items from `inner` as `VmValue::List`.
159    /// If the input has fewer than `n` items total, no windows are yielded.
160    Windows {
161        inner: Rc<RefCell<VmIter>>,
162        n: usize,
163        buf: VecDeque<VmValue>,
164    },
165    /// Terminal state: `next` always returns `None`.
166    Exhausted,
167}
168
169impl VmIter {
170    /// Produce the next value, or `None` when exhausted.
171    ///
172    /// Combinator variants (`Map`, `Filter`, `FlatMap`) invoke user-provided
173    /// closures through the `vm` parameter.
174    pub fn next<'a>(
175        &'a mut self,
176        vm: &'a mut crate::vm::Vm,
177    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
178    {
179        Box::pin(async move { self.next_impl(vm).await })
180    }
181
182    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
183        match self {
184            VmIter::Exhausted => Ok(None),
185            VmIter::Range { next, stop } => {
186                if *next < *stop {
187                    let v = *next;
188                    *next += 1;
189                    Ok(Some(VmValue::Int(v)))
190                } else {
191                    *self = VmIter::Exhausted;
192                    Ok(None)
193                }
194            }
195            VmIter::Vec { items, idx } => {
196                if *idx < items.len() {
197                    let v = items[*idx].clone();
198                    *idx += 1;
199                    Ok(Some(v))
200                } else {
201                    *self = VmIter::Exhausted;
202                    Ok(None)
203                }
204            }
205            VmIter::Dict { entries, keys, idx } => {
206                if *idx < keys.len() {
207                    let k = &keys[*idx];
208                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
209                    *idx += 1;
210                    Ok(Some(VmValue::Pair(Rc::new((
211                        VmValue::String(Rc::from(k.as_str())),
212                        v,
213                    )))))
214                } else {
215                    *self = VmIter::Exhausted;
216                    Ok(None)
217                }
218            }
219            VmIter::Chars { s, byte_idx } => {
220                if *byte_idx >= s.len() {
221                    *self = VmIter::Exhausted;
222                    return Ok(None);
223                }
224                let rest = &s[*byte_idx..];
225                if let Some(c) = rest.chars().next() {
226                    *byte_idx += c.len_utf8();
227                    Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
228                } else {
229                    *self = VmIter::Exhausted;
230                    Ok(None)
231                }
232            }
233            VmIter::Gen { gen } => {
234                if gen.done.get() {
235                    *self = VmIter::Exhausted;
236                    return Ok(None);
237                }
238                let rx = gen.receiver.clone();
239                let mut guard = rx.lock().await;
240                match guard.recv().await {
241                    Some(Ok(v)) => Ok(Some(v)),
242                    Some(Err(error)) => {
243                        gen.done.set(true);
244                        drop(guard);
245                        *self = VmIter::Exhausted;
246                        Err(error)
247                    }
248                    None => {
249                        gen.done.set(true);
250                        drop(guard);
251                        *self = VmIter::Exhausted;
252                        Ok(None)
253                    }
254                }
255            }
256            VmIter::Stream { stream } => {
257                if stream.done.get() {
258                    *self = VmIter::Exhausted;
259                    return Ok(None);
260                }
261                let rx = stream.receiver.clone();
262                let mut guard = rx.lock().await;
263                match guard.recv().await {
264                    Some(Ok(v)) => Ok(Some(v)),
265                    Some(Err(error)) => {
266                        stream.done.set(true);
267                        drop(guard);
268                        *self = VmIter::Exhausted;
269                        Err(error)
270                    }
271                    None => {
272                        stream.done.set(true);
273                        drop(guard);
274                        *self = VmIter::Exhausted;
275                        Ok(None)
276                    }
277                }
278            }
279            VmIter::Map { inner, f } => {
280                let f = f.clone();
281                let item = next_handle(inner, vm).await?;
282                match item {
283                    None => {
284                        *self = VmIter::Exhausted;
285                        Ok(None)
286                    }
287                    Some(v) => {
288                        let out = vm.call_callable_value(&f, &[v]).await?;
289                        Ok(Some(out))
290                    }
291                }
292            }
293            VmIter::Tap { inner, f } => {
294                let f = f.clone();
295                let item = next_handle(inner, vm).await?;
296                match item {
297                    None => {
298                        *self = VmIter::Exhausted;
299                        Ok(None)
300                    }
301                    Some(v) => {
302                        vm.call_callable_value(&f, &[v.clone()]).await?;
303                        Ok(Some(v))
304                    }
305                }
306            }
307            VmIter::Filter { inner, p } => {
308                let p = p.clone();
309                loop {
310                    let item = next_handle(inner, vm).await?;
311                    match item {
312                        None => {
313                            *self = VmIter::Exhausted;
314                            return Ok(None);
315                        }
316                        Some(v) => {
317                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
318                            if keep.is_truthy() {
319                                return Ok(Some(v));
320                            }
321                        }
322                    }
323                }
324            }
325            VmIter::Scan { inner, acc, f } => {
326                let f = f.clone();
327                let item = next_handle(inner, vm).await?;
328                match item {
329                    None => {
330                        *self = VmIter::Exhausted;
331                        Ok(None)
332                    }
333                    Some(v) => {
334                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
335                        *acc = next_acc.clone();
336                        Ok(Some(next_acc))
337                    }
338                }
339            }
340            VmIter::FlatMap { inner, f, cur } => {
341                let f = f.clone();
342                loop {
343                    if let Some(cur_iter) = cur.clone() {
344                        let item = next_handle(&cur_iter, vm).await?;
345                        if let Some(v) = item {
346                            return Ok(Some(v));
347                        }
348                        *cur = None;
349                    }
350                    let item = next_handle(inner, vm).await?;
351                    match item {
352                        None => {
353                            *self = VmIter::Exhausted;
354                            return Ok(None);
355                        }
356                        Some(v) => {
357                            let result = vm.call_callable_value(&f, &[v]).await?;
358                            let lifted = iter_from_value(result)?;
359                            if let VmValue::Iter(h) = lifted {
360                                *cur = Some(h);
361                            } else {
362                                return Err(VmError::TypeError(
363                                    "flat_map: expected iterable result".to_string(),
364                                ));
365                            }
366                        }
367                    }
368                }
369            }
370            VmIter::Take { inner, remaining } => {
371                if *remaining == 0 {
372                    *self = VmIter::Exhausted;
373                    return Ok(None);
374                }
375                let item = next_handle(inner, vm).await?;
376                match item {
377                    None => {
378                        *self = VmIter::Exhausted;
379                        Ok(None)
380                    }
381                    Some(v) => {
382                        *remaining -= 1;
383                        if *remaining == 0 {
384                            *self = VmIter::Exhausted;
385                        }
386                        Ok(Some(v))
387                    }
388                }
389            }
390            VmIter::Skip { inner, remaining } => {
391                while *remaining > 0 {
392                    let item = next_handle(inner, vm).await?;
393                    match item {
394                        None => {
395                            *self = VmIter::Exhausted;
396                            return Ok(None);
397                        }
398                        Some(_) => {
399                            *remaining -= 1;
400                        }
401                    }
402                }
403                let item = next_handle(inner, vm).await?;
404                match item {
405                    None => {
406                        *self = VmIter::Exhausted;
407                        Ok(None)
408                    }
409                    Some(v) => Ok(Some(v)),
410                }
411            }
412            VmIter::TakeWhile { inner, p, done } => {
413                if *done {
414                    return Ok(None);
415                }
416                let p = p.clone();
417                let item = next_handle(inner, vm).await?;
418                match item {
419                    None => {
420                        *self = VmIter::Exhausted;
421                        Ok(None)
422                    }
423                    Some(v) => {
424                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
425                        if keep.is_truthy() {
426                            Ok(Some(v))
427                        } else {
428                            *self = VmIter::Exhausted;
429                            Ok(None)
430                        }
431                    }
432                }
433            }
434            VmIter::TakeUntil { inner, p } => {
435                let p = p.clone();
436                let item = next_handle(inner, vm).await?;
437                match item {
438                    None => {
439                        *self = VmIter::Exhausted;
440                        Ok(None)
441                    }
442                    Some(v) => {
443                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
444                        if stop.is_truthy() {
445                            *self = VmIter::Exhausted;
446                            Ok(None)
447                        } else {
448                            Ok(Some(v))
449                        }
450                    }
451                }
452            }
453            VmIter::SkipWhile { inner, p, primed } => {
454                if *primed {
455                    let item = next_handle(inner, vm).await?;
456                    return match item {
457                        None => {
458                            *self = VmIter::Exhausted;
459                            Ok(None)
460                        }
461                        Some(v) => Ok(Some(v)),
462                    };
463                }
464                let p = p.clone();
465                loop {
466                    let item = next_handle(inner, vm).await?;
467                    match item {
468                        None => {
469                            *self = VmIter::Exhausted;
470                            return Ok(None);
471                        }
472                        Some(v) => {
473                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
474                            if !drop_it.is_truthy() {
475                                *primed = true;
476                                return Ok(Some(v));
477                            }
478                        }
479                    }
480                }
481            }
482            VmIter::Zip { a, b } => {
483                let ia = next_handle(a, vm).await?;
484                let x = match ia {
485                    None => {
486                        *self = VmIter::Exhausted;
487                        return Ok(None);
488                    }
489                    Some(v) => v,
490                };
491                let ib = next_handle(b, vm).await?;
492                let y = match ib {
493                    None => {
494                        *self = VmIter::Exhausted;
495                        return Ok(None);
496                    }
497                    Some(v) => v,
498                };
499                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
500            }
501            VmIter::Enumerate { inner, i } => {
502                let item = next_handle(inner, vm).await?;
503                match item {
504                    None => {
505                        *self = VmIter::Exhausted;
506                        Ok(None)
507                    }
508                    Some(v) => {
509                        let idx = *i;
510                        *i += 1;
511                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
512                    }
513                }
514            }
515            VmIter::Chain { a, b, on_a } => {
516                if *on_a {
517                    let item = next_handle(a, vm).await?;
518                    if let Some(v) = item {
519                        return Ok(Some(v));
520                    }
521                    *on_a = false;
522                }
523                let item = next_handle(b, vm).await?;
524                match item {
525                    None => {
526                        *self = VmIter::Exhausted;
527                        Ok(None)
528                    }
529                    Some(v) => Ok(Some(v)),
530                }
531            }
532            VmIter::Merge { sources, cursor } => loop {
533                if sources.is_empty() || sources.iter().all(Option::is_none) {
534                    *self = VmIter::Exhausted;
535                    return Ok(None);
536                }
537                let len = sources.len();
538                let mut live = 0usize;
539                for offset in 0..len {
540                    let idx = (*cursor + offset) % len;
541                    let Some(handle) = sources[idx].clone() else {
542                        continue;
543                    };
544                    match try_next_ready(&handle, vm).await? {
545                        Some(v) => {
546                            *cursor = (idx + 1) % len;
547                            return Ok(Some(v));
548                        }
549                        None => {
550                            if is_exhausted_handle(&handle) {
551                                sources[idx] = None;
552                            } else {
553                                live += 1;
554                            }
555                        }
556                    }
557                }
558                if live == 0 {
559                    *self = VmIter::Exhausted;
560                    return Ok(None);
561                }
562                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
563            },
564            VmIter::Interleave { sources, cursor } => {
565                if sources.is_empty() || sources.iter().all(Option::is_none) {
566                    *self = VmIter::Exhausted;
567                    return Ok(None);
568                }
569                let len = sources.len();
570                for offset in 0..len {
571                    let idx = (*cursor + offset) % len;
572                    let Some(handle) = sources[idx].clone() else {
573                        continue;
574                    };
575                    match next_handle(&handle, vm).await? {
576                        Some(v) => {
577                            *cursor = (idx + 1) % len;
578                            return Ok(Some(v));
579                        }
580                        None => {
581                            sources[idx] = None;
582                        }
583                    }
584                }
585                *self = VmIter::Exhausted;
586                Ok(None)
587            }
588            VmIter::Race { sources, winner } => {
589                if let Some(handle) = winner.clone() {
590                    let item = next_handle(&handle, vm).await?;
591                    return match item {
592                        Some(v) => Ok(Some(v)),
593                        None => {
594                            *self = VmIter::Exhausted;
595                            Ok(None)
596                        }
597                    };
598                }
599                loop {
600                    let mut live = 0usize;
601                    for source in sources.iter_mut() {
602                        let Some(handle) = source.clone() else {
603                            continue;
604                        };
605                        match try_next_ready(&handle, vm).await? {
606                            Some(v) => {
607                                *winner = Some(handle);
608                                sources.clear();
609                                return Ok(Some(v));
610                            }
611                            None => {
612                                if is_exhausted_handle(&handle) {
613                                    *source = None;
614                                } else {
615                                    live += 1;
616                                }
617                            }
618                        }
619                    }
620                    if live == 0 {
621                        *self = VmIter::Exhausted;
622                        return Ok(None);
623                    }
624                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
625                }
626            }
627            VmIter::Broadcast {
628                shared,
629                branch,
630                index,
631            } => {
632                let _ = branch;
633                loop {
634                    let mut state =
635                        std::mem::replace(&mut *shared.borrow_mut(), empty_broadcast_state());
636                    if *index < state.buffer.len() {
637                        let item = state.buffer[*index].clone();
638                        *index += 1;
639                        *shared.borrow_mut() = state;
640                        return Ok(Some(item));
641                    }
642                    if state.exhausted {
643                        *shared.borrow_mut() = state;
644                        *self = VmIter::Exhausted;
645                        return Ok(None);
646                    }
647                    let next = next_handle(&state.source, vm).await;
648                    match next {
649                        Err(err) => {
650                            *shared.borrow_mut() = state;
651                            return Err(err);
652                        }
653                        Ok(Some(v)) => {
654                            state.buffer.push(v);
655                            *shared.borrow_mut() = state;
656                        }
657                        Ok(None) => {
658                            state.exhausted = true;
659                            *shared.borrow_mut() = state;
660                        }
661                    }
662                }
663            }
664            VmIter::Throttle {
665                inner,
666                interval_ms,
667                next_ready,
668            } => {
669                if let Some(ready_at) = next_ready.take() {
670                    let now = tokio::time::Instant::now();
671                    if ready_at > now {
672                        tokio::time::sleep_until(ready_at).await;
673                    }
674                }
675                let item = next_handle(inner, vm).await?;
676                match item {
677                    None => {
678                        *self = VmIter::Exhausted;
679                        Ok(None)
680                    }
681                    Some(v) => {
682                        *next_ready = Some(
683                            tokio::time::Instant::now()
684                                + tokio::time::Duration::from_millis(*interval_ms),
685                        );
686                        Ok(Some(v))
687                    }
688                }
689            }
690            VmIter::Debounce { inner, window_ms } => {
691                let mut last = match next_handle(inner, vm).await? {
692                    Some(v) => v,
693                    None => {
694                        *self = VmIter::Exhausted;
695                        return Ok(None);
696                    }
697                };
698                if *window_ms > 0 {
699                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
700                }
701                while let Some(v) = try_next_ready(inner, vm).await? {
702                    last = v;
703                }
704                Ok(Some(last))
705            }
706            VmIter::Chunks { inner, n } => {
707                let n = *n;
708                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
709                for _ in 0..n {
710                    let item = next_handle(inner, vm).await?;
711                    match item {
712                        Some(v) => {
713                            batch.push(v);
714                        }
715                        None => break,
716                    }
717                }
718                if batch.is_empty() {
719                    *self = VmIter::Exhausted;
720                    Ok(None)
721                } else {
722                    Ok(Some(VmValue::List(Rc::new(batch))))
723                }
724            }
725            VmIter::Windows { inner, n, buf } => {
726                let n = *n;
727                if buf.is_empty() {
728                    while buf.len() < n {
729                        let item = next_handle(inner, vm).await?;
730                        match item {
731                            Some(v) => buf.push_back(v),
732                            None => {
733                                *self = VmIter::Exhausted;
734                                return Ok(None);
735                            }
736                        }
737                    }
738                } else {
739                    let item = next_handle(inner, vm).await?;
740                    match item {
741                        Some(v) => {
742                            buf.pop_front();
743                            buf.push_back(v);
744                        }
745                        None => {
746                            *self = VmIter::Exhausted;
747                            return Ok(None);
748                        }
749                    }
750                }
751                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
752                Ok(Some(VmValue::List(Rc::new(snapshot))))
753            }
754            VmIter::Chan { handle } => {
755                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
756                let rx = handle.receiver.clone();
757                let mut guard = rx.lock().await;
758                let item = if is_closed {
759                    guard.try_recv().ok()
760                } else {
761                    guard.recv().await
762                };
763                match item {
764                    Some(v) => Ok(Some(v)),
765                    None => {
766                        drop(guard);
767                        *self = VmIter::Exhausted;
768                        Ok(None)
769                    }
770                }
771            }
772        }
773    }
774}
775
776/// Advance a handle without holding a `RefCell` borrow across the await.
777///
778/// Swaps the iter state out into a local owned value (replacing it with
779/// `Exhausted`), runs `next` on the owned state, then swaps it back. This
780/// avoids `clippy::await_holding_refcell_ref` while preserving single-pass
781/// semantics: a nested `next` call on the same handle during the await would
782/// see `Exhausted` (the iter protocol doesn't permit re-entrant stepping of
783/// the same handle anyway).
784pub async fn next_handle(
785    handle: &Rc<RefCell<VmIter>>,
786    vm: &mut crate::vm::Vm,
787) -> Result<Option<VmValue>, VmError> {
788    let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
789    let result = state.next(vm).await;
790    // Restore state unless the inner call replaced it with Exhausted.
791    *handle.borrow_mut() = state;
792    result
793}
794
795/// Fully consume an iter handle into a Vec of values.
796pub async fn drain(
797    handle: &Rc<RefCell<VmIter>>,
798    vm: &mut crate::vm::Vm,
799) -> Result<Vec<VmValue>, VmError> {
800    let mut out = Vec::new();
801    loop {
802        let v = next_handle(handle, vm).await?;
803        match v {
804            Some(v) => out.push(v),
805            None => break,
806        }
807    }
808    Ok(out)
809}
810
811/// Fully consume an iter handle into a Vec, failing before pushing item
812/// `max + 1`.
813pub async fn drain_capped(
814    handle: &Rc<RefCell<VmIter>>,
815    vm: &mut crate::vm::Vm,
816    max: usize,
817) -> Result<Vec<VmValue>, VmError> {
818    let mut out = Vec::new();
819    loop {
820        let v = next_handle(handle, vm).await?;
821        match v {
822            Some(v) => {
823                if out.len() >= max {
824                    return Err(VmError::Runtime(format!(
825                        "stream.collect: max cap {max} exceeded"
826                    )));
827                }
828                out.push(v);
829            }
830            None => break,
831        }
832    }
833    Ok(out)
834}
835
836pub fn iter_handle_from_value(v: VmValue) -> Result<Rc<RefCell<VmIter>>, VmError> {
837    match iter_from_value(v)? {
838        VmValue::Iter(handle) => Ok(handle),
839        _ => unreachable!("iter_from_value returns Iter"),
840    }
841}
842
843pub fn broadcast_branches(source: Rc<RefCell<VmIter>>, n: usize) -> Vec<VmValue> {
844    let shared = Rc::new(RefCell::new(VmBroadcastState {
845        source,
846        buffer: Vec::new(),
847        exhausted: false,
848    }));
849    (0..n)
850        .map(|branch| {
851            VmValue::Iter(Rc::new(RefCell::new(VmIter::Broadcast {
852                shared: Rc::clone(&shared),
853                branch,
854                index: 0,
855            })))
856        })
857        .collect()
858}
859
860fn empty_broadcast_state() -> VmBroadcastState {
861    VmBroadcastState {
862        source: Rc::new(RefCell::new(VmIter::Exhausted)),
863        buffer: Vec::new(),
864        exhausted: true,
865    }
866}
867
868fn try_next_ready<'a>(
869    handle: &'a Rc<RefCell<VmIter>>,
870    vm: &'a mut crate::vm::Vm,
871) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>> {
872    Box::pin(async move {
873        let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
874        let result = state.try_next_ready_impl(vm).await;
875        *handle.borrow_mut() = state;
876        result
877    })
878}
879
880fn is_exhausted_handle(handle: &Rc<RefCell<VmIter>>) -> bool {
881    matches!(&*handle.borrow(), VmIter::Exhausted)
882}
883
884impl VmIter {
885    async fn try_next_ready_impl(
886        &mut self,
887        vm: &mut crate::vm::Vm,
888    ) -> Result<Option<VmValue>, VmError> {
889        match self {
890            VmIter::Exhausted => Ok(None),
891            VmIter::Map { inner, f } => {
892                let f = f.clone();
893                match try_next_ready(inner, vm).await? {
894                    Some(v) => Ok(Some(vm.call_callable_value(&f, &[v]).await?)),
895                    None => {
896                        if is_exhausted_handle(inner) {
897                            *self = VmIter::Exhausted;
898                        }
899                        Ok(None)
900                    }
901                }
902            }
903            VmIter::Tap { inner, f } => {
904                let f = f.clone();
905                match try_next_ready(inner, vm).await? {
906                    Some(v) => {
907                        vm.call_callable_value(&f, &[v.clone()]).await?;
908                        Ok(Some(v))
909                    }
910                    None => {
911                        if is_exhausted_handle(inner) {
912                            *self = VmIter::Exhausted;
913                        }
914                        Ok(None)
915                    }
916                }
917            }
918            VmIter::Filter { inner, p } => {
919                let p = p.clone();
920                loop {
921                    match try_next_ready(inner, vm).await? {
922                        Some(v) => {
923                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
924                            if keep.is_truthy() {
925                                return Ok(Some(v));
926                            }
927                        }
928                        None => {
929                            if is_exhausted_handle(inner) {
930                                *self = VmIter::Exhausted;
931                            }
932                            return Ok(None);
933                        }
934                    }
935                }
936            }
937            VmIter::Scan { inner, acc, f } => {
938                let f = f.clone();
939                match try_next_ready(inner, vm).await? {
940                    Some(v) => {
941                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
942                        *acc = next_acc.clone();
943                        Ok(Some(next_acc))
944                    }
945                    None => {
946                        if is_exhausted_handle(inner) {
947                            *self = VmIter::Exhausted;
948                        }
949                        Ok(None)
950                    }
951                }
952            }
953            VmIter::FlatMap { inner, f, cur } => {
954                let f = f.clone();
955                loop {
956                    if let Some(cur_iter) = cur.clone() {
957                        match try_next_ready(&cur_iter, vm).await? {
958                            Some(v) => return Ok(Some(v)),
959                            None => {
960                                if is_exhausted_handle(&cur_iter) {
961                                    *cur = None;
962                                }
963                                return Ok(None);
964                            }
965                        }
966                    }
967                    match try_next_ready(inner, vm).await? {
968                        Some(v) => {
969                            let result = vm.call_callable_value(&f, &[v]).await?;
970                            *cur = Some(iter_handle_from_value(result)?);
971                        }
972                        None => {
973                            if is_exhausted_handle(inner) {
974                                *self = VmIter::Exhausted;
975                            }
976                            return Ok(None);
977                        }
978                    }
979                }
980            }
981            VmIter::Take { inner, remaining } => {
982                if *remaining == 0 {
983                    *self = VmIter::Exhausted;
984                    return Ok(None);
985                }
986                match try_next_ready(inner, vm).await? {
987                    Some(v) => {
988                        *remaining -= 1;
989                        if *remaining == 0 {
990                            *self = VmIter::Exhausted;
991                        }
992                        Ok(Some(v))
993                    }
994                    None => {
995                        if is_exhausted_handle(inner) {
996                            *self = VmIter::Exhausted;
997                        }
998                        Ok(None)
999                    }
1000                }
1001            }
1002            VmIter::TakeUntil { inner, p } => {
1003                let p = p.clone();
1004                match try_next_ready(inner, vm).await? {
1005                    Some(v) => {
1006                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
1007                        if stop.is_truthy() {
1008                            *self = VmIter::Exhausted;
1009                            Ok(None)
1010                        } else {
1011                            Ok(Some(v))
1012                        }
1013                    }
1014                    None => {
1015                        if is_exhausted_handle(inner) {
1016                            *self = VmIter::Exhausted;
1017                        }
1018                        Ok(None)
1019                    }
1020                }
1021            }
1022            VmIter::Throttle {
1023                inner,
1024                interval_ms,
1025                next_ready,
1026            } => {
1027                if let Some(ready_at) = *next_ready {
1028                    if ready_at > tokio::time::Instant::now() {
1029                        return Ok(None);
1030                    }
1031                }
1032                match try_next_ready(inner, vm).await? {
1033                    Some(v) => {
1034                        *next_ready = Some(
1035                            tokio::time::Instant::now()
1036                                + tokio::time::Duration::from_millis(*interval_ms),
1037                        );
1038                        Ok(Some(v))
1039                    }
1040                    None => {
1041                        if is_exhausted_handle(inner) {
1042                            *self = VmIter::Exhausted;
1043                        }
1044                        Ok(None)
1045                    }
1046                }
1047            }
1048            VmIter::Range { .. }
1049            | VmIter::Vec { .. }
1050            | VmIter::Dict { .. }
1051            | VmIter::Chars { .. }
1052            | VmIter::Skip { .. }
1053            | VmIter::TakeWhile { .. }
1054            | VmIter::SkipWhile { .. }
1055            | VmIter::Zip { .. }
1056            | VmIter::Enumerate { .. }
1057            | VmIter::Chain { .. }
1058            | VmIter::Merge { .. }
1059            | VmIter::Interleave { .. }
1060            | VmIter::Race { .. }
1061            | VmIter::Broadcast { .. }
1062            | VmIter::Debounce { .. }
1063            | VmIter::Chunks { .. }
1064            | VmIter::Windows { .. } => self.next(vm).await,
1065            VmIter::Gen { gen } => {
1066                if gen.done.get() {
1067                    *self = VmIter::Exhausted;
1068                    return Ok(None);
1069                }
1070                let rx = gen.receiver.clone();
1071                let result = match rx.try_lock() {
1072                    Ok(mut guard) => match guard.try_recv() {
1073                        Ok(Ok(v)) => Ok(Some(v)),
1074                        Ok(Err(error)) => {
1075                            gen.done.set(true);
1076                            *self = VmIter::Exhausted;
1077                            Err(error)
1078                        }
1079                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1080                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1081                            gen.done.set(true);
1082                            *self = VmIter::Exhausted;
1083                            Ok(None)
1084                        }
1085                    },
1086                    Err(_) => Ok(None),
1087                };
1088                result
1089            }
1090            VmIter::Stream { stream } => {
1091                if stream.done.get() {
1092                    *self = VmIter::Exhausted;
1093                    return Ok(None);
1094                }
1095                let rx = stream.receiver.clone();
1096                let result = match rx.try_lock() {
1097                    Ok(mut guard) => match guard.try_recv() {
1098                        Ok(Ok(v)) => Ok(Some(v)),
1099                        Ok(Err(error)) => {
1100                            stream.done.set(true);
1101                            *self = VmIter::Exhausted;
1102                            Err(error)
1103                        }
1104                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1105                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1106                            stream.done.set(true);
1107                            *self = VmIter::Exhausted;
1108                            Ok(None)
1109                        }
1110                    },
1111                    Err(_) => Ok(None),
1112                };
1113                result
1114            }
1115            VmIter::Chan { handle } => {
1116                let rx = handle.receiver.clone();
1117                let result = match rx.try_lock() {
1118                    Ok(mut guard) => match guard.try_recv() {
1119                        Ok(v) => Ok(Some(v)),
1120                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1121                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1122                            *self = VmIter::Exhausted;
1123                            Ok(None)
1124                        }
1125                    },
1126                    Err(_) => Ok(None),
1127                };
1128                result
1129            }
1130        }
1131    }
1132}
1133
1134/// Convenience: wrap a source value into a `VmValue::Iter`. Used by the
1135/// `iter()` builtin and by combinator/sink implementations in later steps.
1136pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1137    let inner = match v {
1138        VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1139        VmValue::Range(r) => {
1140            let stop = if r.inclusive {
1141                r.end.saturating_add(1)
1142            } else {
1143                r.end
1144            };
1145            VmIter::Range {
1146                next: r.start,
1147                stop,
1148            }
1149        }
1150        VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1151        VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1152        VmValue::Dict(entries) => {
1153            let keys: Vec<String> = entries.keys().cloned().collect();
1154            VmIter::Dict {
1155                entries,
1156                keys,
1157                idx: 0,
1158            }
1159        }
1160        VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1161        VmValue::Generator(gen) => VmIter::Gen { gen },
1162        VmValue::Stream(stream) => VmIter::Stream { stream },
1163        VmValue::Channel(handle) => VmIter::Chan { handle },
1164        other => {
1165            return Err(VmError::TypeError(format!(
1166                "iter: value of type {} is not iterable",
1167                other.type_name()
1168            )))
1169        }
1170    };
1171    Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
1172}