1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::runtime_limits::RuntimeLimits;
20use crate::schema::schema_expect_value;
21use crate::stdlib::host::dispatch_mock_host_call;
22use crate::stdlib::waitpoint::{
23 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
24 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
25 WaitpointWaitOptions,
26};
27use crate::triggers::dispatcher::current_dispatch_context;
28use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
29use crate::vm::{clone_async_builtin_child_vm, Vm};
30
31const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Default approval deadline in milliseconds (24 hours).
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;
/// Question timeout constant in milliseconds (24 hours).
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1_000;

/// Event-log topic that carries question requests and their responses.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic that carries approval requests and their responses.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic that carries dual-control requests and their responses.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic that carries escalation requests and their responses.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
39
thread_local! {
    /// Per-thread request-sequence state; starts out as `RequestSequenceState::default()`.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::default();
}

/// Value stored in the thread-local `REQUEST_SEQUENCE` slot.
///
/// The default value has an empty `instance_key` and a `next_seq` of zero.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Key identifying what the counter belongs to (presumably a dispatch instance; confirm
    /// against `next_request_id`, which is not shown here).
    pub(crate) instance_key: String,
    /// Next sequence number to hand out.
    pub(crate) next_seq: u64,
}
49
50#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub enum HitlRequestKind {
53 Question,
54 Approval,
55 DualControl,
56 Escalation,
57}
58
59impl HitlRequestKind {
60 pub(crate) fn as_str(self) -> &'static str {
61 match self {
62 Self::Question => "question",
63 Self::Approval => "approval",
64 Self::DualControl => "dual_control",
65 Self::Escalation => "escalation",
66 }
67 }
68
69 fn topic(self) -> &'static str {
70 match self {
71 Self::Question => HITL_QUESTIONS_TOPIC,
72 Self::Approval => HITL_APPROVALS_TOPIC,
73 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
74 Self::Escalation => HITL_ESCALATIONS_TOPIC,
75 }
76 }
77
78 fn request_event_kind(self) -> &'static str {
79 match self {
80 Self::Question => "hitl.question_asked",
81 Self::Approval => "hitl.approval_requested",
82 Self::DualControl => "hitl.dual_control_requested",
83 Self::Escalation => "hitl.escalation_issued",
84 }
85 }
86
87 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
88 if request_id.starts_with("hitl_question_") {
89 Some(Self::Question)
90 } else if request_id.starts_with("hitl_approval_") {
91 Some(Self::Approval)
92 } else if request_id.starts_with("hitl_dual_control_") {
93 Some(Self::DualControl)
94 } else if request_id.starts_with("hitl_escalation_") {
95 Some(Self::Escalation)
96 } else {
97 None
98 }
99 }
100}
101
102#[derive(Clone, Debug, Serialize, Deserialize)]
103pub struct HitlHostResponse {
104 pub request_id: String,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub answer: Option<JsonValue>,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pub approved: Option<bool>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub accepted: Option<bool>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub reviewer: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub reason: Option<String>,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 pub metadata: Option<JsonValue>,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub responded_at: Option<String>,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub signature: Option<String>,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize)]
124struct HitlRequestEnvelope {
125 request_id: String,
126 kind: HitlRequestKind,
127 #[serde(default)]
128 agent: String,
129 trace_id: String,
130 #[serde(skip_serializing_if = "Option::is_none")]
131 run_id: Option<String>,
132 requested_at: String,
133 payload: JsonValue,
134}
135
136#[derive(Clone, Debug, Serialize, Deserialize)]
137struct HitlTimeoutRecord {
138 request_id: String,
139 kind: HitlRequestKind,
140 trace_id: String,
141 timed_out_at: String,
142}
143
144#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
145pub struct ApprovalRequest {
146 pub id: String,
147 pub action: String,
148 #[serde(default)]
149 pub args: JsonValue,
150 pub principal: String,
151 pub requested_at: String,
152 #[serde(skip_serializing_if = "Option::is_none")]
153 pub deadline: Option<String>,
154 pub approvers_required: u32,
155 #[serde(default)]
156 pub evidence_refs: Vec<JsonValue>,
157 #[serde(default)]
158 pub undo_metadata: JsonValue,
159 #[serde(default)]
160 pub capabilities_requested: Vec<String>,
161}
162
163impl ApprovalRequest {
164 pub fn new(
165 id: impl Into<String>,
166 action: impl Into<String>,
167 args: JsonValue,
168 principal: impl Into<String>,
169 requested_at: impl Into<String>,
170 ) -> Self {
171 Self {
172 id: id.into(),
173 action: action.into(),
174 args,
175 principal: principal.into(),
176 requested_at: requested_at.into(),
177 deadline: None,
178 approvers_required: 1,
179 evidence_refs: Vec::new(),
180 undo_metadata: JsonValue::Null,
181 capabilities_requested: Vec::new(),
182 }
183 }
184}
185
186pub(crate) fn approval_request_for_host_permission(
187 id: impl Into<String>,
188 action: impl Into<String>,
189 args: JsonValue,
190 principal: impl Into<String>,
191 evidence_refs: Vec<JsonValue>,
192 undo_metadata: JsonValue,
193 capabilities_requested: Vec<String>,
194) -> ApprovalRequest {
195 let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
196 request.evidence_refs = evidence_refs;
197 request.undo_metadata = undo_metadata;
198 request.capabilities_requested = capabilities_requested;
199 request
200}
201
/// Identity keys describing the current dispatch context.
#[derive(Clone, Debug)]
struct DispatchKeys {
    /// Key of the dispatch instance (semantics defined by the producer, not shown here).
    instance_key: String,
    /// Stable base string (presumably the basis for deterministic request ids; confirm).
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id copied into request envelopes.
    trace_id: String,
}
209
210#[derive(Clone, Debug)]
211struct AskUserOptions {
212 schema: Option<VmValue>,
213 timeout: Option<StdDuration>,
214 default: Option<VmValue>,
215}
216
217#[derive(Clone, Debug)]
218struct ApprovalOptions {
219 detail: Option<VmValue>,
220 args: Option<VmValue>,
221 quorum: u32,
222 reviewers: Vec<String>,
223 deadline: StdDuration,
224 principal: Option<String>,
225 evidence_refs: Vec<JsonValue>,
226 undo_metadata: Option<JsonValue>,
227 capabilities_requested: Vec<String>,
228}
229
230#[derive(Clone, Debug)]
231struct ApprovalProgress {
232 request_id: String,
233 reviewers: BTreeSet<String>,
234 signatures: Vec<ApprovalSignature>,
235 reason: Option<String>,
236 approved_at: Option<String>,
237}
238
239#[derive(Clone, Debug, Serialize)]
240struct ApprovalSignature {
241 reviewer: String,
242 signed_at: String,
243 signature: String,
244}
245
246#[derive(Clone, Debug)]
247enum ApprovalResolution {
248 Pending,
249 Approved(ApprovalProgress),
250 Denied(HitlHostResponse),
251}
252
253#[allow(clippy::large_enum_variant)]
261#[derive(Clone, Debug)]
262enum WaitpointOutcome {
263 Completed(WaitpointRecord),
264 Timeout,
265 Cancelled {
266 wait_id: String,
267 waitpoint_ids: Vec<String>,
268 reason: Option<String>,
269 },
270}
271
272pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
273 vm.register_async_builtin("ask_user", |args| {
274 Box::pin(async move { ask_user_impl(&args).await })
275 });
276
277 vm.register_async_builtin("request_approval", |args| {
278 Box::pin(async move { request_approval_impl(&args).await })
279 });
280
281 vm.register_async_builtin("dual_control", |args| {
282 Box::pin(async move { dual_control_impl(&args).await })
283 });
284
285 vm.register_async_builtin("escalate_to", |args| {
286 Box::pin(async move { escalate_to_impl(&args).await })
287 });
288}
289
290pub(crate) fn reset_hitl_state() {
291 REQUEST_SEQUENCE.with(|slot| {
292 *slot.borrow_mut() = RequestSequenceState::default();
293 });
294}
295
296pub(crate) fn take_hitl_state() -> RequestSequenceState {
297 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
298}
299
300pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
301 REQUEST_SEQUENCE.with(|slot| {
302 *slot.borrow_mut() = state;
303 });
304}
305
306pub async fn append_hitl_response(
307 base_dir: Option<&Path>,
308 mut response: HitlHostResponse,
309) -> Result<u64, String> {
310 let kind = HitlRequestKind::from_request_id(&response.request_id)
311 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
312 if response.responded_at.is_none() {
313 response.responded_at = Some(now_rfc3339());
314 }
315 let log = ensure_hitl_event_log_for(base_dir)?;
316 let headers = response_headers(&response.request_id);
317 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
318 let event_id = log
319 .append(
320 &topic,
321 LogEvent::new(
322 match kind {
323 HitlRequestKind::Escalation => "hitl.escalation_accepted",
324 _ => "hitl.response_received",
325 },
326 serde_json::to_value(&response).map_err(|error| error.to_string())?,
327 )
328 .with_headers(headers),
329 )
330 .await
331 .map_err(|error| error.to_string())?;
332 finalize_hitl_response(&log, kind, &response).await?;
333 Ok(event_id)
334}
335
336pub async fn append_approval_request_on(
337 log: &Arc<AnyEventLog>,
338 agent: impl Into<String>,
339 trace_id: impl Into<String>,
340 action: impl Into<String>,
341 detail: JsonValue,
342 reviewers: Vec<String>,
343) -> Result<String, VmError> {
344 let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
345 let trace_id = trace_id.into();
346 let agent = agent.into();
347 let requested_at_time = OffsetDateTime::now_utc();
348 let requested_at = format_rfc3339(requested_at_time);
349 let mut approval_request = ApprovalRequest::new(
350 request_id.clone(),
351 action.into(),
352 detail.clone(),
353 agent.clone(),
354 requested_at.clone(),
355 );
356 approval_request.deadline = deadline_after(
357 requested_at_time,
358 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
359 );
360 approval_request.approvers_required = 1;
361 let approval_request_json = serde_json::to_value(&approval_request)
362 .map_err(|error| VmError::Runtime(error.to_string()))?;
363 let request = HitlRequestEnvelope {
364 request_id: request_id.clone(),
365 kind: HitlRequestKind::Approval,
366 agent,
367 trace_id: trace_id.clone(),
368 run_id: None,
369 requested_at: requested_at.clone(),
370 payload: json!({
371 "approval_request": approval_request_json,
372 "id": approval_request.id,
373 "action": approval_request.action,
374 "args": approval_request.args,
375 "principal": approval_request.principal,
376 "requested_at": requested_at,
377 "deadline": approval_request.deadline,
378 "approvers_required": approval_request.approvers_required,
379 "evidence_refs": approval_request.evidence_refs,
380 "undo_metadata": approval_request.undo_metadata,
381 "capabilities_requested": approval_request.capabilities_requested,
382 "detail": detail,
383 "quorum": 1,
384 "reviewers": reviewers,
385 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
386 }),
387 };
388 create_request_waitpoint(log, &request).await?;
389 append_request(log, &request).await?;
390 maybe_notify_host(&request);
391 Ok(request_id)
392}
393
394async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
395 let prompt = required_string_arg(args, 0, "ask_user")?;
396 let options = parse_ask_user_options(args.get(1))?;
397 let keys = current_dispatch_keys();
398 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
399 let trace_id = keys
400 .as_ref()
401 .map(|keys| keys.trace_id.clone())
402 .unwrap_or_else(new_trace_id);
403 let log = ensure_hitl_event_log();
404 let request = HitlRequestEnvelope {
405 request_id: request_id.clone(),
406 kind: HitlRequestKind::Question,
407 agent: keys
408 .as_ref()
409 .map(|keys| keys.agent.clone())
410 .unwrap_or_default(),
411 trace_id: trace_id.clone(),
412 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
413 requested_at: now_rfc3339(),
414 payload: json!({
415 "prompt": prompt,
416 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
417 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
418 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
419 }),
420 };
421 create_request_waitpoint(&log, &request).await?;
422 append_request(&log, &request).await?;
423 maybe_notify_host(&request);
424 emit_hitl_requested(&request);
425 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
426
427 match wait_for_request_waitpoint_with_events(
428 &request_id,
429 HitlRequestKind::Question,
430 options.timeout,
431 )
432 .await?
433 {
434 WaitpointOutcome::Completed(record) => {
435 let answer = record
436 .value
437 .as_ref()
438 .map(crate::stdlib::json_to_vm_value)
439 .unwrap_or(VmValue::Nil);
440 if let Some(schema) = options.schema.as_ref() {
441 return schema_expect_value(&answer, schema, true);
442 }
443 if let Some(default) = options.default.as_ref() {
444 return Ok(coerce_like_default(&answer, default));
445 }
446 Ok(answer)
447 }
448 WaitpointOutcome::Timeout => {
449 append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
450 if let Some(default) = options.default {
451 return Ok(default);
452 }
453 Err(timeout_error(&request_id, HitlRequestKind::Question))
454 }
455 WaitpointOutcome::Cancelled {
456 wait_id,
457 waitpoint_ids,
458 reason,
459 } => Err(hitl_cancelled_error(
460 &request_id,
461 HitlRequestKind::Question,
462 &wait_id,
463 &waitpoint_ids,
464 reason,
465 )),
466 }
467}
468
469async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
470 let action = required_string_arg(args, 0, "request_approval")?;
471 let options = parse_approval_options(args.get(1), "request_approval")?;
472 let keys = current_dispatch_keys();
473 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
474 let trace_id = keys
475 .as_ref()
476 .map(|keys| keys.trace_id.clone())
477 .unwrap_or_else(new_trace_id);
478 let agent = keys
479 .as_ref()
480 .map(|keys| keys.agent.clone())
481 .unwrap_or_default();
482 let requested_at_time = OffsetDateTime::now_utc();
483 let requested_at = format_rfc3339(requested_at_time);
484 let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
485 let approval_args = options
486 .args
487 .as_ref()
488 .or(options.detail.as_ref())
489 .map(crate::llm::vm_value_to_json)
490 .unwrap_or(JsonValue::Null);
491 let mut approval_request = ApprovalRequest::new(
492 request_id.clone(),
493 action.clone(),
494 approval_args,
495 principal,
496 requested_at.clone(),
497 );
498 approval_request.deadline = deadline_after(requested_at_time, options.deadline);
499 approval_request.approvers_required = options.quorum;
500 approval_request.evidence_refs = options.evidence_refs.clone();
501 approval_request.undo_metadata = options
502 .undo_metadata
503 .clone()
504 .or_else(|| {
505 crate::orchestration::current_mutation_session()
506 .and_then(|session| serde_json::to_value(session).ok())
507 })
508 .unwrap_or(JsonValue::Null);
509 approval_request.capabilities_requested = options.capabilities_requested.clone();
510 let approval_request_json = serde_json::to_value(&approval_request)
511 .map_err(|error| VmError::Runtime(error.to_string()))?;
512 let log = ensure_hitl_event_log();
513 let request = HitlRequestEnvelope {
514 request_id: request_id.clone(),
515 kind: HitlRequestKind::Approval,
516 agent,
517 trace_id: trace_id.clone(),
518 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
519 requested_at: requested_at.clone(),
520 payload: json!({
521 "approval_request": approval_request_json,
522 "id": approval_request.id,
523 "action": action,
524 "args": approval_request.args,
525 "principal": approval_request.principal,
526 "requested_at": requested_at,
527 "deadline": approval_request.deadline,
528 "approvers_required": approval_request.approvers_required,
529 "evidence_refs": approval_request.evidence_refs,
530 "undo_metadata": approval_request.undo_metadata,
531 "capabilities_requested": approval_request.capabilities_requested,
532 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
533 "quorum": options.quorum,
534 "reviewers": options.reviewers,
535 "deadline_ms": options.deadline.as_millis() as u64,
536 }),
537 };
538 create_request_waitpoint(&log, &request).await?;
539 append_request(&log, &request).await?;
540 maybe_notify_host(&request);
541 emit_hitl_requested(&request);
542 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
543
544 match wait_for_request_waitpoint_with_events(
545 &request_id,
546 HitlRequestKind::Approval,
547 Some(options.deadline),
548 )
549 .await?
550 {
551 WaitpointOutcome::Completed(record) => {
552 approval_record_from_waitpoint(&record, "request_approval")
553 }
554 WaitpointOutcome::Timeout => {
555 append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
556 Err(timeout_error(&request_id, HitlRequestKind::Approval))
557 }
558 WaitpointOutcome::Cancelled { .. } => {
559 Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
560 }
561 }
562}
563
564pub(crate) async fn request_approval_for_side_effect(
565 action: &str,
566 detail: JsonValue,
567 principal: String,
568 reviewers: Vec<String>,
569 capabilities_requested: Vec<String>,
570) -> Result<VmValue, VmError> {
571 let mut options = BTreeMap::new();
572 options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
573 options.insert(
574 "detail".to_string(),
575 crate::stdlib::json_to_vm_value(&detail),
576 );
577 options.insert(
578 "principal".to_string(),
579 VmValue::String(Rc::from(principal)),
580 );
581 options.insert(
582 "reviewers".to_string(),
583 VmValue::List(Rc::new(
584 reviewers
585 .into_iter()
586 .map(|reviewer| VmValue::String(Rc::from(reviewer)))
587 .collect(),
588 )),
589 );
590 options.insert(
591 "capabilities_requested".to_string(),
592 VmValue::List(Rc::new(
593 capabilities_requested
594 .into_iter()
595 .map(|capability| VmValue::String(Rc::from(capability)))
596 .collect(),
597 )),
598 );
599 let args = vec![
600 VmValue::String(Rc::from(action.to_string())),
601 VmValue::Dict(Rc::new(options)),
602 ];
603 request_approval_impl(&args).await
604}
605
606async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
607 let n = required_positive_int_arg(args, 0, "dual_control")?;
608 let m = required_positive_int_arg(args, 1, "dual_control")?;
609 if n > m {
610 return Err(VmError::Runtime(
611 "dual_control: n must be less than or equal to m".to_string(),
612 ));
613 }
614 let action = args
615 .get(2)
616 .and_then(|value| match value {
617 VmValue::Closure(closure) => Some(closure.clone()),
618 _ => None,
619 })
620 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
621 let approvers = optional_string_list(args.get(3), "dual_control")?;
622 if !approvers.is_empty() && approvers.len() < m as usize {
623 return Err(VmError::Runtime(format!(
624 "dual_control: expected at least {m} approvers, got {}",
625 approvers.len()
626 )));
627 }
628
629 let keys = current_dispatch_keys();
630 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
631 let trace_id = keys
632 .as_ref()
633 .map(|keys| keys.trace_id.clone())
634 .unwrap_or_else(new_trace_id);
635 let action_name = if action.func.name.is_empty() {
636 "anonymous".to_string()
637 } else {
638 action.func.name.clone()
639 };
640 let agent = keys
641 .as_ref()
642 .map(|keys| keys.agent.clone())
643 .unwrap_or_default();
644 let requested_at_time = OffsetDateTime::now_utc();
645 let requested_at = format_rfc3339(requested_at_time);
646 let mut approval_request = ApprovalRequest::new(
647 request_id.clone(),
648 action_name.clone(),
649 json!({
650 "n": n,
651 "m": m,
652 "approvers": approvers.clone(),
653 }),
654 agent.clone(),
655 requested_at.clone(),
656 );
657 approval_request.deadline = deadline_after(
658 requested_at_time,
659 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
660 );
661 approval_request.approvers_required = n as u32;
662 approval_request.undo_metadata = crate::orchestration::current_mutation_session()
663 .and_then(|session| serde_json::to_value(session).ok())
664 .unwrap_or(JsonValue::Null);
665 let approval_request_json = serde_json::to_value(&approval_request)
666 .map_err(|error| VmError::Runtime(error.to_string()))?;
667 let log = ensure_hitl_event_log();
668 let request = HitlRequestEnvelope {
669 request_id: request_id.clone(),
670 kind: HitlRequestKind::DualControl,
671 agent,
672 trace_id: trace_id.clone(),
673 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
674 requested_at: requested_at.clone(),
675 payload: json!({
676 "approval_request": approval_request_json,
677 "id": approval_request.id,
678 "args": approval_request.args,
679 "principal": approval_request.principal,
680 "requested_at": requested_at,
681 "deadline": approval_request.deadline,
682 "approvers_required": approval_request.approvers_required,
683 "evidence_refs": approval_request.evidence_refs,
684 "undo_metadata": approval_request.undo_metadata,
685 "capabilities_requested": approval_request.capabilities_requested,
686 "n": n,
687 "m": m,
688 "action": action_name,
689 "approvers": approvers,
690 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
691 }),
692 };
693 create_request_waitpoint(&log, &request).await?;
694 append_request(&log, &request).await?;
695 maybe_notify_host(&request);
696 emit_hitl_requested(&request);
697 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
698
699 match wait_for_request_waitpoint_with_events(
700 &request_id,
701 HitlRequestKind::DualControl,
702 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
703 )
704 .await?
705 {
706 WaitpointOutcome::Completed(record) => {
707 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
708 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
709 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
710 })?;
711 let result = vm.call_closure_pub(&action, &[]).await?;
712
713 append_named_event(
714 &log,
715 HitlRequestKind::DualControl,
716 "hitl.dual_control_executed",
717 &request_id,
718 &trace_id,
719 json!({
720 "request_id": request_id,
721 "result": crate::llm::vm_value_to_json(&result),
722 }),
723 )
724 .await?;
725
726 Ok(result)
727 }
728 WaitpointOutcome::Timeout => {
729 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
730 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
731 }
732 WaitpointOutcome::Cancelled { .. } => {
733 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
734 }
735 }
736}
737
738async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
739 let role = required_string_arg(args, 0, "escalate_to")?;
740 let reason = required_string_arg(args, 1, "escalate_to")?;
741 let keys = current_dispatch_keys();
742 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
743 let trace_id = keys
744 .as_ref()
745 .map(|keys| keys.trace_id.clone())
746 .unwrap_or_else(new_trace_id);
747 let log = ensure_hitl_event_log();
748 let request = HitlRequestEnvelope {
749 request_id: request_id.clone(),
750 kind: HitlRequestKind::Escalation,
751 agent: keys
752 .as_ref()
753 .map(|keys| keys.agent.clone())
754 .unwrap_or_default(),
755 trace_id: trace_id.clone(),
756 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
757 requested_at: now_rfc3339(),
758 payload: json!({
759 "role": role,
760 "reason": reason,
761 "capability_policy": escalation_capability_policy(),
762 }),
763 };
764 create_request_waitpoint(&log, &request).await?;
765 append_request(&log, &request).await?;
766 maybe_notify_host(&request);
767 emit_hitl_requested(&request);
768 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
769
770 match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
771 .await?
772 {
773 WaitpointOutcome::Completed(record) => {
774 let accepted_at = record.completed_at.clone();
775 let reviewer = record.completed_by.clone();
776 let accepted = record
777 .value
778 .as_ref()
779 .and_then(|value| value.get("accepted"))
780 .and_then(JsonValue::as_bool)
781 .unwrap_or(true);
782 Ok(crate::stdlib::json_to_vm_value(&json!({
783 "request_id": request_id,
784 "role": role,
785 "reason": reason,
786 "trace_id": trace_id,
787 "status": if accepted { "accepted" } else { "pending" },
788 "accepted_at": accepted_at,
789 "reviewer": reviewer,
790 })))
791 }
792 WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
793 WaitpointOutcome::Cancelled {
794 wait_id,
795 waitpoint_ids,
796 reason,
797 } => Err(hitl_cancelled_error(
798 &request_id,
799 HitlRequestKind::Escalation,
800 &wait_id,
801 &waitpoint_ids,
802 reason,
803 )),
804 }
805}
806
807async fn create_request_waitpoint(
808 log: &Arc<AnyEventLog>,
809 request: &HitlRequestEnvelope,
810) -> Result<(), VmError> {
811 create_waitpoint_on(
812 log,
813 Some(request.request_id.clone()),
814 Some(json!({
815 "kind": request.kind.as_str(),
816 "agent": request.agent.clone(),
817 "trace_id": request.trace_id.clone(),
818 "requested_at": request.requested_at.clone(),
819 "payload": request.payload.clone(),
820 })),
821 )
822 .await?;
823 Ok(())
824}
825
826async fn wait_for_request_waitpoint(
827 request_id: &str,
828 timeout: Option<StdDuration>,
829) -> Result<WaitpointOutcome, VmError> {
830 match wait_on_waitpoints(
831 vec![request_id.to_string()],
832 WaitpointWaitOptions { timeout },
833 )
834 .await
835 {
836 Ok(records) => Ok(WaitpointOutcome::Completed(
837 records
838 .into_iter()
839 .next()
840 .expect("single waitpoint wait result"),
841 )),
842 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
843 Err(WaitpointWaitFailure::Cancelled {
844 wait_id,
845 waitpoint_ids,
846 reason,
847 }) => Ok(WaitpointOutcome::Cancelled {
848 wait_id,
849 waitpoint_ids,
850 reason,
851 }),
852 Err(WaitpointWaitFailure::Vm(error)) => {
853 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
854 return Ok(outcome);
855 }
856 Err(error)
857 }
858 }
859}
860
861fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
862 let VmError::Thrown(VmValue::Dict(dict)) = error else {
863 return None;
864 };
865 let name = dict.get("name").and_then(vm_string)?;
866 match name {
867 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
868 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
869 wait_id: dict
870 .get("wait_id")
871 .and_then(vm_string)
872 .unwrap_or_default()
873 .to_string(),
874 waitpoint_ids: dict
875 .get("waitpoint_ids")
876 .and_then(vm_string_list)
877 .unwrap_or_default(),
878 reason: dict
879 .get("reason")
880 .and_then(vm_string)
881 .map(ToString::to_string),
882 }),
883 _ => None,
884 }
885}
886
887async fn finalize_hitl_response(
888 log: &Arc<AnyEventLog>,
889 kind: HitlRequestKind,
890 response: &HitlHostResponse,
891) -> Result<(), String> {
892 match kind {
893 HitlRequestKind::Question => {
894 if waitpoint_is_terminal(log, &response.request_id).await? {
895 return Ok(());
896 }
897 complete_waitpoint_on(
898 log,
899 &response.request_id,
900 response.answer.clone(),
901 response.reviewer.clone(),
902 response.reason.clone(),
903 response.metadata.clone(),
904 )
905 .await
906 .map(|_| ())
907 .map_err(|error| error.to_string())
908 }
909 HitlRequestKind::Escalation => {
910 if !response.accepted.unwrap_or(false)
911 || waitpoint_is_terminal(log, &response.request_id).await?
912 {
913 return Ok(());
914 }
915 complete_waitpoint_on(
916 log,
917 &response.request_id,
918 Some(json!({
919 "accepted": true,
920 "reviewer": response.reviewer,
921 "reason": response.reason,
922 "responded_at": response.responded_at,
923 })),
924 response.reviewer.clone(),
925 response.reason.clone(),
926 response.metadata.clone(),
927 )
928 .await
929 .map(|_| ())
930 .map_err(|error| error.to_string())
931 }
932 HitlRequestKind::Approval | HitlRequestKind::DualControl => {
933 if waitpoint_is_terminal(log, &response.request_id).await? {
934 return Ok(());
935 }
936 let request = load_request_envelope(log, kind, &response.request_id)
937 .await
938 .map_err(|error| error.to_string())?;
939 match resolve_approval_state(log, kind, &request)
940 .await
941 .map_err(|error| error.to_string())?
942 {
943 ApprovalResolution::Pending => Ok(()),
944 ApprovalResolution::Approved(progress) => {
945 let record = approval_record_json(&progress);
946 append_named_event(
947 log,
948 kind,
949 approved_event_kind(kind),
950 &request.request_id,
951 &request.trace_id,
952 json!({
953 "request_id": request.request_id.clone(),
954 "record": record.clone(),
955 }),
956 )
957 .await
958 .map_err(|error| error.to_string())?;
959 complete_waitpoint_on(
960 log,
961 &request.request_id,
962 Some(record),
963 response.reviewer.clone(),
964 progress.reason.clone(),
965 response.metadata.clone(),
966 )
967 .await
968 .map(|_| ())
969 .map_err(|error| error.to_string())
970 }
971 ApprovalResolution::Denied(denied) => {
972 append_named_event(
973 log,
974 kind,
975 denied_event_kind(kind),
976 &request.request_id,
977 &request.trace_id,
978 json!({
979 "request_id": request.request_id.clone(),
980 "reviewer": denied.reviewer.clone(),
981 "reason": denied.reason.clone(),
982 }),
983 )
984 .await
985 .map_err(|error| error.to_string())?;
986 cancel_waitpoint_on(
987 log,
988 &request.request_id,
989 denied.reviewer.clone(),
990 denied.reason.clone(),
991 denied.metadata.clone(),
992 )
993 .await
994 .map(|_| ())
995 .map_err(|error| error.to_string())
996 }
997 }
998 }
999 }
1000}
1001
1002async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1003 Ok(inspect_waitpoint_on(log, request_id)
1004 .await
1005 .map_err(|error| error.to_string())?
1006 .is_some_and(|record| record.status != WaitpointStatus::Open))
1007}
1008
1009async fn load_request_envelope(
1010 log: &Arc<AnyEventLog>,
1011 kind: HitlRequestKind,
1012 request_id: &str,
1013) -> Result<HitlRequestEnvelope, VmError> {
1014 let topic = topic(kind)?;
1015 let events = log
1016 .read_range(&topic, None, usize::MAX)
1017 .await
1018 .map_err(log_error)?;
1019 events
1020 .into_iter()
1021 .filter(|(_, event)| event.kind == kind.request_event_kind())
1022 .find_map(|(_, event)| {
1023 if !event_matches_request(&event, request_id) {
1024 return None;
1025 }
1026 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1027 })
1028 .ok_or_else(|| {
1029 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1030 })
1031}
1032
1033async fn resolve_approval_state(
1034 log: &Arc<AnyEventLog>,
1035 kind: HitlRequestKind,
1036 request: &HitlRequestEnvelope,
1037) -> Result<ApprovalResolution, VmError> {
1038 let quorum = approval_quorum_from_request(kind, request)?;
1039 let allowed_reviewers = approval_reviewers_from_request(kind, request)
1040 .into_iter()
1041 .collect::<BTreeSet<_>>();
1042 let mut progress = ApprovalProgress {
1043 request_id: request.request_id.clone(),
1044 reviewers: BTreeSet::new(),
1045 signatures: Vec::new(),
1046 reason: None,
1047 approved_at: None,
1048 };
1049 let topic = topic(kind)?;
1050 let events = log
1051 .read_range(&topic, None, usize::MAX)
1052 .await
1053 .map_err(log_error)?;
1054 for (_, event) in events {
1055 if !event_matches_request(&event, &request.request_id)
1056 || event.kind != "hitl.response_received"
1057 {
1058 continue;
1059 }
1060 let response: HitlHostResponse = serde_json::from_value(event.payload)
1061 .map_err(|error| VmError::Runtime(error.to_string()))?;
1062 if let Some(reviewer) = response.reviewer.as_deref() {
1063 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
1064 continue;
1065 }
1066 if progress.reviewers.contains(reviewer) {
1067 continue;
1068 }
1069 }
1070 if response.approved.unwrap_or(false) {
1071 if let Some(reviewer) = response.reviewer.clone() {
1072 let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1073 progress.reviewers.insert(reviewer.clone());
1074 progress.signatures.push(ApprovalSignature {
1075 reviewer: reviewer.clone(),
1076 signed_at: signed_at.clone(),
1077 signature: response.signature.clone().unwrap_or_else(|| {
1078 approval_receipt_signature(
1079 &request.request_id,
1080 &reviewer,
1081 &signed_at,
1082 true,
1083 response.reason.as_deref(),
1084 )
1085 }),
1086 });
1087 }
1088 progress.reason = response.reason.clone();
1089 progress.approved_at = response.responded_at.clone();
1090 if progress.reviewers.len() as u32 >= quorum {
1091 return Ok(ApprovalResolution::Approved(progress));
1092 }
1093 continue;
1094 }
1095 return Ok(ApprovalResolution::Denied(response));
1096 }
1097 Ok(ApprovalResolution::Pending)
1098}
1099
1100fn approval_quorum_from_request(
1101 kind: HitlRequestKind,
1102 request: &HitlRequestEnvelope,
1103) -> Result<u32, VmError> {
1104 let key = match kind {
1105 HitlRequestKind::DualControl => "n",
1106 _ => "quorum",
1107 };
1108 let quorum = request
1109 .payload
1110 .get(key)
1111 .or_else(|| request.payload.get("approvers_required"))
1112 .or_else(|| {
1113 request
1114 .payload
1115 .get("approval_request")
1116 .and_then(|approval| approval.get("approvers_required"))
1117 })
1118 .and_then(JsonValue::as_u64)
1119 .unwrap_or(1);
1120 u32::try_from(quorum).map_err(|_| {
1121 VmError::Runtime(format!(
1122 "invalid quorum in HITL request '{}'",
1123 request.request_id
1124 ))
1125 })
1126}
1127
1128fn approval_reviewers_from_request(
1129 kind: HitlRequestKind,
1130 request: &HitlRequestEnvelope,
1131) -> Vec<String> {
1132 let key = match kind {
1133 HitlRequestKind::DualControl => "approvers",
1134 _ => "reviewers",
1135 };
1136 request
1137 .payload
1138 .get(key)
1139 .or_else(|| {
1140 request
1141 .payload
1142 .get("approval_request")
1143 .and_then(|approval| approval.get("reviewers"))
1144 })
1145 .and_then(JsonValue::as_array)
1146 .map(|values| {
1147 values
1148 .iter()
1149 .filter_map(JsonValue::as_str)
1150 .map(str::to_string)
1151 .collect()
1152 })
1153 .unwrap_or_default()
1154}
1155
1156fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1157 json!({
1158 "request_id": progress.request_id.clone(),
1159 "approved": true,
1160 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1161 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1162 "reason": progress.reason,
1163 "signatures": progress.signatures,
1164 })
1165}
1166
1167fn approval_receipt_signature(
1168 request_id: &str,
1169 reviewer: &str,
1170 signed_at: &str,
1171 approved: bool,
1172 reason: Option<&str>,
1173) -> String {
1174 let material = format!(
1175 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1176 reason.unwrap_or("")
1177 );
1178 let hash = sha2::Sha256::digest(material.as_bytes());
1179 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1180 format!("sha256:{hex}")
1181}
1182
1183fn approval_record_from_waitpoint(
1184 record: &WaitpointRecord,
1185 builtin: &str,
1186) -> Result<VmValue, VmError> {
1187 record
1188 .value
1189 .as_ref()
1190 .map(crate::stdlib::json_to_vm_value)
1191 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1192}
1193
1194async fn approval_wait_error(
1195 log: &Arc<AnyEventLog>,
1196 kind: HitlRequestKind,
1197 request_id: &str,
1198) -> VmError {
1199 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1200 if record.status == WaitpointStatus::Cancelled
1201 && record.reason.as_deref() != Some("upstream_cancelled")
1202 {
1203 return approval_denied_error(
1204 request_id,
1205 HitlHostResponse {
1206 request_id: request_id.to_string(),
1207 answer: None,
1208 approved: Some(false),
1209 accepted: None,
1210 reviewer: record.cancelled_by.clone(),
1211 reason: record.reason.clone(),
1212 metadata: record.metadata.clone(),
1213 responded_at: record.cancelled_at.clone(),
1214 signature: None,
1215 },
1216 );
1217 }
1218 if record.status == WaitpointStatus::Cancelled {
1219 return hitl_cancelled_error(
1220 request_id,
1221 kind,
1222 "",
1223 &[request_id.to_string()],
1224 record.reason.clone(),
1225 );
1226 }
1227 }
1228 hitl_cancelled_error(
1229 request_id,
1230 kind,
1231 "",
1232 &[request_id.to_string()],
1233 Some("upstream_cancelled".to_string()),
1234 )
1235}
1236
1237async fn append_timeout_once(
1238 log: &Arc<AnyEventLog>,
1239 kind: HitlRequestKind,
1240 request_id: &str,
1241 trace_id: &str,
1242) -> Result<(), VmError> {
1243 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1244 return Ok(());
1245 }
1246 append_timeout(log, kind, request_id, trace_id).await
1247}
1248
1249async fn hitl_event_exists(
1250 log: &Arc<AnyEventLog>,
1251 kind: HitlRequestKind,
1252 request_id: &str,
1253 event_kind: &str,
1254) -> Result<bool, VmError> {
1255 let topic = topic(kind)?;
1256 let events = log
1257 .read_range(&topic, None, usize::MAX)
1258 .await
1259 .map_err(log_error)?;
1260 Ok(events
1261 .into_iter()
1262 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1263}
1264
1265fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1266 match kind {
1267 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1268 _ => "hitl.approval_approved",
1269 }
1270}
1271
1272fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1273 match kind {
1274 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1275 _ => "hitl.approval_denied",
1276 }
1277}
1278
1279async fn append_request(
1280 log: &Arc<AnyEventLog>,
1281 request: &HitlRequestEnvelope,
1282) -> Result<(), VmError> {
1283 let topic = topic(request.kind)?;
1284 log.append(
1285 &topic,
1286 LogEvent::new(
1287 request.kind.request_event_kind(),
1288 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1289 )
1290 .with_headers(request_headers(request)),
1291 )
1292 .await
1293 .map(|_| ())
1294 .map_err(log_error)
1295}
1296
1297async fn append_named_event(
1298 log: &Arc<AnyEventLog>,
1299 kind: HitlRequestKind,
1300 event_kind: &str,
1301 request_id: &str,
1302 trace_id: &str,
1303 payload: JsonValue,
1304) -> Result<(), VmError> {
1305 let topic = topic(kind)?;
1306 let headers = headers_with_trace(request_id, trace_id);
1307 log.append(
1308 &topic,
1309 LogEvent::new(event_kind, payload).with_headers(headers),
1310 )
1311 .await
1312 .map(|_| ())
1313 .map_err(log_error)
1314}
1315
1316async fn append_timeout(
1317 log: &Arc<AnyEventLog>,
1318 kind: HitlRequestKind,
1319 request_id: &str,
1320 trace_id: &str,
1321) -> Result<(), VmError> {
1322 append_named_event(
1323 log,
1324 kind,
1325 "hitl.timeout",
1326 request_id,
1327 trace_id,
1328 serde_json::to_value(HitlTimeoutRecord {
1329 request_id: request_id.to_string(),
1330 kind,
1331 trace_id: trace_id.to_string(),
1332 timed_out_at: now_rfc3339(),
1333 })
1334 .map_err(|error| VmError::Runtime(error.to_string()))?,
1335 )
1336 .await
1337}
1338
1339async fn maybe_apply_mock_response(
1340 kind: HitlRequestKind,
1341 request_id: &str,
1342 request_payload: &JsonValue,
1343) -> Result<(), VmError> {
1344 let mut params = request_payload
1345 .as_object()
1346 .cloned()
1347 .unwrap_or_default()
1348 .into_iter()
1349 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1350 .collect::<BTreeMap<_, _>>();
1351 params.insert(
1352 "request_id".to_string(),
1353 VmValue::String(Rc::from(request_id.to_string())),
1354 );
1355 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1356 return Ok(());
1357 };
1358 let value = result?;
1359 let responses = match value {
1360 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1361 other => vec![other],
1362 };
1363 for response in responses {
1364 let response_dict = response.as_dict().ok_or_else(|| {
1365 VmError::Runtime(format!(
1366 "mocked HITL {} response must be a dict or list<dict>",
1367 kind.as_str()
1368 ))
1369 })?;
1370 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1371 append_hitl_response(None, hitl_response)
1372 .await
1373 .map_err(VmError::Runtime)?;
1374 }
1375 Ok(())
1376}
1377
1378fn parse_hitl_response_dict(
1379 request_id: &str,
1380 response_dict: &BTreeMap<String, VmValue>,
1381) -> Result<HitlHostResponse, VmError> {
1382 Ok(HitlHostResponse {
1383 request_id: request_id.to_string(),
1384 answer: response_dict
1385 .get("answer")
1386 .map(crate::llm::vm_value_to_json),
1387 approved: response_dict.get("approved").and_then(vm_bool),
1388 accepted: response_dict.get("accepted").and_then(vm_bool),
1389 reviewer: response_dict.get("reviewer").map(VmValue::display),
1390 reason: response_dict.get("reason").map(VmValue::display),
1391 metadata: response_dict
1392 .get("metadata")
1393 .map(crate::llm::vm_value_to_json),
1394 responded_at: response_dict.get("responded_at").map(VmValue::display),
1395 signature: response_dict.get("signature").map(VmValue::display),
1396 })
1397}
1398
1399fn maybe_notify_host(request: &HitlRequestEnvelope) {
1400 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1401 return;
1402 };
1403 bridge.notify(
1404 "harn.hitl.requested",
1405 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1406 );
1407}
1408
1409fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1416 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1417 return;
1418 };
1419 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1420 session_id,
1421 request_id: request.request_id.clone(),
1422 kind: request.kind.as_str().to_string(),
1423 payload: request.payload.clone(),
1424 });
1425}
1426
1427fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1432 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1433 return;
1434 };
1435 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1436 session_id,
1437 request_id: request_id.to_string(),
1438 kind: kind.as_str().to_string(),
1439 outcome: outcome.to_string(),
1440 });
1441}
1442
1443async fn wait_for_request_waitpoint_with_events(
1450 request_id: &str,
1451 kind: HitlRequestKind,
1452 timeout: Option<StdDuration>,
1453) -> Result<WaitpointOutcome, VmError> {
1454 let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1455 let label = match &outcome {
1456 Ok(WaitpointOutcome::Completed(_)) => "answered",
1457 Ok(WaitpointOutcome::Timeout) => "timeout",
1458 Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1459 Err(_) => "error",
1460 };
1461 emit_hitl_resolved(request_id, kind, label);
1462 outcome
1463}
1464
1465fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1466 let Some(value) = value else {
1467 return Ok(AskUserOptions {
1468 schema: None,
1469 timeout: Some(default_question_timeout()),
1470 default: None,
1471 });
1472 };
1473 let dict = value
1474 .as_dict()
1475 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1476 Ok(AskUserOptions {
1477 schema: dict
1478 .get("schema")
1479 .cloned()
1480 .filter(|value| !matches!(value, VmValue::Nil)),
1481 timeout: dict
1482 .get("timeout")
1483 .map(parse_duration_value)
1484 .transpose()?
1485 .or_else(|| Some(default_question_timeout())),
1486 default: dict
1487 .get("default")
1488 .cloned()
1489 .filter(|value| !matches!(value, VmValue::Nil)),
1490 })
1491}
1492
1493fn default_question_timeout() -> StdDuration {
1494 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1495}
1496
1497fn escalation_capability_policy() -> JsonValue {
1498 crate::orchestration::current_execution_policy()
1499 .and_then(|policy| serde_json::to_value(policy).ok())
1500 .unwrap_or(JsonValue::Null)
1501}
1502
1503fn parse_approval_options(
1504 value: Option<&VmValue>,
1505 builtin: &str,
1506) -> Result<ApprovalOptions, VmError> {
1507 let dict = match value {
1508 None => None,
1509 Some(VmValue::Dict(dict)) => Some(dict),
1510 Some(_) => {
1511 return Err(VmError::Runtime(format!(
1512 "{builtin}: options must be a dict"
1513 )))
1514 }
1515 };
1516 let quorum = dict
1517 .and_then(|dict| dict.get("quorum"))
1518 .and_then(VmValue::as_int)
1519 .unwrap_or(1);
1520 if quorum <= 0 {
1521 return Err(VmError::Runtime(format!(
1522 "{builtin}: quorum must be positive"
1523 )));
1524 }
1525 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1526 let capabilities_requested = optional_string_list(
1527 dict.and_then(|dict| dict.get("capabilities_requested")),
1528 builtin,
1529 )?;
1530 let evidence_refs = dict
1531 .and_then(|dict| dict.get("evidence_refs"))
1532 .map(|value| match value {
1533 VmValue::List(items) => Ok(items
1534 .iter()
1535 .map(crate::llm::vm_value_to_json)
1536 .collect::<Vec<_>>()),
1537 _ => Err(VmError::Runtime(format!(
1538 "{builtin}: evidence_refs must be a list"
1539 ))),
1540 })
1541 .transpose()?
1542 .unwrap_or_default();
1543 let deadline = dict
1544 .and_then(|dict| dict.get("deadline"))
1545 .map(parse_duration_value)
1546 .transpose()?
1547 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1548 Ok(ApprovalOptions {
1549 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1550 args: dict.and_then(|dict| dict.get("args")).cloned(),
1551 quorum: quorum as u32,
1552 reviewers,
1553 deadline,
1554 principal: dict
1555 .and_then(|dict| dict.get("principal"))
1556 .map(VmValue::display)
1557 .filter(|value| !value.is_empty()),
1558 evidence_refs,
1559 undo_metadata: dict
1560 .and_then(|dict| dict.get("undo_metadata"))
1561 .map(crate::llm::vm_value_to_json),
1562 capabilities_requested,
1563 })
1564}
1565
1566fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1567 args.get(idx)
1568 .map(VmValue::display)
1569 .filter(|value| !value.is_empty())
1570 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1571}
1572
1573fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1574 let value = args
1575 .get(idx)
1576 .and_then(VmValue::as_int)
1577 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1578 if value <= 0 {
1579 return Err(VmError::Runtime(format!(
1580 "{builtin}: expected a positive int at {idx}"
1581 )));
1582 }
1583 Ok(value)
1584}
1585
1586fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1587 let Some(value) = value else {
1588 return Ok(Vec::new());
1589 };
1590 match value {
1591 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1592 _ => Err(VmError::Runtime(format!(
1593 "{builtin}: expected list<string>"
1594 ))),
1595 }
1596}
1597
1598fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1599 match value {
1600 VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1601 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1602 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1603 _ => Err(VmError::Runtime(
1604 "expected a duration or millisecond count".to_string(),
1605 )),
1606 }
1607}
1608
1609fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1610 active_event_log()
1611 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1612}
1613
1614fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1615 if let Some(log) = active_event_log() {
1616 return Ok(log);
1617 }
1618 let Some(base_dir) = base_dir else {
1619 return Ok(install_memory_for_current_thread(
1620 HITL_EVENT_LOG_QUEUE_DEPTH,
1621 ));
1622 };
1623 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1624}
1625
1626fn current_dispatch_keys() -> Option<DispatchKeys> {
1627 let context = current_dispatch_context()?;
1628 let stable_base = context
1629 .replay_of_event_id
1630 .clone()
1631 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1632 let instance_key = format!(
1633 "{}::{}",
1634 context.trigger_event.id.0,
1635 context.replay_of_event_id.as_deref().unwrap_or("live")
1636 );
1637 Some(DispatchKeys {
1638 instance_key,
1639 stable_base,
1640 agent: context.agent_id,
1641 trace_id: context.trigger_event.trace_id.0,
1642 })
1643}
1644
1645fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1646 if let Some(keys) = dispatch_keys {
1647 let seq = REQUEST_SEQUENCE.with(|slot| {
1648 let mut state = slot.borrow_mut();
1649 if state.instance_key != keys.instance_key {
1650 state.instance_key = keys.instance_key.clone();
1651 state.next_seq = 0;
1652 }
1653 state.next_seq += 1;
1654 state.next_seq
1655 });
1656 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1657 }
1658 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1659}
1660
1661fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1662 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1663 if let Some(run_id) = request.run_id.as_ref() {
1664 headers.insert("run_id".to_string(), run_id.clone());
1665 }
1666 headers
1667}
1668
/// Builds event headers containing only the `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1674
1675fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1676 let mut headers = response_headers(request_id);
1677 headers.insert("trace_id".to_string(), trace_id.to_string());
1678 headers
1679}
1680
1681fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1682 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1683}
1684
1685fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1686 event
1687 .headers
1688 .get("request_id")
1689 .is_some_and(|value| value == request_id)
1690 || event
1691 .payload
1692 .get("request_id")
1693 .and_then(JsonValue::as_str)
1694 .is_some_and(|value| value == request_id)
1695}
1696
1697fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1698 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1699 "name": "ApprovalDeniedError",
1700 "category": "generic",
1701 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1702 "request_id": request_id,
1703 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1704 "reason": response.reason,
1705 })))
1706}
1707
1708fn hitl_cancelled_error(
1709 request_id: &str,
1710 kind: HitlRequestKind,
1711 wait_id: &str,
1712 waitpoint_ids: &[String],
1713 reason: Option<String>,
1714) -> VmError {
1715 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1716 let message = reason
1717 .clone()
1718 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1719 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1720 "name": "HumanCancelledError",
1721 "category": ErrorCategory::Cancelled.as_str(),
1722 "message": message,
1723 "request_id": request_id,
1724 "kind": kind.as_str(),
1725 "wait_id": wait_id,
1726 "waitpoint_ids": waitpoint_ids,
1727 "reason": reason,
1728 })))
1729}
1730
1731fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1732 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1733 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1734 "name": "HumanTimeoutError",
1735 "category": ErrorCategory::Timeout.as_str(),
1736 "message": format!("{} timed out", kind.as_str()),
1737 "request_id": request_id,
1738 "kind": kind.as_str(),
1739 })))
1740}
1741
1742fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1743 match default {
1744 VmValue::Int(_) => match value {
1745 VmValue::Int(_) => value.clone(),
1746 VmValue::Float(number) => VmValue::Int(*number as i64),
1747 VmValue::String(text) => text
1748 .parse::<i64>()
1749 .map(VmValue::Int)
1750 .unwrap_or_else(|_| default.clone()),
1751 _ => default.clone(),
1752 },
1753 VmValue::Float(_) => match value {
1754 VmValue::Float(_) => value.clone(),
1755 VmValue::Int(number) => VmValue::Float(*number as f64),
1756 VmValue::String(text) => text
1757 .parse::<f64>()
1758 .map(VmValue::Float)
1759 .unwrap_or_else(|_| default.clone()),
1760 _ => default.clone(),
1761 },
1762 VmValue::Bool(_) => match value {
1763 VmValue::Bool(_) => value.clone(),
1764 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1765 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1766 _ => default.clone(),
1767 },
1768 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1769 VmValue::Duration(_) => match value {
1770 VmValue::Duration(_) => value.clone(),
1771 VmValue::Int(ms) => VmValue::Duration(*ms),
1772 _ => default.clone(),
1773 },
1774 VmValue::Nil => value.clone(),
1775 _ => {
1776 if value.type_name() == default.type_name() {
1777 value.clone()
1778 } else {
1779 default.clone()
1780 }
1781 }
1782 }
1783}
1784
1785fn log_error(error: impl std::fmt::Display) -> VmError {
1786 VmError::Runtime(error.to_string())
1787}
1788
1789fn now_rfc3339() -> String {
1790 format_rfc3339(OffsetDateTime::now_utc())
1791}
1792
1793fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1794 timestamp
1795 .format(&Rfc3339)
1796 .unwrap_or_else(|_| timestamp.to_string())
1797}
1798
1799fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1800 time::Duration::try_from(duration)
1801 .ok()
1802 .map(|duration| format_rfc3339(requested_at + duration))
1803}
1804
1805fn new_trace_id() -> String {
1806 format!("trace_{}", Uuid::now_v7())
1807}
1808
1809fn vm_bool(value: &VmValue) -> Option<bool> {
1810 match value {
1811 VmValue::Bool(flag) => Some(*flag),
1812 _ => None,
1813 }
1814}
1815
1816fn vm_string(value: &VmValue) -> Option<&str> {
1817 match value {
1818 VmValue::String(text) => Some(text.as_ref()),
1819 _ => None,
1820 }
1821}
1822
1823fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1824 match value {
1825 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1826 _ => None,
1827 }
1828}
1829
1830#[cfg(test)]
1831mod tests {
1832 use super::{
1833 HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1834 };
1835 use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1836 use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1837
1838 async fn execute_hitl_script(
1839 base_dir: &std::path::Path,
1840 source: &str,
1841 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1842 reset_thread_local_state();
1843 let log = install_default_for_base_dir(base_dir).expect("install event log");
1844 let chunk = compile_source(source).expect("compile source");
1845 let mut vm = Vm::new();
1846 register_vm_stdlib(&mut vm);
1847 vm.set_source_dir(base_dir);
1848 vm.execute(&chunk).await?;
1849 let output = vm.output().trim_end().to_string();
1850 let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1851 let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1852 let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1853 let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1854 Ok((
1855 output,
1856 question_events,
1857 approval_events,
1858 dual_control_events,
1859 escalation_events,
1860 ))
1861 }
1862
1863 async fn event_kinds(
1864 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1865 topic: &str,
1866 ) -> Vec<String> {
1867 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1868 .await
1869 .expect("read topic")
1870 .into_iter()
1871 .map(|(_, event)| event.kind)
1872 .collect()
1873 }
1874
1875 async fn event_payloads(
1876 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1877 topic: &str,
1878 ) -> Vec<serde_json::Value> {
1879 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1880 .await
1881 .expect("read topic")
1882 .into_iter()
1883 .map(|(_, event)| event.payload)
1884 .collect()
1885 }
1886
1887 #[tokio::test(flavor = "current_thread")]
1888 async fn ask_user_coerces_to_default_type_and_logs_events() {
1889 tokio::task::LocalSet::new()
1890 .run_until(async {
1891 let dir = tempfile::tempdir().expect("tempdir");
1892 let source = r#"
1893pipeline test(task) {
1894 host_mock("hitl", "question", {answer: "9"})
1895 let answer: int = ask_user("Pick a number", {default: 0})
1896 __io_println(answer)
1897}
1898"#;
1899 let (
1900 output,
1901 question_events,
1902 approval_events,
1903 dual_control_events,
1904 escalation_events,
1905 ) = execute_hitl_script(dir.path(), source)
1906 .await
1907 .expect("script succeeds");
1908 assert_eq!(output, "9");
1909 assert_eq!(
1910 question_events,
1911 vec![
1912 "hitl.question_asked".to_string(),
1913 "hitl.response_received".to_string()
1914 ]
1915 );
1916 assert!(approval_events.is_empty());
1917 assert!(dual_control_events.is_empty());
1918 assert!(escalation_events.is_empty());
1919 })
1920 .await;
1921 }
1922
1923 #[tokio::test(flavor = "current_thread")]
1924 async fn request_approval_waits_for_quorum_and_emits_a_record() {
1925 tokio::task::LocalSet::new()
1926 .run_until(async {
1927 let dir = tempfile::tempdir().expect("tempdir");
1928 let source = r#"
1929pipeline test(task) {
1930 host_mock("hitl", "approval", [
1931 {approved: true, reviewer: "alice", reason: "ok"},
1932 {approved: true, reviewer: "bob", reason: "ship it"},
1933 ])
1934 let record = request_approval(
1935 "deploy production",
1936 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1937 )
1938 __io_println(record.approved)
1939 __io_println(len(record.reviewers))
1940 __io_println(record.reviewers[0])
1941 __io_println(record.reviewers[1])
1942}
1943"#;
1944 let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1945 .await
1946 .expect("script succeeds");
1947 assert_eq!(
1948 approval_events,
1949 vec![
1950 "hitl.approval_requested".to_string(),
1951 "hitl.response_received".to_string(),
1952 "hitl.response_received".to_string(),
1953 "hitl.approval_approved".to_string(),
1954 ]
1955 );
1956 })
1957 .await;
1958 }
1959
1960 #[tokio::test(flavor = "current_thread")]
1961 async fn request_approval_emits_canonical_approval_request_payload() {
1962 tokio::task::LocalSet::new()
1963 .run_until(async {
1964 reset_thread_local_state();
1965 let dir = tempfile::tempdir().expect("tempdir");
1966 let log = install_default_for_base_dir(dir.path()).expect("install event log");
1967 let source = r#"
1968pipeline test(task) {
1969 host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
1970 request_approval("deploy production", {
1971 args: {environment: "prod"},
1972 quorum: 1,
1973 reviewers: ["alice"],
1974 evidence_refs: [{kind: "run", uri: "run_123"}],
1975 undo_metadata: {strategy: "rollback"},
1976 capabilities_requested: ["deploy.production"],
1977 })
1978}
1979"#;
1980 let chunk = compile_source(source).expect("compile source");
1981 let mut vm = Vm::new();
1982 register_vm_stdlib(&mut vm);
1983 vm.set_source_dir(dir.path());
1984 vm.execute(&chunk).await.expect("script succeeds");
1985
1986 let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
1987 let request_payload = &payloads[0]["payload"];
1988 let approval_request = &request_payload["approval_request"];
1989 assert_eq!(approval_request["id"], request_payload["id"]);
1990 assert_eq!(approval_request["action"], "deploy production");
1991 assert_eq!(approval_request["args"]["environment"], "prod");
1992 assert_eq!(approval_request["approvers_required"], 1);
1993 assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
1994 assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
1995 assert_eq!(
1996 approval_request["capabilities_requested"][0],
1997 "deploy.production"
1998 );
1999 assert!(approval_request["requested_at"].as_str().is_some());
2000 assert!(approval_request["deadline"].as_str().is_some());
2001 })
2002 .await;
2003 }
2004
2005 #[tokio::test(flavor = "current_thread")]
2006 async fn request_approval_surfaces_denials_as_typed_errors() {
2007 tokio::task::LocalSet::new()
2008 .run_until(async {
2009 let dir = tempfile::tempdir().expect("tempdir");
2010 let source = r#"
2011pipeline test(task) {
2012 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
2013 let denied = try {
2014 request_approval("drop table", {reviewers: ["alice"]})
2015 }
2016 __io_println(is_err(denied))
2017 __io_println(unwrap_err(denied).name)
2018 __io_println(unwrap_err(denied).reason)
2019}
2020"#;
2021 let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2022 .await
2023 .expect("script succeeds");
2024 assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
2025 assert_eq!(
2026 approval_events,
2027 vec![
2028 "hitl.approval_requested".to_string(),
2029 "hitl.response_received".to_string(),
2030 "hitl.approval_denied".to_string(),
2031 ]
2032 );
2033 })
2034 .await;
2035 }
2036
2037 #[tokio::test(flavor = "current_thread")]
2038 async fn dual_control_executes_action_after_quorum() {
2039 tokio::task::LocalSet::new()
2040 .run_until(async {
2041 let dir = tempfile::tempdir().expect("tempdir");
2042 let source = r#"
2043pipeline test(task) {
2044 host_mock("hitl", "dual_control", [
2045 {approved: true, reviewer: "alice"},
2046 {approved: true, reviewer: "bob"},
2047 ])
2048 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2049 __io_println(result)
2050}
2051"#;
2052 let (output, _, _, dual_control_events, _) =
2053 execute_hitl_script(dir.path(), source)
2054 .await
2055 .expect("script succeeds");
2056 assert_eq!(output, "launched");
2057 assert_eq!(
2058 dual_control_events,
2059 vec![
2060 "hitl.dual_control_requested".to_string(),
2061 "hitl.response_received".to_string(),
2062 "hitl.response_received".to_string(),
2063 "hitl.dual_control_approved".to_string(),
2064 "hitl.dual_control_executed".to_string(),
2065 ]
2066 );
2067 })
2068 .await;
2069 }
2070
2071 #[tokio::test(flavor = "current_thread")]
2072 async fn escalate_to_waits_for_acceptance_event() {
2073 tokio::task::LocalSet::new()
2074 .run_until(async {
2075 let dir = tempfile::tempdir().expect("tempdir");
2076 let source = r#"
2077pipeline test(task) {
2078 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2079 let handle = escalate_to("admin", "need override")
2080 __io_println(handle.status)
2081 __io_println(handle.reviewer)
2082}
2083"#;
2084 let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2085 .await
2086 .expect("script succeeds");
2087 assert_eq!(output, "accepted\nlead");
2088 assert_eq!(
2089 escalation_events,
2090 vec![
2091 "hitl.escalation_issued".to_string(),
2092 "hitl.escalation_accepted".to_string(),
2093 ]
2094 );
2095 })
2096 .await;
2097 }
2098
2099 #[tokio::test(flavor = "current_thread")]
2105 async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2106 use std::sync::Mutex as StdMutex;
2107
2108 tokio::task::LocalSet::new()
2109 .run_until(async {
2110 let dir = tempfile::tempdir().expect("tempdir");
2111 let session_id = "hitl-session".to_string();
2112 let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2113 std::sync::Arc::new(StdMutex::new(Vec::new()));
2114
2115 struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2116 impl crate::agent_events::AgentEventSink for CaptureSink {
2117 fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2118 self.0.lock().expect("captured").push(event.clone());
2119 }
2120 }
2121
2122 crate::reset_thread_local_state();
2128 crate::event_log::install_default_for_base_dir(dir.path())
2129 .expect("install event log");
2130
2131 crate::agent_events::reset_all_sinks();
2132 let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2133 std::sync::Arc::new(CaptureSink(captured.clone()));
2134 crate::agent_events::register_sink(session_id.clone(), sink);
2135 crate::agent_sessions::open_or_create(Some(session_id.clone()));
2136 let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2137
2138 let source = r#"
2139pipeline test(task) {
2140 host_mock("hitl", "question", {answer: "ok"})
2141 let answer: string = ask_user("Are you sure?", {default: "no"})
2142 __io_println(answer)
2143}
2144"#;
2145 let chunk = crate::compile_source(source).expect("compile source");
2146 let mut vm = Vm::new();
2147 register_vm_stdlib(&mut vm);
2148 vm.set_source_dir(dir.path());
2149 vm.execute(&chunk).await.expect("script runs");
2150 assert_eq!(vm.output().trim_end(), "ok");
2151
2152 let events = captured.lock().expect("captured");
2153 let mut iter = events.iter().filter(|event| {
2154 matches!(
2155 event,
2156 crate::agent_events::AgentEvent::HitlRequested { .. }
2157 | crate::agent_events::AgentEvent::HitlResolved { .. }
2158 )
2159 });
2160 let requested = iter.next().expect("HitlRequested emitted");
2161 let resolved = iter.next().expect("HitlResolved emitted");
2162 assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2163
2164 let crate::agent_events::AgentEvent::HitlRequested {
2165 session_id: req_session,
2166 request_id: req_id,
2167 kind: req_kind,
2168 payload,
2169 } = requested
2170 else {
2171 panic!("expected HitlRequested, got: {requested:?}");
2172 };
2173 assert_eq!(req_session, &session_id);
2174 assert_eq!(req_kind, "question");
2175 assert!(req_id.starts_with("hitl_question_"));
2176 assert_eq!(payload["prompt"], "Are you sure?");
2177
2178 let crate::agent_events::AgentEvent::HitlResolved {
2179 request_id: res_id,
2180 kind: res_kind,
2181 outcome,
2182 ..
2183 } = resolved
2184 else {
2185 panic!("expected HitlResolved, got: {resolved:?}");
2186 };
2187 assert_eq!(res_id, req_id);
2188 assert_eq!(res_kind, "question");
2189 assert_eq!(outcome, "answered");
2190
2191 drop(_guard);
2192 crate::agent_events::reset_all_sinks();
2193 })
2194 .await;
2195 }
2196}