Skip to main content

harn_cli/commands/
run.rs

1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use harn_parser::DiagnosticSeverity;
11use harn_vm::event_log::EventLog;
12
13use crate::commands::mcp::{self, AuthResolution};
14use crate::commands::time::RunTiming;
15use crate::package;
16use crate::parse_source_file;
17use crate::skill_loader::{
18    canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
19    SkillLoaderInputs,
20};
21
22mod explain_cost;
23pub mod json_events;
24
25use self::json_events::NdjsonEmitter;
26
/// Options controlling the NDJSON event stream emitted by `--json` runs.
#[derive(Clone, Default)]
pub struct RunJsonOptions {
    /// When `true`, `stdout` / `stderr` events are dropped from the
    /// stream. Transcript, tool, hook, and persona events — plus the
    /// terminal result/error event — are still emitted.
    pub quiet: bool,
}
34
35pub(crate) enum RunFileMcpServeMode {
36    Stdio,
37    Http {
38        options: harn_serve::McpHttpServeOptions,
39        auth_policy: harn_serve::AuthPolicy,
40    },
41}
42
/// Builtins that stay callable under `--allow`: `build_denied_builtins`
/// never adds these names to the denied set it derives from an allow list.
const CORE_BUILTINS: &[&str] = &[
    // Output / logging.
    "println",
    "print",
    "log",
    // Type inspection and conversion.
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    // Assertions.
    "assert",
    "assert_eq",
    "assert_ne",
    // JSON.
    "json_parse",
    "json_stringify",
    // Runtime context / task introspection.
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
65
66/// Build the set of denied builtin names from `--deny` or `--allow` flags.
67///
68/// - `--deny a,b,c` denies exactly those names.
69/// - `--allow a,b,c` denies everything *except* the listed names and the core builtins.
70pub(crate) fn build_denied_builtins(
71    deny_csv: Option<&str>,
72    allow_csv: Option<&str>,
73) -> HashSet<String> {
74    if let Some(csv) = deny_csv {
75        csv.split(',')
76            .map(|s| s.trim().to_string())
77            .filter(|s| !s.is_empty())
78            .collect()
79    } else if let Some(csv) = allow_csv {
80        // With --allow, we mark every registered stdlib builtin as denied
81        // *except* those in the allow list and the core builtins.
82        let allowed: HashSet<String> = csv
83            .split(',')
84            .map(|s| s.trim().to_string())
85            .filter(|s| !s.is_empty())
86            .collect();
87        let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
88
89        // Create a temporary VM with stdlib registered to enumerate all builtin names.
90        let mut tmp = harn_vm::Vm::new();
91        harn_vm::register_vm_stdlib(&mut tmp);
92        harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
93        harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
94
95        tmp.builtin_names()
96            .into_iter()
97            .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
98            .collect()
99    } else {
100        HashSet::new()
101    }
102}
103
104/// Result of [`compile_or_load_chunk_for_run`]. Failures propagate as
105/// diagnostic text on the run path so callers map them straight to a
106/// non-zero exit code without bespoke error types.
107pub(crate) struct LoadedChunk {
108    pub(crate) source: String,
109    pub(crate) chunk: harn_vm::Chunk,
110}
111
112/// Load the entry pipeline as a runnable [`harn_vm::Chunk`], using the
113/// content-addressed bytecode cache when its key matches. On a cache miss
114/// we read, parse, type-check, and compile, then persist the chunk.
115/// On a hit we skip parse/typecheck/compile entirely — the cache invariant
116/// is that a stored chunk passed those phases on the writer's harn build,
117/// and the key includes every transitively-imported user file so any
118/// change re-runs the full path.
119///
120/// `stderr` receives any diagnostic output. Returns `None` when a fatal
121/// type or compile error blocks execution; the caller maps that to
122/// exit-code 1.
123pub(crate) fn compile_or_load_chunk_for_run(
124    path: &str,
125    stderr: &mut String,
126) -> Option<LoadedChunk> {
127    compile_or_load_chunk_with_timing(path, stderr, None)
128}
129
130/// Like [`compile_or_load_chunk_for_run`] but lets the caller observe
131/// per-phase wall-clock timings (parse, typecheck, bytecode compile +
132/// cache hit/miss). Used by `harn time run` to drive the same code
133/// path as `harn run` while reporting phase-level timing.
134//
135// The `as_deref_mut` calls reborrow the inner `&mut RunTiming` so each
136// phase can mutate it independently. Clippy's `needless_option_as_deref`
137// is correct that the surface types match — that's exactly the
138// reborrow we want.
139#[allow(clippy::needless_option_as_deref)]
140pub(crate) fn compile_or_load_chunk_with_timing(
141    path: &str,
142    stderr: &mut String,
143    mut timing: Option<&mut RunTiming>,
144) -> Option<LoadedChunk> {
145    let source = match fs::read_to_string(path) {
146        Ok(s) => s,
147        Err(e) => {
148            stderr.push_str(&format!("Error reading {path}: {e}\n"));
149            return None;
150        }
151    };
152    if let Some(t) = timing.as_deref_mut() {
153        t.input_bytes = source.len() as u64;
154    }
155
156    let compile_phase_start = Instant::now();
157    let lookup = harn_vm::bytecode_cache::load(Path::new(path), &source);
158    if let Some(chunk) = lookup.chunk {
159        if let Some(t) = timing.as_deref_mut() {
160            t.cache_hit = true;
161            t.bytecode_compile = compile_phase_start.elapsed();
162        }
163        return Some(LoadedChunk { source, chunk });
164    }
165    if let Some(t) = timing.as_deref_mut() {
166        t.cache_hit = false;
167    }
168
169    let parse_start = Instant::now();
170    let (parsed_source, program) = parse_source_file(path);
171    debug_assert_eq!(parsed_source, source, "parse_source_file re-read drifted");
172    if let Some(t) = timing.as_deref_mut() {
173        t.parse = parse_start.elapsed();
174    }
175
176    let typecheck_start = Instant::now();
177    let mut had_type_error = false;
178    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
179    for diag in &type_diagnostics {
180        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
181        if matches!(diag.severity, DiagnosticSeverity::Error) {
182            had_type_error = true;
183        }
184        stderr.push_str(&rendered);
185    }
186    if let Some(t) = timing.as_deref_mut() {
187        t.typecheck = typecheck_start.elapsed();
188    }
189    if had_type_error {
190        return None;
191    }
192
193    let compile_step_start = Instant::now();
194    let chunk = match harn_vm::Compiler::new().compile(&program) {
195        Ok(c) => c,
196        Err(e) => {
197            stderr.push_str(&format!("error: compile error: {e}\n"));
198            return None;
199        }
200    };
201
202    // Cache misses are best-effort — read-only homedirs, full disks, and
203    // sandboxes are common in CI environments. Surface the failure as a
204    // single-line warning when explicitly requested via the audit hook;
205    // otherwise stay quiet to avoid bloating happy-path output.
206    if let Err(err) = harn_vm::bytecode_cache::store(&lookup.key, &chunk) {
207        if std::env::var_os("HARN_BYTECODE_CACHE_DEBUG").is_some() {
208            eprintln!("[harn] bytecode cache write skipped: {err}");
209        }
210    }
211    if let Some(t) = timing.as_deref_mut() {
212        t.bytecode_compile = compile_step_start.elapsed();
213    }
214
215    Some(LoadedChunk { source, chunk })
216}
217
218/// Run the static type checker against `program` with cross-module
219/// import-aware call resolution when the file's imports all resolve. Used
220/// by `run_file` and the MCP server entry so `harn run` catches undefined
221/// cross-module calls before the VM starts.
222fn typecheck_with_imports(
223    program: &[harn_parser::SNode],
224    path: &Path,
225    source: &str,
226) -> Vec<harn_parser::TypeDiagnostic> {
227    if let Err(error) = package::ensure_dependencies_materialized(path) {
228        eprintln!("error: {error}");
229        process::exit(1);
230    }
231    let graph = harn_modules::build(&[path.to_path_buf()]);
232    let mut checker = harn_parser::TypeChecker::new();
233    if let Some(imported) = graph.imported_names_for_file(path) {
234        checker = checker.with_imported_names(imported);
235    }
236    if let Some(imported) = graph.imported_type_declarations_for_file(path) {
237        checker = checker.with_imported_type_decls(imported);
238    }
239    if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
240        checker = checker.with_imported_callable_decls(imported);
241    }
242    checker.check_with_source(program, source)
243}
244
245/// Build the wrapped source and temp file backing a `harn run -e` invocation.
246///
247/// `import` is a top-level declaration in Harn, so the leading prefix of
248/// import lines (with surrounding blanks/comments) is hoisted out of the
249/// `pipeline main(task) { ... }` wrapper. The temp file is created in the
250/// current working directory so relative imports (`import "./lib"`) and
251/// `harn.toml` discovery resolve against the user's project, not the
252/// system temp dir. If the CWD is unwritable we fall back to the system
253/// temp dir with a stderr warning — pure-expression `-e` still works,
254/// but relative imports will fail to resolve.
255pub(crate) fn prepare_eval_temp_file(
256    code: &str,
257) -> Result<(String, tempfile::NamedTempFile), String> {
258    let (header, body) = split_eval_header(code);
259    let wrapped = if header.is_empty() {
260        format!("pipeline main(task) {{\n{body}\n}}")
261    } else {
262        format!("{header}\npipeline main(task) {{\n{body}\n}}")
263    };
264
265    let tmp = create_eval_temp_file()?;
266    Ok((wrapped, tmp))
267}
268
269/// Try to place the `-e` temp file in the current working directory so
270/// relative imports and `harn.toml` discovery resolve against the user's
271/// project. Fall back to the system temp dir on failure (with a warning),
272/// so pure-expression `-e` keeps working in read-only contexts.
273fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
274    if let Some(dir) = std::env::current_dir().ok().as_deref() {
275        // Hidden prefix on Unix so editors / tree-walkers are less likely
276        // to pick the file up during its short lifetime.
277        match tempfile::Builder::new()
278            .prefix(".harn-eval-")
279            .suffix(".harn")
280            .tempfile_in(dir)
281        {
282            Ok(tmp) => return Ok(tmp),
283            Err(error) => eprintln!(
284                "warning: harn run -e: could not create temp file in {}: {error}; \
285                 relative imports will not resolve",
286                dir.display()
287            ),
288        }
289    }
290    tempfile::Builder::new()
291        .prefix("harn-eval-")
292        .suffix(".harn")
293        .tempfile()
294        .map_err(|e| format!("failed to create temp file for -e: {e}"))
295}
296
/// Split the `-e` input into a header (top-level imports plus the blank
/// and `//` comment lines around them) and a body (everything else, to be
/// wrapped in `pipeline main(task)`).
///
/// A line is an import when its first token is `import`, optionally
/// preceded by `pub` and whitespace, and `import` is followed by
/// whitespace or an opening `"`. Scanning stops at the first line that is
/// not blank, not a `//` comment, and not an import.
///
/// Returns `(header, body)`, both with lines joined by `\n`. When no
/// import line is found in the leading run, the header is empty and the
/// body is `code` unchanged — leading comments alone are never hoisted.
fn split_eval_header(code: &str) -> (String, String) {
    /// Whether `trimmed` (already stripped of leading whitespace) starts
    /// an `import` / `pub import` declaration.
    fn is_import_line(trimmed: &str) -> bool {
        // Peel an optional `pub` keyword; it must be followed by
        // whitespace so identifiers like `public` are not misread.
        let rest = match trimmed.strip_prefix("pub") {
            Some(after_pub) if after_pub.starts_with(char::is_whitespace) => {
                after_pub.trim_start()
            }
            _ => trimmed,
        };
        // `import` must be a whole token: followed by whitespace or the
        // opening quote of the module path (`import"./lib"`).
        matches!(
            rest.strip_prefix("import").and_then(|tail| tail.chars().next()),
            Some(next) if next.is_whitespace() || next == '"'
        )
    }

    let lines: Vec<&str> = code.lines().collect();

    // `header_end` is the exclusive index of the last header line seen so
    // far; blank/comment lines extend it so they travel with the imports.
    let mut header_end = 0usize;
    let mut saw_import = false;
    for (idx, line) in lines.iter().enumerate() {
        let trimmed = line.trim_start();
        if trimmed.is_empty() || trimmed.starts_with("//") {
            header_end = idx + 1;
            continue;
        }
        if !is_import_line(trimmed) {
            break;
        }
        saw_import = true;
        header_end = idx + 1;
    }

    if !saw_import {
        return (String::new(), code.to_string());
    }

    let (header_lines, body_lines) = lines.split_at(header_end);
    (header_lines.join("\n"), body_lines.join("\n"))
}
339
/// How the CLI run treats LLM calls with respect to fixtures.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocking or recording (the default).
    #[default]
    Off,
    /// Load LLM mocks from the JSONL fixture at `fixture_path` and
    /// install them before the run.
    Replay {
        fixture_path: PathBuf,
    },
    /// Record LLM calls during the run; the recordings are written to
    /// `fixture_path` by `persist_cli_llm_mock_recording`.
    Record {
        fixture_path: PathBuf,
    },
}
351
/// Attestation settings for a run. When a run is given `Some` of these,
/// `execute_run_inner` installs an in-memory event log and records a
/// `started` provenance event before the pipeline executes.
///
/// NOTE(review): the code that consumes the two fields is outside the
/// visible part of the file; the field descriptions are inferred from
/// their names and types — confirm against the consumer.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Presumably the path the attestation receipt is written to.
    pub receipt_out: Option<PathBuf>,
    /// Presumably the agent identity recorded in the receipt.
    pub agent_id: Option<String>,
}
357
/// Opt-in profiling switches. With `text` set, a categorical breakdown is
/// printed to stderr after execution; with `json_path` set, the same
/// rollup is serialized to that path. Setting either one turns on span
/// tracing (i.e. `harn_vm::tracing::set_tracing_enabled(true)`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunProfileOptions {
    /// Print the profile breakdown as text.
    pub text: bool,
    /// Write the profile rollup as JSON to this path.
    pub json_path: Option<PathBuf>,
}

impl RunProfileOptions {
    /// `true` when at least one profiling output was requested.
    pub fn is_enabled(&self) -> bool {
        match (self.text, &self.json_path) {
            (false, None) => false,
            (true, _) | (_, Some(_)) => true,
        }
    }
}
373
/// Shared handles used to ask a running VM to stop. Clones refer to the
/// same underlying flag and slot because both fields are `Arc`s.
#[derive(Clone)]
pub struct RunInterruptTokens {
    /// Set to `true` once an interrupt has been requested.
    pub cancel_token: Arc<AtomicBool>,
    /// Name of the signal that triggered the interrupt (e.g. `"SIGINT"`),
    /// or `None` while no signal has been seen.
    pub signal_token: Arc<Mutex<Option<String>>>,
}
379
380struct ExecuteRunInputs<'a> {
381    path: &'a str,
382    trace: bool,
383    denied_builtins: HashSet<String>,
384    script_argv: Vec<String>,
385    skill_dirs_raw: Vec<String>,
386    llm_mock_mode: CliLlmMockMode,
387    attestation: Option<RunAttestationOptions>,
388    profile: RunProfileOptions,
389    interrupt_tokens: Option<RunInterruptTokens>,
390    json: Option<(RunJsonOptions, Box<dyn io::Write + Send>)>,
391    timing: Option<&'a mut RunTiming>,
392}
393
/// What an in-process `execute_run` call produced. Tests inspect this
/// directly rather than spawning the `harn` binary; the binary entry
/// point turns it into real stdout/stderr writes and `process::exit`.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Captured standard output.
    pub stdout: String,
    /// Captured diagnostics / standard error.
    pub stderr: String,
    /// Exit code the binary would have used (`0` on success).
    pub exit_code: i32,
}
403
404pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
405    harn_vm::llm::clear_cli_llm_mock_mode();
406    match mode {
407        CliLlmMockMode::Off => Ok(()),
408        CliLlmMockMode::Replay { fixture_path } => {
409            let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
410            harn_vm::llm::install_cli_llm_mocks(mocks);
411            Ok(())
412        }
413        CliLlmMockMode::Record { .. } => {
414            harn_vm::llm::enable_cli_llm_mock_recording();
415            Ok(())
416        }
417    }
418}
419
420pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
421    let CliLlmMockMode::Record { fixture_path } = mode else {
422        return Ok(());
423    };
424    if let Some(parent) = fixture_path.parent() {
425        if !parent.as_os_str().is_empty() {
426            fs::create_dir_all(parent).map_err(|error| {
427                format!(
428                    "failed to create fixture directory {}: {error}",
429                    parent.display()
430                )
431            })?;
432        }
433    }
434
435    let lines = harn_vm::llm::take_cli_llm_recordings()
436        .into_iter()
437        .map(harn_vm::llm::serialize_llm_mock)
438        .collect::<Result<Vec<_>, _>>()?;
439    let body = if lines.is_empty() {
440        String::new()
441    } else {
442        format!("{}\n", lines.join("\n"))
443    };
444    fs::write(fixture_path, body)
445        .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
446}
447
448pub(crate) async fn run_file(
449    path: &str,
450    trace: bool,
451    denied_builtins: HashSet<String>,
452    script_argv: Vec<String>,
453    llm_mock_mode: CliLlmMockMode,
454    attestation: Option<RunAttestationOptions>,
455    profile: RunProfileOptions,
456) {
457    run_file_with_skill_dirs(
458        path,
459        trace,
460        denied_builtins,
461        script_argv,
462        Vec::new(),
463        llm_mock_mode,
464        attestation,
465        profile,
466        None,
467    )
468    .await;
469}
470
471pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
472    let outcome = execute_explain_cost(path);
473    if !outcome.stderr.is_empty() {
474        io::stderr().write_all(outcome.stderr.as_bytes()).ok();
475    }
476    if !outcome.stdout.is_empty() {
477        io::stdout().write_all(outcome.stdout.as_bytes()).ok();
478    }
479    if outcome.exit_code != 0 {
480        process::exit(outcome.exit_code);
481    }
482}
483
484#[allow(clippy::too_many_arguments)]
485pub(crate) async fn run_file_with_skill_dirs(
486    path: &str,
487    trace: bool,
488    denied_builtins: HashSet<String>,
489    script_argv: Vec<String>,
490    skill_dirs_raw: Vec<String>,
491    llm_mock_mode: CliLlmMockMode,
492    attestation: Option<RunAttestationOptions>,
493    profile: RunProfileOptions,
494    json: Option<RunJsonOptions>,
495) {
496    // Graceful shutdown: flush run records before exit on SIGINT/SIGTERM.
497    let interrupt_tokens = install_signal_shutdown_handler();
498
499    let _stdout_passthrough = StdoutPassthroughGuard::enable();
500    let json_with_stdout =
501        json.map(|opts| (opts, Box::new(io::stdout()) as Box<dyn io::Write + Send>));
502    let outcome = execute_run_inner(ExecuteRunInputs {
503        path,
504        trace,
505        denied_builtins,
506        script_argv,
507        skill_dirs_raw,
508        llm_mock_mode,
509        attestation,
510        profile,
511        interrupt_tokens: Some(interrupt_tokens.clone()),
512        json: json_with_stdout,
513        timing: None,
514    })
515    .await;
516
517    // `harn run` streams normal program stdout during execution. Any stdout
518    // left here came from older capture paths, so flush it after diagnostics.
519    if !outcome.stderr.is_empty() {
520        io::stderr().write_all(outcome.stderr.as_bytes()).ok();
521    }
522    if !outcome.stdout.is_empty() {
523        io::stdout().write_all(outcome.stdout.as_bytes()).ok();
524    }
525
526    let mut exit_code = outcome.exit_code;
527    if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
528        exit_code = 124;
529    }
530    if exit_code != 0 {
531        process::exit(exit_code);
532    }
533}
534
535pub fn execute_explain_cost(path: &str) -> RunOutcome {
536    let stdout = String::new();
537    let mut stderr = String::new();
538
539    let (source, program) = parse_source_file(path);
540
541    let mut had_type_error = false;
542    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
543    for diag in &type_diagnostics {
544        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
545        if matches!(diag.severity, DiagnosticSeverity::Error) {
546            had_type_error = true;
547        }
548        stderr.push_str(&rendered);
549    }
550    if had_type_error {
551        return RunOutcome {
552            stdout,
553            stderr,
554            exit_code: 1,
555        };
556    }
557
558    let extensions = package::load_runtime_extensions(Path::new(path));
559    package::install_runtime_extensions(&extensions);
560    RunOutcome {
561        stdout: explain_cost::render_explain_cost(path, &program),
562        stderr,
563        exit_code: 0,
564    }
565}
566
567pub(crate) struct StdoutPassthroughGuard {
568    previous: bool,
569}
570
571impl StdoutPassthroughGuard {
572    pub(crate) fn enable() -> Self {
573        Self {
574            previous: harn_vm::set_stdout_passthrough(true),
575        }
576    }
577}
578
579impl Drop for StdoutPassthroughGuard {
580    fn drop(&mut self) {
581        harn_vm::set_stdout_passthrough(self.previous);
582    }
583}
584
// Message shown to the operator on the first Ctrl-C / termination signal.
// It sets the expectation that a short pause is normal: the VM rewinds
// the active instruction, drops in-flight async ops (such as a hanging
// Ollama request), and unwinds frames before the runtime exits. Without
// that, operators tend to hit Ctrl-C a second time and force-kill the
// process. The "Ctrl-C again to force-exit" hint is load-bearing —
// earlier harn releases showed operators routinely double-tapping the
// shortcut and losing the chance to inspect the error trace.
const FIRST_SIGNAL_MESSAGE: &str =
    "[harn] signal received, interrupting VM (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
595
596fn install_signal_shutdown_handler() -> RunInterruptTokens {
597    let tokens = RunInterruptTokens {
598        cancel_token: Arc::new(AtomicBool::new(false)),
599        signal_token: Arc::new(Mutex::new(None)),
600    };
601    let tokens_clone = tokens.clone();
602    tokio::spawn(async move {
603        #[cfg(unix)]
604        {
605            use tokio::signal::unix::{signal, SignalKind};
606            let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
607            let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
608            let mut sighup = signal(SignalKind::hangup()).expect("SIGHUP handler");
609            let mut seen_signal = false;
610            loop {
611                let signal_name = tokio::select! {
612                    _ = sigterm.recv() => "SIGTERM",
613                    _ = sigint.recv() => "SIGINT",
614                    _ = sighup.recv() => "SIGHUP",
615                };
616                if seen_signal {
617                    eprintln!("[harn] second signal received, terminating");
618                    process::exit(124);
619                }
620                seen_signal = true;
621                request_vm_interrupt(&tokens_clone, signal_name);
622                eprintln!("{FIRST_SIGNAL_MESSAGE}");
623            }
624        }
625        #[cfg(not(unix))]
626        {
627            let mut seen_signal = false;
628            loop {
629                let _ = tokio::signal::ctrl_c().await;
630                if seen_signal {
631                    eprintln!("[harn] second signal received, terminating");
632                    process::exit(124);
633                }
634                seen_signal = true;
635                request_vm_interrupt(&tokens_clone, "SIGINT");
636                eprintln!("{FIRST_SIGNAL_MESSAGE}");
637            }
638        }
639    });
640    tokens
641}
642
643fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
644    if let Ok(mut signal) = tokens.signal_token.lock() {
645        *signal = Some(signal_name.to_string());
646    }
647    tokens.cancel_token.store(true, Ordering::SeqCst);
648}
649
650/// In-process equivalent of `run_file_with_skill_dirs`. Returns the captured
651/// stdout, stderr, and what exit code the binary entry would have used,
652/// instead of writing to real stdout/stderr or calling `process::exit`.
653///
654/// Tests should call this directly. The `harn run` binary path wraps it.
655pub async fn execute_run(
656    path: &str,
657    trace: bool,
658    denied_builtins: HashSet<String>,
659    script_argv: Vec<String>,
660    skill_dirs_raw: Vec<String>,
661    llm_mock_mode: CliLlmMockMode,
662    attestation: Option<RunAttestationOptions>,
663    profile: RunProfileOptions,
664) -> RunOutcome {
665    execute_run_inner(ExecuteRunInputs {
666        path,
667        trace,
668        denied_builtins,
669        script_argv,
670        skill_dirs_raw,
671        llm_mock_mode,
672        attestation,
673        profile,
674        interrupt_tokens: None,
675        json: None,
676        timing: None,
677    })
678    .await
679}
680
681/// `execute_run` variant for `--json` mode. Returns once the run is
682/// complete; the NDJSON event stream — including the terminal `result`
683/// or `error` event — has already been written to `out` and flushed.
684/// `out` must be `Send` because the run-event sink may be called from
685/// any worker thread the VM spawns.
686#[allow(clippy::too_many_arguments)]
687pub async fn execute_run_json(
688    path: &str,
689    trace: bool,
690    denied_builtins: HashSet<String>,
691    script_argv: Vec<String>,
692    skill_dirs_raw: Vec<String>,
693    llm_mock_mode: CliLlmMockMode,
694    attestation: Option<RunAttestationOptions>,
695    profile: RunProfileOptions,
696    out: Box<dyn io::Write + Send>,
697    options: RunJsonOptions,
698) -> RunOutcome {
699    execute_run_inner(ExecuteRunInputs {
700        path,
701        trace,
702        denied_builtins,
703        script_argv,
704        skill_dirs_raw,
705        llm_mock_mode,
706        attestation,
707        profile,
708        interrupt_tokens: None,
709        json: Some((options, out)),
710        timing: None,
711    })
712    .await
713}
714
715/// Run a `.harn` file with the default builtin/argv set and record
716/// phase timings into `timing`. Used by `harn time run` so the
717/// instrumented run shares the exact code path as plain `harn run`.
718pub(crate) async fn execute_run_with_timing(
719    path: &str,
720    script_argv: Vec<String>,
721    timing: Option<&mut RunTiming>,
722) -> RunOutcome {
723    execute_run_inner(ExecuteRunInputs {
724        path,
725        trace: false,
726        denied_builtins: HashSet::new(),
727        script_argv,
728        skill_dirs_raw: Vec::new(),
729        llm_mock_mode: CliLlmMockMode::Off,
730        attestation: None,
731        profile: RunProfileOptions::default(),
732        interrupt_tokens: None,
733        json: None,
734        timing,
735    })
736    .await
737}
738
739// See [`compile_or_load_chunk_with_timing`] for why `as_deref_mut` is
740// the intentional reborrow pattern here.
741#[allow(clippy::needless_option_as_deref)]
742async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
743    let ExecuteRunInputs {
744        path,
745        trace,
746        denied_builtins,
747        script_argv,
748        skill_dirs_raw,
749        llm_mock_mode,
750        attestation,
751        profile,
752        interrupt_tokens,
753        json,
754        mut timing,
755    } = inputs;
756
757    // `--json` installs an in-process sink that diverts every
758    // observable VM event (stdout, stderr, transcript, tool, hook,
759    // persona) into a single NDJSON stream on `out`. The sink stays
760    // active until we drop the guard below — fatal errors emit a
761    // terminal `error` event on the same stream before bailing.
762    let json_session = json.map(|(options, out)| JsonRunSession::install(options, out));
763
764    let mut stderr = String::new();
765    let mut stdout = String::new();
766
767    let Some(LoadedChunk { source, chunk }) =
768        compile_or_load_chunk_with_timing(path, &mut stderr, timing.as_deref_mut())
769    else {
770        if let Some(session) = json_session {
771            return session.finalize_error("compile_error", stderr, 1);
772        }
773        return RunOutcome {
774            stdout,
775            stderr,
776            exit_code: 1,
777        };
778    };
779
780    // Bracket the VM-setup phase explicitly. `run_setup` covers
781    // everything between the bytecode compile and the first VM
782    // instruction; `run_main` covers `vm.execute` proper.
783    let setup_start = Instant::now();
784
785    if trace {
786        harn_vm::llm::enable_tracing();
787    }
788    if profile.is_enabled() {
789        harn_vm::tracing::set_tracing_enabled(true);
790    }
791    if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
792        stderr.push_str(&format!("error: {error}\n"));
793        if let Some(session) = json_session {
794            return session.finalize_error("llm_mock_install", error, 1);
795        }
796        return RunOutcome {
797            stdout,
798            stderr,
799            exit_code: 1,
800        };
801    }
802
803    let mut vm = harn_vm::Vm::new();
804    if let Some(interrupt_tokens) = interrupt_tokens {
805        vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
806        vm.install_cancel_token(interrupt_tokens.cancel_token);
807    }
808    harn_vm::register_vm_stdlib(&mut vm);
809    crate::install_default_hostlib(&mut vm);
810    let source_parent = std::path::Path::new(path)
811        .parent()
812        .unwrap_or(std::path::Path::new("."));
813    // Metadata/store rooted at harn.toml when present; source dir otherwise.
814    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
815    let store_base = project_root.as_deref().unwrap_or(source_parent);
816    let attestation_started_at_ms = now_ms();
817    let attestation_log = if attestation.is_some() {
818        Some(harn_vm::event_log::install_memory_for_current_thread(256))
819    } else {
820        None
821    };
822    if let Some(log) = attestation_log.as_ref() {
823        append_run_provenance_event(
824            log,
825            "started",
826            serde_json::json!({
827                "pipeline": path,
828                "argv": &script_argv,
829                "project_root": store_base.display().to_string(),
830            }),
831        )
832        .await;
833    }
834    harn_vm::register_store_builtins(&mut vm, store_base);
835    harn_vm::register_metadata_builtins(&mut vm, store_base);
836    let pipeline_name = std::path::Path::new(path)
837        .file_stem()
838        .and_then(|s| s.to_str())
839        .unwrap_or("default");
840    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
841    vm.set_source_info(path, &source);
842    if !denied_builtins.is_empty() {
843        vm.set_denied_builtins(denied_builtins);
844    }
845    if let Some(ref root) = project_root {
846        vm.set_project_root(root);
847    }
848
849    if let Some(p) = std::path::Path::new(path).parent() {
850        if !p.as_os_str().is_empty() {
851            vm.set_source_dir(p);
852        }
853    }
854
855    // Load filesystem + manifest skills before the pipeline runs so
856    // `skills` is populated with a pre-discovered registry (see #73).
857    let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
858    let loaded = load_skills(&SkillLoaderInputs {
859        cli_dirs,
860        source_path: Some(std::path::PathBuf::from(path)),
861    });
862    emit_loader_warnings(&loaded.loader_warnings);
863    install_skills_global(&mut vm, &loaded);
864
865    // `harn run script.harn -- a b c` yields `argv == ["a", "b", "c"]`.
866    // Always set so scripts can rely on `len(argv)`.
867    let argv_values: Vec<harn_vm::VmValue> = script_argv
868        .iter()
869        .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
870        .collect();
871    vm.set_global(
872        "argv",
873        harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
874    );
875
876    // Install the script's `Harness` capability handle so the auto-call
877    // emitted by `Compiler::compile()` for `fn main(harness: Harness)`
878    // entrypoints can read it.
879    vm.set_harness(harn_vm::Harness::real());
880
881    let extensions = package::load_runtime_extensions(Path::new(path));
882    package::install_runtime_extensions(&extensions);
883    if let Some(manifest) = extensions.root_manifest.as_ref() {
884        if !manifest.mcp.is_empty() {
885            connect_mcp_servers(&manifest.mcp, &mut vm).await;
886        }
887    }
888    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
889        stderr.push_str(&format!(
890            "error: failed to install manifest triggers: {error}\n"
891        ));
892        if let Some(session) = json_session {
893            return session.finalize_error("manifest_triggers", error.to_string(), 1);
894        }
895        return RunOutcome {
896            stdout,
897            stderr,
898            exit_code: 1,
899        };
900    }
901    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
902        stderr.push_str(&format!(
903            "error: failed to install manifest hooks: {error}\n"
904        ));
905        if let Some(session) = json_session {
906            return session.finalize_error("manifest_hooks", error.to_string(), 1);
907        }
908        return RunOutcome {
909            stdout,
910            stderr,
911            exit_code: 1,
912        };
913    }
914
915    // Run inside a LocalSet so spawn_local works for concurrency builtins.
916    let local = tokio::task::LocalSet::new();
917    if let Some(t) = timing.as_deref_mut() {
918        t.run_setup = setup_start.elapsed();
919    }
920    let main_start = Instant::now();
921    let execution = local
922        .run_until(async {
923            match vm.execute(&chunk).await {
924                Ok(value) => Ok((vm.output(), value)),
925                Err(e) => Err(vm.format_runtime_error(&e)),
926            }
927        })
928        .await;
929    if let Some(t) = timing.as_deref_mut() {
930        t.run_main = main_start.elapsed();
931    }
932    if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
933        stderr.push_str(&format!("error: {error}\n"));
934        if let Some(session) = json_session {
935            return session.finalize_error("llm_mock_record", error, 1);
936        }
937        return RunOutcome {
938            stdout,
939            stderr,
940            exit_code: 1,
941        };
942    }
943
944    // Always drain any captured stderr accumulated during execution.
945    let buffered_stderr = harn_vm::take_stderr_buffer();
946    stderr.push_str(&buffered_stderr);
947
948    let exit_code = match &execution {
949        Ok((_, return_value)) => exit_code_from_return_value(return_value),
950        Err(_) => 1,
951    };
952
953    if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
954        if let Err(error) = emit_run_attestation(
955            log,
956            path,
957            store_base,
958            attestation_started_at_ms,
959            exit_code,
960            options,
961            &mut stderr,
962        )
963        .await
964        {
965            stderr.push_str(&format!(
966                "error: failed to emit provenance receipt: {error}\n"
967            ));
968            if let Some(session) = json_session {
969                return session.finalize_error("attestation", error.to_string(), 1);
970            }
971            return RunOutcome {
972                stdout,
973                stderr,
974                exit_code: 1,
975            };
976        }
977        harn_vm::event_log::reset_active_event_log();
978    }
979
980    match execution {
981        Ok((output, return_value)) => {
982            stdout.push_str(output);
983            if trace {
984                stderr.push_str(&render_trace_summary());
985            }
986            if profile.is_enabled() {
987                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
988                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
989                }
990            }
991            if exit_code != 0 {
992                stderr.push_str(&render_return_value_error(&return_value));
993            }
994            if let Some(session) = json_session {
995                let value = harn_vm::llm::vm_value_to_json(&return_value);
996                return session.finalize_result(value, exit_code);
997            }
998            RunOutcome {
999                stdout,
1000                stderr,
1001                exit_code,
1002            }
1003        }
1004        Err(rendered_error) => {
1005            stderr.push_str(&rendered_error);
1006            if profile.is_enabled() {
1007                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
1008                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1009                }
1010            }
1011            if let Some(session) = json_session {
1012                return session.finalize_error("runtime", rendered_error, 1);
1013            }
1014            RunOutcome {
1015                stdout,
1016                stderr,
1017                exit_code: 1,
1018            }
1019        }
1020    }
1021}
1022
1023fn render_and_persist_profile(
1024    options: &RunProfileOptions,
1025    stderr: &mut String,
1026) -> Result<(), String> {
1027    let spans = harn_vm::tracing::peek_spans();
1028    let profile = harn_vm::profile::build(&spans);
1029    if options.text {
1030        stderr.push_str(&harn_vm::profile::render(&profile));
1031    }
1032    if let Some(path) = options.json_path.as_ref() {
1033        if let Some(parent) = path.parent() {
1034            if !parent.as_os_str().is_empty() {
1035                fs::create_dir_all(parent)
1036                    .map_err(|error| format!("create {}: {error}", parent.display()))?;
1037            }
1038        }
1039        let json = serde_json::to_string_pretty(&profile)
1040            .map_err(|error| format!("serialize profile: {error}"))?;
1041        fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
1042    }
1043    Ok(())
1044}
1045
1046async fn append_run_provenance_event(
1047    log: &Arc<harn_vm::event_log::AnyEventLog>,
1048    kind: &str,
1049    payload: serde_json::Value,
1050) {
1051    let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
1052        return;
1053    };
1054    let _ = log
1055        .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
1056        .await;
1057}
1058
1059async fn emit_run_attestation(
1060    log: &Arc<harn_vm::event_log::AnyEventLog>,
1061    path: &str,
1062    store_base: &Path,
1063    started_at_ms: i64,
1064    exit_code: i32,
1065    options: &RunAttestationOptions,
1066    stderr: &mut String,
1067) -> Result<(), String> {
1068    let finished_at_ms = now_ms();
1069    let status = if exit_code == 0 { "success" } else { "failure" };
1070    append_run_provenance_event(
1071        log,
1072        "finished",
1073        serde_json::json!({
1074            "pipeline": path,
1075            "status": status,
1076            "exit_code": exit_code,
1077        }),
1078    )
1079    .await;
1080    log.flush()
1081        .await
1082        .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
1083    let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
1084        .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
1085    let (signing_key, key_id) =
1086        harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
1087            .await
1088            .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
1089    let receipt = harn_vm::build_signed_receipt(
1090        log,
1091        harn_vm::ReceiptBuildOptions {
1092            pipeline: path.to_string(),
1093            status: status.to_string(),
1094            started_at_ms,
1095            finished_at_ms,
1096            exit_code,
1097            producer_name: "harn-cli".to_string(),
1098            producer_version: env!("CARGO_PKG_VERSION").to_string(),
1099        },
1100        &signing_key,
1101        key_id,
1102    )
1103    .await
1104    .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
1105    let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
1106    if let Some(parent) = receipt_path.parent() {
1107        fs::create_dir_all(parent)
1108            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
1109    }
1110    let encoded = serde_json::to_vec_pretty(&receipt)
1111        .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
1112    fs::write(&receipt_path, encoded)
1113        .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
1114    stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
1115    Ok(())
1116}
1117
1118fn receipt_output_path(
1119    store_base: &Path,
1120    options: &RunAttestationOptions,
1121    receipt_id: &str,
1122) -> PathBuf {
1123    if let Some(path) = options.receipt_out.as_ref() {
1124        return path.clone();
1125    }
1126    harn_vm::runtime_paths::state_root(store_base)
1127        .join("receipts")
1128        .join(format!("{receipt_id}.json"))
1129}
1130
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
1137
1138/// Map a script's top-level return value to a process exit code.
1139///
1140/// - `int n`             → exit n (clamped to 0..=255)
1141/// - `Result::Ok(_)`     → exit 0
1142/// - `Result::Err(_)`    → exit 1
1143/// - anything else       → exit 0
1144fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
1145    use harn_vm::VmValue;
1146    match value {
1147        VmValue::Int(n) => (*n).clamp(0, 255) as i32,
1148        VmValue::EnumVariant {
1149            enum_name,
1150            variant,
1151            fields,
1152        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => 1,
1153        _ => 0,
1154    }
1155}
1156
/// State for a single `harn run --json` invocation. Installs the
/// run-event sink in [`Self::install`] and removes it in [`Drop`], so
/// every exit path through `execute_run_inner` cleans up correctly
/// even if a panic unwinds out of the VM. Save-and-restore of any
/// previously installed sink keeps the helper safe to nest (rare, but
/// in-process embeddings can call into `harn run` from a host that
/// already had a sink wired).
///
/// `finalize_result` / `finalize_error` emit the terminal event and
/// build a [`RunOutcome`] whose stdout/stderr captured-buffer fields
/// stay **empty** — the canonical stream is on `out`.
/// `outcome.exit_code` still carries the process exit code so the
/// binary entry can `process::exit(...)`.
struct JsonRunSession {
    /// NDJSON emitter built over the `out` writer handed to
    /// [`Self::install`]; used to emit the terminal result/error event.
    emitter: self::json_events::NdjsonEmitter,
    /// Sink returned by `install_sink` when this session installed its own;
    /// `None` if no sink was returned. Re-installed (or cleared) on drop.
    prior_sink: Option<Arc<dyn harn_vm::run_events::RunEventSink>>,
}
1174
1175impl JsonRunSession {
1176    fn install(options: RunJsonOptions, out: Box<dyn io::Write + Send>) -> Self {
1177        let emitter = NdjsonEmitter::new(out, options.quiet);
1178        let prior_sink = harn_vm::run_events::install_sink(emitter.sink());
1179        Self {
1180            emitter,
1181            prior_sink,
1182        }
1183    }
1184
1185    fn finalize_result(self, value: serde_json::Value, exit_code: i32) -> RunOutcome {
1186        self.emitter.emit_result(value, exit_code);
1187        RunOutcome {
1188            stdout: String::new(),
1189            stderr: String::new(),
1190            exit_code,
1191        }
1192    }
1193
1194    fn finalize_error(
1195        self,
1196        code: impl Into<String>,
1197        message: impl Into<String>,
1198        exit_code: i32,
1199    ) -> RunOutcome {
1200        self.emitter.emit_error(code, message);
1201        RunOutcome {
1202            stdout: String::new(),
1203            stderr: String::new(),
1204            exit_code,
1205        }
1206    }
1207}
1208
1209impl Drop for JsonRunSession {
1210    fn drop(&mut self) {
1211        match self.prior_sink.take() {
1212            Some(prior) => {
1213                harn_vm::run_events::install_sink(prior);
1214            }
1215            None => harn_vm::run_events::clear_sink(),
1216        }
1217    }
1218}
1219
1220fn render_return_value_error(value: &harn_vm::VmValue) -> String {
1221    let harn_vm::VmValue::EnumVariant {
1222        enum_name,
1223        variant,
1224        fields,
1225    } = value
1226    else {
1227        return String::new();
1228    };
1229    if enum_name.as_ref() != "Result" || variant.as_ref() != "Err" {
1230        return String::new();
1231    }
1232    let rendered = fields.first().map(|p| p.display()).unwrap_or_default();
1233    if rendered.is_empty() {
1234        "error\n".to_string()
1235    } else if rendered.ends_with('\n') {
1236        rendered
1237    } else {
1238        format!("{rendered}\n")
1239    }
1240}
1241
1242/// Connect to MCP servers declared in `harn.toml` and register them as
1243/// `mcp.<name>` globals on the VM. Connection failures are warned but do
1244/// not abort execution.
1245///
1246/// Servers with `lazy = true` are registered with the VM-side MCP
1247/// registry but NOT booted — their processes start the first time a
1248/// skill's `requires_mcp` list names them or user code calls
1249/// `mcp_ensure_active("name")` / `mcp_call(mcp.<name>, ...)`.
1250pub(crate) async fn connect_mcp_servers(
1251    servers: &[package::McpServerConfig],
1252    vm: &mut harn_vm::Vm,
1253) {
1254    use std::collections::BTreeMap;
1255    use std::rc::Rc;
1256    use std::time::Duration;
1257
1258    let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
1259    let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
1260
1261    for server in servers {
1262        let resolved_auth = match mcp::resolve_auth_for_server(server).await {
1263            Ok(resolution) => resolution,
1264            Err(error) => {
1265                eprintln!(
1266                    "warning: mcp: failed to load auth for '{}': {}",
1267                    server.name, error
1268                );
1269                AuthResolution::None
1270            }
1271        };
1272        let spec = serde_json::json!({
1273            "name": server.name,
1274            "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
1275            "command": server.command,
1276            "args": server.args,
1277            "env": server.env,
1278            "url": server.url,
1279            "auth_token": match resolved_auth {
1280                AuthResolution::Bearer(token) => Some(token),
1281                AuthResolution::None => server.auth_token.clone(),
1282            },
1283            "protocol_version": server.protocol_version,
1284            "proxy_server_name": server.proxy_server_name,
1285        });
1286
1287        // Register with the VM-side registry regardless of lazy flag —
1288        // skill activation and `mcp_ensure_active` look up specs there.
1289        registrations.push(harn_vm::RegisteredMcpServer {
1290            name: server.name.clone(),
1291            spec: spec.clone(),
1292            lazy: server.lazy,
1293            card: server.card.clone(),
1294            keep_alive: server.keep_alive_ms.map(Duration::from_millis),
1295        });
1296
1297        if server.lazy {
1298            eprintln!(
1299                "[harn] mcp: deferred '{}' (lazy, boots on first use)",
1300                server.name
1301            );
1302            continue;
1303        }
1304
1305        match harn_vm::connect_mcp_server_from_json(&spec).await {
1306            Ok(handle) => {
1307                eprintln!("[harn] mcp: connected to '{}'", server.name);
1308                harn_vm::mcp_install_active(&server.name, handle.clone());
1309                mcp_dict.insert(server.name.clone(), harn_vm::VmValue::McpClient(handle));
1310            }
1311            Err(e) => {
1312                eprintln!(
1313                    "warning: mcp: failed to connect to '{}': {}",
1314                    server.name, e
1315                );
1316            }
1317        }
1318    }
1319
1320    // Install registrations AFTER eager connects so `install_active`
1321    // above doesn't get overwritten.
1322    harn_vm::mcp_register_servers(registrations);
1323
1324    if !mcp_dict.is_empty() {
1325        vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
1326    }
1327}
1328
1329pub(crate) fn render_trace_summary() -> String {
1330    use std::fmt::Write;
1331    let entries = harn_vm::llm::take_trace();
1332    if entries.is_empty() {
1333        return String::new();
1334    }
1335    let mut out = String::new();
1336    let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
1337    let mut total_input = 0i64;
1338    let mut total_output = 0i64;
1339    let mut total_ms = 0u64;
1340    for (i, entry) in entries.iter().enumerate() {
1341        let _ = writeln!(
1342            out,
1343            "  #{}: {} | {} in + {} out tokens | {} ms",
1344            i + 1,
1345            entry.model,
1346            entry.input_tokens,
1347            entry.output_tokens,
1348            entry.duration_ms,
1349        );
1350        total_input += entry.input_tokens;
1351        total_output += entry.output_tokens;
1352        total_ms += entry.duration_ms;
1353    }
1354    let total_tokens = total_input + total_output;
1355    // Rough cost estimate using Sonnet 4 pricing ($3/MTok in, $15/MTok out).
1356    let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
1357    let _ = writeln!(
1358        out,
1359        "  \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
1360        entries.len(),
1361        if entries.len() == 1 { "" } else { "s" },
1362        total_tokens,
1363        total_input,
1364        total_output,
1365        total_ms,
1366        cost,
1367    );
1368    out
1369}
1370
/// Run a .harn file as an MCP server using the script-driven surface.
/// The pipeline must call `mcp_tools(registry)` (or the alias
/// `mcp_serve(registry)`) so the CLI can expose its tools, and may
/// register additional resources/prompts via `mcp_resource(...)` /
/// `mcp_resource_template(...)` / `mcp_prompt(...)`.
///
/// Dispatched into by `harn serve mcp <file>` when the script does not
/// define any `pub fn` exports — see `commands::serve::run_mcp_server`.
///
/// `card_source` — optional `--card` argument. Accepts either a path to
/// a JSON file or an inline JSON string. When present, the card is
/// embedded in the `initialize` response and exposed as the
/// `well-known://mcp-card` resource.
///
/// `mode` — selects the transport: `Stdio` runs the server directly
/// against the VM, `Http` hands the server and VM to
/// `commands::serve::run_script_mcp_http_server` with the given options and
/// auth policy.
///
/// Every failure path (compile/load failure, manifest trigger or hook
/// installation, runtime error, missing registry, tool conversion, invalid
/// `--card`, server error) prints to stderr and exits the process with
/// status 1.
pub(crate) async fn run_file_mcp_serve(
    path: &str,
    card_source: Option<&str>,
    mode: RunFileMcpServeMode,
) {
    // Compile (or load) the script; diagnostics are printed even on success.
    let mut diagnostics = String::new();
    let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut diagnostics)
    else {
        eprint!("{diagnostics}");
        process::exit(1);
    };
    if !diagnostics.is_empty() {
        eprint!("{diagnostics}");
    }

    // VM setup: stdlib, default hostlib, then store/metadata/checkpoint
    // builtins rooted at the project root (or the script's directory when
    // no project root is found).
    let mut vm = harn_vm::Vm::new();
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    let source_parent = std::path::Path::new(path)
        .parent()
        .unwrap_or(std::path::Path::new("."));
    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
    let store_base = project_root.as_deref().unwrap_or(source_parent);
    harn_vm::register_store_builtins(&mut vm, store_base);
    harn_vm::register_metadata_builtins(&mut vm, store_base);
    // The checkpoint builtins are keyed by the script's file stem.
    let pipeline_name = std::path::Path::new(path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("default");
    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
    vm.set_source_info(path, &source);
    if let Some(ref root) = project_root {
        vm.set_project_root(root);
    }
    // A bare file name has an empty parent; skip setting the source dir then.
    if let Some(p) = std::path::Path::new(path).parent() {
        if !p.as_os_str().is_empty() {
            vm.set_source_dir(p);
        }
    }

    // Same skill discovery as `harn run` — see comment there.
    let loaded = load_skills(&SkillLoaderInputs {
        cli_dirs: Vec::new(),
        source_path: Some(std::path::PathBuf::from(path)),
    });
    emit_loader_warnings(&loaded.loader_warnings);
    install_skills_global(&mut vm, &loaded);

    // Runtime extensions: manifest-declared MCP servers, triggers and hooks.
    let extensions = package::load_runtime_extensions(Path::new(path));
    package::install_runtime_extensions(&extensions);
    if let Some(manifest) = extensions.root_manifest.as_ref() {
        if !manifest.mcp.is_empty() {
            connect_mcp_servers(&manifest.mcp, &mut vm).await;
        }
    }
    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
        eprintln!("error: failed to install manifest triggers: {error}");
        process::exit(1);
    }
    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
        eprintln!("error: failed to install manifest hooks: {error}");
        process::exit(1);
    }

    // Execute the pipeline and then serve, all inside one LocalSet.
    let local = tokio::task::LocalSet::new();
    local
        .run_until(async {
            // Run the script to completion first; it is expected to
            // register its MCP surface as a side effect.
            match vm.execute(&chunk).await {
                Ok(_) => {}
                Err(e) => {
                    eprint!("{}", vm.format_runtime_error(&e));
                    process::exit(1);
                }
            }

            // Pipeline output goes to stderr — stdout is the MCP transport.
            let output = vm.output();
            if !output.is_empty() {
                eprint!("{output}");
            }

            // The registry is required; the other surfaces are taken as-is.
            let registry = match harn_vm::take_mcp_serve_registry() {
                Some(r) => r,
                None => {
                    eprintln!("error: pipeline did not call mcp_serve(registry)");
                    eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
                    process::exit(1);
                }
            };

            let tools = match harn_vm::tool_registry_to_mcp_tools(&registry) {
                Ok(t) => t,
                Err(e) => {
                    eprintln!("error: {e}");
                    process::exit(1);
                }
            };

            let resources = harn_vm::take_mcp_serve_resources();
            let resource_templates = harn_vm::take_mcp_serve_resource_templates();
            let prompts = harn_vm::take_mcp_serve_prompts();

            // The server is named after the script's file stem.
            let server_name = std::path::Path::new(path)
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("harn")
                .to_string();

            // Build the human-readable capability list for the startup
            // banner; resources and resource templates are counted together.
            let mut caps = Vec::new();
            if !tools.is_empty() {
                caps.push(format!(
                    "{} tool{}",
                    tools.len(),
                    if tools.len() == 1 { "" } else { "s" }
                ));
            }
            let total_resources = resources.len() + resource_templates.len();
            if total_resources > 0 {
                caps.push(format!(
                    "{total_resources} resource{}",
                    if total_resources == 1 { "" } else { "s" }
                ));
            }
            if !prompts.is_empty() {
                caps.push(format!(
                    "{} prompt{}",
                    prompts.len(),
                    if prompts.len() == 1 { "" } else { "s" }
                ));
            }
            eprintln!(
                "[harn] serve mcp: serving {} as '{server_name}'",
                caps.join(", ")
            );

            let mut server =
                harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
            // Attach the optional server card; an unparsable card is fatal.
            if let Some(source) = card_source {
                match resolve_card_source(source) {
                    Ok(card) => server = server.with_server_card(card),
                    Err(e) => {
                        eprintln!("error: --card: {e}");
                        process::exit(1);
                    }
                }
            }
            // Stdio borrows the VM; HTTP takes ownership of both the server
            // and the VM.
            match mode {
                RunFileMcpServeMode::Stdio => {
                    if let Err(e) = server.run(&mut vm).await {
                        eprintln!("error: MCP server error: {e}");
                        process::exit(1);
                    }
                }
                RunFileMcpServeMode::Http {
                    options,
                    auth_policy,
                } => {
                    if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
                        server,
                        vm,
                        options,
                        auth_policy,
                    )
                    .await
                    {
                        eprintln!("error: MCP server error: {e}");
                        process::exit(1);
                    }
                }
            }
        })
        .await;
}
1557
1558/// Accept either a path to a JSON file or an inline JSON blob and
1559/// return the parsed `serde_json::Value`. Used by `--card`. Disambiguates
1560/// by peeking at the first non-whitespace character: `{` → inline JSON,
1561/// anything else → path.
1562pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
1563    let trimmed = source.trim_start();
1564    if trimmed.starts_with('{') || trimmed.starts_with('[') {
1565        return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
1566    }
1567    let path = std::path::Path::new(source);
1568    harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
1569}
1570
1571pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
1572    use notify::{Event, EventKind, RecursiveMode, Watcher};
1573
1574    let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
1575        eprintln!("Error: {e}");
1576        process::exit(1);
1577    });
1578    let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
1579
1580    eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
1581    run_file(
1582        path,
1583        false,
1584        denied_builtins.clone(),
1585        Vec::new(),
1586        CliLlmMockMode::Off,
1587        None,
1588        RunProfileOptions::default(),
1589    )
1590    .await;
1591
1592    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1593    let _watcher = {
1594        let tx = tx.clone();
1595        let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
1596            if let Ok(event) = res {
1597                if matches!(
1598                    event.kind,
1599                    EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
1600                ) {
1601                    let has_harn = event
1602                        .paths
1603                        .iter()
1604                        .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
1605                    if has_harn {
1606                        let _ = tx.blocking_send(());
1607                    }
1608                }
1609            }
1610        })
1611        .unwrap_or_else(|e| {
1612            eprintln!("Error setting up file watcher: {e}");
1613            process::exit(1);
1614        });
1615        watcher
1616            .watch(watch_dir, RecursiveMode::Recursive)
1617            .unwrap_or_else(|e| {
1618                eprintln!("Error watching directory: {e}");
1619                process::exit(1);
1620            });
1621        watcher // keep alive
1622    };
1623
1624    eprintln!(
1625        "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
1626        watch_dir.display()
1627    );
1628
1629    loop {
1630        rx.recv().await;
1631        // Debounce: let bursts of events settle for 200ms before re-running.
1632        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1633        while rx.try_recv().is_ok() {}
1634
1635        eprintln!();
1636        eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
1637        run_file(
1638            path,
1639            false,
1640            denied_builtins.clone(),
1641            Vec::new(),
1642            CliLlmMockMode::Off,
1643            None,
1644            RunProfileOptions::default(),
1645        )
1646        .await;
1647    }
1648}
1649
1650#[cfg(test)]
1651mod tests {
1652    use super::{
1653        execute_explain_cost, execute_run, split_eval_header, CliLlmMockMode, RunProfileOptions,
1654        StdoutPassthroughGuard,
1655    };
1656    use std::collections::HashSet;
1657
1658    #[test]
1659    fn split_eval_header_no_imports_returns_full_body() {
1660        let (header, body) = split_eval_header("println(1 + 2)");
1661        assert_eq!(header, "");
1662        assert_eq!(body, "println(1 + 2)");
1663    }
1664
1665    #[test]
1666    fn split_eval_header_lifts_leading_imports() {
1667        let code = "import \"./lib\"\nimport { x } from \"std/math\"\nprintln(x)";
1668        let (header, body) = split_eval_header(code);
1669        assert_eq!(header, "import \"./lib\"\nimport { x } from \"std/math\"");
1670        assert_eq!(body, "println(x)");
1671    }
1672
1673    #[test]
1674    fn split_eval_header_keeps_pub_import_and_comments_in_header() {
1675        let code = "// header comment\npub import { y } from \"./lib\"\n\nfoo()";
1676        let (header, body) = split_eval_header(code);
1677        assert_eq!(
1678            header,
1679            "// header comment\npub import { y } from \"./lib\"\n"
1680        );
1681        assert_eq!(body, "foo()");
1682    }
1683
1684    #[test]
1685    fn split_eval_header_does_not_lift_imports_after_other_statements() {
1686        let code = "let a = 1\nimport \"./lib\"";
1687        let (header, body) = split_eval_header(code);
1688        assert_eq!(header, "");
1689        assert_eq!(body, "let a = 1\nimport \"./lib\"");
1690    }
1691
1692    #[test]
1693    fn cli_llm_mock_roundtrips_logprobs() {
1694        let mock = harn_vm::llm::parse_llm_mock_value(&serde_json::json!({
1695            "text": "visible",
1696            "logprobs": [{"token": "visible", "logprob": 0.0}]
1697        }))
1698        .expect("parse mock");
1699        assert_eq!(mock.logprobs.len(), 1);
1700
1701        let line = harn_vm::llm::serialize_llm_mock(mock).expect("serialize mock");
1702        let value: serde_json::Value = serde_json::from_str(&line).expect("json line");
1703        assert_eq!(value["logprobs"][0]["token"].as_str(), Some("visible"));
1704
1705        let reparsed = harn_vm::llm::parse_llm_mock_value(&value).expect("reparse mock");
1706        assert_eq!(reparsed.logprobs.len(), 1);
1707        assert_eq!(reparsed.logprobs[0]["logprob"].as_f64(), Some(0.0));
1708    }
1709
1710    #[test]
1711    fn stdout_passthrough_guard_restores_previous_state() {
1712        let original = harn_vm::set_stdout_passthrough(false);
1713        {
1714            let _guard = StdoutPassthroughGuard::enable();
1715            assert!(harn_vm::set_stdout_passthrough(true));
1716        }
1717        assert!(!harn_vm::set_stdout_passthrough(original));
1718    }
1719
1720    #[test]
1721    fn execute_explain_cost_does_not_execute_script() {
1722        let temp = tempfile::TempDir::new().expect("temp dir");
1723        let script = temp.path().join("main.harn");
1724        std::fs::write(
1725            &script,
1726            r#"
1727pipeline main() {
1728  write_file("executed.txt", "bad")
1729  llm_call("hello", nil, {provider: "mock", model: "mock"})
1730}
1731"#,
1732        )
1733        .expect("write script");
1734
1735        let outcome = execute_explain_cost(&script.to_string_lossy());
1736
1737        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
1738        assert!(outcome.stdout.contains("LLM cost estimate"));
1739        assert!(
1740            !temp.path().join("executed.txt").exists(),
1741            "--explain-cost must not execute pipeline side effects"
1742        );
1743    }
1744
1745    #[cfg(feature = "hostlib")]
1746    #[tokio::test]
1747    async fn execute_run_installs_hostlib_gate() {
1748        let temp = tempfile::NamedTempFile::new().expect("temp file");
1749        std::fs::write(
1750            temp.path(),
1751            r#"
1752pipeline main() {
1753  let _ = hostlib_enable("tools:deterministic")
1754  println("enabled")
1755}
1756"#,
1757        )
1758        .expect("write script");
1759
1760        let outcome = execute_run(
1761            &temp.path().to_string_lossy(),
1762            false,
1763            HashSet::new(),
1764            Vec::new(),
1765            Vec::new(),
1766            CliLlmMockMode::Off,
1767            None,
1768            RunProfileOptions::default(),
1769        )
1770        .await;
1771
1772        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
1773        assert_eq!(outcome.stdout.trim(), "enabled");
1774    }
1775
1776    #[cfg(all(feature = "hostlib", unix))]
1777    #[tokio::test]
1778    async fn execute_run_can_read_hostlib_command_artifacts() {
1779        let temp = tempfile::NamedTempFile::new().expect("temp file");
1780        std::fs::write(
1781            temp.path(),
1782            r#"
1783pipeline main() {
1784  let _ = hostlib_enable("tools:deterministic")
1785  let result = hostlib_tools_run_command({
1786    argv: ["sh", "-c", "i=0; while [ $i -lt 2000 ]; do printf x; i=$((i+1)); done"],
1787    capture: {max_inline_bytes: 8},
1788    timeout_ms: 5000,
1789  })
1790  println(starts_with(result.command_id, "cmd_"))
1791  println(len(result.stdout))
1792  println(result.byte_count)
1793  let window = hostlib_tools_read_command_output({
1794    command_id: result.command_id,
1795    offset: 1990,
1796    length: 20,
1797  })
1798  println(len(window.content))
1799  println(window.eof)
1800}
1801"#,
1802        )
1803        .expect("write script");
1804
1805        let outcome = execute_run(
1806            &temp.path().to_string_lossy(),
1807            false,
1808            HashSet::new(),
1809            Vec::new(),
1810            Vec::new(),
1811            CliLlmMockMode::Off,
1812            None,
1813            RunProfileOptions::default(),
1814        )
1815        .await;
1816
1817        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
1818        assert_eq!(outcome.stdout.trim(), "true\n8\n2000\n10\ntrue");
1819    }
1820}