Skip to main content

awaken_runtime/runtime/agent_runtime/
mod.rs

1//! Agent runtime: top-level orchestrator for run management, routing, and control.
2
3mod active_registry;
4mod control;
5mod run_request;
6mod runner;
7
8use std::sync::Arc;
9
10use awaken_contract::contract::mailbox::{
11    LiveRunCommand, LiveRunCommandEntry, LiveRunTarget, MailboxStore,
12};
13use awaken_contract::contract::storage::ThreadRunStore;
14
15use crate::error::RuntimeError;
16#[cfg(feature = "a2a")]
17use crate::registry::composite::CompositeAgentSpecRegistry;
18use awaken_contract::contract::message::Message;
19use awaken_contract::contract::suspension::ToolCallResume;
20use futures::StreamExt;
21use futures::channel::mpsc;
22
23use crate::cancellation::CancellationToken;
24use crate::inbox::InboxSender;
25use crate::registry::{
26    AgentResolver, ExecutionResolver, LocalExecutionResolver, RegistryHandle, RegistrySet,
27    RegistrySnapshot,
28};
29
30pub use run_request::{RunRequest, ThreadContextSnapshot};
31
32use active_registry::ActiveRunRegistry;
33
/// A group of tool-call decisions that travels through a run's decision
/// channel as a single item. Each entry pairs a tool call id (`String`) with
/// the [`ToolCallResume`] to apply to that call.
pub(crate) type DecisionBatch = Vec<(String, ToolCallResume)>;
35
36// ---------------------------------------------------------------------------
37// RunHandle
38// ---------------------------------------------------------------------------
39
/// Internal control handle for a running agent loop.
///
/// Stored in `ActiveRunRegistry` for the lifetime of a run.
/// External control is exposed via `AgentRuntime::cancel()` / `send_decisions()`.
#[derive(Clone)]
pub(crate) struct RunHandle {
    /// Identifier of the run this handle controls.
    pub(crate) run_id: String,
    /// Optional dispatch identifier; when present it is attached to the
    /// `LiveRunTarget` built by `AgentRuntime::register_run`.
    pub(crate) dispatch_id: Option<String>,
    /// Token cancelled by [`RunHandle::cancel`]; a clone of it is returned to
    /// the caller that creates the run channels.
    cancellation_token: CancellationToken,
    /// Token cancelled by [`RunHandle::stop_live_forwarder`]; the live-command
    /// forwarder task watches it and exits once it is cancelled.
    live_forwarder_token: CancellationToken,
    /// Sending half of the unbounded decision channel.
    decision_tx: mpsc::UnboundedSender<DecisionBatch>,
    /// Optional inbox sender; when `None`, [`RunHandle::send_messages`]
    /// always reports non-delivery.
    inbox_tx: Option<InboxSender>,
}
53
54impl RunHandle {
55    /// Cancel the running agent loop cooperatively.
56    pub(crate) fn cancel(&self) {
57        self.cancellation_token.cancel();
58    }
59
60    pub(crate) fn stop_live_forwarder(&self) {
61        self.live_forwarder_token.cancel();
62    }
63
64    /// Send one or more tool call decisions to the running loop atomically.
65    pub(crate) fn send_decisions(
66        &self,
67        decisions: DecisionBatch,
68    ) -> Result<(), Box<mpsc::TrySendError<DecisionBatch>>> {
69        self.decision_tx.unbounded_send(decisions).map_err(Box::new)
70    }
71
72    /// Send a single tool call decision to the running loop.
73    pub(crate) fn send_decision(
74        &self,
75        call_id: String,
76        resume: ToolCallResume,
77    ) -> Result<(), Box<mpsc::TrySendError<DecisionBatch>>> {
78        self.send_decisions(vec![(call_id, resume)])
79    }
80
81    /// Send direct input messages into the running loop's inbox.
82    pub(crate) fn send_messages(&self, messages: Vec<Message>) -> bool {
83        let Some(inbox_tx) = self.inbox_tx.as_ref() else {
84            return false;
85        };
86        if messages.is_empty() || inbox_tx.is_closed() {
87            return false;
88        }
89        inbox_tx.try_send(crate::inbox::inbox_messages_payload(messages))
90    }
91}
92
93// ---------------------------------------------------------------------------
94// AgentRuntime
95// ---------------------------------------------------------------------------
96
/// Top-level agent runtime. Manages runs across threads.
///
/// Provides methods for cancelling and sending decisions
/// to active agent runs. Enforces one active run per thread.
pub struct AgentRuntime {
    /// Execution resolver; `AgentRuntime::new` wraps a plain `AgentResolver`
    /// in a `LocalExecutionResolver` to produce it.
    pub(crate) resolver: Arc<dyn ExecutionResolver>,
    /// Optional thread/run store, set through `with_thread_run_store`.
    pub(crate) storage: Option<Arc<dyn ThreadRunStore>>,
    /// Optional profile store, set through `with_profile_store`.
    pub(crate) profile_store:
        Option<Arc<dyn awaken_contract::contract::profile_store::ProfileStore>>,
    /// Optional mailbox store; when present, `register_run` spawns a
    /// live-command forwarder for each registered run.
    pub(crate) mailbox_store: Option<Arc<dyn MailboxStore>>,
    /// Registry of currently active runs and their control handles.
    pub(crate) active_runs: ActiveRunRegistry,
    /// Optional handle to the registry set, set through `with_registry_handle`.
    pub(crate) registry_handle: Option<RegistryHandle>,
    /// One-shot guard for the "mailbox_store not wired" warning; flips true
    /// on the first `register_run` without a store so we emit exactly one
    /// tracing event per runtime instance.
    missing_mailbox_store_warned: std::sync::atomic::AtomicBool,
    /// Optional composite agent-spec registry (only with the `a2a` feature);
    /// `initialize` runs discovery on it when present.
    #[cfg(feature = "a2a")]
    composite_registry: Option<Arc<CompositeAgentSpecRegistry>>,
}
116
117impl AgentRuntime {
118    pub fn new(resolver: Arc<dyn AgentResolver>) -> Self {
119        Self::new_with_execution_resolver(Arc::new(LocalExecutionResolver::new(resolver)))
120    }
121
122    pub fn new_with_execution_resolver(resolver: Arc<dyn ExecutionResolver>) -> Self {
123        Self {
124            resolver,
125            storage: None,
126            profile_store: None,
127            mailbox_store: None,
128            active_runs: ActiveRunRegistry::new(),
129            registry_handle: None,
130            missing_mailbox_store_warned: std::sync::atomic::AtomicBool::new(false),
131            #[cfg(feature = "a2a")]
132            composite_registry: None,
133        }
134    }
135
136    #[must_use]
137    pub fn with_registry_handle(mut self, handle: RegistryHandle) -> Self {
138        self.registry_handle = Some(handle);
139        self
140    }
141
142    #[must_use]
143    pub fn with_thread_run_store(mut self, store: Arc<dyn ThreadRunStore>) -> Self {
144        self.storage = Some(store);
145        self
146    }
147
148    /// Wire the mailbox store used to subscribe to live-steering commands for
149    /// each active run. If unset, runs never receive remote `LiveRunCommand`s — this
150    /// is the single-process / test default.
151    #[must_use]
152    pub fn with_mailbox_store(mut self, store: Arc<dyn MailboxStore>) -> Self {
153        self.mailbox_store = Some(store);
154        self
155    }
156
157    #[must_use]
158    pub(crate) fn with_profile_store(
159        mut self,
160        store: Arc<dyn awaken_contract::contract::profile_store::ProfileStore>,
161    ) -> Self {
162        self.profile_store = Some(store);
163        self
164    }
165
166    pub fn resolver(&self) -> &dyn AgentResolver {
167        self.resolver.as_ref()
168    }
169
170    /// Return a cloned `Arc` of the agent resolver.
171    pub fn resolver_arc(&self) -> Arc<dyn AgentResolver> {
172        self.resolver.clone()
173    }
174
175    pub fn execution_resolver(&self) -> &dyn ExecutionResolver {
176        self.resolver.as_ref()
177    }
178
179    pub fn execution_resolver_arc(&self) -> Arc<dyn ExecutionResolver> {
180        self.resolver.clone()
181    }
182
183    pub fn registry_handle(&self) -> Option<RegistryHandle> {
184        self.registry_handle.clone()
185    }
186
187    pub fn registry_snapshot(&self) -> Option<RegistrySnapshot> {
188        self.registry_handle.as_ref().map(RegistryHandle::snapshot)
189    }
190
191    pub fn registry_version(&self) -> Option<u64> {
192        self.registry_handle.as_ref().map(RegistryHandle::version)
193    }
194
195    pub fn registry_set(&self) -> Option<RegistrySet> {
196        self.registry_snapshot()
197            .map(RegistrySnapshot::into_registries)
198    }
199
200    pub fn replace_registry_set(&self, registries: RegistrySet) -> Option<u64> {
201        self.registry_handle
202            .as_ref()
203            .map(|handle| handle.replace(registries))
204    }
205
206    #[cfg(feature = "a2a")]
207    #[must_use]
208    pub fn with_composite_registry(mut self, registry: Arc<CompositeAgentSpecRegistry>) -> Self {
209        self.composite_registry = Some(registry);
210        self
211    }
212
213    /// Return the composite registry, if one was configured.
214    #[cfg(feature = "a2a")]
215    pub fn composite_registry(&self) -> Option<&Arc<CompositeAgentSpecRegistry>> {
216        self.composite_registry.as_ref()
217    }
218
219    /// Initialize the runtime — discover remote agents.
220    /// Call this after `build()` to complete async initialization.
221    #[cfg(feature = "a2a")]
222    pub async fn initialize(&self) -> Result<(), RuntimeError> {
223        if let Some(composite) = &self.composite_registry {
224            composite
225                .discover()
226                .await
227                .map_err(|e| RuntimeError::ResolveFailed {
228                    message: format!("remote agent discovery failed: {e}"),
229                })?;
230        }
231        Ok(())
232    }
233
234    pub fn thread_run_store(&self) -> Option<&dyn ThreadRunStore> {
235        self.storage.as_deref()
236    }
237
238    /// Create a run handle pair (handle + internal channels).
239    ///
240    /// Returns (RunHandle for caller, CancellationToken for loop, decision_rx for loop).
241    #[cfg(test)]
242    pub(crate) fn create_run_channels(
243        &self,
244        run_id: String,
245    ) -> (
246        RunHandle,
247        CancellationToken,
248        mpsc::UnboundedReceiver<DecisionBatch>,
249    ) {
250        self.create_run_channels_with_inbox(run_id, None, None)
251    }
252
253    pub(crate) fn create_run_channels_with_inbox(
254        &self,
255        run_id: String,
256        dispatch_id: Option<String>,
257        inbox_tx: Option<InboxSender>,
258    ) -> (
259        RunHandle,
260        CancellationToken,
261        mpsc::UnboundedReceiver<DecisionBatch>,
262    ) {
263        let token = CancellationToken::new();
264        let live_forwarder_token = CancellationToken::new();
265        let (tx, rx) = mpsc::unbounded();
266
267        let handle = RunHandle {
268            run_id,
269            dispatch_id,
270            cancellation_token: token.clone(),
271            live_forwarder_token,
272            decision_tx: tx,
273            inbox_tx,
274        };
275
276        (handle, token, rx)
277    }
278
279    /// Register an active run. Returns error if thread already has one.
280    ///
281    /// Uses atomic try-insert to avoid TOCTOU race between check and insert.
282    /// When a mailbox store is wired, spawns the live-command forwarder that
283    /// dispatches remote `LiveRunCommand`s into this run's in-process channels.
284    pub(crate) fn register_run(
285        &self,
286        thread_id: &str,
287        handle: RunHandle,
288    ) -> Result<(), RuntimeError> {
289        let run_id = handle.run_id.clone();
290        let dispatch_id = handle.dispatch_id.clone();
291        let forwarder_inputs = self.mailbox_store.as_ref().map(|store| {
292            (
293                Arc::clone(store),
294                handle.inbox_tx.clone(),
295                handle.cancellation_token.clone(),
296                handle.live_forwarder_token.clone(),
297                handle.decision_tx.clone(),
298            )
299        });
300        if !self.active_runs.register(&run_id, thread_id, handle) {
301            return Err(RuntimeError::ThreadAlreadyRunning {
302                thread_id: thread_id.to_string(),
303            });
304        }
305        if let Some((store, inbox_tx, token, forwarder_token, decision_tx)) = forwarder_inputs {
306            let thread_id = thread_id.to_string();
307            let mut target = LiveRunTarget::new(thread_id.clone(), run_id.clone());
308            if let Some(dispatch_id) = dispatch_id {
309                target = target.with_dispatch_id(dispatch_id);
310            }
311            tokio::spawn(async move {
312                run_live_forwarder(store, target, inbox_tx, token, forwarder_token, decision_tx)
313                    .await;
314            });
315        } else if !self
316            .missing_mailbox_store_warned
317            .swap(true, std::sync::atomic::Ordering::Relaxed)
318        {
319            tracing::warn!(
320                "AgentRuntime has no mailbox_store wired: cross-node live steering \
321                 (LiveRunCommand) will always fall through to durable queue. Call \
322                 `AgentRuntime::with_mailbox_store(store)` on multi-node deployments."
323            );
324        }
325        Ok(())
326    }
327
328    /// Unregister an active run when it completes (by run_id).
329    pub(crate) fn unregister_run(&self, run_id: &str) {
330        self.active_runs.unregister(run_id);
331    }
332}
333
334/// Forwarder task: subscribes to the mailbox's live channel for a specific
335/// thread and translates each `LiveRunCommand` into the matching in-process signal.
336///
337/// Exits when:
338/// - the run is unregistered and its forwarder token is cancelled,
339/// - the subscription stream ends (store closed the channel),
340/// - a `Cancel` has been dispatched (nothing more for this run to process),
341/// - or a downstream channel is closed (agent loop already finished).
342async fn run_live_forwarder(
343    store: Arc<dyn MailboxStore>,
344    target: LiveRunTarget,
345    inbox_tx: Option<InboxSender>,
346    cancellation_token: CancellationToken,
347    live_forwarder_token: CancellationToken,
348    decision_tx: mpsc::UnboundedSender<DecisionBatch>,
349) {
350    let mut stream = match store.open_live_channel_for(&target).await {
351        Ok(s) => s,
352        Err(err) => {
353            tracing::warn!(
354                thread_id = %target.thread_id,
355                run_id = %target.run_id,
356                dispatch_id = ?target.dispatch_id,
357                error = %err,
358                "live channel subscribe failed"
359            );
360            return;
361        }
362    };
363
364    loop {
365        if live_forwarder_token.is_cancelled() {
366            break;
367        }
368        let next = tokio::select! {
369            biased;
370            _ = live_forwarder_token.cancelled() => break,
371            next = stream.next() => next,
372        };
373        let Some(LiveRunCommandEntry { command, receipt }) = next else {
374            break;
375        };
376        match command {
377            LiveRunCommand::Messages(messages) => {
378                let Some(tx) = inbox_tx.as_ref() else {
379                    // No inbox: can't deliver. Drop the receipt without
380                    // acking so the producer's `deliver_live` resolves as
381                    // `NoSubscriber` and falls back to durable dispatch.
382                    drop(receipt);
383                    continue;
384                };
385                if tx.is_closed() {
386                    drop(receipt);
387                    break;
388                }
389                if tx.try_send(crate::inbox::inbox_messages_payload(messages)) {
390                    receipt.ack();
391                } else {
392                    // Channel full or closed between the `is_closed` check
393                    // and the send; treat as non-delivery.
394                    drop(receipt);
395                }
396            }
397            LiveRunCommand::Cancel => {
398                cancellation_token.cancel();
399                // Cancellation is idempotent and always "accepted" once
400                // the token is flipped; ack before exiting.
401                receipt.ack();
402                break;
403            }
404            LiveRunCommand::Decision(decisions) => {
405                if decision_tx.is_closed() {
406                    drop(receipt);
407                    break;
408                }
409                if decision_tx.unbounded_send(decisions).is_ok() {
410                    receipt.ack();
411                } else {
412                    drop(receipt);
413                }
414            }
415            _ => {
416                // `LiveRunCommand` is `#[non_exhaustive]`. A variant this
417                // forwarder doesn't recognize usually means the producer is
418                // newer than the consumer; silently dropping would let the
419                // run continue in a state the producer believes it has
420                // already mutated. Cancel the run so the caller observes
421                // the version mismatch instead of getting corrupted output.
422                tracing::error!(
423                    thread_id = %target.thread_id,
424                    run_id = %target.run_id,
425                    dispatch_id = ?target.dispatch_id,
426                    "unsupported live run command received; cancelling run to avoid silent divergence"
427                );
428                cancellation_token.cancel();
429                drop(receipt);
430                break;
431            }
432        }
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use awaken_contract::contract::suspension::{ResumeDecisionAction, ToolCallResume};
    use serde_json::Value;

    /// Resolver stub that rejects every lookup with `AgentNotFound`.
    struct StubResolver;
    impl crate::registry::AgentResolver for StubResolver {
        fn resolve(
            &self,
            agent_id: &str,
        ) -> Result<crate::registry::ResolvedAgent, crate::error::RuntimeError> {
            Err(crate::error::RuntimeError::AgentNotFound {
                agent_id: agent_id.to_string(),
            })
        }
    }

    /// Runtime backed by [`StubResolver`] with no stores wired.
    fn make_runtime() -> AgentRuntime {
        AgentRuntime::new(Arc::new(StubResolver))
    }

    /// Minimal resume decision used as channel payload in the tests below.
    fn make_resume() -> ToolCallResume {
        ToolCallResume {
            decision_id: "d1".into(),
            action: ResumeDecisionAction::Resume,
            result: Value::Null,
            reason: None,
            updated_at: 0,
        }
    }

    #[test]
    fn new_creates_runtime() {
        let rt = make_runtime();
        assert!(rt.storage.is_none());
        assert!(rt.profile_store.is_none());
        assert!(rt.registry_handle().is_none());
    }

    #[test]
    fn resolver_returns_ref() {
        let rt = make_runtime();
        // The stub resolver always returns AgentNotFound
        let err = rt.resolver().resolve("any").unwrap_err();
        assert!(
            matches!(err, crate::error::RuntimeError::AgentNotFound { .. }),
            "expected AgentNotFound, got {err:?}"
        );
    }

    #[test]
    fn resolver_arc_returns_clone() {
        let rt = make_runtime();
        let arc = rt.resolver_arc();
        let err = arc.resolve("x").unwrap_err();
        assert!(matches!(
            err,
            crate::error::RuntimeError::AgentNotFound { .. }
        ));
    }

    #[test]
    fn with_thread_run_store_sets_store() {
        let store = Arc::new(awaken_stores::InMemoryStore::new());
        let rt = make_runtime().with_thread_run_store(store);
        assert!(rt.thread_run_store().is_some());
    }

    #[test]
    fn thread_run_store_none_by_default() {
        let rt = make_runtime();
        assert!(rt.thread_run_store().is_none());
    }

    #[test]
    fn create_run_channels_returns_triple() {
        let rt = make_runtime();
        let (handle, token, _rx) = rt.create_run_channels("run-1".into());
        assert_eq!(handle.run_id, "run-1");
        assert!(!token.is_cancelled());
    }

    #[test]
    fn register_run_succeeds() {
        let rt = make_runtime();
        let (handle, _token, _rx) = rt.create_run_channels("run-1".into());
        assert!(rt.register_run("thread-1", handle).is_ok());
    }

    #[test]
    fn register_run_fails_for_same_thread() {
        let rt = make_runtime();
        let (h1, _, _rx1) = rt.create_run_channels("run-1".into());
        let (h2, _, _rx2) = rt.create_run_channels("run-2".into());
        rt.register_run("thread-1", h1).unwrap();
        let err = rt.register_run("thread-1", h2).unwrap_err();
        assert!(
            matches!(err, RuntimeError::ThreadAlreadyRunning { ref thread_id } if thread_id == "thread-1"),
            "expected ThreadAlreadyRunning, got {err:?}"
        );
    }

    #[test]
    fn unregister_run_allows_reregistration() {
        let rt = make_runtime();
        let (h1, _, _rx1) = rt.create_run_channels("run-1".into());
        rt.register_run("thread-1", h1).unwrap();
        rt.unregister_run("run-1");

        let (h2, _, _rx2) = rt.create_run_channels("run-2".into());
        assert!(rt.register_run("thread-1", h2).is_ok());
    }

    #[test]
    fn run_handle_cancel() {
        let rt = make_runtime();
        let (handle, token, _rx) = rt.create_run_channels("run-1".into());
        assert!(!token.is_cancelled());
        handle.cancel();
        assert!(token.is_cancelled());
    }

    #[test]
    fn run_handle_send_decisions() {
        let rt = make_runtime();
        let (handle, _token, mut rx) = rt.create_run_channels("run-1".into());
        let decisions = vec![("call-1".into(), make_resume())];
        handle.send_decisions(decisions).unwrap();

        // Receive the batch from the channel
        let batch = rx.try_recv().unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].0, "call-1");
    }

    #[test]
    fn run_handle_send_decision_single() {
        let rt = make_runtime();
        let (handle, _token, mut rx) = rt.create_run_channels("run-1".into());
        handle
            .send_decision("call-2".into(), make_resume())
            .unwrap();

        let batch = rx.try_recv().unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].0, "call-2");
    }

    #[test]
    fn run_handle_send_decisions_closed_channel() {
        let rt = make_runtime();
        let (handle, _token, rx) = rt.create_run_channels("run-1".into());
        // Drop the receiver to close the channel
        drop(rx);

        let result = handle.send_decisions(vec![("call-1".into(), make_resume())]);
        assert!(result.is_err(), "send should fail when receiver is dropped");
    }

    // ── Live forwarder integration ──

    mod live_forwarder {
        use super::*;
        use awaken_contract::contract::mailbox::{LiveDeliveryOutcome, LiveRunCommand};
        use awaken_stores::InMemoryMailboxStore;
        use std::time::Duration;

        /// Give the freshly spawned forwarder task time to open its live
        /// subscription before a test starts publishing.
        ///
        /// This is a plain fixed delay, not a handshake: nothing here
        /// confirms the subscription is active. Tests that observe the
        /// forwarder's output must therefore poll with a timeout (see
        /// [`poll_for`]) instead of assuming delivery already happened.
        async fn settle() {
            // 20ms is enough for a tokio::spawn + one await + subscribe call
            // in CI.
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        /// Call `probe` up to 50 times, 10ms apart (roughly 500ms in total),
        /// and return the first `Some` it produces, or `None` on timeout.
        async fn poll_for<T>(mut probe: impl FnMut() -> Option<T>) -> Option<T> {
            for _ in 0..50 {
                if let Some(found) = probe() {
                    return Some(found);
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            None
        }

        #[tokio::test]
        async fn messages_variant_lands_in_inbox() {
            let store = Arc::new(InMemoryMailboxStore::new());
            let rt = make_runtime().with_mailbox_store(store.clone());
            let (inbox_tx, mut inbox_rx) = crate::inbox::inbox_channel();
            let (handle, _token, _rx) =
                rt.create_run_channels_with_inbox("run-1".into(), None, Some(inbox_tx));
            rt.register_run("thread-1", handle).unwrap();
            settle().await;

            store
                .deliver_live_to(
                    &LiveRunTarget::new("thread-1", "run-1"),
                    LiveRunCommand::Messages(vec![Message::user("live-1")]),
                )
                .await
                .unwrap();

            let payload = poll_for(|| inbox_rx.try_recv())
                .await
                .expect("forwarder must deliver Messages within 500ms");
            let messages = crate::inbox::inbox_payload_messages(&payload);
            assert_eq!(messages.len(), 1);
            assert_eq!(messages[0].text(), "live-1");
        }

        #[tokio::test]
        async fn cancel_variant_triggers_token() {
            let store = Arc::new(InMemoryMailboxStore::new());
            let rt = make_runtime().with_mailbox_store(store.clone());
            let (handle, token, _rx) = rt.create_run_channels("run-1".into());
            rt.register_run("thread-1", handle).unwrap();
            settle().await;

            store
                .deliver_live_to(
                    &LiveRunTarget::new("thread-1", "run-1"),
                    LiveRunCommand::Cancel,
                )
                .await
                .unwrap();

            let cancelled = poll_for(|| token.is_cancelled().then_some(()))
                .await
                .is_some();
            assert!(cancelled, "forwarder must cancel token within 500ms");
        }

        #[tokio::test]
        async fn decision_variant_lands_on_decision_channel() {
            let store = Arc::new(InMemoryMailboxStore::new());
            let rt = make_runtime().with_mailbox_store(store.clone());
            let (handle, _token, mut rx) = rt.create_run_channels("run-1".into());
            rt.register_run("thread-1", handle).unwrap();
            settle().await;

            let decisions = vec![("call-1".into(), make_resume())];
            store
                .deliver_live_to(
                    &LiveRunTarget::new("thread-1", "run-1"),
                    LiveRunCommand::Decision(decisions),
                )
                .await
                .unwrap();

            let batch = poll_for(|| rx.try_recv().ok())
                .await
                .expect("forwarder must deliver Decision within 500ms");
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].0, "call-1");
        }

        #[tokio::test]
        async fn no_store_wired_no_forwarder_runs() {
            // Baseline: without `with_mailbox_store`, deliver_live published
            // elsewhere must not reach this runtime's channels.
            let detached_store = InMemoryMailboxStore::new();
            let rt = make_runtime(); // no store
            let (inbox_tx, mut inbox_rx) = crate::inbox::inbox_channel();
            let (handle, token, _rx) =
                rt.create_run_channels_with_inbox("run-1".into(), None, Some(inbox_tx));
            rt.register_run("thread-1", handle).unwrap();
            settle().await;

            detached_store
                .deliver_live(
                    "thread-1",
                    LiveRunCommand::Messages(vec![Message::user("ignored")]),
                )
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(100)).await;

            assert!(inbox_rx.try_recv().is_none());
            assert!(!token.is_cancelled());
        }

        #[tokio::test]
        async fn separate_threads_isolated() {
            let store = Arc::new(InMemoryMailboxStore::new());
            let rt = make_runtime().with_mailbox_store(store.clone());

            let (tx_a, mut rx_a) = crate::inbox::inbox_channel();
            let (tx_b, mut rx_b) = crate::inbox::inbox_channel();
            let (h_a, _tok_a, _dec_a) =
                rt.create_run_channels_with_inbox("run-a".into(), None, Some(tx_a));
            let (h_b, _tok_b, _dec_b) =
                rt.create_run_channels_with_inbox("run-b".into(), None, Some(tx_b));
            rt.register_run("thread-a", h_a).unwrap();
            rt.register_run("thread-b", h_b).unwrap();
            settle().await;

            store
                .deliver_live_to(
                    &LiveRunTarget::new("thread-a", "run-a"),
                    LiveRunCommand::Messages(vec![Message::user("for-a")]),
                )
                .await
                .unwrap();

            // Poll for the positive case like the sibling tests do, so a slow
            // scheduler does not turn into a spurious failure.
            assert!(
                poll_for(|| rx_a.try_recv()).await.is_some(),
                "thread-a must receive"
            );
            // Keep a grace window before the negative check so a misrouted
            // copy would have had time to show up on thread-b.
            tokio::time::sleep(Duration::from_millis(80)).await;
            assert!(
                rx_b.try_recv().is_none(),
                "thread-b must not receive thread-a's message"
            );
        }

        #[tokio::test]
        async fn unregister_stops_live_forwarder_subscription() {
            let store = Arc::new(InMemoryMailboxStore::new());
            let rt = make_runtime().with_mailbox_store(store.clone());
            let (handle, _token, _rx) = rt.create_run_channels("run-1".into());
            rt.register_run("thread-1", handle).unwrap();
            settle().await;

            rt.unregister_run("run-1");
            let target = LiveRunTarget::new("thread-1", "run-1");
            // Re-deliver until the store reports that nobody is subscribed
            // any more (the forwarder needs a moment to wind down).
            let mut outcome = store
                .deliver_live_to(&target, LiveRunCommand::Cancel)
                .await
                .unwrap();
            for _ in 0..50 {
                if outcome == LiveDeliveryOutcome::NoSubscriber {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
                outcome = store
                    .deliver_live_to(&target, LiveRunCommand::Cancel)
                    .await
                    .unwrap();
            }
            assert_eq!(
                outcome,
                LiveDeliveryOutcome::NoSubscriber,
                "unregister must stop the old live forwarder"
            );
        }

        #[tokio::test]
        async fn cancel_then_messages_messages_not_processed() {
            // After forwarder dispatches Cancel it exits, so subsequent
            // Messages on the same thread should not reach the inbox via
            // this forwarder instance (agent loop is expected to be torn
            // down anyway).
            let store = Arc::new(InMemoryMailboxStore::new());
            let rt = make_runtime().with_mailbox_store(store.clone());
            let (inbox_tx, mut inbox_rx) = crate::inbox::inbox_channel();
            let (handle, token, _rx) =
                rt.create_run_channels_with_inbox("run-1".into(), None, Some(inbox_tx));
            rt.register_run("thread-1", handle).unwrap();
            settle().await;

            store
                .deliver_live_to(
                    &LiveRunTarget::new("thread-1", "run-1"),
                    LiveRunCommand::Cancel,
                )
                .await
                .unwrap();
            // Wait for cancel to propagate.
            let _ = poll_for(|| token.is_cancelled().then_some(())).await;
            assert!(token.is_cancelled());

            store
                .deliver_live_to(
                    &LiveRunTarget::new("thread-1", "run-1"),
                    LiveRunCommand::Messages(vec![Message::user("too-late")]),
                )
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(80)).await;
            assert!(
                inbox_rx.try_recv().is_none(),
                "forwarder must have exited after Cancel"
            );
        }
    }
}