Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::runtime_limits::RuntimeLimits;
20use crate::schema::schema_expect_value;
21use crate::stdlib::host::dispatch_mock_host_call;
22use crate::stdlib::waitpoint::{
23    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
24    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
25    WaitpointWaitOptions,
26};
27use crate::triggers::dispatcher::current_dispatch_context;
28use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
29use crate::vm::{clone_async_builtin_child_vm, Vm};
30
31const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
32const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
33const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
34
35pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
36pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
37pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
38pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
39
40thread_local! {
41    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
42}
43
44#[derive(Default)]
45pub(crate) struct RequestSequenceState {
46    pub(crate) instance_key: String,
47    pub(crate) next_seq: u64,
48}
49
50#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub enum HitlRequestKind {
53    Question,
54    Approval,
55    DualControl,
56    Escalation,
57}
58
59impl HitlRequestKind {
60    pub(crate) fn as_str(self) -> &'static str {
61        match self {
62            Self::Question => "question",
63            Self::Approval => "approval",
64            Self::DualControl => "dual_control",
65            Self::Escalation => "escalation",
66        }
67    }
68
69    fn topic(self) -> &'static str {
70        match self {
71            Self::Question => HITL_QUESTIONS_TOPIC,
72            Self::Approval => HITL_APPROVALS_TOPIC,
73            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
74            Self::Escalation => HITL_ESCALATIONS_TOPIC,
75        }
76    }
77
78    fn request_event_kind(self) -> &'static str {
79        match self {
80            Self::Question => "hitl.question_asked",
81            Self::Approval => "hitl.approval_requested",
82            Self::DualControl => "hitl.dual_control_requested",
83            Self::Escalation => "hitl.escalation_issued",
84        }
85    }
86
87    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
88        if request_id.starts_with("hitl_question_") {
89            Some(Self::Question)
90        } else if request_id.starts_with("hitl_approval_") {
91            Some(Self::Approval)
92        } else if request_id.starts_with("hitl_dual_control_") {
93            Some(Self::DualControl)
94        } else if request_id.starts_with("hitl_escalation_") {
95            Some(Self::Escalation)
96        } else {
97            None
98        }
99    }
100}
101
102#[derive(Clone, Debug, Serialize, Deserialize)]
103pub struct HitlHostResponse {
104    pub request_id: String,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub answer: Option<JsonValue>,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub approved: Option<bool>,
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub accepted: Option<bool>,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub reviewer: Option<String>,
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub reason: Option<String>,
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub metadata: Option<JsonValue>,
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub responded_at: Option<String>,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub signature: Option<String>,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize)]
124struct HitlRequestEnvelope {
125    request_id: String,
126    kind: HitlRequestKind,
127    #[serde(default)]
128    agent: String,
129    trace_id: String,
130    #[serde(skip_serializing_if = "Option::is_none")]
131    run_id: Option<String>,
132    requested_at: String,
133    payload: JsonValue,
134}
135
136#[derive(Clone, Debug, Serialize, Deserialize)]
137struct HitlTimeoutRecord {
138    request_id: String,
139    kind: HitlRequestKind,
140    trace_id: String,
141    timed_out_at: String,
142}
143
144#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
145pub struct ApprovalRequest {
146    pub id: String,
147    pub action: String,
148    #[serde(default)]
149    pub args: JsonValue,
150    pub principal: String,
151    pub requested_at: String,
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub deadline: Option<String>,
154    pub approvers_required: u32,
155    #[serde(default)]
156    pub evidence_refs: Vec<JsonValue>,
157    #[serde(default)]
158    pub undo_metadata: JsonValue,
159    #[serde(default)]
160    pub capabilities_requested: Vec<String>,
161}
162
163impl ApprovalRequest {
164    pub fn new(
165        id: impl Into<String>,
166        action: impl Into<String>,
167        args: JsonValue,
168        principal: impl Into<String>,
169        requested_at: impl Into<String>,
170    ) -> Self {
171        Self {
172            id: id.into(),
173            action: action.into(),
174            args,
175            principal: principal.into(),
176            requested_at: requested_at.into(),
177            deadline: None,
178            approvers_required: 1,
179            evidence_refs: Vec::new(),
180            undo_metadata: JsonValue::Null,
181            capabilities_requested: Vec::new(),
182        }
183    }
184}
185
186pub(crate) fn approval_request_for_host_permission(
187    id: impl Into<String>,
188    action: impl Into<String>,
189    args: JsonValue,
190    principal: impl Into<String>,
191    evidence_refs: Vec<JsonValue>,
192    undo_metadata: JsonValue,
193    capabilities_requested: Vec<String>,
194) -> ApprovalRequest {
195    let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
196    request.evidence_refs = evidence_refs;
197    request.undo_metadata = undo_metadata;
198    request.capabilities_requested = capabilities_requested;
199    request
200}
201
202#[derive(Clone, Debug)]
203struct DispatchKeys {
204    instance_key: String,
205    stable_base: String,
206    agent: String,
207    trace_id: String,
208}
209
210#[derive(Clone, Debug)]
211struct AskUserOptions {
212    schema: Option<VmValue>,
213    timeout: Option<StdDuration>,
214    default: Option<VmValue>,
215}
216
217#[derive(Clone, Debug)]
218struct ApprovalOptions {
219    detail: Option<VmValue>,
220    args: Option<VmValue>,
221    quorum: u32,
222    reviewers: Vec<String>,
223    deadline: StdDuration,
224    principal: Option<String>,
225    evidence_refs: Vec<JsonValue>,
226    undo_metadata: Option<JsonValue>,
227    capabilities_requested: Vec<String>,
228}
229
230#[derive(Clone, Debug)]
231struct ApprovalProgress {
232    request_id: String,
233    reviewers: BTreeSet<String>,
234    signatures: Vec<ApprovalSignature>,
235    reason: Option<String>,
236    approved_at: Option<String>,
237}
238
239#[derive(Clone, Debug, Serialize)]
240struct ApprovalSignature {
241    reviewer: String,
242    signed_at: String,
243    signature: String,
244}
245
246#[derive(Clone, Debug)]
247enum ApprovalResolution {
248    Pending,
249    Approved(ApprovalProgress),
250    Denied(HitlHostResponse),
251}
252
253// `Completed` carries the full `WaitpointRecord`, which dominates the
254// enum's size — boxing it would force every match arm to indirect even
255// though the enum is dropped within nanoseconds of being constructed
256// (it's a local return type for the waitpoint poll loop, never stored).
257// Surfaced by the host-target compile of `harn-vm` introduced when
258// `harn-cli`'s build script gained `harn-vm` as a build-dep for the
259// AOT bytecode embedding pass (G7 / harn#2300).
260#[allow(clippy::large_enum_variant)]
261#[derive(Clone, Debug)]
262enum WaitpointOutcome {
263    Completed(WaitpointRecord),
264    Timeout,
265    Cancelled {
266        wait_id: String,
267        waitpoint_ids: Vec<String>,
268        reason: Option<String>,
269    },
270}
271
272pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
273    vm.register_async_builtin("ask_user", |args| {
274        Box::pin(async move { ask_user_impl(&args).await })
275    });
276
277    vm.register_async_builtin("request_approval", |args| {
278        Box::pin(async move { request_approval_impl(&args).await })
279    });
280
281    vm.register_async_builtin("dual_control", |args| {
282        Box::pin(async move { dual_control_impl(&args).await })
283    });
284
285    vm.register_async_builtin("escalate_to", |args| {
286        Box::pin(async move { escalate_to_impl(&args).await })
287    });
288}
289
290pub(crate) fn reset_hitl_state() {
291    REQUEST_SEQUENCE.with(|slot| {
292        *slot.borrow_mut() = RequestSequenceState::default();
293    });
294}
295
296pub(crate) fn take_hitl_state() -> RequestSequenceState {
297    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
298}
299
300pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
301    REQUEST_SEQUENCE.with(|slot| {
302        *slot.borrow_mut() = state;
303    });
304}
305
306pub async fn append_hitl_response(
307    base_dir: Option<&Path>,
308    mut response: HitlHostResponse,
309) -> Result<u64, String> {
310    let kind = HitlRequestKind::from_request_id(&response.request_id)
311        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
312    if response.responded_at.is_none() {
313        response.responded_at = Some(now_rfc3339());
314    }
315    let log = ensure_hitl_event_log_for(base_dir)?;
316    let headers = response_headers(&response.request_id);
317    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
318    let event_id = log
319        .append(
320            &topic,
321            LogEvent::new(
322                match kind {
323                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
324                    _ => "hitl.response_received",
325                },
326                serde_json::to_value(&response).map_err(|error| error.to_string())?,
327            )
328            .with_headers(headers),
329        )
330        .await
331        .map_err(|error| error.to_string())?;
332    finalize_hitl_response(&log, kind, &response).await?;
333    Ok(event_id)
334}
335
336pub async fn append_approval_request_on(
337    log: &Arc<AnyEventLog>,
338    agent: impl Into<String>,
339    trace_id: impl Into<String>,
340    action: impl Into<String>,
341    detail: JsonValue,
342    reviewers: Vec<String>,
343) -> Result<String, VmError> {
344    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
345    let trace_id = trace_id.into();
346    let agent = agent.into();
347    let requested_at_time = OffsetDateTime::now_utc();
348    let requested_at = format_rfc3339(requested_at_time);
349    let mut approval_request = ApprovalRequest::new(
350        request_id.clone(),
351        action.into(),
352        detail.clone(),
353        agent.clone(),
354        requested_at.clone(),
355    );
356    approval_request.deadline = deadline_after(
357        requested_at_time,
358        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
359    );
360    approval_request.approvers_required = 1;
361    let approval_request_json = serde_json::to_value(&approval_request)
362        .map_err(|error| VmError::Runtime(error.to_string()))?;
363    let request = HitlRequestEnvelope {
364        request_id: request_id.clone(),
365        kind: HitlRequestKind::Approval,
366        agent,
367        trace_id: trace_id.clone(),
368        run_id: None,
369        requested_at: requested_at.clone(),
370        payload: json!({
371            "approval_request": approval_request_json,
372            "id": approval_request.id,
373            "action": approval_request.action,
374            "args": approval_request.args,
375            "principal": approval_request.principal,
376            "requested_at": requested_at,
377            "deadline": approval_request.deadline,
378            "approvers_required": approval_request.approvers_required,
379            "evidence_refs": approval_request.evidence_refs,
380            "undo_metadata": approval_request.undo_metadata,
381            "capabilities_requested": approval_request.capabilities_requested,
382            "detail": detail,
383            "quorum": 1,
384            "reviewers": reviewers,
385            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
386        }),
387    };
388    create_request_waitpoint(log, &request).await?;
389    append_request(log, &request).await?;
390    maybe_notify_host(&request);
391    Ok(request_id)
392}
393
394async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
395    let prompt = required_string_arg(args, 0, "ask_user")?;
396    let options = parse_ask_user_options(args.get(1))?;
397    let keys = current_dispatch_keys();
398    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
399    let trace_id = keys
400        .as_ref()
401        .map(|keys| keys.trace_id.clone())
402        .unwrap_or_else(new_trace_id);
403    let log = ensure_hitl_event_log();
404    let request = HitlRequestEnvelope {
405        request_id: request_id.clone(),
406        kind: HitlRequestKind::Question,
407        agent: keys
408            .as_ref()
409            .map(|keys| keys.agent.clone())
410            .unwrap_or_default(),
411        trace_id: trace_id.clone(),
412        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
413        requested_at: now_rfc3339(),
414        payload: json!({
415            "prompt": prompt,
416            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
417            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
418            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
419        }),
420    };
421    create_request_waitpoint(&log, &request).await?;
422    append_request(&log, &request).await?;
423    maybe_notify_host(&request);
424    emit_hitl_requested(&request);
425    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
426
427    match wait_for_request_waitpoint_with_events(
428        &request_id,
429        HitlRequestKind::Question,
430        options.timeout,
431    )
432    .await?
433    {
434        WaitpointOutcome::Completed(record) => {
435            let answer = record
436                .value
437                .as_ref()
438                .map(crate::stdlib::json_to_vm_value)
439                .unwrap_or(VmValue::Nil);
440            if let Some(schema) = options.schema.as_ref() {
441                return schema_expect_value(&answer, schema, true);
442            }
443            if let Some(default) = options.default.as_ref() {
444                return Ok(coerce_like_default(&answer, default));
445            }
446            Ok(answer)
447        }
448        WaitpointOutcome::Timeout => {
449            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
450            if let Some(default) = options.default {
451                return Ok(default);
452            }
453            Err(timeout_error(&request_id, HitlRequestKind::Question))
454        }
455        WaitpointOutcome::Cancelled {
456            wait_id,
457            waitpoint_ids,
458            reason,
459        } => Err(hitl_cancelled_error(
460            &request_id,
461            HitlRequestKind::Question,
462            &wait_id,
463            &waitpoint_ids,
464            reason,
465        )),
466    }
467}
468
469async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
470    let action = required_string_arg(args, 0, "request_approval")?;
471    let options = parse_approval_options(args.get(1), "request_approval")?;
472    let keys = current_dispatch_keys();
473    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
474    let trace_id = keys
475        .as_ref()
476        .map(|keys| keys.trace_id.clone())
477        .unwrap_or_else(new_trace_id);
478    let agent = keys
479        .as_ref()
480        .map(|keys| keys.agent.clone())
481        .unwrap_or_default();
482    let requested_at_time = OffsetDateTime::now_utc();
483    let requested_at = format_rfc3339(requested_at_time);
484    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
485    let approval_args = options
486        .args
487        .as_ref()
488        .or(options.detail.as_ref())
489        .map(crate::llm::vm_value_to_json)
490        .unwrap_or(JsonValue::Null);
491    let mut approval_request = ApprovalRequest::new(
492        request_id.clone(),
493        action.clone(),
494        approval_args,
495        principal,
496        requested_at.clone(),
497    );
498    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
499    approval_request.approvers_required = options.quorum;
500    approval_request.evidence_refs = options.evidence_refs.clone();
501    approval_request.undo_metadata = options
502        .undo_metadata
503        .clone()
504        .or_else(|| {
505            crate::orchestration::current_mutation_session()
506                .and_then(|session| serde_json::to_value(session).ok())
507        })
508        .unwrap_or(JsonValue::Null);
509    approval_request.capabilities_requested = options.capabilities_requested.clone();
510    let approval_request_json = serde_json::to_value(&approval_request)
511        .map_err(|error| VmError::Runtime(error.to_string()))?;
512    let log = ensure_hitl_event_log();
513    let request = HitlRequestEnvelope {
514        request_id: request_id.clone(),
515        kind: HitlRequestKind::Approval,
516        agent,
517        trace_id: trace_id.clone(),
518        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
519        requested_at: requested_at.clone(),
520        payload: json!({
521            "approval_request": approval_request_json,
522            "id": approval_request.id,
523            "action": action,
524            "args": approval_request.args,
525            "principal": approval_request.principal,
526            "requested_at": requested_at,
527            "deadline": approval_request.deadline,
528            "approvers_required": approval_request.approvers_required,
529            "evidence_refs": approval_request.evidence_refs,
530            "undo_metadata": approval_request.undo_metadata,
531            "capabilities_requested": approval_request.capabilities_requested,
532            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
533            "quorum": options.quorum,
534            "reviewers": options.reviewers,
535            "deadline_ms": options.deadline.as_millis() as u64,
536        }),
537    };
538    create_request_waitpoint(&log, &request).await?;
539    append_request(&log, &request).await?;
540    maybe_notify_host(&request);
541    emit_hitl_requested(&request);
542    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
543
544    match wait_for_request_waitpoint_with_events(
545        &request_id,
546        HitlRequestKind::Approval,
547        Some(options.deadline),
548    )
549    .await?
550    {
551        WaitpointOutcome::Completed(record) => {
552            approval_record_from_waitpoint(&record, "request_approval")
553        }
554        WaitpointOutcome::Timeout => {
555            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
556            Err(timeout_error(&request_id, HitlRequestKind::Approval))
557        }
558        WaitpointOutcome::Cancelled { .. } => {
559            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
560        }
561    }
562}
563
564pub(crate) async fn request_approval_for_side_effect(
565    action: &str,
566    detail: JsonValue,
567    principal: String,
568    reviewers: Vec<String>,
569    capabilities_requested: Vec<String>,
570) -> Result<VmValue, VmError> {
571    let mut options = BTreeMap::new();
572    options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
573    options.insert(
574        "detail".to_string(),
575        crate::stdlib::json_to_vm_value(&detail),
576    );
577    options.insert(
578        "principal".to_string(),
579        VmValue::String(Rc::from(principal)),
580    );
581    options.insert(
582        "reviewers".to_string(),
583        VmValue::List(Rc::new(
584            reviewers
585                .into_iter()
586                .map(|reviewer| VmValue::String(Rc::from(reviewer)))
587                .collect(),
588        )),
589    );
590    options.insert(
591        "capabilities_requested".to_string(),
592        VmValue::List(Rc::new(
593            capabilities_requested
594                .into_iter()
595                .map(|capability| VmValue::String(Rc::from(capability)))
596                .collect(),
597        )),
598    );
599    let args = vec![
600        VmValue::String(Rc::from(action.to_string())),
601        VmValue::Dict(Rc::new(options)),
602    ];
603    request_approval_impl(&args).await
604}
605
606async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
607    let n = required_positive_int_arg(args, 0, "dual_control")?;
608    let m = required_positive_int_arg(args, 1, "dual_control")?;
609    if n > m {
610        return Err(VmError::Runtime(
611            "dual_control: n must be less than or equal to m".to_string(),
612        ));
613    }
614    let action = args
615        .get(2)
616        .and_then(|value| match value {
617            VmValue::Closure(closure) => Some(closure.clone()),
618            _ => None,
619        })
620        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
621    let approvers = optional_string_list(args.get(3), "dual_control")?;
622    if !approvers.is_empty() && approvers.len() < m as usize {
623        return Err(VmError::Runtime(format!(
624            "dual_control: expected at least {m} approvers, got {}",
625            approvers.len()
626        )));
627    }
628
629    let keys = current_dispatch_keys();
630    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
631    let trace_id = keys
632        .as_ref()
633        .map(|keys| keys.trace_id.clone())
634        .unwrap_or_else(new_trace_id);
635    let action_name = if action.func.name.is_empty() {
636        "anonymous".to_string()
637    } else {
638        action.func.name.clone()
639    };
640    let agent = keys
641        .as_ref()
642        .map(|keys| keys.agent.clone())
643        .unwrap_or_default();
644    let requested_at_time = OffsetDateTime::now_utc();
645    let requested_at = format_rfc3339(requested_at_time);
646    let mut approval_request = ApprovalRequest::new(
647        request_id.clone(),
648        action_name.clone(),
649        json!({
650            "n": n,
651            "m": m,
652            "approvers": approvers.clone(),
653        }),
654        agent.clone(),
655        requested_at.clone(),
656    );
657    approval_request.deadline = deadline_after(
658        requested_at_time,
659        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
660    );
661    approval_request.approvers_required = n as u32;
662    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
663        .and_then(|session| serde_json::to_value(session).ok())
664        .unwrap_or(JsonValue::Null);
665    let approval_request_json = serde_json::to_value(&approval_request)
666        .map_err(|error| VmError::Runtime(error.to_string()))?;
667    let log = ensure_hitl_event_log();
668    let request = HitlRequestEnvelope {
669        request_id: request_id.clone(),
670        kind: HitlRequestKind::DualControl,
671        agent,
672        trace_id: trace_id.clone(),
673        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
674        requested_at: requested_at.clone(),
675        payload: json!({
676            "approval_request": approval_request_json,
677            "id": approval_request.id,
678            "args": approval_request.args,
679            "principal": approval_request.principal,
680            "requested_at": requested_at,
681            "deadline": approval_request.deadline,
682            "approvers_required": approval_request.approvers_required,
683            "evidence_refs": approval_request.evidence_refs,
684            "undo_metadata": approval_request.undo_metadata,
685            "capabilities_requested": approval_request.capabilities_requested,
686            "n": n,
687            "m": m,
688            "action": action_name,
689            "approvers": approvers,
690            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
691        }),
692    };
693    create_request_waitpoint(&log, &request).await?;
694    append_request(&log, &request).await?;
695    maybe_notify_host(&request);
696    emit_hitl_requested(&request);
697    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
698
699    match wait_for_request_waitpoint_with_events(
700        &request_id,
701        HitlRequestKind::DualControl,
702        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
703    )
704    .await?
705    {
706        WaitpointOutcome::Completed(record) => {
707            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
708            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
709                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
710            })?;
711            let result = vm.call_closure_pub(&action, &[]).await?;
712
713            append_named_event(
714                &log,
715                HitlRequestKind::DualControl,
716                "hitl.dual_control_executed",
717                &request_id,
718                &trace_id,
719                json!({
720                    "request_id": request_id,
721                    "result": crate::llm::vm_value_to_json(&result),
722                }),
723            )
724            .await?;
725
726            Ok(result)
727        }
728        WaitpointOutcome::Timeout => {
729            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
730            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
731        }
732        WaitpointOutcome::Cancelled { .. } => {
733            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
734        }
735    }
736}
737
738async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
739    let role = required_string_arg(args, 0, "escalate_to")?;
740    let reason = required_string_arg(args, 1, "escalate_to")?;
741    let keys = current_dispatch_keys();
742    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
743    let trace_id = keys
744        .as_ref()
745        .map(|keys| keys.trace_id.clone())
746        .unwrap_or_else(new_trace_id);
747    let log = ensure_hitl_event_log();
748    let request = HitlRequestEnvelope {
749        request_id: request_id.clone(),
750        kind: HitlRequestKind::Escalation,
751        agent: keys
752            .as_ref()
753            .map(|keys| keys.agent.clone())
754            .unwrap_or_default(),
755        trace_id: trace_id.clone(),
756        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
757        requested_at: now_rfc3339(),
758        payload: json!({
759            "role": role,
760            "reason": reason,
761            "capability_policy": escalation_capability_policy(),
762        }),
763    };
764    create_request_waitpoint(&log, &request).await?;
765    append_request(&log, &request).await?;
766    maybe_notify_host(&request);
767    emit_hitl_requested(&request);
768    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
769
770    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
771        .await?
772    {
773        WaitpointOutcome::Completed(record) => {
774            let accepted_at = record.completed_at.clone();
775            let reviewer = record.completed_by.clone();
776            let accepted = record
777                .value
778                .as_ref()
779                .and_then(|value| value.get("accepted"))
780                .and_then(JsonValue::as_bool)
781                .unwrap_or(true);
782            Ok(crate::stdlib::json_to_vm_value(&json!({
783                "request_id": request_id,
784                "role": role,
785                "reason": reason,
786                "trace_id": trace_id,
787                "status": if accepted { "accepted" } else { "pending" },
788                "accepted_at": accepted_at,
789                "reviewer": reviewer,
790            })))
791        }
792        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
793        WaitpointOutcome::Cancelled {
794            wait_id,
795            waitpoint_ids,
796            reason,
797        } => Err(hitl_cancelled_error(
798            &request_id,
799            HitlRequestKind::Escalation,
800            &wait_id,
801            &waitpoint_ids,
802            reason,
803        )),
804    }
805}
806
807async fn create_request_waitpoint(
808    log: &Arc<AnyEventLog>,
809    request: &HitlRequestEnvelope,
810) -> Result<(), VmError> {
811    create_waitpoint_on(
812        log,
813        Some(request.request_id.clone()),
814        Some(json!({
815            "kind": request.kind.as_str(),
816            "agent": request.agent.clone(),
817            "trace_id": request.trace_id.clone(),
818            "requested_at": request.requested_at.clone(),
819            "payload": request.payload.clone(),
820        })),
821    )
822    .await?;
823    Ok(())
824}
825
826async fn wait_for_request_waitpoint(
827    request_id: &str,
828    timeout: Option<StdDuration>,
829) -> Result<WaitpointOutcome, VmError> {
830    match wait_on_waitpoints(
831        vec![request_id.to_string()],
832        WaitpointWaitOptions { timeout },
833    )
834    .await
835    {
836        Ok(records) => Ok(WaitpointOutcome::Completed(
837            records
838                .into_iter()
839                .next()
840                .expect("single waitpoint wait result"),
841        )),
842        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
843        Err(WaitpointWaitFailure::Cancelled {
844            wait_id,
845            waitpoint_ids,
846            reason,
847        }) => Ok(WaitpointOutcome::Cancelled {
848            wait_id,
849            waitpoint_ids,
850            reason,
851        }),
852        Err(WaitpointWaitFailure::Vm(error)) => {
853            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
854                return Ok(outcome);
855            }
856            Err(error)
857        }
858    }
859}
860
861fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
862    let VmError::Thrown(VmValue::Dict(dict)) = error else {
863        return None;
864    };
865    let name = dict.get("name").and_then(vm_string)?;
866    match name {
867        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
868        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
869            wait_id: dict
870                .get("wait_id")
871                .and_then(vm_string)
872                .unwrap_or_default()
873                .to_string(),
874            waitpoint_ids: dict
875                .get("waitpoint_ids")
876                .and_then(vm_string_list)
877                .unwrap_or_default(),
878            reason: dict
879                .get("reason")
880                .and_then(vm_string)
881                .map(ToString::to_string),
882        }),
883        _ => None,
884    }
885}
886
887async fn finalize_hitl_response(
888    log: &Arc<AnyEventLog>,
889    kind: HitlRequestKind,
890    response: &HitlHostResponse,
891) -> Result<(), String> {
892    match kind {
893        HitlRequestKind::Question => {
894            if waitpoint_is_terminal(log, &response.request_id).await? {
895                return Ok(());
896            }
897            complete_waitpoint_on(
898                log,
899                &response.request_id,
900                response.answer.clone(),
901                response.reviewer.clone(),
902                response.reason.clone(),
903                response.metadata.clone(),
904            )
905            .await
906            .map(|_| ())
907            .map_err(|error| error.to_string())
908        }
909        HitlRequestKind::Escalation => {
910            if !response.accepted.unwrap_or(false)
911                || waitpoint_is_terminal(log, &response.request_id).await?
912            {
913                return Ok(());
914            }
915            complete_waitpoint_on(
916                log,
917                &response.request_id,
918                Some(json!({
919                    "accepted": true,
920                    "reviewer": response.reviewer,
921                    "reason": response.reason,
922                    "responded_at": response.responded_at,
923                })),
924                response.reviewer.clone(),
925                response.reason.clone(),
926                response.metadata.clone(),
927            )
928            .await
929            .map(|_| ())
930            .map_err(|error| error.to_string())
931        }
932        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
933            if waitpoint_is_terminal(log, &response.request_id).await? {
934                return Ok(());
935            }
936            let request = load_request_envelope(log, kind, &response.request_id)
937                .await
938                .map_err(|error| error.to_string())?;
939            match resolve_approval_state(log, kind, &request)
940                .await
941                .map_err(|error| error.to_string())?
942            {
943                ApprovalResolution::Pending => Ok(()),
944                ApprovalResolution::Approved(progress) => {
945                    let record = approval_record_json(&progress);
946                    append_named_event(
947                        log,
948                        kind,
949                        approved_event_kind(kind),
950                        &request.request_id,
951                        &request.trace_id,
952                        json!({
953                            "request_id": request.request_id.clone(),
954                            "record": record.clone(),
955                        }),
956                    )
957                    .await
958                    .map_err(|error| error.to_string())?;
959                    complete_waitpoint_on(
960                        log,
961                        &request.request_id,
962                        Some(record),
963                        response.reviewer.clone(),
964                        progress.reason.clone(),
965                        response.metadata.clone(),
966                    )
967                    .await
968                    .map(|_| ())
969                    .map_err(|error| error.to_string())
970                }
971                ApprovalResolution::Denied(denied) => {
972                    append_named_event(
973                        log,
974                        kind,
975                        denied_event_kind(kind),
976                        &request.request_id,
977                        &request.trace_id,
978                        json!({
979                            "request_id": request.request_id.clone(),
980                            "reviewer": denied.reviewer.clone(),
981                            "reason": denied.reason.clone(),
982                        }),
983                    )
984                    .await
985                    .map_err(|error| error.to_string())?;
986                    cancel_waitpoint_on(
987                        log,
988                        &request.request_id,
989                        denied.reviewer.clone(),
990                        denied.reason.clone(),
991                        denied.metadata.clone(),
992                    )
993                    .await
994                    .map(|_| ())
995                    .map_err(|error| error.to_string())
996                }
997            }
998        }
999    }
1000}
1001
1002async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1003    Ok(inspect_waitpoint_on(log, request_id)
1004        .await
1005        .map_err(|error| error.to_string())?
1006        .is_some_and(|record| record.status != WaitpointStatus::Open))
1007}
1008
1009async fn load_request_envelope(
1010    log: &Arc<AnyEventLog>,
1011    kind: HitlRequestKind,
1012    request_id: &str,
1013) -> Result<HitlRequestEnvelope, VmError> {
1014    let topic = topic(kind)?;
1015    let events = log
1016        .read_range(&topic, None, usize::MAX)
1017        .await
1018        .map_err(log_error)?;
1019    events
1020        .into_iter()
1021        .filter(|(_, event)| event.kind == kind.request_event_kind())
1022        .find_map(|(_, event)| {
1023            if !event_matches_request(&event, request_id) {
1024                return None;
1025            }
1026            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1027        })
1028        .ok_or_else(|| {
1029            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1030        })
1031}
1032
1033async fn resolve_approval_state(
1034    log: &Arc<AnyEventLog>,
1035    kind: HitlRequestKind,
1036    request: &HitlRequestEnvelope,
1037) -> Result<ApprovalResolution, VmError> {
1038    let quorum = approval_quorum_from_request(kind, request)?;
1039    let allowed_reviewers = approval_reviewers_from_request(kind, request)
1040        .into_iter()
1041        .collect::<BTreeSet<_>>();
1042    let mut progress = ApprovalProgress {
1043        request_id: request.request_id.clone(),
1044        reviewers: BTreeSet::new(),
1045        signatures: Vec::new(),
1046        reason: None,
1047        approved_at: None,
1048    };
1049    let topic = topic(kind)?;
1050    let events = log
1051        .read_range(&topic, None, usize::MAX)
1052        .await
1053        .map_err(log_error)?;
1054    for (_, event) in events {
1055        if !event_matches_request(&event, &request.request_id)
1056            || event.kind != "hitl.response_received"
1057        {
1058            continue;
1059        }
1060        let response: HitlHostResponse = serde_json::from_value(event.payload)
1061            .map_err(|error| VmError::Runtime(error.to_string()))?;
1062        if let Some(reviewer) = response.reviewer.as_deref() {
1063            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
1064                continue;
1065            }
1066            if progress.reviewers.contains(reviewer) {
1067                continue;
1068            }
1069        }
1070        if response.approved.unwrap_or(false) {
1071            if let Some(reviewer) = response.reviewer.clone() {
1072                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1073                progress.reviewers.insert(reviewer.clone());
1074                progress.signatures.push(ApprovalSignature {
1075                    reviewer: reviewer.clone(),
1076                    signed_at: signed_at.clone(),
1077                    signature: response.signature.clone().unwrap_or_else(|| {
1078                        approval_receipt_signature(
1079                            &request.request_id,
1080                            &reviewer,
1081                            &signed_at,
1082                            true,
1083                            response.reason.as_deref(),
1084                        )
1085                    }),
1086                });
1087            }
1088            progress.reason = response.reason.clone();
1089            progress.approved_at = response.responded_at.clone();
1090            if progress.reviewers.len() as u32 >= quorum {
1091                return Ok(ApprovalResolution::Approved(progress));
1092            }
1093            continue;
1094        }
1095        return Ok(ApprovalResolution::Denied(response));
1096    }
1097    Ok(ApprovalResolution::Pending)
1098}
1099
1100fn approval_quorum_from_request(
1101    kind: HitlRequestKind,
1102    request: &HitlRequestEnvelope,
1103) -> Result<u32, VmError> {
1104    let key = match kind {
1105        HitlRequestKind::DualControl => "n",
1106        _ => "quorum",
1107    };
1108    let quorum = request
1109        .payload
1110        .get(key)
1111        .or_else(|| request.payload.get("approvers_required"))
1112        .or_else(|| {
1113            request
1114                .payload
1115                .get("approval_request")
1116                .and_then(|approval| approval.get("approvers_required"))
1117        })
1118        .and_then(JsonValue::as_u64)
1119        .unwrap_or(1);
1120    u32::try_from(quorum).map_err(|_| {
1121        VmError::Runtime(format!(
1122            "invalid quorum in HITL request '{}'",
1123            request.request_id
1124        ))
1125    })
1126}
1127
1128fn approval_reviewers_from_request(
1129    kind: HitlRequestKind,
1130    request: &HitlRequestEnvelope,
1131) -> Vec<String> {
1132    let key = match kind {
1133        HitlRequestKind::DualControl => "approvers",
1134        _ => "reviewers",
1135    };
1136    request
1137        .payload
1138        .get(key)
1139        .or_else(|| {
1140            request
1141                .payload
1142                .get("approval_request")
1143                .and_then(|approval| approval.get("reviewers"))
1144        })
1145        .and_then(JsonValue::as_array)
1146        .map(|values| {
1147            values
1148                .iter()
1149                .filter_map(JsonValue::as_str)
1150                .map(str::to_string)
1151                .collect()
1152        })
1153        .unwrap_or_default()
1154}
1155
1156fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1157    json!({
1158        "request_id": progress.request_id.clone(),
1159        "approved": true,
1160        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1161        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1162        "reason": progress.reason,
1163        "signatures": progress.signatures,
1164    })
1165}
1166
1167fn approval_receipt_signature(
1168    request_id: &str,
1169    reviewer: &str,
1170    signed_at: &str,
1171    approved: bool,
1172    reason: Option<&str>,
1173) -> String {
1174    let material = format!(
1175        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1176        reason.unwrap_or("")
1177    );
1178    let hash = sha2::Sha256::digest(material.as_bytes());
1179    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1180    format!("sha256:{hex}")
1181}
1182
1183fn approval_record_from_waitpoint(
1184    record: &WaitpointRecord,
1185    builtin: &str,
1186) -> Result<VmValue, VmError> {
1187    record
1188        .value
1189        .as_ref()
1190        .map(crate::stdlib::json_to_vm_value)
1191        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1192}
1193
1194async fn approval_wait_error(
1195    log: &Arc<AnyEventLog>,
1196    kind: HitlRequestKind,
1197    request_id: &str,
1198) -> VmError {
1199    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1200        if record.status == WaitpointStatus::Cancelled
1201            && record.reason.as_deref() != Some("upstream_cancelled")
1202        {
1203            return approval_denied_error(
1204                request_id,
1205                HitlHostResponse {
1206                    request_id: request_id.to_string(),
1207                    answer: None,
1208                    approved: Some(false),
1209                    accepted: None,
1210                    reviewer: record.cancelled_by.clone(),
1211                    reason: record.reason.clone(),
1212                    metadata: record.metadata.clone(),
1213                    responded_at: record.cancelled_at,
1214                    signature: None,
1215                },
1216            );
1217        }
1218        if record.status == WaitpointStatus::Cancelled {
1219            return hitl_cancelled_error(
1220                request_id,
1221                kind,
1222                "",
1223                &[request_id.to_string()],
1224                record.reason,
1225            );
1226        }
1227    }
1228    hitl_cancelled_error(
1229        request_id,
1230        kind,
1231        "",
1232        &[request_id.to_string()],
1233        Some("upstream_cancelled".to_string()),
1234    )
1235}
1236
1237async fn append_timeout_once(
1238    log: &Arc<AnyEventLog>,
1239    kind: HitlRequestKind,
1240    request_id: &str,
1241    trace_id: &str,
1242) -> Result<(), VmError> {
1243    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1244        return Ok(());
1245    }
1246    append_timeout(log, kind, request_id, trace_id).await
1247}
1248
1249async fn hitl_event_exists(
1250    log: &Arc<AnyEventLog>,
1251    kind: HitlRequestKind,
1252    request_id: &str,
1253    event_kind: &str,
1254) -> Result<bool, VmError> {
1255    let topic = topic(kind)?;
1256    let events = log
1257        .read_range(&topic, None, usize::MAX)
1258        .await
1259        .map_err(log_error)?;
1260    Ok(events
1261        .into_iter()
1262        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1263}
1264
1265fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1266    match kind {
1267        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1268        _ => "hitl.approval_approved",
1269    }
1270}
1271
1272fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1273    match kind {
1274        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1275        _ => "hitl.approval_denied",
1276    }
1277}
1278
1279async fn append_request(
1280    log: &Arc<AnyEventLog>,
1281    request: &HitlRequestEnvelope,
1282) -> Result<(), VmError> {
1283    let topic = topic(request.kind)?;
1284    log.append(
1285        &topic,
1286        LogEvent::new(
1287            request.kind.request_event_kind(),
1288            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1289        )
1290        .with_headers(request_headers(request)),
1291    )
1292    .await
1293    .map(|_| ())
1294    .map_err(log_error)
1295}
1296
1297async fn append_named_event(
1298    log: &Arc<AnyEventLog>,
1299    kind: HitlRequestKind,
1300    event_kind: &str,
1301    request_id: &str,
1302    trace_id: &str,
1303    payload: JsonValue,
1304) -> Result<(), VmError> {
1305    let topic = topic(kind)?;
1306    let headers = headers_with_trace(request_id, trace_id);
1307    log.append(
1308        &topic,
1309        LogEvent::new(event_kind, payload).with_headers(headers),
1310    )
1311    .await
1312    .map(|_| ())
1313    .map_err(log_error)
1314}
1315
1316async fn append_timeout(
1317    log: &Arc<AnyEventLog>,
1318    kind: HitlRequestKind,
1319    request_id: &str,
1320    trace_id: &str,
1321) -> Result<(), VmError> {
1322    append_named_event(
1323        log,
1324        kind,
1325        "hitl.timeout",
1326        request_id,
1327        trace_id,
1328        serde_json::to_value(HitlTimeoutRecord {
1329            request_id: request_id.to_string(),
1330            kind,
1331            trace_id: trace_id.to_string(),
1332            timed_out_at: now_rfc3339(),
1333        })
1334        .map_err(|error| VmError::Runtime(error.to_string()))?,
1335    )
1336    .await
1337}
1338
1339async fn maybe_apply_mock_response(
1340    kind: HitlRequestKind,
1341    request_id: &str,
1342    request_payload: &JsonValue,
1343) -> Result<(), VmError> {
1344    let mut params = request_payload
1345        .as_object()
1346        .cloned()
1347        .unwrap_or_default()
1348        .into_iter()
1349        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1350        .collect::<BTreeMap<_, _>>();
1351    params.insert(
1352        "request_id".to_string(),
1353        VmValue::String(Rc::from(request_id.to_string())),
1354    );
1355    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1356        return Ok(());
1357    };
1358    let value = result?;
1359    let responses = match value {
1360        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1361        other => vec![other],
1362    };
1363    for response in responses {
1364        let response_dict = response.as_dict().ok_or_else(|| {
1365            VmError::Runtime(format!(
1366                "mocked HITL {} response must be a dict or list<dict>",
1367                kind.as_str()
1368            ))
1369        })?;
1370        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1371        append_hitl_response(None, hitl_response)
1372            .await
1373            .map_err(VmError::Runtime)?;
1374    }
1375    Ok(())
1376}
1377
1378fn parse_hitl_response_dict(
1379    request_id: &str,
1380    response_dict: &BTreeMap<String, VmValue>,
1381) -> Result<HitlHostResponse, VmError> {
1382    Ok(HitlHostResponse {
1383        request_id: request_id.to_string(),
1384        answer: response_dict
1385            .get("answer")
1386            .map(crate::llm::vm_value_to_json),
1387        approved: response_dict.get("approved").and_then(vm_bool),
1388        accepted: response_dict.get("accepted").and_then(vm_bool),
1389        reviewer: response_dict.get("reviewer").map(VmValue::display),
1390        reason: response_dict.get("reason").map(VmValue::display),
1391        metadata: response_dict
1392            .get("metadata")
1393            .map(crate::llm::vm_value_to_json),
1394        responded_at: response_dict.get("responded_at").map(VmValue::display),
1395        signature: response_dict.get("signature").map(VmValue::display),
1396    })
1397}
1398
1399fn maybe_notify_host(request: &HitlRequestEnvelope) {
1400    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1401        return;
1402    };
1403    bridge.notify(
1404        "harn.hitl.requested",
1405        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1406    );
1407}
1408
1409/// Emit a `HitlRequested` `AgentEvent` so transport adapters
1410/// (currently the A2A `A2aWorkerSink`) can flip a task into
1411/// `input-required` while the script is suspended on the waitpoint.
1412/// No-op when there is no current agent session — the bridge-level
1413/// `harn.hitl.requested` notification still fires for hosts that drive
1414/// HITL UX through the bridge.
1415fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1416    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1417        return;
1418    };
1419    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1420        session_id,
1421        request_id: request.request_id.clone(),
1422        kind: request.kind.as_str().to_string(),
1423        payload: request.payload.clone(),
1424    });
1425}
1426
1427/// Companion to `emit_hitl_requested`: notifies sinks that the
1428/// suspended waitpoint has resolved so a paused task can flip back
1429/// out of `input-required`. `outcome` is one of `"answered"`,
1430/// `"timeout"`, `"cancelled"`, or `"error"`.
1431fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1432    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1433        return;
1434    };
1435    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1436        session_id,
1437        request_id: request_id.to_string(),
1438        kind: kind.as_str().to_string(),
1439        outcome: outcome.to_string(),
1440    });
1441}
1442
1443/// Wrapper around `wait_for_request_waitpoint` that emits the
1444/// canonical `HitlResolved` `AgentEvent` regardless of which terminal
1445/// branch the waitpoint takes (response / timeout / cancellation /
1446/// error). Pair-emitted with `emit_hitl_requested` so transport
1447/// adapters can bracket the `input-required` pause cleanly without
1448/// each `*_impl` having to duplicate the emission at every match arm.
1449async fn wait_for_request_waitpoint_with_events(
1450    request_id: &str,
1451    kind: HitlRequestKind,
1452    timeout: Option<StdDuration>,
1453) -> Result<WaitpointOutcome, VmError> {
1454    let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1455    let label = match &outcome {
1456        Ok(WaitpointOutcome::Completed(_)) => "answered",
1457        Ok(WaitpointOutcome::Timeout) => "timeout",
1458        Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1459        Err(_) => "error",
1460    };
1461    emit_hitl_resolved(request_id, kind, label);
1462    outcome
1463}
1464
1465fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1466    let Some(value) = value else {
1467        return Ok(AskUserOptions {
1468            schema: None,
1469            timeout: Some(default_question_timeout()),
1470            default: None,
1471        });
1472    };
1473    let dict = value
1474        .as_dict()
1475        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1476    Ok(AskUserOptions {
1477        schema: dict
1478            .get("schema")
1479            .cloned()
1480            .filter(|value| !matches!(value, VmValue::Nil)),
1481        timeout: dict
1482            .get("timeout")
1483            .map(parse_duration_value)
1484            .transpose()?
1485            .or_else(|| Some(default_question_timeout())),
1486        default: dict
1487            .get("default")
1488            .cloned()
1489            .filter(|value| !matches!(value, VmValue::Nil)),
1490    })
1491}
1492
1493fn default_question_timeout() -> StdDuration {
1494    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1495}
1496
1497fn escalation_capability_policy() -> JsonValue {
1498    crate::orchestration::current_execution_policy()
1499        .and_then(|policy| serde_json::to_value(policy).ok())
1500        .unwrap_or(JsonValue::Null)
1501}
1502
1503fn parse_approval_options(
1504    value: Option<&VmValue>,
1505    builtin: &str,
1506) -> Result<ApprovalOptions, VmError> {
1507    let dict = match value {
1508        None => None,
1509        Some(VmValue::Dict(dict)) => Some(dict),
1510        Some(_) => {
1511            return Err(VmError::Runtime(format!(
1512                "{builtin}: options must be a dict"
1513            )))
1514        }
1515    };
1516    let quorum = dict
1517        .and_then(|dict| dict.get("quorum"))
1518        .and_then(VmValue::as_int)
1519        .unwrap_or(1);
1520    if quorum <= 0 {
1521        return Err(VmError::Runtime(format!(
1522            "{builtin}: quorum must be positive"
1523        )));
1524    }
1525    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1526    let capabilities_requested = optional_string_list(
1527        dict.and_then(|dict| dict.get("capabilities_requested")),
1528        builtin,
1529    )?;
1530    let evidence_refs = dict
1531        .and_then(|dict| dict.get("evidence_refs"))
1532        .map(|value| match value {
1533            VmValue::List(items) => Ok(items
1534                .iter()
1535                .map(crate::llm::vm_value_to_json)
1536                .collect::<Vec<_>>()),
1537            _ => Err(VmError::Runtime(format!(
1538                "{builtin}: evidence_refs must be a list"
1539            ))),
1540        })
1541        .transpose()?
1542        .unwrap_or_default();
1543    let deadline = dict
1544        .and_then(|dict| dict.get("deadline"))
1545        .map(parse_duration_value)
1546        .transpose()?
1547        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1548    Ok(ApprovalOptions {
1549        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1550        args: dict.and_then(|dict| dict.get("args")).cloned(),
1551        quorum: quorum as u32,
1552        reviewers,
1553        deadline,
1554        principal: dict
1555            .and_then(|dict| dict.get("principal"))
1556            .map(VmValue::display)
1557            .filter(|value| !value.is_empty()),
1558        evidence_refs,
1559        undo_metadata: dict
1560            .and_then(|dict| dict.get("undo_metadata"))
1561            .map(crate::llm::vm_value_to_json),
1562        capabilities_requested,
1563    })
1564}
1565
1566fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1567    args.get(idx)
1568        .map(VmValue::display)
1569        .filter(|value| !value.is_empty())
1570        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1571}
1572
1573fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1574    let value = args
1575        .get(idx)
1576        .and_then(VmValue::as_int)
1577        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1578    if value <= 0 {
1579        return Err(VmError::Runtime(format!(
1580            "{builtin}: expected a positive int at {idx}"
1581        )));
1582    }
1583    Ok(value)
1584}
1585
1586fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1587    let Some(value) = value else {
1588        return Ok(Vec::new());
1589    };
1590    match value {
1591        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1592        _ => Err(VmError::Runtime(format!(
1593            "{builtin}: expected list<string>"
1594        ))),
1595    }
1596}
1597
1598fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1599    match value {
1600        VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1601        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1602        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1603        _ => Err(VmError::Runtime(
1604            "expected a duration or millisecond count".to_string(),
1605        )),
1606    }
1607}
1608
1609fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1610    active_event_log()
1611        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1612}
1613
1614fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1615    if let Some(log) = active_event_log() {
1616        return Ok(log);
1617    }
1618    let Some(base_dir) = base_dir else {
1619        return Ok(install_memory_for_current_thread(
1620            HITL_EVENT_LOG_QUEUE_DEPTH,
1621        ));
1622    };
1623    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1624}
1625
1626fn current_dispatch_keys() -> Option<DispatchKeys> {
1627    let context = current_dispatch_context()?;
1628    let stable_base = context
1629        .replay_of_event_id
1630        .clone()
1631        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1632    let instance_key = format!(
1633        "{}::{}",
1634        context.trigger_event.id.0,
1635        context.replay_of_event_id.as_deref().unwrap_or("live")
1636    );
1637    Some(DispatchKeys {
1638        instance_key,
1639        stable_base,
1640        agent: context.agent_id,
1641        trace_id: context.trigger_event.trace_id.0,
1642    })
1643}
1644
1645fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1646    if let Some(keys) = dispatch_keys {
1647        let seq = REQUEST_SEQUENCE.with(|slot| {
1648            let mut state = slot.borrow_mut();
1649            if state.instance_key != keys.instance_key {
1650                state.instance_key = keys.instance_key.clone();
1651                state.next_seq = 0;
1652            }
1653            state.next_seq += 1;
1654            state.next_seq
1655        });
1656        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1657    }
1658    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1659}
1660
1661fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1662    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1663    if let Some(run_id) = request.run_id.as_ref() {
1664        headers.insert("run_id".to_string(), run_id.clone());
1665    }
1666    headers
1667}
1668
1669fn response_headers(request_id: &str) -> BTreeMap<String, String> {
1670    let mut headers = BTreeMap::new();
1671    headers.insert("request_id".to_string(), request_id.to_string());
1672    headers
1673}
1674
1675fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1676    let mut headers = response_headers(request_id);
1677    headers.insert("trace_id".to_string(), trace_id.to_string());
1678    headers
1679}
1680
1681fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1682    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1683}
1684
1685fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1686    event
1687        .headers
1688        .get("request_id")
1689        .is_some_and(|value| value == request_id)
1690        || event
1691            .payload
1692            .get("request_id")
1693            .and_then(JsonValue::as_str)
1694            .is_some_and(|value| value == request_id)
1695}
1696
1697fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1698    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1699        "name": "ApprovalDeniedError",
1700        "category": "generic",
1701        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1702        "request_id": request_id,
1703        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1704        "reason": response.reason,
1705    })))
1706}
1707
1708fn hitl_cancelled_error(
1709    request_id: &str,
1710    kind: HitlRequestKind,
1711    wait_id: &str,
1712    waitpoint_ids: &[String],
1713    reason: Option<String>,
1714) -> VmError {
1715    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1716    let message = reason
1717        .clone()
1718        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1719    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1720        "name": "HumanCancelledError",
1721        "category": ErrorCategory::Cancelled.as_str(),
1722        "message": message,
1723        "request_id": request_id,
1724        "kind": kind.as_str(),
1725        "wait_id": wait_id,
1726        "waitpoint_ids": waitpoint_ids,
1727        "reason": reason,
1728    })))
1729}
1730
1731fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1732    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1733    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1734        "name": "HumanTimeoutError",
1735        "category": ErrorCategory::Timeout.as_str(),
1736        "message": format!("{} timed out", kind.as_str()),
1737        "request_id": request_id,
1738        "kind": kind.as_str(),
1739    })))
1740}
1741
1742fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1743    match default {
1744        VmValue::Int(_) => match value {
1745            VmValue::Int(_) => value.clone(),
1746            VmValue::Float(number) => VmValue::Int(*number as i64),
1747            VmValue::String(text) => text
1748                .parse::<i64>()
1749                .map(VmValue::Int)
1750                .unwrap_or_else(|_| default.clone()),
1751            _ => default.clone(),
1752        },
1753        VmValue::Float(_) => match value {
1754            VmValue::Float(_) => value.clone(),
1755            VmValue::Int(number) => VmValue::Float(*number as f64),
1756            VmValue::String(text) => text
1757                .parse::<f64>()
1758                .map(VmValue::Float)
1759                .unwrap_or_else(|_| default.clone()),
1760            _ => default.clone(),
1761        },
1762        VmValue::Bool(_) => match value {
1763            VmValue::Bool(_) => value.clone(),
1764            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1765            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1766            _ => default.clone(),
1767        },
1768        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1769        VmValue::Duration(_) => match value {
1770            VmValue::Duration(_) => value.clone(),
1771            VmValue::Int(ms) => VmValue::Duration(*ms),
1772            _ => default.clone(),
1773        },
1774        VmValue::Nil => value.clone(),
1775        _ => {
1776            if value.type_name() == default.type_name() {
1777                value.clone()
1778            } else {
1779                default.clone()
1780            }
1781        }
1782    }
1783}
1784
1785fn log_error(error: impl std::fmt::Display) -> VmError {
1786    VmError::Runtime(error.to_string())
1787}
1788
1789fn now_rfc3339() -> String {
1790    format_rfc3339(OffsetDateTime::now_utc())
1791}
1792
1793fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1794    timestamp
1795        .format(&Rfc3339)
1796        .unwrap_or_else(|_| timestamp.to_string())
1797}
1798
1799fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1800    time::Duration::try_from(duration)
1801        .ok()
1802        .map(|duration| format_rfc3339(requested_at + duration))
1803}
1804
1805fn new_trace_id() -> String {
1806    format!("trace_{}", Uuid::now_v7())
1807}
1808
1809fn vm_bool(value: &VmValue) -> Option<bool> {
1810    match value {
1811        VmValue::Bool(flag) => Some(*flag),
1812        _ => None,
1813    }
1814}
1815
1816fn vm_string(value: &VmValue) -> Option<&str> {
1817    match value {
1818        VmValue::String(text) => Some(text.as_ref()),
1819        _ => None,
1820    }
1821}
1822
1823fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1824    match value {
1825        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1826        _ => None,
1827    }
1828}
1829
1830#[cfg(test)]
1831mod tests {
1832    use super::{
1833        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1834    };
1835    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1836    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1837
1838    async fn execute_hitl_script(
1839        base_dir: &std::path::Path,
1840        source: &str,
1841    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1842        reset_thread_local_state();
1843        let log = install_default_for_base_dir(base_dir).expect("install event log");
1844        let chunk = compile_source(source).expect("compile source");
1845        let mut vm = Vm::new();
1846        register_vm_stdlib(&mut vm);
1847        vm.set_source_dir(base_dir);
1848        vm.execute(&chunk).await?;
1849        let output = vm.output().trim_end().to_string();
1850        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1851        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1852        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1853        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1854        Ok((
1855            output,
1856            question_events,
1857            approval_events,
1858            dual_control_events,
1859            escalation_events,
1860        ))
1861    }
1862
1863    async fn event_kinds(
1864        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1865        topic: &str,
1866    ) -> Vec<String> {
1867        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1868            .await
1869            .expect("read topic")
1870            .into_iter()
1871            .map(|(_, event)| event.kind)
1872            .collect()
1873    }
1874
1875    async fn event_payloads(
1876        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1877        topic: &str,
1878    ) -> Vec<serde_json::Value> {
1879        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1880            .await
1881            .expect("read topic")
1882            .into_iter()
1883            .map(|(_, event)| event.payload)
1884            .collect()
1885    }
1886
1887    #[tokio::test(flavor = "current_thread")]
1888    async fn ask_user_coerces_to_default_type_and_logs_events() {
1889        tokio::task::LocalSet::new()
1890            .run_until(async {
1891                let dir = tempfile::tempdir().expect("tempdir");
1892                let source = r#"
1893pipeline test(task) {
1894  host_mock("hitl", "question", {answer: "9"})
1895  let answer: int = ask_user("Pick a number", {default: 0})
1896  __io_println(answer)
1897}
1898"#;
1899                let (
1900                    output,
1901                    question_events,
1902                    approval_events,
1903                    dual_control_events,
1904                    escalation_events,
1905                ) = execute_hitl_script(dir.path(), source)
1906                    .await
1907                    .expect("script succeeds");
1908                assert_eq!(output, "9");
1909                assert_eq!(
1910                    question_events,
1911                    vec![
1912                        "hitl.question_asked".to_string(),
1913                        "hitl.response_received".to_string()
1914                    ]
1915                );
1916                assert!(approval_events.is_empty());
1917                assert!(dual_control_events.is_empty());
1918                assert!(escalation_events.is_empty());
1919            })
1920            .await;
1921    }
1922
1923    #[tokio::test(flavor = "current_thread")]
1924    async fn request_approval_waits_for_quorum_and_emits_a_record() {
1925        tokio::task::LocalSet::new()
1926            .run_until(async {
1927                let dir = tempfile::tempdir().expect("tempdir");
1928                let source = r#"
1929pipeline test(task) {
1930  host_mock("hitl", "approval", [
1931    {approved: true, reviewer: "alice", reason: "ok"},
1932    {approved: true, reviewer: "bob", reason: "ship it"},
1933  ])
1934  let record = request_approval(
1935    "deploy production",
1936    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1937  )
1938  __io_println(record.approved)
1939  __io_println(len(record.reviewers))
1940  __io_println(record.reviewers[0])
1941  __io_println(record.reviewers[1])
1942}
1943"#;
1944                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1945                    .await
1946                    .expect("script succeeds");
1947                assert_eq!(
1948                    approval_events,
1949                    vec![
1950                        "hitl.approval_requested".to_string(),
1951                        "hitl.response_received".to_string(),
1952                        "hitl.response_received".to_string(),
1953                        "hitl.approval_approved".to_string(),
1954                    ]
1955                );
1956            })
1957            .await;
1958    }
1959
1960    #[tokio::test(flavor = "current_thread")]
1961    async fn request_approval_emits_canonical_approval_request_payload() {
1962        tokio::task::LocalSet::new()
1963            .run_until(async {
1964                reset_thread_local_state();
1965                let dir = tempfile::tempdir().expect("tempdir");
1966                let log = install_default_for_base_dir(dir.path()).expect("install event log");
1967                let source = r#"
1968pipeline test(task) {
1969  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
1970  request_approval("deploy production", {
1971    args: {environment: "prod"},
1972    quorum: 1,
1973    reviewers: ["alice"],
1974    evidence_refs: [{kind: "run", uri: "run_123"}],
1975    undo_metadata: {strategy: "rollback"},
1976    capabilities_requested: ["deploy.production"],
1977  })
1978}
1979"#;
1980                let chunk = compile_source(source).expect("compile source");
1981                let mut vm = Vm::new();
1982                register_vm_stdlib(&mut vm);
1983                vm.set_source_dir(dir.path());
1984                vm.execute(&chunk).await.expect("script succeeds");
1985
1986                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
1987                let request_payload = &payloads[0]["payload"];
1988                let approval_request = &request_payload["approval_request"];
1989                assert_eq!(approval_request["id"], request_payload["id"]);
1990                assert_eq!(approval_request["action"], "deploy production");
1991                assert_eq!(approval_request["args"]["environment"], "prod");
1992                assert_eq!(approval_request["approvers_required"], 1);
1993                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
1994                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
1995                assert_eq!(
1996                    approval_request["capabilities_requested"][0],
1997                    "deploy.production"
1998                );
1999                assert!(approval_request["requested_at"].as_str().is_some());
2000                assert!(approval_request["deadline"].as_str().is_some());
2001            })
2002            .await;
2003    }
2004
2005    #[tokio::test(flavor = "current_thread")]
2006    async fn request_approval_surfaces_denials_as_typed_errors() {
2007        tokio::task::LocalSet::new()
2008            .run_until(async {
2009                let dir = tempfile::tempdir().expect("tempdir");
2010                let source = r#"
2011pipeline test(task) {
2012  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
2013  let denied = try {
2014    request_approval("drop table", {reviewers: ["alice"]})
2015  }
2016  __io_println(is_err(denied))
2017  __io_println(unwrap_err(denied).name)
2018  __io_println(unwrap_err(denied).reason)
2019}
2020"#;
2021                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2022                    .await
2023                    .expect("script succeeds");
2024                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
2025                assert_eq!(
2026                    approval_events,
2027                    vec![
2028                        "hitl.approval_requested".to_string(),
2029                        "hitl.response_received".to_string(),
2030                        "hitl.approval_denied".to_string(),
2031                    ]
2032                );
2033            })
2034            .await;
2035    }
2036
2037    #[tokio::test(flavor = "current_thread")]
2038    async fn dual_control_executes_action_after_quorum() {
2039        tokio::task::LocalSet::new()
2040            .run_until(async {
2041                let dir = tempfile::tempdir().expect("tempdir");
2042                let source = r#"
2043pipeline test(task) {
2044  host_mock("hitl", "dual_control", [
2045    {approved: true, reviewer: "alice"},
2046    {approved: true, reviewer: "bob"},
2047  ])
2048  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2049  __io_println(result)
2050}
2051"#;
2052                let (output, _, _, dual_control_events, _) =
2053                    execute_hitl_script(dir.path(), source)
2054                        .await
2055                        .expect("script succeeds");
2056                assert_eq!(output, "launched");
2057                assert_eq!(
2058                    dual_control_events,
2059                    vec![
2060                        "hitl.dual_control_requested".to_string(),
2061                        "hitl.response_received".to_string(),
2062                        "hitl.response_received".to_string(),
2063                        "hitl.dual_control_approved".to_string(),
2064                        "hitl.dual_control_executed".to_string(),
2065                    ]
2066                );
2067            })
2068            .await;
2069    }
2070
2071    #[tokio::test(flavor = "current_thread")]
2072    async fn escalate_to_waits_for_acceptance_event() {
2073        tokio::task::LocalSet::new()
2074            .run_until(async {
2075                let dir = tempfile::tempdir().expect("tempdir");
2076                let source = r#"
2077pipeline test(task) {
2078  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2079  let handle = escalate_to("admin", "need override")
2080  __io_println(handle.status)
2081  __io_println(handle.reviewer)
2082}
2083"#;
2084                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2085                    .await
2086                    .expect("script succeeds");
2087                assert_eq!(output, "accepted\nlead");
2088                assert_eq!(
2089                    escalation_events,
2090                    vec![
2091                        "hitl.escalation_issued".to_string(),
2092                        "hitl.escalation_accepted".to_string(),
2093                    ]
2094                );
2095            })
2096            .await;
2097    }
2098
2099    /// `harn-serve` adapters (A2A `input-required`, ACP `hitl_request`)
2100    /// rely on the canonical `AgentEvent::HitlRequested` /
2101    /// `AgentEvent::HitlResolved` pair to bracket every HITL pause.
2102    /// Pin the contract here so future HITL primitives keep emitting
2103    /// the event around their waitpoint blocks.
2104    #[tokio::test(flavor = "current_thread")]
2105    async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2106        use std::sync::Mutex as StdMutex;
2107
2108        tokio::task::LocalSet::new()
2109            .run_until(async {
2110                let dir = tempfile::tempdir().expect("tempdir");
2111                let session_id = "hitl-session".to_string();
2112                let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2113                    std::sync::Arc::new(StdMutex::new(Vec::new()));
2114
2115                struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2116                impl crate::agent_events::AgentEventSink for CaptureSink {
2117                    fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2118                        self.0.lock().expect("captured").push(event.clone());
2119                    }
2120                }
2121
2122                // Inline the script setup rather than using the
2123                // `execute_hitl_script` helper: that helper calls
2124                // `reset_thread_local_state` (which wipes the session
2125                // store), so any session pushed before it would be
2126                // gone by the time `ask_user` runs.
2127                crate::reset_thread_local_state();
2128                crate::event_log::install_default_for_base_dir(dir.path())
2129                    .expect("install event log");
2130
2131                crate::agent_events::reset_all_sinks();
2132                let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2133                    std::sync::Arc::new(CaptureSink(captured.clone()));
2134                crate::agent_events::register_sink(session_id.clone(), sink);
2135                crate::agent_sessions::open_or_create(Some(session_id.clone()));
2136                let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2137
2138                let source = r#"
2139pipeline test(task) {
2140  host_mock("hitl", "question", {answer: "ok"})
2141  let answer: string = ask_user("Are you sure?", {default: "no"})
2142  __io_println(answer)
2143}
2144"#;
2145                let chunk = crate::compile_source(source).expect("compile source");
2146                let mut vm = Vm::new();
2147                register_vm_stdlib(&mut vm);
2148                vm.set_source_dir(dir.path());
2149                vm.execute(&chunk).await.expect("script runs");
2150                assert_eq!(vm.output().trim_end(), "ok");
2151
2152                let events = captured.lock().expect("captured");
2153                let mut iter = events.iter().filter(|event| {
2154                    matches!(
2155                        event,
2156                        crate::agent_events::AgentEvent::HitlRequested { .. }
2157                            | crate::agent_events::AgentEvent::HitlResolved { .. }
2158                    )
2159                });
2160                let requested = iter.next().expect("HitlRequested emitted");
2161                let resolved = iter.next().expect("HitlResolved emitted");
2162                assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2163
2164                let crate::agent_events::AgentEvent::HitlRequested {
2165                    session_id: req_session,
2166                    request_id: req_id,
2167                    kind: req_kind,
2168                    payload,
2169                } = requested
2170                else {
2171                    panic!("expected HitlRequested, got: {requested:?}");
2172                };
2173                assert_eq!(req_session, &session_id);
2174                assert_eq!(req_kind, "question");
2175                assert!(req_id.starts_with("hitl_question_"));
2176                assert_eq!(payload["prompt"], "Are you sure?");
2177
2178                let crate::agent_events::AgentEvent::HitlResolved {
2179                    request_id: res_id,
2180                    kind: res_kind,
2181                    outcome,
2182                    ..
2183                } = resolved
2184                else {
2185                    panic!("expected HitlResolved, got: {resolved:?}");
2186                };
2187                assert_eq!(res_id, req_id);
2188                assert_eq!(res_kind, "question");
2189                assert_eq!(outcome, "answered");
2190
2191                drop(_guard);
2192                crate::agent_events::reset_all_sinks();
2193            })
2194            .await;
2195    }
2196}