1use std::cell::RefCell;
11use std::collections::{BTreeMap, VecDeque};
12use std::rc::Rc;
13
14use crate::value::{VmChannelHandle, VmError, VmGenerator, VmStream, VmValue};
15
/// State of a lazy VM iterator.
///
/// Variants are either *sources* (`Range`, `Vec`, `Dict`, `Chars`, `Gen`,
/// `Stream`, `Chan`) that produce values themselves, or *adapters* that pull
/// from one or two upstream iterators held behind shared
/// `Rc<RefCell<VmIter>>` handles. Stepping is done by `VmIter::next`; most
/// variants replace themselves with `Exhausted` once they have nothing more
/// to yield.
#[derive(Debug)]
pub enum VmIter {
    /// Yields `VmValue::Int(next)` while `next < stop`, incrementing `next`
    /// after each item (half-open interval).
    Range { next: i64, stop: i64 },
    /// Walks a shared vector by index; `idx` is the position of the next item.
    Vec { items: Rc<Vec<VmValue>>, idx: usize },
    /// Walks a dictionary through the `keys` list, yielding a
    /// `VmValue::Pair` of (key string, value) per step. A key that is not
    /// present in `entries` yields `VmValue::Nil` as its value.
    Dict {
        entries: Rc<BTreeMap<String, VmValue>>,
        keys: Vec<String>,
        idx: usize,
    },
    /// Yields each `char` of `s` as a one-character string. `byte_idx` is a
    /// byte offset into `s`, advanced by the UTF-8 length of each character.
    Chars { s: Rc<str>, byte_idx: usize },
    /// Receives items from a generator's receiver until it closes or
    /// delivers an error.
    Gen { gen: VmGenerator },
    /// Receives items from a stream's receiver until it closes or delivers
    /// an error.
    Stream { stream: VmStream },
    /// Receives items from a channel handle's receiver.
    Chan { handle: VmChannelHandle },
    /// Yields the result of calling `f` on each upstream item.
    Map {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
    },
    /// Yields only the upstream items for which calling `p` returns a truthy
    /// value.
    Filter {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
    },
    /// Calls `f` on each upstream item, turns the result into an iterator and
    /// yields that iterator's items. `cur` is the sub-iterator currently
    /// being drained, if any.
    FlatMap {
        inner: Rc<RefCell<VmIter>>,
        f: VmValue,
        cur: Option<Rc<RefCell<VmIter>>>,
    },
    /// Yields at most `remaining` more upstream items.
    Take {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Discards `remaining` upstream items (counted down as they are
    /// dropped), then passes the rest through.
    Skip {
        inner: Rc<RefCell<VmIter>>,
        remaining: usize,
    },
    /// Yields upstream items until `p` returns a falsy value. When `done` is
    /// set, stepping returns `None` without touching `inner`.
    TakeWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        done: bool,
    },
    /// Discards upstream items while `p` returns a truthy value. `primed` is
    /// set once the first kept item has been yielded; after that, items pass
    /// through without calling `p`.
    SkipWhile {
        inner: Rc<RefCell<VmIter>>,
        p: VmValue,
        primed: bool,
    },
    /// Yields `VmValue::Pair`s of one item from `a` and one from `b`; ends as
    /// soon as either side ends (`a` is pulled first).
    Zip {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
    },
    /// Yields `VmValue::Pair`s of (`VmValue::Int(i)`, item); `i` is
    /// incremented after each item.
    Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
    /// Yields every item of `a`, then every item of `b`. `on_a` is true while
    /// `a` is still being drained.
    Chain {
        a: Rc<RefCell<VmIter>>,
        b: Rc<RefCell<VmIter>>,
        on_a: bool,
    },
    /// Yields lists of up to `n` consecutive upstream items; the final list
    /// may be shorter.
    Chunks {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
    },
    /// Yields lists of `n` consecutive upstream items, sliding forward by one
    /// item per step. `buf` holds the current window.
    Windows {
        inner: Rc<RefCell<VmIter>>,
        n: usize,
        buf: VecDeque<VmValue>,
    },
    /// Terminal state: stepping always returns `None`. Also left in a shared
    /// handle as a placeholder while `next_handle` has the real state
    /// checked out.
    Exhausted,
}
111
112impl VmIter {
113 pub fn next<'a>(
118 &'a mut self,
119 vm: &'a mut crate::vm::Vm,
120 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
121 {
122 Box::pin(async move { self.next_impl(vm).await })
123 }
124
125 async fn next_impl(&mut self, vm: &mut crate::vm::Vm) -> Result<Option<VmValue>, VmError> {
126 match self {
127 VmIter::Exhausted => Ok(None),
128 VmIter::Range { next, stop } => {
129 if *next < *stop {
130 let v = *next;
131 *next += 1;
132 Ok(Some(VmValue::Int(v)))
133 } else {
134 *self = VmIter::Exhausted;
135 Ok(None)
136 }
137 }
138 VmIter::Vec { items, idx } => {
139 if *idx < items.len() {
140 let v = items[*idx].clone();
141 *idx += 1;
142 Ok(Some(v))
143 } else {
144 *self = VmIter::Exhausted;
145 Ok(None)
146 }
147 }
148 VmIter::Dict { entries, keys, idx } => {
149 if *idx < keys.len() {
150 let k = &keys[*idx];
151 let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
152 *idx += 1;
153 Ok(Some(VmValue::Pair(Rc::new((
154 VmValue::String(Rc::from(k.as_str())),
155 v,
156 )))))
157 } else {
158 *self = VmIter::Exhausted;
159 Ok(None)
160 }
161 }
162 VmIter::Chars { s, byte_idx } => {
163 if *byte_idx >= s.len() {
164 *self = VmIter::Exhausted;
165 return Ok(None);
166 }
167 let rest = &s[*byte_idx..];
168 if let Some(c) = rest.chars().next() {
169 *byte_idx += c.len_utf8();
170 Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
171 } else {
172 *self = VmIter::Exhausted;
173 Ok(None)
174 }
175 }
176 VmIter::Gen { gen } => {
177 if gen.done.get() {
178 *self = VmIter::Exhausted;
179 return Ok(None);
180 }
181 let rx = gen.receiver.clone();
182 let mut guard = rx.lock().await;
183 match guard.recv().await {
184 Some(Ok(v)) => Ok(Some(v)),
185 Some(Err(error)) => {
186 gen.done.set(true);
187 drop(guard);
188 *self = VmIter::Exhausted;
189 Err(error)
190 }
191 None => {
192 gen.done.set(true);
193 drop(guard);
194 *self = VmIter::Exhausted;
195 Ok(None)
196 }
197 }
198 }
199 VmIter::Stream { stream } => {
200 if stream.done.get() {
201 *self = VmIter::Exhausted;
202 return Ok(None);
203 }
204 let rx = stream.receiver.clone();
205 let mut guard = rx.lock().await;
206 match guard.recv().await {
207 Some(Ok(v)) => Ok(Some(v)),
208 Some(Err(error)) => {
209 stream.done.set(true);
210 drop(guard);
211 *self = VmIter::Exhausted;
212 Err(error)
213 }
214 None => {
215 stream.done.set(true);
216 drop(guard);
217 *self = VmIter::Exhausted;
218 Ok(None)
219 }
220 }
221 }
222 VmIter::Map { inner, f } => {
223 let f = f.clone();
224 let item = next_handle(inner, vm).await?;
225 match item {
226 None => {
227 *self = VmIter::Exhausted;
228 Ok(None)
229 }
230 Some(v) => {
231 let out = vm.call_callable_value(&f, &[v]).await?;
232 Ok(Some(out))
233 }
234 }
235 }
236 VmIter::Filter { inner, p } => {
237 let p = p.clone();
238 loop {
239 let item = next_handle(inner, vm).await?;
240 match item {
241 None => {
242 *self = VmIter::Exhausted;
243 return Ok(None);
244 }
245 Some(v) => {
246 let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
247 if keep.is_truthy() {
248 return Ok(Some(v));
249 }
250 }
251 }
252 }
253 }
254 VmIter::FlatMap { inner, f, cur } => {
255 let f = f.clone();
256 loop {
257 if let Some(cur_iter) = cur.clone() {
258 let item = next_handle(&cur_iter, vm).await?;
259 if let Some(v) = item {
260 return Ok(Some(v));
261 }
262 *cur = None;
263 }
264 let item = next_handle(inner, vm).await?;
265 match item {
266 None => {
267 *self = VmIter::Exhausted;
268 return Ok(None);
269 }
270 Some(v) => {
271 let result = vm.call_callable_value(&f, &[v]).await?;
272 let lifted = iter_from_value(result)?;
273 if let VmValue::Iter(h) = lifted {
274 *cur = Some(h);
275 } else {
276 return Err(VmError::TypeError(
277 "flat_map: expected iterable result".to_string(),
278 ));
279 }
280 }
281 }
282 }
283 }
284 VmIter::Take { inner, remaining } => {
285 if *remaining == 0 {
286 *self = VmIter::Exhausted;
287 return Ok(None);
288 }
289 let item = next_handle(inner, vm).await?;
290 match item {
291 None => {
292 *self = VmIter::Exhausted;
293 Ok(None)
294 }
295 Some(v) => {
296 *remaining -= 1;
297 if *remaining == 0 {
298 *self = VmIter::Exhausted;
299 }
300 Ok(Some(v))
301 }
302 }
303 }
304 VmIter::Skip { inner, remaining } => {
305 while *remaining > 0 {
306 let item = next_handle(inner, vm).await?;
307 match item {
308 None => {
309 *self = VmIter::Exhausted;
310 return Ok(None);
311 }
312 Some(_) => {
313 *remaining -= 1;
314 }
315 }
316 }
317 let item = next_handle(inner, vm).await?;
318 match item {
319 None => {
320 *self = VmIter::Exhausted;
321 Ok(None)
322 }
323 Some(v) => Ok(Some(v)),
324 }
325 }
326 VmIter::TakeWhile { inner, p, done } => {
327 if *done {
328 return Ok(None);
329 }
330 let p = p.clone();
331 let item = next_handle(inner, vm).await?;
332 match item {
333 None => {
334 *self = VmIter::Exhausted;
335 Ok(None)
336 }
337 Some(v) => {
338 let keep = vm.call_callable_value(&p, &[v.clone()]).await?;
339 if keep.is_truthy() {
340 Ok(Some(v))
341 } else {
342 *self = VmIter::Exhausted;
343 Ok(None)
344 }
345 }
346 }
347 }
348 VmIter::SkipWhile { inner, p, primed } => {
349 if *primed {
350 let item = next_handle(inner, vm).await?;
351 return match item {
352 None => {
353 *self = VmIter::Exhausted;
354 Ok(None)
355 }
356 Some(v) => Ok(Some(v)),
357 };
358 }
359 let p = p.clone();
360 loop {
361 let item = next_handle(inner, vm).await?;
362 match item {
363 None => {
364 *self = VmIter::Exhausted;
365 return Ok(None);
366 }
367 Some(v) => {
368 let drop_it = vm.call_callable_value(&p, &[v.clone()]).await?;
369 if !drop_it.is_truthy() {
370 *primed = true;
371 return Ok(Some(v));
372 }
373 }
374 }
375 }
376 }
377 VmIter::Zip { a, b } => {
378 let ia = next_handle(a, vm).await?;
379 let x = match ia {
380 None => {
381 *self = VmIter::Exhausted;
382 return Ok(None);
383 }
384 Some(v) => v,
385 };
386 let ib = next_handle(b, vm).await?;
387 let y = match ib {
388 None => {
389 *self = VmIter::Exhausted;
390 return Ok(None);
391 }
392 Some(v) => v,
393 };
394 Ok(Some(VmValue::Pair(Rc::new((x, y)))))
395 }
396 VmIter::Enumerate { inner, i } => {
397 let item = next_handle(inner, vm).await?;
398 match item {
399 None => {
400 *self = VmIter::Exhausted;
401 Ok(None)
402 }
403 Some(v) => {
404 let idx = *i;
405 *i += 1;
406 Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
407 }
408 }
409 }
410 VmIter::Chain { a, b, on_a } => {
411 if *on_a {
412 let item = next_handle(a, vm).await?;
413 if let Some(v) = item {
414 return Ok(Some(v));
415 }
416 *on_a = false;
417 }
418 let item = next_handle(b, vm).await?;
419 match item {
420 None => {
421 *self = VmIter::Exhausted;
422 Ok(None)
423 }
424 Some(v) => Ok(Some(v)),
425 }
426 }
427 VmIter::Chunks { inner, n } => {
428 let n = *n;
429 let mut batch: Vec<VmValue> = Vec::with_capacity(n);
430 for _ in 0..n {
431 let item = next_handle(inner, vm).await?;
432 match item {
433 Some(v) => batch.push(v),
434 None => break,
435 }
436 }
437 if batch.is_empty() {
438 *self = VmIter::Exhausted;
439 Ok(None)
440 } else {
441 Ok(Some(VmValue::List(Rc::new(batch))))
442 }
443 }
444 VmIter::Windows { inner, n, buf } => {
445 let n = *n;
446 if buf.is_empty() {
447 while buf.len() < n {
448 let item = next_handle(inner, vm).await?;
449 match item {
450 Some(v) => buf.push_back(v),
451 None => {
452 *self = VmIter::Exhausted;
453 return Ok(None);
454 }
455 }
456 }
457 } else {
458 let item = next_handle(inner, vm).await?;
459 match item {
460 Some(v) => {
461 buf.pop_front();
462 buf.push_back(v);
463 }
464 None => {
465 *self = VmIter::Exhausted;
466 return Ok(None);
467 }
468 }
469 }
470 let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
471 Ok(Some(VmValue::List(Rc::new(snapshot))))
472 }
473 VmIter::Chan { handle } => {
474 let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
475 let rx = handle.receiver.clone();
476 let mut guard = rx.lock().await;
477 let item = if is_closed {
478 guard.try_recv().ok()
479 } else {
480 guard.recv().await
481 };
482 match item {
483 Some(v) => Ok(Some(v)),
484 None => {
485 drop(guard);
486 *self = VmIter::Exhausted;
487 Ok(None)
488 }
489 }
490 }
491 }
492 }
493}
494
495pub async fn next_handle(
504 handle: &Rc<RefCell<VmIter>>,
505 vm: &mut crate::vm::Vm,
506) -> Result<Option<VmValue>, VmError> {
507 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
508 let result = state.next(vm).await;
509 *handle.borrow_mut() = state;
511 result
512}
513
514pub async fn drain(
516 handle: &Rc<RefCell<VmIter>>,
517 vm: &mut crate::vm::Vm,
518) -> Result<Vec<VmValue>, VmError> {
519 let mut out = Vec::new();
520 loop {
521 let v = next_handle(handle, vm).await?;
522 match v {
523 Some(v) => out.push(v),
524 None => break,
525 }
526 }
527 Ok(out)
528}
529
530pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
533 let inner = match v {
534 VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
535 VmValue::Range(r) => {
536 let stop = if r.inclusive {
537 r.end.saturating_add(1)
538 } else {
539 r.end
540 };
541 VmIter::Range {
542 next: r.start,
543 stop,
544 }
545 }
546 VmValue::List(items) => VmIter::Vec { items, idx: 0 },
547 VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
548 VmValue::Dict(entries) => {
549 let keys: Vec<String> = entries.keys().cloned().collect();
550 VmIter::Dict {
551 entries,
552 keys,
553 idx: 0,
554 }
555 }
556 VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
557 VmValue::Generator(gen) => VmIter::Gen { gen },
558 VmValue::Stream(stream) => VmIter::Stream { stream },
559 VmValue::Channel(handle) => VmIter::Chan { handle },
560 other => {
561 return Err(VmError::TypeError(format!(
562 "iter: value of type {} is not iterable",
563 other.type_name()
564 )))
565 }
566 };
567 Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
568}