use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;
use crate::value::{VmError, VmStream, VmTaskHandle, VmValue};
fn parallel_cap_from_value(cap_val: &VmValue, task_count: usize) -> Result<Option<usize>, VmError> {
match cap_val {
VmValue::Int(n) => {
if *n <= 0 {
Ok(None)
} else {
Ok(Some((*n as usize).min(task_count.max(1))))
}
}
VmValue::Nil => Ok(None),
other => Err(VmError::TypeError(format!(
"parallel max_concurrent must be an int; got {}",
other.type_name()
))),
}
}
async fn run_capped_ordered<F, T>(
futures: Vec<F>,
cap: Option<usize>,
error_label: &'static str,
) -> Result<Vec<T>, VmError>
where
F: std::future::Future<Output = T> + 'static,
T: 'static,
{
let total = futures.len();
if total == 0 {
return Ok(Vec::new());
}
let mut results: Vec<Option<T>> = (0..total).map(|_| None).collect();
let slot = cap.unwrap_or(total).max(1).min(total);
let mut pending: VecDeque<(usize, F)> = futures.into_iter().enumerate().collect();
let mut join_set: tokio::task::JoinSet<(usize, T)> = tokio::task::JoinSet::new();
while join_set.len() < slot {
let Some((i, fut)) = pending.pop_front() else {
break;
};
join_set.spawn_local(async move { (i, fut.await) });
}
while let Some(joined) = join_set.join_next().await {
let (index, value) = joined.map_err(|e| VmError::Runtime(format!("{error_label}: {e}")))?;
results[index] = Some(value);
if let Some((i, fut)) = pending.pop_front() {
join_set.spawn_local(async move { (i, fut.await) });
}
}
Ok(results
.into_iter()
.map(|slot| slot.expect("run_capped_ordered: missing result slot"))
.collect())
}
async fn stream_capped_unordered<F, T>(
futures: Vec<F>,
cap: Option<usize>,
sender: tokio::sync::mpsc::Sender<Result<T, VmError>>,
error_label: &'static str,
) where
F: std::future::Future<Output = Result<T, VmError>> + 'static,
T: 'static,
{
let total = futures.len();
if total == 0 {
return;
}
let slot = cap.unwrap_or(total).max(1).min(total);
let mut pending: VecDeque<F> = futures.into_iter().collect();
let mut join_set: tokio::task::JoinSet<Result<T, VmError>> = tokio::task::JoinSet::new();
while join_set.len() < slot {
let Some(fut) = pending.pop_front() else {
break;
};
join_set.spawn_local(fut);
}
while let Some(joined) = join_set.join_next().await {
let value = match joined {
Ok(Ok(value)) => Ok(value),
Ok(Err(error)) => Err(error),
Err(error) => Err(VmError::Runtime(format!("{error_label}: {error}"))),
};
let should_stop = value.is_err();
if sender.send(value).await.is_err() || should_stop {
return;
}
if let Some(fut) = pending.pop_front() {
join_set.spawn_local(fut);
}
}
}
impl super::super::Vm {
pub(super) async fn execute_parallel(&mut self) -> Result<(), VmError> {
let _par_span =
super::super::ScopeSpan::new(crate::tracing::SpanKind::Parallel, "parallel".into());
let closure = self.pop()?;
let count_val = self.pop()?;
let cap_val = self.pop()?;
let count = match &count_val {
VmValue::Int(n) => (*n).max(0) as usize,
_ => 0,
};
let cap = parallel_cap_from_value(&cap_val, count)?;
if let VmValue::Closure(closure) = closure {
self.runtime_context_counter += 1;
let task_group_id = format!(
"{}:parallel:{}",
self.runtime_context.task_id, self.runtime_context_counter
);
let mut futures: Vec<_> = Vec::with_capacity(count);
for i in 0..count {
let mut child = self.child_vm();
child.runtime_context = self.runtime_context.child_task(
format!("{task_group_id}:{i}"),
"parallel",
Some(task_group_id.clone()),
);
let closure = closure.clone();
futures.push(async move {
let result = child
.call_closure(&closure, &[VmValue::Int(i as i64)])
.await?;
Ok::<(VmValue, String), VmError>((result, std::mem::take(&mut child.output)))
});
}
let joined = run_capped_ordered(futures, cap, "Parallel task error").await?;
let mut results = Vec::with_capacity(count);
for entry in joined {
let (val, task_output) = entry?;
self.output.push_str(&task_output);
results.push(val);
}
self.stack.push(VmValue::List(Rc::new(results)));
} else {
self.stack.push(VmValue::Nil);
}
Ok(())
}
pub(super) async fn execute_parallel_map(&mut self) -> Result<(), VmError> {
let closure = self.pop()?;
let list_val = self.pop()?;
let cap_val = self.pop()?;
match (&list_val, &closure) {
(VmValue::List(items), VmValue::Closure(closure)) => {
let len = items.len();
let cap = parallel_cap_from_value(&cap_val, len)?;
self.runtime_context_counter += 1;
let task_group_id = format!(
"{}:parallel_each:{}",
self.runtime_context.task_id, self.runtime_context_counter
);
let mut futures = Vec::with_capacity(len);
for (i, item) in items.iter().enumerate() {
let mut child = self.child_vm();
child.runtime_context = self.runtime_context.child_task(
format!("{task_group_id}:{i}"),
"parallel each",
Some(task_group_id.clone()),
);
let closure = closure.clone();
let item = item.clone();
futures.push(async move {
let result = child.call_closure(&closure, &[item]).await?;
Ok::<(VmValue, String), VmError>((
result,
std::mem::take(&mut child.output),
))
});
}
let joined = run_capped_ordered(futures, cap, "Parallel map error").await?;
let mut results = Vec::with_capacity(len);
for entry in joined {
let (val, task_output) = entry?;
self.output.push_str(&task_output);
results.push(val);
}
self.stack.push(VmValue::List(Rc::new(results)));
}
_ => self.stack.push(VmValue::Nil),
}
Ok(())
}
pub(super) async fn execute_parallel_map_stream(&mut self) -> Result<(), VmError> {
let closure = self.pop()?;
let list_val = self.pop()?;
let cap_val = self.pop()?;
match (&list_val, &closure) {
(VmValue::List(items), VmValue::Closure(closure)) => {
let len = items.len();
let cap = parallel_cap_from_value(&cap_val, len)?;
self.runtime_context_counter += 1;
let task_group_id = format!(
"{}:parallel_each_stream:{}",
self.runtime_context.task_id, self.runtime_context_counter
);
let mut futures = Vec::with_capacity(len);
for (i, item) in items.iter().enumerate() {
let mut child = self.child_vm();
child.runtime_context = self.runtime_context.child_task(
format!("{task_group_id}:{i}"),
"parallel each as stream",
Some(task_group_id.clone()),
);
let closure = closure.clone();
let item = item.clone();
futures.push(async move { child.call_closure(&closure, &[item]).await });
}
let (tx, rx) = tokio::sync::mpsc::channel::<Result<VmValue, VmError>>(1);
tokio::task::spawn_local(stream_capped_unordered(
futures,
cap,
tx,
"Parallel map stream error",
));
self.stack.push(VmValue::Stream(VmStream {
done: Rc::new(std::cell::Cell::new(false)),
receiver: Rc::new(tokio::sync::Mutex::new(rx)),
cancel: None,
}));
}
_ => self.stack.push(VmValue::Nil),
}
Ok(())
}
pub(super) async fn execute_parallel_settle(&mut self) -> Result<(), VmError> {
let closure = self.pop()?;
let list_val = self.pop()?;
let cap_val = self.pop()?;
match (&list_val, &closure) {
(VmValue::List(items), VmValue::Closure(closure)) => {
let len = items.len();
let cap = parallel_cap_from_value(&cap_val, len)?;
self.runtime_context_counter += 1;
let task_group_id = format!(
"{}:parallel_settle:{}",
self.runtime_context.task_id, self.runtime_context_counter
);
let mut futures = Vec::with_capacity(len);
for (i, item) in items.iter().enumerate() {
let mut child = self.child_vm();
child.runtime_context = self.runtime_context.child_task(
format!("{task_group_id}:{i}"),
"parallel settle",
Some(task_group_id.clone()),
);
let closure = closure.clone();
let item = item.clone();
futures.push(async move {
let result = child.call_closure(&closure, &[item]).await;
let output = std::mem::take(&mut child.output);
(result, output)
});
}
let joined = run_capped_ordered(futures, cap, "Parallel settle error").await?;
let mut results = Vec::with_capacity(len);
let mut succeeded = 0i64;
let mut failed = 0i64;
for (result, task_output) in joined {
self.output.push_str(&task_output);
match result {
Ok(val) => {
succeeded += 1;
results.push(VmValue::enum_variant("Result", "Ok", vec![val]));
}
Err(e) => {
failed += 1;
results.push(VmValue::enum_variant(
"Result",
"Err",
vec![VmValue::String(Rc::from(e.to_string()))],
));
}
}
}
let mut dict = BTreeMap::new();
dict.insert("results".to_string(), VmValue::List(Rc::new(results)));
dict.insert("succeeded".to_string(), VmValue::Int(succeeded));
dict.insert("failed".to_string(), VmValue::Int(failed));
self.stack.push(VmValue::Dict(Rc::new(dict)));
}
_ => self.stack.push(VmValue::Nil),
}
Ok(())
}
pub(super) fn execute_spawn(&mut self) -> Result<(), VmError> {
let _spawn_span =
super::super::ScopeSpan::new(crate::tracing::SpanKind::Spawn, "spawn".into());
let closure = self.pop()?;
if let VmValue::Closure(closure) = closure {
self.task_counter += 1;
let task_id = format!("vm_task_{}", self.task_counter);
let mut child = self.child_vm();
child.runtime_context = self.runtime_context.child_task(
format!(
"{}:spawn:{}",
self.runtime_context.task_id, self.task_counter
),
"spawn",
None,
);
let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
child.cancel_token = Some(cancel_token.clone());
let handle = tokio::task::spawn_local(async move {
let result = child.call_closure(&closure, &[]).await?;
Ok((result, std::mem::take(&mut child.output)))
});
self.spawned_tasks.insert(
task_id.clone(),
VmTaskHandle {
handle,
cancel_token,
},
);
self.stack.push(VmValue::TaskHandle(task_id));
} else {
self.stack.push(VmValue::Nil);
}
Ok(())
}
pub(super) async fn execute_sync_mutex_enter(&mut self) -> Result<(), VmError> {
let key = {
let frame = self.frames.last_mut().unwrap();
let idx = frame.chunk.read_u16(frame.ip) as usize;
frame.ip += 2;
Self::const_string(&frame.chunk.constants[idx])?
};
let permit = self
.sync_runtime
.acquire("mutex", &key, 1, 1, None, self.cancel_token.clone())
.await?
.ok_or_else(|| VmError::Runtime(format!("mutex '{key}' timed out")))?;
self.held_sync_guards
.push(crate::synchronization::VmSyncHeldGuard {
_permit: permit,
frame_depth: self.frames.len(),
env_scope_depth: self.env.scope_depth(),
});
Ok(())
}
pub(super) fn execute_deadline_setup(&mut self) -> Result<(), VmError> {
let dur_val = self.pop()?;
let ms = match &dur_val {
VmValue::Duration(ms) => (*ms).max(0) as u64,
VmValue::Int(n) => (*n).max(0) as u64,
_ => 30_000,
};
let deadline = Instant::now() + std::time::Duration::from_millis(ms);
self.deadlines.push((deadline, self.frames.len()));
Ok(())
}
pub(super) fn execute_deadline_end(&mut self) {
self.deadlines.pop();
}
}