// awaken_runtime/runtime/agent_runtime/runner.rs

1//! AgentRuntime::run() implementation.
2
3use std::sync::Arc;
4
5use crate::backend::{
6    BackendControl, BackendLocalRootContext, BackendRootRunRequest, ExecutionBackendError,
7    LocalBackend, execute_remote_root_lifecycle, execution_capabilities,
8    validate_root_execution_request,
9};
10use crate::loop_runner::{AgentLoopError, AgentRunResult};
11use crate::registry::{ExecutionResolver, ResolvedExecution};
12use awaken_contract::contract::active_agent::ActiveAgentIdKey;
13use awaken_contract::contract::event_sink::{EventSink, NullEventSink};
14use awaken_contract::contract::identity::RunIdentity;
15use awaken_contract::contract::message::{Message, Role, Visibility};
16use awaken_contract::contract::storage::RunRecord;
17use awaken_contract::contract::suspension::ToolCallStatus;
18use awaken_contract::now_ms;
19use awaken_contract::state::PersistedState;
20
21use super::AgentRuntime;
22use super::run_request::{RunRequest, ThreadContextSnapshot};
23
/// Fallback agent id used when the request names no agent and none can be
/// inferred from the thread's previous runs (see `resolve_agent_id`).
const DEFAULT_AGENT_ID: &str = "default";
25
/// RAII guard that unregisters the active run on drop, ensuring cleanup
/// even if the run future panics or is cancelled.
struct RunSlotGuard<'a> {
    // Runtime whose active-run registry holds the slot.
    runtime: &'a AgentRuntime,
    // Id of the run to unregister when the guard drops.
    run_id: String,
}
32
33impl Drop for RunSlotGuard<'_> {
34    fn drop(&mut self) {
35        self.runtime.unregister_run(&self.run_id);
36    }
37}
38
/// Per-run environment produced by `prepare_local_root_execution` for runs
/// executed on the local backend.
struct PreparedLocalRootExecution {
    /// Thread history (restored + stripped) plus any new request messages.
    messages: Vec<Message>,
    /// Phase runtime bound to the fresh per-run `StateStore`.
    phase_runtime: crate::phase::PhaseRuntime,
    /// Receiving half of the run's inbox channel.
    inbox: crate::inbox::InboxReceiver,
    /// Sending half of the run's inbox channel (the "owner" inbox).
    inbox_sender: crate::inbox::InboxSender,
    /// Per-run wiring for context auto-compaction. Some when the preflight
    /// resolved agent declared `autocompact_threshold` and the runtime had
    /// not already attached a manager + summarizer.
    compaction: Option<CompactionRuntime>,
}
49
/// Per-run context auto-compaction wiring: shared manager + summarizer that
/// the loop's resolver-wrapper grafts onto every `ResolvedAgent` it produces.
#[derive(Clone)]
struct CompactionRuntime {
    /// Background-task manager bound to this run's store and owner inbox.
    manager: std::sync::Arc<crate::extensions::background::BackgroundTaskManager>,
    /// Summarizer used by background compaction tasks.
    summarizer: std::sync::Arc<dyn crate::context::ContextSummarizer>,
}
57
58/// Build the per-run compaction wiring when the preflight agent declared
59/// `autocompact_threshold` and no upstream code (builder, custom resolver)
60/// already attached a manager + summarizer.
61///
62/// The manager has its store and owner inbox bound here so background
63/// compaction tasks can commit metadata and deliver completion events.
64/// `BackgroundTaskPlugin`'s state keys are registered on the store; if a
65/// matching plugin is already installed the dup error is treated as a
66/// no-op since the keys are already live.
67fn build_compaction_runtime(
68    preflight_resolved: &crate::registry::ResolvedAgent,
69    store: &crate::state::StateStore,
70    owner_inbox: &crate::inbox::InboxSender,
71) -> Result<Option<CompactionRuntime>, AgentLoopError> {
72    let opts_in = preflight_resolved
73        .context_policy()
74        .and_then(|policy| policy.autocompact_threshold)
75        .is_some();
76    if !opts_in {
77        return Ok(None);
78    }
79    if preflight_resolved.background_manager.is_some()
80        && preflight_resolved.context_summarizer.is_some()
81    {
82        return Ok(None);
83    }
84
85    let manager = std::sync::Arc::new(crate::extensions::background::BackgroundTaskManager::new());
86    manager.set_store(store.clone());
87    manager.set_owner_inbox(owner_inbox.clone());
88
89    match store.install_plugin(crate::extensions::background::BackgroundTaskPlugin::new(
90        manager.clone(),
91    )) {
92        Ok(()) => {}
93        Err(awaken_contract::StateError::PluginAlreadyInstalled { .. }) => {
94            // Keys already registered by an upstream wiring; reuse store as-is.
95        }
96        Err(awaken_contract::StateError::KeyAlreadyRegistered { .. }) => {
97            // A different plugin owns one of the background-task keys; reuse them.
98        }
99        Err(error) => return Err(AgentLoopError::PhaseError(error)),
100    }
101
102    let compaction_config = preflight_resolved
103        .spec
104        .config::<crate::context::CompactionConfigKey>()
105        .unwrap_or_default();
106    let summarizer: std::sync::Arc<dyn crate::context::ContextSummarizer> = std::sync::Arc::new(
107        crate::context::DefaultSummarizer::with_config(compaction_config),
108    );
109
110    Ok(Some(CompactionRuntime {
111        manager,
112        summarizer,
113    }))
114}
115
/// Resolver wrapper that grafts a per-run `BackgroundTaskManager` and
/// `ContextSummarizer` onto every `ResolvedAgent` whose context policy opts
/// in via `autocompact_threshold`. The same `Arc`s are reused across resolve
/// calls so the manager bound during `bind_local_execution_env` is the one
/// used by every subsequent loop step.
struct CompactionResolver<'a> {
    // Underlying resolver being wrapped for the duration of one run.
    inner: &'a dyn crate::registry::ExecutionResolver,
    // Shared per-run manager + summarizer grafted onto resolved agents.
    runtime: CompactionRuntime,
}
125
126impl<'a> CompactionResolver<'a> {
127    fn new(inner: &'a dyn crate::registry::ExecutionResolver, runtime: CompactionRuntime) -> Self {
128        Self { inner, runtime }
129    }
130
131    fn graft(
132        &self,
133        mut resolved: crate::registry::ResolvedAgent,
134    ) -> crate::registry::ResolvedAgent {
135        let opts_in = resolved
136            .context_policy()
137            .and_then(|policy| policy.autocompact_threshold)
138            .is_some();
139        if !opts_in {
140            return resolved;
141        }
142        if resolved.background_manager.is_none() {
143            resolved.background_manager = Some(self.runtime.manager.clone());
144        }
145        if resolved.context_summarizer.is_none() {
146            resolved.context_summarizer = Some(self.runtime.summarizer.clone());
147        }
148        resolved
149    }
150}
151
152impl crate::registry::AgentResolver for CompactionResolver<'_> {
153    fn resolve(
154        &self,
155        agent_id: &str,
156    ) -> Result<crate::registry::ResolvedAgent, crate::RuntimeError> {
157        self.inner
158            .resolve(agent_id)
159            .map(|resolved| self.graft(resolved))
160    }
161
162    fn agent_ids(&self) -> Vec<String> {
163        self.inner.agent_ids()
164    }
165}
166
167impl crate::registry::ExecutionResolver for CompactionResolver<'_> {
168    fn resolve_execution(
169        &self,
170        agent_id: &str,
171    ) -> Result<crate::registry::ResolvedExecution, crate::RuntimeError> {
172        let execution = self.inner.resolve_execution(agent_id)?;
173        Ok(match execution {
174            crate::registry::ResolvedExecution::Local(resolved) => {
175                crate::registry::ResolvedExecution::Local(Box::new(self.graft(*resolved)))
176            }
177            other => other,
178        })
179    }
180}
181
impl AgentRuntime {
    /// Run an agent loop until it returns an [`AgentRunResult`].
    ///
    /// This is a convenience wrapper for one-shot CLI programs and examples
    /// that only need the final [`AgentRunResult`]. Use [`Self::run`] with an
    /// [`EventSink`] when streaming events to SSE, WebSocket, protocol adapters,
    /// or tests.
    pub async fn run_to_completion(
        &self,
        request: RunRequest,
    ) -> Result<AgentRunResult, AgentLoopError> {
        self.run(request, Arc::new(NullEventSink)).await
    }

    /// Run an agent loop. Returns the result when the run completes.
    ///
    /// This is the single production entry point. It:
    /// 1. Resolves the agent from the registry
    /// 2. Loads thread messages from storage (if configured)
    /// 3. Applies resume decisions (if present in request)
    /// 4. Creates a PhaseRuntime and StateStore
    /// 5. Registers the active run
    /// 6. Calls `run_agent_loop` internally
    /// 7. Unregisters the run when complete
    ///
    /// Use `cancel()` / `send_decisions()` on `AgentRuntime` for external
    /// control of in-flight runs.
    pub async fn run(
        &self,
        request: RunRequest,
        sink: Arc<dyn EventSink>,
    ) -> Result<AgentRunResult, AgentLoopError> {
        self.run_inner(request, sink, None).await
    }

    /// Like [`Self::run`], but with a caller-supplied [`ThreadContextSnapshot`]
    /// that is used instead of storage for history / run lookups.
    /// Hidden: internal wiring for callers that pre-load thread context.
    #[doc(hidden)]
    pub async fn run_with_thread_context(
        &self,
        request: RunRequest,
        sink: Arc<dyn EventSink>,
        thread_ctx: Option<ThreadContextSnapshot>,
    ) -> Result<AgentRunResult, AgentLoopError> {
        self.run_inner(request, sink, thread_ctx).await
    }

    /// Shared implementation behind [`Self::run`] and
    /// [`Self::run_with_thread_context`]. When `thread_ctx` is `Some`, thread
    /// history, run records, and persisted state are read from the snapshot
    /// rather than from storage.
    async fn run_inner(
        &self,
        request: RunRequest,
        sink: Arc<dyn EventSink>,
        thread_ctx: Option<ThreadContextSnapshot>,
    ) -> Result<AgentRunResult, AgentLoopError> {
        // Destructure the request up front so each field is moved exactly once.
        let RunRequest {
            messages: request_messages,
            messages_already_persisted,
            thread_id,
            agent_id,
            overrides,
            decisions,
            frontend_tools,
            origin: req_origin,
            run_mode,
            adapter,
            parent_run_id: req_parent_run_id,
            parent_thread_id: req_parent_thread_id,
            continue_run_id,
            run_id_hint,
            dispatch_id_hint,
            dispatch_id,
            session_id,
            transport_request_id,
            run_inbox,
        } = request;
        // Keep a copy of the incoming messages for the backend request's
        // `new_messages` field; `request_messages` itself is merged into the
        // loaded history below.
        let new_messages = request_messages.clone();
        let requested_continue_run_id = continue_run_id.clone();
        // Explicit agent id, else inferred from the thread, else "default".
        let agent_id = self
            .resolve_agent_id(agent_id, &thread_id, &thread_ctx)
            .await?;
        // Prefer a point-in-time registry snapshot when available so this run
        // sees a consistent registry for its whole duration.
        let run_resolver: Arc<dyn ExecutionResolver> =
            if let Some(snapshot) = self.registry_snapshot() {
                Arc::new(crate::registry::resolve::RegistrySetResolver::new(
                    snapshot.into_registries(),
                ))
            } else {
                self.execution_resolver_arc()
            };
        let resolved_execution = run_resolver
            .resolve_execution(&agent_id)
            .map_err(AgentLoopError::RuntimeError)?;
        let capabilities =
            execution_capabilities(&resolved_execution).map_err(local_root_execution_error)?;
        // Pick (or mint) the run id; `is_continuation` is true when resuming
        // an existing run. Waiting-run reuse is only allowed for local runs.
        let (run_id, is_continuation) = self
            .next_root_run_id(
                &thread_id,
                continue_run_id,
                run_id_hint,
                dispatch_id_hint,
                matches!(&resolved_execution, ResolvedExecution::Local(_)),
                &thread_ctx,
            )
            .await?;
        // Map the storage-level request origin onto the identity-level origin
        // (A2A requests are recorded as subagent-originated).
        let run_origin = match req_origin {
            awaken_contract::contract::storage::RunRequestOrigin::User => {
                awaken_contract::contract::identity::RunOrigin::User
            }
            awaken_contract::contract::storage::RunRequestOrigin::A2A => {
                awaken_contract::contract::identity::RunOrigin::Subagent
            }
            awaken_contract::contract::storage::RunRequestOrigin::Internal => {
                awaken_contract::contract::identity::RunOrigin::Internal
            }
        };
        let mut run_identity = RunIdentity::new(
            thread_id.clone(),
            req_parent_thread_id,
            run_id.clone(),
            req_parent_run_id,
            agent_id.clone(),
            run_origin,
        )
        .with_run_mode(run_mode)
        .with_adapter(adapter);
        // Optional trace/correlation ids are attached only when present.
        if let Some(dispatch_id) = dispatch_id {
            run_identity = run_identity.with_dispatch_id(dispatch_id);
        }
        if let Some(session_id) = session_id {
            run_identity = run_identity.with_session_id(session_id);
        }
        if let Some(transport_request_id) = transport_request_id {
            run_identity = run_identity.with_transport_request_id(transport_request_id);
        }

        // Prepare execution-specific state: local runs get a fresh store,
        // phase runtime, and compaction wiring; non-local runs only need
        // message history and the previous run's persisted state.
        let mut run_inbox = run_inbox;
        let mut compaction_runtime: Option<CompactionRuntime> = None;
        let (messages, phase_runtime, inbox, live_inbox_sender, previous_non_local_state) =
            match &resolved_execution {
                ResolvedExecution::Local(preflight_resolved) => {
                    let prepared = self
                        .prepare_local_root_execution(
                            preflight_resolved,
                            &thread_id,
                            request_messages,
                            messages_already_persisted,
                            &decisions,
                            run_inbox.take(),
                            &thread_ctx,
                        )
                        .await?;
                    compaction_runtime = prepared.compaction;
                    (
                        prepared.messages,
                        Some(prepared.phase_runtime),
                        Some(prepared.inbox),
                        Some(prepared.inbox_sender),
                        None,
                    )
                }
                ResolvedExecution::NonLocal(_) => {
                    let live_inbox_sender =
                        run_inbox.as_ref().map(|run_inbox| run_inbox.sender.clone());
                    (
                        self.load_non_local_messages(
                            &thread_id,
                            request_messages,
                            messages_already_persisted,
                            &thread_ctx,
                        )
                        .await?,
                        None,
                        run_inbox.take().map(|run_inbox| run_inbox.receiver),
                        live_inbox_sender,
                        self.load_non_local_state(
                            &thread_id,
                            requested_continue_run_id.as_deref(),
                            &thread_ctx,
                        )
                        .await?,
                    )
                }
            };
        let run_created_at = now_ms();

        let (handle, cancellation_token, raw_decision_rx) = self.create_run_channels_with_inbox(
            run_id.clone(),
            run_identity.trace.dispatch_id.clone(),
            live_inbox_sender,
        );
        let runtime_cancellation_token = cancellation_token.clone();
        // Backends that don't support decisions get the receiver dropped so
        // senders observe a closed channel instead of silently queueing.
        let decision_rx = if capabilities.decisions {
            Some(raw_decision_rx)
        } else {
            drop(raw_decision_rx);
            None
        };

        // Wrap the resolver so every `ResolvedAgent` it produces during this
        // run carries the per-run compaction manager + summarizer when the
        // agent opted in via `autocompact_threshold`. Lifetime is tied to
        // `backend_request`, which is consumed before this scope ends.
        let compaction_resolver = compaction_runtime
            .clone()
            .map(|runtime| CompactionResolver::new(run_resolver.as_ref(), runtime));
        let resolver_for_backend: &dyn ExecutionResolver = match compaction_resolver.as_ref() {
            Some(wrapper) => wrapper,
            None => run_resolver.as_ref(),
        };

        let backend_request = BackendRootRunRequest {
            agent_id: &agent_id,
            messages,
            new_messages,
            sink: sink.clone(),
            resolver: resolver_for_backend,
            run_identity: run_identity.clone(),
            // Local runs only checkpoint when a phase runtime exists;
            // non-local runs checkpoint whenever storage is configured.
            checkpoint_store: match &resolved_execution {
                ResolvedExecution::Local(_) => phase_runtime.as_ref().and(self.storage.as_deref()),
                ResolvedExecution::NonLocal(_) => self.storage.as_deref(),
            },
            control: BackendControl {
                // Only hand the token to backends that honour cooperative
                // cancellation; others are cancelled via the runtime token.
                cancellation_token: capabilities
                    .cancellation
                    .supports_cooperative_token()
                    .then_some(cancellation_token),
                decision_rx,
            },
            decisions,
            overrides,
            frontend_tools,
            local: phase_runtime
                .as_ref()
                .map(|phase_runtime| BackendLocalRootContext { phase_runtime }),
            inbox,
            is_continuation,
        };
        // Fail fast on requests the chosen backend cannot execute.
        validate_root_execution_request(&resolved_execution, &backend_request).map_err(
            |error| match error {
                ExecutionBackendError::Loop(loop_error) => loop_error,
                other => AgentLoopError::RuntimeError(crate::RuntimeError::ResolveFailed {
                    message: other.to_string(),
                }),
            },
        )?;

        // Register active run (guard ensures cleanup on drop/panic/cancellation)
        self.register_run(&thread_id, handle)
            .map_err(AgentLoopError::RuntimeError)?;
        let _guard = RunSlotGuard {
            runtime: self,
            run_id: run_id.clone(),
        };

        match &resolved_execution {
            ResolvedExecution::Local(_) => {
                let result = LocalBackend::new()
                    .execute_root_with_thread_context(backend_request, thread_ctx)
                    .await
                    .map_err(local_root_execution_error)?;
                Ok(AgentRunResult {
                    run_id: run_id.clone(),
                    response: result.response.unwrap_or_default(),
                    termination: result.termination,
                    steps: result.steps,
                })
            }
            ResolvedExecution::NonLocal(non_local) => {
                execute_remote_root_lifecycle(
                    non_local,
                    backend_request,
                    run_created_at,
                    runtime_cancellation_token,
                    previous_non_local_state,
                )
                .await
            }
        }
    }

    /// Build the per-run environment for a local root execution: a fresh
    /// `StateStore` + `PhaseRuntime`, the run inbox, compaction wiring, and
    /// the message history (thread state restored, superseded/unpaired tool
    /// calls stripped, new request messages appended).
    #[allow(clippy::too_many_arguments)]
    async fn prepare_local_root_execution(
        &self,
        preflight_resolved: &crate::registry::ResolvedAgent,
        thread_id: &str,
        request_messages: Vec<Message>,
        messages_already_persisted: bool,
        decisions: &[(
            String,
            awaken_contract::contract::suspension::ToolCallResume,
        )],
        run_inbox: Option<super::run_request::RunInbox>,
        thread_ctx: &Option<ThreadContextSnapshot>,
    ) -> Result<PreparedLocalRootExecution, AgentLoopError> {
        let store = crate::state::StateStore::new();
        let phase_runtime =
            crate::phase::PhaseRuntime::new(store.clone()).map_err(AgentLoopError::PhaseError)?;
        store
            .install_plugin(crate::loop_runner::LoopStatePlugin)
            .map_err(AgentLoopError::PhaseError)?;
        // Reuse the caller-supplied inbox when present; otherwise create one.
        let run_inbox = run_inbox.unwrap_or_else(|| {
            let (sender, receiver) = crate::inbox::inbox_channel();
            super::run_request::RunInbox { sender, receiver }
        });
        let owner_inbox = run_inbox.sender.clone();
        crate::backend::LocalBackend::bind_local_execution_env(
            &store,
            preflight_resolved,
            Some(&owner_inbox),
        )
        .map_err(AgentLoopError::PhaseError)?;

        let compaction = build_compaction_runtime(preflight_resolved, &store, &owner_inbox)?;

        // Load history and restore the previous run's thread-scoped state,
        // preferring the snapshot over storage when one was provided.
        let mut messages = if let Some(ctx) = thread_ctx {
            if let Some(ref prev_run) = ctx.latest_run
                && let Some(ref persisted) = prev_run.state
            {
                store
                    .restore_thread_scoped(
                        persisted.clone(),
                        awaken_contract::UnknownKeyPolicy::Skip,
                    )
                    .map_err(AgentLoopError::PhaseError)?;
            }
            ctx.messages.clone()
        } else if let Some(ref ts) = self.storage {
            if let Some(prev_run) = ts
                .latest_run(thread_id)
                .await
                .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
                && let Some(persisted) = prev_run.state
            {
                store
                    .restore_thread_scoped(persisted, awaken_contract::UnknownKeyPolicy::Skip)
                    .map_err(AgentLoopError::PhaseError)?;
            }
            ts.load_messages(thread_id)
                .await
                .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
                .unwrap_or_default()
        } else {
            vec![]
        };
        // A plain user message with no resume decisions supersedes any
        // suspended tool calls left over from the previous run.
        if should_supersede_suspended_calls(&request_messages, decisions) {
            strip_superseded_suspended_tool_calls(&mut messages, &store);
        }
        strip_unpaired_tool_calls(&mut messages);
        if !messages_already_persisted {
            messages.extend(request_messages);
        }

        Ok(PreparedLocalRootExecution {
            messages,
            phase_runtime,
            inbox: run_inbox.receiver,
            inbox_sender: owner_inbox,
            compaction,
        })
    }

    /// Load the message history for a non-local run (snapshot, then storage,
    /// then empty), strip unpaired tool calls, and append the new request
    /// messages unless the caller already persisted them.
    async fn load_non_local_messages(
        &self,
        thread_id: &str,
        request_messages: Vec<Message>,
        messages_already_persisted: bool,
        thread_ctx: &Option<ThreadContextSnapshot>,
    ) -> Result<Vec<Message>, AgentLoopError> {
        let mut messages = if let Some(ctx) = thread_ctx {
            ctx.messages.clone()
        } else if let Some(ref storage) = self.storage {
            storage
                .load_messages(thread_id)
                .await
                .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
                .unwrap_or_default()
        } else {
            Vec::new()
        };
        strip_unpaired_tool_calls(&mut messages);
        if !messages_already_persisted {
            messages.extend(request_messages);
        }
        Ok(messages)
    }

    /// Choose the run id for a root run; the bool is `true` when the id
    /// continues an existing run. Precedence: explicit `continue_run_id`
    /// (must reference an existing run), then `run_id_hint` (must not collide
    /// with a started run), then `dispatch_id_hint` (must not collide at
    /// all), then — when `allow_waiting_reuse` — a resumable waiting run on
    /// the thread, and finally a fresh UUIDv7.
    async fn next_root_run_id(
        &self,
        thread_id: &str,
        continue_run_id: Option<String>,
        run_id_hint: Option<String>,
        dispatch_id_hint: Option<String>,
        allow_waiting_reuse: bool,
        thread_ctx: &Option<ThreadContextSnapshot>,
    ) -> Result<(String, bool), AgentLoopError> {
        if let Some(run_id) = continue_run_id {
            // Check cache first for continue_run_id.
            if let Some(ctx) = thread_ctx
                && ctx.run_cache.contains_key(&run_id)
            {
                return Ok((run_id, true));
            }
            let Some(ref ts) = self.storage else {
                return Err(AgentLoopError::InvalidResume(format!(
                    "continue_run_id '{run_id}' requires run storage"
                )));
            };
            if ts
                .load_run(&run_id)
                .await
                .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
                .is_some()
            {
                return Ok((run_id, true));
            }
            return Err(AgentLoopError::InvalidResume(format!(
                "continue_run_id '{run_id}' does not reference an existing run"
            )));
        }
        if let Some(run_id) = run_id_hint.and_then(|id| {
            let trimmed = id.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        }) {
            // Check cache first, then store.
            let existing = if let Some(ctx) = thread_ctx {
                ctx.run_cache.get(&run_id).cloned()
            } else {
                None
            };
            let existing = if existing.is_some() {
                existing
            } else if let Some(ref ts) = self.storage {
                ts.load_run(&run_id)
                    .await
                    .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
            } else {
                None
            };
            if let Some(existing) = existing {
                // A pre-created (never started) run record may adopt the
                // hinted id; anything further along is a conflict.
                if existing.status == awaken_contract::contract::lifecycle::RunStatus::Created {
                    return Ok((run_id, false));
                }
                return Err(AgentLoopError::InvalidResume(format!(
                    "run_id_hint '{run_id}' already exists as a run"
                )));
            }
            return Ok((run_id, false));
        }
        if let Some(run_id) = dispatch_id_hint.and_then(|id| {
            let trimmed = id.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        }) {
            if let Some(ref ts) = self.storage
                && ts
                    .load_run(&run_id)
                    .await
                    .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
                    .is_some()
            {
                return Err(AgentLoopError::InvalidResume(format!(
                    "dispatch_id_hint '{run_id}' already exists as a run"
                )));
            }
            return Ok((run_id, false));
        }
        if allow_waiting_reuse {
            if let Some(ctx) = thread_ctx {
                if let Some(run) = ctx.latest_run.as_ref().filter(|r| r.is_resumable_waiting()) {
                    return Ok((run.run_id.clone(), true));
                }
            } else if let Some(prev) = self.reusable_waiting_run(thread_id).await? {
                return Ok((prev.run_id.clone(), true));
            }
        }
        Ok((uuid::Uuid::now_v7().to_string(), false))
    }

    /// Find a resumable waiting run on this thread: prefer the run named by
    /// the thread record's `open_run_id` (validated to belong to the thread),
    /// falling back to the thread's latest run.
    async fn reusable_waiting_run(
        &self,
        thread_id: &str,
    ) -> Result<Option<RunRecord>, AgentLoopError> {
        let Some(ref ts) = self.storage else {
            return Ok(None);
        };

        if let Some(thread) = ts
            .load_thread(thread_id)
            .await
            .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
            && let Some(open_run_id) = thread.open_run_id.as_deref()
            && let Some(run) = ts
                .load_run(open_run_id)
                .await
                .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
            && run.thread_id == thread_id
            && run.is_resumable_waiting()
        {
            return Ok(Some(run));
        }

        Ok(ts
            .latest_run(thread_id)
            .await
            .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
            .filter(RunRecord::is_resumable_waiting))
    }

    /// Determine which agent to run: the explicitly requested id, else the
    /// agent inferred from the thread's previous run, else
    /// [`DEFAULT_AGENT_ID`].
    async fn resolve_agent_id(
        &self,
        requested_agent_id: Option<String>,
        thread_id: &str,
        thread_ctx: &Option<ThreadContextSnapshot>,
    ) -> Result<String, AgentLoopError> {
        if let Some(agent_id) = requested_agent_id {
            return Ok(agent_id);
        }

        if let Some(inferred) = self
            .infer_agent_id_from_thread(thread_id, thread_ctx)
            .await?
        {
            return Ok(inferred);
        }

        Ok(DEFAULT_AGENT_ID.to_string())
    }

    /// Infer the agent id from the thread's latest run: prefer the active
    /// agent recorded in persisted state, then the run record's own trimmed
    /// non-empty `agent_id`. Reads the snapshot when provided, otherwise
    /// storage; `Ok(None)` when neither yields an id.
    async fn infer_agent_id_from_thread(
        &self,
        thread_id: &str,
        thread_ctx: &Option<ThreadContextSnapshot>,
    ) -> Result<Option<String>, AgentLoopError> {
        if let Some(ctx) = thread_ctx {
            if let Some(ref prev_run) = ctx.latest_run {
                if let Some(agent_id) = prev_run.state.as_ref().and_then(active_agent_from_state) {
                    return Ok(Some(agent_id));
                }
                let agent_id = prev_run.agent_id.trim();
                if !agent_id.is_empty() {
                    return Ok(Some(agent_id.to_string()));
                }
            }
            return Ok(None);
        }

        let Some(storage) = &self.storage else {
            return Ok(None);
        };

        let Some(prev_run) = storage
            .latest_run(thread_id)
            .await
            .map_err(|e| AgentLoopError::StorageError(e.to_string()))?
        else {
            return Ok(None);
        };

        if let Some(agent_id) = prev_run.state.as_ref().and_then(active_agent_from_state) {
            return Ok(Some(agent_id));
        }

        let agent_id = prev_run.agent_id.trim();
        if agent_id.is_empty() {
            Ok(None)
        } else {
            Ok(Some(agent_id.to_string()))
        }
    }

    /// Load the persisted state a non-local run should resume from: the state
    /// of `continue_run_id` when given, otherwise the latest run's state.
    /// Snapshot takes precedence over storage; `Ok(None)` without either.
    async fn load_non_local_state(
        &self,
        thread_id: &str,
        continue_run_id: Option<&str>,
        thread_ctx: &Option<ThreadContextSnapshot>,
    ) -> Result<Option<PersistedState>, AgentLoopError> {
        if let Some(ctx) = thread_ctx {
            if let Some(run_id) = continue_run_id {
                return Ok(ctx.run_cache.get(run_id).and_then(|r| r.state.clone()));
            }
            return Ok(ctx.latest_run.as_ref().and_then(|r| r.state.clone()));
        }

        let Some(storage) = &self.storage else {
            return Ok(None);
        };

        if let Some(run_id) = continue_run_id {
            return Ok(storage
                .load_run(run_id)
                .await
                .map_err(|error| AgentLoopError::StorageError(error.to_string()))?
                .and_then(|run| run.state));
        }

        Ok(storage
            .latest_run(thread_id)
            .await
            .map_err(|error| AgentLoopError::StorageError(error.to_string()))?
            .and_then(|run| run.state))
    }
}
781
782fn local_root_execution_error(error: ExecutionBackendError) -> AgentLoopError {
783    match error {
784        ExecutionBackendError::Loop(loop_error) => loop_error,
785        other => AgentLoopError::RuntimeError(crate::RuntimeError::ResolveFailed {
786            message: other.to_string(),
787        }),
788    }
789}
790
791fn active_agent_from_state(state: &PersistedState) -> Option<String> {
792    state
793        .extensions
794        .get(<ActiveAgentIdKey as awaken_contract::StateKey>::KEY)
795        .and_then(|value| value.as_str())
796        .map(str::trim)
797        .filter(|v| !v.is_empty())
798        .map(ToOwned::to_owned)
799}
800
801/// Remove unpaired tool calls from message history.
802///
803/// When a run is cancelled while tool calls are pending, the history may
804/// contain assistant messages with `tool_calls` that have no matching
805/// `Tool` role response. These "orphaned" calls confuse LLMs on the next
806/// turn. This function strips unanswered calls from all assistant messages.
807fn strip_unpaired_tool_calls(messages: &mut Vec<Message>) {
808    use std::collections::HashSet;
809
810    // Collect all tool call IDs that have a Tool-role response.
811    let answered: HashSet<String> = messages
812        .iter()
813        .filter(|m| m.role == Role::Tool)
814        .filter_map(|m| m.tool_call_id.clone())
815        .collect();
816
817    // Strip unanswered tool calls from all assistant messages.
818    for msg in messages.iter_mut() {
819        if msg.role != Role::Assistant {
820            continue;
821        }
822        if let Some(ref mut calls) = msg.tool_calls {
823            calls.retain(|c| answered.contains(&c.id));
824            if calls.is_empty() {
825                msg.tool_calls = None;
826            }
827        }
828    }
829
830    // Remove trailing empty assistant messages (no text, no tool calls).
831    while let Some(last) = messages.last() {
832        if last.role == Role::Assistant
833            && last.tool_calls.is_none()
834            && last.text().trim().is_empty()
835        {
836            messages.pop();
837        } else {
838            break;
839        }
840    }
841}
842
843fn should_supersede_suspended_calls(
844    request_messages: &[Message],
845    decisions: &[(
846        String,
847        awaken_contract::contract::suspension::ToolCallResume,
848    )],
849) -> bool {
850    decisions.is_empty()
851        && request_messages
852            .iter()
853            .any(|message| message.role == Role::User && message.visibility == Visibility::All)
854}
855
856fn strip_superseded_suspended_tool_calls(
857    messages: &mut Vec<Message>,
858    store: &crate::state::StateStore,
859) {
860    use std::collections::HashSet;
861
862    let suspended_ids: HashSet<String> = store
863        .read::<crate::agent::state::ToolCallStates>()
864        .unwrap_or_default()
865        .calls
866        .into_iter()
867        .filter_map(|(call_id, state)| {
868            (state.status == ToolCallStatus::Suspended).then_some(call_id)
869        })
870        .collect();
871    if suspended_ids.is_empty() {
872        return;
873    }
874
875    for message in messages.iter_mut() {
876        if message.role != Role::Assistant {
877            continue;
878        }
879        if let Some(ref mut calls) = message.tool_calls {
880            calls.retain(|call| !suspended_ids.contains(&call.id));
881            if calls.is_empty() {
882                message.tool_calls = None;
883            }
884        }
885    }
886
887    messages.retain(|message| {
888        !(message.role == Role::Tool
889            && message
890                .tool_call_id
891                .as_ref()
892                .is_some_and(|call_id| suspended_ids.contains(call_id)))
893    });
894
895    while let Some(last) = messages.last() {
896        if last.role == Role::Assistant
897            && last.tool_calls.is_none()
898            && last.text().trim().is_empty()
899        {
900            messages.pop();
901        } else {
902            break;
903        }
904    }
905}
906
907#[cfg(test)]
908mod tests {
909    use super::super::*;
910    #[cfg(feature = "a2a")]
911    use crate::extensions::a2a::{
912        AgentBackend, AgentBackendError, AgentBackendFactory, AgentBackendFactoryError,
913        DelegateRunResult, DelegateRunStatus,
914    };
915    use crate::loop_runner::build_agent_env;
916    use crate::plugins::{Plugin, PluginDescriptor, PluginRegistrar};
917    #[cfg(feature = "a2a")]
918    use crate::registry::memory::{
919        MapAgentSpecRegistry, MapBackendRegistry, MapModelRegistry, MapPluginSource,
920        MapProviderRegistry, MapToolRegistry,
921    };
922    #[cfg(feature = "a2a")]
923    use crate::registry::snapshot::RegistryHandle;
924    #[cfg(feature = "a2a")]
925    use crate::registry::traits::{BackendRegistry, ModelBinding, RegistrySet};
926    use crate::registry::{AgentResolver, ResolvedAgent};
927    use crate::state::{KeyScope, StateCommand, StateKey, StateKeyOptions};
928    use crate::{PhaseContext, PhaseHook, ToolPolicyHook};
929    use async_trait::async_trait;
930    use awaken_contract::PersistedState;
931    use awaken_contract::contract::active_agent::ActiveAgentIdKey;
932    use awaken_contract::contract::content::ContentBlock;
933    use awaken_contract::contract::event::AgentEvent;
934    use awaken_contract::contract::event_sink::{EventSink, NullEventSink, VecEventSink};
935    use awaken_contract::contract::executor::{
936        InferenceExecutionError, InferenceRequest, LlmExecutor,
937    };
938    use awaken_contract::contract::inference::{InferenceOverride, StopReason, StreamResult};
939    use awaken_contract::contract::lifecycle::{RunStatus, TerminationReason};
940    use awaken_contract::contract::message::Message;
941    use awaken_contract::contract::storage::{
942        RunQuery, RunRecord, RunStore, RunWaitingState, ThreadRunStore, ThreadStore, WaitingReason,
943    };
944    use awaken_contract::contract::suspension::ResumeDecisionAction;
945    use awaken_contract::contract::suspension::ToolCallResume;
946    use awaken_contract::contract::tool::{
947        Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
948    };
949    use awaken_contract::contract::tool_intercept::{
950        AdapterKind, RunMode, ToolPolicyContext, ToolPolicyDecision,
951    };
952    #[cfg(feature = "a2a")]
953    use awaken_contract::registry_spec::{AgentSpec, RemoteEndpoint};
954    use awaken_stores::InMemoryStore;
955    use serde_json::{Value, json};
956    use std::collections::HashMap;
957    use std::sync::atomic::{AtomicUsize, Ordering};
958    use std::sync::{Arc, Mutex};
959
    /// Scripted test LLM: replays a queue of canned `StreamResult`s and
    /// records the inference overrides it was called with.
    struct ScriptedLlm {
        // Pending scripted responses, consumed front-to-back per call.
        responses: Mutex<Vec<StreamResult>>,
        // Overrides captured from each `execute` request, in call order.
        seen_overrides: Mutex<Vec<Option<InferenceOverride>>>,
    }
964
965    impl ScriptedLlm {
966        fn new(responses: Vec<StreamResult>) -> Self {
967            Self {
968                responses: Mutex::new(responses),
969                seen_overrides: Mutex::new(Vec::new()),
970            }
971        }
972    }
973
974    #[async_trait]
975    impl LlmExecutor for ScriptedLlm {
976        async fn execute(
977            &self,
978            request: InferenceRequest,
979        ) -> Result<StreamResult, InferenceExecutionError> {
980            self.seen_overrides
981                .lock()
982                .expect("lock poisoned")
983                .push(request.overrides.clone());
984            let mut responses = self.responses.lock().expect("lock poisoned");
985            if responses.is_empty() {
986                Ok(StreamResult {
987                    content: vec![ContentBlock::text("done")],
988                    tool_calls: vec![],
989                    usage: None,
990                    stop_reason: Some(StopReason::EndTurn),
991                    has_incomplete_tool_calls: false,
992                })
993            } else {
994                Ok(responses.remove(0))
995            }
996        }
997
998        fn name(&self) -> &str {
999            "scripted"
1000        }
1001    }
1002
    /// Fake remote backend whose behavior is fixed at construction time;
    /// used to exercise the remote-root execution paths.
    #[cfg(feature = "a2a")]
    struct StaticRemoteBackend {
        // Text returned from `execute_root` as the remote response.
        response: String,
        // Artificial latency before responding; 0 disables the sleep.
        delay_ms: u64,
        // Whether the backend advertises remote-abort cancellation.
        cancellation: bool,
        // Whether the backend advertises remote-state continuation.
        continuation: bool,
        // Shared counter incremented every time `abort` is invoked.
        abort_count: Arc<AtomicUsize>,
        // Termination reason reported for the simulated run.
        termination: TerminationReason,
        // Optional status reason attached to the run result.
        status_reason: Option<String>,
    }
1013
1014    #[cfg(feature = "a2a")]
1015    #[async_trait]
1016    impl AgentBackend for StaticRemoteBackend {
1017        fn capabilities(&self) -> crate::backend::BackendCapabilities {
1018            crate::backend::BackendCapabilities {
1019                cancellation: if self.cancellation {
1020                    crate::backend::BackendCancellationCapability::RemoteAbort
1021                } else {
1022                    crate::backend::BackendCancellationCapability::None
1023                },
1024                decisions: false,
1025                overrides: false,
1026                frontend_tools: false,
1027                continuation: if self.continuation {
1028                    crate::backend::BackendContinuationCapability::RemoteState
1029                } else {
1030                    crate::backend::BackendContinuationCapability::None
1031                },
1032                waits: crate::backend::BackendWaitCapability::None,
1033                transcript: crate::backend::BackendTranscriptCapability::SinglePrompt,
1034                output: crate::backend::BackendOutputCapability::Text,
1035            }
1036        }
1037
1038        async fn abort(
1039            &self,
1040            _request: crate::backend::BackendAbortRequest<'_>,
1041        ) -> Result<(), AgentBackendError> {
1042            self.abort_count.fetch_add(1, Ordering::SeqCst);
1043            Ok(())
1044        }
1045
1046        async fn execute_root(
1047            &self,
1048            request: crate::backend::BackendRootRunRequest<'_>,
1049        ) -> Result<DelegateRunResult, AgentBackendError> {
1050            if self.delay_ms > 0 {
1051                tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
1052            }
1053            Ok(DelegateRunResult {
1054                agent_id: request.agent_id.to_string(),
1055                status: match &self.termination {
1056                    TerminationReason::Cancelled => DelegateRunStatus::Cancelled,
1057                    TerminationReason::Error(message) => DelegateRunStatus::Failed(message.clone()),
1058                    _ => DelegateRunStatus::Completed,
1059                },
1060                termination: self.termination.clone(),
1061                status_reason: self.status_reason.clone(),
1062                response: Some(self.response.clone()),
1063                output: crate::backend::BackendRunOutput::from_text(Some(self.response.clone())),
1064                steps: 1,
1065                run_id: Some("child-remote-run".into()),
1066                inbox: None,
1067                state: None,
1068            })
1069        }
1070    }
1071
    /// Factory producing `StaticRemoteBackend`s for the "test-remote" backend.
    #[cfg(feature = "a2a")]
    struct StaticRemoteBackendFactory {
        // Abort counter threaded into every backend this factory builds.
        abort_count: Arc<AtomicUsize>,
    }
1076
1077    #[cfg(feature = "a2a")]
1078    impl AgentBackendFactory for StaticRemoteBackendFactory {
1079        fn backend(&self) -> &str {
1080            "test-remote"
1081        }
1082
1083        fn build(
1084            &self,
1085            endpoint: &RemoteEndpoint,
1086        ) -> Result<Arc<dyn AgentBackend>, AgentBackendFactoryError> {
1087            if endpoint.backend != "test-remote" {
1088                return Err(AgentBackendFactoryError::InvalidConfig(format!(
1089                    "unexpected backend '{}'",
1090                    endpoint.backend
1091                )));
1092            }
1093            let delay_ms = endpoint
1094                .options
1095                .get("delay_ms")
1096                .and_then(serde_json::Value::as_u64)
1097                .unwrap_or(0);
1098            let cancellation = endpoint
1099                .options
1100                .get("supports_cancellation")
1101                .and_then(serde_json::Value::as_bool)
1102                .unwrap_or(true);
1103            let continuation = endpoint
1104                .options
1105                .get("supports_continuation")
1106                .and_then(serde_json::Value::as_bool)
1107                .unwrap_or(false);
1108            let termination = match endpoint.options.get("termination").and_then(|v| v.as_str()) {
1109                Some("suspended") => TerminationReason::Suspended,
1110                Some("cancelled") => TerminationReason::Cancelled,
1111                Some("error") => TerminationReason::Error("remote root error".into()),
1112                _ => TerminationReason::NaturalEnd,
1113            };
1114            let status_reason = endpoint
1115                .options
1116                .get("status_reason")
1117                .and_then(serde_json::Value::as_str)
1118                .map(ToOwned::to_owned);
1119            Ok(Arc::new(StaticRemoteBackend {
1120                response: "remote root response".into(),
1121                delay_ms,
1122                cancellation,
1123                continuation,
1124                abort_count: self.abort_count.clone(),
1125                termination,
1126                status_reason,
1127            }))
1128        }
1129    }
1130
1131    #[cfg(feature = "a2a")]
1132    fn build_remote_runtime(
1133        endpoint: RemoteEndpoint,
1134        abort_count: Arc<AtomicUsize>,
1135    ) -> AgentRuntime {
1136        let mut models = MapModelRegistry::new();
1137        models
1138            .register_model(
1139                "test-model",
1140                ModelBinding {
1141                    provider_id: "mock".into(),
1142                    upstream_model: "mock-model".into(),
1143                },
1144            )
1145            .unwrap();
1146
1147        let mut providers = MapProviderRegistry::new();
1148        providers
1149            .register_provider("mock", Arc::new(ScriptedLlm::new(Vec::new())))
1150            .unwrap();
1151
1152        let mut agents = MapAgentSpecRegistry::new();
1153        agents
1154            .register_spec(
1155                AgentSpec::new("remote-root")
1156                    .with_model_id("test-model")
1157                    .with_system_prompt("remote root")
1158                    .with_endpoint(endpoint),
1159            )
1160            .unwrap();
1161
1162        let mut backends = MapBackendRegistry::new();
1163        backends
1164            .register_backend_factory(Arc::new(StaticRemoteBackendFactory { abort_count }))
1165            .unwrap();
1166
1167        let registries = RegistrySet {
1168            agents: Arc::new(agents),
1169            tools: Arc::new(MapToolRegistry::new()),
1170            models: Arc::new(models),
1171            providers: Arc::new(providers),
1172            plugins: Arc::new(MapPluginSource::new()),
1173            backends: Arc::new(backends) as Arc<dyn BackendRegistry>,
1174        };
1175        let handle = RegistryHandle::new(registries.clone());
1176        AgentRuntime::new(Arc::new(
1177            crate::registry::resolve::DynamicRegistryResolver::new(handle.clone()),
1178        ))
1179        .with_registry_handle(handle)
1180        .with_thread_run_store(Arc::new(InMemoryStore::new()))
1181    }
1182
    /// Happy path: a root agent whose spec carries a remote endpoint executes
    /// via the remote backend, streams its text as events, and persists both
    /// the run record and the transcript.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_supports_endpoint_root_agents() {
        let runtime = build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                ..Default::default()
            },
            Arc::new(AtomicUsize::new(0)),
        );

        let sink = Arc::new(VecEventSink::new());
        let result = runtime
            .run(
                RunRequest::new("remote-thread", vec![Message::user("hello")])
                    .with_agent_id("remote-root"),
                sink.clone(),
            )
            .await
            .expect("endpoint root run should succeed");

        // The canned backend response must surface as the run result.
        assert_eq!(result.response, "remote root response");
        assert!(matches!(result.termination, TerminationReason::NaturalEnd));

        // Event stream: RunStart first, then the remote text, then RunFinish.
        let events = sink.events();
        assert!(matches!(events.first(), Some(AgentEvent::RunStart { .. })));
        assert!(events.iter().any(|event| matches!(
            event,
            AgentEvent::TextDelta { delta } if delta == "remote root response"
        )));
        assert!(events.iter().any(|event| matches!(
            event,
            AgentEvent::RunFinish {
                termination: TerminationReason::NaturalEnd,
                ..
            }
        )));

        // The run record is persisted under the remote root agent as Done.
        let latest_run = runtime
            .thread_run_store()
            .expect("store")
            .latest_run("remote-thread")
            .await
            .expect("run lookup should succeed")
            .expect("run record should be persisted");
        assert_eq!(latest_run.agent_id, "remote-root");
        assert_eq!(latest_run.status, RunStatus::Done);

        // The assistant reply must land in the persisted transcript too.
        let messages = runtime
            .thread_run_store()
            .expect("store")
            .load_messages("remote-thread")
            .await
            .expect("message lookup should succeed")
            .expect("messages should be persisted");
        assert!(messages.iter().any(|message| {
            message.role == awaken_contract::contract::message::Role::Assistant
                && message.text() == "remote root response"
        }));
    }
1244
    /// A remote backend that suspends with status_reason "input_required"
    /// must leave the run persisted as Waiting with a UserInput reason, and
    /// the RunFinish event must carry the status_reason through.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_persists_non_local_waiting_reason_from_backend() {
        let runtime = build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                options: std::collections::BTreeMap::from([
                    ("termination".into(), json!("suspended")),
                    ("status_reason".into(), json!("input_required")),
                ]),
                ..Default::default()
            },
            Arc::new(AtomicUsize::new(0)),
        );

        let sink = Arc::new(VecEventSink::new());
        let result = runtime
            .run(
                RunRequest::new("remote-thread-waiting", vec![Message::user("hello")])
                    .with_agent_id("remote-root"),
                sink.clone(),
            )
            .await
            .expect("endpoint root run should suspend cleanly");

        assert_eq!(result.termination, TerminationReason::Suspended);

        // The persisted run must be Waiting on user input, not Done/Failed.
        let latest_run = runtime
            .thread_run_store()
            .expect("store")
            .latest_run("remote-thread-waiting")
            .await
            .expect("run lookup should succeed")
            .expect("run record should be persisted");
        assert_eq!(latest_run.status, RunStatus::Waiting);
        assert_eq!(latest_run.waiting_reason(), Some(WaitingReason::UserInput));

        // RunFinish carries the backend's status_reason in its result payload.
        let events = sink.events();
        assert!(events.iter().any(|event| matches!(
            event,
            AgentEvent::RunFinish {
                termination: TerminationReason::Suspended,
                result: Some(result),
                ..
            } if result["status_reason"].as_str() == Some("input_required")
        )));
    }
1293
    /// Inference overrides must be rejected up front when the remote backend
    /// does not advertise the `overrides` capability.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_rejects_remote_overrides_without_backend_capability() {
        let runtime = build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                ..Default::default()
            },
            Arc::new(AtomicUsize::new(0)),
        );

        let error = runtime
            .run(
                RunRequest::new("remote-thread-overrides", vec![Message::user("hello")])
                    .with_agent_id("remote-root")
                    .with_overrides(InferenceOverride {
                        temperature: Some(0.2),
                        ..Default::default()
                    }),
                Arc::new(VecEventSink::new()),
            )
            .await
            .expect_err("remote backend should reject overrides");

        assert!(error.to_string().contains("does not support: overrides"));
    }
1321
    /// Cancelling a run on a backend WITHOUT remote-abort capability must
    /// still terminate the run as Cancelled, but never invoke the backend's
    /// abort hook.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_allows_non_local_root_backends_without_cancellation_capability() {
        let abort_count = Arc::new(AtomicUsize::new(0));
        // Long delay keeps the remote run in flight while we cancel it.
        let runtime = Arc::new(build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                options: std::collections::BTreeMap::from([
                    ("delay_ms".into(), json!(5_000_u64)),
                    ("supports_cancellation".into(), json!(false)),
                ]),
                ..Default::default()
            },
            abort_count.clone(),
        ));

        let run_handle = {
            let runtime = runtime.clone();
            tokio::spawn(async move {
                runtime
                    .run(
                        RunRequest::new("remote-thread-cancel", vec![Message::user("hello")])
                            .with_agent_id("remote-root"),
                        Arc::new(VecEventSink::new()),
                    )
                    .await
            })
        };

        // Poll until the run is registered and accepts the cancel request.
        let mut cancelled = false;
        for _ in 0..20 {
            if runtime.cancel("remote-thread-cancel") {
                cancelled = true;
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        assert!(cancelled);

        let result = run_handle
            .await
            .expect("task should join")
            .expect("cancelled run should still return a result");
        assert!(matches!(result.termination, TerminationReason::Cancelled));
        // No remote-abort capability => the abort hook must never fire.
        assert_eq!(abort_count.load(Ordering::SeqCst), 0);
    }
1369
    /// Cancelling a run on a backend WITH remote-abort capability (the
    /// factory default) must invoke the backend's abort hook exactly once.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_non_local_root_cancel_invokes_backend_abort_hook() {
        let abort_count = Arc::new(AtomicUsize::new(0));
        // Long delay keeps the remote run in flight while we cancel it.
        let runtime = Arc::new(build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                options: std::collections::BTreeMap::from([("delay_ms".into(), json!(5_000_u64))]),
                ..Default::default()
            },
            abort_count.clone(),
        ));

        let run_handle = {
            let runtime = runtime.clone();
            tokio::spawn(async move {
                runtime
                    .run(
                        RunRequest::new("remote-thread-abort", vec![Message::user("hello")])
                            .with_agent_id("remote-root"),
                        Arc::new(VecEventSink::new()),
                    )
                    .await
            })
        };

        // Poll until the run is registered and accepts the cancel request.
        let mut cancelled = false;
        for _ in 0..20 {
            if runtime.cancel("remote-thread-abort") {
                cancelled = true;
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        assert!(cancelled);
        let _ = run_handle.await.expect("task should join");

        assert_eq!(abort_count.load(Ordering::SeqCst), 1);
    }
1410
    /// Resume decisions must be rejected up front when the remote backend
    /// does not advertise the `decisions` capability.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_rejects_remote_resume_decisions_without_backend_capability() {
        let runtime = build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                ..Default::default()
            },
            Arc::new(AtomicUsize::new(0)),
        );

        let error = runtime
            .run(
                RunRequest::new("remote-thread-decisions", vec![Message::user("hello")])
                    .with_agent_id("remote-root")
                    .with_decisions(vec![(
                        "call-1".into(),
                        ToolCallResume {
                            decision_id: "d1".into(),
                            action: ResumeDecisionAction::Resume,
                            result: Value::Null,
                            reason: None,
                            updated_at: 1,
                        },
                    )]),
                Arc::new(VecEventSink::new()),
            )
            .await
            .expect_err("remote backend should reject resume decisions");

        assert!(error.to_string().contains("does not support: decisions"));
    }
1444
    /// Frontend tool descriptors must be rejected up front when the remote
    /// backend does not advertise the `frontend_tools` capability.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn run_rejects_remote_frontend_tools_without_backend_capability() {
        let runtime = build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                ..Default::default()
            },
            Arc::new(AtomicUsize::new(0)),
        );

        let error = runtime
            .run(
                RunRequest::new("remote-thread-frontend", vec![Message::user("hello")])
                    .with_agent_id("remote-root")
                    .with_frontend_tools(vec![ToolDescriptor::new(
                        "browser",
                        "browser",
                        "frontend tool",
                    )]),
                Arc::new(VecEventSink::new()),
            )
            .await
            .expect_err("remote backend should reject frontend tools");

        assert!(
            error
                .to_string()
                .contains("does not support: frontend_tools")
        );
    }
1477
1478    #[tokio::test]
1479    async fn run_rejects_remote_continuation_without_backend_capability() {
1480        let runtime = build_remote_runtime(
1481            RemoteEndpoint {
1482                backend: "test-remote".into(),
1483                base_url: "https://remote.example.com".into(),
1484                ..Default::default()
1485            },
1486            Arc::new(AtomicUsize::new(0)),
1487        );
1488        let store = runtime.thread_run_store().expect("store");
1489        let existing_run = RunRecord {
1490            run_id: "existing-run".into(),
1491            thread_id: "remote-thread-cont".into(),
1492            agent_id: "remote-root".into(),
1493            parent_run_id: None,
1494            request: None,
1495            input: None,
1496            output: None,
1497            status: RunStatus::Waiting,
1498            termination_reason: None,
1499            final_output: None,
1500            error_payload: None,
1501            dispatch_id: None,
1502            session_id: None,
1503            transport_request_id: None,
1504            waiting: None,
1505            outcome: None,
1506            created_at: 1,
1507            started_at: None,
1508            finished_at: None,
1509            updated_at: 1,
1510            steps: 1,
1511            input_tokens: 0,
1512            output_tokens: 0,
1513            state: None,
1514        };
1515        store
1516            .checkpoint(
1517                "remote-thread-cont",
1518                &[Message::user("previous remote turn")],
1519                &existing_run,
1520            )
1521            .await
1522            .expect("seed existing remote run");
1523
1524        let error = runtime
1525            .run(
1526                RunRequest::new("remote-thread-cont", vec![Message::user("hello")])
1527                    .with_agent_id("remote-root")
1528                    .with_continue_run_id("existing-run"),
1529                Arc::new(VecEventSink::new()),
1530            )
1531            .await
1532            .expect_err("remote backend should reject continuation");
1533
1534        assert!(error.to_string().contains("does not support: continuation"));
1535    }
1536
1537    #[tokio::test]
1538    async fn run_rejects_unknown_continue_run_id() {
1539        let runtime = build_remote_runtime(
1540            RemoteEndpoint {
1541                backend: "test-remote".into(),
1542                base_url: "https://remote.example.com".into(),
1543                options: std::collections::BTreeMap::from([(
1544                    "supports_continuation".into(),
1545                    json!(true),
1546                )]),
1547                ..Default::default()
1548            },
1549            Arc::new(AtomicUsize::new(0)),
1550        );
1551
1552        let error = runtime
1553            .run(
1554                RunRequest::new("remote-thread-missing-cont", vec![Message::user("resume")])
1555                    .with_agent_id("remote-root")
1556                    .with_continue_run_id("missing-run"),
1557                Arc::new(VecEventSink::new()),
1558            )
1559            .await
1560            .expect_err("unknown continuation run id should fail");
1561
1562        assert!(
1563            error
1564                .to_string()
1565                .contains("continue_run_id 'missing-run' does not reference an existing run")
1566        );
1567    }
1568
1569    #[tokio::test]
1570    async fn run_uses_dispatch_id_hint_for_new_run_identity() {
1571        let runtime = build_remote_runtime(
1572            RemoteEndpoint {
1573                backend: "test-remote".into(),
1574                base_url: "https://remote.example.com".into(),
1575                ..Default::default()
1576            },
1577            Arc::new(AtomicUsize::new(0)),
1578        );
1579
1580        runtime
1581            .run(
1582                RunRequest::new("remote-thread-dispatch-hint", vec![Message::user("hello")])
1583                    .with_agent_id("remote-root")
1584                    .with_dispatch_id_hint("external-task-1"),
1585                Arc::new(VecEventSink::new()),
1586            )
1587            .await
1588            .expect("dispatch id hint should create the run identity");
1589
1590        let store = runtime.thread_run_store().expect("store");
1591        let run = store
1592            .load_run("external-task-1")
1593            .await
1594            .expect("load hinted run")
1595            .expect("hinted run");
1596        assert_eq!(run.thread_id, "remote-thread-dispatch-hint");
1597        assert_eq!(run.status, RunStatus::Done);
1598    }
1599
    /// A trace-only dispatch id must not prevent reuse of a local Waiting run.
    ///
    /// Seeds a Waiting run whose `since_dispatch_id` equals the incoming
    /// request's trace dispatch id, then verifies the new request resumes
    /// that run rather than persisting a fresh run keyed by the dispatch id.
    #[tokio::test]
    async fn run_trace_dispatch_id_does_not_block_local_waiting_reuse() {
        let store = Arc::new(InMemoryStore::new());
        // Single scripted reply that ends the turn: the resumed run finishes
        // after one inference step.
        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
            content: vec![ContentBlock::text("continued")],
            tool_calls: vec![],
            usage: None,
            stop_reason: Some(StopReason::EndTurn),
            has_incomplete_tool_calls: false,
        }]));
        let resolver = Arc::new(FixedResolver {
            agent: ResolvedAgent::new("agent", "m", "sys", llm),
            plugins: vec![],
        });
        let runtime = AgentRuntime::new(resolver)
            .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>);
        // Seed a run waiting on background tasks that already recorded
        // "mailbox-dispatch-1" as the dispatch it has been waiting since.
        store
            .checkpoint(
                "thread-default-hint",
                &[Message::user("waiting")],
                &RunRecord {
                    run_id: "waiting-run".into(),
                    thread_id: "thread-default-hint".into(),
                    agent_id: "agent".into(),
                    parent_run_id: None,
                    request: None,
                    input: None,
                    output: None,
                    status: RunStatus::Waiting,
                    termination_reason: None,
                    final_output: None,
                    error_payload: None,
                    dispatch_id: None,
                    session_id: None,
                    transport_request_id: None,
                    waiting: Some(RunWaitingState {
                        reason: WaitingReason::BackgroundTasks,
                        ticket_ids: Vec::new(),
                        tickets: Vec::new(),
                        since_dispatch_id: Some("mailbox-dispatch-1".into()),
                        message: Some("waiting for background work".into()),
                    }),
                    outcome: None,
                    created_at: 1,
                    started_at: None,
                    finished_at: None,
                    updated_at: 1,
                    steps: 1,
                    input_tokens: 0,
                    output_tokens: 0,
                    state: None,
                },
            )
            .await
            .expect("seed waiting run");

        // Resume the thread, supplying the same dispatch id as a trace hint only.
        let result = runtime
            .run(
                RunRequest::new("thread-default-hint", vec![Message::user("resume")])
                    .with_agent_id("agent")
                    .with_trace_dispatch_id("mailbox-dispatch-1"),
                Arc::new(VecEventSink::new()),
            )
            .await
            .expect("default dispatch trace should allow waiting reuse");

        // The seeded Waiting run was reused...
        assert_eq!(result.run_id, "waiting-run");
        // ...and no run was persisted under the trace dispatch id.
        assert!(
            store
                .load_run("mailbox-dispatch-1")
                .await
                .expect("load default hint run")
                .is_none(),
            "default dispatch trace must not create a new run when a local waiting run is reusable"
        );
    }
1676
    /// A run waiting on structured tool-permission state must be reusable.
    ///
    /// Seeds a Waiting run with `WaitingReason::ToolPermission` and a pending
    /// ticket, then verifies a follow-up request (with a trace dispatch id)
    /// resumes that run and does not persist a new run under the dispatch id.
    #[tokio::test]
    async fn run_reuses_structured_tool_permission_waiting_run() {
        let store = Arc::new(InMemoryStore::new());
        // One scripted end-of-turn reply for the resumed run.
        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
            content: vec![ContentBlock::text("approved continuation")],
            tool_calls: vec![],
            usage: None,
            stop_reason: Some(StopReason::EndTurn),
            has_incomplete_tool_calls: false,
        }]));
        let resolver = Arc::new(FixedResolver {
            agent: ResolvedAgent::new("agent", "m", "sys", llm),
            plugins: vec![],
        });
        let runtime = AgentRuntime::new(resolver)
            .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>);
        // Seed a run waiting for permission on tool call "call-1".
        store
            .checkpoint(
                "thread-tool-permission",
                &[Message::user("waiting")],
                &RunRecord {
                    run_id: "waiting-tool-run".into(),
                    thread_id: "thread-tool-permission".into(),
                    agent_id: "agent".into(),
                    parent_run_id: None,
                    request: None,
                    input: None,
                    output: None,
                    status: RunStatus::Waiting,
                    termination_reason: None,
                    final_output: None,
                    error_payload: None,
                    dispatch_id: None,
                    session_id: None,
                    transport_request_id: None,
                    waiting: Some(RunWaitingState {
                        reason: WaitingReason::ToolPermission,
                        ticket_ids: vec!["call-1".into()],
                        tickets: Vec::new(),
                        since_dispatch_id: None,
                        message: Some("approval required".into()),
                    }),
                    outcome: None,
                    created_at: 1,
                    started_at: None,
                    finished_at: None,
                    updated_at: 1,
                    steps: 1,
                    input_tokens: 0,
                    output_tokens: 0,
                    state: None,
                },
            )
            .await
            .expect("seed waiting run");

        let result = runtime
            .run(
                RunRequest::new("thread-tool-permission", vec![Message::user("approved")])
                    .with_agent_id("agent")
                    .with_trace_dispatch_id("mailbox-dispatch-tool"),
                Arc::new(VecEventSink::new()),
            )
            .await
            .expect("structured waiting run should be reusable");

        // The structured waiting run was resumed in place...
        assert_eq!(result.run_id, "waiting-tool-run");
        // ...and the dispatch id stayed trace-only (no run stored under it).
        assert!(
            store
                .load_run("mailbox-dispatch-tool")
                .await
                .expect("load default hint run")
                .is_none(),
            "default dispatch trace must stay trace-only when a structured waiting run is reusable"
        );
    }
1753
1754    #[tokio::test]
1755    async fn run_trace_dispatch_id_is_trace_not_canonical_run_id_for_new_run() {
1756        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
1757            content: vec![ContentBlock::text("new run")],
1758            tool_calls: vec![],
1759            usage: None,
1760            stop_reason: Some(StopReason::EndTurn),
1761            has_incomplete_tool_calls: false,
1762        }]));
1763        let resolver = Arc::new(FixedResolver {
1764            agent: ResolvedAgent::new("agent", "m", "sys", llm),
1765            plugins: vec![],
1766        });
1767        let runtime = AgentRuntime::new(resolver);
1768        let sink = Arc::new(VecEventSink::new());
1769
1770        let result = runtime
1771            .run(
1772                RunRequest::new("thread-default-new", vec![Message::user("start")])
1773                    .with_agent_id("agent")
1774                    .with_trace_dispatch_id("mailbox-dispatch-new"),
1775                sink.clone(),
1776            )
1777            .await
1778            .expect("run should succeed");
1779
1780        assert_ne!(result.run_id, "mailbox-dispatch-new");
1781        let start = sink
1782            .events()
1783            .into_iter()
1784            .find_map(|event| match event {
1785                AgentEvent::RunStart {
1786                    run_id, identity, ..
1787                } => Some((run_id, identity)),
1788                _ => None,
1789            })
1790            .expect("run start event should be emitted");
1791        assert_eq!(start.0, result.run_id);
1792        assert_eq!(
1793            start.1.and_then(|identity| identity.trace.dispatch_id),
1794            Some("mailbox-dispatch-new".into())
1795        );
1796    }
1797
    /// A remote (non-local) continuation must load the state of the run named
    /// in `with_continue_run_id`, not the thread's most recent run.
    ///
    /// Seeds two runs on the same thread with distinguishable persisted-state
    /// markers, continues the older Waiting one, and checks its marker was not
    /// replaced by the newer run's state.
    #[tokio::test]
    async fn run_non_local_continuation_uses_requested_run_state_not_latest() {
        // Remote endpoint that advertises continuation support.
        let runtime = build_remote_runtime(
            RemoteEndpoint {
                backend: "test-remote".into(),
                base_url: "https://remote.example.com".into(),
                options: std::collections::BTreeMap::from([(
                    "supports_continuation".into(),
                    json!(true),
                )]),
                ..Default::default()
            },
            Arc::new(AtomicUsize::new(0)),
        );
        let store = runtime.thread_run_store().expect("store");
        // Two distinguishable state payloads: one per seeded run.
        let continued_state = PersistedState {
            revision: 1,
            extensions: HashMap::from([("marker".into(), json!("continued-run-state"))]),
        };
        let latest_state = PersistedState {
            revision: 2,
            extensions: HashMap::from([("marker".into(), json!("latest-run-state"))]),
        };

        // Seed the older, Waiting run that will be continued.
        store
            .checkpoint(
                "remote-thread-state",
                &[Message::user("waiting turn")],
                &RunRecord {
                    run_id: "continued-run".into(),
                    thread_id: "remote-thread-state".into(),
                    agent_id: "remote-root".into(),
                    parent_run_id: None,
                    request: None,
                    input: None,
                    output: None,
                    status: RunStatus::Waiting,
                    termination_reason: None,
                    final_output: None,
                    error_payload: None,
                    dispatch_id: None,
                    session_id: None,
                    transport_request_id: None,
                    waiting: None,
                    outcome: None,
                    created_at: 1,
                    started_at: None,
                    finished_at: None,
                    updated_at: 1,
                    steps: 1,
                    input_tokens: 0,
                    output_tokens: 0,
                    state: Some(continued_state),
                },
            )
            .await
            .expect("seed continued run");
        // Seed a newer, completed run so "latest" differs from the target.
        store
            .checkpoint(
                "remote-thread-state",
                &[Message::user("latest turn")],
                &RunRecord {
                    run_id: "latest-run".into(),
                    thread_id: "remote-thread-state".into(),
                    agent_id: "remote-root".into(),
                    parent_run_id: None,
                    request: None,
                    input: None,
                    output: None,
                    status: RunStatus::Done,
                    termination_reason: None,
                    final_output: None,
                    error_payload: None,
                    dispatch_id: None,
                    session_id: None,
                    transport_request_id: None,
                    waiting: None,
                    outcome: None,
                    created_at: 2,
                    started_at: None,
                    finished_at: None,
                    updated_at: 2,
                    steps: 1,
                    input_tokens: 0,
                    output_tokens: 0,
                    state: Some(latest_state),
                },
            )
            .await
            .expect("seed latest run");

        // Explicitly continue the older run.
        runtime
            .run(
                RunRequest::new("remote-thread-state", vec![Message::user("resume")])
                    .with_agent_id("remote-root")
                    .with_continue_run_id("continued-run"),
                Arc::new(VecEventSink::new()),
            )
            .await
            .expect("remote continuation should run");

        // The continued run's stored state still carries its own marker — it
        // was not overwritten by the latest run's state.
        let continued = store
            .load_run("continued-run")
            .await
            .expect("load continued run")
            .expect("continued run");
        assert_eq!(
            continued
                .state
                .as_ref()
                .and_then(|state| state.extensions.get("marker"))
                .and_then(Value::as_str),
            Some("continued-run-state")
        );
    }
1913
    /// A remote backend that does not support decisions must not expose a
    /// live decision channel: `send_decisions` returns `false` while the
    /// remote run is in flight, and the run still completes normally.
    #[cfg(feature = "a2a")]
    #[tokio::test]
    async fn send_decisions_returns_false_for_remote_backend_without_decision_support() {
        let mut endpoint = RemoteEndpoint {
            backend: "test-remote".into(),
            base_url: "https://remote.example.com".into(),
            ..Default::default()
        };
        // Add an artificial delay so the run is still in flight when
        // send_decisions is attempted below.
        endpoint
            .options
            .insert("delay_ms".into(), serde_json::json!(100));
        let runtime = Arc::new(build_remote_runtime(
            endpoint,
            Arc::new(AtomicUsize::new(0)),
        ));
        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);

        // Start the remote run on a separate task so this test body can probe
        // the runtime concurrently.
        let run_task = {
            let runtime = runtime.clone();
            let sink = sink.clone();
            tokio::spawn(async move {
                runtime
                    .run(
                        RunRequest::new("remote-thread-live", vec![Message::user("hello")])
                            .with_agent_id("remote-root"),
                        sink,
                    )
                    .await
            })
        };

        // Let the spawned run begin, then attempt to deliver a decision.
        tokio::task::yield_now().await;
        let sent = runtime.send_decisions(
            "remote-thread-live",
            vec![(
                "call-1".into(),
                ToolCallResume {
                    decision_id: "d1".into(),
                    action: ResumeDecisionAction::Resume,
                    result: Value::Null,
                    reason: None,
                    updated_at: 1,
                },
            )],
        );
        assert!(
            !sent,
            "remote backends without decision support must not expose a live decision channel"
        );

        // The run itself is unaffected and completes with the remote response.
        let result = run_task
            .await
            .expect("join should succeed")
            .expect("run should succeed");
        assert_eq!(result.response, "remote root response");
    }
1970
1971    struct ToggleSuspendTool {
1972        calls: AtomicUsize,
1973    }
1974
1975    #[async_trait]
1976    impl Tool for ToggleSuspendTool {
1977        fn descriptor(&self) -> ToolDescriptor {
1978            ToolDescriptor::new("dangerous", "dangerous", "suspend then succeed")
1979        }
1980
1981        async fn execute(
1982            &self,
1983            args: Value,
1984            _ctx: &ToolCallContext,
1985        ) -> Result<ToolOutput, ToolError> {
1986            let n = self.calls.fetch_add(1, Ordering::SeqCst);
1987            if n == 0 {
1988                Ok(ToolResult::suspended("dangerous", "needs approval").into())
1989            } else {
1990                Ok(ToolResult::success_with_message("dangerous", args, "approved").into())
1991            }
1992        }
1993    }
1994
1995    struct EchoTool {
1996        calls: AtomicUsize,
1997    }
1998
1999    #[async_trait]
2000    impl Tool for EchoTool {
2001        fn descriptor(&self) -> ToolDescriptor {
2002            ToolDescriptor::new("echo", "echo", "echo success")
2003        }
2004
2005        async fn execute(
2006            &self,
2007            args: Value,
2008            _ctx: &ToolCallContext,
2009        ) -> Result<ToolOutput, ToolError> {
2010            self.calls.fetch_add(1, Ordering::SeqCst);
2011            Ok(ToolResult::success("echo", args).into())
2012        }
2013    }
2014
2015    struct RecordingToolPolicyHook {
2016        seen: Arc<Mutex<Vec<ToolPolicyContext>>>,
2017    }
2018
2019    #[async_trait]
2020    impl ToolPolicyHook for RecordingToolPolicyHook {
2021        async fn decide(
2022            &self,
2023            ctx: &ToolPolicyContext,
2024        ) -> Result<ToolPolicyDecision, awaken_contract::StateError> {
2025            self.seen.lock().expect("lock poisoned").push(ctx.clone());
2026            if ctx.run_mode == RunMode::Scheduled
2027                && ctx.adapter == AdapterKind::Acp
2028                && ctx.tool_name == "echo"
2029            {
2030                return Ok(ToolPolicyDecision::Deny {
2031                    reason: "scheduled ACP echo denied".into(),
2032                });
2033            }
2034            Ok(ToolPolicyDecision::Allow)
2035        }
2036    }
2037
2038    struct RecordingToolPolicyPlugin {
2039        seen: Arc<Mutex<Vec<ToolPolicyContext>>>,
2040    }
2041
2042    impl Plugin for RecordingToolPolicyPlugin {
2043        fn descriptor(&self) -> PluginDescriptor {
2044            PluginDescriptor {
2045                name: "recording-tool-policy",
2046            }
2047        }
2048
2049        fn register(
2050            &self,
2051            registrar: &mut PluginRegistrar,
2052        ) -> Result<(), awaken_contract::StateError> {
2053            registrar.register_tool_policy_hook(
2054                "recording-tool-policy",
2055                RecordingToolPolicyHook {
2056                    seen: Arc::clone(&self.seen),
2057                },
2058            )
2059        }
2060    }
2061
2062    struct SpawnShortBgTaskTool {
2063        manager: Arc<crate::extensions::background::BackgroundTaskManager>,
2064        delay_ms: u64,
2065    }
2066
2067    #[async_trait]
2068    impl Tool for SpawnShortBgTaskTool {
2069        fn descriptor(&self) -> ToolDescriptor {
2070            ToolDescriptor::new("spawn_bg", "spawn_bg", "spawn short background task")
2071        }
2072
2073        async fn execute(
2074            &self,
2075            _args: Value,
2076            ctx: &ToolCallContext,
2077        ) -> Result<ToolOutput, ToolError> {
2078            let delay = self.delay_ms;
2079            self.manager
2080                .spawn(
2081                    &ctx.run_identity.thread_id,
2082                    "bg",
2083                    None,
2084                    "short task",
2085                    crate::extensions::background::TaskParentContext::default(),
2086                    move |_task_ctx| async move {
2087                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2088                        crate::extensions::background::TaskResult::Success(json!({
2089                            "done": true,
2090                            "source": "background"
2091                        }))
2092                    },
2093                )
2094                .await
2095                .map_err(|e| ToolError::ExecutionFailed(e.to_string()))?;
2096            Ok(ToolResult::success("spawn_bg", json!({"spawned": true})).into())
2097        }
2098    }
2099
2100    struct RecordingLlm {
2101        responses: Mutex<Vec<StreamResult>>,
2102        requests: Arc<Mutex<Vec<InferenceRequest>>>,
2103    }
2104
2105    impl RecordingLlm {
2106        fn new(responses: Vec<StreamResult>, requests: Arc<Mutex<Vec<InferenceRequest>>>) -> Self {
2107            Self {
2108                responses: Mutex::new(responses),
2109                requests,
2110            }
2111        }
2112    }
2113
2114    #[async_trait]
2115    impl LlmExecutor for RecordingLlm {
2116        async fn execute(
2117            &self,
2118            request: InferenceRequest,
2119        ) -> Result<StreamResult, InferenceExecutionError> {
2120            self.requests.lock().expect("lock poisoned").push(request);
2121            let mut responses = self.responses.lock().expect("lock poisoned");
2122            Ok(responses.remove(0))
2123        }
2124
2125        fn name(&self) -> &str {
2126            "recording"
2127        }
2128    }
2129
2130    struct FixedResolver {
2131        agent: ResolvedAgent,
2132        plugins: Vec<Arc<dyn Plugin>>,
2133    }
2134
2135    impl AgentResolver for FixedResolver {
2136        fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, crate::error::RuntimeError> {
2137            let mut agent = self.agent.clone();
2138            agent.env = build_agent_env(&self.plugins, &agent)?;
2139            Ok(agent)
2140        }
2141    }
2142
    /// State key used by tests to count something per thread (see
    /// [`ThreadCounterHook`], which increments it on RunStart).
    struct ThreadCounterKey;

    impl StateKey for ThreadCounterKey {
        const KEY: &'static str = "test.thread_counter";
        type Value = u32;
        type Update = u32;

        // Last-write-wins: each update replaces the stored counter outright.
        fn apply(value: &mut Self::Value, update: Self::Update) {
            *value = update;
        }
    }
2154
    /// Plugin that registers [`ThreadCounterKey`] as persistent, thread-scoped
    /// state and attaches [`ThreadCounterHook`] to the RunStart phase.
    struct ThreadCounterPlugin;

    impl Plugin for ThreadCounterPlugin {
        fn descriptor(&self) -> PluginDescriptor {
            PluginDescriptor {
                name: "test.thread-counter",
            }
        }

        fn register(
            &self,
            registrar: &mut PluginRegistrar,
        ) -> Result<(), awaken_contract::StateError> {
            // Persistent + Thread scope: the counter value is tied to the
            // thread rather than a single run.
            registrar.register_key::<ThreadCounterKey>(StateKeyOptions {
                persistent: true,
                scope: KeyScope::Thread,
                ..StateKeyOptions::default()
            })?;
            // Bump the counter at the start of every run.
            registrar.register_phase_hook(
                "test.thread-counter",
                awaken_contract::model::Phase::RunStart,
                ThreadCounterHook,
            )
        }
    }
2180
2181    struct ThreadCounterHook;
2182
2183    #[async_trait]
2184    impl PhaseHook for ThreadCounterHook {
2185        async fn run(
2186            &self,
2187            ctx: &PhaseContext,
2188        ) -> Result<StateCommand, awaken_contract::StateError> {
2189            let next = ctx.state::<ThreadCounterKey>().copied().unwrap_or(0) + 1;
2190            let mut cmd = StateCommand::new();
2191            cmd.update::<ThreadCounterKey>(next);
2192            Ok(cmd)
2193        }
2194    }
2195
    /// Boolean marker key used to observe whether a state update from one tool
    /// is visible to a later tool (see [`SequentialVisibilityHook`]).
    struct SequentialVisibilityKey;

    impl StateKey for SequentialVisibilityKey {
        const KEY: &'static str = "test.sequential_visibility";
        type Value = bool;
        type Update = bool;

        // Last-write-wins: each update replaces the stored flag.
        fn apply(value: &mut Self::Value, update: Self::Update) {
            *value = update;
        }
    }
2207
    /// Plugin that registers [`SequentialVisibilityKey`] (default options) and
    /// attaches [`SequentialVisibilityHook`] to the AfterToolExecute phase.
    struct SequentialVisibilityPlugin;

    impl Plugin for SequentialVisibilityPlugin {
        fn descriptor(&self) -> PluginDescriptor {
            PluginDescriptor {
                name: "test.sequential-visibility",
            }
        }

        fn register(
            &self,
            registrar: &mut PluginRegistrar,
        ) -> Result<(), awaken_contract::StateError> {
            registrar.register_key::<SequentialVisibilityKey>(StateKeyOptions::default())?;
            // Set the marker right after each tool execution finishes.
            registrar.register_phase_hook(
                "test.sequential-visibility",
                awaken_contract::model::Phase::AfterToolExecute,
                SequentialVisibilityHook,
            )
        }
    }
2229
2230    struct SequentialVisibilityHook;
2231
2232    #[async_trait]
2233    impl PhaseHook for SequentialVisibilityHook {
2234        async fn run(
2235            &self,
2236            ctx: &PhaseContext,
2237        ) -> Result<StateCommand, awaken_contract::StateError> {
2238            let mut cmd = StateCommand::new();
2239            if ctx.tool_name.as_deref() == Some("writer") {
2240                cmd.update::<SequentialVisibilityKey>(true);
2241            }
2242            Ok(cmd)
2243        }
2244    }
2245
2246    struct WriterTool;
2247
2248    #[async_trait]
2249    impl Tool for WriterTool {
2250        fn descriptor(&self) -> ToolDescriptor {
2251            ToolDescriptor::new("writer", "writer", "writes marker in hook")
2252        }
2253
2254        async fn execute(
2255            &self,
2256            _args: Value,
2257            _ctx: &ToolCallContext,
2258        ) -> Result<ToolOutput, ToolError> {
2259            Ok(ToolResult::success("writer", Value::Null).into())
2260        }
2261    }
2262
2263    struct ReaderTool {
2264        saw_marker: Arc<std::sync::atomic::AtomicBool>,
2265    }
2266
2267    #[async_trait]
2268    impl Tool for ReaderTool {
2269        fn descriptor(&self) -> ToolDescriptor {
2270            ToolDescriptor::new("reader", "reader", "reads marker from snapshot")
2271        }
2272
2273        async fn execute(
2274            &self,
2275            _args: Value,
2276            ctx: &ToolCallContext,
2277        ) -> Result<ToolOutput, ToolError> {
2278            let saw = ctx
2279                .snapshot
2280                .get::<SequentialVisibilityKey>()
2281                .copied()
2282                .unwrap_or(false);
2283            self.saw_marker.store(saw, Ordering::SeqCst);
2284            Ok(ToolResult::success("reader", Value::Null).into())
2285        }
2286    }
2287
2288    fn seeded_run_record(
2289        run_id: &str,
2290        thread_id: &str,
2291        agent_id: &str,
2292        state: Option<PersistedState>,
2293    ) -> RunRecord {
2294        RunRecord {
2295            run_id: run_id.to_string(),
2296            thread_id: thread_id.to_string(),
2297            agent_id: agent_id.to_string(),
2298            parent_run_id: None,
2299            request: None,
2300            input: None,
2301            output: None,
2302            status: RunStatus::Done,
2303            termination_reason: None,
2304            final_output: None,
2305            error_payload: None,
2306            dispatch_id: None,
2307            session_id: None,
2308            transport_request_id: None,
2309            waiting: None,
2310            outcome: None,
2311            created_at: 1,
2312            started_at: None,
2313            finished_at: None,
2314            updated_at: 1,
2315            steps: 1,
2316            input_tokens: 0,
2317            output_tokens: 0,
2318            state,
2319        }
2320    }
2321
2322    #[tokio::test]
2323    async fn run_to_completion_returns_final_result() {
2324        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
2325            content: vec![ContentBlock::text("ok")],
2326            tool_calls: vec![],
2327            usage: None,
2328            stop_reason: Some(StopReason::EndTurn),
2329            has_incomplete_tool_calls: false,
2330        }]));
2331        let resolver = Arc::new(FixedResolver {
2332            agent: ResolvedAgent::new("agent", "m", "sys", llm),
2333            plugins: vec![],
2334        });
2335        let runtime = AgentRuntime::new(resolver);
2336
2337        let result = runtime
2338            .run_to_completion(
2339                RunRequest::new("thread-completion", vec![Message::user("hi")])
2340                    .with_agent_id("agent"),
2341            )
2342            .await
2343            .expect("run should succeed");
2344
2345        assert_eq!(result.response, "ok");
2346        assert_eq!(
2347            result.termination,
2348            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
2349        );
2350    }
2351
    /// Per-request inference overrides must reach the executor selectively:
    /// `temperature` and `max_tokens` are forwarded, while `upstream_model`
    /// is withheld from the executor yet still reported as the model name on
    /// the `InferenceComplete` event.
    #[tokio::test]
    async fn run_request_overrides_are_forwarded_to_inference() {
        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
            content: vec![ContentBlock::text("ok")],
            tool_calls: vec![],
            usage: Some(awaken_contract::contract::inference::TokenUsage {
                prompt_tokens: Some(11),
                completion_tokens: Some(7),
                ..Default::default()
            }),
            stop_reason: Some(StopReason::EndTurn),
            has_incomplete_tool_calls: false,
        }]));
        let resolver = Arc::new(FixedResolver {
            agent: ResolvedAgent::new("agent", "m", "sys", llm.clone()),
            plugins: vec![],
        });
        let runtime = AgentRuntime::new(resolver);
        let sink = Arc::new(VecEventSink::new());
        let override_req = InferenceOverride {
            upstream_model: Some("override-model".into()),
            temperature: Some(0.3),
            max_tokens: Some(77),
            ..Default::default()
        };

        let result = runtime
            .run(
                RunRequest::new("thread-ovr", vec![Message::user("hi")])
                    .with_agent_id("agent")
                    .with_overrides(override_req.clone()),
                sink.clone(),
            )
            .await
            .expect("run should succeed");

        assert_eq!(
            result.termination,
            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
        );
        // The executor saw exactly one request, carrying the sampling
        // overrides from the run request.
        let seen = llm.seen_overrides.lock().expect("lock poisoned");
        assert_eq!(seen.len(), 1);
        assert_eq!(
            seen[0].as_ref().and_then(|o| o.temperature),
            override_req.temperature
        );
        assert_eq!(
            seen[0].as_ref().and_then(|o| o.max_tokens),
            override_req.max_tokens
        );
        // The upstream model override must NOT be passed down to the executor.
        assert!(
            seen[0]
                .as_ref()
                .and_then(|o| o.upstream_model.as_ref())
                .is_none()
        );
        // But the InferenceComplete event reports the overridden model name.
        let complete_model = sink.events().into_iter().find_map(|event| match event {
            AgentEvent::InferenceComplete { model, .. } => Some(model),
            _ => None,
        });
        assert_eq!(complete_model.as_deref(), Some("override-model"));
    }
2414
    /// A live run suspended on a tool decision can be resumed via
    /// `send_decisions`: the tool suspends first, the decision is delivered
    /// over the live channel, the tool re-runs successfully, and a
    /// `ToolCallResumed` event carries the final tool result.
    #[tokio::test]
    async fn send_decisions_resumes_waiting_run() {
        // Two scripted turns: one that issues the tool call, one that ends the
        // run after the resumed tool succeeds.
        let llm = Arc::new(ScriptedLlm::new(vec![
            StreamResult {
                content: vec![ContentBlock::text("calling tool")],
                tool_calls: vec![awaken_contract::contract::message::ToolCall::new(
                    "c1",
                    "dangerous",
                    json!({"x": 1}),
                )],
                usage: None,
                stop_reason: Some(StopReason::ToolUse),
                has_incomplete_tool_calls: false,
            },
            StreamResult {
                content: vec![ContentBlock::text("finished")],
                tool_calls: vec![],
                usage: None,
                stop_reason: Some(StopReason::EndTurn),
                has_incomplete_tool_calls: false,
            },
        ]));
        // Suspends on the first execution, succeeds on the second.
        let tool = Arc::new(ToggleSuspendTool {
            calls: AtomicUsize::new(0),
        });
        let resolver = Arc::new(FixedResolver {
            agent: ResolvedAgent::new("agent", "m", "sys", llm).with_tool(tool),
            plugins: vec![],
        });
        let runtime = Arc::new(AgentRuntime::new(resolver));
        let sink = Arc::new(VecEventSink::new());

        // Drive the run on a separate task so the test body can deliver the
        // decision concurrently.
        let run_task = {
            let runtime = Arc::clone(&runtime);
            let sink = sink.clone();
            tokio::spawn(async move {
                runtime
                    .run(
                        RunRequest::new("thread-live", vec![Message::user("go")])
                            .with_agent_id("agent"),
                        sink as Arc<dyn EventSink>,
                    )
                    .await
            })
        };

        // Poll (with cooperative yields) until the run is registered and the
        // live decision channel accepts the resume for call "c1".
        let mut sent = false;
        for _ in 0..40 {
            if runtime.send_decisions(
                "thread-live",
                vec![(
                    "c1".into(),
                    ToolCallResume {
                        decision_id: "d1".into(),
                        action: ResumeDecisionAction::Resume,
                        result: Value::Null,
                        reason: None,
                        updated_at: 1,
                    },
                )],
            ) {
                sent = true;
                break;
            }
            tokio::task::yield_now().await;
        }
        assert!(sent, "should send decision while run is active");

        // The resumed run completes naturally.
        let result = run_task
            .await
            .expect("join should succeed")
            .expect("run should succeed");
        assert_eq!(
            result.termination,
            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
        );

        // The replay must emit ToolCallResumed carrying the tool's final
        // result (the original call args echoed back on success).
        let events = sink.take();
        assert!(
            events.iter().any(|event| {
                matches!(
                    event,
                    AgentEvent::ToolCallResumed { target_id, result }
                        if target_id == "c1" && result == &json!({"x": 1})
                )
            }),
            "resumed replay should emit ToolCallResumed with the final tool result: {events:?}"
        );
    }
2504
2505    #[tokio::test]
2506    async fn run_request_policy_context_reaches_tool_gate() {
2507        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
2508            content: vec![ContentBlock::text("calling echo")],
2509            tool_calls: vec![awaken_contract::contract::message::ToolCall::new(
2510                "c1",
2511                "echo",
2512                json!({"message": "hello"}),
2513            )],
2514            usage: None,
2515            stop_reason: Some(StopReason::ToolUse),
2516            has_incomplete_tool_calls: false,
2517        }]));
2518        let tool = Arc::new(EchoTool {
2519            calls: AtomicUsize::new(0),
2520        });
2521        let seen = Arc::new(Mutex::new(Vec::new()));
2522        let resolver = Arc::new(FixedResolver {
2523            agent: ResolvedAgent::new("agent", "m", "sys", llm).with_tool(tool.clone()),
2524            plugins: vec![Arc::new(RecordingToolPolicyPlugin {
2525                seen: Arc::clone(&seen),
2526            })],
2527        });
2528        let runtime = AgentRuntime::new(resolver);
2529
2530        let result = runtime
2531            .run(
2532                RunRequest::new("thread-policy", vec![Message::user("use echo")])
2533                    .with_agent_id("agent")
2534                    .with_run_mode(RunMode::Scheduled)
2535                    .with_adapter(AdapterKind::Acp),
2536                Arc::new(VecEventSink::new()),
2537            )
2538            .await
2539            .expect("run should reach policy hook");
2540
2541        assert!(matches!(
2542            result.termination,
2543            TerminationReason::Blocked(ref reason) if reason == "scheduled ACP echo denied"
2544        ));
2545        assert_eq!(
2546            tool.calls.load(Ordering::SeqCst),
2547            0,
2548            "denied tool must not execute"
2549        );
2550
2551        let contexts = seen.lock().expect("lock poisoned");
2552        assert_eq!(contexts.len(), 1);
2553        let ctx = &contexts[0];
2554        assert_eq!(ctx.thread_id, "thread-policy");
2555        assert_eq!(ctx.run_mode, RunMode::Scheduled);
2556        assert_eq!(ctx.adapter, AdapterKind::Acp);
2557        assert_eq!(ctx.dispatch_id, None);
2558        assert_eq!(ctx.tool_name, "echo");
2559    }
2560
    #[tokio::test]
    async fn background_events_buffer_while_suspended_until_decision_arrives() {
        use awaken_contract::contract::message::{Role, Visibility};

        // Records every InferenceRequest the LLM receives, so the test can
        // count how many times the loop (re)enters inference and inspect the
        // messages carried by the resumed request.
        let requests = Arc::new(Mutex::new(Vec::new()));
        // Turn 1: spawn a short background task AND invoke the suspending
        // tool in the same batch. Turn 2: finish after approval.
        let llm = Arc::new(RecordingLlm::new(
            vec![
                StreamResult {
                    content: vec![ContentBlock::text("start tools")],
                    tool_calls: vec![
                        awaken_contract::contract::message::ToolCall::new(
                            "bg1",
                            "spawn_bg",
                            json!({}),
                        ),
                        awaken_contract::contract::message::ToolCall::new(
                            "c1",
                            "dangerous",
                            json!({"x": 1}),
                        ),
                    ],
                    usage: None,
                    stop_reason: Some(StopReason::ToolUse),
                    has_incomplete_tool_calls: false,
                },
                StreamResult {
                    content: vec![ContentBlock::text("done after approval")],
                    tool_calls: vec![],
                    usage: None,
                    stop_reason: Some(StopReason::EndTurn),
                    has_incomplete_tool_calls: false,
                },
            ],
            requests.clone(),
        ));
        let manager = Arc::new(crate::extensions::background::BackgroundTaskManager::new());
        let resolver = Arc::new(FixedResolver {
            agent: ResolvedAgent::new("agent", "m", "sys", llm)
                // Background task completes after ~25ms — well inside the
                // 80ms observation window below, so its completion event
                // fires while the run is still suspended.
                .with_tool(Arc::new(SpawnShortBgTaskTool {
                    manager: manager.clone(),
                    delay_ms: 25,
                }))
                // "dangerous" suspends the call, parking the run until a
                // decision arrives (see ToggleSuspendTool's definition).
                .with_tool(Arc::new(ToggleSuspendTool {
                    calls: AtomicUsize::new(0),
                })),
            plugins: vec![Arc::new(
                crate::extensions::background::BackgroundTaskPlugin::new(manager),
            )],
        });
        let runtime = Arc::new(AgentRuntime::new(resolver));
        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);

        // Run on a separate task so the test body can interact with the
        // suspended run through the shared runtime handle.
        let run_task = {
            let runtime = runtime.clone();
            let sink = sink.clone();
            tokio::spawn(async move {
                runtime
                    .run(
                        RunRequest::new("thread-bg-suspend", vec![Message::user("go")])
                            .with_agent_id("agent"),
                        sink,
                    )
                    .await
            })
        };

        // Give the background task time to finish while the run is still
        // suspended. Exactly one LLM request (the initial turn) may have
        // happened — the background completion must buffer, not resume.
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        assert_eq!(
            requests.lock().expect("lock poisoned").len(),
            1,
            "background completion must not resume the LLM before the suspended tool is decided"
        );

        // Approve the suspended call "c1"; this is what unparks the run.
        let sent = runtime.send_decisions(
            "thread-bg-suspend",
            vec![(
                "c1".into(),
                ToolCallResume {
                    decision_id: "d1".into(),
                    action: ResumeDecisionAction::Resume,
                    result: Value::Null,
                    reason: None,
                    updated_at: 1,
                },
            )],
        );
        assert!(sent, "decision should reach the waiting run");

        let result = run_task
            .await
            .expect("join should succeed")
            .expect("run should succeed");
        assert_eq!(
            result.termination,
            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
        );

        // Exactly one more LLM request after approval, and it must carry the
        // buffered background-completion event as an internal user message.
        let recorded = requests.lock().expect("lock poisoned");
        assert_eq!(
            recorded.len(),
            2,
            "run should resume exactly once after approval"
        );
        assert!(
            recorded[1].messages.iter().any(|message| {
                message.role == Role::User
                    && message.visibility == Visibility::Internal
                    && message.text().contains("background-task-event")
                    && message.text().contains("\"done\":true")
            }),
            "buffered background event should be injected into the resumed request"
        );
    }
2674
    #[tokio::test]
    async fn new_user_message_supersedes_suspended_calls_but_keeps_completed_results() {
        use awaken_contract::contract::lifecycle::RunStatus;
        use awaken_contract::contract::message::Role;
        use awaken_contract::contract::storage::ThreadStore;
        use awaken_stores::InMemoryStore;

        // Turn 1 issues two tool calls in one batch: "c_echo" completes
        // normally (EchoTool) while "c_suspend" suspends (ToggleSuspendTool).
        // Turn 2 is only reached by the SECOND run, after supersession.
        let llm = Arc::new(ScriptedLlm::new(vec![
            StreamResult {
                content: vec![ContentBlock::text("call tools")],
                tool_calls: vec![
                    awaken_contract::contract::message::ToolCall::new(
                        "c_echo",
                        "echo",
                        json!({"ok": true}),
                    ),
                    awaken_contract::contract::message::ToolCall::new(
                        "c_suspend",
                        "dangerous",
                        json!({"danger": true}),
                    ),
                ],
                usage: None,
                stop_reason: Some(StopReason::ToolUse),
                has_incomplete_tool_calls: false,
            },
            StreamResult {
                content: vec![ContentBlock::text("fresh answer")],
                tool_calls: vec![],
                usage: None,
                stop_reason: Some(StopReason::EndTurn),
                has_incomplete_tool_calls: false,
            },
        ]));
        let echo = Arc::new(EchoTool {
            calls: AtomicUsize::new(0),
        });
        let dangerous = Arc::new(ToggleSuspendTool {
            calls: AtomicUsize::new(0),
        });
        let resolver = Arc::new(FixedResolver {
            agent: ResolvedAgent::new("agent", "m", "sys", llm)
                .with_tool(echo.clone())
                .with_tool(dangerous.clone()),
            plugins: vec![],
        });
        // Durable store so the waiting checkpoint and thread history survive
        // across the two runs.
        let store = Arc::new(InMemoryStore::new());
        let runtime = Arc::new(
            AgentRuntime::new(resolver)
                .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>),
        );
        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);

        // First run on its own task; it will park on the suspended call.
        let first_run = {
            let runtime = runtime.clone();
            let sink = sink.clone();
            tokio::spawn(async move {
                runtime
                    .run(
                        RunRequest::new("thread-supersede", vec![Message::user("first")])
                            .with_agent_id("agent"),
                        sink,
                    )
                    .await
            })
        };

        // Poll the store (up to 2s) until the run has durably checkpointed
        // as Waiting on tool permission, then verify the persisted ticket
        // describes exactly the suspended call.
        let wait_deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        loop {
            if let Some(run) = store
                .latest_run("thread-supersede")
                .await
                .expect("latest run lookup should succeed")
                && run.status == RunStatus::Waiting
                && run.waiting_reason() == Some(WaitingReason::ToolPermission)
            {
                let waiting = run.waiting.expect("waiting state should be durable");
                assert_eq!(waiting.ticket_ids, vec!["c_suspend"]);
                assert_eq!(waiting.tickets.len(), 1);
                assert_eq!(waiting.tickets[0].tool_call_id, "c_suspend");
                assert_eq!(waiting.tickets[0].tool_name, "dangerous");
                assert_eq!(waiting.tickets[0].arguments, json!({"danger": true}));
                break;
            }
            assert!(
                std::time::Instant::now() < wait_deadline,
                "timed out waiting for suspended checkpoint"
            );
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }

        // This is the supersede path a new user message takes: cancel the
        // suspended run and wait for it to fully release the thread slot.
        assert!(
            runtime.cancel_and_wait_by_thread("thread-supersede").await,
            "new message path should be able to supersede the suspended run"
        );

        let first = first_run
            .await
            .expect("join should succeed")
            .expect("first run should terminate cleanly");
        assert_eq!(
            first.termination,
            awaken_contract::contract::lifecycle::TerminationReason::Cancelled
        );

        // Second run consumes the scripted turn 2 and ends naturally.
        let second = runtime
            .run(
                RunRequest::new("thread-supersede", vec![Message::user("second")])
                    .with_agent_id("agent"),
                sink,
            )
            .await
            .expect("second run should succeed");
        assert_eq!(
            second.termination,
            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
        );
        // Each tool ran exactly once across both runs: the completed echo
        // result was kept (no replay) and the suspended call was dropped
        // (superseded) rather than re-executed.
        assert_eq!(
            echo.calls.load(Ordering::SeqCst),
            1,
            "successful tool calls from the superseded run must not replay"
        );
        assert_eq!(
            dangerous.calls.load(Ordering::SeqCst),
            1,
            "suspended tool calls must be superseded instead of replayed on new user input"
        );

        // Durable history: the completed tool result survives, while the
        // superseded assistant tool call "c_suspend" is stripped.
        let messages = ThreadStore::load_messages(&*store, "thread-supersede")
            .await
            .expect("load messages should succeed")
            .expect("thread messages should exist");
        assert!(
            messages.iter().any(|message| message.role == Role::Tool
                && message.tool_call_id.as_deref() == Some("c_echo")),
            "completed tool result should remain in durable history"
        );
        assert!(
            !messages
                .iter()
                .filter(|message| message.role == Role::Assistant)
                .filter_map(|message| message.tool_calls.as_ref())
                .flatten()
                .any(|call| call.id == "c_suspend"),
            "superseded suspended tool calls should be stripped from later history"
        );
    }
2822
2823    #[tokio::test]
2824    async fn sequential_tool_execution_sees_latest_state_between_calls() {
2825        let llm = Arc::new(ScriptedLlm::new(vec![
2826            StreamResult {
2827                content: vec![ContentBlock::text("tools")],
2828                tool_calls: vec![
2829                    awaken_contract::contract::message::ToolCall::new("c1", "writer", json!({})),
2830                    awaken_contract::contract::message::ToolCall::new("c2", "reader", json!({})),
2831                ],
2832                usage: None,
2833                stop_reason: Some(StopReason::ToolUse),
2834                has_incomplete_tool_calls: false,
2835            },
2836            StreamResult {
2837                content: vec![ContentBlock::text("done")],
2838                tool_calls: vec![],
2839                usage: None,
2840                stop_reason: Some(StopReason::EndTurn),
2841                has_incomplete_tool_calls: false,
2842            },
2843        ]));
2844        let saw_marker = Arc::new(std::sync::atomic::AtomicBool::new(false));
2845        let resolver = Arc::new(FixedResolver {
2846            agent: ResolvedAgent::new("agent", "m", "sys", llm)
2847                .with_tool(Arc::new(WriterTool))
2848                .with_tool(Arc::new(ReaderTool {
2849                    saw_marker: saw_marker.clone(),
2850                })),
2851            plugins: vec![Arc::new(SequentialVisibilityPlugin)],
2852        });
2853        let runtime = AgentRuntime::new(resolver);
2854        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
2855
2856        let result = runtime
2857            .run(
2858                RunRequest::new("thread-seq-visibility", vec![Message::user("go")])
2859                    .with_agent_id("agent"),
2860                sink.clone(),
2861            )
2862            .await
2863            .expect("run should succeed");
2864
2865        assert_eq!(
2866            result.termination,
2867            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
2868        );
2869        assert!(
2870            saw_marker.load(Ordering::SeqCst),
2871            "second tool should observe state written after first tool"
2872        );
2873    }
2874
2875    #[tokio::test]
2876    async fn checkpoint_persists_state_and_thread_together() {
2877        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
2878            content: vec![ContentBlock::text("ok")],
2879            tool_calls: vec![],
2880            usage: Some(awaken_contract::contract::inference::TokenUsage {
2881                prompt_tokens: Some(11),
2882                completion_tokens: Some(7),
2883                ..Default::default()
2884            }),
2885            stop_reason: Some(StopReason::EndTurn),
2886            has_incomplete_tool_calls: false,
2887        }]));
2888        let resolver = Arc::new(FixedResolver {
2889            agent: ResolvedAgent::new("agent", "m", "sys", llm),
2890            plugins: vec![],
2891        });
2892        let store = Arc::new(InMemoryStore::new());
2893        let runtime = AgentRuntime::new(resolver)
2894            .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>);
2895        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
2896
2897        let result = runtime
2898            .run(
2899                RunRequest::new("thread-tx", vec![Message::user("hi")]).with_agent_id("agent"),
2900                sink.clone(),
2901            )
2902            .await
2903            .expect("run should succeed");
2904        assert_eq!(
2905            result.termination,
2906            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
2907        );
2908
2909        let latest = store
2910            .latest_run("thread-tx")
2911            .await
2912            .expect("latest run lookup")
2913            .expect("run persisted");
2914        assert_eq!(latest.thread_id, "thread-tx");
2915        assert!(latest.state.is_some(), "state snapshot should be persisted");
2916        assert_eq!(latest.input_tokens, 11);
2917        assert_eq!(latest.output_tokens, 7);
2918
2919        let msgs = store
2920            .load_messages("thread-tx")
2921            .await
2922            .expect("load messages")
2923            .expect("thread should exist");
2924        assert!(!msgs.is_empty());
2925    }
2926
2927    #[tokio::test]
2928    async fn run_request_without_agent_id_prefers_latest_thread_state_agent() {
2929        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
2930            content: vec![ContentBlock::text("ok")],
2931            tool_calls: vec![],
2932            usage: None,
2933            stop_reason: Some(StopReason::EndTurn),
2934            has_incomplete_tool_calls: false,
2935        }]));
2936        let resolver = Arc::new(FixedResolver {
2937            agent: ResolvedAgent::new("agent", "m", "sys", llm),
2938            plugins: vec![],
2939        });
2940        let store = Arc::new(InMemoryStore::new());
2941
2942        let mut extensions = HashMap::new();
2943        extensions.insert(
2944            <ActiveAgentIdKey as StateKey>::KEY.to_string(),
2945            Value::String("agent-from-state".into()),
2946        );
2947        store
2948            .create_run(&seeded_run_record(
2949                "seed-1",
2950                "thread-infer-state",
2951                "agent-from-record",
2952                Some(PersistedState {
2953                    revision: 1,
2954                    extensions,
2955                }),
2956            ))
2957            .await
2958            .expect("seed run record");
2959
2960        let runtime = AgentRuntime::new(resolver)
2961            .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>);
2962        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
2963
2964        runtime
2965            .run(
2966                RunRequest::new("thread-infer-state", vec![Message::user("hi")]),
2967                sink.clone(),
2968            )
2969            .await
2970            .expect("run should succeed");
2971
2972        let latest = store
2973            .latest_run("thread-infer-state")
2974            .await
2975            .expect("latest run lookup")
2976            .expect("run persisted");
2977        assert_eq!(latest.agent_id, "agent-from-state");
2978    }
2979
2980    #[tokio::test]
2981    async fn run_request_without_agent_id_falls_back_to_latest_run_record_agent_id() {
2982        let llm = Arc::new(ScriptedLlm::new(vec![StreamResult {
2983            content: vec![ContentBlock::text("ok")],
2984            tool_calls: vec![],
2985            usage: None,
2986            stop_reason: Some(StopReason::EndTurn),
2987            has_incomplete_tool_calls: false,
2988        }]));
2989        let resolver = Arc::new(FixedResolver {
2990            agent: ResolvedAgent::new("agent", "m", "sys", llm),
2991            plugins: vec![],
2992        });
2993        let store = Arc::new(InMemoryStore::new());
2994
2995        store
2996            .create_run(&seeded_run_record(
2997                "seed-2",
2998                "thread-infer-record",
2999                "agent-from-record",
3000                None,
3001            ))
3002            .await
3003            .expect("seed run record");
3004
3005        let runtime = AgentRuntime::new(resolver)
3006            .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>);
3007        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
3008
3009        runtime
3010            .run(
3011                RunRequest::new("thread-infer-record", vec![Message::user("hi")]),
3012                sink.clone(),
3013            )
3014            .await
3015            .expect("run should succeed");
3016
3017        let latest = store
3018            .latest_run("thread-infer-record")
3019            .await
3020            .expect("latest run lookup")
3021            .expect("run persisted");
3022        assert_eq!(latest.agent_id, "agent-from-record");
3023    }
3024
3025    #[tokio::test]
3026    async fn thread_scoped_state_restores_before_run_start_hooks() {
3027        let llm = Arc::new(ScriptedLlm::new(vec![
3028            StreamResult {
3029                content: vec![ContentBlock::text("ok-1")],
3030                tool_calls: vec![],
3031                usage: None,
3032                stop_reason: Some(StopReason::EndTurn),
3033                has_incomplete_tool_calls: false,
3034            },
3035            StreamResult {
3036                content: vec![ContentBlock::text("ok-2")],
3037                tool_calls: vec![],
3038                usage: None,
3039                stop_reason: Some(StopReason::EndTurn),
3040                has_incomplete_tool_calls: false,
3041            },
3042        ]));
3043        let resolver = Arc::new(FixedResolver {
3044            agent: ResolvedAgent::new("agent", "m", "sys", llm),
3045            plugins: vec![Arc::new(ThreadCounterPlugin)],
3046        });
3047        let store = Arc::new(InMemoryStore::new());
3048        let runtime = AgentRuntime::new(resolver)
3049            .with_thread_run_store(store.clone() as Arc<dyn ThreadRunStore>);
3050        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
3051
3052        runtime
3053            .run(
3054                RunRequest::new("thread-counter", vec![Message::user("first")])
3055                    .with_agent_id("agent"),
3056                sink.clone(),
3057            )
3058            .await
3059            .expect("first run should succeed");
3060
3061        runtime
3062            .run(
3063                RunRequest::new("thread-counter", vec![Message::user("second")])
3064                    .with_agent_id("agent"),
3065                sink.clone(),
3066            )
3067            .await
3068            .expect("second run should succeed");
3069
3070        let runs = store
3071            .list_runs(&RunQuery {
3072                thread_id: Some("thread-counter".into()),
3073                ..RunQuery::default()
3074            })
3075            .await
3076            .expect("run list lookup");
3077
3078        let max_counter = runs
3079            .items
3080            .iter()
3081            .filter_map(|record| record.state.as_ref())
3082            .filter_map(|persisted| persisted.extensions.get(ThreadCounterKey::KEY))
3083            .filter_map(serde_json::Value::as_u64)
3084            .max()
3085            .expect("thread counter should be persisted");
3086        assert_eq!(max_counter, 2, "counter should continue across runs");
3087    }
3088
3089    // -----------------------------------------------------------------------
3090    // Truncation recovery tests
3091    // -----------------------------------------------------------------------
3092
    /// LLM executor that emits truncated tool call JSON on the first call,
    /// then a normal response on subsequent calls.
    struct TruncatingLlm {
        /// Number of `execute_stream` calls made so far; call 0 produces the
        /// truncated `MaxTokens` stream, later calls drain the follow-ups.
        call_count: AtomicUsize,
        /// Responses to return after the first (truncated) call.
        followup_responses: Mutex<Vec<StreamResult>>,
        /// `upstream_model` from every request, in call order, so tests can
        /// check that model overrides survive continuation retries.
        upstream_models_seen: Mutex<Vec<String>>,
    }
3101
3102    impl TruncatingLlm {
3103        fn new(followup_responses: Vec<StreamResult>) -> Self {
3104            Self {
3105                call_count: AtomicUsize::new(0),
3106                followup_responses: Mutex::new(followup_responses),
3107                upstream_models_seen: Mutex::new(Vec::new()),
3108            }
3109        }
3110    }
3111
    #[async_trait]
    impl LlmExecutor for TruncatingLlm {
        /// Non-streaming entry point is intentionally unimplemented: this
        /// double only supports the streaming path below.
        async fn execute(
            &self,
            _request: InferenceRequest,
        ) -> Result<StreamResult, InferenceExecutionError> {
            unreachable!("execute_stream is overridden");
        }

        /// First call: streams a tool call ("tc1") whose arguments JSON is
        /// cut off mid-object, then stops with `MaxTokens` — simulating an
        /// upstream truncation. Subsequent calls: replay the next queued
        /// follow-up, or a canned `EndTurn` reply when the queue is empty.
        /// Every call records `request.upstream_model` for later assertions.
        fn execute_stream(
            &self,
            request: InferenceRequest,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<
                        Output = Result<
                            awaken_contract::contract::executor::InferenceStream,
                            InferenceExecutionError,
                        >,
                    > + Send
                    + '_,
            >,
        > {
            use awaken_contract::contract::executor::{InferenceStream, LlmStreamEvent};
            use awaken_contract::contract::inference::TokenUsage;

            Box::pin(async move {
                // Track the model each request targeted, in call order.
                self.upstream_models_seen
                    .lock()
                    .expect("lock poisoned")
                    .push(request.upstream_model.clone());
                let n = self.call_count.fetch_add(1, Ordering::SeqCst);
                if n == 0 {
                    // First call: emit a tool call with truncated JSON, then MaxTokens
                    let events: Vec<Result<LlmStreamEvent, InferenceExecutionError>> = vec![
                        Ok(LlmStreamEvent::TextDelta("partial ".into())),
                        Ok(LlmStreamEvent::ToolCallStart {
                            id: "tc1".into(),
                            name: "calculator".into(),
                        }),
                        // Truncated JSON: missing closing brace
                        Ok(LlmStreamEvent::ToolCallDelta {
                            id: "tc1".into(),
                            args_delta: r#"{"expr": "1+1"#.into(),
                        }),
                        Ok(LlmStreamEvent::Usage(TokenUsage {
                            prompt_tokens: Some(50),
                            completion_tokens: Some(100),
                            ..Default::default()
                        })),
                        Ok(LlmStreamEvent::Stop(StopReason::MaxTokens)),
                    ];
                    Ok(Box::pin(futures::stream::iter(events)) as InferenceStream)
                } else {
                    // Subsequent calls: return from followup queue
                    let mut followups = self.followup_responses.lock().expect("lock poisoned");
                    let result = if followups.is_empty() {
                        StreamResult {
                            content: vec![ContentBlock::text("final response")],
                            tool_calls: vec![],
                            usage: None,
                            stop_reason: Some(StopReason::EndTurn),
                            has_incomplete_tool_calls: false,
                        }
                    } else {
                        followups.remove(0)
                    };
                    // Convert the collected result back into stream events so
                    // the caller sees a normal streamed response.
                    let events =
                        awaken_contract::contract::executor::collected_to_stream_events(result);
                    Ok(Box::pin(futures::stream::iter(events)) as InferenceStream)
                }
            })
        }

        fn name(&self) -> &str {
            "truncating"
        }
    }
3190
3191    #[tokio::test]
3192    async fn truncation_recovery_continues_on_max_tokens() {
3193        // First call returns MaxTokens with truncated tool call
3194        // Second call returns EndTurn with final text
3195        let llm = Arc::new(TruncatingLlm::new(vec![StreamResult {
3196            content: vec![ContentBlock::text("completed response")],
3197            tool_calls: vec![],
3198            usage: None,
3199            stop_reason: Some(StopReason::EndTurn),
3200            has_incomplete_tool_calls: false,
3201        }]));
3202        let resolver = Arc::new(FixedResolver {
3203            agent: ResolvedAgent::new("agent", "m", "sys", llm.clone())
3204                .with_max_continuation_retries(2),
3205            plugins: vec![],
3206        });
3207        let runtime = AgentRuntime::new(resolver);
3208        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
3209
3210        let result = runtime
3211            .run(
3212                RunRequest::new("thread-trunc", vec![Message::user("hi")]).with_agent_id("agent"),
3213                sink.clone(),
3214            )
3215            .await
3216            .expect("run should succeed");
3217
3218        assert_eq!(
3219            result.termination,
3220            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
3221        );
3222        // The final response should be from the second (continuation) call
3223        assert_eq!(result.response, "completed response");
3224        // Two calls total: truncated + continuation
3225        assert_eq!(llm.call_count.load(Ordering::SeqCst), 2);
3226    }
3227
3228    #[tokio::test]
3229    async fn text_truncation_recovery_continues_on_max_tokens() {
3230        let llm = Arc::new(ScriptedLlm::new(vec![
3231            StreamResult {
3232                content: vec![ContentBlock::text("partial ")],
3233                tool_calls: vec![],
3234                usage: None,
3235                stop_reason: Some(StopReason::MaxTokens),
3236                has_incomplete_tool_calls: false,
3237            },
3238            StreamResult {
3239                content: vec![ContentBlock::text("completed")],
3240                tool_calls: vec![],
3241                usage: None,
3242                stop_reason: Some(StopReason::EndTurn),
3243                has_incomplete_tool_calls: false,
3244            },
3245        ]));
3246        let resolver = Arc::new(FixedResolver {
3247            agent: ResolvedAgent::new("agent", "m", "sys", llm.clone())
3248                .with_max_continuation_retries(2),
3249            plugins: vec![],
3250        });
3251        let runtime = AgentRuntime::new(resolver);
3252        let sink = Arc::new(VecEventSink::new());
3253
3254        let result = runtime
3255            .run(
3256                RunRequest::new("thread-text-trunc", vec![Message::user("hi")])
3257                    .with_agent_id("agent"),
3258                sink.clone(),
3259            )
3260            .await
3261            .expect("run should succeed");
3262
3263        assert_eq!(
3264            result.termination,
3265            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
3266        );
3267        assert_eq!(result.response, "completed");
3268        assert_eq!(llm.seen_overrides.lock().expect("lock poisoned").len(), 2);
3269
3270        let text_deltas: Vec<String> = sink
3271            .events()
3272            .into_iter()
3273            .filter_map(|event| match event {
3274                AgentEvent::TextDelta { delta } => Some(delta),
3275                _ => None,
3276            })
3277            .collect();
3278        assert_eq!(text_deltas, vec!["partial ", "completed"]);
3279    }
3280
3281    #[tokio::test]
3282    async fn truncation_recovery_preserves_model_override() {
3283        let llm = Arc::new(TruncatingLlm::new(vec![StreamResult {
3284            content: vec![ContentBlock::text("completed response")],
3285            tool_calls: vec![],
3286            usage: None,
3287            stop_reason: Some(StopReason::EndTurn),
3288            has_incomplete_tool_calls: false,
3289        }]));
3290        let resolver = Arc::new(FixedResolver {
3291            agent: ResolvedAgent::new("agent", "base-model", "sys", llm.clone())
3292                .with_max_continuation_retries(2),
3293            plugins: vec![],
3294        });
3295        let runtime = AgentRuntime::new(resolver);
3296        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
3297
3298        let result = runtime
3299            .run(
3300                RunRequest::new("thread-trunc-override", vec![Message::user("hi")])
3301                    .with_agent_id("agent")
3302                    .with_overrides(InferenceOverride {
3303                        upstream_model: Some("override-model".into()),
3304                        ..Default::default()
3305                    }),
3306                sink,
3307            )
3308            .await
3309            .expect("run should succeed");
3310
3311        assert_eq!(
3312            result.termination,
3313            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
3314        );
3315        assert_eq!(
3316            llm.upstream_models_seen
3317                .lock()
3318                .expect("lock poisoned")
3319                .clone(),
3320            vec!["override-model".to_string(), "override-model".to_string()]
3321        );
3322    }
3323
3324    #[tokio::test]
3325    async fn truncation_recovery_gives_up_after_max_retries() {
3326        // All calls return MaxTokens with truncated tool calls
3327        // (the TruncatingLlm always returns truncated on first call,
3328        //  and we provide followups that are also truncated)
3329        struct AlwaysTruncatingLlm {
3330            call_count: AtomicUsize,
3331        }
3332
3333        #[async_trait]
3334        impl LlmExecutor for AlwaysTruncatingLlm {
3335            async fn execute(
3336                &self,
3337                _request: InferenceRequest,
3338            ) -> Result<StreamResult, InferenceExecutionError> {
3339                unreachable!("execute_stream is overridden");
3340            }
3341
3342            fn execute_stream(
3343                &self,
3344                _request: InferenceRequest,
3345            ) -> std::pin::Pin<
3346                Box<
3347                    dyn std::future::Future<
3348                            Output = Result<
3349                                awaken_contract::contract::executor::InferenceStream,
3350                                InferenceExecutionError,
3351                            >,
3352                        > + Send
3353                        + '_,
3354                >,
3355            > {
3356                use awaken_contract::contract::executor::{InferenceStream, LlmStreamEvent};
3357                use awaken_contract::contract::inference::TokenUsage;
3358
3359                Box::pin(async move {
3360                    self.call_count.fetch_add(1, Ordering::SeqCst);
3361                    // Always return truncated tool call
3362                    let events: Vec<Result<LlmStreamEvent, InferenceExecutionError>> = vec![
3363                        Ok(LlmStreamEvent::TextDelta("truncated ".into())),
3364                        Ok(LlmStreamEvent::ToolCallStart {
3365                            id: format!("tc{}", self.call_count.load(Ordering::SeqCst)),
3366                            name: "calculator".into(),
3367                        }),
3368                        Ok(LlmStreamEvent::ToolCallDelta {
3369                            id: format!("tc{}", self.call_count.load(Ordering::SeqCst)),
3370                            args_delta: r#"{"incomplete"#.into(),
3371                        }),
3372                        Ok(LlmStreamEvent::Usage(TokenUsage {
3373                            prompt_tokens: Some(50),
3374                            completion_tokens: Some(100),
3375                            ..Default::default()
3376                        })),
3377                        Ok(LlmStreamEvent::Stop(StopReason::MaxTokens)),
3378                    ];
3379                    Ok(Box::pin(futures::stream::iter(events)) as InferenceStream)
3380                })
3381            }
3382
3383            fn name(&self) -> &str {
3384                "always_truncating"
3385            }
3386        }
3387
3388        let llm = Arc::new(AlwaysTruncatingLlm {
3389            call_count: AtomicUsize::new(0),
3390        });
3391        let resolver = Arc::new(FixedResolver {
3392            agent: ResolvedAgent::new("agent", "m", "sys", llm.clone())
3393                .with_max_continuation_retries(2),
3394            plugins: vec![],
3395        });
3396        let runtime = AgentRuntime::new(resolver);
3397        let sink: Arc<dyn EventSink> = Arc::new(NullEventSink);
3398
3399        let result = runtime
3400            .run(
3401                RunRequest::new("thread-trunc-max", vec![Message::user("hi")])
3402                    .with_agent_id("agent"),
3403                sink.clone(),
3404            )
3405            .await
3406            .expect("run should succeed");
3407
3408        // Should give up after 1 initial + 2 retries = 3 calls total
3409        assert_eq!(llm.call_count.load(Ordering::SeqCst), 3);
3410        // After giving up, the result has no tools, so it ends naturally
3411        // with the text from the last truncated response
3412        assert_eq!(
3413            result.termination,
3414            awaken_contract::contract::lifecycle::TerminationReason::NaturalEnd
3415        );
3416        assert_eq!(result.response, "truncated ");
3417    }
3418
3419    // ── strip_unpaired_tool_calls tests ──────────────────────────────
3420
3421    mod strip_unpaired {
3422        use super::super::strip_unpaired_tool_calls;
3423        use awaken_contract::contract::message::{Message, Role, ToolCall};
3424
3425        fn assistant_with_calls(text: &str, call_ids: &[&str]) -> Message {
3426            let mut msg = Message::assistant(text);
3427            msg.tool_calls = Some(
3428                call_ids
3429                    .iter()
3430                    .map(|id| ToolCall {
3431                        id: id.to_string(),
3432                        name: "test_tool".into(),
3433                        arguments: serde_json::json!({}),
3434                    })
3435                    .collect(),
3436            );
3437            msg
3438        }
3439
3440        fn tool_response(call_id: &str) -> Message {
3441            Message::tool(call_id, "result")
3442        }
3443
3444        #[test]
3445        fn paired_calls_unchanged() {
3446            let mut msgs = vec![
3447                Message::user("hi"),
3448                assistant_with_calls("calling", &["tc1"]),
3449                tool_response("tc1"),
3450                Message::assistant("done"),
3451            ];
3452            let original_len = msgs.len();
3453            strip_unpaired_tool_calls(&mut msgs);
3454            assert_eq!(msgs.len(), original_len);
3455            // tc1 should still be present
3456            assert!(msgs[1].tool_calls.as_ref().unwrap().len() == 1);
3457        }
3458
3459        #[test]
3460        fn trailing_unpaired_calls_stripped() {
3461            let mut msgs = vec![
3462                Message::user("hi"),
3463                assistant_with_calls("calling", &["tc1", "tc2"]),
3464                tool_response("tc1"),
3465                // tc2 has no tool_response — should be stripped
3466            ];
3467            strip_unpaired_tool_calls(&mut msgs);
3468            let calls = msgs[1].tool_calls.as_ref().unwrap();
3469            assert_eq!(calls.len(), 1);
3470            assert_eq!(calls[0].id, "tc1");
3471        }
3472
3473        #[test]
3474        fn all_unpaired_removes_tool_calls_field() {
3475            let mut msgs = vec![
3476                Message::user("hi"),
3477                assistant_with_calls("", &["tc1"]),
3478                // no tool response at all
3479            ];
3480            strip_unpaired_tool_calls(&mut msgs);
3481            // Assistant message with no text and no tool calls should be removed
3482            assert_eq!(msgs.len(), 1);
3483            assert_eq!(msgs[0].role, Role::User);
3484        }
3485
3486        #[test]
3487        fn middle_paired_not_affected() {
3488            let mut msgs = vec![
3489                Message::user("first"),
3490                assistant_with_calls("first call", &["tc1"]),
3491                tool_response("tc1"),
3492                Message::user("second"),
3493                assistant_with_calls("", &["tc2"]),
3494                // tc2 has no response — stripped, then empty msg removed
3495            ];
3496            strip_unpaired_tool_calls(&mut msgs);
3497            // tc1 should still be intact
3498            assert_eq!(msgs[1].tool_calls.as_ref().unwrap().len(), 1);
3499            // tc2 stripped → empty assistant removed → 4 messages left
3500            assert_eq!(msgs.len(), 4); // user, assistant+tc1, tool, user
3501        }
3502
3503        #[test]
3504        fn no_tool_calls_is_noop() {
3505            let mut msgs = vec![Message::user("hi"), Message::assistant("hello")];
3506            strip_unpaired_tool_calls(&mut msgs);
3507            assert_eq!(msgs.len(), 2);
3508        }
3509
3510        #[test]
3511        fn empty_messages_is_noop() {
3512            let mut msgs: Vec<Message> = vec![];
3513            strip_unpaired_tool_calls(&mut msgs);
3514            assert!(msgs.is_empty());
3515        }
3516    }
3517}