1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8
9use harn_parser::DiagnosticSeverity;
10use harn_vm::event_log::EventLog;
11
12use crate::commands::mcp::{self, AuthResolution};
13use crate::package;
14use crate::parse_source_file;
15use crate::skill_loader::{
16 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
17 SkillLoaderInputs,
18};
19
/// Transport selection passed to `run_file_mcp_serve` as its `mode` argument.
pub(crate) enum RunFileMcpServeMode {
    /// Stdio transport; carries no additional configuration.
    Stdio,
    /// HTTP transport, configured by the serve options and auth policy
    /// supplied by the caller.
    Http {
        /// HTTP serving options forwarded to `harn_serve`.
        options: harn_serve::McpHttpServeOptions,
        /// Authentication policy applied by `harn_serve`.
        auth_policy: harn_serve::AuthPolicy,
    },
}
27
/// Builtin names that `build_denied_builtins` never places in the deny set
/// when it is computing one from an allow-list: they stay available even if
/// the caller's allow-list does not mention them.
const CORE_BUILTINS: &[&str] = &[
    "println",
    "print",
    "log",
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    "assert",
    "assert_eq",
    "assert_ne",
    "json_parse",
    "json_stringify",
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
50
51pub(crate) fn build_denied_builtins(
56 deny_csv: Option<&str>,
57 allow_csv: Option<&str>,
58) -> HashSet<String> {
59 if let Some(csv) = deny_csv {
60 csv.split(',')
61 .map(|s| s.trim().to_string())
62 .filter(|s| !s.is_empty())
63 .collect()
64 } else if let Some(csv) = allow_csv {
65 let allowed: HashSet<String> = csv
68 .split(',')
69 .map(|s| s.trim().to_string())
70 .filter(|s| !s.is_empty())
71 .collect();
72 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
73
74 let mut tmp = harn_vm::Vm::new();
76 harn_vm::register_vm_stdlib(&mut tmp);
77 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
78 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
79
80 tmp.builtin_names()
81 .into_iter()
82 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
83 .collect()
84 } else {
85 HashSet::new()
86 }
87}
88
89fn typecheck_with_imports(
94 program: &[harn_parser::SNode],
95 path: &Path,
96 source: &str,
97) -> Vec<harn_parser::TypeDiagnostic> {
98 if let Err(error) = package::ensure_dependencies_materialized(path) {
99 eprintln!("error: {error}");
100 process::exit(1);
101 }
102 let graph = harn_modules::build(&[path.to_path_buf()]);
103 let mut checker = harn_parser::TypeChecker::new();
104 if let Some(imported) = graph.imported_names_for_file(path) {
105 checker = checker.with_imported_names(imported);
106 }
107 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
108 checker = checker.with_imported_type_decls(imported);
109 }
110 checker.check_with_source(program, source)
111}
112
/// How the CLI substitutes or captures LLM responses for a run.
///
/// Consumed by `install_cli_llm_mock_mode` before execution and by
/// `persist_cli_llm_mock_recording` afterwards.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocks are installed and nothing is recorded.
    #[default]
    Off,
    /// Mocks are loaded from the fixture file (one JSON object per non-empty
    /// line) and installed before the run.
    Replay {
        /// Fixture file to read.
        fixture_path: PathBuf,
    },
    /// Recording is enabled for the run; captured responses are written to
    /// the fixture file afterwards.
    Record {
        /// Fixture file to write.
        fixture_path: PathBuf,
    },
}
124
/// Options for the signed provenance receipt emitted after a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Explicit receipt destination. When `None`, the receipt is written to
    /// `receipts/<receipt_id>.json` under the runtime state root.
    pub receipt_out: Option<PathBuf>,
    /// Optional agent identifier forwarded when loading or generating the
    /// signing key.
    pub agent_id: Option<String>,
}
130
/// Profiling output requested for a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunProfileOptions {
    /// Append the rendered text profile to the run's stderr.
    pub text: bool,
    /// Write the profile as pretty-printed JSON to this path.
    pub json_path: Option<PathBuf>,
}

impl RunProfileOptions {
    /// Returns `true` when either form of profile output was requested.
    pub fn is_enabled(&self) -> bool {
        match self.json_path {
            Some(_) => true,
            None => self.text,
        }
    }
}
146
/// Captured result of `execute_run`: the text destined for each standard
/// stream plus the exit code the caller should use.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Text to be written to standard output.
    pub stdout: String,
    /// Text to be written to standard error (diagnostics, errors, summaries).
    pub stderr: String,
    /// Exit code for the run; `0` means success.
    pub exit_code: i32,
}
156
157fn load_cli_llm_mocks(path: &Path) -> Result<Vec<harn_vm::llm::LlmMock>, String> {
158 let content = fs::read_to_string(path)
159 .map_err(|error| format!("failed to read {}: {error}", path.display()))?;
160 let mut mocks = Vec::new();
161 for (idx, raw_line) in content.lines().enumerate() {
162 let line_no = idx + 1;
163 let line = raw_line.trim();
164 if line.is_empty() {
165 continue;
166 }
167 let value: serde_json::Value = serde_json::from_str(line).map_err(|error| {
168 format!(
169 "invalid JSON in {} line {}: {error}",
170 path.display(),
171 line_no
172 )
173 })?;
174 mocks.push(parse_cli_llm_mock_value(&value).map_err(|error| {
175 format!(
176 "invalid --llm-mock fixture in {} line {}: {error}",
177 path.display(),
178 line_no
179 )
180 })?);
181 }
182 Ok(mocks)
183}
184
185fn parse_cli_llm_mock_value(value: &serde_json::Value) -> Result<harn_vm::llm::LlmMock, String> {
186 let object = value
187 .as_object()
188 .ok_or_else(|| "fixture line must be a JSON object".to_string())?;
189
190 let match_pattern = optional_string_field(object, "match")?;
191 let consume_on_match = object
192 .get("consume_match")
193 .and_then(|value| value.as_bool())
194 .unwrap_or(false);
195 let text = optional_string_field(object, "text")?.unwrap_or_default();
196 let input_tokens = optional_i64_field(object, "input_tokens")?;
197 let output_tokens = optional_i64_field(object, "output_tokens")?;
198 let cache_read_tokens = optional_i64_field(object, "cache_read_tokens")?;
199 let cache_write_tokens = optional_i64_field(object, "cache_write_tokens")?
200 .or(optional_i64_field(object, "cache_creation_input_tokens")?);
201 let thinking = optional_string_field(object, "thinking")?;
202 let thinking_summary = optional_string_field(object, "thinking_summary")?;
203 let stop_reason = optional_string_field(object, "stop_reason")?;
204 let model = optional_string_field(object, "model")?.unwrap_or_else(|| "mock".to_string());
205 let provider = optional_string_field(object, "provider")?;
206 let blocks = optional_vec_field(object, "blocks")?;
207 let tool_calls = parse_cli_llm_tool_calls(object.get("tool_calls"))?;
208 let error = parse_cli_llm_mock_error(object.get("error"))?;
209
210 Ok(harn_vm::llm::LlmMock {
211 text,
212 tool_calls,
213 match_pattern,
214 consume_on_match,
215 input_tokens,
216 output_tokens,
217 cache_read_tokens,
218 cache_write_tokens,
219 thinking,
220 thinking_summary,
221 stop_reason,
222 model,
223 provider,
224 blocks,
225 error,
226 })
227}
228
229fn parse_cli_llm_tool_calls(
230 value: Option<&serde_json::Value>,
231) -> Result<Vec<serde_json::Value>, String> {
232 let Some(value) = value else {
233 return Ok(Vec::new());
234 };
235 let items = value
236 .as_array()
237 .ok_or_else(|| "tool_calls must be an array".to_string())?;
238 items
239 .iter()
240 .enumerate()
241 .map(|(idx, item)| {
242 normalize_cli_llm_tool_call(item).map_err(|error| format!("tool_calls[{idx}] {error}"))
243 })
244 .collect()
245}
246
247fn normalize_cli_llm_tool_call(value: &serde_json::Value) -> Result<serde_json::Value, String> {
248 let object = value
249 .as_object()
250 .ok_or_else(|| "must be a JSON object".to_string())?;
251 let name = object
252 .get("name")
253 .and_then(|value| value.as_str())
254 .ok_or_else(|| "is missing string field `name`".to_string())?;
255 let arguments = object
256 .get("arguments")
257 .cloned()
258 .or_else(|| object.get("args").cloned())
259 .unwrap_or_else(|| serde_json::json!({}));
260 Ok(serde_json::json!({
261 "name": name,
262 "arguments": arguments,
263 }))
264}
265
266fn parse_cli_llm_mock_error(
267 value: Option<&serde_json::Value>,
268) -> Result<Option<harn_vm::llm::MockError>, String> {
269 let Some(value) = value else {
270 return Ok(None);
271 };
272 if value.is_null() {
273 return Ok(None);
274 }
275 let object = value.as_object().ok_or_else(|| {
276 "error must be an object {category, message, retry_after_ms?}".to_string()
277 })?;
278 let category_str = object
279 .get("category")
280 .and_then(|value| value.as_str())
281 .ok_or_else(|| "error.category is required".to_string())?;
282 let category = harn_vm::ErrorCategory::parse(category_str);
283 if category.as_str() != category_str {
284 return Err(format!("unknown error category `{category_str}`"));
285 }
286 let message = object
287 .get("message")
288 .and_then(|value| value.as_str())
289 .unwrap_or_default()
290 .to_string();
291 let retry_after_ms = match object.get("retry_after_ms") {
292 None | Some(serde_json::Value::Null) => None,
293 Some(serde_json::Value::Number(n)) => match n.as_u64() {
294 Some(v) => Some(v),
295 None => return Err("error.retry_after_ms must be a non-negative integer".to_string()),
296 },
297 Some(_) => return Err("error.retry_after_ms must be a non-negative integer".to_string()),
298 };
299 Ok(Some(harn_vm::llm::MockError {
300 category,
301 message,
302 retry_after_ms,
303 }))
304}
305
306fn optional_string_field(
307 object: &serde_json::Map<String, serde_json::Value>,
308 key: &str,
309) -> Result<Option<String>, String> {
310 match object.get(key) {
311 None | Some(serde_json::Value::Null) => Ok(None),
312 Some(serde_json::Value::String(value)) => Ok(Some(value.clone())),
313 Some(_) => Err(format!("`{key}` must be a string")),
314 }
315}
316
317fn optional_i64_field(
318 object: &serde_json::Map<String, serde_json::Value>,
319 key: &str,
320) -> Result<Option<i64>, String> {
321 match object.get(key) {
322 None | Some(serde_json::Value::Null) => Ok(None),
323 Some(value) => value
324 .as_i64()
325 .map(Some)
326 .ok_or_else(|| format!("`{key}` must be an integer")),
327 }
328}
329
330fn optional_vec_field(
331 object: &serde_json::Map<String, serde_json::Value>,
332 key: &str,
333) -> Result<Option<Vec<serde_json::Value>>, String> {
334 match object.get(key) {
335 None | Some(serde_json::Value::Null) => Ok(None),
336 Some(serde_json::Value::Array(items)) => Ok(Some(items.clone())),
337 Some(_) => Err(format!("`{key}` must be an array")),
338 }
339}
340
341pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
342 harn_vm::llm::clear_cli_llm_mock_mode();
343 match mode {
344 CliLlmMockMode::Off => Ok(()),
345 CliLlmMockMode::Replay { fixture_path } => {
346 let mocks = load_cli_llm_mocks(fixture_path)?;
347 harn_vm::llm::install_cli_llm_mocks(mocks);
348 Ok(())
349 }
350 CliLlmMockMode::Record { .. } => {
351 harn_vm::llm::enable_cli_llm_mock_recording();
352 Ok(())
353 }
354 }
355}
356
357pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
358 let CliLlmMockMode::Record { fixture_path } = mode else {
359 return Ok(());
360 };
361 if let Some(parent) = fixture_path.parent() {
362 if !parent.as_os_str().is_empty() {
363 fs::create_dir_all(parent).map_err(|error| {
364 format!(
365 "failed to create fixture directory {}: {error}",
366 parent.display()
367 )
368 })?;
369 }
370 }
371
372 let lines = harn_vm::llm::take_cli_llm_recordings()
373 .into_iter()
374 .map(serialize_cli_llm_mock)
375 .collect::<Result<Vec<_>, _>>()?;
376 let body = if lines.is_empty() {
377 String::new()
378 } else {
379 format!("{}\n", lines.join("\n"))
380 };
381 fs::write(fixture_path, body)
382 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
383}
384
385fn serialize_cli_llm_mock(mock: harn_vm::llm::LlmMock) -> Result<String, String> {
386 let mut object = serde_json::Map::new();
387 if let Some(match_pattern) = mock.match_pattern {
388 object.insert(
389 "match".to_string(),
390 serde_json::Value::String(match_pattern),
391 );
392 }
393 if !mock.text.is_empty() {
394 object.insert("text".to_string(), serde_json::Value::String(mock.text));
395 }
396 if !mock.tool_calls.is_empty() {
397 let tool_calls = mock
398 .tool_calls
399 .into_iter()
400 .map(|tool_call| {
401 let object = tool_call
402 .as_object()
403 .ok_or_else(|| "recorded tool call must be an object".to_string())?;
404 let name = object
405 .get("name")
406 .and_then(|value| value.as_str())
407 .ok_or_else(|| "recorded tool call is missing `name`".to_string())?;
408 Ok(serde_json::json!({
409 "name": name,
410 "args": object
411 .get("arguments")
412 .cloned()
413 .unwrap_or_else(|| serde_json::json!({})),
414 }))
415 })
416 .collect::<Result<Vec<_>, String>>()?;
417 object.insert(
418 "tool_calls".to_string(),
419 serde_json::Value::Array(tool_calls),
420 );
421 }
422 if let Some(input_tokens) = mock.input_tokens {
423 object.insert(
424 "input_tokens".to_string(),
425 serde_json::Value::Number(input_tokens.into()),
426 );
427 }
428 if let Some(output_tokens) = mock.output_tokens {
429 object.insert(
430 "output_tokens".to_string(),
431 serde_json::Value::Number(output_tokens.into()),
432 );
433 }
434 if let Some(cache_read_tokens) = mock.cache_read_tokens {
435 object.insert(
436 "cache_read_tokens".to_string(),
437 serde_json::Value::Number(cache_read_tokens.into()),
438 );
439 }
440 if let Some(cache_write_tokens) = mock.cache_write_tokens {
441 object.insert(
442 "cache_write_tokens".to_string(),
443 serde_json::Value::Number(cache_write_tokens.into()),
444 );
445 object.insert(
446 "cache_creation_input_tokens".to_string(),
447 serde_json::Value::Number(cache_write_tokens.into()),
448 );
449 }
450 if let Some(thinking) = mock.thinking {
451 object.insert("thinking".to_string(), serde_json::Value::String(thinking));
452 }
453 if let Some(stop_reason) = mock.stop_reason {
454 object.insert(
455 "stop_reason".to_string(),
456 serde_json::Value::String(stop_reason),
457 );
458 }
459 object.insert("model".to_string(), serde_json::Value::String(mock.model));
460 if let Some(provider) = mock.provider {
461 object.insert("provider".to_string(), serde_json::Value::String(provider));
462 }
463 if let Some(blocks) = mock.blocks {
464 object.insert("blocks".to_string(), serde_json::Value::Array(blocks));
465 }
466 if let Some(error) = mock.error {
467 object.insert(
468 "error".to_string(),
469 serde_json::json!({
470 "category": error.category.as_str(),
471 "message": error.message,
472 }),
473 );
474 }
475 serde_json::to_string(&serde_json::Value::Object(object))
476 .map_err(|error| format!("failed to serialize recorded fixture: {error}"))
477}
478
479pub(crate) async fn run_file(
480 path: &str,
481 trace: bool,
482 denied_builtins: HashSet<String>,
483 script_argv: Vec<String>,
484 llm_mock_mode: CliLlmMockMode,
485 attestation: Option<RunAttestationOptions>,
486 profile: RunProfileOptions,
487) {
488 run_file_with_skill_dirs(
489 path,
490 trace,
491 denied_builtins,
492 script_argv,
493 Vec::new(),
494 llm_mock_mode,
495 attestation,
496 profile,
497 )
498 .await;
499}
500
501pub(crate) async fn run_file_with_skill_dirs(
502 path: &str,
503 trace: bool,
504 denied_builtins: HashSet<String>,
505 script_argv: Vec<String>,
506 skill_dirs_raw: Vec<String>,
507 llm_mock_mode: CliLlmMockMode,
508 attestation: Option<RunAttestationOptions>,
509 profile: RunProfileOptions,
510) {
511 let cancelled = install_signal_shutdown_handler();
513
514 let outcome = execute_run(
515 path,
516 trace,
517 denied_builtins,
518 script_argv,
519 skill_dirs_raw,
520 llm_mock_mode,
521 attestation,
522 profile,
523 )
524 .await;
525
526 if !outcome.stderr.is_empty() {
529 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
530 }
531 if !outcome.stdout.is_empty() {
532 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
533 }
534
535 let mut exit_code = outcome.exit_code;
536 if exit_code != 0 && cancelled.load(Ordering::SeqCst) {
537 exit_code = 124;
538 }
539 if exit_code != 0 {
540 process::exit(exit_code);
541 }
542}
543
544fn install_signal_shutdown_handler() -> Arc<AtomicBool> {
545 let cancelled = Arc::new(AtomicBool::new(false));
546 let cancelled_clone = cancelled.clone();
547 tokio::spawn(async move {
548 #[cfg(unix)]
549 {
550 use tokio::signal::unix::{signal, SignalKind};
551 let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
552 let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
553 tokio::select! {
554 _ = sigterm.recv() => {},
555 _ = sigint.recv() => {},
556 }
557 cancelled_clone.store(true, Ordering::SeqCst);
558 eprintln!("[harn] signal received, flushing state...");
559 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
560 process::exit(124);
561 }
562 #[cfg(not(unix))]
563 {
564 let _ = tokio::signal::ctrl_c().await;
565 cancelled_clone.store(true, Ordering::SeqCst);
566 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
567 process::exit(124);
568 }
569 });
570 cancelled
571}
572
/// Parses, type-checks, compiles and executes the pipeline at `path`,
/// returning the captured output and exit code instead of printing them.
///
/// * `trace` — enable LLM tracing and append a trace summary to stderr after
///   a successful execution.
/// * `denied_builtins` — builtin names the VM must refuse; ignored if empty.
/// * `script_argv` — exposed to the script as the global list `argv`.
/// * `skill_dirs_raw` — extra skill directories supplied on the command line.
/// * `llm_mock_mode` — LLM mock replay/record configuration.
/// * `attestation` — when set, a signed provenance receipt is emitted.
/// * `profile` — profiling output options.
///
/// The exit code is derived from the script's return value on success and is
/// `1` for type errors, compile errors, setup failures and runtime errors.
///
/// Note that `typecheck_with_imports` can terminate the process itself when
/// dependencies cannot be materialized.
pub async fn execute_run(
    path: &str,
    trace: bool,
    denied_builtins: HashSet<String>,
    script_argv: Vec<String>,
    skill_dirs_raw: Vec<String>,
    llm_mock_mode: CliLlmMockMode,
    attestation: Option<RunAttestationOptions>,
    profile: RunProfileOptions,
) -> RunOutcome {
    let mut stderr = String::new();
    let mut stdout = String::new();

    let (source, program) = parse_source_file(path);

    // Render every diagnostic (warnings included); only errors abort the run.
    let mut had_type_error = false;
    let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
    for diag in &type_diagnostics {
        let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
        if matches!(diag.severity, DiagnosticSeverity::Error) {
            had_type_error = true;
        }
        stderr.push_str(&rendered);
    }
    if had_type_error {
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let chunk = match harn_vm::Compiler::new().compile(&program) {
        Ok(c) => c,
        Err(e) => {
            stderr.push_str(&format!("error: compile error: {e}\n"));
            return RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            };
        }
    };

    if trace {
        harn_vm::llm::enable_tracing();
    }
    // Profiles are built from tracing spans, so span tracing must be on.
    if profile.is_enabled() {
        harn_vm::tracing::set_tracing_enabled(true);
    }
    if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let mut vm = harn_vm::Vm::new();
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    let source_parent = std::path::Path::new(path)
        .parent()
        .unwrap_or(std::path::Path::new("."));
    let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
    // Store/metadata/checkpoint state is rooted at the project root when one
    // is found, otherwise next to the source file.
    let store_base = project_root.as_deref().unwrap_or(source_parent);
    let attestation_started_at_ms = now_ms();
    // An in-memory event log is installed only when a receipt was requested.
    // NOTE(review): 256 is presumably the log's capacity — confirm.
    let attestation_log = if attestation.is_some() {
        Some(harn_vm::event_log::install_memory_for_current_thread(256))
    } else {
        None
    };
    if let Some(log) = attestation_log.as_ref() {
        append_run_provenance_event(
            log,
            "started",
            serde_json::json!({
                "pipeline": path,
                "argv": &script_argv,
                "project_root": store_base.display().to_string(),
            }),
        )
        .await;
    }
    harn_vm::register_store_builtins(&mut vm, store_base);
    harn_vm::register_metadata_builtins(&mut vm, store_base);
    // Checkpoints are keyed by the source file's stem.
    let pipeline_name = std::path::Path::new(path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("default");
    harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
    vm.set_source_info(path, &source);
    if !denied_builtins.is_empty() {
        vm.set_denied_builtins(denied_builtins);
    }
    if let Some(ref root) = project_root {
        vm.set_project_root(root);
    }

    // A bare file name has an empty parent; leave the source dir unset then.
    if let Some(p) = std::path::Path::new(path).parent() {
        if !p.as_os_str().is_empty() {
            vm.set_source_dir(p);
        }
    }

    let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
    let loaded = load_skills(&SkillLoaderInputs {
        cli_dirs,
        source_path: Some(std::path::PathBuf::from(path)),
    });
    emit_loader_warnings(&loaded.loader_warnings);
    install_skills_global(&mut vm, &loaded);

    // Expose the script arguments as the global list `argv`.
    let argv_values: Vec<harn_vm::VmValue> = script_argv
        .iter()
        .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
        .collect();
    vm.set_global(
        "argv",
        harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
    );

    let extensions = package::load_runtime_extensions(Path::new(path));
    package::install_runtime_extensions(&extensions);
    if let Some(manifest) = extensions.root_manifest.as_ref() {
        if !manifest.mcp.is_empty() {
            connect_mcp_servers(&manifest.mcp, &mut vm).await;
        }
    }
    if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest triggers: {error}\n"
        ));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }
    if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
        stderr.push_str(&format!(
            "error: failed to install manifest hooks: {error}\n"
        ));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    // Execute inside a LocalSet; a runtime error is pre-rendered to a string.
    let local = tokio::task::LocalSet::new();
    let execution = local
        .run_until(async {
            match vm.execute(&chunk).await {
                Ok(value) => Ok((vm.output(), value)),
                Err(e) => Err(vm.format_runtime_error(&e)),
            }
        })
        .await;
    // Recordings are persisted whether or not execution succeeded.
    // NOTE(review): on a persist failure this returns before the script's
    // output and the buffered stderr are copied into the outcome — confirm
    // that dropping them is intended.
    if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
        stderr.push_str(&format!("error: {error}\n"));
        return RunOutcome {
            stdout,
            stderr,
            exit_code: 1,
        };
    }

    let buffered_stderr = harn_vm::take_stderr_buffer();
    stderr.push_str(&buffered_stderr);

    let exit_code = match &execution {
        Ok((_, return_value)) => exit_code_from_return_value(return_value),
        Err(_) => 1,
    };

    if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
        if let Err(error) = emit_run_attestation(
            log,
            path,
            store_base,
            attestation_started_at_ms,
            exit_code,
            options,
            &mut stderr,
        )
        .await
        {
            stderr.push_str(&format!(
                "error: failed to emit provenance receipt: {error}\n"
            ));
            return RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            };
        }
        // Reached only when the receipt was emitted successfully; the early
        // return above leaves the active event log installed.
        harn_vm::event_log::reset_active_event_log();
    }

    match execution {
        Ok((output, return_value)) => {
            stdout.push_str(output);
            if trace {
                stderr.push_str(&render_trace_summary());
            }
            if profile.is_enabled() {
                // Profile failures are reported as warnings, not run failures.
                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            // For a non-zero exit, append the rendered `Result::Err` payload
            // (empty for other return values).
            if exit_code != 0 {
                stderr.push_str(&render_return_value_error(&return_value));
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code,
            }
        }
        Err(rendered_error) => {
            // Only the rendered error is reported on this path; the VM's
            // output is not copied to stdout.
            stderr.push_str(&rendered_error);
            if profile.is_enabled() {
                if let Err(error) = render_and_persist_profile(&profile, &mut stderr) {
                    stderr.push_str(&format!("warning: failed to write profile: {error}\n"));
                }
            }
            RunOutcome {
                stdout,
                stderr,
                exit_code: 1,
            }
        }
    }
}
821
822fn render_and_persist_profile(
823 options: &RunProfileOptions,
824 stderr: &mut String,
825) -> Result<(), String> {
826 let spans = harn_vm::tracing::peek_spans();
827 let profile = harn_vm::profile::build(&spans);
828 if options.text {
829 stderr.push_str(&harn_vm::profile::render(&profile));
830 }
831 if let Some(path) = options.json_path.as_ref() {
832 if let Some(parent) = path.parent() {
833 if !parent.as_os_str().is_empty() {
834 fs::create_dir_all(parent)
835 .map_err(|error| format!("create {}: {error}", parent.display()))?;
836 }
837 }
838 let json = serde_json::to_string_pretty(&profile)
839 .map_err(|error| format!("serialize profile: {error}"))?;
840 fs::write(path, json).map_err(|error| format!("write {}: {error}", path.display()))?;
841 }
842 Ok(())
843}
844
845async fn append_run_provenance_event(
846 log: &Arc<harn_vm::event_log::AnyEventLog>,
847 kind: &str,
848 payload: serde_json::Value,
849) {
850 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
851 return;
852 };
853 let _ = log
854 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
855 .await;
856}
857
858async fn emit_run_attestation(
859 log: &Arc<harn_vm::event_log::AnyEventLog>,
860 path: &str,
861 store_base: &Path,
862 started_at_ms: i64,
863 exit_code: i32,
864 options: &RunAttestationOptions,
865 stderr: &mut String,
866) -> Result<(), String> {
867 let finished_at_ms = now_ms();
868 let status = if exit_code == 0 { "success" } else { "failure" };
869 append_run_provenance_event(
870 log,
871 "finished",
872 serde_json::json!({
873 "pipeline": path,
874 "status": status,
875 "exit_code": exit_code,
876 }),
877 )
878 .await;
879 log.flush()
880 .await
881 .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
882 let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
883 .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
884 let (signing_key, key_id) =
885 harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
886 .await
887 .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
888 let receipt = harn_vm::build_signed_receipt(
889 log,
890 harn_vm::ReceiptBuildOptions {
891 pipeline: path.to_string(),
892 status: status.to_string(),
893 started_at_ms,
894 finished_at_ms,
895 exit_code,
896 producer_name: "harn-cli".to_string(),
897 producer_version: env!("CARGO_PKG_VERSION").to_string(),
898 },
899 &signing_key,
900 key_id,
901 )
902 .await
903 .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
904 let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
905 if let Some(parent) = receipt_path.parent() {
906 fs::create_dir_all(parent)
907 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
908 }
909 let encoded = serde_json::to_vec_pretty(&receipt)
910 .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
911 fs::write(&receipt_path, encoded)
912 .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
913 stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
914 Ok(())
915}
916
917fn receipt_output_path(
918 store_base: &Path,
919 options: &RunAttestationOptions,
920 receipt_id: &str,
921) -> PathBuf {
922 if let Some(path) = options.receipt_out.as_ref() {
923 return path.clone();
924 }
925 harn_vm::runtime_paths::state_root(store_base)
926 .join("receipts")
927 .join(format!("{receipt_id}.json"))
928}
929
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The
/// `u128` millisecond count is converted with a checked conversion that
/// saturates at `i64::MAX` rather than silently wrapping.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
936
937fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
944 use harn_vm::VmValue;
945 match value {
946 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
947 VmValue::EnumVariant {
948 enum_name,
949 variant,
950 fields,
951 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => 1,
952 _ => 0,
953 }
954}
955
956fn render_return_value_error(value: &harn_vm::VmValue) -> String {
957 let harn_vm::VmValue::EnumVariant {
958 enum_name,
959 variant,
960 fields,
961 } = value
962 else {
963 return String::new();
964 };
965 if enum_name.as_ref() != "Result" || variant.as_ref() != "Err" {
966 return String::new();
967 }
968 let rendered = fields.first().map(|p| p.display()).unwrap_or_default();
969 if rendered.is_empty() {
970 "error\n".to_string()
971 } else if rendered.ends_with('\n') {
972 rendered
973 } else {
974 format!("{rendered}\n")
975 }
976}
977
978pub(crate) async fn connect_mcp_servers(
987 servers: &[package::McpServerConfig],
988 vm: &mut harn_vm::Vm,
989) {
990 use std::collections::BTreeMap;
991 use std::rc::Rc;
992 use std::time::Duration;
993
994 let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
995 let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
996
997 for server in servers {
998 let resolved_auth = match mcp::resolve_auth_for_server(server).await {
999 Ok(resolution) => resolution,
1000 Err(error) => {
1001 eprintln!(
1002 "warning: mcp: failed to load auth for '{}': {}",
1003 server.name, error
1004 );
1005 AuthResolution::None
1006 }
1007 };
1008 let spec = serde_json::json!({
1009 "name": server.name,
1010 "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
1011 "command": server.command,
1012 "args": server.args,
1013 "env": server.env,
1014 "url": server.url,
1015 "auth_token": match resolved_auth {
1016 AuthResolution::Bearer(token) => Some(token),
1017 AuthResolution::None => server.auth_token.clone(),
1018 },
1019 "protocol_version": server.protocol_version,
1020 "proxy_server_name": server.proxy_server_name,
1021 });
1022
1023 registrations.push(harn_vm::RegisteredMcpServer {
1026 name: server.name.clone(),
1027 spec: spec.clone(),
1028 lazy: server.lazy,
1029 card: server.card.clone(),
1030 keep_alive: server.keep_alive_ms.map(Duration::from_millis),
1031 });
1032
1033 if server.lazy {
1034 eprintln!(
1035 "[harn] mcp: deferred '{}' (lazy, boots on first use)",
1036 server.name
1037 );
1038 continue;
1039 }
1040
1041 match harn_vm::connect_mcp_server_from_json(&spec).await {
1042 Ok(handle) => {
1043 eprintln!("[harn] mcp: connected to '{}'", server.name);
1044 harn_vm::mcp_install_active(&server.name, handle.clone());
1045 mcp_dict.insert(server.name.clone(), harn_vm::VmValue::McpClient(handle));
1046 }
1047 Err(e) => {
1048 eprintln!(
1049 "warning: mcp: failed to connect to '{}': {}",
1050 server.name, e
1051 );
1052 }
1053 }
1054 }
1055
1056 harn_vm::mcp_register_servers(registrations);
1059
1060 if !mcp_dict.is_empty() {
1061 vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
1062 }
1063}
1064
1065fn render_trace_summary() -> String {
1066 use std::fmt::Write;
1067 let entries = harn_vm::llm::take_trace();
1068 if entries.is_empty() {
1069 return String::new();
1070 }
1071 let mut out = String::new();
1072 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
1073 let mut total_input = 0i64;
1074 let mut total_output = 0i64;
1075 let mut total_ms = 0u64;
1076 for (i, entry) in entries.iter().enumerate() {
1077 let _ = writeln!(
1078 out,
1079 " #{}: {} | {} in + {} out tokens | {} ms",
1080 i + 1,
1081 entry.model,
1082 entry.input_tokens,
1083 entry.output_tokens,
1084 entry.duration_ms,
1085 );
1086 total_input += entry.input_tokens;
1087 total_output += entry.output_tokens;
1088 total_ms += entry.duration_ms;
1089 }
1090 let total_tokens = total_input + total_output;
1091 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
1093 let _ = writeln!(
1094 out,
1095 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
1096 entries.len(),
1097 if entries.len() == 1 { "" } else { "s" },
1098 total_tokens,
1099 total_input,
1100 total_output,
1101 total_ms,
1102 cost,
1103 );
1104 out
1105}
1106
1107pub(crate) async fn run_file_mcp_serve(
1121 path: &str,
1122 card_source: Option<&str>,
1123 mode: RunFileMcpServeMode,
1124) {
1125 let (source, program) = crate::parse_source_file(path);
1126
1127 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
1128 for diag in &type_diagnostics {
1129 match diag.severity {
1130 DiagnosticSeverity::Error => {
1131 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1132 eprint!("{rendered}");
1133 process::exit(1);
1134 }
1135 DiagnosticSeverity::Warning => {
1136 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1137 eprint!("{rendered}");
1138 }
1139 }
1140 }
1141
1142 let chunk = match harn_vm::Compiler::new().compile(&program) {
1143 Ok(c) => c,
1144 Err(e) => {
1145 eprintln!("error: compile error: {e}");
1146 process::exit(1);
1147 }
1148 };
1149
1150 let mut vm = harn_vm::Vm::new();
1151 harn_vm::register_vm_stdlib(&mut vm);
1152 crate::install_default_hostlib(&mut vm);
1153 let source_parent = std::path::Path::new(path)
1154 .parent()
1155 .unwrap_or(std::path::Path::new("."));
1156 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1157 let store_base = project_root.as_deref().unwrap_or(source_parent);
1158 harn_vm::register_store_builtins(&mut vm, store_base);
1159 harn_vm::register_metadata_builtins(&mut vm, store_base);
1160 let pipeline_name = std::path::Path::new(path)
1161 .file_stem()
1162 .and_then(|s| s.to_str())
1163 .unwrap_or("default");
1164 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1165 vm.set_source_info(path, &source);
1166 if let Some(ref root) = project_root {
1167 vm.set_project_root(root);
1168 }
1169 if let Some(p) = std::path::Path::new(path).parent() {
1170 if !p.as_os_str().is_empty() {
1171 vm.set_source_dir(p);
1172 }
1173 }
1174
1175 let loaded = load_skills(&SkillLoaderInputs {
1177 cli_dirs: Vec::new(),
1178 source_path: Some(std::path::PathBuf::from(path)),
1179 });
1180 emit_loader_warnings(&loaded.loader_warnings);
1181 install_skills_global(&mut vm, &loaded);
1182
1183 let extensions = package::load_runtime_extensions(Path::new(path));
1184 package::install_runtime_extensions(&extensions);
1185 if let Some(manifest) = extensions.root_manifest.as_ref() {
1186 if !manifest.mcp.is_empty() {
1187 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1188 }
1189 }
1190 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1191 eprintln!("error: failed to install manifest triggers: {error}");
1192 process::exit(1);
1193 }
1194 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1195 eprintln!("error: failed to install manifest hooks: {error}");
1196 process::exit(1);
1197 }
1198
1199 let local = tokio::task::LocalSet::new();
1200 local
1201 .run_until(async {
1202 match vm.execute(&chunk).await {
1203 Ok(_) => {}
1204 Err(e) => {
1205 eprint!("{}", vm.format_runtime_error(&e));
1206 process::exit(1);
1207 }
1208 }
1209
1210 let output = vm.output();
1212 if !output.is_empty() {
1213 eprint!("{output}");
1214 }
1215
1216 let registry = match harn_vm::take_mcp_serve_registry() {
1217 Some(r) => r,
1218 None => {
1219 eprintln!("error: pipeline did not call mcp_serve(registry)");
1220 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
1221 process::exit(1);
1222 }
1223 };
1224
1225 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
1226 Ok(t) => t,
1227 Err(e) => {
1228 eprintln!("error: {e}");
1229 process::exit(1);
1230 }
1231 };
1232
1233 let resources = harn_vm::take_mcp_serve_resources();
1234 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
1235 let prompts = harn_vm::take_mcp_serve_prompts();
1236
1237 let server_name = std::path::Path::new(path)
1238 .file_stem()
1239 .and_then(|s| s.to_str())
1240 .unwrap_or("harn")
1241 .to_string();
1242
1243 let mut caps = Vec::new();
1244 if !tools.is_empty() {
1245 caps.push(format!(
1246 "{} tool{}",
1247 tools.len(),
1248 if tools.len() == 1 { "" } else { "s" }
1249 ));
1250 }
1251 let total_resources = resources.len() + resource_templates.len();
1252 if total_resources > 0 {
1253 caps.push(format!(
1254 "{total_resources} resource{}",
1255 if total_resources == 1 { "" } else { "s" }
1256 ));
1257 }
1258 if !prompts.is_empty() {
1259 caps.push(format!(
1260 "{} prompt{}",
1261 prompts.len(),
1262 if prompts.len() == 1 { "" } else { "s" }
1263 ));
1264 }
1265 eprintln!(
1266 "[harn] serve mcp: serving {} as '{server_name}'",
1267 caps.join(", ")
1268 );
1269
1270 let mut server =
1271 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
1272 if let Some(source) = card_source {
1273 match resolve_card_source(source) {
1274 Ok(card) => server = server.with_server_card(card),
1275 Err(e) => {
1276 eprintln!("error: --card: {e}");
1277 process::exit(1);
1278 }
1279 }
1280 }
1281 match mode {
1282 RunFileMcpServeMode::Stdio => {
1283 if let Err(e) = server.run(&mut vm).await {
1284 eprintln!("error: MCP server error: {e}");
1285 process::exit(1);
1286 }
1287 }
1288 RunFileMcpServeMode::Http {
1289 options,
1290 auth_policy,
1291 } => {
1292 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
1293 server,
1294 vm,
1295 options,
1296 auth_policy,
1297 )
1298 .await
1299 {
1300 eprintln!("error: MCP server error: {e}");
1301 process::exit(1);
1302 }
1303 }
1304 }
1305 })
1306 .await;
1307}
1308
1309pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
1314 let trimmed = source.trim_start();
1315 if trimmed.starts_with('{') || trimmed.starts_with('[') {
1316 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
1317 }
1318 let path = std::path::Path::new(source);
1319 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
1320}
1321
1322pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
1323 use notify::{Event, EventKind, RecursiveMode, Watcher};
1324
1325 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
1326 eprintln!("Error: {e}");
1327 process::exit(1);
1328 });
1329 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
1330
1331 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
1332 run_file(
1333 path,
1334 false,
1335 denied_builtins.clone(),
1336 Vec::new(),
1337 CliLlmMockMode::Off,
1338 None,
1339 RunProfileOptions::default(),
1340 )
1341 .await;
1342
1343 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1344 let _watcher = {
1345 let tx = tx.clone();
1346 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
1347 if let Ok(event) = res {
1348 if matches!(
1349 event.kind,
1350 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
1351 ) {
1352 let has_harn = event
1353 .paths
1354 .iter()
1355 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
1356 if has_harn {
1357 let _ = tx.blocking_send(());
1358 }
1359 }
1360 }
1361 })
1362 .unwrap_or_else(|e| {
1363 eprintln!("Error setting up file watcher: {e}");
1364 process::exit(1);
1365 });
1366 watcher
1367 .watch(watch_dir, RecursiveMode::Recursive)
1368 .unwrap_or_else(|e| {
1369 eprintln!("Error watching directory: {e}");
1370 process::exit(1);
1371 });
1372 watcher };
1374
1375 eprintln!(
1376 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
1377 watch_dir.display()
1378 );
1379
1380 loop {
1381 rx.recv().await;
1382 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1384 while rx.try_recv().is_ok() {}
1385
1386 eprintln!();
1387 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
1388 run_file(
1389 path,
1390 false,
1391 denied_builtins.clone(),
1392 Vec::new(),
1393 CliLlmMockMode::Off,
1394 None,
1395 RunProfileOptions::default(),
1396 )
1397 .await;
1398 }
1399}
1400
#[cfg(test)]
mod tests {
    use super::{execute_run, CliLlmMockMode, RunProfileOptions};
    use std::collections::HashSet;

    /// Writes `script` to a temporary file, runs it through `execute_run`
    /// with default options, and asserts that the run exits with code 0 and
    /// that its trimmed stdout equals `expected_stdout`.
    #[cfg(feature = "hostlib")]
    async fn assert_script_stdout(script: &str, expected_stdout: &str) {
        let script_file = tempfile::NamedTempFile::new().expect("temp file");
        std::fs::write(script_file.path(), script).expect("write script");

        let outcome = execute_run(
            &script_file.path().to_string_lossy(),
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
            RunProfileOptions::default(),
        )
        .await;

        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
        assert_eq!(outcome.stdout.trim(), expected_stdout);
    }

    /// A script run through `execute_run` can call `hostlib_enable`.
    #[cfg(feature = "hostlib")]
    #[tokio::test]
    async fn execute_run_installs_hostlib_gate() {
        assert_script_stdout(
            r#"
pipeline main() {
 let _ = hostlib_enable("tools:deterministic")
 println("enabled")
}
"#,
            "enabled",
        )
        .await;
    }

    /// Output beyond the inline capture limit stays readable afterwards via
    /// `hostlib_tools_read_command_output`.
    #[cfg(all(feature = "hostlib", unix))]
    #[tokio::test]
    async fn execute_run_can_read_hostlib_command_artifacts() {
        assert_script_stdout(
            r#"
pipeline main() {
 let _ = hostlib_enable("tools:deterministic")
 let result = hostlib_tools_run_command({
 argv: ["sh", "-c", "i=0; while [ $i -lt 2000 ]; do printf x; i=$((i+1)); done"],
 capture: {max_inline_bytes: 8},
 timeout_ms: 5000,
 })
 println(starts_with(result.command_id, "cmd_"))
 println(len(result.stdout))
 println(result.byte_count)
 let window = hostlib_tools_read_command_output({
 command_id: result.command_id,
 offset: 1990,
 length: 20,
 })
 println(len(window.content))
 println(window.eof)
}
"#,
            "true\n8\n2000\n10\ntrue",
        )
        .await;
    }
}