Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22    cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23    wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24    WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
// NOTE(review): not referenced in the visible part of this file — presumably
// the queue depth used when installing the HITL event log; confirm at the use site.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// 24 hours in milliseconds. Written as `deadline_ms` by
/// `append_approval_request_on` and `dual_control`, and used as the
/// `dual_control` wait timeout.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// 24 hours in milliseconds.
// NOTE(review): not referenced in the visible part of this file — presumably
// the default `ask_user` timeout; confirm against `parse_ask_user_options`.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic for [`HitlRequestKind::Question`] traffic.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic for [`HitlRequestKind::Approval`] traffic.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic for [`HitlRequestKind::DualControl`] traffic.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic for [`HitlRequestKind::Escalation`] traffic.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
39thread_local! {
40    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
41}
42
/// Bookkeeping stored in the thread-local `REQUEST_SEQUENCE` slot.
///
/// The `Default` value (empty `instance_key`, `next_seq == 0`) is what
/// `reset_hitl_state` installs and what `take_hitl_state` leaves behind.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    // NOTE(review): presumably identifies the dispatch instance the counter
    // belongs to — confirm against `next_request_id`.
    pub(crate) instance_key: String,
    // NOTE(review): presumably the next sequence number to hand out — confirm
    // against `next_request_id`.
    pub(crate) next_seq: u64,
}
48
/// The kinds of human-in-the-loop request issued by this module.
///
/// Serialized with serde in `snake_case` (for example `dual_control`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// A question raised by the `ask_user` builtin.
    Question,
    /// An approval raised by `request_approval` or `append_approval_request_on`.
    Approval,
    /// An n-of-m approval raised by the `dual_control` builtin.
    DualControl,
    /// An escalation raised by the `escalate_to` builtin.
    Escalation,
}
57
58impl HitlRequestKind {
59    pub(crate) fn as_str(self) -> &'static str {
60        match self {
61            Self::Question => "question",
62            Self::Approval => "approval",
63            Self::DualControl => "dual_control",
64            Self::Escalation => "escalation",
65        }
66    }
67
68    fn topic(self) -> &'static str {
69        match self {
70            Self::Question => HITL_QUESTIONS_TOPIC,
71            Self::Approval => HITL_APPROVALS_TOPIC,
72            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73            Self::Escalation => HITL_ESCALATIONS_TOPIC,
74        }
75    }
76
77    fn request_event_kind(self) -> &'static str {
78        match self {
79            Self::Question => "hitl.question_asked",
80            Self::Approval => "hitl.approval_requested",
81            Self::DualControl => "hitl.dual_control_requested",
82            Self::Escalation => "hitl.escalation_issued",
83        }
84    }
85
86    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87        if request_id.starts_with("hitl_question_") {
88            Some(Self::Question)
89        } else if request_id.starts_with("hitl_approval_") {
90            Some(Self::Approval)
91        } else if request_id.starts_with("hitl_dual_control_") {
92            Some(Self::DualControl)
93        } else if request_id.starts_with("hitl_escalation_") {
94            Some(Self::Escalation)
95        } else {
96            None
97        }
98    }
99}
100
/// A host-supplied response to a HITL request, as recorded by
/// `append_hitl_response`.
///
/// All optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix selects the request kind.
    pub request_id: String,
    /// Answer value; used to complete a question waitpoint.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval verdict; a missing value is treated as "not approved" when
    /// approval state is resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation acceptance; a missing value is treated as "not accepted".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Identity of the responder; checked against the request's reviewer list
    /// for approvals.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason accompanying the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra data passed through to the waitpoint completion/cancellation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills in the current time
    /// when the host leaves it unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    /// Reviewer-supplied signature; when absent on an approval, a receipt
    /// signature is computed instead.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
121
/// A HITL request as built by the builtins in this module and read back from
/// request events by `load_request_envelope`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Unique id of the request; also used as the waitpoint id.
    request_id: String,
    /// Which kind of HITL request this is.
    kind: HitlRequestKind,
    /// Requesting agent; deserializes to an empty string when missing.
    #[serde(default)]
    agent: String,
    /// Trace id taken from the dispatch keys, or freshly generated when none
    /// are available.
    trace_id: String,
    /// Creation timestamp produced by `now_rfc3339`.
    requested_at: String,
    /// Kind-specific request data (prompt, action, quorum, reviewers, ...).
    payload: JsonValue,
}
132
/// Serializable record describing a timed-out HITL request.
// NOTE(review): not constructed in the visible part of this file — presumably
// written by `append_timeout_once`; confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    request_id: String,
    kind: HitlRequestKind,
    trace_id: String,
    timed_out_at: String,
}
140
/// Identifiers returned by `current_dispatch_keys` and consumed when building
/// requests.
#[derive(Clone, Debug)]
struct DispatchKeys {
    // NOTE(review): `instance_key` and `stable_base` are not read in the
    // visible part of this file — presumably inputs to `next_request_id`; confirm.
    instance_key: String,
    stable_base: String,
    /// Copied into the `agent` field of request envelopes.
    agent: String,
    /// Copied into the `trace_id` field of request envelopes.
    trace_id: String,
}
148
/// Parsed options for the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// When set, the answer is validated through `schema_expect_value`.
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` is passed through to the wait as-is.
    timeout: Option<StdDuration>,
    /// Returned on timeout, and used to coerce the answer when no schema is set.
    default: Option<VmValue>,
}
155
/// Parsed options for the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Optional detail value written into the request payload.
    detail: Option<VmValue>,
    /// Quorum value written into the request payload.
    quorum: u32,
    /// Reviewer list written into the request payload.
    reviewers: Vec<String>,
    /// How long `request_approval` waits before reporting a timeout.
    deadline: StdDuration,
}
163
/// Running tally built by `resolve_approval_state` while replaying responses.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Distinct reviewers whose approval has been counted.
    reviewers: BTreeSet<String>,
    /// One signature per counted reviewer, in replay order.
    signatures: Vec<ApprovalSignature>,
    /// Reason taken from the most recently processed approving response.
    reason: Option<String>,
    /// `responded_at` of the most recently processed approving response.
    approved_at: Option<String>,
}
171
/// A single reviewer's approval signature, serialized into the approval record.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Reviewer who approved.
    reviewer: String,
    /// The response's `responded_at`, or the current time when it was unset.
    signed_at: String,
    /// Host-supplied signature, or the computed `sha256:` receipt signature.
    signature: String,
}
178
/// Result of replaying the responses logged for an approval request.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// No decisive outcome yet; the waitpoint is left untouched.
    Pending,
    /// Enough distinct reviewers approved; carries the collected progress.
    Approved(ApprovalProgress),
    /// A non-approving response was found; carries that response.
    Denied(HitlHostResponse),
}
185
/// Outcome of waiting on a request's waitpoint, as produced by
/// `wait_for_request_waitpoint`.
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its record.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancelled {
        wait_id: String,
        waitpoint_ids: Vec<String>,
        reason: Option<String>,
    },
}
196
/// Registers the HITL async builtins (`ask_user`, `request_approval`,
/// `dual_control`, `escalate_to`) on `vm`.
///
/// Each builtin forwards its argument list to the matching `*_impl` function
/// in this module.
pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
    vm.register_async_builtin("ask_user", |args| {
        Box::pin(async move { ask_user_impl(&args).await })
    });

    vm.register_async_builtin("request_approval", |args| {
        Box::pin(async move { request_approval_impl(&args).await })
    });

    vm.register_async_builtin("dual_control", |args| {
        Box::pin(async move { dual_control_impl(&args).await })
    });

    vm.register_async_builtin("escalate_to", |args| {
        Box::pin(async move { escalate_to_impl(&args).await })
    });
}
214
215pub(crate) fn reset_hitl_state() {
216    REQUEST_SEQUENCE.with(|slot| {
217        *slot.borrow_mut() = RequestSequenceState::default();
218    });
219}
220
221pub(crate) fn take_hitl_state() -> RequestSequenceState {
222    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
223}
224
225pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
226    REQUEST_SEQUENCE.with(|slot| {
227        *slot.borrow_mut() = state;
228    });
229}
230
231pub async fn append_hitl_response(
232    base_dir: Option<&Path>,
233    mut response: HitlHostResponse,
234) -> Result<u64, String> {
235    let kind = HitlRequestKind::from_request_id(&response.request_id)
236        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
237    if response.responded_at.is_none() {
238        response.responded_at = Some(now_rfc3339());
239    }
240    let log = ensure_hitl_event_log_for(base_dir)?;
241    let headers = response_headers(&response.request_id);
242    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
243    let event_id = log
244        .append(
245            &topic,
246            LogEvent::new(
247                match kind {
248                    HitlRequestKind::Escalation => "hitl.escalation_accepted",
249                    _ => "hitl.response_received",
250                },
251                serde_json::to_value(&response).map_err(|error| error.to_string())?,
252            )
253            .with_headers(headers),
254        )
255        .await
256        .map_err(|error| error.to_string())?;
257    finalize_hitl_response(&log, kind, &response).await?;
258    Ok(event_id)
259}
260
261pub async fn append_approval_request_on(
262    log: &Arc<AnyEventLog>,
263    agent: impl Into<String>,
264    trace_id: impl Into<String>,
265    action: impl Into<String>,
266    detail: JsonValue,
267    reviewers: Vec<String>,
268) -> Result<String, VmError> {
269    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
270    let trace_id = trace_id.into();
271    let request = HitlRequestEnvelope {
272        request_id: request_id.clone(),
273        kind: HitlRequestKind::Approval,
274        agent: agent.into(),
275        trace_id: trace_id.clone(),
276        requested_at: now_rfc3339(),
277        payload: json!({
278            "action": action.into(),
279            "detail": detail,
280            "quorum": 1,
281            "reviewers": reviewers,
282            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
283        }),
284    };
285    create_request_waitpoint(log, &request).await?;
286    append_request(log, &request).await?;
287    maybe_notify_host(&request);
288    Ok(request_id)
289}
290
291async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
292    let prompt = required_string_arg(args, 0, "ask_user")?;
293    let options = parse_ask_user_options(args.get(1))?;
294    let keys = current_dispatch_keys();
295    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
296    let trace_id = keys
297        .as_ref()
298        .map(|keys| keys.trace_id.clone())
299        .unwrap_or_else(new_trace_id);
300    let log = ensure_hitl_event_log();
301    let request = HitlRequestEnvelope {
302        request_id: request_id.clone(),
303        kind: HitlRequestKind::Question,
304        agent: keys
305            .as_ref()
306            .map(|keys| keys.agent.clone())
307            .unwrap_or_default(),
308        trace_id: trace_id.clone(),
309        requested_at: now_rfc3339(),
310        payload: json!({
311            "prompt": prompt,
312            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
313            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
314            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
315        }),
316    };
317    create_request_waitpoint(&log, &request).await?;
318    append_request(&log, &request).await?;
319    maybe_notify_host(&request);
320    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
321
322    match wait_for_request_waitpoint(&request_id, options.timeout).await? {
323        WaitpointOutcome::Completed(record) => {
324            let answer = record
325                .value
326                .as_ref()
327                .map(crate::stdlib::json_to_vm_value)
328                .unwrap_or(VmValue::Nil);
329            if let Some(schema) = options.schema.as_ref() {
330                return schema_expect_value(&answer, schema, true);
331            }
332            if let Some(default) = options.default.as_ref() {
333                return Ok(coerce_like_default(&answer, default));
334            }
335            Ok(answer)
336        }
337        WaitpointOutcome::Timeout => {
338            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
339            if let Some(default) = options.default {
340                return Ok(default);
341            }
342            Err(timeout_error(&request_id, HitlRequestKind::Question))
343        }
344        WaitpointOutcome::Cancelled {
345            wait_id,
346            waitpoint_ids,
347            reason,
348        } => Err(hitl_cancelled_error(
349            &request_id,
350            HitlRequestKind::Question,
351            &wait_id,
352            &waitpoint_ids,
353            reason,
354        )),
355    }
356}
357
358async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
359    let action = required_string_arg(args, 0, "request_approval")?;
360    let options = parse_approval_options(args.get(1), "request_approval")?;
361    let keys = current_dispatch_keys();
362    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
363    let trace_id = keys
364        .as_ref()
365        .map(|keys| keys.trace_id.clone())
366        .unwrap_or_else(new_trace_id);
367    let log = ensure_hitl_event_log();
368    let request = HitlRequestEnvelope {
369        request_id: request_id.clone(),
370        kind: HitlRequestKind::Approval,
371        agent: keys
372            .as_ref()
373            .map(|keys| keys.agent.clone())
374            .unwrap_or_default(),
375        trace_id: trace_id.clone(),
376        requested_at: now_rfc3339(),
377        payload: json!({
378            "action": action,
379            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
380            "quorum": options.quorum,
381            "reviewers": options.reviewers,
382            "deadline_ms": options.deadline.as_millis() as u64,
383        }),
384    };
385    create_request_waitpoint(&log, &request).await?;
386    append_request(&log, &request).await?;
387    maybe_notify_host(&request);
388    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
389
390    match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
391        WaitpointOutcome::Completed(record) => {
392            approval_record_from_waitpoint(&record, "request_approval")
393        }
394        WaitpointOutcome::Timeout => {
395            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
396            Err(timeout_error(&request_id, HitlRequestKind::Approval))
397        }
398        WaitpointOutcome::Cancelled { .. } => {
399            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
400        }
401    }
402}
403
404async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
405    let n = required_positive_int_arg(args, 0, "dual_control")?;
406    let m = required_positive_int_arg(args, 1, "dual_control")?;
407    if n > m {
408        return Err(VmError::Runtime(
409            "dual_control: n must be less than or equal to m".to_string(),
410        ));
411    }
412    let action = args
413        .get(2)
414        .and_then(|value| match value {
415            VmValue::Closure(closure) => Some(closure.clone()),
416            _ => None,
417        })
418        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
419    let approvers = optional_string_list(args.get(3), "dual_control")?;
420    if !approvers.is_empty() && approvers.len() < m as usize {
421        return Err(VmError::Runtime(format!(
422            "dual_control: expected at least {m} approvers, got {}",
423            approvers.len()
424        )));
425    }
426
427    let keys = current_dispatch_keys();
428    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
429    let trace_id = keys
430        .as_ref()
431        .map(|keys| keys.trace_id.clone())
432        .unwrap_or_else(new_trace_id);
433    let action_name = if action.func.name.is_empty() {
434        "anonymous".to_string()
435    } else {
436        action.func.name.clone()
437    };
438    let log = ensure_hitl_event_log();
439    let request = HitlRequestEnvelope {
440        request_id: request_id.clone(),
441        kind: HitlRequestKind::DualControl,
442        agent: keys
443            .as_ref()
444            .map(|keys| keys.agent.clone())
445            .unwrap_or_default(),
446        trace_id: trace_id.clone(),
447        requested_at: now_rfc3339(),
448        payload: json!({
449            "n": n,
450            "m": m,
451            "action": action_name,
452            "approvers": approvers,
453            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
454        }),
455    };
456    create_request_waitpoint(&log, &request).await?;
457    append_request(&log, &request).await?;
458    maybe_notify_host(&request);
459    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
460
461    match wait_for_request_waitpoint(
462        &request_id,
463        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
464    )
465    .await?
466    {
467        WaitpointOutcome::Completed(record) => {
468            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
469            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
470                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
471            })?;
472            let result = vm.call_closure_pub(&action, &[]).await?;
473
474            append_named_event(
475                &log,
476                HitlRequestKind::DualControl,
477                "hitl.dual_control_executed",
478                &request_id,
479                &trace_id,
480                json!({
481                    "request_id": request_id,
482                    "result": crate::llm::vm_value_to_json(&result),
483                }),
484            )
485            .await?;
486
487            Ok(result)
488        }
489        WaitpointOutcome::Timeout => {
490            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
491            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
492        }
493        WaitpointOutcome::Cancelled { .. } => {
494            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
495        }
496    }
497}
498
499async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
500    let role = required_string_arg(args, 0, "escalate_to")?;
501    let reason = required_string_arg(args, 1, "escalate_to")?;
502    let keys = current_dispatch_keys();
503    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
504    let trace_id = keys
505        .as_ref()
506        .map(|keys| keys.trace_id.clone())
507        .unwrap_or_else(new_trace_id);
508    let log = ensure_hitl_event_log();
509    let request = HitlRequestEnvelope {
510        request_id: request_id.clone(),
511        kind: HitlRequestKind::Escalation,
512        agent: keys
513            .as_ref()
514            .map(|keys| keys.agent.clone())
515            .unwrap_or_default(),
516        trace_id: trace_id.clone(),
517        requested_at: now_rfc3339(),
518        payload: json!({
519            "role": role,
520            "reason": reason,
521            "capability_policy": escalation_capability_policy(),
522        }),
523    };
524    create_request_waitpoint(&log, &request).await?;
525    append_request(&log, &request).await?;
526    maybe_notify_host(&request);
527    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
528
529    match wait_for_request_waitpoint(&request_id, None).await? {
530        WaitpointOutcome::Completed(record) => {
531            let accepted_at = record.completed_at.clone();
532            let reviewer = record.completed_by.clone();
533            let accepted = record
534                .value
535                .as_ref()
536                .and_then(|value| value.get("accepted"))
537                .and_then(JsonValue::as_bool)
538                .unwrap_or(true);
539            Ok(crate::stdlib::json_to_vm_value(&json!({
540                "request_id": request_id,
541                "role": role,
542                "reason": reason,
543                "trace_id": trace_id,
544                "status": if accepted { "accepted" } else { "pending" },
545                "accepted_at": accepted_at,
546                "reviewer": reviewer,
547            })))
548        }
549        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
550        WaitpointOutcome::Cancelled {
551            wait_id,
552            waitpoint_ids,
553            reason,
554        } => Err(hitl_cancelled_error(
555            &request_id,
556            HitlRequestKind::Escalation,
557            &wait_id,
558            &waitpoint_ids,
559            reason,
560        )),
561    }
562}
563
564async fn create_request_waitpoint(
565    log: &Arc<AnyEventLog>,
566    request: &HitlRequestEnvelope,
567) -> Result<(), VmError> {
568    create_waitpoint_on(
569        log,
570        Some(request.request_id.clone()),
571        Some(json!({
572            "kind": request.kind.as_str(),
573            "agent": request.agent.clone(),
574            "trace_id": request.trace_id.clone(),
575            "requested_at": request.requested_at.clone(),
576            "payload": request.payload.clone(),
577        })),
578    )
579    .await?;
580    Ok(())
581}
582
583async fn wait_for_request_waitpoint(
584    request_id: &str,
585    timeout: Option<StdDuration>,
586) -> Result<WaitpointOutcome, VmError> {
587    match wait_on_waitpoints(
588        vec![request_id.to_string()],
589        WaitpointWaitOptions { timeout },
590    )
591    .await
592    {
593        Ok(records) => Ok(WaitpointOutcome::Completed(
594            records
595                .into_iter()
596                .next()
597                .expect("single waitpoint wait result"),
598        )),
599        Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
600        Err(WaitpointWaitFailure::Cancelled {
601            wait_id,
602            waitpoint_ids,
603            reason,
604        }) => Ok(WaitpointOutcome::Cancelled {
605            wait_id,
606            waitpoint_ids,
607            reason,
608        }),
609        Err(WaitpointWaitFailure::Vm(error)) => {
610            if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
611                return Ok(outcome);
612            }
613            Err(error)
614        }
615    }
616}
617
618fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
619    let VmError::Thrown(VmValue::Dict(dict)) = error else {
620        return None;
621    };
622    let name = dict.get("name").and_then(vm_string)?;
623    match name {
624        "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
625        "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
626            wait_id: dict
627                .get("wait_id")
628                .and_then(vm_string)
629                .unwrap_or_default()
630                .to_string(),
631            waitpoint_ids: dict
632                .get("waitpoint_ids")
633                .and_then(vm_string_list)
634                .unwrap_or_default(),
635            reason: dict
636                .get("reason")
637                .and_then(vm_string)
638                .map(ToString::to_string),
639        }),
640        _ => None,
641    }
642}
643
/// Applies a freshly logged host response to the request's waitpoint.
///
/// Behavior by `kind`:
/// - `Question`: completes the waitpoint with `response.answer`.
/// - `Escalation`: completes the waitpoint only when `response.accepted` is
///   `Some(true)`; any other response leaves the waitpoint untouched.
/// - `Approval` / `DualControl`: reloads the request envelope, re-evaluates
///   all logged responses via `resolve_approval_state`, and then either does
///   nothing (still pending), appends the approved event and completes the
///   waitpoint, or appends the denied event and cancels the waitpoint.
///
/// In every case a waitpoint whose status is no longer `Open` is left alone,
/// so late or duplicate responses are no-ops here.
///
/// # Errors
/// Returns the stringified error from any event-log or waitpoint operation.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            // Already resolved (completed/cancelled/...): nothing to do.
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // Only an explicit acceptance completes an escalation; the cheap
            // flag check runs before the waitpoint lookup.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            // The quorum and reviewer list live in the original request, so
            // the envelope has to be read back from the log.
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                // Quorum not reached yet: keep the waitpoint open.
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // Log the approval before completing the waitpoint.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    // Log the denial, then cancel the waitpoint using the
                    // denying response's reviewer/reason/metadata.
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
758
759async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
760    Ok(inspect_waitpoint_on(log, request_id)
761        .await
762        .map_err(|error| error.to_string())?
763        .is_some_and(|record| record.status != WaitpointStatus::Open))
764}
765
766async fn load_request_envelope(
767    log: &Arc<AnyEventLog>,
768    kind: HitlRequestKind,
769    request_id: &str,
770) -> Result<HitlRequestEnvelope, VmError> {
771    let topic = topic(kind)?;
772    let events = log
773        .read_range(&topic, None, usize::MAX)
774        .await
775        .map_err(log_error)?;
776    events
777        .into_iter()
778        .filter(|(_, event)| event.kind == kind.request_event_kind())
779        .find_map(|(_, event)| {
780            if !event_matches_request(&event, request_id) {
781                return None;
782            }
783            serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
784        })
785        .ok_or_else(|| {
786            VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
787        })
788}
789
/// Replays the `hitl.response_received` events logged for `request` and
/// decides whether the approval is still pending, approved, or denied.
///
/// Responses are processed in the order `read_range` returns them:
/// - a response naming a reviewer outside the request's reviewer list (when
///   that list is non-empty) is ignored;
/// - a response from a reviewer who has already been counted is ignored;
/// - an approving response that names a reviewer adds that reviewer and a
///   signature (the supplied one, or a computed receipt signature);
/// - the first remaining response that is not an approval (including one with
///   `approved` unset) yields `Denied`.
///
/// `Approved` is returned as soon as the number of distinct counted reviewers
/// reaches the request's quorum; if the log is exhausted first, `Pending`.
///
/// # Errors
/// Returns a runtime error when the quorum is invalid, the log cannot be
/// read, or a matching event's payload is not a valid `HitlHostResponse`.
async fn resolve_approval_state(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    request: &HitlRequestEnvelope,
) -> Result<ApprovalResolution, VmError> {
    let quorum = approval_quorum_from_request(kind, request)?;
    // An empty set means "any reviewer is acceptable".
    let allowed_reviewers = approval_reviewers_from_request(kind, request)
        .into_iter()
        .collect::<BTreeSet<_>>();
    let mut progress = ApprovalProgress {
        reviewers: BTreeSet::new(),
        signatures: Vec::new(),
        reason: None,
        approved_at: None,
    };
    let topic = topic(kind)?;
    let events = log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(log_error)?;
    for (_, event) in events {
        // Only responses addressed to this request take part in the replay.
        if !event_matches_request(&event, &request.request_id)
            || event.kind != "hitl.response_received"
        {
            continue;
        }
        let response: HitlHostResponse = serde_json::from_value(event.payload)
            .map_err(|error| VmError::Runtime(error.to_string()))?;
        // NOTE(review): a response without a reviewer skips both checks below,
        // so an anonymous denial is honored even when a reviewer list is set —
        // confirm this is intended.
        if let Some(reviewer) = response.reviewer.as_deref() {
            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
                continue;
            }
            // Each reviewer is counted once; later responses from them are ignored.
            if progress.reviewers.contains(reviewer) {
                continue;
            }
        }
        if response.approved.unwrap_or(false) {
            // Only approvals that name a reviewer count toward the quorum.
            if let Some(reviewer) = response.reviewer.clone() {
                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
                progress.reviewers.insert(reviewer.clone());
                progress.signatures.push(ApprovalSignature {
                    reviewer: reviewer.clone(),
                    signed_at: signed_at.clone(),
                    // Fall back to a computed receipt when the host sent no signature.
                    signature: response.signature.clone().unwrap_or_else(|| {
                        approval_receipt_signature(
                            &request.request_id,
                            &reviewer,
                            &signed_at,
                            true,
                            response.reason.as_deref(),
                        )
                    }),
                });
            }
            // The latest approving response determines the reported reason/time.
            progress.reason = response.reason.clone();
            progress.approved_at = response.responded_at.clone();
            if progress.reviewers.len() as u32 >= quorum {
                return Ok(ApprovalResolution::Approved(progress));
            }
            continue;
        }
        // First effective non-approval wins: the request is denied.
        return Ok(ApprovalResolution::Denied(response));
    }
    Ok(ApprovalResolution::Pending)
}
855
856fn approval_quorum_from_request(
857    kind: HitlRequestKind,
858    request: &HitlRequestEnvelope,
859) -> Result<u32, VmError> {
860    let key = match kind {
861        HitlRequestKind::DualControl => "n",
862        _ => "quorum",
863    };
864    let quorum = request
865        .payload
866        .get(key)
867        .and_then(JsonValue::as_u64)
868        .unwrap_or(1);
869    u32::try_from(quorum).map_err(|_| {
870        VmError::Runtime(format!(
871            "invalid quorum in HITL request '{}'",
872            request.request_id
873        ))
874    })
875}
876
877fn approval_reviewers_from_request(
878    kind: HitlRequestKind,
879    request: &HitlRequestEnvelope,
880) -> Vec<String> {
881    let key = match kind {
882        HitlRequestKind::DualControl => "approvers",
883        _ => "reviewers",
884    };
885    request
886        .payload
887        .get(key)
888        .and_then(JsonValue::as_array)
889        .map(|values| {
890            values
891                .iter()
892                .filter_map(JsonValue::as_str)
893                .map(str::to_string)
894                .collect()
895        })
896        .unwrap_or_default()
897}
898
899fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
900    json!({
901        "approved": true,
902        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
903        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
904        "reason": progress.reason,
905        "signatures": progress.signatures,
906    })
907}
908
909fn approval_receipt_signature(
910    request_id: &str,
911    reviewer: &str,
912    signed_at: &str,
913    approved: bool,
914    reason: Option<&str>,
915) -> String {
916    let material = format!(
917        "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
918        reason.unwrap_or("")
919    );
920    let hash = sha2::Sha256::digest(material.as_bytes());
921    let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
922    format!("sha256:{hex}")
923}
924
925fn approval_record_from_waitpoint(
926    record: &WaitpointRecord,
927    builtin: &str,
928) -> Result<VmValue, VmError> {
929    record
930        .value
931        .as_ref()
932        .map(crate::stdlib::json_to_vm_value)
933        .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
934}
935
936async fn approval_wait_error(
937    log: &Arc<AnyEventLog>,
938    kind: HitlRequestKind,
939    request_id: &str,
940) -> VmError {
941    if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
942        if record.status == WaitpointStatus::Cancelled
943            && record.reason.as_deref() != Some("upstream_cancelled")
944        {
945            return approval_denied_error(
946                request_id,
947                HitlHostResponse {
948                    request_id: request_id.to_string(),
949                    answer: None,
950                    approved: Some(false),
951                    accepted: None,
952                    reviewer: record.cancelled_by.clone(),
953                    reason: record.reason.clone(),
954                    metadata: record.metadata.clone(),
955                    responded_at: record.cancelled_at.clone(),
956                    signature: None,
957                },
958            );
959        }
960        if record.status == WaitpointStatus::Cancelled {
961            return hitl_cancelled_error(
962                request_id,
963                kind,
964                "",
965                &[request_id.to_string()],
966                record.reason.clone(),
967            );
968        }
969    }
970    hitl_cancelled_error(
971        request_id,
972        kind,
973        "",
974        &[request_id.to_string()],
975        Some("upstream_cancelled".to_string()),
976    )
977}
978
979async fn append_timeout_once(
980    log: &Arc<AnyEventLog>,
981    kind: HitlRequestKind,
982    request_id: &str,
983    trace_id: &str,
984) -> Result<(), VmError> {
985    if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
986        return Ok(());
987    }
988    append_timeout(log, kind, request_id, trace_id).await
989}
990
991async fn hitl_event_exists(
992    log: &Arc<AnyEventLog>,
993    kind: HitlRequestKind,
994    request_id: &str,
995    event_kind: &str,
996) -> Result<bool, VmError> {
997    let topic = topic(kind)?;
998    let events = log
999        .read_range(&topic, None, usize::MAX)
1000        .await
1001        .map_err(log_error)?;
1002    Ok(events
1003        .into_iter()
1004        .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1005}
1006
1007fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1008    match kind {
1009        HitlRequestKind::DualControl => "hitl.dual_control_approved",
1010        _ => "hitl.approval_approved",
1011    }
1012}
1013
1014fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1015    match kind {
1016        HitlRequestKind::DualControl => "hitl.dual_control_denied",
1017        _ => "hitl.approval_denied",
1018    }
1019}
1020
1021async fn append_request(
1022    log: &Arc<AnyEventLog>,
1023    request: &HitlRequestEnvelope,
1024) -> Result<(), VmError> {
1025    let topic = topic(request.kind)?;
1026    log.append(
1027        &topic,
1028        LogEvent::new(
1029            request.kind.request_event_kind(),
1030            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1031        )
1032        .with_headers(request_headers(request)),
1033    )
1034    .await
1035    .map(|_| ())
1036    .map_err(log_error)
1037}
1038
1039async fn append_named_event(
1040    log: &Arc<AnyEventLog>,
1041    kind: HitlRequestKind,
1042    event_kind: &str,
1043    request_id: &str,
1044    trace_id: &str,
1045    payload: JsonValue,
1046) -> Result<(), VmError> {
1047    let topic = topic(kind)?;
1048    let headers = headers_with_trace(request_id, trace_id);
1049    log.append(
1050        &topic,
1051        LogEvent::new(event_kind, payload).with_headers(headers),
1052    )
1053    .await
1054    .map(|_| ())
1055    .map_err(log_error)
1056}
1057
1058async fn append_timeout(
1059    log: &Arc<AnyEventLog>,
1060    kind: HitlRequestKind,
1061    request_id: &str,
1062    trace_id: &str,
1063) -> Result<(), VmError> {
1064    append_named_event(
1065        log,
1066        kind,
1067        "hitl.timeout",
1068        request_id,
1069        trace_id,
1070        serde_json::to_value(HitlTimeoutRecord {
1071            request_id: request_id.to_string(),
1072            kind,
1073            trace_id: trace_id.to_string(),
1074            timed_out_at: now_rfc3339(),
1075        })
1076        .map_err(|error| VmError::Runtime(error.to_string()))?,
1077    )
1078    .await
1079}
1080
1081async fn maybe_apply_mock_response(
1082    kind: HitlRequestKind,
1083    request_id: &str,
1084    request_payload: &JsonValue,
1085) -> Result<(), VmError> {
1086    let mut params = request_payload
1087        .as_object()
1088        .cloned()
1089        .unwrap_or_default()
1090        .into_iter()
1091        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1092        .collect::<BTreeMap<_, _>>();
1093    params.insert(
1094        "request_id".to_string(),
1095        VmValue::String(Rc::from(request_id.to_string())),
1096    );
1097    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
1098        return Ok(());
1099    };
1100    let value = result?;
1101    let responses = match value {
1102        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1103        other => vec![other],
1104    };
1105    for response in responses {
1106        let response_dict = response.as_dict().ok_or_else(|| {
1107            VmError::Runtime(format!(
1108                "mocked HITL {} response must be a dict or list<dict>",
1109                kind.as_str()
1110            ))
1111        })?;
1112        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1113        append_hitl_response(None, hitl_response)
1114            .await
1115            .map_err(VmError::Runtime)?;
1116    }
1117    Ok(())
1118}
1119
1120fn parse_hitl_response_dict(
1121    request_id: &str,
1122    response_dict: &BTreeMap<String, VmValue>,
1123) -> Result<HitlHostResponse, VmError> {
1124    Ok(HitlHostResponse {
1125        request_id: request_id.to_string(),
1126        answer: response_dict
1127            .get("answer")
1128            .map(crate::llm::vm_value_to_json),
1129        approved: response_dict.get("approved").and_then(vm_bool),
1130        accepted: response_dict.get("accepted").and_then(vm_bool),
1131        reviewer: response_dict.get("reviewer").map(VmValue::display),
1132        reason: response_dict.get("reason").map(VmValue::display),
1133        metadata: response_dict
1134            .get("metadata")
1135            .map(crate::llm::vm_value_to_json),
1136        responded_at: response_dict.get("responded_at").map(VmValue::display),
1137        signature: response_dict.get("signature").map(VmValue::display),
1138    })
1139}
1140
1141fn maybe_notify_host(request: &HitlRequestEnvelope) {
1142    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1143        return;
1144    };
1145    bridge.notify(
1146        "harn.hitl.requested",
1147        serde_json::to_value(request).unwrap_or(JsonValue::Null),
1148    );
1149}
1150
1151fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1152    let Some(value) = value else {
1153        return Ok(AskUserOptions {
1154            schema: None,
1155            timeout: Some(default_question_timeout()),
1156            default: None,
1157        });
1158    };
1159    let dict = value
1160        .as_dict()
1161        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1162    Ok(AskUserOptions {
1163        schema: dict
1164            .get("schema")
1165            .cloned()
1166            .filter(|value| !matches!(value, VmValue::Nil)),
1167        timeout: dict
1168            .get("timeout")
1169            .map(parse_duration_value)
1170            .transpose()?
1171            .or_else(|| Some(default_question_timeout())),
1172        default: dict
1173            .get("default")
1174            .cloned()
1175            .filter(|value| !matches!(value, VmValue::Nil)),
1176    })
1177}
1178
1179fn default_question_timeout() -> StdDuration {
1180    StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1181}
1182
1183fn escalation_capability_policy() -> JsonValue {
1184    crate::orchestration::current_execution_policy()
1185        .and_then(|policy| serde_json::to_value(policy).ok())
1186        .unwrap_or(JsonValue::Null)
1187}
1188
1189fn parse_approval_options(
1190    value: Option<&VmValue>,
1191    builtin: &str,
1192) -> Result<ApprovalOptions, VmError> {
1193    let dict = match value {
1194        None => None,
1195        Some(VmValue::Dict(dict)) => Some(dict),
1196        Some(_) => {
1197            return Err(VmError::Runtime(format!(
1198                "{builtin}: options must be a dict"
1199            )))
1200        }
1201    };
1202    let quorum = dict
1203        .and_then(|dict| dict.get("quorum"))
1204        .and_then(VmValue::as_int)
1205        .unwrap_or(1);
1206    if quorum <= 0 {
1207        return Err(VmError::Runtime(format!(
1208            "{builtin}: quorum must be positive"
1209        )));
1210    }
1211    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1212    let deadline = dict
1213        .and_then(|dict| dict.get("deadline"))
1214        .map(parse_duration_value)
1215        .transpose()?
1216        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1217    Ok(ApprovalOptions {
1218        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1219        quorum: quorum as u32,
1220        reviewers,
1221        deadline,
1222    })
1223}
1224
1225fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1226    args.get(idx)
1227        .map(VmValue::display)
1228        .filter(|value| !value.is_empty())
1229        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1230}
1231
1232fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1233    let value = args
1234        .get(idx)
1235        .and_then(VmValue::as_int)
1236        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1237    if value <= 0 {
1238        return Err(VmError::Runtime(format!(
1239            "{builtin}: expected a positive int at {idx}"
1240        )));
1241    }
1242    Ok(value)
1243}
1244
1245fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1246    let Some(value) = value else {
1247        return Ok(Vec::new());
1248    };
1249    match value {
1250        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1251        _ => Err(VmError::Runtime(format!(
1252            "{builtin}: expected list<string>"
1253        ))),
1254    }
1255}
1256
1257fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1258    match value {
1259        VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
1260        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1261        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1262        _ => Err(VmError::Runtime(
1263            "expected a duration or millisecond count".to_string(),
1264        )),
1265    }
1266}
1267
1268fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1269    active_event_log()
1270        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1271}
1272
1273fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1274    if let Some(log) = active_event_log() {
1275        return Ok(log);
1276    }
1277    let Some(base_dir) = base_dir else {
1278        return Ok(install_memory_for_current_thread(
1279            HITL_EVENT_LOG_QUEUE_DEPTH,
1280        ));
1281    };
1282    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1283}
1284
1285fn current_dispatch_keys() -> Option<DispatchKeys> {
1286    let context = current_dispatch_context()?;
1287    let stable_base = context
1288        .replay_of_event_id
1289        .clone()
1290        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1291    let instance_key = format!(
1292        "{}::{}",
1293        context.trigger_event.id.0,
1294        context.replay_of_event_id.as_deref().unwrap_or("live")
1295    );
1296    Some(DispatchKeys {
1297        instance_key,
1298        stable_base,
1299        agent: context.agent_id,
1300        trace_id: context.trigger_event.trace_id.0,
1301    })
1302}
1303
1304fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1305    if let Some(keys) = dispatch_keys {
1306        let seq = REQUEST_SEQUENCE.with(|slot| {
1307            let mut state = slot.borrow_mut();
1308            if state.instance_key != keys.instance_key {
1309                state.instance_key = keys.instance_key.clone();
1310                state.next_seq = 0;
1311            }
1312            state.next_seq += 1;
1313            state.next_seq
1314        });
1315        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1316    }
1317    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1318}
1319
1320fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1321    headers_with_trace(&request.request_id, &request.trace_id)
1322}
1323
/// Builds event headers containing only the `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1329
1330fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1331    let mut headers = response_headers(request_id);
1332    headers.insert("trace_id".to_string(), trace_id.to_string());
1333    headers
1334}
1335
1336fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1337    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1338}
1339
1340fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1341    event
1342        .headers
1343        .get("request_id")
1344        .is_some_and(|value| value == request_id)
1345        || event
1346            .payload
1347            .get("request_id")
1348            .and_then(JsonValue::as_str)
1349            .is_some_and(|value| value == request_id)
1350}
1351
1352fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1353    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1354        "name": "ApprovalDeniedError",
1355        "category": "generic",
1356        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1357        "request_id": request_id,
1358        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1359        "reason": response.reason,
1360    })))
1361}
1362
1363fn hitl_cancelled_error(
1364    request_id: &str,
1365    kind: HitlRequestKind,
1366    wait_id: &str,
1367    waitpoint_ids: &[String],
1368    reason: Option<String>,
1369) -> VmError {
1370    let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1371    let message = reason
1372        .clone()
1373        .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1374    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1375        "name": "HumanCancelledError",
1376        "category": ErrorCategory::Cancelled.as_str(),
1377        "message": message,
1378        "request_id": request_id,
1379        "kind": kind.as_str(),
1380        "wait_id": wait_id,
1381        "waitpoint_ids": waitpoint_ids,
1382        "reason": reason,
1383    })))
1384}
1385
1386fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1387    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1388    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1389        "name": "HumanTimeoutError",
1390        "category": ErrorCategory::Timeout.as_str(),
1391        "message": format!("{} timed out", kind.as_str()),
1392        "request_id": request_id,
1393        "kind": kind.as_str(),
1394    })))
1395}
1396
1397fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1398    match default {
1399        VmValue::Int(_) => match value {
1400            VmValue::Int(_) => value.clone(),
1401            VmValue::Float(number) => VmValue::Int(*number as i64),
1402            VmValue::String(text) => text
1403                .parse::<i64>()
1404                .map(VmValue::Int)
1405                .unwrap_or_else(|_| default.clone()),
1406            _ => default.clone(),
1407        },
1408        VmValue::Float(_) => match value {
1409            VmValue::Float(_) => value.clone(),
1410            VmValue::Int(number) => VmValue::Float(*number as f64),
1411            VmValue::String(text) => text
1412                .parse::<f64>()
1413                .map(VmValue::Float)
1414                .unwrap_or_else(|_| default.clone()),
1415            _ => default.clone(),
1416        },
1417        VmValue::Bool(_) => match value {
1418            VmValue::Bool(_) => value.clone(),
1419            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1420            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1421            _ => default.clone(),
1422        },
1423        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1424        VmValue::Duration(_) => match value {
1425            VmValue::Duration(_) => value.clone(),
1426            VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1427            _ => default.clone(),
1428        },
1429        VmValue::Nil => value.clone(),
1430        _ => {
1431            if value.type_name() == default.type_name() {
1432                value.clone()
1433            } else {
1434                default.clone()
1435            }
1436        }
1437    }
1438}
1439
1440fn log_error(error: impl std::fmt::Display) -> VmError {
1441    VmError::Runtime(error.to_string())
1442}
1443
1444fn now_rfc3339() -> String {
1445    OffsetDateTime::now_utc()
1446        .format(&Rfc3339)
1447        .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1448}
1449
1450fn new_trace_id() -> String {
1451    format!("trace_{}", Uuid::now_v7())
1452}
1453
1454fn vm_bool(value: &VmValue) -> Option<bool> {
1455    match value {
1456        VmValue::Bool(flag) => Some(*flag),
1457        _ => None,
1458    }
1459}
1460
1461fn vm_string(value: &VmValue) -> Option<&str> {
1462    match value {
1463        VmValue::String(text) => Some(text.as_ref()),
1464        _ => None,
1465    }
1466}
1467
1468fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1469    match value {
1470        VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1471        _ => None,
1472    }
1473}
1474
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Runs `source` on a fresh VM backed by an event log rooted at
    /// `base_dir`.
    ///
    /// Returns the trimmed program output followed by the event kinds logged
    /// on the question, approval, dual-control and escalation topics, in that
    /// order.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
        reset_thread_local_state();
        let event_log = install_default_for_base_dir(base_dir).expect("install event log");
        let program = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&program).await?;
        let printed = vm.output().trim_end().to_string();
        let questions = event_kinds(event_log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approvals = event_kinds(event_log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control = event_kinds(event_log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalations = event_kinds(event_log, HITL_ESCALATIONS_TOPIC).await;
        Ok((printed, questions, approvals, dual_control, escalations))
    }

    /// Lists the kinds of all events stored on `topic`, in log order.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        let log_topic = Topic::new(topic).expect("valid topic");
        let entries = log
            .read_range(&log_topic, None, usize::MAX)
            .await
            .expect("read topic");
        let mut kinds = Vec::new();
        for (_, event) in entries {
            kinds.push(event.kind);
        }
        kinds
    }

    /// Turns a list of event-kind literals into owned strings for comparison.
    fn expected_kinds(kinds: &[&str]) -> Vec<String> {
        kinds.iter().map(|kind| kind.to_string()).collect()
    }

    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let workspace = tempfile::tempdir().expect("tempdir");
                let script = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  println(answer)
}
"#;
                let (printed, questions, approvals, dual_control, escalations) =
                    execute_hitl_script(workspace.path(), script)
                        .await
                        .expect("script succeeds");
                // The string answer "9" is coerced to match the int default.
                assert_eq!(printed, "9");
                assert_eq!(
                    questions,
                    expected_kinds(&["hitl.question_asked", "hitl.response_received"])
                );
                assert!(approvals.is_empty());
                assert!(dual_control.is_empty());
                assert!(escalations.is_empty());
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let workspace = tempfile::tempdir().expect("tempdir");
                let script = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  println(record.approved)
  println(len(record.reviewers))
  println(record.reviewers[0])
  println(record.reviewers[1])
}
"#;
                let (_, _, approvals, _, _) = execute_hitl_script(workspace.path(), script)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approvals,
                    expected_kinds(&[
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.approval_approved",
                    ])
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let workspace = tempfile::tempdir().expect("tempdir");
                let script = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  println(is_err(denied))
  println(unwrap_err(denied).name)
  println(unwrap_err(denied).reason)
}
"#;
                let (printed, _, approvals, _, _) = execute_hitl_script(workspace.path(), script)
                    .await
                    .expect("script succeeds");
                assert_eq!(printed, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approvals,
                    expected_kinds(&[
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.approval_denied",
                    ])
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let workspace = tempfile::tempdir().expect("tempdir");
                let script = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  println(result)
}
"#;
                let (printed, _, _, dual_control, _) =
                    execute_hitl_script(workspace.path(), script)
                        .await
                        .expect("script succeeds");
                assert_eq!(printed, "launched");
                assert_eq!(
                    dual_control,
                    expected_kinds(&[
                        "hitl.dual_control_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.dual_control_approved",
                        "hitl.dual_control_executed",
                    ])
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let workspace = tempfile::tempdir().expect("tempdir");
                let script = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  println(handle.status)
  println(handle.reviewer)
}
"#;
                let (printed, _, _, _, escalations) =
                    execute_hitl_script(workspace.path(), script)
                        .await
                        .expect("script succeeds");
                assert_eq!(printed, "accepted\nlead");
                assert_eq!(
                    escalations,
                    expected_kinds(&["hitl.escalation_issued", "hitl.escalation_accepted"])
                );
            })
            .await;
    }
}