1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24 WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
/// Queue depth associated with the HITL event log.
/// NOTE(review): not referenced in the visible part of this file — presumably
/// used when installing the event log; confirm.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Approval deadline in milliseconds (24 hours). Used as the deadline for
/// `append_approval_request_on` and `dual_control`.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// 24 hours in milliseconds.
/// NOTE(review): presumably the default `ask_user` timeout; the usage is not
/// visible in this part of the file — confirm.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic used for `HitlRequestKind::Question` events.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for `HitlRequestKind::Approval` events.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for `HitlRequestKind::DualControl` events.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for `HitlRequestKind::Escalation` events.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
// Per-thread request-sequence state. It is reset, taken and restored through
// `reset_hitl_state`, `take_hitl_state` and `restore_hitl_state`.
thread_local! {
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
42
/// State stored in the `REQUEST_SEQUENCE` thread-local.
///
/// The default value has an empty `instance_key` and `next_seq == 0`.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// NOTE(review): presumably identifies the dispatch instance the counter
    /// belongs to; the consumer is not visible here — confirm.
    pub(crate) instance_key: String,
    /// NOTE(review): presumably the next sequence number used when building
    /// request ids; the consumer is not visible here — confirm.
    pub(crate) next_seq: u64,
}
48
/// The kinds of human-in-the-loop request handled by this module.
///
/// Serialized with serde in `snake_case` (e.g. `DualControl` -> `"dual_control"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// A question raised by the `ask_user` builtin.
    Question,
    /// An approval raised by the `request_approval` builtin.
    Approval,
    /// An n-of-m approval raised by the `dual_control` builtin.
    DualControl,
    /// An escalation raised by the `escalate_to` builtin.
    Escalation,
}
57
58impl HitlRequestKind {
59 pub(crate) fn as_str(self) -> &'static str {
60 match self {
61 Self::Question => "question",
62 Self::Approval => "approval",
63 Self::DualControl => "dual_control",
64 Self::Escalation => "escalation",
65 }
66 }
67
68 fn topic(self) -> &'static str {
69 match self {
70 Self::Question => HITL_QUESTIONS_TOPIC,
71 Self::Approval => HITL_APPROVALS_TOPIC,
72 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73 Self::Escalation => HITL_ESCALATIONS_TOPIC,
74 }
75 }
76
77 fn request_event_kind(self) -> &'static str {
78 match self {
79 Self::Question => "hitl.question_asked",
80 Self::Approval => "hitl.approval_requested",
81 Self::DualControl => "hitl.dual_control_requested",
82 Self::Escalation => "hitl.escalation_issued",
83 }
84 }
85
86 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87 if request_id.starts_with("hitl_question_") {
88 Some(Self::Question)
89 } else if request_id.starts_with("hitl_approval_") {
90 Some(Self::Approval)
91 } else if request_id.starts_with("hitl_dual_control_") {
92 Some(Self::DualControl)
93 } else if request_id.starts_with("hitl_escalation_") {
94 Some(Self::Escalation)
95 } else {
96 None
97 }
98 }
99}
100
/// A host-supplied response to a HITL request, appended to the event log by
/// `append_hitl_response`.
///
/// Every optional field is left out of the serialized form when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix determines the request kind.
    pub request_id: String,
    /// Answer value; used as the completion value for question requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval decision; a missing value is treated as "not approved" when
    /// approval state is resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation acceptance; a missing value is treated as "not accepted".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Identifier of the person who responded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason accompanying the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra metadata forwarded when the waitpoint is completed or cancelled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills in the current time
    /// when it is absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    /// Reviewer-provided signature; when absent for an approval, a receipt
    /// signature is generated instead.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
121
/// Envelope describing a HITL request issued by one of the builtins.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Unique id of the request; also used as the waitpoint id.
    request_id: String,
    /// Which kind of HITL request this is.
    kind: HitlRequestKind,
    /// Requesting agent; defaults to the empty string when missing on
    /// deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id associated with the request.
    trace_id: String,
    /// Run id taken from the current mutation session, when there is one;
    /// omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Timestamp at which the request was created.
    requested_at: String,
    /// Kind-specific request payload.
    payload: JsonValue,
}
134
/// Serializable record describing a HITL request that timed out.
/// NOTE(review): presumably written by `append_timeout_once`, which is not
/// visible in this part of the file — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the request that timed out.
    kind: HitlRequestKind,
    /// Trace id of the request.
    trace_id: String,
    /// Timestamp of the timeout.
    timed_out_at: String,
}
142
/// Structured description of an action awaiting approval.
///
/// Fields marked `#[serde(default)]` fall back to their type's default when
/// missing on deserialization.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ApprovalRequest {
    /// Request id.
    pub id: String,
    /// Name of the action to approve.
    pub action: String,
    /// Arguments of the action.
    #[serde(default)]
    pub args: JsonValue,
    /// Principal on whose behalf approval is requested.
    pub principal: String,
    /// Timestamp at which the request was created.
    pub requested_at: String,
    /// Optional deadline; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required.
    pub approvers_required: u32,
    /// References to supporting evidence.
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    /// Metadata describing how to undo the action.
    #[serde(default)]
    pub undo_metadata: JsonValue,
    /// Capabilities requested by the action.
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
161
162impl ApprovalRequest {
163 pub fn new(
164 id: impl Into<String>,
165 action: impl Into<String>,
166 args: JsonValue,
167 principal: impl Into<String>,
168 requested_at: impl Into<String>,
169 ) -> Self {
170 Self {
171 id: id.into(),
172 action: action.into(),
173 args,
174 principal: principal.into(),
175 requested_at: requested_at.into(),
176 deadline: None,
177 approvers_required: 1,
178 evidence_refs: Vec::new(),
179 undo_metadata: JsonValue::Null,
180 capabilities_requested: Vec::new(),
181 }
182 }
183}
184
185pub(crate) fn approval_request_for_host_permission(
186 id: impl Into<String>,
187 action: impl Into<String>,
188 args: JsonValue,
189 principal: impl Into<String>,
190 evidence_refs: Vec<JsonValue>,
191 undo_metadata: JsonValue,
192 capabilities_requested: Vec<String>,
193) -> ApprovalRequest {
194 let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
195 request.evidence_refs = evidence_refs;
196 request.undo_metadata = undo_metadata;
197 request.capabilities_requested = capabilities_requested;
198 request
199}
200
/// Identifiers taken from the current dispatch context, when there is one.
#[derive(Clone, Debug)]
struct DispatchKeys {
    /// NOTE(review): presumably keys the per-thread request sequence; the
    /// consumer is not visible here — confirm.
    instance_key: String,
    /// NOTE(review): presumably the stable prefix used to derive request ids;
    /// the consumer is not visible here — confirm.
    stable_base: String,
    /// Agent name recorded on request envelopes.
    agent: String,
    /// Trace id recorded on request envelopes.
    trace_id: String,
}
208
/// Options accepted by the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is validated against, when given.
    schema: Option<VmValue>,
    /// How long to wait for an answer.
    timeout: Option<StdDuration>,
    /// Value returned on timeout; also used to coerce the answer when no
    /// schema is given.
    default: Option<VmValue>,
}
215
/// Options accepted by the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Free-form detail; also used as the approval args when `args` is absent.
    detail: Option<VmValue>,
    /// Explicit arguments of the action being approved.
    args: Option<VmValue>,
    /// Number of approvers required.
    quorum: u32,
    /// Reviewers recorded in the request payload.
    reviewers: Vec<String>,
    /// How long to wait for approval.
    deadline: StdDuration,
    /// Principal override; the requesting agent is used when absent.
    principal: Option<String>,
    /// References to supporting evidence.
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata override; the serialized current mutation session is
    /// used when absent.
    undo_metadata: Option<JsonValue>,
    /// Capabilities requested by the action.
    capabilities_requested: Vec<String>,
}
228
/// Approvals accumulated so far for a request, built by
/// `resolve_approval_state`.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Id of the request being resolved.
    request_id: String,
    /// Distinct reviewers who have approved.
    reviewers: BTreeSet<String>,
    /// One signature per counted approval.
    signatures: Vec<ApprovalSignature>,
    /// Reason from the most recent approval processed.
    reason: Option<String>,
    /// `responded_at` from the most recent approval processed.
    approved_at: Option<String>,
}
237
/// A single reviewer's recorded approval.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Reviewer who approved.
    reviewer: String,
    /// Timestamp attributed to the approval.
    signed_at: String,
    /// Signature supplied with the response, or a generated receipt signature.
    signature: String,
}
244
/// Result of replaying the recorded responses for an approval request.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// Not enough approvals yet, and no denial.
    Pending,
    /// The quorum was reached; carries the accumulated progress.
    Approved(ApprovalProgress),
    /// A response denied the request; carries that response.
    Denied(HitlHostResponse),
}
251
/// How waiting on a request's waitpoint ended.
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its record.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancelled {
        /// Id of the cancelled wait.
        wait_id: String,
        /// Ids of the waitpoints that were being waited on.
        waitpoint_ids: Vec<String>,
        /// Cancellation reason, when one was given.
        reason: Option<String>,
    },
}
262
/// Registers the HITL async builtins on `vm`: `ask_user`, `request_approval`,
/// `dual_control` and `escalate_to`.
///
/// Each builtin forwards its arguments to the matching `*_impl` function.
pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
    vm.register_async_builtin("ask_user", |args| {
        Box::pin(async move { ask_user_impl(&args).await })
    });

    vm.register_async_builtin("request_approval", |args| {
        Box::pin(async move { request_approval_impl(&args).await })
    });

    vm.register_async_builtin("dual_control", |args| {
        Box::pin(async move { dual_control_impl(&args).await })
    });

    vm.register_async_builtin("escalate_to", |args| {
        Box::pin(async move { escalate_to_impl(&args).await })
    });
}
280
281pub(crate) fn reset_hitl_state() {
282 REQUEST_SEQUENCE.with(|slot| {
283 *slot.borrow_mut() = RequestSequenceState::default();
284 });
285}
286
287pub(crate) fn take_hitl_state() -> RequestSequenceState {
288 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
289}
290
291pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
292 REQUEST_SEQUENCE.with(|slot| {
293 *slot.borrow_mut() = state;
294 });
295}
296
297pub async fn append_hitl_response(
298 base_dir: Option<&Path>,
299 mut response: HitlHostResponse,
300) -> Result<u64, String> {
301 let kind = HitlRequestKind::from_request_id(&response.request_id)
302 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
303 if response.responded_at.is_none() {
304 response.responded_at = Some(now_rfc3339());
305 }
306 let log = ensure_hitl_event_log_for(base_dir)?;
307 let headers = response_headers(&response.request_id);
308 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
309 let event_id = log
310 .append(
311 &topic,
312 LogEvent::new(
313 match kind {
314 HitlRequestKind::Escalation => "hitl.escalation_accepted",
315 _ => "hitl.response_received",
316 },
317 serde_json::to_value(&response).map_err(|error| error.to_string())?,
318 )
319 .with_headers(headers),
320 )
321 .await
322 .map_err(|error| error.to_string())?;
323 finalize_hitl_response(&log, kind, &response).await?;
324 Ok(event_id)
325}
326
327pub async fn append_approval_request_on(
328 log: &Arc<AnyEventLog>,
329 agent: impl Into<String>,
330 trace_id: impl Into<String>,
331 action: impl Into<String>,
332 detail: JsonValue,
333 reviewers: Vec<String>,
334) -> Result<String, VmError> {
335 let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
336 let trace_id = trace_id.into();
337 let agent = agent.into();
338 let requested_at_time = OffsetDateTime::now_utc();
339 let requested_at = format_rfc3339(requested_at_time);
340 let mut approval_request = ApprovalRequest::new(
341 request_id.clone(),
342 action.into(),
343 detail.clone(),
344 agent.clone(),
345 requested_at.clone(),
346 );
347 approval_request.deadline = deadline_after(
348 requested_at_time,
349 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
350 );
351 approval_request.approvers_required = 1;
352 let approval_request_json = serde_json::to_value(&approval_request)
353 .map_err(|error| VmError::Runtime(error.to_string()))?;
354 let request = HitlRequestEnvelope {
355 request_id: request_id.clone(),
356 kind: HitlRequestKind::Approval,
357 agent,
358 trace_id: trace_id.clone(),
359 run_id: None,
360 requested_at: requested_at.clone(),
361 payload: json!({
362 "approval_request": approval_request_json,
363 "id": approval_request.id,
364 "action": approval_request.action,
365 "args": approval_request.args,
366 "principal": approval_request.principal,
367 "requested_at": requested_at,
368 "deadline": approval_request.deadline,
369 "approvers_required": approval_request.approvers_required,
370 "evidence_refs": approval_request.evidence_refs,
371 "undo_metadata": approval_request.undo_metadata,
372 "capabilities_requested": approval_request.capabilities_requested,
373 "detail": detail,
374 "quorum": 1,
375 "reviewers": reviewers,
376 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
377 }),
378 };
379 create_request_waitpoint(log, &request).await?;
380 append_request(log, &request).await?;
381 maybe_notify_host(&request);
382 Ok(request_id)
383}
384
385async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
386 let prompt = required_string_arg(args, 0, "ask_user")?;
387 let options = parse_ask_user_options(args.get(1))?;
388 let keys = current_dispatch_keys();
389 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
390 let trace_id = keys
391 .as_ref()
392 .map(|keys| keys.trace_id.clone())
393 .unwrap_or_else(new_trace_id);
394 let log = ensure_hitl_event_log();
395 let request = HitlRequestEnvelope {
396 request_id: request_id.clone(),
397 kind: HitlRequestKind::Question,
398 agent: keys
399 .as_ref()
400 .map(|keys| keys.agent.clone())
401 .unwrap_or_default(),
402 trace_id: trace_id.clone(),
403 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
404 requested_at: now_rfc3339(),
405 payload: json!({
406 "prompt": prompt,
407 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
408 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
409 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
410 }),
411 };
412 create_request_waitpoint(&log, &request).await?;
413 append_request(&log, &request).await?;
414 maybe_notify_host(&request);
415 emit_hitl_requested(&request);
416 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
417
418 match wait_for_request_waitpoint_with_events(
419 &request_id,
420 HitlRequestKind::Question,
421 options.timeout,
422 )
423 .await?
424 {
425 WaitpointOutcome::Completed(record) => {
426 let answer = record
427 .value
428 .as_ref()
429 .map(crate::stdlib::json_to_vm_value)
430 .unwrap_or(VmValue::Nil);
431 if let Some(schema) = options.schema.as_ref() {
432 return schema_expect_value(&answer, schema, true);
433 }
434 if let Some(default) = options.default.as_ref() {
435 return Ok(coerce_like_default(&answer, default));
436 }
437 Ok(answer)
438 }
439 WaitpointOutcome::Timeout => {
440 append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
441 if let Some(default) = options.default {
442 return Ok(default);
443 }
444 Err(timeout_error(&request_id, HitlRequestKind::Question))
445 }
446 WaitpointOutcome::Cancelled {
447 wait_id,
448 waitpoint_ids,
449 reason,
450 } => Err(hitl_cancelled_error(
451 &request_id,
452 HitlRequestKind::Question,
453 &wait_id,
454 &waitpoint_ids,
455 reason,
456 )),
457 }
458}
459
/// Implements the `request_approval` builtin.
///
/// Arguments: the action name (string) and an optional options value. The
/// function builds an [`ApprovalRequest`], records the request, notifies the
/// host and waits up to `options.deadline` for a resolution.
///
/// * Completed: returns the approval record derived from the waitpoint.
/// * Timeout: records a timeout event and returns a timeout error.
/// * Cancelled: returns the error built by `approval_wait_error`
///   (`finalize_hitl_response` cancels the waitpoint when a request is denied).
async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a new one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    // The principal defaults to the requesting agent.
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    // Explicit `args` win; otherwise `detail` doubles as the approval args.
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    // Undo metadata falls back to the serialized current mutation session,
    // then to JSON null.
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    // The structured request is embedded whole and also flattened into
    // top-level payload keys.
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    // The waitpoint is created before the request is appended and announced.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
554
555pub(crate) async fn request_approval_for_side_effect(
556 action: &str,
557 detail: JsonValue,
558 principal: String,
559 reviewers: Vec<String>,
560 capabilities_requested: Vec<String>,
561) -> Result<VmValue, VmError> {
562 let mut options = BTreeMap::new();
563 options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
564 options.insert(
565 "detail".to_string(),
566 crate::stdlib::json_to_vm_value(&detail),
567 );
568 options.insert(
569 "principal".to_string(),
570 VmValue::String(Rc::from(principal)),
571 );
572 options.insert(
573 "reviewers".to_string(),
574 VmValue::List(Rc::new(
575 reviewers
576 .into_iter()
577 .map(|reviewer| VmValue::String(Rc::from(reviewer)))
578 .collect(),
579 )),
580 );
581 options.insert(
582 "capabilities_requested".to_string(),
583 VmValue::List(Rc::new(
584 capabilities_requested
585 .into_iter()
586 .map(|capability| VmValue::String(Rc::from(capability)))
587 .collect(),
588 )),
589 );
590 let args = vec![
591 VmValue::String(Rc::from(action.to_string())),
592 VmValue::Dict(Rc::new(options)),
593 ];
594 request_approval_impl(&args).await
595}
596
/// Implements the `dual_control` builtin: run a closure only after `n` of `m`
/// approvers have approved.
///
/// Arguments: `n` (positive int), `m` (positive int), the action closure, and
/// an optional list of approver names. Validation fails when `n > m`, when the
/// third argument is not a closure, or when a non-empty approver list has
/// fewer than `m` entries.
///
/// * Completed: the approval record is validated, the closure is called with
///   no arguments on a child VM, a `hitl.dual_control_executed` event is
///   appended, and the closure's result is returned.
/// * Timeout: records a timeout event and returns a timeout error.
/// * Cancelled: returns the error built by `approval_wait_error`.
async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let n = required_positive_int_arg(args, 0, "dual_control")?;
    let m = required_positive_int_arg(args, 1, "dual_control")?;
    if n > m {
        return Err(VmError::Runtime(
            "dual_control: n must be less than or equal to m".to_string(),
        ));
    }
    let action = args
        .get(2)
        .and_then(|value| match value {
            VmValue::Closure(closure) => Some(closure.clone()),
            _ => None,
        })
        .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
    let approvers = optional_string_list(args.get(3), "dual_control")?;
    // An empty approver list is accepted; a non-empty one must name at least
    // `m` approvers.
    if !approvers.is_empty() && approvers.len() < m as usize {
        return Err(VmError::Runtime(format!(
            "dual_control: expected at least {m} approvers, got {}",
            approvers.len()
        )));
    }

    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    // Unnamed closures are reported as "anonymous".
    let action_name = if action.func.name.is_empty() {
        "anonymous".to_string()
    } else {
        action.func.name.clone()
    };
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action_name.clone(),
        json!({
            "n": n,
            "m": m,
            "approvers": approvers.clone(),
        }),
        agent.clone(),
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(
        requested_at_time,
        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
    );
    // NOTE(review): `as u32` truncates silently if `n` can exceed `u32::MAX`;
    // the type returned by `required_positive_int_arg` is not visible here —
    // confirm.
    approval_request.approvers_required = n as u32;
    approval_request.undo_metadata = crate::orchestration::current_mutation_session()
        .and_then(|session| serde_json::to_value(session).ok())
        .unwrap_or(JsonValue::Null);
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::DualControl,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "n": n,
            "m": m,
            "action": action_name,
            "approvers": approvers,
            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
        }),
    };
    // The waitpoint is created before the request is appended and announced.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::DualControl,
        Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            // Only the validation matters here; the record itself is discarded.
            let _ = approval_record_from_waitpoint(&record, "dual_control")?;
            let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
                VmError::Runtime("dual_control requires an async builtin VM context".to_string())
            })?;
            let result = vm.call_closure_pub(&action, &[]).await?;

            append_named_event(
                &log,
                HitlRequestKind::DualControl,
                "hitl.dual_control_executed",
                &request_id,
                &trace_id,
                json!({
                    "request_id": request_id,
                    "result": crate::llm::vm_value_to_json(&result),
                }),
            )
            .await?;

            Ok(result)
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::DualControl))
        }
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
        }
    }
}
728
729async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
730 let role = required_string_arg(args, 0, "escalate_to")?;
731 let reason = required_string_arg(args, 1, "escalate_to")?;
732 let keys = current_dispatch_keys();
733 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
734 let trace_id = keys
735 .as_ref()
736 .map(|keys| keys.trace_id.clone())
737 .unwrap_or_else(new_trace_id);
738 let log = ensure_hitl_event_log();
739 let request = HitlRequestEnvelope {
740 request_id: request_id.clone(),
741 kind: HitlRequestKind::Escalation,
742 agent: keys
743 .as_ref()
744 .map(|keys| keys.agent.clone())
745 .unwrap_or_default(),
746 trace_id: trace_id.clone(),
747 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
748 requested_at: now_rfc3339(),
749 payload: json!({
750 "role": role,
751 "reason": reason,
752 "capability_policy": escalation_capability_policy(),
753 }),
754 };
755 create_request_waitpoint(&log, &request).await?;
756 append_request(&log, &request).await?;
757 maybe_notify_host(&request);
758 emit_hitl_requested(&request);
759 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
760
761 match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
762 .await?
763 {
764 WaitpointOutcome::Completed(record) => {
765 let accepted_at = record.completed_at.clone();
766 let reviewer = record.completed_by.clone();
767 let accepted = record
768 .value
769 .as_ref()
770 .and_then(|value| value.get("accepted"))
771 .and_then(JsonValue::as_bool)
772 .unwrap_or(true);
773 Ok(crate::stdlib::json_to_vm_value(&json!({
774 "request_id": request_id,
775 "role": role,
776 "reason": reason,
777 "trace_id": trace_id,
778 "status": if accepted { "accepted" } else { "pending" },
779 "accepted_at": accepted_at,
780 "reviewer": reviewer,
781 })))
782 }
783 WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
784 WaitpointOutcome::Cancelled {
785 wait_id,
786 waitpoint_ids,
787 reason,
788 } => Err(hitl_cancelled_error(
789 &request_id,
790 HitlRequestKind::Escalation,
791 &wait_id,
792 &waitpoint_ids,
793 reason,
794 )),
795 }
796}
797
798async fn create_request_waitpoint(
799 log: &Arc<AnyEventLog>,
800 request: &HitlRequestEnvelope,
801) -> Result<(), VmError> {
802 create_waitpoint_on(
803 log,
804 Some(request.request_id.clone()),
805 Some(json!({
806 "kind": request.kind.as_str(),
807 "agent": request.agent.clone(),
808 "trace_id": request.trace_id.clone(),
809 "requested_at": request.requested_at.clone(),
810 "payload": request.payload.clone(),
811 })),
812 )
813 .await?;
814 Ok(())
815}
816
817async fn wait_for_request_waitpoint(
818 request_id: &str,
819 timeout: Option<StdDuration>,
820) -> Result<WaitpointOutcome, VmError> {
821 match wait_on_waitpoints(
822 vec![request_id.to_string()],
823 WaitpointWaitOptions { timeout },
824 )
825 .await
826 {
827 Ok(records) => Ok(WaitpointOutcome::Completed(
828 records
829 .into_iter()
830 .next()
831 .expect("single waitpoint wait result"),
832 )),
833 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
834 Err(WaitpointWaitFailure::Cancelled {
835 wait_id,
836 waitpoint_ids,
837 reason,
838 }) => Ok(WaitpointOutcome::Cancelled {
839 wait_id,
840 waitpoint_ids,
841 reason,
842 }),
843 Err(WaitpointWaitFailure::Vm(error)) => {
844 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
845 return Ok(outcome);
846 }
847 Err(error)
848 }
849 }
850}
851
852fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
853 let VmError::Thrown(VmValue::Dict(dict)) = error else {
854 return None;
855 };
856 let name = dict.get("name").and_then(vm_string)?;
857 match name {
858 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
859 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
860 wait_id: dict
861 .get("wait_id")
862 .and_then(vm_string)
863 .unwrap_or_default()
864 .to_string(),
865 waitpoint_ids: dict
866 .get("waitpoint_ids")
867 .and_then(vm_string_list)
868 .unwrap_or_default(),
869 reason: dict
870 .get("reason")
871 .and_then(vm_string)
872 .map(ToString::to_string),
873 }),
874 _ => None,
875 }
876}
877
/// Applies a recorded host response to the waitpoint of its request.
///
/// Nothing is done when the waitpoint is already in a terminal state.
///
/// * Question: completes the waitpoint with the response's answer.
/// * Escalation: completes the waitpoint only when the response accepted the
///   escalation; otherwise nothing is done.
/// * Approval / dual control: replays the recorded responses through
///   `resolve_approval_state`. While pending nothing happens; on approval an
///   approved event is appended and the waitpoint is completed with the
///   approval record; on denial a denied event is appended and the waitpoint
///   is cancelled.
///
/// # Errors
///
/// Every failure is returned as its string form.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // A missing `accepted` counts as not accepted, which leaves the
            // waitpoint untouched.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // The approved event is appended before the waitpoint is
                    // completed.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    // The denied event is appended before the waitpoint is
                    // cancelled; the reviewer, reason and metadata come from
                    // the denying response.
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
992
993async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
994 Ok(inspect_waitpoint_on(log, request_id)
995 .await
996 .map_err(|error| error.to_string())?
997 .is_some_and(|record| record.status != WaitpointStatus::Open))
998}
999
1000async fn load_request_envelope(
1001 log: &Arc<AnyEventLog>,
1002 kind: HitlRequestKind,
1003 request_id: &str,
1004) -> Result<HitlRequestEnvelope, VmError> {
1005 let topic = topic(kind)?;
1006 let events = log
1007 .read_range(&topic, None, usize::MAX)
1008 .await
1009 .map_err(log_error)?;
1010 events
1011 .into_iter()
1012 .filter(|(_, event)| event.kind == kind.request_event_kind())
1013 .find_map(|(_, event)| {
1014 if !event_matches_request(&event, request_id) {
1015 return None;
1016 }
1017 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1018 })
1019 .ok_or_else(|| {
1020 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1021 })
1022}
1023
/// Replays the recorded responses for `request` and decides whether it is
/// approved, denied or still pending.
///
/// Responses are read from the topic of `kind` and processed in the order the
/// log returns them. Only `hitl.response_received` events matching the request
/// id are considered. Among those, responses from a reviewer outside a
/// non-empty allow-list, or from a reviewer already counted, are ignored.
///
/// * An approving response with a reviewer adds that reviewer and a signature;
///   once the number of distinct reviewers reaches the quorum the result is
///   `Approved`.
/// * The first considered response that is not an approval yields `Denied`.
/// * Otherwise the result is `Pending`.
///
/// # Errors
///
/// Returns a `VmError` when the quorum or topic cannot be determined, the log
/// cannot be read, or a response payload fails to deserialize.
async fn resolve_approval_state(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    request: &HitlRequestEnvelope,
) -> Result<ApprovalResolution, VmError> {
    let quorum = approval_quorum_from_request(kind, request)?;
    // An empty set means "any reviewer is allowed".
    let allowed_reviewers = approval_reviewers_from_request(kind, request)
        .into_iter()
        .collect::<BTreeSet<_>>();
    let mut progress = ApprovalProgress {
        request_id: request.request_id.clone(),
        reviewers: BTreeSet::new(),
        signatures: Vec::new(),
        reason: None,
        approved_at: None,
    };
    let topic = topic(kind)?;
    let events = log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(log_error)?;
    for (_, event) in events {
        if !event_matches_request(&event, &request.request_id)
            || event.kind != "hitl.response_received"
        {
            continue;
        }
        let response: HitlHostResponse = serde_json::from_value(event.payload)
            .map_err(|error| VmError::Runtime(error.to_string()))?;
        // NOTE(review): a response without a reviewer skips both the
        // allow-list and the duplicate check. An anonymous approval does not
        // count towards the quorum, but an anonymous non-approval still denies
        // the request even when an allow-list is set — confirm this is
        // intended.
        if let Some(reviewer) = response.reviewer.as_deref() {
            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
                continue;
            }
            if progress.reviewers.contains(reviewer) {
                continue;
            }
        }
        if response.approved.unwrap_or(false) {
            if let Some(reviewer) = response.reviewer.clone() {
                // Fall back to the current time when the response carries no
                // timestamp.
                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
                progress.reviewers.insert(reviewer.clone());
                progress.signatures.push(ApprovalSignature {
                    reviewer: reviewer.clone(),
                    signed_at: signed_at.clone(),
                    // Prefer the supplied signature; otherwise generate a
                    // receipt signature.
                    signature: response.signature.clone().unwrap_or_else(|| {
                        approval_receipt_signature(
                            &request.request_id,
                            &reviewer,
                            &signed_at,
                            true,
                            response.reason.as_deref(),
                        )
                    }),
                });
            }
            // The latest approval's reason and timestamp overwrite earlier ones.
            progress.reason = response.reason.clone();
            progress.approved_at = response.responded_at.clone();
            if progress.reviewers.len() as u32 >= quorum {
                return Ok(ApprovalResolution::Approved(progress));
            }
            continue;
        }
        // Any considered response that is not an approval denies the request.
        return Ok(ApprovalResolution::Denied(response));
    }
    Ok(ApprovalResolution::Pending)
}
1090
1091fn approval_quorum_from_request(
1092 kind: HitlRequestKind,
1093 request: &HitlRequestEnvelope,
1094) -> Result<u32, VmError> {
1095 let key = match kind {
1096 HitlRequestKind::DualControl => "n",
1097 _ => "quorum",
1098 };
1099 let quorum = request
1100 .payload
1101 .get(key)
1102 .or_else(|| request.payload.get("approvers_required"))
1103 .or_else(|| {
1104 request
1105 .payload
1106 .get("approval_request")
1107 .and_then(|approval| approval.get("approvers_required"))
1108 })
1109 .and_then(JsonValue::as_u64)
1110 .unwrap_or(1);
1111 u32::try_from(quorum).map_err(|_| {
1112 VmError::Runtime(format!(
1113 "invalid quorum in HITL request '{}'",
1114 request.request_id
1115 ))
1116 })
1117}
1118
1119fn approval_reviewers_from_request(
1120 kind: HitlRequestKind,
1121 request: &HitlRequestEnvelope,
1122) -> Vec<String> {
1123 let key = match kind {
1124 HitlRequestKind::DualControl => "approvers",
1125 _ => "reviewers",
1126 };
1127 request
1128 .payload
1129 .get(key)
1130 .or_else(|| {
1131 request
1132 .payload
1133 .get("approval_request")
1134 .and_then(|approval| approval.get("reviewers"))
1135 })
1136 .and_then(JsonValue::as_array)
1137 .map(|values| {
1138 values
1139 .iter()
1140 .filter_map(JsonValue::as_str)
1141 .map(str::to_string)
1142 .collect()
1143 })
1144 .unwrap_or_default()
1145}
1146
1147fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1148 json!({
1149 "request_id": progress.request_id.clone(),
1150 "approved": true,
1151 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1152 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1153 "reason": progress.reason,
1154 "signatures": progress.signatures,
1155 })
1156}
1157
1158fn approval_receipt_signature(
1159 request_id: &str,
1160 reviewer: &str,
1161 signed_at: &str,
1162 approved: bool,
1163 reason: Option<&str>,
1164) -> String {
1165 let material = format!(
1166 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1167 reason.unwrap_or("")
1168 );
1169 let hash = sha2::Sha256::digest(material.as_bytes());
1170 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1171 format!("sha256:{hex}")
1172}
1173
1174fn approval_record_from_waitpoint(
1175 record: &WaitpointRecord,
1176 builtin: &str,
1177) -> Result<VmValue, VmError> {
1178 record
1179 .value
1180 .as_ref()
1181 .map(crate::stdlib::json_to_vm_value)
1182 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1183}
1184
1185async fn approval_wait_error(
1186 log: &Arc<AnyEventLog>,
1187 kind: HitlRequestKind,
1188 request_id: &str,
1189) -> VmError {
1190 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1191 if record.status == WaitpointStatus::Cancelled
1192 && record.reason.as_deref() != Some("upstream_cancelled")
1193 {
1194 return approval_denied_error(
1195 request_id,
1196 HitlHostResponse {
1197 request_id: request_id.to_string(),
1198 answer: None,
1199 approved: Some(false),
1200 accepted: None,
1201 reviewer: record.cancelled_by.clone(),
1202 reason: record.reason.clone(),
1203 metadata: record.metadata.clone(),
1204 responded_at: record.cancelled_at.clone(),
1205 signature: None,
1206 },
1207 );
1208 }
1209 if record.status == WaitpointStatus::Cancelled {
1210 return hitl_cancelled_error(
1211 request_id,
1212 kind,
1213 "",
1214 &[request_id.to_string()],
1215 record.reason.clone(),
1216 );
1217 }
1218 }
1219 hitl_cancelled_error(
1220 request_id,
1221 kind,
1222 "",
1223 &[request_id.to_string()],
1224 Some("upstream_cancelled".to_string()),
1225 )
1226}
1227
1228async fn append_timeout_once(
1229 log: &Arc<AnyEventLog>,
1230 kind: HitlRequestKind,
1231 request_id: &str,
1232 trace_id: &str,
1233) -> Result<(), VmError> {
1234 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1235 return Ok(());
1236 }
1237 append_timeout(log, kind, request_id, trace_id).await
1238}
1239
1240async fn hitl_event_exists(
1241 log: &Arc<AnyEventLog>,
1242 kind: HitlRequestKind,
1243 request_id: &str,
1244 event_kind: &str,
1245) -> Result<bool, VmError> {
1246 let topic = topic(kind)?;
1247 let events = log
1248 .read_range(&topic, None, usize::MAX)
1249 .await
1250 .map_err(log_error)?;
1251 Ok(events
1252 .into_iter()
1253 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1254}
1255
1256fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1257 match kind {
1258 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1259 _ => "hitl.approval_approved",
1260 }
1261}
1262
1263fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1264 match kind {
1265 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1266 _ => "hitl.approval_denied",
1267 }
1268}
1269
1270async fn append_request(
1271 log: &Arc<AnyEventLog>,
1272 request: &HitlRequestEnvelope,
1273) -> Result<(), VmError> {
1274 let topic = topic(request.kind)?;
1275 log.append(
1276 &topic,
1277 LogEvent::new(
1278 request.kind.request_event_kind(),
1279 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1280 )
1281 .with_headers(request_headers(request)),
1282 )
1283 .await
1284 .map(|_| ())
1285 .map_err(log_error)
1286}
1287
1288async fn append_named_event(
1289 log: &Arc<AnyEventLog>,
1290 kind: HitlRequestKind,
1291 event_kind: &str,
1292 request_id: &str,
1293 trace_id: &str,
1294 payload: JsonValue,
1295) -> Result<(), VmError> {
1296 let topic = topic(kind)?;
1297 let headers = headers_with_trace(request_id, trace_id);
1298 log.append(
1299 &topic,
1300 LogEvent::new(event_kind, payload).with_headers(headers),
1301 )
1302 .await
1303 .map(|_| ())
1304 .map_err(log_error)
1305}
1306
1307async fn append_timeout(
1308 log: &Arc<AnyEventLog>,
1309 kind: HitlRequestKind,
1310 request_id: &str,
1311 trace_id: &str,
1312) -> Result<(), VmError> {
1313 append_named_event(
1314 log,
1315 kind,
1316 "hitl.timeout",
1317 request_id,
1318 trace_id,
1319 serde_json::to_value(HitlTimeoutRecord {
1320 request_id: request_id.to_string(),
1321 kind,
1322 trace_id: trace_id.to_string(),
1323 timed_out_at: now_rfc3339(),
1324 })
1325 .map_err(|error| VmError::Runtime(error.to_string()))?,
1326 )
1327 .await
1328}
1329
1330async fn maybe_apply_mock_response(
1331 kind: HitlRequestKind,
1332 request_id: &str,
1333 request_payload: &JsonValue,
1334) -> Result<(), VmError> {
1335 let mut params = request_payload
1336 .as_object()
1337 .cloned()
1338 .unwrap_or_default()
1339 .into_iter()
1340 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1341 .collect::<BTreeMap<_, _>>();
1342 params.insert(
1343 "request_id".to_string(),
1344 VmValue::String(Rc::from(request_id.to_string())),
1345 );
1346 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1347 return Ok(());
1348 };
1349 let value = result?;
1350 let responses = match value {
1351 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1352 other => vec![other],
1353 };
1354 for response in responses {
1355 let response_dict = response.as_dict().ok_or_else(|| {
1356 VmError::Runtime(format!(
1357 "mocked HITL {} response must be a dict or list<dict>",
1358 kind.as_str()
1359 ))
1360 })?;
1361 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1362 append_hitl_response(None, hitl_response)
1363 .await
1364 .map_err(VmError::Runtime)?;
1365 }
1366 Ok(())
1367}
1368
1369fn parse_hitl_response_dict(
1370 request_id: &str,
1371 response_dict: &BTreeMap<String, VmValue>,
1372) -> Result<HitlHostResponse, VmError> {
1373 Ok(HitlHostResponse {
1374 request_id: request_id.to_string(),
1375 answer: response_dict
1376 .get("answer")
1377 .map(crate::llm::vm_value_to_json),
1378 approved: response_dict.get("approved").and_then(vm_bool),
1379 accepted: response_dict.get("accepted").and_then(vm_bool),
1380 reviewer: response_dict.get("reviewer").map(VmValue::display),
1381 reason: response_dict.get("reason").map(VmValue::display),
1382 metadata: response_dict
1383 .get("metadata")
1384 .map(crate::llm::vm_value_to_json),
1385 responded_at: response_dict.get("responded_at").map(VmValue::display),
1386 signature: response_dict.get("signature").map(VmValue::display),
1387 })
1388}
1389
1390fn maybe_notify_host(request: &HitlRequestEnvelope) {
1391 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1392 return;
1393 };
1394 bridge.notify(
1395 "harn.hitl.requested",
1396 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1397 );
1398}
1399
1400fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1407 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1408 return;
1409 };
1410 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1411 session_id,
1412 request_id: request.request_id.clone(),
1413 kind: request.kind.as_str().to_string(),
1414 payload: request.payload.clone(),
1415 });
1416}
1417
1418fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1423 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1424 return;
1425 };
1426 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1427 session_id,
1428 request_id: request_id.to_string(),
1429 kind: kind.as_str().to_string(),
1430 outcome: outcome.to_string(),
1431 });
1432}
1433
1434async fn wait_for_request_waitpoint_with_events(
1441 request_id: &str,
1442 kind: HitlRequestKind,
1443 timeout: Option<StdDuration>,
1444) -> Result<WaitpointOutcome, VmError> {
1445 let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1446 let label = match &outcome {
1447 Ok(WaitpointOutcome::Completed(_)) => "answered",
1448 Ok(WaitpointOutcome::Timeout) => "timeout",
1449 Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1450 Err(_) => "error",
1451 };
1452 emit_hitl_resolved(request_id, kind, label);
1453 outcome
1454}
1455
1456fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1457 let Some(value) = value else {
1458 return Ok(AskUserOptions {
1459 schema: None,
1460 timeout: Some(default_question_timeout()),
1461 default: None,
1462 });
1463 };
1464 let dict = value
1465 .as_dict()
1466 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1467 Ok(AskUserOptions {
1468 schema: dict
1469 .get("schema")
1470 .cloned()
1471 .filter(|value| !matches!(value, VmValue::Nil)),
1472 timeout: dict
1473 .get("timeout")
1474 .map(parse_duration_value)
1475 .transpose()?
1476 .or_else(|| Some(default_question_timeout())),
1477 default: dict
1478 .get("default")
1479 .cloned()
1480 .filter(|value| !matches!(value, VmValue::Nil)),
1481 })
1482}
1483
1484fn default_question_timeout() -> StdDuration {
1485 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1486}
1487
1488fn escalation_capability_policy() -> JsonValue {
1489 crate::orchestration::current_execution_policy()
1490 .and_then(|policy| serde_json::to_value(policy).ok())
1491 .unwrap_or(JsonValue::Null)
1492}
1493
1494fn parse_approval_options(
1495 value: Option<&VmValue>,
1496 builtin: &str,
1497) -> Result<ApprovalOptions, VmError> {
1498 let dict = match value {
1499 None => None,
1500 Some(VmValue::Dict(dict)) => Some(dict),
1501 Some(_) => {
1502 return Err(VmError::Runtime(format!(
1503 "{builtin}: options must be a dict"
1504 )))
1505 }
1506 };
1507 let quorum = dict
1508 .and_then(|dict| dict.get("quorum"))
1509 .and_then(VmValue::as_int)
1510 .unwrap_or(1);
1511 if quorum <= 0 {
1512 return Err(VmError::Runtime(format!(
1513 "{builtin}: quorum must be positive"
1514 )));
1515 }
1516 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1517 let capabilities_requested = optional_string_list(
1518 dict.and_then(|dict| dict.get("capabilities_requested")),
1519 builtin,
1520 )?;
1521 let evidence_refs = dict
1522 .and_then(|dict| dict.get("evidence_refs"))
1523 .map(|value| match value {
1524 VmValue::List(items) => Ok(items
1525 .iter()
1526 .map(crate::llm::vm_value_to_json)
1527 .collect::<Vec<_>>()),
1528 _ => Err(VmError::Runtime(format!(
1529 "{builtin}: evidence_refs must be a list"
1530 ))),
1531 })
1532 .transpose()?
1533 .unwrap_or_default();
1534 let deadline = dict
1535 .and_then(|dict| dict.get("deadline"))
1536 .map(parse_duration_value)
1537 .transpose()?
1538 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1539 Ok(ApprovalOptions {
1540 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1541 args: dict.and_then(|dict| dict.get("args")).cloned(),
1542 quorum: quorum as u32,
1543 reviewers,
1544 deadline,
1545 principal: dict
1546 .and_then(|dict| dict.get("principal"))
1547 .map(VmValue::display)
1548 .filter(|value| !value.is_empty()),
1549 evidence_refs,
1550 undo_metadata: dict
1551 .and_then(|dict| dict.get("undo_metadata"))
1552 .map(crate::llm::vm_value_to_json),
1553 capabilities_requested,
1554 })
1555}
1556
1557fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1558 args.get(idx)
1559 .map(VmValue::display)
1560 .filter(|value| !value.is_empty())
1561 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1562}
1563
1564fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1565 let value = args
1566 .get(idx)
1567 .and_then(VmValue::as_int)
1568 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1569 if value <= 0 {
1570 return Err(VmError::Runtime(format!(
1571 "{builtin}: expected a positive int at {idx}"
1572 )));
1573 }
1574 Ok(value)
1575}
1576
1577fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1578 let Some(value) = value else {
1579 return Ok(Vec::new());
1580 };
1581 match value {
1582 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1583 _ => Err(VmError::Runtime(format!(
1584 "{builtin}: expected list<string>"
1585 ))),
1586 }
1587}
1588
1589fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1590 match value {
1591 VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1592 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1593 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1594 _ => Err(VmError::Runtime(
1595 "expected a duration or millisecond count".to_string(),
1596 )),
1597 }
1598}
1599
1600fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1601 active_event_log()
1602 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1603}
1604
1605fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1606 if let Some(log) = active_event_log() {
1607 return Ok(log);
1608 }
1609 let Some(base_dir) = base_dir else {
1610 return Ok(install_memory_for_current_thread(
1611 HITL_EVENT_LOG_QUEUE_DEPTH,
1612 ));
1613 };
1614 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1615}
1616
1617fn current_dispatch_keys() -> Option<DispatchKeys> {
1618 let context = current_dispatch_context()?;
1619 let stable_base = context
1620 .replay_of_event_id
1621 .clone()
1622 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1623 let instance_key = format!(
1624 "{}::{}",
1625 context.trigger_event.id.0,
1626 context.replay_of_event_id.as_deref().unwrap_or("live")
1627 );
1628 Some(DispatchKeys {
1629 instance_key,
1630 stable_base,
1631 agent: context.agent_id,
1632 trace_id: context.trigger_event.trace_id.0,
1633 })
1634}
1635
1636fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1637 if let Some(keys) = dispatch_keys {
1638 let seq = REQUEST_SEQUENCE.with(|slot| {
1639 let mut state = slot.borrow_mut();
1640 if state.instance_key != keys.instance_key {
1641 state.instance_key = keys.instance_key.clone();
1642 state.next_seq = 0;
1643 }
1644 state.next_seq += 1;
1645 state.next_seq
1646 });
1647 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1648 }
1649 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1650}
1651
1652fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1653 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1654 if let Some(run_id) = request.run_id.as_ref() {
1655 headers.insert("run_id".to_string(), run_id.clone());
1656 }
1657 headers
1658}
1659
/// Builds the log headers for a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1665
1666fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1667 let mut headers = response_headers(request_id);
1668 headers.insert("trace_id".to_string(), trace_id.to_string());
1669 headers
1670}
1671
1672fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1673 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1674}
1675
1676fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1677 event
1678 .headers
1679 .get("request_id")
1680 .is_some_and(|value| value == request_id)
1681 || event
1682 .payload
1683 .get("request_id")
1684 .and_then(JsonValue::as_str)
1685 .is_some_and(|value| value == request_id)
1686}
1687
1688fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1689 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1690 "name": "ApprovalDeniedError",
1691 "category": "generic",
1692 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1693 "request_id": request_id,
1694 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1695 "reason": response.reason,
1696 })))
1697}
1698
1699fn hitl_cancelled_error(
1700 request_id: &str,
1701 kind: HitlRequestKind,
1702 wait_id: &str,
1703 waitpoint_ids: &[String],
1704 reason: Option<String>,
1705) -> VmError {
1706 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1707 let message = reason
1708 .clone()
1709 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1710 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1711 "name": "HumanCancelledError",
1712 "category": ErrorCategory::Cancelled.as_str(),
1713 "message": message,
1714 "request_id": request_id,
1715 "kind": kind.as_str(),
1716 "wait_id": wait_id,
1717 "waitpoint_ids": waitpoint_ids,
1718 "reason": reason,
1719 })))
1720}
1721
1722fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1723 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1724 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1725 "name": "HumanTimeoutError",
1726 "category": ErrorCategory::Timeout.as_str(),
1727 "message": format!("{} timed out", kind.as_str()),
1728 "request_id": request_id,
1729 "kind": kind.as_str(),
1730 })))
1731}
1732
1733fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1734 match default {
1735 VmValue::Int(_) => match value {
1736 VmValue::Int(_) => value.clone(),
1737 VmValue::Float(number) => VmValue::Int(*number as i64),
1738 VmValue::String(text) => text
1739 .parse::<i64>()
1740 .map(VmValue::Int)
1741 .unwrap_or_else(|_| default.clone()),
1742 _ => default.clone(),
1743 },
1744 VmValue::Float(_) => match value {
1745 VmValue::Float(_) => value.clone(),
1746 VmValue::Int(number) => VmValue::Float(*number as f64),
1747 VmValue::String(text) => text
1748 .parse::<f64>()
1749 .map(VmValue::Float)
1750 .unwrap_or_else(|_| default.clone()),
1751 _ => default.clone(),
1752 },
1753 VmValue::Bool(_) => match value {
1754 VmValue::Bool(_) => value.clone(),
1755 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1756 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1757 _ => default.clone(),
1758 },
1759 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1760 VmValue::Duration(_) => match value {
1761 VmValue::Duration(_) => value.clone(),
1762 VmValue::Int(ms) => VmValue::Duration(*ms),
1763 _ => default.clone(),
1764 },
1765 VmValue::Nil => value.clone(),
1766 _ => {
1767 if value.type_name() == default.type_name() {
1768 value.clone()
1769 } else {
1770 default.clone()
1771 }
1772 }
1773 }
1774}
1775
1776fn log_error(error: impl std::fmt::Display) -> VmError {
1777 VmError::Runtime(error.to_string())
1778}
1779
1780fn now_rfc3339() -> String {
1781 format_rfc3339(OffsetDateTime::now_utc())
1782}
1783
1784fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1785 timestamp
1786 .format(&Rfc3339)
1787 .unwrap_or_else(|_| timestamp.to_string())
1788}
1789
1790fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1791 time::Duration::try_from(duration)
1792 .ok()
1793 .map(|duration| format_rfc3339(requested_at + duration))
1794}
1795
1796fn new_trace_id() -> String {
1797 format!("trace_{}", Uuid::now_v7())
1798}
1799
1800fn vm_bool(value: &VmValue) -> Option<bool> {
1801 match value {
1802 VmValue::Bool(flag) => Some(*flag),
1803 _ => None,
1804 }
1805}
1806
1807fn vm_string(value: &VmValue) -> Option<&str> {
1808 match value {
1809 VmValue::String(text) => Some(text.as_ref()),
1810 _ => None,
1811 }
1812}
1813
1814fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1815 match value {
1816 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1817 _ => None,
1818 }
1819}
1820
1821#[cfg(test)]
1822mod tests {
1823 use super::{
1824 HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1825 };
1826 use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1827 use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1828
1829 async fn execute_hitl_script(
1830 base_dir: &std::path::Path,
1831 source: &str,
1832 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1833 reset_thread_local_state();
1834 let log = install_default_for_base_dir(base_dir).expect("install event log");
1835 let chunk = compile_source(source).expect("compile source");
1836 let mut vm = Vm::new();
1837 register_vm_stdlib(&mut vm);
1838 vm.set_source_dir(base_dir);
1839 vm.execute(&chunk).await?;
1840 let output = vm.output().trim_end().to_string();
1841 let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1842 let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1843 let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1844 let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1845 Ok((
1846 output,
1847 question_events,
1848 approval_events,
1849 dual_control_events,
1850 escalation_events,
1851 ))
1852 }
1853
1854 async fn event_kinds(
1855 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1856 topic: &str,
1857 ) -> Vec<String> {
1858 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1859 .await
1860 .expect("read topic")
1861 .into_iter()
1862 .map(|(_, event)| event.kind)
1863 .collect()
1864 }
1865
1866 async fn event_payloads(
1867 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1868 topic: &str,
1869 ) -> Vec<serde_json::Value> {
1870 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1871 .await
1872 .expect("read topic")
1873 .into_iter()
1874 .map(|(_, event)| event.payload)
1875 .collect()
1876 }
1877
1878 #[tokio::test(flavor = "current_thread")]
1879 async fn ask_user_coerces_to_default_type_and_logs_events() {
1880 tokio::task::LocalSet::new()
1881 .run_until(async {
1882 let dir = tempfile::tempdir().expect("tempdir");
1883 let source = r#"
1884pipeline test(task) {
1885 host_mock("hitl", "question", {answer: "9"})
1886 let answer: int = ask_user("Pick a number", {default: 0})
1887 println(answer)
1888}
1889"#;
1890 let (
1891 output,
1892 question_events,
1893 approval_events,
1894 dual_control_events,
1895 escalation_events,
1896 ) = execute_hitl_script(dir.path(), source)
1897 .await
1898 .expect("script succeeds");
1899 assert_eq!(output, "9");
1900 assert_eq!(
1901 question_events,
1902 vec![
1903 "hitl.question_asked".to_string(),
1904 "hitl.response_received".to_string()
1905 ]
1906 );
1907 assert!(approval_events.is_empty());
1908 assert!(dual_control_events.is_empty());
1909 assert!(escalation_events.is_empty());
1910 })
1911 .await;
1912 }
1913
1914 #[tokio::test(flavor = "current_thread")]
1915 async fn request_approval_waits_for_quorum_and_emits_a_record() {
1916 tokio::task::LocalSet::new()
1917 .run_until(async {
1918 let dir = tempfile::tempdir().expect("tempdir");
1919 let source = r#"
1920pipeline test(task) {
1921 host_mock("hitl", "approval", [
1922 {approved: true, reviewer: "alice", reason: "ok"},
1923 {approved: true, reviewer: "bob", reason: "ship it"},
1924 ])
1925 let record = request_approval(
1926 "deploy production",
1927 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1928 )
1929 println(record.approved)
1930 println(len(record.reviewers))
1931 println(record.reviewers[0])
1932 println(record.reviewers[1])
1933}
1934"#;
1935 let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1936 .await
1937 .expect("script succeeds");
1938 assert_eq!(
1939 approval_events,
1940 vec![
1941 "hitl.approval_requested".to_string(),
1942 "hitl.response_received".to_string(),
1943 "hitl.response_received".to_string(),
1944 "hitl.approval_approved".to_string(),
1945 ]
1946 );
1947 })
1948 .await;
1949 }
1950
1951 #[tokio::test(flavor = "current_thread")]
1952 async fn request_approval_emits_canonical_approval_request_payload() {
1953 tokio::task::LocalSet::new()
1954 .run_until(async {
1955 reset_thread_local_state();
1956 let dir = tempfile::tempdir().expect("tempdir");
1957 let log = install_default_for_base_dir(dir.path()).expect("install event log");
1958 let source = r#"
1959pipeline test(task) {
1960 host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
1961 request_approval("deploy production", {
1962 args: {environment: "prod"},
1963 quorum: 1,
1964 reviewers: ["alice"],
1965 evidence_refs: [{kind: "run", uri: "run_123"}],
1966 undo_metadata: {strategy: "rollback"},
1967 capabilities_requested: ["deploy.production"],
1968 })
1969}
1970"#;
1971 let chunk = compile_source(source).expect("compile source");
1972 let mut vm = Vm::new();
1973 register_vm_stdlib(&mut vm);
1974 vm.set_source_dir(dir.path());
1975 vm.execute(&chunk).await.expect("script succeeds");
1976
1977 let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
1978 let request_payload = &payloads[0]["payload"];
1979 let approval_request = &request_payload["approval_request"];
1980 assert_eq!(approval_request["id"], request_payload["id"]);
1981 assert_eq!(approval_request["action"], "deploy production");
1982 assert_eq!(approval_request["args"]["environment"], "prod");
1983 assert_eq!(approval_request["approvers_required"], 1);
1984 assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
1985 assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
1986 assert_eq!(
1987 approval_request["capabilities_requested"][0],
1988 "deploy.production"
1989 );
1990 assert!(approval_request["requested_at"].as_str().is_some());
1991 assert!(approval_request["deadline"].as_str().is_some());
1992 })
1993 .await;
1994 }
1995
1996 #[tokio::test(flavor = "current_thread")]
1997 async fn request_approval_surfaces_denials_as_typed_errors() {
1998 tokio::task::LocalSet::new()
1999 .run_until(async {
2000 let dir = tempfile::tempdir().expect("tempdir");
2001 let source = r#"
2002pipeline test(task) {
2003 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
2004 let denied = try {
2005 request_approval("drop table", {reviewers: ["alice"]})
2006 }
2007 println(is_err(denied))
2008 println(unwrap_err(denied).name)
2009 println(unwrap_err(denied).reason)
2010}
2011"#;
2012 let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2013 .await
2014 .expect("script succeeds");
2015 assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
2016 assert_eq!(
2017 approval_events,
2018 vec![
2019 "hitl.approval_requested".to_string(),
2020 "hitl.response_received".to_string(),
2021 "hitl.approval_denied".to_string(),
2022 ]
2023 );
2024 })
2025 .await;
2026 }
2027
2028 #[tokio::test(flavor = "current_thread")]
2029 async fn dual_control_executes_action_after_quorum() {
2030 tokio::task::LocalSet::new()
2031 .run_until(async {
2032 let dir = tempfile::tempdir().expect("tempdir");
2033 let source = r#"
2034pipeline test(task) {
2035 host_mock("hitl", "dual_control", [
2036 {approved: true, reviewer: "alice"},
2037 {approved: true, reviewer: "bob"},
2038 ])
2039 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2040 println(result)
2041}
2042"#;
2043 let (output, _, _, dual_control_events, _) =
2044 execute_hitl_script(dir.path(), source)
2045 .await
2046 .expect("script succeeds");
2047 assert_eq!(output, "launched");
2048 assert_eq!(
2049 dual_control_events,
2050 vec![
2051 "hitl.dual_control_requested".to_string(),
2052 "hitl.response_received".to_string(),
2053 "hitl.response_received".to_string(),
2054 "hitl.dual_control_approved".to_string(),
2055 "hitl.dual_control_executed".to_string(),
2056 ]
2057 );
2058 })
2059 .await;
2060 }
2061
2062 #[tokio::test(flavor = "current_thread")]
2063 async fn escalate_to_waits_for_acceptance_event() {
2064 tokio::task::LocalSet::new()
2065 .run_until(async {
2066 let dir = tempfile::tempdir().expect("tempdir");
2067 let source = r#"
2068pipeline test(task) {
2069 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2070 let handle = escalate_to("admin", "need override")
2071 println(handle.status)
2072 println(handle.reviewer)
2073}
2074"#;
2075 let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2076 .await
2077 .expect("script succeeds");
2078 assert_eq!(output, "accepted\nlead");
2079 assert_eq!(
2080 escalation_events,
2081 vec![
2082 "hitl.escalation_issued".to_string(),
2083 "hitl.escalation_accepted".to_string(),
2084 ]
2085 );
2086 })
2087 .await;
2088 }
2089
2090 #[tokio::test(flavor = "current_thread")]
2096 async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2097 use std::sync::Mutex as StdMutex;
2098
2099 tokio::task::LocalSet::new()
2100 .run_until(async {
2101 let dir = tempfile::tempdir().expect("tempdir");
2102 let session_id = "hitl-session".to_string();
2103 let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2104 std::sync::Arc::new(StdMutex::new(Vec::new()));
2105
2106 struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2107 impl crate::agent_events::AgentEventSink for CaptureSink {
2108 fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2109 self.0.lock().expect("captured").push(event.clone());
2110 }
2111 }
2112
2113 crate::reset_thread_local_state();
2119 crate::event_log::install_default_for_base_dir(dir.path())
2120 .expect("install event log");
2121
2122 crate::agent_events::reset_all_sinks();
2123 let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2124 std::sync::Arc::new(CaptureSink(captured.clone()));
2125 crate::agent_events::register_sink(session_id.clone(), sink);
2126 crate::agent_sessions::open_or_create(Some(session_id.clone()));
2127 let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2128
2129 let source = r#"
2130pipeline test(task) {
2131 host_mock("hitl", "question", {answer: "ok"})
2132 let answer: string = ask_user("Are you sure?", {default: "no"})
2133 println(answer)
2134}
2135"#;
2136 let chunk = crate::compile_source(source).expect("compile source");
2137 let mut vm = Vm::new();
2138 register_vm_stdlib(&mut vm);
2139 vm.set_source_dir(dir.path());
2140 vm.execute(&chunk).await.expect("script runs");
2141 assert_eq!(vm.output().trim_end(), "ok");
2142
2143 let events = captured.lock().expect("captured");
2144 let mut iter = events.iter().filter(|event| {
2145 matches!(
2146 event,
2147 crate::agent_events::AgentEvent::HitlRequested { .. }
2148 | crate::agent_events::AgentEvent::HitlResolved { .. }
2149 )
2150 });
2151 let requested = iter.next().expect("HitlRequested emitted");
2152 let resolved = iter.next().expect("HitlResolved emitted");
2153 assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2154
2155 let crate::agent_events::AgentEvent::HitlRequested {
2156 session_id: req_session,
2157 request_id: req_id,
2158 kind: req_kind,
2159 payload,
2160 } = requested
2161 else {
2162 panic!("expected HitlRequested, got: {requested:?}");
2163 };
2164 assert_eq!(req_session, &session_id);
2165 assert_eq!(req_kind, "question");
2166 assert!(req_id.starts_with("hitl_question_"));
2167 assert_eq!(payload["prompt"], "Are you sure?");
2168
2169 let crate::agent_events::AgentEvent::HitlResolved {
2170 request_id: res_id,
2171 kind: res_kind,
2172 outcome,
2173 ..
2174 } = resolved
2175 else {
2176 panic!("expected HitlResolved, got: {resolved:?}");
2177 };
2178 assert_eq!(res_id, req_id);
2179 assert_eq!(res_kind, "question");
2180 assert_eq!(outcome, "answered");
2181
2182 drop(_guard);
2183 crate::agent_events::reset_all_sinks();
2184 })
2185 .await;
2186 }
2187}