Skip to main content

agentkit_loop/
lib.rs

1//! Runtime-agnostic agent loop orchestration for sessions, turns, tools, and interrupts.
2//!
3//! `agentkit-loop` is the central coordination layer in the agentkit workspace.  It
4//! drives a model through a multi-turn agentic loop, executing tool calls,
5//! respecting permission checks, surfacing approval interrupts to the host
6//! application, and optionally compacting the transcript when it grows too large.
7//!
8//! # Architecture
9//!
10//! The main entry point is [`Agent`], constructed via [`AgentBuilder`]. The
11//! builder optionally accepts the prior conversation transcript via
12//! [`AgentBuilder::transcript`] and the next user turn via
13//! [`AgentBuilder::input`] — both default to empty. Calling
14//! [`Agent::start`] with a [`SessionConfig`] returns a [`LoopDriver`] that
15//! yields [`LoopStep`]s — either a finished turn or an interrupt that
16//! requires host resolution before the loop can continue.
17//!
18//! If no input was preloaded, the first call to [`LoopDriver::next`] yields
19//! [`LoopInterrupt::AwaitingInput`] and the host supplies the first user
20//! turn via [`InputRequest::submit`]. If input was preloaded, the first
21//! `next()` dispatches the model directly — convenient for one-shot calls.
22//!
23//! ```text
24//! Agent::builder()
25//!     .model(adapter)              // ModelAdapter implementation
26//!     .add_tool_source(registry)   // ToolRegistry (or any ToolSource); call again to federate
27//!     .permissions(checker)        // PermissionChecker for gating tool use
28//!     .observer(obs)               // LoopObserver for streaming events
29//!     .transcript(prior)           // optional: passive prior transcript (system prompt, resumed session)
30//!     .input(first_user_turn)      // optional: preload next user turn so first next() drives a turn
31//!     .build()?
32//!     .start(config).await?  -> LoopDriver
33//!         .next().await?     -> LoopStep::Finished | LoopStep::Interrupt(...)
34//! ```
35//!
36//! # Example
37//!
38//! ```rust,no_run
39//! use agentkit_core::{Item, ItemKind};
40//! use agentkit_loop::{
41//!     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
42//! };
43//!
44//! # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
45//! // One-shot: preload system prompt and first user message; first next()
46//! // drives the model directly.
47//! let agent = Agent::builder()
48//!     .model(adapter)
49//!     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
50//!     .input(vec![Item::text(ItemKind::User, "Hello!")])
51//!     .build()?;
52//!
53//! let mut driver = agent
54//!     .start(SessionConfig::new("demo").with_cache(
55//!         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
56//!     ))
57//!     .await?;
58//!
59//! let _ = driver.next().await?;
60//! # Ok(())
61//! # }
62//! ```
63
64use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_core::{
68    CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
69    TextPart, Timestamp, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation,
70    Usage,
71};
72use agentkit_task_manager::{
73    PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
74    TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
75};
76#[cfg(test)]
77use agentkit_tools_core::ToolContext;
78use agentkit_tools_core::{
79    AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
80    PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutionScope, ToolExecutor, ToolRequest,
81    ToolResources, ToolSource, ToolSpec,
82};
83use async_trait::async_trait;
84use serde::{Deserialize, Serialize};
85use thiserror::Error;
86
87const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
88const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
89const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
90const USER_CANCELLED_REASON: &str = "user_cancelled";
91
92/// Configuration required to start a new model session.
93///
94/// Pass this to [`Agent::start`] to initialise the underlying [`ModelSession`]
95/// and obtain a [`LoopDriver`].
96///
97/// # Example
98///
99/// ```rust
100/// use agentkit_loop::{PromptCacheRequest, PromptCacheRetention, SessionConfig};
101///
102/// let config = SessionConfig::new("my-session").with_cache(
103///     PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
104/// );
105/// ```
106#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
107pub struct SessionConfig {
108    /// Unique identifier for the session.
109    pub session_id: SessionId,
110    /// Arbitrary key-value metadata forwarded to the model adapter.
111    pub metadata: MetadataMap,
112    /// Default provider-side prompt caching policy for turns in this session.
113    pub cache: Option<PromptCacheRequest>,
114}
115
116impl SessionConfig {
117    /// Builds a session config with empty metadata and no cache policy.
118    pub fn new(session_id: impl Into<SessionId>) -> Self {
119        Self {
120            session_id: session_id.into(),
121            metadata: MetadataMap::new(),
122            cache: None,
123        }
124    }
125
126    /// Replaces the session metadata map.
127    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
128        self.metadata = metadata;
129        self
130    }
131
132    /// Sets the default prompt cache request for the session.
133    pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
134        self.cache = Some(cache);
135        self
136    }
137
138    /// Clears any default prompt cache request for the session.
139    pub fn without_cache(mut self) -> Self {
140        self.cache = None;
141        self
142    }
143}
144
145/// Strength of a prompt-cache request.
146///
147/// `BestEffort` lets adapters ignore unsupported controls while still using
148/// any provider-native automatic caching they support. `Required` upgrades
149/// unsupported cache requests into provider errors.
150#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
151pub enum PromptCacheMode {
152    /// Disable prompt caching for this request.
153    Disabled,
154    /// Use caching when the provider can honor the request.
155    #[default]
156    BestEffort,
157    /// Fail the turn if the provider cannot honor the request.
158    Required,
159}
160
161/// High-level provider-neutral cache retention hint.
162///
163/// Providers map this to their native controls. For example, OpenAI maps
164/// `Short` to in-memory retention while OpenRouter Anthropic models map it to
165/// the default 5-minute ephemeral cache.
166#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
167pub enum PromptCacheRetention {
168    /// Use the provider's default cache retention.
169    Default,
170    /// Prefer the provider's short-lived cache retention mode.
171    Short,
172    /// Prefer the provider's longest generally available cache retention mode.
173    Extended,
174}
175
176/// Provider-neutral prompt caching strategy.
177#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
178pub enum PromptCacheStrategy {
179    /// Let the provider decide the cacheable prefix automatically.
180    #[default]
181    Automatic,
182    /// Apply explicit cache breakpoints to selected prefix boundaries.
183    Explicit {
184        /// Cache breakpoints in transcript/tool order.
185        breakpoints: Vec<PromptCacheBreakpoint>,
186    },
187}
188
189impl PromptCacheStrategy {
190    /// Uses the provider's native automatic cache behavior when available, or
191    /// any adapter-provided automatic planning fallback.
192    pub fn automatic() -> Self {
193        Self::Automatic
194    }
195
196    /// Uses explicit cache breakpoints.
197    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
198        Self::Explicit {
199            breakpoints: breakpoints.into_iter().collect(),
200        }
201    }
202}
203
204/// Prefix boundary that a provider should cache when using explicit caching.
205#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
206pub enum PromptCacheBreakpoint {
207    /// Cache the tool schema prefix through the last available tool.
208    ToolsEnd,
209    /// Cache through the end of the transcript item at `index`.
210    TranscriptItemEnd { index: usize },
211    /// Cache through the specific transcript part.
212    ///
213    /// Not every adapter can target every part precisely; unsupported
214    /// fine-grained breakpoints become best-effort no-ops unless the request is
215    /// marked [`PromptCacheMode::Required`].
216    TranscriptPartEnd {
217        item_index: usize,
218        part_index: usize,
219    },
220}
221
222impl PromptCacheBreakpoint {
223    /// Cache the tool schema prefix through the last available tool.
224    pub fn tools_end() -> Self {
225        Self::ToolsEnd
226    }
227
228    /// Cache through the end of a transcript item.
229    pub fn transcript_item_end(index: usize) -> Self {
230        Self::TranscriptItemEnd { index }
231    }
232
233    /// Cache through a specific part within a transcript item.
234    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
235        Self::TranscriptPartEnd {
236            item_index,
237            part_index,
238        }
239    }
240}
241
242/// Prompt caching request sent alongside a turn.
243#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
244pub struct PromptCacheRequest {
245    /// Strength of the caching request.
246    pub mode: PromptCacheMode,
247    /// Automatic or explicit caching strategy.
248    pub strategy: PromptCacheStrategy,
249    /// Optional provider-neutral retention hint.
250    pub retention: Option<PromptCacheRetention>,
251    /// Optional provider cache key or routing key.
252    pub key: Option<String>,
253}
254
255impl PromptCacheRequest {
256    /// Builds a best-effort automatic cache request.
257    pub fn automatic() -> Self {
258        Self::best_effort(PromptCacheStrategy::automatic())
259    }
260
261    /// Builds a required automatic cache request.
262    pub fn automatic_required() -> Self {
263        Self::required(PromptCacheStrategy::automatic())
264    }
265
266    /// Builds a best-effort explicit cache request.
267    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
268        Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
269    }
270
271    /// Builds a required explicit cache request.
272    pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
273        Self::required(PromptCacheStrategy::explicit(breakpoints))
274    }
275
276    /// Builds a disabled cache request.
277    pub fn disabled() -> Self {
278        Self {
279            mode: PromptCacheMode::Disabled,
280            strategy: PromptCacheStrategy::Automatic,
281            retention: None,
282            key: None,
283        }
284    }
285
286    /// Builds a best-effort cache request with the given strategy.
287    pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
288        Self {
289            mode: PromptCacheMode::BestEffort,
290            strategy,
291            retention: None,
292            key: None,
293        }
294    }
295
296    /// Builds a required cache request with the given strategy.
297    pub fn required(strategy: PromptCacheStrategy) -> Self {
298        Self {
299            mode: PromptCacheMode::Required,
300            strategy,
301            retention: None,
302            key: None,
303        }
304    }
305
306    /// Overrides the request mode.
307    pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
308        self.mode = mode;
309        self
310    }
311
312    /// Overrides the request strategy.
313    pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
314        self.strategy = strategy;
315        self
316    }
317
318    /// Applies a provider-neutral retention hint.
319    pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
320        self.retention = Some(retention);
321        self
322    }
323
324    /// Applies a provider cache key or routing key.
325    pub fn with_key(mut self, key: impl Into<String>) -> Self {
326        self.key = Some(key.into());
327        self
328    }
329
330    /// Clears any provider-neutral retention hint.
331    pub fn without_retention(mut self) -> Self {
332        self.retention = None;
333        self
334    }
335
336    /// Clears any provider cache key or routing key.
337    pub fn without_key(mut self) -> Self {
338        self.key = None;
339        self
340    }
341
342    /// Returns true when caching should be active for this request.
343    pub fn is_enabled(&self) -> bool {
344        !matches!(self.mode, PromptCacheMode::Disabled)
345    }
346}
347
348/// Payload sent to the model at the start of each turn.
349///
350/// The [`LoopDriver`] constructs this automatically from its internal state
351/// and passes it to [`ModelSession::begin_turn`].  Model adapter authors
352/// use the fields to build the provider-specific request.
353#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
354pub struct TurnRequest {
355    /// Session this turn belongs to.
356    pub session_id: SessionId,
357    /// Unique identifier for the current turn.
358    pub turn_id: agentkit_core::TurnId,
359    /// Full conversation transcript accumulated so far.
360    pub transcript: Vec<Item>,
361    /// Tool specifications the model may invoke during this turn.
362    pub available_tools: Vec<ToolSpec>,
363    /// Provider-side prompt caching request for this turn.
364    pub cache: Option<PromptCacheRequest>,
365    /// Per-turn metadata (e.g. provider hints).
366    pub metadata: MetadataMap,
367}
368
369/// Final result produced by a single model turn.
370///
371/// Returned inside [`ModelTurnEvent::Finished`] to signal that the model has
372/// completed its generation for this turn.
373#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
374pub struct ModelTurnResult {
375    /// Why the model stopped generating (e.g. completed, tool call, length).
376    pub finish_reason: FinishReason,
377    /// Items the model produced during this turn (text, tool calls, etc.).
378    pub output_items: Vec<Item>,
379    /// Token usage statistics, if available.
380    pub usage: Option<Usage>,
381    /// Provider-specific metadata about the turn.
382    pub metadata: MetadataMap,
383    /// Model identifier reported by the provider for this turn, if known.
384    ///
385    /// Stamped onto inference telemetry spans as `gen_ai.response.model`.
386    #[serde(default)]
387    pub model: Option<String>,
388    /// Provider-assigned response identifier for this turn, if known.
389    ///
390    /// Stamped onto inference telemetry spans as `gen_ai.response.id`.
391    #[serde(default)]
392    pub response_id: Option<String>,
393}
394
395/// Streaming event emitted by a [`ModelTurn`] during generation.
396///
397/// The [`LoopDriver`] consumes these events one-by-one via
398/// [`ModelTurn::next_event`] and translates them into [`AgentEvent`]s for
399/// observers and into transcript mutations.
400#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
401pub enum ModelTurnEvent {
402    /// Incremental text or content delta from the model.
403    Delta(Delta),
404    /// The model is requesting a tool call.
405    ToolCall(ToolCallPart),
406    /// Updated token usage statistics.
407    Usage(Usage),
408    /// The model has finished generating for this turn.
409    Finished(ModelTurnResult),
410}
411
412/// Factory for creating model sessions.
413///
414/// Implement this trait to integrate a model provider (e.g. OpenRouter,
415/// Anthropic, a local LLM server) with the agent loop.  [`Agent`] holds a
416/// single adapter and calls [`start_session`](ModelAdapter::start_session)
417/// once when [`Agent::start`] is invoked.
418///
419/// # Example
420///
421/// ```rust,no_run
422/// use agentkit_loop::{ModelAdapter, ModelSession, SessionConfig, LoopError};
423/// use async_trait::async_trait;
424///
425/// struct MyAdapter;
426///
427/// #[async_trait]
428/// impl ModelAdapter for MyAdapter {
429///     type Session = MySession;
430///
431///     async fn start_session(&self, config: SessionConfig) -> Result<MySession, LoopError> {
432///         // Initialize provider-specific session state here.
433///         Ok(MySession { /* ... */ })
434///     }
435/// }
436/// # struct MySession;
437/// # #[async_trait]
438/// # impl ModelSession for MySession {
439/// #     type Turn = MyTurn;
440/// #     async fn begin_turn(&mut self, _r: agentkit_loop::TurnRequest, _c: Option<agentkit_core::TurnCancellation>) -> Result<MyTurn, LoopError> { todo!() }
441/// # }
442/// # struct MyTurn;
443/// # #[async_trait]
444/// # impl agentkit_loop::ModelTurn for MyTurn {
445/// #     async fn next_event(&mut self, _c: Option<agentkit_core::TurnCancellation>) -> Result<Option<agentkit_loop::ModelTurnEvent>, LoopError> { todo!() }
446/// # }
447/// ```
448#[async_trait]
449pub trait ModelAdapter: Send + Sync {
450    /// The session type produced by this adapter.
451    type Session: ModelSession;
452
453    /// Create a new model session from the given configuration.
454    ///
455    /// # Errors
456    ///
457    /// Returns [`LoopError`] if the provider connection or initialisation fails.
458    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
459
460    /// Name of the underlying model provider, when known.
461    ///
462    /// Stamped onto agent telemetry spans as the `gen_ai.provider.name`
463    /// attribute from the OpenTelemetry GenAI semantic conventions. Use a
464    /// lowercase identifier (e.g. `openrouter`, `ollama`). The default
465    /// returns `None` for adapters without a meaningful provider identity.
466    fn provider_name(&self) -> Option<&str> {
467        None
468    }
469}
470
471/// An active model session that can produce sequential turns.
472///
473/// A session is created once per [`Agent::start`] call and lives for the
474/// lifetime of the [`LoopDriver`].  Each call to [`begin_turn`](ModelSession::begin_turn)
475/// hands the full transcript to the model and returns a streaming
476/// [`ModelTurn`].
477#[async_trait]
478pub trait ModelSession: Send {
479    /// The turn type produced by this session.
480    type Turn: ModelTurn;
481
482    /// Start a new turn, sending the transcript and available tools to the model.
483    ///
484    /// # Arguments
485    ///
486    /// * `request` -- the turn payload including transcript and tool specs.
487    /// * `cancellation` -- optional handle the implementation should poll to
488    ///   detect user-initiated cancellation.
489    ///
490    /// # Errors
491    ///
492    /// Returns [`LoopError::Cancelled`] when the turn is cancelled, or a
493    /// provider-specific error wrapped in [`LoopError`].
494    async fn begin_turn(
495        &mut self,
496        request: TurnRequest,
497        cancellation: Option<TurnCancellation>,
498    ) -> Result<Self::Turn, LoopError>;
499
500    /// Model identifier this session sends requests to, when known.
501    ///
502    /// Stamped onto inference telemetry spans as the `gen_ai.request.model`
503    /// attribute from the OpenTelemetry GenAI semantic conventions. The
504    /// default returns `None` for sessions without a fixed model.
505    fn model_name(&self) -> Option<&str> {
506        None
507    }
508}
509
510/// A streaming model turn that yields events one at a time.
511///
512/// The loop driver calls [`next_event`](ModelTurn::next_event) repeatedly
513/// until it returns `Ok(None)` (stream exhausted) or
514/// `Ok(Some(ModelTurnEvent::Finished(_)))`.
515#[async_trait]
516pub trait ModelTurn: Send {
517    /// Retrieve the next event from the model's response stream.
518    ///
519    /// Returns `Ok(None)` when the stream is exhausted.
520    ///
521    /// # Errors
522    ///
523    /// Returns [`LoopError::Cancelled`] if `cancellation` fires, or a
524    /// provider-specific error wrapped in [`LoopError`].
525    async fn next_event(
526        &mut self,
527        cancellation: Option<TurnCancellation>,
528    ) -> Result<Option<ModelTurnEvent>, LoopError>;
529}
530
531/// Observer hook for streaming agent events to the host application.
532///
533/// Register observers via [`AgentBuilder::observer`] to receive real-time
534/// notifications about deltas, tool calls, usage, warnings, and lifecycle
535/// events.
536///
537/// # Example
538///
539/// ```rust
540/// use agentkit_loop::{AgentEvent, LoopObserver};
541///
542/// struct StdoutObserver;
543///
544/// impl LoopObserver for StdoutObserver {
545///     fn handle_event(&self, event: AgentEvent) {
546///         println!("{event:?}");
547///     }
548/// }
549/// ```
550pub trait LoopObserver: Send + Sync {
551    /// Called synchronously for every [`AgentEvent`] emitted by the loop driver.
552    /// Observers store mutable state behind interior mutability (`Mutex`,
553    /// atomics, channels) so the driver can share an `Arc<dyn LoopObserver>`
554    /// across reusable [`Agent`] starts.
555    fn handle_event(&self, event: AgentEvent);
556}
557
558/// Receives full [`Item`]s as they are appended to the driver's transcript.
559///
560/// While [`LoopObserver`] surfaces operational events (deltas, tool calls,
561/// lifecycle, telemetry), it can't be reconstructed back into a faithful
562/// transcript on its own — content deltas span partial parts and don't
563/// carry their parent-Item identity, and historically tool results were
564/// pushed into the transcript with no observer event at all. A
565/// `TranscriptObserver` is the loss-free counterpart: it fires once per
566/// [`Item`] appended, with the full Item shape ready for persistence,
567/// replication, or audit.
568///
569/// Observers are called *synchronously* from inside the driver, in the
570/// same order items land in the transcript. Compaction-driven transcript
571/// rewrites do **not** fire `on_item_appended` — those are signaled by
572/// [`AgentEvent::CompactionFinished`] instead.
573///
574/// Register via [`AgentBuilder::transcript_observer`]; multiple observers
575/// may be registered and are called in registration order.
576///
577/// # Example
578///
579/// ```rust
580/// use agentkit_core::Item;
581/// use agentkit_loop::TranscriptObserver;
582/// use std::sync::atomic::{AtomicUsize, Ordering};
583///
584/// struct CountingObserver { items: AtomicUsize }
585///
586/// impl TranscriptObserver for CountingObserver {
587///     fn on_item_appended(&self, _item: &Item) {
588///         self.items.fetch_add(1, Ordering::Relaxed);
589///     }
590/// }
591/// ```
592pub trait TranscriptObserver: Send + Sync {
593    /// Called synchronously every time an [`Item`] is appended to the
594    /// driver's transcript, in transcript order. Observers store mutable
595    /// state behind interior mutability so the driver can share an
596    /// `Arc<dyn TranscriptObserver>`.
597    fn on_item_appended(&self, item: &Item);
598}
599
600/// Where in the loop a [`LoopMutator`] is given a chance to modify the
601/// transcript. Mutators run synchronously at these points; mid-stream
602/// mutation (e.g. between content deltas) is intentionally not supported
603/// because the assistant item is not yet fully constructed.
604#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
605#[non_exhaustive]
606pub enum MutationPoint {
607    /// A tool result has just been appended; the next loop step will be
608    /// another inference call.
609    AfterToolResult,
610    /// A turn has fully ended (assistant final, interrupt, or cancellation)
611    /// and any new user input has not yet been dispatched.
612    AfterTurnEnded,
613}
614
615/// Sink for emitting [`AgentEvent`]s from inside a [`LoopMutator`].
616/// The driver supplies a concrete implementation via [`LoopCtx::emitter`].
617pub trait EventEmitter: Send + Sync {
618    /// Forward `event` to all registered observers.
619    fn emit(&self, event: AgentEvent);
620}
621
622/// Read-only context handed to a [`LoopMutator`] alongside the cursor.
623#[non_exhaustive]
624pub struct LoopCtx<'a> {
625    /// Session this mutation point belongs to.
626    pub session_id: &'a SessionId,
627    /// Turn the mutation is associated with, if any.
628    pub turn_id: Option<&'a agentkit_core::TurnId>,
629    /// Where in the loop the mutator is running.
630    pub point: MutationPoint,
631    /// Cancellation handle for the active turn, if any.
632    pub cancellation: Option<TurnCancellation>,
633    /// Sink for emitting events from the mutator (telemetry, progress).
634    pub emitter: &'a dyn EventEmitter,
635}
636
637/// Mutable handle over the live transcript with dirty tracking.
638///
639/// Implements [`Deref`](std::ops::Deref)/[`DerefMut`](std::ops::DerefMut) to
640/// `Vec<Item>` so mutators read and write through `Vec`'s native API
641/// (`push`, `retain`, `iter`, `*cursor = ...`). Any `&mut` access marks the
642/// cursor dirty; the loop validates transcript invariants when at least one
643/// mutator dirtied the transcript and hard-fails on protocol violations.
644pub struct TranscriptCursor<'a> {
645    items: &'a mut Vec<Item>,
646    pub(crate) dirty: bool,
647}
648
649impl<'a> std::ops::Deref for TranscriptCursor<'a> {
650    type Target = Vec<Item>;
651    fn deref(&self) -> &Vec<Item> {
652        self.items
653    }
654}
655
656impl<'a> std::ops::DerefMut for TranscriptCursor<'a> {
657    fn deref_mut(&mut self) -> &mut Vec<Item> {
658        self.dirty = true;
659        self.items
660    }
661}
662
663/// Async transcript mutator. Registered via [`AgentBuilder::mutator`] and
664/// invoked at each [`MutationPoint`]. Mutators own their derived state
665/// (e.g. running token totals via interior mutability) and decide for
666/// themselves whether and how to modify the transcript.
667///
668/// The default implementation is a no-op so trait users override only
669/// `mutate`.
670#[async_trait]
671pub trait LoopMutator: Send + Sync {
672    /// Run this mutator. Returning without writing to `cursor` is a no-op.
673    /// Errors abort the loop; protocol-violating mutations (orphaned tool
674    /// uses or results) are detected by validation and turned into
675    /// [`LoopError::Mutator`].
676    async fn mutate(
677        &self,
678        cursor: &mut TranscriptCursor<'_>,
679        ctx: LoopCtx<'_>,
680    ) -> Result<(), LoopError> {
681        let _ = (cursor, ctx);
682        Ok(())
683    }
684}
685
686/// Lifecycle and streaming events emitted by the [`LoopDriver`].
687///
688/// Observers (see [`LoopObserver`]) receive these events in the order they
689/// occur.  They are useful for building UIs, logging, or telemetry.
690#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
691pub enum AgentEvent {
692    /// The agent run has been initialised.
693    RunStarted { session_id: SessionId },
694    /// A new model turn is starting.
695    TurnStarted {
696        session_id: SessionId,
697        turn_id: agentkit_core::TurnId,
698    },
699    /// User input has been accepted into the pending queue.
700    InputAccepted {
701        session_id: SessionId,
702        items: Vec<Item>,
703    },
704    /// Incremental content delta from the model.
705    ContentDelta(Delta),
706    /// The model has requested a tool call.
707    ToolCallRequested(ToolCallPart),
708    /// A tool call's result has landed in the transcript.
709    ///
710    /// Fires once per [`Part::ToolResult`] that's appended, including the
711    /// synthetic placeholder that's pushed when a tool is detached to the
712    /// background — the real result fires a second event with the same
713    /// [`ToolResultPart::call_id`] once the background task completes.
714    /// Cancellation/denial paths (auth cancelled, approval denied) also
715    /// emit this with `is_error = true`.
716    ///
717    /// Correlate with the matching [`AgentEvent::ToolCallRequested`] via
718    /// `call_id`.
719    ToolResultReceived(ToolResultPart),
720    /// A tool call requires explicit user approval before execution.
721    ApprovalRequired(ApprovalRequest),
722    /// An approval interrupt has been resolved.
723    ApprovalResolved { approved: bool },
724    /// The available tool catalog changed and will be reflected on the next model request.
725    ToolCatalogChanged(ToolCatalogEvent),
726    /// A [`LoopMutator`] is about to run at one of the mutation points.
727    /// `mutator` is a stable label the implementation chooses for itself.
728    MutationStarted {
729        session_id: SessionId,
730        turn_id: Option<agentkit_core::TurnId>,
731        mutator: String,
732        point: MutationPoint,
733    },
734    /// A [`LoopMutator`] has finished running. `dirty` indicates whether the
735    /// transcript was modified; `metadata` carries mutator-specific extras
736    /// (e.g. compaction reason, replaced item count).
737    MutationFinished {
738        session_id: SessionId,
739        turn_id: Option<agentkit_core::TurnId>,
740        mutator: String,
741        dirty: bool,
742        metadata: MetadataMap,
743    },
744    /// Updated token usage statistics.
745    UsageUpdated(Usage),
746    /// Non-fatal warning (e.g. a tool failure that was recovered from).
747    Warning { message: String },
748    /// The agent run has failed with an unrecoverable error.
749    RunFailed { message: String },
750    /// A turn has finished (successfully, via cancellation, etc.).
751    TurnFinished(TurnResult),
752}
753
754/// Handle for a pending approval interrupt.
755///
756/// Wraps an [`ApprovalRequest`] and provides ergonomic resolution methods
757/// so callers can resolve the interrupt directly instead of searching for
758/// the matching method on [`LoopDriver`].
759///
760/// # Example
761///
762/// ```rust,no_run
763/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
764/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
765/// match driver.next().await? {
766///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
767///         println!("Needs approval: {}", pending.request.summary);
768///         pending.approve(driver)?;
769///     }
770///     _ => {}
771/// }
772/// # Ok(())
773/// # }
774/// ```
775#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
776pub struct PendingApproval {
777    /// The underlying approval request details.
778    pub request: ApprovalRequest,
779}
780
781impl std::ops::Deref for PendingApproval {
782    type Target = ApprovalRequest;
783    fn deref(&self) -> &ApprovalRequest {
784        &self.request
785    }
786}
787
788impl PendingApproval {
789    /// Approve the pending tool call.
790    pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
791        let call_id = self
792            .request
793            .call_id
794            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
795        driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
796    }
797
798    /// Deny the pending tool call.
799    pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
800        let call_id = self
801            .request
802            .call_id
803            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
804        driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
805    }
806
807    /// Deny the pending tool call with a reason.
808    pub fn deny_with_reason<S: ModelSession>(
809        self,
810        driver: &mut LoopDriver<S>,
811        reason: impl Into<String>,
812    ) -> Result<(), LoopError> {
813        let call_id = self
814            .request
815            .call_id
816            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
817        driver.resolve_approval_for(
818            call_id,
819            ApprovalDecision::Deny {
820                reason: Some(reason.into()),
821            },
822        )
823    }
824
825    /// Approve the pending tool call with a patched input.
826    ///
827    /// The model's original tool input is replaced with `input` before the
828    /// tool executes. The transcript still records the call as the model
829    /// emitted it; only the executor sees the patched payload. This mirrors
830    /// the `PermissionResultAllow(updated_input=...)` pattern from the
831    /// Anthropic Agent SDK and is intended for hosts that want to sanitise,
832    /// restrict, or augment arguments before tool execution without forcing
833    /// the model to re-issue the call.
834    pub fn approve_with_patched_input<S: ModelSession>(
835        self,
836        driver: &mut LoopDriver<S>,
837        input: serde_json::Value,
838    ) -> Result<(), LoopError> {
839        let call_id = self
840            .request
841            .call_id
842            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
843        driver.resolve_approval_for_with_patched_input(call_id, input)
844    }
845}
846
847/// Descriptor for a [`LoopInterrupt::AwaitingInput`] interrupt.
848///
849/// Returned when the driver has no pending input and needs the host to
850/// supply items before advancing. This is the entry point for every user
851/// turn that wasn't preloaded via [`AgentBuilder::input`]. Transcript items
852/// loaded via [`AgentBuilder::transcript`] are passive, so when no input is
853/// preloaded the first [`LoopDriver::next`] call surfaces `AwaitingInput`
854/// and the host injects the first user message via [`InputRequest::submit`].
855///
856/// # Example
857///
858/// ```rust,no_run
859/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
860/// # use agentkit_core::Item;
861/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>, items: Vec<Item>) -> Result<(), agentkit_loop::LoopError> {
862/// match driver.next().await? {
863///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
864///         pending.submit(driver, items)?;
865///     }
866///     _ => {}
867/// }
868/// # Ok(())
869/// # }
870/// ```
871#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
872pub struct InputRequest {
873    /// The session that is waiting for input.
874    pub session_id: SessionId,
875    /// Human-readable explanation of why input is needed.
876    pub reason: String,
877}
878
879impl InputRequest {
880    /// Submit input items to the driver.
881    pub fn submit<S: ModelSession>(
882        self,
883        driver: &mut LoopDriver<S>,
884        items: Vec<Item>,
885    ) -> Result<(), LoopError> {
886        driver.submit_input(items)
887    }
888}
889
890/// Outcome of a completed (or cancelled) turn.
891///
892/// Wrapped by [`LoopStep::Finished`] and also emitted as
893/// [`AgentEvent::TurnFinished`] to observers.
894#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
895pub struct TurnResult {
896    /// Identifier for the turn that produced this result.
897    pub turn_id: agentkit_core::TurnId,
898    /// Why the turn ended (completed, tool call, cancelled, etc.).
899    pub finish_reason: FinishReason,
900    /// Items produced during this turn (assistant text, tool results, etc.).
901    pub items: Vec<Item>,
902    /// Aggregated token usage, if reported by the model.
903    pub usage: Option<Usage>,
904    /// Additional metadata about the turn.
905    pub metadata: MetadataMap,
906}
907
908/// An interrupt that pauses the agent loop until the host resolves it.
909///
910/// The loop returns an interrupt inside [`LoopStep::Interrupt`] whenever it
911/// cannot proceed autonomously.  Each variant carries a handle with
912/// resolution methods so callers can resolve the interrupt directly.
913///
914/// # Example
915///
916/// ```rust,no_run
917/// use agentkit_loop::{LoopInterrupt, LoopStep};
918/// # use agentkit_loop::LoopDriver;
919///
920/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
921/// match driver.next().await? {
922///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
923///         println!("Tool {} needs approval: {}", pending.request.request_kind, pending.request.summary);
924///         pending.approve(driver)?;
925///     }
926///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
927///         println!("Waiting for input: {}", pending.reason);
928///         // ... call pending.submit(driver, items)
929///     }
930///     LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => {
931///         // Cooperative yield between tool rounds.  Optionally call
932///         // driver.submit_input(...) to interject a user message, then
933///         // call driver.next() to resume the turn.
934///         let _ = info;
935///     }
936///     LoopStep::Finished(result) => {
937///         println!("Turn finished: {:?}", result.finish_reason);
938///     }
939/// }
940/// # Ok(())
941/// # }
942/// ```
943#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
944pub enum LoopInterrupt {
945    /// A tool call requires explicit approval before it can execute.
946    ApprovalRequest(PendingApproval),
947    /// The driver has no pending input and needs the host to supply some.
948    AwaitingInput(InputRequest),
949    /// A tool round finished: all tool calls from the previous assistant
950    /// message now have results in the transcript, and the driver is about to
951    /// invoke the model again. The host may interject user messages via the
952    /// [`ToolRoundInfo::submit`] handle before calling [`LoopDriver::next`]
953    /// to resume.
954    ///
955    /// This is a non-blocking interrupt: callers that do not care about
956    /// mid-turn interjection can treat it as a no-op (`_ => continue`) and
957    /// the next `next()` call resumes the turn.
958    AfterToolResult(ToolRoundInfo),
959}
960
961impl LoopInterrupt {
962    /// Returns `true` if the interrupt must be explicitly resolved before
963    /// the loop can make progress. Approvals are blocking;
964    /// [`AwaitingInput`](LoopInterrupt::AwaitingInput) and
965    /// [`AfterToolResult`](LoopInterrupt::AfterToolResult) are cooperative
966    /// and can be ignored by calling [`LoopDriver::next`] again.
967    pub fn is_blocking(&self) -> bool {
968        matches!(self, LoopInterrupt::ApprovalRequest(_))
969    }
970}
971
972/// Metadata describing a completed tool round, surfaced via
973/// [`LoopInterrupt::AfterToolResult`].
974#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
975pub struct ToolRoundInfo {
976    /// The session that produced this tool round.
977    pub session_id: SessionId,
978    /// The turn that is about to continue into the next model call.
979    pub turn_id: agentkit_core::TurnId,
980    /// Transcript length at the yield point (for snapshots / UIs).
981    pub transcript_len: usize,
982}
983
984impl ToolRoundInfo {
985    /// Interject user input between tool rounds. Consumes the
986    /// [`ToolRoundInfo`] handle so the same yield cannot accept input twice.
987    pub fn submit<S: ModelSession>(
988        self,
989        driver: &mut LoopDriver<S>,
990        items: Vec<Item>,
991    ) -> Result<(), LoopError> {
992        driver.submit_input(items)
993    }
994}
995
996/// The result of advancing the agent loop by one step.
997///
998/// Returned by [`LoopDriver::next`].  The host should pattern-match on this
999/// to decide whether to continue the loop or resolve an interrupt first.
1000///
1001/// # Example
1002///
1003/// ```rust,no_run
1004/// use agentkit_loop::LoopStep;
1005/// # use agentkit_loop::LoopDriver;
1006///
1007/// # async fn run<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1008/// loop {
1009///     match driver.next().await? {
1010///         LoopStep::Finished(result) => {
1011///             println!("Turn complete: {:?}", result.finish_reason);
1012///             break;
1013///         }
1014///         LoopStep::Interrupt(interrupt) => {
1015///             // Resolve the interrupt, then continue the loop.
1016///             # break;
1017///         }
1018///     }
1019/// }
1020/// # Ok(())
1021/// # }
1022/// ```
1023#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1024pub enum LoopStep {
1025    /// The loop is paused and requires host action.
1026    Interrupt(LoopInterrupt),
1027    /// A turn has completed (or been cancelled).
1028    Finished(TurnResult),
1029}
1030
1031/// A read-only snapshot of the loop driver's current state.
1032///
1033/// Obtained via [`LoopDriver::snapshot`].  Useful for persisting or
1034/// inspecting the conversation transcript without holding a mutable
1035/// reference to the driver.
1036#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1037pub struct LoopSnapshot {
1038    /// Session identifier.
1039    pub session_id: SessionId,
1040    /// The full transcript accumulated so far.
1041    pub transcript: Vec<Item>,
1042    /// Input items queued but not yet consumed by a turn.
1043    pub pending_input: Vec<Item>,
1044}
1045
1046#[derive(Clone, Debug)]
1047struct PendingApprovalToolCall {
1048    request: ApprovalRequest,
1049    decision: Option<ApprovalDecision>,
1050    surfaced: bool,
1051    turn_id: agentkit_core::TurnId,
1052    task_id: TaskId,
1053    call: ToolCallPart,
1054    tool_request: ToolRequest,
1055}
1056
1057#[derive(Clone, Debug, Default)]
1058struct ActiveToolRound {
1059    turn_id: agentkit_core::TurnId,
1060    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
1061    background_pending: bool,
1062    foreground_progressed: bool,
1063}
1064
1065/// A configured agent ready to start a session.
1066///
1067/// Build one with [`Agent::builder`], supplying at minimum a [`ModelAdapter`].
1068/// Optionally preload prior conversation state via
1069/// [`AgentBuilder::transcript`] and the next user turn via
1070/// [`AgentBuilder::input`]. Then call [`Agent::start`] with a
1071/// [`SessionConfig`] to obtain a [`LoopDriver`] that drives the agentic loop.
1072///
1073/// If no input is preloaded, the first call to [`LoopDriver::next`] yields
1074/// [`LoopInterrupt::AwaitingInput`] so the host can supply the first user
1075/// message via [`InputRequest::submit`]. If input was preloaded, the first
1076/// `next()` dispatches the model directly.
1077///
1078/// # Example
1079///
1080/// ```rust,no_run
1081/// use agentkit_core::{Item, ItemKind};
1082/// use agentkit_loop::{
1083///     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
1084/// };
1085/// use agentkit_tools_core::ToolRegistry;
1086///
1087/// # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
1088/// let agent = Agent::builder()
1089///     .model(adapter)
1090///     .add_tool_source(ToolRegistry::new())
1091///     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
1092///     .input(vec![Item::text(ItemKind::User, "Hello!")])
1093///     .build()?;
1094///
1095/// let mut driver = agent
1096///     .start(SessionConfig::new("s1").with_cache(
1097///         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
1098///     ))
1099///     .await?;
1100///
1101/// // First next() drives the model since input was preloaded.
1102/// let _ = driver.next().await?;
1103/// # Ok(())
1104/// # }
1105/// ```
1106pub struct Agent<M>
1107where
1108    M: ModelAdapter,
1109{
1110    model: M,
1111    tool_sources: Vec<Arc<dyn ToolSource>>,
1112    tool_executor: Option<Arc<dyn ToolExecutor>>,
1113    task_manager: Arc<dyn TaskManager>,
1114    permissions: Arc<dyn PermissionChecker>,
1115    resources: Arc<dyn ToolResources>,
1116    cancellation: Option<CancellationHandle>,
1117    mutators: Vec<Arc<dyn LoopMutator>>,
1118    observers: Vec<Arc<dyn LoopObserver>>,
1119    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1120    transcript: Vec<Item>,
1121    input: Vec<Item>,
1122}
1123
1124impl<M> Agent<M>
1125where
1126    M: ModelAdapter,
1127{
1128    /// Create a new [`AgentBuilder`] for configuring this agent.
1129    pub fn builder() -> AgentBuilder<M> {
1130        AgentBuilder::default()
1131    }
1132
1133    /// Start a session, returning a [`LoopDriver`] preloaded with whatever
1134    /// transcript and input were configured on the builder. See
1135    /// [`AgentBuilder::transcript`] and [`AgentBuilder::input`] for what each
1136    /// one does and when to use them.
1137    ///
1138    /// This calls [`ModelAdapter::start_session`] and emits an
1139    /// [`AgentEvent::RunStarted`] event to all registered observers.
1140    ///
1141    /// `&self` so a single configured agent can mint multiple sessions over
1142    /// its lifetime — e.g. an outer agent that uses an inner sub-agent for
1143    /// transcript compaction.
1144    ///
1145    /// # Errors
1146    ///
1147    /// Returns [`LoopError`] if the model adapter fails to create a session.
1148    pub async fn start(&self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1149        let session_id = config.session_id.clone();
1150        let default_cache = config.cache.clone();
1151        let session = self.model.start_session(config).await?;
1152        let tool_executor = self
1153            .tool_executor
1154            .clone()
1155            .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1156        let driver = LoopDriver {
1157            session_id: session_id.clone(),
1158            provider_name: self.model.provider_name().map(str::to_owned),
1159            default_cache,
1160            next_turn_cache: None,
1161            session: Some(session),
1162            tool_executor,
1163            task_manager: self.task_manager.clone(),
1164            permissions: self.permissions.clone(),
1165            resources: self.resources.clone(),
1166            cancellation: self.cancellation.clone(),
1167            mutators: self.mutators.clone(),
1168            observers: self.observers.clone(),
1169            transcript_observers: self.transcript_observers.clone(),
1170            transcript: self.transcript.clone(),
1171            pending_input: self.input.clone(),
1172            pending_approvals: BTreeMap::new(),
1173            pending_approval_order: VecDeque::new(),
1174            active_tool_round: None,
1175            pending_round_resume: None,
1176            next_turn_index: 1,
1177            detached_call_ids: HashSet::new(),
1178        };
1179        driver.emit(AgentEvent::RunStarted { session_id });
1180        Ok(driver)
1181    }
1182}
1183
1184/// Builder for constructing an [`Agent`].
1185///
1186/// Obtained via [`Agent::builder`].  The only required field is
1187/// [`model`](AgentBuilder::model); all others have sensible defaults
1188/// (no tools, allow-all permissions, no compaction, no observers).
1189pub struct AgentBuilder<M>
1190where
1191    M: ModelAdapter,
1192{
1193    model: Option<M>,
1194    tool_sources: Vec<Arc<dyn ToolSource>>,
1195    tool_executor: Option<Arc<dyn ToolExecutor>>,
1196    task_manager: Option<Arc<dyn TaskManager>>,
1197    permissions: Arc<dyn PermissionChecker>,
1198    resources: Arc<dyn ToolResources>,
1199    cancellation: Option<CancellationHandle>,
1200    mutators: Vec<Arc<dyn LoopMutator>>,
1201    observers: Vec<Arc<dyn LoopObserver>>,
1202    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1203    transcript: Vec<Item>,
1204    input: Vec<Item>,
1205}
1206
1207impl<M> Default for AgentBuilder<M>
1208where
1209    M: ModelAdapter,
1210{
1211    fn default() -> Self {
1212        Self {
1213            model: None,
1214            tool_sources: Vec::new(),
1215            tool_executor: None,
1216            task_manager: None,
1217            permissions: Arc::new(AllowAllPermissions),
1218            resources: Arc::new(()),
1219            cancellation: None,
1220            mutators: Vec::new(),
1221            observers: Vec::new(),
1222            transcript_observers: Vec::new(),
1223            transcript: Vec::new(),
1224            input: Vec::new(),
1225        }
1226    }
1227}
1228
1229impl<M> AgentBuilder<M>
1230where
1231    M: ModelAdapter,
1232{
1233    /// Set the model adapter (required).
1234    pub fn model(mut self, model: M) -> Self {
1235        self.model = Some(model);
1236        self
1237    }
1238
1239    /// Adds a tool source to the agent. Call multiple times to compose
1240    /// federated sources — for example a frozen native [`ToolRegistry`]
1241    /// alongside an MCP manager's [`agentkit_tools_core::CatalogReader`]
1242    /// and a skill-watcher reader. Sources are walked in registration
1243    /// order; the default [`agentkit_tools_core::CollisionPolicy`] is
1244    /// `FirstWins`.
1245    ///
1246    /// Accepts any sized [`ToolSource`]; the agent owns it for the
1247    /// session. To share a dynamic source between the agent and the
1248    /// subsystem mutating it, mint a [`agentkit_tools_core::CatalogReader`]
1249    /// from a [`agentkit_tools_core::dynamic_catalog`] pair — the reader
1250    /// is sized and owned, hosts never see the underlying `Arc`.
1251    pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1252        self.tool_sources.push(Arc::new(source));
1253        self
1254    }
1255
1256    /// Set a custom [`ToolExecutor`]. When provided, the agent uses it
1257    /// instead of building a [`BasicToolExecutor`] from the configured
1258    /// sources. Most hosts should use [`add_tool_source`](Self::add_tool_source)
1259    /// instead; this is for advanced cases (custom routing, instrumentation,
1260    /// test fakes).
1261    pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1262        self.tool_executor = Some(Arc::new(executor));
1263        self
1264    }
1265
1266    /// Set the task manager that schedules tool-call execution.
1267    ///
1268    /// Defaults to [`SimpleTaskManager`], which preserves the existing
1269    /// sequential request/response behavior.
1270    pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1271        self.task_manager = Some(Arc::new(manager));
1272        self
1273    }
1274
1275    /// Set the permission checker that gates tool execution.
1276    ///
1277    /// Defaults to allowing all tool calls without prompting.
1278    pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1279        self.permissions = Arc::new(permissions);
1280        self
1281    }
1282
1283    /// Set shared resources available to tool implementations.
1284    pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1285        self.resources = Arc::new(resources);
1286        self
1287    }
1288
1289    /// Attach a [`CancellationHandle`] for cooperative cancellation of turns.
1290    pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1291        self.cancellation = Some(handle);
1292        self
1293    }
1294
1295    /// Register a [`LoopMutator`] that runs at every [`MutationPoint`].
1296    ///
1297    /// Multiple mutators may be registered; they run in registration order
1298    /// and the dirty flag propagates across the pipeline. After every pass
1299    /// in which any mutator dirtied the transcript, the loop validates
1300    /// protocol invariants (tool_use/tool_result pairing); a violation is a
1301    /// hard [`LoopError::Mutator`] failure.
1302    pub fn mutator<L: LoopMutator + 'static>(mut self, mutator: L) -> Self {
1303        self.mutators.push(Arc::new(mutator));
1304        self
1305    }
1306
1307    /// Register a [`LoopObserver`] that receives [`AgentEvent`]s.
1308    ///
1309    /// Multiple observers may be registered; they are called in order.
1310    pub fn observer<O: LoopObserver + 'static>(mut self, observer: O) -> Self {
1311        self.observers.push(Arc::new(observer));
1312        self
1313    }
1314
1315    /// Register a [`TranscriptObserver`] that receives an [`Item`] every
1316    /// time one is appended to the transcript.
1317    ///
1318    /// Multiple observers may be registered; they are called in order.
1319    /// Use this when you need a loss-free view of the transcript (e.g.
1320    /// for persistence or replication) — [`LoopObserver`] alone is
1321    /// insufficient because it doesn't expose item boundaries for model
1322    /// output and historically did not surface tool results at all.
1323    pub fn transcript_observer<O: TranscriptObserver + 'static>(mut self, observer: O) -> Self {
1324        self.transcript_observers.push(Arc::new(observer));
1325        self
1326    }
1327
1328    /// Preload the driver's transcript with prior conversation state
1329    /// (defaults to empty).
1330    ///
1331    /// Items pass straight into the driver's transcript without firing
1332    /// [`TranscriptObserver::on_item_appended`] — the host is expected to
1333    /// already know about (and have persisted) anything it preloads. Use
1334    /// this for resumed sessions or to seed a system prompt.
1335    pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1336        self.transcript = transcript;
1337        self
1338    }
1339
1340    /// Preload the driver's pending-input queue with the next user turn
1341    /// (defaults to empty).
1342    ///
1343    /// When non-empty, the first [`LoopDriver::next`] dispatches the model
1344    /// directly instead of yielding [`LoopInterrupt::AwaitingInput`]. Use
1345    /// this for one-shot calls and scripts where the first user turn is
1346    /// known up front. Items move to the transcript on turn dispatch the
1347    /// same way submitted input does, firing transcript observers.
1348    pub fn input(mut self, input: Vec<Item>) -> Self {
1349        self.input = input;
1350        self
1351    }
1352
1353    /// Consume the builder and produce an [`Agent`].
1354    ///
1355    /// # Errors
1356    ///
1357    /// Returns [`LoopError::InvalidState`] if no model adapter was provided.
1358    pub fn build(self) -> Result<Agent<M>, LoopError> {
1359        let model = self
1360            .model
1361            .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1362        Ok(Agent {
1363            model,
1364            tool_sources: self.tool_sources,
1365            tool_executor: self.tool_executor,
1366            task_manager: self
1367                .task_manager
1368                .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1369            permissions: self.permissions,
1370            resources: self.resources,
1371            cancellation: self.cancellation,
1372            mutators: self.mutators,
1373            observers: self.observers,
1374            transcript_observers: self.transcript_observers,
1375            transcript: self.transcript,
1376            input: self.input,
1377        })
1378    }
1379}
1380
1381/// The runtime driver that advances the agent loop step by step.
1382///
1383/// Obtained from [`Agent::start`] with the builder's preloaded transcript
1384/// and pending-input queue baked in.
1385/// The typical usage pattern is:
1386///
1387/// 1. Call [`next`](LoopDriver::next) to advance the loop.
1388/// 2. Handle the returned [`LoopStep`]:
1389///    - [`LoopStep::Finished`] -- the turn completed, inspect the result.
1390///    - [`LoopStep::Interrupt`] -- resolve the interrupt via the bound
1391///      [`Pending*`](LoopInterrupt) handle, then call `next` again.
1392///
1393/// # Example
1394///
1395/// ```rust,no_run
1396/// use agentkit_core::{Item, ItemKind};
1397/// use agentkit_loop::{LoopDriver, LoopStep};
1398///
1399/// # async fn drive<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1400/// let step = driver.next().await?;
1401/// match step {
1402///     LoopStep::Finished(result) => println!("Done: {:?}", result.finish_reason),
1403///     LoopStep::Interrupt(interrupt) => {
1404///         // Resolve via the pending handle, then call next() again.
1405///         println!("Interrupted: {interrupt:?}");
1406///     }
1407/// }
1408/// # Ok(())
1409/// # }
1410/// ```
1411pub struct LoopDriver<S>
1412where
1413    S: ModelSession,
1414{
1415    session_id: SessionId,
1416    provider_name: Option<String>,
1417    default_cache: Option<PromptCacheRequest>,
1418    next_turn_cache: Option<PromptCacheRequest>,
1419    session: Option<S>,
1420    tool_executor: Arc<dyn ToolExecutor>,
1421    task_manager: Arc<dyn TaskManager>,
1422    permissions: Arc<dyn PermissionChecker>,
1423    resources: Arc<dyn ToolResources>,
1424    cancellation: Option<CancellationHandle>,
1425    mutators: Vec<Arc<dyn LoopMutator>>,
1426    observers: Vec<Arc<dyn LoopObserver>>,
1427    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1428    transcript: Vec<Item>,
1429    pending_input: Vec<Item>,
1430    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
1431    pending_approval_order: VecDeque<ToolCallId>,
1432    active_tool_round: Option<ActiveToolRound>,
1433    pending_round_resume: Option<agentkit_core::TurnId>,
1434    next_turn_index: u64,
1435    /// Call ids whose original tool_use was already paired with a
1436    /// synthetic detach tool_result. When the real result eventually
1437    /// arrives via the task manager, we MUST NOT emit a second
1438    /// tool_result for the same id — the provider schema requires
1439    /// exactly one tool_result per tool_use. Instead we route the
1440    /// resolution into a [`ItemKind::Notification`] item that the model
1441    /// can react to on the next turn.
1442    detached_call_ids: HashSet<ToolCallId>,
1443}
1444
1445impl<S> LoopDriver<S>
1446where
1447    S: ModelSession,
1448{
1449    fn execute_tool_span(
1450        &self,
1451        request: &ToolRequest,
1452        turn_id: &agentkit_core::TurnId,
1453        launch_kind: &'static str,
1454    ) -> tracing::Span {
1455        tracing::info_span!(
1456            "agent.execute_tool",
1457            "otel.name" = %format!("execute_tool {}", request.tool_name),
1458            "gen_ai.operation.name" = "execute_tool",
1459            "gen_ai.tool.name" = %request.tool_name,
1460            "gen_ai.tool.call.id" = %request.call_id,
1461            "gen_ai.conversation.id" = %self.session_id,
1462            "error.type" = tracing::field::Empty,
1463            session.id = %self.session_id,
1464            turn.id = %turn_id,
1465            launch_kind = launch_kind,
1466        )
1467    }
1468
1469    fn start_task_via_manager(
1470        &self,
1471        task_id: Option<TaskId>,
1472        tool_request: ToolRequest,
1473        kind: TaskLaunchKind,
1474        cancellation: Option<TurnCancellation>,
1475    ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1476    {
1477        let task_manager = self.task_manager.clone();
1478        let tool_executor = self.tool_executor.clone();
1479        let permissions = self.permissions.clone();
1480        let resources = self.resources.clone();
1481        let session_id = self.session_id.clone();
1482        let turn_id = tool_request.turn_id.clone();
1483        let metadata = tool_request.metadata.clone();
1484
1485        async move {
1486            task_manager
1487                .start_task(
1488                    TaskLaunchRequest {
1489                        task_id,
1490                        request: tool_request.clone(),
1491                        kind,
1492                    },
1493                    TaskStartContext {
1494                        executor: tool_executor.clone(),
1495                        tool_context: {
1496                            let execution_scope = ToolExecutionScope {
1497                                executor: tool_executor,
1498                                session_id: session_id.clone(),
1499                                turn_id: turn_id.clone(),
1500                                permissions: permissions.clone(),
1501                                resources: resources.clone(),
1502                                cancellation: cancellation.clone(),
1503                            };
1504                            OwnedToolContext {
1505                                session_id,
1506                                turn_id,
1507                                metadata,
1508                                permissions,
1509                                resources,
1510                                cancellation,
1511                                execution_scope: Some(execution_scope),
1512                                approved_request: None,
1513                            }
1514                        },
1515                    },
1516                )
1517                .await
1518                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1519        }
1520    }
1521
1522    fn has_pending_interrupts(&self) -> bool {
1523        !self.pending_approvals.is_empty()
1524    }
1525
1526    fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1527        for event in events {
1528            self.emit(AgentEvent::ToolCatalogChanged(event));
1529        }
1530    }
1531
1532    fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1533        let call_id = task.tool_request.call_id.clone();
1534        let call = ToolCallPart {
1535            id: call_id.clone(),
1536            name: task.tool_request.tool_name.to_string(),
1537            input: task.tool_request.input.clone(),
1538            metadata: task.tool_request.metadata.clone(),
1539        };
1540        let mut request = task.approval;
1541        request.call_id = Some(call_id.clone());
1542        let pending = PendingApprovalToolCall {
1543            request: request.clone(),
1544            decision: None,
1545            surfaced: false,
1546            turn_id: turn_id.clone(),
1547            task_id: task.task_id,
1548            call,
1549            tool_request: task.tool_request,
1550        };
1551        self.pending_approvals.insert(call_id.clone(), pending);
1552        if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1553            self.pending_approval_order.push_back(call_id);
1554        }
1555        self.emit(AgentEvent::ApprovalRequired(request));
1556    }
1557
1558    fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1559        for call_id in self.pending_approval_order.clone() {
1560            let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1561                continue;
1562            };
1563            if pending.decision.is_none() && !pending.surfaced {
1564                pending.surfaced = true;
1565                return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1566                    PendingApproval {
1567                        request: pending.request.clone(),
1568                    },
1569                )));
1570            }
1571        }
1572        None
1573    }
1574
1575    fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1576        self.pending_approval_order.iter().find_map(|call_id| {
1577            self.pending_approvals.get(call_id).and_then(|pending| {
1578                pending.decision.is_none().then(|| {
1579                    LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1580                        request: pending.request.clone(),
1581                    }))
1582                })
1583            })
1584        })
1585    }
1586
1587    fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1588        let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1589            self.pending_approvals
1590                .get(call_id)
1591                .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1592        })?;
1593        self.pending_approval_order.retain(|id| id != &call_id);
1594        self.pending_approvals.remove(&call_id)
1595    }
1596
1597    fn queue_resolution_interrupt(
1598        &mut self,
1599        turn_id: &agentkit_core::TurnId,
1600        resolution: TaskResolution,
1601    ) -> Option<LoopStep> {
1602        match resolution {
1603            TaskResolution::Item(item) => {
1604                self.append_tool_result_item(item);
1605                None
1606            }
1607            TaskResolution::Approval(task) => {
1608                self.enqueue_pending_approval(turn_id, task);
1609                self.take_next_unsurfaced_approval_interrupt()
1610            }
1611        }
1612    }
1613
1614    async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1615        let PendingLoopUpdates { mut resolutions } = self
1616            .task_manager
1617            .take_pending_loop_updates()
1618            .await
1619            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1620        let mut saw_items = false;
1621        while let Some(resolution) = resolutions.pop_front() {
1622            match resolution {
1623                TaskResolution::Item(item) => {
1624                    self.append_tool_result_item(item);
1625                    saw_items = true;
1626                }
1627                TaskResolution::Approval(task) => {
1628                    self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1629                }
1630            }
1631        }
1632        Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1633    }
1634
1635    async fn run_mutators(
1636        &mut self,
1637        point: MutationPoint,
1638        turn_id: Option<&agentkit_core::TurnId>,
1639        cancellation: Option<TurnCancellation>,
1640    ) -> Result<(), LoopError> {
1641        if self.mutators.is_empty() {
1642            return Ok(());
1643        }
1644        if cancellation
1645            .as_ref()
1646            .is_some_and(TurnCancellation::is_cancelled)
1647        {
1648            return Err(LoopError::Cancelled);
1649        }
1650        let mutators = self.mutators.clone();
1651        let session_id = self.session_id.clone();
1652        let observers = self.observers.clone();
1653        let emitter = DriverEmitter {
1654            observers: &observers,
1655        };
1656        let mut cursor = TranscriptCursor {
1657            items: &mut self.transcript,
1658            dirty: false,
1659        };
1660        for mutator in &mutators {
1661            if cancellation
1662                .as_ref()
1663                .is_some_and(TurnCancellation::is_cancelled)
1664            {
1665                return Err(LoopError::Cancelled);
1666            }
1667            let ctx = LoopCtx {
1668                session_id: &session_id,
1669                turn_id,
1670                point,
1671                cancellation: cancellation.clone(),
1672                emitter: &emitter,
1673            };
1674            mutator.mutate(&mut cursor, ctx).await?;
1675        }
1676        if cursor.dirty {
1677            validate_transcript_invariants(cursor.items)?;
1678        }
1679        Ok(())
1680    }
1681
1682    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
1683        let Some(_) = self.active_tool_round.as_ref() else {
1684            return Ok(None);
1685        };
1686        loop {
1687            let cancellation = self
1688                .cancellation
1689                .as_ref()
1690                .map(CancellationHandle::checkpoint);
1691            let turn_id = self
1692                .active_tool_round
1693                .as_ref()
1694                .map(|active| active.turn_id.clone())
1695                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;
1696
1697            if cancellation
1698                .as_ref()
1699                .is_some_and(TurnCancellation::is_cancelled)
1700            {
1701                self.task_manager
1702                    .on_turn_interrupted(&turn_id)
1703                    .await
1704                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1705                self.active_tool_round = None;
1706                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1707            }
1708
1709            let next_call = self
1710                .active_tool_round
1711                .as_mut()
1712                .and_then(|active| active.pending_calls.pop_front());
1713            if let Some((_call, tool_request)) = next_call {
1714                use tracing::Instrument;
1715                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
1716                match self
1717                    .start_task_via_manager(
1718                        None,
1719                        tool_request.clone(),
1720                        TaskLaunchKind::Plain,
1721                        cancellation.clone(),
1722                    )
1723                    .instrument(dispatch_span.clone())
1724                    .await?
1725                {
1726                    TaskStartOutcome::Ready(resolution) => {
1727                        let resolution = *resolution;
1728                        match resolution {
1729                            TaskResolution::Item(item) => {
1730                                if tool_result_is_error(&item) {
1731                                    dispatch_span.record("error.type", "tool_error");
1732                                }
1733                                if let Some(active) = self.active_tool_round.as_mut() {
1734                                    active.foreground_progressed = true;
1735                                }
1736                                self.append_tool_result_item(item);
1737                            }
1738                            TaskResolution::Approval(task) => {
1739                                self.enqueue_pending_approval(&turn_id, task);
1740                            }
1741                        }
1742                        continue;
1743                    }
1744                    TaskStartOutcome::Pending { kind, .. } => {
1745                        if kind == agentkit_task_manager::TaskKind::Background
1746                            && let Some(active) = self.active_tool_round.as_mut()
1747                        {
1748                            active.background_pending = true;
1749                        }
1750                        continue;
1751                    }
1752                }
1753            }
1754
1755            match self
1756                .task_manager
1757                .wait_for_turn(&turn_id, cancellation.clone())
1758                .await
1759                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
1760            {
1761                Some(TurnTaskUpdate::Resolution(resolution)) => {
1762                    let resolution = *resolution;
1763                    match resolution {
1764                        TaskResolution::Item(item) => {
1765                            if let Some(active) = self.active_tool_round.as_mut() {
1766                                active.foreground_progressed = true;
1767                            }
1768                            self.append_tool_result_item(item);
1769                        }
1770                        TaskResolution::Approval(task) => {
1771                            self.enqueue_pending_approval(&turn_id, task);
1772                        }
1773                    }
1774                }
1775                Some(TurnTaskUpdate::Detached(snapshot)) => {
1776                    // The task was promoted to background. Push a synthetic
1777                    // tool result so the model knows the call is still
1778                    // running and can continue its turn. Track the
1779                    // call_id so when the real result arrives later via
1780                    // the task manager, we route it to a Notification
1781                    // item instead of emitting a second tool_result for
1782                    // the same call_id (which would violate the
1783                    // provider schema — exactly one tool_result per
1784                    // tool_use).
1785                    // Order matters: append the synthetic placeholder FIRST as
1786                    // a real Tool/ToolResult so the tool_use slot is filled
1787                    // (provider schemas require exactly one tool_result per
1788                    // tool_use). Only AFTER appending do we record the
1789                    // call_id in `detached_call_ids` — so the *next* item
1790                    // for this call_id (the real completion arriving later
1791                    // via the task manager) is the one converted to a
1792                    // Notification by `maybe_convert_detached`.
1793                    let detached_call_id = snapshot.call_id.clone();
1794                    self.append_tool_result_item(Item {
1795                        id: None,
1796                        kind: ItemKind::Tool,
1797                        parts: vec![Part::ToolResult(ToolResultPart {
1798                            call_id: detached_call_id.clone(),
1799                            output: ToolOutput::Text(format!(
1800                                "Tool {} is now running in the background. \
1801                                 The result will be delivered when it completes.",
1802                                snapshot.tool_name,
1803                            )),
1804                            is_error: false,
1805                            metadata: MetadataMap::new(),
1806                        })],
1807                        metadata: MetadataMap::new(),
1808                        usage: None,
1809                        finish_reason: None,
1810                        created_at: None,
1811                    });
1812                    self.detached_call_ids.insert(detached_call_id);
1813                    if let Some(active) = self.active_tool_round.as_mut() {
1814                        active.background_pending = true;
1815                        active.foreground_progressed = true;
1816                    }
1817                }
1818                None => {
1819                    if cancellation
1820                        .as_ref()
1821                        .is_some_and(TurnCancellation::is_cancelled)
1822                    {
1823                        self.task_manager
1824                            .on_turn_interrupted(&turn_id)
1825                            .await
1826                            .map_err(|error| {
1827                                LoopError::Tool(ToolError::Internal(error.to_string()))
1828                            })?;
1829                        self.active_tool_round = None;
1830                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1831                    }
1832                    let active = self.active_tool_round.take().ok_or_else(|| {
1833                        LoopError::InvalidState("missing active tool round".into())
1834                    })?;
1835                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1836                        return Ok(Some(step));
1837                    }
1838                    if let Some(step) = self.next_unresolved_approval_interrupt() {
1839                        return Ok(Some(step));
1840                    }
1841                    if active.background_pending && !active.foreground_progressed {
1842                        return Ok(None);
1843                    }
1844                    // Yield control back to the host between tool rounds.
1845                    // All tool calls in this round have results in the
1846                    // transcript; the transcript is provider-valid.  The
1847                    // host may submit_input before calling next() to
1848                    // resume, which will re-enter drive_turn via
1849                    // pending_round_resume.
1850                    let info = ToolRoundInfo {
1851                        session_id: self.session_id.clone(),
1852                        turn_id: turn_id.clone(),
1853                        transcript_len: self.transcript.len(),
1854                    };
1855                    self.pending_round_resume = Some(turn_id);
1856                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
1857                        info,
1858                    ))));
1859                }
1860            }
1861        }
1862    }
1863
1864    #[tracing::instrument(
1865        name = "agent.turn",
1866        skip_all,
1867        fields(
1868            otel.name = "invoke_agent",
1869            gen_ai.operation.name = "invoke_agent",
1870            gen_ai.conversation.id = %self.session_id,
1871            gen_ai.provider.name = tracing::field::Empty,
1872            gen_ai.usage.input_tokens = tracing::field::Empty,
1873            gen_ai.usage.output_tokens = tracing::field::Empty,
1874            session.id = %self.session_id,
1875            turn.id = %turn_id,
1876            transcript.len = self.transcript.len(),
1877            saw_tool_call = tracing::field::Empty,
1878            finish_reason = tracing::field::Empty,
1879        ),
1880    )]
1881    async fn drive_turn(
1882        &mut self,
1883        turn_id: agentkit_core::TurnId,
1884        emit_started: bool,
1885        mutation_point: MutationPoint,
1886    ) -> Result<LoopStep, LoopError> {
1887        if let Some(provider) = &self.provider_name {
1888            tracing::Span::current().record("gen_ai.provider.name", provider.as_str());
1889        }
1890        let cancellation = self
1891            .cancellation
1892            .as_ref()
1893            .map(CancellationHandle::checkpoint);
1894        match self
1895            .run_mutators(mutation_point, Some(&turn_id), cancellation.clone())
1896            .await
1897        {
1898            Ok(()) => {}
1899            Err(LoopError::Cancelled) => {
1900                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1901            }
1902            Err(error) => return Err(error),
1903        }
1904
1905        // A mutator may have removed the freshly-submitted input (e.g. a
1906        // compaction pass that summarised the latest user turn away), leaving
1907        // the transcript ending in an assistant message or empty — nothing new
1908        // for the model to respond to. Finish the turn rather than dispatch an
1909        // assistant-prefill request, which most providers reject.
1910        if !transcript_has_pending_input(&self.transcript) {
1911            let turn_result = TurnResult {
1912                turn_id,
1913                finish_reason: FinishReason::Completed,
1914                items: Vec::new(),
1915                usage: None,
1916                metadata: MetadataMap::new(),
1917            };
1918            self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1919            return Ok(LoopStep::Finished(turn_result));
1920        }
1921
1922        if emit_started {
1923            self.emit(AgentEvent::TurnStarted {
1924                session_id: self.session_id.clone(),
1925                turn_id: turn_id.clone(),
1926            });
1927        }
1928        if cancellation
1929            .as_ref()
1930            .is_some_and(TurnCancellation::is_cancelled)
1931        {
1932            return self.finish_cancelled(turn_id, interrupted_assistant_items());
1933        }
1934
1935        let catalog_events = self.tool_executor.drain_catalog_events();
1936        self.emit_tool_catalog_events(catalog_events);
1937
1938        let request = TurnRequest {
1939            session_id: self.session_id.clone(),
1940            turn_id: turn_id.clone(),
1941            transcript: self.transcript.clone(),
1942            available_tools: self.tool_executor.specs(),
1943            cache: self
1944                .next_turn_cache
1945                .take()
1946                .or_else(|| self.default_cache.clone()),
1947            metadata: MetadataMap::new(),
1948        };
1949
1950        let session = self
1951            .session
1952            .as_mut()
1953            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
1954
1955        // Inference span per the OTel GenAI semantic conventions. It wraps the
1956        // model request and the full event drain rather than just `begin_turn`,
1957        // so attributes that streaming adapters only learn mid-stream (usage,
1958        // stop reason, response identity) still land before the span closes.
1959        // `otel.name` carries the dynamic `chat {model}` span name for
1960        // OpenTelemetry bridges since tracing span names are static.
1961        let chat_span = tracing::info_span!(
1962            "chat",
1963            "otel.name" = tracing::field::Empty,
1964            "otel.kind" = "client",
1965            "gen_ai.operation.name" = "chat",
1966            "gen_ai.provider.name" = tracing::field::Empty,
1967            "gen_ai.conversation.id" = %self.session_id,
1968            "gen_ai.request.model" = tracing::field::Empty,
1969            "gen_ai.response.model" = tracing::field::Empty,
1970            "gen_ai.response.id" = tracing::field::Empty,
1971            "gen_ai.response.finish_reasons" = tracing::field::Empty,
1972            "gen_ai.usage.input_tokens" = tracing::field::Empty,
1973            "gen_ai.usage.output_tokens" = tracing::field::Empty,
1974        );
1975        if let Some(provider) = &self.provider_name {
1976            chat_span.record("gen_ai.provider.name", provider.as_str());
1977        }
1978        match session.model_name() {
1979            Some(model) => {
1980                chat_span.record("gen_ai.request.model", model);
1981                chat_span.record("otel.name", format!("chat {model}").as_str());
1982            }
1983            None => {
1984                chat_span.record("otel.name", "chat");
1985            }
1986        }
1987
1988        use tracing::Instrument;
1989        let mut turn = match session
1990            .begin_turn(request, cancellation.clone())
1991            .instrument(chat_span.clone())
1992            .await
1993        {
1994            Ok(turn) => turn,
1995            Err(LoopError::Cancelled) => {
1996                self.task_manager
1997                    .on_turn_interrupted(&turn_id)
1998                    .await
1999                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
2000                return self.finish_cancelled(turn_id, interrupted_assistant_items());
2001            }
2002            Err(error) => return Err(error),
2003        };
2004        let mut saw_tool_call = false;
2005        let mut finished_result = None;
2006
2007        while let Some(event) = match turn
2008            .next_event(cancellation.clone())
2009            .instrument(chat_span.clone())
2010            .await
2011        {
2012            Ok(event) => event,
2013            Err(LoopError::Cancelled) => {
2014                self.task_manager
2015                    .on_turn_interrupted(&turn_id)
2016                    .await
2017                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
2018                return self.finish_cancelled(turn_id, interrupted_assistant_items());
2019            }
2020            Err(error) => return Err(error),
2021        } {
2022            if cancellation
2023                .as_ref()
2024                .is_some_and(TurnCancellation::is_cancelled)
2025            {
2026                self.task_manager
2027                    .on_turn_interrupted(&turn_id)
2028                    .await
2029                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
2030                return self.finish_cancelled(turn_id, interrupted_assistant_items());
2031            }
2032            match event {
2033                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
2034                ModelTurnEvent::Usage(usage) => {
2035                    if let Some(tokens) = &usage.tokens {
2036                        chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
2037                        chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
2038                    }
2039                    self.emit(AgentEvent::UsageUpdated(usage));
2040                }
2041                ModelTurnEvent::ToolCall(call) => {
2042                    saw_tool_call = true;
2043                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
2044                }
2045                ModelTurnEvent::Finished(result) => {
2046                    finished_result = Some(result);
2047                    break;
2048                }
2049            }
2050        }
2051
2052        let mut result = finished_result.ok_or_else(|| {
2053            LoopError::Provider("model turn ended without a Finished event".into())
2054        })?;
2055        if let Some(model) = &result.model {
2056            chat_span.record("gen_ai.response.model", model.as_str());
2057        }
2058        if let Some(id) = &result.response_id {
2059            chat_span.record("gen_ai.response.id", id.as_str());
2060        }
2061        if let Some(tokens) = result
2062            .usage
2063            .as_ref()
2064            .and_then(|usage| usage.tokens.as_ref())
2065        {
2066            chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
2067            chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
2068        }
2069        chat_span.record(
2070            "gen_ai.response.finish_reasons",
2071            tracing::field::debug(&result.finish_reason),
2072        );
2073        drop(chat_span);
2074        tracing::Span::current().record("saw_tool_call", saw_tool_call);
2075        tracing::Span::current().record(
2076            "finish_reason",
2077            tracing::field::debug(&result.finish_reason),
2078        );
2079        if let Some(tokens) = result
2080            .usage
2081            .as_ref()
2082            .and_then(|usage| usage.tokens.as_ref())
2083        {
2084            tracing::Span::current().record("gen_ai.usage.input_tokens", tokens.input_tokens);
2085            tracing::Span::current().record("gen_ai.usage.output_tokens", tokens.output_tokens);
2086        }
2087        let now = Timestamp::now();
2088        let usage = result.usage.clone();
2089        let finish_reason = result.finish_reason.clone();
2090        let output_items: Vec<Item> = result
2091            .output_items
2092            .drain(..)
2093            .map(|mut item| {
2094                if matches!(item.kind, ItemKind::Assistant) {
2095                    if item.usage.is_none() {
2096                        item.usage = usage.clone();
2097                    }
2098                    if item.finish_reason.is_none() {
2099                        item.finish_reason = Some(finish_reason.clone());
2100                    }
2101                }
2102                if item.created_at.is_none() {
2103                    item.created_at = Some(now);
2104                }
2105                item
2106            })
2107            .collect();
2108        self.extend_transcript(output_items.clone());
2109
2110        if saw_tool_call {
2111            let pending_calls = extract_tool_calls(&output_items)
2112                .into_iter()
2113                .map(|call| {
2114                    let tool_request = ToolRequest {
2115                        call_id: call.id.clone(),
2116                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
2117                        input: call.input.clone(),
2118                        session_id: self.session_id.clone(),
2119                        turn_id: turn_id.clone(),
2120                        metadata: call.metadata.clone(),
2121                    };
2122                    (call, tool_request)
2123                })
2124                .collect();
2125            self.active_tool_round = Some(ActiveToolRound {
2126                turn_id: turn_id.clone(),
2127                pending_calls,
2128                background_pending: false,
2129                foreground_progressed: false,
2130            });
2131            if let Some(step) = self.continue_active_tool_round().await? {
2132                return Ok(step);
2133            }
2134            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2135                InputRequest {
2136                    session_id: self.session_id.clone(),
2137                    reason: "driver is waiting for input".into(),
2138                },
2139            )));
2140        }
2141
2142        let turn_result = TurnResult {
2143            turn_id,
2144            finish_reason: result.finish_reason,
2145            items: output_items,
2146            usage: result.usage,
2147            metadata: result.metadata,
2148        };
2149        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2150        Ok(LoopStep::Finished(turn_result))
2151    }
2152
2153    async fn resume_after_approval(
2154        &mut self,
2155        pending: PendingApprovalToolCall,
2156    ) -> Result<LoopStep, LoopError> {
2157        let decision = pending
2158            .decision
2159            .clone()
2160            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
2161
2162        match decision {
2163            ApprovalDecision::Approve => {
2164                use tracing::Instrument;
2165                let dispatch_span =
2166                    self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
2167                match self
2168                    .start_task_via_manager(
2169                        Some(pending.task_id.clone()),
2170                        pending.tool_request.clone(),
2171                        TaskLaunchKind::Approved(pending.request.clone()),
2172                        self.cancellation
2173                            .as_ref()
2174                            .map(CancellationHandle::checkpoint),
2175                    )
2176                    .instrument(dispatch_span.clone())
2177                    .await?
2178                {
2179                    TaskStartOutcome::Ready(resolution) => {
2180                        let resolution = *resolution;
2181                        if let TaskResolution::Item(item) = &resolution
2182                            && tool_result_is_error(item)
2183                        {
2184                            dispatch_span.record("error.type", "tool_error");
2185                        }
2186                        if let Some(step) =
2187                            self.queue_resolution_interrupt(&pending.turn_id, resolution)
2188                        {
2189                            return Ok(step);
2190                        }
2191                    }
2192                    TaskStartOutcome::Pending { .. } => {}
2193                }
2194            }
2195            ApprovalDecision::Deny { reason } => {
2196                self.append_tool_result_item(Item {
2197                    id: None,
2198                    kind: ItemKind::Tool,
2199                    parts: vec![Part::ToolResult(ToolResultPart {
2200                        call_id: pending.call.id.clone(),
2201                        output: ToolOutput::Text(
2202                            reason.unwrap_or_else(|| "approval denied".into()),
2203                        ),
2204                        is_error: true,
2205                        metadata: pending.call.metadata.clone(),
2206                    })],
2207                    metadata: MetadataMap::new(),
2208                    usage: None,
2209                    finish_reason: None,
2210                    created_at: None,
2211                });
2212            }
2213        }
2214
2215        if let Some(step) = self.continue_active_tool_round().await? {
2216            Ok(step)
2217        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2218            Ok(step)
2219        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
2220            Ok(step)
2221        } else {
2222            self.drive_turn(pending.turn_id, false, MutationPoint::AfterToolResult)
2223                .await
2224        }
2225    }
2226
2227    fn finish_cancelled(
2228        &mut self,
2229        turn_id: agentkit_core::TurnId,
2230        items: Vec<Item>,
2231    ) -> Result<LoopStep, LoopError> {
2232        self.extend_transcript(items.clone());
2233        let turn_result = TurnResult {
2234            turn_id,
2235            finish_reason: FinishReason::Cancelled,
2236            items,
2237            usage: None,
2238            metadata: interrupted_metadata("turn"),
2239        };
2240        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2241        Ok(LoopStep::Finished(turn_result))
2242    }
2243
2244    /// Internal entry point for buffering user input. Reachable only via
2245    /// [`InputRequest::submit`] (resolves an `AwaitingInput` interrupt,
2246    /// including the very first one after [`Agent::start`]) and
2247    /// [`ToolRoundInfo::submit`] (interjects between tool rounds). Prior
2248    /// transcript items — the passive starting state of a session — are
2249    /// preloaded via [`AgentBuilder::transcript`]; an opening user turn for
2250    /// one-shot calls is preloaded via [`AgentBuilder::input`]. New input
2251    /// after start-up always flows through one of the typed `submit`
2252    /// handles.
2253    pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
2254        if self.has_pending_interrupts() {
2255            return Err(LoopError::InvalidState(
2256                "cannot submit input while an interrupt is pending".into(),
2257            ));
2258        }
2259        self.emit(AgentEvent::InputAccepted {
2260            session_id: self.session_id.clone(),
2261            items: input.clone(),
2262        });
2263        self.pending_input.extend(input);
2264        Ok(())
2265    }
2266
2267    /// Override the prompt cache request for the next model turn.
2268    ///
2269    /// The override is consumed the next time the driver starts a model turn.
2270    /// Session-level defaults still apply to later turns.
2271    pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2272        if self.has_pending_interrupts() {
2273            return Err(LoopError::InvalidState(
2274                "cannot update next-turn cache while an interrupt is pending".into(),
2275            ));
2276        }
2277        self.next_turn_cache = Some(cache);
2278        Ok(())
2279    }
2280
2281    #[cfg(test)]
2282    pub(crate) fn submit_input_with_cache(
2283        &mut self,
2284        input: Vec<Item>,
2285        cache: PromptCacheRequest,
2286    ) -> Result<(), LoopError> {
2287        self.set_next_turn_cache(cache)?;
2288        self.submit_input(input)
2289    }
2290
2291    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`].
2292    ///
2293    /// After calling this, invoke [`next`](LoopDriver::next) to continue the
2294    /// loop.  If the decision is [`ApprovalDecision::Approve`] the tool call
2295    /// executes; if denied, an error result is fed back to the model.
2296    ///
2297    /// # Errors
2298    ///
2299    /// Returns [`LoopError::InvalidState`] if no approval is pending.
2300    pub fn resolve_approval_for(
2301        &mut self,
2302        call_id: ToolCallId,
2303        decision: ApprovalDecision,
2304    ) -> Result<(), LoopError> {
2305        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2306            return Err(LoopError::InvalidState(format!(
2307                "no approval request is pending for call {}",
2308                call_id.0
2309            )));
2310        };
2311        pending.decision = Some(decision.clone());
2312        self.emit(AgentEvent::ApprovalResolved {
2313            approved: matches!(decision, ApprovalDecision::Approve),
2314        });
2315        Ok(())
2316    }
2317
2318    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] with a patched
2319    /// input that replaces the model's original tool arguments.
2320    ///
2321    /// Equivalent to calling [`resolve_approval_for`] with
2322    /// [`ApprovalDecision::Approve`] except the tool sees `input` instead of
2323    /// what the model emitted. The transcript still records the model's
2324    /// original call.
2325    ///
2326    /// # Errors
2327    ///
2328    /// Returns [`LoopError::InvalidState`] if no approval is pending for
2329    /// `call_id`.
2330    pub fn resolve_approval_for_with_patched_input(
2331        &mut self,
2332        call_id: ToolCallId,
2333        input: serde_json::Value,
2334    ) -> Result<(), LoopError> {
2335        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2336            return Err(LoopError::InvalidState(format!(
2337                "no approval request is pending for call {}",
2338                call_id.0
2339            )));
2340        };
2341        pending.tool_request.input = input;
2342        self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2343    }
2344
2345    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] when exactly one
2346    /// approval is outstanding.
2347    pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2348        let mut unresolved = self
2349            .pending_approval_order
2350            .iter()
2351            .filter(|call_id| {
2352                self.pending_approvals
2353                    .get(*call_id)
2354                    .is_some_and(|pending| pending.decision.is_none())
2355            })
2356            .cloned();
2357        let Some(call_id) = unresolved.next() else {
2358            return Err(LoopError::InvalidState(
2359                "no approval request is pending".into(),
2360            ));
2361        };
2362        if unresolved.next().is_some() {
2363            return Err(LoopError::InvalidState(
2364                "multiple approvals are pending; use resolve_approval_for".into(),
2365            ));
2366        }
2367        self.resolve_approval_for(call_id, decision)
2368    }
2369
2370    /// Take a read-only snapshot of the driver's current transcript and input queue.
2371    pub fn snapshot(&self) -> LoopSnapshot {
2372        LoopSnapshot {
2373            session_id: self.session_id.clone(),
2374            transcript: self.transcript.clone(),
2375            pending_input: self.pending_input.clone(),
2376        }
2377    }
2378
2379    /// Advance the loop by one step.
2380    ///
2381    /// This is the main method for driving the agent.  It processes pending
2382    /// interrupt resolutions, consumes queued input, starts a model turn,
2383    /// executes tool calls, and returns once the turn finishes or an
2384    /// interrupt occurs.
2385    ///
2386    /// If no input is queued and no interrupt is pending, returns
2387    /// [`LoopStep::Interrupt(LoopInterrupt::AwaitingInput(..))`](LoopInterrupt::AwaitingInput).
2388    /// This is the steady state after [`Agent::start`] when no input was
2389    /// preloaded via [`AgentBuilder::input`]: the prior transcript loaded
2390    /// via [`AgentBuilder::transcript`] is passive, so the first call
2391    /// surfaces `AwaitingInput` and waits for the host to supply input via
2392    /// [`InputRequest::submit`] before any model turn is dispatched. If
2393    /// input was preloaded, the first call dispatches the model directly.
2394    ///
2395    /// # Errors
2396    ///
2397    /// Returns [`LoopError::InvalidState`] if called while an unresolved
2398    /// interrupt is pending, or propagates provider / tool / compaction errors.
2399    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2400        if let Some(pending) = self.take_next_resolved_approval() {
2401            return self.resume_after_approval(pending).await;
2402        }
2403
2404        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2405            return Ok(step);
2406        }
2407
2408        if let Some(step) = self.next_unresolved_approval_interrupt() {
2409            return Ok(step);
2410        }
2411
2412        if let Some(step) = self.continue_active_tool_round().await? {
2413            return Ok(step);
2414        }
2415
2416        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2417        if let Some(step) = loop_step {
2418            return Ok(step);
2419        }
2420
2421        // Resume after an AfterToolResult yield.  Any input submitted by the
2422        // host during the yield is folded into the transcript as part of the
2423        // continuation turn; background task results drained just above are
2424        // already in the transcript.
2425        if let Some(turn_id) = self.pending_round_resume.take() {
2426            let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2427            self.extend_transcript(drained);
2428            return self
2429                .drive_turn(turn_id, false, MutationPoint::AfterToolResult)
2430                .await;
2431        }
2432
2433        if self.pending_input.is_empty() && !had_loop_updates {
2434            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2435                InputRequest {
2436                    session_id: self.session_id.clone(),
2437                    reason: "driver is waiting for input".into(),
2438                },
2439            )));
2440        }
2441
2442        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2443        self.next_turn_index += 1;
2444        let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2445        self.extend_transcript(drained);
2446        self.drive_turn(turn_id, true, MutationPoint::AfterTurnEnded)
2447            .await
2448    }
2449
2450    fn emit(&self, event: AgentEvent) {
2451        for observer in &self.observers {
2452            observer.handle_event(event.clone());
2453        }
2454    }
2455
2456    /// Append a single [`Item`] to the transcript and notify all
2457    /// registered [`TranscriptObserver`]s. The single mutation point —
2458    /// every push to `self.transcript` should funnel through here so
2459    /// observers see exactly what landed in the transcript.
2460    fn append_item(&mut self, mut item: Item) {
2461        if item.created_at.is_none() {
2462            item.created_at = Some(Timestamp::now());
2463        }
2464        for observer in &self.transcript_observers {
2465            observer.on_item_appended(&item);
2466        }
2467        self.transcript.push(item);
2468    }
2469
2470    /// Append a tool-result Item: emit one [`AgentEvent::ToolResultReceived`]
2471    /// per [`Part::ToolResult`] inside the Item, then funnel through
2472    /// [`Self::append_item`].
2473    ///
2474    /// If every `ToolResult` in the item references a `call_id` that was
2475    /// already paired with a synthetic detach tool_result, the item is
2476    /// converted to a [`ItemKind::Notification`] before appending.
2477    /// Without this, we would emit a second `tool_result` for the same
2478    /// `tool_use_id` — a provider-schema violation that
2479    /// Anthropic/OpenRouter reject as an "orphaned tool_result".
2480    /// Observers still see `ToolResultReceived` for each result so any
2481    /// UI spinner or task tracker can close.
2482    fn append_tool_result_item(&mut self, item: Item) {
2483        for part in &item.parts {
2484            if let Part::ToolResult(result) = part {
2485                self.emit(AgentEvent::ToolResultReceived(result.clone()));
2486            }
2487        }
2488        let item = self.maybe_convert_detached(item);
2489        self.append_item(item);
2490    }
2491
2492    fn maybe_convert_detached(&mut self, item: Item) -> Item {
2493        if !matches!(item.kind, ItemKind::Tool) {
2494            return item;
2495        }
2496        let results: Vec<&ToolResultPart> = item
2497            .parts
2498            .iter()
2499            .filter_map(|p| match p {
2500                Part::ToolResult(r) => Some(r),
2501                _ => None,
2502            })
2503            .collect();
2504        if results.is_empty()
2505            || !results
2506                .iter()
2507                .all(|r| self.detached_call_ids.contains(&r.call_id))
2508        {
2509            return item;
2510        }
2511        let mut text = String::new();
2512        for result in &results {
2513            self.detached_call_ids.remove(&result.call_id);
2514            if !text.is_empty() {
2515                text.push_str("\n\n");
2516            }
2517            let label = if result.is_error {
2518                "failed"
2519            } else {
2520                "completed"
2521            };
2522            let body = render_tool_output_brief(&result.output);
2523            text.push_str(&format!(
2524                "Background tool call {} {}: {body}",
2525                result.call_id.0, label
2526            ));
2527        }
2528        Item::notification(text)
2529    }
2530
2531    /// Append several Items in order through [`Self::append_item`].
2532    /// Pre-stamps `created_at` once per batch so all items in the batch
2533    /// share a timestamp and `append_item` skips its own clock read.
2534    fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2535        let now = Timestamp::now();
2536        for mut item in items {
2537            if item.created_at.is_none() {
2538                item.created_at = Some(now);
2539            }
2540            self.append_item(item);
2541        }
2542    }
2543}
2544
2545fn render_tool_output_brief(output: &ToolOutput) -> String {
2546    match output {
2547        ToolOutput::Text(t) => t.clone(),
2548        ToolOutput::Structured(value) => value.to_string(),
2549        ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2550        ToolOutput::Files(files) => format!("[{} files]", files.len()),
2551    }
2552}
2553
2554fn interrupted_metadata(stage: &str) -> MetadataMap {
2555    let mut metadata = MetadataMap::new();
2556    metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2557    metadata.insert(
2558        INTERRUPT_REASON_METADATA_KEY.into(),
2559        USER_CANCELLED_REASON.into(),
2560    );
2561    metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2562    metadata
2563}
2564
2565fn interrupted_assistant_items() -> Vec<Item> {
2566    vec![Item {
2567        id: None,
2568        kind: ItemKind::Assistant,
2569        parts: vec![Part::Text(TextPart {
2570            text: "Previous assistant response was interrupted by the user before completion."
2571                .into(),
2572            metadata: interrupted_metadata("assistant"),
2573        })],
2574        metadata: interrupted_metadata("assistant"),
2575        usage: None,
2576        finish_reason: None,
2577        created_at: None,
2578    }]
2579}
2580
2581/// Whether the transcript ends in something the model should respond to.
2582///
2583/// Only input-bearing trailing roles should drive inference. Passive transcript
2584/// state (`System`, `Developer`, `Context`), an assistant tail, or an empty
2585/// transcript has nothing new for the model to respond to.
2586fn transcript_has_pending_input(transcript: &[Item]) -> bool {
2587    matches!(
2588        transcript.last().map(|item| item.kind),
2589        Some(ItemKind::User | ItemKind::Tool | ItemKind::Notification)
2590    )
2591}
2592
2593fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2594    let mut calls = Vec::new();
2595    for item in items {
2596        for part in &item.parts {
2597            if let Part::ToolCall(call) = part {
2598                calls.push(call.clone());
2599            }
2600        }
2601    }
2602    calls
2603}
2604
2605fn tool_result_is_error(item: &Item) -> bool {
2606    item.parts
2607        .iter()
2608        .any(|part| matches!(part, Part::ToolResult(result) if result.is_error))
2609}
2610
2611/// Errors that can occur while driving the agent loop.
2612#[derive(Debug, Error)]
2613pub enum LoopError {
2614    /// The driver was in an unexpected state for the requested operation.
2615    #[error("invalid driver state: {0}")]
2616    InvalidState(String),
2617    /// The current turn was cancelled via the [`CancellationHandle`].
2618    #[error("turn cancelled")]
2619    Cancelled,
2620    /// An error originating from the model provider.
2621    #[error("provider error: {0}")]
2622    Provider(String),
2623    /// An error originating from tool execution.
2624    #[error("tool error: {0}")]
2625    Tool(#[from] ToolError),
2626    /// An error reported by a [`LoopMutator`] (compaction, redaction, repair).
2627    #[error("mutator error: {0}")]
2628    Mutator(String),
2629    /// The requested operation is not supported.
2630    #[error("unsupported operation: {0}")]
2631    Unsupported(String),
2632}
2633
2634/// Internal [`EventEmitter`] backed by the driver's observer slice. Lives
2635/// only for the duration of a [`LoopDriver::run_mutators`] call so the
2636/// borrow against `self.observers` stays disjoint from the cursor's borrow
2637/// of `self.transcript`.
2638struct DriverEmitter<'a> {
2639    observers: &'a [Arc<dyn LoopObserver>],
2640}
2641
2642impl<'a> EventEmitter for DriverEmitter<'a> {
2643    fn emit(&self, event: AgentEvent) {
2644        for observer in self.observers {
2645            observer.handle_event(event.clone());
2646        }
2647    }
2648}
2649
2650/// Hard-fails when a mutator's edit leaves the transcript protocol-invalid.
2651/// The only invariant currently checked is tool_use ↔ tool_result pairing
2652/// — every [`Part::ToolCall`] must be followed (in transcript order) by a
2653/// matching [`Part::ToolResult`] with the same `call_id`.
2654fn validate_transcript_invariants(transcript: &[Item]) -> Result<(), LoopError> {
2655    let mut pending: HashSet<ToolCallId> = HashSet::new();
2656    let mut seen_calls: HashSet<ToolCallId> = HashSet::new();
2657    let mut seen_results: HashSet<ToolCallId> = HashSet::new();
2658    for item in transcript {
2659        for part in &item.parts {
2660            match part {
2661                Part::ToolCall(call) => {
2662                    if !seen_calls.insert(call.id.clone()) {
2663                        return Err(LoopError::Mutator(format!(
2664                            "transcript invariant violation: duplicate tool_use: {}",
2665                            call.id.0
2666                        )));
2667                    }
2668                    pending.insert(call.id.clone());
2669                }
2670                Part::ToolResult(result) => {
2671                    if !pending.remove(&result.call_id) {
2672                        let kind = if seen_results.contains(&result.call_id) {
2673                            "duplicate"
2674                        } else {
2675                            "orphaned"
2676                        };
2677                        return Err(LoopError::Mutator(format!(
2678                            "transcript invariant violation: {kind} tool_result: {}",
2679                            result.call_id.0
2680                        )));
2681                    }
2682                    seen_results.insert(result.call_id.clone());
2683                }
2684                _ => {}
2685            }
2686        }
2687    }
2688    if !pending.is_empty() {
2689        let missing: Vec<String> = pending.into_iter().map(|id| id.0).collect();
2690        return Err(LoopError::Mutator(format!(
2691            "transcript invariant violation: tool_use(s) without matching tool_result: {}",
2692            missing.join(", ")
2693        )));
2694    }
2695    Ok(())
2696}
2697
2698#[cfg(test)]
2699mod tests {
2700    use std::collections::VecDeque;
2701    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2702    use std::sync::{Arc as StdArc, Mutex as StdMutex};
2703
2704    use agentkit_core::{
2705        CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolCallPart, ToolOutput,
2706        ToolResultPart,
2707    };
2708    use agentkit_task_manager::{
2709        AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2710        TaskRoutingPolicy,
2711    };
2712    use agentkit_tools_core::{
2713        FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2714        ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2715        ToolResult, ToolSpec,
2716    };
2717    use serde_json::{Value, json};
2718    use tokio::sync::Notify;
2719    use tokio::time::{Duration, timeout};
2720
2721    use super::*;
2722
2723    struct FakeAdapter;
2724    struct SlowAdapter;
2725    struct RecordingAdapter {
2726        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2727        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2728    }
2729    struct MultiToolAdapter;
2730    struct DualApprovalAdapter;
2731
2732    struct FakeSession;
2733    struct SlowSession;
2734    struct RecordingSession {
2735        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2736        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2737    }
2738    struct MultiToolSession;
2739    struct DualApprovalSession;
2740
2741    struct FakeTurn {
2742        events: VecDeque<ModelTurnEvent>,
2743    }
2744
2745    struct SlowTurn {
2746        emitted: bool,
2747    }
2748
2749    struct RecordingTurn {
2750        emitted: bool,
2751    }
2752    struct MultiToolTurn {
2753        events: VecDeque<ModelTurnEvent>,
2754    }
2755    struct DualApprovalTurn {
2756        events: VecDeque<ModelTurnEvent>,
2757    }
2758
2759    #[async_trait]
2760    impl ModelAdapter for FakeAdapter {
2761        type Session = FakeSession;
2762
2763        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2764            Ok(FakeSession)
2765        }
2766    }
2767
2768    #[async_trait]
2769    impl ModelAdapter for SlowAdapter {
2770        type Session = SlowSession;
2771
2772        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2773            Ok(SlowSession)
2774        }
2775    }
2776
2777    #[async_trait]
2778    impl ModelAdapter for RecordingAdapter {
2779        type Session = RecordingSession;
2780
2781        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2782            Ok(RecordingSession {
2783                seen_descriptions: self.seen_descriptions.clone(),
2784                seen_caches: self.seen_caches.clone(),
2785            })
2786        }
2787    }
2788
2789    #[async_trait]
2790    impl ModelAdapter for MultiToolAdapter {
2791        type Session = MultiToolSession;
2792
2793        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2794            Ok(MultiToolSession)
2795        }
2796    }
2797
2798    #[async_trait]
2799    impl ModelAdapter for DualApprovalAdapter {
2800        type Session = DualApprovalSession;
2801
2802        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2803            Ok(DualApprovalSession)
2804        }
2805    }
2806
2807    #[async_trait]
2808    impl ModelSession for FakeSession {
2809        type Turn = FakeTurn;
2810
2811        async fn begin_turn(
2812            &mut self,
2813            request: TurnRequest,
2814            _cancellation: Option<TurnCancellation>,
2815        ) -> Result<Self::Turn, LoopError> {
2816            let has_tool_result = request.transcript.iter().any(|item| {
2817                item.kind == ItemKind::Tool
2818                    && item
2819                        .parts
2820                        .iter()
2821                        .any(|part| matches!(part, Part::ToolResult(_)))
2822            });
2823            let tool_name = request
2824                .available_tools
2825                .first()
2826                .map(|tool| tool.name.0.clone())
2827                .unwrap_or_else(|| "echo".into());
2828
2829            let events = if has_tool_result {
2830                let result_text = request
2831                    .transcript
2832                    .iter()
2833                    .rev()
2834                    .find_map(|item| {
2835                        item.parts.iter().find_map(|part| match part {
2836                            Part::ToolResult(ToolResultPart {
2837                                output: ToolOutput::Text(text),
2838                                ..
2839                            }) => Some(text.clone()),
2840                            _ => None,
2841                        })
2842                    })
2843                    .unwrap_or_else(|| "missing".into());
2844
2845                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2846                    model: None,
2847                    response_id: None,
2848                    finish_reason: FinishReason::Completed,
2849                    output_items: vec![Item {
2850                        id: None,
2851                        kind: ItemKind::Assistant,
2852                        parts: vec![Part::Text(TextPart {
2853                            text: format!("tool said: {result_text}"),
2854                            metadata: MetadataMap::new(),
2855                        })],
2856                        metadata: MetadataMap::new(),
2857                        usage: None,
2858                        finish_reason: None,
2859                        created_at: None,
2860                    }],
2861                    usage: None,
2862                    metadata: MetadataMap::new(),
2863                })])
2864            } else {
2865                VecDeque::from([
2866                    ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2867                        id: ToolCallId::new("call-1"),
2868                        name: tool_name.clone(),
2869                        input: json!({ "value": "pong" }),
2870                        metadata: MetadataMap::new(),
2871                    }),
2872                    ModelTurnEvent::Finished(ModelTurnResult {
2873                        model: None,
2874                        response_id: None,
2875                        finish_reason: FinishReason::ToolCall,
2876                        output_items: vec![Item {
2877                            id: None,
2878                            kind: ItemKind::Assistant,
2879                            parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2880                                id: ToolCallId::new("call-1"),
2881                                name: tool_name,
2882                                input: json!({ "value": "pong" }),
2883                                metadata: MetadataMap::new(),
2884                            })],
2885                            metadata: MetadataMap::new(),
2886                            usage: None,
2887                            finish_reason: None,
2888                            created_at: None,
2889                        }],
2890                        usage: None,
2891                        metadata: MetadataMap::new(),
2892                    }),
2893                ])
2894            };
2895
2896            Ok(FakeTurn { events })
2897        }
2898    }
2899
2900    #[async_trait]
2901    impl ModelSession for SlowSession {
2902        type Turn = SlowTurn;
2903
2904        async fn begin_turn(
2905            &mut self,
2906            request: TurnRequest,
2907            cancellation: Option<TurnCancellation>,
2908        ) -> Result<Self::Turn, LoopError> {
2909            let should_block = request
2910                .transcript
2911                .iter()
2912                .rev()
2913                .find(|item| item.kind == ItemKind::User)
2914                .is_some_and(|item| {
2915                    item.parts.iter().any(|part| match part {
2916                        Part::Text(text) => text.text == "do the long task",
2917                        _ => false,
2918                    })
2919                });
2920
2921            if should_block && let Some(cancellation) = cancellation {
2922                cancellation.cancelled().await;
2923                return Err(LoopError::Cancelled);
2924            }
2925
2926            Ok(SlowTurn { emitted: false })
2927        }
2928    }
2929
2930    #[async_trait]
2931    impl ModelSession for RecordingSession {
2932        type Turn = RecordingTurn;
2933
2934        async fn begin_turn(
2935            &mut self,
2936            request: TurnRequest,
2937            _cancellation: Option<TurnCancellation>,
2938        ) -> Result<Self::Turn, LoopError> {
2939            let descriptions = request
2940                .available_tools
2941                .iter()
2942                .map(|tool| tool.description.clone())
2943                .collect::<Vec<_>>();
2944            self.seen_descriptions.lock().unwrap().push(descriptions);
2945            self.seen_caches.lock().unwrap().push(request.cache.clone());
2946
2947            Ok(RecordingTurn { emitted: false })
2948        }
2949    }
2950
2951    #[async_trait]
2952    impl ModelSession for MultiToolSession {
2953        type Turn = MultiToolTurn;
2954
2955        async fn begin_turn(
2956            &mut self,
2957            request: TurnRequest,
2958            _cancellation: Option<TurnCancellation>,
2959        ) -> Result<Self::Turn, LoopError> {
2960            let has_tool_result = request.transcript.iter().any(|item| {
2961                item.kind == ItemKind::Tool
2962                    && item
2963                        .parts
2964                        .iter()
2965                        .any(|part| matches!(part, Part::ToolResult(_)))
2966            });
2967
2968            let events = if has_tool_result {
2969                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2970                    model: None,
2971                    response_id: None,
2972                    finish_reason: FinishReason::Completed,
2973                    output_items: vec![Item {
2974                        id: None,
2975                        kind: ItemKind::Assistant,
2976                        parts: vec![Part::Text(TextPart {
2977                            text: "mixed tools finished".into(),
2978                            metadata: MetadataMap::new(),
2979                        })],
2980                        metadata: MetadataMap::new(),
2981                        usage: None,
2982                        finish_reason: None,
2983                        created_at: None,
2984                    }],
2985                    usage: None,
2986                    metadata: MetadataMap::new(),
2987                })])
2988            } else {
2989                let foreground = agentkit_core::ToolCallPart {
2990                    id: ToolCallId::new("call-foreground"),
2991                    name: "foreground-wait".into(),
2992                    input: json!({}),
2993                    metadata: MetadataMap::new(),
2994                };
2995                let background = agentkit_core::ToolCallPart {
2996                    id: ToolCallId::new("call-background"),
2997                    name: "background-wait".into(),
2998                    input: json!({}),
2999                    metadata: MetadataMap::new(),
3000                };
3001                VecDeque::from([
3002                    ModelTurnEvent::ToolCall(foreground.clone()),
3003                    ModelTurnEvent::ToolCall(background.clone()),
3004                    ModelTurnEvent::Finished(ModelTurnResult {
3005                        model: None,
3006                        response_id: None,
3007                        finish_reason: FinishReason::ToolCall,
3008                        output_items: vec![Item {
3009                            id: None,
3010                            kind: ItemKind::Assistant,
3011                            parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
3012                            metadata: MetadataMap::new(),
3013                            usage: None,
3014                            finish_reason: None,
3015                            created_at: None,
3016                        }],
3017                        usage: None,
3018                        metadata: MetadataMap::new(),
3019                    }),
3020                ])
3021            };
3022
3023            Ok(MultiToolTurn { events })
3024        }
3025    }
3026
3027    #[async_trait]
3028    impl ModelSession for DualApprovalSession {
3029        type Turn = DualApprovalTurn;
3030
3031        async fn begin_turn(
3032            &mut self,
3033            request: TurnRequest,
3034            _cancellation: Option<TurnCancellation>,
3035        ) -> Result<Self::Turn, LoopError> {
3036            let tool_results = request
3037                .transcript
3038                .iter()
3039                .flat_map(|item| item.parts.iter())
3040                .filter(|part| matches!(part, Part::ToolResult(_)))
3041                .count();
3042
3043            let events = if tool_results >= 2 {
3044                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
3045                    model: None,
3046                    response_id: None,
3047                    finish_reason: FinishReason::Completed,
3048                    output_items: vec![Item {
3049                        id: None,
3050                        kind: ItemKind::Assistant,
3051                        parts: vec![Part::Text(TextPart {
3052                            text: "both approvals finished".into(),
3053                            metadata: MetadataMap::new(),
3054                        })],
3055                        metadata: MetadataMap::new(),
3056                        usage: None,
3057                        finish_reason: None,
3058                        created_at: None,
3059                    }],
3060                    usage: None,
3061                    metadata: MetadataMap::new(),
3062                })])
3063            } else {
3064                let first = agentkit_core::ToolCallPart {
3065                    id: ToolCallId::new("call-1"),
3066                    name: "echo".into(),
3067                    input: json!({ "value": "first" }),
3068                    metadata: MetadataMap::new(),
3069                };
3070                let second = agentkit_core::ToolCallPart {
3071                    id: ToolCallId::new("call-2"),
3072                    name: "echo".into(),
3073                    input: json!({ "value": "second" }),
3074                    metadata: MetadataMap::new(),
3075                };
3076                VecDeque::from([
3077                    ModelTurnEvent::ToolCall(first.clone()),
3078                    ModelTurnEvent::ToolCall(second.clone()),
3079                    ModelTurnEvent::Finished(ModelTurnResult {
3080                        model: None,
3081                        response_id: None,
3082                        finish_reason: FinishReason::ToolCall,
3083                        output_items: vec![Item {
3084                            id: None,
3085                            kind: ItemKind::Assistant,
3086                            parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
3087                            metadata: MetadataMap::new(),
3088                            usage: None,
3089                            finish_reason: None,
3090                            created_at: None,
3091                        }],
3092                        usage: None,
3093                        metadata: MetadataMap::new(),
3094                    }),
3095                ])
3096            };
3097
3098            Ok(DualApprovalTurn { events })
3099        }
3100    }
3101
3102    #[async_trait]
3103    impl ModelTurn for FakeTurn {
3104        async fn next_event(
3105            &mut self,
3106            _cancellation: Option<TurnCancellation>,
3107        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3108            Ok(self.events.pop_front())
3109        }
3110    }
3111
3112    #[async_trait]
3113    impl ModelTurn for SlowTurn {
3114        async fn next_event(
3115            &mut self,
3116            cancellation: Option<TurnCancellation>,
3117        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3118            if let Some(cancellation) = cancellation
3119                && cancellation.is_cancelled()
3120            {
3121                return Err(LoopError::Cancelled);
3122            }
3123
3124            if self.emitted {
3125                Ok(None)
3126            } else {
3127                self.emitted = true;
3128                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3129                    model: None,
3130                    response_id: None,
3131                    finish_reason: FinishReason::Completed,
3132                    output_items: vec![Item {
3133                        id: None,
3134                        kind: ItemKind::Assistant,
3135                        parts: vec![Part::Text(TextPart {
3136                            text: "done".into(),
3137                            metadata: MetadataMap::new(),
3138                        })],
3139                        metadata: MetadataMap::new(),
3140                        usage: None,
3141                        finish_reason: None,
3142                        created_at: None,
3143                    }],
3144                    usage: None,
3145                    metadata: MetadataMap::new(),
3146                })))
3147            }
3148        }
3149    }
3150
3151    #[async_trait]
3152    impl ModelTurn for RecordingTurn {
3153        async fn next_event(
3154            &mut self,
3155            _cancellation: Option<TurnCancellation>,
3156        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3157            if self.emitted {
3158                Ok(None)
3159            } else {
3160                self.emitted = true;
3161                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3162                    model: None,
3163                    response_id: None,
3164                    finish_reason: FinishReason::Completed,
3165                    output_items: vec![Item {
3166                        id: None,
3167                        kind: ItemKind::Assistant,
3168                        parts: vec![Part::Text(TextPart {
3169                            text: "done".into(),
3170                            metadata: MetadataMap::new(),
3171                        })],
3172                        metadata: MetadataMap::new(),
3173                        usage: None,
3174                        finish_reason: None,
3175                        created_at: None,
3176                    }],
3177                    usage: None,
3178                    metadata: MetadataMap::new(),
3179                })))
3180            }
3181        }
3182    }
3183
3184    #[async_trait]
3185    impl ModelTurn for MultiToolTurn {
3186        async fn next_event(
3187            &mut self,
3188            _cancellation: Option<TurnCancellation>,
3189        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3190            Ok(self.events.pop_front())
3191        }
3192    }
3193
3194    #[async_trait]
3195    impl ModelTurn for DualApprovalTurn {
3196        async fn next_event(
3197            &mut self,
3198            _cancellation: Option<TurnCancellation>,
3199        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3200            Ok(self.events.pop_front())
3201        }
3202    }
3203
3204    #[derive(Clone)]
3205    struct EchoTool {
3206        spec: ToolSpec,
3207    }
3208
3209    impl Default for EchoTool {
3210        fn default() -> Self {
3211            Self {
3212                spec: ToolSpec {
3213                    name: ToolName::new("echo"),
3214                    description: "Echo back a value".into(),
3215                    input_schema: json!({
3216                        "type": "object",
3217                        "properties": {
3218                            "value": { "type": "string" }
3219                        },
3220                        "required": ["value"],
3221                        "additionalProperties": false
3222                    }),
3223                    output_schema: None,
3224                    annotations: ToolAnnotations::default(),
3225                    metadata: MetadataMap::new(),
3226                },
3227            }
3228        }
3229    }
3230
3231    #[derive(Clone)]
3232    struct DynamicSpecTool {
3233        spec: ToolSpec,
3234        version: StdArc<AtomicUsize>,
3235    }
3236
3237    impl DynamicSpecTool {
3238        fn new(version: StdArc<AtomicUsize>) -> Self {
3239            Self {
3240                spec: ToolSpec {
3241                    name: ToolName::new("dynamic"),
3242                    description: "dynamic version 0".into(),
3243                    input_schema: json!({
3244                        "type": "object",
3245                        "properties": {},
3246                        "additionalProperties": false
3247                    }),
3248                    output_schema: None,
3249                    annotations: ToolAnnotations::default(),
3250                    metadata: MetadataMap::new(),
3251                },
3252                version,
3253            }
3254        }
3255    }
3256
3257    #[async_trait]
3258    impl Tool for EchoTool {
3259        fn spec(&self) -> &ToolSpec {
3260            &self.spec
3261        }
3262
3263        fn proposed_requests(
3264            &self,
3265            request: &agentkit_tools_core::ToolRequest,
3266        ) -> Result<
3267            Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
3268            agentkit_tools_core::ToolError,
3269        > {
3270            Ok(vec![Box::new(FileSystemPermissionRequest::Read {
3271                path: "/tmp/echo".into(),
3272                metadata: request.metadata.clone(),
3273            })])
3274        }
3275
3276        async fn invoke(
3277            &self,
3278            request: agentkit_tools_core::ToolRequest,
3279            _ctx: &mut ToolContext<'_>,
3280        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3281            let value = request
3282                .input
3283                .get("value")
3284                .and_then(Value::as_str)
3285                .ok_or_else(|| {
3286                    agentkit_tools_core::ToolError::InvalidInput("missing value".into())
3287                })?;
3288
3289            Ok(ToolResult {
3290                result: ToolResultPart {
3291                    call_id: request.call_id,
3292                    output: ToolOutput::Text(value.into()),
3293                    is_error: false,
3294                    metadata: MetadataMap::new(),
3295                },
3296                duration: None,
3297                metadata: MetadataMap::new(),
3298            })
3299        }
3300    }
3301
3302    #[async_trait]
3303    impl Tool for DynamicSpecTool {
3304        fn spec(&self) -> &ToolSpec {
3305            &self.spec
3306        }
3307
3308        fn current_spec(&self) -> Option<ToolSpec> {
3309            let mut spec = self.spec.clone();
3310            spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
3311            Some(spec)
3312        }
3313
3314        async fn invoke(
3315            &self,
3316            request: agentkit_tools_core::ToolRequest,
3317            _ctx: &mut ToolContext<'_>,
3318        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3319            Ok(ToolResult {
3320                result: ToolResultPart {
3321                    call_id: request.call_id,
3322                    output: ToolOutput::Text("ok".into()),
3323                    is_error: false,
3324                    metadata: MetadataMap::new(),
3325                },
3326                duration: None,
3327                metadata: MetadataMap::new(),
3328            })
3329        }
3330    }
3331
3332    struct DenyFsReads;
3333
3334    impl PermissionChecker for DenyFsReads {
3335        fn evaluate(
3336            &self,
3337            request: &dyn agentkit_tools_core::PermissionRequest,
3338        ) -> PermissionDecision {
3339            if request.kind() == "filesystem.read" {
3340                return PermissionDecision::Deny(PermissionDenial {
3341                    code: PermissionCode::PathNotAllowed,
3342                    message: "reads denied in test".into(),
3343                    metadata: MetadataMap::new(),
3344                });
3345            }
3346
3347            PermissionDecision::Allow
3348        }
3349    }
3350
3351    struct ApproveFsReads;
3352
3353    impl PermissionChecker for ApproveFsReads {
3354        fn evaluate(
3355            &self,
3356            request: &dyn agentkit_tools_core::PermissionRequest,
3357        ) -> PermissionDecision {
3358            if request.kind() == "filesystem.read" {
3359                return PermissionDecision::RequireApproval(ApprovalRequest {
3360                    task_id: None,
3361                    call_id: None,
3362                    id: "approval:fs-read".into(),
3363                    request_kind: request.kind().into(),
3364                    reason: agentkit_tools_core::ApprovalReason::SensitivePath,
3365                    summary: request.summary(),
3366                    metadata: request.metadata().clone(),
3367                });
3368            }
3369
3370            PermissionDecision::Allow
3371        }
3372    }
3373
3374    struct KeepRecentMutator {
3375        keep: usize,
3376    }
3377
3378    #[async_trait]
3379    impl LoopMutator for KeepRecentMutator {
3380        async fn mutate(
3381            &self,
3382            cursor: &mut TranscriptCursor<'_>,
3383            ctx: LoopCtx<'_>,
3384        ) -> Result<(), LoopError> {
3385            if cursor.len() < 2 {
3386                return Ok(());
3387            }
3388            let drop = cursor.len().saturating_sub(self.keep);
3389            ctx.emitter.emit(AgentEvent::MutationStarted {
3390                session_id: ctx.session_id.clone(),
3391                turn_id: ctx.turn_id.cloned(),
3392                mutator: "keep-recent".into(),
3393                point: ctx.point,
3394            });
3395            cursor.drain(..drop);
3396            ctx.emitter.emit(AgentEvent::MutationFinished {
3397                session_id: ctx.session_id.clone(),
3398                turn_id: ctx.turn_id.cloned(),
3399                mutator: "keep-recent".into(),
3400                dirty: true,
3401                metadata: MetadataMap::new(),
3402            });
3403            Ok(())
3404        }
3405    }
3406
3407    /// No-op mutator that records the [`MutationPoint`] it is invoked with at
3408    /// each mutation site, so a test can assert which point the loop reports.
3409    struct PointRecordingMutator {
3410        points: StdArc<StdMutex<Vec<MutationPoint>>>,
3411    }
3412
3413    #[async_trait]
3414    impl LoopMutator for PointRecordingMutator {
3415        async fn mutate(
3416            &self,
3417            _cursor: &mut TranscriptCursor<'_>,
3418            ctx: LoopCtx<'_>,
3419        ) -> Result<(), LoopError> {
3420            self.points.lock().unwrap().push(ctx.point);
3421            Ok(())
3422        }
3423    }
3424
3425    struct RecordingObserver {
3426        events: StdArc<StdMutex<Vec<AgentEvent>>>,
3427    }
3428
3429    impl LoopObserver for RecordingObserver {
3430        fn handle_event(&self, event: AgentEvent) {
3431            self.events.lock().unwrap().push(event);
3432        }
3433    }
3434
3435    struct CatalogExecutor {
3436        version: AtomicUsize,
3437        events: StdMutex<Vec<ToolCatalogEvent>>,
3438    }
3439
3440    impl CatalogExecutor {
3441        fn new() -> Self {
3442            Self {
3443                version: AtomicUsize::new(0),
3444                events: StdMutex::new(Vec::new()),
3445            }
3446        }
3447
3448        fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3449            self.version.store(version, Ordering::SeqCst);
3450            self.events.lock().unwrap().push(event);
3451        }
3452    }
3453
3454    #[async_trait]
3455    impl ToolExecutor for CatalogExecutor {
3456        fn specs(&self) -> Vec<ToolSpec> {
3457            vec![ToolSpec {
3458                name: ToolName::new("dynamic"),
3459                description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3460                input_schema: json!({
3461                    "type": "object",
3462                    "properties": {},
3463                    "additionalProperties": false
3464                }),
3465                output_schema: None,
3466                annotations: ToolAnnotations::default(),
3467                metadata: MetadataMap::new(),
3468            }]
3469        }
3470
3471        fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3472            std::mem::take(&mut *self.events.lock().unwrap())
3473        }
3474
3475        async fn execute(
3476            &self,
3477            request: ToolRequest,
3478            _ctx: &mut ToolContext<'_>,
3479        ) -> ToolExecutionOutcome {
3480            ToolExecutionOutcome::Completed(ToolResult {
3481                result: ToolResultPart {
3482                    call_id: request.call_id,
3483                    output: ToolOutput::Text("dynamic-ok".into()),
3484                    is_error: false,
3485                    metadata: MetadataMap::new(),
3486                },
3487                duration: None,
3488                metadata: MetadataMap::new(),
3489            })
3490        }
3491    }
3492
3493    #[derive(Clone)]
3494    struct BlockingTool {
3495        spec: ToolSpec,
3496        entered: StdArc<AtomicBool>,
3497        release: StdArc<Notify>,
3498        output: &'static str,
3499    }
3500
3501    impl BlockingTool {
3502        fn new(
3503            name: &str,
3504            entered: StdArc<AtomicBool>,
3505            release: StdArc<Notify>,
3506            output: &'static str,
3507        ) -> Self {
3508            Self {
3509                spec: ToolSpec {
3510                    name: ToolName::new(name),
3511                    description: format!("blocking tool {name}"),
3512                    input_schema: json!({
3513                        "type": "object",
3514                        "properties": {},
3515                        "additionalProperties": false
3516                    }),
3517                    output_schema: None,
3518                    annotations: ToolAnnotations::default(),
3519                    metadata: MetadataMap::new(),
3520                },
3521                entered,
3522                release,
3523                output,
3524            }
3525        }
3526    }
3527
3528    #[async_trait]
3529    impl Tool for BlockingTool {
3530        fn spec(&self) -> &ToolSpec {
3531            &self.spec
3532        }
3533
3534        async fn invoke(
3535            &self,
3536            request: agentkit_tools_core::ToolRequest,
3537            _ctx: &mut ToolContext<'_>,
3538        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3539            self.entered.store(true, Ordering::SeqCst);
3540            self.release.notified().await;
3541            Ok(ToolResult {
3542                result: ToolResultPart {
3543                    call_id: request.call_id,
3544                    output: ToolOutput::Text(self.output.into()),
3545                    is_error: false,
3546                    metadata: MetadataMap::new(),
3547                },
3548                duration: None,
3549                metadata: MetadataMap::new(),
3550            })
3551        }
3552    }
3553
3554    struct NameRoutingPolicy {
3555        routes: Vec<(String, RoutingDecision)>,
3556    }
3557
3558    impl NameRoutingPolicy {
3559        fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3560            Self {
3561                routes: routes
3562                    .into_iter()
3563                    .map(|(name, decision)| (name.into(), decision))
3564                    .collect(),
3565            }
3566        }
3567    }
3568
3569    impl TaskRoutingPolicy for NameRoutingPolicy {
3570        fn route(&self, request: &ToolRequest) -> RoutingDecision {
3571            self.routes
3572                .iter()
3573                .find(|(name, _)| name == &request.tool_name.0)
3574                .map(|(_, decision)| *decision)
3575                .unwrap_or(RoutingDecision::Foreground)
3576        }
3577    }
3578
3579    async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3580        timeout(Duration::from_secs(1), handle.next_event())
3581            .await
3582            .expect("timed out waiting for task event")
3583            .expect("task event stream ended unexpectedly")
3584    }
3585
3586    async fn wait_until_entered(flag: &AtomicBool) {
3587        timeout(Duration::from_secs(1), async {
3588            while !flag.load(Ordering::SeqCst) {
3589                tokio::task::yield_now().await;
3590            }
3591        })
3592        .await
3593        .expect("task never entered execution");
3594    }
3595
3596    #[tokio::test]
3597    async fn loop_continues_after_completed_tool_call() {
3598        let tools = ToolRegistry::new().with(EchoTool::default());
3599        let agent = Agent::builder()
3600            .model(FakeAdapter)
3601            .add_tool_source(tools)
3602            .permissions(AllowAllPermissions)
3603            .build()
3604            .unwrap();
3605
3606        let mut driver = agent
3607            .start(SessionConfig {
3608                session_id: SessionId::new("session-1"),
3609                metadata: MetadataMap::new(),
3610                cache: None,
3611            })
3612            .await
3613            .unwrap();
3614
3615        driver
3616            .submit_input(vec![Item {
3617                id: None,
3618                kind: ItemKind::User,
3619                parts: vec![Part::Text(TextPart {
3620                    text: "ping".into(),
3621                    metadata: MetadataMap::new(),
3622                })],
3623                metadata: MetadataMap::new(),
3624                usage: None,
3625                finish_reason: None,
3626                created_at: None,
3627            }])
3628            .unwrap();
3629
3630        let result = run_until_finished(&mut driver).await;
3631
3632        match result {
3633            LoopStep::Finished(turn) => {
3634                assert_eq!(turn.finish_reason, FinishReason::Completed);
3635                assert_eq!(turn.items.len(), 1);
3636                match &turn.items[0].parts[0] {
3637                    Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3638                    other => panic!("unexpected part: {other:?}"),
3639                }
3640            }
3641            other => panic!("unexpected loop step: {other:?}"),
3642        }
3643    }
3644
3645    /// Test helper: drives the loop, transparently resuming non-blocking
3646    /// cooperative interrupts (AfterToolResult), until a terminal step or a
3647    /// blocking interrupt is reached.
3648    async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3649        loop {
3650            match driver.next().await.unwrap() {
3651                LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3652                step => return step,
3653            }
3654        }
3655    }
3656
3657    /// A mutator runs at the top of every `drive_turn`, and the loop labels
3658    /// the site via [`MutationPoint`]. The first drive of a turn is
3659    /// `AfterTurnEnded`; the continuation drive that follows a completed tool
3660    /// round must be `AfterToolResult` (a tool result was just appended and an
3661    /// inference call is imminent). This pins that the continuation reports the
3662    /// correct point.
3663    #[tokio::test]
3664    async fn post_tool_continuation_reports_after_tool_result_mutation_point() {
3665        let points = StdArc::new(StdMutex::new(Vec::<MutationPoint>::new()));
3666        let tools = ToolRegistry::new().with(EchoTool::default());
3667        let agent = Agent::builder()
3668            .model(FakeAdapter)
3669            .add_tool_source(tools)
3670            .permissions(AllowAllPermissions)
3671            .mutator(PointRecordingMutator {
3672                points: points.clone(),
3673            })
3674            .build()
3675            .unwrap();
3676
3677        let mut driver = agent
3678            .start(SessionConfig {
3679                session_id: SessionId::new("session-mutation-point"),
3680                metadata: MetadataMap::new(),
3681                cache: None,
3682            })
3683            .await
3684            .unwrap();
3685
3686        driver
3687            .submit_input(vec![Item::text(ItemKind::User, "ping")])
3688            .unwrap();
3689
3690        // FakeSession: turn 1 emits a tool call, the continuation turn finishes.
3691        let _ = run_until_finished(&mut driver).await;
3692
3693        let recorded = points.lock().unwrap().clone();
3694        assert_eq!(
3695            recorded.first(),
3696            Some(&MutationPoint::AfterTurnEnded),
3697            "first drive of a fresh turn must report AfterTurnEnded, got {recorded:?}"
3698        );
3699        assert!(
3700            recorded.contains(&MutationPoint::AfterToolResult),
3701            "post-tool continuation must report AfterToolResult, got {recorded:?}"
3702        );
3703    }
3704
3705    #[test]
3706    fn pending_input_requires_input_bearing_tail_role() {
3707        assert!(!transcript_has_pending_input(&[]));
3708        assert!(!transcript_has_pending_input(&[Item::text(
3709            ItemKind::System,
3710            "system"
3711        )]));
3712        assert!(!transcript_has_pending_input(&[Item::text(
3713            ItemKind::Developer,
3714            "developer"
3715        )]));
3716        assert!(!transcript_has_pending_input(&[Item::text(
3717            ItemKind::Context,
3718            "context"
3719        )]));
3720        assert!(!transcript_has_pending_input(&[Item::text(
3721            ItemKind::Assistant,
3722            "assistant"
3723        )]));
3724
3725        assert!(transcript_has_pending_input(&[Item::text(
3726            ItemKind::User,
3727            "user"
3728        )]));
3729        assert!(transcript_has_pending_input(&[Item::notification(
3730            "background update"
3731        )]));
3732        assert!(transcript_has_pending_input(&[Item {
3733            id: None,
3734            kind: ItemKind::Tool,
3735            parts: vec![Part::ToolResult(ToolResultPart {
3736                call_id: ToolCallId::new("call-test"),
3737                output: ToolOutput::Text("ok".into()),
3738                is_error: false,
3739                metadata: MetadataMap::new(),
3740            })],
3741            metadata: MetadataMap::new(),
3742            usage: None,
3743            finish_reason: None,
3744            created_at: None,
3745        }]));
3746    }
3747
3748    /// Drops a trailing `User` item. Stands in for any mutator that removes the
3749    /// freshly-submitted input during `drive_turn` — e.g. a compaction pass
3750    /// that summarises the latest user turn away, or a normalisation step that
3751    /// strips an empty user prompt — leaving the transcript ending in an
3752    /// assistant message.
3753    struct DropTrailingUserMutator;
3754
3755    #[async_trait]
3756    impl LoopMutator for DropTrailingUserMutator {
3757        async fn mutate(
3758            &self,
3759            cursor: &mut TranscriptCursor<'_>,
3760            _ctx: LoopCtx<'_>,
3761        ) -> Result<(), LoopError> {
3762            if cursor.last().map(|item| item.kind) == Some(ItemKind::User) {
3763                cursor.pop();
3764            }
3765            Ok(())
3766        }
3767    }
3768
3769    /// Mirrors the provider gram hit (Vertex/Bedrock via OpenRouter): a model
3770    /// that rejects any request whose final message is an assistant message
3771    /// ("assistant prefill — the conversation must end with a user message").
3772    /// Records whether it was ever asked to begin such a turn.
3773    struct RejectAssistantPrefillAdapter {
3774        saw_assistant_tail: StdArc<AtomicBool>,
3775    }
3776
3777    struct RejectAssistantPrefillSession {
3778        saw_assistant_tail: StdArc<AtomicBool>,
3779    }
3780
3781    #[async_trait]
3782    impl ModelAdapter for RejectAssistantPrefillAdapter {
3783        type Session = RejectAssistantPrefillSession;
3784
3785        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
3786            Ok(RejectAssistantPrefillSession {
3787                saw_assistant_tail: self.saw_assistant_tail.clone(),
3788            })
3789        }
3790    }
3791
3792    #[async_trait]
3793    impl ModelSession for RejectAssistantPrefillSession {
3794        type Turn = FakeTurn;
3795
3796        async fn begin_turn(
3797            &mut self,
3798            request: TurnRequest,
3799            _cancellation: Option<TurnCancellation>,
3800        ) -> Result<Self::Turn, LoopError> {
3801            if request.transcript.last().map(|item| item.kind) == Some(ItemKind::Assistant) {
3802                self.saw_assistant_tail.store(true, Ordering::SeqCst);
3803                return Err(LoopError::Provider(
3804                    "conversation must end with a user message".into(),
3805                ));
3806            }
3807            Ok(FakeTurn {
3808                events: VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
3809                    model: None,
3810                    response_id: None,
3811                    finish_reason: FinishReason::Completed,
3812                    output_items: vec![Item::text(ItemKind::Assistant, "ok")],
3813                    usage: None,
3814                    metadata: MetadataMap::new(),
3815                })]),
3816            })
3817        }
3818    }
3819
3820    /// Reproduces the exact failure mode observed in gram: a mutator removes
3821    /// the just-submitted user input during `drive_turn`, so the transcript
3822    /// ends in an assistant message with nothing for the model to respond to.
3823    /// The loop must NOT dispatch a model request in that state — there is no
3824    /// valid trailing input to drive with — it should finish the turn instead.
3825    /// The adapter stands in for a provider that rejects assistant prefill, so
3826    /// any dispatch in this state would surface as a provider error.
3827    #[tokio::test]
3828    async fn drive_does_not_dispatch_without_valid_trailing_input() {
3829        let saw_assistant_tail = StdArc::new(AtomicBool::new(false));
3830        let agent = Agent::builder()
3831            .model(RejectAssistantPrefillAdapter {
3832                saw_assistant_tail: saw_assistant_tail.clone(),
3833            })
3834            .mutator(DropTrailingUserMutator)
3835            // Prior conversation ending in an assistant message — e.g. a cold
3836            // bootstrap that loaded a completed turn's history.
3837            .transcript(vec![
3838                Item::text(ItemKind::User, "kickoff"),
3839                Item::text(ItemKind::Assistant, "prior reply"),
3840            ])
3841            .build()
3842            .unwrap();
3843
3844        let mut driver = agent
3845            .start(SessionConfig {
3846                session_id: SessionId::new("session-no-valid-input"),
3847                metadata: MetadataMap::new(),
3848                cache: None,
3849            })
3850            .await
3851            .unwrap();
3852
3853        driver
3854            .submit_input(vec![Item::text(ItemKind::User, "follow up")])
3855            .unwrap();
3856
3857        // The mutator strips the "follow up" user item, leaving [user, assistant].
3858        let outcome = driver.next().await;
3859
3860        assert!(
3861            !saw_assistant_tail.load(Ordering::SeqCst),
3862            "loop dispatched a model turn whose transcript ends in an assistant \
3863             message (outcome: {outcome:?}); with no valid trailing input the turn \
3864             must finish instead of driving"
3865        );
3866    }
3867
3868    #[tokio::test]
3869    async fn loop_uses_injected_permission_checker() {
3870        let tools = ToolRegistry::new().with(EchoTool::default());
3871        let agent = Agent::builder()
3872            .model(FakeAdapter)
3873            .add_tool_source(tools)
3874            .permissions(DenyFsReads)
3875            .build()
3876            .unwrap();
3877
3878        let mut driver = agent
3879            .start(SessionConfig {
3880                session_id: SessionId::new("session-2"),
3881                metadata: MetadataMap::new(),
3882                cache: None,
3883            })
3884            .await
3885            .unwrap();
3886
3887        driver
3888            .submit_input(vec![Item {
3889                id: None,
3890                kind: ItemKind::User,
3891                parts: vec![Part::Text(TextPart {
3892                    text: "ping".into(),
3893                    metadata: MetadataMap::new(),
3894                })],
3895                metadata: MetadataMap::new(),
3896                usage: None,
3897                finish_reason: None,
3898                created_at: None,
3899            }])
3900            .unwrap();
3901
3902        let result = run_until_finished(&mut driver).await;
3903
3904        match result {
3905            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3906                Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3907                other => panic!("unexpected part: {other:?}"),
3908            },
3909            other => panic!("unexpected loop step: {other:?}"),
3910        }
3911    }
3912
3913    #[tokio::test]
3914    async fn async_task_manager_background_round_requires_explicit_continue() {
3915        let entered = StdArc::new(AtomicBool::new(false));
3916        let release = StdArc::new(Notify::new());
3917        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3918            "background-wait",
3919            RoutingDecision::Background,
3920        )]));
3921        let handle = task_manager.handle();
3922        let tools = ToolRegistry::new().with(BlockingTool::new(
3923            "background-wait",
3924            entered.clone(),
3925            release.clone(),
3926            "background-done",
3927        ));
3928        let agent = Agent::builder()
3929            .model(FakeAdapter)
3930            .add_tool_source(tools)
3931            .permissions(AllowAllPermissions)
3932            .task_manager(task_manager)
3933            .build()
3934            .unwrap();
3935
3936        let mut driver = agent
3937            .start(SessionConfig {
3938                session_id: SessionId::new("session-background"),
3939                metadata: MetadataMap::new(),
3940                cache: None,
3941            })
3942            .await
3943            .unwrap();
3944
3945        driver
3946            .submit_input(vec![Item {
3947                id: None,
3948                kind: ItemKind::User,
3949                parts: vec![Part::Text(TextPart {
3950                    text: "ping".into(),
3951                    metadata: MetadataMap::new(),
3952                })],
3953                metadata: MetadataMap::new(),
3954                usage: None,
3955                finish_reason: None,
3956                created_at: None,
3957            }])
3958            .unwrap();
3959
3960        let first = driver.next().await.unwrap();
3961        match first {
3962            LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3963            other => panic!("unexpected first loop step: {other:?}"),
3964        }
3965
3966        match wait_for_task_event(&handle).await {
3967            TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3968            other => panic!("unexpected task event: {other:?}"),
3969        }
3970        wait_until_entered(entered.as_ref()).await;
3971        release.notify_waiters();
3972
3973        match wait_for_task_event(&handle).await {
3974            TaskEvent::Completed(_, result) => {
3975                assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3976            }
3977            other => panic!("unexpected completion event: {other:?}"),
3978        }
3979
3980        let resumed = driver.next().await.unwrap();
3981        match resumed {
3982            LoopStep::Finished(turn) => {
3983                assert_eq!(turn.finish_reason, FinishReason::Completed);
3984                match &turn.items[0].parts[0] {
3985                    Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3986                    other => panic!("unexpected part after resume: {other:?}"),
3987                }
3988            }
3989            other => panic!("unexpected resumed step: {other:?}"),
3990        }
3991    }
3992
3993    #[tokio::test]
3994    async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3995        let controller = CancellationController::new();
3996        let agent = Agent::builder()
3997            .model(SlowAdapter)
3998            .cancellation(controller.handle())
3999            .build()
4000            .unwrap();
4001
4002        let mut driver = agent
4003            .start(SessionConfig {
4004                session_id: SessionId::new("session-cancel"),
4005                metadata: MetadataMap::new(),
4006                cache: None,
4007            })
4008            .await
4009            .unwrap();
4010
4011        driver
4012            .submit_input(vec![Item {
4013                id: None,
4014                kind: ItemKind::User,
4015                parts: vec![Part::Text(TextPart {
4016                    text: "do the long task".into(),
4017                    metadata: MetadataMap::new(),
4018                })],
4019                metadata: MetadataMap::new(),
4020                usage: None,
4021                finish_reason: None,
4022                created_at: None,
4023            }])
4024            .unwrap();
4025
4026        let cancelled = tokio::join!(async { driver.next().await }, async {
4027            tokio::task::yield_now().await;
4028            controller.interrupt();
4029        })
4030        .0
4031        .unwrap();
4032
4033        match cancelled {
4034            LoopStep::Finished(turn) => {
4035                assert_eq!(turn.finish_reason, FinishReason::Cancelled);
4036                assert_eq!(turn.items.len(), 1);
4037                assert_eq!(turn.items[0].kind, ItemKind::Assistant);
4038                assert_eq!(
4039                    turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
4040                    Some(&Value::Bool(true))
4041                );
4042            }
4043            other => panic!("unexpected loop step: {other:?}"),
4044        }
4045
4046        driver
4047            .submit_input(vec![Item {
4048                id: None,
4049                kind: ItemKind::User,
4050                parts: vec![Part::Text(TextPart {
4051                    text: "try again".into(),
4052                    metadata: MetadataMap::new(),
4053                })],
4054                metadata: MetadataMap::new(),
4055                usage: None,
4056                finish_reason: None,
4057                created_at: None,
4058            }])
4059            .unwrap();
4060
4061        let result = driver.next().await.unwrap();
4062        match result {
4063            LoopStep::Finished(turn) => {
4064                assert_eq!(turn.finish_reason, FinishReason::Completed);
4065            }
4066            other => panic!("unexpected loop step after retry: {other:?}"),
4067        }
4068    }
4069
4070    #[tokio::test]
4071    async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
4072        let controller = CancellationController::new();
4073        let fg_entered = StdArc::new(AtomicBool::new(false));
4074        let fg_release = StdArc::new(Notify::new());
4075        let bg_entered = StdArc::new(AtomicBool::new(false));
4076        let bg_release = StdArc::new(Notify::new());
4077        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
4078            ("foreground-wait", RoutingDecision::Foreground),
4079            ("background-wait", RoutingDecision::Background),
4080        ]));
4081        let handle = task_manager.handle();
4082        let tools = ToolRegistry::new()
4083            .with(BlockingTool::new(
4084                "foreground-wait",
4085                fg_entered.clone(),
4086                fg_release,
4087                "foreground-done",
4088            ))
4089            .with(BlockingTool::new(
4090                "background-wait",
4091                bg_entered.clone(),
4092                bg_release.clone(),
4093                "background-done",
4094            ));
4095        let agent = Agent::builder()
4096            .model(MultiToolAdapter)
4097            .add_tool_source(tools)
4098            .permissions(AllowAllPermissions)
4099            .cancellation(controller.handle())
4100            .task_manager(task_manager)
4101            .build()
4102            .unwrap();
4103
4104        let mut driver = agent
4105            .start(SessionConfig {
4106                session_id: SessionId::new("session-mixed-cancel"),
4107                metadata: MetadataMap::new(),
4108                cache: None,
4109            })
4110            .await
4111            .unwrap();
4112
4113        driver
4114            .submit_input(vec![Item {
4115                id: None,
4116                kind: ItemKind::User,
4117                parts: vec![Part::Text(TextPart {
4118                    text: "run both".into(),
4119                    metadata: MetadataMap::new(),
4120                })],
4121                metadata: MetadataMap::new(),
4122                usage: None,
4123                finish_reason: None,
4124                created_at: None,
4125            }])
4126            .unwrap();
4127
4128        let cancelled = tokio::join!(async { driver.next().await }, async {
4129            let _ = wait_for_task_event(&handle).await;
4130            let _ = wait_for_task_event(&handle).await;
4131            wait_until_entered(fg_entered.as_ref()).await;
4132            wait_until_entered(bg_entered.as_ref()).await;
4133            controller.interrupt();
4134        })
4135        .0
4136        .unwrap();
4137
4138        match cancelled {
4139            LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
4140            other => panic!("unexpected loop step after interrupt: {other:?}"),
4141        }
4142
4143        match wait_for_task_event(&handle).await {
4144            TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
4145            other => panic!("unexpected post-interrupt event: {other:?}"),
4146        }
4147
4148        let running = handle.list_running().await;
4149        assert_eq!(running.len(), 1);
4150        assert_eq!(running[0].tool_name, "background-wait");
4151
4152        bg_release.notify_waiters();
4153        match wait_for_task_event(&handle).await {
4154            TaskEvent::Completed(snapshot, result) => {
4155                assert_eq!(snapshot.tool_name, "background-wait");
4156                assert_eq!(result.output, ToolOutput::Text("background-done".into()));
4157            }
4158            other => panic!("unexpected background completion event: {other:?}"),
4159        }
4160    }
4161
4162    #[tokio::test]
4163    async fn loop_resumes_after_approved_tool_request() {
4164        let tools = ToolRegistry::new().with(EchoTool::default());
4165        let agent = Agent::builder()
4166            .model(FakeAdapter)
4167            .add_tool_source(tools)
4168            .permissions(ApproveFsReads)
4169            .build()
4170            .unwrap();
4171
4172        let mut driver = agent
4173            .start(SessionConfig {
4174                session_id: SessionId::new("session-approval"),
4175                metadata: MetadataMap::new(),
4176                cache: None,
4177            })
4178            .await
4179            .unwrap();
4180
4181        driver
4182            .submit_input(vec![Item {
4183                id: None,
4184                kind: ItemKind::User,
4185                parts: vec![Part::Text(TextPart {
4186                    text: "ping".into(),
4187                    metadata: MetadataMap::new(),
4188                })],
4189                metadata: MetadataMap::new(),
4190                usage: None,
4191                finish_reason: None,
4192                created_at: None,
4193            }])
4194            .unwrap();
4195
4196        let first = driver.next().await.unwrap();
4197        match first {
4198            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4199                assert!(pending.request.task_id.is_some());
4200                assert_eq!(pending.request.id.0, "approval:fs-read");
4201                pending.approve(&mut driver).unwrap();
4202            }
4203            other => panic!("unexpected loop step: {other:?}"),
4204        }
4205        let second = driver.next().await.unwrap();
4206        match second {
4207            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
4208                Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
4209                other => panic!("unexpected part: {other:?}"),
4210            },
4211            other => panic!("unexpected loop step after approval: {other:?}"),
4212        }
4213    }
4214
4215    #[tokio::test]
4216    async fn loop_resumes_with_patched_input_on_approval() {
4217        let tools = ToolRegistry::new().with(EchoTool::default());
4218        let agent = Agent::builder()
4219            .model(FakeAdapter)
4220            .add_tool_source(tools)
4221            .permissions(ApproveFsReads)
4222            .build()
4223            .unwrap();
4224
4225        let mut driver = agent
4226            .start(SessionConfig {
4227                session_id: SessionId::new("session-approval-patched"),
4228                metadata: MetadataMap::new(),
4229                cache: None,
4230            })
4231            .await
4232            .unwrap();
4233
4234        driver
4235            .submit_input(vec![Item {
4236                id: None,
4237                kind: ItemKind::User,
4238                parts: vec![Part::Text(TextPart {
4239                    text: "ping".into(),
4240                    metadata: MetadataMap::new(),
4241                })],
4242                metadata: MetadataMap::new(),
4243                usage: None,
4244                finish_reason: None,
4245                created_at: None,
4246            }])
4247            .unwrap();
4248
4249        match driver.next().await.unwrap() {
4250            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4251                pending
4252                    .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
4253                    .unwrap();
4254            }
4255            other => panic!("unexpected loop step: {other:?}"),
4256        }
4257        match driver.next().await.unwrap() {
4258            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
4259                Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
4260                other => panic!("unexpected part: {other:?}"),
4261            },
4262            other => panic!("unexpected loop step after approval: {other:?}"),
4263        }
4264    }
4265
4266    #[tokio::test]
4267    async fn loop_tracks_multiple_pending_approvals_by_call_id() {
4268        let tools = ToolRegistry::new().with(EchoTool::default());
4269        let agent = Agent::builder()
4270            .model(DualApprovalAdapter)
4271            .add_tool_source(tools)
4272            .permissions(ApproveFsReads)
4273            .build()
4274            .unwrap();
4275
4276        let mut driver = agent
4277            .start(SessionConfig {
4278                session_id: SessionId::new("session-dual-approval"),
4279                metadata: MetadataMap::new(),
4280                cache: None,
4281            })
4282            .await
4283            .unwrap();
4284
4285        driver
4286            .submit_input(vec![Item {
4287                id: None,
4288                kind: ItemKind::User,
4289                parts: vec![Part::Text(TextPart {
4290                    text: "run both approvals".into(),
4291                    metadata: MetadataMap::new(),
4292                })],
4293                metadata: MetadataMap::new(),
4294                usage: None,
4295                finish_reason: None,
4296                created_at: None,
4297            }])
4298            .unwrap();
4299
4300        let pending_first = match driver.next().await.unwrap() {
4301            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4302                assert_eq!(
4303                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4304                    Some("call-1")
4305                );
4306                pending
4307            }
4308            other => panic!("unexpected first loop step: {other:?}"),
4309        };
4310
4311        let pending_second = match driver.next().await.unwrap() {
4312            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4313                assert_eq!(
4314                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4315                    Some("call-2")
4316                );
4317                pending
4318            }
4319            other => panic!("unexpected second loop step: {other:?}"),
4320        };
4321
4322        pending_second.approve(&mut driver).unwrap();
4323        match driver.next().await.unwrap() {
4324            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4325                assert_eq!(
4326                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4327                    Some("call-1")
4328                );
4329            }
4330            other => panic!("unexpected step after approving second request: {other:?}"),
4331        }
4332
4333        pending_first.approve(&mut driver).unwrap();
4334        match driver.next().await.unwrap() {
4335            LoopStep::Finished(turn) => {
4336                assert_eq!(turn.finish_reason, FinishReason::Completed);
4337                match &turn.items[0].parts[0] {
4338                    Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
4339                    other => panic!("unexpected final part: {other:?}"),
4340                }
4341            }
4342            other => panic!("unexpected final loop step: {other:?}"),
4343        }
4344    }
4345
4346    #[tokio::test]
4347    async fn loop_compacts_transcript_before_new_turns() {
4348        let events = StdArc::new(StdMutex::new(Vec::new()));
4349        let agent = Agent::builder()
4350            .model(FakeAdapter)
4351            .mutator(KeepRecentMutator { keep: 1 })
4352            .observer(RecordingObserver {
4353                events: events.clone(),
4354            })
4355            .build()
4356            .unwrap();
4357
4358        let mut driver = agent
4359            .start(SessionConfig {
4360                session_id: SessionId::new("session-4"),
4361                metadata: MetadataMap::new(),
4362                cache: None,
4363            })
4364            .await
4365            .unwrap();
4366
4367        for text in ["first", "second"] {
4368            driver
4369                .submit_input(vec![Item {
4370                    id: None,
4371                    kind: ItemKind::User,
4372                    parts: vec![Part::Text(TextPart {
4373                        text: text.into(),
4374                        metadata: MetadataMap::new(),
4375                    })],
4376                    metadata: MetadataMap::new(),
4377                    usage: None,
4378                    finish_reason: None,
4379                    created_at: None,
4380                }])
4381                .unwrap();
4382            let _ = driver.next().await.unwrap();
4383        }
4384
4385        let events = events.lock().unwrap();
4386        assert!(
4387            events
4388                .iter()
4389                .any(|event| matches!(event, AgentEvent::MutationFinished { dirty: true, .. }))
4390        );
4391    }
4392
4393    #[test]
4394    fn transcript_validation_rejects_orphaned_tool_result() {
4395        let transcript = vec![Item {
4396            id: None,
4397            kind: ItemKind::Tool,
4398            parts: vec![Part::ToolResult(ToolResultPart {
4399                call_id: "call-1".into(),
4400                output: ToolOutput::Text("result".into()),
4401                is_error: false,
4402                metadata: MetadataMap::new(),
4403            })],
4404            metadata: MetadataMap::new(),
4405            usage: None,
4406            finish_reason: None,
4407            created_at: None,
4408        }];
4409
4410        let error = validate_transcript_invariants(&transcript).unwrap_err();
4411        assert!(error.to_string().contains("orphaned tool_result"));
4412    }
4413
4414    #[test]
4415    fn transcript_validation_rejects_duplicate_tool_result() {
4416        let transcript = vec![
4417            Item {
4418                id: None,
4419                kind: ItemKind::Assistant,
4420                parts: vec![Part::ToolCall(ToolCallPart {
4421                    id: "call-1".into(),
4422                    name: "lookup".into(),
4423                    input: serde_json::json!({}),
4424                    metadata: MetadataMap::new(),
4425                })],
4426                metadata: MetadataMap::new(),
4427                usage: None,
4428                finish_reason: None,
4429                created_at: None,
4430            },
4431            Item {
4432                id: None,
4433                kind: ItemKind::Tool,
4434                parts: vec![Part::ToolResult(ToolResultPart {
4435                    call_id: "call-1".into(),
4436                    output: ToolOutput::Text("result".into()),
4437                    is_error: false,
4438                    metadata: MetadataMap::new(),
4439                })],
4440                metadata: MetadataMap::new(),
4441                usage: None,
4442                finish_reason: None,
4443                created_at: None,
4444            },
4445            Item {
4446                id: None,
4447                kind: ItemKind::Tool,
4448                parts: vec![Part::ToolResult(ToolResultPart {
4449                    call_id: "call-1".into(),
4450                    output: ToolOutput::Text("again".into()),
4451                    is_error: false,
4452                    metadata: MetadataMap::new(),
4453                })],
4454                metadata: MetadataMap::new(),
4455                usage: None,
4456                finish_reason: None,
4457                created_at: None,
4458            },
4459        ];
4460
4461        let error = validate_transcript_invariants(&transcript).unwrap_err();
4462        assert!(error.to_string().contains("duplicate tool_result"));
4463    }
4464
4465    #[tokio::test]
4466    async fn loop_refreshes_tool_specs_each_turn() {
4467        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4468        let version = StdArc::new(AtomicUsize::new(1));
4469        let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
4470        let agent = Agent::builder()
4471            .model(RecordingAdapter {
4472                seen_descriptions: seen_descriptions.clone(),
4473                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4474            })
4475            .add_tool_source(tools)
4476            .permissions(AllowAllPermissions)
4477            .build()
4478            .unwrap();
4479
4480        let mut driver = agent
4481            .start(SessionConfig {
4482                session_id: SessionId::new("session-dynamic-tools"),
4483                metadata: MetadataMap::new(),
4484                cache: None,
4485            })
4486            .await
4487            .unwrap();
4488
4489        for text in ["first", "second"] {
4490            driver
4491                .submit_input(vec![Item {
4492                    id: None,
4493                    kind: ItemKind::User,
4494                    parts: vec![Part::Text(TextPart {
4495                        text: text.into(),
4496                        metadata: MetadataMap::new(),
4497                    })],
4498                    metadata: MetadataMap::new(),
4499                    usage: None,
4500                    finish_reason: None,
4501                    created_at: None,
4502                }])
4503                .unwrap();
4504
4505            let _ = driver.next().await.unwrap();
4506            if text == "first" {
4507                version.store(2, Ordering::SeqCst);
4508            }
4509        }
4510
4511        let seen_descriptions = seen_descriptions.lock().unwrap();
4512        assert_eq!(seen_descriptions.len(), 2);
4513        assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
4514        assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
4515    }
4516
4517    #[tokio::test]
4518    async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
4519        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4520        let events = StdArc::new(StdMutex::new(Vec::new()));
4521        let executor = StdArc::new(CatalogExecutor::new());
4522        let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
4523        let agent = Agent::builder()
4524            .model(RecordingAdapter {
4525                seen_descriptions: seen_descriptions.clone(),
4526                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4527            })
4528            .tool_executor(executor_for_agent)
4529            .permissions(AllowAllPermissions)
4530            .observer(RecordingObserver {
4531                events: events.clone(),
4532            })
4533            .build()
4534            .unwrap();
4535
4536        let mut driver = agent
4537            .start(SessionConfig {
4538                session_id: SessionId::new("session-catalog-events"),
4539                metadata: MetadataMap::new(),
4540                cache: None,
4541            })
4542            .await
4543            .unwrap();
4544
4545        driver
4546            .submit_input(vec![Item::text(ItemKind::User, "first")])
4547            .unwrap();
4548        let _ = driver.next().await.unwrap();
4549
4550        executor.publish_change(
4551            1,
4552            ToolCatalogEvent {
4553                source: "mcp:mock".into(),
4554                added: vec!["dynamic".into()],
4555                removed: Vec::new(),
4556                changed: Vec::new(),
4557            },
4558        );
4559
4560        driver
4561            .submit_input(vec![Item::text(ItemKind::User, "second")])
4562            .unwrap();
4563        let _ = driver.next().await.unwrap();
4564
4565        let seen_descriptions = seen_descriptions.lock().unwrap();
4566        assert_eq!(seen_descriptions.len(), 2);
4567        assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
4568        assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
4569
4570        let events = events.lock().unwrap();
4571        assert!(events.iter().any(|event| matches!(
4572            event,
4573            AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
4574                source,
4575                added,
4576                removed,
4577                changed,
4578            }) if source == "mcp:mock"
4579                && added == &vec!["dynamic".to_string()]
4580                && removed.is_empty()
4581                && changed.is_empty()
4582        )));
4583    }
4584
4585    #[tokio::test]
4586    async fn loop_passes_session_default_and_next_turn_cache_requests() {
4587        let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
4588        let agent = Agent::builder()
4589            .model(RecordingAdapter {
4590                seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
4591                seen_caches: seen_caches.clone(),
4592            })
4593            .permissions(AllowAllPermissions)
4594            .build()
4595            .unwrap();
4596
4597        let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
4598            .with_retention(PromptCacheRetention::Short);
4599        let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
4600            breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
4601        });
4602
4603        let mut driver = agent
4604            .start(SessionConfig {
4605                session_id: SessionId::new("session-cache"),
4606                metadata: MetadataMap::new(),
4607                cache: Some(default_cache.clone()),
4608            })
4609            .await
4610            .unwrap();
4611
4612        driver
4613            .submit_input(vec![Item {
4614                id: None,
4615                kind: ItemKind::User,
4616                parts: vec![Part::Text(TextPart {
4617                    text: "first".into(),
4618                    metadata: MetadataMap::new(),
4619                })],
4620                metadata: MetadataMap::new(),
4621                usage: None,
4622                finish_reason: None,
4623                created_at: None,
4624            }])
4625            .unwrap();
4626        let _ = driver.next().await.unwrap();
4627
4628        driver
4629            .submit_input_with_cache(
4630                vec![Item {
4631                    id: None,
4632                    kind: ItemKind::User,
4633                    parts: vec![Part::Text(TextPart {
4634                        text: "second".into(),
4635                        metadata: MetadataMap::new(),
4636                    })],
4637                    metadata: MetadataMap::new(),
4638                    usage: None,
4639                    finish_reason: None,
4640                    created_at: None,
4641                }],
4642                override_cache.clone(),
4643            )
4644            .unwrap();
4645        let _ = driver.next().await.unwrap();
4646
4647        let seen = seen_caches.lock().unwrap();
4648        assert_eq!(seen.len(), 2);
4649        assert_eq!(seen[0], Some(default_cache));
4650        assert_eq!(seen[1], Some(override_cache));
4651    }
4652
4653    #[tokio::test]
4654    async fn loop_yields_after_tool_result_between_rounds() {
4655        let tools = ToolRegistry::new().with(EchoTool::default());
4656        let agent = Agent::builder()
4657            .model(FakeAdapter)
4658            .add_tool_source(tools)
4659            .permissions(AllowAllPermissions)
4660            .build()
4661            .unwrap();
4662
4663        let mut driver = agent
4664            .start(SessionConfig {
4665                session_id: SessionId::new("yield-session"),
4666                metadata: MetadataMap::new(),
4667                cache: None,
4668            })
4669            .await
4670            .unwrap();
4671
4672        driver
4673            .submit_input(vec![Item::text(ItemKind::User, "ping")])
4674            .unwrap();
4675
4676        // First next() runs the model turn, resolves the tool call, and
4677        // yields AfterToolResult before calling the model again.
4678        let step = driver.next().await.unwrap();
4679        let info = match step {
4680            LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
4681            other => panic!("expected AfterToolResult, got {other:?}"),
4682        };
4683        assert_eq!(info.session_id, SessionId::new("yield-session"));
4684        // Transcript at yield: [User, Assistant(tool_call), Tool(result)]
4685        assert_eq!(info.transcript_len, 3);
4686
4687        // The yield is cooperative, not blocking.
4688        let interrupt = LoopInterrupt::AfterToolResult(info.clone());
4689        assert!(!interrupt.is_blocking());
4690
4691        // Host interjects a message mid-turn.
4692        driver
4693            .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
4694            .unwrap();
4695
4696        // Second next() resumes the turn into the next model call, which
4697        // sees the tool result (and the injected user message) and finishes.
4698        let step = driver.next().await.unwrap();
4699        match step {
4700            LoopStep::Finished(turn) => {
4701                assert_eq!(turn.finish_reason, FinishReason::Completed);
4702            }
4703            other => panic!("expected Finished, got {other:?}"),
4704        }
4705
4706        // Transcript must now include the injected user message.
4707        let snapshot = driver.snapshot();
4708        let has_injected_message = snapshot.transcript.iter().any(|item| {
4709            item.kind == ItemKind::User
4710                && item.parts.iter().any(|part| match part {
4711                    Part::Text(text) => text.text == "also: report back",
4712                    _ => false,
4713                })
4714        });
4715        assert!(
4716            has_injected_message,
4717            "injected user message should be in transcript, got: {:?}",
4718            snapshot.transcript
4719        );
4720    }
4721
4722    struct RecordingTranscriptObserver {
4723        items: StdArc<StdMutex<Vec<Item>>>,
4724    }
4725
4726    impl TranscriptObserver for RecordingTranscriptObserver {
4727        fn on_item_appended(&self, item: &Item) {
4728            self.items.lock().unwrap().push(item.clone());
4729        }
4730    }
4731
4732    #[tokio::test]
4733    async fn observers_see_full_tool_round() {
4734        // A turn with one tool call exercises every interesting path:
4735        //   user input drained -> model output_items (assistant w/ tool call)
4736        //   -> tool result Item -> next model output_items (assistant text)
4737        // The LoopObserver should see exactly one ToolResultReceived; the
4738        // TranscriptObserver should see all four items in transcript order.
4739        let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
4740        let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
4741        let agent = Agent::builder()
4742            .model(FakeAdapter)
4743            .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
4744            .permissions(AllowAllPermissions)
4745            .observer(RecordingObserver {
4746                events: events.clone(),
4747            })
4748            .transcript_observer(RecordingTranscriptObserver {
4749                items: items.clone(),
4750            })
4751            .build()
4752            .unwrap();
4753
4754        let mut driver = agent
4755            .start(SessionConfig {
4756                session_id: SessionId::new("observer-session"),
4757                metadata: MetadataMap::new(),
4758                cache: None,
4759            })
4760            .await
4761            .unwrap();
4762
4763        driver
4764            .submit_input(vec![Item {
4765                id: None,
4766                kind: ItemKind::User,
4767                parts: vec![Part::Text(TextPart {
4768                    text: "ping".into(),
4769                    metadata: MetadataMap::new(),
4770                })],
4771                metadata: MetadataMap::new(),
4772                usage: None,
4773                finish_reason: None,
4774                created_at: None,
4775            }])
4776            .unwrap();
4777
4778        let result = run_until_finished(&mut driver).await;
4779        assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4780
4781        // LoopObserver: exactly one ToolResultReceived, with the echo
4782        // tool's output, correlating back to the model's tool call.
4783        let events = events.lock().unwrap().clone();
4784        let tool_call_id = events.iter().find_map(|e| match e {
4785            AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4786            _ => None,
4787        });
4788        let tool_results: Vec<_> = events
4789            .iter()
4790            .filter_map(|e| match e {
4791                AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4792                _ => None,
4793            })
4794            .collect();
4795        assert_eq!(tool_results.len(), 1, "events: {events:?}");
4796        assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4797        assert!(!tool_results[0].is_error);
4798
4799        // TranscriptObserver: every transcript mutation surfaces.
4800        // Expected order: User("ping"), Assistant(tool call), Tool(result),
4801        // Assistant("tool said: pong").
4802        let items = items.lock().unwrap().clone();
4803        assert_eq!(items.len(), 4, "items: {items:?}");
4804        assert_eq!(items[0].kind, ItemKind::User);
4805        assert_eq!(items[1].kind, ItemKind::Assistant);
4806        assert!(
4807            items[1]
4808                .parts
4809                .iter()
4810                .any(|p| matches!(p, Part::ToolCall(_)))
4811        );
4812        assert_eq!(items[2].kind, ItemKind::Tool);
4813        assert!(
4814            items[2]
4815                .parts
4816                .iter()
4817                .any(|p| matches!(p, Part::ToolResult(_)))
4818        );
4819        assert_eq!(items[3].kind, ItemKind::Assistant);
4820    }
4821
4822    #[test]
4823    fn convenience_cache_builders_construct_expected_defaults() {
4824        let cache = PromptCacheRequest::automatic()
4825            .with_retention(PromptCacheRetention::Short)
4826            .with_key("workspace:demo");
4827        let session = SessionConfig::new("demo").with_cache(cache.clone());
4828
4829        assert_eq!(session.session_id, SessionId::new("demo"));
4830        assert_eq!(session.cache, Some(cache));
4831
4832        let explicit = PromptCacheRequest::explicit([
4833            PromptCacheBreakpoint::tools_end(),
4834            PromptCacheBreakpoint::transcript_item_end(2),
4835            PromptCacheBreakpoint::transcript_part_end(3, 1),
4836        ]);
4837
4838        assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
4839        assert_eq!(
4840            explicit.strategy,
4841            PromptCacheStrategy::Explicit {
4842                breakpoints: vec![
4843                    PromptCacheBreakpoint::ToolsEnd,
4844                    PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
4845                    PromptCacheBreakpoint::TranscriptPartEnd {
4846                        item_index: 3,
4847                        part_index: 1,
4848                    },
4849                ],
4850            }
4851        );
4852    }
4853}