1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24 WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
/// Queue depth value for the HITL event log; its point of use lies outside this section.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Approval and dual-control deadline in milliseconds (24 hours).
const HITL_APPROVAL_TIMEOUT_MS: u64 = 86_400_000;
/// Question timeout in milliseconds (24 hours); its point of use lies outside this section.
const HITL_QUESTION_TIMEOUT_MS: u64 = 86_400_000;

/// Event-log topic used for `HitlRequestKind::Question` traffic.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for `HitlRequestKind::Approval` traffic.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for `HitlRequestKind::DualControl` traffic.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for `HitlRequestKind::Escalation` traffic.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
39thread_local! {
40 static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
41}
42
/// Sequencing state kept in the `REQUEST_SEQUENCE` thread-local.
///
/// The derived `Default` yields an empty `instance_key` and a `next_seq` of `0`.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Key the counter is associated with (presumably the dispatch instance — confirm
    /// against the request-id generator, which is not shown in this section).
    pub(crate) instance_key: String,
    /// Sequence counter value.
    pub(crate) next_seq: u64,
}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum HitlRequestKind {
52 Question,
53 Approval,
54 DualControl,
55 Escalation,
56}
57
58impl HitlRequestKind {
59 pub(crate) fn as_str(self) -> &'static str {
60 match self {
61 Self::Question => "question",
62 Self::Approval => "approval",
63 Self::DualControl => "dual_control",
64 Self::Escalation => "escalation",
65 }
66 }
67
68 fn topic(self) -> &'static str {
69 match self {
70 Self::Question => HITL_QUESTIONS_TOPIC,
71 Self::Approval => HITL_APPROVALS_TOPIC,
72 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73 Self::Escalation => HITL_ESCALATIONS_TOPIC,
74 }
75 }
76
77 fn request_event_kind(self) -> &'static str {
78 match self {
79 Self::Question => "hitl.question_asked",
80 Self::Approval => "hitl.approval_requested",
81 Self::DualControl => "hitl.dual_control_requested",
82 Self::Escalation => "hitl.escalation_issued",
83 }
84 }
85
86 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87 if request_id.starts_with("hitl_question_") {
88 Some(Self::Question)
89 } else if request_id.starts_with("hitl_approval_") {
90 Some(Self::Approval)
91 } else if request_id.starts_with("hitl_dual_control_") {
92 Some(Self::DualControl)
93 } else if request_id.starts_with("hitl_escalation_") {
94 Some(Self::Escalation)
95 } else {
96 None
97 }
98 }
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize)]
102pub struct HitlHostResponse {
103 pub request_id: String,
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub answer: Option<JsonValue>,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub approved: Option<bool>,
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub accepted: Option<bool>,
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub reviewer: Option<String>,
112 #[serde(skip_serializing_if = "Option::is_none")]
113 pub reason: Option<String>,
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub metadata: Option<JsonValue>,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub responded_at: Option<String>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 pub signature: Option<String>,
120}
121
122#[derive(Clone, Debug, Serialize, Deserialize)]
123struct HitlRequestEnvelope {
124 request_id: String,
125 kind: HitlRequestKind,
126 #[serde(default)]
127 agent: String,
128 trace_id: String,
129 #[serde(skip_serializing_if = "Option::is_none")]
130 run_id: Option<String>,
131 requested_at: String,
132 payload: JsonValue,
133}
134
135#[derive(Clone, Debug, Serialize, Deserialize)]
136struct HitlTimeoutRecord {
137 request_id: String,
138 kind: HitlRequestKind,
139 trace_id: String,
140 timed_out_at: String,
141}
142
/// Identifiers describing the current dispatch, as returned by `current_dispatch_keys()`.
#[derive(Debug, Clone)]
struct DispatchKeys {
    /// Key for the dispatch instance (how it is derived is not shown in this section).
    instance_key: String,
    /// Stable base string (presumably the seed for request ids — confirm at the use site).
    stable_base: String,
    /// Agent name copied into request envelopes.
    agent: String,
    /// Trace id copied into request envelopes.
    trace_id: String,
}
150
151#[derive(Clone, Debug)]
152struct AskUserOptions {
153 schema: Option<VmValue>,
154 timeout: Option<StdDuration>,
155 default: Option<VmValue>,
156}
157
158#[derive(Clone, Debug)]
159struct ApprovalOptions {
160 detail: Option<VmValue>,
161 quorum: u32,
162 reviewers: Vec<String>,
163 deadline: StdDuration,
164}
165
166#[derive(Clone, Debug)]
167struct ApprovalProgress {
168 reviewers: BTreeSet<String>,
169 signatures: Vec<ApprovalSignature>,
170 reason: Option<String>,
171 approved_at: Option<String>,
172}
173
174#[derive(Clone, Debug, Serialize)]
175struct ApprovalSignature {
176 reviewer: String,
177 signed_at: String,
178 signature: String,
179}
180
181#[derive(Clone, Debug)]
182enum ApprovalResolution {
183 Pending,
184 Approved(ApprovalProgress),
185 Denied(HitlHostResponse),
186}
187
188#[derive(Clone, Debug)]
189enum WaitpointOutcome {
190 Completed(WaitpointRecord),
191 Timeout,
192 Cancelled {
193 wait_id: String,
194 waitpoint_ids: Vec<String>,
195 reason: Option<String>,
196 },
197}
198
199pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
200 vm.register_async_builtin("ask_user", |args| {
201 Box::pin(async move { ask_user_impl(&args).await })
202 });
203
204 vm.register_async_builtin("request_approval", |args| {
205 Box::pin(async move { request_approval_impl(&args).await })
206 });
207
208 vm.register_async_builtin("dual_control", |args| {
209 Box::pin(async move { dual_control_impl(&args).await })
210 });
211
212 vm.register_async_builtin("escalate_to", |args| {
213 Box::pin(async move { escalate_to_impl(&args).await })
214 });
215}
216
217pub(crate) fn reset_hitl_state() {
218 REQUEST_SEQUENCE.with(|slot| {
219 *slot.borrow_mut() = RequestSequenceState::default();
220 });
221}
222
223pub(crate) fn take_hitl_state() -> RequestSequenceState {
224 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
225}
226
227pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
228 REQUEST_SEQUENCE.with(|slot| {
229 *slot.borrow_mut() = state;
230 });
231}
232
233pub async fn append_hitl_response(
234 base_dir: Option<&Path>,
235 mut response: HitlHostResponse,
236) -> Result<u64, String> {
237 let kind = HitlRequestKind::from_request_id(&response.request_id)
238 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
239 if response.responded_at.is_none() {
240 response.responded_at = Some(now_rfc3339());
241 }
242 let log = ensure_hitl_event_log_for(base_dir)?;
243 let headers = response_headers(&response.request_id);
244 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
245 let event_id = log
246 .append(
247 &topic,
248 LogEvent::new(
249 match kind {
250 HitlRequestKind::Escalation => "hitl.escalation_accepted",
251 _ => "hitl.response_received",
252 },
253 serde_json::to_value(&response).map_err(|error| error.to_string())?,
254 )
255 .with_headers(headers),
256 )
257 .await
258 .map_err(|error| error.to_string())?;
259 finalize_hitl_response(&log, kind, &response).await?;
260 Ok(event_id)
261}
262
263pub async fn append_approval_request_on(
264 log: &Arc<AnyEventLog>,
265 agent: impl Into<String>,
266 trace_id: impl Into<String>,
267 action: impl Into<String>,
268 detail: JsonValue,
269 reviewers: Vec<String>,
270) -> Result<String, VmError> {
271 let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
272 let trace_id = trace_id.into();
273 let request = HitlRequestEnvelope {
274 request_id: request_id.clone(),
275 kind: HitlRequestKind::Approval,
276 agent: agent.into(),
277 trace_id: trace_id.clone(),
278 run_id: None,
279 requested_at: now_rfc3339(),
280 payload: json!({
281 "action": action.into(),
282 "detail": detail,
283 "quorum": 1,
284 "reviewers": reviewers,
285 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
286 }),
287 };
288 create_request_waitpoint(log, &request).await?;
289 append_request(log, &request).await?;
290 maybe_notify_host(&request);
291 Ok(request_id)
292}
293
294async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
295 let prompt = required_string_arg(args, 0, "ask_user")?;
296 let options = parse_ask_user_options(args.get(1))?;
297 let keys = current_dispatch_keys();
298 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
299 let trace_id = keys
300 .as_ref()
301 .map(|keys| keys.trace_id.clone())
302 .unwrap_or_else(new_trace_id);
303 let log = ensure_hitl_event_log();
304 let request = HitlRequestEnvelope {
305 request_id: request_id.clone(),
306 kind: HitlRequestKind::Question,
307 agent: keys
308 .as_ref()
309 .map(|keys| keys.agent.clone())
310 .unwrap_or_default(),
311 trace_id: trace_id.clone(),
312 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
313 requested_at: now_rfc3339(),
314 payload: json!({
315 "prompt": prompt,
316 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
317 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
318 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
319 }),
320 };
321 create_request_waitpoint(&log, &request).await?;
322 append_request(&log, &request).await?;
323 maybe_notify_host(&request);
324 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
325
326 match wait_for_request_waitpoint(&request_id, options.timeout).await? {
327 WaitpointOutcome::Completed(record) => {
328 let answer = record
329 .value
330 .as_ref()
331 .map(crate::stdlib::json_to_vm_value)
332 .unwrap_or(VmValue::Nil);
333 if let Some(schema) = options.schema.as_ref() {
334 return schema_expect_value(&answer, schema, true);
335 }
336 if let Some(default) = options.default.as_ref() {
337 return Ok(coerce_like_default(&answer, default));
338 }
339 Ok(answer)
340 }
341 WaitpointOutcome::Timeout => {
342 append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
343 if let Some(default) = options.default {
344 return Ok(default);
345 }
346 Err(timeout_error(&request_id, HitlRequestKind::Question))
347 }
348 WaitpointOutcome::Cancelled {
349 wait_id,
350 waitpoint_ids,
351 reason,
352 } => Err(hitl_cancelled_error(
353 &request_id,
354 HitlRequestKind::Question,
355 &wait_id,
356 &waitpoint_ids,
357 reason,
358 )),
359 }
360}
361
362async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
363 let action = required_string_arg(args, 0, "request_approval")?;
364 let options = parse_approval_options(args.get(1), "request_approval")?;
365 let keys = current_dispatch_keys();
366 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
367 let trace_id = keys
368 .as_ref()
369 .map(|keys| keys.trace_id.clone())
370 .unwrap_or_else(new_trace_id);
371 let log = ensure_hitl_event_log();
372 let request = HitlRequestEnvelope {
373 request_id: request_id.clone(),
374 kind: HitlRequestKind::Approval,
375 agent: keys
376 .as_ref()
377 .map(|keys| keys.agent.clone())
378 .unwrap_or_default(),
379 trace_id: trace_id.clone(),
380 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
381 requested_at: now_rfc3339(),
382 payload: json!({
383 "action": action,
384 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
385 "quorum": options.quorum,
386 "reviewers": options.reviewers,
387 "deadline_ms": options.deadline.as_millis() as u64,
388 }),
389 };
390 create_request_waitpoint(&log, &request).await?;
391 append_request(&log, &request).await?;
392 maybe_notify_host(&request);
393 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
394
395 match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
396 WaitpointOutcome::Completed(record) => {
397 approval_record_from_waitpoint(&record, "request_approval")
398 }
399 WaitpointOutcome::Timeout => {
400 append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
401 Err(timeout_error(&request_id, HitlRequestKind::Approval))
402 }
403 WaitpointOutcome::Cancelled { .. } => {
404 Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
405 }
406 }
407}
408
409async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
410 let n = required_positive_int_arg(args, 0, "dual_control")?;
411 let m = required_positive_int_arg(args, 1, "dual_control")?;
412 if n > m {
413 return Err(VmError::Runtime(
414 "dual_control: n must be less than or equal to m".to_string(),
415 ));
416 }
417 let action = args
418 .get(2)
419 .and_then(|value| match value {
420 VmValue::Closure(closure) => Some(closure.clone()),
421 _ => None,
422 })
423 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
424 let approvers = optional_string_list(args.get(3), "dual_control")?;
425 if !approvers.is_empty() && approvers.len() < m as usize {
426 return Err(VmError::Runtime(format!(
427 "dual_control: expected at least {m} approvers, got {}",
428 approvers.len()
429 )));
430 }
431
432 let keys = current_dispatch_keys();
433 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
434 let trace_id = keys
435 .as_ref()
436 .map(|keys| keys.trace_id.clone())
437 .unwrap_or_else(new_trace_id);
438 let action_name = if action.func.name.is_empty() {
439 "anonymous".to_string()
440 } else {
441 action.func.name.clone()
442 };
443 let log = ensure_hitl_event_log();
444 let request = HitlRequestEnvelope {
445 request_id: request_id.clone(),
446 kind: HitlRequestKind::DualControl,
447 agent: keys
448 .as_ref()
449 .map(|keys| keys.agent.clone())
450 .unwrap_or_default(),
451 trace_id: trace_id.clone(),
452 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
453 requested_at: now_rfc3339(),
454 payload: json!({
455 "n": n,
456 "m": m,
457 "action": action_name,
458 "approvers": approvers,
459 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
460 }),
461 };
462 create_request_waitpoint(&log, &request).await?;
463 append_request(&log, &request).await?;
464 maybe_notify_host(&request);
465 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
466
467 match wait_for_request_waitpoint(
468 &request_id,
469 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
470 )
471 .await?
472 {
473 WaitpointOutcome::Completed(record) => {
474 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
475 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
476 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
477 })?;
478 let result = vm.call_closure_pub(&action, &[]).await?;
479
480 append_named_event(
481 &log,
482 HitlRequestKind::DualControl,
483 "hitl.dual_control_executed",
484 &request_id,
485 &trace_id,
486 json!({
487 "request_id": request_id,
488 "result": crate::llm::vm_value_to_json(&result),
489 }),
490 )
491 .await?;
492
493 Ok(result)
494 }
495 WaitpointOutcome::Timeout => {
496 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
497 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
498 }
499 WaitpointOutcome::Cancelled { .. } => {
500 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
501 }
502 }
503}
504
505async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
506 let role = required_string_arg(args, 0, "escalate_to")?;
507 let reason = required_string_arg(args, 1, "escalate_to")?;
508 let keys = current_dispatch_keys();
509 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
510 let trace_id = keys
511 .as_ref()
512 .map(|keys| keys.trace_id.clone())
513 .unwrap_or_else(new_trace_id);
514 let log = ensure_hitl_event_log();
515 let request = HitlRequestEnvelope {
516 request_id: request_id.clone(),
517 kind: HitlRequestKind::Escalation,
518 agent: keys
519 .as_ref()
520 .map(|keys| keys.agent.clone())
521 .unwrap_or_default(),
522 trace_id: trace_id.clone(),
523 run_id: crate::orchestration::current_mutation_session().and_then(|session| session.run_id),
524 requested_at: now_rfc3339(),
525 payload: json!({
526 "role": role,
527 "reason": reason,
528 "capability_policy": escalation_capability_policy(),
529 }),
530 };
531 create_request_waitpoint(&log, &request).await?;
532 append_request(&log, &request).await?;
533 maybe_notify_host(&request);
534 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
535
536 match wait_for_request_waitpoint(&request_id, None).await? {
537 WaitpointOutcome::Completed(record) => {
538 let accepted_at = record.completed_at.clone();
539 let reviewer = record.completed_by.clone();
540 let accepted = record
541 .value
542 .as_ref()
543 .and_then(|value| value.get("accepted"))
544 .and_then(JsonValue::as_bool)
545 .unwrap_or(true);
546 Ok(crate::stdlib::json_to_vm_value(&json!({
547 "request_id": request_id,
548 "role": role,
549 "reason": reason,
550 "trace_id": trace_id,
551 "status": if accepted { "accepted" } else { "pending" },
552 "accepted_at": accepted_at,
553 "reviewer": reviewer,
554 })))
555 }
556 WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
557 WaitpointOutcome::Cancelled {
558 wait_id,
559 waitpoint_ids,
560 reason,
561 } => Err(hitl_cancelled_error(
562 &request_id,
563 HitlRequestKind::Escalation,
564 &wait_id,
565 &waitpoint_ids,
566 reason,
567 )),
568 }
569}
570
571async fn create_request_waitpoint(
572 log: &Arc<AnyEventLog>,
573 request: &HitlRequestEnvelope,
574) -> Result<(), VmError> {
575 create_waitpoint_on(
576 log,
577 Some(request.request_id.clone()),
578 Some(json!({
579 "kind": request.kind.as_str(),
580 "agent": request.agent.clone(),
581 "trace_id": request.trace_id.clone(),
582 "requested_at": request.requested_at.clone(),
583 "payload": request.payload.clone(),
584 })),
585 )
586 .await?;
587 Ok(())
588}
589
590async fn wait_for_request_waitpoint(
591 request_id: &str,
592 timeout: Option<StdDuration>,
593) -> Result<WaitpointOutcome, VmError> {
594 match wait_on_waitpoints(
595 vec![request_id.to_string()],
596 WaitpointWaitOptions { timeout },
597 )
598 .await
599 {
600 Ok(records) => Ok(WaitpointOutcome::Completed(
601 records
602 .into_iter()
603 .next()
604 .expect("single waitpoint wait result"),
605 )),
606 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
607 Err(WaitpointWaitFailure::Cancelled {
608 wait_id,
609 waitpoint_ids,
610 reason,
611 }) => Ok(WaitpointOutcome::Cancelled {
612 wait_id,
613 waitpoint_ids,
614 reason,
615 }),
616 Err(WaitpointWaitFailure::Vm(error)) => {
617 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
618 return Ok(outcome);
619 }
620 Err(error)
621 }
622 }
623}
624
625fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
626 let VmError::Thrown(VmValue::Dict(dict)) = error else {
627 return None;
628 };
629 let name = dict.get("name").and_then(vm_string)?;
630 match name {
631 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
632 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
633 wait_id: dict
634 .get("wait_id")
635 .and_then(vm_string)
636 .unwrap_or_default()
637 .to_string(),
638 waitpoint_ids: dict
639 .get("waitpoint_ids")
640 .and_then(vm_string_list)
641 .unwrap_or_default(),
642 reason: dict
643 .get("reason")
644 .and_then(vm_string)
645 .map(ToString::to_string),
646 }),
647 _ => None,
648 }
649}
650
651async fn finalize_hitl_response(
652 log: &Arc<AnyEventLog>,
653 kind: HitlRequestKind,
654 response: &HitlHostResponse,
655) -> Result<(), String> {
656 match kind {
657 HitlRequestKind::Question => {
658 if waitpoint_is_terminal(log, &response.request_id).await? {
659 return Ok(());
660 }
661 complete_waitpoint_on(
662 log,
663 &response.request_id,
664 response.answer.clone(),
665 response.reviewer.clone(),
666 response.reason.clone(),
667 response.metadata.clone(),
668 )
669 .await
670 .map(|_| ())
671 .map_err(|error| error.to_string())
672 }
673 HitlRequestKind::Escalation => {
674 if !response.accepted.unwrap_or(false)
675 || waitpoint_is_terminal(log, &response.request_id).await?
676 {
677 return Ok(());
678 }
679 complete_waitpoint_on(
680 log,
681 &response.request_id,
682 Some(json!({
683 "accepted": true,
684 "reviewer": response.reviewer,
685 "reason": response.reason,
686 "responded_at": response.responded_at,
687 })),
688 response.reviewer.clone(),
689 response.reason.clone(),
690 response.metadata.clone(),
691 )
692 .await
693 .map(|_| ())
694 .map_err(|error| error.to_string())
695 }
696 HitlRequestKind::Approval | HitlRequestKind::DualControl => {
697 if waitpoint_is_terminal(log, &response.request_id).await? {
698 return Ok(());
699 }
700 let request = load_request_envelope(log, kind, &response.request_id)
701 .await
702 .map_err(|error| error.to_string())?;
703 match resolve_approval_state(log, kind, &request)
704 .await
705 .map_err(|error| error.to_string())?
706 {
707 ApprovalResolution::Pending => Ok(()),
708 ApprovalResolution::Approved(progress) => {
709 let record = approval_record_json(&progress);
710 append_named_event(
711 log,
712 kind,
713 approved_event_kind(kind),
714 &request.request_id,
715 &request.trace_id,
716 json!({
717 "request_id": request.request_id.clone(),
718 "record": record.clone(),
719 }),
720 )
721 .await
722 .map_err(|error| error.to_string())?;
723 complete_waitpoint_on(
724 log,
725 &request.request_id,
726 Some(record),
727 response.reviewer.clone(),
728 progress.reason.clone(),
729 response.metadata.clone(),
730 )
731 .await
732 .map(|_| ())
733 .map_err(|error| error.to_string())
734 }
735 ApprovalResolution::Denied(denied) => {
736 append_named_event(
737 log,
738 kind,
739 denied_event_kind(kind),
740 &request.request_id,
741 &request.trace_id,
742 json!({
743 "request_id": request.request_id.clone(),
744 "reviewer": denied.reviewer.clone(),
745 "reason": denied.reason.clone(),
746 }),
747 )
748 .await
749 .map_err(|error| error.to_string())?;
750 cancel_waitpoint_on(
751 log,
752 &request.request_id,
753 denied.reviewer.clone(),
754 denied.reason.clone(),
755 denied.metadata.clone(),
756 )
757 .await
758 .map(|_| ())
759 .map_err(|error| error.to_string())
760 }
761 }
762 }
763 }
764}
765
766async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
767 Ok(inspect_waitpoint_on(log, request_id)
768 .await
769 .map_err(|error| error.to_string())?
770 .is_some_and(|record| record.status != WaitpointStatus::Open))
771}
772
773async fn load_request_envelope(
774 log: &Arc<AnyEventLog>,
775 kind: HitlRequestKind,
776 request_id: &str,
777) -> Result<HitlRequestEnvelope, VmError> {
778 let topic = topic(kind)?;
779 let events = log
780 .read_range(&topic, None, usize::MAX)
781 .await
782 .map_err(log_error)?;
783 events
784 .into_iter()
785 .filter(|(_, event)| event.kind == kind.request_event_kind())
786 .find_map(|(_, event)| {
787 if !event_matches_request(&event, request_id) {
788 return None;
789 }
790 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
791 })
792 .ok_or_else(|| {
793 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
794 })
795}
796
797async fn resolve_approval_state(
798 log: &Arc<AnyEventLog>,
799 kind: HitlRequestKind,
800 request: &HitlRequestEnvelope,
801) -> Result<ApprovalResolution, VmError> {
802 let quorum = approval_quorum_from_request(kind, request)?;
803 let allowed_reviewers = approval_reviewers_from_request(kind, request)
804 .into_iter()
805 .collect::<BTreeSet<_>>();
806 let mut progress = ApprovalProgress {
807 reviewers: BTreeSet::new(),
808 signatures: Vec::new(),
809 reason: None,
810 approved_at: None,
811 };
812 let topic = topic(kind)?;
813 let events = log
814 .read_range(&topic, None, usize::MAX)
815 .await
816 .map_err(log_error)?;
817 for (_, event) in events {
818 if !event_matches_request(&event, &request.request_id)
819 || event.kind != "hitl.response_received"
820 {
821 continue;
822 }
823 let response: HitlHostResponse = serde_json::from_value(event.payload)
824 .map_err(|error| VmError::Runtime(error.to_string()))?;
825 if let Some(reviewer) = response.reviewer.as_deref() {
826 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
827 continue;
828 }
829 if progress.reviewers.contains(reviewer) {
830 continue;
831 }
832 }
833 if response.approved.unwrap_or(false) {
834 if let Some(reviewer) = response.reviewer.clone() {
835 let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
836 progress.reviewers.insert(reviewer.clone());
837 progress.signatures.push(ApprovalSignature {
838 reviewer: reviewer.clone(),
839 signed_at: signed_at.clone(),
840 signature: response.signature.clone().unwrap_or_else(|| {
841 approval_receipt_signature(
842 &request.request_id,
843 &reviewer,
844 &signed_at,
845 true,
846 response.reason.as_deref(),
847 )
848 }),
849 });
850 }
851 progress.reason = response.reason.clone();
852 progress.approved_at = response.responded_at.clone();
853 if progress.reviewers.len() as u32 >= quorum {
854 return Ok(ApprovalResolution::Approved(progress));
855 }
856 continue;
857 }
858 return Ok(ApprovalResolution::Denied(response));
859 }
860 Ok(ApprovalResolution::Pending)
861}
862
863fn approval_quorum_from_request(
864 kind: HitlRequestKind,
865 request: &HitlRequestEnvelope,
866) -> Result<u32, VmError> {
867 let key = match kind {
868 HitlRequestKind::DualControl => "n",
869 _ => "quorum",
870 };
871 let quorum = request
872 .payload
873 .get(key)
874 .and_then(JsonValue::as_u64)
875 .unwrap_or(1);
876 u32::try_from(quorum).map_err(|_| {
877 VmError::Runtime(format!(
878 "invalid quorum in HITL request '{}'",
879 request.request_id
880 ))
881 })
882}
883
884fn approval_reviewers_from_request(
885 kind: HitlRequestKind,
886 request: &HitlRequestEnvelope,
887) -> Vec<String> {
888 let key = match kind {
889 HitlRequestKind::DualControl => "approvers",
890 _ => "reviewers",
891 };
892 request
893 .payload
894 .get(key)
895 .and_then(JsonValue::as_array)
896 .map(|values| {
897 values
898 .iter()
899 .filter_map(JsonValue::as_str)
900 .map(str::to_string)
901 .collect()
902 })
903 .unwrap_or_default()
904}
905
906fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
907 json!({
908 "approved": true,
909 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
910 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
911 "reason": progress.reason,
912 "signatures": progress.signatures,
913 })
914}
915
916fn approval_receipt_signature(
917 request_id: &str,
918 reviewer: &str,
919 signed_at: &str,
920 approved: bool,
921 reason: Option<&str>,
922) -> String {
923 let material = format!(
924 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
925 reason.unwrap_or("")
926 );
927 let hash = sha2::Sha256::digest(material.as_bytes());
928 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
929 format!("sha256:{hex}")
930}
931
932fn approval_record_from_waitpoint(
933 record: &WaitpointRecord,
934 builtin: &str,
935) -> Result<VmValue, VmError> {
936 record
937 .value
938 .as_ref()
939 .map(crate::stdlib::json_to_vm_value)
940 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
941}
942
943async fn approval_wait_error(
944 log: &Arc<AnyEventLog>,
945 kind: HitlRequestKind,
946 request_id: &str,
947) -> VmError {
948 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
949 if record.status == WaitpointStatus::Cancelled
950 && record.reason.as_deref() != Some("upstream_cancelled")
951 {
952 return approval_denied_error(
953 request_id,
954 HitlHostResponse {
955 request_id: request_id.to_string(),
956 answer: None,
957 approved: Some(false),
958 accepted: None,
959 reviewer: record.cancelled_by.clone(),
960 reason: record.reason.clone(),
961 metadata: record.metadata.clone(),
962 responded_at: record.cancelled_at.clone(),
963 signature: None,
964 },
965 );
966 }
967 if record.status == WaitpointStatus::Cancelled {
968 return hitl_cancelled_error(
969 request_id,
970 kind,
971 "",
972 &[request_id.to_string()],
973 record.reason.clone(),
974 );
975 }
976 }
977 hitl_cancelled_error(
978 request_id,
979 kind,
980 "",
981 &[request_id.to_string()],
982 Some("upstream_cancelled".to_string()),
983 )
984}
985
986async fn append_timeout_once(
987 log: &Arc<AnyEventLog>,
988 kind: HitlRequestKind,
989 request_id: &str,
990 trace_id: &str,
991) -> Result<(), VmError> {
992 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
993 return Ok(());
994 }
995 append_timeout(log, kind, request_id, trace_id).await
996}
997
998async fn hitl_event_exists(
999 log: &Arc<AnyEventLog>,
1000 kind: HitlRequestKind,
1001 request_id: &str,
1002 event_kind: &str,
1003) -> Result<bool, VmError> {
1004 let topic = topic(kind)?;
1005 let events = log
1006 .read_range(&topic, None, usize::MAX)
1007 .await
1008 .map_err(log_error)?;
1009 Ok(events
1010 .into_iter()
1011 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1012}
1013
1014fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1015 match kind {
1016 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1017 _ => "hitl.approval_approved",
1018 }
1019}
1020
1021fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1022 match kind {
1023 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1024 _ => "hitl.approval_denied",
1025 }
1026}
1027
1028async fn append_request(
1029 log: &Arc<AnyEventLog>,
1030 request: &HitlRequestEnvelope,
1031) -> Result<(), VmError> {
1032 let topic = topic(request.kind)?;
1033 log.append(
1034 &topic,
1035 LogEvent::new(
1036 request.kind.request_event_kind(),
1037 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1038 )
1039 .with_headers(request_headers(request)),
1040 )
1041 .await
1042 .map(|_| ())
1043 .map_err(log_error)
1044}
1045
1046async fn append_named_event(
1047 log: &Arc<AnyEventLog>,
1048 kind: HitlRequestKind,
1049 event_kind: &str,
1050 request_id: &str,
1051 trace_id: &str,
1052 payload: JsonValue,
1053) -> Result<(), VmError> {
1054 let topic = topic(kind)?;
1055 let headers = headers_with_trace(request_id, trace_id);
1056 log.append(
1057 &topic,
1058 LogEvent::new(event_kind, payload).with_headers(headers),
1059 )
1060 .await
1061 .map(|_| ())
1062 .map_err(log_error)
1063}
1064
1065async fn append_timeout(
1066 log: &Arc<AnyEventLog>,
1067 kind: HitlRequestKind,
1068 request_id: &str,
1069 trace_id: &str,
1070) -> Result<(), VmError> {
1071 append_named_event(
1072 log,
1073 kind,
1074 "hitl.timeout",
1075 request_id,
1076 trace_id,
1077 serde_json::to_value(HitlTimeoutRecord {
1078 request_id: request_id.to_string(),
1079 kind,
1080 trace_id: trace_id.to_string(),
1081 timed_out_at: now_rfc3339(),
1082 })
1083 .map_err(|error| VmError::Runtime(error.to_string()))?,
1084 )
1085 .await
1086}
1087
1088async fn maybe_apply_mock_response(
1089 kind: HitlRequestKind,
1090 request_id: &str,
1091 request_payload: &JsonValue,
1092) -> Result<(), VmError> {
1093 let mut params = request_payload
1094 .as_object()
1095 .cloned()
1096 .unwrap_or_default()
1097 .into_iter()
1098 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1099 .collect::<BTreeMap<_, _>>();
1100 params.insert(
1101 "request_id".to_string(),
1102 VmValue::String(Rc::from(request_id.to_string())),
1103 );
1104 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1105 return Ok(());
1106 };
1107 let value = result?;
1108 let responses = match value {
1109 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1110 other => vec![other],
1111 };
1112 for response in responses {
1113 let response_dict = response.as_dict().ok_or_else(|| {
1114 VmError::Runtime(format!(
1115 "mocked HITL {} response must be a dict or list<dict>",
1116 kind.as_str()
1117 ))
1118 })?;
1119 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1120 append_hitl_response(None, hitl_response)
1121 .await
1122 .map_err(VmError::Runtime)?;
1123 }
1124 Ok(())
1125}
1126
1127fn parse_hitl_response_dict(
1128 request_id: &str,
1129 response_dict: &BTreeMap<String, VmValue>,
1130) -> Result<HitlHostResponse, VmError> {
1131 Ok(HitlHostResponse {
1132 request_id: request_id.to_string(),
1133 answer: response_dict
1134 .get("answer")
1135 .map(crate::llm::vm_value_to_json),
1136 approved: response_dict.get("approved").and_then(vm_bool),
1137 accepted: response_dict.get("accepted").and_then(vm_bool),
1138 reviewer: response_dict.get("reviewer").map(VmValue::display),
1139 reason: response_dict.get("reason").map(VmValue::display),
1140 metadata: response_dict
1141 .get("metadata")
1142 .map(crate::llm::vm_value_to_json),
1143 responded_at: response_dict.get("responded_at").map(VmValue::display),
1144 signature: response_dict.get("signature").map(VmValue::display),
1145 })
1146}
1147
1148fn maybe_notify_host(request: &HitlRequestEnvelope) {
1149 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1150 return;
1151 };
1152 bridge.notify(
1153 "harn.hitl.requested",
1154 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1155 );
1156}
1157
1158fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1159 let Some(value) = value else {
1160 return Ok(AskUserOptions {
1161 schema: None,
1162 timeout: Some(default_question_timeout()),
1163 default: None,
1164 });
1165 };
1166 let dict = value
1167 .as_dict()
1168 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1169 Ok(AskUserOptions {
1170 schema: dict
1171 .get("schema")
1172 .cloned()
1173 .filter(|value| !matches!(value, VmValue::Nil)),
1174 timeout: dict
1175 .get("timeout")
1176 .map(parse_duration_value)
1177 .transpose()?
1178 .or_else(|| Some(default_question_timeout())),
1179 default: dict
1180 .get("default")
1181 .cloned()
1182 .filter(|value| !matches!(value, VmValue::Nil)),
1183 })
1184}
1185
1186fn default_question_timeout() -> StdDuration {
1187 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1188}
1189
1190fn escalation_capability_policy() -> JsonValue {
1191 crate::orchestration::current_execution_policy()
1192 .and_then(|policy| serde_json::to_value(policy).ok())
1193 .unwrap_or(JsonValue::Null)
1194}
1195
1196fn parse_approval_options(
1197 value: Option<&VmValue>,
1198 builtin: &str,
1199) -> Result<ApprovalOptions, VmError> {
1200 let dict = match value {
1201 None => None,
1202 Some(VmValue::Dict(dict)) => Some(dict),
1203 Some(_) => {
1204 return Err(VmError::Runtime(format!(
1205 "{builtin}: options must be a dict"
1206 )))
1207 }
1208 };
1209 let quorum = dict
1210 .and_then(|dict| dict.get("quorum"))
1211 .and_then(VmValue::as_int)
1212 .unwrap_or(1);
1213 if quorum <= 0 {
1214 return Err(VmError::Runtime(format!(
1215 "{builtin}: quorum must be positive"
1216 )));
1217 }
1218 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1219 let deadline = dict
1220 .and_then(|dict| dict.get("deadline"))
1221 .map(parse_duration_value)
1222 .transpose()?
1223 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1224 Ok(ApprovalOptions {
1225 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1226 quorum: quorum as u32,
1227 reviewers,
1228 deadline,
1229 })
1230}
1231
1232fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1233 args.get(idx)
1234 .map(VmValue::display)
1235 .filter(|value| !value.is_empty())
1236 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1237}
1238
1239fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1240 let value = args
1241 .get(idx)
1242 .and_then(VmValue::as_int)
1243 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1244 if value <= 0 {
1245 return Err(VmError::Runtime(format!(
1246 "{builtin}: expected a positive int at {idx}"
1247 )));
1248 }
1249 Ok(value)
1250}
1251
1252fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1253 let Some(value) = value else {
1254 return Ok(Vec::new());
1255 };
1256 match value {
1257 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1258 _ => Err(VmError::Runtime(format!(
1259 "{builtin}: expected list<string>"
1260 ))),
1261 }
1262}
1263
1264fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1265 match value {
1266 VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
1267 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1268 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1269 _ => Err(VmError::Runtime(
1270 "expected a duration or millisecond count".to_string(),
1271 )),
1272 }
1273}
1274
1275fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1276 active_event_log()
1277 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1278}
1279
1280fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1281 if let Some(log) = active_event_log() {
1282 return Ok(log);
1283 }
1284 let Some(base_dir) = base_dir else {
1285 return Ok(install_memory_for_current_thread(
1286 HITL_EVENT_LOG_QUEUE_DEPTH,
1287 ));
1288 };
1289 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1290}
1291
1292fn current_dispatch_keys() -> Option<DispatchKeys> {
1293 let context = current_dispatch_context()?;
1294 let stable_base = context
1295 .replay_of_event_id
1296 .clone()
1297 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1298 let instance_key = format!(
1299 "{}::{}",
1300 context.trigger_event.id.0,
1301 context.replay_of_event_id.as_deref().unwrap_or("live")
1302 );
1303 Some(DispatchKeys {
1304 instance_key,
1305 stable_base,
1306 agent: context.agent_id,
1307 trace_id: context.trigger_event.trace_id.0,
1308 })
1309}
1310
1311fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1312 if let Some(keys) = dispatch_keys {
1313 let seq = REQUEST_SEQUENCE.with(|slot| {
1314 let mut state = slot.borrow_mut();
1315 if state.instance_key != keys.instance_key {
1316 state.instance_key = keys.instance_key.clone();
1317 state.next_seq = 0;
1318 }
1319 state.next_seq += 1;
1320 state.next_seq
1321 });
1322 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1323 }
1324 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1325}
1326
1327fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1328 let mut headers = headers_with_trace(&request.request_id, &request.trace_id);
1329 if let Some(run_id) = request.run_id.as_ref() {
1330 headers.insert("run_id".to_string(), run_id.clone());
1331 }
1332 headers
1333}
1334
/// Builds a header map containing only the `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1340
1341fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1342 let mut headers = response_headers(request_id);
1343 headers.insert("trace_id".to_string(), trace_id.to_string());
1344 headers
1345}
1346
1347fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1348 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1349}
1350
1351fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1352 event
1353 .headers
1354 .get("request_id")
1355 .is_some_and(|value| value == request_id)
1356 || event
1357 .payload
1358 .get("request_id")
1359 .and_then(JsonValue::as_str)
1360 .is_some_and(|value| value == request_id)
1361}
1362
1363fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1364 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1365 "name": "ApprovalDeniedError",
1366 "category": "generic",
1367 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1368 "request_id": request_id,
1369 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1370 "reason": response.reason,
1371 })))
1372}
1373
1374fn hitl_cancelled_error(
1375 request_id: &str,
1376 kind: HitlRequestKind,
1377 wait_id: &str,
1378 waitpoint_ids: &[String],
1379 reason: Option<String>,
1380) -> VmError {
1381 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1382 let message = reason
1383 .clone()
1384 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1385 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1386 "name": "HumanCancelledError",
1387 "category": ErrorCategory::Cancelled.as_str(),
1388 "message": message,
1389 "request_id": request_id,
1390 "kind": kind.as_str(),
1391 "wait_id": wait_id,
1392 "waitpoint_ids": waitpoint_ids,
1393 "reason": reason,
1394 })))
1395}
1396
1397fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1398 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1399 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1400 "name": "HumanTimeoutError",
1401 "category": ErrorCategory::Timeout.as_str(),
1402 "message": format!("{} timed out", kind.as_str()),
1403 "request_id": request_id,
1404 "kind": kind.as_str(),
1405 })))
1406}
1407
1408fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1409 match default {
1410 VmValue::Int(_) => match value {
1411 VmValue::Int(_) => value.clone(),
1412 VmValue::Float(number) => VmValue::Int(*number as i64),
1413 VmValue::String(text) => text
1414 .parse::<i64>()
1415 .map(VmValue::Int)
1416 .unwrap_or_else(|_| default.clone()),
1417 _ => default.clone(),
1418 },
1419 VmValue::Float(_) => match value {
1420 VmValue::Float(_) => value.clone(),
1421 VmValue::Int(number) => VmValue::Float(*number as f64),
1422 VmValue::String(text) => text
1423 .parse::<f64>()
1424 .map(VmValue::Float)
1425 .unwrap_or_else(|_| default.clone()),
1426 _ => default.clone(),
1427 },
1428 VmValue::Bool(_) => match value {
1429 VmValue::Bool(_) => value.clone(),
1430 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1431 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1432 _ => default.clone(),
1433 },
1434 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1435 VmValue::Duration(_) => match value {
1436 VmValue::Duration(_) => value.clone(),
1437 VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1438 _ => default.clone(),
1439 },
1440 VmValue::Nil => value.clone(),
1441 _ => {
1442 if value.type_name() == default.type_name() {
1443 value.clone()
1444 } else {
1445 default.clone()
1446 }
1447 }
1448 }
1449}
1450
1451fn log_error(error: impl std::fmt::Display) -> VmError {
1452 VmError::Runtime(error.to_string())
1453}
1454
1455fn now_rfc3339() -> String {
1456 OffsetDateTime::now_utc()
1457 .format(&Rfc3339)
1458 .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1459}
1460
1461fn new_trace_id() -> String {
1462 format!("trace_{}", Uuid::now_v7())
1463}
1464
1465fn vm_bool(value: &VmValue) -> Option<bool> {
1466 match value {
1467 VmValue::Bool(flag) => Some(*flag),
1468 _ => None,
1469 }
1470}
1471
1472fn vm_string(value: &VmValue) -> Option<&str> {
1473 match value {
1474 VmValue::String(text) => Some(text.as_ref()),
1475 _ => None,
1476 }
1477}
1478
1479fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1480 match value {
1481 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1482 _ => None,
1483 }
1484}
1485
// End-to-end tests for the HITL builtins: each test runs a small script with
// mocked host responses and checks both the script output and the event kinds
// appended to the HITL event-log topics.
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Compiles and executes `source` in a fresh VM rooted at `base_dir`.
    ///
    /// Thread-local state is reset and the default event log for `base_dir` is
    /// installed before the script runs. On success returns, in order: the
    /// script output with trailing whitespace trimmed, then the event kinds
    /// read from the questions, approvals, dual-control, and escalations
    /// topics.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");
        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;
        let output = vm.output().trim_end().to_string();
        let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
        let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
        let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
        let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
        Ok((
            output,
            question_events,
            approval_events,
            dual_control_events,
            escalation_events,
        ))
    }

    /// Reads every event on `topic` and returns the event kinds in read order.
    async fn event_kinds(
        log: std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
            .await
            .expect("read topic")
            .into_iter()
            .map(|(_, event)| event.kind)
            .collect()
    }

    // A mocked string answer "9" with an int default of 0 must print 9, and
    // only the questions topic should receive events.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "question", {answer: "9"})
 let answer: int = ask_user("Pick a number", {default: 0})
 println(answer)
}
"#;
                let (
                    output,
                    question_events,
                    approval_events,
                    dual_control_events,
                    escalation_events,
                ) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "9");
                assert_eq!(
                    question_events,
                    vec![
                        "hitl.question_asked".to_string(),
                        "hitl.response_received".to_string()
                    ]
                );
                assert!(approval_events.is_empty());
                assert!(dual_control_events.is_empty());
                assert!(escalation_events.is_empty());
            })
            .await;
    }

    // Two mocked approvals against a quorum of 2: the approvals topic should
    // show the request, both responses, and then the approved event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", [
 {approved: true, reviewer: "alice", reason: "ok"},
 {approved: true, reviewer: "bob", reason: "ship it"},
 ])
 let record = request_approval(
 "deploy production",
 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
 )
 println(record.approved)
 println(len(record.reviewers))
 println(record.reviewers[0])
 println(record.reviewers[1])
}
"#;
                let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_approved".to_string(),
                    ]
                );
            })
            .await;
    }

    // A mocked denial must reach the script as an error named
    // ApprovalDeniedError carrying the reviewer's reason, and the approvals
    // topic must end with the denied event.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
 let denied = try {
 request_approval("drop table", {reviewers: ["alice"]})
 }
 println(is_err(denied))
 println(unwrap_err(denied).name)
 println(unwrap_err(denied).reason)
}
"#;
                let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    approval_events,
                    vec![
                        "hitl.approval_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.approval_denied".to_string(),
                    ]
                );
            })
            .await;
    }

    // Two mocked approvals for dual_control(2, 3, ...): the action's result is
    // printed and the dual-control topic ends with approved then executed.
    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "dual_control", [
 {approved: true, reviewer: "alice"},
 {approved: true, reviewer: "bob"},
 ])
 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
 println(result)
}
"#;
                let (output, _, _, dual_control_events, _) =
                    execute_hitl_script(dir.path(), source)
                        .await
                        .expect("script succeeds");
                assert_eq!(output, "launched");
                assert_eq!(
                    dual_control_events,
                    vec![
                        "hitl.dual_control_requested".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.response_received".to_string(),
                        "hitl.dual_control_approved".to_string(),
                        "hitl.dual_control_executed".to_string(),
                    ]
                );
            })
            .await;
    }

    // A mocked acceptance must yield a handle whose status is "accepted" and
    // whose reviewer is "lead"; the escalations topic holds the issued and
    // accepted events.
    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
 let handle = escalate_to("admin", "need override")
 println(handle.status)
 println(handle.reviewer)
}
"#;
                let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");
                assert_eq!(output, "accepted\nlead");
                assert_eq!(
                    escalation_events,
                    vec![
                        "hitl.escalation_issued".to_string(),
                        "hitl.escalation_accepted".to_string(),
                    ]
                );
            })
            .await;
    }
}