1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9use harn_parser::DiagnosticSeverity;
10use harn_vm::event_log::EventLog;
11
12use crate::commands::mcp::{self, AuthResolution};
13use crate::package;
14use crate::parse_source_file;
15use crate::skill_loader::{
16 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
17 SkillLoaderInputs,
18};
19
20mod explain_cost;
21
22pub(crate) enum RunFileMcpServeMode {
23 Stdio,
24 Http {
25 options: harn_serve::McpHttpServeOptions,
26 auth_policy: harn_serve::AuthPolicy,
27 },
28}
29
30const CORE_BUILTINS: &[&str] = &[
32 "println",
33 "print",
34 "log",
35 "type_of",
36 "to_string",
37 "to_int",
38 "to_float",
39 "len",
40 "assert",
41 "assert_eq",
42 "assert_ne",
43 "json_parse",
44 "json_stringify",
45 "runtime_context",
46 "task_current",
47 "runtime_context_values",
48 "runtime_context_get",
49 "runtime_context_set",
50 "runtime_context_clear",
51];
52
53pub(crate) fn build_denied_builtins(
58 deny_csv: Option<&str>,
59 allow_csv: Option<&str>,
60) -> HashSet<String> {
61 if let Some(csv) = deny_csv {
62 csv.split(',')
63 .map(|s| s.trim().to_string())
64 .filter(|s| !s.is_empty())
65 .collect()
66 } else if let Some(csv) = allow_csv {
67 let allowed: HashSet<String> = csv
70 .split(',')
71 .map(|s| s.trim().to_string())
72 .filter(|s| !s.is_empty())
73 .collect();
74 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
75
76 let mut tmp = harn_vm::Vm::new();
78 harn_vm::register_vm_stdlib(&mut tmp);
79 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
80 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
81
82 tmp.builtin_names()
83 .into_iter()
84 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
85 .collect()
86 } else {
87 HashSet::new()
88 }
89}
90
91fn typecheck_with_imports(
96 program: &[harn_parser::SNode],
97 path: &Path,
98 source: &str,
99) -> Vec<harn_parser::TypeDiagnostic> {
100 if let Err(error) = package::ensure_dependencies_materialized(path) {
101 eprintln!("error: {error}");
102 process::exit(1);
103 }
104 let graph = harn_modules::build(&[path.to_path_buf()]);
105 let mut checker = harn_parser::TypeChecker::new();
106 if let Some(imported) = graph.imported_names_for_file(path) {
107 checker = checker.with_imported_names(imported);
108 }
109 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
110 checker = checker.with_imported_type_decls(imported);
111 }
112 checker.check_with_source(program, source)
113}
114
115pub(crate) fn prepare_eval_temp_file(
126 code: &str,
127) -> Result<(String, tempfile::NamedTempFile), String> {
128 let (header, body) = split_eval_header(code);
129 let wrapped = if header.is_empty() {
130 format!("pipeline main(task) {{\n{body}\n}}")
131 } else {
132 format!("{header}\npipeline main(task) {{\n{body}\n}}")
133 };
134
135 let tmp = create_eval_temp_file()?;
136 Ok((wrapped, tmp))
137}
138
139fn create_eval_temp_file() -> Result<tempfile::NamedTempFile, String> {
144 if let Some(dir) = std::env::current_dir().ok().as_deref() {
145 match tempfile::Builder::new()
148 .prefix(".harn-eval-")
149 .suffix(".harn")
150 .tempfile_in(dir)
151 {
152 Ok(tmp) => return Ok(tmp),
153 Err(error) => eprintln!(
154 "warning: harn run -e: could not create temp file in {}: {error}; \
155 relative imports will not resolve",
156 dir.display()
157 ),
158 }
159 }
160 tempfile::Builder::new()
161 .prefix("harn-eval-")
162 .suffix(".harn")
163 .tempfile()
164 .map_err(|e| format!("failed to create temp file for -e: {e}"))
165}
166
167fn split_eval_header(code: &str) -> (String, String) {
175 let mut header_end = 0usize;
176 let mut last_kept = 0usize;
177 for (idx, line) in code.lines().enumerate() {
178 let trimmed = line.trim_start();
179 if trimmed.is_empty() || trimmed.starts_with("//") {
180 header_end = idx + 1;
181 continue;
182 }
183 let is_import = trimmed.starts_with("import ")
184 || trimmed.starts_with("import\t")
185 || trimmed.starts_with("import\"")
186 || trimmed.starts_with("pub import ")
187 || trimmed.starts_with("pub import\t");
188 if is_import {
189 header_end = idx + 1;
190 last_kept = idx + 1;
191 } else {
192 break;
193 }
194 }
195 if last_kept == 0 {
196 return (String::new(), code.to_string());
197 }
198 let mut header_lines: Vec<&str> = Vec::new();
199 let mut body_lines: Vec<&str> = Vec::new();
200 for (idx, line) in code.lines().enumerate() {
201 if idx < header_end {
202 header_lines.push(line);
203 } else {
204 body_lines.push(line);
205 }
206 }
207 (header_lines.join("\n"), body_lines.join("\n"))
208}
209
210#[derive(Clone, Debug, Default, PartialEq, Eq)]
211pub enum CliLlmMockMode {
212 #[default]
213 Off,
214 Replay {
215 fixture_path: PathBuf,
216 },
217 Record {
218 fixture_path: PathBuf,
219 },
220}
221
222#[derive(Clone, Debug, Default, PartialEq, Eq)]
223pub struct RunAttestationOptions {
224 pub receipt_out: Option<PathBuf>,
225 pub agent_id: Option<String>,
226}
227
228#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct RunProfileOptions {
234 pub text: bool,
235 pub json_path: Option<PathBuf>,
236}
237
238impl RunProfileOptions {
239 pub fn is_enabled(&self) -> bool {
240 self.text || self.json_path.is_some()
241 }
242}
243
244#[derive(Clone)]
245pub struct RunInterruptTokens {
246 pub cancel_token: Arc<AtomicBool>,
247 pub signal_token: Arc<Mutex<Option<String>>>,
248}
249
250struct ExecuteRunInputs<'a> {
251 path: &'a str,
252 trace: bool,
253 denied_builtins: HashSet<String>,
254 script_argv: Vec<String>,
255 skill_dirs_raw: Vec<String>,
256 llm_mock_mode: CliLlmMockMode,
257 attestation: Option<RunAttestationOptions>,
258 profile: RunProfileOptions,
259 interrupt_tokens: Option<RunInterruptTokens>,
260}
261
262#[derive(Clone, Debug, Default)]
266pub struct RunOutcome {
267 pub stdout: String,
268 pub stderr: String,
269 pub exit_code: i32,
270}
271
272pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
273 harn_vm::llm::clear_cli_llm_mock_mode();
274 match mode {
275 CliLlmMockMode::Off => Ok(()),
276 CliLlmMockMode::Replay { fixture_path } => {
277 let mocks = harn_vm::llm::load_llm_mocks_jsonl(fixture_path)?;
278 harn_vm::llm::install_cli_llm_mocks(mocks);
279 Ok(())
280 }
281 CliLlmMockMode::Record { .. } => {
282 harn_vm::llm::enable_cli_llm_mock_recording();
283 Ok(())
284 }
285 }
286}
287
288pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
289 let CliLlmMockMode::Record { fixture_path } = mode else {
290 return Ok(());
291 };
292 if let Some(parent) = fixture_path.parent() {
293 if !parent.as_os_str().is_empty() {
294 fs::create_dir_all(parent).map_err(|error| {
295 format!(
296 "failed to create fixture directory {}: {error}",
297 parent.display()
298 )
299 })?;
300 }
301 }
302
303 let lines = harn_vm::llm::take_cli_llm_recordings()
304 .into_iter()
305 .map(harn_vm::llm::serialize_llm_mock)
306 .collect::<Result<Vec<_>, _>>()?;
307 let body = if lines.is_empty() {
308 String::new()
309 } else {
310 format!("{}\n", lines.join("\n"))
311 };
312 fs::write(fixture_path, body)
313 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
314}
315
316pub(crate) async fn run_file(
317 path: &str,
318 trace: bool,
319 denied_builtins: HashSet<String>,
320 script_argv: Vec<String>,
321 llm_mock_mode: CliLlmMockMode,
322 attestation: Option<RunAttestationOptions>,
323 profile: RunProfileOptions,
324) {
325 run_file_with_skill_dirs(
326 path,
327 trace,
328 denied_builtins,
329 script_argv,
330 Vec::new(),
331 llm_mock_mode,
332 attestation,
333 profile,
334 )
335 .await;
336}
337
338pub(crate) fn run_explain_cost_file_with_skill_dirs(path: &str) {
339 let outcome = execute_explain_cost(path);
340 if !outcome.stderr.is_empty() {
341 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
342 }
343 if !outcome.stdout.is_empty() {
344 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
345 }
346 if outcome.exit_code != 0 {
347 process::exit(outcome.exit_code);
348 }
349}
350
351pub(crate) async fn run_file_with_skill_dirs(
352 path: &str,
353 trace: bool,
354 denied_builtins: HashSet<String>,
355 script_argv: Vec<String>,
356 skill_dirs_raw: Vec<String>,
357 llm_mock_mode: CliLlmMockMode,
358 attestation: Option<RunAttestationOptions>,
359 profile: RunProfileOptions,
360) {
361 let interrupt_tokens = install_signal_shutdown_handler();
363
364 let _stdout_passthrough = StdoutPassthroughGuard::enable();
365 let outcome = execute_run_inner(ExecuteRunInputs {
366 path,
367 trace,
368 denied_builtins,
369 script_argv,
370 skill_dirs_raw,
371 llm_mock_mode,
372 attestation,
373 profile,
374 interrupt_tokens: Some(interrupt_tokens.clone()),
375 })
376 .await;
377
378 if !outcome.stderr.is_empty() {
381 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
382 }
383 if !outcome.stdout.is_empty() {
384 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
385 }
386
387 let mut exit_code = outcome.exit_code;
388 if exit_code != 0 && interrupt_tokens.cancel_token.load(Ordering::SeqCst) {
389 exit_code = 124;
390 }
391 if exit_code != 0 {
392 process::exit(exit_code);
393 }
394}
395
396pub fn execute_explain_cost(path: &str) -> RunOutcome {
397 let stdout = String::new();
398 let mut stderr = String::new();
399
400 let (source, program) = parse_source_file(path);
401
402 let mut had_type_error = false;
403 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
404 for diag in &type_diagnostics {
405 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
406 if matches!(diag.severity, DiagnosticSeverity::Error) {
407 had_type_error = true;
408 }
409 stderr.push_str(&rendered);
410 }
411 if had_type_error {
412 return RunOutcome {
413 stdout,
414 stderr,
415 exit_code: 1,
416 };
417 }
418
419 let extensions = package::load_runtime_extensions(Path::new(path));
420 package::install_runtime_extensions(&extensions);
421 RunOutcome {
422 stdout: explain_cost::render_explain_cost(path, &program),
423 stderr,
424 exit_code: 0,
425 }
426}
427
428struct StdoutPassthroughGuard {
429 previous: bool,
430}
431
432impl StdoutPassthroughGuard {
433 fn enable() -> Self {
434 Self {
435 previous: harn_vm::set_stdout_passthrough(true),
436 }
437 }
438}
439
440impl Drop for StdoutPassthroughGuard {
441 fn drop(&mut self) {
442 harn_vm::set_stdout_passthrough(self.previous);
443 }
444}
445
446fn install_signal_shutdown_handler() -> RunInterruptTokens {
447 let tokens = RunInterruptTokens {
448 cancel_token: Arc::new(AtomicBool::new(false)),
449 signal_token: Arc::new(Mutex::new(None)),
450 };
451 let tokens_clone = tokens.clone();
452 tokio::spawn(async move {
453 #[cfg(unix)]
454 {
455 use tokio::signal::unix::{signal, SignalKind};
456 let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
457 let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
458 let mut sighup = signal(SignalKind::hangup()).expect("SIGHUP handler");
459 let mut seen_signal = false;
460 loop {
461 let signal_name = tokio::select! {
462 _ = sigterm.recv() => "SIGTERM",
463 _ = sigint.recv() => "SIGINT",
464 _ = sighup.recv() => "SIGHUP",
465 };
466 if seen_signal {
467 eprintln!("[harn] second signal received, terminating");
468 process::exit(124);
469 }
470 seen_signal = true;
471 request_vm_interrupt(&tokens_clone, signal_name);
472 eprintln!("[harn] signal received, interrupting VM...");
473 }
474 }
475 #[cfg(not(unix))]
476 {
477 let mut seen_signal = false;
478 loop {
479 let _ = tokio::signal::ctrl_c().await;
480 if seen_signal {
481 eprintln!("[harn] second signal received, terminating");
482 process::exit(124);
483 }
484 seen_signal = true;
485 request_vm_interrupt(&tokens_clone, "SIGINT");
486 eprintln!("[harn] signal received, interrupting VM...");
487 }
488 }
489 });
490 tokens
491}
492
493fn request_vm_interrupt(tokens: &RunInterruptTokens, signal_name: &str) {
494 if let Ok(mut signal) = tokens.signal_token.lock() {
495 *signal = Some(signal_name.to_string());
496 }
497 tokens.cancel_token.store(true, Ordering::SeqCst);
498}
499
500pub async fn execute_run(
506 path: &str,
507 trace: bool,
508 denied_builtins: HashSet<String>,
509 script_argv: Vec<String>,
510 skill_dirs_raw: Vec<String>,
511 llm_mock_mode: CliLlmMockMode,
512 attestation: Option<RunAttestationOptions>,
513 profile: RunProfileOptions,
514) -> RunOutcome {
515 execute_run_inner(ExecuteRunInputs {
516 path,
517 trace,
518 denied_builtins,
519 script_argv,
520 skill_dirs_raw,
521 llm_mock_mode,
522 attestation,
523 profile,
524 interrupt_tokens: None,
525 })
526 .await
527}
528
529async fn execute_run_inner(inputs: ExecuteRunInputs<'_>) -> RunOutcome {
530 let ExecuteRunInputs {
531 path,
532 trace,
533 denied_builtins,
534 script_argv,
535 skill_dirs_raw,
536 llm_mock_mode,
537 attestation,
538 profile,
539 interrupt_tokens,
540 } = inputs;
541
542 let mut stderr = String::new();
543 let mut stdout = String::new();
544
545 let (source, program) = parse_source_file(path);
546
547 let mut had_type_error = false;
548 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
549 for diag in &type_diagnostics {
550 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
551 if matches!(diag.severity, DiagnosticSeverity::Error) {
552 had_type_error = true;
553 }
554 stderr.push_str(&rendered);
555 }
556 if had_type_error {
557 return RunOutcome {
558 stdout,
559 stderr,
560 exit_code: 1,
561 };
562 }
563
564 let chunk = match harn_vm::Compiler::new().compile(&program) {
565 Ok(c) => c,
566 Err(e) => {
567 stderr.push_str(&format!("error: compile error: {e}\n"));
568 return RunOutcome {
569 stdout,
570 stderr,
571 exit_code: 1,
572 };
573 }
574 };
575
576 if trace {
577 harn_vm::llm::enable_tracing();
578 }
579 if profile.is_enabled() {
580 harn_vm::tracing::set_tracing_enabled(true);
581 }
582 if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
583 stderr.push_str(&format!("error: {error}\n"));
584 return RunOutcome {
585 stdout,
586 stderr,
587 exit_code: 1,
588 };
589 }
590
591 let mut vm = harn_vm::Vm::new();
592 if let Some(interrupt_tokens) = interrupt_tokens {
593 vm.install_interrupt_signal_token(interrupt_tokens.signal_token);
594 vm.install_cancel_token(interrupt_tokens.cancel_token);
595 }
596 harn_vm::register_vm_stdlib(&mut vm);
597 crate::install_default_hostlib(&mut vm);
598 let source_parent = std::path::Path::new(path)
599 .parent()
600 .unwrap_or(std::path::Path::new("."));
601 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
603 let store_base = project_root.as_deref().unwrap_or(source_parent);
604 let attestation_started_at_ms = now_ms();
605 let attestation_log = if attestation.is_some() {
606 Some(harn_vm::event_log::install_memory_for_current_thread(256))
607 } else {
608 None
609 };
610 if let Some(log) = attestation_log.as_ref() {
611 append_run_provenance_event(
612 log,
613 "started",
614 serde_json::json!({
615 "pipeline": path,
616 "argv": &script_argv,
617 "project_root": store_base.display().to_string(),
618 }),
619 )
620 .await;
621 }
622 harn_vm::register_store_builtins(&mut vm, store_base);
623 harn_vm::register_metadata_builtins(&mut vm, store_base);
624 let pipeline_name = std::path::Path::new(path)
625 .file_stem()
626 .and_then(|s| s.to_str())
627 .unwrap_or("default");
628 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
629 vm.set_source_info(path, &source);
630 if !denied_builtins.is_empty() {
631 vm.set_denied_builtins(denied_builtins);
632 }
633 if let Some(ref root) = project_root {
634 vm.set_project_root(root);
635 }
636
637 if let Some(p) = std::path::Path::new(path).parent() {
638 if !p.as_os_str().is_empty() {
639 vm.set_source_dir(p);
640 }
641 }
642
643 let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
646 let loaded = load_skills(&SkillLoaderInputs {
647 cli_dirs,
648 source_path: Some(std::path::PathBuf::from(path)),
649 });
650 emit_loader_warnings(&loaded.loader_warnings);
651 install_skills_global(&mut vm, &loaded);
652
653 let argv_values: Vec<harn_vm::VmValue> = script_argv
656 .iter()
657 .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
658 .collect();
659 vm.set_global(
660 "argv",
661 harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
662 );
663
664 let extensions = package::load_runtime_extensions(Path::new(path));
665 package::install_runtime_extensions(&extensions);
666 if let Some(manifest) = extensions.root_manifest.as_ref() {
667 if !manifest.mcp.is_empty() {
668 connect_mcp_servers(&manifest.mcp, &mut vm).await;
669 }
670 }
671 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
672 stderr.push_str(&format!(
673 "error: failed to install manifest triggers: {error}\n"
674 ));
675 return RunOutcome {
676 stdout,
677 stderr,
678 exit_code: 1,
679 };
680 }
681 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
682 stderr.push_str(&format!(
683 "error: failed to install manifest hooks: {error}\n"
684 ));
685 return RunOutcome {
686 stdout,
687 stderr,
688 exit_code: 1,
689 };
690 }
691
692 let local = tokio::task::LocalSet::new();
694 let execution = local
695 .run_until(async {
696 match vm.execute(&chunk).await {
697 Ok(value) => Ok((vm.output(), value)),
698 Err(e) => Err(vm.format_runtime_error(&e)),
699 }
700 })
701 .await;
702 if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
703 stderr.push_str(&format!("error: {error}\n"));
704 return RunOutcome {
705 stdout,
706 stderr,
707 exit_code: 1,
708 };
709 }
710
711 let buffered_stderr = harn_vm::take_stderr_buffer();
713 stderr.push_str(&buffered_stderr);
714
715 let exit_code = match &execution {
716 Ok((_, return_value)) => exit_code_from_return_value(return_value),
717 Err(_) => 1,
718 };
719
720 if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
721 if let Err(error) = emit_run_attestation(
722 log,
723 path,
724 store_base,
725 attestation_started_at_ms,
726 exit_code,
727 options,
728 &mut stderr,
729 )
730 .await
731 {
732 stderr.push_str(&format!(
733 "error: failed to emit provenance receipt: {error}\n"
734 ));
735 return RunOutcome {
736 stdout,
737 stderr,
738 exit_code: 1,
739 };
740 }
741 harn_vm::event_log::reset_active_event_log();
742 }
743
744 match execution {
745 Ok((output, return_value)) => {
746 stdout.push_str(output);
747 if trace {
748 stderr.push_str(&render_trace_summary());
749 }
750 if profile.is_enabled() {
751 if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
752 stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
753 }
754 }
755 if exit_code != 0 {
756 stderr.push_str(&render_return_value_error(&return_value));
757 }
758 RunOutcome {
759 stdout,
760 stderr,
761 exit_code,
762 }
763 }
764 Err(rendered_error) => {
765 stderr.push_str(&rendered_error);
766 if profile.is_enabled() {
767 if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
768 stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
769 }
770 }
771 RunOutcome {
772 stdout,
773 stderr,
774 exit_code: 1,
775 }
776 }
777 }
778}
779
780fn render_and_persist_profile(
781 options: &RunProfileOptions,
782 stderr: &mut String,
783) -> Result<(), String> {
784 let spans = harn_vm::tracing::peek_spans();
785 let profile = harn_vm::profile::build(&spans);
786 if options.text {
787 stderr.push_str(&harn_vm::profile::render(&profile));
788 }
789 if let Some(path) = options.json_path.as_ref() {
790 if let Some(parent) = path.parent() {
791 if !parent.as_os_str().is_empty() {
792 fs::create_dir_all(parent)
793 .map_err(|error| format!("create {}: {error}", parent.display()))?;
794 }
795 }
796 let json = serde_json::to_string_pretty(&profile)
797 .map_err(|error| format!("serialize profile: {error}"))?;
798 fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
799 }
800 Ok(())
801}
802
803async fn append_run_provenance_event(
804 log: &Arc<harn_vm::event_log::AnyEventLog>,
805 kind: &str,
806 payload: serde_json::Value,
807) {
808 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
809 return;
810 };
811 let _ = log
812 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
813 .await;
814}
815
816async fn emit_run_attestation(
817 log: &Arc<harn_vm::event_log::AnyEventLog>,
818 path: &str,
819 store_base: &Path,
820 started_at_ms: i64,
821 exit_code: i32,
822 options: &RunAttestationOptions,
823 stderr: &mut String,
824) -> Result<(), String> {
825 let finished_at_ms = now_ms();
826 let status = if exit_code == 0 { "success" } else { "failure" };
827 append_run_provenance_event(
828 log,
829 "finished",
830 serde_json::json!({
831 "pipeline": path,
832 "status": status,
833 "exit_code": exit_code,
834 }),
835 )
836 .await;
837 log.flush()
838 .await
839 .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
840 let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
841 .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
842 let (signing_key, key_id) =
843 harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
844 .await
845 .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
846 let receipt = harn_vm::build_signed_receipt(
847 log,
848 harn_vm::ReceiptBuildOptions {
849 pipeline: path.to_string(),
850 status: status.to_string(),
851 started_at_ms,
852 finished_at_ms,
853 exit_code,
854 producer_name: "harn-cli".to_string(),
855 producer_version: env!("CARGO_PKG_VERSION").to_string(),
856 },
857 &signing_key,
858 key_id,
859 )
860 .await
861 .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
862 let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
863 if let Some(parent) = receipt_path.parent() {
864 fs::create_dir_all(parent)
865 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
866 }
867 let encoded = serde_json::to_vec_pretty(&receipt)
868 .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
869 fs::write(&receipt_path, encoded)
870 .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
871 stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
872 Ok(())
873}
874
875fn receipt_output_path(
876 store_base: &Path,
877 options: &RunAttestationOptions,
878 receipt_id: &str,
879) -> PathBuf {
880 if let Some(path) = options.receipt_out.as_ref() {
881 return path.clone();
882 }
883 harn_vm::runtime_paths::state_root(store_base)
884 .join("receipts")
885 .join(format!("{receipt_id}.json"))
886}
887
888fn now_ms() -> i64 {
889 std::time::SystemTime::now()
890 .duration_since(std::time::UNIX_EPOCH)
891 .map(|duration| duration.as_millis() as i64)
892 .unwrap_or(0)
893}
894
895fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
902 use harn_vm::VmValue;
903 match value {
904 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
905 VmValue::EnumVariant {
906 enum_name,
907 variant,
908 fields,
909 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => 1,
910 _ => 0,
911 }
912}
913
914fn render_return_value_error(value: &harn_vm::VmValue) -> String {
915 let harn_vm::VmValue::EnumVariant {
916 enum_name,
917 variant,
918 fields,
919 } = value
920 else {
921 return String::new();
922 };
923 if enum_name.as_ref() != "Result" || variant.as_ref() != "Err" {
924 return String::new();
925 }
926 let rendered = fields.first().map(|p| p.display()).unwrap_or_default();
927 if rendered.is_empty() {
928 "error\n".to_string()
929 } else if rendered.ends_with('\n') {
930 rendered
931 } else {
932 format!("{rendered}\n")
933 }
934}
935
936pub(crate) async fn connect_mcp_servers(
945 servers: &[package::McpServerConfig],
946 vm: &mut harn_vm::Vm,
947) {
948 use std::collections::BTreeMap;
949 use std::rc::Rc;
950 use std::time::Duration;
951
952 let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
953 let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
954
955 for server in servers {
956 let resolved_auth = match mcp::resolve_auth_for_server(server).await {
957 Ok(resolution) => resolution,
958 Err(error) => {
959 eprintln!(
960 "warning: mcp: failed to load auth for '{}': {}",
961 server.name, error
962 );
963 AuthResolution::None
964 }
965 };
966 let spec = serde_json::json!({
967 "name": server.name,
968 "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
969 "command": server.command,
970 "args": server.args,
971 "env": server.env,
972 "url": server.url,
973 "auth_token": match resolved_auth {
974 AuthResolution::Bearer(token) => Some(token),
975 AuthResolution::None => server.auth_token.clone(),
976 },
977 "protocol_version": server.protocol_version,
978 "proxy_server_name": server.proxy_server_name,
979 });
980
981 registrations.push(harn_vm::RegisteredMcpServer {
984 name: server.name.clone(),
985 spec: spec.clone(),
986 lazy: server.lazy,
987 card: server.card.clone(),
988 keep_alive: server.keep_alive_ms.map(Duration::from_millis),
989 });
990
991 if server.lazy {
992 eprintln!(
993 "[harn] mcp: deferred '{}' (lazy, boots on first use)",
994 server.name
995 );
996 continue;
997 }
998
999 match harn_vm::connect_mcp_server_from_json(&spec).await {
1000 Ok(handle) => {
1001 eprintln!("[harn] mcp: connected to '{}'", server.name);
1002 harn_vm::mcp_install_active(&server.name, handle.clone());
1003 mcp_dict.insert(server.name.clone(), harn_vm::VmValue::McpClient(handle));
1004 }
1005 Err(e) => {
1006 eprintln!(
1007 "warning: mcp: failed to connect to '{}': {}",
1008 server.name, e
1009 );
1010 }
1011 }
1012 }
1013
1014 harn_vm::mcp_register_servers(registrations);
1017
1018 if !mcp_dict.is_empty() {
1019 vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
1020 }
1021}
1022
1023fn render_trace_summary() -> String {
1024 use std::fmt::Write;
1025 let entries = harn_vm::llm::take_trace();
1026 if entries.is_empty() {
1027 return String::new();
1028 }
1029 let mut out = String::new();
1030 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
1031 let mut total_input = 0i64;
1032 let mut total_output = 0i64;
1033 let mut total_ms = 0u64;
1034 for (i, entry) in entries.iter().enumerate() {
1035 let _ = writeln!(
1036 out,
1037 " #{}: {} | {} in + {} out tokens | {} ms",
1038 i + 1,
1039 entry.model,
1040 entry.input_tokens,
1041 entry.output_tokens,
1042 entry.duration_ms,
1043 );
1044 total_input += entry.input_tokens;
1045 total_output += entry.output_tokens;
1046 total_ms += entry.duration_ms;
1047 }
1048 let total_tokens = total_input + total_output;
1049 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
1051 let _ = writeln!(
1052 out,
1053 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
1054 entries.len(),
1055 if entries.len() == 1 { "" } else { "s" },
1056 total_tokens,
1057 total_input,
1058 total_output,
1059 total_ms,
1060 cost,
1061 );
1062 out
1063}
1064
1065pub(crate) async fn run_file_mcp_serve(
1079 path: &str,
1080 card_source: Option<&str>,
1081 mode: RunFileMcpServeMode,
1082) {
1083 let (source, program) = crate::parse_source_file(path);
1084
1085 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
1086 for diag in &type_diagnostics {
1087 match diag.severity {
1088 DiagnosticSeverity::Error => {
1089 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1090 eprint!("{rendered}");
1091 process::exit(1);
1092 }
1093 DiagnosticSeverity::Warning => {
1094 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1095 eprint!("{rendered}");
1096 }
1097 }
1098 }
1099
1100 let chunk = match harn_vm::Compiler::new().compile(&program) {
1101 Ok(c) => c,
1102 Err(e) => {
1103 eprintln!("error: compile error: {e}");
1104 process::exit(1);
1105 }
1106 };
1107
1108 let mut vm = harn_vm::Vm::new();
1109 harn_vm::register_vm_stdlib(&mut vm);
1110 crate::install_default_hostlib(&mut vm);
1111 let source_parent = std::path::Path::new(path)
1112 .parent()
1113 .unwrap_or(std::path::Path::new("."));
1114 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1115 let store_base = project_root.as_deref().unwrap_or(source_parent);
1116 harn_vm::register_store_builtins(&mut vm, store_base);
1117 harn_vm::register_metadata_builtins(&mut vm, store_base);
1118 let pipeline_name = std::path::Path::new(path)
1119 .file_stem()
1120 .and_then(|s| s.to_str())
1121 .unwrap_or("default");
1122 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1123 vm.set_source_info(path, &source);
1124 if let Some(ref root) = project_root {
1125 vm.set_project_root(root);
1126 }
1127 if let Some(p) = std::path::Path::new(path).parent() {
1128 if !p.as_os_str().is_empty() {
1129 vm.set_source_dir(p);
1130 }
1131 }
1132
1133 let loaded = load_skills(&SkillLoaderInputs {
1135 cli_dirs: Vec::new(),
1136 source_path: Some(std::path::PathBuf::from(path)),
1137 });
1138 emit_loader_warnings(&loaded.loader_warnings);
1139 install_skills_global(&mut vm, &loaded);
1140
1141 let extensions = package::load_runtime_extensions(Path::new(path));
1142 package::install_runtime_extensions(&extensions);
1143 if let Some(manifest) = extensions.root_manifest.as_ref() {
1144 if !manifest.mcp.is_empty() {
1145 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1146 }
1147 }
1148 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1149 eprintln!("error: failed to install manifest triggers: {error}");
1150 process::exit(1);
1151 }
1152 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1153 eprintln!("error: failed to install manifest hooks: {error}");
1154 process::exit(1);
1155 }
1156
1157 let local = tokio::task::LocalSet::new();
1158 local
1159 .run_until(async {
1160 match vm.execute(&chunk).await {
1161 Ok(_) => {}
1162 Err(e) => {
1163 eprint!("{}", vm.format_runtime_error(&e));
1164 process::exit(1);
1165 }
1166 }
1167
1168 let output = vm.output();
1170 if !output.is_empty() {
1171 eprint!("{output}");
1172 }
1173
1174 let registry = match harn_vm::take_mcp_serve_registry() {
1175 Some(r) => r,
1176 None => {
1177 eprintln!("error: pipeline did not call mcp_serve(registry)");
1178 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
1179 process::exit(1);
1180 }
1181 };
1182
1183 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
1184 Ok(t) => t,
1185 Err(e) => {
1186 eprintln!("error: {e}");
1187 process::exit(1);
1188 }
1189 };
1190
1191 let resources = harn_vm::take_mcp_serve_resources();
1192 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
1193 let prompts = harn_vm::take_mcp_serve_prompts();
1194
1195 let server_name = std::path::Path::new(path)
1196 .file_stem()
1197 .and_then(|s| s.to_str())
1198 .unwrap_or("harn")
1199 .to_string();
1200
1201 let mut caps = Vec::new();
1202 if !tools.is_empty() {
1203 caps.push(format!(
1204 "{} tool{}",
1205 tools.len(),
1206 if tools.len() == 1 { "" } else { "s" }
1207 ));
1208 }
1209 let total_resources = resources.len() + resource_templates.len();
1210 if total_resources > 0 {
1211 caps.push(format!(
1212 "{total_resources} resource{}",
1213 if total_resources == 1 { "" } else { "s" }
1214 ));
1215 }
1216 if !prompts.is_empty() {
1217 caps.push(format!(
1218 "{} prompt{}",
1219 prompts.len(),
1220 if prompts.len() == 1 { "" } else { "s" }
1221 ));
1222 }
1223 eprintln!(
1224 "[harn] serve mcp: serving {} as '{server_name}'",
1225 caps.join(", ")
1226 );
1227
1228 let mut server =
1229 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
1230 if let Some(source) = card_source {
1231 match resolve_card_source(source) {
1232 Ok(card) => server = server.with_server_card(card),
1233 Err(e) => {
1234 eprintln!("error: --card: {e}");
1235 process::exit(1);
1236 }
1237 }
1238 }
1239 match mode {
1240 RunFileMcpServeMode::Stdio => {
1241 if let Err(e) = server.run(&mut vm).await {
1242 eprintln!("error: MCP server error: {e}");
1243 process::exit(1);
1244 }
1245 }
1246 RunFileMcpServeMode::Http {
1247 options,
1248 auth_policy,
1249 } => {
1250 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
1251 server,
1252 vm,
1253 options,
1254 auth_policy,
1255 )
1256 .await
1257 {
1258 eprintln!("error: MCP server error: {e}");
1259 process::exit(1);
1260 }
1261 }
1262 }
1263 })
1264 .await;
1265}
1266
1267pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
1272 let trimmed = source.trim_start();
1273 if trimmed.starts_with('{') || trimmed.starts_with('[') {
1274 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
1275 }
1276 let path = std::path::Path::new(source);
1277 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
1278}
1279
1280pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
1281 use notify::{Event, EventKind, RecursiveMode, Watcher};
1282
1283 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
1284 eprintln!("Error: {e}");
1285 process::exit(1);
1286 });
1287 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
1288
1289 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
1290 run_file(
1291 path,
1292 false,
1293 denied_builtins.clone(),
1294 Vec::new(),
1295 CliLlmMockMode::Off,
1296 None,
1297 RunProfileOptions::default(),
1298 )
1299 .await;
1300
1301 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1302 let _watcher = {
1303 let tx = tx.clone();
1304 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
1305 if let Ok(event) = res {
1306 if matches!(
1307 event.kind,
1308 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
1309 ) {
1310 let has_harn = event
1311 .paths
1312 .iter()
1313 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
1314 if has_harn {
1315 let _ = tx.blocking_send(());
1316 }
1317 }
1318 }
1319 })
1320 .unwrap_or_else(|e| {
1321 eprintln!("Error setting up file watcher: {e}");
1322 process::exit(1);
1323 });
1324 watcher
1325 .watch(watch_dir, RecursiveMode::Recursive)
1326 .unwrap_or_else(|e| {
1327 eprintln!("Error watching directory: {e}");
1328 process::exit(1);
1329 });
1330 watcher };
1332
1333 eprintln!(
1334 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
1335 watch_dir.display()
1336 );
1337
1338 loop {
1339 rx.recv().await;
1340 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1342 while rx.try_recv().is_ok() {}
1343
1344 eprintln!();
1345 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
1346 run_file(
1347 path,
1348 false,
1349 denied_builtins.clone(),
1350 Vec::new(),
1351 CliLlmMockMode::Off,
1352 None,
1353 RunProfileOptions::default(),
1354 )
1355 .await;
1356 }
1357}
1358
1359#[cfg(test)]
1360mod tests {
1361 use super::{
1362 execute_explain_cost, execute_run, split_eval_header, CliLlmMockMode, RunProfileOptions,
1363 StdoutPassthroughGuard,
1364 };
1365 use std::collections::HashSet;
1366
1367 #[test]
1368 fn split_eval_header_no_imports_returns_full_body() {
1369 let (header, body) = split_eval_header("println(1 + 2)");
1370 assert_eq!(header, "");
1371 assert_eq!(body, "println(1 + 2)");
1372 }
1373
1374 #[test]
1375 fn split_eval_header_lifts_leading_imports() {
1376 let code = "import \"./lib\"\nimport { x } from \"std/math\"\nprintln(x)";
1377 let (header, body) = split_eval_header(code);
1378 assert_eq!(header, "import \"./lib\"\nimport { x } from \"std/math\"");
1379 assert_eq!(body, "println(x)");
1380 }
1381
1382 #[test]
1383 fn split_eval_header_keeps_pub_import_and_comments_in_header() {
1384 let code = "// header comment\npub import { y } from \"./lib\"\n\nfoo()";
1385 let (header, body) = split_eval_header(code);
1386 assert_eq!(
1387 header,
1388 "// header comment\npub import { y } from \"./lib\"\n"
1389 );
1390 assert_eq!(body, "foo()");
1391 }
1392
1393 #[test]
1394 fn split_eval_header_does_not_lift_imports_after_other_statements() {
1395 let code = "let a = 1\nimport \"./lib\"";
1396 let (header, body) = split_eval_header(code);
1397 assert_eq!(header, "");
1398 assert_eq!(body, "let a = 1\nimport \"./lib\"");
1399 }
1400
1401 #[test]
1402 fn cli_llm_mock_roundtrips_logprobs() {
1403 let mock = harn_vm::llm::parse_llm_mock_value(&serde_json::json!({
1404 "text": "visible",
1405 "logprobs": [{"token": "visible", "logprob": 0.0}]
1406 }))
1407 .expect("parse mock");
1408 assert_eq!(mock.logprobs.len(), 1);
1409
1410 let line = harn_vm::llm::serialize_llm_mock(mock).expect("serialize mock");
1411 let value: serde_json::Value = serde_json::from_str(&line).expect("json line");
1412 assert_eq!(value["logprobs"][0]["token"].as_str(), Some("visible"));
1413
1414 let reparsed = harn_vm::llm::parse_llm_mock_value(&value).expect("reparse mock");
1415 assert_eq!(reparsed.logprobs.len(), 1);
1416 assert_eq!(reparsed.logprobs[0]["logprob"].as_f64(), Some(0.0));
1417 }
1418
1419 #[test]
1420 fn stdout_passthrough_guard_restores_previous_state() {
1421 let original = harn_vm::set_stdout_passthrough(false);
1422 {
1423 let _guard = StdoutPassthroughGuard::enable();
1424 assert!(harn_vm::set_stdout_passthrough(true));
1425 }
1426 assert!(!harn_vm::set_stdout_passthrough(original));
1427 }
1428
1429 #[test]
1430 fn execute_explain_cost_does_not_execute_script() {
1431 let temp = tempfile::TempDir::new().expect("temp dir");
1432 let script = temp.path().join("main.harn");
1433 std::fs::write(
1434 &script,
1435 r#"
1436pipeline main() {
1437 write_file("executed.txt", "bad")
1438 llm_call("hello", nil, {provider: "mock", model: "mock"})
1439}
1440"#,
1441 )
1442 .expect("write script");
1443
1444 let outcome = execute_explain_cost(&script.to_string_lossy());
1445
1446 assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
1447 assert!(outcome.stdout.contains("LLM cost estimate"));
1448 assert!(
1449 !temp.path().join("executed.txt").exists(),
1450 "--explain-cost must not execute pipeline side effects"
1451 );
1452 }
1453
1454 #[cfg(feature = "hostlib")]
1455 #[tokio::test]
1456 async fn execute_run_installs_hostlib_gate() {
1457 let temp = tempfile::NamedTempFile::new().expect("temp file");
1458 std::fs::write(
1459 temp.path(),
1460 r#"
1461pipeline main() {
1462 let _ = hostlib_enable("tools:deterministic")
1463 println("enabled")
1464}
1465"#,
1466 )
1467 .expect("write script");
1468
1469 let outcome = execute_run(
1470 &temp.path().to_string_lossy(),
1471 false,
1472 HashSet::new(),
1473 Vec::new(),
1474 Vec::new(),
1475 CliLlmMockMode::Off,
1476 None,
1477 RunProfileOptions::default(),
1478 )
1479 .await;
1480
1481 assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
1482 assert_eq!(outcome.stdout.trim(), "enabled");
1483 }
1484
1485 #[cfg(all(feature = "hostlib", unix))]
1486 #[tokio::test]
1487 async fn execute_run_can_read_hostlib_command_artifacts() {
1488 let temp = tempfile::NamedTempFile::new().expect("temp file");
1489 std::fs::write(
1490 temp.path(),
1491 r#"
1492pipeline main() {
1493 let _ = hostlib_enable("tools:deterministic")
1494 let result = hostlib_tools_run_command({
1495 argv: ["sh", "-c", "i=0; while [ $i -lt 2000 ]; do printf x; i=$((i+1)); done"],
1496 capture: {max_inline_bytes: 8},
1497 timeout_ms: 5000,
1498 })
1499 println(starts_with(result.command_id, "cmd_"))
1500 println(len(result.stdout))
1501 println(result.byte_count)
1502 let window = hostlib_tools_read_command_output({
1503 command_id: result.command_id,
1504 offset: 1990,
1505 length: 20,
1506 })
1507 println(len(window.content))
1508 println(window.eof)
1509}
1510"#,
1511 )
1512 .expect("write script");
1513
1514 let outcome = execute_run(
1515 &temp.path().to_string_lossy(),
1516 false,
1517 HashSet::new(),
1518 Vec::new(),
1519 Vec::new(),
1520 CliLlmMockMode::Off,
1521 None,
1522 RunProfileOptions::default(),
1523 )
1524 .await;
1525
1526 assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
1527 assert_eq!(outcome.stdout.trim(), "true\n8\n2000\n10\ntrue");
1528 }
1529}