1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//! The `std::future` module.

use crate::future::SelectFuture;
use crate::{ContextError, Future, Module, Shared, Stack, Value, VmError, VmErrorKind};

/// Construct the `std::future` module.
pub fn module() -> Result<Module, ContextError> {
    let mut module = Module::new(&["std", "future"]);
    module.ty(&["Future"]).build::<Future>()?;
    module.raw_fn(&["join"], raw_join)?;
    Ok(module)
}

async fn try_join_impl<'a, I, F>(values: I, len: usize, factory: F) -> Result<Value, VmError>
where
    I: IntoIterator<Item = &'a Value>,
    F: FnOnce(Vec<Value>) -> Value,
{
    use futures::StreamExt as _;

    let mut futures = futures::stream::FuturesUnordered::new();
    let mut results = Vec::with_capacity(len);

    for (index, value) in values.into_iter().enumerate() {
        let future = match value {
            Value::Future(future) => future.clone().owned_mut()?,
            value => return Err(VmError::bad_argument::<Future>(index, value)?),
        };

        futures.push(SelectFuture::new(index, future));
        results.push(Value::Unit);
    }

    while !futures.is_empty() {
        let (index, value) = futures.next().await.unwrap()?;
        *results.get_mut(index).unwrap() = value;
    }

    Ok(factory(results))
}

async fn join(value: Value) -> Result<Value, VmError> {
    match value {
        Value::Tuple(tuple) => {
            let tuple = tuple.borrow_ref()?;
            Ok(try_join_impl(tuple.iter(), tuple.len(), Value::tuple).await?)
        }
        Value::Vec(vec) => {
            let vec = vec.borrow_ref()?;
            Ok(try_join_impl(vec.iter(), vec.len(), Value::vec).await?)
        }
        value => Err(VmError::bad_argument::<Vec<Value>>(0, &value)?),
    }
}

/// The join implementation.
fn raw_join(stack: &mut Stack, args: usize) -> Result<(), VmError> {
    if args != 1 {
        return Err(VmError::from(VmErrorKind::BadArgumentCount {
            actual: args,
            expected: 1,
        }));
    }

    let value = stack.pop()?;
    let value = Value::Future(Shared::new(Future::new(join(value))));
    stack.push(value);
    Ok(())
}