1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::Instant;
9
10use harn_parser::DiagnosticSeverity;
11use harn_vm::event_log::EventLog;
12use serde::Serialize;
13
14use crate::commands::mcp::{self, AuthResolution};
15use crate::commands::time::{self, PhaseRecord, RunTiming};
16use crate::package;
17use crate::parse_source_file;
18use crate::skill_loader::{
19 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
20 SkillLoaderInputs,
21};
22
23mod explain_cost;
24pub mod harnpack;
25pub mod json_events;
26
27use self::harnpack::{HarnpackError, HarnpackRunOptions, PreparedHarnpack};
28use self::json_events::NdjsonEmitter;
29
30#[derive(Clone, Default)]
32pub struct RunJsonOptions {
33 pub quiet: bool,
36}
37
38#[derive(Clone, Debug)]
40pub struct RunSummaryOptions {
41 pub sink: RunJsonSink,
42}
43
44#[derive(Clone, Debug)]
45pub struct RunPhaseOptions {
46 pub sink: RunJsonSink,
47}
48
49#[derive(Clone, Debug)]
50pub struct RunRusageOptions {
51 pub sink: RunJsonSink,
52}
53
54#[derive(Clone, Debug, Default)]
55pub struct RunAuxOptions {
56 pub summary: Option<RunSummaryOptions>,
57 pub phase: Option<RunPhaseOptions>,
58 pub rusage: Option<RunRusageOptions>,
59}
60
61#[derive(Clone, Debug)]
62pub struct RunJsonSink {
63 pub target: RunJsonSinkTarget,
64 pub fd_flag: &'static str,
65}
66
67#[derive(Clone, Debug)]
68pub enum RunJsonSinkTarget {
69 Stderr,
73 File(PathBuf),
74 Fd(i32),
75}
76
77#[derive(Serialize)]
78struct RunSummary<'a> {
79 schema_version: u32,
80 event: &'static str,
81 wall_time_ms: u64,
82 exit_code: i32,
83 llm: RunSummaryLlm,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 profile: Option<&'a harn_vm::profile::RunProfile>,
86}
87
88#[derive(Serialize)]
89struct RunSummaryLlm {
90 call_count: i64,
91 input_tokens: i64,
92 output_tokens: i64,
93 time_ms: i64,
94 cost_usd: f64,
95}
96
97pub const RUN_SUMMARY_SCHEMA_VERSION: u32 = 1;
98pub const RUN_PHASE_SCHEMA_VERSION: u32 = 1;
99pub const RUN_RUSAGE_SCHEMA_VERSION: u32 = 1;
100
101#[derive(Serialize)]
102struct RunPhaseEvent {
103 schema_version: u32,
104 event: &'static str,
105 phases: Vec<PhaseRecord>,
106}
107
108#[derive(Serialize)]
109struct RunRusageEvent {
110 schema_version: u32,
111 event: &'static str,
112 cpu_ms: u64,
113}
114
115pub(crate) fn run_summary_options_from_args(
116 args: &crate::cli::RunArgs,
117) -> Option<RunSummaryOptions> {
118 args.emit_summary_json.then(|| RunSummaryOptions {
119 sink: build_run_json_sink(args.summary_file.clone(), args.summary_fd, "--summary-fd"),
120 })
121}
122
123pub(crate) fn run_aux_options_from_args(args: &crate::cli::RunArgs) -> RunAuxOptions {
124 RunAuxOptions {
125 summary: run_summary_options_from_args(args),
126 phase: run_phase_options_from_args(args),
127 rusage: run_rusage_options_from_args(args),
128 }
129}
130
131pub(crate) fn run_phase_options_from_args(args: &crate::cli::RunArgs) -> Option<RunPhaseOptions> {
132 args.emit_phase_json.then(|| RunPhaseOptions {
133 sink: build_run_json_sink(args.phase_file.clone(), args.phase_fd, "--phase-fd"),
134 })
135}
136
137pub(crate) fn run_rusage_options_from_args(args: &crate::cli::RunArgs) -> Option<RunRusageOptions> {
138 args.emit_rusage_json.then(|| RunRusageOptions {
139 sink: build_run_json_sink(args.rusage_file.clone(), args.rusage_fd, "--rusage-fd"),
140 })
141}
142
143fn build_run_json_sink(
144 file: Option<PathBuf>,
145 fd: Option<i32>,
146 fd_flag: &'static str,
147) -> RunJsonSink {
148 RunJsonSink {
149 target: if let Some(path) = file {
150 RunJsonSinkTarget::File(path)
151 } else if let Some(fd) = fd {
152 RunJsonSinkTarget::Fd(fd)
153 } else {
154 RunJsonSinkTarget::Stderr
155 },
156 fd_flag,
157 }
158}
159
160pub(crate) enum RunFileMcpServeMode {
161 Stdio,
162 Http {
163 options: harn_serve::McpHttpServeOptions,
164 auth_policy: harn_serve::AuthPolicy,
165 },
166}
167
168const CORE_BUILTINS: &[&str] = &[
170 "println",
171 "print",
172 "log",
173 "type_of",
174 "to_string",
175 "to_int",
176 "to_float",
177 "len",
178 "assert",
179 "assert_eq",
180 "assert_ne",
181 "json_parse",
182 "json_stringify",
183 "runtime_context",
184 "task_current",
185 "runtime_context_values",
186 "runtime_context_get",
187 "runtime_context_set",
188 "runtime_context_clear",
189];
190
191pub(crate) fn build_denied_builtins(
196 deny_csv: Option<&str>,
197 allow_csv: Option<&str>,
198) -> HashSet<String> {
199 if let Some(csv) = deny_csv {
200 csv.split(',')
201 .map(|s| s.trim().to_string())
202 .filter(|s| !s.is_empty())
203 .collect()
204 } else if let Some(csv) = allow_csv {
205 let allowed: HashSet<String> = csv
208 .split(',')
209 .map(|s| s.trim().to_string())
210 .filter(|s| !s.is_empty())
211 .collect();
212 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
213
214 let mut tmp = harn_vm::Vm::new();
216 harn_vm::register_vm_stdlib(&mut tmp);
217 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
218 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
219
220 tmp.builtin_names()
221 .into_iter()
222 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
223 .collect()
224 } else {
225 HashSet::new()
226 }
227}
228
229pub(crate) struct LoadedChunk {
233 pub(crate) source: String,
234 pub(crate) chunk: harn_vm::Chunk,
235}
236
237pub(crate) fn compile_or_load_chunk_for_run(
249 path: &str,
250 stderr: &mut String,
251) -> Option<LoadedChunk> {
252 compile_or_load_chunk_with_timing(path, stderr, None)
253}
254
255#[allow(clippy::needless_option_as_deref)]
265pub(crate) fn compile_or_load_chunk_with_timing(
266 path: &str,
267 stderr: &mut String,
268 mut timing: Option<&mut RunTiming>,
269) -> Option<LoadedChunk> {
270 let source = match fs::read_to_string(path) {
271 Ok(s) => s,
272 Err(e) => {
273 stderr.push_str(&format!("Error reading {path}: {e}\n"));
274 return None;
275 }
276 };
277 if let Some(t) = timing.as_deref_mut() {
278 t.input_bytes = source.len() as u64;
279 }
280
281 let compile_phase_start = Instant::now();
282 let lookup = harn_vm::bytecode_cache::load(Path::new(path), &source);
283 if let Some(chunk) = lookup.chunk {
284 if let Some(t) = timing.as_deref_mut() {
285 t.cache_hit = true;
286 t.bytecode_compile = compile_phase_start.elapsed();
287 }
288 return Some(LoadedChunk { source, chunk });
289 }
290 if let Some(t) = timing.as_deref_mut() {
291 t.cache_hit = false;
292 }
293
294 let parse_start = Instant::now();
295 let program = parse_source_for_run(path, &source, stderr)?;
296 if let Some(t) = timing.as_deref_mut() {
297 t.parse = parse_start.elapsed();
298 }
299
300 let typecheck_start = Instant::now();
301 let mut had_type_error = false;
302 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
303 for diag in &type_diagnostics {
304 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
305 if matches!(diag.severity, DiagnosticSeverity::Error) {
306 had_type_error = true;
307 }
308 stderr.push_str(&rendered);
309 }
310 if let Some(t) = timing.as_deref_mut() {
311 t.typecheck = typecheck_start.elapsed();
312 }
313 if had_type_error {
314 return None;
315 }
316
317 let compile_step_start = Instant::now();
318 let chunk = match harn_vm::Compiler::new().compile(&program) {
319 Ok(c) => c,
320 Err(e) => {
321 stderr.push_str(&format!("error: compile error: {e}\n"));
322 return None;
323 }
324 };
325
326 if let Err(err) = harn_vm::bytecode_cache::store(&lookup.key, &chunk) {
331 if std::env::var_os("HARN_BYTECODE_CACHE_DEBUG").is_some() {
332 eprintln!("[harn] bytecode cache write skipped: {err}");
333 }
334 }
335 if let Some(t) = timing.as_deref_mut() {
336 t.bytecode_compile = compile_step_start.elapsed();
337 }
338
339 Some(LoadedChunk { source, chunk })
340}
341
342fn parse_source_for_run(
343 path: &str,
344 source: &str,
345 stderr: &mut String,
346) -> Option<Vec<harn_parser::SNode>> {
347 let mut lexer = harn_lexer::Lexer::new(source);
348 let tokens = match lexer.tokenize() {
349 Ok(tokens) => tokens,
350 Err(error) => {
351 let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
352 source,
353 path,
354 &error_span_from_lex(&error),
355 "error",
356 harn_parser::diagnostic::lexer_error_code(&error),
357 &error.to_string(),
358 Some("here"),
359 None,
360 );
361 stderr.push_str(&diagnostic);
362 return None;
363 }
364 };
365
366 let mut parser = harn_parser::Parser::new(tokens);
367 match parser.parse() {
368 Ok(program) => Some(program),
369 Err(error) => {
370 if parser.all_errors().is_empty() {
371 render_parse_error(path, source, &error, stderr);
372 } else {
373 for error in parser.all_errors() {
374 render_parse_error(path, source, error, stderr);
375 }
376 }
377 None
378 }
379 }
380}
381
382fn render_parse_error(
383 path: &str,
384 source: &str,
385 error: &harn_parser::ParserError,
386 stderr: &mut String,
387) {
388 let span = error_span_from_parse(error);
389 let diagnostic = harn_parser::diagnostic::render_diagnostic_with_code(
390 source,
391 path,
392 &span,
393 "error",
394 harn_parser::diagnostic::parser_error_code(error),
395 &harn_parser::diagnostic::parser_error_message(error),
396 Some(harn_parser::diagnostic::parser_error_label(error)),
397 harn_parser::diagnostic::parser_error_help(error),
398 );
399 stderr.push_str(&diagnostic);
400}
401
402fn error_span_from_lex(error: &harn_lexer::LexerError) -> harn_lexer::Span {
403 match error {
404 harn_lexer::LexerError::UnexpectedCharacter(_, span)
405 | harn_lexer::LexerError::UnterminatedString(span)
406 | harn_lexer::LexerError::UnterminatedBlockComment(span) => *span,
407 }
408}
409
410fn error_span_from_parse(error: &harn_parser::ParserError) -> harn_lexer::Span {
411 match error {
412 harn_parser::ParserError::Unexpected { span, .. } => *span,
413 harn_parser::ParserError::UnexpectedEof { span, .. } => *span,
414 }
415}
416
417fn typecheck_with_imports(
422 program: &[harn_parser::SNode],
423 path: &Path,
424 source: &str,
425) -> Vec<harn_parser::TypeDiagnostic> {
426 if let Err(error) = package::ensure_dependencies_materialized(path) {
427 eprintln!("error: {error}");
428 process::exit(1);
429 }
430 let graph = harn_modules::build(&[path.to_path_buf()]);
431 let mut checker = harn_parser::TypeChecker::new();
432 if let Some(imported) = graph.imported_names_for_file(path) {
433 checker = checker.with_imported_names(imported);
434 }
435 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
436 checker = checker.with_imported_type_decls(imported);
437 }
438 if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
439 checker = checker.with_imported_callable_decls(imported);
440 }
441 checker.check_with_source(program, source)
442}
443
444pub(crate) fn prepare_eval_temp_file(
455 code: &str,
456) -> Result<(String, tempfile::NamedTempFile), String> {
457 let (header, body) = split_eval_header(code);
458 let wrapped = if header.is_empty() {
459 format!("pipeline main(task) {{\n{body}\n}}")
460 } else {
461 format!("{header}\npipeline main(task) {{\n{body}\n}}")
462 };
463
464 let tmp = create_eval_temp_file()?;
465 Ok((wrapped, tmp))
466}
467
468fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
473 if let Some(dir) = std::env::current_dir().ok().as_deref() {
474 match tempfile::Builder::new()
477 .prefix(".harn-eval-")
478 .suffix(".harn")
479 .tempfile_in(dir)
480 {
481 Ok(tmp) => return Ok(tmp),
482 Err(error) => eprintln!(
483 "warning: harn run -e: could not create temp file in {}: {error}; \
484 relative imports will not resolve",
485 dir.display()
486 ),
487 }
488 }
489 tempfile::Builder::new()
490 .prefix("harn-eval-")
491 .suffix(".harn")
492 .tempfile()
493 .map_err(|e| format!("failed to create temp file for -e: {e}"))
494}
495
496fn split_eval_header(code: &str) -> (String, String) {
504 let mut header_end = 0usize;
505 let mut last_kept = 0usize;
506 for (idx, line) in code.lines().enumerate() {
507 let trimmed = line.trim_start();
508 if trimmed.is_empty() || trimmed.starts_with("//") {
509 header_end = idx + 1;
510 continue;
511 }
512 let is_import = trimmed.starts_with("import ")
513 || trimmed.starts_with("import\t")
514 || trimmed.starts_with("import\"")
515 || trimmed.starts_with("pub import ")
516 || trimmed.starts_with("pub import\t");
517 if is_import {
518 header_end = idx + 1;
519 last_kept = idx + 1;
520 } else {
521 break;
522 }
523 }
524 if last_kept == 0 {
525 return (String::new(), code.to_string());
526 }
527 let mut header_lines: Vec<&str> = Vec::new();
528 let mut body_lines: Vec<&str> = Vec::new();
529 for (idx, line) in code.lines().enumerate() {
530 if idx < header_end {
531 header_lines.push(line);
532 } else {
533 body_lines.push(line);
534 }
535 }
536 (header_lines.join("\n"), body_lines.join("\n"))
537}
538
539#[derive(Clone, Debug, Default, PartialEq, Eq)]
540pub enum CliLlmMockMode {
541 #[default]
542 Off,
543 Replay {
544 fixture_path: PathBuf,
545 },
546 Record {
547 fixture_path: PathBuf,
548 },
549}
550
551#[derive(Clone, Debug, Default, PartialEq, Eq)]
552pub struct RunAttestationOptions {
553 pub receipt_out: Option<PathBuf>,
554 pub agent_id: Option<String>,
555}
556
557#[derive(Clone, Debug, Default, PartialEq, Eq)]
562pub struct RunProfileOptions {
563 pub text: bool,
564 pub json_path: Option<PathBuf>,
565}
566
567impl RunProfileOptions {
568 pub fn is_enabled(&self) -> bool {
569 self.text || self.json_path.is_some()
570 }
571}
572
573#[derive(Clone, Debug, PartialEq, Eq)]
574pub struct RunSandboxOptions {
575 pub enabled: bool,
577 pub workspace_root: Option<PathBuf>,
581}
582
583impl Default for RunSandboxOptions {
584 fn default() -> Self {
585 Self {
586 enabled: true,
587 workspace_root: None,
588 }
589 }
590}
591
592impl RunSandboxOptions {
593 pub fn disabled() -> Self {
595 Self {
596 enabled: false,
597 workspace_root: None,
598 }
599 }
600
601 pub fn with_workspace_root(mut self, workspace_root: impl Into<PathBuf>) -> Self {
603 self.workspace_root = Some(workspace_root.into());
604 self
605 }
606}
607
608#[derive(Clone)]
609pub struct RunInterruptTokens {
610 pub cancel_token: Arc<AtomicBool>,
611 pub signal_token: Arc<Mutex<Option<String>>>,
612}
613
614struct ExecuteRunInputs<'a> {
615 path: &'a str,
616 trace: bool,
617 denied_builtins: HashSet<String>,
618 script_argv: Vec<String>,
619 skill_dirs_raw: Vec<String>,
620 llm_mock_mode: CliLlmMockMode,
621 attestation: Option<RunAttestationOptions>,
622 profile: RunProfileOptions,
623 sandbox: RunSandboxOptions,
624 interrupt_tokens: Option<RunInterruptTokens>,
625 json: Option<(RunJsonOptions, Box<dyn io::Write + Send>)>,
626 aux: RunAuxOptions,
627 timing: Option<&'a mut RunTiming>,
628 harnpack: HarnpackRunOptions,
629}
630
631#[derive(Clone, Debug, Default)]
635pub struct RunOutcome {
636 pub stdout: String,
637 pub stderr: String,
638 pub exit_code: i32,
639}
640
641pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
642 harn_vm::llm::clear_cli_llm_mock_mode();
643 match mode {
644 CliLlmMockMode::Off => Ok(()),
645 CliLlmMockMode::Replay { fixture_path } => {
646 let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
647 harn_vm::llm::install_cli_llm_mocks(mocks);
648 Ok(())
649 }
650 CliLlmMockMode::Record { .. } => {
651 harn_vm::llm::enable_cli_llm_mock_recording();
652 Ok(())
653 }
654 }
655}
656
657pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
658 let CliLlmMockMode::Record { fixture_path } = mode else {
659 return Ok(());
660 };
661 if let Some(parent) = fixture_path.parent() {
662 if !parent.as_os_str().is_empty() {
663 fs::create_dir_all(parent).map_err(|error| {
664 format!(
665 "failed to create fixture directory {}: {error}",
666 parent.display()
667 )
668 })?;
669 }
670 }
671
672 let lines = harn_vm::llm::take_cli_llm_recordings()
673 .into_iter()
674 .map(harn_vm::llm::serialize_llm_mock)
675 .collect::<Result<Vec<_>, _>>()?;
676 let body = if lines.is_empty() {
677 String::new()
678 } else {
679 format!("{}\n", lines.join("\n"))
680 };
681 fs::write(fixture_path, body)
682 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
683}
684
685pub(crate) async fn run_file(
686 path: &str,
687 trace: bool,
688 denied_builtins: HashSet<String>,
689 script_argv: Vec<String>,
690 llm_mock_mode: CliLlmMockMode,
691 attestation: Option<RunAttestationOptions>,
692 profile: RunProfileOptions,
693) {
694 run_file_with_skill_dirs(
695 path,
696 trace,
697 denied_builtins,
698 script_argv,
699 Vec::new(),
700 llm_mock_mode,
701 attestation,
702 profile,
703 RunSandboxOptions::default(),
704 None,
705 RunAuxOptions::default(),
706 HarnpackRunOptions::default(),
707 )
708 .await;
709}
710
711pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
712 let outcome = execute_explain_cost(path);
713 if !outcome.stderr.is_empty() {
714 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
715 }
716 if !outcome.stdout.is_empty() {
717 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
718 }
719 if outcome.exit_code != 0 {
720 process::exit(outcome.exit_code);
721 }
722}
723
724#[allow(clippy::too_many_arguments)]
725pub(crate) async fn run_file_with_skill_dirs(
726 path: &str,
727 trace: bool,
728 denied_builtins: HashSet<String>,
729 script_argv: Vec<String>,
730 skill_dirs_raw: Vec<String>,
731 llm_mock_mode: CliLlmMockMode,
732 attestation: Option<RunAttestationOptions>,
733 profile: RunProfileOptions,
734 sandbox: RunSandboxOptions,
735 json: Option<RunJsonOptions>,
736 aux: RunAuxOptions,
737 harnpack: HarnpackRunOptions,
738) {
739 let interrupt_tokens = install_signal_shutdown_handler();
741
742 let _stdout_passthrough = StdoutPassthroughGuard::enable();
743 let json_with_stdout =
744 json.map(|opts| (opts, Box::new(io::stdout()) as Box<dyn io::Write + Send>));
745 let outcome = execute_run_inner(ExecuteRunInputs {
746 path,
747 trace,
748 denied_builtins,
749 script_argv,
750 skill_dirs_raw,
751 llm_mock_mode,
752 attestation,
753 profile,
754 sandbox,
755 interrupt_tokens: Some(interrupt_tokens.clone()),
756 json: json_with_stdout,
757 aux,
758 timing: None,
759 harnpack,
760 })
761 .await;
762
763 if !outcome.stderr.is_empty() {
766 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
767 }
768 if !outcome.stdout.is_empty() {
769 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
770 }
771
772 let mut exit_code = outcome.exit_code;
773 if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
774 exit_code = 124;
775 }
776 if exit_code != 0 {
777 process::exit(exit_code);
778 }
779}
780
781#[allow(clippy::too_many_arguments)]
782pub(crate) async fn run_resume_with_skill_dirs(
783 target: &str,
784 trace: bool,
785 denied_builtins: HashSet<String>,
786 resume_argv: Vec<String>,
787 skill_dirs_raw: Vec<String>,
788 llm_mock_mode: CliLlmMockMode,
789 attestation: Option<RunAttestationOptions>,
790 profile: RunProfileOptions,
791 sandbox: RunSandboxOptions,
792 json: Option<RunJsonOptions>,
793 aux: RunAuxOptions,
794) {
795 let source = r#"import { resume_agent, wait_agent } from "std/agent/workers"
796
797pipeline main(task) {
798 let input = if len(argv) > 1 {
799 argv[1]
800 } else {
801 nil
802 }
803 let handle = resume_agent(argv[0], input, true)
804 return wait_agent(handle)
805}
806"#;
807 let tmp = create_eval_temp_file().unwrap_or_else(|e| {
808 eprintln!("error: {e}");
809 process::exit(1);
810 });
811 let tmp_path = tmp.path().to_path_buf();
812 if let Err(error) = fs::write(&tmp_path, source) {
813 eprintln!("error: failed to write temp file for --resume: {error}");
814 process::exit(1);
815 }
816 let mut argv = Vec::with_capacity(resume_argv.len() + 1);
817 argv.push(target.to_string());
818 argv.extend(resume_argv);
819 let tmp_str = tmp_path.to_string_lossy().into_owned();
820 run_file_with_skill_dirs(
821 &tmp_str,
822 trace,
823 denied_builtins,
824 argv,
825 skill_dirs_raw,
826 llm_mock_mode,
827 attestation,
828 profile,
829 sandbox,
830 json,
831 aux,
832 HarnpackRunOptions::default(),
833 )
834 .await;
835}
836
837pub fn execute_explain_cost(path: &str) -> RunOutcome {
838 let stdout = String::new();
839 let mut stderr = String::new();
840
841 let (source, program) = parse_source_file(path);
842
843 let mut had_type_error = false;
844 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
845 for diag in &type_diagnostics {
846 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
847 if matches!(diag.severity, DiagnosticSeverity::Error) {
848 had_type_error = true;
849 }
850 stderr.push_str(&rendered);
851 }
852 if had_type_error {
853 return RunOutcome {
854 stdout,
855 stderr,
856 exit_code: 1,
857 };
858 }
859
860 let extensions = package::load_runtime_extensions(Path::new(path));
861 package::install_runtime_extensions(&extensions);
862 RunOutcome {
863 stdout: explain_cost::render_explain_cost(path, &program),
864 stderr,
865 exit_code: 0,
866 }
867}
868
869pub(crate) struct StdoutPassthroughGuard {
870 previous: bool,
871}
872
873impl StdoutPassthroughGuard {
874 pub(crate) fn enable() -> Self {
875 Self {
876 previous: harn_vm::set_stdout_passthrough(true),
877 }
878 }
879}
880
881impl Drop for StdoutPassthroughGuard {
882 fn drop(&mut self) {
883 harn_vm::set_stdout_passthrough(self.previous);
884 }
885}
886
887struct ExecutionPolicyGuard;
888
889impl Drop for ExecutionPolicyGuard {
890 fn drop(&mut self) {
891 harn_vm::orchestration::pop_execution_policy();
892 }
893}
894
895struct RunSandboxScope {
896 _execution_policy: Option<ExecutionPolicyGuard>,
897 _egress_policy: Option<harn_vm::egress::ExplicitEgressPolicyGuard>,
898}
899
900impl RunSandboxScope {
901 fn disabled() -> Self {
902 Self {
903 _execution_policy: None,
904 _egress_policy: None,
905 }
906 }
907}
908
909fn install_run_sandbox_scope(
910 options: &RunSandboxOptions,
911 workspace_root: &Path,
912 stderr: &mut String,
913) -> RunSandboxScope {
914 if !options.enabled {
915 stderr.push_str(
916 "warning: harn run --no-sandbox disables filesystem, process, and egress sandbox defaults\n",
917 );
918 return RunSandboxScope::disabled();
919 }
920
921 let execution_policy = if harn_vm::orchestration::current_execution_policy().is_none() {
922 harn_vm::orchestration::push_execution_policy(default_run_capability_policy(
923 workspace_root,
924 ));
925 Some(ExecutionPolicyGuard)
926 } else {
927 None
928 };
929 let egress_policy = Some(harn_vm::egress::require_explicit_egress_policy_for_host());
930
931 RunSandboxScope {
932 _execution_policy: execution_policy,
933 _egress_policy: egress_policy,
934 }
935}
936
937fn default_run_capability_policy(
938 workspace_root: &Path,
939) -> harn_vm::orchestration::CapabilityPolicy {
940 harn_vm::orchestration::CapabilityPolicy {
941 workspace_roots: vec![normalize_run_workspace_root(workspace_root)
942 .display()
943 .to_string()],
944 side_effect_level: Some("process_exec".to_string()),
945 sandbox_profile: harn_vm::orchestration::SandboxProfile::Worktree,
946 ..harn_vm::orchestration::CapabilityPolicy::default()
947 }
948}
949
950fn normalize_run_workspace_root(path: &Path) -> PathBuf {
951 if path.is_absolute() {
952 return path.to_path_buf();
953 }
954 std::env::current_dir()
955 .map(|cwd| cwd.join(path))
956 .unwrap_or_else(|_| path.to_path_buf())
957}
958
959fn default_run_workspace_root(project_root: Option<&Path>, source_parent: &Path) -> PathBuf {
960 project_root
961 .map(Path::to_path_buf)
962 .or_else(|| std::env::current_dir().ok())
963 .unwrap_or_else(|| source_parent.to_path_buf())
964}
965
966fn run_sandbox_attestation(sandbox: &RunSandboxOptions) -> serde_json::Value {
967 let active_policy = harn_vm::orchestration::current_execution_policy();
968 let active = active_policy.is_some();
969 let workspace_roots = active_policy
970 .as_ref()
971 .map(|policy| policy.workspace_roots.clone())
972 .unwrap_or_default();
973 let profile = active_policy
974 .as_ref()
975 .map(|policy| policy.sandbox_profile.as_str())
976 .unwrap_or("unrestricted");
977 let egress = if sandbox.enabled {
978 "explicit_policy_required"
979 } else if active {
980 "host_policy"
981 } else {
982 "unrestricted"
983 };
984
985 serde_json::json!({
986 "run_default_enabled": sandbox.enabled,
987 "active": active,
988 "workspace_roots": workspace_roots,
989 "profile": profile,
990 "egress": egress,
991 })
992}
993
994const FIRST_SIGNAL_MESSAGE: &str =
1003 "[harn] signal received, interrupting VM (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
1004
1005fn install_signal_shutdown_handler() -> RunInterruptTokens {
1006 let tokens = RunInterruptTokens {
1007 cancel_token: Arc::new(AtomicBool::new(false)),
1008 signal_token: Arc::new(Mutex::new(None)),
1009 };
1010 let tokens_clone = tokens.clone();
1011 tokio::spawn(async move {
1012 #[cfg(unix)]
1013 {
1014 use tokio::signal::unix::{signal, SignalKind};
1015 let sigterm_stream = signal(SignalKind::terminate()).ok();
1020 let sigint_stream = signal(SignalKind::interrupt()).ok();
1021 let sighup_stream = signal(SignalKind::hangup()).ok();
1022 let (mut sigterm, mut sigint, mut sighup) =
1023 match (sigterm_stream, sigint_stream, sighup_stream) {
1024 (Some(t), Some(i), Some(h)) => (t, i, h),
1025 _ => {
1026 eprintln!(
1027 "[harn] signal handlers unavailable in this environment; \
1028 continuing without graceful-shutdown interception"
1029 );
1030 return;
1031 }
1032 };
1033 let mut seen_signal = false;
1034 loop {
1035 let signal_name = tokio::select! {
1036 _ = sigterm.recv() => "SIGTERM",
1037 _ = sigint.recv() => "SIGINT",
1038 _ = sighup.recv() => "SIGHUP",
1039 };
1040 if seen_signal {
1041 eprintln!("[harn] second signal received, terminating");
1042 process::exit(124);
1043 }
1044 seen_signal = true;
1045 request_vm_interrupt(&tokens_clone, signal_name);
1046 eprintln!("{FIRST_SIGNAL_MESSAGE}");
1047 }
1048 }
1049 #[cfg(not(unix))]
1050 {
1051 let mut seen_signal = false;
1052 loop {
1053 let _ = tokio::signal::ctrl_c().await;
1054 if seen_signal {
1055 eprintln!("[harn] second signal received, terminating");
1056 process::exit(124);
1057 }
1058 seen_signal = true;
1059 request_vm_interrupt(&tokens_clone, "SIGINT");
1060 eprintln!("{FIRST_SIGNAL_MESSAGE}");
1061 }
1062 }
1063 });
1064 tokens
1065}
1066
1067fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
1068 if let Ok(mut signal) = tokens.signal_token.lock() {
1069 *signal = Some(signal_name.to_string());
1070 }
1071 tokens.cancel_token.store(true, Ordering::SeqCst);
1072}
1073
1074pub async fn execute_run(
1080 path: &str,
1081 trace: bool,
1082 denied_builtins: HashSet<String>,
1083 script_argv: Vec<String>,
1084 skill_dirs_raw: Vec<String>,
1085 llm_mock_mode: CliLlmMockMode,
1086 attestation: Option<RunAttestationOptions>,
1087 profile: RunProfileOptions,
1088) -> RunOutcome {
1089 execute_run_with_harnpack_and_sandbox_options(
1090 path,
1091 trace,
1092 denied_builtins,
1093 script_argv,
1094 skill_dirs_raw,
1095 llm_mock_mode,
1096 attestation,
1097 profile,
1098 RunSandboxOptions::default(),
1099 HarnpackRunOptions::default(),
1100 )
1101 .await
1102}
1103
1104#[allow(clippy::too_many_arguments)]
1108pub async fn execute_run_with_sandbox_options(
1109 path: &str,
1110 trace: bool,
1111 denied_builtins: HashSet<String>,
1112 script_argv: Vec<String>,
1113 skill_dirs_raw: Vec<String>,
1114 llm_mock_mode: CliLlmMockMode,
1115 attestation: Option<RunAttestationOptions>,
1116 profile: RunProfileOptions,
1117 sandbox: RunSandboxOptions,
1118) -> RunOutcome {
1119 execute_run_with_harnpack_and_sandbox_options(
1120 path,
1121 trace,
1122 denied_builtins,
1123 script_argv,
1124 skill_dirs_raw,
1125 llm_mock_mode,
1126 attestation,
1127 profile,
1128 sandbox,
1129 HarnpackRunOptions::default(),
1130 )
1131 .await
1132}
1133
1134#[allow(clippy::too_many_arguments)]
1139pub async fn execute_run_with_harnpack_options(
1140 path: &str,
1141 trace: bool,
1142 denied_builtins: HashSet<String>,
1143 script_argv: Vec<String>,
1144 skill_dirs_raw: Vec<String>,
1145 llm_mock_mode: CliLlmMockMode,
1146 attestation: Option<RunAttestationOptions>,
1147 profile: RunProfileOptions,
1148 harnpack: HarnpackRunOptions,
1149) -> RunOutcome {
1150 execute_run_with_harnpack_and_sandbox_options(
1151 path,
1152 trace,
1153 denied_builtins,
1154 script_argv,
1155 skill_dirs_raw,
1156 llm_mock_mode,
1157 attestation,
1158 profile,
1159 RunSandboxOptions::default(),
1160 harnpack,
1161 )
1162 .await
1163}
1164
1165#[allow(clippy::too_many_arguments)]
1166async fn execute_run_with_harnpack_and_sandbox_options(
1167 path: &str,
1168 trace: bool,
1169 denied_builtins: HashSet<String>,
1170 script_argv: Vec<String>,
1171 skill_dirs_raw: Vec<String>,
1172 llm_mock_mode: CliLlmMockMode,
1173 attestation: Option<RunAttestationOptions>,
1174 profile: RunProfileOptions,
1175 sandbox: RunSandboxOptions,
1176 harnpack: HarnpackRunOptions,
1177) -> RunOutcome {
1178 execute_run_inner(ExecuteRunInputs {
1179 path,
1180 trace,
1181 denied_builtins,
1182 script_argv,
1183 skill_dirs_raw,
1184 llm_mock_mode,
1185 attestation,
1186 profile,
1187 sandbox,
1188 interrupt_tokens: None,
1189 json: None,
1190 aux: RunAuxOptions::default(),
1191 timing: None,
1192 harnpack,
1193 })
1194 .await
1195}
1196
1197#[allow(clippy::too_many_arguments)]
1203pub async fn execute_run_json(
1204 path: &str,
1205 trace: bool,
1206 denied_builtins: HashSet<String>,
1207 script_argv: Vec<String>,
1208 skill_dirs_raw: Vec<String>,
1209 llm_mock_mode: CliLlmMockMode,
1210 attestation: Option<RunAttestationOptions>,
1211 profile: RunProfileOptions,
1212 out: Box<dyn io::Write + Send>,
1213 options: RunJsonOptions,
1214) -> RunOutcome {
1215 execute_run_inner(ExecuteRunInputs {
1216 path,
1217 trace,
1218 denied_builtins,
1219 script_argv,
1220 skill_dirs_raw,
1221 llm_mock_mode,
1222 attestation,
1223 profile,
1224 sandbox: RunSandboxOptions::default(),
1225 interrupt_tokens: None,
1226 json: Some((options, out)),
1227 aux: RunAuxOptions::default(),
1228 timing: None,
1229 harnpack: HarnpackRunOptions::default(),
1230 })
1231 .await
1232}
1233
1234pub(crate) async fn execute_run_with_timing(
1238 path: &str,
1239 script_argv: Vec<String>,
1240 timing: Option<&mut RunTiming>,
1241 sandbox: RunSandboxOptions,
1242) -> RunOutcome {
1243 execute_run_inner(ExecuteRunInputs {
1244 path,
1245 trace: false,
1246 denied_builtins: HashSet::new(),
1247 script_argv,
1248 skill_dirs_raw: Vec::new(),
1249 llm_mock_mode: CliLlmMockMode::Off,
1250 attestation: None,
1251 profile: RunProfileOptions::default(),
1252 sandbox,
1253 interrupt_tokens: None,
1254 json: None,
1255 aux: RunAuxOptions::default(),
1256 timing,
1257 harnpack: HarnpackRunOptions::default(),
1258 })
1259 .await
1260}
1261
1262#[allow(clippy::needless_option_as_deref)]
1265async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
1266 let ExecuteRunInputs {
1267 path,
1268 trace,
1269 denied_builtins,
1270 script_argv,
1271 skill_dirs_raw,
1272 llm_mock_mode,
1273 attestation,
1274 profile,
1275 sandbox,
1276 interrupt_tokens,
1277 json,
1278 aux,
1279 timing,
1280 harnpack,
1281 } = inputs;
1282 let RunAuxOptions {
1283 summary,
1284 phase,
1285 rusage,
1286 } = aux;
1287 let run_started = Instant::now();
1288 let cpu_started_ms = rusage.as_ref().map(|_| time::cpu_ms());
1289 let mut owned_timing = if timing.is_none() && (phase.is_some() || rusage.is_some()) {
1290 Some(RunTiming::default())
1291 } else {
1292 None
1293 };
1294 let mut timing = timing.or(owned_timing.as_mut());
1295
1296 let json_session = json.map(|(options, out)| JsonRunSession::install(options, out));
1302
1303 let mut stderr = String::new();
1304 let mut stdout = String::new();
1305
1306 let owned_run_path: String;
1311 let resolved_path: &str = if harnpack::looks_like_harnpack(Path::new(path)) {
1312 let outcome = match harnpack::prepare_harnpack(Path::new(path), &harnpack, &mut stderr) {
1313 Ok(prepared) => prepared,
1314 Err(err) => {
1315 return finalize_harnpack_error(
1316 stderr,
1317 json_session,
1318 summary.as_ref(),
1319 phase.as_ref(),
1320 rusage.as_ref(),
1321 run_started,
1322 err,
1323 );
1324 }
1325 };
1326 harn_vm::run_events::emit(harn_vm::run_events::RunEvent::PackRun {
1327 bundle_hash: outcome.bundle_hash.clone(),
1328 signature_verified: outcome.signature_verified,
1329 key_id: outcome.key_id.clone(),
1330 cache_hit: outcome.cache_hit,
1331 dry_run_verify: harnpack.dry_run_verify,
1332 });
1333 if harnpack.dry_run_verify {
1334 return finalize_harnpack_dry_run(
1335 stderr,
1336 json_session,
1337 summary.as_ref(),
1338 phase.as_ref(),
1339 rusage.as_ref(),
1340 run_started,
1341 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1342 &outcome,
1343 );
1344 }
1345 owned_run_path = outcome.entrypoint_path.to_string_lossy().into_owned();
1346 owned_run_path.as_str()
1347 } else {
1348 path
1349 };
1350
1351 let Some(LoadedChunk { source, chunk }) =
1352 compile_or_load_chunk_with_timing(resolved_path, &mut stderr, timing.as_deref_mut())
1353 else {
1354 let message = stderr.clone();
1355 return finalize_run_error(
1356 stdout,
1357 stderr,
1358 json_session,
1359 summary.as_ref(),
1360 phase.as_ref(),
1361 rusage.as_ref(),
1362 run_started,
1363 None,
1364 timing.as_deref(),
1365 0,
1366 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1367 "compile_error",
1368 message,
1369 );
1370 };
1371 let path = resolved_path;
1372
1373 let setup_start = Instant::now();
1377
1378 if trace || summary.is_some() {
1379 harn_vm::llm::enable_tracing();
1380 }
1381 if profile.is_enabled() || phase.is_some() {
1382 harn_vm::tracing::set_tracing_enabled(true);
1383 }
1384 if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
1385 stderr.push_str(&format!("error: {error}\n"));
1386 return finalize_run_error(
1387 stdout,
1388 stderr,
1389 json_session,
1390 summary.as_ref(),
1391 phase.as_ref(),
1392 rusage.as_ref(),
1393 run_started,
1394 None,
1395 timing.as_deref(),
1396 0,
1397 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1398 "llm_mock_install",
1399 error,
1400 );
1401 }
1402
1403 let mut vm = harn_vm::Vm::new();
1404 if let Some(interrupt_tokens) = interrupt_tokens {
1405 vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
1406 vm.install_cancel_token(interrupt_tokens.cancel_token);
1407 }
1408 harn_vm::register_vm_stdlib_with_deferred_llm(&mut vm);
1409 crate::install_default_hostlib(&mut vm);
1410 let source_parent = std::path::Path::new(path)
1411 .parent()
1412 .unwrap_or(std::path::Path::new("."));
1413 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1415 let store_base = project_root.as_deref().unwrap_or(source_parent);
1416 let sandbox_root = sandbox
1417 .workspace_root
1418 .clone()
1419 .unwrap_or_else(|| default_run_workspace_root(project_root.as_deref(), source_parent));
1420 let _sandbox_scope = install_run_sandbox_scope(&sandbox, &sandbox_root, &mut stderr);
1421 let attestation_started_at_ms = now_ms();
1422 let attestation_log = if attestation.is_some() {
1423 Some(harn_vm::event_log::install_memory_for_current_thread(256))
1424 } else {
1425 None
1426 };
1427 if let Some(log) = attestation_log.as_ref() {
1428 append_run_provenance_event(
1429 log,
1430 "started",
1431 serde_json::json!({
1432 "pipeline": path,
1433 "argv": &script_argv,
1434 "project_root": store_base.display().to_string(),
1435 "sandbox": run_sandbox_attestation(&sandbox),
1436 }),
1437 )
1438 .await;
1439 }
1440 harn_vm::register_store_builtins(&mut vm, store_base);
1441 harn_vm::register_metadata_builtins(&mut vm, store_base);
1442 let pipeline_name = std::path::Path::new(path)
1443 .file_stem()
1444 .and_then(|s| s.to_str())
1445 .unwrap_or("default");
1446 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1447 vm.set_source_info(path, &source);
1448 if !denied_builtins.is_empty() {
1449 vm.set_denied_builtins(denied_builtins);
1450 }
1451 if let Some(ref root) = project_root {
1452 vm.set_project_root(root);
1453 }
1454
1455 if let Some(p) = std::path::Path::new(path).parent() {
1456 if !p.as_os_str().is_empty() {
1457 vm.set_source_dir(p);
1458 }
1459 }
1460
1461 let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
1464 let loaded = load_skills(&SkillLoaderInputs {
1465 cli_dirs,
1466 source_path: Some(std::path::PathBuf::from(path)),
1467 });
1468 emit_loader_warnings(&loaded.loader_warnings);
1469 install_skills_global(&mut vm, &loaded);
1470
1471 let argv_values: Vec<harn_vm::VmValue> = script_argv
1474 .iter()
1475 .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
1476 .collect();
1477 vm.set_global(
1478 "argv",
1479 harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
1480 );
1481
1482 vm.set_harness(harn_vm::Harness::real());
1486
1487 let extensions = package::load_runtime_extensions(Path::new(path));
1488 package::install_runtime_extensions(&extensions);
1489 if let Some(manifest) = extensions.root_manifest.as_ref() {
1490 if !manifest.mcp.is_empty() {
1491 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1492 }
1493 }
1494 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1495 stderr.push_str(&format!(
1496 "error: failed to install manifest triggers: {error}\n"
1497 ));
1498 return finalize_run_error(
1499 stdout,
1500 stderr,
1501 json_session,
1502 summary.as_ref(),
1503 phase.as_ref(),
1504 rusage.as_ref(),
1505 run_started,
1506 None,
1507 timing.as_deref(),
1508 0,
1509 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1510 "manifest_triggers",
1511 error.to_string(),
1512 );
1513 }
1514 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1515 stderr.push_str(&format!(
1516 "error: failed to install manifest hooks: {error}\n"
1517 ));
1518 return finalize_run_error(
1519 stdout,
1520 stderr,
1521 json_session,
1522 summary.as_ref(),
1523 phase.as_ref(),
1524 rusage.as_ref(),
1525 run_started,
1526 None,
1527 timing.as_deref(),
1528 0,
1529 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1530 "manifest_hooks",
1531 error.to_string(),
1532 );
1533 }
1534
1535 let local = tokio::task::LocalSet::new();
1537 if let Some(t) = timing.as_deref_mut() {
1538 t.run_setup = setup_start.elapsed();
1539 }
1540 let main_start = Instant::now();
1541 let execution = local
1542 .run_until(async {
1543 match vm.execute(&chunk).await {
1544 Ok(value) => Ok((vm.output(), value)),
1545 Err(e) => Err(vm.format_runtime_error(&e)),
1546 }
1547 })
1548 .await;
1549 if let Some(t) = timing.as_deref_mut() {
1550 t.run_main = main_start.elapsed();
1551 }
1552 if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
1553 stderr.push_str(&format!("error: {error}\n"));
1554 let profile_rollup = if profile.is_enabled() {
1555 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1556 } else {
1557 None
1558 };
1559 return finalize_run_error(
1560 stdout,
1561 stderr,
1562 json_session,
1563 summary.as_ref(),
1564 phase.as_ref(),
1565 rusage.as_ref(),
1566 run_started,
1567 profile_rollup.as_ref(),
1568 timing.as_deref(),
1569 harn_vm::tracing::peek_spans().len() as u64,
1570 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1571 "llm_mock_record",
1572 error,
1573 );
1574 }
1575
1576 let buffered_stderr = harn_vm::take_stderr_buffer();
1578 stderr.push_str(&buffered_stderr);
1579
1580 let exit_code = match &execution {
1581 Ok((_, return_value)) => exit_code_from_return_value(return_value),
1582 Err(_) => 1,
1583 };
1584
1585 if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
1586 if let Err(error) = emit_run_attestation(
1587 log,
1588 path,
1589 store_base,
1590 attestation_started_at_ms,
1591 exit_code,
1592 options,
1593 &mut stderr,
1594 )
1595 .await
1596 {
1597 stderr.push_str(&format!(
1598 "error: failed to emit provenance receipt: {error}\n"
1599 ));
1600 let profile_rollup = if profile.is_enabled() {
1601 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1602 } else {
1603 None
1604 };
1605 return finalize_run_error(
1606 stdout,
1607 stderr,
1608 json_session,
1609 summary.as_ref(),
1610 phase.as_ref(),
1611 rusage.as_ref(),
1612 run_started,
1613 profile_rollup.as_ref(),
1614 timing.as_deref(),
1615 harn_vm::tracing::peek_spans().len() as u64,
1616 cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start)),
1617 "attestation",
1618 error,
1619 );
1620 }
1621 harn_vm::event_log::reset_active_event_log();
1622 }
1623
1624 match execution {
1625 Ok((output, return_value)) => {
1626 stdout.push_str(output);
1627 let main_events = harn_vm::tracing::peek_spans().len() as u64;
1628 let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
1629 let profile_rollup = if profile.is_enabled() {
1630 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1631 } else {
1632 None
1633 };
1634 let summary_llm = summary.as_ref().map(|_| run_summary_llm_snapshot());
1635 if trace {
1636 stderr.push_str(&render_trace_summary());
1637 }
1638 if let Some(profile_rollup) = profile_rollup.as_ref() {
1639 if let Err(error) =
1640 render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
1641 {
1642 stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1643 }
1644 }
1645 if exit_code != 0 {
1646 stderr.push_str(&render_return_value_error(&return_value));
1647 }
1648 let aux_emission = emit_run_aux_for_exit(
1649 summary.as_ref(),
1650 phase.as_ref(),
1651 rusage.as_ref(),
1652 run_started,
1653 exit_code,
1654 profile_rollup.as_ref(),
1655 summary_llm,
1656 timing.as_deref(),
1657 main_events,
1658 cpu_ms_total,
1659 json_session.is_some(),
1660 &mut stderr,
1661 );
1662 if let Some(session) = json_session {
1663 if let Some(error) = aux_emission.error {
1664 let mut outcome = session.finalize_error(
1665 "run_aux",
1666 format!("failed to emit auxiliary run JSON: {error}"),
1667 1,
1668 );
1669 outcome.stderr = aux_emission.stderr;
1670 return outcome;
1671 }
1672 let value = harn_vm::llm::vm_value_to_json(&return_value);
1673 let mut outcome = session.finalize_result(value, aux_emission.exit_code);
1674 outcome.stderr = aux_emission.stderr;
1675 return outcome;
1676 }
1677 RunOutcome {
1678 stdout,
1679 stderr,
1680 exit_code: aux_emission.exit_code,
1681 }
1682 }
1683 Err(rendered_error) => {
1684 stderr.push_str(&rendered_error);
1685 let main_events = harn_vm::tracing::peek_spans().len() as u64;
1686 let cpu_ms_total = cpu_started_ms.map(|start| time::cpu_ms().saturating_sub(start));
1687 let profile_rollup = if profile.is_enabled() {
1688 Some(harn_vm::profile::build(&harn_vm::tracing::peek_spans()))
1689 } else {
1690 None
1691 };
1692 if let Some(profile_rollup) = profile_rollup.as_ref() {
1693 if let Err(error) =
1694 render_and_persist_profile_rollup(&profile, profile_rollup, &mut stderr)
1695 {
1696 stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
1697 }
1698 }
1699 let aux_emission = emit_run_aux_for_exit(
1700 summary.as_ref(),
1701 phase.as_ref(),
1702 rusage.as_ref(),
1703 run_started,
1704 1,
1705 profile_rollup.as_ref(),
1706 None,
1707 timing.as_deref(),
1708 main_events,
1709 cpu_ms_total,
1710 json_session.is_some(),
1711 &mut stderr,
1712 );
1713 if let Some(session) = json_session {
1714 let mut outcome =
1715 session.finalize_error("runtime", rendered_error, aux_emission.exit_code);
1716 outcome.stderr = aux_emission.stderr;
1717 return outcome;
1718 }
1719 RunOutcome {
1720 stdout,
1721 stderr,
1722 exit_code: aux_emission.exit_code,
1723 }
1724 }
1725 }
1726}
1727
1728fn render_and_persist_profile_rollup(
1729 options: &RunProfileOptions,
1730 profile: &harn_vm::profile::RunProfile,
1731 stderr: &mut String,
1732) -> Result<(), String> {
1733 if options.text {
1734 stderr.push_str(&harn_vm::profile::render(profile));
1735 }
1736 if let Some(path) = options.json_path.as_ref() {
1737 if let Some(parent) = path.parent() {
1738 if !parent.as_os_str().is_empty() {
1739 fs::create_dir_all(parent)
1740 .map_err(|error| format!("create {}: {error}", parent.display()))?;
1741 }
1742 }
1743 let json = serde_json::to_string_pretty(profile)
1744 .map_err(|error| format!("serialize profile: {error}"))?;
1745 fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
1746 }
1747 Ok(())
1748}
1749
1750fn build_run_summary<'a>(
1751 started: Instant,
1752 exit_code: i32,
1753 profile: Option<&'a harn_vm::profile::RunProfile>,
1754 llm: RunSummaryLlm,
1755) -> RunSummary<'a> {
1756 RunSummary {
1757 schema_version: RUN_SUMMARY_SCHEMA_VERSION,
1758 event: "run_summary",
1759 wall_time_ms: started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64,
1760 exit_code,
1761 llm,
1762 profile,
1763 }
1764}
1765
1766fn run_summary_llm_snapshot() -> RunSummaryLlm {
1767 let (input_tokens, output_tokens, time_ms, call_count) = harn_vm::llm::peek_trace_summary();
1768 let cost_usd = harn_vm::llm::peek_total_cost();
1769 RunSummaryLlm {
1770 call_count,
1771 input_tokens,
1772 output_tokens,
1773 time_ms,
1774 cost_usd: if cost_usd.is_finite() { cost_usd } else { 0.0 },
1775 }
1776}
1777
1778struct RunAuxEmission {
1779 stderr: String,
1780 exit_code: i32,
1781 error: Option<String>,
1782}
1783
1784#[allow(clippy::too_many_arguments)]
1785fn emit_run_aux_for_exit(
1786 summary: Option<&RunSummaryOptions>,
1787 phase: Option<&RunPhaseOptions>,
1788 rusage: Option<&RunRusageOptions>,
1789 started: Instant,
1790 exit_code: i32,
1791 profile: Option<&harn_vm::profile::RunProfile>,
1792 llm: Option<RunSummaryLlm>,
1793 timing: Option<&RunTiming>,
1794 main_events: u64,
1795 cpu_ms_total: Option<u64>,
1796 json_mode: bool,
1797 stderr: &mut String,
1798) -> RunAuxEmission {
1799 let mut aux_stderr = String::new();
1800 let mut final_exit_code = exit_code;
1801 let mut aux_error = None;
1802 let aux_target = if json_mode { &mut aux_stderr } else { stderr };
1803 let default_timing = RunTiming::default();
1804 let timing = timing.unwrap_or(&default_timing);
1805
1806 if let Some(options) = summary {
1807 let llm = llm.unwrap_or_else(run_summary_llm_snapshot);
1808 let summary = build_run_summary(started, exit_code, profile, llm);
1809 if let Err(error) = emit_raw_json_line(&options.sink, &summary, "run summary", aux_target) {
1810 record_aux_error(
1811 &mut final_exit_code,
1812 &mut aux_error,
1813 aux_target,
1814 "run summary",
1815 error,
1816 );
1817 }
1818 }
1819 if let Some(options) = phase {
1820 let phase_event = RunPhaseEvent {
1821 schema_version: RUN_PHASE_SCHEMA_VERSION,
1822 event: "run_phase",
1823 phases: time::build_phase_records(timing, main_events),
1824 };
1825 if let Err(error) = emit_raw_json_line(&options.sink, &phase_event, "run phase", aux_target)
1826 {
1827 record_aux_error(
1828 &mut final_exit_code,
1829 &mut aux_error,
1830 aux_target,
1831 "run phase",
1832 error,
1833 );
1834 }
1835 }
1836 if let Some(options) = rusage {
1837 let rusage_event = RunRusageEvent {
1838 schema_version: RUN_RUSAGE_SCHEMA_VERSION,
1839 event: "run_rusage",
1840 cpu_ms: cpu_ms_total.unwrap_or(0),
1841 };
1842 if let Err(error) =
1843 emit_raw_json_line(&options.sink, &rusage_event, "run rusage", aux_target)
1844 {
1845 record_aux_error(
1846 &mut final_exit_code,
1847 &mut aux_error,
1848 aux_target,
1849 "run rusage",
1850 error,
1851 );
1852 }
1853 }
1854
1855 RunAuxEmission {
1856 stderr: aux_stderr,
1857 exit_code: final_exit_code,
1858 error: aux_error,
1859 }
1860}
1861
1862fn record_aux_error(
1863 final_exit_code: &mut i32,
1864 aux_error: &mut Option<String>,
1865 stderr: &mut String,
1866 label: &str,
1867 error: String,
1868) {
1869 stderr.push_str(&format!("error: failed to emit {label}: {error}\n"));
1870 if *final_exit_code == 0 {
1871 *final_exit_code = 1;
1872 }
1873 if aux_error.is_none() {
1874 *aux_error = Some(error);
1875 }
1876}
1877
1878fn emit_raw_json_line(
1879 sink: &RunJsonSink,
1880 value: &impl Serialize,
1881 label: &str,
1882 stderr: &mut String,
1883) -> Result<(), String> {
1884 let line =
1885 serde_json::to_string(value).map_err(|error| format!("serialize {label}: {error}"))? + "\n";
1886 match &sink.target {
1887 RunJsonSinkTarget::Stderr => {
1888 stderr.push_str(&line);
1889 Ok(())
1890 }
1891 RunJsonSinkTarget::File(path) => write_raw_json_file(path, &line),
1892 RunJsonSinkTarget::Fd(fd) => write_raw_json_fd(*fd, &line, sink.fd_flag),
1893 }
1894}
1895
1896fn write_raw_json_file(path: &Path, line: &str) -> Result<(), String> {
1897 if let Some(parent) = path.parent() {
1898 if !parent.as_os_str().is_empty() {
1899 fs::create_dir_all(parent)
1900 .map_err(|error| format!("create {}: {error}", parent.display()))?;
1901 }
1902 }
1903 fs::write(path, line).map_err(|error| format!("write {}: {error}", path.display()))
1904}
1905
1906#[cfg(unix)]
1907fn write_raw_json_fd(fd: i32, line: &str, flag: &str) -> Result<(), String> {
1908 use std::fs::File;
1909 use std::os::unix::io::FromRawFd;
1910
1911 if fd < 0 {
1912 return Err(format!("invalid {flag} {fd}: must be non-negative"));
1913 }
1914 let duped = unsafe { libc::dup(fd) };
1915 if duped < 0 {
1916 return Err(format!(
1917 "duplicate {flag} {fd}: {}",
1918 io::Error::last_os_error()
1919 ));
1920 }
1921 let mut file = unsafe { File::from_raw_fd(duped) };
1922 file.write_all(line.as_bytes())
1923 .and_then(|_| file.flush())
1924 .map_err(|error| format!("write {flag} {fd}: {error}"))
1925}
1926
1927#[cfg(not(unix))]
1928fn write_raw_json_fd(_fd: i32, _line: &str, flag: &str) -> Result<(), String> {
1929 Err(format!("{flag} is only supported on Unix platforms"))
1930}
1931
1932async fn append_run_provenance_event(
1933 log: &Arc<harn_vm::event_log::AnyEventLog>,
1934 kind: &str,
1935 payload: serde_json::Value,
1936) {
1937 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
1938 return;
1939 };
1940 let _ = log
1941 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
1942 .await;
1943}
1944
1945async fn emit_run_attestation(
1946 log: &Arc<harn_vm::event_log::AnyEventLog>,
1947 path: &str,
1948 store_base: &Path,
1949 started_at_ms: i64,
1950 exit_code: i32,
1951 options: &RunAttestationOptions,
1952 stderr: &mut String,
1953) -> Result<(), String> {
1954 let finished_at_ms = now_ms();
1955 let status = if exit_code == 0 { "success" } else { "failure" };
1956 append_run_provenance_event(
1957 log,
1958 "finished",
1959 serde_json::json!({
1960 "pipeline": path,
1961 "status": status,
1962 "exit_code": exit_code,
1963 }),
1964 )
1965 .await;
1966 log.flush()
1967 .await
1968 .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
1969 let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
1970 .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
1971 let (signing_key, key_id) =
1972 harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
1973 .await
1974 .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
1975 let receipt = harn_vm::build_signed_receipt(
1976 log,
1977 harn_vm::ReceiptBuildOptions {
1978 pipeline: path.to_string(),
1979 status: status.to_string(),
1980 started_at_ms,
1981 finished_at_ms,
1982 exit_code,
1983 producer_name: "harn-cli".to_string(),
1984 producer_version: env!("CARGO_PKG_VERSION").to_string(),
1985 },
1986 &signing_key,
1987 key_id,
1988 )
1989 .await
1990 .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
1991 let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
1992 if let Some(parent) = receipt_path.parent() {
1993 fs::create_dir_all(parent)
1994 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
1995 }
1996 let encoded = serde_json::to_vec_pretty(&receipt)
1997 .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
1998 fs::write(&receipt_path, encoded)
1999 .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
2000 stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
2001 Ok(())
2002}
2003
2004fn receipt_output_path(
2005 store_base: &Path,
2006 options: &RunAttestationOptions,
2007 receipt_id: &str,
2008) -> PathBuf {
2009 if let Some(path) = options.receipt_out.as_ref() {
2010 return path.clone();
2011 }
2012 harn_vm::runtime_paths::state_root(store_base)
2013 .join("receipts")
2014 .join(format!("{receipt_id}.json"))
2015}
2016
2017fn now_ms() -> i64 {
2018 std::time::SystemTime::now()
2019 .duration_since(std::time::UNIX_EPOCH)
2020 .map(|duration| duration.as_millis() as i64)
2021 .unwrap_or(0)
2022}
2023
2024fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
2031 use harn_vm::VmValue;
2032 match value {
2033 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
2034 VmValue::EnumVariant(enum_variant) if enum_variant.is_variant("Result", "Err") => 1,
2035 _ => 0,
2036 }
2037}
2038
2039struct JsonRunSession {
2053 emitter: self::json_events::NdjsonEmitter,
2054 prior_sink: Option<Arc<dyn harn_vm::run_events::RunEventSink>>,
2055}
2056
2057impl JsonRunSession {
2058 fn install(options: RunJsonOptions, out: Box<dyn io::Write + Send>) -> Self {
2059 let emitter = NdjsonEmitter::new(out, options.quiet);
2060 let prior_sink = harn_vm::run_events::install_sink(emitter.sink());
2061 Self {
2062 emitter,
2063 prior_sink,
2064 }
2065 }
2066
2067 fn finalize_result(self, value: serde_json::Value, exit_code: i32) -> RunOutcome {
2068 self.emitter.emit_result(value, exit_code);
2069 RunOutcome {
2070 stdout: String::new(),
2071 stderr: String::new(),
2072 exit_code,
2073 }
2074 }
2075
2076 fn finalize_error(
2077 self,
2078 code: impl Into<String>,
2079 message: impl Into<String>,
2080 exit_code: i32,
2081 ) -> RunOutcome {
2082 self.emitter.emit_error(code, message);
2083 RunOutcome {
2084 stdout: String::new(),
2085 stderr: String::new(),
2086 exit_code,
2087 }
2088 }
2089}
2090
2091impl Drop for JsonRunSession {
2092 fn drop(&mut self) {
2093 match self.prior_sink.take() {
2094 Some(prior) => {
2095 harn_vm::run_events::install_sink(prior);
2096 }
2097 None => harn_vm::run_events::clear_sink(),
2098 }
2099 }
2100}
2101
2102#[allow(clippy::too_many_arguments)]
2103fn finalize_run_error(
2104 stdout: String,
2105 mut stderr: String,
2106 json_session: Option<JsonRunSession>,
2107 summary: Option<&RunSummaryOptions>,
2108 phase: Option<&RunPhaseOptions>,
2109 rusage: Option<&RunRusageOptions>,
2110 started: Instant,
2111 profile: Option<&harn_vm::profile::RunProfile>,
2112 timing: Option<&RunTiming>,
2113 main_events: u64,
2114 cpu_ms_total: Option<u64>,
2115 code: impl Into<String>,
2116 message: impl Into<String>,
2117) -> RunOutcome {
2118 let aux_emission = emit_run_aux_for_exit(
2119 summary,
2120 phase,
2121 rusage,
2122 started,
2123 1,
2124 profile,
2125 None,
2126 timing,
2127 main_events,
2128 cpu_ms_total,
2129 json_session.is_some(),
2130 &mut stderr,
2131 );
2132 if let Some(session) = json_session {
2133 let mut outcome = session.finalize_error(code, message, aux_emission.exit_code);
2134 outcome.stderr = aux_emission.stderr;
2135 return outcome;
2136 }
2137 RunOutcome {
2138 stdout,
2139 stderr,
2140 exit_code: aux_emission.exit_code,
2141 }
2142}
2143
2144fn finalize_harnpack_error(
2149 mut stderr: String,
2150 json_session: Option<JsonRunSession>,
2151 summary: Option<&RunSummaryOptions>,
2152 phase: Option<&RunPhaseOptions>,
2153 rusage: Option<&RunRusageOptions>,
2154 started: Instant,
2155 err: HarnpackError,
2156) -> RunOutcome {
2157 let code = err.code;
2158 let message = err.message;
2159 stderr.push_str(&format!("error: {message}\n"));
2160 finalize_run_error(
2161 String::new(),
2162 stderr,
2163 json_session,
2164 summary,
2165 phase,
2166 rusage,
2167 started,
2168 None,
2169 None,
2170 0,
2171 None,
2172 code,
2173 message,
2174 )
2175}
2176
2177fn finalize_harnpack_dry_run(
2182 mut stderr: String,
2183 json_session: Option<JsonRunSession>,
2184 summary_options: Option<&RunSummaryOptions>,
2185 phase_options: Option<&RunPhaseOptions>,
2186 rusage_options: Option<&RunRusageOptions>,
2187 started: Instant,
2188 cpu_ms_total: Option<u64>,
2189 prepared: &PreparedHarnpack,
2190) -> RunOutcome {
2191 let summary = format!(
2192 "[harn] harnpack verify ok: bundle_hash={}, signature_verified={}, cache_hit={}\n",
2193 prepared.bundle_hash, prepared.signature_verified, prepared.cache_hit
2194 );
2195 stderr.push_str(&summary);
2196 let aux_emission = emit_run_aux_for_exit(
2197 summary_options,
2198 phase_options,
2199 rusage_options,
2200 started,
2201 0,
2202 None,
2203 None,
2204 None,
2205 0,
2206 cpu_ms_total,
2207 json_session.is_some(),
2208 &mut stderr,
2209 );
2210 if let Some(session) = json_session {
2211 if let Some(error) = aux_emission.error {
2212 let mut outcome = session.finalize_error(
2213 "run_aux",
2214 format!("failed to emit auxiliary run JSON: {error}"),
2215 1,
2216 );
2217 outcome.stderr = aux_emission.stderr;
2218 return outcome;
2219 }
2220 let value = serde_json::json!({
2221 "bundle_hash": prepared.bundle_hash,
2222 "signature_verified": prepared.signature_verified,
2223 "key_id": prepared.key_id,
2224 "cache_hit": prepared.cache_hit,
2225 "dry_run_verify": true,
2226 });
2227 let mut outcome = session.finalize_result(value, aux_emission.exit_code);
2228 outcome.stderr = aux_emission.stderr;
2229 return outcome;
2230 }
2231 RunOutcome {
2232 stdout: String::new(),
2233 stderr,
2234 exit_code: aux_emission.exit_code,
2235 }
2236}
2237
2238fn render_return_value_error(value: &harn_vm::VmValue) -> String {
2239 let harn_vm::VmValue::EnumVariant(enum_variant) = value else {
2240 return String::new();
2241 };
2242 if !enum_variant.is_variant("Result", "Err") {
2243 return String::new();
2244 }
2245 let rendered = enum_variant
2246 .fields
2247 .first()
2248 .map(|p| p.display())
2249 .unwrap_or_default();
2250 if rendered.is_empty() {
2251 "error\n".to_string()
2252 } else if rendered.ends_with('\n') {
2253 rendered
2254 } else {
2255 format!("{rendered}\n")
2256 }
2257}
2258
2259pub(crate) async fn connect_mcp_servers(
2268 servers: &[package::McpServerConfig],
2269 vm: &mut harn_vm::Vm,
2270) {
2271 use std::collections::BTreeMap;
2272 use std::rc::Rc;
2273 use std::time::Duration;
2274
2275 let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
2276 let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
2277
2278 for server in servers {
2279 let resolved_auth = match mcp::resolve_auth_for_server(server).await {
2280 Ok(resolution) => resolution,
2281 Err(error) => {
2282 eprintln!(
2283 "warning: mcp: failed to load auth for '{}': {}",
2284 server.name, error
2285 );
2286 AuthResolution::None
2287 }
2288 };
2289 let spec = serde_json::json!({
2290 "name": server.name,
2291 "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
2292 "command": server.command,
2293 "args": server.args,
2294 "env": server.env,
2295 "url": server.url,
2296 "auth_token": match resolved_auth {
2297 AuthResolution::Bearer(token) => Some(token),
2298 AuthResolution::None => server.auth_token.clone(),
2299 },
2300 "protocol_version": server.protocol_version,
2301 "protocol_mode": server.protocol_mode,
2302 "proxy_server_name": server.proxy_server_name,
2303 });
2304
2305 registrations.push(harn_vm::RegisteredMcpServer {
2308 name: server.name.clone(),
2309 spec: spec.clone(),
2310 lazy: server.lazy,
2311 card: server.card.clone(),
2312 keep_alive: server.keep_alive_ms.map(Duration::from_millis),
2313 });
2314
2315 if server.lazy {
2316 eprintln!(
2317 "[harn] mcp: deferred '{}' (lazy, boots on first use)",
2318 server.name
2319 );
2320 continue;
2321 }
2322
2323 match harn_vm::connect_mcp_server_from_json(&spec).await {
2324 Ok(handle) => {
2325 eprintln!("[harn] mcp: connected to '{}'", server.name);
2326 harn_vm::mcp_install_active(&server.name, handle.clone());
2327 mcp_dict.insert(server.name.clone(), harn_vm::VmValue::mcp_client(handle));
2328 }
2329 Err(e) => {
2330 eprintln!(
2331 "warning: mcp: failed to connect to '{}': {}",
2332 server.name, e
2333 );
2334 }
2335 }
2336 }
2337
2338 harn_vm::mcp_register_servers(registrations);
2341
2342 if !mcp_dict.is_empty() {
2343 vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
2344 }
2345}
2346
2347pub(crate) fn render_trace_summary() -> String {
2348 use std::fmt::Write;
2349 let entries = harn_vm::llm::take_trace();
2350 if entries.is_empty() {
2351 return String::new();
2352 }
2353 let mut out = String::new();
2354 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
2355 let mut total_input = 0i64;
2356 let mut total_output = 0i64;
2357 let mut total_ms = 0u64;
2358 for (i, entry) in entries.iter().enumerate() {
2359 let _ = writeln!(
2360 out,
2361 " #{}: {} | {} in + {} out tokens | {} ms",
2362 i + 1,
2363 entry.model,
2364 entry.input_tokens,
2365 entry.output_tokens,
2366 entry.duration_ms,
2367 );
2368 total_input += entry.input_tokens;
2369 total_output += entry.output_tokens;
2370 total_ms += entry.duration_ms;
2371 }
2372 let total_tokens = total_input + total_output;
2373 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
2375 let _ = writeln!(
2376 out,
2377 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
2378 entries.len(),
2379 if entries.len() == 1 { "" } else { "s" },
2380 total_tokens,
2381 total_input,
2382 total_output,
2383 total_ms,
2384 cost,
2385 );
2386 out
2387}
2388
2389pub(crate) async fn run_file_mcp_serve(
2403 path: &str,
2404 card_source: Option<&str>,
2405 mode: RunFileMcpServeMode,
2406) {
2407 let mut diagnostics = String::new();
2408 let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut diagnostics)
2409 else {
2410 eprint!("{diagnostics}");
2411 process::exit(1);
2412 };
2413 if !diagnostics.is_empty() {
2414 eprint!("{diagnostics}");
2415 }
2416
2417 let mut vm = harn_vm::Vm::new();
2418 harn_vm::register_vm_stdlib(&mut vm);
2419 crate::install_default_hostlib(&mut vm);
2420 let source_parent = std::path::Path::new(path)
2421 .parent()
2422 .unwrap_or(std::path::Path::new("."));
2423 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
2424 let store_base = project_root.as_deref().unwrap_or(source_parent);
2425 harn_vm::register_store_builtins(&mut vm, store_base);
2426 harn_vm::register_metadata_builtins(&mut vm, store_base);
2427 let pipeline_name = std::path::Path::new(path)
2428 .file_stem()
2429 .and_then(|s| s.to_str())
2430 .unwrap_or("default");
2431 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
2432 vm.set_source_info(path, &source);
2433 if let Some(ref root) = project_root {
2434 vm.set_project_root(root);
2435 }
2436 if let Some(p) = std::path::Path::new(path).parent() {
2437 if !p.as_os_str().is_empty() {
2438 vm.set_source_dir(p);
2439 }
2440 }
2441
2442 let loaded = load_skills(&SkillLoaderInputs {
2444 cli_dirs: Vec::new(),
2445 source_path: Some(std::path::PathBuf::from(path)),
2446 });
2447 emit_loader_warnings(&loaded.loader_warnings);
2448 install_skills_global(&mut vm, &loaded);
2449
2450 let extensions = package::load_runtime_extensions(Path::new(path));
2451 package::install_runtime_extensions(&extensions);
2452 if let Some(manifest) = extensions.root_manifest.as_ref() {
2453 if !manifest.mcp.is_empty() {
2454 connect_mcp_servers(&manifest.mcp, &mut vm).await;
2455 }
2456 }
2457 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
2458 eprintln!("error: failed to install manifest triggers: {error}");
2459 process::exit(1);
2460 }
2461 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
2462 eprintln!("error: failed to install manifest hooks: {error}");
2463 process::exit(1);
2464 }
2465
2466 let local = tokio::task::LocalSet::new();
2467 local
2468 .run_until(async {
2469 match vm.execute(&chunk).await {
2470 Ok(_) => {}
2471 Err(e) => {
2472 eprint!("{}", vm.format_runtime_error(&e));
2473 process::exit(1);
2474 }
2475 }
2476
2477 let output = vm.output();
2479 if !output.is_empty() {
2480 eprint!("{output}");
2481 }
2482
2483 let registry = match harn_vm::take_mcp_serve_registry() {
2484 Some(r) => r,
2485 None => {
2486 eprintln!("error: pipeline did not call mcp_serve(registry)");
2487 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
2488 process::exit(1);
2489 }
2490 };
2491
2492 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
2493 Ok(t) => t,
2494 Err(e) => {
2495 eprintln!("error: {e}");
2496 process::exit(1);
2497 }
2498 };
2499
2500 let resources = harn_vm::take_mcp_serve_resources();
2501 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
2502 let prompts = harn_vm::take_mcp_serve_prompts();
2503
2504 let server_name = std::path::Path::new(path)
2505 .file_stem()
2506 .and_then(|s| s.to_str())
2507 .unwrap_or("harn")
2508 .to_string();
2509
2510 let mut caps = Vec::new();
2511 if !tools.is_empty() {
2512 caps.push(format!(
2513 "{} tool{}",
2514 tools.len(),
2515 if tools.len() == 1 { "" } else { "s" }
2516 ));
2517 }
2518 let total_resources = resources.len() + resource_templates.len();
2519 if total_resources > 0 {
2520 caps.push(format!(
2521 "{total_resources} resource{}",
2522 if total_resources == 1 { "" } else { "s" }
2523 ));
2524 }
2525 if !prompts.is_empty() {
2526 caps.push(format!(
2527 "{} prompt{}",
2528 prompts.len(),
2529 if prompts.len() == 1 { "" } else { "s" }
2530 ));
2531 }
2532 eprintln!(
2533 "[harn] serve mcp: serving {} as '{server_name}'",
2534 caps.join(", ")
2535 );
2536
2537 let mut server =
2538 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
2539 if let Some(source) = card_source {
2540 match resolve_card_source(source) {
2541 Ok(card) => server = server.with_server_card(card),
2542 Err(e) => {
2543 eprintln!("error: --card: {e}");
2544 process::exit(1);
2545 }
2546 }
2547 }
2548 match mode {
2549 RunFileMcpServeMode::Stdio => {
2550 if let Err(e) = server.run(&mut vm).await {
2551 eprintln!("error: MCP server error: {e}");
2552 process::exit(1);
2553 }
2554 }
2555 RunFileMcpServeMode::Http {
2556 options,
2557 auth_policy,
2558 } => {
2559 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
2560 server,
2561 vm,
2562 options,
2563 auth_policy,
2564 )
2565 .await
2566 {
2567 eprintln!("error: MCP server error: {e}");
2568 process::exit(1);
2569 }
2570 }
2571 }
2572 })
2573 .await;
2574}
2575
2576pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
2581 let trimmed = source.trim_start();
2582 if trimmed.starts_with('{') || trimmed.starts_with('[') {
2583 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
2584 }
2585 let path = std::path::Path::new(source);
2586 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
2587}
2588
2589pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
2590 use notify::{Event, EventKind, RecursiveMode, Watcher};
2591
2592 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
2593 eprintln!("Error: {e}");
2594 process::exit(1);
2595 });
2596 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
2597
2598 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
2599 run_file(
2600 path,
2601 false,
2602 denied_builtins.clone(),
2603 Vec::new(),
2604 CliLlmMockMode::Off,
2605 None,
2606 RunProfileOptions::default(),
2607 )
2608 .await;
2609
2610 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
2611 let _watcher = {
2612 let tx = tx.clone();
2613 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
2614 if let Ok(event) = res {
2615 if matches!(
2616 event.kind,
2617 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
2618 ) {
2619 let has_harn = event
2620 .paths
2621 .iter()
2622 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
2623 if has_harn {
2624 let _ = tx.blocking_send(());
2625 }
2626 }
2627 }
2628 })
2629 .unwrap_or_else(|e| {
2630 eprintln!("Error setting up file watcher: {e}");
2631 process::exit(1);
2632 });
2633 watcher
2634 .watch(watch_dir, RecursiveMode::Recursive)
2635 .unwrap_or_else(|e| {
2636 eprintln!("Error watching directory: {e}");
2637 process::exit(1);
2638 });
2639 watcher };
2641
2642 eprintln!(
2643 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
2644 watch_dir.display()
2645 );
2646
2647 loop {
2648 rx.recv().await;
2649 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2651 while rx.try_recv().is_ok() {}
2652
2653 eprintln!();
2654 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
2655 run_file(
2656 path,
2657 false,
2658 denied_builtins.clone(),
2659 Vec::new(),
2660 CliLlmMockMode::Off,
2661 None,
2662 RunProfileOptions::default(),
2663 )
2664 .await;
2665 }
2666}
2667
2668#[cfg(test)]
2669mod tests;