1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use crate::{
FromValue, GeneratorState, OwnedMut, OwnedRef, RawOwnedMut, RawOwnedRef, Shared,
UnsafeFromValue, Value, Vm, VmError, VmErrorKind, VmExecution,
};
use std::fmt;
use std::mem;
value_types!(crate::STREAM_TYPE, Stream => Stream, &Stream, &mut Stream);
pub struct Stream {
execution: Option<VmExecution>,
first: bool,
}
impl Stream {
pub(crate) fn new(vm: Vm) -> Self {
Self {
execution: Some(VmExecution::of(vm)),
first: true,
}
}
pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
Ok(match self.resume(Value::Unit).await? {
GeneratorState::Yielded(value) => Some(value),
GeneratorState::Complete(_) => None,
})
}
pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
let execution = match &mut self.execution {
Some(execution) => execution,
None => {
return Err(VmError::from(VmErrorKind::GeneratorComplete));
}
};
if !mem::take(&mut self.first) {
execution.vm_mut()?.stack_mut().push(value);
}
let state = execution.async_resume().await?;
if state.is_complete() {
self.execution = None;
}
Ok(state)
}
}
impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Stream")
.field("completed", &self.execution.is_none())
.finish()
}
}
impl FromValue for Shared<Stream> {
fn from_value(value: Value) -> Result<Self, VmError> {
Ok(value.into_stream()?)
}
}
impl FromValue for Stream {
fn from_value(value: Value) -> Result<Self, VmError> {
let stream = value.into_stream()?;
Ok(stream.take()?)
}
}
impl UnsafeFromValue for &Stream {
type Output = *const Stream;
type Guard = RawOwnedRef;
unsafe fn unsafe_from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
let stream = value.into_stream()?;
let (stream, guard) = OwnedRef::into_raw(stream.owned_ref()?);
Ok((stream, guard))
}
unsafe fn to_arg(output: Self::Output) -> Self {
&*output
}
}
impl UnsafeFromValue for &mut Stream {
type Output = *mut Stream;
type Guard = RawOwnedMut;
unsafe fn unsafe_from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
let stream = value.into_stream()?;
Ok(OwnedMut::into_raw(stream.owned_mut()?))
}
unsafe fn to_arg(output: Self::Output) -> Self {
&mut *output
}
}