1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use time::format_description::well_known::Rfc3339;
11use time::OffsetDateTime;
12use uuid::Uuid;
13
14use crate::event_log::{
15 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
16 EventLog, LogEvent, Topic,
17};
18use crate::schema::schema_expect_value;
19use crate::stdlib::host::dispatch_mock_host_call;
20use crate::stdlib::waitpoint::{
21 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
22 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
23 WaitpointWaitOptions,
24};
25use crate::triggers::dispatcher::current_dispatch_context;
26use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
27use crate::vm::{clone_async_builtin_child_vm, Vm};
28
// Queue depth associated with the HITL event log.
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed where the event log is installed — confirm.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
// Default approval deadline in milliseconds (24 hours). Used by `dual_control`
// and as the fallback `deadline` when approval options omit one.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic used for `HitlRequestKind::Question` events.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for `HitlRequestKind::Approval` events.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for `HitlRequestKind::DualControl` events.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for `HitlRequestKind::Escalation` events.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
36
// Per-thread request-sequence state. It is reset, taken and restored through
// `reset_hitl_state`, `take_hitl_state` and `restore_hitl_state`.
thread_local! {
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
40
/// Snapshot of the thread-local HITL request sequence.
///
/// `Default` yields an empty key and a zero counter, which is the state
/// `reset_hitl_state` installs.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    // NOTE(review): presumably identifies the dispatch instance the counter
    // belongs to — the code that reads it is outside this chunk; confirm.
    pub(crate) instance_key: String,
    // NOTE(review): presumably the next sequence number used when building a
    // request id — confirm against `next_request_id`.
    pub(crate) next_seq: u64,
}
46
/// The four kinds of human-in-the-loop request handled by this module.
///
/// Serialized in `snake_case` (for example `dual_control`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Raised by the `ask_user` builtin.
    Question,
    /// Raised by the `request_approval` builtin.
    Approval,
    /// Raised by the `dual_control` builtin.
    DualControl,
    /// Raised by the `escalate_to` builtin.
    Escalation,
}
55
56impl HitlRequestKind {
57 pub(crate) fn as_str(self) -> &'static str {
58 match self {
59 Self::Question => "question",
60 Self::Approval => "approval",
61 Self::DualControl => "dual_control",
62 Self::Escalation => "escalation",
63 }
64 }
65
66 fn topic(self) -> &'static str {
67 match self {
68 Self::Question => HITL_QUESTIONS_TOPIC,
69 Self::Approval => HITL_APPROVALS_TOPIC,
70 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
71 Self::Escalation => HITL_ESCALATIONS_TOPIC,
72 }
73 }
74
75 fn request_event_kind(self) -> &'static str {
76 match self {
77 Self::Question => "hitl.question_asked",
78 Self::Approval => "hitl.approval_requested",
79 Self::DualControl => "hitl.dual_control_requested",
80 Self::Escalation => "hitl.escalation_issued",
81 }
82 }
83
84 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
85 if request_id.starts_with("hitl_question_") {
86 Some(Self::Question)
87 } else if request_id.starts_with("hitl_approval_") {
88 Some(Self::Approval)
89 } else if request_id.starts_with("hitl_dual_control_") {
90 Some(Self::DualControl)
91 } else if request_id.starts_with("hitl_escalation_") {
92 Some(Self::Escalation)
93 } else {
94 None
95 }
96 }
97}
98
/// A response to a pending HITL request, recorded through
/// `append_hitl_response`.
///
/// Optional fields that are `None` are omitted when serialized.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix determines the request kind.
    pub request_id: String,
    /// Answer value; used as the waitpoint value for question requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval decision; a missing value is treated as `false` when approvals
    /// are resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation acceptance; a missing value is treated as `false`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Identity of the responder, when supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason attached to the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra metadata forwarded to the waitpoint on completion or cancellation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; filled in by `append_hitl_response` when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
}
117
/// The request record written to the event log (and sent to the host bridge)
/// when a HITL builtin is invoked.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Id that also names the waitpoint created for this request.
    request_id: String,
    /// Which builtin produced the request.
    kind: HitlRequestKind,
    /// Agent taken from the dispatch context; empty when there is none.
    /// Defaults to an empty string when missing during deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id from the dispatch context, or a newly generated one.
    trace_id: String,
    /// Timestamp string captured when the request was built.
    requested_at: String,
    /// Kind-specific request parameters.
    payload: JsonValue,
}
128
/// Payload of the `hitl.timeout` event appended when a request times out.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the request that timed out.
    kind: HitlRequestKind,
    /// Trace id of the request.
    trace_id: String,
    /// Timestamp string captured when the timeout was recorded.
    timed_out_at: String,
}
136
/// Identifiers derived from the current dispatch context, when one exists.
#[derive(Clone, Debug)]
struct DispatchKeys {
    // NOTE(review): presumably matched against `RequestSequenceState::instance_key`
    // when allocating request ids — the consumer is outside this chunk; confirm.
    instance_key: String,
    // NOTE(review): presumably the stable prefix for generated request ids — confirm.
    stable_base: String,
    /// Agent name copied into each request envelope.
    agent: String,
    /// Trace id copied into each request envelope.
    trace_id: String,
}
144
/// Parsed options for the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is validated against, when provided and non-nil.
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` passes no timeout to the wait.
    timeout: Option<StdDuration>,
    /// Value returned on timeout, and used to coerce answers when no schema is set.
    default: Option<VmValue>,
}
151
/// Parsed options for the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Optional detail value included in the request payload.
    detail: Option<VmValue>,
    /// Number of distinct approving reviewers required (at least 1).
    quorum: u32,
    /// Allowed reviewers; an empty list places no restriction.
    reviewers: Vec<String>,
    /// How long to wait for the approval before timing out.
    deadline: StdDuration,
}
159
/// Approvals accumulated so far while replaying responses for one request.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Distinct reviewers that have approved.
    reviewers: BTreeSet<String>,
    /// Reason from the most recent counted approval.
    reason: Option<String>,
    /// `responded_at` of the most recent counted approval.
    approved_at: Option<String>,
}
166
/// Outcome of replaying the recorded responses for an approval-style request.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// Quorum not reached and no denial seen yet.
    Pending,
    /// Quorum reached; carries the accumulated approvals.
    Approved(ApprovalProgress),
    /// A counted response was not an approval; carries that response.
    Denied(HitlHostResponse),
}
173
/// Normalized result of waiting on a request's waitpoint.
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its record.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancelled {
        /// Id of the cancelled wait (may be empty when recovered from a thrown error).
        wait_id: String,
        /// Waitpoints that were being waited on.
        waitpoint_ids: Vec<String>,
        /// Cancellation reason, when one was given.
        reason: Option<String>,
    },
}
184
185pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
186 vm.register_async_builtin("ask_user", |args| {
187 Box::pin(async move { ask_user_impl(&args).await })
188 });
189
190 vm.register_async_builtin("request_approval", |args| {
191 Box::pin(async move { request_approval_impl(&args).await })
192 });
193
194 vm.register_async_builtin("dual_control", |args| {
195 Box::pin(async move { dual_control_impl(&args).await })
196 });
197
198 vm.register_async_builtin("escalate_to", |args| {
199 Box::pin(async move { escalate_to_impl(&args).await })
200 });
201}
202
203pub(crate) fn reset_hitl_state() {
204 REQUEST_SEQUENCE.with(|slot| {
205 *slot.borrow_mut() = RequestSequenceState::default();
206 });
207}
208
209pub(crate) fn take_hitl_state() -> RequestSequenceState {
210 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
211}
212
213pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
214 REQUEST_SEQUENCE.with(|slot| {
215 *slot.borrow_mut() = state;
216 });
217}
218
219pub async fn append_hitl_response(
220 base_dir: Option<&Path>,
221 mut response: HitlHostResponse,
222) -> Result<u64, String> {
223 let kind = HitlRequestKind::from_request_id(&response.request_id)
224 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
225 if response.responded_at.is_none() {
226 response.responded_at = Some(now_rfc3339());
227 }
228 let log = ensure_hitl_event_log_for(base_dir)?;
229 let headers = response_headers(&response.request_id);
230 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
231 let event_id = log
232 .append(
233 &topic,
234 LogEvent::new(
235 match kind {
236 HitlRequestKind::Escalation => "hitl.escalation_accepted",
237 _ => "hitl.response_received",
238 },
239 serde_json::to_value(&response).map_err(|error| error.to_string())?,
240 )
241 .with_headers(headers),
242 )
243 .await
244 .map_err(|error| error.to_string())?;
245 finalize_hitl_response(&log, kind, &response).await?;
246 Ok(event_id)
247}
248
249async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
250 let prompt = required_string_arg(args, 0, "ask_user")?;
251 let options = parse_ask_user_options(args.get(1))?;
252 let keys = current_dispatch_keys();
253 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
254 let trace_id = keys
255 .as_ref()
256 .map(|keys| keys.trace_id.clone())
257 .unwrap_or_else(new_trace_id);
258 let log = ensure_hitl_event_log();
259 let request = HitlRequestEnvelope {
260 request_id: request_id.clone(),
261 kind: HitlRequestKind::Question,
262 agent: keys
263 .as_ref()
264 .map(|keys| keys.agent.clone())
265 .unwrap_or_default(),
266 trace_id: trace_id.clone(),
267 requested_at: now_rfc3339(),
268 payload: json!({
269 "prompt": prompt,
270 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
271 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
272 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
273 }),
274 };
275 create_request_waitpoint(&log, &request).await?;
276 append_request(&log, &request).await?;
277 maybe_notify_host(&request);
278 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
279
280 match wait_for_request_waitpoint(&request_id, options.timeout).await? {
281 WaitpointOutcome::Completed(record) => {
282 let answer = record
283 .value
284 .as_ref()
285 .map(crate::stdlib::json_to_vm_value)
286 .unwrap_or(VmValue::Nil);
287 if let Some(schema) = options.schema.as_ref() {
288 return schema_expect_value(&answer, schema, true);
289 }
290 if let Some(default) = options.default.as_ref() {
291 return Ok(coerce_like_default(&answer, default));
292 }
293 Ok(answer)
294 }
295 WaitpointOutcome::Timeout => {
296 append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
297 if let Some(default) = options.default {
298 return Ok(default);
299 }
300 Err(timeout_error(&request_id, HitlRequestKind::Question))
301 }
302 WaitpointOutcome::Cancelled {
303 wait_id,
304 waitpoint_ids,
305 reason,
306 } => Err(hitl_cancelled_error(
307 &request_id,
308 HitlRequestKind::Question,
309 &wait_id,
310 &waitpoint_ids,
311 reason,
312 )),
313 }
314}
315
316async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
317 let action = required_string_arg(args, 0, "request_approval")?;
318 let options = parse_approval_options(args.get(1), "request_approval")?;
319 let keys = current_dispatch_keys();
320 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
321 let trace_id = keys
322 .as_ref()
323 .map(|keys| keys.trace_id.clone())
324 .unwrap_or_else(new_trace_id);
325 let log = ensure_hitl_event_log();
326 let request = HitlRequestEnvelope {
327 request_id: request_id.clone(),
328 kind: HitlRequestKind::Approval,
329 agent: keys
330 .as_ref()
331 .map(|keys| keys.agent.clone())
332 .unwrap_or_default(),
333 trace_id: trace_id.clone(),
334 requested_at: now_rfc3339(),
335 payload: json!({
336 "action": action,
337 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
338 "quorum": options.quorum,
339 "reviewers": options.reviewers,
340 "deadline_ms": options.deadline.as_millis() as u64,
341 }),
342 };
343 create_request_waitpoint(&log, &request).await?;
344 append_request(&log, &request).await?;
345 maybe_notify_host(&request);
346 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
347
348 match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
349 WaitpointOutcome::Completed(record) => {
350 approval_record_from_waitpoint(&record, "request_approval")
351 }
352 WaitpointOutcome::Timeout => {
353 append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
354 Err(timeout_error(&request_id, HitlRequestKind::Approval))
355 }
356 WaitpointOutcome::Cancelled { .. } => {
357 Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
358 }
359 }
360}
361
362async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
363 let n = required_positive_int_arg(args, 0, "dual_control")?;
364 let m = required_positive_int_arg(args, 1, "dual_control")?;
365 if n > m {
366 return Err(VmError::Runtime(
367 "dual_control: n must be less than or equal to m".to_string(),
368 ));
369 }
370 let action = args
371 .get(2)
372 .and_then(|value| match value {
373 VmValue::Closure(closure) => Some(closure.clone()),
374 _ => None,
375 })
376 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
377 let approvers = optional_string_list(args.get(3), "dual_control")?;
378 if !approvers.is_empty() && approvers.len() < m as usize {
379 return Err(VmError::Runtime(format!(
380 "dual_control: expected at least {m} approvers, got {}",
381 approvers.len()
382 )));
383 }
384
385 let keys = current_dispatch_keys();
386 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
387 let trace_id = keys
388 .as_ref()
389 .map(|keys| keys.trace_id.clone())
390 .unwrap_or_else(new_trace_id);
391 let action_name = if action.func.name.is_empty() {
392 "anonymous".to_string()
393 } else {
394 action.func.name.clone()
395 };
396 let log = ensure_hitl_event_log();
397 let request = HitlRequestEnvelope {
398 request_id: request_id.clone(),
399 kind: HitlRequestKind::DualControl,
400 agent: keys
401 .as_ref()
402 .map(|keys| keys.agent.clone())
403 .unwrap_or_default(),
404 trace_id: trace_id.clone(),
405 requested_at: now_rfc3339(),
406 payload: json!({
407 "n": n,
408 "m": m,
409 "action": action_name,
410 "approvers": approvers,
411 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
412 }),
413 };
414 create_request_waitpoint(&log, &request).await?;
415 append_request(&log, &request).await?;
416 maybe_notify_host(&request);
417 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
418
419 match wait_for_request_waitpoint(
420 &request_id,
421 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
422 )
423 .await?
424 {
425 WaitpointOutcome::Completed(record) => {
426 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
427 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
428 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
429 })?;
430 let result = vm.call_closure_pub(&action, &[], &[]).await?;
431
432 append_named_event(
433 &log,
434 HitlRequestKind::DualControl,
435 "hitl.dual_control_executed",
436 &request_id,
437 &trace_id,
438 json!({
439 "request_id": request_id,
440 "result": crate::llm::vm_value_to_json(&result),
441 }),
442 )
443 .await?;
444
445 Ok(result)
446 }
447 WaitpointOutcome::Timeout => {
448 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
449 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
450 }
451 WaitpointOutcome::Cancelled { .. } => {
452 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
453 }
454 }
455}
456
457async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
458 let role = required_string_arg(args, 0, "escalate_to")?;
459 let reason = required_string_arg(args, 1, "escalate_to")?;
460 let keys = current_dispatch_keys();
461 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
462 let trace_id = keys
463 .as_ref()
464 .map(|keys| keys.trace_id.clone())
465 .unwrap_or_else(new_trace_id);
466 let log = ensure_hitl_event_log();
467 let request = HitlRequestEnvelope {
468 request_id: request_id.clone(),
469 kind: HitlRequestKind::Escalation,
470 agent: keys
471 .as_ref()
472 .map(|keys| keys.agent.clone())
473 .unwrap_or_default(),
474 trace_id: trace_id.clone(),
475 requested_at: now_rfc3339(),
476 payload: json!({
477 "role": role,
478 "reason": reason,
479 }),
480 };
481 create_request_waitpoint(&log, &request).await?;
482 append_request(&log, &request).await?;
483 maybe_notify_host(&request);
484 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
485
486 match wait_for_request_waitpoint(&request_id, None).await? {
487 WaitpointOutcome::Completed(record) => {
488 let accepted_at = record.completed_at.clone();
489 let reviewer = record.completed_by.clone();
490 let accepted = record
491 .value
492 .as_ref()
493 .and_then(|value| value.get("accepted"))
494 .and_then(JsonValue::as_bool)
495 .unwrap_or(true);
496 Ok(crate::stdlib::json_to_vm_value(&json!({
497 "request_id": request_id,
498 "role": role,
499 "reason": reason,
500 "trace_id": trace_id,
501 "status": if accepted { "accepted" } else { "pending" },
502 "accepted_at": accepted_at,
503 "reviewer": reviewer,
504 })))
505 }
506 WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
507 WaitpointOutcome::Cancelled {
508 wait_id,
509 waitpoint_ids,
510 reason,
511 } => Err(hitl_cancelled_error(
512 &request_id,
513 HitlRequestKind::Escalation,
514 &wait_id,
515 &waitpoint_ids,
516 reason,
517 )),
518 }
519}
520
521async fn create_request_waitpoint(
522 log: &Arc<AnyEventLog>,
523 request: &HitlRequestEnvelope,
524) -> Result<(), VmError> {
525 create_waitpoint_on(
526 log,
527 Some(request.request_id.clone()),
528 Some(json!({
529 "kind": request.kind.as_str(),
530 "agent": request.agent.clone(),
531 "trace_id": request.trace_id.clone(),
532 "requested_at": request.requested_at.clone(),
533 "payload": request.payload.clone(),
534 })),
535 )
536 .await?;
537 Ok(())
538}
539
540async fn wait_for_request_waitpoint(
541 request_id: &str,
542 timeout: Option<StdDuration>,
543) -> Result<WaitpointOutcome, VmError> {
544 match wait_on_waitpoints(
545 vec![request_id.to_string()],
546 WaitpointWaitOptions { timeout },
547 )
548 .await
549 {
550 Ok(records) => Ok(WaitpointOutcome::Completed(
551 records
552 .into_iter()
553 .next()
554 .expect("single waitpoint wait result"),
555 )),
556 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
557 Err(WaitpointWaitFailure::Cancelled {
558 wait_id,
559 waitpoint_ids,
560 reason,
561 }) => Ok(WaitpointOutcome::Cancelled {
562 wait_id,
563 waitpoint_ids,
564 reason,
565 }),
566 Err(WaitpointWaitFailure::Vm(error)) => {
567 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
568 return Ok(outcome);
569 }
570 Err(error)
571 }
572 }
573}
574
575fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
576 let VmError::Thrown(VmValue::Dict(dict)) = error else {
577 return None;
578 };
579 let name = dict.get("name").and_then(vm_string)?;
580 match name {
581 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
582 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
583 wait_id: dict
584 .get("wait_id")
585 .and_then(vm_string)
586 .unwrap_or_default()
587 .to_string(),
588 waitpoint_ids: dict
589 .get("waitpoint_ids")
590 .and_then(vm_string_list)
591 .unwrap_or_default(),
592 reason: dict
593 .get("reason")
594 .and_then(vm_string)
595 .map(ToString::to_string),
596 }),
597 _ => None,
598 }
599}
600
601async fn finalize_hitl_response(
602 log: &Arc<AnyEventLog>,
603 kind: HitlRequestKind,
604 response: &HitlHostResponse,
605) -> Result<(), String> {
606 match kind {
607 HitlRequestKind::Question => {
608 if waitpoint_is_terminal(log, &response.request_id).await? {
609 return Ok(());
610 }
611 complete_waitpoint_on(
612 log,
613 &response.request_id,
614 response.answer.clone(),
615 response.reviewer.clone(),
616 response.reason.clone(),
617 response.metadata.clone(),
618 )
619 .await
620 .map(|_| ())
621 .map_err(|error| error.to_string())
622 }
623 HitlRequestKind::Escalation => {
624 if !response.accepted.unwrap_or(false)
625 || waitpoint_is_terminal(log, &response.request_id).await?
626 {
627 return Ok(());
628 }
629 complete_waitpoint_on(
630 log,
631 &response.request_id,
632 Some(json!({
633 "accepted": true,
634 "reviewer": response.reviewer,
635 "reason": response.reason,
636 "responded_at": response.responded_at,
637 })),
638 response.reviewer.clone(),
639 response.reason.clone(),
640 response.metadata.clone(),
641 )
642 .await
643 .map(|_| ())
644 .map_err(|error| error.to_string())
645 }
646 HitlRequestKind::Approval | HitlRequestKind::DualControl => {
647 if waitpoint_is_terminal(log, &response.request_id).await? {
648 return Ok(());
649 }
650 let request = load_request_envelope(log, kind, &response.request_id)
651 .await
652 .map_err(|error| error.to_string())?;
653 match resolve_approval_state(log, kind, &request)
654 .await
655 .map_err(|error| error.to_string())?
656 {
657 ApprovalResolution::Pending => Ok(()),
658 ApprovalResolution::Approved(progress) => {
659 let record = approval_record_json(&progress);
660 append_named_event(
661 log,
662 kind,
663 approved_event_kind(kind),
664 &request.request_id,
665 &request.trace_id,
666 json!({
667 "request_id": request.request_id.clone(),
668 "record": record.clone(),
669 }),
670 )
671 .await
672 .map_err(|error| error.to_string())?;
673 complete_waitpoint_on(
674 log,
675 &request.request_id,
676 Some(record),
677 response.reviewer.clone(),
678 progress.reason.clone(),
679 response.metadata.clone(),
680 )
681 .await
682 .map(|_| ())
683 .map_err(|error| error.to_string())
684 }
685 ApprovalResolution::Denied(denied) => {
686 append_named_event(
687 log,
688 kind,
689 denied_event_kind(kind),
690 &request.request_id,
691 &request.trace_id,
692 json!({
693 "request_id": request.request_id.clone(),
694 "reviewer": denied.reviewer.clone(),
695 "reason": denied.reason.clone(),
696 }),
697 )
698 .await
699 .map_err(|error| error.to_string())?;
700 cancel_waitpoint_on(
701 log,
702 &request.request_id,
703 denied.reviewer.clone(),
704 denied.reason.clone(),
705 denied.metadata.clone(),
706 )
707 .await
708 .map(|_| ())
709 .map_err(|error| error.to_string())
710 }
711 }
712 }
713 }
714}
715
716async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
717 Ok(inspect_waitpoint_on(log, request_id)
718 .await
719 .map_err(|error| error.to_string())?
720 .is_some_and(|record| record.status != WaitpointStatus::Open))
721}
722
723async fn load_request_envelope(
724 log: &Arc<AnyEventLog>,
725 kind: HitlRequestKind,
726 request_id: &str,
727) -> Result<HitlRequestEnvelope, VmError> {
728 let topic = topic(kind)?;
729 let events = log
730 .read_range(&topic, None, usize::MAX)
731 .await
732 .map_err(log_error)?;
733 events
734 .into_iter()
735 .filter(|(_, event)| event.kind == kind.request_event_kind())
736 .find_map(|(_, event)| {
737 if !event_matches_request(&event, request_id) {
738 return None;
739 }
740 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
741 })
742 .ok_or_else(|| {
743 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
744 })
745}
746
747async fn resolve_approval_state(
748 log: &Arc<AnyEventLog>,
749 kind: HitlRequestKind,
750 request: &HitlRequestEnvelope,
751) -> Result<ApprovalResolution, VmError> {
752 let quorum = approval_quorum_from_request(kind, request)?;
753 let allowed_reviewers = approval_reviewers_from_request(kind, request)
754 .into_iter()
755 .collect::<BTreeSet<_>>();
756 let mut progress = ApprovalProgress {
757 reviewers: BTreeSet::new(),
758 reason: None,
759 approved_at: None,
760 };
761 let topic = topic(kind)?;
762 let events = log
763 .read_range(&topic, None, usize::MAX)
764 .await
765 .map_err(log_error)?;
766 for (_, event) in events {
767 if !event_matches_request(&event, &request.request_id)
768 || event.kind != "hitl.response_received"
769 {
770 continue;
771 }
772 let response: HitlHostResponse = serde_json::from_value(event.payload)
773 .map_err(|error| VmError::Runtime(error.to_string()))?;
774 if let Some(reviewer) = response.reviewer.as_deref() {
775 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
776 continue;
777 }
778 if progress.reviewers.contains(reviewer) {
779 continue;
780 }
781 }
782 if response.approved.unwrap_or(false) {
783 if let Some(reviewer) = response.reviewer.clone() {
784 progress.reviewers.insert(reviewer);
785 }
786 progress.reason = response.reason.clone();
787 progress.approved_at = response.responded_at.clone();
788 if progress.reviewers.len() as u32 >= quorum {
789 return Ok(ApprovalResolution::Approved(progress));
790 }
791 continue;
792 }
793 return Ok(ApprovalResolution::Denied(response));
794 }
795 Ok(ApprovalResolution::Pending)
796}
797
798fn approval_quorum_from_request(
799 kind: HitlRequestKind,
800 request: &HitlRequestEnvelope,
801) -> Result<u32, VmError> {
802 let key = match kind {
803 HitlRequestKind::DualControl => "n",
804 _ => "quorum",
805 };
806 let quorum = request
807 .payload
808 .get(key)
809 .and_then(JsonValue::as_u64)
810 .unwrap_or(1);
811 u32::try_from(quorum).map_err(|_| {
812 VmError::Runtime(format!(
813 "invalid quorum in HITL request '{}'",
814 request.request_id
815 ))
816 })
817}
818
819fn approval_reviewers_from_request(
820 kind: HitlRequestKind,
821 request: &HitlRequestEnvelope,
822) -> Vec<String> {
823 let key = match kind {
824 HitlRequestKind::DualControl => "approvers",
825 _ => "reviewers",
826 };
827 request
828 .payload
829 .get(key)
830 .and_then(JsonValue::as_array)
831 .map(|values| {
832 values
833 .iter()
834 .filter_map(JsonValue::as_str)
835 .map(str::to_string)
836 .collect()
837 })
838 .unwrap_or_default()
839}
840
841fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
842 json!({
843 "approved": true,
844 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
845 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
846 "reason": progress.reason,
847 })
848}
849
850fn approval_record_from_waitpoint(
851 record: &WaitpointRecord,
852 builtin: &str,
853) -> Result<VmValue, VmError> {
854 record
855 .value
856 .as_ref()
857 .map(crate::stdlib::json_to_vm_value)
858 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
859}
860
861async fn approval_wait_error(
862 log: &Arc<AnyEventLog>,
863 kind: HitlRequestKind,
864 request_id: &str,
865) -> VmError {
866 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
867 if record.status == WaitpointStatus::Cancelled
868 && record.reason.as_deref() != Some("upstream_cancelled")
869 {
870 return approval_denied_error(
871 request_id,
872 HitlHostResponse {
873 request_id: request_id.to_string(),
874 answer: None,
875 approved: Some(false),
876 accepted: None,
877 reviewer: record.cancelled_by.clone(),
878 reason: record.reason.clone(),
879 metadata: record.metadata.clone(),
880 responded_at: record.cancelled_at.clone(),
881 },
882 );
883 }
884 if record.status == WaitpointStatus::Cancelled {
885 return hitl_cancelled_error(
886 request_id,
887 kind,
888 "",
889 &[request_id.to_string()],
890 record.reason.clone(),
891 );
892 }
893 }
894 hitl_cancelled_error(
895 request_id,
896 kind,
897 "",
898 &[request_id.to_string()],
899 Some("upstream_cancelled".to_string()),
900 )
901}
902
903async fn append_timeout_once(
904 log: &Arc<AnyEventLog>,
905 kind: HitlRequestKind,
906 request_id: &str,
907 trace_id: &str,
908) -> Result<(), VmError> {
909 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
910 return Ok(());
911 }
912 append_timeout(log, kind, request_id, trace_id).await
913}
914
915async fn hitl_event_exists(
916 log: &Arc<AnyEventLog>,
917 kind: HitlRequestKind,
918 request_id: &str,
919 event_kind: &str,
920) -> Result<bool, VmError> {
921 let topic = topic(kind)?;
922 let events = log
923 .read_range(&topic, None, usize::MAX)
924 .await
925 .map_err(log_error)?;
926 Ok(events
927 .into_iter()
928 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
929}
930
931fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
932 match kind {
933 HitlRequestKind::DualControl => "hitl.dual_control_approved",
934 _ => "hitl.approval_approved",
935 }
936}
937
938fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
939 match kind {
940 HitlRequestKind::DualControl => "hitl.dual_control_denied",
941 _ => "hitl.approval_denied",
942 }
943}
944
945async fn append_request(
946 log: &Arc<AnyEventLog>,
947 request: &HitlRequestEnvelope,
948) -> Result<(), VmError> {
949 let topic = topic(request.kind)?;
950 log.append(
951 &topic,
952 LogEvent::new(
953 request.kind.request_event_kind(),
954 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
955 )
956 .with_headers(request_headers(request)),
957 )
958 .await
959 .map(|_| ())
960 .map_err(log_error)
961}
962
963async fn append_named_event(
964 log: &Arc<AnyEventLog>,
965 kind: HitlRequestKind,
966 event_kind: &str,
967 request_id: &str,
968 trace_id: &str,
969 payload: JsonValue,
970) -> Result<(), VmError> {
971 let topic = topic(kind)?;
972 let headers = headers_with_trace(request_id, trace_id);
973 log.append(
974 &topic,
975 LogEvent::new(event_kind, payload).with_headers(headers),
976 )
977 .await
978 .map(|_| ())
979 .map_err(log_error)
980}
981
982async fn append_timeout(
983 log: &Arc<AnyEventLog>,
984 kind: HitlRequestKind,
985 request_id: &str,
986 trace_id: &str,
987) -> Result<(), VmError> {
988 append_named_event(
989 log,
990 kind,
991 "hitl.timeout",
992 request_id,
993 trace_id,
994 serde_json::to_value(HitlTimeoutRecord {
995 request_id: request_id.to_string(),
996 kind,
997 trace_id: trace_id.to_string(),
998 timed_out_at: now_rfc3339(),
999 })
1000 .map_err(|error| VmError::Runtime(error.to_string()))?,
1001 )
1002 .await
1003}
1004
1005async fn maybe_apply_mock_response(
1006 kind: HitlRequestKind,
1007 request_id: &str,
1008 request_payload: &JsonValue,
1009) -> Result<(), VmError> {
1010 let mut params = request_payload
1011 .as_object()
1012 .cloned()
1013 .unwrap_or_default()
1014 .into_iter()
1015 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1016 .collect::<BTreeMap<_, _>>();
1017 params.insert(
1018 "request_id".to_string(),
1019 VmValue::String(Rc::from(request_id.to_string())),
1020 );
1021 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1022 return Ok(());
1023 };
1024 let value = result?;
1025 let responses = match value {
1026 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1027 other => vec![other],
1028 };
1029 for response in responses {
1030 let response_dict = response.as_dict().ok_or_else(|| {
1031 VmError::Runtime(format!(
1032 "mocked HITL {} response must be a dict or list<dict>",
1033 kind.as_str()
1034 ))
1035 })?;
1036 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1037 append_hitl_response(None, hitl_response)
1038 .await
1039 .map_err(VmError::Runtime)?;
1040 }
1041 Ok(())
1042}
1043
1044fn parse_hitl_response_dict(
1045 request_id: &str,
1046 response_dict: &BTreeMap<String, VmValue>,
1047) -> Result<HitlHostResponse, VmError> {
1048 Ok(HitlHostResponse {
1049 request_id: request_id.to_string(),
1050 answer: response_dict
1051 .get("answer")
1052 .map(crate::llm::vm_value_to_json),
1053 approved: response_dict.get("approved").and_then(vm_bool),
1054 accepted: response_dict.get("accepted").and_then(vm_bool),
1055 reviewer: response_dict.get("reviewer").map(VmValue::display),
1056 reason: response_dict.get("reason").map(VmValue::display),
1057 metadata: response_dict
1058 .get("metadata")
1059 .map(crate::llm::vm_value_to_json),
1060 responded_at: response_dict.get("responded_at").map(VmValue::display),
1061 })
1062}
1063
1064fn maybe_notify_host(request: &HitlRequestEnvelope) {
1065 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1066 return;
1067 };
1068 bridge.notify(
1069 "harn.hitl.requested",
1070 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1071 );
1072}
1073
1074fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1075 let Some(value) = value else {
1076 return Ok(AskUserOptions {
1077 schema: None,
1078 timeout: None,
1079 default: None,
1080 });
1081 };
1082 let dict = value
1083 .as_dict()
1084 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1085 Ok(AskUserOptions {
1086 schema: dict
1087 .get("schema")
1088 .cloned()
1089 .filter(|value| !matches!(value, VmValue::Nil)),
1090 timeout: dict.get("timeout").map(parse_duration_value).transpose()?,
1091 default: dict
1092 .get("default")
1093 .cloned()
1094 .filter(|value| !matches!(value, VmValue::Nil)),
1095 })
1096}
1097
1098fn parse_approval_options(
1099 value: Option<&VmValue>,
1100 builtin: &str,
1101) -> Result<ApprovalOptions, VmError> {
1102 let dict = match value {
1103 None => None,
1104 Some(VmValue::Dict(dict)) => Some(dict),
1105 Some(_) => {
1106 return Err(VmError::Runtime(format!(
1107 "{builtin}: options must be a dict"
1108 )))
1109 }
1110 };
1111 let quorum = dict
1112 .and_then(|dict| dict.get("quorum"))
1113 .and_then(VmValue::as_int)
1114 .unwrap_or(1);
1115 if quorum <= 0 {
1116 return Err(VmError::Runtime(format!(
1117 "{builtin}: quorum must be positive"
1118 )));
1119 }
1120 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1121 let deadline = dict
1122 .and_then(|dict| dict.get("deadline"))
1123 .map(parse_duration_value)
1124 .transpose()?
1125 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1126 Ok(ApprovalOptions {
1127 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1128 quorum: quorum as u32,
1129 reviewers,
1130 deadline,
1131 })
1132}
1133
1134fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1135 args.get(idx)
1136 .map(VmValue::display)
1137 .filter(|value| !value.is_empty())
1138 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1139}
1140
1141fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1142 let value = args
1143 .get(idx)
1144 .and_then(VmValue::as_int)
1145 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1146 if value <= 0 {
1147 return Err(VmError::Runtime(format!(
1148 "{builtin}: expected a positive int at {idx}"
1149 )));
1150 }
1151 Ok(value)
1152}
1153
1154fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1155 let Some(value) = value else {
1156 return Ok(Vec::new());
1157 };
1158 match value {
1159 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1160 _ => Err(VmError::Runtime(format!(
1161 "{builtin}: expected list<string>"
1162 ))),
1163 }
1164}
1165
1166fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1167 match value {
1168 VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
1169 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1170 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1171 _ => Err(VmError::Runtime(
1172 "expected a duration or millisecond count".to_string(),
1173 )),
1174 }
1175}
1176
1177fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1178 active_event_log()
1179 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1180}
1181
1182fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1183 if let Some(log) = active_event_log() {
1184 return Ok(log);
1185 }
1186 let Some(base_dir) = base_dir else {
1187 return Ok(install_memory_for_current_thread(
1188 HITL_EVENT_LOG_QUEUE_DEPTH,
1189 ));
1190 };
1191 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1192}
1193
1194fn current_dispatch_keys() -> Option<DispatchKeys> {
1195 let context = current_dispatch_context()?;
1196 let stable_base = context
1197 .replay_of_event_id
1198 .clone()
1199 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1200 let instance_key = format!(
1201 "{}::{}",
1202 context.trigger_event.id.0,
1203 context.replay_of_event_id.as_deref().unwrap_or("live")
1204 );
1205 Some(DispatchKeys {
1206 instance_key,
1207 stable_base,
1208 agent: context.agent_id,
1209 trace_id: context.trigger_event.trace_id.0,
1210 })
1211}
1212
1213fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1214 if let Some(keys) = dispatch_keys {
1215 let seq = REQUEST_SEQUENCE.with(|slot| {
1216 let mut state = slot.borrow_mut();
1217 if state.instance_key != keys.instance_key {
1218 state.instance_key = keys.instance_key.clone();
1219 state.next_seq = 0;
1220 }
1221 state.next_seq += 1;
1222 state.next_seq
1223 });
1224 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1225 }
1226 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1227}
1228
1229fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1230 headers_with_trace(&request.request_id, &request.trace_id)
1231}
1232
/// Headers for a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1238
1239fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1240 let mut headers = response_headers(request_id);
1241 headers.insert("trace_id".to_string(), trace_id.to_string());
1242 headers
1243}
1244
1245fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1246 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1247}
1248
1249fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1250 event
1251 .headers
1252 .get("request_id")
1253 .is_some_and(|value| value == request_id)
1254 || event
1255 .payload
1256 .get("request_id")
1257 .and_then(JsonValue::as_str)
1258 .is_some_and(|value| value == request_id)
1259}
1260
1261fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1262 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1263 "name": "ApprovalDeniedError",
1264 "category": "generic",
1265 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1266 "request_id": request_id,
1267 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1268 "reason": response.reason,
1269 })))
1270}
1271
1272fn hitl_cancelled_error(
1273 request_id: &str,
1274 kind: HitlRequestKind,
1275 wait_id: &str,
1276 waitpoint_ids: &[String],
1277 reason: Option<String>,
1278) -> VmError {
1279 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1280 let message = reason
1281 .clone()
1282 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1283 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1284 "name": "HumanCancelledError",
1285 "category": ErrorCategory::Cancelled.as_str(),
1286 "message": message,
1287 "request_id": request_id,
1288 "kind": kind.as_str(),
1289 "wait_id": wait_id,
1290 "waitpoint_ids": waitpoint_ids,
1291 "reason": reason,
1292 })))
1293}
1294
1295fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1296 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1297 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1298 "name": "HumanTimeoutError",
1299 "category": ErrorCategory::Timeout.as_str(),
1300 "message": format!("{} timed out", kind.as_str()),
1301 "request_id": request_id,
1302 "kind": kind.as_str(),
1303 })))
1304}
1305
1306fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1307 match default {
1308 VmValue::Int(_) => match value {
1309 VmValue::Int(_) => value.clone(),
1310 VmValue::Float(number) => VmValue::Int(*number as i64),
1311 VmValue::String(text) => text
1312 .parse::<i64>()
1313 .map(VmValue::Int)
1314 .unwrap_or_else(|_| default.clone()),
1315 _ => default.clone(),
1316 },
1317 VmValue::Float(_) => match value {
1318 VmValue::Float(_) => value.clone(),
1319 VmValue::Int(number) => VmValue::Float(*number as f64),
1320 VmValue::String(text) => text
1321 .parse::<f64>()
1322 .map(VmValue::Float)
1323 .unwrap_or_else(|_| default.clone()),
1324 _ => default.clone(),
1325 },
1326 VmValue::Bool(_) => match value {
1327 VmValue::Bool(_) => value.clone(),
1328 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1329 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1330 _ => default.clone(),
1331 },
1332 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1333 VmValue::Duration(_) => match value {
1334 VmValue::Duration(_) => value.clone(),
1335 VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1336 _ => default.clone(),
1337 },
1338 VmValue::Nil => value.clone(),
1339 _ => {
1340 if value.type_name() == default.type_name() {
1341 value.clone()
1342 } else {
1343 default.clone()
1344 }
1345 }
1346 }
1347}
1348
1349fn log_error(error: impl std::fmt::Display) -> VmError {
1350 VmError::Runtime(error.to_string())
1351}
1352
1353fn now_rfc3339() -> String {
1354 OffsetDateTime::now_utc()
1355 .format(&Rfc3339)
1356 .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1357}
1358
1359fn new_trace_id() -> String {
1360 format!("trace_{}", Uuid::now_v7())
1361}
1362
1363fn vm_bool(value: &VmValue) -> Option<bool> {
1364 match value {
1365 VmValue::Bool(flag) => Some(*flag),
1366 _ => None,
1367 }
1368}
1369
1370fn vm_string(value: &VmValue) -> Option<&str> {
1371 match value {
1372 VmValue::String(text) => Some(text.as_ref()),
1373 _ => None,
1374 }
1375}
1376
1377fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1378 match value {
1379 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1380 _ => None,
1381 }
1382}
1383
#[cfg(test)]
mod tests {
    use std::path::Path;
    use std::sync::Arc;

    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, AnyEventLog, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Everything a test inspects after running a script: the captured output and the
    /// event kinds recorded on each HITL topic, in log order.
    struct HitlRun {
        output: String,
        questions: Vec<String>,
        approvals: Vec<String>,
        dual_control: Vec<String>,
        escalations: Vec<String>,
    }

    /// Compiles and runs `source` on a fresh VM with an event log installed for
    /// `base_dir`, then collects the output and the per-topic event kinds.
    async fn execute_hitl_script(base_dir: &Path, source: &str) -> Result<HitlRun, VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");

        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;

        Ok(HitlRun {
            output: vm.output().trim_end().to_string(),
            questions: event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await,
            approvals: event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await,
            dual_control: event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await,
            escalations: event_kinds(log, HITL_ESCALATIONS_TOPIC).await,
        })
    }

    /// Reads every event on `topic` and returns just the event kinds.
    async fn event_kinds(log: Arc<AnyEventLog>, topic: &str) -> Vec<String> {
        let topic = Topic::new(topic).expect("valid topic");
        let entries = log
            .read_range(&topic, None, usize::MAX)
            .await
            .expect("read topic");
        entries.into_iter().map(|(_, event)| event.kind).collect()
    }

    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "question", {answer: "9"})
    let answer: int = ask_user("Pick a number", {default: 0})
    println(answer)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                // The mocked string answer "9" is coerced to match the int default.
                assert_eq!(run.output, "9");
                assert_eq!(
                    run.questions,
                    ["hitl.question_asked", "hitl.response_received"]
                );
                assert!(run.approvals.is_empty());
                assert!(run.dual_control.is_empty());
                assert!(run.escalations.is_empty());
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "approval", [
        {approved: true, reviewer: "alice", reason: "ok"},
        {approved: true, reviewer: "bob", reason: "ship it"},
    ])
    let record = request_approval(
        "deploy production",
        {quorum: 2, reviewers: ["alice", "bob", "carol"]},
    )
    println(record.approved)
    println(len(record.reviewers))
    println(record.reviewers[0])
    println(record.reviewers[1])
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                // Two responses are consumed before the quorum of two is reached.
                assert_eq!(
                    run.approvals,
                    [
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.approval_approved",
                    ]
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
    let denied = try {
        request_approval("drop table", {reviewers: ["alice"]})
    }
    println(is_err(denied))
    println(unwrap_err(denied).name)
    println(unwrap_err(denied).reason)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    run.approvals,
                    [
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.approval_denied",
                    ]
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "dual_control", [
        {approved: true, reviewer: "alice"},
        {approved: true, reviewer: "bob"},
    ])
    let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
    println(result)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "launched");
                assert_eq!(
                    run.dual_control,
                    [
                        "hitl.dual_control_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.dual_control_approved",
                        "hitl.dual_control_executed",
                    ]
                );
            })
            .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
    let handle = escalate_to("admin", "need override")
    println(handle.status)
    println(handle.reviewer)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "accepted\nlead");
                assert_eq!(
                    run.escalations,
                    ["hitl.escalation_issued", "hitl.escalation_accepted"]
                );
            })
            .await;
    }
}