1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::runtime_limits::RuntimeLimits;
20use crate::schema::schema_expect_value;
21use crate::stdlib::host::dispatch_mock_host_call;
22use crate::stdlib::macros::{harn_builtin, BuiltinSignature, Param, VmBuiltinDef, TY_ANY, TY_DICT};
23use crate::stdlib::waitpoint::{
24 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
25 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
26 WaitpointWaitOptions,
27};
28use crate::triggers::dispatcher::current_dispatch_context;
29use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
30use crate::vm::{clone_async_builtin_child_vm, Vm};
31
32const HITL_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
33const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
34const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
35
36pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
37pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
38pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
39pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
40
thread_local! {
    /// Per-thread sequence state for HITL request ids.
    ///
    /// Moved out by `take_hitl_state`, put back by `restore_hitl_state`, and
    /// cleared to its default by `reset_hitl_state`.
    /// NOTE(review): presumably read/advanced by `next_request_id` (not in view).
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
44
/// Sequence bookkeeping used to mint HITL request ids on the current thread.
///
/// The `Default` value (empty `instance_key`, `next_seq == 0`) is what
/// `reset_hitl_state` installs.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Key of the instance this sequence belongs to.
    /// NOTE(review): presumably compared with `DispatchKeys::instance_key` to restart numbering — confirm.
    pub(crate) instance_key: String,
    /// Next sequence number to hand out.
    /// NOTE(review): assumed to be consumed by `next_request_id` — confirm.
    pub(crate) next_seq: u64,
}
50
/// The kinds of human-in-the-loop request issued by this module.
///
/// Serialized in `snake_case` (e.g. `"dual_control"`), which matches the
/// labels returned by `HitlRequestKind::as_str`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// A free-form question raised by `ask_user`.
    Question,
    /// An approval raised by `request_approval` or `append_approval_request_on`.
    Approval,
    /// An n-of-m approval gating a closure, raised by `dual_control`.
    DualControl,
    /// An escalation to a role, raised by `escalate_to`.
    Escalation,
}
59
60impl HitlRequestKind {
61 pub(crate) fn as_str(self) -> &'static str {
62 match self {
63 Self::Question => "question",
64 Self::Approval => "approval",
65 Self::DualControl => "dual_control",
66 Self::Escalation => "escalation",
67 }
68 }
69
70 fn topic(self) -> &'static str {
71 match self {
72 Self::Question => HITL_QUESTIONS_TOPIC,
73 Self::Approval => HITL_APPROVALS_TOPIC,
74 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
75 Self::Escalation => HITL_ESCALATIONS_TOPIC,
76 }
77 }
78
79 fn request_event_kind(self) -> &'static str {
80 match self {
81 Self::Question => "hitl.question_asked",
82 Self::Approval => "hitl.approval_requested",
83 Self::DualControl => "hitl.dual_control_requested",
84 Self::Escalation => "hitl.escalation_issued",
85 }
86 }
87
88 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
89 if request_id.starts_with("hitl_question_") {
90 Some(Self::Question)
91 } else if request_id.starts_with("hitl_approval_") {
92 Some(Self::Approval)
93 } else if request_id.starts_with("hitl_dual_control_") {
94 Some(Self::DualControl)
95 } else if request_id.starts_with("hitl_escalation_") {
96 Some(Self::Escalation)
97 } else {
98 None
99 }
100 }
101}
102
/// A host's or reviewer's reply to a HITL request, as appended to the event
/// log by `append_hitl_response`.
///
/// Which fields matter depends on the request kind (derived from the
/// `request_id` prefix): questions use `answer`, escalations use `accepted`,
/// and approval / dual-control flows read `approved` and `reviewer`.
/// `None` fields are omitted when serialized.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; must start with a known `hitl_<kind>_` prefix.
    pub request_id: String,
    /// Answer for a question request; becomes the question waitpoint's value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval verdict; a missing value is treated as `false` when counting approvals.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation acceptance; the escalation waitpoint completes only when this is `Some(true)`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Responder identity; approval flows filter and de-duplicate responses by it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason, forwarded to waitpoint completion/cancellation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Opaque metadata, forwarded to waitpoint completion/cancellation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills in the current time when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    /// Responder-supplied signature.
    /// NOTE(review): not read in the visible code — confirm how it is verified.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
123
/// Request record written when a HITL request is raised.
///
/// `create_request_waitpoint` mirrors it into the waitpoint's metadata, and
/// `load_request_envelope` reads it back from the kind's topic when a response
/// arrives.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Request id (`hitl_<kind>_...`), also used as the waitpoint id.
    request_id: String,
    /// Kind of request.
    kind: HitlRequestKind,
    /// Agent from the dispatch context; empty when unknown or absent on deserialize.
    #[serde(default)]
    agent: String,
    /// Trace id from the dispatch context, or freshly generated.
    trace_id: String,
    /// Run id of the current mutation session, if any; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Creation timestamp string.
    requested_at: String,
    /// Kind-specific request body (prompt, approval request, role/reason, ...).
    payload: JsonValue,
}
136
/// Serialized record describing a HITL request that timed out.
///
/// NOTE(review): not constructed in the visible code; presumably written by
/// `append_timeout_once` — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the timed-out request.
    kind: HitlRequestKind,
    /// Trace id of the timed-out request.
    trace_id: String,
    /// When the timeout was recorded.
    timed_out_at: String,
}
144
/// Structured description of an action awaiting approval.
///
/// Serialized under the `approval_request` key of approval and dual-control
/// request payloads.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalRequest {
    /// Request id; the same id as the enclosing HITL request.
    pub id: String,
    /// Name of the action being approved.
    pub action: String,
    /// Action arguments; deserializes to JSON `null` when missing.
    #[serde(default)]
    pub args: JsonValue,
    /// Who is asking (defaults to the dispatching agent in this module).
    pub principal: String,
    /// Creation timestamp string.
    pub requested_at: String,
    /// Optional deadline timestamp; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of distinct approvals required (quorum).
    pub approvers_required: u32,
    /// Supporting evidence references; empty when missing.
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    /// Metadata describing how to undo the action; `null` when missing.
    #[serde(default)]
    pub undo_metadata: JsonValue,
    /// Capabilities the action asks for; empty when missing.
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
163
164impl ApprovalRequest {
165 pub fn new(
166 id: impl Into<String>,
167 action: impl Into<String>,
168 args: JsonValue,
169 principal: impl Into<String>,
170 requested_at: impl Into<String>,
171 ) -> Self {
172 Self {
173 id: id.into(),
174 action: action.into(),
175 args,
176 principal: principal.into(),
177 requested_at: requested_at.into(),
178 deadline: None,
179 approvers_required: 1,
180 evidence_refs: Vec::new(),
181 undo_metadata: JsonValue::Null,
182 capabilities_requested: Vec::new(),
183 }
184 }
185}
186
187pub(crate) fn approval_request_for_host_permission(
188 id: impl Into<String>,
189 action: impl Into<String>,
190 args: JsonValue,
191 principal: impl Into<String>,
192 evidence_refs: Vec<JsonValue>,
193 undo_metadata: JsonValue,
194 capabilities_requested: Vec<String>,
195) -> ApprovalRequest {
196 let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
197 request.evidence_refs = evidence_refs;
198 request.undo_metadata = undo_metadata;
199 request.capabilities_requested = capabilities_requested;
200 request
201}
202
/// Identifiers taken from the active trigger dispatch, when there is one.
///
/// Produced by `current_dispatch_keys` (not in view). The HITL builtins read
/// `agent` and `trace_id` directly and pass the whole value to
/// `next_request_id`.
#[derive(Clone, Debug)]
struct DispatchKeys {
    /// NOTE(review): presumably compared with `RequestSequenceState::instance_key` — confirm.
    instance_key: String,
    /// NOTE(review): presumably the stem for deterministic request ids — confirm.
    stable_base: String,
    /// Agent name recorded on HITL requests.
    agent: String,
    /// Trace id recorded on HITL requests.
    trace_id: String,
}
210
/// Parsed `options` argument of `ask_user`.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is validated against with `schema_expect_value`.
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` is passed straight to the waitpoint wait.
    timeout: Option<StdDuration>,
    /// Value returned on timeout. Without a schema, the answer is also coerced toward it.
    default: Option<VmValue>,
}
217
/// Parsed `options` argument of `request_approval`.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Human-readable detail; also used as the approval `args` when `args` is absent.
    detail: Option<VmValue>,
    /// Structured action arguments.
    args: Option<VmValue>,
    /// Required number of approvals (`approvers_required`).
    quorum: u32,
    /// Reviewers listed in the request payload.
    reviewers: Vec<String>,
    /// Wait timeout; also used to compute the request's deadline.
    deadline: StdDuration,
    /// Principal to record; falls back to the dispatching agent.
    principal: Option<String>,
    /// Evidence references copied into the approval request.
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata; falls back to the serialized current mutation session.
    undo_metadata: Option<JsonValue>,
    /// Capabilities the action asks for.
    capabilities_requested: Vec<String>,
}
230
/// Approval state accumulated by `resolve_approval_state` while replaying
/// the responses to one request.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Request being resolved.
    request_id: String,
    /// Reviewers whose approval has been counted; later responses from them are skipped.
    reviewers: BTreeSet<String>,
    /// One signature per counted approval.
    signatures: Vec<ApprovalSignature>,
    /// Reason to attach when completing the waitpoint.
    reason: Option<String>,
    /// When the quorum was reached, if it was.
    approved_at: Option<String>,
}
239
/// One reviewer's counted approval.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Reviewer who approved.
    reviewer: String,
    /// Response timestamp, or the time of resolution when the response had none.
    signed_at: String,
    /// Signature string for this approval.
    signature: String,
}
246
/// Outcome of replaying an approval or dual-control request's responses.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// Quorum not yet reached and not denied; the waitpoint stays open.
    Pending,
    /// Quorum reached; the waitpoint is completed with the approval record.
    Approved(ApprovalProgress),
    /// Denied by the carried response; the waitpoint is cancelled.
    Denied(HitlHostResponse),
}
253
/// Classified result of waiting on a single HITL request waitpoint.
// `Completed` stores a full `WaitpointRecord` inline while the other variants
// are small. NOTE(review): presumably the lint is allowed because these values
// are short-lived return values where boxing buys little — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its final record.
    Completed(WaitpointRecord),
    /// The wait's timeout elapsed.
    Timeout,
    /// The wait or waitpoint was cancelled.
    Cancelled {
        /// Id of the wait that was cancelled (empty if unknown).
        wait_id: String,
        /// Waitpoints involved in the cancelled wait.
        waitpoint_ids: Vec<String>,
        /// Cancellation reason, if any.
        reason: Option<String>,
    },
}
272
273pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
274 for def in MODULE_BUILTINS {
275 vm.register_builtin_def(def);
276 }
277}
278
/// Builtin definitions exported by this module, registered by
/// `register_hitl_builtins`.
///
/// NOTE(review): the `*_BUILTIN_DEF` items are assumed to be generated by the
/// `#[harn_builtin]` attribute on the functions below — confirm.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &ASK_USER_BUILTIN_DEF,
    &REQUEST_APPROVAL_BUILTIN_DEF,
    &DUAL_CONTROL_BUILTIN_DEF,
    &ESCALATE_TO_BUILTIN_DEF,
];
285
// Script entry point for `ask_user(prompt, options?)`; all logic lives in `ask_user_impl`.
// Plain comments are used here so the `#[harn_builtin]` macro input is unchanged.
#[harn_builtin(
    sig = "ask_user(prompt: string, options?: dict) -> any",
    kind = "async",
    category = "hitl"
)]
async fn ask_user_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    ask_user_impl(&args).await
}
294
// Script entry point for `request_approval(action, options?)`. The signature is
// declared variadic; argument validation happens in `request_approval_impl`.
#[harn_builtin(
    sig_expr = BuiltinSignature::variadic("request_approval", &[Param::new("args", TY_ANY)], TY_DICT),
    kind = "async",
    category = "hitl"
)]
async fn request_approval_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    request_approval_impl(&args).await
}
303
// Script entry point for `dual_control(n, m, action, approvers?)`; delegates to `dual_control_impl`.
#[harn_builtin(
    sig = "dual_control(n: int, m: int, action: closure, approvers?: list) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn dual_control_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    dual_control_impl(&args).await
}
312
// Script entry point for `escalate_to(role, reason)`; delegates to `escalate_to_impl`.
#[harn_builtin(
    sig = "escalate_to(role: string, reason: string) -> dict",
    kind = "async",
    category = "hitl"
)]
async fn escalate_to_builtin(args: Vec<VmValue>) -> Result<VmValue, VmError> {
    escalate_to_impl(&args).await
}
321
322pub(crate) fn reset_hitl_state() {
323 REQUEST_SEQUENCE.with(|slot| {
324 *slot.borrow_mut() = RequestSequenceState::default();
325 });
326}
327
328pub(crate) fn take_hitl_state() -> RequestSequenceState {
329 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
330}
331
332pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
333 REQUEST_SEQUENCE.with(|slot| {
334 *slot.borrow_mut() = state;
335 });
336}
337
338pub async fn append_hitl_response(
339 base_dir: Option<&Path>,
340 mut response: HitlHostResponse,
341) -> Result<u64, String> {
342 let kind = HitlRequestKind::from_request_id(&response.request_id)
343 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
344 if response.responded_at.is_none() {
345 response.responded_at = Some(now_rfc3339());
346 }
347 let log = ensure_hitl_event_log_for(base_dir)?;
348 let headers = response_headers(&response.request_id);
349 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
350 let event_id = log
351 .append(
352 &topic,
353 LogEvent::new(
354 match kind {
355 HitlRequestKind::Escalation => "hitl.escalation_accepted",
356 _ => "hitl.response_received",
357 },
358 serde_json::to_value(&response).map_err(|error| error.to_string())?,
359 )
360 .with_headers(headers),
361 )
362 .await
363 .map_err(|error| error.to_string())?;
364 finalize_hitl_response(&log, kind, &response).await?;
365 Ok(event_id)
366}
367
/// Raises a single-reviewer approval request on `log` without waiting for it.
///
/// The request gets a fresh `hitl_approval_...` id and a deadline of
/// `HITL_APPROVAL_TIMEOUT_MS` after now. It is written as a waitpoint plus a
/// request event, and the host is notified.
///
/// Returns the new request id. Responses are expected to arrive later via
/// `append_hitl_response`.
///
/// NOTE(review): unlike `request_approval_impl`, this path records
/// `run_id: None` and does not call `emit_hitl_requested`. Confirm that this
/// is intentional.
pub async fn append_approval_request_on(
    log: &Arc<AnyEventLog>,
    agent: impl Into<String>,
    trace_id: impl Into<String>,
    action: impl Into<String>,
    detail: JsonValue,
    reviewers: Vec<String>,
) -> Result<String, VmError> {
    let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
    let trace_id = trace_id.into();
    let agent = agent.into();
    // Capture one instant so `requested_at` and `deadline` agree exactly.
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    // `detail` doubles as the approval's structured args.
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.into(),
        detail.clone(),
        agent.clone(),
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(
        requested_at_time,
        StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
    );
    approval_request.approvers_required = 1;
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: None,
        requested_at: requested_at.clone(),
        // Payload carries both the nested `approval_request` and flattened
        // copies of its fields, plus the legacy `detail`/`quorum`/`reviewers`
        // keys also emitted by `request_approval_impl`.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": approval_request.action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": detail,
            "quorum": 1,
            "reviewers": reviewers,
            "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
        }),
    };
    // The waitpoint is created before the request event is appended, matching
    // the other HITL entry points.
    create_request_waitpoint(log, &request).await?;
    append_request(log, &request).await?;
    maybe_notify_host(&request);
    Ok(request_id)
}
425
/// Implements `ask_user(prompt, options?)`.
///
/// Records a question request, then suspends until the question's waitpoint
/// is completed, times out, or is cancelled.
///
/// - Completed: the answer (or `nil` if the waitpoint carried no value) is
///   validated with `schema_expect_value` when `options.schema` is set.
///   Otherwise, when `options.default` is set, it is passed through
///   `coerce_like_default`. Otherwise it is returned as-is.
/// - Timeout: a timeout record is appended once. `options.default` is
///   returned if present; otherwise a timeout error is raised.
/// - Cancelled: a cancellation error is raised.
async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let prompt = required_string_arg(args, 0, "ask_user")?;
    let options = parse_ask_user_options(args.get(1))?;
    // Ids come from the active dispatch when present; otherwise a fresh trace id is used.
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Question,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "prompt": prompt,
            "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
            "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
            "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
        }),
    };
    // Create the waitpoint first, then publish the request.
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    // NOTE(review): presumably lets a configured mock host answer immediately (tests) — confirm.
    maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Question,
        options.timeout,
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            let answer = record
                .value
                .as_ref()
                .map(crate::stdlib::json_to_vm_value)
                .unwrap_or(VmValue::Nil);
            // A schema takes precedence over default-based coercion.
            if let Some(schema) = options.schema.as_ref() {
                return schema_expect_value(&answer, schema, true);
            }
            if let Some(default) = options.default.as_ref() {
                return Ok(coerce_like_default(&answer, default));
            }
            Ok(answer)
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
            if let Some(default) = options.default {
                return Ok(default);
            }
            Err(timeout_error(&request_id, HitlRequestKind::Question))
        }
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Question,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
500
/// Implements `request_approval(action, options?)`.
///
/// Builds an `ApprovalRequest` from `options` and records it. Then waits up to
/// `options.deadline` for the request's waitpoint.
///
/// - Completed: returns `approval_record_from_waitpoint` for the record.
/// - Timeout: appends a timeout record once and raises a timeout error.
/// - Cancelled: raises the error built by `approval_wait_error`. Denials
///   cancel the waitpoint (see `finalize_hitl_response`).
async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let action = required_string_arg(args, 0, "request_approval")?;
    let options = parse_approval_options(args.get(1), "request_approval")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let agent = keys
        .as_ref()
        .map(|keys| keys.agent.clone())
        .unwrap_or_default();
    // Capture one instant so `requested_at` and `deadline` agree exactly.
    let requested_at_time = OffsetDateTime::now_utc();
    let requested_at = format_rfc3339(requested_at_time);
    let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
    // Structured args fall back to `detail`, then to JSON null.
    let approval_args = options
        .args
        .as_ref()
        .or(options.detail.as_ref())
        .map(crate::llm::vm_value_to_json)
        .unwrap_or(JsonValue::Null);
    let mut approval_request = ApprovalRequest::new(
        request_id.clone(),
        action.clone(),
        approval_args,
        principal,
        requested_at.clone(),
    );
    approval_request.deadline = deadline_after(requested_at_time, options.deadline);
    approval_request.approvers_required = options.quorum;
    approval_request.evidence_refs = options.evidence_refs.clone();
    // Undo metadata falls back to a snapshot of the current mutation session.
    approval_request.undo_metadata = options
        .undo_metadata
        .clone()
        .or_else(|| {
            crate::orchestration::current_mutation_session()
                .and_then(|session| serde_json::to_value(session).ok())
        })
        .unwrap_or(JsonValue::Null);
    approval_request.capabilities_requested = options.capabilities_requested.clone();
    let approval_request_json = serde_json::to_value(&approval_request)
        .map_err(|error| VmError::Runtime(error.to_string()))?;
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Approval,
        agent,
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: requested_at.clone(),
        // Nested `approval_request` plus flattened copies and legacy keys.
        payload: json!({
            "approval_request": approval_request_json,
            "id": approval_request.id,
            "action": action,
            "args": approval_request.args,
            "principal": approval_request.principal,
            "requested_at": requested_at,
            "deadline": approval_request.deadline,
            "approvers_required": approval_request.approvers_required,
            "evidence_refs": approval_request.evidence_refs,
            "undo_metadata": approval_request.undo_metadata,
            "capabilities_requested": approval_request.capabilities_requested,
            "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
            "quorum": options.quorum,
            "reviewers": options.reviewers,
            "deadline_ms": options.deadline.as_millis() as u64,
        }),
    };
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;

    match wait_for_request_waitpoint_with_events(
        &request_id,
        HitlRequestKind::Approval,
        Some(options.deadline),
    )
    .await?
    {
        WaitpointOutcome::Completed(record) => {
            approval_record_from_waitpoint(&record, "request_approval")
        }
        WaitpointOutcome::Timeout => {
            append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
            Err(timeout_error(&request_id, HitlRequestKind::Approval))
        }
        WaitpointOutcome::Cancelled { .. } => {
            Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
        }
    }
}
595
596pub(crate) async fn request_approval_for_side_effect(
597 action: &str,
598 detail: JsonValue,
599 principal: String,
600 reviewers: Vec<String>,
601 capabilities_requested: Vec<String>,
602) -> Result<VmValue, VmError> {
603 let mut options = BTreeMap::new();
604 options.insert("args".to_string(), crate::stdlib::json_to_vm_value(&detail));
605 options.insert(
606 "detail".to_string(),
607 crate::stdlib::json_to_vm_value(&detail),
608 );
609 options.insert(
610 "principal".to_string(),
611 VmValue::String(Rc::from(principal)),
612 );
613 options.insert(
614 "reviewers".to_string(),
615 VmValue::List(Rc::new(
616 reviewers
617 .into_iter()
618 .map(|reviewer| VmValue::String(Rc::from(reviewer)))
619 .collect(),
620 )),
621 );
622 options.insert(
623 "capabilities_requested".to_string(),
624 VmValue::List(Rc::new(
625 capabilities_requested
626 .into_iter()
627 .map(|capability| VmValue::String(Rc::from(capability)))
628 .collect(),
629 )),
630 );
631 let args = vec![
632 VmValue::String(Rc::from(action.to_string())),
633 VmValue::Dict(Rc::new(options)),
634 ];
635 request_approval_impl(&args).await
636}
637
638async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
639 let n = required_positive_int_arg(args, 0, "dual_control")?;
640 let m = required_positive_int_arg(args, 1, "dual_control")?;
641 if n > m {
642 return Err(VmError::Runtime(
643 "dual_control: n must be less than or equal to m".to_string(),
644 ));
645 }
646 let action = args
647 .get(2)
648 .and_then(|value| match value {
649 VmValue::Closure(closure) => Some(closure.clone()),
650 _ => None,
651 })
652 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
653 let approvers = optional_string_list(args.get(3), "dual_control")?;
654 if !approvers.is_empty() && approvers.len() < m as usize {
655 return Err(VmError::Runtime(format!(
656 "dual_control: expected at least {m} approvers, got {}",
657 approvers.len()
658 )));
659 }
660
661 let keys = current_dispatch_keys();
662 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
663 let trace_id = keys
664 .as_ref()
665 .map(|keys| keys.trace_id.clone())
666 .unwrap_or_else(new_trace_id);
667 let action_name = if action.func.name.is_empty() {
668 "anonymous".to_string()
669 } else {
670 action.func.name.clone()
671 };
672 let agent = keys
673 .as_ref()
674 .map(|keys| keys.agent.clone())
675 .unwrap_or_default();
676 let requested_at_time = OffsetDateTime::now_utc();
677 let requested_at = format_rfc3339(requested_at_time);
678 let mut approval_request = ApprovalRequest::new(
679 request_id.clone(),
680 action_name.clone(),
681 json!({
682 "n": n,
683 "m": m,
684 "approvers": approvers.clone(),
685 }),
686 agent.clone(),
687 requested_at.clone(),
688 );
689 approval_request.deadline = deadline_after(
690 requested_at_time,
691 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
692 );
693 approval_request.approvers_required = n as u32;
694 approval_request.undo_metadata = crate::orchestration::current_mutation_session()
695 .and_then(|session| serde_json::to_value(session).ok())
696 .unwrap_or(JsonValue::Null);
697 let approval_request_json = serde_json::to_value(&approval_request)
698 .map_err(|error| VmError::Runtime(error.to_string()))?;
699 let log = ensure_hitl_event_log();
700 let request = HitlRequestEnvelope {
701 request_id: request_id.clone(),
702 kind: HitlRequestKind::DualControl,
703 agent,
704 trace_id: trace_id.clone(),
705 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
706 requested_at: requested_at.clone(),
707 payload: json!({
708 "approval_request": approval_request_json,
709 "id": approval_request.id,
710 "args": approval_request.args,
711 "principal": approval_request.principal,
712 "requested_at": requested_at,
713 "deadline": approval_request.deadline,
714 "approvers_required": approval_request.approvers_required,
715 "evidence_refs": approval_request.evidence_refs,
716 "undo_metadata": approval_request.undo_metadata,
717 "capabilities_requested": approval_request.capabilities_requested,
718 "n": n,
719 "m": m,
720 "action": action_name,
721 "approvers": approvers,
722 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
723 }),
724 };
725 create_request_waitpoint(&log, &request).await?;
726 append_request(&log, &request).await?;
727 maybe_notify_host(&request);
728 emit_hitl_requested(&request);
729 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
730
731 match wait_for_request_waitpoint_with_events(
732 &request_id,
733 HitlRequestKind::DualControl,
734 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
735 )
736 .await?
737 {
738 WaitpointOutcome::Completed(record) => {
739 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
740 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
741 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
742 })?;
743 let result = vm.call_closure_pub(&action, &[]).await?;
744
745 append_named_event(
746 &log,
747 HitlRequestKind::DualControl,
748 "hitl.dual_control_executed",
749 &request_id,
750 &trace_id,
751 json!({
752 "request_id": request_id,
753 "result": crate::llm::vm_value_to_json(&result),
754 }),
755 )
756 .await?;
757
758 Ok(result)
759 }
760 WaitpointOutcome::Timeout => {
761 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
762 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
763 }
764 WaitpointOutcome::Cancelled { .. } => {
765 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
766 }
767 }
768}
769
/// Implements `escalate_to(role, reason)`.
///
/// Records an escalation request carrying the current capability policy, then
/// waits with no timeout for the escalation waitpoint. On completion it
/// returns a dict with the request id, role, reason, trace id, status,
/// acceptance time, and reviewer. The status is `"accepted"` unless the
/// completion value explicitly carries `accepted: false`, in which case it is
/// `"pending"`.
///
/// NOTE(review): the wait has no timeout, so `Timeout` can presumably only
/// come from a thrown waitpoint error. Unlike the sibling builtins, no timeout
/// record is appended on that path — confirm this is intended.
async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
    let role = required_string_arg(args, 0, "escalate_to")?;
    let reason = required_string_arg(args, 1, "escalate_to")?;
    let keys = current_dispatch_keys();
    let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
    let trace_id = keys
        .as_ref()
        .map(|keys| keys.trace_id.clone())
        .unwrap_or_else(new_trace_id);
    let log = ensure_hitl_event_log();
    let request = HitlRequestEnvelope {
        request_id: request_id.clone(),
        kind: HitlRequestKind::Escalation,
        agent: keys
            .as_ref()
            .map(|keys| keys.agent.clone())
            .unwrap_or_default(),
        trace_id: trace_id.clone(),
        run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
        requested_at: now_rfc3339(),
        payload: json!({
            "role": role,
            "reason": reason,
            "capability_policy": escalation_capability_policy(),
        }),
    };
    create_request_waitpoint(&log, &request).await?;
    append_request(&log, &request).await?;
    maybe_notify_host(&request);
    emit_hitl_requested(&request);
    maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;

    // `None`: escalations wait without a timeout.
    match wait_for_request_waitpoint_with_events(&request_id, HitlRequestKind::Escalation, None)
        .await?
    {
        WaitpointOutcome::Completed(record) => {
            let accepted_at = record.completed_at.clone();
            let reviewer = record.completed_by.clone();
            // A missing `accepted` flag counts as accepted.
            let accepted = record
                .value
                .as_ref()
                .and_then(|value| value.get("accepted"))
                .and_then(JsonValue::as_bool)
                .unwrap_or(true);
            Ok(crate::stdlib::json_to_vm_value(&json!({
                "request_id": request_id,
                "role": role,
                "reason": reason,
                "trace_id": trace_id,
                "status": if accepted { "accepted" } else { "pending" },
                "accepted_at": accepted_at,
                "reviewer": reviewer,
            })))
        }
        WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
        WaitpointOutcome::Cancelled {
            wait_id,
            waitpoint_ids,
            reason,
        } => Err(hitl_cancelled_error(
            &request_id,
            HitlRequestKind::Escalation,
            &wait_id,
            &waitpoint_ids,
            reason,
        )),
    }
}
838
839async fn create_request_waitpoint(
840 log: &Arc<AnyEventLog>,
841 request: &HitlRequestEnvelope,
842) -> Result<(), VmError> {
843 create_waitpoint_on(
844 log,
845 Some(request.request_id.clone()),
846 Some(json!({
847 "kind": request.kind.as_str(),
848 "agent": request.agent.clone(),
849 "trace_id": request.trace_id.clone(),
850 "requested_at": request.requested_at.clone(),
851 "payload": request.payload.clone(),
852 })),
853 )
854 .await?;
855 Ok(())
856}
857
858async fn wait_for_request_waitpoint(
859 request_id: &str,
860 timeout: Option<StdDuration>,
861) -> Result<WaitpointOutcome, VmError> {
862 match wait_on_waitpoints(
863 vec![request_id.to_string()],
864 WaitpointWaitOptions { timeout },
865 )
866 .await
867 {
868 Ok(records) => Ok(WaitpointOutcome::Completed(
869 records
870 .into_iter()
871 .next()
872 .expect("single waitpoint wait result"),
873 )),
874 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
875 Err(WaitpointWaitFailure::Cancelled {
876 wait_id,
877 waitpoint_ids,
878 reason,
879 }) => Ok(WaitpointOutcome::Cancelled {
880 wait_id,
881 waitpoint_ids,
882 reason,
883 }),
884 Err(WaitpointWaitFailure::Vm(error)) => {
885 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
886 return Ok(outcome);
887 }
888 Err(error)
889 }
890 }
891}
892
893fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
894 let VmError::Thrown(VmValue::Dict(dict)) = error else {
895 return None;
896 };
897 let name = dict.get("name").and_then(vm_string)?;
898 match name {
899 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
900 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
901 wait_id: dict
902 .get("wait_id")
903 .and_then(vm_string)
904 .unwrap_or_default()
905 .to_string(),
906 waitpoint_ids: dict
907 .get("waitpoint_ids")
908 .and_then(vm_string_list)
909 .unwrap_or_default(),
910 reason: dict
911 .get("reason")
912 .and_then(vm_string)
913 .map(ToString::to_string),
914 }),
915 _ => None,
916 }
917}
918
/// Applies a freshly appended response to its request's waitpoint.
///
/// - Question: completes the waitpoint with `answer`.
/// - Escalation: completes it only when `accepted == Some(true)`.
/// - Approval / dual control: replays every response through
///   `resolve_approval_state`.
///   - On quorum, appends the kind's approved event and completes the
///     waitpoint with the approval record.
///   - On denial, appends the kind's denied event and cancels the waitpoint.
///   - Otherwise the waitpoint is left open.
///
/// Waitpoints that are already terminal are left untouched, so responses
/// that arrive after resolution change nothing here.
async fn finalize_hitl_response(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    response: &HitlHostResponse,
) -> Result<(), String> {
    match kind {
        HitlRequestKind::Question => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                response.answer.clone(),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Escalation => {
            // Non-acceptances are recorded in the log but do not resolve the escalation.
            if !response.accepted.unwrap_or(false)
                || waitpoint_is_terminal(log, &response.request_id).await?
            {
                return Ok(());
            }
            complete_waitpoint_on(
                log,
                &response.request_id,
                Some(json!({
                    "accepted": true,
                    "reviewer": response.reviewer,
                    "reason": response.reason,
                    "responded_at": response.responded_at,
                })),
                response.reviewer.clone(),
                response.reason.clone(),
                response.metadata.clone(),
            )
            .await
            .map(|_| ())
            .map_err(|error| error.to_string())
        }
        HitlRequestKind::Approval | HitlRequestKind::DualControl => {
            if waitpoint_is_terminal(log, &response.request_id).await? {
                return Ok(());
            }
            // Quorum/reviewer rules come from the original request envelope.
            let request = load_request_envelope(log, kind, &response.request_id)
                .await
                .map_err(|error| error.to_string())?;
            match resolve_approval_state(log, kind, &request)
                .await
                .map_err(|error| error.to_string())?
            {
                ApprovalResolution::Pending => Ok(()),
                ApprovalResolution::Approved(progress) => {
                    let record = approval_record_json(&progress);
                    // The approved event is appended before the waitpoint completes.
                    append_named_event(
                        log,
                        kind,
                        approved_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "record": record.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    complete_waitpoint_on(
                        log,
                        &request.request_id,
                        Some(record),
                        response.reviewer.clone(),
                        progress.reason.clone(),
                        response.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
                ApprovalResolution::Denied(denied) => {
                    append_named_event(
                        log,
                        kind,
                        denied_event_kind(kind),
                        &request.request_id,
                        &request.trace_id,
                        json!({
                            "request_id": request.request_id.clone(),
                            "reviewer": denied.reviewer.clone(),
                            "reason": denied.reason.clone(),
                        }),
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                    // Denial cancels (rather than completes) the waitpoint, using
                    // the denying response's reviewer/reason/metadata.
                    cancel_waitpoint_on(
                        log,
                        &request.request_id,
                        denied.reviewer.clone(),
                        denied.reason.clone(),
                        denied.metadata.clone(),
                    )
                    .await
                    .map(|_| ())
                    .map_err(|error| error.to_string())
                }
            }
        }
    }
}
1033
1034async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
1035 Ok(inspect_waitpoint_on(log, request_id)
1036 .await
1037 .map_err(|error| error.to_string())?
1038 .is_some_and(|record| record.status != WaitpointStatus::Open))
1039}
1040
1041async fn load_request_envelope(
1042 log: &Arc<AnyEventLog>,
1043 kind: HitlRequestKind,
1044 request_id: &str,
1045) -> Result<HitlRequestEnvelope, VmError> {
1046 let topic = topic(kind)?;
1047 let events = log
1048 .read_range(&topic, None, usize::MAX)
1049 .await
1050 .map_err(log_error)?;
1051 events
1052 .into_iter()
1053 .filter(|(_, event)| event.kind == kind.request_event_kind())
1054 .find_map(|(_, event)| {
1055 if !event_matches_request(&event, request_id) {
1056 return None;
1057 }
1058 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
1059 })
1060 .ok_or_else(|| {
1061 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
1062 })
1063}
1064
/// Replays the `hitl.response_received` events logged for `request` and decides whether the
/// approval is still pending, has reached quorum, or was denied.
///
/// Events are scanned in log order. Only responses that match `request.request_id` are
/// considered, and each is handled as follows:
/// - A named reviewer outside the request's reviewer list (when that list is non-empty) is
///   ignored. So is any further response from a reviewer whose approval was already counted.
/// - An approving response (`approved == Some(true)`) from a named reviewer records that
///   reviewer plus a signature. The signature is the host-supplied one, or a locally computed
///   receipt from `approval_receipt_signature`. Once the number of distinct approving reviewers
///   reaches the quorum, the result is `Approved`.
/// - Any other response that was not skipped (`approved` false or absent) yields `Denied`
///   immediately.
///
/// If the log is exhausted before either outcome, the result is `Pending`.
///
/// # Errors
/// Fails in any of these cases:
/// - the quorum in the request payload does not fit in `u32`;
/// - the topic name is invalid;
/// - the log cannot be read;
/// - a matching response payload does not deserialize as `HitlHostResponse`.
async fn resolve_approval_state(
    log: &Arc<AnyEventLog>,
    kind: HitlRequestKind,
    request: &HitlRequestEnvelope,
) -> Result<ApprovalResolution, VmError> {
    let quorum = approval_quorum_from_request(kind, request)?;
    // An empty allow-list means any named reviewer may respond.
    let allowed_reviewers = approval_reviewers_from_request(kind, request)
        .into_iter()
        .collect::<BTreeSet<_>>();
    let mut progress = ApprovalProgress {
        request_id: request.request_id.clone(),
        reviewers: BTreeSet::new(),
        signatures: Vec::new(),
        reason: None,
        approved_at: None,
    };
    let topic = topic(kind)?;
    let events = log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(log_error)?;
    for (_, event) in events {
        if !event_matches_request(&event, &request.request_id)
            || event.kind != "hitl.response_received"
        {
            continue;
        }
        let response: HitlHostResponse = serde_json::from_value(event.payload)
            .map_err(|error| VmError::Runtime(error.to_string()))?;
        // NOTE(review): the allow-list and duplicate checks only apply to named reviewers. A
        // response without a `reviewer` skips both and falls through to the branches below.
        if let Some(reviewer) = response.reviewer.as_deref() {
            if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
                continue;
            }
            // `progress.reviewers` only holds reviewers who already approved.
            if progress.reviewers.contains(reviewer) {
                continue;
            }
        }
        if response.approved.unwrap_or(false) {
            // Only named reviewers count toward quorum. An anonymous approval adds no
            // reviewer, but it still overwrites `reason` / `approved_at` below.
            if let Some(reviewer) = response.reviewer.clone() {
                let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
                progress.reviewers.insert(reviewer.clone());
                progress.signatures.push(ApprovalSignature {
                    reviewer: reviewer.clone(),
                    signed_at: signed_at.clone(),
                    signature: response.signature.clone().unwrap_or_else(|| {
                        approval_receipt_signature(
                            &request.request_id,
                            &reviewer,
                            &signed_at,
                            true,
                            response.reason.as_deref(),
                        )
                    }),
                });
            }
            progress.reason = response.reason.clone();
            progress.approved_at = response.responded_at.clone();
            // NOTE(review): with a quorum of 0, the first approving response (even an
            // anonymous one) resolves the request. Confirm that is intended.
            if progress.reviewers.len() as u32 >= quorum {
                return Ok(ApprovalResolution::Approved(progress));
            }
            continue;
        }
        // Any non-approving response that survived the filters above denies the whole
        // request, including one with no reviewer.
        return Ok(ApprovalResolution::Denied(response));
    }
    Ok(ApprovalResolution::Pending)
}
1131
1132fn approval_quorum_from_request(
1133 kind: HitlRequestKind,
1134 request: &HitlRequestEnvelope,
1135) -> Result<u32, VmError> {
1136 let key = match kind {
1137 HitlRequestKind::DualControl => "n",
1138 _ => "quorum",
1139 };
1140 let quorum = request
1141 .payload
1142 .get(key)
1143 .or_else(|| request.payload.get("approvers_required"))
1144 .or_else(|| {
1145 request
1146 .payload
1147 .get("approval_request")
1148 .and_then(|approval| approval.get("approvers_required"))
1149 })
1150 .and_then(JsonValue::as_u64)
1151 .unwrap_or(1);
1152 u32::try_from(quorum).map_err(|_| {
1153 VmError::Runtime(format!(
1154 "invalid quorum in HITL request '{}'",
1155 request.request_id
1156 ))
1157 })
1158}
1159
1160fn approval_reviewers_from_request(
1161 kind: HitlRequestKind,
1162 request: &HitlRequestEnvelope,
1163) -> Vec<String> {
1164 let key = match kind {
1165 HitlRequestKind::DualControl => "approvers",
1166 _ => "reviewers",
1167 };
1168 request
1169 .payload
1170 .get(key)
1171 .or_else(|| {
1172 request
1173 .payload
1174 .get("approval_request")
1175 .and_then(|approval| approval.get("reviewers"))
1176 })
1177 .and_then(JsonValue::as_array)
1178 .map(|values| {
1179 values
1180 .iter()
1181 .filter_map(JsonValue::as_str)
1182 .map(str::to_string)
1183 .collect()
1184 })
1185 .unwrap_or_default()
1186}
1187
1188fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1189 json!({
1190 "request_id": progress.request_id.clone(),
1191 "approved": true,
1192 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1193 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1194 "reason": progress.reason,
1195 "signatures": progress.signatures,
1196 })
1197}
1198
1199fn approval_receipt_signature(
1200 request_id: &str,
1201 reviewer: &str,
1202 signed_at: &str,
1203 approved: bool,
1204 reason: Option<&str>,
1205) -> String {
1206 let material = format!(
1207 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1208 reason.unwrap_or("")
1209 );
1210 let hash = sha2::Sha256::digest(material.as_bytes());
1211 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1212 format!("sha256:{hex}")
1213}
1214
1215fn approval_record_from_waitpoint(
1216 record: &WaitpointRecord,
1217 builtin: &str,
1218) -> Result<VmValue, VmError> {
1219 record
1220 .value
1221 .as_ref()
1222 .map(crate::stdlib::json_to_vm_value)
1223 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1224}
1225
1226async fn approval_wait_error(
1227 log: &Arc<AnyEventLog>,
1228 kind: HitlRequestKind,
1229 request_id: &str,
1230) -> VmError {
1231 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1232 if record.status == WaitpointStatus::Cancelled
1233 && record.reason.as_deref() != Some("upstream_cancelled")
1234 {
1235 return approval_denied_error(
1236 request_id,
1237 HitlHostResponse {
1238 request_id: request_id.to_string(),
1239 answer: None,
1240 approved: Some(false),
1241 accepted: None,
1242 reviewer: record.cancelled_by.clone(),
1243 reason: record.reason.clone(),
1244 metadata: record.metadata.clone(),
1245 responded_at: record.cancelled_at,
1246 signature: None,
1247 },
1248 );
1249 }
1250 if record.status == WaitpointStatus::Cancelled {
1251 return hitl_cancelled_error(
1252 request_id,
1253 kind,
1254 "",
1255 &[request_id.to_string()],
1256 record.reason,
1257 );
1258 }
1259 }
1260 hitl_cancelled_error(
1261 request_id,
1262 kind,
1263 "",
1264 &[request_id.to_string()],
1265 Some("upstream_cancelled".to_string()),
1266 )
1267}
1268
1269async fn append_timeout_once(
1270 log: &Arc<AnyEventLog>,
1271 kind: HitlRequestKind,
1272 request_id: &str,
1273 trace_id: &str,
1274) -> Result<(), VmError> {
1275 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1276 return Ok(());
1277 }
1278 append_timeout(log, kind, request_id, trace_id).await
1279}
1280
1281async fn hitl_event_exists(
1282 log: &Arc<AnyEventLog>,
1283 kind: HitlRequestKind,
1284 request_id: &str,
1285 event_kind: &str,
1286) -> Result<bool, VmError> {
1287 let topic = topic(kind)?;
1288 let events = log
1289 .read_range(&topic, None, usize::MAX)
1290 .await
1291 .map_err(log_error)?;
1292 Ok(events
1293 .into_iter()
1294 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1295}
1296
1297fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1298 match kind {
1299 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1300 _ => "hitl.approval_approved",
1301 }
1302}
1303
1304fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1305 match kind {
1306 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1307 _ => "hitl.approval_denied",
1308 }
1309}
1310
1311async fn append_request(
1312 log: &Arc<AnyEventLog>,
1313 request: &HitlRequestEnvelope,
1314) -> Result<(), VmError> {
1315 let topic = topic(request.kind)?;
1316 log.append(
1317 &topic,
1318 LogEvent::new(
1319 request.kind.request_event_kind(),
1320 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1321 )
1322 .with_headers(request_headers(request)),
1323 )
1324 .await
1325 .map(|_| ())
1326 .map_err(log_error)
1327}
1328
1329async fn append_named_event(
1330 log: &Arc<AnyEventLog>,
1331 kind: HitlRequestKind,
1332 event_kind: &str,
1333 request_id: &str,
1334 trace_id: &str,
1335 payload: JsonValue,
1336) -> Result<(), VmError> {
1337 let topic = topic(kind)?;
1338 let headers = headers_with_trace(request_id, trace_id);
1339 log.append(
1340 &topic,
1341 LogEvent::new(event_kind, payload).with_headers(headers),
1342 )
1343 .await
1344 .map(|_| ())
1345 .map_err(log_error)
1346}
1347
1348async fn append_timeout(
1349 log: &Arc<AnyEventLog>,
1350 kind: HitlRequestKind,
1351 request_id: &str,
1352 trace_id: &str,
1353) -> Result<(), VmError> {
1354 append_named_event(
1355 log,
1356 kind,
1357 "hitl.timeout",
1358 request_id,
1359 trace_id,
1360 serde_json::to_value(HitlTimeoutRecord {
1361 request_id: request_id.to_string(),
1362 kind,
1363 trace_id: trace_id.to_string(),
1364 timed_out_at: now_rfc3339(),
1365 })
1366 .map_err(|error| VmError::Runtime(error.to_string()))?,
1367 )
1368 .await
1369}
1370
1371async fn maybe_apply_mock_response(
1372 kind: HitlRequestKind,
1373 request_id: &str,
1374 request_payload: &JsonValue,
1375) -> Result<(), VmError> {
1376 let mut params = request_payload
1377 .as_object()
1378 .cloned()
1379 .unwrap_or_default()
1380 .into_iter()
1381 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1382 .collect::<BTreeMap<_, _>>();
1383 params.insert(
1384 "request_id".to_string(),
1385 VmValue::String(Rc::from(request_id.to_string())),
1386 );
1387 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1388 return Ok(());
1389 };
1390 let value = result?;
1391 let responses = match value {
1392 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1393 other => vec![other],
1394 };
1395 for response in responses {
1396 let response_dict = response.as_dict().ok_or_else(|| {
1397 VmError::Runtime(format!(
1398 "mocked HITL {} response must be a dict or list<dict>",
1399 kind.as_str()
1400 ))
1401 })?;
1402 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1403 append_hitl_response(None, hitl_response)
1404 .await
1405 .map_err(VmError::Runtime)?;
1406 }
1407 Ok(())
1408}
1409
1410fn parse_hitl_response_dict(
1411 request_id: &str,
1412 response_dict: &BTreeMap<String, VmValue>,
1413) -> Result<HitlHostResponse, VmError> {
1414 Ok(HitlHostResponse {
1415 request_id: request_id.to_string(),
1416 answer: response_dict
1417 .get("answer")
1418 .map(crate::llm::vm_value_to_json),
1419 approved: response_dict.get("approved").and_then(vm_bool),
1420 accepted: response_dict.get("accepted").and_then(vm_bool),
1421 reviewer: response_dict.get("reviewer").map(VmValue::display),
1422 reason: response_dict.get("reason").map(VmValue::display),
1423 metadata: response_dict
1424 .get("metadata")
1425 .map(crate::llm::vm_value_to_json),
1426 responded_at: response_dict.get("responded_at").map(VmValue::display),
1427 signature: response_dict.get("signature").map(VmValue::display),
1428 })
1429}
1430
1431fn maybe_notify_host(request: &HitlRequestEnvelope) {
1432 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1433 return;
1434 };
1435 bridge.notify(
1436 "harn.hitl.requested",
1437 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1438 );
1439}
1440
1441fn emit_hitl_requested(request: &HitlRequestEnvelope) {
1448 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1449 return;
1450 };
1451 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlRequested {
1452 session_id,
1453 request_id: request.request_id.clone(),
1454 kind: request.kind.as_str().to_string(),
1455 payload: request.payload.clone(),
1456 });
1457}
1458
1459fn emit_hitl_resolved(request_id: &str, kind: HitlRequestKind, outcome: &str) {
1464 let Some(session_id) = crate::agent_sessions::current_session_id() else {
1465 return;
1466 };
1467 crate::agent_events::emit_event(&crate::agent_events::AgentEvent::HitlResolved {
1468 session_id,
1469 request_id: request_id.to_string(),
1470 kind: kind.as_str().to_string(),
1471 outcome: outcome.to_string(),
1472 });
1473}
1474
1475async fn wait_for_request_waitpoint_with_events(
1482 request_id: &str,
1483 kind: HitlRequestKind,
1484 timeout: Option<StdDuration>,
1485) -> Result<WaitpointOutcome, VmError> {
1486 let outcome = wait_for_request_waitpoint(request_id, timeout).await;
1487 let label = match &outcome {
1488 Ok(WaitpointOutcome::Completed(_)) => "answered",
1489 Ok(WaitpointOutcome::Timeout) => "timeout",
1490 Ok(WaitpointOutcome::Cancelled { .. }) => "cancelled",
1491 Err(_) => "error",
1492 };
1493 emit_hitl_resolved(request_id, kind, label);
1494 outcome
1495}
1496
1497fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1498 let Some(value) = value else {
1499 return Ok(AskUserOptions {
1500 schema: None,
1501 timeout: Some(default_question_timeout()),
1502 default: None,
1503 });
1504 };
1505 let dict = value
1506 .as_dict()
1507 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1508 Ok(AskUserOptions {
1509 schema: dict
1510 .get("schema")
1511 .cloned()
1512 .filter(|value| !matches!(value, VmValue::Nil)),
1513 timeout: dict
1514 .get("timeout")
1515 .map(parse_duration_value)
1516 .transpose()?
1517 .or_else(|| Some(default_question_timeout())),
1518 default: dict
1519 .get("default")
1520 .cloned()
1521 .filter(|value| !matches!(value, VmValue::Nil)),
1522 })
1523}
1524
1525fn default_question_timeout() -> StdDuration {
1526 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1527}
1528
1529fn escalation_capability_policy() -> JsonValue {
1530 crate::orchestration::current_execution_policy()
1531 .and_then(|policy| serde_json::to_value(policy).ok())
1532 .unwrap_or(JsonValue::Null)
1533}
1534
1535fn parse_approval_options(
1536 value: Option<&VmValue>,
1537 builtin: &str,
1538) -> Result<ApprovalOptions, VmError> {
1539 let dict = match value {
1540 None => None,
1541 Some(VmValue::Dict(dict)) => Some(dict),
1542 Some(_) => {
1543 return Err(VmError::Runtime(format!(
1544 "{builtin}: options must be a dict"
1545 )))
1546 }
1547 };
1548 let quorum = dict
1549 .and_then(|dict| dict.get("quorum"))
1550 .and_then(VmValue::as_int)
1551 .unwrap_or(1);
1552 if quorum <= 0 {
1553 return Err(VmError::Runtime(format!(
1554 "{builtin}: quorum must be positive"
1555 )));
1556 }
1557 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1558 let capabilities_requested = optional_string_list(
1559 dict.and_then(|dict| dict.get("capabilities_requested")),
1560 builtin,
1561 )?;
1562 let evidence_refs = dict
1563 .and_then(|dict| dict.get("evidence_refs"))
1564 .map(|value| match value {
1565 VmValue::List(items) => Ok(items
1566 .iter()
1567 .map(crate::llm::vm_value_to_json)
1568 .collect::<Vec<_>>()),
1569 _ => Err(VmError::Runtime(format!(
1570 "{builtin}: evidence_refs must be a list"
1571 ))),
1572 })
1573 .transpose()?
1574 .unwrap_or_default();
1575 let deadline = dict
1576 .and_then(|dict| dict.get("deadline"))
1577 .map(parse_duration_value)
1578 .transpose()?
1579 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1580 Ok(ApprovalOptions {
1581 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1582 args: dict.and_then(|dict| dict.get("args")).cloned(),
1583 quorum: quorum as u32,
1584 reviewers,
1585 deadline,
1586 principal: dict
1587 .and_then(|dict| dict.get("principal"))
1588 .map(VmValue::display)
1589 .filter(|value| !value.is_empty()),
1590 evidence_refs,
1591 undo_metadata: dict
1592 .and_then(|dict| dict.get("undo_metadata"))
1593 .map(crate::llm::vm_value_to_json),
1594 capabilities_requested,
1595 })
1596}
1597
1598fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1599 args.get(idx)
1600 .map(VmValue::display)
1601 .filter(|value| !value.is_empty())
1602 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1603}
1604
1605fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1606 let value = args
1607 .get(idx)
1608 .and_then(VmValue::as_int)
1609 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1610 if value <= 0 {
1611 return Err(VmError::Runtime(format!(
1612 "{builtin}: expected a positive int at {idx}"
1613 )));
1614 }
1615 Ok(value)
1616}
1617
1618fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1619 let Some(value) = value else {
1620 return Ok(Vec::new());
1621 };
1622 match value {
1623 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1624 _ => Err(VmError::Runtime(format!(
1625 "{builtin}: expected list<string>"
1626 ))),
1627 }
1628}
1629
1630fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1631 match value {
1632 VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1633 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1634 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1635 _ => Err(VmError::Runtime(
1636 "expected a duration or millisecond count".to_string(),
1637 )),
1638 }
1639}
1640
1641fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1642 active_event_log()
1643 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1644}
1645
1646fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1647 if let Some(log) = active_event_log() {
1648 return Ok(log);
1649 }
1650 let Some(base_dir) = base_dir else {
1651 return Ok(install_memory_for_current_thread(
1652 HITL_EVENT_LOG_QUEUE_DEPTH,
1653 ));
1654 };
1655 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1656}
1657
1658fn current_dispatch_keys() -> Option<DispatchKeys> {
1659 let context = current_dispatch_context()?;
1660 let stable_base = context
1661 .replay_of_event_id
1662 .clone()
1663 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1664 let instance_key = format!(
1665 "{}::{}",
1666 context.trigger_event.id.0,
1667 context.replay_of_event_id.as_deref().unwrap_or("live")
1668 );
1669 Some(DispatchKeys {
1670 instance_key,
1671 stable_base,
1672 agent: context.agent_id,
1673 trace_id: context.trigger_event.trace_id.0,
1674 })
1675}
1676
1677fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1678 if let Some(keys) = dispatch_keys {
1679 let seq = REQUEST_SEQUENCE.with(|slot| {
1680 let mut state = slot.borrow_mut();
1681 if state.instance_key != keys.instance_key {
1682 state.instance_key = keys.instance_key.clone();
1683 state.next_seq = 0;
1684 }
1685 state.next_seq += 1;
1686 state.next_seq
1687 });
1688 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1689 }
1690 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1691}
1692
1693fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1694 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1695 if let Some(run_id) = request.run_id.as_ref() {
1696 headers.insert("run_id".to_string(), run_id.clone());
1697 }
1698 headers
1699}
1700
/// Log headers containing only the `request_id` key.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1706
1707fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1708 let mut headers = response_headers(request_id);
1709 headers.insert("trace_id".to_string(), trace_id.to_string());
1710 headers
1711}
1712
1713fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1714 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1715}
1716
1717fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1718 event
1719 .headers
1720 .get("request_id")
1721 .is_some_and(|value| value == request_id)
1722 || event
1723 .payload
1724 .get("request_id")
1725 .and_then(JsonValue::as_str)
1726 .is_some_and(|value| value == request_id)
1727}
1728
1729fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1730 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1731 "name": "ApprovalDeniedError",
1732 "category": "generic",
1733 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1734 "request_id": request_id,
1735 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1736 "reason": response.reason,
1737 })))
1738}
1739
1740fn hitl_cancelled_error(
1741 request_id: &str,
1742 kind: HitlRequestKind,
1743 wait_id: &str,
1744 waitpoint_ids: &[String],
1745 reason: Option<String>,
1746) -> VmError {
1747 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1748 let message = reason
1749 .clone()
1750 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1751 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1752 "name": "HumanCancelledError",
1753 "category": ErrorCategory::Cancelled.as_str(),
1754 "message": message,
1755 "request_id": request_id,
1756 "kind": kind.as_str(),
1757 "wait_id": wait_id,
1758 "waitpoint_ids": waitpoint_ids,
1759 "reason": reason,
1760 })))
1761}
1762
1763fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1764 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1765 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1766 "name": "HumanTimeoutError",
1767 "category": ErrorCategory::Timeout.as_str(),
1768 "message": format!("{} timed out", kind.as_str()),
1769 "request_id": request_id,
1770 "kind": kind.as_str(),
1771 })))
1772}
1773
1774fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1775 match default {
1776 VmValue::Int(_) => match value {
1777 VmValue::Int(_) => value.clone(),
1778 VmValue::Float(number) => VmValue::Int(*number as i64),
1779 VmValue::String(text) => text
1780 .parse::<i64>()
1781 .map(VmValue::Int)
1782 .unwrap_or_else(|_| default.clone()),
1783 _ => default.clone(),
1784 },
1785 VmValue::Float(_) => match value {
1786 VmValue::Float(_) => value.clone(),
1787 VmValue::Int(number) => VmValue::Float(*number as f64),
1788 VmValue::String(text) => text
1789 .parse::<f64>()
1790 .map(VmValue::Float)
1791 .unwrap_or_else(|_| default.clone()),
1792 _ => default.clone(),
1793 },
1794 VmValue::Bool(_) => match value {
1795 VmValue::Bool(_) => value.clone(),
1796 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1797 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1798 _ => default.clone(),
1799 },
1800 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1801 VmValue::Duration(_) => match value {
1802 VmValue::Duration(_) => value.clone(),
1803 VmValue::Int(ms) => VmValue::Duration(*ms),
1804 _ => default.clone(),
1805 },
1806 VmValue::Nil => value.clone(),
1807 _ => {
1808 if value.type_name() == default.type_name() {
1809 value.clone()
1810 } else {
1811 default.clone()
1812 }
1813 }
1814 }
1815}
1816
1817fn log_error(error: impl std::fmt::Display) -> VmError {
1818 VmError::Runtime(error.to_string())
1819}
1820
1821fn now_rfc3339() -> String {
1822 format_rfc3339(OffsetDateTime::now_utc())
1823}
1824
1825fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1826 timestamp
1827 .format(&Rfc3339)
1828 .unwrap_or_else(|_| timestamp.to_string())
1829}
1830
1831fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1832 time::Duration::try_from(duration)
1833 .ok()
1834 .map(|duration| format_rfc3339(requested_at + duration))
1835}
1836
1837fn new_trace_id() -> String {
1838 format!("trace_{}", Uuid::now_v7())
1839}
1840
1841fn vm_bool(value: &VmValue) -> Option<bool> {
1842 match value {
1843 VmValue::Bool(flag) => Some(*flag),
1844 _ => None,
1845 }
1846}
1847
1848fn vm_string(value: &VmValue) -> Option<&str> {
1849 match value {
1850 VmValue::String(text) => Some(text.as_ref()),
1851 _ => None,
1852 }
1853}
1854
1855fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1856 match value {
1857 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1858 _ => None,
1859 }
1860}
1861
1862#[cfg(test)]
1863mod tests {
1864 use super::{
1865 HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1866 };
1867 use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1868 use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1869
1870 async fn execute_hitl_script(
1871 base_dir: &std::path::Path,
1872 source: &str,
1873 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1874 reset_thread_local_state();
1875 let log = install_default_for_base_dir(base_dir).expect("install event log");
1876 let chunk = compile_source(source).expect("compile source");
1877 let mut vm = Vm::new();
1878 register_vm_stdlib(&mut vm);
1879 vm.set_source_dir(base_dir);
1880 vm.execute(&chunk).await?;
1881 let output = vm.output().trim_end().to_string();
1882 let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1883 let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1884 let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1885 let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1886 Ok((
1887 output,
1888 question_events,
1889 approval_events,
1890 dual_control_events,
1891 escalation_events,
1892 ))
1893 }
1894
1895 async fn event_kinds(
1896 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1897 topic: &str,
1898 ) -> Vec<String> {
1899 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1900 .await
1901 .expect("read topic")
1902 .into_iter()
1903 .map(|(_, event)| event.kind)
1904 .collect()
1905 }
1906
1907 async fn event_payloads(
1908 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1909 topic: &str,
1910 ) -> Vec<serde_json::Value> {
1911 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1912 .await
1913 .expect("read topic")
1914 .into_iter()
1915 .map(|(_, event)| event.payload)
1916 .collect()
1917 }
1918
1919 #[tokio::test(flavor = "current_thread")]
1920 async fn ask_user_coerces_to_default_type_and_logs_events() {
1921 tokio::task::LocalSet::new()
1922 .run_until(async {
1923 let dir = tempfile::tempdir().expect("tempdir");
1924 let source = r#"
1925pipeline test(task) {
1926 host_mock("hitl", "question", {answer: "9"})
1927 let answer: int = ask_user("Pick a number", {default: 0})
1928 __io_println(answer)
1929}
1930"#;
1931 let (
1932 output,
1933 question_events,
1934 approval_events,
1935 dual_control_events,
1936 escalation_events,
1937 ) = execute_hitl_script(dir.path(), source)
1938 .await
1939 .expect("script succeeds");
1940 assert_eq!(output, "9");
1941 assert_eq!(
1942 question_events,
1943 vec![
1944 "hitl.question_asked".to_string(),
1945 "hitl.response_received".to_string()
1946 ]
1947 );
1948 assert!(approval_events.is_empty());
1949 assert!(dual_control_events.is_empty());
1950 assert!(escalation_events.is_empty());
1951 })
1952 .await;
1953 }
1954
1955 #[tokio::test(flavor = "current_thread")]
1956 async fn request_approval_waits_for_quorum_and_emits_a_record() {
1957 tokio::task::LocalSet::new()
1958 .run_until(async {
1959 let dir = tempfile::tempdir().expect("tempdir");
1960 let source = r#"
1961pipeline test(task) {
1962 host_mock("hitl", "approval", [
1963 {approved: true, reviewer: "alice", reason: "ok"},
1964 {approved: true, reviewer: "bob", reason: "ship it"},
1965 ])
1966 let record = request_approval(
1967 "deploy production",
1968 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1969 )
1970 __io_println(record.approved)
1971 __io_println(len(record.reviewers))
1972 __io_println(record.reviewers[0])
1973 __io_println(record.reviewers[1])
1974}
1975"#;
1976 let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1977 .await
1978 .expect("script succeeds");
1979 assert_eq!(
1980 approval_events,
1981 vec![
1982 "hitl.approval_requested".to_string(),
1983 "hitl.response_received".to_string(),
1984 "hitl.response_received".to_string(),
1985 "hitl.approval_approved".to_string(),
1986 ]
1987 );
1988 })
1989 .await;
1990 }
1991
1992 #[tokio::test(flavor = "current_thread")]
1993 async fn request_approval_emits_canonical_approval_request_payload() {
1994 tokio::task::LocalSet::new()
1995 .run_until(async {
1996 reset_thread_local_state();
1997 let dir = tempfile::tempdir().expect("tempdir");
1998 let log = install_default_for_base_dir(dir.path()).expect("install event log");
1999 let source = r#"
2000pipeline test(task) {
2001 host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
2002 request_approval("deploy production", {
2003 args: {environment: "prod"},
2004 quorum: 1,
2005 reviewers: ["alice"],
2006 evidence_refs: [{kind: "run", uri: "run_123"}],
2007 undo_metadata: {strategy: "rollback"},
2008 capabilities_requested: ["deploy.production"],
2009 })
2010}
2011"#;
2012 let chunk = compile_source(source).expect("compile source");
2013 let mut vm = Vm::new();
2014 register_vm_stdlib(&mut vm);
2015 vm.set_source_dir(dir.path());
2016 vm.execute(&chunk).await.expect("script succeeds");
2017
2018 let payloads = event_payloads(log, HITL_APPROVALS_TOPIC).await;
2019 let request_payload = &payloads[0]["payload"];
2020 let approval_request = &request_payload["approval_request"];
2021 assert_eq!(approval_request["id"], request_payload["id"]);
2022 assert_eq!(approval_request["action"], "deploy production");
2023 assert_eq!(approval_request["args"]["environment"], "prod");
2024 assert_eq!(approval_request["approvers_required"], 1);
2025 assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
2026 assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
2027 assert_eq!(
2028 approval_request["capabilities_requested"][0],
2029 "deploy.production"
2030 );
2031 assert!(approval_request["requested_at"].as_str().is_some());
2032 assert!(approval_request["deadline"].as_str().is_some());
2033 })
2034 .await;
2035 }
2036
2037 #[tokio::test(flavor = "current_thread")]
2038 async fn request_approval_surfaces_denials_as_typed_errors() {
2039 tokio::task::LocalSet::new()
2040 .run_until(async {
2041 let dir = tempfile::tempdir().expect("tempdir");
2042 let source = r#"
2043pipeline test(task) {
2044 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
2045 let denied = try {
2046 request_approval("drop table", {reviewers: ["alice"]})
2047 }
2048 __io_println(is_err(denied))
2049 __io_println(unwrap_err(denied).name)
2050 __io_println(unwrap_err(denied).reason)
2051}
2052"#;
2053 let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
2054 .await
2055 .expect("script succeeds");
2056 assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
2057 assert_eq!(
2058 approval_events,
2059 vec![
2060 "hitl.approval_requested".to_string(),
2061 "hitl.response_received".to_string(),
2062 "hitl.approval_denied".to_string(),
2063 ]
2064 );
2065 })
2066 .await;
2067 }
2068
2069 #[tokio::test(flavor = "current_thread")]
2070 async fn dual_control_executes_action_after_quorum() {
2071 tokio::task::LocalSet::new()
2072 .run_until(async {
2073 let dir = tempfile::tempdir().expect("tempdir");
2074 let source = r#"
2075pipeline test(task) {
2076 host_mock("hitl", "dual_control", [
2077 {approved: true, reviewer: "alice"},
2078 {approved: true, reviewer: "bob"},
2079 ])
2080 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
2081 __io_println(result)
2082}
2083"#;
2084 let (output, _, _, dual_control_events, _) =
2085 execute_hitl_script(dir.path(), source)
2086 .await
2087 .expect("script succeeds");
2088 assert_eq!(output, "launched");
2089 assert_eq!(
2090 dual_control_events,
2091 vec![
2092 "hitl.dual_control_requested".to_string(),
2093 "hitl.response_received".to_string(),
2094 "hitl.response_received".to_string(),
2095 "hitl.dual_control_approved".to_string(),
2096 "hitl.dual_control_executed".to_string(),
2097 ]
2098 );
2099 })
2100 .await;
2101 }
2102
2103 #[tokio::test(flavor = "current_thread")]
2104 async fn escalate_to_waits_for_acceptance_event() {
2105 tokio::task::LocalSet::new()
2106 .run_until(async {
2107 let dir = tempfile::tempdir().expect("tempdir");
2108 let source = r#"
2109pipeline test(task) {
2110 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
2111 let handle = escalate_to("admin", "need override")
2112 __io_println(handle.status)
2113 __io_println(handle.reviewer)
2114}
2115"#;
2116 let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
2117 .await
2118 .expect("script succeeds");
2119 assert_eq!(output, "accepted\nlead");
2120 assert_eq!(
2121 escalation_events,
2122 vec![
2123 "hitl.escalation_issued".to_string(),
2124 "hitl.escalation_accepted".to_string(),
2125 ]
2126 );
2127 })
2128 .await;
2129 }
2130
2131 #[tokio::test(flavor = "current_thread")]
2137 async fn ask_user_emits_hitl_request_and_resolution_to_agent_event_sinks() {
2138 use std::sync::Mutex as StdMutex;
2139
2140 tokio::task::LocalSet::new()
2141 .run_until(async {
2142 let dir = tempfile::tempdir().expect("tempdir");
2143 let session_id = "hitl-session".to_string();
2144 let captured: std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>> =
2145 std::sync::Arc::new(StdMutex::new(Vec::new()));
2146
2147 struct CaptureSink(std::sync::Arc<StdMutex<Vec<crate::agent_events::AgentEvent>>>);
2148 impl crate::agent_events::AgentEventSink for CaptureSink {
2149 fn handle_event(&self, event: &crate::agent_events::AgentEvent) {
2150 self.0.lock().expect("captured").push(event.clone());
2151 }
2152 }
2153
2154 crate::reset_thread_local_state();
2160 crate::event_log::install_default_for_base_dir(dir.path())
2161 .expect("install event log");
2162
2163 crate::agent_events::reset_all_sinks();
2164 let sink: std::sync::Arc<dyn crate::agent_events::AgentEventSink> =
2165 std::sync::Arc::new(CaptureSink(captured.clone()));
2166 crate::agent_events::register_sink(session_id.clone(), sink);
2167 crate::agent_sessions::open_or_create(Some(session_id.clone()));
2168 let _guard = crate::agent_sessions::enter_current_session(session_id.clone());
2169
2170 let source = r#"
2171pipeline test(task) {
2172 host_mock("hitl", "question", {answer: "ok"})
2173 let answer: string = ask_user("Are you sure?", {default: "no"})
2174 __io_println(answer)
2175}
2176"#;
2177 let chunk = crate::compile_source(source).expect("compile source");
2178 let mut vm = Vm::new();
2179 register_vm_stdlib(&mut vm);
2180 vm.set_source_dir(dir.path());
2181 vm.execute(&chunk).await.expect("script runs");
2182 assert_eq!(vm.output().trim_end(), "ok");
2183
2184 let events = captured.lock().expect("captured");
2185 let mut iter = events.iter().filter(|event| {
2186 matches!(
2187 event,
2188 crate::agent_events::AgentEvent::HitlRequested { .. }
2189 | crate::agent_events::AgentEvent::HitlResolved { .. }
2190 )
2191 });
2192 let requested = iter.next().expect("HitlRequested emitted");
2193 let resolved = iter.next().expect("HitlResolved emitted");
2194 assert!(iter.next().is_none(), "exactly one pair: {events:?}");
2195
2196 let crate::agent_events::AgentEvent::HitlRequested {
2197 session_id: req_session,
2198 request_id: req_id,
2199 kind: req_kind,
2200 payload,
2201 } = requested
2202 else {
2203 panic!("expected HitlRequested, got: {requested:?}");
2204 };
2205 assert_eq!(req_session, &session_id);
2206 assert_eq!(req_kind, "question");
2207 assert!(req_id.starts_with("hitl_question_"));
2208 assert_eq!(payload["prompt"], "Are you sure?");
2209
2210 let crate::agent_events::AgentEvent::HitlResolved {
2211 request_id: res_id,
2212 kind: res_kind,
2213 outcome,
2214 ..
2215 } = resolved
2216 else {
2217 panic!("expected HitlResolved, got: {resolved:?}");
2218 };
2219 assert_eq!(res_id, req_id);
2220 assert_eq!(res_kind, "question");
2221 assert_eq!(outcome, "answered");
2222
2223 drop(_guard);
2224 crate::agent_events::reset_all_sinks();
2225 })
2226 .await;
2227 }
2228}