1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::{Duration as StdDuration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use serde_json::{json, Value as JsonValue};
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::triggers::dispatcher::current_dispatch_context;
22use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
23use crate::vm::{clone_async_builtin_child_vm, Vm};
24
/// Capacity passed to `install_memory_for_current_thread` when HITL code has to
/// create an in-memory event log because none is active.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Default deadline for approval-style requests: 24 hours, in milliseconds.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 1_000 * 60 * 60 * 24;

/// Event-log topic used for [`HitlRequestKind::Question`] traffic.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for [`HitlRequestKind::Approval`] traffic.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for [`HitlRequestKind::DualControl`] traffic.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for [`HitlRequestKind::Escalation`] traffic.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
32
33thread_local! {
34 static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
35}
36
/// Snapshot of the per-thread request-id sequence.
///
/// The default value (empty key, counter at zero) means no request id has been
/// issued yet.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Dispatch instance the counter currently belongs to.
    pub(crate) instance_key: String,
    /// Most recently issued sequence number for `instance_key`; `0` means none.
    pub(crate) next_seq: u64,
}
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum HitlRequestKind {
46 Question,
47 Approval,
48 DualControl,
49 Escalation,
50}
51
52impl HitlRequestKind {
53 fn as_str(self) -> &'static str {
54 match self {
55 Self::Question => "question",
56 Self::Approval => "approval",
57 Self::DualControl => "dual_control",
58 Self::Escalation => "escalation",
59 }
60 }
61
62 fn topic(self) -> &'static str {
63 match self {
64 Self::Question => HITL_QUESTIONS_TOPIC,
65 Self::Approval => HITL_APPROVALS_TOPIC,
66 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
67 Self::Escalation => HITL_ESCALATIONS_TOPIC,
68 }
69 }
70
71 fn request_event_kind(self) -> &'static str {
72 match self {
73 Self::Question => "hitl.question_asked",
74 Self::Approval => "hitl.approval_requested",
75 Self::DualControl => "hitl.dual_control_requested",
76 Self::Escalation => "hitl.escalation_issued",
77 }
78 }
79
80 fn from_request_id(request_id: &str) -> Option<Self> {
81 if request_id.starts_with("hitl_question_") {
82 Some(Self::Question)
83 } else if request_id.starts_with("hitl_approval_") {
84 Some(Self::Approval)
85 } else if request_id.starts_with("hitl_dual_control_") {
86 Some(Self::DualControl)
87 } else if request_id.starts_with("hitl_escalation_") {
88 Some(Self::Escalation)
89 } else {
90 None
91 }
92 }
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize)]
96pub struct HitlHostResponse {
97 pub request_id: String,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub answer: Option<JsonValue>,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub approved: Option<bool>,
102 #[serde(skip_serializing_if = "Option::is_none")]
103 pub accepted: Option<bool>,
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub reviewer: Option<String>,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub reason: Option<String>,
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub metadata: Option<JsonValue>,
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub responded_at: Option<String>,
112}
113
114#[derive(Clone, Debug, Serialize, Deserialize)]
115struct HitlRequestEnvelope {
116 request_id: String,
117 kind: HitlRequestKind,
118 trace_id: String,
119 requested_at: String,
120 payload: JsonValue,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize)]
124struct HitlTimeoutRecord {
125 request_id: String,
126 kind: HitlRequestKind,
127 trace_id: String,
128 timed_out_at: String,
129}
130
/// Identifiers derived from the current trigger dispatch context.
#[derive(Debug, Clone)]
struct DispatchKeys {
    /// Key whose change restarts the per-thread request sequence.
    instance_key: String,
    /// Stable component embedded in generated request ids.
    stable_base: String,
    /// Trace id of the triggering event.
    trace_id: String,
}
137
138#[derive(Clone, Debug)]
139struct AskUserOptions {
140 schema: Option<VmValue>,
141 timeout: Option<StdDuration>,
142 default: Option<VmValue>,
143}
144
145#[derive(Clone, Debug)]
146struct ApprovalOptions {
147 detail: Option<VmValue>,
148 quorum: u32,
149 reviewers: Vec<String>,
150 deadline: StdDuration,
151}
152
/// Running tally of approvals collected for one request.
#[derive(Debug, Clone)]
struct ApprovalProgress {
    /// Distinct reviewers who have approved so far.
    reviewers: BTreeSet<String>,
    /// Reason taken from the latest approval that was recorded.
    reason: Option<String>,
    /// Timestamp taken from the latest approval that was recorded.
    approved_at: Option<String>,
}
159
160#[derive(Clone, Debug)]
161enum WaitOutcome {
162 Response(HitlHostResponse),
163 Timeout,
164}
165
166pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
167 vm.register_async_builtin("ask_user", |args| {
168 Box::pin(async move { ask_user_impl(&args).await })
169 });
170
171 vm.register_async_builtin("request_approval", |args| {
172 Box::pin(async move { request_approval_impl(&args).await })
173 });
174
175 vm.register_async_builtin("dual_control", |args| {
176 Box::pin(async move { dual_control_impl(&args).await })
177 });
178
179 vm.register_async_builtin("escalate_to", |args| {
180 Box::pin(async move { escalate_to_impl(&args).await })
181 });
182}
183
184pub(crate) fn reset_hitl_state() {
185 REQUEST_SEQUENCE.with(|slot| {
186 *slot.borrow_mut() = RequestSequenceState::default();
187 });
188}
189
190pub(crate) fn take_hitl_state() -> RequestSequenceState {
191 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
192}
193
194pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
195 REQUEST_SEQUENCE.with(|slot| {
196 *slot.borrow_mut() = state;
197 });
198}
199
200pub async fn append_hitl_response(
201 base_dir: Option<&Path>,
202 mut response: HitlHostResponse,
203) -> Result<u64, String> {
204 let kind = HitlRequestKind::from_request_id(&response.request_id)
205 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
206 if response.responded_at.is_none() {
207 response.responded_at = Some(now_rfc3339());
208 }
209 let log = ensure_hitl_event_log_for(base_dir)?;
210 let headers = response_headers(&response.request_id);
211 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
212 log.append(
213 &topic,
214 LogEvent::new(
215 match kind {
216 HitlRequestKind::Escalation => "hitl.escalation_accepted",
217 _ => "hitl.response_received",
218 },
219 serde_json::to_value(response).map_err(|error| error.to_string())?,
220 )
221 .with_headers(headers),
222 )
223 .await
224 .map_err(|error| error.to_string())
225}
226
227async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
228 let prompt = required_string_arg(args, 0, "ask_user")?;
229 let options = parse_ask_user_options(args.get(1))?;
230 let keys = current_dispatch_keys();
231 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
232 let trace_id = keys
233 .as_ref()
234 .map(|keys| keys.trace_id.clone())
235 .unwrap_or_else(new_trace_id);
236 let log = ensure_hitl_event_log();
237 let request = HitlRequestEnvelope {
238 request_id: request_id.clone(),
239 kind: HitlRequestKind::Question,
240 trace_id: trace_id.clone(),
241 requested_at: now_rfc3339(),
242 payload: json!({
243 "prompt": prompt,
244 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
245 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
246 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
247 }),
248 };
249 append_request(&log, &request).await?;
250 maybe_notify_host(&request);
251 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
252
253 match wait_for_terminal_event(
254 &log,
255 HitlRequestKind::Question,
256 &request_id,
257 options.timeout,
258 &trace_id,
259 )
260 .await?
261 {
262 WaitOutcome::Timeout => {
263 if let Some(default) = options.default {
264 return Ok(default);
265 }
266 Err(timeout_error(&request_id, HitlRequestKind::Question))
267 }
268 WaitOutcome::Response(response) => {
269 let answer = response
270 .answer
271 .as_ref()
272 .map(crate::stdlib::json_to_vm_value)
273 .unwrap_or(VmValue::Nil);
274 if let Some(schema) = options.schema.as_ref() {
275 return schema_expect_value(&answer, schema, true);
276 }
277 if let Some(default) = options.default.as_ref() {
278 return Ok(coerce_like_default(&answer, default));
279 }
280 Ok(answer)
281 }
282 }
283}
284
285async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
286 let action = required_string_arg(args, 0, "request_approval")?;
287 let options = parse_approval_options(args.get(1), "request_approval")?;
288 let keys = current_dispatch_keys();
289 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
290 let trace_id = keys
291 .as_ref()
292 .map(|keys| keys.trace_id.clone())
293 .unwrap_or_else(new_trace_id);
294 let log = ensure_hitl_event_log();
295 let request = HitlRequestEnvelope {
296 request_id: request_id.clone(),
297 kind: HitlRequestKind::Approval,
298 trace_id: trace_id.clone(),
299 requested_at: now_rfc3339(),
300 payload: json!({
301 "action": action,
302 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
303 "quorum": options.quorum,
304 "reviewers": options.reviewers,
305 "deadline_ms": options.deadline.as_millis() as u64,
306 }),
307 };
308 append_request(&log, &request).await?;
309 maybe_notify_host(&request);
310 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
311
312 let record = wait_for_approval_quorum(
313 &log,
314 HitlRequestKind::Approval,
315 &request_id,
316 options.quorum,
317 &options.reviewers,
318 options.deadline,
319 &trace_id,
320 )
321 .await?;
322
323 append_named_event(
324 &log,
325 HitlRequestKind::Approval,
326 "hitl.approval_approved",
327 &request_id,
328 &trace_id,
329 json!({
330 "request_id": request_id,
331 "record": crate::llm::vm_value_to_json(&record),
332 }),
333 )
334 .await?;
335
336 Ok(record)
337}
338
339async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
340 let n = required_positive_int_arg(args, 0, "dual_control")?;
341 let m = required_positive_int_arg(args, 1, "dual_control")?;
342 if n > m {
343 return Err(VmError::Runtime(
344 "dual_control: n must be less than or equal to m".to_string(),
345 ));
346 }
347 let action = args
348 .get(2)
349 .and_then(|value| match value {
350 VmValue::Closure(closure) => Some(closure.clone()),
351 _ => None,
352 })
353 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
354 let approvers = optional_string_list(args.get(3), "dual_control")?;
355 if !approvers.is_empty() && approvers.len() < m as usize {
356 return Err(VmError::Runtime(format!(
357 "dual_control: expected at least {m} approvers, got {}",
358 approvers.len()
359 )));
360 }
361
362 let keys = current_dispatch_keys();
363 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
364 let trace_id = keys
365 .as_ref()
366 .map(|keys| keys.trace_id.clone())
367 .unwrap_or_else(new_trace_id);
368 let action_name = if action.func.name.is_empty() {
369 "anonymous".to_string()
370 } else {
371 action.func.name.clone()
372 };
373 let log = ensure_hitl_event_log();
374 let request = HitlRequestEnvelope {
375 request_id: request_id.clone(),
376 kind: HitlRequestKind::DualControl,
377 trace_id: trace_id.clone(),
378 requested_at: now_rfc3339(),
379 payload: json!({
380 "n": n,
381 "m": m,
382 "action": action_name,
383 "approvers": approvers,
384 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
385 }),
386 };
387 append_request(&log, &request).await?;
388 maybe_notify_host(&request);
389 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
390
391 let record = wait_for_approval_quorum(
392 &log,
393 HitlRequestKind::DualControl,
394 &request_id,
395 n as u32,
396 &approvers,
397 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
398 &trace_id,
399 )
400 .await?;
401
402 append_named_event(
403 &log,
404 HitlRequestKind::DualControl,
405 "hitl.dual_control_approved",
406 &request_id,
407 &trace_id,
408 json!({
409 "request_id": request_id,
410 "record": crate::llm::vm_value_to_json(&record),
411 }),
412 )
413 .await?;
414
415 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
416 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
417 })?;
418 let result = vm.call_closure_pub(&action, &[], &[]).await?;
419
420 append_named_event(
421 &log,
422 HitlRequestKind::DualControl,
423 "hitl.dual_control_executed",
424 &request_id,
425 &trace_id,
426 json!({
427 "request_id": request_id,
428 "result": crate::llm::vm_value_to_json(&result),
429 }),
430 )
431 .await?;
432
433 Ok(result)
434}
435
436async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
437 let role = required_string_arg(args, 0, "escalate_to")?;
438 let reason = required_string_arg(args, 1, "escalate_to")?;
439 let keys = current_dispatch_keys();
440 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
441 let trace_id = keys
442 .as_ref()
443 .map(|keys| keys.trace_id.clone())
444 .unwrap_or_else(new_trace_id);
445 let log = ensure_hitl_event_log();
446 let request = HitlRequestEnvelope {
447 request_id: request_id.clone(),
448 kind: HitlRequestKind::Escalation,
449 trace_id: trace_id.clone(),
450 requested_at: now_rfc3339(),
451 payload: json!({
452 "role": role,
453 "reason": reason,
454 }),
455 };
456 append_request(&log, &request).await?;
457 maybe_notify_host(&request);
458 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
459
460 match wait_for_terminal_event(
461 &log,
462 HitlRequestKind::Escalation,
463 &request_id,
464 None,
465 &trace_id,
466 )
467 .await?
468 {
469 WaitOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
470 WaitOutcome::Response(response) => Ok(crate::stdlib::json_to_vm_value(&json!({
471 "request_id": request_id,
472 "role": role,
473 "reason": reason,
474 "trace_id": trace_id,
475 "status": if response.accepted.unwrap_or(false) { "accepted" } else { "pending" },
476 "accepted_at": response.responded_at,
477 "reviewer": response.reviewer,
478 }))),
479 }
480}
481
482async fn wait_for_approval_quorum(
483 log: &Arc<AnyEventLog>,
484 kind: HitlRequestKind,
485 request_id: &str,
486 quorum: u32,
487 reviewers: &[String],
488 timeout: StdDuration,
489 trace_id: &str,
490) -> Result<VmValue, VmError> {
491 let deadline = Instant::now() + timeout;
492 let mut progress = ApprovalProgress {
493 reviewers: BTreeSet::new(),
494 reason: None,
495 approved_at: None,
496 };
497 let allowed_reviewers: BTreeSet<&str> = reviewers.iter().map(String::as_str).collect();
498 let topic = topic(kind)?;
499
500 process_existing_approval_events(
501 log,
502 &topic,
503 request_id,
504 quorum,
505 &allowed_reviewers,
506 &mut progress,
507 trace_id,
508 )
509 .await?;
510 if progress.reviewers.len() as u32 >= quorum {
511 return Ok(approval_record_value(&progress));
512 }
513 if is_replay() {
514 return Err(VmError::Runtime(format!(
515 "replay is missing recorded HITL approvals for '{request_id}'"
516 )));
517 }
518
519 let start_from = log.latest(&topic).await.map_err(log_error)?;
520 let stream = log
521 .clone()
522 .subscribe(&topic, start_from)
523 .await
524 .map_err(log_error)?;
525 pin_mut!(stream);
526 loop {
527 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
528 append_timeout(log, kind, request_id, trace_id).await?;
529 return Err(timeout_error(request_id, kind));
530 };
531 let next = tokio::time::timeout(remaining, stream.next()).await;
532 let event = match next {
533 Ok(Some(Ok((_, event)))) => event,
534 Ok(Some(Err(error))) => return Err(log_error(error)),
535 Ok(None) => {
536 append_timeout(log, kind, request_id, trace_id).await?;
537 return Err(timeout_error(request_id, kind));
538 }
539 Err(_) => {
540 append_timeout(log, kind, request_id, trace_id).await?;
541 return Err(timeout_error(request_id, kind));
542 }
543 };
544 if !event_matches_request(&event, request_id) {
545 continue;
546 }
547 if event.kind == "hitl.timeout" {
548 return Err(timeout_error(request_id, kind));
549 }
550 if event.kind != "hitl.response_received" {
551 continue;
552 }
553 let response: HitlHostResponse = serde_json::from_value(event.payload)
554 .map_err(|error| VmError::Runtime(error.to_string()))?;
555 if let Some(reviewer) = response.reviewer.as_deref() {
556 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
557 continue;
558 }
559 if progress.reviewers.contains(reviewer) {
560 continue;
561 }
562 }
563 if response.approved.unwrap_or(false) {
564 if let Some(reviewer) = response.reviewer {
565 progress.reviewers.insert(reviewer);
566 }
567 progress.reason = response.reason;
568 progress.approved_at = response.responded_at;
569 if progress.reviewers.len() as u32 >= quorum {
570 return Ok(approval_record_value(&progress));
571 }
572 continue;
573 }
574
575 append_named_event(
576 log,
577 kind,
578 match kind {
579 HitlRequestKind::DualControl => "hitl.dual_control_denied",
580 _ => "hitl.approval_denied",
581 },
582 request_id,
583 trace_id,
584 json!({
585 "request_id": request_id,
586 "reviewer": response.reviewer,
587 "reason": response.reason,
588 }),
589 )
590 .await?;
591 return Err(approval_denied_error(request_id, response));
592 }
593}
594
595async fn process_existing_approval_events(
596 log: &Arc<AnyEventLog>,
597 topic: &Topic,
598 request_id: &str,
599 quorum: u32,
600 allowed_reviewers: &BTreeSet<&str>,
601 progress: &mut ApprovalProgress,
602 trace_id: &str,
603) -> Result<(), VmError> {
604 let existing = log
605 .read_range(topic, None, usize::MAX)
606 .await
607 .map_err(log_error)?;
608 for (_, event) in existing {
609 if !event_matches_request(&event, request_id) {
610 continue;
611 }
612 if event.kind == "hitl.timeout" {
613 return Err(timeout_error(request_id, kind_from_topic(topic)?));
614 }
615 if event.kind != "hitl.response_received" {
616 continue;
617 }
618 let response: HitlHostResponse = serde_json::from_value(event.payload)
619 .map_err(|error| VmError::Runtime(error.to_string()))?;
620 let Some(reviewer) = response.reviewer.as_deref() else {
621 continue;
622 };
623 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
624 continue;
625 }
626 if !response.approved.unwrap_or(false) {
627 let kind = kind_from_topic(topic)?;
628 append_named_event(
629 log,
630 kind,
631 match kind {
632 HitlRequestKind::DualControl => "hitl.dual_control_denied",
633 _ => "hitl.approval_denied",
634 },
635 request_id,
636 trace_id,
637 json!({
638 "request_id": request_id,
639 "reviewer": response.reviewer,
640 "reason": response.reason,
641 }),
642 )
643 .await?;
644 return Err(approval_denied_error(request_id, response));
645 }
646 if progress.reviewers.insert(reviewer.to_string()) {
647 progress.reason = response.reason;
648 progress.approved_at = response.responded_at;
649 }
650 if progress.reviewers.len() as u32 >= quorum {
651 break;
652 }
653 }
654 Ok(())
655}
656
657async fn wait_for_terminal_event(
658 log: &Arc<AnyEventLog>,
659 kind: HitlRequestKind,
660 request_id: &str,
661 timeout: Option<StdDuration>,
662 trace_id: &str,
663) -> Result<WaitOutcome, VmError> {
664 let topic = topic(kind)?;
665 if let Some(outcome) = find_existing_terminal_event(log, &topic, request_id).await? {
666 return Ok(outcome);
667 }
668 if is_replay() {
669 return Err(VmError::Runtime(format!(
670 "replay is missing a recorded HITL response for '{request_id}'"
671 )));
672 }
673
674 let start_from = log.latest(&topic).await.map_err(log_error)?;
675 let stream = log
676 .clone()
677 .subscribe(&topic, start_from)
678 .await
679 .map_err(log_error)?;
680 pin_mut!(stream);
681 let deadline = timeout.map(|timeout| Instant::now() + timeout);
682
683 loop {
684 let next_event = if let Some(deadline) = deadline {
685 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
686 append_timeout(log, kind, request_id, trace_id).await?;
687 return Ok(WaitOutcome::Timeout);
688 };
689 match tokio::time::timeout(remaining, stream.next()).await {
690 Ok(next) => next,
691 Err(_) => {
692 append_timeout(log, kind, request_id, trace_id).await?;
693 return Ok(WaitOutcome::Timeout);
694 }
695 }
696 } else {
697 stream.next().await
698 };
699
700 let Some(received) = next_event else {
701 return Err(VmError::Runtime(format!(
702 "HITL wait for '{request_id}' ended before the host responded"
703 )));
704 };
705 let (_, event) = received.map_err(log_error)?;
706 if !event_matches_request(&event, request_id) {
707 continue;
708 }
709 match event.kind.as_str() {
710 "hitl.timeout" => return Ok(WaitOutcome::Timeout),
711 "hitl.response_received" | "hitl.escalation_accepted" => {
712 let response: HitlHostResponse = serde_json::from_value(event.payload)
713 .map_err(|error| VmError::Runtime(error.to_string()))?;
714 if kind == HitlRequestKind::Escalation && !response.accepted.unwrap_or(false) {
715 continue;
716 }
717 return Ok(WaitOutcome::Response(response));
718 }
719 _ => {}
720 }
721 }
722}
723
724async fn find_existing_terminal_event(
725 log: &Arc<AnyEventLog>,
726 topic: &Topic,
727 request_id: &str,
728) -> Result<Option<WaitOutcome>, VmError> {
729 let events = log
730 .read_range(topic, None, usize::MAX)
731 .await
732 .map_err(log_error)?;
733 for (_, event) in events {
734 if !event_matches_request(&event, request_id) {
735 continue;
736 }
737 match event.kind.as_str() {
738 "hitl.timeout" => return Ok(Some(WaitOutcome::Timeout)),
739 "hitl.response_received" | "hitl.escalation_accepted" => {
740 let response: HitlHostResponse = serde_json::from_value(event.payload)
741 .map_err(|error| VmError::Runtime(error.to_string()))?;
742 if event.kind == "hitl.escalation_accepted" && !response.accepted.unwrap_or(false) {
743 continue;
744 }
745 return Ok(Some(WaitOutcome::Response(response)));
746 }
747 _ => {}
748 }
749 }
750 Ok(None)
751}
752
753async fn append_request(
754 log: &Arc<AnyEventLog>,
755 request: &HitlRequestEnvelope,
756) -> Result<(), VmError> {
757 let topic = topic(request.kind)?;
758 log.append(
759 &topic,
760 LogEvent::new(
761 request.kind.request_event_kind(),
762 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
763 )
764 .with_headers(request_headers(request)),
765 )
766 .await
767 .map(|_| ())
768 .map_err(log_error)
769}
770
771async fn append_named_event(
772 log: &Arc<AnyEventLog>,
773 kind: HitlRequestKind,
774 event_kind: &str,
775 request_id: &str,
776 trace_id: &str,
777 payload: JsonValue,
778) -> Result<(), VmError> {
779 let topic = topic(kind)?;
780 let headers = headers_with_trace(request_id, trace_id);
781 log.append(
782 &topic,
783 LogEvent::new(event_kind, payload).with_headers(headers),
784 )
785 .await
786 .map(|_| ())
787 .map_err(log_error)
788}
789
790async fn append_timeout(
791 log: &Arc<AnyEventLog>,
792 kind: HitlRequestKind,
793 request_id: &str,
794 trace_id: &str,
795) -> Result<(), VmError> {
796 append_named_event(
797 log,
798 kind,
799 "hitl.timeout",
800 request_id,
801 trace_id,
802 serde_json::to_value(HitlTimeoutRecord {
803 request_id: request_id.to_string(),
804 kind,
805 trace_id: trace_id.to_string(),
806 timed_out_at: now_rfc3339(),
807 })
808 .map_err(|error| VmError::Runtime(error.to_string()))?,
809 )
810 .await
811}
812
813async fn maybe_apply_mock_response(
814 kind: HitlRequestKind,
815 request_id: &str,
816 request_payload: &JsonValue,
817) -> Result<(), VmError> {
818 let mut params = request_payload
819 .as_object()
820 .cloned()
821 .unwrap_or_default()
822 .into_iter()
823 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
824 .collect::<BTreeMap<_, _>>();
825 params.insert(
826 "request_id".to_string(),
827 VmValue::String(Rc::from(request_id.to_string())),
828 );
829 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
830 return Ok(());
831 };
832 let value = result?;
833 let responses = match value {
834 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
835 other => vec![other],
836 };
837 for response in responses {
838 let response_dict = response.as_dict().ok_or_else(|| {
839 VmError::Runtime(format!(
840 "mocked HITL {} response must be a dict or list<dict>",
841 kind.as_str()
842 ))
843 })?;
844 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
845 append_hitl_response(None, hitl_response)
846 .await
847 .map_err(VmError::Runtime)?;
848 }
849 Ok(())
850}
851
852fn parse_hitl_response_dict(
853 request_id: &str,
854 response_dict: &BTreeMap<String, VmValue>,
855) -> Result<HitlHostResponse, VmError> {
856 Ok(HitlHostResponse {
857 request_id: request_id.to_string(),
858 answer: response_dict
859 .get("answer")
860 .map(crate::llm::vm_value_to_json),
861 approved: response_dict.get("approved").and_then(vm_bool),
862 accepted: response_dict.get("accepted").and_then(vm_bool),
863 reviewer: response_dict.get("reviewer").map(VmValue::display),
864 reason: response_dict.get("reason").map(VmValue::display),
865 metadata: response_dict
866 .get("metadata")
867 .map(crate::llm::vm_value_to_json),
868 responded_at: response_dict.get("responded_at").map(VmValue::display),
869 })
870}
871
872fn maybe_notify_host(request: &HitlRequestEnvelope) {
873 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
874 return;
875 };
876 bridge.notify(
877 "harn.hitl.requested",
878 serde_json::to_value(request).unwrap_or(JsonValue::Null),
879 );
880}
881
882fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
883 let Some(value) = value else {
884 return Ok(AskUserOptions {
885 schema: None,
886 timeout: None,
887 default: None,
888 });
889 };
890 let dict = value
891 .as_dict()
892 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
893 Ok(AskUserOptions {
894 schema: dict
895 .get("schema")
896 .cloned()
897 .filter(|value| !matches!(value, VmValue::Nil)),
898 timeout: dict.get("timeout").map(parse_duration_value).transpose()?,
899 default: dict
900 .get("default")
901 .cloned()
902 .filter(|value| !matches!(value, VmValue::Nil)),
903 })
904}
905
906fn parse_approval_options(
907 value: Option<&VmValue>,
908 builtin: &str,
909) -> Result<ApprovalOptions, VmError> {
910 let dict = match value {
911 None => None,
912 Some(VmValue::Dict(dict)) => Some(dict),
913 Some(_) => {
914 return Err(VmError::Runtime(format!(
915 "{builtin}: options must be a dict"
916 )))
917 }
918 };
919 let quorum = dict
920 .and_then(|dict| dict.get("quorum"))
921 .and_then(VmValue::as_int)
922 .unwrap_or(1);
923 if quorum <= 0 {
924 return Err(VmError::Runtime(format!(
925 "{builtin}: quorum must be positive"
926 )));
927 }
928 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
929 let deadline = dict
930 .and_then(|dict| dict.get("deadline"))
931 .map(parse_duration_value)
932 .transpose()?
933 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
934 Ok(ApprovalOptions {
935 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
936 quorum: quorum as u32,
937 reviewers,
938 deadline,
939 })
940}
941
942fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
943 args.get(idx)
944 .map(VmValue::display)
945 .filter(|value| !value.is_empty())
946 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
947}
948
949fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
950 let value = args
951 .get(idx)
952 .and_then(VmValue::as_int)
953 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
954 if value <= 0 {
955 return Err(VmError::Runtime(format!(
956 "{builtin}: expected a positive int at {idx}"
957 )));
958 }
959 Ok(value)
960}
961
962fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
963 let Some(value) = value else {
964 return Ok(Vec::new());
965 };
966 match value {
967 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
968 _ => Err(VmError::Runtime(format!(
969 "{builtin}: expected list<string>"
970 ))),
971 }
972}
973
974fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
975 match value {
976 VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
977 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
978 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
979 _ => Err(VmError::Runtime(
980 "expected a duration or millisecond count".to_string(),
981 )),
982 }
983}
984
985fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
986 active_event_log()
987 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
988}
989
990fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
991 if let Some(log) = active_event_log() {
992 return Ok(log);
993 }
994 let Some(base_dir) = base_dir else {
995 return Ok(install_memory_for_current_thread(
996 HITL_EVENT_LOG_QUEUE_DEPTH,
997 ));
998 };
999 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1000}
1001
1002fn current_dispatch_keys() -> Option<DispatchKeys> {
1003 let context = current_dispatch_context()?;
1004 let stable_base = context
1005 .replay_of_event_id
1006 .clone()
1007 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1008 let instance_key = format!(
1009 "{}::{}",
1010 context.trigger_event.id.0,
1011 context.replay_of_event_id.as_deref().unwrap_or("live")
1012 );
1013 Some(DispatchKeys {
1014 instance_key,
1015 stable_base,
1016 trace_id: context.trigger_event.trace_id.0,
1017 })
1018}
1019
1020fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1021 if let Some(keys) = dispatch_keys {
1022 let seq = REQUEST_SEQUENCE.with(|slot| {
1023 let mut state = slot.borrow_mut();
1024 if state.instance_key != keys.instance_key {
1025 state.instance_key = keys.instance_key.clone();
1026 state.next_seq = 0;
1027 }
1028 state.next_seq += 1;
1029 state.next_seq
1030 });
1031 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1032 }
1033 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1034}
1035
1036fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1037 headers_with_trace(&request.request_id, &request.trace_id)
1038}
1039
/// Headers for a response event: just the `request_id`.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1045
1046fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1047 let mut headers = response_headers(request_id);
1048 headers.insert("trace_id".to_string(), trace_id.to_string());
1049 headers
1050}
1051
1052fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1053 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1054}
1055
1056fn kind_from_topic(topic: &Topic) -> Result<HitlRequestKind, VmError> {
1057 match topic.as_str() {
1058 HITL_QUESTIONS_TOPIC => Ok(HitlRequestKind::Question),
1059 HITL_APPROVALS_TOPIC => Ok(HitlRequestKind::Approval),
1060 HITL_DUAL_CONTROL_TOPIC => Ok(HitlRequestKind::DualControl),
1061 HITL_ESCALATIONS_TOPIC => Ok(HitlRequestKind::Escalation),
1062 other => Err(VmError::Runtime(format!("unknown HITL topic '{other}'"))),
1063 }
1064}
1065
1066fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1067 event
1068 .headers
1069 .get("request_id")
1070 .is_some_and(|value| value == request_id)
1071 || event
1072 .payload
1073 .get("request_id")
1074 .and_then(JsonValue::as_str)
1075 .is_some_and(|value| value == request_id)
1076}
1077
1078fn approval_record_value(progress: &ApprovalProgress) -> VmValue {
1079 crate::stdlib::json_to_vm_value(&json!({
1080 "approved": true,
1081 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1082 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1083 "reason": progress.reason,
1084 }))
1085}
1086
1087fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1088 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1089 "name": "ApprovalDeniedError",
1090 "category": "generic",
1091 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1092 "request_id": request_id,
1093 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1094 "reason": response.reason,
1095 })))
1096}
1097
1098fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1099 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1100 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1101 "name": "HumanTimeoutError",
1102 "category": ErrorCategory::Timeout.as_str(),
1103 "message": format!("{} timed out", kind.as_str()),
1104 "request_id": request_id,
1105 "kind": kind.as_str(),
1106 })))
1107}
1108
1109fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1110 match default {
1111 VmValue::Int(_) => match value {
1112 VmValue::Int(_) => value.clone(),
1113 VmValue::Float(number) => VmValue::Int(*number as i64),
1114 VmValue::String(text) => text
1115 .parse::<i64>()
1116 .map(VmValue::Int)
1117 .unwrap_or_else(|_| default.clone()),
1118 _ => default.clone(),
1119 },
1120 VmValue::Float(_) => match value {
1121 VmValue::Float(_) => value.clone(),
1122 VmValue::Int(number) => VmValue::Float(*number as f64),
1123 VmValue::String(text) => text
1124 .parse::<f64>()
1125 .map(VmValue::Float)
1126 .unwrap_or_else(|_| default.clone()),
1127 _ => default.clone(),
1128 },
1129 VmValue::Bool(_) => match value {
1130 VmValue::Bool(_) => value.clone(),
1131 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1132 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1133 _ => default.clone(),
1134 },
1135 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1136 VmValue::Duration(_) => match value {
1137 VmValue::Duration(_) => value.clone(),
1138 VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1139 _ => default.clone(),
1140 },
1141 VmValue::Nil => value.clone(),
1142 _ => {
1143 if value.type_name() == default.type_name() {
1144 value.clone()
1145 } else {
1146 default.clone()
1147 }
1148 }
1149 }
1150}
1151
1152fn log_error(error: impl std::fmt::Display) -> VmError {
1153 VmError::Runtime(error.to_string())
1154}
1155
/// Reports whether replay mode is requested through the `HARN_REPLAY` environment
/// variable.
///
/// Returns `true` when the variable is set to a value that, after trimming
/// surrounding whitespace, is neither empty nor `0`. An unset variable (or one
/// `std::env::var` cannot read as Unicode) yields `false`.
fn is_replay() -> bool {
    match std::env::var("HARN_REPLAY") {
        Ok(raw) => {
            // Trim once and use the trimmed text for both checks, so values such as
            // "0 " or " 0" are treated the same as "0".
            let flag = raw.trim();
            !flag.is_empty() && flag != "0"
        }
        Err(_) => false,
    }
}
1161
1162fn now_rfc3339() -> String {
1163 OffsetDateTime::now_utc()
1164 .format(&Rfc3339)
1165 .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1166}
1167
1168fn new_trace_id() -> String {
1169 format!("trace_{}", Uuid::now_v7())
1170}
1171
1172fn vm_bool(value: &VmValue) -> Option<bool> {
1173 match value {
1174 VmValue::Bool(flag) => Some(*flag),
1175 _ => None,
1176 }
1177}
1178
#[cfg(test)]
mod tests {
    use std::path::Path;
    use std::sync::Arc;

    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, AnyEventLog, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// What a test inspects after running one script: the printed output and the
    /// event kinds read back from each HITL topic.
    struct ScriptRun {
        output: String,
        question_events: Vec<String>,
        approval_events: Vec<String>,
        dual_control_events: Vec<String>,
        escalation_events: Vec<String>,
    }

    /// Resets thread-local state, installs an event log under `base_dir`, compiles and
    /// executes `source` on a fresh VM, then reads back the event kinds of every
    /// HITL topic.
    async fn execute_hitl_script(base_dir: &Path, source: &str) -> Result<ScriptRun, VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");

        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;

        Ok(ScriptRun {
            output: vm.output().trim_end().to_string(),
            question_events: event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await,
            approval_events: event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await,
            dual_control_events: event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await,
            escalation_events: event_kinds(log, HITL_ESCALATIONS_TOPIC).await,
        })
    }

    /// Reads every event on `topic` and returns their `kind` strings in read order.
    async fn event_kinds(log: Arc<AnyEventLog>, topic: &str) -> Vec<String> {
        let topic = Topic::new(topic).expect("valid topic");
        let events = log
            .read_range(&topic, None, usize::MAX)
            .await
            .expect("read topic");
        events.into_iter().map(|(_, event)| event.kind).collect()
    }

    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        let source = r#"
pipeline test(task) {
 host_mock("hitl", "question", {answer: "9"})
 let answer: int = ask_user("Pick a number", {default: 0})
 println(answer)
}
"#;
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "9");
                assert_eq!(
                    run.question_events,
                    ["hitl.question_asked", "hitl.response_received"]
                );
                assert!(run.approval_events.is_empty());
                assert!(run.dual_control_events.is_empty());
                assert!(run.escalation_events.is_empty());
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", [
 {approved: true, reviewer: "alice", reason: "ok"},
 {approved: true, reviewer: "bob", reason: "ship it"},
 ])
 let record = request_approval(
 "deploy production",
 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
 )
 println(record.approved)
 println(len(record.reviewers))
 println(record.reviewers[0])
 println(record.reviewers[1])
}
"#;
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(
                    run.approval_events,
                    [
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.approval_approved",
                    ]
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
 let denied = try {
 request_approval("drop table", {reviewers: ["alice"]})
 }
 println(is_err(denied))
 println(unwrap_err(denied).name)
 println(unwrap_err(denied).reason)
}
"#;
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    run.approval_events,
                    [
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.approval_denied",
                    ]
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        let source = r#"
pipeline test(task) {
 host_mock("hitl", "dual_control", [
 {approved: true, reviewer: "alice"},
 {approved: true, reviewer: "bob"},
 ])
 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
 println(result)
}
"#;
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "launched");
                assert_eq!(
                    run.dual_control_events,
                    [
                        "hitl.dual_control_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.dual_control_approved",
                        "hitl.dual_control_executed",
                    ]
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        let source = r#"
pipeline test(task) {
 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
 let handle = escalate_to("admin", "need override")
 println(handle.status)
 println(handle.reviewer)
}
"#;
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "accepted\nlead");
                assert_eq!(
                    run.escalation_events,
                    ["hitl.escalation_issued", "hitl.escalation_accepted"]
                );
            })
            .await;
    }
}