1use crate::{
2 FromValue, GeneratorState, InstallWith, Mut, Named, RawMut, RawRef, RawStr, Ref, Shared,
3 UnsafeFromValue, Value, Vm, VmError, VmErrorKind, VmExecution,
4};
5use std::fmt;
6use std::mem;
7
8pub struct Stream {
10 execution: Option<VmExecution>,
11 first: bool,
12}
13
14impl Stream {
15 pub(crate) fn new(vm: Vm) -> Self {
17 Self {
18 execution: Some(VmExecution::new(vm)),
19 first: true,
20 }
21 }
22
23 pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
25 Ok(match self.resume(Value::Unit).await? {
26 GeneratorState::Yielded(value) => Some(value),
27 GeneratorState::Complete(_) => None,
28 })
29 }
30
31 pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
33 let execution = self
34 .execution
35 .as_mut()
36 .ok_or_else(|| VmErrorKind::GeneratorComplete)?;
37
38 if !mem::take(&mut self.first) {
39 execution.vm_mut()?.stack_mut().push(value);
40 }
41
42 let state = execution.async_resume().await?;
43
44 if state.is_complete() {
45 self.execution = None;
46 }
47
48 Ok(state)
49 }
50}
51
52impl fmt::Debug for Stream {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("Stream")
55 .field("completed", &self.execution.is_none())
56 .finish()
57 }
58}
59
60impl FromValue for Shared<Stream> {
61 fn from_value(value: Value) -> Result<Self, VmError> {
62 Ok(value.into_stream()?)
63 }
64}
65
66impl FromValue for Stream {
67 fn from_value(value: Value) -> Result<Self, VmError> {
68 let stream = value.into_stream()?;
69 Ok(stream.take()?)
70 }
71}
72
73impl UnsafeFromValue for &Stream {
74 type Output = *const Stream;
75 type Guard = RawRef;
76
77 fn from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
78 let stream = value.into_stream()?;
79 let (stream, guard) = Ref::into_raw(stream.into_ref()?);
80 Ok((stream, guard))
81 }
82
83 unsafe fn unsafe_coerce(output: Self::Output) -> Self {
84 &*output
85 }
86}
87
88impl UnsafeFromValue for &mut Stream {
89 type Output = *mut Stream;
90 type Guard = RawMut;
91
92 fn from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
93 let stream = value.into_stream()?;
94 Ok(Mut::into_raw(stream.into_mut()?))
95 }
96
97 unsafe fn unsafe_coerce(output: Self::Output) -> Self {
98 &mut *output
99 }
100}
101
102impl Named for Stream {
103 const BASE_NAME: RawStr = RawStr::from_str("Stream");
104}
105
106impl InstallWith for Stream {}