Skip to main content

harn_vm/vm/
iter.rs

1//! Lazy iterator protocol for the Harn VM.
2//!
3//! `VmIter` is the backing enum for `VmValue::Iter`. It's a single-pass, fused
4//! iterator; once `next` returns `None` the variant is replaced with
5//! `Exhausted`.
6
7use std::collections::{BTreeMap, VecDeque};
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
13
14pub type VmIterHandle = Arc<Mutex<VmIter>>;
15
16fn range_initial_done(start: i64, end: i64, inclusive: bool) -> bool {
17    if inclusive {
18        start > end
19    } else {
20        start >= end
21    }
22}
23
24fn range_next(next: &mut i64, end: i64, inclusive: bool, done: &mut bool) -> Option<i64> {
25    if *done {
26        return None;
27    }
28    let value = *next;
29    let at_end = if inclusive {
30        value >= end
31    } else {
32        value
33            .checked_add(1)
34            .is_none_or(|candidate| candidate >= end)
35    };
36    if at_end {
37        *done = true;
38    } else {
39        *next += 1;
40    }
41    Some(value)
42}
43
44#[derive(Debug)]
45pub struct VmBroadcastState {
46    source: VmIterHandle,
47    buffer: Vec<VmValue>,
48    exhausted: bool,
49}
50
51/// Backing enum for `VmValue::Iter`. See module docs.
52#[derive(Debug)]
53pub enum VmIter {
54    /// Step through a lazy integer range without materializing.
55    Range {
56        next: i64,
57        end: i64,
58        inclusive: bool,
59        done: bool,
60    },
61    /// Snapshot over a shared list / set backing store.
62    Vec {
63        items: Arc<Vec<VmValue>>,
64        idx: usize,
65    },
66    /// Snapshot over a dict; yields `Pair(key, value)` items.
67    Dict {
68        entries: Arc<BTreeMap<String, VmValue>>,
69        keys: Vec<String>,
70        idx: usize,
71    },
72    /// Unicode scalar iteration over a string.
73    Chars { s: Arc<str>, byte_idx: usize },
74    /// Drains a generator's yield channel.
75    Gen { gen: Arc<VmGenerator> },
76    /// Drains a stream's emit channel.
77    Stream { stream: Arc<VmStream> },
78    /// Reads from a channel handle.
79    Chan { handle: Arc<VmChannelHandle> },
80    /// Maps each item through a closure.
81    Map { inner: VmIterHandle, f: VmValue },
82    /// Runs a callback for side effects, then yields the original item.
83    Tap { inner: VmIterHandle, f: VmValue },
84    /// Keeps only items for which the predicate is truthy.
85    Filter { inner: VmIterHandle, p: VmValue },
86    /// Running fold that yields each accumulator.
87    Scan {
88        inner: VmIterHandle,
89        acc: VmValue,
90        f: VmValue,
91    },
92    /// Maps each item to an iterable and flattens one level.
93    FlatMap {
94        inner: VmIterHandle,
95        f: VmValue,
96        cur: Option<VmIterHandle>,
97    },
98    /// Yields up to `remaining` items from `inner`, then becomes Exhausted.
99    Take {
100        inner: VmIterHandle,
101        remaining: usize,
102    },
103    /// Skips the first `remaining` items from `inner` on the first call, then
104    /// forwards. `remaining == 0` is the sentinel for "already primed".
105    Skip {
106        inner: VmIterHandle,
107        remaining: usize,
108    },
109    /// Yields items from `inner` while the predicate is truthy; after the
110    /// first falsy predicate or inner exhaustion, becomes Exhausted.
111    TakeWhile {
112        inner: VmIterHandle,
113        p: VmValue,
114        done: bool,
115    },
116    /// Yields items until the predicate is truthy. The matching sentinel item
117    /// is consumed but not yielded.
118    TakeUntil { inner: VmIterHandle, p: VmValue },
119    /// Discards items while the predicate is truthy; after the first falsy
120    /// item, forwards that item and all subsequent items from `inner`.
121    SkipWhile {
122        inner: VmIterHandle,
123        p: VmValue,
124        primed: bool,
125    },
126    /// Advances two inner iters in lockstep; yields `Pair(a, b)` until either
127    /// side is exhausted.
128    Zip { a: VmIterHandle, b: VmIterHandle },
129    /// Yields `Pair(i, item)` starting at `i = 0`.
130    Enumerate { inner: VmIterHandle, i: i64 },
131    /// Concatenates two iters: drains `a` first, then `b`.
132    Chain {
133        a: VmIterHandle,
134        b: VmIterHandle,
135        on_a: bool,
136    },
137    /// Drains any non-exhausted source in rotating order.
138    Merge {
139        sources: Vec<Option<VmIterHandle>>,
140        cursor: usize,
141    },
142    /// Strict round-robin over non-exhausted sources.
143    Interleave {
144        sources: Vec<Option<VmIterHandle>>,
145        cursor: usize,
146    },
147    /// First source to yield wins; subsequent pulls only read that source.
148    Race {
149        sources: Vec<Option<VmIterHandle>>,
150        winner: Option<VmIterHandle>,
151    },
152    /// One source fanned out into several single-pass branches.
153    Broadcast {
154        shared: Arc<Mutex<VmBroadcastState>>,
155        branch: usize,
156        index: usize,
157    },
158    /// Sleeps between emissions after the first item.
159    Throttle {
160        inner: VmIterHandle,
161        interval_ms: u64,
162        next_ready: Option<tokio::time::Instant>,
163    },
164    /// Coalesces immediately available bursts and emits the last item seen
165    /// after the quiet window.
166    Debounce { inner: VmIterHandle, window_ms: u64 },
167    /// Yields `VmValue::List` batches of up to `n` items from `inner`.
168    /// The final batch may be shorter; empty input yields no batches.
169    Chunks { inner: VmIterHandle, n: usize },
170    /// Yields sliding windows of exactly `n` items from `inner` as `VmValue::List`.
171    /// If the input has fewer than `n` items total, no windows are yielded.
172    Windows {
173        inner: VmIterHandle,
174        n: usize,
175        buf: VecDeque<VmValue>,
176    },
177    /// Terminal state: `next` always returns `None`.
178    Exhausted,
179}
180
181impl VmIter {
182    /// Produce the next value, or `None` when exhausted.
183    ///
184    /// Combinator variants (`Map`, `Filter`, `FlatMap`) invoke user-provided
185    /// closures through the `vm` parameter.
186    pub fn next<'a>(
187        &'a mut self,
188        vm: &'a mut crate::vm::Vm,
189    ) -> std::pin::Pin<
190        Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + Send + 'a>,
191    > {
192        Box::pin(async move { self.next_impl(vm).await })
193    }
194
195    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
196        match self {
197            VmIter::Exhausted => Ok(None),
198            VmIter::Range {
199                next,
200                end,
201                inclusive,
202                done,
203            } => {
204                if let Some(v) = range_next(next, *end, *inclusive, done) {
205                    Ok(Some(VmValue::Int(v)))
206                } else {
207                    *self = VmIter::Exhausted;
208                    Ok(None)
209                }
210            }
211            VmIter::Vec { items, idx } => {
212                if *idx < items.len() {
213                    let v = items[*idx].clone();
214                    *idx += 1;
215                    Ok(Some(v))
216                } else {
217                    *self = VmIter::Exhausted;
218                    Ok(None)
219                }
220            }
221            VmIter::Dict { entries, keys, idx } => {
222                if *idx < keys.len() {
223                    let k = &keys[*idx];
224                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
225                    *idx += 1;
226                    Ok(Some(VmValue::Pair(std::sync::Arc::new((
227                        VmValue::String(std::sync::Arc::from(k.as_str())),
228                        v,
229                    )))))
230                } else {
231                    *self = VmIter::Exhausted;
232                    Ok(None)
233                }
234            }
235            VmIter::Chars { s, byte_idx } => {
236                if *byte_idx >= s.len() {
237                    *self = VmIter::Exhausted;
238                    return Ok(None);
239                }
240                let rest = &s[*byte_idx..];
241                if let Some(c) = rest.chars().next() {
242                    *byte_idx += c.len_utf8();
243                    let mut buf = [0u8; 4];
244                    let encoded = c.encode_utf8(&mut buf);
245                    Ok(Some(VmValue::String(std::sync::Arc::from(&*encoded))))
246                } else {
247                    *self = VmIter::Exhausted;
248                    Ok(None)
249                }
250            }
251            VmIter::Gen { gen } => {
252                if gen.is_done() {
253                    *self = VmIter::Exhausted;
254                    return Ok(None);
255                }
256                let rx = gen.receiver.clone();
257                let mut guard = rx.lock().await;
258                match guard.recv().await {
259                    Some(Ok(v)) => Ok(Some(v)),
260                    Some(Err(error)) => {
261                        gen.mark_done();
262                        drop(guard);
263                        *self = VmIter::Exhausted;
264                        Err(error)
265                    }
266                    None => {
267                        gen.mark_done();
268                        drop(guard);
269                        *self = VmIter::Exhausted;
270                        Ok(None)
271                    }
272                }
273            }
274            VmIter::Stream { stream } => {
275                if stream.is_done() {
276                    *self = VmIter::Exhausted;
277                    return Ok(None);
278                }
279                let rx = stream.receiver.clone();
280                let mut guard = rx.lock().await;
281                match guard.recv().await {
282                    Some(Ok(v)) => Ok(Some(v)),
283                    Some(Err(error)) => {
284                        stream.mark_done();
285                        drop(guard);
286                        *self = VmIter::Exhausted;
287                        Err(error)
288                    }
289                    None => {
290                        stream.mark_done();
291                        drop(guard);
292                        *self = VmIter::Exhausted;
293                        Ok(None)
294                    }
295                }
296            }
297            VmIter::Map { inner, f } => {
298                let f = f.clone();
299                let item = next_handle(inner, vm).await?;
300                match item {
301                    None => {
302                        *self = VmIter::Exhausted;
303                        Ok(None)
304                    }
305                    Some(v) => {
306                        let out = vm.call_callable_one(&f, &v).await?;
307                        Ok(Some(out))
308                    }
309                }
310            }
311            VmIter::Tap { inner, f } => {
312                let f = f.clone();
313                let item = next_handle(inner, vm).await?;
314                match item {
315                    None => {
316                        *self = VmIter::Exhausted;
317                        Ok(None)
318                    }
319                    Some(v) => {
320                        vm.call_callable_one(&f, &v).await?;
321                        Ok(Some(v))
322                    }
323                }
324            }
325            VmIter::Filter { inner, p } => {
326                let p = p.clone();
327                loop {
328                    let item = next_handle(inner, vm).await?;
329                    match item {
330                        None => {
331                            *self = VmIter::Exhausted;
332                            return Ok(None);
333                        }
334                        Some(v) => {
335                            let keep = vm.call_callable_one(&p, &v).await?;
336                            if keep.is_truthy() {
337                                return Ok(Some(v));
338                            }
339                        }
340                    }
341                }
342            }
343            VmIter::Scan { inner, acc, f } => {
344                let f = f.clone();
345                let item = next_handle(inner, vm).await?;
346                match item {
347                    None => {
348                        *self = VmIter::Exhausted;
349                        Ok(None)
350                    }
351                    Some(v) => {
352                        let next_acc = vm.call_callable_two(&f, acc, &v).await?;
353                        *acc = next_acc.clone();
354                        Ok(Some(next_acc))
355                    }
356                }
357            }
358            VmIter::FlatMap { inner, f, cur } => {
359                let f = f.clone();
360                loop {
361                    if let Some(cur_iter) = cur.clone() {
362                        let item = next_handle(&cur_iter, vm).await?;
363                        if let Some(v) = item {
364                            return Ok(Some(v));
365                        }
366                        *cur = None;
367                    }
368                    let item = next_handle(inner, vm).await?;
369                    match item {
370                        None => {
371                            *self = VmIter::Exhausted;
372                            return Ok(None);
373                        }
374                        Some(v) => {
375                            let result = vm.call_callable_one(&f, &v).await?;
376                            let lifted = iter_from_value(result)?;
377                            if let VmValue::Iter(h) = lifted {
378                                *cur = Some(h);
379                            } else {
380                                return Err(VmError::TypeError(
381                                    "flat_map: expected iterable result".to_string(),
382                                ));
383                            }
384                        }
385                    }
386                }
387            }
388            VmIter::Take { inner, remaining } => {
389                if *remaining == 0 {
390                    *self = VmIter::Exhausted;
391                    return Ok(None);
392                }
393                let item = next_handle(inner, vm).await?;
394                match item {
395                    None => {
396                        *self = VmIter::Exhausted;
397                        Ok(None)
398                    }
399                    Some(v) => {
400                        *remaining -= 1;
401                        if *remaining == 0 {
402                            *self = VmIter::Exhausted;
403                        }
404                        Ok(Some(v))
405                    }
406                }
407            }
408            VmIter::Skip { inner, remaining } => {
409                while *remaining > 0 {
410                    let item = next_handle(inner, vm).await?;
411                    match item {
412                        None => {
413                            *self = VmIter::Exhausted;
414                            return Ok(None);
415                        }
416                        Some(_) => {
417                            *remaining -= 1;
418                        }
419                    }
420                }
421                let item = next_handle(inner, vm).await?;
422                match item {
423                    None => {
424                        *self = VmIter::Exhausted;
425                        Ok(None)
426                    }
427                    Some(v) => Ok(Some(v)),
428                }
429            }
430            VmIter::TakeWhile { inner, p, done } => {
431                if *done {
432                    return Ok(None);
433                }
434                let p = p.clone();
435                let item = next_handle(inner, vm).await?;
436                match item {
437                    None => {
438                        *self = VmIter::Exhausted;
439                        Ok(None)
440                    }
441                    Some(v) => {
442                        let keep = vm.call_callable_one(&p, &v).await?;
443                        if keep.is_truthy() {
444                            Ok(Some(v))
445                        } else {
446                            *self = VmIter::Exhausted;
447                            Ok(None)
448                        }
449                    }
450                }
451            }
452            VmIter::TakeUntil { inner, p } => {
453                let p = p.clone();
454                let item = next_handle(inner, vm).await?;
455                match item {
456                    None => {
457                        *self = VmIter::Exhausted;
458                        Ok(None)
459                    }
460                    Some(v) => {
461                        let stop = vm.call_callable_one(&p, &v).await?;
462                        if stop.is_truthy() {
463                            *self = VmIter::Exhausted;
464                            Ok(None)
465                        } else {
466                            Ok(Some(v))
467                        }
468                    }
469                }
470            }
471            VmIter::SkipWhile { inner, p, primed } => {
472                if *primed {
473                    let item = next_handle(inner, vm).await?;
474                    return match item {
475                        None => {
476                            *self = VmIter::Exhausted;
477                            Ok(None)
478                        }
479                        Some(v) => Ok(Some(v)),
480                    };
481                }
482                let p = p.clone();
483                loop {
484                    let item = next_handle(inner, vm).await?;
485                    match item {
486                        None => {
487                            *self = VmIter::Exhausted;
488                            return Ok(None);
489                        }
490                        Some(v) => {
491                            let drop_it = vm.call_callable_one(&p, &v).await?;
492                            if !drop_it.is_truthy() {
493                                *primed = true;
494                                return Ok(Some(v));
495                            }
496                        }
497                    }
498                }
499            }
500            VmIter::Zip { a, b } => {
501                let ia = next_handle(a, vm).await?;
502                let x = match ia {
503                    None => {
504                        *self = VmIter::Exhausted;
505                        return Ok(None);
506                    }
507                    Some(v) => v,
508                };
509                let ib = next_handle(b, vm).await?;
510                let y = match ib {
511                    None => {
512                        *self = VmIter::Exhausted;
513                        return Ok(None);
514                    }
515                    Some(v) => v,
516                };
517                Ok(Some(VmValue::Pair(std::sync::Arc::new((x, y)))))
518            }
519            VmIter::Enumerate { inner, i } => {
520                let item = next_handle(inner, vm).await?;
521                match item {
522                    None => {
523                        *self = VmIter::Exhausted;
524                        Ok(None)
525                    }
526                    Some(v) => {
527                        let idx = *i;
528                        *i += 1;
529                        Ok(Some(VmValue::Pair(std::sync::Arc::new((
530                            VmValue::Int(idx),
531                            v,
532                        )))))
533                    }
534                }
535            }
536            VmIter::Chain { a, b, on_a } => {
537                if *on_a {
538                    let item = next_handle(a, vm).await?;
539                    if let Some(v) = item {
540                        return Ok(Some(v));
541                    }
542                    *on_a = false;
543                }
544                let item = next_handle(b, vm).await?;
545                match item {
546                    None => {
547                        *self = VmIter::Exhausted;
548                        Ok(None)
549                    }
550                    Some(v) => Ok(Some(v)),
551                }
552            }
553            VmIter::Merge { sources, cursor } => loop {
554                if sources.is_empty() || sources.iter().all(Option::is_none) {
555                    *self = VmIter::Exhausted;
556                    return Ok(None);
557                }
558                let len = sources.len();
559                let mut live = 0usize;
560                for offset in 0..len {
561                    let idx = (*cursor + offset) % len;
562                    let Some(handle) = sources[idx].clone() else {
563                        continue;
564                    };
565                    match try_next_ready(&handle, vm).await? {
566                        Some(v) => {
567                            *cursor = (idx + 1) % len;
568                            return Ok(Some(v));
569                        }
570                        None => {
571                            if is_exhausted_handle(&handle) {
572                                sources[idx] = None;
573                            } else {
574                                live += 1;
575                            }
576                        }
577                    }
578                }
579                if live == 0 {
580                    *self = VmIter::Exhausted;
581                    return Ok(None);
582                }
583                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
584            },
585            VmIter::Interleave { sources, cursor } => {
586                if sources.is_empty() || sources.iter().all(Option::is_none) {
587                    *self = VmIter::Exhausted;
588                    return Ok(None);
589                }
590                let len = sources.len();
591                for offset in 0..len {
592                    let idx = (*cursor + offset) % len;
593                    let Some(handle) = sources[idx].clone() else {
594                        continue;
595                    };
596                    match next_handle(&handle, vm).await? {
597                        Some(v) => {
598                            *cursor = (idx + 1) % len;
599                            return Ok(Some(v));
600                        }
601                        None => {
602                            sources[idx] = None;
603                        }
604                    }
605                }
606                *self = VmIter::Exhausted;
607                Ok(None)
608            }
609            VmIter::Race { sources, winner } => {
610                if let Some(handle) = winner.clone() {
611                    let item = next_handle(&handle, vm).await?;
612                    return match item {
613                        Some(v) => Ok(Some(v)),
614                        None => {
615                            *self = VmIter::Exhausted;
616                            Ok(None)
617                        }
618                    };
619                }
620                loop {
621                    let mut live = 0usize;
622                    for source in sources.iter_mut() {
623                        let Some(handle) = source.clone() else {
624                            continue;
625                        };
626                        match try_next_ready(&handle, vm).await? {
627                            Some(v) => {
628                                *winner = Some(handle);
629                                sources.clear();
630                                return Ok(Some(v));
631                            }
632                            None => {
633                                if is_exhausted_handle(&handle) {
634                                    *source = None;
635                                } else {
636                                    live += 1;
637                                }
638                            }
639                        }
640                    }
641                    if live == 0 {
642                        *self = VmIter::Exhausted;
643                        return Ok(None);
644                    }
645                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
646                }
647            }
648            VmIter::Broadcast {
649                shared,
650                branch,
651                index,
652            } => {
653                let _ = branch;
654                loop {
655                    let mut state = std::mem::replace(&mut *shared.lock(), empty_broadcast_state());
656                    if *index < state.buffer.len() {
657                        let item = state.buffer[*index].clone();
658                        *index += 1;
659                        *shared.lock() = state;
660                        return Ok(Some(item));
661                    }
662                    if state.exhausted {
663                        *shared.lock() = state;
664                        *self = VmIter::Exhausted;
665                        return Ok(None);
666                    }
667                    let next = next_handle(&state.source, vm).await;
668                    match next {
669                        Err(err) => {
670                            *shared.lock() = state;
671                            return Err(err);
672                        }
673                        Ok(Some(v)) => {
674                            state.buffer.push(v);
675                            *shared.lock() = state;
676                        }
677                        Ok(None) => {
678                            state.exhausted = true;
679                            *shared.lock() = state;
680                        }
681                    }
682                }
683            }
684            VmIter::Throttle {
685                inner,
686                interval_ms,
687                next_ready,
688            } => {
689                if let Some(ready_at) = next_ready.take() {
690                    let now = tokio::time::Instant::now();
691                    if ready_at > now {
692                        tokio::time::sleep_until(ready_at).await;
693                    }
694                }
695                let item = next_handle(inner, vm).await?;
696                match item {
697                    None => {
698                        *self = VmIter::Exhausted;
699                        Ok(None)
700                    }
701                    Some(v) => {
702                        *next_ready = Some(
703                            tokio::time::Instant::now()
704                                + tokio::time::Duration::from_millis(*interval_ms),
705                        );
706                        Ok(Some(v))
707                    }
708                }
709            }
710            VmIter::Debounce { inner, window_ms } => {
711                let mut last = match next_handle(inner, vm).await? {
712                    Some(v) => v,
713                    None => {
714                        *self = VmIter::Exhausted;
715                        return Ok(None);
716                    }
717                };
718                if *window_ms > 0 {
719                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
720                }
721                while let Some(v) = try_next_ready(inner, vm).await? {
722                    last = v;
723                }
724                Ok(Some(last))
725            }
726            VmIter::Chunks { inner, n } => {
727                let n = *n;
728                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
729                for _ in 0..n {
730                    let item = next_handle(inner, vm).await?;
731                    match item {
732                        Some(v) => {
733                            batch.push(v);
734                        }
735                        None => break,
736                    }
737                }
738                if batch.is_empty() {
739                    *self = VmIter::Exhausted;
740                    Ok(None)
741                } else {
742                    Ok(Some(VmValue::List(std::sync::Arc::new(batch))))
743                }
744            }
745            VmIter::Windows { inner, n, buf } => {
746                let n = *n;
747                if buf.is_empty() {
748                    while buf.len() < n {
749                        let item = next_handle(inner, vm).await?;
750                        match item {
751                            Some(v) => buf.push_back(v),
752                            None => {
753                                *self = VmIter::Exhausted;
754                                return Ok(None);
755                            }
756                        }
757                    }
758                } else {
759                    let item = next_handle(inner, vm).await?;
760                    match item {
761                        Some(v) => {
762                            buf.pop_front();
763                            buf.push_back(v);
764                        }
765                        None => {
766                            *self = VmIter::Exhausted;
767                            return Ok(None);
768                        }
769                    }
770                }
771                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
772                Ok(Some(VmValue::List(std::sync::Arc::new(snapshot))))
773            }
774            VmIter::Chan { handle } => {
775                let is_closed = handle.is_closed();
776                let rx = handle.receiver.clone();
777                let mut closed_rx = handle.subscribe_closed();
778                let mut guard = rx.lock().await;
779                let item = if is_closed {
780                    guard.try_recv().ok()
781                } else {
782                    tokio::select! {
783                        item = guard.recv() => item,
784                        _ = closed_rx.changed() => guard.try_recv().ok(),
785                    }
786                };
787                match item {
788                    Some(v) => Ok(Some(v)),
789                    None => {
790                        drop(guard);
791                        *self = VmIter::Exhausted;
792                        Ok(None)
793                    }
794                }
795            }
796        }
797    }
798}
799
800/// Advance a handle without holding the iterator-state lock across the await.
801///
802/// Swaps the iter state out into a local owned value (replacing it with
803/// `Exhausted`), runs `next` on the owned state, then swaps it back. This
804/// avoids blocking re-entrant iterator access while preserving single-pass
805/// semantics: a nested `next` call on the same handle during the await would
806/// see `Exhausted` (the iter protocol doesn't permit re-entrant stepping of
807/// the same handle anyway).
808pub async fn next_handle(
809    handle: &VmIterHandle,
810    vm: &mut crate::vm::Vm,
811) -> Result<Option<VmValue>, VmError> {
812    let mut state = std::mem::replace(&mut *handle.lock(), VmIter::Exhausted);
813    let result = state.next(vm).await;
814    // Restore state unless the inner call replaced it with Exhausted.
815    *handle.lock() = state;
816    result
817}
818
819/// Fully consume an iter handle into a Vec of values.
820pub async fn drain(handle: &VmIterHandle, vm: &mut crate::vm::Vm) -> Result<Vec<VmValue>, VmError> {
821    let mut out = Vec::new();
822    loop {
823        let v = next_handle(handle, vm).await?;
824        match v {
825            Some(v) => out.push(v),
826            None => break,
827        }
828    }
829    Ok(out)
830}
831
832/// Fully consume an iter handle into a Vec, failing before pushing item
833/// `max + 1`.
834pub async fn drain_capped(
835    handle: &VmIterHandle,
836    vm: &mut crate::vm::Vm,
837    max: usize,
838) -> Result<Vec<VmValue>, VmError> {
839    let mut out = Vec::new();
840    loop {
841        let v = next_handle(handle, vm).await?;
842        match v {
843            Some(v) => {
844                if out.len() >= max {
845                    return Err(VmError::Runtime(format!(
846                        "stream.collect: max cap {max} exceeded"
847                    )));
848                }
849                out.push(v);
850            }
851            None => break,
852        }
853    }
854    Ok(out)
855}
856
857pub fn iter_handle_from_value(v: VmValue) -> Result<VmIterHandle, VmError> {
858    match iter_from_value(v)? {
859        VmValue::Iter(handle) => Ok(handle),
860        _ => unreachable!("iter_from_value returns Iter"),
861    }
862}
863
864pub fn broadcast_branches(source: VmIterHandle, n: usize) -> Vec<VmValue> {
865    let shared = Arc::new(Mutex::new(VmBroadcastState {
866        source,
867        buffer: Vec::new(),
868        exhausted: false,
869    }));
870    (0..n)
871        .map(|branch| {
872            VmValue::Iter(Arc::new(Mutex::new(VmIter::Broadcast {
873                shared: Arc::clone(&shared),
874                branch,
875                index: 0,
876            })))
877        })
878        .collect()
879}
880
881fn empty_broadcast_state() -> VmBroadcastState {
882    VmBroadcastState {
883        source: Arc::new(Mutex::new(VmIter::Exhausted)),
884        buffer: Vec::new(),
885        exhausted: true,
886    }
887}
888
889fn try_next_ready<'a>(
890    handle: &'a VmIterHandle,
891    vm: &'a mut crate::vm::Vm,
892) -> std::pin::Pin<
893    Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + Send + 'a>,
894> {
895    Box::pin(async move {
896        let mut state = std::mem::replace(&mut *handle.lock(), VmIter::Exhausted);
897        let result = state.try_next_ready_impl(vm).await;
898        *handle.lock() = state;
899        result
900    })
901}
902
903fn is_exhausted_handle(handle: &VmIterHandle) -> bool {
904    matches!(&*handle.lock(), VmIter::Exhausted)
905}
906
907impl VmIter {
908    async fn try_next_ready_impl(
909        &mut self,
910        vm: &mut crate::vm::Vm,
911    ) -> Result<Option<VmValue>, VmError> {
912        match self {
913            VmIter::Exhausted => Ok(None),
914            VmIter::Map { inner, f } => {
915                let f = f.clone();
916                match try_next_ready(inner, vm).await? {
917                    Some(v) => Ok(Some(vm.call_callable_one(&f, &v).await?)),
918                    None => {
919                        if is_exhausted_handle(inner) {
920                            *self = VmIter::Exhausted;
921                        }
922                        Ok(None)
923                    }
924                }
925            }
926            VmIter::Tap { inner, f } => {
927                let f = f.clone();
928                match try_next_ready(inner, vm).await? {
929                    Some(v) => {
930                        vm.call_callable_one(&f, &v).await?;
931                        Ok(Some(v))
932                    }
933                    None => {
934                        if is_exhausted_handle(inner) {
935                            *self = VmIter::Exhausted;
936                        }
937                        Ok(None)
938                    }
939                }
940            }
941            VmIter::Filter { inner, p } => {
942                let p = p.clone();
943                loop {
944                    match try_next_ready(inner, vm).await? {
945                        Some(v) => {
946                            let keep = vm.call_callable_one(&p, &v).await?;
947                            if keep.is_truthy() {
948                                return Ok(Some(v));
949                            }
950                        }
951                        None => {
952                            if is_exhausted_handle(inner) {
953                                *self = VmIter::Exhausted;
954                            }
955                            return Ok(None);
956                        }
957                    }
958                }
959            }
960            VmIter::Scan { inner, acc, f } => {
961                let f = f.clone();
962                match try_next_ready(inner, vm).await? {
963                    Some(v) => {
964                        let next_acc = vm.call_callable_two(&f, acc, &v).await?;
965                        *acc = next_acc.clone();
966                        Ok(Some(next_acc))
967                    }
968                    None => {
969                        if is_exhausted_handle(inner) {
970                            *self = VmIter::Exhausted;
971                        }
972                        Ok(None)
973                    }
974                }
975            }
976            VmIter::FlatMap { inner, f, cur } => {
977                let f = f.clone();
978                loop {
979                    if let Some(cur_iter) = cur.clone() {
980                        match try_next_ready(&cur_iter, vm).await? {
981                            Some(v) => return Ok(Some(v)),
982                            None => {
983                                if is_exhausted_handle(&cur_iter) {
984                                    *cur = None;
985                                }
986                                return Ok(None);
987                            }
988                        }
989                    }
990                    match try_next_ready(inner, vm).await? {
991                        Some(v) => {
992                            let result = vm.call_callable_one(&f, &v).await?;
993                            *cur = Some(iter_handle_from_value(result)?);
994                        }
995                        None => {
996                            if is_exhausted_handle(inner) {
997                                *self = VmIter::Exhausted;
998                            }
999                            return Ok(None);
1000                        }
1001                    }
1002                }
1003            }
1004            VmIter::Take { inner, remaining } => {
1005                if *remaining == 0 {
1006                    *self = VmIter::Exhausted;
1007                    return Ok(None);
1008                }
1009                match try_next_ready(inner, vm).await? {
1010                    Some(v) => {
1011                        *remaining -= 1;
1012                        if *remaining == 0 {
1013                            *self = VmIter::Exhausted;
1014                        }
1015                        Ok(Some(v))
1016                    }
1017                    None => {
1018                        if is_exhausted_handle(inner) {
1019                            *self = VmIter::Exhausted;
1020                        }
1021                        Ok(None)
1022                    }
1023                }
1024            }
1025            VmIter::TakeUntil { inner, p } => {
1026                let p = p.clone();
1027                match try_next_ready(inner, vm).await? {
1028                    Some(v) => {
1029                        let stop = vm.call_callable_one(&p, &v).await?;
1030                        if stop.is_truthy() {
1031                            *self = VmIter::Exhausted;
1032                            Ok(None)
1033                        } else {
1034                            Ok(Some(v))
1035                        }
1036                    }
1037                    None => {
1038                        if is_exhausted_handle(inner) {
1039                            *self = VmIter::Exhausted;
1040                        }
1041                        Ok(None)
1042                    }
1043                }
1044            }
1045            VmIter::Throttle {
1046                inner,
1047                interval_ms,
1048                next_ready,
1049            } => {
1050                if let Some(ready_at) = *next_ready {
1051                    if ready_at > tokio::time::Instant::now() {
1052                        return Ok(None);
1053                    }
1054                }
1055                match try_next_ready(inner, vm).await? {
1056                    Some(v) => {
1057                        *next_ready = Some(
1058                            tokio::time::Instant::now()
1059                                + tokio::time::Duration::from_millis(*interval_ms),
1060                        );
1061                        Ok(Some(v))
1062                    }
1063                    None => {
1064                        if is_exhausted_handle(inner) {
1065                            *self = VmIter::Exhausted;
1066                        }
1067                        Ok(None)
1068                    }
1069                }
1070            }
1071            VmIter::Range { .. }
1072            | VmIter::Vec { .. }
1073            | VmIter::Dict { .. }
1074            | VmIter::Chars { .. }
1075            | VmIter::Skip { .. }
1076            | VmIter::TakeWhile { .. }
1077            | VmIter::SkipWhile { .. }
1078            | VmIter::Zip { .. }
1079            | VmIter::Enumerate { .. }
1080            | VmIter::Chain { .. }
1081            | VmIter::Merge { .. }
1082            | VmIter::Interleave { .. }
1083            | VmIter::Race { .. }
1084            | VmIter::Broadcast { .. }
1085            | VmIter::Debounce { .. }
1086            | VmIter::Chunks { .. }
1087            | VmIter::Windows { .. } => self.next(vm).await,
1088            VmIter::Gen { gen } => {
1089                if gen.is_done() {
1090                    *self = VmIter::Exhausted;
1091                    return Ok(None);
1092                }
1093                let rx = gen.receiver.clone();
1094                let result = match rx.try_lock() {
1095                    Ok(mut guard) => match guard.try_recv() {
1096                        Ok(Ok(v)) => Ok(Some(v)),
1097                        Ok(Err(error)) => {
1098                            gen.mark_done();
1099                            *self = VmIter::Exhausted;
1100                            Err(error)
1101                        }
1102                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1103                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1104                            gen.mark_done();
1105                            *self = VmIter::Exhausted;
1106                            Ok(None)
1107                        }
1108                    },
1109                    Err(_) => Ok(None),
1110                };
1111                result
1112            }
1113            VmIter::Stream { stream } => {
1114                if stream.is_done() {
1115                    *self = VmIter::Exhausted;
1116                    return Ok(None);
1117                }
1118                let rx = stream.receiver.clone();
1119                let result = match rx.try_lock() {
1120                    Ok(mut guard) => match guard.try_recv() {
1121                        Ok(Ok(v)) => Ok(Some(v)),
1122                        Ok(Err(error)) => {
1123                            stream.mark_done();
1124                            *self = VmIter::Exhausted;
1125                            Err(error)
1126                        }
1127                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1128                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1129                            stream.mark_done();
1130                            *self = VmIter::Exhausted;
1131                            Ok(None)
1132                        }
1133                    },
1134                    Err(_) => Ok(None),
1135                };
1136                result
1137            }
1138            VmIter::Chan { handle } => {
1139                let rx = handle.receiver.clone();
1140                let result = match rx.try_lock() {
1141                    Ok(mut guard) => match guard.try_recv() {
1142                        Ok(v) => Ok(Some(v)),
1143                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1144                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1145                            *self = VmIter::Exhausted;
1146                            Ok(None)
1147                        }
1148                    },
1149                    Err(_) => Ok(None),
1150                };
1151                result
1152            }
1153        }
1154    }
1155}
1156
1157/// Convenience: wrap a source value into a `VmValue::Iter`. Used by the
1158/// `iter()` builtin and by combinator/sink implementations in later steps.
1159pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1160    let inner = match v {
1161        VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1162        VmValue::Range(r) => VmIter::Range {
1163            next: r.start,
1164            end: r.end,
1165            inclusive: r.inclusive,
1166            done: range_initial_done(r.start, r.end, r.inclusive),
1167        },
1168        VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1169        VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1170        VmValue::Dict(entries) => {
1171            let keys: Vec<String> = entries.keys().cloned().collect();
1172            VmIter::Dict {
1173                entries,
1174                keys,
1175                idx: 0,
1176            }
1177        }
1178        VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1179        VmValue::Generator(gen) => VmIter::Gen { gen },
1180        VmValue::Stream(stream) => VmIter::Stream { stream },
1181        VmValue::Channel(handle) => VmIter::Chan { handle },
1182        other => {
1183            return Err(VmError::TypeError(format!(
1184                "iter: value of type {} is not iterable",
1185                other.type_name()
1186            )))
1187        }
1188    };
1189    Ok(VmValue::Iter(Arc::new(Mutex::new(inner))))
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194    use super::*;
1195    use crate::value::VmRange;
1196
1197    fn run_iter_test(test: impl std::future::Future<Output = ()>) {
1198        let rt = tokio::runtime::Builder::new_current_thread()
1199            .enable_all()
1200            .build()
1201            .unwrap();
1202        rt.block_on(test);
1203    }
1204
1205    #[test]
1206    fn inclusive_range_at_i64_max_yields_the_endpoint() {
1207        run_iter_test(async {
1208            let mut vm = crate::vm::Vm::new();
1209            let VmValue::Iter(handle) = iter_from_value(VmValue::Range(VmRange {
1210                start: i64::MAX,
1211                end: i64::MAX,
1212                inclusive: true,
1213            }))
1214            .unwrap() else {
1215                panic!("expected iter");
1216            };
1217
1218            let first = next_handle(&handle, &mut vm).await.unwrap();
1219            assert!(matches!(first, Some(VmValue::Int(value)) if value == i64::MAX));
1220            assert!(next_handle(&handle, &mut vm).await.unwrap().is_none());
1221        });
1222    }
1223
1224    #[test]
1225    fn exclusive_range_at_i64_max_is_empty() {
1226        run_iter_test(async {
1227            let mut vm = crate::vm::Vm::new();
1228            let VmValue::Iter(handle) = iter_from_value(VmValue::Range(VmRange {
1229                start: i64::MAX,
1230                end: i64::MAX,
1231                inclusive: false,
1232            }))
1233            .unwrap() else {
1234                panic!("expected iter");
1235            };
1236
1237            assert!(next_handle(&handle, &mut vm).await.unwrap().is_none());
1238        });
1239    }
1240}