1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9use harn_parser::DiagnosticSeverity;
10use harn_vm::event_log::EventLog;
11
12use crate::commands::mcp::{self, AuthResolution};
13use crate::package;
14use crate::parse_source_file;
15use crate::skill_loader::{
16 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
17 SkillLoaderInputs,
18};
19
20mod explain_cost;
21
/// Selects how `run_file_mcp_serve` runs the MCP server built from a script.
pub(crate) enum RunFileMcpServeMode {
    /// Run the server directly against the VM via `McpServer::run`.
    Stdio,
    /// Hand the server and VM to `commands::serve::run_script_mcp_http_server`.
    Http {
        /// HTTP serve options, forwarded unchanged to the HTTP entry point.
        options: harn_serve::McpHttpServeOptions,
        /// Auth policy, forwarded unchanged to the HTTP entry point.
        auth_policy: harn_serve::AuthPolicy,
    },
}
29
/// Builtin names that `build_denied_builtins` never places in the denied set
/// when that set is derived from an allow list. An explicit deny list is taken
/// verbatim and is not filtered against this table.
const CORE_BUILTINS: &[&str] = &[
    "println",
    "print",
    "log",
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    "assert",
    "assert_eq",
    "assert_ne",
    "json_parse",
    "json_stringify",
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
52
53pub(crate) fn build_denied_builtins(
58 deny_csv: Option<&str>,
59 allow_csv: Option<&str>,
60) -> HashSet<String> {
61 if let Some(csv) = deny_csv {
62 csv.split(',')
63 .map(|s| s.trim().to_string())
64 .filter(|s| !s.is_empty())
65 .collect()
66 } else if let Some(csv) = allow_csv {
67 let allowed: HashSet<String> = csv
70 .split(',')
71 .map(|s| s.trim().to_string())
72 .filter(|s| !s.is_empty())
73 .collect();
74 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
75
76 let mut tmp = harn_vm::Vm::new();
78 harn_vm::register_vm_stdlib(&mut tmp);
79 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
80 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
81
82 tmp.builtin_names()
83 .into_iter()
84 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
85 .collect()
86 } else {
87 HashSet::new()
88 }
89}
90
/// Result of `compile_or_load_chunk_for_run`: the script text together with
/// the chunk that was either loaded from the bytecode cache or freshly compiled.
pub(crate) struct LoadedChunk {
    /// Source text read from the script path.
    pub(crate) source: String,
    /// Executable chunk for that source.
    pub(crate) chunk: harn_vm::Chunk,
}
98
99pub(crate) fn compile_or_load_chunk_for_run(
111 path: &str,
112 stderr: &mut String,
113) -> Option<LoadedChunk> {
114 let source = match fs::read_to_string(path) {
115 Ok(s) => s,
116 Err(e) => {
117 stderr.push_str(&format!("Error reading {path}: {e}\n"));
118 return None;
119 }
120 };
121 let lookup = harn_vm::bytecode_cache::load(Path::new(path), &source);
122 if let Some(chunk) = lookup.chunk {
123 return Some(LoadedChunk { source, chunk });
124 }
125
126 let (parsed_source, program) = parse_source_file(path);
127 debug_assert_eq!(parsed_source, source, "parse_source_file re-read drifted");
128
129 let mut had_type_error = false;
130 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
131 for diag in &type_diagnostics {
132 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
133 if matches!(diag.severity, DiagnosticSeverity::Error) {
134 had_type_error = true;
135 }
136 stderr.push_str(&rendered);
137 }
138 if had_type_error {
139 return None;
140 }
141
142 let chunk = match harn_vm::Compiler::new().compile(&program) {
143 Ok(c) => c,
144 Err(e) => {
145 stderr.push_str(&format!("error: compile error: {e}\n"));
146 return None;
147 }
148 };
149
150 if let Err(err) = harn_vm::bytecode_cache::store(&lookup.key, &chunk) {
155 if std::env::var_os("HARN_BYTECODE_CACHE_DEBUG").is_some() {
156 eprintln!("[harn] bytecode cache write skipped: {err}");
157 }
158 }
159
160 Some(LoadedChunk { source, chunk })
161}
162
163fn typecheck_with_imports(
168 program: &[harn_parser::SNode],
169 path: &Path,
170 source: &str,
171) -> Vec<harn_parser::TypeDiagnostic> {
172 if let Err(error) = package::ensure_dependencies_materialized(path) {
173 eprintln!("error: {error}");
174 process::exit(1);
175 }
176 let graph = harn_modules::build(&[path.to_path_buf()]);
177 let mut checker = harn_parser::TypeChecker::new();
178 if let Some(imported) = graph.imported_names_for_file(path) {
179 checker = checker.with_imported_names(imported);
180 }
181 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
182 checker = checker.with_imported_type_decls(imported);
183 }
184 if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
185 checker = checker.with_imported_callable_decls(imported);
186 }
187 checker.check_with_source(program, source)
188}
189
190pub(crate) fn prepare_eval_temp_file(
201 code: &str,
202) -> Result<(String, tempfile::NamedTempFile), String> {
203 let (header, body) = split_eval_header(code);
204 let wrapped = if header.is_empty() {
205 format!("pipeline main(task) {{\n{body}\n}}")
206 } else {
207 format!("{header}\npipeline main(task) {{\n{body}\n}}")
208 };
209
210 let tmp = create_eval_temp_file()?;
211 Ok((wrapped, tmp))
212}
213
214fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
219 if let Some(dir) = std::env::current_dir().ok().as_deref() {
220 match tempfile::Builder::new()
223 .prefix(".harn-eval-")
224 .suffix(".harn")
225 .tempfile_in(dir)
226 {
227 Ok(tmp) => return Ok(tmp),
228 Err(error) => eprintln!(
229 "warning: harn run -e: could not create temp file in {}: {error}; \
230 relative imports will not resolve",
231 dir.display()
232 ),
233 }
234 }
235 tempfile::Builder::new()
236 .prefix("harn-eval-")
237 .suffix(".harn")
238 .tempfile()
239 .map_err(|e| format!("failed to create temp file for -e: {e}"))
240}
241
/// Splits inline code into `(header, body)` where the header is the leading
/// run of import lines, together with any blank or `//` comment lines
/// interleaved with or directly following them.
///
/// Scanning stops at the first line that is neither blank, a `//` comment, nor
/// an import. If no import line was seen, the header is empty and the body is
/// `code` unchanged. Otherwise both parts are re-joined with `\n`.
fn split_eval_header(code: &str) -> (String, String) {
    const IMPORT_PREFIXES: [&str; 5] = [
        "import ",
        "import\t",
        "import\"",
        "pub import ",
        "pub import\t",
    ];

    let lines: Vec<&str> = code.lines().collect();
    let mut boundary = 0usize;
    let mut saw_import = false;

    for (index, raw) in lines.iter().enumerate() {
        let text = raw.trim_start();
        let skippable = text.is_empty() || text.starts_with("//");
        let import_line =
            !skippable && IMPORT_PREFIXES.iter().any(|prefix| text.starts_with(prefix));
        if !skippable && !import_line {
            break;
        }
        boundary = index + 1;
        saw_import |= import_line;
    }

    if !saw_import {
        return (String::new(), code.to_string());
    }

    let (header, body) = lines.split_at(boundary);
    (header.join("\n"), body.join("\n"))
}
284
/// How the CLI wires LLM mocking for a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocks are installed and nothing is recorded.
    #[default]
    Off,
    /// Mocks are loaded from a JSONL fixture and installed before the run.
    Replay {
        /// Fixture file the mocks are read from.
        fixture_path: PathBuf,
    },
    /// Recording is enabled for the run and persisted afterwards.
    Record {
        /// Fixture file the recordings are written to.
        fixture_path: PathBuf,
    },
}
296
/// Options for emitting a signed provenance receipt after a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Explicit receipt destination; when `None` the receipt is written under
    /// the `receipts` directory of the state root, named after the receipt id.
    pub receipt_out: Option<PathBuf>,
    /// Optional agent id handed to the signing-key loader.
    pub agent_id: Option<String>,
}
302
/// Profiling outputs requested for a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunProfileOptions {
    /// Append the rendered text profile to the run's stderr.
    pub text: bool,
    /// Write the profile as pretty-printed JSON to this path.
    pub json_path: Option<PathBuf>,
}

impl RunProfileOptions {
    /// Returns `true` when at least one profiling output was requested.
    pub fn is_enabled(&self) -> bool {
        match self.json_path {
            Some(_) => true,
            None => self.text,
        }
    }
}
318
/// Shared handles used to tell a running VM that a signal arrived.
#[derive(Clone)]
pub struct RunInterruptTokens {
    /// Set to `true` once an interrupt has been requested.
    pub cancel_token: Arc<AtomicBool>,
    /// Name of the signal that requested the interrupt (e.g. `"SIGINT"`), if any.
    pub signal_token: Arc<Mutex<Option<String>>>,
}
324
/// Argument bundle for `execute_run_inner`, shared by the CLI entry point and
/// the public `execute_run` wrapper.
struct ExecuteRunInputs<'a> {
    /// Path of the script to run.
    path: &'a str,
    /// Enables LLM tracing and appends a trace summary to stderr on success.
    trace: bool,
    /// Builtins the VM must refuse; ignored when empty.
    denied_builtins: HashSet<String>,
    /// Values exposed to the script as the `argv` global.
    script_argv: Vec<String>,
    /// Raw skill directories passed to the skill loader.
    skill_dirs_raw: Vec<String>,
    /// LLM mock configuration for this run.
    llm_mock_mode: CliLlmMockMode,
    /// When set, a provenance receipt is emitted after execution.
    attestation: Option<RunAttestationOptions>,
    /// Profiling outputs to produce.
    profile: RunProfileOptions,
    /// Interrupt handles installed on the VM; `None` leaves the VM without them.
    interrupt_tokens: Option<RunInterruptTokens>,
}
336
/// Captured result of running (or cost-explaining) a script.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Text destined for standard output.
    pub stdout: String,
    /// Diagnostics, warnings and errors destined for standard error.
    pub stderr: String,
    /// Process exit code the caller should report; `0` means success.
    pub exit_code: i32,
}
346
347pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
348 harn_vm::llm::clear_cli_llm_mock_mode();
349 match mode {
350 CliLlmMockMode::Off => Ok(()),
351 CliLlmMockMode::Replay { fixture_path } => {
352 let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
353 harn_vm::llm::install_cli_llm_mocks(mocks);
354 Ok(())
355 }
356 CliLlmMockMode::Record { .. } => {
357 harn_vm::llm::enable_cli_llm_mock_recording();
358 Ok(())
359 }
360 }
361}
362
363pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
364 let CliLlmMockMode::Record { fixture_path } = mode else {
365 return Ok(());
366 };
367 if let Some(parent) = fixture_path.parent() {
368 if !parent.as_os_str().is_empty() {
369 fs::create_dir_all(parent).map_err(|error| {
370 format!(
371 "failed to create fixture directory {}: {error}",
372 parent.display()
373 )
374 })?;
375 }
376 }
377
378 let lines = harn_vm::llm::take_cli_llm_recordings()
379 .into_iter()
380 .map(harn_vm::llm::serialize_llm_mock)
381 .collect::<Result<Vec<_>, _>>()?;
382 let body = if lines.is_empty() {
383 String::new()
384 } else {
385 format!("{}\n", lines.join("\n"))
386 };
387 fs::write(fixture_path, body)
388 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
389}
390
391pub(crate) async fn run_file(
392 path: &str,
393 trace: bool,
394 denied_builtins: HashSet<String>,
395 script_argv: Vec<String>,
396 llm_mock_mode: CliLlmMockMode,
397 attestation: Option<RunAttestationOptions>,
398 profile: RunProfileOptions,
399) {
400 run_file_with_skill_dirs(
401 path,
402 trace,
403 denied_builtins,
404 script_argv,
405 Vec::new(),
406 llm_mock_mode,
407 attestation,
408 profile,
409 )
410 .await;
411}
412
413pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
414 let outcome = execute_explain_cost(path);
415 if !outcome.stderr.is_empty() {
416 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
417 }
418 if !outcome.stdout.is_empty() {
419 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
420 }
421 if outcome.exit_code != 0 {
422 process::exit(outcome.exit_code);
423 }
424}
425
426pub(crate) async fn run_file_with_skill_dirs(
427 path: &str,
428 trace: bool,
429 denied_builtins: HashSet<String>,
430 script_argv: Vec<String>,
431 skill_dirs_raw: Vec<String>,
432 llm_mock_mode: CliLlmMockMode,
433 attestation: Option<RunAttestationOptions>,
434 profile: RunProfileOptions,
435) {
436 let interrupt_tokens = install_signal_shutdown_handler();
438
439 let _stdout_passthrough = StdoutPassthroughGuard::enable();
440 let outcome = execute_run_inner(ExecuteRunInputs {
441 path,
442 trace,
443 denied_builtins,
444 script_argv,
445 skill_dirs_raw,
446 llm_mock_mode,
447 attestation,
448 profile,
449 interrupt_tokens: Some(interrupt_tokens.clone()),
450 })
451 .await;
452
453 if !outcome.stderr.is_empty() {
456 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
457 }
458 if !outcome.stdout.is_empty() {
459 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
460 }
461
462 let mut exit_code = outcome.exit_code;
463 if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
464 exit_code = 124;
465 }
466 if exit_code != 0 {
467 process::exit(exit_code);
468 }
469}
470
471pub fn execute_explain_cost(path: &str) -> RunOutcome {
472 let stdout = String::new();
473 let mut stderr = String::new();
474
475 let (source, program) = parse_source_file(path);
476
477 let mut had_type_error = false;
478 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
479 for diag in &type_diagnostics {
480 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
481 if matches!(diag.severity, DiagnosticSeverity::Error) {
482 had_type_error = true;
483 }
484 stderr.push_str(&rendered);
485 }
486 if had_type_error {
487 return RunOutcome {
488 stdout,
489 stderr,
490 exit_code: 1,
491 };
492 }
493
494 let extensions = package::load_runtime_extensions(Path::new(path));
495 package::install_runtime_extensions(&extensions);
496 RunOutcome {
497 stdout: explain_cost::render_explain_cost(path, &program),
498 stderr,
499 exit_code: 0,
500 }
501}
502
503struct StdoutPassthroughGuard {
504 previous: bool,
505}
506
507impl StdoutPassthroughGuard {
508 fn enable() -> Self {
509 Self {
510 previous: harn_vm::set_stdout_passthrough(true),
511 }
512 }
513}
514
515impl Drop for StdoutPassthroughGuard {
516 fn drop(&mut self) {
517 harn_vm::set_stdout_passthrough(self.previous);
518 }
519}
520
/// Notice printed to stderr after the first shutdown signal has been turned
/// into a VM interrupt request; a second signal force-exits the process.
const FIRST_SIGNAL_MESSAGE: &str =
    "[harn] signal received, interrupting VM (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
531
532fn install_signal_shutdown_handler() -> RunInterruptTokens {
533 let tokens = RunInterruptTokens {
534 cancel_token: Arc::new(AtomicBool::new(false)),
535 signal_token: Arc::new(Mutex::new(None)),
536 };
537 let tokens_clone = tokens.clone();
538 tokio::spawn(async move {
539 #[cfg(unix)]
540 {
541 use tokio::signal::unix::{signal, SignalKind};
542 let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
543 let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
544 let mut sighup = signal(SignalKind::hangup()).expect("SIGHUP handler");
545 let mut seen_signal = false;
546 loop {
547 let signal_name = tokio::select! {
548 _ = sigterm.recv() => "SIGTERM",
549 _ = sigint.recv() => "SIGINT",
550 _ = sighup.recv() => "SIGHUP",
551 };
552 if seen_signal {
553 eprintln!("[harn] second signal received, terminating");
554 process::exit(124);
555 }
556 seen_signal = true;
557 request_vm_interrupt(&tokens_clone, signal_name);
558 eprintln!("{FIRST_SIGNAL_MESSAGE}");
559 }
560 }
561 #[cfg(not(unix))]
562 {
563 let mut seen_signal = false;
564 loop {
565 let _ = tokio::signal::ctrl_c().await;
566 if seen_signal {
567 eprintln!("[harn] second signal received, terminating");
568 process::exit(124);
569 }
570 seen_signal = true;
571 request_vm_interrupt(&tokens_clone, "SIGINT");
572 eprintln!("{FIRST_SIGNAL_MESSAGE}");
573 }
574 }
575 });
576 tokens
577}
578
579fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
580 if let Ok(mut signal) = tokens.signal_token.lock() {
581 *signal = Some(signal_name.to_string());
582 }
583 tokens.cancel_token.store(true, Ordering::SeqCst);
584}
585
586pub async fn execute_run(
592 path: &str,
593 trace: bool,
594 denied_builtins: HashSet<String>,
595 script_argv: Vec<String>,
596 skill_dirs_raw: Vec<String>,
597 llm_mock_mode: CliLlmMockMode,
598 attestation: Option<RunAttestationOptions>,
599 profile: RunProfileOptions,
600) -> RunOutcome {
601 execute_run_inner(ExecuteRunInputs {
602 path,
603 trace,
604 denied_builtins,
605 script_argv,
606 skill_dirs_raw,
607 llm_mock_mode,
608 attestation,
609 profile,
610 interrupt_tokens: None,
611 })
612 .await
613}
614
/// Compiles (or loads from cache) the script named in `inputs`, executes it on
/// a freshly configured VM, and returns the captured stdout/stderr and exit code.
///
/// Returns exit code 1 without running the script when the chunk cannot be
/// produced, the LLM mock mode cannot be installed, or manifest triggers/hooks
/// fail to install. After execution, exit code 1 is also returned when the
/// mock recording cannot be persisted or the provenance receipt cannot be
/// emitted. Otherwise the exit code is derived from the script's return value
/// (or is 1 on a runtime error).
async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
    let ExecuteRunInputs {
        path,
        trace,
        denied_builtins,
        script_argv,
        skill_dirs_raw,
        llm_mock_mode,
        attestation,
        profile,
        interrupt_tokens,
    } = inputs;

    let mut stderr = String::new();
    let mut stdout = String::new();

    // Compile/type diagnostics are written into `stderr` by the loader.
    let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut stderr)
    else {
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    };

    if trace {
        harn_vm::llm::enable_tracing();
    }
    // Profiling is built from tracing spans, so tracing must be on.
    if profile.is_enabled() {
        harn_vm::tracing::set_tracing_enabled(true);
    }
    if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let mut vm = harn_vm::Vm::new();
    if let Some(interrupt_tokens) = interrupt_tokens {
        vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
        vm.install_cancel_token(interrupt_tokens.cancel_token);
    }
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    let source_parent = std::path::Path::new(path)
        .parent()
        .unwrap_or(std::path::Path::new("."));
    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
    // Store/metadata/checkpoint builtins are rooted at the project root when
    // one is found, otherwise at the script's directory.
    let store_base = project_root.as_deref().unwrap_or(source_parent);
    let attestation_started_at_ms = now_ms();
    // An in-memory event log is installed only when attestation was requested.
    let attestation_log = if attestation.is_some() {
        Some(harn_vm::event_log::install_memory_for_current_thread(256))
    } else {
        None
    };
    if let Some(log) = attestation_log.as_ref() {
        append_run_provenance_event(
            log,
            "started",
            serde_json::json!({
                "pipeline": path,
                "argv": &script_argv,
                "project_root": store_base.display().to_string(),
            }),
        )
        .await;
    }
    harn_vm::register_store_builtins(&mut vm, store_base);
    harn_vm::register_metadata_builtins(&mut vm, store_base);
    // The checkpoint namespace is the script's file stem ("default" if absent).
    let pipeline_name = std::path::Path::new(path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("default");
    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
    vm.set_source_info(path, &source);
    if !denied_builtins.is_empty() {
        vm.set_denied_builtins(denied_builtins);
    }
    if let Some(ref root) = project_root {
        vm.set_project_root(root);
    }

    // A bare file name has an empty parent; the source dir is left unset then.
    if let Some(p) = std::path::Path::new(path).parent() {
        if !p.as_os_str().is_empty() {
            vm.set_source_dir(p);
        }
    }

    let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
    let loaded = load_skills(&SkillLoaderInputs {
        cli_dirs,
        source_path: Some(std::path::PathBuf::from(path)),
    });
    emit_loader_warnings(&loaded.loader_warnings);
    install_skills_global(&mut vm, &loaded);

    // Expose the script arguments as the `argv` global list of strings.
    let argv_values: Vec<harn_vm::VmValue> = script_argv
        .iter()
        .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
        .collect();
    vm.set_global(
        "argv",
        harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
    );

    let extensions = package::load_runtime_extensions(Path::new(path));
    package::install_runtime_extensions(&extensions);
    if let Some(manifest) = extensions.root_manifest.as_ref() {
        if !manifest.mcp.is_empty() {
            connect_mcp_servers(&manifest.mcp, &mut vm).await;
        }
    }
    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest triggers: {error}\n"
        ));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }
    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest hooks: {error}\n"
        ));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    // Execute inside a LocalSet; on success keep the VM output and return
    // value, on failure keep the formatted runtime error.
    let local = tokio::task::LocalSet::new();
    let execution = local
        .run_until(async {
            match vm.execute(&chunk).await {
                Ok(value) => Ok((vm.output(), value)),
                Err(e) => Err(vm.format_runtime_error(&e)),
            }
        })
        .await;
    // Recordings are persisted regardless of whether execution succeeded.
    if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let buffered_stderr = harn_vm::take_stderr_buffer();
    stderr.push_str(&buffered_stderr);

    let exit_code = match &execution {
        Ok((_, return_value)) => exit_code_from_return_value(return_value),
        Err(_) => 1,
    };

    if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
        // NOTE(review): the error path below returns before
        // `reset_active_event_log()` runs — confirm that is intended.
        if let Err(error) = emit_run_attestation(
            log,
            path,
            store_base,
            attestation_started_at_ms,
            exit_code,
            options,
            &mut stderr,
        )
        .await
        {
            stderr.push_str(&format!(
                "error: failed to emit provenance receipt: {error}\n"
            ));
            return RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            };
        }
        harn_vm::event_log::reset_active_event_log();
    }

    match execution {
        Ok((output, return_value)) => {
            stdout.push_str(output);
            if trace {
                stderr.push_str(&render_trace_summary());
            }
            if profile.is_enabled() {
                // Profile write failures are downgraded to a warning.
                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            if exit_code != 0 {
                stderr.push_str(&render_return_value_error(&return_value));
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code,
            }
        }
        Err(rendered_error) => {
            stderr.push_str(&rendered_error);
            if profile.is_enabled() {
                // Profile write failures are downgraded to a warning.
                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            }
        }
    }
}
843
844fn render_and_persist_profile(
845 options: &RunProfileOptions,
846 stderr: &mut String,
847) -> Result<(), String> {
848 let spans = harn_vm::tracing::peek_spans();
849 let profile = harn_vm::profile::build(&spans);
850 if options.text {
851 stderr.push_str(&harn_vm::profile::render(&profile));
852 }
853 if let Some(path) = options.json_path.as_ref() {
854 if let Some(parent) = path.parent() {
855 if !parent.as_os_str().is_empty() {
856 fs::create_dir_all(parent)
857 .map_err(|error| format!("create {}: {error}", parent.display()))?;
858 }
859 }
860 let json = serde_json::to_string_pretty(&profile)
861 .map_err(|error| format!("serialize profile: {error}"))?;
862 fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
863 }
864 Ok(())
865}
866
867async fn append_run_provenance_event(
868 log: &Arc<harn_vm::event_log::AnyEventLog>,
869 kind: &str,
870 payload: serde_json::Value,
871) {
872 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
873 return;
874 };
875 let _ = log
876 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
877 .await;
878}
879
880async fn emit_run_attestation(
881 log: &Arc<harn_vm::event_log::AnyEventLog>,
882 path: &str,
883 store_base: &Path,
884 started_at_ms: i64,
885 exit_code: i32,
886 options: &RunAttestationOptions,
887 stderr: &mut String,
888) -> Result<(), String> {
889 let finished_at_ms = now_ms();
890 let status = if exit_code == 0 { "success" } else { "failure" };
891 append_run_provenance_event(
892 log,
893 "finished",
894 serde_json::json!({
895 "pipeline": path,
896 "status": status,
897 "exit_code": exit_code,
898 }),
899 )
900 .await;
901 log.flush()
902 .await
903 .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
904 let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
905 .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
906 let (signing_key, key_id) =
907 harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
908 .await
909 .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
910 let receipt = harn_vm::build_signed_receipt(
911 log,
912 harn_vm::ReceiptBuildOptions {
913 pipeline: path.to_string(),
914 status: status.to_string(),
915 started_at_ms,
916 finished_at_ms,
917 exit_code,
918 producer_name: "harn-cli".to_string(),
919 producer_version: env!("CARGO_PKG_VERSION").to_string(),
920 },
921 &signing_key,
922 key_id,
923 )
924 .await
925 .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
926 let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
927 if let Some(parent) = receipt_path.parent() {
928 fs::create_dir_all(parent)
929 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
930 }
931 let encoded = serde_json::to_vec_pretty(&receipt)
932 .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
933 fs::write(&receipt_path, encoded)
934 .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
935 stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
936 Ok(())
937}
938
939fn receipt_output_path(
940 store_base: &Path,
941 options: &RunAttestationOptions,
942 receipt_id: &str,
943) -> PathBuf {
944 if let Some(path) = options.receipt_out.as_ref() {
945 return path.clone();
946 }
947 harn_vm::runtime_paths::state_root(store_base)
948 .join("receipts")
949 .join(format!("{receipt_id}.json"))
950}
951
/// Current wall-clock time as milliseconds since the Unix epoch, or `0` when
/// the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
958
959fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
966 use harn_vm::VmValue;
967 match value {
968 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
969 VmValue::EnumVariant {
970 enum_name,
971 variant,
972 fields,
973 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => 1,
974 _ => 0,
975 }
976}
977
978fn render_return_value_error(value: &harn_vm::VmValue) -> String {
979 let harn_vm::VmValue::EnumVariant {
980 enum_name,
981 variant,
982 fields,
983 } = value
984 else {
985 return String::new();
986 };
987 if enum_name.as_ref() != "Result" || variant.as_ref() != "Err" {
988 return String::new();
989 }
990 let rendered = fields.first().map(|p| p.display()).unwrap_or_default();
991 if rendered.is_empty() {
992 "error\n".to_string()
993 } else if rendered.ends_with('\n') {
994 rendered
995 } else {
996 format!("{rendered}\n")
997 }
998}
999
1000pub(crate) async fn connect_mcp_servers(
1009 servers: &[package::McpServerConfig],
1010 vm: &mut harn_vm::Vm,
1011) {
1012 use std::collections::BTreeMap;
1013 use std::rc::Rc;
1014 use std::time::Duration;
1015
1016 let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
1017 let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
1018
1019 for server in servers {
1020 let resolved_auth = match mcp::resolve_auth_for_server(server).await {
1021 Ok(resolution) => resolution,
1022 Err(error) => {
1023 eprintln!(
1024 "warning: mcp: failed to load auth for '{}': {}",
1025 server.name, error
1026 );
1027 AuthResolution::None
1028 }
1029 };
1030 let spec = serde_json::json!({
1031 "name": server.name,
1032 "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
1033 "command": server.command,
1034 "args": server.args,
1035 "env": server.env,
1036 "url": server.url,
1037 "auth_token": match resolved_auth {
1038 AuthResolution::Bearer(token) => Some(token),
1039 AuthResolution::None => server.auth_token.clone(),
1040 },
1041 "protocol_version": server.protocol_version,
1042 "proxy_server_name": server.proxy_server_name,
1043 });
1044
1045 registrations.push(harn_vm::RegisteredMcpServer {
1048 name: server.name.clone(),
1049 spec: spec.clone(),
1050 lazy: server.lazy,
1051 card: server.card.clone(),
1052 keep_alive: server.keep_alive_ms.map(Duration::from_millis),
1053 });
1054
1055 if server.lazy {
1056 eprintln!(
1057 "[harn] mcp: deferred '{}' (lazy, boots on first use)",
1058 server.name
1059 );
1060 continue;
1061 }
1062
1063 match harn_vm::connect_mcp_server_from_json(&spec).await {
1064 Ok(handle) => {
1065 eprintln!("[harn] mcp: connected to '{}'", server.name);
1066 harn_vm::mcp_install_active(&server.name, handle.clone());
1067 mcp_dict.insert(server.name.clone(), harn_vm::VmValue::McpClient(handle));
1068 }
1069 Err(e) => {
1070 eprintln!(
1071 "warning: mcp: failed to connect to '{}': {}",
1072 server.name, e
1073 );
1074 }
1075 }
1076 }
1077
1078 harn_vm::mcp_register_servers(registrations);
1081
1082 if !mcp_dict.is_empty() {
1083 vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
1084 }
1085}
1086
1087pub(crate) fn render_trace_summary() -> String {
1088 use std::fmt::Write;
1089 let entries = harn_vm::llm::take_trace();
1090 if entries.is_empty() {
1091 return String::new();
1092 }
1093 let mut out = String::new();
1094 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
1095 let mut total_input = 0i64;
1096 let mut total_output = 0i64;
1097 let mut total_ms = 0u64;
1098 for (i, entry) in entries.iter().enumerate() {
1099 let _ = writeln!(
1100 out,
1101 " #{}: {} | {} in + {} out tokens | {} ms",
1102 i + 1,
1103 entry.model,
1104 entry.input_tokens,
1105 entry.output_tokens,
1106 entry.duration_ms,
1107 );
1108 total_input += entry.input_tokens;
1109 total_output += entry.output_tokens;
1110 total_ms += entry.duration_ms;
1111 }
1112 let total_tokens = total_input + total_output;
1113 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
1115 let _ = writeln!(
1116 out,
1117 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
1118 entries.len(),
1119 if entries.len() == 1 { "" } else { "s" },
1120 total_tokens,
1121 total_input,
1122 total_output,
1123 total_ms,
1124 cost,
1125 );
1126 out
1127}
1128
1129pub(crate) async fn run_file_mcp_serve(
1143 path: &str,
1144 card_source: Option<&str>,
1145 mode: RunFileMcpServeMode,
1146) {
1147 let mut diagnostics = String::new();
1148 let Some(LoadedChunk { source, chunk }) = compile_or_load_chunk_for_run(path, &mut diagnostics)
1149 else {
1150 eprint!("{diagnostics}");
1151 process::exit(1);
1152 };
1153 if !diagnostics.is_empty() {
1154 eprint!("{diagnostics}");
1155 }
1156
1157 let mut vm = harn_vm::Vm::new();
1158 harn_vm::register_vm_stdlib(&mut vm);
1159 crate::install_default_hostlib(&mut vm);
1160 let source_parent = std::path::Path::new(path)
1161 .parent()
1162 .unwrap_or(std::path::Path::new("."));
1163 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1164 let store_base = project_root.as_deref().unwrap_or(source_parent);
1165 harn_vm::register_store_builtins(&mut vm, store_base);
1166 harn_vm::register_metadata_builtins(&mut vm, store_base);
1167 let pipeline_name = std::path::Path::new(path)
1168 .file_stem()
1169 .and_then(|s| s.to_str())
1170 .unwrap_or("default");
1171 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1172 vm.set_source_info(path, &source);
1173 if let Some(ref root) = project_root {
1174 vm.set_project_root(root);
1175 }
1176 if let Some(p) = std::path::Path::new(path).parent() {
1177 if !p.as_os_str().is_empty() {
1178 vm.set_source_dir(p);
1179 }
1180 }
1181
1182 let loaded = load_skills(&SkillLoaderInputs {
1184 cli_dirs: Vec::new(),
1185 source_path: Some(std::path::PathBuf::from(path)),
1186 });
1187 emit_loader_warnings(&loaded.loader_warnings);
1188 install_skills_global(&mut vm, &loaded);
1189
1190 let extensions = package::load_runtime_extensions(Path::new(path));
1191 package::install_runtime_extensions(&extensions);
1192 if let Some(manifest) = extensions.root_manifest.as_ref() {
1193 if !manifest.mcp.is_empty() {
1194 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1195 }
1196 }
1197 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1198 eprintln!("error: failed to install manifest triggers: {error}");
1199 process::exit(1);
1200 }
1201 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1202 eprintln!("error: failed to install manifest hooks: {error}");
1203 process::exit(1);
1204 }
1205
1206 let local = tokio::task::LocalSet::new();
1207 local
1208 .run_until(async {
1209 match vm.execute(&chunk).await {
1210 Ok(_) => {}
1211 Err(e) => {
1212 eprint!("{}", vm.format_runtime_error(&e));
1213 process::exit(1);
1214 }
1215 }
1216
1217 let output = vm.output();
1219 if !output.is_empty() {
1220 eprint!("{output}");
1221 }
1222
1223 let registry = match harn_vm::take_mcp_serve_registry() {
1224 Some(r) => r,
1225 None => {
1226 eprintln!("error: pipeline did not call mcp_serve(registry)");
1227 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
1228 process::exit(1);
1229 }
1230 };
1231
1232 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
1233 Ok(t) => t,
1234 Err(e) => {
1235 eprintln!("error: {e}");
1236 process::exit(1);
1237 }
1238 };
1239
1240 let resources = harn_vm::take_mcp_serve_resources();
1241 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
1242 let prompts = harn_vm::take_mcp_serve_prompts();
1243
1244 let server_name = std::path::Path::new(path)
1245 .file_stem()
1246 .and_then(|s| s.to_str())
1247 .unwrap_or("harn")
1248 .to_string();
1249
1250 let mut caps = Vec::new();
1251 if !tools.is_empty() {
1252 caps.push(format!(
1253 "{} tool{}",
1254 tools.len(),
1255 if tools.len() == 1 { "" } else { "s" }
1256 ));
1257 }
1258 let total_resources = resources.len() + resource_templates.len();
1259 if total_resources > 0 {
1260 caps.push(format!(
1261 "{total_resources} resource{}",
1262 if total_resources == 1 { "" } else { "s" }
1263 ));
1264 }
1265 if !prompts.is_empty() {
1266 caps.push(format!(
1267 "{} prompt{}",
1268 prompts.len(),
1269 if prompts.len() == 1 { "" } else { "s" }
1270 ));
1271 }
1272 eprintln!(
1273 "[harn] serve mcp: serving {} as '{server_name}'",
1274 caps.join(", ")
1275 );
1276
1277 let mut server =
1278 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
1279 if let Some(source) = card_source {
1280 match resolve_card_source(source) {
1281 Ok(card) => server = server.with_server_card(card),
1282 Err(e) => {
1283 eprintln!("error: --card: {e}");
1284 process::exit(1);
1285 }
1286 }
1287 }
1288 match mode {
1289 RunFileMcpServeMode::Stdio => {
1290 if let Err(e) = server.run(&mut vm).await {
1291 eprintln!("error: MCP server error: {e}");
1292 process::exit(1);
1293 }
1294 }
1295 RunFileMcpServeMode::Http {
1296 options,
1297 auth_policy,
1298 } => {
1299 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
1300 server,
1301 vm,
1302 options,
1303 auth_policy,
1304 )
1305 .await
1306 {
1307 eprintln!("error: MCP server error: {e}");
1308 process::exit(1);
1309 }
1310 }
1311 }
1312 })
1313 .await;
1314}
1315
1316pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
1321 let trimmed = source.trim_start();
1322 if trimmed.starts_with('{') || trimmed.starts_with('[') {
1323 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
1324 }
1325 let path = std::path::Path::new(source);
1326 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
1327}
1328
1329pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
1330 use notify::{Event, EventKind, RecursiveMode, Watcher};
1331
1332 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
1333 eprintln!("Error: {e}");
1334 process::exit(1);
1335 });
1336 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
1337
1338 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
1339 run_file(
1340 path,
1341 false,
1342 denied_builtins.clone(),
1343 Vec::new(),
1344 CliLlmMockMode::Off,
1345 None,
1346 RunProfileOptions::default(),
1347 )
1348 .await;
1349
1350 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1351 let _watcher = {
1352 let tx = tx.clone();
1353 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
1354 if let Ok(event) = res {
1355 if matches!(
1356 event.kind,
1357 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
1358 ) {
1359 let has_harn = event
1360 .paths
1361 .iter()
1362 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
1363 if has_harn {
1364 let _ = tx.blocking_send(());
1365 }
1366 }
1367 }
1368 })
1369 .unwrap_or_else(|e| {
1370 eprintln!("Error setting up file watcher: {e}");
1371 process::exit(1);
1372 });
1373 watcher
1374 .watch(watch_dir, RecursiveMode::Recursive)
1375 .unwrap_or_else(|e| {
1376 eprintln!("Error watching directory: {e}");
1377 process::exit(1);
1378 });
1379 watcher };
1381
1382 eprintln!(
1383 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
1384 watch_dir.display()
1385 );
1386
1387 loop {
1388 rx.recv().await;
1389 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1391 while rx.try_recv().is_ok() {}
1392
1393 eprintln!();
1394 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
1395 run_file(
1396 path,
1397 false,
1398 denied_builtins.clone(),
1399 Vec::new(),
1400 CliLlmMockMode::Off,
1401 None,
1402 RunProfileOptions::default(),
1403 )
1404 .await;
1405 }
1406}
1407
#[cfg(test)]
mod tests {
    use super::{
        execute_explain_cost, execute_run, split_eval_header, CliLlmMockMode, RunProfileOptions,
        StdoutPassthroughGuard,
    };
    use std::collections::HashSet;

    /// Source without import lines yields an empty header and the full body.
    #[test]
    fn split_eval_header_no_imports_returns_full_body() {
        let source = "println(1 + 2)";
        let (imports, rest) = split_eval_header(source);
        assert_eq!(imports, "");
        assert_eq!(rest, "println(1 + 2)");
    }

    /// Consecutive leading imports are moved into the header.
    #[test]
    fn split_eval_header_lifts_leading_imports() {
        let source = "import \"./lib\"\nimport { x } from \"std/math\"\nprintln(x)";
        let (imports, rest) = split_eval_header(source);
        assert_eq!(imports, "import \"./lib\"\nimport { x } from \"std/math\"");
        assert_eq!(rest, "println(x)");
    }

    /// Comments, `pub import` lines, and the blank line after them stay in the header.
    #[test]
    fn split_eval_header_keeps_pub_import_and_comments_in_header() {
        let source = "// header comment\npub import { y } from \"./lib\"\n\nfoo()";
        let (imports, rest) = split_eval_header(source);
        assert_eq!(
            imports,
            "// header comment\npub import { y } from \"./lib\"\n"
        );
        assert_eq!(rest, "foo()");
    }

    /// An import that follows another statement is left in the body.
    #[test]
    fn split_eval_header_does_not_lift_imports_after_other_statements() {
        let source = "let a = 1\nimport \"./lib\"";
        let (imports, rest) = split_eval_header(source);
        assert_eq!(imports, "");
        assert_eq!(rest, "let a = 1\nimport \"./lib\"");
    }

    /// Parse -> serialize -> reparse keeps the single logprob entry intact.
    #[test]
    fn cli_llm_mock_roundtrips_logprobs() {
        let input = serde_json::json!({
            "text": "visible",
            "logprobs": [{"token": "visible", "logprob": 0.0}]
        });
        let parsed = harn_vm::llm::parse_llm_mock_value(&input).expect("parse mock");
        assert_eq!(parsed.logprobs.len(), 1);

        let serialized = harn_vm::llm::serialize_llm_mock(parsed).expect("serialize mock");
        let json: serde_json::Value = serde_json::from_str(&serialized).expect("json line");
        assert_eq!(json["logprobs"][0]["token"].as_str(), Some("visible"));

        let roundtripped = harn_vm::llm::parse_llm_mock_value(&json).expect("reparse mock");
        assert_eq!(roundtripped.logprobs.len(), 1);
        assert_eq!(roundtripped.logprobs[0]["logprob"].as_f64(), Some(0.0));
    }

    /// Dropping the guard puts the passthrough flag back to its prior value.
    #[test]
    fn stdout_passthrough_guard_restores_previous_state() {
        let saved = harn_vm::set_stdout_passthrough(false);

        let guard = StdoutPassthroughGuard::enable();
        // While the guard is alive the previously stored value must be `true`.
        assert!(harn_vm::set_stdout_passthrough(true));
        drop(guard);

        // After the drop the stored value is `false` again; this call also
        // reinstates whatever was set before the test started.
        assert!(!harn_vm::set_stdout_passthrough(saved));
    }

    /// `execute_explain_cost` reports an estimate without running the pipeline.
    #[test]
    fn execute_explain_cost_does_not_execute_script() {
        let workdir = tempfile::TempDir::new().expect("temp dir");
        let script_path = workdir.path().join("main.harn");
        let side_effect = workdir.path().join("executed.txt");
        std::fs::write(
            &script_path,
            r#"
pipeline main() {
  write_file("executed.txt", "bad")
  llm_call("hello", nil, {provider: "mock", model: "mock"})
}
"#,
        )
        .expect("write script");

        let report = execute_explain_cost(&script_path.to_string_lossy());

        assert_eq!(report.exit_code, 0, "stderr:\n{}", report.stderr);
        assert!(report.stdout.contains("LLM cost estimate"));
        assert!(
            !side_effect.exists(),
            "--explain-cost must not execute pipeline side effects"
        );
    }

    /// A script that calls `hostlib_enable` runs successfully through `execute_run`.
    #[cfg(feature = "hostlib")]
    #[tokio::test]
    async fn execute_run_installs_hostlib_gate() {
        let script_file = tempfile::NamedTempFile::new().expect("temp file");
        std::fs::write(
            script_file.path(),
            r#"
pipeline main() {
  let _ = hostlib_enable("tools:deterministic")
  println("enabled")
}
"#,
        )
        .expect("write script");

        let script_path = script_file.path().to_string_lossy();
        let result = execute_run(
            &script_path,
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
            RunProfileOptions::default(),
        )
        .await;

        assert_eq!(result.exit_code, 0, "stderr:\n{}", result.stderr);
        assert_eq!(result.stdout.trim(), "enabled");
    }

    /// Output beyond the inline capture limit can be read back by command id.
    #[cfg(all(feature = "hostlib", unix))]
    #[tokio::test]
    async fn execute_run_can_read_hostlib_command_artifacts() {
        let script_file = tempfile::NamedTempFile::new().expect("temp file");
        std::fs::write(
            script_file.path(),
            r#"
pipeline main() {
  let _ = hostlib_enable("tools:deterministic")
  let result = hostlib_tools_run_command({
    argv: ["sh", "-c", "i=0; while [ $i -lt 2000 ]; do printf x; i=$((i+1)); done"],
    capture: {max_inline_bytes: 8},
    timeout_ms: 5000,
  })
  println(starts_with(result.command_id, "cmd_"))
  println(len(result.stdout))
  println(result.byte_count)
  let window = hostlib_tools_read_command_output({
    command_id: result.command_id,
    offset: 1990,
    length: 20,
  })
  println(len(window.content))
  println(window.eof)
}
"#,
        )
        .expect("write script");

        let script_path = script_file.path().to_string_lossy();
        let result = execute_run(
            &script_path,
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
            RunProfileOptions::default(),
        )
        .await;

        assert_eq!(result.exit_code, 0, "stderr:\n{}", result.stderr);
        assert_eq!(result.stdout.trim(), "true\n8\n2000\n10\ntrue");
    }
}