1use crate::value::VmDictExt;
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::runtime_limits::RuntimeLimits;
20use crate::schema::schema_expect_value;
21use crate::stdlib::host::dispatch_mock_host_call;
22use crate::stdlib::macros::{harn_builtin, BuiltinSignature, Param, VmBuiltinDef, TY_ANY, TY_DICT};
23use crate::stdlib::options::{duration_from_value, ErrorKind};
24use crate::stdlib::waitpoint::{
25 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
26 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
27 WaitpointWaitOptions,
28};
29use crate::triggers::dispatcher::current_dispatch_context;
30use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
31use crate::vm::{AsyncBuiltinCtx, Vm};
32
/// Event-log queue depth, mirrored from the default runtime limits.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Approval timeout in milliseconds (24 hours).
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// Question timeout in milliseconds (24 hours).
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic name used for question requests.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic name used for approval requests.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic name used for dual-control requests.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic name used for escalation requests.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";

thread_local! {
    // Per-thread request sequence state; starts out as the `Default` value and
    // is reset, taken, and restored by the `*_hitl_state` helpers in this file.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
45
/// Request sequencing state stored in the thread-local `REQUEST_SEQUENCE` slot.
///
/// NOTE(review): presumably `next_seq` numbers requests issued under
/// `instance_key`; the code that advances it is outside this view — confirm.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    pub(crate) instance_key: String,
    pub(crate) next_seq: u64,
}
51
/// The kinds of HITL request this module can issue.
///
/// Serialized with serde using snake_case variant names
/// (`question`, `approval`, `dual_control`, `escalation`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Request issued by the `ask_user` builtin.
    Question,
    /// Request issued by the `request_approval` builtin.
    Approval,
    /// Request issued by the `dual_control` builtin.
    DualControl,
    /// Request issued by the `escalate_to` builtin.
    Escalation,
}
60
61impl HitlRequestKind {
62 pub(crate) fn as_str(self) -> &'static str {
63 match self {
64 Self::Question => "question",
65 Self::Approval => "approval",
66 Self::DualControl => "dual_control",
67 Self::Escalation => "escalation",
68 }
69 }
70
71 fn topic(self) -> &'static str {
72 match self {
73 Self::Question => HITL_QUESTIONS_TOPIC,
74 Self::Approval => HITL_APPROVALS_TOPIC,
75 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
76 Self::Escalation => HITL_ESCALATIONS_TOPIC,
77 }
78 }
79
80 fn request_event_kind(self) -> &'static str {
81 match self {
82 Self::Question => "hitl.question_asked",
83 Self::Approval => "hitl.approval_requested",
84 Self::DualControl => "hitl.dual_control_requested",
85 Self::Escalation => "hitl.escalation_issued",
86 }
87 }
88
89 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
90 if request_id.starts_with("hitl_question_") {
91 Some(Self::Question)
92 } else if request_id.starts_with("hitl_approval_") {
93 Some(Self::Approval)
94 } else if request_id.starts_with("hitl_dual_control_") {
95 Some(Self::DualControl)
96 } else if request_id.starts_with("hitl_escalation_") {
97 Some(Self::Escalation)
98 } else {
99 None
100 }
101 }
102}
103
/// A host-supplied response to a HITL request.
///
/// Only `request_id` is mandatory; every optional field is omitted from the
/// serialized form when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix identifies the request kind.
    pub request_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Filled with the current time by `append_hitl_response` when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
124
/// Envelope describing a HITL request, as built by the builtins in this file.
///
/// `agent` defaults to an empty string when missing on deserialization, and
/// `run_id` is omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    request_id: String,
    kind: HitlRequestKind,
    #[serde(default)]
    agent: String,
    trace_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    requested_at: String,
    /// Kind-specific request body (prompt, approval details, role, ...).
    payload: JsonValue,
}
137
/// Serializable record identifying a HITL request that timed out.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    request_id: String,
    kind: HitlRequestKind,
    trace_id: String,
    timed_out_at: String,
}
145
/// Structured description of an action awaiting approval.
///
/// Fields marked `#[serde(default)]` may be absent when deserializing;
/// `deadline` is omitted from the serialized form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalRequest {
    pub id: String,
    pub action: String,
    #[serde(default)]
    pub args: JsonValue,
    pub principal: String,
    pub requested_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required; `ApprovalRequest::new` sets this to 1.
    pub approvers_required: u32,
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    #[serde(default)]
    pub undo_metadata: JsonValue,
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
164
165impl ApprovalRequest {
166 pub fn new(
167 id: impl Into<String>,
168 action: impl Into<String>,
169 args: JsonValue,
170 principal: impl Into<String>,
171 requested_at: impl Into<String>,
172 ) -> Self {
173 Self {
174 id: id.into(),
175 action: action.into(),
176 args,
177 principal: principal.into(),
178 requested_at: requested_at.into(),
179 deadline: None,
180 approvers_required: 1,
181 evidence_refs: Vec::new(),
182 undo_metadata: JsonValue::Null,
183 capabilities_requested: Vec::new(),
184 }
185 }
186}
187
188pub(crate) fn approval_request_for_host_permission(
189 id: impl Into<String>,
190 action: impl Into<String>,
191 args: JsonValue,
192 principal: impl Into<String>,
193 evidence_refs: Vec<JsonValue>,
194 undo_metadata: JsonValue,
195 capabilities_requested: Vec<String>,
196) -> ApprovalRequest {
197 let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
198 request.evidence_refs = evidence_refs;
199 request.undo_metadata = undo_metadata;
200 request.capabilities_requested = capabilities_requested;
201 request
202}
203
/// Identity keys describing the current dispatch, when one is active.
///
/// The builtins in this file read `agent` and `trace_id` from it when
/// building request envelopes.
/// NOTE(review): `instance_key` and `stable_base` are presumably inputs to
/// request-id generation; that code is outside this view — confirm.
#[derive(Clone, Debug)]
struct DispatchKeys {
    instance_key: String,
    stable_base: String,
    agent: String,
    trace_id: String,
}
211
/// Parsed options for the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// When set, the answer is validated against this schema.
    schema: Option<VmValue>,
    /// Maximum time to wait for an answer; `None` passes no timeout to the wait.
    timeout: Option<StdDuration>,
    /// Returned on timeout, and used to coerce the answer when no schema is set.
    default: Option<VmValue>,
}
218
/// Parsed options for the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Reported in the payload as `detail`; also the fallback for `args`.
    detail: Option<VmValue>,
    /// Preferred source of the approval request's `args`.
    args: Option<VmValue>,
    /// Copied into the approval request's `approvers_required`.
    quorum: u32,
    reviewers: Vec<String>,
    /// How long to wait for the approval before timing out.
    deadline: StdDuration,
    /// Overrides the dispatching agent as the request principal when set.
    principal: Option<String>,
    evidence_refs: Vec<JsonValue>,
    /// When `None`, the current mutation session (if any) is serialized instead.
    undo_metadata: Option<JsonValue>,
    capabilities_requested: Vec<String>,
}
231
/// Accumulated approval state for a single request while responses are tallied.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    request_id: String,
    /// Reviewer names collected so far (a set, so each name appears once).
    reviewers: BTreeSet<String>,
    signatures: Vec<ApprovalSignature>,
    reason: Option<String>,
    approved_at: Option<String>,
}
240
/// A reviewer's signature on an approval; serializable only.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    reviewer: String,
    signed_at: String,
    signature: String,
}
247
/// Outcome of evaluating the responses recorded for an approval-style request.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// No decision yet; the waitpoint is left open.
    Pending,
    /// Approved; carries the progress used to build the approval record.
    Approved(ApprovalProgress),
    /// Denied; carries the response whose reviewer/reason/metadata are recorded.
    Denied(HitlHostResponse),
}
254
/// Result of waiting on a request's waitpoint, with timeout and cancellation
/// expressed as values rather than errors.
// NOTE(review): the lint is presumably silenced because `WaitpointRecord`
// makes `Completed` much larger than the other variants — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its record.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancelled {
        wait_id: String,
        waitpoint_ids: Vec<String>,
        reason: Option<String>,
    },
}
273
274pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
275 for def in MODULE_BUILTINS {
276 vm.register_builtin_def(def);
277 }
278}
279
/// The builtin definitions exported by this module, in registration order.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &ASK_USER_BUILTIN_DEF,
    &REQUEST_APPROVAL_BUILTIN_DEF,
    &DUAL_CONTROL_BUILTIN_DEF,
    &ESCALATE_TO_BUILTIN_DEF,
];
286
// `ask_user(prompt, options?)` builtin entry point: forwards the call context
// and arguments to `ask_user_impl`.
#[harn_builtin(
    sig = "ask_user(prompt: string, options?: dict) -> any",
    kind = "async",
    category = "hitl"
)]
async fn ask_user_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    ask_user_impl(Some(&ctx), &args).await
}
298
// `request_approval(...)` builtin entry point (variadic signature returning a
// dict): forwards the call context and arguments to `request_approval_impl`.
#[harn_builtin(
    sig_expr = BuiltinSignature::variadic("request_approval", &[Param::new("args", TY_ANY)], TY_DICT),
    kind = "async",
    category = "hitl"
)]
async fn request_approval_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    request_approval_impl(Some(&ctx), &args).await
}
310
// `dual_control(n, m, action, approvers?)` builtin entry point: forwards the
// call context and arguments to `dual_control_impl`. Unlike the other HITL
// builtins the context is passed as a plain reference, not an `Option`.
#[harn_builtin(
    sig = "dual_control(n: int, m: int, action: closure, approvers?: list) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn dual_control_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    dual_control_impl(&ctx, &args).await
}
322
// `escalate_to(role, reason)` builtin entry point: forwards the call context
// and arguments to `escalate_to_impl`.
#[harn_builtin(
    sig = "escalate_to(role: string, reason: string) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn escalate_to_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    escalate_to_impl(Some(&ctx), &args).await
}
334
335pub(crate) fn reset_hitl_state() {
336 REQUEST_SEQUENCE.with(|slot| {
337 *slot.borrow_mut() = RequestSequenceState::default();
338 });
339}
340
341pub(crate) fn take_hitl_state() -> RequestSequenceState {
342 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
343}
344
345pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
346 REQUEST_SEQUENCE.with(|slot| {
347 *slot.borrow_mut() = state;
348 });
349}
350
351pub async fn append_hitl_response(
352 base_dir: Option<&Path>,
353 mut response: HitlHostResponse,
354) -> Result<u64, String> {
355 let kind = HitlRequestKind::from_request_id(&response.request_id)
356 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
357 if response.responded_at.is_none() {
358 response.responded_at = Some(now_rfc3339());
359 }
360 let log = ensure_hitl_event_log_for(base_dir)?;
361 let headers = response_headers(&response.request_id);
362 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
363 let event_id = log
364 .append(
365 &topic,
366 LogEvent::new(
367 match kind {
368 HitlRequestKind::Escalation => "hitl.escalation_accepted",
369 _ => "hitl.response_received",
370 },
371 serde_json::to_value(&response).map_err(|error| error.to_string())?,
372 )
373 .with_headers(headers),
374 )
375 .await
376 .map_err(|error| error.to_string())?;
377 finalize_hitl_response(&log, kind, &response).await?;
378 Ok(event_id)
379}
380
/// Creates and records a single-approver approval request on `log`.
///
/// Builds an [`ApprovalRequest`] whose principal is `agent`, whose args are
/// `detail`, and whose deadline is `HITL_APPROVAL_TIMEOUT_MS` after the request
/// time. It then wraps the request in an envelope (with no run id), creates the
/// matching waitpoint, appends the request event, and calls `maybe_notify_host`
/// without a builtin context.
///
/// Returns the generated request id. This function does not wait for the
/// approval to be resolved.
///
/// # Errors
/// Returns a `VmError` if the approval request cannot be serialized, or if
/// creating the waitpoint or appending the request fails.
pub async fn append_approval_request_on(
    log: &Arc<AnyEventLog>,
    agent: impl Into<String>,
    trace_id: impl Into<String>,
    action: impl Into<String>,
    detail: JsonValue,
    reviewers: Vec<String>,
) -> Result<String, VmError> {
    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
    let trace_id = trace_id.into();
    let agent = agent.into();
    // Capture one instant so `requested_at` and `deadline` are derived from the same time.
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.into(),
        detail.clone(),
        agent.clone(),
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(
        requested_at_time,
        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
    );
    approval_request.approvers_required = 1;
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: None,
        requested_at: requested_at.clone(),
        // The payload carries the structured request under `approval_request`
        // and also repeats its fields at the top level.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": approval_request.action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": detail,
            "quorum": 1,
            "reviewers": reviewers,
            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
        }),
    };
    // The waitpoint is created before the request event is appended.
    create_request_waitpoint(log, &request).await?;
    append_request(log, &request).await?;
    maybe_notify_host(None, &request);
    Ok(request_id)
}
438
/// Implements the `ask_user` builtin: records a question request, waits for
/// its waitpoint, and returns the answer.
///
/// `args[0]` is the prompt (required string); `args[1]` is an optional options
/// value parsed by `parse_ask_user_options`.
///
/// On completion the answer is validated against `options.schema` when one is
/// set; otherwise it is coerced via `coerce_like_default` when a default is
/// set; otherwise it is returned as-is (`Nil` when the record has no value).
/// On timeout a timeout event is appended and the default, if any, is returned.
///
/// # Errors
/// Returns a `VmError` for invalid arguments, for log or waitpoint failures,
/// for a timeout without a default, and for a cancelled wait.
async fn ask_user_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let prompt = required_string_arg(args, 0, "ask_user")?;
    let options = parse_ask_user_options(args.get(1))?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a fresh one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Question,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "prompt": prompt,
            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
        }),
    };
    // Create the waitpoint, record the request, notify, then give any mock
    // response a chance to apply before waiting.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Question,
        options.timeout,
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            let answer = record
                .value
                .as_ref()
                .map(crate::stdlib::json_to_vm_value)
                .unwrap_or(VmValue::Nil);
            // A schema takes precedence over default-based coercion.
            if let Some(schema) = options.schema.as_ref() {
                return schema_expect_value(&answer, schema, true);
            }
            if let Some(default) = options.default.as_ref() {
                return Ok(coerce_like_default(&answer, default));
            }
            Ok(answer)
        }
        WaitpointOutcome::Timeout => {
            // The timeout is recorded even when a default is returned.
            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
            if let Some(default) = options.default {
                return Ok(default);
            }
            Err(timeout_error(&request_id, HitlRequestKind::Question))
        }
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Question,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
516
/// Implements the `request_approval` builtin: records an approval request,
/// waits for its waitpoint, and returns the approval record.
///
/// `args[0]` is the action name (required string); `args[1]` is an optional
/// options value parsed by `parse_approval_options`.
///
/// The request principal is `options.principal` when set, otherwise the
/// dispatching agent. The request args come from `options.args`, falling back
/// to `options.detail`, then to JSON `null`. Undo metadata comes from
/// `options.undo_metadata`, falling back to the serialized current mutation
/// session, then to JSON `null`.
///
/// # Errors
/// Returns a `VmError` for invalid arguments, for serialization, log, or
/// waitpoint failures, for a timeout (after appending a timeout event), and for
/// a cancelled wait (via `approval_wait_error`).
async fn request_approval_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a fresh one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    // Capture one instant so `requested_at` and `deadline` are derived from the same time.
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            // A session that fails to serialize is treated as absent.
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        // The payload carries the structured request under `approval_request`
        // and also repeats its fields at the top level.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    // Create the waitpoint, record the request, notify, then give any mock
    // response a chance to apply before waiting.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        // The cancellation details are ignored here; the error is built by
        // `approval_wait_error` from the log and request id.
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
614
615pub(crate) async fn request_approval_for_side_effect(
616 action: &str,
617 detail: JsonValue,
618 principal: String,
619 reviewers: Vec<String>,
620 capabilities_requested: Vec<String>,
621) -> Result<VmValue, VmError> {
622 let mut options = crate::value::DictMap::new();
623 options.insert(
624 crate::value::intern_key("args"),
625 crate::stdlib::json_to_vm_value(&detail),
626 );
627 options.insert(
628 crate::value::intern_key("detail"),
629 crate::stdlib::json_to_vm_value(&detail),
630 );
631 options.put_str("principal", principal);
632 options.insert(
633 crate::value::intern_key("reviewers"),
634 VmValue::List(std::sync::Arc::new(
635 reviewers
636 .into_iter()
637 .map(|reviewer| VmValue::String(arcstr::ArcStr::from(reviewer)))
638 .collect(),
639 )),
640 );
641 options.insert(
642 crate::value::intern_key("capabilities_requested"),
643 VmValue::List(std::sync::Arc::new(
644 capabilities_requested
645 .into_iter()
646 .map(|capability| VmValue::String(arcstr::ArcStr::from(capability)))
647 .collect(),
648 )),
649 );
650 let args = vec![
651 VmValue::String(arcstr::ArcStr::from(action.to_string())),
652 VmValue::dict(options),
653 ];
654 request_approval_impl(None, &args).await
655}
656
657async fn dual_control_impl(ctx: &AsyncBuiltinCtx, args: &[VmValue]) -> Result<VmValue, VmError> {
658 let n = required_positive_int_arg(args, 0, "dual_control")?;
659 let m = required_positive_int_arg(args, 1, "dual_control")?;
660 if n > m {
661 return Err(VmError::Runtime(
662 "dual_control: n must be less than or equal to m".to_string(),
663 ));
664 }
665 let action = args
666 .get(2)
667 .and_then(|value| match value {
668 VmValue::Closure(closure) => Some(closure.clone()),
669 _ => None,
670 })
671 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
672 let approvers = optional_string_list(args.get(3), "dual_control")?;
673 if !approvers.is_empty() && approvers.len() < m as usize {
674 return Err(VmError::Runtime(format!(
675 "dual_control: expected at least {m} approvers, got {}",
676 approvers.len()
677 )));
678 }
679
680 let keys = current_dispatch_keys();
681 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
682 let trace_id = keys
683 .as_ref()
684 .map(|keys| keys.trace_id.clone())
685 .unwrap_or_else(new_trace_id);
686 let action_name = if action.func.name.is_empty() {
687 "anonymous".to_string()
688 } else {
689 action.func.name.clone()
690 };
691 let agent = keys
692 .as_ref()
693 .map(|keys| keys.agent.clone())
694 .unwrap_or_default();
695 let requested_at_time = OffsetDateTime::now_utc();
696 let requested_at = format_rfc3339(requested_at_time);
697 let mut approval_request = ApprovalRequest::new(
698 request_id.clone(),
699 action_name.clone(),
700 json!({
701 "n": n,
702 "m": m,
703 "approvers": approvers.clone(),
704 }),
705 agent.clone(),
706 requested_at.clone(),
707 );
708 approval_request.deadline = deadline_after(
709 requested_at_time,
710 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
711 );
712 approval_request.approvers_required = n as u32;
713 approval_request.undo_metadata = crate::orchestration::current_mutation_session()
714 .and_then(|session| serde_json::to_value(session).ok())
715 .unwrap_or(JsonValue::Null);
716 let approval_request_json = serde_json::to_value(&approval_request)
717 .map_err(|error| VmError::Runtime(error.to_string()))?;
718 let log = ensure_hitl_event_log();
719 let request = HitlRequestEnvelope {
720 request_id: request_id.clone(),
721 kind: HitlRequestKind::DualControl,
722 agent,
723 trace_id: trace_id.clone(),
724 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
725 requested_at: requested_at.clone(),
726 payload: json!({
727 "approval_request": approval_request_json,
728 "id": approval_request.id,
729 "args": approval_request.args,
730 "principal": approval_request.principal,
731 "requested_at": requested_at,
732 "deadline": approval_request.deadline,
733 "approvers_required": approval_request.approvers_required,
734 "evidence_refs": approval_request.evidence_refs,
735 "undo_metadata": approval_request.undo_metadata,
736 "capabilities_requested": approval_request.capabilities_requested,
737 "n": n,
738 "m": m,
739 "action": action_name,
740 "approvers": approvers,
741 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
742 }),
743 };
744 create_request_waitpoint(&log, &request).await?;
745 append_request(&log, &request).await?;
746 maybe_notify_host(Some(ctx), &request);
747 emit_hitl_requested(&request);
748 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
749
750 match wait_for_request_waitpoint_with_events(
751 &request_id,
752 HitlRequestKind::DualControl,
753 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
754 )
755 .await?
756 {
757 WaitpointOutcome::Completed(record) => {
758 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
759 let mut vm = ctx.child_vm();
760 let result = vm.call_closure_pub(&action, &[]).await?;
761 ctx.forward_output(&vm.take_output());
762
763 append_named_event(
764 &log,
765 HitlRequestKind::DualControl,
766 "hitl.dual_control_executed",
767 &request_id,
768 &trace_id,
769 json!({
770 "request_id": request_id,
771 "result": crate::llm::vm_value_to_json(&result),
772 }),
773 )
774 .await?;
775
776 Ok(result)
777 }
778 WaitpointOutcome::Timeout => {
779 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
780 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
781 }
782 WaitpointOutcome::Cancelled { .. } => {
783 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
784 }
785 }
786}
787
/// Implements the `escalate_to` builtin: records an escalation request and
/// waits, with no timeout passed to the wait, for its waitpoint.
///
/// `args[0]` is the role and `args[1]` the reason (both required strings).
///
/// On completion returns a dict with `request_id`, `role`, `reason`,
/// `trace_id`, `status`, `accepted_at`, and `reviewer`. `status` is
/// `"accepted"` unless the record's value carries `accepted: false`, in which
/// case it is `"pending"`; a missing `accepted` field counts as accepted.
///
/// # Errors
/// Returns a `VmError` for invalid arguments, for log or waitpoint failures,
/// for a timeout outcome, and for a cancelled wait.
async fn escalate_to_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let role = required_string_arg(args, 0, "escalate_to")?;
    let reason = required_string_arg(args, 1, "escalate_to")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
    // Reuse the dispatch trace id when there is one; otherwise mint a fresh one.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Escalation,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "role": role,
            "reason": reason,
            "capability_policy": escalation_capability_policy(),
        }),
    };
    // Create the waitpoint, record the request, notify, then give any mock
    // response a chance to apply before waiting.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
        .await?
    {
        WaitpointOutcome::Completed(record) => {
            let accepted_at = record.completed_at.clone();
            let reviewer = record.completed_by.clone();
            // A completed record without an explicit `accepted` flag counts as accepted.
            let accepted = record
                .value
                .as_ref()
                .and_then(|value| value.get("accepted"))
                .and_then(JsonValue::as_bool)
                .unwrap_or(true);
            Ok(crate::stdlib::json_to_vm_value(&json!({
                "request_id": request_id,
                "role": role,
                "reason": reason,
                "trace_id": trace_id,
                "status": if accepted { "accepted" } else { "pending" },
                "accepted_at": accepted_at,
                "reviewer": reviewer,
            })))
        }
        // Unlike the other builtins, no timeout event is appended here.
        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Escalation,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
859
860async fn create_request_waitpoint(
861 log: &Arc<AnyEventLog>,
862 request: &HitlRequestEnvelope,
863) -> Result<(), VmError> {
864 create_waitpoint_on(
865 log,
866 Some(request.request_id.clone()),
867 Some(json!({
868 "kind": request.kind.as_str(),
869 "agent": request.agent.clone(),
870 "trace_id": request.trace_id.clone(),
871 "requested_at": request.requested_at.clone(),
872 "payload": request.payload.clone(),
873 })),
874 )
875 .await?;
876 Ok(())
877}
878
879async fn wait_for_request_waitpoint(
880 request_id: &str,
881 timeout: Option<StdDuration>,
882) -> Result<WaitpointOutcome, VmError> {
883 match wait_on_waitpoints(
884 vec![request_id.to_string()],
885 WaitpointWaitOptions { timeout },
886 )
887 .await
888 {
889 Ok(records) => Ok(WaitpointOutcome::Completed(
890 records
891 .into_iter()
892 .next()
893 .expect("single waitpoint wait result"),
894 )),
895 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
896 Err(WaitpointWaitFailure::Cancelled {
897 wait_id,
898 waitpoint_ids,
899 reason,
900 }) => Ok(WaitpointOutcome::Cancelled {
901 wait_id,
902 waitpoint_ids,
903 reason,
904 }),
905 Err(WaitpointWaitFailure::Vm(error)) => {
906 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
907 return Ok(outcome);
908 }
909 Err(error)
910 }
911 }
912}
913
914fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
915 let VmError::Thrown(VmValue::Dict(dict)) = error else {
916 return None;
917 };
918 let name = dict.get("name").and_then(vm_string)?;
919 match name {
920 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
921 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
922 wait_id: dict
923 .get("wait_id")
924 .and_then(vm_string)
925 .unwrap_or_default()
926 .to_string(),
927 waitpoint_ids: dict
928 .get("waitpoint_ids")
929 .and_then(vm_string_list)
930 .unwrap_or_default(),
931 reason: dict
932 .get("reason")
933 .and_then(vm_string)
934 .map(ToString::to_string),
935 }),
936 _ => None,
937 }
938}
939
/// Applies a recorded host response to the request's waitpoint.
///
/// In every branch a waitpoint that is already terminal (present and no longer
/// open) is left untouched and `Ok(())` is returned.
///
/// - Question: completes the waitpoint with the response's `answer`.
/// - Escalation: completes the waitpoint only when `accepted` is `Some(true)`;
///   otherwise nothing happens.
/// - Approval / DualControl: loads the original request envelope, resolves the
///   approval state from the log, and then either does nothing (pending),
///   appends the approved event and completes the waitpoint with the approval
///   record, or appends the denied event and cancels the waitpoint.
///
/// # Errors
/// Returns the stringified error of whichever log or waitpoint operation failed.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // A missing or false `accepted` leaves the waitpoint open.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // The approved event is appended before the waitpoint is completed.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    // The denied event is appended before the waitpoint is cancelled.
                    // Reviewer, reason, and metadata come from the denying
                    // response carried by the resolution.
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
1054
1055async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1056 Ok(inspect_waitpoint_on(log, request_id)
1057 .await
1058 .map_err(|error| error.to_string())?
1059 .is_some_and(|record| record.status != WaitpointStatus::Open))
1060}
1061
1062async fn load_request_envelope(
1063 log: &Arc<AnyEventLog>,
1064 kind: HitlRequestKind,
1065 request_id: &str,
1066) -> Result<HitlRequestEnvelope, VmError> {
1067 let topic = topic(kind)?;
1068 let events = log
1069 .read_range(&topic, None, usize::MAX)
1070 .await
1071 .map_err(log_error)?;
1072 events
1073 .into_iter()
1074 .filter(|(_, event)| event.kind == kind.request_event_kind())
1075 .find_map(|(_, event)| {
1076 if !event_matches_request(&event, request_id) {
1077 return None;
1078 }
1079 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1080 })
1081 .ok_or_else(|| {
1082 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1083 })
1084}
1085
1086async fn resolve_approval_state(
1087 log: &Arc<AnyEventLog>,
1088 kind: HitlRequestKind,
1089 request: &HitlRequestEnvelope,
1090) -> Result<ApprovalResolution, VmError> {
1091 let quorum = approval_quorum_from_request(kind, request)?;
1092 let allowed_reviewers = approval_reviewers_from_request(kind, request)
1093 .into_iter()
1094 .collect::<BTreeSet<_>>();
1095 let mut progress = ApprovalProgress {
1096 request_id: request.request_id.clone(),
1097 reviewers: BTreeSet::new(),
1098 signatures: Vec::new(),
1099 reason: None,
1100 approved_at: None,
1101 };
1102 let topic = topic(kind)?;
1103 let events = log
1104 .read_range(&topic, None, usize::MAX)
1105 .await
1106 .map_err(log_error)?;
1107 for (_, event) in events {
1108 if !event_matches_request(&event, &request.request_id)
1109 || event.kind != "hitl.response_received"
1110 {
1111 continue;
1112 }
1113 let response: HitlHostResponse = serde_json::from_value(event.payload)
1114 .map_err(|error| VmError::Runtime(error.to_string()))?;
1115 if let Some(reviewer) = response.reviewer.as_deref() {
1116 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
1117 continue;
1118 }
1119 if progress.reviewers.contains(reviewer) {
1120 continue;
1121 }
1122 }
1123 if response.approved.unwrap_or(false) {
1124 if let Some(reviewer) = response.reviewer.clone() {
1125 let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1126 progress.reviewers.insert(reviewer.clone());
1127 progress.signatures.push(ApprovalSignature {
1128 reviewer: reviewer.clone(),
1129 signed_at: signed_at.clone(),
1130 signature: response.signature.clone().unwrap_or_else(|| {
1131 approval_receipt_signature(
1132 &request.request_id,
1133 &reviewer,
1134 &signed_at,
1135 true,
1136 response.reason.as_deref(),
1137 )
1138 }),
1139 });
1140 }
1141 progress.reason = response.reason.clone();
1142 progress.approved_at = response.responded_at.clone();
1143 if progress.reviewers.len() as u32 >= quorum {
1144 return Ok(ApprovalResolution::Approved(progress));
1145 }
1146 continue;
1147 }
1148 return Ok(ApprovalResolution::Denied(response));
1149 }
1150 Ok(ApprovalResolution::Pending)
1151}
1152
1153fn approval_quorum_from_request(
1154 kind: HitlRequestKind,
1155 request: &HitlRequestEnvelope,
1156) -> Result<u32, VmError> {
1157 let key = match kind {
1158 HitlRequestKind::DualControl => "n",
1159 _ => "quorum",
1160 };
1161 let quorum = request
1162 .payload
1163 .get(key)
1164 .or_else(|| request.payload.get("approvers_required"))
1165 .or_else(|| {
1166 request
1167 .payload
1168 .get("approval_request")
1169 .and_then(|approval| approval.get("approvers_required"))
1170 })
1171 .and_then(JsonValue::as_u64)
1172 .unwrap_or(1);
1173 u32::try_from(quorum).map_err(|_| {
1174 VmError::Runtime(format!(
1175 "invalid quorum in HITL request '{}'",
1176 request.request_id
1177 ))
1178 })
1179}
1180
1181fn approval_reviewers_from_request(
1182 kind: HitlRequestKind,
1183 request: &HitlRequestEnvelope,
1184) -> Vec<String> {
1185 let key = match kind {
1186 HitlRequestKind::DualControl => "approvers",
1187 _ => "reviewers",
1188 };
1189 request
1190 .payload
1191 .get(key)
1192 .or_else(|| {
1193 request
1194 .payload
1195 .get("approval_request")
1196 .and_then(|approval| approval.get("reviewers"))
1197 })
1198 .and_then(JsonValue::as_array)
1199 .map(|values| {
1200 values
1201 .iter()
1202 .filter_map(JsonValue::as_str)
1203 .map(str::to_string)
1204 .collect()
1205 })
1206 .unwrap_or_default()
1207}
1208
1209fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1210 json!({
1211 "request_id": progress.request_id.clone(),
1212 "approved": true,
1213 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1214 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1215 "reason": progress.reason,
1216 "signatures": progress.signatures,
1217 })
1218}
1219
1220fn approval_receipt_signature(
1221 request_id: &str,
1222 reviewer: &str,
1223 signed_at: &str,
1224 approved: bool,
1225 reason: Option<&str>,
1226) -> String {
1227 let material = format!(
1228 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1229 reason.unwrap_or("")
1230 );
1231 let hash = sha2::Sha256::digest(material.as_bytes());
1232 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1233 format!("sha256:{hex}")
1234}
1235
1236fn approval_record_from_waitpoint(
1237 record: &WaitpointRecord,
1238 builtin: &str,
1239) -> Result<VmValue, VmError> {
1240 record
1241 .value
1242 .as_ref()
1243 .map(crate::stdlib::json_to_vm_value)
1244 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1245}
1246
1247async fn approval_wait_error(
1248 log: &Arc<AnyEventLog>,
1249 kind: HitlRequestKind,
1250 request_id: &str,
1251) -> VmError {
1252 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1253 if record.status == WaitpointStatus::Cancelled
1254 && record.reason.as_deref() != Some("upstream_cancelled")
1255 {
1256 return approval_denied_error(
1257 request_id,
1258 HitlHostResponse {
1259 request_id: request_id.to_string(),
1260 answer: None,
1261 approved: Some(false),
1262 accepted: None,
1263 reviewer: record.cancelled_by.clone(),
1264 reason: record.reason.clone(),
1265 metadata: record.metadata.clone(),
1266 responded_at: record.cancelled_at,
1267 signature: None,
1268 },
1269 );
1270 }
1271 if record.status == WaitpointStatus::Cancelled {
1272 return hitl_cancelled_error(
1273 request_id,
1274 kind,
1275 "",
1276 &[request_id.to_string()],
1277 record.reason,
1278 );
1279 }
1280 }
1281 hitl_cancelled_error(
1282 request_id,
1283 kind,
1284 "",
1285 &[request_id.to_string()],
1286 Some("upstream_cancelled".to_string()),
1287 )
1288}
1289
1290async fn append_timeout_once(
1291 log: &Arc<AnyEventLog>,
1292 kind: HitlRequestKind,
1293 request_id: &str,
1294 trace_id: &str,
1295) -> Result<(), VmError> {
1296 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1297 return Ok(());
1298 }
1299 append_timeout(log, kind, request_id, trace_id).await
1300}
1301
1302async fn hitl_event_exists(
1303 log: &Arc<AnyEventLog>,
1304 kind: HitlRequestKind,
1305 request_id: &str,
1306 event_kind: &str,
1307) -> Result<bool, VmError> {
1308 let topic = topic(kind)?;
1309 let events = log
1310 .read_range(&topic, None, usize::MAX)
1311 .await
1312 .map_err(log_error)?;
1313 Ok(events
1314 .into_iter()
1315 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1316}
1317
1318fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1319 match kind {
1320 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1321 _ => "hitl.approval_approved",
1322 }
1323}
1324
1325fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1326 match kind {
1327 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1328 _ => "hitl.approval_denied",
1329 }
1330}
1331
1332async fn append_request(
1333 log: &Arc<AnyEventLog>,
1334 request: &HitlRequestEnvelope,
1335) -> Result<(), VmError> {
1336 let topic = topic(request.kind)?;
1337 log.append(
1338 &topic,
1339 LogEvent::new(
1340 request.kind.request_event_kind(),
1341 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1342 )
1343 .with_headers(request_headers(request)),
1344 )
1345 .await
1346 .map(|_| ())
1347 .map_err(log_error)
1348}
1349
1350async fn append_named_event(
1351 log: &Arc<AnyEventLog>,
1352 kind: HitlRequestKind,
1353 event_kind: &str,
1354 request_id: &str,
1355 trace_id: &str,
1356 payload: JsonValue,
1357) -> Result<(), VmError> {
1358 let topic = topic(kind)?;
1359 let headers = headers_with_trace(request_id, trace_id);
1360 log.append(
1361 &topic,
1362 LogEvent::new(event_kind, payload).with_headers(headers),
1363 )
1364 .await
1365 .map(|_| ())
1366 .map_err(log_error)
1367}
1368
1369async fn append_timeout(
1370 log: &Arc<AnyEventLog>,
1371 kind: HitlRequestKind,
1372 request_id: &str,
1373 trace_id: &str,
1374) -> Result<(), VmError> {
1375 append_named_event(
1376 log,
1377 kind,
1378 "hitl.timeout",
1379 request_id,
1380 trace_id,
1381 serde_json::to_value(HitlTimeoutRecord {
1382 request_id: request_id.to_string(),
1383 kind,
1384 trace_id: trace_id.to_string(),
1385 timed_out_at: now_rfc3339(),
1386 })
1387 .map_err(|error| VmError::Runtime(error.to_string()))?,
1388 )
1389 .await
1390}
1391
1392async fn maybe_apply_mock_response(
1393 kind: HitlRequestKind,
1394 request_id: &str,
1395 request_payload: &JsonValue,
1396) -> Result<(), VmError> {
1397 let mut params = request_payload
1398 .as_object()
1399 .cloned()
1400 .unwrap_or_default()
1401 .into_iter()
1402 .map(|(key, value)| {
1403 (
1404 crate::value::intern_key(&key),
1405 crate::stdlib::json_to_vm_value(&value),
1406 )
1407 })
1408 .collect::<crate::value::DictMap>();
1409 params.put_str("request_id", request_id);
1410 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1411 return Ok(());
1412 };
1413 let value = result?;
1414 let responses = match value {
1415 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1416 other => vec![other],
1417 };
1418 for response in responses {
1419 let response_dict = response.as_dict().ok_or_else(|| {
1420 VmError::Runtime(format!(
1421 "mocked HITL {} response must be a dict or list<dict>",
1422 kind.as_str()
1423 ))
1424 })?;
1425 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1426 append_hitl_response(None, hitl_response)
1427 .await
1428 .map_err(VmError::Runtime)?;
1429 }
1430 Ok(())
1431}
1432
1433fn parse_hitl_response_dict(
1434 request_id: &str,
1435 response_dict: &crate::value::DictMap,
1436) -> Result<HitlHostResponse, VmError> {
1437 Ok(HitlHostResponse {
1438 request_id: request_id.to_string(),
1439 answer: response_dict
1440 .get("answer")
1441 .map(crate::llm::vm_value_to_json),
1442 approved: response_dict.get("approved").and_then(vm_bool),
1443 accepted: response_dict.get("accepted").and_then(vm_bool),
1444 reviewer: response_dict.get("reviewer").map(VmValue::display),
1445 reason: response_dict.get("reason").map(VmValue::display),
1446 metadata: response_dict
1447 .get("metadata")
1448 .map(crate::llm::vm_value_to_json),
1449 responded_at: response_dict.get("responded_at").map(VmValue::display),
1450 signature: response_dict.get("signature").map(VmValue::display),
1451 })
1452}
1453
1454fn maybe_notify_host(ctx: Option<&AsyncBuiltinCtx>, request: &HitlRequestEnvelope) {
1455 let Some(bridge) = ctx.and_then(|ctx| ctx.child_vm().bridge.clone()) else {
1456 return;
1457 };
1458 bridge.notify(
1459 "harn.hitl.requested",
1460 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1461 );
1462}
1463
1464fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1471 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1472 return;
1473 };
1474 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1475 session_id,
1476 request_id: request.request_id.clone(),
1477 kind: request.kind.as_str().to_string(),
1478 payload: request.payload.clone(),
1479 });
1480}
1481
1482fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1487 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1488 return;
1489 };
1490 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1491 session_id,
1492 request_id: request_id.to_string(),
1493 kind: kind.as_str().to_string(),
1494 outcome: outcome.to_string(),
1495 });
1496}
1497
1498async fn wait_for_request_waitpoint_with_events(
1505 request_id: &str,
1506 kind: HitlRequestKind,
1507 timeout: Option<StdDuration>,
1508) -> Result<WaitpointOutcome, VmError> {
1509 let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1510 let label = match &outcome {
1511 Ok(WaitpointOutcome::Completed(_)) => "answered",
1512 Ok(WaitpointOutcome::Timeout) => "timeout",
1513 Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1514 Err(_) => "error",
1515 };
1516 emit_hitl_resolved(request_id, kind, label);
1517 outcome
1518}
1519
1520fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1521 let Some(value) = value else {
1522 return Ok(AskUserOptions {
1523 schema: None,
1524 timeout: Some(default_question_timeout()),
1525 default: None,
1526 });
1527 };
1528 let dict = value
1529 .as_dict()
1530 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1531 Ok(AskUserOptions {
1532 schema: dict
1533 .get("schema")
1534 .cloned()
1535 .filter(|value| !matches!(value, VmValue::Nil)),
1536 timeout: dict
1537 .get("timeout")
1538 .map(parse_duration_value)
1539 .transpose()?
1540 .or_else(|| Some(default_question_timeout())),
1541 default: dict
1542 .get("default")
1543 .cloned()
1544 .filter(|value| !matches!(value, VmValue::Nil)),
1545 })
1546}
1547
1548fn default_question_timeout() -> StdDuration {
1549 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1550}
1551
1552fn escalation_capability_policy() -> JsonValue {
1553 crate::orchestration::current_execution_policy()
1554 .and_then(|policy| serde_json::to_value(policy).ok())
1555 .unwrap_or(JsonValue::Null)
1556}
1557
1558fn parse_approval_options(
1559 value: Option<&VmValue>,
1560 builtin: &str,
1561) -> Result<ApprovalOptions, VmError> {
1562 let dict = match value {
1563 None => None,
1564 Some(VmValue::Dict(dict)) => Some(dict),
1565 Some(_) => {
1566 return Err(VmError::Runtime(format!(
1567 "{builtin}: options must be a dict"
1568 )))
1569 }
1570 };
1571 let quorum = dict
1572 .and_then(|dict| dict.get("quorum"))
1573 .and_then(VmValue::as_int)
1574 .unwrap_or(1);
1575 if quorum <= 0 {
1576 return Err(VmError::Runtime(format!(
1577 "{builtin}: quorum must be positive"
1578 )));
1579 }
1580 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1581 let capabilities_requested = optional_string_list(
1582 dict.and_then(|dict| dict.get("capabilities_requested")),
1583 builtin,
1584 )?;
1585 let evidence_refs = dict
1586 .and_then(|dict| dict.get("evidence_refs"))
1587 .map(|value| match value {
1588 VmValue::List(items) => Ok(items
1589 .iter()
1590 .map(crate::llm::vm_value_to_json)
1591 .collect::<Vec<_>>()),
1592 _ => Err(VmError::Runtime(format!(
1593 "{builtin}: evidence_refs must be a list"
1594 ))),
1595 })
1596 .transpose()?
1597 .unwrap_or_default();
1598 let deadline = dict
1599 .and_then(|dict| dict.get("deadline"))
1600 .map(parse_duration_value)
1601 .transpose()?
1602 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1603 Ok(ApprovalOptions {
1604 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1605 args: dict.and_then(|dict| dict.get("args")).cloned(),
1606 quorum: quorum as u32,
1607 reviewers,
1608 deadline,
1609 principal: dict
1610 .and_then(|dict| dict.get("principal"))
1611 .map(VmValue::display)
1612 .filter(|value| !value.is_empty()),
1613 evidence_refs,
1614 undo_metadata: dict
1615 .and_then(|dict| dict.get("undo_metadata"))
1616 .map(crate::llm::vm_value_to_json),
1617 capabilities_requested,
1618 })
1619}
1620
1621fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1622 args.get(idx)
1623 .map(VmValue::display)
1624 .filter(|value| !value.is_empty())
1625 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1626}
1627
1628fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1629 let value = args
1630 .get(idx)
1631 .and_then(VmValue::as_int)
1632 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1633 if value <= 0 {
1634 return Err(VmError::Runtime(format!(
1635 "{builtin}: expected a positive int at {idx}"
1636 )));
1637 }
1638 Ok(value)
1639}
1640
1641fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1642 let Some(value) = value else {
1643 return Ok(Vec::new());
1644 };
1645 match value {
1646 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1647 _ => Err(VmError::Runtime(format!(
1648 "{builtin}: expected list<string>"
1649 ))),
1650 }
1651}
1652
1653fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1654 duration_from_value(value, "hitl", "timeout", ErrorKind::Runtime)
1655}
1656
1657fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1658 active_event_log()
1659 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1660}
1661
1662fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1663 if let Some(log) = active_event_log() {
1664 return Ok(log);
1665 }
1666 let Some(base_dir) = base_dir else {
1667 return Ok(install_memory_for_current_thread(
1668 HITL_EVENT_LOG_QUEUE_DEPTH,
1669 ));
1670 };
1671 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1672}
1673
1674fn current_dispatch_keys() -> Option<DispatchKeys> {
1675 let context = current_dispatch_context()?;
1676 let stable_base = context
1677 .replay_of_event_id
1678 .clone()
1679 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1680 let instance_key = format!(
1681 "{}::{}",
1682 context.trigger_event.id.0,
1683 context.replay_of_event_id.as_deref().unwrap_or("live")
1684 );
1685 Some(DispatchKeys {
1686 instance_key,
1687 stable_base,
1688 agent: context.agent_id,
1689 trace_id: context.trigger_event.trace_id.0,
1690 })
1691}
1692
1693fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1694 if let Some(keys) = dispatch_keys {
1695 let seq = REQUEST_SEQUENCE.with(|slot| {
1696 let mut state = slot.borrow_mut();
1697 if state.instance_key != keys.instance_key {
1698 state.instance_key = keys.instance_key.clone();
1699 state.next_seq = 0;
1700 }
1701 state.next_seq += 1;
1702 state.next_seq
1703 });
1704 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1705 }
1706 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1707}
1708
1709fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1710 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1711 if let Some(run_id) = request.run_id.as_ref() {
1712 headers.insert("run_id".to_string(), run_id.clone());
1713 }
1714 headers
1715}
1716
/// Builds event headers containing only the `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1722
1723fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1724 let mut headers = response_headers(request_id);
1725 headers.insert("trace_id".to_string(), trace_id.to_string());
1726 headers
1727}
1728
1729fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1730 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1731}
1732
1733fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1734 event
1735 .headers
1736 .get("request_id")
1737 .is_some_and(|value| value == request_id)
1738 || event
1739 .payload
1740 .get("request_id")
1741 .and_then(JsonValue::as_str)
1742 .is_some_and(|value| value == request_id)
1743}
1744
1745fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1746 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1747 "name": "ApprovalDeniedError",
1748 "category": "generic",
1749 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1750 "request_id": request_id,
1751 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1752 "reason": response.reason,
1753 })))
1754}
1755
1756fn hitl_cancelled_error(
1757 request_id: &str,
1758 kind: HitlRequestKind,
1759 wait_id: &str,
1760 waitpoint_ids: &[String],
1761 reason: Option<String>,
1762) -> VmError {
1763 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1764 let message = reason
1765 .clone()
1766 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1767 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1768 "name": "HumanCancelledError",
1769 "category": ErrorCategory::Cancelled.as_str(),
1770 "message": message,
1771 "request_id": request_id,
1772 "kind": kind.as_str(),
1773 "wait_id": wait_id,
1774 "waitpoint_ids": waitpoint_ids,
1775 "reason": reason,
1776 })))
1777}
1778
1779fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1780 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1781 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1782 "name": "HumanTimeoutError",
1783 "category": ErrorCategory::Timeout.as_str(),
1784 "message": format!("{} timed out", kind.as_str()),
1785 "request_id": request_id,
1786 "kind": kind.as_str(),
1787 })))
1788}
1789
1790fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1791 match default {
1792 VmValue::Int(_) => match value {
1793 VmValue::Int(_) => value.clone(),
1794 VmValue::Float(number) => VmValue::Int(*number as i64),
1795 VmValue::String(text) => text
1796 .parse::<i64>()
1797 .map(VmValue::Int)
1798 .unwrap_or_else(|_| default.clone()),
1799 _ => default.clone(),
1800 },
1801 VmValue::Float(_) => match value {
1802 VmValue::Float(_) => value.clone(),
1803 VmValue::Int(number) => VmValue::Float(*number as f64),
1804 VmValue::String(text) => text
1805 .parse::<f64>()
1806 .map(VmValue::Float)
1807 .unwrap_or_else(|_| default.clone()),
1808 _ => default.clone(),
1809 },
1810 VmValue::Bool(_) => match value {
1811 VmValue::Bool(_) => value.clone(),
1812 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1813 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1814 _ => default.clone(),
1815 },
1816 VmValue::String(_) => VmValue::String(arcstr::ArcStr::from(value.display())),
1817 VmValue::Duration(_) => match value {
1818 VmValue::Duration(_) => value.clone(),
1819 VmValue::Int(ms) => VmValue::Duration(*ms),
1820 _ => default.clone(),
1821 },
1822 VmValue::Nil => value.clone(),
1823 _ => {
1824 if value.type_name() == default.type_name() {
1825 value.clone()
1826 } else {
1827 default.clone()
1828 }
1829 }
1830 }
1831}
1832
1833fn log_error(error: impl std::fmt::Display) -> VmError {
1834 VmError::Runtime(error.to_string())
1835}
1836
1837fn now_rfc3339() -> String {
1838 format_rfc3339(OffsetDateTime::now_utc())
1839}
1840
1841fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1842 timestamp
1843 .format(&Rfc3339)
1844 .unwrap_or_else(|_| timestamp.to_string())
1845}
1846
1847fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1848 time::Duration::try_from(duration)
1849 .ok()
1850 .map(|duration| format_rfc3339(requested_at + duration))
1851}
1852
1853fn new_trace_id() -> String {
1854 format!("trace_{}", Uuid::now_v7())
1855}
1856
1857fn vm_bool(value: &VmValue) -> Option<bool> {
1858 match value {
1859 VmValue::Bool(flag) => Some(*flag),
1860 _ => None,
1861 }
1862}
1863
1864fn vm_string(value: &VmValue) -> Option<&str> {
1865 match value {
1866 VmValue::String(text) => Some(text.as_ref()),
1867 _ => None,
1868 }
1869}
1870
1871fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1872 match value {
1873 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1874 _ => None,
1875 }
1876}
1877
1878#[cfg(test)]
1879mod tests {
1880 use std::sync::OnceLock;
1881
1882 use tokio::sync::Mutex;
1883
1884 use super::{
1885 HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1886 };
1887 use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1888 use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1889
1890 fn hitl_lock() -> &'static Mutex<()> {
1901 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
1902 LOCK.get_or_init(|| Mutex::new(()))
1903 }
1904
1905 async fn execute_hitl_script(
1906 base_dir: &std::path::Path,
1907 source: &str,
1908 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1909 reset_thread_local_state();
1910 let log = install_default_for_base_dir(base_dir).expect("install event log");
1911 let chunk = compile_source(source).expect("compile source");
1912 let mut vm = Vm::new();
1913 register_vm_stdlib(&mut vm);
1914 vm.set_source_dir(base_dir);
1915 vm.execute(&chunk).await?;
1916 let output = vm.output().trim_end().to_string();
1917 let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1918 let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1919 let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1920 let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1921 Ok((
1922 output,
1923 question_events,
1924 approval_events,
1925 dual_control_events,
1926 escalation_events,
1927 ))
1928 }
1929
1930 async fn event_kinds(
1931 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1932 topic: &str,
1933 ) -> Vec<String> {
1934 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1935 .await
1936 .expect("read topic")
1937 .into_iter()
1938 .map(|(_, event)| event.kind)
1939 .collect()
1940 }
1941
1942 async fn event_payloads(
1943 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1944 topic: &str,
1945 ) -> Vec<serde_json::Value> {
1946 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1947 .await
1948 .expect("read topic")
1949 .into_iter()
1950 .map(|(_, event)| event.payload)
1951 .collect()
1952 }
1953
    /// A mocked string answer `"9"` to `ask_user` with an int default prints
    /// `9`, and only the questions topic records events (the question and the
    /// response).
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "question", {answer: "9"})
 let answer: int = ask_user("Pick a number", {default: 0})
 __io_println(answer)
}
"#;
                let (
                    output,
                    question_events,
                    approval_events,
                    dual_control_events,
                    escalation_events,
                ) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "9");
                assert_eq!(
                    question_events,
                    vec![
                        "hitl.question_asked".to_string(),
                        "hitl.response_received".to_string()
                    ]
                );
                // No other HITL topic should have been touched.
                assert!(approval_events.is_empty());
                assert!(dual_control_events.is_empty());
                assert!(escalation_events.is_empty());
            })
            .await;
    }
1989
    /// With a quorum of 2 and two mocked approvals, the approvals topic
    /// records the request, both responses, and then a single approved event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        // Held for the whole test; see `hitl_lock`.
        let _guard = hitl_lock().lock().await;
        tokio::task::LocalSet::new()
            .run_until(async {
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", [
 {approved: true, reviewer: "alice", reason: "ok"},
 {approved: true, reviewer: "bob", reason: "ship it"},
 ])
 let record = request_approval(
 "deploy production",
 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
 )
 __io_println(record.approved)
 __io_println(len(record.reviewers))
 __io_println(record.reviewers[0])
 __io_println(record.reviewers[1])
}
"#;
                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_approved".to_string(),
                    ]
                );
            })
            .await;
    }
2028
    /// The first event on the approvals topic carries an `approval_request`
    /// object that mirrors the options passed to `request_approval` and
    /// includes `requested_at` and `deadline` strings.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_emits_canonical_approval_request_payload() {
        tokio::task::LocalSet::new()
            .run_until(async {
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let log = install_default_for_base_dir(dir.path()).expect("install event log");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
 request_approval("deploy production", {
 args: {environment: "prod"},
 quorum: 1,
 reviewers: ["alice"],
 evidence_refs: [{kind: "run", uri: "run_123"}],
 undo_metadata: {strategy: "rollback"},
 capabilities_requested: ["deploy.production"],
 })
}
"#;
                let chunk = compile_source(source).expect("compile source");
                let mut vm = Vm::new();
                register_vm_stdlib(&mut vm);
                vm.set_source_dir(dir.path());
                vm.execute(&chunk).await.expect("script succeeds");

                // The first event's payload is the request envelope; its own
                // `payload` field holds the request data.
                let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
                let request_payload = &payloads[0]["payload"];
                let approval_request = &request_payload["approval_request"];
                assert_eq!(approval_request["id"], request_payload["id"]);
                assert_eq!(approval_request["action"], "deploy production");
                assert_eq!(approval_request["args"]["environment"], "prod");
                assert_eq!(approval_request["approvers_required"], 1);
                assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
                assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
                assert_eq!(
                    approval_request["capabilities_requested"][0],
                    "deploy.production"
                );
                assert!(approval_request["requested_at"].as_str().is_some());
                assert!(approval_request["deadline"].as_str().is_some());
            })
            .await;
    }
2073
    /// A mocked denial makes `request_approval` fail with an
    /// `ApprovalDeniedError` carrying the reviewer's reason, and the approvals
    /// topic records the request, the response, and a denied event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        // Held for the whole test; see `hitl_lock`.
        let _guard = hitl_lock().lock().await;
        tokio::task::LocalSet::new()
            .run_until(async {
                reset_thread_local_state();
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
 let denied = try {
 request_approval("drop table", {reviewers: ["alice"]})
 }
 __io_println(is_err(denied))
 __io_println(unwrap_err(denied).name)
 __io_println(unwrap_err(denied).reason)
}
"#;
                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_denied".to_string(),
                    ]
                );
            })
            .await;
    }
2107
2108 #[tokio::test(flavor = "current_thread")]
2109 async fn dual_control_executes_action_after_quorum() {
2110 tokio::task::LocalSet::new()
2111 .run_until(async {
2112 let dir = tempfile::tempdir().expect("tempdir");
2113 let source = r#"
2114pipeline test(task) {
2115 host_mock("hitl", "dual_control", [
2116 {approved: true, reviewer: "alice"},
2117 {approved: true, reviewer: "bob"},
2118 ])
2119 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2120 __io_println(result)
2121}
2122"#;
2123 let (output, _, _, dual_control_events, _) =
2124 execute_hitl_script(dir.path(), source)
2125 .await
2126 .expect("script succeeds");
2127 assert_eq!(output, "launched");
2128 assert_eq!(
2129 dual_control_events,
2130 vec![
2131 "hitl.dual_control_requested".to_string(),
2132 "hitl.response_received".to_string(),
2133 "hitl.response_received".to_string(),
2134 "hitl.dual_control_approved".to_string(),
2135 "hitl.dual_control_executed".to_string(),
2136 ]
2137 );
2138 })
2139 .await;
2140 }
2141
2142 #[tokio::test(flavor = "current_thread")]
2143 async fn escalate_to_waits_for_acceptance_event() {
2144 tokio::task::LocalSet::new()
2145 .run_until(async {
2146 let dir = tempfile::tempdir().expect("tempdir");
2147 let source = r#"
2148pipeline test(task) {
2149 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2150 let handle = escalate_to("admin", "need override")
2151 __io_println(handle.status)
2152 __io_println(handle.reviewer)
2153}
2154"#;
2155 let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2156 .await
2157 .expect("script succeeds");
2158 assert_eq!(output, "accepted\nlead");
2159 assert_eq!(
2160 escalation_events,
2161 vec![
2162 "hitl.escalation_issued".to_string(),
2163 "hitl.escalation_accepted".to_string(),
2164 ]
2165 );
2166 })
2167 .await;
2168 }
2169
2170 #[tokio::test(flavor = "current_thread")]
2176 async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2177 use std::sync::Mutex as StdMutex;
2178
2179 tokio::task::LocalSet::new()
2180 .run_until(async {
2181 let dir = tempfile::tempdir().expect("tempdir");
2182 let session_id = "hitl-session".to_string();
2183 let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2184 std::sync::Arc::new(StdMutex::new(Vec::new()));
2185
2186 struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2187 impl crate::agent_events::AgentEventSink for CaptureSink {
2188 fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2189 self.0.lock().expect("captured").push(event.clone());
2190 }
2191 }
2192
2193 crate::reset_thread_local_state();
2199 crate::event_log::install_default_for_base_dir(dir.path())
2200 .expect("install event log");
2201
2202 crate::agent_events::reset_all_sinks();
2203 let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2204 std::sync::Arc::new(CaptureSink(captured.clone()));
2205 crate::agent_events::register_sink(session_id.clone(), sink);
2206 crate::agent_sessions::open_or_create(Some(session_id.clone()));
2207 let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2208
2209 let source = r#"
2210pipeline test(task) {
2211 host_mock("hitl", "question", {answer: "ok"})
2212 let answer: string = ask_user("Are you sure?", {default: "no"})
2213 __io_println(answer)
2214}
2215"#;
2216 let chunk = crate::compile_source(source).expect("compile source");
2217 let mut vm = Vm::new();
2218 register_vm_stdlib(&mut vm);
2219 vm.set_source_dir(dir.path());
2220 vm.execute(&chunk).await.expect("script runs");
2221 assert_eq!(vm.output().trim_end(), "ok");
2222
2223 let events = captured.lock().expect("captured");
2224 let mut iter = events.iter().filter(|event| {
2225 matches!(
2226 event,
2227 crate::agent_events::AgentEvent::HitlRequested { .. }
2228 | crate::agent_events::AgentEvent::HitlResolved { .. }
2229 )
2230 });
2231 let requested = iter.next().expect("HitlRequested emitted");
2232 let resolved = iter.next().expect("HitlResolved emitted");
2233 assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2234
2235 let crate::agent_events::AgentEvent::HitlRequested {
2236 session_id: req_session,
2237 request_id: req_id,
2238 kind: req_kind,
2239 payload,
2240 } = requested
2241 else {
2242 panic!("expected HitlRequested, got: {requested:?}");
2243 };
2244 assert_eq!(req_session, &session_id);
2245 assert_eq!(req_kind, "question");
2246 assert!(req_id.starts_with("hitl_question_"));
2247 assert_eq!(payload["prompt"], "Are you sure?");
2248
2249 let crate::agent_events::AgentEvent::HitlResolved {
2250 request_id: res_id,
2251 kind: res_kind,
2252 outcome,
2253 ..
2254 } = resolved
2255 else {
2256 panic!("expected HitlResolved, got: {resolved:?}");
2257 };
2258 assert_eq!(res_id, req_id);
2259 assert_eq!(res_kind, "question");
2260 assert_eq!(outcome, "answered");
2261
2262 drop(_guard);
2263 crate::agent_events::reset_all_sinks();
2264 })
2265 .await;
2266 }
2267}