Skip to main content

harn_cli/commands/run/
mod.rs

1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use harn_parser::DiagnosticSeverity;
11use harn_vm::event_log::EventLog;
12use serde::Serialize;
13
14use crate::commands::mcp::{self, AuthResolution};
15use crate::commands::time::{self, PhaseRecord, RunTiming};
16use crate::package;
17use crate::parse_source_file;
18use crate::skill_loader::{
19    canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
20    SkillLoaderInputs,
21};
22
23mod explain_cost;
24pub mod harnpack;
25pub mod json_events;
26
27use self::harnpack::{HarnpackError, HarnpackRunOptions, PreparedHarnpack};
28use self::json_events::NdjsonEmitter;
29
/// JSON event-stream configuration for `--json` runs.
///
/// The derived `Default` leaves `quiet` off.
#[derive(Clone, Default)]
pub struct RunJsonOptions {
    /// Suppress `stdout` / `stderr` events. Transcript, tool, hook,
    /// persona, and the terminal result/error events still flow.
    pub quiet: bool,
}
37
/// Post-run summary configuration for `harn run --emit-summary-json`.
///
/// Only constructed when `emit_summary_json` is set on the run arguments
/// (see `run_summary_options_from_args`).
#[derive(Clone, Debug)]
pub struct RunSummaryOptions {
    /// Destination selected for the summary: stderr, a file, or a file
    /// descriptor.
    pub sink: RunJsonSink,
}
43
/// Phase-event configuration. Only constructed when `emit_phase_json` is
/// set on the run arguments (see `run_phase_options_from_args`).
#[derive(Clone, Debug)]
pub struct RunPhaseOptions {
    /// Destination selected for the phase event: stderr, a file, or a file
    /// descriptor.
    pub sink: RunJsonSink,
}
48
/// Resource-usage event configuration. Only constructed when
/// `emit_rusage_json` is set on the run arguments (see
/// `run_rusage_options_from_args`).
#[derive(Clone, Debug)]
pub struct RunRusageOptions {
    /// Destination selected for the rusage event: stderr, a file, or a file
    /// descriptor.
    pub sink: RunJsonSink,
}
53
/// Bundle of the optional auxiliary JSON outputs for a run.
///
/// Each field is `None` when the corresponding output was not requested;
/// the derived `Default` requests none of them.
#[derive(Clone, Debug, Default)]
pub struct RunAuxOptions {
    /// Post-run summary output, if requested.
    pub summary: Option<RunSummaryOptions>,
    /// Phase-event output, if requested.
    pub phase: Option<RunPhaseOptions>,
    /// Resource-usage output, if requested.
    pub rusage: Option<RunRusageOptions>,
}
60
/// Where one auxiliary JSON output goes, plus the CLI flag associated with
/// its file-descriptor form.
#[derive(Clone, Debug)]
pub struct RunJsonSink {
    /// Resolved destination.
    pub target: RunJsonSinkTarget,
    /// Name of the matching fd flag (for example `--summary-fd`).
    /// NOTE(review): presumably used when reporting fd errors — confirm
    /// against the code that writes to the sink.
    pub fd_flag: &'static str,
}
66
/// Concrete destination of an auxiliary JSON output.
#[derive(Clone, Debug)]
pub enum RunJsonSinkTarget {
    /// Append the summary to the captured stderr buffer so it remains
    /// terminal after all diagnostics that `run_file_with_skill_dirs`
    /// flushes on return.
    Stderr,
    /// Path supplied through the output's `*_file` argument.
    File(PathBuf),
    /// Raw descriptor number supplied through the output's `*_fd` argument.
    Fd(i32),
}
76
/// Serialized shape of the post-run summary.
#[derive(Serialize)]
struct RunSummary<'a> {
    schema_version: u32,
    event: &'static str,
    wall_time_ms: u64,
    exit_code: i32,
    llm: RunSummaryLlm,
    // Left out of the serialized output entirely when no profile is attached.
    #[serde(skip_serializing_if = "Option::is_none")]
    profile: Option<&'a harn_vm::profile::RunProfile>,
}
87
/// LLM usage section nested under `llm` in [`RunSummary`].
#[derive(Serialize)]
struct RunSummaryLlm {
    call_count: i64,
    input_tokens: i64,
    output_tokens: i64,
    time_ms: i64,
    cost_usd: f64,
}
96
// Schema versions for the three auxiliary JSON outputs.
// NOTE(review): presumably written to the `schema_version` field of the
// matching event struct — confirm at the emit sites.
pub const RUN_SUMMARY_SCHEMA_VERSION: u32 = 1;
pub const RUN_PHASE_SCHEMA_VERSION: u32 = 1;
pub const RUN_RUSAGE_SCHEMA_VERSION: u32 = 1;
100
/// Serialized shape of the phase event: a list of recorded phases.
#[derive(Serialize)]
struct RunPhaseEvent {
    schema_version: u32,
    event: &'static str,
    phases: Vec<PhaseRecord>,
}
107
/// Serialized shape of the resource-usage event.
#[derive(Serialize)]
struct RunRusageEvent {
    schema_version: u32,
    event: &'static str,
    cpu_ms: u64,
}
114
115pub(crate) fn run_summary_options_from_args(
116    args: &crate::cli::RunArgs,
117) -> Option<RunSummaryOptions> {
118    args.emit_summary_json.then(|| RunSummaryOptions {
119        sink: build_run_json_sink(args.summary_file.clone(), args.summary_fd, "--summary-fd"),
120    })
121}
122
123pub(crate) fn run_aux_options_from_args(args: &crate::cli::RunArgs) -> RunAuxOptions {
124    RunAuxOptions {
125        summary: run_summary_options_from_args(args),
126        phase: run_phase_options_from_args(args),
127        rusage: run_rusage_options_from_args(args),
128    }
129}
130
131pub(crate) fn run_phase_options_from_args(args: &crate::cli::RunArgs) -> Option<RunPhaseOptions> {
132    args.emit_phase_json.then(|| RunPhaseOptions {
133        sink: build_run_json_sink(args.phase_file.clone(), args.phase_fd, "--phase-fd"),
134    })
135}
136
137pub(crate) fn run_rusage_options_from_args(args: &crate::cli::RunArgs) -> Option<RunRusageOptions> {
138    args.emit_rusage_json.then(|| RunRusageOptions {
139        sink: build_run_json_sink(args.rusage_file.clone(), args.rusage_fd, "--rusage-fd"),
140    })
141}
142
143fn build_run_json_sink(
144    file: Option<PathBuf>,
145    fd: Option<i32>,
146    fd_flag: &'static str,
147) -> RunJsonSink {
148    RunJsonSink {
149        target: if let Some(path) = file {
150            RunJsonSinkTarget::File(path)
151        } else if let Some(fd) = fd {
152            RunJsonSinkTarget::Fd(fd)
153        } else {
154            RunJsonSinkTarget::Stderr
155        },
156        fd_flag,
157    }
158}
159
/// Transport selection for serving a run file over MCP.
/// NOTE(review): variant semantics are inferred from the names — confirm
/// against the MCP serve entry point.
pub(crate) enum RunFileMcpServeMode {
    Stdio,
    // Boxed HTTP settings; see `RunFileMcpServeHttp`.
    Http(Box<RunFileMcpServeHttp>),
}
164
/// Settings carried by [`RunFileMcpServeMode::Http`]: the `harn_serve`
/// HTTP serve options and the auth policy.
pub(crate) struct RunFileMcpServeHttp {
    pub options: harn_serve::McpHttpServeOptions,
    pub auth_policy: harn_serve::AuthPolicy,
}
169
/// Core builtins that are never denied, even when using `--allow`.
///
/// `build_denied_builtins` consults this list only on its `--allow` path;
/// an explicit `--deny` list is taken verbatim.
const CORE_BUILTINS: &[&str] = &[
    "println",
    "print",
    "log",
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    "assert",
    "assert_eq",
    "assert_ne",
    "json_parse",
    "json_stringify",
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
192
193/// Build the set of denied builtin names from `--deny` or `--allow` flags.
194///
195/// - `--deny a,b,c` denies exactly those names.
196/// - `--allow a,b,c` denies everything *except* the listed names and the core builtins.
197pub(crate) fn build_denied_builtins(
198    deny_csv: Option<&str>,
199    allow_csv: Option<&str>,
200) -> HashSet<String> {
201    if let Some(csv) = deny_csv {
202        csv.split(',')
203            .map(|s| s.trim().to_string())
204            .filter(|s| !s.is_empty())
205            .collect()
206    } else if let Some(csv) = allow_csv {
207        // With --allow, we mark every registered stdlib builtin as denied
208        // *except* those in the allow list and the core builtins.
209        let allowed: HashSet<String> = csv
210            .split(',')
211            .map(|s| s.trim().to_string())
212            .filter(|s| !s.is_empty())
213            .collect();
214        let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
215
216        // Create a temporary VM with stdlib registered to enumerate all builtin names.
217        let mut tmp = harn_vm::Vm::new();
218        harn_vm::register_vm_stdlib(&mut tmp);
219        harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
220        harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
221
222        tmp.builtin_names()
223            .into_iter()
224            .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
225            .collect()
226    } else {
227        HashSet::new()
228    }
229}
230
/// Result of [`compile_or_load_chunk_for_run`]. Failures propagate as
/// diagnostic text on the run path so callers map them straight to a
/// non-zero exit code without bespoke error types.
pub(crate) struct LoadedChunk {
    /// Text of the entry file as read from disk.
    pub(crate) source: String,
    /// Runnable bytecode, either freshly compiled or taken from the cache.
    pub(crate) chunk: harn_vm::Chunk,
}
238
239/// Load the entry pipeline as a runnable [`harn_vm::Chunk`], using the
240/// content-addressed bytecode cache when its key matches. On a cache miss
241/// we read, parse, type-check, and compile, then persist the chunk.
242/// On a hit we skip parse/typecheck/compile entirely — the cache invariant
243/// is that a stored chunk passed those phases on the writer's harn build,
244/// and the key includes every transitively-imported user file so any
245/// change re-runs the full path.
246///
247/// `stderr` receives any diagnostic output. Returns `None` when a fatal
248/// type or compile error blocks execution; the caller maps that to
249/// exit-code 1.
250pub(crate) fn compile_or_load_chunk_for_run(
251    path: &str,
252    stderr: &mut String,
253) -> Option<LoadedChunk> {
254    compile_or_load_chunk_with_timing(path, stderr, None)
255}
256
257/// Like [`compile_or_load_chunk_for_run`] but lets the caller observe
258/// per-phase wall-clock timings (parse, typecheck, bytecode compile +
259/// cache hit/miss). Used by `harn time run` to drive the same code
260/// path as `harn run` while reporting phase-level timing.
261//
262// The `as_deref_mut` calls reborrow the inner `&mut RunTiming` so each
263// phase can mutate it independently. Clippy's `needless_option_as_deref`
264// is correct that the surface types match — that's exactly the
265// reborrow we want.
266#[allow(clippy::needless_option_as_deref)]
267pub(crate) fn compile_or_load_chunk_with_timing(
268    path: &str,
269    stderr: &mut String,
270    mut timing: Option<&mut RunTiming>,
271) -> Option<LoadedChunk> {
272    let source = match fs::read_to_string(path) {
273        Ok(s) => s,
274        Err(e) => {
275            stderr.push_str(&format!("Error reading {path}: {e}\n"));
276            return None;
277        }
278    };
279    if let Some(t) = timing.as_deref_mut() {
280        t.input_bytes = source.len() as u64;
281    }
282
283    let compile_phase_start = Instant::now();
284    let lookup = harn_vm::bytecode_cache::load(Path::new(path), &source);
285    if let Some(chunk) = lookup.chunk {
286        if let Some(t) = timing.as_deref_mut() {
287            t.cache_hit = true;
288            t.bytecode_compile = compile_phase_start.elapsed();
289        }
290        return Some(LoadedChunk { source, chunk });
291    }
292    if let Some(t) = timing.as_deref_mut() {
293        t.cache_hit = false;
294    }
295
296    let parse_start = Instant::now();
297    let program = parse_source_for_run(path, &source, stderr)?;
298    if let Some(t) = timing.as_deref_mut() {
299        t.parse = parse_start.elapsed();
300    }
301
302    let typecheck_start = Instant::now();
303    let mut had_type_error = false;
304    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
305    for diag in &type_diagnostics {
306        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
307        if matches!(diag.severity, DiagnosticSeverity::Error) {
308            had_type_error = true;
309        }
310        stderr.push_str(&rendered);
311    }
312    if let Some(t) = timing.as_deref_mut() {
313        t.typecheck = typecheck_start.elapsed();
314    }
315    if had_type_error {
316        return None;
317    }
318
319    let compile_step_start = Instant::now();
320    let chunk = match harn_vm::Compiler::new().compile(&program) {
321        Ok(c) => c,
322        Err(e) => {
323            stderr.push_str(&format!("error: compile error: {e}\n"));
324            return None;
325        }
326    };
327
328    // Cache misses are best-effort — read-only homedirs, full disks, and
329    // sandboxes are common in CI environments. Surface the failure as a
330    // single-line warning when explicitly requested via the audit hook;
331    // otherwise stay quiet to avoid bloating happy-path output.
332    if let Err(err) = harn_vm::bytecode_cache::store(&lookup.key, &chunk) {
333        if std::env::var_os("HARN_BYTECODE_CACHE_DEBUG").is_some() {
334            eprintln!("[harn] bytecode cache write skipped: {err}");
335        }
336    }
337    if let Some(t) = timing.as_deref_mut() {
338        t.bytecode_compile = compile_step_start.elapsed();
339    }
340
341    Some(LoadedChunk { source, chunk })
342}
343
344fn parse_source_for_run(
345    path: &str,
346    source: &str,
347    stderr: &mut String,
348) -> Option<Vec<harn_parser::SNode>> {
349    crate::ensure_builtin_signatures_installed();
350
351    let mut lexer = harn_lexer::Lexer::new(source);
352    let tokens = match lexer.tokenize() {
353        Ok(tokens) => tokens,
354        Err(error) => {
355            let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
356                source,
357                path,
358                &error_span_from_lex(&error),
359                "error",
360                harn_parser::diagnostic::lexer_error_code(&error),
361                &error.to_string(),
362                Some("here"),
363                None,
364            );
365            stderr.push_str(&diagnostic);
366            return None;
367        }
368    };
369
370    let mut parser = harn_parser::Parser::new(tokens);
371    match parser.parse() {
372        Ok(program) => Some(program),
373        Err(error) => {
374            if parser.all_errors().is_empty() {
375                render_parse_error(path, source, &error, stderr);
376            } else {
377                for error in parser.all_errors() {
378                    render_parse_error(path, source, error, stderr);
379                }
380            }
381            None
382        }
383    }
384}
385
386fn render_parse_error(
387    path: &str,
388    source: &str,
389    error: &harn_parser::ParserError,
390    stderr: &mut String,
391) {
392    let span = error_span_from_parse(error);
393    let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
394        source,
395        path,
396        &span,
397        "error",
398        harn_parser::diagnostic::parser_error_code(error),
399        &harn_parser::diagnostic::parser_error_message(error),
400        Some(harn_parser::diagnostic::parser_error_label(error)),
401        harn_parser::diagnostic::parser_error_help(error),
402    );
403    stderr.push_str(&diagnostic);
404}
405
406fn error_span_from_lex(error: &harn_lexer::LexerError) -> harn_lexer::Span {
407    match error {
408        harn_lexer::LexerError::UnexpectedCharacter(_, span)
409        | harn_lexer::LexerError::UnterminatedString(span)
410        | harn_lexer::LexerError::UnterminatedBlockComment(span) => *span,
411    }
412}
413
414fn error_span_from_parse(error: &harn_parser::ParserError) -> harn_lexer::Span {
415    match error {
416        harn_parser::ParserError::Unexpected { span, .. } => *span,
417        harn_parser::ParserError::UnexpectedEof { span, .. } => *span,
418    }
419}
420
421/// Run the static type checker against `program` with cross-module
422/// import-aware call resolution when the file's imports all resolve. Used
423/// by `run_file` and the MCP server entry so `harn run` catches undefined
424/// cross-module calls before the VM starts.
425fn typecheck_with_imports(
426    program: &[harn_parser::SNode],
427    path: &Path,
428    source: &str,
429) -> Vec<harn_parser::TypeDiagnostic> {
430    if let Err(error) = package::ensure_dependencies_materialized(path) {
431        eprintln!("error: {error}");
432        process::exit(1);
433    }
434    let graph = harn_modules::build(&[path.to_path_buf()]);
435    let mut checker = harn_parser::TypeChecker::new();
436    if let Some(imported) = graph.imported_names_for_file(path) {
437        checker = checker.with_imported_names(imported);
438    }
439    if let Some(imported) = graph.imported_type_declarations_for_file(path) {
440        checker = checker.with_imported_type_decls(imported);
441    }
442    if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
443        checker = checker.with_imported_callable_decls(imported);
444    }
445    checker.check_with_source(program, source)
446}
447
448/// Build the wrapped source and temp file backing a `harn run -e` invocation.
449///
450/// `import` is a top-level declaration in Harn, so the leading prefix of
451/// import lines (with surrounding blanks/comments) is hoisted out of the
452/// `pipeline main(task) { ... }` wrapper. The temp file is created in the
453/// current working directory so relative imports (`import "./lib"`) and
454/// `harn.toml` discovery resolve against the user's project, not the
455/// system temp dir. If the CWD is unwritable we fall back to the system
456/// temp dir with a stderr warning — pure-expression `-e` still works,
457/// but relative imports will fail to resolve.
458pub(crate) fn prepare_eval_temp_file(
459    code: &str,
460) -> Result<(String, tempfile::NamedTempFile), String> {
461    let (header, body) = split_eval_header(code);
462    let wrapped = if header.is_empty() {
463        format!("pipeline main(task) {{\n{body}\n}}")
464    } else {
465        format!("{header}\npipeline main(task) {{\n{body}\n}}")
466    };
467
468    let tmp = create_eval_temp_file()?;
469    Ok((wrapped, tmp))
470}
471
472/// Try to place the `-e` temp file in the current working directory so
473/// relative imports and `harn.toml` discovery resolve against the user's
474/// project. Fall back to the system temp dir on failure (with a warning),
475/// so pure-expression `-e` keeps working in read-only contexts.
476fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
477    if let Some(dir) = std::env::current_dir().ok().as_deref() {
478        // Hidden prefix on Unix so editors / tree-walkers are less likely
479        // to pick the file up during its short lifetime.
480        match tempfile::Builder::new()
481            .prefix(".harn-eval-")
482            .suffix(".harn")
483            .tempfile_in(dir)
484        {
485            Ok(tmp) => return Ok(tmp),
486            Err(error) => eprintln!(
487                "warning: harn run -e: could not create temp file in {}: {error}; \
488                 relative imports will not resolve",
489                dir.display()
490            ),
491        }
492    }
493    tempfile::Builder::new()
494        .prefix("harn-eval-")
495        .suffix(".harn")
496        .tempfile()
497        .map_err(|e| format!("failed to create temp file for -e: {e}"))
498}
499
/// Split the `-e` input into a header (top-level imports plus the blank
/// and `//` comment lines around them) and a body (everything else, to be
/// wrapped in `pipeline main(task)`).
///
/// Scanning stops at the first line that is neither blank, a `//` comment,
/// nor an import (see [`is_eval_import_line`]). Blank/comment lines that
/// directly follow the last import stay in the header.
///
/// Returns `(header, body)`. When the input has no leading import, the
/// header is empty and the body is `code` unchanged. Otherwise both halves
/// are rebuilt by joining lines with `\n`, so a trailing newline and any
/// `\r\n` terminators are not preserved.
fn split_eval_header(code: &str) -> (String, String) {
    let mut header_end = 0usize;
    let mut saw_import = false;
    for (idx, line) in code.lines().enumerate() {
        let trimmed = line.trim_start();
        if trimmed.is_empty() || trimmed.starts_with("//") {
            header_end = idx + 1;
        } else if is_eval_import_line(trimmed) {
            header_end = idx + 1;
            saw_import = true;
        } else {
            break;
        }
    }

    // Leading blanks/comments alone do not make a header: without an import
    // there is nothing that must live outside the pipeline wrapper.
    if !saw_import {
        return (String::new(), code.to_string());
    }

    let lines: Vec<&str> = code.lines().collect();
    // `header_end` counts lines seen by the loop above, so it never exceeds
    // `lines.len()`.
    let (header, body) = lines.split_at(header_end);
    (header.join("\n"), body.join("\n"))
}

/// Report whether a left-trimmed line starts an import declaration.
///
/// Accepts the keyword `import`, optionally preceded by `pub` and at least
/// one whitespace character, and requires the keyword to be followed by
/// whitespace, a string quote, or an opening brace. That keeps identifiers
/// such as `important()` or `imports` from being mistaken for imports while
/// treating `import"x"`, `pub import"x"`, and `import{ a } from "x"` alike.
fn is_eval_import_line(trimmed: &str) -> bool {
    let after_pub = match trimmed.strip_prefix("pub") {
        Some(rest) if rest.starts_with(char::is_whitespace) => rest.trim_start(),
        _ => trimmed,
    };
    let next_char = after_pub
        .strip_prefix("import")
        .and_then(|rest| rest.chars().next());
    matches!(next_char, Some(c) if c.is_whitespace() || c == '"' || c == '{')
}
542
/// LLM mocking mode selected on the command line.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocking; `install_cli_llm_mock_mode` only clears any prior state.
    #[default]
    Off,
    /// Load mocks from the JSONL fixture at `fixture_path` and install them.
    Replay {
        fixture_path: PathBuf,
    },
    /// Enable recording; `persist_cli_llm_mock_recording` later writes the
    /// captured entries to `fixture_path`.
    Record {
        fixture_path: PathBuf,
    },
}
554
/// Attestation settings for a run.
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits the receipt.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Presumably the path the receipt is written to, when set.
    pub receipt_out: Option<PathBuf>,
    /// Presumably the agent identity recorded in the receipt, when set.
    pub agent_id: Option<String>,
}
560
/// Opt-in profiling switches. With `text` set the run prints a categorical
/// breakdown to stderr after execution; with `json_path` set the same
/// rollup is serialized to that path. Either one turns on span tracing
/// (i.e. `harn_vm::tracing::set_tracing_enabled(true)`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunProfileOptions {
    pub text: bool,
    pub json_path: Option<PathBuf>,
}

impl RunProfileOptions {
    /// `true` when at least one profiling output was requested.
    pub fn is_enabled(&self) -> bool {
        if self.text {
            return true;
        }
        self.json_path.is_some()
    }
}
576
/// Sandbox settings for a direct `harn run` invocation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunSandboxOptions {
    /// Install the default `harn run` sandbox for this invocation.
    pub enabled: bool,
    /// Override the workspace root used by the default sandbox. This is
    /// intended for host-generated scripts whose source file lives outside
    /// the workspace they operate on.
    pub workspace_root: Option<PathBuf>,
}

impl Default for RunSandboxOptions {
    /// Sandbox on, no workspace-root override.
    fn default() -> Self {
        let enabled = true;
        let workspace_root = None;
        Self {
            enabled,
            workspace_root,
        }
    }
}

impl RunSandboxOptions {
    /// Disable the default direct-run sandbox and egress guard.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Self::default()
        }
    }

    /// Constrain the default sandbox to an explicit workspace root.
    pub fn with_workspace_root(self, workspace_root: impl Into<PathBuf>) -> Self {
        Self {
            workspace_root: Some(workspace_root.into()),
            ..self
        }
    }
}
611
/// Shared handles for interrupt handling during a run.
#[derive(Clone)]
pub struct RunInterruptTokens {
    /// Cancellation flag. `run_file_with_skill_dirs` reads it after the run
    /// and reports exit code 124 when it is set and the run failed.
    pub cancel_token: Arc<AtomicBool>,
    /// NOTE(review): presumably holds the name of the received signal once
    /// one arrives — confirm against the signal handler.
    pub signal_token: Arc<Mutex<Option<String>>>,
}
617
/// Everything `execute_run_inner` needs for one run, gathered into a single
/// value so the call site does not pass a dozen positional arguments.
struct ExecuteRunInputs<'a> {
    /// Entry file to run.
    path: &'a str,
    trace: bool,
    /// Builtin names the script may not call (see `build_denied_builtins`).
    denied_builtins: HashSet<String>,
    script_argv: Vec<String>,
    /// Skill directories as given on the command line, not yet canonicalized.
    skill_dirs_raw: Vec<String>,
    llm_mock_mode: CliLlmMockMode,
    attestation: Option<RunAttestationOptions>,
    profile: RunProfileOptions,
    sandbox: RunSandboxOptions,
    interrupt_tokens: Option<RunInterruptTokens>,
    /// JSON event options paired with the writer the events go to
    /// (`run_file_with_skill_dirs` supplies stdout).
    json: Option<(RunJsonOptions, Box<dyn io::Write + Send>)>,
    aux: RunAuxOptions,
    /// Optional timing sink; `run_file_with_skill_dirs` passes `None`.
    timing: Option<&'a mut RunTiming>,
    harnpack: HarnpackRunOptions,
}
634
/// Captured outcome of an in-process `execute_run` invocation. Tests use this
/// instead of spawning the `harn` binary; the binary entry point translates
/// it into real stdout/stderr writes + `process::exit`.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Captured standard output text.
    pub stdout: String,
    /// Captured diagnostic text.
    pub stderr: String,
    /// Exit status; the entry points in this file call `process::exit`
    /// only when it is non-zero.
    pub exit_code: i32,
}
644
645pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
646    harn_vm::llm::clear_cli_llm_mock_mode();
647    match mode {
648        CliLlmMockMode::Off => Ok(()),
649        CliLlmMockMode::Replay { fixture_path } => {
650            let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
651            harn_vm::llm::install_cli_llm_mocks(mocks);
652            Ok(())
653        }
654        CliLlmMockMode::Record { .. } => {
655            harn_vm::llm::enable_cli_llm_mock_recording();
656            Ok(())
657        }
658    }
659}
660
661pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
662    let CliLlmMockMode::Record { fixture_path } = mode else {
663        return Ok(());
664    };
665    if let Some(parent) = fixture_path.parent() {
666        if !parent.as_os_str().is_empty() {
667            fs::create_dir_all(parent).map_err(|error| {
668                format!(
669                    "failed to create fixture directory {}: {error}",
670                    parent.display()
671                )
672            })?;
673        }
674    }
675
676    let lines = harn_vm::llm::take_cli_llm_recordings()
677        .into_iter()
678        .map(harn_vm::llm::serialize_llm_mock)
679        .collect::<Result<Vec<_>, _>>()?;
680    let body = if lines.is_empty() {
681        String::new()
682    } else {
683        format!("{}\n", lines.join("\n"))
684    };
685    fs::write(fixture_path, body)
686        .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
687}
688
689pub(crate) async fn run_file(
690    path: &str,
691    trace: bool,
692    denied_builtins: HashSet<String>,
693    script_argv: Vec<String>,
694    llm_mock_mode: CliLlmMockMode,
695    attestation: Option<RunAttestationOptions>,
696    profile: RunProfileOptions,
697) {
698    run_file_with_skill_dirs(
699        path,
700        trace,
701        denied_builtins,
702        script_argv,
703        Vec::new(),
704        llm_mock_mode,
705        attestation,
706        profile,
707        RunSandboxOptions::default(),
708        None,
709        RunAuxOptions::default(),
710        HarnpackRunOptions::default(),
711    )
712    .await;
713}
714
715pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
716    let outcome = execute_explain_cost(path);
717    if !outcome.stderr.is_empty() {
718        io::stderr().write_all(outcome.stderr.as_bytes()).ok();
719    }
720    if !outcome.stdout.is_empty() {
721        io::stdout().write_all(outcome.stdout.as_bytes()).ok();
722    }
723    if outcome.exit_code != 0 {
724        process::exit(outcome.exit_code);
725    }
726}
727
728#[allow(clippy::too_many_arguments)]
729pub(crate) async fn run_file_with_skill_dirs(
730    path: &str,
731    trace: bool,
732    denied_builtins: HashSet<String>,
733    script_argv: Vec<String>,
734    skill_dirs_raw: Vec<String>,
735    llm_mock_mode: CliLlmMockMode,
736    attestation: Option<RunAttestationOptions>,
737    profile: RunProfileOptions,
738    sandbox: RunSandboxOptions,
739    json: Option<RunJsonOptions>,
740    aux: RunAuxOptions,
741    harnpack: HarnpackRunOptions,
742) {
743    // Graceful shutdown: flush run records before exit on SIGINT/SIGTERM.
744    let interrupt_tokens = install_signal_shutdown_handler();
745
746    let _stdout_passthrough = StdoutPassthroughGuard::enable();
747    let json_with_stdout =
748        json.map(|opts| (opts, Box::new(io::stdout()) as Box<dyn io::Write + Send>));
749    let outcome = execute_run_inner(ExecuteRunInputs {
750        path,
751        trace,
752        denied_builtins,
753        script_argv,
754        skill_dirs_raw,
755        llm_mock_mode,
756        attestation,
757        profile,
758        sandbox,
759        interrupt_tokens: Some(interrupt_tokens.clone()),
760        json: json_with_stdout,
761        aux,
762        timing: None,
763        harnpack,
764    })
765    .await;
766
767    // `harn run` streams normal program stdout during execution. Any stdout
768    // left here came from older capture paths, so flush it after diagnostics.
769    if !outcome.stderr.is_empty() {
770        io::stderr().write_all(outcome.stderr.as_bytes()).ok();
771    }
772    if !outcome.stdout.is_empty() {
773        io::stdout().write_all(outcome.stdout.as_bytes()).ok();
774    }
775
776    let mut exit_code = outcome.exit_code;
777    if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
778        exit_code = 124;
779    }
780    if exit_code != 0 {
781        process::exit(exit_code);
782    }
783}
784
785#[allow(clippy::too_many_arguments)]
786pub(crate) async fn run_resume_with_skill_dirs(
787    target: &str,
788    trace: bool,
789    denied_builtins: HashSet<String>,
790    resume_argv: Vec<String>,
791    skill_dirs_raw: Vec<String>,
792    llm_mock_mode: CliLlmMockMode,
793    attestation: Option<RunAttestationOptions>,
794    profile: RunProfileOptions,
795    sandbox: RunSandboxOptions,
796    json: Option<RunJsonOptions>,
797    aux: RunAuxOptions,
798) {
799    let source = r#"import { resume_agent, wait_agent } from "std/agent/workers"
800
801pipeline main(task) {
802  let input = if len(argv) > 1 {
803    argv[1]
804  } else {
805    nil
806  }
807  let handle = resume_agent(argv[0], input, true)
808  return wait_agent(handle)
809}
810"#;
811    let tmp = create_eval_temp_file().unwrap_or_else(|e| {
812        eprintln!("error: {e}");
813        process::exit(1);
814    });
815    let tmp_path = tmp.path().to_path_buf();
816    if let Err(error) = fs::write(&tmp_path, source) {
817        eprintln!("error: failed to write temp file for --resume: {error}");
818        process::exit(1);
819    }
820    let mut argv = Vec::with_capacity(resume_argv.len() + 1);
821    argv.push(target.to_string());
822    argv.extend(resume_argv);
823    let tmp_str = tmp_path.to_string_lossy().into_owned();
824    run_file_with_skill_dirs(
825        &tmp_str,
826        trace,
827        denied_builtins,
828        argv,
829        skill_dirs_raw,
830        llm_mock_mode,
831        attestation,
832        profile,
833        sandbox,
834        json,
835        aux,
836        HarnpackRunOptions::default(),
837    )
838    .await;
839}
840
841pub fn execute_explain_cost(path: &str) -> RunOutcome {
842    let stdout = String::new();
843    let mut stderr = String::new();
844
845    let (source, program) = parse_source_file(path);
846
847    let mut had_type_error = false;
848    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
849    for diag in &type_diagnostics {
850        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
851        if matches!(diag.severity, DiagnosticSeverity::Error) {
852            had_type_error = true;
853        }
854        stderr.push_str(&rendered);
855    }
856    if had_type_error {
857        return RunOutcome {
858            stdout,
859            stderr,
860            exit_code: 1,
861        };
862    }
863
864    let extensions = package::load_runtime_extensions(Path::new(path));
865    package::install_runtime_extensions(&extensions);
866    RunOutcome {
867        stdout: explain_cost::render_explain_cost(path, &program),
868        stderr,
869        exit_code: 0,
870    }
871}
872
873pub(crate) struct StdoutPassthroughGuard {
874    previous: bool,
875}
876
877impl StdoutPassthroughGuard {
878    pub(crate) fn enable() -> Self {
879        Self {
880            previous: harn_vm::set_stdout_passthrough(true),
881        }
882    }
883}
884
885impl Drop for StdoutPassthroughGuard {
886    fn drop(&mut self) {
887        harn_vm::set_stdout_passthrough(self.previous);
888    }
889}
890
/// Scope guard paired with a `push_execution_policy` call: dropping it pops
/// the execution policy again.
struct ExecutionPolicyGuard;

impl Drop for ExecutionPolicyGuard {
    fn drop(&mut self) {
        harn_vm::orchestration::pop_execution_policy();
    }
}
898
/// Holds the guards that keep the default run sandbox installed. The
/// fields are never read; they exist so the guards are dropped together
/// with the scope.
struct RunSandboxScope {
    // `None` when no execution policy was pushed by this scope.
    _execution_policy: Option<ExecutionPolicyGuard>,
    // `None` when the sandbox is disabled.
    _egress_policy: Option<harn_vm::egress::ExplicitEgressPolicyGuard>,
}

impl RunSandboxScope {
    /// A scope that installs nothing and therefore restores nothing.
    fn disabled() -> Self {
        Self {
            _execution_policy: None,
            _egress_policy: None,
        }
    }
}
912
913fn install_run_sandbox_scope(
914    options: &RunSandboxOptions,
915    workspace_root: &Path,
916    stderr: &mut String,
917) -> RunSandboxScope {
918    if !options.enabled {
919        stderr.push_str(
920            "warning: harn run --no-sandbox disables filesystem, process, and egress sandbox defaults\n",
921        );
922        return RunSandboxScope::disabled();
923    }
924
925    let execution_policy = if harn_vm::orchestration::current_execution_policy().is_none() {
926        harn_vm::orchestration::push_execution_policy(default_run_capability_policy(
927            workspace_root,
928        ));
929        Some(ExecutionPolicyGuard)
930    } else {
931        None
932    };
933    let egress_policy = Some(harn_vm::egress::require_explicit_egress_policy_for_host());
934
935    RunSandboxScope {
936        _execution_policy: execution_policy,
937        _egress_policy: egress_policy,
938    }
939}
940
941fn default_run_capability_policy(
942    workspace_root: &Path,
943) -> harn_vm::orchestration::CapabilityPolicy {
944    harn_vm::orchestration::CapabilityPolicy {
945        workspace_roots: vec![normalize_run_workspace_root(workspace_root)
946            .display()
947            .to_string()],
948        side_effect_level: Some("process_exec".to_string()),
949        sandbox_profile: harn_vm::orchestration::SandboxProfile::Worktree,
950        ..harn_vm::orchestration::CapabilityPolicy::default()
951    }
952}
953
/// Turns `path` into an absolute path where possible.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto
/// the current working directory; if the working directory cannot be read,
/// the relative path is returned as-is. No `.`/`..` resolution is performed.
fn normalize_run_workspace_root(path: &Path) -> PathBuf {
    if !path.is_absolute() {
        if let Ok(cwd) = std::env::current_dir() {
            return cwd.join(path);
        }
    }
    path.to_path_buf()
}
962
/// Chooses the workspace root used when the caller did not supply one.
///
/// Preference order: the discovered `project_root`, then the current working
/// directory, then `source_parent` if the working directory is unavailable.
fn default_run_workspace_root(project_root: Option<&Path>, source_parent: &Path) -> PathBuf {
    if let Some(root) = project_root {
        return root.to_path_buf();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => source_parent.to_path_buf(),
    }
}
969
970fn run_sandbox_attestation(sandbox: &RunSandboxOptions) -> serde_json::Value {
971    let active_policy = harn_vm::orchestration::current_execution_policy();
972    let active = active_policy.is_some();
973    let workspace_roots = active_policy
974        .as_ref()
975        .map(|policy| policy.workspace_roots.clone())
976        .unwrap_or_default();
977    let profile = active_policy
978        .as_ref()
979        .map(|policy| policy.sandbox_profile.as_str())
980        .unwrap_or("unrestricted");
981    let egress = if sandbox.enabled {
982        "explicit_policy_required"
983    } else if active {
984        "host_policy"
985    } else {
986        "unrestricted"
987    };
988
989    serde_json::json!({
990        "run_default_enabled": sandbox.enabled,
991        "active": active,
992        "workspace_roots": workspace_roots,
993        "profile": profile,
994        "egress": egress,
995    })
996}
997
// Message printed to the operator on the first Ctrl-C. It explains that a
// short pause after the first signal is normal: the VM rewinds the active
// instruction, drops in-flight async ops (such as a hanging Ollama request),
// and unwinds frames before the runtime exits. Without that explanation
// operators tend to hit Ctrl-C a second time and force-kill the process.
// The "Ctrl-C again to force-exit" hint is load-bearing: in earlier releases
// of harn to the fleet, operators routinely double-tapped the shortcut and
// lost the chance to inspect the error trace.
const FIRST_SIGNAL_MESSAGE: &str =
    "[harn] signal received, interrupting VM (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
1008
1009fn install_signal_shutdown_handler() -> RunInterruptTokens {
1010    let tokens = RunInterruptTokens {
1011        cancel_token: Arc::new(AtomicBool::new(false)),
1012        signal_token: Arc::new(Mutex::new(None)),
1013    };
1014    let tokens_clone = tokens.clone();
1015    tokio::spawn(async move {
1016        #[cfg(unix)]
1017        {
1018            use tokio::signal::unix::{signal, SignalKind};
1019            // Containers without controlling-tty access can refuse signal()
1020            // registration. In that case, degrade to no-signal mode rather
1021            // than crashing the `harn run` script — the script still runs,
1022            // it just can't be interrupted cleanly.
1023            let sigterm_stream = signal(SignalKind::terminate()).ok();
1024            let sigint_stream = signal(SignalKind::interrupt()).ok();
1025            let sighup_stream = signal(SignalKind::hangup()).ok();
1026            let (mut sigterm, mut sigint, mut sighup) =
1027                match (sigterm_stream, sigint_stream, sighup_stream) {
1028                    (Some(t), Some(i), Some(h)) => (t, i, h),
1029                    _ => {
1030                        eprintln!(
1031                            "[harn] signal handlers unavailable in this environment; \
1032                             continuing without graceful-shutdown interception"
1033                        );
1034                        return;
1035                    }
1036                };
1037            let mut seen_signal = false;
1038            loop {
1039                let signal_name = tokio::select! {
1040                    _ = sigterm.recv() => "SIGTERM",
1041                    _ = sigint.recv() => "SIGINT",
1042                    _ = sighup.recv() => "SIGHUP",
1043                };
1044                if seen_signal {
1045                    eprintln!("[harn] second signal received, terminating");
1046                    process::exit(124);
1047                }
1048                seen_signal = true;
1049                request_vm_interrupt(&tokens_clone, signal_name);
1050                eprintln!("{FIRST_SIGNAL_MESSAGE}");
1051            }
1052        }
1053        #[cfg(not(unix))]
1054        {
1055            let mut seen_signal = false;
1056            loop {
1057                let _ = tokio::signal::ctrl_c().await;
1058                if seen_signal {
1059                    eprintln!("[harn] second signal received, terminating");
1060                    process::exit(124);
1061                }
1062                seen_signal = true;
1063                request_vm_interrupt(&tokens_clone, "SIGINT");
1064                eprintln!("{FIRST_SIGNAL_MESSAGE}");
1065            }
1066        }
1067    });
1068    tokens
1069}
1070
1071fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
1072    if let Ok(mut signal) = tokens.signal_token.lock() {
1073        *signal = Some(signal_name.to_string());
1074    }
1075    tokens.cancel_token.store(true, Ordering::SeqCst);
1076}
1077
1078/// In-process equivalent of `run_file_with_skill_dirs`. Returns the captured
1079/// stdout, stderr, and what exit code the binary entry would have used,
1080/// instead of writing to real stdout/stderr or calling `process::exit`.
1081///
1082/// Tests should call this directly. The `harn run` binary path wraps it.
1083pub async fn execute_run(
1084    path: &str,
1085    trace: bool,
1086    denied_builtins: HashSet<String>,
1087    script_argv: Vec<String>,
1088    skill_dirs_raw: Vec<String>,
1089    llm_mock_mode: CliLlmMockMode,
1090    attestation: Option<RunAttestationOptions>,
1091    profile: RunProfileOptions,
1092) -> RunOutcome {
1093    crate::ensure_builtin_signatures_installed();
1094    execute_run_with_harnpack_and_sandbox_options(
1095        path,
1096        trace,
1097        denied_builtins,
1098        script_argv,
1099        skill_dirs_raw,
1100        llm_mock_mode,
1101        attestation,
1102        profile,
1103        RunSandboxOptions::default(),
1104        HarnpackRunOptions::default(),
1105    )
1106    .await
1107}
1108
1109/// [`execute_run`] with an explicit sandbox policy override for in-process
1110/// callers whose source path is intentionally outside the workspace they
1111/// operate on.
1112#[allow(clippy::too_many_arguments)]
1113pub async fn execute_run_with_sandbox_options(
1114    path: &str,
1115    trace: bool,
1116    denied_builtins: HashSet<String>,
1117    script_argv: Vec<String>,
1118    skill_dirs_raw: Vec<String>,
1119    llm_mock_mode: CliLlmMockMode,
1120    attestation: Option<RunAttestationOptions>,
1121    profile: RunProfileOptions,
1122    sandbox: RunSandboxOptions,
1123) -> RunOutcome {
1124    execute_run_with_harnpack_and_sandbox_options(
1125        path,
1126        trace,
1127        denied_builtins,
1128        script_argv,
1129        skill_dirs_raw,
1130        llm_mock_mode,
1131        attestation,
1132        profile,
1133        sandbox,
1134        HarnpackRunOptions::default(),
1135    )
1136    .await
1137}
1138
1139/// [`execute_run`] for callers that want to opt-in to the `.harnpack`
1140/// verify-replay-execute path. Used by `harn run <bundle.harnpack>`
1141/// integration tests and by the binary entry once it has parsed the
1142/// `--allow-unsigned` / `--dry-run-verify` flags.
1143#[allow(clippy::too_many_arguments)]
1144pub async fn execute_run_with_harnpack_options(
1145    path: &str,
1146    trace: bool,
1147    denied_builtins: HashSet<String>,
1148    script_argv: Vec<String>,
1149    skill_dirs_raw: Vec<String>,
1150    llm_mock_mode: CliLlmMockMode,
1151    attestation: Option<RunAttestationOptions>,
1152    profile: RunProfileOptions,
1153    harnpack: HarnpackRunOptions,
1154) -> RunOutcome {
1155    execute_run_with_harnpack_and_sandbox_options(
1156        path,
1157        trace,
1158        denied_builtins,
1159        script_argv,
1160        skill_dirs_raw,
1161        llm_mock_mode,
1162        attestation,
1163        profile,
1164        RunSandboxOptions::default(),
1165        harnpack,
1166    )
1167    .await
1168}
1169
1170#[allow(clippy::too_many_arguments)]
1171async fn execute_run_with_harnpack_and_sandbox_options(
1172    path: &str,
1173    trace: bool,
1174    denied_builtins: HashSet<String>,
1175    script_argv: Vec<String>,
1176    skill_dirs_raw: Vec<String>,
1177    llm_mock_mode: CliLlmMockMode,
1178    attestation: Option<RunAttestationOptions>,
1179    profile: RunProfileOptions,
1180    sandbox: RunSandboxOptions,
1181    harnpack: HarnpackRunOptions,
1182) -> RunOutcome {
1183    execute_run_inner(ExecuteRunInputs {
1184        path,
1185        trace,
1186        denied_builtins,
1187        script_argv,
1188        skill_dirs_raw,
1189        llm_mock_mode,
1190        attestation,
1191        profile,
1192        sandbox,
1193        interrupt_tokens: None,
1194        json: None,
1195        aux: RunAuxOptions::default(),
1196        timing: None,
1197        harnpack,
1198    })
1199    .await
1200}
1201
1202/// `execute_run` variant for `--json` mode. Returns once the run is
1203/// complete; the NDJSON event stream — including the terminal `result`
1204/// or `error` event — has already been written to `out` and flushed.
1205/// `out` must be `Send` because the run-event sink may be called from
1206/// any worker thread the VM spawns.
1207#[allow(clippy::too_many_arguments)]
1208pub async fn execute_run_json(
1209    path: &str,
1210    trace: bool,
1211    denied_builtins: HashSet<String>,
1212    script_argv: Vec<String>,
1213    skill_dirs_raw: Vec<String>,
1214    llm_mock_mode: CliLlmMockMode,
1215    attestation: Option<RunAttestationOptions>,
1216    profile: RunProfileOptions,
1217    out: Box<dyn io::Write + Send>,
1218    options: RunJsonOptions,
1219) -> RunOutcome {
1220    execute_run_inner(ExecuteRunInputs {
1221        path,
1222        trace,
1223        denied_builtins,
1224        script_argv,
1225        skill_dirs_raw,
1226        llm_mock_mode,
1227        attestation,
1228        profile,
1229        sandbox: RunSandboxOptions::default(),
1230        interrupt_tokens: None,
1231        json: Some((options, out)),
1232        aux: RunAuxOptions::default(),
1233        timing: None,
1234        harnpack: HarnpackRunOptions::default(),
1235    })
1236    .await
1237}
1238
1239/// Run a `.harn` file with the default builtin/argv set and record
1240/// phase timings into `timing`. Used by `harn time run` so the
1241/// instrumented run shares the exact code path as plain `harn run`.
1242pub(crate) async fn execute_run_with_timing(
1243    path: &str,
1244    script_argv: Vec<String>,
1245    timing: Option<&mut RunTiming>,
1246    sandbox: RunSandboxOptions,
1247) -> RunOutcome {
1248    execute_run_inner(ExecuteRunInputs {
1249        path,
1250        trace: false,
1251        denied_builtins: HashSet::new(),
1252        script_argv,
1253        skill_dirs_raw: Vec::new(),
1254        llm_mock_mode: CliLlmMockMode::Off,
1255        attestation: None,
1256        profile: RunProfileOptions::default(),
1257        sandbox,
1258        interrupt_tokens: None,
1259        json: None,
1260        aux: RunAuxOptions::default(),
1261        timing,
1262        harnpack: HarnpackRunOptions::default(),
1263    })
1264    .await
1265}
1266
1267// See [`compile_or_load_chunk_with_timing`] for why `as_deref_mut` is
1268// the intentional reborrow pattern here.
1269#[allow(clippy::needless_option_as_deref)]
1270async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
1271    let ExecuteRunInputs {
1272        path,
1273        trace,
1274        denied_builtins,
1275        script_argv,
1276        skill_dirs_raw,
1277        llm_mock_mode,
1278        attestation,
1279        profile,
1280        sandbox,
1281        interrupt_tokens,
1282        json,
1283        aux,
1284        timing,
1285        harnpack,
1286    } = inputs;
1287    let RunAuxOptions {
1288        summary,
1289        phase,
1290        rusage,
1291    } = aux;
1292    let run_started = Instant::now();
1293    let cpu_started_ms = rusage.as_ref().map(|_| time::cpu_ms());
1294    let mut owned_timing = if timing.is_none() && (phase.is_some() || rusage.is_some()) {
1295        Some(RunTiming::default())
1296    } else {
1297        None
1298    };
1299    let mut timing = timing.or(owned_timing.as_mut());
1300
1301    // `--json` installs an in-process sink that diverts every
1302    // observable VM event (stdout, stderr, transcript, tool, hook,
1303    // persona) into a single NDJSON stream on `out`. The sink stays
1304    // active until we drop the guard below — fatal errors emit a
1305    // terminal `error` event on the same stream before bailing.
1306    let json_session = json.map(|(options, out)| JsonRunSession::install(options, out));
1307
1308    let mut stderr = String::new();
1309    let mut stdout = String::new();
1310
1311    // `.harnpack` preflight: verify signature + replay archive into the
1312    // content-addressed cache before we touch the chunk loader. The
1313    // outcome path (entrypoint inside the unpacked tree) replaces the
1314    // CLI-supplied `path` for everything below.
1315    let owned_run_path: String;
1316    let resolved_path: &str = if harnpack::looks_like_harnpack(Path::new(path)) {
1317        let outcome = match harnpack::prepare_harnpack(Path::new(path), &harnpack, &mut stderr) {
1318            Ok(prepared) => prepared,
1319            Err(err) => {
1320                return finalize_harnpack_error(
1321                    stderr,
1322                    json_session,
1323                    summary.as_ref(),
1324                    phase.as_ref(),
1325                    rusage.as_ref(),
1326                    run_started,
1327                    err,
1328                );
1329            }
1330        };
1331        harn_vm::run_events::emit(harn_vm::run_events::RunEvent::PackRun {
1332            bundle_hash: outcome.bundle_hash.clone(),
1333            signature_verified: outcome.signature_verified,
1334            key_id: outcome.key_id.clone(),
1335            cache_hit: outcome.cache_hit,
1336            dry_run_verify: harnpack.dry_run_verify,
1337        });
1338        if harnpack.dry_run_verify {
1339            return finalize_harnpack_dry_run(
1340                stderr,
1341                json_session,
1342                summary.as_ref(),
1343                phase.as_ref(),
1344                rusage.as_ref(),
1345                run_started,
1346                cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1347                &outcome,
1348            );
1349        }
1350        owned_run_path = outcome.entrypoint_path.to_string_lossy().into_owned();
1351        owned_run_path.as_str()
1352    } else {
1353        path
1354    };
1355
1356    let Some(LoadedChunk { source, chunk }) =
1357        compile_or_load_chunk_with_timing(resolved_path, &mut stderr, timing.as_deref_mut())
1358    else {
1359        let message = stderr.clone();
1360        return finalize_run_error(
1361            stdout,
1362            stderr,
1363            json_session,
1364            summary.as_ref(),
1365            phase.as_ref(),
1366            rusage.as_ref(),
1367            run_started,
1368            None,
1369            timing.as_deref(),
1370            0,
1371            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1372            "compile_error",
1373            message,
1374        );
1375    };
1376    let path = resolved_path;
1377
1378    // Bracket the VM-setup phase explicitly. `run_setup` covers
1379    // everything between the bytecode compile and the first VM
1380    // instruction; `run_main` covers `vm.execute` proper.
1381    let setup_start = Instant::now();
1382
1383    if trace || summary.is_some() {
1384        harn_vm::llm::enable_tracing();
1385    }
1386    if profile.is_enabled() || phase.is_some() {
1387        harn_vm::tracing::set_tracing_enabled(true);
1388    }
1389    if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
1390        stderr.push_str(&format!("error: {error}\n"));
1391        return finalize_run_error(
1392            stdout,
1393            stderr,
1394            json_session,
1395            summary.as_ref(),
1396            phase.as_ref(),
1397            rusage.as_ref(),
1398            run_started,
1399            None,
1400            timing.as_deref(),
1401            0,
1402            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1403            "llm_mock_install",
1404            error,
1405        );
1406    }
1407
1408    let mut vm = harn_vm::Vm::new();
1409    if let Some(interrupt_tokens) = interrupt_tokens {
1410        vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
1411        vm.install_cancel_token(interrupt_tokens.cancel_token);
1412    }
1413    harn_vm::register_vm_stdlib(&mut vm);
1414    crate::install_default_hostlib(&mut vm);
1415    let source_parent = std::path::Path::new(path)
1416        .parent()
1417        .unwrap_or(std::path::Path::new("."));
1418    // Metadata/store rooted at harn.toml when present; source dir otherwise.
1419    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1420    let store_base = project_root.as_deref().unwrap_or(source_parent);
1421    let sandbox_root = sandbox
1422        .workspace_root
1423        .clone()
1424        .unwrap_or_else(|| default_run_workspace_root(project_root.as_deref(), source_parent));
1425    let _sandbox_scope = install_run_sandbox_scope(&sandbox, &sandbox_root, &mut stderr);
1426    let attestation_started_at_ms = now_ms();
1427    let attestation_log = if attestation.is_some() {
1428        Some(harn_vm::event_log::install_memory_for_current_thread(256))
1429    } else {
1430        None
1431    };
1432    if let Some(log) = attestation_log.as_ref() {
1433        append_run_provenance_event(
1434            log,
1435            "started",
1436            serde_json::json!({
1437                "pipeline": path,
1438                "argv": &script_argv,
1439                "project_root": store_base.display().to_string(),
1440                "sandbox": run_sandbox_attestation(&sandbox),
1441            }),
1442        )
1443        .await;
1444    }
1445    harn_vm::register_store_builtins(&mut vm, store_base);
1446    harn_vm::register_metadata_builtins(&mut vm, store_base);
1447    let pipeline_name = std::path::Path::new(path)
1448        .file_stem()
1449        .and_then(|s| s.to_str())
1450        .unwrap_or("default");
1451    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1452    vm.set_source_info(path, &source);
1453    if !denied_builtins.is_empty() {
1454        vm.set_denied_builtins(denied_builtins);
1455    }
1456    if let Some(ref root) = project_root {
1457        vm.set_project_root(root);
1458    }
1459
1460    if let Some(p) = std::path::Path::new(path).parent() {
1461        if !p.as_os_str().is_empty() {
1462            vm.set_source_dir(p);
1463        }
1464    }
1465
1466    // Load filesystem + manifest skills before the pipeline runs so
1467    // `skills` is populated with a pre-discovered registry (see #73).
1468    let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
1469    let loaded = load_skills(&SkillLoaderInputs {
1470        cli_dirs,
1471        source_path: Some(std::path::PathBuf::from(path)),
1472    });
1473    emit_loader_warnings(&loaded.loader_warnings);
1474    install_skills_global(&mut vm, &loaded);
1475
1476    // `harn run script.harn -- a b c` yields `argv == ["a", "b", "c"]`.
1477    // Always set so scripts can rely on `len(argv)`.
1478    let argv_values: Vec<harn_vm::VmValue> = script_argv
1479        .iter()
1480        .map(|s| harn_vm::VmValue::String(std::sync::Arc::from(s.as_str())))
1481        .collect();
1482    vm.set_global(
1483        "argv",
1484        harn_vm::VmValue::List(std::sync::Arc::new(argv_values)),
1485    );
1486
1487    // Install the script's `Harness` capability handle so the auto-call
1488    // emitted by `Compiler::compile()` for `fn main(harness: Harness)`
1489    // entrypoints can read it.
1490    vm.set_harness(harn_vm::Harness::real());
1491
1492    let extensions = package::load_runtime_extensions(Path::new(path));
1493    package::install_runtime_extensions(&extensions);
1494    if let Some(manifest) = extensions.root_manifest.as_ref() {
1495        if !manifest.mcp.is_empty() {
1496            connect_mcp_servers(&manifest.mcp, &mut vm).await;
1497        }
1498    }
1499    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1500        stderr.push_str(&format!(
1501            "error: failed to install manifest triggers: {error}\n"
1502        ));
1503        return finalize_run_error(
1504            stdout,
1505            stderr,
1506            json_session,
1507            summary.as_ref(),
1508            phase.as_ref(),
1509            rusage.as_ref(),
1510            run_started,
1511            None,
1512            timing.as_deref(),
1513            0,
1514            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1515            "manifest_triggers",
1516            error.to_string(),
1517        );
1518    }
1519    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1520        stderr.push_str(&format!(
1521            "error: failed to install manifest hooks: {error}\n"
1522        ));
1523        return finalize_run_error(
1524            stdout,
1525            stderr,
1526            json_session,
1527            summary.as_ref(),
1528            phase.as_ref(),
1529            rusage.as_ref(),
1530            run_started,
1531            None,
1532            timing.as_deref(),
1533            0,
1534            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1535            "manifest_hooks",
1536            error.to_string(),
1537        );
1538    }
1539
1540    // Run inside a LocalSet so spawn_local works for concurrency builtins.
1541    let local = tokio::task::LocalSet::new();
1542    if let Some(t) = timing.as_deref_mut() {
1543        t.run_setup = setup_start.elapsed();
1544    }
1545    let main_start = Instant::now();
1546    let execution = local
1547        .run_until(async {
1548            match vm.execute(&chunk).await {
1549                Ok(value) => Ok((vm.output(), value)),
1550                Err(e) => Err(vm.format_runtime_error(&e)),
1551            }
1552        })
1553        .await;
1554    if let Some(t) = timing.as_deref_mut() {
1555        t.run_main = main_start.elapsed();
1556    }
1557    if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
1558        stderr.push_str(&format!("error: {error}\n"));
1559        let profile_rollup = if profile.is_enabled() {
1560            Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1561        } else {
1562            None
1563        };
1564        return finalize_run_error(
1565            stdout,
1566            stderr,
1567            json_session,
1568            summary.as_ref(),
1569            phase.as_ref(),
1570            rusage.as_ref(),
1571            run_started,
1572            profile_rollup.as_ref(),
1573            timing.as_deref(),
1574            harn_vm::tracing::peek_spans().len() as u64,
1575            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1576            "llm_mock_record",
1577            error,
1578        );
1579    }
1580
1581    // Always drain any captured stderr accumulated during execution.
1582    let buffered_stderr = harn_vm::take_stderr_buffer();
1583    stderr.push_str(&buffered_stderr);
1584
1585    let exit_code = match &execution {
1586        Ok((_, return_value)) => exit_code_from_return_value(return_value),
1587        Err(_) => 1,
1588    };
1589
1590    if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
1591        if let Err(error) = emit_run_attestation(
1592            log,
1593            path,
1594            store_base,
1595            attestation_started_at_ms,
1596            exit_code,
1597            options,
1598            &mut stderr,
1599        )
1600        .await
1601        {
1602            stderr.push_str(&format!(
1603                "error: failed to emit provenance receipt: {error}\n"
1604            ));
1605            let profile_rollup = if profile.is_enabled() {
1606                Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1607            } else {
1608                None
1609            };
1610            return finalize_run_error(
1611                stdout,
1612                stderr,
1613                json_session,
1614                summary.as_ref(),
1615                phase.as_ref(),
1616                rusage.as_ref(),
1617                run_started,
1618                profile_rollup.as_ref(),
1619                timing.as_deref(),
1620                harn_vm::tracing::peek_spans().len() as u64,
1621                cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1622                "attestation",
1623                error,
1624            );
1625        }
1626        harn_vm::event_log::reset_active_event_log();
1627    }
1628
1629    match execution {
1630        Ok((output, return_value)) => {
1631            stdout.push_str(output);
1632            let main_events = harn_vm::tracing::peek_spans().len() as u64;
1633            let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
1634            let profile_rollup = if profile.is_enabled() {
1635                Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1636            } else {
1637                None
1638            };
1639            let summary_llm = summary.as_ref().map(|_| run_summary_llm_snapshot());
1640            if trace {
1641                stderr.push_str(&render_trace_summary());
1642            }
1643            if let Some(profile_rollup) = profile_rollup.as_ref() {
1644                if let Err(error) =
1645                    render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
1646                {
1647                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1648                }
1649            }
1650            if exit_code != 0 {
1651                stderr.push_str(&render_return_value_error(&return_value));
1652            }
1653            let aux_emission = emit_run_aux_for_exit(
1654                summary.as_ref(),
1655                phase.as_ref(),
1656                rusage.as_ref(),
1657                run_started,
1658                exit_code,
1659                profile_rollup.as_ref(),
1660                summary_llm,
1661                timing.as_deref(),
1662                main_events,
1663                cpu_ms_total,
1664                json_session.is_some(),
1665                &mut stderr,
1666            );
1667            if let Some(session) = json_session {
1668                if let Some(error) = aux_emission.error {
1669                    let mut outcome = session.finalize_error(
1670                        "run_aux",
1671                        format!("failed to emit auxiliary run JSON: {error}"),
1672                        1,
1673                    );
1674                    outcome.stderr = aux_emission.stderr;
1675                    return outcome;
1676                }
1677                let value = harn_vm::llm::vm_value_to_json(&return_value);
1678                let mut outcome = session.finalize_result(value, aux_emission.exit_code);
1679                outcome.stderr = aux_emission.stderr;
1680                return outcome;
1681            }
1682            RunOutcome {
1683                stdout,
1684                stderr,
1685                exit_code: aux_emission.exit_code,
1686            }
1687        }
1688        Err(rendered_error) => {
1689            stderr.push_str(&rendered_error);
1690            let main_events = harn_vm::tracing::peek_spans().len() as u64;
1691            let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
1692            let profile_rollup = if profile.is_enabled() {
1693                Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1694            } else {
1695                None
1696            };
1697            if let Some(profile_rollup) = profile_rollup.as_ref() {
1698                if let Err(error) =
1699                    render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
1700                {
1701                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1702                }
1703            }
1704            let aux_emission = emit_run_aux_for_exit(
1705                summary.as_ref(),
1706                phase.as_ref(),
1707                rusage.as_ref(),
1708                run_started,
1709                1,
1710                profile_rollup.as_ref(),
1711                None,
1712                timing.as_deref(),
1713                main_events,
1714                cpu_ms_total,
1715                json_session.is_some(),
1716                &mut stderr,
1717            );
1718            if let Some(session) = json_session {
1719                let mut outcome =
1720                    session.finalize_error("runtime", rendered_error, aux_emission.exit_code);
1721                outcome.stderr = aux_emission.stderr;
1722                return outcome;
1723            }
1724            RunOutcome {
1725                stdout,
1726                stderr,
1727                exit_code: aux_emission.exit_code,
1728            }
1729        }
1730    }
1731}
1732
1733fn render_and_persist_profile_rollup(
1734    options: &RunProfileOptions,
1735    profile: &harn_vm::profile::RunProfile,
1736    stderr: &mut String,
1737) -> Result<(), String> {
1738    if options.text {
1739        stderr.push_str(&harn_vm::profile::render(profile));
1740    }
1741    if let Some(path) = options.json_path.as_ref() {
1742        if let Some(parent) = path.parent() {
1743            if !parent.as_os_str().is_empty() {
1744                fs::create_dir_all(parent)
1745                    .map_err(|error| format!("create {}: {error}", parent.display()))?;
1746            }
1747        }
1748        let json = serde_json::to_string_pretty(profile)
1749            .map_err(|error| format!("serialize profile: {error}"))?;
1750        fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
1751    }
1752    Ok(())
1753}
1754
1755fn build_run_summary<'a>(
1756    started: Instant,
1757    exit_code: i32,
1758    profile: Option<&'a harn_vm::profile::RunProfile>,
1759    llm: RunSummaryLlm,
1760) -> RunSummary<'a> {
1761    RunSummary {
1762        schema_version: RUN_SUMMARY_SCHEMA_VERSION,
1763        event: "run_summary",
1764        wall_time_ms: started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64,
1765        exit_code,
1766        llm,
1767        profile,
1768    }
1769}
1770
1771fn run_summary_llm_snapshot() -> RunSummaryLlm {
1772    let (input_tokens, output_tokens, time_ms, call_count) = harn_vm::llm::peek_trace_summary();
1773    let cost_usd = harn_vm::llm::peek_total_cost();
1774    RunSummaryLlm {
1775        call_count,
1776        input_tokens,
1777        output_tokens,
1778        time_ms,
1779        cost_usd: if cost_usd.is_finite() { cost_usd } else { 0.0 },
1780    }
1781}
1782
/// Result of emitting the auxiliary run JSON lines in `emit_run_aux_for_exit`.
struct RunAuxEmission {
    /// Auxiliary output captured separately from the caller's stderr buffer;
    /// only written to when `emit_run_aux_for_exit` runs with `json_mode` set.
    stderr: String,
    /// Exit code after accounting for emission failures (a failed emission
    /// turns an exit code of 0 into 1).
    exit_code: i32,
    /// Message of the first emission failure, if any.
    error: Option<String>,
}
1788
1789#[allow(clippy::too_many_arguments)]
1790fn emit_run_aux_for_exit(
1791    summary: Option<&RunSummaryOptions>,
1792    phase: Option<&RunPhaseOptions>,
1793    rusage: Option<&RunRusageOptions>,
1794    started: Instant,
1795    exit_code: i32,
1796    profile: Option<&harn_vm::profile::RunProfile>,
1797    llm: Option<RunSummaryLlm>,
1798    timing: Option<&RunTiming>,
1799    main_events: u64,
1800    cpu_ms_total: Option<u64>,
1801    json_mode: bool,
1802    stderr: &mut String,
1803) -> RunAuxEmission {
1804    let mut aux_stderr = String::new();
1805    let mut final_exit_code = exit_code;
1806    let mut aux_error = None;
1807    let aux_target = if json_mode { &mut aux_stderr } else { stderr };
1808    let default_timing = RunTiming::default();
1809    let timing = timing.unwrap_or(&default_timing);
1810
1811    if let Some(options) = summary {
1812        let llm = llm.unwrap_or_else(run_summary_llm_snapshot);
1813        let summary = build_run_summary(started, exit_code, profile, llm);
1814        if let Err(error) = emit_raw_json_line(&options.sink, &summary, "run summary", aux_target) {
1815            record_aux_error(
1816                &mut final_exit_code,
1817                &mut aux_error,
1818                aux_target,
1819                "run summary",
1820                error,
1821            );
1822        }
1823    }
1824    if let Some(options) = phase {
1825        let phase_event = RunPhaseEvent {
1826            schema_version: RUN_PHASE_SCHEMA_VERSION,
1827            event: "run_phase",
1828            phases: time::build_phase_records(timing, main_events),
1829        };
1830        if let Err(error) = emit_raw_json_line(&options.sink, &phase_event, "run phase", aux_target)
1831        {
1832            record_aux_error(
1833                &mut final_exit_code,
1834                &mut aux_error,
1835                aux_target,
1836                "run phase",
1837                error,
1838            );
1839        }
1840    }
1841    if let Some(options) = rusage {
1842        let rusage_event = RunRusageEvent {
1843            schema_version: RUN_RUSAGE_SCHEMA_VERSION,
1844            event: "run_rusage",
1845            cpu_ms: cpu_ms_total.unwrap_or(0),
1846        };
1847        if let Err(error) =
1848            emit_raw_json_line(&options.sink, &rusage_event, "run rusage", aux_target)
1849        {
1850            record_aux_error(
1851                &mut final_exit_code,
1852                &mut aux_error,
1853                aux_target,
1854                "run rusage",
1855                error,
1856            );
1857        }
1858    }
1859
1860    RunAuxEmission {
1861        stderr: aux_stderr,
1862        exit_code: final_exit_code,
1863        error: aux_error,
1864    }
1865}
1866
/// Records a failure to emit the auxiliary output named by `label`.
///
/// Appends an `error: failed to emit …` line to `stderr`, promotes a zero
/// `final_exit_code` to `1` (non-zero codes are left alone), and stores
/// `error` in `aux_error` only if no earlier error was recorded.
fn record_aux_error(
    final_exit_code: &mut i32,
    aux_error: &mut Option<String>,
    stderr: &mut String,
    label: &str,
    error: String,
) {
    let message = format!("error: failed to emit {label}: {error}\n");
    stderr.push_str(&message);

    if *final_exit_code == 0 {
        *final_exit_code = 1;
    }

    // Keep the first recorded error; later ones are only logged above.
    let _ = aux_error.get_or_insert(error);
}
1882
1883fn emit_raw_json_line(
1884    sink: &RunJsonSink,
1885    value: &impl Serialize,
1886    label: &str,
1887    stderr: &mut String,
1888) -> Result<(), String> {
1889    let line =
1890        serde_json::to_string(value).map_err(|error| format!("serialize {label}: {error}"))? + "\n";
1891    match &sink.target {
1892        RunJsonSinkTarget::Stderr => {
1893            stderr.push_str(&line);
1894            Ok(())
1895        }
1896        RunJsonSinkTarget::File(path) => write_raw_json_file(path, &line),
1897        RunJsonSinkTarget::Fd(fd) => write_raw_json_fd(*fd, &line, sink.fd_flag),
1898    }
1899}
1900
/// Writes `line` to `path`, replacing any existing file contents.
///
/// A non-empty parent directory is created first if needed.
///
/// # Errors
///
/// Returns `create <dir>: …` if the parent directory cannot be created and
/// `write <path>: …` if the file cannot be written.
fn write_raw_json_file(path: &Path, line: &str) -> Result<(), String> {
    // A bare file name has an empty parent; there is nothing to create then.
    let parent_dir = path.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = parent_dir {
        if let Err(error) = fs::create_dir_all(dir) {
            return Err(format!("create {}: {error}", dir.display()));
        }
    }

    match fs::write(path, line) {
        Ok(()) => Ok(()),
        Err(error) => Err(format!("write {}: {error}", path.display())),
    }
}
1910
1911#[cfg(unix)]
1912fn write_raw_json_fd(fd: i32, line: &str, flag: &str) -> Result<(), String> {
1913    use std::fs::File;
1914    use std::os::unix::io::FromRawFd;
1915
1916    if fd < 0 {
1917        return Err(format!("invalid {flag} {fd}: must be non-negative"));
1918    }
1919    let duped = unsafe { libc::dup(fd) };
1920    if duped < 0 {
1921        return Err(format!(
1922            "duplicate {flag} {fd}: {}",
1923            io::Error::last_os_error()
1924        ));
1925    }
1926    let mut file = unsafe { File::from_raw_fd(duped) };
1927    file.write_all(line.as_bytes())
1928        .and_then(|_| file.flush())
1929        .map_err(|error| format!("write {flag} {fd}: {error}"))
1930}
1931
/// Non-Unix stand-in for the descriptor sink: always fails with a message
/// naming `flag`, because descriptor sinks are only implemented for Unix.
#[cfg(not(unix))]
fn write_raw_json_fd(_fd: i32, _line: &str, flag: &str) -> Result<(), String> {
    let message = format!("{flag} is only supported on Unix platforms");
    Err(message)
}
1936
1937async fn append_run_provenance_event(
1938    log: &Arc<harn_vm::event_log::AnyEventLog>,
1939    kind: &str,
1940    payload: serde_json::Value,
1941) {
1942    let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
1943        return;
1944    };
1945    let _ = log
1946        .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
1947        .await;
1948}
1949
1950async fn emit_run_attestation(
1951    log: &Arc<harn_vm::event_log::AnyEventLog>,
1952    path: &str,
1953    store_base: &Path,
1954    started_at_ms: i64,
1955    exit_code: i32,
1956    options: &RunAttestationOptions,
1957    stderr: &mut String,
1958) -> Result<(), String> {
1959    let finished_at_ms = now_ms();
1960    let status = if exit_code == 0 { "success" } else { "failure" };
1961    append_run_provenance_event(
1962        log,
1963        "finished",
1964        serde_json::json!({
1965            "pipeline": path,
1966            "status": status,
1967            "exit_code": exit_code,
1968        }),
1969    )
1970    .await;
1971    log.flush()
1972        .await
1973        .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
1974    let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
1975        .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
1976    let (signing_key, key_id) =
1977        harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
1978            .await
1979            .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
1980    let receipt = harn_vm::build_signed_receipt(
1981        log,
1982        harn_vm::ReceiptBuildOptions {
1983            pipeline: path.to_string(),
1984            status: status.to_string(),
1985            started_at_ms,
1986            finished_at_ms,
1987            exit_code,
1988            producer_name: "harn-cli".to_string(),
1989            producer_version: env!("CARGO_PKG_VERSION").to_string(),
1990        },
1991        &signing_key,
1992        key_id,
1993    )
1994    .await
1995    .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
1996    let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
1997    if let Some(parent) = receipt_path.parent() {
1998        fs::create_dir_all(parent)
1999            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
2000    }
2001    let encoded = serde_json::to_vec_pretty(&receipt)
2002        .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
2003    fs::write(&receipt_path, encoded)
2004        .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
2005    stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
2006    Ok(())
2007}
2008
2009fn receipt_output_path(
2010    store_base: &Path,
2011    options: &RunAttestationOptions,
2012    receipt_id: &str,
2013) -> PathBuf {
2014    if let Some(path) = options.receipt_out.as_ref() {
2015        return path.clone();
2016    }
2017    harn_vm::runtime_paths::state_root(store_base)
2018        .join("receipts")
2019        .join(format!("{receipt_id}.json"))
2020}
2021
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch. A
/// millisecond count too large for `i64` saturates to `i64::MAX` instead of
/// wrapping, as a bare `as i64` cast would.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
2028
2029/// Map a script's top-level return value to a process exit code.
2030///
2031/// - `int n`             → exit n (clamped to 0..=255)
2032/// - `Result::Ok(_)`     → exit 0
2033/// - `Result::Err(_)`    → exit 1
2034/// - anything else       → exit 0
2035fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
2036    use harn_vm::VmValue;
2037    match value {
2038        VmValue::Int(n) => (*n).clamp(0, 255) as i32,
2039        VmValue::EnumVariant(enum_variant) if enum_variant.is_variant("Result", "Err") => 1,
2040        _ => 0,
2041    }
2042}
2043
2044/// State for a single `harn run --json` invocation. Installs the
2045/// run-event sink in [`Self::install`] and removes it in [`Drop`], so
2046/// every exit path through `execute_run_inner` cleans up correctly
2047/// even if a panic unwinds out of the VM. Save-and-restore of any
2048/// previously installed sink keeps the helper safe to nest (rare, but
2049/// in-process embeddings can call into `harn run` from a host that
2050/// already had a sink wired).
2051///
2052/// `finalize_result` / `finalize_error` emit the terminal event and
2053/// build a [`RunOutcome`] whose stdout/stderr captured-buffer fields
2054/// stay **empty** — the canonical stream is on `out`.
2055/// `outcome.exit_code` still carries the process exit code so the
2056/// binary entry can `process::exit(...)`.
2057struct JsonRunSession {
2058    emitter: self::json_events::NdjsonEmitter,
2059    prior_sink: Option<Arc<dyn harn_vm::run_events::RunEventSink>>,
2060}
2061
2062impl JsonRunSession {
2063    fn install(options: RunJsonOptions, out: Box<dyn io::Write + Send>) -> Self {
2064        let emitter = NdjsonEmitter::new(out, options.quiet);
2065        let prior_sink = harn_vm::run_events::install_sink(emitter.sink());
2066        Self {
2067            emitter,
2068            prior_sink,
2069        }
2070    }
2071
2072    fn finalize_result(self, value: serde_json::Value, exit_code: i32) -> RunOutcome {
2073        self.emitter.emit_result(value, exit_code);
2074        RunOutcome {
2075            stdout: String::new(),
2076            stderr: String::new(),
2077            exit_code,
2078        }
2079    }
2080
2081    fn finalize_error(
2082        self,
2083        code: impl Into<String>,
2084        message: impl Into<String>,
2085        exit_code: i32,
2086    ) -> RunOutcome {
2087        self.emitter.emit_error(code, message);
2088        RunOutcome {
2089            stdout: String::new(),
2090            stderr: String::new(),
2091            exit_code,
2092        }
2093    }
2094}
2095
2096impl Drop for JsonRunSession {
2097    fn drop(&mut self) {
2098        match self.prior_sink.take() {
2099            Some(prior) => {
2100                harn_vm::run_events::install_sink(prior);
2101            }
2102            None => harn_vm::run_events::clear_sink(),
2103        }
2104    }
2105}
2106
2107#[allow(clippy::too_many_arguments)]
2108fn finalize_run_error(
2109    stdout: String,
2110    mut stderr: String,
2111    json_session: Option<JsonRunSession>,
2112    summary: Option<&RunSummaryOptions>,
2113    phase: Option<&RunPhaseOptions>,
2114    rusage: Option<&RunRusageOptions>,
2115    started: Instant,
2116    profile: Option<&harn_vm::profile::RunProfile>,
2117    timing: Option<&RunTiming>,
2118    main_events: u64,
2119    cpu_ms_total: Option<u64>,
2120    code: impl Into<String>,
2121    message: impl Into<String>,
2122) -> RunOutcome {
2123    let aux_emission = emit_run_aux_for_exit(
2124        summary,
2125        phase,
2126        rusage,
2127        started,
2128        1,
2129        profile,
2130        None,
2131        timing,
2132        main_events,
2133        cpu_ms_total,
2134        json_session.is_some(),
2135        &mut stderr,
2136    );
2137    if let Some(session) = json_session {
2138        let mut outcome = session.finalize_error(code, message, aux_emission.exit_code);
2139        outcome.stderr = aux_emission.stderr;
2140        return outcome;
2141    }
2142    RunOutcome {
2143        stdout,
2144        stderr,
2145        exit_code: aux_emission.exit_code,
2146    }
2147}
2148
2149/// Translate a preflight failure into either the `--json` error event
2150/// stream or a plain stderr message plus exit-code 1. Keeps the
2151/// `.harnpack` verify path's error reporting consistent with the rest
2152/// of `harn run`.
2153fn finalize_harnpack_error(
2154    mut stderr: String,
2155    json_session: Option<JsonRunSession>,
2156    summary: Option<&RunSummaryOptions>,
2157    phase: Option<&RunPhaseOptions>,
2158    rusage: Option<&RunRusageOptions>,
2159    started: Instant,
2160    err: HarnpackError,
2161) -> RunOutcome {
2162    let code = err.code;
2163    let message = err.message;
2164    stderr.push_str(&format!("error: {message}\n"));
2165    finalize_run_error(
2166        String::new(),
2167        stderr,
2168        json_session,
2169        summary,
2170        phase,
2171        rusage,
2172        started,
2173        None,
2174        None,
2175        0,
2176        None,
2177        code,
2178        message,
2179    )
2180}
2181
2182/// Successful `--dry-run-verify` path. Reports the bundle hash and
2183/// signature outcome on stderr (since stdout belongs to the script) and
2184/// emits a terminal `result` event when `--json` is active so consumers
2185/// see the run complete.
2186fn finalize_harnpack_dry_run(
2187    mut stderr: String,
2188    json_session: Option<JsonRunSession>,
2189    summary_options: Option<&RunSummaryOptions>,
2190    phase_options: Option<&RunPhaseOptions>,
2191    rusage_options: Option<&RunRusageOptions>,
2192    started: Instant,
2193    cpu_ms_total: Option<u64>,
2194    prepared: &PreparedHarnpack,
2195) -> RunOutcome {
2196    let summary = format!(
2197        "[harn] harnpack verify ok: bundle_hash={}, signature_verified={}, cache_hit={}\n",
2198        prepared.bundle_hash, prepared.signature_verified, prepared.cache_hit
2199    );
2200    stderr.push_str(&summary);
2201    let aux_emission = emit_run_aux_for_exit(
2202        summary_options,
2203        phase_options,
2204        rusage_options,
2205        started,
2206        0,
2207        None,
2208        None,
2209        None,
2210        0,
2211        cpu_ms_total,
2212        json_session.is_some(),
2213        &mut stderr,
2214    );
2215    if let Some(session) = json_session {
2216        if let Some(error) = aux_emission.error {
2217            let mut outcome = session.finalize_error(
2218                "run_aux",
2219                format!("failed to emit auxiliary run JSON: {error}"),
2220                1,
2221            );
2222            outcome.stderr = aux_emission.stderr;
2223            return outcome;
2224        }
2225        let value = serde_json::json!({
2226            "bundle_hash": prepared.bundle_hash,
2227            "signature_verified": prepared.signature_verified,
2228            "key_id": prepared.key_id,
2229            "cache_hit": prepared.cache_hit,
2230            "dry_run_verify": true,
2231        });
2232        let mut outcome = session.finalize_result(value, aux_emission.exit_code);
2233        outcome.stderr = aux_emission.stderr;
2234        return outcome;
2235    }
2236    RunOutcome {
2237        stdout: String::new(),
2238        stderr,
2239        exit_code: aux_emission.exit_code,
2240    }
2241}
2242
2243fn render_return_value_error(value: &harn_vm::VmValue) -> String {
2244    let harn_vm::VmValue::EnumVariant(enum_variant) = value else {
2245        return String::new();
2246    };
2247    if !enum_variant.is_variant("Result", "Err") {
2248        return String::new();
2249    }
2250    let rendered = enum_variant
2251        .fields
2252        .first()
2253        .map(|p| p.display())
2254        .unwrap_or_default();
2255    if rendered.is_empty() {
2256        "error\n".to_string()
2257    } else if rendered.ends_with('\n') {
2258        rendered
2259    } else {
2260        format!("{rendered}\n")
2261    }
2262}
2263
2264/// Connect to MCP servers declared in `harn.toml` and register them as
2265/// `mcp.<name>` globals on the VM. Connection failures are warned but do
2266/// not abort execution.
2267///
2268/// Servers with `lazy = true` are registered with the VM-side MCP
2269/// registry but NOT booted — their processes start the first time a
2270/// skill's `requires_mcp` list names them or user code calls
2271/// `mcp_ensure_active("name")` / `mcp_call(mcp.<name>, ...)`.
2272pub(crate) async fn connect_mcp_servers(
2273    servers: &[package::McpServerConfig],
2274    vm: &mut harn_vm::Vm,
2275) {
2276    use std::collections::BTreeMap;
2277    use std::time::Duration;
2278
2279    let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
2280    let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
2281
2282    for server in servers {
2283        let resolved_auth = match mcp::resolve_auth_for_server(server).await {
2284            Ok(resolution) => resolution,
2285            Err(error) => {
2286                eprintln!(
2287                    "warning: mcp: failed to load auth for '{}': {}",
2288                    server.name, error
2289                );
2290                AuthResolution::None
2291            }
2292        };
2293        let spec = serde_json::json!({
2294            "name": server.name,
2295            "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
2296            "command": server.command,
2297            "args": server.args,
2298            "env": server.env,
2299            "url": server.url,
2300            "auth_token": match resolved_auth {
2301                AuthResolution::Bearer(token) => Some(token),
2302                AuthResolution::None => server.auth_token.clone(),
2303            },
2304            "protocol_version": server.protocol_version,
2305            "protocol_mode": server.protocol_mode,
2306            "proxy_server_name": server.proxy_server_name,
2307        });
2308
2309        // Register with the VM-side registry regardless of lazy flag —
2310        // skill activation and `mcp_ensure_active` look up specs there.
2311        registrations.push(harn_vm::RegisteredMcpServer {
2312            name: server.name.clone(),
2313            spec: spec.clone(),
2314            lazy: server.lazy,
2315            card: server.card.clone(),
2316            keep_alive: server.keep_alive_ms.map(Duration::from_millis),
2317        });
2318
2319        if server.lazy {
2320            eprintln!(
2321                "[harn] mcp: deferred '{}' (lazy, boots on first use)",
2322                server.name
2323            );
2324            continue;
2325        }
2326
2327        match harn_vm::connect_mcp_server_from_json(&spec).await {
2328            Ok(handle) => {
2329                eprintln!("[harn] mcp: connected to '{}'", server.name);
2330                harn_vm::mcp_install_active(&server.name, handle.clone());
2331                mcp_dict.insert(server.name.clone(), harn_vm::VmValue::mcp_client(handle));
2332            }
2333            Err(e) => {
2334                eprintln!(
2335                    "warning: mcp: failed to connect to '{}': {}",
2336                    server.name, e
2337                );
2338            }
2339        }
2340    }
2341
2342    // Install registrations AFTER eager connects so `install_active`
2343    // above doesn't get overwritten.
2344    harn_vm::mcp_register_servers(registrations);
2345
2346    if !mcp_dict.is_empty() {
2347        vm.set_global("mcp", harn_vm::VmValue::Dict(std::sync::Arc::new(mcp_dict)));
2348    }
2349}
2350
2351pub(crate) fn render_trace_summary() -> String {
2352    use std::fmt::Write;
2353    let entries = harn_vm::llm::take_trace();
2354    if entries.is_empty() {
2355        return String::new();
2356    }
2357    let mut out = String::new();
2358    let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
2359    let mut total_input = 0i64;
2360    let mut total_output = 0i64;
2361    let mut total_ms = 0u64;
2362    for (i, entry) in entries.iter().enumerate() {
2363        let _ = writeln!(
2364            out,
2365            "  #{}: {} | {} in + {} out tokens | {} ms",
2366            i + 1,
2367            entry.model,
2368            entry.input_tokens,
2369            entry.output_tokens,
2370            entry.duration_ms,
2371        );
2372        total_input += entry.input_tokens;
2373        total_output += entry.output_tokens;
2374        total_ms += entry.duration_ms;
2375    }
2376    let total_tokens = total_input + total_output;
2377    // Rough cost estimate using Sonnet 4 pricing ($3/MTok in, $15/MTok out).
2378    let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
2379    let _ = writeln!(
2380        out,
2381        "  \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
2382        entries.len(),
2383        if entries.len() == 1 { "" } else { "s" },
2384        total_tokens,
2385        total_input,
2386        total_output,
2387        total_ms,
2388        cost,
2389    );
2390    out
2391}
2392
2393/// Run a .harn file as an MCP server using the script-driven surface.
2394/// The pipeline must call `mcp_tools(registry)` (or the alias
2395/// `mcp_serve(registry)`) so the CLI can expose its tools, and may
2396/// register additional resources/prompts via `mcp_resource(...)` /
2397/// `mcp_resource_template(...)` / `mcp_prompt(...)`.
2398///
2399/// Dispatched into by `harn serve mcp <file>` when the script does not
2400/// define any `pub fn` exports — see `commands::serve::run_mcp_server`.
2401///
2402/// `card_source` — optional `--card` argument. Accepts either a path to
2403/// a JSON file or an inline JSON string. When present, the card is
2404/// embedded in the `initialize` response and exposed as the
2405/// `well-known://mcp-card` resource.
2406pub(crate) async fn run_file_mcp_serve(
2407    path: &str,
2408    card_source: Option<&str>,
2409    mode: RunFileMcpServeMode,
2410) {
2411    let mut diagnostics = String::new();
2412    let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut diagnostics)
2413    else {
2414        eprint!("{diagnostics}");
2415        process::exit(1);
2416    };
2417    if !diagnostics.is_empty() {
2418        eprint!("{diagnostics}");
2419    }
2420
2421    let mut vm = harn_vm::Vm::new();
2422    harn_vm::register_vm_stdlib(&mut vm);
2423    crate::install_default_hostlib(&mut vm);
2424    let source_parent = std::path::Path::new(path)
2425        .parent()
2426        .unwrap_or(std::path::Path::new("."));
2427    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
2428    let store_base = project_root.as_deref().unwrap_or(source_parent);
2429    harn_vm::register_store_builtins(&mut vm, store_base);
2430    harn_vm::register_metadata_builtins(&mut vm, store_base);
2431    let pipeline_name = std::path::Path::new(path)
2432        .file_stem()
2433        .and_then(|s| s.to_str())
2434        .unwrap_or("default");
2435    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
2436    vm.set_source_info(path, &source);
2437    if let Some(ref root) = project_root {
2438        vm.set_project_root(root);
2439    }
2440    if let Some(p) = std::path::Path::new(path).parent() {
2441        if !p.as_os_str().is_empty() {
2442            vm.set_source_dir(p);
2443        }
2444    }
2445
2446    // Same skill discovery as `harn run` — see comment there.
2447    let loaded = load_skills(&SkillLoaderInputs {
2448        cli_dirs: Vec::new(),
2449        source_path: Some(std::path::PathBuf::from(path)),
2450    });
2451    emit_loader_warnings(&loaded.loader_warnings);
2452    install_skills_global(&mut vm, &loaded);
2453
2454    let extensions = package::load_runtime_extensions(Path::new(path));
2455    package::install_runtime_extensions(&extensions);
2456    if let Some(manifest) = extensions.root_manifest.as_ref() {
2457        if !manifest.mcp.is_empty() {
2458            connect_mcp_servers(&manifest.mcp, &mut vm).await;
2459        }
2460    }
2461    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
2462        eprintln!("error: failed to install manifest triggers: {error}");
2463        process::exit(1);
2464    }
2465    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
2466        eprintln!("error: failed to install manifest hooks: {error}");
2467        process::exit(1);
2468    }
2469
2470    let local = tokio::task::LocalSet::new();
2471    local
2472        .run_until(async {
2473            match vm.execute(&chunk).await {
2474                Ok(_) => {}
2475                Err(e) => {
2476                    eprint!("{}", vm.format_runtime_error(&e));
2477                    process::exit(1);
2478                }
2479            }
2480
2481            // Pipeline output goes to stderr — stdout is the MCP transport.
2482            let output = vm.output();
2483            if !output.is_empty() {
2484                eprint!("{output}");
2485            }
2486
2487            let registry = match harn_vm::take_mcp_serve_registry() {
2488                Some(r) => r,
2489                None => {
2490                    eprintln!("error: pipeline did not call mcp_serve(registry)");
2491                    eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
2492                    process::exit(1);
2493                }
2494            };
2495
2496            let tools = match harn_vm::tool_registry_to_mcp_tools(&registry) {
2497                Ok(t) => t,
2498                Err(e) => {
2499                    eprintln!("error: {e}");
2500                    process::exit(1);
2501                }
2502            };
2503
2504            let resources = harn_vm::take_mcp_serve_resources();
2505            let resource_templates = harn_vm::take_mcp_serve_resource_templates();
2506            let prompts = harn_vm::take_mcp_serve_prompts();
2507
2508            let server_name = std::path::Path::new(path)
2509                .file_stem()
2510                .and_then(|s| s.to_str())
2511                .unwrap_or("harn")
2512                .to_string();
2513
2514            let mut caps = Vec::new();
2515            if !tools.is_empty() {
2516                caps.push(format!(
2517                    "{} tool{}",
2518                    tools.len(),
2519                    if tools.len() == 1 { "" } else { "s" }
2520                ));
2521            }
2522            let total_resources = resources.len() + resource_templates.len();
2523            if total_resources > 0 {
2524                caps.push(format!(
2525                    "{total_resources} resource{}",
2526                    if total_resources == 1 { "" } else { "s" }
2527                ));
2528            }
2529            if !prompts.is_empty() {
2530                caps.push(format!(
2531                    "{} prompt{}",
2532                    prompts.len(),
2533                    if prompts.len() == 1 { "" } else { "s" }
2534                ));
2535            }
2536            eprintln!(
2537                "[harn] serve mcp: serving {} as '{server_name}'",
2538                caps.join(", ")
2539            );
2540
2541            let mut server =
2542                harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
2543            if let Some(source) = card_source {
2544                match resolve_card_source(source) {
2545                    Ok(card) => server = server.with_server_card(card),
2546                    Err(e) => {
2547                        eprintln!("error: --card: {e}");
2548                        process::exit(1);
2549                    }
2550                }
2551            }
2552            match mode {
2553                RunFileMcpServeMode::Stdio => {
2554                    if let Err(e) = server.run(&mut vm).await {
2555                        eprintln!("error: MCP server error: {e}");
2556                        process::exit(1);
2557                    }
2558                }
2559                RunFileMcpServeMode::Http(http) => {
2560                    let RunFileMcpServeHttp {
2561                        options,
2562                        auth_policy,
2563                    } = *http;
2564                    if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
2565                        server,
2566                        vm,
2567                        options,
2568                        auth_policy,
2569                    )
2570                    .await
2571                    {
2572                        eprintln!("error: MCP server error: {e}");
2573                        process::exit(1);
2574                    }
2575                }
2576            }
2577        })
2578        .await;
2579}
2580
2581/// Accept either a path to a JSON file or an inline JSON blob and
2582/// return the parsed `serde_json::Value`. Used by `--card`. Disambiguates
2583/// by peeking at the first non-whitespace character: `{` → inline JSON,
2584/// anything else → path.
2585pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
2586    let trimmed = source.trim_start();
2587    if trimmed.starts_with('{') || trimmed.starts_with('[') {
2588        return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
2589    }
2590    let path = std::path::Path::new(source);
2591    harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
2592}
2593
2594pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
2595    use notify::{Event, EventKind, RecursiveMode, Watcher};
2596
2597    let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
2598        eprintln!("Error: {e}");
2599        process::exit(1);
2600    });
2601    let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
2602
2603    eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
2604    run_file(
2605        path,
2606        false,
2607        denied_builtins.clone(),
2608        Vec::new(),
2609        CliLlmMockMode::Off,
2610        None,
2611        RunProfileOptions::default(),
2612    )
2613    .await;
2614
2615    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
2616    let _watcher = {
2617        let tx = tx.clone();
2618        let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
2619            if let Ok(event) = res {
2620                if matches!(
2621                    event.kind,
2622                    EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
2623                ) {
2624                    let has_harn = event
2625                        .paths
2626                        .iter()
2627                        .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
2628                    if has_harn {
2629                        let _ = tx.blocking_send(());
2630                    }
2631                }
2632            }
2633        })
2634        .unwrap_or_else(|e| {
2635            eprintln!("Error setting up file watcher: {e}");
2636            process::exit(1);
2637        });
2638        watcher
2639            .watch(watch_dir, RecursiveMode::Recursive)
2640            .unwrap_or_else(|e| {
2641                eprintln!("Error watching directory: {e}");
2642                process::exit(1);
2643            });
2644        watcher // keep alive
2645    };
2646
2647    eprintln!(
2648        "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
2649        watch_dir.display()
2650    );
2651
2652    loop {
2653        rx.recv().await;
2654        // Debounce: let bursts of events settle for 200ms before re-running.
2655        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2656        while rx.try_recv().is_ok() {}
2657
2658        eprintln!();
2659        eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
2660        run_file(
2661            path,
2662            false,
2663            denied_builtins.clone(),
2664            Vec::new(),
2665            CliLlmMockMode::Off,
2666            None,
2667            RunProfileOptions::default(),
2668        )
2669        .await;
2670    }
2671}
2672
2673#[cfg(test)]
2674mod tests;