Skip to main content

harn_vm/vm/
iter.rs

1//! Lazy iterator protocol for the Harn VM.
2//!
3//! `VmIter` is the backing enum for `VmValue::Iter`. It's a single-pass, fused
4//! iterator; once `next` returns `None` the variant is replaced with
5//! `Exhausted`.
6
7use std::cell::RefCell;
8use std::collections::{BTreeMap, VecDeque};
9use std::rc::Rc;
10
11use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
12
13fn range_initial_done(start: i64, end: i64, inclusive: bool) -> bool {
14    if inclusive {
15        start > end
16    } else {
17        start >= end
18    }
19}
20
21fn range_next(next: &mut i64, end: i64, inclusive: bool, done: &mut bool) -> Option<i64> {
22    if *done {
23        return None;
24    }
25    let value = *next;
26    let at_end = if inclusive {
27        value >= end
28    } else {
29        value
30            .checked_add(1)
31            .is_none_or(|candidate| candidate >= end)
32    };
33    if at_end {
34        *done = true;
35    } else {
36        *next += 1;
37    }
38    Some(value)
39}
40
41#[derive(Debug)]
42pub struct VmBroadcastState {
43    source: Rc<RefCell<VmIter>>,
44    buffer: Vec<VmValue>,
45    exhausted: bool,
46}
47
48/// Backing enum for `VmValue::Iter`. See module docs.
49#[derive(Debug)]
50pub enum VmIter {
51    /// Step through a lazy integer range without materializing.
52    Range {
53        next: i64,
54        end: i64,
55        inclusive: bool,
56        done: bool,
57    },
58    /// Snapshot over a shared list / set backing store.
59    Vec { items: Rc<Vec<VmValue>>, idx: usize },
60    /// Snapshot over a dict; yields `Pair(key, value)` items.
61    Dict {
62        entries: Rc<BTreeMap<String, VmValue>>,
63        keys: Vec<String>,
64        idx: usize,
65    },
66    /// Unicode scalar iteration over a string.
67    Chars { s: Rc<str>, byte_idx: usize },
68    /// Drains a generator's yield channel.
69    Gen { gen: Rc<VmGenerator> },
70    /// Drains a stream's emit channel.
71    Stream { stream: Rc<VmStream> },
72    /// Reads from a channel handle.
73    Chan { handle: Rc<VmChannelHandle> },
74    /// Maps each item through a closure.
75    Map {
76        inner: Rc<RefCell<VmIter>>,
77        f: VmValue,
78    },
79    /// Runs a callback for side effects, then yields the original item.
80    Tap {
81        inner: Rc<RefCell<VmIter>>,
82        f: VmValue,
83    },
84    /// Keeps only items for which the predicate is truthy.
85    Filter {
86        inner: Rc<RefCell<VmIter>>,
87        p: VmValue,
88    },
89    /// Running fold that yields each accumulator.
90    Scan {
91        inner: Rc<RefCell<VmIter>>,
92        acc: VmValue,
93        f: VmValue,
94    },
95    /// Maps each item to an iterable and flattens one level.
96    FlatMap {
97        inner: Rc<RefCell<VmIter>>,
98        f: VmValue,
99        cur: Option<Rc<RefCell<VmIter>>>,
100    },
101    /// Yields up to `remaining` items from `inner`, then becomes Exhausted.
102    Take {
103        inner: Rc<RefCell<VmIter>>,
104        remaining: usize,
105    },
106    /// Skips the first `remaining` items from `inner` on the first call, then
107    /// forwards. `remaining == 0` is the sentinel for "already primed".
108    Skip {
109        inner: Rc<RefCell<VmIter>>,
110        remaining: usize,
111    },
112    /// Yields items from `inner` while the predicate is truthy; after the
113    /// first falsy predicate or inner exhaustion, becomes Exhausted.
114    TakeWhile {
115        inner: Rc<RefCell<VmIter>>,
116        p: VmValue,
117        done: bool,
118    },
119    /// Yields items until the predicate is truthy. The matching sentinel item
120    /// is consumed but not yielded.
121    TakeUntil {
122        inner: Rc<RefCell<VmIter>>,
123        p: VmValue,
124    },
125    /// Discards items while the predicate is truthy; after the first falsy
126    /// item, forwards that item and all subsequent items from `inner`.
127    SkipWhile {
128        inner: Rc<RefCell<VmIter>>,
129        p: VmValue,
130        primed: bool,
131    },
132    /// Advances two inner iters in lockstep; yields `Pair(a, b)` until either
133    /// side is exhausted.
134    Zip {
135        a: Rc<RefCell<VmIter>>,
136        b: Rc<RefCell<VmIter>>,
137    },
138    /// Yields `Pair(i, item)` starting at `i = 0`.
139    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
140    /// Concatenates two iters: drains `a` first, then `b`.
141    Chain {
142        a: Rc<RefCell<VmIter>>,
143        b: Rc<RefCell<VmIter>>,
144        on_a: bool,
145    },
146    /// Drains any non-exhausted source in rotating order.
147    Merge {
148        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
149        cursor: usize,
150    },
151    /// Strict round-robin over non-exhausted sources.
152    Interleave {
153        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
154        cursor: usize,
155    },
156    /// First source to yield wins; subsequent pulls only read that source.
157    Race {
158        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
159        winner: Option<Rc<RefCell<VmIter>>>,
160    },
161    /// One source fanned out into several single-pass branches.
162    Broadcast {
163        shared: Rc<RefCell<VmBroadcastState>>,
164        branch: usize,
165        index: usize,
166    },
167    /// Sleeps between emissions after the first item.
168    Throttle {
169        inner: Rc<RefCell<VmIter>>,
170        interval_ms: u64,
171        next_ready: Option<tokio::time::Instant>,
172    },
173    /// Coalesces immediately available bursts and emits the last item seen
174    /// after the quiet window.
175    Debounce {
176        inner: Rc<RefCell<VmIter>>,
177        window_ms: u64,
178    },
179    /// Yields `VmValue::List` batches of up to `n` items from `inner`.
180    /// The final batch may be shorter; empty input yields no batches.
181    Chunks {
182        inner: Rc<RefCell<VmIter>>,
183        n: usize,
184    },
185    /// Yields sliding windows of exactly `n` items from `inner` as `VmValue::List`.
186    /// If the input has fewer than `n` items total, no windows are yielded.
187    Windows {
188        inner: Rc<RefCell<VmIter>>,
189        n: usize,
190        buf: VecDeque<VmValue>,
191    },
192    /// Terminal state: `next` always returns `None`.
193    Exhausted,
194}
195
196impl VmIter {
197    /// Produce the next value, or `None` when exhausted.
198    ///
199    /// Combinator variants (`Map`, `Filter`, `FlatMap`) invoke user-provided
200    /// closures through the `vm` parameter.
201    pub fn next<'a>(
202        &'a mut self,
203        vm: &'a mut crate::vm::Vm,
204    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
205    {
206        Box::pin(async move { self.next_impl(vm).await })
207    }
208
209    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
210        match self {
211            VmIter::Exhausted => Ok(None),
212            VmIter::Range {
213                next,
214                end,
215                inclusive,
216                done,
217            } => {
218                if let Some(v) = range_next(next, *end, *inclusive, done) {
219                    Ok(Some(VmValue::Int(v)))
220                } else {
221                    *self = VmIter::Exhausted;
222                    Ok(None)
223                }
224            }
225            VmIter::Vec { items, idx } => {
226                if *idx < items.len() {
227                    let v = items[*idx].clone();
228                    *idx += 1;
229                    Ok(Some(v))
230                } else {
231                    *self = VmIter::Exhausted;
232                    Ok(None)
233                }
234            }
235            VmIter::Dict { entries, keys, idx } => {
236                if *idx < keys.len() {
237                    let k = &keys[*idx];
238                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
239                    *idx += 1;
240                    Ok(Some(VmValue::Pair(Rc::new((
241                        VmValue::String(Rc::from(k.as_str())),
242                        v,
243                    )))))
244                } else {
245                    *self = VmIter::Exhausted;
246                    Ok(None)
247                }
248            }
249            VmIter::Chars { s, byte_idx } => {
250                if *byte_idx >= s.len() {
251                    *self = VmIter::Exhausted;
252                    return Ok(None);
253                }
254                let rest = &s[*byte_idx..];
255                if let Some(c) = rest.chars().next() {
256                    *byte_idx += c.len_utf8();
257                    let mut buf = [0u8; 4];
258                    let encoded = c.encode_utf8(&mut buf);
259                    Ok(Some(VmValue::String(Rc::from(&*encoded))))
260                } else {
261                    *self = VmIter::Exhausted;
262                    Ok(None)
263                }
264            }
265            VmIter::Gen { gen } => {
266                if gen.done.get() {
267                    *self = VmIter::Exhausted;
268                    return Ok(None);
269                }
270                let rx = gen.receiver.clone();
271                let mut guard = rx.lock().await;
272                match guard.recv().await {
273                    Some(Ok(v)) => Ok(Some(v)),
274                    Some(Err(error)) => {
275                        gen.done.set(true);
276                        drop(guard);
277                        *self = VmIter::Exhausted;
278                        Err(error)
279                    }
280                    None => {
281                        gen.done.set(true);
282                        drop(guard);
283                        *self = VmIter::Exhausted;
284                        Ok(None)
285                    }
286                }
287            }
288            VmIter::Stream { stream } => {
289                if stream.done.get() {
290                    *self = VmIter::Exhausted;
291                    return Ok(None);
292                }
293                let rx = stream.receiver.clone();
294                let mut guard = rx.lock().await;
295                match guard.recv().await {
296                    Some(Ok(v)) => Ok(Some(v)),
297                    Some(Err(error)) => {
298                        stream.done.set(true);
299                        drop(guard);
300                        *self = VmIter::Exhausted;
301                        Err(error)
302                    }
303                    None => {
304                        stream.done.set(true);
305                        drop(guard);
306                        *self = VmIter::Exhausted;
307                        Ok(None)
308                    }
309                }
310            }
311            VmIter::Map { inner, f } => {
312                let f = f.clone();
313                let item = next_handle(inner, vm).await?;
314                match item {
315                    None => {
316                        *self = VmIter::Exhausted;
317                        Ok(None)
318                    }
319                    Some(v) => {
320                        let out = vm.call_callable_value(&f, &[v]).await?;
321                        Ok(Some(out))
322                    }
323                }
324            }
325            VmIter::Tap { inner, f } => {
326                let f = f.clone();
327                let item = next_handle(inner, vm).await?;
328                match item {
329                    None => {
330                        *self = VmIter::Exhausted;
331                        Ok(None)
332                    }
333                    Some(v) => {
334                        vm.call_callable_value(&f, &[v.clone()]).await?;
335                        Ok(Some(v))
336                    }
337                }
338            }
339            VmIter::Filter { inner, p } => {
340                let p = p.clone();
341                loop {
342                    let item = next_handle(inner, vm).await?;
343                    match item {
344                        None => {
345                            *self = VmIter::Exhausted;
346                            return Ok(None);
347                        }
348                        Some(v) => {
349                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
350                            if keep.is_truthy() {
351                                return Ok(Some(v));
352                            }
353                        }
354                    }
355                }
356            }
357            VmIter::Scan { inner, acc, f } => {
358                let f = f.clone();
359                let item = next_handle(inner, vm).await?;
360                match item {
361                    None => {
362                        *self = VmIter::Exhausted;
363                        Ok(None)
364                    }
365                    Some(v) => {
366                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
367                        *acc = next_acc.clone();
368                        Ok(Some(next_acc))
369                    }
370                }
371            }
372            VmIter::FlatMap { inner, f, cur } => {
373                let f = f.clone();
374                loop {
375                    if let Some(cur_iter) = cur.clone() {
376                        let item = next_handle(&cur_iter, vm).await?;
377                        if let Some(v) = item {
378                            return Ok(Some(v));
379                        }
380                        *cur = None;
381                    }
382                    let item = next_handle(inner, vm).await?;
383                    match item {
384                        None => {
385                            *self = VmIter::Exhausted;
386                            return Ok(None);
387                        }
388                        Some(v) => {
389                            let result = vm.call_callable_value(&f, &[v]).await?;
390                            let lifted = iter_from_value(result)?;
391                            if let VmValue::Iter(h) = lifted {
392                                *cur = Some(h);
393                            } else {
394                                return Err(VmError::TypeError(
395                                    "flat_map: expected iterable result".to_string(),
396                                ));
397                            }
398                        }
399                    }
400                }
401            }
402            VmIter::Take { inner, remaining } => {
403                if *remaining == 0 {
404                    *self = VmIter::Exhausted;
405                    return Ok(None);
406                }
407                let item = next_handle(inner, vm).await?;
408                match item {
409                    None => {
410                        *self = VmIter::Exhausted;
411                        Ok(None)
412                    }
413                    Some(v) => {
414                        *remaining -= 1;
415                        if *remaining == 0 {
416                            *self = VmIter::Exhausted;
417                        }
418                        Ok(Some(v))
419                    }
420                }
421            }
422            VmIter::Skip { inner, remaining } => {
423                while *remaining > 0 {
424                    let item = next_handle(inner, vm).await?;
425                    match item {
426                        None => {
427                            *self = VmIter::Exhausted;
428                            return Ok(None);
429                        }
430                        Some(_) => {
431                            *remaining -= 1;
432                        }
433                    }
434                }
435                let item = next_handle(inner, vm).await?;
436                match item {
437                    None => {
438                        *self = VmIter::Exhausted;
439                        Ok(None)
440                    }
441                    Some(v) => Ok(Some(v)),
442                }
443            }
444            VmIter::TakeWhile { inner, p, done } => {
445                if *done {
446                    return Ok(None);
447                }
448                let p = p.clone();
449                let item = next_handle(inner, vm).await?;
450                match item {
451                    None => {
452                        *self = VmIter::Exhausted;
453                        Ok(None)
454                    }
455                    Some(v) => {
456                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
457                        if keep.is_truthy() {
458                            Ok(Some(v))
459                        } else {
460                            *self = VmIter::Exhausted;
461                            Ok(None)
462                        }
463                    }
464                }
465            }
466            VmIter::TakeUntil { inner, p } => {
467                let p = p.clone();
468                let item = next_handle(inner, vm).await?;
469                match item {
470                    None => {
471                        *self = VmIter::Exhausted;
472                        Ok(None)
473                    }
474                    Some(v) => {
475                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
476                        if stop.is_truthy() {
477                            *self = VmIter::Exhausted;
478                            Ok(None)
479                        } else {
480                            Ok(Some(v))
481                        }
482                    }
483                }
484            }
485            VmIter::SkipWhile { inner, p, primed } => {
486                if *primed {
487                    let item = next_handle(inner, vm).await?;
488                    return match item {
489                        None => {
490                            *self = VmIter::Exhausted;
491                            Ok(None)
492                        }
493                        Some(v) => Ok(Some(v)),
494                    };
495                }
496                let p = p.clone();
497                loop {
498                    let item = next_handle(inner, vm).await?;
499                    match item {
500                        None => {
501                            *self = VmIter::Exhausted;
502                            return Ok(None);
503                        }
504                        Some(v) => {
505                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
506                            if !drop_it.is_truthy() {
507                                *primed = true;
508                                return Ok(Some(v));
509                            }
510                        }
511                    }
512                }
513            }
514            VmIter::Zip { a, b } => {
515                let ia = next_handle(a, vm).await?;
516                let x = match ia {
517                    None => {
518                        *self = VmIter::Exhausted;
519                        return Ok(None);
520                    }
521                    Some(v) => v,
522                };
523                let ib = next_handle(b, vm).await?;
524                let y = match ib {
525                    None => {
526                        *self = VmIter::Exhausted;
527                        return Ok(None);
528                    }
529                    Some(v) => v,
530                };
531                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
532            }
533            VmIter::Enumerate { inner, i } => {
534                let item = next_handle(inner, vm).await?;
535                match item {
536                    None => {
537                        *self = VmIter::Exhausted;
538                        Ok(None)
539                    }
540                    Some(v) => {
541                        let idx = *i;
542                        *i += 1;
543                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
544                    }
545                }
546            }
547            VmIter::Chain { a, b, on_a } => {
548                if *on_a {
549                    let item = next_handle(a, vm).await?;
550                    if let Some(v) = item {
551                        return Ok(Some(v));
552                    }
553                    *on_a = false;
554                }
555                let item = next_handle(b, vm).await?;
556                match item {
557                    None => {
558                        *self = VmIter::Exhausted;
559                        Ok(None)
560                    }
561                    Some(v) => Ok(Some(v)),
562                }
563            }
564            VmIter::Merge { sources, cursor } => loop {
565                if sources.is_empty() || sources.iter().all(Option::is_none) {
566                    *self = VmIter::Exhausted;
567                    return Ok(None);
568                }
569                let len = sources.len();
570                let mut live = 0usize;
571                for offset in 0..len {
572                    let idx = (*cursor + offset) % len;
573                    let Some(handle) = sources[idx].clone() else {
574                        continue;
575                    };
576                    match try_next_ready(&handle, vm).await? {
577                        Some(v) => {
578                            *cursor = (idx + 1) % len;
579                            return Ok(Some(v));
580                        }
581                        None => {
582                            if is_exhausted_handle(&handle) {
583                                sources[idx] = None;
584                            } else {
585                                live += 1;
586                            }
587                        }
588                    }
589                }
590                if live == 0 {
591                    *self = VmIter::Exhausted;
592                    return Ok(None);
593                }
594                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
595            },
596            VmIter::Interleave { sources, cursor } => {
597                if sources.is_empty() || sources.iter().all(Option::is_none) {
598                    *self = VmIter::Exhausted;
599                    return Ok(None);
600                }
601                let len = sources.len();
602                for offset in 0..len {
603                    let idx = (*cursor + offset) % len;
604                    let Some(handle) = sources[idx].clone() else {
605                        continue;
606                    };
607                    match next_handle(&handle, vm).await? {
608                        Some(v) => {
609                            *cursor = (idx + 1) % len;
610                            return Ok(Some(v));
611                        }
612                        None => {
613                            sources[idx] = None;
614                        }
615                    }
616                }
617                *self = VmIter::Exhausted;
618                Ok(None)
619            }
620            VmIter::Race { sources, winner } => {
621                if let Some(handle) = winner.clone() {
622                    let item = next_handle(&handle, vm).await?;
623                    return match item {
624                        Some(v) => Ok(Some(v)),
625                        None => {
626                            *self = VmIter::Exhausted;
627                            Ok(None)
628                        }
629                    };
630                }
631                loop {
632                    let mut live = 0usize;
633                    for source in sources.iter_mut() {
634                        let Some(handle) = source.clone() else {
635                            continue;
636                        };
637                        match try_next_ready(&handle, vm).await? {
638                            Some(v) => {
639                                *winner = Some(handle);
640                                sources.clear();
641                                return Ok(Some(v));
642                            }
643                            None => {
644                                if is_exhausted_handle(&handle) {
645                                    *source = None;
646                                } else {
647                                    live += 1;
648                                }
649                            }
650                        }
651                    }
652                    if live == 0 {
653                        *self = VmIter::Exhausted;
654                        return Ok(None);
655                    }
656                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
657                }
658            }
659            VmIter::Broadcast {
660                shared,
661                branch,
662                index,
663            } => {
664                let _ = branch;
665                loop {
666                    let mut state =
667                        std::mem::replace(&mut *shared.borrow_mut(), empty_broadcast_state());
668                    if *index < state.buffer.len() {
669                        let item = state.buffer[*index].clone();
670                        *index += 1;
671                        *shared.borrow_mut() = state;
672                        return Ok(Some(item));
673                    }
674                    if state.exhausted {
675                        *shared.borrow_mut() = state;
676                        *self = VmIter::Exhausted;
677                        return Ok(None);
678                    }
679                    let next = next_handle(&state.source, vm).await;
680                    match next {
681                        Err(err) => {
682                            *shared.borrow_mut() = state;
683                            return Err(err);
684                        }
685                        Ok(Some(v)) => {
686                            state.buffer.push(v);
687                            *shared.borrow_mut() = state;
688                        }
689                        Ok(None) => {
690                            state.exhausted = true;
691                            *shared.borrow_mut() = state;
692                        }
693                    }
694                }
695            }
696            VmIter::Throttle {
697                inner,
698                interval_ms,
699                next_ready,
700            } => {
701                if let Some(ready_at) = next_ready.take() {
702                    let now = tokio::time::Instant::now();
703                    if ready_at > now {
704                        tokio::time::sleep_until(ready_at).await;
705                    }
706                }
707                let item = next_handle(inner, vm).await?;
708                match item {
709                    None => {
710                        *self = VmIter::Exhausted;
711                        Ok(None)
712                    }
713                    Some(v) => {
714                        *next_ready = Some(
715                            tokio::time::Instant::now()
716                                + tokio::time::Duration::from_millis(*interval_ms),
717                        );
718                        Ok(Some(v))
719                    }
720                }
721            }
722            VmIter::Debounce { inner, window_ms } => {
723                let mut last = match next_handle(inner, vm).await? {
724                    Some(v) => v,
725                    None => {
726                        *self = VmIter::Exhausted;
727                        return Ok(None);
728                    }
729                };
730                if *window_ms > 0 {
731                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
732                }
733                while let Some(v) = try_next_ready(inner, vm).await? {
734                    last = v;
735                }
736                Ok(Some(last))
737            }
738            VmIter::Chunks { inner, n } => {
739                let n = *n;
740                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
741                for _ in 0..n {
742                    let item = next_handle(inner, vm).await?;
743                    match item {
744                        Some(v) => {
745                            batch.push(v);
746                        }
747                        None => break,
748                    }
749                }
750                if batch.is_empty() {
751                    *self = VmIter::Exhausted;
752                    Ok(None)
753                } else {
754                    Ok(Some(VmValue::List(Rc::new(batch))))
755                }
756            }
757            VmIter::Windows { inner, n, buf } => {
758                let n = *n;
759                if buf.is_empty() {
760                    while buf.len() < n {
761                        let item = next_handle(inner, vm).await?;
762                        match item {
763                            Some(v) => buf.push_back(v),
764                            None => {
765                                *self = VmIter::Exhausted;
766                                return Ok(None);
767                            }
768                        }
769                    }
770                } else {
771                    let item = next_handle(inner, vm).await?;
772                    match item {
773                        Some(v) => {
774                            buf.pop_front();
775                            buf.push_back(v);
776                        }
777                        None => {
778                            *self = VmIter::Exhausted;
779                            return Ok(None);
780                        }
781                    }
782                }
783                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
784                Ok(Some(VmValue::List(Rc::new(snapshot))))
785            }
786            VmIter::Chan { handle } => {
787                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
788                let rx = handle.receiver.clone();
789                let mut guard = rx.lock().await;
790                let item = if is_closed {
791                    guard.try_recv().ok()
792                } else {
793                    guard.recv().await
794                };
795                match item {
796                    Some(v) => Ok(Some(v)),
797                    None => {
798                        drop(guard);
799                        *self = VmIter::Exhausted;
800                        Ok(None)
801                    }
802                }
803            }
804        }
805    }
806}
807
808/// Advance a handle without holding a `RefCell` borrow across the await.
809///
810/// Swaps the iter state out into a local owned value (replacing it with
811/// `Exhausted`), runs `next` on the owned state, then swaps it back. This
812/// avoids `clippy::await_holding_refcell_ref` while preserving single-pass
813/// semantics: a nested `next` call on the same handle during the await would
814/// see `Exhausted` (the iter protocol doesn't permit re-entrant stepping of
815/// the same handle anyway).
816pub async fn next_handle(
817    handle: &Rc<RefCell<VmIter>>,
818    vm: &mut crate::vm::Vm,
819) -> Result<Option<VmValue>, VmError> {
820    let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
821    let result = state.next(vm).await;
822    // Restore state unless the inner call replaced it with Exhausted.
823    *handle.borrow_mut() = state;
824    result
825}
826
827/// Fully consume an iter handle into a Vec of values.
828pub async fn drain(
829    handle: &Rc<RefCell<VmIter>>,
830    vm: &mut crate::vm::Vm,
831) -> Result<Vec<VmValue>, VmError> {
832    let mut out = Vec::new();
833    loop {
834        let v = next_handle(handle, vm).await?;
835        match v {
836            Some(v) => out.push(v),
837            None => break,
838        }
839    }
840    Ok(out)
841}
842
843/// Fully consume an iter handle into a Vec, failing before pushing item
844/// `max + 1`.
845pub async fn drain_capped(
846    handle: &Rc<RefCell<VmIter>>,
847    vm: &mut crate::vm::Vm,
848    max: usize,
849) -> Result<Vec<VmValue>, VmError> {
850    let mut out = Vec::new();
851    loop {
852        let v = next_handle(handle, vm).await?;
853        match v {
854            Some(v) => {
855                if out.len() >= max {
856                    return Err(VmError::Runtime(format!(
857                        "stream.collect: max cap {max} exceeded"
858                    )));
859                }
860                out.push(v);
861            }
862            None => break,
863        }
864    }
865    Ok(out)
866}
867
868pub fn iter_handle_from_value(v: VmValue) -> Result<Rc<RefCell<VmIter>>, VmError> {
869    match iter_from_value(v)? {
870        VmValue::Iter(handle) => Ok(handle),
871        _ => unreachable!("iter_from_value returns Iter"),
872    }
873}
874
875pub fn broadcast_branches(source: Rc<RefCell<VmIter>>, n: usize) -> Vec<VmValue> {
876    let shared = Rc::new(RefCell::new(VmBroadcastState {
877        source,
878        buffer: Vec::new(),
879        exhausted: false,
880    }));
881    (0..n)
882        .map(|branch| {
883            VmValue::Iter(Rc::new(RefCell::new(VmIter::Broadcast {
884                shared: Rc::clone(&shared),
885                branch,
886                index: 0,
887            })))
888        })
889        .collect()
890}
891
892fn empty_broadcast_state() -> VmBroadcastState {
893    VmBroadcastState {
894        source: Rc::new(RefCell::new(VmIter::Exhausted)),
895        buffer: Vec::new(),
896        exhausted: true,
897    }
898}
899
900fn try_next_ready<'a>(
901    handle: &'a Rc<RefCell<VmIter>>,
902    vm: &'a mut crate::vm::Vm,
903) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>> {
904    Box::pin(async move {
905        let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
906        let result = state.try_next_ready_impl(vm).await;
907        *handle.borrow_mut() = state;
908        result
909    })
910}
911
912fn is_exhausted_handle(handle: &Rc<RefCell<VmIter>>) -> bool {
913    matches!(&*handle.borrow(), VmIter::Exhausted)
914}
915
916impl VmIter {
917    async fn try_next_ready_impl(
918        &mut self,
919        vm: &mut crate::vm::Vm,
920    ) -> Result<Option<VmValue>, VmError> {
921        match self {
922            VmIter::Exhausted => Ok(None),
923            VmIter::Map { inner, f } => {
924                let f = f.clone();
925                match try_next_ready(inner, vm).await? {
926                    Some(v) => Ok(Some(vm.call_callable_value(&f, &[v]).await?)),
927                    None => {
928                        if is_exhausted_handle(inner) {
929                            *self = VmIter::Exhausted;
930                        }
931                        Ok(None)
932                    }
933                }
934            }
935            VmIter::Tap { inner, f } => {
936                let f = f.clone();
937                match try_next_ready(inner, vm).await? {
938                    Some(v) => {
939                        vm.call_callable_value(&f, &[v.clone()]).await?;
940                        Ok(Some(v))
941                    }
942                    None => {
943                        if is_exhausted_handle(inner) {
944                            *self = VmIter::Exhausted;
945                        }
946                        Ok(None)
947                    }
948                }
949            }
950            VmIter::Filter { inner, p } => {
951                let p = p.clone();
952                loop {
953                    match try_next_ready(inner, vm).await? {
954                        Some(v) => {
955                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
956                            if keep.is_truthy() {
957                                return Ok(Some(v));
958                            }
959                        }
960                        None => {
961                            if is_exhausted_handle(inner) {
962                                *self = VmIter::Exhausted;
963                            }
964                            return Ok(None);
965                        }
966                    }
967                }
968            }
969            VmIter::Scan { inner, acc, f } => {
970                let f = f.clone();
971                match try_next_ready(inner, vm).await? {
972                    Some(v) => {
973                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
974                        *acc = next_acc.clone();
975                        Ok(Some(next_acc))
976                    }
977                    None => {
978                        if is_exhausted_handle(inner) {
979                            *self = VmIter::Exhausted;
980                        }
981                        Ok(None)
982                    }
983                }
984            }
985            VmIter::FlatMap { inner, f, cur } => {
986                let f = f.clone();
987                loop {
988                    if let Some(cur_iter) = cur.clone() {
989                        match try_next_ready(&cur_iter, vm).await? {
990                            Some(v) => return Ok(Some(v)),
991                            None => {
992                                if is_exhausted_handle(&cur_iter) {
993                                    *cur = None;
994                                }
995                                return Ok(None);
996                            }
997                        }
998                    }
999                    match try_next_ready(inner, vm).await? {
1000                        Some(v) => {
1001                            let result = vm.call_callable_value(&f, &[v]).await?;
1002                            *cur = Some(iter_handle_from_value(result)?);
1003                        }
1004                        None => {
1005                            if is_exhausted_handle(inner) {
1006                                *self = VmIter::Exhausted;
1007                            }
1008                            return Ok(None);
1009                        }
1010                    }
1011                }
1012            }
1013            VmIter::Take { inner, remaining } => {
1014                if *remaining == 0 {
1015                    *self = VmIter::Exhausted;
1016                    return Ok(None);
1017                }
1018                match try_next_ready(inner, vm).await? {
1019                    Some(v) => {
1020                        *remaining -= 1;
1021                        if *remaining == 0 {
1022                            *self = VmIter::Exhausted;
1023                        }
1024                        Ok(Some(v))
1025                    }
1026                    None => {
1027                        if is_exhausted_handle(inner) {
1028                            *self = VmIter::Exhausted;
1029                        }
1030                        Ok(None)
1031                    }
1032                }
1033            }
1034            VmIter::TakeUntil { inner, p } => {
1035                let p = p.clone();
1036                match try_next_ready(inner, vm).await? {
1037                    Some(v) => {
1038                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
1039                        if stop.is_truthy() {
1040                            *self = VmIter::Exhausted;
1041                            Ok(None)
1042                        } else {
1043                            Ok(Some(v))
1044                        }
1045                    }
1046                    None => {
1047                        if is_exhausted_handle(inner) {
1048                            *self = VmIter::Exhausted;
1049                        }
1050                        Ok(None)
1051                    }
1052                }
1053            }
1054            VmIter::Throttle {
1055                inner,
1056                interval_ms,
1057                next_ready,
1058            } => {
1059                if let Some(ready_at) = *next_ready {
1060                    if ready_at > tokio::time::Instant::now() {
1061                        return Ok(None);
1062                    }
1063                }
1064                match try_next_ready(inner, vm).await? {
1065                    Some(v) => {
1066                        *next_ready = Some(
1067                            tokio::time::Instant::now()
1068                                + tokio::time::Duration::from_millis(*interval_ms),
1069                        );
1070                        Ok(Some(v))
1071                    }
1072                    None => {
1073                        if is_exhausted_handle(inner) {
1074                            *self = VmIter::Exhausted;
1075                        }
1076                        Ok(None)
1077                    }
1078                }
1079            }
1080            VmIter::Range { .. }
1081            | VmIter::Vec { .. }
1082            | VmIter::Dict { .. }
1083            | VmIter::Chars { .. }
1084            | VmIter::Skip { .. }
1085            | VmIter::TakeWhile { .. }
1086            | VmIter::SkipWhile { .. }
1087            | VmIter::Zip { .. }
1088            | VmIter::Enumerate { .. }
1089            | VmIter::Chain { .. }
1090            | VmIter::Merge { .. }
1091            | VmIter::Interleave { .. }
1092            | VmIter::Race { .. }
1093            | VmIter::Broadcast { .. }
1094            | VmIter::Debounce { .. }
1095            | VmIter::Chunks { .. }
1096            | VmIter::Windows { .. } => self.next(vm).await,
1097            VmIter::Gen { gen } => {
1098                if gen.done.get() {
1099                    *self = VmIter::Exhausted;
1100                    return Ok(None);
1101                }
1102                let rx = gen.receiver.clone();
1103                let result = match rx.try_lock() {
1104                    Ok(mut guard) => match guard.try_recv() {
1105                        Ok(Ok(v)) => Ok(Some(v)),
1106                        Ok(Err(error)) => {
1107                            gen.done.set(true);
1108                            *self = VmIter::Exhausted;
1109                            Err(error)
1110                        }
1111                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1112                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1113                            gen.done.set(true);
1114                            *self = VmIter::Exhausted;
1115                            Ok(None)
1116                        }
1117                    },
1118                    Err(_) => Ok(None),
1119                };
1120                result
1121            }
1122            VmIter::Stream { stream } => {
1123                if stream.done.get() {
1124                    *self = VmIter::Exhausted;
1125                    return Ok(None);
1126                }
1127                let rx = stream.receiver.clone();
1128                let result = match rx.try_lock() {
1129                    Ok(mut guard) => match guard.try_recv() {
1130                        Ok(Ok(v)) => Ok(Some(v)),
1131                        Ok(Err(error)) => {
1132                            stream.done.set(true);
1133                            *self = VmIter::Exhausted;
1134                            Err(error)
1135                        }
1136                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1137                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1138                            stream.done.set(true);
1139                            *self = VmIter::Exhausted;
1140                            Ok(None)
1141                        }
1142                    },
1143                    Err(_) => Ok(None),
1144                };
1145                result
1146            }
1147            VmIter::Chan { handle } => {
1148                let rx = handle.receiver.clone();
1149                let result = match rx.try_lock() {
1150                    Ok(mut guard) => match guard.try_recv() {
1151                        Ok(v) => Ok(Some(v)),
1152                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1153                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1154                            *self = VmIter::Exhausted;
1155                            Ok(None)
1156                        }
1157                    },
1158                    Err(_) => Ok(None),
1159                };
1160                result
1161            }
1162        }
1163    }
1164}
1165
1166/// Convenience: wrap a source value into a `VmValue::Iter`. Used by the
1167/// `iter()` builtin and by combinator/sink implementations in later steps.
1168pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1169    let inner = match v {
1170        VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1171        VmValue::Range(r) => VmIter::Range {
1172            next: r.start,
1173            end: r.end,
1174            inclusive: r.inclusive,
1175            done: range_initial_done(r.start, r.end, r.inclusive),
1176        },
1177        VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1178        VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1179        VmValue::Dict(entries) => {
1180            let keys: Vec<String> = entries.keys().cloned().collect();
1181            VmIter::Dict {
1182                entries,
1183                keys,
1184                idx: 0,
1185            }
1186        }
1187        VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1188        VmValue::Generator(gen) => VmIter::Gen { gen },
1189        VmValue::Stream(stream) => VmIter::Stream { stream },
1190        VmValue::Channel(handle) => VmIter::Chan { handle },
1191        other => {
1192            return Err(VmError::TypeError(format!(
1193                "iter: value of type {} is not iterable",
1194                other.type_name()
1195            )))
1196        }
1197    };
1198    Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203    use super::*;
1204    use crate::value::VmRange;
1205
1206    fn run_iter_test(test: impl std::future::Future<Output = ()>) {
1207        let rt = tokio::runtime::Builder::new_current_thread()
1208            .enable_all()
1209            .build()
1210            .unwrap();
1211        rt.block_on(test);
1212    }
1213
1214    #[test]
1215    fn inclusive_range_at_i64_max_yields_the_endpoint() {
1216        run_iter_test(async {
1217            let mut vm = crate::vm::Vm::new();
1218            let VmValue::Iter(handle) = iter_from_value(VmValue::Range(VmRange {
1219                start: i64::MAX,
1220                end: i64::MAX,
1221                inclusive: true,
1222            }))
1223            .unwrap() else {
1224                panic!("expected iter");
1225            };
1226
1227            let first = next_handle(&handle, &mut vm).await.unwrap();
1228            assert!(matches!(first, Some(VmValue::Int(value)) if value == i64::MAX));
1229            assert!(next_handle(&handle, &mut vm).await.unwrap().is_none());
1230        });
1231    }
1232
1233    #[test]
1234    fn exclusive_range_at_i64_max_is_empty() {
1235        run_iter_test(async {
1236            let mut vm = crate::vm::Vm::new();
1237            let VmValue::Iter(handle) = iter_from_value(VmValue::Range(VmRange {
1238                start: i64::MAX,
1239                end: i64::MAX,
1240                inclusive: false,
1241            }))
1242            .unwrap() else {
1243                panic!("expected iter");
1244            };
1245
1246            assert!(next_handle(&handle, &mut vm).await.unwrap().is_none());
1247        });
1248    }
1249}