1use std::cell::RefCell;
11use std::collections::{BTreeMap, VecDeque};
12use std::rc::Rc;
13
14use crate::chunk::CompiledFunction;
15use crate::value::{VmChannelHandle, VmError, VmGenerator, VmValue};
16
17#[derive(Debug)]
19pub enum VmIter {
20 Range { next: i64, stop: i64 },
24 Vec { items: Rc<Vec<VmValue>>, idx: usize },
26 Dict {
29 entries: Rc<BTreeMap<String, VmValue>>,
30 keys: Vec<String>,
31 idx: usize,
32 },
33 Chars { s: Rc<str>, byte_idx: usize },
35 Gen { gen: VmGenerator },
37 Chan { handle: VmChannelHandle },
39 Map {
41 inner: Rc<RefCell<VmIter>>,
42 f: VmValue,
43 },
44 Filter {
46 inner: Rc<RefCell<VmIter>>,
47 p: VmValue,
48 },
49 FlatMap {
51 inner: Rc<RefCell<VmIter>>,
52 f: VmValue,
53 cur: Option<Rc<RefCell<VmIter>>>,
54 },
55 Take {
57 inner: Rc<RefCell<VmIter>>,
58 remaining: usize,
59 },
60 Skip {
63 inner: Rc<RefCell<VmIter>>,
64 remaining: usize,
65 },
66 TakeWhile {
69 inner: Rc<RefCell<VmIter>>,
70 p: VmValue,
71 done: bool,
72 },
73 SkipWhile {
76 inner: Rc<RefCell<VmIter>>,
77 p: VmValue,
78 primed: bool,
79 },
80 Zip {
83 a: Rc<RefCell<VmIter>>,
84 b: Rc<RefCell<VmIter>>,
85 },
86 Enumerate { inner: Rc<RefCell<VmIter>>, i: i64 },
88 Chain {
90 a: Rc<RefCell<VmIter>>,
91 b: Rc<RefCell<VmIter>>,
92 on_a: bool,
93 },
94 Chunks {
97 inner: Rc<RefCell<VmIter>>,
98 n: usize,
99 },
100 Windows {
103 inner: Rc<RefCell<VmIter>>,
104 n: usize,
105 buf: VecDeque<VmValue>,
106 },
107 Exhausted,
109}
110
111impl VmIter {
112 pub fn next<'a>(
117 &'a mut self,
118 vm: &'a mut crate::vm::Vm,
119 functions: &'a [CompiledFunction],
120 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<VmValue>, VmError>> + 'a>>
121 {
122 Box::pin(async move { self.next_impl(vm, functions).await })
123 }
124
125 async fn next_impl(
126 &mut self,
127 vm: &mut crate::vm::Vm,
128 functions: &[CompiledFunction],
129 ) -> Result<Option<VmValue>, VmError> {
130 match self {
131 VmIter::Exhausted => Ok(None),
132 VmIter::Range { next, stop } => {
133 if *next < *stop {
134 let v = *next;
135 *next += 1;
136 Ok(Some(VmValue::Int(v)))
137 } else {
138 *self = VmIter::Exhausted;
139 Ok(None)
140 }
141 }
142 VmIter::Vec { items, idx } => {
143 if *idx < items.len() {
144 let v = items[*idx].clone();
145 *idx += 1;
146 Ok(Some(v))
147 } else {
148 *self = VmIter::Exhausted;
149 Ok(None)
150 }
151 }
152 VmIter::Dict { entries, keys, idx } => {
153 if *idx < keys.len() {
154 let k = &keys[*idx];
155 let v = entries.get(k).cloned().unwrap_or(VmValue::Nil);
156 *idx += 1;
157 Ok(Some(VmValue::Pair(Rc::new((
158 VmValue::String(Rc::from(k.as_str())),
159 v,
160 )))))
161 } else {
162 *self = VmIter::Exhausted;
163 Ok(None)
164 }
165 }
166 VmIter::Chars { s, byte_idx } => {
167 if *byte_idx >= s.len() {
168 *self = VmIter::Exhausted;
169 return Ok(None);
170 }
171 let rest = &s[*byte_idx..];
172 if let Some(c) = rest.chars().next() {
173 *byte_idx += c.len_utf8();
174 Ok(Some(VmValue::String(Rc::from(c.to_string().as_str()))))
175 } else {
176 *self = VmIter::Exhausted;
177 Ok(None)
178 }
179 }
180 VmIter::Gen { gen } => {
181 if gen.done.get() {
182 *self = VmIter::Exhausted;
183 return Ok(None);
184 }
185 let rx = gen.receiver.clone();
186 let mut guard = rx.lock().await;
187 match guard.recv().await {
188 Some(v) => Ok(Some(v)),
189 None => {
190 gen.done.set(true);
191 drop(guard);
192 *self = VmIter::Exhausted;
193 Ok(None)
194 }
195 }
196 }
197 VmIter::Map { inner, f } => {
198 let f = f.clone();
199 let item = next_handle(inner, vm, functions).await?;
200 match item {
201 None => {
202 *self = VmIter::Exhausted;
203 Ok(None)
204 }
205 Some(v) => {
206 let out = vm.call_callable_value(&f, &[v], functions).await?;
207 Ok(Some(out))
208 }
209 }
210 }
211 VmIter::Filter { inner, p } => {
212 let p = p.clone();
213 loop {
214 let item = next_handle(inner, vm, functions).await?;
215 match item {
216 None => {
217 *self = VmIter::Exhausted;
218 return Ok(None);
219 }
220 Some(v) => {
221 let keep = vm.call_callable_value(&p, &[v.clone()], functions).await?;
222 if keep.is_truthy() {
223 return Ok(Some(v));
224 }
225 }
226 }
227 }
228 }
229 VmIter::FlatMap { inner, f, cur } => {
230 let f = f.clone();
231 loop {
232 if let Some(cur_iter) = cur.clone() {
233 let item = next_handle(&cur_iter, vm, functions).await?;
234 if let Some(v) = item {
235 return Ok(Some(v));
236 }
237 *cur = None;
238 }
239 let item = next_handle(inner, vm, functions).await?;
240 match item {
241 None => {
242 *self = VmIter::Exhausted;
243 return Ok(None);
244 }
245 Some(v) => {
246 let result = vm.call_callable_value(&f, &[v], functions).await?;
247 let lifted = iter_from_value(result)?;
248 if let VmValue::Iter(h) = lifted {
249 *cur = Some(h);
250 } else {
251 return Err(VmError::TypeError(
252 "flat_map: expected iterable result".to_string(),
253 ));
254 }
255 }
256 }
257 }
258 }
259 VmIter::Take { inner, remaining } => {
260 if *remaining == 0 {
261 *self = VmIter::Exhausted;
262 return Ok(None);
263 }
264 let item = next_handle(inner, vm, functions).await?;
265 match item {
266 None => {
267 *self = VmIter::Exhausted;
268 Ok(None)
269 }
270 Some(v) => {
271 *remaining -= 1;
272 if *remaining == 0 {
273 *self = VmIter::Exhausted;
274 }
275 Ok(Some(v))
276 }
277 }
278 }
279 VmIter::Skip { inner, remaining } => {
280 while *remaining > 0 {
281 let item = next_handle(inner, vm, functions).await?;
282 match item {
283 None => {
284 *self = VmIter::Exhausted;
285 return Ok(None);
286 }
287 Some(_) => {
288 *remaining -= 1;
289 }
290 }
291 }
292 let item = next_handle(inner, vm, functions).await?;
293 match item {
294 None => {
295 *self = VmIter::Exhausted;
296 Ok(None)
297 }
298 Some(v) => Ok(Some(v)),
299 }
300 }
301 VmIter::TakeWhile { inner, p, done } => {
302 if *done {
303 return Ok(None);
304 }
305 let p = p.clone();
306 let item = next_handle(inner, vm, functions).await?;
307 match item {
308 None => {
309 *self = VmIter::Exhausted;
310 Ok(None)
311 }
312 Some(v) => {
313 let keep = vm.call_callable_value(&p, &[v.clone()], functions).await?;
314 if keep.is_truthy() {
315 Ok(Some(v))
316 } else {
317 *self = VmIter::Exhausted;
318 Ok(None)
319 }
320 }
321 }
322 }
323 VmIter::SkipWhile { inner, p, primed } => {
324 if *primed {
325 let item = next_handle(inner, vm, functions).await?;
326 return match item {
327 None => {
328 *self = VmIter::Exhausted;
329 Ok(None)
330 }
331 Some(v) => Ok(Some(v)),
332 };
333 }
334 let p = p.clone();
335 loop {
336 let item = next_handle(inner, vm, functions).await?;
337 match item {
338 None => {
339 *self = VmIter::Exhausted;
340 return Ok(None);
341 }
342 Some(v) => {
343 let drop_it =
344 vm.call_callable_value(&p, &[v.clone()], functions).await?;
345 if !drop_it.is_truthy() {
346 *primed = true;
347 return Ok(Some(v));
348 }
349 }
350 }
351 }
352 }
353 VmIter::Zip { a, b } => {
354 let ia = next_handle(a, vm, functions).await?;
355 let x = match ia {
356 None => {
357 *self = VmIter::Exhausted;
358 return Ok(None);
359 }
360 Some(v) => v,
361 };
362 let ib = next_handle(b, vm, functions).await?;
363 let y = match ib {
364 None => {
365 *self = VmIter::Exhausted;
366 return Ok(None);
367 }
368 Some(v) => v,
369 };
370 Ok(Some(VmValue::Pair(Rc::new((x, y)))))
371 }
372 VmIter::Enumerate { inner, i } => {
373 let item = next_handle(inner, vm, functions).await?;
374 match item {
375 None => {
376 *self = VmIter::Exhausted;
377 Ok(None)
378 }
379 Some(v) => {
380 let idx = *i;
381 *i += 1;
382 Ok(Some(VmValue::Pair(Rc::new((VmValue::Int(idx), v)))))
383 }
384 }
385 }
386 VmIter::Chain { a, b, on_a } => {
387 if *on_a {
388 let item = next_handle(a, vm, functions).await?;
389 if let Some(v) = item {
390 return Ok(Some(v));
391 }
392 *on_a = false;
393 }
394 let item = next_handle(b, vm, functions).await?;
395 match item {
396 None => {
397 *self = VmIter::Exhausted;
398 Ok(None)
399 }
400 Some(v) => Ok(Some(v)),
401 }
402 }
403 VmIter::Chunks { inner, n } => {
404 let n = *n;
405 let mut batch: Vec<VmValue> = Vec::with_capacity(n);
406 for _ in 0..n {
407 let item = next_handle(inner, vm, functions).await?;
408 match item {
409 Some(v) => batch.push(v),
410 None => break,
411 }
412 }
413 if batch.is_empty() {
414 *self = VmIter::Exhausted;
415 Ok(None)
416 } else {
417 Ok(Some(VmValue::List(Rc::new(batch))))
418 }
419 }
420 VmIter::Windows { inner, n, buf } => {
421 let n = *n;
422 if buf.is_empty() {
423 while buf.len() < n {
425 let item = next_handle(inner, vm, functions).await?;
426 match item {
427 Some(v) => buf.push_back(v),
428 None => {
429 *self = VmIter::Exhausted;
430 return Ok(None);
431 }
432 }
433 }
434 } else {
435 let item = next_handle(inner, vm, functions).await?;
437 match item {
438 Some(v) => {
439 buf.pop_front();
440 buf.push_back(v);
441 }
442 None => {
443 *self = VmIter::Exhausted;
444 return Ok(None);
445 }
446 }
447 }
448 let snapshot: Vec<VmValue> = buf.iter().cloned().collect();
449 Ok(Some(VmValue::List(Rc::new(snapshot))))
450 }
451 VmIter::Chan { handle } => {
452 let is_closed = handle.closed.load(std::sync::atomic::Ordering::Relaxed);
453 let rx = handle.receiver.clone();
454 let mut guard = rx.lock().await;
455 let item = if is_closed {
456 guard.try_recv().ok()
457 } else {
458 guard.recv().await
459 };
460 match item {
461 Some(v) => Ok(Some(v)),
462 None => {
463 drop(guard);
464 *self = VmIter::Exhausted;
465 Ok(None)
466 }
467 }
468 }
469 }
470 }
471}
472
473pub async fn next_handle(
482 handle: &Rc<RefCell<VmIter>>,
483 vm: &mut crate::vm::Vm,
484 functions: &[CompiledFunction],
485) -> Result<Option<VmValue>, VmError> {
486 let mut state = std::mem::replace(&mut *handle.borrow_mut(), VmIter::Exhausted);
487 let result = state.next(vm, functions).await;
488 *handle.borrow_mut() = state;
491 result
492}
493
494pub async fn drain(
496 handle: &Rc<RefCell<VmIter>>,
497 vm: &mut crate::vm::Vm,
498 functions: &[CompiledFunction],
499) -> Result<Vec<VmValue>, VmError> {
500 let mut out = Vec::new();
501 loop {
502 let v = next_handle(handle, vm, functions).await?;
503 match v {
504 Some(v) => out.push(v),
505 None => break,
506 }
507 }
508 Ok(out)
509}
510
511pub fn iter_from_value(v: VmValue) -> Result<VmValue, VmError> {
514 let inner = match v {
515 VmValue::Iter(h) => return Ok(VmValue::Iter(h)),
516 VmValue::Range(r) => {
517 let stop = if r.inclusive {
518 r.end.saturating_add(1)
519 } else {
520 r.end
521 };
522 VmIter::Range {
523 next: r.start,
524 stop,
525 }
526 }
527 VmValue::List(items) => VmIter::Vec { items, idx: 0 },
528 VmValue::Set(items) => VmIter::Vec { items, idx: 0 },
529 VmValue::Dict(entries) => {
530 let keys: Vec<String> = entries.keys().cloned().collect();
531 VmIter::Dict {
532 entries,
533 keys,
534 idx: 0,
535 }
536 }
537 VmValue::String(s) => VmIter::Chars { s, byte_idx: 0 },
538 VmValue::Generator(gen) => VmIter::Gen { gen },
539 VmValue::Channel(handle) => VmIter::Chan { handle },
540 other => {
541 return Err(VmError::TypeError(format!(
542 "iter: value of type {} is not iterable",
543 other.type_name()
544 )))
545 }
546 };
547 Ok(VmValue::Iter(Rc::new(RefCell::new(inner))))
548}