Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24    WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
/// Queue depth associated with the HITL event log.
// NOTE(review): not referenced in the visible part of this file — presumably used when the
// event log is installed; confirm.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Default approval deadline in milliseconds (24 hours).
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;
/// Question timeout in milliseconds (24 hours).
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;

/// Event-log topic for question requests.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic for approval requests.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic for dual-control requests.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic for escalation requests.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
39thread_local! {
40    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
41}
42
/// Snapshot of the per-thread HITL request sequence.
///
/// The default value is an empty `instance_key` with `next_seq == 0`.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Key the sequence is associated with.
    // NOTE(review): presumably the dispatch instance key — confirm against `next_request_id`.
    pub(crate) instance_key: String,
    /// Next sequence number.
    pub(crate) next_seq: u64,
}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum HitlRequestKind {
52    Question,
53    Approval,
54    DualControl,
55    Escalation,
56}
57
58impl HitlRequestKind {
59    pub(crate) fn as_str(self) -> &'static str {
60        match self {
61            Self::Question => "question",
62            Self::Approval => "approval",
63            Self::DualControl => "dual_control",
64            Self::Escalation => "escalation",
65        }
66    }
67
68    fn topic(self) -> &'static str {
69        match self {
70            Self::Question => HITL_QUESTIONS_TOPIC,
71            Self::Approval => HITL_APPROVALS_TOPIC,
72            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73            Self::Escalation => HITL_ESCALATIONS_TOPIC,
74        }
75    }
76
77    fn request_event_kind(self) -> &'static str {
78        match self {
79            Self::Question => "hitl.question_asked",
80            Self::Approval => "hitl.approval_requested",
81            Self::DualControl => "hitl.dual_control_requested",
82            Self::Escalation => "hitl.escalation_issued",
83        }
84    }
85
86    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87        if request_id.starts_with("hitl_question_") {
88            Some(Self::Question)
89        } else if request_id.starts_with("hitl_approval_") {
90            Some(Self::Approval)
91        } else if request_id.starts_with("hitl_dual_control_") {
92            Some(Self::DualControl)
93        } else if request_id.starts_with("hitl_escalation_") {
94            Some(Self::Escalation)
95        } else {
96            None
97        }
98    }
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize)]
102pub struct HitlHostResponse {
103    pub request_id: String,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub answer: Option<JsonValue>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub approved: Option<bool>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub accepted: Option<bool>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub reviewer: Option<String>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub reason: Option<String>,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub metadata: Option<JsonValue>,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub responded_at: Option<String>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub signature: Option<String>,
120}
121
122#[derive(Clone, Debug, Serialize, Deserialize)]
123struct HitlRequestEnvelope {
124    request_id: String,
125    kind: HitlRequestKind,
126    #[serde(default)]
127    agent: String,
128    trace_id: String,
129    #[serde(skip_serializing_if = "Option::is_none")]
130    run_id: Option<String>,
131    requested_at: String,
132    payload: JsonValue,
133}
134
135#[derive(Clone, Debug, Serialize, Deserialize)]
136struct HitlTimeoutRecord {
137    request_id: String,
138    kind: HitlRequestKind,
139    trace_id: String,
140    timed_out_at: String,
141}
142
143#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
144pub struct ApprovalRequest {
145    pub id: String,
146    pub action: String,
147    #[serde(default)]
148    pub args: JsonValue,
149    pub principal: String,
150    pub requested_at: String,
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub deadline: Option<String>,
153    pub approvers_required: u32,
154    #[serde(default)]
155    pub evidence_refs: Vec<JsonValue>,
156    #[serde(default)]
157    pub undo_metadata: JsonValue,
158    #[serde(default)]
159    pub capabilities_requested: Vec<String>,
160}
161
162impl ApprovalRequest {
163    pub fn new(
164        id: impl Into<String>,
165        action: impl Into<String>,
166        args: JsonValue,
167        principal: impl Into<String>,
168        requested_at: impl Into<String>,
169    ) -> Self {
170        Self {
171            id: id.into(),
172            action: action.into(),
173            args,
174            principal: principal.into(),
175            requested_at: requested_at.into(),
176            deadline: None,
177            approvers_required: 1,
178            evidence_refs: Vec::new(),
179            undo_metadata: JsonValue::Null,
180            capabilities_requested: Vec::new(),
181        }
182    }
183}
184
185pub(crate) fn approval_request_for_host_permission(
186    id: impl Into<String>,
187    action: impl Into<String>,
188    args: JsonValue,
189    principal: impl Into<String>,
190    evidence_refs: Vec<JsonValue>,
191    undo_metadata: JsonValue,
192    capabilities_requested: Vec<String>,
193) -> ApprovalRequest {
194    let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
195    request.evidence_refs = evidence_refs;
196    request.undo_metadata = undo_metadata;
197    request.capabilities_requested = capabilities_requested;
198    request
199}
200
/// Identifiers taken from the current dispatch, when there is one.
#[derive(Debug, Clone)]
struct DispatchKeys {
    /// Key identifying the dispatch instance.
    instance_key: String,
    /// Stable base string.
    // NOTE(review): presumably the stem for deterministic request ids — confirm against
    // `next_request_id`.
    stable_base: String,
    /// Agent name recorded on request envelopes.
    agent: String,
    /// Trace id reused for requests issued within the dispatch.
    trace_id: String,
}
208
209#[derive(Clone, Debug)]
210struct AskUserOptions {
211    schema: Option<VmValue>,
212    timeout: Option<StdDuration>,
213    default: Option<VmValue>,
214}
215
216#[derive(Clone, Debug)]
217struct ApprovalOptions {
218    detail: Option<VmValue>,
219    args: Option<VmValue>,
220    quorum: u32,
221    reviewers: Vec<String>,
222    deadline: StdDuration,
223    principal: Option<String>,
224    evidence_refs: Vec<JsonValue>,
225    undo_metadata: Option<JsonValue>,
226    capabilities_requested: Vec<String>,
227}
228
229#[derive(Clone, Debug)]
230struct ApprovalProgress {
231    reviewers: BTreeSet<String>,
232    signatures: Vec<ApprovalSignature>,
233    reason: Option<String>,
234    approved_at: Option<String>,
235}
236
237#[derive(Clone, Debug, Serialize)]
238struct ApprovalSignature {
239    reviewer: String,
240    signed_at: String,
241    signature: String,
242}
243
244#[derive(Clone, Debug)]
245enum ApprovalResolution {
246    Pending,
247    Approved(ApprovalProgress),
248    Denied(HitlHostResponse),
249}
250
251#[derive(Clone, Debug)]
252enum WaitpointOutcome {
253    Completed(WaitpointRecord),
254    Timeout,
255    Cancelled {
256        wait_id: String,
257        waitpoint_ids: Vec<String>,
258        reason: Option<String>,
259    },
260}
261
262pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
263    vm.register_async_builtin("ask_user", |args| {
264        Box::pin(async move { ask_user_impl(&args).await })
265    });
266
267    vm.register_async_builtin("request_approval", |args| {
268        Box::pin(async move { request_approval_impl(&args).await })
269    });
270
271    vm.register_async_builtin("dual_control", |args| {
272        Box::pin(async move { dual_control_impl(&args).await })
273    });
274
275    vm.register_async_builtin("escalate_to", |args| {
276        Box::pin(async move { escalate_to_impl(&args).await })
277    });
278}
279
280pub(crate) fn reset_hitl_state() {
281    REQUEST_SEQUENCE.with(|slot| {
282        *slot.borrow_mut() = RequestSequenceState::default();
283    });
284}
285
286pub(crate) fn take_hitl_state() -> RequestSequenceState {
287    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
288}
289
290pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
291    REQUEST_SEQUENCE.with(|slot| {
292        *slot.borrow_mut() = state;
293    });
294}
295
296pub async fn append_hitl_response(
297    base_dir: Option<&Path>,
298    mut response: HitlHostResponse,
299) -> Result<u64, String> {
300    let kind = HitlRequestKind::from_request_id(&response.request_id)
301        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
302    if response.responded_at.is_none() {
303        response.responded_at = Some(now_rfc3339());
304    }
305    let log = ensure_hitl_event_log_for(base_dir)?;
306    let headers = response_headers(&response.request_id);
307    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
308    let event_id = log
309        .append(
310            &topic,
311            LogEvent::new(
312                match kind {
313                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
314                    _ => "hitl.response_received",
315                },
316                serde_json::to_value(&response).map_err(|error| error.to_string())?,
317            )
318            .with_headers(headers),
319        )
320        .await
321        .map_err(|error| error.to_string())?;
322    finalize_hitl_response(&log, kind, &response).await?;
323    Ok(event_id)
324}
325
326pub async fn append_approval_request_on(
327    log: &Arc<AnyEventLog>,
328    agent: impl Into<String>,
329    trace_id: impl Into<String>,
330    action: impl Into<String>,
331    detail: JsonValue,
332    reviewers: Vec<String>,
333) -> Result<String, VmError> {
334    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
335    let trace_id = trace_id.into();
336    let agent = agent.into();
337    let requested_at_time = OffsetDateTime::now_utc();
338    let requested_at = format_rfc3339(requested_at_time);
339    let mut approval_request = ApprovalRequest::new(
340        request_id.clone(),
341        action.into(),
342        detail.clone(),
343        agent.clone(),
344        requested_at.clone(),
345    );
346    approval_request.deadline = deadline_after(
347        requested_at_time,
348        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
349    );
350    approval_request.approvers_required = 1;
351    let approval_request_json = serde_json::to_value(&approval_request)
352        .map_err(|error| VmError::Runtime(error.to_string()))?;
353    let request = HitlRequestEnvelope {
354        request_id: request_id.clone(),
355        kind: HitlRequestKind::Approval,
356        agent,
357        trace_id: trace_id.clone(),
358        run_id: None,
359        requested_at: requested_at.clone(),
360        payload: json!({
361            "approval_request": approval_request_json,
362            "id": approval_request.id,
363            "action": approval_request.action,
364            "args": approval_request.args,
365            "principal": approval_request.principal,
366            "requested_at": requested_at,
367            "deadline": approval_request.deadline,
368            "approvers_required": approval_request.approvers_required,
369            "evidence_refs": approval_request.evidence_refs,
370            "undo_metadata": approval_request.undo_metadata,
371            "capabilities_requested": approval_request.capabilities_requested,
372            "detail": detail,
373            "quorum": 1,
374            "reviewers": reviewers,
375            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
376        }),
377    };
378    create_request_waitpoint(log, &request).await?;
379    append_request(log, &request).await?;
380    maybe_notify_host(&request);
381    Ok(request_id)
382}
383
384async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
385    let prompt = required_string_arg(args, 0, "ask_user")?;
386    let options = parse_ask_user_options(args.get(1))?;
387    let keys = current_dispatch_keys();
388    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
389    let trace_id = keys
390        .as_ref()
391        .map(|keys| keys.trace_id.clone())
392        .unwrap_or_else(new_trace_id);
393    let log = ensure_hitl_event_log();
394    let request = HitlRequestEnvelope {
395        request_id: request_id.clone(),
396        kind: HitlRequestKind::Question,
397        agent: keys
398            .as_ref()
399            .map(|keys| keys.agent.clone())
400            .unwrap_or_default(),
401        trace_id: trace_id.clone(),
402        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
403        requested_at: now_rfc3339(),
404        payload: json!({
405            "prompt": prompt,
406            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
407            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
408            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
409        }),
410    };
411    create_request_waitpoint(&log, &request).await?;
412    append_request(&log, &request).await?;
413    maybe_notify_host(&request);
414    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
415
416    match wait_for_request_waitpoint(&request_id, options.timeout).await? {
417        WaitpointOutcome::Completed(record) => {
418            let answer = record
419                .value
420                .as_ref()
421                .map(crate::stdlib::json_to_vm_value)
422                .unwrap_or(VmValue::Nil);
423            if let Some(schema) = options.schema.as_ref() {
424                return schema_expect_value(&answer, schema, true);
425            }
426            if let Some(default) = options.default.as_ref() {
427                return Ok(coerce_like_default(&answer, default));
428            }
429            Ok(answer)
430        }
431        WaitpointOutcome::Timeout => {
432            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
433            if let Some(default) = options.default {
434                return Ok(default);
435            }
436            Err(timeout_error(&request_id, HitlRequestKind::Question))
437        }
438        WaitpointOutcome::Cancelled {
439            wait_id,
440            waitpoint_ids,
441            reason,
442        } => Err(hitl_cancelled_error(
443            &request_id,
444            HitlRequestKind::Question,
445            &wait_id,
446            &waitpoint_ids,
447            reason,
448        )),
449    }
450}
451
452async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
453    let action = required_string_arg(args, 0, "request_approval")?;
454    let options = parse_approval_options(args.get(1), "request_approval")?;
455    let keys = current_dispatch_keys();
456    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
457    let trace_id = keys
458        .as_ref()
459        .map(|keys| keys.trace_id.clone())
460        .unwrap_or_else(new_trace_id);
461    let agent = keys
462        .as_ref()
463        .map(|keys| keys.agent.clone())
464        .unwrap_or_default();
465    let requested_at_time = OffsetDateTime::now_utc();
466    let requested_at = format_rfc3339(requested_at_time);
467    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
468    let approval_args = options
469        .args
470        .as_ref()
471        .or(options.detail.as_ref())
472        .map(crate::llm::vm_value_to_json)
473        .unwrap_or(JsonValue::Null);
474    let mut approval_request = ApprovalRequest::new(
475        request_id.clone(),
476        action.clone(),
477        approval_args,
478        principal,
479        requested_at.clone(),
480    );
481    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
482    approval_request.approvers_required = options.quorum;
483    approval_request.evidence_refs = options.evidence_refs.clone();
484    approval_request.undo_metadata = options
485        .undo_metadata
486        .clone()
487        .or_else(|| {
488            crate::orchestration::current_mutation_session()
489                .and_then(|session| serde_json::to_value(session).ok())
490        })
491        .unwrap_or(JsonValue::Null);
492    approval_request.capabilities_requested = options.capabilities_requested.clone();
493    let approval_request_json = serde_json::to_value(&approval_request)
494        .map_err(|error| VmError::Runtime(error.to_string()))?;
495    let log = ensure_hitl_event_log();
496    let request = HitlRequestEnvelope {
497        request_id: request_id.clone(),
498        kind: HitlRequestKind::Approval,
499        agent,
500        trace_id: trace_id.clone(),
501        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
502        requested_at: requested_at.clone(),
503        payload: json!({
504            "approval_request": approval_request_json,
505            "id": approval_request.id,
506            "action": action,
507            "args": approval_request.args,
508            "principal": approval_request.principal,
509            "requested_at": requested_at,
510            "deadline": approval_request.deadline,
511            "approvers_required": approval_request.approvers_required,
512            "evidence_refs": approval_request.evidence_refs,
513            "undo_metadata": approval_request.undo_metadata,
514            "capabilities_requested": approval_request.capabilities_requested,
515            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
516            "quorum": options.quorum,
517            "reviewers": options.reviewers,
518            "deadline_ms": options.deadline.as_millis() as u64,
519        }),
520    };
521    create_request_waitpoint(&log, &request).await?;
522    append_request(&log, &request).await?;
523    maybe_notify_host(&request);
524    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
525
526    match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
527        WaitpointOutcome::Completed(record) => {
528            approval_record_from_waitpoint(&record, "request_approval")
529        }
530        WaitpointOutcome::Timeout => {
531            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
532            Err(timeout_error(&request_id, HitlRequestKind::Approval))
533        }
534        WaitpointOutcome::Cancelled { .. } => {
535            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
536        }
537    }
538}
539
540async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
541    let n = required_positive_int_arg(args, 0, "dual_control")?;
542    let m = required_positive_int_arg(args, 1, "dual_control")?;
543    if n > m {
544        return Err(VmError::Runtime(
545            "dual_control: n must be less than or equal to m".to_string(),
546        ));
547    }
548    let action = args
549        .get(2)
550        .and_then(|value| match value {
551            VmValue::Closure(closure) => Some(closure.clone()),
552            _ => None,
553        })
554        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
555    let approvers = optional_string_list(args.get(3), "dual_control")?;
556    if !approvers.is_empty() && approvers.len() < m as usize {
557        return Err(VmError::Runtime(format!(
558            "dual_control: expected at least {m} approvers, got {}",
559            approvers.len()
560        )));
561    }
562
563    let keys = current_dispatch_keys();
564    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
565    let trace_id = keys
566        .as_ref()
567        .map(|keys| keys.trace_id.clone())
568        .unwrap_or_else(new_trace_id);
569    let action_name = if action.func.name.is_empty() {
570        "anonymous".to_string()
571    } else {
572        action.func.name.clone()
573    };
574    let agent = keys
575        .as_ref()
576        .map(|keys| keys.agent.clone())
577        .unwrap_or_default();
578    let requested_at_time = OffsetDateTime::now_utc();
579    let requested_at = format_rfc3339(requested_at_time);
580    let mut approval_request = ApprovalRequest::new(
581        request_id.clone(),
582        action_name.clone(),
583        json!({
584            "n": n,
585            "m": m,
586            "approvers": approvers.clone(),
587        }),
588        agent.clone(),
589        requested_at.clone(),
590    );
591    approval_request.deadline = deadline_after(
592        requested_at_time,
593        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
594    );
595    approval_request.approvers_required = n as u32;
596    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
597        .and_then(|session| serde_json::to_value(session).ok())
598        .unwrap_or(JsonValue::Null);
599    let approval_request_json = serde_json::to_value(&approval_request)
600        .map_err(|error| VmError::Runtime(error.to_string()))?;
601    let log = ensure_hitl_event_log();
602    let request = HitlRequestEnvelope {
603        request_id: request_id.clone(),
604        kind: HitlRequestKind::DualControl,
605        agent,
606        trace_id: trace_id.clone(),
607        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
608        requested_at: requested_at.clone(),
609        payload: json!({
610            "approval_request": approval_request_json,
611            "id": approval_request.id,
612            "args": approval_request.args,
613            "principal": approval_request.principal,
614            "requested_at": requested_at,
615            "deadline": approval_request.deadline,
616            "approvers_required": approval_request.approvers_required,
617            "evidence_refs": approval_request.evidence_refs,
618            "undo_metadata": approval_request.undo_metadata,
619            "capabilities_requested": approval_request.capabilities_requested,
620            "n": n,
621            "m": m,
622            "action": action_name,
623            "approvers": approvers,
624            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
625        }),
626    };
627    create_request_waitpoint(&log, &request).await?;
628    append_request(&log, &request).await?;
629    maybe_notify_host(&request);
630    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
631
632    match wait_for_request_waitpoint(
633        &request_id,
634        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
635    )
636    .await?
637    {
638        WaitpointOutcome::Completed(record) => {
639            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
640            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
641                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
642            })?;
643            let result = vm.call_closure_pub(&action, &[]).await?;
644
645            append_named_event(
646                &log,
647                HitlRequestKind::DualControl,
648                "hitl.dual_control_executed",
649                &request_id,
650                &trace_id,
651                json!({
652                    "request_id": request_id,
653                    "result": crate::llm::vm_value_to_json(&result),
654                }),
655            )
656            .await?;
657
658            Ok(result)
659        }
660        WaitpointOutcome::Timeout => {
661            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
662            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
663        }
664        WaitpointOutcome::Cancelled { .. } => {
665            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
666        }
667    }
668}
669
670async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
671    let role = required_string_arg(args, 0, "escalate_to")?;
672    let reason = required_string_arg(args, 1, "escalate_to")?;
673    let keys = current_dispatch_keys();
674    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
675    let trace_id = keys
676        .as_ref()
677        .map(|keys| keys.trace_id.clone())
678        .unwrap_or_else(new_trace_id);
679    let log = ensure_hitl_event_log();
680    let request = HitlRequestEnvelope {
681        request_id: request_id.clone(),
682        kind: HitlRequestKind::Escalation,
683        agent: keys
684            .as_ref()
685            .map(|keys| keys.agent.clone())
686            .unwrap_or_default(),
687        trace_id: trace_id.clone(),
688        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
689        requested_at: now_rfc3339(),
690        payload: json!({
691            "role": role,
692            "reason": reason,
693            "capability_policy": escalation_capability_policy(),
694        }),
695    };
696    create_request_waitpoint(&log, &request).await?;
697    append_request(&log, &request).await?;
698    maybe_notify_host(&request);
699    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
700
701    match wait_for_request_waitpoint(&request_id, None).await? {
702        WaitpointOutcome::Completed(record) => {
703            let accepted_at = record.completed_at.clone();
704            let reviewer = record.completed_by.clone();
705            let accepted = record
706                .value
707                .as_ref()
708                .and_then(|value| value.get("accepted"))
709                .and_then(JsonValue::as_bool)
710                .unwrap_or(true);
711            Ok(crate::stdlib::json_to_vm_value(&json!({
712                "request_id": request_id,
713                "role": role,
714                "reason": reason,
715                "trace_id": trace_id,
716                "status": if accepted { "accepted" } else { "pending" },
717                "accepted_at": accepted_at,
718                "reviewer": reviewer,
719            })))
720        }
721        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
722        WaitpointOutcome::Cancelled {
723            wait_id,
724            waitpoint_ids,
725            reason,
726        } => Err(hitl_cancelled_error(
727            &request_id,
728            HitlRequestKind::Escalation,
729            &wait_id,
730            &waitpoint_ids,
731            reason,
732        )),
733    }
734}
735
736async fn create_request_waitpoint(
737    log: &Arc<AnyEventLog>,
738    request: &HitlRequestEnvelope,
739) -> Result<(), VmError> {
740    create_waitpoint_on(
741        log,
742        Some(request.request_id.clone()),
743        Some(json!({
744            "kind": request.kind.as_str(),
745            "agent": request.agent.clone(),
746            "trace_id": request.trace_id.clone(),
747            "requested_at": request.requested_at.clone(),
748            "payload": request.payload.clone(),
749        })),
750    )
751    .await?;
752    Ok(())
753}
754
755async fn wait_for_request_waitpoint(
756    request_id: &str,
757    timeout: Option<StdDuration>,
758) -> Result<WaitpointOutcome, VmError> {
759    match wait_on_waitpoints(
760        vec![request_id.to_string()],
761        WaitpointWaitOptions { timeout },
762    )
763    .await
764    {
765        Ok(records) => Ok(WaitpointOutcome::Completed(
766            records
767                .into_iter()
768                .next()
769                .expect("single waitpoint wait result"),
770        )),
771        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
772        Err(WaitpointWaitFailure::Cancelled {
773            wait_id,
774            waitpoint_ids,
775            reason,
776        }) => Ok(WaitpointOutcome::Cancelled {
777            wait_id,
778            waitpoint_ids,
779            reason,
780        }),
781        Err(WaitpointWaitFailure::Vm(error)) => {
782            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
783                return Ok(outcome);
784            }
785            Err(error)
786        }
787    }
788}
789
790fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
791    let VmError::Thrown(VmValue::Dict(dict)) = error else {
792        return None;
793    };
794    let name = dict.get("name").and_then(vm_string)?;
795    match name {
796        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
797        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
798            wait_id: dict
799                .get("wait_id")
800                .and_then(vm_string)
801                .unwrap_or_default()
802                .to_string(),
803            waitpoint_ids: dict
804                .get("waitpoint_ids")
805                .and_then(vm_string_list)
806                .unwrap_or_default(),
807            reason: dict
808                .get("reason")
809                .and_then(vm_string)
810                .map(ToString::to_string),
811        }),
812        _ => None,
813    }
814}
815
816async fn finalize_hitl_response(
817    log: &Arc<AnyEventLog>,
818    kind: HitlRequestKind,
819    response: &HitlHostResponse,
820) -> Result<(), String> {
821    match kind {
822        HitlRequestKind::Question => {
823            if waitpoint_is_terminal(log, &response.request_id).await? {
824                return Ok(());
825            }
826            complete_waitpoint_on(
827                log,
828                &response.request_id,
829                response.answer.clone(),
830                response.reviewer.clone(),
831                response.reason.clone(),
832                response.metadata.clone(),
833            )
834            .await
835            .map(|_| ())
836            .map_err(|error| error.to_string())
837        }
838        HitlRequestKind::Escalation => {
839            if !response.accepted.unwrap_or(false)
840                || waitpoint_is_terminal(log, &response.request_id).await?
841            {
842                return Ok(());
843            }
844            complete_waitpoint_on(
845                log,
846                &response.request_id,
847                Some(json!({
848                    "accepted": true,
849                    "reviewer": response.reviewer,
850                    "reason": response.reason,
851                    "responded_at": response.responded_at,
852                })),
853                response.reviewer.clone(),
854                response.reason.clone(),
855                response.metadata.clone(),
856            )
857            .await
858            .map(|_| ())
859            .map_err(|error| error.to_string())
860        }
861        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
862            if waitpoint_is_terminal(log, &response.request_id).await? {
863                return Ok(());
864            }
865            let request = load_request_envelope(log, kind, &response.request_id)
866                .await
867                .map_err(|error| error.to_string())?;
868            match resolve_approval_state(log, kind, &request)
869                .await
870                .map_err(|error| error.to_string())?
871            {
872                ApprovalResolution::Pending => Ok(()),
873                ApprovalResolution::Approved(progress) => {
874                    let record = approval_record_json(&progress);
875                    append_named_event(
876                        log,
877                        kind,
878                        approved_event_kind(kind),
879                        &request.request_id,
880                        &request.trace_id,
881                        json!({
882                            "request_id": request.request_id.clone(),
883                            "record": record.clone(),
884                        }),
885                    )
886                    .await
887                    .map_err(|error| error.to_string())?;
888                    complete_waitpoint_on(
889                        log,
890                        &request.request_id,
891                        Some(record),
892                        response.reviewer.clone(),
893                        progress.reason.clone(),
894                        response.metadata.clone(),
895                    )
896                    .await
897                    .map(|_| ())
898                    .map_err(|error| error.to_string())
899                }
900                ApprovalResolution::Denied(denied) => {
901                    append_named_event(
902                        log,
903                        kind,
904                        denied_event_kind(kind),
905                        &request.request_id,
906                        &request.trace_id,
907                        json!({
908                            "request_id": request.request_id.clone(),
909                            "reviewer": denied.reviewer.clone(),
910                            "reason": denied.reason.clone(),
911                        }),
912                    )
913                    .await
914                    .map_err(|error| error.to_string())?;
915                    cancel_waitpoint_on(
916                        log,
917                        &request.request_id,
918                        denied.reviewer.clone(),
919                        denied.reason.clone(),
920                        denied.metadata.clone(),
921                    )
922                    .await
923                    .map(|_| ())
924                    .map_err(|error| error.to_string())
925                }
926            }
927        }
928    }
929}
930
931async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
932    Ok(inspect_waitpoint_on(log, request_id)
933        .await
934        .map_err(|error| error.to_string())?
935        .is_some_and(|record| record.status != WaitpointStatus::Open))
936}
937
938async fn load_request_envelope(
939    log: &Arc<AnyEventLog>,
940    kind: HitlRequestKind,
941    request_id: &str,
942) -> Result<HitlRequestEnvelope, VmError> {
943    let topic = topic(kind)?;
944    let events = log
945        .read_range(&topic, None, usize::MAX)
946        .await
947        .map_err(log_error)?;
948    events
949        .into_iter()
950        .filter(|(_, event)| event.kind == kind.request_event_kind())
951        .find_map(|(_, event)| {
952            if !event_matches_request(&event, request_id) {
953                return None;
954            }
955            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
956        })
957        .ok_or_else(|| {
958            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
959        })
960}
961
962async fn resolve_approval_state(
963    log: &Arc<AnyEventLog>,
964    kind: HitlRequestKind,
965    request: &HitlRequestEnvelope,
966) -> Result<ApprovalResolution, VmError> {
967    let quorum = approval_quorum_from_request(kind, request)?;
968    let allowed_reviewers = approval_reviewers_from_request(kind, request)
969        .into_iter()
970        .collect::<BTreeSet<_>>();
971    let mut progress = ApprovalProgress {
972        reviewers: BTreeSet::new(),
973        signatures: Vec::new(),
974        reason: None,
975        approved_at: None,
976    };
977    let topic = topic(kind)?;
978    let events = log
979        .read_range(&topic, None, usize::MAX)
980        .await
981        .map_err(log_error)?;
982    for (_, event) in events {
983        if !event_matches_request(&event, &request.request_id)
984            || event.kind != "hitl.response_received"
985        {
986            continue;
987        }
988        let response: HitlHostResponse = serde_json::from_value(event.payload)
989            .map_err(|error| VmError::Runtime(error.to_string()))?;
990        if let Some(reviewer) = response.reviewer.as_deref() {
991            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
992                continue;
993            }
994            if progress.reviewers.contains(reviewer) {
995                continue;
996            }
997        }
998        if response.approved.unwrap_or(false) {
999            if let Some(reviewer) = response.reviewer.clone() {
1000                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1001                progress.reviewers.insert(reviewer.clone());
1002                progress.signatures.push(ApprovalSignature {
1003                    reviewer: reviewer.clone(),
1004                    signed_at: signed_at.clone(),
1005                    signature: response.signature.clone().unwrap_or_else(|| {
1006                        approval_receipt_signature(
1007                            &request.request_id,
1008                            &reviewer,
1009                            &signed_at,
1010                            true,
1011                            response.reason.as_deref(),
1012                        )
1013                    }),
1014                });
1015            }
1016            progress.reason = response.reason.clone();
1017            progress.approved_at = response.responded_at.clone();
1018            if progress.reviewers.len() as u32 >= quorum {
1019                return Ok(ApprovalResolution::Approved(progress));
1020            }
1021            continue;
1022        }
1023        return Ok(ApprovalResolution::Denied(response));
1024    }
1025    Ok(ApprovalResolution::Pending)
1026}
1027
1028fn approval_quorum_from_request(
1029    kind: HitlRequestKind,
1030    request: &HitlRequestEnvelope,
1031) -> Result<u32, VmError> {
1032    let key = match kind {
1033        HitlRequestKind::DualControl => "n",
1034        _ => "quorum",
1035    };
1036    let quorum = request
1037        .payload
1038        .get(key)
1039        .or_else(|| request.payload.get("approvers_required"))
1040        .or_else(|| {
1041            request
1042                .payload
1043                .get("approval_request")
1044                .and_then(|approval| approval.get("approvers_required"))
1045        })
1046        .and_then(JsonValue::as_u64)
1047        .unwrap_or(1);
1048    u32::try_from(quorum).map_err(|_| {
1049        VmError::Runtime(format!(
1050            "invalid quorum in HITL request '{}'",
1051            request.request_id
1052        ))
1053    })
1054}
1055
1056fn approval_reviewers_from_request(
1057    kind: HitlRequestKind,
1058    request: &HitlRequestEnvelope,
1059) -> Vec<String> {
1060    let key = match kind {
1061        HitlRequestKind::DualControl => "approvers",
1062        _ => "reviewers",
1063    };
1064    request
1065        .payload
1066        .get(key)
1067        .or_else(|| {
1068            request
1069                .payload
1070                .get("approval_request")
1071                .and_then(|approval| approval.get("reviewers"))
1072        })
1073        .and_then(JsonValue::as_array)
1074        .map(|values| {
1075            values
1076                .iter()
1077                .filter_map(JsonValue::as_str)
1078                .map(str::to_string)
1079                .collect()
1080        })
1081        .unwrap_or_default()
1082}
1083
1084fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1085    json!({
1086        "approved": true,
1087        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1088        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1089        "reason": progress.reason,
1090        "signatures": progress.signatures,
1091    })
1092}
1093
1094fn approval_receipt_signature(
1095    request_id: &str,
1096    reviewer: &str,
1097    signed_at: &str,
1098    approved: bool,
1099    reason: Option<&str>,
1100) -> String {
1101    let material = format!(
1102        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1103        reason.unwrap_or("")
1104    );
1105    let hash = sha2::Sha256::digest(material.as_bytes());
1106    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1107    format!("sha256:{hex}")
1108}
1109
1110fn approval_record_from_waitpoint(
1111    record: &WaitpointRecord,
1112    builtin: &str,
1113) -> Result<VmValue, VmError> {
1114    record
1115        .value
1116        .as_ref()
1117        .map(crate::stdlib::json_to_vm_value)
1118        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1119}
1120
1121async fn approval_wait_error(
1122    log: &Arc<AnyEventLog>,
1123    kind: HitlRequestKind,
1124    request_id: &str,
1125) -> VmError {
1126    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1127        if record.status == WaitpointStatus::Cancelled
1128            && record.reason.as_deref() != Some("upstream_cancelled")
1129        {
1130            return approval_denied_error(
1131                request_id,
1132                HitlHostResponse {
1133                    request_id: request_id.to_string(),
1134                    answer: None,
1135                    approved: Some(false),
1136                    accepted: None,
1137                    reviewer: record.cancelled_by.clone(),
1138                    reason: record.reason.clone(),
1139                    metadata: record.metadata.clone(),
1140                    responded_at: record.cancelled_at.clone(),
1141                    signature: None,
1142                },
1143            );
1144        }
1145        if record.status == WaitpointStatus::Cancelled {
1146            return hitl_cancelled_error(
1147                request_id,
1148                kind,
1149                "",
1150                &[request_id.to_string()],
1151                record.reason.clone(),
1152            );
1153        }
1154    }
1155    hitl_cancelled_error(
1156        request_id,
1157        kind,
1158        "",
1159        &[request_id.to_string()],
1160        Some("upstream_cancelled".to_string()),
1161    )
1162}
1163
1164async fn append_timeout_once(
1165    log: &Arc<AnyEventLog>,
1166    kind: HitlRequestKind,
1167    request_id: &str,
1168    trace_id: &str,
1169) -> Result<(), VmError> {
1170    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1171        return Ok(());
1172    }
1173    append_timeout(log, kind, request_id, trace_id).await
1174}
1175
1176async fn hitl_event_exists(
1177    log: &Arc<AnyEventLog>,
1178    kind: HitlRequestKind,
1179    request_id: &str,
1180    event_kind: &str,
1181) -> Result<bool, VmError> {
1182    let topic = topic(kind)?;
1183    let events = log
1184        .read_range(&topic, None, usize::MAX)
1185        .await
1186        .map_err(log_error)?;
1187    Ok(events
1188        .into_iter()
1189        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1190}
1191
1192fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1193    match kind {
1194        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1195        _ => "hitl.approval_approved",
1196    }
1197}
1198
1199fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1200    match kind {
1201        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1202        _ => "hitl.approval_denied",
1203    }
1204}
1205
1206async fn append_request(
1207    log: &Arc<AnyEventLog>,
1208    request: &HitlRequestEnvelope,
1209) -> Result<(), VmError> {
1210    let topic = topic(request.kind)?;
1211    log.append(
1212        &topic,
1213        LogEvent::new(
1214            request.kind.request_event_kind(),
1215            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1216        )
1217        .with_headers(request_headers(request)),
1218    )
1219    .await
1220    .map(|_| ())
1221    .map_err(log_error)
1222}
1223
1224async fn append_named_event(
1225    log: &Arc<AnyEventLog>,
1226    kind: HitlRequestKind,
1227    event_kind: &str,
1228    request_id: &str,
1229    trace_id: &str,
1230    payload: JsonValue,
1231) -> Result<(), VmError> {
1232    let topic = topic(kind)?;
1233    let headers = headers_with_trace(request_id, trace_id);
1234    log.append(
1235        &topic,
1236        LogEvent::new(event_kind, payload).with_headers(headers),
1237    )
1238    .await
1239    .map(|_| ())
1240    .map_err(log_error)
1241}
1242
1243async fn append_timeout(
1244    log: &Arc<AnyEventLog>,
1245    kind: HitlRequestKind,
1246    request_id: &str,
1247    trace_id: &str,
1248) -> Result<(), VmError> {
1249    append_named_event(
1250        log,
1251        kind,
1252        "hitl.timeout",
1253        request_id,
1254        trace_id,
1255        serde_json::to_value(HitlTimeoutRecord {
1256            request_id: request_id.to_string(),
1257            kind,
1258            trace_id: trace_id.to_string(),
1259            timed_out_at: now_rfc3339(),
1260        })
1261        .map_err(|error| VmError::Runtime(error.to_string()))?,
1262    )
1263    .await
1264}
1265
1266async fn maybe_apply_mock_response(
1267    kind: HitlRequestKind,
1268    request_id: &str,
1269    request_payload: &JsonValue,
1270) -> Result<(), VmError> {
1271    let mut params = request_payload
1272        .as_object()
1273        .cloned()
1274        .unwrap_or_default()
1275        .into_iter()
1276        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1277        .collect::<BTreeMap<_, _>>();
1278    params.insert(
1279        "request_id".to_string(),
1280        VmValue::String(Rc::from(request_id.to_string())),
1281    );
1282    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1283        return Ok(());
1284    };
1285    let value = result?;
1286    let responses = match value {
1287        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1288        other => vec![other],
1289    };
1290    for response in responses {
1291        let response_dict = response.as_dict().ok_or_else(|| {
1292            VmError::Runtime(format!(
1293                "mocked HITL {} response must be a dict or list<dict>",
1294                kind.as_str()
1295            ))
1296        })?;
1297        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1298        append_hitl_response(None, hitl_response)
1299            .await
1300            .map_err(VmError::Runtime)?;
1301    }
1302    Ok(())
1303}
1304
1305fn parse_hitl_response_dict(
1306    request_id: &str,
1307    response_dict: &BTreeMap<String, VmValue>,
1308) -> Result<HitlHostResponse, VmError> {
1309    Ok(HitlHostResponse {
1310        request_id: request_id.to_string(),
1311        answer: response_dict
1312            .get("answer")
1313            .map(crate::llm::vm_value_to_json),
1314        approved: response_dict.get("approved").and_then(vm_bool),
1315        accepted: response_dict.get("accepted").and_then(vm_bool),
1316        reviewer: response_dict.get("reviewer").map(VmValue::display),
1317        reason: response_dict.get("reason").map(VmValue::display),
1318        metadata: response_dict
1319            .get("metadata")
1320            .map(crate::llm::vm_value_to_json),
1321        responded_at: response_dict.get("responded_at").map(VmValue::display),
1322        signature: response_dict.get("signature").map(VmValue::display),
1323    })
1324}
1325
1326fn maybe_notify_host(request: &HitlRequestEnvelope) {
1327    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1328        return;
1329    };
1330    bridge.notify(
1331        "harn.hitl.requested",
1332        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1333    );
1334}
1335
1336fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1337    let Some(value) = value else {
1338        return Ok(AskUserOptions {
1339            schema: None,
1340            timeout: Some(default_question_timeout()),
1341            default: None,
1342        });
1343    };
1344    let dict = value
1345        .as_dict()
1346        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1347    Ok(AskUserOptions {
1348        schema: dict
1349            .get("schema")
1350            .cloned()
1351            .filter(|value| !matches!(value, VmValue::Nil)),
1352        timeout: dict
1353            .get("timeout")
1354            .map(parse_duration_value)
1355            .transpose()?
1356            .or_else(|| Some(default_question_timeout())),
1357        default: dict
1358            .get("default")
1359            .cloned()
1360            .filter(|value| !matches!(value, VmValue::Nil)),
1361    })
1362}
1363
1364fn default_question_timeout() -> StdDuration {
1365    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1366}
1367
1368fn escalation_capability_policy() -> JsonValue {
1369    crate::orchestration::current_execution_policy()
1370        .and_then(|policy| serde_json::to_value(policy).ok())
1371        .unwrap_or(JsonValue::Null)
1372}
1373
1374fn parse_approval_options(
1375    value: Option<&VmValue>,
1376    builtin: &str,
1377) -> Result<ApprovalOptions, VmError> {
1378    let dict = match value {
1379        None => None,
1380        Some(VmValue::Dict(dict)) => Some(dict),
1381        Some(_) => {
1382            return Err(VmError::Runtime(format!(
1383                "{builtin}: options must be a dict"
1384            )))
1385        }
1386    };
1387    let quorum = dict
1388        .and_then(|dict| dict.get("quorum"))
1389        .and_then(VmValue::as_int)
1390        .unwrap_or(1);
1391    if quorum <= 0 {
1392        return Err(VmError::Runtime(format!(
1393            "{builtin}: quorum must be positive"
1394        )));
1395    }
1396    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1397    let capabilities_requested = optional_string_list(
1398        dict.and_then(|dict| dict.get("capabilities_requested")),
1399        builtin,
1400    )?;
1401    let evidence_refs = dict
1402        .and_then(|dict| dict.get("evidence_refs"))
1403        .map(|value| match value {
1404            VmValue::List(items) => Ok(items
1405                .iter()
1406                .map(crate::llm::vm_value_to_json)
1407                .collect::<Vec<_>>()),
1408            _ => Err(VmError::Runtime(format!(
1409                "{builtin}: evidence_refs must be a list"
1410            ))),
1411        })
1412        .transpose()?
1413        .unwrap_or_default();
1414    let deadline = dict
1415        .and_then(|dict| dict.get("deadline"))
1416        .map(parse_duration_value)
1417        .transpose()?
1418        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1419    Ok(ApprovalOptions {
1420        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1421        args: dict.and_then(|dict| dict.get("args")).cloned(),
1422        quorum: quorum as u32,
1423        reviewers,
1424        deadline,
1425        principal: dict
1426            .and_then(|dict| dict.get("principal"))
1427            .map(VmValue::display)
1428            .filter(|value| !value.is_empty()),
1429        evidence_refs,
1430        undo_metadata: dict
1431            .and_then(|dict| dict.get("undo_metadata"))
1432            .map(crate::llm::vm_value_to_json),
1433        capabilities_requested,
1434    })
1435}
1436
1437fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1438    args.get(idx)
1439        .map(VmValue::display)
1440        .filter(|value| !value.is_empty())
1441        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1442}
1443
1444fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1445    let value = args
1446        .get(idx)
1447        .and_then(VmValue::as_int)
1448        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1449    if value <= 0 {
1450        return Err(VmError::Runtime(format!(
1451            "{builtin}: expected a positive int at {idx}"
1452        )));
1453    }
1454    Ok(value)
1455}
1456
1457fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1458    let Some(value) = value else {
1459        return Ok(Vec::new());
1460    };
1461    match value {
1462        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1463        _ => Err(VmError::Runtime(format!(
1464            "{builtin}: expected list<string>"
1465        ))),
1466    }
1467}
1468
1469fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1470    match value {
1471        VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1472        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1473        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1474        _ => Err(VmError::Runtime(
1475            "expected a duration or millisecond count".to_string(),
1476        )),
1477    }
1478}
1479
1480fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1481    active_event_log()
1482        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1483}
1484
1485fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1486    if let Some(log) = active_event_log() {
1487        return Ok(log);
1488    }
1489    let Some(base_dir) = base_dir else {
1490        return Ok(install_memory_for_current_thread(
1491            HITL_EVENT_LOG_QUEUE_DEPTH,
1492        ));
1493    };
1494    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1495}
1496
1497fn current_dispatch_keys() -> Option<DispatchKeys> {
1498    let context = current_dispatch_context()?;
1499    let stable_base = context
1500        .replay_of_event_id
1501        .clone()
1502        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1503    let instance_key = format!(
1504        "{}::{}",
1505        context.trigger_event.id.0,
1506        context.replay_of_event_id.as_deref().unwrap_or("live")
1507    );
1508    Some(DispatchKeys {
1509        instance_key,
1510        stable_base,
1511        agent: context.agent_id,
1512        trace_id: context.trigger_event.trace_id.0,
1513    })
1514}
1515
1516fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1517    if let Some(keys) = dispatch_keys {
1518        let seq = REQUEST_SEQUENCE.with(|slot| {
1519            let mut state = slot.borrow_mut();
1520            if state.instance_key != keys.instance_key {
1521                state.instance_key = keys.instance_key.clone();
1522                state.next_seq = 0;
1523            }
1524            state.next_seq += 1;
1525            state.next_seq
1526        });
1527        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1528    }
1529    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1530}
1531
1532fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1533    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1534    if let Some(run_id) = request.run_id.as_ref() {
1535        headers.insert("run_id".to_string(), run_id.clone());
1536    }
1537    headers
1538}
1539
/// Headers for a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1545
1546fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1547    let mut headers = response_headers(request_id);
1548    headers.insert("trace_id".to_string(), trace_id.to_string());
1549    headers
1550}
1551
1552fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1553    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1554}
1555
1556fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1557    event
1558        .headers
1559        .get("request_id")
1560        .is_some_and(|value| value == request_id)
1561        || event
1562            .payload
1563            .get("request_id")
1564            .and_then(JsonValue::as_str)
1565            .is_some_and(|value| value == request_id)
1566}
1567
1568fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1569    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1570        "name": "ApprovalDeniedError",
1571        "category": "generic",
1572        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1573        "request_id": request_id,
1574        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1575        "reason": response.reason,
1576    })))
1577}
1578
1579fn hitl_cancelled_error(
1580    request_id: &str,
1581    kind: HitlRequestKind,
1582    wait_id: &str,
1583    waitpoint_ids: &[String],
1584    reason: Option<String>,
1585) -> VmError {
1586    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1587    let message = reason
1588        .clone()
1589        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1590    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1591        "name": "HumanCancelledError",
1592        "category": ErrorCategory::Cancelled.as_str(),
1593        "message": message,
1594        "request_id": request_id,
1595        "kind": kind.as_str(),
1596        "wait_id": wait_id,
1597        "waitpoint_ids": waitpoint_ids,
1598        "reason": reason,
1599    })))
1600}
1601
1602fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1603    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1604    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1605        "name": "HumanTimeoutError",
1606        "category": ErrorCategory::Timeout.as_str(),
1607        "message": format!("{} timed out", kind.as_str()),
1608        "request_id": request_id,
1609        "kind": kind.as_str(),
1610    })))
1611}
1612
1613fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1614    match default {
1615        VmValue::Int(_) => match value {
1616            VmValue::Int(_) => value.clone(),
1617            VmValue::Float(number) => VmValue::Int(*number as i64),
1618            VmValue::String(text) => text
1619                .parse::<i64>()
1620                .map(VmValue::Int)
1621                .unwrap_or_else(|_| default.clone()),
1622            _ => default.clone(),
1623        },
1624        VmValue::Float(_) => match value {
1625            VmValue::Float(_) => value.clone(),
1626            VmValue::Int(number) => VmValue::Float(*number as f64),
1627            VmValue::String(text) => text
1628                .parse::<f64>()
1629                .map(VmValue::Float)
1630                .unwrap_or_else(|_| default.clone()),
1631            _ => default.clone(),
1632        },
1633        VmValue::Bool(_) => match value {
1634            VmValue::Bool(_) => value.clone(),
1635            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1636            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1637            _ => default.clone(),
1638        },
1639        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1640        VmValue::Duration(_) => match value {
1641            VmValue::Duration(_) => value.clone(),
1642            VmValue::Int(ms) => VmValue::Duration(*ms),
1643            _ => default.clone(),
1644        },
1645        VmValue::Nil => value.clone(),
1646        _ => {
1647            if value.type_name() == default.type_name() {
1648                value.clone()
1649            } else {
1650                default.clone()
1651            }
1652        }
1653    }
1654}
1655
1656fn log_error(error: impl std::fmt::Display) -> VmError {
1657    VmError::Runtime(error.to_string())
1658}
1659
1660fn now_rfc3339() -> String {
1661    format_rfc3339(OffsetDateTime::now_utc())
1662}
1663
1664fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1665    timestamp
1666        .format(&Rfc3339)
1667        .unwrap_or_else(|_| timestamp.to_string())
1668}
1669
1670fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1671    time::Duration::try_from(duration)
1672        .ok()
1673        .map(|duration| format_rfc3339(requested_at + duration))
1674}
1675
1676fn new_trace_id() -> String {
1677    format!("trace_{}", Uuid::now_v7())
1678}
1679
1680fn vm_bool(value: &VmValue) -> Option<bool> {
1681    match value {
1682        VmValue::Bool(flag) => Some(*flag),
1683        _ => None,
1684    }
1685}
1686
1687fn vm_string(value: &VmValue) -> Option<&str> {
1688    match value {
1689        VmValue::String(text) => Some(text.as_ref()),
1690        _ => None,
1691    }
1692}
1693
1694fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1695    match value {
1696        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1697        _ => None,
1698    }
1699}
1700
1701#[cfg(test)]
1702mod tests {
1703    use super::{
1704        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1705    };
1706    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1707    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1708
1709    async fn execute_hitl_script(
1710        base_dir: &std::path::Path,
1711        source: &str,
1712    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1713        reset_thread_local_state();
1714        let log = install_default_for_base_dir(base_dir).expect("install event log");
1715        let chunk = compile_source(source).expect("compile source");
1716        let mut vm = Vm::new();
1717        register_vm_stdlib(&mut vm);
1718        vm.set_source_dir(base_dir);
1719        vm.execute(&chunk).await?;
1720        let output = vm.output().trim_end().to_string();
1721        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1722        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1723        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1724        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1725        Ok((
1726            output,
1727            question_events,
1728            approval_events,
1729            dual_control_events,
1730            escalation_events,
1731        ))
1732    }
1733
1734    async fn event_kinds(
1735        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1736        topic: &str,
1737    ) -> Vec<String> {
1738        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1739            .await
1740            .expect("read topic")
1741            .into_iter()
1742            .map(|(_, event)| event.kind)
1743            .collect()
1744    }
1745
1746    async fn event_payloads(
1747        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1748        topic: &str,
1749    ) -> Vec<serde_json::Value> {
1750        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1751            .await
1752            .expect("read topic")
1753            .into_iter()
1754            .map(|(_, event)| event.payload)
1755            .collect()
1756    }
1757
1758    #[tokio::test(flavor = "current_thread")]
1759    async fn ask_user_coerces_to_default_type_and_logs_events() {
1760        tokio::task::LocalSet::new()
1761            .run_until(async {
1762                let dir = tempfile::tempdir().expect("tempdir");
1763                let source = r#"
1764pipeline test(task) {
1765  host_mock("hitl", "question", {answer: "9"})
1766  let answer: int = ask_user("Pick a number", {default: 0})
1767  println(answer)
1768}
1769"#;
1770                let (
1771                    output,
1772                    question_events,
1773                    approval_events,
1774                    dual_control_events,
1775                    escalation_events,
1776                ) = execute_hitl_script(dir.path(), source)
1777                    .await
1778                    .expect("script succeeds");
1779                assert_eq!(output, "9");
1780                assert_eq!(
1781                    question_events,
1782                    vec![
1783                        "hitl.question_asked".to_string(),
1784                        "hitl.response_received".to_string()
1785                    ]
1786                );
1787                assert!(approval_events.is_empty());
1788                assert!(dual_control_events.is_empty());
1789                assert!(escalation_events.is_empty());
1790            })
1791            .await;
1792    }
1793
1794    #[tokio::test(flavor = "current_thread")]
1795    async fn request_approval_waits_for_quorum_and_emits_a_record() {
1796        tokio::task::LocalSet::new()
1797            .run_until(async {
1798                let dir = tempfile::tempdir().expect("tempdir");
1799                let source = r#"
1800pipeline test(task) {
1801  host_mock("hitl", "approval", [
1802    {approved: true, reviewer: "alice", reason: "ok"},
1803    {approved: true, reviewer: "bob", reason: "ship it"},
1804  ])
1805  let record = request_approval(
1806    "deploy production",
1807    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1808  )
1809  println(record.approved)
1810  println(len(record.reviewers))
1811  println(record.reviewers[0])
1812  println(record.reviewers[1])
1813}
1814"#;
1815                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1816                    .await
1817                    .expect("script succeeds");
1818                assert_eq!(
1819                    approval_events,
1820                    vec![
1821                        "hitl.approval_requested".to_string(),
1822                        "hitl.response_received".to_string(),
1823                        "hitl.response_received".to_string(),
1824                        "hitl.approval_approved".to_string(),
1825                    ]
1826                );
1827            })
1828            .await;
1829    }
1830
1831    #[tokio::test(flavor = "current_thread")]
1832    async fn request_approval_emits_canonical_approval_request_payload() {
1833        tokio::task::LocalSet::new()
1834            .run_until(async {
1835                reset_thread_local_state();
1836                let dir = tempfile::tempdir().expect("tempdir");
1837                let log = install_default_for_base_dir(dir.path()).expect("install event log");
1838                let source = r#"
1839pipeline test(task) {
1840  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
1841  request_approval("deploy production", {
1842    args: {environment: "prod"},
1843    quorum: 1,
1844    reviewers: ["alice"],
1845    evidence_refs: [{kind: "run", uri: "run_123"}],
1846    undo_metadata: {strategy: "rollback"},
1847    capabilities_requested: ["deploy.production"],
1848  })
1849}
1850"#;
1851                let chunk = compile_source(source).expect("compile source");
1852                let mut vm = Vm::new();
1853                register_vm_stdlib(&mut vm);
1854                vm.set_source_dir(dir.path());
1855                vm.execute(&chunk).await.expect("script succeeds");
1856
1857                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
1858                let request_payload = &payloads[0]["payload"];
1859                let approval_request = &request_payload["approval_request"];
1860                assert_eq!(approval_request["id"], request_payload["id"]);
1861                assert_eq!(approval_request["action"], "deploy production");
1862                assert_eq!(approval_request["args"]["environment"], "prod");
1863                assert_eq!(approval_request["approvers_required"], 1);
1864                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
1865                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
1866                assert_eq!(
1867                    approval_request["capabilities_requested"][0],
1868                    "deploy.production"
1869                );
1870                assert!(approval_request["requested_at"].as_str().is_some());
1871                assert!(approval_request["deadline"].as_str().is_some());
1872            })
1873            .await;
1874    }
1875
1876    #[tokio::test(flavor = "current_thread")]
1877    async fn request_approval_surfaces_denials_as_typed_errors() {
1878        tokio::task::LocalSet::new()
1879            .run_until(async {
1880                let dir = tempfile::tempdir().expect("tempdir");
1881                let source = r#"
1882pipeline test(task) {
1883  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
1884  let denied = try {
1885    request_approval("drop table", {reviewers: ["alice"]})
1886  }
1887  println(is_err(denied))
1888  println(unwrap_err(denied).name)
1889  println(unwrap_err(denied).reason)
1890}
1891"#;
1892                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1893                    .await
1894                    .expect("script succeeds");
1895                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
1896                assert_eq!(
1897                    approval_events,
1898                    vec![
1899                        "hitl.approval_requested".to_string(),
1900                        "hitl.response_received".to_string(),
1901                        "hitl.approval_denied".to_string(),
1902                    ]
1903                );
1904            })
1905            .await;
1906    }
1907
1908    #[tokio::test(flavor = "current_thread")]
1909    async fn dual_control_executes_action_after_quorum() {
1910        tokio::task::LocalSet::new()
1911            .run_until(async {
1912                let dir = tempfile::tempdir().expect("tempdir");
1913                let source = r#"
1914pipeline test(task) {
1915  host_mock("hitl", "dual_control", [
1916    {approved: true, reviewer: "alice"},
1917    {approved: true, reviewer: "bob"},
1918  ])
1919  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
1920  println(result)
1921}
1922"#;
1923                let (output, _, _, dual_control_events, _) =
1924                    execute_hitl_script(dir.path(), source)
1925                        .await
1926                        .expect("script succeeds");
1927                assert_eq!(output, "launched");
1928                assert_eq!(
1929                    dual_control_events,
1930                    vec![
1931                        "hitl.dual_control_requested".to_string(),
1932                        "hitl.response_received".to_string(),
1933                        "hitl.response_received".to_string(),
1934                        "hitl.dual_control_approved".to_string(),
1935                        "hitl.dual_control_executed".to_string(),
1936                    ]
1937                );
1938            })
1939            .await;
1940    }
1941
1942    #[tokio::test(flavor = "current_thread")]
1943    async fn escalate_to_waits_for_acceptance_event() {
1944        tokio::task::LocalSet::new()
1945            .run_until(async {
1946                let dir = tempfile::tempdir().expect("tempdir");
1947                let source = r#"
1948pipeline test(task) {
1949  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
1950  let handle = escalate_to("admin", "need override")
1951  println(handle.status)
1952  println(handle.reviewer)
1953}
1954"#;
1955                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
1956                    .await
1957                    .expect("script succeeds");
1958                assert_eq!(output, "accepted\nlead");
1959                assert_eq!(
1960                    escalation_events,
1961                    vec![
1962                        "hitl.escalation_issued".to_string(),
1963                        "hitl.escalation_accepted".to_string(),
1964                    ]
1965                );
1966            })
1967            .await;
1968    }
1969}