1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
use crate::{
    FromValue, GeneratorState, InstallWith, Mut, Named, RawMut, RawRef, RawStr, Ref, Shared,
    UnsafeFromValue, Value, Vm, VmError, VmErrorKind, VmExecution,
};
use std::fmt;
use std::mem;

/// A stream with a stored virtual machine.
///
/// The stream wraps a [`VmExecution`] that is driven forward one step at a
/// time through `next` or `resume`.
pub struct Stream {
    /// The execution being driven. Reset to `None` as soon as a resume
    /// reports a completed state, after which further resumes fail.
    execution: Option<VmExecution>,
    /// `true` until the first resume. Only resumes after the first one push
    /// their argument onto the virtual machine's stack.
    first: bool,
}

impl Stream {
    /// Construct a stream from a virtual machine.
    ///
    /// The stream starts out in its "not yet resumed" state, so the value
    /// passed to the very first resume is not pushed onto the stack.
    pub(crate) fn new(vm: Vm) -> Self {
        Self {
            execution: Some(VmExecution::new(vm)),
            first: true,
        }
    }

    /// Get the next value produced by this stream.
    ///
    /// This resumes the stream with [`Value::Unit`] and returns
    /// `Ok(Some(value))` when the stream yields, or `Ok(None)` when it
    /// completes (the completion value is discarded).
    ///
    /// # Errors
    ///
    /// Propagates every error raised by [`Stream::resume`], which includes
    /// resuming a stream that has already completed.
    pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
        Ok(match self.resume(Value::Unit).await? {
            GeneratorState::Yielded(value) => Some(value),
            GeneratorState::Complete(_) => None,
        })
    }

    /// Resume the stream with `value` and run it until it yields or completes.
    ///
    /// On the first resume `value` is dropped without being used; on every
    /// later resume it is pushed onto the virtual machine's stack before the
    /// execution is continued.
    ///
    /// Once the returned state reports completion, the stored execution is
    /// dropped and the stream can no longer be resumed.
    ///
    /// # Errors
    ///
    /// Fails with [`VmErrorKind::GeneratorComplete`] if the stream has already
    /// completed, and propagates any error produced while accessing the
    /// virtual machine or while resuming the execution.
    pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
        // The error value is a plain variant, so there is nothing to defer
        // behind a closure here.
        let execution = self
            .execution
            .as_mut()
            .ok_or(VmErrorKind::GeneratorComplete)?;

        // `mem::take` reads the flag and resets it to `false` in one step, so
        // only the first resume skips pushing `value`.
        if !mem::take(&mut self.first) {
            execution.vm_mut()?.stack_mut().push(value);
        }

        let state = execution.async_resume().await?;

        if state.is_complete() {
            self.execution = None;
        }

        Ok(state)
    }
}

impl fmt::Debug for Stream {
    /// Formats the stream as `Stream { completed: <bool> }`, where the flag
    /// is `true` once the stored execution has been dropped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let is_done = self.execution.is_none();
        let mut out = f.debug_struct("Stream");
        out.field("completed", &is_done).finish()
    }
}

impl FromValue for Shared<Stream> {
    /// Converts `value` into a shared stream handle, failing if
    /// `Value::into_stream` rejects the value.
    fn from_value(value: Value) -> Result<Self, VmError> {
        let shared = value.into_stream()?;
        Ok(shared)
    }
}

impl FromValue for Stream {
    /// Converts `value` into an owned stream by taking it out of its shared
    /// container. Fails if the value cannot be converted into a stream or if
    /// the stream cannot be taken.
    fn from_value(value: Value) -> Result<Self, VmError> {
        let owned = value.into_stream()?.take()?;
        Ok(owned)
    }
}

impl UnsafeFromValue for &Stream {
    type Output = *const Stream;
    type Guard = RawRef;

    /// Converts `value` into a raw shared pointer to the stream, paired with
    /// the guard returned by `Ref::into_raw`.
    fn from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
        let shared = value.into_stream()?;
        let borrowed = shared.into_ref()?;
        Ok(Ref::into_raw(borrowed))
    }

    /// Turns the raw pointer back into a shared reference.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `output` is valid to dereference for
    /// the lifetime of the returned reference — presumably by keeping the
    /// guard returned from `from_value` alive; confirm against the trait's
    /// contract.
    unsafe fn unsafe_coerce(output: Self::Output) -> Self {
        &*output
    }
}

impl UnsafeFromValue for &mut Stream {
    type Output = *mut Stream;
    type Guard = RawMut;

    /// Converts `value` into a raw exclusive pointer to the stream, paired
    /// with the guard returned by `Mut::into_raw`.
    fn from_value(value: Value) -> Result<(Self::Output, Self::Guard), VmError> {
        let shared = value.into_stream()?;
        let exclusive = shared.into_mut()?;
        Ok(Mut::into_raw(exclusive))
    }

    /// Turns the raw pointer back into an exclusive reference.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `output` is valid to dereference and
    /// not aliased for the lifetime of the returned reference — presumably by
    /// keeping the guard returned from `from_value` alive; confirm against
    /// the trait's contract.
    unsafe fn unsafe_coerce(output: Self::Output) -> Self {
        &mut *output
    }
}

/// Gives the stream type the base name `"Stream"`.
impl Named for Stream {
    const BASE_NAME: RawStr = RawStr::from_str("Stream");
}

// Empty impl: `Stream` relies entirely on the defaults provided by `InstallWith`.
impl InstallWith for Stream {}