1use std::cell::RefCell;
11use std::collections::{BTreeMap, VecDeque};
12use std::rc::Rc;
13
14use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
15
/// State shared by every `VmIter::Broadcast` branch created by
/// `broadcast_branches`.
#[derive(Debug)]
pub struct VmBroadcastState {
    /// Upstream iterator that a branch pulls from once it has read past the
    /// end of `buffer`.
    source: Rc<RefCell<VmIter>>,
    /// Every item pulled from `source` so far, in arrival order. Branches
    /// index into this buffer with their own cursor.
    buffer: Vec<VmValue>,
    /// Set once `source` has reported the end of iteration.
    exhausted: bool,
}
22
/// Lazy iterator state machine driven by `VmIter::next`.
///
/// Source variants produce values themselves; adapter variants wrap one or
/// more shared `Rc<RefCell<VmIter>>` handles and transform what they yield.
#[derive(Debug)]
pub enum VmIter {
    /// Yields `VmValue::Int` values from `next` up to, but excluding, `stop`.
    Range { next: i64, stop: i64 },
    /// Yields clones of `items` starting at position `idx`.
    Vec { items: Rc<Vec<VmValue>>, idx: usize },
    /// Yields `(key, value)` pairs, walking `keys` from position `idx` and
    /// looking each key up in `entries`.
    Dict {
        entries: Rc<BTreeMap<String, VmValue>>,
        keys: Vec<String>,
        idx: usize,
    },
    /// Yields each character of `s` as a one-character string; `byte_idx` is
    /// the byte offset of the next character.
    Chars { s: Rc<str>, byte_idx: usize },
    /// Receives values from a generator's channel.
    Gen { gen: VmGenerator },
    /// Receives values from a stream's channel.
    Stream { stream: VmStream },
    /// Receives values from a VM channel.
    Chan { handle: VmChannelHandle },
    /// Yields `f(item)` for every item of `inner`.
    Map {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
    },
    /// Calls `f(item)` for its side effects and yields the item unchanged.
    Tap {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
    },
    /// Yields only the items for which `p(item)` is truthy.
    Filter {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Running fold: yields each new accumulator `f(acc, item)`.
    Scan {
        inner: Rc<RefCell<VmIter>>,
        acc: VmValue,
        f: VmValue,
    },
    /// Maps each item to an iterable via `f` and yields that iterable's items;
    /// `cur` is the sub-iterator currently being drained.
    FlatMap {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
        cur: Option<Rc<RefCell<VmIter>>>,
    },
    /// Yields at most `remaining` further items.
    Take {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Discards `remaining` items on first use, then passes items through.
    Skip {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Yields items while `p(item)` is truthy; the first failing item ends the
    /// iteration and is not yielded.
    TakeWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        done: bool,
    },
    /// Yields items until `p(item)` is truthy; the matching item ends the
    /// iteration and is not yielded.
    TakeUntil {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Discards leading items while `p(item)` is truthy; `primed` records that
    /// the first kept item has been seen.
    SkipWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        primed: bool,
    },
    /// Yields pairs `(a_item, b_item)` until either side ends.
    Zip {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
    },
    /// Yields pairs `(i, item)`, incrementing `i` after each item.
    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
    /// Yields everything from `a`, then everything from `b`; `on_a` is true
    /// while `a` is still being drained.
    Chain {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
        on_a: bool,
    },
    /// Polls the sources without blocking, starting at `cursor`, and yields
    /// the first item that is ready. Finished sources are replaced by `None`.
    Merge {
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        cursor: usize,
    },
    /// Takes one item from each source in turn (waiting for it), starting at
    /// `cursor`. Finished sources are replaced by `None`.
    Interleave {
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        cursor: usize,
    },
    /// The first source to have an item ready becomes `winner`; all later
    /// items come from the winner only.
    Race {
        sources: Vec<Option<Rc<RefCell<VmIter>>>>,
        winner: Option<Rc<RefCell<VmIter>>>,
    },
    /// One branch of a broadcast: reads position `index` of the buffer held in
    /// `shared`, pulling from the shared source when the buffer is too short.
    Broadcast {
        shared: Rc<RefCell<VmBroadcastState>>,
        branch: usize,
        index: usize,
    },
    /// After yielding an item, does not pull the next one before `next_ready`
    /// (`interval_ms` after the previous yield).
    Throttle {
        inner: Rc<RefCell<VmIter>>,
        interval_ms: u64,
        next_ready: Option<tokio::time::Instant>,
    },
    /// Waits for one item, sleeps `window_ms`, then yields the most recent
    /// item that is ready at that point.
    Debounce {
        inner: Rc<RefCell<VmIter>>,
        window_ms: u64,
    },
    /// Yields lists of up to `n` consecutive items; the final list may be
    /// shorter.
    Chunks {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
    },
    /// Yields sliding windows of `n` consecutive items as lists; `buf` holds
    /// the current window.
    Windows {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
        buf: VecDeque<VmValue>,
    },
    /// Terminal state: every further pull returns `Ok(None)`.
    Exhausted,
}
168
impl VmIter {
    /// Pulls the next item, returning `Ok(None)` once the iterator is done.
    ///
    /// The work happens in `next_impl`; this wrapper boxes the future so the
    /// recursive call chain (`next_impl` -> `next_handle` -> `next`) has a
    /// sized future type.
    pub fn next<'a>(
        &'a mut self,
        vm: &'a mut crate::vm::Vm,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
    {
        Box::pin(async move { self.next_impl(vm).await })
    }

    /// Advances this iterator by one item, waiting on upstream sources where
    /// necessary.
    ///
    /// Most arms replace `self` with `VmIter::Exhausted` as soon as they see
    /// the end of their input, so later calls return `Ok(None)` immediately.
    /// Errors from upstream iterators and from VM callbacks are propagated
    /// with `?`.
    async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
        match self {
            VmIter::Exhausted => Ok(None),
            // Half-open integer range: `stop` itself is never yielded.
            VmIter::Range { next, stop } => {
                if *next < *stop {
                    let v = *next;
                    *next += 1;
                    Ok(Some(VmValue::Int(v)))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            VmIter::Vec { items, idx } => {
                if *idx < items.len() {
                    let v = items[*idx].clone();
                    *idx += 1;
                    Ok(Some(v))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            // Yields `(key, value)` pairs; a key missing from `entries` maps to Nil.
            VmIter::Dict { entries, keys, idx } => {
                if *idx < keys.len() {
                    let k = &keys[*idx];
                    let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
                    *idx += 1;
                    Ok(Some(VmValue::Pair(Rc::new((
                        VmValue::String(Rc::from(k.as_str())),
                        v,
                    )))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            // Walks the string one `char` at a time, advancing by its UTF-8 width.
            VmIter::Chars { s, byte_idx } => {
                if *byte_idx >= s.len() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rest = &s[*byte_idx..];
                if let Some(c) = rest.chars().next() {
                    *byte_idx += c.len_utf8();
                    Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
                } else {
                    *self = VmIter::Exhausted;
                    Ok(None)
                }
            }
            // Waits on the generator's channel; an error item or a closed
            // channel marks the generator done.
            VmIter::Gen { gen } => {
                if gen.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = gen.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    Some(Err(error)) => {
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        gen.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            // Same protocol as `Gen`, for stream values.
            VmIter::Stream { stream } => {
                if stream.done.get() {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let rx = stream.receiver.clone();
                let mut guard = rx.lock().await;
                match guard.recv().await {
                    Some(Ok(v)) => Ok(Some(v)),
                    Some(Err(error)) => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Err(error)
                    }
                    None => {
                        stream.done.set(true);
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
            VmIter::Map { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let out = vm.call_callable_value(&f, &[v]).await?;
                        Ok(Some(out))
                    }
                }
            }
            VmIter::Tap { inner, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        // The callback's return value is discarded.
                        vm.call_callable_value(&f, &[v.clone()]).await?;
                        Ok(Some(v))
                    }
                }
            }
            // Keeps pulling until the predicate accepts an item or input ends.
            VmIter::Filter { inner, p } => {
                let p = p.clone();
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if keep.is_truthy() {
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Scan { inner, acc, f } => {
                let f = f.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
                        *acc = next_acc.clone();
                        Ok(Some(next_acc))
                    }
                }
            }
            VmIter::FlatMap { inner, f, cur } => {
                let f = f.clone();
                loop {
                    // Drain the current sub-iterator first.
                    if let Some(cur_iter) = cur.clone() {
                        let item = next_handle(&cur_iter, vm).await?;
                        if let Some(v) = item {
                            return Ok(Some(v));
                        }
                        *cur = None;
                    }
                    // Sub-iterator is finished (or absent): map the next outer
                    // item to a new one and loop.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let result = vm.call_callable_value(&f, &[v]).await?;
                            let lifted = iter_from_value(result)?;
                            if let VmValue::Iter(h) = lifted {
                                *cur = Some(h);
                            } else {
                                return Err(VmError::TypeError(
                                    "flat_map: expected iterable result".to_string(),
                                ));
                            }
                        }
                    }
                }
            }
            VmIter::Take { inner, remaining } => {
                if *remaining == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *remaining -= 1;
                        // Finish eagerly so the upstream is not pulled again
                        // after the last permitted item.
                        if *remaining == 0 {
                            *self = VmIter::Exhausted;
                        }
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Skip { inner, remaining } => {
                // Discard the pending prefix; this loop is a no-op on later calls.
                while *remaining > 0 {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(_) => {
                            *remaining -= 1;
                        }
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            VmIter::TakeWhile { inner, p, done } => {
                // NOTE(review): nothing in this function sets `done`; the arm
                // ends by switching to `Exhausted` instead. Confirm whether
                // the flag is set elsewhere.
                if *done {
                    return Ok(None);
                }
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if keep.is_truthy() {
                            Ok(Some(v))
                        } else {
                            // The first rejected item is consumed and dropped.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    }
                }
            }
            VmIter::TakeUntil { inner, p } => {
                let p = p.clone();
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
                        if stop.is_truthy() {
                            // The matching item is consumed and dropped.
                            *self = VmIter::Exhausted;
                            Ok(None)
                        } else {
                            Ok(Some(v))
                        }
                    }
                }
            }
            VmIter::SkipWhile { inner, p, primed } => {
                // Once the first kept item has been seen, pass items through
                // without calling the predicate.
                if *primed {
                    let item = next_handle(inner, vm).await?;
                    return match item {
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                        Some(v) => Ok(Some(v)),
                    };
                }
                let p = p.clone();
                loop {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                        Some(v) => {
                            let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
                            if !drop_it.is_truthy() {
                                *primed = true;
                                return Ok(Some(v));
                            }
                        }
                    }
                }
            }
            VmIter::Zip { a, b } => {
                let ia = next_handle(a, vm).await?;
                let x = match ia {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                // `a` is pulled first, so if `b` ends here the item already
                // taken from `a` is dropped.
                let ib = next_handle(b, vm).await?;
                let y = match ib {
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    Some(v) => v,
                };
                Ok(Some(VmValue::Pair(Rc::new((x, y)))))
            }
            VmIter::Enumerate { inner, i } => {
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        let idx = *i;
                        *i += 1;
                        Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
                    }
                }
            }
            VmIter::Chain { a, b, on_a } => {
                if *on_a {
                    let item = next_handle(a, vm).await?;
                    if let Some(v) = item {
                        return Ok(Some(v));
                    }
                    // `a` is finished: switch to `b` for this and later calls.
                    *on_a = false;
                }
                let item = next_handle(b, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => Ok(Some(v)),
                }
            }
            // Polls every live source without blocking, starting at `cursor`;
            // if none has an item ready, sleeps 1 ms and polls again.
            VmIter::Merge { sources, cursor } => loop {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                let mut live = 0usize;
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    match try_next_ready(&handle, vm).await? {
                        Some(v) => {
                            // Resume after the source that just produced.
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            // `None` means either "finished" or "nothing ready
                            // yet"; the handle's state tells the two apart.
                            if is_exhausted_handle(&handle) {
                                sources[idx] = None;
                            } else {
                                live += 1;
                            }
                        }
                    }
                }
                if live == 0 {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
            },
            // Round-robin that waits for each source in turn; a source that
            // ends is removed and the next one is tried.
            VmIter::Interleave { sources, cursor } => {
                if sources.is_empty() || sources.iter().all(Option::is_none) {
                    *self = VmIter::Exhausted;
                    return Ok(None);
                }
                let len = sources.len();
                for offset in 0..len {
                    let idx = (*cursor + offset) % len;
                    let Some(handle) = sources[idx].clone() else {
                        continue;
                    };
                    match next_handle(&handle, vm).await? {
                        Some(v) => {
                            *cursor = (idx + 1) % len;
                            return Ok(Some(v));
                        }
                        None => {
                            sources[idx] = None;
                        }
                    }
                }
                *self = VmIter::Exhausted;
                Ok(None)
            }
            VmIter::Race { sources, winner } => {
                // A winner has already been chosen: follow it exclusively.
                if let Some(handle) = winner.clone() {
                    let item = next_handle(&handle, vm).await?;
                    return match item {
                        Some(v) => Ok(Some(v)),
                        None => {
                            *self = VmIter::Exhausted;
                            Ok(None)
                        }
                    };
                }
                // No winner yet: poll all sources without blocking until one
                // has an item ready, sleeping 1 ms between rounds.
                loop {
                    let mut live = 0usize;
                    for source in sources.iter_mut() {
                        let Some(handle) = source.clone() else {
                            continue;
                        };
                        match try_next_ready(&handle, vm).await? {
                            Some(v) => {
                                *winner = Some(handle);
                                // Release the handles of the losing sources.
                                sources.clear();
                                return Ok(Some(v));
                            }
                            None => {
                                if is_exhausted_handle(&handle) {
                                    *source = None;
                                } else {
                                    live += 1;
                                }
                            }
                        }
                    }
                    if live == 0 {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                }
            }
            VmIter::Broadcast {
                shared,
                branch,
                index,
            } => {
                // `branch` is not used when advancing.
                let _ = branch;
                loop {
                    // Move the shared state out so no `RefCell` borrow is held
                    // across the await below. Every exit path writes it back.
                    let mut state =
                        std::mem::replace(&mut *shared.borrow_mut(), empty_broadcast_state());
                    // Serve from the shared buffer if this branch is behind.
                    if *index < state.buffer.len() {
                        let item = state.buffer[*index].clone();
                        *index += 1;
                        *shared.borrow_mut() = state;
                        return Ok(Some(item));
                    }
                    if state.exhausted {
                        *shared.borrow_mut() = state;
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                    // Caught up: pull one more item from the shared source,
                    // record it, and loop to serve it from the buffer.
                    let next = next_handle(&state.source, vm).await;
                    match next {
                        Err(err) => {
                            *shared.borrow_mut() = state;
                            return Err(err);
                        }
                        Ok(Some(v)) => {
                            state.buffer.push(v);
                            *shared.borrow_mut() = state;
                        }
                        Ok(None) => {
                            state.exhausted = true;
                            *shared.borrow_mut() = state;
                        }
                    }
                }
            }
            VmIter::Throttle {
                inner,
                interval_ms,
                next_ready,
            } => {
                // Wait out the remainder of the interval since the last yield.
                if let Some(ready_at) = next_ready.take() {
                    let now = tokio::time::Instant::now();
                    if ready_at > now {
                        tokio::time::sleep_until(ready_at).await;
                    }
                }
                let item = next_handle(inner, vm).await?;
                match item {
                    None => {
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                    Some(v) => {
                        *next_ready = Some(
                            tokio::time::Instant::now()
                                + tokio::time::Duration::from_millis(*interval_ms),
                        );
                        Ok(Some(v))
                    }
                }
            }
            VmIter::Debounce { inner, window_ms } => {
                // Wait for the first item of a burst.
                let mut last = match next_handle(inner, vm).await? {
                    Some(v) => v,
                    None => {
                        *self = VmIter::Exhausted;
                        return Ok(None);
                    }
                };
                if *window_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(*window_ms)).await;
                }
                // Keep only the newest of whatever is ready after the window.
                while let Some(v) = try_next_ready(inner, vm).await? {
                    last = v;
                }
                Ok(Some(last))
            }
            VmIter::Chunks { inner, n } => {
                let n = *n;
                let mut batch: Vec<VmValue> = Vec::with_capacity(n);
                for _ in 0..n {
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            batch.push(v);
                        }
                        None => break,
                    }
                }
                // A partial final batch is still yielded; only an empty batch
                // ends the iteration.
                if batch.is_empty() {
                    *self = VmIter::Exhausted;
                    Ok(None)
                } else {
                    Ok(Some(VmValue::List(Rc::new(batch))))
                }
            }
            VmIter::Windows { inner, n, buf } => {
                let n = *n;
                // NOTE(review): with `n == 0` the buffer stays empty, so this
                // arm would return an empty list on every call without ever
                // finishing. Confirm the constructor enforces `n > 0`.
                if buf.is_empty() {
                    // First call: fill a whole window. Input shorter than `n`
                    // yields no window at all.
                    while buf.len() < n {
                        let item = next_handle(inner, vm).await?;
                        match item {
                            Some(v) => buf.push_back(v),
                            None => {
                                *self = VmIter::Exhausted;
                                return Ok(None);
                            }
                        }
                    }
                } else {
                    // Later calls: slide the window forward by one item.
                    let item = next_handle(inner, vm).await?;
                    match item {
                        Some(v) => {
                            buf.pop_front();
                            buf.push_back(v);
                        }
                        None => {
                            *self = VmIter::Exhausted;
                            return Ok(None);
                        }
                    }
                }
                let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
                Ok(Some(VmValue::List(Rc::new(snapshot))))
            }
            VmIter::Chan { handle } => {
                // The closed flag is sampled before the lock is taken.
                let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
                let rx = handle.receiver.clone();
                let mut guard = rx.lock().await;
                // A closed channel is only drained of what is already queued;
                // an open one is waited on.
                let item = if is_closed {
                    guard.try_recv().ok()
                } else {
                    guard.recv().await
                };
                match item {
                    Some(v) => Ok(Some(v)),
                    None => {
                        drop(guard);
                        *self = VmIter::Exhausted;
                        Ok(None)
                    }
                }
            }
        }
    }
}
775
776pub async fn next_handle(
785 handle: &Rc<RefCell<VmIter>>,
786 vm: &mut crate::vm::Vm,
787) -> Result<Option<VmValue>, VmError> {
788 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
789 let result = state.next(vm).await;
790 *handle.borrow_mut() = state;
792 result
793}
794
795pub async fn drain(
797 handle: &Rc<RefCell<VmIter>>,
798 vm: &mut crate::vm::Vm,
799) -> Result<Vec<VmValue>, VmError> {
800 let mut out = Vec::new();
801 loop {
802 let v = next_handle(handle, vm).await?;
803 match v {
804 Some(v) => out.push(v),
805 None => break,
806 }
807 }
808 Ok(out)
809}
810
811pub async fn drain_capped(
814 handle: &Rc<RefCell<VmIter>>,
815 vm: &mut crate::vm::Vm,
816 max: usize,
817) -> Result<Vec<VmValue>, VmError> {
818 let mut out = Vec::new();
819 loop {
820 let v = next_handle(handle, vm).await?;
821 match v {
822 Some(v) => {
823 if out.len() >= max {
824 return Err(VmError::Runtime(format!(
825 "stream.collect: max cap {max} exceeded"
826 )));
827 }
828 out.push(v);
829 }
830 None => break,
831 }
832 }
833 Ok(out)
834}
835
836pub fn iter_handle_from_value(v: VmValue) -> Result<Rc<RefCell<VmIter>>, VmError> {
837 match iter_from_value(v)? {
838 VmValue::Iter(handle) => Ok(handle),
839 _ => unreachable!("iter_from_value returns Iter"),
840 }
841}
842
843pub fn broadcast_branches(source: Rc<RefCell<VmIter>>, n: usize) -> Vec<VmValue> {
844 let shared = Rc::new(RefCell::new(VmBroadcastState {
845 source,
846 buffer: Vec::new(),
847 exhausted: false,
848 }));
849 (0..n)
850 .map(|branch| {
851 VmValue::Iter(Rc::new(RefCell::new(VmIter::Broadcast {
852 shared: Rc::clone(&shared),
853 branch,
854 index: 0,
855 })))
856 })
857 .collect()
858}
859
860fn empty_broadcast_state() -> VmBroadcastState {
861 VmBroadcastState {
862 source: Rc::new(RefCell::new(VmIter::Exhausted)),
863 buffer: Vec::new(),
864 exhausted: true,
865 }
866}
867
868fn try_next_ready<'a>(
869 handle: &'a Rc<RefCell<VmIter>>,
870 vm: &'a mut crate::vm::Vm,
871) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>> {
872 Box::pin(async move {
873 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
874 let result = state.try_next_ready_impl(vm).await;
875 *handle.borrow_mut() = state;
876 result
877 })
878}
879
880fn is_exhausted_handle(handle: &Rc<RefCell<VmIter>>) -> bool {
881 matches!(&*handle.borrow(), VmIter::Exhausted)
882}
883
884impl VmIter {
885 async fn try_next_ready_impl(
886 &mut self,
887 vm: &mut crate::vm::Vm,
888 ) -> Result<Option<VmValue>, VmError> {
889 match self {
890 VmIter::Exhausted => Ok(None),
891 VmIter::Map { inner, f } => {
892 let f = f.clone();
893 match try_next_ready(inner, vm).await? {
894 Some(v) => Ok(Some(vm.call_callable_value(&f, &[v]).await?)),
895 None => {
896 if is_exhausted_handle(inner) {
897 *self = VmIter::Exhausted;
898 }
899 Ok(None)
900 }
901 }
902 }
903 VmIter::Tap { inner, f } => {
904 let f = f.clone();
905 match try_next_ready(inner, vm).await? {
906 Some(v) => {
907 vm.call_callable_value(&f, &[v.clone()]).await?;
908 Ok(Some(v))
909 }
910 None => {
911 if is_exhausted_handle(inner) {
912 *self = VmIter::Exhausted;
913 }
914 Ok(None)
915 }
916 }
917 }
918 VmIter::Filter { inner, p } => {
919 let p = p.clone();
920 loop {
921 match try_next_ready(inner, vm).await? {
922 Some(v) => {
923 let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
924 if keep.is_truthy() {
925 return Ok(Some(v));
926 }
927 }
928 None => {
929 if is_exhausted_handle(inner) {
930 *self = VmIter::Exhausted;
931 }
932 return Ok(None);
933 }
934 }
935 }
936 }
937 VmIter::Scan { inner, acc, f } => {
938 let f = f.clone();
939 match try_next_ready(inner, vm).await? {
940 Some(v) => {
941 let next_acc = vm.call_callable_value(&f, &[acc.clone(), v]).await?;
942 *acc = next_acc.clone();
943 Ok(Some(next_acc))
944 }
945 None => {
946 if is_exhausted_handle(inner) {
947 *self = VmIter::Exhausted;
948 }
949 Ok(None)
950 }
951 }
952 }
953 VmIter::FlatMap { inner, f, cur } => {
954 let f = f.clone();
955 loop {
956 if let Some(cur_iter) = cur.clone() {
957 match try_next_ready(&cur_iter, vm).await? {
958 Some(v) => return Ok(Some(v)),
959 None => {
960 if is_exhausted_handle(&cur_iter) {
961 *cur = None;
962 }
963 return Ok(None);
964 }
965 }
966 }
967 match try_next_ready(inner, vm).await? {
968 Some(v) => {
969 let result = vm.call_callable_value(&f, &[v]).await?;
970 *cur = Some(iter_handle_from_value(result)?);
971 }
972 None => {
973 if is_exhausted_handle(inner) {
974 *self = VmIter::Exhausted;
975 }
976 return Ok(None);
977 }
978 }
979 }
980 }
981 VmIter::Take { inner, remaining } => {
982 if *remaining == 0 {
983 *self = VmIter::Exhausted;
984 return Ok(None);
985 }
986 match try_next_ready(inner, vm).await? {
987 Some(v) => {
988 *remaining -= 1;
989 if *remaining == 0 {
990 *self = VmIter::Exhausted;
991 }
992 Ok(Some(v))
993 }
994 None => {
995 if is_exhausted_handle(inner) {
996 *self = VmIter::Exhausted;
997 }
998 Ok(None)
999 }
1000 }
1001 }
1002 VmIter::TakeUntil { inner, p } => {
1003 let p = p.clone();
1004 match try_next_ready(inner, vm).await? {
1005 Some(v) => {
1006 let stop = vm.call_callable_value(&p, &[v.clone()]).await?;
1007 if stop.is_truthy() {
1008 *self = VmIter::Exhausted;
1009 Ok(None)
1010 } else {
1011 Ok(Some(v))
1012 }
1013 }
1014 None => {
1015 if is_exhausted_handle(inner) {
1016 *self = VmIter::Exhausted;
1017 }
1018 Ok(None)
1019 }
1020 }
1021 }
1022 VmIter::Throttle {
1023 inner,
1024 interval_ms,
1025 next_ready,
1026 } => {
1027 if let Some(ready_at) = *next_ready {
1028 if ready_at > tokio::time::Instant::now() {
1029 return Ok(None);
1030 }
1031 }
1032 match try_next_ready(inner, vm).await? {
1033 Some(v) => {
1034 *next_ready = Some(
1035 tokio::time::Instant::now()
1036 + tokio::time::Duration::from_millis(*interval_ms),
1037 );
1038 Ok(Some(v))
1039 }
1040 None => {
1041 if is_exhausted_handle(inner) {
1042 *self = VmIter::Exhausted;
1043 }
1044 Ok(None)
1045 }
1046 }
1047 }
1048 VmIter::Range { .. }
1049 | VmIter::Vec { .. }
1050 | VmIter::Dict { .. }
1051 | VmIter::Chars { .. }
1052 | VmIter::Skip { .. }
1053 | VmIter::TakeWhile { .. }
1054 | VmIter::SkipWhile { .. }
1055 | VmIter::Zip { .. }
1056 | VmIter::Enumerate { .. }
1057 | VmIter::Chain { .. }
1058 | VmIter::Merge { .. }
1059 | VmIter::Interleave { .. }
1060 | VmIter::Race { .. }
1061 | VmIter::Broadcast { .. }
1062 | VmIter::Debounce { .. }
1063 | VmIter::Chunks { .. }
1064 | VmIter::Windows { .. } => self.next(vm).await,
1065 VmIter::Gen { gen } => {
1066 if gen.done.get() {
1067 *self = VmIter::Exhausted;
1068 return Ok(None);
1069 }
1070 let rx = gen.receiver.clone();
1071 let result = match rx.try_lock() {
1072 Ok(mut guard) => match guard.try_recv() {
1073 Ok(Ok(v)) => Ok(Some(v)),
1074 Ok(Err(error)) => {
1075 gen.done.set(true);
1076 *self = VmIter::Exhausted;
1077 Err(error)
1078 }
1079 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1080 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1081 gen.done.set(true);
1082 *self = VmIter::Exhausted;
1083 Ok(None)
1084 }
1085 },
1086 Err(_) => Ok(None),
1087 };
1088 result
1089 }
1090 VmIter::Stream { stream } => {
1091 if stream.done.get() {
1092 *self = VmIter::Exhausted;
1093 return Ok(None);
1094 }
1095 let rx = stream.receiver.clone();
1096 let result = match rx.try_lock() {
1097 Ok(mut guard) => match guard.try_recv() {
1098 Ok(Ok(v)) => Ok(Some(v)),
1099 Ok(Err(error)) => {
1100 stream.done.set(true);
1101 *self = VmIter::Exhausted;
1102 Err(error)
1103 }
1104 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1105 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1106 stream.done.set(true);
1107 *self = VmIter::Exhausted;
1108 Ok(None)
1109 }
1110 },
1111 Err(_) => Ok(None),
1112 };
1113 result
1114 }
1115 VmIter::Chan { handle } => {
1116 let rx = handle.receiver.clone();
1117 let result = match rx.try_lock() {
1118 Ok(mut guard) => match guard.try_recv() {
1119 Ok(v) => Ok(Some(v)),
1120 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
1121 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1122 *self = VmIter::Exhausted;
1123 Ok(None)
1124 }
1125 },
1126 Err(_) => Ok(None),
1127 };
1128 result
1129 }
1130 }
1131 }
1132}
1133
1134pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
1137 let inner = match v {
1138 VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
1139 VmValue::Range(r) => {
1140 let stop = if r.inclusive {
1141 r.end.saturating_add(1)
1142 } else {
1143 r.end
1144 };
1145 VmIter::Range {
1146 next: r.start,
1147 stop,
1148 }
1149 }
1150 VmValue::List(items) => VmIter::Vec { items, idx: 0 },
1151 VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
1152 VmValue::Dict(entries) => {
1153 let keys: Vec<String> = entries.keys().cloned().collect();
1154 VmIter::Dict {
1155 entries,
1156 keys,
1157 idx: 0,
1158 }
1159 }
1160 VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
1161 VmValue::Generator(gen) => VmIter::Gen { gen },
1162 VmValue::Stream(stream) => VmIter::Stream { stream },
1163 VmValue::Channel(handle) => VmIter::Chan { handle },
1164 other => {
1165 return Err(VmError::TypeError(format!(
1166 "iter: value of type {} is not iterable",
1167 other.type_name()
1168 )))
1169 }
1170 };
1171 Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
1172}