runestick/
stream.rs

1use crate::{
2    FromValue, GeneratorState, InstallWith, Mut, Named, RawMut, RawRef, RawStr, Ref, Shared,
3    UnsafeFromValue, Value, Vm, VmError, VmErrorKind, VmExecution,
4};
5use std::fmt;
6use std::mem;
7
/// A stream with a stored virtual machine.
pub struct Stream {
    /// The execution driving this stream. Set to `None` once a resume
    /// reports that the stream has completed.
    execution: Option<VmExecution>,
    /// `true` until the stream has been resumed for the first time. While
    /// `true`, the value passed to `resume` is not pushed onto the stack.
    first: bool,
}
13
14impl Stream {
15    /// Construct a stream from a virtual machine.
16    pub(crate) fn new(vm: Vm) -> Self {
17        Self {
18            execution: Some(VmExecution::new(vm)),
19            first: true,
20        }
21    }
22
23    /// Get the next value produced by this stream.
24    pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
25        Ok(match self.resume(Value::Unit).await? {
26            GeneratorState::Yielded(value) => Some(value),
27            GeneratorState::Complete(_) => None,
28        })
29    }
30
31    /// Get the next value produced by this stream.
32    pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
33        let execution = self
34            .execution
35            .as_mut()
36            .ok_or_else(|| VmErrorKind::GeneratorComplete)?;
37
38        if !mem::take(&mut self.first) {
39            execution.vm_mut()?.stack_mut().push(value);
40        }
41
42        let state = execution.async_resume().await?;
43
44        if state.is_complete() {
45            self.execution = None;
46        }
47
48        Ok(state)
49    }
50}
51
52impl fmt::Debug for Stream {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("Stream")
55            .field("completed", &self.execution.is_none())
56            .finish()
57    }
58}
59
60impl FromValue for Shared<Stream> {
61    fn from_value(value: Value) -> Result<Self, VmError> {
62        Ok(value.into_stream()?)
63    }
64}
65
66impl FromValue for Stream {
67    fn from_value(value: Value) -> Result<Self, VmError> {
68        let stream = value.into_stream()?;
69        Ok(stream.take()?)
70    }
71}
72
73impl UnsafeFromValue for &Stream {
74    type Output = *const Stream;
75    type Guard = RawRef;
76
77    fn from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
78        let stream = value.into_stream()?;
79        let (stream, guard) = Ref::into_raw(stream.into_ref()?);
80        Ok((stream, guard))
81    }
82
83    unsafe fn unsafe_coerce(output: Self::Output) -> Self {
84        &*output
85    }
86}
87
88impl UnsafeFromValue for &mut Stream {
89    type Output = *mut Stream;
90    type Guard = RawMut;
91
92    fn from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
93        let stream = value.into_stream()?;
94        Ok(Mut::into_raw(stream.into_mut()?))
95    }
96
97    unsafe fn unsafe_coerce(output: Self::Output) -> Self {
98        &mut *output
99    }
100}
101
/// Gives `Stream` the base name `"Stream"`.
impl Named for Stream {
    const BASE_NAME: RawStr = RawStr::from_str("Stream");
}
105
// Empty impl: nothing is overridden here, so `Stream` relies entirely on
// whatever defaults the `InstallWith` trait provides.
impl InstallWith for Stream {}