Skip to main content

harn_cli/commands/run/
mod.rs

1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use harn_parser::DiagnosticSeverity;
11use harn_vm::event_log::EventLog;
12use serde::Serialize;
13
14use crate::commands::mcp::{self, AuthResolution};
15use crate::commands::time::{self, PhaseRecord, RunTiming};
16use crate::package;
17use crate::parse_source_file;
18use crate::skill_loader::{
19    canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
20    SkillLoaderInputs,
21};
22
23mod explain_cost;
24pub mod harnpack;
25pub mod json_events;
26
27use self::harnpack::{HarnpackError, HarnpackRunOptions, PreparedHarnpack};
28use self::json_events::NdjsonEmitter;
29
/// JSON event-stream configuration for `--json` runs.
///
/// The derived `Default` leaves `quiet` as `false`, so nothing is
/// suppressed unless the caller opts in.
#[derive(Clone, Default)]
pub struct RunJsonOptions {
    /// Suppress `stdout` / `stderr` events. Transcript, tool, hook,
    /// persona, and the terminal result/error events still flow.
    pub quiet: bool,
}
37
/// Post-run summary configuration for `harn run --emit-summary-json`.
///
/// Built by `run_summary_options_from_args` when `args.emit_summary_json`
/// is set.
#[derive(Clone, Debug)]
pub struct RunSummaryOptions {
    /// Destination for the summary record.
    pub sink: RunJsonSink,
}
43
/// Phase-event output configuration. Built by
/// `run_phase_options_from_args` when `args.emit_phase_json` is set.
#[derive(Clone, Debug)]
pub struct RunPhaseOptions {
    /// Destination for the phase record.
    pub sink: RunJsonSink,
}
48
/// Resource-usage output configuration. Built by
/// `run_rusage_options_from_args` when `args.emit_rusage_json` is set.
#[derive(Clone, Debug)]
pub struct RunRusageOptions {
    /// Destination for the rusage record.
    pub sink: RunJsonSink,
}
53
/// Bundle of the optional auxiliary JSON outputs of a run.
///
/// Each field is `None` when the corresponding `emit_*_json` argument is
/// unset; the derived `Default` leaves all three as `None`.
#[derive(Clone, Debug, Default)]
pub struct RunAuxOptions {
    /// Post-run summary output, if requested.
    pub summary: Option<RunSummaryOptions>,
    /// Phase-event output, if requested.
    pub phase: Option<RunPhaseOptions>,
    /// Resource-usage output, if requested.
    pub rusage: Option<RunRusageOptions>,
}
60
/// Resolved destination for one auxiliary JSON output, as produced by
/// `build_run_json_sink`.
#[derive(Clone, Debug)]
pub struct RunJsonSink {
    /// Where the payload goes (stderr buffer, file, or descriptor).
    pub target: RunJsonSinkTarget,
    /// Name of the CLI flag that supplies the descriptor for this output,
    /// e.g. `"--summary-fd"`.
    // NOTE(review): presumably used when reporting descriptor errors — the
    // consumer is outside this view; confirm.
    pub fd_flag: &'static str,
}
66
/// Where an auxiliary JSON payload is delivered.
///
/// `build_run_json_sink` selects `File` when a path was supplied, then
/// `Fd` when a descriptor was supplied, and `Stderr` otherwise.
#[derive(Clone, Debug)]
pub enum RunJsonSinkTarget {
    /// Append the summary to the captured stderr buffer so it remains
    /// terminal after all diagnostics that `run_file_with_skill_dirs`
    /// flushes on return.
    Stderr,
    /// Path supplied through the matching `*_file` argument.
    File(PathBuf),
    /// Descriptor number supplied through the matching `*_fd` argument.
    Fd(i32),
}
76
/// Serialized shape of the post-run summary record.
#[derive(Serialize)]
struct RunSummary<'a> {
    schema_version: u32,
    event: &'static str,
    wall_time_ms: u64,
    exit_code: i32,
    llm: RunSummaryLlm,
    // Omitted from the serialized output entirely when no profile exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    profile: Option<&'a harn_vm::profile::RunProfile>,
}
87
/// LLM usage totals nested under the `llm` key of [`RunSummary`].
// NOTE(review): units follow the field names (milliseconds, US dollars);
// the code that fills these in is outside this view.
#[derive(Serialize)]
struct RunSummaryLlm {
    call_count: i64,
    input_tokens: i64,
    output_tokens: i64,
    time_ms: i64,
    cost_usd: f64,
}
96
// Schema versions for the three auxiliary JSON records.
// NOTE(review): presumably written to the `schema_version` field of the
// matching record struct — the emit sites are outside this view.

/// Schema version of the run summary record.
pub const RUN_SUMMARY_SCHEMA_VERSION: u32 = 1;
/// Schema version of the run phase record.
pub const RUN_PHASE_SCHEMA_VERSION: u32 = 1;
/// Schema version of the run rusage record.
pub const RUN_RUSAGE_SCHEMA_VERSION: u32 = 1;
100
/// Serialized shape of the phase record: a list of [`PhaseRecord`]s
/// alongside the schema version and event name.
#[derive(Serialize)]
struct RunPhaseEvent {
    schema_version: u32,
    event: &'static str,
    phases: Vec<PhaseRecord>,
}
107
/// Serialized shape of the resource-usage record.
#[derive(Serialize)]
struct RunRusageEvent {
    schema_version: u32,
    event: &'static str,
    // CPU time in milliseconds, going by the field name; the measurement
    // site is outside this view.
    cpu_ms: u64,
}
114
115pub(crate) fn run_summary_options_from_args(
116    args: &crate::cli::RunArgs,
117) -> Option<RunSummaryOptions> {
118    args.emit_summary_json.then(|| RunSummaryOptions {
119        sink: build_run_json_sink(args.summary_file.clone(), args.summary_fd, "--summary-fd"),
120    })
121}
122
123pub(crate) fn run_aux_options_from_args(args: &crate::cli::RunArgs) -> RunAuxOptions {
124    RunAuxOptions {
125        summary: run_summary_options_from_args(args),
126        phase: run_phase_options_from_args(args),
127        rusage: run_rusage_options_from_args(args),
128    }
129}
130
131pub(crate) fn run_phase_options_from_args(args: &crate::cli::RunArgs) -> Option<RunPhaseOptions> {
132    args.emit_phase_json.then(|| RunPhaseOptions {
133        sink: build_run_json_sink(args.phase_file.clone(), args.phase_fd, "--phase-fd"),
134    })
135}
136
137pub(crate) fn run_rusage_options_from_args(args: &crate::cli::RunArgs) -> Option<RunRusageOptions> {
138    args.emit_rusage_json.then(|| RunRusageOptions {
139        sink: build_run_json_sink(args.rusage_file.clone(), args.rusage_fd, "--rusage-fd"),
140    })
141}
142
143fn build_run_json_sink(
144    file: Option<PathBuf>,
145    fd: Option<i32>,
146    fd_flag: &'static str,
147) -> RunJsonSink {
148    RunJsonSink {
149        target: if let Some(path) = file {
150            RunJsonSinkTarget::File(path)
151        } else if let Some(fd) = fd {
152            RunJsonSinkTarget::Fd(fd)
153        } else {
154            RunJsonSinkTarget::Stderr
155        },
156        fd_flag,
157    }
158}
159
/// Transport choice for serving a file over MCP.
// NOTE(review): the consumer of this enum is outside this view; the
// variant descriptions below follow the variant and field type names only.
pub(crate) enum RunFileMcpServeMode {
    /// Serve over stdio; carries no extra configuration.
    Stdio,
    /// Serve over HTTP with the given serve options and auth policy.
    Http {
        options: harn_serve::McpHttpServeOptions,
        auth_policy: harn_serve::AuthPolicy,
    },
}
167
/// Core builtins that are never denied, even when using `--allow`.
///
/// `build_denied_builtins` consults this list when expanding an `--allow`
/// list into the set of denied names. It is not consulted for `--deny`.
const CORE_BUILTINS: &[&str] = &[
    "println",
    "print",
    "log",
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    "assert",
    "assert_eq",
    "assert_ne",
    "json_parse",
    "json_stringify",
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
190
191/// Build the set of denied builtin names from `--deny` or `--allow` flags.
192///
193/// - `--deny a,b,c` denies exactly those names.
194/// - `--allow a,b,c` denies everything *except* the listed names and the core builtins.
195pub(crate) fn build_denied_builtins(
196    deny_csv: Option<&str>,
197    allow_csv: Option<&str>,
198) -> HashSet<String> {
199    if let Some(csv) = deny_csv {
200        csv.split(',')
201            .map(|s| s.trim().to_string())
202            .filter(|s| !s.is_empty())
203            .collect()
204    } else if let Some(csv) = allow_csv {
205        // With --allow, we mark every registered stdlib builtin as denied
206        // *except* those in the allow list and the core builtins.
207        let allowed: HashSet<String> = csv
208            .split(',')
209            .map(|s| s.trim().to_string())
210            .filter(|s| !s.is_empty())
211            .collect();
212        let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
213
214        // Create a temporary VM with stdlib registered to enumerate all builtin names.
215        let mut tmp = harn_vm::Vm::new();
216        harn_vm::register_vm_stdlib(&mut tmp);
217        harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
218        harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
219
220        tmp.builtin_names()
221            .into_iter()
222            .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
223            .collect()
224    } else {
225        HashSet::new()
226    }
227}
228
/// Result of [`compile_or_load_chunk_for_run`]. Failures propagate as
/// diagnostic text on the run path so callers map them straight to a
/// non-zero exit code without bespoke error types.
pub(crate) struct LoadedChunk {
    /// Text of the entry file as read from disk.
    pub(crate) source: String,
    /// Runnable chunk, either freshly compiled or taken from the bytecode
    /// cache.
    pub(crate) chunk: harn_vm::Chunk,
}
236
237/// Load the entry pipeline as a runnable [`harn_vm::Chunk`], using the
238/// content-addressed bytecode cache when its key matches. On a cache miss
239/// we read, parse, type-check, and compile, then persist the chunk.
240/// On a hit we skip parse/typecheck/compile entirely — the cache invariant
241/// is that a stored chunk passed those phases on the writer's harn build,
242/// and the key includes every transitively-imported user file so any
243/// change re-runs the full path.
244///
245/// `stderr` receives any diagnostic output. Returns `None` when a fatal
246/// type or compile error blocks execution; the caller maps that to
247/// exit-code 1.
248pub(crate) fn compile_or_load_chunk_for_run(
249    path: &str,
250    stderr: &mut String,
251) -> Option<LoadedChunk> {
252    compile_or_load_chunk_with_timing(path, stderr, None)
253}
254
255/// Like [`compile_or_load_chunk_for_run`] but lets the caller observe
256/// per-phase wall-clock timings (parse, typecheck, bytecode compile +
257/// cache hit/miss). Used by `harn time run` to drive the same code
258/// path as `harn run` while reporting phase-level timing.
259//
260// The `as_deref_mut` calls reborrow the inner `&mut RunTiming` so each
261// phase can mutate it independently. Clippy's `needless_option_as_deref`
262// is correct that the surface types match — that's exactly the
263// reborrow we want.
264#[allow(clippy::needless_option_as_deref)]
265pub(crate) fn compile_or_load_chunk_with_timing(
266    path: &str,
267    stderr: &mut String,
268    mut timing: Option<&mut RunTiming>,
269) -> Option<LoadedChunk> {
270    let source = match fs::read_to_string(path) {
271        Ok(s) => s,
272        Err(e) => {
273            stderr.push_str(&format!("Error reading {path}: {e}\n"));
274            return None;
275        }
276    };
277    if let Some(t) = timing.as_deref_mut() {
278        t.input_bytes = source.len() as u64;
279    }
280
281    let compile_phase_start = Instant::now();
282    let lookup = harn_vm::bytecode_cache::load(Path::new(path), &source);
283    if let Some(chunk) = lookup.chunk {
284        if let Some(t) = timing.as_deref_mut() {
285            t.cache_hit = true;
286            t.bytecode_compile = compile_phase_start.elapsed();
287        }
288        return Some(LoadedChunk { source, chunk });
289    }
290    if let Some(t) = timing.as_deref_mut() {
291        t.cache_hit = false;
292    }
293
294    let parse_start = Instant::now();
295    let program = parse_source_for_run(path, &source, stderr)?;
296    if let Some(t) = timing.as_deref_mut() {
297        t.parse = parse_start.elapsed();
298    }
299
300    let typecheck_start = Instant::now();
301    let mut had_type_error = false;
302    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
303    for diag in &type_diagnostics {
304        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
305        if matches!(diag.severity, DiagnosticSeverity::Error) {
306            had_type_error = true;
307        }
308        stderr.push_str(&rendered);
309    }
310    if let Some(t) = timing.as_deref_mut() {
311        t.typecheck = typecheck_start.elapsed();
312    }
313    if had_type_error {
314        return None;
315    }
316
317    let compile_step_start = Instant::now();
318    let chunk = match harn_vm::Compiler::new().compile(&program) {
319        Ok(c) => c,
320        Err(e) => {
321            stderr.push_str(&format!("error: compile error: {e}\n"));
322            return None;
323        }
324    };
325
326    // Cache misses are best-effort — read-only homedirs, full disks, and
327    // sandboxes are common in CI environments. Surface the failure as a
328    // single-line warning when explicitly requested via the audit hook;
329    // otherwise stay quiet to avoid bloating happy-path output.
330    if let Err(err) = harn_vm::bytecode_cache::store(&lookup.key, &chunk) {
331        if std::env::var_os("HARN_BYTECODE_CACHE_DEBUG").is_some() {
332            eprintln!("[harn] bytecode cache write skipped: {err}");
333        }
334    }
335    if let Some(t) = timing.as_deref_mut() {
336        t.bytecode_compile = compile_step_start.elapsed();
337    }
338
339    Some(LoadedChunk { source, chunk })
340}
341
342fn parse_source_for_run(
343    path: &str,
344    source: &str,
345    stderr: &mut String,
346) -> Option<Vec<harn_parser::SNode>> {
347    let mut lexer = harn_lexer::Lexer::new(source);
348    let tokens = match lexer.tokenize() {
349        Ok(tokens) => tokens,
350        Err(error) => {
351            let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
352                source,
353                path,
354                &error_span_from_lex(&error),
355                "error",
356                harn_parser::diagnostic::lexer_error_code(&error),
357                &error.to_string(),
358                Some("here"),
359                None,
360            );
361            stderr.push_str(&diagnostic);
362            return None;
363        }
364    };
365
366    let mut parser = harn_parser::Parser::new(tokens);
367    match parser.parse() {
368        Ok(program) => Some(program),
369        Err(error) => {
370            if parser.all_errors().is_empty() {
371                render_parse_error(path, source, &error, stderr);
372            } else {
373                for error in parser.all_errors() {
374                    render_parse_error(path, source, error, stderr);
375                }
376            }
377            None
378        }
379    }
380}
381
382fn render_parse_error(
383    path: &str,
384    source: &str,
385    error: &harn_parser::ParserError,
386    stderr: &mut String,
387) {
388    let span = error_span_from_parse(error);
389    let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
390        source,
391        path,
392        &span,
393        "error",
394        harn_parser::diagnostic::parser_error_code(error),
395        &harn_parser::diagnostic::parser_error_message(error),
396        Some(harn_parser::diagnostic::parser_error_label(error)),
397        harn_parser::diagnostic::parser_error_help(error),
398    );
399    stderr.push_str(&diagnostic);
400}
401
402fn error_span_from_lex(error: &harn_lexer::LexerError) -> harn_lexer::Span {
403    match error {
404        harn_lexer::LexerError::UnexpectedCharacter(_, span)
405        | harn_lexer::LexerError::UnterminatedString(span)
406        | harn_lexer::LexerError::UnterminatedBlockComment(span) => *span,
407    }
408}
409
410fn error_span_from_parse(error: &harn_parser::ParserError) -> harn_lexer::Span {
411    match error {
412        harn_parser::ParserError::Unexpected { span, .. } => *span,
413        harn_parser::ParserError::UnexpectedEof { span, .. } => *span,
414    }
415}
416
417/// Run the static type checker against `program` with cross-module
418/// import-aware call resolution when the file's imports all resolve. Used
419/// by `run_file` and the MCP server entry so `harn run` catches undefined
420/// cross-module calls before the VM starts.
421fn typecheck_with_imports(
422    program: &[harn_parser::SNode],
423    path: &Path,
424    source: &str,
425) -> Vec<harn_parser::TypeDiagnostic> {
426    if let Err(error) = package::ensure_dependencies_materialized(path) {
427        eprintln!("error: {error}");
428        process::exit(1);
429    }
430    let graph = harn_modules::build(&[path.to_path_buf()]);
431    let mut checker = harn_parser::TypeChecker::new();
432    if let Some(imported) = graph.imported_names_for_file(path) {
433        checker = checker.with_imported_names(imported);
434    }
435    if let Some(imported) = graph.imported_type_declarations_for_file(path) {
436        checker = checker.with_imported_type_decls(imported);
437    }
438    if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
439        checker = checker.with_imported_callable_decls(imported);
440    }
441    checker.check_with_source(program, source)
442}
443
444/// Build the wrapped source and temp file backing a `harn run -e` invocation.
445///
446/// `import` is a top-level declaration in Harn, so the leading prefix of
447/// import lines (with surrounding blanks/comments) is hoisted out of the
448/// `pipeline main(task) { ... }` wrapper. The temp file is created in the
449/// current working directory so relative imports (`import "./lib"`) and
450/// `harn.toml` discovery resolve against the user's project, not the
451/// system temp dir. If the CWD is unwritable we fall back to the system
452/// temp dir with a stderr warning — pure-expression `-e` still works,
453/// but relative imports will fail to resolve.
454pub(crate) fn prepare_eval_temp_file(
455    code: &str,
456) -> Result<(String, tempfile::NamedTempFile), String> {
457    let (header, body) = split_eval_header(code);
458    let wrapped = if header.is_empty() {
459        format!("pipeline main(task) {{\n{body}\n}}")
460    } else {
461        format!("{header}\npipeline main(task) {{\n{body}\n}}")
462    };
463
464    let tmp = create_eval_temp_file()?;
465    Ok((wrapped, tmp))
466}
467
468/// Try to place the `-e` temp file in the current working directory so
469/// relative imports and `harn.toml` discovery resolve against the user's
470/// project. Fall back to the system temp dir on failure (with a warning),
471/// so pure-expression `-e` keeps working in read-only contexts.
472fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
473    if let Some(dir) = std::env::current_dir().ok().as_deref() {
474        // Hidden prefix on Unix so editors / tree-walkers are less likely
475        // to pick the file up during its short lifetime.
476        match tempfile::Builder::new()
477            .prefix(".harn-eval-")
478            .suffix(".harn")
479            .tempfile_in(dir)
480        {
481            Ok(tmp) => return Ok(tmp),
482            Err(error) => eprintln!(
483                "warning: harn run -e: could not create temp file in {}: {error}; \
484                 relative imports will not resolve",
485                dir.display()
486            ),
487        }
488    }
489    tempfile::Builder::new()
490        .prefix("harn-eval-")
491        .suffix(".harn")
492        .tempfile()
493        .map_err(|e| format!("failed to create temp file for -e: {e}"))
494}
495
/// Split the `-e` input into a header (top-level imports plus the blank
/// and `//` comment lines around them) and a body (everything else, to be
/// wrapped in `pipeline main(task)`).
///
/// A line counts as an import when its first token is the `import`
/// keyword, optionally preceded by `pub` and blanks, and `import` is
/// followed by a blank, `"`, or `{`. That covers `import "./lib"`,
/// `import"./lib"`, `import{a} from "./lib"`, and the same forms after
/// `pub`. Identifiers that merely start with `import` (e.g. `import_all()`)
/// are not imports.
///
/// Scanning stops at the first non-blank, non-comment, non-import line.
/// Blank and comment lines between the last import and that line are
/// hoisted into the header too.
///
/// Returns `(header, body)`, both joined with `\n`. When no import line is
/// found the header is empty and the body is `code` unchanged.
fn split_eval_header(code: &str) -> (String, String) {
    fn is_blank(c: char) -> bool {
        c == ' ' || c == '\t'
    }

    /// Whether `trimmed` (a line with leading whitespace removed) opens an
    /// import declaration.
    fn is_import_line(trimmed: &str) -> bool {
        // `pub` only counts as a visibility prefix when it is a whole word.
        let after_visibility = match trimmed.strip_prefix("pub") {
            Some(rest) if rest.starts_with(is_blank) => rest.trim_start_matches(is_blank),
            _ => trimmed,
        };
        // Require a token boundary after the keyword so identifiers like
        // `import_all` are left alone.
        after_visibility
            .strip_prefix("import")
            .map_or(false, |tail| {
                tail.starts_with(|c: char| is_blank(c) || c == '"' || c == '{')
            })
    }

    let lines: Vec<&str> = code.lines().collect();
    let mut header_end = 0usize;
    let mut saw_import = false;
    for (idx, line) in lines.iter().enumerate() {
        let trimmed = line.trim_start();
        if trimmed.is_empty() || trimmed.starts_with("//") {
            header_end = idx + 1;
        } else if is_import_line(trimmed) {
            header_end = idx + 1;
            saw_import = true;
        } else {
            break;
        }
    }

    // Without an import there is nothing to hoist; leading comments stay
    // with the body and the input is returned unchanged.
    if !saw_import {
        return (String::new(), code.to_string());
    }

    let (header_lines, body_lines) = lines.split_at(header_end);
    (header_lines.join("\n"), body_lines.join("\n"))
}
538
/// How LLM calls are mocked for a CLI run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocking; `install_cli_llm_mock_mode` only clears prior state.
    #[default]
    Off,
    /// Load mocks from the JSONL fixture at `fixture_path` and install
    /// them before the run.
    Replay {
        fixture_path: PathBuf,
    },
    /// Record LLM calls during the run; `persist_cli_llm_mock_recording`
    /// writes them to `fixture_path` afterwards.
    Record {
        fixture_path: PathBuf,
    },
}
550
/// Attestation settings for a run.
// NOTE(review): both fields are consumed outside this view; the field
// descriptions follow the names only — confirm against the consumer.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Presumably the path the receipt is written to, if any.
    pub receipt_out: Option<PathBuf>,
    /// Presumably the agent identity recorded in the receipt, if any.
    pub agent_id: Option<String>,
}
556
/// Opt-in profiling switches.
///
/// `text` prints a categorical breakdown to stderr after execution;
/// `json_path` serializes the same rollup to that path. Setting either
/// one enables span tracing
/// (i.e. `harn_vm::tracing::set_tracing_enabled(true)`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunProfileOptions {
    pub text: bool,
    pub json_path: Option<PathBuf>,
}

impl RunProfileOptions {
    /// True when at least one profiling output was requested.
    pub fn is_enabled(&self) -> bool {
        match self.json_path {
            Some(_) => true,
            None => self.text,
        }
    }
}
572
/// Sandbox settings for a direct `harn run` invocation. The default is
/// enabled with no workspace-root override.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunSandboxOptions {
    /// Install the default `harn run` sandbox for this invocation.
    pub enabled: bool,
    /// Override the workspace root used by the default sandbox. This is
    /// intended for host-generated scripts whose source file lives outside
    /// the workspace they operate on.
    pub workspace_root: Option<PathBuf>,
}

impl Default for RunSandboxOptions {
    fn default() -> Self {
        Self {
            workspace_root: None,
            enabled: true,
        }
    }
}

impl RunSandboxOptions {
    /// Options with the default direct-run sandbox and egress guard
    /// turned off.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Self::default()
        }
    }

    /// Builder-style setter that pins the default sandbox to an explicit
    /// workspace root; every other setting is kept.
    pub fn with_workspace_root(self, workspace_root: impl Into<PathBuf>) -> Self {
        Self {
            workspace_root: Some(workspace_root.into()),
            ..self
        }
    }
}
607
/// Shared handles for observing an interrupt during a run.
#[derive(Clone)]
pub struct RunInterruptTokens {
    /// Cancellation flag. `run_file_with_skill_dirs` reads it after the
    /// run and reports exit code 124 when it is set and the run failed.
    pub cancel_token: Arc<AtomicBool>,
    // NOTE(review): presumably the name of the signal that was received,
    // if any — the writer is outside this view; confirm.
    pub signal_token: Arc<Mutex<Option<String>>>,
}
613
/// Argument bundle handed to `execute_run_inner`.
///
/// `run_file_with_skill_dirs` fills it from its own parameters, attaches
/// the interrupt tokens, and pairs the JSON options with a stdout writer.
struct ExecuteRunInputs<'a> {
    path: &'a str,
    trace: bool,
    denied_builtins: HashSet<String>,
    script_argv: Vec<String>,
    skill_dirs_raw: Vec<String>,
    llm_mock_mode: CliLlmMockMode,
    attestation: Option<RunAttestationOptions>,
    profile: RunProfileOptions,
    sandbox: RunSandboxOptions,
    interrupt_tokens: Option<RunInterruptTokens>,
    // JSON event-stream options together with the writer the events go to.
    json: Option<(RunJsonOptions, Box<dyn io::Write + Send>)>,
    aux: RunAuxOptions,
    // Optional per-phase timing observer; `None` on the plain `harn run` path.
    timing: Option<&'a mut RunTiming>,
    harnpack: HarnpackRunOptions,
}
630
/// Captured outcome of an in-process `execute_run` invocation. Tests use this
/// instead of spawning the `harn` binary; the binary entry point translates
/// it into real stdout/stderr writes + `process::exit`.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Captured standard output text.
    pub stdout: String,
    /// Captured diagnostics / standard error text.
    pub stderr: String,
    /// Process exit code the run maps to; `0` means success.
    pub exit_code: i32,
}
640
641pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
642    harn_vm::llm::clear_cli_llm_mock_mode();
643    match mode {
644        CliLlmMockMode::Off => Ok(()),
645        CliLlmMockMode::Replay { fixture_path } => {
646            let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
647            harn_vm::llm::install_cli_llm_mocks(mocks);
648            Ok(())
649        }
650        CliLlmMockMode::Record { .. } => {
651            harn_vm::llm::enable_cli_llm_mock_recording();
652            Ok(())
653        }
654    }
655}
656
657pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
658    let CliLlmMockMode::Record { fixture_path } = mode else {
659        return Ok(());
660    };
661    if let Some(parent) = fixture_path.parent() {
662        if !parent.as_os_str().is_empty() {
663            fs::create_dir_all(parent).map_err(|error| {
664                format!(
665                    "failed to create fixture directory {}: {error}",
666                    parent.display()
667                )
668            })?;
669        }
670    }
671
672    let lines = harn_vm::llm::take_cli_llm_recordings()
673        .into_iter()
674        .map(harn_vm::llm::serialize_llm_mock)
675        .collect::<Result<Vec<_>, _>>()?;
676    let body = if lines.is_empty() {
677        String::new()
678    } else {
679        format!("{}\n", lines.join("\n"))
680    };
681    fs::write(fixture_path, body)
682        .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
683}
684
685pub(crate) async fn run_file(
686    path: &str,
687    trace: bool,
688    denied_builtins: HashSet<String>,
689    script_argv: Vec<String>,
690    llm_mock_mode: CliLlmMockMode,
691    attestation: Option<RunAttestationOptions>,
692    profile: RunProfileOptions,
693) {
694    run_file_with_skill_dirs(
695        path,
696        trace,
697        denied_builtins,
698        script_argv,
699        Vec::new(),
700        llm_mock_mode,
701        attestation,
702        profile,
703        RunSandboxOptions::default(),
704        None,
705        RunAuxOptions::default(),
706        HarnpackRunOptions::default(),
707    )
708    .await;
709}
710
711pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
712    let outcome = execute_explain_cost(path);
713    if !outcome.stderr.is_empty() {
714        io::stderr().write_all(outcome.stderr.as_bytes()).ok();
715    }
716    if !outcome.stdout.is_empty() {
717        io::stdout().write_all(outcome.stdout.as_bytes()).ok();
718    }
719    if outcome.exit_code != 0 {
720        process::exit(outcome.exit_code);
721    }
722}
723
724#[allow(clippy::too_many_arguments)]
725pub(crate) async fn run_file_with_skill_dirs(
726    path: &str,
727    trace: bool,
728    denied_builtins: HashSet<String>,
729    script_argv: Vec<String>,
730    skill_dirs_raw: Vec<String>,
731    llm_mock_mode: CliLlmMockMode,
732    attestation: Option<RunAttestationOptions>,
733    profile: RunProfileOptions,
734    sandbox: RunSandboxOptions,
735    json: Option<RunJsonOptions>,
736    aux: RunAuxOptions,
737    harnpack: HarnpackRunOptions,
738) {
739    // Graceful shutdown: flush run records before exit on SIGINT/SIGTERM.
740    let interrupt_tokens = install_signal_shutdown_handler();
741
742    let _stdout_passthrough = StdoutPassthroughGuard::enable();
743    let json_with_stdout =
744        json.map(|opts| (opts, Box::new(io::stdout()) as Box<dyn io::Write + Send>));
745    let outcome = execute_run_inner(ExecuteRunInputs {
746        path,
747        trace,
748        denied_builtins,
749        script_argv,
750        skill_dirs_raw,
751        llm_mock_mode,
752        attestation,
753        profile,
754        sandbox,
755        interrupt_tokens: Some(interrupt_tokens.clone()),
756        json: json_with_stdout,
757        aux,
758        timing: None,
759        harnpack,
760    })
761    .await;
762
763    // `harn run` streams normal program stdout during execution. Any stdout
764    // left here came from older capture paths, so flush it after diagnostics.
765    if !outcome.stderr.is_empty() {
766        io::stderr().write_all(outcome.stderr.as_bytes()).ok();
767    }
768    if !outcome.stdout.is_empty() {
769        io::stdout().write_all(outcome.stdout.as_bytes()).ok();
770    }
771
772    let mut exit_code = outcome.exit_code;
773    if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
774        exit_code = 124;
775    }
776    if exit_code != 0 {
777        process::exit(exit_code);
778    }
779}
780
781#[allow(clippy::too_many_arguments)]
782pub(crate) async fn run_resume_with_skill_dirs(
783    target: &str,
784    trace: bool,
785    denied_builtins: HashSet<String>,
786    resume_argv: Vec<String>,
787    skill_dirs_raw: Vec<String>,
788    llm_mock_mode: CliLlmMockMode,
789    attestation: Option<RunAttestationOptions>,
790    profile: RunProfileOptions,
791    sandbox: RunSandboxOptions,
792    json: Option<RunJsonOptions>,
793    aux: RunAuxOptions,
794) {
795    let source = r#"import { resume_agent, wait_agent } from "std/agent/workers"
796
797pipeline main(task) {
798  let input = if len(argv) > 1 {
799    argv[1]
800  } else {
801    nil
802  }
803  let handle = resume_agent(argv[0], input, true)
804  return wait_agent(handle)
805}
806"#;
807    let tmp = create_eval_temp_file().unwrap_or_else(|e| {
808        eprintln!("error: {e}");
809        process::exit(1);
810    });
811    let tmp_path = tmp.path().to_path_buf();
812    if let Err(error) = fs::write(&tmp_path, source) {
813        eprintln!("error: failed to write temp file for --resume: {error}");
814        process::exit(1);
815    }
816    let mut argv = Vec::with_capacity(resume_argv.len() + 1);
817    argv.push(target.to_string());
818    argv.extend(resume_argv);
819    let tmp_str = tmp_path.to_string_lossy().into_owned();
820    run_file_with_skill_dirs(
821        &tmp_str,
822        trace,
823        denied_builtins,
824        argv,
825        skill_dirs_raw,
826        llm_mock_mode,
827        attestation,
828        profile,
829        sandbox,
830        json,
831        aux,
832        HarnpackRunOptions::default(),
833    )
834    .await;
835}
836
837pub fn execute_explain_cost(path: &str) -> RunOutcome {
838    let stdout = String::new();
839    let mut stderr = String::new();
840
841    let (source, program) = parse_source_file(path);
842
843    let mut had_type_error = false;
844    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
845    for diag in &type_diagnostics {
846        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
847        if matches!(diag.severity, DiagnosticSeverity::Error) {
848            had_type_error = true;
849        }
850        stderr.push_str(&rendered);
851    }
852    if had_type_error {
853        return RunOutcome {
854            stdout,
855            stderr,
856            exit_code: 1,
857        };
858    }
859
860    let extensions = package::load_runtime_extensions(Path::new(path));
861    package::install_runtime_extensions(&extensions);
862    RunOutcome {
863        stdout: explain_cost::render_explain_cost(path, &program),
864        stderr,
865        exit_code: 0,
866    }
867}
868
869pub(crate) struct StdoutPassthroughGuard {
870    previous: bool,
871}
872
873impl StdoutPassthroughGuard {
874    pub(crate) fn enable() -> Self {
875        Self {
876            previous: harn_vm::set_stdout_passthrough(true),
877        }
878    }
879}
880
881impl Drop for StdoutPassthroughGuard {
882    fn drop(&mut self) {
883        harn_vm::set_stdout_passthrough(self.previous);
884    }
885}
886
/// Scope guard paired with a `push_execution_policy` call: dropping it
/// pops the execution policy again. `install_run_sandbox_scope` creates
/// one only when it pushed a policy itself.
struct ExecutionPolicyGuard;

impl Drop for ExecutionPolicyGuard {
    fn drop(&mut self) {
        harn_vm::orchestration::pop_execution_policy();
    }
}
894
/// Holds the guards installed for a sandboxed run. The fields are never
/// read; they exist so the guards are dropped together with the scope.
struct RunSandboxScope {
    // Present only when `install_run_sandbox_scope` pushed the default
    // execution policy.
    _execution_policy: Option<ExecutionPolicyGuard>,
    // Present whenever the sandbox is enabled.
    _egress_policy: Option<harn_vm::egress::ExplicitEgressPolicyGuard>,
}

impl RunSandboxScope {
    /// Scope that holds no guards, used when the sandbox is turned off.
    fn disabled() -> Self {
        Self {
            _execution_policy: None,
            _egress_policy: None,
        }
    }
}
908
909fn install_run_sandbox_scope(
910    options: &RunSandboxOptions,
911    workspace_root: &Path,
912    stderr: &mut String,
913) -> RunSandboxScope {
914    if !options.enabled {
915        stderr.push_str(
916            "warning: harn run --no-sandbox disables filesystem, process, and egress sandbox defaults\n",
917        );
918        return RunSandboxScope::disabled();
919    }
920
921    let execution_policy = if harn_vm::orchestration::current_execution_policy().is_none() {
922        harn_vm::orchestration::push_execution_policy(default_run_capability_policy(
923            workspace_root,
924        ));
925        Some(ExecutionPolicyGuard)
926    } else {
927        None
928    };
929    let egress_policy = Some(harn_vm::egress::require_explicit_egress_policy_for_host());
930
931    RunSandboxScope {
932        _execution_policy: execution_policy,
933        _egress_policy: egress_policy,
934    }
935}
936
937fn default_run_capability_policy(
938    workspace_root: &Path,
939) -> harn_vm::orchestration::CapabilityPolicy {
940    harn_vm::orchestration::CapabilityPolicy {
941        workspace_roots: vec![normalize_run_workspace_root(workspace_root)
942            .display()
943            .to_string()],
944        side_effect_level: Some("process_exec".to_string()),
945        sandbox_profile: harn_vm::orchestration::SandboxProfile::Worktree,
946        ..harn_vm::orchestration::CapabilityPolicy::default()
947    }
948}
949
/// Returns `path` as an absolute path.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto
/// the current working directory; if the working directory cannot be read
/// the relative path is returned as-is. No `.`/`..` resolution or symlink
/// canonicalization is performed.
fn normalize_run_workspace_root(path: &Path) -> PathBuf {
    if path.is_relative() {
        if let Ok(cwd) = std::env::current_dir() {
            return cwd.join(path);
        }
    }
    path.to_path_buf()
}
958
/// Picks the workspace root used when the caller did not supply one.
///
/// Preference order: `project_root` when present, then the current working
/// directory, then `source_parent` if the working directory cannot be read.
fn default_run_workspace_root(project_root: Option<&Path>, source_parent: &Path) -> PathBuf {
    if let Some(root) = project_root {
        return root.to_path_buf();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => source_parent.to_path_buf(),
    }
}
965
966fn run_sandbox_attestation(sandbox: &RunSandboxOptions) -> serde_json::Value {
967    let active_policy = harn_vm::orchestration::current_execution_policy();
968    let active = active_policy.is_some();
969    let workspace_roots = active_policy
970        .as_ref()
971        .map(|policy| policy.workspace_roots.clone())
972        .unwrap_or_default();
973    let profile = active_policy
974        .as_ref()
975        .map(|policy| policy.sandbox_profile.as_str())
976        .unwrap_or("unrestricted");
977    let egress = if sandbox.enabled {
978        "explicit_policy_required"
979    } else if active {
980        "host_policy"
981    } else {
982        "unrestricted"
983    };
984
985    serde_json::json!({
986        "run_default_enabled": sandbox.enabled,
987        "active": active,
988        "workspace_roots": workspace_roots,
989        "profile": profile,
990        "egress": egress,
991    })
992}
993
// Message printed on the first Ctrl-C / termination signal. It tells the
// operator that a short pause is expected: the VM rewinds the active
// instruction, drops in-flight async ops (for example a hanging Ollama
// request) and unwinds frames before the runtime exits. Without that hint
// people reflexively hit Ctrl-C a second time and force-kill the process.
// The "Ctrl-C again to force-exit" wording is load-bearing — earlier harn
// releases showed operators routinely double-tapping the shortcut and
// losing the chance to inspect the error trace.
const FIRST_SIGNAL_MESSAGE: &str = "[harn] signal received, interrupting VM \
    (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
1004
1005fn install_signal_shutdown_handler() -> RunInterruptTokens {
1006    let tokens = RunInterruptTokens {
1007        cancel_token: Arc::new(AtomicBool::new(false)),
1008        signal_token: Arc::new(Mutex::new(None)),
1009    };
1010    let tokens_clone = tokens.clone();
1011    tokio::spawn(async move {
1012        #[cfg(unix)]
1013        {
1014            use tokio::signal::unix::{signal, SignalKind};
1015            // Containers without controlling-tty access can refuse signal()
1016            // registration. In that case, degrade to no-signal mode rather
1017            // than crashing the `harn run` script — the script still runs,
1018            // it just can't be interrupted cleanly.
1019            let sigterm_stream = signal(SignalKind::terminate()).ok();
1020            let sigint_stream = signal(SignalKind::interrupt()).ok();
1021            let sighup_stream = signal(SignalKind::hangup()).ok();
1022            let (mut sigterm, mut sigint, mut sighup) =
1023                match (sigterm_stream, sigint_stream, sighup_stream) {
1024                    (Some(t), Some(i), Some(h)) => (t, i, h),
1025                    _ => {
1026                        eprintln!(
1027                            "[harn] signal handlers unavailable in this environment; \
1028                             continuing without graceful-shutdown interception"
1029                        );
1030                        return;
1031                    }
1032                };
1033            let mut seen_signal = false;
1034            loop {
1035                let signal_name = tokio::select! {
1036                    _ = sigterm.recv() => "SIGTERM",
1037                    _ = sigint.recv() => "SIGINT",
1038                    _ = sighup.recv() => "SIGHUP",
1039                };
1040                if seen_signal {
1041                    eprintln!("[harn] second signal received, terminating");
1042                    process::exit(124);
1043                }
1044                seen_signal = true;
1045                request_vm_interrupt(&tokens_clone, signal_name);
1046                eprintln!("{FIRST_SIGNAL_MESSAGE}");
1047            }
1048        }
1049        #[cfg(not(unix))]
1050        {
1051            let mut seen_signal = false;
1052            loop {
1053                let _ = tokio::signal::ctrl_c().await;
1054                if seen_signal {
1055                    eprintln!("[harn] second signal received, terminating");
1056                    process::exit(124);
1057                }
1058                seen_signal = true;
1059                request_vm_interrupt(&tokens_clone, "SIGINT");
1060                eprintln!("{FIRST_SIGNAL_MESSAGE}");
1061            }
1062        }
1063    });
1064    tokens
1065}
1066
1067fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
1068    if let Ok(mut signal) = tokens.signal_token.lock() {
1069        *signal = Some(signal_name.to_string());
1070    }
1071    tokens.cancel_token.store(true, Ordering::SeqCst);
1072}
1073
1074/// In-process equivalent of `run_file_with_skill_dirs`. Returns the captured
1075/// stdout, stderr, and what exit code the binary entry would have used,
1076/// instead of writing to real stdout/stderr or calling `process::exit`.
1077///
1078/// Tests should call this directly. The `harn run` binary path wraps it.
1079pub async fn execute_run(
1080    path: &str,
1081    trace: bool,
1082    denied_builtins: HashSet<String>,
1083    script_argv: Vec<String>,
1084    skill_dirs_raw: Vec<String>,
1085    llm_mock_mode: CliLlmMockMode,
1086    attestation: Option<RunAttestationOptions>,
1087    profile: RunProfileOptions,
1088) -> RunOutcome {
1089    execute_run_with_harnpack_and_sandbox_options(
1090        path,
1091        trace,
1092        denied_builtins,
1093        script_argv,
1094        skill_dirs_raw,
1095        llm_mock_mode,
1096        attestation,
1097        profile,
1098        RunSandboxOptions::default(),
1099        HarnpackRunOptions::default(),
1100    )
1101    .await
1102}
1103
1104/// [`execute_run`] with an explicit sandbox policy override for in-process
1105/// callers whose source path is intentionally outside the workspace they
1106/// operate on.
1107#[allow(clippy::too_many_arguments)]
1108pub async fn execute_run_with_sandbox_options(
1109    path: &str,
1110    trace: bool,
1111    denied_builtins: HashSet<String>,
1112    script_argv: Vec<String>,
1113    skill_dirs_raw: Vec<String>,
1114    llm_mock_mode: CliLlmMockMode,
1115    attestation: Option<RunAttestationOptions>,
1116    profile: RunProfileOptions,
1117    sandbox: RunSandboxOptions,
1118) -> RunOutcome {
1119    execute_run_with_harnpack_and_sandbox_options(
1120        path,
1121        trace,
1122        denied_builtins,
1123        script_argv,
1124        skill_dirs_raw,
1125        llm_mock_mode,
1126        attestation,
1127        profile,
1128        sandbox,
1129        HarnpackRunOptions::default(),
1130    )
1131    .await
1132}
1133
1134/// [`execute_run`] for callers that want to opt-in to the `.harnpack`
1135/// verify-replay-execute path. Used by `harn run <bundle.harnpack>`
1136/// integration tests and by the binary entry once it has parsed the
1137/// `--allow-unsigned` / `--dry-run-verify` flags.
1138#[allow(clippy::too_many_arguments)]
1139pub async fn execute_run_with_harnpack_options(
1140    path: &str,
1141    trace: bool,
1142    denied_builtins: HashSet<String>,
1143    script_argv: Vec<String>,
1144    skill_dirs_raw: Vec<String>,
1145    llm_mock_mode: CliLlmMockMode,
1146    attestation: Option<RunAttestationOptions>,
1147    profile: RunProfileOptions,
1148    harnpack: HarnpackRunOptions,
1149) -> RunOutcome {
1150    execute_run_with_harnpack_and_sandbox_options(
1151        path,
1152        trace,
1153        denied_builtins,
1154        script_argv,
1155        skill_dirs_raw,
1156        llm_mock_mode,
1157        attestation,
1158        profile,
1159        RunSandboxOptions::default(),
1160        harnpack,
1161    )
1162    .await
1163}
1164
1165#[allow(clippy::too_many_arguments)]
1166async fn execute_run_with_harnpack_and_sandbox_options(
1167    path: &str,
1168    trace: bool,
1169    denied_builtins: HashSet<String>,
1170    script_argv: Vec<String>,
1171    skill_dirs_raw: Vec<String>,
1172    llm_mock_mode: CliLlmMockMode,
1173    attestation: Option<RunAttestationOptions>,
1174    profile: RunProfileOptions,
1175    sandbox: RunSandboxOptions,
1176    harnpack: HarnpackRunOptions,
1177) -> RunOutcome {
1178    execute_run_inner(ExecuteRunInputs {
1179        path,
1180        trace,
1181        denied_builtins,
1182        script_argv,
1183        skill_dirs_raw,
1184        llm_mock_mode,
1185        attestation,
1186        profile,
1187        sandbox,
1188        interrupt_tokens: None,
1189        json: None,
1190        aux: RunAuxOptions::default(),
1191        timing: None,
1192        harnpack,
1193    })
1194    .await
1195}
1196
1197/// `execute_run` variant for `--json` mode. Returns once the run is
1198/// complete; the NDJSON event stream — including the terminal `result`
1199/// or `error` event — has already been written to `out` and flushed.
1200/// `out` must be `Send` because the run-event sink may be called from
1201/// any worker thread the VM spawns.
1202#[allow(clippy::too_many_arguments)]
1203pub async fn execute_run_json(
1204    path: &str,
1205    trace: bool,
1206    denied_builtins: HashSet<String>,
1207    script_argv: Vec<String>,
1208    skill_dirs_raw: Vec<String>,
1209    llm_mock_mode: CliLlmMockMode,
1210    attestation: Option<RunAttestationOptions>,
1211    profile: RunProfileOptions,
1212    out: Box<dyn io::Write + Send>,
1213    options: RunJsonOptions,
1214) -> RunOutcome {
1215    execute_run_inner(ExecuteRunInputs {
1216        path,
1217        trace,
1218        denied_builtins,
1219        script_argv,
1220        skill_dirs_raw,
1221        llm_mock_mode,
1222        attestation,
1223        profile,
1224        sandbox: RunSandboxOptions::default(),
1225        interrupt_tokens: None,
1226        json: Some((options, out)),
1227        aux: RunAuxOptions::default(),
1228        timing: None,
1229        harnpack: HarnpackRunOptions::default(),
1230    })
1231    .await
1232}
1233
1234/// Run a `.harn` file with the default builtin/argv set and record
1235/// phase timings into `timing`. Used by `harn time run` so the
1236/// instrumented run shares the exact code path as plain `harn run`.
1237pub(crate) async fn execute_run_with_timing(
1238    path: &str,
1239    script_argv: Vec<String>,
1240    timing: Option<&mut RunTiming>,
1241    sandbox: RunSandboxOptions,
1242) -> RunOutcome {
1243    execute_run_inner(ExecuteRunInputs {
1244        path,
1245        trace: false,
1246        denied_builtins: HashSet::new(),
1247        script_argv,
1248        skill_dirs_raw: Vec::new(),
1249        llm_mock_mode: CliLlmMockMode::Off,
1250        attestation: None,
1251        profile: RunProfileOptions::default(),
1252        sandbox,
1253        interrupt_tokens: None,
1254        json: None,
1255        aux: RunAuxOptions::default(),
1256        timing,
1257        harnpack: HarnpackRunOptions::default(),
1258    })
1259    .await
1260}
1261
/// Shared implementation behind every `execute_run*` entry point.
///
/// In order, this: optionally installs the `--json` session, runs the
/// `.harnpack` preflight when `path` looks like a bundle, compiles or loads
/// the chunk, configures a fresh VM (stdlib, hostlib, sandbox scope, store /
/// metadata / checkpoint builtins, skills, `argv`, harness, runtime
/// extensions, manifest triggers and hooks), executes the chunk inside a
/// `LocalSet`, and finally emits attestation, profile and auxiliary JSON
/// output before building the [`RunOutcome`].
///
/// Failures are not propagated as `Result`s: each one is turned into a
/// `RunOutcome` via `finalize_run_error` / `finalize_harnpack_error`,
/// labelled with a short string such as `"compile_error"` or
/// `"manifest_hooks"`.
// See [`compile_or_load_chunk_with_timing`] for why `as_deref_mut` is
// the intentional reborrow pattern here.
#[allow(clippy::needless_option_as_deref)]
async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
    let ExecuteRunInputs {
        path,
        trace,
        denied_builtins,
        script_argv,
        skill_dirs_raw,
        llm_mock_mode,
        attestation,
        profile,
        sandbox,
        interrupt_tokens,
        json,
        aux,
        timing,
        harnpack,
    } = inputs;
    let RunAuxOptions {
        summary,
        phase,
        rusage,
    } = aux;
    let run_started = Instant::now();
    // CPU time is only sampled when rusage output was requested.
    let cpu_started_ms = rusage.as_ref().map(|_| time::cpu_ms());
    // When the caller passed no `RunTiming` but phase or rusage output is
    // requested, record timings into a locally owned one instead.
    let mut owned_timing = if timing.is_none() && (phase.is_some() || rusage.is_some()) {
        Some(RunTiming::default())
    } else {
        None
    };
    let mut timing = timing.or(owned_timing.as_mut());

    // `--json` installs an in-process sink that diverts every
    // observable VM event (stdout, stderr, transcript, tool, hook,
    // persona) into a single NDJSON stream on `out`. The sink stays
    // active until we drop the guard below — fatal errors emit a
    // terminal `error` event on the same stream before bailing.
    let json_session = json.map(|(options, out)| JsonRunSession::install(options, out));

    let mut stderr = String::new();
    let mut stdout = String::new();

    // `.harnpack` preflight: verify signature + replay archive into the
    // content-addressed cache before we touch the chunk loader. The
    // outcome path (entrypoint inside the unpacked tree) replaces the
    // CLI-supplied `path` for everything below.
    let owned_run_path: String;
    let resolved_path: &str = if harnpack::looks_like_harnpack(Path::new(path)) {
        let outcome = match harnpack::prepare_harnpack(Path::new(path), &harnpack, &mut stderr) {
            Ok(prepared) => prepared,
            Err(err) => {
                return finalize_harnpack_error(
                    stderr,
                    json_session,
                    summary.as_ref(),
                    phase.as_ref(),
                    rusage.as_ref(),
                    run_started,
                    err,
                );
            }
        };
        harn_vm::run_events::emit(harn_vm::run_events::RunEvent::PackRun {
            bundle_hash: outcome.bundle_hash.clone(),
            signature_verified: outcome.signature_verified,
            key_id: outcome.key_id.clone(),
            cache_hit: outcome.cache_hit,
            dry_run_verify: harnpack.dry_run_verify,
        });
        // `--dry-run-verify` stops after the `PackRun` event; nothing is
        // compiled or executed.
        if harnpack.dry_run_verify {
            return finalize_harnpack_dry_run(
                stderr,
                json_session,
                summary.as_ref(),
                phase.as_ref(),
                rusage.as_ref(),
                run_started,
                cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
                &outcome,
            );
        }
        owned_run_path = outcome.entrypoint_path.to_string_lossy().into_owned();
        owned_run_path.as_str()
    } else {
        path
    };

    let Some(LoadedChunk { source, chunk }) =
        compile_or_load_chunk_with_timing(resolved_path, &mut stderr, timing.as_deref_mut())
    else {
        // No chunk was produced; whatever has been accumulated in `stderr`
        // so far is reused verbatim as the error message.
        let message = stderr.clone();
        return finalize_run_error(
            stdout,
            stderr,
            json_session,
            summary.as_ref(),
            phase.as_ref(),
            rusage.as_ref(),
            run_started,
            None,
            timing.as_deref(),
            0,
            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
            "compile_error",
            message,
        );
    };
    // From here on `path` is the resolved path (the unpacked entrypoint
    // when a `.harnpack` was given).
    let path = resolved_path;

    // Bracket the VM-setup phase explicitly. `run_setup` covers
    // everything between the bytecode compile and the first VM
    // instruction; `run_main` covers `vm.execute` proper.
    let setup_start = Instant::now();

    // LLM tracing feeds both `--trace` output and the run summary; VM span
    // tracing feeds profiling and the phase report.
    if trace || summary.is_some() {
        harn_vm::llm::enable_tracing();
    }
    if profile.is_enabled() || phase.is_some() {
        harn_vm::tracing::set_tracing_enabled(true);
    }
    if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return finalize_run_error(
            stdout,
            stderr,
            json_session,
            summary.as_ref(),
            phase.as_ref(),
            rusage.as_ref(),
            run_started,
            None,
            timing.as_deref(),
            0,
            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
            "llm_mock_install",
            error,
        );
    }

    let mut vm = harn_vm::Vm::new();
    // Hand the caller-supplied interrupt tokens (if any) to the VM.
    if let Some(interrupt_tokens) = interrupt_tokens {
        vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
        vm.install_cancel_token(interrupt_tokens.cancel_token);
    }
    harn_vm::register_vm_stdlib_with_deferred_llm(&mut vm);
    crate::install_default_hostlib(&mut vm);
    let source_parent = std::path::Path::new(path)
        .parent()
        .unwrap_or(std::path::Path::new("."));
    // Metadata/store rooted at harn.toml when present; source dir otherwise.
    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
    let store_base = project_root.as_deref().unwrap_or(source_parent);
    // An explicit `sandbox.workspace_root` wins; otherwise fall back to
    // `default_run_workspace_root`.
    let sandbox_root = sandbox
        .workspace_root
        .clone()
        .unwrap_or_else(|| default_run_workspace_root(project_root.as_deref(), source_parent));
    // Bound to a named local so the guards inside stay alive until this
    // function returns.
    let _sandbox_scope = install_run_sandbox_scope(&sandbox, &sandbox_root, &mut stderr);
    let attestation_started_at_ms = now_ms();
    // An in-memory event log is installed only when attestation was
    // requested.
    let attestation_log = if attestation.is_some() {
        Some(harn_vm::event_log::install_memory_for_current_thread(256))
    } else {
        None
    };
    if let Some(log) = attestation_log.as_ref() {
        append_run_provenance_event(
            log,
            "started",
            serde_json::json!({
                "pipeline": path,
                "argv": &script_argv,
                "project_root": store_base.display().to_string(),
                "sandbox": run_sandbox_attestation(&sandbox),
            }),
        )
        .await;
    }
    harn_vm::register_store_builtins(&mut vm, store_base);
    harn_vm::register_metadata_builtins(&mut vm, store_base);
    // Checkpoints are keyed by the script's file stem, or "default" when the
    // stem is missing or not valid UTF-8.
    let pipeline_name = std::path::Path::new(path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("default");
    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
    vm.set_source_info(path, &source);
    if !denied_builtins.is_empty() {
        vm.set_denied_builtins(denied_builtins);
    }
    if let Some(ref root) = project_root {
        vm.set_project_root(root);
    }

    // A bare file name has an empty parent; leave the source dir unset then.
    if let Some(p) = std::path::Path::new(path).parent() {
        if !p.as_os_str().is_empty() {
            vm.set_source_dir(p);
        }
    }

    // Load filesystem + manifest skills before the pipeline runs so
    // `skills` is populated with a pre-discovered registry (see #73).
    let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
    let loaded = load_skills(&SkillLoaderInputs {
        cli_dirs,
        source_path: Some(std::path::PathBuf::from(path)),
    });
    emit_loader_warnings(&loaded.loader_warnings);
    install_skills_global(&mut vm, &loaded);

    // `harn run script.harn -- a b c` yields `argv == ["a", "b", "c"]`.
    // Always set so scripts can rely on `len(argv)`.
    let argv_values: Vec<harn_vm::VmValue> = script_argv
        .iter()
        .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
        .collect();
    vm.set_global(
        "argv",
        harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
    );

    // Install the script's `Harness` capability handle so the auto-call
    // emitted by `Compiler::compile()` for `fn main(harness: Harness)`
    // entrypoints can read it.
    vm.set_harness(harn_vm::Harness::real());

    let extensions = package::load_runtime_extensions(Path::new(path));
    package::install_runtime_extensions(&extensions);
    // MCP servers are only connected when the root manifest declares any.
    if let Some(manifest) = extensions.root_manifest.as_ref() {
        if !manifest.mcp.is_empty() {
            connect_mcp_servers(&manifest.mcp, &mut vm).await;
        }
    }
    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest triggers: {error}\n"
        ));
        return finalize_run_error(
            stdout,
            stderr,
            json_session,
            summary.as_ref(),
            phase.as_ref(),
            rusage.as_ref(),
            run_started,
            None,
            timing.as_deref(),
            0,
            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
            "manifest_triggers",
            error.to_string(),
        );
    }
    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest hooks: {error}\n"
        ));
        return finalize_run_error(
            stdout,
            stderr,
            json_session,
            summary.as_ref(),
            phase.as_ref(),
            rusage.as_ref(),
            run_started,
            None,
            timing.as_deref(),
            0,
            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
            "manifest_hooks",
            error.to_string(),
        );
    }

    // Run inside a LocalSet so spawn_local works for concurrency builtins.
    let local = tokio::task::LocalSet::new();
    if let Some(t) = timing.as_deref_mut() {
        t.run_setup = setup_start.elapsed();
    }
    let main_start = Instant::now();
    // On success keep the VM's captured output together with the script's
    // return value; on failure keep only the rendered runtime error.
    let execution = local
        .run_until(async {
            match vm.execute(&chunk).await {
                Ok(value) => Ok((vm.output(), value)),
                Err(e) => Err(vm.format_runtime_error(&e)),
            }
        })
        .await;
    if let Some(t) = timing.as_deref_mut() {
        t.run_main = main_start.elapsed();
    }
    if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        let profile_rollup = if profile.is_enabled() {
            Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
        } else {
            None
        };
        return finalize_run_error(
            stdout,
            stderr,
            json_session,
            summary.as_ref(),
            phase.as_ref(),
            rusage.as_ref(),
            run_started,
            profile_rollup.as_ref(),
            timing.as_deref(),
            harn_vm::tracing::peek_spans().len() as u64,
            cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
            "llm_mock_record",
            error,
        );
    }

    // Always drain any captured stderr accumulated during execution.
    // NOTE(review): the early return just above skips this drain — confirm
    // whether `finalize_run_error` collects the buffer itself.
    let buffered_stderr = harn_vm::take_stderr_buffer();
    stderr.push_str(&buffered_stderr);

    // A runtime error always maps to exit code 1; otherwise the code is
    // derived from the script's return value.
    let exit_code = match &execution {
        Ok((_, return_value)) => exit_code_from_return_value(return_value),
        Err(_) => 1,
    };

    if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
        if let Err(error) = emit_run_attestation(
            log,
            path,
            store_base,
            attestation_started_at_ms,
            exit_code,
            options,
            &mut stderr,
        )
        .await
        {
            stderr.push_str(&format!(
                "error: failed to emit provenance receipt: {error}\n"
            ));
            let profile_rollup = if profile.is_enabled() {
                Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
            } else {
                None
            };
            return finalize_run_error(
                stdout,
                stderr,
                json_session,
                summary.as_ref(),
                phase.as_ref(),
                rusage.as_ref(),
                run_started,
                profile_rollup.as_ref(),
                timing.as_deref(),
                harn_vm::tracing::peek_spans().len() as u64,
                cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
                "attestation",
                error,
            );
        }
        // NOTE(review): only reached when the receipt was emitted; the early
        // returns after the log was installed leave it active — confirm
        // that is intended.
        harn_vm::event_log::reset_active_event_log();
    }

    match execution {
        Ok((output, return_value)) => {
            stdout.push_str(output);
            let main_events = harn_vm::tracing::peek_spans().len() as u64;
            let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
            let profile_rollup = if profile.is_enabled() {
                Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
            } else {
                None
            };
            // Take the LLM snapshot before the trace summary is rendered.
            let summary_llm = summary.as_ref().map(|_| run_summary_llm_snapshot());
            if trace {
                stderr.push_str(&render_trace_summary());
            }
            // Failing to write the profile is downgraded to a warning.
            if let Some(profile_rollup) = profile_rollup.as_ref() {
                if let Err(error) =
                    render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
                {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            if exit_code != 0 {
                stderr.push_str(&render_return_value_error(&return_value));
            }
            let aux_emission = emit_run_aux_for_exit(
                summary.as_ref(),
                phase.as_ref(),
                rusage.as_ref(),
                run_started,
                exit_code,
                profile_rollup.as_ref(),
                summary_llm,
                timing.as_deref(),
                main_events,
                cpu_ms_total,
                json_session.is_some(),
                &mut stderr,
            );
            // JSON mode: the session builds the outcome; only its `stderr`
            // is replaced with the auxiliary emission's text.
            if let Some(session) = json_session {
                if let Some(error) = aux_emission.error {
                    let mut outcome = session.finalize_error(
                        "run_aux",
                        format!("failed to emit auxiliary run JSON: {error}"),
                        1,
                    );
                    outcome.stderr = aux_emission.stderr;
                    return outcome;
                }
                let value = harn_vm::llm::vm_value_to_json(&return_value);
                let mut outcome = session.finalize_result(value, aux_emission.exit_code);
                outcome.stderr = aux_emission.stderr;
                return outcome;
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code: aux_emission.exit_code,
            }
        }
        Err(rendered_error) => {
            stderr.push_str(&rendered_error);
            let main_events = harn_vm::tracing::peek_spans().len() as u64;
            let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
            let profile_rollup = if profile.is_enabled() {
                Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
            } else {
                None
            };
            if let Some(profile_rollup) = profile_rollup.as_ref() {
                if let Err(error) =
                    render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
                {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            // Unlike the success arm, no pre-captured LLM snapshot is passed
            // (`None`) and the trace summary is not rendered.
            let aux_emission = emit_run_aux_for_exit(
                summary.as_ref(),
                phase.as_ref(),
                rusage.as_ref(),
                run_started,
                1,
                profile_rollup.as_ref(),
                None,
                timing.as_deref(),
                main_events,
                cpu_ms_total,
                json_session.is_some(),
                &mut stderr,
            );
            if let Some(session) = json_session {
                let mut outcome =
                    session.finalize_error("runtime", rendered_error, aux_emission.exit_code);
                outcome.stderr = aux_emission.stderr;
                return outcome;
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code: aux_emission.exit_code,
            }
        }
    }
}
1727
1728fn render_and_persist_profile_rollup(
1729    options: &RunProfileOptions,
1730    profile: &harn_vm::profile::RunProfile,
1731    stderr: &mut String,
1732) -> Result<(), String> {
1733    if options.text {
1734        stderr.push_str(&harn_vm::profile::render(profile));
1735    }
1736    if let Some(path) = options.json_path.as_ref() {
1737        if let Some(parent) = path.parent() {
1738            if !parent.as_os_str().is_empty() {
1739                fs::create_dir_all(parent)
1740                    .map_err(|error| format!("create {}: {error}", parent.display()))?;
1741            }
1742        }
1743        let json = serde_json::to_string_pretty(profile)
1744            .map_err(|error| format!("serialize profile: {error}"))?;
1745        fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
1746    }
1747    Ok(())
1748}
1749
1750fn build_run_summary<'a>(
1751    started: Instant,
1752    exit_code: i32,
1753    profile: Option<&'a harn_vm::profile::RunProfile>,
1754    llm: RunSummaryLlm,
1755) -> RunSummary<'a> {
1756    RunSummary {
1757        schema_version: RUN_SUMMARY_SCHEMA_VERSION,
1758        event: "run_summary",
1759        wall_time_ms: started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64,
1760        exit_code,
1761        llm,
1762        profile,
1763    }
1764}
1765
1766fn run_summary_llm_snapshot() -> RunSummaryLlm {
1767    let (input_tokens, output_tokens, time_ms, call_count) = harn_vm::llm::peek_trace_summary();
1768    let cost_usd = harn_vm::llm::peek_total_cost();
1769    RunSummaryLlm {
1770        call_count,
1771        input_tokens,
1772        output_tokens,
1773        time_ms,
1774        cost_usd: if cost_usd.is_finite() { cost_usd } else { 0.0 },
1775    }
1776}
1777
/// Result of emitting the auxiliary run JSON records; produced by
/// `emit_run_aux_for_exit`.
struct RunAuxEmission {
    /// Auxiliary output collected separately from the caller's stderr
    /// buffer. Only populated in JSON mode; otherwise it stays empty because
    /// the text is appended to the caller's buffer directly.
    stderr: String,
    /// Exit code after emission: the incoming code, promoted from 0 to 1 if
    /// any record failed to emit.
    exit_code: i32,
    /// The first emission error encountered, if any.
    error: Option<String>,
}
1783
/// Emit the optional auxiliary JSON records for a run that is about to exit
/// with `exit_code`.
///
/// Records are attempted in a fixed order — run summary, run phase, run
/// rusage — and each one is skipped when its options are `None`. A failed
/// emission is recorded through `record_aux_error` and does not prevent the
/// remaining records from being attempted.
///
/// In `json_mode`, text destined for the stderr sink (records and error
/// lines) is collected into the returned [`RunAuxEmission::stderr`];
/// otherwise it is appended to the caller's `stderr` buffer and the returned
/// buffer stays empty.
#[allow(clippy::too_many_arguments)]
fn emit_run_aux_for_exit(
    summary: Option<&RunSummaryOptions>,
    phase: Option<&RunPhaseOptions>,
    rusage: Option<&RunRusageOptions>,
    started: Instant,
    exit_code: i32,
    profile: Option<&harn_vm::profile::RunProfile>,
    llm: Option<RunSummaryLlm>,
    timing: Option<&RunTiming>,
    main_events: u64,
    cpu_ms_total: Option<u64>,
    json_mode: bool,
    stderr: &mut String,
) -> RunAuxEmission {
    let mut aux_stderr = String::new();
    let mut final_exit_code = exit_code;
    let mut aux_error = None;
    // Pick the buffer that receives stderr-targeted output for this call.
    let aux_target = if json_mode { &mut aux_stderr } else { stderr };
    // Fall back to default timing when the caller has none to report.
    let default_timing = RunTiming::default();
    let timing = timing.unwrap_or(&default_timing);

    if let Some(options) = summary {
        // Use the caller's LLM snapshot when provided, otherwise take one now.
        let llm = llm.unwrap_or_else(run_summary_llm_snapshot);
        // The summary reports the incoming `exit_code`, not `final_exit_code`.
        let summary = build_run_summary(started, exit_code, profile, llm);
        if let Err(error) = emit_raw_json_line(&options.sink, &summary, "run summary", aux_target) {
            record_aux_error(
                &mut final_exit_code,
                &mut aux_error,
                aux_target,
                "run summary",
                error,
            );
        }
    }
    if let Some(options) = phase {
        let phase_event = RunPhaseEvent {
            schema_version: RUN_PHASE_SCHEMA_VERSION,
            event: "run_phase",
            phases: time::build_phase_records(timing, main_events),
        };
        if let Err(error) = emit_raw_json_line(&options.sink, &phase_event, "run phase", aux_target)
        {
            record_aux_error(
                &mut final_exit_code,
                &mut aux_error,
                aux_target,
                "run phase",
                error,
            );
        }
    }
    if let Some(options) = rusage {
        let rusage_event = RunRusageEvent {
            schema_version: RUN_RUSAGE_SCHEMA_VERSION,
            event: "run_rusage",
            // A missing CPU measurement is reported as 0.
            cpu_ms: cpu_ms_total.unwrap_or(0),
        };
        if let Err(error) =
            emit_raw_json_line(&options.sink, &rusage_event, "run rusage", aux_target)
        {
            record_aux_error(
                &mut final_exit_code,
                &mut aux_error,
                aux_target,
                "run rusage",
                error,
            );
        }
    }

    RunAuxEmission {
        stderr: aux_stderr,
        exit_code: final_exit_code,
        error: aux_error,
    }
}
1861
/// Note that an auxiliary record (`label`) could not be emitted.
///
/// Appends an `error: failed to emit …` line to `stderr`, promotes a zero
/// `final_exit_code` to 1 (non-zero codes are left alone), and remembers
/// `error` in `aux_error` unless an earlier failure is already stored there.
fn record_aux_error(
    final_exit_code: &mut i32,
    aux_error: &mut Option<String>,
    stderr: &mut String,
    label: &str,
    error: String,
) {
    let diagnostic = format!("error: failed to emit {label}: {error}\n");
    stderr.push_str(&diagnostic);

    if *final_exit_code == 0 {
        *final_exit_code = 1;
    }

    // Only the first failure is kept; later ones are visible in `stderr` only.
    let _ = aux_error.get_or_insert(error);
}
1877
1878fn emit_raw_json_line(
1879    sink: &RunJsonSink,
1880    value: &impl Serialize,
1881    label: &str,
1882    stderr: &mut String,
1883) -> Result<(), String> {
1884    let line =
1885        serde_json::to_string(value).map_err(|error| format!("serialize {label}: {error}"))? + "\n";
1886    match &sink.target {
1887        RunJsonSinkTarget::Stderr => {
1888            stderr.push_str(&line);
1889            Ok(())
1890        }
1891        RunJsonSinkTarget::File(path) => write_raw_json_file(path, &line),
1892        RunJsonSinkTarget::Fd(fd) => write_raw_json_fd(*fd, &line, sink.fd_flag),
1893    }
1894}
1895
/// Write `line` to `path`, replacing any existing file contents.
///
/// The parent directory is created first when the path has a non-empty one.
///
/// # Errors
///
/// Returns a `create …` or `write …` message naming the offending path.
fn write_raw_json_file(path: &Path, line: &str) -> Result<(), String> {
    // A bare file name has an empty parent; there is nothing to create then.
    let parent_dir = path.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = parent_dir {
        fs::create_dir_all(dir).map_err(|error| format!("create {}: {error}", dir.display()))?;
    }
    fs::write(path, line).map_err(|error| format!("write {}: {error}", path.display()))
}
1905
1906#[cfg(unix)]
1907fn write_raw_json_fd(fd: i32, line: &str, flag: &str) -> Result<(), String> {
1908    use std::fs::File;
1909    use std::os::unix::io::FromRawFd;
1910
1911    if fd < 0 {
1912        return Err(format!("invalid {flag} {fd}: must be non-negative"));
1913    }
1914    let duped = unsafe { libc::dup(fd) };
1915    if duped < 0 {
1916        return Err(format!(
1917            "duplicate {flag} {fd}: {}",
1918            io::Error::last_os_error()
1919        ));
1920    }
1921    let mut file = unsafe { File::from_raw_fd(duped) };
1922    file.write_all(line.as_bytes())
1923        .and_then(|_| file.flush())
1924        .map_err(|error| format!("write {flag} {fd}: {error}"))
1925}
1926
/// Non-Unix stand-in for the fd sink: always fails, naming the CLI `flag`
/// that requested it.
#[cfg(not(unix))]
fn write_raw_json_fd(_fd: i32, _line: &str, flag: &str) -> Result<(), String> {
    let message = format!("{flag} is only supported on Unix platforms");
    Err(message)
}
1931
1932async fn append_run_provenance_event(
1933    log: &Arc<harn_vm::event_log::AnyEventLog>,
1934    kind: &str,
1935    payload: serde_json::Value,
1936) {
1937    let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
1938        return;
1939    };
1940    let _ = log
1941        .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
1942        .await;
1943}
1944
/// Record the end of a run in the provenance log and write a signed receipt.
///
/// Steps, in order: append a `finished` provenance event, flush `log`,
/// configure the `harn.provenance` secret chain, load or generate the agent
/// signing key, build the signed receipt, write it as pretty JSON to the
/// path chosen by `receipt_output_path`, and append a
/// `provenance receipt: <path>` line to `stderr`.
///
/// # Errors
///
/// Returns a descriptive message if any step from the flush onward fails.
/// The initial event append goes through `append_run_provenance_event`,
/// which does not report failures.
async fn emit_run_attestation(
    log: &Arc<harn_vm::event_log::AnyEventLog>,
    path: &str,
    store_base: &Path,
    started_at_ms: i64,
    exit_code: i32,
    options: &RunAttestationOptions,
    stderr: &mut String,
) -> Result<(), String> {
    let finished_at_ms = now_ms();
    // Only a zero exit code counts as success.
    let status = if exit_code == 0 { "success" } else { "failure" };
    append_run_provenance_event(
        log,
        "finished",
        serde_json::json!({
            "pipeline": path,
            "status": status,
            "exit_code": exit_code,
        }),
    )
    .await;
    // Flush before building the receipt from the log.
    log.flush()
        .await
        .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
    let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
        .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
    let (signing_key, key_id) =
        harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
            .await
            .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
    let receipt = harn_vm::build_signed_receipt(
        log,
        harn_vm::ReceiptBuildOptions {
            pipeline: path.to_string(),
            status: status.to_string(),
            started_at_ms,
            finished_at_ms,
            exit_code,
            producer_name: "harn-cli".to_string(),
            producer_version: env!("CARGO_PKG_VERSION").to_string(),
        },
        &signing_key,
        key_id,
    )
    .await
    .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
    // Either the explicit `receipt_out` path or a per-receipt file under the
    // state root (see `receipt_output_path`).
    let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
    if let Some(parent) = receipt_path.parent() {
        fs::create_dir_all(parent)
            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
    }
    let encoded = serde_json::to_vec_pretty(&receipt)
        .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
    fs::write(&receipt_path, encoded)
        .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
    // Tell the user where the receipt landed.
    stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
    Ok(())
}
2003
2004fn receipt_output_path(
2005    store_base: &Path,
2006    options: &RunAttestationOptions,
2007    receipt_id: &str,
2008) -> PathBuf {
2009    if let Some(path) = options.receipt_out.as_ref() {
2010        return path.clone();
2011    }
2012    harn_vm::runtime_paths::state_root(store_base)
2013        .join("receipts")
2014        .join(format!("{receipt_id}.json"))
2015}
2016
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The
/// `u128` millisecond count is saturated to `i64::MAX` rather than being
/// truncated by an `as` cast.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
2023
2024/// Map a script's top-level return value to a process exit code.
2025///
2026/// - `int n`             → exit n (clamped to 0..=255)
2027/// - `Result::Ok(_)`     → exit 0
2028/// - `Result::Err(_)`    → exit 1
2029/// - anything else       → exit 0
2030fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
2031    use harn_vm::VmValue;
2032    match value {
2033        VmValue::Int(n) => (*n).clamp(0, 255) as i32,
2034        VmValue::EnumVariant(enum_variant) if enum_variant.is_variant("Result", "Err") => 1,
2035        _ => 0,
2036    }
2037}
2038
/// State for a single `harn run --json` invocation. Installs the
/// run-event sink in [`Self::install`] and removes it in [`Drop`], so
/// every exit path through `execute_run_inner` cleans up correctly
/// even if a panic unwinds out of the VM. Save-and-restore of any
/// previously installed sink keeps the helper safe to nest (rare, but
/// in-process embeddings can call into `harn run` from a host that
/// already had a sink wired).
///
/// `finalize_result` / `finalize_error` emit the terminal event and
/// build a [`RunOutcome`] whose stdout/stderr captured-buffer fields
/// stay **empty** — the canonical stream is on `out`.
/// `outcome.exit_code` still carries the process exit code so the
/// binary entry can `process::exit(...)`.
struct JsonRunSession {
    /// NDJSON writer built over the `out` stream handed to `install`; also
    /// the source of the sink that gets installed.
    emitter: self::json_events::NdjsonEmitter,
    /// Whatever `install_sink` returned at install time; re-installed on
    /// drop, or the sink is cleared if this is `None`.
    prior_sink: Option<Arc<dyn harn_vm::run_events::RunEventSink>>,
}
2056
2057impl JsonRunSession {
2058    fn install(options: RunJsonOptions, out: Box<dyn io::Write + Send>) -> Self {
2059        let emitter = NdjsonEmitter::new(out, options.quiet);
2060        let prior_sink = harn_vm::run_events::install_sink(emitter.sink());
2061        Self {
2062            emitter,
2063            prior_sink,
2064        }
2065    }
2066
2067    fn finalize_result(self, value: serde_json::Value, exit_code: i32) -> RunOutcome {
2068        self.emitter.emit_result(value, exit_code);
2069        RunOutcome {
2070            stdout: String::new(),
2071            stderr: String::new(),
2072            exit_code,
2073        }
2074    }
2075
2076    fn finalize_error(
2077        self,
2078        code: impl Into<String>,
2079        message: impl Into<String>,
2080        exit_code: i32,
2081    ) -> RunOutcome {
2082        self.emitter.emit_error(code, message);
2083        RunOutcome {
2084            stdout: String::new(),
2085            stderr: String::new(),
2086            exit_code,
2087        }
2088    }
2089}
2090
2091impl Drop for JsonRunSession {
2092    fn drop(&mut self) {
2093        match self.prior_sink.take() {
2094            Some(prior) => {
2095                harn_vm::run_events::install_sink(prior);
2096            }
2097            None => harn_vm::run_events::clear_sink(),
2098        }
2099    }
2100}
2101
2102#[allow(clippy::too_many_arguments)]
2103fn finalize_run_error(
2104    stdout: String,
2105    mut stderr: String,
2106    json_session: Option<JsonRunSession>,
2107    summary: Option<&RunSummaryOptions>,
2108    phase: Option<&RunPhaseOptions>,
2109    rusage: Option<&RunRusageOptions>,
2110    started: Instant,
2111    profile: Option<&harn_vm::profile::RunProfile>,
2112    timing: Option<&RunTiming>,
2113    main_events: u64,
2114    cpu_ms_total: Option<u64>,
2115    code: impl Into<String>,
2116    message: impl Into<String>,
2117) -> RunOutcome {
2118    let aux_emission = emit_run_aux_for_exit(
2119        summary,
2120        phase,
2121        rusage,
2122        started,
2123        1,
2124        profile,
2125        None,
2126        timing,
2127        main_events,
2128        cpu_ms_total,
2129        json_session.is_some(),
2130        &mut stderr,
2131    );
2132    if let Some(session) = json_session {
2133        let mut outcome = session.finalize_error(code, message, aux_emission.exit_code);
2134        outcome.stderr = aux_emission.stderr;
2135        return outcome;
2136    }
2137    RunOutcome {
2138        stdout,
2139        stderr,
2140        exit_code: aux_emission.exit_code,
2141    }
2142}
2143
2144/// Translate a preflight failure into either the `--json` error event
2145/// stream or a plain stderr message plus exit-code 1. Keeps the
2146/// `.harnpack` verify path's error reporting consistent with the rest
2147/// of `harn run`.
2148fn finalize_harnpack_error(
2149    mut stderr: String,
2150    json_session: Option<JsonRunSession>,
2151    summary: Option<&RunSummaryOptions>,
2152    phase: Option<&RunPhaseOptions>,
2153    rusage: Option<&RunRusageOptions>,
2154    started: Instant,
2155    err: HarnpackError,
2156) -> RunOutcome {
2157    let code = err.code;
2158    let message = err.message;
2159    stderr.push_str(&format!("error: {message}\n"));
2160    finalize_run_error(
2161        String::new(),
2162        stderr,
2163        json_session,
2164        summary,
2165        phase,
2166        rusage,
2167        started,
2168        None,
2169        None,
2170        0,
2171        None,
2172        code,
2173        message,
2174    )
2175}
2176
2177/// Successful `--dry-run-verify` path. Reports the bundle hash and
2178/// signature outcome on stderr (since stdout belongs to the script) and
2179/// emits a terminal `result` event when `--json` is active so consumers
2180/// see the run complete.
2181fn finalize_harnpack_dry_run(
2182    mut stderr: String,
2183    json_session: Option<JsonRunSession>,
2184    summary_options: Option<&RunSummaryOptions>,
2185    phase_options: Option<&RunPhaseOptions>,
2186    rusage_options: Option<&RunRusageOptions>,
2187    started: Instant,
2188    cpu_ms_total: Option<u64>,
2189    prepared: &PreparedHarnpack,
2190) -> RunOutcome {
2191    let summary = format!(
2192        "[harn] harnpack verify ok: bundle_hash={}, signature_verified={}, cache_hit={}\n",
2193        prepared.bundle_hash, prepared.signature_verified, prepared.cache_hit
2194    );
2195    stderr.push_str(&summary);
2196    let aux_emission = emit_run_aux_for_exit(
2197        summary_options,
2198        phase_options,
2199        rusage_options,
2200        started,
2201        0,
2202        None,
2203        None,
2204        None,
2205        0,
2206        cpu_ms_total,
2207        json_session.is_some(),
2208        &mut stderr,
2209    );
2210    if let Some(session) = json_session {
2211        if let Some(error) = aux_emission.error {
2212            let mut outcome = session.finalize_error(
2213                "run_aux",
2214                format!("failed to emit auxiliary run JSON: {error}"),
2215                1,
2216            );
2217            outcome.stderr = aux_emission.stderr;
2218            return outcome;
2219        }
2220        let value = serde_json::json!({
2221            "bundle_hash": prepared.bundle_hash,
2222            "signature_verified": prepared.signature_verified,
2223            "key_id": prepared.key_id,
2224            "cache_hit": prepared.cache_hit,
2225            "dry_run_verify": true,
2226        });
2227        let mut outcome = session.finalize_result(value, aux_emission.exit_code);
2228        outcome.stderr = aux_emission.stderr;
2229        return outcome;
2230    }
2231    RunOutcome {
2232        stdout: String::new(),
2233        stderr,
2234        exit_code: aux_emission.exit_code,
2235    }
2236}
2237
2238fn render_return_value_error(value: &harn_vm::VmValue) -> String {
2239    let harn_vm::VmValue::EnumVariant(enum_variant) = value else {
2240        return String::new();
2241    };
2242    if !enum_variant.is_variant("Result", "Err") {
2243        return String::new();
2244    }
2245    let rendered = enum_variant
2246        .fields
2247        .first()
2248        .map(|p| p.display())
2249        .unwrap_or_default();
2250    if rendered.is_empty() {
2251        "error\n".to_string()
2252    } else if rendered.ends_with('\n') {
2253        rendered
2254    } else {
2255        format!("{rendered}\n")
2256    }
2257}
2258
2259/// Connect to MCP servers declared in `harn.toml` and register them as
2260/// `mcp.<name>` globals on the VM. Connection failures are warned but do
2261/// not abort execution.
2262///
2263/// Servers with `lazy = true` are registered with the VM-side MCP
2264/// registry but NOT booted — their processes start the first time a
2265/// skill's `requires_mcp` list names them or user code calls
2266/// `mcp_ensure_active("name")` / `mcp_call(mcp.<name>, ...)`.
2267pub(crate) async fn connect_mcp_servers(
2268    servers: &[package::McpServerConfig],
2269    vm: &mut harn_vm::Vm,
2270) {
2271    use std::collections::BTreeMap;
2272    use std::rc::Rc;
2273    use std::time::Duration;
2274
2275    let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
2276    let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
2277
2278    for server in servers {
2279        let resolved_auth = match mcp::resolve_auth_for_server(server).await {
2280            Ok(resolution) => resolution,
2281            Err(error) => {
2282                eprintln!(
2283                    "warning: mcp: failed to load auth for '{}': {}",
2284                    server.name, error
2285                );
2286                AuthResolution::None
2287            }
2288        };
2289        let spec = serde_json::json!({
2290            "name": server.name,
2291            "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
2292            "command": server.command,
2293            "args": server.args,
2294            "env": server.env,
2295            "url": server.url,
2296            "auth_token": match resolved_auth {
2297                AuthResolution::Bearer(token) => Some(token),
2298                AuthResolution::None => server.auth_token.clone(),
2299            },
2300            "protocol_version": server.protocol_version,
2301            "protocol_mode": server.protocol_mode,
2302            "proxy_server_name": server.proxy_server_name,
2303        });
2304
2305        // Register with the VM-side registry regardless of lazy flag —
2306        // skill activation and `mcp_ensure_active` look up specs there.
2307        registrations.push(harn_vm::RegisteredMcpServer {
2308            name: server.name.clone(),
2309            spec: spec.clone(),
2310            lazy: server.lazy,
2311            card: server.card.clone(),
2312            keep_alive: server.keep_alive_ms.map(Duration::from_millis),
2313        });
2314
2315        if server.lazy {
2316            eprintln!(
2317                "[harn] mcp: deferred '{}' (lazy, boots on first use)",
2318                server.name
2319            );
2320            continue;
2321        }
2322
2323        match harn_vm::connect_mcp_server_from_json(&spec).await {
2324            Ok(handle) => {
2325                eprintln!("[harn] mcp: connected to '{}'", server.name);
2326                harn_vm::mcp_install_active(&server.name, handle.clone());
2327                mcp_dict.insert(server.name.clone(), harn_vm::VmValue::mcp_client(handle));
2328            }
2329            Err(e) => {
2330                eprintln!(
2331                    "warning: mcp: failed to connect to '{}': {}",
2332                    server.name, e
2333                );
2334            }
2335        }
2336    }
2337
2338    // Install registrations AFTER eager connects so `install_active`
2339    // above doesn't get overwritten.
2340    harn_vm::mcp_register_servers(registrations);
2341
2342    if !mcp_dict.is_empty() {
2343        vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
2344    }
2345}
2346
2347pub(crate) fn render_trace_summary() -> String {
2348    use std::fmt::Write;
2349    let entries = harn_vm::llm::take_trace();
2350    if entries.is_empty() {
2351        return String::new();
2352    }
2353    let mut out = String::new();
2354    let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
2355    let mut total_input = 0i64;
2356    let mut total_output = 0i64;
2357    let mut total_ms = 0u64;
2358    for (i, entry) in entries.iter().enumerate() {
2359        let _ = writeln!(
2360            out,
2361            "  #{}: {} | {} in + {} out tokens | {} ms",
2362            i + 1,
2363            entry.model,
2364            entry.input_tokens,
2365            entry.output_tokens,
2366            entry.duration_ms,
2367        );
2368        total_input += entry.input_tokens;
2369        total_output += entry.output_tokens;
2370        total_ms += entry.duration_ms;
2371    }
2372    let total_tokens = total_input + total_output;
2373    // Rough cost estimate using Sonnet 4 pricing ($3/MTok in, $15/MTok out).
2374    let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
2375    let _ = writeln!(
2376        out,
2377        "  \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
2378        entries.len(),
2379        if entries.len() == 1 { "" } else { "s" },
2380        total_tokens,
2381        total_input,
2382        total_output,
2383        total_ms,
2384        cost,
2385    );
2386    out
2387}
2388
/// Run a .harn file as an MCP server using the script-driven surface.
/// The pipeline must call `mcp_tools(registry)` (or the alias
/// `mcp_serve(registry)`) so the CLI can expose its tools, and may
/// register additional resources/prompts via `mcp_resource(...)` /
/// `mcp_resource_template(...)` / `mcp_prompt(...)`.
///
/// Dispatched into by `harn serve mcp <file>` when the script does not
/// define any `pub fn` exports — see `commands::serve::run_mcp_server`.
///
/// `card_source` — optional `--card` argument. Accepts either a path to
/// a JSON file or an inline JSON string. When present, the card is
/// embedded in the `initialize` response and exposed as the
/// `well-known://mcp-card` resource.
///
/// `mode` selects the transport: stdio, or HTTP with the given options and
/// auth policy.
///
/// Every failure path (compile/load, manifest trigger or hook install,
/// runtime error, missing registry, tool conversion, bad `--card`, server
/// error) prints to stderr and terminates the process with exit code 1.
pub(crate) async fn run_file_mcp_serve(
    path: &str,
    card_source: Option<&str>,
    mode: RunFileMcpServeMode,
) {
    let mut diagnostics = String::new();
    let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut diagnostics)
    else {
        eprint!("{diagnostics}");
        process::exit(1);
    };
    // Diagnostics collected during a successful load are still shown.
    if !diagnostics.is_empty() {
        eprint!("{diagnostics}");
    }

    let mut vm = harn_vm::Vm::new();
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    // NOTE(review): for a bare file name `parent()` is `Some("")`, so
    // `source_parent` can be an empty path here — confirm the callees below
    // treat that as the current directory.
    let source_parent = std::path::Path::new(path)
        .parent()
        .unwrap_or(std::path::Path::new("."));
    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
    // Store/metadata/checkpoint builtins are rooted at the project root when
    // one is found, otherwise at the script's directory.
    let store_base = project_root.as_deref().unwrap_or(source_parent);
    harn_vm::register_store_builtins(&mut vm, store_base);
    harn_vm::register_metadata_builtins(&mut vm, store_base);
    // The checkpoint namespace is the script's file stem.
    let pipeline_name = std::path::Path::new(path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("default");
    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
    vm.set_source_info(path, &source);
    if let Some(ref root) = project_root {
        vm.set_project_root(root);
    }
    if let Some(p) = std::path::Path::new(path).parent() {
        if !p.as_os_str().is_empty() {
            vm.set_source_dir(p);
        }
    }

    // Same skill discovery as `harn run` — see comment there.
    let loaded = load_skills(&SkillLoaderInputs {
        cli_dirs: Vec::new(),
        source_path: Some(std::path::PathBuf::from(path)),
    });
    emit_loader_warnings(&loaded.loader_warnings);
    install_skills_global(&mut vm, &loaded);

    // Manifest-driven extensions: MCP client connections, triggers, hooks.
    let extensions = package::load_runtime_extensions(Path::new(path));
    package::install_runtime_extensions(&extensions);
    if let Some(manifest) = extensions.root_manifest.as_ref() {
        if !manifest.mcp.is_empty() {
            connect_mcp_servers(&manifest.mcp, &mut vm).await;
        }
    }
    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
        eprintln!("error: failed to install manifest triggers: {error}");
        process::exit(1);
    }
    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
        eprintln!("error: failed to install manifest hooks: {error}");
        process::exit(1);
    }

    // Script execution and serving both run inside a tokio `LocalSet`.
    let local = tokio::task::LocalSet::new();
    local
        .run_until(async {
            // Run the script first so it can register its MCP surface.
            match vm.execute(&chunk).await {
                Ok(_) => {}
                Err(e) => {
                    eprint!("{}", vm.format_runtime_error(&e));
                    process::exit(1);
                }
            }

            // Pipeline output goes to stderr — stdout is the MCP transport.
            let output = vm.output();
            if !output.is_empty() {
                eprint!("{output}");
            }

            // The script must have registered a tool registry.
            let registry = match harn_vm::take_mcp_serve_registry() {
                Some(r) => r,
                None => {
                    eprintln!("error: pipeline did not call mcp_serve(registry)");
                    eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
                    process::exit(1);
                }
            };

            let tools = match harn_vm::tool_registry_to_mcp_tools(&registry) {
                Ok(t) => t,
                Err(e) => {
                    eprintln!("error: {e}");
                    process::exit(1);
                }
            };

            let resources = harn_vm::take_mcp_serve_resources();
            let resource_templates = harn_vm::take_mcp_serve_resource_templates();
            let prompts = harn_vm::take_mcp_serve_prompts();

            // The advertised server name is the script's file stem.
            let server_name = std::path::Path::new(path)
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("harn")
                .to_string();

            // Build the "N tools, M resources, K prompts" banner, listing
            // only the non-empty capability kinds.
            let mut caps = Vec::new();
            if !tools.is_empty() {
                caps.push(format!(
                    "{} tool{}",
                    tools.len(),
                    if tools.len() == 1 { "" } else { "s" }
                ));
            }
            // Static resources and resource templates are counted together.
            let total_resources = resources.len() + resource_templates.len();
            if total_resources > 0 {
                caps.push(format!(
                    "{total_resources} resource{}",
                    if total_resources == 1 { "" } else { "s" }
                ));
            }
            if !prompts.is_empty() {
                caps.push(format!(
                    "{} prompt{}",
                    prompts.len(),
                    if prompts.len() == 1 { "" } else { "s" }
                ));
            }
            eprintln!(
                "[harn] serve mcp: serving {} as '{server_name}'",
                caps.join(", ")
            );

            let mut server =
                harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
            if let Some(source) = card_source {
                match resolve_card_source(source) {
                    Ok(card) => server = server.with_server_card(card),
                    Err(e) => {
                        eprintln!("error: --card: {e}");
                        process::exit(1);
                    }
                }
            }
            match mode {
                // Stdio borrows the VM for the lifetime of the server loop.
                RunFileMcpServeMode::Stdio => {
                    if let Err(e) = server.run(&mut vm).await {
                        eprintln!("error: MCP server error: {e}");
                        process::exit(1);
                    }
                }
                // HTTP hands ownership of both the server and the VM over.
                RunFileMcpServeMode::Http {
                    options,
                    auth_policy,
                } => {
                    if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
                        server,
                        vm,
                        options,
                        auth_policy,
                    )
                    .await
                    {
                        eprintln!("error: MCP server error: {e}");
                        process::exit(1);
                    }
                }
            }
        })
        .await;
}
2575
2576/// Accept either a path to a JSON file or an inline JSON blob and
2577/// return the parsed `serde_json::Value`. Used by `--card`. Disambiguates
2578/// by peeking at the first non-whitespace character: `{` → inline JSON,
2579/// anything else → path.
2580pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
2581    let trimmed = source.trim_start();
2582    if trimmed.starts_with('{') || trimmed.starts_with('[') {
2583        return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
2584    }
2585    let path = std::path::Path::new(source);
2586    harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
2587}
2588
2589pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
2590    use notify::{Event, EventKind, RecursiveMode, Watcher};
2591
2592    let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
2593        eprintln!("Error: {e}");
2594        process::exit(1);
2595    });
2596    let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
2597
2598    eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
2599    run_file(
2600        path,
2601        false,
2602        denied_builtins.clone(),
2603        Vec::new(),
2604        CliLlmMockMode::Off,
2605        None,
2606        RunProfileOptions::default(),
2607    )
2608    .await;
2609
2610    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
2611    let _watcher = {
2612        let tx = tx.clone();
2613        let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
2614            if let Ok(event) = res {
2615                if matches!(
2616                    event.kind,
2617                    EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
2618                ) {
2619                    let has_harn = event
2620                        .paths
2621                        .iter()
2622                        .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
2623                    if has_harn {
2624                        let _ = tx.blocking_send(());
2625                    }
2626                }
2627            }
2628        })
2629        .unwrap_or_else(|e| {
2630            eprintln!("Error setting up file watcher: {e}");
2631            process::exit(1);
2632        });
2633        watcher
2634            .watch(watch_dir, RecursiveMode::Recursive)
2635            .unwrap_or_else(|e| {
2636                eprintln!("Error watching directory: {e}");
2637                process::exit(1);
2638            });
2639        watcher // keep alive
2640    };
2641
2642    eprintln!(
2643        "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
2644        watch_dir.display()
2645    );
2646
2647    loop {
2648        rx.recv().await;
2649        // Debounce: let bursts of events settle for 200ms before re-running.
2650        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2651        while rx.try_recv().is_ok() {}
2652
2653        eprintln!();
2654        eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
2655        run_file(
2656            path,
2657            false,
2658            denied_builtins.clone(),
2659            Vec::new(),
2660            CliLlmMockMode::Off,
2661            None,
2662            RunProfileOptions::default(),
2663        )
2664        .await;
2665    }
2666}
2667
2668#[cfg(test)]
2669mod tests;