1use std::collections::{BTreeMap, BTreeSet, HashMap};
9use std::sync::Arc;
10use std::time::Duration;
11
12use futures::stream::{FuturesUnordered, StreamExt};
13use serde_json::Value;
14use sha2::{Digest, Sha256};
15use tokio::sync::{OwnedSemaphorePermit, Semaphore};
16
17use crate::agent_events::{ToolCallErrorCategory, ToolCallStatus};
18use crate::tool_annotations::SideEffectLevel;
19use crate::value::{VmError, VmValue};
20use crate::vm::Vm;
21
22mod crystallization;
23mod events;
24mod harn_api;
25mod hosts;
26mod manifest;
27mod types;
28mod typescript;
29
30#[cfg(test)]
31mod tests;
32
33pub use crystallization::composition_crystallization_trace;
34pub use events::composition_report_events;
35pub use harn_api::composition_harn_api;
36pub use hosts::{ClosureCompositionToolHost, StaticCompositionToolHost};
37pub use manifest::{
38 binding_manifest_from_tool_surface, binding_manifest_hash, BindingManifest,
39 BindingManifestEntry, BindingManifestOptions, BindingPolicyDisposition, BindingPolicyStatus,
40 BINDING_MANIFEST_SCHEMA_VERSION,
41};
42pub use types::{
43 CompositionChildCall, CompositionChildResult, CompositionExecutionLimits,
44 CompositionExecutionReport, CompositionExecutionRequest, CompositionFailureCategory,
45 CompositionMcpPolicy, CompositionRetryPolicy, CompositionRunEnvelope, CompositionToolHost,
46 CompositionToolOutput, COMPOSITION_EXECUTION_SCHEMA_VERSION,
47};
48pub use typescript::composition_typescript_declarations;
49
50pub fn composition_snippet_hash(language: &str, snippet: &str) -> String {
52 let mut hasher = Sha256::new();
53 hasher.update(b"harn.composition.snippet.v1\0");
54 hasher.update(language.as_bytes());
55 hasher.update(b"\0");
56 hasher.update(snippet.as_bytes());
57 format!("sha256:{}", hex::encode(hasher.finalize()))
58}
59
60struct ExecutionState {
61 request: CompositionExecutionRequest,
62 calls: Vec<CompositionChildCall>,
63 results: Vec<CompositionChildResult>,
64 clock: Arc<dyn harn_clock::Clock>,
65 started_ms: i64,
66}
67
68impl ExecutionState {
69 fn next_call(
70 &mut self,
71 tool_name: &str,
72 input: Value,
73 ) -> Result<(BindingManifestEntry, CompositionChildCall), VmError> {
74 if self.results.len() as u64 >= self.request.limits.max_operations {
75 return Err(VmError::Runtime(format!(
76 "composition exceeded max_operations={}",
77 self.request.limits.max_operations
78 )));
79 }
80 if let Some(timeout_ms) = self.request.limits.timeout_ms {
81 if elapsed_ms(&*self.clock, self.started_ms) > timeout_ms {
82 return Err(VmError::Runtime(format!(
83 "composition exceeded timeout_ms={timeout_ms}"
84 )));
85 }
86 }
87 let binding = self
88 .request
89 .manifest
90 .find_by_name(tool_name)
91 .or_else(|| self.request.manifest.find_by_binding(tool_name))
92 .cloned()
93 .ok_or_else(|| {
94 VmError::Runtime(format!("composition binding '{tool_name}' not found"))
95 })?;
96 let call = self.push_call(&binding, input);
97 if binding.policy.disposition == BindingPolicyDisposition::Denied {
98 let message = format!(
99 "composition binding '{}' denied{}",
100 binding.name,
101 binding
102 .policy
103 .reason
104 .as_deref()
105 .map(|reason| format!(": {reason}"))
106 .unwrap_or_default()
107 );
108 self.push_failed_result(&call, &message, ToolCallErrorCategory::PermissionDenied);
109 return Err(VmError::Runtime(message));
110 }
111 if binding.policy.disposition == BindingPolicyDisposition::Gated {
112 let message = format!(
113 "composition binding '{}' requires approval and cannot run in read-only mode",
114 binding.name
115 );
116 self.push_failed_result(&call, &message, ToolCallErrorCategory::PermissionDenied);
117 return Err(VmError::Runtime(message));
118 }
119 if binding.side_effect_level.rank() > self.request.requested_side_effect_ceiling.rank() {
120 let message = format!(
121 "composition binding '{}' requires side-effect level '{}' above requested ceiling '{}'",
122 binding.name,
123 binding.side_effect_level.as_str(),
124 self.request.requested_side_effect_ceiling.as_str()
125 );
126 self.push_failed_result(&call, &message, ToolCallErrorCategory::PermissionDenied);
127 return Err(VmError::Runtime(message));
128 }
129 Ok((binding, call))
130 }
131
132 fn push_call(&mut self, binding: &BindingManifestEntry, input: Value) -> CompositionChildCall {
133 let operation_index = self.calls.len() as u64;
134 let call = CompositionChildCall {
135 run_id: self.request.run_id.clone(),
136 tool_call_id: format!("{}:{operation_index}", self.request.run_id),
137 tool_name: binding.name.clone(),
138 operation_index,
139 annotations: Some(binding.annotations.clone()),
140 requested_side_effect_level: binding.side_effect_level,
141 policy_context: serde_json::json!({
142 "disposition": binding.policy.disposition,
143 "reason": binding.policy.reason,
144 "ceiling": self.request.requested_side_effect_ceiling,
145 }),
146 raw_input: input,
147 };
148 self.calls.push(call.clone());
149 call
150 }
151
152 fn push_failed_result(
153 &mut self,
154 call: &CompositionChildCall,
155 message: &str,
156 category: ToolCallErrorCategory,
157 ) {
158 self.results.push(CompositionChildResult {
159 run_id: call.run_id.clone(),
160 tool_call_id: call.tool_call_id.clone(),
161 tool_name: call.tool_name.clone(),
162 operation_index: call.operation_index,
163 status: ToolCallStatus::Failed,
164 raw_output: None,
165 error: Some(message.to_string()),
166 error_category: Some(category),
167 executor: Some(crate::agent_events::ToolExecutor::HarnBuiltin),
168 duration_ms: Some(0),
169 execution_duration_ms: Some(0),
170 attempt: 1,
171 retry_attempts: 0,
172 retry_errors: Vec::new(),
173 retry_delays_ms: Vec::new(),
174 });
175 }
176
177 fn push_result(
178 &mut self,
179 call: &CompositionChildCall,
180 outcome: &CompositionDispatchOutcome,
181 elapsed_ms: u64,
182 ) {
183 if self
184 .results
185 .iter()
186 .any(|result| result.tool_call_id == call.tool_call_id)
187 {
188 return;
189 }
190 self.results.push(CompositionChildResult {
191 run_id: call.run_id.clone(),
192 tool_call_id: call.tool_call_id.clone(),
193 tool_name: call.tool_name.clone(),
194 operation_index: call.operation_index,
195 status: if outcome.output.error.is_some() {
196 ToolCallStatus::Failed
197 } else {
198 ToolCallStatus::Completed
199 },
200 raw_output: outcome.output.value.clone(),
201 error: outcome.output.error.clone(),
202 error_category: outcome.output.error_category,
203 executor: outcome.output.executor.clone(),
204 duration_ms: Some(elapsed_ms),
205 execution_duration_ms: Some(elapsed_ms),
206 attempt: outcome.attempt,
207 retry_attempts: outcome.retry_attempts,
208 retry_errors: outcome.retry_errors.clone(),
209 retry_delays_ms: outcome.retry_delays_ms.clone(),
210 });
211 }
212}
213
214#[derive(Clone)]
215struct CompositionRuntime {
216 state: Arc<parking_lot::Mutex<ExecutionState>>,
217 host: Arc<dyn CompositionToolHost>,
218 bulkheads: Arc<CompositionBulkheads>,
219}
220
221struct CompositionBulkheads {
222 global: Arc<Semaphore>,
223 per_server: parking_lot::Mutex<HashMap<String, Arc<Semaphore>>>,
224 per_server_limit: usize,
225}
226
227impl CompositionBulkheads {
228 fn new(limits: &CompositionExecutionLimits) -> Self {
229 Self {
230 global: Arc::new(Semaphore::new(
231 limits
232 .max_concurrent_operations
233 .clamp(1, Semaphore::MAX_PERMITS),
234 )),
235 per_server: parking_lot::Mutex::new(HashMap::new()),
236 per_server_limit: limits
237 .max_concurrent_per_server
238 .clamp(1, Semaphore::MAX_PERMITS),
239 }
240 }
241
242 async fn acquire(
243 &self,
244 binding: &BindingManifestEntry,
245 ) -> Result<(OwnedSemaphorePermit, Option<OwnedSemaphorePermit>), VmError> {
246 let global = self
247 .global
248 .clone()
249 .acquire_owned()
250 .await
251 .map_err(|_| VmError::Runtime("composition bulkhead closed".to_string()))?;
252 let server = mcp_server_name(binding);
253 let per_server = match server {
254 Some(server) => {
255 let semaphore = {
256 let mut semaphores = self.per_server.lock();
257 semaphores
258 .entry(server)
259 .or_insert_with(|| Arc::new(Semaphore::new(self.per_server_limit)))
260 .clone()
261 };
262 Some(semaphore.acquire_owned().await.map_err(|_| {
263 VmError::Runtime("composition per-server bulkhead closed".to_string())
264 })?)
265 }
266 None => None,
267 };
268 Ok((global, per_server))
269 }
270}
271
272struct CompositionDispatchOutcome {
273 output: CompositionToolOutput,
274 attempt: u32,
275 retry_attempts: u32,
276 retry_errors: Vec<String>,
277 retry_delays_ms: Vec<u64>,
278}
279
280pub async fn execute_harn_composition(
282 mut request: CompositionExecutionRequest,
283 host: Arc<dyn CompositionToolHost>,
284) -> CompositionExecutionReport {
285 if request.run_id.trim().is_empty() {
286 request.run_id = uuid::Uuid::now_v7().to_string();
287 }
288 if request.language.trim().is_empty() {
289 request.language = "harn".to_string();
290 }
291 let manifest_hash = request
292 .manifest
293 .hash()
294 .unwrap_or_else(|_| "sha256:manifest_hash_error".to_string());
295 let snippet_hash = composition_snippet_hash(&request.language, &request.snippet);
296 let mut run = CompositionRunEnvelope::read_only(
297 request.run_id.clone(),
298 request.language.clone(),
299 snippet_hash,
300 manifest_hash,
301 );
302 let session_id = request.session_id.clone();
303 run.requested_side_effect_ceiling = request.requested_side_effect_ceiling;
304 run.metadata = request.metadata.clone();
305 if !run.metadata.is_object() {
306 run.metadata = Value::Object(serde_json::Map::new());
307 }
308 if let Some(session_id) = &session_id {
309 run.metadata["session_id"] = Value::String(session_id.clone());
310 }
311 let clock = harn_clock::RealClock::arc();
312 let started_ms = clock.monotonic_ms();
313
314 let result = if request.language != "harn" {
315 Err((
316 CompositionFailureCategory::UnsupportedLanguage,
317 format!("unsupported composition language '{}'", request.language),
318 Vec::new(),
319 Vec::new(),
320 ))
321 } else if request.requested_side_effect_ceiling.rank() > SideEffectLevel::ReadOnly.rank() {
322 Err((
323 CompositionFailureCategory::PolicyDenied,
324 "read-only composition executor refuses side-effect ceilings above read_only"
325 .to_string(),
326 Vec::new(),
327 Vec::new(),
328 ))
329 } else {
330 execute_harn_composition_inner(request, host).await
331 };
332
333 let report = match result {
334 Ok((value, stdout, calls, results)) => {
335 run.result = Some(value);
336 run.stdout = (!stdout.is_empty()).then_some(stdout);
337 run.duration_ms = Some(elapsed_ms(&*clock, started_ms));
338 CompositionExecutionReport {
339 schema_version: COMPOSITION_EXECUTION_SCHEMA_VERSION,
340 ok: true,
341 summary: format!(
342 "composition completed with {} child operation(s)",
343 results.len()
344 ),
345 run,
346 child_calls: calls,
347 child_results: results,
348 }
349 }
350 Err((category, error, calls, results)) => {
351 run.failure_category = Some(category);
352 run.error = Some(error.clone());
353 run.duration_ms = Some(elapsed_ms(&*clock, started_ms));
354 CompositionExecutionReport {
355 schema_version: COMPOSITION_EXECUTION_SCHEMA_VERSION,
356 ok: false,
357 summary: error,
358 run,
359 child_calls: calls,
360 child_results: results,
361 }
362 }
363 };
364 if let Some(session_id) = session_id {
365 events::emit_composition_report_events(&session_id, &report);
366 }
367 report
368}
369
370async fn execute_harn_composition_inner(
371 request: CompositionExecutionRequest,
372 host: Arc<dyn CompositionToolHost>,
373) -> Result<
374 (
375 Value,
376 String,
377 Vec<CompositionChildCall>,
378 Vec<CompositionChildResult>,
379 ),
380 (
381 CompositionFailureCategory,
382 String,
383 Vec<CompositionChildCall>,
384 Vec<CompositionChildResult>,
385 ),
386> {
387 let validation_source = composition_validation_source(&request.snippet);
388 let validation_program = harn_parser::parse_source(&validation_source).map_err(|error| {
389 (
390 CompositionFailureCategory::SchemaValidation,
391 format!("composition parse error: {error}"),
392 Vec::new(),
393 Vec::new(),
394 )
395 })?;
396 validate_composition_program(&validation_program, &request.manifest).map_err(|error| {
397 (
398 CompositionFailureCategory::PolicyDenied,
399 error,
400 Vec::new(),
401 Vec::new(),
402 )
403 })?;
404
405 let source = composition_source(&request.manifest, &request.snippet);
406 let program = harn_parser::parse_source(&source).map_err(|error| {
407 (
408 CompositionFailureCategory::SchemaValidation,
409 format!("composition parse error: {error}"),
410 Vec::new(),
411 Vec::new(),
412 )
413 })?;
414 let chunk = crate::Compiler::new()
415 .compile_named(&program, "main")
416 .map_err(|error| {
417 (
418 CompositionFailureCategory::SchemaValidation,
419 format!("composition compile error: {error}"),
420 Vec::new(),
421 Vec::new(),
422 )
423 })?;
424
425 let execution_clock = harn_clock::RealClock::arc();
426 let execution_started_ms = execution_clock.monotonic_ms();
427 let state = Arc::new(parking_lot::Mutex::new(ExecutionState {
428 request,
429 calls: Vec::new(),
430 results: Vec::new(),
431 clock: execution_clock,
432 started_ms: execution_started_ms,
433 }));
434 let mut vm = Vm::new();
435 crate::register_core_stdlib(&mut vm);
436 let limits = state.lock().request.limits.clone();
437 let runtime = CompositionRuntime {
438 state: state.clone(),
439 host,
440 bulkheads: Arc::new(CompositionBulkheads::new(&limits)),
441 };
442 register_composition_call_builtin(&mut vm, runtime.clone());
443 register_composition_map_bounded_builtin(&mut vm, runtime);
444 if let Some(timeout_ms) = state.lock().request.limits.timeout_ms {
445 vm.push_deadline_after(std::time::Duration::from_millis(timeout_ms));
446 }
447 vm.set_source_info("composition://snippet.harn", &source);
448 match vm.execute(&chunk).await {
449 Ok(value) => {
450 let json = crate::llm::vm_value_to_json(&value);
451 let stdout = vm.output().to_string();
452 let state = state.lock();
453 let result_size = serde_json::to_vec(&json)
454 .map(|bytes| bytes.len())
455 .unwrap_or(0);
456 let output_size = result_size.saturating_add(stdout.len());
457 if output_size as u64 > state.request.limits.max_output_bytes {
458 return Err((
459 CompositionFailureCategory::ExecutionError,
460 format!(
461 "composition output exceeded max_output_bytes={}",
462 state.request.limits.max_output_bytes
463 ),
464 state.calls.clone(),
465 state.results.clone(),
466 ));
467 }
468 Ok((json, stdout, state.calls.clone(), state.results.clone()))
469 }
470 Err(error) => {
471 let state = state.lock();
472 let category = if error.to_string().contains("denied")
473 || error.to_string().contains("side-effect")
474 || error.to_string().contains("approval")
475 {
476 CompositionFailureCategory::PolicyDenied
477 } else if error.to_string().contains("Deadline exceeded")
478 || error.to_string().contains("max_operations")
479 || error.to_string().contains("timeout_ms")
480 || error.to_string().contains("max_output_bytes")
481 {
482 CompositionFailureCategory::Timeout
483 } else if state
484 .results
485 .iter()
486 .any(|result| result.status == ToolCallStatus::Failed)
487 {
488 CompositionFailureCategory::ChildToolError
489 } else {
490 CompositionFailureCategory::ExecutionError
491 };
492 Err((
493 category,
494 error.to_string(),
495 state.calls.clone(),
496 state.results.clone(),
497 ))
498 }
499 }
500}
501
502fn register_composition_call_builtin(vm: &mut Vm, runtime: CompositionRuntime) {
503 vm.register_async_builtin("__composition_call", move |_ctx, args| {
504 let runtime = runtime.clone();
505 async move {
506 let tool_name = args
507 .first()
508 .map(VmValue::display)
509 .ok_or_else(|| VmError::Runtime("__composition_call: missing tool name".into()))?;
510 let input = args
511 .get(1)
512 .map(crate::llm::vm_value_to_json)
513 .unwrap_or_else(|| serde_json::json!({}));
514 let (binding, call, clock) = {
515 let mut state = runtime.state.lock();
516 let (binding, call) = state.next_call(&tool_name, input.clone())?;
517 (binding, call, state.clock.clone())
518 };
519 let started_ms = clock.monotonic_ms();
520 let outcome = dispatch_binding_with_policy(&runtime, &binding, input).await?;
521 {
522 let mut state = runtime.state.lock();
523 state.push_result(&call, &outcome, elapsed_ms(&*clock, started_ms));
524 }
525 if let Some(error) = outcome.output.error {
526 return Err(VmError::Runtime(error));
527 }
528 Ok(crate::json_to_vm_value(
529 &outcome.output.value.unwrap_or(Value::Null),
530 ))
531 }
532 });
533}
534
535async fn dispatch_binding_with_policy(
536 runtime: &CompositionRuntime,
537 binding: &BindingManifestEntry,
538 input: Value,
539) -> Result<CompositionDispatchOutcome, VmError> {
540 let policy = runtime.state.lock().request.mcp_policy.clone();
541 let retry = policy.retry.clone();
542 let max_attempts = retry.max_attempts.max(1);
543 let can_retry = retry_allowed(binding, &input, &policy);
544 let mut attempt = 1u32;
545 let mut retry_errors = Vec::new();
546 let mut retry_delays_ms = Vec::new();
547
548 loop {
549 let (_global_permit, _server_permit) = runtime.bulkheads.acquire(binding).await?;
550 let call = runtime.host.call(binding, input.clone());
551 let mut output = if let Some(timeout_ms) = policy.call_timeout_ms.filter(|ms| *ms > 0) {
552 match tokio::time::timeout(Duration::from_millis(timeout_ms), call).await {
553 Ok(output) => output,
554 Err(_) => CompositionToolOutput::error(
555 format!(
556 "composition binding '{}' timed out after {timeout_ms}ms",
557 binding.name
558 ),
559 ToolCallErrorCategory::Timeout,
560 ),
561 }
562 } else {
563 call.await
564 };
565 drop((_global_permit, _server_permit));
566
567 if output.error.is_none() {
568 if let Some(value) = output.value.take() {
569 match validate_binding_output(binding, value) {
570 Ok(value) => output.value = Some(value),
571 Err(message) => {
572 output = CompositionToolOutput::error(
573 message,
574 ToolCallErrorCategory::SchemaValidation,
575 );
576 }
577 }
578 }
579 }
580
581 if output.error.is_none()
582 || attempt >= max_attempts
583 || !can_retry
584 || !is_retryable_child_error(&output)
585 {
586 return Ok(CompositionDispatchOutcome {
587 output,
588 attempt,
589 retry_attempts: attempt.saturating_sub(1),
590 retry_errors,
591 retry_delays_ms,
592 });
593 }
594
595 let error = output
596 .error
597 .clone()
598 .unwrap_or_else(|| "composition child call failed".to_string());
599 let delay_ms = compute_retry_delay_ms(binding, &input, attempt, &retry, &error);
600 retry_errors.push(error);
601 retry_delays_ms.push(delay_ms);
602 if delay_ms > 0 {
603 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
604 }
605 attempt = attempt.saturating_add(1);
606 }
607}
608
609fn validate_binding_output(binding: &BindingManifestEntry, value: Value) -> Result<Value, String> {
610 let Some(schema) = &binding.output_schema else {
611 return Ok(value);
612 };
613 let value_vm = crate::json_to_vm_value(&value);
614 let schema_vm = crate::json_to_vm_value(schema);
615 crate::schema::schema_expect_value(&value_vm, &schema_vm, false)
616 .map(|value| crate::llm::vm_value_to_json(&value))
617 .map_err(|error| {
618 format!(
619 "composition binding '{}' outputSchema validation failed: {error}",
620 binding.name
621 )
622 })
623}
624
625fn retry_allowed(
626 binding: &BindingManifestEntry,
627 input: &Value,
628 policy: &CompositionMcpPolicy,
629) -> bool {
630 if idempotency_key_present(input) {
631 return true;
632 }
633 if binding.source == "mcp_server" {
634 if !mcp_binding_trusted(binding, policy) {
635 return false;
636 }
637 return binding.annotations.destructive_hint != Some(true)
638 && (binding.annotations.read_only_hint == Some(true)
639 || binding.annotations.idempotent_hint == Some(true));
640 }
641 binding.side_effect_level == SideEffectLevel::ReadOnly
642 && binding.annotations.kind.is_read_only()
643}
644
645fn mcp_binding_trusted(binding: &BindingManifestEntry, policy: &CompositionMcpPolicy) -> bool {
646 policy.trust_annotations
647 || mcp_server_name(binding)
648 .as_ref()
649 .is_some_and(|server| policy.trusted_servers.contains(server))
650}
651
652fn mcp_server_name(binding: &BindingManifestEntry) -> Option<String> {
653 binding
654 .metadata
655 .get("_mcp_server")
656 .or_else(|| binding.metadata.get("mcp_server"))
657 .or_else(|| binding.metadata.pointer("/server/name"))
658 .and_then(Value::as_str)
659 .filter(|server| !server.is_empty())
660 .map(ToOwned::to_owned)
661}
662
663fn idempotency_key_present(input: &Value) -> bool {
664 for pointer in [
665 "/idempotency_key",
666 "/idempotencyKey",
667 "/_idempotency_key",
668 "/_meta/idempotencyKey",
669 "/_meta/harn/idempotencyKey",
670 ] {
671 if input.pointer(pointer).is_some_and(|value| match value {
672 Value::String(value) => !value.trim().is_empty(),
673 Value::Null => false,
674 _ => true,
675 }) {
676 return true;
677 }
678 }
679 false
680}
681
682fn is_retryable_child_error(output: &CompositionToolOutput) -> bool {
683 if matches!(
684 output.error_category,
685 Some(
686 ToolCallErrorCategory::Network
687 | ToolCallErrorCategory::Timeout
688 | ToolCallErrorCategory::McpServerError
689 )
690 ) {
691 return true;
692 }
693 let Some(error) = &output.error else {
694 return false;
695 };
696 let error = error.to_ascii_lowercase();
697 [
698 "429",
699 "503",
700 "retry-after",
701 "rate limit",
702 "rate-limit",
703 "timeout",
704 "timed out",
705 "transient",
706 "overloaded",
707 "server closed connection",
708 "disconnected",
709 "mcp read error",
710 "mcp write error",
711 "connection reset",
712 ]
713 .iter()
714 .any(|needle| error.contains(needle))
715}
716
717fn compute_retry_delay_ms(
718 binding: &BindingManifestEntry,
719 input: &Value,
720 attempt: u32,
721 retry: &CompositionRetryPolicy,
722 error: &str,
723) -> u64 {
724 if retry.max_delay_ms == 0 {
725 return 0;
726 }
727 if retry.honor_retry_after {
728 if let Some(delay) = retry_after_ms_from_error(error) {
729 return delay.min(retry.max_delay_ms);
730 }
731 }
732 let shift = attempt.saturating_sub(1).min(20);
733 let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
734 let base = retry.base_delay_ms.saturating_mul(multiplier);
735 if base == 0 {
736 return 0;
737 }
738 let jitter_span = (base / 2).max(1);
739 let mut hasher = Sha256::new();
740 hasher.update(binding.name.as_bytes());
741 hasher.update(b"\0");
742 hasher.update(attempt.to_le_bytes());
743 hasher.update(b"\0");
744 if let Ok(bytes) = serde_json::to_vec(input) {
745 hasher.update(bytes);
746 }
747 let digest = hasher.finalize();
748 let jitter = u64::from_le_bytes(digest[..8].try_into().unwrap_or([0; 8])) % (jitter_span + 1);
749 base.saturating_add(jitter).min(retry.max_delay_ms)
750}
751
/// Extracts a `retry-after` hint (in seconds) from an error message, as milliseconds.
///
/// Matching is ASCII case-insensitive and uses the first `retry-after` occurrence.
/// Any run of `:`, `=`, or whitespace after it is skipped, and the digits that follow
/// are parsed as whole seconds. Returns `None` when the marker is absent, no digits
/// follow, or the number does not fit in `u64`. The conversion to milliseconds
/// saturates.
fn retry_after_ms_from_error(error: &str) -> Option<u64> {
    const MARKER: &str = "retry-after";
    let lowered = error.to_ascii_lowercase();
    let after_marker = lowered.find(MARKER)? + MARKER.len();
    let rest = lowered[after_marker..]
        .trim_start_matches(|c: char| c == ':' || c == '=' || c.is_whitespace());
    // Digits are ASCII, so the char count equals the byte length of the prefix.
    let digit_count = rest.chars().take_while(char::is_ascii_digit).count();
    if digit_count == 0 {
        return None;
    }
    let seconds: u64 = rest[..digit_count].parse().ok()?;
    Some(seconds.saturating_mul(1000))
}
765
766fn register_composition_map_bounded_builtin(vm: &mut Vm, runtime: CompositionRuntime) {
767 vm.register_async_builtin("map_bounded", move |ctx, args| {
768 let runtime = runtime.clone();
769 async move {
770 let items = match args.first() {
771 Some(VmValue::List(items)) => items.as_ref().clone(),
772 Some(other) => {
773 return Err(VmError::TypeError(format!(
774 "map_bounded: first argument must be a list, got {}",
775 other.type_name()
776 )))
777 }
778 None => {
779 return Err(VmError::Runtime(
780 "map_bounded: first argument must be a list".to_string(),
781 ))
782 }
783 };
784 let closure = match args.get(1) {
785 Some(VmValue::Closure(closure)) => closure.clone(),
786 Some(other) => {
787 return Err(VmError::TypeError(format!(
788 "map_bounded: second argument must be a closure, got {}",
789 other.type_name()
790 )))
791 }
792 None => {
793 return Err(VmError::Runtime(
794 "map_bounded: second argument must be a closure".to_string(),
795 ))
796 }
797 };
798 let options = args
799 .get(2)
800 .map(crate::llm::vm_value_to_json)
801 .unwrap_or_else(|| serde_json::json!({}));
802 let default_cap = runtime
803 .state
804 .lock()
805 .request
806 .limits
807 .max_concurrent_operations
808 .max(1);
809 let cap = options
810 .get("concurrency")
811 .or_else(|| options.get("max_concurrent"))
812 .and_then(Value::as_u64)
813 .map(|value| value.max(1) as usize)
814 .unwrap_or(default_cap)
815 .min(items.len().max(1));
816
817 let total = items.len();
818 let mut pending = items.into_iter().enumerate();
819 let mut in_flight = FuturesUnordered::new();
820 let mut results: Vec<Option<VmValue>> = vec![None; total];
821 let mut succeeded = 0i64;
822 let mut failed = 0i64;
823
824 while in_flight.len() < cap {
825 let Some((index, item)) = pending.next() else {
826 break;
827 };
828 in_flight.push(run_map_bounded_item(
829 ctx.clone(),
830 closure.clone(),
831 index,
832 item,
833 ));
834 }
835 while let Some((index, output, result)) = in_flight.next().await {
836 ctx.forward_output(&output);
837 match result {
838 Ok(value) => {
839 succeeded += 1;
840 results[index] = Some(VmValue::enum_variant("Result", "Ok", vec![value]));
841 }
842 Err(error) => {
843 failed += 1;
844 results[index] = Some(VmValue::enum_variant(
845 "Result",
846 "Err",
847 vec![VmValue::String(std::sync::Arc::from(error.to_string()))],
848 ));
849 }
850 }
851 if let Some((next_index, next_item)) = pending.next() {
852 in_flight.push(run_map_bounded_item(
853 ctx.clone(),
854 closure.clone(),
855 next_index,
856 next_item,
857 ));
858 }
859 }
860
861 let mut dict = BTreeMap::new();
862 dict.insert(
863 "results".to_string(),
864 VmValue::List(std::sync::Arc::new(
865 results
866 .into_iter()
867 .map(|value| {
868 value.unwrap_or_else(|| {
869 VmValue::enum_variant(
870 "Result",
871 "Err",
872 vec![VmValue::String(std::sync::Arc::from(
873 "map_bounded: task did not produce a result",
874 ))],
875 )
876 })
877 })
878 .collect(),
879 )),
880 );
881 dict.insert("succeeded".to_string(), VmValue::Int(succeeded));
882 dict.insert("failed".to_string(), VmValue::Int(failed));
883 Ok(VmValue::Dict(std::sync::Arc::new(dict)))
884 }
885 });
886}
887
888async fn run_map_bounded_item(
889 ctx: crate::vm::AsyncBuiltinCtx,
890 closure: std::sync::Arc<crate::VmClosure>,
891 index: usize,
892 item: VmValue,
893) -> (usize, String, Result<VmValue, VmError>) {
894 let mut vm = ctx.child_vm();
895 let result = vm.call_closure_pub(&closure, &[item]).await;
896 let output = vm.take_output();
897 (index, output, result)
898}
899
900fn elapsed_ms(clock: &dyn harn_clock::Clock, started_ms: i64) -> u64 {
901 clock.monotonic_ms().saturating_sub(started_ms).max(0) as u64
902}
903
/// Wraps a snippet in `pipeline main() { ... }` so it can be parsed on its own.
///
/// A trailing newline is added after the snippet when it does not already end in one,
/// so the closing brace always sits on its own line.
fn composition_validation_source(snippet: &str) -> String {
    let line_break = if snippet.ends_with('\n') { "" } else { "\n" };
    format!("pipeline main() {{\n{snippet}{line_break}}}\n")
}
913
914fn composition_source(manifest: &BindingManifest, snippet: &str) -> String {
915 let mut source = String::new();
916 for binding in &manifest.bindings {
917 source.push_str(&format!(
918 "fn {}(args = {{}}) {{ return __composition_call(\"{}\", args) }}\n",
919 binding.binding,
920 escape_harn_string(&binding.name)
921 ));
922 }
923 source.push_str("pipeline main() {\n");
924 source.push_str(snippet);
925 if !snippet.ends_with('\n') {
926 source.push('\n');
927 }
928 source.push_str("}\n");
929 source
930}
931
/// Escapes backslashes and double quotes so `value` can sit inside a double-quoted
/// Harn string literal. No other characters are altered.
fn escape_harn_string(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if matches!(ch, '\\' | '"') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
935
936fn validate_composition_program(
937 program: &[harn_parser::SNode],
938 manifest: &BindingManifest,
939) -> Result<(), String> {
940 use harn_parser::visit::walk_program;
941 use harn_parser::Node;
942
943 let bindings = manifest
944 .bindings
945 .iter()
946 .map(|entry| entry.binding.clone())
947 .collect::<BTreeSet<_>>();
948 let mut local_functions = BTreeSet::from(["__composition_call".to_string()]);
949 walk_program(program, &mut |node| {
950 if let Node::FnDecl { name, .. } = &node.node {
951 local_functions.insert(name.clone());
952 }
953 });
954
955 let mut error = None;
956 walk_program(program, &mut |node| {
957 if error.is_some() {
958 return;
959 }
960 match &node.node {
961 Node::ImportDecl { .. } | Node::SelectiveImport { .. } => {
962 error = Some("composition snippets cannot import modules".to_string());
963 }
964 Node::SpawnExpr { .. } | Node::Parallel { .. } => {
965 error = Some("composition snippets cannot spawn or parallelize work".to_string());
966 }
967 Node::HitlExpr { .. } => {
968 error = Some("composition snippets cannot request HITL directly".to_string());
969 }
970 Node::CostRoute { .. } => {
971 error = Some("composition snippets cannot open LLM routing blocks".to_string());
972 }
973 Node::FunctionCall { name, .. } => {
974 if DENIED_COMPOSITION_CALLS.contains(&name.as_str()) && !bindings.contains(name) {
975 error = Some(format!("composition snippets cannot call `{name}`"));
976 } else if !bindings.contains(name)
977 && !local_functions.contains(name)
978 && !PURE_COMPOSITION_CALLS.contains(&name.as_str())
979 {
980 error = Some(format!(
981 "composition call target `{name}` is not a manifest binding or pure helper"
982 ));
983 }
984 }
985 _ => {}
986 }
987 });
988 error.map_or(Ok(()), Err)
989}
990
/// Builtins a composition snippet may never call directly.
///
/// `validate_composition_program` rejects a call to any of these names unless a manifest
/// binding uses the same identifier (in which case the call goes through the bound,
/// policy-checked tool instead). Judging by their names, these cover file writes,
/// process execution, HTTP/database/secret access, LLM and raw host/MCP/connector calls,
/// event emission, and approval/escalation flows. NOTE(review): that grouping is
/// inferred from names; confirm against the stdlib when editing the list.
const DENIED_COMPOSITION_CALLS: &[&str] = &[
    "append_file",
    "ask_user",
    "connector_call",
    "copy_file",
    "delete_file",
    "dual_control",
    "escalate_to",
    "event_log_emit",
    "event_log.emit",
    "exec",
    "host_call",
    "host_tool_call",
    "http_delete",
    "http_download",
    "http_get",
    "http_patch",
    "http_post",
    "http_put",
    "http_request",
    "llm_call",
    "mcp_call",
    "mcp_connect",
    "pg_execute",
    "pg_query",
    "request_approval",
    "secret_get",
    "write_file",
];
1020
/// Non-binding builtins a composition snippet is allowed to call.
///
/// Besides these, `validate_composition_program` only accepts calls to manifest
/// bindings, functions the snippet declares itself, and `__composition_call`.
/// `map_bounded` is the composition-specific builtin registered by
/// `register_composition_map_bounded_builtin`. NOTE(review): the remaining entries are
/// presumably side-effect-free helpers; confirm purity before adding new names.
const PURE_COMPOSITION_CALLS: &[&str] = &[
    "Ok",
    "Err",
    "abs",
    "assert",
    "assert_eq",
    "assert_ne",
    "base64_decode",
    "base64_encode",
    "ceil",
    "contains",
    "dedup_by",
    "dirname",
    "entries",
    "ends_with",
    "flat_map",
    "floor",
    "format",
    "group_by",
    "hash_value",
    "hex_decode",
    "hex_encode",
    "is_err",
    "is_ok",
    "join",
    "jq",
    "jq_first",
    "json_extract",
    "json_parse",
    "json_pointer",
    "json_stringify",
    "keys",
    "len",
    "lower",
    "map_bounded",
    "parse_float_or",
    "parse_int_or",
    "split",
    "starts_with",
    "to_float",
    "to_int",
    "to_string",
    "trim",
    "upper",
    "values",
];
1067
1068pub fn composition_search_examples(query: &str, limit: usize) -> Value {
1069 let mut examples = vec![
1070 serde_json::json!({
1071 "id": "read-summarize",
1072 "title": "Read two files and return a compact summary",
1073 "language": "harn",
1074 "snippet": "let readme = read_file({path: \"README.md\"})\nlet spec = read_file({path: \"spec/HARN_SPEC.md\", limit: 80})\nreturn {readme: readme, spec_excerpt: spec}",
1075 "required_side_effect_level": "read_only",
1076 "tools": ["read_file"]
1077 }),
1078 serde_json::json!({
1079 "id": "search-then-read",
1080 "title": "Search first, then read the best candidate",
1081 "language": "harn",
1082 "snippet": "let hits = search({query: \"CompositionRunEnvelope\"})\nreturn hits",
1083 "required_side_effect_level": "read_only",
1084 "tools": ["search"]
1085 }),
1086 ];
1087 if !query.trim().is_empty() {
1088 let q = query.to_ascii_lowercase();
1089 examples.retain(|example| {
1090 example
1091 .to_string()
1092 .to_ascii_lowercase()
1093 .contains(q.as_str())
1094 });
1095 }
1096 examples.truncate(limit.max(1));
1097 Value::Array(examples)
1098}
1099
1100pub fn register_composition_builtins(vm: &mut Vm) {
1101 vm.register_builtin("composition_binding_manifest", |args, _out| {
1102 let tools = args
1103 .first()
1104 .map(crate::llm::vm_value_to_json)
1105 .unwrap_or(Value::Null);
1106 let options_json = args
1107 .get(1)
1108 .map(crate::llm::vm_value_to_json)
1109 .unwrap_or(Value::Null);
1110 let mut options = BindingManifestOptions::default();
1111 if let Some(ceiling) = options_json
1112 .get("side_effect_ceiling")
1113 .and_then(Value::as_str)
1114 {
1115 options.side_effect_ceiling = SideEffectLevel::parse(ceiling);
1116 }
1117 if let Some(include_denied) = options_json.get("include_denied").and_then(Value::as_bool) {
1118 options.include_denied = include_denied;
1119 }
1120 options.denied_tools = string_set_option(&options_json, "denied_tools");
1121 options.gated_tools = string_set_option(&options_json, "gated_tools");
1122 let manifest = binding_manifest_from_tool_surface(&tools, options);
1123 let value = if options_json.get("form").and_then(Value::as_str) == Some("compact") {
1124 manifest.to_compact_value()
1125 } else {
1126 manifest.to_value()
1127 };
1128 Ok(crate::json_to_vm_value(&value))
1129 });
1130
1131 vm.register_builtin("composition_search_examples", |args, _out| {
1132 let query = args.first().map(VmValue::display).unwrap_or_default();
1133 let limit = args
1134 .get(1)
1135 .and_then(|value| match value {
1136 VmValue::Int(n) => Some((*n).max(1) as usize),
1137 _ => None,
1138 })
1139 .unwrap_or(10);
1140 Ok(crate::json_to_vm_value(&composition_search_examples(
1141 &query, limit,
1142 )))
1143 });
1144
1145 vm.register_builtin("composition_typescript_declarations", |args, _out| {
1146 let manifest_value = args
1147 .first()
1148 .map(crate::llm::vm_value_to_json)
1149 .ok_or_else(|| {
1150 VmError::Runtime("composition_typescript_declarations: manifest is required".into())
1151 })?;
1152 let manifest: BindingManifest =
1153 serde_json::from_value(manifest_value).map_err(|error| {
1154 VmError::Runtime(format!(
1155 "composition_typescript_declarations: invalid manifest: {error}"
1156 ))
1157 })?;
1158 Ok(VmValue::String(std::sync::Arc::from(
1159 composition_typescript_declarations(&manifest),
1160 )))
1161 });
1162
1163 vm.register_builtin("composition_harn_api", |args, _out| {
1164 let manifest_value = args
1165 .first()
1166 .map(crate::llm::vm_value_to_json)
1167 .ok_or_else(|| VmError::Runtime("composition_harn_api: manifest is required".into()))?;
1168 let manifest: BindingManifest =
1169 serde_json::from_value(manifest_value).map_err(|error| {
1170 VmError::Runtime(format!("composition_harn_api: invalid manifest: {error}"))
1171 })?;
1172 Ok(VmValue::String(std::sync::Arc::from(composition_harn_api(
1173 &manifest,
1174 ))))
1175 });
1176
1177 vm.register_builtin("composition_crystallization_trace", |args, _out| {
1178 let report_value = args
1179 .first()
1180 .map(crate::llm::vm_value_to_json)
1181 .ok_or_else(|| {
1182 VmError::Runtime("composition_crystallization_trace: report is required".into())
1183 })?;
1184 let report: CompositionExecutionReport =
1185 serde_json::from_value(report_value).map_err(|error| {
1186 VmError::Runtime(format!(
1187 "composition_crystallization_trace: invalid report: {error}"
1188 ))
1189 })?;
1190 let options = args
1191 .get(1)
1192 .map(crate::llm::vm_value_to_json)
1193 .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
1194 Ok(crate::json_to_vm_value(&composition_crystallization_trace(
1195 &report, &options,
1196 )))
1197 });
1198
1199 vm.register_async_builtin("composition_execute", |ctx, args| async move {
1200 let snippet = args
1201 .first()
1202 .map(VmValue::display)
1203 .ok_or_else(|| VmError::Runtime("composition_execute: snippet is required".into()))?;
1204 let manifest_value = args
1205 .get(1)
1206 .map(crate::llm::vm_value_to_json)
1207 .ok_or_else(|| VmError::Runtime("composition_execute: manifest is required".into()))?;
1208 let dispatcher = args.get(2).and_then(|value| match value {
1209 VmValue::Closure(closure) => Some((**closure).clone()),
1210 VmValue::Dict(dict) => match dict.get("dispatcher") {
1211 Some(VmValue::Closure(closure)) => Some((**closure).clone()),
1212 _ => None,
1213 },
1214 _ => None,
1215 });
1216 let mut request = CompositionExecutionRequest {
1217 snippet,
1218 manifest: serde_json::from_value(manifest_value).map_err(|error| {
1219 VmError::Runtime(format!("composition_execute: invalid manifest: {error}"))
1220 })?,
1221 ..CompositionExecutionRequest::default()
1222 };
1223 if let Some(options) = args.get(2).map(crate::llm::vm_value_to_json) {
1224 if let Some(session_id) = options.get("session_id").and_then(Value::as_str) {
1225 request.session_id = Some(session_id.to_string());
1226 }
1227 if let Some(run_id) = options.get("run_id").and_then(Value::as_str) {
1228 request.run_id = run_id.to_string();
1229 }
1230 if let Some(max_operations) = options.get("max_operations").and_then(Value::as_u64) {
1231 request.limits.max_operations = max_operations;
1232 }
1233 if let Some(timeout_ms) = options.get("timeout_ms").and_then(Value::as_u64) {
1234 request.limits.timeout_ms = Some(timeout_ms);
1235 }
1236 if let Some(max_output_bytes) = options.get("max_output_bytes").and_then(Value::as_u64)
1237 {
1238 request.limits.max_output_bytes = max_output_bytes;
1239 }
1240 if let Some(max_concurrent) = options
1241 .get("max_concurrent_operations")
1242 .or_else(|| options.get("max_concurrent"))
1243 .and_then(Value::as_u64)
1244 {
1245 request.limits.max_concurrent_operations =
1246 usize::try_from(max_concurrent).unwrap_or(usize::MAX).max(1);
1247 }
1248 if let Some(per_server) = options
1249 .get("max_concurrent_per_server")
1250 .or_else(|| options.get("per_server_concurrency"))
1251 .and_then(Value::as_u64)
1252 {
1253 request.limits.max_concurrent_per_server =
1254 usize::try_from(per_server).unwrap_or(usize::MAX).max(1);
1255 }
1256 let trusted_servers = string_set_option(&options, "trusted_servers");
1257 let trusted_mcp_servers = string_set_option(&options, "trusted_mcp_servers");
1258 if !trusted_servers.is_empty() || !trusted_mcp_servers.is_empty() {
1259 request
1260 .mcp_policy
1261 .trusted_servers
1262 .extend(trusted_servers.into_iter().chain(trusted_mcp_servers));
1263 }
1264 if let Some(trust_annotations) = options
1265 .get("trust_annotations")
1266 .or_else(|| options.get("trust_mcp_annotations"))
1267 .and_then(Value::as_bool)
1268 {
1269 request.mcp_policy.trust_annotations = trust_annotations;
1270 }
1271 if let Some(call_timeout_ms) = options.get("call_timeout_ms").and_then(Value::as_u64) {
1272 request.mcp_policy.call_timeout_ms = Some(call_timeout_ms);
1273 }
1274 if let Some(retry_options) = options.get("retry") {
1275 if let Some(max_attempts) =
1276 retry_options.get("max_attempts").and_then(Value::as_u64)
1277 {
1278 request.mcp_policy.retry.max_attempts =
1279 u32::try_from(max_attempts).unwrap_or(u32::MAX).max(1);
1280 }
1281 if let Some(base_delay_ms) =
1282 retry_options.get("base_delay_ms").and_then(Value::as_u64)
1283 {
1284 request.mcp_policy.retry.base_delay_ms = base_delay_ms;
1285 }
1286 if let Some(max_delay_ms) =
1287 retry_options.get("max_delay_ms").and_then(Value::as_u64)
1288 {
1289 request.mcp_policy.retry.max_delay_ms = max_delay_ms;
1290 }
1291 if let Some(honor_retry_after) = retry_options
1292 .get("honor_retry_after")
1293 .and_then(Value::as_bool)
1294 {
1295 request.mcp_policy.retry.honor_retry_after = honor_retry_after;
1296 }
1297 }
1298 }
1299 let host: Arc<dyn CompositionToolHost> = match dispatcher {
1300 Some(closure) => Arc::new(ClosureCompositionToolHost::new(closure, ctx.clone())),
1301 None => Arc::new(StaticCompositionToolHost::new(BTreeMap::new())),
1302 };
1303 let report = execute_harn_composition(request, host).await;
1304 Ok(crate::json_to_vm_value(
1305 &serde_json::to_value(report).unwrap_or_else(|_| serde_json::json!({"ok": false})),
1306 ))
1307 });
1308}
1309
1310fn string_set_option(value: &Value, key: &str) -> BTreeSet<String> {
1311 value
1312 .get(key)
1313 .and_then(Value::as_array)
1314 .map(|items| {
1315 items
1316 .iter()
1317 .filter_map(Value::as_str)
1318 .map(ToOwned::to_owned)
1319 .collect()
1320 })
1321 .unwrap_or_default()
1322}