1use std::cell::RefCell;
8use std::collections::{BTreeMap, VecDeque};
9use std::rc::Rc;
10
11use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
12
/// Reports whether a range is already finished before producing anything.
///
/// An inclusive range is empty only when `start` lies past `end`; an
/// exclusive range is additionally empty when the two bounds are equal.
fn range_initial_done(start: i64, end: i64, inclusive: bool) -> bool {
    start > end || (!inclusive && start == end)
}
20
/// Produces the next value of a range and advances its cursor.
///
/// Returns `None` once `done` is set. Otherwise the current cursor value is
/// returned; if it is the final value of the range `done` is set, and if not
/// the cursor is moved forward by one. The successor is computed with
/// `checked_add` so a cursor sitting at `i64::MAX` never overflows.
fn range_next(next: &mut i64, end: i64, inclusive: bool, done: &mut bool) -> Option<i64> {
    if *done {
        return None;
    }
    let current = *next;
    let is_last = match (inclusive, current.checked_add(1)) {
        // Inclusive ranges stop once the cursor has reached the end bound.
        (true, _) => current >= end,
        // Exclusive ranges stop when the successor would hit the end bound...
        (false, Some(successor)) => successor >= end,
        // ...or when there is no representable successor at all.
        (false, None) => true,
    };
    if is_last {
        *done = true;
    } else {
        *next = current + 1;
    }
    Some(current)
}
40
/// State shared by every `VmIter::Broadcast` branch created from one source.
#[derive(Debug)]
pub struct VmBroadcastState {
    /// Upstream iterator; pulled from when a branch has read past `buffer`.
    source: Rc<RefCell<VmIter>>,
    /// Every item pulled from `source` so far, in order. Branches index into
    /// this buffer with their own cursor.
    buffer: Vec<VmValue>,
    /// Set once `source` has reported that it has no more items.
    exhausted: bool,
}
47
/// State of a lazy VM iterator, advanced by [`VmIter::next`].
///
/// Source variants produce values on their own; adapter variants wrap one or
/// more shared `Rc<RefCell<VmIter>>` handles and transform what those yield.
/// Most arms of the stepping code replace the value with
/// [`VmIter::Exhausted`] once they run out of items.
#[derive(Debug)]
pub enum VmIter {
    /// Integer range. `next` is the value to yield on the following step,
    /// `done` is set once the final value has been produced (or up front for
    /// an empty range).
    Range {
        next: i64,
        end: i64,
        inclusive: bool,
        done: bool,
    },
    /// Walks `items` by index, cloning each element.
    Vec { items: Rc<Vec<VmValue>>, idx: usize },
    /// Walks `keys` by index and yields `(key, value)` pairs looked up in
    /// `entries`; a key missing from `entries` yields `Nil` as its value.
    Dict {
        entries: Rc<BTreeMap<String, VmValue>>,
        keys: Vec<String>,
        idx: usize,
    },
    /// Yields one `char` at a time as a string; `byte_idx` is the byte offset
    /// of the next character in `s`.
    Chars { s: Rc<str>, byte_idx: usize },
    /// Receives values from a generator's receiver until it closes or errors.
    Gen { gen: Rc<VmGenerator> },
    /// Receives values from a stream's receiver until it closes or errors.
    Stream { stream: Rc<VmStream> },
    /// Receives values from a channel handle.
    Chan { handle: Rc<VmChannelHandle> },
    /// Yields `f(item)` for each item of `inner`.
    Map {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
    },
    /// Calls `f(item)` for each item of `inner`, discards the result, and
    /// yields the item unchanged.
    Tap {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
    },
    /// Yields only the items of `inner` for which `p(item)` is truthy.
    Filter {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Running fold: for each item computes `f(acc, item)`, stores it in
    /// `acc`, and yields it.
    Scan {
        inner: Rc<RefCell<VmIter>>,
        acc: VmValue,
        f: VmValue,
    },
    /// Maps each item of `inner` through `f`, converts the result to an
    /// iterator, and yields that iterator's items. `cur` is the sub-iterator
    /// currently being drained, if any.
    FlatMap {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
        cur: Option<Rc<RefCell<VmIter>>>,
    },
    /// Yields at most `remaining` more items from `inner`.
    Take {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Discards `remaining` items from `inner` before yielding the rest.
    Skip {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Yields items of `inner` until `p(item)` is falsy; that item is
    /// dropped. When `done` is set the iterator yields nothing.
    TakeWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        done: bool,
    },
    /// Yields items of `inner` until `p(item)` is truthy; that item is
    /// dropped.
    TakeUntil {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Drops leading items while `p(item)` is truthy. `primed` is set once
    /// the first kept item has been seen, after which `p` is no longer called.
    SkipWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        primed: bool,
    },
    /// Yields pairs of one item from `a` and one from `b`; ends as soon as
    /// either side ends.
    Zip {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
    },
    /// Yields `(i, item)` pairs, incrementing `i` after each item.
    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
    /// Yields all of `a`, then all of `b`. `on_a` is true while `a` is still
    /// being drained.
    Chain {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
        on_a: bool,
    },
    /// Polls the live sources without blocking, starting at `cursor`, and
    /// yields whichever has an item ready. Finished sources become `None`.
    Merge {
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        cursor: usize,
    },
    /// Round-robin over the live sources, awaiting each in turn starting at
    /// `cursor`. Finished sources become `None`.
    Interleave {
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        cursor: usize,
    },
    /// Polls all sources until one produces an item; that source becomes
    /// `winner` and is the only one read from afterwards.
    Race {
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        winner: Option<Rc<RefCell<VmIter>>>,
    },
    /// One branch of a broadcast. `index` is this branch's position in the
    /// shared buffer; `branch` is the branch's ordinal as assigned at
    /// construction (not consulted when stepping).
    Broadcast {
        shared: Rc<RefCell<VmBroadcastState>>,
        branch: usize,
        index: usize,
    },
    /// Passes items through but waits until `next_ready` before pulling the
    /// next one; `next_ready` is set `interval_ms` after each yielded item.
    Throttle {
        inner: Rc<RefCell<VmIter>>,
        interval_ms: u64,
        next_ready: Option<tokio::time::Instant>,
    },
    /// Pulls one item, sleeps `window_ms`, then yields the most recent item
    /// that is available without blocking.
    Debounce {
        inner: Rc<RefCell<VmIter>>,
        window_ms: u64,
    },
    /// Groups items of `inner` into lists of up to `n` elements.
    Chunks {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
    },
    /// Sliding window over `inner`: yields a list snapshot of `buf`, which
    /// holds the current window of `n` items.
    Windows {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
        buf: VecDeque<VmValue>,
    },
    /// Terminal state: always yields `None`.
    Exhausted,
}
195
impl VmIter {
    /// Advances the iterator by one step.
    ///
    /// Resolves to `Ok(Some(value))` for the next item, `Ok(None)` when the
    /// iterator is finished, or `Err` when a source or a user callback
    /// failed. The future is boxed because adapter arms recurse back into
    /// this method through `next_handle`.
    pub fn next<'a>(
        &'a mut self,
        vm: &'a mut crate::vm::Vm,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
    {
        Box::pin(async move { self.next_impl(vm).await })
    }

    /// Blocking (awaiting) step implementation behind [`VmIter::next`].
    ///
    /// Each arm pulls from its source or inner handle(s) and, in most cases,
    /// overwrites `self` with `VmIter::Exhausted` once no more items can be
    /// produced.
    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
        match self {
            VmIter::Exhausted => Ok(None),
            VmIter::Range {
                next,
                end,
                inclusive,
                done,
            } => {
                if let Some(v) = range_next(next, *end, *inclusive, done) {
                    Ok(Some(VmValue::Int(v)))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Vec { items, idx } => {
                if *idx < items.len() {
                    let v = items[*idx].clone();
                    *idx += 1;
                    Ok(Some(v))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Dict { entries, keys, idx } => {
                if *idx < keys.len() {
                    let k = &keys[*idx];
                    // A key with no entry yields Nil rather than failing.
                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
                    *idx += 1;
                    Ok(Some(VmValue::Pair(Rc::new((
                        VmValue::String(Rc::from(k.as_str())),
                        v,
                    )))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Chars { s, byte_idx } => {
                if *byte_idx >= s.len() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rest = &s[*byte_idx..];
                if let Some(c) = rest.chars().next() {
                    // Advance by the character's encoded width so the offset
                    // moves to the start of the following character.
                    *byte_idx += c.len_utf8();
                    let mut buf = [0u8; 4];
                    let encoded = c.encode_utf8(&mut buf);
                    Ok(Some(VmValue::String(Rc::from(&*encoded))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Gen { gen } => {
                if gen.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = gen.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    // An error from the generator also ends the iterator.
                    Some(Err(error)) => {
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Stream { stream } => {
                if stream.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = stream.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    // An error from the stream also ends the iterator.
                    Some(Err(error)) => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Map { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let out = vm.call_callable_value(&f, &[v]).await?;
                        Ok(Some(out))
                    }
                }
            }
            VmIter::Tap { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        // Result of the callback is ignored; errors still propagate.
                        vm.call_callable_value(&f, &[v.clone()]).await?;
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Filter { inner, p } => {
                let p = p.clone();
                // Keep pulling until an item passes the predicate or the
                // inner iterator ends.
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if keep.is_truthy() {
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Scan { inner, acc, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
                        *acc = next_acc.clone();
                        Ok(Some(next_acc))
                    }
                }
            }
            VmIter::FlatMap { inner, f, cur } => {
                let f = f.clone();
                loop {
                    // Drain the current sub-iterator first.
                    if let Some(cur_iter) = cur.clone() {
                        let item = next_handle(&cur_iter, vm).await?;
                        if let Some(v) = item {
                            return Ok(Some(v));
                        }
                        *cur = None;
                    }
                    // Sub-iterator is finished (or absent): map the next
                    // outer item into a new one and loop.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let result = vm.call_callable_value(&f, &[v]).await?;
                            let lifted = iter_from_value(result)?;
                            if let VmValue::Iter(h) = lifted {
                                *cur = Some(h);
                            } else {
                                return Err(VmError::TypeError(
                                    "flat_map: expected iterable result".to_string(),
                                ));
                            }
                        }
                    }
                }
            }
            VmIter::Take { inner, remaining } => {
                if *remaining == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *remaining -= 1;
                        // Finish eagerly so the inner iterator is not pulled
                        // again after the quota is used up.
                        if *remaining == 0 {
                            *self = VmIter::Exhausted;
                        }
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Skip { inner, remaining } => {
                // Discard the leading items; `remaining` stays at zero on
                // later calls so this loop runs only once in effect.
                while *remaining > 0 {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(_) => {
                            *remaining -= 1;
                        }
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::TakeWhile { inner, p, done } => {
                if *done {
                    return Ok(None);
                }
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if keep.is_truthy() {
                            Ok(Some(v))
                        } else {
                            // The first failing item is consumed and dropped.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    }
                }
            }
            VmIter::TakeUntil { inner, p } => {
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if stop.is_truthy() {
                            // The item that triggers the stop is not yielded.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        } else {
                            Ok(Some(v))
                        }
                    }
                }
            }
            VmIter::SkipWhile { inner, p, primed } => {
                // Once primed, act as a plain pass-through.
                if *primed {
                    let item = next_handle(inner, vm).await?;
                    return match item {
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                        Some(v) => Ok(Some(v)),
                    };
                }
                let p = p.clone();
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if !drop_it.is_truthy() {
                                *primed = true;
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Zip { a, b } => {
                let ia = next_handle(a, vm).await?;
                let x = match ia {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                // `a` has already been advanced here; if `b` is finished the
                // item taken from `a` is discarded.
                let ib = next_handle(b, vm).await?;
                let y = match ib {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
            }
            VmIter::Enumerate { inner, i } => {
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let idx = *i;
                        *i += 1;
                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
                    }
                }
            }
            VmIter::Chain { a, b, on_a } => {
                if *on_a {
                    let item = next_handle(a, vm).await?;
                    if let Some(v) = item {
                        return Ok(Some(v));
                    }
                    // `a` is finished; fall through to `b` in the same call.
                    *on_a = false;
                }
                let item = next_handle(b, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::Merge { sources, cursor } => loop {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                // Number of sources that are not finished but had nothing
                // ready during this sweep.
                let mut live = 0usize;
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    match try_next_ready(&handle, vm).await? {
                        Some(v) => {
                            // Resume the next sweep after the source that
                            // just produced.
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            if is_exhausted_handle(&handle) {
                                sources[idx] = None;
                            } else {
                                live += 1;
                            }
                        }
                    }
                }
                if live == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                // Nothing was ready: back off briefly before sweeping again.
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            },
            VmIter::Interleave { sources, cursor } => {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    // Unlike Merge, each source is awaited in turn.
                    match next_handle(&handle, vm).await? {
                        Some(v) => {
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            sources[idx] = None;
                        }
                    }
                }
                *self = VmIter::Exhausted;
                Ok(None)
            }
            VmIter::Race { sources, winner } => {
                // After a winner is chosen only that source is read.
                if let Some(handle) = winner.clone() {
                    let item = next_handle(&handle, vm).await?;
                    return match item {
                        Some(v) => Ok(Some(v)),
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    };
                }
                loop {
                    let mut live = 0usize;
                    for source in sources.iter_mut() {
                        let Some(handle) = source.clone() else {
                            continue;
                        };
                        match try_next_ready(&handle, vm).await? {
                            Some(v) => {
                                // First source to produce wins; the other
                                // handles are released.
                                *winner = Some(handle);
                                sources.clear();
                                return Ok(Some(v));
                            }
                            None => {
                                if is_exhausted_handle(&handle) {
                                    *source = None;
                                } else {
                                    live += 1;
                                }
                            }
                        }
                    }
                    if live == 0 {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // Nothing was ready: back off briefly before polling again.
                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                }
            }
            VmIter::Broadcast {
                shared,
                branch,
                index,
            } => {
                let _ = branch;
                loop {
                    // Move the shared state out, leaving a placeholder, so no
                    // RefCell borrow is held across the await below. Every
                    // exit path writes the real state back.
                    let mut state =
                        std::mem::replace(&mut *shared.borrow_mut(), empty_broadcast_state());
                    if *index < state.buffer.len() {
                        let item = state.buffer[*index].clone();
                        *index += 1;
                        *shared.borrow_mut() = state;
                        return Ok(Some(item));
                    }
                    if state.exhausted {
                        *shared.borrow_mut() = state;
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // This branch has caught up with the buffer: pull one
                    // more item from the source, then loop to serve it.
                    let next = next_handle(&state.source, vm).await;
                    match next {
                        Err(err) => {
                            *shared.borrow_mut() = state;
                            return Err(err);
                        }
                        Ok(Some(v)) => {
                            state.buffer.push(v);
                            *shared.borrow_mut() = state;
                        }
                        Ok(None) => {
                            state.exhausted = true;
                            *shared.borrow_mut() = state;
                        }
                    }
                }
            }
            VmIter::Throttle {
                inner,
                interval_ms,
                next_ready,
            } => {
                // Wait out the remainder of the interval since the last item.
                if let Some(ready_at) = next_ready.take() {
                    let now = tokio::time::Instant::now();
                    if ready_at > now {
                        tokio::time::sleep_until(ready_at).await;
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *next_ready = Some(
                            tokio::time::Instant::now()
                                + tokio::time::Duration::from_millis(*interval_ms),
                        );
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Debounce { inner, window_ms } => {
                let mut last = match next_handle(inner, vm).await? {
                    Some(v) => v,
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                };
                if *window_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
                }
                // Keep only the newest item that is available without waiting.
                while let Some(v) = try_next_ready(inner, vm).await? {
                    last = v;
                }
                Ok(Some(last))
            }
            VmIter::Chunks { inner, n } => {
                let n = *n;
                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
                for _ in 0..n {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            batch.push(v);
                        }
                        None => break,
                    }
                }
                // A short final batch is still yielded; only an empty one
                // ends the iterator.
                if batch.is_empty() {
                    *self = VmIter::Exhausted;
                    Ok(None)
                } else {
                    Ok(Some(VmValue::List(Rc::new(batch))))
                }
            }
            VmIter::Windows { inner, n, buf } => {
                let n = *n;
                if buf.is_empty() {
                    // First call: fill the window; too few items means no
                    // window is ever produced.
                    while buf.len() < n {
                        let item = next_handle(inner, vm).await?;
                        match item {
                            Some(v) => buf.push_back(v),
                            None => {
                                *self = VmIter::Exhausted;
                                return Ok(None);
                            }
                        }
                    }
                } else {
                    // Later calls: slide the window forward by one item.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            buf.pop_front();
                            buf.push_back(v);
                        }
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                    }
                }
                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
                Ok(Some(VmValue::List(Rc::new(snapshot))))
            }
            VmIter::Chan { handle } => {
                // The closed flag is sampled before the receiver lock is
                // taken. When it is set, only items that can be received
                // without waiting are returned.
                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
                let rx = handle.receiver.clone();
                let mut guard = rx.lock().await;
                let item = if is_closed {
                    guard.try_recv().ok()
                } else {
                    guard.recv().await
                };
                match item {
                    Some(v) => Ok(Some(v)),
                    None => {
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
        }
    }
}
807
808pub async fn next_handle(
817 handle: &Rc<RefCell<VmIter>>,
818 vm: &mut crate::vm::Vm,
819) -> Result<Option<VmValue>, VmError> {
820 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
821 let result = state.next(vm).await;
822 *handle.borrow_mut() = state;
824 result
825}
826
827pub async fn drain(
829 handle: &Rc<RefCell<VmIter>>,
830 vm: &mut crate::vm::Vm,
831) -> Result<Vec<VmValue>, VmError> {
832 let mut out = Vec::new();
833 loop {
834 let v = next_handle(handle, vm).await?;
835 match v {
836 Some(v) => out.push(v),
837 None => break,
838 }
839 }
840 Ok(out)
841}
842
843pub async fn drain_capped(
846 handle: &Rc<RefCell<VmIter>>,
847 vm: &mut crate::vm::Vm,
848 max: usize,
849) -> Result<Vec<VmValue>, VmError> {
850 let mut out = Vec::new();
851 loop {
852 let v = next_handle(handle, vm).await?;
853 match v {
854 Some(v) => {
855 if out.len() >= max {
856 return Err(VmError::Runtime(format!(
857 "stream.collect: max cap {max} exceeded"
858 )));
859 }
860 out.push(v);
861 }
862 None => break,
863 }
864 }
865 Ok(out)
866}
867
868pub fn iter_handle_from_value(v: VmValue) -> Result<Rc<RefCell<VmIter>>, VmError> {
869 match iter_from_value(v)? {
870 VmValue::Iter(handle) => Ok(handle),
871 _ => unreachable!("iter_from_value returns Iter"),
872 }
873}
874
875pub fn broadcast_branches(source: Rc<RefCell<VmIter>>, n: usize) -> Vec<VmValue> {
876 let shared = Rc::new(RefCell::new(VmBroadcastState {
877 source,
878 buffer: Vec::new(),
879 exhausted: false,
880 }));
881 (0..n)
882 .map(|branch| {
883 VmValue::Iter(Rc::new(RefCell::new(VmIter::Broadcast {
884 shared: Rc::clone(&shared),
885 branch,
886 index: 0,
887 })))
888 })
889 .collect()
890}
891
892fn empty_broadcast_state() -> VmBroadcastState {
893 VmBroadcastState {
894 source: Rc::new(RefCell::new(VmIter::Exhausted)),
895 buffer: Vec::new(),
896 exhausted: true,
897 }
898}
899
900fn try_next_ready<'a>(
901 handle: &'a Rc<RefCell<VmIter>>,
902 vm: &'a mut crate::vm::Vm,
903) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>> {
904 Box::pin(async move {
905 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
906 let result = state.try_next_ready_impl(vm).await;
907 *handle.borrow_mut() = state;
908 result
909 })
910}
911
912fn is_exhausted_handle(handle: &Rc<RefCell<VmIter>>) -> bool {
913 matches!(&*handle.borrow(), VmIter::Exhausted)
914}
915
916impl VmIter {
917 async fn try_next_ready_impl(
918 &mut self,
919 vm: &mut crate::vm::Vm,
920 ) -> Result<Option<VmValue>, VmError> {
921 match self {
922 VmIter::Exhausted => Ok(None),
923 VmIter::Map { inner, f } => {
924 let f = f.clone();
925 match try_next_ready(inner, vm).await? {
926 Some(v) => Ok(Some(vm.call_callable_value(&f, &[v]).await?)),
927 None => {
928 if is_exhausted_handle(inner) {
929 *self = VmIter::Exhausted;
930 }
931 Ok(None)
932 }
933 }
934 }
935 VmIter::Tap { inner, f } => {
936 let f = f.clone();
937 match try_next_ready(inner, vm).await? {
938 Some(v) => {
939 vm.call_callable_value(&f, &[v.clone()]).await?;
940 Ok(Some(v))
941 }
942 None => {
943 if is_exhausted_handle(inner) {
944 *self = VmIter::Exhausted;
945 }
946 Ok(None)
947 }
948 }
949 }
950 VmIter::Filter { inner, p } => {
951 let p = p.clone();
952 loop {
953 match try_next_ready(inner, vm).await? {
954 Some(v) => {
955 let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
956 if keep.is_truthy() {
957 return Ok(Some(v));
958 }
959 }
960 None => {
961 if is_exhausted_handle(inner) {
962 *self = VmIter::Exhausted;
963 }
964 return Ok(None);
965 }
966 }
967 }
968 }
969 VmIter::Scan { inner, acc, f } => {
970 let f = f.clone();
971 match try_next_ready(inner, vm).await? {
972 Some(v) => {
973 let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
974 *acc = next_acc.clone();
975 Ok(Some(next_acc))
976 }
977 None => {
978 if is_exhausted_handle(inner) {
979 *self = VmIter::Exhausted;
980 }
981 Ok(None)
982 }
983 }
984 }
985 VmIter::FlatMap { inner, f, cur } => {
986 let f = f.clone();
987 loop {
988 if let Some(cur_iter) = cur.clone() {
989 match try_next_ready(&cur_iter, vm).await? {
990 Some(v) => return Ok(Some(v)),
991 None => {
992 if is_exhausted_handle(&cur_iter) {
993 *cur = None;
994 }
995 return Ok(None);
996 }
997 }
998 }
999 match try_next_ready(inner, vm).await? {
1000 Some(v) => {
1001 let result = vm.call_callable_value(&f, &[v]).await?;
1002 *cur = Some(iter_handle_from_value(result)?);
1003 }
1004 None => {
1005 if is_exhausted_handle(inner) {
1006 *self = VmIter::Exhausted;
1007 }
1008 return Ok(None);
1009 }
1010 }
1011 }
1012 }
1013 VmIter::Take { inner, remaining } => {
1014 if *remaining == 0 {
1015 *self = VmIter::Exhausted;
1016 return Ok(None);
1017 }
1018 match try_next_ready(inner, vm).await? {
1019 Some(v) => {
1020 *remaining -= 1;
1021 if *remaining == 0 {
1022 *self = VmIter::Exhausted;
1023 }
1024 Ok(Some(v))
1025 }
1026 None => {
1027 if is_exhausted_handle(inner) {
1028 *self = VmIter::Exhausted;
1029 }
1030 Ok(None)
1031 }
1032 }
1033 }
1034 VmIter::TakeUntil { inner, p } => {
1035 let p = p.clone();
1036 match try_next_ready(inner, vm).await? {
1037 Some(v) => {
1038 let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
1039 if stop.is_truthy() {
1040 *self = VmIter::Exhausted;
1041 Ok(None)
1042 } else {
1043 Ok(Some(v))
1044 }
1045 }
1046 None => {
1047 if is_exhausted_handle(inner) {
1048 *self = VmIter::Exhausted;
1049 }
1050 Ok(None)
1051 }
1052 }
1053 }
1054 VmIter::Throttle {
1055 inner,
1056 interval_ms,
1057 next_ready,
1058 } => {
1059 if let Some(ready_at) = *next_ready {
1060 if ready_at > tokio::time::Instant::now() {
1061 return Ok(None);
1062 }
1063 }
1064 match try_next_ready(inner, vm).await? {
1065 Some(v) => {
1066 *next_ready = Some(
1067 tokio::time::Instant::now()
1068 + tokio::time::Duration::from_millis(*interval_ms),
1069 );
1070 Ok(Some(v))
1071 }
1072 None => {
1073 if is_exhausted_handle(inner) {
1074 *self = VmIter::Exhausted;
1075 }
1076 Ok(None)
1077 }
1078 }
1079 }
1080 VmIter::Range { .. }
1081 | VmIter::Vec { .. }
1082 | VmIter::Dict { .. }
1083 | VmIter::Chars { .. }
1084 | VmIter::Skip { .. }
1085 | VmIter::TakeWhile { .. }
1086 | VmIter::SkipWhile { .. }
1087 | VmIter::Zip { .. }
1088 | VmIter::Enumerate { .. }
1089 | VmIter::Chain { .. }
1090 | VmIter::Merge { .. }
1091 | VmIter::Interleave { .. }
1092 | VmIter::Race { .. }
1093 | VmIter::Broadcast { .. }
1094 | VmIter::Debounce { .. }
1095 | VmIter::Chunks { .. }
1096 | VmIter::Windows { .. } => self.next(vm).await,
1097 VmIter::Gen { gen } => {
1098 if gen.done.get() {
1099 *self = VmIter::Exhausted;
1100 return Ok(None);
1101 }
1102 let rx = gen.receiver.clone();
1103 let result = match rx.try_lock() {
1104 Ok(mut guard) => match guard.try_recv() {
1105 Ok(Ok(v)) => Ok(Some(v)),
1106 Ok(Err(error)) => {
1107 gen.done.set(true);
1108 *self = VmIter::Exhausted;
1109 Err(error)
1110 }
1111 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1112 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1113 gen.done.set(true);
1114 *self = VmIter::Exhausted;
1115 Ok(None)
1116 }
1117 },
1118 Err(_) => Ok(None),
1119 };
1120 result
1121 }
1122 VmIter::Stream { stream } => {
1123 if stream.done.get() {
1124 *self = VmIter::Exhausted;
1125 return Ok(None);
1126 }
1127 let rx = stream.receiver.clone();
1128 let result = match rx.try_lock() {
1129 Ok(mut guard) => match guard.try_recv() {
1130 Ok(Ok(v)) => Ok(Some(v)),
1131 Ok(Err(error)) => {
1132 stream.done.set(true);
1133 *self = VmIter::Exhausted;
1134 Err(error)
1135 }
1136 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1137 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1138 stream.done.set(true);
1139 *self = VmIter::Exhausted;
1140 Ok(None)
1141 }
1142 },
1143 Err(_) => Ok(None),
1144 };
1145 result
1146 }
1147 VmIter::Chan { handle } => {
1148 let rx = handle.receiver.clone();
1149 let result = match rx.try_lock() {
1150 Ok(mut guard) => match guard.try_recv() {
1151 Ok(v) => Ok(Some(v)),
1152 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1153 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1154 *self = VmIter::Exhausted;
1155 Ok(None)
1156 }
1157 },
1158 Err(_) => Ok(None),
1159 };
1160 result
1161 }
1162 }
1163 }
1164}
1165
1166pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1169 let inner = match v {
1170 VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1171 VmValue::Range(r) => VmIter::Range {
1172 next: r.start,
1173 end: r.end,
1174 inclusive: r.inclusive,
1175 done: range_initial_done(r.start, r.end, r.inclusive),
1176 },
1177 VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1178 VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1179 VmValue::Dict(entries) => {
1180 let keys: Vec<String> = entries.keys().cloned().collect();
1181 VmIter::Dict {
1182 entries,
1183 keys,
1184 idx: 0,
1185 }
1186 }
1187 VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1188 VmValue::Generator(gen) => VmIter::Gen { gen },
1189 VmValue::Stream(stream) => VmIter::Stream { stream },
1190 VmValue::Channel(handle) => VmIter::Chan { handle },
1191 other => {
1192 return Err(VmError::TypeError(format!(
1193 "iter: value of type {} is not iterable",
1194 other.type_name()
1195 )))
1196 }
1197 };
1198 Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
1199}
1200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::VmRange;

    /// Drives an async test body on a fresh single-threaded tokio runtime.
    fn run_iter_test(test: impl std::future::Future<Output = ()>) {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(test);
    }

    /// Builds an iterator handle over the given range.
    fn range_handle(start: i64, end: i64, inclusive: bool) -> Rc<RefCell<VmIter>> {
        let range = VmRange {
            start,
            end,
            inclusive,
        };
        match iter_from_value(VmValue::Range(range)).unwrap() {
            VmValue::Iter(handle) => handle,
            _ => panic!("expected iter"),
        }
    }

    #[test]
    fn inclusive_range_at_i64_max_yields_the_endpoint() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, true);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(matches!(first, Some(VmValue::Int(value)) if value == i64::MAX));

            let second = next_handle(&handle, &mut vm).await.unwrap();
            assert!(second.is_none());
        });
    }

    #[test]
    fn exclusive_range_at_i64_max_is_empty() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, false);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(first.is_none());
        });
    }
}