1use std::collections::{BTreeMap, VecDeque};
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
13
/// Shared, lockable handle to a [`VmIter`] state machine.
///
/// Adapters hold handles to their upstream iterators; `next_handle` swaps the
/// state out of the mutex before stepping it.
pub type VmIterHandle = Arc<Mutex<VmIter>>;
15
/// Reports whether a range is empty before it has produced anything.
///
/// An inclusive range is empty only when `start` lies past `end`; an
/// exclusive range is additionally empty when the two bounds are equal.
fn range_initial_done(start: i64, end: i64, inclusive: bool) -> bool {
    match inclusive {
        true => start > end,
        false => start >= end,
    }
}
23
/// Steps a range cursor and returns the value it was pointing at.
///
/// Returns `None` once `done` is set. Otherwise the current value of `next`
/// is yielded; if that value is the last one in the range `done` is set
/// (and `next` is left untouched, so it can never overflow), else `next`
/// advances by one.
fn range_next(next: &mut i64, end: i64, inclusive: bool, done: &mut bool) -> Option<i64> {
    if *done {
        return None;
    }
    let current = *next;
    let is_last = match (inclusive, current.checked_add(1)) {
        // Inclusive: the endpoint itself is the final value.
        (true, _) => current >= end,
        // Exclusive: stop when the successor would reach `end`...
        (false, Some(following)) => following >= end,
        // ...or when there is no representable successor at all.
        (false, None) => true,
    };
    if is_last {
        *done = true;
    } else {
        *next = current + 1;
    }
    Some(current)
}
43
/// State shared by all `VmIter::Broadcast` branches created from one source.
#[derive(Debug)]
pub struct VmBroadcastState {
    /// Upstream iterator; a branch pulls from it when it has read past `buffer`.
    source: VmIterHandle,
    /// Every item pulled from `source` so far. Branches index into it
    /// independently; nothing in this file ever removes entries.
    buffer: Vec<VmValue>,
    /// Set once `source` has reported the end of its items.
    exhausted: bool,
}
50
/// Lazy iterator state machine used by the VM.
///
/// Source variants (`Range`, `Vec`, `Dict`, `Chars`, `Gen`, `Stream`, `Chan`)
/// produce items themselves; the remaining variants wrap one or more upstream
/// [`VmIterHandle`]s. Most variants overwrite themselves with
/// [`VmIter::Exhausted`] once they run out of items.
#[derive(Debug)]
pub enum VmIter {
    /// Integer range driven by `range_next`.
    Range {
        /// Next value to yield.
        next: i64,
        /// Upper bound (inclusive or exclusive, see `inclusive`).
        end: i64,
        /// Whether `end` itself is yielded.
        inclusive: bool,
        /// Set once the final value has been handed out.
        done: bool,
    },
    /// Walks a shared list by index.
    Vec {
        items: Arc<Vec<VmValue>>,
        idx: usize,
    },
    /// Walks a dictionary, yielding `(key, value)` pairs.
    Dict {
        entries: Arc<BTreeMap<String, VmValue>>,
        /// Keys captured when the iterator was created.
        keys: Vec<String>,
        idx: usize,
    },
    /// Yields each `char` of `s` as a one-character string; `byte_idx` is the
    /// byte offset of the next character.
    Chars { s: Arc<str>, byte_idx: usize },
    /// Receives items from a generator's channel.
    Gen { gen: Arc<VmGenerator> },
    /// Receives items from a stream's channel.
    Stream { stream: Arc<VmStream> },
    /// Receives items from a VM channel until it is closed and empty.
    Chan { handle: Arc<VmChannelHandle> },
    /// Yields `f(item)` for each upstream item.
    Map { inner: VmIterHandle, f: VmValue },
    /// Calls `f(item)` for its effect and yields the original item.
    Tap { inner: VmIterHandle, f: VmValue },
    /// Yields only the items for which `p(item)` is truthy.
    Filter { inner: VmIterHandle, p: VmValue },
    /// Yields the running accumulator `acc = f(acc, item)`.
    Scan {
        inner: VmIterHandle,
        acc: VmValue,
        f: VmValue,
    },
    /// Maps each item to an iterable and yields the items of each in turn.
    FlatMap {
        inner: VmIterHandle,
        f: VmValue,
        /// Sub-iterator currently being drained, if any.
        cur: Option<VmIterHandle>,
    },
    /// Yields at most `remaining` more items.
    Take {
        inner: VmIterHandle,
        remaining: usize,
    },
    /// Discards `remaining` items, then passes the rest through.
    Skip {
        inner: VmIterHandle,
        remaining: usize,
    },
    /// Yields items while `p(item)` is truthy; the first failing item ends
    /// the iterator and is not yielded.
    TakeWhile {
        inner: VmIterHandle,
        p: VmValue,
        done: bool,
    },
    /// Yields items until `p(item)` is truthy; the matching item is not yielded.
    TakeUntil { inner: VmIterHandle, p: VmValue },
    /// Drops leading items while `p(item)` is truthy, then passes everything
    /// through. `primed` is set once the first kept item has been found.
    SkipWhile {
        inner: VmIterHandle,
        p: VmValue,
        primed: bool,
    },
    /// Yields pairs `(a_item, b_item)` until either side ends.
    Zip { a: VmIterHandle, b: VmIterHandle },
    /// Yields pairs `(i, item)` with `i` counting up from its initial value.
    Enumerate { inner: VmIterHandle, i: i64 },
    /// Yields all of `a`, then all of `b`.
    Chain {
        a: VmIterHandle,
        b: VmIterHandle,
        /// True while `a` is still being drained.
        on_a: bool,
    },
    /// Yields whichever source has an item ready, polling round-robin from
    /// `cursor`; finished sources are replaced by `None`.
    Merge {
        sources: Vec<Option<VmIterHandle>>,
        cursor: usize,
    },
    /// Takes one item from each live source in turn, waiting on each.
    Interleave {
        sources: Vec<Option<VmIterHandle>>,
        cursor: usize,
    },
    /// The first source to produce an item becomes `winner`; all later items
    /// come from it alone.
    Race {
        sources: Vec<Option<VmIterHandle>>,
        winner: Option<VmIterHandle>,
    },
    /// One reader over a shared, buffered source.
    Broadcast {
        shared: Arc<Mutex<VmBroadcastState>>,
        /// Branch number assigned at creation; not consulted by `next_impl`.
        branch: usize,
        /// Position of this branch within the shared buffer.
        index: usize,
    },
    /// Delays each pull until `interval_ms` has elapsed since the last item.
    Throttle {
        inner: VmIterHandle,
        interval_ms: u64,
        /// Earliest instant at which the next item may be pulled.
        next_ready: Option<tokio::time::Instant>,
    },
    /// After receiving an item, waits `window_ms` and yields the most recent
    /// item that is ready by then.
    Debounce { inner: VmIterHandle, window_ms: u64 },
    /// Yields lists of up to `n` consecutive items.
    Chunks { inner: VmIterHandle, n: usize },
    /// Yields overlapping lists of `n` consecutive items.
    Windows {
        inner: VmIterHandle,
        n: usize,
        /// The current window contents.
        buf: VecDeque<VmValue>,
    },
    /// Terminal state: always yields `None`.
    Exhausted,
}
180
181impl VmIter {
182 pub fn next<'a>(
187 &'a mut self,
188 vm: &'a mut crate::vm::Vm,
189 ) -> std::pin::Pin<
190 Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + Send + 'a>,
191 > {
192 Box::pin(async move { self.next_impl(vm).await })
193 }
194
195 async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
196 match self {
197 VmIter::Exhausted => Ok(None),
198 VmIter::Range {
199 next,
200 end,
201 inclusive,
202 done,
203 } => {
204 if let Some(v) = range_next(next, *end, *inclusive, done) {
205 Ok(Some(VmValue::Int(v)))
206 } else {
207 *self = VmIter::Exhausted;
208 Ok(None)
209 }
210 }
211 VmIter::Vec { items, idx } => {
212 if *idx < items.len() {
213 let v = items[*idx].clone();
214 *idx += 1;
215 Ok(Some(v))
216 } else {
217 *self = VmIter::Exhausted;
218 Ok(None)
219 }
220 }
221 VmIter::Dict { entries, keys, idx } => {
222 if *idx < keys.len() {
223 let k = &keys[*idx];
224 let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
225 *idx += 1;
226 Ok(Some(VmValue::Pair(std::sync::Arc::new((
227 VmValue::String(std::sync::Arc::from(k.as_str())),
228 v,
229 )))))
230 } else {
231 *self = VmIter::Exhausted;
232 Ok(None)
233 }
234 }
235 VmIter::Chars { s, byte_idx } => {
236 if *byte_idx >= s.len() {
237 *self = VmIter::Exhausted;
238 return Ok(None);
239 }
240 let rest = &s[*byte_idx..];
241 if let Some(c) = rest.chars().next() {
242 *byte_idx += c.len_utf8();
243 let mut buf = [0u8; 4];
244 let encoded = c.encode_utf8(&mut buf);
245 Ok(Some(VmValue::String(std::sync::Arc::from(&*encoded))))
246 } else {
247 *self = VmIter::Exhausted;
248 Ok(None)
249 }
250 }
251 VmIter::Gen { gen } => {
252 if gen.is_done() {
253 *self = VmIter::Exhausted;
254 return Ok(None);
255 }
256 let rx = gen.receiver.clone();
257 let mut guard = rx.lock().await;
258 match guard.recv().await {
259 Some(Ok(v)) => Ok(Some(v)),
260 Some(Err(error)) => {
261 gen.mark_done();
262 drop(guard);
263 *self = VmIter::Exhausted;
264 Err(error)
265 }
266 None => {
267 gen.mark_done();
268 drop(guard);
269 *self = VmIter::Exhausted;
270 Ok(None)
271 }
272 }
273 }
274 VmIter::Stream { stream } => {
275 if stream.is_done() {
276 *self = VmIter::Exhausted;
277 return Ok(None);
278 }
279 let rx = stream.receiver.clone();
280 let mut guard = rx.lock().await;
281 match guard.recv().await {
282 Some(Ok(v)) => Ok(Some(v)),
283 Some(Err(error)) => {
284 stream.mark_done();
285 drop(guard);
286 *self = VmIter::Exhausted;
287 Err(error)
288 }
289 None => {
290 stream.mark_done();
291 drop(guard);
292 *self = VmIter::Exhausted;
293 Ok(None)
294 }
295 }
296 }
297 VmIter::Map { inner, f } => {
298 let f = f.clone();
299 let item = next_handle(inner, vm).await?;
300 match item {
301 None => {
302 *self = VmIter::Exhausted;
303 Ok(None)
304 }
305 Some(v) => {
306 let out = vm.call_callable_one(&f, &v).await?;
307 Ok(Some(out))
308 }
309 }
310 }
311 VmIter::Tap { inner, f } => {
312 let f = f.clone();
313 let item = next_handle(inner, vm).await?;
314 match item {
315 None => {
316 *self = VmIter::Exhausted;
317 Ok(None)
318 }
319 Some(v) => {
320 vm.call_callable_one(&f, &v).await?;
321 Ok(Some(v))
322 }
323 }
324 }
325 VmIter::Filter { inner, p } => {
326 let p = p.clone();
327 loop {
328 let item = next_handle(inner, vm).await?;
329 match item {
330 None => {
331 *self = VmIter::Exhausted;
332 return Ok(None);
333 }
334 Some(v) => {
335 let keep = vm.call_callable_one(&p, &v).await?;
336 if keep.is_truthy() {
337 return Ok(Some(v));
338 }
339 }
340 }
341 }
342 }
343 VmIter::Scan { inner, acc, f } => {
344 let f = f.clone();
345 let item = next_handle(inner, vm).await?;
346 match item {
347 None => {
348 *self = VmIter::Exhausted;
349 Ok(None)
350 }
351 Some(v) => {
352 let next_acc = vm.call_callable_two(&f, acc, &v).await?;
353 *acc = next_acc.clone();
354 Ok(Some(next_acc))
355 }
356 }
357 }
358 VmIter::FlatMap { inner, f, cur } => {
359 let f = f.clone();
360 loop {
361 if let Some(cur_iter) = cur.clone() {
362 let item = next_handle(&cur_iter, vm).await?;
363 if let Some(v) = item {
364 return Ok(Some(v));
365 }
366 *cur = None;
367 }
368 let item = next_handle(inner, vm).await?;
369 match item {
370 None => {
371 *self = VmIter::Exhausted;
372 return Ok(None);
373 }
374 Some(v) => {
375 let result = vm.call_callable_one(&f, &v).await?;
376 let lifted = iter_from_value(result)?;
377 if let VmValue::Iter(h) = lifted {
378 *cur = Some(h);
379 } else {
380 return Err(VmError::TypeError(
381 "flat_map: expected iterable result".to_string(),
382 ));
383 }
384 }
385 }
386 }
387 }
388 VmIter::Take { inner, remaining } => {
389 if *remaining == 0 {
390 *self = VmIter::Exhausted;
391 return Ok(None);
392 }
393 let item = next_handle(inner, vm).await?;
394 match item {
395 None => {
396 *self = VmIter::Exhausted;
397 Ok(None)
398 }
399 Some(v) => {
400 *remaining -= 1;
401 if *remaining == 0 {
402 *self = VmIter::Exhausted;
403 }
404 Ok(Some(v))
405 }
406 }
407 }
408 VmIter::Skip { inner, remaining } => {
409 while *remaining > 0 {
410 let item = next_handle(inner, vm).await?;
411 match item {
412 None => {
413 *self = VmIter::Exhausted;
414 return Ok(None);
415 }
416 Some(_) => {
417 *remaining -= 1;
418 }
419 }
420 }
421 let item = next_handle(inner, vm).await?;
422 match item {
423 None => {
424 *self = VmIter::Exhausted;
425 Ok(None)
426 }
427 Some(v) => Ok(Some(v)),
428 }
429 }
430 VmIter::TakeWhile { inner, p, done } => {
431 if *done {
432 return Ok(None);
433 }
434 let p = p.clone();
435 let item = next_handle(inner, vm).await?;
436 match item {
437 None => {
438 *self = VmIter::Exhausted;
439 Ok(None)
440 }
441 Some(v) => {
442 let keep = vm.call_callable_one(&p, &v).await?;
443 if keep.is_truthy() {
444 Ok(Some(v))
445 } else {
446 *self = VmIter::Exhausted;
447 Ok(None)
448 }
449 }
450 }
451 }
452 VmIter::TakeUntil { inner, p } => {
453 let p = p.clone();
454 let item = next_handle(inner, vm).await?;
455 match item {
456 None => {
457 *self = VmIter::Exhausted;
458 Ok(None)
459 }
460 Some(v) => {
461 let stop = vm.call_callable_one(&p, &v).await?;
462 if stop.is_truthy() {
463 *self = VmIter::Exhausted;
464 Ok(None)
465 } else {
466 Ok(Some(v))
467 }
468 }
469 }
470 }
471 VmIter::SkipWhile { inner, p, primed } => {
472 if *primed {
473 let item = next_handle(inner, vm).await?;
474 return match item {
475 None => {
476 *self = VmIter::Exhausted;
477 Ok(None)
478 }
479 Some(v) => Ok(Some(v)),
480 };
481 }
482 let p = p.clone();
483 loop {
484 let item = next_handle(inner, vm).await?;
485 match item {
486 None => {
487 *self = VmIter::Exhausted;
488 return Ok(None);
489 }
490 Some(v) => {
491 let drop_it = vm.call_callable_one(&p, &v).await?;
492 if !drop_it.is_truthy() {
493 *primed = true;
494 return Ok(Some(v));
495 }
496 }
497 }
498 }
499 }
500 VmIter::Zip { a, b } => {
501 let ia = next_handle(a, vm).await?;
502 let x = match ia {
503 None => {
504 *self = VmIter::Exhausted;
505 return Ok(None);
506 }
507 Some(v) => v,
508 };
509 let ib = next_handle(b, vm).await?;
510 let y = match ib {
511 None => {
512 *self = VmIter::Exhausted;
513 return Ok(None);
514 }
515 Some(v) => v,
516 };
517 Ok(Some(VmValue::Pair(std::sync::Arc::new((x, y)))))
518 }
519 VmIter::Enumerate { inner, i } => {
520 let item = next_handle(inner, vm).await?;
521 match item {
522 None => {
523 *self = VmIter::Exhausted;
524 Ok(None)
525 }
526 Some(v) => {
527 let idx = *i;
528 *i += 1;
529 Ok(Some(VmValue::Pair(std::sync::Arc::new((
530 VmValue::Int(idx),
531 v,
532 )))))
533 }
534 }
535 }
536 VmIter::Chain { a, b, on_a } => {
537 if *on_a {
538 let item = next_handle(a, vm).await?;
539 if let Some(v) = item {
540 return Ok(Some(v));
541 }
542 *on_a = false;
543 }
544 let item = next_handle(b, vm).await?;
545 match item {
546 None => {
547 *self = VmIter::Exhausted;
548 Ok(None)
549 }
550 Some(v) => Ok(Some(v)),
551 }
552 }
553 VmIter::Merge { sources, cursor } => loop {
554 if sources.is_empty() || sources.iter().all(Option::is_none) {
555 *self = VmIter::Exhausted;
556 return Ok(None);
557 }
558 let len = sources.len();
559 let mut live = 0usize;
560 for offset in 0..len {
561 let idx = (*cursor + offset) % len;
562 let Some(handle) = sources[idx].clone() else {
563 continue;
564 };
565 match try_next_ready(&handle, vm).await? {
566 Some(v) => {
567 *cursor = (idx + 1) % len;
568 return Ok(Some(v));
569 }
570 None => {
571 if is_exhausted_handle(&handle) {
572 sources[idx] = None;
573 } else {
574 live += 1;
575 }
576 }
577 }
578 }
579 if live == 0 {
580 *self = VmIter::Exhausted;
581 return Ok(None);
582 }
583 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
584 },
585 VmIter::Interleave { sources, cursor } => {
586 if sources.is_empty() || sources.iter().all(Option::is_none) {
587 *self = VmIter::Exhausted;
588 return Ok(None);
589 }
590 let len = sources.len();
591 for offset in 0..len {
592 let idx = (*cursor + offset) % len;
593 let Some(handle) = sources[idx].clone() else {
594 continue;
595 };
596 match next_handle(&handle, vm).await? {
597 Some(v) => {
598 *cursor = (idx + 1) % len;
599 return Ok(Some(v));
600 }
601 None => {
602 sources[idx] = None;
603 }
604 }
605 }
606 *self = VmIter::Exhausted;
607 Ok(None)
608 }
609 VmIter::Race { sources, winner } => {
610 if let Some(handle) = winner.clone() {
611 let item = next_handle(&handle, vm).await?;
612 return match item {
613 Some(v) => Ok(Some(v)),
614 None => {
615 *self = VmIter::Exhausted;
616 Ok(None)
617 }
618 };
619 }
620 loop {
621 let mut live = 0usize;
622 for source in sources.iter_mut() {
623 let Some(handle) = source.clone() else {
624 continue;
625 };
626 match try_next_ready(&handle, vm).await? {
627 Some(v) => {
628 *winner = Some(handle);
629 sources.clear();
630 return Ok(Some(v));
631 }
632 None => {
633 if is_exhausted_handle(&handle) {
634 *source = None;
635 } else {
636 live += 1;
637 }
638 }
639 }
640 }
641 if live == 0 {
642 *self = VmIter::Exhausted;
643 return Ok(None);
644 }
645 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
646 }
647 }
648 VmIter::Broadcast {
649 shared,
650 branch,
651 index,
652 } => {
653 let _ = branch;
654 loop {
655 let mut state = std::mem::replace(&mut *shared.lock(), empty_broadcast_state());
656 if *index < state.buffer.len() {
657 let item = state.buffer[*index].clone();
658 *index += 1;
659 *shared.lock() = state;
660 return Ok(Some(item));
661 }
662 if state.exhausted {
663 *shared.lock() = state;
664 *self = VmIter::Exhausted;
665 return Ok(None);
666 }
667 let next = next_handle(&state.source, vm).await;
668 match next {
669 Err(err) => {
670 *shared.lock() = state;
671 return Err(err);
672 }
673 Ok(Some(v)) => {
674 state.buffer.push(v);
675 *shared.lock() = state;
676 }
677 Ok(None) => {
678 state.exhausted = true;
679 *shared.lock() = state;
680 }
681 }
682 }
683 }
684 VmIter::Throttle {
685 inner,
686 interval_ms,
687 next_ready,
688 } => {
689 if let Some(ready_at) = next_ready.take() {
690 let now = tokio::time::Instant::now();
691 if ready_at > now {
692 tokio::time::sleep_until(ready_at).await;
693 }
694 }
695 let item = next_handle(inner, vm).await?;
696 match item {
697 None => {
698 *self = VmIter::Exhausted;
699 Ok(None)
700 }
701 Some(v) => {
702 *next_ready = Some(
703 tokio::time::Instant::now()
704 + tokio::time::Duration::from_millis(*interval_ms),
705 );
706 Ok(Some(v))
707 }
708 }
709 }
710 VmIter::Debounce { inner, window_ms } => {
711 let mut last = match next_handle(inner, vm).await? {
712 Some(v) => v,
713 None => {
714 *self = VmIter::Exhausted;
715 return Ok(None);
716 }
717 };
718 if *window_ms > 0 {
719 tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
720 }
721 while let Some(v) = try_next_ready(inner, vm).await? {
722 last = v;
723 }
724 Ok(Some(last))
725 }
726 VmIter::Chunks { inner, n } => {
727 let n = *n;
728 let mut batch: Vec<VmValue> = Vec::with_capacity(n);
729 for _ in 0..n {
730 let item = next_handle(inner, vm).await?;
731 match item {
732 Some(v) => {
733 batch.push(v);
734 }
735 None => break,
736 }
737 }
738 if batch.is_empty() {
739 *self = VmIter::Exhausted;
740 Ok(None)
741 } else {
742 Ok(Some(VmValue::List(std::sync::Arc::new(batch))))
743 }
744 }
745 VmIter::Windows { inner, n, buf } => {
746 let n = *n;
747 if buf.is_empty() {
748 while buf.len() < n {
749 let item = next_handle(inner, vm).await?;
750 match item {
751 Some(v) => buf.push_back(v),
752 None => {
753 *self = VmIter::Exhausted;
754 return Ok(None);
755 }
756 }
757 }
758 } else {
759 let item = next_handle(inner, vm).await?;
760 match item {
761 Some(v) => {
762 buf.pop_front();
763 buf.push_back(v);
764 }
765 None => {
766 *self = VmIter::Exhausted;
767 return Ok(None);
768 }
769 }
770 }
771 let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
772 Ok(Some(VmValue::List(std::sync::Arc::new(snapshot))))
773 }
774 VmIter::Chan { handle } => {
775 let is_closed = handle.is_closed();
776 let rx = handle.receiver.clone();
777 let mut closed_rx = handle.subscribe_closed();
778 let mut guard = rx.lock().await;
779 let item = if is_closed {
780 guard.try_recv().ok()
781 } else {
782 tokio::select! {
783 item = guard.recv() => item,
784 _ = closed_rx.changed() => guard.try_recv().ok(),
785 }
786 };
787 match item {
788 Some(v) => Ok(Some(v)),
789 None => {
790 drop(guard);
791 *self = VmIter::Exhausted;
792 Ok(None)
793 }
794 }
795 }
796 }
797 }
798}
799
800pub async fn next_handle(
809 handle: &VmIterHandle,
810 vm: &mut crate::vm::Vm,
811) -> Result<Option<VmValue>, VmError> {
812 let mut state = std::mem::replace(&mut *handle.lock(), VmIter::Exhausted);
813 let result = state.next(vm).await;
814 *handle.lock() = state;
816 result
817}
818
819pub async fn drain(handle: &VmIterHandle, vm: &mut crate::vm::Vm) -> Result<Vec<VmValue>, VmError> {
821 let mut out = Vec::new();
822 loop {
823 let v = next_handle(handle, vm).await?;
824 match v {
825 Some(v) => out.push(v),
826 None => break,
827 }
828 }
829 Ok(out)
830}
831
832pub async fn drain_capped(
835 handle: &VmIterHandle,
836 vm: &mut crate::vm::Vm,
837 max: usize,
838) -> Result<Vec<VmValue>, VmError> {
839 let mut out = Vec::new();
840 loop {
841 let v = next_handle(handle, vm).await?;
842 match v {
843 Some(v) => {
844 if out.len() >= max {
845 return Err(VmError::Runtime(format!(
846 "stream.collect: max cap {max} exceeded"
847 )));
848 }
849 out.push(v);
850 }
851 None => break,
852 }
853 }
854 Ok(out)
855}
856
857pub fn iter_handle_from_value(v: VmValue) -> Result<VmIterHandle, VmError> {
858 match iter_from_value(v)? {
859 VmValue::Iter(handle) => Ok(handle),
860 _ => unreachable!("iter_from_value returns Iter"),
861 }
862}
863
864pub fn broadcast_branches(source: VmIterHandle, n: usize) -> Vec<VmValue> {
865 let shared = Arc::new(Mutex::new(VmBroadcastState {
866 source,
867 buffer: Vec::new(),
868 exhausted: false,
869 }));
870 (0..n)
871 .map(|branch| {
872 VmValue::Iter(Arc::new(Mutex::new(VmIter::Broadcast {
873 shared: Arc::clone(&shared),
874 branch,
875 index: 0,
876 })))
877 })
878 .collect()
879}
880
881fn empty_broadcast_state() -> VmBroadcastState {
882 VmBroadcastState {
883 source: Arc::new(Mutex::new(VmIter::Exhausted)),
884 buffer: Vec::new(),
885 exhausted: true,
886 }
887}
888
889fn try_next_ready<'a>(
890 handle: &'a VmIterHandle,
891 vm: &'a mut crate::vm::Vm,
892) -> std::pin::Pin<
893 Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + Send + 'a>,
894> {
895 Box::pin(async move {
896 let mut state = std::mem::replace(&mut *handle.lock(), VmIter::Exhausted);
897 let result = state.try_next_ready_impl(vm).await;
898 *handle.lock() = state;
899 result
900 })
901}
902
903fn is_exhausted_handle(handle: &VmIterHandle) -> bool {
904 matches!(&*handle.lock(), VmIter::Exhausted)
905}
906
907impl VmIter {
908 async fn try_next_ready_impl(
909 &mut self,
910 vm: &mut crate::vm::Vm,
911 ) -> Result<Option<VmValue>, VmError> {
912 match self {
913 VmIter::Exhausted => Ok(None),
914 VmIter::Map { inner, f } => {
915 let f = f.clone();
916 match try_next_ready(inner, vm).await? {
917 Some(v) => Ok(Some(vm.call_callable_one(&f, &v).await?)),
918 None => {
919 if is_exhausted_handle(inner) {
920 *self = VmIter::Exhausted;
921 }
922 Ok(None)
923 }
924 }
925 }
926 VmIter::Tap { inner, f } => {
927 let f = f.clone();
928 match try_next_ready(inner, vm).await? {
929 Some(v) => {
930 vm.call_callable_one(&f, &v).await?;
931 Ok(Some(v))
932 }
933 None => {
934 if is_exhausted_handle(inner) {
935 *self = VmIter::Exhausted;
936 }
937 Ok(None)
938 }
939 }
940 }
941 VmIter::Filter { inner, p } => {
942 let p = p.clone();
943 loop {
944 match try_next_ready(inner, vm).await? {
945 Some(v) => {
946 let keep = vm.call_callable_one(&p, &v).await?;
947 if keep.is_truthy() {
948 return Ok(Some(v));
949 }
950 }
951 None => {
952 if is_exhausted_handle(inner) {
953 *self = VmIter::Exhausted;
954 }
955 return Ok(None);
956 }
957 }
958 }
959 }
960 VmIter::Scan { inner, acc, f } => {
961 let f = f.clone();
962 match try_next_ready(inner, vm).await? {
963 Some(v) => {
964 let next_acc = vm.call_callable_two(&f, acc, &v).await?;
965 *acc = next_acc.clone();
966 Ok(Some(next_acc))
967 }
968 None => {
969 if is_exhausted_handle(inner) {
970 *self = VmIter::Exhausted;
971 }
972 Ok(None)
973 }
974 }
975 }
976 VmIter::FlatMap { inner, f, cur } => {
977 let f = f.clone();
978 loop {
979 if let Some(cur_iter) = cur.clone() {
980 match try_next_ready(&cur_iter, vm).await? {
981 Some(v) => return Ok(Some(v)),
982 None => {
983 if is_exhausted_handle(&cur_iter) {
984 *cur = None;
985 }
986 return Ok(None);
987 }
988 }
989 }
990 match try_next_ready(inner, vm).await? {
991 Some(v) => {
992 let result = vm.call_callable_one(&f, &v).await?;
993 *cur = Some(iter_handle_from_value(result)?);
994 }
995 None => {
996 if is_exhausted_handle(inner) {
997 *self = VmIter::Exhausted;
998 }
999 return Ok(None);
1000 }
1001 }
1002 }
1003 }
1004 VmIter::Take { inner, remaining } => {
1005 if *remaining == 0 {
1006 *self = VmIter::Exhausted;
1007 return Ok(None);
1008 }
1009 match try_next_ready(inner, vm).await? {
1010 Some(v) => {
1011 *remaining -= 1;
1012 if *remaining == 0 {
1013 *self = VmIter::Exhausted;
1014 }
1015 Ok(Some(v))
1016 }
1017 None => {
1018 if is_exhausted_handle(inner) {
1019 *self = VmIter::Exhausted;
1020 }
1021 Ok(None)
1022 }
1023 }
1024 }
1025 VmIter::TakeUntil { inner, p } => {
1026 let p = p.clone();
1027 match try_next_ready(inner, vm).await? {
1028 Some(v) => {
1029 let stop = vm.call_callable_one(&p, &v).await?;
1030 if stop.is_truthy() {
1031 *self = VmIter::Exhausted;
1032 Ok(None)
1033 } else {
1034 Ok(Some(v))
1035 }
1036 }
1037 None => {
1038 if is_exhausted_handle(inner) {
1039 *self = VmIter::Exhausted;
1040 }
1041 Ok(None)
1042 }
1043 }
1044 }
1045 VmIter::Throttle {
1046 inner,
1047 interval_ms,
1048 next_ready,
1049 } => {
1050 if let Some(ready_at) = *next_ready {
1051 if ready_at > tokio::time::Instant::now() {
1052 return Ok(None);
1053 }
1054 }
1055 match try_next_ready(inner, vm).await? {
1056 Some(v) => {
1057 *next_ready = Some(
1058 tokio::time::Instant::now()
1059 + tokio::time::Duration::from_millis(*interval_ms),
1060 );
1061 Ok(Some(v))
1062 }
1063 None => {
1064 if is_exhausted_handle(inner) {
1065 *self = VmIter::Exhausted;
1066 }
1067 Ok(None)
1068 }
1069 }
1070 }
1071 VmIter::Range { .. }
1072 | VmIter::Vec { .. }
1073 | VmIter::Dict { .. }
1074 | VmIter::Chars { .. }
1075 | VmIter::Skip { .. }
1076 | VmIter::TakeWhile { .. }
1077 | VmIter::SkipWhile { .. }
1078 | VmIter::Zip { .. }
1079 | VmIter::Enumerate { .. }
1080 | VmIter::Chain { .. }
1081 | VmIter::Merge { .. }
1082 | VmIter::Interleave { .. }
1083 | VmIter::Race { .. }
1084 | VmIter::Broadcast { .. }
1085 | VmIter::Debounce { .. }
1086 | VmIter::Chunks { .. }
1087 | VmIter::Windows { .. } => self.next(vm).await,
1088 VmIter::Gen { gen } => {
1089 if gen.is_done() {
1090 *self = VmIter::Exhausted;
1091 return Ok(None);
1092 }
1093 let rx = gen.receiver.clone();
1094 let result = match rx.try_lock() {
1095 Ok(mut guard) => match guard.try_recv() {
1096 Ok(Ok(v)) => Ok(Some(v)),
1097 Ok(Err(error)) => {
1098 gen.mark_done();
1099 *self = VmIter::Exhausted;
1100 Err(error)
1101 }
1102 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1103 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1104 gen.mark_done();
1105 *self = VmIter::Exhausted;
1106 Ok(None)
1107 }
1108 },
1109 Err(_) => Ok(None),
1110 };
1111 result
1112 }
1113 VmIter::Stream { stream } => {
1114 if stream.is_done() {
1115 *self = VmIter::Exhausted;
1116 return Ok(None);
1117 }
1118 let rx = stream.receiver.clone();
1119 let result = match rx.try_lock() {
1120 Ok(mut guard) => match guard.try_recv() {
1121 Ok(Ok(v)) => Ok(Some(v)),
1122 Ok(Err(error)) => {
1123 stream.mark_done();
1124 *self = VmIter::Exhausted;
1125 Err(error)
1126 }
1127 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1128 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1129 stream.mark_done();
1130 *self = VmIter::Exhausted;
1131 Ok(None)
1132 }
1133 },
1134 Err(_) => Ok(None),
1135 };
1136 result
1137 }
1138 VmIter::Chan { handle } => {
1139 let rx = handle.receiver.clone();
1140 let result = match rx.try_lock() {
1141 Ok(mut guard) => match guard.try_recv() {
1142 Ok(v) => Ok(Some(v)),
1143 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1144 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1145 *self = VmIter::Exhausted;
1146 Ok(None)
1147 }
1148 },
1149 Err(_) => Ok(None),
1150 };
1151 result
1152 }
1153 }
1154 }
1155}
1156
1157pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1160 let inner = match v {
1161 VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1162 VmValue::Range(r) => VmIter::Range {
1163 next: r.start,
1164 end: r.end,
1165 inclusive: r.inclusive,
1166 done: range_initial_done(r.start, r.end, r.inclusive),
1167 },
1168 VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1169 VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1170 VmValue::Dict(entries) => {
1171 let keys: Vec<String> = entries.keys().cloned().collect();
1172 VmIter::Dict {
1173 entries,
1174 keys,
1175 idx: 0,
1176 }
1177 }
1178 VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1179 VmValue::Generator(gen) => VmIter::Gen { gen },
1180 VmValue::Stream(stream) => VmIter::Stream { stream },
1181 VmValue::Channel(handle) => VmIter::Chan { handle },
1182 other => {
1183 return Err(VmError::TypeError(format!(
1184 "iter: value of type {} is not iterable",
1185 other.type_name()
1186 )))
1187 }
1188 };
1189 Ok(VmValue::Iter(Arc::new(Mutex::new(inner))))
1190}
1191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::VmRange;

    /// Runs an async test body to completion on a single-threaded runtime.
    fn run_iter_test(test: impl std::future::Future<Output = ()>) {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(test);
    }

    /// Builds an iterator handle over the given range.
    fn range_handle(start: i64, end: i64, inclusive: bool) -> VmIterHandle {
        let range = VmValue::Range(VmRange {
            start,
            end,
            inclusive,
        });
        match iter_from_value(range).unwrap() {
            VmValue::Iter(handle) => handle,
            _ => panic!("expected iter"),
        }
    }

    #[test]
    fn inclusive_range_at_i64_max_yields_the_endpoint() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, true);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(matches!(first, Some(VmValue::Int(i64::MAX))));

            let second = next_handle(&handle, &mut vm).await.unwrap();
            assert!(second.is_none());
        });
    }

    #[test]
    fn exclusive_range_at_i64_max_is_empty() {
        run_iter_test(async {
            let mut vm = crate::vm::Vm::new();
            let handle = range_handle(i64::MAX, i64::MAX, false);

            let first = next_handle(&handle, &mut vm).await.unwrap();
            assert!(first.is_none());
        });
    }
}