Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24    WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
/// Queue depth associated with the HITL event log.
/// NOTE(review): not referenced in this part of the file — presumably passed
/// when the event log is installed; confirm at the install site.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Approval deadline in milliseconds (24 hours). Used as the fixed deadline by
/// `append_approval_request_on` and by the `dual_control` builtin.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// Question timeout in milliseconds (24 hours).
/// NOTE(review): not referenced in this part of the file — presumably the
/// fallback when `ask_user` is called without a timeout; confirm.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic name for question requests (`HitlRequestKind::Question`).
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic name for approval requests (`HitlRequestKind::Approval`).
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic name for dual-control requests (`HitlRequestKind::DualControl`).
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic name for escalation requests (`HitlRequestKind::Escalation`).
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
thread_local! {
    // Per-thread request sequence state. It is reset, taken, and restored as a
    // whole through `reset_hitl_state`, `take_hitl_state`, and `restore_hitl_state`.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
42
/// Snapshot of the thread-local request sequence.
///
/// The default value is an empty `instance_key` and a `next_seq` of zero.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    // NOTE(review): presumably identifies the dispatch instance the counter
    // belongs to — confirm against `next_request_id`.
    pub(crate) instance_key: String,
    // NOTE(review): presumably the next sequence number to hand out — confirm
    // against `next_request_id`.
    pub(crate) next_seq: u64,
}
48
/// The kinds of human-in-the-loop request this module can issue.
///
/// Serialized in snake_case (e.g. `DualControl` becomes `"dual_control"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// A question put to a user (the `ask_user` builtin).
    Question,
    /// An approval request (the `request_approval` builtin).
    Approval,
    /// An approval gate in front of a closure (the `dual_control` builtin).
    DualControl,
    /// An escalation to a role (the `escalate_to` builtin).
    Escalation,
}
57
58impl HitlRequestKind {
59    pub(crate) fn as_str(self) -> &'static str {
60        match self {
61            Self::Question => "question",
62            Self::Approval => "approval",
63            Self::DualControl => "dual_control",
64            Self::Escalation => "escalation",
65        }
66    }
67
68    fn topic(self) -> &'static str {
69        match self {
70            Self::Question => HITL_QUESTIONS_TOPIC,
71            Self::Approval => HITL_APPROVALS_TOPIC,
72            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73            Self::Escalation => HITL_ESCALATIONS_TOPIC,
74        }
75    }
76
77    fn request_event_kind(self) -> &'static str {
78        match self {
79            Self::Question => "hitl.question_asked",
80            Self::Approval => "hitl.approval_requested",
81            Self::DualControl => "hitl.dual_control_requested",
82            Self::Escalation => "hitl.escalation_issued",
83        }
84    }
85
86    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87        if request_id.starts_with("hitl_question_") {
88            Some(Self::Question)
89        } else if request_id.starts_with("hitl_approval_") {
90            Some(Self::Approval)
91        } else if request_id.starts_with("hitl_dual_control_") {
92            Some(Self::DualControl)
93        } else if request_id.starts_with("hitl_escalation_") {
94            Some(Self::Escalation)
95        } else {
96            None
97        }
98    }
99}
100
/// A host-supplied response to a pending HITL request, as accepted by
/// `append_hitl_response`.
///
/// Optional fields that are `None` are omitted from the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its `hitl_<kind>_` prefix selects the
    /// request kind.
    pub request_id: String,
    /// Answer value; for question requests it becomes the waitpoint's value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    // NOTE(review): presumably the approve/deny decision for approval and
    // dual-control requests — the handling is outside the visible code; confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation decision; an escalation waitpoint is only completed when this
    /// is `Some(true)`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Identity of the responder, forwarded when the waitpoint is completed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason accompanying the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra metadata forwarded when a question waitpoint is completed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills in the current time when
    /// this is missing.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    // NOTE(review): presumably a reviewer signature used for approvals — not
    // read in the visible code; confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
121
/// Common wrapper for every HITL request built in this module: shared identity
/// fields plus a kind-specific JSON payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Unique request id; also used as the id of the request's waitpoint.
    request_id: String,
    /// Which kind of request this is.
    kind: HitlRequestKind,
    /// Requesting agent; deserializes to an empty string when absent.
    #[serde(default)]
    agent: String,
    /// Trace id the request belongs to.
    trace_id: String,
    /// Run id of the current mutation session, when one is set; omitted from
    /// the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Timestamp string recorded when the request was built.
    requested_at: String,
    /// Kind-specific request body.
    payload: JsonValue,
}
134
/// Serializable record describing a HITL request that timed out.
// NOTE(review): presumably the payload written by `append_timeout_once` — that
// function is outside the visible code; confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the request that timed out.
    kind: HitlRequestKind,
    /// Trace id of the request.
    trace_id: String,
    /// Timestamp string for when the timeout was recorded.
    timed_out_at: String,
}
142
/// Structured description of an action awaiting approval.
///
/// Embedded under the `approval_request` key of approval and dual-control
/// request payloads built in this module.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ApprovalRequest {
    /// Request id.
    pub id: String,
    /// Name of the action awaiting approval.
    pub action: String,
    /// Arguments or detail of the action; defaults to JSON null when absent.
    #[serde(default)]
    pub args: JsonValue,
    /// Principal on whose behalf the approval is requested.
    pub principal: String,
    /// Timestamp string for when the request was made.
    pub requested_at: String,
    /// Deadline timestamp, if any; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required; `ApprovalRequest::new` sets this to 1.
    pub approvers_required: u32,
    /// Supporting evidence references; defaults to empty when absent.
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    /// Undo metadata; defaults to JSON null when absent.
    #[serde(default)]
    pub undo_metadata: JsonValue,
    /// Capabilities the action asks for; defaults to empty when absent.
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
161
162impl ApprovalRequest {
163    pub fn new(
164        id: impl Into<String>,
165        action: impl Into<String>,
166        args: JsonValue,
167        principal: impl Into<String>,
168        requested_at: impl Into<String>,
169    ) -> Self {
170        Self {
171            id: id.into(),
172            action: action.into(),
173            args,
174            principal: principal.into(),
175            requested_at: requested_at.into(),
176            deadline: None,
177            approvers_required: 1,
178            evidence_refs: Vec::new(),
179            undo_metadata: JsonValue::Null,
180            capabilities_requested: Vec::new(),
181        }
182    }
183}
184
185pub(crate) fn approval_request_for_host_permission(
186    id: impl Into<String>,
187    action: impl Into<String>,
188    args: JsonValue,
189    principal: impl Into<String>,
190    evidence_refs: Vec<JsonValue>,
191    undo_metadata: JsonValue,
192    capabilities_requested: Vec<String>,
193) -> ApprovalRequest {
194    let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
195    request.evidence_refs = evidence_refs;
196    request.undo_metadata = undo_metadata;
197    request.capabilities_requested = capabilities_requested;
198    request
199}
200
/// Identity values describing the current dispatch, as returned by
/// `current_dispatch_keys`.
#[derive(Clone, Debug)]
struct DispatchKeys {
    // NOTE(review): presumably keys the per-instance request sequence — confirm
    // against `next_request_id`.
    instance_key: String,
    // NOTE(review): presumably the stable prefix used when deriving request
    // ids — confirm against `next_request_id`.
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id reused for requests issued during the dispatch.
    trace_id: String,
}
208
/// Parsed options for the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is validated against, when given.
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` is forwarded to the wait as-is.
    timeout: Option<StdDuration>,
    /// Value returned on timeout; also guides answer coercion when no schema is
    /// given.
    default: Option<VmValue>,
}
215
/// Parsed options for the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Free-form detail; reported in the payload and used as the approval args
    /// when `args` is absent.
    detail: Option<VmValue>,
    /// Explicit approval args; takes precedence over `detail`.
    args: Option<VmValue>,
    /// Number of approvers required.
    quorum: u32,
    /// Reviewers reported in the request payload.
    reviewers: Vec<String>,
    /// How long to wait for the approval; also used to compute the deadline.
    deadline: StdDuration,
    /// Principal override; the agent name is used when absent.
    principal: Option<String>,
    /// Evidence references copied into the approval request.
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata override; the serialized current mutation session is used
    /// when absent.
    undo_metadata: Option<JsonValue>,
    /// Capabilities copied into the approval request.
    capabilities_requested: Vec<String>,
}
228
/// Accumulated approval state for a request.
// NOTE(review): not used in the visible part of the file — presumably built
// while tallying reviewer responses toward a quorum; confirm.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Id of the request being approved.
    request_id: String,
    /// Distinct reviewers recorded so far.
    reviewers: BTreeSet<String>,
    /// Signatures recorded so far.
    signatures: Vec<ApprovalSignature>,
    /// Optional reason accompanying the approval.
    reason: Option<String>,
    /// Timestamp string for when approval was reached, if it has been.
    approved_at: Option<String>,
}
237
/// One reviewer's signature on an approval; serializable.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Reviewer who signed.
    reviewer: String,
    /// Timestamp string for when the signature was made.
    signed_at: String,
    // NOTE(review): how this value is derived is outside the visible code.
    signature: String,
}
244
/// Outcome of evaluating the responses recorded for an approval request.
// NOTE(review): not used in the visible part of the file; the variant notes
// below are read off the names and payload types — confirm.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// No decision yet.
    Pending,
    /// Approved, carrying the accumulated progress.
    Approved(ApprovalProgress),
    /// Denied, carrying the response that denied it.
    Denied(HitlHostResponse),
}
251
/// Result of waiting on a request's waitpoint, with timeout and cancellation
/// expressed as values rather than errors.
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its record.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancelled {
        /// Id of the cancelled wait.
        wait_id: String,
        /// Ids of the waitpoints covered by the cancelled wait.
        waitpoint_ids: Vec<String>,
        /// Cancellation reason, when one was given.
        reason: Option<String>,
    },
}
262
263pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
264    vm.register_async_builtin("ask_user", |args| {
265        Box::pin(async move { ask_user_impl(&args).await })
266    });
267
268    vm.register_async_builtin("request_approval", |args| {
269        Box::pin(async move { request_approval_impl(&args).await })
270    });
271
272    vm.register_async_builtin("dual_control", |args| {
273        Box::pin(async move { dual_control_impl(&args).await })
274    });
275
276    vm.register_async_builtin("escalate_to", |args| {
277        Box::pin(async move { escalate_to_impl(&args).await })
278    });
279}
280
281pub(crate) fn reset_hitl_state() {
282    REQUEST_SEQUENCE.with(|slot| {
283        *slot.borrow_mut() = RequestSequenceState::default();
284    });
285}
286
287pub(crate) fn take_hitl_state() -> RequestSequenceState {
288    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
289}
290
291pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
292    REQUEST_SEQUENCE.with(|slot| {
293        *slot.borrow_mut() = state;
294    });
295}
296
297pub async fn append_hitl_response(
298    base_dir: Option<&Path>,
299    mut response: HitlHostResponse,
300) -> Result<u64, String> {
301    let kind = HitlRequestKind::from_request_id(&response.request_id)
302        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
303    if response.responded_at.is_none() {
304        response.responded_at = Some(now_rfc3339());
305    }
306    let log = ensure_hitl_event_log_for(base_dir)?;
307    let headers = response_headers(&response.request_id);
308    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
309    let event_id = log
310        .append(
311            &topic,
312            LogEvent::new(
313                match kind {
314                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
315                    _ => "hitl.response_received",
316                },
317                serde_json::to_value(&response).map_err(|error| error.to_string())?,
318            )
319            .with_headers(headers),
320        )
321        .await
322        .map_err(|error| error.to_string())?;
323    finalize_hitl_response(&log, kind, &response).await?;
324    Ok(event_id)
325}
326
/// Creates an approval request on `log` and returns its request id without
/// waiting for a response.
///
/// The request always asks for a single approver and uses the fixed
/// `HITL_APPROVAL_TIMEOUT_MS` deadline. `detail` is used both as the approval
/// args and as the payload's `detail`; `agent` doubles as the principal.
///
/// Unlike the `request_approval` builtin, this function does not call
/// `emit_hitl_requested`, does not apply mock responses, and does not wait on
/// the waitpoint.
///
/// # Errors
///
/// Returns a `VmError` when the approval request cannot be serialized, or when
/// creating the waitpoint or appending the request fails.
pub async fn append_approval_request_on(
    log: &Arc<AnyEventLog>,
    agent: impl Into<String>,
    trace_id: impl Into<String>,
    action: impl Into<String>,
    detail: JsonValue,
    reviewers: Vec<String>,
) -> Result<String, VmError> {
    // The request id still comes from the current dispatch keys (if any), even
    // though agent and trace id are supplied by the caller.
    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
    let trace_id = trace_id.into();
    let agent = agent.into();
    // Capture the time once so `requested_at` and `deadline` share one instant.
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.into(),
        detail.clone(),
        agent.clone(),
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(
        requested_at_time,
        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
    );
    approval_request.approvers_required = 1;
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: None,
        requested_at: requested_at.clone(),
        // The payload carries the structured request under `approval_request`
        // and repeats its fields at the top level next to `detail`, `quorum`,
        // `reviewers`, and `deadline_ms`.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": approval_request.action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": detail,
            "quorum": 1,
            "reviewers": reviewers,
            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
        }),
    };
    // The waitpoint is created before the request event is appended.
    create_request_waitpoint(log, &request).await?;
    append_request(log, &request).await?;
    maybe_notify_host(&request);
    Ok(request_id)
}
384
/// Implements the `ask_user` builtin: records a question request, waits for a
/// response, and returns the answer.
///
/// `args[0]` is the prompt string; `args[1]` is optional and is parsed by
/// `parse_ask_user_options` into a schema, a timeout, and a default.
///
/// When the waitpoint completes, the answer (`Nil` if the waitpoint carries no
/// value) is validated with `schema_expect_value` if a schema was given,
/// otherwise coerced with `coerce_like_default` if a default was given,
/// otherwise returned unchanged.
///
/// # Errors
///
/// Returns an error when argument parsing fails, when any of the event-log
/// steps fail, when the wait times out and no default was given, or when the
/// wait is cancelled.
async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let prompt = required_string_arg(args, 0, "ask_user")?;
    let options = parse_ask_user_options(args.get(1))?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
    // Reuse the dispatch trace id when dispatch keys are available; otherwise
    // mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Question,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "prompt": prompt,
            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
        }),
    };
    // Order matters here: waitpoint first, then the logged request, then the
    // notifications, then any mock response, and only then the wait.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Question,
        options.timeout,
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            let answer = record
                .value
                .as_ref()
                .map(crate::stdlib::json_to_vm_value)
                .unwrap_or(VmValue::Nil);
            // A schema takes precedence over the default when both are given.
            if let Some(schema) = options.schema.as_ref() {
                return schema_expect_value(&answer, schema, true);
            }
            if let Some(default) = options.default.as_ref() {
                return Ok(coerce_like_default(&answer, default));
            }
            Ok(answer)
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
            // The default is returned as-is on timeout; it is not validated
            // against the schema.
            if let Some(default) = options.default {
                return Ok(default);
            }
            Err(timeout_error(&request_id, HitlRequestKind::Question))
        }
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Question,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
459
/// Implements the `request_approval` builtin: records an approval request,
/// waits up to the configured deadline, and returns the approval record.
///
/// `args[0]` is the action name; `args[1]` is optional and is parsed by
/// `parse_approval_options`.
///
/// # Errors
///
/// Returns an error when argument parsing or serialization fails, when any of
/// the event-log steps fail, when the wait times out (after a timeout record
/// is appended), or when the wait is cancelled (the error then comes from
/// `approval_wait_error`).
async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    // Reuse the dispatch trace id when dispatch keys are available; otherwise
    // mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    // Capture the time once so `requested_at` and `deadline` share one instant.
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    // The principal falls back to the agent name when not given explicitly.
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    // Explicit `args` win over `detail`; JSON null when neither is given.
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    // Undo metadata falls back to the serialized current mutation session, and
    // to JSON null when there is no session or it cannot be serialized.
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        // The payload carries the structured request under `approval_request`
        // and repeats its fields at the top level next to `detail`, `quorum`,
        // `reviewers`, and `deadline_ms`.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    // Order matters here: waitpoint first, then the logged request, then the
    // notifications, then any mock response, and only then the wait.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        // The cancellation details are ignored; the error is derived from the
        // log instead.
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
554
555pub(crate) async fn request_approval_for_side_effect(
556    action: &str,
557    detail: JsonValue,
558    principal: String,
559    reviewers: Vec<String>,
560    capabilities_requested: Vec<String>,
561) -> Result<VmValue, VmError> {
562    let mut options = BTreeMap::new();
563    options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
564    options.insert(
565        "detail".to_string(),
566        crate::stdlib::json_to_vm_value(&detail),
567    );
568    options.insert(
569        "principal".to_string(),
570        VmValue::String(Rc::from(principal)),
571    );
572    options.insert(
573        "reviewers".to_string(),
574        VmValue::List(Rc::new(
575            reviewers
576                .into_iter()
577                .map(|reviewer| VmValue::String(Rc::from(reviewer)))
578                .collect(),
579        )),
580    );
581    options.insert(
582        "capabilities_requested".to_string(),
583        VmValue::List(Rc::new(
584            capabilities_requested
585                .into_iter()
586                .map(|capability| VmValue::String(Rc::from(capability)))
587                .collect(),
588        )),
589    );
590    let args = vec![
591        VmValue::String(Rc::from(action.to_string())),
592        VmValue::Dict(Rc::new(options)),
593    ];
594    request_approval_impl(&args).await
595}
596
597async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
598    let n = required_positive_int_arg(args, 0, "dual_control")?;
599    let m = required_positive_int_arg(args, 1, "dual_control")?;
600    if n > m {
601        return Err(VmError::Runtime(
602            "dual_control: n must be less than or equal to m".to_string(),
603        ));
604    }
605    let action = args
606        .get(2)
607        .and_then(|value| match value {
608            VmValue::Closure(closure) => Some(closure.clone()),
609            _ => None,
610        })
611        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
612    let approvers = optional_string_list(args.get(3), "dual_control")?;
613    if !approvers.is_empty() && approvers.len() < m as usize {
614        return Err(VmError::Runtime(format!(
615            "dual_control: expected at least {m} approvers, got {}",
616            approvers.len()
617        )));
618    }
619
620    let keys = current_dispatch_keys();
621    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
622    let trace_id = keys
623        .as_ref()
624        .map(|keys| keys.trace_id.clone())
625        .unwrap_or_else(new_trace_id);
626    let action_name = if action.func.name.is_empty() {
627        "anonymous".to_string()
628    } else {
629        action.func.name.clone()
630    };
631    let agent = keys
632        .as_ref()
633        .map(|keys| keys.agent.clone())
634        .unwrap_or_default();
635    let requested_at_time = OffsetDateTime::now_utc();
636    let requested_at = format_rfc3339(requested_at_time);
637    let mut approval_request = ApprovalRequest::new(
638        request_id.clone(),
639        action_name.clone(),
640        json!({
641            "n": n,
642            "m": m,
643            "approvers": approvers.clone(),
644        }),
645        agent.clone(),
646        requested_at.clone(),
647    );
648    approval_request.deadline = deadline_after(
649        requested_at_time,
650        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
651    );
652    approval_request.approvers_required = n as u32;
653    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
654        .and_then(|session| serde_json::to_value(session).ok())
655        .unwrap_or(JsonValue::Null);
656    let approval_request_json = serde_json::to_value(&approval_request)
657        .map_err(|error| VmError::Runtime(error.to_string()))?;
658    let log = ensure_hitl_event_log();
659    let request = HitlRequestEnvelope {
660        request_id: request_id.clone(),
661        kind: HitlRequestKind::DualControl,
662        agent,
663        trace_id: trace_id.clone(),
664        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
665        requested_at: requested_at.clone(),
666        payload: json!({
667            "approval_request": approval_request_json,
668            "id": approval_request.id,
669            "args": approval_request.args,
670            "principal": approval_request.principal,
671            "requested_at": requested_at,
672            "deadline": approval_request.deadline,
673            "approvers_required": approval_request.approvers_required,
674            "evidence_refs": approval_request.evidence_refs,
675            "undo_metadata": approval_request.undo_metadata,
676            "capabilities_requested": approval_request.capabilities_requested,
677            "n": n,
678            "m": m,
679            "action": action_name,
680            "approvers": approvers,
681            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
682        }),
683    };
684    create_request_waitpoint(&log, &request).await?;
685    append_request(&log, &request).await?;
686    maybe_notify_host(&request);
687    emit_hitl_requested(&request);
688    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
689
690    match wait_for_request_waitpoint_with_events(
691        &request_id,
692        HitlRequestKind::DualControl,
693        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
694    )
695    .await?
696    {
697        WaitpointOutcome::Completed(record) => {
698            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
699            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
700                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
701            })?;
702            let result = vm.call_closure_pub(&action, &[]).await?;
703
704            append_named_event(
705                &log,
706                HitlRequestKind::DualControl,
707                "hitl.dual_control_executed",
708                &request_id,
709                &trace_id,
710                json!({
711                    "request_id": request_id,
712                    "result": crate::llm::vm_value_to_json(&result),
713                }),
714            )
715            .await?;
716
717            Ok(result)
718        }
719        WaitpointOutcome::Timeout => {
720            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
721            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
722        }
723        WaitpointOutcome::Cancelled { .. } => {
724            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
725        }
726    }
727}
728
/// Implements the `escalate_to` builtin: records an escalation request and
/// waits, with no timeout of its own, for it to be resolved.
///
/// `args[0]` is the role to escalate to and `args[1]` is the reason. On
/// completion a dict is returned with `request_id`, `role`, `reason`,
/// `trace_id`, `status`, `accepted_at`, and `reviewer`.
///
/// # Errors
///
/// Returns an error when argument parsing fails, when any of the event-log
/// steps fail, when the wait reports a timeout, or when the wait is cancelled.
async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let role = required_string_arg(args, 0, "escalate_to")?;
    let reason = required_string_arg(args, 1, "escalate_to")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
    // Reuse the dispatch trace id when dispatch keys are available; otherwise
    // mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Escalation,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "role": role,
            "reason": reason,
            "capability_policy": escalation_capability_policy(),
        }),
    };
    // Order matters here: waitpoint first, then the logged request, then the
    // notifications, then any mock response, and only then the wait.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
        .await?
    {
        WaitpointOutcome::Completed(record) => {
            let accepted_at = record.completed_at.clone();
            let reviewer = record.completed_by.clone();
            // A completed waitpoint without an explicit boolean `accepted`
            // field is treated as accepted.
            let accepted = record
                .value
                .as_ref()
                .and_then(|value| value.get("accepted"))
                .and_then(JsonValue::as_bool)
                .unwrap_or(true);
            // NOTE(review): a completed waitpoint whose value says
            // `accepted: false` is reported as "pending", not as a rejection —
            // confirm this is the intended status.
            Ok(crate::stdlib::json_to_vm_value(&json!({
                "request_id": request_id,
                "role": role,
                "reason": reason,
                "trace_id": trace_id,
                "status": if accepted { "accepted" } else { "pending" },
                "accepted_at": accepted_at,
                "reviewer": reviewer,
            })))
        }
        // NOTE(review): unlike the other HITL builtins, no timeout record is
        // appended before returning the timeout error — confirm intentional.
        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Escalation,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
797
798async fn create_request_waitpoint(
799    log: &Arc<AnyEventLog>,
800    request: &HitlRequestEnvelope,
801) -> Result<(), VmError> {
802    create_waitpoint_on(
803        log,
804        Some(request.request_id.clone()),
805        Some(json!({
806            "kind": request.kind.as_str(),
807            "agent": request.agent.clone(),
808            "trace_id": request.trace_id.clone(),
809            "requested_at": request.requested_at.clone(),
810            "payload": request.payload.clone(),
811        })),
812    )
813    .await?;
814    Ok(())
815}
816
817async fn wait_for_request_waitpoint(
818    request_id: &str,
819    timeout: Option<StdDuration>,
820) -> Result<WaitpointOutcome, VmError> {
821    match wait_on_waitpoints(
822        vec![request_id.to_string()],
823        WaitpointWaitOptions { timeout },
824    )
825    .await
826    {
827        Ok(records) => Ok(WaitpointOutcome::Completed(
828            records
829                .into_iter()
830                .next()
831                .expect("single waitpoint wait result"),
832        )),
833        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
834        Err(WaitpointWaitFailure::Cancelled {
835            wait_id,
836            waitpoint_ids,
837            reason,
838        }) => Ok(WaitpointOutcome::Cancelled {
839            wait_id,
840            waitpoint_ids,
841            reason,
842        }),
843        Err(WaitpointWaitFailure::Vm(error)) => {
844            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
845                return Ok(outcome);
846            }
847            Err(error)
848        }
849    }
850}
851
852fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
853    let VmError::Thrown(VmValue::Dict(dict)) = error else {
854        return None;
855    };
856    let name = dict.get("name").and_then(vm_string)?;
857    match name {
858        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
859        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
860            wait_id: dict
861                .get("wait_id")
862                .and_then(vm_string)
863                .unwrap_or_default()
864                .to_string(),
865            waitpoint_ids: dict
866                .get("waitpoint_ids")
867                .and_then(vm_string_list)
868                .unwrap_or_default(),
869            reason: dict
870                .get("reason")
871                .and_then(vm_string)
872                .map(ToString::to_string),
873        }),
874        _ => None,
875    }
876}
877
878async fn finalize_hitl_response(
879    log: &Arc<AnyEventLog>,
880    kind: HitlRequestKind,
881    response: &HitlHostResponse,
882) -> Result<(), String> {
883    match kind {
884        HitlRequestKind::Question => {
885            if waitpoint_is_terminal(log, &response.request_id).await? {
886                return Ok(());
887            }
888            complete_waitpoint_on(
889                log,
890                &response.request_id,
891                response.answer.clone(),
892                response.reviewer.clone(),
893                response.reason.clone(),
894                response.metadata.clone(),
895            )
896            .await
897            .map(|_| ())
898            .map_err(|error| error.to_string())
899        }
900        HitlRequestKind::Escalation => {
901            if !response.accepted.unwrap_or(false)
902                || waitpoint_is_terminal(log, &response.request_id).await?
903            {
904                return Ok(());
905            }
906            complete_waitpoint_on(
907                log,
908                &response.request_id,
909                Some(json!({
910                    "accepted": true,
911                    "reviewer": response.reviewer,
912                    "reason": response.reason,
913                    "responded_at": response.responded_at,
914                })),
915                response.reviewer.clone(),
916                response.reason.clone(),
917                response.metadata.clone(),
918            )
919            .await
920            .map(|_| ())
921            .map_err(|error| error.to_string())
922        }
923        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
924            if waitpoint_is_terminal(log, &response.request_id).await? {
925                return Ok(());
926            }
927            let request = load_request_envelope(log, kind, &response.request_id)
928                .await
929                .map_err(|error| error.to_string())?;
930            match resolve_approval_state(log, kind, &request)
931                .await
932                .map_err(|error| error.to_string())?
933            {
934                ApprovalResolution::Pending => Ok(()),
935                ApprovalResolution::Approved(progress) => {
936                    let record = approval_record_json(&progress);
937                    append_named_event(
938                        log,
939                        kind,
940                        approved_event_kind(kind),
941                        &request.request_id,
942                        &request.trace_id,
943                        json!({
944                            "request_id": request.request_id.clone(),
945                            "record": record.clone(),
946                        }),
947                    )
948                    .await
949                    .map_err(|error| error.to_string())?;
950                    complete_waitpoint_on(
951                        log,
952                        &request.request_id,
953                        Some(record),
954                        response.reviewer.clone(),
955                        progress.reason.clone(),
956                        response.metadata.clone(),
957                    )
958                    .await
959                    .map(|_| ())
960                    .map_err(|error| error.to_string())
961                }
962                ApprovalResolution::Denied(denied) => {
963                    append_named_event(
964                        log,
965                        kind,
966                        denied_event_kind(kind),
967                        &request.request_id,
968                        &request.trace_id,
969                        json!({
970                            "request_id": request.request_id.clone(),
971                            "reviewer": denied.reviewer.clone(),
972                            "reason": denied.reason.clone(),
973                        }),
974                    )
975                    .await
976                    .map_err(|error| error.to_string())?;
977                    cancel_waitpoint_on(
978                        log,
979                        &request.request_id,
980                        denied.reviewer.clone(),
981                        denied.reason.clone(),
982                        denied.metadata.clone(),
983                    )
984                    .await
985                    .map(|_| ())
986                    .map_err(|error| error.to_string())
987                }
988            }
989        }
990    }
991}
992
993async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
994    Ok(inspect_waitpoint_on(log, request_id)
995        .await
996        .map_err(|error| error.to_string())?
997        .is_some_and(|record| record.status != WaitpointStatus::Open))
998}
999
1000async fn load_request_envelope(
1001    log: &Arc<AnyEventLog>,
1002    kind: HitlRequestKind,
1003    request_id: &str,
1004) -> Result<HitlRequestEnvelope, VmError> {
1005    let topic = topic(kind)?;
1006    let events = log
1007        .read_range(&topic, None, usize::MAX)
1008        .await
1009        .map_err(log_error)?;
1010    events
1011        .into_iter()
1012        .filter(|(_, event)| event.kind == kind.request_event_kind())
1013        .find_map(|(_, event)| {
1014            if !event_matches_request(&event, request_id) {
1015                return None;
1016            }
1017            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1018        })
1019        .ok_or_else(|| {
1020            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1021        })
1022}
1023
1024async fn resolve_approval_state(
1025    log: &Arc<AnyEventLog>,
1026    kind: HitlRequestKind,
1027    request: &HitlRequestEnvelope,
1028) -> Result<ApprovalResolution, VmError> {
1029    let quorum = approval_quorum_from_request(kind, request)?;
1030    let allowed_reviewers = approval_reviewers_from_request(kind, request)
1031        .into_iter()
1032        .collect::<BTreeSet<_>>();
1033    let mut progress = ApprovalProgress {
1034        request_id: request.request_id.clone(),
1035        reviewers: BTreeSet::new(),
1036        signatures: Vec::new(),
1037        reason: None,
1038        approved_at: None,
1039    };
1040    let topic = topic(kind)?;
1041    let events = log
1042        .read_range(&topic, None, usize::MAX)
1043        .await
1044        .map_err(log_error)?;
1045    for (_, event) in events {
1046        if !event_matches_request(&event, &request.request_id)
1047            || event.kind != "hitl.response_received"
1048        {
1049            continue;
1050        }
1051        let response: HitlHostResponse = serde_json::from_value(event.payload)
1052            .map_err(|error| VmError::Runtime(error.to_string()))?;
1053        if let Some(reviewer) = response.reviewer.as_deref() {
1054            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
1055                continue;
1056            }
1057            if progress.reviewers.contains(reviewer) {
1058                continue;
1059            }
1060        }
1061        if response.approved.unwrap_or(false) {
1062            if let Some(reviewer) = response.reviewer.clone() {
1063                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1064                progress.reviewers.insert(reviewer.clone());
1065                progress.signatures.push(ApprovalSignature {
1066                    reviewer: reviewer.clone(),
1067                    signed_at: signed_at.clone(),
1068                    signature: response.signature.clone().unwrap_or_else(|| {
1069                        approval_receipt_signature(
1070                            &request.request_id,
1071                            &reviewer,
1072                            &signed_at,
1073                            true,
1074                            response.reason.as_deref(),
1075                        )
1076                    }),
1077                });
1078            }
1079            progress.reason = response.reason.clone();
1080            progress.approved_at = response.responded_at.clone();
1081            if progress.reviewers.len() as u32 >= quorum {
1082                return Ok(ApprovalResolution::Approved(progress));
1083            }
1084            continue;
1085        }
1086        return Ok(ApprovalResolution::Denied(response));
1087    }
1088    Ok(ApprovalResolution::Pending)
1089}
1090
1091fn approval_quorum_from_request(
1092    kind: HitlRequestKind,
1093    request: &HitlRequestEnvelope,
1094) -> Result<u32, VmError> {
1095    let key = match kind {
1096        HitlRequestKind::DualControl => "n",
1097        _ => "quorum",
1098    };
1099    let quorum = request
1100        .payload
1101        .get(key)
1102        .or_else(|| request.payload.get("approvers_required"))
1103        .or_else(|| {
1104            request
1105                .payload
1106                .get("approval_request")
1107                .and_then(|approval| approval.get("approvers_required"))
1108        })
1109        .and_then(JsonValue::as_u64)
1110        .unwrap_or(1);
1111    u32::try_from(quorum).map_err(|_| {
1112        VmError::Runtime(format!(
1113            "invalid quorum in HITL request '{}'",
1114            request.request_id
1115        ))
1116    })
1117}
1118
1119fn approval_reviewers_from_request(
1120    kind: HitlRequestKind,
1121    request: &HitlRequestEnvelope,
1122) -> Vec<String> {
1123    let key = match kind {
1124        HitlRequestKind::DualControl => "approvers",
1125        _ => "reviewers",
1126    };
1127    request
1128        .payload
1129        .get(key)
1130        .or_else(|| {
1131            request
1132                .payload
1133                .get("approval_request")
1134                .and_then(|approval| approval.get("reviewers"))
1135        })
1136        .and_then(JsonValue::as_array)
1137        .map(|values| {
1138            values
1139                .iter()
1140                .filter_map(JsonValue::as_str)
1141                .map(str::to_string)
1142                .collect()
1143        })
1144        .unwrap_or_default()
1145}
1146
1147fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1148    json!({
1149        "request_id": progress.request_id.clone(),
1150        "approved": true,
1151        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1152        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1153        "reason": progress.reason,
1154        "signatures": progress.signatures,
1155    })
1156}
1157
1158fn approval_receipt_signature(
1159    request_id: &str,
1160    reviewer: &str,
1161    signed_at: &str,
1162    approved: bool,
1163    reason: Option<&str>,
1164) -> String {
1165    let material = format!(
1166        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1167        reason.unwrap_or("")
1168    );
1169    let hash = sha2::Sha256::digest(material.as_bytes());
1170    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1171    format!("sha256:{hex}")
1172}
1173
1174fn approval_record_from_waitpoint(
1175    record: &WaitpointRecord,
1176    builtin: &str,
1177) -> Result<VmValue, VmError> {
1178    record
1179        .value
1180        .as_ref()
1181        .map(crate::stdlib::json_to_vm_value)
1182        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1183}
1184
1185async fn approval_wait_error(
1186    log: &Arc<AnyEventLog>,
1187    kind: HitlRequestKind,
1188    request_id: &str,
1189) -> VmError {
1190    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1191        if record.status == WaitpointStatus::Cancelled
1192            && record.reason.as_deref() != Some("upstream_cancelled")
1193        {
1194            return approval_denied_error(
1195                request_id,
1196                HitlHostResponse {
1197                    request_id: request_id.to_string(),
1198                    answer: None,
1199                    approved: Some(false),
1200                    accepted: None,
1201                    reviewer: record.cancelled_by.clone(),
1202                    reason: record.reason.clone(),
1203                    metadata: record.metadata.clone(),
1204                    responded_at: record.cancelled_at.clone(),
1205                    signature: None,
1206                },
1207            );
1208        }
1209        if record.status == WaitpointStatus::Cancelled {
1210            return hitl_cancelled_error(
1211                request_id,
1212                kind,
1213                "",
1214                &[request_id.to_string()],
1215                record.reason.clone(),
1216            );
1217        }
1218    }
1219    hitl_cancelled_error(
1220        request_id,
1221        kind,
1222        "",
1223        &[request_id.to_string()],
1224        Some("upstream_cancelled".to_string()),
1225    )
1226}
1227
1228async fn append_timeout_once(
1229    log: &Arc<AnyEventLog>,
1230    kind: HitlRequestKind,
1231    request_id: &str,
1232    trace_id: &str,
1233) -> Result<(), VmError> {
1234    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1235        return Ok(());
1236    }
1237    append_timeout(log, kind, request_id, trace_id).await
1238}
1239
1240async fn hitl_event_exists(
1241    log: &Arc<AnyEventLog>,
1242    kind: HitlRequestKind,
1243    request_id: &str,
1244    event_kind: &str,
1245) -> Result<bool, VmError> {
1246    let topic = topic(kind)?;
1247    let events = log
1248        .read_range(&topic, None, usize::MAX)
1249        .await
1250        .map_err(log_error)?;
1251    Ok(events
1252        .into_iter()
1253        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1254}
1255
1256fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1257    match kind {
1258        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1259        _ => "hitl.approval_approved",
1260    }
1261}
1262
1263fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1264    match kind {
1265        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1266        _ => "hitl.approval_denied",
1267    }
1268}
1269
1270async fn append_request(
1271    log: &Arc<AnyEventLog>,
1272    request: &HitlRequestEnvelope,
1273) -> Result<(), VmError> {
1274    let topic = topic(request.kind)?;
1275    log.append(
1276        &topic,
1277        LogEvent::new(
1278            request.kind.request_event_kind(),
1279            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1280        )
1281        .with_headers(request_headers(request)),
1282    )
1283    .await
1284    .map(|_| ())
1285    .map_err(log_error)
1286}
1287
1288async fn append_named_event(
1289    log: &Arc<AnyEventLog>,
1290    kind: HitlRequestKind,
1291    event_kind: &str,
1292    request_id: &str,
1293    trace_id: &str,
1294    payload: JsonValue,
1295) -> Result<(), VmError> {
1296    let topic = topic(kind)?;
1297    let headers = headers_with_trace(request_id, trace_id);
1298    log.append(
1299        &topic,
1300        LogEvent::new(event_kind, payload).with_headers(headers),
1301    )
1302    .await
1303    .map(|_| ())
1304    .map_err(log_error)
1305}
1306
1307async fn append_timeout(
1308    log: &Arc<AnyEventLog>,
1309    kind: HitlRequestKind,
1310    request_id: &str,
1311    trace_id: &str,
1312) -> Result<(), VmError> {
1313    append_named_event(
1314        log,
1315        kind,
1316        "hitl.timeout",
1317        request_id,
1318        trace_id,
1319        serde_json::to_value(HitlTimeoutRecord {
1320            request_id: request_id.to_string(),
1321            kind,
1322            trace_id: trace_id.to_string(),
1323            timed_out_at: now_rfc3339(),
1324        })
1325        .map_err(|error| VmError::Runtime(error.to_string()))?,
1326    )
1327    .await
1328}
1329
1330async fn maybe_apply_mock_response(
1331    kind: HitlRequestKind,
1332    request_id: &str,
1333    request_payload: &JsonValue,
1334) -> Result<(), VmError> {
1335    let mut params = request_payload
1336        .as_object()
1337        .cloned()
1338        .unwrap_or_default()
1339        .into_iter()
1340        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1341        .collect::<BTreeMap<_, _>>();
1342    params.insert(
1343        "request_id".to_string(),
1344        VmValue::String(Rc::from(request_id.to_string())),
1345    );
1346    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1347        return Ok(());
1348    };
1349    let value = result?;
1350    let responses = match value {
1351        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1352        other => vec![other],
1353    };
1354    for response in responses {
1355        let response_dict = response.as_dict().ok_or_else(|| {
1356            VmError::Runtime(format!(
1357                "mocked HITL {} response must be a dict or list<dict>",
1358                kind.as_str()
1359            ))
1360        })?;
1361        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1362        append_hitl_response(None, hitl_response)
1363            .await
1364            .map_err(VmError::Runtime)?;
1365    }
1366    Ok(())
1367}
1368
1369fn parse_hitl_response_dict(
1370    request_id: &str,
1371    response_dict: &BTreeMap<String, VmValue>,
1372) -> Result<HitlHostResponse, VmError> {
1373    Ok(HitlHostResponse {
1374        request_id: request_id.to_string(),
1375        answer: response_dict
1376            .get("answer")
1377            .map(crate::llm::vm_value_to_json),
1378        approved: response_dict.get("approved").and_then(vm_bool),
1379        accepted: response_dict.get("accepted").and_then(vm_bool),
1380        reviewer: response_dict.get("reviewer").map(VmValue::display),
1381        reason: response_dict.get("reason").map(VmValue::display),
1382        metadata: response_dict
1383            .get("metadata")
1384            .map(crate::llm::vm_value_to_json),
1385        responded_at: response_dict.get("responded_at").map(VmValue::display),
1386        signature: response_dict.get("signature").map(VmValue::display),
1387    })
1388}
1389
1390fn maybe_notify_host(request: &HitlRequestEnvelope) {
1391    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1392        return;
1393    };
1394    bridge.notify(
1395        "harn.hitl.requested",
1396        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1397    );
1398}
1399
1400/// Emit a `HitlRequested` `AgentEvent` so transport adapters
1401/// (currently the A2A `A2aWorkerSink`) can flip a task into
1402/// `input-required` while the script is suspended on the waitpoint.
1403/// No-op when there is no current agent session — the bridge-level
1404/// `harn.hitl.requested` notification still fires for hosts that drive
1405/// HITL UX through the bridge.
1406fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1407    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1408        return;
1409    };
1410    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1411        session_id,
1412        request_id: request.request_id.clone(),
1413        kind: request.kind.as_str().to_string(),
1414        payload: request.payload.clone(),
1415    });
1416}
1417
1418/// Companion to `emit_hitl_requested`: notifies sinks that the
1419/// suspended waitpoint has resolved so a paused task can flip back
1420/// out of `input-required`. `outcome` is one of `"answered"`,
1421/// `"timeout"`, `"cancelled"`, or `"error"`.
1422fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1423    let Some(session_id) = crate::agent_sessions::current_session_id() else {
1424        return;
1425    };
1426    crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1427        session_id,
1428        request_id: request_id.to_string(),
1429        kind: kind.as_str().to_string(),
1430        outcome: outcome.to_string(),
1431    });
1432}
1433
1434/// Wrapper around `wait_for_request_waitpoint` that emits the
1435/// canonical `HitlResolved` `AgentEvent` regardless of which terminal
1436/// branch the waitpoint takes (response / timeout / cancellation /
1437/// error). Pair-emitted with `emit_hitl_requested` so transport
1438/// adapters can bracket the `input-required` pause cleanly without
1439/// each `*_impl` having to duplicate the emission at every match arm.
1440async fn wait_for_request_waitpoint_with_events(
1441    request_id: &str,
1442    kind: HitlRequestKind,
1443    timeout: Option<StdDuration>,
1444) -> Result<WaitpointOutcome, VmError> {
1445    let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1446    let label = match &outcome {
1447        Ok(WaitpointOutcome::Completed(_)) => "answered",
1448        Ok(WaitpointOutcome::Timeout) => "timeout",
1449        Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1450        Err(_) => "error",
1451    };
1452    emit_hitl_resolved(request_id, kind, label);
1453    outcome
1454}
1455
1456fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1457    let Some(value) = value else {
1458        return Ok(AskUserOptions {
1459            schema: None,
1460            timeout: Some(default_question_timeout()),
1461            default: None,
1462        });
1463    };
1464    let dict = value
1465        .as_dict()
1466        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1467    Ok(AskUserOptions {
1468        schema: dict
1469            .get("schema")
1470            .cloned()
1471            .filter(|value| !matches!(value, VmValue::Nil)),
1472        timeout: dict
1473            .get("timeout")
1474            .map(parse_duration_value)
1475            .transpose()?
1476            .or_else(|| Some(default_question_timeout())),
1477        default: dict
1478            .get("default")
1479            .cloned()
1480            .filter(|value| !matches!(value, VmValue::Nil)),
1481    })
1482}
1483
1484fn default_question_timeout() -> StdDuration {
1485    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1486}
1487
1488fn escalation_capability_policy() -> JsonValue {
1489    crate::orchestration::current_execution_policy()
1490        .and_then(|policy| serde_json::to_value(policy).ok())
1491        .unwrap_or(JsonValue::Null)
1492}
1493
1494fn parse_approval_options(
1495    value: Option<&VmValue>,
1496    builtin: &str,
1497) -> Result<ApprovalOptions, VmError> {
1498    let dict = match value {
1499        None => None,
1500        Some(VmValue::Dict(dict)) => Some(dict),
1501        Some(_) => {
1502            return Err(VmError::Runtime(format!(
1503                "{builtin}: options must be a dict"
1504            )))
1505        }
1506    };
1507    let quorum = dict
1508        .and_then(|dict| dict.get("quorum"))
1509        .and_then(VmValue::as_int)
1510        .unwrap_or(1);
1511    if quorum <= 0 {
1512        return Err(VmError::Runtime(format!(
1513            "{builtin}: quorum must be positive"
1514        )));
1515    }
1516    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1517    let capabilities_requested = optional_string_list(
1518        dict.and_then(|dict| dict.get("capabilities_requested")),
1519        builtin,
1520    )?;
1521    let evidence_refs = dict
1522        .and_then(|dict| dict.get("evidence_refs"))
1523        .map(|value| match value {
1524            VmValue::List(items) => Ok(items
1525                .iter()
1526                .map(crate::llm::vm_value_to_json)
1527                .collect::<Vec<_>>()),
1528            _ => Err(VmError::Runtime(format!(
1529                "{builtin}: evidence_refs must be a list"
1530            ))),
1531        })
1532        .transpose()?
1533        .unwrap_or_default();
1534    let deadline = dict
1535        .and_then(|dict| dict.get("deadline"))
1536        .map(parse_duration_value)
1537        .transpose()?
1538        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1539    Ok(ApprovalOptions {
1540        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1541        args: dict.and_then(|dict| dict.get("args")).cloned(),
1542        quorum: quorum as u32,
1543        reviewers,
1544        deadline,
1545        principal: dict
1546            .and_then(|dict| dict.get("principal"))
1547            .map(VmValue::display)
1548            .filter(|value| !value.is_empty()),
1549        evidence_refs,
1550        undo_metadata: dict
1551            .and_then(|dict| dict.get("undo_metadata"))
1552            .map(crate::llm::vm_value_to_json),
1553        capabilities_requested,
1554    })
1555}
1556
1557fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1558    args.get(idx)
1559        .map(VmValue::display)
1560        .filter(|value| !value.is_empty())
1561        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1562}
1563
1564fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1565    let value = args
1566        .get(idx)
1567        .and_then(VmValue::as_int)
1568        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1569    if value <= 0 {
1570        return Err(VmError::Runtime(format!(
1571            "{builtin}: expected a positive int at {idx}"
1572        )));
1573    }
1574    Ok(value)
1575}
1576
1577fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1578    let Some(value) = value else {
1579        return Ok(Vec::new());
1580    };
1581    match value {
1582        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1583        _ => Err(VmError::Runtime(format!(
1584            "{builtin}: expected list<string>"
1585        ))),
1586    }
1587}
1588
1589fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1590    match value {
1591        VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1592        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1593        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1594        _ => Err(VmError::Runtime(
1595            "expected a duration or millisecond count".to_string(),
1596        )),
1597    }
1598}
1599
1600fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1601    active_event_log()
1602        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1603}
1604
1605fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1606    if let Some(log) = active_event_log() {
1607        return Ok(log);
1608    }
1609    let Some(base_dir) = base_dir else {
1610        return Ok(install_memory_for_current_thread(
1611            HITL_EVENT_LOG_QUEUE_DEPTH,
1612        ));
1613    };
1614    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1615}
1616
1617fn current_dispatch_keys() -> Option<DispatchKeys> {
1618    let context = current_dispatch_context()?;
1619    let stable_base = context
1620        .replay_of_event_id
1621        .clone()
1622        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1623    let instance_key = format!(
1624        "{}::{}",
1625        context.trigger_event.id.0,
1626        context.replay_of_event_id.as_deref().unwrap_or("live")
1627    );
1628    Some(DispatchKeys {
1629        instance_key,
1630        stable_base,
1631        agent: context.agent_id,
1632        trace_id: context.trigger_event.trace_id.0,
1633    })
1634}
1635
1636fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1637    if let Some(keys) = dispatch_keys {
1638        let seq = REQUEST_SEQUENCE.with(|slot| {
1639            let mut state = slot.borrow_mut();
1640            if state.instance_key != keys.instance_key {
1641                state.instance_key = keys.instance_key.clone();
1642                state.next_seq = 0;
1643            }
1644            state.next_seq += 1;
1645            state.next_seq
1646        });
1647        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1648    }
1649    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1650}
1651
1652fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1653    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1654    if let Some(run_id) = request.run_id.as_ref() {
1655        headers.insert("run_id".to_string(), run_id.clone());
1656    }
1657    headers
1658}
1659
/// Headers for a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1665
1666fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1667    let mut headers = response_headers(request_id);
1668    headers.insert("trace_id".to_string(), trace_id.to_string());
1669    headers
1670}
1671
1672fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1673    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1674}
1675
1676fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1677    event
1678        .headers
1679        .get("request_id")
1680        .is_some_and(|value| value == request_id)
1681        || event
1682            .payload
1683            .get("request_id")
1684            .and_then(JsonValue::as_str)
1685            .is_some_and(|value| value == request_id)
1686}
1687
1688fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1689    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1690        "name": "ApprovalDeniedError",
1691        "category": "generic",
1692        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1693        "request_id": request_id,
1694        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1695        "reason": response.reason,
1696    })))
1697}
1698
1699fn hitl_cancelled_error(
1700    request_id: &str,
1701    kind: HitlRequestKind,
1702    wait_id: &str,
1703    waitpoint_ids: &[String],
1704    reason: Option<String>,
1705) -> VmError {
1706    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1707    let message = reason
1708        .clone()
1709        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1710    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1711        "name": "HumanCancelledError",
1712        "category": ErrorCategory::Cancelled.as_str(),
1713        "message": message,
1714        "request_id": request_id,
1715        "kind": kind.as_str(),
1716        "wait_id": wait_id,
1717        "waitpoint_ids": waitpoint_ids,
1718        "reason": reason,
1719    })))
1720}
1721
1722fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1723    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1724    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1725        "name": "HumanTimeoutError",
1726        "category": ErrorCategory::Timeout.as_str(),
1727        "message": format!("{} timed out", kind.as_str()),
1728        "request_id": request_id,
1729        "kind": kind.as_str(),
1730    })))
1731}
1732
1733fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1734    match default {
1735        VmValue::Int(_) => match value {
1736            VmValue::Int(_) => value.clone(),
1737            VmValue::Float(number) => VmValue::Int(*number as i64),
1738            VmValue::String(text) => text
1739                .parse::<i64>()
1740                .map(VmValue::Int)
1741                .unwrap_or_else(|_| default.clone()),
1742            _ => default.clone(),
1743        },
1744        VmValue::Float(_) => match value {
1745            VmValue::Float(_) => value.clone(),
1746            VmValue::Int(number) => VmValue::Float(*number as f64),
1747            VmValue::String(text) => text
1748                .parse::<f64>()
1749                .map(VmValue::Float)
1750                .unwrap_or_else(|_| default.clone()),
1751            _ => default.clone(),
1752        },
1753        VmValue::Bool(_) => match value {
1754            VmValue::Bool(_) => value.clone(),
1755            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1756            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1757            _ => default.clone(),
1758        },
1759        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1760        VmValue::Duration(_) => match value {
1761            VmValue::Duration(_) => value.clone(),
1762            VmValue::Int(ms) => VmValue::Duration(*ms),
1763            _ => default.clone(),
1764        },
1765        VmValue::Nil => value.clone(),
1766        _ => {
1767            if value.type_name() == default.type_name() {
1768                value.clone()
1769            } else {
1770                default.clone()
1771            }
1772        }
1773    }
1774}
1775
1776fn log_error(error: impl std::fmt::Display) -> VmError {
1777    VmError::Runtime(error.to_string())
1778}
1779
1780fn now_rfc3339() -> String {
1781    format_rfc3339(OffsetDateTime::now_utc())
1782}
1783
1784fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1785    timestamp
1786        .format(&Rfc3339)
1787        .unwrap_or_else(|_| timestamp.to_string())
1788}
1789
1790fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1791    time::Duration::try_from(duration)
1792        .ok()
1793        .map(|duration| format_rfc3339(requested_at + duration))
1794}
1795
1796fn new_trace_id() -> String {
1797    format!("trace_{}", Uuid::now_v7())
1798}
1799
1800fn vm_bool(value: &VmValue) -> Option<bool> {
1801    match value {
1802        VmValue::Bool(flag) => Some(*flag),
1803        _ => None,
1804    }
1805}
1806
1807fn vm_string(value: &VmValue) -> Option<&str> {
1808    match value {
1809        VmValue::String(text) => Some(text.as_ref()),
1810        _ => None,
1811    }
1812}
1813
1814fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1815    match value {
1816        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1817        _ => None,
1818    }
1819}
1820
// End-to-end tests for the HITL builtins. Each test compiles a small script,
// runs it on a fresh VM with the stdlib registered, and inspects the script
// output and/or the events recorded on the HITL event-log topics.
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Runs `source` on a fresh VM rooted at `base_dir` and collects the
    /// results.
    ///
    /// Resets thread-local state, installs the default event log for
    /// `base_dir`, then compiles and executes the script. On success returns,
    /// in order: the script output with trailing whitespace trimmed, and the
    /// event kinds recorded on the questions, approvals, dual-control and
    /// escalations topics. A script execution error is propagated to the
    /// caller; setup failures panic.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
        Ok((
            output,
            question_events,
            approval_events,
            dual_control_events,
            escalation_events,
        ))
    }

    /// Reads `topic` from `log` (no starting cursor, limit `usize::MAX`) and
    /// returns the `kind` of every event read, in the order returned.
    ///
    /// Panics if the topic name is invalid or the read fails.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.kind)
            .collect()
    }

    /// Reads `topic` from `log` (no starting cursor, limit `usize::MAX`) and
    /// returns the `payload` of every event read, in the order returned.
    ///
    /// Panics if the topic name is invalid or the read fails.
    async fn event_payloads(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<serde_json::Value> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.payload)
            .collect()
    }

    /// A mocked string answer `"9"` is coerced to the int type of the
    /// `default` option, and only the questions topic records events.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  println(answer)
}
"#;
                let (
                    output,
                    question_events,
                    approval_events,
                    dual_control_events,
                    escalation_events,
                ) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "9");
                assert_eq!(
                    question_events,
                    vec![
                        "hitl.question_asked".to_string(),
                        "hitl.response_received".to_string()
                    ]
                );
                assert!(approval_events.is_empty());
                assert!(dual_control_events.is_empty());
                assert!(escalation_events.is_empty());
            })
            .await;
    }

    /// With `quorum: 2` and two mocked approvals, the approvals topic records
    /// the request, both responses, and then the final approval event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  println(record.approved)
  println(len(record.reviewers))
  println(record.reviewers[0])
  println(record.reviewers[1])
}
"#;
                // Only the approvals-topic event sequence is asserted here;
                // the printed output is not checked.
                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_approved".to_string(),
                    ]
                );
            })
            .await;
    }

    /// Checks the `approval_request` object nested in the first event on the
    /// approvals topic: id, action, args, required approvers, evidence refs,
    /// undo metadata, requested capabilities, and the presence of string
    /// `requested_at` / `deadline` fields.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_emits_canonical_approval_request_payload() {
        tokio::task::LocalSet::new()
            .run_until(async {
                // Set up inline (rather than via `execute_hitl_script`) so the
                // event-log handle stays available for reading payloads.
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let log = install_default_for_base_dir(dir.path()).expect("install event log");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
  request_approval("deploy production", {
    args: {environment: "prod"},
    quorum: 1,
    reviewers: ["alice"],
    evidence_refs: [{kind: "run", uri: "run_123"}],
    undo_metadata: {strategy: "rollback"},
    capabilities_requested: ["deploy.production"],
  })
}
"#;
                let chunk = compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script succeeds");

                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
                // The first event's payload carries a nested `payload` object
                // that holds the `approval_request`.
                let request_payload = &payloads[0]["payload"];
                let approval_request = &request_payload["approval_request"];
                assert_eq!(approval_request["id"], request_payload["id"]);
                assert_eq!(approval_request["action"], "deploy production");
                assert_eq!(approval_request["args"]["environment"], "prod");
                assert_eq!(approval_request["approvers_required"], 1);
                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
                assert_eq!(
                    approval_request["capabilities_requested"][0],
                    "deploy.production"
                );
                assert!(approval_request["requested_at"].as_str().is_some());
                assert!(approval_request["deadline"].as_str().is_some());
            })
            .await;
    }

    /// A mocked denial surfaces to the script as a catchable error named
    /// `ApprovalDeniedError` carrying the reviewer's reason, and the approvals
    /// topic ends with `hitl.approval_denied`.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  println(is_err(denied))
  println(unwrap_err(denied).name)
  println(unwrap_err(denied).reason)
}
"#;
                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_denied".to_string(),
                    ]
                );
            })
            .await;
    }

    /// With two mocked approvals, `dual_control` runs the supplied closure and
    /// returns its result; the dual-control topic records the request, both
    /// responses, the approval, and the execution.
    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  println(result)
}
"#;
                let (output, _, _, dual_control_events, _) =
                    execute_hitl_script(dir.path(), source)
                        .await
                        .expect("script succeeds");
                assert_eq!(output, "launched");
                assert_eq!(
                    dual_control_events,
                    vec![
                        "hitl.dual_control_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.dual_control_approved".to_string(),
                        "hitl.dual_control_executed".to_string(),
                    ]
                );
            })
            .await;
    }

    /// A mocked acceptance makes `escalate_to` return a handle with status
    /// `accepted` and the accepting reviewer; the escalations topic records
    /// the issue and acceptance events.
    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  println(handle.status)
  println(handle.reviewer)
}
"#;
                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "accepted\nlead");
                assert_eq!(
                    escalation_events,
                    vec![
                        "hitl.escalation_issued".to_string(),
                        "hitl.escalation_accepted".to_string(),
                    ]
                );
            })
            .await;
    }

    /// `harn-serve` adapters (A2A `input-required`, ACP `hitl_request`)
    /// rely on the canonical `AgentEvent::HitlRequested` /
    /// `AgentEvent::HitlResolved` pair to bracket every HITL pause.
    /// Pin the contract here so future HITL primitives keep emitting
    /// the event around their waitpoint blocks.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
        use std::sync::Mutex as StdMutex;

        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let session_id = "hitl-session".to_string();
                // Shared buffer that the sink below appends every event to.
                let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
                    std::sync::Arc::new(StdMutex::new(Vec::new()));

                // Sink that records a clone of each event it is handed.
                struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
                impl crate::agent_events::AgentEventSink for CaptureSink {
                    fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
                        self.0.lock().expect("captured").push(event.clone());
                    }
                }

                // Inline the script setup rather than using the
                // `execute_hitl_script` helper: that helper calls
                // `reset_thread_local_state` (which wipes the session
                // store), so any session pushed before it would be
                // gone by the time `ask_user` runs.
                crate::reset_thread_local_state();
                crate::event_log::install_default_for_base_dir(dir.path())
                    .expect("install event log");

                // Start from a clean sink registry, then register the capture
                // sink for this session and make the session current.
                crate::agent_events::reset_all_sinks();
                let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
                    std::sync::Arc::new(CaptureSink(captured.clone()));
                crate::agent_events::register_sink(session_id.clone(), sink);
                crate::agent_sessions::open_or_create(Some(session_id.clone()));
                let _guard = crate::agent_sessions::enter_current_session(session_id.clone());

                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "ok"})
  let answer: string = ask_user("Are you sure?", {default: "no"})
  println(answer)
}
"#;
                let chunk = crate::compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script runs");
                assert_eq!(vm.output().trim_end(), "ok");

                // Keep only the HITL request/resolution events and require
                // exactly one of each, in that order.
                let events = captured.lock().expect("captured");
                let mut iter = events.iter().filter(|event| {
                    matches!(
                        event,
                        crate::agent_events::AgentEvent::HitlRequested { .. }
                            | crate::agent_events::AgentEvent::HitlResolved { .. }
                    )
                });
                let requested = iter.next().expect("HitlRequested emitted");
                let resolved = iter.next().expect("HitlResolved emitted");
                assert!(iter.next().is_none(), "exactly one pair: {events:?}");

                let crate::agent_events::AgentEvent::HitlRequested {
                    session_id: req_session,
                    request_id: req_id,
                    kind: req_kind,
                    payload,
                } = requested
                else {
                    panic!("expected HitlRequested, got: {requested:?}");
                };
                assert_eq!(req_session, &session_id);
                assert_eq!(req_kind, "question");
                assert!(req_id.starts_with("hitl_question_"));
                assert_eq!(payload["prompt"], "Are you sure?");

                let crate::agent_events::AgentEvent::HitlResolved {
                    request_id: res_id,
                    kind: res_kind,
                    outcome,
                    ..
                } = resolved
                else {
                    panic!("expected HitlResolved, got: {resolved:?}");
                };
                // The resolution must refer to the same request it answers.
                assert_eq!(res_id, req_id);
                assert_eq!(res_kind, "question");
                assert_eq!(outcome, "answered");

                // Leave the session and clear the sinks registered above.
                drop(_guard);
                crate::agent_events::reset_all_sinks();
            })
            .await;
    }
}