1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::path::Path;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::time::{Duration as StdDuration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use serde_json::{json, Value as JsonValue};
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use uuid::Uuid;
14
15use crate::event_log::{
16 active_event_log, install_default_for_base_dir, install_memory_for_current_thread, AnyEventLog,
17 EventLog, LogEvent, Topic,
18};
19use crate::schema::schema_expect_value;
20use crate::stdlib::host::dispatch_mock_host_call;
21use crate::triggers::dispatcher::current_dispatch_context;
22use crate::value::{categorized_error, ErrorCategory, VmError, VmValue};
23use crate::vm::{clone_async_builtin_child_vm, Vm};
24
/// Queue depth passed to `install_memory_for_current_thread` when this module has to
/// install an in-memory event log because none is active.
const HITL_EVENT_LOG_QUEUE_DEPTH: usize = 128;
/// Default deadline, in milliseconds (24 hours), for approval and dual-control requests.
const HITL_APPROVAL_TIMEOUT_MS: u64 = 24 * 60 * 60 * 1000;

/// Event-log topic used for `HitlRequestKind::Question` events.
pub const HITL_QUESTIONS_TOPIC: &str = "hitl.questions";
/// Event-log topic used for `HitlRequestKind::Approval` events.
pub const HITL_APPROVALS_TOPIC: &str = "hitl.approvals";
/// Event-log topic used for `HitlRequestKind::DualControl` events.
pub const HITL_DUAL_CONTROL_TOPIC: &str = "hitl.dual_control";
/// Event-log topic used for `HitlRequestKind::Escalation` events.
pub const HITL_ESCALATIONS_TOPIC: &str = "hitl.escalations";
32
// Per-thread counter state used by `next_request_id` to number requests issued
// under the same dispatch instance; reset, taken and restored by the
// `*_hitl_state` helpers in this file.
thread_local! {
    static REQUEST_SEQUENCE: RefCell<RequestSequenceState> = RefCell::new(RequestSequenceState::default());
}
36
/// Sequence-number state backing deterministic HITL request ids.
#[derive(Default)]
pub(crate) struct RequestSequenceState {
    /// Dispatch instance key the counter currently belongs to; `next_request_id`
    /// restarts the counter whenever the key differs from the current dispatch.
    pub(crate) instance_key: String,
    /// Last sequence number handed out for `instance_key` (incremented before use).
    pub(crate) next_seq: u64,
}
42
/// The four kinds of human-in-the-loop request this module issues.
///
/// Serialized by serde using snake_case variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HitlRequestKind {
    /// Issued by the `ask_user` builtin.
    Question,
    /// Issued by the `request_approval` builtin.
    Approval,
    /// Issued by the `dual_control` builtin.
    DualControl,
    /// Issued by the `escalate_to` builtin.
    Escalation,
}
51
52impl HitlRequestKind {
53 pub(crate) fn as_str(self) -> &'static str {
54 match self {
55 Self::Question => "question",
56 Self::Approval => "approval",
57 Self::DualControl => "dual_control",
58 Self::Escalation => "escalation",
59 }
60 }
61
62 fn topic(self) -> &'static str {
63 match self {
64 Self::Question => HITL_QUESTIONS_TOPIC,
65 Self::Approval => HITL_APPROVALS_TOPIC,
66 Self::DualControl => HITL_DUAL_CONTROL_TOPIC,
67 Self::Escalation => HITL_ESCALATIONS_TOPIC,
68 }
69 }
70
71 fn request_event_kind(self) -> &'static str {
72 match self {
73 Self::Question => "hitl.question_asked",
74 Self::Approval => "hitl.approval_requested",
75 Self::DualControl => "hitl.dual_control_requested",
76 Self::Escalation => "hitl.escalation_issued",
77 }
78 }
79
80 pub(crate) fn from_request_id(request_id: &str) -> Option<Self> {
81 if request_id.starts_with("hitl_question_") {
82 Some(Self::Question)
83 } else if request_id.starts_with("hitl_approval_") {
84 Some(Self::Approval)
85 } else if request_id.starts_with("hitl_dual_control_") {
86 Some(Self::DualControl)
87 } else if request_id.starts_with("hitl_escalation_") {
88 Some(Self::Escalation)
89 } else {
90 None
91 }
92 }
93}
94
/// A host's reply to a HITL request, as stored in the event log.
///
/// Every optional field is omitted from the serialized form when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HitlHostResponse {
    /// Id of the request being answered; its prefix selects the topic the response is appended to.
    pub request_id: String,
    /// Answer value; read by `ask_user`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub answer: Option<JsonValue>,
    /// Approval decision; read by the approval quorum logic (`None` is treated as not approved).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved: Option<bool>,
    /// Escalation acceptance flag; read by `escalate_to` (`None` is treated as not accepted).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted: Option<bool>,
    /// Name of the reviewer who responded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reviewer: Option<String>,
    /// Free-form reason accompanying the decision.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Arbitrary extra data supplied by the host; not interpreted in the code shown here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<JsonValue>,
    /// Response timestamp; `append_hitl_response` fills it with the current time when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responded_at: Option<String>,
}
113
/// Request record appended to the event log (and sent to the host bridge) when a
/// HITL builtin is invoked.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlRequestEnvelope {
    /// Id generated by `next_request_id`.
    request_id: String,
    /// Which builtin family issued the request.
    kind: HitlRequestKind,
    /// Agent id from the dispatch context; empty when there is none, and defaulted
    /// to empty when missing during deserialization.
    #[serde(default)]
    agent: String,
    /// Trace id from the dispatch context, or a freshly generated one.
    trace_id: String,
    /// Timestamp string produced by `now_rfc3339` at request time.
    requested_at: String,
    /// Kind-specific request parameters.
    payload: JsonValue,
}
124
/// Payload of the `hitl.timeout` event written by `append_timeout`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct HitlTimeoutRecord {
    /// Id of the request that timed out.
    request_id: String,
    /// Kind of the request that timed out.
    kind: HitlRequestKind,
    /// Trace id associated with the request.
    trace_id: String,
    /// Timestamp string produced by `now_rfc3339` when the timeout was recorded.
    timed_out_at: String,
}
132
/// Identifiers derived from the current dispatch context by `current_dispatch_keys`.
#[derive(Clone, Debug)]
struct DispatchKeys {
    /// `"<trigger event id>::<replayed event id or "live">"`; used to decide when the
    /// request sequence counter restarts.
    instance_key: String,
    /// Replayed event id if present, otherwise the trigger event id; embedded in request ids.
    stable_base: String,
    /// Agent id of the dispatch context.
    agent: String,
    /// Trace id of the trigger event.
    trace_id: String,
}
140
/// Parsed options dict of the `ask_user` builtin.
#[derive(Clone, Debug)]
struct AskUserOptions {
    /// Schema the answer is validated against, if given (nil is treated as absent).
    schema: Option<VmValue>,
    /// How long to wait for an answer; `None` waits without a deadline.
    timeout: Option<StdDuration>,
    /// Value returned on timeout, also used to coerce the answer when no schema is given
    /// (nil is treated as absent).
    default: Option<VmValue>,
}
147
/// Parsed options dict of the `request_approval` builtin.
#[derive(Clone, Debug)]
struct ApprovalOptions {
    /// Optional extra detail forwarded in the request payload.
    detail: Option<VmValue>,
    /// Number of distinct approving reviewers required (defaults to 1).
    quorum: u32,
    /// Reviewers allowed to respond; an empty list allows any reviewer.
    reviewers: Vec<String>,
    /// How long to wait for the quorum (defaults to `HITL_APPROVAL_TIMEOUT_MS`).
    deadline: StdDuration,
}
155
/// Running tally of approvals collected for one request.
#[derive(Clone, Debug)]
struct ApprovalProgress {
    /// Names of the reviewers who have approved so far (a set, so each counts once).
    reviewers: BTreeSet<String>,
    /// Reason taken from the most recently recorded approval.
    reason: Option<String>,
    /// `responded_at` of the most recently recorded approval.
    approved_at: Option<String>,
}
162
/// Result of waiting for a request's terminal event.
#[derive(Clone, Debug)]
enum WaitOutcome {
    /// The host responded with this payload.
    Response(HitlHostResponse),
    /// The wait ended because a timeout was reached or a `hitl.timeout` event was found.
    Timeout,
}
168
169pub(crate) fn register_hitl_builtins(vm: &mut Vm) {
170 vm.register_async_builtin("ask_user", |args| {
171 Box::pin(async move { ask_user_impl(&args).await })
172 });
173
174 vm.register_async_builtin("request_approval", |args| {
175 Box::pin(async move { request_approval_impl(&args).await })
176 });
177
178 vm.register_async_builtin("dual_control", |args| {
179 Box::pin(async move { dual_control_impl(&args).await })
180 });
181
182 vm.register_async_builtin("escalate_to", |args| {
183 Box::pin(async move { escalate_to_impl(&args).await })
184 });
185}
186
187pub(crate) fn reset_hitl_state() {
188 REQUEST_SEQUENCE.with(|slot| {
189 *slot.borrow_mut() = RequestSequenceState::default();
190 });
191}
192
193pub(crate) fn take_hitl_state() -> RequestSequenceState {
194 REQUEST_SEQUENCE.with(|slot| std::mem::take(&mut *slot.borrow_mut()))
195}
196
197pub(crate) fn restore_hitl_state(state: RequestSequenceState) {
198 REQUEST_SEQUENCE.with(|slot| {
199 *slot.borrow_mut() = state;
200 });
201}
202
203pub async fn append_hitl_response(
204 base_dir: Option<&Path>,
205 mut response: HitlHostResponse,
206) -> Result<u64, String> {
207 let kind = HitlRequestKind::from_request_id(&response.request_id)
208 .ok_or_else(|| format!("unknown HITL request id '{}'", response.request_id))?;
209 if response.responded_at.is_none() {
210 response.responded_at = Some(now_rfc3339());
211 }
212 let log = ensure_hitl_event_log_for(base_dir)?;
213 let headers = response_headers(&response.request_id);
214 let topic = Topic::new(kind.topic()).map_err(|error| error.to_string())?;
215 log.append(
216 &topic,
217 LogEvent::new(
218 match kind {
219 HitlRequestKind::Escalation => "hitl.escalation_accepted",
220 _ => "hitl.response_received",
221 },
222 serde_json::to_value(response).map_err(|error| error.to_string())?,
223 )
224 .with_headers(headers),
225 )
226 .await
227 .map_err(|error| error.to_string())
228}
229
230async fn ask_user_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
231 let prompt = required_string_arg(args, 0, "ask_user")?;
232 let options = parse_ask_user_options(args.get(1))?;
233 let keys = current_dispatch_keys();
234 let request_id = next_request_id(HitlRequestKind::Question, keys.as_ref());
235 let trace_id = keys
236 .as_ref()
237 .map(|keys| keys.trace_id.clone())
238 .unwrap_or_else(new_trace_id);
239 let log = ensure_hitl_event_log();
240 let request = HitlRequestEnvelope {
241 request_id: request_id.clone(),
242 kind: HitlRequestKind::Question,
243 agent: keys
244 .as_ref()
245 .map(|keys| keys.agent.clone())
246 .unwrap_or_default(),
247 trace_id: trace_id.clone(),
248 requested_at: now_rfc3339(),
249 payload: json!({
250 "prompt": prompt,
251 "schema": options.schema.as_ref().map(crate::llm::vm_value_to_json),
252 "default": options.default.as_ref().map(crate::llm::vm_value_to_json),
253 "timeout_ms": options.timeout.map(|timeout| timeout.as_millis() as u64),
254 }),
255 };
256 append_request(&log, &request).await?;
257 maybe_notify_host(&request);
258 maybe_apply_mock_response(HitlRequestKind::Question, &request_id, &request.payload).await?;
259
260 match wait_for_terminal_event(
261 &log,
262 HitlRequestKind::Question,
263 &request_id,
264 options.timeout,
265 &trace_id,
266 )
267 .await?
268 {
269 WaitOutcome::Timeout => {
270 if let Some(default) = options.default {
271 return Ok(default);
272 }
273 Err(timeout_error(&request_id, HitlRequestKind::Question))
274 }
275 WaitOutcome::Response(response) => {
276 let answer = response
277 .answer
278 .as_ref()
279 .map(crate::stdlib::json_to_vm_value)
280 .unwrap_or(VmValue::Nil);
281 if let Some(schema) = options.schema.as_ref() {
282 return schema_expect_value(&answer, schema, true);
283 }
284 if let Some(default) = options.default.as_ref() {
285 return Ok(coerce_like_default(&answer, default));
286 }
287 Ok(answer)
288 }
289 }
290}
291
292async fn request_approval_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
293 let action = required_string_arg(args, 0, "request_approval")?;
294 let options = parse_approval_options(args.get(1), "request_approval")?;
295 let keys = current_dispatch_keys();
296 let request_id = next_request_id(HitlRequestKind::Approval, keys.as_ref());
297 let trace_id = keys
298 .as_ref()
299 .map(|keys| keys.trace_id.clone())
300 .unwrap_or_else(new_trace_id);
301 let log = ensure_hitl_event_log();
302 let request = HitlRequestEnvelope {
303 request_id: request_id.clone(),
304 kind: HitlRequestKind::Approval,
305 agent: keys
306 .as_ref()
307 .map(|keys| keys.agent.clone())
308 .unwrap_or_default(),
309 trace_id: trace_id.clone(),
310 requested_at: now_rfc3339(),
311 payload: json!({
312 "action": action,
313 "detail": options.detail.as_ref().map(crate::llm::vm_value_to_json),
314 "quorum": options.quorum,
315 "reviewers": options.reviewers,
316 "deadline_ms": options.deadline.as_millis() as u64,
317 }),
318 };
319 append_request(&log, &request).await?;
320 maybe_notify_host(&request);
321 maybe_apply_mock_response(HitlRequestKind::Approval, &request_id, &request.payload).await?;
322
323 let record = wait_for_approval_quorum(
324 &log,
325 HitlRequestKind::Approval,
326 &request_id,
327 options.quorum,
328 &options.reviewers,
329 options.deadline,
330 &trace_id,
331 )
332 .await?;
333
334 append_named_event(
335 &log,
336 HitlRequestKind::Approval,
337 "hitl.approval_approved",
338 &request_id,
339 &trace_id,
340 json!({
341 "request_id": request_id,
342 "record": crate::llm::vm_value_to_json(&record),
343 }),
344 )
345 .await?;
346
347 Ok(record)
348}
349
350async fn dual_control_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
351 let n = required_positive_int_arg(args, 0, "dual_control")?;
352 let m = required_positive_int_arg(args, 1, "dual_control")?;
353 if n > m {
354 return Err(VmError::Runtime(
355 "dual_control: n must be less than or equal to m".to_string(),
356 ));
357 }
358 let action = args
359 .get(2)
360 .and_then(|value| match value {
361 VmValue::Closure(closure) => Some(closure.clone()),
362 _ => None,
363 })
364 .ok_or_else(|| VmError::Runtime("dual_control: action must be a closure".to_string()))?;
365 let approvers = optional_string_list(args.get(3), "dual_control")?;
366 if !approvers.is_empty() && approvers.len() < m as usize {
367 return Err(VmError::Runtime(format!(
368 "dual_control: expected at least {m} approvers, got {}",
369 approvers.len()
370 )));
371 }
372
373 let keys = current_dispatch_keys();
374 let request_id = next_request_id(HitlRequestKind::DualControl, keys.as_ref());
375 let trace_id = keys
376 .as_ref()
377 .map(|keys| keys.trace_id.clone())
378 .unwrap_or_else(new_trace_id);
379 let action_name = if action.func.name.is_empty() {
380 "anonymous".to_string()
381 } else {
382 action.func.name.clone()
383 };
384 let log = ensure_hitl_event_log();
385 let request = HitlRequestEnvelope {
386 request_id: request_id.clone(),
387 kind: HitlRequestKind::DualControl,
388 agent: keys
389 .as_ref()
390 .map(|keys| keys.agent.clone())
391 .unwrap_or_default(),
392 trace_id: trace_id.clone(),
393 requested_at: now_rfc3339(),
394 payload: json!({
395 "n": n,
396 "m": m,
397 "action": action_name,
398 "approvers": approvers,
399 "deadline_ms": HITL_APPROVAL_TIMEOUT_MS,
400 }),
401 };
402 append_request(&log, &request).await?;
403 maybe_notify_host(&request);
404 maybe_apply_mock_response(HitlRequestKind::DualControl, &request_id, &request.payload).await?;
405
406 let record = wait_for_approval_quorum(
407 &log,
408 HitlRequestKind::DualControl,
409 &request_id,
410 n as u32,
411 &approvers,
412 StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS),
413 &trace_id,
414 )
415 .await?;
416
417 append_named_event(
418 &log,
419 HitlRequestKind::DualControl,
420 "hitl.dual_control_approved",
421 &request_id,
422 &trace_id,
423 json!({
424 "request_id": request_id,
425 "record": crate::llm::vm_value_to_json(&record),
426 }),
427 )
428 .await?;
429
430 let mut vm = clone_async_builtin_child_vm().ok_or_else(|| {
431 VmError::Runtime("dual_control requires an async builtin VM context".to_string())
432 })?;
433 let result = vm.call_closure_pub(&action, &[], &[]).await?;
434
435 append_named_event(
436 &log,
437 HitlRequestKind::DualControl,
438 "hitl.dual_control_executed",
439 &request_id,
440 &trace_id,
441 json!({
442 "request_id": request_id,
443 "result": crate::llm::vm_value_to_json(&result),
444 }),
445 )
446 .await?;
447
448 Ok(result)
449}
450
451async fn escalate_to_impl(args: &[VmValue]) -> Result<VmValue, VmError> {
452 let role = required_string_arg(args, 0, "escalate_to")?;
453 let reason = required_string_arg(args, 1, "escalate_to")?;
454 let keys = current_dispatch_keys();
455 let request_id = next_request_id(HitlRequestKind::Escalation, keys.as_ref());
456 let trace_id = keys
457 .as_ref()
458 .map(|keys| keys.trace_id.clone())
459 .unwrap_or_else(new_trace_id);
460 let log = ensure_hitl_event_log();
461 let request = HitlRequestEnvelope {
462 request_id: request_id.clone(),
463 kind: HitlRequestKind::Escalation,
464 agent: keys
465 .as_ref()
466 .map(|keys| keys.agent.clone())
467 .unwrap_or_default(),
468 trace_id: trace_id.clone(),
469 requested_at: now_rfc3339(),
470 payload: json!({
471 "role": role,
472 "reason": reason,
473 }),
474 };
475 append_request(&log, &request).await?;
476 maybe_notify_host(&request);
477 maybe_apply_mock_response(HitlRequestKind::Escalation, &request_id, &request.payload).await?;
478
479 match wait_for_terminal_event(
480 &log,
481 HitlRequestKind::Escalation,
482 &request_id,
483 None,
484 &trace_id,
485 )
486 .await?
487 {
488 WaitOutcome::Timeout => Err(timeout_error(&request_id, HitlRequestKind::Escalation)),
489 WaitOutcome::Response(response) => Ok(crate::stdlib::json_to_vm_value(&json!({
490 "request_id": request_id,
491 "role": role,
492 "reason": reason,
493 "trace_id": trace_id,
494 "status": if response.accepted.unwrap_or(false) { "accepted" } else { "pending" },
495 "accepted_at": response.responded_at,
496 "reviewer": response.reviewer,
497 }))),
498 }
499}
500
501async fn wait_for_approval_quorum(
502 log: &Arc<AnyEventLog>,
503 kind: HitlRequestKind,
504 request_id: &str,
505 quorum: u32,
506 reviewers: &[String],
507 timeout: StdDuration,
508 trace_id: &str,
509) -> Result<VmValue, VmError> {
510 let deadline = Instant::now() + timeout;
511 let mut progress = ApprovalProgress {
512 reviewers: BTreeSet::new(),
513 reason: None,
514 approved_at: None,
515 };
516 let allowed_reviewers: BTreeSet<&str> = reviewers.iter().map(String::as_str).collect();
517 let topic = topic(kind)?;
518
519 process_existing_approval_events(
520 log,
521 &topic,
522 request_id,
523 quorum,
524 &allowed_reviewers,
525 &mut progress,
526 trace_id,
527 )
528 .await?;
529 if progress.reviewers.len() as u32 >= quorum {
530 return Ok(approval_record_value(&progress));
531 }
532 if is_replay() {
533 return Err(VmError::Runtime(format!(
534 "replay is missing recorded HITL approvals for '{request_id}'"
535 )));
536 }
537
538 let start_from = log.latest(&topic).await.map_err(log_error)?;
539 let stream = log
540 .clone()
541 .subscribe(&topic, start_from)
542 .await
543 .map_err(log_error)?;
544 pin_mut!(stream);
545 loop {
546 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
547 append_timeout(log, kind, request_id, trace_id).await?;
548 return Err(timeout_error(request_id, kind));
549 };
550 let next = tokio::time::timeout(remaining, stream.next()).await;
551 let event = match next {
552 Ok(Some(Ok((_, event)))) => event,
553 Ok(Some(Err(error))) => return Err(log_error(error)),
554 Ok(None) => {
555 append_timeout(log, kind, request_id, trace_id).await?;
556 return Err(timeout_error(request_id, kind));
557 }
558 Err(_) => {
559 append_timeout(log, kind, request_id, trace_id).await?;
560 return Err(timeout_error(request_id, kind));
561 }
562 };
563 if !event_matches_request(&event, request_id) {
564 continue;
565 }
566 if event.kind == "hitl.timeout" {
567 return Err(timeout_error(request_id, kind));
568 }
569 if event.kind != "hitl.response_received" {
570 continue;
571 }
572 let response: HitlHostResponse = serde_json::from_value(event.payload)
573 .map_err(|error| VmError::Runtime(error.to_string()))?;
574 if let Some(reviewer) = response.reviewer.as_deref() {
575 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
576 continue;
577 }
578 if progress.reviewers.contains(reviewer) {
579 continue;
580 }
581 }
582 if response.approved.unwrap_or(false) {
583 if let Some(reviewer) = response.reviewer {
584 progress.reviewers.insert(reviewer);
585 }
586 progress.reason = response.reason;
587 progress.approved_at = response.responded_at;
588 if progress.reviewers.len() as u32 >= quorum {
589 return Ok(approval_record_value(&progress));
590 }
591 continue;
592 }
593
594 append_named_event(
595 log,
596 kind,
597 match kind {
598 HitlRequestKind::DualControl => "hitl.dual_control_denied",
599 _ => "hitl.approval_denied",
600 },
601 request_id,
602 trace_id,
603 json!({
604 "request_id": request_id,
605 "reviewer": response.reviewer,
606 "reason": response.reason,
607 }),
608 )
609 .await?;
610 return Err(approval_denied_error(request_id, response));
611 }
612}
613
614async fn process_existing_approval_events(
615 log: &Arc<AnyEventLog>,
616 topic: &Topic,
617 request_id: &str,
618 quorum: u32,
619 allowed_reviewers: &BTreeSet<&str>,
620 progress: &mut ApprovalProgress,
621 trace_id: &str,
622) -> Result<(), VmError> {
623 let existing = log
624 .read_range(topic, None, usize::MAX)
625 .await
626 .map_err(log_error)?;
627 for (_, event) in existing {
628 if !event_matches_request(&event, request_id) {
629 continue;
630 }
631 if event.kind == "hitl.timeout" {
632 return Err(timeout_error(request_id, kind_from_topic(topic)?));
633 }
634 if event.kind != "hitl.response_received" {
635 continue;
636 }
637 let response: HitlHostResponse = serde_json::from_value(event.payload)
638 .map_err(|error| VmError::Runtime(error.to_string()))?;
639 let Some(reviewer) = response.reviewer.as_deref() else {
640 continue;
641 };
642 if !allowed_reviewers.is_empty() && !allowed_reviewers.contains(reviewer) {
643 continue;
644 }
645 if !response.approved.unwrap_or(false) {
646 let kind = kind_from_topic(topic)?;
647 append_named_event(
648 log,
649 kind,
650 match kind {
651 HitlRequestKind::DualControl => "hitl.dual_control_denied",
652 _ => "hitl.approval_denied",
653 },
654 request_id,
655 trace_id,
656 json!({
657 "request_id": request_id,
658 "reviewer": response.reviewer,
659 "reason": response.reason,
660 }),
661 )
662 .await?;
663 return Err(approval_denied_error(request_id, response));
664 }
665 if progress.reviewers.insert(reviewer.to_string()) {
666 progress.reason = response.reason;
667 progress.approved_at = response.responded_at;
668 }
669 if progress.reviewers.len() as u32 >= quorum {
670 break;
671 }
672 }
673 Ok(())
674}
675
676async fn wait_for_terminal_event(
677 log: &Arc<AnyEventLog>,
678 kind: HitlRequestKind,
679 request_id: &str,
680 timeout: Option<StdDuration>,
681 trace_id: &str,
682) -> Result<WaitOutcome, VmError> {
683 let topic = topic(kind)?;
684 if let Some(outcome) = find_existing_terminal_event(log, &topic, request_id).await? {
685 return Ok(outcome);
686 }
687 if is_replay() {
688 return Err(VmError::Runtime(format!(
689 "replay is missing a recorded HITL response for '{request_id}'"
690 )));
691 }
692
693 let start_from = log.latest(&topic).await.map_err(log_error)?;
694 let stream = log
695 .clone()
696 .subscribe(&topic, start_from)
697 .await
698 .map_err(log_error)?;
699 pin_mut!(stream);
700 let deadline = timeout.map(|timeout| Instant::now() + timeout);
701
702 loop {
703 let next_event = if let Some(deadline) = deadline {
704 let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
705 append_timeout(log, kind, request_id, trace_id).await?;
706 return Ok(WaitOutcome::Timeout);
707 };
708 match tokio::time::timeout(remaining, stream.next()).await {
709 Ok(next) => next,
710 Err(_) => {
711 append_timeout(log, kind, request_id, trace_id).await?;
712 return Ok(WaitOutcome::Timeout);
713 }
714 }
715 } else {
716 stream.next().await
717 };
718
719 let Some(received) = next_event else {
720 return Err(VmError::Runtime(format!(
721 "HITL wait for '{request_id}' ended before the host responded"
722 )));
723 };
724 let (_, event) = received.map_err(log_error)?;
725 if !event_matches_request(&event, request_id) {
726 continue;
727 }
728 match event.kind.as_str() {
729 "hitl.timeout" => return Ok(WaitOutcome::Timeout),
730 "hitl.response_received" | "hitl.escalation_accepted" => {
731 let response: HitlHostResponse = serde_json::from_value(event.payload)
732 .map_err(|error| VmError::Runtime(error.to_string()))?;
733 if kind == HitlRequestKind::Escalation && !response.accepted.unwrap_or(false) {
734 continue;
735 }
736 return Ok(WaitOutcome::Response(response));
737 }
738 _ => {}
739 }
740 }
741}
742
743async fn find_existing_terminal_event(
744 log: &Arc<AnyEventLog>,
745 topic: &Topic,
746 request_id: &str,
747) -> Result<Option<WaitOutcome>, VmError> {
748 let events = log
749 .read_range(topic, None, usize::MAX)
750 .await
751 .map_err(log_error)?;
752 for (_, event) in events {
753 if !event_matches_request(&event, request_id) {
754 continue;
755 }
756 match event.kind.as_str() {
757 "hitl.timeout" => return Ok(Some(WaitOutcome::Timeout)),
758 "hitl.response_received" | "hitl.escalation_accepted" => {
759 let response: HitlHostResponse = serde_json::from_value(event.payload)
760 .map_err(|error| VmError::Runtime(error.to_string()))?;
761 if event.kind == "hitl.escalation_accepted" && !response.accepted.unwrap_or(false) {
762 continue;
763 }
764 return Ok(Some(WaitOutcome::Response(response)));
765 }
766 _ => {}
767 }
768 }
769 Ok(None)
770}
771
772async fn append_request(
773 log: &Arc<AnyEventLog>,
774 request: &HitlRequestEnvelope,
775) -> Result<(), VmError> {
776 let topic = topic(request.kind)?;
777 log.append(
778 &topic,
779 LogEvent::new(
780 request.kind.request_event_kind(),
781 serde_json::to_value(request).map_err(|error| VmError::Runtime(error.to_string()))?,
782 )
783 .with_headers(request_headers(request)),
784 )
785 .await
786 .map(|_| ())
787 .map_err(log_error)
788}
789
790async fn append_named_event(
791 log: &Arc<AnyEventLog>,
792 kind: HitlRequestKind,
793 event_kind: &str,
794 request_id: &str,
795 trace_id: &str,
796 payload: JsonValue,
797) -> Result<(), VmError> {
798 let topic = topic(kind)?;
799 let headers = headers_with_trace(request_id, trace_id);
800 log.append(
801 &topic,
802 LogEvent::new(event_kind, payload).with_headers(headers),
803 )
804 .await
805 .map(|_| ())
806 .map_err(log_error)
807}
808
809async fn append_timeout(
810 log: &Arc<AnyEventLog>,
811 kind: HitlRequestKind,
812 request_id: &str,
813 trace_id: &str,
814) -> Result<(), VmError> {
815 append_named_event(
816 log,
817 kind,
818 "hitl.timeout",
819 request_id,
820 trace_id,
821 serde_json::to_value(HitlTimeoutRecord {
822 request_id: request_id.to_string(),
823 kind,
824 trace_id: trace_id.to_string(),
825 timed_out_at: now_rfc3339(),
826 })
827 .map_err(|error| VmError::Runtime(error.to_string()))?,
828 )
829 .await
830}
831
832async fn maybe_apply_mock_response(
833 kind: HitlRequestKind,
834 request_id: &str,
835 request_payload: &JsonValue,
836) -> Result<(), VmError> {
837 let mut params = request_payload
838 .as_object()
839 .cloned()
840 .unwrap_or_default()
841 .into_iter()
842 .map(|(key, value)| (key, crate::stdlib::json_to_vm_value(&value)))
843 .collect::<BTreeMap<_, _>>();
844 params.insert(
845 "request_id".to_string(),
846 VmValue::String(Rc::from(request_id.to_string())),
847 );
848 let Some(result) = dispatch_mock_host_call("hitl", kind.as_str(), ¶ms) else {
849 return Ok(());
850 };
851 let value = result?;
852 let responses = match value {
853 VmValue::List(items) => items.iter().cloned().collect::<Vec<_>>(),
854 other => vec![other],
855 };
856 for response in responses {
857 let response_dict = response.as_dict().ok_or_else(|| {
858 VmError::Runtime(format!(
859 "mocked HITL {} response must be a dict or list<dict>",
860 kind.as_str()
861 ))
862 })?;
863 let hitl_response = parse_hitl_response_dict(request_id, response_dict)?;
864 append_hitl_response(None, hitl_response)
865 .await
866 .map_err(VmError::Runtime)?;
867 }
868 Ok(())
869}
870
871fn parse_hitl_response_dict(
872 request_id: &str,
873 response_dict: &BTreeMap<String, VmValue>,
874) -> Result<HitlHostResponse, VmError> {
875 Ok(HitlHostResponse {
876 request_id: request_id.to_string(),
877 answer: response_dict
878 .get("answer")
879 .map(crate::llm::vm_value_to_json),
880 approved: response_dict.get("approved").and_then(vm_bool),
881 accepted: response_dict.get("accepted").and_then(vm_bool),
882 reviewer: response_dict.get("reviewer").map(VmValue::display),
883 reason: response_dict.get("reason").map(VmValue::display),
884 metadata: response_dict
885 .get("metadata")
886 .map(crate::llm::vm_value_to_json),
887 responded_at: response_dict.get("responded_at").map(VmValue::display),
888 })
889}
890
891fn maybe_notify_host(request: &HitlRequestEnvelope) {
892 let Some(bridge) = clone_async_builtin_child_vm().and_then(|vm| vm.bridge.clone()) else {
893 return;
894 };
895 bridge.notify(
896 "harn.hitl.requested",
897 serde_json::to_value(request).unwrap_or(JsonValue::Null),
898 );
899}
900
901fn parse_ask_user_options(value: Option<&VmValue>) -> Result<AskUserOptions, VmError> {
902 let Some(value) = value else {
903 return Ok(AskUserOptions {
904 schema: None,
905 timeout: None,
906 default: None,
907 });
908 };
909 let dict = value
910 .as_dict()
911 .ok_or_else(|| VmError::Runtime("ask_user: options must be a dict".to_string()))?;
912 Ok(AskUserOptions {
913 schema: dict
914 .get("schema")
915 .cloned()
916 .filter(|value| !matches!(value, VmValue::Nil)),
917 timeout: dict.get("timeout").map(parse_duration_value).transpose()?,
918 default: dict
919 .get("default")
920 .cloned()
921 .filter(|value| !matches!(value, VmValue::Nil)),
922 })
923}
924
925fn parse_approval_options(
926 value: Option<&VmValue>,
927 builtin: &str,
928) -> Result<ApprovalOptions, VmError> {
929 let dict = match value {
930 None => None,
931 Some(VmValue::Dict(dict)) => Some(dict),
932 Some(_) => {
933 return Err(VmError::Runtime(format!(
934 "{builtin}: options must be a dict"
935 )))
936 }
937 };
938 let quorum = dict
939 .and_then(|dict| dict.get("quorum"))
940 .and_then(VmValue::as_int)
941 .unwrap_or(1);
942 if quorum <= 0 {
943 return Err(VmError::Runtime(format!(
944 "{builtin}: quorum must be positive"
945 )));
946 }
947 let reviewers = optional_string_list(dict.and_then(|dict| dict.get("reviewers")), builtin)?;
948 let deadline = dict
949 .and_then(|dict| dict.get("deadline"))
950 .map(parse_duration_value)
951 .transpose()?
952 .unwrap_or_else(|| StdDuration::from_millis(HITL_APPROVAL_TIMEOUT_MS));
953 Ok(ApprovalOptions {
954 detail: dict.and_then(|dict| dict.get("detail")).cloned(),
955 quorum: quorum as u32,
956 reviewers,
957 deadline,
958 })
959}
960
961fn required_string_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<String, VmError> {
962 args.get(idx)
963 .map(VmValue::display)
964 .filter(|value| !value.is_empty())
965 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected string argument at {idx}")))
966}
967
968fn required_positive_int_arg(args: &[VmValue], idx: usize, builtin: &str) -> Result<i64, VmError> {
969 let value = args
970 .get(idx)
971 .and_then(VmValue::as_int)
972 .ok_or_else(|| VmError::Runtime(format!("{builtin}: expected int argument at {idx}")))?;
973 if value <= 0 {
974 return Err(VmError::Runtime(format!(
975 "{builtin}: expected a positive int at {idx}"
976 )));
977 }
978 Ok(value)
979}
980
981fn optional_string_list(value: Option<&VmValue>, builtin: &str) -> Result<Vec<String>, VmError> {
982 let Some(value) = value else {
983 return Ok(Vec::new());
984 };
985 match value {
986 VmValue::List(list) => Ok(list.iter().map(VmValue::display).collect()),
987 _ => Err(VmError::Runtime(format!(
988 "{builtin}: expected list<string>"
989 ))),
990 }
991}
992
993fn parse_duration_value(value: &VmValue) -> Result<StdDuration, VmError> {
994 match value {
995 VmValue::Duration(ms) => Ok(StdDuration::from_millis(*ms)),
996 VmValue::Int(ms) if *ms >= 0 => Ok(StdDuration::from_millis(*ms as u64)),
997 VmValue::Float(ms) if *ms >= 0.0 => Ok(StdDuration::from_millis(*ms as u64)),
998 _ => Err(VmError::Runtime(
999 "expected a duration or millisecond count".to_string(),
1000 )),
1001 }
1002}
1003
1004fn ensure_hitl_event_log() -> Arc<AnyEventLog> {
1005 active_event_log()
1006 .unwrap_or_else(|| install_memory_for_current_thread(HITL_EVENT_LOG_QUEUE_DEPTH))
1007}
1008
1009fn ensure_hitl_event_log_for(base_dir: Option<&Path>) -> Result<Arc<AnyEventLog>, String> {
1010 if let Some(log) = active_event_log() {
1011 return Ok(log);
1012 }
1013 let Some(base_dir) = base_dir else {
1014 return Ok(install_memory_for_current_thread(
1015 HITL_EVENT_LOG_QUEUE_DEPTH,
1016 ));
1017 };
1018 install_default_for_base_dir(base_dir).map_err(|error| error.to_string())
1019}
1020
1021fn current_dispatch_keys() -> Option<DispatchKeys> {
1022 let context = current_dispatch_context()?;
1023 let stable_base = context
1024 .replay_of_event_id
1025 .clone()
1026 .unwrap_or_else(|| context.trigger_event.id.0.clone());
1027 let instance_key = format!(
1028 "{}::{}",
1029 context.trigger_event.id.0,
1030 context.replay_of_event_id.as_deref().unwrap_or("live")
1031 );
1032 Some(DispatchKeys {
1033 instance_key,
1034 stable_base,
1035 agent: context.agent_id,
1036 trace_id: context.trigger_event.trace_id.0,
1037 })
1038}
1039
1040fn next_request_id(kind: HitlRequestKind, dispatch_keys: Option<&DispatchKeys>) -> String {
1041 if let Some(keys) = dispatch_keys {
1042 let seq = REQUEST_SEQUENCE.with(|slot| {
1043 let mut state = slot.borrow_mut();
1044 if state.instance_key != keys.instance_key {
1045 state.instance_key = keys.instance_key.clone();
1046 state.next_seq = 0;
1047 }
1048 state.next_seq += 1;
1049 state.next_seq
1050 });
1051 return format!("hitl_{}_{}_{}", kind.as_str(), keys.stable_base, seq);
1052 }
1053 format!("hitl_{}_{}", kind.as_str(), Uuid::now_v7())
1054}
1055
1056fn request_headers(request: &HitlRequestEnvelope) -> BTreeMap<String, String> {
1057 headers_with_trace(&request.request_id, &request.trace_id)
1058}
1059
/// Headers attached to a response event: a single `request_id` entry.
fn response_headers(request_id: &str) -> BTreeMap<String, String> {
    BTreeMap::from([("request_id".to_string(), request_id.to_string())])
}
1065
1066fn headers_with_trace(request_id: &str, trace_id: &str) -> BTreeMap<String, String> {
1067 let mut headers = response_headers(request_id);
1068 headers.insert("trace_id".to_string(), trace_id.to_string());
1069 headers
1070}
1071
1072fn topic(kind: HitlRequestKind) -> Result<Topic, VmError> {
1073 Topic::new(kind.topic()).map_err(|error| VmError::Runtime(error.to_string()))
1074}
1075
1076fn kind_from_topic(topic: &Topic) -> Result<HitlRequestKind, VmError> {
1077 match topic.as_str() {
1078 HITL_QUESTIONS_TOPIC => Ok(HitlRequestKind::Question),
1079 HITL_APPROVALS_TOPIC => Ok(HitlRequestKind::Approval),
1080 HITL_DUAL_CONTROL_TOPIC => Ok(HitlRequestKind::DualControl),
1081 HITL_ESCALATIONS_TOPIC => Ok(HitlRequestKind::Escalation),
1082 other => Err(VmError::Runtime(format!("unknown HITL topic '{other}'"))),
1083 }
1084}
1085
1086fn event_matches_request(event: &LogEvent, request_id: &str) -> bool {
1087 event
1088 .headers
1089 .get("request_id")
1090 .is_some_and(|value| value == request_id)
1091 || event
1092 .payload
1093 .get("request_id")
1094 .and_then(JsonValue::as_str)
1095 .is_some_and(|value| value == request_id)
1096}
1097
1098fn approval_record_value(progress: &ApprovalProgress) -> VmValue {
1099 crate::stdlib::json_to_vm_value(&json!({
1100 "approved": true,
1101 "reviewers": progress.reviewers.iter().cloned().collect::<Vec<_>>(),
1102 "approved_at": progress.approved_at.clone().unwrap_or_else(now_rfc3339),
1103 "reason": progress.reason,
1104 }))
1105}
1106
1107fn approval_denied_error(request_id: &str, response: HitlHostResponse) -> VmError {
1108 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1109 "name": "ApprovalDeniedError",
1110 "category": "generic",
1111 "message": response.reason.clone().unwrap_or_else(|| "approval was denied".to_string()),
1112 "request_id": request_id,
1113 "reviewers": response.reviewer.into_iter().collect::<Vec<_>>(),
1114 "reason": response.reason,
1115 })))
1116}
1117
1118fn timeout_error(request_id: &str, kind: HitlRequestKind) -> VmError {
1119 let _ = categorized_error("HITL timed out", ErrorCategory::Timeout);
1120 VmError::Thrown(crate::stdlib::json_to_vm_value(&json!({
1121 "name": "HumanTimeoutError",
1122 "category": ErrorCategory::Timeout.as_str(),
1123 "message": format!("{} timed out", kind.as_str()),
1124 "request_id": request_id,
1125 "kind": kind.as_str(),
1126 })))
1127}
1128
1129fn coerce_like_default(value: &VmValue, default: &VmValue) -> VmValue {
1130 match default {
1131 VmValue::Int(_) => match value {
1132 VmValue::Int(_) => value.clone(),
1133 VmValue::Float(number) => VmValue::Int(*number as i64),
1134 VmValue::String(text) => text
1135 .parse::<i64>()
1136 .map(VmValue::Int)
1137 .unwrap_or_else(|_| default.clone()),
1138 _ => default.clone(),
1139 },
1140 VmValue::Float(_) => match value {
1141 VmValue::Float(_) => value.clone(),
1142 VmValue::Int(number) => VmValue::Float(*number as f64),
1143 VmValue::String(text) => text
1144 .parse::<f64>()
1145 .map(VmValue::Float)
1146 .unwrap_or_else(|_| default.clone()),
1147 _ => default.clone(),
1148 },
1149 VmValue::Bool(_) => match value {
1150 VmValue::Bool(_) => value.clone(),
1151 VmValue::String(text) if text.eq_ignore_ascii_case("true") => VmValue::Bool(true),
1152 VmValue::String(text) if text.eq_ignore_ascii_case("false") => VmValue::Bool(false),
1153 _ => default.clone(),
1154 },
1155 VmValue::String(_) => VmValue::String(Rc::from(value.display())),
1156 VmValue::Duration(_) => match value {
1157 VmValue::Duration(_) => value.clone(),
1158 VmValue::Int(ms) if *ms >= 0 => VmValue::Duration(*ms as u64),
1159 _ => default.clone(),
1160 },
1161 VmValue::Nil => value.clone(),
1162 _ => {
1163 if value.type_name() == default.type_name() {
1164 value.clone()
1165 } else {
1166 default.clone()
1167 }
1168 }
1169 }
1170}
1171
1172fn log_error(error: impl std::fmt::Display) -> VmError {
1173 VmError::Runtime(error.to_string())
1174}
1175
/// Reports whether replay mode is requested through the `HARN_REPLAY`
/// environment variable.
///
/// Replay is on when the variable is set to valid Unicode whose trimmed value
/// is neither empty nor `"0"`. Surrounding whitespace is ignored for both
/// checks, so `" 0 "` disables replay exactly like `"0"` does.
fn is_replay() -> bool {
    std::env::var("HARN_REPLAY").is_ok_and(|value| {
        // Trim once and use the trimmed flag for both comparisons.
        let flag = value.trim();
        !flag.is_empty() && flag != "0"
    })
}
1181
1182fn now_rfc3339() -> String {
1183 OffsetDateTime::now_utc()
1184 .format(&Rfc3339)
1185 .unwrap_or_else(|_| OffsetDateTime::now_utc().to_string())
1186}
1187
1188fn new_trace_id() -> String {
1189 format!("trace_{}", Uuid::now_v7())
1190}
1191
1192fn vm_bool(value: &VmValue) -> Option<bool> {
1193 match value {
1194 VmValue::Bool(flag) => Some(*flag),
1195 _ => None,
1196 }
1197}
1198
#[cfg(test)]
mod tests {
    use super::{
        HITL_APPROVALS_TOPIC, HITL_DUAL_CONTROL_TOPIC, HITL_ESCALATIONS_TOPIC, HITL_QUESTIONS_TOPIC,
    };
    use crate::event_log::{install_default_for_base_dir, EventLog, Topic};
    use crate::{compile_source, register_vm_stdlib, reset_thread_local_state, Vm, VmError};

    /// Script output plus the event kinds recorded on each HITL topic.
    struct ScriptRun {
        output: String,
        questions: Vec<String>,
        approvals: Vec<String>,
        dual_control: Vec<String>,
        escalations: Vec<String>,
    }

    /// Compiles and runs `source` on a fresh VM with an event log installed
    /// for `base_dir`, then collects the output and the per-topic event kinds.
    async fn execute_hitl_script(
        base_dir: &std::path::Path,
        source: &str,
    ) -> Result<ScriptRun, VmError> {
        reset_thread_local_state();
        let log = install_default_for_base_dir(base_dir).expect("install event log");
        let chunk = compile_source(source).expect("compile source");

        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        vm.set_source_dir(base_dir);
        vm.execute(&chunk).await?;

        let output = vm.output().trim_end().to_string();
        let questions = event_kinds(&log, HITL_QUESTIONS_TOPIC).await;
        let approvals = event_kinds(&log, HITL_APPROVALS_TOPIC).await;
        let dual_control = event_kinds(&log, HITL_DUAL_CONTROL_TOPIC).await;
        let escalations = event_kinds(&log, HITL_ESCALATIONS_TOPIC).await;

        Ok(ScriptRun {
            output,
            questions,
            approvals,
            dual_control,
            escalations,
        })
    }

    /// Reads every event stored on `topic` and returns the kinds in log order.
    async fn event_kinds(
        log: &std::sync::Arc<crate::event_log::AnyEventLog>,
        topic: &str,
    ) -> Vec<String> {
        let topic = Topic::new(topic).expect("valid topic");
        let events = log
            .read_range(&topic, None, usize::MAX)
            .await
            .expect("read topic");
        events.into_iter().map(|(_, event)| event.kind).collect()
    }

    /// A string answer is coerced to the integer type of the default, and
    /// only the questions topic receives events.
    #[tokio::test(flavor = "current_thread")]
    async fn ask_user_coerces_to_default_type_and_logs_events() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "question", {answer: "9"})
    let answer: int = ask_user("Pick a number", {default: 0})
    println(answer)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "9");
                assert_eq!(
                    run.questions,
                    ["hitl.question_asked", "hitl.response_received"]
                );
                assert!(run.approvals.is_empty());
                assert!(run.dual_control.is_empty());
                assert!(run.escalations.is_empty());
            })
            .await;
    }

    /// Two approving responses satisfy a quorum of two and produce the
    /// expected approval event sequence.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_waits_for_quorum_and_emits_a_record() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "approval", [
        {approved: true, reviewer: "alice", reason: "ok"},
        {approved: true, reviewer: "bob", reason: "ship it"},
    ])
    let record = request_approval(
        "deploy production",
        {quorum: 2, reviewers: ["alice", "bob", "carol"]},
    )
    println(record.approved)
    println(len(record.reviewers))
    println(record.reviewers[0])
    println(record.reviewers[1])
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(
                    run.approvals,
                    [
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.approval_approved",
                    ]
                );
            })
            .await;
    }

    /// A denial reaches the script as an `ApprovalDeniedError` carrying the
    /// reviewer's reason.
    #[tokio::test(flavor = "current_thread")]
    async fn request_approval_surfaces_denials_as_typed_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "approval", {approved: false, reviewer: "alice", reason: "unsafe"})
    let denied = try {
        request_approval("drop table", {reviewers: ["alice"]})
    }
    println(is_err(denied))
    println(unwrap_err(denied).name)
    println(unwrap_err(denied).reason)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "true\nApprovalDeniedError\nunsafe");
                assert_eq!(
                    run.approvals,
                    [
                        "hitl.approval_requested",
                        "hitl.response_received",
                        "hitl.approval_denied",
                    ]
                );
            })
            .await;
    }

    /// The guarded action runs once two approvals arrive, and its result is
    /// what the script prints.
    #[tokio::test(flavor = "current_thread")]
    async fn dual_control_executes_action_after_quorum() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "dual_control", [
        {approved: true, reviewer: "alice"},
        {approved: true, reviewer: "bob"},
    ])
    let result = dual_control(2, 3, { -> "launched" }, ["alice", "bob", "carol"])
    println(result)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "launched");
                assert_eq!(
                    run.dual_control,
                    [
                        "hitl.dual_control_requested",
                        "hitl.response_received",
                        "hitl.response_received",
                        "hitl.dual_control_approved",
                        "hitl.dual_control_executed",
                    ]
                );
            })
            .await;
    }

    /// An accepted escalation yields a handle with the accepted status and
    /// the accepting reviewer.
    #[tokio::test(flavor = "current_thread")]
    async fn escalate_to_waits_for_acceptance_event() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let dir = tempfile::tempdir().expect("tempdir");
                let source = r#"
pipeline test(task) {
    host_mock("hitl", "escalation", {accepted: true, reviewer: "lead", reason: "taking over"})
    let handle = escalate_to("admin", "need override")
    println(handle.status)
    println(handle.reviewer)
}
"#;
                let run = execute_hitl_script(dir.path(), source)
                    .await
                    .expect("script succeeds");

                assert_eq!(run.output, "accepted\nlead");
                assert_eq!(
                    run.escalations,
                    ["hitl.escalation_issued", "hitl.escalation_accepted"]
                );
            })
            .await;
    }
}