1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use harn_parser::DiagnosticSeverity;
11use harn_vm::event_log::EventLog;
12use serde::Serialize;
13
14use crate::commands::mcp::{self, AuthResolution};
15use crate::commands::time::{self, PhaseRecord, RunTiming};
16use crate::package;
17use crate::parse_source_file;
18use crate::skill_loader::{
19 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
20 SkillLoaderInputs,
21};
22
23mod explain_cost;
24pub mod harnpack;
25pub mod json_events;
26
27use self::harnpack::{HarnpackError, HarnpackRunOptions, PreparedHarnpack};
28use self::json_events::NdjsonEmitter;
29
30#[derive(Clone, Default)]
32pub struct RunJsonOptions {
33 pub quiet: bool,
36}
37
38#[derive(Clone, Debug)]
40pub struct RunSummaryOptions {
41 pub sink: RunJsonSink,
42}
43
44#[derive(Clone, Debug)]
45pub struct RunPhaseOptions {
46 pub sink: RunJsonSink,
47}
48
49#[derive(Clone, Debug)]
50pub struct RunRusageOptions {
51 pub sink: RunJsonSink,
52}
53
54#[derive(Clone, Debug, Default)]
55pub struct RunAuxOptions {
56 pub summary: Option<RunSummaryOptions>,
57 pub phase: Option<RunPhaseOptions>,
58 pub rusage: Option<RunRusageOptions>,
59}
60
61#[derive(Clone, Debug)]
62pub struct RunJsonSink {
63 pub target: RunJsonSinkTarget,
64 pub fd_flag: &'static str,
65}
66
67#[derive(Clone, Debug)]
68pub enum RunJsonSinkTarget {
69 Stderr,
73 File(PathBuf),
74 Fd(i32),
75}
76
77#[derive(Serialize)]
78struct RunSummary<'a> {
79 schema_version: u32,
80 event: &'static str,
81 wall_time_ms: u64,
82 exit_code: i32,
83 llm: RunSummaryLlm,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 profile: Option<&'a harn_vm::profile::RunProfile>,
86}
87
88#[derive(Serialize)]
89struct RunSummaryLlm {
90 call_count: i64,
91 input_tokens: i64,
92 output_tokens: i64,
93 time_ms: i64,
94 cost_usd: f64,
95}
96
97pub const RUN_SUMMARY_SCHEMA_VERSION: u32 = 1;
98pub const RUN_PHASE_SCHEMA_VERSION: u32 = 1;
99pub const RUN_RUSAGE_SCHEMA_VERSION: u32 = 1;
100
101#[derive(Serialize)]
102struct RunPhaseEvent {
103 schema_version: u32,
104 event: &'static str,
105 phases: Vec<PhaseRecord>,
106}
107
108#[derive(Serialize)]
109struct RunRusageEvent {
110 schema_version: u32,
111 event: &'static str,
112 cpu_ms: u64,
113}
114
115pub(crate) fn run_summary_options_from_args(
116 args: &crate::cli::RunArgs,
117) -> Option<RunSummaryOptions> {
118 args.emit_summary_json.then(|| RunSummaryOptions {
119 sink: build_run_json_sink(args.summary_file.clone(), args.summary_fd, "--summary-fd"),
120 })
121}
122
123pub(crate) fn run_aux_options_from_args(args: &crate::cli::RunArgs) -> RunAuxOptions {
124 RunAuxOptions {
125 summary: run_summary_options_from_args(args),
126 phase: run_phase_options_from_args(args),
127 rusage: run_rusage_options_from_args(args),
128 }
129}
130
131pub(crate) fn run_phase_options_from_args(args: &crate::cli::RunArgs) -> Option<RunPhaseOptions> {
132 args.emit_phase_json.then(|| RunPhaseOptions {
133 sink: build_run_json_sink(args.phase_file.clone(), args.phase_fd, "--phase-fd"),
134 })
135}
136
137pub(crate) fn run_rusage_options_from_args(args: &crate::cli::RunArgs) -> Option<RunRusageOptions> {
138 args.emit_rusage_json.then(|| RunRusageOptions {
139 sink: build_run_json_sink(args.rusage_file.clone(), args.rusage_fd, "--rusage-fd"),
140 })
141}
142
143fn build_run_json_sink(
144 file: Option<PathBuf>,
145 fd: Option<i32>,
146 fd_flag: &'static str,
147) -> RunJsonSink {
148 RunJsonSink {
149 target: if let Some(path) = file {
150 RunJsonSinkTarget::File(path)
151 } else if let Some(fd) = fd {
152 RunJsonSinkTarget::Fd(fd)
153 } else {
154 RunJsonSinkTarget::Stderr
155 },
156 fd_flag,
157 }
158}
159
160pub(crate) enum RunFileMcpServeMode {
161 Stdio,
162 Http(Box<RunFileMcpServeHttp>),
163}
164
165pub(crate) struct RunFileMcpServeHttp {
166 pub options: harn_serve::McpHttpServeOptions,
167 pub auth_policy: harn_serve::AuthPolicy,
168}
169
170const CORE_BUILTINS: &[&str] = &[
172 "println",
173 "print",
174 "log",
175 "type_of",
176 "to_string",
177 "to_int",
178 "to_float",
179 "len",
180 "assert",
181 "assert_eq",
182 "assert_ne",
183 "json_parse",
184 "json_stringify",
185 "runtime_context",
186 "task_current",
187 "runtime_context_values",
188 "runtime_context_get",
189 "runtime_context_set",
190 "runtime_context_clear",
191];
192
193pub(crate) fn build_denied_builtins(
198 deny_csv: Option<&str>,
199 allow_csv: Option<&str>,
200) -> HashSet<String> {
201 if let Some(csv) = deny_csv {
202 csv.split(',')
203 .map(|s| s.trim().to_string())
204 .filter(|s| !s.is_empty())
205 .collect()
206 } else if let Some(csv) = allow_csv {
207 let allowed: HashSet<String> = csv
210 .split(',')
211 .map(|s| s.trim().to_string())
212 .filter(|s| !s.is_empty())
213 .collect();
214 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
215
216 let mut tmp = harn_vm::Vm::new();
218 harn_vm::register_vm_stdlib(&mut tmp);
219 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
220 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
221
222 tmp.builtin_names()
223 .into_iter()
224 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
225 .collect()
226 } else {
227 HashSet::new()
228 }
229}
230
231pub(crate) struct LoadedChunk {
235 pub(crate) source: String,
236 pub(crate) chunk: harn_vm::Chunk,
237}
238
239pub(crate) fn compile_or_load_chunk_for_run(
251 path: &str,
252 stderr: &mut String,
253) -> Option<LoadedChunk> {
254 compile_or_load_chunk_with_timing(path, stderr, None)
255}
256
257#[allow(clippy::needless_option_as_deref)]
267pub(crate) fn compile_or_load_chunk_with_timing(
268 path: &str,
269 stderr: &mut String,
270 mut timing: Option<&mut RunTiming>,
271) -> Option<LoadedChunk> {
272 let source = match fs::read_to_string(path) {
273 Ok(s) => s,
274 Err(e) => {
275 stderr.push_str(&format!("Error reading {path}: {e}\n"));
276 return None;
277 }
278 };
279 if let Some(t) = timing.as_deref_mut() {
280 t.input_bytes = source.len() as u64;
281 }
282
283 let compile_phase_start = Instant::now();
284 let lookup = harn_vm::bytecode_cache::load(Path::new(path), &source);
285 if let Some(chunk) = lookup.chunk {
286 if let Some(t) = timing.as_deref_mut() {
287 t.cache_hit = true;
288 t.bytecode_compile = compile_phase_start.elapsed();
289 }
290 return Some(LoadedChunk { source, chunk });
291 }
292 if let Some(t) = timing.as_deref_mut() {
293 t.cache_hit = false;
294 }
295
296 let parse_start = Instant::now();
297 let program = parse_source_for_run(path, &source, stderr)?;
298 if let Some(t) = timing.as_deref_mut() {
299 t.parse = parse_start.elapsed();
300 }
301
302 let typecheck_start = Instant::now();
303 let mut had_type_error = false;
304 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
305 for diag in &type_diagnostics {
306 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
307 if matches!(diag.severity, DiagnosticSeverity::Error) {
308 had_type_error = true;
309 }
310 stderr.push_str(&rendered);
311 }
312 if let Some(t) = timing.as_deref_mut() {
313 t.typecheck = typecheck_start.elapsed();
314 }
315 if had_type_error {
316 return None;
317 }
318
319 let compile_step_start = Instant::now();
320 let chunk = match harn_vm::Compiler::new().compile(&program) {
321 Ok(c) => c,
322 Err(e) => {
323 stderr.push_str(&format!("error: compile error: {e}\n"));
324 return None;
325 }
326 };
327
328 if let Err(err) = harn_vm::bytecode_cache::store(&lookup.key, &chunk) {
333 if std::env::var_os("HARN_BYTECODE_CACHE_DEBUG").is_some() {
334 eprintln!("[harn] bytecode cache write skipped: {err}");
335 }
336 }
337 if let Some(t) = timing.as_deref_mut() {
338 t.bytecode_compile = compile_step_start.elapsed();
339 }
340
341 Some(LoadedChunk { source, chunk })
342}
343
344fn parse_source_for_run(
345 path: &str,
346 source: &str,
347 stderr: &mut String,
348) -> Option<Vec<harn_parser::SNode>> {
349 crate::ensure_builtin_signatures_installed();
350
351 let mut lexer = harn_lexer::Lexer::new(source);
352 let tokens = match lexer.tokenize() {
353 Ok(tokens) => tokens,
354 Err(error) => {
355 let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
356 source,
357 path,
358 &error_span_from_lex(&error),
359 "error",
360 harn_parser::diagnostic::lexer_error_code(&error),
361 &error.to_string(),
362 Some("here"),
363 None,
364 );
365 stderr.push_str(&diagnostic);
366 return None;
367 }
368 };
369
370 let mut parser = harn_parser::Parser::new(tokens);
371 match parser.parse() {
372 Ok(program) => Some(program),
373 Err(error) => {
374 if parser.all_errors().is_empty() {
375 render_parse_error(path, source, &error, stderr);
376 } else {
377 for error in parser.all_errors() {
378 render_parse_error(path, source, error, stderr);
379 }
380 }
381 None
382 }
383 }
384}
385
386fn render_parse_error(
387 path: &str,
388 source: &str,
389 error: &harn_parser::ParserError,
390 stderr: &mut String,
391) {
392 let span = error_span_from_parse(error);
393 let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
394 source,
395 path,
396 &span,
397 "error",
398 harn_parser::diagnostic::parser_error_code(error),
399 &harn_parser::diagnostic::parser_error_message(error),
400 Some(harn_parser::diagnostic::parser_error_label(error)),
401 harn_parser::diagnostic::parser_error_help(error),
402 );
403 stderr.push_str(&diagnostic);
404}
405
406fn error_span_from_lex(error: &harn_lexer::LexerError) -> harn_lexer::Span {
407 match error {
408 harn_lexer::LexerError::UnexpectedCharacter(_, span)
409 | harn_lexer::LexerError::UnterminatedString(span)
410 | harn_lexer::LexerError::UnterminatedBlockComment(span) => *span,
411 }
412}
413
414fn error_span_from_parse(error: &harn_parser::ParserError) -> harn_lexer::Span {
415 match error {
416 harn_parser::ParserError::Unexpected { span, .. } => *span,
417 harn_parser::ParserError::UnexpectedEof { span, .. } => *span,
418 }
419}
420
421fn typecheck_with_imports(
426 program: &[harn_parser::SNode],
427 path: &Path,
428 source: &str,
429) -> Vec<harn_parser::TypeDiagnostic> {
430 if let Err(error) = package::ensure_dependencies_materialized(path) {
431 eprintln!("error: {error}");
432 process::exit(1);
433 }
434 let graph = harn_modules::build(&[path.to_path_buf()]);
435 let mut checker = harn_parser::TypeChecker::new();
436 if let Some(imported) = graph.imported_names_for_file(path) {
437 checker = checker.with_imported_names(imported);
438 }
439 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
440 checker = checker.with_imported_type_decls(imported);
441 }
442 if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
443 checker = checker.with_imported_callable_decls(imported);
444 }
445 checker.check_with_source(program, source)
446}
447
448pub(crate) fn prepare_eval_temp_file(
459 code: &str,
460) -> Result<(String, tempfile::NamedTempFile), String> {
461 let (header, body) = split_eval_header(code);
462 let wrapped = if header.is_empty() {
463 format!("pipeline main(task) {{\n{body}\n}}")
464 } else {
465 format!("{header}\npipeline main(task) {{\n{body}\n}}")
466 };
467
468 let tmp = create_eval_temp_file()?;
469 Ok((wrapped, tmp))
470}
471
472fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
477 if let Some(dir) = std::env::current_dir().ok().as_deref() {
478 match tempfile::Builder::new()
481 .prefix(".harn-eval-")
482 .suffix(".harn")
483 .tempfile_in(dir)
484 {
485 Ok(tmp) => return Ok(tmp),
486 Err(error) => eprintln!(
487 "warning: harn run -e: could not create temp file in {}: {error}; \
488 relative imports will not resolve",
489 dir.display()
490 ),
491 }
492 }
493 tempfile::Builder::new()
494 .prefix("harn-eval-")
495 .suffix(".harn")
496 .tempfile()
497 .map_err(|e| format!("failed to create temp file for -e: {e}"))
498}
499
500fn split_eval_header(code: &str) -> (String, String) {
508 let mut header_end = 0usize;
509 let mut last_kept = 0usize;
510 for (idx, line) in code.lines().enumerate() {
511 let trimmed = line.trim_start();
512 if trimmed.is_empty() || trimmed.starts_with("//") {
513 header_end = idx + 1;
514 continue;
515 }
516 let is_import = trimmed.starts_with("import ")
517 || trimmed.starts_with("import\t")
518 || trimmed.starts_with("import\"")
519 || trimmed.starts_with("pub import ")
520 || trimmed.starts_with("pub import\t");
521 if is_import {
522 header_end = idx + 1;
523 last_kept = idx + 1;
524 } else {
525 break;
526 }
527 }
528 if last_kept == 0 {
529 return (String::new(), code.to_string());
530 }
531 let mut header_lines: Vec<&str> = Vec::new();
532 let mut body_lines: Vec<&str> = Vec::new();
533 for (idx, line) in code.lines().enumerate() {
534 if idx < header_end {
535 header_lines.push(line);
536 } else {
537 body_lines.push(line);
538 }
539 }
540 (header_lines.join("\n"), body_lines.join("\n"))
541}
542
543#[derive(Clone, Debug, Default, PartialEq, Eq)]
544pub enum CliLlmMockMode {
545 #[default]
546 Off,
547 Replay {
548 fixture_path: PathBuf,
549 },
550 Record {
551 fixture_path: PathBuf,
552 },
553}
554
555#[derive(Clone, Debug, Default, PartialEq, Eq)]
556pub struct RunAttestationOptions {
557 pub receipt_out: Option<PathBuf>,
558 pub agent_id: Option<String>,
559}
560
561#[derive(Clone, Debug, Default, PartialEq, Eq)]
566pub struct RunProfileOptions {
567 pub text: bool,
568 pub json_path: Option<PathBuf>,
569}
570
571impl RunProfileOptions {
572 pub fn is_enabled(&self) -> bool {
573 self.text || self.json_path.is_some()
574 }
575}
576
577#[derive(Clone, Debug, PartialEq, Eq)]
578pub struct RunSandboxOptions {
579 pub enabled: bool,
581 pub workspace_root: Option<PathBuf>,
585}
586
587impl Default for RunSandboxOptions {
588 fn default() -> Self {
589 Self {
590 enabled: true,
591 workspace_root: None,
592 }
593 }
594}
595
596impl RunSandboxOptions {
597 pub fn disabled() -> Self {
599 Self {
600 enabled: false,
601 workspace_root: None,
602 }
603 }
604
605 pub fn with_workspace_root(mut self, workspace_root: impl Into<PathBuf>) -> Self {
607 self.workspace_root = Some(workspace_root.into());
608 self
609 }
610}
611
612#[derive(Clone)]
613pub struct RunInterruptTokens {
614 pub cancel_token: Arc<AtomicBool>,
615 pub signal_token: Arc<Mutex<Option<String>>>,
616}
617
618struct ExecuteRunInputs<'a> {
619 path: &'a str,
620 trace: bool,
621 denied_builtins: HashSet<String>,
622 script_argv: Vec<String>,
623 skill_dirs_raw: Vec<String>,
624 llm_mock_mode: CliLlmMockMode,
625 attestation: Option<RunAttestationOptions>,
626 profile: RunProfileOptions,
627 sandbox: RunSandboxOptions,
628 interrupt_tokens: Option<RunInterruptTokens>,
629 json: Option<(RunJsonOptions, Box<dyn io::Write + Send>)>,
630 aux: RunAuxOptions,
631 timing: Option<&'a mut RunTiming>,
632 harnpack: HarnpackRunOptions,
633}
634
635#[derive(Clone, Debug, Default)]
639pub struct RunOutcome {
640 pub stdout: String,
641 pub stderr: String,
642 pub exit_code: i32,
643}
644
645pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
646 harn_vm::llm::clear_cli_llm_mock_mode();
647 match mode {
648 CliLlmMockMode::Off => Ok(()),
649 CliLlmMockMode::Replay { fixture_path } => {
650 let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
651 harn_vm::llm::install_cli_llm_mocks(mocks);
652 Ok(())
653 }
654 CliLlmMockMode::Record { .. } => {
655 harn_vm::llm::enable_cli_llm_mock_recording();
656 Ok(())
657 }
658 }
659}
660
661pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
662 let CliLlmMockMode::Record { fixture_path } = mode else {
663 return Ok(());
664 };
665 if let Some(parent) = fixture_path.parent() {
666 if !parent.as_os_str().is_empty() {
667 fs::create_dir_all(parent).map_err(|error| {
668 format!(
669 "failed to create fixture directory {}: {error}",
670 parent.display()
671 )
672 })?;
673 }
674 }
675
676 let lines = harn_vm::llm::take_cli_llm_recordings()
677 .into_iter()
678 .map(harn_vm::llm::serialize_llm_mock)
679 .collect::<Result<Vec<_>, _>>()?;
680 let body = if lines.is_empty() {
681 String::new()
682 } else {
683 format!("{}\n", lines.join("\n"))
684 };
685 fs::write(fixture_path, body)
686 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
687}
688
689pub(crate) async fn run_file(
690 path: &str,
691 trace: bool,
692 denied_builtins: HashSet<String>,
693 script_argv: Vec<String>,
694 llm_mock_mode: CliLlmMockMode,
695 attestation: Option<RunAttestationOptions>,
696 profile: RunProfileOptions,
697) {
698 run_file_with_skill_dirs(
699 path,
700 trace,
701 denied_builtins,
702 script_argv,
703 Vec::new(),
704 llm_mock_mode,
705 attestation,
706 profile,
707 RunSandboxOptions::default(),
708 None,
709 RunAuxOptions::default(),
710 HarnpackRunOptions::default(),
711 )
712 .await;
713}
714
715pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
716 let outcome = execute_explain_cost(path);
717 if !outcome.stderr.is_empty() {
718 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
719 }
720 if !outcome.stdout.is_empty() {
721 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
722 }
723 if outcome.exit_code != 0 {
724 process::exit(outcome.exit_code);
725 }
726}
727
728#[allow(clippy::too_many_arguments)]
729pub(crate) async fn run_file_with_skill_dirs(
730 path: &str,
731 trace: bool,
732 denied_builtins: HashSet<String>,
733 script_argv: Vec<String>,
734 skill_dirs_raw: Vec<String>,
735 llm_mock_mode: CliLlmMockMode,
736 attestation: Option<RunAttestationOptions>,
737 profile: RunProfileOptions,
738 sandbox: RunSandboxOptions,
739 json: Option<RunJsonOptions>,
740 aux: RunAuxOptions,
741 harnpack: HarnpackRunOptions,
742) {
743 let interrupt_tokens = install_signal_shutdown_handler();
745
746 let _stdout_passthrough = StdoutPassthroughGuard::enable();
747 let json_with_stdout =
748 json.map(|opts| (opts, Box::new(io::stdout()) as Box<dyn io::Write + Send>));
749 let outcome = execute_run_inner(ExecuteRunInputs {
750 path,
751 trace,
752 denied_builtins,
753 script_argv,
754 skill_dirs_raw,
755 llm_mock_mode,
756 attestation,
757 profile,
758 sandbox,
759 interrupt_tokens: Some(interrupt_tokens.clone()),
760 json: json_with_stdout,
761 aux,
762 timing: None,
763 harnpack,
764 })
765 .await;
766
767 if !outcome.stderr.is_empty() {
770 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
771 }
772 if !outcome.stdout.is_empty() {
773 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
774 }
775
776 let mut exit_code = outcome.exit_code;
777 if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
778 exit_code = 124;
779 }
780 if exit_code != 0 {
781 process::exit(exit_code);
782 }
783}
784
785#[allow(clippy::too_many_arguments)]
786pub(crate) async fn run_resume_with_skill_dirs(
787 target: &str,
788 trace: bool,
789 denied_builtins: HashSet<String>,
790 resume_argv: Vec<String>,
791 skill_dirs_raw: Vec<String>,
792 llm_mock_mode: CliLlmMockMode,
793 attestation: Option<RunAttestationOptions>,
794 profile: RunProfileOptions,
795 sandbox: RunSandboxOptions,
796 json: Option<RunJsonOptions>,
797 aux: RunAuxOptions,
798) {
799 let source = r#"import { resume_agent, wait_agent } from "std/agent/workers"
800
801pipeline main(task) {
802 let input = if len(argv) > 1 {
803 argv[1]
804 } else {
805 nil
806 }
807 let handle = resume_agent(argv[0], input, true)
808 return wait_agent(handle)
809}
810"#;
811 let tmp = create_eval_temp_file().unwrap_or_else(|e| {
812 eprintln!("error: {e}");
813 process::exit(1);
814 });
815 let tmp_path = tmp.path().to_path_buf();
816 if let Err(error) = fs::write(&tmp_path, source) {
817 eprintln!("error: failed to write temp file for --resume: {error}");
818 process::exit(1);
819 }
820 let mut argv = Vec::with_capacity(resume_argv.len() + 1);
821 argv.push(target.to_string());
822 argv.extend(resume_argv);
823 let tmp_str = tmp_path.to_string_lossy().into_owned();
824 run_file_with_skill_dirs(
825 &tmp_str,
826 trace,
827 denied_builtins,
828 argv,
829 skill_dirs_raw,
830 llm_mock_mode,
831 attestation,
832 profile,
833 sandbox,
834 json,
835 aux,
836 HarnpackRunOptions::default(),
837 )
838 .await;
839}
840
841pub fn execute_explain_cost(path: &str) -> RunOutcome {
842 let stdout = String::new();
843 let mut stderr = String::new();
844
845 let (source, program) = parse_source_file(path);
846
847 let mut had_type_error = false;
848 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
849 for diag in &type_diagnostics {
850 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
851 if matches!(diag.severity, DiagnosticSeverity::Error) {
852 had_type_error = true;
853 }
854 stderr.push_str(&rendered);
855 }
856 if had_type_error {
857 return RunOutcome {
858 stdout,
859 stderr,
860 exit_code: 1,
861 };
862 }
863
864 let extensions = package::load_runtime_extensions(Path::new(path));
865 package::install_runtime_extensions(&extensions);
866 RunOutcome {
867 stdout: explain_cost::render_explain_cost(path, &program),
868 stderr,
869 exit_code: 0,
870 }
871}
872
873pub(crate) struct StdoutPassthroughGuard {
874 previous: bool,
875}
876
877impl StdoutPassthroughGuard {
878 pub(crate) fn enable() -> Self {
879 Self {
880 previous: harn_vm::set_stdout_passthrough(true),
881 }
882 }
883}
884
885impl Drop for StdoutPassthroughGuard {
886 fn drop(&mut self) {
887 harn_vm::set_stdout_passthrough(self.previous);
888 }
889}
890
891struct ExecutionPolicyGuard;
892
893impl Drop for ExecutionPolicyGuard {
894 fn drop(&mut self) {
895 harn_vm::orchestration::pop_execution_policy();
896 }
897}
898
899struct RunSandboxScope {
900 _execution_policy: Option<ExecutionPolicyGuard>,
901 _egress_policy: Option<harn_vm::egress::ExplicitEgressPolicyGuard>,
902}
903
904impl RunSandboxScope {
905 fn disabled() -> Self {
906 Self {
907 _execution_policy: None,
908 _egress_policy: None,
909 }
910 }
911}
912
913fn install_run_sandbox_scope(
914 options: &RunSandboxOptions,
915 workspace_root: &Path,
916 stderr: &mut String,
917) -> RunSandboxScope {
918 if !options.enabled {
919 stderr.push_str(
920 "warning: harn run --no-sandbox disables filesystem, process, and egress sandbox defaults\n",
921 );
922 return RunSandboxScope::disabled();
923 }
924
925 let execution_policy = if harn_vm::orchestration::current_execution_policy().is_none() {
926 harn_vm::orchestration::push_execution_policy(default_run_capability_policy(
927 workspace_root,
928 ));
929 Some(ExecutionPolicyGuard)
930 } else {
931 None
932 };
933 let egress_policy = Some(harn_vm::egress::require_explicit_egress_policy_for_host());
934
935 RunSandboxScope {
936 _execution_policy: execution_policy,
937 _egress_policy: egress_policy,
938 }
939}
940
941fn default_run_capability_policy(
942 workspace_root: &Path,
943) -> harn_vm::orchestration::CapabilityPolicy {
944 harn_vm::orchestration::CapabilityPolicy {
945 workspace_roots: vec![normalize_run_workspace_root(workspace_root)
946 .display()
947 .to_string()],
948 side_effect_level: Some("process_exec".to_string()),
949 sandbox_profile: harn_vm::orchestration::SandboxProfile::Worktree,
950 ..harn_vm::orchestration::CapabilityPolicy::default()
951 }
952}
953
954fn normalize_run_workspace_root(path: &Path) -> PathBuf {
955 if path.is_absolute() {
956 return path.to_path_buf();
957 }
958 std::env::current_dir()
959 .map(|cwd| cwd.join(path))
960 .unwrap_or_else(|_| path.to_path_buf())
961}
962
963fn default_run_workspace_root(project_root: Option<&Path>, source_parent: &Path) -> PathBuf {
964 project_root
965 .map(Path::to_path_buf)
966 .or_else(|| std::env::current_dir().ok())
967 .unwrap_or_else(|| source_parent.to_path_buf())
968}
969
970fn run_sandbox_attestation(sandbox: &RunSandboxOptions) -> serde_json::Value {
971 let active_policy = harn_vm::orchestration::current_execution_policy();
972 let active = active_policy.is_some();
973 let workspace_roots = active_policy
974 .as_ref()
975 .map(|policy| policy.workspace_roots.clone())
976 .unwrap_or_default();
977 let profile = active_policy
978 .as_ref()
979 .map(|policy| policy.sandbox_profile.as_str())
980 .unwrap_or("unrestricted");
981 let egress = if sandbox.enabled {
982 "explicit_policy_required"
983 } else if active {
984 "host_policy"
985 } else {
986 "unrestricted"
987 };
988
989 serde_json::json!({
990 "run_default_enabled": sandbox.enabled,
991 "active": active,
992 "workspace_roots": workspace_roots,
993 "profile": profile,
994 "egress": egress,
995 })
996}
997
998const FIRST_SIGNAL_MESSAGE: &str =
1007 "[harn] signal received, interrupting VM (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
1008
1009fn install_signal_shutdown_handler() -> RunInterruptTokens {
1010 let tokens = RunInterruptTokens {
1011 cancel_token: Arc::new(AtomicBool::new(false)),
1012 signal_token: Arc::new(Mutex::new(None)),
1013 };
1014 let tokens_clone = tokens.clone();
1015 tokio::spawn(async move {
1016 #[cfg(unix)]
1017 {
1018 use tokio::signal::unix::{signal, SignalKind};
1019 let sigterm_stream = signal(SignalKind::terminate()).ok();
1024 let sigint_stream = signal(SignalKind::interrupt()).ok();
1025 let sighup_stream = signal(SignalKind::hangup()).ok();
1026 let (mut sigterm, mut sigint, mut sighup) =
1027 match (sigterm_stream, sigint_stream, sighup_stream) {
1028 (Some(t), Some(i), Some(h)) => (t, i, h),
1029 _ => {
1030 eprintln!(
1031 "[harn] signal handlers unavailable in this environment; \
1032 continuing without graceful-shutdown interception"
1033 );
1034 return;
1035 }
1036 };
1037 let mut seen_signal = false;
1038 loop {
1039 let signal_name = tokio::select! {
1040 _ = sigterm.recv() => "SIGTERM",
1041 _ = sigint.recv() => "SIGINT",
1042 _ = sighup.recv() => "SIGHUP",
1043 };
1044 if seen_signal {
1045 eprintln!("[harn] second signal received, terminating");
1046 process::exit(124);
1047 }
1048 seen_signal = true;
1049 request_vm_interrupt(&tokens_clone, signal_name);
1050 eprintln!("{FIRST_SIGNAL_MESSAGE}");
1051 }
1052 }
1053 #[cfg(not(unix))]
1054 {
1055 let mut seen_signal = false;
1056 loop {
1057 let _ = tokio::signal::ctrl_c().await;
1058 if seen_signal {
1059 eprintln!("[harn] second signal received, terminating");
1060 process::exit(124);
1061 }
1062 seen_signal = true;
1063 request_vm_interrupt(&tokens_clone, "SIGINT");
1064 eprintln!("{FIRST_SIGNAL_MESSAGE}");
1065 }
1066 }
1067 });
1068 tokens
1069}
1070
1071fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
1072 if let Ok(mut signal) = tokens.signal_token.lock() {
1073 *signal = Some(signal_name.to_string());
1074 }
1075 tokens.cancel_token.store(true, Ordering::SeqCst);
1076}
1077
1078pub async fn execute_run(
1084 path: &str,
1085 trace: bool,
1086 denied_builtins: HashSet<String>,
1087 script_argv: Vec<String>,
1088 skill_dirs_raw: Vec<String>,
1089 llm_mock_mode: CliLlmMockMode,
1090 attestation: Option<RunAttestationOptions>,
1091 profile: RunProfileOptions,
1092) -> RunOutcome {
1093 crate::ensure_builtin_signatures_installed();
1094 execute_run_with_harnpack_and_sandbox_options(
1095 path,
1096 trace,
1097 denied_builtins,
1098 script_argv,
1099 skill_dirs_raw,
1100 llm_mock_mode,
1101 attestation,
1102 profile,
1103 RunSandboxOptions::default(),
1104 HarnpackRunOptions::default(),
1105 )
1106 .await
1107}
1108
1109#[allow(clippy::too_many_arguments)]
1113pub async fn execute_run_with_sandbox_options(
1114 path: &str,
1115 trace: bool,
1116 denied_builtins: HashSet<String>,
1117 script_argv: Vec<String>,
1118 skill_dirs_raw: Vec<String>,
1119 llm_mock_mode: CliLlmMockMode,
1120 attestation: Option<RunAttestationOptions>,
1121 profile: RunProfileOptions,
1122 sandbox: RunSandboxOptions,
1123) -> RunOutcome {
1124 execute_run_with_harnpack_and_sandbox_options(
1125 path,
1126 trace,
1127 denied_builtins,
1128 script_argv,
1129 skill_dirs_raw,
1130 llm_mock_mode,
1131 attestation,
1132 profile,
1133 sandbox,
1134 HarnpackRunOptions::default(),
1135 )
1136 .await
1137}
1138
1139#[allow(clippy::too_many_arguments)]
1144pub async fn execute_run_with_harnpack_options(
1145 path: &str,
1146 trace: bool,
1147 denied_builtins: HashSet<String>,
1148 script_argv: Vec<String>,
1149 skill_dirs_raw: Vec<String>,
1150 llm_mock_mode: CliLlmMockMode,
1151 attestation: Option<RunAttestationOptions>,
1152 profile: RunProfileOptions,
1153 harnpack: HarnpackRunOptions,
1154) -> RunOutcome {
1155 execute_run_with_harnpack_and_sandbox_options(
1156 path,
1157 trace,
1158 denied_builtins,
1159 script_argv,
1160 skill_dirs_raw,
1161 llm_mock_mode,
1162 attestation,
1163 profile,
1164 RunSandboxOptions::default(),
1165 harnpack,
1166 )
1167 .await
1168}
1169
1170#[allow(clippy::too_many_arguments)]
1171async fn execute_run_with_harnpack_and_sandbox_options(
1172 path: &str,
1173 trace: bool,
1174 denied_builtins: HashSet<String>,
1175 script_argv: Vec<String>,
1176 skill_dirs_raw: Vec<String>,
1177 llm_mock_mode: CliLlmMockMode,
1178 attestation: Option<RunAttestationOptions>,
1179 profile: RunProfileOptions,
1180 sandbox: RunSandboxOptions,
1181 harnpack: HarnpackRunOptions,
1182) -> RunOutcome {
1183 execute_run_inner(ExecuteRunInputs {
1184 path,
1185 trace,
1186 denied_builtins,
1187 script_argv,
1188 skill_dirs_raw,
1189 llm_mock_mode,
1190 attestation,
1191 profile,
1192 sandbox,
1193 interrupt_tokens: None,
1194 json: None,
1195 aux: RunAuxOptions::default(),
1196 timing: None,
1197 harnpack,
1198 })
1199 .await
1200}
1201
1202#[allow(clippy::too_many_arguments)]
1208pub async fn execute_run_json(
1209 path: &str,
1210 trace: bool,
1211 denied_builtins: HashSet<String>,
1212 script_argv: Vec<String>,
1213 skill_dirs_raw: Vec<String>,
1214 llm_mock_mode: CliLlmMockMode,
1215 attestation: Option<RunAttestationOptions>,
1216 profile: RunProfileOptions,
1217 out: Box<dyn io::Write + Send>,
1218 options: RunJsonOptions,
1219) -> RunOutcome {
1220 execute_run_inner(ExecuteRunInputs {
1221 path,
1222 trace,
1223 denied_builtins,
1224 script_argv,
1225 skill_dirs_raw,
1226 llm_mock_mode,
1227 attestation,
1228 profile,
1229 sandbox: RunSandboxOptions::default(),
1230 interrupt_tokens: None,
1231 json: Some((options, out)),
1232 aux: RunAuxOptions::default(),
1233 timing: None,
1234 harnpack: HarnpackRunOptions::default(),
1235 })
1236 .await
1237}
1238
1239pub(crate) async fn execute_run_with_timing(
1243 path: &str,
1244 script_argv: Vec<String>,
1245 timing: Option<&mut RunTiming>,
1246 sandbox: RunSandboxOptions,
1247) -> RunOutcome {
1248 execute_run_inner(ExecuteRunInputs {
1249 path,
1250 trace: false,
1251 denied_builtins: HashSet::new(),
1252 script_argv,
1253 skill_dirs_raw: Vec::new(),
1254 llm_mock_mode: CliLlmMockMode::Off,
1255 attestation: None,
1256 profile: RunProfileOptions::default(),
1257 sandbox,
1258 interrupt_tokens: None,
1259 json: None,
1260 aux: RunAuxOptions::default(),
1261 timing,
1262 harnpack: HarnpackRunOptions::default(),
1263 })
1264 .await
1265}
1266
1267#[allow(clippy::needless_option_as_deref)]
1270async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
1271 let ExecuteRunInputs {
1272 path,
1273 trace,
1274 denied_builtins,
1275 script_argv,
1276 skill_dirs_raw,
1277 llm_mock_mode,
1278 attestation,
1279 profile,
1280 sandbox,
1281 interrupt_tokens,
1282 json,
1283 aux,
1284 timing,
1285 harnpack,
1286 } = inputs;
1287 let RunAuxOptions {
1288 summary,
1289 phase,
1290 rusage,
1291 } = aux;
1292 let run_started = Instant::now();
1293 let cpu_started_ms = rusage.as_ref().map(|_| time::cpu_ms());
1294 let mut owned_timing = if timing.is_none() && (phase.is_some() || rusage.is_some()) {
1295 Some(RunTiming::default())
1296 } else {
1297 None
1298 };
1299 let mut timing = timing.or(owned_timing.as_mut());
1300
1301 let json_session = json.map(|(options, out)| JsonRunSession::install(options, out));
1307
1308 let mut stderr = String::new();
1309 let mut stdout = String::new();
1310
1311 let owned_run_path: String;
1316 let resolved_path: &str = if harnpack::looks_like_harnpack(Path::new(path)) {
1317 let outcome = match harnpack::prepare_harnpack(Path::new(path), &harnpack, &mut stderr) {
1318 Ok(prepared) => prepared,
1319 Err(err) => {
1320 return finalize_harnpack_error(
1321 stderr,
1322 json_session,
1323 summary.as_ref(),
1324 phase.as_ref(),
1325 rusage.as_ref(),
1326 run_started,
1327 err,
1328 );
1329 }
1330 };
1331 harn_vm::run_events::emit(harn_vm::run_events::RunEvent::PackRun {
1332 bundle_hash: outcome.bundle_hash.clone(),
1333 signature_verified: outcome.signature_verified,
1334 key_id: outcome.key_id.clone(),
1335 cache_hit: outcome.cache_hit,
1336 dry_run_verify: harnpack.dry_run_verify,
1337 });
1338 if harnpack.dry_run_verify {
1339 return finalize_harnpack_dry_run(
1340 stderr,
1341 json_session,
1342 summary.as_ref(),
1343 phase.as_ref(),
1344 rusage.as_ref(),
1345 run_started,
1346 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1347 &outcome,
1348 );
1349 }
1350 owned_run_path = outcome.entrypoint_path.to_string_lossy().into_owned();
1351 owned_run_path.as_str()
1352 } else {
1353 path
1354 };
1355
1356 let Some(LoadedChunk { source, chunk }) =
1357 compile_or_load_chunk_with_timing(resolved_path, &mut stderr, timing.as_deref_mut())
1358 else {
1359 let message = stderr.clone();
1360 return finalize_run_error(
1361 stdout,
1362 stderr,
1363 json_session,
1364 summary.as_ref(),
1365 phase.as_ref(),
1366 rusage.as_ref(),
1367 run_started,
1368 None,
1369 timing.as_deref(),
1370 0,
1371 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1372 "compile_error",
1373 message,
1374 );
1375 };
1376 let path = resolved_path;
1377
1378 let setup_start = Instant::now();
1382
1383 if trace || summary.is_some() {
1384 harn_vm::llm::enable_tracing();
1385 }
1386 if profile.is_enabled() || phase.is_some() {
1387 harn_vm::tracing::set_tracing_enabled(true);
1388 }
1389 if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
1390 stderr.push_str(&format!("error: {error}\n"));
1391 return finalize_run_error(
1392 stdout,
1393 stderr,
1394 json_session,
1395 summary.as_ref(),
1396 phase.as_ref(),
1397 rusage.as_ref(),
1398 run_started,
1399 None,
1400 timing.as_deref(),
1401 0,
1402 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1403 "llm_mock_install",
1404 error,
1405 );
1406 }
1407
1408 let mut vm = harn_vm::Vm::new();
1409 if let Some(interrupt_tokens) = interrupt_tokens {
1410 vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
1411 vm.install_cancel_token(interrupt_tokens.cancel_token);
1412 }
1413 harn_vm::register_vm_stdlib(&mut vm);
1414 crate::install_default_hostlib(&mut vm);
1415 let source_parent = std::path::Path::new(path)
1416 .parent()
1417 .unwrap_or(std::path::Path::new("."));
1418 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1420 let store_base = project_root.as_deref().unwrap_or(source_parent);
1421 let sandbox_root = sandbox
1422 .workspace_root
1423 .clone()
1424 .unwrap_or_else(|| default_run_workspace_root(project_root.as_deref(), source_parent));
1425 let _sandbox_scope = install_run_sandbox_scope(&sandbox, &sandbox_root, &mut stderr);
1426 let attestation_started_at_ms = now_ms();
1427 let attestation_log = if attestation.is_some() {
1428 Some(harn_vm::event_log::install_memory_for_current_thread(256))
1429 } else {
1430 None
1431 };
1432 if let Some(log) = attestation_log.as_ref() {
1433 append_run_provenance_event(
1434 log,
1435 "started",
1436 serde_json::json!({
1437 "pipeline": path,
1438 "argv": &script_argv,
1439 "project_root": store_base.display().to_string(),
1440 "sandbox": run_sandbox_attestation(&sandbox),
1441 }),
1442 )
1443 .await;
1444 }
1445 harn_vm::register_store_builtins(&mut vm, store_base);
1446 harn_vm::register_metadata_builtins(&mut vm, store_base);
1447 let pipeline_name = std::path::Path::new(path)
1448 .file_stem()
1449 .and_then(|s| s.to_str())
1450 .unwrap_or("default");
1451 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1452 vm.set_source_info(path, &source);
1453 if !denied_builtins.is_empty() {
1454 vm.set_denied_builtins(denied_builtins);
1455 }
1456 if let Some(ref root) = project_root {
1457 vm.set_project_root(root);
1458 }
1459
1460 if let Some(p) = std::path::Path::new(path).parent() {
1461 if !p.as_os_str().is_empty() {
1462 vm.set_source_dir(p);
1463 }
1464 }
1465
1466 let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
1469 let loaded = load_skills(&SkillLoaderInputs {
1470 cli_dirs,
1471 source_path: Some(std::path::PathBuf::from(path)),
1472 });
1473 emit_loader_warnings(&loaded.loader_warnings);
1474 install_skills_global(&mut vm, &loaded);
1475
1476 let argv_values: Vec<harn_vm::VmValue> = script_argv
1479 .iter()
1480 .map(|s| harn_vm::VmValue::String(std::sync::Arc::from(s.as_str())))
1481 .collect();
1482 vm.set_global(
1483 "argv",
1484 harn_vm::VmValue::List(std::sync::Arc::new(argv_values)),
1485 );
1486
1487 vm.set_harness(harn_vm::Harness::real());
1491
1492 let extensions = package::load_runtime_extensions(Path::new(path));
1493 package::install_runtime_extensions(&extensions);
1494 if let Some(manifest) = extensions.root_manifest.as_ref() {
1495 if !manifest.mcp.is_empty() {
1496 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1497 }
1498 }
1499 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1500 stderr.push_str(&format!(
1501 "error: failed to install manifest triggers: {error}\n"
1502 ));
1503 return finalize_run_error(
1504 stdout,
1505 stderr,
1506 json_session,
1507 summary.as_ref(),
1508 phase.as_ref(),
1509 rusage.as_ref(),
1510 run_started,
1511 None,
1512 timing.as_deref(),
1513 0,
1514 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1515 "manifest_triggers",
1516 error.to_string(),
1517 );
1518 }
1519 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1520 stderr.push_str(&format!(
1521 "error: failed to install manifest hooks: {error}\n"
1522 ));
1523 return finalize_run_error(
1524 stdout,
1525 stderr,
1526 json_session,
1527 summary.as_ref(),
1528 phase.as_ref(),
1529 rusage.as_ref(),
1530 run_started,
1531 None,
1532 timing.as_deref(),
1533 0,
1534 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1535 "manifest_hooks",
1536 error.to_string(),
1537 );
1538 }
1539
1540 let local = tokio::task::LocalSet::new();
1542 if let Some(t) = timing.as_deref_mut() {
1543 t.run_setup = setup_start.elapsed();
1544 }
1545 let main_start = Instant::now();
1546 let execution = local
1547 .run_until(async {
1548 match vm.execute(&chunk).await {
1549 Ok(value) => Ok((vm.output(), value)),
1550 Err(e) => Err(vm.format_runtime_error(&e)),
1551 }
1552 })
1553 .await;
1554 if let Some(t) = timing.as_deref_mut() {
1555 t.run_main = main_start.elapsed();
1556 }
1557 if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
1558 stderr.push_str(&format!("error: {error}\n"));
1559 let profile_rollup = if profile.is_enabled() {
1560 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1561 } else {
1562 None
1563 };
1564 return finalize_run_error(
1565 stdout,
1566 stderr,
1567 json_session,
1568 summary.as_ref(),
1569 phase.as_ref(),
1570 rusage.as_ref(),
1571 run_started,
1572 profile_rollup.as_ref(),
1573 timing.as_deref(),
1574 harn_vm::tracing::peek_spans().len() as u64,
1575 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1576 "llm_mock_record",
1577 error,
1578 );
1579 }
1580
1581 let buffered_stderr = harn_vm::take_stderr_buffer();
1583 stderr.push_str(&buffered_stderr);
1584
1585 let exit_code = match &execution {
1586 Ok((_, return_value)) => exit_code_from_return_value(return_value),
1587 Err(_) => 1,
1588 };
1589
1590 if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
1591 if let Err(error) = emit_run_attestation(
1592 log,
1593 path,
1594 store_base,
1595 attestation_started_at_ms,
1596 exit_code,
1597 options,
1598 &mut stderr,
1599 )
1600 .await
1601 {
1602 stderr.push_str(&format!(
1603 "error: failed to emit provenance receipt: {error}\n"
1604 ));
1605 let profile_rollup = if profile.is_enabled() {
1606 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1607 } else {
1608 None
1609 };
1610 return finalize_run_error(
1611 stdout,
1612 stderr,
1613 json_session,
1614 summary.as_ref(),
1615 phase.as_ref(),
1616 rusage.as_ref(),
1617 run_started,
1618 profile_rollup.as_ref(),
1619 timing.as_deref(),
1620 harn_vm::tracing::peek_spans().len() as u64,
1621 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1622 "attestation",
1623 error,
1624 );
1625 }
1626 harn_vm::event_log::reset_active_event_log();
1627 }
1628
1629 match execution {
1630 Ok((output, return_value)) => {
1631 stdout.push_str(output);
1632 let main_events = harn_vm::tracing::peek_spans().len() as u64;
1633 let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
1634 let profile_rollup = if profile.is_enabled() {
1635 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1636 } else {
1637 None
1638 };
1639 let summary_llm = summary.as_ref().map(|_| run_summary_llm_snapshot());
1640 if trace {
1641 stderr.push_str(&render_trace_summary());
1642 }
1643 if let Some(profile_rollup) = profile_rollup.as_ref() {
1644 if let Err(error) =
1645 render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
1646 {
1647 stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1648 }
1649 }
1650 if exit_code != 0 {
1651 stderr.push_str(&render_return_value_error(&return_value));
1652 }
1653 let aux_emission = emit_run_aux_for_exit(
1654 summary.as_ref(),
1655 phase.as_ref(),
1656 rusage.as_ref(),
1657 run_started,
1658 exit_code,
1659 profile_rollup.as_ref(),
1660 summary_llm,
1661 timing.as_deref(),
1662 main_events,
1663 cpu_ms_total,
1664 json_session.is_some(),
1665 &mut stderr,
1666 );
1667 if let Some(session) = json_session {
1668 if let Some(error) = aux_emission.error {
1669 let mut outcome = session.finalize_error(
1670 "run_aux",
1671 format!("failed to emit auxiliary run JSON: {error}"),
1672 1,
1673 );
1674 outcome.stderr = aux_emission.stderr;
1675 return outcome;
1676 }
1677 let value = harn_vm::llm::vm_value_to_json(&return_value);
1678 let mut outcome = session.finalize_result(value, aux_emission.exit_code);
1679 outcome.stderr = aux_emission.stderr;
1680 return outcome;
1681 }
1682 RunOutcome {
1683 stdout,
1684 stderr,
1685 exit_code: aux_emission.exit_code,
1686 }
1687 }
1688 Err(rendered_error) => {
1689 stderr.push_str(&rendered_error);
1690 let main_events = harn_vm::tracing::peek_spans().len() as u64;
1691 let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
1692 let profile_rollup = if profile.is_enabled() {
1693 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1694 } else {
1695 None
1696 };
1697 if let Some(profile_rollup) = profile_rollup.as_ref() {
1698 if let Err(error) =
1699 render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
1700 {
1701 stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1702 }
1703 }
1704 let aux_emission = emit_run_aux_for_exit(
1705 summary.as_ref(),
1706 phase.as_ref(),
1707 rusage.as_ref(),
1708 run_started,
1709 1,
1710 profile_rollup.as_ref(),
1711 None,
1712 timing.as_deref(),
1713 main_events,
1714 cpu_ms_total,
1715 json_session.is_some(),
1716 &mut stderr,
1717 );
1718 if let Some(session) = json_session {
1719 let mut outcome =
1720 session.finalize_error("runtime", rendered_error, aux_emission.exit_code);
1721 outcome.stderr = aux_emission.stderr;
1722 return outcome;
1723 }
1724 RunOutcome {
1725 stdout,
1726 stderr,
1727 exit_code: aux_emission.exit_code,
1728 }
1729 }
1730 }
1731}
1732
1733fn render_and_persist_profile_rollup(
1734 options: &RunProfileOptions,
1735 profile: &harn_vm::profile::RunProfile,
1736 stderr: &mut String,
1737) -> Result<(), String> {
1738 if options.text {
1739 stderr.push_str(&harn_vm::profile::render(profile));
1740 }
1741 if let Some(path) = options.json_path.as_ref() {
1742 if let Some(parent) = path.parent() {
1743 if !parent.as_os_str().is_empty() {
1744 fs::create_dir_all(parent)
1745 .map_err(|error| format!("create {}: {error}", parent.display()))?;
1746 }
1747 }
1748 let json = serde_json::to_string_pretty(profile)
1749 .map_err(|error| format!("serialize profile: {error}"))?;
1750 fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
1751 }
1752 Ok(())
1753}
1754
1755fn build_run_summary<'a>(
1756 started: Instant,
1757 exit_code: i32,
1758 profile: Option<&'a harn_vm::profile::RunProfile>,
1759 llm: RunSummaryLlm,
1760) -> RunSummary<'a> {
1761 RunSummary {
1762 schema_version: RUN_SUMMARY_SCHEMA_VERSION,
1763 event: "run_summary",
1764 wall_time_ms: started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64,
1765 exit_code,
1766 llm,
1767 profile,
1768 }
1769}
1770
1771fn run_summary_llm_snapshot() -> RunSummaryLlm {
1772 let (input_tokens, output_tokens, time_ms, call_count) = harn_vm::llm::peek_trace_summary();
1773 let cost_usd = harn_vm::llm::peek_total_cost();
1774 RunSummaryLlm {
1775 call_count,
1776 input_tokens,
1777 output_tokens,
1778 time_ms,
1779 cost_usd: if cost_usd.is_finite() { cost_usd } else { 0.0 },
1780 }
1781}
1782
1783struct RunAuxEmission {
1784 stderr: String,
1785 exit_code: i32,
1786 error: Option<String>,
1787}
1788
1789#[allow(clippy::too_many_arguments)]
1790fn emit_run_aux_for_exit(
1791 summary: Option<&RunSummaryOptions>,
1792 phase: Option<&RunPhaseOptions>,
1793 rusage: Option<&RunRusageOptions>,
1794 started: Instant,
1795 exit_code: i32,
1796 profile: Option<&harn_vm::profile::RunProfile>,
1797 llm: Option<RunSummaryLlm>,
1798 timing: Option<&RunTiming>,
1799 main_events: u64,
1800 cpu_ms_total: Option<u64>,
1801 json_mode: bool,
1802 stderr: &mut String,
1803) -> RunAuxEmission {
1804 let mut aux_stderr = String::new();
1805 let mut final_exit_code = exit_code;
1806 let mut aux_error = None;
1807 let aux_target = if json_mode { &mut aux_stderr } else { stderr };
1808 let default_timing = RunTiming::default();
1809 let timing = timing.unwrap_or(&default_timing);
1810
1811 if let Some(options) = summary {
1812 let llm = llm.unwrap_or_else(run_summary_llm_snapshot);
1813 let summary = build_run_summary(started, exit_code, profile, llm);
1814 if let Err(error) = emit_raw_json_line(&options.sink, &summary, "run summary", aux_target) {
1815 record_aux_error(
1816 &mut final_exit_code,
1817 &mut aux_error,
1818 aux_target,
1819 "run summary",
1820 error,
1821 );
1822 }
1823 }
1824 if let Some(options) = phase {
1825 let phase_event = RunPhaseEvent {
1826 schema_version: RUN_PHASE_SCHEMA_VERSION,
1827 event: "run_phase",
1828 phases: time::build_phase_records(timing, main_events),
1829 };
1830 if let Err(error) = emit_raw_json_line(&options.sink, &phase_event, "run phase", aux_target)
1831 {
1832 record_aux_error(
1833 &mut final_exit_code,
1834 &mut aux_error,
1835 aux_target,
1836 "run phase",
1837 error,
1838 );
1839 }
1840 }
1841 if let Some(options) = rusage {
1842 let rusage_event = RunRusageEvent {
1843 schema_version: RUN_RUSAGE_SCHEMA_VERSION,
1844 event: "run_rusage",
1845 cpu_ms: cpu_ms_total.unwrap_or(0),
1846 };
1847 if let Err(error) =
1848 emit_raw_json_line(&options.sink, &rusage_event, "run rusage", aux_target)
1849 {
1850 record_aux_error(
1851 &mut final_exit_code,
1852 &mut aux_error,
1853 aux_target,
1854 "run rusage",
1855 error,
1856 );
1857 }
1858 }
1859
1860 RunAuxEmission {
1861 stderr: aux_stderr,
1862 exit_code: final_exit_code,
1863 error: aux_error,
1864 }
1865}
1866
1867fn record_aux_error(
1868 final_exit_code: &mut i32,
1869 aux_error: &mut Option<String>,
1870 stderr: &mut String,
1871 label: &str,
1872 error: String,
1873) {
1874 stderr.push_str(&format!("error: failed to emit {label}: {error}\n"));
1875 if *final_exit_code == 0 {
1876 *final_exit_code = 1;
1877 }
1878 if aux_error.is_none() {
1879 *aux_error = Some(error);
1880 }
1881}
1882
1883fn emit_raw_json_line(
1884 sink: &RunJsonSink,
1885 value: &impl Serialize,
1886 label: &str,
1887 stderr: &mut String,
1888) -> Result<(), String> {
1889 let line =
1890 serde_json::to_string(value).map_err(|error| format!("serialize {label}: {error}"))? + "\n";
1891 match &sink.target {
1892 RunJsonSinkTarget::Stderr => {
1893 stderr.push_str(&line);
1894 Ok(())
1895 }
1896 RunJsonSinkTarget::File(path) => write_raw_json_file(path, &line),
1897 RunJsonSinkTarget::Fd(fd) => write_raw_json_fd(*fd, &line, sink.fd_flag),
1898 }
1899}
1900
1901fn write_raw_json_file(path: &Path, line: &str) -> Result<(), String> {
1902 if let Some(parent) = path.parent() {
1903 if !parent.as_os_str().is_empty() {
1904 fs::create_dir_all(parent)
1905 .map_err(|error| format!("create {}: {error}", parent.display()))?;
1906 }
1907 }
1908 fs::write(path, line).map_err(|error| format!("write {}: {error}", path.display()))
1909}
1910
1911#[cfg(unix)]
1912fn write_raw_json_fd(fd: i32, line: &str, flag: &str) -> Result<(), String> {
1913 use std::fs::File;
1914 use std::os::unix::io::FromRawFd;
1915
1916 if fd < 0 {
1917 return Err(format!("invalid {flag} {fd}: must be non-negative"));
1918 }
1919 let duped = unsafe { libc::dup(fd) };
1920 if duped < 0 {
1921 return Err(format!(
1922 "duplicate {flag} {fd}: {}",
1923 io::Error::last_os_error()
1924 ));
1925 }
1926 let mut file = unsafe { File::from_raw_fd(duped) };
1927 file.write_all(line.as_bytes())
1928 .and_then(|_| file.flush())
1929 .map_err(|error| format!("write {flag} {fd}: {error}"))
1930}
1931
1932#[cfg(not(unix))]
1933fn write_raw_json_fd(_fd: i32, _line: &str, flag: &str) -> Result<(), String> {
1934 Err(format!("{flag} is only supported on Unix platforms"))
1935}
1936
1937async fn append_run_provenance_event(
1938 log: &Arc<harn_vm::event_log::AnyEventLog>,
1939 kind: &str,
1940 payload: serde_json::Value,
1941) {
1942 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
1943 return;
1944 };
1945 let _ = log
1946 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
1947 .await;
1948}
1949
1950async fn emit_run_attestation(
1951 log: &Arc<harn_vm::event_log::AnyEventLog>,
1952 path: &str,
1953 store_base: &Path,
1954 started_at_ms: i64,
1955 exit_code: i32,
1956 options: &RunAttestationOptions,
1957 stderr: &mut String,
1958) -> Result<(), String> {
1959 let finished_at_ms = now_ms();
1960 let status = if exit_code == 0 { "success" } else { "failure" };
1961 append_run_provenance_event(
1962 log,
1963 "finished",
1964 serde_json::json!({
1965 "pipeline": path,
1966 "status": status,
1967 "exit_code": exit_code,
1968 }),
1969 )
1970 .await;
1971 log.flush()
1972 .await
1973 .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
1974 let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
1975 .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
1976 let (signing_key, key_id) =
1977 harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
1978 .await
1979 .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
1980 let receipt = harn_vm::build_signed_receipt(
1981 log,
1982 harn_vm::ReceiptBuildOptions {
1983 pipeline: path.to_string(),
1984 status: status.to_string(),
1985 started_at_ms,
1986 finished_at_ms,
1987 exit_code,
1988 producer_name: "harn-cli".to_string(),
1989 producer_version: env!("CARGO_PKG_VERSION").to_string(),
1990 },
1991 &signing_key,
1992 key_id,
1993 )
1994 .await
1995 .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
1996 let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
1997 if let Some(parent) = receipt_path.parent() {
1998 fs::create_dir_all(parent)
1999 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
2000 }
2001 let encoded = serde_json::to_vec_pretty(&receipt)
2002 .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
2003 fs::write(&receipt_path, encoded)
2004 .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
2005 stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
2006 Ok(())
2007}
2008
2009fn receipt_output_path(
2010 store_base: &Path,
2011 options: &RunAttestationOptions,
2012 receipt_id: &str,
2013) -> PathBuf {
2014 if let Some(path) = options.receipt_out.as_ref() {
2015 return path.clone();
2016 }
2017 harn_vm::runtime_paths::state_root(store_base)
2018 .join("receipts")
2019 .join(format!("{receipt_id}.json"))
2020}
2021
2022fn now_ms() -> i64 {
2023 std::time::SystemTime::now()
2024 .duration_since(std::time::UNIX_EPOCH)
2025 .map(|duration| duration.as_millis() as i64)
2026 .unwrap_or(0)
2027}
2028
2029fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
2036 use harn_vm::VmValue;
2037 match value {
2038 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
2039 VmValue::EnumVariant(enum_variant) if enum_variant.is_variant("Result", "Err") => 1,
2040 _ => 0,
2041 }
2042}
2043
2044struct JsonRunSession {
2058 emitter: self::json_events::NdjsonEmitter,
2059 prior_sink: Option<Arc<dyn harn_vm::run_events::RunEventSink>>,
2060}
2061
2062impl JsonRunSession {
2063 fn install(options: RunJsonOptions, out: Box<dyn io::Write + Send>) -> Self {
2064 let emitter = NdjsonEmitter::new(out, options.quiet);
2065 let prior_sink = harn_vm::run_events::install_sink(emitter.sink());
2066 Self {
2067 emitter,
2068 prior_sink,
2069 }
2070 }
2071
2072 fn finalize_result(self, value: serde_json::Value, exit_code: i32) -> RunOutcome {
2073 self.emitter.emit_result(value, exit_code);
2074 RunOutcome {
2075 stdout: String::new(),
2076 stderr: String::new(),
2077 exit_code,
2078 }
2079 }
2080
2081 fn finalize_error(
2082 self,
2083 code: impl Into<String>,
2084 message: impl Into<String>,
2085 exit_code: i32,
2086 ) -> RunOutcome {
2087 self.emitter.emit_error(code, message);
2088 RunOutcome {
2089 stdout: String::new(),
2090 stderr: String::new(),
2091 exit_code,
2092 }
2093 }
2094}
2095
2096impl Drop for JsonRunSession {
2097 fn drop(&mut self) {
2098 match self.prior_sink.take() {
2099 Some(prior) => {
2100 harn_vm::run_events::install_sink(prior);
2101 }
2102 None => harn_vm::run_events::clear_sink(),
2103 }
2104 }
2105}
2106
2107#[allow(clippy::too_many_arguments)]
2108fn finalize_run_error(
2109 stdout: String,
2110 mut stderr: String,
2111 json_session: Option<JsonRunSession>,
2112 summary: Option<&RunSummaryOptions>,
2113 phase: Option<&RunPhaseOptions>,
2114 rusage: Option<&RunRusageOptions>,
2115 started: Instant,
2116 profile: Option<&harn_vm::profile::RunProfile>,
2117 timing: Option<&RunTiming>,
2118 main_events: u64,
2119 cpu_ms_total: Option<u64>,
2120 code: impl Into<String>,
2121 message: impl Into<String>,
2122) -> RunOutcome {
2123 let aux_emission = emit_run_aux_for_exit(
2124 summary,
2125 phase,
2126 rusage,
2127 started,
2128 1,
2129 profile,
2130 None,
2131 timing,
2132 main_events,
2133 cpu_ms_total,
2134 json_session.is_some(),
2135 &mut stderr,
2136 );
2137 if let Some(session) = json_session {
2138 let mut outcome = session.finalize_error(code, message, aux_emission.exit_code);
2139 outcome.stderr = aux_emission.stderr;
2140 return outcome;
2141 }
2142 RunOutcome {
2143 stdout,
2144 stderr,
2145 exit_code: aux_emission.exit_code,
2146 }
2147}
2148
2149fn finalize_harnpack_error(
2154 mut stderr: String,
2155 json_session: Option<JsonRunSession>,
2156 summary: Option<&RunSummaryOptions>,
2157 phase: Option<&RunPhaseOptions>,
2158 rusage: Option<&RunRusageOptions>,
2159 started: Instant,
2160 err: HarnpackError,
2161) -> RunOutcome {
2162 let code = err.code;
2163 let message = err.message;
2164 stderr.push_str(&format!("error: {message}\n"));
2165 finalize_run_error(
2166 String::new(),
2167 stderr,
2168 json_session,
2169 summary,
2170 phase,
2171 rusage,
2172 started,
2173 None,
2174 None,
2175 0,
2176 None,
2177 code,
2178 message,
2179 )
2180}
2181
2182fn finalize_harnpack_dry_run(
2187 mut stderr: String,
2188 json_session: Option<JsonRunSession>,
2189 summary_options: Option<&RunSummaryOptions>,
2190 phase_options: Option<&RunPhaseOptions>,
2191 rusage_options: Option<&RunRusageOptions>,
2192 started: Instant,
2193 cpu_ms_total: Option<u64>,
2194 prepared: &PreparedHarnpack,
2195) -> RunOutcome {
2196 let summary = format!(
2197 "[harn] harnpack verify ok: bundle_hash={}, signature_verified={}, cache_hit={}\n",
2198 prepared.bundle_hash, prepared.signature_verified, prepared.cache_hit
2199 );
2200 stderr.push_str(&summary);
2201 let aux_emission = emit_run_aux_for_exit(
2202 summary_options,
2203 phase_options,
2204 rusage_options,
2205 started,
2206 0,
2207 None,
2208 None,
2209 None,
2210 0,
2211 cpu_ms_total,
2212 json_session.is_some(),
2213 &mut stderr,
2214 );
2215 if let Some(session) = json_session {
2216 if let Some(error) = aux_emission.error {
2217 let mut outcome = session.finalize_error(
2218 "run_aux",
2219 format!("failed to emit auxiliary run JSON: {error}"),
2220 1,
2221 );
2222 outcome.stderr = aux_emission.stderr;
2223 return outcome;
2224 }
2225 let value = serde_json::json!({
2226 "bundle_hash": prepared.bundle_hash,
2227 "signature_verified": prepared.signature_verified,
2228 "key_id": prepared.key_id,
2229 "cache_hit": prepared.cache_hit,
2230 "dry_run_verify": true,
2231 });
2232 let mut outcome = session.finalize_result(value, aux_emission.exit_code);
2233 outcome.stderr = aux_emission.stderr;
2234 return outcome;
2235 }
2236 RunOutcome {
2237 stdout: String::new(),
2238 stderr,
2239 exit_code: aux_emission.exit_code,
2240 }
2241}
2242
2243fn render_return_value_error(value: &harn_vm::VmValue) -> String {
2244 let harn_vm::VmValue::EnumVariant(enum_variant) = value else {
2245 return String::new();
2246 };
2247 if !enum_variant.is_variant("Result", "Err") {
2248 return String::new();
2249 }
2250 let rendered = enum_variant
2251 .fields
2252 .first()
2253 .map(|p| p.display())
2254 .unwrap_or_default();
2255 if rendered.is_empty() {
2256 "error\n".to_string()
2257 } else if rendered.ends_with('\n') {
2258 rendered
2259 } else {
2260 format!("{rendered}\n")
2261 }
2262}
2263
2264pub(crate) async fn connect_mcp_servers(
2273 servers: &[package::McpServerConfig],
2274 vm: &mut harn_vm::Vm,
2275) {
2276 use std::collections::BTreeMap;
2277 use std::time::Duration;
2278
2279 let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
2280 let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
2281
2282 for server in servers {
2283 let resolved_auth = match mcp::resolve_auth_for_server(server).await {
2284 Ok(resolution) => resolution,
2285 Err(error) => {
2286 eprintln!(
2287 "warning: mcp: failed to load auth for '{}': {}",
2288 server.name, error
2289 );
2290 AuthResolution::None
2291 }
2292 };
2293 let spec = serde_json::json!({
2294 "name": server.name,
2295 "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
2296 "command": server.command,
2297 "args": server.args,
2298 "env": server.env,
2299 "url": server.url,
2300 "auth_token": match resolved_auth {
2301 AuthResolution::Bearer(token) => Some(token),
2302 AuthResolution::None => server.auth_token.clone(),
2303 },
2304 "protocol_version": server.protocol_version,
2305 "protocol_mode": server.protocol_mode,
2306 "proxy_server_name": server.proxy_server_name,
2307 });
2308
2309 registrations.push(harn_vm::RegisteredMcpServer {
2312 name: server.name.clone(),
2313 spec: spec.clone(),
2314 lazy: server.lazy,
2315 card: server.card.clone(),
2316 keep_alive: server.keep_alive_ms.map(Duration::from_millis),
2317 });
2318
2319 if server.lazy {
2320 eprintln!(
2321 "[harn] mcp: deferred '{}' (lazy, boots on first use)",
2322 server.name
2323 );
2324 continue;
2325 }
2326
2327 match harn_vm::connect_mcp_server_from_json(&spec).await {
2328 Ok(handle) => {
2329 eprintln!("[harn] mcp: connected to '{}'", server.name);
2330 harn_vm::mcp_install_active(&server.name, handle.clone());
2331 mcp_dict.insert(server.name.clone(), harn_vm::VmValue::mcp_client(handle));
2332 }
2333 Err(e) => {
2334 eprintln!(
2335 "warning: mcp: failed to connect to '{}': {}",
2336 server.name, e
2337 );
2338 }
2339 }
2340 }
2341
2342 harn_vm::mcp_register_servers(registrations);
2345
2346 if !mcp_dict.is_empty() {
2347 vm.set_global("mcp", harn_vm::VmValue::Dict(std::sync::Arc::new(mcp_dict)));
2348 }
2349}
2350
2351pub(crate) fn render_trace_summary() -> String {
2352 use std::fmt::Write;
2353 let entries = harn_vm::llm::take_trace();
2354 if entries.is_empty() {
2355 return String::new();
2356 }
2357 let mut out = String::new();
2358 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
2359 let mut total_input = 0i64;
2360 let mut total_output = 0i64;
2361 let mut total_ms = 0u64;
2362 for (i, entry) in entries.iter().enumerate() {
2363 let _ = writeln!(
2364 out,
2365 " #{}: {} | {} in + {} out tokens | {} ms",
2366 i + 1,
2367 entry.model,
2368 entry.input_tokens,
2369 entry.output_tokens,
2370 entry.duration_ms,
2371 );
2372 total_input += entry.input_tokens;
2373 total_output += entry.output_tokens;
2374 total_ms += entry.duration_ms;
2375 }
2376 let total_tokens = total_input + total_output;
2377 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
2379 let _ = writeln!(
2380 out,
2381 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
2382 entries.len(),
2383 if entries.len() == 1 { "" } else { "s" },
2384 total_tokens,
2385 total_input,
2386 total_output,
2387 total_ms,
2388 cost,
2389 );
2390 out
2391}
2392
2393pub(crate) async fn run_file_mcp_serve(
2407 path: &str,
2408 card_source: Option<&str>,
2409 mode: RunFileMcpServeMode,
2410) {
2411 let mut diagnostics = String::new();
2412 let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut diagnostics)
2413 else {
2414 eprint!("{diagnostics}");
2415 process::exit(1);
2416 };
2417 if !diagnostics.is_empty() {
2418 eprint!("{diagnostics}");
2419 }
2420
2421 let mut vm = harn_vm::Vm::new();
2422 harn_vm::register_vm_stdlib(&mut vm);
2423 crate::install_default_hostlib(&mut vm);
2424 let source_parent = std::path::Path::new(path)
2425 .parent()
2426 .unwrap_or(std::path::Path::new("."));
2427 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
2428 let store_base = project_root.as_deref().unwrap_or(source_parent);
2429 harn_vm::register_store_builtins(&mut vm, store_base);
2430 harn_vm::register_metadata_builtins(&mut vm, store_base);
2431 let pipeline_name = std::path::Path::new(path)
2432 .file_stem()
2433 .and_then(|s| s.to_str())
2434 .unwrap_or("default");
2435 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
2436 vm.set_source_info(path, &source);
2437 if let Some(ref root) = project_root {
2438 vm.set_project_root(root);
2439 }
2440 if let Some(p) = std::path::Path::new(path).parent() {
2441 if !p.as_os_str().is_empty() {
2442 vm.set_source_dir(p);
2443 }
2444 }
2445
2446 let loaded = load_skills(&SkillLoaderInputs {
2448 cli_dirs: Vec::new(),
2449 source_path: Some(std::path::PathBuf::from(path)),
2450 });
2451 emit_loader_warnings(&loaded.loader_warnings);
2452 install_skills_global(&mut vm, &loaded);
2453
2454 let extensions = package::load_runtime_extensions(Path::new(path));
2455 package::install_runtime_extensions(&extensions);
2456 if let Some(manifest) = extensions.root_manifest.as_ref() {
2457 if !manifest.mcp.is_empty() {
2458 connect_mcp_servers(&manifest.mcp, &mut vm).await;
2459 }
2460 }
2461 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
2462 eprintln!("error: failed to install manifest triggers: {error}");
2463 process::exit(1);
2464 }
2465 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
2466 eprintln!("error: failed to install manifest hooks: {error}");
2467 process::exit(1);
2468 }
2469
2470 let local = tokio::task::LocalSet::new();
2471 local
2472 .run_until(async {
2473 match vm.execute(&chunk).await {
2474 Ok(_) => {}
2475 Err(e) => {
2476 eprint!("{}", vm.format_runtime_error(&e));
2477 process::exit(1);
2478 }
2479 }
2480
2481 let output = vm.output();
2483 if !output.is_empty() {
2484 eprint!("{output}");
2485 }
2486
2487 let registry = match harn_vm::take_mcp_serve_registry() {
2488 Some(r) => r,
2489 None => {
2490 eprintln!("error: pipeline did not call mcp_serve(registry)");
2491 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
2492 process::exit(1);
2493 }
2494 };
2495
2496 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
2497 Ok(t) => t,
2498 Err(e) => {
2499 eprintln!("error: {e}");
2500 process::exit(1);
2501 }
2502 };
2503
2504 let resources = harn_vm::take_mcp_serve_resources();
2505 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
2506 let prompts = harn_vm::take_mcp_serve_prompts();
2507
2508 let server_name = std::path::Path::new(path)
2509 .file_stem()
2510 .and_then(|s| s.to_str())
2511 .unwrap_or("harn")
2512 .to_string();
2513
2514 let mut caps = Vec::new();
2515 if !tools.is_empty() {
2516 caps.push(format!(
2517 "{} tool{}",
2518 tools.len(),
2519 if tools.len() == 1 { "" } else { "s" }
2520 ));
2521 }
2522 let total_resources = resources.len() + resource_templates.len();
2523 if total_resources > 0 {
2524 caps.push(format!(
2525 "{total_resources} resource{}",
2526 if total_resources == 1 { "" } else { "s" }
2527 ));
2528 }
2529 if !prompts.is_empty() {
2530 caps.push(format!(
2531 "{} prompt{}",
2532 prompts.len(),
2533 if prompts.len() == 1 { "" } else { "s" }
2534 ));
2535 }
2536 eprintln!(
2537 "[harn] serve mcp: serving {} as '{server_name}'",
2538 caps.join(", ")
2539 );
2540
2541 let mut server =
2542 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
2543 if let Some(source) = card_source {
2544 match resolve_card_source(source) {
2545 Ok(card) => server = server.with_server_card(card),
2546 Err(e) => {
2547 eprintln!("error: --card: {e}");
2548 process::exit(1);
2549 }
2550 }
2551 }
2552 match mode {
2553 RunFileMcpServeMode::Stdio => {
2554 if let Err(e) = server.run(&mut vm).await {
2555 eprintln!("error: MCP server error: {e}");
2556 process::exit(1);
2557 }
2558 }
2559 RunFileMcpServeMode::Http(http) => {
2560 let RunFileMcpServeHttp {
2561 options,
2562 auth_policy,
2563 } = *http;
2564 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
2565 server,
2566 vm,
2567 options,
2568 auth_policy,
2569 )
2570 .await
2571 {
2572 eprintln!("error: MCP server error: {e}");
2573 process::exit(1);
2574 }
2575 }
2576 }
2577 })
2578 .await;
2579}
2580
2581pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
2586 let trimmed = source.trim_start();
2587 if trimmed.starts_with('{') || trimmed.starts_with('[') {
2588 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
2589 }
2590 let path = std::path::Path::new(source);
2591 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
2592}
2593
2594pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
2595 use notify::{Event, EventKind, RecursiveMode, Watcher};
2596
2597 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
2598 eprintln!("Error: {e}");
2599 process::exit(1);
2600 });
2601 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
2602
2603 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
2604 run_file(
2605 path,
2606 false,
2607 denied_builtins.clone(),
2608 Vec::new(),
2609 CliLlmMockMode::Off,
2610 None,
2611 RunProfileOptions::default(),
2612 )
2613 .await;
2614
2615 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
2616 let _watcher = {
2617 let tx = tx.clone();
2618 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
2619 if let Ok(event) = res {
2620 if matches!(
2621 event.kind,
2622 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
2623 ) {
2624 let has_harn = event
2625 .paths
2626 .iter()
2627 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
2628 if has_harn {
2629 let _ = tx.blocking_send(());
2630 }
2631 }
2632 }
2633 })
2634 .unwrap_or_else(|e| {
2635 eprintln!("Error setting up file watcher: {e}");
2636 process::exit(1);
2637 });
2638 watcher
2639 .watch(watch_dir, RecursiveMode::Recursive)
2640 .unwrap_or_else(|e| {
2641 eprintln!("Error watching directory: {e}");
2642 process::exit(1);
2643 });
2644 watcher };
2646
2647 eprintln!(
2648 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
2649 watch_dir.display()
2650 );
2651
2652 loop {
2653 rx.recv().await;
2654 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2656 while rx.try_recv().is_ok() {}
2657
2658 eprintln!();
2659 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
2660 run_file(
2661 path,
2662 false,
2663 denied_builtins.clone(),
2664 Vec::new(),
2665 CliLlmMockMode::Off,
2666 None,
2667 RunProfileOptions::default(),
2668 )
2669 .await;
2670 }
2671}
2672
2673#[cfg(test)]
2674mod tests;