1mod active_registry;
4mod control;
5mod run_request;
6mod runner;
7
8use std::sync::Arc;
9
10use awaken_contract::contract::mailbox::{
11 LiveRunCommand, LiveRunCommandEntry, LiveRunTarget, MailboxStore,
12};
13use awaken_contract::contract::storage::ThreadRunStore;
14
15use crate::error::RuntimeError;
16#[cfg(feature = "a2a")]
17use crate::registry::composite::CompositeAgentSpecRegistry;
18use awaken_contract::contract::message::Message;
19use awaken_contract::contract::suspension::ToolCallResume;
20use futures::StreamExt;
21use futures::channel::mpsc;
22
23use crate::cancellation::CancellationToken;
24use crate::inbox::InboxSender;
25use crate::registry::{
26 AgentResolver, ExecutionResolver, LocalExecutionResolver, RegistryHandle, RegistrySet,
27 RegistrySnapshot,
28};
29
30pub use run_request::{RunRequest, ThreadContextSnapshot};
31
32use active_registry::ActiveRunRegistry;
33
34pub(crate) type DecisionBatch = Vec<(String, ToolCallResume)>;
35
36#[derive(Clone)]
45pub(crate) struct RunHandle {
46 pub(crate) run_id: String,
47 pub(crate) dispatch_id: Option<String>,
48 cancellation_token: CancellationToken,
49 live_forwarder_token: CancellationToken,
50 decision_tx: mpsc::UnboundedSender<DecisionBatch>,
51 inbox_tx: Option<InboxSender>,
52}
53
54impl RunHandle {
55 pub(crate) fn cancel(&self) {
57 self.cancellation_token.cancel();
58 }
59
60 pub(crate) fn stop_live_forwarder(&self) {
61 self.live_forwarder_token.cancel();
62 }
63
64 pub(crate) fn send_decisions(
66 &self,
67 decisions: DecisionBatch,
68 ) -> Result<(), Box<mpsc::TrySendError<DecisionBatch>>> {
69 self.decision_tx.unbounded_send(decisions).map_err(Box::new)
70 }
71
72 pub(crate) fn send_decision(
74 &self,
75 call_id: String,
76 resume: ToolCallResume,
77 ) -> Result<(), Box<mpsc::TrySendError<DecisionBatch>>> {
78 self.send_decisions(vec![(call_id, resume)])
79 }
80
81 pub(crate) fn send_messages(&self, messages: Vec<Message>) -> bool {
83 let Some(inbox_tx) = self.inbox_tx.as_ref() else {
84 return false;
85 };
86 if messages.is_empty() || inbox_tx.is_closed() {
87 return false;
88 }
89 inbox_tx.try_send(crate::inbox::inbox_messages_payload(messages))
90 }
91}
92
93pub struct AgentRuntime {
102 pub(crate) resolver: Arc<dyn ExecutionResolver>,
103 pub(crate) storage: Option<Arc<dyn ThreadRunStore>>,
104 pub(crate) profile_store:
105 Option<Arc<dyn awaken_contract::contract::profile_store::ProfileStore>>,
106 pub(crate) mailbox_store: Option<Arc<dyn MailboxStore>>,
107 pub(crate) active_runs: ActiveRunRegistry,
108 pub(crate) registry_handle: Option<RegistryHandle>,
109 missing_mailbox_store_warned: std::sync::atomic::AtomicBool,
113 #[cfg(feature = "a2a")]
114 composite_registry: Option<Arc<CompositeAgentSpecRegistry>>,
115}
116
117impl AgentRuntime {
118 pub fn new(resolver: Arc<dyn AgentResolver>) -> Self {
119 Self::new_with_execution_resolver(Arc::new(LocalExecutionResolver::new(resolver)))
120 }
121
122 pub fn new_with_execution_resolver(resolver: Arc<dyn ExecutionResolver>) -> Self {
123 Self {
124 resolver,
125 storage: None,
126 profile_store: None,
127 mailbox_store: None,
128 active_runs: ActiveRunRegistry::new(),
129 registry_handle: None,
130 missing_mailbox_store_warned: std::sync::atomic::AtomicBool::new(false),
131 #[cfg(feature = "a2a")]
132 composite_registry: None,
133 }
134 }
135
136 #[must_use]
137 pub fn with_registry_handle(mut self, handle: RegistryHandle) -> Self {
138 self.registry_handle = Some(handle);
139 self
140 }
141
142 #[must_use]
143 pub fn with_thread_run_store(mut self, store: Arc<dyn ThreadRunStore>) -> Self {
144 self.storage = Some(store);
145 self
146 }
147
148 #[must_use]
152 pub fn with_mailbox_store(mut self, store: Arc<dyn MailboxStore>) -> Self {
153 self.mailbox_store = Some(store);
154 self
155 }
156
157 #[must_use]
158 pub(crate) fn with_profile_store(
159 mut self,
160 store: Arc<dyn awaken_contract::contract::profile_store::ProfileStore>,
161 ) -> Self {
162 self.profile_store = Some(store);
163 self
164 }
165
166 pub fn resolver(&self) -> &dyn AgentResolver {
167 self.resolver.as_ref()
168 }
169
170 pub fn resolver_arc(&self) -> Arc<dyn AgentResolver> {
172 self.resolver.clone()
173 }
174
175 pub fn execution_resolver(&self) -> &dyn ExecutionResolver {
176 self.resolver.as_ref()
177 }
178
179 pub fn execution_resolver_arc(&self) -> Arc<dyn ExecutionResolver> {
180 self.resolver.clone()
181 }
182
183 pub fn registry_handle(&self) -> Option<RegistryHandle> {
184 self.registry_handle.clone()
185 }
186
187 pub fn registry_snapshot(&self) -> Option<RegistrySnapshot> {
188 self.registry_handle.as_ref().map(RegistryHandle::snapshot)
189 }
190
191 pub fn registry_version(&self) -> Option<u64> {
192 self.registry_handle.as_ref().map(RegistryHandle::version)
193 }
194
195 pub fn registry_set(&self) -> Option<RegistrySet> {
196 self.registry_snapshot()
197 .map(RegistrySnapshot::into_registries)
198 }
199
200 pub fn replace_registry_set(&self, registries: RegistrySet) -> Option<u64> {
201 self.registry_handle
202 .as_ref()
203 .map(|handle| handle.replace(registries))
204 }
205
206 #[cfg(feature = "a2a")]
207 #[must_use]
208 pub fn with_composite_registry(mut self, registry: Arc<CompositeAgentSpecRegistry>) -> Self {
209 self.composite_registry = Some(registry);
210 self
211 }
212
213 #[cfg(feature = "a2a")]
215 pub fn composite_registry(&self) -> Option<&Arc<CompositeAgentSpecRegistry>> {
216 self.composite_registry.as_ref()
217 }
218
219 #[cfg(feature = "a2a")]
222 pub async fn initialize(&self) -> Result<(), RuntimeError> {
223 if let Some(composite) = &self.composite_registry {
224 composite
225 .discover()
226 .await
227 .map_err(|e| RuntimeError::ResolveFailed {
228 message: format!("remote agent discovery failed: {e}"),
229 })?;
230 }
231 Ok(())
232 }
233
234 pub fn thread_run_store(&self) -> Option<&dyn ThreadRunStore> {
235 self.storage.as_deref()
236 }
237
238 #[cfg(test)]
242 pub(crate) fn create_run_channels(
243 &self,
244 run_id: String,
245 ) -> (
246 RunHandle,
247 CancellationToken,
248 mpsc::UnboundedReceiver<DecisionBatch>,
249 ) {
250 self.create_run_channels_with_inbox(run_id, None, None)
251 }
252
253 pub(crate) fn create_run_channels_with_inbox(
254 &self,
255 run_id: String,
256 dispatch_id: Option<String>,
257 inbox_tx: Option<InboxSender>,
258 ) -> (
259 RunHandle,
260 CancellationToken,
261 mpsc::UnboundedReceiver<DecisionBatch>,
262 ) {
263 let token = CancellationToken::new();
264 let live_forwarder_token = CancellationToken::new();
265 let (tx, rx) = mpsc::unbounded();
266
267 let handle = RunHandle {
268 run_id,
269 dispatch_id,
270 cancellation_token: token.clone(),
271 live_forwarder_token,
272 decision_tx: tx,
273 inbox_tx,
274 };
275
276 (handle, token, rx)
277 }
278
279 pub(crate) fn register_run(
285 &self,
286 thread_id: &str,
287 handle: RunHandle,
288 ) -> Result<(), RuntimeError> {
289 let run_id = handle.run_id.clone();
290 let dispatch_id = handle.dispatch_id.clone();
291 let forwarder_inputs = self.mailbox_store.as_ref().map(|store| {
292 (
293 Arc::clone(store),
294 handle.inbox_tx.clone(),
295 handle.cancellation_token.clone(),
296 handle.live_forwarder_token.clone(),
297 handle.decision_tx.clone(),
298 )
299 });
300 if !self.active_runs.register(&run_id, thread_id, handle) {
301 return Err(RuntimeError::ThreadAlreadyRunning {
302 thread_id: thread_id.to_string(),
303 });
304 }
305 if let Some((store, inbox_tx, token, forwarder_token, decision_tx)) = forwarder_inputs {
306 let thread_id = thread_id.to_string();
307 let mut target = LiveRunTarget::new(thread_id.clone(), run_id.clone());
308 if let Some(dispatch_id) = dispatch_id {
309 target = target.with_dispatch_id(dispatch_id);
310 }
311 tokio::spawn(async move {
312 run_live_forwarder(store, target, inbox_tx, token, forwarder_token, decision_tx)
313 .await;
314 });
315 } else if !self
316 .missing_mailbox_store_warned
317 .swap(true, std::sync::atomic::Ordering::Relaxed)
318 {
319 tracing::warn!(
320 "AgentRuntime has no mailbox_store wired: cross-node live steering \
321 (LiveRunCommand) will always fall through to durable queue. Call \
322 `AgentRuntime::with_mailbox_store(store)` on multi-node deployments."
323 );
324 }
325 Ok(())
326 }
327
328 pub(crate) fn unregister_run(&self, run_id: &str) {
330 self.active_runs.unregister(run_id);
331 }
332}
333
334async fn run_live_forwarder(
343 store: Arc<dyn MailboxStore>,
344 target: LiveRunTarget,
345 inbox_tx: Option<InboxSender>,
346 cancellation_token: CancellationToken,
347 live_forwarder_token: CancellationToken,
348 decision_tx: mpsc::UnboundedSender<DecisionBatch>,
349) {
350 let mut stream = match store.open_live_channel_for(&target).await {
351 Ok(s) => s,
352 Err(err) => {
353 tracing::warn!(
354 thread_id = %target.thread_id,
355 run_id = %target.run_id,
356 dispatch_id = ?target.dispatch_id,
357 error = %err,
358 "live channel subscribe failed"
359 );
360 return;
361 }
362 };
363
364 loop {
365 if live_forwarder_token.is_cancelled() {
366 break;
367 }
368 let next = tokio::select! {
369 biased;
370 _ = live_forwarder_token.cancelled() => break,
371 next = stream.next() => next,
372 };
373 let Some(LiveRunCommandEntry { command, receipt }) = next else {
374 break;
375 };
376 match command {
377 LiveRunCommand::Messages(messages) => {
378 let Some(tx) = inbox_tx.as_ref() else {
379 drop(receipt);
383 continue;
384 };
385 if tx.is_closed() {
386 drop(receipt);
387 break;
388 }
389 if tx.try_send(crate::inbox::inbox_messages_payload(messages)) {
390 receipt.ack();
391 } else {
392 drop(receipt);
395 }
396 }
397 LiveRunCommand::Cancel => {
398 cancellation_token.cancel();
399 receipt.ack();
402 break;
403 }
404 LiveRunCommand::Decision(decisions) => {
405 if decision_tx.is_closed() {
406 drop(receipt);
407 break;
408 }
409 if decision_tx.unbounded_send(decisions).is_ok() {
410 receipt.ack();
411 } else {
412 drop(receipt);
413 }
414 }
415 _ => {
416 tracing::error!(
423 thread_id = %target.thread_id,
424 run_id = %target.run_id,
425 dispatch_id = ?target.dispatch_id,
426 "unsupported live run command received; cancelling run to avoid silent divergence"
427 );
428 cancellation_token.cancel();
429 drop(receipt);
430 break;
431 }
432 }
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use super::*;
439 use std::sync::Arc;
440
441 use awaken_contract::contract::suspension::{ResumeDecisionAction, ToolCallResume};
442 use serde_json::Value;
443
444 struct StubResolver;
445 impl crate::registry::AgentResolver for StubResolver {
446 fn resolve(
447 &self,
448 agent_id: &str,
449 ) -> Result<crate::registry::ResolvedAgent, crate::error::RuntimeError> {
450 Err(crate::error::RuntimeError::AgentNotFound {
451 agent_id: agent_id.to_string(),
452 })
453 }
454 }
455
456 fn make_runtime() -> AgentRuntime {
457 AgentRuntime::new(Arc::new(StubResolver))
458 }
459
460 fn make_resume() -> ToolCallResume {
461 ToolCallResume {
462 decision_id: "d1".into(),
463 action: ResumeDecisionAction::Resume,
464 result: Value::Null,
465 reason: None,
466 updated_at: 0,
467 }
468 }
469
470 #[test]
471 fn new_creates_runtime() {
472 let rt = make_runtime();
473 assert!(rt.storage.is_none());
474 assert!(rt.profile_store.is_none());
475 assert!(rt.registry_handle().is_none());
476 }
477
478 #[test]
479 fn resolver_returns_ref() {
480 let rt = make_runtime();
481 let err = rt.resolver().resolve("any").unwrap_err();
483 assert!(
484 matches!(err, crate::error::RuntimeError::AgentNotFound { .. }),
485 "expected AgentNotFound, got {err:?}"
486 );
487 }
488
489 #[test]
490 fn resolver_arc_returns_clone() {
491 let rt = make_runtime();
492 let arc = rt.resolver_arc();
493 let err = arc.resolve("x").unwrap_err();
494 assert!(matches!(
495 err,
496 crate::error::RuntimeError::AgentNotFound { .. }
497 ));
498 }
499
500 #[test]
501 fn with_thread_run_store_sets_store() {
502 let store = Arc::new(awaken_stores::InMemoryStore::new());
503 let rt = make_runtime().with_thread_run_store(store);
504 assert!(rt.thread_run_store().is_some());
505 }
506
507 #[test]
508 fn thread_run_store_none_by_default() {
509 let rt = make_runtime();
510 assert!(rt.thread_run_store().is_none());
511 }
512
513 #[test]
514 fn create_run_channels_returns_triple() {
515 let rt = make_runtime();
516 let (handle, token, _rx) = rt.create_run_channels("run-1".into());
517 assert_eq!(handle.run_id, "run-1");
518 assert!(!token.is_cancelled());
519 }
520
521 #[test]
522 fn register_run_succeeds() {
523 let rt = make_runtime();
524 let (handle, _token, _rx) = rt.create_run_channels("run-1".into());
525 assert!(rt.register_run("thread-1", handle).is_ok());
526 }
527
528 #[test]
529 fn register_run_fails_for_same_thread() {
530 let rt = make_runtime();
531 let (h1, _, _rx1) = rt.create_run_channels("run-1".into());
532 let (h2, _, _rx2) = rt.create_run_channels("run-2".into());
533 rt.register_run("thread-1", h1).unwrap();
534 let err = rt.register_run("thread-1", h2).unwrap_err();
535 assert!(
536 matches!(err, RuntimeError::ThreadAlreadyRunning { ref thread_id } if thread_id == "thread-1"),
537 "expected ThreadAlreadyRunning, got {err:?}"
538 );
539 }
540
541 #[test]
542 fn unregister_run_allows_reregistration() {
543 let rt = make_runtime();
544 let (h1, _, _rx1) = rt.create_run_channels("run-1".into());
545 rt.register_run("thread-1", h1).unwrap();
546 rt.unregister_run("run-1");
547
548 let (h2, _, _rx2) = rt.create_run_channels("run-2".into());
549 assert!(rt.register_run("thread-1", h2).is_ok());
550 }
551
552 #[test]
553 fn run_handle_cancel() {
554 let rt = make_runtime();
555 let (handle, token, _rx) = rt.create_run_channels("run-1".into());
556 assert!(!token.is_cancelled());
557 handle.cancel();
558 assert!(token.is_cancelled());
559 }
560
561 #[test]
562 fn run_handle_send_decisions() {
563 let rt = make_runtime();
564 let (handle, _token, mut rx) = rt.create_run_channels("run-1".into());
565 let decisions = vec![("call-1".into(), make_resume())];
566 handle.send_decisions(decisions).unwrap();
567
568 let batch = rx.try_recv().unwrap();
570 assert_eq!(batch.len(), 1);
571 assert_eq!(batch[0].0, "call-1");
572 }
573
574 #[test]
575 fn run_handle_send_decision_single() {
576 let rt = make_runtime();
577 let (handle, _token, mut rx) = rt.create_run_channels("run-1".into());
578 handle
579 .send_decision("call-2".into(), make_resume())
580 .unwrap();
581
582 let batch = rx.try_recv().unwrap();
583 assert_eq!(batch.len(), 1);
584 assert_eq!(batch[0].0, "call-2");
585 }
586
587 #[test]
588 fn run_handle_send_decisions_closed_channel() {
589 let rt = make_runtime();
590 let (handle, _token, rx) = rt.create_run_channels("run-1".into());
591 drop(rx);
593
594 let result = handle.send_decisions(vec![("call-1".into(), make_resume())]);
595 assert!(result.is_err(), "send should fail when receiver is dropped");
596 }
597
598 mod live_forwarder {
601 use super::*;
602 use awaken_contract::contract::mailbox::LiveRunCommand;
603 use awaken_stores::InMemoryMailboxStore;
604 use std::time::Duration;
605
606 async fn settle() {
613 tokio::time::sleep(Duration::from_millis(20)).await;
617 }
618
619 #[tokio::test]
620 async fn messages_variant_lands_in_inbox() {
621 let store = Arc::new(InMemoryMailboxStore::new());
622 let rt = make_runtime().with_mailbox_store(store.clone());
623 let (inbox_tx, mut inbox_rx) = crate::inbox::inbox_channel();
624 let (handle, _token, _rx) =
625 rt.create_run_channels_with_inbox("run-1".into(), None, Some(inbox_tx));
626 rt.register_run("thread-1", handle).unwrap();
627 settle().await;
628
629 store
630 .deliver_live_to(
631 &LiveRunTarget::new("thread-1", "run-1"),
632 LiveRunCommand::Messages(vec![Message::user("live-1")]),
633 )
634 .await
635 .unwrap();
636
637 let mut received = None;
638 for _ in 0..50 {
639 if let Some(json) = inbox_rx.try_recv() {
640 received = Some(json);
641 break;
642 }
643 tokio::time::sleep(Duration::from_millis(10)).await;
644 }
645 let payload = received.expect("forwarder must deliver Messages within 500ms");
646 let messages = crate::inbox::inbox_payload_messages(&payload);
647 assert_eq!(messages.len(), 1);
648 assert_eq!(messages[0].text(), "live-1");
649 }
650
651 #[tokio::test]
652 async fn cancel_variant_triggers_token() {
653 let store = Arc::new(InMemoryMailboxStore::new());
654 let rt = make_runtime().with_mailbox_store(store.clone());
655 let (handle, token, _rx) = rt.create_run_channels("run-1".into());
656 rt.register_run("thread-1", handle).unwrap();
657 settle().await;
658
659 store
660 .deliver_live_to(
661 &LiveRunTarget::new("thread-1", "run-1"),
662 LiveRunCommand::Cancel,
663 )
664 .await
665 .unwrap();
666
667 let mut cancelled = false;
668 for _ in 0..50 {
669 if token.is_cancelled() {
670 cancelled = true;
671 break;
672 }
673 tokio::time::sleep(Duration::from_millis(10)).await;
674 }
675 assert!(cancelled, "forwarder must cancel token within 500ms");
676 }
677
678 #[tokio::test]
679 async fn decision_variant_lands_on_decision_channel() {
680 let store = Arc::new(InMemoryMailboxStore::new());
681 let rt = make_runtime().with_mailbox_store(store.clone());
682 let (handle, _token, mut rx) = rt.create_run_channels("run-1".into());
683 rt.register_run("thread-1", handle).unwrap();
684 settle().await;
685
686 let decisions = vec![("call-1".into(), make_resume())];
687 store
688 .deliver_live_to(
689 &LiveRunTarget::new("thread-1", "run-1"),
690 LiveRunCommand::Decision(decisions),
691 )
692 .await
693 .unwrap();
694
695 let mut got = None;
696 for _ in 0..50 {
697 if let Ok(batch) = rx.try_recv() {
698 got = Some(batch);
699 break;
700 }
701 tokio::time::sleep(Duration::from_millis(10)).await;
702 }
703 let batch = got.expect("forwarder must deliver Decision within 500ms");
704 assert_eq!(batch.len(), 1);
705 assert_eq!(batch[0].0, "call-1");
706 }
707
708 #[tokio::test]
709 async fn no_store_wired_no_forwarder_runs() {
710 let detached_store = InMemoryMailboxStore::new();
713 let rt = make_runtime(); let (inbox_tx, mut inbox_rx) = crate::inbox::inbox_channel();
715 let (handle, token, _rx) =
716 rt.create_run_channels_with_inbox("run-1".into(), None, Some(inbox_tx));
717 rt.register_run("thread-1", handle).unwrap();
718 settle().await;
719
720 detached_store
721 .deliver_live(
722 "thread-1",
723 LiveRunCommand::Messages(vec![Message::user("ignored")]),
724 )
725 .await
726 .unwrap();
727 tokio::time::sleep(Duration::from_millis(100)).await;
728
729 assert!(inbox_rx.try_recv().is_none());
730 assert!(!token.is_cancelled());
731 }
732
733 #[tokio::test]
734 async fn separate_threads_isolated() {
735 let store = Arc::new(InMemoryMailboxStore::new());
736 let rt = make_runtime().with_mailbox_store(store.clone());
737
738 let (tx_a, mut rx_a) = crate::inbox::inbox_channel();
739 let (tx_b, mut rx_b) = crate::inbox::inbox_channel();
740 let (h_a, _tok_a, _dec_a) =
741 rt.create_run_channels_with_inbox("run-a".into(), None, Some(tx_a));
742 let (h_b, _tok_b, _dec_b) =
743 rt.create_run_channels_with_inbox("run-b".into(), None, Some(tx_b));
744 rt.register_run("thread-a", h_a).unwrap();
745 rt.register_run("thread-b", h_b).unwrap();
746 settle().await;
747
748 store
749 .deliver_live_to(
750 &LiveRunTarget::new("thread-a", "run-a"),
751 LiveRunCommand::Messages(vec![Message::user("for-a")]),
752 )
753 .await
754 .unwrap();
755 tokio::time::sleep(Duration::from_millis(80)).await;
756
757 assert!(rx_a.try_recv().is_some(), "thread-a must receive");
758 assert!(
759 rx_b.try_recv().is_none(),
760 "thread-b must not receive thread-a's message"
761 );
762 }
763
764 #[tokio::test]
765 async fn unregister_stops_live_forwarder_subscription() {
766 let store = Arc::new(InMemoryMailboxStore::new());
767 let rt = make_runtime().with_mailbox_store(store.clone());
768 let (handle, _token, _rx) = rt.create_run_channels("run-1".into());
769 rt.register_run("thread-1", handle).unwrap();
770 settle().await;
771
772 rt.unregister_run("run-1");
773 let target = LiveRunTarget::new("thread-1", "run-1");
774 let mut outcome = store
775 .deliver_live_to(&target, LiveRunCommand::Cancel)
776 .await
777 .unwrap();
778 for _ in 0..50 {
779 if outcome == awaken_contract::contract::mailbox::LiveDeliveryOutcome::NoSubscriber
780 {
781 break;
782 }
783 tokio::time::sleep(Duration::from_millis(10)).await;
784 outcome = store
785 .deliver_live_to(&target, LiveRunCommand::Cancel)
786 .await
787 .unwrap();
788 }
789 assert_eq!(
790 outcome,
791 awaken_contract::contract::mailbox::LiveDeliveryOutcome::NoSubscriber,
792 "unregister must stop the old live forwarder"
793 );
794 }
795
796 #[tokio::test]
797 async fn cancel_then_messages_messages_not_processed() {
798 let store = Arc::new(InMemoryMailboxStore::new());
803 let rt = make_runtime().with_mailbox_store(store.clone());
804 let (inbox_tx, mut inbox_rx) = crate::inbox::inbox_channel();
805 let (handle, token, _rx) =
806 rt.create_run_channels_with_inbox("run-1".into(), None, Some(inbox_tx));
807 rt.register_run("thread-1", handle).unwrap();
808 settle().await;
809
810 store
811 .deliver_live_to(
812 &LiveRunTarget::new("thread-1", "run-1"),
813 LiveRunCommand::Cancel,
814 )
815 .await
816 .unwrap();
817 for _ in 0..50 {
819 if token.is_cancelled() {
820 break;
821 }
822 tokio::time::sleep(Duration::from_millis(10)).await;
823 }
824 assert!(token.is_cancelled());
825
826 store
827 .deliver_live_to(
828 &LiveRunTarget::new("thread-1", "run-1"),
829 LiveRunCommand::Messages(vec![Message::user("too-late")]),
830 )
831 .await
832 .unwrap();
833 tokio::time::sleep(Duration::from_millis(80)).await;
834 assert!(
835 inbox_rx.try_recv().is_none(),
836 "forwarder must have exited after Cancel"
837 );
838 }
839 }
840}