// harn-vm 0.5.76
//
// Async bytecode virtual machine for the Harn programming language.
// Documentation
use super::*;
use crate::orchestration::{save_run_record, RunChildRecord, RunRecord};
use crate::tracing::{set_tracing_enabled, span_end, span_start, SpanKind};
use std::cell::Cell;
use std::rc::Rc;

#[test]
fn classify_stage_outcome_fails_when_agent_loop_is_stuck() {
    // A `"stuck"` status must win over the accompanying `{"ok": true}` payload:
    // the outcome is `"stuck"` and the branch taken is `"failed"`.
    let status_json = serde_json::json!({"status": "stuck"});
    let ok_json = serde_json::json!({"ok": true});

    let (outcome, branch) = classify_stage_outcome("stage", &status_json, &ok_json);

    assert_eq!(outcome, "stuck");
    assert_eq!(branch.as_deref(), Some("failed"));
}

#[test]
fn classify_stage_outcome_accepts_done_status_for_mutating_stage() {
    // `{"status": "done"}` together with `{"ok": true}` classifies as a
    // success, both as the outcome label and as the branch.
    let status_json = serde_json::json!({"status": "done"});
    let ok_json = serde_json::json!({"ok": true});

    let (outcome, branch) = classify_stage_outcome("stage", &status_json, &ok_json);

    assert_eq!(outcome, "success");
    assert_eq!(branch.as_deref(), Some("success"));
}

#[test]
fn classify_stage_outcome_fails_when_required_write_never_succeeds() {
    // A `"failed"` status is reported as a failure on the `"failed"` branch
    // even though the second payload says `{"ok": true}`.
    let status_json = serde_json::json!({"status": "failed"});
    let ok_json = serde_json::json!({"ok": true});

    let (outcome, branch) = classify_stage_outcome("stage", &status_json, &ok_json);

    assert_eq!(outcome, "failed");
    assert_eq!(branch.as_deref(), Some("failed"));
}

#[test]
fn load_run_tree_recurses_into_child_runs() {
    // Deletes the scratch directory on drop, so the run files are removed even
    // when an `unwrap` or assertion below panics.
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a removal error must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Unique per-invocation directory under the system temp dir.
    let scratch = ScratchDir(
        std::env::temp_dir().join(format!("harn-run-tree-{}", uuid::Uuid::now_v7())),
    );
    let dir = &scratch.0;
    std::fs::create_dir_all(dir).unwrap();
    let child_path = dir.join("child.json");
    let parent_path = dir.join("parent.json");

    let child = RunRecord {
        id: "child".to_string(),
        workflow_id: "wf".to_string(),
        root_run_id: Some("root".to_string()),
        status: "completed".to_string(),
        ..Default::default()
    };
    // The parent refers to the child both by run id and by its on-disk path.
    let parent = RunRecord {
        id: "parent".to_string(),
        workflow_id: "wf".to_string(),
        root_run_id: Some("root".to_string()),
        status: "completed".to_string(),
        child_runs: vec![RunChildRecord {
            worker_id: "worker_1".to_string(),
            worker_name: "worker".to_string(),
            run_id: Some("child".to_string()),
            run_path: Some(child_path.to_string_lossy().into_owned()),
            ..Default::default()
        }],
        ..Default::default()
    };

    save_run_record(&child, Some(child_path.to_str().unwrap())).unwrap();
    save_run_record(&parent, Some(parent_path.to_str().unwrap())).unwrap();

    // Loading the parent must surface the child record under `children`.
    let tree = load_run_tree(parent_path.to_str().unwrap()).unwrap();
    assert_eq!(tree["run"]["id"], "parent");
    assert_eq!(tree["children"][0]["run"]["id"], "child");
}

#[test]
fn snapshot_trace_spans_returns_completed_trace_tree() {
    // Turns tracing back off on drop, so a failing assertion cannot leave it
    // enabled once this test exits.
    struct DisableTracingOnDrop;
    impl Drop for DisableTracingOnDrop {
        fn drop(&mut self) {
            set_tracing_enabled(false);
        }
    }

    set_tracing_enabled(true);
    let _restore = DisableTracingOnDrop;

    // Open a pipeline span, open a tool-call span while it is still open, then
    // close them in reverse order.
    let parent = span_start(SpanKind::Pipeline, "workflow".to_string());
    let child = span_start(SpanKind::ToolCall, "read".to_string());
    span_end(child);
    span_end(parent);

    // The snapshot lists the tool-call span first, parented to the pipeline
    // span, followed by the pipeline span itself.
    let spans = snapshot_trace_spans();
    assert_eq!(spans.len(), 2);
    assert_eq!(spans[0].kind, "tool_call");
    assert_eq!(spans[0].parent_id, Some(parent));
    assert_eq!(spans[1].kind, "pipeline");
}

#[tokio::test(flavor = "current_thread")]
async fn execute_join_policy_stops_after_first_completion() {
    use std::time::{Duration, Instant};

    let local = tokio::task::LocalSet::new();
    local
        .run_until(async {
            // Two racing tasks: the 50 ms one yields 1, the 5 ms one yields 2.
            let slow: LocalTask<i32> = Box::pin(async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                1
            });
            let fast: LocalTask<i32> = Box::pin(async {
                tokio::time::sleep(Duration::from_millis(5)).await;
                2
            });

            let started = Instant::now();
            let results = execute_join_policy(vec![slow, fast], "first", None, None).await;

            // Exactly one result, returned before the slow task could have
            // finished, and it is the fast task's value.
            assert_eq!(results.len(), 1);
            assert!(started.elapsed() < Duration::from_millis(40));
            let winner = results[0].as_ref().ok().copied();
            assert_eq!(winner, Some(2));
        })
        .await;
}

#[tokio::test(flavor = "current_thread")]
async fn execute_join_policy_honors_quorum_and_concurrency_limit() {
    let local = tokio::task::LocalSet::new();
    local
        .run_until(async {
            // Shared counters: how many tasks are in flight right now, and the
            // highest in-flight count ever observed.
            let running = Rc::new(Cell::new(0usize));
            let peak = Rc::new(Cell::new(0usize));

            let mut tasks: Vec<LocalTask<i32>> = Vec::with_capacity(5);
            for value in 0..5_i32 {
                let running = Rc::clone(&running);
                let peak = Rc::clone(&peak);
                let task: LocalTask<i32> = Box::pin(async move {
                    // Register as in flight and record a new peak if needed.
                    let now_running = running.get() + 1;
                    running.set(now_running);
                    peak.set(peak.get().max(now_running));
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    running.set(running.get().saturating_sub(1));
                    value
                });
                tasks.push(task);
            }

            let results = execute_join_policy(tasks, "quorum", Some(2), Some(2)).await;

            assert_eq!(results.len(), 2);
            let observed = peak.get();
            assert!(observed <= 2, "observed concurrency {}", observed);
        })
        .await;
}

#[tokio::test(flavor = "current_thread")]
async fn failed_verify_stage_preserves_verification_artifact_and_result() {
    // The verify command prints "nope" and exits 1 while status 0 is expected.
    let verify_spec = serde_json::json!({
        "command": "printf nope; exit 1",
        "expect_status": 0,
    });
    let retry_policy = crate::orchestration::RetryPolicy {
        max_attempts: 1,
        ..Default::default()
    };
    let output_contract = crate::orchestration::StageContract {
        output_kinds: vec!["verification_result".to_string()],
        ..Default::default()
    };
    let node = crate::orchestration::WorkflowNode {
        id: Some("verify".to_string()),
        kind: "verify".to_string(),
        retry_policy,
        verify: Some(verify_spec),
        output_contract,
        ..Default::default()
    };

    let attempt = execute_stage_attempts("run verification", "verify", &node, &[], None).await;
    let executed = attempt.expect("stage executes");

    // The failure shows up in the status, the outcome and the branch.
    assert_eq!(executed.status, "failed");
    assert_eq!(executed.outcome, "verification_failed");
    assert_eq!(executed.branch.as_deref(), Some("failed"));

    // A single verification artifact is still produced.
    assert_eq!(executed.artifacts.len(), 1);
    assert_eq!(executed.artifacts[0].kind, "verification_result");

    // The command's output is carried in the result's visible text.
    let visible_text = executed.result["visible_text"].as_str().unwrap_or("");
    assert!(visible_text.contains("nope"));

    // The verification payload records `ok: false`.
    let verification_ok = executed
        .verification
        .as_ref()
        .and_then(|value| value.get("ok"))
        .and_then(|value| value.as_bool());
    assert_eq!(verification_ok, Some(false));
}

#[tokio::test(flavor = "current_thread")]
async fn verify_stage_preserves_input_transcript() {
    // A three-message conversation standing in for the transcript an
    // implement stage would hand over after its agent loop.
    let messages = vec![
        serde_json::json!({"role": "user", "content": "implement the feature"}),
        serde_json::json!({"role": "assistant", "content": "I'll edit the file now."}),
        serde_json::json!({"role": "user", "content": "Tool result: file written"}),
    ];
    let transcript_id = Some("test-transcript-id".to_string());
    let input_transcript = crate::llm::helpers::transcript_to_vm_with_events(
        transcript_id,
        None,
        None,
        &messages,
        Vec::new(),
        Vec::new(),
        Some("active"),
    );

    // A verify node whose command succeeds.
    let verify_spec = serde_json::json!({
        "command": "echo ok",
        "expect_status": 0,
    });
    let retry_policy = crate::orchestration::RetryPolicy {
        max_attempts: 1,
        ..Default::default()
    };
    let output_contract = crate::orchestration::StageContract {
        output_kinds: vec!["verification_result".to_string()],
        ..Default::default()
    };
    let node = crate::orchestration::WorkflowNode {
        id: Some("verify".to_string()),
        kind: "verify".to_string(),
        retry_policy,
        verify: Some(verify_spec),
        output_contract,
        ..Default::default()
    };

    let attempt =
        execute_stage_attempts("run tests", "verify", &node, &[], Some(input_transcript)).await;
    let executed = attempt.expect("stage executes");

    assert_eq!(executed.status, "completed");

    // The point of this test: the incoming transcript comes back out intact.
    let transcript = executed
        .transcript
        .expect("verify stage must preserve input transcript");
    let dict = transcript.as_dict().expect("transcript must be a dict");
    let Some(crate::value::VmValue::List(msg_list)) = dict.get("messages") else {
        panic!("transcript must have a messages list");
    };
    assert_eq!(
        msg_list.len(),
        3,
        "verify stage should preserve all 3 messages from the implement stage transcript"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn verify_stage_with_reset_transcript_policy_clears_transcript() {
    // Incoming transcript with two messages.
    let messages = vec![
        serde_json::json!({"role": "user", "content": "implement the feature"}),
        serde_json::json!({"role": "assistant", "content": "Done."}),
    ];
    let transcript_id = Some("test-transcript-id".to_string());
    let input_transcript = crate::llm::helpers::transcript_to_vm_with_events(
        transcript_id,
        None,
        None,
        &messages,
        Vec::new(),
        Vec::new(),
        Some("active"),
    );

    // Same successful verify node as the other tests, but with the transcript
    // policy mode set to "reset".
    let verify_spec = serde_json::json!({
        "command": "echo ok",
        "expect_status": 0,
    });
    let retry_policy = crate::orchestration::RetryPolicy {
        max_attempts: 1,
        ..Default::default()
    };
    let transcript_policy = crate::orchestration::TranscriptPolicy {
        mode: Some("reset".to_string()),
        ..Default::default()
    };
    let output_contract = crate::orchestration::StageContract {
        output_kinds: vec!["verification_result".to_string()],
        ..Default::default()
    };
    let node = crate::orchestration::WorkflowNode {
        id: Some("verify".to_string()),
        kind: "verify".to_string(),
        retry_policy,
        transcript_policy,
        verify: Some(verify_spec),
        output_contract,
        ..Default::default()
    };

    let attempt =
        execute_stage_attempts("run tests", "verify", &node, &[], Some(input_transcript)).await;
    let executed = attempt.expect("stage executes");

    // With "reset" in force no transcript may be handed back.
    assert!(
        executed.transcript.is_none(),
        "reset transcript policy should clear the transcript even for verify stages"
    );
}

#[tokio::test(flavor = "current_thread")]
async fn failing_stage_records_exactly_one_attempt_regardless_of_max_attempts() {
    // `retry_policy.max_attempts` has no effect here: a failing stage is run
    // a single time, and any iteration is the workflow graph's job.
    let retry_policy = crate::orchestration::RetryPolicy {
        max_attempts: 5,
        backoff_ms: Some(1),
        ..Default::default()
    };
    // The command exits 7 while status 0 is expected.
    let verify_spec = serde_json::json!({
        "command": "exit 7",
        "expect_status": 0,
    });
    let output_contract = crate::orchestration::StageContract {
        output_kinds: vec!["verification_result".to_string()],
        ..Default::default()
    };
    let node = crate::orchestration::WorkflowNode {
        id: Some("verify".to_string()),
        kind: "verify".to_string(),
        retry_policy,
        verify: Some(verify_spec),
        output_contract,
        ..Default::default()
    };

    let attempt = execute_stage_attempts("verify", "verify", &node, &[], None).await;
    let executed = attempt.expect("stage executes");

    let attempt_count = executed.attempts.len();
    assert_eq!(
        attempt_count,
        1,
        "failing stage must record exactly one attempt; retry-loop is removed"
    );
    assert_eq!(executed.status, "failed");
    assert_eq!(executed.branch.as_deref(), Some("failed"));
}

#[tokio::test(flavor = "current_thread")]
async fn succeeding_stage_records_single_attempt() {
    // A verify stage whose command succeeds, configured with `max_attempts: 3`.
    let retry_policy = crate::orchestration::RetryPolicy {
        max_attempts: 3,
        ..Default::default()
    };
    let verify_spec = serde_json::json!({
        "command": "echo ok",
        "expect_status": 0,
    });
    let output_contract = crate::orchestration::StageContract {
        output_kinds: vec!["verification_result".to_string()],
        ..Default::default()
    };
    let node = crate::orchestration::WorkflowNode {
        id: Some("verify".to_string()),
        kind: "verify".to_string(),
        retry_policy,
        verify: Some(verify_spec),
        output_contract,
        ..Default::default()
    };

    let attempt = execute_stage_attempts("verify", "verify", &node, &[], None).await;
    let executed = attempt.expect("stage executes");

    // One recorded attempt, and the stage is marked completed.
    assert_eq!(executed.attempts.len(), 1);
    assert_eq!(executed.status, "completed");
}

#[tokio::test(flavor = "current_thread")]
async fn stage_task_reaches_execution_verbatim() {
    // Intent: the task string given to `execute_stage_attempts` is handed to
    // the stage as-is, with no retry-suffix text appended.
    // NOTE(review): only the attempt count is asserted below; the task text
    // itself is never inspected, so the "verbatim" claim is not checked here.
    let retry_policy = crate::orchestration::RetryPolicy {
        max_attempts: 3,
        ..Default::default()
    };
    let verify_spec = serde_json::json!({
        "command": "echo 'verification'; exit 1",
        "expect_status": 0,
    });
    let output_contract = crate::orchestration::StageContract {
        output_kinds: vec!["verification_result".to_string()],
        ..Default::default()
    };
    let node = crate::orchestration::WorkflowNode {
        id: Some("verify".to_string()),
        kind: "verify".to_string(),
        retry_policy,
        verify: Some(verify_spec),
        output_contract,
        ..Default::default()
    };

    let attempt =
        execute_stage_attempts("the original task, pristine", "verify", &node, &[], None).await;
    let executed = attempt.expect("stage executes");

    assert_eq!(executed.attempts.len(), 1);
}