Skip to main content

harn_vm/vm/
iter.rs

1//! Lazy iterator protocol for the Harn VM.
2//!
3//! `VmIter` is the backing enum for `VmValue::Iter`. It's a single-pass, fused
4//! iterator; once `next` returns `None` the variant is replaced with
5//! `Exhausted`.
6
7use std::cell::RefCell;
8use std::collections::{BTreeMap, VecDeque};
9use std::rc::Rc;
10
11use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
12
/// Reports whether a range is already empty before its first step.
///
/// A half-open range (`inclusive == false`) is empty when `start >= end`;
/// a closed range (`inclusive == true`) is empty only when `start > end`.
fn range_initial_done(start: i64, end: i64, inclusive: bool) -> bool {
    // Both flavours are empty past `end`; only the half-open one is also
    // empty when the bounds coincide.
    start > end || (!inclusive && start == end)
}
20
/// Steps a range cursor once and returns the value it was pointing at.
///
/// * `next` – the value to hand out on this call; advanced by one when
///   another value follows it.
/// * `end` – upper bound of the range.
/// * `inclusive` – whether `end` itself belongs to the range.
/// * `done` – set to `true` when the value returned by this call is the last
///   one; once set, every later call returns `None` and leaves `next` alone.
fn range_next(next: &mut i64, end: i64, inclusive: bool, done: &mut bool) -> Option<i64> {
    if *done {
        return None;
    }
    let current = *next;
    // Decide whether another value follows `current`. For the half-open case
    // the successor is computed with `checked_add` so that a cursor sitting
    // at `i64::MAX` ends the range instead of overflowing.
    let successor_in_range = match (inclusive, current.checked_add(1)) {
        (true, _) => current < end,
        (false, Some(succ)) => succ < end,
        (false, None) => false,
    };
    if successor_in_range {
        *next = current + 1;
    } else {
        *done = true;
    }
    Some(current)
}
40
/// State shared (behind `Rc<RefCell<_>>`) by the `VmIter::Broadcast` branches
/// created from one source iterator.
///
/// A branch first serves items out of `buffer` at its own read index; only
/// when it has read past the end of `buffer` is `source` pulled and the new
/// item appended for the other branches to see.
#[derive(Debug)]
pub struct VmBroadcastState {
    /// Upstream iterator, pulled when a branch needs an item that is not in
    /// `buffer` yet.
    source: Rc<RefCell<VmIter>>,
    /// Items pulled from `source`, in arrival order. The stepping code only
    /// ever appends to it.
    buffer: Vec<VmValue>,
    /// Set once `source` has returned `None`; after that, branches that have
    /// read all of `buffer` become exhausted.
    exhausted: bool,
}
47
/// Backing enum for `VmValue::Iter`. See module docs.
///
/// Each variant holds exactly the state `VmIter::next` needs to produce one
/// more item. Combinator variants reference their upstream iterator(s)
/// through `Rc<RefCell<VmIter>>` handles and step them via `next_handle`.
/// When a variant runs dry, `next` overwrites it with [`VmIter::Exhausted`].
#[derive(Debug)]
pub enum VmIter {
    /// Step through a lazy integer range without materializing.
    Range {
        /// Value that the next pull will yield.
        next: i64,
        /// Upper bound of the range.
        end: i64,
        /// Whether `end` itself is yielded.
        inclusive: bool,
        /// Set once the final value has been handed out.
        done: bool,
    },
    /// Snapshot over a shared list / set backing store.
    /// `idx` is the position of the next item to clone out of `items`.
    Vec { items: Rc<Vec<VmValue>>, idx: usize },
    /// Snapshot over a dict; yields `Pair(key, value)` items.
    Dict {
        /// Map the values are looked up in.
        entries: Rc<BTreeMap<String, VmValue>>,
        /// Keys to visit, in iteration order. A key that is missing from
        /// `entries` is paired with `VmValue::Nil`.
        keys: Vec<String>,
        /// Position in `keys` of the next pair to yield.
        idx: usize,
    },
    /// Unicode scalar iteration over a string.
    /// `byte_idx` is the byte offset in `s` of the next character; each item
    /// is a one-character `VmValue::String`.
    Chars { s: Rc<str>, byte_idx: usize },
    /// Drains a generator's yield channel.
    Gen { gen: VmGenerator },
    /// Drains a stream's emit channel.
    Stream { stream: VmStream },
    /// Reads from a channel handle.
    Chan { handle: VmChannelHandle },
    /// Maps each item through a closure.
    Map {
        inner: Rc<RefCell<VmIter>>,
        /// Callable invoked with one argument (the item); its result is yielded.
        f: VmValue,
    },
    /// Runs a callback for side effects, then yields the original item.
    Tap {
        inner: Rc<RefCell<VmIter>>,
        /// Callable invoked with a clone of the item; its result is discarded.
        f: VmValue,
    },
    /// Keeps only items for which the predicate is truthy.
    Filter {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Running fold that yields each accumulator.
    Scan {
        inner: Rc<RefCell<VmIter>>,
        /// Current accumulator; replaced by each result of `f(acc, item)`.
        acc: VmValue,
        f: VmValue,
    },
    /// Maps each item to an iterable and flattens one level.
    FlatMap {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
        /// Sub-iterator currently being drained, if any.
        cur: Option<Rc<RefCell<VmIter>>>,
    },
    /// Yields up to `remaining` items from `inner`, then becomes Exhausted.
    /// The switch to Exhausted happens on the same pull that returns the
    /// last permitted item.
    Take {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Skips the first `remaining` items from `inner` on the first call, then
    /// forwards. `remaining == 0` is the sentinel for "already primed".
    Skip {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Yields items from `inner` while the predicate is truthy; after the
    /// first falsy predicate or inner exhaustion, becomes Exhausted.
    TakeWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        /// Checked before every pull: when set, `next` returns `None`
        /// without touching `inner`.
        done: bool,
    },
    /// Yields items until the predicate is truthy. The matching sentinel item
    /// is consumed but not yielded.
    TakeUntil {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Discards items while the predicate is truthy; after the first falsy
    /// item, forwards that item and all subsequent items from `inner`.
    SkipWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        /// Becomes `true` once the first falsy item has been seen; from then
        /// on the predicate is no longer called.
        primed: bool,
    },
    /// Advances two inner iters in lockstep; yields `Pair(a, b)` until either
    /// side is exhausted. `a` is pulled first, so when `b` runs out the item
    /// just taken from `a` is dropped.
    Zip {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
    },
    /// Yields `Pair(i, item)` starting at `i = 0`.
    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
    /// Concatenates two iters: drains `a` first, then `b`.
    Chain {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
        /// `true` while `a` is still being drained.
        on_a: bool,
    },
    /// Drains any non-exhausted source in rotating order.
    Merge {
        /// One slot per source; a slot is set to `None` once that source is
        /// found exhausted.
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        /// Slot index at which the next sweep starts (one past the slot that
        /// produced the previous item).
        cursor: usize,
    },
    /// Strict round-robin over non-exhausted sources.
    Interleave {
        /// One slot per source; a slot is set to `None` once that source
        /// returns `None`.
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        /// Slot index at which the next pull starts.
        cursor: usize,
    },
    /// First source to yield wins; subsequent pulls only read that source.
    Race {
        /// Contenders; cleared as soon as a winner is chosen.
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        /// The source that produced the first item, once there is one.
        winner: Option<Rc<RefCell<VmIter>>>,
    },
    /// One source fanned out into several single-pass branches.
    Broadcast {
        /// Buffer and source shared with the sibling branches.
        shared: Rc<RefCell<VmBroadcastState>>,
        /// Ordinal of this branch. `next` does not consult it.
        branch: usize,
        /// This branch's read position in the shared buffer.
        index: usize,
    },
    /// Sleeps between emissions after the first item.
    Throttle {
        inner: Rc<RefCell<VmIter>>,
        interval_ms: u64,
        /// Earliest instant at which the next pull may proceed; set to
        /// "now + `interval_ms`" each time an item is yielded.
        next_ready: Option<tokio::time::Instant>,
    },
    /// Coalesces immediately available bursts and emits the last item seen
    /// after the quiet window.
    Debounce {
        inner: Rc<RefCell<VmIter>>,
        window_ms: u64,
    },
    /// Yields `VmValue::List` batches of up to `n` items from `inner`.
    /// The final batch may be shorter; empty input yields no batches.
    Chunks {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
    },
    /// Yields sliding windows of exactly `n` items from `inner` as `VmValue::List`.
    /// If the input has fewer than `n` items total, no windows are yielded.
    Windows {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
        /// Items of the current window; empty until the first window has
        /// been filled.
        buf: VecDeque<VmValue>,
    },
    /// Terminal state: `next` always returns `None`.
    Exhausted,
}
195
196impl VmIter {
197    /// Produce the next value, or `None` when exhausted.
198    ///
199    /// Combinator variants (`Map`, `Filter`, `FlatMap`) invoke user-provided
200    /// closures through the `vm` parameter.
201    pub fn next<'a>(
202        &'a mut self,
203        vm: &'a mut crate::vm::Vm,
204    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
205    {
206        Box::pin(async move { self.next_impl(vm).await })
207    }
208
    /// Steps the iterator once; the body behind [`VmIter::next`].
    ///
    /// Dispatches on the current variant. Source variants (`Range`, `Vec`,
    /// `Dict`, `Chars`, `Gen`, `Stream`, `Chan`) produce items directly;
    /// combinator variants pull from their upstream handle(s) through
    /// `next_handle` and, where they carry a callable, invoke it with
    /// `vm.call_callable_value`.
    ///
    /// Returns `Ok(Some(item))` for a produced item and `Ok(None)` when the
    /// iterator has run dry; on that path the arm overwrites `*self` with
    /// `VmIter::Exhausted` (the early-out for `TakeWhile { done: true }` is
    /// the one arm that returns `None` without doing so). Errors from
    /// upstream iterators, callables, and generator/stream channels are
    /// propagated to the caller.
    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
        match self {
            VmIter::Exhausted => Ok(None),
            VmIter::Range {
                next,
                end,
                inclusive,
                done,
            } => {
                if let Some(v) = range_next(next, *end, *inclusive, done) {
                    Ok(Some(VmValue::Int(v)))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Vec { items, idx } => {
                if *idx < items.len() {
                    let v = items[*idx].clone();
                    *idx += 1;
                    Ok(Some(v))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Dict { entries, keys, idx } => {
                if *idx < keys.len() {
                    let k = &keys[*idx];
                    // A key with no entry in the map is paired with Nil
                    // rather than treated as an error.
                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
                    *idx += 1;
                    Ok(Some(VmValue::Pair(Rc::new((
                        VmValue::String(Rc::from(k.as_str())),
                        v,
                    )))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Chars { s, byte_idx } => {
                if *byte_idx >= s.len() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rest = &s[*byte_idx..];
                if let Some(c) = rest.chars().next() {
                    // Advance by the character's encoded width so the offset
                    // moves from one character start to the next.
                    *byte_idx += c.len_utf8();
                    Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Gen { gen } => {
                if gen.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                // Clone the receiver handle so the lock guard does not borrow
                // from `gen`, which is reached through `self`.
                let rx = gen.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    Some(Err(error)) => {
                        // An error ends the generator: mark it done, retire
                        // this iterator, and surface the error.
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        // `recv` returned `None`: no further items will arrive.
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Stream { stream } => {
                // Same protocol as the `Gen` arm, applied to the stream's receiver.
                if stream.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = stream.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    Some(Err(error)) => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Map { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let out = vm.call_callable_value(&f, &[v]).await?;
                        Ok(Some(out))
                    }
                }
            }
            VmIter::Tap { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        // The callback's return value is ignored, but an
                        // error from it still aborts the pull.
                        vm.call_callable_value(&f, &[v.clone()]).await?;
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Filter { inner, p } => {
                let p = p.clone();
                // Keep pulling until an item passes the predicate or the
                // upstream runs dry.
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if keep.is_truthy() {
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Scan { inner, acc, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        // f(acc, item) becomes both the new accumulator and
                        // the yielded item.
                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
                        *acc = next_acc.clone();
                        Ok(Some(next_acc))
                    }
                }
            }
            VmIter::FlatMap { inner, f, cur } => {
                let f = f.clone();
                loop {
                    // Drain the active sub-iterator first; drop it once it is empty.
                    if let Some(cur_iter) = cur.clone() {
                        let item = next_handle(&cur_iter, vm).await?;
                        if let Some(v) = item {
                            return Ok(Some(v));
                        }
                        *cur = None;
                    }
                    // No active sub-iterator: map the next upstream item to one.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let result = vm.call_callable_value(&f, &[v]).await?;
                            let lifted = iter_from_value(result)?;
                            if let VmValue::Iter(h) = lifted {
                                *cur = Some(h);
                            } else {
                                return Err(VmError::TypeError(
                                    "flat_map: expected iterable result".to_string(),
                                ));
                            }
                        }
                    }
                }
            }
            VmIter::Take { inner, remaining } => {
                if *remaining == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *remaining -= 1;
                        // Retire immediately after the last permitted item so
                        // `inner` is never pulled again.
                        if *remaining == 0 {
                            *self = VmIter::Exhausted;
                        }
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Skip { inner, remaining } => {
                // Discard phase: runs only while `remaining` is non-zero, so
                // later pulls fall straight through to the forward below.
                while *remaining > 0 {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(_) => {
                            *remaining -= 1;
                        }
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::TakeWhile { inner, p, done } => {
                if *done {
                    return Ok(None);
                }
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if keep.is_truthy() {
                            Ok(Some(v))
                        } else {
                            // The first rejected item is consumed from `inner`
                            // and not yielded.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    }
                }
            }
            VmIter::TakeUntil { inner, p } => {
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if stop.is_truthy() {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        } else {
                            Ok(Some(v))
                        }
                    }
                }
            }
            VmIter::SkipWhile { inner, p, primed } => {
                // Once primed, act as a plain pass-through over `inner`.
                if *primed {
                    let item = next_handle(inner, vm).await?;
                    return match item {
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                        Some(v) => Ok(Some(v)),
                    };
                }
                let p = p.clone();
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if !drop_it.is_truthy() {
                                *primed = true;
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Zip { a, b } => {
                // `a` is pulled before `b`; if `b` is the side that runs out,
                // the item already taken from `a` is dropped.
                let ia = next_handle(a, vm).await?;
                let x = match ia {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                let ib = next_handle(b, vm).await?;
                let y = match ib {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
            }
            VmIter::Enumerate { inner, i } => {
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let idx = *i;
                        *i += 1;
                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
                    }
                }
            }
            VmIter::Chain { a, b, on_a } => {
                if *on_a {
                    let item = next_handle(a, vm).await?;
                    if let Some(v) = item {
                        return Ok(Some(v));
                    }
                    // `a` is finished; fall through to `b` on this same pull.
                    *on_a = false;
                }
                let item = next_handle(b, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::Merge { sources, cursor } => loop {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                // Counts sources that produced nothing this sweep but are not
                // exhausted either.
                let mut live = 0usize;
                // One sweep over every slot, starting at `cursor`.
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    // NOTE(review): `try_next_ready` is defined outside this
                    // view; it is used here as a poll that may return `None`
                    // for a source that is not exhausted — confirm.
                    match try_next_ready(&handle, vm).await? {
                        Some(v) => {
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            if is_exhausted_handle(&handle) {
                                sources[idx] = None;
                            } else {
                                live += 1;
                            }
                        }
                    }
                }
                if live == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                // Nothing was ready but some source is still live: pause
                // 1 ms before sweeping again.
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            },
            VmIter::Interleave { sources, cursor } => {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    // Unlike `Merge`, this awaits each source's next item in turn.
                    match next_handle(&handle, vm).await? {
                        Some(v) => {
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            sources[idx] = None;
                        }
                    }
                }
                // Every remaining source returned `None` during this pull.
                *self = VmIter::Exhausted;
                Ok(None)
            }
            VmIter::Race { sources, winner } => {
                // A winner has already been chosen: read only from it.
                if let Some(handle) = winner.clone() {
                    let item = next_handle(&handle, vm).await?;
                    return match item {
                        Some(v) => Ok(Some(v)),
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    };
                }
                loop {
                    let mut live = 0usize;
                    for source in sources.iter_mut() {
                        let Some(handle) = source.clone() else {
                            continue;
                        };
                        match try_next_ready(&handle, vm).await? {
                            Some(v) => {
                                // First source to produce an item wins; the
                                // other contenders are released.
                                *winner = Some(handle);
                                sources.clear();
                                return Ok(Some(v));
                            }
                            None => {
                                if is_exhausted_handle(&handle) {
                                    *source = None;
                                } else {
                                    live += 1;
                                }
                            }
                        }
                    }
                    if live == 0 {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // No source was ready yet: pause 1 ms before polling again.
                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                }
            }
            VmIter::Broadcast {
                shared,
                branch,
                index,
            } => {
                // `branch` is not used when stepping.
                let _ = branch;
                loop {
                    // Move the shared state out of the cell so no `RefCell`
                    // borrow is held across the await below; every path puts
                    // it back before returning or looping.
                    // NOTE(review): `empty_broadcast_state` is defined outside
                    // this view; it supplies the temporary placeholder.
                    let mut state =
                        std::mem::replace(&mut *shared.borrow_mut(), empty_broadcast_state());
                    // Serve from the shared buffer if this branch is behind it.
                    if *index < state.buffer.len() {
                        let item = state.buffer[*index].clone();
                        *index += 1;
                        *shared.borrow_mut() = state;
                        return Ok(Some(item));
                    }
                    if state.exhausted {
                        *shared.borrow_mut() = state;
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // Caught up with the buffer: pull one more item from the
                    // source, then loop so the buffer check above serves it.
                    let next = next_handle(&state.source, vm).await;
                    match next {
                        Err(err) => {
                            *shared.borrow_mut() = state;
                            return Err(err);
                        }
                        Ok(Some(v)) => {
                            state.buffer.push(v);
                            *shared.borrow_mut() = state;
                        }
                        Ok(None) => {
                            state.exhausted = true;
                            *shared.borrow_mut() = state;
                        }
                    }
                }
            }
            VmIter::Throttle {
                inner,
                interval_ms,
                next_ready,
            } => {
                // `next_ready` is `None` until an item has been yielded, so
                // the first pull never sleeps.
                if let Some(ready_at) = next_ready.take() {
                    let now = tokio::time::Instant::now();
                    if ready_at > now {
                        tokio::time::sleep_until(ready_at).await;
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *next_ready = Some(
                            tokio::time::Instant::now()
                                + tokio::time::Duration::from_millis(*interval_ms),
                        );
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Debounce { inner, window_ms } => {
                // Wait for one item, sleep for the window, then keep
                // replacing it with whatever `try_next_ready` still returns.
                let mut last = match next_handle(inner, vm).await? {
                    Some(v) => v,
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                };
                if *window_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
                }
                while let Some(v) = try_next_ready(inner, vm).await? {
                    last = v;
                }
                Ok(Some(last))
            }
            VmIter::Chunks { inner, n } => {
                let n = *n;
                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
                for _ in 0..n {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            batch.push(v);
                        }
                        None => break,
                    }
                }
                // An empty batch means nothing was collected (upstream is
                // dry, or `n` is 0); a short final batch is still yielded.
                if batch.is_empty() {
                    *self = VmIter::Exhausted;
                    Ok(None)
                } else {
                    Ok(Some(VmValue::List(Rc::new(batch))))
                }
            }
            VmIter::Windows { inner, n, buf } => {
                let n = *n;
                if buf.is_empty() {
                    // First window: fill the buffer to `n` items, giving up
                    // if the upstream ends before that.
                    while buf.len() < n {
                        let item = next_handle(inner, vm).await?;
                        match item {
                            Some(v) => buf.push_back(v),
                            None => {
                                *self = VmIter::Exhausted;
                                return Ok(None);
                            }
                        }
                    }
                } else {
                    // Later windows: slide by one — drop the oldest item and
                    // append the new one.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            buf.pop_front();
                            buf.push_back(v);
                        }
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                    }
                }
                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
                Ok(Some(VmValue::List(Rc::new(snapshot))))
            }
            VmIter::Chan { handle } => {
                // The closed flag is read before the receiver lock is taken.
                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
                let rx = handle.receiver.clone();
                let mut guard = rx.lock().await;
                // If the handle was flagged closed, take an item with
                // `try_recv` instead of awaiting `recv`; a `try_recv` error is
                // treated the same as end-of-channel.
                let item = if is_closed {
                    guard.try_recv().ok()
                } else {
                    guard.recv().await
                };
                match item {
                    Some(v) => Ok(Some(v)),
                    None => {
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
        }
    }
804}
805
806/// Advance a handle without holding a `RefCell` borrow across the await.
807///
808/// Swaps the iter state out into a local owned value (replacing it with
809/// `Exhausted`), runs `next` on the owned state, then swaps it back. This
810/// avoids `clippy::await_holding_refcell_ref` while preserving single-pass
811/// semantics: a nested `next` call on the same handle during the await would
812/// see `Exhausted` (the iter protocol doesn't permit re-entrant stepping of
813/// the same handle anyway).
814pub async fn next_handle(
815    handle: &Rc<RefCell<VmIter>>,
816    vm: &mut crate::vm::Vm,
817) -> Result<Option<VmValue>, VmError> {
818    let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
819    let result = state.next(vm).await;
820    // Restore state unless the inner call replaced it with Exhausted.
821    *handle.borrow_mut() = state;
822    result
823}
824
825/// Fully consume an iter handle into a Vec of values.
826pub async fn drain(
827    handle: &Rc<RefCell<VmIter>>,
828    vm: &mut crate::vm::Vm,
829) -> Result<Vec<VmValue>, VmError> {
830    let mut out = Vec::new();
831    loop {
832        let v = next_handle(handle, vm).await?;
833        match v {
834            Some(v) => out.push(v),
835            None => break,
836        }
837    }
838    Ok(out)
839}
840
841/// Fully consume an iter handle into a Vec, failing before pushing item
842/// `max + 1`.
843pub async fn drain_capped(
844    handle: &Rc<RefCell<VmIter>>,
845    vm: &mut crate::vm::Vm,
846    max: usize,
847) -> Result<Vec<VmValue>, VmError> {
848    let mut out = Vec::new();
849    loop {
850        let v = next_handle(handle, vm).await?;
851        match v {
852            Some(v) => {
853                if out.len() >= max {
854                    return Err(VmError::Runtime(format!(
855                        "stream.collect: max cap {max} exceeded"
856                    )));
857                }
858                out.push(v);
859            }
860            None => break,
861        }
862    }
863    Ok(out)
864}
865
866pub fn iter_handle_from_value(v: VmValue) -> Result<Rc<RefCell<VmIter>>, VmError> {
867    match iter_from_value(v)? {
868        VmValue::Iter(handle) => Ok(handle),
869        _ => unreachable!("iter_from_value returns Iter"),
870    }
871}
872
873pub fn broadcast_branches(source: Rc<RefCell<VmIter>>, n: usize) -> Vec<VmValue> {
874    let shared = Rc::new(RefCell::new(VmBroadcastState {
875        source,
876        buffer: Vec::new(),
877        exhausted: false,
878    }));
879    (0..n)
880        .map(|branch| {
881            VmValue::Iter(Rc::new(RefCell::new(VmIter::Broadcast {
882                shared: Rc::clone(&shared),
883                branch,
884                index: 0,
885            })))
886        })
887        .collect()
888}
889
890fn empty_broadcast_state() -> VmBroadcastState {
891    VmBroadcastState {
892        source: Rc::new(RefCell::new(VmIter::Exhausted)),
893        buffer: Vec::new(),
894        exhausted: true,
895    }
896}
897
898fn try_next_ready<'a>(
899    handle: &'a Rc<RefCell<VmIter>>,
900    vm: &'a mut crate::vm::Vm,
901) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>> {
902    Box::pin(async move {
903        let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
904        let result = state.try_next_ready_impl(vm).await;
905        *handle.borrow_mut() = state;
906        result
907    })
908}
909
910fn is_exhausted_handle(handle: &Rc<RefCell<VmIter>>) -> bool {
911    matches!(&*handle.borrow(), VmIter::Exhausted)
912}
913
914impl VmIter {
915    async fn try_next_ready_impl(
916        &mut self,
917        vm: &mut crate::vm::Vm,
918    ) -> Result<Option<VmValue>, VmError> {
919        match self {
920            VmIter::Exhausted => Ok(None),
921            VmIter::Map { inner, f } => {
922                let f = f.clone();
923                match try_next_ready(inner, vm).await? {
924                    Some(v) => Ok(Some(vm.call_callable_value(&f, &[v]).await?)),
925                    None => {
926                        if is_exhausted_handle(inner) {
927                            *self = VmIter::Exhausted;
928                        }
929                        Ok(None)
930                    }
931                }
932            }
933            VmIter::Tap { inner, f } => {
934                let f = f.clone();
935                match try_next_ready(inner, vm).await? {
936                    Some(v) => {
937                        vm.call_callable_value(&f, &[v.clone()]).await?;
938                        Ok(Some(v))
939                    }
940                    None => {
941                        if is_exhausted_handle(inner) {
942                            *self = VmIter::Exhausted;
943                        }
944                        Ok(None)
945                    }
946                }
947            }
948            VmIter::Filter { inner, p } => {
949                let p = p.clone();
950                loop {
951                    match try_next_ready(inner, vm).await? {
952                        Some(v) => {
953                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
954                            if keep.is_truthy() {
955                                return Ok(Some(v));
956                            }
957                        }
958                        None => {
959                            if is_exhausted_handle(inner) {
960                                *self = VmIter::Exhausted;
961                            }
962                            return Ok(None);
963                        }
964                    }
965                }
966            }
967            VmIter::Scan { inner, acc, f } => {
968                let f = f.clone();
969                match try_next_ready(inner, vm).await? {
970                    Some(v) => {
971                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
972                        *acc = next_acc.clone();
973                        Ok(Some(next_acc))
974                    }
975                    None => {
976                        if is_exhausted_handle(inner) {
977                            *self = VmIter::Exhausted;
978                        }
979                        Ok(None)
980                    }
981                }
982            }
983            VmIter::FlatMap { inner, f, cur } => {
984                let f = f.clone();
985                loop {
986                    if let Some(cur_iter) = cur.clone() {
987                        match try_next_ready(&cur_iter, vm).await? {
988                            Some(v) => return Ok(Some(v)),
989                            None => {
990                                if is_exhausted_handle(&cur_iter) {
991                                    *cur = None;
992                                }
993                                return Ok(None);
994                            }
995                        }
996                    }
997                    match try_next_ready(inner, vm).await? {
998                        Some(v) => {
999                            let result = vm.call_callable_value(&f, &[v]).await?;
1000                            *cur = Some(iter_handle_from_value(result)?);
1001                        }
1002                        None => {
1003                            if is_exhausted_handle(inner) {
1004                                *self = VmIter::Exhausted;
1005                            }
1006                            return Ok(None);
1007                        }
1008                    }
1009                }
1010            }
1011            VmIter::Take { inner, remaining } => {
1012                if *remaining == 0 {
1013                    *self = VmIter::Exhausted;
1014                    return Ok(None);
1015                }
1016                match try_next_ready(inner, vm).await? {
1017                    Some(v) => {
1018                        *remaining -= 1;
1019                        if *remaining == 0 {
1020                            *self = VmIter::Exhausted;
1021                        }
1022                        Ok(Some(v))
1023                    }
1024                    None => {
1025                        if is_exhausted_handle(inner) {
1026                            *self = VmIter::Exhausted;
1027                        }
1028                        Ok(None)
1029                    }
1030                }
1031            }
1032            VmIter::TakeUntil { inner, p } => {
1033                let p = p.clone();
1034                match try_next_ready(inner, vm).await? {
1035                    Some(v) => {
1036                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
1037                        if stop.is_truthy() {
1038                            *self = VmIter::Exhausted;
1039                            Ok(None)
1040                        } else {
1041                            Ok(Some(v))
1042                        }
1043                    }
1044                    None => {
1045                        if is_exhausted_handle(inner) {
1046                            *self = VmIter::Exhausted;
1047                        }
1048                        Ok(None)
1049                    }
1050                }
1051            }
1052            VmIter::Throttle {
1053                inner,
1054                interval_ms,
1055                next_ready,
1056            } => {
1057                if let Some(ready_at) = *next_ready {
1058                    if ready_at > tokio::time::Instant::now() {
1059                        return Ok(None);
1060                    }
1061                }
1062                match try_next_ready(inner, vm).await? {
1063                    Some(v) => {
1064                        *next_ready = Some(
1065                            tokio::time::Instant::now()
1066                                + tokio::time::Duration::from_millis(*interval_ms),
1067                        );
1068                        Ok(Some(v))
1069                    }
1070                    None => {
1071                        if is_exhausted_handle(inner) {
1072                            *self = VmIter::Exhausted;
1073                        }
1074                        Ok(None)
1075                    }
1076                }
1077            }
1078            VmIter::Range { .. }
1079            | VmIter::Vec { .. }
1080            | VmIter::Dict { .. }
1081            | VmIter::Chars { .. }
1082            | VmIter::Skip { .. }
1083            | VmIter::TakeWhile { .. }
1084            | VmIter::SkipWhile { .. }
1085            | VmIter::Zip { .. }
1086            | VmIter::Enumerate { .. }
1087            | VmIter::Chain { .. }
1088            | VmIter::Merge { .. }
1089            | VmIter::Interleave { .. }
1090            | VmIter::Race { .. }
1091            | VmIter::Broadcast { .. }
1092            | VmIter::Debounce { .. }
1093            | VmIter::Chunks { .. }
1094            | VmIter::Windows { .. } => self.next(vm).await,
1095            VmIter::Gen { gen } => {
1096                if gen.done.get() {
1097                    *self = VmIter::Exhausted;
1098                    return Ok(None);
1099                }
1100                let rx = gen.receiver.clone();
1101                let result = match rx.try_lock() {
1102                    Ok(mut guard) => match guard.try_recv() {
1103                        Ok(Ok(v)) => Ok(Some(v)),
1104                        Ok(Err(error)) => {
1105                            gen.done.set(true);
1106                            *self = VmIter::Exhausted;
1107                            Err(error)
1108                        }
1109                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1110                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1111                            gen.done.set(true);
1112                            *self = VmIter::Exhausted;
1113                            Ok(None)
1114                        }
1115                    },
1116                    Err(_) => Ok(None),
1117                };
1118                result
1119            }
1120            VmIter::Stream { stream } => {
1121                if stream.done.get() {
1122                    *self = VmIter::Exhausted;
1123                    return Ok(None);
1124                }
1125                let rx = stream.receiver.clone();
1126                let result = match rx.try_lock() {
1127                    Ok(mut guard) => match guard.try_recv() {
1128                        Ok(Ok(v)) => Ok(Some(v)),
1129                        Ok(Err(error)) => {
1130                            stream.done.set(true);
1131                            *self = VmIter::Exhausted;
1132                            Err(error)
1133                        }
1134                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1135                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1136                            stream.done.set(true);
1137                            *self = VmIter::Exhausted;
1138                            Ok(None)
1139                        }
1140                    },
1141                    Err(_) => Ok(None),
1142                };
1143                result
1144            }
1145            VmIter::Chan { handle } => {
1146                let rx = handle.receiver.clone();
1147                let result = match rx.try_lock() {
1148                    Ok(mut guard) => match guard.try_recv() {
1149                        Ok(v) => Ok(Some(v)),
1150                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1151                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1152                            *self = VmIter::Exhausted;
1153                            Ok(None)
1154                        }
1155                    },
1156                    Err(_) => Ok(None),
1157                };
1158                result
1159            }
1160        }
1161    }
1162}
1163
1164/// Convenience: wrap a source value into a `VmValue::Iter`. Used by the
1165/// `iter()` builtin and by combinator/sink implementations in later steps.
1166pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1167    let inner = match v {
1168        VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1169        VmValue::Range(r) => VmIter::Range {
1170            next: r.start,
1171            end: r.end,
1172            inclusive: r.inclusive,
1173            done: range_initial_done(r.start, r.end, r.inclusive),
1174        },
1175        VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1176        VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1177        VmValue::Dict(entries) => {
1178            let keys: Vec<String> = entries.keys().cloned().collect();
1179            VmIter::Dict {
1180                entries,
1181                keys,
1182                idx: 0,
1183            }
1184        }
1185        VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1186        VmValue::Generator(gen) => VmIter::Gen { gen },
1187        VmValue::Stream(stream) => VmIter::Stream { stream },
1188        VmValue::Channel(handle) => VmIter::Chan { handle },
1189        other => {
1190            return Err(VmError::TypeError(format!(
1191                "iter: value of type {} is not iterable",
1192                other.type_name()
1193            )))
1194        }
1195    };
1196    Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
1197}
1198
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::VmRange;

    /// Drive an async test body to completion on a current-thread runtime.
    fn run_iter_test(test: impl std::future::Future<Output = ()>) {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(test);
    }

    /// Build an iterator handle over the given integer range.
    fn range_handle(start: i64, end: i64, inclusive: bool) -> Rc<RefCell<VmIter>> {
        let range = VmValue::Range(VmRange {
            start,
            end,
            inclusive,
        });
        match iter_from_value(range).unwrap() {
            VmValue::Iter(handle) => handle,
            _ => panic!("expected iter"),
        }
    }

    // `i64::MAX..=i64::MAX` must yield its single endpoint and then finish.
    #[test]
    fn inclusive_range_at_i64_max_yields_the_endpoint() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, true);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(matches!(first, Some(VmValue::Int(value)) if value == i64::MAX));

            let second = next_handle(&handle, &mut vm).await.unwrap();
            assert!(second.is_none());
        });
    }

    // `i64::MAX..i64::MAX` is empty.
    #[test]
    fn exclusive_range_at_i64_max_is_empty() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, false);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(first.is_none());
        });
    }
}