Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::{Duration as StdDuration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use serde_json::{json, Value as JsonValue};
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::triggers::dispatcher::current_dispatch_context;
22use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
23use crate::vm::{clone_async_builtin_child_vm, Vm};
24
/// Queue depth for the HITL event log.
/// NOTE(review): presumably used when installing the in-memory log in
/// `ensure_hitl_event_log` — its use site is outside this view; confirm.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Default approval deadline: 24 hours, in milliseconds. Used by
/// `request_approval` when no `deadline` option is given, and always by
/// `dual_control`.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic for `ask_user` questions.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic for `request_approval` requests.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic for `dual_control` requests.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic for `escalate_to` escalations.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
32
thread_local! {
    // Per-thread request-sequence state. It is cleared by `reset_hitl_state`
    // and swapped out/in by `take_hitl_state` / `restore_hitl_state`.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}

/// State held in the `REQUEST_SEQUENCE` thread-local.
///
/// NOTE(review): presumably consumed by `next_request_id` (not shown here) to
/// number HITL requests within one dispatch instance — confirm at its definition.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Presumably identifies the dispatch instance the counter belongs to (TODO confirm).
    pub(crate) instance_key: String,
    /// Presumably the next sequence number to hand out (TODO confirm).
    pub(crate) next_seq: u64,
}
42
/// The kinds of human-in-the-loop request this module can raise.
///
/// Serialized in snake_case (e.g. `dual_control`), which matches the labels
/// returned by `HitlRequestKind::as_str`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Raised by the `ask_user` builtin.
    Question,
    /// Raised by the `request_approval` builtin.
    Approval,
    /// Raised by the `dual_control` builtin.
    DualControl,
    /// Raised by the `escalate_to` builtin.
    Escalation,
}
51
52impl HitlRequestKind {
53    fn as_str(self) -> &'static str {
54        match self {
55            Self::Question => "question",
56            Self::Approval => "approval",
57            Self::DualControl => "dual_control",
58            Self::Escalation => "escalation",
59        }
60    }
61
62    fn topic(self) -> &'static str {
63        match self {
64            Self::Question => HITL_QUESTIONS_TOPIC,
65            Self::Approval => HITL_APPROVALS_TOPIC,
66            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
67            Self::Escalation => HITL_ESCALATIONS_TOPIC,
68        }
69    }
70
71    fn request_event_kind(self) -> &'static str {
72        match self {
73            Self::Question => "hitl.question_asked",
74            Self::Approval => "hitl.approval_requested",
75            Self::DualControl => "hitl.dual_control_requested",
76            Self::Escalation => "hitl.escalation_issued",
77        }
78    }
79
80    fn from_request_id(request_id: &str) -> Option<Self> {
81        if request_id.starts_with("hitl_question_") {
82            Some(Self::Question)
83        } else if request_id.starts_with("hitl_approval_") {
84            Some(Self::Approval)
85        } else if request_id.starts_with("hitl_dual_control_") {
86            Some(Self::DualControl)
87        } else if request_id.starts_with("hitl_escalation_") {
88            Some(Self::Escalation)
89        } else {
90            None
91        }
92    }
93}
94
/// A host's reply to a pending HITL request, as recorded in the event log.
///
/// Which optional fields matter depends on the request kind, which is inferred
/// from the `request_id` prefix:
/// - `ask_user` reads `answer`;
/// - approval-style requests read `approved`, `reviewer` and `reason`;
/// - escalations read `accepted`, `reviewer` and `responded_at`.
///
/// Fields that are `None` are omitted when serialized.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its `hitl_<kind>_` prefix selects the topic.
    pub request_id: String,
    /// Answer value for `ask_user` questions.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval decision for `request_approval` / `dual_control`; absent counts as not approved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Acceptance flag for escalations; absent counts as not accepted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Identity of the responding reviewer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form justification supplied with the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra host-defined data; not interpreted by this module.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills it with `now_rfc3339()` when missing.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
}

/// A HITL request record.
///
/// Appended to the kind's topic when a builtin raises a request, and also
/// forwarded to the host bridge as a `harn.hitl.requested` notification.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    request_id: String,
    kind: HitlRequestKind,
    trace_id: String,
    /// Timestamp produced by `now_rfc3339`.
    requested_at: String,
    /// Builtin-specific request details (prompt, action, quorum, ...).
    payload: JsonValue,
}

/// Payload of the `hitl.timeout` event appended when a wait expires.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    request_id: String,
    kind: HitlRequestKind,
    trace_id: String,
    /// Timestamp produced by `now_rfc3339`.
    timed_out_at: String,
}
130
/// Identifiers for the current trigger dispatch, when one is active.
///
/// `trace_id` becomes the trace id of any HITL request raised during the dispatch.
/// NOTE(review): `instance_key` and `stable_base` are presumably consumed by
/// `next_request_id` to derive stable request ids — confirm at its definition.
#[derive(Clone, Debug)]
struct DispatchKeys {
    instance_key: String,
    stable_base: String,
    trace_id: String,
}

/// Parsed options for `ask_user`.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is validated against via `schema_expect_value`.
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` waits indefinitely.
    timeout: Option<StdDuration>,
    /// Returned on timeout; otherwise the answer is coerced to its shape.
    default: Option<VmValue>,
}

/// Parsed options for `request_approval`.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Extra context forwarded in the request payload.
    detail: Option<VmValue>,
    /// Number of distinct approving reviewers required (validated to be positive).
    quorum: u32,
    /// Reviewers allowed to respond; empty means any named reviewer.
    reviewers: Vec<String>,
    /// How long to wait for the quorum.
    deadline: StdDuration,
}

/// Running tally while waiting for an approval quorum.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Distinct reviewers whose approvals have been counted.
    reviewers: BTreeSet<String>,
    /// Reason attached to the most recently counted approval.
    reason: Option<String>,
    /// `responded_at` of the most recently counted approval.
    approved_at: Option<String>,
}

/// How a wait for a single terminal HITL event ended.
#[derive(Clone, Debug)]
enum WaitOutcome {
    /// The host responded (for escalations: with `accepted: true`).
    Response(HitlHostResponse),
    /// The deadline passed, or a `hitl.timeout` event was found in the log.
    Timeout,
}
165
/// Registers the human-in-the-loop builtins on `vm`.
///
/// Each builtin is async and forwards its argument list to the matching
/// implementation in this module:
/// - `ask_user` -> `ask_user_impl`
/// - `request_approval` -> `request_approval_impl`
/// - `dual_control` -> `dual_control_impl`
/// - `escalate_to` -> `escalate_to_impl`
pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
    vm.register_async_builtin("ask_user", |args| {
        Box::pin(async move { ask_user_impl(&args).await })
    });

    vm.register_async_builtin("request_approval", |args| {
        Box::pin(async move { request_approval_impl(&args).await })
    });

    vm.register_async_builtin("dual_control", |args| {
        Box::pin(async move { dual_control_impl(&args).await })
    });

    vm.register_async_builtin("escalate_to", |args| {
        Box::pin(async move { escalate_to_impl(&args).await })
    });
}
183
184pub(crate) fn reset_hitl_state() {
185    REQUEST_SEQUENCE.with(|slot| {
186        *slot.borrow_mut() = RequestSequenceState::default();
187    });
188}
189
190pub(crate) fn take_hitl_state() -> RequestSequenceState {
191    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
192}
193
194pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
195    REQUEST_SEQUENCE.with(|slot| {
196        *slot.borrow_mut() = state;
197    });
198}
199
200pub async fn append_hitl_response(
201    base_dir: Option<&Path>,
202    mut response: HitlHostResponse,
203) -> Result<u64, String> {
204    let kind = HitlRequestKind::from_request_id(&response.request_id)
205        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
206    if response.responded_at.is_none() {
207        response.responded_at = Some(now_rfc3339());
208    }
209    let log = ensure_hitl_event_log_for(base_dir)?;
210    let headers = response_headers(&response.request_id);
211    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
212    log.append(
213        &topic,
214        LogEvent::new(
215            match kind {
216                HitlRequestKind::Escalation => "hitl.escalation_accepted",
217                _ => "hitl.response_received",
218            },
219            serde_json::to_value(response).map_err(|error| error.to_string())?,
220        )
221        .with_headers(headers),
222    )
223    .await
224    .map_err(|error| error.to_string())
225}
226
227async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
228    let prompt = required_string_arg(args, 0, "ask_user")?;
229    let options = parse_ask_user_options(args.get(1))?;
230    let keys = current_dispatch_keys();
231    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
232    let trace_id = keys
233        .as_ref()
234        .map(|keys| keys.trace_id.clone())
235        .unwrap_or_else(new_trace_id);
236    let log = ensure_hitl_event_log();
237    let request = HitlRequestEnvelope {
238        request_id: request_id.clone(),
239        kind: HitlRequestKind::Question,
240        trace_id: trace_id.clone(),
241        requested_at: now_rfc3339(),
242        payload: json!({
243            "prompt": prompt,
244            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
245            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
246            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
247        }),
248    };
249    append_request(&log, &request).await?;
250    maybe_notify_host(&request);
251    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
252
253    match wait_for_terminal_event(
254        &log,
255        HitlRequestKind::Question,
256        &request_id,
257        options.timeout,
258        &trace_id,
259    )
260    .await?
261    {
262        WaitOutcome::Timeout => {
263            if let Some(default) = options.default {
264                return Ok(default);
265            }
266            Err(timeout_error(&request_id, HitlRequestKind::Question))
267        }
268        WaitOutcome::Response(response) => {
269            let answer = response
270                .answer
271                .as_ref()
272                .map(crate::stdlib::json_to_vm_value)
273                .unwrap_or(VmValue::Nil);
274            if let Some(schema) = options.schema.as_ref() {
275                return schema_expect_value(&answer, schema, true);
276            }
277            if let Some(default) = options.default.as_ref() {
278                return Ok(coerce_like_default(&answer, default));
279            }
280            Ok(answer)
281        }
282    }
283}
284
285async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
286    let action = required_string_arg(args, 0, "request_approval")?;
287    let options = parse_approval_options(args.get(1), "request_approval")?;
288    let keys = current_dispatch_keys();
289    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
290    let trace_id = keys
291        .as_ref()
292        .map(|keys| keys.trace_id.clone())
293        .unwrap_or_else(new_trace_id);
294    let log = ensure_hitl_event_log();
295    let request = HitlRequestEnvelope {
296        request_id: request_id.clone(),
297        kind: HitlRequestKind::Approval,
298        trace_id: trace_id.clone(),
299        requested_at: now_rfc3339(),
300        payload: json!({
301            "action": action,
302            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
303            "quorum": options.quorum,
304            "reviewers": options.reviewers,
305            "deadline_ms": options.deadline.as_millis() as u64,
306        }),
307    };
308    append_request(&log, &request).await?;
309    maybe_notify_host(&request);
310    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
311
312    let record = wait_for_approval_quorum(
313        &log,
314        HitlRequestKind::Approval,
315        &request_id,
316        options.quorum,
317        &options.reviewers,
318        options.deadline,
319        &trace_id,
320    )
321    .await?;
322
323    append_named_event(
324        &log,
325        HitlRequestKind::Approval,
326        "hitl.approval_approved",
327        &request_id,
328        &trace_id,
329        json!({
330            "request_id": request_id,
331            "record": crate::llm::vm_value_to_json(&record),
332        }),
333    )
334    .await?;
335
336    Ok(record)
337}
338
339async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
340    let n = required_positive_int_arg(args, 0, "dual_control")?;
341    let m = required_positive_int_arg(args, 1, "dual_control")?;
342    if n > m {
343        return Err(VmError::Runtime(
344            "dual_control: n must be less than or equal to m".to_string(),
345        ));
346    }
347    let action = args
348        .get(2)
349        .and_then(|value| match value {
350            VmValue::Closure(closure) => Some(closure.clone()),
351            _ => None,
352        })
353        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
354    let approvers = optional_string_list(args.get(3), "dual_control")?;
355    if !approvers.is_empty() && approvers.len() < m as usize {
356        return Err(VmError::Runtime(format!(
357            "dual_control: expected at least {m} approvers, got {}",
358            approvers.len()
359        )));
360    }
361
362    let keys = current_dispatch_keys();
363    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
364    let trace_id = keys
365        .as_ref()
366        .map(|keys| keys.trace_id.clone())
367        .unwrap_or_else(new_trace_id);
368    let action_name = if action.func.name.is_empty() {
369        "anonymous".to_string()
370    } else {
371        action.func.name.clone()
372    };
373    let log = ensure_hitl_event_log();
374    let request = HitlRequestEnvelope {
375        request_id: request_id.clone(),
376        kind: HitlRequestKind::DualControl,
377        trace_id: trace_id.clone(),
378        requested_at: now_rfc3339(),
379        payload: json!({
380            "n": n,
381            "m": m,
382            "action": action_name,
383            "approvers": approvers,
384            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
385        }),
386    };
387    append_request(&log, &request).await?;
388    maybe_notify_host(&request);
389    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
390
391    let record = wait_for_approval_quorum(
392        &log,
393        HitlRequestKind::DualControl,
394        &request_id,
395        n as u32,
396        &approvers,
397        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
398        &trace_id,
399    )
400    .await?;
401
402    append_named_event(
403        &log,
404        HitlRequestKind::DualControl,
405        "hitl.dual_control_approved",
406        &request_id,
407        &trace_id,
408        json!({
409            "request_id": request_id,
410            "record": crate::llm::vm_value_to_json(&record),
411        }),
412    )
413    .await?;
414
415    let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
416        VmError::Runtime("dual_control requires an async builtin VM context".to_string())
417    })?;
418    let result = vm.call_closure_pub(&action, &[], &[]).await?;
419
420    append_named_event(
421        &log,
422        HitlRequestKind::DualControl,
423        "hitl.dual_control_executed",
424        &request_id,
425        &trace_id,
426        json!({
427            "request_id": request_id,
428            "result": crate::llm::vm_value_to_json(&result),
429        }),
430    )
431    .await?;
432
433    Ok(result)
434}
435
436async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
437    let role = required_string_arg(args, 0, "escalate_to")?;
438    let reason = required_string_arg(args, 1, "escalate_to")?;
439    let keys = current_dispatch_keys();
440    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
441    let trace_id = keys
442        .as_ref()
443        .map(|keys| keys.trace_id.clone())
444        .unwrap_or_else(new_trace_id);
445    let log = ensure_hitl_event_log();
446    let request = HitlRequestEnvelope {
447        request_id: request_id.clone(),
448        kind: HitlRequestKind::Escalation,
449        trace_id: trace_id.clone(),
450        requested_at: now_rfc3339(),
451        payload: json!({
452            "role": role,
453            "reason": reason,
454        }),
455    };
456    append_request(&log, &request).await?;
457    maybe_notify_host(&request);
458    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
459
460    match wait_for_terminal_event(
461        &log,
462        HitlRequestKind::Escalation,
463        &request_id,
464        None,
465        &trace_id,
466    )
467    .await?
468    {
469        WaitOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
470        WaitOutcome::Response(response) => Ok(crate::stdlib::json_to_vm_value(&json!({
471            "request_id": request_id,
472            "role": role,
473            "reason": reason,
474            "trace_id": trace_id,
475            "status": if response.accepted.unwrap_or(false) { "accepted" } else { "pending" },
476            "accepted_at": response.responded_at,
477            "reviewer": response.reviewer,
478        }))),
479    }
480}
481
482async fn wait_for_approval_quorum(
483    log: &Arc<AnyEventLog>,
484    kind: HitlRequestKind,
485    request_id: &str,
486    quorum: u32,
487    reviewers: &[String],
488    timeout: StdDuration,
489    trace_id: &str,
490) -> Result<VmValue, VmError> {
491    let deadline = Instant::now() + timeout;
492    let mut progress = ApprovalProgress {
493        reviewers: BTreeSet::new(),
494        reason: None,
495        approved_at: None,
496    };
497    let allowed_reviewers: BTreeSet<&str> = reviewers.iter().map(String::as_str).collect();
498    let topic = topic(kind)?;
499
500    process_existing_approval_events(
501        log,
502        &topic,
503        request_id,
504        quorum,
505        &allowed_reviewers,
506        &mut progress,
507        trace_id,
508    )
509    .await?;
510    if progress.reviewers.len() as u32 >= quorum {
511        return Ok(approval_record_value(&progress));
512    }
513    if is_replay() {
514        return Err(VmError::Runtime(format!(
515            "replay is missing recorded HITL approvals for '{request_id}'"
516        )));
517    }
518
519    let start_from = log.latest(&topic).await.map_err(log_error)?;
520    let stream = log
521        .clone()
522        .subscribe(&topic, start_from)
523        .await
524        .map_err(log_error)?;
525    pin_mut!(stream);
526    loop {
527        let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
528            append_timeout(log, kind, request_id, trace_id).await?;
529            return Err(timeout_error(request_id, kind));
530        };
531        let next = tokio::time::timeout(remaining, stream.next()).await;
532        let event = match next {
533            Ok(Some(Ok((_, event)))) => event,
534            Ok(Some(Err(error))) => return Err(log_error(error)),
535            Ok(None) => {
536                append_timeout(log, kind, request_id, trace_id).await?;
537                return Err(timeout_error(request_id, kind));
538            }
539            Err(_) => {
540                append_timeout(log, kind, request_id, trace_id).await?;
541                return Err(timeout_error(request_id, kind));
542            }
543        };
544        if !event_matches_request(&event, request_id) {
545            continue;
546        }
547        if event.kind == "hitl.timeout" {
548            return Err(timeout_error(request_id, kind));
549        }
550        if event.kind != "hitl.response_received" {
551            continue;
552        }
553        let response: HitlHostResponse = serde_json::from_value(event.payload)
554            .map_err(|error| VmError::Runtime(error.to_string()))?;
555        if let Some(reviewer) = response.reviewer.as_deref() {
556            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
557                continue;
558            }
559            if progress.reviewers.contains(reviewer) {
560                continue;
561            }
562        }
563        if response.approved.unwrap_or(false) {
564            if let Some(reviewer) = response.reviewer {
565                progress.reviewers.insert(reviewer);
566            }
567            progress.reason = response.reason;
568            progress.approved_at = response.responded_at;
569            if progress.reviewers.len() as u32 >= quorum {
570                return Ok(approval_record_value(&progress));
571            }
572            continue;
573        }
574
575        append_named_event(
576            log,
577            kind,
578            match kind {
579                HitlRequestKind::DualControl => "hitl.dual_control_denied",
580                _ => "hitl.approval_denied",
581            },
582            request_id,
583            trace_id,
584            json!({
585                "request_id": request_id,
586                "reviewer": response.reviewer,
587                "reason": response.reason,
588            }),
589        )
590        .await?;
591        return Err(approval_denied_error(request_id, response));
592    }
593}
594
595async fn process_existing_approval_events(
596    log: &Arc<AnyEventLog>,
597    topic: &Topic,
598    request_id: &str,
599    quorum: u32,
600    allowed_reviewers: &BTreeSet<&str>,
601    progress: &mut ApprovalProgress,
602    trace_id: &str,
603) -> Result<(), VmError> {
604    let existing = log
605        .read_range(topic, None, usize::MAX)
606        .await
607        .map_err(log_error)?;
608    for (_, event) in existing {
609        if !event_matches_request(&event, request_id) {
610            continue;
611        }
612        if event.kind == "hitl.timeout" {
613            return Err(timeout_error(request_id, kind_from_topic(topic)?));
614        }
615        if event.kind != "hitl.response_received" {
616            continue;
617        }
618        let response: HitlHostResponse = serde_json::from_value(event.payload)
619            .map_err(|error| VmError::Runtime(error.to_string()))?;
620        let Some(reviewer) = response.reviewer.as_deref() else {
621            continue;
622        };
623        if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
624            continue;
625        }
626        if !response.approved.unwrap_or(false) {
627            let kind = kind_from_topic(topic)?;
628            append_named_event(
629                log,
630                kind,
631                match kind {
632                    HitlRequestKind::DualControl => "hitl.dual_control_denied",
633                    _ => "hitl.approval_denied",
634                },
635                request_id,
636                trace_id,
637                json!({
638                    "request_id": request_id,
639                    "reviewer": response.reviewer,
640                    "reason": response.reason,
641                }),
642            )
643            .await?;
644            return Err(approval_denied_error(request_id, response));
645        }
646        if progress.reviewers.insert(reviewer.to_string()) {
647            progress.reason = response.reason;
648            progress.approved_at = response.responded_at;
649        }
650        if progress.reviewers.len() as u32 >= quorum {
651            break;
652        }
653    }
654    Ok(())
655}
656
657async fn wait_for_terminal_event(
658    log: &Arc<AnyEventLog>,
659    kind: HitlRequestKind,
660    request_id: &str,
661    timeout: Option<StdDuration>,
662    trace_id: &str,
663) -> Result<WaitOutcome, VmError> {
664    let topic = topic(kind)?;
665    if let Some(outcome) = find_existing_terminal_event(log, &topic, request_id).await? {
666        return Ok(outcome);
667    }
668    if is_replay() {
669        return Err(VmError::Runtime(format!(
670            "replay is missing a recorded HITL response for '{request_id}'"
671        )));
672    }
673
674    let start_from = log.latest(&topic).await.map_err(log_error)?;
675    let stream = log
676        .clone()
677        .subscribe(&topic, start_from)
678        .await
679        .map_err(log_error)?;
680    pin_mut!(stream);
681    let deadline = timeout.map(|timeout| Instant::now() + timeout);
682
683    loop {
684        let next_event = if let Some(deadline) = deadline {
685            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
686                append_timeout(log, kind, request_id, trace_id).await?;
687                return Ok(WaitOutcome::Timeout);
688            };
689            match tokio::time::timeout(remaining, stream.next()).await {
690                Ok(next) => next,
691                Err(_) => {
692                    append_timeout(log, kind, request_id, trace_id).await?;
693                    return Ok(WaitOutcome::Timeout);
694                }
695            }
696        } else {
697            stream.next().await
698        };
699
700        let Some(received) = next_event else {
701            return Err(VmError::Runtime(format!(
702                "HITL wait for '{request_id}' ended before the host responded"
703            )));
704        };
705        let (_, event) = received.map_err(log_error)?;
706        if !event_matches_request(&event, request_id) {
707            continue;
708        }
709        match event.kind.as_str() {
710            "hitl.timeout" => return Ok(WaitOutcome::Timeout),
711            "hitl.response_received" | "hitl.escalation_accepted" => {
712                let response: HitlHostResponse = serde_json::from_value(event.payload)
713                    .map_err(|error| VmError::Runtime(error.to_string()))?;
714                if kind == HitlRequestKind::Escalation && !response.accepted.unwrap_or(false) {
715                    continue;
716                }
717                return Ok(WaitOutcome::Response(response));
718            }
719            _ => {}
720        }
721    }
722}
723
724async fn find_existing_terminal_event(
725    log: &Arc<AnyEventLog>,
726    topic: &Topic,
727    request_id: &str,
728) -> Result<Option<WaitOutcome>, VmError> {
729    let events = log
730        .read_range(topic, None, usize::MAX)
731        .await
732        .map_err(log_error)?;
733    for (_, event) in events {
734        if !event_matches_request(&event, request_id) {
735            continue;
736        }
737        match event.kind.as_str() {
738            "hitl.timeout" => return Ok(Some(WaitOutcome::Timeout)),
739            "hitl.response_received" | "hitl.escalation_accepted" => {
740                let response: HitlHostResponse = serde_json::from_value(event.payload)
741                    .map_err(|error| VmError::Runtime(error.to_string()))?;
742                if event.kind == "hitl.escalation_accepted" && !response.accepted.unwrap_or(false) {
743                    continue;
744                }
745                return Ok(Some(WaitOutcome::Response(response)));
746            }
747            _ => {}
748        }
749    }
750    Ok(None)
751}
752
753async fn append_request(
754    log: &Arc<AnyEventLog>,
755    request: &HitlRequestEnvelope,
756) -> Result<(), VmError> {
757    let topic = topic(request.kind)?;
758    log.append(
759        &topic,
760        LogEvent::new(
761            request.kind.request_event_kind(),
762            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
763        )
764        .with_headers(request_headers(request)),
765    )
766    .await
767    .map(|_| ())
768    .map_err(log_error)
769}
770
771async fn append_named_event(
772    log: &Arc<AnyEventLog>,
773    kind: HitlRequestKind,
774    event_kind: &str,
775    request_id: &str,
776    trace_id: &str,
777    payload: JsonValue,
778) -> Result<(), VmError> {
779    let topic = topic(kind)?;
780    let headers = headers_with_trace(request_id, trace_id);
781    log.append(
782        &topic,
783        LogEvent::new(event_kind, payload).with_headers(headers),
784    )
785    .await
786    .map(|_| ())
787    .map_err(log_error)
788}
789
790async fn append_timeout(
791    log: &Arc<AnyEventLog>,
792    kind: HitlRequestKind,
793    request_id: &str,
794    trace_id: &str,
795) -> Result<(), VmError> {
796    append_named_event(
797        log,
798        kind,
799        "hitl.timeout",
800        request_id,
801        trace_id,
802        serde_json::to_value(HitlTimeoutRecord {
803            request_id: request_id.to_string(),
804            kind,
805            trace_id: trace_id.to_string(),
806            timed_out_at: now_rfc3339(),
807        })
808        .map_err(|error| VmError::Runtime(error.to_string()))?,
809    )
810    .await
811}
812
813async fn maybe_apply_mock_response(
814    kind: HitlRequestKind,
815    request_id: &str,
816    request_payload: &JsonValue,
817) -> Result<(), VmError> {
818    let mut params = request_payload
819        .as_object()
820        .cloned()
821        .unwrap_or_default()
822        .into_iter()
823        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
824        .collect::<BTreeMap<_, _>>();
825    params.insert(
826        "request_id".to_string(),
827        VmValue::String(Rc::from(request_id.to_string())),
828    );
829    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
830        return Ok(());
831    };
832    let value = result?;
833    let responses = match value {
834        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
835        other => vec![other],
836    };
837    for response in responses {
838        let response_dict = response.as_dict().ok_or_else(|| {
839            VmError::Runtime(format!(
840                "mocked HITL {} response must be a dict or list<dict>",
841                kind.as_str()
842            ))
843        })?;
844        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
845        append_hitl_response(None, hitl_response)
846            .await
847            .map_err(VmError::Runtime)?;
848    }
849    Ok(())
850}
851
852fn parse_hitl_response_dict(
853    request_id: &str,
854    response_dict: &BTreeMap<String, VmValue>,
855) -> Result<HitlHostResponse, VmError> {
856    Ok(HitlHostResponse {
857        request_id: request_id.to_string(),
858        answer: response_dict
859            .get("answer")
860            .map(crate::llm::vm_value_to_json),
861        approved: response_dict.get("approved").and_then(vm_bool),
862        accepted: response_dict.get("accepted").and_then(vm_bool),
863        reviewer: response_dict.get("reviewer").map(VmValue::display),
864        reason: response_dict.get("reason").map(VmValue::display),
865        metadata: response_dict
866            .get("metadata")
867            .map(crate::llm::vm_value_to_json),
868        responded_at: response_dict.get("responded_at").map(VmValue::display),
869    })
870}
871
872fn maybe_notify_host(request: &HitlRequestEnvelope) {
873    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
874        return;
875    };
876    bridge.notify(
877        "harn.hitl.requested",
878        serde_json::to_value(request).unwrap_or(JsonValue::Null),
879    );
880}
881
882fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
883    let Some(value) = value else {
884        return Ok(AskUserOptions {
885            schema: None,
886            timeout: None,
887            default: None,
888        });
889    };
890    let dict = value
891        .as_dict()
892        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
893    Ok(AskUserOptions {
894        schema: dict
895            .get("schema")
896            .cloned()
897            .filter(|value| !matches!(value, VmValue::Nil)),
898        timeout: dict.get("timeout").map(parse_duration_value).transpose()?,
899        default: dict
900            .get("default")
901            .cloned()
902            .filter(|value| !matches!(value, VmValue::Nil)),
903    })
904}
905
906fn parse_approval_options(
907    value: Option<&VmValue>,
908    builtin: &str,
909) -> Result<ApprovalOptions, VmError> {
910    let dict = match value {
911        None => None,
912        Some(VmValue::Dict(dict)) => Some(dict),
913        Some(_) => {
914            return Err(VmError::Runtime(format!(
915                "{builtin}: options must be a dict"
916            )))
917        }
918    };
919    let quorum = dict
920        .and_then(|dict| dict.get("quorum"))
921        .and_then(VmValue::as_int)
922        .unwrap_or(1);
923    if quorum <= 0 {
924        return Err(VmError::Runtime(format!(
925            "{builtin}: quorum must be positive"
926        )));
927    }
928    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
929    let deadline = dict
930        .and_then(|dict| dict.get("deadline"))
931        .map(parse_duration_value)
932        .transpose()?
933        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
934    Ok(ApprovalOptions {
935        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
936        quorum: quorum as u32,
937        reviewers,
938        deadline,
939    })
940}
941
942fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
943    args.get(idx)
944        .map(VmValue::display)
945        .filter(|value| !value.is_empty())
946        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
947}
948
949fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
950    let value = args
951        .get(idx)
952        .and_then(VmValue::as_int)
953        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
954    if value <= 0 {
955        return Err(VmError::Runtime(format!(
956            "{builtin}: expected a positive int at {idx}"
957        )));
958    }
959    Ok(value)
960}
961
962fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
963    let Some(value) = value else {
964        return Ok(Vec::new());
965    };
966    match value {
967        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
968        _ => Err(VmError::Runtime(format!(
969            "{builtin}: expected list<string>"
970        ))),
971    }
972}
973
974fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
975    match value {
976        VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
977        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
978        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
979        _ => Err(VmError::Runtime(
980            "expected a duration or millisecond count".to_string(),
981        )),
982    }
983}
984
985fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
986    active_event_log()
987        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
988}
989
990fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
991    if let Some(log) = active_event_log() {
992        return Ok(log);
993    }
994    let Some(base_dir) = base_dir else {
995        return Ok(install_memory_for_current_thread(
996            HITL_EVENT_LOG_QUEUE_DEPTH,
997        ));
998    };
999    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1000}
1001
1002fn current_dispatch_keys() -> Option<DispatchKeys> {
1003    let context = current_dispatch_context()?;
1004    let stable_base = context
1005        .replay_of_event_id
1006        .clone()
1007        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1008    let instance_key = format!(
1009        "{}::{}",
1010        context.trigger_event.id.0,
1011        context.replay_of_event_id.as_deref().unwrap_or("live")
1012    );
1013    Some(DispatchKeys {
1014        instance_key,
1015        stable_base,
1016        trace_id: context.trigger_event.trace_id.0,
1017    })
1018}
1019
1020fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1021    if let Some(keys) = dispatch_keys {
1022        let seq = REQUEST_SEQUENCE.with(|slot| {
1023            let mut state = slot.borrow_mut();
1024            if state.instance_key != keys.instance_key {
1025                state.instance_key = keys.instance_key.clone();
1026                state.next_seq = 0;
1027            }
1028            state.next_seq += 1;
1029            state.next_seq
1030        });
1031        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1032    }
1033    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1034}
1035
1036fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1037    headers_with_trace(&request.request_id, &request.trace_id)
1038}
1039
/// Builds the event-log headers for a response, which carry only the `request_id`.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1045
1046fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1047    let mut headers = response_headers(request_id);
1048    headers.insert("trace_id".to_string(), trace_id.to_string());
1049    headers
1050}
1051
1052fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1053    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1054}
1055
1056fn kind_from_topic(topic: &Topic) -> Result<HitlRequestKind, VmError> {
1057    match topic.as_str() {
1058        HITL_QUESTIONS_TOPIC => Ok(HitlRequestKind::Question),
1059        HITL_APPROVALS_TOPIC => Ok(HitlRequestKind::Approval),
1060        HITL_DUAL_CONTROL_TOPIC => Ok(HitlRequestKind::DualControl),
1061        HITL_ESCALATIONS_TOPIC => Ok(HitlRequestKind::Escalation),
1062        other => Err(VmError::Runtime(format!("unknown HITL topic '{other}'"))),
1063    }
1064}
1065
1066fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1067    event
1068        .headers
1069        .get("request_id")
1070        .is_some_and(|value| value == request_id)
1071        || event
1072            .payload
1073            .get("request_id")
1074            .and_then(JsonValue::as_str)
1075            .is_some_and(|value| value == request_id)
1076}
1077
1078fn approval_record_value(progress: &ApprovalProgress) -> VmValue {
1079    crate::stdlib::json_to_vm_value(&json!({
1080        "approved": true,
1081        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1082        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1083        "reason": progress.reason,
1084    }))
1085}
1086
1087fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1088    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1089        "name": "ApprovalDeniedError",
1090        "category": "generic",
1091        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1092        "request_id": request_id,
1093        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1094        "reason": response.reason,
1095    })))
1096}
1097
1098fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1099    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1100    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1101        "name": "HumanTimeoutError",
1102        "category": ErrorCategory::Timeout.as_str(),
1103        "message": format!("{} timed out", kind.as_str()),
1104        "request_id": request_id,
1105        "kind": kind.as_str(),
1106    })))
1107}
1108
1109fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1110    match default {
1111        VmValue::Int(_) => match value {
1112            VmValue::Int(_) => value.clone(),
1113            VmValue::Float(number) => VmValue::Int(*number as i64),
1114            VmValue::String(text) => text
1115                .parse::<i64>()
1116                .map(VmValue::Int)
1117                .unwrap_or_else(|_| default.clone()),
1118            _ => default.clone(),
1119        },
1120        VmValue::Float(_) => match value {
1121            VmValue::Float(_) => value.clone(),
1122            VmValue::Int(number) => VmValue::Float(*number as f64),
1123            VmValue::String(text) => text
1124                .parse::<f64>()
1125                .map(VmValue::Float)
1126                .unwrap_or_else(|_| default.clone()),
1127            _ => default.clone(),
1128        },
1129        VmValue::Bool(_) => match value {
1130            VmValue::Bool(_) => value.clone(),
1131            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1132            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1133            _ => default.clone(),
1134        },
1135        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1136        VmValue::Duration(_) => match value {
1137            VmValue::Duration(_) => value.clone(),
1138            VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1139            _ => default.clone(),
1140        },
1141        VmValue::Nil => value.clone(),
1142        _ => {
1143            if value.type_name() == default.type_name() {
1144                value.clone()
1145            } else {
1146                default.clone()
1147            }
1148        }
1149    }
1150}
1151
1152fn log_error(error: impl std::fmt::Display) -> VmError {
1153    VmError::Runtime(error.to_string())
1154}
1155
/// Reports whether the process is running in replay mode.
///
/// Replay is on when `HARN_REPLAY` is set to a valid UTF-8 value that, after trimming
/// surrounding whitespace, is neither empty nor `0`.
fn is_replay() -> bool {
    std::env::var("HARN_REPLAY").is_ok_and(|value| {
        // Trim once and use the trimmed text for both checks. Previously only the
        // emptiness check was trimmed, so a value like " 0 " enabled replay.
        let value = value.trim();
        !value.is_empty() && value != "0"
    })
}
1161
1162fn now_rfc3339() -> String {
1163    OffsetDateTime::now_utc()
1164        .format(&Rfc3339)
1165        .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1166}
1167
1168fn new_trace_id() -> String {
1169    format!("trace_{}", Uuid::now_v7())
1170}
1171
1172fn vm_bool(value: &VmValue) -> Option<bool> {
1173    match value {
1174        VmValue::Bool(flag) => Some(*flag),
1175        _ => None,
1176    }
1177}
1178
/// Integration tests for the HITL builtins.
///
/// Each test runs a small Harn pipeline on a `current_thread` Tokio runtime inside a
/// `LocalSet`. NOTE(review): the `LocalSet` is presumably needed because the VM's async
/// builtins spawn `!Send` local tasks; confirm this.
///
/// Host replies are scripted with `host_mock("hitl", ...)`. Each test then checks the
/// printed output and/or the sequence of event kinds written to the HITL topics.
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Compiles and runs `source` with the full VM stdlib, using an event log installed
    /// for `base_dir`.
    ///
    /// Thread-local VM state is reset first. On success the tuple contains, in order:
    /// - the VM output with trailing whitespace trimmed;
    /// - the event kinds recorded on the questions topic;
    /// - the event kinds recorded on the approvals topic;
    /// - the event kinds recorded on the dual-control topic;
    /// - the event kinds recorded on the escalations topic.
    ///
    /// # Errors
    /// Propagates any `VmError` raised while executing the script. Compilation,
    /// event-log setup and event reads panic instead.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
        Ok((
            output,
            question_events,
            approval_events,
            dual_control_events,
            escalation_events,
        ))
    }

    /// Returns the `kind` of every event `read_range` yields for `topic`, in the order
    /// returned. Panics if the topic name is invalid or the read fails.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.kind)
            .collect()
    }

    /// The mocked answer is the string "9". Because the default is an int, the script
    /// should print 9. Only the questions topic should receive events.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  println(answer)
}
"#;
                let (
                    output,
                    question_events,
                    approval_events,
                    dual_control_events,
                    escalation_events,
                ) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "9");
                assert_eq!(
                    question_events,
                    vec![
                        "hitl.question_asked".to_string(),
                        "hitl.response_received".to_string()
                    ]
                );
                assert!(approval_events.is_empty());
                assert!(dual_control_events.is_empty());
                assert!(escalation_events.is_empty());
            })
            .await;
    }

    /// Two mocked approvals satisfy `quorum: 2`. The approvals topic should record the
    /// request, one response per reviewer, and then the final approval.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  println(record.approved)
  println(len(record.reviewers))
  println(record.reviewers[0])
  println(record.reviewers[1])
}
"#;
                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_approved".to_string(),
                    ]
                );
            })
            .await;
    }

    /// A mocked rejection should surface as a catchable `ApprovalDeniedError` that
    /// carries the reviewer's reason, followed by an `approval_denied` event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  println(is_err(denied))
  println(unwrap_err(denied).name)
  println(unwrap_err(denied).reason)
}
"#;
                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_denied".to_string(),
                    ]
                );
            })
            .await;
    }

    /// Two approvals for `dual_control(2, 3, ...)` should run the action closure and
    /// return its result. The log should show approval followed by execution.
    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  println(result)
}
"#;
                let (output, _, _, dual_control_events, _) =
                    execute_hitl_script(dir.path(), source)
                        .await
                        .expect("script succeeds");
                assert_eq!(output, "launched");
                assert_eq!(
                    dual_control_events,
                    vec![
                        "hitl.dual_control_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.dual_control_approved".to_string(),
                        "hitl.dual_control_executed".to_string(),
                    ]
                );
            })
            .await;
    }

    /// A mocked acceptance should resolve the escalation handle with status `accepted`
    /// and the accepting reviewer. Only issue and accept events are expected.
    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  println(handle.status)
  println(handle.reviewer)
}
"#;
                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "accepted\nlead");
                assert_eq!(
                    escalation_events,
                    vec![
                        "hitl.escalation_issued".to_string(),
                        "hitl.escalation_accepted".to_string(),
                    ]
                );
            })
            .await;
    }
}