1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration as StdDuration;
6
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value as JsonValue};
9use sha2::Digest;
10use time::format_description::well_known::Rfc3339;
11use time::OffsetDateTime;
12use uuid::Uuid;
13
14use crate::event_log::{
15 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
16 EventLog, LogEvent, Topic,
17};
18use crate::runtime_limits::RuntimeLimits;
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::macros::{harn_builtin, BuiltinSignature, Param, VmBuiltinDef, TY_ANY, TY_DICT};
22use crate::stdlib::options::{duration_from_value, ErrorKind};
23use crate::stdlib::waitpoint::{
24 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
25 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
26 WaitpointWaitOptions,
27};
28use crate::triggers::dispatcher::current_dispatch_context;
29use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
30use crate::vm::{AsyncBuiltinCtx, Vm};
31
/// Queue depth for the HITL event log, mirroring the runtime's default event-log queue
/// depth. NOTE(review): its use site is not visible in this chunk — presumably passed when
/// the HITL log is installed.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
/// Default approval deadline: 24 hours, in milliseconds. Used by
/// `append_approval_request_on` and `dual_control`.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
/// Question timeout: 24 hours, in milliseconds. NOTE(review): not referenced in the
/// visible part of this file — confirm where it is applied.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic for `ask_user` questions and their responses.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic for `request_approval` requests and their responses.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic for `dual_control` requests, responses and execution records.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic for `escalate_to` requests and acceptances.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
40
thread_local! {
    /// Per-thread HITL request-sequence state. It is reset, taken and restored by
    /// `reset_hitl_state`, `take_hitl_state` and `restore_hitl_state`.
    /// NOTE(review): presumably consumed by `next_request_id` (not visible here).
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
44
/// Snapshot of the per-thread HITL request-sequence state held in `REQUEST_SEQUENCE`.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Key identifying the instance the sequence belongs to.
    /// NOTE(review): presumably `DispatchKeys::instance_key` — confirm in `next_request_id`.
    pub(crate) instance_key: String,
    /// Next sequence number to hand out (starts at 0 via `Default`).
    pub(crate) next_seq: u64,
}
50
/// The four kinds of human-in-the-loop request. Each kind has its own event-log topic
/// and request-id prefix. Serialized as `question`, `approval`, `dual_control` and
/// `escalation`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Free-form question raised by `ask_user`.
    Question,
    /// Single- or multi-reviewer approval raised by `request_approval`.
    Approval,
    /// n-of-m approval gating a closure, raised by `dual_control`.
    DualControl,
    /// Hand-off to a role, raised by `escalate_to`.
    Escalation,
}
59
60impl HitlRequestKind {
61 pub(crate) fn as_str(self) -> &'static str {
62 match self {
63 Self::Question => "question",
64 Self::Approval => "approval",
65 Self::DualControl => "dual_control",
66 Self::Escalation => "escalation",
67 }
68 }
69
70 fn topic(self) -> &'static str {
71 match self {
72 Self::Question => HITL_QUESTIONS_TOPIC,
73 Self::Approval => HITL_APPROVALS_TOPIC,
74 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
75 Self::Escalation => HITL_ESCALATIONS_TOPIC,
76 }
77 }
78
79 fn request_event_kind(self) -> &'static str {
80 match self {
81 Self::Question => "hitl.question_asked",
82 Self::Approval => "hitl.approval_requested",
83 Self::DualControl => "hitl.dual_control_requested",
84 Self::Escalation => "hitl.escalation_issued",
85 }
86 }
87
88 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
89 if request_id.starts_with("hitl_question_") {
90 Some(Self::Question)
91 } else if request_id.starts_with("hitl_approval_") {
92 Some(Self::Approval)
93 } else if request_id.starts_with("hitl_dual_control_") {
94 Some(Self::DualControl)
95 } else if request_id.starts_with("hitl_escalation_") {
96 Some(Self::Escalation)
97 } else {
98 None
99 }
100 }
101}
102
/// A host's reply to a HITL request, as consumed by [`append_hitl_response`].
///
/// The request kind is inferred from `request_id`. Questions use `answer` and
/// escalations use `accepted`. Approval kinds are resolved from the logged responses
/// (presumably via `approved`; confirm in `resolve_approval_state`).
/// Unset optional fields are omitted when serialized.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered. Its prefix determines the kind.
    pub request_id: String,
    /// Answer value. For questions, this becomes the completed waitpoint's value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval decision for approval / dual-control requests.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation acceptance. Only `Some(true)` completes an escalation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Who responded. Forwarded to the waitpoint completion/cancellation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form justification.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra host-supplied data forwarded to the waitpoint.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// RFC 3339 timestamp. Filled with "now" by `append_hitl_response` when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    /// Reviewer signature. NOTE(review): not consumed in the visible code — presumably
    /// feeds `ApprovalSignature`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
123
/// Common wrapper for every HITL request. It is stored as a waitpoint's metadata and
/// deserialized back from request events by `load_request_envelope`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Unique request id (also the waitpoint id).
    request_id: String,
    /// Which kind of request this is.
    kind: HitlRequestKind,
    /// Requesting agent. Deserializes to an empty string when missing.
    #[serde(default)]
    agent: String,
    /// Trace id shared with the dispatch that raised the request (or a fresh one).
    trace_id: String,
    /// Run id of the current mutation session, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// RFC 3339 timestamp of the request.
    requested_at: String,
    /// Kind-specific request details.
    payload: JsonValue,
}
136
/// Record of a HITL request that timed out.
/// NOTE(review): presumably written by `append_timeout_once` (not visible here).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the request that timed out.
    kind: HitlRequestKind,
    /// Trace id of the request.
    trace_id: String,
    /// RFC 3339 timestamp of the timeout.
    timed_out_at: String,
}
144
/// Structured description of an action awaiting human approval. It is embedded as
/// `approval_request` in approval and dual-control payloads.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalRequest {
    /// Request id (same as the HITL request id).
    pub id: String,
    /// Name of the action being approved.
    pub action: String,
    /// Arguments of the action. Defaults to `null` when deserialized without it.
    #[serde(default)]
    pub args: JsonValue,
    /// Who is asking for the action.
    pub principal: String,
    /// RFC 3339 request timestamp.
    pub requested_at: String,
    /// RFC 3339 deadline, omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvals needed (1 via [`ApprovalRequest::new`]).
    pub approvers_required: u32,
    /// References to supporting evidence.
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    /// Data describing how to undo the action (often the serialized mutation session).
    #[serde(default)]
    pub undo_metadata: JsonValue,
    /// Capabilities the action asks for.
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
163
164impl ApprovalRequest {
165 pub fn new(
166 id: impl Into<String>,
167 action: impl Into<String>,
168 args: JsonValue,
169 principal: impl Into<String>,
170 requested_at: impl Into<String>,
171 ) -> Self {
172 Self {
173 id: id.into(),
174 action: action.into(),
175 args,
176 principal: principal.into(),
177 requested_at: requested_at.into(),
178 deadline: None,
179 approvers_required: 1,
180 evidence_refs: Vec::new(),
181 undo_metadata: JsonValue::Null,
182 capabilities_requested: Vec::new(),
183 }
184 }
185}
186
187pub(crate) fn approval_request_for_host_permission(
188 id: impl Into<String>,
189 action: impl Into<String>,
190 args: JsonValue,
191 principal: impl Into<String>,
192 evidence_refs: Vec<JsonValue>,
193 undo_metadata: JsonValue,
194 capabilities_requested: Vec<String>,
195) -> ApprovalRequest {
196 let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
197 request.evidence_refs = evidence_refs;
198 request.undo_metadata = undo_metadata;
199 request.capabilities_requested = capabilities_requested;
200 request
201}
202
/// Identity of the trigger dispatch a HITL request is raised from, as returned by
/// `current_dispatch_keys`.
#[derive(Clone, Debug)]
struct DispatchKeys {
    /// NOTE(review): presumably compared against `RequestSequenceState::instance_key` — confirm.
    instance_key: String,
    /// NOTE(review): presumably the stable prefix for generated request ids — confirm.
    stable_base: String,
    /// Agent name recorded on requests raised during this dispatch.
    agent: String,
    /// Trace id reused for requests raised during this dispatch.
    trace_id: String,
}
210
/// Parsed `options` dict of `ask_user`.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is passed through (`schema_expect_value`) on completion.
    schema: Option<VmValue>,
    /// How long to wait for an answer.
    timeout: Option<StdDuration>,
    /// Value returned on timeout. Without a schema, also used to coerce the answer.
    default: Option<VmValue>,
}
217
/// Parsed `options` dict of `request_approval`.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Human-readable detail. Also the fallback for `args`.
    detail: Option<VmValue>,
    /// Action arguments recorded on the approval request.
    args: Option<VmValue>,
    /// Approvals required (becomes `approvers_required`).
    quorum: u32,
    /// Reviewers listed in the request payload.
    reviewers: Vec<String>,
    /// Deadline relative to the request time. Also bounds the wait.
    deadline: StdDuration,
    /// Principal requesting the action. Defaults to the dispatching agent.
    principal: Option<String>,
    /// Evidence references recorded on the approval request.
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata. Defaults to the serialized current mutation session.
    undo_metadata: Option<JsonValue>,
    /// Capabilities the action requests.
    capabilities_requested: Vec<String>,
}
230
/// Accumulated approvals for one request, built by `resolve_approval_state` from the
/// logged responses.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Request being approved.
    request_id: String,
    /// Distinct reviewers who have approved so far.
    reviewers: BTreeSet<String>,
    /// Signatures collected from approving reviewers.
    signatures: Vec<ApprovalSignature>,
    /// Reason passed to the waitpoint completion once approved.
    reason: Option<String>,
    /// NOTE(review): presumably the timestamp at which quorum was reached — confirm.
    approved_at: Option<String>,
}
239
/// One reviewer's approval signature, serialized into the approval record.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Reviewer who signed.
    reviewer: String,
    /// RFC 3339 signing time.
    signed_at: String,
    /// Signature value.
    signature: String,
}
246
/// Current state of an approval or dual-control request.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// Quorum not yet reached and no denial. The waitpoint stays open.
    Pending,
    /// Quorum reached. The waitpoint is completed with the approval record.
    Approved(ApprovalProgress),
    /// Denied by this response. The waitpoint is cancelled.
    Denied(HitlHostResponse),
}
253
// `Completed` carries a whole `WaitpointRecord`, which clippy flags as much larger than
// the other variants. The value is short-lived (matched immediately by each caller), so
// it is left unboxed.
#[allow(clippy::large_enum_variant)]
/// How waiting on a single HITL request waitpoint ended.
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint was completed; the record holds the value and completer.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait or waitpoint was cancelled (e.g. an approval denial).
    Cancelled {
        wait_id: String,
        waitpoint_ids: Vec<String>,
        reason: Option<String>,
    },
}
272
273pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
274 for def in MODULE_BUILTINS {
275 vm.register_builtin_def(def);
276 }
277}
278
/// Builtin definitions exported by this module, in registration order.
/// NOTE(review): the `*_BUILTIN_DEF` items are presumably generated by `#[harn_builtin]`
/// on the functions below.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &ASK_USER_BUILTIN_DEF,
    &REQUEST_APPROVAL_BUILTIN_DEF,
    &DUAL_CONTROL_BUILTIN_DEF,
    &ESCALATE_TO_BUILTIN_DEF,
];
285
// `ask_user(prompt, options?)` entry point. Delegates to `ask_user_impl` and passes the
// builtin context so the host can be notified of the request.
#[harn_builtin(
    sig = "ask_user(prompt: string, options?: dict) -> any",
    kind = "async",
    category = "hitl"
)]
async fn ask_user_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    ask_user_impl(Some(&ctx), &args).await
}
297
// `request_approval(action, options?)` entry point, declared as variadic. Delegates to
// `request_approval_impl` with the builtin context.
#[harn_builtin(
    sig_expr = BuiltinSignature::variadic("request_approval", &[Param::new("args", TY_ANY)], TY_DICT),
    kind = "async",
    category = "hitl"
)]
async fn request_approval_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    request_approval_impl(Some(&ctx), &args).await
}
309
// `dual_control(n, m, action, approvers?)` entry point. The context is required (not
// optional) because the approved closure runs in a child VM created from it.
#[harn_builtin(
    sig = "dual_control(n: int, m: int, action: closure, approvers?: list) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn dual_control_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    dual_control_impl(&ctx, &args).await
}
321
// `escalate_to(role, reason)` entry point. Delegates to `escalate_to_impl` with the
// builtin context.
#[harn_builtin(
    sig = "escalate_to(role: string, reason: string) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn escalate_to_builtin(
    ctx: crate::vm::AsyncBuiltinCtx,
    args: Vec<VmValue>,
) -> Result<VmValue, VmError> {
    escalate_to_impl(Some(&ctx), &args).await
}
333
334pub(crate) fn reset_hitl_state() {
335 REQUEST_SEQUENCE.with(|slot| {
336 *slot.borrow_mut() = RequestSequenceState::default();
337 });
338}
339
340pub(crate) fn take_hitl_state() -> RequestSequenceState {
341 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
342}
343
344pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
345 REQUEST_SEQUENCE.with(|slot| {
346 *slot.borrow_mut() = state;
347 });
348}
349
350pub async fn append_hitl_response(
351 base_dir: Option<&Path>,
352 mut response: HitlHostResponse,
353) -> Result<u64, String> {
354 let kind = HitlRequestKind::from_request_id(&response.request_id)
355 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
356 if response.responded_at.is_none() {
357 response.responded_at = Some(now_rfc3339());
358 }
359 let log = ensure_hitl_event_log_for(base_dir)?;
360 let headers = response_headers(&response.request_id);
361 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
362 let event_id = log
363 .append(
364 &topic,
365 LogEvent::new(
366 match kind {
367 HitlRequestKind::Escalation => "hitl.escalation_accepted",
368 _ => "hitl.response_received",
369 },
370 serde_json::to_value(&response).map_err(|error| error.to_string())?,
371 )
372 .with_headers(headers),
373 )
374 .await
375 .map_err(|error| error.to_string())?;
376 finalize_hitl_response(&log, kind, &response).await?;
377 Ok(event_id)
378}
379
/// Appends an approval request for `action` to `log` on behalf of `agent` and returns
/// the new request id. It does not wait for a decision.
///
/// The request has a fixed quorum of one approver and a deadline
/// `HITL_APPROVAL_TIMEOUT_MS` after the request time. A waitpoint keyed by the request id
/// is created before the request event is appended, so a later
/// [`append_hitl_response`] can complete it.
///
/// NOTE(review): unlike the `request_approval` builtin path, this does not call
/// `emit_hitl_requested` and always records `run_id: None`. Confirm this is intended
/// for host-initiated requests.
///
/// # Errors
/// Returns a [`VmError`] if serializing the approval request, creating the waitpoint or
/// appending the request event fails.
pub async fn append_approval_request_on(
    log: &Arc<AnyEventLog>,
    agent: impl Into<String>,
    trace_id: impl Into<String>,
    action: impl Into<String>,
    detail: JsonValue,
    reviewers: Vec<String>,
) -> Result<String, VmError> {
    // The id still comes from the current dispatch context (if any), even though the
    // agent and trace id are supplied by the caller.
    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
    let trace_id = trace_id.into();
    let agent = agent.into();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.into(),
        detail.clone(),
        agent.clone(),
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(
        requested_at_time,
        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
    );
    // Already 1 from `ApprovalRequest::new`; kept explicit to mirror the payload's quorum.
    approval_request.approvers_required = 1;
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: None,
        requested_at: requested_at.clone(),
        // The structured request is embedded whole and also flattened into top-level
        // keys alongside the legacy `detail` / `quorum` / `reviewers` / `deadline_ms` fields.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": approval_request.action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": detail,
            "quorum": 1,
            "reviewers": reviewers,
            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
        }),
    };
    // Waitpoint first, so a response arriving right after the request event finds it.
    create_request_waitpoint(log, &request).await?;
    append_request(log, &request).await?;
    maybe_notify_host(None, &request);
    Ok(request_id)
}
437
/// Implements `ask_user(prompt, options?)`: records a question request and waits for
/// the host's answer.
///
/// Flow:
/// 1. Create the waitpoint and append the request event.
/// 2. Notify the host and emit the "requested" signal.
/// 3. Apply any mock response.
/// 4. Wait, passing `options.timeout` through as the wait timeout.
///
/// On completion, the answer is the waitpoint value, or `nil` when there is none.
/// It is handled in the first applicable way:
/// - if `options.schema` is set, it is passed through `schema_expect_value`;
/// - otherwise, if `options.default` is set, it is coerced with `coerce_like_default`;
/// - otherwise it is returned unchanged.
///
/// On timeout, a timeout record is appended. `options.default` is returned when
/// present; otherwise a timeout error is raised. Cancellation raises a HITL
/// cancellation error.
async fn ask_user_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let prompt = required_string_arg(args, 0, "ask_user")?;
    let options = parse_ask_user_options(args.get(1))?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
    // Reuse the dispatch's trace id when running under a trigger dispatch.
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Question,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "prompt": prompt,
            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
        }),
    };
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Question,
        options.timeout,
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            let answer = record
                .value
                .as_ref()
                .map(crate::stdlib::json_to_vm_value)
                .unwrap_or(VmValue::Nil);
            // A schema takes precedence over default-based coercion.
            if let Some(schema) = options.schema.as_ref() {
                return schema_expect_value(&answer, schema, true);
            }
            if let Some(default) = options.default.as_ref() {
                return Ok(coerce_like_default(&answer, default));
            }
            Ok(answer)
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
            if let Some(default) = options.default {
                return Ok(default);
            }
            Err(timeout_error(&request_id, HitlRequestKind::Question))
        }
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Question,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
515
/// Implements `request_approval(action, options?)`: records an approval request and
/// waits until it is approved, denied or times out.
///
/// Field defaults:
/// - The approval `args` come from `options.args`, falling back to `options.detail`.
/// - The principal defaults to the dispatching agent.
/// - `undo_metadata` defaults to the serialized current mutation session (or `null`).
///
/// The wait is bounded by `options.deadline`. On completion, returns the value built
/// by `approval_record_from_waitpoint`.
///
/// # Errors
/// - A timeout error, after appending a timeout record.
/// - The error built by `approval_wait_error` when the waitpoint is cancelled; a
///   denial cancels it.
async fn request_approval_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    // Explicit `args` win; otherwise the `detail` value doubles as the action arguments.
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        // Cancellation is how a denial surfaces; the error is derived from the log.
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
613
614pub(crate) async fn request_approval_for_side_effect(
615 action: &str,
616 detail: JsonValue,
617 principal: String,
618 reviewers: Vec<String>,
619 capabilities_requested: Vec<String>,
620) -> Result<VmValue, VmError> {
621 let mut options = BTreeMap::new();
622 options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
623 options.insert(
624 "detail".to_string(),
625 crate::stdlib::json_to_vm_value(&detail),
626 );
627 options.insert(
628 "principal".to_string(),
629 VmValue::String(std::sync::Arc::from(principal)),
630 );
631 options.insert(
632 "reviewers".to_string(),
633 VmValue::List(std::sync::Arc::new(
634 reviewers
635 .into_iter()
636 .map(|reviewer| VmValue::String(std::sync::Arc::from(reviewer)))
637 .collect(),
638 )),
639 );
640 options.insert(
641 "capabilities_requested".to_string(),
642 VmValue::List(std::sync::Arc::new(
643 capabilities_requested
644 .into_iter()
645 .map(|capability| VmValue::String(std::sync::Arc::from(capability)))
646 .collect(),
647 )),
648 );
649 let args = vec![
650 VmValue::String(std::sync::Arc::from(action.to_string())),
651 VmValue::Dict(std::sync::Arc::new(options)),
652 ];
653 request_approval_impl(None, &args).await
654}
655
656async fn dual_control_impl(ctx: &AsyncBuiltinCtx, args: &[VmValue]) -> Result<VmValue, VmError> {
657 let n = required_positive_int_arg(args, 0, "dual_control")?;
658 let m = required_positive_int_arg(args, 1, "dual_control")?;
659 if n > m {
660 return Err(VmError::Runtime(
661 "dual_control: n must be less than or equal to m".to_string(),
662 ));
663 }
664 let action = args
665 .get(2)
666 .and_then(|value| match value {
667 VmValue::Closure(closure) => Some(closure.clone()),
668 _ => None,
669 })
670 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
671 let approvers = optional_string_list(args.get(3), "dual_control")?;
672 if !approvers.is_empty() && approvers.len() < m as usize {
673 return Err(VmError::Runtime(format!(
674 "dual_control: expected at least {m} approvers, got {}",
675 approvers.len()
676 )));
677 }
678
679 let keys = current_dispatch_keys();
680 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
681 let trace_id = keys
682 .as_ref()
683 .map(|keys| keys.trace_id.clone())
684 .unwrap_or_else(new_trace_id);
685 let action_name = if action.func.name.is_empty() {
686 "anonymous".to_string()
687 } else {
688 action.func.name.clone()
689 };
690 let agent = keys
691 .as_ref()
692 .map(|keys| keys.agent.clone())
693 .unwrap_or_default();
694 let requested_at_time = OffsetDateTime::now_utc();
695 let requested_at = format_rfc3339(requested_at_time);
696 let mut approval_request = ApprovalRequest::new(
697 request_id.clone(),
698 action_name.clone(),
699 json!({
700 "n": n,
701 "m": m,
702 "approvers": approvers.clone(),
703 }),
704 agent.clone(),
705 requested_at.clone(),
706 );
707 approval_request.deadline = deadline_after(
708 requested_at_time,
709 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
710 );
711 approval_request.approvers_required = n as u32;
712 approval_request.undo_metadata = crate::orchestration::current_mutation_session()
713 .and_then(|session| serde_json::to_value(session).ok())
714 .unwrap_or(JsonValue::Null);
715 let approval_request_json = serde_json::to_value(&approval_request)
716 .map_err(|error| VmError::Runtime(error.to_string()))?;
717 let log = ensure_hitl_event_log();
718 let request = HitlRequestEnvelope {
719 request_id: request_id.clone(),
720 kind: HitlRequestKind::DualControl,
721 agent,
722 trace_id: trace_id.clone(),
723 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
724 requested_at: requested_at.clone(),
725 payload: json!({
726 "approval_request": approval_request_json,
727 "id": approval_request.id,
728 "args": approval_request.args,
729 "principal": approval_request.principal,
730 "requested_at": requested_at,
731 "deadline": approval_request.deadline,
732 "approvers_required": approval_request.approvers_required,
733 "evidence_refs": approval_request.evidence_refs,
734 "undo_metadata": approval_request.undo_metadata,
735 "capabilities_requested": approval_request.capabilities_requested,
736 "n": n,
737 "m": m,
738 "action": action_name,
739 "approvers": approvers,
740 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
741 }),
742 };
743 create_request_waitpoint(&log, &request).await?;
744 append_request(&log, &request).await?;
745 maybe_notify_host(Some(ctx), &request);
746 emit_hitl_requested(&request);
747 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
748
749 match wait_for_request_waitpoint_with_events(
750 &request_id,
751 HitlRequestKind::DualControl,
752 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
753 )
754 .await?
755 {
756 WaitpointOutcome::Completed(record) => {
757 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
758 let mut vm = ctx.child_vm();
759 let result = vm.call_closure_pub(&action, &[]).await?;
760 ctx.forward_output(&vm.take_output());
761
762 append_named_event(
763 &log,
764 HitlRequestKind::DualControl,
765 "hitl.dual_control_executed",
766 &request_id,
767 &trace_id,
768 json!({
769 "request_id": request_id,
770 "result": crate::llm::vm_value_to_json(&result),
771 }),
772 )
773 .await?;
774
775 Ok(result)
776 }
777 WaitpointOutcome::Timeout => {
778 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
779 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
780 }
781 WaitpointOutcome::Cancelled { .. } => {
782 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
783 }
784 }
785}
786
/// Implements `escalate_to(role, reason)`: records an escalation request and waits
/// until its waitpoint completes. No timeout is passed.
///
/// On completion, returns a dict with these keys:
/// - `request_id`, `role`, `reason` and `trace_id`;
/// - `status`: `"accepted"`, or `"pending"` if the completion value has `accepted: false`;
/// - `accepted_at` and `reviewer`, taken from the waitpoint record.
///
/// A timeout yields a timeout error. Unlike the other kinds, no timeout record is
/// appended. Cancellation yields a HITL cancellation error.
async fn escalate_to_impl(
    ctx: Option<&AsyncBuiltinCtx>,
    args: &[VmValue],
) -> Result<VmValue, VmError> {
    let role = required_string_arg(args, 0, "escalate_to")?;
    let reason = required_string_arg(args, 1, "escalate_to")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Escalation,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "role": role,
            "reason": reason,
            "capability_policy": escalation_capability_policy(),
        }),
    };
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(ctx, &request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
        .await?
    {
        WaitpointOutcome::Completed(record) => {
            let accepted_at = record.completed_at.clone();
            let reviewer = record.completed_by.clone();
            // `finalize_hitl_response` only completes escalations with `accepted: true`,
            // so a missing flag is treated as accepted.
            let accepted = record
                .value
                .as_ref()
                .and_then(|value| value.get("accepted"))
                .and_then(JsonValue::as_bool)
                .unwrap_or(true);
            Ok(crate::stdlib::json_to_vm_value(&json!({
                "request_id": request_id,
                "role": role,
                "reason": reason,
                "trace_id": trace_id,
                "status": if accepted { "accepted" } else { "pending" },
                "accepted_at": accepted_at,
                "reviewer": reviewer,
            })))
        }
        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Escalation,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
858
859async fn create_request_waitpoint(
860 log: &Arc<AnyEventLog>,
861 request: &HitlRequestEnvelope,
862) -> Result<(), VmError> {
863 create_waitpoint_on(
864 log,
865 Some(request.request_id.clone()),
866 Some(json!({
867 "kind": request.kind.as_str(),
868 "agent": request.agent.clone(),
869 "trace_id": request.trace_id.clone(),
870 "requested_at": request.requested_at.clone(),
871 "payload": request.payload.clone(),
872 })),
873 )
874 .await?;
875 Ok(())
876}
877
878async fn wait_for_request_waitpoint(
879 request_id: &str,
880 timeout: Option<StdDuration>,
881) -> Result<WaitpointOutcome, VmError> {
882 match wait_on_waitpoints(
883 vec![request_id.to_string()],
884 WaitpointWaitOptions { timeout },
885 )
886 .await
887 {
888 Ok(records) => Ok(WaitpointOutcome::Completed(
889 records
890 .into_iter()
891 .next()
892 .expect("single waitpoint wait result"),
893 )),
894 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
895 Err(WaitpointWaitFailure::Cancelled {
896 wait_id,
897 waitpoint_ids,
898 reason,
899 }) => Ok(WaitpointOutcome::Cancelled {
900 wait_id,
901 waitpoint_ids,
902 reason,
903 }),
904 Err(WaitpointWaitFailure::Vm(error)) => {
905 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
906 return Ok(outcome);
907 }
908 Err(error)
909 }
910 }
911}
912
913fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
914 let VmError::Thrown(VmValue::Dict(dict)) = error else {
915 return None;
916 };
917 let name = dict.get("name").and_then(vm_string)?;
918 match name {
919 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
920 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
921 wait_id: dict
922 .get("wait_id")
923 .and_then(vm_string)
924 .unwrap_or_default()
925 .to_string(),
926 waitpoint_ids: dict
927 .get("waitpoint_ids")
928 .and_then(vm_string_list)
929 .unwrap_or_default(),
930 reason: dict
931 .get("reason")
932 .and_then(vm_string)
933 .map(ToString::to_string),
934 }),
935 _ => None,
936 }
937}
938
/// Applies an already-logged host response to the waitpoint of its request.
///
/// Behaviour by request kind:
/// - Question: completes the waitpoint with `response.answer`.
/// - Escalation: completes the waitpoint only when `response.accepted == Some(true)`.
/// - Approval / DualControl: reloads the request envelope and recomputes the approval
///   state from the logged responses. Then:
///   - approved: appends the kind's "approved" event and completes the waitpoint with
///     the approval record;
///   - denied: appends the "denied" event and cancels the waitpoint;
///   - pending: does nothing.
///
/// If the waitpoint is already terminal (not `Open`), finalization is skipped, so late
/// or duplicate responses only remain as log entries.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // Declined or unspecified acceptance leaves the escalation open.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // Log the decision before releasing the waiter.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    // A denial is delivered to the waiter as a cancellation.
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
1053
1054async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1055 Ok(inspect_waitpoint_on(log, request_id)
1056 .await
1057 .map_err(|error| error.to_string())?
1058 .is_some_and(|record| record.status != WaitpointStatus::Open))
1059}
1060
1061async fn load_request_envelope(
1062 log: &Arc<AnyEventLog>,
1063 kind: HitlRequestKind,
1064 request_id: &str,
1065) -> Result<HitlRequestEnvelope, VmError> {
1066 let topic = topic(kind)?;
1067 let events = log
1068 .read_range(&topic, None, usize::MAX)
1069 .await
1070 .map_err(log_error)?;
1071 events
1072 .into_iter()
1073 .filter(|(_, event)| event.kind == kind.request_event_kind())
1074 .find_map(|(_, event)| {
1075 if !event_matches_request(&event, request_id) {
1076 return None;
1077 }
1078 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1079 })
1080 .ok_or_else(|| {
1081 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1082 })
1083}
1084
/// Replays the `hitl.response_received` events for `request` and decides
/// whether the approval (or dual-control request) is approved, denied, or
/// still pending.
///
/// Responses are processed in log order:
/// - responses from reviewers outside a non-empty allow-list are ignored;
/// - a reviewer's second and later responses are ignored;
/// - an approving response with a reviewer adds that reviewer plus a signature
///   (the supplied one, or a locally computed receipt hash) to the progress;
/// - the first non-approving response that survives the filters denies the
///   request immediately.
///
/// Once the number of distinct approving reviewers reaches the request's
/// quorum, `Approved(progress)` is returned. If the log is exhausted first,
/// the result is `Pending`.
///
/// # Errors
/// Returns `VmError::Runtime` when the quorum is invalid, the log cannot be
/// read, or a matching response event fails to deserialize.
async fn resolve_approval_state(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    request: &HitlRequestEnvelope,
) -> Result<ApprovalResolution, VmError> {
    let quorum = approval_quorum_from_request(kind, request)?;
    // An empty allow-list means "any reviewer may respond" (see the check below).
    let allowed_reviewers = approval_reviewers_from_request(kind, request)
        .into_iter()
        .collect::<BTreeSet<_>>();
    let mut progress = ApprovalProgress {
        request_id: request.request_id.clone(),
        reviewers: BTreeSet::new(),
        signatures: Vec::new(),
        reason: None,
        approved_at: None,
    };
    let topic = topic(kind)?;
    let events = log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(log_error)?;
    for (_, event) in events {
        if !event_matches_request(&event, &request.request_id)
            || event.kind != "hitl.response_received"
        {
            continue;
        }
        // NOTE(review): one malformed response event makes every resolution of
        // this request fail; confirm that hard failure is intended.
        let response: HitlHostResponse = serde_json::from_value(event.payload)
            .map_err(|error| VmError::Runtime(error.to_string()))?;
        // Reviewer filters only apply to attributed responses; anonymous
        // responses fall through to the approve/deny logic below.
        if let Some(reviewer) = response.reviewer.as_deref() {
            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
                continue;
            }
            // First response per reviewer wins; later ones (including a later
            // denial from someone who already approved) are ignored.
            if progress.reviewers.contains(reviewer) {
                continue;
            }
        }
        if response.approved.unwrap_or(false) {
            // Only attributed approvals count toward the quorum.
            if let Some(reviewer) = response.reviewer.clone() {
                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
                progress.reviewers.insert(reviewer.clone());
                progress.signatures.push(ApprovalSignature {
                    reviewer: reviewer.clone(),
                    signed_at: signed_at.clone(),
                    signature: response.signature.clone().unwrap_or_else(|| {
                        approval_receipt_signature(
                            &request.request_id,
                            &reviewer,
                            &signed_at,
                            true,
                            response.reason.as_deref(),
                        )
                    }),
                });
            }
            // The record reports the most recent approving response's reason
            // and timestamp, including anonymous ones.
            progress.reason = response.reason.clone();
            progress.approved_at = response.responded_at.clone();
            if progress.reviewers.len() as u32 >= quorum {
                return Ok(ApprovalResolution::Approved(progress));
            }
            continue;
        }
        // Any non-approving response (approved == false or absent) denies.
        return Ok(ApprovalResolution::Denied(response));
    }
    Ok(ApprovalResolution::Pending)
}
1151
1152fn approval_quorum_from_request(
1153 kind: HitlRequestKind,
1154 request: &HitlRequestEnvelope,
1155) -> Result<u32, VmError> {
1156 let key = match kind {
1157 HitlRequestKind::DualControl => "n",
1158 _ => "quorum",
1159 };
1160 let quorum = request
1161 .payload
1162 .get(key)
1163 .or_else(|| request.payload.get("approvers_required"))
1164 .or_else(|| {
1165 request
1166 .payload
1167 .get("approval_request")
1168 .and_then(|approval| approval.get("approvers_required"))
1169 })
1170 .and_then(JsonValue::as_u64)
1171 .unwrap_or(1);
1172 u32::try_from(quorum).map_err(|_| {
1173 VmError::Runtime(format!(
1174 "invalid quorum in HITL request '{}'",
1175 request.request_id
1176 ))
1177 })
1178}
1179
1180fn approval_reviewers_from_request(
1181 kind: HitlRequestKind,
1182 request: &HitlRequestEnvelope,
1183) -> Vec<String> {
1184 let key = match kind {
1185 HitlRequestKind::DualControl => "approvers",
1186 _ => "reviewers",
1187 };
1188 request
1189 .payload
1190 .get(key)
1191 .or_else(|| {
1192 request
1193 .payload
1194 .get("approval_request")
1195 .and_then(|approval| approval.get("reviewers"))
1196 })
1197 .and_then(JsonValue::as_array)
1198 .map(|values| {
1199 values
1200 .iter()
1201 .filter_map(JsonValue::as_str)
1202 .map(str::to_string)
1203 .collect()
1204 })
1205 .unwrap_or_default()
1206}
1207
1208fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1209 json!({
1210 "request_id": progress.request_id.clone(),
1211 "approved": true,
1212 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1213 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1214 "reason": progress.reason,
1215 "signatures": progress.signatures,
1216 })
1217}
1218
1219fn approval_receipt_signature(
1220 request_id: &str,
1221 reviewer: &str,
1222 signed_at: &str,
1223 approved: bool,
1224 reason: Option<&str>,
1225) -> String {
1226 let material = format!(
1227 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1228 reason.unwrap_or("")
1229 );
1230 let hash = sha2::Sha256::digest(material.as_bytes());
1231 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1232 format!("sha256:{hex}")
1233}
1234
1235fn approval_record_from_waitpoint(
1236 record: &WaitpointRecord,
1237 builtin: &str,
1238) -> Result<VmValue, VmError> {
1239 record
1240 .value
1241 .as_ref()
1242 .map(crate::stdlib::json_to_vm_value)
1243 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1244}
1245
1246async fn approval_wait_error(
1247 log: &Arc<AnyEventLog>,
1248 kind: HitlRequestKind,
1249 request_id: &str,
1250) -> VmError {
1251 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1252 if record.status == WaitpointStatus::Cancelled
1253 && record.reason.as_deref() != Some("upstream_cancelled")
1254 {
1255 return approval_denied_error(
1256 request_id,
1257 HitlHostResponse {
1258 request_id: request_id.to_string(),
1259 answer: None,
1260 approved: Some(false),
1261 accepted: None,
1262 reviewer: record.cancelled_by.clone(),
1263 reason: record.reason.clone(),
1264 metadata: record.metadata.clone(),
1265 responded_at: record.cancelled_at,
1266 signature: None,
1267 },
1268 );
1269 }
1270 if record.status == WaitpointStatus::Cancelled {
1271 return hitl_cancelled_error(
1272 request_id,
1273 kind,
1274 "",
1275 &[request_id.to_string()],
1276 record.reason,
1277 );
1278 }
1279 }
1280 hitl_cancelled_error(
1281 request_id,
1282 kind,
1283 "",
1284 &[request_id.to_string()],
1285 Some("upstream_cancelled".to_string()),
1286 )
1287}
1288
1289async fn append_timeout_once(
1290 log: &Arc<AnyEventLog>,
1291 kind: HitlRequestKind,
1292 request_id: &str,
1293 trace_id: &str,
1294) -> Result<(), VmError> {
1295 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1296 return Ok(());
1297 }
1298 append_timeout(log, kind, request_id, trace_id).await
1299}
1300
1301async fn hitl_event_exists(
1302 log: &Arc<AnyEventLog>,
1303 kind: HitlRequestKind,
1304 request_id: &str,
1305 event_kind: &str,
1306) -> Result<bool, VmError> {
1307 let topic = topic(kind)?;
1308 let events = log
1309 .read_range(&topic, None, usize::MAX)
1310 .await
1311 .map_err(log_error)?;
1312 Ok(events
1313 .into_iter()
1314 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1315}
1316
1317fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1318 match kind {
1319 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1320 _ => "hitl.approval_approved",
1321 }
1322}
1323
1324fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1325 match kind {
1326 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1327 _ => "hitl.approval_denied",
1328 }
1329}
1330
1331async fn append_request(
1332 log: &Arc<AnyEventLog>,
1333 request: &HitlRequestEnvelope,
1334) -> Result<(), VmError> {
1335 let topic = topic(request.kind)?;
1336 log.append(
1337 &topic,
1338 LogEvent::new(
1339 request.kind.request_event_kind(),
1340 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1341 )
1342 .with_headers(request_headers(request)),
1343 )
1344 .await
1345 .map(|_| ())
1346 .map_err(log_error)
1347}
1348
1349async fn append_named_event(
1350 log: &Arc<AnyEventLog>,
1351 kind: HitlRequestKind,
1352 event_kind: &str,
1353 request_id: &str,
1354 trace_id: &str,
1355 payload: JsonValue,
1356) -> Result<(), VmError> {
1357 let topic = topic(kind)?;
1358 let headers = headers_with_trace(request_id, trace_id);
1359 log.append(
1360 &topic,
1361 LogEvent::new(event_kind, payload).with_headers(headers),
1362 )
1363 .await
1364 .map(|_| ())
1365 .map_err(log_error)
1366}
1367
1368async fn append_timeout(
1369 log: &Arc<AnyEventLog>,
1370 kind: HitlRequestKind,
1371 request_id: &str,
1372 trace_id: &str,
1373) -> Result<(), VmError> {
1374 append_named_event(
1375 log,
1376 kind,
1377 "hitl.timeout",
1378 request_id,
1379 trace_id,
1380 serde_json::to_value(HitlTimeoutRecord {
1381 request_id: request_id.to_string(),
1382 kind,
1383 trace_id: trace_id.to_string(),
1384 timed_out_at: now_rfc3339(),
1385 })
1386 .map_err(|error| VmError::Runtime(error.to_string()))?,
1387 )
1388 .await
1389}
1390
1391async fn maybe_apply_mock_response(
1392 kind: HitlRequestKind,
1393 request_id: &str,
1394 request_payload: &JsonValue,
1395) -> Result<(), VmError> {
1396 let mut params = request_payload
1397 .as_object()
1398 .cloned()
1399 .unwrap_or_default()
1400 .into_iter()
1401 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1402 .collect::<BTreeMap<_, _>>();
1403 params.insert(
1404 "request_id".to_string(),
1405 VmValue::String(std::sync::Arc::from(request_id.to_string())),
1406 );
1407 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1408 return Ok(());
1409 };
1410 let value = result?;
1411 let responses = match value {
1412 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1413 other => vec![other],
1414 };
1415 for response in responses {
1416 let response_dict = response.as_dict().ok_or_else(|| {
1417 VmError::Runtime(format!(
1418 "mocked HITL {} response must be a dict or list<dict>",
1419 kind.as_str()
1420 ))
1421 })?;
1422 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1423 append_hitl_response(None, hitl_response)
1424 .await
1425 .map_err(VmError::Runtime)?;
1426 }
1427 Ok(())
1428}
1429
1430fn parse_hitl_response_dict(
1431 request_id: &str,
1432 response_dict: &BTreeMap<String, VmValue>,
1433) -> Result<HitlHostResponse, VmError> {
1434 Ok(HitlHostResponse {
1435 request_id: request_id.to_string(),
1436 answer: response_dict
1437 .get("answer")
1438 .map(crate::llm::vm_value_to_json),
1439 approved: response_dict.get("approved").and_then(vm_bool),
1440 accepted: response_dict.get("accepted").and_then(vm_bool),
1441 reviewer: response_dict.get("reviewer").map(VmValue::display),
1442 reason: response_dict.get("reason").map(VmValue::display),
1443 metadata: response_dict
1444 .get("metadata")
1445 .map(crate::llm::vm_value_to_json),
1446 responded_at: response_dict.get("responded_at").map(VmValue::display),
1447 signature: response_dict.get("signature").map(VmValue::display),
1448 })
1449}
1450
1451fn maybe_notify_host(ctx: Option<&AsyncBuiltinCtx>, request: &HitlRequestEnvelope) {
1452 let Some(bridge) = ctx.and_then(|ctx| ctx.child_vm().bridge.clone()) else {
1453 return;
1454 };
1455 bridge.notify(
1456 "harn.hitl.requested",
1457 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1458 );
1459}
1460
1461fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1468 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1469 return;
1470 };
1471 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1472 session_id,
1473 request_id: request.request_id.clone(),
1474 kind: request.kind.as_str().to_string(),
1475 payload: request.payload.clone(),
1476 });
1477}
1478
1479fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1484 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1485 return;
1486 };
1487 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1488 session_id,
1489 request_id: request_id.to_string(),
1490 kind: kind.as_str().to_string(),
1491 outcome: outcome.to_string(),
1492 });
1493}
1494
1495async fn wait_for_request_waitpoint_with_events(
1502 request_id: &str,
1503 kind: HitlRequestKind,
1504 timeout: Option<StdDuration>,
1505) -> Result<WaitpointOutcome, VmError> {
1506 let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1507 let label = match &outcome {
1508 Ok(WaitpointOutcome::Completed(_)) => "answered",
1509 Ok(WaitpointOutcome::Timeout) => "timeout",
1510 Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1511 Err(_) => "error",
1512 };
1513 emit_hitl_resolved(request_id, kind, label);
1514 outcome
1515}
1516
1517fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1518 let Some(value) = value else {
1519 return Ok(AskUserOptions {
1520 schema: None,
1521 timeout: Some(default_question_timeout()),
1522 default: None,
1523 });
1524 };
1525 let dict = value
1526 .as_dict()
1527 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1528 Ok(AskUserOptions {
1529 schema: dict
1530 .get("schema")
1531 .cloned()
1532 .filter(|value| !matches!(value, VmValue::Nil)),
1533 timeout: dict
1534 .get("timeout")
1535 .map(parse_duration_value)
1536 .transpose()?
1537 .or_else(|| Some(default_question_timeout())),
1538 default: dict
1539 .get("default")
1540 .cloned()
1541 .filter(|value| !matches!(value, VmValue::Nil)),
1542 })
1543}
1544
1545fn default_question_timeout() -> StdDuration {
1546 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1547}
1548
1549fn escalation_capability_policy() -> JsonValue {
1550 crate::orchestration::current_execution_policy()
1551 .and_then(|policy| serde_json::to_value(policy).ok())
1552 .unwrap_or(JsonValue::Null)
1553}
1554
1555fn parse_approval_options(
1556 value: Option<&VmValue>,
1557 builtin: &str,
1558) -> Result<ApprovalOptions, VmError> {
1559 let dict = match value {
1560 None => None,
1561 Some(VmValue::Dict(dict)) => Some(dict),
1562 Some(_) => {
1563 return Err(VmError::Runtime(format!(
1564 "{builtin}: options must be a dict"
1565 )))
1566 }
1567 };
1568 let quorum = dict
1569 .and_then(|dict| dict.get("quorum"))
1570 .and_then(VmValue::as_int)
1571 .unwrap_or(1);
1572 if quorum <= 0 {
1573 return Err(VmError::Runtime(format!(
1574 "{builtin}: quorum must be positive"
1575 )));
1576 }
1577 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1578 let capabilities_requested = optional_string_list(
1579 dict.and_then(|dict| dict.get("capabilities_requested")),
1580 builtin,
1581 )?;
1582 let evidence_refs = dict
1583 .and_then(|dict| dict.get("evidence_refs"))
1584 .map(|value| match value {
1585 VmValue::List(items) => Ok(items
1586 .iter()
1587 .map(crate::llm::vm_value_to_json)
1588 .collect::<Vec<_>>()),
1589 _ => Err(VmError::Runtime(format!(
1590 "{builtin}: evidence_refs must be a list"
1591 ))),
1592 })
1593 .transpose()?
1594 .unwrap_or_default();
1595 let deadline = dict
1596 .and_then(|dict| dict.get("deadline"))
1597 .map(parse_duration_value)
1598 .transpose()?
1599 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1600 Ok(ApprovalOptions {
1601 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1602 args: dict.and_then(|dict| dict.get("args")).cloned(),
1603 quorum: quorum as u32,
1604 reviewers,
1605 deadline,
1606 principal: dict
1607 .and_then(|dict| dict.get("principal"))
1608 .map(VmValue::display)
1609 .filter(|value| !value.is_empty()),
1610 evidence_refs,
1611 undo_metadata: dict
1612 .and_then(|dict| dict.get("undo_metadata"))
1613 .map(crate::llm::vm_value_to_json),
1614 capabilities_requested,
1615 })
1616}
1617
1618fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1619 args.get(idx)
1620 .map(VmValue::display)
1621 .filter(|value| !value.is_empty())
1622 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1623}
1624
1625fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1626 let value = args
1627 .get(idx)
1628 .and_then(VmValue::as_int)
1629 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1630 if value <= 0 {
1631 return Err(VmError::Runtime(format!(
1632 "{builtin}: expected a positive int at {idx}"
1633 )));
1634 }
1635 Ok(value)
1636}
1637
1638fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1639 let Some(value) = value else {
1640 return Ok(Vec::new());
1641 };
1642 match value {
1643 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1644 _ => Err(VmError::Runtime(format!(
1645 "{builtin}: expected list<string>"
1646 ))),
1647 }
1648}
1649
1650fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1651 duration_from_value(value, "hitl", "timeout", ErrorKind::Runtime)
1652}
1653
1654fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1655 active_event_log()
1656 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1657}
1658
1659fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1660 if let Some(log) = active_event_log() {
1661 return Ok(log);
1662 }
1663 let Some(base_dir) = base_dir else {
1664 return Ok(install_memory_for_current_thread(
1665 HITL_EVENT_LOG_QUEUE_DEPTH,
1666 ));
1667 };
1668 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1669}
1670
1671fn current_dispatch_keys() -> Option<DispatchKeys> {
1672 let context = current_dispatch_context()?;
1673 let stable_base = context
1674 .replay_of_event_id
1675 .clone()
1676 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1677 let instance_key = format!(
1678 "{}::{}",
1679 context.trigger_event.id.0,
1680 context.replay_of_event_id.as_deref().unwrap_or("live")
1681 );
1682 Some(DispatchKeys {
1683 instance_key,
1684 stable_base,
1685 agent: context.agent_id,
1686 trace_id: context.trigger_event.trace_id.0,
1687 })
1688}
1689
1690fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1691 if let Some(keys) = dispatch_keys {
1692 let seq = REQUEST_SEQUENCE.with(|slot| {
1693 let mut state = slot.borrow_mut();
1694 if state.instance_key != keys.instance_key {
1695 state.instance_key = keys.instance_key.clone();
1696 state.next_seq = 0;
1697 }
1698 state.next_seq += 1;
1699 state.next_seq
1700 });
1701 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1702 }
1703 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1704}
1705
1706fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1707 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1708 if let Some(run_id) = request.run_id.as_ref() {
1709 headers.insert("run_id".to_string(), run_id.clone());
1710 }
1711 headers
1712}
1713
/// Minimal event headers identifying the request an event belongs to.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1719
1720fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1721 let mut headers = response_headers(request_id);
1722 headers.insert("trace_id".to_string(), trace_id.to_string());
1723 headers
1724}
1725
1726fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1727 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1728}
1729
1730fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1731 event
1732 .headers
1733 .get("request_id")
1734 .is_some_and(|value| value == request_id)
1735 || event
1736 .payload
1737 .get("request_id")
1738 .and_then(JsonValue::as_str)
1739 .is_some_and(|value| value == request_id)
1740}
1741
1742fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1743 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1744 "name": "ApprovalDeniedError",
1745 "category": "generic",
1746 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1747 "request_id": request_id,
1748 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1749 "reason": response.reason,
1750 })))
1751}
1752
1753fn hitl_cancelled_error(
1754 request_id: &str,
1755 kind: HitlRequestKind,
1756 wait_id: &str,
1757 waitpoint_ids: &[String],
1758 reason: Option<String>,
1759) -> VmError {
1760 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1761 let message = reason
1762 .clone()
1763 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1764 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1765 "name": "HumanCancelledError",
1766 "category": ErrorCategory::Cancelled.as_str(),
1767 "message": message,
1768 "request_id": request_id,
1769 "kind": kind.as_str(),
1770 "wait_id": wait_id,
1771 "waitpoint_ids": waitpoint_ids,
1772 "reason": reason,
1773 })))
1774}
1775
1776fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1777 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1778 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1779 "name": "HumanTimeoutError",
1780 "category": ErrorCategory::Timeout.as_str(),
1781 "message": format!("{} timed out", kind.as_str()),
1782 "request_id": request_id,
1783 "kind": kind.as_str(),
1784 })))
1785}
1786
1787fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1788 match default {
1789 VmValue::Int(_) => match value {
1790 VmValue::Int(_) => value.clone(),
1791 VmValue::Float(number) => VmValue::Int(*number as i64),
1792 VmValue::String(text) => text
1793 .parse::<i64>()
1794 .map(VmValue::Int)
1795 .unwrap_or_else(|_| default.clone()),
1796 _ => default.clone(),
1797 },
1798 VmValue::Float(_) => match value {
1799 VmValue::Float(_) => value.clone(),
1800 VmValue::Int(number) => VmValue::Float(*number as f64),
1801 VmValue::String(text) => text
1802 .parse::<f64>()
1803 .map(VmValue::Float)
1804 .unwrap_or_else(|_| default.clone()),
1805 _ => default.clone(),
1806 },
1807 VmValue::Bool(_) => match value {
1808 VmValue::Bool(_) => value.clone(),
1809 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1810 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1811 _ => default.clone(),
1812 },
1813 VmValue::String(_) => VmValue::String(std::sync::Arc::from(value.display())),
1814 VmValue::Duration(_) => match value {
1815 VmValue::Duration(_) => value.clone(),
1816 VmValue::Int(ms) => VmValue::Duration(*ms),
1817 _ => default.clone(),
1818 },
1819 VmValue::Nil => value.clone(),
1820 _ => {
1821 if value.type_name() == default.type_name() {
1822 value.clone()
1823 } else {
1824 default.clone()
1825 }
1826 }
1827 }
1828}
1829
1830fn log_error(error: impl std::fmt::Display) -> VmError {
1831 VmError::Runtime(error.to_string())
1832}
1833
1834fn now_rfc3339() -> String {
1835 format_rfc3339(OffsetDateTime::now_utc())
1836}
1837
1838fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1839 timestamp
1840 .format(&Rfc3339)
1841 .unwrap_or_else(|_| timestamp.to_string())
1842}
1843
1844fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1845 time::Duration::try_from(duration)
1846 .ok()
1847 .map(|duration| format_rfc3339(requested_at + duration))
1848}
1849
1850fn new_trace_id() -> String {
1851 format!("trace_{}", Uuid::now_v7())
1852}
1853
1854fn vm_bool(value: &VmValue) -> Option<bool> {
1855 match value {
1856 VmValue::Bool(flag) => Some(*flag),
1857 _ => None,
1858 }
1859}
1860
1861fn vm_string(value: &VmValue) -> Option<&str> {
1862 match value {
1863 VmValue::String(text) => Some(text.as_ref()),
1864 _ => None,
1865 }
1866}
1867
1868fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1869 match value {
1870 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1871 _ => None,
1872 }
1873}
1874
1875#[cfg(test)]
1876mod tests {
1877 use std::sync::OnceLock;
1878
1879 use tokio::sync::Mutex;
1880
1881 use super::{
1882 HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1883 };
1884 use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1885 use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1886
1887 fn hitl_lock() -> &'static Mutex<()> {
1898 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
1899 LOCK.get_or_init(|| Mutex::new(()))
1900 }
1901
1902 async fn execute_hitl_script(
1903 base_dir: &std::path::Path,
1904 source: &str,
1905 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1906 reset_thread_local_state();
1907 let log = install_default_for_base_dir(base_dir).expect("install event log");
1908 let chunk = compile_source(source).expect("compile source");
1909 let mut vm = Vm::new();
1910 register_vm_stdlib(&mut vm);
1911 vm.set_source_dir(base_dir);
1912 vm.execute(&chunk).await?;
1913 let output = vm.output().trim_end().to_string();
1914 let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1915 let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1916 let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1917 let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1918 Ok((
1919 output,
1920 question_events,
1921 approval_events,
1922 dual_control_events,
1923 escalation_events,
1924 ))
1925 }
1926
1927 async fn event_kinds(
1928 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1929 topic: &str,
1930 ) -> Vec<String> {
1931 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1932 .await
1933 .expect("read topic")
1934 .into_iter()
1935 .map(|(_, event)| event.kind)
1936 .collect()
1937 }
1938
1939 async fn event_payloads(
1940 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1941 topic: &str,
1942 ) -> Vec<serde_json::Value> {
1943 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1944 .await
1945 .expect("read topic")
1946 .into_iter()
1947 .map(|(_, event)| event.payload)
1948 .collect()
1949 }
1950
1951 #[tokio::test(flavor = "current_thread")]
1952 async fn ask_user_coerces_to_default_type_and_logs_events() {
1953 tokio::task::LocalSet::new()
1954 .run_until(async {
1955 let dir = tempfile::tempdir().expect("tempdir");
1956 let source = r#"
1957pipeline test(task) {
1958 host_mock("hitl", "question", {answer: "9"})
1959 let answer: int = ask_user("Pick a number", {default: 0})
1960 __io_println(answer)
1961}
1962"#;
1963 let (
1964 output,
1965 question_events,
1966 approval_events,
1967 dual_control_events,
1968 escalation_events,
1969 ) = execute_hitl_script(dir.path(), source)
1970 .await
1971 .expect("script succeeds");
1972 assert_eq!(output, "9");
1973 assert_eq!(
1974 question_events,
1975 vec![
1976 "hitl.question_asked".to_string(),
1977 "hitl.response_received".to_string()
1978 ]
1979 );
1980 assert!(approval_events.is_empty());
1981 assert!(dual_control_events.is_empty());
1982 assert!(escalation_events.is_empty());
1983 })
1984 .await;
1985 }
1986
1987 #[tokio::test(flavor = "current_thread")]
1988 async fn request_approval_waits_for_quorum_and_emits_a_record() {
1989 let _guard = hitl_lock().lock().await;
1990 tokio::task::LocalSet::new()
1991 .run_until(async {
1992 reset_thread_local_state();
1993 let dir = tempfile::tempdir().expect("tempdir");
1994 let source = r#"
1995pipeline test(task) {
1996 host_mock("hitl", "approval", [
1997 {approved: true, reviewer: "alice", reason: "ok"},
1998 {approved: true, reviewer: "bob", reason: "ship it"},
1999 ])
2000 let record = request_approval(
2001 "deploy production",
2002 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
2003 )
2004 __io_println(record.approved)
2005 __io_println(len(record.reviewers))
2006 __io_println(record.reviewers[0])
2007 __io_println(record.reviewers[1])
2008}
2009"#;
2010 let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2011 .await
2012 .expect("script succeeds");
2013 assert_eq!(
2014 approval_events,
2015 vec![
2016 "hitl.approval_requested".to_string(),
2017 "hitl.response_received".to_string(),
2018 "hitl.response_received".to_string(),
2019 "hitl.approval_approved".to_string(),
2020 ]
2021 );
2022 })
2023 .await;
2024 }
2025
2026 #[tokio::test(flavor = "current_thread")]
2027 async fn request_approval_emits_canonical_approval_request_payload() {
2028 tokio::task::LocalSet::new()
2029 .run_until(async {
2030 reset_thread_local_state();
2031 let dir = tempfile::tempdir().expect("tempdir");
2032 let log = install_default_for_base_dir(dir.path()).expect("install event log");
2033 let source = r#"
2034pipeline test(task) {
2035 host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
2036 request_approval("deploy production", {
2037 args: {environment: "prod"},
2038 quorum: 1,
2039 reviewers: ["alice"],
2040 evidence_refs: [{kind: "run", uri: "run_123"}],
2041 undo_metadata: {strategy: "rollback"},
2042 capabilities_requested: ["deploy.production"],
2043 })
2044}
2045"#;
2046 let chunk = compile_source(source).expect("compile source");
2047 let mut vm = Vm::new();
2048 register_vm_stdlib(&mut vm);
2049 vm.set_source_dir(dir.path());
2050 vm.execute(&chunk).await.expect("script succeeds");
2051
2052 let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
2053 let request_payload = &payloads[0]["payload"];
2054 let approval_request = &request_payload["approval_request"];
2055 assert_eq!(approval_request["id"], request_payload["id"]);
2056 assert_eq!(approval_request["action"], "deploy production");
2057 assert_eq!(approval_request["args"]["environment"], "prod");
2058 assert_eq!(approval_request["approvers_required"], 1);
2059 assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
2060 assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
2061 assert_eq!(
2062 approval_request["capabilities_requested"][0],
2063 "deploy.production"
2064 );
2065 assert!(approval_request["requested_at"].as_str().is_some());
2066 assert!(approval_request["deadline"].as_str().is_some());
2067 })
2068 .await;
2069 }
2070
2071 #[tokio::test(flavor = "current_thread")]
2072 async fn request_approval_surfaces_denials_as_typed_errors() {
2073 let _guard = hitl_lock().lock().await;
2074 tokio::task::LocalSet::new()
2075 .run_until(async {
2076 reset_thread_local_state();
2077 let dir = tempfile::tempdir().expect("tempdir");
2078 let source = r#"
2079pipeline test(task) {
2080 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
2081 let denied = try {
2082 request_approval("drop table", {reviewers: ["alice"]})
2083 }
2084 __io_println(is_err(denied))
2085 __io_println(unwrap_err(denied).name)
2086 __io_println(unwrap_err(denied).reason)
2087}
2088"#;
2089 let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2090 .await
2091 .expect("script succeeds");
2092 assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
2093 assert_eq!(
2094 approval_events,
2095 vec![
2096 "hitl.approval_requested".to_string(),
2097 "hitl.response_received".to_string(),
2098 "hitl.approval_denied".to_string(),
2099 ]
2100 );
2101 })
2102 .await;
2103 }
2104
2105 #[tokio::test(flavor = "current_thread")]
2106 async fn dual_control_executes_action_after_quorum() {
2107 tokio::task::LocalSet::new()
2108 .run_until(async {
2109 let dir = tempfile::tempdir().expect("tempdir");
2110 let source = r#"
2111pipeline test(task) {
2112 host_mock("hitl", "dual_control", [
2113 {approved: true, reviewer: "alice"},
2114 {approved: true, reviewer: "bob"},
2115 ])
2116 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2117 __io_println(result)
2118}
2119"#;
2120 let (output, _, _, dual_control_events, _) =
2121 execute_hitl_script(dir.path(), source)
2122 .await
2123 .expect("script succeeds");
2124 assert_eq!(output, "launched");
2125 assert_eq!(
2126 dual_control_events,
2127 vec![
2128 "hitl.dual_control_requested".to_string(),
2129 "hitl.response_received".to_string(),
2130 "hitl.response_received".to_string(),
2131 "hitl.dual_control_approved".to_string(),
2132 "hitl.dual_control_executed".to_string(),
2133 ]
2134 );
2135 })
2136 .await;
2137 }
2138
2139 #[tokio::test(flavor = "current_thread")]
2140 async fn escalate_to_waits_for_acceptance_event() {
2141 tokio::task::LocalSet::new()
2142 .run_until(async {
2143 let dir = tempfile::tempdir().expect("tempdir");
2144 let source = r#"
2145pipeline test(task) {
2146 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2147 let handle = escalate_to("admin", "need override")
2148 __io_println(handle.status)
2149 __io_println(handle.reviewer)
2150}
2151"#;
2152 let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2153 .await
2154 .expect("script succeeds");
2155 assert_eq!(output, "accepted\nlead");
2156 assert_eq!(
2157 escalation_events,
2158 vec![
2159 "hitl.escalation_issued".to_string(),
2160 "hitl.escalation_accepted".to_string(),
2161 ]
2162 );
2163 })
2164 .await;
2165 }
2166
2167 #[tokio::test(flavor = "current_thread")]
2173 async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2174 use std::sync::Mutex as StdMutex;
2175
2176 tokio::task::LocalSet::new()
2177 .run_until(async {
2178 let dir = tempfile::tempdir().expect("tempdir");
2179 let session_id = "hitl-session".to_string();
2180 let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2181 std::sync::Arc::new(StdMutex::new(Vec::new()));
2182
2183 struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2184 impl crate::agent_events::AgentEventSink for CaptureSink {
2185 fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2186 self.0.lock().expect("captured").push(event.clone());
2187 }
2188 }
2189
2190 crate::reset_thread_local_state();
2196 crate::event_log::install_default_for_base_dir(dir.path())
2197 .expect("install event log");
2198
2199 crate::agent_events::reset_all_sinks();
2200 let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2201 std::sync::Arc::new(CaptureSink(captured.clone()));
2202 crate::agent_events::register_sink(session_id.clone(), sink);
2203 crate::agent_sessions::open_or_create(Some(session_id.clone()));
2204 let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2205
2206 let source = r#"
2207pipeline test(task) {
2208 host_mock("hitl", "question", {answer: "ok"})
2209 let answer: string = ask_user("Are you sure?", {default: "no"})
2210 __io_println(answer)
2211}
2212"#;
2213 let chunk = crate::compile_source(source).expect("compile source");
2214 let mut vm = Vm::new();
2215 register_vm_stdlib(&mut vm);
2216 vm.set_source_dir(dir.path());
2217 vm.execute(&chunk).await.expect("script runs");
2218 assert_eq!(vm.output().trim_end(), "ok");
2219
2220 let events = captured.lock().expect("captured");
2221 let mut iter = events.iter().filter(|event| {
2222 matches!(
2223 event,
2224 crate::agent_events::AgentEvent::HitlRequested { .. }
2225 | crate::agent_events::AgentEvent::HitlResolved { .. }
2226 )
2227 });
2228 let requested = iter.next().expect("HitlRequested emitted");
2229 let resolved = iter.next().expect("HitlResolved emitted");
2230 assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2231
2232 let crate::agent_events::AgentEvent::HitlRequested {
2233 session_id: req_session,
2234 request_id: req_id,
2235 kind: req_kind,
2236 payload,
2237 } = requested
2238 else {
2239 panic!("expected HitlRequested, got: {requested:?}");
2240 };
2241 assert_eq!(req_session, &session_id);
2242 assert_eq!(req_kind, "question");
2243 assert!(req_id.starts_with("hitl_question_"));
2244 assert_eq!(payload["prompt"], "Are you sure?");
2245
2246 let crate::agent_events::AgentEvent::HitlResolved {
2247 request_id: res_id,
2248 kind: res_kind,
2249 outcome,
2250 ..
2251 } = resolved
2252 else {
2253 panic!("expected HitlResolved, got: {resolved:?}");
2254 };
2255 assert_eq!(res_id, req_id);
2256 assert_eq!(res_kind, "question");
2257 assert_eq!(outcome, "answered");
2258
2259 drop(_guard);
2260 crate::agent_events::reset_all_sinks();
2261 })
2262 .await;
2263 }
2264}