Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration as StdDuration;
6
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value as JsonValue};
9use sha2::Digest;
10use time::format_description::well_known::Rfc3339;
11use time::OffsetDateTime;
12use uuid::Uuid;
13
14use crate::event_log::{
15    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
16    EventLog, LogEvent, Topic,
17};
18use crate::runtime_limits::RuntimeLimits;
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::macros::{harn_builtin, BuiltinSignature, Param, VmBuiltinDef, TY_ANY, TY_DICT};
22use crate::stdlib::options::{duration_from_value, ErrorKind};
23use crate::stdlib::waitpoint::{
24    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
25    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
26    WaitpointWaitOptions,
27};
28use crate::triggers::dispatcher::current_dispatch_context;
29use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
30use crate::vm::{AsyncBuiltinCtx, Vm};
31
/// Queue depth taken from the default runtime limits.
/// NOTE(review): presumably applied when the HITL event log is installed; the
/// use site is not visible in this part of the file.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Deadline, in milliseconds (24 hours), applied by `append_approval_request_on`
/// and `dual_control`.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// 24 hours in milliseconds.
/// NOTE(review): presumably the default `ask_user` timeout; confirm at the use site.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic used for `HitlRequestKind::Question`.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for `HitlRequestKind::Approval`.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for `HitlRequestKind::DualControl`.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for `HitlRequestKind::Escalation`.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";

thread_local! {
    // Per-thread request-sequence state; managed through `reset_hitl_state`,
    // `take_hitl_state`, and `restore_hitl_state`.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
44
/// Value held in the `REQUEST_SEQUENCE` thread-local.
///
/// The `Default` value (empty key, counter at zero) is what `reset_hitl_state`
/// installs and what `take_hitl_state` leaves behind.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// NOTE(review): presumably the dispatch instance key the counter belongs
    /// to; the code that writes it is not visible here.
    pub(crate) instance_key: String,
    /// NOTE(review): presumably the next sequence number to hand out; confirm
    /// against `next_request_id`.
    pub(crate) next_seq: u64,
}
50
/// The four kinds of human-in-the-loop request issued by this module.
///
/// Serialized with snake_case variant names (for example `dual_control`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Issued by the `ask_user` builtin.
    Question,
    /// Issued by `request_approval` and `append_approval_request_on`.
    Approval,
    /// Issued by the `dual_control` builtin.
    DualControl,
    /// Issued by the `escalate_to` builtin.
    Escalation,
}
59
60impl HitlRequestKind {
61    pub(crate) fn as_str(self) -> &'static str {
62        match self {
63            Self::Question => "question",
64            Self::Approval => "approval",
65            Self::DualControl => "dual_control",
66            Self::Escalation => "escalation",
67        }
68    }
69
70    fn topic(self) -> &'static str {
71        match self {
72            Self::Question => HITL_QUESTIONS_TOPIC,
73            Self::Approval => HITL_APPROVALS_TOPIC,
74            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
75            Self::Escalation => HITL_ESCALATIONS_TOPIC,
76        }
77    }
78
79    fn request_event_kind(self) -> &'static str {
80        match self {
81            Self::Question => "hitl.question_asked",
82            Self::Approval => "hitl.approval_requested",
83            Self::DualControl => "hitl.dual_control_requested",
84            Self::Escalation => "hitl.escalation_issued",
85        }
86    }
87
88    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
89        if request_id.starts_with("hitl_question_") {
90            Some(Self::Question)
91        } else if request_id.starts_with("hitl_approval_") {
92            Some(Self::Approval)
93        } else if request_id.starts_with("hitl_dual_control_") {
94            Some(Self::DualControl)
95        } else if request_id.starts_with("hitl_escalation_") {
96            Some(Self::Escalation)
97        } else {
98            None
99        }
100    }
101}
102
/// Response to a HITL request, as accepted by `append_hitl_response`.
///
/// Every optional field is left out of the serialized form when it is `None`.
/// Fields without a comment below are interpreted by code that is not visible
/// in this part of the file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix selects the request kind
    /// (see `HitlRequestKind::from_request_id`).
    pub request_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills in the current time
    /// when this is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
123
/// Request record built by each HITL entry point in this file and handed to
/// `create_request_waitpoint`, `append_request`, and `maybe_notify_host`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Id produced by `next_request_id`; also used as the waitpoint id.
    request_id: String,
    kind: HitlRequestKind,
    /// Agent name from the dispatch keys; empty when there are none. Falls
    /// back to the empty string when missing during deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id from the dispatch keys, or a freshly generated one.
    trace_id: String,
    /// Run id of the current mutation session, when one exists; omitted from
    /// the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Creation timestamp string (from `now_rfc3339` / `format_rfc3339`).
    requested_at: String,
    /// Kind-specific JSON payload.
    payload: JsonValue,
}
136
/// Serializable record describing a request that timed out.
/// NOTE(review): presumably written by `append_timeout_once`; the construction
/// site is not visible in this part of the file.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    request_id: String,
    kind: HitlRequestKind,
    trace_id: String,
    timed_out_at: String,
}
144
/// Structured description of an action awaiting approval.
///
/// Embedded (serialized) under the `approval_request` key of approval and
/// dual-control request payloads built in this file.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalRequest {
    /// Identifier of the request; callers in this file pass the HITL request id.
    pub id: String,
    /// Name of the action being approved.
    pub action: String,
    /// Action arguments; defaults when absent during deserialization.
    #[serde(default)]
    pub args: JsonValue,
    /// Principal on whose behalf the action is requested.
    pub principal: String,
    /// Request timestamp string.
    pub requested_at: String,
    /// Optional deadline string; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required; `ApprovalRequest::new` sets this to 1.
    pub approvers_required: u32,
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    #[serde(default)]
    pub undo_metadata: JsonValue,
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
163
164impl ApprovalRequest {
165    pub fn new(
166        id: impl Into<String>,
167        action: impl Into<String>,
168        args: JsonValue,
169        principal: impl Into<String>,
170        requested_at: impl Into<String>,
171    ) -> Self {
172        Self {
173            id: id.into(),
174            action: action.into(),
175            args,
176            principal: principal.into(),
177            requested_at: requested_at.into(),
178            deadline: None,
179            approvers_required: 1,
180            evidence_refs: Vec::new(),
181            undo_metadata: JsonValue::Null,
182            capabilities_requested: Vec::new(),
183        }
184    }
185}
186
187pub(crate) fn approval_request_for_host_permission(
188    id: impl Into<String>,
189    action: impl Into<String>,
190    args: JsonValue,
191    principal: impl Into<String>,
192    evidence_refs: Vec<JsonValue>,
193    undo_metadata: JsonValue,
194    capabilities_requested: Vec<String>,
195) -> ApprovalRequest {
196    let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
197    request.evidence_refs = evidence_refs;
198    request.undo_metadata = undo_metadata;
199    request.capabilities_requested = capabilities_requested;
200    request
201}
202
/// Identifiers obtained from `current_dispatch_keys` for the active dispatch.
#[derive(Clone, Debug)]
struct DispatchKeys {
    // NOTE(review): `instance_key` and `stable_base` are not read in the
    // visible part of the file; presumably consumed by `next_request_id`.
    instance_key: String,
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id copied into request envelopes.
    trace_id: String,
}
210
/// Options parsed from the optional second argument of `ask_user`.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// When set, the answer is validated with `schema_expect_value`.
    schema: Option<VmValue>,
    /// Timeout passed to the waitpoint wait; `None` is passed through as-is.
    timeout: Option<StdDuration>,
    /// Returned when the wait times out; without a schema it is also passed to
    /// `coerce_like_default` together with the answer.
    default: Option<VmValue>,
}
217
/// Options parsed from the optional second argument of `request_approval`.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Emitted as the payload's `detail`; also the fallback for the approval
    /// request's `args` when `args` is absent.
    detail: Option<VmValue>,
    /// Preferred source for the approval request's `args`.
    args: Option<VmValue>,
    /// Number of approvers required.
    quorum: u32,
    /// Emitted as the payload's `reviewers`.
    reviewers: Vec<String>,
    /// How long to wait for a decision; also used to compute the deadline.
    deadline: StdDuration,
    /// Principal override; the agent name is used when absent.
    principal: Option<String>,
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata override; the serialized current mutation session is used
    /// when absent.
    undo_metadata: Option<JsonValue>,
    capabilities_requested: Vec<String>,
}
230
/// Accumulated state for an approval request.
/// NOTE(review): not constructed in the visible part of the file; field
/// meanings below are inferred from types only.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    request_id: String,
    // A set, so each reviewer name is held at most once.
    reviewers: BTreeSet<String>,
    signatures: Vec<ApprovalSignature>,
    reason: Option<String>,
    approved_at: Option<String>,
}
239
/// Serializable reviewer signature entry held in `ApprovalProgress::signatures`.
/// NOTE(review): how `signature` is computed is not visible in this part of
/// the file.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    reviewer: String,
    signed_at: String,
    signature: String,
}
246
/// Resolution state of an approval request.
/// NOTE(review): produced and consumed outside the visible part of the file.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// No resolution yet.
    Pending,
    /// Approved; carries the accumulated progress.
    Approved(ApprovalProgress),
    /// Denied; carries the host response.
    Denied(HitlHostResponse),
}
253
// Result of waiting on a request's waitpoint, as returned by
// `wait_for_request_waitpoint`.
//
// `Completed` carries the full `WaitpointRecord`, which dominates the
// enum's size — boxing it would force every match arm to indirect even
// though the enum is dropped within nanoseconds of being constructed
// (it's a local return type for the waitpoint poll loop, never stored).
// Surfaced by the host-target compile of `harn-vm` introduced when
// `harn-cli`'s build script gained `harn-vm` as a build-dep for the
// AOT bytecode embedding pass (G7 / harn#2300).
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The wait returned a record for the waitpoint.
    Completed(WaitpointRecord),
    /// The wait ended with `WaitpointWaitFailure::Timeout`.
    Timeout,
    /// The wait ended with `WaitpointWaitFailure::Cancelled`; fields are
    /// copied from that failure.
    Cancelled {
        wait_id: String,
        waitpoint_ids: Vec<String>,
        reason: Option<String>,
    },
}
272
273pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
274    for def in MODULE_BUILTINS {
275        vm.register_builtin_def(def);
276    }
277}
278
279pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
280    &ASK_USER_BUILTIN_DEF,
281    &REQUEST_APPROVAL_BUILTIN_DEF,
282    &DUAL_CONTROL_BUILTIN_DEF,
283    &ESCALATE_TO_BUILTIN_DEF,
284];
285
286#[harn_builtin(
287    sig = "ask_user(prompt: string, options?: dict) -> any",
288    kind = "async",
289    category = "hitl"
290)]
291async fn ask_user_builtin(
292    ctx: crate::vm::AsyncBuiltinCtx,
293    args: Vec<VmValue>,
294) -> Result<VmValue, VmError> {
295    ask_user_impl(Some(&ctx), &args).await
296}
297
298#[harn_builtin(
299    sig_expr = BuiltinSignature::variadic("request_approval", &[Param::new("args", TY_ANY)], TY_DICT),
300    kind = "async",
301    category = "hitl"
302)]
303async fn request_approval_builtin(
304    ctx: crate::vm::AsyncBuiltinCtx,
305    args: Vec<VmValue>,
306) -> Result<VmValue, VmError> {
307    request_approval_impl(Some(&ctx), &args).await
308}
309
310#[harn_builtin(
311    sig = "dual_control(n: int, m: int, action: closure, approvers?: list) -> dict",
312    kind = "async",
313    category = "hitl"
314)]
315async fn dual_control_builtin(
316    ctx: crate::vm::AsyncBuiltinCtx,
317    args: Vec<VmValue>,
318) -> Result<VmValue, VmError> {
319    dual_control_impl(&ctx, &args).await
320}
321
322#[harn_builtin(
323    sig = "escalate_to(role: string, reason: string) -> dict",
324    kind = "async",
325    category = "hitl"
326)]
327async fn escalate_to_builtin(
328    ctx: crate::vm::AsyncBuiltinCtx,
329    args: Vec<VmValue>,
330) -> Result<VmValue, VmError> {
331    escalate_to_impl(Some(&ctx), &args).await
332}
333
334pub(crate) fn reset_hitl_state() {
335    REQUEST_SEQUENCE.with(|slot| {
336        *slot.borrow_mut() = RequestSequenceState::default();
337    });
338}
339
340pub(crate) fn take_hitl_state() -> RequestSequenceState {
341    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
342}
343
344pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
345    REQUEST_SEQUENCE.with(|slot| {
346        *slot.borrow_mut() = state;
347    });
348}
349
350pub async fn append_hitl_response(
351    base_dir: Option<&Path>,
352    mut response: HitlHostResponse,
353) -> Result<u64, String> {
354    let kind = HitlRequestKind::from_request_id(&response.request_id)
355        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
356    if response.responded_at.is_none() {
357        response.responded_at = Some(now_rfc3339());
358    }
359    let log = ensure_hitl_event_log_for(base_dir)?;
360    let headers = response_headers(&response.request_id);
361    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
362    let event_id = log
363        .append(
364            &topic,
365            LogEvent::new(
366                match kind {
367                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
368                    _ => "hitl.response_received",
369                },
370                serde_json::to_value(&response).map_err(|error| error.to_string())?,
371            )
372            .with_headers(headers),
373        )
374        .await
375        .map_err(|error| error.to_string())?;
376    finalize_hitl_response(&log, kind, &response).await?;
377    Ok(event_id)
378}
379
380pub async fn append_approval_request_on(
381    log: &Arc<AnyEventLog>,
382    agent: impl Into<String>,
383    trace_id: impl Into<String>,
384    action: impl Into<String>,
385    detail: JsonValue,
386    reviewers: Vec<String>,
387) -> Result<String, VmError> {
388    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
389    let trace_id = trace_id.into();
390    let agent = agent.into();
391    let requested_at_time = OffsetDateTime::now_utc();
392    let requested_at = format_rfc3339(requested_at_time);
393    let mut approval_request = ApprovalRequest::new(
394        request_id.clone(),
395        action.into(),
396        detail.clone(),
397        agent.clone(),
398        requested_at.clone(),
399    );
400    approval_request.deadline = deadline_after(
401        requested_at_time,
402        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
403    );
404    approval_request.approvers_required = 1;
405    let approval_request_json = serde_json::to_value(&approval_request)
406        .map_err(|error| VmError::Runtime(error.to_string()))?;
407    let request = HitlRequestEnvelope {
408        request_id: request_id.clone(),
409        kind: HitlRequestKind::Approval,
410        agent,
411        trace_id: trace_id.clone(),
412        run_id: None,
413        requested_at: requested_at.clone(),
414        payload: json!({
415            "approval_request": approval_request_json,
416            "id": approval_request.id,
417            "action": approval_request.action,
418            "args": approval_request.args,
419            "principal": approval_request.principal,
420            "requested_at": requested_at,
421            "deadline": approval_request.deadline,
422            "approvers_required": approval_request.approvers_required,
423            "evidence_refs": approval_request.evidence_refs,
424            "undo_metadata": approval_request.undo_metadata,
425            "capabilities_requested": approval_request.capabilities_requested,
426            "detail": detail,
427            "quorum": 1,
428            "reviewers": reviewers,
429            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
430        }),
431    };
432    create_request_waitpoint(log, &request).await?;
433    append_request(log, &request).await?;
434    maybe_notify_host(None, &request);
435    Ok(request_id)
436}
437
438async fn ask_user_impl(
439    ctx: Option<&AsyncBuiltinCtx>,
440    args: &[VmValue],
441) -> Result<VmValue, VmError> {
442    let prompt = required_string_arg(args, 0, "ask_user")?;
443    let options = parse_ask_user_options(args.get(1))?;
444    let keys = current_dispatch_keys();
445    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
446    let trace_id = keys
447        .as_ref()
448        .map(|keys| keys.trace_id.clone())
449        .unwrap_or_else(new_trace_id);
450    let log = ensure_hitl_event_log();
451    let request = HitlRequestEnvelope {
452        request_id: request_id.clone(),
453        kind: HitlRequestKind::Question,
454        agent: keys
455            .as_ref()
456            .map(|keys| keys.agent.clone())
457            .unwrap_or_default(),
458        trace_id: trace_id.clone(),
459        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
460        requested_at: now_rfc3339(),
461        payload: json!({
462            "prompt": prompt,
463            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
464            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
465            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
466        }),
467    };
468    create_request_waitpoint(&log, &request).await?;
469    append_request(&log, &request).await?;
470    maybe_notify_host(ctx, &request);
471    emit_hitl_requested(&request);
472    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
473
474    match wait_for_request_waitpoint_with_events(
475        &request_id,
476        HitlRequestKind::Question,
477        options.timeout,
478    )
479    .await?
480    {
481        WaitpointOutcome::Completed(record) => {
482            let answer = record
483                .value
484                .as_ref()
485                .map(crate::stdlib::json_to_vm_value)
486                .unwrap_or(VmValue::Nil);
487            if let Some(schema) = options.schema.as_ref() {
488                return schema_expect_value(&answer, schema, true);
489            }
490            if let Some(default) = options.default.as_ref() {
491                return Ok(coerce_like_default(&answer, default));
492            }
493            Ok(answer)
494        }
495        WaitpointOutcome::Timeout => {
496            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
497            if let Some(default) = options.default {
498                return Ok(default);
499            }
500            Err(timeout_error(&request_id, HitlRequestKind::Question))
501        }
502        WaitpointOutcome::Cancelled {
503            wait_id,
504            waitpoint_ids,
505            reason,
506        } => Err(hitl_cancelled_error(
507            &request_id,
508            HitlRequestKind::Question,
509            &wait_id,
510            &waitpoint_ids,
511            reason,
512        )),
513    }
514}
515
516async fn request_approval_impl(
517    ctx: Option<&AsyncBuiltinCtx>,
518    args: &[VmValue],
519) -> Result<VmValue, VmError> {
520    let action = required_string_arg(args, 0, "request_approval")?;
521    let options = parse_approval_options(args.get(1), "request_approval")?;
522    let keys = current_dispatch_keys();
523    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
524    let trace_id = keys
525        .as_ref()
526        .map(|keys| keys.trace_id.clone())
527        .unwrap_or_else(new_trace_id);
528    let agent = keys
529        .as_ref()
530        .map(|keys| keys.agent.clone())
531        .unwrap_or_default();
532    let requested_at_time = OffsetDateTime::now_utc();
533    let requested_at = format_rfc3339(requested_at_time);
534    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
535    let approval_args = options
536        .args
537        .as_ref()
538        .or(options.detail.as_ref())
539        .map(crate::llm::vm_value_to_json)
540        .unwrap_or(JsonValue::Null);
541    let mut approval_request = ApprovalRequest::new(
542        request_id.clone(),
543        action.clone(),
544        approval_args,
545        principal,
546        requested_at.clone(),
547    );
548    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
549    approval_request.approvers_required = options.quorum;
550    approval_request.evidence_refs = options.evidence_refs.clone();
551    approval_request.undo_metadata = options
552        .undo_metadata
553        .clone()
554        .or_else(|| {
555            crate::orchestration::current_mutation_session()
556                .and_then(|session| serde_json::to_value(session).ok())
557        })
558        .unwrap_or(JsonValue::Null);
559    approval_request.capabilities_requested = options.capabilities_requested.clone();
560    let approval_request_json = serde_json::to_value(&approval_request)
561        .map_err(|error| VmError::Runtime(error.to_string()))?;
562    let log = ensure_hitl_event_log();
563    let request = HitlRequestEnvelope {
564        request_id: request_id.clone(),
565        kind: HitlRequestKind::Approval,
566        agent,
567        trace_id: trace_id.clone(),
568        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
569        requested_at: requested_at.clone(),
570        payload: json!({
571            "approval_request": approval_request_json,
572            "id": approval_request.id,
573            "action": action,
574            "args": approval_request.args,
575            "principal": approval_request.principal,
576            "requested_at": requested_at,
577            "deadline": approval_request.deadline,
578            "approvers_required": approval_request.approvers_required,
579            "evidence_refs": approval_request.evidence_refs,
580            "undo_metadata": approval_request.undo_metadata,
581            "capabilities_requested": approval_request.capabilities_requested,
582            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
583            "quorum": options.quorum,
584            "reviewers": options.reviewers,
585            "deadline_ms": options.deadline.as_millis() as u64,
586        }),
587    };
588    create_request_waitpoint(&log, &request).await?;
589    append_request(&log, &request).await?;
590    maybe_notify_host(ctx, &request);
591    emit_hitl_requested(&request);
592    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
593
594    match wait_for_request_waitpoint_with_events(
595        &request_id,
596        HitlRequestKind::Approval,
597        Some(options.deadline),
598    )
599    .await?
600    {
601        WaitpointOutcome::Completed(record) => {
602            approval_record_from_waitpoint(&record, "request_approval")
603        }
604        WaitpointOutcome::Timeout => {
605            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
606            Err(timeout_error(&request_id, HitlRequestKind::Approval))
607        }
608        WaitpointOutcome::Cancelled { .. } => {
609            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
610        }
611    }
612}
613
614pub(crate) async fn request_approval_for_side_effect(
615    action: &str,
616    detail: JsonValue,
617    principal: String,
618    reviewers: Vec<String>,
619    capabilities_requested: Vec<String>,
620) -> Result<VmValue, VmError> {
621    let mut options = BTreeMap::new();
622    options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
623    options.insert(
624        "detail".to_string(),
625        crate::stdlib::json_to_vm_value(&detail),
626    );
627    options.insert(
628        "principal".to_string(),
629        VmValue::String(std::sync::Arc::from(principal)),
630    );
631    options.insert(
632        "reviewers".to_string(),
633        VmValue::List(std::sync::Arc::new(
634            reviewers
635                .into_iter()
636                .map(|reviewer| VmValue::String(std::sync::Arc::from(reviewer)))
637                .collect(),
638        )),
639    );
640    options.insert(
641        "capabilities_requested".to_string(),
642        VmValue::List(std::sync::Arc::new(
643            capabilities_requested
644                .into_iter()
645                .map(|capability| VmValue::String(std::sync::Arc::from(capability)))
646                .collect(),
647        )),
648    );
649    let args = vec![
650        VmValue::String(std::sync::Arc::from(action.to_string())),
651        VmValue::Dict(std::sync::Arc::new(options)),
652    ];
653    request_approval_impl(None, &args).await
654}
655
656async fn dual_control_impl(ctx: &AsyncBuiltinCtx, args: &[VmValue]) -> Result<VmValue, VmError> {
657    let n = required_positive_int_arg(args, 0, "dual_control")?;
658    let m = required_positive_int_arg(args, 1, "dual_control")?;
659    if n > m {
660        return Err(VmError::Runtime(
661            "dual_control: n must be less than or equal to m".to_string(),
662        ));
663    }
664    let action = args
665        .get(2)
666        .and_then(|value| match value {
667            VmValue::Closure(closure) => Some(closure.clone()),
668            _ => None,
669        })
670        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
671    let approvers = optional_string_list(args.get(3), "dual_control")?;
672    if !approvers.is_empty() && approvers.len() < m as usize {
673        return Err(VmError::Runtime(format!(
674            "dual_control: expected at least {m} approvers, got {}",
675            approvers.len()
676        )));
677    }
678
679    let keys = current_dispatch_keys();
680    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
681    let trace_id = keys
682        .as_ref()
683        .map(|keys| keys.trace_id.clone())
684        .unwrap_or_else(new_trace_id);
685    let action_name = if action.func.name.is_empty() {
686        "anonymous".to_string()
687    } else {
688        action.func.name.clone()
689    };
690    let agent = keys
691        .as_ref()
692        .map(|keys| keys.agent.clone())
693        .unwrap_or_default();
694    let requested_at_time = OffsetDateTime::now_utc();
695    let requested_at = format_rfc3339(requested_at_time);
696    let mut approval_request = ApprovalRequest::new(
697        request_id.clone(),
698        action_name.clone(),
699        json!({
700            "n": n,
701            "m": m,
702            "approvers": approvers.clone(),
703        }),
704        agent.clone(),
705        requested_at.clone(),
706    );
707    approval_request.deadline = deadline_after(
708        requested_at_time,
709        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
710    );
711    approval_request.approvers_required = n as u32;
712    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
713        .and_then(|session| serde_json::to_value(session).ok())
714        .unwrap_or(JsonValue::Null);
715    let approval_request_json = serde_json::to_value(&approval_request)
716        .map_err(|error| VmError::Runtime(error.to_string()))?;
717    let log = ensure_hitl_event_log();
718    let request = HitlRequestEnvelope {
719        request_id: request_id.clone(),
720        kind: HitlRequestKind::DualControl,
721        agent,
722        trace_id: trace_id.clone(),
723        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
724        requested_at: requested_at.clone(),
725        payload: json!({
726            "approval_request": approval_request_json,
727            "id": approval_request.id,
728            "args": approval_request.args,
729            "principal": approval_request.principal,
730            "requested_at": requested_at,
731            "deadline": approval_request.deadline,
732            "approvers_required": approval_request.approvers_required,
733            "evidence_refs": approval_request.evidence_refs,
734            "undo_metadata": approval_request.undo_metadata,
735            "capabilities_requested": approval_request.capabilities_requested,
736            "n": n,
737            "m": m,
738            "action": action_name,
739            "approvers": approvers,
740            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
741        }),
742    };
743    create_request_waitpoint(&log, &request).await?;
744    append_request(&log, &request).await?;
745    maybe_notify_host(Some(ctx), &request);
746    emit_hitl_requested(&request);
747    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
748
749    match wait_for_request_waitpoint_with_events(
750        &request_id,
751        HitlRequestKind::DualControl,
752        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
753    )
754    .await?
755    {
756        WaitpointOutcome::Completed(record) => {
757            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
758            let mut vm = ctx.child_vm();
759            let result = vm.call_closure_pub(&action, &[]).await?;
760            ctx.forward_output(&vm.take_output());
761
762            append_named_event(
763                &log,
764                HitlRequestKind::DualControl,
765                "hitl.dual_control_executed",
766                &request_id,
767                &trace_id,
768                json!({
769                    "request_id": request_id,
770                    "result": crate::llm::vm_value_to_json(&result),
771                }),
772            )
773            .await?;
774
775            Ok(result)
776        }
777        WaitpointOutcome::Timeout => {
778            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
779            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
780        }
781        WaitpointOutcome::Cancelled { .. } => {
782            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
783        }
784    }
785}
786
787async fn escalate_to_impl(
788    ctx: Option<&AsyncBuiltinCtx>,
789    args: &[VmValue],
790) -> Result<VmValue, VmError> {
791    let role = required_string_arg(args, 0, "escalate_to")?;
792    let reason = required_string_arg(args, 1, "escalate_to")?;
793    let keys = current_dispatch_keys();
794    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
795    let trace_id = keys
796        .as_ref()
797        .map(|keys| keys.trace_id.clone())
798        .unwrap_or_else(new_trace_id);
799    let log = ensure_hitl_event_log();
800    let request = HitlRequestEnvelope {
801        request_id: request_id.clone(),
802        kind: HitlRequestKind::Escalation,
803        agent: keys
804            .as_ref()
805            .map(|keys| keys.agent.clone())
806            .unwrap_or_default(),
807        trace_id: trace_id.clone(),
808        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
809        requested_at: now_rfc3339(),
810        payload: json!({
811            "role": role,
812            "reason": reason,
813            "capability_policy": escalation_capability_policy(),
814        }),
815    };
816    create_request_waitpoint(&log, &request).await?;
817    append_request(&log, &request).await?;
818    maybe_notify_host(ctx, &request);
819    emit_hitl_requested(&request);
820    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
821
822    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
823        .await?
824    {
825        WaitpointOutcome::Completed(record) => {
826            let accepted_at = record.completed_at.clone();
827            let reviewer = record.completed_by.clone();
828            let accepted = record
829                .value
830                .as_ref()
831                .and_then(|value| value.get("accepted"))
832                .and_then(JsonValue::as_bool)
833                .unwrap_or(true);
834            Ok(crate::stdlib::json_to_vm_value(&json!({
835                "request_id": request_id,
836                "role": role,
837                "reason": reason,
838                "trace_id": trace_id,
839                "status": if accepted { "accepted" } else { "pending" },
840                "accepted_at": accepted_at,
841                "reviewer": reviewer,
842            })))
843        }
844        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
845        WaitpointOutcome::Cancelled {
846            wait_id,
847            waitpoint_ids,
848            reason,
849        } => Err(hitl_cancelled_error(
850            &request_id,
851            HitlRequestKind::Escalation,
852            &wait_id,
853            &waitpoint_ids,
854            reason,
855        )),
856    }
857}
858
859async fn create_request_waitpoint(
860    log: &Arc<AnyEventLog>,
861    request: &HitlRequestEnvelope,
862) -> Result<(), VmError> {
863    create_waitpoint_on(
864        log,
865        Some(request.request_id.clone()),
866        Some(json!({
867            "kind": request.kind.as_str(),
868            "agent": request.agent.clone(),
869            "trace_id": request.trace_id.clone(),
870            "requested_at": request.requested_at.clone(),
871            "payload": request.payload.clone(),
872        })),
873    )
874    .await?;
875    Ok(())
876}
877
878async fn wait_for_request_waitpoint(
879    request_id: &str,
880    timeout: Option<StdDuration>,
881) -> Result<WaitpointOutcome, VmError> {
882    match wait_on_waitpoints(
883        vec![request_id.to_string()],
884        WaitpointWaitOptions { timeout },
885    )
886    .await
887    {
888        Ok(records) => Ok(WaitpointOutcome::Completed(
889            records
890                .into_iter()
891                .next()
892                .expect("single waitpoint wait result"),
893        )),
894        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
895        Err(WaitpointWaitFailure::Cancelled {
896            wait_id,
897            waitpoint_ids,
898            reason,
899        }) => Ok(WaitpointOutcome::Cancelled {
900            wait_id,
901            waitpoint_ids,
902            reason,
903        }),
904        Err(WaitpointWaitFailure::Vm(error)) => {
905            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
906                return Ok(outcome);
907            }
908            Err(error)
909        }
910    }
911}
912
913fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
914    let VmError::Thrown(VmValue::Dict(dict)) = error else {
915        return None;
916    };
917    let name = dict.get("name").and_then(vm_string)?;
918    match name {
919        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
920        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
921            wait_id: dict
922                .get("wait_id")
923                .and_then(vm_string)
924                .unwrap_or_default()
925                .to_string(),
926            waitpoint_ids: dict
927                .get("waitpoint_ids")
928                .and_then(vm_string_list)
929                .unwrap_or_default(),
930            reason: dict
931                .get("reason")
932                .and_then(vm_string)
933                .map(ToString::to_string),
934        }),
935        _ => None,
936    }
937}
938
939async fn finalize_hitl_response(
940    log: &Arc<AnyEventLog>,
941    kind: HitlRequestKind,
942    response: &HitlHostResponse,
943) -> Result<(), String> {
944    match kind {
945        HitlRequestKind::Question => {
946            if waitpoint_is_terminal(log, &response.request_id).await? {
947                return Ok(());
948            }
949            complete_waitpoint_on(
950                log,
951                &response.request_id,
952                response.answer.clone(),
953                response.reviewer.clone(),
954                response.reason.clone(),
955                response.metadata.clone(),
956            )
957            .await
958            .map(|_| ())
959            .map_err(|error| error.to_string())
960        }
961        HitlRequestKind::Escalation => {
962            if !response.accepted.unwrap_or(false)
963                || waitpoint_is_terminal(log, &response.request_id).await?
964            {
965                return Ok(());
966            }
967            complete_waitpoint_on(
968                log,
969                &response.request_id,
970                Some(json!({
971                    "accepted": true,
972                    "reviewer": response.reviewer,
973                    "reason": response.reason,
974                    "responded_at": response.responded_at,
975                })),
976                response.reviewer.clone(),
977                response.reason.clone(),
978                response.metadata.clone(),
979            )
980            .await
981            .map(|_| ())
982            .map_err(|error| error.to_string())
983        }
984        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
985            if waitpoint_is_terminal(log, &response.request_id).await? {
986                return Ok(());
987            }
988            let request = load_request_envelope(log, kind, &response.request_id)
989                .await
990                .map_err(|error| error.to_string())?;
991            match resolve_approval_state(log, kind, &request)
992                .await
993                .map_err(|error| error.to_string())?
994            {
995                ApprovalResolution::Pending => Ok(()),
996                ApprovalResolution::Approved(progress) => {
997                    let record = approval_record_json(&progress);
998                    append_named_event(
999                        log,
1000                        kind,
1001                        approved_event_kind(kind),
1002                        &request.request_id,
1003                        &request.trace_id,
1004                        json!({
1005                            "request_id": request.request_id.clone(),
1006                            "record": record.clone(),
1007                        }),
1008                    )
1009                    .await
1010                    .map_err(|error| error.to_string())?;
1011                    complete_waitpoint_on(
1012                        log,
1013                        &request.request_id,
1014                        Some(record),
1015                        response.reviewer.clone(),
1016                        progress.reason.clone(),
1017                        response.metadata.clone(),
1018                    )
1019                    .await
1020                    .map(|_| ())
1021                    .map_err(|error| error.to_string())
1022                }
1023                ApprovalResolution::Denied(denied) => {
1024                    append_named_event(
1025                        log,
1026                        kind,
1027                        denied_event_kind(kind),
1028                        &request.request_id,
1029                        &request.trace_id,
1030                        json!({
1031                            "request_id": request.request_id.clone(),
1032                            "reviewer": denied.reviewer.clone(),
1033                            "reason": denied.reason.clone(),
1034                        }),
1035                    )
1036                    .await
1037                    .map_err(|error| error.to_string())?;
1038                    cancel_waitpoint_on(
1039                        log,
1040                        &request.request_id,
1041                        denied.reviewer.clone(),
1042                        denied.reason.clone(),
1043                        denied.metadata.clone(),
1044                    )
1045                    .await
1046                    .map(|_| ())
1047                    .map_err(|error| error.to_string())
1048                }
1049            }
1050        }
1051    }
1052}
1053
1054async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1055    Ok(inspect_waitpoint_on(log, request_id)
1056        .await
1057        .map_err(|error| error.to_string())?
1058        .is_some_and(|record| record.status != WaitpointStatus::Open))
1059}
1060
1061async fn load_request_envelope(
1062    log: &Arc<AnyEventLog>,
1063    kind: HitlRequestKind,
1064    request_id: &str,
1065) -> Result<HitlRequestEnvelope, VmError> {
1066    let topic = topic(kind)?;
1067    let events = log
1068        .read_range(&topic, None, usize::MAX)
1069        .await
1070        .map_err(log_error)?;
1071    events
1072        .into_iter()
1073        .filter(|(_, event)| event.kind == kind.request_event_kind())
1074        .find_map(|(_, event)| {
1075            if !event_matches_request(&event, request_id) {
1076                return None;
1077            }
1078            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1079        })
1080        .ok_or_else(|| {
1081            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1082        })
1083}
1084
1085async fn resolve_approval_state(
1086    log: &Arc<AnyEventLog>,
1087    kind: HitlRequestKind,
1088    request: &HitlRequestEnvelope,
1089) -> Result<ApprovalResolution, VmError> {
1090    let quorum = approval_quorum_from_request(kind, request)?;
1091    let allowed_reviewers = approval_reviewers_from_request(kind, request)
1092        .into_iter()
1093        .collect::<BTreeSet<_>>();
1094    let mut progress = ApprovalProgress {
1095        request_id: request.request_id.clone(),
1096        reviewers: BTreeSet::new(),
1097        signatures: Vec::new(),
1098        reason: None,
1099        approved_at: None,
1100    };
1101    let topic = topic(kind)?;
1102    let events = log
1103        .read_range(&topic, None, usize::MAX)
1104        .await
1105        .map_err(log_error)?;
1106    for (_, event) in events {
1107        if !event_matches_request(&event, &request.request_id)
1108            || event.kind != "hitl.response_received"
1109        {
1110            continue;
1111        }
1112        let response: HitlHostResponse = serde_json::from_value(event.payload)
1113            .map_err(|error| VmError::Runtime(error.to_string()))?;
1114        if let Some(reviewer) = response.reviewer.as_deref() {
1115            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
1116                continue;
1117            }
1118            if progress.reviewers.contains(reviewer) {
1119                continue;
1120            }
1121        }
1122        if response.approved.unwrap_or(false) {
1123            if let Some(reviewer) = response.reviewer.clone() {
1124                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1125                progress.reviewers.insert(reviewer.clone());
1126                progress.signatures.push(ApprovalSignature {
1127                    reviewer: reviewer.clone(),
1128                    signed_at: signed_at.clone(),
1129                    signature: response.signature.clone().unwrap_or_else(|| {
1130                        approval_receipt_signature(
1131                            &request.request_id,
1132                            &reviewer,
1133                            &signed_at,
1134                            true,
1135                            response.reason.as_deref(),
1136                        )
1137                    }),
1138                });
1139            }
1140            progress.reason = response.reason.clone();
1141            progress.approved_at = response.responded_at.clone();
1142            if progress.reviewers.len() as u32 >= quorum {
1143                return Ok(ApprovalResolution::Approved(progress));
1144            }
1145            continue;
1146        }
1147        return Ok(ApprovalResolution::Denied(response));
1148    }
1149    Ok(ApprovalResolution::Pending)
1150}
1151
1152fn approval_quorum_from_request(
1153    kind: HitlRequestKind,
1154    request: &HitlRequestEnvelope,
1155) -> Result<u32, VmError> {
1156    let key = match kind {
1157        HitlRequestKind::DualControl => "n",
1158        _ => "quorum",
1159    };
1160    let quorum = request
1161        .payload
1162        .get(key)
1163        .or_else(|| request.payload.get("approvers_required"))
1164        .or_else(|| {
1165            request
1166                .payload
1167                .get("approval_request")
1168                .and_then(|approval| approval.get("approvers_required"))
1169        })
1170        .and_then(JsonValue::as_u64)
1171        .unwrap_or(1);
1172    u32::try_from(quorum).map_err(|_| {
1173        VmError::Runtime(format!(
1174            "invalid quorum in HITL request '{}'",
1175            request.request_id
1176        ))
1177    })
1178}
1179
1180fn approval_reviewers_from_request(
1181    kind: HitlRequestKind,
1182    request: &HitlRequestEnvelope,
1183) -> Vec<String> {
1184    let key = match kind {
1185        HitlRequestKind::DualControl => "approvers",
1186        _ => "reviewers",
1187    };
1188    request
1189        .payload
1190        .get(key)
1191        .or_else(|| {
1192            request
1193                .payload
1194                .get("approval_request")
1195                .and_then(|approval| approval.get("reviewers"))
1196        })
1197        .and_then(JsonValue::as_array)
1198        .map(|values| {
1199            values
1200                .iter()
1201                .filter_map(JsonValue::as_str)
1202                .map(str::to_string)
1203                .collect()
1204        })
1205        .unwrap_or_default()
1206}
1207
1208fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1209    json!({
1210        "request_id": progress.request_id.clone(),
1211        "approved": true,
1212        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1213        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1214        "reason": progress.reason,
1215        "signatures": progress.signatures,
1216    })
1217}
1218
1219fn approval_receipt_signature(
1220    request_id: &str,
1221    reviewer: &str,
1222    signed_at: &str,
1223    approved: bool,
1224    reason: Option<&str>,
1225) -> String {
1226    let material = format!(
1227        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1228        reason.unwrap_or("")
1229    );
1230    let hash = sha2::Sha256::digest(material.as_bytes());
1231    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1232    format!("sha256:{hex}")
1233}
1234
1235fn approval_record_from_waitpoint(
1236    record: &WaitpointRecord,
1237    builtin: &str,
1238) -> Result<VmValue, VmError> {
1239    record
1240        .value
1241        .as_ref()
1242        .map(crate::stdlib::json_to_vm_value)
1243        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1244}
1245
1246async fn approval_wait_error(
1247    log: &Arc<AnyEventLog>,
1248    kind: HitlRequestKind,
1249    request_id: &str,
1250) -> VmError {
1251    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1252        if record.status == WaitpointStatus::Cancelled
1253            && record.reason.as_deref() != Some("upstream_cancelled")
1254        {
1255            return approval_denied_error(
1256                request_id,
1257                HitlHostResponse {
1258                    request_id: request_id.to_string(),
1259                    answer: None,
1260                    approved: Some(false),
1261                    accepted: None,
1262                    reviewer: record.cancelled_by.clone(),
1263                    reason: record.reason.clone(),
1264                    metadata: record.metadata.clone(),
1265                    responded_at: record.cancelled_at,
1266                    signature: None,
1267                },
1268            );
1269        }
1270        if record.status == WaitpointStatus::Cancelled {
1271            return hitl_cancelled_error(
1272                request_id,
1273                kind,
1274                "",
1275                &[request_id.to_string()],
1276                record.reason,
1277            );
1278        }
1279    }
1280    hitl_cancelled_error(
1281        request_id,
1282        kind,
1283        "",
1284        &[request_id.to_string()],
1285        Some("upstream_cancelled".to_string()),
1286    )
1287}
1288
1289async fn append_timeout_once(
1290    log: &Arc<AnyEventLog>,
1291    kind: HitlRequestKind,
1292    request_id: &str,
1293    trace_id: &str,
1294) -> Result<(), VmError> {
1295    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1296        return Ok(());
1297    }
1298    append_timeout(log, kind, request_id, trace_id).await
1299}
1300
1301async fn hitl_event_exists(
1302    log: &Arc<AnyEventLog>,
1303    kind: HitlRequestKind,
1304    request_id: &str,
1305    event_kind: &str,
1306) -> Result<bool, VmError> {
1307    let topic = topic(kind)?;
1308    let events = log
1309        .read_range(&topic, None, usize::MAX)
1310        .await
1311        .map_err(log_error)?;
1312    Ok(events
1313        .into_iter()
1314        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1315}
1316
1317fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1318    match kind {
1319        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1320        _ => "hitl.approval_approved",
1321    }
1322}
1323
1324fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1325    match kind {
1326        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1327        _ => "hitl.approval_denied",
1328    }
1329}
1330
1331async fn append_request(
1332    log: &Arc<AnyEventLog>,
1333    request: &HitlRequestEnvelope,
1334) -> Result<(), VmError> {
1335    let topic = topic(request.kind)?;
1336    log.append(
1337        &topic,
1338        LogEvent::new(
1339            request.kind.request_event_kind(),
1340            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1341        )
1342        .with_headers(request_headers(request)),
1343    )
1344    .await
1345    .map(|_| ())
1346    .map_err(log_error)
1347}
1348
1349async fn append_named_event(
1350    log: &Arc<AnyEventLog>,
1351    kind: HitlRequestKind,
1352    event_kind: &str,
1353    request_id: &str,
1354    trace_id: &str,
1355    payload: JsonValue,
1356) -> Result<(), VmError> {
1357    let topic = topic(kind)?;
1358    let headers = headers_with_trace(request_id, trace_id);
1359    log.append(
1360        &topic,
1361        LogEvent::new(event_kind, payload).with_headers(headers),
1362    )
1363    .await
1364    .map(|_| ())
1365    .map_err(log_error)
1366}
1367
1368async fn append_timeout(
1369    log: &Arc<AnyEventLog>,
1370    kind: HitlRequestKind,
1371    request_id: &str,
1372    trace_id: &str,
1373) -> Result<(), VmError> {
1374    append_named_event(
1375        log,
1376        kind,
1377        "hitl.timeout",
1378        request_id,
1379        trace_id,
1380        serde_json::to_value(HitlTimeoutRecord {
1381            request_id: request_id.to_string(),
1382            kind,
1383            trace_id: trace_id.to_string(),
1384            timed_out_at: now_rfc3339(),
1385        })
1386        .map_err(|error| VmError::Runtime(error.to_string()))?,
1387    )
1388    .await
1389}
1390
1391async fn maybe_apply_mock_response(
1392    kind: HitlRequestKind,
1393    request_id: &str,
1394    request_payload: &JsonValue,
1395) -> Result<(), VmError> {
1396    let mut params = request_payload
1397        .as_object()
1398        .cloned()
1399        .unwrap_or_default()
1400        .into_iter()
1401        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1402        .collect::<BTreeMap<_, _>>();
1403    params.insert(
1404        "request_id".to_string(),
1405        VmValue::String(std::sync::Arc::from(request_id.to_string())),
1406    );
1407    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1408        return Ok(());
1409    };
1410    let value = result?;
1411    let responses = match value {
1412        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1413        other => vec![other],
1414    };
1415    for response in responses {
1416        let response_dict = response.as_dict().ok_or_else(|| {
1417            VmError::Runtime(format!(
1418                "mocked HITL {} response must be a dict or list<dict>",
1419                kind.as_str()
1420            ))
1421        })?;
1422        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1423        append_hitl_response(None, hitl_response)
1424            .await
1425            .map_err(VmError::Runtime)?;
1426    }
1427    Ok(())
1428}
1429
1430fn parse_hitl_response_dict(
1431    request_id: &str,
1432    response_dict: &BTreeMap<String, VmValue>,
1433) -> Result<HitlHostResponse, VmError> {
1434    Ok(HitlHostResponse {
1435        request_id: request_id.to_string(),
1436        answer: response_dict
1437            .get("answer")
1438            .map(crate::llm::vm_value_to_json),
1439        approved: response_dict.get("approved").and_then(vm_bool),
1440        accepted: response_dict.get("accepted").and_then(vm_bool),
1441        reviewer: response_dict.get("reviewer").map(VmValue::display),
1442        reason: response_dict.get("reason").map(VmValue::display),
1443        metadata: response_dict
1444            .get("metadata")
1445            .map(crate::llm::vm_value_to_json),
1446        responded_at: response_dict.get("responded_at").map(VmValue::display),
1447        signature: response_dict.get("signature").map(VmValue::display),
1448    })
1449}
1450
1451fn maybe_notify_host(ctx: Option<&AsyncBuiltinCtx>, request: &HitlRequestEnvelope) {
1452    let Some(bridge) = ctx.and_then(|ctx| ctx.child_vm().bridge.clone()) else {
1453        return;
1454    };
1455    bridge.notify(
1456        "harn.hitl.requested",
1457        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1458    );
1459}
1460
1461/// Emit a `HitlRequested` `AgentEvent` so transport adapters
1462/// (currently the A2A `A2aWorkerSink`) can flip a task into
1463/// `input-required` while the script is suspended on the waitpoint.
1464/// No-op when there is no current agent session — the bridge-level
1465/// `harn.hitl.requested` notification still fires for hosts that drive
1466/// HITL UX through the bridge.
1467fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1468    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1469        return;
1470    };
1471    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1472        session_id,
1473        request_id: request.request_id.clone(),
1474        kind: request.kind.as_str().to_string(),
1475        payload: request.payload.clone(),
1476    });
1477}
1478
1479/// Companion to `emit_hitl_requested`: notifies sinks that the
1480/// suspended waitpoint has resolved so a paused task can flip back
1481/// out of `input-required`. `outcome` is one of `"answered"`,
1482/// `"timeout"`, `"cancelled"`, or `"error"`.
1483fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1484    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1485        return;
1486    };
1487    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1488        session_id,
1489        request_id: request_id.to_string(),
1490        kind: kind.as_str().to_string(),
1491        outcome: outcome.to_string(),
1492    });
1493}
1494
1495/// Wrapper around `wait_for_request_waitpoint` that emits the
1496/// canonical `HitlResolved` `AgentEvent` regardless of which terminal
1497/// branch the waitpoint takes (response / timeout / cancellation /
1498/// error). Pair-emitted with `emit_hitl_requested` so transport
1499/// adapters can bracket the `input-required` pause cleanly without
1500/// each `*_impl` having to duplicate the emission at every match arm.
1501async fn wait_for_request_waitpoint_with_events(
1502    request_id: &str,
1503    kind: HitlRequestKind,
1504    timeout: Option<StdDuration>,
1505) -> Result<WaitpointOutcome, VmError> {
1506    let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1507    let label = match &outcome {
1508        Ok(WaitpointOutcome::Completed(_)) => "answered",
1509        Ok(WaitpointOutcome::Timeout) => "timeout",
1510        Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1511        Err(_) => "error",
1512    };
1513    emit_hitl_resolved(request_id, kind, label);
1514    outcome
1515}
1516
1517fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1518    let Some(value) = value else {
1519        return Ok(AskUserOptions {
1520            schema: None,
1521            timeout: Some(default_question_timeout()),
1522            default: None,
1523        });
1524    };
1525    let dict = value
1526        .as_dict()
1527        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1528    Ok(AskUserOptions {
1529        schema: dict
1530            .get("schema")
1531            .cloned()
1532            .filter(|value| !matches!(value, VmValue::Nil)),
1533        timeout: dict
1534            .get("timeout")
1535            .map(parse_duration_value)
1536            .transpose()?
1537            .or_else(|| Some(default_question_timeout())),
1538        default: dict
1539            .get("default")
1540            .cloned()
1541            .filter(|value| !matches!(value, VmValue::Nil)),
1542    })
1543}
1544
1545fn default_question_timeout() -> StdDuration {
1546    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1547}
1548
1549fn escalation_capability_policy() -> JsonValue {
1550    crate::orchestration::current_execution_policy()
1551        .and_then(|policy| serde_json::to_value(policy).ok())
1552        .unwrap_or(JsonValue::Null)
1553}
1554
1555fn parse_approval_options(
1556    value: Option<&VmValue>,
1557    builtin: &str,
1558) -> Result<ApprovalOptions, VmError> {
1559    let dict = match value {
1560        None => None,
1561        Some(VmValue::Dict(dict)) => Some(dict),
1562        Some(_) => {
1563            return Err(VmError::Runtime(format!(
1564                "{builtin}: options must be a dict"
1565            )))
1566        }
1567    };
1568    let quorum = dict
1569        .and_then(|dict| dict.get("quorum"))
1570        .and_then(VmValue::as_int)
1571        .unwrap_or(1);
1572    if quorum <= 0 {
1573        return Err(VmError::Runtime(format!(
1574            "{builtin}: quorum must be positive"
1575        )));
1576    }
1577    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1578    let capabilities_requested = optional_string_list(
1579        dict.and_then(|dict| dict.get("capabilities_requested")),
1580        builtin,
1581    )?;
1582    let evidence_refs = dict
1583        .and_then(|dict| dict.get("evidence_refs"))
1584        .map(|value| match value {
1585            VmValue::List(items) => Ok(items
1586                .iter()
1587                .map(crate::llm::vm_value_to_json)
1588                .collect::<Vec<_>>()),
1589            _ => Err(VmError::Runtime(format!(
1590                "{builtin}: evidence_refs must be a list"
1591            ))),
1592        })
1593        .transpose()?
1594        .unwrap_or_default();
1595    let deadline = dict
1596        .and_then(|dict| dict.get("deadline"))
1597        .map(parse_duration_value)
1598        .transpose()?
1599        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1600    Ok(ApprovalOptions {
1601        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1602        args: dict.and_then(|dict| dict.get("args")).cloned(),
1603        quorum: quorum as u32,
1604        reviewers,
1605        deadline,
1606        principal: dict
1607            .and_then(|dict| dict.get("principal"))
1608            .map(VmValue::display)
1609            .filter(|value| !value.is_empty()),
1610        evidence_refs,
1611        undo_metadata: dict
1612            .and_then(|dict| dict.get("undo_metadata"))
1613            .map(crate::llm::vm_value_to_json),
1614        capabilities_requested,
1615    })
1616}
1617
1618fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1619    args.get(idx)
1620        .map(VmValue::display)
1621        .filter(|value| !value.is_empty())
1622        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1623}
1624
1625fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1626    let value = args
1627        .get(idx)
1628        .and_then(VmValue::as_int)
1629        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1630    if value <= 0 {
1631        return Err(VmError::Runtime(format!(
1632            "{builtin}: expected a positive int at {idx}"
1633        )));
1634    }
1635    Ok(value)
1636}
1637
1638fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1639    let Some(value) = value else {
1640        return Ok(Vec::new());
1641    };
1642    match value {
1643        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1644        _ => Err(VmError::Runtime(format!(
1645            "{builtin}: expected list<string>"
1646        ))),
1647    }
1648}
1649
1650fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1651    duration_from_value(value, "hitl", "timeout", ErrorKind::Runtime)
1652}
1653
1654fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1655    active_event_log()
1656        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1657}
1658
1659fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1660    if let Some(log) = active_event_log() {
1661        return Ok(log);
1662    }
1663    let Some(base_dir) = base_dir else {
1664        return Ok(install_memory_for_current_thread(
1665            HITL_EVENT_LOG_QUEUE_DEPTH,
1666        ));
1667    };
1668    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1669}
1670
1671fn current_dispatch_keys() -> Option<DispatchKeys> {
1672    let context = current_dispatch_context()?;
1673    let stable_base = context
1674        .replay_of_event_id
1675        .clone()
1676        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1677    let instance_key = format!(
1678        "{}::{}",
1679        context.trigger_event.id.0,
1680        context.replay_of_event_id.as_deref().unwrap_or("live")
1681    );
1682    Some(DispatchKeys {
1683        instance_key,
1684        stable_base,
1685        agent: context.agent_id,
1686        trace_id: context.trigger_event.trace_id.0,
1687    })
1688}
1689
1690fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1691    if let Some(keys) = dispatch_keys {
1692        let seq = REQUEST_SEQUENCE.with(|slot| {
1693            let mut state = slot.borrow_mut();
1694            if state.instance_key != keys.instance_key {
1695                state.instance_key = keys.instance_key.clone();
1696                state.next_seq = 0;
1697            }
1698            state.next_seq += 1;
1699            state.next_seq
1700        });
1701        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1702    }
1703    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1704}
1705
1706fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1707    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1708    if let Some(run_id) = request.run_id.as_ref() {
1709        headers.insert("run_id".to_string(), run_id.clone());
1710    }
1711    headers
1712}
1713
/// Headers for a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1719
1720fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1721    let mut headers = response_headers(request_id);
1722    headers.insert("trace_id".to_string(), trace_id.to_string());
1723    headers
1724}
1725
1726fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1727    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1728}
1729
1730fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1731    event
1732        .headers
1733        .get("request_id")
1734        .is_some_and(|value| value == request_id)
1735        || event
1736            .payload
1737            .get("request_id")
1738            .and_then(JsonValue::as_str)
1739            .is_some_and(|value| value == request_id)
1740}
1741
1742fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1743    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1744        "name": "ApprovalDeniedError",
1745        "category": "generic",
1746        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1747        "request_id": request_id,
1748        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1749        "reason": response.reason,
1750    })))
1751}
1752
1753fn hitl_cancelled_error(
1754    request_id: &str,
1755    kind: HitlRequestKind,
1756    wait_id: &str,
1757    waitpoint_ids: &[String],
1758    reason: Option<String>,
1759) -> VmError {
1760    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1761    let message = reason
1762        .clone()
1763        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1764    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1765        "name": "HumanCancelledError",
1766        "category": ErrorCategory::Cancelled.as_str(),
1767        "message": message,
1768        "request_id": request_id,
1769        "kind": kind.as_str(),
1770        "wait_id": wait_id,
1771        "waitpoint_ids": waitpoint_ids,
1772        "reason": reason,
1773    })))
1774}
1775
1776fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1777    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1778    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1779        "name": "HumanTimeoutError",
1780        "category": ErrorCategory::Timeout.as_str(),
1781        "message": format!("{} timed out", kind.as_str()),
1782        "request_id": request_id,
1783        "kind": kind.as_str(),
1784    })))
1785}
1786
1787fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1788    match default {
1789        VmValue::Int(_) => match value {
1790            VmValue::Int(_) => value.clone(),
1791            VmValue::Float(number) => VmValue::Int(*number as i64),
1792            VmValue::String(text) => text
1793                .parse::<i64>()
1794                .map(VmValue::Int)
1795                .unwrap_or_else(|_| default.clone()),
1796            _ => default.clone(),
1797        },
1798        VmValue::Float(_) => match value {
1799            VmValue::Float(_) => value.clone(),
1800            VmValue::Int(number) => VmValue::Float(*number as f64),
1801            VmValue::String(text) => text
1802                .parse::<f64>()
1803                .map(VmValue::Float)
1804                .unwrap_or_else(|_| default.clone()),
1805            _ => default.clone(),
1806        },
1807        VmValue::Bool(_) => match value {
1808            VmValue::Bool(_) => value.clone(),
1809            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1810            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1811            _ => default.clone(),
1812        },
1813        VmValue::String(_) => VmValue::String(std::sync::Arc::from(value.display())),
1814        VmValue::Duration(_) => match value {
1815            VmValue::Duration(_) => value.clone(),
1816            VmValue::Int(ms) => VmValue::Duration(*ms),
1817            _ => default.clone(),
1818        },
1819        VmValue::Nil => value.clone(),
1820        _ => {
1821            if value.type_name() == default.type_name() {
1822                value.clone()
1823            } else {
1824                default.clone()
1825            }
1826        }
1827    }
1828}
1829
1830fn log_error(error: impl std::fmt::Display) -> VmError {
1831    VmError::Runtime(error.to_string())
1832}
1833
1834fn now_rfc3339() -> String {
1835    format_rfc3339(OffsetDateTime::now_utc())
1836}
1837
1838fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1839    timestamp
1840        .format(&Rfc3339)
1841        .unwrap_or_else(|_| timestamp.to_string())
1842}
1843
1844fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1845    time::Duration::try_from(duration)
1846        .ok()
1847        .map(|duration| format_rfc3339(requested_at + duration))
1848}
1849
1850fn new_trace_id() -> String {
1851    format!("trace_{}", Uuid::now_v7())
1852}
1853
1854fn vm_bool(value: &VmValue) -> Option<bool> {
1855    match value {
1856        VmValue::Bool(flag) => Some(*flag),
1857        _ => None,
1858    }
1859}
1860
1861fn vm_string(value: &VmValue) -> Option<&str> {
1862    match value {
1863        VmValue::String(text) => Some(text.as_ref()),
1864        _ => None,
1865    }
1866}
1867
1868fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1869    match value {
1870        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1871        _ => None,
1872    }
1873}
1874
1875#[cfg(test)]
1876mod tests {
1877    use super::{
1878        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1879    };
1880    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1881    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1882
1883    async fn execute_hitl_script(
1884        base_dir: &std::path::Path,
1885        source: &str,
1886    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1887        reset_thread_local_state();
1888        let log = install_default_for_base_dir(base_dir).expect("install event log");
1889        let chunk = compile_source(source).expect("compile source");
1890        let mut vm = Vm::new();
1891        register_vm_stdlib(&mut vm);
1892        vm.set_source_dir(base_dir);
1893        vm.execute(&chunk).await?;
1894        let output = vm.output().trim_end().to_string();
1895        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1896        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1897        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1898        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1899        Ok((
1900            output,
1901            question_events,
1902            approval_events,
1903            dual_control_events,
1904            escalation_events,
1905        ))
1906    }
1907
1908    async fn event_kinds(
1909        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1910        topic: &str,
1911    ) -> Vec<String> {
1912        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1913            .await
1914            .expect("read topic")
1915            .into_iter()
1916            .map(|(_, event)| event.kind)
1917            .collect()
1918    }
1919
1920    async fn event_payloads(
1921        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1922        topic: &str,
1923    ) -> Vec<serde_json::Value> {
1924        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1925            .await
1926            .expect("read topic")
1927            .into_iter()
1928            .map(|(_, event)| event.payload)
1929            .collect()
1930    }
1931
1932    #[tokio::test(flavor = "current_thread")]
1933    async fn ask_user_coerces_to_default_type_and_logs_events() {
1934        tokio::task::LocalSet::new()
1935            .run_until(async {
1936                let dir = tempfile::tempdir().expect("tempdir");
1937                let source = r#"
1938pipeline test(task) {
1939  host_mock("hitl", "question", {answer: "9"})
1940  let answer: int = ask_user("Pick a number", {default: 0})
1941  __io_println(answer)
1942}
1943"#;
1944                let (
1945                    output,
1946                    question_events,
1947                    approval_events,
1948                    dual_control_events,
1949                    escalation_events,
1950                ) = execute_hitl_script(dir.path(), source)
1951                    .await
1952                    .expect("script succeeds");
1953                assert_eq!(output, "9");
1954                assert_eq!(
1955                    question_events,
1956                    vec![
1957                        "hitl.question_asked".to_string(),
1958                        "hitl.response_received".to_string()
1959                    ]
1960                );
1961                assert!(approval_events.is_empty());
1962                assert!(dual_control_events.is_empty());
1963                assert!(escalation_events.is_empty());
1964            })
1965            .await;
1966    }
1967
1968    #[tokio::test(flavor = "current_thread")]
1969    async fn request_approval_waits_for_quorum_and_emits_a_record() {
1970        tokio::task::LocalSet::new()
1971            .run_until(async {
1972                let dir = tempfile::tempdir().expect("tempdir");
1973                let source = r#"
1974pipeline test(task) {
1975  host_mock("hitl", "approval", [
1976    {approved: true, reviewer: "alice", reason: "ok"},
1977    {approved: true, reviewer: "bob", reason: "ship it"},
1978  ])
1979  let record = request_approval(
1980    "deploy production",
1981    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1982  )
1983  __io_println(record.approved)
1984  __io_println(len(record.reviewers))
1985  __io_println(record.reviewers[0])
1986  __io_println(record.reviewers[1])
1987}
1988"#;
1989                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1990                    .await
1991                    .expect("script succeeds");
1992                assert_eq!(
1993                    approval_events,
1994                    vec![
1995                        "hitl.approval_requested".to_string(),
1996                        "hitl.response_received".to_string(),
1997                        "hitl.response_received".to_string(),
1998                        "hitl.approval_approved".to_string(),
1999                    ]
2000                );
2001            })
2002            .await;
2003    }
2004
2005    #[tokio::test(flavor = "current_thread")]
2006    async fn request_approval_emits_canonical_approval_request_payload() {
2007        tokio::task::LocalSet::new()
2008            .run_until(async {
2009                reset_thread_local_state();
2010                let dir = tempfile::tempdir().expect("tempdir");
2011                let log = install_default_for_base_dir(dir.path()).expect("install event log");
2012                let source = r#"
2013pipeline test(task) {
2014  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
2015  request_approval("deploy production", {
2016    args: {environment: "prod"},
2017    quorum: 1,
2018    reviewers: ["alice"],
2019    evidence_refs: [{kind: "run", uri: "run_123"}],
2020    undo_metadata: {strategy: "rollback"},
2021    capabilities_requested: ["deploy.production"],
2022  })
2023}
2024"#;
2025                let chunk = compile_source(source).expect("compile source");
2026                let mut vm = Vm::new();
2027                register_vm_stdlib(&mut vm);
2028                vm.set_source_dir(dir.path());
2029                vm.execute(&chunk).await.expect("script succeeds");
2030
2031                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
2032                let request_payload = &payloads[0]["payload"];
2033                let approval_request = &request_payload["approval_request"];
2034                assert_eq!(approval_request["id"], request_payload["id"]);
2035                assert_eq!(approval_request["action"], "deploy production");
2036                assert_eq!(approval_request["args"]["environment"], "prod");
2037                assert_eq!(approval_request["approvers_required"], 1);
2038                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
2039                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
2040                assert_eq!(
2041                    approval_request["capabilities_requested"][0],
2042                    "deploy.production"
2043                );
2044                assert!(approval_request["requested_at"].as_str().is_some());
2045                assert!(approval_request["deadline"].as_str().is_some());
2046            })
2047            .await;
2048    }
2049
2050    #[tokio::test(flavor = "current_thread")]
2051    async fn request_approval_surfaces_denials_as_typed_errors() {
2052        tokio::task::LocalSet::new()
2053            .run_until(async {
2054                let dir = tempfile::tempdir().expect("tempdir");
2055                let source = r#"
2056pipeline test(task) {
2057  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
2058  let denied = try {
2059    request_approval("drop table", {reviewers: ["alice"]})
2060  }
2061  __io_println(is_err(denied))
2062  __io_println(unwrap_err(denied).name)
2063  __io_println(unwrap_err(denied).reason)
2064}
2065"#;
2066                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2067                    .await
2068                    .expect("script succeeds");
2069                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
2070                assert_eq!(
2071                    approval_events,
2072                    vec![
2073                        "hitl.approval_requested".to_string(),
2074                        "hitl.response_received".to_string(),
2075                        "hitl.approval_denied".to_string(),
2076                    ]
2077                );
2078            })
2079            .await;
2080    }
2081
2082    #[tokio::test(flavor = "current_thread")]
2083    async fn dual_control_executes_action_after_quorum() {
2084        tokio::task::LocalSet::new()
2085            .run_until(async {
2086                let dir = tempfile::tempdir().expect("tempdir");
2087                let source = r#"
2088pipeline test(task) {
2089  host_mock("hitl", "dual_control", [
2090    {approved: true, reviewer: "alice"},
2091    {approved: true, reviewer: "bob"},
2092  ])
2093  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2094  __io_println(result)
2095}
2096"#;
2097                let (output, _, _, dual_control_events, _) =
2098                    execute_hitl_script(dir.path(), source)
2099                        .await
2100                        .expect("script succeeds");
2101                assert_eq!(output, "launched");
2102                assert_eq!(
2103                    dual_control_events,
2104                    vec![
2105                        "hitl.dual_control_requested".to_string(),
2106                        "hitl.response_received".to_string(),
2107                        "hitl.response_received".to_string(),
2108                        "hitl.dual_control_approved".to_string(),
2109                        "hitl.dual_control_executed".to_string(),
2110                    ]
2111                );
2112            })
2113            .await;
2114    }
2115
2116    #[tokio::test(flavor = "current_thread")]
2117    async fn escalate_to_waits_for_acceptance_event() {
2118        tokio::task::LocalSet::new()
2119            .run_until(async {
2120                let dir = tempfile::tempdir().expect("tempdir");
2121                let source = r#"
2122pipeline test(task) {
2123  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2124  let handle = escalate_to("admin", "need override")
2125  __io_println(handle.status)
2126  __io_println(handle.reviewer)
2127}
2128"#;
2129                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2130                    .await
2131                    .expect("script succeeds");
2132                assert_eq!(output, "accepted\nlead");
2133                assert_eq!(
2134                    escalation_events,
2135                    vec![
2136                        "hitl.escalation_issued".to_string(),
2137                        "hitl.escalation_accepted".to_string(),
2138                    ]
2139                );
2140            })
2141            .await;
2142    }
2143
2144    /// `harn-serve` adapters (A2A `input-required`, ACP `hitl_request`)
2145    /// rely on the canonical `AgentEvent::HitlRequested` /
2146    /// `AgentEvent::HitlResolved` pair to bracket every HITL pause.
2147    /// Pin the contract here so future HITL primitives keep emitting
2148    /// the event around their waitpoint blocks.
2149    #[tokio::test(flavor = "current_thread")]
2150    async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2151        use std::sync::Mutex as StdMutex;
2152
2153        tokio::task::LocalSet::new()
2154            .run_until(async {
2155                let dir = tempfile::tempdir().expect("tempdir");
2156                let session_id = "hitl-session".to_string();
2157                let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2158                    std::sync::Arc::new(StdMutex::new(Vec::new()));
2159
2160                struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2161                impl crate::agent_events::AgentEventSink for CaptureSink {
2162                    fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2163                        self.0.lock().expect("captured").push(event.clone());
2164                    }
2165                }
2166
2167                // Inline the script setup rather than using the
2168                // `execute_hitl_script` helper: that helper calls
2169                // `reset_thread_local_state` (which wipes the session
2170                // store), so any session pushed before it would be
2171                // gone by the time `ask_user` runs.
2172                crate::reset_thread_local_state();
2173                crate::event_log::install_default_for_base_dir(dir.path())
2174                    .expect("install event log");
2175
2176                crate::agent_events::reset_all_sinks();
2177                let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2178                    std::sync::Arc::new(CaptureSink(captured.clone()));
2179                crate::agent_events::register_sink(session_id.clone(), sink);
2180                crate::agent_sessions::open_or_create(Some(session_id.clone()));
2181                let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2182
2183                let source = r#"
2184pipeline test(task) {
2185  host_mock("hitl", "question", {answer: "ok"})
2186  let answer: string = ask_user("Are you sure?", {default: "no"})
2187  __io_println(answer)
2188}
2189"#;
2190                let chunk = crate::compile_source(source).expect("compile source");
2191                let mut vm = Vm::new();
2192                register_vm_stdlib(&mut vm);
2193                vm.set_source_dir(dir.path());
2194                vm.execute(&chunk).await.expect("script runs");
2195                assert_eq!(vm.output().trim_end(), "ok");
2196
2197                let events = captured.lock().expect("captured");
2198                let mut iter = events.iter().filter(|event| {
2199                    matches!(
2200                        event,
2201                        crate::agent_events::AgentEvent::HitlRequested { .. }
2202                            | crate::agent_events::AgentEvent::HitlResolved { .. }
2203                    )
2204                });
2205                let requested = iter.next().expect("HitlRequested emitted");
2206                let resolved = iter.next().expect("HitlResolved emitted");
2207                assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2208
2209                let crate::agent_events::AgentEvent::HitlRequested {
2210                    session_id: req_session,
2211                    request_id: req_id,
2212                    kind: req_kind,
2213                    payload,
2214                } = requested
2215                else {
2216                    panic!("expected HitlRequested, got: {requested:?}");
2217                };
2218                assert_eq!(req_session, &session_id);
2219                assert_eq!(req_kind, "question");
2220                assert!(req_id.starts_with("hitl_question_"));
2221                assert_eq!(payload["prompt"], "Are you sure?");
2222
2223                let crate::agent_events::AgentEvent::HitlResolved {
2224                    request_id: res_id,
2225                    kind: res_kind,
2226                    outcome,
2227                    ..
2228                } = resolved
2229                else {
2230                    panic!("expected HitlResolved, got: {resolved:?}");
2231                };
2232                assert_eq!(res_id, req_id);
2233                assert_eq!(res_kind, "question");
2234                assert_eq!(outcome, "answered");
2235
2236                drop(_guard);
2237                crate::agent_events::reset_all_sinks();
2238            })
2239            .await;
2240    }
2241}