1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::Duration as StdDuration;
7
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use sha2::Digest;
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::stdlib::waitpoint::{
22 cancel_waitpoint_on, complete_waitpoint_on, create_waitpoint_on, inspect_waitpoint_on,
23 wait_on_waitpoints, WaitpointRecord, WaitpointStatus, WaitpointWaitFailure,
24 WaitpointWaitOptions,
25};
26use crate::triggers::dispatcher::current_dispatch_context;
27use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
28use crate::vm::{clone_async_builtin_child_vm, Vm};
29
30const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
31const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
32const HITL_QUESTION_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;
33
34pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
35pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
36pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
37pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
38
39thread_local! {
40 static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
41}
42
43#[derive(Default)]
44pub(crate) struct RequestSequenceState {
45 pub(crate) instance_key: String,
46 pub(crate) next_seq: u64,
47}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum HitlRequestKind {
52 Question,
53 Approval,
54 DualControl,
55 Escalation,
56}
57
58impl HitlRequestKind {
59 pub(crate) fn as_str(self) -> &'static str {
60 match self {
61 Self::Question => "question",
62 Self::Approval => "approval",
63 Self::DualControl => "dual_control",
64 Self::Escalation => "escalation",
65 }
66 }
67
68 fn topic(self) -> &'static str {
69 match self {
70 Self::Question => HITL_QUESTIONS_TOPIC,
71 Self::Approval => HITL_APPROVALS_TOPIC,
72 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
73 Self::Escalation => HITL_ESCALATIONS_TOPIC,
74 }
75 }
76
77 fn request_event_kind(self) -> &'static str {
78 match self {
79 Self::Question => "hitl.question_asked",
80 Self::Approval => "hitl.approval_requested",
81 Self::DualControl => "hitl.dual_control_requested",
82 Self::Escalation => "hitl.escalation_issued",
83 }
84 }
85
86 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
87 if request_id.starts_with("hitl_question_") {
88 Some(Self::Question)
89 } else if request_id.starts_with("hitl_approval_") {
90 Some(Self::Approval)
91 } else if request_id.starts_with("hitl_dual_control_") {
92 Some(Self::DualControl)
93 } else if request_id.starts_with("hitl_escalation_") {
94 Some(Self::Escalation)
95 } else {
96 None
97 }
98 }
99}
100
101#[derive(Clone, Debug, Serialize, Deserialize)]
102pub struct HitlHostResponse {
103 pub request_id: String,
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub answer: Option<JsonValue>,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub approved: Option<bool>,
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub accepted: Option<bool>,
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub reviewer: Option<String>,
112 #[serde(skip_serializing_if = "Option::is_none")]
113 pub reason: Option<String>,
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub metadata: Option<JsonValue>,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub responded_at: Option<String>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 pub signature: Option<String>,
120}
121
122#[derive(Clone, Debug, Serialize, Deserialize)]
123struct HitlRequestEnvelope {
124 request_id: String,
125 kind: HitlRequestKind,
126 #[serde(default)]
127 agent: String,
128 trace_id: String,
129 requested_at: String,
130 payload: JsonValue,
131}
132
133#[derive(Clone, Debug, Serialize, Deserialize)]
134struct HitlTimeoutRecord {
135 request_id: String,
136 kind: HitlRequestKind,
137 trace_id: String,
138 timed_out_at: String,
139}
140
141#[derive(Clone, Debug)]
142struct DispatchKeys {
143 instance_key: String,
144 stable_base: String,
145 agent: String,
146 trace_id: String,
147}
148
149#[derive(Clone, Debug)]
150struct AskUserOptions {
151 schema: Option<VmValue>,
152 timeout: Option<StdDuration>,
153 default: Option<VmValue>,
154}
155
156#[derive(Clone, Debug)]
157struct ApprovalOptions {
158 detail: Option<VmValue>,
159 quorum: u32,
160 reviewers: Vec<String>,
161 deadline: StdDuration,
162}
163
164#[derive(Clone, Debug)]
165struct ApprovalProgress {
166 reviewers: BTreeSet<String>,
167 signatures: Vec<ApprovalSignature>,
168 reason: Option<String>,
169 approved_at: Option<String>,
170}
171
172#[derive(Clone, Debug, Serialize)]
173struct ApprovalSignature {
174 reviewer: String,
175 signed_at: String,
176 signature: String,
177}
178
179#[derive(Clone, Debug)]
180enum ApprovalResolution {
181 Pending,
182 Approved(ApprovalProgress),
183 Denied(HitlHostResponse),
184}
185
186#[derive(Clone, Debug)]
187enum WaitpointOutcome {
188 Completed(WaitpointRecord),
189 Timeout,
190 Cancelled {
191 wait_id: String,
192 waitpoint_ids: Vec<String>,
193 reason: Option<String>,
194 },
195}
196
197pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
198 vm.register_async_builtin("ask_user", |args| {
199 Box::pin(async move { ask_user_impl(&args).await })
200 });
201
202 vm.register_async_builtin("request_approval", |args| {
203 Box::pin(async move { request_approval_impl(&args).await })
204 });
205
206 vm.register_async_builtin("dual_control", |args| {
207 Box::pin(async move { dual_control_impl(&args).await })
208 });
209
210 vm.register_async_builtin("escalate_to", |args| {
211 Box::pin(async move { escalate_to_impl(&args).await })
212 });
213}
214
215pub(crate) fn reset_hitl_state() {
216 REQUEST_SEQUENCE.with(|slot| {
217 *slot.borrow_mut() = RequestSequenceState::default();
218 });
219}
220
221pub(crate) fn take_hitl_state() -> RequestSequenceState {
222 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
223}
224
225pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
226 REQUEST_SEQUENCE.with(|slot| {
227 *slot.borrow_mut() = state;
228 });
229}
230
231pub async fn append_hitl_response(
232 base_dir: Option<&Path>,
233 mut response: HitlHostResponse,
234) -> Result<u64, String> {
235 let kind = HitlRequestKind::from_request_id(&response.request_id)
236 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
237 if response.responded_at.is_none() {
238 response.responded_at = Some(now_rfc3339());
239 }
240 let log = ensure_hitl_event_log_for(base_dir)?;
241 let headers = response_headers(&response.request_id);
242 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
243 let event_id = log
244 .append(
245 &topic,
246 LogEvent::new(
247 match kind {
248 HitlRequestKind::Escalation => "hitl.escalation_accepted",
249 _ => "hitl.response_received",
250 },
251 serde_json::to_value(&response).map_err(|error| error.to_string())?,
252 )
253 .with_headers(headers),
254 )
255 .await
256 .map_err(|error| error.to_string())?;
257 finalize_hitl_response(&log, kind, &response).await?;
258 Ok(event_id)
259}
260
261pub async fn append_approval_request_on(
262 log: &Arc<AnyEventLog>,
263 agent: impl Into<String>,
264 trace_id: impl Into<String>,
265 action: impl Into<String>,
266 detail: JsonValue,
267 reviewers: Vec<String>,
268) -> Result<String, VmError> {
269 let request_id = next_request_id(HitlRequestKind::Approval, current_dispatch_keys().as_ref());
270 let trace_id = trace_id.into();
271 let request = HitlRequestEnvelope {
272 request_id: request_id.clone(),
273 kind: HitlRequestKind::Approval,
274 agent: agent.into(),
275 trace_id: trace_id.clone(),
276 requested_at: now_rfc3339(),
277 payload: json!({
278 "action": action.into(),
279 "detail": detail,
280 "quorum": 1,
281 "reviewers": reviewers,
282 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
283 }),
284 };
285 create_request_waitpoint(log, &request).await?;
286 append_request(log, &request).await?;
287 maybe_notify_host(&request);
288 Ok(request_id)
289}
290
291async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
292 let prompt = required_string_arg(args, 0, "ask_user")?;
293 let options = parse_ask_user_options(args.get(1))?;
294 let keys = current_dispatch_keys();
295 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
296 let trace_id = keys
297 .as_ref()
298 .map(|keys| keys.trace_id.clone())
299 .unwrap_or_else(new_trace_id);
300 let log = ensure_hitl_event_log();
301 let request = HitlRequestEnvelope {
302 request_id: request_id.clone(),
303 kind: HitlRequestKind::Question,
304 agent: keys
305 .as_ref()
306 .map(|keys| keys.agent.clone())
307 .unwrap_or_default(),
308 trace_id: trace_id.clone(),
309 requested_at: now_rfc3339(),
310 payload: json!({
311 "prompt": prompt,
312 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
313 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
314 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
315 }),
316 };
317 create_request_waitpoint(&log, &request).await?;
318 append_request(&log, &request).await?;
319 maybe_notify_host(&request);
320 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
321
322 match wait_for_request_waitpoint(&request_id, options.timeout).await? {
323 WaitpointOutcome::Completed(record) => {
324 let answer = record
325 .value
326 .as_ref()
327 .map(crate::stdlib::json_to_vm_value)
328 .unwrap_or(VmValue::Nil);
329 if let Some(schema) = options.schema.as_ref() {
330 return schema_expect_value(&answer, schema, true);
331 }
332 if let Some(default) = options.default.as_ref() {
333 return Ok(coerce_like_default(&answer, default));
334 }
335 Ok(answer)
336 }
337 WaitpointOutcome::Timeout => {
338 append_timeout_once(&log, HitlRequestKind::Question, &request_id, &trace_id).await?;
339 if let Some(default) = options.default {
340 return Ok(default);
341 }
342 Err(timeout_error(&request_id, HitlRequestKind::Question))
343 }
344 WaitpointOutcome::Cancelled {
345 wait_id,
346 waitpoint_ids,
347 reason,
348 } => Err(hitl_cancelled_error(
349 &request_id,
350 HitlRequestKind::Question,
351 &wait_id,
352 &waitpoint_ids,
353 reason,
354 )),
355 }
356}
357
358async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
359 let action = required_string_arg(args, 0, "request_approval")?;
360 let options = parse_approval_options(args.get(1), "request_approval")?;
361 let keys = current_dispatch_keys();
362 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
363 let trace_id = keys
364 .as_ref()
365 .map(|keys| keys.trace_id.clone())
366 .unwrap_or_else(new_trace_id);
367 let log = ensure_hitl_event_log();
368 let request = HitlRequestEnvelope {
369 request_id: request_id.clone(),
370 kind: HitlRequestKind::Approval,
371 agent: keys
372 .as_ref()
373 .map(|keys| keys.agent.clone())
374 .unwrap_or_default(),
375 trace_id: trace_id.clone(),
376 requested_at: now_rfc3339(),
377 payload: json!({
378 "action": action,
379 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
380 "quorum": options.quorum,
381 "reviewers": options.reviewers,
382 "deadline_ms": options.deadline.as_millis() as u64,
383 }),
384 };
385 create_request_waitpoint(&log, &request).await?;
386 append_request(&log, &request).await?;
387 maybe_notify_host(&request);
388 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
389
390 match wait_for_request_waitpoint(&request_id, Some(options.deadline)).await? {
391 WaitpointOutcome::Completed(record) => {
392 approval_record_from_waitpoint(&record, "request_approval")
393 }
394 WaitpointOutcome::Timeout => {
395 append_timeout_once(&log, HitlRequestKind::Approval, &request_id, &trace_id).await?;
396 Err(timeout_error(&request_id, HitlRequestKind::Approval))
397 }
398 WaitpointOutcome::Cancelled { .. } => {
399 Err(approval_wait_error(&log, HitlRequestKind::Approval, &request_id).await)
400 }
401 }
402}
403
404async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
405 let n = required_positive_int_arg(args, 0, "dual_control")?;
406 let m = required_positive_int_arg(args, 1, "dual_control")?;
407 if n > m {
408 return Err(VmError::Runtime(
409 "dual_control: n must be less than or equal to m".to_string(),
410 ));
411 }
412 let action = args
413 .get(2)
414 .and_then(|value| match value {
415 VmValue::Closure(closure) => Some(closure.clone()),
416 _ => None,
417 })
418 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
419 let approvers = optional_string_list(args.get(3), "dual_control")?;
420 if !approvers.is_empty() && approvers.len() < m as usize {
421 return Err(VmError::Runtime(format!(
422 "dual_control: expected at least {m} approvers, got {}",
423 approvers.len()
424 )));
425 }
426
427 let keys = current_dispatch_keys();
428 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
429 let trace_id = keys
430 .as_ref()
431 .map(|keys| keys.trace_id.clone())
432 .unwrap_or_else(new_trace_id);
433 let action_name = if action.func.name.is_empty() {
434 "anonymous".to_string()
435 } else {
436 action.func.name.clone()
437 };
438 let log = ensure_hitl_event_log();
439 let request = HitlRequestEnvelope {
440 request_id: request_id.clone(),
441 kind: HitlRequestKind::DualControl,
442 agent: keys
443 .as_ref()
444 .map(|keys| keys.agent.clone())
445 .unwrap_or_default(),
446 trace_id: trace_id.clone(),
447 requested_at: now_rfc3339(),
448 payload: json!({
449 "n": n,
450 "m": m,
451 "action": action_name,
452 "approvers": approvers,
453 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
454 }),
455 };
456 create_request_waitpoint(&log, &request).await?;
457 append_request(&log, &request).await?;
458 maybe_notify_host(&request);
459 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
460
461 match wait_for_request_waitpoint(
462 &request_id,
463 Some(StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS)),
464 )
465 .await?
466 {
467 WaitpointOutcome::Completed(record) => {
468 let _ = approval_record_from_waitpoint(&record, "dual_control")?;
469 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
470 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
471 })?;
472 let result = vm.call_closure_pub(&action, &[]).await?;
473
474 append_named_event(
475 &log,
476 HitlRequestKind::DualControl,
477 "hitl.dual_control_executed",
478 &request_id,
479 &trace_id,
480 json!({
481 "request_id": request_id,
482 "result": crate::llm::vm_value_to_json(&result),
483 }),
484 )
485 .await?;
486
487 Ok(result)
488 }
489 WaitpointOutcome::Timeout => {
490 append_timeout_once(&log, HitlRequestKind::DualControl, &request_id, &trace_id).await?;
491 Err(timeout_error(&request_id, HitlRequestKind::DualControl))
492 }
493 WaitpointOutcome::Cancelled { .. } => {
494 Err(approval_wait_error(&log, HitlRequestKind::DualControl, &request_id).await)
495 }
496 }
497}
498
499async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
500 let role = required_string_arg(args, 0, "escalate_to")?;
501 let reason = required_string_arg(args, 1, "escalate_to")?;
502 let keys = current_dispatch_keys();
503 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
504 let trace_id = keys
505 .as_ref()
506 .map(|keys| keys.trace_id.clone())
507 .unwrap_or_else(new_trace_id);
508 let log = ensure_hitl_event_log();
509 let request = HitlRequestEnvelope {
510 request_id: request_id.clone(),
511 kind: HitlRequestKind::Escalation,
512 agent: keys
513 .as_ref()
514 .map(|keys| keys.agent.clone())
515 .unwrap_or_default(),
516 trace_id: trace_id.clone(),
517 requested_at: now_rfc3339(),
518 payload: json!({
519 "role": role,
520 "reason": reason,
521 "capability_policy": escalation_capability_policy(),
522 }),
523 };
524 create_request_waitpoint(&log, &request).await?;
525 append_request(&log, &request).await?;
526 maybe_notify_host(&request);
527 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
528
529 match wait_for_request_waitpoint(&request_id, None).await? {
530 WaitpointOutcome::Completed(record) => {
531 let accepted_at = record.completed_at.clone();
532 let reviewer = record.completed_by.clone();
533 let accepted = record
534 .value
535 .as_ref()
536 .and_then(|value| value.get("accepted"))
537 .and_then(JsonValue::as_bool)
538 .unwrap_or(true);
539 Ok(crate::stdlib::json_to_vm_value(&json!({
540 "request_id": request_id,
541 "role": role,
542 "reason": reason,
543 "trace_id": trace_id,
544 "status": if accepted { "accepted" } else { "pending" },
545 "accepted_at": accepted_at,
546 "reviewer": reviewer,
547 })))
548 }
549 WaitpointOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
550 WaitpointOutcome::Cancelled {
551 wait_id,
552 waitpoint_ids,
553 reason,
554 } => Err(hitl_cancelled_error(
555 &request_id,
556 HitlRequestKind::Escalation,
557 &wait_id,
558 &waitpoint_ids,
559 reason,
560 )),
561 }
562}
563
564async fn create_request_waitpoint(
565 log: &Arc<AnyEventLog>,
566 request: &HitlRequestEnvelope,
567) -> Result<(), VmError> {
568 create_waitpoint_on(
569 log,
570 Some(request.request_id.clone()),
571 Some(json!({
572 "kind": request.kind.as_str(),
573 "agent": request.agent.clone(),
574 "trace_id": request.trace_id.clone(),
575 "requested_at": request.requested_at.clone(),
576 "payload": request.payload.clone(),
577 })),
578 )
579 .await?;
580 Ok(())
581}
582
583async fn wait_for_request_waitpoint(
584 request_id: &str,
585 timeout: Option<StdDuration>,
586) -> Result<WaitpointOutcome, VmError> {
587 match wait_on_waitpoints(
588 vec![request_id.to_string()],
589 WaitpointWaitOptions { timeout },
590 )
591 .await
592 {
593 Ok(records) => Ok(WaitpointOutcome::Completed(
594 records
595 .into_iter()
596 .next()
597 .expect("single waitpoint wait result"),
598 )),
599 Err(WaitpointWaitFailure::Timeout { .. }) => Ok(WaitpointOutcome::Timeout),
600 Err(WaitpointWaitFailure::Cancelled {
601 wait_id,
602 waitpoint_ids,
603 reason,
604 }) => Ok(WaitpointOutcome::Cancelled {
605 wait_id,
606 waitpoint_ids,
607 reason,
608 }),
609 Err(WaitpointWaitFailure::Vm(error)) => {
610 if let Some(outcome) = waitpoint_outcome_from_vm_error(&error) {
611 return Ok(outcome);
612 }
613 Err(error)
614 }
615 }
616}
617
618fn waitpoint_outcome_from_vm_error(error: &VmError) -> Option<WaitpointOutcome> {
619 let VmError::Thrown(VmValue::Dict(dict)) = error else {
620 return None;
621 };
622 let name = dict.get("name").and_then(vm_string)?;
623 match name {
624 "WaitpointTimeoutError" => Some(WaitpointOutcome::Timeout),
625 "WaitpointCancelledError" => Some(WaitpointOutcome::Cancelled {
626 wait_id: dict
627 .get("wait_id")
628 .and_then(vm_string)
629 .unwrap_or_default()
630 .to_string(),
631 waitpoint_ids: dict
632 .get("waitpoint_ids")
633 .and_then(vm_string_list)
634 .unwrap_or_default(),
635 reason: dict
636 .get("reason")
637 .and_then(vm_string)
638 .map(ToString::to_string),
639 }),
640 _ => None,
641 }
642}
643
644async fn finalize_hitl_response(
645 log: &Arc<AnyEventLog>,
646 kind: HitlRequestKind,
647 response: &HitlHostResponse,
648) -> Result<(), String> {
649 match kind {
650 HitlRequestKind::Question => {
651 if waitpoint_is_terminal(log, &response.request_id).await? {
652 return Ok(());
653 }
654 complete_waitpoint_on(
655 log,
656 &response.request_id,
657 response.answer.clone(),
658 response.reviewer.clone(),
659 response.reason.clone(),
660 response.metadata.clone(),
661 )
662 .await
663 .map(|_| ())
664 .map_err(|error| error.to_string())
665 }
666 HitlRequestKind::Escalation => {
667 if !response.accepted.unwrap_or(false)
668 || waitpoint_is_terminal(log, &response.request_id).await?
669 {
670 return Ok(());
671 }
672 complete_waitpoint_on(
673 log,
674 &response.request_id,
675 Some(json!({
676 "accepted": true,
677 "reviewer": response.reviewer,
678 "reason": response.reason,
679 "responded_at": response.responded_at,
680 })),
681 response.reviewer.clone(),
682 response.reason.clone(),
683 response.metadata.clone(),
684 )
685 .await
686 .map(|_| ())
687 .map_err(|error| error.to_string())
688 }
689 HitlRequestKind::Approval | HitlRequestKind::DualControl => {
690 if waitpoint_is_terminal(log, &response.request_id).await? {
691 return Ok(());
692 }
693 let request = load_request_envelope(log, kind, &response.request_id)
694 .await
695 .map_err(|error| error.to_string())?;
696 match resolve_approval_state(log, kind, &request)
697 .await
698 .map_err(|error| error.to_string())?
699 {
700 ApprovalResolution::Pending => Ok(()),
701 ApprovalResolution::Approved(progress) => {
702 let record = approval_record_json(&progress);
703 append_named_event(
704 log,
705 kind,
706 approved_event_kind(kind),
707 &request.request_id,
708 &request.trace_id,
709 json!({
710 "request_id": request.request_id.clone(),
711 "record": record.clone(),
712 }),
713 )
714 .await
715 .map_err(|error| error.to_string())?;
716 complete_waitpoint_on(
717 log,
718 &request.request_id,
719 Some(record),
720 response.reviewer.clone(),
721 progress.reason.clone(),
722 response.metadata.clone(),
723 )
724 .await
725 .map(|_| ())
726 .map_err(|error| error.to_string())
727 }
728 ApprovalResolution::Denied(denied) => {
729 append_named_event(
730 log,
731 kind,
732 denied_event_kind(kind),
733 &request.request_id,
734 &request.trace_id,
735 json!({
736 "request_id": request.request_id.clone(),
737 "reviewer": denied.reviewer.clone(),
738 "reason": denied.reason.clone(),
739 }),
740 )
741 .await
742 .map_err(|error| error.to_string())?;
743 cancel_waitpoint_on(
744 log,
745 &request.request_id,
746 denied.reviewer.clone(),
747 denied.reason.clone(),
748 denied.metadata.clone(),
749 )
750 .await
751 .map(|_| ())
752 .map_err(|error| error.to_string())
753 }
754 }
755 }
756 }
757}
758
759async fn waitpoint_is_terminal(log: &Arc<AnyEventLog>, request_id: &str) -> Result<bool, String> {
760 Ok(inspect_waitpoint_on(log, request_id)
761 .await
762 .map_err(|error| error.to_string())?
763 .is_some_and(|record| record.status != WaitpointStatus::Open))
764}
765
766async fn load_request_envelope(
767 log: &Arc<AnyEventLog>,
768 kind: HitlRequestKind,
769 request_id: &str,
770) -> Result<HitlRequestEnvelope, VmError> {
771 let topic = topic(kind)?;
772 let events = log
773 .read_range(&topic, None, usize::MAX)
774 .await
775 .map_err(log_error)?;
776 events
777 .into_iter()
778 .filter(|(_, event)| event.kind == kind.request_event_kind())
779 .find_map(|(_, event)| {
780 if !event_matches_request(&event, request_id) {
781 return None;
782 }
783 serde_json::from_value::<HitlRequestEnvelope>(event.payload).ok()
784 })
785 .ok_or_else(|| {
786 VmError::Runtime(format!("missing HITL request envelope for '{request_id}'"))
787 })
788}
789
790async fn resolve_approval_state(
791 log: &Arc<AnyEventLog>,
792 kind: HitlRequestKind,
793 request: &HitlRequestEnvelope,
794) -> Result<ApprovalResolution, VmError> {
795 let quorum = approval_quorum_from_request(kind, request)?;
796 let allowed_reviewers = approval_reviewers_from_request(kind, request)
797 .into_iter()
798 .collect::<BTreeSet<_>>();
799 let mut progress = ApprovalProgress {
800 reviewers: BTreeSet::new(),
801 signatures: Vec::new(),
802 reason: None,
803 approved_at: None,
804 };
805 let topic = topic(kind)?;
806 let events = log
807 .read_range(&topic, None, usize::MAX)
808 .await
809 .map_err(log_error)?;
810 for (_, event) in events {
811 if !event_matches_request(&event, &request.request_id)
812 || event.kind != "hitl.response_received"
813 {
814 continue;
815 }
816 let response: HitlHostResponse = serde_json::from_value(event.payload)
817 .map_err(|error| VmError::Runtime(error.to_string()))?;
818 if let Some(reviewer) = response.reviewer.as_deref() {
819 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
820 continue;
821 }
822 if progress.reviewers.contains(reviewer) {
823 continue;
824 }
825 }
826 if response.approved.unwrap_or(false) {
827 if let Some(reviewer) = response.reviewer.clone() {
828 let signed_at = response.responded_at.clone().unwrap_or_else(now_rfc3339);
829 progress.reviewers.insert(reviewer.clone());
830 progress.signatures.push(ApprovalSignature {
831 reviewer: reviewer.clone(),
832 signed_at: signed_at.clone(),
833 signature: response.signature.clone().unwrap_or_else(|| {
834 approval_receipt_signature(
835 &request.request_id,
836 &reviewer,
837 &signed_at,
838 true,
839 response.reason.as_deref(),
840 )
841 }),
842 });
843 }
844 progress.reason = response.reason.clone();
845 progress.approved_at = response.responded_at.clone();
846 if progress.reviewers.len() as u32 >= quorum {
847 return Ok(ApprovalResolution::Approved(progress));
848 }
849 continue;
850 }
851 return Ok(ApprovalResolution::Denied(response));
852 }
853 Ok(ApprovalResolution::Pending)
854}
855
856fn approval_quorum_from_request(
857 kind: HitlRequestKind,
858 request: &HitlRequestEnvelope,
859) -> Result<u32, VmError> {
860 let key = match kind {
861 HitlRequestKind::DualControl => "n",
862 _ => "quorum",
863 };
864 let quorum = request
865 .payload
866 .get(key)
867 .and_then(JsonValue::as_u64)
868 .unwrap_or(1);
869 u32::try_from(quorum).map_err(|_| {
870 VmError::Runtime(format!(
871 "invalid quorum in HITL request '{}'",
872 request.request_id
873 ))
874 })
875}
876
877fn approval_reviewers_from_request(
878 kind: HitlRequestKind,
879 request: &HitlRequestEnvelope,
880) -> Vec<String> {
881 let key = match kind {
882 HitlRequestKind::DualControl => "approvers",
883 _ => "reviewers",
884 };
885 request
886 .payload
887 .get(key)
888 .and_then(JsonValue::as_array)
889 .map(|values| {
890 values
891 .iter()
892 .filter_map(JsonValue::as_str)
893 .map(str::to_string)
894 .collect()
895 })
896 .unwrap_or_default()
897}
898
899fn approval_record_json(progress: &ApprovalProgress) -> JsonValue {
900 json!({
901 "approved": true,
902 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
903 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
904 "reason": progress.reason,
905 "signatures": progress.signatures,
906 })
907}
908
909fn approval_receipt_signature(
910 request_id: &str,
911 reviewer: &str,
912 signed_at: &str,
913 approved: bool,
914 reason: Option<&str>,
915) -> String {
916 let material = format!(
917 "harn-hitl-approval-v1\nrequest_id:{request_id}\nreviewer:{reviewer}\nsigned_at:{signed_at}\napproved:{approved}\nreason:{}\n",
918 reason.unwrap_or("")
919 );
920 let hash = sha2::Sha256::digest(material.as_bytes());
921 let hex: String = hash.iter().map(|byte| format!("{byte:02x}")).collect();
922 format!("sha256:{hex}")
923}
924
925fn approval_record_from_waitpoint(
926 record: &WaitpointRecord,
927 builtin: &str,
928) -> Result<VmValue, VmError> {
929 record
930 .value
931 .as_ref()
932 .map(crate::stdlib::json_to_vm_value)
933 .ok_or_else(|| VmError::Runtime(format!("{builtin}: missing approval record")))
934}
935
936async fn approval_wait_error(
937 log: &Arc<AnyEventLog>,
938 kind: HitlRequestKind,
939 request_id: &str,
940) -> VmError {
941 if let Ok(Some(record)) = inspect_waitpoint_on(log, request_id).await {
942 if record.status == WaitpointStatus::Cancelled
943 && record.reason.as_deref() != Some("upstream_cancelled")
944 {
945 return approval_denied_error(
946 request_id,
947 HitlHostResponse {
948 request_id: request_id.to_string(),
949 answer: None,
950 approved: Some(false),
951 accepted: None,
952 reviewer: record.cancelled_by.clone(),
953 reason: record.reason.clone(),
954 metadata: record.metadata.clone(),
955 responded_at: record.cancelled_at.clone(),
956 signature: None,
957 },
958 );
959 }
960 if record.status == WaitpointStatus::Cancelled {
961 return hitl_cancelled_error(
962 request_id,
963 kind,
964 "",
965 &[request_id.to_string()],
966 record.reason.clone(),
967 );
968 }
969 }
970 hitl_cancelled_error(
971 request_id,
972 kind,
973 "",
974 &[request_id.to_string()],
975 Some("upstream_cancelled".to_string()),
976 )
977}
978
979async fn append_timeout_once(
980 log: &Arc<AnyEventLog>,
981 kind: HitlRequestKind,
982 request_id: &str,
983 trace_id: &str,
984) -> Result<(), VmError> {
985 if hitl_event_exists(log, kind, request_id, "hitl.timeout").await? {
986 return Ok(());
987 }
988 append_timeout(log, kind, request_id, trace_id).await
989}
990
991async fn hitl_event_exists(
992 log: &Arc<AnyEventLog>,
993 kind: HitlRequestKind,
994 request_id: &str,
995 event_kind: &str,
996) -> Result<bool, VmError> {
997 let topic = topic(kind)?;
998 let events = log
999 .read_range(&topic, None, usize::MAX)
1000 .await
1001 .map_err(log_error)?;
1002 Ok(events
1003 .into_iter()
1004 .any(|(_, event)| event.kind == event_kind && event_matches_request(&event, request_id)))
1005}
1006
1007fn approved_event_kind(kind: HitlRequestKind) -> &'static str {
1008 match kind {
1009 HitlRequestKind::DualControl => "hitl.dual_control_approved",
1010 _ => "hitl.approval_approved",
1011 }
1012}
1013
1014fn denied_event_kind(kind: HitlRequestKind) -> &'static str {
1015 match kind {
1016 HitlRequestKind::DualControl => "hitl.dual_control_denied",
1017 _ => "hitl.approval_denied",
1018 }
1019}
1020
1021async fn append_request(
1022 log: &Arc<AnyEventLog>,
1023 request: &HitlRequestEnvelope,
1024) -> Result<(), VmError> {
1025 let topic = topic(request.kind)?;
1026 log.append(
1027 &topic,
1028 LogEvent::new(
1029 request.kind.request_event_kind(),
1030 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
1031 )
1032 .with_headers(request_headers(request)),
1033 )
1034 .await
1035 .map(|_| ())
1036 .map_err(log_error)
1037}
1038
1039async fn append_named_event(
1040 log: &Arc<AnyEventLog>,
1041 kind: HitlRequestKind,
1042 event_kind: &str,
1043 request_id: &str,
1044 trace_id: &str,
1045 payload: JsonValue,
1046) -> Result<(), VmError> {
1047 let topic = topic(kind)?;
1048 let headers = headers_with_trace(request_id, trace_id);
1049 log.append(
1050 &topic,
1051 LogEvent::new(event_kind, payload).with_headers(headers),
1052 )
1053 .await
1054 .map(|_| ())
1055 .map_err(log_error)
1056}
1057
1058async fn append_timeout(
1059 log: &Arc<AnyEventLog>,
1060 kind: HitlRequestKind,
1061 request_id: &str,
1062 trace_id: &str,
1063) -> Result<(), VmError> {
1064 append_named_event(
1065 log,
1066 kind,
1067 "hitl.timeout",
1068 request_id,
1069 trace_id,
1070 serde_json::to_value(HitlTimeoutRecord {
1071 request_id: request_id.to_string(),
1072 kind,
1073 trace_id: trace_id.to_string(),
1074 timed_out_at: now_rfc3339(),
1075 })
1076 .map_err(|error| VmError::Runtime(error.to_string()))?,
1077 )
1078 .await
1079}
1080
1081async fn maybe_apply_mock_response(
1082 kind: HitlRequestKind,
1083 request_id: &str,
1084 request_payload: &JsonValue,
1085) -> Result<(), VmError> {
1086 let mut params = request_payload
1087 .as_object()
1088 .cloned()
1089 .unwrap_or_default()
1090 .into_iter()
1091 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
1092 .collect::<BTreeMap<_, _>>();
1093 params.insert(
1094 "request_id".to_string(),
1095 VmValue::String(Rc::from(request_id.to_string())),
1096 );
1097 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
1098 return Ok(());
1099 };
1100 let value = result?;
1101 let responses = match value {
1102 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
1103 other => vec![other],
1104 };
1105 for response in responses {
1106 let response_dict = response.as_dict().ok_or_else(|| {
1107 VmError::Runtime(format!(
1108 "mocked HITL {} response must be a dict or list<dict>",
1109 kind.as_str()
1110 ))
1111 })?;
1112 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
1113 append_hitl_response(None, hitl_response)
1114 .await
1115 .map_err(VmError::Runtime)?;
1116 }
1117 Ok(())
1118}
1119
1120fn parse_hitl_response_dict(
1121 request_id: &str,
1122 response_dict: &BTreeMap<String, VmValue>,
1123) -> Result<HitlHostResponse, VmError> {
1124 Ok(HitlHostResponse {
1125 request_id: request_id.to_string(),
1126 answer: response_dict
1127 .get("answer")
1128 .map(crate::llm::vm_value_to_json),
1129 approved: response_dict.get("approved").and_then(vm_bool),
1130 accepted: response_dict.get("accepted").and_then(vm_bool),
1131 reviewer: response_dict.get("reviewer").map(VmValue::display),
1132 reason: response_dict.get("reason").map(VmValue::display),
1133 metadata: response_dict
1134 .get("metadata")
1135 .map(crate::llm::vm_value_to_json),
1136 responded_at: response_dict.get("responded_at").map(VmValue::display),
1137 signature: response_dict.get("signature").map(VmValue::display),
1138 })
1139}
1140
1141fn maybe_notify_host(request: &HitlRequestEnvelope) {
1142 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
1143 return;
1144 };
1145 bridge.notify(
1146 "harn.hitl.requested",
1147 serde_json::to_value(request).unwrap_or(JsonValue::Null),
1148 );
1149}
1150
1151fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
1152 let Some(value) = value else {
1153 return Ok(AskUserOptions {
1154 schema: None,
1155 timeout: Some(default_question_timeout()),
1156 default: None,
1157 });
1158 };
1159 let dict = value
1160 .as_dict()
1161 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
1162 Ok(AskUserOptions {
1163 schema: dict
1164 .get("schema")
1165 .cloned()
1166 .filter(|value| !matches!(value, VmValue::Nil)),
1167 timeout: dict
1168 .get("timeout")
1169 .map(parse_duration_value)
1170 .transpose()?
1171 .or_else(|| Some(default_question_timeout())),
1172 default: dict
1173 .get("default")
1174 .cloned()
1175 .filter(|value| !matches!(value, VmValue::Nil)),
1176 })
1177}
1178
1179fn default_question_timeout() -> StdDuration {
1180 StdDuration::from_millis(HITL_QUESTION_TIMEOUT_MS)
1181}
1182
1183fn escalation_capability_policy() -> JsonValue {
1184 crate::orchestration::current_execution_policy()
1185 .and_then(|policy| serde_json::to_value(policy).ok())
1186 .unwrap_or(JsonValue::Null)
1187}
1188
1189fn parse_approval_options(
1190 value: Option<&VmValue>,
1191 builtin: &str,
1192) -> Result<ApprovalOptions, VmError> {
1193 let dict = match value {
1194 None => None,
1195 Some(VmValue::Dict(dict)) => Some(dict),
1196 Some(_) => {
1197 return Err(VmError::Runtime(format!(
1198 "{builtin}: options must be a dict"
1199 )))
1200 }
1201 };
1202 let quorum = dict
1203 .and_then(|dict| dict.get("quorum"))
1204 .and_then(VmValue::as_int)
1205 .unwrap_or(1);
1206 if quorum <= 0 {
1207 return Err(VmError::Runtime(format!(
1208 "{builtin}: quorum must be positive"
1209 )));
1210 }
1211 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
1212 let deadline = dict
1213 .and_then(|dict| dict.get("deadline"))
1214 .map(parse_duration_value)
1215 .transpose()?
1216 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
1217 Ok(ApprovalOptions {
1218 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
1219 quorum: quorum as u32,
1220 reviewers,
1221 deadline,
1222 })
1223}
1224
1225fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
1226 args.get(idx)
1227 .map(VmValue::display)
1228 .filter(|value| !value.is_empty())
1229 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
1230}
1231
1232fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
1233 let value = args
1234 .get(idx)
1235 .and_then(VmValue::as_int)
1236 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
1237 if value <= 0 {
1238 return Err(VmError::Runtime(format!(
1239 "{builtin}: expected a positive int at {idx}"
1240 )));
1241 }
1242 Ok(value)
1243}
1244
1245fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
1246 let Some(value) = value else {
1247 return Ok(Vec::new());
1248 };
1249 match value {
1250 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
1251 _ => Err(VmError::Runtime(format!(
1252 "{builtin}: expected list<string>"
1253 ))),
1254 }
1255}
1256
1257fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
1258 match value {
1259 VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
1260 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
1261 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
1262 _ => Err(VmError::Runtime(
1263 "expected a duration or millisecond count".to_string(),
1264 )),
1265 }
1266}
1267
1268fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1269 active_event_log()
1270 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1271}
1272
1273fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1274 if let Some(log) = active_event_log() {
1275 return Ok(log);
1276 }
1277 let Some(base_dir) = base_dir else {
1278 return Ok(install_memory_for_current_thread(
1279 HITL_EVENT_LOG_QUEUE_DEPTH,
1280 ));
1281 };
1282 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1283}
1284
1285fn current_dispatch_keys() -> Option<DispatchKeys> {
1286 let context = current_dispatch_context()?;
1287 let stable_base = context
1288 .replay_of_event_id
1289 .clone()
1290 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1291 let instance_key = format!(
1292 "{}::{}",
1293 context.trigger_event.id.0,
1294 context.replay_of_event_id.as_deref().unwrap_or("live")
1295 );
1296 Some(DispatchKeys {
1297 instance_key,
1298 stable_base,
1299 agent: context.agent_id,
1300 trace_id: context.trigger_event.trace_id.0,
1301 })
1302}
1303
1304fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1305 if let Some(keys) = dispatch_keys {
1306 let seq = REQUEST_SEQUENCE.with(|slot| {
1307 let mut state = slot.borrow_mut();
1308 if state.instance_key != keys.instance_key {
1309 state.instance_key = keys.instance_key.clone();
1310 state.next_seq = 0;
1311 }
1312 state.next_seq += 1;
1313 state.next_seq
1314 });
1315 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1316 }
1317 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1318}
1319
1320fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1321 headers_with_trace(&request.request_id, &request.trace_id)
1322}
1323
1324fn response_headers(request_id: &str) -> BTreeMap<String, String> {
1325 let mut headers = BTreeMap::new();
1326 headers.insert("request_id".to_string(), request_id.to_string());
1327 headers
1328}
1329
1330fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1331 let mut headers = response_headers(request_id);
1332 headers.insert("trace_id".to_string(), trace_id.to_string());
1333 headers
1334}
1335
1336fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1337 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1338}
1339
1340fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1341 event
1342 .headers
1343 .get("request_id")
1344 .is_some_and(|value| value == request_id)
1345 || event
1346 .payload
1347 .get("request_id")
1348 .and_then(JsonValue::as_str)
1349 .is_some_and(|value| value == request_id)
1350}
1351
1352fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1353 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1354 "name": "ApprovalDeniedError",
1355 "category": "generic",
1356 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1357 "request_id": request_id,
1358 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1359 "reason": response.reason,
1360 })))
1361}
1362
1363fn hitl_cancelled_error(
1364 request_id: &str,
1365 kind: HitlRequestKind,
1366 wait_id: &str,
1367 waitpoint_ids: &[String],
1368 reason: Option<String>,
1369) -> VmError {
1370 let _ = categorized_error("HITL cancelled", ErrorCategory::Cancelled);
1371 let message = reason
1372 .clone()
1373 .unwrap_or_else(|| format!("{} cancelled", kind.as_str()));
1374 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1375 "name": "HumanCancelledError",
1376 "category": ErrorCategory::Cancelled.as_str(),
1377 "message": message,
1378 "request_id": request_id,
1379 "kind": kind.as_str(),
1380 "wait_id": wait_id,
1381 "waitpoint_ids": waitpoint_ids,
1382 "reason": reason,
1383 })))
1384}
1385
1386fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1387 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1388 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1389 "name": "HumanTimeoutError",
1390 "category": ErrorCategory::Timeout.as_str(),
1391 "message": format!("{} timed out", kind.as_str()),
1392 "request_id": request_id,
1393 "kind": kind.as_str(),
1394 })))
1395}
1396
1397fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1398 match default {
1399 VmValue::Int(_) => match value {
1400 VmValue::Int(_) => value.clone(),
1401 VmValue::Float(number) => VmValue::Int(*number as i64),
1402 VmValue::String(text) => text
1403 .parse::<i64>()
1404 .map(VmValue::Int)
1405 .unwrap_or_else(|_| default.clone()),
1406 _ => default.clone(),
1407 },
1408 VmValue::Float(_) => match value {
1409 VmValue::Float(_) => value.clone(),
1410 VmValue::Int(number) => VmValue::Float(*number as f64),
1411 VmValue::String(text) => text
1412 .parse::<f64>()
1413 .map(VmValue::Float)
1414 .unwrap_or_else(|_| default.clone()),
1415 _ => default.clone(),
1416 },
1417 VmValue::Bool(_) => match value {
1418 VmValue::Bool(_) => value.clone(),
1419 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1420 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1421 _ => default.clone(),
1422 },
1423 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1424 VmValue::Duration(_) => match value {
1425 VmValue::Duration(_) => value.clone(),
1426 VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1427 _ => default.clone(),
1428 },
1429 VmValue::Nil => value.clone(),
1430 _ => {
1431 if value.type_name() == default.type_name() {
1432 value.clone()
1433 } else {
1434 default.clone()
1435 }
1436 }
1437 }
1438}
1439
1440fn log_error(error: impl std::fmt::Display) -> VmError {
1441 VmError::Runtime(error.to_string())
1442}
1443
1444fn now_rfc3339() -> String {
1445 OffsetDateTime::now_utc()
1446 .format(&Rfc3339)
1447 .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1448}
1449
1450fn new_trace_id() -> String {
1451 format!("trace_{}", Uuid::now_v7())
1452}
1453
1454fn vm_bool(value: &VmValue) -> Option<bool> {
1455 match value {
1456 VmValue::Bool(flag) => Some(*flag),
1457 _ => None,
1458 }
1459}
1460
1461fn vm_string(value: &VmValue) -> Option<&str> {
1462 match value {
1463 VmValue::String(text) => Some(text.as_ref()),
1464 _ => None,
1465 }
1466}
1467
1468fn vm_string_list(value: &VmValue) -> Option<Vec<String>> {
1469 match value {
1470 VmValue::List(values) => Some(values.iter().map(VmValue::display).collect()),
1471 _ => None,
1472 }
1473}
1474
1475#[cfg(test)]
1476mod tests {
1477 use super::{
1478 HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
1479 };
1480 use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
1481 use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};
1482
1483 async fn execute_hitl_script(
1484 base_dir: &std::path::Path,
1485 source: &str,
1486 ) -> Result<(String, Vec<String>, Vec<String>, Vec<String>, Vec<String>), VmError> {
1487 reset_thread_local_state();
1488 let log = install_default_for_base_dir(base_dir).expect("install event log");
1489 let chunk = compile_source(source).expect("compile source");
1490 let mut vm = Vm::new();
1491 register_vm_stdlib(&mut vm);
1492 vm.set_source_dir(base_dir);
1493 vm.execute(&chunk).await?;
1494 let output = vm.output().trim_end().to_string();
1495 let question_events = event_kinds(log.clone(), HITL_QUESTIONS_TOPIC).await;
1496 let approval_events = event_kinds(log.clone(), HITL_APPROVALS_TOPIC).await;
1497 let dual_control_events = event_kinds(log.clone(), HITL_DUAL_CONTROL_TOPIC).await;
1498 let escalation_events = event_kinds(log, HITL_ESCALATIONS_TOPIC).await;
1499 Ok((
1500 output,
1501 question_events,
1502 approval_events,
1503 dual_control_events,
1504 escalation_events,
1505 ))
1506 }
1507
1508 async fn event_kinds(
1509 log: std::sync::Arc<crate::event_log::AnyEventLog>,
1510 topic: &str,
1511 ) -> Vec<String> {
1512 log.read_range(&Topic::new(topic).expect("valid topic"), None, usize::MAX)
1513 .await
1514 .expect("read topic")
1515 .into_iter()
1516 .map(|(_, event)| event.kind)
1517 .collect()
1518 }
1519
1520 #[tokio::test(flavor = "current_thread")]
1521 async fn ask_user_coerces_to_default_type_and_logs_events() {
1522 tokio::task::LocalSet::new()
1523 .run_until(async {
1524 let dir = tempfile::tempdir().expect("tempdir");
1525 let source = r#"
1526pipeline test(task) {
1527 host_mock("hitl", "question", {answer: "9"})
1528 let answer: int = ask_user("Pick a number", {default: 0})
1529 println(answer)
1530}
1531"#;
1532 let (
1533 output,
1534 question_events,
1535 approval_events,
1536 dual_control_events,
1537 escalation_events,
1538 ) = execute_hitl_script(dir.path(), source)
1539 .await
1540 .expect("script succeeds");
1541 assert_eq!(output, "9");
1542 assert_eq!(
1543 question_events,
1544 vec![
1545 "hitl.question_asked".to_string(),
1546 "hitl.response_received".to_string()
1547 ]
1548 );
1549 assert!(approval_events.is_empty());
1550 assert!(dual_control_events.is_empty());
1551 assert!(escalation_events.is_empty());
1552 })
1553 .await;
1554 }
1555
1556 #[tokio::test(flavor = "current_thread")]
1557 async fn request_approval_waits_for_quorum_and_emits_a_record() {
1558 tokio::task::LocalSet::new()
1559 .run_until(async {
1560 let dir = tempfile::tempdir().expect("tempdir");
1561 let source = r#"
1562pipeline test(task) {
1563 host_mock("hitl", "approval", [
1564 {approved: true, reviewer: "alice", reason: "ok"},
1565 {approved: true, reviewer: "bob", reason: "ship it"},
1566 ])
1567 let record = request_approval(
1568 "deploy production",
1569 {quorum: 2, reviewers: ["alice", "bob", "carol"]},
1570 )
1571 println(record.approved)
1572 println(len(record.reviewers))
1573 println(record.reviewers[0])
1574 println(record.reviewers[1])
1575}
1576"#;
1577 let (_, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1578 .await
1579 .expect("script succeeds");
1580 assert_eq!(
1581 approval_events,
1582 vec![
1583 "hitl.approval_requested".to_string(),
1584 "hitl.response_received".to_string(),
1585 "hitl.response_received".to_string(),
1586 "hitl.approval_approved".to_string(),
1587 ]
1588 );
1589 })
1590 .await;
1591 }
1592
1593 #[tokio::test(flavor = "current_thread")]
1594 async fn request_approval_surfaces_denials_as_typed_errors() {
1595 tokio::task::LocalSet::new()
1596 .run_until(async {
1597 let dir = tempfile::tempdir().expect("tempdir");
1598 let source = r#"
1599pipeline test(task) {
1600 host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
1601 let denied = try {
1602 request_approval("drop table", {reviewers: ["alice"]})
1603 }
1604 println(is_err(denied))
1605 println(unwrap_err(denied).name)
1606 println(unwrap_err(denied).reason)
1607}
1608"#;
1609 let (output, _, approval_events, _, _) = execute_hitl_script(dir.path(), source)
1610 .await
1611 .expect("script succeeds");
1612 assert_eq!(output, "true\nApprovalDeniedError\nunsafe");
1613 assert_eq!(
1614 approval_events,
1615 vec![
1616 "hitl.approval_requested".to_string(),
1617 "hitl.response_received".to_string(),
1618 "hitl.approval_denied".to_string(),
1619 ]
1620 );
1621 })
1622 .await;
1623 }
1624
1625 #[tokio::test(flavor = "current_thread")]
1626 async fn dual_control_executes_action_after_quorum() {
1627 tokio::task::LocalSet::new()
1628 .run_until(async {
1629 let dir = tempfile::tempdir().expect("tempdir");
1630 let source = r#"
1631pipeline test(task) {
1632 host_mock("hitl", "dual_control", [
1633 {approved: true, reviewer: "alice"},
1634 {approved: true, reviewer: "bob"},
1635 ])
1636 let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
1637 println(result)
1638}
1639"#;
1640 let (output, _, _, dual_control_events, _) =
1641 execute_hitl_script(dir.path(), source)
1642 .await
1643 .expect("script succeeds");
1644 assert_eq!(output, "launched");
1645 assert_eq!(
1646 dual_control_events,
1647 vec![
1648 "hitl.dual_control_requested".to_string(),
1649 "hitl.response_received".to_string(),
1650 "hitl.response_received".to_string(),
1651 "hitl.dual_control_approved".to_string(),
1652 "hitl.dual_control_executed".to_string(),
1653 ]
1654 );
1655 })
1656 .await;
1657 }
1658
1659 #[tokio::test(flavor = "current_thread")]
1660 async fn escalate_to_waits_for_acceptance_event() {
1661 tokio::task::LocalSet::new()
1662 .run_until(async {
1663 let dir = tempfile::tempdir().expect("tempdir");
1664 let source = r#"
1665pipeline test(task) {
1666 host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
1667 let handle = escalate_to("admin", "need override")
1668 println(handle.status)
1669 println(handle.reviewer)
1670}
1671"#;
1672 let (output, _, _, _, escalation_events) = execute_hitl_script(dir.path(), source)
1673 .await
1674 .expect("script succeeds");
1675 assert_eq!(output, "accepted\nlead");
1676 assert_eq!(
1677 escalation_events,
1678 vec![
1679 "hitl.escalation_issued".to_string(),
1680 "hitl.escalation_accepted".to_string(),
1681 ]
1682 );
1683 })
1684 .await;
1685 }
1686}