1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use crate::future::SelectFuture;
use crate::{ContextError, Future, Module, Shared, Stack, Value, VmError, VmErrorKind};
/// Build the module registered under the path `["std", "future"]`.
///
/// The module exposes the `Future` type and a raw `join` function.
///
/// # Errors
///
/// Propagates any `ContextError` reported while registering the type or the
/// function.
pub fn module() -> Result<Module, ContextError> {
    let mut future_module = Module::new(&["std", "future"]);

    future_module.ty(&["Future"]).build::<Future>()?;
    future_module.raw_fn(&["join"], raw_join)?;

    Ok(future_module)
}
/// Await every future in `values` and collect their outputs, keeping each
/// output at the position its future had in the input.
///
/// * `values` - the values to join; each one must be a `Value::Future`.
/// * `len` - capacity reserved up front for the output vector.
/// * `factory` - turns the collected outputs into the final `Value`.
///
/// # Errors
///
/// * If an element is not a `Value::Future`, the error built by
///   `VmError::bad_argument::<Future>` for that element's index is returned
///   before anything is awaited.
/// * If `owned_mut` fails for a future, that error is returned.
/// * If any awaited future resolves to an error, that error is returned
///   immediately and the remaining outputs are discarded.
async fn try_join_impl<'a, I, F>(values: I, len: usize, factory: F) -> Result<Value, VmError>
where
    I: IntoIterator<Item = &'a Value>,
    F: FnOnce(Vec<Value>) -> Value,
{
    use futures::StreamExt as _;

    let mut pending = futures::stream::FuturesUnordered::new();
    let mut outputs = Vec::with_capacity(len);

    for (slot, candidate) in values.into_iter().enumerate() {
        let owned = match candidate {
            Value::Future(shared) => shared.clone().owned_mut()?,
            other => return Err(VmError::bad_argument::<Future>(slot, other)?),
        };

        // Reserve a placeholder so the output can later be written by index.
        outputs.push(Value::Unit);
        // Tag the future with its slot so its output can be routed back.
        pending.push(SelectFuture::new(slot, owned));
    }

    // The stream yields `None` once every queued future has completed.
    while let Some(completed) = pending.next().await {
        let (slot, output) = completed?;
        // `slot` comes from the enumeration above, so it always has a placeholder.
        outputs[slot] = output;
    }

    Ok(factory(outputs))
}
/// Join all futures contained in a tuple or a vector.
///
/// A `Value::Tuple` input is rebuilt through `Value::tuple` and a
/// `Value::Vec` input through `Value::vec`, so the output container matches
/// the input container.
///
/// # Errors
///
/// * If the container cannot be borrowed, the `borrow_ref` error is returned.
/// * Any error from `try_join_impl` is propagated.
/// * Any other kind of value yields the error built by
///   `VmError::bad_argument::<Vec<Value>>` for argument 0.
async fn join(value: Value) -> Result<Value, VmError> {
    match value {
        Value::Tuple(shared) => {
            // The borrow guard must stay alive while the elements are awaited.
            let items = shared.borrow_ref()?;
            let joined = try_join_impl(items.iter(), items.len(), Value::tuple).await?;
            Ok(joined)
        }
        Value::Vec(shared) => {
            // The borrow guard must stay alive while the elements are awaited.
            let items = shared.borrow_ref()?;
            let joined = try_join_impl(items.iter(), items.len(), Value::vec).await?;
            Ok(joined)
        }
        other => Err(VmError::bad_argument::<Vec<Value>>(0, &other)?),
    }
}
/// Raw entry point for `join`, operating directly on the stack.
///
/// Expects exactly one argument. That argument is popped, handed to `join`,
/// and the resulting future is pushed back wrapped as a `Value::Future`.
///
/// # Errors
///
/// * `VmErrorKind::BadArgumentCount` if `args` is not 1.
/// * Any error from popping the stack.
fn raw_join(stack: &mut Stack, args: usize) -> Result<(), VmError> {
    match args {
        1 => {
            let argument = stack.pop()?;
            // Calling `join` only constructs the future; it is wrapped, not awaited here.
            let pending = Future::new(join(argument));
            stack.push(Value::Future(Shared::new(pending)));
            Ok(())
        }
        actual => Err(VmError::from(VmErrorKind::BadArgumentCount {
            actual,
            expected: 1,
        })),
    }
}