Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24    WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
30const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
31const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
32const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
33
34pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
35pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
36pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
37pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
39thread_local! {
40    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
41}
42
43#[derive(Default)]
44pub(crate) struct RequestSequenceState {
45    pub(crate) instance_key: String,
46    pub(crate) next_seq: u64,
47}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum HitlRequestKind {
52    Question,
53    Approval,
54    DualControl,
55    Escalation,
56}
57
58impl HitlRequestKind {
59    pub(crate) fn as_str(self) -> &'static str {
60        match self {
61            Self::Question => "question",
62            Self::Approval => "approval",
63            Self::DualControl => "dual_control",
64            Self::Escalation => "escalation",
65        }
66    }
67
68    fn topic(self) -> &'static str {
69        match self {
70            Self::Question => HITL_QUESTIONS_TOPIC,
71            Self::Approval => HITL_APPROVALS_TOPIC,
72            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73            Self::Escalation => HITL_ESCALATIONS_TOPIC,
74        }
75    }
76
77    fn request_event_kind(self) -> &'static str {
78        match self {
79            Self::Question => "hitl.question_asked",
80            Self::Approval => "hitl.approval_requested",
81            Self::DualControl => "hitl.dual_control_requested",
82            Self::Escalation => "hitl.escalation_issued",
83        }
84    }
85
86    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87        if request_id.starts_with("hitl_question_") {
88            Some(Self::Question)
89        } else if request_id.starts_with("hitl_approval_") {
90            Some(Self::Approval)
91        } else if request_id.starts_with("hitl_dual_control_") {
92            Some(Self::DualControl)
93        } else if request_id.starts_with("hitl_escalation_") {
94            Some(Self::Escalation)
95        } else {
96            None
97        }
98    }
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize)]
102pub struct HitlHostResponse {
103    pub request_id: String,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub answer: Option<JsonValue>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub approved: Option<bool>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub accepted: Option<bool>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub reviewer: Option<String>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub reason: Option<String>,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub metadata: Option<JsonValue>,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub responded_at: Option<String>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub signature: Option<String>,
120}
121
122#[derive(Clone, Debug, Serialize, Deserialize)]
123struct HitlRequestEnvelope {
124    request_id: String,
125    kind: HitlRequestKind,
126    #[serde(default)]
127    agent: String,
128    trace_id: String,
129    #[serde(skip_serializing_if = "Option::is_none")]
130    run_id: Option<String>,
131    requested_at: String,
132    payload: JsonValue,
133}
134
135#[derive(Clone, Debug, Serialize, Deserialize)]
136struct HitlTimeoutRecord {
137    request_id: String,
138    kind: HitlRequestKind,
139    trace_id: String,
140    timed_out_at: String,
141}
142
143#[derive(Clone, Debug)]
144struct DispatchKeys {
145    instance_key: String,
146    stable_base: String,
147    agent: String,
148    trace_id: String,
149}
150
151#[derive(Clone, Debug)]
152struct AskUserOptions {
153    schema: Option<VmValue>,
154    timeout: Option<StdDuration>,
155    default: Option<VmValue>,
156}
157
158#[derive(Clone, Debug)]
159struct ApprovalOptions {
160    detail: Option<VmValue>,
161    quorum: u32,
162    reviewers: Vec<String>,
163    deadline: StdDuration,
164}
165
166#[derive(Clone, Debug)]
167struct ApprovalProgress {
168    reviewers: BTreeSet<String>,
169    signatures: Vec<ApprovalSignature>,
170    reason: Option<String>,
171    approved_at: Option<String>,
172}
173
174#[derive(Clone, Debug, Serialize)]
175struct ApprovalSignature {
176    reviewer: String,
177    signed_at: String,
178    signature: String,
179}
180
181#[derive(Clone, Debug)]
182enum ApprovalResolution {
183    Pending,
184    Approved(ApprovalProgress),
185    Denied(HitlHostResponse),
186}
187
188#[derive(Clone, Debug)]
189enum WaitpointOutcome {
190    Completed(WaitpointRecord),
191    Timeout,
192    Cancelled {
193        wait_id: String,
194        waitpoint_ids: Vec<String>,
195        reason: Option<String>,
196    },
197}
198
199pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
200    vm.register_async_builtin("ask_user", |args| {
201        Box::pin(async move { ask_user_impl(&args).await })
202    });
203
204    vm.register_async_builtin("request_approval", |args| {
205        Box::pin(async move { request_approval_impl(&args).await })
206    });
207
208    vm.register_async_builtin("dual_control", |args| {
209        Box::pin(async move { dual_control_impl(&args).await })
210    });
211
212    vm.register_async_builtin("escalate_to", |args| {
213        Box::pin(async move { escalate_to_impl(&args).await })
214    });
215}
216
217pub(crate) fn reset_hitl_state() {
218    REQUEST_SEQUENCE.with(|slot| {
219        *slot.borrow_mut() = RequestSequenceState::default();
220    });
221}
222
223pub(crate) fn take_hitl_state() -> RequestSequenceState {
224    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
225}
226
227pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
228    REQUEST_SEQUENCE.with(|slot| {
229        *slot.borrow_mut() = state;
230    });
231}
232
233pub async fn append_hitl_response(
234    base_dir: Option<&Path>,
235    mut response: HitlHostResponse,
236) -> Result<u64, String> {
237    let kind = HitlRequestKind::from_request_id(&response.request_id)
238        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
239    if response.responded_at.is_none() {
240        response.responded_at = Some(now_rfc3339());
241    }
242    let log = ensure_hitl_event_log_for(base_dir)?;
243    let headers = response_headers(&response.request_id);
244    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
245    let event_id = log
246        .append(
247            &topic,
248            LogEvent::new(
249                match kind {
250                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
251                    _ => "hitl.response_received",
252                },
253                serde_json::to_value(&response).map_err(|error| error.to_string())?,
254            )
255            .with_headers(headers),
256        )
257        .await
258        .map_err(|error| error.to_string())?;
259    finalize_hitl_response(&log, kind, &response).await?;
260    Ok(event_id)
261}
262
263pub async fn append_approval_request_on(
264    log: &Arc<AnyEventLog>,
265    agent: impl Into<String>,
266    trace_id: impl Into<String>,
267    action: impl Into<String>,
268    detail: JsonValue,
269    reviewers: Vec<String>,
270) -> Result<String, VmError> {
271    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
272    let trace_id = trace_id.into();
273    let request = HitlRequestEnvelope {
274        request_id: request_id.clone(),
275        kind: HitlRequestKind::Approval,
276        agent: agent.into(),
277        trace_id: trace_id.clone(),
278        run_id: None,
279        requested_at: now_rfc3339(),
280        payload: json!({
281            "action": action.into(),
282            "detail": detail,
283            "quorum": 1,
284            "reviewers": reviewers,
285            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
286        }),
287    };
288    create_request_waitpoint(log, &request).await?;
289    append_request(log, &request).await?;
290    maybe_notify_host(&request);
291    Ok(request_id)
292}
293
294async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
295    let prompt = required_string_arg(args, 0, "ask_user")?;
296    let options = parse_ask_user_options(args.get(1))?;
297    let keys = current_dispatch_keys();
298    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
299    let trace_id = keys
300        .as_ref()
301        .map(|keys| keys.trace_id.clone())
302        .unwrap_or_else(new_trace_id);
303    let log = ensure_hitl_event_log();
304    let request = HitlRequestEnvelope {
305        request_id: request_id.clone(),
306        kind: HitlRequestKind::Question,
307        agent: keys
308            .as_ref()
309            .map(|keys| keys.agent.clone())
310            .unwrap_or_default(),
311        trace_id: trace_id.clone(),
312        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
313        requested_at: now_rfc3339(),
314        payload: json!({
315            "prompt": prompt,
316            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
317            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
318            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
319        }),
320    };
321    create_request_waitpoint(&log, &request).await?;
322    append_request(&log, &request).await?;
323    maybe_notify_host(&request);
324    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
325
326    match wait_for_request_waitpoint(&request_id, options.timeout).await? {
327        WaitpointOutcome::Completed(record) => {
328            let answer = record
329                .value
330                .as_ref()
331                .map(crate::stdlib::json_to_vm_value)
332                .unwrap_or(VmValue::Nil);
333            if let Some(schema) = options.schema.as_ref() {
334                return schema_expect_value(&answer, schema, true);
335            }
336            if let Some(default) = options.default.as_ref() {
337                return Ok(coerce_like_default(&answer, default));
338            }
339            Ok(answer)
340        }
341        WaitpointOutcome::Timeout => {
342            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
343            if let Some(default) = options.default {
344                return Ok(default);
345            }
346            Err(timeout_error(&request_id, HitlRequestKind::Question))
347        }
348        WaitpointOutcome::Cancelled {
349            wait_id,
350            waitpoint_ids,
351            reason,
352        } => Err(hitl_cancelled_error(
353            &request_id,
354            HitlRequestKind::Question,
355            &wait_id,
356            &waitpoint_ids,
357            reason,
358        )),
359    }
360}
361
362async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
363    let action = required_string_arg(args, 0, "request_approval")?;
364    let options = parse_approval_options(args.get(1), "request_approval")?;
365    let keys = current_dispatch_keys();
366    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
367    let trace_id = keys
368        .as_ref()
369        .map(|keys| keys.trace_id.clone())
370        .unwrap_or_else(new_trace_id);
371    let log = ensure_hitl_event_log();
372    let request = HitlRequestEnvelope {
373        request_id: request_id.clone(),
374        kind: HitlRequestKind::Approval,
375        agent: keys
376            .as_ref()
377            .map(|keys| keys.agent.clone())
378            .unwrap_or_default(),
379        trace_id: trace_id.clone(),
380        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
381        requested_at: now_rfc3339(),
382        payload: json!({
383            "action": action,
384            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
385            "quorum": options.quorum,
386            "reviewers": options.reviewers,
387            "deadline_ms": options.deadline.as_millis() as u64,
388        }),
389    };
390    create_request_waitpoint(&log, &request).await?;
391    append_request(&log, &request).await?;
392    maybe_notify_host(&request);
393    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
394
395    match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
396        WaitpointOutcome::Completed(record) => {
397            approval_record_from_waitpoint(&record, "request_approval")
398        }
399        WaitpointOutcome::Timeout => {
400            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
401            Err(timeout_error(&request_id, HitlRequestKind::Approval))
402        }
403        WaitpointOutcome::Cancelled { .. } => {
404            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
405        }
406    }
407}
408
409async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
410    let n = required_positive_int_arg(args, 0, "dual_control")?;
411    let m = required_positive_int_arg(args, 1, "dual_control")?;
412    if n > m {
413        return Err(VmError::Runtime(
414            "dual_control: n must be less than or equal to m".to_string(),
415        ));
416    }
417    let action = args
418        .get(2)
419        .and_then(|value| match value {
420            VmValue::Closure(closure) => Some(closure.clone()),
421            _ => None,
422        })
423        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
424    let approvers = optional_string_list(args.get(3), "dual_control")?;
425    if !approvers.is_empty() && approvers.len() < m as usize {
426        return Err(VmError::Runtime(format!(
427            "dual_control: expected at least {m} approvers, got {}",
428            approvers.len()
429        )));
430    }
431
432    let keys = current_dispatch_keys();
433    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
434    let trace_id = keys
435        .as_ref()
436        .map(|keys| keys.trace_id.clone())
437        .unwrap_or_else(new_trace_id);
438    let action_name = if action.func.name.is_empty() {
439        "anonymous".to_string()
440    } else {
441        action.func.name.clone()
442    };
443    let log = ensure_hitl_event_log();
444    let request = HitlRequestEnvelope {
445        request_id: request_id.clone(),
446        kind: HitlRequestKind::DualControl,
447        agent: keys
448            .as_ref()
449            .map(|keys| keys.agent.clone())
450            .unwrap_or_default(),
451        trace_id: trace_id.clone(),
452        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
453        requested_at: now_rfc3339(),
454        payload: json!({
455            "n": n,
456            "m": m,
457            "action": action_name,
458            "approvers": approvers,
459            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
460        }),
461    };
462    create_request_waitpoint(&log, &request).await?;
463    append_request(&log, &request).await?;
464    maybe_notify_host(&request);
465    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
466
467    match wait_for_request_waitpoint(
468        &request_id,
469        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
470    )
471    .await?
472    {
473        WaitpointOutcome::Completed(record) => {
474            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
475            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
476                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
477            })?;
478            let result = vm.call_closure_pub(&action, &[]).await?;
479
480            append_named_event(
481                &log,
482                HitlRequestKind::DualControl,
483                "hitl.dual_control_executed",
484                &request_id,
485                &trace_id,
486                json!({
487                    "request_id": request_id,
488                    "result": crate::llm::vm_value_to_json(&result),
489                }),
490            )
491            .await?;
492
493            Ok(result)
494        }
495        WaitpointOutcome::Timeout => {
496            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
497            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
498        }
499        WaitpointOutcome::Cancelled { .. } => {
500            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
501        }
502    }
503}
504
505async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
506    let role = required_string_arg(args, 0, "escalate_to")?;
507    let reason = required_string_arg(args, 1, "escalate_to")?;
508    let keys = current_dispatch_keys();
509    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
510    let trace_id = keys
511        .as_ref()
512        .map(|keys| keys.trace_id.clone())
513        .unwrap_or_else(new_trace_id);
514    let log = ensure_hitl_event_log();
515    let request = HitlRequestEnvelope {
516        request_id: request_id.clone(),
517        kind: HitlRequestKind::Escalation,
518        agent: keys
519            .as_ref()
520            .map(|keys| keys.agent.clone())
521            .unwrap_or_default(),
522        trace_id: trace_id.clone(),
523        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
524        requested_at: now_rfc3339(),
525        payload: json!({
526            "role": role,
527            "reason": reason,
528            "capability_policy": escalation_capability_policy(),
529        }),
530    };
531    create_request_waitpoint(&log, &request).await?;
532    append_request(&log, &request).await?;
533    maybe_notify_host(&request);
534    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
535
536    match wait_for_request_waitpoint(&request_id, None).await? {
537        WaitpointOutcome::Completed(record) => {
538            let accepted_at = record.completed_at.clone();
539            let reviewer = record.completed_by.clone();
540            let accepted = record
541                .value
542                .as_ref()
543                .and_then(|value| value.get("accepted"))
544                .and_then(JsonValue::as_bool)
545                .unwrap_or(true);
546            Ok(crate::stdlib::json_to_vm_value(&json!({
547                "request_id": request_id,
548                "role": role,
549                "reason": reason,
550                "trace_id": trace_id,
551                "status": if accepted { "accepted" } else { "pending" },
552                "accepted_at": accepted_at,
553                "reviewer": reviewer,
554            })))
555        }
556        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
557        WaitpointOutcome::Cancelled {
558            wait_id,
559            waitpoint_ids,
560            reason,
561        } => Err(hitl_cancelled_error(
562            &request_id,
563            HitlRequestKind::Escalation,
564            &wait_id,
565            &waitpoint_ids,
566            reason,
567        )),
568    }
569}
570
571async fn create_request_waitpoint(
572    log: &Arc<AnyEventLog>,
573    request: &HitlRequestEnvelope,
574) -> Result<(), VmError> {
575    create_waitpoint_on(
576        log,
577        Some(request.request_id.clone()),
578        Some(json!({
579            "kind": request.kind.as_str(),
580            "agent": request.agent.clone(),
581            "trace_id": request.trace_id.clone(),
582            "requested_at": request.requested_at.clone(),
583            "payload": request.payload.clone(),
584        })),
585    )
586    .await?;
587    Ok(())
588}
589
590async fn wait_for_request_waitpoint(
591    request_id: &str,
592    timeout: Option<StdDuration>,
593) -> Result<WaitpointOutcome, VmError> {
594    match wait_on_waitpoints(
595        vec![request_id.to_string()],
596        WaitpointWaitOptions { timeout },
597    )
598    .await
599    {
600        Ok(records) => Ok(WaitpointOutcome::Completed(
601            records
602                .into_iter()
603                .next()
604                .expect("single waitpoint wait result"),
605        )),
606        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
607        Err(WaitpointWaitFailure::Cancelled {
608            wait_id,
609            waitpoint_ids,
610            reason,
611        }) => Ok(WaitpointOutcome::Cancelled {
612            wait_id,
613            waitpoint_ids,
614            reason,
615        }),
616        Err(WaitpointWaitFailure::Vm(error)) => {
617            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
618                return Ok(outcome);
619            }
620            Err(error)
621        }
622    }
623}
624
625fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
626    let VmError::Thrown(VmValue::Dict(dict)) = error else {
627        return None;
628    };
629    let name = dict.get("name").and_then(vm_string)?;
630    match name {
631        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
632        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
633            wait_id: dict
634                .get("wait_id")
635                .and_then(vm_string)
636                .unwrap_or_default()
637                .to_string(),
638            waitpoint_ids: dict
639                .get("waitpoint_ids")
640                .and_then(vm_string_list)
641                .unwrap_or_default(),
642            reason: dict
643                .get("reason")
644                .and_then(vm_string)
645                .map(ToString::to_string),
646        }),
647        _ => None,
648    }
649}
650
651async fn finalize_hitl_response(
652    log: &Arc<AnyEventLog>,
653    kind: HitlRequestKind,
654    response: &HitlHostResponse,
655) -> Result<(), String> {
656    match kind {
657        HitlRequestKind::Question => {
658            if waitpoint_is_terminal(log, &response.request_id).await? {
659                return Ok(());
660            }
661            complete_waitpoint_on(
662                log,
663                &response.request_id,
664                response.answer.clone(),
665                response.reviewer.clone(),
666                response.reason.clone(),
667                response.metadata.clone(),
668            )
669            .await
670            .map(|_| ())
671            .map_err(|error| error.to_string())
672        }
673        HitlRequestKind::Escalation => {
674            if !response.accepted.unwrap_or(false)
675                || waitpoint_is_terminal(log, &response.request_id).await?
676            {
677                return Ok(());
678            }
679            complete_waitpoint_on(
680                log,
681                &response.request_id,
682                Some(json!({
683                    "accepted": true,
684                    "reviewer": response.reviewer,
685                    "reason": response.reason,
686                    "responded_at": response.responded_at,
687                })),
688                response.reviewer.clone(),
689                response.reason.clone(),
690                response.metadata.clone(),
691            )
692            .await
693            .map(|_| ())
694            .map_err(|error| error.to_string())
695        }
696        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
697            if waitpoint_is_terminal(log, &response.request_id).await? {
698                return Ok(());
699            }
700            let request = load_request_envelope(log, kind, &response.request_id)
701                .await
702                .map_err(|error| error.to_string())?;
703            match resolve_approval_state(log, kind, &request)
704                .await
705                .map_err(|error| error.to_string())?
706            {
707                ApprovalResolution::Pending => Ok(()),
708                ApprovalResolution::Approved(progress) => {
709                    let record = approval_record_json(&progress);
710                    append_named_event(
711                        log,
712                        kind,
713                        approved_event_kind(kind),
714                        &request.request_id,
715                        &request.trace_id,
716                        json!({
717                            "request_id": request.request_id.clone(),
718                            "record": record.clone(),
719                        }),
720                    )
721                    .await
722                    .map_err(|error| error.to_string())?;
723                    complete_waitpoint_on(
724                        log,
725                        &request.request_id,
726                        Some(record),
727                        response.reviewer.clone(),
728                        progress.reason.clone(),
729                        response.metadata.clone(),
730                    )
731                    .await
732                    .map(|_| ())
733                    .map_err(|error| error.to_string())
734                }
735                ApprovalResolution::Denied(denied) => {
736                    append_named_event(
737                        log,
738                        kind,
739                        denied_event_kind(kind),
740                        &request.request_id,
741                        &request.trace_id,
742                        json!({
743                            "request_id": request.request_id.clone(),
744                            "reviewer": denied.reviewer.clone(),
745                            "reason": denied.reason.clone(),
746                        }),
747                    )
748                    .await
749                    .map_err(|error| error.to_string())?;
750                    cancel_waitpoint_on(
751                        log,
752                        &request.request_id,
753                        denied.reviewer.clone(),
754                        denied.reason.clone(),
755                        denied.metadata.clone(),
756                    )
757                    .await
758                    .map(|_| ())
759                    .map_err(|error| error.to_string())
760                }
761            }
762        }
763    }
764}
765
766async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
767    Ok(inspect_waitpoint_on(log, request_id)
768        .await
769        .map_err(|error| error.to_string())?
770        .is_some_and(|record| record.status != WaitpointStatus::Open))
771}
772
773async fn load_request_envelope(
774    log: &Arc<AnyEventLog>,
775    kind: HitlRequestKind,
776    request_id: &str,
777) -> Result<HitlRequestEnvelope, VmError> {
778    let topic = topic(kind)?;
779    let events = log
780        .read_range(&topic, None, usize::MAX)
781        .await
782        .map_err(log_error)?;
783    events
784        .into_iter()
785        .filter(|(_, event)| event.kind == kind.request_event_kind())
786        .find_map(|(_, event)| {
787            if !event_matches_request(&event, request_id) {
788                return None;
789            }
790            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
791        })
792        .ok_or_else(|| {
793            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
794        })
795}
796
797async fn resolve_approval_state(
798    log: &Arc<AnyEventLog>,
799    kind: HitlRequestKind,
800    request: &HitlRequestEnvelope,
801) -> Result<ApprovalResolution, VmError> {
802    let quorum = approval_quorum_from_request(kind, request)?;
803    let allowed_reviewers = approval_reviewers_from_request(kind, request)
804        .into_iter()
805        .collect::<BTreeSet<_>>();
806    let mut progress = ApprovalProgress {
807        reviewers: BTreeSet::new(),
808        signatures: Vec::new(),
809        reason: None,
810        approved_at: None,
811    };
812    let topic = topic(kind)?;
813    let events = log
814        .read_range(&topic, None, usize::MAX)
815        .await
816        .map_err(log_error)?;
817    for (_, event) in events {
818        if !event_matches_request(&event, &request.request_id)
819            || event.kind != "hitl.response_received"
820        {
821            continue;
822        }
823        let response: HitlHostResponse = serde_json::from_value(event.payload)
824            .map_err(|error| VmError::Runtime(error.to_string()))?;
825        if let Some(reviewer) = response.reviewer.as_deref() {
826            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
827                continue;
828            }
829            if progress.reviewers.contains(reviewer) {
830                continue;
831            }
832        }
833        if response.approved.unwrap_or(false) {
834            if let Some(reviewer) = response.reviewer.clone() {
835                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
836                progress.reviewers.insert(reviewer.clone());
837                progress.signatures.push(ApprovalSignature {
838                    reviewer: reviewer.clone(),
839                    signed_at: signed_at.clone(),
840                    signature: response.signature.clone().unwrap_or_else(|| {
841                        approval_receipt_signature(
842                            &request.request_id,
843                            &reviewer,
844                            &signed_at,
845                            true,
846                            response.reason.as_deref(),
847                        )
848                    }),
849                });
850            }
851            progress.reason = response.reason.clone();
852            progress.approved_at = response.responded_at.clone();
853            if progress.reviewers.len() as u32 >= quorum {
854                return Ok(ApprovalResolution::Approved(progress));
855            }
856            continue;
857        }
858        return Ok(ApprovalResolution::Denied(response));
859    }
860    Ok(ApprovalResolution::Pending)
861}
862
863fn approval_quorum_from_request(
864    kind: HitlRequestKind,
865    request: &HitlRequestEnvelope,
866) -> Result<u32, VmError> {
867    let key = match kind {
868        HitlRequestKind::DualControl => "n",
869        _ => "quorum",
870    };
871    let quorum = request
872        .payload
873        .get(key)
874        .and_then(JsonValue::as_u64)
875        .unwrap_or(1);
876    u32::try_from(quorum).map_err(|_| {
877        VmError::Runtime(format!(
878            "invalid quorum in HITL request '{}'",
879            request.request_id
880        ))
881    })
882}
883
884fn approval_reviewers_from_request(
885    kind: HitlRequestKind,
886    request: &HitlRequestEnvelope,
887) -> Vec<String> {
888    let key = match kind {
889        HitlRequestKind::DualControl => "approvers",
890        _ => "reviewers",
891    };
892    request
893        .payload
894        .get(key)
895        .and_then(JsonValue::as_array)
896        .map(|values| {
897            values
898                .iter()
899                .filter_map(JsonValue::as_str)
900                .map(str::to_string)
901                .collect()
902        })
903        .unwrap_or_default()
904}
905
906fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
907    json!({
908        "approved": true,
909        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
910        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
911        "reason": progress.reason,
912        "signatures": progress.signatures,
913    })
914}
915
916fn approval_receipt_signature(
917    request_id: &str,
918    reviewer: &str,
919    signed_at: &str,
920    approved: bool,
921    reason: Option<&str>,
922) -> String {
923    let material = format!(
924        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
925        reason.unwrap_or("")
926    );
927    let hash = sha2::Sha256::digest(material.as_bytes());
928    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
929    format!("sha256:{hex}")
930}
931
932fn approval_record_from_waitpoint(
933    record: &WaitpointRecord,
934    builtin: &str,
935) -> Result<VmValue, VmError> {
936    record
937        .value
938        .as_ref()
939        .map(crate::stdlib::json_to_vm_value)
940        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
941}
942
943async fn approval_wait_error(
944    log: &Arc<AnyEventLog>,
945    kind: HitlRequestKind,
946    request_id: &str,
947) -> VmError {
948    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
949        if record.status == WaitpointStatus::Cancelled
950            && record.reason.as_deref() != Some("upstream_cancelled")
951        {
952            return approval_denied_error(
953                request_id,
954                HitlHostResponse {
955                    request_id: request_id.to_string(),
956                    answer: None,
957                    approved: Some(false),
958                    accepted: None,
959                    reviewer: record.cancelled_by.clone(),
960                    reason: record.reason.clone(),
961                    metadata: record.metadata.clone(),
962                    responded_at: record.cancelled_at.clone(),
963                    signature: None,
964                },
965            );
966        }
967        if record.status == WaitpointStatus::Cancelled {
968            return hitl_cancelled_error(
969                request_id,
970                kind,
971                "",
972                &[request_id.to_string()],
973                record.reason.clone(),
974            );
975        }
976    }
977    hitl_cancelled_error(
978        request_id,
979        kind,
980        "",
981        &[request_id.to_string()],
982        Some("upstream_cancelled".to_string()),
983    )
984}
985
986async fn append_timeout_once(
987    log: &Arc<AnyEventLog>,
988    kind: HitlRequestKind,
989    request_id: &str,
990    trace_id: &str,
991) -> Result<(), VmError> {
992    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
993        return Ok(());
994    }
995    append_timeout(log, kind, request_id, trace_id).await
996}
997
998async fn hitl_event_exists(
999    log: &Arc<AnyEventLog>,
1000    kind: HitlRequestKind,
1001    request_id: &str,
1002    event_kind: &str,
1003) -> Result<bool, VmError> {
1004    let topic = topic(kind)?;
1005    let events = log
1006        .read_range(&topic, None, usize::MAX)
1007        .await
1008        .map_err(log_error)?;
1009    Ok(events
1010        .into_iter()
1011        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1012}
1013
1014fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1015    match kind {
1016        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1017        _ => "hitl.approval_approved",
1018    }
1019}
1020
1021fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1022    match kind {
1023        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1024        _ => "hitl.approval_denied",
1025    }
1026}
1027
1028async fn append_request(
1029    log: &Arc<AnyEventLog>,
1030    request: &HitlRequestEnvelope,
1031) -> Result<(), VmError> {
1032    let topic = topic(request.kind)?;
1033    log.append(
1034        &topic,
1035        LogEvent::new(
1036            request.kind.request_event_kind(),
1037            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1038        )
1039        .with_headers(request_headers(request)),
1040    )
1041    .await
1042    .map(|_| ())
1043    .map_err(log_error)
1044}
1045
1046async fn append_named_event(
1047    log: &Arc<AnyEventLog>,
1048    kind: HitlRequestKind,
1049    event_kind: &str,
1050    request_id: &str,
1051    trace_id: &str,
1052    payload: JsonValue,
1053) -> Result<(), VmError> {
1054    let topic = topic(kind)?;
1055    let headers = headers_with_trace(request_id, trace_id);
1056    log.append(
1057        &topic,
1058        LogEvent::new(event_kind, payload).with_headers(headers),
1059    )
1060    .await
1061    .map(|_| ())
1062    .map_err(log_error)
1063}
1064
1065async fn append_timeout(
1066    log: &Arc<AnyEventLog>,
1067    kind: HitlRequestKind,
1068    request_id: &str,
1069    trace_id: &str,
1070) -> Result<(), VmError> {
1071    append_named_event(
1072        log,
1073        kind,
1074        "hitl.timeout",
1075        request_id,
1076        trace_id,
1077        serde_json::to_value(HitlTimeoutRecord {
1078            request_id: request_id.to_string(),
1079            kind,
1080            trace_id: trace_id.to_string(),
1081            timed_out_at: now_rfc3339(),
1082        })
1083        .map_err(|error| VmError::Runtime(error.to_string()))?,
1084    )
1085    .await
1086}
1087
1088async fn maybe_apply_mock_response(
1089    kind: HitlRequestKind,
1090    request_id: &str,
1091    request_payload: &JsonValue,
1092) -> Result<(), VmError> {
1093    let mut params = request_payload
1094        .as_object()
1095        .cloned()
1096        .unwrap_or_default()
1097        .into_iter()
1098        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1099        .collect::<BTreeMap<_, _>>();
1100    params.insert(
1101        "request_id".to_string(),
1102        VmValue::String(Rc::from(request_id.to_string())),
1103    );
1104    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1105        return Ok(());
1106    };
1107    let value = result?;
1108    let responses = match value {
1109        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1110        other => vec![other],
1111    };
1112    for response in responses {
1113        let response_dict = response.as_dict().ok_or_else(|| {
1114            VmError::Runtime(format!(
1115                "mocked HITL {} response must be a dict or list<dict>",
1116                kind.as_str()
1117            ))
1118        })?;
1119        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1120        append_hitl_response(None, hitl_response)
1121            .await
1122            .map_err(VmError::Runtime)?;
1123    }
1124    Ok(())
1125}
1126
1127fn parse_hitl_response_dict(
1128    request_id: &str,
1129    response_dict: &BTreeMap<String, VmValue>,
1130) -> Result<HitlHostResponse, VmError> {
1131    Ok(HitlHostResponse {
1132        request_id: request_id.to_string(),
1133        answer: response_dict
1134            .get("answer")
1135            .map(crate::llm::vm_value_to_json),
1136        approved: response_dict.get("approved").and_then(vm_bool),
1137        accepted: response_dict.get("accepted").and_then(vm_bool),
1138        reviewer: response_dict.get("reviewer").map(VmValue::display),
1139        reason: response_dict.get("reason").map(VmValue::display),
1140        metadata: response_dict
1141            .get("metadata")
1142            .map(crate::llm::vm_value_to_json),
1143        responded_at: response_dict.get("responded_at").map(VmValue::display),
1144        signature: response_dict.get("signature").map(VmValue::display),
1145    })
1146}
1147
1148fn maybe_notify_host(request: &HitlRequestEnvelope) {
1149    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1150        return;
1151    };
1152    bridge.notify(
1153        "harn.hitl.requested",
1154        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1155    );
1156}
1157
1158fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1159    let Some(value) = value else {
1160        return Ok(AskUserOptions {
1161            schema: None,
1162            timeout: Some(default_question_timeout()),
1163            default: None,
1164        });
1165    };
1166    let dict = value
1167        .as_dict()
1168        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1169    Ok(AskUserOptions {
1170        schema: dict
1171            .get("schema")
1172            .cloned()
1173            .filter(|value| !matches!(value, VmValue::Nil)),
1174        timeout: dict
1175            .get("timeout")
1176            .map(parse_duration_value)
1177            .transpose()?
1178            .or_else(|| Some(default_question_timeout())),
1179        default: dict
1180            .get("default")
1181            .cloned()
1182            .filter(|value| !matches!(value, VmValue::Nil)),
1183    })
1184}
1185
1186fn default_question_timeout() -> StdDuration {
1187    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1188}
1189
1190fn escalation_capability_policy() -> JsonValue {
1191    crate::orchestration::current_execution_policy()
1192        .and_then(|policy| serde_json::to_value(policy).ok())
1193        .unwrap_or(JsonValue::Null)
1194}
1195
1196fn parse_approval_options(
1197    value: Option<&VmValue>,
1198    builtin: &str,
1199) -> Result<ApprovalOptions, VmError> {
1200    let dict = match value {
1201        None => None,
1202        Some(VmValue::Dict(dict)) => Some(dict),
1203        Some(_) => {
1204            return Err(VmError::Runtime(format!(
1205                "{builtin}: options must be a dict"
1206            )))
1207        }
1208    };
1209    let quorum = dict
1210        .and_then(|dict| dict.get("quorum"))
1211        .and_then(VmValue::as_int)
1212        .unwrap_or(1);
1213    if quorum <= 0 {
1214        return Err(VmError::Runtime(format!(
1215            "{builtin}: quorum must be positive"
1216        )));
1217    }
1218    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1219    let deadline = dict
1220        .and_then(|dict| dict.get("deadline"))
1221        .map(parse_duration_value)
1222        .transpose()?
1223        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1224    Ok(ApprovalOptions {
1225        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1226        quorum: quorum as u32,
1227        reviewers,
1228        deadline,
1229    })
1230}
1231
1232fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1233    args.get(idx)
1234        .map(VmValue::display)
1235        .filter(|value| !value.is_empty())
1236        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1237}
1238
1239fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1240    let value = args
1241        .get(idx)
1242        .and_then(VmValue::as_int)
1243        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1244    if value <= 0 {
1245        return Err(VmError::Runtime(format!(
1246            "{builtin}: expected a positive int at {idx}"
1247        )));
1248    }
1249    Ok(value)
1250}
1251
1252fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1253    let Some(value) = value else {
1254        return Ok(Vec::new());
1255    };
1256    match value {
1257        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1258        _ => Err(VmError::Runtime(format!(
1259            "{builtin}: expected list<string>"
1260        ))),
1261    }
1262}
1263
1264fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1265    match value {
1266        VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
1267        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1268        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1269        _ => Err(VmError::Runtime(
1270            "expected a duration or millisecond count".to_string(),
1271        )),
1272    }
1273}
1274
1275fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1276    active_event_log()
1277        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1278}
1279
1280fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1281    if let Some(log) = active_event_log() {
1282        return Ok(log);
1283    }
1284    let Some(base_dir) = base_dir else {
1285        return Ok(install_memory_for_current_thread(
1286            HITL_EVENT_LOG_QUEUE_DEPTH,
1287        ));
1288    };
1289    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1290}
1291
1292fn current_dispatch_keys() -> Option<DispatchKeys> {
1293    let context = current_dispatch_context()?;
1294    let stable_base = context
1295        .replay_of_event_id
1296        .clone()
1297        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1298    let instance_key = format!(
1299        "{}::{}",
1300        context.trigger_event.id.0,
1301        context.replay_of_event_id.as_deref().unwrap_or("live")
1302    );
1303    Some(DispatchKeys {
1304        instance_key,
1305        stable_base,
1306        agent: context.agent_id,
1307        trace_id: context.trigger_event.trace_id.0,
1308    })
1309}
1310
1311fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1312    if let Some(keys) = dispatch_keys {
1313        let seq = REQUEST_SEQUENCE.with(|slot| {
1314            let mut state = slot.borrow_mut();
1315            if state.instance_key != keys.instance_key {
1316                state.instance_key = keys.instance_key.clone();
1317                state.next_seq = 0;
1318            }
1319            state.next_seq += 1;
1320            state.next_seq
1321        });
1322        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1323    }
1324    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1325}
1326
1327fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1328    let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1329    if let Some(run_id) = request.run_id.as_ref() {
1330        headers.insert("run_id".to_string(), run_id.clone());
1331    }
1332    headers
1333}
1334
1335fn response_headers(request_id: &str) -> BTreeMap<String, String> {
1336    let mut headers = BTreeMap::new();
1337    headers.insert("request_id".to_string(), request_id.to_string());
1338    headers
1339}
1340
1341fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1342    let mut headers = response_headers(request_id);
1343    headers.insert("trace_id".to_string(), trace_id.to_string());
1344    headers
1345}
1346
1347fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1348    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1349}
1350
1351fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1352    event
1353        .headers
1354        .get("request_id")
1355        .is_some_and(|value| value == request_id)
1356        || event
1357            .payload
1358            .get("request_id")
1359            .and_then(JsonValue::as_str)
1360            .is_some_and(|value| value == request_id)
1361}
1362
1363fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1364    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1365        "name": "ApprovalDeniedError",
1366        "category": "generic",
1367        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1368        "request_id": request_id,
1369        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1370        "reason": response.reason,
1371    })))
1372}
1373
1374fn hitl_cancelled_error(
1375    request_id: &str,
1376    kind: HitlRequestKind,
1377    wait_id: &str,
1378    waitpoint_ids: &[String],
1379    reason: Option<String>,
1380) -> VmError {
1381    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1382    let message = reason
1383        .clone()
1384        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1385    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1386        "name": "HumanCancelledError",
1387        "category": ErrorCategory::Cancelled.as_str(),
1388        "message": message,
1389        "request_id": request_id,
1390        "kind": kind.as_str(),
1391        "wait_id": wait_id,
1392        "waitpoint_ids": waitpoint_ids,
1393        "reason": reason,
1394    })))
1395}
1396
1397fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1398    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1399    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1400        "name": "HumanTimeoutError",
1401        "category": ErrorCategory::Timeout.as_str(),
1402        "message": format!("{} timed out", kind.as_str()),
1403        "request_id": request_id,
1404        "kind": kind.as_str(),
1405    })))
1406}
1407
1408fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1409    match default {
1410        VmValue::Int(_) => match value {
1411            VmValue::Int(_) => value.clone(),
1412            VmValue::Float(number) => VmValue::Int(*number as i64),
1413            VmValue::String(text) => text
1414                .parse::<i64>()
1415                .map(VmValue::Int)
1416                .unwrap_or_else(|_| default.clone()),
1417            _ => default.clone(),
1418        },
1419        VmValue::Float(_) => match value {
1420            VmValue::Float(_) => value.clone(),
1421            VmValue::Int(number) => VmValue::Float(*number as f64),
1422            VmValue::String(text) => text
1423                .parse::<f64>()
1424                .map(VmValue::Float)
1425                .unwrap_or_else(|_| default.clone()),
1426            _ => default.clone(),
1427        },
1428        VmValue::Bool(_) => match value {
1429            VmValue::Bool(_) => value.clone(),
1430            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1431            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1432            _ => default.clone(),
1433        },
1434        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1435        VmValue::Duration(_) => match value {
1436            VmValue::Duration(_) => value.clone(),
1437            VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1438            _ => default.clone(),
1439        },
1440        VmValue::Nil => value.clone(),
1441        _ => {
1442            if value.type_name() == default.type_name() {
1443                value.clone()
1444            } else {
1445                default.clone()
1446            }
1447        }
1448    }
1449}
1450
1451fn log_error(error: impl std::fmt::Display) -> VmError {
1452    VmError::Runtime(error.to_string())
1453}
1454
1455fn now_rfc3339() -> String {
1456    OffsetDateTime::now_utc()
1457        .format(&Rfc3339)
1458        .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1459}
1460
1461fn new_trace_id() -> String {
1462    format!("trace_{}", Uuid::now_v7())
1463}
1464
1465fn vm_bool(value: &VmValue) -> Option<bool> {
1466    match value {
1467        VmValue::Bool(flag) => Some(*flag),
1468        _ => None,
1469    }
1470}
1471
1472fn vm_string(value: &VmValue) -> Option<&str> {
1473    match value {
1474        VmValue::String(text) => Some(text.as_ref()),
1475        _ => None,
1476    }
1477}
1478
1479fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1480    match value {
1481        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1482        _ => None,
1483    }
1484}
1485
1486#[cfg(test)]
1487mod tests {
1488    use super::{
1489        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1490    };
1491    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1492    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1493
1494    async fn execute_hitl_script(
1495        base_dir: &std::path::Path,
1496        source: &str,
1497    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1498        reset_thread_local_state();
1499        let log = install_default_for_base_dir(base_dir).expect("install event log");
1500        let chunk = compile_source(source).expect("compile source");
1501        let mut vm = Vm::new();
1502        register_vm_stdlib(&mut vm);
1503        vm.set_source_dir(base_dir);
1504        vm.execute(&chunk).await?;
1505        let output = vm.output().trim_end().to_string();
1506        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1507        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1508        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1509        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1510        Ok((
1511            output,
1512            question_events,
1513            approval_events,
1514            dual_control_events,
1515            escalation_events,
1516        ))
1517    }
1518
1519    async fn event_kinds(
1520        log: std::sync::Arc<crate::event_log::AnyEventLog>,
1521        topic: &str,
1522    ) -> Vec<String> {
1523        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1524            .await
1525            .expect("read topic")
1526            .into_iter()
1527            .map(|(_, event)| event.kind)
1528            .collect()
1529    }
1530
1531    #[tokio::test(flavor = "current_thread")]
1532    async fn ask_user_coerces_to_default_type_and_logs_events() {
1533        tokio::task::LocalSet::new()
1534            .run_until(async {
1535                let dir = tempfile::tempdir().expect("tempdir");
1536                let source = r#"
1537pipeline test(task) {
1538  host_mock("hitl", "question", {answer: "9"})
1539  let answer: int = ask_user("Pick a number", {default: 0})
1540  println(answer)
1541}
1542"#;
1543                let (
1544                    output,
1545                    question_events,
1546                    approval_events,
1547                    dual_control_events,
1548                    escalation_events,
1549                ) = execute_hitl_script(dir.path(), source)
1550                    .await
1551                    .expect("script succeeds");
1552                assert_eq!(output, "9");
1553                assert_eq!(
1554                    question_events,
1555                    vec![
1556                        "hitl.question_asked".to_string(),
1557                        "hitl.response_received".to_string()
1558                    ]
1559                );
1560                assert!(approval_events.is_empty());
1561                assert!(dual_control_events.is_empty());
1562                assert!(escalation_events.is_empty());
1563            })
1564            .await;
1565    }
1566
1567    #[tokio::test(flavor = "current_thread")]
1568    async fn request_approval_waits_for_quorum_and_emits_a_record() {
1569        tokio::task::LocalSet::new()
1570            .run_until(async {
1571                let dir = tempfile::tempdir().expect("tempdir");
1572                let source = r#"
1573pipeline test(task) {
1574  host_mock("hitl", "approval", [
1575    {approved: true, reviewer: "alice", reason: "ok"},
1576    {approved: true, reviewer: "bob", reason: "ship it"},
1577  ])
1578  let record = request_approval(
1579    "deploy production",
1580    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1581  )
1582  println(record.approved)
1583  println(len(record.reviewers))
1584  println(record.reviewers[0])
1585  println(record.reviewers[1])
1586}
1587"#;
1588                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1589                    .await
1590                    .expect("script succeeds");
1591                assert_eq!(
1592                    approval_events,
1593                    vec![
1594                        "hitl.approval_requested".to_string(),
1595                        "hitl.response_received".to_string(),
1596                        "hitl.response_received".to_string(),
1597                        "hitl.approval_approved".to_string(),
1598                    ]
1599                );
1600            })
1601            .await;
1602    }
1603
1604    #[tokio::test(flavor = "current_thread")]
1605    async fn request_approval_surfaces_denials_as_typed_errors() {
1606        tokio::task::LocalSet::new()
1607            .run_until(async {
1608                let dir = tempfile::tempdir().expect("tempdir");
1609                let source = r#"
1610pipeline test(task) {
1611  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
1612  let denied = try {
1613    request_approval("drop table", {reviewers: ["alice"]})
1614  }
1615  println(is_err(denied))
1616  println(unwrap_err(denied).name)
1617  println(unwrap_err(denied).reason)
1618}
1619"#;
1620                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1621                    .await
1622                    .expect("script succeeds");
1623                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
1624                assert_eq!(
1625                    approval_events,
1626                    vec![
1627                        "hitl.approval_requested".to_string(),
1628                        "hitl.response_received".to_string(),
1629                        "hitl.approval_denied".to_string(),
1630                    ]
1631                );
1632            })
1633            .await;
1634    }
1635
1636    #[tokio::test(flavor = "current_thread")]
1637    async fn dual_control_executes_action_after_quorum() {
1638        tokio::task::LocalSet::new()
1639            .run_until(async {
1640                let dir = tempfile::tempdir().expect("tempdir");
1641                let source = r#"
1642pipeline test(task) {
1643  host_mock("hitl", "dual_control", [
1644    {approved: true, reviewer: "alice"},
1645    {approved: true, reviewer: "bob"},
1646  ])
1647  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
1648  println(result)
1649}
1650"#;
1651                let (output, _, _, dual_control_events, _) =
1652                    execute_hitl_script(dir.path(), source)
1653                        .await
1654                        .expect("script succeeds");
1655                assert_eq!(output, "launched");
1656                assert_eq!(
1657                    dual_control_events,
1658                    vec![
1659                        "hitl.dual_control_requested".to_string(),
1660                        "hitl.response_received".to_string(),
1661                        "hitl.response_received".to_string(),
1662                        "hitl.dual_control_approved".to_string(),
1663                        "hitl.dual_control_executed".to_string(),
1664                    ]
1665                );
1666            })
1667            .await;
1668    }
1669
1670    #[tokio::test(flavor = "current_thread")]
1671    async fn escalate_to_waits_for_acceptance_event() {
1672        tokio::task::LocalSet::new()
1673            .run_until(async {
1674                let dir = tempfile::tempdir().expect("tempdir");
1675                let source = r#"
1676pipeline test(task) {
1677  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
1678  let handle = escalate_to("admin", "need override")
1679  println(handle.status)
1680  println(handle.reviewer)
1681}
1682"#;
1683                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
1684                    .await
1685                    .expect("script succeeds");
1686                assert_eq!(output, "accepted\nlead");
1687                assert_eq!(
1688                    escalation_events,
1689                    vec![
1690                        "hitl.escalation_issued".to_string(),
1691                        "hitl.escalation_accepted".to_string(),
1692                    ]
1693                );
1694            })
1695            .await;
1696    }
1697}