1use std::cell::RefCell;
8use std::collections::{BTreeMap, VecDeque};
9use std::rc::Rc;
10
11use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
12
/// Reports whether a range is empty before anything has been yielded.
///
/// An inclusive range is empty only when `start` lies past `end`; an
/// exclusive range is additionally empty when the two bounds are equal.
fn range_initial_done(start: i64, end: i64, inclusive: bool) -> bool {
    start > end || (!inclusive && start == end)
}
20
/// Advances a range cursor by one step.
///
/// Returns the value currently stored in `next`, or `None` once `done` is
/// set. When the returned value is the last one of the range, `done` is set
/// and `next` is left untouched; otherwise `next` is incremented.
///
/// The exclusive case uses a checked increment so that a cursor sitting at
/// `i64::MAX` is treated as the final value instead of overflowing.
fn range_next(next: &mut i64, end: i64, inclusive: bool, done: &mut bool) -> Option<i64> {
    if *done {
        return None;
    }
    let current = *next;
    let is_last = match (inclusive, current.checked_add(1)) {
        // Inclusive ranges stop once the endpoint itself has been reached.
        (true, _) => current >= end,
        // Exclusive ranges stop when the successor would reach the bound...
        (false, Some(successor)) => successor >= end,
        // ...or when there is no representable successor at all.
        (false, None) => true,
    };
    if is_last {
        *done = true;
    } else {
        *next += 1;
    }
    Some(current)
}
40
/// State shared by every branch produced by `broadcast_branches`.
///
/// Branches pull from `source` on demand and append what they receive to
/// `buffer`; each branch then reads the buffer at its own position.
#[derive(Debug)]
pub struct VmBroadcastState {
    /// Upstream iterator that all branches draw from.
    source: Rc<RefCell<VmIter>>,
    /// Every item pulled from `source` so far, in arrival order. Nothing in
    /// this file removes entries from it.
    buffer: Vec<VmValue>,
    /// Set once `source` has returned `None`.
    exhausted: bool,
}
47
/// Lazy iterator state machine used by the VM.
///
/// Each variant is either a source (range, collection, channel-like value) or
/// an adapter wrapping one or more inner iterators behind
/// `Rc<RefCell<VmIter>>` handles. Stepping is done through [`VmIter::next`];
/// most variants replace themselves with [`VmIter::Exhausted`] once they have
/// nothing left to yield.
#[derive(Debug)]
pub enum VmIter {
    /// Integer range source.
    Range {
        /// Value that will be yielded by the next step.
        next: i64,
        /// Range bound (inclusive or exclusive depending on `inclusive`).
        end: i64,
        /// Whether `end` itself is part of the range.
        inclusive: bool,
        /// Set once the final value has been yielded (or the range is empty).
        done: bool,
    },
    /// Walks a shared vector by index (built from list and set values).
    Vec { items: Rc<Vec<VmValue>>, idx: usize },
    /// Walks a dictionary, yielding `Pair(key, value)` items.
    Dict {
        /// The dictionary being iterated.
        entries: Rc<BTreeMap<String, VmValue>>,
        /// Key list captured when the iterator was created.
        keys: Vec<String>,
        /// Position of the next key in `keys`.
        idx: usize,
    },
    /// Yields a string one `char` at a time, each as a one-character string;
    /// `byte_idx` is the byte offset of the next character.
    Chars { s: Rc<str>, byte_idx: usize },
    /// Receives items from a generator's channel.
    Gen { gen: VmGenerator },
    /// Receives items from a stream's channel.
    Stream { stream: VmStream },
    /// Receives items from a VM channel.
    Chan { handle: VmChannelHandle },
    /// Yields `f(item)` for every item of `inner`.
    Map {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Callable applied to each item.
        f: VmValue,
    },
    /// Calls `f(item)` for its side effects and yields the item unchanged.
    Tap {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Callable invoked with each item; its result is discarded.
        f: VmValue,
    },
    /// Yields only the items for which `p(item)` is truthy.
    Filter {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Predicate callable.
        p: VmValue,
    },
    /// Running fold: yields each new accumulator `f(acc, item)`.
    Scan {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Current accumulator value.
        acc: VmValue,
        /// Callable combining the accumulator with the next item.
        f: VmValue,
    },
    /// Maps each item to an iterable via `f` and yields the flattened items.
    FlatMap {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Callable producing an iterable for each item.
        f: VmValue,
        /// Sub-iterator currently being drained, if any.
        cur: Option<Rc<RefCell<VmIter>>>,
    },
    /// Yields at most `remaining` more items.
    Take {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Number of items still allowed through.
        remaining: usize,
    },
    /// Discards the first `remaining` items, then passes the rest through.
    Skip {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Number of items still to discard.
        remaining: usize,
    },
    /// Yields items while `p(item)` is truthy; the first failing item ends
    /// the iterator and is not yielded.
    TakeWhile {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Predicate callable.
        p: VmValue,
        /// Checked at the start of each step; the stepping code in this file
        /// never sets it (it switches to `Exhausted` instead).
        done: bool,
    },
    /// Yields items until `p(item)` is truthy; the matching item ends the
    /// iterator and is not yielded.
    TakeUntil {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Stop predicate.
        p: VmValue,
    },
    /// Drops leading items while `p(item)` is truthy, then passes the rest
    /// through without consulting `p` again.
    SkipWhile {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Predicate callable.
        p: VmValue,
        /// Set once the first kept item has been yielded.
        primed: bool,
    },
    /// Yields `Pair(a_item, b_item)`; ends as soon as either side ends.
    Zip {
        /// Left-hand iterator (pulled first).
        a: Rc<RefCell<VmIter>>,
        /// Right-hand iterator.
        b: Rc<RefCell<VmIter>>,
    },
    /// Yields `Pair(index, item)`; `i` is the index of the next item.
    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
    /// Yields everything from `a`, then everything from `b`.
    Chain {
        /// First iterator.
        a: Rc<RefCell<VmIter>>,
        /// Second iterator.
        b: Rc<RefCell<VmIter>>,
        /// True while `a` is still being drained.
        on_a: bool,
    },
    /// Polls all sources without blocking and yields whichever has an item
    /// ready, sweeping round-robin from `cursor`.
    Merge {
        /// Source iterators; a slot becomes `None` once its source is exhausted.
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        /// Index at which the next sweep starts.
        cursor: usize,
    },
    /// Takes one item from each source in turn, waiting on each.
    Interleave {
        /// Source iterators; a slot becomes `None` once its source is exhausted.
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        /// Index at which the next sweep starts.
        cursor: usize,
    },
    /// The first source to produce an item becomes the winner; all later
    /// items come from it alone.
    Race {
        /// Candidate sources; cleared once a winner is chosen.
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        /// The winning source, once decided.
        winner: Option<Rc<RefCell<VmIter>>>,
    },
    /// One branch of a broadcast; see [`VmBroadcastState`].
    Broadcast {
        /// State shared with the sibling branches.
        shared: Rc<RefCell<VmBroadcastState>>,
        /// Branch number assigned at creation; not used when stepping.
        branch: usize,
        /// This branch's read position in the shared buffer.
        index: usize,
    },
    /// Spaces items at least `interval_ms` milliseconds apart.
    Throttle {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Minimum spacing between yielded items, in milliseconds.
        interval_ms: u64,
        /// Earliest instant at which the next item may be pulled.
        next_ready: Option<tokio::time::Instant>,
    },
    /// After receiving an item, waits `window_ms` and then yields the most
    /// recent item that is ready at that point.
    Debounce {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Quiet window in milliseconds.
        window_ms: u64,
    },
    /// Groups items into lists of up to `n`; the last list may be shorter.
    Chunks {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Maximum chunk length.
        n: usize,
    },
    /// Sliding windows of `n` consecutive items, each yielded as a list.
    Windows {
        /// Upstream iterator.
        inner: Rc<RefCell<VmIter>>,
        /// Window length.
        n: usize,
        /// Contents of the current window.
        buf: VecDeque<VmValue>,
    },
    /// Terminal state: every step returns `None`.
    Exhausted,
}
195
impl VmIter {
    /// Produces the next item of this iterator.
    ///
    /// Returns `Ok(Some(item))` for an item, `Ok(None)` at the end of
    /// iteration, and `Err` when a source or a user callable fails. The
    /// future is boxed and pinned; `next_impl` reaches back into `next`
    /// through `next_handle`, so the boxing is presumably what allows that
    /// recursion.
    pub fn next<'a>(
        &'a mut self,
        vm: &'a mut crate::vm::Vm,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
    {
        Box::pin(async move { self.next_impl(vm).await })
    }

    /// Blocking (awaiting) step function behind [`VmIter::next`].
    ///
    /// Most arms replace `self` with `VmIter::Exhausted` once their source
    /// runs dry so later calls short-circuit.
    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
        match self {
            VmIter::Exhausted => Ok(None),
            VmIter::Range {
                next,
                end,
                inclusive,
                done,
            } => {
                if let Some(v) = range_next(next, *end, *inclusive, done) {
                    Ok(Some(VmValue::Int(v)))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Vec { items, idx } => {
                if *idx < items.len() {
                    let v = items[*idx].clone();
                    *idx += 1;
                    Ok(Some(v))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Dict { entries, keys, idx } => {
                if *idx < keys.len() {
                    let k = &keys[*idx];
                    // A key missing from `entries` yields `Nil` as its value.
                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
                    *idx += 1;
                    Ok(Some(VmValue::Pair(Rc::new((
                        VmValue::String(Rc::from(k.as_str())),
                        v,
                    )))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Chars { s, byte_idx } => {
                if *byte_idx >= s.len() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rest = &s[*byte_idx..];
                if let Some(c) = rest.chars().next() {
                    // Advance by the encoded width so the offset stays on a
                    // character boundary.
                    *byte_idx += c.len_utf8();
                    Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Gen { gen } => {
                if gen.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = gen.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    // An error from the generator also ends iteration.
                    Some(Err(error)) => {
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Stream { stream } => {
                if stream.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = stream.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    // An error from the stream also ends iteration.
                    Some(Err(error)) => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Map { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let out = vm.call_callable_value(&f, &[v]).await?;
                        Ok(Some(out))
                    }
                }
            }
            VmIter::Tap { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        // The callable's result is discarded; only errors matter.
                        vm.call_callable_value(&f, &[v.clone()]).await?;
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Filter { inner, p } => {
                let p = p.clone();
                // Keep pulling until an item passes the predicate or the
                // source ends.
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if keep.is_truthy() {
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Scan { inner, acc, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
                        *acc = next_acc.clone();
                        Ok(Some(next_acc))
                    }
                }
            }
            VmIter::FlatMap { inner, f, cur } => {
                let f = f.clone();
                loop {
                    // Drain the active sub-iterator first.
                    if let Some(cur_iter) = cur.clone() {
                        let item = next_handle(&cur_iter, vm).await?;
                        if let Some(v) = item {
                            return Ok(Some(v));
                        }
                        *cur = None;
                    }
                    // Then map the next upstream item to a fresh sub-iterator.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let result = vm.call_callable_value(&f, &[v]).await?;
                            let lifted = iter_from_value(result)?;
                            if let VmValue::Iter(h) = lifted {
                                *cur = Some(h);
                            } else {
                                return Err(VmError::TypeError(
                                    "flat_map: expected iterable result".to_string(),
                                ));
                            }
                        }
                    }
                }
            }
            VmIter::Take { inner, remaining } => {
                if *remaining == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *remaining -= 1;
                        // Finish eagerly so `inner` is not pulled again after
                        // the quota is used up.
                        if *remaining == 0 {
                            *self = VmIter::Exhausted;
                        }
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Skip { inner, remaining } => {
                // Discard the leading items; this only does work on the
                // first call because `remaining` reaches zero.
                while *remaining > 0 {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(_) => {
                            *remaining -= 1;
                        }
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::TakeWhile { inner, p, done } => {
                if *done {
                    return Ok(None);
                }
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if keep.is_truthy() {
                            Ok(Some(v))
                        } else {
                            // The failing item is consumed and dropped.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    }
                }
            }
            VmIter::TakeUntil { inner, p } => {
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if stop.is_truthy() {
                            // The matching item is consumed and dropped.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        } else {
                            Ok(Some(v))
                        }
                    }
                }
            }
            VmIter::SkipWhile { inner, p, primed } => {
                // Once the first item has been kept, pass everything through.
                if *primed {
                    let item = next_handle(inner, vm).await?;
                    return match item {
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                        Some(v) => Ok(Some(v)),
                    };
                }
                let p = p.clone();
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if !drop_it.is_truthy() {
                                *primed = true;
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Zip { a, b } => {
                // `a` is pulled first, so an item taken from `a` is lost if
                // `b` turns out to be finished.
                let ia = next_handle(a, vm).await?;
                let x = match ia {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                let ib = next_handle(b, vm).await?;
                let y = match ib {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
            }
            VmIter::Enumerate { inner, i } => {
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let idx = *i;
                        *i += 1;
                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
                    }
                }
            }
            VmIter::Chain { a, b, on_a } => {
                if *on_a {
                    let item = next_handle(a, vm).await?;
                    if let Some(v) = item {
                        return Ok(Some(v));
                    }
                    // `a` is finished: fall through to `b` in this same call.
                    *on_a = false;
                }
                let item = next_handle(b, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::Merge { sources, cursor } => loop {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                // Counts sources that had nothing ready but are not finished.
                let mut live = 0usize;
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    match try_next_ready(&handle, vm).await? {
                        Some(v) => {
                            // Start the next sweep after the source that
                            // just produced.
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            if is_exhausted_handle(&handle) {
                                sources[idx] = None;
                            } else {
                                live += 1;
                            }
                        }
                    }
                }
                if live == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                // Nothing was ready: pause briefly before sweeping again.
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            },
            VmIter::Interleave { sources, cursor } => {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    // Unlike `Merge`, this waits for each source in turn.
                    match next_handle(&handle, vm).await? {
                        Some(v) => {
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            sources[idx] = None;
                        }
                    }
                }
                *self = VmIter::Exhausted;
                Ok(None)
            }
            VmIter::Race { sources, winner } => {
                // After a winner is chosen, it is the only source consulted.
                if let Some(handle) = winner.clone() {
                    let item = next_handle(&handle, vm).await?;
                    return match item {
                        Some(v) => Ok(Some(v)),
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    };
                }
                loop {
                    let mut live = 0usize;
                    for source in sources.iter_mut() {
                        let Some(handle) = source.clone() else {
                            continue;
                        };
                        match try_next_ready(&handle, vm).await? {
                            Some(v) => {
                                // First source with a ready item wins; the
                                // remaining candidates are released.
                                *winner = Some(handle);
                                sources.clear();
                                return Ok(Some(v));
                            }
                            None => {
                                if is_exhausted_handle(&handle) {
                                    *source = None;
                                } else {
                                    live += 1;
                                }
                            }
                        }
                    }
                    if live == 0 {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // Nothing was ready: pause briefly before polling again.
                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                }
            }
            VmIter::Broadcast {
                shared,
                branch,
                index,
            } => {
                let _ = branch;
                loop {
                    // Move the shared state out so no `RefCell` borrow is
                    // held across the await below; a placeholder sits in its
                    // place until the state is written back.
                    let mut state =
                        std::mem::replace(&mut *shared.borrow_mut(), empty_broadcast_state());
                    // Serve from the buffer when this branch is behind.
                    if *index < state.buffer.len() {
                        let item = state.buffer[*index].clone();
                        *index += 1;
                        *shared.borrow_mut() = state;
                        return Ok(Some(item));
                    }
                    if state.exhausted {
                        *shared.borrow_mut() = state;
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // Caught up: pull one more item from the source, then
                    // loop to serve it from the buffer.
                    let next = next_handle(&state.source, vm).await;
                    match next {
                        Err(err) => {
                            *shared.borrow_mut() = state;
                            return Err(err);
                        }
                        Ok(Some(v)) => {
                            state.buffer.push(v);
                            *shared.borrow_mut() = state;
                        }
                        Ok(None) => {
                            state.exhausted = true;
                            *shared.borrow_mut() = state;
                        }
                    }
                }
            }
            VmIter::Throttle {
                inner,
                interval_ms,
                next_ready,
            } => {
                // Wait out the remainder of the interval from the previous item.
                if let Some(ready_at) = next_ready.take() {
                    let now = tokio::time::Instant::now();
                    if ready_at > now {
                        tokio::time::sleep_until(ready_at).await;
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *next_ready = Some(
                            tokio::time::Instant::now()
                                + tokio::time::Duration::from_millis(*interval_ms),
                        );
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Debounce { inner, window_ms } => {
                // Wait for one item, sleep for the window, then keep only the
                // latest of whatever became ready in the meantime.
                let mut last = match next_handle(inner, vm).await? {
                    Some(v) => v,
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                };
                if *window_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
                }
                while let Some(v) = try_next_ready(inner, vm).await? {
                    last = v;
                }
                Ok(Some(last))
            }
            VmIter::Chunks { inner, n } => {
                let n = *n;
                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
                for _ in 0..n {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            batch.push(v);
                        }
                        None => break,
                    }
                }
                // A short final chunk is still yielded; only an empty one
                // ends iteration.
                if batch.is_empty() {
                    *self = VmIter::Exhausted;
                    Ok(None)
                } else {
                    Ok(Some(VmValue::List(Rc::new(batch))))
                }
            }
            VmIter::Windows { inner, n, buf } => {
                let n = *n;
                if buf.is_empty() {
                    // First call: fill a whole window, or end if the source
                    // is shorter than `n`.
                    while buf.len() < n {
                        let item = next_handle(inner, vm).await?;
                        match item {
                            Some(v) => buf.push_back(v),
                            None => {
                                *self = VmIter::Exhausted;
                                return Ok(None);
                            }
                        }
                    }
                } else {
                    // Later calls: slide the window forward by one item.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            buf.pop_front();
                            buf.push_back(v);
                        }
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                    }
                }
                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
                Ok(Some(VmValue::List(Rc::new(snapshot))))
            }
            VmIter::Chan { handle } => {
                // The closed flag is read before receiving: a closed channel
                // is drained without waiting, an open one is awaited.
                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
                let rx = handle.receiver.clone();
                let mut guard = rx.lock().await;
                let item = if is_closed {
                    guard.try_recv().ok()
                } else {
                    guard.recv().await
                };
                match item {
                    Some(v) => Ok(Some(v)),
                    None => {
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
        }
    }
}
805
806pub async fn next_handle(
815 handle: &Rc<RefCell<VmIter>>,
816 vm: &mut crate::vm::Vm,
817) -> Result<Option<VmValue>, VmError> {
818 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
819 let result = state.next(vm).await;
820 *handle.borrow_mut() = state;
822 result
823}
824
825pub async fn drain(
827 handle: &Rc<RefCell<VmIter>>,
828 vm: &mut crate::vm::Vm,
829) -> Result<Vec<VmValue>, VmError> {
830 let mut out = Vec::new();
831 loop {
832 let v = next_handle(handle, vm).await?;
833 match v {
834 Some(v) => out.push(v),
835 None => break,
836 }
837 }
838 Ok(out)
839}
840
841pub async fn drain_capped(
844 handle: &Rc<RefCell<VmIter>>,
845 vm: &mut crate::vm::Vm,
846 max: usize,
847) -> Result<Vec<VmValue>, VmError> {
848 let mut out = Vec::new();
849 loop {
850 let v = next_handle(handle, vm).await?;
851 match v {
852 Some(v) => {
853 if out.len() >= max {
854 return Err(VmError::Runtime(format!(
855 "stream.collect: max cap {max} exceeded"
856 )));
857 }
858 out.push(v);
859 }
860 None => break,
861 }
862 }
863 Ok(out)
864}
865
866pub fn iter_handle_from_value(v: VmValue) -> Result<Rc<RefCell<VmIter>>, VmError> {
867 match iter_from_value(v)? {
868 VmValue::Iter(handle) => Ok(handle),
869 _ => unreachable!("iter_from_value returns Iter"),
870 }
871}
872
873pub fn broadcast_branches(source: Rc<RefCell<VmIter>>, n: usize) -> Vec<VmValue> {
874 let shared = Rc::new(RefCell::new(VmBroadcastState {
875 source,
876 buffer: Vec::new(),
877 exhausted: false,
878 }));
879 (0..n)
880 .map(|branch| {
881 VmValue::Iter(Rc::new(RefCell::new(VmIter::Broadcast {
882 shared: Rc::clone(&shared),
883 branch,
884 index: 0,
885 })))
886 })
887 .collect()
888}
889
890fn empty_broadcast_state() -> VmBroadcastState {
891 VmBroadcastState {
892 source: Rc::new(RefCell::new(VmIter::Exhausted)),
893 buffer: Vec::new(),
894 exhausted: true,
895 }
896}
897
898fn try_next_ready<'a>(
899 handle: &'a Rc<RefCell<VmIter>>,
900 vm: &'a mut crate::vm::Vm,
901) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>> {
902 Box::pin(async move {
903 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
904 let result = state.try_next_ready_impl(vm).await;
905 *handle.borrow_mut() = state;
906 result
907 })
908}
909
910fn is_exhausted_handle(handle: &Rc<RefCell<VmIter>>) -> bool {
911 matches!(&*handle.borrow(), VmIter::Exhausted)
912}
913
914impl VmIter {
915 async fn try_next_ready_impl(
916 &mut self,
917 vm: &mut crate::vm::Vm,
918 ) -> Result<Option<VmValue>, VmError> {
919 match self {
920 VmIter::Exhausted => Ok(None),
921 VmIter::Map { inner, f } => {
922 let f = f.clone();
923 match try_next_ready(inner, vm).await? {
924 Some(v) => Ok(Some(vm.call_callable_value(&f, &[v]).await?)),
925 None => {
926 if is_exhausted_handle(inner) {
927 *self = VmIter::Exhausted;
928 }
929 Ok(None)
930 }
931 }
932 }
933 VmIter::Tap { inner, f } => {
934 let f = f.clone();
935 match try_next_ready(inner, vm).await? {
936 Some(v) => {
937 vm.call_callable_value(&f, &[v.clone()]).await?;
938 Ok(Some(v))
939 }
940 None => {
941 if is_exhausted_handle(inner) {
942 *self = VmIter::Exhausted;
943 }
944 Ok(None)
945 }
946 }
947 }
948 VmIter::Filter { inner, p } => {
949 let p = p.clone();
950 loop {
951 match try_next_ready(inner, vm).await? {
952 Some(v) => {
953 let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
954 if keep.is_truthy() {
955 return Ok(Some(v));
956 }
957 }
958 None => {
959 if is_exhausted_handle(inner) {
960 *self = VmIter::Exhausted;
961 }
962 return Ok(None);
963 }
964 }
965 }
966 }
967 VmIter::Scan { inner, acc, f } => {
968 let f = f.clone();
969 match try_next_ready(inner, vm).await? {
970 Some(v) => {
971 let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
972 *acc = next_acc.clone();
973 Ok(Some(next_acc))
974 }
975 None => {
976 if is_exhausted_handle(inner) {
977 *self = VmIter::Exhausted;
978 }
979 Ok(None)
980 }
981 }
982 }
983 VmIter::FlatMap { inner, f, cur } => {
984 let f = f.clone();
985 loop {
986 if let Some(cur_iter) = cur.clone() {
987 match try_next_ready(&cur_iter, vm).await? {
988 Some(v) => return Ok(Some(v)),
989 None => {
990 if is_exhausted_handle(&cur_iter) {
991 *cur = None;
992 }
993 return Ok(None);
994 }
995 }
996 }
997 match try_next_ready(inner, vm).await? {
998 Some(v) => {
999 let result = vm.call_callable_value(&f, &[v]).await?;
1000 *cur = Some(iter_handle_from_value(result)?);
1001 }
1002 None => {
1003 if is_exhausted_handle(inner) {
1004 *self = VmIter::Exhausted;
1005 }
1006 return Ok(None);
1007 }
1008 }
1009 }
1010 }
1011 VmIter::Take { inner, remaining } => {
1012 if *remaining == 0 {
1013 *self = VmIter::Exhausted;
1014 return Ok(None);
1015 }
1016 match try_next_ready(inner, vm).await? {
1017 Some(v) => {
1018 *remaining -= 1;
1019 if *remaining == 0 {
1020 *self = VmIter::Exhausted;
1021 }
1022 Ok(Some(v))
1023 }
1024 None => {
1025 if is_exhausted_handle(inner) {
1026 *self = VmIter::Exhausted;
1027 }
1028 Ok(None)
1029 }
1030 }
1031 }
1032 VmIter::TakeUntil { inner, p } => {
1033 let p = p.clone();
1034 match try_next_ready(inner, vm).await? {
1035 Some(v) => {
1036 let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
1037 if stop.is_truthy() {
1038 *self = VmIter::Exhausted;
1039 Ok(None)
1040 } else {
1041 Ok(Some(v))
1042 }
1043 }
1044 None => {
1045 if is_exhausted_handle(inner) {
1046 *self = VmIter::Exhausted;
1047 }
1048 Ok(None)
1049 }
1050 }
1051 }
1052 VmIter::Throttle {
1053 inner,
1054 interval_ms,
1055 next_ready,
1056 } => {
1057 if let Some(ready_at) = *next_ready {
1058 if ready_at > tokio::time::Instant::now() {
1059 return Ok(None);
1060 }
1061 }
1062 match try_next_ready(inner, vm).await? {
1063 Some(v) => {
1064 *next_ready = Some(
1065 tokio::time::Instant::now()
1066 + tokio::time::Duration::from_millis(*interval_ms),
1067 );
1068 Ok(Some(v))
1069 }
1070 None => {
1071 if is_exhausted_handle(inner) {
1072 *self = VmIter::Exhausted;
1073 }
1074 Ok(None)
1075 }
1076 }
1077 }
1078 VmIter::Range { .. }
1079 | VmIter::Vec { .. }
1080 | VmIter::Dict { .. }
1081 | VmIter::Chars { .. }
1082 | VmIter::Skip { .. }
1083 | VmIter::TakeWhile { .. }
1084 | VmIter::SkipWhile { .. }
1085 | VmIter::Zip { .. }
1086 | VmIter::Enumerate { .. }
1087 | VmIter::Chain { .. }
1088 | VmIter::Merge { .. }
1089 | VmIter::Interleave { .. }
1090 | VmIter::Race { .. }
1091 | VmIter::Broadcast { .. }
1092 | VmIter::Debounce { .. }
1093 | VmIter::Chunks { .. }
1094 | VmIter::Windows { .. } => self.next(vm).await,
1095 VmIter::Gen { gen } => {
1096 if gen.done.get() {
1097 *self = VmIter::Exhausted;
1098 return Ok(None);
1099 }
1100 let rx = gen.receiver.clone();
1101 let result = match rx.try_lock() {
1102 Ok(mut guard) => match guard.try_recv() {
1103 Ok(Ok(v)) => Ok(Some(v)),
1104 Ok(Err(error)) => {
1105 gen.done.set(true);
1106 *self = VmIter::Exhausted;
1107 Err(error)
1108 }
1109 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1110 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1111 gen.done.set(true);
1112 *self = VmIter::Exhausted;
1113 Ok(None)
1114 }
1115 },
1116 Err(_) => Ok(None),
1117 };
1118 result
1119 }
1120 VmIter::Stream { stream } => {
1121 if stream.done.get() {
1122 *self = VmIter::Exhausted;
1123 return Ok(None);
1124 }
1125 let rx = stream.receiver.clone();
1126 let result = match rx.try_lock() {
1127 Ok(mut guard) => match guard.try_recv() {
1128 Ok(Ok(v)) => Ok(Some(v)),
1129 Ok(Err(error)) => {
1130 stream.done.set(true);
1131 *self = VmIter::Exhausted;
1132 Err(error)
1133 }
1134 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1135 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1136 stream.done.set(true);
1137 *self = VmIter::Exhausted;
1138 Ok(None)
1139 }
1140 },
1141 Err(_) => Ok(None),
1142 };
1143 result
1144 }
1145 VmIter::Chan { handle } => {
1146 let rx = handle.receiver.clone();
1147 let result = match rx.try_lock() {
1148 Ok(mut guard) => match guard.try_recv() {
1149 Ok(v) => Ok(Some(v)),
1150 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1151 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1152 *self = VmIter::Exhausted;
1153 Ok(None)
1154 }
1155 },
1156 Err(_) => Ok(None),
1157 };
1158 result
1159 }
1160 }
1161 }
1162}
1163
1164pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1167 let inner = match v {
1168 VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1169 VmValue::Range(r) => VmIter::Range {
1170 next: r.start,
1171 end: r.end,
1172 inclusive: r.inclusive,
1173 done: range_initial_done(r.start, r.end, r.inclusive),
1174 },
1175 VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1176 VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1177 VmValue::Dict(entries) => {
1178 let keys: Vec<String> = entries.keys().cloned().collect();
1179 VmIter::Dict {
1180 entries,
1181 keys,
1182 idx: 0,
1183 }
1184 }
1185 VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1186 VmValue::Generator(gen) => VmIter::Gen { gen },
1187 VmValue::Stream(stream) => VmIter::Stream { stream },
1188 VmValue::Channel(handle) => VmIter::Chan { handle },
1189 other => {
1190 return Err(VmError::TypeError(format!(
1191 "iter: value of type {} is not iterable",
1192 other.type_name()
1193 )))
1194 }
1195 };
1196 Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
1197}
1198
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::VmRange;

    /// Drives `test` to completion on a fresh current-thread Tokio runtime.
    fn run_iter_test(test: impl std::future::Future<Output = ()>) {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(test);
    }

    /// Builds an iterator handle over the given integer range.
    fn range_handle(start: i64, end: i64, inclusive: bool) -> Rc<RefCell<VmIter>> {
        let range = VmValue::Range(VmRange {
            start,
            end,
            inclusive,
        });
        match iter_from_value(range).unwrap() {
            VmValue::Iter(handle) => handle,
            _ => panic!("expected iter"),
        }
    }

    // `i64::MAX..=i64::MAX` must yield its single endpoint and then stop
    // without overflowing the cursor.
    #[test]
    fn inclusive_range_at_i64_max_yields_the_endpoint() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, true);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(matches!(first, Some(VmValue::Int(i64::MAX))));
            let second = next_handle(&handle, &mut vm).await.unwrap();
            assert!(second.is_none());
        });
    }

    // `i64::MAX..i64::MAX` is empty from the start.
    #[test]
    fn exclusive_range_at_i64_max_is_empty() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, false);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(first.is_none());
        });
    }
}