Skip to main content

harn_vm/stdlib/
hitl.rs

1use crate::value::VmDictExt;
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::runtime_limits::RuntimeLimits;
20use crate::schema::schema_expect_value;
21use crate::stdlib::host::dispatch_mock_host_call;
22use crate::stdlib::macros::{harn_builtin, BuiltinSignature, Param, VmBuiltinDef, TY_ANY, TY_DICT};
23use crate::stdlib::options::{duration_from_value, ErrorKind};
24use crate::stdlib::waitpoint::{
25    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
26    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
27    WaitpointWaitOptions,
28};
29use crate::triggers::dispatcher::current_dispatch_context;
30use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
31use crate::vm::{AsyncBuiltinCtx, Vm};
32
/// Queue depth copied from the default runtime limits.
/// NOTE(review): presumably used when installing the HITL event log — the use
/// site is not in this part of the file; confirm.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// 24 hours in milliseconds. Used as the deadline and wait timeout by
/// `append_approval_request_on` and `dual_control`.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// 24 hours in milliseconds.
/// NOTE(review): presumably the default `ask_user` timeout — confirm at the use site.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic name that `HitlRequestKind::Question` maps to.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic name that `HitlRequestKind::Approval` maps to.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic name that `HitlRequestKind::DualControl` maps to.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic name that `HitlRequestKind::Escalation` maps to.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
41
// Per-thread request sequence state. It is cleared by `reset_hitl_state` and
// moved out / put back by `take_hitl_state` / `restore_hitl_state`.
thread_local! {
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
45
/// State held in the thread-local `REQUEST_SEQUENCE` slot.
///
/// The derived `Default` yields an empty `instance_key` and a `next_seq` of 0.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// NOTE(review): presumably identifies the dispatch instance the counter
    /// belongs to — confirm against `next_request_id`.
    pub(crate) instance_key: String,
    /// NOTE(review): presumably the next sequence number used when building a
    /// request id — confirm against `next_request_id`.
    pub(crate) next_seq: u64,
}
51
/// The kind of human-in-the-loop request. Serialized with snake_case variant
/// names (e.g. `dual_control`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// A question raised by the `ask_user` builtin.
    Question,
    /// An approval raised by `request_approval` or `append_approval_request_on`.
    Approval,
    /// An approval raised by the `dual_control` builtin.
    DualControl,
    /// An escalation raised by the `escalate_to` builtin.
    Escalation,
}
60
61impl HitlRequestKind {
62    pub(crate) fn as_str(self) -> &'static str {
63        match self {
64            Self::Question => "question",
65            Self::Approval => "approval",
66            Self::DualControl => "dual_control",
67            Self::Escalation => "escalation",
68        }
69    }
70
71    fn topic(self) -> &'static str {
72        match self {
73            Self::Question => HITL_QUESTIONS_TOPIC,
74            Self::Approval => HITL_APPROVALS_TOPIC,
75            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
76            Self::Escalation => HITL_ESCALATIONS_TOPIC,
77        }
78    }
79
80    fn request_event_kind(self) -> &'static str {
81        match self {
82            Self::Question => "hitl.question_asked",
83            Self::Approval => "hitl.approval_requested",
84            Self::DualControl => "hitl.dual_control_requested",
85            Self::Escalation => "hitl.escalation_issued",
86        }
87    }
88
89    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
90        if request_id.starts_with("hitl_question_") {
91            Some(Self::Question)
92        } else if request_id.starts_with("hitl_approval_") {
93            Some(Self::Approval)
94        } else if request_id.starts_with("hitl_dual_control_") {
95            Some(Self::DualControl)
96        } else if request_id.starts_with("hitl_escalation_") {
97            Some(Self::Escalation)
98        } else {
99            None
100        }
101    }
102}
103
/// A host-supplied response to a HITL request, as accepted by
/// `append_hitl_response`.
///
/// Every optional field is omitted from the serialized form when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix determines the request kind.
    pub request_id: String,
    /// NOTE(review): presumably the answer to an `ask_user` question — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// NOTE(review): presumably the approve/deny decision for approval requests — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// NOTE(review): presumably whether an escalation was accepted — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// NOTE(review): presumably the identity of the person responding — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Optional free-form reason attached to the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional arbitrary JSON metadata attached to the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills it with the current
    /// time when it is missing.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    /// NOTE(review): presumably the reviewer's signature over the response — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
124
/// Common envelope built for every HITL request before its waitpoint is
/// created and the request is appended to the event log.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Id of the request; also used as the id of its waitpoint.
    request_id: String,
    /// Which kind of HITL request this is.
    kind: HitlRequestKind,
    /// Agent taken from the current dispatch keys, or empty when there are
    /// none. Defaults to an empty string when absent on deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id taken from the current dispatch keys, or freshly generated.
    trace_id: String,
    /// Run id of the current mutation session, if any; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Timestamp string recorded when the request was built.
    requested_at: String,
    /// Kind-specific JSON payload.
    payload: JsonValue,
}
137
/// Serializable record describing a request that timed out.
/// NOTE(review): presumably the payload written by `append_timeout_once` —
/// the use site is not in this part of the file; confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the request that timed out.
    kind: HitlRequestKind,
    /// Trace id of the request.
    trace_id: String,
    /// Timestamp string of the timeout.
    timed_out_at: String,
}
145
/// Serializable description of an action awaiting approval.
///
/// Fields marked `#[serde(default)]` may be absent when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalRequest {
    /// Identifier of the request.
    pub id: String,
    /// Name of the action that needs approval.
    pub action: String,
    /// JSON arguments describing the action.
    #[serde(default)]
    pub args: JsonValue,
    /// Principal on whose behalf the action is requested.
    pub principal: String,
    /// Timestamp string of when the request was made.
    pub requested_at: String,
    /// Optional deadline; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required; `ApprovalRequest::new` sets it to 1.
    pub approvers_required: u32,
    /// JSON references to supporting evidence.
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    /// JSON metadata for undoing the action; `Null` when none was supplied.
    #[serde(default)]
    pub undo_metadata: JsonValue,
    /// Names of the capabilities being requested.
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
164
165impl ApprovalRequest {
166    pub fn new(
167        id: impl Into<String>,
168        action: impl Into<String>,
169        args: JsonValue,
170        principal: impl Into<String>,
171        requested_at: impl Into<String>,
172    ) -> Self {
173        Self {
174            id: id.into(),
175            action: action.into(),
176            args,
177            principal: principal.into(),
178            requested_at: requested_at.into(),
179            deadline: None,
180            approvers_required: 1,
181            evidence_refs: Vec::new(),
182            undo_metadata: JsonValue::Null,
183            capabilities_requested: Vec::new(),
184        }
185    }
186}
187
188pub(crate) fn approval_request_for_host_permission(
189    id: impl Into<String>,
190    action: impl Into<String>,
191    args: JsonValue,
192    principal: impl Into<String>,
193    evidence_refs: Vec<JsonValue>,
194    undo_metadata: JsonValue,
195    capabilities_requested: Vec<String>,
196) -> ApprovalRequest {
197    let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
198    request.evidence_refs = evidence_refs;
199    request.undo_metadata = undo_metadata;
200    request.capabilities_requested = capabilities_requested;
201    request
202}
203
/// Identifiers describing the current dispatch, as returned by
/// `current_dispatch_keys`.
#[derive(Clone, Debug)]
struct DispatchKeys {
    /// NOTE(review): presumably matched against `RequestSequenceState::instance_key` — confirm.
    instance_key: String,
    /// NOTE(review): presumably the stable stem used when deriving request ids — confirm.
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id copied into request envelopes.
    trace_id: String,
}
211
/// Parsed options of the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// When set, the answer is checked with `schema_expect_value` against it.
    schema: Option<VmValue>,
    /// How long to wait for an answer; forwarded to the waitpoint wait.
    timeout: Option<StdDuration>,
    /// Returned when the wait times out; without a schema it is also passed to
    /// `coerce_like_default` together with the received answer.
    default: Option<VmValue>,
}
218
/// Parsed options of the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Free-form detail; used as the approval args when `args` is absent.
    detail: Option<VmValue>,
    /// Explicit approval args; take precedence over `detail`.
    args: Option<VmValue>,
    /// Copied into `ApprovalRequest::approvers_required`.
    quorum: u32,
    /// Reviewer names included in the request payload.
    reviewers: Vec<String>,
    /// Used both to compute the request deadline and as the wait timeout.
    deadline: StdDuration,
    /// Principal of the request; falls back to the dispatching agent.
    principal: Option<String>,
    /// Copied into `ApprovalRequest::evidence_refs`.
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata; falls back to the serialized current mutation session.
    undo_metadata: Option<JsonValue>,
    /// Copied into `ApprovalRequest::capabilities_requested`.
    capabilities_requested: Vec<String>,
}
231
/// Accumulated state of an approval request.
/// NOTE(review): how this is populated is not visible in this part of the
/// file; the field notes below are inferred from names and types — confirm.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Id of the approval request.
    request_id: String,
    /// Set of reviewer names (presumably those who have approved so far).
    reviewers: BTreeSet<String>,
    /// Signatures collected so far.
    signatures: Vec<ApprovalSignature>,
    /// Optional reason string.
    reason: Option<String>,
    /// Optional approval timestamp string.
    approved_at: Option<String>,
}
240
/// A single reviewer's signature entry; serializable.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Name of the signing reviewer.
    reviewer: String,
    /// Timestamp string of the signature.
    signed_at: String,
    /// The signature value itself.
    signature: String,
}
247
/// Outcome of evaluating an approval request's responses.
/// NOTE(review): the code producing these values is not in this part of the
/// file; variant meanings are inferred from names and payload types — confirm.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// No final decision yet.
    Pending,
    /// Approved; carries the accumulated progress.
    Approved(ApprovalProgress),
    /// Denied; carries the host response that denied it.
    Denied(HitlHostResponse),
}
254
255// `Completed` carries the full `WaitpointRecord`, which dominates the
256// enum's size — boxing it would force every match arm to indirect even
257// though the enum is dropped within nanoseconds of being constructed
258// (it's a local return type for the waitpoint poll loop, never stored).
259// Surfaced by the host-target compile of `harn-vm` introduced when
260// `harn-cli`'s build script gained `harn-vm` as a build-dep for the
261// AOT bytecode embedding pass (G7 / harn#2300).
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
/// Result of waiting on a request's waitpoint, as produced by
/// `wait_for_request_waitpoint`.
enum WaitpointOutcome {
    /// The wait succeeded; carries the first record returned by the wait.
    Completed(WaitpointRecord),
    /// The wait ended with `WaitpointWaitFailure::Timeout`.
    Timeout,
    /// The wait ended with `WaitpointWaitFailure::Cancelled`; the fields are
    /// copied from that failure.
    Cancelled {
        wait_id: String,
        waitpoint_ids: Vec<String>,
        reason: Option<String>,
    },
}
273
274pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
275    for def in MODULE_BUILTINS {
276        vm.register_builtin_def(def);
277    }
278}
279
/// The builtin definitions of this module, in the order
/// `register_hitl_builtins` registers them: `ask_user`, `request_approval`,
/// `dual_control` and `escalate_to`.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &ASK_USER_BUILTIN_DEF,
    &REQUEST_APPROVAL_BUILTIN_DEF,
    &DUAL_CONTROL_BUILTIN_DEF,
    &ESCALATE_TO_BUILTIN_DEF,
];
286
// Async `ask_user(prompt, options?)` builtin in the "hitl" category.
// Thin wrapper that forwards the builtin context and arguments to
// `ask_user_impl`.
#[harn_builtin(
    sig = "ask_user(prompt: string, options?: dict) -> any",
    kind = "async",
    category = "hitl"
)]
async fn ask_user_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    ask_user_impl(Some(&ctx), &args).await
}
298
// Async `request_approval` builtin in the "hitl" category, declared with a
// variadic signature returning a dict. Thin wrapper that forwards the builtin
// context and arguments to `request_approval_impl`.
#[harn_builtin(
    sig_expr = BuiltinSignature::variadic("request_approval", &[Param::new("args", TY_ANY)], TY_DICT),
    kind = "async",
    category = "hitl"
)]
async fn request_approval_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    request_approval_impl(Some(&ctx), &args).await
}
310
// Async `dual_control(n, m, action, approvers?)` builtin in the "hitl"
// category. Thin wrapper that forwards the builtin context and arguments to
// `dual_control_impl`, which always needs the context to run the closure.
#[harn_builtin(
    sig = "dual_control(n: int, m: int, action: closure, approvers?: list) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn dual_control_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    dual_control_impl(&ctx, &args).await
}
322
// Async `escalate_to(role, reason)` builtin in the "hitl" category.
// Thin wrapper that forwards the builtin context and arguments to
// `escalate_to_impl`.
#[harn_builtin(
    sig = "escalate_to(role: string, reason: string) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn escalate_to_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    escalate_to_impl(Some(&ctx), &args).await
}
334
335pub(crate) fn reset_hitl_state() {
336    REQUEST_SEQUENCE.with(|slot| {
337        *slot.borrow_mut() = RequestSequenceState::default();
338    });
339}
340
341pub(crate) fn take_hitl_state() -> RequestSequenceState {
342    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
343}
344
345pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
346    REQUEST_SEQUENCE.with(|slot| {
347        *slot.borrow_mut() = state;
348    });
349}
350
351pub async fn append_hitl_response(
352    base_dir: Option<&Path>,
353    mut response: HitlHostResponse,
354) -> Result<u64, String> {
355    let kind = HitlRequestKind::from_request_id(&response.request_id)
356        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
357    if response.responded_at.is_none() {
358        response.responded_at = Some(now_rfc3339());
359    }
360    let log = ensure_hitl_event_log_for(base_dir)?;
361    let headers = response_headers(&response.request_id);
362    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
363    let event_id = log
364        .append(
365            &topic,
366            LogEvent::new(
367                match kind {
368                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
369                    _ => "hitl.response_received",
370                },
371                serde_json::to_value(&response).map_err(|error| error.to_string())?,
372            )
373            .with_headers(headers),
374        )
375        .await
376        .map_err(|error| error.to_string())?;
377    finalize_hitl_response(&log, kind, &response).await?;
378    Ok(event_id)
379}
380
381pub async fn append_approval_request_on(
382    log: &Arc<AnyEventLog>,
383    agent: impl Into<String>,
384    trace_id: impl Into<String>,
385    action: impl Into<String>,
386    detail: JsonValue,
387    reviewers: Vec<String>,
388) -> Result<String, VmError> {
389    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
390    let trace_id = trace_id.into();
391    let agent = agent.into();
392    let requested_at_time = OffsetDateTime::now_utc();
393    let requested_at = format_rfc3339(requested_at_time);
394    let mut approval_request = ApprovalRequest::new(
395        request_id.clone(),
396        action.into(),
397        detail.clone(),
398        agent.clone(),
399        requested_at.clone(),
400    );
401    approval_request.deadline = deadline_after(
402        requested_at_time,
403        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
404    );
405    approval_request.approvers_required = 1;
406    let approval_request_json = serde_json::to_value(&approval_request)
407        .map_err(|error| VmError::Runtime(error.to_string()))?;
408    let request = HitlRequestEnvelope {
409        request_id: request_id.clone(),
410        kind: HitlRequestKind::Approval,
411        agent,
412        trace_id: trace_id.clone(),
413        run_id: None,
414        requested_at: requested_at.clone(),
415        payload: json!({
416            "approval_request": approval_request_json,
417            "id": approval_request.id,
418            "action": approval_request.action,
419            "args": approval_request.args,
420            "principal": approval_request.principal,
421            "requested_at": requested_at,
422            "deadline": approval_request.deadline,
423            "approvers_required": approval_request.approvers_required,
424            "evidence_refs": approval_request.evidence_refs,
425            "undo_metadata": approval_request.undo_metadata,
426            "capabilities_requested": approval_request.capabilities_requested,
427            "detail": detail,
428            "quorum": 1,
429            "reviewers": reviewers,
430            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
431        }),
432    };
433    create_request_waitpoint(log, &request).await?;
434    append_request(log, &request).await?;
435    maybe_notify_host(None, &request);
436    Ok(request_id)
437}
438
/// Implements the `ask_user(prompt, options?)` builtin.
///
/// Builds a question request, creates its waitpoint, appends it to the HITL
/// event log, notifies the host, applies any mock response, and then waits
/// for the waitpoint.
///
/// Returns the answer on completion: validated through `schema_expect_value`
/// when a schema option is set, otherwise passed through
/// `coerce_like_default` when a default is set, otherwise returned as-is
/// (`Nil` when the record carries no value). On timeout a timeout event is
/// appended and the default is returned if one was given.
///
/// # Errors
///
/// Fails on invalid arguments or options, on waitpoint/log failures, on
/// schema validation failure, on timeout without a default, and when the wait
/// is cancelled.
async fn ask_user_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let prompt = required_string_arg(args, 0, "ask_user")?;
    let options = parse_ask_user_options(args.get(1))?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Question,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "prompt": prompt,
            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
            // NOTE(review): `as_millis()` is u128; the `as u64` cast truncates
            // for extremely large durations — confirm that is acceptable.
            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
        }),
    };
    // The waitpoint is created before the request is appended and the host is notified.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Question,
        options.timeout,
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            let answer = record
                .value
                .as_ref()
                .map(crate::stdlib::json_to_vm_value)
                .unwrap_or(VmValue::Nil);
            // A schema takes precedence over default-based coercion.
            if let Some(schema) = options.schema.as_ref() {
                return schema_expect_value(&answer, schema, true);
            }
            if let Some(default) = options.default.as_ref() {
                return Ok(coerce_like_default(&answer, default));
            }
            Ok(answer)
        }
        WaitpointOutcome::Timeout => {
            // The timeout is recorded even when a default answer is returned.
            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
            if let Some(default) = options.default {
                return Ok(default);
            }
            Err(timeout_error(&request_id, HitlRequestKind::Question))
        }
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Question,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
516
/// Implements the `request_approval(action, options?)` builtin.
///
/// Builds an `ApprovalRequest` from the action name and parsed options,
/// wraps it in a request envelope, creates the waitpoint, appends the request
/// to the HITL event log, notifies the host, applies any mock response, and
/// waits up to `options.deadline` for the waitpoint.
///
/// Returns the value produced by `approval_record_from_waitpoint` on
/// completion.
///
/// # Errors
///
/// Fails on invalid arguments or options, on serialization or waitpoint/log
/// failures, on timeout (after appending a timeout event), and on
/// cancellation (with the error built by `approval_wait_error`).
async fn request_approval_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    // The principal defaults to the dispatching agent.
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    // Explicit `args` win over `detail`; with neither, the args are JSON null.
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    // Without explicit undo metadata, fall back to the serialized current
    // mutation session, and to JSON null when there is none or it fails to serialize.
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        // The payload repeats the approval request's fields at the top level
        // next to the nested `approval_request` object.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            // NOTE(review): `as_millis()` is u128; the `as u64` cast truncates
            // for extremely large deadlines — confirm that is acceptable.
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    // The waitpoint is created before the request is appended and the host is notified.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        // The cancellation details are ignored here; the error is derived from
        // the log by `approval_wait_error`.
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
614
615pub(crate) async fn request_approval_for_side_effect(
616    action: &str,
617    detail: JsonValue,
618    principal: String,
619    reviewers: Vec<String>,
620    capabilities_requested: Vec<String>,
621) -> Result<VmValue, VmError> {
622    let mut options = crate::value::DictMap::new();
623    options.insert(
624        crate::value::intern_key("args"),
625        crate::stdlib::json_to_vm_value(&detail),
626    );
627    options.insert(
628        crate::value::intern_key("detail"),
629        crate::stdlib::json_to_vm_value(&detail),
630    );
631    options.put_str("principal", principal);
632    options.insert(
633        crate::value::intern_key("reviewers"),
634        VmValue::List(std::sync::Arc::new(
635            reviewers
636                .into_iter()
637                .map(|reviewer| VmValue::String(arcstr::ArcStr::from(reviewer)))
638                .collect(),
639        )),
640    );
641    options.insert(
642        crate::value::intern_key("capabilities_requested"),
643        VmValue::List(std::sync::Arc::new(
644            capabilities_requested
645                .into_iter()
646                .map(|capability| VmValue::String(arcstr::ArcStr::from(capability)))
647                .collect(),
648        )),
649    );
650    let args = vec![
651        VmValue::String(arcstr::ArcStr::from(action.to_string())),
652        VmValue::dict(options),
653    ];
654    request_approval_impl(None, &args).await
655}
656
/// Implements the `dual_control(n, m, action, approvers?)` builtin: runs the
/// `action` closure only after the request's waitpoint completes successfully.
///
/// Validates that `n <= m`, that `action` is a closure, and that a non-empty
/// approver list has at least `m` entries. It then records a dual-control
/// request (waitpoint, event log, host notification, mock response) and waits
/// up to `HITL_APPROVAL_TIMEOUT_MS`. On completion the closure is called with
/// no arguments in a child VM, its output is forwarded, a
/// `hitl.dual_control_executed` event is appended, and the closure's result is
/// returned.
///
/// # Errors
///
/// Fails on invalid arguments, on serialization or waitpoint/log failures,
/// when `approval_record_from_waitpoint` rejects the record, when the closure
/// fails, on timeout (after appending a timeout event), and on cancellation.
async fn dual_control_impl(ctx: &AsyncBuiltinCtx, args: &[VmValue]) -> Result<VmValue, VmError> {
    let n = required_positive_int_arg(args, 0, "dual_control")?;
    let m = required_positive_int_arg(args, 1, "dual_control")?;
    if n > m {
        return Err(VmError::Runtime(
            "dual_control: n must be less than or equal to m".to_string(),
        ));
    }
    let action = args
        .get(2)
        .and_then(|value| match value {
            VmValue::Closure(closure) => Some(closure.clone()),
            _ => None,
        })
        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
    let approvers = optional_string_list(args.get(3), "dual_control")?;
    // An empty approver list is allowed; a non-empty one must name at least `m` approvers.
    // NOTE(review): `m as usize` may truncate depending on the integer type
    // returned by `required_positive_int_arg` — confirm.
    if !approvers.is_empty() && approvers.len() < m as usize {
        return Err(VmError::Runtime(format!(
            "dual_control: expected at least {m} approvers, got {}",
            approvers.len()
        )));
    }

    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    // Closures without a function name are reported as "anonymous".
    let action_name = if action.func.name.is_empty() {
        "anonymous".to_string()
    } else {
        action.func.name.clone()
    };
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action_name.clone(),
        json!({
            "n": n,
            "m": m,
            "approvers": approvers.clone(),
        }),
        agent.clone(),
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(
        requested_at_time,
        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
    );
    // NOTE(review): `n as u32` may truncate depending on the integer type
    // returned by `required_positive_int_arg` — confirm.
    approval_request.approvers_required = n as u32;
    // Undo metadata is the serialized current mutation session, or JSON null
    // when there is none or it fails to serialize.
    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
        .and_then(|session| serde_json::to_value(session).ok())
        .unwrap_or(JsonValue::Null);
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::DualControl,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        // The payload repeats the approval request's fields at the top level
        // next to the nested `approval_request` object.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "n": n,
            "m": m,
            "action": action_name,
            "approvers": approvers,
            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
        }),
    };
    // The waitpoint is created before the request is appended and the host is notified.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(Some(ctx), &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::DualControl,
        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            // Only the error path matters here: the approval record itself is discarded.
            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
            let mut vm = ctx.child_vm();
            // NOTE(review): if the closure returns an error, `?` returns before
            // the child VM's output is forwarded and before the executed event
            // is appended — confirm that dropping the output is intended.
            let result = vm.call_closure_pub(&action, &[]).await?;
            ctx.forward_output(&vm.take_output());

            append_named_event(
                &log,
                HitlRequestKind::DualControl,
                "hitl.dual_control_executed",
                &request_id,
                &trace_id,
                json!({
                    "request_id": request_id,
                    "result": crate::llm::vm_value_to_json(&result),
                }),
            )
            .await?;

            Ok(result)
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
        }
        // The cancellation details are ignored here; the error is derived from
        // the log by `approval_wait_error`.
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
        }
    }
}
787
788async fn escalate_to_impl(
789    ctx: Option<&AsyncBuiltinCtx>,
790    args: &[VmValue],
791) -> Result<VmValue, VmError> {
792    let role = required_string_arg(args, 0, "escalate_to")?;
793    let reason = required_string_arg(args, 1, "escalate_to")?;
794    let keys = current_dispatch_keys();
795    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
796    let trace_id = keys
797        .as_ref()
798        .map(|keys| keys.trace_id.clone())
799        .unwrap_or_else(new_trace_id);
800    let log = ensure_hitl_event_log();
801    let request = HitlRequestEnvelope {
802        request_id: request_id.clone(),
803        kind: HitlRequestKind::Escalation,
804        agent: keys
805            .as_ref()
806            .map(|keys| keys.agent.clone())
807            .unwrap_or_default(),
808        trace_id: trace_id.clone(),
809        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
810        requested_at: now_rfc3339(),
811        payload: json!({
812            "role": role,
813            "reason": reason,
814            "capability_policy": escalation_capability_policy(),
815        }),
816    };
817    create_request_waitpoint(&log, &request).await?;
818    append_request(&log, &request).await?;
819    maybe_notify_host(ctx, &request);
820    emit_hitl_requested(&request);
821    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
822
823    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
824        .await?
825    {
826        WaitpointOutcome::Completed(record) => {
827            let accepted_at = record.completed_at.clone();
828            let reviewer = record.completed_by.clone();
829            let accepted = record
830                .value
831                .as_ref()
832                .and_then(|value| value.get("accepted"))
833                .and_then(JsonValue::as_bool)
834                .unwrap_or(true);
835            Ok(crate::stdlib::json_to_vm_value(&json!({
836                "request_id": request_id,
837                "role": role,
838                "reason": reason,
839                "trace_id": trace_id,
840                "status": if accepted { "accepted" } else { "pending" },
841                "accepted_at": accepted_at,
842                "reviewer": reviewer,
843            })))
844        }
845        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
846        WaitpointOutcome::Cancelled {
847            wait_id,
848            waitpoint_ids,
849            reason,
850        } => Err(hitl_cancelled_error(
851            &request_id,
852            HitlRequestKind::Escalation,
853            &wait_id,
854            &waitpoint_ids,
855            reason,
856        )),
857    }
858}
859
860async fn create_request_waitpoint(
861    log: &Arc<AnyEventLog>,
862    request: &HitlRequestEnvelope,
863) -> Result<(), VmError> {
864    create_waitpoint_on(
865        log,
866        Some(request.request_id.clone()),
867        Some(json!({
868            "kind": request.kind.as_str(),
869            "agent": request.agent.clone(),
870            "trace_id": request.trace_id.clone(),
871            "requested_at": request.requested_at.clone(),
872            "payload": request.payload.clone(),
873        })),
874    )
875    .await?;
876    Ok(())
877}
878
879async fn wait_for_request_waitpoint(
880    request_id: &str,
881    timeout: Option<StdDuration>,
882) -> Result<WaitpointOutcome, VmError> {
883    match wait_on_waitpoints(
884        vec![request_id.to_string()],
885        WaitpointWaitOptions { timeout },
886    )
887    .await
888    {
889        Ok(records) => Ok(WaitpointOutcome::Completed(
890            records
891                .into_iter()
892                .next()
893                .expect("single waitpoint wait result"),
894        )),
895        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
896        Err(WaitpointWaitFailure::Cancelled {
897            wait_id,
898            waitpoint_ids,
899            reason,
900        }) => Ok(WaitpointOutcome::Cancelled {
901            wait_id,
902            waitpoint_ids,
903            reason,
904        }),
905        Err(WaitpointWaitFailure::Vm(error)) => {
906            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
907                return Ok(outcome);
908            }
909            Err(error)
910        }
911    }
912}
913
914fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
915    let VmError::Thrown(VmValue::Dict(dict)) = error else {
916        return None;
917    };
918    let name = dict.get("name").and_then(vm_string)?;
919    match name {
920        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
921        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
922            wait_id: dict
923                .get("wait_id")
924                .and_then(vm_string)
925                .unwrap_or_default()
926                .to_string(),
927            waitpoint_ids: dict
928                .get("waitpoint_ids")
929                .and_then(vm_string_list)
930                .unwrap_or_default(),
931            reason: dict
932                .get("reason")
933                .and_then(vm_string)
934                .map(ToString::to_string),
935        }),
936        _ => None,
937    }
938}
939
/// Applies a host response to the waitpoint backing its HITL request.
///
/// What happens depends on `kind`:
/// - `Question`: the waitpoint is completed with the response's answer.
/// - `Escalation`: the waitpoint is completed only when the response has
///   `accepted == Some(true)`; any other response leaves it untouched.
/// - `Approval` / `DualControl`: the approval state is re-derived with
///   `resolve_approval_state`. A pending result does nothing, an approved
///   result appends the approved event and completes the waitpoint, and a
///   denied result appends the denied event and cancels the waitpoint.
///
/// In every branch, a waitpoint that already exists with a status other
/// than open is left alone and `Ok(())` is returned.
///
/// # Errors
/// Failures from the underlying helpers are returned as their string form.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            // Nothing to do if the waitpoint has already been resolved.
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // A response that is not an explicit acceptance does not resolve
            // the escalation; the terminal check only runs for acceptances.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            // The decision is derived from the logged request and responses,
            // not from this single response.
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                // No decision yet: leave the waitpoint open.
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // The approved event is appended before the waitpoint is completed.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    // The denied event is appended before the waitpoint is cancelled.
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    // A denial is recorded as a cancellation of the waitpoint,
                    // attributed to the denying response.
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
1054
1055async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1056    Ok(inspect_waitpoint_on(log, request_id)
1057        .await
1058        .map_err(|error| error.to_string())?
1059        .is_some_and(|record| record.status != WaitpointStatus::Open))
1060}
1061
1062async fn load_request_envelope(
1063    log: &Arc<AnyEventLog>,
1064    kind: HitlRequestKind,
1065    request_id: &str,
1066) -> Result<HitlRequestEnvelope, VmError> {
1067    let topic = topic(kind)?;
1068    let events = log
1069        .read_range(&topic, None, usize::MAX)
1070        .await
1071        .map_err(log_error)?;
1072    events
1073        .into_iter()
1074        .filter(|(_, event)| event.kind == kind.request_event_kind())
1075        .find_map(|(_, event)| {
1076            if !event_matches_request(&event, request_id) {
1077                return None;
1078            }
1079            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1080        })
1081        .ok_or_else(|| {
1082            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1083        })
1084}
1085
/// Derives the current approval state of `request` by replaying the
/// `hitl.response_received` events recorded for it.
///
/// Events are processed in the order returned by `read_range`:
/// - responses from a named reviewer who is not in the request's reviewer
///   list (when that list is non-empty), or who has already approved, are
///   skipped;
/// - an approving response with a reviewer adds that reviewer and a
///   signature to the progress; once the number of distinct approving
///   reviewers reaches the quorum, `Approved` is returned;
/// - the first non-skipped response that is not an approval returns `Denied`.
///
/// If the events run out first, the state is `Pending`.
///
/// # Errors
/// Fails if the quorum in the request is invalid, the topic or log read
/// fails, or a matching response event cannot be deserialized.
async fn resolve_approval_state(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    request: &HitlRequestEnvelope,
) -> Result<ApprovalResolution, VmError> {
    let quorum = approval_quorum_from_request(kind, request)?;
    // An empty set means "any reviewer is allowed" (see the check below).
    let allowed_reviewers = approval_reviewers_from_request(kind, request)
        .into_iter()
        .collect::<BTreeSet<_>>();
    let mut progress = ApprovalProgress {
        request_id: request.request_id.clone(),
        reviewers: BTreeSet::new(),
        signatures: Vec::new(),
        reason: None,
        approved_at: None,
    };
    let topic = topic(kind)?;
    let events = log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(log_error)?;
    for (_, event) in events {
        // Only responses recorded for this request are relevant.
        if !event_matches_request(&event, &request.request_id)
            || event.kind != "hitl.response_received"
        {
            continue;
        }
        let response: HitlHostResponse = serde_json::from_value(event.payload)
            .map_err(|error| VmError::Runtime(error.to_string()))?;
        // NOTE(review): responses without a reviewer bypass both filters below.
        if let Some(reviewer) = response.reviewer.as_deref() {
            // Ignore reviewers outside the allow list, when one is configured.
            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
                continue;
            }
            // Ignore any further response from a reviewer who already approved.
            if progress.reviewers.contains(reviewer) {
                continue;
            }
        }
        if response.approved.unwrap_or(false) {
            // Only approvals that name a reviewer count towards the quorum.
            if let Some(reviewer) = response.reviewer.clone() {
                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
                progress.reviewers.insert(reviewer.clone());
                progress.signatures.push(ApprovalSignature {
                    reviewer: reviewer.clone(),
                    signed_at: signed_at.clone(),
                    // Fall back to a locally computed receipt signature when
                    // the response did not carry one.
                    signature: response.signature.clone().unwrap_or_else(|| {
                        approval_receipt_signature(
                            &request.request_id,
                            &reviewer,
                            &signed_at,
                            true,
                            response.reason.as_deref(),
                        )
                    }),
                });
            }
            // The most recent approval's reason and timestamp win.
            progress.reason = response.reason.clone();
            progress.approved_at = response.responded_at.clone();
            if progress.reviewers.len() as u32 >= quorum {
                return Ok(ApprovalResolution::Approved(progress));
            }
            continue;
        }
        // Any non-approving response that survived the filters is a denial.
        return Ok(ApprovalResolution::Denied(response));
    }
    Ok(ApprovalResolution::Pending)
}
1152
1153fn approval_quorum_from_request(
1154    kind: HitlRequestKind,
1155    request: &HitlRequestEnvelope,
1156) -> Result<u32, VmError> {
1157    let key = match kind {
1158        HitlRequestKind::DualControl => "n",
1159        _ => "quorum",
1160    };
1161    let quorum = request
1162        .payload
1163        .get(key)
1164        .or_else(|| request.payload.get("approvers_required"))
1165        .or_else(|| {
1166            request
1167                .payload
1168                .get("approval_request")
1169                .and_then(|approval| approval.get("approvers_required"))
1170        })
1171        .and_then(JsonValue::as_u64)
1172        .unwrap_or(1);
1173    u32::try_from(quorum).map_err(|_| {
1174        VmError::Runtime(format!(
1175            "invalid quorum in HITL request '{}'",
1176            request.request_id
1177        ))
1178    })
1179}
1180
1181fn approval_reviewers_from_request(
1182    kind: HitlRequestKind,
1183    request: &HitlRequestEnvelope,
1184) -> Vec<String> {
1185    let key = match kind {
1186        HitlRequestKind::DualControl => "approvers",
1187        _ => "reviewers",
1188    };
1189    request
1190        .payload
1191        .get(key)
1192        .or_else(|| {
1193            request
1194                .payload
1195                .get("approval_request")
1196                .and_then(|approval| approval.get("reviewers"))
1197        })
1198        .and_then(JsonValue::as_array)
1199        .map(|values| {
1200            values
1201                .iter()
1202                .filter_map(JsonValue::as_str)
1203                .map(str::to_string)
1204                .collect()
1205        })
1206        .unwrap_or_default()
1207}
1208
1209fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1210    json!({
1211        "request_id": progress.request_id.clone(),
1212        "approved": true,
1213        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1214        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1215        "reason": progress.reason,
1216        "signatures": progress.signatures,
1217    })
1218}
1219
1220fn approval_receipt_signature(
1221    request_id: &str,
1222    reviewer: &str,
1223    signed_at: &str,
1224    approved: bool,
1225    reason: Option<&str>,
1226) -> String {
1227    let material = format!(
1228        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1229        reason.unwrap_or("")
1230    );
1231    let hash = sha2::Sha256::digest(material.as_bytes());
1232    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1233    format!("sha256:{hex}")
1234}
1235
1236fn approval_record_from_waitpoint(
1237    record: &WaitpointRecord,
1238    builtin: &str,
1239) -> Result<VmValue, VmError> {
1240    record
1241        .value
1242        .as_ref()
1243        .map(crate::stdlib::json_to_vm_value)
1244        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1245}
1246
1247async fn approval_wait_error(
1248    log: &Arc<AnyEventLog>,
1249    kind: HitlRequestKind,
1250    request_id: &str,
1251) -> VmError {
1252    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1253        if record.status == WaitpointStatus::Cancelled
1254            && record.reason.as_deref() != Some("upstream_cancelled")
1255        {
1256            return approval_denied_error(
1257                request_id,
1258                HitlHostResponse {
1259                    request_id: request_id.to_string(),
1260                    answer: None,
1261                    approved: Some(false),
1262                    accepted: None,
1263                    reviewer: record.cancelled_by.clone(),
1264                    reason: record.reason.clone(),
1265                    metadata: record.metadata.clone(),
1266                    responded_at: record.cancelled_at,
1267                    signature: None,
1268                },
1269            );
1270        }
1271        if record.status == WaitpointStatus::Cancelled {
1272            return hitl_cancelled_error(
1273                request_id,
1274                kind,
1275                "",
1276                &[request_id.to_string()],
1277                record.reason,
1278            );
1279        }
1280    }
1281    hitl_cancelled_error(
1282        request_id,
1283        kind,
1284        "",
1285        &[request_id.to_string()],
1286        Some("upstream_cancelled".to_string()),
1287    )
1288}
1289
1290async fn append_timeout_once(
1291    log: &Arc<AnyEventLog>,
1292    kind: HitlRequestKind,
1293    request_id: &str,
1294    trace_id: &str,
1295) -> Result<(), VmError> {
1296    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1297        return Ok(());
1298    }
1299    append_timeout(log, kind, request_id, trace_id).await
1300}
1301
1302async fn hitl_event_exists(
1303    log: &Arc<AnyEventLog>,
1304    kind: HitlRequestKind,
1305    request_id: &str,
1306    event_kind: &str,
1307) -> Result<bool, VmError> {
1308    let topic = topic(kind)?;
1309    let events = log
1310        .read_range(&topic, None, usize::MAX)
1311        .await
1312        .map_err(log_error)?;
1313    Ok(events
1314        .into_iter()
1315        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1316}
1317
1318fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1319    match kind {
1320        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1321        _ => "hitl.approval_approved",
1322    }
1323}
1324
1325fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1326    match kind {
1327        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1328        _ => "hitl.approval_denied",
1329    }
1330}
1331
1332async fn append_request(
1333    log: &Arc<AnyEventLog>,
1334    request: &HitlRequestEnvelope,
1335) -> Result<(), VmError> {
1336    let topic = topic(request.kind)?;
1337    log.append(
1338        &topic,
1339        LogEvent::new(
1340            request.kind.request_event_kind(),
1341            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1342        )
1343        .with_headers(request_headers(request)),
1344    )
1345    .await
1346    .map(|_| ())
1347    .map_err(log_error)
1348}
1349
1350async fn append_named_event(
1351    log: &Arc<AnyEventLog>,
1352    kind: HitlRequestKind,
1353    event_kind: &str,
1354    request_id: &str,
1355    trace_id: &str,
1356    payload: JsonValue,
1357) -> Result<(), VmError> {
1358    let topic = topic(kind)?;
1359    let headers = headers_with_trace(request_id, trace_id);
1360    log.append(
1361        &topic,
1362        LogEvent::new(event_kind, payload).with_headers(headers),
1363    )
1364    .await
1365    .map(|_| ())
1366    .map_err(log_error)
1367}
1368
1369async fn append_timeout(
1370    log: &Arc<AnyEventLog>,
1371    kind: HitlRequestKind,
1372    request_id: &str,
1373    trace_id: &str,
1374) -> Result<(), VmError> {
1375    append_named_event(
1376        log,
1377        kind,
1378        "hitl.timeout",
1379        request_id,
1380        trace_id,
1381        serde_json::to_value(HitlTimeoutRecord {
1382            request_id: request_id.to_string(),
1383            kind,
1384            trace_id: trace_id.to_string(),
1385            timed_out_at: now_rfc3339(),
1386        })
1387        .map_err(|error| VmError::Runtime(error.to_string()))?,
1388    )
1389    .await
1390}
1391
1392async fn maybe_apply_mock_response(
1393    kind: HitlRequestKind,
1394    request_id: &str,
1395    request_payload: &JsonValue,
1396) -> Result<(), VmError> {
1397    let mut params = request_payload
1398        .as_object()
1399        .cloned()
1400        .unwrap_or_default()
1401        .into_iter()
1402        .map(|(key, value)| {
1403            (
1404                crate::value::intern_key(&key),
1405                crate::stdlib::json_to_vm_value(&value),
1406            )
1407        })
1408        .collect::<crate::value::DictMap>();
1409    params.put_str("request_id", request_id);
1410    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1411        return Ok(());
1412    };
1413    let value = result?;
1414    let responses = match value {
1415        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1416        other => vec![other],
1417    };
1418    for response in responses {
1419        let response_dict = response.as_dict().ok_or_else(|| {
1420            VmError::Runtime(format!(
1421                "mocked HITL {} response must be a dict or list<dict>",
1422                kind.as_str()
1423            ))
1424        })?;
1425        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1426        append_hitl_response(None, hitl_response)
1427            .await
1428            .map_err(VmError::Runtime)?;
1429    }
1430    Ok(())
1431}
1432
1433fn parse_hitl_response_dict(
1434    request_id: &str,
1435    response_dict: &crate::value::DictMap,
1436) -> Result<HitlHostResponse, VmError> {
1437    Ok(HitlHostResponse {
1438        request_id: request_id.to_string(),
1439        answer: response_dict
1440            .get("answer")
1441            .map(crate::llm::vm_value_to_json),
1442        approved: response_dict.get("approved").and_then(vm_bool),
1443        accepted: response_dict.get("accepted").and_then(vm_bool),
1444        reviewer: response_dict.get("reviewer").map(VmValue::display),
1445        reason: response_dict.get("reason").map(VmValue::display),
1446        metadata: response_dict
1447            .get("metadata")
1448            .map(crate::llm::vm_value_to_json),
1449        responded_at: response_dict.get("responded_at").map(VmValue::display),
1450        signature: response_dict.get("signature").map(VmValue::display),
1451    })
1452}
1453
1454fn maybe_notify_host(ctx: Option<&AsyncBuiltinCtx>, request: &HitlRequestEnvelope) {
1455    let Some(bridge) = ctx.and_then(|ctx| ctx.child_vm().bridge.clone()) else {
1456        return;
1457    };
1458    bridge.notify(
1459        "harn.hitl.requested",
1460        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1461    );
1462}
1463
1464/// Emit a `HitlRequested` `AgentEvent` so transport adapters
1465/// (currently the A2A `A2aWorkerSink`) can flip a task into
1466/// `input-required` while the script is suspended on the waitpoint.
1467/// No-op when there is no current agent session — the bridge-level
1468/// `harn.hitl.requested` notification still fires for hosts that drive
1469/// HITL UX through the bridge.
1470fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1471    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1472        return;
1473    };
1474    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1475        session_id,
1476        request_id: request.request_id.clone(),
1477        kind: request.kind.as_str().to_string(),
1478        payload: request.payload.clone(),
1479    });
1480}
1481
1482/// Companion to `emit_hitl_requested`: notifies sinks that the
1483/// suspended waitpoint has resolved so a paused task can flip back
1484/// out of `input-required`. `outcome` is one of `"answered"`,
1485/// `"timeout"`, `"cancelled"`, or `"error"`.
1486fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1487    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1488        return;
1489    };
1490    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1491        session_id,
1492        request_id: request_id.to_string(),
1493        kind: kind.as_str().to_string(),
1494        outcome: outcome.to_string(),
1495    });
1496}
1497
1498/// Wrapper around `wait_for_request_waitpoint` that emits the
1499/// canonical `HitlResolved` `AgentEvent` regardless of which terminal
1500/// branch the waitpoint takes (response / timeout / cancellation /
1501/// error). Pair-emitted with `emit_hitl_requested` so transport
1502/// adapters can bracket the `input-required` pause cleanly without
1503/// each `*_impl` having to duplicate the emission at every match arm.
1504async fn wait_for_request_waitpoint_with_events(
1505    request_id: &str,
1506    kind: HitlRequestKind,
1507    timeout: Option<StdDuration>,
1508) -> Result<WaitpointOutcome, VmError> {
1509    let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1510    let label = match &outcome {
1511        Ok(WaitpointOutcome::Completed(_)) => "answered",
1512        Ok(WaitpointOutcome::Timeout) => "timeout",
1513        Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1514        Err(_) => "error",
1515    };
1516    emit_hitl_resolved(request_id, kind, label);
1517    outcome
1518}
1519
1520fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1521    let Some(value) = value else {
1522        return Ok(AskUserOptions {
1523            schema: None,
1524            timeout: Some(default_question_timeout()),
1525            default: None,
1526        });
1527    };
1528    let dict = value
1529        .as_dict()
1530        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1531    Ok(AskUserOptions {
1532        schema: dict
1533            .get("schema")
1534            .cloned()
1535            .filter(|value| !matches!(value, VmValue::Nil)),
1536        timeout: dict
1537            .get("timeout")
1538            .map(parse_duration_value)
1539            .transpose()?
1540            .or_else(|| Some(default_question_timeout())),
1541        default: dict
1542            .get("default")
1543            .cloned()
1544            .filter(|value| !matches!(value, VmValue::Nil)),
1545    })
1546}
1547
1548fn default_question_timeout() -> StdDuration {
1549    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1550}
1551
1552fn escalation_capability_policy() -> JsonValue {
1553    crate::orchestration::current_execution_policy()
1554        .and_then(|policy| serde_json::to_value(policy).ok())
1555        .unwrap_or(JsonValue::Null)
1556}
1557
1558fn parse_approval_options(
1559    value: Option<&VmValue>,
1560    builtin: &str,
1561) -> Result<ApprovalOptions, VmError> {
1562    let dict = match value {
1563        None => None,
1564        Some(VmValue::Dict(dict)) => Some(dict),
1565        Some(_) => {
1566            return Err(VmError::Runtime(format!(
1567                "{builtin}: options must be a dict"
1568            )))
1569        }
1570    };
1571    let quorum = dict
1572        .and_then(|dict| dict.get("quorum"))
1573        .and_then(VmValue::as_int)
1574        .unwrap_or(1);
1575    if quorum <= 0 {
1576        return Err(VmError::Runtime(format!(
1577            "{builtin}: quorum must be positive"
1578        )));
1579    }
1580    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1581    let capabilities_requested = optional_string_list(
1582        dict.and_then(|dict| dict.get("capabilities_requested")),
1583        builtin,
1584    )?;
1585    let evidence_refs = dict
1586        .and_then(|dict| dict.get("evidence_refs"))
1587        .map(|value| match value {
1588            VmValue::List(items) => Ok(items
1589                .iter()
1590                .map(crate::llm::vm_value_to_json)
1591                .collect::<Vec<_>>()),
1592            _ => Err(VmError::Runtime(format!(
1593                "{builtin}: evidence_refs must be a list"
1594            ))),
1595        })
1596        .transpose()?
1597        .unwrap_or_default();
1598    let deadline = dict
1599        .and_then(|dict| dict.get("deadline"))
1600        .map(parse_duration_value)
1601        .transpose()?
1602        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1603    Ok(ApprovalOptions {
1604        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1605        args: dict.and_then(|dict| dict.get("args")).cloned(),
1606        quorum: quorum as u32,
1607        reviewers,
1608        deadline,
1609        principal: dict
1610            .and_then(|dict| dict.get("principal"))
1611            .map(VmValue::display)
1612            .filter(|value| !value.is_empty()),
1613        evidence_refs,
1614        undo_metadata: dict
1615            .and_then(|dict| dict.get("undo_metadata"))
1616            .map(crate::llm::vm_value_to_json),
1617        capabilities_requested,
1618    })
1619}
1620
1621fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1622    args.get(idx)
1623        .map(VmValue::display)
1624        .filter(|value| !value.is_empty())
1625        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1626}
1627
1628fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1629    let value = args
1630        .get(idx)
1631        .and_then(VmValue::as_int)
1632        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1633    if value <= 0 {
1634        return Err(VmError::Runtime(format!(
1635            "{builtin}: expected a positive int at {idx}"
1636        )));
1637    }
1638    Ok(value)
1639}
1640
1641fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1642    let Some(value) = value else {
1643        return Ok(Vec::new());
1644    };
1645    match value {
1646        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1647        _ => Err(VmError::Runtime(format!(
1648            "{builtin}: expected list<string>"
1649        ))),
1650    }
1651}
1652
1653fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1654    duration_from_value(value, "hitl", "timeout", ErrorKind::Runtime)
1655}
1656
1657fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1658    active_event_log()
1659        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1660}
1661
1662fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1663    if let Some(log) = active_event_log() {
1664        return Ok(log);
1665    }
1666    let Some(base_dir) = base_dir else {
1667        return Ok(install_memory_for_current_thread(
1668            HITL_EVENT_LOG_QUEUE_DEPTH,
1669        ));
1670    };
1671    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1672}
1673
1674fn current_dispatch_keys() -> Option<DispatchKeys> {
1675    let context = current_dispatch_context()?;
1676    let stable_base = context
1677        .replay_of_event_id
1678        .clone()
1679        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1680    let instance_key = format!(
1681        "{}::{}",
1682        context.trigger_event.id.0,
1683        context.replay_of_event_id.as_deref().unwrap_or("live")
1684    );
1685    Some(DispatchKeys {
1686        instance_key,
1687        stable_base,
1688        agent: context.agent_id,
1689        trace_id: context.trigger_event.trace_id.0,
1690    })
1691}
1692
1693fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1694    if let Some(keys) = dispatch_keys {
1695        let seq = REQUEST_SEQUENCE.with(|slot| {
1696            let mut state = slot.borrow_mut();
1697            if state.instance_key != keys.instance_key {
1698                state.instance_key = keys.instance_key.clone();
1699                state.next_seq = 0;
1700            }
1701            state.next_seq += 1;
1702            state.next_seq
1703        });
1704        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1705    }
1706    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1707}
1708
1709fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1710    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1711    if let Some(run_id) = request.run_id.as_ref() {
1712        headers.insert("run_id".to_string(), run_id.clone());
1713    }
1714    headers
1715}
1716
/// Builds event headers containing only the `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1722
1723fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1724    let mut headers = response_headers(request_id);
1725    headers.insert("trace_id".to_string(), trace_id.to_string());
1726    headers
1727}
1728
1729fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1730    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1731}
1732
1733fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1734    event
1735        .headers
1736        .get("request_id")
1737        .is_some_and(|value| value == request_id)
1738        || event
1739            .payload
1740            .get("request_id")
1741            .and_then(JsonValue::as_str)
1742            .is_some_and(|value| value == request_id)
1743}
1744
1745fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1746    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1747        "name": "ApprovalDeniedError",
1748        "category": "generic",
1749        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1750        "request_id": request_id,
1751        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1752        "reason": response.reason,
1753    })))
1754}
1755
1756fn hitl_cancelled_error(
1757    request_id: &str,
1758    kind: HitlRequestKind,
1759    wait_id: &str,
1760    waitpoint_ids: &[String],
1761    reason: Option<String>,
1762) -> VmError {
1763    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1764    let message = reason
1765        .clone()
1766        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1767    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1768        "name": "HumanCancelledError",
1769        "category": ErrorCategory::Cancelled.as_str(),
1770        "message": message,
1771        "request_id": request_id,
1772        "kind": kind.as_str(),
1773        "wait_id": wait_id,
1774        "waitpoint_ids": waitpoint_ids,
1775        "reason": reason,
1776    })))
1777}
1778
1779fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1780    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1781    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1782        "name": "HumanTimeoutError",
1783        "category": ErrorCategory::Timeout.as_str(),
1784        "message": format!("{} timed out", kind.as_str()),
1785        "request_id": request_id,
1786        "kind": kind.as_str(),
1787    })))
1788}
1789
1790fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1791    match default {
1792        VmValue::Int(_) => match value {
1793            VmValue::Int(_) => value.clone(),
1794            VmValue::Float(number) => VmValue::Int(*number as i64),
1795            VmValue::String(text) => text
1796                .parse::<i64>()
1797                .map(VmValue::Int)
1798                .unwrap_or_else(|_| default.clone()),
1799            _ => default.clone(),
1800        },
1801        VmValue::Float(_) => match value {
1802            VmValue::Float(_) => value.clone(),
1803            VmValue::Int(number) => VmValue::Float(*number as f64),
1804            VmValue::String(text) => text
1805                .parse::<f64>()
1806                .map(VmValue::Float)
1807                .unwrap_or_else(|_| default.clone()),
1808            _ => default.clone(),
1809        },
1810        VmValue::Bool(_) => match value {
1811            VmValue::Bool(_) => value.clone(),
1812            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1813            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1814            _ => default.clone(),
1815        },
1816        VmValue::String(_) => VmValue::String(arcstr::ArcStr::from(value.display())),
1817        VmValue::Duration(_) => match value {
1818            VmValue::Duration(_) => value.clone(),
1819            VmValue::Int(ms) => VmValue::Duration(*ms),
1820            _ => default.clone(),
1821        },
1822        VmValue::Nil => value.clone(),
1823        _ => {
1824            if value.type_name() == default.type_name() {
1825                value.clone()
1826            } else {
1827                default.clone()
1828            }
1829        }
1830    }
1831}
1832
1833fn log_error(error: impl std::fmt::Display) -> VmError {
1834    VmError::Runtime(error.to_string())
1835}
1836
1837fn now_rfc3339() -> String {
1838    format_rfc3339(OffsetDateTime::now_utc())
1839}
1840
1841fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1842    timestamp
1843        .format(&Rfc3339)
1844        .unwrap_or_else(|_| timestamp.to_string())
1845}
1846
1847fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1848    time::Duration::try_from(duration)
1849        .ok()
1850        .map(|duration| format_rfc3339(requested_at + duration))
1851}
1852
1853fn new_trace_id() -> String {
1854    format!("trace_{}", Uuid::now_v7())
1855}
1856
1857fn vm_bool(value: &VmValue) -> Option<bool> {
1858    match value {
1859        VmValue::Bool(flag) => Some(*flag),
1860        _ => None,
1861    }
1862}
1863
1864fn vm_string(value: &VmValue) -> Option<&str> {
1865    match value {
1866        VmValue::String(text) => Some(text.as_ref()),
1867        _ => None,
1868    }
1869}
1870
1871fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1872    match value {
1873        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1874        _ => None,
1875    }
1876}
1877
#[cfg(test)]
mod tests {
    use std::sync::OnceLock;

    use tokio::sync::Mutex;

    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Serialize tests that exercise the request-approval path. Those tests
    /// drive the Harn VM through its full HITL state machine and rely on
    /// thread-local event-log handles that are set up by
    /// `execute_hitl_script` → `reset_thread_local_state()`. Under heavy
    /// parallel load the OS thread that a `current_thread` tokio runtime
    /// runs on can be reused between tests; if the outgoing test's async
    /// drop runs concurrently with the incoming test's reset the thread-
    /// local event log is in a transitional state and events can be double-
    /// counted or missed. Holding this mutex for the duration of each test
    /// turns the hazard into a hard serialize.
    ///
    /// Every test that calls `request_approval` must take this lock before
    /// it resets thread-local state or installs an event log.
    fn hitl_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    /// Compiles and runs `source` on a fresh VM whose event log is installed
    /// under `base_dir`.
    ///
    /// Thread-local state is reset first. On success returns the trimmed VM
    /// output followed by the event kinds recorded on the questions,
    /// approvals, dual-control, and escalations topics, in that order.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
        Ok((
            output,
            question_events,
            approval_events,
            dual_control_events,
            escalation_events,
        ))
    }

    /// Reads every event on `topic` and returns the event kinds in log order.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.kind)
            .collect()
    }

    /// Reads every event on `topic` and returns the event payloads in log
    /// order.
    async fn event_payloads(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<serde_json::Value> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.payload)
            .collect()
    }

    /// A mocked string answer (`"9"`) to `ask_user` with an integer default
    /// is printed as `9`, and only the questions topic records events
    /// (asked, then response received).
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  __io_println(answer)
}
"#;
                let (
                    output,
                    question_events,
                    approval_events,
                    dual_control_events,
                    escalation_events,
                ) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "9");
                assert_eq!(
                    question_events,
                    vec![
                        "hitl.question_asked".to_string(),
                        "hitl.response_received".to_string()
                    ]
                );
                assert!(approval_events.is_empty());
                assert!(dual_control_events.is_empty());
                assert!(escalation_events.is_empty());
            })
            .await;
    }

    /// With `quorum: 2` and two mocked approvals, the approvals topic records
    /// the request, both responses, and a final approved event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        let _guard = hitl_lock().lock().await;
        tokio::task::LocalSet::new()
            .run_until(async {
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  __io_println(record.approved)
  __io_println(len(record.reviewers))
  __io_println(record.reviewers[0])
  __io_println(record.reviewers[1])
}
"#;
                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_approved".to_string(),
                    ]
                );
            })
            .await;
    }

    /// The first event on the approvals topic embeds an `approval_request`
    /// object that mirrors the options passed to `request_approval` and
    /// carries `requested_at` and `deadline` strings.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_emits_canonical_approval_request_payload() {
        // This test drives `request_approval` and swaps the thread-local
        // event log, so it is serialized like the other approval tests.
        let _guard = hitl_lock().lock().await;
        tokio::task::LocalSet::new()
            .run_until(async {
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let log = install_default_for_base_dir(dir.path()).expect("install event log");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
  request_approval("deploy production", {
    args: {environment: "prod"},
    quorum: 1,
    reviewers: ["alice"],
    evidence_refs: [{kind: "run", uri: "run_123"}],
    undo_metadata: {strategy: "rollback"},
    capabilities_requested: ["deploy.production"],
  })
}
"#;
                let chunk = compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script succeeds");

                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
                let request_payload = &payloads[0]["payload"];
                let approval_request = &request_payload["approval_request"];
                assert_eq!(approval_request["id"], request_payload["id"]);
                assert_eq!(approval_request["action"], "deploy production");
                assert_eq!(approval_request["args"]["environment"], "prod");
                assert_eq!(approval_request["approvers_required"], 1);
                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
                assert_eq!(
                    approval_request["capabilities_requested"][0],
                    "deploy.production"
                );
                assert!(approval_request["requested_at"].as_str().is_some());
                assert!(approval_request["deadline"].as_str().is_some());
            })
            .await;
    }

    /// A mocked denial surfaces to the script as an `ApprovalDeniedError`
    /// carrying the reviewer's reason, and the approvals topic records the
    /// request, the response, and a denied event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        let _guard = hitl_lock().lock().await;
        tokio::task::LocalSet::new()
            .run_until(async {
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  __io_println(is_err(denied))
  __io_println(unwrap_err(denied).name)
  __io_println(unwrap_err(denied).reason)
}
"#;
                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_denied".to_string(),
                    ]
                );
            })
            .await;
    }

    /// With two mocked approvals, `dual_control(2, 3, …)` runs the action and
    /// the dual-control topic records requested, two responses, approved,
    /// and executed.
    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  __io_println(result)
}
"#;
                let (output, _, _, dual_control_events, _) =
                    execute_hitl_script(dir.path(), source)
                        .await
                        .expect("script succeeds");
                assert_eq!(output, "launched");
                assert_eq!(
                    dual_control_events,
                    vec![
                        "hitl.dual_control_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.dual_control_approved".to_string(),
                        "hitl.dual_control_executed".to_string(),
                    ]
                );
            })
            .await;
    }

    /// A mocked acceptance makes `escalate_to` return a handle with status
    /// `accepted` and the accepting reviewer, and the escalations topic
    /// records issued followed by accepted.
    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  __io_println(handle.status)
  __io_println(handle.reviewer)
}
"#;
                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "accepted\nlead");
                assert_eq!(
                    escalation_events,
                    vec![
                        "hitl.escalation_issued".to_string(),
                        "hitl.escalation_accepted".to_string(),
                    ]
                );
            })
            .await;
    }

    /// `harn-serve` adapters (A2A `input-required`, ACP `hitl_request`)
    /// rely on the canonical `AgentEvent::HitlRequested` /
    /// `AgentEvent::HitlResolved` pair to bracket every HITL pause.
    /// Pin the contract here so future HITL primitives keep emitting
    /// the event around their waitpoint blocks.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
        use std::sync::Mutex as StdMutex;

        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let session_id = "hitl-session".to_string();
                let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
                    std::sync::Arc::new(StdMutex::new(Vec::new()));

                /// Sink that appends a clone of every event to the shared vector.
                struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
                impl crate::agent_events::AgentEventSink for CaptureSink {
                    fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
                        self.0.lock().expect("captured").push(event.clone());
                    }
                }

                // Inline the script setup rather than using the
                // `execute_hitl_script` helper: that helper calls
                // `reset_thread_local_state` (which wipes the session
                // store), so any session pushed before it would be
                // gone by the time `ask_user` runs.
                crate::reset_thread_local_state();
                crate::event_log::install_default_for_base_dir(dir.path())
                    .expect("install event log");

                crate::agent_events::reset_all_sinks();
                let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
                    std::sync::Arc::new(CaptureSink(captured.clone()));
                crate::agent_events::register_sink(session_id.clone(), sink);
                crate::agent_sessions::open_or_create(Some(session_id.clone()));
                let session_guard =
                    crate::agent_sessions::enter_current_session(session_id.clone());

                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "ok"})
  let answer: string = ask_user("Are you sure?", {default: "no"})
  __io_println(answer)
}
"#;
                let chunk = crate::compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script runs");
                assert_eq!(vm.output().trim_end(), "ok");

                // Only the HITL request/resolution events matter here.
                let events = captured.lock().expect("captured");
                let mut iter = events.iter().filter(|event| {
                    matches!(
                        event,
                        crate::agent_events::AgentEvent::HitlRequested { .. }
                            | crate::agent_events::AgentEvent::HitlResolved { .. }
                    )
                });
                let requested = iter.next().expect("HitlRequested emitted");
                let resolved = iter.next().expect("HitlResolved emitted");
                assert!(iter.next().is_none(), "exactly one pair: {events:?}");

                let crate::agent_events::AgentEvent::HitlRequested {
                    session_id: req_session,
                    request_id: req_id,
                    kind: req_kind,
                    payload,
                } = requested
                else {
                    panic!("expected HitlRequested, got: {requested:?}");
                };
                assert_eq!(req_session, &session_id);
                assert_eq!(req_kind, "question");
                assert!(req_id.starts_with("hitl_question_"));
                assert_eq!(payload["prompt"], "Are you sure?");

                let crate::agent_events::AgentEvent::HitlResolved {
                    request_id: res_id,
                    kind: res_kind,
                    outcome,
                    ..
                } = resolved
                else {
                    panic!("expected HitlResolved, got: {resolved:?}");
                };
                assert_eq!(res_id, req_id);
                assert_eq!(res_kind, "question");
                assert_eq!(outcome, "answered");

                // Leave the session before clearing the sinks.
                drop(session_guard);
                crate::agent_events::reset_all_sinks();
            })
            .await;
    }
}