Skip to main content

harn_vm/stdlib/
hitl.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::{Duration as StdDuration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use serde_json::{json, Value as JsonValue};
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16    active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17    EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::triggers::dispatcher::current_dispatch_context;
22use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
23use crate::vm::{clone_async_builtin_child_vm, Vm};
24
/// Queue depth associated with the HITL event log.
// NOTE(review): not referenced in the visible part of this file — presumably
// consumed where the event log is installed; confirm against the rest of the file.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Default deadline for approval-style requests, in milliseconds (24 hours).
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;
27
/// Event-log topic that carries `ask_user` requests and their responses.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic that carries `request_approval` requests and their responses.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic that carries `dual_control` requests and their responses.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic that carries `escalate_to` requests and their responses.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
32
33thread_local! {
34    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
35}
36
/// Snapshot of the thread-local request sequencing state.
///
/// The default value has an empty `instance_key` and a `next_seq` of zero.
// NOTE(review): the code that advances these fields is outside the visible
// chunk — presumably `next_request_id`; confirm before relying on field semantics.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Key the sequence counter is associated with.
    pub(crate) instance_key: String,
    /// Next sequence number to hand out.
    pub(crate) next_seq: u64,
}
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum HitlRequestKind {
46    Question,
47    Approval,
48    DualControl,
49    Escalation,
50}
51
52impl HitlRequestKind {
53    pub(crate) fn as_str(self) -> &'static str {
54        match self {
55            Self::Question => "question",
56            Self::Approval => "approval",
57            Self::DualControl => "dual_control",
58            Self::Escalation => "escalation",
59        }
60    }
61
62    fn topic(self) -> &'static str {
63        match self {
64            Self::Question => HITL_QUESTIONS_TOPIC,
65            Self::Approval => HITL_APPROVALS_TOPIC,
66            Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
67            Self::Escalation => HITL_ESCALATIONS_TOPIC,
68        }
69    }
70
71    fn request_event_kind(self) -> &'static str {
72        match self {
73            Self::Question => "hitl.question_asked",
74            Self::Approval => "hitl.approval_requested",
75            Self::DualControl => "hitl.dual_control_requested",
76            Self::Escalation => "hitl.escalation_issued",
77        }
78    }
79
80    pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
81        if request_id.starts_with("hitl_question_") {
82            Some(Self::Question)
83        } else if request_id.starts_with("hitl_approval_") {
84            Some(Self::Approval)
85        } else if request_id.starts_with("hitl_dual_control_") {
86            Some(Self::DualControl)
87        } else if request_id.starts_with("hitl_escalation_") {
88            Some(Self::Escalation)
89        } else {
90            None
91        }
92    }
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize)]
96pub struct HitlHostResponse {
97    pub request_id: String,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub answer: Option<JsonValue>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub approved: Option<bool>,
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub accepted: Option<bool>,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub reviewer: Option<String>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub reason: Option<String>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub metadata: Option<JsonValue>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub responded_at: Option<String>,
112}
113
114#[derive(Clone, Debug, Serialize, Deserialize)]
115struct HitlRequestEnvelope {
116    request_id: String,
117    kind: HitlRequestKind,
118    #[serde(default)]
119    agent: String,
120    trace_id: String,
121    requested_at: String,
122    payload: JsonValue,
123}
124
125#[derive(Clone, Debug, Serialize, Deserialize)]
126struct HitlTimeoutRecord {
127    request_id: String,
128    kind: HitlRequestKind,
129    trace_id: String,
130    timed_out_at: String,
131}
132
/// Identifiers describing the current dispatch, when there is one.
// NOTE(review): `instance_key` and `stable_base` are not read in the visible
// chunk — presumably they feed request-id generation; confirm.
#[derive(Debug, Clone)]
struct DispatchKeys {
    instance_key: String,
    stable_base: String,
    /// Copied into the `agent` field of each request envelope.
    agent: String,
    /// Reused as the request's trace id instead of generating a new one.
    trace_id: String,
}
140
141#[derive(Clone, Debug)]
142struct AskUserOptions {
143    schema: Option<VmValue>,
144    timeout: Option<StdDuration>,
145    default: Option<VmValue>,
146}
147
148#[derive(Clone, Debug)]
149struct ApprovalOptions {
150    detail: Option<VmValue>,
151    quorum: u32,
152    reviewers: Vec<String>,
153    deadline: StdDuration,
154}
155
/// Running tally of approvals collected for one request.
#[derive(Debug, Clone)]
struct ApprovalProgress {
    /// Distinct reviewers that have approved so far.
    reviewers: BTreeSet<String>,
    /// Reason attached to the most recently counted approval.
    reason: Option<String>,
    /// Timestamp of the most recently counted approval.
    approved_at: Option<String>,
}
162
163#[derive(Clone, Debug)]
164enum WaitOutcome {
165    Response(HitlHostResponse),
166    Timeout,
167}
168
169pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
170    vm.register_async_builtin("ask_user", |args| {
171        Box::pin(async move { ask_user_impl(&args).await })
172    });
173
174    vm.register_async_builtin("request_approval", |args| {
175        Box::pin(async move { request_approval_impl(&args).await })
176    });
177
178    vm.register_async_builtin("dual_control", |args| {
179        Box::pin(async move { dual_control_impl(&args).await })
180    });
181
182    vm.register_async_builtin("escalate_to", |args| {
183        Box::pin(async move { escalate_to_impl(&args).await })
184    });
185}
186
187pub(crate) fn reset_hitl_state() {
188    REQUEST_SEQUENCE.with(|slot| {
189        *slot.borrow_mut() = RequestSequenceState::default();
190    });
191}
192
193pub(crate) fn take_hitl_state() -> RequestSequenceState {
194    REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
195}
196
197pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
198    REQUEST_SEQUENCE.with(|slot| {
199        *slot.borrow_mut() = state;
200    });
201}
202
203pub async fn append_hitl_response(
204    base_dir: Option<&Path>,
205    mut response: HitlHostResponse,
206) -> Result<u64, String> {
207    let kind = HitlRequestKind::from_request_id(&response.request_id)
208        .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
209    if response.responded_at.is_none() {
210        response.responded_at = Some(now_rfc3339());
211    }
212    let log = ensure_hitl_event_log_for(base_dir)?;
213    let headers = response_headers(&response.request_id);
214    let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
215    log.append(
216        &topic,
217        LogEvent::new(
218            match kind {
219                HitlRequestKind::Escalation => "hitl.escalation_accepted",
220                _ => "hitl.response_received",
221            },
222            serde_json::to_value(response).map_err(|error| error.to_string())?,
223        )
224        .with_headers(headers),
225    )
226    .await
227    .map_err(|error| error.to_string())
228}
229
230async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
231    let prompt = required_string_arg(args, 0, "ask_user")?;
232    let options = parse_ask_user_options(args.get(1))?;
233    let keys = current_dispatch_keys();
234    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
235    let trace_id = keys
236        .as_ref()
237        .map(|keys| keys.trace_id.clone())
238        .unwrap_or_else(new_trace_id);
239    let log = ensure_hitl_event_log();
240    let request = HitlRequestEnvelope {
241        request_id: request_id.clone(),
242        kind: HitlRequestKind::Question,
243        agent: keys
244            .as_ref()
245            .map(|keys| keys.agent.clone())
246            .unwrap_or_default(),
247        trace_id: trace_id.clone(),
248        requested_at: now_rfc3339(),
249        payload: json!({
250            "prompt": prompt,
251            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
252            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
253            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
254        }),
255    };
256    append_request(&log, &request).await?;
257    maybe_notify_host(&request);
258    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
259
260    match wait_for_terminal_event(
261        &log,
262        HitlRequestKind::Question,
263        &request_id,
264        options.timeout,
265        &trace_id,
266    )
267    .await?
268    {
269        WaitOutcome::Timeout => {
270            if let Some(default) = options.default {
271                return Ok(default);
272            }
273            Err(timeout_error(&request_id, HitlRequestKind::Question))
274        }
275        WaitOutcome::Response(response) => {
276            let answer = response
277                .answer
278                .as_ref()
279                .map(crate::stdlib::json_to_vm_value)
280                .unwrap_or(VmValue::Nil);
281            if let Some(schema) = options.schema.as_ref() {
282                return schema_expect_value(&answer, schema, true);
283            }
284            if let Some(default) = options.default.as_ref() {
285                return Ok(coerce_like_default(&answer, default));
286            }
287            Ok(answer)
288        }
289    }
290}
291
292async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
293    let action = required_string_arg(args, 0, "request_approval")?;
294    let options = parse_approval_options(args.get(1), "request_approval")?;
295    let keys = current_dispatch_keys();
296    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
297    let trace_id = keys
298        .as_ref()
299        .map(|keys| keys.trace_id.clone())
300        .unwrap_or_else(new_trace_id);
301    let log = ensure_hitl_event_log();
302    let request = HitlRequestEnvelope {
303        request_id: request_id.clone(),
304        kind: HitlRequestKind::Approval,
305        agent: keys
306            .as_ref()
307            .map(|keys| keys.agent.clone())
308            .unwrap_or_default(),
309        trace_id: trace_id.clone(),
310        requested_at: now_rfc3339(),
311        payload: json!({
312            "action": action,
313            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
314            "quorum": options.quorum,
315            "reviewers": options.reviewers,
316            "deadline_ms": options.deadline.as_millis() as u64,
317        }),
318    };
319    append_request(&log, &request).await?;
320    maybe_notify_host(&request);
321    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
322
323    let record = wait_for_approval_quorum(
324        &log,
325        HitlRequestKind::Approval,
326        &request_id,
327        options.quorum,
328        &options.reviewers,
329        options.deadline,
330        &trace_id,
331    )
332    .await?;
333
334    append_named_event(
335        &log,
336        HitlRequestKind::Approval,
337        "hitl.approval_approved",
338        &request_id,
339        &trace_id,
340        json!({
341            "request_id": request_id,
342            "record": crate::llm::vm_value_to_json(&record),
343        }),
344    )
345    .await?;
346
347    Ok(record)
348}
349
350async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
351    let n = required_positive_int_arg(args, 0, "dual_control")?;
352    let m = required_positive_int_arg(args, 1, "dual_control")?;
353    if n > m {
354        return Err(VmError::Runtime(
355            "dual_control: n must be less than or equal to m".to_string(),
356        ));
357    }
358    let action = args
359        .get(2)
360        .and_then(|value| match value {
361            VmValue::Closure(closure) => Some(closure.clone()),
362            _ => None,
363        })
364        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
365    let approvers = optional_string_list(args.get(3), "dual_control")?;
366    if !approvers.is_empty() && approvers.len() < m as usize {
367        return Err(VmError::Runtime(format!(
368            "dual_control: expected at least {m} approvers, got {}",
369            approvers.len()
370        )));
371    }
372
373    let keys = current_dispatch_keys();
374    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
375    let trace_id = keys
376        .as_ref()
377        .map(|keys| keys.trace_id.clone())
378        .unwrap_or_else(new_trace_id);
379    let action_name = if action.func.name.is_empty() {
380        "anonymous".to_string()
381    } else {
382        action.func.name.clone()
383    };
384    let log = ensure_hitl_event_log();
385    let request = HitlRequestEnvelope {
386        request_id: request_id.clone(),
387        kind: HitlRequestKind::DualControl,
388        agent: keys
389            .as_ref()
390            .map(|keys| keys.agent.clone())
391            .unwrap_or_default(),
392        trace_id: trace_id.clone(),
393        requested_at: now_rfc3339(),
394        payload: json!({
395            "n": n,
396            "m": m,
397            "action": action_name,
398            "approvers": approvers,
399            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
400        }),
401    };
402    append_request(&log, &request).await?;
403    maybe_notify_host(&request);
404    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
405
406    let record = wait_for_approval_quorum(
407        &log,
408        HitlRequestKind::DualControl,
409        &request_id,
410        n as u32,
411        &approvers,
412        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
413        &trace_id,
414    )
415    .await?;
416
417    append_named_event(
418        &log,
419        HitlRequestKind::DualControl,
420        "hitl.dual_control_approved",
421        &request_id,
422        &trace_id,
423        json!({
424            "request_id": request_id,
425            "record": crate::llm::vm_value_to_json(&record),
426        }),
427    )
428    .await?;
429
430    let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
431        VmError::Runtime("dual_control requires an async builtin VM context".to_string())
432    })?;
433    let result = vm.call_closure_pub(&action, &[], &[]).await?;
434
435    append_named_event(
436        &log,
437        HitlRequestKind::DualControl,
438        "hitl.dual_control_executed",
439        &request_id,
440        &trace_id,
441        json!({
442            "request_id": request_id,
443            "result": crate::llm::vm_value_to_json(&result),
444        }),
445    )
446    .await?;
447
448    Ok(result)
449}
450
451async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
452    let role = required_string_arg(args, 0, "escalate_to")?;
453    let reason = required_string_arg(args, 1, "escalate_to")?;
454    let keys = current_dispatch_keys();
455    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
456    let trace_id = keys
457        .as_ref()
458        .map(|keys| keys.trace_id.clone())
459        .unwrap_or_else(new_trace_id);
460    let log = ensure_hitl_event_log();
461    let request = HitlRequestEnvelope {
462        request_id: request_id.clone(),
463        kind: HitlRequestKind::Escalation,
464        agent: keys
465            .as_ref()
466            .map(|keys| keys.agent.clone())
467            .unwrap_or_default(),
468        trace_id: trace_id.clone(),
469        requested_at: now_rfc3339(),
470        payload: json!({
471            "role": role,
472            "reason": reason,
473        }),
474    };
475    append_request(&log, &request).await?;
476    maybe_notify_host(&request);
477    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
478
479    match wait_for_terminal_event(
480        &log,
481        HitlRequestKind::Escalation,
482        &request_id,
483        None,
484        &trace_id,
485    )
486    .await?
487    {
488        WaitOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
489        WaitOutcome::Response(response) => Ok(crate::stdlib::json_to_vm_value(&json!({
490            "request_id": request_id,
491            "role": role,
492            "reason": reason,
493            "trace_id": trace_id,
494            "status": if response.accepted.unwrap_or(false) { "accepted" } else { "pending" },
495            "accepted_at": response.responded_at,
496            "reviewer": response.reviewer,
497        }))),
498    }
499}
500
501async fn wait_for_approval_quorum(
502    log: &Arc<AnyEventLog>,
503    kind: HitlRequestKind,
504    request_id: &str,
505    quorum: u32,
506    reviewers: &[String],
507    timeout: StdDuration,
508    trace_id: &str,
509) -> Result<VmValue, VmError> {
510    let deadline = Instant::now() + timeout;
511    let mut progress = ApprovalProgress {
512        reviewers: BTreeSet::new(),
513        reason: None,
514        approved_at: None,
515    };
516    let allowed_reviewers: BTreeSet<&str> = reviewers.iter().map(String::as_str).collect();
517    let topic = topic(kind)?;
518
519    process_existing_approval_events(
520        log,
521        &topic,
522        request_id,
523        quorum,
524        &allowed_reviewers,
525        &mut progress,
526        trace_id,
527    )
528    .await?;
529    if progress.reviewers.len() as u32 >= quorum {
530        return Ok(approval_record_value(&progress));
531    }
532    if is_replay() {
533        return Err(VmError::Runtime(format!(
534            "replay is missing recorded HITL approvals for '{request_id}'"
535        )));
536    }
537
538    let start_from = log.latest(&topic).await.map_err(log_error)?;
539    let stream = log
540        .clone()
541        .subscribe(&topic, start_from)
542        .await
543        .map_err(log_error)?;
544    pin_mut!(stream);
545    loop {
546        let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
547            append_timeout(log, kind, request_id, trace_id).await?;
548            return Err(timeout_error(request_id, kind));
549        };
550        let next = tokio::time::timeout(remaining, stream.next()).await;
551        let event = match next {
552            Ok(Some(Ok((_, event)))) => event,
553            Ok(Some(Err(error))) => return Err(log_error(error)),
554            Ok(None) => {
555                append_timeout(log, kind, request_id, trace_id).await?;
556                return Err(timeout_error(request_id, kind));
557            }
558            Err(_) => {
559                append_timeout(log, kind, request_id, trace_id).await?;
560                return Err(timeout_error(request_id, kind));
561            }
562        };
563        if !event_matches_request(&event, request_id) {
564            continue;
565        }
566        if event.kind == "hitl.timeout" {
567            return Err(timeout_error(request_id, kind));
568        }
569        if event.kind != "hitl.response_received" {
570            continue;
571        }
572        let response: HitlHostResponse = serde_json::from_value(event.payload)
573            .map_err(|error| VmError::Runtime(error.to_string()))?;
574        if let Some(reviewer) = response.reviewer.as_deref() {
575            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
576                continue;
577            }
578            if progress.reviewers.contains(reviewer) {
579                continue;
580            }
581        }
582        if response.approved.unwrap_or(false) {
583            if let Some(reviewer) = response.reviewer {
584                progress.reviewers.insert(reviewer);
585            }
586            progress.reason = response.reason;
587            progress.approved_at = response.responded_at;
588            if progress.reviewers.len() as u32 >= quorum {
589                return Ok(approval_record_value(&progress));
590            }
591            continue;
592        }
593
594        append_named_event(
595            log,
596            kind,
597            match kind {
598                HitlRequestKind::DualControl => "hitl.dual_control_denied",
599                _ => "hitl.approval_denied",
600            },
601            request_id,
602            trace_id,
603            json!({
604                "request_id": request_id,
605                "reviewer": response.reviewer,
606                "reason": response.reason,
607            }),
608        )
609        .await?;
610        return Err(approval_denied_error(request_id, response));
611    }
612}
613
614async fn process_existing_approval_events(
615    log: &Arc<AnyEventLog>,
616    topic: &Topic,
617    request_id: &str,
618    quorum: u32,
619    allowed_reviewers: &BTreeSet<&str>,
620    progress: &mut ApprovalProgress,
621    trace_id: &str,
622) -> Result<(), VmError> {
623    let existing = log
624        .read_range(topic, None, usize::MAX)
625        .await
626        .map_err(log_error)?;
627    for (_, event) in existing {
628        if !event_matches_request(&event, request_id) {
629            continue;
630        }
631        if event.kind == "hitl.timeout" {
632            return Err(timeout_error(request_id, kind_from_topic(topic)?));
633        }
634        if event.kind != "hitl.response_received" {
635            continue;
636        }
637        let response: HitlHostResponse = serde_json::from_value(event.payload)
638            .map_err(|error| VmError::Runtime(error.to_string()))?;
639        let Some(reviewer) = response.reviewer.as_deref() else {
640            continue;
641        };
642        if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
643            continue;
644        }
645        if !response.approved.unwrap_or(false) {
646            let kind = kind_from_topic(topic)?;
647            append_named_event(
648                log,
649                kind,
650                match kind {
651                    HitlRequestKind::DualControl => "hitl.dual_control_denied",
652                    _ => "hitl.approval_denied",
653                },
654                request_id,
655                trace_id,
656                json!({
657                    "request_id": request_id,
658                    "reviewer": response.reviewer,
659                    "reason": response.reason,
660                }),
661            )
662            .await?;
663            return Err(approval_denied_error(request_id, response));
664        }
665        if progress.reviewers.insert(reviewer.to_string()) {
666            progress.reason = response.reason;
667            progress.approved_at = response.responded_at;
668        }
669        if progress.reviewers.len() as u32 >= quorum {
670            break;
671        }
672    }
673    Ok(())
674}
675
676async fn wait_for_terminal_event(
677    log: &Arc<AnyEventLog>,
678    kind: HitlRequestKind,
679    request_id: &str,
680    timeout: Option<StdDuration>,
681    trace_id: &str,
682) -> Result<WaitOutcome, VmError> {
683    let topic = topic(kind)?;
684    if let Some(outcome) = find_existing_terminal_event(log, &topic, request_id).await? {
685        return Ok(outcome);
686    }
687    if is_replay() {
688        return Err(VmError::Runtime(format!(
689            "replay is missing a recorded HITL response for '{request_id}'"
690        )));
691    }
692
693    let start_from = log.latest(&topic).await.map_err(log_error)?;
694    let stream = log
695        .clone()
696        .subscribe(&topic, start_from)
697        .await
698        .map_err(log_error)?;
699    pin_mut!(stream);
700    let deadline = timeout.map(|timeout| Instant::now() + timeout);
701
702    loop {
703        let next_event = if let Some(deadline) = deadline {
704            let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
705                append_timeout(log, kind, request_id, trace_id).await?;
706                return Ok(WaitOutcome::Timeout);
707            };
708            match tokio::time::timeout(remaining, stream.next()).await {
709                Ok(next) => next,
710                Err(_) => {
711                    append_timeout(log, kind, request_id, trace_id).await?;
712                    return Ok(WaitOutcome::Timeout);
713                }
714            }
715        } else {
716            stream.next().await
717        };
718
719        let Some(received) = next_event else {
720            return Err(VmError::Runtime(format!(
721                "HITL wait for '{request_id}' ended before the host responded"
722            )));
723        };
724        let (_, event) = received.map_err(log_error)?;
725        if !event_matches_request(&event, request_id) {
726            continue;
727        }
728        match event.kind.as_str() {
729            "hitl.timeout" => return Ok(WaitOutcome::Timeout),
730            "hitl.response_received" | "hitl.escalation_accepted" => {
731                let response: HitlHostResponse = serde_json::from_value(event.payload)
732                    .map_err(|error| VmError::Runtime(error.to_string()))?;
733                if kind == HitlRequestKind::Escalation && !response.accepted.unwrap_or(false) {
734                    continue;
735                }
736                return Ok(WaitOutcome::Response(response));
737            }
738            _ => {}
739        }
740    }
741}
742
743async fn find_existing_terminal_event(
744    log: &Arc<AnyEventLog>,
745    topic: &Topic,
746    request_id: &str,
747) -> Result<Option<WaitOutcome>, VmError> {
748    let events = log
749        .read_range(topic, None, usize::MAX)
750        .await
751        .map_err(log_error)?;
752    for (_, event) in events {
753        if !event_matches_request(&event, request_id) {
754            continue;
755        }
756        match event.kind.as_str() {
757            "hitl.timeout" => return Ok(Some(WaitOutcome::Timeout)),
758            "hitl.response_received" | "hitl.escalation_accepted" => {
759                let response: HitlHostResponse = serde_json::from_value(event.payload)
760                    .map_err(|error| VmError::Runtime(error.to_string()))?;
761                if event.kind == "hitl.escalation_accepted" && !response.accepted.unwrap_or(false) {
762                    continue;
763                }
764                return Ok(Some(WaitOutcome::Response(response)));
765            }
766            _ => {}
767        }
768    }
769    Ok(None)
770}
771
772async fn append_request(
773    log: &Arc<AnyEventLog>,
774    request: &HitlRequestEnvelope,
775) -> Result<(), VmError> {
776    let topic = topic(request.kind)?;
777    log.append(
778        &topic,
779        LogEvent::new(
780            request.kind.request_event_kind(),
781            serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
782        )
783        .with_headers(request_headers(request)),
784    )
785    .await
786    .map(|_| ())
787    .map_err(log_error)
788}
789
790async fn append_named_event(
791    log: &Arc<AnyEventLog>,
792    kind: HitlRequestKind,
793    event_kind: &str,
794    request_id: &str,
795    trace_id: &str,
796    payload: JsonValue,
797) -> Result<(), VmError> {
798    let topic = topic(kind)?;
799    let headers = headers_with_trace(request_id, trace_id);
800    log.append(
801        &topic,
802        LogEvent::new(event_kind, payload).with_headers(headers),
803    )
804    .await
805    .map(|_| ())
806    .map_err(log_error)
807}
808
809async fn append_timeout(
810    log: &Arc<AnyEventLog>,
811    kind: HitlRequestKind,
812    request_id: &str,
813    trace_id: &str,
814) -> Result<(), VmError> {
815    append_named_event(
816        log,
817        kind,
818        "hitl.timeout",
819        request_id,
820        trace_id,
821        serde_json::to_value(HitlTimeoutRecord {
822            request_id: request_id.to_string(),
823            kind,
824            trace_id: trace_id.to_string(),
825            timed_out_at: now_rfc3339(),
826        })
827        .map_err(|error| VmError::Runtime(error.to_string()))?,
828    )
829    .await
830}
831
832async fn maybe_apply_mock_response(
833    kind: HitlRequestKind,
834    request_id: &str,
835    request_payload: &JsonValue,
836) -> Result<(), VmError> {
837    let mut params = request_payload
838        .as_object()
839        .cloned()
840        .unwrap_or_default()
841        .into_iter()
842        .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
843        .collect::<BTreeMap<_, _>>();
844    params.insert(
845        "request_id".to_string(),
846        VmValue::String(Rc::from(request_id.to_string())),
847    );
848    let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), &params) else {
849        return Ok(());
850    };
851    let value = result?;
852    let responses = match value {
853        VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
854        other => vec![other],
855    };
856    for response in responses {
857        let response_dict = response.as_dict().ok_or_else(|| {
858            VmError::Runtime(format!(
859                "mocked HITL {} response must be a dict or list<dict>",
860                kind.as_str()
861            ))
862        })?;
863        let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
864        append_hitl_response(None, hitl_response)
865            .await
866            .map_err(VmError::Runtime)?;
867    }
868    Ok(())
869}
870
871fn parse_hitl_response_dict(
872    request_id: &str,
873    response_dict: &BTreeMap<String, VmValue>,
874) -> Result<HitlHostResponse, VmError> {
875    Ok(HitlHostResponse {
876        request_id: request_id.to_string(),
877        answer: response_dict
878            .get("answer")
879            .map(crate::llm::vm_value_to_json),
880        approved: response_dict.get("approved").and_then(vm_bool),
881        accepted: response_dict.get("accepted").and_then(vm_bool),
882        reviewer: response_dict.get("reviewer").map(VmValue::display),
883        reason: response_dict.get("reason").map(VmValue::display),
884        metadata: response_dict
885            .get("metadata")
886            .map(crate::llm::vm_value_to_json),
887        responded_at: response_dict.get("responded_at").map(VmValue::display),
888    })
889}
890
891fn maybe_notify_host(request: &HitlRequestEnvelope) {
892    let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
893        return;
894    };
895    bridge.notify(
896        "harn.hitl.requested",
897        serde_json::to_value(request).unwrap_or(JsonValue::Null),
898    );
899}
900
901fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
902    let Some(value) = value else {
903        return Ok(AskUserOptions {
904            schema: None,
905            timeout: None,
906            default: None,
907        });
908    };
909    let dict = value
910        .as_dict()
911        .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
912    Ok(AskUserOptions {
913        schema: dict
914            .get("schema")
915            .cloned()
916            .filter(|value| !matches!(value, VmValue::Nil)),
917        timeout: dict.get("timeout").map(parse_duration_value).transpose()?,
918        default: dict
919            .get("default")
920            .cloned()
921            .filter(|value| !matches!(value, VmValue::Nil)),
922    })
923}
924
925fn parse_approval_options(
926    value: Option<&VmValue>,
927    builtin: &str,
928) -> Result<ApprovalOptions, VmError> {
929    let dict = match value {
930        None => None,
931        Some(VmValue::Dict(dict)) => Some(dict),
932        Some(_) => {
933            return Err(VmError::Runtime(format!(
934                "{builtin}: options must be a dict"
935            )))
936        }
937    };
938    let quorum = dict
939        .and_then(|dict| dict.get("quorum"))
940        .and_then(VmValue::as_int)
941        .unwrap_or(1);
942    if quorum <= 0 {
943        return Err(VmError::Runtime(format!(
944            "{builtin}: quorum must be positive"
945        )));
946    }
947    let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
948    let deadline = dict
949        .and_then(|dict| dict.get("deadline"))
950        .map(parse_duration_value)
951        .transpose()?
952        .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
953    Ok(ApprovalOptions {
954        detail: dict.and_then(|dict| dict.get("detail")).cloned(),
955        quorum: quorum as u32,
956        reviewers,
957        deadline,
958    })
959}
960
961fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
962    args.get(idx)
963        .map(VmValue::display)
964        .filter(|value| !value.is_empty())
965        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
966}
967
968fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
969    let value = args
970        .get(idx)
971        .and_then(VmValue::as_int)
972        .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
973    if value <= 0 {
974        return Err(VmError::Runtime(format!(
975            "{builtin}: expected a positive int at {idx}"
976        )));
977    }
978    Ok(value)
979}
980
981fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
982    let Some(value) = value else {
983        return Ok(Vec::new());
984    };
985    match value {
986        VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
987        _ => Err(VmError::Runtime(format!(
988            "{builtin}: expected list<string>"
989        ))),
990    }
991}
992
993fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
994    match value {
995        VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
996        VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
997        VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
998        _ => Err(VmError::Runtime(
999            "expected a duration or millisecond count".to_string(),
1000        )),
1001    }
1002}
1003
1004fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1005    active_event_log()
1006        .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1007}
1008
1009fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1010    if let Some(log) = active_event_log() {
1011        return Ok(log);
1012    }
1013    let Some(base_dir) = base_dir else {
1014        return Ok(install_memory_for_current_thread(
1015            HITL_EVENT_LOG_QUEUE_DEPTH,
1016        ));
1017    };
1018    install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1019}
1020
1021fn current_dispatch_keys() -> Option<DispatchKeys> {
1022    let context = current_dispatch_context()?;
1023    let stable_base = context
1024        .replay_of_event_id
1025        .clone()
1026        .unwrap_or_else(|| context.trigger_event.id.0.clone());
1027    let instance_key = format!(
1028        "{}::{}",
1029        context.trigger_event.id.0,
1030        context.replay_of_event_id.as_deref().unwrap_or("live")
1031    );
1032    Some(DispatchKeys {
1033        instance_key,
1034        stable_base,
1035        agent: context.agent_id,
1036        trace_id: context.trigger_event.trace_id.0,
1037    })
1038}
1039
1040fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1041    if let Some(keys) = dispatch_keys {
1042        let seq = REQUEST_SEQUENCE.with(|slot| {
1043            let mut state = slot.borrow_mut();
1044            if state.instance_key != keys.instance_key {
1045                state.instance_key = keys.instance_key.clone();
1046                state.next_seq = 0;
1047            }
1048            state.next_seq += 1;
1049            state.next_seq
1050        });
1051        return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1052    }
1053    format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1054}
1055
1056fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1057    headers_with_trace(&request.request_id, &request.trace_id)
1058}
1059
/// Builds the event headers for a response: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1065
1066fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1067    let mut headers = response_headers(request_id);
1068    headers.insert("trace_id".to_string(), trace_id.to_string());
1069    headers
1070}
1071
1072fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1073    Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1074}
1075
1076fn kind_from_topic(topic: &Topic) -> Result<HitlRequestKind, VmError> {
1077    match topic.as_str() {
1078        HITL_QUESTIONS_TOPIC => Ok(HitlRequestKind::Question),
1079        HITL_APPROVALS_TOPIC => Ok(HitlRequestKind::Approval),
1080        HITL_DUAL_CONTROL_TOPIC => Ok(HitlRequestKind::DualControl),
1081        HITL_ESCALATIONS_TOPIC => Ok(HitlRequestKind::Escalation),
1082        other => Err(VmError::Runtime(format!("unknown HITL topic '{other}'"))),
1083    }
1084}
1085
1086fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1087    event
1088        .headers
1089        .get("request_id")
1090        .is_some_and(|value| value == request_id)
1091        || event
1092            .payload
1093            .get("request_id")
1094            .and_then(JsonValue::as_str)
1095            .is_some_and(|value| value == request_id)
1096}
1097
1098fn approval_record_value(progress: &ApprovalProgress) -> VmValue {
1099    crate::stdlib::json_to_vm_value(&json!({
1100        "approved": true,
1101        "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1102        "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1103        "reason": progress.reason,
1104    }))
1105}
1106
1107fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1108    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1109        "name": "ApprovalDeniedError",
1110        "category": "generic",
1111        "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1112        "request_id": request_id,
1113        "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1114        "reason": response.reason,
1115    })))
1116}
1117
1118fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1119    let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1120    VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1121        "name": "HumanTimeoutError",
1122        "category": ErrorCategory::Timeout.as_str(),
1123        "message": format!("{} timed out", kind.as_str()),
1124        "request_id": request_id,
1125        "kind": kind.as_str(),
1126    })))
1127}
1128
1129fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1130    match default {
1131        VmValue::Int(_) => match value {
1132            VmValue::Int(_) => value.clone(),
1133            VmValue::Float(number) => VmValue::Int(*number as i64),
1134            VmValue::String(text) => text
1135                .parse::<i64>()
1136                .map(VmValue::Int)
1137                .unwrap_or_else(|_| default.clone()),
1138            _ => default.clone(),
1139        },
1140        VmValue::Float(_) => match value {
1141            VmValue::Float(_) => value.clone(),
1142            VmValue::Int(number) => VmValue::Float(*number as f64),
1143            VmValue::String(text) => text
1144                .parse::<f64>()
1145                .map(VmValue::Float)
1146                .unwrap_or_else(|_| default.clone()),
1147            _ => default.clone(),
1148        },
1149        VmValue::Bool(_) => match value {
1150            VmValue::Bool(_) => value.clone(),
1151            VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1152            VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1153            _ => default.clone(),
1154        },
1155        VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1156        VmValue::Duration(_) => match value {
1157            VmValue::Duration(_) => value.clone(),
1158            VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1159            _ => default.clone(),
1160        },
1161        VmValue::Nil => value.clone(),
1162        _ => {
1163            if value.type_name() == default.type_name() {
1164                value.clone()
1165            } else {
1166                default.clone()
1167            }
1168        }
1169    }
1170}
1171
1172fn log_error(error: impl std::fmt::Display) -> VmError {
1173    VmError::Runtime(error.to_string())
1174}
1175
/// Reports whether replay mode is enabled through the `HARN_REPLAY` environment variable.
///
/// Replay is considered enabled when the variable is set to anything other
/// than an empty/whitespace-only string or `0`. Surrounding whitespace is
/// ignored for both checks, so values such as `" 0 "` or `"0\n"` disable
/// replay just like `"0"`. An unset or non-Unicode variable disables replay.
fn is_replay() -> bool {
    std::env::var("HARN_REPLAY")
        .map(|raw| {
            // Trim once and use the trimmed value for both comparisons.
            let flag = raw.trim();
            !flag.is_empty() && flag != "0"
        })
        .unwrap_or(false)
}
1181
1182fn now_rfc3339() -> String {
1183    OffsetDateTime::now_utc()
1184        .format(&Rfc3339)
1185        .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1186}
1187
1188fn new_trace_id() -> String {
1189    format!("trace_{}", Uuid::now_v7())
1190}
1191
1192fn vm_bool(value: &VmValue) -> Option<bool> {
1193    match value {
1194        VmValue::Bool(flag) => Some(*flag),
1195        _ => None,
1196    }
1197}
1198
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Result of running a script: its printed output plus the event kinds
    /// recorded on each HITL topic, in log order.
    struct HitlRun {
        output: String,
        questions: Vec<String>,
        approvals: Vec<String>,
        dual_control: Vec<String>,
        escalations: Vec<String>,
    }

    /// Turns a list of string literals into the owned form used in assertions.
    fn kinds(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Compiles and executes `source` on a fresh VM with an event log rooted at
    /// `base_dir`, then collects the output and the per-topic event kinds.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<HitlRun, VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        let questions = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approvals = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalations = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
        Ok(HitlRun {
            output,
            questions,
            approvals,
            dual_control,
            escalations,
        })
    }

    /// Reads every event stored under `topic` and returns just the event kinds.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        let topic = Topic::new(topic).expect("valid topic");
        let events = log
            .read_range(&topic, None, usize::MAX)
            .await
            .expect("read topic");
        events.into_iter().map(|(_, event)| event.kind).collect()
    }

    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  println(answer)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(run.output, "9");
                assert_eq!(
                    run.questions,
                    kinds(&["hitl.question_asked", "hitl.response_received"])
                );
                assert!(run.approvals.is_empty());
                assert!(run.dual_control.is_empty());
                assert!(run.escalations.is_empty());
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  println(record.approved)
  println(len(record.reviewers))
  println(record.reviewers[0])
  println(record.reviewers[1])
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    run.approvals,
                    kinds(&[
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.approval_approved",
                    ])
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  println(is_err(denied))
  println(unwrap_err(denied).name)
  println(unwrap_err(denied).reason)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(run.output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    run.approvals,
                    kinds(&[
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.approval_denied",
                    ])
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  println(result)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(run.output, "launched");
                assert_eq!(
                    run.dual_control,
                    kinds(&[
                        "hitl.dual_control_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.dual_control_approved",
                        "hitl.dual_control_executed",
                    ])
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  println(handle.status)
  println(handle.reviewer)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(run.output, "accepted\nlead");
                assert_eq!(
                    run.escalations,
                    kinds(&["hitl.escalation_issued", "hitl.escalation_accepted"])
                );
            })
            .await;
    }
}