Skip to main content

harn_vm/composition/
mod.rs

1//! Language-neutral executable tool-composition contract.
2//!
3//! A composition run is a tiny program over already-typed tool bindings. The
4//! runtime must expose it as a parent run with child tool operations, not as an
5//! opaque "execute code" blob, so policy, transcript, replay, and host approval
6//! surfaces can keep reasoning about each child call normally.
7
8use std::collections::{BTreeMap, BTreeSet, HashMap};
9use std::sync::Arc;
10use std::time::Duration;
11
12use futures::stream::{FuturesUnordered, StreamExt};
13use serde_json::Value;
14use sha2::{Digest, Sha256};
15use tokio::sync::{OwnedSemaphorePermit, Semaphore};
16
17use crate::agent_events::{ToolCallErrorCategory, ToolCallStatus};
18use crate::tool_annotations::SideEffectLevel;
19use crate::value::{VmError, VmValue};
20use crate::vm::Vm;
21
22mod crystallization;
23mod events;
24mod harn_api;
25mod hosts;
26mod manifest;
27mod types;
28mod typescript;
29
30#[cfg(test)]
31mod tests;
32
33pub use crystallization::composition_crystallization_trace;
34pub use events::composition_report_events;
35pub use harn_api::composition_harn_api;
36pub use hosts::{ClosureCompositionToolHost, StaticCompositionToolHost};
37pub use manifest::{
38    binding_manifest_from_tool_surface, binding_manifest_hash, BindingManifest,
39    BindingManifestEntry, BindingManifestOptions, BindingPolicyDisposition, BindingPolicyStatus,
40    BINDING_MANIFEST_SCHEMA_VERSION,
41};
42pub use types::{
43    CompositionChildCall, CompositionChildResult, CompositionExecutionLimits,
44    CompositionExecutionReport, CompositionExecutionRequest, CompositionFailureCategory,
45    CompositionMcpPolicy, CompositionRetryPolicy, CompositionRunEnvelope, CompositionToolHost,
46    CompositionToolOutput, COMPOSITION_EXECUTION_SCHEMA_VERSION,
47};
48pub use typescript::composition_typescript_declarations;
49
50/// Stable digest for the prompt-visible snippet body.
51pub fn composition_snippet_hash(language: &str, snippet: &str) -> String {
52    let mut hasher = Sha256::new();
53    hasher.update(b"harn.composition.snippet.v1\0");
54    hasher.update(language.as_bytes());
55    hasher.update(b"\0");
56    hasher.update(snippet.as_bytes());
57    format!("sha256:{}", hex::encode(hasher.finalize()))
58}
59
60struct ExecutionState {
61    request: CompositionExecutionRequest,
62    calls: Vec<CompositionChildCall>,
63    results: Vec<CompositionChildResult>,
64    clock: Arc<dyn harn_clock::Clock>,
65    started_ms: i64,
66}
67
68impl ExecutionState {
69    fn next_call(
70        &mut self,
71        tool_name: &str,
72        input: Value,
73    ) -> Result<(BindingManifestEntry, CompositionChildCall), VmError> {
74        if self.results.len() as u64 >= self.request.limits.max_operations {
75            return Err(VmError::Runtime(format!(
76                "composition exceeded max_operations={}",
77                self.request.limits.max_operations
78            )));
79        }
80        if let Some(timeout_ms) = self.request.limits.timeout_ms {
81            if elapsed_ms(&*self.clock, self.started_ms) > timeout_ms {
82                return Err(VmError::Runtime(format!(
83                    "composition exceeded timeout_ms={timeout_ms}"
84                )));
85            }
86        }
87        let binding = self
88            .request
89            .manifest
90            .find_by_name(tool_name)
91            .or_else(|| self.request.manifest.find_by_binding(tool_name))
92            .cloned()
93            .ok_or_else(|| {
94                VmError::Runtime(format!("composition binding '{tool_name}' not found"))
95            })?;
96        let call = self.push_call(&binding, input);
97        if binding.policy.disposition == BindingPolicyDisposition::Denied {
98            let message = format!(
99                "composition binding '{}' denied{}",
100                binding.name,
101                binding
102                    .policy
103                    .reason
104                    .as_deref()
105                    .map(|reason| format!(": {reason}"))
106                    .unwrap_or_default()
107            );
108            self.push_failed_result(&call, &message, ToolCallErrorCategory::PermissionDenied);
109            return Err(VmError::Runtime(message));
110        }
111        if binding.policy.disposition == BindingPolicyDisposition::Gated {
112            let message = format!(
113                "composition binding '{}' requires approval and cannot run in read-only mode",
114                binding.name
115            );
116            self.push_failed_result(&call, &message, ToolCallErrorCategory::PermissionDenied);
117            return Err(VmError::Runtime(message));
118        }
119        if binding.side_effect_level.rank() > self.request.requested_side_effect_ceiling.rank() {
120            let message = format!(
121                "composition binding '{}' requires side-effect level '{}' above requested ceiling '{}'",
122                binding.name,
123                binding.side_effect_level.as_str(),
124                self.request.requested_side_effect_ceiling.as_str()
125            );
126            self.push_failed_result(&call, &message, ToolCallErrorCategory::PermissionDenied);
127            return Err(VmError::Runtime(message));
128        }
129        Ok((binding, call))
130    }
131
132    fn push_call(&mut self, binding: &BindingManifestEntry, input: Value) -> CompositionChildCall {
133        let operation_index = self.calls.len() as u64;
134        let call = CompositionChildCall {
135            run_id: self.request.run_id.clone(),
136            tool_call_id: format!("{}:{operation_index}", self.request.run_id),
137            tool_name: binding.name.clone(),
138            operation_index,
139            annotations: Some(binding.annotations.clone()),
140            requested_side_effect_level: binding.side_effect_level,
141            policy_context: serde_json::json!({
142                "disposition": binding.policy.disposition,
143                "reason": binding.policy.reason,
144                "ceiling": self.request.requested_side_effect_ceiling,
145            }),
146            raw_input: input,
147        };
148        self.calls.push(call.clone());
149        call
150    }
151
152    fn push_failed_result(
153        &mut self,
154        call: &CompositionChildCall,
155        message: &str,
156        category: ToolCallErrorCategory,
157    ) {
158        self.results.push(CompositionChildResult {
159            run_id: call.run_id.clone(),
160            tool_call_id: call.tool_call_id.clone(),
161            tool_name: call.tool_name.clone(),
162            operation_index: call.operation_index,
163            status: ToolCallStatus::Failed,
164            raw_output: None,
165            error: Some(message.to_string()),
166            error_category: Some(category),
167            executor: Some(crate::agent_events::ToolExecutor::HarnBuiltin),
168            duration_ms: Some(0),
169            execution_duration_ms: Some(0),
170            attempt: 1,
171            retry_attempts: 0,
172            retry_errors: Vec::new(),
173            retry_delays_ms: Vec::new(),
174        });
175    }
176
177    fn push_result(
178        &mut self,
179        call: &CompositionChildCall,
180        outcome: &CompositionDispatchOutcome,
181        elapsed_ms: u64,
182    ) {
183        if self
184            .results
185            .iter()
186            .any(|result| result.tool_call_id == call.tool_call_id)
187        {
188            return;
189        }
190        self.results.push(CompositionChildResult {
191            run_id: call.run_id.clone(),
192            tool_call_id: call.tool_call_id.clone(),
193            tool_name: call.tool_name.clone(),
194            operation_index: call.operation_index,
195            status: if outcome.output.error.is_some() {
196                ToolCallStatus::Failed
197            } else {
198                ToolCallStatus::Completed
199            },
200            raw_output: outcome.output.value.clone(),
201            error: outcome.output.error.clone(),
202            error_category: outcome.output.error_category,
203            executor: outcome.output.executor.clone(),
204            duration_ms: Some(elapsed_ms),
205            execution_duration_ms: Some(elapsed_ms),
206            attempt: outcome.attempt,
207            retry_attempts: outcome.retry_attempts,
208            retry_errors: outcome.retry_errors.clone(),
209            retry_delays_ms: outcome.retry_delays_ms.clone(),
210        });
211    }
212}
213
214#[derive(Clone)]
215struct CompositionRuntime {
216    state: Arc<parking_lot::Mutex<ExecutionState>>,
217    host: Arc<dyn CompositionToolHost>,
218    bulkheads: Arc<CompositionBulkheads>,
219}
220
221struct CompositionBulkheads {
222    global: Arc<Semaphore>,
223    per_server: parking_lot::Mutex<HashMap<String, Arc<Semaphore>>>,
224    per_server_limit: usize,
225}
226
227impl CompositionBulkheads {
228    fn new(limits: &CompositionExecutionLimits) -> Self {
229        Self {
230            global: Arc::new(Semaphore::new(
231                limits
232                    .max_concurrent_operations
233                    .clamp(1, Semaphore::MAX_PERMITS),
234            )),
235            per_server: parking_lot::Mutex::new(HashMap::new()),
236            per_server_limit: limits
237                .max_concurrent_per_server
238                .clamp(1, Semaphore::MAX_PERMITS),
239        }
240    }
241
242    async fn acquire(
243        &self,
244        binding: &BindingManifestEntry,
245    ) -> Result<(OwnedSemaphorePermit, Option<OwnedSemaphorePermit>), VmError> {
246        let global = self
247            .global
248            .clone()
249            .acquire_owned()
250            .await
251            .map_err(|_| VmError::Runtime("composition bulkhead closed".to_string()))?;
252        let server = mcp_server_name(binding);
253        let per_server = match server {
254            Some(server) => {
255                let semaphore = {
256                    let mut semaphores = self.per_server.lock();
257                    semaphores
258                        .entry(server)
259                        .or_insert_with(|| Arc::new(Semaphore::new(self.per_server_limit)))
260                        .clone()
261                };
262                Some(semaphore.acquire_owned().await.map_err(|_| {
263                    VmError::Runtime("composition per-server bulkhead closed".to_string())
264                })?)
265            }
266            None => None,
267        };
268        Ok((global, per_server))
269    }
270}
271
272struct CompositionDispatchOutcome {
273    output: CompositionToolOutput,
274    attempt: u32,
275    retry_attempts: u32,
276    retry_errors: Vec<String>,
277    retry_delays_ms: Vec<u64>,
278}
279
280/// Execute a read-only Harn-native composition snippet against a manifest.
281pub async fn execute_harn_composition(
282    mut request: CompositionExecutionRequest,
283    host: Arc<dyn CompositionToolHost>,
284) -> CompositionExecutionReport {
285    if request.run_id.trim().is_empty() {
286        request.run_id = uuid::Uuid::now_v7().to_string();
287    }
288    if request.language.trim().is_empty() {
289        request.language = "harn".to_string();
290    }
291    let manifest_hash = request
292        .manifest
293        .hash()
294        .unwrap_or_else(|_| "sha256:manifest_hash_error".to_string());
295    let snippet_hash = composition_snippet_hash(&request.language, &request.snippet);
296    let mut run = CompositionRunEnvelope::read_only(
297        request.run_id.clone(),
298        request.language.clone(),
299        snippet_hash,
300        manifest_hash,
301    );
302    let session_id = request.session_id.clone();
303    run.requested_side_effect_ceiling = request.requested_side_effect_ceiling;
304    run.metadata = request.metadata.clone();
305    if !run.metadata.is_object() {
306        run.metadata = Value::Object(serde_json::Map::new());
307    }
308    if let Some(session_id) = &session_id {
309        run.metadata["session_id"] = Value::String(session_id.clone());
310    }
311    let clock = harn_clock::RealClock::arc();
312    let started_ms = clock.monotonic_ms();
313
314    let result = if request.language != "harn" {
315        Err((
316            CompositionFailureCategory::UnsupportedLanguage,
317            format!("unsupported composition language '{}'", request.language),
318            Vec::new(),
319            Vec::new(),
320        ))
321    } else if request.requested_side_effect_ceiling.rank() > SideEffectLevel::ReadOnly.rank() {
322        Err((
323            CompositionFailureCategory::PolicyDenied,
324            "read-only composition executor refuses side-effect ceilings above read_only"
325                .to_string(),
326            Vec::new(),
327            Vec::new(),
328        ))
329    } else {
330        execute_harn_composition_inner(request, host).await
331    };
332
333    let report = match result {
334        Ok((value, stdout, calls, results)) => {
335            run.result = Some(value);
336            run.stdout = (!stdout.is_empty()).then_some(stdout);
337            run.duration_ms = Some(elapsed_ms(&*clock, started_ms));
338            CompositionExecutionReport {
339                schema_version: COMPOSITION_EXECUTION_SCHEMA_VERSION,
340                ok: true,
341                summary: format!(
342                    "composition completed with {} child operation(s)",
343                    results.len()
344                ),
345                run,
346                child_calls: calls,
347                child_results: results,
348            }
349        }
350        Err((category, error, calls, results)) => {
351            run.failure_category = Some(category);
352            run.error = Some(error.clone());
353            run.duration_ms = Some(elapsed_ms(&*clock, started_ms));
354            CompositionExecutionReport {
355                schema_version: COMPOSITION_EXECUTION_SCHEMA_VERSION,
356                ok: false,
357                summary: error,
358                run,
359                child_calls: calls,
360                child_results: results,
361            }
362        }
363    };
364    if let Some(session_id) = session_id {
365        events::emit_composition_report_events(&session_id, &report);
366    }
367    report
368}
369
370async fn execute_harn_composition_inner(
371    request: CompositionExecutionRequest,
372    host: Arc<dyn CompositionToolHost>,
373) -> Result<
374    (
375        Value,
376        String,
377        Vec<CompositionChildCall>,
378        Vec<CompositionChildResult>,
379    ),
380    (
381        CompositionFailureCategory,
382        String,
383        Vec<CompositionChildCall>,
384        Vec<CompositionChildResult>,
385    ),
386> {
387    let validation_source = composition_validation_source(&request.snippet);
388    let validation_program = harn_parser::parse_source(&validation_source).map_err(|error| {
389        (
390            CompositionFailureCategory::SchemaValidation,
391            format!("composition parse error: {error}"),
392            Vec::new(),
393            Vec::new(),
394        )
395    })?;
396    validate_composition_program(&validation_program, &request.manifest).map_err(|error| {
397        (
398            CompositionFailureCategory::PolicyDenied,
399            error,
400            Vec::new(),
401            Vec::new(),
402        )
403    })?;
404
405    let source = composition_source(&request.manifest, &request.snippet);
406    let program = harn_parser::parse_source(&source).map_err(|error| {
407        (
408            CompositionFailureCategory::SchemaValidation,
409            format!("composition parse error: {error}"),
410            Vec::new(),
411            Vec::new(),
412        )
413    })?;
414    let chunk = crate::Compiler::new()
415        .compile_named(&program, "main")
416        .map_err(|error| {
417            (
418                CompositionFailureCategory::SchemaValidation,
419                format!("composition compile error: {error}"),
420                Vec::new(),
421                Vec::new(),
422            )
423        })?;
424
425    let execution_clock = harn_clock::RealClock::arc();
426    let execution_started_ms = execution_clock.monotonic_ms();
427    let state = Arc::new(parking_lot::Mutex::new(ExecutionState {
428        request,
429        calls: Vec::new(),
430        results: Vec::new(),
431        clock: execution_clock,
432        started_ms: execution_started_ms,
433    }));
434    let mut vm = Vm::new();
435    crate::register_core_stdlib(&mut vm);
436    let limits = state.lock().request.limits.clone();
437    let runtime = CompositionRuntime {
438        state: state.clone(),
439        host,
440        bulkheads: Arc::new(CompositionBulkheads::new(&limits)),
441    };
442    register_composition_call_builtin(&mut vm, runtime.clone());
443    register_composition_map_bounded_builtin(&mut vm, runtime);
444    if let Some(timeout_ms) = state.lock().request.limits.timeout_ms {
445        vm.push_deadline_after(std::time::Duration::from_millis(timeout_ms));
446    }
447    vm.set_source_info("composition://snippet.harn", &source);
448    match vm.execute(&chunk).await {
449        Ok(value) => {
450            let json = crate::llm::vm_value_to_json(&value);
451            let stdout = vm.output().to_string();
452            let state = state.lock();
453            let result_size = serde_json::to_vec(&json)
454                .map(|bytes| bytes.len())
455                .unwrap_or(0);
456            let output_size = result_size.saturating_add(stdout.len());
457            if output_size as u64 > state.request.limits.max_output_bytes {
458                return Err((
459                    CompositionFailureCategory::ExecutionError,
460                    format!(
461                        "composition output exceeded max_output_bytes={}",
462                        state.request.limits.max_output_bytes
463                    ),
464                    state.calls.clone(),
465                    state.results.clone(),
466                ));
467            }
468            Ok((json, stdout, state.calls.clone(), state.results.clone()))
469        }
470        Err(error) => {
471            let state = state.lock();
472            let category = if error.to_string().contains("denied")
473                || error.to_string().contains("side-effect")
474                || error.to_string().contains("approval")
475            {
476                CompositionFailureCategory::PolicyDenied
477            } else if error.to_string().contains("Deadline exceeded")
478                || error.to_string().contains("max_operations")
479                || error.to_string().contains("timeout_ms")
480                || error.to_string().contains("max_output_bytes")
481            {
482                CompositionFailureCategory::Timeout
483            } else if state
484                .results
485                .iter()
486                .any(|result| result.status == ToolCallStatus::Failed)
487            {
488                CompositionFailureCategory::ChildToolError
489            } else {
490                CompositionFailureCategory::ExecutionError
491            };
492            Err((
493                category,
494                error.to_string(),
495                state.calls.clone(),
496                state.results.clone(),
497            ))
498        }
499    }
500}
501
502fn register_composition_call_builtin(vm: &mut Vm, runtime: CompositionRuntime) {
503    vm.register_async_builtin("__composition_call", move |_ctx, args| {
504        let runtime = runtime.clone();
505        async move {
506            let tool_name = args
507                .first()
508                .map(VmValue::display)
509                .ok_or_else(|| VmError::Runtime("__composition_call: missing tool name".into()))?;
510            let input = args
511                .get(1)
512                .map(crate::llm::vm_value_to_json)
513                .unwrap_or_else(|| serde_json::json!({}));
514            let (binding, call, clock) = {
515                let mut state = runtime.state.lock();
516                let (binding, call) = state.next_call(&tool_name, input.clone())?;
517                (binding, call, state.clock.clone())
518            };
519            let started_ms = clock.monotonic_ms();
520            let outcome = dispatch_binding_with_policy(&runtime, &binding, input).await?;
521            {
522                let mut state = runtime.state.lock();
523                state.push_result(&call, &outcome, elapsed_ms(&*clock, started_ms));
524            }
525            if let Some(error) = outcome.output.error {
526                return Err(VmError::Runtime(error));
527            }
528            Ok(crate::json_to_vm_value(
529                &outcome.output.value.unwrap_or(Value::Null),
530            ))
531        }
532    });
533}
534
535async fn dispatch_binding_with_policy(
536    runtime: &CompositionRuntime,
537    binding: &BindingManifestEntry,
538    input: Value,
539) -> Result<CompositionDispatchOutcome, VmError> {
540    let policy = runtime.state.lock().request.mcp_policy.clone();
541    let retry = policy.retry.clone();
542    let max_attempts = retry.max_attempts.max(1);
543    let can_retry = retry_allowed(binding, &input, &policy);
544    let mut attempt = 1u32;
545    let mut retry_errors = Vec::new();
546    let mut retry_delays_ms = Vec::new();
547
548    loop {
549        let (_global_permit, _server_permit) = runtime.bulkheads.acquire(binding).await?;
550        let call = runtime.host.call(binding, input.clone());
551        let mut output = if let Some(timeout_ms) = policy.call_timeout_ms.filter(|ms| *ms > 0) {
552            match tokio::time::timeout(Duration::from_millis(timeout_ms), call).await {
553                Ok(output) => output,
554                Err(_) => CompositionToolOutput::error(
555                    format!(
556                        "composition binding '{}' timed out after {timeout_ms}ms",
557                        binding.name
558                    ),
559                    ToolCallErrorCategory::Timeout,
560                ),
561            }
562        } else {
563            call.await
564        };
565        drop((_global_permit, _server_permit));
566
567        if output.error.is_none() {
568            if let Some(value) = output.value.take() {
569                match validate_binding_output(binding, value) {
570                    Ok(value) => output.value = Some(value),
571                    Err(message) => {
572                        output = CompositionToolOutput::error(
573                            message,
574                            ToolCallErrorCategory::SchemaValidation,
575                        );
576                    }
577                }
578            }
579        }
580
581        if output.error.is_none()
582            || attempt >= max_attempts
583            || !can_retry
584            || !is_retryable_child_error(&output)
585        {
586            return Ok(CompositionDispatchOutcome {
587                output,
588                attempt,
589                retry_attempts: attempt.saturating_sub(1),
590                retry_errors,
591                retry_delays_ms,
592            });
593        }
594
595        let error = output
596            .error
597            .clone()
598            .unwrap_or_else(|| "composition child call failed".to_string());
599        let delay_ms = compute_retry_delay_ms(binding, &input, attempt, &retry, &error);
600        retry_errors.push(error);
601        retry_delays_ms.push(delay_ms);
602        if delay_ms > 0 {
603            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
604        }
605        attempt = attempt.saturating_add(1);
606    }
607}
608
609fn validate_binding_output(binding: &BindingManifestEntry, value: Value) -> Result<Value, String> {
610    let Some(schema) = &binding.output_schema else {
611        return Ok(value);
612    };
613    let value_vm = crate::json_to_vm_value(&value);
614    let schema_vm = crate::json_to_vm_value(schema);
615    crate::schema::schema_expect_value(&value_vm, &schema_vm, false)
616        .map(|value| crate::llm::vm_value_to_json(&value))
617        .map_err(|error| {
618            format!(
619                "composition binding '{}' outputSchema validation failed: {error}",
620                binding.name
621            )
622        })
623}
624
625fn retry_allowed(
626    binding: &BindingManifestEntry,
627    input: &Value,
628    policy: &CompositionMcpPolicy,
629) -> bool {
630    if idempotency_key_present(input) {
631        return true;
632    }
633    if binding.source == "mcp_server" {
634        if !mcp_binding_trusted(binding, policy) {
635            return false;
636        }
637        return binding.annotations.destructive_hint != Some(true)
638            && (binding.annotations.read_only_hint == Some(true)
639                || binding.annotations.idempotent_hint == Some(true));
640    }
641    binding.side_effect_level == SideEffectLevel::ReadOnly
642        && binding.annotations.kind.is_read_only()
643}
644
645fn mcp_binding_trusted(binding: &BindingManifestEntry, policy: &CompositionMcpPolicy) -> bool {
646    policy.trust_annotations
647        || mcp_server_name(binding)
648            .as_ref()
649            .is_some_and(|server| policy.trusted_servers.contains(server))
650}
651
652fn mcp_server_name(binding: &BindingManifestEntry) -> Option<String> {
653    binding
654        .metadata
655        .get("_mcp_server")
656        .or_else(|| binding.metadata.get("mcp_server"))
657        .or_else(|| binding.metadata.pointer("/server/name"))
658        .and_then(Value::as_str)
659        .filter(|server| !server.is_empty())
660        .map(ToOwned::to_owned)
661}
662
663fn idempotency_key_present(input: &Value) -> bool {
664    for pointer in [
665        "/idempotency_key",
666        "/idempotencyKey",
667        "/_idempotency_key",
668        "/_meta/idempotencyKey",
669        "/_meta/harn/idempotencyKey",
670    ] {
671        if input.pointer(pointer).is_some_and(|value| match value {
672            Value::String(value) => !value.trim().is_empty(),
673            Value::Null => false,
674            _ => true,
675        }) {
676            return true;
677        }
678    }
679    false
680}
681
682fn is_retryable_child_error(output: &CompositionToolOutput) -> bool {
683    if matches!(
684        output.error_category,
685        Some(
686            ToolCallErrorCategory::Network
687                | ToolCallErrorCategory::Timeout
688                | ToolCallErrorCategory::McpServerError
689        )
690    ) {
691        return true;
692    }
693    let Some(error) = &output.error else {
694        return false;
695    };
696    let error = error.to_ascii_lowercase();
697    [
698        "429",
699        "503",
700        "retry-after",
701        "rate limit",
702        "rate-limit",
703        "timeout",
704        "timed out",
705        "transient",
706        "overloaded",
707        "server closed connection",
708        "disconnected",
709        "mcp read error",
710        "mcp write error",
711        "connection reset",
712    ]
713    .iter()
714    .any(|needle| error.contains(needle))
715}
716
717fn compute_retry_delay_ms(
718    binding: &BindingManifestEntry,
719    input: &Value,
720    attempt: u32,
721    retry: &CompositionRetryPolicy,
722    error: &str,
723) -> u64 {
724    if retry.max_delay_ms == 0 {
725        return 0;
726    }
727    if retry.honor_retry_after {
728        if let Some(delay) = retry_after_ms_from_error(error) {
729            return delay.min(retry.max_delay_ms);
730        }
731    }
732    let shift = attempt.saturating_sub(1).min(20);
733    let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
734    let base = retry.base_delay_ms.saturating_mul(multiplier);
735    if base == 0 {
736        return 0;
737    }
738    let jitter_span = (base / 2).max(1);
739    let mut hasher = Sha256::new();
740    hasher.update(binding.name.as_bytes());
741    hasher.update(b"\0");
742    hasher.update(attempt.to_le_bytes());
743    hasher.update(b"\0");
744    if let Ok(bytes) = serde_json::to_vec(input) {
745        hasher.update(bytes);
746    }
747    let digest = hasher.finalize();
748    let jitter = u64::from_le_bytes(digest[..8].try_into().unwrap_or([0; 8])) % (jitter_span + 1);
749    base.saturating_add(jitter).min(retry.max_delay_ms)
750}
751
/// Extracts a `Retry-After` hint (in seconds) from an error message and
/// converts it to milliseconds.
///
/// Matching:
/// - The marker `retry-after` is matched case-insensitively, at its first
///   occurrence.
/// - Any `:`, `=`, or whitespace after the marker is skipped.
/// - The leading run of ASCII digits is parsed as whole seconds.
///
/// Returns `None` when the marker is absent, no digits follow, or the number
/// does not fit in `u64`. The millisecond conversion saturates at `u64::MAX`.
fn retry_after_ms_from_error(error: &str) -> Option<u64> {
    let lowered = error.to_ascii_lowercase();
    let after_marker = lowered.split_once("retry-after").map(|(_, rest)| rest)?;
    let trimmed =
        after_marker.trim_start_matches(|ch: char| ch.is_whitespace() || matches!(ch, ':' | '='));
    let digit_len = trimmed
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(trimmed.len());
    if digit_len == 0 {
        return None;
    }
    let seconds: u64 = trimmed[..digit_len].parse().ok()?;
    Some(seconds.saturating_mul(1000))
}
765
766fn register_composition_map_bounded_builtin(vm: &mut Vm, runtime: CompositionRuntime) {
767    vm.register_async_builtin("map_bounded", move |ctx, args| {
768        let runtime = runtime.clone();
769        async move {
770            let items = match args.first() {
771                Some(VmValue::List(items)) => items.as_ref().clone(),
772                Some(other) => {
773                    return Err(VmError::TypeError(format!(
774                        "map_bounded: first argument must be a list, got {}",
775                        other.type_name()
776                    )))
777                }
778                None => {
779                    return Err(VmError::Runtime(
780                        "map_bounded: first argument must be a list".to_string(),
781                    ))
782                }
783            };
784            let closure = match args.get(1) {
785                Some(VmValue::Closure(closure)) => closure.clone(),
786                Some(other) => {
787                    return Err(VmError::TypeError(format!(
788                        "map_bounded: second argument must be a closure, got {}",
789                        other.type_name()
790                    )))
791                }
792                None => {
793                    return Err(VmError::Runtime(
794                        "map_bounded: second argument must be a closure".to_string(),
795                    ))
796                }
797            };
798            let options = args
799                .get(2)
800                .map(crate::llm::vm_value_to_json)
801                .unwrap_or_else(|| serde_json::json!({}));
802            let default_cap = runtime
803                .state
804                .lock()
805                .request
806                .limits
807                .max_concurrent_operations
808                .max(1);
809            let cap = options
810                .get("concurrency")
811                .or_else(|| options.get("max_concurrent"))
812                .and_then(Value::as_u64)
813                .map(|value| value.max(1) as usize)
814                .unwrap_or(default_cap)
815                .min(items.len().max(1));
816
817            let total = items.len();
818            let mut pending = items.into_iter().enumerate();
819            let mut in_flight = FuturesUnordered::new();
820            let mut results: Vec<Option<VmValue>> = vec![None; total];
821            let mut succeeded = 0i64;
822            let mut failed = 0i64;
823
824            while in_flight.len() < cap {
825                let Some((index, item)) = pending.next() else {
826                    break;
827                };
828                in_flight.push(run_map_bounded_item(
829                    ctx.clone(),
830                    closure.clone(),
831                    index,
832                    item,
833                ));
834            }
835            while let Some((index, output, result)) = in_flight.next().await {
836                ctx.forward_output(&output);
837                match result {
838                    Ok(value) => {
839                        succeeded += 1;
840                        results[index] = Some(VmValue::enum_variant("Result", "Ok", vec![value]));
841                    }
842                    Err(error) => {
843                        failed += 1;
844                        results[index] = Some(VmValue::enum_variant(
845                            "Result",
846                            "Err",
847                            vec![VmValue::String(std::sync::Arc::from(error.to_string()))],
848                        ));
849                    }
850                }
851                if let Some((next_index, next_item)) = pending.next() {
852                    in_flight.push(run_map_bounded_item(
853                        ctx.clone(),
854                        closure.clone(),
855                        next_index,
856                        next_item,
857                    ));
858                }
859            }
860
861            let mut dict = BTreeMap::new();
862            dict.insert(
863                "results".to_string(),
864                VmValue::List(std::sync::Arc::new(
865                    results
866                        .into_iter()
867                        .map(|value| {
868                            value.unwrap_or_else(|| {
869                                VmValue::enum_variant(
870                                    "Result",
871                                    "Err",
872                                    vec![VmValue::String(std::sync::Arc::from(
873                                        "map_bounded: task did not produce a result",
874                                    ))],
875                                )
876                            })
877                        })
878                        .collect(),
879                )),
880            );
881            dict.insert("succeeded".to_string(), VmValue::Int(succeeded));
882            dict.insert("failed".to_string(), VmValue::Int(failed));
883            Ok(VmValue::Dict(std::sync::Arc::new(dict)))
884        }
885    });
886}
887
888async fn run_map_bounded_item(
889    ctx: crate::vm::AsyncBuiltinCtx,
890    closure: std::sync::Arc<crate::VmClosure>,
891    index: usize,
892    item: VmValue,
893) -> (usize, String, Result<VmValue, VmError>) {
894    let mut vm = ctx.child_vm();
895    let result = vm.call_closure_pub(&closure, &[item]).await;
896    let output = vm.take_output();
897    (index, output, result)
898}
899
900fn elapsed_ms(clock: &dyn harn_clock::Clock, started_ms: i64) -> u64 {
901    clock.monotonic_ms().saturating_sub(started_ms).max(0) as u64
902}
903
/// Wraps a composition `snippet` in a `pipeline main() { ... }` body for parsing.
///
/// A trailing newline is inserted only when the snippet does not already end with one,
/// so the closing brace always sits on its own line.
fn composition_validation_source(snippet: &str) -> String {
    let terminator = if snippet.ends_with('\n') { "" } else { "\n" };
    format!("pipeline main() {{\n{snippet}{terminator}}}\n")
}
913
914fn composition_source(manifest: &BindingManifest, snippet: &str) -> String {
915    let mut source = String::new();
916    for binding in &manifest.bindings {
917        source.push_str(&format!(
918            "fn {}(args = {{}}) {{ return __composition_call(\"{}\", args) }}\n",
919            binding.binding,
920            escape_harn_string(&binding.name)
921        ));
922    }
923    source.push_str("pipeline main() {\n");
924    source.push_str(snippet);
925    if !snippet.ends_with('\n') {
926        source.push('\n');
927    }
928    source.push_str("}\n");
929    source
930}
931
/// Escapes `value` for embedding inside a double-quoted Harn string literal.
///
/// Every backslash becomes `\\` and every double quote becomes `\"`. All other characters
/// are copied unchanged.
/// NOTE(review): newlines and any other escape or interpolation syntax Harn may have are
/// not escaped. Confirm that tool names can never contain them.
fn escape_harn_string(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if matches!(ch, '\\' | '"') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
935
936fn validate_composition_program(
937    program: &[harn_parser::SNode],
938    manifest: &BindingManifest,
939) -> Result<(), String> {
940    use harn_parser::visit::walk_program;
941    use harn_parser::Node;
942
943    let bindings = manifest
944        .bindings
945        .iter()
946        .map(|entry| entry.binding.clone())
947        .collect::<BTreeSet<_>>();
948    let mut local_functions = BTreeSet::from(["__composition_call".to_string()]);
949    walk_program(program, &mut |node| {
950        if let Node::FnDecl { name, .. } = &node.node {
951            local_functions.insert(name.clone());
952        }
953    });
954
955    let mut error = None;
956    walk_program(program, &mut |node| {
957        if error.is_some() {
958            return;
959        }
960        match &node.node {
961            Node::ImportDecl { .. } | Node::SelectiveImport { .. } => {
962                error = Some("composition snippets cannot import modules".to_string());
963            }
964            Node::SpawnExpr { .. } | Node::Parallel { .. } => {
965                error = Some("composition snippets cannot spawn or parallelize work".to_string());
966            }
967            Node::HitlExpr { .. } => {
968                error = Some("composition snippets cannot request HITL directly".to_string());
969            }
970            Node::CostRoute { .. } => {
971                error = Some("composition snippets cannot open LLM routing blocks".to_string());
972            }
973            Node::FunctionCall { name, .. } => {
974                if DENIED_COMPOSITION_CALLS.contains(&name.as_str()) && !bindings.contains(name) {
975                    error = Some(format!("composition snippets cannot call `{name}`"));
976                } else if !bindings.contains(name)
977                    && !local_functions.contains(name)
978                    && !PURE_COMPOSITION_CALLS.contains(&name.as_str())
979                {
980                    error = Some(format!(
981                        "composition call target `{name}` is not a manifest binding or pure helper"
982                    ));
983                }
984            }
985            _ => {}
986        }
987    });
988    error.map_or(Ok(()), Err)
989}
990
/// Builtin call targets that composition snippets may never invoke directly.
///
/// Judging by name, the entries are side-effecting builtins: file writes, `exec`, HTTP,
/// database, MCP, LLM, secrets, event-log emission, host calls, and approvals/escalation.
///
/// `validate_composition_program` rejects a call to any of these names unless the
/// manifest exposes a binding with exactly that name. This check runs before the
/// local-function and pure-helper allowlists. As a result, a snippet cannot unlock one of
/// these names by declaring its own `fn` with the same name. Unlisted names are rejected
/// anyway unless they are allowlisted, so this list mainly gives a clearer error and
/// blocks that shadowing route.
const DENIED_COMPOSITION_CALLS: &[&str] = &[
    "append_file",
    "ask_user",
    "connector_call",
    "copy_file",
    "delete_file",
    "dual_control",
    "escalate_to",
    "event_log_emit",
    "event_log.emit",
    "exec",
    "host_call",
    "host_tool_call",
    "http_delete",
    "http_download",
    "http_get",
    "http_patch",
    "http_post",
    "http_put",
    "http_request",
    "llm_call",
    "mcp_call",
    "mcp_connect",
    "pg_execute",
    "pg_query",
    "request_approval",
    "secret_get",
    "write_file",
];
1020
/// Builtin helpers that composition snippets may call even though they are neither
/// manifest bindings nor functions declared in the program.
///
/// `validate_composition_program` rejects any call target that is not a manifest binding,
/// a declared `fn`, `__composition_call`, or one of these names.
///
/// NOTE(review): entries are assumed to have no host side effects, as the name "pure"
/// suggests. Verify that a new entry cannot reach the filesystem, network, or host before
/// adding it. `map_bounded` is listed here and runs its closure argument on child VMs.
const PURE_COMPOSITION_CALLS: &[&str] = &[
    "Ok",
    "Err",
    "abs",
    "assert",
    "assert_eq",
    "assert_ne",
    "base64_decode",
    "base64_encode",
    "ceil",
    "contains",
    "dedup_by",
    "dirname",
    "entries",
    "ends_with",
    "flat_map",
    "floor",
    "format",
    "group_by",
    "hash_value",
    "hex_decode",
    "hex_encode",
    "is_err",
    "is_ok",
    "join",
    "jq",
    "jq_first",
    "json_extract",
    "json_parse",
    "json_pointer",
    "json_stringify",
    "keys",
    "len",
    "lower",
    "map_bounded",
    "parse_float_or",
    "parse_int_or",
    "split",
    "starts_with",
    "to_float",
    "to_int",
    "to_string",
    "trim",
    "upper",
    "values",
];
1067
1068pub fn composition_search_examples(query: &str, limit: usize) -> Value {
1069    let mut examples = vec![
1070        serde_json::json!({
1071            "id": "read-summarize",
1072            "title": "Read two files and return a compact summary",
1073            "language": "harn",
1074            "snippet": "let readme = read_file({path: \"README.md\"})\nlet spec = read_file({path: \"spec/HARN_SPEC.md\", limit: 80})\nreturn {readme: readme, spec_excerpt: spec}",
1075            "required_side_effect_level": "read_only",
1076            "tools": ["read_file"]
1077        }),
1078        serde_json::json!({
1079            "id": "search-then-read",
1080            "title": "Search first, then read the best candidate",
1081            "language": "harn",
1082            "snippet": "let hits = search({query: \"CompositionRunEnvelope\"})\nreturn hits",
1083            "required_side_effect_level": "read_only",
1084            "tools": ["search"]
1085        }),
1086    ];
1087    if !query.trim().is_empty() {
1088        let q = query.to_ascii_lowercase();
1089        examples.retain(|example| {
1090            example
1091                .to_string()
1092                .to_ascii_lowercase()
1093                .contains(q.as_str())
1094        });
1095    }
1096    examples.truncate(limit.max(1));
1097    Value::Array(examples)
1098}
1099
1100pub fn register_composition_builtins(vm: &mut Vm) {
1101    vm.register_builtin("composition_binding_manifest", |args, _out| {
1102        let tools = args
1103            .first()
1104            .map(crate::llm::vm_value_to_json)
1105            .unwrap_or(Value::Null);
1106        let options_json = args
1107            .get(1)
1108            .map(crate::llm::vm_value_to_json)
1109            .unwrap_or(Value::Null);
1110        let mut options = BindingManifestOptions::default();
1111        if let Some(ceiling) = options_json
1112            .get("side_effect_ceiling")
1113            .and_then(Value::as_str)
1114        {
1115            options.side_effect_ceiling = SideEffectLevel::parse(ceiling);
1116        }
1117        if let Some(include_denied) = options_json.get("include_denied").and_then(Value::as_bool) {
1118            options.include_denied = include_denied;
1119        }
1120        options.denied_tools = string_set_option(&options_json, "denied_tools");
1121        options.gated_tools = string_set_option(&options_json, "gated_tools");
1122        let manifest = binding_manifest_from_tool_surface(&tools, options);
1123        let value = if options_json.get("form").and_then(Value::as_str) == Some("compact") {
1124            manifest.to_compact_value()
1125        } else {
1126            manifest.to_value()
1127        };
1128        Ok(crate::json_to_vm_value(&value))
1129    });
1130
1131    vm.register_builtin("composition_search_examples", |args, _out| {
1132        let query = args.first().map(VmValue::display).unwrap_or_default();
1133        let limit = args
1134            .get(1)
1135            .and_then(|value| match value {
1136                VmValue::Int(n) => Some((*n).max(1) as usize),
1137                _ => None,
1138            })
1139            .unwrap_or(10);
1140        Ok(crate::json_to_vm_value(&composition_search_examples(
1141            &query, limit,
1142        )))
1143    });
1144
1145    vm.register_builtin("composition_typescript_declarations", |args, _out| {
1146        let manifest_value = args
1147            .first()
1148            .map(crate::llm::vm_value_to_json)
1149            .ok_or_else(|| {
1150                VmError::Runtime("composition_typescript_declarations: manifest is required".into())
1151            })?;
1152        let manifest: BindingManifest =
1153            serde_json::from_value(manifest_value).map_err(|error| {
1154                VmError::Runtime(format!(
1155                    "composition_typescript_declarations: invalid manifest: {error}"
1156                ))
1157            })?;
1158        Ok(VmValue::String(std::sync::Arc::from(
1159            composition_typescript_declarations(&manifest),
1160        )))
1161    });
1162
1163    vm.register_builtin("composition_harn_api", |args, _out| {
1164        let manifest_value = args
1165            .first()
1166            .map(crate::llm::vm_value_to_json)
1167            .ok_or_else(|| VmError::Runtime("composition_harn_api: manifest is required".into()))?;
1168        let manifest: BindingManifest =
1169            serde_json::from_value(manifest_value).map_err(|error| {
1170                VmError::Runtime(format!("composition_harn_api: invalid manifest: {error}"))
1171            })?;
1172        Ok(VmValue::String(std::sync::Arc::from(composition_harn_api(
1173            &manifest,
1174        ))))
1175    });
1176
1177    vm.register_builtin("composition_crystallization_trace", |args, _out| {
1178        let report_value = args
1179            .first()
1180            .map(crate::llm::vm_value_to_json)
1181            .ok_or_else(|| {
1182                VmError::Runtime("composition_crystallization_trace: report is required".into())
1183            })?;
1184        let report: CompositionExecutionReport =
1185            serde_json::from_value(report_value).map_err(|error| {
1186                VmError::Runtime(format!(
1187                    "composition_crystallization_trace: invalid report: {error}"
1188                ))
1189            })?;
1190        let options = args
1191            .get(1)
1192            .map(crate::llm::vm_value_to_json)
1193            .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
1194        Ok(crate::json_to_vm_value(&composition_crystallization_trace(
1195            &report, &options,
1196        )))
1197    });
1198
1199    vm.register_async_builtin("composition_execute", |ctx, args| async move {
1200        let snippet = args
1201            .first()
1202            .map(VmValue::display)
1203            .ok_or_else(|| VmError::Runtime("composition_execute: snippet is required".into()))?;
1204        let manifest_value = args
1205            .get(1)
1206            .map(crate::llm::vm_value_to_json)
1207            .ok_or_else(|| VmError::Runtime("composition_execute: manifest is required".into()))?;
1208        let dispatcher = args.get(2).and_then(|value| match value {
1209            VmValue::Closure(closure) => Some((**closure).clone()),
1210            VmValue::Dict(dict) => match dict.get("dispatcher") {
1211                Some(VmValue::Closure(closure)) => Some((**closure).clone()),
1212                _ => None,
1213            },
1214            _ => None,
1215        });
1216        let mut request = CompositionExecutionRequest {
1217            snippet,
1218            manifest: serde_json::from_value(manifest_value).map_err(|error| {
1219                VmError::Runtime(format!("composition_execute: invalid manifest: {error}"))
1220            })?,
1221            ..CompositionExecutionRequest::default()
1222        };
1223        if let Some(options) = args.get(2).map(crate::llm::vm_value_to_json) {
1224            if let Some(session_id) = options.get("session_id").and_then(Value::as_str) {
1225                request.session_id = Some(session_id.to_string());
1226            }
1227            if let Some(run_id) = options.get("run_id").and_then(Value::as_str) {
1228                request.run_id = run_id.to_string();
1229            }
1230            if let Some(max_operations) = options.get("max_operations").and_then(Value::as_u64) {
1231                request.limits.max_operations = max_operations;
1232            }
1233            if let Some(timeout_ms) = options.get("timeout_ms").and_then(Value::as_u64) {
1234                request.limits.timeout_ms = Some(timeout_ms);
1235            }
1236            if let Some(max_output_bytes) = options.get("max_output_bytes").and_then(Value::as_u64)
1237            {
1238                request.limits.max_output_bytes = max_output_bytes;
1239            }
1240            if let Some(max_concurrent) = options
1241                .get("max_concurrent_operations")
1242                .or_else(|| options.get("max_concurrent"))
1243                .and_then(Value::as_u64)
1244            {
1245                request.limits.max_concurrent_operations =
1246                    usize::try_from(max_concurrent).unwrap_or(usize::MAX).max(1);
1247            }
1248            if let Some(per_server) = options
1249                .get("max_concurrent_per_server")
1250                .or_else(|| options.get("per_server_concurrency"))
1251                .and_then(Value::as_u64)
1252            {
1253                request.limits.max_concurrent_per_server =
1254                    usize::try_from(per_server).unwrap_or(usize::MAX).max(1);
1255            }
1256            let trusted_servers = string_set_option(&options, "trusted_servers");
1257            let trusted_mcp_servers = string_set_option(&options, "trusted_mcp_servers");
1258            if !trusted_servers.is_empty() || !trusted_mcp_servers.is_empty() {
1259                request
1260                    .mcp_policy
1261                    .trusted_servers
1262                    .extend(trusted_servers.into_iter().chain(trusted_mcp_servers));
1263            }
1264            if let Some(trust_annotations) = options
1265                .get("trust_annotations")
1266                .or_else(|| options.get("trust_mcp_annotations"))
1267                .and_then(Value::as_bool)
1268            {
1269                request.mcp_policy.trust_annotations = trust_annotations;
1270            }
1271            if let Some(call_timeout_ms) = options.get("call_timeout_ms").and_then(Value::as_u64) {
1272                request.mcp_policy.call_timeout_ms = Some(call_timeout_ms);
1273            }
1274            if let Some(retry_options) = options.get("retry") {
1275                if let Some(max_attempts) =
1276                    retry_options.get("max_attempts").and_then(Value::as_u64)
1277                {
1278                    request.mcp_policy.retry.max_attempts =
1279                        u32::try_from(max_attempts).unwrap_or(u32::MAX).max(1);
1280                }
1281                if let Some(base_delay_ms) =
1282                    retry_options.get("base_delay_ms").and_then(Value::as_u64)
1283                {
1284                    request.mcp_policy.retry.base_delay_ms = base_delay_ms;
1285                }
1286                if let Some(max_delay_ms) =
1287                    retry_options.get("max_delay_ms").and_then(Value::as_u64)
1288                {
1289                    request.mcp_policy.retry.max_delay_ms = max_delay_ms;
1290                }
1291                if let Some(honor_retry_after) = retry_options
1292                    .get("honor_retry_after")
1293                    .and_then(Value::as_bool)
1294                {
1295                    request.mcp_policy.retry.honor_retry_after = honor_retry_after;
1296                }
1297            }
1298        }
1299        let host: Arc<dyn CompositionToolHost> = match dispatcher {
1300            Some(closure) => Arc::new(ClosureCompositionToolHost::new(closure, ctx.clone())),
1301            None => Arc::new(StaticCompositionToolHost::new(BTreeMap::new())),
1302        };
1303        let report = execute_harn_composition(request, host).await;
1304        Ok(crate::json_to_vm_value(
1305            &serde_json::to_value(report).unwrap_or_else(|_| serde_json::json!({"ok": false})),
1306        ))
1307    });
1308}
1309
1310fn string_set_option(value: &Value, key: &str) -> BTreeSet<String> {
1311    value
1312        .get(key)
1313        .and_then(Value::as_array)
1314        .map(|items| {
1315            items
1316                .iter()
1317                .filter_map(Value::as_str)
1318                .map(ToOwned::to_owned)
1319                .collect()
1320        })
1321        .unwrap_or_default()
1322}