harn-vm 0.7.58

Async bytecode virtual machine for the Harn programming language
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use crate::value::{VmError, VmStream, VmTaskHandle, VmValue};

/// Decode the `cap_val` stack operand pushed by `parallel ... with
/// { max_concurrent: N }`. A value of `0` (emitted when no option was
/// given), any negative integer, and `nil` all mean "unlimited";
/// returning `None` tells callers to run all tasks without a slot
/// limit. Any other value is rejected as a type error; the parser
/// should have already caught this, so hitting it implies a VM/compiler
/// drift.
fn parallel_cap_from_value(cap_val: &VmValue, task_count: usize) -> Result<Option<usize>, VmError> {
    match cap_val {
        VmValue::Int(n) => {
            if *n <= 0 {
                Ok(None)
            } else {
                Ok(Some((*n as usize).min(task_count.max(1))))
            }
        }
        VmValue::Nil => Ok(None),
        other => Err(VmError::TypeError(format!(
            "parallel max_concurrent must be an int; got {}",
            other.type_name()
        ))),
    }
}
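
// A minimal test sketch pinning down the decoding contract documented on
// `parallel_cap_from_value`; it assumes only `VmValue` variants and
// constructors already used elsewhere in this module.
#[cfg(test)]
mod cap_decoding_tests {
    use super::*;

    #[test]
    fn cap_decoding_matches_doc_comment() {
        // Positive caps are clamped to the task count.
        assert!(matches!(
            parallel_cap_from_value(&VmValue::Int(8), 3),
            Ok(Some(3))
        ));
        // Zero, negative, and nil operands all mean "unlimited".
        assert!(matches!(parallel_cap_from_value(&VmValue::Int(0), 3), Ok(None)));
        assert!(matches!(parallel_cap_from_value(&VmValue::Int(-1), 3), Ok(None)));
        assert!(matches!(parallel_cap_from_value(&VmValue::Nil, 3), Ok(None)));
        // Anything else is rejected as a type error.
        assert!(parallel_cap_from_value(
            &VmValue::String(Rc::from("not an int".to_string())),
            3
        )
        .is_err());
    }
}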

/// Run `futures` concurrently, capped to at most `cap` in-flight tasks
/// at any moment (or unlimited when `cap` is `None`). Results come back
/// in source order so callers can index by original position. A single
/// join error fails the whole batch, mirroring the pre-cap behavior of
/// the `Parallel*` opcodes; the early return drops the `JoinSet`, which
/// aborts any tasks still in flight.
async fn run_capped_ordered<F, T>(
    futures: Vec<F>,
    cap: Option<usize>,
    error_label: &'static str,
) -> Result<Vec<T>, VmError>
where
    F: std::future::Future<Output = T> + 'static,
    T: 'static,
{
    let total = futures.len();
    if total == 0 {
        return Ok(Vec::new());
    }
    // `T` need not be `Clone`, so build the placeholder slots one by one.
    let mut results: Vec<Option<T>> = (0..total).map(|_| None).collect();
    // Effective concurrency window: `cap` clamped to the range [1, total].
    let slot = cap.unwrap_or(total).max(1).min(total);
    let mut pending: VecDeque<(usize, F)> = futures.into_iter().enumerate().collect();
    let mut join_set: tokio::task::JoinSet<(usize, T)> = tokio::task::JoinSet::new();

    // Prime the window: start up to `slot` tasks immediately.
    while join_set.len() < slot {
        let Some((i, fut)) = pending.pop_front() else {
            break;
        };
        join_set.spawn_local(async move { (i, fut.await) });
    }

    // Each completion frees a slot, so backfill one task from `pending`.
    while let Some(joined) = join_set.join_next().await {
        let (index, value) = joined.map_err(|e| VmError::Runtime(format!("{error_label}: {e}")))?;
        results[index] = Some(value);
        if let Some((i, fut)) = pending.pop_front() {
            join_set.spawn_local(async move { (i, fut.await) });
        }
    }

    Ok(results
        .into_iter()
        .map(|entry| entry.expect("run_capped_ordered: missing result slot"))
        .collect())
}
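
// A minimal sketch exercising the capped, order-preserving runner on a
// current-thread runtime. `JoinSet::spawn_local` requires an enclosing
// `LocalSet`, and the staggered sleeps make later tasks finish first so
// source ordering is actually exercised rather than falling out for free.
#[cfg(test)]
mod capped_ordered_tests {
    use super::*;

    #[test]
    fn results_keep_source_order_under_cap() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("runtime");
        let local = tokio::task::LocalSet::new();
        local.block_on(&rt, async {
            let futures: Vec<_> = (0..5u64)
                .map(|i| async move {
                    // Later tasks sleep less and therefore finish earlier.
                    tokio::time::sleep(std::time::Duration::from_millis(50 - i * 10)).await;
                    i
                })
                .collect();
            let out = run_capped_ordered(futures, Some(2), "test").await;
            assert_eq!(out.ok(), Some(vec![0, 1, 2, 3, 4]));
        });
    }
}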

/// Run `futures` concurrently with at most `cap` in flight, forwarding
/// each result over `sender` as soon as it completes (completion order,
/// not source order). Stops after forwarding the first error, or as soon
/// as the receiving end of the channel is dropped; either way the
/// remaining tasks are aborted when the `JoinSet` goes out of scope.
async fn stream_capped_unordered<F, T>(
    futures: Vec<F>,
    cap: Option<usize>,
    sender: tokio::sync::mpsc::Sender<Result<T, VmError>>,
    error_label: &'static str,
) where
    F: std::future::Future<Output = Result<T, VmError>> + 'static,
    T: 'static,
{
    let total = futures.len();
    if total == 0 {
        return;
    }
    // Effective concurrency window: `cap` clamped to the range [1, total].
    let slot = cap.unwrap_or(total).max(1).min(total);
    let mut pending: VecDeque<F> = futures.into_iter().collect();
    let mut join_set: tokio::task::JoinSet<Result<T, VmError>> = tokio::task::JoinSet::new();

    // Prime the window: start up to `slot` tasks immediately.
    while join_set.len() < slot {
        let Some(fut) = pending.pop_front() else {
            break;
        };
        join_set.spawn_local(fut);
    }

    while let Some(joined) = join_set.join_next().await {
        let value = match joined {
            Ok(Ok(value)) => Ok(value),
            Ok(Err(error)) => Err(error),
            Err(error) => Err(VmError::Runtime(format!("{error_label}: {error}"))),
        };
        // Forward the first error and stop; a failed send means the
        // receiver is gone.
        let should_stop = value.is_err();
        if sender.send(value).await.is_err() || should_stop {
            return;
        }
        if let Some(fut) = pending.pop_front() {
            join_set.spawn_local(fut);
        }
    }
}
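
// A companion sketch for the streaming variant: results may arrive in any
// order, so only the sorted multiset is asserted. It assumes nothing
// beyond the helpers defined above and tokio's mpsc channel.
#[cfg(test)]
mod capped_stream_tests {
    use super::*;

    #[test]
    fn forwards_every_result_once() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("runtime");
        let local = tokio::task::LocalSet::new();
        local.block_on(&rt, async {
            let futures: Vec<_> = (0..4u64)
                .map(|i| async move { Ok::<u64, VmError>(i) })
                .collect();
            let (tx, mut rx) = tokio::sync::mpsc::channel(1);
            tokio::task::spawn_local(stream_capped_unordered(futures, Some(2), tx, "test"));
            // Drain the stream; the channel closes once the producer returns.
            let mut seen: Vec<Option<u64>> = Vec::new();
            while let Some(item) = rx.recv().await {
                seen.push(item.ok());
            }
            seen.sort();
            assert_eq!(seen, vec![Some(0), Some(1), Some(2), Some(3)]);
        });
    }
}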

impl super::super::Vm {
    /// Implements `parallel`: pops the closure, task count, and
    /// concurrency cap, runs `count` copies of the closure with the task
    /// index as its argument, and pushes the results as a list in index
    /// order. Pushes `nil` when the operand is not a closure.
    pub(super) async fn execute_parallel(&mut self) -> Result<(), VmError> {
        let _par_span =
            super::super::ScopeSpan::new(crate::tracing::SpanKind::Parallel, "parallel".into());
        let closure = self.pop()?;
        let count_val = self.pop()?;
        let cap_val = self.pop()?;
        let count = match &count_val {
            VmValue::Int(n) => (*n).max(0) as usize,
            _ => 0,
        };
        let cap = parallel_cap_from_value(&cap_val, count)?;
        if let VmValue::Closure(closure) = closure {
            self.runtime_context_counter += 1;
            let task_group_id = format!(
                "{}:parallel:{}",
                self.runtime_context.task_id, self.runtime_context_counter
            );
            let mut futures: Vec<_> = Vec::with_capacity(count);
            for i in 0..count {
                let mut child = self.child_vm();
                child.runtime_context = self.runtime_context.child_task(
                    format!("{task_group_id}:{i}"),
                    "parallel",
                    Some(task_group_id.clone()),
                );
                let closure = closure.clone();
                futures.push(async move {
                    let result = child
                        .call_closure(&closure, &[VmValue::Int(i as i64)])
                        .await?;
                    Ok::<(VmValue, String), VmError>((result, std::mem::take(&mut child.output)))
                });
            }
            let joined = run_capped_ordered(futures, cap, "Parallel task error").await?;
            let mut results = Vec::with_capacity(count);
            for entry in joined {
                let (val, task_output) = entry?;
                self.output.push_str(&task_output);
                results.push(val);
            }
            self.stack.push(VmValue::List(Rc::new(results)));
        } else {
            self.stack.push(VmValue::Nil);
        }
        Ok(())
    }

    /// Implements `parallel each`: pops the closure, input list, and
    /// cap, maps the closure over the list concurrently, and pushes the
    /// results in source order. Pushes `nil` when the operands are not a
    /// list and a closure.
    pub(super) async fn execute_parallel_map(&mut self) -> Result<(), VmError> {
        let closure = self.pop()?;
        let list_val = self.pop()?;
        let cap_val = self.pop()?;
        match (&list_val, &closure) {
            (VmValue::List(items), VmValue::Closure(closure)) => {
                let len = items.len();
                let cap = parallel_cap_from_value(&cap_val, len)?;
                self.runtime_context_counter += 1;
                let task_group_id = format!(
                    "{}:parallel_each:{}",
                    self.runtime_context.task_id, self.runtime_context_counter
                );
                let mut futures = Vec::with_capacity(len);
                for (i, item) in items.iter().enumerate() {
                    let mut child = self.child_vm();
                    child.runtime_context = self.runtime_context.child_task(
                        format!("{task_group_id}:{i}"),
                        "parallel each",
                        Some(task_group_id.clone()),
                    );
                    let closure = closure.clone();
                    let item = item.clone();
                    futures.push(async move {
                        let result = child.call_closure(&closure, &[item]).await?;
                        Ok::<(VmValue, String), VmError>((
                            result,
                            std::mem::take(&mut child.output),
                        ))
                    });
                }
                let joined = run_capped_ordered(futures, cap, "Parallel map error").await?;
                let mut results = Vec::with_capacity(len);
                for entry in joined {
                    let (val, task_output) = entry?;
                    self.output.push_str(&task_output);
                    results.push(val);
                }
                self.stack.push(VmValue::List(Rc::new(results)));
            }
            _ => self.stack.push(VmValue::Nil),
        }
        Ok(())
    }

    /// Implements `parallel each` as a stream: same operands as
    /// `execute_parallel_map`, but pushes a `VmStream` whose items
    /// arrive in completion order; the stream ends early after the first
    /// task error.
    pub(super) async fn execute_parallel_map_stream(&mut self) -> Result<(), VmError> {
        let closure = self.pop()?;
        let list_val = self.pop()?;
        let cap_val = self.pop()?;
        match (&list_val, &closure) {
            (VmValue::List(items), VmValue::Closure(closure)) => {
                let len = items.len();
                let cap = parallel_cap_from_value(&cap_val, len)?;
                self.runtime_context_counter += 1;
                let task_group_id = format!(
                    "{}:parallel_each_stream:{}",
                    self.runtime_context.task_id, self.runtime_context_counter
                );
                let mut futures = Vec::with_capacity(len);
                for (i, item) in items.iter().enumerate() {
                    let mut child = self.child_vm();
                    child.runtime_context = self.runtime_context.child_task(
                        format!("{task_group_id}:{i}"),
                        "parallel each as stream",
                        Some(task_group_id.clone()),
                    );
                    let closure = closure.clone();
                    let item = item.clone();
                    futures.push(async move { child.call_closure(&closure, &[item]).await });
                }

                // Capacity-1 channel: producer tasks stall until the
                // stream consumer pulls the previous item.
                let (tx, rx) = tokio::sync::mpsc::channel::<Result<VmValue, VmError>>(1);
                tokio::task::spawn_local(stream_capped_unordered(
                    futures,
                    cap,
                    tx,
                    "Parallel map stream error",
                ));
                self.stack.push(VmValue::Stream(VmStream {
                    done: Rc::new(std::cell::Cell::new(false)),
                    receiver: Rc::new(tokio::sync::Mutex::new(rx)),
                    cancel: None,
                }));
            }
            _ => self.stack.push(VmValue::Nil),
        }
        Ok(())
    }

    /// Implements `parallel settle`: runs every task to completion
    /// instead of failing the batch, wraps each outcome as a `Result`
    /// enum value, and pushes a dict with the `results` list plus
    /// `succeeded` and `failed` counts.
    pub(super) async fn execute_parallel_settle(&mut self) -> Result<(), VmError> {
        let closure = self.pop()?;
        let list_val = self.pop()?;
        let cap_val = self.pop()?;
        match (&list_val, &closure) {
            (VmValue::List(items), VmValue::Closure(closure)) => {
                let len = items.len();
                let cap = parallel_cap_from_value(&cap_val, len)?;
                self.runtime_context_counter += 1;
                let task_group_id = format!(
                    "{}:parallel_settle:{}",
                    self.runtime_context.task_id, self.runtime_context_counter
                );
                let mut futures = Vec::with_capacity(len);
                for (i, item) in items.iter().enumerate() {
                    let mut child = self.child_vm();
                    child.runtime_context = self.runtime_context.child_task(
                        format!("{task_group_id}:{i}"),
                        "parallel settle",
                        Some(task_group_id.clone()),
                    );
                    let closure = closure.clone();
                    let item = item.clone();
                    futures.push(async move {
                        let result = child.call_closure(&closure, &[item]).await;
                        let output = std::mem::take(&mut child.output);
                        (result, output)
                    });
                }
                let joined = run_capped_ordered(futures, cap, "Parallel settle error").await?;
                let mut results = Vec::with_capacity(len);
                let mut succeeded = 0i64;
                let mut failed = 0i64;
                for (result, task_output) in joined {
                    self.output.push_str(&task_output);
                    match result {
                        Ok(val) => {
                            succeeded += 1;
                            results.push(VmValue::enum_variant("Result", "Ok", vec![val]));
                        }
                        Err(e) => {
                            failed += 1;
                            results.push(VmValue::enum_variant(
                                "Result",
                                "Err",
                                vec![VmValue::String(Rc::from(e.to_string()))],
                            ));
                        }
                    }
                }
                let mut dict = BTreeMap::new();
                dict.insert("results".to_string(), VmValue::List(Rc::new(results)));
                dict.insert("succeeded".to_string(), VmValue::Int(succeeded));
                dict.insert("failed".to_string(), VmValue::Int(failed));
                self.stack.push(VmValue::Dict(Rc::new(dict)));
            }
            _ => self.stack.push(VmValue::Nil),
        }
        Ok(())
    }

    /// Implements `spawn`: pops a closure and runs it as a detached
    /// local task with its own cancel token, registering a
    /// `VmTaskHandle` under a fresh task id and pushing that id as a
    /// `TaskHandle` value.
    pub(super) fn execute_spawn(&mut self) -> Result<(), VmError> {
        let _spawn_span =
            super::super::ScopeSpan::new(crate::tracing::SpanKind::Spawn, "spawn".into());
        let closure = self.pop()?;
        if let VmValue::Closure(closure) = closure {
            self.task_counter += 1;
            let task_id = format!("vm_task_{}", self.task_counter);
            let mut child = self.child_vm();
            child.runtime_context = self.runtime_context.child_task(
                format!(
                    "{}:spawn:{}",
                    self.runtime_context.task_id, self.task_counter
                ),
                "spawn",
                None,
            );
            let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
            child.cancel_token = Some(cancel_token.clone());
            let handle = tokio::task::spawn_local(async move {
                let result = child.call_closure(&closure, &[]).await?;
                Ok((result, std::mem::take(&mut child.output)))
            });
            self.spawned_tasks.insert(
                task_id.clone(),
                VmTaskHandle {
                    handle,
                    cancel_token,
                },
            );
            self.stack.push(VmValue::TaskHandle(task_id));
        } else {
            self.stack.push(VmValue::Nil);
        }
        Ok(())
    }

    /// Enters a named mutex: reads the key from the constant pool (u16
    /// operand), awaits the permit from the sync runtime, and records
    /// the held guard together with the current frame and scope depth.
    pub(super) async fn execute_sync_mutex_enter(&mut self) -> Result<(), VmError> {
        let key = {
            let frame = self.frames.last_mut().unwrap();
            let idx = frame.chunk.read_u16(frame.ip) as usize;
            frame.ip += 2;
            Self::const_string(&frame.chunk.constants[idx])?
        };
        let permit = self
            .sync_runtime
            .acquire("mutex", &key, 1, 1, None, self.cancel_token.clone())
            .await?
            .ok_or_else(|| VmError::Runtime(format!("mutex '{key}' timed out")))?;
        self.held_sync_guards
            .push(crate::synchronization::VmSyncHeldGuard {
                _permit: permit,
                frame_depth: self.frames.len(),
                env_scope_depth: self.env.scope_depth(),
            });
        Ok(())
    }

    /// Sets up a deadline scope: pops a duration (milliseconds) and
    /// records a deadline for the current frame depth; unexpected
    /// operand types fall back to a 30s default.
    pub(super) fn execute_deadline_setup(&mut self) -> Result<(), VmError> {
        let dur_val = self.pop()?;
        let ms = match &dur_val {
            VmValue::Duration(ms) => (*ms).max(0) as u64,
            VmValue::Int(n) => (*n).max(0) as u64,
            _ => 30_000,
        };
        let deadline = Instant::now() + std::time::Duration::from_millis(ms);
        self.deadlines.push((deadline, self.frames.len()));
        Ok(())
    }

    /// Ends the innermost deadline scope.
    pub(super) fn execute_deadline_end(&mut self) {
        self.deadlines.pop();
    }
}