1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9use harn_parser::DiagnosticSeverity;
10use harn_vm::event_log::EventLog;
11
12use crate::commands::mcp::{self, AuthResolution};
13use crate::package;
14use crate::parse_source_file;
15use crate::skill_loader::{
16 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
17 SkillLoaderInputs,
18};
19
20mod explain_cost;
21
/// Selects how [`run_file_mcp_serve`] drives the MCP server it builds.
pub(crate) enum RunFileMcpServeMode {
    /// Run the server directly against the script's VM via `McpServer::run`.
    Stdio,
    /// Hand the server and VM to
    /// `crate::commands::serve::run_script_mcp_http_server`.
    Http {
        /// Serving options forwarded unchanged to the HTTP runner.
        options: harn_serve::McpHttpServeOptions,
        /// Auth policy forwarded unchanged to the HTTP runner.
        auth_policy: harn_serve::AuthPolicy,
    },
}
29
/// Builtin names that `build_denied_builtins` never places in the deny set
/// when it derives that set from an allow-list, even if the allow-list omits
/// them.
const CORE_BUILTINS: &[&str] = &[
    "println",
    "print",
    "log",
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    "assert",
    "assert_eq",
    "assert_ne",
    "json_parse",
    "json_stringify",
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
52
53pub(crate) fn build_denied_builtins(
58 deny_csv: Option<&str>,
59 allow_csv: Option<&str>,
60) -> HashSet<String> {
61 if let Some(csv) = deny_csv {
62 csv.split(',')
63 .map(|s| s.trim().to_string())
64 .filter(|s| !s.is_empty())
65 .collect()
66 } else if let Some(csv) = allow_csv {
67 let allowed: HashSet<String> = csv
70 .split(',')
71 .map(|s| s.trim().to_string())
72 .filter(|s| !s.is_empty())
73 .collect();
74 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
75
76 let mut tmp = harn_vm::Vm::new();
78 harn_vm::register_vm_stdlib(&mut tmp);
79 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
80 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
81
82 tmp.builtin_names()
83 .into_iter()
84 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
85 .collect()
86 } else {
87 HashSet::new()
88 }
89}
90
91fn typecheck_with_imports(
96 program: &[harn_parser::SNode],
97 path: &Path,
98 source: &str,
99) -> Vec<harn_parser::TypeDiagnostic> {
100 if let Err(error) = package::ensure_dependencies_materialized(path) {
101 eprintln!("error: {error}");
102 process::exit(1);
103 }
104 let graph = harn_modules::build(&[path.to_path_buf()]);
105 let mut checker = harn_parser::TypeChecker::new();
106 if let Some(imported) = graph.imported_names_for_file(path) {
107 checker = checker.with_imported_names(imported);
108 }
109 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
110 checker = checker.with_imported_type_decls(imported);
111 }
112 if let Some(imported) = graph.imported_callable_declarations_for_file(path) {
113 checker = checker.with_imported_callable_decls(imported);
114 }
115 checker.check_with_source(program, source)
116}
117
118pub(crate) fn prepare_eval_temp_file(
129 code: &str,
130) -> Result<(String, tempfile::NamedTempFile), String> {
131 let (header, body) = split_eval_header(code);
132 let wrapped = if header.is_empty() {
133 format!("pipeline main(task) {{\n{body}\n}}")
134 } else {
135 format!("{header}\npipeline main(task) {{\n{body}\n}}")
136 };
137
138 let tmp = create_eval_temp_file()?;
139 Ok((wrapped, tmp))
140}
141
142fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
147 if let Some(dir) = std::env::current_dir().ok().as_deref() {
148 match tempfile::Builder::new()
151 .prefix(".harn-eval-")
152 .suffix(".harn")
153 .tempfile_in(dir)
154 {
155 Ok(tmp) => return Ok(tmp),
156 Err(error) => eprintln!(
157 "warning: harn run -e: could not create temp file in {}: {error}; \
158 relative imports will not resolve",
159 dir.display()
160 ),
161 }
162 }
163 tempfile::Builder::new()
164 .prefix("harn-eval-")
165 .suffix(".harn")
166 .tempfile()
167 .map_err(|e| format!("failed to create temp file for -e: {e}"))
168}
169
/// Splits `-e` source into `(header, body)`.
///
/// The header is the leading run of lines that are blank, `//` comments, or
/// `import` / `pub import` statements; the body is everything from the first
/// other line onward. If that leading run contains no import at all, the
/// header is empty and `code` is returned verbatim as the body.
///
/// When a split happens both halves are re-joined with `\n`, so a trailing
/// newline and `\r\n` line endings are not preserved.
fn split_eval_header(code: &str) -> (String, String) {
    const IMPORT_PREFIXES: [&str; 5] = [
        "import ",
        "import\t",
        "import\"",
        "pub import ",
        "pub import\t",
    ];

    let lines: Vec<&str> = code.lines().collect();
    let mut header_len = 0usize;
    let mut saw_import = false;

    for (idx, line) in lines.iter().enumerate() {
        let stripped = line.trim_start();
        let is_filler = stripped.is_empty() || stripped.starts_with("//");
        let is_import = !is_filler
            && IMPORT_PREFIXES
                .iter()
                .any(|prefix| stripped.starts_with(*prefix));
        if !(is_filler || is_import) {
            break;
        }
        // Filler lines extend the header too, but only an import makes the
        // split happen at all.
        header_len = idx + 1;
        saw_import |= is_import;
    }

    if !saw_import {
        return (String::new(), code.to_string());
    }

    let (header, body) = lines.split_at(header_len);
    (header.join("\n"), body.join("\n"))
}
212
/// How the CLI wires LLM mocking for a run; consumed by
/// `install_cli_llm_mock_mode` and `persist_cli_llm_mock_recording`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocks are installed and nothing is recorded.
    #[default]
    Off,
    /// Load mocks from a JSONL fixture and install them before the run.
    Replay {
        /// Fixture file read by `harn_vm::llm::load_llm_mocks_jsonl`.
        fixture_path: PathBuf,
    },
    /// Enable recording during the run and write the recordings afterwards.
    Record {
        /// File the recorded entries are written to, one per line.
        fixture_path: PathBuf,
    },
}
224
/// Options for the provenance receipt emitted by `emit_run_attestation`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Explicit receipt path. When `None`, `receipt_output_path` falls back to
    /// `receipts/<receipt_id>.json` under the runtime state root.
    pub receipt_out: Option<PathBuf>,
    /// Optional agent id passed to `harn_vm::load_or_generate_agent_signing_key`.
    pub agent_id: Option<String>,
}
230
/// Selects which profile outputs a run should produce.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunProfileOptions {
    /// Append the rendered text profile to the run's stderr.
    pub text: bool,
    /// Write the profile as pretty-printed JSON to this path.
    pub json_path: Option<PathBuf>,
}

impl RunProfileOptions {
    /// Returns `true` when at least one profile output was requested.
    pub fn is_enabled(&self) -> bool {
        matches!(
            self,
            Self { text: true, .. }
                | Self {
                    json_path: Some(_),
                    ..
                }
        )
    }
}
246
/// Shared handles used to request a VM interrupt from outside the VM.
///
/// Cloning shares the same underlying flag and slot (both are `Arc`s).
#[derive(Clone)]
pub struct RunInterruptTokens {
    /// Set to `true` by `request_vm_interrupt`; installed on the VM with
    /// `install_cancel_token`.
    pub cancel_token: Arc<AtomicBool>,
    /// Name of the signal that triggered the interrupt (e.g. `"SIGINT"`);
    /// installed on the VM with `install_interrupt_signal_token`.
    pub signal_token: Arc<Mutex<Option<String>>>,
}
252
/// Bundled arguments for `execute_run_inner`.
struct ExecuteRunInputs<'a> {
    /// Path of the script to parse, type-check, compile and execute.
    path: &'a str,
    /// Enables LLM tracing and appends the trace summary to stderr.
    trace: bool,
    /// Builtins the VM must refuse; applied only when non-empty.
    denied_builtins: HashSet<String>,
    /// Values exposed to the script as the global `argv` list.
    script_argv: Vec<String>,
    /// Raw skill directories, canonicalized with `canonicalize_cli_dirs`.
    skill_dirs_raw: Vec<String>,
    /// LLM mock mode installed before execution and persisted afterwards.
    llm_mock_mode: CliLlmMockMode,
    /// When `Some`, a provenance receipt is emitted after the run.
    attestation: Option<RunAttestationOptions>,
    /// Profile outputs to produce after the run.
    profile: RunProfileOptions,
    /// When `Some`, installed on the VM as its interrupt/cancel tokens.
    interrupt_tokens: Option<RunInterruptTokens>,
}
264
/// Captured result of a run: the text destined for each stream plus the
/// process exit code the caller should use.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Text to be written to standard output.
    pub stdout: String,
    /// Text to be written to standard error (diagnostics, errors, summaries).
    pub stderr: String,
    /// Exit code; `0` means success.
    pub exit_code: i32,
}
274
275pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
276 harn_vm::llm::clear_cli_llm_mock_mode();
277 match mode {
278 CliLlmMockMode::Off => Ok(()),
279 CliLlmMockMode::Replay { fixture_path } => {
280 let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
281 harn_vm::llm::install_cli_llm_mocks(mocks);
282 Ok(())
283 }
284 CliLlmMockMode::Record { .. } => {
285 harn_vm::llm::enable_cli_llm_mock_recording();
286 Ok(())
287 }
288 }
289}
290
291pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
292 let CliLlmMockMode::Record { fixture_path } = mode else {
293 return Ok(());
294 };
295 if let Some(parent) = fixture_path.parent() {
296 if !parent.as_os_str().is_empty() {
297 fs::create_dir_all(parent).map_err(|error| {
298 format!(
299 "failed to create fixture directory {}: {error}",
300 parent.display()
301 )
302 })?;
303 }
304 }
305
306 let lines = harn_vm::llm::take_cli_llm_recordings()
307 .into_iter()
308 .map(harn_vm::llm::serialize_llm_mock)
309 .collect::<Result<Vec<_>, _>>()?;
310 let body = if lines.is_empty() {
311 String::new()
312 } else {
313 format!("{}\n", lines.join("\n"))
314 };
315 fs::write(fixture_path, body)
316 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
317}
318
319pub(crate) async fn run_file(
320 path: &str,
321 trace: bool,
322 denied_builtins: HashSet<String>,
323 script_argv: Vec<String>,
324 llm_mock_mode: CliLlmMockMode,
325 attestation: Option<RunAttestationOptions>,
326 profile: RunProfileOptions,
327) {
328 run_file_with_skill_dirs(
329 path,
330 trace,
331 denied_builtins,
332 script_argv,
333 Vec::new(),
334 llm_mock_mode,
335 attestation,
336 profile,
337 )
338 .await;
339}
340
341pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
342 let outcome = execute_explain_cost(path);
343 if !outcome.stderr.is_empty() {
344 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
345 }
346 if !outcome.stdout.is_empty() {
347 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
348 }
349 if outcome.exit_code != 0 {
350 process::exit(outcome.exit_code);
351 }
352}
353
354pub(crate) async fn run_file_with_skill_dirs(
355 path: &str,
356 trace: bool,
357 denied_builtins: HashSet<String>,
358 script_argv: Vec<String>,
359 skill_dirs_raw: Vec<String>,
360 llm_mock_mode: CliLlmMockMode,
361 attestation: Option<RunAttestationOptions>,
362 profile: RunProfileOptions,
363) {
364 let interrupt_tokens = install_signal_shutdown_handler();
366
367 let _stdout_passthrough = StdoutPassthroughGuard::enable();
368 let outcome = execute_run_inner(ExecuteRunInputs {
369 path,
370 trace,
371 denied_builtins,
372 script_argv,
373 skill_dirs_raw,
374 llm_mock_mode,
375 attestation,
376 profile,
377 interrupt_tokens: Some(interrupt_tokens.clone()),
378 })
379 .await;
380
381 if !outcome.stderr.is_empty() {
384 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
385 }
386 if !outcome.stdout.is_empty() {
387 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
388 }
389
390 let mut exit_code = outcome.exit_code;
391 if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
392 exit_code = 124;
393 }
394 if exit_code != 0 {
395 process::exit(exit_code);
396 }
397}
398
399pub fn execute_explain_cost(path: &str) -> RunOutcome {
400 let stdout = String::new();
401 let mut stderr = String::new();
402
403 let (source, program) = parse_source_file(path);
404
405 let mut had_type_error = false;
406 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
407 for diag in &type_diagnostics {
408 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
409 if matches!(diag.severity, DiagnosticSeverity::Error) {
410 had_type_error = true;
411 }
412 stderr.push_str(&rendered);
413 }
414 if had_type_error {
415 return RunOutcome {
416 stdout,
417 stderr,
418 exit_code: 1,
419 };
420 }
421
422 let extensions = package::load_runtime_extensions(Path::new(path));
423 package::install_runtime_extensions(&extensions);
424 RunOutcome {
425 stdout: explain_cost::render_explain_cost(path, &program),
426 stderr,
427 exit_code: 0,
428 }
429}
430
431struct StdoutPassthroughGuard {
432 previous: bool,
433}
434
435impl StdoutPassthroughGuard {
436 fn enable() -> Self {
437 Self {
438 previous: harn_vm::set_stdout_passthrough(true),
439 }
440 }
441}
442
443impl Drop for StdoutPassthroughGuard {
444 fn drop(&mut self) {
445 harn_vm::set_stdout_passthrough(self.previous);
446 }
447}
448
/// Printed to stderr by the signal handler task after the first signal, once
/// the VM interrupt has been requested.
const FIRST_SIGNAL_MESSAGE: &str =
    "[harn] signal received, interrupting VM (give it a moment to unwind in-flight async ops; Ctrl-C again to force-exit)...";
459
/// Spawns a background tokio task that turns termination signals into a VM
/// interrupt request, and returns the tokens that task writes to.
///
/// The first signal records its name and sets the cancel flag through
/// `request_vm_interrupt`, then prints `FIRST_SIGNAL_MESSAGE`. Any later
/// signal prints a notice and exits the process with status 124.
///
/// On Unix the task listens for SIGTERM, SIGINT and SIGHUP and panics if a
/// handler cannot be registered; elsewhere it listens for Ctrl-C only and
/// reports it as `"SIGINT"`.
fn install_signal_shutdown_handler() -> RunInterruptTokens {
    let tokens = RunInterruptTokens {
        cancel_token: Arc::new(AtomicBool::new(false)),
        signal_token: Arc::new(Mutex::new(None)),
    };
    // The spawned task gets its own handle; the caller keeps `tokens`.
    let tokens_clone = tokens.clone();
    tokio::spawn(async move {
        #[cfg(unix)]
        {
            use tokio::signal::unix::{signal, SignalKind};
            let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
            let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
            let mut sighup = signal(SignalKind::hangup()).expect("SIGHUP handler");
            let mut seen_signal = false;
            loop {
                // Wait for whichever signal arrives first.
                let signal_name = tokio::select! {
                    _ = sigterm.recv() => "SIGTERM",
                    _ = sigint.recv() => "SIGINT",
                    _ = sighup.recv() => "SIGHUP",
                };
                // Second signal of any kind: stop waiting for the VM to unwind.
                if seen_signal {
                    eprintln!("[harn] second signal received, terminating");
                    process::exit(124);
                }
                seen_signal = true;
                request_vm_interrupt(&tokens_clone, signal_name);
                eprintln!("{FIRST_SIGNAL_MESSAGE}");
            }
        }
        #[cfg(not(unix))]
        {
            let mut seen_signal = false;
            loop {
                // The result of waiting for Ctrl-C is deliberately ignored.
                let _ = tokio::signal::ctrl_c().await;
                if seen_signal {
                    eprintln!("[harn] second signal received, terminating");
                    process::exit(124);
                }
                seen_signal = true;
                request_vm_interrupt(&tokens_clone, "SIGINT");
                eprintln!("{FIRST_SIGNAL_MESSAGE}");
            }
        }
    });
    tokens
}
506
507fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
508 if let Ok(mut signal) = tokens.signal_token.lock() {
509 *signal = Some(signal_name.to_string());
510 }
511 tokens.cancel_token.store(true, Ordering::SeqCst);
512}
513
514pub async fn execute_run(
520 path: &str,
521 trace: bool,
522 denied_builtins: HashSet<String>,
523 script_argv: Vec<String>,
524 skill_dirs_raw: Vec<String>,
525 llm_mock_mode: CliLlmMockMode,
526 attestation: Option<RunAttestationOptions>,
527 profile: RunProfileOptions,
528) -> RunOutcome {
529 execute_run_inner(ExecuteRunInputs {
530 path,
531 trace,
532 denied_builtins,
533 script_argv,
534 skill_dirs_raw,
535 llm_mock_mode,
536 attestation,
537 profile,
538 interrupt_tokens: None,
539 })
540 .await
541}
542
/// Shared implementation behind `execute_run` and `run_file_with_skill_dirs`:
/// parses, type-checks, compiles and executes the script at `inputs.path`,
/// collecting output into a [`RunOutcome`].
///
/// Failures during type checking, compilation, mock installation, manifest
/// trigger/hook installation, mock persistence or receipt emission return
/// early with exit code 1 and the error text appended to `stderr`. Otherwise
/// the exit code is derived from the pipeline's return value, or is 1 when
/// execution raised a runtime error.
async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
    let ExecuteRunInputs {
        path,
        trace,
        denied_builtins,
        script_argv,
        skill_dirs_raw,
        llm_mock_mode,
        attestation,
        profile,
        interrupt_tokens,
    } = inputs;

    let mut stderr = String::new();
    let mut stdout = String::new();

    let (source, program) = parse_source_file(path);

    // Render every diagnostic (warnings included) but only fail on errors.
    let mut had_type_error = false;
    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
    for diag in &type_diagnostics {
        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
        if matches!(diag.severity, DiagnosticSeverity::Error) {
            had_type_error = true;
        }
        stderr.push_str(&rendered);
    }
    if had_type_error {
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let chunk = match harn_vm::Compiler::new().compile(&program) {
        Ok(c) => c,
        Err(e) => {
            stderr.push_str(&format!("error: compile error: {e}\n"));
            return RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            };
        }
    };

    if trace {
        harn_vm::llm::enable_tracing();
    }
    // Profiles are built from tracing spans, so profiling turns tracing on.
    if profile.is_enabled() {
        harn_vm::tracing::set_tracing_enabled(true);
    }
    if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let mut vm = harn_vm::Vm::new();
    if let Some(interrupt_tokens) = interrupt_tokens {
        vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
        vm.install_cancel_token(interrupt_tokens.cancel_token);
    }
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    let source_parent = std::path::Path::new(path)
        .parent()
        .unwrap_or(std::path::Path::new("."));
    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
    // Store/metadata/checkpoint builtins are rooted at the project root when
    // one is found, otherwise at the script's own directory.
    let store_base = project_root.as_deref().unwrap_or(source_parent);
    let attestation_started_at_ms = now_ms();
    // An in-memory event log is only installed when a receipt was requested.
    let attestation_log = if attestation.is_some() {
        Some(harn_vm::event_log::install_memory_for_current_thread(256))
    } else {
        None
    };
    if let Some(log) = attestation_log.as_ref() {
        append_run_provenance_event(
            log,
            "started",
            serde_json::json!({
                "pipeline": path,
                "argv": &script_argv,
                "project_root": store_base.display().to_string(),
            }),
        )
        .await;
    }
    harn_vm::register_store_builtins(&mut vm, store_base);
    harn_vm::register_metadata_builtins(&mut vm, store_base);
    // Checkpoints are keyed by the script's file stem ("default" if absent).
    let pipeline_name = std::path::Path::new(path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("default");
    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
    vm.set_source_info(path, &source);
    if !denied_builtins.is_empty() {
        vm.set_denied_builtins(denied_builtins);
    }
    if let Some(ref root) = project_root {
        vm.set_project_root(root);
    }

    // A bare file name has an empty parent; leave the source dir unset then.
    if let Some(p) = std::path::Path::new(path).parent() {
        if !p.as_os_str().is_empty() {
            vm.set_source_dir(p);
        }
    }

    let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
    let loaded = load_skills(&SkillLoaderInputs {
        cli_dirs,
        source_path: Some(std::path::PathBuf::from(path)),
    });
    emit_loader_warnings(&loaded.loader_warnings);
    install_skills_global(&mut vm, &loaded);

    // Expose the script arguments to the program as the global `argv` list.
    let argv_values: Vec<harn_vm::VmValue> = script_argv
        .iter()
        .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
        .collect();
    vm.set_global(
        "argv",
        harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
    );

    let extensions = package::load_runtime_extensions(Path::new(path));
    package::install_runtime_extensions(&extensions);
    if let Some(manifest) = extensions.root_manifest.as_ref() {
        if !manifest.mcp.is_empty() {
            connect_mcp_servers(&manifest.mcp, &mut vm).await;
        }
    }
    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest triggers: {error}\n"
        ));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }
    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest hooks: {error}\n"
        ));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    // Execution is driven inside a `LocalSet`; the result pairs the VM's
    // captured output with the return value, or carries the formatted error.
    let local = tokio::task::LocalSet::new();
    let execution = local
        .run_until(async {
            match vm.execute(&chunk).await {
                Ok(value) => Ok((vm.output(), value)),
                Err(e) => Err(vm.format_runtime_error(&e)),
            }
        })
        .await;
    // Recordings are persisted whether or not execution succeeded.
    if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let buffered_stderr = harn_vm::take_stderr_buffer();
    stderr.push_str(&buffered_stderr);

    let exit_code = match &execution {
        Ok((_, return_value)) => exit_code_from_return_value(return_value),
        Err(_) => 1,
    };

    if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
        if let Err(error) = emit_run_attestation(
            log,
            path,
            store_base,
            attestation_started_at_ms,
            exit_code,
            options,
            &mut stderr,
        )
        .await
        {
            stderr.push_str(&format!(
                "error: failed to emit provenance receipt: {error}\n"
            ));
            return RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            };
        }
        // NOTE(review): the active event log is reset only on this success
        // path; the early return above leaves it installed.
        harn_vm::event_log::reset_active_event_log();
    }

    match execution {
        Ok((output, return_value)) => {
            stdout.push_str(output);
            if trace {
                stderr.push_str(&render_trace_summary());
            }
            if profile.is_enabled() {
                // A profile write failure is downgraded to a warning.
                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            if exit_code != 0 {
                stderr.push_str(&render_return_value_error(&return_value));
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code,
            }
        }
        Err(rendered_error) => {
            stderr.push_str(&rendered_error);
            if profile.is_enabled() {
                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            }
        }
    }
}
793
794fn render_and_persist_profile(
795 options: &RunProfileOptions,
796 stderr: &mut String,
797) -> Result<(), String> {
798 let spans = harn_vm::tracing::peek_spans();
799 let profile = harn_vm::profile::build(&spans);
800 if options.text {
801 stderr.push_str(&harn_vm::profile::render(&profile));
802 }
803 if let Some(path) = options.json_path.as_ref() {
804 if let Some(parent) = path.parent() {
805 if !parent.as_os_str().is_empty() {
806 fs::create_dir_all(parent)
807 .map_err(|error| format!("create {}: {error}", parent.display()))?;
808 }
809 }
810 let json = serde_json::to_string_pretty(&profile)
811 .map_err(|error| format!("serialize profile: {error}"))?;
812 fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
813 }
814 Ok(())
815}
816
817async fn append_run_provenance_event(
818 log: &Arc<harn_vm::event_log::AnyEventLog>,
819 kind: &str,
820 payload: serde_json::Value,
821) {
822 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
823 return;
824 };
825 let _ = log
826 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
827 .await;
828}
829
/// Finalizes the provenance log for a run and writes a signed receipt.
///
/// Appends a `finished` event, flushes `log`, loads (or generates) the agent
/// signing key, builds the signed receipt, writes it as pretty-printed JSON to
/// the path chosen by `receipt_output_path`, and appends a
/// `provenance receipt: <path>` line to `stderr`.
///
/// * `started_at_ms` — run start time in milliseconds, as captured by the caller.
/// * `exit_code` — the run's exit code; `0` is recorded as `"success"`,
///   anything else as `"failure"`.
///
/// # Errors
/// Returns a message naming the step that failed (flush, secrets
/// configuration, key loading, receipt build, directory creation, encoding or
/// file write).
async fn emit_run_attestation(
    log: &Arc<harn_vm::event_log::AnyEventLog>,
    path: &str,
    store_base: &Path,
    started_at_ms: i64,
    exit_code: i32,
    options: &RunAttestationOptions,
    stderr: &mut String,
) -> Result<(), String> {
    let finished_at_ms = now_ms();
    let status = if exit_code == 0 { "success" } else { "failure" };
    append_run_provenance_event(
        log,
        "finished",
        serde_json::json!({
            "pipeline": path,
            "status": status,
            "exit_code": exit_code,
        }),
    )
    .await;
    // Flush before the receipt is built from the log.
    log.flush()
        .await
        .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
    let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
        .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
    let (signing_key, key_id) =
        harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
            .await
            .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
    let receipt = harn_vm::build_signed_receipt(
        log,
        harn_vm::ReceiptBuildOptions {
            pipeline: path.to_string(),
            status: status.to_string(),
            started_at_ms,
            finished_at_ms,
            exit_code,
            producer_name: "harn-cli".to_string(),
            producer_version: env!("CARGO_PKG_VERSION").to_string(),
        },
        &signing_key,
        key_id,
    )
    .await
    .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
    let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
    if let Some(parent) = receipt_path.parent() {
        fs::create_dir_all(parent)
            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
    }
    let encoded = serde_json::to_vec_pretty(&receipt)
        .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
    fs::write(&receipt_path, encoded)
        .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
    stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
    Ok(())
}
888
889fn receipt_output_path(
890 store_base: &Path,
891 options: &RunAttestationOptions,
892 receipt_id: &str,
893) -> PathBuf {
894 if let Some(path) = options.receipt_out.as_ref() {
895 return path.clone();
896 }
897 harn_vm::runtime_paths::state_root(store_base)
898 .join("receipts")
899 .join(format!("{receipt_id}.json"))
900}
901
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
908
909fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
916 use harn_vm::VmValue;
917 match value {
918 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
919 VmValue::EnumVariant {
920 enum_name,
921 variant,
922 fields,
923 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => 1,
924 _ => 0,
925 }
926}
927
928fn render_return_value_error(value: &harn_vm::VmValue) -> String {
929 let harn_vm::VmValue::EnumVariant {
930 enum_name,
931 variant,
932 fields,
933 } = value
934 else {
935 return String::new();
936 };
937 if enum_name.as_ref() != "Result" || variant.as_ref() != "Err" {
938 return String::new();
939 }
940 let rendered = fields.first().map(|p| p.display()).unwrap_or_default();
941 if rendered.is_empty() {
942 "error\n".to_string()
943 } else if rendered.ends_with('\n') {
944 rendered
945 } else {
946 format!("{rendered}\n")
947 }
948}
949
/// Registers every configured MCP server and connects to the non-lazy ones.
///
/// For each server a JSON spec is built (transport defaults to `"stdio"`) and
/// recorded for registration. Lazy servers are only announced; the others are
/// connected immediately, installed as active and collected by name. All
/// registrations are handed to `harn_vm::mcp_register_servers`, and when at
/// least one connection succeeded the `mcp` global on `vm` is set to a dict of
/// the connected clients.
///
/// Auth-resolution and connection failures are printed as warnings and do not
/// abort the loop.
pub(crate) async fn connect_mcp_servers(
    servers: &[package::McpServerConfig],
    vm: &mut harn_vm::Vm,
) {
    use std::collections::BTreeMap;
    use std::rc::Rc;
    use std::time::Duration;

    let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
    let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();

    for server in servers {
        // A failure to resolve auth degrades to "no resolved auth".
        let resolved_auth = match mcp::resolve_auth_for_server(server).await {
            Ok(resolution) => resolution,
            Err(error) => {
                eprintln!(
                    "warning: mcp: failed to load auth for '{}': {}",
                    server.name, error
                );
                AuthResolution::None
            }
        };
        let spec = serde_json::json!({
            "name": server.name,
            "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
            "command": server.command,
            "args": server.args,
            "env": server.env,
            "url": server.url,
            // A resolved bearer token wins over the token from the config.
            "auth_token": match resolved_auth {
                AuthResolution::Bearer(token) => Some(token),
                AuthResolution::None => server.auth_token.clone(),
            },
            "protocol_version": server.protocol_version,
            "proxy_server_name": server.proxy_server_name,
        });

        // Every server is registered, lazy or not.
        registrations.push(harn_vm::RegisteredMcpServer {
            name: server.name.clone(),
            spec: spec.clone(),
            lazy: server.lazy,
            card: server.card.clone(),
            keep_alive: server.keep_alive_ms.map(Duration::from_millis),
        });

        if server.lazy {
            eprintln!(
                "[harn] mcp: deferred '{}' (lazy, boots on first use)",
                server.name
            );
            continue;
        }

        match harn_vm::connect_mcp_server_from_json(&spec).await {
            Ok(handle) => {
                eprintln!("[harn] mcp: connected to '{}'", server.name);
                harn_vm::mcp_install_active(&server.name, handle.clone());
                mcp_dict.insert(server.name.clone(), harn_vm::VmValue::McpClient(handle));
            }
            Err(e) => {
                eprintln!(
                    "warning: mcp: failed to connect to '{}': {}",
                    server.name, e
                );
            }
        }
    }

    harn_vm::mcp_register_servers(registrations);

    // The `mcp` global is left unset when nothing connected.
    if !mcp_dict.is_empty() {
        vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
    }
}
1036
1037pub(crate) fn render_trace_summary() -> String {
1038 use std::fmt::Write;
1039 let entries = harn_vm::llm::take_trace();
1040 if entries.is_empty() {
1041 return String::new();
1042 }
1043 let mut out = String::new();
1044 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
1045 let mut total_input = 0i64;
1046 let mut total_output = 0i64;
1047 let mut total_ms = 0u64;
1048 for (i, entry) in entries.iter().enumerate() {
1049 let _ = writeln!(
1050 out,
1051 " #{}: {} | {} in + {} out tokens | {} ms",
1052 i + 1,
1053 entry.model,
1054 entry.input_tokens,
1055 entry.output_tokens,
1056 entry.duration_ms,
1057 );
1058 total_input += entry.input_tokens;
1059 total_output += entry.output_tokens;
1060 total_ms += entry.duration_ms;
1061 }
1062 let total_tokens = total_input + total_output;
1063 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
1065 let _ = writeln!(
1066 out,
1067 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
1068 entries.len(),
1069 if entries.len() == 1 { "" } else { "s" },
1070 total_tokens,
1071 total_input,
1072 total_output,
1073 total_ms,
1074 cost,
1075 );
1076 out
1077}
1078
1079pub(crate) async fn run_file_mcp_serve(
1093 path: &str,
1094 card_source: Option<&str>,
1095 mode: RunFileMcpServeMode,
1096) {
1097 let (source, program) = crate::parse_source_file(path);
1098
1099 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
1100 for diag in &type_diagnostics {
1101 match diag.severity {
1102 DiagnosticSeverity::Error => {
1103 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1104 eprint!("{rendered}");
1105 process::exit(1);
1106 }
1107 DiagnosticSeverity::Warning => {
1108 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1109 eprint!("{rendered}");
1110 }
1111 }
1112 }
1113
1114 let chunk = match harn_vm::Compiler::new().compile(&program) {
1115 Ok(c) => c,
1116 Err(e) => {
1117 eprintln!("error: compile error: {e}");
1118 process::exit(1);
1119 }
1120 };
1121
1122 let mut vm = harn_vm::Vm::new();
1123 harn_vm::register_vm_stdlib(&mut vm);
1124 crate::install_default_hostlib(&mut vm);
1125 let source_parent = std::path::Path::new(path)
1126 .parent()
1127 .unwrap_or(std::path::Path::new("."));
1128 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1129 let store_base = project_root.as_deref().unwrap_or(source_parent);
1130 harn_vm::register_store_builtins(&mut vm, store_base);
1131 harn_vm::register_metadata_builtins(&mut vm, store_base);
1132 let pipeline_name = std::path::Path::new(path)
1133 .file_stem()
1134 .and_then(|s| s.to_str())
1135 .unwrap_or("default");
1136 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1137 vm.set_source_info(path, &source);
1138 if let Some(ref root) = project_root {
1139 vm.set_project_root(root);
1140 }
1141 if let Some(p) = std::path::Path::new(path).parent() {
1142 if !p.as_os_str().is_empty() {
1143 vm.set_source_dir(p);
1144 }
1145 }
1146
1147 let loaded = load_skills(&SkillLoaderInputs {
1149 cli_dirs: Vec::new(),
1150 source_path: Some(std::path::PathBuf::from(path)),
1151 });
1152 emit_loader_warnings(&loaded.loader_warnings);
1153 install_skills_global(&mut vm, &loaded);
1154
1155 let extensions = package::load_runtime_extensions(Path::new(path));
1156 package::install_runtime_extensions(&extensions);
1157 if let Some(manifest) = extensions.root_manifest.as_ref() {
1158 if !manifest.mcp.is_empty() {
1159 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1160 }
1161 }
1162 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1163 eprintln!("error: failed to install manifest triggers: {error}");
1164 process::exit(1);
1165 }
1166 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1167 eprintln!("error: failed to install manifest hooks: {error}");
1168 process::exit(1);
1169 }
1170
1171 let local = tokio::task::LocalSet::new();
1172 local
1173 .run_until(async {
1174 match vm.execute(&chunk).await {
1175 Ok(_) => {}
1176 Err(e) => {
1177 eprint!("{}", vm.format_runtime_error(&e));
1178 process::exit(1);
1179 }
1180 }
1181
1182 let output = vm.output();
1184 if !output.is_empty() {
1185 eprint!("{output}");
1186 }
1187
1188 let registry = match harn_vm::take_mcp_serve_registry() {
1189 Some(r) => r,
1190 None => {
1191 eprintln!("error: pipeline did not call mcp_serve(registry)");
1192 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
1193 process::exit(1);
1194 }
1195 };
1196
1197 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
1198 Ok(t) => t,
1199 Err(e) => {
1200 eprintln!("error: {e}");
1201 process::exit(1);
1202 }
1203 };
1204
1205 let resources = harn_vm::take_mcp_serve_resources();
1206 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
1207 let prompts = harn_vm::take_mcp_serve_prompts();
1208
1209 let server_name = std::path::Path::new(path)
1210 .file_stem()
1211 .and_then(|s| s.to_str())
1212 .unwrap_or("harn")
1213 .to_string();
1214
1215 let mut caps = Vec::new();
1216 if !tools.is_empty() {
1217 caps.push(format!(
1218 "{} tool{}",
1219 tools.len(),
1220 if tools.len() == 1 { "" } else { "s" }
1221 ));
1222 }
1223 let total_resources = resources.len() + resource_templates.len();
1224 if total_resources > 0 {
1225 caps.push(format!(
1226 "{total_resources} resource{}",
1227 if total_resources == 1 { "" } else { "s" }
1228 ));
1229 }
1230 if !prompts.is_empty() {
1231 caps.push(format!(
1232 "{} prompt{}",
1233 prompts.len(),
1234 if prompts.len() == 1 { "" } else { "s" }
1235 ));
1236 }
1237 eprintln!(
1238 "[harn] serve mcp: serving {} as '{server_name}'",
1239 caps.join(", ")
1240 );
1241
1242 let mut server =
1243 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
1244 if let Some(source) = card_source {
1245 match resolve_card_source(source) {
1246 Ok(card) => server = server.with_server_card(card),
1247 Err(e) => {
1248 eprintln!("error: --card: {e}");
1249 process::exit(1);
1250 }
1251 }
1252 }
1253 match mode {
1254 RunFileMcpServeMode::Stdio => {
1255 if let Err(e) = server.run(&mut vm).await {
1256 eprintln!("error: MCP server error: {e}");
1257 process::exit(1);
1258 }
1259 }
1260 RunFileMcpServeMode::Http {
1261 options,
1262 auth_policy,
1263 } => {
1264 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
1265 server,
1266 vm,
1267 options,
1268 auth_policy,
1269 )
1270 .await
1271 {
1272 eprintln!("error: MCP server error: {e}");
1273 process::exit(1);
1274 }
1275 }
1276 }
1277 })
1278 .await;
1279}
1280
1281pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
1286 let trimmed = source.trim_start();
1287 if trimmed.starts_with('{') || trimmed.starts_with('[') {
1288 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
1289 }
1290 let path = std::path::Path::new(source);
1291 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
1292}
1293
1294pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
1295 use notify::{Event, EventKind, RecursiveMode, Watcher};
1296
1297 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
1298 eprintln!("Error: {e}");
1299 process::exit(1);
1300 });
1301 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
1302
1303 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
1304 run_file(
1305 path,
1306 false,
1307 denied_builtins.clone(),
1308 Vec::new(),
1309 CliLlmMockMode::Off,
1310 None,
1311 RunProfileOptions::default(),
1312 )
1313 .await;
1314
1315 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1316 let _watcher = {
1317 let tx = tx.clone();
1318 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
1319 if let Ok(event) = res {
1320 if matches!(
1321 event.kind,
1322 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
1323 ) {
1324 let has_harn = event
1325 .paths
1326 .iter()
1327 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
1328 if has_harn {
1329 let _ = tx.blocking_send(());
1330 }
1331 }
1332 }
1333 })
1334 .unwrap_or_else(|e| {
1335 eprintln!("Error setting up file watcher: {e}");
1336 process::exit(1);
1337 });
1338 watcher
1339 .watch(watch_dir, RecursiveMode::Recursive)
1340 .unwrap_or_else(|e| {
1341 eprintln!("Error watching directory: {e}");
1342 process::exit(1);
1343 });
1344 watcher };
1346
1347 eprintln!(
1348 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
1349 watch_dir.display()
1350 );
1351
1352 loop {
1353 rx.recv().await;
1354 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1356 while rx.try_recv().is_ok() {}
1357
1358 eprintln!();
1359 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
1360 run_file(
1361 path,
1362 false,
1363 denied_builtins.clone(),
1364 Vec::new(),
1365 CliLlmMockMode::Off,
1366 None,
1367 RunProfileOptions::default(),
1368 )
1369 .await;
1370 }
1371}
1372
#[cfg(test)]
mod tests {
    use super::{
        execute_explain_cost, execute_run, split_eval_header, CliLlmMockMode, RunProfileOptions,
        StdoutPassthroughGuard,
    };
    use std::collections::HashSet;

    /// Splits `code` with `split_eval_header` and checks both halves.
    #[track_caller]
    fn assert_split(code: &str, expected_header: &str, expected_body: &str) {
        let (header, body) = split_eval_header(code);
        assert_eq!(header, expected_header);
        assert_eq!(body, expected_body);
    }

    /// Code without imports yields an empty header and the untouched body.
    #[test]
    fn split_eval_header_no_imports_returns_full_body() {
        assert_split("println(1 + 2)", "", "println(1 + 2)");
    }

    /// Leading import lines are moved into the header.
    #[test]
    fn split_eval_header_lifts_leading_imports() {
        assert_split(
            "import \"./lib\"\nimport { x } from \"std/math\"\nprintln(x)",
            "import \"./lib\"\nimport { x } from \"std/math\"",
            "println(x)",
        );
    }

    /// Comments and `pub import` lines ahead of the first statement stay in the header.
    #[test]
    fn split_eval_header_keeps_pub_import_and_comments_in_header() {
        assert_split(
            "// header comment\npub import { y } from \"./lib\"\n\nfoo()",
            "// header comment\npub import { y } from \"./lib\"\n",
            "foo()",
        );
    }

    /// An import that follows a regular statement is left in the body.
    #[test]
    fn split_eval_header_does_not_lift_imports_after_other_statements() {
        assert_split(
            "let a = 1\nimport \"./lib\"",
            "",
            "let a = 1\nimport \"./lib\"",
        );
    }

    /// Logprobs survive parse -> serialize -> parse of an LLM mock.
    #[test]
    fn cli_llm_mock_roundtrips_logprobs() {
        let mock_json = serde_json::json!({
            "text": "visible",
            "logprobs": [{"token": "visible", "logprob": 0.0}]
        });
        let parsed = harn_vm::llm::parse_llm_mock_value(&mock_json).expect("parse mock");
        assert_eq!(parsed.logprobs.len(), 1);

        let serialized = harn_vm::llm::serialize_llm_mock(parsed).expect("serialize mock");
        let roundtripped: serde_json::Value =
            serde_json::from_str(&serialized).expect("json line");
        assert_eq!(
            roundtripped["logprobs"][0]["token"].as_str(),
            Some("visible")
        );

        let reparsed = harn_vm::llm::parse_llm_mock_value(&roundtripped).expect("reparse mock");
        assert_eq!(reparsed.logprobs.len(), 1);
        assert_eq!(reparsed.logprobs[0]["logprob"].as_f64(), Some(0.0));
    }

    /// Dropping the guard puts the passthrough flag back to what it was before.
    #[test]
    fn stdout_passthrough_guard_restores_previous_state() {
        let original = harn_vm::set_stdout_passthrough(false);

        let guard = StdoutPassthroughGuard::enable();
        let while_guarded = harn_vm::set_stdout_passthrough(true);
        assert!(while_guarded);
        drop(guard);

        let after_guard = harn_vm::set_stdout_passthrough(original);
        assert!(!after_guard);
    }

    /// `execute_explain_cost` reports an estimate without running the pipeline body.
    #[test]
    fn execute_explain_cost_does_not_execute_script() {
        let workspace = tempfile::TempDir::new().expect("temp dir");
        let script_path = workspace.path().join("main.harn");
        let source = r#"
pipeline main() {
    write_file("executed.txt", "bad")
    llm_call("hello", nil, {provider: "mock", model: "mock"})
}
"#;
        std::fs::write(&script_path, source).expect("write script");

        let outcome = execute_explain_cost(&script_path.to_string_lossy());

        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
        assert!(outcome.stdout.contains("LLM cost estimate"));
        let side_effect = workspace.path().join("executed.txt");
        assert!(
            !side_effect.exists(),
            "--explain-cost must not execute pipeline side effects"
        );
    }

    /// A script run through `execute_run` can call `hostlib_enable` successfully.
    #[cfg(feature = "hostlib")]
    #[tokio::test]
    async fn execute_run_installs_hostlib_gate() {
        let script_file = tempfile::NamedTempFile::new().expect("temp file");
        let source = r#"
pipeline main() {
    let _ = hostlib_enable("tools:deterministic")
    println("enabled")
}
"#;
        std::fs::write(script_file.path(), source).expect("write script");

        let script_path = script_file.path().to_string_lossy();
        let outcome = execute_run(
            &script_path,
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
            RunProfileOptions::default(),
        )
        .await;

        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
        assert_eq!(outcome.stdout.trim(), "enabled");
    }

    /// Output beyond the inline capture limit can be read back in windows by command id.
    #[cfg(all(feature = "hostlib", unix))]
    #[tokio::test]
    async fn execute_run_can_read_hostlib_command_artifacts() {
        let script_file = tempfile::NamedTempFile::new().expect("temp file");
        let source = r#"
pipeline main() {
    let _ = hostlib_enable("tools:deterministic")
    let result = hostlib_tools_run_command({
        argv: ["sh", "-c", "i=0; while [ $i -lt 2000 ]; do printf x; i=$((i+1)); done"],
        capture: {max_inline_bytes: 8},
        timeout_ms: 5000,
    })
    println(starts_with(result.command_id, "cmd_"))
    println(len(result.stdout))
    println(result.byte_count)
    let window = hostlib_tools_read_command_output({
        command_id: result.command_id,
        offset: 1990,
        length: 20,
    })
    println(len(window.content))
    println(window.eof)
}
"#;
        std::fs::write(script_file.path(), source).expect("write script");

        let script_path = script_file.path().to_string_lossy();
        let outcome = execute_run(
            &script_path,
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
            RunProfileOptions::default(),
        )
        .await;

        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
        assert_eq!(outcome.stdout.trim(), "true\n8\n2000\n10\ntrue");
    }
}