Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::runtime_limits::RuntimeLimits;
20use crate::schema::schema_expect_value;
21use crate::stdlib::host::dispatch_mock_host_call;
22use crate::stdlib::macros::{harn_builtin, BuiltinSignature, Param, VmBuiltinDef, TY_ANY, TY_DICT};
23use crate::stdlib::waitpoint::{
24    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
25    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
26    WaitpointWaitOptions,
27};
28use crate::triggers::dispatcher::current_dispatch_context;
29use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
30use crate::vm::{clone_async_builtin_child_vm, Vm};
31
/// Event-log queue depth, taken from the default runtime limits.
// NOTE(review): presumably used when installing the HITL event log — the
// use site is outside this part of the file; confirm.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Timeout applied to approval and dual-control requests: 24 hours in milliseconds.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// 24 hours in milliseconds.
// NOTE(review): presumably the default `ask_user` timeout — confirm against
// the option parser, which is outside this part of the file.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic used for [`HitlRequestKind::Question`] requests.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for [`HitlRequestKind::Approval`] requests.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for [`HitlRequestKind::DualControl`] requests.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for [`HitlRequestKind::Escalation`] requests.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
40
thread_local! {
    // Per-thread request-sequence state. It starts at the `Default` value and
    // is cleared, moved out, or overwritten by `reset_hitl_state`,
    // `take_hitl_state` and `restore_hitl_state` respectively.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
44
/// Value stored in the thread-local `REQUEST_SEQUENCE` slot.
///
/// The `Default` value (empty key, sequence `0`) is what `reset_hitl_state`
/// installs and what `take_hitl_state` leaves behind.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    // NOTE(review): presumably identifies the dispatch instance the counter
    // belongs to (compare `DispatchKeys::instance_key`) — confirm against
    // `next_request_id`, which is outside this part of the file.
    pub(crate) instance_key: String,
    // NOTE(review): presumably the next sequence number to hand out — confirm.
    pub(crate) next_seq: u64,
}
50
/// The kinds of human-in-the-loop (HITL) request this module can raise.
///
/// Serialized with serde in snake case (`question`, `approval`,
/// `dual_control`, `escalation`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Raised by the `ask_user` builtin.
    Question,
    /// Raised by the `request_approval` builtin.
    Approval,
    /// Raised by the `dual_control` builtin.
    DualControl,
    /// Raised by the `escalate_to` builtin.
    Escalation,
}
59
60impl HitlRequestKind {
61    pub(crate) fn as_str(self) -> &'static str {
62        match self {
63            Self::Question => "question",
64            Self::Approval => "approval",
65            Self::DualControl => "dual_control",
66            Self::Escalation => "escalation",
67        }
68    }
69
70    fn topic(self) -> &'static str {
71        match self {
72            Self::Question => HITL_QUESTIONS_TOPIC,
73            Self::Approval => HITL_APPROVALS_TOPIC,
74            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
75            Self::Escalation => HITL_ESCALATIONS_TOPIC,
76        }
77    }
78
79    fn request_event_kind(self) -> &'static str {
80        match self {
81            Self::Question => "hitl.question_asked",
82            Self::Approval => "hitl.approval_requested",
83            Self::DualControl => "hitl.dual_control_requested",
84            Self::Escalation => "hitl.escalation_issued",
85        }
86    }
87
88    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
89        if request_id.starts_with("hitl_question_") {
90            Some(Self::Question)
91        } else if request_id.starts_with("hitl_approval_") {
92            Some(Self::Approval)
93        } else if request_id.starts_with("hitl_dual_control_") {
94            Some(Self::DualControl)
95        } else if request_id.starts_with("hitl_escalation_") {
96            Some(Self::Escalation)
97        } else {
98            None
99        }
100    }
101}
102
/// A response to a HITL request, as recorded by `append_hitl_response`.
///
/// Every optional field is left out of the serialized form when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; `append_hitl_response` rejects ids
    /// whose prefix `HitlRequestKind::from_request_id` does not recognize.
    pub request_id: String,
    // NOTE(review): presumably the answer to a question request — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    // NOTE(review): presumably the verdict for approval / dual-control requests — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    // NOTE(review): presumably the verdict for escalation requests — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    // NOTE(review): presumably identifies who responded — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills in the current time
    /// when this is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
123
/// Envelope describing one HITL request. The builtin implementations build
/// it, then pass it to `create_request_waitpoint`, `append_request` and
/// `maybe_notify_host`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Id produced by `next_request_id`; also used as the waitpoint id.
    request_id: String,
    kind: HitlRequestKind,
    /// Agent name from the dispatch keys; empty when there are none.
    /// Defaults to an empty string when missing on deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id from the dispatch keys, or a newly generated one.
    trace_id: String,
    /// Run id of the current mutation session, if any; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Timestamp string captured when the request was built.
    requested_at: String,
    /// Kind-specific JSON payload.
    payload: JsonValue,
}
136
/// Serializable record of a request timeout.
// NOTE(review): presumably the payload written by `append_timeout_once` —
// that function is outside this part of the file; confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    request_id: String,
    kind: HitlRequestKind,
    trace_id: String,
    timed_out_at: String,
}
144
/// Structured description of an action awaiting approval.
///
/// It is embedded as `approval_request` in the payload of approval and
/// dual-control requests.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalRequest {
    /// Request id; the builtins set this to the HITL request id.
    pub id: String,
    /// Name of the action being approved.
    pub action: String,
    /// Arguments of the action; defaults to JSON `null` when missing on deserialization.
    #[serde(default)]
    pub args: JsonValue,
    /// Principal on whose behalf the action is requested.
    pub principal: String,
    /// Timestamp string for when the request was made.
    pub requested_at: String,
    /// Optional deadline; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required; `ApprovalRequest::new` sets it to 1.
    pub approvers_required: u32,
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    #[serde(default)]
    pub undo_metadata: JsonValue,
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
163
164impl ApprovalRequest {
165    pub fn new(
166        id: impl Into<String>,
167        action: impl Into<String>,
168        args: JsonValue,
169        principal: impl Into<String>,
170        requested_at: impl Into<String>,
171    ) -> Self {
172        Self {
173            id: id.into(),
174            action: action.into(),
175            args,
176            principal: principal.into(),
177            requested_at: requested_at.into(),
178            deadline: None,
179            approvers_required: 1,
180            evidence_refs: Vec::new(),
181            undo_metadata: JsonValue::Null,
182            capabilities_requested: Vec::new(),
183        }
184    }
185}
186
187pub(crate) fn approval_request_for_host_permission(
188    id: impl Into<String>,
189    action: impl Into<String>,
190    args: JsonValue,
191    principal: impl Into<String>,
192    evidence_refs: Vec<JsonValue>,
193    undo_metadata: JsonValue,
194    capabilities_requested: Vec<String>,
195) -> ApprovalRequest {
196    let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
197    request.evidence_refs = evidence_refs;
198    request.undo_metadata = undo_metadata;
199    request.capabilities_requested = capabilities_requested;
200    request
201}
202
/// Identifiers describing the current dispatch. The builtins read `agent`
/// and `trace_id` from it when building request envelopes.
#[derive(Clone, Debug)]
struct DispatchKeys {
    // NOTE(review): presumably feeds request-id sequencing (see
    // `RequestSequenceState::instance_key`) — confirm against `next_request_id`.
    instance_key: String,
    // NOTE(review): presumably the stable prefix for generated request ids — confirm.
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id copied into request envelopes.
    trace_id: String,
}
210
/// Options accepted by `ask_user`.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// When set, the answer is validated with `schema_expect_value`.
    schema: Option<VmValue>,
    /// How long to wait for an answer.
    timeout: Option<StdDuration>,
    /// Returned on timeout; without a schema, also passed to
    /// `coerce_like_default` together with the answer.
    default: Option<VmValue>,
}
217
/// Options accepted by `request_approval`.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Free-form detail; used as the approval arguments when `args` is absent.
    detail: Option<VmValue>,
    /// Explicit approval arguments; takes precedence over `detail`.
    args: Option<VmValue>,
    /// Number of approvers required.
    quorum: u32,
    /// Reviewer names included in the request payload.
    reviewers: Vec<String>,
    /// How long to wait for the approval.
    deadline: StdDuration,
    /// Principal override; the dispatching agent is used when `None`.
    principal: Option<String>,
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata; the serialized current mutation session is used when `None`.
    undo_metadata: Option<JsonValue>,
    capabilities_requested: Vec<String>,
}
230
/// Approval state gathered for one request.
// NOTE(review): the code that fills this in is outside this part of the
// file; the field meanings are inferred from their names — confirm.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    request_id: String,
    // A set, so each reviewer appears at most once.
    reviewers: BTreeSet<String>,
    signatures: Vec<ApprovalSignature>,
    reason: Option<String>,
    approved_at: Option<String>,
}
239
/// One reviewer's signature entry; serialize-only (no `Deserialize` derive).
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    reviewer: String,
    signed_at: String,
    signature: String,
}
246
/// Possible states of an approval request.
// NOTE(review): the logic that produces these states is outside this part of
// the file; the variant notes below are inferred from the names and payload
// types — confirm.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// No resolution yet.
    Pending,
    /// Approved; carries the accumulated progress.
    Approved(ApprovalProgress),
    /// Denied; carries the host response.
    Denied(HitlHostResponse),
}
253
// `Completed` carries the full `WaitpointRecord`, which dominates the
// enum's size — boxing it would force every match arm to indirect even
// though the enum is dropped within nanoseconds of being constructed
// (it's a local return type for the waitpoint poll loop, never stored).
// Surfaced by the host-target compile of `harn-vm` introduced when
// `harn-cli`'s build script gained `harn-vm` as a build-dep for the
// AOT bytecode embedding pass (G7 / harn#2300).
/// Result of waiting on a request's waitpoint.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The wait returned a record for the waitpoint.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled; the fields are copied from the wait failure.
    Cancelled {
        wait_id: String,
        waitpoint_ids: Vec<String>,
        reason: Option<String>,
    },
}
272
273pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
274    for def in MODULE_BUILTINS {
275        vm.register_builtin_def(def);
276    }
277}
278
/// Definitions of the HITL builtins, in the order `register_hitl_builtins`
/// registers them.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &ASK_USER_BUILTIN_DEF,
    &REQUEST_APPROVAL_BUILTIN_DEF,
    &DUAL_CONTROL_BUILTIN_DEF,
    &ESCALATE_TO_BUILTIN_DEF,
];
285
// `ask_user(prompt, options?)` builtin entry point: forwards its arguments
// unchanged to `ask_user_impl`.
#[harn_builtin(
    sig = "ask_user(prompt: string, options?: dict) -> any",
    kind = "async",
    category = "hitl"
)]
async fn ask_user_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    ask_user_impl(&args).await
}
294
// `request_approval(...)` builtin entry point, declared with a variadic
// signature: forwards its arguments unchanged to `request_approval_impl`.
#[harn_builtin(
    sig_expr = BuiltinSignature::variadic("request_approval", &[Param::new("args", TY_ANY)], TY_DICT),
    kind = "async",
    category = "hitl"
)]
async fn request_approval_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    request_approval_impl(&args).await
}
303
// `dual_control(n, m, action, approvers?)` builtin entry point: forwards its
// arguments unchanged to `dual_control_impl`.
#[harn_builtin(
    sig = "dual_control(n: int, m: int, action: closure, approvers?: list) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn dual_control_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    dual_control_impl(&args).await
}
312
// `escalate_to(role, reason)` builtin entry point: forwards its arguments
// unchanged to `escalate_to_impl`.
#[harn_builtin(
    sig = "escalate_to(role: string, reason: string) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn escalate_to_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    escalate_to_impl(&args).await
}
321
322pub(crate) fn reset_hitl_state() {
323    REQUEST_SEQUENCE.with(|slot| {
324        *slot.borrow_mut() = RequestSequenceState::default();
325    });
326}
327
328pub(crate) fn take_hitl_state() -> RequestSequenceState {
329    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
330}
331
332pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
333    REQUEST_SEQUENCE.with(|slot| {
334        *slot.borrow_mut() = state;
335    });
336}
337
338pub async fn append_hitl_response(
339    base_dir: Option<&Path>,
340    mut response: HitlHostResponse,
341) -> Result<u64, String> {
342    let kind = HitlRequestKind::from_request_id(&response.request_id)
343        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
344    if response.responded_at.is_none() {
345        response.responded_at = Some(now_rfc3339());
346    }
347    let log = ensure_hitl_event_log_for(base_dir)?;
348    let headers = response_headers(&response.request_id);
349    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
350    let event_id = log
351        .append(
352            &topic,
353            LogEvent::new(
354                match kind {
355                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
356                    _ => "hitl.response_received",
357                },
358                serde_json::to_value(&response).map_err(|error| error.to_string())?,
359            )
360            .with_headers(headers),
361        )
362        .await
363        .map_err(|error| error.to_string())?;
364    finalize_hitl_response(&log, kind, &response).await?;
365    Ok(event_id)
366}
367
368pub async fn append_approval_request_on(
369    log: &Arc<AnyEventLog>,
370    agent: impl Into<String>,
371    trace_id: impl Into<String>,
372    action: impl Into<String>,
373    detail: JsonValue,
374    reviewers: Vec<String>,
375) -> Result<String, VmError> {
376    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
377    let trace_id = trace_id.into();
378    let agent = agent.into();
379    let requested_at_time = OffsetDateTime::now_utc();
380    let requested_at = format_rfc3339(requested_at_time);
381    let mut approval_request = ApprovalRequest::new(
382        request_id.clone(),
383        action.into(),
384        detail.clone(),
385        agent.clone(),
386        requested_at.clone(),
387    );
388    approval_request.deadline = deadline_after(
389        requested_at_time,
390        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
391    );
392    approval_request.approvers_required = 1;
393    let approval_request_json = serde_json::to_value(&approval_request)
394        .map_err(|error| VmError::Runtime(error.to_string()))?;
395    let request = HitlRequestEnvelope {
396        request_id: request_id.clone(),
397        kind: HitlRequestKind::Approval,
398        agent,
399        trace_id: trace_id.clone(),
400        run_id: None,
401        requested_at: requested_at.clone(),
402        payload: json!({
403            "approval_request": approval_request_json,
404            "id": approval_request.id,
405            "action": approval_request.action,
406            "args": approval_request.args,
407            "principal": approval_request.principal,
408            "requested_at": requested_at,
409            "deadline": approval_request.deadline,
410            "approvers_required": approval_request.approvers_required,
411            "evidence_refs": approval_request.evidence_refs,
412            "undo_metadata": approval_request.undo_metadata,
413            "capabilities_requested": approval_request.capabilities_requested,
414            "detail": detail,
415            "quorum": 1,
416            "reviewers": reviewers,
417            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
418        }),
419    };
420    create_request_waitpoint(log, &request).await?;
421    append_request(log, &request).await?;
422    maybe_notify_host(&request);
423    Ok(request_id)
424}
425
426async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
427    let prompt = required_string_arg(args, 0, "ask_user")?;
428    let options = parse_ask_user_options(args.get(1))?;
429    let keys = current_dispatch_keys();
430    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
431    let trace_id = keys
432        .as_ref()
433        .map(|keys| keys.trace_id.clone())
434        .unwrap_or_else(new_trace_id);
435    let log = ensure_hitl_event_log();
436    let request = HitlRequestEnvelope {
437        request_id: request_id.clone(),
438        kind: HitlRequestKind::Question,
439        agent: keys
440            .as_ref()
441            .map(|keys| keys.agent.clone())
442            .unwrap_or_default(),
443        trace_id: trace_id.clone(),
444        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
445        requested_at: now_rfc3339(),
446        payload: json!({
447            "prompt": prompt,
448            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
449            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
450            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
451        }),
452    };
453    create_request_waitpoint(&log, &request).await?;
454    append_request(&log, &request).await?;
455    maybe_notify_host(&request);
456    emit_hitl_requested(&request);
457    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
458
459    match wait_for_request_waitpoint_with_events(
460        &request_id,
461        HitlRequestKind::Question,
462        options.timeout,
463    )
464    .await?
465    {
466        WaitpointOutcome::Completed(record) => {
467            let answer = record
468                .value
469                .as_ref()
470                .map(crate::stdlib::json_to_vm_value)
471                .unwrap_or(VmValue::Nil);
472            if let Some(schema) = options.schema.as_ref() {
473                return schema_expect_value(&answer, schema, true);
474            }
475            if let Some(default) = options.default.as_ref() {
476                return Ok(coerce_like_default(&answer, default));
477            }
478            Ok(answer)
479        }
480        WaitpointOutcome::Timeout => {
481            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
482            if let Some(default) = options.default {
483                return Ok(default);
484            }
485            Err(timeout_error(&request_id, HitlRequestKind::Question))
486        }
487        WaitpointOutcome::Cancelled {
488            wait_id,
489            waitpoint_ids,
490            reason,
491        } => Err(hitl_cancelled_error(
492            &request_id,
493            HitlRequestKind::Question,
494            &wait_id,
495            &waitpoint_ids,
496            reason,
497        )),
498    }
499}
500
/// Implements `request_approval`: records an approval request, then waits
/// for its resolution up to `options.deadline`.
///
/// Returns the approval record built by `approval_record_from_waitpoint` on
/// completion. A timeout appends a timeout event and yields a timeout error;
/// a cancelled wait yields the error produced by `approval_wait_error`.
async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    // Without dispatch keys, use a fresh trace id and an empty agent name.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    // The principal defaults to the dispatching agent.
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    // Explicit `args` win over `detail`; with neither, the args are JSON null.
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    // Undo metadata falls back to the serialized current mutation session,
    // and to JSON null when there is none or it cannot be serialized.
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        // The payload repeats the approval fields at the top level next to
        // the nested `approval_request` object.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    // Create the waitpoint first, then publish the request, then apply any
    // mock response, and only then start waiting.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
595
596pub(crate) async fn request_approval_for_side_effect(
597    action: &str,
598    detail: JsonValue,
599    principal: String,
600    reviewers: Vec<String>,
601    capabilities_requested: Vec<String>,
602) -> Result<VmValue, VmError> {
603    let mut options = BTreeMap::new();
604    options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
605    options.insert(
606        "detail".to_string(),
607        crate::stdlib::json_to_vm_value(&detail),
608    );
609    options.insert(
610        "principal".to_string(),
611        VmValue::String(Rc::from(principal)),
612    );
613    options.insert(
614        "reviewers".to_string(),
615        VmValue::List(Rc::new(
616            reviewers
617                .into_iter()
618                .map(|reviewer| VmValue::String(Rc::from(reviewer)))
619                .collect(),
620        )),
621    );
622    options.insert(
623        "capabilities_requested".to_string(),
624        VmValue::List(Rc::new(
625            capabilities_requested
626                .into_iter()
627                .map(|capability| VmValue::String(Rc::from(capability)))
628                .collect(),
629        )),
630    );
631    let args = vec![
632        VmValue::String(Rc::from(action.to_string())),
633        VmValue::Dict(Rc::new(options)),
634    ];
635    request_approval_impl(&args).await
636}
637
638async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
639    let n = required_positive_int_arg(args, 0, "dual_control")?;
640    let m = required_positive_int_arg(args, 1, "dual_control")?;
641    if n > m {
642        return Err(VmError::Runtime(
643            "dual_control: n must be less than or equal to m".to_string(),
644        ));
645    }
646    let action = args
647        .get(2)
648        .and_then(|value| match value {
649            VmValue::Closure(closure) => Some(closure.clone()),
650            _ => None,
651        })
652        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
653    let approvers = optional_string_list(args.get(3), "dual_control")?;
654    if !approvers.is_empty() && approvers.len() < m as usize {
655        return Err(VmError::Runtime(format!(
656            "dual_control: expected at least {m} approvers, got {}",
657            approvers.len()
658        )));
659    }
660
661    let keys = current_dispatch_keys();
662    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
663    let trace_id = keys
664        .as_ref()
665        .map(|keys| keys.trace_id.clone())
666        .unwrap_or_else(new_trace_id);
667    let action_name = if action.func.name.is_empty() {
668        "anonymous".to_string()
669    } else {
670        action.func.name.clone()
671    };
672    let agent = keys
673        .as_ref()
674        .map(|keys| keys.agent.clone())
675        .unwrap_or_default();
676    let requested_at_time = OffsetDateTime::now_utc();
677    let requested_at = format_rfc3339(requested_at_time);
678    let mut approval_request = ApprovalRequest::new(
679        request_id.clone(),
680        action_name.clone(),
681        json!({
682            "n": n,
683            "m": m,
684            "approvers": approvers.clone(),
685        }),
686        agent.clone(),
687        requested_at.clone(),
688    );
689    approval_request.deadline = deadline_after(
690        requested_at_time,
691        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
692    );
693    approval_request.approvers_required = n as u32;
694    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
695        .and_then(|session| serde_json::to_value(session).ok())
696        .unwrap_or(JsonValue::Null);
697    let approval_request_json = serde_json::to_value(&approval_request)
698        .map_err(|error| VmError::Runtime(error.to_string()))?;
699    let log = ensure_hitl_event_log();
700    let request = HitlRequestEnvelope {
701        request_id: request_id.clone(),
702        kind: HitlRequestKind::DualControl,
703        agent,
704        trace_id: trace_id.clone(),
705        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
706        requested_at: requested_at.clone(),
707        payload: json!({
708            "approval_request": approval_request_json,
709            "id": approval_request.id,
710            "args": approval_request.args,
711            "principal": approval_request.principal,
712            "requested_at": requested_at,
713            "deadline": approval_request.deadline,
714            "approvers_required": approval_request.approvers_required,
715            "evidence_refs": approval_request.evidence_refs,
716            "undo_metadata": approval_request.undo_metadata,
717            "capabilities_requested": approval_request.capabilities_requested,
718            "n": n,
719            "m": m,
720            "action": action_name,
721            "approvers": approvers,
722            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
723        }),
724    };
725    create_request_waitpoint(&log, &request).await?;
726    append_request(&log, &request).await?;
727    maybe_notify_host(&request);
728    emit_hitl_requested(&request);
729    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
730
731    match wait_for_request_waitpoint_with_events(
732        &request_id,
733        HitlRequestKind::DualControl,
734        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
735    )
736    .await?
737    {
738        WaitpointOutcome::Completed(record) => {
739            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
740            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
741                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
742            })?;
743            let result = vm.call_closure_pub(&action, &[]).await?;
744
745            append_named_event(
746                &log,
747                HitlRequestKind::DualControl,
748                "hitl.dual_control_executed",
749                &request_id,
750                &trace_id,
751                json!({
752                    "request_id": request_id,
753                    "result": crate::llm::vm_value_to_json(&result),
754                }),
755            )
756            .await?;
757
758            Ok(result)
759        }
760        WaitpointOutcome::Timeout => {
761            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
762            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
763        }
764        WaitpointOutcome::Cancelled { .. } => {
765            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
766        }
767    }
768}
769
770async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
771    let role = required_string_arg(args, 0, "escalate_to")?;
772    let reason = required_string_arg(args, 1, "escalate_to")?;
773    let keys = current_dispatch_keys();
774    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
775    let trace_id = keys
776        .as_ref()
777        .map(|keys| keys.trace_id.clone())
778        .unwrap_or_else(new_trace_id);
779    let log = ensure_hitl_event_log();
780    let request = HitlRequestEnvelope {
781        request_id: request_id.clone(),
782        kind: HitlRequestKind::Escalation,
783        agent: keys
784            .as_ref()
785            .map(|keys| keys.agent.clone())
786            .unwrap_or_default(),
787        trace_id: trace_id.clone(),
788        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
789        requested_at: now_rfc3339(),
790        payload: json!({
791            "role": role,
792            "reason": reason,
793            "capability_policy": escalation_capability_policy(),
794        }),
795    };
796    create_request_waitpoint(&log, &request).await?;
797    append_request(&log, &request).await?;
798    maybe_notify_host(&request);
799    emit_hitl_requested(&request);
800    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
801
802    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
803        .await?
804    {
805        WaitpointOutcome::Completed(record) => {
806            let accepted_at = record.completed_at.clone();
807            let reviewer = record.completed_by.clone();
808            let accepted = record
809                .value
810                .as_ref()
811                .and_then(|value| value.get("accepted"))
812                .and_then(JsonValue::as_bool)
813                .unwrap_or(true);
814            Ok(crate::stdlib::json_to_vm_value(&json!({
815                "request_id": request_id,
816                "role": role,
817                "reason": reason,
818                "trace_id": trace_id,
819                "status": if accepted { "accepted" } else { "pending" },
820                "accepted_at": accepted_at,
821                "reviewer": reviewer,
822            })))
823        }
824        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
825        WaitpointOutcome::Cancelled {
826            wait_id,
827            waitpoint_ids,
828            reason,
829        } => Err(hitl_cancelled_error(
830            &request_id,
831            HitlRequestKind::Escalation,
832            &wait_id,
833            &waitpoint_ids,
834            reason,
835        )),
836    }
837}
838
839async fn create_request_waitpoint(
840    log: &Arc<AnyEventLog>,
841    request: &HitlRequestEnvelope,
842) -> Result<(), VmError> {
843    create_waitpoint_on(
844        log,
845        Some(request.request_id.clone()),
846        Some(json!({
847            "kind": request.kind.as_str(),
848            "agent": request.agent.clone(),
849            "trace_id": request.trace_id.clone(),
850            "requested_at": request.requested_at.clone(),
851            "payload": request.payload.clone(),
852        })),
853    )
854    .await?;
855    Ok(())
856}
857
858async fn wait_for_request_waitpoint(
859    request_id: &str,
860    timeout: Option<StdDuration>,
861) -> Result<WaitpointOutcome, VmError> {
862    match wait_on_waitpoints(
863        vec![request_id.to_string()],
864        WaitpointWaitOptions { timeout },
865    )
866    .await
867    {
868        Ok(records) => Ok(WaitpointOutcome::Completed(
869            records
870                .into_iter()
871                .next()
872                .expect("single waitpoint wait result"),
873        )),
874        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
875        Err(WaitpointWaitFailure::Cancelled {
876            wait_id,
877            waitpoint_ids,
878            reason,
879        }) => Ok(WaitpointOutcome::Cancelled {
880            wait_id,
881            waitpoint_ids,
882            reason,
883        }),
884        Err(WaitpointWaitFailure::Vm(error)) => {
885            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
886                return Ok(outcome);
887            }
888            Err(error)
889        }
890    }
891}
892
893fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
894    let VmError::Thrown(VmValue::Dict(dict)) = error else {
895        return None;
896    };
897    let name = dict.get("name").and_then(vm_string)?;
898    match name {
899        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
900        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
901            wait_id: dict
902                .get("wait_id")
903                .and_then(vm_string)
904                .unwrap_or_default()
905                .to_string(),
906            waitpoint_ids: dict
907                .get("waitpoint_ids")
908                .and_then(vm_string_list)
909                .unwrap_or_default(),
910            reason: dict
911                .get("reason")
912                .and_then(vm_string)
913                .map(ToString::to_string),
914        }),
915        _ => None,
916    }
917}
918
/// Applies a host response to the waitpoint backing its HITL request.
///
/// What happens depends on `kind`:
/// - `Question`: the waitpoint is completed with the response's answer.
/// - `Escalation`: the waitpoint is completed only when the response has
///   `accepted == Some(true)`; any other response is ignored.
/// - `Approval` / `DualControl`: the approval state is re-derived from the
///   event log and, once decided, an approved/denied event is appended before
///   the waitpoint is completed or cancelled.
///
/// In every branch a waitpoint that is already terminal (see
/// `waitpoint_is_terminal`) is left untouched and `Ok(())` is returned.
///
/// # Errors
/// Any failure from the event log or waitpoint helpers is returned as its
/// string rendering.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            // Nothing to do if the waitpoint was already resolved.
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // A response that is not an explicit acceptance leaves the
            // waitpoint as it is. Because of `||` short-circuiting, the
            // terminal check is only performed for accepted responses.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            // The decision is computed from the logged request and responses
            // rather than from `response` alone.
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                // No decision yet: leave the waitpoint open.
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // The approved event is appended first; the waitpoint is
                    // completed only if that append succeeds.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    // Same ordering for denial: append the event, then cancel
                    // the waitpoint using the denying response's details.
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
1033
1034async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1035    Ok(inspect_waitpoint_on(log, request_id)
1036        .await
1037        .map_err(|error| error.to_string())?
1038        .is_some_and(|record| record.status != WaitpointStatus::Open))
1039}
1040
1041async fn load_request_envelope(
1042    log: &Arc<AnyEventLog>,
1043    kind: HitlRequestKind,
1044    request_id: &str,
1045) -> Result<HitlRequestEnvelope, VmError> {
1046    let topic = topic(kind)?;
1047    let events = log
1048        .read_range(&topic, None, usize::MAX)
1049        .await
1050        .map_err(log_error)?;
1051    events
1052        .into_iter()
1053        .filter(|(_, event)| event.kind == kind.request_event_kind())
1054        .find_map(|(_, event)| {
1055            if !event_matches_request(&event, request_id) {
1056                return None;
1057            }
1058            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1059        })
1060        .ok_or_else(|| {
1061            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1062        })
1063}
1064
/// Re-derives the approval decision for `request` by replaying the
/// `hitl.response_received` events logged for it.
///
/// Events are processed in the order `read_range` returns them:
/// - a response from a reviewer outside the request's reviewer list (when
///   that list is non-empty), or from a reviewer who has already approved,
///   is ignored;
/// - an approving response records the reviewer and a signature, and the
///   result is `Approved` once the number of distinct recorded reviewers
///   reaches the quorum;
/// - any other response that is not skipped yields `Denied` immediately.
///
/// If the log is exhausted without a decision the result is `Pending`.
///
/// # Errors
/// Fails when the quorum cannot be read, the log cannot be read, or a
/// matching event payload does not deserialize into a `HitlHostResponse`.
async fn resolve_approval_state(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    request: &HitlRequestEnvelope,
) -> Result<ApprovalResolution, VmError> {
    let quorum = approval_quorum_from_request(kind, request)?;
    // An empty set means "no restriction on who may review".
    let allowed_reviewers = approval_reviewers_from_request(kind, request)
        .into_iter()
        .collect::<BTreeSet<_>>();
    let mut progress = ApprovalProgress {
        request_id: request.request_id.clone(),
        reviewers: BTreeSet::new(),
        signatures: Vec::new(),
        reason: None,
        approved_at: None,
    };
    let topic = topic(kind)?;
    let events = log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(log_error)?;
    for (_, event) in events {
        // Only responses addressed to this request are relevant.
        if !event_matches_request(&event, &request.request_id)
            || event.kind != "hitl.response_received"
        {
            continue;
        }
        let response: HitlHostResponse = serde_json::from_value(event.payload)
            .map_err(|error| VmError::Runtime(error.to_string()))?;
        if let Some(reviewer) = response.reviewer.as_deref() {
            // Ignore reviewers who are not on the allow-list.
            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
                continue;
            }
            // Ignore any further response (approval or denial) from a
            // reviewer whose approval has already been counted.
            if progress.reviewers.contains(reviewer) {
                continue;
            }
        }
        if response.approved.unwrap_or(false) {
            // NOTE(review): an approval without a reviewer updates the
            // reason/timestamp below but is not added to `reviewers`, so it
            // does not count toward the quorum — confirm this is intended.
            if let Some(reviewer) = response.reviewer.clone() {
                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
                progress.reviewers.insert(reviewer.clone());
                progress.signatures.push(ApprovalSignature {
                    reviewer: reviewer.clone(),
                    signed_at: signed_at.clone(),
                    // Prefer the signature supplied with the response; fall
                    // back to a locally computed receipt signature.
                    signature: response.signature.clone().unwrap_or_else(|| {
                        approval_receipt_signature(
                            &request.request_id,
                            &reviewer,
                            &signed_at,
                            true,
                            response.reason.as_deref(),
                        )
                    }),
                });
            }
            // The most recent approval's reason and timestamp win.
            progress.reason = response.reason.clone();
            progress.approved_at = response.responded_at.clone();
            if progress.reviewers.len() as u32 >= quorum {
                return Ok(ApprovalResolution::Approved(progress));
            }
            continue;
        }
        // `approved` is false or absent: treat the response as a denial.
        return Ok(ApprovalResolution::Denied(response));
    }
    Ok(ApprovalResolution::Pending)
}
1131
1132fn approval_quorum_from_request(
1133    kind: HitlRequestKind,
1134    request: &HitlRequestEnvelope,
1135) -> Result<u32, VmError> {
1136    let key = match kind {
1137        HitlRequestKind::DualControl => "n",
1138        _ => "quorum",
1139    };
1140    let quorum = request
1141        .payload
1142        .get(key)
1143        .or_else(|| request.payload.get("approvers_required"))
1144        .or_else(|| {
1145            request
1146                .payload
1147                .get("approval_request")
1148                .and_then(|approval| approval.get("approvers_required"))
1149        })
1150        .and_then(JsonValue::as_u64)
1151        .unwrap_or(1);
1152    u32::try_from(quorum).map_err(|_| {
1153        VmError::Runtime(format!(
1154            "invalid quorum in HITL request '{}'",
1155            request.request_id
1156        ))
1157    })
1158}
1159
1160fn approval_reviewers_from_request(
1161    kind: HitlRequestKind,
1162    request: &HitlRequestEnvelope,
1163) -> Vec<String> {
1164    let key = match kind {
1165        HitlRequestKind::DualControl => "approvers",
1166        _ => "reviewers",
1167    };
1168    request
1169        .payload
1170        .get(key)
1171        .or_else(|| {
1172            request
1173                .payload
1174                .get("approval_request")
1175                .and_then(|approval| approval.get("reviewers"))
1176        })
1177        .and_then(JsonValue::as_array)
1178        .map(|values| {
1179            values
1180                .iter()
1181                .filter_map(JsonValue::as_str)
1182                .map(str::to_string)
1183                .collect()
1184        })
1185        .unwrap_or_default()
1186}
1187
1188fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1189    json!({
1190        "request_id": progress.request_id.clone(),
1191        "approved": true,
1192        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1193        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1194        "reason": progress.reason,
1195        "signatures": progress.signatures,
1196    })
1197}
1198
1199fn approval_receipt_signature(
1200    request_id: &str,
1201    reviewer: &str,
1202    signed_at: &str,
1203    approved: bool,
1204    reason: Option<&str>,
1205) -> String {
1206    let material = format!(
1207        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1208        reason.unwrap_or("")
1209    );
1210    let hash = sha2::Sha256::digest(material.as_bytes());
1211    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1212    format!("sha256:{hex}")
1213}
1214
1215fn approval_record_from_waitpoint(
1216    record: &WaitpointRecord,
1217    builtin: &str,
1218) -> Result<VmValue, VmError> {
1219    record
1220        .value
1221        .as_ref()
1222        .map(crate::stdlib::json_to_vm_value)
1223        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1224}
1225
1226async fn approval_wait_error(
1227    log: &Arc<AnyEventLog>,
1228    kind: HitlRequestKind,
1229    request_id: &str,
1230) -> VmError {
1231    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1232        if record.status == WaitpointStatus::Cancelled
1233            && record.reason.as_deref() != Some("upstream_cancelled")
1234        {
1235            return approval_denied_error(
1236                request_id,
1237                HitlHostResponse {
1238                    request_id: request_id.to_string(),
1239                    answer: None,
1240                    approved: Some(false),
1241                    accepted: None,
1242                    reviewer: record.cancelled_by.clone(),
1243                    reason: record.reason.clone(),
1244                    metadata: record.metadata.clone(),
1245                    responded_at: record.cancelled_at,
1246                    signature: None,
1247                },
1248            );
1249        }
1250        if record.status == WaitpointStatus::Cancelled {
1251            return hitl_cancelled_error(
1252                request_id,
1253                kind,
1254                "",
1255                &[request_id.to_string()],
1256                record.reason,
1257            );
1258        }
1259    }
1260    hitl_cancelled_error(
1261        request_id,
1262        kind,
1263        "",
1264        &[request_id.to_string()],
1265        Some("upstream_cancelled".to_string()),
1266    )
1267}
1268
1269async fn append_timeout_once(
1270    log: &Arc<AnyEventLog>,
1271    kind: HitlRequestKind,
1272    request_id: &str,
1273    trace_id: &str,
1274) -> Result<(), VmError> {
1275    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1276        return Ok(());
1277    }
1278    append_timeout(log, kind, request_id, trace_id).await
1279}
1280
1281async fn hitl_event_exists(
1282    log: &Arc<AnyEventLog>,
1283    kind: HitlRequestKind,
1284    request_id: &str,
1285    event_kind: &str,
1286) -> Result<bool, VmError> {
1287    let topic = topic(kind)?;
1288    let events = log
1289        .read_range(&topic, None, usize::MAX)
1290        .await
1291        .map_err(log_error)?;
1292    Ok(events
1293        .into_iter()
1294        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1295}
1296
1297fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1298    match kind {
1299        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1300        _ => "hitl.approval_approved",
1301    }
1302}
1303
1304fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1305    match kind {
1306        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1307        _ => "hitl.approval_denied",
1308    }
1309}
1310
1311async fn append_request(
1312    log: &Arc<AnyEventLog>,
1313    request: &HitlRequestEnvelope,
1314) -> Result<(), VmError> {
1315    let topic = topic(request.kind)?;
1316    log.append(
1317        &topic,
1318        LogEvent::new(
1319            request.kind.request_event_kind(),
1320            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1321        )
1322        .with_headers(request_headers(request)),
1323    )
1324    .await
1325    .map(|_| ())
1326    .map_err(log_error)
1327}
1328
1329async fn append_named_event(
1330    log: &Arc<AnyEventLog>,
1331    kind: HitlRequestKind,
1332    event_kind: &str,
1333    request_id: &str,
1334    trace_id: &str,
1335    payload: JsonValue,
1336) -> Result<(), VmError> {
1337    let topic = topic(kind)?;
1338    let headers = headers_with_trace(request_id, trace_id);
1339    log.append(
1340        &topic,
1341        LogEvent::new(event_kind, payload).with_headers(headers),
1342    )
1343    .await
1344    .map(|_| ())
1345    .map_err(log_error)
1346}
1347
1348async fn append_timeout(
1349    log: &Arc<AnyEventLog>,
1350    kind: HitlRequestKind,
1351    request_id: &str,
1352    trace_id: &str,
1353) -> Result<(), VmError> {
1354    append_named_event(
1355        log,
1356        kind,
1357        "hitl.timeout",
1358        request_id,
1359        trace_id,
1360        serde_json::to_value(HitlTimeoutRecord {
1361            request_id: request_id.to_string(),
1362            kind,
1363            trace_id: trace_id.to_string(),
1364            timed_out_at: now_rfc3339(),
1365        })
1366        .map_err(|error| VmError::Runtime(error.to_string()))?,
1367    )
1368    .await
1369}
1370
1371async fn maybe_apply_mock_response(
1372    kind: HitlRequestKind,
1373    request_id: &str,
1374    request_payload: &JsonValue,
1375) -> Result<(), VmError> {
1376    let mut params = request_payload
1377        .as_object()
1378        .cloned()
1379        .unwrap_or_default()
1380        .into_iter()
1381        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1382        .collect::<BTreeMap<_, _>>();
1383    params.insert(
1384        "request_id".to_string(),
1385        VmValue::String(Rc::from(request_id.to_string())),
1386    );
1387    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1388        return Ok(());
1389    };
1390    let value = result?;
1391    let responses = match value {
1392        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1393        other => vec![other],
1394    };
1395    for response in responses {
1396        let response_dict = response.as_dict().ok_or_else(|| {
1397            VmError::Runtime(format!(
1398                "mocked HITL {} response must be a dict or list<dict>",
1399                kind.as_str()
1400            ))
1401        })?;
1402        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1403        append_hitl_response(None, hitl_response)
1404            .await
1405            .map_err(VmError::Runtime)?;
1406    }
1407    Ok(())
1408}
1409
1410fn parse_hitl_response_dict(
1411    request_id: &str,
1412    response_dict: &BTreeMap<String, VmValue>,
1413) -> Result<HitlHostResponse, VmError> {
1414    Ok(HitlHostResponse {
1415        request_id: request_id.to_string(),
1416        answer: response_dict
1417            .get("answer")
1418            .map(crate::llm::vm_value_to_json),
1419        approved: response_dict.get("approved").and_then(vm_bool),
1420        accepted: response_dict.get("accepted").and_then(vm_bool),
1421        reviewer: response_dict.get("reviewer").map(VmValue::display),
1422        reason: response_dict.get("reason").map(VmValue::display),
1423        metadata: response_dict
1424            .get("metadata")
1425            .map(crate::llm::vm_value_to_json),
1426        responded_at: response_dict.get("responded_at").map(VmValue::display),
1427        signature: response_dict.get("signature").map(VmValue::display),
1428    })
1429}
1430
1431fn maybe_notify_host(request: &HitlRequestEnvelope) {
1432    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1433        return;
1434    };
1435    bridge.notify(
1436        "harn.hitl.requested",
1437        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1438    );
1439}
1440
1441/// Emit a `HitlRequested` `AgentEvent` so transport adapters
1442/// (currently the A2A `A2aWorkerSink`) can flip a task into
1443/// `input-required` while the script is suspended on the waitpoint.
1444/// No-op when there is no current agent session — the bridge-level
1445/// `harn.hitl.requested` notification still fires for hosts that drive
1446/// HITL UX through the bridge.
1447fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1448    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1449        return;
1450    };
1451    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1452        session_id,
1453        request_id: request.request_id.clone(),
1454        kind: request.kind.as_str().to_string(),
1455        payload: request.payload.clone(),
1456    });
1457}
1458
1459/// Companion to `emit_hitl_requested`: notifies sinks that the
1460/// suspended waitpoint has resolved so a paused task can flip back
1461/// out of `input-required`. `outcome` is one of `"answered"`,
1462/// `"timeout"`, `"cancelled"`, or `"error"`.
1463fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1464    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1465        return;
1466    };
1467    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1468        session_id,
1469        request_id: request_id.to_string(),
1470        kind: kind.as_str().to_string(),
1471        outcome: outcome.to_string(),
1472    });
1473}
1474
1475/// Wrapper around `wait_for_request_waitpoint` that emits the
1476/// canonical `HitlResolved` `AgentEvent` regardless of which terminal
1477/// branch the waitpoint takes (response / timeout / cancellation /
1478/// error). Pair-emitted with `emit_hitl_requested` so transport
1479/// adapters can bracket the `input-required` pause cleanly without
1480/// each `*_impl` having to duplicate the emission at every match arm.
1481async fn wait_for_request_waitpoint_with_events(
1482    request_id: &str,
1483    kind: HitlRequestKind,
1484    timeout: Option<StdDuration>,
1485) -> Result<WaitpointOutcome, VmError> {
1486    let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1487    let label = match &outcome {
1488        Ok(WaitpointOutcome::Completed(_)) => "answered",
1489        Ok(WaitpointOutcome::Timeout) => "timeout",
1490        Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1491        Err(_) => "error",
1492    };
1493    emit_hitl_resolved(request_id, kind, label);
1494    outcome
1495}
1496
1497fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1498    let Some(value) = value else {
1499        return Ok(AskUserOptions {
1500            schema: None,
1501            timeout: Some(default_question_timeout()),
1502            default: None,
1503        });
1504    };
1505    let dict = value
1506        .as_dict()
1507        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1508    Ok(AskUserOptions {
1509        schema: dict
1510            .get("schema")
1511            .cloned()
1512            .filter(|value| !matches!(value, VmValue::Nil)),
1513        timeout: dict
1514            .get("timeout")
1515            .map(parse_duration_value)
1516            .transpose()?
1517            .or_else(|| Some(default_question_timeout())),
1518        default: dict
1519            .get("default")
1520            .cloned()
1521            .filter(|value| !matches!(value, VmValue::Nil)),
1522    })
1523}
1524
1525fn default_question_timeout() -> StdDuration {
1526    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1527}
1528
1529fn escalation_capability_policy() -> JsonValue {
1530    crate::orchestration::current_execution_policy()
1531        .and_then(|policy| serde_json::to_value(policy).ok())
1532        .unwrap_or(JsonValue::Null)
1533}
1534
1535fn parse_approval_options(
1536    value: Option<&VmValue>,
1537    builtin: &str,
1538) -> Result<ApprovalOptions, VmError> {
1539    let dict = match value {
1540        None => None,
1541        Some(VmValue::Dict(dict)) => Some(dict),
1542        Some(_) => {
1543            return Err(VmError::Runtime(format!(
1544                "{builtin}: options must be a dict"
1545            )))
1546        }
1547    };
1548    let quorum = dict
1549        .and_then(|dict| dict.get("quorum"))
1550        .and_then(VmValue::as_int)
1551        .unwrap_or(1);
1552    if quorum <= 0 {
1553        return Err(VmError::Runtime(format!(
1554            "{builtin}: quorum must be positive"
1555        )));
1556    }
1557    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1558    let capabilities_requested = optional_string_list(
1559        dict.and_then(|dict| dict.get("capabilities_requested")),
1560        builtin,
1561    )?;
1562    let evidence_refs = dict
1563        .and_then(|dict| dict.get("evidence_refs"))
1564        .map(|value| match value {
1565            VmValue::List(items) => Ok(items
1566                .iter()
1567                .map(crate::llm::vm_value_to_json)
1568                .collect::<Vec<_>>()),
1569            _ => Err(VmError::Runtime(format!(
1570                "{builtin}: evidence_refs must be a list"
1571            ))),
1572        })
1573        .transpose()?
1574        .unwrap_or_default();
1575    let deadline = dict
1576        .and_then(|dict| dict.get("deadline"))
1577        .map(parse_duration_value)
1578        .transpose()?
1579        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1580    Ok(ApprovalOptions {
1581        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1582        args: dict.and_then(|dict| dict.get("args")).cloned(),
1583        quorum: quorum as u32,
1584        reviewers,
1585        deadline,
1586        principal: dict
1587            .and_then(|dict| dict.get("principal"))
1588            .map(VmValue::display)
1589            .filter(|value| !value.is_empty()),
1590        evidence_refs,
1591        undo_metadata: dict
1592            .and_then(|dict| dict.get("undo_metadata"))
1593            .map(crate::llm::vm_value_to_json),
1594        capabilities_requested,
1595    })
1596}
1597
1598fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1599    args.get(idx)
1600        .map(VmValue::display)
1601        .filter(|value| !value.is_empty())
1602        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1603}
1604
1605fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1606    let value = args
1607        .get(idx)
1608        .and_then(VmValue::as_int)
1609        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1610    if value <= 0 {
1611        return Err(VmError::Runtime(format!(
1612            "{builtin}: expected a positive int at {idx}"
1613        )));
1614    }
1615    Ok(value)
1616}
1617
1618fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1619    let Some(value) = value else {
1620        return Ok(Vec::new());
1621    };
1622    match value {
1623        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1624        _ => Err(VmError::Runtime(format!(
1625            "{builtin}: expected list<string>"
1626        ))),
1627    }
1628}
1629
1630fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1631    match value {
1632        VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1633        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1634        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1635        _ => Err(VmError::Runtime(
1636            "expected a duration or millisecond count".to_string(),
1637        )),
1638    }
1639}
1640
1641fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1642    active_event_log()
1643        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1644}
1645
1646fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1647    if let Some(log) = active_event_log() {
1648        return Ok(log);
1649    }
1650    let Some(base_dir) = base_dir else {
1651        return Ok(install_memory_for_current_thread(
1652            HITL_EVENT_LOG_QUEUE_DEPTH,
1653        ));
1654    };
1655    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1656}
1657
1658fn current_dispatch_keys() -> Option<DispatchKeys> {
1659    let context = current_dispatch_context()?;
1660    let stable_base = context
1661        .replay_of_event_id
1662        .clone()
1663        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1664    let instance_key = format!(
1665        "{}::{}",
1666        context.trigger_event.id.0,
1667        context.replay_of_event_id.as_deref().unwrap_or("live")
1668    );
1669    Some(DispatchKeys {
1670        instance_key,
1671        stable_base,
1672        agent: context.agent_id,
1673        trace_id: context.trigger_event.trace_id.0,
1674    })
1675}
1676
1677fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1678    if let Some(keys) = dispatch_keys {
1679        let seq = REQUEST_SEQUENCE.with(|slot| {
1680            let mut state = slot.borrow_mut();
1681            if state.instance_key != keys.instance_key {
1682                state.instance_key = keys.instance_key.clone();
1683                state.next_seq = 0;
1684            }
1685            state.next_seq += 1;
1686            state.next_seq
1687        });
1688        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1689    }
1690    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1691}
1692
1693fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1694    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1695    if let Some(run_id) = request.run_id.as_ref() {
1696        headers.insert("run_id".to_string(), run_id.clone());
1697    }
1698    headers
1699}
1700
/// Headers for a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1706
1707fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1708    let mut headers = response_headers(request_id);
1709    headers.insert("trace_id".to_string(), trace_id.to_string());
1710    headers
1711}
1712
1713fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1714    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1715}
1716
1717fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1718    event
1719        .headers
1720        .get("request_id")
1721        .is_some_and(|value| value == request_id)
1722        || event
1723            .payload
1724            .get("request_id")
1725            .and_then(JsonValue::as_str)
1726            .is_some_and(|value| value == request_id)
1727}
1728
1729fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1730    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1731        "name": "ApprovalDeniedError",
1732        "category": "generic",
1733        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1734        "request_id": request_id,
1735        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1736        "reason": response.reason,
1737    })))
1738}
1739
1740fn hitl_cancelled_error(
1741    request_id: &str,
1742    kind: HitlRequestKind,
1743    wait_id: &str,
1744    waitpoint_ids: &[String],
1745    reason: Option<String>,
1746) -> VmError {
1747    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1748    let message = reason
1749        .clone()
1750        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1751    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1752        "name": "HumanCancelledError",
1753        "category": ErrorCategory::Cancelled.as_str(),
1754        "message": message,
1755        "request_id": request_id,
1756        "kind": kind.as_str(),
1757        "wait_id": wait_id,
1758        "waitpoint_ids": waitpoint_ids,
1759        "reason": reason,
1760    })))
1761}
1762
1763fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1764    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1765    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1766        "name": "HumanTimeoutError",
1767        "category": ErrorCategory::Timeout.as_str(),
1768        "message": format!("{} timed out", kind.as_str()),
1769        "request_id": request_id,
1770        "kind": kind.as_str(),
1771    })))
1772}
1773
1774fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1775    match default {
1776        VmValue::Int(_) => match value {
1777            VmValue::Int(_) => value.clone(),
1778            VmValue::Float(number) => VmValue::Int(*number as i64),
1779            VmValue::String(text) => text
1780                .parse::<i64>()
1781                .map(VmValue::Int)
1782                .unwrap_or_else(|_| default.clone()),
1783            _ => default.clone(),
1784        },
1785        VmValue::Float(_) => match value {
1786            VmValue::Float(_) => value.clone(),
1787            VmValue::Int(number) => VmValue::Float(*number as f64),
1788            VmValue::String(text) => text
1789                .parse::<f64>()
1790                .map(VmValue::Float)
1791                .unwrap_or_else(|_| default.clone()),
1792            _ => default.clone(),
1793        },
1794        VmValue::Bool(_) => match value {
1795            VmValue::Bool(_) => value.clone(),
1796            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1797            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1798            _ => default.clone(),
1799        },
1800        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1801        VmValue::Duration(_) => match value {
1802            VmValue::Duration(_) => value.clone(),
1803            VmValue::Int(ms) => VmValue::Duration(*ms),
1804            _ => default.clone(),
1805        },
1806        VmValue::Nil => value.clone(),
1807        _ => {
1808            if value.type_name() == default.type_name() {
1809                value.clone()
1810            } else {
1811                default.clone()
1812            }
1813        }
1814    }
1815}
1816
1817fn log_error(error: impl std::fmt::Display) -> VmError {
1818    VmError::Runtime(error.to_string())
1819}
1820
1821fn now_rfc3339() -> String {
1822    format_rfc3339(OffsetDateTime::now_utc())
1823}
1824
1825fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1826    timestamp
1827        .format(&Rfc3339)
1828        .unwrap_or_else(|_| timestamp.to_string())
1829}
1830
1831fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1832    time::Duration::try_from(duration)
1833        .ok()
1834        .map(|duration| format_rfc3339(requested_at + duration))
1835}
1836
1837fn new_trace_id() -> String {
1838    format!("trace_{}", Uuid::now_v7())
1839}
1840
1841fn vm_bool(value: &VmValue) -> Option<bool> {
1842    match value {
1843        VmValue::Bool(flag) => Some(*flag),
1844        _ => None,
1845    }
1846}
1847
1848fn vm_string(value: &VmValue) -> Option<&str> {
1849    match value {
1850        VmValue::String(text) => Some(text.as_ref()),
1851        _ => None,
1852    }
1853}
1854
1855fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1856    match value {
1857        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1858        _ => None,
1859    }
1860}
1861
// End-to-end tests for the HITL builtins: each test compiles a small script,
// runs it on a fresh VM with a mocked `hitl` host capability, and inspects the
// script output and/or the events written to the HITL event-log topics.
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Runs `source` on a fresh VM rooted at `base_dir`.
    ///
    /// Resets thread-local state and installs the default event log for
    /// `base_dir` before executing. On success returns, in order: the script
    /// output with trailing whitespace trimmed, then the event kinds recorded
    /// on the questions, approvals, dual-control and escalations topics.
    /// Execution errors from the VM are propagated to the caller.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
        Ok((
            output,
            question_events,
            approval_events,
            dual_control_events,
            escalation_events,
        ))
    }

    /// Reads `topic` from `log` and returns the `kind` of each event read,
    /// in the order `read_range` yields them.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.kind)
            .collect()
    }

    /// Reads `topic` from `log` and returns the `payload` of each event read,
    /// in the order `read_range` yields them.
    async fn event_payloads(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<serde_json::Value> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.payload)
            .collect()
    }

    /// A string answer (`"9"`) to `ask_user` with an int default is coerced to
    /// an int, and only the questions topic receives events.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  __io_println(answer)
}
"#;
                let (
                    output,
                    question_events,
                    approval_events,
                    dual_control_events,
                    escalation_events,
                ) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "9");
                assert_eq!(
                    question_events,
                    vec![
                        "hitl.question_asked".to_string(),
                        "hitl.response_received".to_string()
                    ]
                );
                // No other HITL topic should have been touched.
                assert!(approval_events.is_empty());
                assert!(dual_control_events.is_empty());
                assert!(escalation_events.is_empty());
            })
            .await;
    }

    /// With a quorum of two and two mocked approvals, the approvals topic
    /// records the request, both responses, and a final approval event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  __io_println(record.approved)
  __io_println(len(record.reviewers))
  __io_println(record.reviewers[0])
  __io_println(record.reviewers[1])
}
"#;
                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_approved".to_string(),
                    ]
                );
            })
            .await;
    }

    /// The first event on the approvals topic embeds an `approval_request`
    /// object mirroring the options passed to `request_approval`.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_emits_canonical_approval_request_payload() {
        tokio::task::LocalSet::new()
            .run_until(async {
                // Set up inline (not via `execute_hitl_script`) so the test
                // keeps the log handle and can read full payloads afterwards.
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let log = install_default_for_base_dir(dir.path()).expect("install event log");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
  request_approval("deploy production", {
    args: {environment: "prod"},
    quorum: 1,
    reviewers: ["alice"],
    evidence_refs: [{kind: "run", uri: "run_123"}],
    undo_metadata: {strategy: "rollback"},
    capabilities_requested: ["deploy.production"],
  })
}
"#;
                let chunk = compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script succeeds");

                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
                // The request details sit under a nested `payload` key of the
                // first event's payload.
                let request_payload = &payloads[0]["payload"];
                let approval_request = &request_payload["approval_request"];
                assert_eq!(approval_request["id"], request_payload["id"]);
                assert_eq!(approval_request["action"], "deploy production");
                assert_eq!(approval_request["args"]["environment"], "prod");
                assert_eq!(approval_request["approvers_required"], 1);
                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
                assert_eq!(
                    approval_request["capabilities_requested"][0],
                    "deploy.production"
                );
                // Timestamps are only checked for presence as strings.
                assert!(approval_request["requested_at"].as_str().is_some());
                assert!(approval_request["deadline"].as_str().is_some());
            })
            .await;
    }

    /// A denied approval surfaces to the script as a catchable error named
    /// `ApprovalDeniedError` that carries the reviewer's reason.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  __io_println(is_err(denied))
  __io_println(unwrap_err(denied).name)
  __io_println(unwrap_err(denied).reason)
}
"#;
                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_denied".to_string(),
                    ]
                );
            })
            .await;
    }

    /// `dual_control` runs the action closure once two approvals arrive, and
    /// the dual-control topic records the request, both responses, the
    /// approval, and the execution.
    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  __io_println(result)
}
"#;
                let (output, _, _, dual_control_events, _) =
                    execute_hitl_script(dir.path(), source)
                        .await
                        .expect("script succeeds");
                assert_eq!(output, "launched");
                assert_eq!(
                    dual_control_events,
                    vec![
                        "hitl.dual_control_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.dual_control_approved".to_string(),
                        "hitl.dual_control_executed".to_string(),
                    ]
                );
            })
            .await;
    }

    /// `escalate_to` returns a handle with status `accepted` and the accepting
    /// reviewer, and the escalations topic records the issue and acceptance.
    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  __io_println(handle.status)
  __io_println(handle.reviewer)
}
"#;
                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "accepted\nlead");
                assert_eq!(
                    escalation_events,
                    vec![
                        "hitl.escalation_issued".to_string(),
                        "hitl.escalation_accepted".to_string(),
                    ]
                );
            })
            .await;
    }

    /// `harn-serve` adapters (A2A `input-required`, ACP `hitl_request`)
    /// rely on the canonical `AgentEvent::HitlRequested` /
    /// `AgentEvent::HitlResolved` pair to bracket every HITL pause.
    /// Pin the contract here so future HITL primitives keep emitting
    /// the event around their waitpoint blocks.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
        use std::sync::Mutex as StdMutex;

        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let session_id = "hitl-session".to_string();
                // Shared buffer the sink appends every observed event to.
                let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
                    std::sync::Arc::new(StdMutex::new(Vec::new()));

                // Test-only sink that records a clone of each event it receives.
                struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
                impl crate::agent_events::AgentEventSink for CaptureSink {
                    fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
                        self.0.lock().expect("captured").push(event.clone());
                    }
                }

                // Inline the script setup rather than using the
                // `execute_hitl_script` helper: that helper calls
                // `reset_thread_local_state` (which wipes the session
                // store), so any session pushed before it would be
                // gone by the time `ask_user` runs.
                crate::reset_thread_local_state();
                crate::event_log::install_default_for_base_dir(dir.path())
                    .expect("install event log");

                // Start from a clean sink registry, then register the capture
                // sink and make the session current for the script run.
                crate::agent_events::reset_all_sinks();
                let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
                    std::sync::Arc::new(CaptureSink(captured.clone()));
                crate::agent_events::register_sink(session_id.clone(), sink);
                crate::agent_sessions::open_or_create(Some(session_id.clone()));
                let _guard = crate::agent_sessions::enter_current_session(session_id.clone());

                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "ok"})
  let answer: string = ask_user("Are you sure?", {default: "no"})
  __io_println(answer)
}
"#;
                let chunk = crate::compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script runs");
                assert_eq!(vm.output().trim_end(), "ok");

                // Keep only the HITL request/resolution events; anything else
                // the sink captured is irrelevant to this contract.
                let events = captured.lock().expect("captured");
                let mut iter = events.iter().filter(|event| {
                    matches!(
                        event,
                        crate::agent_events::AgentEvent::HitlRequested { .. }
                            | crate::agent_events::AgentEvent::HitlResolved { .. }
                    )
                });
                let requested = iter.next().expect("HitlRequested emitted");
                let resolved = iter.next().expect("HitlResolved emitted");
                assert!(iter.next().is_none(), "exactly one pair: {events:?}");

                let crate::agent_events::AgentEvent::HitlRequested {
                    session_id: req_session,
                    request_id: req_id,
                    kind: req_kind,
                    payload,
                } = requested
                else {
                    panic!("expected HitlRequested, got: {requested:?}");
                };
                assert_eq!(req_session, &session_id);
                assert_eq!(req_kind, "question");
                assert!(req_id.starts_with("hitl_question_"));
                assert_eq!(payload["prompt"], "Are you sure?");

                let crate::agent_events::AgentEvent::HitlResolved {
                    request_id: res_id,
                    kind: res_kind,
                    outcome,
                    ..
                } = resolved
                else {
                    panic!("expected HitlResolved, got: {resolved:?}");
                };
                // The resolution must refer to the same request that was raised.
                assert_eq!(res_id, req_id);
                assert_eq!(res_kind, "question");
                assert_eq!(outcome, "answered");

                // Drop the session guard and clear the sinks before the test ends.
                drop(_guard);
                crate::agent_events::reset_all_sinks();
            })
            .await;
    }
}