goscript-vm 0.1.0

goscript vm
Documentation
use super::instruction::*;
use super::value::*;
use futures_lite::future;
use rand::prelude::*;
use std::cell::RefCell;
use std::mem;
use std::rc::Rc;

#[derive(Clone, Debug)]
pub enum RendezvousState {
    Empty,
    Full(GosValue),
    Closed,
}

#[derive(Clone, Debug)]
pub enum Channel {
    Bounded(
        async_channel::Sender<GosValue>,
        async_channel::Receiver<GosValue>,
    ),
    Rendezvous(Rc<RefCell<RendezvousState>>),
}

impl Channel {
    pub fn new(cap: usize) -> Channel {
        if cap == 0 {
            Channel::Rendezvous(Rc::new(RefCell::new(RendezvousState::Empty)))
        } else {
            let (s, r) = async_channel::bounded(cap);
            Channel::Bounded(s, r)
        }
    }

    #[inline]
    pub fn len(&self) -> usize {
        match self {
            Channel::Bounded(s, _) => s.len(),
            Channel::Rendezvous(_) => 0,
        }
    }

    #[inline]
    pub fn cap(&self) -> usize {
        match self {
            Channel::Bounded(s, _) => s.capacity().unwrap(),
            Channel::Rendezvous(_) => 0,
        }
    }

    #[inline]
    pub fn close(&self) {
        match self {
            Channel::Bounded(s, _) => {
                s.close();
            }
            Channel::Rendezvous(state) => *state.borrow_mut() = RendezvousState::Closed,
        }
    }

    pub fn try_send(&self, v: GosValue) -> Result<(), async_channel::TrySendError<GosValue>> {
        match self {
            Channel::Bounded(s, _) => s.try_send(v),
            Channel::Rendezvous(state) => {
                let state_ref = state.borrow();
                let s: &RendezvousState = &state_ref;
                match s {
                    RendezvousState::Empty => {
                        drop(state_ref);
                        *state.borrow_mut() = RendezvousState::Full(v);
                        Ok(())
                    }
                    RendezvousState::Full(_) => Err(async_channel::TrySendError::Full(v)),
                    RendezvousState::Closed => Err(async_channel::TrySendError::Closed(v)),
                }
            }
        }
    }

    pub fn try_recv(&self) -> Result<GosValue, async_channel::TryRecvError> {
        match self {
            Channel::Bounded(_, r) => r.try_recv(),
            Channel::Rendezvous(state) => {
                let state_ref = state.borrow();
                let s: &RendezvousState = &state_ref;
                match s {
                    RendezvousState::Empty => Err(async_channel::TryRecvError::Empty),
                    RendezvousState::Full(_) => {
                        drop(state_ref);
                        let cur_state: &mut RendezvousState = &mut state.borrow_mut();
                        let full = mem::replace(cur_state, RendezvousState::Empty);
                        if let RendezvousState::Full(v) = full {
                            Ok(v)
                        } else {
                            unreachable!()
                        }
                    }
                    RendezvousState::Closed => Err(async_channel::TryRecvError::Closed),
                }
            }
        }
    }

    pub async fn send(&self, v: &GosValue) -> EmptyResult {
        loop {
            match self.try_send(v.clone()) {
                Ok(()) => return Ok(()),
                Err(e) => match e {
                    async_channel::TrySendError::Full(_) => {
                        future::yield_now().await;
                    }
                    async_channel::TrySendError::Closed(_) => {
                        return Err("channel closed!".to_string());
                    }
                },
            }
        }
    }

    pub async fn recv(&self) -> Option<GosValue> {
        loop {
            match self.try_recv() {
                Ok(v) => return Some(v),
                Err(e) => match e {
                    async_channel::TryRecvError::Empty => {
                        future::yield_now().await;
                    }
                    async_channel::TryRecvError::Closed => return None,
                },
            }
        }
    }
}

pub enum SelectComm {
    Send(GosValue, GosValue, OpIndex),
    Recv(GosValue, ValueType, OpIndex),
}

pub struct Selector {
    pub comms: Vec<SelectComm>,
    pub default_offset: Option<OpIndex>,
}

impl Selector {
    pub fn new(comms: Vec<SelectComm>, default_offset: Option<OpIndex>) -> Selector {
        Selector {
            comms: comms,
            default_offset: default_offset,
        }
    }

    pub async fn select(&self) -> RuntimeResult<(usize, Option<GosValue>)> {
        let count = self.comms.len();
        let mut rng = rand::thread_rng();
        loop {
            for (i, entry) in self
                .comms
                .iter()
                .enumerate()
                .choose_multiple(&mut rng, count)
            {
                match entry {
                    SelectComm::Send(c, val, _) => {
                        match c.as_channel().chan.try_send(val.clone()) {
                            Ok(_) => return Ok((i, None)),
                            Err(e) => match e {
                                async_channel::TrySendError::Full(_) => {}
                                async_channel::TrySendError::Closed(_) => {
                                    return Err("channel closed!".to_string());
                                }
                            },
                        }
                    }
                    SelectComm::Recv(c, _, _) => match c.as_channel().chan.try_recv() {
                        Ok(v) => return Ok((i, Some(v))),
                        Err(e) => match e {
                            async_channel::TryRecvError::Empty => {}
                            async_channel::TryRecvError::Closed => return Ok((i, None)),
                        },
                    },
                }
            }

            if let Some(_) = self.default_offset {
                return Ok((self.comms.len(), None));
            }
            future::yield_now().await;
        }
    }
}