Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use time::format_description::well_known::Rfc3339;
11use time::OffsetDateTime;
12use uuid::Uuid;
13
14use crate::event_log::{
15    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
16    EventLog, LogEvent, Topic,
17};
18use crate::schema::schema_expect_value;
19use crate::stdlib::host::dispatch_mock_host_call;
20use crate::stdlib::waitpoint::{
21    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
22    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
23    WaitpointWaitOptions,
24};
25use crate::triggers::dispatcher::current_dispatch_context;
26use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
27use crate::vm::{clone_async_builtin_child_vm, Vm};
28
29const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
30const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
31
32pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
33pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
34pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
35pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
36
37thread_local! {
38    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
39}
40
41#[derive(Default)]
42pub(crate) struct RequestSequenceState {
43    pub(crate) instance_key: String,
44    pub(crate) next_seq: u64,
45}
46
47#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub enum HitlRequestKind {
50    Question,
51    Approval,
52    DualControl,
53    Escalation,
54}
55
56impl HitlRequestKind {
57    pub(crate) fn as_str(self) -> &'static str {
58        match self {
59            Self::Question => "question",
60            Self::Approval => "approval",
61            Self::DualControl => "dual_control",
62            Self::Escalation => "escalation",
63        }
64    }
65
66    fn topic(self) -> &'static str {
67        match self {
68            Self::Question => HITL_QUESTIONS_TOPIC,
69            Self::Approval => HITL_APPROVALS_TOPIC,
70            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
71            Self::Escalation => HITL_ESCALATIONS_TOPIC,
72        }
73    }
74
75    fn request_event_kind(self) -> &'static str {
76        match self {
77            Self::Question => "hitl.question_asked",
78            Self::Approval => "hitl.approval_requested",
79            Self::DualControl => "hitl.dual_control_requested",
80            Self::Escalation => "hitl.escalation_issued",
81        }
82    }
83
84    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
85        if request_id.starts_with("hitl_question_") {
86            Some(Self::Question)
87        } else if request_id.starts_with("hitl_approval_") {
88            Some(Self::Approval)
89        } else if request_id.starts_with("hitl_dual_control_") {
90            Some(Self::DualControl)
91        } else if request_id.starts_with("hitl_escalation_") {
92            Some(Self::Escalation)
93        } else {
94            None
95        }
96    }
97}
98
99#[derive(Clone, Debug, Serialize, Deserialize)]
100pub struct HitlHostResponse {
101    pub request_id: String,
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub answer: Option<JsonValue>,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub approved: Option<bool>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub accepted: Option<bool>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub reviewer: Option<String>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub reason: Option<String>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub metadata: Option<JsonValue>,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub responded_at: Option<String>,
116}
117
118#[derive(Clone, Debug, Serialize, Deserialize)]
119struct HitlRequestEnvelope {
120    request_id: String,
121    kind: HitlRequestKind,
122    #[serde(default)]
123    agent: String,
124    trace_id: String,
125    requested_at: String,
126    payload: JsonValue,
127}
128
129#[derive(Clone, Debug, Serialize, Deserialize)]
130struct HitlTimeoutRecord {
131    request_id: String,
132    kind: HitlRequestKind,
133    trace_id: String,
134    timed_out_at: String,
135}
136
137#[derive(Clone, Debug)]
138struct DispatchKeys {
139    instance_key: String,
140    stable_base: String,
141    agent: String,
142    trace_id: String,
143}
144
145#[derive(Clone, Debug)]
146struct AskUserOptions {
147    schema: Option<VmValue>,
148    timeout: Option<StdDuration>,
149    default: Option<VmValue>,
150}
151
152#[derive(Clone, Debug)]
153struct ApprovalOptions {
154    detail: Option<VmValue>,
155    quorum: u32,
156    reviewers: Vec<String>,
157    deadline: StdDuration,
158}
159
160#[derive(Clone, Debug)]
161struct ApprovalProgress {
162    reviewers: BTreeSet<String>,
163    reason: Option<String>,
164    approved_at: Option<String>,
165}
166
167#[derive(Clone, Debug)]
168enum ApprovalResolution {
169    Pending,
170    Approved(ApprovalProgress),
171    Denied(HitlHostResponse),
172}
173
174#[derive(Clone, Debug)]
175enum WaitpointOutcome {
176    Completed(WaitpointRecord),
177    Timeout,
178    Cancelled {
179        wait_id: String,
180        waitpoint_ids: Vec<String>,
181        reason: Option<String>,
182    },
183}
184
185pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
186    vm.register_async_builtin("ask_user", |args| {
187        Box::pin(async move { ask_user_impl(&args).await })
188    });
189
190    vm.register_async_builtin("request_approval", |args| {
191        Box::pin(async move { request_approval_impl(&args).await })
192    });
193
194    vm.register_async_builtin("dual_control", |args| {
195        Box::pin(async move { dual_control_impl(&args).await })
196    });
197
198    vm.register_async_builtin("escalate_to", |args| {
199        Box::pin(async move { escalate_to_impl(&args).await })
200    });
201}
202
203pub(crate) fn reset_hitl_state() {
204    REQUEST_SEQUENCE.with(|slot| {
205        *slot.borrow_mut() = RequestSequenceState::default();
206    });
207}
208
209pub(crate) fn take_hitl_state() -> RequestSequenceState {
210    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
211}
212
213pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
214    REQUEST_SEQUENCE.with(|slot| {
215        *slot.borrow_mut() = state;
216    });
217}
218
219pub async fn append_hitl_response(
220    base_dir: Option<&Path>,
221    mut response: HitlHostResponse,
222) -> Result<u64, String> {
223    let kind = HitlRequestKind::from_request_id(&response.request_id)
224        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
225    if response.responded_at.is_none() {
226        response.responded_at = Some(now_rfc3339());
227    }
228    let log = ensure_hitl_event_log_for(base_dir)?;
229    let headers = response_headers(&response.request_id);
230    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
231    let event_id = log
232        .append(
233            &topic,
234            LogEvent::new(
235                match kind {
236                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
237                    _ => "hitl.response_received",
238                },
239                serde_json::to_value(&response).map_err(|error| error.to_string())?,
240            )
241            .with_headers(headers),
242        )
243        .await
244        .map_err(|error| error.to_string())?;
245    finalize_hitl_response(&log, kind, &response).await?;
246    Ok(event_id)
247}
248
249async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
250    let prompt = required_string_arg(args, 0, "ask_user")?;
251    let options = parse_ask_user_options(args.get(1))?;
252    let keys = current_dispatch_keys();
253    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
254    let trace_id = keys
255        .as_ref()
256        .map(|keys| keys.trace_id.clone())
257        .unwrap_or_else(new_trace_id);
258    let log = ensure_hitl_event_log();
259    let request = HitlRequestEnvelope {
260        request_id: request_id.clone(),
261        kind: HitlRequestKind::Question,
262        agent: keys
263            .as_ref()
264            .map(|keys| keys.agent.clone())
265            .unwrap_or_default(),
266        trace_id: trace_id.clone(),
267        requested_at: now_rfc3339(),
268        payload: json!({
269            "prompt": prompt,
270            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
271            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
272            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
273        }),
274    };
275    create_request_waitpoint(&log, &request).await?;
276    append_request(&log, &request).await?;
277    maybe_notify_host(&request);
278    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
279
280    match wait_for_request_waitpoint(&request_id, options.timeout).await? {
281        WaitpointOutcome::Completed(record) => {
282            let answer = record
283                .value
284                .as_ref()
285                .map(crate::stdlib::json_to_vm_value)
286                .unwrap_or(VmValue::Nil);
287            if let Some(schema) = options.schema.as_ref() {
288                return schema_expect_value(&answer, schema, true);
289            }
290            if let Some(default) = options.default.as_ref() {
291                return Ok(coerce_like_default(&answer, default));
292            }
293            Ok(answer)
294        }
295        WaitpointOutcome::Timeout => {
296            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
297            if let Some(default) = options.default {
298                return Ok(default);
299            }
300            Err(timeout_error(&request_id, HitlRequestKind::Question))
301        }
302        WaitpointOutcome::Cancelled {
303            wait_id,
304            waitpoint_ids,
305            reason,
306        } => Err(hitl_cancelled_error(
307            &request_id,
308            HitlRequestKind::Question,
309            &wait_id,
310            &waitpoint_ids,
311            reason,
312        )),
313    }
314}
315
316async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
317    let action = required_string_arg(args, 0, "request_approval")?;
318    let options = parse_approval_options(args.get(1), "request_approval")?;
319    let keys = current_dispatch_keys();
320    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
321    let trace_id = keys
322        .as_ref()
323        .map(|keys| keys.trace_id.clone())
324        .unwrap_or_else(new_trace_id);
325    let log = ensure_hitl_event_log();
326    let request = HitlRequestEnvelope {
327        request_id: request_id.clone(),
328        kind: HitlRequestKind::Approval,
329        agent: keys
330            .as_ref()
331            .map(|keys| keys.agent.clone())
332            .unwrap_or_default(),
333        trace_id: trace_id.clone(),
334        requested_at: now_rfc3339(),
335        payload: json!({
336            "action": action,
337            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
338            "quorum": options.quorum,
339            "reviewers": options.reviewers,
340            "deadline_ms": options.deadline.as_millis() as u64,
341        }),
342    };
343    create_request_waitpoint(&log, &request).await?;
344    append_request(&log, &request).await?;
345    maybe_notify_host(&request);
346    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
347
348    match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
349        WaitpointOutcome::Completed(record) => {
350            approval_record_from_waitpoint(&record, "request_approval")
351        }
352        WaitpointOutcome::Timeout => {
353            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
354            Err(timeout_error(&request_id, HitlRequestKind::Approval))
355        }
356        WaitpointOutcome::Cancelled { .. } => {
357            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
358        }
359    }
360}
361
362async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
363    let n = required_positive_int_arg(args, 0, "dual_control")?;
364    let m = required_positive_int_arg(args, 1, "dual_control")?;
365    if n > m {
366        return Err(VmError::Runtime(
367            "dual_control: n must be less than or equal to m".to_string(),
368        ));
369    }
370    let action = args
371        .get(2)
372        .and_then(|value| match value {
373            VmValue::Closure(closure) => Some(closure.clone()),
374            _ => None,
375        })
376        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
377    let approvers = optional_string_list(args.get(3), "dual_control")?;
378    if !approvers.is_empty() && approvers.len() < m as usize {
379        return Err(VmError::Runtime(format!(
380            "dual_control: expected at least {m} approvers, got {}",
381            approvers.len()
382        )));
383    }
384
385    let keys = current_dispatch_keys();
386    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
387    let trace_id = keys
388        .as_ref()
389        .map(|keys| keys.trace_id.clone())
390        .unwrap_or_else(new_trace_id);
391    let action_name = if action.func.name.is_empty() {
392        "anonymous".to_string()
393    } else {
394        action.func.name.clone()
395    };
396    let log = ensure_hitl_event_log();
397    let request = HitlRequestEnvelope {
398        request_id: request_id.clone(),
399        kind: HitlRequestKind::DualControl,
400        agent: keys
401            .as_ref()
402            .map(|keys| keys.agent.clone())
403            .unwrap_or_default(),
404        trace_id: trace_id.clone(),
405        requested_at: now_rfc3339(),
406        payload: json!({
407            "n": n,
408            "m": m,
409            "action": action_name,
410            "approvers": approvers,
411            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
412        }),
413    };
414    create_request_waitpoint(&log, &request).await?;
415    append_request(&log, &request).await?;
416    maybe_notify_host(&request);
417    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
418
419    match wait_for_request_waitpoint(
420        &request_id,
421        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
422    )
423    .await?
424    {
425        WaitpointOutcome::Completed(record) => {
426            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
427            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
428                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
429            })?;
430            let result = vm.call_closure_pub(&action, &[], &[]).await?;
431
432            append_named_event(
433                &log,
434                HitlRequestKind::DualControl,
435                "hitl.dual_control_executed",
436                &request_id,
437                &trace_id,
438                json!({
439                    "request_id": request_id,
440                    "result": crate::llm::vm_value_to_json(&result),
441                }),
442            )
443            .await?;
444
445            Ok(result)
446        }
447        WaitpointOutcome::Timeout => {
448            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
449            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
450        }
451        WaitpointOutcome::Cancelled { .. } => {
452            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
453        }
454    }
455}
456
457async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
458    let role = required_string_arg(args, 0, "escalate_to")?;
459    let reason = required_string_arg(args, 1, "escalate_to")?;
460    let keys = current_dispatch_keys();
461    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
462    let trace_id = keys
463        .as_ref()
464        .map(|keys| keys.trace_id.clone())
465        .unwrap_or_else(new_trace_id);
466    let log = ensure_hitl_event_log();
467    let request = HitlRequestEnvelope {
468        request_id: request_id.clone(),
469        kind: HitlRequestKind::Escalation,
470        agent: keys
471            .as_ref()
472            .map(|keys| keys.agent.clone())
473            .unwrap_or_default(),
474        trace_id: trace_id.clone(),
475        requested_at: now_rfc3339(),
476        payload: json!({
477            "role": role,
478            "reason": reason,
479        }),
480    };
481    create_request_waitpoint(&log, &request).await?;
482    append_request(&log, &request).await?;
483    maybe_notify_host(&request);
484    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
485
486    match wait_for_request_waitpoint(&request_id, None).await? {
487        WaitpointOutcome::Completed(record) => {
488            let accepted_at = record.completed_at.clone();
489            let reviewer = record.completed_by.clone();
490            let accepted = record
491                .value
492                .as_ref()
493                .and_then(|value| value.get("accepted"))
494                .and_then(JsonValue::as_bool)
495                .unwrap_or(true);
496            Ok(crate::stdlib::json_to_vm_value(&json!({
497                "request_id": request_id,
498                "role": role,
499                "reason": reason,
500                "trace_id": trace_id,
501                "status": if accepted { "accepted" } else { "pending" },
502                "accepted_at": accepted_at,
503                "reviewer": reviewer,
504            })))
505        }
506        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
507        WaitpointOutcome::Cancelled {
508            wait_id,
509            waitpoint_ids,
510            reason,
511        } => Err(hitl_cancelled_error(
512            &request_id,
513            HitlRequestKind::Escalation,
514            &wait_id,
515            &waitpoint_ids,
516            reason,
517        )),
518    }
519}
520
521async fn create_request_waitpoint(
522    log: &Arc<AnyEventLog>,
523    request: &HitlRequestEnvelope,
524) -> Result<(), VmError> {
525    create_waitpoint_on(
526        log,
527        Some(request.request_id.clone()),
528        Some(json!({
529            "kind": request.kind.as_str(),
530            "agent": request.agent.clone(),
531            "trace_id": request.trace_id.clone(),
532            "requested_at": request.requested_at.clone(),
533            "payload": request.payload.clone(),
534        })),
535    )
536    .await?;
537    Ok(())
538}
539
540async fn wait_for_request_waitpoint(
541    request_id: &str,
542    timeout: Option<StdDuration>,
543) -> Result<WaitpointOutcome, VmError> {
544    match wait_on_waitpoints(
545        vec![request_id.to_string()],
546        WaitpointWaitOptions { timeout },
547    )
548    .await
549    {
550        Ok(records) => Ok(WaitpointOutcome::Completed(
551            records
552                .into_iter()
553                .next()
554                .expect("single waitpoint wait result"),
555        )),
556        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
557        Err(WaitpointWaitFailure::Cancelled {
558            wait_id,
559            waitpoint_ids,
560            reason,
561        }) => Ok(WaitpointOutcome::Cancelled {
562            wait_id,
563            waitpoint_ids,
564            reason,
565        }),
566        Err(WaitpointWaitFailure::Vm(error)) => {
567            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
568                return Ok(outcome);
569            }
570            Err(error)
571        }
572    }
573}
574
575fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
576    let VmError::Thrown(VmValue::Dict(dict)) = error else {
577        return None;
578    };
579    let name = dict.get("name").and_then(vm_string)?;
580    match name {
581        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
582        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
583            wait_id: dict
584                .get("wait_id")
585                .and_then(vm_string)
586                .unwrap_or_default()
587                .to_string(),
588            waitpoint_ids: dict
589                .get("waitpoint_ids")
590                .and_then(vm_string_list)
591                .unwrap_or_default(),
592            reason: dict
593                .get("reason")
594                .and_then(vm_string)
595                .map(ToString::to_string),
596        }),
597        _ => None,
598    }
599}
600
601async fn finalize_hitl_response(
602    log: &Arc<AnyEventLog>,
603    kind: HitlRequestKind,
604    response: &HitlHostResponse,
605) -> Result<(), String> {
606    match kind {
607        HitlRequestKind::Question => {
608            if waitpoint_is_terminal(log, &response.request_id).await? {
609                return Ok(());
610            }
611            complete_waitpoint_on(
612                log,
613                &response.request_id,
614                response.answer.clone(),
615                response.reviewer.clone(),
616                response.reason.clone(),
617                response.metadata.clone(),
618            )
619            .await
620            .map(|_| ())
621            .map_err(|error| error.to_string())
622        }
623        HitlRequestKind::Escalation => {
624            if !response.accepted.unwrap_or(false)
625                || waitpoint_is_terminal(log, &response.request_id).await?
626            {
627                return Ok(());
628            }
629            complete_waitpoint_on(
630                log,
631                &response.request_id,
632                Some(json!({
633                    "accepted": true,
634                    "reviewer": response.reviewer,
635                    "reason": response.reason,
636                    "responded_at": response.responded_at,
637                })),
638                response.reviewer.clone(),
639                response.reason.clone(),
640                response.metadata.clone(),
641            )
642            .await
643            .map(|_| ())
644            .map_err(|error| error.to_string())
645        }
646        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
647            if waitpoint_is_terminal(log, &response.request_id).await? {
648                return Ok(());
649            }
650            let request = load_request_envelope(log, kind, &response.request_id)
651                .await
652                .map_err(|error| error.to_string())?;
653            match resolve_approval_state(log, kind, &request)
654                .await
655                .map_err(|error| error.to_string())?
656            {
657                ApprovalResolution::Pending => Ok(()),
658                ApprovalResolution::Approved(progress) => {
659                    let record = approval_record_json(&progress);
660                    append_named_event(
661                        log,
662                        kind,
663                        approved_event_kind(kind),
664                        &request.request_id,
665                        &request.trace_id,
666                        json!({
667                            "request_id": request.request_id.clone(),
668                            "record": record.clone(),
669                        }),
670                    )
671                    .await
672                    .map_err(|error| error.to_string())?;
673                    complete_waitpoint_on(
674                        log,
675                        &request.request_id,
676                        Some(record),
677                        response.reviewer.clone(),
678                        progress.reason.clone(),
679                        response.metadata.clone(),
680                    )
681                    .await
682                    .map(|_| ())
683                    .map_err(|error| error.to_string())
684                }
685                ApprovalResolution::Denied(denied) => {
686                    append_named_event(
687                        log,
688                        kind,
689                        denied_event_kind(kind),
690                        &request.request_id,
691                        &request.trace_id,
692                        json!({
693                            "request_id": request.request_id.clone(),
694                            "reviewer": denied.reviewer.clone(),
695                            "reason": denied.reason.clone(),
696                        }),
697                    )
698                    .await
699                    .map_err(|error| error.to_string())?;
700                    cancel_waitpoint_on(
701                        log,
702                        &request.request_id,
703                        denied.reviewer.clone(),
704                        denied.reason.clone(),
705                        denied.metadata.clone(),
706                    )
707                    .await
708                    .map(|_| ())
709                    .map_err(|error| error.to_string())
710                }
711            }
712        }
713    }
714}
715
716async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
717    Ok(inspect_waitpoint_on(log, request_id)
718        .await
719        .map_err(|error| error.to_string())?
720        .is_some_and(|record| record.status != WaitpointStatus::Open))
721}
722
723async fn load_request_envelope(
724    log: &Arc<AnyEventLog>,
725    kind: HitlRequestKind,
726    request_id: &str,
727) -> Result<HitlRequestEnvelope, VmError> {
728    let topic = topic(kind)?;
729    let events = log
730        .read_range(&topic, None, usize::MAX)
731        .await
732        .map_err(log_error)?;
733    events
734        .into_iter()
735        .filter(|(_, event)| event.kind == kind.request_event_kind())
736        .find_map(|(_, event)| {
737            if !event_matches_request(&event, request_id) {
738                return None;
739            }
740            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
741        })
742        .ok_or_else(|| {
743            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
744        })
745}
746
747async fn resolve_approval_state(
748    log: &Arc<AnyEventLog>,
749    kind: HitlRequestKind,
750    request: &HitlRequestEnvelope,
751) -> Result<ApprovalResolution, VmError> {
752    let quorum = approval_quorum_from_request(kind, request)?;
753    let allowed_reviewers = approval_reviewers_from_request(kind, request)
754        .into_iter()
755        .collect::<BTreeSet<_>>();
756    let mut progress = ApprovalProgress {
757        reviewers: BTreeSet::new(),
758        reason: None,
759        approved_at: None,
760    };
761    let topic = topic(kind)?;
762    let events = log
763        .read_range(&topic, None, usize::MAX)
764        .await
765        .map_err(log_error)?;
766    for (_, event) in events {
767        if !event_matches_request(&event, &request.request_id)
768            || event.kind != "hitl.response_received"
769        {
770            continue;
771        }
772        let response: HitlHostResponse = serde_json::from_value(event.payload)
773            .map_err(|error| VmError::Runtime(error.to_string()))?;
774        if let Some(reviewer) = response.reviewer.as_deref() {
775            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
776                continue;
777            }
778            if progress.reviewers.contains(reviewer) {
779                continue;
780            }
781        }
782        if response.approved.unwrap_or(false) {
783            if let Some(reviewer) = response.reviewer.clone() {
784                progress.reviewers.insert(reviewer);
785            }
786            progress.reason = response.reason.clone();
787            progress.approved_at = response.responded_at.clone();
788            if progress.reviewers.len() as u32 >= quorum {
789                return Ok(ApprovalResolution::Approved(progress));
790            }
791            continue;
792        }
793        return Ok(ApprovalResolution::Denied(response));
794    }
795    Ok(ApprovalResolution::Pending)
796}
797
798fn approval_quorum_from_request(
799    kind: HitlRequestKind,
800    request: &HitlRequestEnvelope,
801) -> Result<u32, VmError> {
802    let key = match kind {
803        HitlRequestKind::DualControl => "n",
804        _ => "quorum",
805    };
806    let quorum = request
807        .payload
808        .get(key)
809        .and_then(JsonValue::as_u64)
810        .unwrap_or(1);
811    u32::try_from(quorum).map_err(|_| {
812        VmError::Runtime(format!(
813            "invalid quorum in HITL request '{}'",
814            request.request_id
815        ))
816    })
817}
818
819fn approval_reviewers_from_request(
820    kind: HitlRequestKind,
821    request: &HitlRequestEnvelope,
822) -> Vec<String> {
823    let key = match kind {
824        HitlRequestKind::DualControl => "approvers",
825        _ => "reviewers",
826    };
827    request
828        .payload
829        .get(key)
830        .and_then(JsonValue::as_array)
831        .map(|values| {
832            values
833                .iter()
834                .filter_map(JsonValue::as_str)
835                .map(str::to_string)
836                .collect()
837        })
838        .unwrap_or_default()
839}
840
841fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
842    json!({
843        "approved": true,
844        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
845        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
846        "reason": progress.reason,
847    })
848}
849
850fn approval_record_from_waitpoint(
851    record: &WaitpointRecord,
852    builtin: &str,
853) -> Result<VmValue, VmError> {
854    record
855        .value
856        .as_ref()
857        .map(crate::stdlib::json_to_vm_value)
858        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
859}
860
861async fn approval_wait_error(
862    log: &Arc<AnyEventLog>,
863    kind: HitlRequestKind,
864    request_id: &str,
865) -> VmError {
866    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
867        if record.status == WaitpointStatus::Cancelled
868            && record.reason.as_deref() != Some("upstream_cancelled")
869        {
870            return approval_denied_error(
871                request_id,
872                HitlHostResponse {
873                    request_id: request_id.to_string(),
874                    answer: None,
875                    approved: Some(false),
876                    accepted: None,
877                    reviewer: record.cancelled_by.clone(),
878                    reason: record.reason.clone(),
879                    metadata: record.metadata.clone(),
880                    responded_at: record.cancelled_at.clone(),
881                },
882            );
883        }
884        if record.status == WaitpointStatus::Cancelled {
885            return hitl_cancelled_error(
886                request_id,
887                kind,
888                "",
889                &[request_id.to_string()],
890                record.reason.clone(),
891            );
892        }
893    }
894    hitl_cancelled_error(
895        request_id,
896        kind,
897        "",
898        &[request_id.to_string()],
899        Some("upstream_cancelled".to_string()),
900    )
901}
902
903async fn append_timeout_once(
904    log: &Arc<AnyEventLog>,
905    kind: HitlRequestKind,
906    request_id: &str,
907    trace_id: &str,
908) -> Result<(), VmError> {
909    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
910        return Ok(());
911    }
912    append_timeout(log, kind, request_id, trace_id).await
913}
914
915async fn hitl_event_exists(
916    log: &Arc<AnyEventLog>,
917    kind: HitlRequestKind,
918    request_id: &str,
919    event_kind: &str,
920) -> Result<bool, VmError> {
921    let topic = topic(kind)?;
922    let events = log
923        .read_range(&topic, None, usize::MAX)
924        .await
925        .map_err(log_error)?;
926    Ok(events
927        .into_iter()
928        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
929}
930
931fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
932    match kind {
933        HitlRequestKind::DualControl => "hitl.dual_control_approved",
934        _ => "hitl.approval_approved",
935    }
936}
937
938fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
939    match kind {
940        HitlRequestKind::DualControl => "hitl.dual_control_denied",
941        _ => "hitl.approval_denied",
942    }
943}
944
945async fn append_request(
946    log: &Arc<AnyEventLog>,
947    request: &HitlRequestEnvelope,
948) -> Result<(), VmError> {
949    let topic = topic(request.kind)?;
950    log.append(
951        &topic,
952        LogEvent::new(
953            request.kind.request_event_kind(),
954            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
955        )
956        .with_headers(request_headers(request)),
957    )
958    .await
959    .map(|_| ())
960    .map_err(log_error)
961}
962
963async fn append_named_event(
964    log: &Arc<AnyEventLog>,
965    kind: HitlRequestKind,
966    event_kind: &str,
967    request_id: &str,
968    trace_id: &str,
969    payload: JsonValue,
970) -> Result<(), VmError> {
971    let topic = topic(kind)?;
972    let headers = headers_with_trace(request_id, trace_id);
973    log.append(
974        &topic,
975        LogEvent::new(event_kind, payload).with_headers(headers),
976    )
977    .await
978    .map(|_| ())
979    .map_err(log_error)
980}
981
982async fn append_timeout(
983    log: &Arc<AnyEventLog>,
984    kind: HitlRequestKind,
985    request_id: &str,
986    trace_id: &str,
987) -> Result<(), VmError> {
988    append_named_event(
989        log,
990        kind,
991        "hitl.timeout",
992        request_id,
993        trace_id,
994        serde_json::to_value(HitlTimeoutRecord {
995            request_id: request_id.to_string(),
996            kind,
997            trace_id: trace_id.to_string(),
998            timed_out_at: now_rfc3339(),
999        })
1000        .map_err(|error| VmError::Runtime(error.to_string()))?,
1001    )
1002    .await
1003}
1004
1005async fn maybe_apply_mock_response(
1006    kind: HitlRequestKind,
1007    request_id: &str,
1008    request_payload: &JsonValue,
1009) -> Result<(), VmError> {
1010    let mut params = request_payload
1011        .as_object()
1012        .cloned()
1013        .unwrap_or_default()
1014        .into_iter()
1015        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1016        .collect::<BTreeMap<_, _>>();
1017    params.insert(
1018        "request_id".to_string(),
1019        VmValue::String(Rc::from(request_id.to_string())),
1020    );
1021    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1022        return Ok(());
1023    };
1024    let value = result?;
1025    let responses = match value {
1026        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1027        other => vec![other],
1028    };
1029    for response in responses {
1030        let response_dict = response.as_dict().ok_or_else(|| {
1031            VmError::Runtime(format!(
1032                "mocked HITL {} response must be a dict or list<dict>",
1033                kind.as_str()
1034            ))
1035        })?;
1036        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1037        append_hitl_response(None, hitl_response)
1038            .await
1039            .map_err(VmError::Runtime)?;
1040    }
1041    Ok(())
1042}
1043
1044fn parse_hitl_response_dict(
1045    request_id: &str,
1046    response_dict: &BTreeMap<String, VmValue>,
1047) -> Result<HitlHostResponse, VmError> {
1048    Ok(HitlHostResponse {
1049        request_id: request_id.to_string(),
1050        answer: response_dict
1051            .get("answer")
1052            .map(crate::llm::vm_value_to_json),
1053        approved: response_dict.get("approved").and_then(vm_bool),
1054        accepted: response_dict.get("accepted").and_then(vm_bool),
1055        reviewer: response_dict.get("reviewer").map(VmValue::display),
1056        reason: response_dict.get("reason").map(VmValue::display),
1057        metadata: response_dict
1058            .get("metadata")
1059            .map(crate::llm::vm_value_to_json),
1060        responded_at: response_dict.get("responded_at").map(VmValue::display),
1061    })
1062}
1063
1064fn maybe_notify_host(request: &HitlRequestEnvelope) {
1065    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1066        return;
1067    };
1068    bridge.notify(
1069        "harn.hitl.requested",
1070        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1071    );
1072}
1073
1074fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1075    let Some(value) = value else {
1076        return Ok(AskUserOptions {
1077            schema: None,
1078            timeout: None,
1079            default: None,
1080        });
1081    };
1082    let dict = value
1083        .as_dict()
1084        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1085    Ok(AskUserOptions {
1086        schema: dict
1087            .get("schema")
1088            .cloned()
1089            .filter(|value| !matches!(value, VmValue::Nil)),
1090        timeout: dict.get("timeout").map(parse_duration_value).transpose()?,
1091        default: dict
1092            .get("default")
1093            .cloned()
1094            .filter(|value| !matches!(value, VmValue::Nil)),
1095    })
1096}
1097
1098fn parse_approval_options(
1099    value: Option<&VmValue>,
1100    builtin: &str,
1101) -> Result<ApprovalOptions, VmError> {
1102    let dict = match value {
1103        None => None,
1104        Some(VmValue::Dict(dict)) => Some(dict),
1105        Some(_) => {
1106            return Err(VmError::Runtime(format!(
1107                "{builtin}: options must be a dict"
1108            )))
1109        }
1110    };
1111    let quorum = dict
1112        .and_then(|dict| dict.get("quorum"))
1113        .and_then(VmValue::as_int)
1114        .unwrap_or(1);
1115    if quorum <= 0 {
1116        return Err(VmError::Runtime(format!(
1117            "{builtin}: quorum must be positive"
1118        )));
1119    }
1120    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1121    let deadline = dict
1122        .and_then(|dict| dict.get("deadline"))
1123        .map(parse_duration_value)
1124        .transpose()?
1125        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1126    Ok(ApprovalOptions {
1127        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1128        quorum: quorum as u32,
1129        reviewers,
1130        deadline,
1131    })
1132}
1133
1134fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1135    args.get(idx)
1136        .map(VmValue::display)
1137        .filter(|value| !value.is_empty())
1138        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1139}
1140
1141fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1142    let value = args
1143        .get(idx)
1144        .and_then(VmValue::as_int)
1145        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1146    if value <= 0 {
1147        return Err(VmError::Runtime(format!(
1148            "{builtin}: expected a positive int at {idx}"
1149        )));
1150    }
1151    Ok(value)
1152}
1153
1154fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1155    let Some(value) = value else {
1156        return Ok(Vec::new());
1157    };
1158    match value {
1159        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1160        _ => Err(VmError::Runtime(format!(
1161            "{builtin}: expected list<string>"
1162        ))),
1163    }
1164}
1165
1166fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1167    match value {
1168        VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
1169        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1170        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1171        _ => Err(VmError::Runtime(
1172            "expected a duration or millisecond count".to_string(),
1173        )),
1174    }
1175}
1176
1177fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1178    active_event_log()
1179        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1180}
1181
1182fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1183    if let Some(log) = active_event_log() {
1184        return Ok(log);
1185    }
1186    let Some(base_dir) = base_dir else {
1187        return Ok(install_memory_for_current_thread(
1188            HITL_EVENT_LOG_QUEUE_DEPTH,
1189        ));
1190    };
1191    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1192}
1193
1194fn current_dispatch_keys() -> Option<DispatchKeys> {
1195    let context = current_dispatch_context()?;
1196    let stable_base = context
1197        .replay_of_event_id
1198        .clone()
1199        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1200    let instance_key = format!(
1201        "{}::{}",
1202        context.trigger_event.id.0,
1203        context.replay_of_event_id.as_deref().unwrap_or("live")
1204    );
1205    Some(DispatchKeys {
1206        instance_key,
1207        stable_base,
1208        agent: context.agent_id,
1209        trace_id: context.trigger_event.trace_id.0,
1210    })
1211}
1212
1213fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1214    if let Some(keys) = dispatch_keys {
1215        let seq = REQUEST_SEQUENCE.with(|slot| {
1216            let mut state = slot.borrow_mut();
1217            if state.instance_key != keys.instance_key {
1218                state.instance_key = keys.instance_key.clone();
1219                state.next_seq = 0;
1220            }
1221            state.next_seq += 1;
1222            state.next_seq
1223        });
1224        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1225    }
1226    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1227}
1228
1229fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1230    headers_with_trace(&request.request_id, &request.trace_id)
1231}
1232
1233fn response_headers(request_id: &str) -> BTreeMap<String, String> {
1234    let mut headers = BTreeMap::new();
1235    headers.insert("request_id".to_string(), request_id.to_string());
1236    headers
1237}
1238
1239fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1240    let mut headers = response_headers(request_id);
1241    headers.insert("trace_id".to_string(), trace_id.to_string());
1242    headers
1243}
1244
1245fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1246    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1247}
1248
1249fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1250    event
1251        .headers
1252        .get("request_id")
1253        .is_some_and(|value| value == request_id)
1254        || event
1255            .payload
1256            .get("request_id")
1257            .and_then(JsonValue::as_str)
1258            .is_some_and(|value| value == request_id)
1259}
1260
1261fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1262    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1263        "name": "ApprovalDeniedError",
1264        "category": "generic",
1265        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1266        "request_id": request_id,
1267        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1268        "reason": response.reason,
1269    })))
1270}
1271
1272fn hitl_cancelled_error(
1273    request_id: &str,
1274    kind: HitlRequestKind,
1275    wait_id: &str,
1276    waitpoint_ids: &[String],
1277    reason: Option<String>,
1278) -> VmError {
1279    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1280    let message = reason
1281        .clone()
1282        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1283    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1284        "name": "HumanCancelledError",
1285        "category": ErrorCategory::Cancelled.as_str(),
1286        "message": message,
1287        "request_id": request_id,
1288        "kind": kind.as_str(),
1289        "wait_id": wait_id,
1290        "waitpoint_ids": waitpoint_ids,
1291        "reason": reason,
1292    })))
1293}
1294
1295fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1296    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1297    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1298        "name": "HumanTimeoutError",
1299        "category": ErrorCategory::Timeout.as_str(),
1300        "message": format!("{} timed out", kind.as_str()),
1301        "request_id": request_id,
1302        "kind": kind.as_str(),
1303    })))
1304}
1305
1306fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1307    match default {
1308        VmValue::Int(_) => match value {
1309            VmValue::Int(_) => value.clone(),
1310            VmValue::Float(number) => VmValue::Int(*number as i64),
1311            VmValue::String(text) => text
1312                .parse::<i64>()
1313                .map(VmValue::Int)
1314                .unwrap_or_else(|_| default.clone()),
1315            _ => default.clone(),
1316        },
1317        VmValue::Float(_) => match value {
1318            VmValue::Float(_) => value.clone(),
1319            VmValue::Int(number) => VmValue::Float(*number as f64),
1320            VmValue::String(text) => text
1321                .parse::<f64>()
1322                .map(VmValue::Float)
1323                .unwrap_or_else(|_| default.clone()),
1324            _ => default.clone(),
1325        },
1326        VmValue::Bool(_) => match value {
1327            VmValue::Bool(_) => value.clone(),
1328            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1329            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1330            _ => default.clone(),
1331        },
1332        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1333        VmValue::Duration(_) => match value {
1334            VmValue::Duration(_) => value.clone(),
1335            VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1336            _ => default.clone(),
1337        },
1338        VmValue::Nil => value.clone(),
1339        _ => {
1340            if value.type_name() == default.type_name() {
1341                value.clone()
1342            } else {
1343                default.clone()
1344            }
1345        }
1346    }
1347}
1348
1349fn log_error(error: impl std::fmt::Display) -> VmError {
1350    VmError::Runtime(error.to_string())
1351}
1352
1353fn now_rfc3339() -> String {
1354    OffsetDateTime::now_utc()
1355        .format(&Rfc3339)
1356        .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1357}
1358
1359fn new_trace_id() -> String {
1360    format!("trace_{}", Uuid::now_v7())
1361}
1362
1363fn vm_bool(value: &VmValue) -> Option<bool> {
1364    match value {
1365        VmValue::Bool(flag) => Some(*flag),
1366        _ => None,
1367    }
1368}
1369
1370fn vm_string(value: &VmValue) -> Option<&str> {
1371    match value {
1372        VmValue::String(text) => Some(text.as_ref()),
1373        _ => None,
1374    }
1375}
1376
1377fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1378    match value {
1379        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1380        _ => None,
1381    }
1382}
1383
1384#[cfg(test)]
1385mod tests {
1386    use super::{
1387        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1388    };
1389    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1390    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1391
1392    async fn execute_hitl_script(
1393        base_dir: &std::path::Path,
1394        source: &str,
1395    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1396        reset_thread_local_state();
1397        let log = install_default_for_base_dir(base_dir).expect("install event log");
1398        let chunk = compile_source(source).expect("compile source");
1399        let mut vm = Vm::new();
1400        register_vm_stdlib(&mut vm);
1401        vm.set_source_dir(base_dir);
1402        vm.execute(&chunk).await?;
1403        let output = vm.output().trim_end().to_string();
1404        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1405        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1406        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1407        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1408        Ok((
1409            output,
1410            question_events,
1411            approval_events,
1412            dual_control_events,
1413            escalation_events,
1414        ))
1415    }
1416
1417    async fn event_kinds(
1418        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1419        topic: &str,
1420    ) -> Vec<String> {
1421        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1422            .await
1423            .expect("read topic")
1424            .into_iter()
1425            .map(|(_, event)| event.kind)
1426            .collect()
1427    }
1428
1429    #[tokio::test(flavor = "current_thread")]
1430    async fn ask_user_coerces_to_default_type_and_logs_events() {
1431        tokio::task::LocalSet::new()
1432            .run_until(async {
1433                let dir = tempfile::tempdir().expect("tempdir");
1434                let source = r#"
1435pipeline test(task) {
1436  host_mock("hitl", "question", {answer: "9"})
1437  let answer: int = ask_user("Pick a number", {default: 0})
1438  println(answer)
1439}
1440"#;
1441                let (
1442                    output,
1443                    question_events,
1444                    approval_events,
1445                    dual_control_events,
1446                    escalation_events,
1447                ) = execute_hitl_script(dir.path(), source)
1448                    .await
1449                    .expect("script succeeds");
1450                assert_eq!(output, "9");
1451                assert_eq!(
1452                    question_events,
1453                    vec![
1454                        "hitl.question_asked".to_string(),
1455                        "hitl.response_received".to_string()
1456                    ]
1457                );
1458                assert!(approval_events.is_empty());
1459                assert!(dual_control_events.is_empty());
1460                assert!(escalation_events.is_empty());
1461            })
1462            .await;
1463    }
1464
1465    #[tokio::test(flavor = "current_thread")]
1466    async fn request_approval_waits_for_quorum_and_emits_a_record() {
1467        tokio::task::LocalSet::new()
1468            .run_until(async {
1469                let dir = tempfile::tempdir().expect("tempdir");
1470                let source = r#"
1471pipeline test(task) {
1472  host_mock("hitl", "approval", [
1473    {approved: true, reviewer: "alice", reason: "ok"},
1474    {approved: true, reviewer: "bob", reason: "ship it"},
1475  ])
1476  let record = request_approval(
1477    "deploy production",
1478    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1479  )
1480  println(record.approved)
1481  println(len(record.reviewers))
1482  println(record.reviewers[0])
1483  println(record.reviewers[1])
1484}
1485"#;
1486                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1487                    .await
1488                    .expect("script succeeds");
1489                assert_eq!(
1490                    approval_events,
1491                    vec![
1492                        "hitl.approval_requested".to_string(),
1493                        "hitl.response_received".to_string(),
1494                        "hitl.response_received".to_string(),
1495                        "hitl.approval_approved".to_string(),
1496                    ]
1497                );
1498            })
1499            .await;
1500    }
1501
1502    #[tokio::test(flavor = "current_thread")]
1503    async fn request_approval_surfaces_denials_as_typed_errors() {
1504        tokio::task::LocalSet::new()
1505            .run_until(async {
1506                let dir = tempfile::tempdir().expect("tempdir");
1507                let source = r#"
1508pipeline test(task) {
1509  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
1510  let denied = try {
1511    request_approval("drop table", {reviewers: ["alice"]})
1512  }
1513  println(is_err(denied))
1514  println(unwrap_err(denied).name)
1515  println(unwrap_err(denied).reason)
1516}
1517"#;
1518                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1519                    .await
1520                    .expect("script succeeds");
1521                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
1522                assert_eq!(
1523                    approval_events,
1524                    vec![
1525                        "hitl.approval_requested".to_string(),
1526                        "hitl.response_received".to_string(),
1527                        "hitl.approval_denied".to_string(),
1528                    ]
1529                );
1530            })
1531            .await;
1532    }
1533
1534    #[tokio::test(flavor = "current_thread")]
1535    async fn dual_control_executes_action_after_quorum() {
1536        tokio::task::LocalSet::new()
1537            .run_until(async {
1538                let dir = tempfile::tempdir().expect("tempdir");
1539                let source = r#"
1540pipeline test(task) {
1541  host_mock("hitl", "dual_control", [
1542    {approved: true, reviewer: "alice"},
1543    {approved: true, reviewer: "bob"},
1544  ])
1545  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
1546  println(result)
1547}
1548"#;
1549                let (output, _, _, dual_control_events, _) =
1550                    execute_hitl_script(dir.path(), source)
1551                        .await
1552                        .expect("script succeeds");
1553                assert_eq!(output, "launched");
1554                assert_eq!(
1555                    dual_control_events,
1556                    vec![
1557                        "hitl.dual_control_requested".to_string(),
1558                        "hitl.response_received".to_string(),
1559                        "hitl.response_received".to_string(),
1560                        "hitl.dual_control_approved".to_string(),
1561                        "hitl.dual_control_executed".to_string(),
1562                    ]
1563                );
1564            })
1565            .await;
1566    }
1567
1568    #[tokio::test(flavor = "current_thread")]
1569    async fn escalate_to_waits_for_acceptance_event() {
1570        tokio::task::LocalSet::new()
1571            .run_until(async {
1572                let dir = tempfile::tempdir().expect("tempdir");
1573                let source = r#"
1574pipeline test(task) {
1575  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
1576  let handle = escalate_to("admin", "need override")
1577  println(handle.status)
1578  println(handle.reviewer)
1579}
1580"#;
1581                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
1582                    .await
1583                    .expect("script succeeds");
1584                assert_eq!(output, "accepted\nlead");
1585                assert_eq!(
1586                    escalation_events,
1587                    vec![
1588                        "hitl.escalation_issued".to_string(),
1589                        "hitl.escalation_accepted".to_string(),
1590                    ]
1591                );
1592            })
1593            .await;
1594    }
1595}