1use std::collections::HashSet;
2use std::fs;
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8
9use harn_parser::DiagnosticSeverity;
10use harn_vm::event_log::EventLog;
11
12use crate::commands::mcp::{self, AuthResolution};
13use crate::package;
14use crate::parse_source_file;
15use crate::skill_loader::{
16 canonicalize_cli_dirs, emit_loader_warnings, install_skills_global, load_skills,
17 SkillLoaderInputs,
18};
19
20pub(crate) enum RunFileMcpServeMode {
21 Stdio,
22 Http {
23 options: harn_serve::McpHttpServeOptions,
24 auth_policy: harn_serve::AuthPolicy,
25 },
26}
27
/// Builtins that are never placed on the deny list when an allow-list is
/// supplied: `build_denied_builtins` keeps these available regardless of what
/// the allow-list names.
const CORE_BUILTINS: &[&str] = &[
    // Output and logging.
    "println",
    "print",
    "log",
    // Type inspection and conversion.
    "type_of",
    "to_string",
    "to_int",
    "to_float",
    "len",
    // Assertions.
    "assert",
    "assert_eq",
    "assert_ne",
    // JSON helpers.
    "json_parse",
    "json_stringify",
    // Runtime context accessors.
    "runtime_context",
    "task_current",
    "runtime_context_values",
    "runtime_context_get",
    "runtime_context_set",
    "runtime_context_clear",
];
50
51pub(crate) fn build_denied_builtins(
56 deny_csv: Option<&str>,
57 allow_csv: Option<&str>,
58) -> HashSet<String> {
59 if let Some(csv) = deny_csv {
60 csv.split(',')
61 .map(|s| s.trim().to_string())
62 .filter(|s| !s.is_empty())
63 .collect()
64 } else if let Some(csv) = allow_csv {
65 let allowed: HashSet<String> = csv
68 .split(',')
69 .map(|s| s.trim().to_string())
70 .filter(|s| !s.is_empty())
71 .collect();
72 let core: HashSet<&str> = CORE_BUILTINS.iter().copied().collect();
73
74 let mut tmp = harn_vm::Vm::new();
76 harn_vm::register_vm_stdlib(&mut tmp);
77 harn_vm::register_store_builtins(&mut tmp, std::path::Path::new("."));
78 harn_vm::register_metadata_builtins(&mut tmp, std::path::Path::new("."));
79
80 tmp.builtin_names()
81 .into_iter()
82 .filter(|name| !allowed.contains(name) && !core.contains(name.as_str()))
83 .collect()
84 } else {
85 HashSet::new()
86 }
87}
88
89fn typecheck_with_imports(
94 program: &[harn_parser::SNode],
95 path: &Path,
96 source: &str,
97) -> Vec<harn_parser::TypeDiagnostic> {
98 if let Err(error) = package::ensure_dependencies_materialized(path) {
99 eprintln!("error: {error}");
100 process::exit(1);
101 }
102 let graph = harn_modules::build(&[path.to_path_buf()]);
103 let mut checker = harn_parser::TypeChecker::new();
104 if let Some(imported) = graph.imported_names_for_file(path) {
105 checker = checker.with_imported_names(imported);
106 }
107 if let Some(imported) = graph.imported_type_declarations_for_file(path) {
108 checker = checker.with_imported_type_decls(imported);
109 }
110 checker.check_with_source(program, source)
111}
112
/// How the CLI should mock LLM calls for a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum CliLlmMockMode {
    /// No mocking; this is the default.
    #[default]
    Off,
    /// Load mocks from the fixture file and install them before the run.
    Replay {
        /// Fixture file read when the mode is installed.
        fixture_path: PathBuf,
    },
    /// Record LLM calls during the run and persist them afterwards.
    Record {
        /// Fixture file the recordings are written to.
        fixture_path: PathBuf,
    },
}
124
/// Options controlling the provenance receipt emitted for a run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RunAttestationOptions {
    /// Explicit path for the receipt file; when `None` the receipt is written
    /// under the state root's `receipts` directory.
    pub receipt_out: Option<PathBuf>,
    /// Optional agent identifier passed along when loading the signing key.
    pub agent_id: Option<String>,
}
130
/// Captured result of a run: buffered output streams plus the exit code.
#[derive(Clone, Debug, Default)]
pub struct RunOutcome {
    /// Text destined for standard output.
    pub stdout: String,
    /// Text destined for standard error (diagnostics, errors, trace summary).
    pub stderr: String,
    /// Process exit code the run resolved to; `0` means success.
    pub exit_code: i32,
}
140
141fn load_cli_llm_mocks(path: &Path) -> Result<Vec<harn_vm::llm::LlmMock>, String> {
142 let content = fs::read_to_string(path)
143 .map_err(|error| format!("failed to read {}: {error}", path.display()))?;
144 let mut mocks = Vec::new();
145 for (idx, raw_line) in content.lines().enumerate() {
146 let line_no = idx + 1;
147 let line = raw_line.trim();
148 if line.is_empty() {
149 continue;
150 }
151 let value: serde_json::Value = serde_json::from_str(line).map_err(|error| {
152 format!(
153 "invalid JSON in {} line {}: {error}",
154 path.display(),
155 line_no
156 )
157 })?;
158 mocks.push(parse_cli_llm_mock_value(&value).map_err(|error| {
159 format!(
160 "invalid --llm-mock fixture in {} line {}: {error}",
161 path.display(),
162 line_no
163 )
164 })?);
165 }
166 Ok(mocks)
167}
168
169fn parse_cli_llm_mock_value(value: &serde_json::Value) -> Result<harn_vm::llm::LlmMock, String> {
170 let object = value
171 .as_object()
172 .ok_or_else(|| "fixture line must be a JSON object".to_string())?;
173
174 let match_pattern = optional_string_field(object, "match")?;
175 let consume_on_match = object
176 .get("consume_match")
177 .and_then(|value| value.as_bool())
178 .unwrap_or(false);
179 let text = optional_string_field(object, "text")?.unwrap_or_default();
180 let input_tokens = optional_i64_field(object, "input_tokens")?;
181 let output_tokens = optional_i64_field(object, "output_tokens")?;
182 let cache_read_tokens = optional_i64_field(object, "cache_read_tokens")?;
183 let cache_write_tokens = optional_i64_field(object, "cache_write_tokens")?
184 .or(optional_i64_field(object, "cache_creation_input_tokens")?);
185 let thinking = optional_string_field(object, "thinking")?;
186 let thinking_summary = optional_string_field(object, "thinking_summary")?;
187 let stop_reason = optional_string_field(object, "stop_reason")?;
188 let model = optional_string_field(object, "model")?.unwrap_or_else(|| "mock".to_string());
189 let provider = optional_string_field(object, "provider")?;
190 let blocks = optional_vec_field(object, "blocks")?;
191 let tool_calls = parse_cli_llm_tool_calls(object.get("tool_calls"))?;
192 let error = parse_cli_llm_mock_error(object.get("error"))?;
193
194 Ok(harn_vm::llm::LlmMock {
195 text,
196 tool_calls,
197 match_pattern,
198 consume_on_match,
199 input_tokens,
200 output_tokens,
201 cache_read_tokens,
202 cache_write_tokens,
203 thinking,
204 thinking_summary,
205 stop_reason,
206 model,
207 provider,
208 blocks,
209 error,
210 })
211}
212
213fn parse_cli_llm_tool_calls(
214 value: Option<&serde_json::Value>,
215) -> Result<Vec<serde_json::Value>, String> {
216 let Some(value) = value else {
217 return Ok(Vec::new());
218 };
219 let items = value
220 .as_array()
221 .ok_or_else(|| "tool_calls must be an array".to_string())?;
222 items
223 .iter()
224 .enumerate()
225 .map(|(idx, item)| {
226 normalize_cli_llm_tool_call(item).map_err(|error| format!("tool_calls[{idx}] {error}"))
227 })
228 .collect()
229}
230
231fn normalize_cli_llm_tool_call(value: &serde_json::Value) -> Result<serde_json::Value, String> {
232 let object = value
233 .as_object()
234 .ok_or_else(|| "must be a JSON object".to_string())?;
235 let name = object
236 .get("name")
237 .and_then(|value| value.as_str())
238 .ok_or_else(|| "is missing string field `name`".to_string())?;
239 let arguments = object
240 .get("arguments")
241 .cloned()
242 .or_else(|| object.get("args").cloned())
243 .unwrap_or_else(|| serde_json::json!({}));
244 Ok(serde_json::json!({
245 "name": name,
246 "arguments": arguments,
247 }))
248}
249
250fn parse_cli_llm_mock_error(
251 value: Option<&serde_json::Value>,
252) -> Result<Option<harn_vm::llm::MockError>, String> {
253 let Some(value) = value else {
254 return Ok(None);
255 };
256 if value.is_null() {
257 return Ok(None);
258 }
259 let object = value.as_object().ok_or_else(|| {
260 "error must be an object {category, message, retry_after_ms?}".to_string()
261 })?;
262 let category_str = object
263 .get("category")
264 .and_then(|value| value.as_str())
265 .ok_or_else(|| "error.category is required".to_string())?;
266 let category = harn_vm::ErrorCategory::parse(category_str);
267 if category.as_str() != category_str {
268 return Err(format!("unknown error category `{category_str}`"));
269 }
270 let message = object
271 .get("message")
272 .and_then(|value| value.as_str())
273 .unwrap_or_default()
274 .to_string();
275 let retry_after_ms = match object.get("retry_after_ms") {
276 None | Some(serde_json::Value::Null) => None,
277 Some(serde_json::Value::Number(n)) => match n.as_u64() {
278 Some(v) => Some(v),
279 None => return Err("error.retry_after_ms must be a non-negative integer".to_string()),
280 },
281 Some(_) => return Err("error.retry_after_ms must be a non-negative integer".to_string()),
282 };
283 Ok(Some(harn_vm::llm::MockError {
284 category,
285 message,
286 retry_after_ms,
287 }))
288}
289
290fn optional_string_field(
291 object: &serde_json::Map<String, serde_json::Value>,
292 key: &str,
293) -> Result<Option<String>, String> {
294 match object.get(key) {
295 None | Some(serde_json::Value::Null) => Ok(None),
296 Some(serde_json::Value::String(value)) => Ok(Some(value.clone())),
297 Some(_) => Err(format!("`{key}` must be a string")),
298 }
299}
300
301fn optional_i64_field(
302 object: &serde_json::Map<String, serde_json::Value>,
303 key: &str,
304) -> Result<Option<i64>, String> {
305 match object.get(key) {
306 None | Some(serde_json::Value::Null) => Ok(None),
307 Some(value) => value
308 .as_i64()
309 .map(Some)
310 .ok_or_else(|| format!("`{key}` must be an integer")),
311 }
312}
313
314fn optional_vec_field(
315 object: &serde_json::Map<String, serde_json::Value>,
316 key: &str,
317) -> Result<Option<Vec<serde_json::Value>>, String> {
318 match object.get(key) {
319 None | Some(serde_json::Value::Null) => Ok(None),
320 Some(serde_json::Value::Array(items)) => Ok(Some(items.clone())),
321 Some(_) => Err(format!("`{key}` must be an array")),
322 }
323}
324
325pub fn install_cli_llm_mock_mode(mode: &CliLlmMockMode) -> Result<(), String> {
326 harn_vm::llm::clear_cli_llm_mock_mode();
327 match mode {
328 CliLlmMockMode::Off => Ok(()),
329 CliLlmMockMode::Replay { fixture_path } => {
330 let mocks = load_cli_llm_mocks(fixture_path)?;
331 harn_vm::llm::install_cli_llm_mocks(mocks);
332 Ok(())
333 }
334 CliLlmMockMode::Record { .. } => {
335 harn_vm::llm::enable_cli_llm_mock_recording();
336 Ok(())
337 }
338 }
339}
340
341pub fn persist_cli_llm_mock_recording(mode: &CliLlmMockMode) -> Result<(), String> {
342 let CliLlmMockMode::Record { fixture_path } = mode else {
343 return Ok(());
344 };
345 if let Some(parent) = fixture_path.parent() {
346 if !parent.as_os_str().is_empty() {
347 fs::create_dir_all(parent).map_err(|error| {
348 format!(
349 "failed to create fixture directory {}: {error}",
350 parent.display()
351 )
352 })?;
353 }
354 }
355
356 let lines = harn_vm::llm::take_cli_llm_recordings()
357 .into_iter()
358 .map(serialize_cli_llm_mock)
359 .collect::<Result<Vec<_>, _>>()?;
360 let body = if lines.is_empty() {
361 String::new()
362 } else {
363 format!("{}\n", lines.join("\n"))
364 };
365 fs::write(fixture_path, body)
366 .map_err(|error| format!("failed to write {}: {error}", fixture_path.display()))
367}
368
369fn serialize_cli_llm_mock(mock: harn_vm::llm::LlmMock) -> Result<String, String> {
370 let mut object = serde_json::Map::new();
371 if let Some(match_pattern) = mock.match_pattern {
372 object.insert(
373 "match".to_string(),
374 serde_json::Value::String(match_pattern),
375 );
376 }
377 if !mock.text.is_empty() {
378 object.insert("text".to_string(), serde_json::Value::String(mock.text));
379 }
380 if !mock.tool_calls.is_empty() {
381 let tool_calls = mock
382 .tool_calls
383 .into_iter()
384 .map(|tool_call| {
385 let object = tool_call
386 .as_object()
387 .ok_or_else(|| "recorded tool call must be an object".to_string())?;
388 let name = object
389 .get("name")
390 .and_then(|value| value.as_str())
391 .ok_or_else(|| "recorded tool call is missing `name`".to_string())?;
392 Ok(serde_json::json!({
393 "name": name,
394 "args": object
395 .get("arguments")
396 .cloned()
397 .unwrap_or_else(|| serde_json::json!({})),
398 }))
399 })
400 .collect::<Result<Vec<_>, String>>()?;
401 object.insert(
402 "tool_calls".to_string(),
403 serde_json::Value::Array(tool_calls),
404 );
405 }
406 if let Some(input_tokens) = mock.input_tokens {
407 object.insert(
408 "input_tokens".to_string(),
409 serde_json::Value::Number(input_tokens.into()),
410 );
411 }
412 if let Some(output_tokens) = mock.output_tokens {
413 object.insert(
414 "output_tokens".to_string(),
415 serde_json::Value::Number(output_tokens.into()),
416 );
417 }
418 if let Some(cache_read_tokens) = mock.cache_read_tokens {
419 object.insert(
420 "cache_read_tokens".to_string(),
421 serde_json::Value::Number(cache_read_tokens.into()),
422 );
423 }
424 if let Some(cache_write_tokens) = mock.cache_write_tokens {
425 object.insert(
426 "cache_write_tokens".to_string(),
427 serde_json::Value::Number(cache_write_tokens.into()),
428 );
429 object.insert(
430 "cache_creation_input_tokens".to_string(),
431 serde_json::Value::Number(cache_write_tokens.into()),
432 );
433 }
434 if let Some(thinking) = mock.thinking {
435 object.insert("thinking".to_string(), serde_json::Value::String(thinking));
436 }
437 if let Some(stop_reason) = mock.stop_reason {
438 object.insert(
439 "stop_reason".to_string(),
440 serde_json::Value::String(stop_reason),
441 );
442 }
443 object.insert("model".to_string(), serde_json::Value::String(mock.model));
444 if let Some(provider) = mock.provider {
445 object.insert("provider".to_string(), serde_json::Value::String(provider));
446 }
447 if let Some(blocks) = mock.blocks {
448 object.insert("blocks".to_string(), serde_json::Value::Array(blocks));
449 }
450 if let Some(error) = mock.error {
451 object.insert(
452 "error".to_string(),
453 serde_json::json!({
454 "category": error.category.as_str(),
455 "message": error.message,
456 }),
457 );
458 }
459 serde_json::to_string(&serde_json::Value::Object(object))
460 .map_err(|error| format!("failed to serialize recorded fixture: {error}"))
461}
462
463pub(crate) async fn run_file(
464 path: &str,
465 trace: bool,
466 denied_builtins: HashSet<String>,
467 script_argv: Vec<String>,
468 llm_mock_mode: CliLlmMockMode,
469 attestation: Option<RunAttestationOptions>,
470) {
471 run_file_with_skill_dirs(
472 path,
473 trace,
474 denied_builtins,
475 script_argv,
476 Vec::new(),
477 llm_mock_mode,
478 attestation,
479 )
480 .await;
481}
482
483pub(crate) async fn run_file_with_skill_dirs(
484 path: &str,
485 trace: bool,
486 denied_builtins: HashSet<String>,
487 script_argv: Vec<String>,
488 skill_dirs_raw: Vec<String>,
489 llm_mock_mode: CliLlmMockMode,
490 attestation: Option<RunAttestationOptions>,
491) {
492 let cancelled = install_signal_shutdown_handler();
494
495 let outcome = execute_run(
496 path,
497 trace,
498 denied_builtins,
499 script_argv,
500 skill_dirs_raw,
501 llm_mock_mode,
502 attestation,
503 )
504 .await;
505
506 if !outcome.stderr.is_empty() {
509 io::stderr().write_all(outcome.stderr.as_bytes()).ok();
510 }
511 if !outcome.stdout.is_empty() {
512 io::stdout().write_all(outcome.stdout.as_bytes()).ok();
513 }
514
515 let mut exit_code = outcome.exit_code;
516 if exit_code != 0 && cancelled.load(Ordering::SeqCst) {
517 exit_code = 124;
518 }
519 if exit_code != 0 {
520 process::exit(exit_code);
521 }
522}
523
524fn install_signal_shutdown_handler() -> Arc<AtomicBool> {
525 let cancelled = Arc::new(AtomicBool::new(false));
526 let cancelled_clone = cancelled.clone();
527 tokio::spawn(async move {
528 #[cfg(unix)]
529 {
530 use tokio::signal::unix::{signal, SignalKind};
531 let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
532 let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
533 tokio::select! {
534 _ = sigterm.recv() => {},
535 _ = sigint.recv() => {},
536 }
537 cancelled_clone.store(true, Ordering::SeqCst);
538 eprintln!("[harn] signal received, flushing state...");
539 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
540 process::exit(124);
541 }
542 #[cfg(not(unix))]
543 {
544 let _ = tokio::signal::ctrl_c().await;
545 cancelled_clone.store(true, Ordering::SeqCst);
546 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
547 process::exit(124);
548 }
549 });
550 cancelled
551}
552
553pub async fn execute_run(
559 path: &str,
560 trace: bool,
561 denied_builtins: HashSet<String>,
562 script_argv: Vec<String>,
563 skill_dirs_raw: Vec<String>,
564 llm_mock_mode: CliLlmMockMode,
565 attestation: Option<RunAttestationOptions>,
566) -> RunOutcome {
567 let mut stderr = String::new();
568 let mut stdout = String::new();
569
570 let (source, program) = parse_source_file(path);
571
572 let mut had_type_error = false;
573 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
574 for diag in &type_diagnostics {
575 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
576 if matches!(diag.severity, DiagnosticSeverity::Error) {
577 had_type_error = true;
578 }
579 stderr.push_str(&rendered);
580 }
581 if had_type_error {
582 return RunOutcome {
583 stdout,
584 stderr,
585 exit_code: 1,
586 };
587 }
588
589 let chunk = match harn_vm::Compiler::new().compile(&program) {
590 Ok(c) => c,
591 Err(e) => {
592 stderr.push_str(&format!("error: compile error: {e}\n"));
593 return RunOutcome {
594 stdout,
595 stderr,
596 exit_code: 1,
597 };
598 }
599 };
600
601 if trace {
602 harn_vm::llm::enable_tracing();
603 }
604 if let Err(error) = install_cli_llm_mock_mode(&llm_mock_mode) {
605 stderr.push_str(&format!("error: {error}\n"));
606 return RunOutcome {
607 stdout,
608 stderr,
609 exit_code: 1,
610 };
611 }
612
613 let mut vm = harn_vm::Vm::new();
614 harn_vm::register_vm_stdlib(&mut vm);
615 crate::install_default_hostlib(&mut vm);
616 let source_parent = std::path::Path::new(path)
617 .parent()
618 .unwrap_or(std::path::Path::new("."));
619 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
621 let store_base = project_root.as_deref().unwrap_or(source_parent);
622 let attestation_started_at_ms = now_ms();
623 let attestation_log = if attestation.is_some() {
624 Some(harn_vm::event_log::install_memory_for_current_thread(256))
625 } else {
626 None
627 };
628 if let Some(log) = attestation_log.as_ref() {
629 append_run_provenance_event(
630 log,
631 "started",
632 serde_json::json!({
633 "pipeline": path,
634 "argv": &script_argv,
635 "project_root": store_base.display().to_string(),
636 }),
637 )
638 .await;
639 }
640 harn_vm::register_store_builtins(&mut vm, store_base);
641 harn_vm::register_metadata_builtins(&mut vm, store_base);
642 let pipeline_name = std::path::Path::new(path)
643 .file_stem()
644 .and_then(|s| s.to_str())
645 .unwrap_or("default");
646 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
647 vm.set_source_info(path, &source);
648 if !denied_builtins.is_empty() {
649 vm.set_denied_builtins(denied_builtins);
650 }
651 if let Some(ref root) = project_root {
652 vm.set_project_root(root);
653 }
654
655 if let Some(p) = std::path::Path::new(path).parent() {
656 if !p.as_os_str().is_empty() {
657 vm.set_source_dir(p);
658 }
659 }
660
661 let cli_dirs = canonicalize_cli_dirs(&skill_dirs_raw, None);
664 let loaded = load_skills(&SkillLoaderInputs {
665 cli_dirs,
666 source_path: Some(std::path::PathBuf::from(path)),
667 });
668 emit_loader_warnings(&loaded.loader_warnings);
669 install_skills_global(&mut vm, &loaded);
670
671 let argv_values: Vec<harn_vm::VmValue> = script_argv
674 .iter()
675 .map(|s| harn_vm::VmValue::String(std::rc::Rc::from(s.as_str())))
676 .collect();
677 vm.set_global(
678 "argv",
679 harn_vm::VmValue::List(std::rc::Rc::new(argv_values)),
680 );
681
682 let extensions = package::load_runtime_extensions(Path::new(path));
683 package::install_runtime_extensions(&extensions);
684 if let Some(manifest) = extensions.root_manifest.as_ref() {
685 if !manifest.mcp.is_empty() {
686 connect_mcp_servers(&manifest.mcp, &mut vm).await;
687 }
688 }
689 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
690 stderr.push_str(&format!(
691 "error: failed to install manifest triggers: {error}\n"
692 ));
693 return RunOutcome {
694 stdout,
695 stderr,
696 exit_code: 1,
697 };
698 }
699 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
700 stderr.push_str(&format!(
701 "error: failed to install manifest hooks: {error}\n"
702 ));
703 return RunOutcome {
704 stdout,
705 stderr,
706 exit_code: 1,
707 };
708 }
709
710 let local = tokio::task::LocalSet::new();
712 let execution = local
713 .run_until(async {
714 match vm.execute(&chunk).await {
715 Ok(value) => Ok((vm.output(), value)),
716 Err(e) => Err(vm.format_runtime_error(&e)),
717 }
718 })
719 .await;
720 if let Err(error) = persist_cli_llm_mock_recording(&llm_mock_mode) {
721 stderr.push_str(&format!("error: {error}\n"));
722 return RunOutcome {
723 stdout,
724 stderr,
725 exit_code: 1,
726 };
727 }
728
729 let buffered_stderr = harn_vm::take_stderr_buffer();
731 stderr.push_str(&buffered_stderr);
732
733 let exit_code = match &execution {
734 Ok((_, return_value)) => exit_code_from_return_value(return_value),
735 Err(_) => 1,
736 };
737
738 if let (Some(options), Some(log)) = (attestation.as_ref(), attestation_log.as_ref()) {
739 if let Err(error) = emit_run_attestation(
740 log,
741 path,
742 store_base,
743 attestation_started_at_ms,
744 exit_code,
745 options,
746 &mut stderr,
747 )
748 .await
749 {
750 stderr.push_str(&format!(
751 "error: failed to emit provenance receipt: {error}\n"
752 ));
753 return RunOutcome {
754 stdout,
755 stderr,
756 exit_code: 1,
757 };
758 }
759 harn_vm::event_log::reset_active_event_log();
760 }
761
762 match execution {
763 Ok((output, return_value)) => {
764 stdout.push_str(output);
765 if trace {
766 stderr.push_str(&render_trace_summary());
767 }
768 if exit_code != 0 {
769 stderr.push_str(&render_return_value_error(&return_value));
770 }
771 RunOutcome {
772 stdout,
773 stderr,
774 exit_code,
775 }
776 }
777 Err(rendered_error) => {
778 stderr.push_str(&rendered_error);
779 RunOutcome {
780 stdout,
781 stderr,
782 exit_code: 1,
783 }
784 }
785 }
786}
787
788async fn append_run_provenance_event(
789 log: &Arc<harn_vm::event_log::AnyEventLog>,
790 kind: &str,
791 payload: serde_json::Value,
792) {
793 let Ok(topic) = harn_vm::event_log::Topic::new("run.provenance") else {
794 return;
795 };
796 let _ = log
797 .append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
798 .await;
799}
800
801async fn emit_run_attestation(
802 log: &Arc<harn_vm::event_log::AnyEventLog>,
803 path: &str,
804 store_base: &Path,
805 started_at_ms: i64,
806 exit_code: i32,
807 options: &RunAttestationOptions,
808 stderr: &mut String,
809) -> Result<(), String> {
810 let finished_at_ms = now_ms();
811 let status = if exit_code == 0 { "success" } else { "failure" };
812 append_run_provenance_event(
813 log,
814 "finished",
815 serde_json::json!({
816 "pipeline": path,
817 "status": status,
818 "exit_code": exit_code,
819 }),
820 )
821 .await;
822 log.flush()
823 .await
824 .map_err(|error| format!("failed to flush attestation event log: {error}"))?;
825 let secret_provider = harn_vm::secrets::configured_default_chain("harn.provenance")
826 .map_err(|error| format!("failed to configure provenance secrets: {error}"))?;
827 let (signing_key, key_id) =
828 harn_vm::load_or_generate_agent_signing_key(&secret_provider, options.agent_id.as_deref())
829 .await
830 .map_err(|error| format!("failed to load provenance signing key: {error}"))?;
831 let receipt = harn_vm::build_signed_receipt(
832 log,
833 harn_vm::ReceiptBuildOptions {
834 pipeline: path.to_string(),
835 status: status.to_string(),
836 started_at_ms,
837 finished_at_ms,
838 exit_code,
839 producer_name: "harn-cli".to_string(),
840 producer_version: env!("CARGO_PKG_VERSION").to_string(),
841 },
842 &signing_key,
843 key_id,
844 )
845 .await
846 .map_err(|error| format!("failed to build provenance receipt: {error}"))?;
847 let receipt_path = receipt_output_path(store_base, options, &receipt.receipt_id);
848 if let Some(parent) = receipt_path.parent() {
849 fs::create_dir_all(parent)
850 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
851 }
852 let encoded = serde_json::to_vec_pretty(&receipt)
853 .map_err(|error| format!("failed to encode provenance receipt: {error}"))?;
854 fs::write(&receipt_path, encoded)
855 .map_err(|error| format!("failed to write {}: {error}", receipt_path.display()))?;
856 stderr.push_str(&format!("provenance receipt: {}\n", receipt_path.display()));
857 Ok(())
858}
859
860fn receipt_output_path(
861 store_base: &Path,
862 options: &RunAttestationOptions,
863 receipt_id: &str,
864) -> PathBuf {
865 if let Some(path) = options.receipt_out.as_ref() {
866 return path.clone();
867 }
868 harn_vm::runtime_paths::state_root(store_base)
869 .join("receipts")
870 .join(format!("{receipt_id}.json"))
871}
872
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
879
880fn exit_code_from_return_value(value: &harn_vm::VmValue) -> i32 {
887 use harn_vm::VmValue;
888 match value {
889 VmValue::Int(n) => (*n).clamp(0, 255) as i32,
890 VmValue::EnumVariant {
891 enum_name,
892 variant,
893 fields,
894 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => 1,
895 _ => 0,
896 }
897}
898
899fn render_return_value_error(value: &harn_vm::VmValue) -> String {
900 let harn_vm::VmValue::EnumVariant {
901 enum_name,
902 variant,
903 fields,
904 } = value
905 else {
906 return String::new();
907 };
908 if enum_name.as_ref() != "Result" || variant.as_ref() != "Err" {
909 return String::new();
910 }
911 let rendered = fields.first().map(|p| p.display()).unwrap_or_default();
912 if rendered.is_empty() {
913 "error\n".to_string()
914 } else if rendered.ends_with('\n') {
915 rendered
916 } else {
917 format!("{rendered}\n")
918 }
919}
920
921pub(crate) async fn connect_mcp_servers(
930 servers: &[package::McpServerConfig],
931 vm: &mut harn_vm::Vm,
932) {
933 use std::collections::BTreeMap;
934 use std::rc::Rc;
935 use std::time::Duration;
936
937 let mut mcp_dict: BTreeMap<String, harn_vm::VmValue> = BTreeMap::new();
938 let mut registrations: Vec<harn_vm::RegisteredMcpServer> = Vec::new();
939
940 for server in servers {
941 let resolved_auth = match mcp::resolve_auth_for_server(server).await {
942 Ok(resolution) => resolution,
943 Err(error) => {
944 eprintln!(
945 "warning: mcp: failed to load auth for '{}': {}",
946 server.name, error
947 );
948 AuthResolution::None
949 }
950 };
951 let spec = serde_json::json!({
952 "name": server.name,
953 "transport": server.transport.clone().unwrap_or_else(|| "stdio".to_string()),
954 "command": server.command,
955 "args": server.args,
956 "env": server.env,
957 "url": server.url,
958 "auth_token": match resolved_auth {
959 AuthResolution::Bearer(token) => Some(token),
960 AuthResolution::None => server.auth_token.clone(),
961 },
962 "protocol_version": server.protocol_version,
963 "proxy_server_name": server.proxy_server_name,
964 });
965
966 registrations.push(harn_vm::RegisteredMcpServer {
969 name: server.name.clone(),
970 spec: spec.clone(),
971 lazy: server.lazy,
972 card: server.card.clone(),
973 keep_alive: server.keep_alive_ms.map(Duration::from_millis),
974 });
975
976 if server.lazy {
977 eprintln!(
978 "[harn] mcp: deferred '{}' (lazy, boots on first use)",
979 server.name
980 );
981 continue;
982 }
983
984 match harn_vm::connect_mcp_server_from_json(&spec).await {
985 Ok(handle) => {
986 eprintln!("[harn] mcp: connected to '{}'", server.name);
987 harn_vm::mcp_install_active(&server.name, handle.clone());
988 mcp_dict.insert(server.name.clone(), harn_vm::VmValue::McpClient(handle));
989 }
990 Err(e) => {
991 eprintln!(
992 "warning: mcp: failed to connect to '{}': {}",
993 server.name, e
994 );
995 }
996 }
997 }
998
999 harn_vm::mcp_register_servers(registrations);
1002
1003 if !mcp_dict.is_empty() {
1004 vm.set_global("mcp", harn_vm::VmValue::Dict(Rc::new(mcp_dict)));
1005 }
1006}
1007
1008fn render_trace_summary() -> String {
1009 use std::fmt::Write;
1010 let entries = harn_vm::llm::take_trace();
1011 if entries.is_empty() {
1012 return String::new();
1013 }
1014 let mut out = String::new();
1015 let _ = writeln!(out, "\n\x1b[2m─── LLM trace ───\x1b[0m");
1016 let mut total_input = 0i64;
1017 let mut total_output = 0i64;
1018 let mut total_ms = 0u64;
1019 for (i, entry) in entries.iter().enumerate() {
1020 let _ = writeln!(
1021 out,
1022 " #{}: {} | {} in + {} out tokens | {} ms",
1023 i + 1,
1024 entry.model,
1025 entry.input_tokens,
1026 entry.output_tokens,
1027 entry.duration_ms,
1028 );
1029 total_input += entry.input_tokens;
1030 total_output += entry.output_tokens;
1031 total_ms += entry.duration_ms;
1032 }
1033 let total_tokens = total_input + total_output;
1034 let cost = (total_input as f64 * 3.0 + total_output as f64 * 15.0) / 1_000_000.0;
1036 let _ = writeln!(
1037 out,
1038 " \x1b[1m{} call{}, {} tokens ({}in + {}out), {} ms, ~${:.4}\x1b[0m",
1039 entries.len(),
1040 if entries.len() == 1 { "" } else { "s" },
1041 total_tokens,
1042 total_input,
1043 total_output,
1044 total_ms,
1045 cost,
1046 );
1047 out
1048}
1049
1050pub(crate) async fn run_file_mcp_serve(
1064 path: &str,
1065 card_source: Option<&str>,
1066 mode: RunFileMcpServeMode,
1067) {
1068 let (source, program) = crate::parse_source_file(path);
1069
1070 let type_diagnostics = typecheck_with_imports(&program, Path::new(path), &source);
1071 for diag in &type_diagnostics {
1072 match diag.severity {
1073 DiagnosticSeverity::Error => {
1074 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1075 eprint!("{rendered}");
1076 process::exit(1);
1077 }
1078 DiagnosticSeverity::Warning => {
1079 let rendered = harn_parser::diagnostic::render_type_diagnostic(&source, path, diag);
1080 eprint!("{rendered}");
1081 }
1082 }
1083 }
1084
1085 let chunk = match harn_vm::Compiler::new().compile(&program) {
1086 Ok(c) => c,
1087 Err(e) => {
1088 eprintln!("error: compile error: {e}");
1089 process::exit(1);
1090 }
1091 };
1092
1093 let mut vm = harn_vm::Vm::new();
1094 harn_vm::register_vm_stdlib(&mut vm);
1095 crate::install_default_hostlib(&mut vm);
1096 let source_parent = std::path::Path::new(path)
1097 .parent()
1098 .unwrap_or(std::path::Path::new("."));
1099 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
1100 let store_base = project_root.as_deref().unwrap_or(source_parent);
1101 harn_vm::register_store_builtins(&mut vm, store_base);
1102 harn_vm::register_metadata_builtins(&mut vm, store_base);
1103 let pipeline_name = std::path::Path::new(path)
1104 .file_stem()
1105 .and_then(|s| s.to_str())
1106 .unwrap_or("default");
1107 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
1108 vm.set_source_info(path, &source);
1109 if let Some(ref root) = project_root {
1110 vm.set_project_root(root);
1111 }
1112 if let Some(p) = std::path::Path::new(path).parent() {
1113 if !p.as_os_str().is_empty() {
1114 vm.set_source_dir(p);
1115 }
1116 }
1117
1118 let loaded = load_skills(&SkillLoaderInputs {
1120 cli_dirs: Vec::new(),
1121 source_path: Some(std::path::PathBuf::from(path)),
1122 });
1123 emit_loader_warnings(&loaded.loader_warnings);
1124 install_skills_global(&mut vm, &loaded);
1125
1126 let extensions = package::load_runtime_extensions(Path::new(path));
1127 package::install_runtime_extensions(&extensions);
1128 if let Some(manifest) = extensions.root_manifest.as_ref() {
1129 if !manifest.mcp.is_empty() {
1130 connect_mcp_servers(&manifest.mcp, &mut vm).await;
1131 }
1132 }
1133 if let Err(error) = package::install_manifest_triggers(&mut vm, &extensions).await {
1134 eprintln!("error: failed to install manifest triggers: {error}");
1135 process::exit(1);
1136 }
1137 if let Err(error) = package::install_manifest_hooks(&mut vm, &extensions).await {
1138 eprintln!("error: failed to install manifest hooks: {error}");
1139 process::exit(1);
1140 }
1141
1142 let local = tokio::task::LocalSet::new();
1143 local
1144 .run_until(async {
1145 match vm.execute(&chunk).await {
1146 Ok(_) => {}
1147 Err(e) => {
1148 eprint!("{}", vm.format_runtime_error(&e));
1149 process::exit(1);
1150 }
1151 }
1152
1153 let output = vm.output();
1155 if !output.is_empty() {
1156 eprint!("{output}");
1157 }
1158
1159 let registry = match harn_vm::take_mcp_serve_registry() {
1160 Some(r) => r,
1161 None => {
1162 eprintln!("error: pipeline did not call mcp_serve(registry)");
1163 eprintln!("hint: call mcp_serve(tools) at the end of your pipeline");
1164 process::exit(1);
1165 }
1166 };
1167
1168 let tools = match harn_vm::tool_registry_to_mcp_tools(®istry) {
1169 Ok(t) => t,
1170 Err(e) => {
1171 eprintln!("error: {e}");
1172 process::exit(1);
1173 }
1174 };
1175
1176 let resources = harn_vm::take_mcp_serve_resources();
1177 let resource_templates = harn_vm::take_mcp_serve_resource_templates();
1178 let prompts = harn_vm::take_mcp_serve_prompts();
1179
1180 let server_name = std::path::Path::new(path)
1181 .file_stem()
1182 .and_then(|s| s.to_str())
1183 .unwrap_or("harn")
1184 .to_string();
1185
1186 let mut caps = Vec::new();
1187 if !tools.is_empty() {
1188 caps.push(format!(
1189 "{} tool{}",
1190 tools.len(),
1191 if tools.len() == 1 { "" } else { "s" }
1192 ));
1193 }
1194 let total_resources = resources.len() + resource_templates.len();
1195 if total_resources > 0 {
1196 caps.push(format!(
1197 "{total_resources} resource{}",
1198 if total_resources == 1 { "" } else { "s" }
1199 ));
1200 }
1201 if !prompts.is_empty() {
1202 caps.push(format!(
1203 "{} prompt{}",
1204 prompts.len(),
1205 if prompts.len() == 1 { "" } else { "s" }
1206 ));
1207 }
1208 eprintln!(
1209 "[harn] serve mcp: serving {} as '{server_name}'",
1210 caps.join(", ")
1211 );
1212
1213 let mut server =
1214 harn_vm::McpServer::new(server_name, tools, resources, resource_templates, prompts);
1215 if let Some(source) = card_source {
1216 match resolve_card_source(source) {
1217 Ok(card) => server = server.with_server_card(card),
1218 Err(e) => {
1219 eprintln!("error: --card: {e}");
1220 process::exit(1);
1221 }
1222 }
1223 }
1224 match mode {
1225 RunFileMcpServeMode::Stdio => {
1226 if let Err(e) = server.run(&mut vm).await {
1227 eprintln!("error: MCP server error: {e}");
1228 process::exit(1);
1229 }
1230 }
1231 RunFileMcpServeMode::Http {
1232 options,
1233 auth_policy,
1234 } => {
1235 if let Err(e) = crate::commands::serve::run_script_mcp_http_server(
1236 server,
1237 vm,
1238 options,
1239 auth_policy,
1240 )
1241 .await
1242 {
1243 eprintln!("error: MCP server error: {e}");
1244 process::exit(1);
1245 }
1246 }
1247 }
1248 })
1249 .await;
1250}
1251
1252pub(crate) fn resolve_card_source(source: &str) -> Result<serde_json::Value, String> {
1257 let trimmed = source.trim_start();
1258 if trimmed.starts_with('{') || trimmed.starts_with('[') {
1259 return serde_json::from_str(source).map_err(|e| format!("inline JSON parse error: {e}"));
1260 }
1261 let path = std::path::Path::new(source);
1262 harn_vm::load_server_card_from_path(path).map_err(|e| format!("{e}"))
1263}
1264
1265pub(crate) async fn run_watch(path: &str, denied_builtins: HashSet<String>) {
1266 use notify::{Event, EventKind, RecursiveMode, Watcher};
1267
1268 let abs_path = std::fs::canonicalize(path).unwrap_or_else(|e| {
1269 eprintln!("Error: {e}");
1270 process::exit(1);
1271 });
1272 let watch_dir = abs_path.parent().unwrap_or(Path::new("."));
1273
1274 eprintln!("\x1b[2m[watch] running {path}...\x1b[0m");
1275 run_file(
1276 path,
1277 false,
1278 denied_builtins.clone(),
1279 Vec::new(),
1280 CliLlmMockMode::Off,
1281 None,
1282 )
1283 .await;
1284
1285 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1286 let _watcher = {
1287 let tx = tx.clone();
1288 let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
1289 if let Ok(event) = res {
1290 if matches!(
1291 event.kind,
1292 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
1293 ) {
1294 let has_harn = event
1295 .paths
1296 .iter()
1297 .any(|p| p.extension().is_some_and(|ext| ext == "harn"));
1298 if has_harn {
1299 let _ = tx.blocking_send(());
1300 }
1301 }
1302 }
1303 })
1304 .unwrap_or_else(|e| {
1305 eprintln!("Error setting up file watcher: {e}");
1306 process::exit(1);
1307 });
1308 watcher
1309 .watch(watch_dir, RecursiveMode::Recursive)
1310 .unwrap_or_else(|e| {
1311 eprintln!("Error watching directory: {e}");
1312 process::exit(1);
1313 });
1314 watcher };
1316
1317 eprintln!(
1318 "\x1b[2m[watch] watching {} for .harn changes (ctrl-c to stop)\x1b[0m",
1319 watch_dir.display()
1320 );
1321
1322 loop {
1323 rx.recv().await;
1324 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1326 while rx.try_recv().is_ok() {}
1327
1328 eprintln!();
1329 eprintln!("\x1b[2m[watch] change detected, re-running {path}...\x1b[0m");
1330 run_file(
1331 path,
1332 false,
1333 denied_builtins.clone(),
1334 Vec::new(),
1335 CliLlmMockMode::Off,
1336 None,
1337 )
1338 .await;
1339 }
1340}
1341
#[cfg(test)]
mod tests {
    use super::{execute_run, CliLlmMockMode};
    use std::collections::HashSet;

    /// Writes `script` into a new named temp file and returns the handle so
    /// the caller can keep it in scope while the script runs.
    #[cfg(feature = "hostlib")]
    fn script_file(script: &str) -> tempfile::NamedTempFile {
        let file = tempfile::NamedTempFile::new().expect("temp file");
        std::fs::write(file.path(), script).expect("write script");
        file
    }

    /// A script that calls `hostlib_enable` must finish with exit code 0 and
    /// print `enabled` when run through `execute_run`.
    #[cfg(feature = "hostlib")]
    #[tokio::test]
    async fn execute_run_installs_hostlib_gate() {
        let script = script_file(
            r#"
pipeline main() {
 let _ = hostlib_enable("tools:deterministic")
 println("enabled")
}
"#,
        );
        let script_path = script.path().to_string_lossy();

        let outcome = execute_run(
            &script_path,
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
        )
        .await;

        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
        assert_eq!(outcome.stdout.trim(), "enabled");
    }

    /// Runs a shell command that prints 2000 `x` characters with an 8-byte
    /// inline capture limit, then reads a window starting at offset 1990.
    /// The script's five printed values are checked against the expected
    /// output below.
    #[cfg(all(feature = "hostlib", unix))]
    #[tokio::test]
    async fn execute_run_can_read_hostlib_command_artifacts() {
        let script = script_file(
            r#"
pipeline main() {
 let _ = hostlib_enable("tools:deterministic")
 let result = hostlib_tools_run_command({
 argv: ["sh", "-c", "i=0; while [ $i -lt 2000 ]; do printf x; i=$((i+1)); done"],
 capture: {max_inline_bytes: 8},
 timeout_ms: 5000,
 })
 println(starts_with(result.command_id, "cmd_"))
 println(len(result.stdout))
 println(result.byte_count)
 let window = hostlib_tools_read_command_output({
 command_id: result.command_id,
 offset: 1990,
 length: 20,
 })
 println(len(window.content))
 println(window.eof)
}
"#,
        );
        let script_path = script.path().to_string_lossy();

        let outcome = execute_run(
            &script_path,
            false,
            HashSet::new(),
            Vec::new(),
            Vec::new(),
            CliLlmMockMode::Off,
            None,
        )
        .await;

        assert_eq!(outcome.exit_code, 0, "stderr:\n{}", outcome.stderr);
        assert_eq!(outcome.stdout.trim(), "true\n8\n2000\n10\ntrue");
    }
}