1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24 WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
// NOTE(review): not referenced in the visible part of this file; presumably the
// queue depth used when the HITL event log is installed — confirm at the use site.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Deadline, in milliseconds (24 hours), applied to approval requests appended
/// through `append_approval_request_on` and to `dual_control` requests.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
// 24 hours in milliseconds.
// NOTE(review): not referenced in the visible part of this file; presumably the
// default `ask_user` timeout — confirm at the use site.
const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic used for [`HitlRequestKind::Question`] events.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for [`HitlRequestKind::Approval`] events.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for [`HitlRequestKind::DualControl`] events.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for [`HitlRequestKind::Escalation`] events.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
thread_local! {
    // Per-thread request-sequence state. It starts out as
    // `RequestSequenceState::default()` and is reset, taken and restored through
    // `reset_hitl_state`, `take_hitl_state` and `restore_hitl_state`.
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
42
/// Snapshot of the thread-local request sequence stored in `REQUEST_SEQUENCE`.
///
/// The default value is an empty `instance_key` with `next_seq == 0`.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    // NOTE(review): presumably identifies the dispatch instance the counter
    // belongs to — confirm against the request-id generator.
    pub(crate) instance_key: String,
    // NOTE(review): presumably the next sequence number to hand out — confirm
    // against the request-id generator.
    pub(crate) next_seq: u64,
}
48
/// Category of a human-in-the-loop request.
///
/// Serialized with serde in `snake_case` (for example `dual_control`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// A question raised by the `ask_user` builtin.
    Question,
    /// An approval raised by `request_approval` or `append_approval_request_on`.
    Approval,
    /// An approval raised by the `dual_control` builtin, gating a closure.
    DualControl,
    /// An escalation raised by the `escalate_to` builtin.
    Escalation,
}
57
58impl HitlRequestKind {
59 pub(crate) fn as_str(self) -> &'static str {
60 match self {
61 Self::Question => "question",
62 Self::Approval => "approval",
63 Self::DualControl => "dual_control",
64 Self::Escalation => "escalation",
65 }
66 }
67
68 fn topic(self) -> &'static str {
69 match self {
70 Self::Question => HITL_QUESTIONS_TOPIC,
71 Self::Approval => HITL_APPROVALS_TOPIC,
72 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73 Self::Escalation => HITL_ESCALATIONS_TOPIC,
74 }
75 }
76
77 fn request_event_kind(self) -> &'static str {
78 match self {
79 Self::Question => "hitl.question_asked",
80 Self::Approval => "hitl.approval_requested",
81 Self::DualControl => "hitl.dual_control_requested",
82 Self::Escalation => "hitl.escalation_issued",
83 }
84 }
85
86 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87 if request_id.starts_with("hitl_question_") {
88 Some(Self::Question)
89 } else if request_id.starts_with("hitl_approval_") {
90 Some(Self::Approval)
91 } else if request_id.starts_with("hitl_dual_control_") {
92 Some(Self::DualControl)
93 } else if request_id.starts_with("hitl_escalation_") {
94 Some(Self::Escalation)
95 } else {
96 None
97 }
98 }
99}
100
/// A host-supplied response to a HITL request.
///
/// Every optional field is omitted from the serialized form when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix determines the request kind.
    pub request_id: String,
    /// Answer value; used to complete the waitpoint of a question.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval decision; a missing value is treated as "not approved" when
    /// approval state is resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation decision; a missing value is treated as "not accepted".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Identity of the responder, when supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason accompanying the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Extra metadata forwarded when the waitpoint is completed or cancelled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills it in when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
    /// Signature supplied by the host; when absent on an approval, a receipt
    /// signature is computed instead.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub signature: Option<String>,
}
121
/// A HITL request as built by the builtins in this file before it is handed to
/// `create_request_waitpoint` and `append_request`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Id of the request; also used as the waitpoint id.
    request_id: String,
    /// Category of the request.
    kind: HitlRequestKind,
    /// Agent taken from the dispatch keys; defaults to an empty string when the
    /// field is missing on deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id taken from the dispatch keys, or freshly generated.
    trace_id: String,
    /// Run id of the current mutation session, when there is one; omitted from
    /// the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    run_id: Option<String>,
    /// Timestamp string recorded when the request was created.
    requested_at: String,
    /// Kind-specific request details.
    payload: JsonValue,
}
134
// NOTE(review): not constructed in the visible part of this file; presumably the
// payload of the timeout event written for a request — confirm at the use site.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request the record refers to.
    request_id: String,
    /// Category of the request.
    kind: HitlRequestKind,
    /// Trace id of the request.
    trace_id: String,
    /// Timestamp string of the timeout.
    timed_out_at: String,
}
142
/// Structured description of an action awaiting approval.
///
/// Fields marked `#[serde(default)]` fall back to their default value when
/// missing on deserialization.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ApprovalRequest {
    /// Identifier of the approval request.
    pub id: String,
    /// Name of the action to approve.
    pub action: String,
    /// Arguments or details of the action.
    #[serde(default)]
    pub args: JsonValue,
    /// Principal on whose behalf the approval is requested.
    pub principal: String,
    /// Timestamp string of when the request was made.
    pub requested_at: String,
    /// Optional deadline; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadline: Option<String>,
    /// Number of approvers required; [`ApprovalRequest::new`] sets it to 1.
    pub approvers_required: u32,
    /// Evidence references attached to the request.
    #[serde(default)]
    pub evidence_refs: Vec<JsonValue>,
    /// Undo metadata attached to the request.
    #[serde(default)]
    pub undo_metadata: JsonValue,
    /// Capabilities requested alongside the action.
    #[serde(default)]
    pub capabilities_requested: Vec<String>,
}
161
162impl ApprovalRequest {
163 pub fn new(
164 id: impl Into<String>,
165 action: impl Into<String>,
166 args: JsonValue,
167 principal: impl Into<String>,
168 requested_at: impl Into<String>,
169 ) -> Self {
170 Self {
171 id: id.into(),
172 action: action.into(),
173 args,
174 principal: principal.into(),
175 requested_at: requested_at.into(),
176 deadline: None,
177 approvers_required: 1,
178 evidence_refs: Vec::new(),
179 undo_metadata: JsonValue::Null,
180 capabilities_requested: Vec::new(),
181 }
182 }
183}
184
185pub(crate) fn approval_request_for_host_permission(
186 id: impl Into<String>,
187 action: impl Into<String>,
188 args: JsonValue,
189 principal: impl Into<String>,
190 evidence_refs: Vec<JsonValue>,
191 undo_metadata: JsonValue,
192 capabilities_requested: Vec<String>,
193) -> ApprovalRequest {
194 let mut request = ApprovalRequest::new(id, action, args, principal, now_rfc3339());
195 request.evidence_refs = evidence_refs;
196 request.undo_metadata = undo_metadata;
197 request.capabilities_requested = capabilities_requested;
198 request
199}
200
/// Identity values describing the current dispatch, as consumed by the HITL
/// builtins in this file.
#[derive(Clone, Debug)]
struct DispatchKeys {
    // NOTE(review): not read in the visible part of this file; presumably keys
    // the request-sequence state — confirm against the request-id generator.
    instance_key: String,
    // NOTE(review): not read in the visible part of this file; presumably the
    // stable part of generated request ids — confirm.
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id copied into request envelopes.
    trace_id: String,
}
208
/// Options accepted by the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is checked against, when given.
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` is passed through to the wait as-is.
    timeout: Option<StdDuration>,
    /// Value returned on timeout, and used to coerce answers when no schema is given.
    default: Option<VmValue>,
}
215
/// Options accepted by the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Free-form detail recorded in the request payload; also used as the
    /// approval arguments when `args` is absent.
    detail: Option<VmValue>,
    /// Arguments of the action being approved.
    args: Option<VmValue>,
    /// Number of approvers required.
    quorum: u32,
    /// Reviewers recorded in the request payload.
    reviewers: Vec<String>,
    /// How long to wait for the approval to resolve.
    deadline: StdDuration,
    /// Principal of the request; the agent name is used when absent.
    principal: Option<String>,
    /// Evidence references attached to the request.
    evidence_refs: Vec<JsonValue>,
    /// Undo metadata; the current mutation session is serialized when absent.
    undo_metadata: Option<JsonValue>,
    /// Capabilities requested alongside the action.
    capabilities_requested: Vec<String>,
}
228
/// Approvals accumulated for one request while its responses are replayed.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Distinct reviewers whose approval has been counted.
    reviewers: BTreeSet<String>,
    /// One signature entry per counted reviewer.
    signatures: Vec<ApprovalSignature>,
    /// Reason from the most recent approving response.
    reason: Option<String>,
    /// `responded_at` from the most recent approving response.
    approved_at: Option<String>,
}
236
/// Signature entry recorded for one approving reviewer.
#[derive(Clone, Debug, Serialize)]
struct ApprovalSignature {
    /// Reviewer who approved.
    reviewer: String,
    /// Timestamp string of the approval.
    signed_at: String,
    /// Host-supplied signature, or a computed receipt signature when none was supplied.
    signature: String,
}
243
/// Outcome of replaying the responses recorded for an approval request.
#[derive(Clone, Debug)]
enum ApprovalResolution {
    /// Neither the quorum nor a denial has been reached yet.
    Pending,
    /// The quorum was reached; carries the accumulated approvals.
    Approved(ApprovalProgress),
    /// A non-approving response was found; carries that response.
    Denied(HitlHostResponse),
}
250
/// Result of waiting on a request's waitpoint, with timeouts and cancellations
/// represented as values rather than errors.
#[derive(Clone, Debug)]
enum WaitpointOutcome {
    /// The waitpoint completed; carries its record.
    Completed(WaitpointRecord),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancelled {
        /// Id of the cancelled wait.
        wait_id: String,
        /// Ids of the waitpoints that were being waited on.
        waitpoint_ids: Vec<String>,
        /// Reason reported with the cancellation, if any.
        reason: Option<String>,
    },
}
261
262pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
263 vm.register_async_builtin("ask_user", |args| {
264 Box::pin(async move { ask_user_impl(&args).await })
265 });
266
267 vm.register_async_builtin("request_approval", |args| {
268 Box::pin(async move { request_approval_impl(&args).await })
269 });
270
271 vm.register_async_builtin("dual_control", |args| {
272 Box::pin(async move { dual_control_impl(&args).await })
273 });
274
275 vm.register_async_builtin("escalate_to", |args| {
276 Box::pin(async move { escalate_to_impl(&args).await })
277 });
278}
279
280pub(crate) fn reset_hitl_state() {
281 REQUEST_SEQUENCE.with(|slot| {
282 *slot.borrow_mut() = RequestSequenceState::default();
283 });
284}
285
286pub(crate) fn take_hitl_state() -> RequestSequenceState {
287 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
288}
289
290pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
291 REQUEST_SEQUENCE.with(|slot| {
292 *slot.borrow_mut() = state;
293 });
294}
295
296pub async fn append_hitl_response(
297 base_dir: Option<&Path>,
298 mut response: HitlHostResponse,
299) -> Result<u64, String> {
300 let kind = HitlRequestKind::from_request_id(&response.request_id)
301 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
302 if response.responded_at.is_none() {
303 response.responded_at = Some(now_rfc3339());
304 }
305 let log = ensure_hitl_event_log_for(base_dir)?;
306 let headers = response_headers(&response.request_id);
307 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
308 let event_id = log
309 .append(
310 &topic,
311 LogEvent::new(
312 match kind {
313 HitlRequestKind::Escalation => "hitl.escalation_accepted",
314 _ => "hitl.response_received",
315 },
316 serde_json::to_value(&response).map_err(|error| error.to_string())?,
317 )
318 .with_headers(headers),
319 )
320 .await
321 .map_err(|error| error.to_string())?;
322 finalize_hitl_response(&log, kind, &response).await?;
323 Ok(event_id)
324}
325
326pub async fn append_approval_request_on(
327 log: &Arc<AnyEventLog>,
328 agent: impl Into<String>,
329 trace_id: impl Into<String>,
330 action: impl Into<String>,
331 detail: JsonValue,
332 reviewers: Vec<String>,
333) -> Result<String, VmError> {
334 let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
335 let trace_id = trace_id.into();
336 let agent = agent.into();
337 let requested_at_time = OffsetDateTime::now_utc();
338 let requested_at = format_rfc3339(requested_at_time);
339 let mut approval_request = ApprovalRequest::new(
340 request_id.clone(),
341 action.into(),
342 detail.clone(),
343 agent.clone(),
344 requested_at.clone(),
345 );
346 approval_request.deadline = deadline_after(
347 requested_at_time,
348 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
349 );
350 approval_request.approvers_required = 1;
351 let approval_request_json = serde_json::to_value(&approval_request)
352 .map_err(|error| VmError::Runtime(error.to_string()))?;
353 let request = HitlRequestEnvelope {
354 request_id: request_id.clone(),
355 kind: HitlRequestKind::Approval,
356 agent,
357 trace_id: trace_id.clone(),
358 run_id: None,
359 requested_at: requested_at.clone(),
360 payload: json!({
361 "approval_request": approval_request_json,
362 "id": approval_request.id,
363 "action": approval_request.action,
364 "args": approval_request.args,
365 "principal": approval_request.principal,
366 "requested_at": requested_at,
367 "deadline": approval_request.deadline,
368 "approvers_required": approval_request.approvers_required,
369 "evidence_refs": approval_request.evidence_refs,
370 "undo_metadata": approval_request.undo_metadata,
371 "capabilities_requested": approval_request.capabilities_requested,
372 "detail": detail,
373 "quorum": 1,
374 "reviewers": reviewers,
375 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
376 }),
377 };
378 create_request_waitpoint(log, &request).await?;
379 append_request(log, &request).await?;
380 maybe_notify_host(&request);
381 Ok(request_id)
382}
383
384async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
385 let prompt = required_string_arg(args, 0, "ask_user")?;
386 let options = parse_ask_user_options(args.get(1))?;
387 let keys = current_dispatch_keys();
388 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
389 let trace_id = keys
390 .as_ref()
391 .map(|keys| keys.trace_id.clone())
392 .unwrap_or_else(new_trace_id);
393 let log = ensure_hitl_event_log();
394 let request = HitlRequestEnvelope {
395 request_id: request_id.clone(),
396 kind: HitlRequestKind::Question,
397 agent: keys
398 .as_ref()
399 .map(|keys| keys.agent.clone())
400 .unwrap_or_default(),
401 trace_id: trace_id.clone(),
402 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
403 requested_at: now_rfc3339(),
404 payload: json!({
405 "prompt": prompt,
406 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
407 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
408 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
409 }),
410 };
411 create_request_waitpoint(&log, &request).await?;
412 append_request(&log, &request).await?;
413 maybe_notify_host(&request);
414 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
415
416 match wait_for_request_waitpoint(&request_id, options.timeout).await? {
417 WaitpointOutcome::Completed(record) => {
418 let answer = record
419 .value
420 .as_ref()
421 .map(crate::stdlib::json_to_vm_value)
422 .unwrap_or(VmValue::Nil);
423 if let Some(schema) = options.schema.as_ref() {
424 return schema_expect_value(&answer, schema, true);
425 }
426 if let Some(default) = options.default.as_ref() {
427 return Ok(coerce_like_default(&answer, default));
428 }
429 Ok(answer)
430 }
431 WaitpointOutcome::Timeout => {
432 append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
433 if let Some(default) = options.default {
434 return Ok(default);
435 }
436 Err(timeout_error(&request_id, HitlRequestKind::Question))
437 }
438 WaitpointOutcome::Cancelled {
439 wait_id,
440 waitpoint_ids,
441 reason,
442 } => Err(hitl_cancelled_error(
443 &request_id,
444 HitlRequestKind::Question,
445 &wait_id,
446 &waitpoint_ids,
447 reason,
448 )),
449 }
450}
451
452async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
453 let action = required_string_arg(args, 0, "request_approval")?;
454 let options = parse_approval_options(args.get(1), "request_approval")?;
455 let keys = current_dispatch_keys();
456 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
457 let trace_id = keys
458 .as_ref()
459 .map(|keys| keys.trace_id.clone())
460 .unwrap_or_else(new_trace_id);
461 let agent = keys
462 .as_ref()
463 .map(|keys| keys.agent.clone())
464 .unwrap_or_default();
465 let requested_at_time = OffsetDateTime::now_utc();
466 let requested_at = format_rfc3339(requested_at_time);
467 let principal = options.principal.clone().unwrap_or_else(|| agent.clone());
468 let approval_args = options
469 .args
470 .as_ref()
471 .or(options.detail.as_ref())
472 .map(crate::llm::vm_value_to_json)
473 .unwrap_or(JsonValue::Null);
474 let mut approval_request = ApprovalRequest::new(
475 request_id.clone(),
476 action.clone(),
477 approval_args,
478 principal,
479 requested_at.clone(),
480 );
481 approval_request.deadline = deadline_after(requested_at_time, options.deadline);
482 approval_request.approvers_required = options.quorum;
483 approval_request.evidence_refs = options.evidence_refs.clone();
484 approval_request.undo_metadata = options
485 .undo_metadata
486 .clone()
487 .or_else(|| {
488 crate::orchestration::current_mutation_session()
489 .and_then(|session| serde_json::to_value(session).ok())
490 })
491 .unwrap_or(JsonValue::Null);
492 approval_request.capabilities_requested = options.capabilities_requested.clone();
493 let approval_request_json = serde_json::to_value(&approval_request)
494 .map_err(|error| VmError::Runtime(error.to_string()))?;
495 let log = ensure_hitl_event_log();
496 let request = HitlRequestEnvelope {
497 request_id: request_id.clone(),
498 kind: HitlRequestKind::Approval,
499 agent,
500 trace_id: trace_id.clone(),
501 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
502 requested_at: requested_at.clone(),
503 payload: json!({
504 "approval_request": approval_request_json,
505 "id": approval_request.id,
506 "action": action,
507 "args": approval_request.args,
508 "principal": approval_request.principal,
509 "requested_at": requested_at,
510 "deadline": approval_request.deadline,
511 "approvers_required": approval_request.approvers_required,
512 "evidence_refs": approval_request.evidence_refs,
513 "undo_metadata": approval_request.undo_metadata,
514 "capabilities_requested": approval_request.capabilities_requested,
515 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
516 "quorum": options.quorum,
517 "reviewers": options.reviewers,
518 "deadline_ms": options.deadline.as_millis() as u64,
519 }),
520 };
521 create_request_waitpoint(&log, &request).await?;
522 append_request(&log, &request).await?;
523 maybe_notify_host(&request);
524 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
525
526 match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
527 WaitpointOutcome::Completed(record) => {
528 approval_record_from_waitpoint(&record, "request_approval")
529 }
530 WaitpointOutcome::Timeout => {
531 append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
532 Err(timeout_error(&request_id, HitlRequestKind::Approval))
533 }
534 WaitpointOutcome::Cancelled { .. } => {
535 Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
536 }
537 }
538}
539
540async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
541 let n = required_positive_int_arg(args, 0, "dual_control")?;
542 let m = required_positive_int_arg(args, 1, "dual_control")?;
543 if n > m {
544 return Err(VmError::Runtime(
545 "dual_control: n must be less than or equal to m".to_string(),
546 ));
547 }
548 let action = args
549 .get(2)
550 .and_then(|value| match value {
551 VmValue::Closure(closure) => Some(closure.clone()),
552 _ => None,
553 })
554 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
555 let approvers = optional_string_list(args.get(3), "dual_control")?;
556 if !approvers.is_empty() && approvers.len() < m as usize {
557 return Err(VmError::Runtime(format!(
558 "dual_control: expected at least {m} approvers, got {}",
559 approvers.len()
560 )));
561 }
562
563 let keys = current_dispatch_keys();
564 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
565 let trace_id = keys
566 .as_ref()
567 .map(|keys| keys.trace_id.clone())
568 .unwrap_or_else(new_trace_id);
569 let action_name = if action.func.name.is_empty() {
570 "anonymous".to_string()
571 } else {
572 action.func.name.clone()
573 };
574 let agent = keys
575 .as_ref()
576 .map(|keys| keys.agent.clone())
577 .unwrap_or_default();
578 let requested_at_time = OffsetDateTime::now_utc();
579 let requested_at = format_rfc3339(requested_at_time);
580 let mut approval_request = ApprovalRequest::new(
581 request_id.clone(),
582 action_name.clone(),
583 json!({
584 "n": n,
585 "m": m,
586 "approvers": approvers.clone(),
587 }),
588 agent.clone(),
589 requested_at.clone(),
590 );
591 approval_request.deadline = deadline_after(
592 requested_at_time,
593 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
594 );
595 approval_request.approvers_required = n as u32;
596 approval_request.undo_metadata = crate::orchestration::current_mutation_session()
597 .and_then(|session| serde_json::to_value(session).ok())
598 .unwrap_or(JsonValue::Null);
599 let approval_request_json = serde_json::to_value(&approval_request)
600 .map_err(|error| VmError::Runtime(error.to_string()))?;
601 let log = ensure_hitl_event_log();
602 let request = HitlRequestEnvelope {
603 request_id: request_id.clone(),
604 kind: HitlRequestKind::DualControl,
605 agent,
606 trace_id: trace_id.clone(),
607 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
608 requested_at: requested_at.clone(),
609 payload: json!({
610 "approval_request": approval_request_json,
611 "id": approval_request.id,
612 "args": approval_request.args,
613 "principal": approval_request.principal,
614 "requested_at": requested_at,
615 "deadline": approval_request.deadline,
616 "approvers_required": approval_request.approvers_required,
617 "evidence_refs": approval_request.evidence_refs,
618 "undo_metadata": approval_request.undo_metadata,
619 "capabilities_requested": approval_request.capabilities_requested,
620 "n": n,
621 "m": m,
622 "action": action_name,
623 "approvers": approvers,
624 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
625 }),
626 };
627 create_request_waitpoint(&log, &request).await?;
628 append_request(&log, &request).await?;
629 maybe_notify_host(&request);
630 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
631
632 match wait_for_request_waitpoint(
633 &request_id,
634 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
635 )
636 .await?
637 {
638 WaitpointOutcome::Completed(record) => {
639 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
640 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
641 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
642 })?;
643 let result = vm.call_closure_pub(&action, &[]).await?;
644
645 append_named_event(
646 &log,
647 HitlRequestKind::DualControl,
648 "hitl.dual_control_executed",
649 &request_id,
650 &trace_id,
651 json!({
652 "request_id": request_id,
653 "result": crate::llm::vm_value_to_json(&result),
654 }),
655 )
656 .await?;
657
658 Ok(result)
659 }
660 WaitpointOutcome::Timeout => {
661 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
662 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
663 }
664 WaitpointOutcome::Cancelled { .. } => {
665 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
666 }
667 }
668}
669
670async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
671 let role = required_string_arg(args, 0, "escalate_to")?;
672 let reason = required_string_arg(args, 1, "escalate_to")?;
673 let keys = current_dispatch_keys();
674 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
675 let trace_id = keys
676 .as_ref()
677 .map(|keys| keys.trace_id.clone())
678 .unwrap_or_else(new_trace_id);
679 let log = ensure_hitl_event_log();
680 let request = HitlRequestEnvelope {
681 request_id: request_id.clone(),
682 kind: HitlRequestKind::Escalation,
683 agent: keys
684 .as_ref()
685 .map(|keys| keys.agent.clone())
686 .unwrap_or_default(),
687 trace_id: trace_id.clone(),
688 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
689 requested_at: now_rfc3339(),
690 payload: json!({
691 "role": role,
692 "reason": reason,
693 "capability_policy": escalation_capability_policy(),
694 }),
695 };
696 create_request_waitpoint(&log, &request).await?;
697 append_request(&log, &request).await?;
698 maybe_notify_host(&request);
699 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
700
701 match wait_for_request_waitpoint(&request_id, None).await? {
702 WaitpointOutcome::Completed(record) => {
703 let accepted_at = record.completed_at.clone();
704 let reviewer = record.completed_by.clone();
705 let accepted = record
706 .value
707 .as_ref()
708 .and_then(|value| value.get("accepted"))
709 .and_then(JsonValue::as_bool)
710 .unwrap_or(true);
711 Ok(crate::stdlib::json_to_vm_value(&json!({
712 "request_id": request_id,
713 "role": role,
714 "reason": reason,
715 "trace_id": trace_id,
716 "status": if accepted { "accepted" } else { "pending" },
717 "accepted_at": accepted_at,
718 "reviewer": reviewer,
719 })))
720 }
721 WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
722 WaitpointOutcome::Cancelled {
723 wait_id,
724 waitpoint_ids,
725 reason,
726 } => Err(hitl_cancelled_error(
727 &request_id,
728 HitlRequestKind::Escalation,
729 &wait_id,
730 &waitpoint_ids,
731 reason,
732 )),
733 }
734}
735
736async fn create_request_waitpoint(
737 log: &Arc<AnyEventLog>,
738 request: &HitlRequestEnvelope,
739) -> Result<(), VmError> {
740 create_waitpoint_on(
741 log,
742 Some(request.request_id.clone()),
743 Some(json!({
744 "kind": request.kind.as_str(),
745 "agent": request.agent.clone(),
746 "trace_id": request.trace_id.clone(),
747 "requested_at": request.requested_at.clone(),
748 "payload": request.payload.clone(),
749 })),
750 )
751 .await?;
752 Ok(())
753}
754
755async fn wait_for_request_waitpoint(
756 request_id: &str,
757 timeout: Option<StdDuration>,
758) -> Result<WaitpointOutcome, VmError> {
759 match wait_on_waitpoints(
760 vec![request_id.to_string()],
761 WaitpointWaitOptions { timeout },
762 )
763 .await
764 {
765 Ok(records) => Ok(WaitpointOutcome::Completed(
766 records
767 .into_iter()
768 .next()
769 .expect("single waitpoint wait result"),
770 )),
771 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
772 Err(WaitpointWaitFailure::Cancelled {
773 wait_id,
774 waitpoint_ids,
775 reason,
776 }) => Ok(WaitpointOutcome::Cancelled {
777 wait_id,
778 waitpoint_ids,
779 reason,
780 }),
781 Err(WaitpointWaitFailure::Vm(error)) => {
782 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
783 return Ok(outcome);
784 }
785 Err(error)
786 }
787 }
788}
789
790fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
791 let VmError::Thrown(VmValue::Dict(dict)) = error else {
792 return None;
793 };
794 let name = dict.get("name").and_then(vm_string)?;
795 match name {
796 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
797 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
798 wait_id: dict
799 .get("wait_id")
800 .and_then(vm_string)
801 .unwrap_or_default()
802 .to_string(),
803 waitpoint_ids: dict
804 .get("waitpoint_ids")
805 .and_then(vm_string_list)
806 .unwrap_or_default(),
807 reason: dict
808 .get("reason")
809 .and_then(vm_string)
810 .map(ToString::to_string),
811 }),
812 _ => None,
813 }
814}
815
816async fn finalize_hitl_response(
817 log: &Arc<AnyEventLog>,
818 kind: HitlRequestKind,
819 response: &HitlHostResponse,
820) -> Result<(), String> {
821 match kind {
822 HitlRequestKind::Question => {
823 if waitpoint_is_terminal(log, &response.request_id).await? {
824 return Ok(());
825 }
826 complete_waitpoint_on(
827 log,
828 &response.request_id,
829 response.answer.clone(),
830 response.reviewer.clone(),
831 response.reason.clone(),
832 response.metadata.clone(),
833 )
834 .await
835 .map(|_| ())
836 .map_err(|error| error.to_string())
837 }
838 HitlRequestKind::Escalation => {
839 if !response.accepted.unwrap_or(false)
840 || waitpoint_is_terminal(log, &response.request_id).await?
841 {
842 return Ok(());
843 }
844 complete_waitpoint_on(
845 log,
846 &response.request_id,
847 Some(json!({
848 "accepted": true,
849 "reviewer": response.reviewer,
850 "reason": response.reason,
851 "responded_at": response.responded_at,
852 })),
853 response.reviewer.clone(),
854 response.reason.clone(),
855 response.metadata.clone(),
856 )
857 .await
858 .map(|_| ())
859 .map_err(|error| error.to_string())
860 }
861 HitlRequestKind::Approval | HitlRequestKind::DualControl => {
862 if waitpoint_is_terminal(log, &response.request_id).await? {
863 return Ok(());
864 }
865 let request = load_request_envelope(log, kind, &response.request_id)
866 .await
867 .map_err(|error| error.to_string())?;
868 match resolve_approval_state(log, kind, &request)
869 .await
870 .map_err(|error| error.to_string())?
871 {
872 ApprovalResolution::Pending => Ok(()),
873 ApprovalResolution::Approved(progress) => {
874 let record = approval_record_json(&progress);
875 append_named_event(
876 log,
877 kind,
878 approved_event_kind(kind),
879 &request.request_id,
880 &request.trace_id,
881 json!({
882 "request_id": request.request_id.clone(),
883 "record": record.clone(),
884 }),
885 )
886 .await
887 .map_err(|error| error.to_string())?;
888 complete_waitpoint_on(
889 log,
890 &request.request_id,
891 Some(record),
892 response.reviewer.clone(),
893 progress.reason.clone(),
894 response.metadata.clone(),
895 )
896 .await
897 .map(|_| ())
898 .map_err(|error| error.to_string())
899 }
900 ApprovalResolution::Denied(denied) => {
901 append_named_event(
902 log,
903 kind,
904 denied_event_kind(kind),
905 &request.request_id,
906 &request.trace_id,
907 json!({
908 "request_id": request.request_id.clone(),
909 "reviewer": denied.reviewer.clone(),
910 "reason": denied.reason.clone(),
911 }),
912 )
913 .await
914 .map_err(|error| error.to_string())?;
915 cancel_waitpoint_on(
916 log,
917 &request.request_id,
918 denied.reviewer.clone(),
919 denied.reason.clone(),
920 denied.metadata.clone(),
921 )
922 .await
923 .map(|_| ())
924 .map_err(|error| error.to_string())
925 }
926 }
927 }
928 }
929}
930
931async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
932 Ok(inspect_waitpoint_on(log, request_id)
933 .await
934 .map_err(|error| error.to_string())?
935 .is_some_and(|record| record.status != WaitpointStatus::Open))
936}
937
938async fn load_request_envelope(
939 log: &Arc<AnyEventLog>,
940 kind: HitlRequestKind,
941 request_id: &str,
942) -> Result<HitlRequestEnvelope, VmError> {
943 let topic = topic(kind)?;
944 let events = log
945 .read_range(&topic, None, usize::MAX)
946 .await
947 .map_err(log_error)?;
948 events
949 .into_iter()
950 .filter(|(_, event)| event.kind == kind.request_event_kind())
951 .find_map(|(_, event)| {
952 if !event_matches_request(&event, request_id) {
953 return None;
954 }
955 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
956 })
957 .ok_or_else(|| {
958 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
959 })
960}
961
962async fn resolve_approval_state(
963 log: &Arc<AnyEventLog>,
964 kind: HitlRequestKind,
965 request: &HitlRequestEnvelope,
966) -> Result<ApprovalResolution, VmError> {
967 let quorum = approval_quorum_from_request(kind, request)?;
968 let allowed_reviewers = approval_reviewers_from_request(kind, request)
969 .into_iter()
970 .collect::<BTreeSet<_>>();
971 let mut progress = ApprovalProgress {
972 reviewers: BTreeSet::new(),
973 signatures: Vec::new(),
974 reason: None,
975 approved_at: None,
976 };
977 let topic = topic(kind)?;
978 let events = log
979 .read_range(&topic, None, usize::MAX)
980 .await
981 .map_err(log_error)?;
982 for (_, event) in events {
983 if !event_matches_request(&event, &request.request_id)
984 || event.kind != "hitl.response_received"
985 {
986 continue;
987 }
988 let response: HitlHostResponse = serde_json::from_value(event.payload)
989 .map_err(|error| VmError::Runtime(error.to_string()))?;
990 if let Some(reviewer) = response.reviewer.as_deref() {
991 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
992 continue;
993 }
994 if progress.reviewers.contains(reviewer) {
995 continue;
996 }
997 }
998 if response.approved.unwrap_or(false) {
999 if let Some(reviewer) = response.reviewer.clone() {
1000 let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
1001 progress.reviewers.insert(reviewer.clone());
1002 progress.signatures.push(ApprovalSignature {
1003 reviewer: reviewer.clone(),
1004 signed_at: signed_at.clone(),
1005 signature: response.signature.clone().unwrap_or_else(|| {
1006 approval_receipt_signature(
1007 &request.request_id,
1008 &reviewer,
1009 &signed_at,
1010 true,
1011 response.reason.as_deref(),
1012 )
1013 }),
1014 });
1015 }
1016 progress.reason = response.reason.clone();
1017 progress.approved_at = response.responded_at.clone();
1018 if progress.reviewers.len() as u32 >= quorum {
1019 return Ok(ApprovalResolution::Approved(progress));
1020 }
1021 continue;
1022 }
1023 return Ok(ApprovalResolution::Denied(response));
1024 }
1025 Ok(ApprovalResolution::Pending)
1026}
1027
1028fn approval_quorum_from_request(
1029 kind: HitlRequestKind,
1030 request: &HitlRequestEnvelope,
1031) -> Result<u32, VmError> {
1032 let key = match kind {
1033 HitlRequestKind::DualControl => "n",
1034 _ => "quorum",
1035 };
1036 let quorum = request
1037 .payload
1038 .get(key)
1039 .or_else(|| request.payload.get("approvers_required"))
1040 .or_else(|| {
1041 request
1042 .payload
1043 .get("approval_request")
1044 .and_then(|approval| approval.get("approvers_required"))
1045 })
1046 .and_then(JsonValue::as_u64)
1047 .unwrap_or(1);
1048 u32::try_from(quorum).map_err(|_| {
1049 VmError::Runtime(format!(
1050 "invalid quorum in HITL request '{}'",
1051 request.request_id
1052 ))
1053 })
1054}
1055
1056fn approval_reviewers_from_request(
1057 kind: HitlRequestKind,
1058 request: &HitlRequestEnvelope,
1059) -> Vec<String> {
1060 let key = match kind {
1061 HitlRequestKind::DualControl => "approvers",
1062 _ => "reviewers",
1063 };
1064 request
1065 .payload
1066 .get(key)
1067 .or_else(|| {
1068 request
1069 .payload
1070 .get("approval_request")
1071 .and_then(|approval| approval.get("reviewers"))
1072 })
1073 .and_then(JsonValue::as_array)
1074 .map(|values| {
1075 values
1076 .iter()
1077 .filter_map(JsonValue::as_str)
1078 .map(str::to_string)
1079 .collect()
1080 })
1081 .unwrap_or_default()
1082}
1083
1084fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
1085 json!({
1086 "approved": true,
1087 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1088 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1089 "reason": progress.reason,
1090 "signatures": progress.signatures,
1091 })
1092}
1093
1094fn approval_receipt_signature(
1095 request_id: &str,
1096 reviewer: &str,
1097 signed_at: &str,
1098 approved: bool,
1099 reason: Option<&str>,
1100) -> String {
1101 let material = format!(
1102 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
1103 reason.unwrap_or("")
1104 );
1105 let hash = sha2::Sha256::digest(material.as_bytes());
1106 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
1107 format!("sha256:{hex}")
1108}
1109
1110fn approval_record_from_waitpoint(
1111 record: &WaitpointRecord,
1112 builtin: &str,
1113) -> Result<VmValue, VmError> {
1114 record
1115 .value
1116 .as_ref()
1117 .map(crate::stdlib::json_to_vm_value)
1118 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
1119}
1120
1121async fn approval_wait_error(
1122 log: &Arc<AnyEventLog>,
1123 kind: HitlRequestKind,
1124 request_id: &str,
1125) -> VmError {
1126 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
1127 if record.status == WaitpointStatus::Cancelled
1128 && record.reason.as_deref() != Some("upstream_cancelled")
1129 {
1130 return approval_denied_error(
1131 request_id,
1132 HitlHostResponse {
1133 request_id: request_id.to_string(),
1134 answer: None,
1135 approved: Some(false),
1136 accepted: None,
1137 reviewer: record.cancelled_by.clone(),
1138 reason: record.reason.clone(),
1139 metadata: record.metadata.clone(),
1140 responded_at: record.cancelled_at.clone(),
1141 signature: None,
1142 },
1143 );
1144 }
1145 if record.status == WaitpointStatus::Cancelled {
1146 return hitl_cancelled_error(
1147 request_id,
1148 kind,
1149 "",
1150 &[request_id.to_string()],
1151 record.reason.clone(),
1152 );
1153 }
1154 }
1155 hitl_cancelled_error(
1156 request_id,
1157 kind,
1158 "",
1159 &[request_id.to_string()],
1160 Some("upstream_cancelled".to_string()),
1161 )
1162}
1163
1164async fn append_timeout_once(
1165 log: &Arc<AnyEventLog>,
1166 kind: HitlRequestKind,
1167 request_id: &str,
1168 trace_id: &str,
1169) -> Result<(), VmError> {
1170 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
1171 return Ok(());
1172 }
1173 append_timeout(log, kind, request_id, trace_id).await
1174}
1175
1176async fn hitl_event_exists(
1177 log: &Arc<AnyEventLog>,
1178 kind: HitlRequestKind,
1179 request_id: &str,
1180 event_kind: &str,
1181) -> Result<bool, VmError> {
1182 let topic = topic(kind)?;
1183 let events = log
1184 .read_range(&topic, None, usize::MAX)
1185 .await
1186 .map_err(log_error)?;
1187 Ok(events
1188 .into_iter()
1189 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1190}
1191
1192fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1193 match kind {
1194 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1195 _ => "hitl.approval_approved",
1196 }
1197}
1198
1199fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1200 match kind {
1201 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1202 _ => "hitl.approval_denied",
1203 }
1204}
1205
1206async fn append_request(
1207 log: &Arc<AnyEventLog>,
1208 request: &HitlRequestEnvelope,
1209) -> Result<(), VmError> {
1210 let topic = topic(request.kind)?;
1211 log.append(
1212 &topic,
1213 LogEvent::new(
1214 request.kind.request_event_kind(),
1215 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1216 )
1217 .with_headers(request_headers(request)),
1218 )
1219 .await
1220 .map(|_| ())
1221 .map_err(log_error)
1222}
1223
1224async fn append_named_event(
1225 log: &Arc<AnyEventLog>,
1226 kind: HitlRequestKind,
1227 event_kind: &str,
1228 request_id: &str,
1229 trace_id: &str,
1230 payload: JsonValue,
1231) -> Result<(), VmError> {
1232 let topic = topic(kind)?;
1233 let headers = headers_with_trace(request_id, trace_id);
1234 log.append(
1235 &topic,
1236 LogEvent::new(event_kind, payload).with_headers(headers),
1237 )
1238 .await
1239 .map(|_| ())
1240 .map_err(log_error)
1241}
1242
1243async fn append_timeout(
1244 log: &Arc<AnyEventLog>,
1245 kind: HitlRequestKind,
1246 request_id: &str,
1247 trace_id: &str,
1248) -> Result<(), VmError> {
1249 append_named_event(
1250 log,
1251 kind,
1252 "hitl.timeout",
1253 request_id,
1254 trace_id,
1255 serde_json::to_value(HitlTimeoutRecord {
1256 request_id: request_id.to_string(),
1257 kind,
1258 trace_id: trace_id.to_string(),
1259 timed_out_at: now_rfc3339(),
1260 })
1261 .map_err(|error| VmError::Runtime(error.to_string()))?,
1262 )
1263 .await
1264}
1265
1266async fn maybe_apply_mock_response(
1267 kind: HitlRequestKind,
1268 request_id: &str,
1269 request_payload: &JsonValue,
1270) -> Result<(), VmError> {
1271 let mut params = request_payload
1272 .as_object()
1273 .cloned()
1274 .unwrap_or_default()
1275 .into_iter()
1276 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1277 .collect::<BTreeMap<_, _>>();
1278 params.insert(
1279 "request_id".to_string(),
1280 VmValue::String(Rc::from(request_id.to_string())),
1281 );
1282 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1283 return Ok(());
1284 };
1285 let value = result?;
1286 let responses = match value {
1287 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1288 other => vec![other],
1289 };
1290 for response in responses {
1291 let response_dict = response.as_dict().ok_or_else(|| {
1292 VmError::Runtime(format!(
1293 "mocked HITL {} response must be a dict or list<dict>",
1294 kind.as_str()
1295 ))
1296 })?;
1297 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1298 append_hitl_response(None, hitl_response)
1299 .await
1300 .map_err(VmError::Runtime)?;
1301 }
1302 Ok(())
1303}
1304
1305fn parse_hitl_response_dict(
1306 request_id: &str,
1307 response_dict: &BTreeMap<String, VmValue>,
1308) -> Result<HitlHostResponse, VmError> {
1309 Ok(HitlHostResponse {
1310 request_id: request_id.to_string(),
1311 answer: response_dict
1312 .get("answer")
1313 .map(crate::llm::vm_value_to_json),
1314 approved: response_dict.get("approved").and_then(vm_bool),
1315 accepted: response_dict.get("accepted").and_then(vm_bool),
1316 reviewer: response_dict.get("reviewer").map(VmValue::display),
1317 reason: response_dict.get("reason").map(VmValue::display),
1318 metadata: response_dict
1319 .get("metadata")
1320 .map(crate::llm::vm_value_to_json),
1321 responded_at: response_dict.get("responded_at").map(VmValue::display),
1322 signature: response_dict.get("signature").map(VmValue::display),
1323 })
1324}
1325
1326fn maybe_notify_host(request: &HitlRequestEnvelope) {
1327 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1328 return;
1329 };
1330 bridge.notify(
1331 "harn.hitl.requested",
1332 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1333 );
1334}
1335
1336fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1337 let Some(value) = value else {
1338 return Ok(AskUserOptions {
1339 schema: None,
1340 timeout: Some(default_question_timeout()),
1341 default: None,
1342 });
1343 };
1344 let dict = value
1345 .as_dict()
1346 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1347 Ok(AskUserOptions {
1348 schema: dict
1349 .get("schema")
1350 .cloned()
1351 .filter(|value| !matches!(value, VmValue::Nil)),
1352 timeout: dict
1353 .get("timeout")
1354 .map(parse_duration_value)
1355 .transpose()?
1356 .or_else(|| Some(default_question_timeout())),
1357 default: dict
1358 .get("default")
1359 .cloned()
1360 .filter(|value| !matches!(value, VmValue::Nil)),
1361 })
1362}
1363
1364fn default_question_timeout() -> StdDuration {
1365 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1366}
1367
1368fn escalation_capability_policy() -> JsonValue {
1369 crate::orchestration::current_execution_policy()
1370 .and_then(|policy| serde_json::to_value(policy).ok())
1371 .unwrap_or(JsonValue::Null)
1372}
1373
1374fn parse_approval_options(
1375 value: Option<&VmValue>,
1376 builtin: &str,
1377) -> Result<ApprovalOptions, VmError> {
1378 let dict = match value {
1379 None => None,
1380 Some(VmValue::Dict(dict)) => Some(dict),
1381 Some(_) => {
1382 return Err(VmError::Runtime(format!(
1383 "{builtin}: options must be a dict"
1384 )))
1385 }
1386 };
1387 let quorum = dict
1388 .and_then(|dict| dict.get("quorum"))
1389 .and_then(VmValue::as_int)
1390 .unwrap_or(1);
1391 if quorum <= 0 {
1392 return Err(VmError::Runtime(format!(
1393 "{builtin}: quorum must be positive"
1394 )));
1395 }
1396 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1397 let capabilities_requested = optional_string_list(
1398 dict.and_then(|dict| dict.get("capabilities_requested")),
1399 builtin,
1400 )?;
1401 let evidence_refs = dict
1402 .and_then(|dict| dict.get("evidence_refs"))
1403 .map(|value| match value {
1404 VmValue::List(items) => Ok(items
1405 .iter()
1406 .map(crate::llm::vm_value_to_json)
1407 .collect::<Vec<_>>()),
1408 _ => Err(VmError::Runtime(format!(
1409 "{builtin}: evidence_refs must be a list"
1410 ))),
1411 })
1412 .transpose()?
1413 .unwrap_or_default();
1414 let deadline = dict
1415 .and_then(|dict| dict.get("deadline"))
1416 .map(parse_duration_value)
1417 .transpose()?
1418 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1419 Ok(ApprovalOptions {
1420 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1421 args: dict.and_then(|dict| dict.get("args")).cloned(),
1422 quorum: quorum as u32,
1423 reviewers,
1424 deadline,
1425 principal: dict
1426 .and_then(|dict| dict.get("principal"))
1427 .map(VmValue::display)
1428 .filter(|value| !value.is_empty()),
1429 evidence_refs,
1430 undo_metadata: dict
1431 .and_then(|dict| dict.get("undo_metadata"))
1432 .map(crate::llm::vm_value_to_json),
1433 capabilities_requested,
1434 })
1435}
1436
1437fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1438 args.get(idx)
1439 .map(VmValue::display)
1440 .filter(|value| !value.is_empty())
1441 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1442}
1443
1444fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1445 let value = args
1446 .get(idx)
1447 .and_then(VmValue::as_int)
1448 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1449 if value <= 0 {
1450 return Err(VmError::Runtime(format!(
1451 "{builtin}: expected a positive int at {idx}"
1452 )));
1453 }
1454 Ok(value)
1455}
1456
1457fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1458 let Some(value) = value else {
1459 return Ok(Vec::new());
1460 };
1461 match value {
1462 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1463 _ => Err(VmError::Runtime(format!(
1464 "{builtin}: expected list<string>"
1465 ))),
1466 }
1467}
1468
1469fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1470 match value {
1471 VmValue::Duration(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1472 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1473 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1474 _ => Err(VmError::Runtime(
1475 "expected a duration or millisecond count".to_string(),
1476 )),
1477 }
1478}
1479
1480fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1481 active_event_log()
1482 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1483}
1484
1485fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1486 if let Some(log) = active_event_log() {
1487 return Ok(log);
1488 }
1489 let Some(base_dir) = base_dir else {
1490 return Ok(install_memory_for_current_thread(
1491 HITL_EVENT_LOG_QUEUE_DEPTH,
1492 ));
1493 };
1494 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1495}
1496
1497fn current_dispatch_keys() -> Option<DispatchKeys> {
1498 let context = current_dispatch_context()?;
1499 let stable_base = context
1500 .replay_of_event_id
1501 .clone()
1502 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1503 let instance_key = format!(
1504 "{}::{}",
1505 context.trigger_event.id.0,
1506 context.replay_of_event_id.as_deref().unwrap_or("live")
1507 );
1508 Some(DispatchKeys {
1509 instance_key,
1510 stable_base,
1511 agent: context.agent_id,
1512 trace_id: context.trigger_event.trace_id.0,
1513 })
1514}
1515
1516fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1517 if let Some(keys) = dispatch_keys {
1518 let seq = REQUEST_SEQUENCE.with(|slot| {
1519 let mut state = slot.borrow_mut();
1520 if state.instance_key != keys.instance_key {
1521 state.instance_key = keys.instance_key.clone();
1522 state.next_seq = 0;
1523 }
1524 state.next_seq += 1;
1525 state.next_seq
1526 });
1527 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1528 }
1529 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1530}
1531
1532fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1533 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1534 if let Some(run_id) = request.run_id.as_ref() {
1535 headers.insert("run_id".to_string(), run_id.clone());
1536 }
1537 headers
1538}
1539
/// Builds the event headers for a response: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1545
1546fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1547 let mut headers = response_headers(request_id);
1548 headers.insert("trace_id".to_string(), trace_id.to_string());
1549 headers
1550}
1551
1552fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1553 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1554}
1555
1556fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1557 event
1558 .headers
1559 .get("request_id")
1560 .is_some_and(|value| value == request_id)
1561 || event
1562 .payload
1563 .get("request_id")
1564 .and_then(JsonValue::as_str)
1565 .is_some_and(|value| value == request_id)
1566}
1567
1568fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1569 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1570 "name": "ApprovalDeniedError",
1571 "category": "generic",
1572 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1573 "request_id": request_id,
1574 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1575 "reason": response.reason,
1576 })))
1577}
1578
1579fn hitl_cancelled_error(
1580 request_id: &str,
1581 kind: HitlRequestKind,
1582 wait_id: &str,
1583 waitpoint_ids: &[String],
1584 reason: Option<String>,
1585) -> VmError {
1586 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1587 let message = reason
1588 .clone()
1589 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1590 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1591 "name": "HumanCancelledError",
1592 "category": ErrorCategory::Cancelled.as_str(),
1593 "message": message,
1594 "request_id": request_id,
1595 "kind": kind.as_str(),
1596 "wait_id": wait_id,
1597 "waitpoint_ids": waitpoint_ids,
1598 "reason": reason,
1599 })))
1600}
1601
1602fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1603 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1604 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1605 "name": "HumanTimeoutError",
1606 "category": ErrorCategory::Timeout.as_str(),
1607 "message": format!("{} timed out", kind.as_str()),
1608 "request_id": request_id,
1609 "kind": kind.as_str(),
1610 })))
1611}
1612
1613fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1614 match default {
1615 VmValue::Int(_) => match value {
1616 VmValue::Int(_) => value.clone(),
1617 VmValue::Float(number) => VmValue::Int(*number as i64),
1618 VmValue::String(text) => text
1619 .parse::<i64>()
1620 .map(VmValue::Int)
1621 .unwrap_or_else(|_| default.clone()),
1622 _ => default.clone(),
1623 },
1624 VmValue::Float(_) => match value {
1625 VmValue::Float(_) => value.clone(),
1626 VmValue::Int(number) => VmValue::Float(*number as f64),
1627 VmValue::String(text) => text
1628 .parse::<f64>()
1629 .map(VmValue::Float)
1630 .unwrap_or_else(|_| default.clone()),
1631 _ => default.clone(),
1632 },
1633 VmValue::Bool(_) => match value {
1634 VmValue::Bool(_) => value.clone(),
1635 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1636 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1637 _ => default.clone(),
1638 },
1639 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1640 VmValue::Duration(_) => match value {
1641 VmValue::Duration(_) => value.clone(),
1642 VmValue::Int(ms) => VmValue::Duration(*ms),
1643 _ => default.clone(),
1644 },
1645 VmValue::Nil => value.clone(),
1646 _ => {
1647 if value.type_name() == default.type_name() {
1648 value.clone()
1649 } else {
1650 default.clone()
1651 }
1652 }
1653 }
1654}
1655
1656fn log_error(error: impl std::fmt::Display) -> VmError {
1657 VmError::Runtime(error.to_string())
1658}
1659
1660fn now_rfc3339() -> String {
1661 format_rfc3339(OffsetDateTime::now_utc())
1662}
1663
1664fn format_rfc3339(timestamp: OffsetDateTime) -> String {
1665 timestamp
1666 .format(&Rfc3339)
1667 .unwrap_or_else(|_| timestamp.to_string())
1668}
1669
1670fn deadline_after(requested_at: OffsetDateTime, duration: StdDuration) -> Option<String> {
1671 time::Duration::try_from(duration)
1672 .ok()
1673 .map(|duration| format_rfc3339(requested_at + duration))
1674}
1675
1676fn new_trace_id() -> String {
1677 format!("trace_{}", Uuid::now_v7())
1678}
1679
1680fn vm_bool(value: &VmValue) -> Option<bool> {
1681 match value {
1682 VmValue::Bool(flag) => Some(*flag),
1683 _ => None,
1684 }
1685}
1686
1687fn vm_string(value: &VmValue) -> Option<&str> {
1688 match value {
1689 VmValue::String(text) => Some(text.as_ref()),
1690 _ => None,
1691 }
1692}
1693
1694fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1695 match value {
1696 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1697 _ => None,
1698 }
1699}
1700
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::path::Path;
    use std::sync::Arc;

    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, AnyEventLog, EventLog, LogEvent, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Script output plus the event kinds recorded on each HITL topic.
    struct HitlRun {
        output: String,
        question_events: Vec<String>,
        approval_events: Vec<String>,
        dual_control_events: Vec<String>,
        escalation_events: Vec<String>,
    }

    /// Drives `body` to completion inside a fresh `LocalSet`.
    async fn in_local_set<F: Future>(body: F) -> F::Output {
        let local = tokio::task::LocalSet::new();
        local.run_until(body).await
    }

    /// Turns string literals into the owned form used by the assertions.
    fn kinds(expected: &[&str]) -> Vec<String> {
        expected.iter().map(|kind| kind.to_string()).collect()
    }

    /// Resets thread-local state, installs the default event log under
    /// `base_dir`, then compiles and runs `source`. Returns the trimmed
    /// script output together with the installed log.
    async fn run_script(
        base_dir: &Path,
        source: &str,
    ) -> Result<(String, Arc<AnyEventLog>), VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        Ok((output, log))
    }

    /// Reads every event currently stored on `topic`.
    async fn read_events(log: &Arc<AnyEventLog>, topic: &str) -> Vec<LogEvent> {
        let topic = Topic::new(topic).expect("valid topic");
        log.read_range(&topic, None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event)
            .collect()
    }

    /// Event kinds recorded on `topic`, in log order.
    async fn event_kinds(log: &Arc<AnyEventLog>, topic: &str) -> Vec<String> {
        read_events(log, topic)
            .await
            .into_iter()
            .map(|event| event.kind)
            .collect()
    }

    /// Event payloads recorded on `topic`, in log order.
    async fn event_payloads(log: &Arc<AnyEventLog>, topic: &str) -> Vec<serde_json::Value> {
        read_events(log, topic)
            .await
            .into_iter()
            .map(|event| event.payload)
            .collect()
    }

    /// Runs `source` and gathers the event kinds from all four HITL topics.
    async fn execute_hitl_script(base_dir: &Path, source: &str) -> Result<HitlRun, VmError> {
        let (output, log) = run_script(base_dir, source).await?;
        let question_events = event_kinds(&log, HITL_QUESTIONS_TOPIC).await;
        let approval_events = event_kinds(&log, HITL_APPROVALS_TOPIC).await;
        let dual_control_events = event_kinds(&log, HITL_DUAL_CONTROL_TOPIC).await;
        let escalation_events = event_kinds(&log, HITL_ESCALATIONS_TOPIC).await;
        Ok(HitlRun {
            output,
            question_events,
            approval_events,
            dual_control_events,
            escalation_events,
        })
    }

    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        in_local_set(async {
            let dir = tempfile::tempdir().expect("tempdir");
            let source = r#"
pipeline test(task) {
  host_mock("hitl", "question", {answer: "9"})
  let answer: int = ask_user("Pick a number", {default: 0})
  println(answer)
}
"#;
            let run = execute_hitl_script(dir.path(), source)
                .await
                .expect("script succeeds");
            assert_eq!(run.output, "9");
            assert_eq!(
                run.question_events,
                kinds(&["hitl.question_asked", "hitl.response_received"])
            );
            assert!(run.approval_events.is_empty());
            assert!(run.dual_control_events.is_empty());
            assert!(run.escalation_events.is_empty());
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        in_local_set(async {
            let dir = tempfile::tempdir().expect("tempdir");
            let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", [
    {approved: true, reviewer: "alice", reason: "ok"},
    {approved: true, reviewer: "bob", reason: "ship it"},
  ])
  let record = request_approval(
    "deploy production",
    {quorum: 2, reviewers: ["alice", "bob", "carol"]},
  )
  println(record.approved)
  println(len(record.reviewers))
  println(record.reviewers[0])
  println(record.reviewers[1])
}
"#;
            let run = execute_hitl_script(dir.path(), source)
                .await
                .expect("script succeeds");
            assert_eq!(
                run.approval_events,
                kinds(&[
                    "hitl.approval_requested",
                    "hitl.response_received",
                    "hitl.response_received",
                    "hitl.approval_approved",
                ])
            );
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_emits_canonical_approval_request_payload() {
        in_local_set(async {
            let dir = tempfile::tempdir().expect("tempdir");
            let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: true, reviewer: "alice", reason: "ok"})
  request_approval("deploy production", {
    args: {environment: "prod"},
    quorum: 1,
    reviewers: ["alice"],
    evidence_refs: [{kind: "run", uri: "run_123"}],
    undo_metadata: {strategy: "rollback"},
    capabilities_requested: ["deploy.production"],
  })
}
"#;
            let (_, log) = run_script(dir.path(), source)
                .await
                .expect("script succeeds");

            let payloads = event_payloads(&log, HITL_APPROVALS_TOPIC).await;
            let request_payload = &payloads[0]["payload"];
            let approval_request = &request_payload["approval_request"];
            assert_eq!(approval_request["id"], request_payload["id"]);
            assert_eq!(approval_request["action"], "deploy production");
            assert_eq!(approval_request["args"]["environment"], "prod");
            assert_eq!(approval_request["approvers_required"], 1);
            assert_eq!(approval_request["evidence_refs"][0]["uri"], "run_123");
            assert_eq!(approval_request["undo_metadata"]["strategy"], "rollback");
            assert_eq!(
                approval_request["capabilities_requested"][0],
                "deploy.production"
            );
            assert!(approval_request["requested_at"].as_str().is_some());
            assert!(approval_request["deadline"].as_str().is_some());
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        in_local_set(async {
            let dir = tempfile::tempdir().expect("tempdir");
            let source = r#"
pipeline test(task) {
  host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
  let denied = try {
    request_approval("drop table", {reviewers: ["alice"]})
  }
  println(is_err(denied))
  println(unwrap_err(denied).name)
  println(unwrap_err(denied).reason)
}
"#;
            let run = execute_hitl_script(dir.path(), source)
                .await
                .expect("script succeeds");
            assert_eq!(run.output, "true\nApprovalDeniedError\nunsafe");
            assert_eq!(
                run.approval_events,
                kinds(&[
                    "hitl.approval_requested",
                    "hitl.response_received",
                    "hitl.approval_denied",
                ])
            );
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        in_local_set(async {
            let dir = tempfile::tempdir().expect("tempdir");
            let source = r#"
pipeline test(task) {
  host_mock("hitl", "dual_control", [
    {approved: true, reviewer: "alice"},
    {approved: true, reviewer: "bob"},
  ])
  let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
  println(result)
}
"#;
            let run = execute_hitl_script(dir.path(), source)
                .await
                .expect("script succeeds");
            assert_eq!(run.output, "launched");
            assert_eq!(
                run.dual_control_events,
                kinds(&[
                    "hitl.dual_control_requested",
                    "hitl.response_received",
                    "hitl.response_received",
                    "hitl.dual_control_approved",
                    "hitl.dual_control_executed",
                ])
            );
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        in_local_set(async {
            let dir = tempfile::tempdir().expect("tempdir");
            let source = r#"
pipeline test(task) {
  host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
  let handle = escalate_to("admin", "need override")
  println(handle.status)
  println(handle.reviewer)
}
"#;
            let run = execute_hitl_script(dir.path(), source)
                .await
                .expect("script succeeds");
            assert_eq!(run.output, "accepted\nlead");
            assert_eq!(
                run.escalation_events,
                kinds(&["hitl.escalation_issued", "hitl.escalation_accepted"])
            );
        })
        .await;
    }
}