Skip to main content

agentkit_loop/
lib.rs

1//! Runtime-agnostic agent loop orchestration for sessions, turns, tools, and interrupts.
2//!
3//! `agentkit-loop` is the central coordination layer in the agentkit workspace.  It
4//! drives a model through a multi-turn agentic loop, executing tool calls,
5//! respecting permission checks, surfacing approval interrupts to the host
6//! application, and optionally compacting the transcript when it grows too large.
7//!
8//! # Architecture
9//!
10//! The main entry point is [`Agent`], constructed via [`AgentBuilder`]. The
11//! builder optionally accepts the prior conversation transcript via
12//! [`AgentBuilder::transcript`] and the next user turn via
13//! [`AgentBuilder::input`] — both default to empty. Calling
14//! [`Agent::start`] with a [`SessionConfig`] returns a [`LoopDriver`] that
15//! yields [`LoopStep`]s — either a finished turn or an interrupt that
16//! requires host resolution before the loop can continue.
17//!
18//! If no input was preloaded, the first call to [`LoopDriver::next`] yields
19//! [`LoopInterrupt::AwaitingInput`] and the host supplies the first user
20//! turn via [`InputRequest::submit`]. If input was preloaded, the first
21//! `next()` dispatches the model directly — convenient for one-shot calls.
22//!
23//! ```text
24//! Agent::builder()
25//!     .model(adapter)              // ModelAdapter implementation
26//!     .add_tool_source(registry)   // ToolRegistry (or any ToolSource); call again to federate
27//!     .permissions(checker)        // PermissionChecker for gating tool use
28//!     .observer(obs)               // LoopObserver for streaming events
29//!     .transcript(prior)           // optional: passive prior transcript (system prompt, resumed session)
30//!     .input(first_user_turn)      // optional: preload next user turn so first next() drives a turn
31//!     .build()?
32//!     .start(config).await?  -> LoopDriver
33//!         .next().await?     -> LoopStep::Finished | LoopStep::Interrupt(...)
34//! ```
35//!
36//! # Example
37//!
38//! ```rust,no_run
39//! use agentkit_core::{Item, ItemKind};
40//! use agentkit_loop::{
41//!     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
42//! };
43//!
44//! # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
45//! // One-shot: preload system prompt and first user message; first next()
46//! // drives the model directly.
47//! let agent = Agent::builder()
48//!     .model(adapter)
49//!     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
50//!     .input(vec![Item::text(ItemKind::User, "Hello!")])
51//!     .build()?;
52//!
53//! let mut driver = agent
54//!     .start(SessionConfig::new("demo").with_cache(
55//!         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
56//!     ))
57//!     .await?;
58//!
59//! let _ = driver.next().await?;
60//! # Ok(())
61//! # }
62//! ```
63
64use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_core::{
68    CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
69    TextPart, Timestamp, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation,
70    Usage,
71};
72use agentkit_task_manager::{
73    PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
74    TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
75};
76#[cfg(test)]
77use agentkit_tools_core::ToolContext;
78use agentkit_tools_core::{
79    AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
80    PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutionScope, ToolExecutor, ToolRequest,
81    ToolResources, ToolSource, ToolSpec,
82};
83use async_trait::async_trait;
84use serde::{Deserialize, Serialize};
85use thiserror::Error;
86
// Metadata keys and values related to turn interruption. All keys share the
// `agentkit.` namespace prefix. The code that reads and writes them is not in
// this part of the file, so the per-constant notes below are inferred from the
// names and literal values only.
// NOTE(review): confirm the exact semantics against the use sites.

// Metadata key; presumably flags an item or result as interrupted.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
// Metadata key; presumably carries the interruption reason (see `USER_CANCELLED_REASON`).
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
// Metadata key; presumably records the loop stage at which the interruption occurred.
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
// Reason value; presumably stored under `INTERRUPT_REASON_METADATA_KEY` on user cancellation.
const USER_CANCELLED_REASON: &str = "user_cancelled";
91
/// Configuration required to start a new model session.
///
/// Pass this to [`Agent::start`] to initialise the underlying [`ModelSession`]
/// and obtain a [`LoopDriver`]. Construct it with [`SessionConfig::new`] and
/// refine it with the chained `with_*` / `without_*` methods.
///
/// # Example
///
/// ```rust
/// use agentkit_loop::{PromptCacheRequest, PromptCacheRetention, SessionConfig};
///
/// let config = SessionConfig::new("my-session").with_cache(
///     PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
/// );
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionConfig {
    /// Unique identifier for the session.
    pub session_id: SessionId,
    /// Arbitrary key-value metadata forwarded to the model adapter.
    pub metadata: MetadataMap,
    /// Default provider-side prompt caching policy for turns in this session.
    ///
    /// `None` means the session carries no default cache request.
    pub cache: Option<PromptCacheRequest>,
}
115
116impl SessionConfig {
117    /// Builds a session config with empty metadata and no cache policy.
118    pub fn new(session_id: impl Into<SessionId>) -> Self {
119        Self {
120            session_id: session_id.into(),
121            metadata: MetadataMap::new(),
122            cache: None,
123        }
124    }
125
126    /// Replaces the session metadata map.
127    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
128        self.metadata = metadata;
129        self
130    }
131
132    /// Sets the default prompt cache request for the session.
133    pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
134        self.cache = Some(cache);
135        self
136    }
137
138    /// Clears any default prompt cache request for the session.
139    pub fn without_cache(mut self) -> Self {
140        self.cache = None;
141        self
142    }
143}
144
/// Strength of a prompt-cache request.
///
/// `BestEffort` lets adapters ignore unsupported controls while still using
/// any provider-native automatic caching they support. `Required` upgrades
/// unsupported cache requests into provider errors.
///
/// The derived [`Default`] is [`PromptCacheMode::BestEffort`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheMode {
    /// Disable prompt caching for this request.
    Disabled,
    /// Use caching when the provider can honor the request.
    ///
    /// This is the default mode.
    #[default]
    BestEffort,
    /// Fail the turn if the provider cannot honor the request.
    Required,
}
160
/// High-level provider-neutral cache retention hint.
///
/// Providers map this to their native controls. For example, OpenAI maps
/// `Short` to in-memory retention while OpenRouter Anthropic models map it to
/// the default 5-minute ephemeral cache.
///
/// Note that [`Default`] is not derived on this type: the `Default` variant
/// names a retention choice, not the type's default value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheRetention {
    /// Use the provider's default cache retention.
    Default,
    /// Prefer the provider's short-lived cache retention mode.
    Short,
    /// Prefer the provider's longest generally available cache retention mode.
    Extended,
}
175
/// Provider-neutral prompt caching strategy.
///
/// The derived [`Default`] is [`PromptCacheStrategy::Automatic`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum PromptCacheStrategy {
    /// Let the provider decide the cacheable prefix automatically.
    ///
    /// This is the default strategy.
    #[default]
    Automatic,
    /// Apply explicit cache breakpoints to selected prefix boundaries.
    Explicit {
        /// Cache breakpoints in transcript/tool order.
        breakpoints: Vec<PromptCacheBreakpoint>,
    },
}
188
189impl PromptCacheStrategy {
190    /// Uses the provider's native automatic cache behavior when available, or
191    /// any adapter-provided automatic planning fallback.
192    pub fn automatic() -> Self {
193        Self::Automatic
194    }
195
196    /// Uses explicit cache breakpoints.
197    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
198        Self::Explicit {
199            breakpoints: breakpoints.into_iter().collect(),
200        }
201    }
202}
203
/// Prefix boundary that a provider should cache when using explicit caching.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromptCacheBreakpoint {
    /// Cache the tool schema prefix through the last available tool.
    ToolsEnd,
    /// Cache through the end of the transcript item at `index`.
    TranscriptItemEnd {
        /// Index of the transcript item that ends the cached prefix.
        index: usize,
    },
    /// Cache through the specific transcript part.
    ///
    /// Not every adapter can target every part precisely; unsupported
    /// fine-grained breakpoints become best-effort no-ops unless the request is
    /// marked [`PromptCacheMode::Required`].
    TranscriptPartEnd {
        /// Index of the transcript item containing the part.
        item_index: usize,
        /// Index of the part within that item.
        part_index: usize,
    },
}
221
222impl PromptCacheBreakpoint {
223    /// Cache the tool schema prefix through the last available tool.
224    pub fn tools_end() -> Self {
225        Self::ToolsEnd
226    }
227
228    /// Cache through the end of a transcript item.
229    pub fn transcript_item_end(index: usize) -> Self {
230        Self::TranscriptItemEnd { index }
231    }
232
233    /// Cache through a specific part within a transcript item.
234    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
235        Self::TranscriptPartEnd {
236            item_index,
237            part_index,
238        }
239    }
240}
241
/// Prompt caching request sent alongside a turn.
///
/// The derived [`Default`] combines the default mode and strategy with no
/// retention hint and no key.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PromptCacheRequest {
    /// Strength of the caching request.
    pub mode: PromptCacheMode,
    /// Automatic or explicit caching strategy.
    pub strategy: PromptCacheStrategy,
    /// Optional provider-neutral retention hint.
    pub retention: Option<PromptCacheRetention>,
    /// Optional provider cache key or routing key.
    pub key: Option<String>,
}
254
255impl PromptCacheRequest {
256    /// Builds a best-effort automatic cache request.
257    pub fn automatic() -> Self {
258        Self::best_effort(PromptCacheStrategy::automatic())
259    }
260
261    /// Builds a required automatic cache request.
262    pub fn automatic_required() -> Self {
263        Self::required(PromptCacheStrategy::automatic())
264    }
265
266    /// Builds a best-effort explicit cache request.
267    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
268        Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
269    }
270
271    /// Builds a required explicit cache request.
272    pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
273        Self::required(PromptCacheStrategy::explicit(breakpoints))
274    }
275
276    /// Builds a disabled cache request.
277    pub fn disabled() -> Self {
278        Self {
279            mode: PromptCacheMode::Disabled,
280            strategy: PromptCacheStrategy::Automatic,
281            retention: None,
282            key: None,
283        }
284    }
285
286    /// Builds a best-effort cache request with the given strategy.
287    pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
288        Self {
289            mode: PromptCacheMode::BestEffort,
290            strategy,
291            retention: None,
292            key: None,
293        }
294    }
295
296    /// Builds a required cache request with the given strategy.
297    pub fn required(strategy: PromptCacheStrategy) -> Self {
298        Self {
299            mode: PromptCacheMode::Required,
300            strategy,
301            retention: None,
302            key: None,
303        }
304    }
305
306    /// Overrides the request mode.
307    pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
308        self.mode = mode;
309        self
310    }
311
312    /// Overrides the request strategy.
313    pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
314        self.strategy = strategy;
315        self
316    }
317
318    /// Applies a provider-neutral retention hint.
319    pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
320        self.retention = Some(retention);
321        self
322    }
323
324    /// Applies a provider cache key or routing key.
325    pub fn with_key(mut self, key: impl Into<String>) -> Self {
326        self.key = Some(key.into());
327        self
328    }
329
330    /// Clears any provider-neutral retention hint.
331    pub fn without_retention(mut self) -> Self {
332        self.retention = None;
333        self
334    }
335
336    /// Clears any provider cache key or routing key.
337    pub fn without_key(mut self) -> Self {
338        self.key = None;
339        self
340    }
341
342    /// Returns true when caching should be active for this request.
343    pub fn is_enabled(&self) -> bool {
344        !matches!(self.mode, PromptCacheMode::Disabled)
345    }
346}
347
/// Payload sent to the model at the start of each turn.
///
/// The [`LoopDriver`] constructs this automatically from its internal state
/// and passes it to [`ModelSession::begin_turn`].  Model adapter authors
/// read the fields to build the provider-specific request.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TurnRequest {
    /// Session this turn belongs to.
    pub session_id: SessionId,
    /// Unique identifier for the current turn.
    pub turn_id: agentkit_core::TurnId,
    /// Full conversation transcript accumulated so far.
    pub transcript: Vec<Item>,
    /// Tool specifications the model may invoke during this turn.
    pub available_tools: Vec<ToolSpec>,
    /// Provider-side prompt caching request for this turn, if any.
    pub cache: Option<PromptCacheRequest>,
    /// Per-turn metadata (e.g. provider hints).
    pub metadata: MetadataMap,
}
368
/// Final result produced by a single model turn.
///
/// Returned inside [`ModelTurnEvent::Finished`] to signal that the model has
/// completed its generation for this turn.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ModelTurnResult {
    /// Why the model stopped generating (e.g. completed, tool call, length).
    pub finish_reason: FinishReason,
    /// Items the model produced during this turn (text, tool calls, etc.).
    pub output_items: Vec<Item>,
    /// Token usage statistics, if available.
    pub usage: Option<Usage>,
    /// Provider-specific metadata about the turn.
    pub metadata: MetadataMap,
    /// Model identifier reported by the provider for this turn, if known.
    ///
    /// Stamped onto inference telemetry spans as `gen_ai.response.model`.
    // `serde(default)`: a missing field deserializes to `None`.
    #[serde(default)]
    pub model: Option<String>,
    /// Provider-assigned response identifier for this turn, if known.
    ///
    /// Stamped onto inference telemetry spans as `gen_ai.response.id`.
    // `serde(default)`: a missing field deserializes to `None`.
    #[serde(default)]
    pub response_id: Option<String>,
}
394
/// Streaming event emitted by a [`ModelTurn`] during generation.
///
/// The [`LoopDriver`] consumes these events one-by-one via
/// [`ModelTurn::next_event`] and translates them into [`AgentEvent`]s for
/// observers and into transcript mutations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ModelTurnEvent {
    /// Incremental text or content delta from the model.
    Delta(Delta),
    /// The model is requesting a tool call.
    ToolCall(ToolCallPart),
    /// Updated token usage statistics.
    Usage(Usage),
    /// The model has finished generating for this turn; carries the final
    /// [`ModelTurnResult`].
    Finished(ModelTurnResult),
}
411
/// Factory for creating model sessions.
///
/// Implement this trait to integrate a model provider (e.g. OpenRouter,
/// Anthropic, a local LLM server) with the agent loop.  [`Agent`] holds a
/// single adapter and calls [`start_session`](ModelAdapter::start_session)
/// once when [`Agent::start`] is invoked.
///
/// Only [`start_session`](ModelAdapter::start_session) is required;
/// [`provider_name`](ModelAdapter::provider_name) has a default.
///
/// # Example
///
/// ```rust,no_run
/// use agentkit_loop::{ModelAdapter, ModelSession, SessionConfig, LoopError};
/// use async_trait::async_trait;
///
/// struct MyAdapter;
///
/// #[async_trait]
/// impl ModelAdapter for MyAdapter {
///     type Session = MySession;
///
///     async fn start_session(&self, config: SessionConfig) -> Result<MySession, LoopError> {
///         // Initialize provider-specific session state here.
///         Ok(MySession { /* ... */ })
///     }
/// }
/// # struct MySession;
/// # #[async_trait]
/// # impl ModelSession for MySession {
/// #     type Turn = MyTurn;
/// #     async fn begin_turn(&mut self, _r: agentkit_loop::TurnRequest, _c: Option<agentkit_core::TurnCancellation>) -> Result<MyTurn, LoopError> { todo!() }
/// # }
/// # struct MyTurn;
/// # #[async_trait]
/// # impl agentkit_loop::ModelTurn for MyTurn {
/// #     async fn next_event(&mut self, _c: Option<agentkit_core::TurnCancellation>) -> Result<Option<agentkit_loop::ModelTurnEvent>, LoopError> { todo!() }
/// # }
/// ```
#[async_trait]
pub trait ModelAdapter: Send + Sync {
    /// The session type produced by this adapter.
    type Session: ModelSession;

    /// Create a new model session from the given configuration.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError`] if the provider connection or initialisation fails.
    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;

    /// Name of the underlying model provider, when known.
    ///
    /// Stamped onto agent telemetry spans as the `gen_ai.provider.name`
    /// attribute from the OpenTelemetry GenAI semantic conventions. Use a
    /// lowercase identifier (e.g. `openrouter`, `ollama`). The default
    /// returns `None` for adapters without a meaningful provider identity.
    fn provider_name(&self) -> Option<&str> {
        None
    }
}
470
/// An active model session that can produce sequential turns.
///
/// A session is created once per [`Agent::start`] call and lives for the
/// lifetime of the [`LoopDriver`].  Each call to [`begin_turn`](ModelSession::begin_turn)
/// hands the full transcript to the model and returns a streaming
/// [`ModelTurn`].
///
/// Only [`begin_turn`](ModelSession::begin_turn) is required;
/// [`model_name`](ModelSession::model_name) has a default.
#[async_trait]
pub trait ModelSession: Send {
    /// The turn type produced by this session.
    type Turn: ModelTurn;

    /// Start a new turn, sending the transcript and available tools to the model.
    ///
    /// # Arguments
    ///
    /// * `request` -- the turn payload including transcript and tool specs.
    /// * `cancellation` -- optional handle the implementation should poll to
    ///   detect user-initiated cancellation.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::Cancelled`] when the turn is cancelled, or a
    /// provider-specific error wrapped in [`LoopError`].
    async fn begin_turn(
        &mut self,
        request: TurnRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Self::Turn, LoopError>;

    /// Model identifier this session sends requests to, when known.
    ///
    /// Stamped onto inference telemetry spans as the `gen_ai.request.model`
    /// attribute from the OpenTelemetry GenAI semantic conventions. The
    /// default returns `None` for sessions without a fixed model.
    fn model_name(&self) -> Option<&str> {
        None
    }
}
509
/// A streaming model turn that yields events one at a time.
///
/// The loop driver calls [`next_event`](ModelTurn::next_event) repeatedly
/// until it returns `Ok(None)` (stream exhausted) or
/// `Ok(Some(ModelTurnEvent::Finished(_)))`.
#[async_trait]
pub trait ModelTurn: Send {
    /// Retrieve the next event from the model's response stream.
    ///
    /// # Arguments
    ///
    /// * `cancellation` -- optional cancellation handle for the turn.
    ///
    /// # Returns
    ///
    /// `Ok(Some(event))` for the next event, or `Ok(None)` when the stream
    /// is exhausted.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::Cancelled`] if `cancellation` fires, or a
    /// provider-specific error wrapped in [`LoopError`].
    async fn next_event(
        &mut self,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Option<ModelTurnEvent>, LoopError>;
}
530
/// Observer hook for streaming agent events to the host application.
///
/// Register observers via [`AgentBuilder::observer`] to receive real-time
/// notifications about deltas, tool calls, usage, warnings, and lifecycle
/// events.
///
/// # Example
///
/// ```rust
/// use agentkit_loop::{AgentEvent, LoopObserver};
///
/// struct StdoutObserver;
///
/// impl LoopObserver for StdoutObserver {
///     fn handle_event(&self, event: AgentEvent) {
///         println!("{event:?}");
///     }
/// }
/// ```
pub trait LoopObserver: Send + Sync {
    /// Called synchronously for every [`AgentEvent`] emitted by the loop driver.
    ///
    /// The method takes `&self`, so observers keep mutable state behind
    /// interior mutability (`Mutex`, atomics, channels). This lets the driver
    /// share an `Arc<dyn LoopObserver>` across reusable [`Agent`] starts.
    fn handle_event(&self, event: AgentEvent);
}
557
/// Receives full [`Item`]s as they are appended to the driver's transcript.
///
/// While [`LoopObserver`] surfaces operational events (deltas, tool calls,
/// lifecycle, telemetry), it can't be reconstructed back into a faithful
/// transcript on its own — content deltas span partial parts and don't
/// carry their parent-Item identity, and historically tool results were
/// pushed into the transcript with no observer event at all. A
/// `TranscriptObserver` is the loss-free counterpart: it fires once per
/// [`Item`] appended, with the full Item shape ready for persistence,
/// replication, or audit.
///
/// Observers are called *synchronously* from inside the driver, in the
/// same order items land in the transcript. Compaction-driven transcript
/// rewrites do **not** fire `on_item_appended` — those are signaled by
/// [`AgentEvent::CompactionFinished`] instead.
///
/// Register via [`AgentBuilder::transcript_observer`]; multiple observers
/// may be registered and are called in registration order.
///
/// # Example
///
/// ```rust
/// use agentkit_core::Item;
/// use agentkit_loop::TranscriptObserver;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// struct CountingObserver { items: AtomicUsize }
///
/// impl TranscriptObserver for CountingObserver {
///     fn on_item_appended(&self, _item: &Item) {
///         self.items.fetch_add(1, Ordering::Relaxed);
///     }
/// }
/// ```
// NOTE(review): the docs above link to `AgentEvent::CompactionFinished`, but the
// `AgentEvent` enum in this file has no such variant (mutator activity is
// reported via `MutationStarted` / `MutationFinished`) — confirm and update
// the link if it is stale.
pub trait TranscriptObserver: Send + Sync {
    /// Called synchronously every time an [`Item`] is appended to the
    /// driver's transcript, in transcript order.
    ///
    /// The method takes `&self`, so observers keep mutable state behind
    /// interior mutability; this lets the driver share an
    /// `Arc<dyn TranscriptObserver>`.
    fn on_item_appended(&self, item: &Item);
}
599
/// Where in the loop a [`LoopMutator`] is given a chance to modify the
/// transcript. Mutators run synchronously at these points; mid-stream
/// mutation (e.g. between content deltas) is intentionally not supported
/// because the assistant item is not yet fully constructed.
///
/// The enum is `#[non_exhaustive]`, so `match`es outside this crate need a
/// wildcard arm and new points can be added without a breaking change.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MutationPoint {
    /// A tool result has just been appended; the next loop step will be
    /// another inference call.
    AfterToolResult,
    /// A turn has fully ended (assistant final, interrupt, or cancellation)
    /// and any new user input has not yet been dispatched.
    AfterTurnEnded,
}
614
/// Sink for emitting [`AgentEvent`]s from inside a [`LoopMutator`].
///
/// The driver supplies a concrete implementation via [`LoopCtx::emitter`];
/// mutators reach it through the [`LoopCtx`] passed to
/// [`LoopMutator::mutate`].
pub trait EventEmitter: Send + Sync {
    /// Forward `event` to all registered observers.
    fn emit(&self, event: AgentEvent);
}
621
/// Read-only context handed to a [`LoopMutator`] alongside the cursor.
///
/// The struct is `#[non_exhaustive]`: code outside this crate cannot build it
/// with a struct literal, so fields can be added without a breaking change.
#[non_exhaustive]
pub struct LoopCtx<'a> {
    /// Session this mutation point belongs to.
    pub session_id: &'a SessionId,
    /// Turn the mutation is associated with, if any.
    pub turn_id: Option<&'a agentkit_core::TurnId>,
    /// Where in the loop the mutator is running.
    pub point: MutationPoint,
    /// Cancellation handle for the active turn, if any.
    pub cancellation: Option<TurnCancellation>,
    /// Sink for emitting events from the mutator (telemetry, progress).
    pub emitter: &'a dyn EventEmitter,
}
636
/// Mutable handle over the live transcript with dirty tracking.
///
/// Implements [`Deref`](std::ops::Deref)/[`DerefMut`](std::ops::DerefMut) to
/// `Vec<Item>` so mutators read and write through `Vec`'s native API
/// (`push`, `retain`, `iter`, `*cursor = ...`). Any `&mut` access marks the
/// cursor dirty; the loop validates transcript invariants when at least one
/// mutator dirtied the transcript and hard-fails on protocol violations.
///
/// Dirtiness is set on *access*, not on change: taking a mutable borrow and
/// leaving the transcript untouched still marks the cursor dirty.
pub struct TranscriptCursor<'a> {
    /// Exclusive borrow of the transcript being mutated.
    items: &'a mut Vec<Item>,
    /// Set once the transcript has been borrowed mutably through the cursor.
    pub(crate) dirty: bool,
}
648
649impl<'a> std::ops::Deref for TranscriptCursor<'a> {
650    type Target = Vec<Item>;
651    fn deref(&self) -> &Vec<Item> {
652        self.items
653    }
654}
655
656impl<'a> std::ops::DerefMut for TranscriptCursor<'a> {
657    fn deref_mut(&mut self) -> &mut Vec<Item> {
658        self.dirty = true;
659        self.items
660    }
661}
662
/// Async transcript mutator. Registered via [`AgentBuilder::mutator`] and
/// invoked at each [`MutationPoint`]. Mutators own their derived state
/// (e.g. running token totals via interior mutability) and decide for
/// themselves whether and how to modify the transcript.
///
/// [`mutate`](LoopMutator::mutate) is the trait's only method and has a
/// default body that does nothing and returns `Ok(())`, so an implementor
/// that does not override it never touches the transcript.
#[async_trait]
pub trait LoopMutator: Send + Sync {
    /// Run this mutator. Returning without writing to `cursor` is a no-op.
    ///
    /// # Arguments
    ///
    /// * `cursor` -- mutable handle over the live transcript; any `&mut`
    ///   access through it marks the transcript dirty.
    /// * `ctx` -- read-only context (session, turn, mutation point,
    ///   cancellation handle, event emitter).
    ///
    /// # Errors
    ///
    /// Errors abort the loop; protocol-violating mutations (orphaned tool
    /// uses or results) are detected by validation and turned into
    /// [`LoopError::Mutator`].
    async fn mutate(
        &self,
        cursor: &mut TranscriptCursor<'_>,
        ctx: LoopCtx<'_>,
    ) -> Result<(), LoopError> {
        // Bind the otherwise-unused parameters so the no-op default compiles
        // without unused-variable warnings.
        let _ = (cursor, ctx);
        Ok(())
    }
}
685
/// Lifecycle and streaming events emitted by the [`LoopDriver`].
///
/// Observers (see [`LoopObserver`]) receive these events in the order they
/// occur.  They are useful for building UIs, logging, or telemetry.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AgentEvent {
    /// The agent run has been initialised.
    RunStarted { session_id: SessionId },
    /// A new model turn is starting.
    TurnStarted {
        session_id: SessionId,
        turn_id: agentkit_core::TurnId,
    },
    /// User input has been accepted into the pending queue.
    InputAccepted {
        session_id: SessionId,
        /// The accepted input items.
        items: Vec<Item>,
    },
    /// Incremental content delta from the model.
    ContentDelta(Delta),
    /// The model has requested a tool call.
    ToolCallRequested(ToolCallPart),
    /// A tool call's result has landed in the transcript.
    ///
    /// Fires once per [`Part::ToolResult`] that's appended, including the
    /// synthetic placeholder that's pushed when a tool is detached to the
    /// background — the real result fires a second event with the same
    /// [`ToolResultPart::call_id`] once the background task completes.
    /// Cancellation/denial paths (auth cancelled, approval denied) also
    /// emit this with `is_error = true`.
    ///
    /// Correlate with the matching [`AgentEvent::ToolCallRequested`] via
    /// `call_id`.
    ToolResultReceived(ToolResultPart),
    /// A tool call requires explicit user approval before execution.
    ApprovalRequired(ApprovalRequest),
    /// An approval interrupt has been resolved.
    ApprovalResolved { approved: bool },
    /// The available tool catalog changed and will be reflected on the next model request.
    ToolCatalogChanged(ToolCatalogEvent),
    /// A [`LoopMutator`] is about to run at one of the mutation points.
    /// `mutator` is a stable label the implementation chooses for itself.
    MutationStarted {
        session_id: SessionId,
        /// Turn the mutation is associated with, if any.
        turn_id: Option<agentkit_core::TurnId>,
        /// Label identifying the mutator.
        mutator: String,
        /// Mutation point at which the mutator is running.
        point: MutationPoint,
    },
    /// A [`LoopMutator`] has finished running. `dirty` indicates whether the
    /// transcript was modified; `metadata` carries mutator-specific extras
    /// (e.g. compaction reason, replaced item count).
    MutationFinished {
        session_id: SessionId,
        /// Turn the mutation is associated with, if any.
        turn_id: Option<agentkit_core::TurnId>,
        /// Label identifying the mutator.
        mutator: String,
        /// Whether the transcript was modified.
        dirty: bool,
        /// Mutator-specific extras.
        metadata: MetadataMap,
    },
    /// Updated token usage statistics.
    UsageUpdated(Usage),
    /// Non-fatal warning (e.g. a tool failure that was recovered from).
    Warning { message: String },
    /// The agent run has failed with an unrecoverable error.
    RunFailed { message: String },
    /// A turn has finished (successfully, via cancellation, etc.).
    TurnFinished(TurnResult),
}
753
/// Handle for a pending approval interrupt.
///
/// Wraps an [`ApprovalRequest`] and provides ergonomic resolution methods
/// so callers can resolve the interrupt directly instead of searching for
/// the matching method on [`LoopDriver`].
///
/// The resolution methods take the handle by value, so each handle resolves
/// its request at most once (the type is `Clone`, so copies can still be made
/// explicitly).
///
/// # Example
///
/// ```rust,no_run
/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
/// match driver.next().await? {
///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
///         println!("Needs approval: {}", pending.request.summary);
///         pending.approve(driver)?;
///     }
///     _ => {}
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingApproval {
    /// The underlying approval request details.
    pub request: ApprovalRequest,
}
780
781impl std::ops::Deref for PendingApproval {
782    type Target = ApprovalRequest;
783    fn deref(&self) -> &ApprovalRequest {
784        &self.request
785    }
786}
787
788impl PendingApproval {
789    /// Approve the pending tool call.
790    pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
791        let call_id = self
792            .request
793            .call_id
794            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
795        driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
796    }
797
798    /// Deny the pending tool call.
799    pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
800        let call_id = self
801            .request
802            .call_id
803            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
804        driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
805    }
806
807    /// Deny the pending tool call with a reason.
808    pub fn deny_with_reason<S: ModelSession>(
809        self,
810        driver: &mut LoopDriver<S>,
811        reason: impl Into<String>,
812    ) -> Result<(), LoopError> {
813        let call_id = self
814            .request
815            .call_id
816            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
817        driver.resolve_approval_for(
818            call_id,
819            ApprovalDecision::Deny {
820                reason: Some(reason.into()),
821            },
822        )
823    }
824
825    /// Approve the pending tool call with a patched input.
826    ///
827    /// The model's original tool input is replaced with `input` before the
828    /// tool executes. The transcript still records the call as the model
829    /// emitted it; only the executor sees the patched payload. This mirrors
830    /// the `PermissionResultAllow(updated_input=...)` pattern from the
831    /// Anthropic Agent SDK and is intended for hosts that want to sanitise,
832    /// restrict, or augment arguments before tool execution without forcing
833    /// the model to re-issue the call.
834    pub fn approve_with_patched_input<S: ModelSession>(
835        self,
836        driver: &mut LoopDriver<S>,
837        input: serde_json::Value,
838    ) -> Result<(), LoopError> {
839        let call_id = self
840            .request
841            .call_id
842            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
843        driver.resolve_approval_for_with_patched_input(call_id, input)
844    }
845}
846
847/// Descriptor for a [`LoopInterrupt::AwaitingInput`] interrupt.
848///
849/// Returned when the driver has no pending input and needs the host to
850/// supply items before advancing. This is the entry point for every user
851/// turn that wasn't preloaded via [`AgentBuilder::input`]. Transcript items
852/// loaded via [`AgentBuilder::transcript`] are passive, so when no input is
853/// preloaded the first [`LoopDriver::next`] call surfaces `AwaitingInput`
854/// and the host injects the first user message via [`InputRequest::submit`].
855///
856/// # Example
857///
858/// ```rust,no_run
859/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
860/// # use agentkit_core::Item;
861/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>, items: Vec<Item>) -> Result<(), agentkit_loop::LoopError> {
862/// match driver.next().await? {
863///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
864///         pending.submit(driver, items)?;
865///     }
866///     _ => {}
867/// }
868/// # Ok(())
869/// # }
870/// ```
871#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
872pub struct InputRequest {
873    /// The session that is waiting for input.
874    pub session_id: SessionId,
875    /// Human-readable explanation of why input is needed.
876    pub reason: String,
877}
878
879impl InputRequest {
880    /// Submit input items to the driver.
881    pub fn submit<S: ModelSession>(
882        self,
883        driver: &mut LoopDriver<S>,
884        items: Vec<Item>,
885    ) -> Result<(), LoopError> {
886        driver.submit_input(items)
887    }
888}
889
890/// Outcome of a completed (or cancelled) turn.
891///
892/// Wrapped by [`LoopStep::Finished`] and also emitted as
893/// [`AgentEvent::TurnFinished`] to observers.
894#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
895pub struct TurnResult {
896    /// Identifier for the turn that produced this result.
897    pub turn_id: agentkit_core::TurnId,
898    /// Why the turn ended (completed, tool call, cancelled, etc.).
899    pub finish_reason: FinishReason,
900    /// Items produced during this turn (assistant text, tool results, etc.).
901    pub items: Vec<Item>,
902    /// Aggregated token usage, if reported by the model.
903    pub usage: Option<Usage>,
904    /// Additional metadata about the turn.
905    pub metadata: MetadataMap,
906}
907
908/// An interrupt that pauses the agent loop until the host resolves it.
909///
910/// The loop returns an interrupt inside [`LoopStep::Interrupt`] whenever it
911/// cannot proceed autonomously.  Each variant carries a handle with
912/// resolution methods so callers can resolve the interrupt directly.
913///
914/// # Example
915///
916/// ```rust,no_run
917/// use agentkit_loop::{LoopInterrupt, LoopStep};
918/// # use agentkit_loop::LoopDriver;
919///
920/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
921/// match driver.next().await? {
922///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
923///         println!("Tool {} needs approval: {}", pending.request.request_kind, pending.request.summary);
924///         pending.approve(driver)?;
925///     }
926///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
927///         println!("Waiting for input: {}", pending.reason);
928///         // ... call pending.submit(driver, items)
929///     }
930///     LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => {
931///         // Cooperative yield between tool rounds.  Optionally call
932///         // driver.submit_input(...) to interject a user message, then
933///         // call driver.next() to resume the turn.
934///         let _ = info;
935///     }
936///     LoopStep::Finished(result) => {
937///         println!("Turn finished: {:?}", result.finish_reason);
938///     }
939/// }
940/// # Ok(())
941/// # }
942/// ```
943#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
944pub enum LoopInterrupt {
945    /// A tool call requires explicit approval before it can execute.
946    ApprovalRequest(PendingApproval),
947    /// The driver has no pending input and needs the host to supply some.
948    AwaitingInput(InputRequest),
949    /// A tool round finished: all tool calls from the previous assistant
950    /// message now have results in the transcript, and the driver is about to
951    /// invoke the model again. The host may interject user messages via the
952    /// [`ToolRoundInfo::submit`] handle before calling [`LoopDriver::next`]
953    /// to resume.
954    ///
955    /// This is a non-blocking interrupt: callers that do not care about
956    /// mid-turn interjection can treat it as a no-op (`_ => continue`) and
957    /// the next `next()` call resumes the turn.
958    AfterToolResult(ToolRoundInfo),
959}
960
961impl LoopInterrupt {
962    /// Returns `true` if the interrupt must be explicitly resolved before
963    /// the loop can make progress. Approvals are blocking;
964    /// [`AwaitingInput`](LoopInterrupt::AwaitingInput) and
965    /// [`AfterToolResult`](LoopInterrupt::AfterToolResult) are cooperative
966    /// and can be ignored by calling [`LoopDriver::next`] again.
967    pub fn is_blocking(&self) -> bool {
968        matches!(self, LoopInterrupt::ApprovalRequest(_))
969    }
970}
971
972/// Metadata describing a completed tool round, surfaced via
973/// [`LoopInterrupt::AfterToolResult`].
974#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
975pub struct ToolRoundInfo {
976    /// The session that produced this tool round.
977    pub session_id: SessionId,
978    /// The turn that is about to continue into the next model call.
979    pub turn_id: agentkit_core::TurnId,
980    /// Transcript length at the yield point (for snapshots / UIs).
981    pub transcript_len: usize,
982}
983
984impl ToolRoundInfo {
985    /// Interject user input between tool rounds. Consumes the
986    /// [`ToolRoundInfo`] handle so the same yield cannot accept input twice.
987    pub fn submit<S: ModelSession>(
988        self,
989        driver: &mut LoopDriver<S>,
990        items: Vec<Item>,
991    ) -> Result<(), LoopError> {
992        driver.submit_input(items)
993    }
994}
995
996/// The result of advancing the agent loop by one step.
997///
998/// Returned by [`LoopDriver::next`].  The host should pattern-match on this
999/// to decide whether to continue the loop or resolve an interrupt first.
1000///
1001/// # Example
1002///
1003/// ```rust,no_run
1004/// use agentkit_loop::LoopStep;
1005/// # use agentkit_loop::LoopDriver;
1006///
1007/// # async fn run<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1008/// loop {
1009///     match driver.next().await? {
1010///         LoopStep::Finished(result) => {
1011///             println!("Turn complete: {:?}", result.finish_reason);
1012///             break;
1013///         }
1014///         LoopStep::Interrupt(interrupt) => {
1015///             // Resolve the interrupt, then continue the loop.
1016///             # break;
1017///         }
1018///     }
1019/// }
1020/// # Ok(())
1021/// # }
1022/// ```
1023#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1024pub enum LoopStep {
1025    /// The loop is paused and requires host action.
1026    Interrupt(LoopInterrupt),
1027    /// A turn has completed (or been cancelled).
1028    Finished(TurnResult),
1029}
1030
1031/// A read-only snapshot of the loop driver's current state.
1032///
1033/// Obtained via [`LoopDriver::snapshot`].  Useful for persisting or
1034/// inspecting the conversation transcript without holding a mutable
1035/// reference to the driver.
1036#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1037pub struct LoopSnapshot {
1038    /// Session identifier.
1039    pub session_id: SessionId,
1040    /// The full transcript accumulated so far.
1041    pub transcript: Vec<Item>,
1042    /// Input items queued but not yet consumed by a turn.
1043    pub pending_input: Vec<Item>,
1044}
1045
1046#[derive(Clone, Debug)]
1047struct PendingApprovalToolCall {
1048    request: ApprovalRequest,
1049    decision: Option<ApprovalDecision>,
1050    surfaced: bool,
1051    turn_id: agentkit_core::TurnId,
1052    task_id: TaskId,
1053    call: ToolCallPart,
1054    tool_request: ToolRequest,
1055}
1056
1057#[derive(Clone, Debug, Default)]
1058struct ActiveToolRound {
1059    turn_id: agentkit_core::TurnId,
1060    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
1061    background_pending: bool,
1062    foreground_progressed: bool,
1063}
1064
1065/// A configured agent ready to start a session.
1066///
1067/// Build one with [`Agent::builder`], supplying at minimum a [`ModelAdapter`].
1068/// Optionally preload prior conversation state via
1069/// [`AgentBuilder::transcript`] and the next user turn via
1070/// [`AgentBuilder::input`]. Then call [`Agent::start`] with a
1071/// [`SessionConfig`] to obtain a [`LoopDriver`] that drives the agentic loop.
1072///
1073/// If no input is preloaded, the first call to [`LoopDriver::next`] yields
1074/// [`LoopInterrupt::AwaitingInput`] so the host can supply the first user
1075/// message via [`InputRequest::submit`]. If input was preloaded, the first
1076/// `next()` dispatches the model directly.
1077///
1078/// # Example
1079///
1080/// ```rust,no_run
1081/// use agentkit_core::{Item, ItemKind};
1082/// use agentkit_loop::{
1083///     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
1084/// };
1085/// use agentkit_tools_core::ToolRegistry;
1086///
1087/// # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
1088/// let agent = Agent::builder()
1089///     .model(adapter)
1090///     .add_tool_source(ToolRegistry::new())
1091///     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
1092///     .input(vec![Item::text(ItemKind::User, "Hello!")])
1093///     .build()?;
1094///
1095/// let mut driver = agent
1096///     .start(SessionConfig::new("s1").with_cache(
1097///         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
1098///     ))
1099///     .await?;
1100///
1101/// // First next() drives the model since input was preloaded.
1102/// let _ = driver.next().await?;
1103/// # Ok(())
1104/// # }
1105/// ```
1106pub struct Agent<M>
1107where
1108    M: ModelAdapter,
1109{
1110    model: M,
1111    tool_sources: Vec<Arc<dyn ToolSource>>,
1112    tool_executor: Option<Arc<dyn ToolExecutor>>,
1113    task_manager: Arc<dyn TaskManager>,
1114    permissions: Arc<dyn PermissionChecker>,
1115    resources: Arc<dyn ToolResources>,
1116    cancellation: Option<CancellationHandle>,
1117    mutators: Vec<Arc<dyn LoopMutator>>,
1118    observers: Vec<Arc<dyn LoopObserver>>,
1119    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1120    transcript: Vec<Item>,
1121    input: Vec<Item>,
1122}
1123
1124impl<M> Agent<M>
1125where
1126    M: ModelAdapter,
1127{
1128    /// Create a new [`AgentBuilder`] for configuring this agent.
1129    pub fn builder() -> AgentBuilder<M> {
1130        AgentBuilder::default()
1131    }
1132
1133    /// Start a session, returning a [`LoopDriver`] preloaded with whatever
1134    /// transcript and input were configured on the builder. See
1135    /// [`AgentBuilder::transcript`] and [`AgentBuilder::input`] for what each
1136    /// one does and when to use them.
1137    ///
1138    /// This calls [`ModelAdapter::start_session`] and emits an
1139    /// [`AgentEvent::RunStarted`] event to all registered observers.
1140    ///
1141    /// `&self` so a single configured agent can mint multiple sessions over
1142    /// its lifetime — e.g. an outer agent that uses an inner sub-agent for
1143    /// transcript compaction.
1144    ///
1145    /// # Errors
1146    ///
1147    /// Returns [`LoopError`] if the model adapter fails to create a session.
1148    pub async fn start(&self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1149        let session_id = config.session_id.clone();
1150        let default_cache = config.cache.clone();
1151        let session = self.model.start_session(config).await?;
1152        let tool_executor = self
1153            .tool_executor
1154            .clone()
1155            .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1156        let driver = LoopDriver {
1157            session_id: session_id.clone(),
1158            provider_name: self.model.provider_name().map(str::to_owned),
1159            default_cache,
1160            next_turn_cache: None,
1161            session: Some(session),
1162            tool_executor,
1163            task_manager: self.task_manager.clone(),
1164            permissions: self.permissions.clone(),
1165            resources: self.resources.clone(),
1166            cancellation: self.cancellation.clone(),
1167            mutators: self.mutators.clone(),
1168            observers: self.observers.clone(),
1169            transcript_observers: self.transcript_observers.clone(),
1170            transcript: self.transcript.clone(),
1171            pending_input: self.input.clone(),
1172            pending_approvals: BTreeMap::new(),
1173            pending_approval_order: VecDeque::new(),
1174            active_tool_round: None,
1175            pending_round_resume: None,
1176            next_turn_index: 1,
1177            detached_call_ids: HashSet::new(),
1178        };
1179        driver.emit(AgentEvent::RunStarted { session_id });
1180        Ok(driver)
1181    }
1182}
1183
1184/// Builder for constructing an [`Agent`].
1185///
1186/// Obtained via [`Agent::builder`].  The only required field is
1187/// [`model`](AgentBuilder::model); all others have sensible defaults
1188/// (no tools, allow-all permissions, no compaction, no observers).
1189pub struct AgentBuilder<M>
1190where
1191    M: ModelAdapter,
1192{
1193    model: Option<M>,
1194    tool_sources: Vec<Arc<dyn ToolSource>>,
1195    tool_executor: Option<Arc<dyn ToolExecutor>>,
1196    task_manager: Option<Arc<dyn TaskManager>>,
1197    permissions: Arc<dyn PermissionChecker>,
1198    resources: Arc<dyn ToolResources>,
1199    cancellation: Option<CancellationHandle>,
1200    mutators: Vec<Arc<dyn LoopMutator>>,
1201    observers: Vec<Arc<dyn LoopObserver>>,
1202    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1203    transcript: Vec<Item>,
1204    input: Vec<Item>,
1205}
1206
1207impl<M> Default for AgentBuilder<M>
1208where
1209    M: ModelAdapter,
1210{
1211    fn default() -> Self {
1212        Self {
1213            model: None,
1214            tool_sources: Vec::new(),
1215            tool_executor: None,
1216            task_manager: None,
1217            permissions: Arc::new(AllowAllPermissions),
1218            resources: Arc::new(()),
1219            cancellation: None,
1220            mutators: Vec::new(),
1221            observers: Vec::new(),
1222            transcript_observers: Vec::new(),
1223            transcript: Vec::new(),
1224            input: Vec::new(),
1225        }
1226    }
1227}
1228
1229impl<M> AgentBuilder<M>
1230where
1231    M: ModelAdapter,
1232{
1233    /// Set the model adapter (required).
1234    pub fn model(mut self, model: M) -> Self {
1235        self.model = Some(model);
1236        self
1237    }
1238
1239    /// Adds a tool source to the agent. Call multiple times to compose
1240    /// federated sources — for example a frozen native [`ToolRegistry`]
1241    /// alongside an MCP manager's [`agentkit_tools_core::CatalogReader`]
1242    /// and a skill-watcher reader. Sources are walked in registration
1243    /// order; the default [`agentkit_tools_core::CollisionPolicy`] is
1244    /// `FirstWins`.
1245    ///
1246    /// Accepts any sized [`ToolSource`]; the agent owns it for the
1247    /// session. To share a dynamic source between the agent and the
1248    /// subsystem mutating it, mint a [`agentkit_tools_core::CatalogReader`]
1249    /// from a [`agentkit_tools_core::dynamic_catalog`] pair — the reader
1250    /// is sized and owned, hosts never see the underlying `Arc`.
1251    pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1252        self.tool_sources.push(Arc::new(source));
1253        self
1254    }
1255
1256    /// Set a custom [`ToolExecutor`]. When provided, the agent uses it
1257    /// instead of building a [`BasicToolExecutor`] from the configured
1258    /// sources. Most hosts should use [`add_tool_source`](Self::add_tool_source)
1259    /// instead; this is for advanced cases (custom routing, instrumentation,
1260    /// test fakes).
1261    pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1262        self.tool_executor = Some(Arc::new(executor));
1263        self
1264    }
1265
1266    /// Set the task manager that schedules tool-call execution.
1267    ///
1268    /// Defaults to [`SimpleTaskManager`], which preserves the existing
1269    /// sequential request/response behavior.
1270    pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1271        self.task_manager = Some(Arc::new(manager));
1272        self
1273    }
1274
1275    /// Set the permission checker that gates tool execution.
1276    ///
1277    /// Defaults to allowing all tool calls without prompting.
1278    pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1279        self.permissions = Arc::new(permissions);
1280        self
1281    }
1282
1283    /// Set shared resources available to tool implementations.
1284    pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1285        self.resources = Arc::new(resources);
1286        self
1287    }
1288
1289    /// Attach a [`CancellationHandle`] for cooperative cancellation of turns.
1290    pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1291        self.cancellation = Some(handle);
1292        self
1293    }
1294
1295    /// Register a [`LoopMutator`] that runs at every [`MutationPoint`].
1296    ///
1297    /// Multiple mutators may be registered; they run in registration order
1298    /// and the dirty flag propagates across the pipeline. After every pass
1299    /// in which any mutator dirtied the transcript, the loop validates
1300    /// protocol invariants (tool_use/tool_result pairing); a violation is a
1301    /// hard [`LoopError::Mutator`] failure.
1302    pub fn mutator<L: LoopMutator + 'static>(mut self, mutator: L) -> Self {
1303        self.mutators.push(Arc::new(mutator));
1304        self
1305    }
1306
1307    /// Register a [`LoopObserver`] that receives [`AgentEvent`]s.
1308    ///
1309    /// Multiple observers may be registered; they are called in order.
1310    pub fn observer<O: LoopObserver + 'static>(mut self, observer: O) -> Self {
1311        self.observers.push(Arc::new(observer));
1312        self
1313    }
1314
1315    /// Register a [`TranscriptObserver`] that receives an [`Item`] every
1316    /// time one is appended to the transcript.
1317    ///
1318    /// Multiple observers may be registered; they are called in order.
1319    /// Use this when you need a loss-free view of the transcript (e.g.
1320    /// for persistence or replication) — [`LoopObserver`] alone is
1321    /// insufficient because it doesn't expose item boundaries for model
1322    /// output and historically did not surface tool results at all.
1323    pub fn transcript_observer<O: TranscriptObserver + 'static>(mut self, observer: O) -> Self {
1324        self.transcript_observers.push(Arc::new(observer));
1325        self
1326    }
1327
1328    /// Preload the driver's transcript with prior conversation state
1329    /// (defaults to empty).
1330    ///
1331    /// Items pass straight into the driver's transcript without firing
1332    /// [`TranscriptObserver::on_item_appended`] — the host is expected to
1333    /// already know about (and have persisted) anything it preloads. Use
1334    /// this for resumed sessions or to seed a system prompt.
1335    pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1336        self.transcript = transcript;
1337        self
1338    }
1339
1340    /// Preload the driver's pending-input queue with the next user turn
1341    /// (defaults to empty).
1342    ///
1343    /// When non-empty, the first [`LoopDriver::next`] dispatches the model
1344    /// directly instead of yielding [`LoopInterrupt::AwaitingInput`]. Use
1345    /// this for one-shot calls and scripts where the first user turn is
1346    /// known up front. Items move to the transcript on turn dispatch the
1347    /// same way submitted input does, firing transcript observers.
1348    pub fn input(mut self, input: Vec<Item>) -> Self {
1349        self.input = input;
1350        self
1351    }
1352
1353    /// Consume the builder and produce an [`Agent`].
1354    ///
1355    /// # Errors
1356    ///
1357    /// Returns [`LoopError::InvalidState`] if no model adapter was provided.
1358    pub fn build(self) -> Result<Agent<M>, LoopError> {
1359        let model = self
1360            .model
1361            .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1362        Ok(Agent {
1363            model,
1364            tool_sources: self.tool_sources,
1365            tool_executor: self.tool_executor,
1366            task_manager: self
1367                .task_manager
1368                .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1369            permissions: self.permissions,
1370            resources: self.resources,
1371            cancellation: self.cancellation,
1372            mutators: self.mutators,
1373            observers: self.observers,
1374            transcript_observers: self.transcript_observers,
1375            transcript: self.transcript,
1376            input: self.input,
1377        })
1378    }
1379}
1380
1381/// The runtime driver that advances the agent loop step by step.
1382///
1383/// Obtained from [`Agent::start`] with the builder's preloaded transcript
1384/// and pending-input queue baked in.
1385/// The typical usage pattern is:
1386///
1387/// 1. Call [`next`](LoopDriver::next) to advance the loop.
1388/// 2. Handle the returned [`LoopStep`]:
1389///    - [`LoopStep::Finished`] -- the turn completed, inspect the result.
1390///    - [`LoopStep::Interrupt`] -- resolve the interrupt via the bound
1391///      [`Pending*`](LoopInterrupt) handle, then call `next` again.
1392///
1393/// # Example
1394///
1395/// ```rust,no_run
1396/// use agentkit_core::{Item, ItemKind};
1397/// use agentkit_loop::{LoopDriver, LoopStep};
1398///
1399/// # async fn drive<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1400/// let step = driver.next().await?;
1401/// match step {
1402///     LoopStep::Finished(result) => println!("Done: {:?}", result.finish_reason),
1403///     LoopStep::Interrupt(interrupt) => {
1404///         // Resolve via the pending handle, then call next() again.
1405///         println!("Interrupted: {interrupt:?}");
1406///     }
1407/// }
1408/// # Ok(())
1409/// # }
1410/// ```
1411pub struct LoopDriver<S>
1412where
1413    S: ModelSession,
1414{
1415    session_id: SessionId,
1416    provider_name: Option<String>,
1417    default_cache: Option<PromptCacheRequest>,
1418    next_turn_cache: Option<PromptCacheRequest>,
1419    session: Option<S>,
1420    tool_executor: Arc<dyn ToolExecutor>,
1421    task_manager: Arc<dyn TaskManager>,
1422    permissions: Arc<dyn PermissionChecker>,
1423    resources: Arc<dyn ToolResources>,
1424    cancellation: Option<CancellationHandle>,
1425    mutators: Vec<Arc<dyn LoopMutator>>,
1426    observers: Vec<Arc<dyn LoopObserver>>,
1427    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1428    transcript: Vec<Item>,
1429    pending_input: Vec<Item>,
1430    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
1431    pending_approval_order: VecDeque<ToolCallId>,
1432    active_tool_round: Option<ActiveToolRound>,
1433    pending_round_resume: Option<agentkit_core::TurnId>,
1434    next_turn_index: u64,
1435    /// Call ids whose original tool_use was already paired with a
1436    /// synthetic detach tool_result. When the real result eventually
1437    /// arrives via the task manager, we MUST NOT emit a second
1438    /// tool_result for the same id — the provider schema requires
1439    /// exactly one tool_result per tool_use. Instead we route the
1440    /// resolution into a [`ItemKind::Notification`] item that the model
1441    /// can react to on the next turn.
1442    detached_call_ids: HashSet<ToolCallId>,
1443}
1444
1445impl<S> LoopDriver<S>
1446where
1447    S: ModelSession,
1448{
1449    fn execute_tool_span(
1450        &self,
1451        request: &ToolRequest,
1452        turn_id: &agentkit_core::TurnId,
1453        launch_kind: &'static str,
1454    ) -> tracing::Span {
1455        tracing::info_span!(
1456            "agent.execute_tool",
1457            "otel.name" = %format!("execute_tool {}", request.tool_name),
1458            "gen_ai.operation.name" = "execute_tool",
1459            "gen_ai.tool.name" = %request.tool_name,
1460            "gen_ai.tool.call.id" = %request.call_id,
1461            "gen_ai.conversation.id" = %self.session_id,
1462            "error.type" = tracing::field::Empty,
1463            session.id = %self.session_id,
1464            turn.id = %turn_id,
1465            launch_kind = launch_kind,
1466        )
1467    }
1468
1469    fn start_task_via_manager(
1470        &self,
1471        task_id: Option<TaskId>,
1472        tool_request: ToolRequest,
1473        kind: TaskLaunchKind,
1474        cancellation: Option<TurnCancellation>,
1475    ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1476    {
1477        let task_manager = self.task_manager.clone();
1478        let tool_executor = self.tool_executor.clone();
1479        let permissions = self.permissions.clone();
1480        let resources = self.resources.clone();
1481        let session_id = self.session_id.clone();
1482        let turn_id = tool_request.turn_id.clone();
1483        let metadata = tool_request.metadata.clone();
1484
1485        async move {
1486            task_manager
1487                .start_task(
1488                    TaskLaunchRequest {
1489                        task_id,
1490                        request: tool_request.clone(),
1491                        kind,
1492                    },
1493                    TaskStartContext {
1494                        executor: tool_executor.clone(),
1495                        tool_context: {
1496                            let execution_scope = ToolExecutionScope {
1497                                executor: tool_executor,
1498                                session_id: session_id.clone(),
1499                                turn_id: turn_id.clone(),
1500                                permissions: permissions.clone(),
1501                                resources: resources.clone(),
1502                                cancellation: cancellation.clone(),
1503                            };
1504                            OwnedToolContext {
1505                                session_id,
1506                                turn_id,
1507                                metadata,
1508                                permissions,
1509                                resources,
1510                                cancellation,
1511                                execution_scope: Some(execution_scope),
1512                                approved_request: None,
1513                            }
1514                        },
1515                    },
1516                )
1517                .await
1518                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1519        }
1520    }
1521
1522    fn has_pending_interrupts(&self) -> bool {
1523        !self.pending_approvals.is_empty()
1524    }
1525
1526    fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1527        for event in events {
1528            self.emit(AgentEvent::ToolCatalogChanged(event));
1529        }
1530    }
1531
1532    fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1533        let call_id = task.tool_request.call_id.clone();
1534        let call = ToolCallPart {
1535            id: call_id.clone(),
1536            name: task.tool_request.tool_name.to_string(),
1537            input: task.tool_request.input.clone(),
1538            metadata: task.tool_request.metadata.clone(),
1539        };
1540        let mut request = task.approval;
1541        request.call_id = Some(call_id.clone());
1542        let pending = PendingApprovalToolCall {
1543            request: request.clone(),
1544            decision: None,
1545            surfaced: false,
1546            turn_id: turn_id.clone(),
1547            task_id: task.task_id,
1548            call,
1549            tool_request: task.tool_request,
1550        };
1551        self.pending_approvals.insert(call_id.clone(), pending);
1552        if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1553            self.pending_approval_order.push_back(call_id);
1554        }
1555        self.emit(AgentEvent::ApprovalRequired(request));
1556    }
1557
1558    fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1559        for call_id in self.pending_approval_order.clone() {
1560            let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1561                continue;
1562            };
1563            if pending.decision.is_none() && !pending.surfaced {
1564                pending.surfaced = true;
1565                return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1566                    PendingApproval {
1567                        request: pending.request.clone(),
1568                    },
1569                )));
1570            }
1571        }
1572        None
1573    }
1574
1575    fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1576        self.pending_approval_order.iter().find_map(|call_id| {
1577            self.pending_approvals.get(call_id).and_then(|pending| {
1578                pending.decision.is_none().then(|| {
1579                    LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1580                        request: pending.request.clone(),
1581                    }))
1582                })
1583            })
1584        })
1585    }
1586
1587    fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1588        let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1589            self.pending_approvals
1590                .get(call_id)
1591                .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1592        })?;
1593        self.pending_approval_order.retain(|id| id != &call_id);
1594        self.pending_approvals.remove(&call_id)
1595    }
1596
1597    fn queue_resolution_interrupt(
1598        &mut self,
1599        turn_id: &agentkit_core::TurnId,
1600        resolution: TaskResolution,
1601    ) -> Option<LoopStep> {
1602        match resolution {
1603            TaskResolution::Item(item) => {
1604                self.append_tool_result_item(item);
1605                None
1606            }
1607            TaskResolution::Approval(task) => {
1608                self.enqueue_pending_approval(turn_id, task);
1609                self.take_next_unsurfaced_approval_interrupt()
1610            }
1611        }
1612    }
1613
1614    async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1615        let PendingLoopUpdates { mut resolutions } = self
1616            .task_manager
1617            .take_pending_loop_updates()
1618            .await
1619            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1620        let mut saw_items = false;
1621        while let Some(resolution) = resolutions.pop_front() {
1622            match resolution {
1623                TaskResolution::Item(item) => {
1624                    self.append_tool_result_item(item);
1625                    saw_items = true;
1626                }
1627                TaskResolution::Approval(task) => {
1628                    self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1629                }
1630            }
1631        }
1632        Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1633    }
1634
1635    async fn run_mutators(
1636        &mut self,
1637        point: MutationPoint,
1638        turn_id: Option<&agentkit_core::TurnId>,
1639        cancellation: Option<TurnCancellation>,
1640    ) -> Result<(), LoopError> {
1641        if self.mutators.is_empty() {
1642            return Ok(());
1643        }
1644        if cancellation
1645            .as_ref()
1646            .is_some_and(TurnCancellation::is_cancelled)
1647        {
1648            return Err(LoopError::Cancelled);
1649        }
1650        let mutators = self.mutators.clone();
1651        let session_id = self.session_id.clone();
1652        let observers = self.observers.clone();
1653        let emitter = DriverEmitter {
1654            observers: &observers,
1655        };
1656        let mut cursor = TranscriptCursor {
1657            items: &mut self.transcript,
1658            dirty: false,
1659        };
1660        for mutator in &mutators {
1661            if cancellation
1662                .as_ref()
1663                .is_some_and(TurnCancellation::is_cancelled)
1664            {
1665                return Err(LoopError::Cancelled);
1666            }
1667            let ctx = LoopCtx {
1668                session_id: &session_id,
1669                turn_id,
1670                point,
1671                cancellation: cancellation.clone(),
1672                emitter: &emitter,
1673            };
1674            mutator.mutate(&mut cursor, ctx).await?;
1675        }
1676        if cursor.dirty {
1677            validate_transcript_invariants(cursor.items)?;
1678        }
1679        Ok(())
1680    }
1681
    /// Advances the active tool round, if there is one, until it produces a
    /// step for the host or has nothing more to report.
    ///
    /// Returns `Ok(None)` immediately when no tool round is active. Otherwise
    /// each loop iteration:
    ///
    /// 1. takes a fresh cancellation checkpoint and, if cancelled, notifies
    ///    the task manager, clears the round and finishes the turn as
    ///    cancelled;
    /// 2. starts the next queued tool call (if any) as a plain task and
    ///    handles an immediately-ready resolution;
    /// 3. once no calls are queued, waits on the task manager for the turn
    ///    and applies the update it reports.
    ///
    /// When the task manager reports nothing further for the turn, the round
    /// is taken out of `active_tool_round` and the method returns, in order
    /// of preference: an approval interrupt, `Ok(None)` (only background work
    /// is outstanding and nothing completed in the foreground), or a
    /// [`LoopInterrupt::AfterToolResult`] interrupt.
    ///
    /// # Errors
    ///
    /// Task-manager failures are reported as
    /// `LoopError::Tool(ToolError::Internal(..))`; errors from starting a
    /// task are propagated; [`LoopError::InvalidState`] is returned if the
    /// active round disappears mid-loop.
    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
        let Some(_) = self.active_tool_round.as_ref() else {
            return Ok(None);
        };
        loop {
            // Re-checkpoint every iteration so a cancellation raised while a
            // previous task was running is observed here.
            let cancellation = self
                .cancellation
                .as_ref()
                .map(CancellationHandle::checkpoint);
            let turn_id = self
                .active_tool_round
                .as_ref()
                .map(|active| active.turn_id.clone())
                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                self.active_tool_round = None;
                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
            }

            // Dispatch queued calls one at a time, front of the queue first.
            let next_call = self
                .active_tool_round
                .as_mut()
                .and_then(|active| active.pending_calls.pop_front());
            if let Some((_call, tool_request)) = next_call {
                use tracing::Instrument;
                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
                match self
                    .start_task_via_manager(
                        None,
                        tool_request.clone(),
                        TaskLaunchKind::Plain,
                        cancellation.clone(),
                    )
                    .instrument(dispatch_span.clone())
                    .await?
                {
                    // The task resolved without needing to be awaited.
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        match resolution {
                            TaskResolution::Item(item) => {
                                if tool_result_is_error(&item) {
                                    dispatch_span.record("error.type", "tool_error");
                                }
                                if let Some(active) = self.active_tool_round.as_mut() {
                                    active.foreground_progressed = true;
                                }
                                self.append_tool_result_item(item);
                            }
                            TaskResolution::Approval(task) => {
                                self.enqueue_pending_approval(&turn_id, task);
                            }
                        }
                        continue;
                    }
                    // The task is still running; remember when it is a
                    // background task so the round can report that later.
                    TaskStartOutcome::Pending { kind, .. } => {
                        if kind == agentkit_task_manager::TaskKind::Background
                            && let Some(active) = self.active_tool_round.as_mut()
                        {
                            active.background_pending = true;
                        }
                        continue;
                    }
                }
            }

            // No calls left to dispatch: wait for the task manager to report
            // on tasks belonging to this turn.
            match self
                .task_manager
                .wait_for_turn(&turn_id, cancellation.clone())
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
            {
                Some(TurnTaskUpdate::Resolution(resolution)) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                }
                Some(TurnTaskUpdate::Detached(snapshot)) => {
                    // The task was promoted to background. Push a synthetic
                    // tool result so the model knows the call is still
                    // running and can continue its turn; provider schemas
                    // require exactly one tool_result per tool_use, so this
                    // placeholder fills the tool_use slot.
                    //
                    // Order matters: append the placeholder FIRST as a real
                    // Tool/ToolResult, and only AFTER that record the
                    // call_id in `detached_call_ids` — so the *next* item
                    // for this call_id (the real completion arriving later
                    // via the task manager) is the one converted to a
                    // Notification by `maybe_convert_detached`, instead of
                    // becoming a second tool_result for the same call_id.
                    let detached_call_id = snapshot.call_id.clone();
                    self.append_tool_result_item(Item {
                        id: None,
                        kind: ItemKind::Tool,
                        parts: vec![Part::ToolResult(ToolResultPart {
                            call_id: detached_call_id.clone(),
                            output: ToolOutput::Text(format!(
                                "Tool {} is now running in the background. \
                                 The result will be delivered when it completes.",
                                snapshot.tool_name,
                            )),
                            is_error: false,
                            metadata: MetadataMap::new(),
                        })],
                        metadata: MetadataMap::new(),
                        usage: None,
                        finish_reason: None,
                        created_at: None,
                    });
                    self.detached_call_ids.insert(detached_call_id);
                    if let Some(active) = self.active_tool_round.as_mut() {
                        active.background_pending = true;
                        active.foreground_progressed = true;
                    }
                }
                None => {
                    // Nothing further was reported for this turn. Check
                    // cancellation first, since that ends the turn as
                    // cancelled instead of closing the round normally.
                    if cancellation
                        .as_ref()
                        .is_some_and(TurnCancellation::is_cancelled)
                    {
                        self.task_manager
                            .on_turn_interrupted(&turn_id)
                            .await
                            .map_err(|error| {
                                LoopError::Tool(ToolError::Internal(error.to_string()))
                            })?;
                        self.active_tool_round = None;
                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                    }
                    // The round is over from here on: it is removed from the
                    // driver before any of the returns below.
                    let active = self.active_tool_round.take().ok_or_else(|| {
                        LoopError::InvalidState("missing active tool round".into())
                    })?;
                    // Approvals take priority: first ones the host has not
                    // seen yet, then any that are still undecided.
                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    if let Some(step) = self.next_unresolved_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    // Only background work is outstanding and no foreground
                    // result was appended: report no step to the caller.
                    if active.background_pending && !active.foreground_progressed {
                        return Ok(None);
                    }
                    // Yield control back to the host between tool rounds.
                    // All tool calls in this round have results in the
                    // transcript; the transcript is provider-valid.  The
                    // host may submit_input before calling next() to
                    // resume, which will re-enter drive_turn via
                    // pending_round_resume.
                    let info = ToolRoundInfo {
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        transcript_len: self.transcript.len(),
                    };
                    self.pending_round_resume = Some(turn_id);
                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                        info,
                    ))));
                }
            }
        }
    }
1863
    /// Drives one model turn for `turn_id` and returns the resulting step.
    ///
    /// In order, this method:
    ///
    /// 1. runs the transcript mutators (at `AfterToolResult` when
    ///    `pending_round_resume` is set, otherwise at `AfterTurnEnded`);
    /// 2. emits [`AgentEvent::TurnStarted`] when `emit_started` is true;
    /// 3. forwards any drained tool-catalog events;
    /// 4. sends a [`TurnRequest`] built from a clone of the transcript to the
    ///    model session and drains its event stream inside a `chat` span;
    /// 5. stamps the returned items (usage / finish reason on assistant
    ///    items, creation time on all items) where those fields are unset,
    ///    and appends them to the transcript;
    /// 6. if a tool call was streamed, opens an [`ActiveToolRound`] from the
    ///    tool calls found in the output items and continues it; otherwise
    ///    emits [`AgentEvent::TurnFinished`] and returns
    ///    [`LoopStep::Finished`].
    ///
    /// Cancellation observed at any checkpoint ends the turn through
    /// `finish_cancelled`, i.e. as an `Ok` finished step with
    /// [`FinishReason::Cancelled`], not as an error.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] when no model session is
    /// available, [`LoopError::Provider`] when the event stream ends without
    /// a `Finished` event, and propagates other errors from the mutators,
    /// the model session, the task manager, or the tool round.
    #[tracing::instrument(
        name = "agent.turn",
        skip_all,
        fields(
            otel.name = "invoke_agent",
            gen_ai.operation.name = "invoke_agent",
            gen_ai.conversation.id = %self.session_id,
            gen_ai.provider.name = tracing::field::Empty,
            gen_ai.usage.input_tokens = tracing::field::Empty,
            gen_ai.usage.output_tokens = tracing::field::Empty,
            session.id = %self.session_id,
            turn.id = %turn_id,
            transcript.len = self.transcript.len(),
            saw_tool_call = tracing::field::Empty,
            finish_reason = tracing::field::Empty,
        ),
    )]
    async fn drive_turn(
        &mut self,
        turn_id: agentkit_core::TurnId,
        emit_started: bool,
    ) -> Result<LoopStep, LoopError> {
        // Fill in the span fields declared as `Empty` in the attribute above.
        if let Some(provider) = &self.provider_name {
            tracing::Span::current().record("gen_ai.provider.name", provider.as_str());
        }
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        // A set `pending_round_resume` selects the after-tool-result
        // mutation point; otherwise the after-turn-ended point is used.
        let mutation_point = if self.pending_round_resume.is_some() {
            MutationPoint::AfterToolResult
        } else {
            MutationPoint::AfterTurnEnded
        };
        match self
            .run_mutators(mutation_point, Some(&turn_id), cancellation.clone())
            .await
        {
            Ok(()) => {}
            // Cancellation during mutation ends the turn as cancelled
            // instead of surfacing an error to the caller.
            Err(LoopError::Cancelled) => {
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        }
        if emit_started {
            self.emit(AgentEvent::TurnStarted {
                session_id: self.session_id.clone(),
                turn_id: turn_id.clone(),
            });
        }
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }

        // Publish catalog changes before the tool specs are read for the
        // request below.
        let catalog_events = self.tool_executor.drain_catalog_events();
        self.emit_tool_catalog_events(catalog_events);

        // `next_turn_cache` is a one-shot override (`take`); the session
        // default is used when no override was set.
        let request = TurnRequest {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
            transcript: self.transcript.clone(),
            available_tools: self.tool_executor.specs(),
            cache: self
                .next_turn_cache
                .take()
                .or_else(|| self.default_cache.clone()),
            metadata: MetadataMap::new(),
        };

        let session = self
            .session
            .as_mut()
            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;

        // Inference span per the OTel GenAI semantic conventions. It wraps the
        // model request and the full event drain rather than just `begin_turn`,
        // so attributes that streaming adapters only learn mid-stream (usage,
        // stop reason, response identity) still land before the span closes.
        // `otel.name` carries the dynamic `chat {model}` span name for
        // OpenTelemetry bridges since tracing span names are static.
        let chat_span = tracing::info_span!(
            "chat",
            "otel.name" = tracing::field::Empty,
            "otel.kind" = "client",
            "gen_ai.operation.name" = "chat",
            "gen_ai.provider.name" = tracing::field::Empty,
            "gen_ai.conversation.id" = %self.session_id,
            "gen_ai.request.model" = tracing::field::Empty,
            "gen_ai.response.model" = tracing::field::Empty,
            "gen_ai.response.id" = tracing::field::Empty,
            "gen_ai.response.finish_reasons" = tracing::field::Empty,
            "gen_ai.usage.input_tokens" = tracing::field::Empty,
            "gen_ai.usage.output_tokens" = tracing::field::Empty,
        );
        if let Some(provider) = &self.provider_name {
            chat_span.record("gen_ai.provider.name", provider.as_str());
        }
        match session.model_name() {
            Some(model) => {
                chat_span.record("gen_ai.request.model", model);
                chat_span.record("otel.name", format!("chat {model}").as_str());
            }
            None => {
                chat_span.record("otel.name", "chat");
            }
        }

        use tracing::Instrument;
        let mut turn = match session
            .begin_turn(request, cancellation.clone())
            .instrument(chat_span.clone())
            .await
        {
            Ok(turn) => turn,
            // From here on, a cancelled turn also notifies the task manager
            // before the turn is finished as cancelled.
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        };
        let mut saw_tool_call = false;
        let mut finished_result = None;

        // Drain the model's event stream until it ends or reports `Finished`.
        while let Some(event) = match turn
            .next_event(cancellation.clone())
            .instrument(chat_span.clone())
            .await
        {
            Ok(event) => event,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        } {
            // Cancellation is also checked after each successfully received
            // event, before that event is handled.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            match event {
                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
                ModelTurnEvent::Usage(usage) => {
                    if let Some(tokens) = &usage.tokens {
                        chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
                        chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
                    }
                    self.emit(AgentEvent::UsageUpdated(usage));
                }
                // Only the fact that a tool call was streamed is recorded
                // here; the calls themselves are read from the output items
                // after the turn finishes.
                ModelTurnEvent::ToolCall(call) => {
                    saw_tool_call = true;
                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
                }
                ModelTurnEvent::Finished(result) => {
                    finished_result = Some(result);
                    break;
                }
            }
        }

        // A stream that ends without `Finished` is a provider error.
        let mut result = finished_result.ok_or_else(|| {
            LoopError::Provider("model turn ended without a Finished event".into())
        })?;
        if let Some(model) = &result.model {
            chat_span.record("gen_ai.response.model", model.as_str());
        }
        if let Some(id) = &result.response_id {
            chat_span.record("gen_ai.response.id", id.as_str());
        }
        if let Some(tokens) = result
            .usage
            .as_ref()
            .and_then(|usage| usage.tokens.as_ref())
        {
            chat_span.record("gen_ai.usage.input_tokens", tokens.input_tokens);
            chat_span.record("gen_ai.usage.output_tokens", tokens.output_tokens);
        }
        chat_span.record(
            "gen_ai.response.finish_reasons",
            tracing::field::debug(&result.finish_reason),
        );
        // Release this handle to the inference span; the remaining records
        // go to the current (`agent.turn`) span.
        drop(chat_span);
        tracing::Span::current().record("saw_tool_call", saw_tool_call);
        tracing::Span::current().record(
            "finish_reason",
            tracing::field::debug(&result.finish_reason),
        );
        if let Some(tokens) = result
            .usage
            .as_ref()
            .and_then(|usage| usage.tokens.as_ref())
        {
            tracing::Span::current().record("gen_ai.usage.input_tokens", tokens.input_tokens);
            tracing::Span::current().record("gen_ai.usage.output_tokens", tokens.output_tokens);
        }
        // Stamp the output items: assistant items inherit the turn's usage
        // and finish reason when they carry none, and every item gets the
        // same creation timestamp when it has none.
        let now = Timestamp::now();
        let usage = result.usage.clone();
        let finish_reason = result.finish_reason.clone();
        let output_items: Vec<Item> = result
            .output_items
            .drain(..)
            .map(|mut item| {
                if matches!(item.kind, ItemKind::Assistant) {
                    if item.usage.is_none() {
                        item.usage = usage.clone();
                    }
                    if item.finish_reason.is_none() {
                        item.finish_reason = Some(finish_reason.clone());
                    }
                }
                if item.created_at.is_none() {
                    item.created_at = Some(now);
                }
                item
            })
            .collect();
        self.extend_transcript(output_items.clone());

        if saw_tool_call {
            // Pair each tool call found in the output items with the
            // request that will be handed to the task manager.
            let pending_calls = extract_tool_calls(&output_items)
                .into_iter()
                .map(|call| {
                    let tool_request = ToolRequest {
                        call_id: call.id.clone(),
                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                        input: call.input.clone(),
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        metadata: call.metadata.clone(),
                    };
                    (call, tool_request)
                })
                .collect();
            self.active_tool_round = Some(ActiveToolRound {
                turn_id: turn_id.clone(),
                pending_calls,
                background_pending: false,
                foreground_progressed: false,
            });
            if let Some(step) = self.continue_active_tool_round().await? {
                return Ok(step);
            }
            // The tool round produced no step of its own, so the host is
            // asked for input instead.
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        // No tool call was streamed: the turn is complete.
        let turn_result = TurnResult {
            turn_id,
            finish_reason: result.finish_reason,
            items: output_items,
            usage: result.usage,
            metadata: result.metadata,
        };
        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
        Ok(LoopStep::Finished(turn_result))
    }
2138
2139    async fn resume_after_approval(
2140        &mut self,
2141        pending: PendingApprovalToolCall,
2142    ) -> Result<LoopStep, LoopError> {
2143        let decision = pending
2144            .decision
2145            .clone()
2146            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
2147
2148        match decision {
2149            ApprovalDecision::Approve => {
2150                use tracing::Instrument;
2151                let dispatch_span =
2152                    self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
2153                match self
2154                    .start_task_via_manager(
2155                        Some(pending.task_id.clone()),
2156                        pending.tool_request.clone(),
2157                        TaskLaunchKind::Approved(pending.request.clone()),
2158                        self.cancellation
2159                            .as_ref()
2160                            .map(CancellationHandle::checkpoint),
2161                    )
2162                    .instrument(dispatch_span.clone())
2163                    .await?
2164                {
2165                    TaskStartOutcome::Ready(resolution) => {
2166                        let resolution = *resolution;
2167                        if let TaskResolution::Item(item) = &resolution
2168                            && tool_result_is_error(item)
2169                        {
2170                            dispatch_span.record("error.type", "tool_error");
2171                        }
2172                        if let Some(step) =
2173                            self.queue_resolution_interrupt(&pending.turn_id, resolution)
2174                        {
2175                            return Ok(step);
2176                        }
2177                    }
2178                    TaskStartOutcome::Pending { .. } => {}
2179                }
2180            }
2181            ApprovalDecision::Deny { reason } => {
2182                self.append_tool_result_item(Item {
2183                    id: None,
2184                    kind: ItemKind::Tool,
2185                    parts: vec![Part::ToolResult(ToolResultPart {
2186                        call_id: pending.call.id.clone(),
2187                        output: ToolOutput::Text(
2188                            reason.unwrap_or_else(|| "approval denied".into()),
2189                        ),
2190                        is_error: true,
2191                        metadata: pending.call.metadata.clone(),
2192                    })],
2193                    metadata: MetadataMap::new(),
2194                    usage: None,
2195                    finish_reason: None,
2196                    created_at: None,
2197                });
2198            }
2199        }
2200
2201        if let Some(step) = self.continue_active_tool_round().await? {
2202            Ok(step)
2203        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2204            Ok(step)
2205        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
2206            Ok(step)
2207        } else {
2208            self.drive_turn(pending.turn_id, false).await
2209        }
2210    }
2211
2212    fn finish_cancelled(
2213        &mut self,
2214        turn_id: agentkit_core::TurnId,
2215        items: Vec<Item>,
2216    ) -> Result<LoopStep, LoopError> {
2217        self.extend_transcript(items.clone());
2218        let turn_result = TurnResult {
2219            turn_id,
2220            finish_reason: FinishReason::Cancelled,
2221            items,
2222            usage: None,
2223            metadata: interrupted_metadata("turn"),
2224        };
2225        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2226        Ok(LoopStep::Finished(turn_result))
2227    }
2228
2229    /// Internal entry point for buffering user input. Reachable only via
2230    /// [`InputRequest::submit`] (resolves an `AwaitingInput` interrupt,
2231    /// including the very first one after [`Agent::start`]) and
2232    /// [`ToolRoundInfo::submit`] (interjects between tool rounds). Prior
2233    /// transcript items — the passive starting state of a session — are
2234    /// preloaded via [`AgentBuilder::transcript`]; an opening user turn for
2235    /// one-shot calls is preloaded via [`AgentBuilder::input`]. New input
2236    /// after start-up always flows through one of the typed `submit`
2237    /// handles.
2238    pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
2239        if self.has_pending_interrupts() {
2240            return Err(LoopError::InvalidState(
2241                "cannot submit input while an interrupt is pending".into(),
2242            ));
2243        }
2244        self.emit(AgentEvent::InputAccepted {
2245            session_id: self.session_id.clone(),
2246            items: input.clone(),
2247        });
2248        self.pending_input.extend(input);
2249        Ok(())
2250    }
2251
2252    /// Override the prompt cache request for the next model turn.
2253    ///
2254    /// The override is consumed the next time the driver starts a model turn.
2255    /// Session-level defaults still apply to later turns.
2256    pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2257        if self.has_pending_interrupts() {
2258            return Err(LoopError::InvalidState(
2259                "cannot update next-turn cache while an interrupt is pending".into(),
2260            ));
2261        }
2262        self.next_turn_cache = Some(cache);
2263        Ok(())
2264    }
2265
2266    #[cfg(test)]
2267    pub(crate) fn submit_input_with_cache(
2268        &mut self,
2269        input: Vec<Item>,
2270        cache: PromptCacheRequest,
2271    ) -> Result<(), LoopError> {
2272        self.set_next_turn_cache(cache)?;
2273        self.submit_input(input)
2274    }
2275
2276    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`].
2277    ///
2278    /// After calling this, invoke [`next`](LoopDriver::next) to continue the
2279    /// loop.  If the decision is [`ApprovalDecision::Approve`] the tool call
2280    /// executes; if denied, an error result is fed back to the model.
2281    ///
2282    /// # Errors
2283    ///
2284    /// Returns [`LoopError::InvalidState`] if no approval is pending.
2285    pub fn resolve_approval_for(
2286        &mut self,
2287        call_id: ToolCallId,
2288        decision: ApprovalDecision,
2289    ) -> Result<(), LoopError> {
2290        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2291            return Err(LoopError::InvalidState(format!(
2292                "no approval request is pending for call {}",
2293                call_id.0
2294            )));
2295        };
2296        pending.decision = Some(decision.clone());
2297        self.emit(AgentEvent::ApprovalResolved {
2298            approved: matches!(decision, ApprovalDecision::Approve),
2299        });
2300        Ok(())
2301    }
2302
2303    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] with a patched
2304    /// input that replaces the model's original tool arguments.
2305    ///
2306    /// Equivalent to calling [`resolve_approval_for`] with
2307    /// [`ApprovalDecision::Approve`] except the tool sees `input` instead of
2308    /// what the model emitted. The transcript still records the model's
2309    /// original call.
2310    ///
2311    /// # Errors
2312    ///
2313    /// Returns [`LoopError::InvalidState`] if no approval is pending for
2314    /// `call_id`.
2315    pub fn resolve_approval_for_with_patched_input(
2316        &mut self,
2317        call_id: ToolCallId,
2318        input: serde_json::Value,
2319    ) -> Result<(), LoopError> {
2320        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2321            return Err(LoopError::InvalidState(format!(
2322                "no approval request is pending for call {}",
2323                call_id.0
2324            )));
2325        };
2326        pending.tool_request.input = input;
2327        self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2328    }
2329
2330    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] when exactly one
2331    /// approval is outstanding.
2332    pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2333        let mut unresolved = self
2334            .pending_approval_order
2335            .iter()
2336            .filter(|call_id| {
2337                self.pending_approvals
2338                    .get(*call_id)
2339                    .is_some_and(|pending| pending.decision.is_none())
2340            })
2341            .cloned();
2342        let Some(call_id) = unresolved.next() else {
2343            return Err(LoopError::InvalidState(
2344                "no approval request is pending".into(),
2345            ));
2346        };
2347        if unresolved.next().is_some() {
2348            return Err(LoopError::InvalidState(
2349                "multiple approvals are pending; use resolve_approval_for".into(),
2350            ));
2351        }
2352        self.resolve_approval_for(call_id, decision)
2353    }
2354
2355    /// Take a read-only snapshot of the driver's current transcript and input queue.
2356    pub fn snapshot(&self) -> LoopSnapshot {
2357        LoopSnapshot {
2358            session_id: self.session_id.clone(),
2359            transcript: self.transcript.clone(),
2360            pending_input: self.pending_input.clone(),
2361        }
2362    }
2363
    /// Advance the loop by one step.
    ///
    /// This is the main method for driving the agent.  It processes pending
    /// interrupt resolutions, consumes queued input, starts a model turn,
    /// executes tool calls, and returns once the turn finishes or an
    /// interrupt occurs.
    ///
    /// If no input is queued and no interrupt is pending, returns
    /// [`LoopStep::Interrupt(LoopInterrupt::AwaitingInput(..))`](LoopInterrupt::AwaitingInput).
    /// This is the steady state after [`Agent::start`] when no input was
    /// preloaded via [`AgentBuilder::input`]: the prior transcript loaded
    /// via [`AgentBuilder::transcript`] is passive, so the first call
    /// surfaces `AwaitingInput` and waits for the host to supply input via
    /// [`InputRequest::submit`] before any model turn is dispatched. If
    /// input was preloaded, the first call dispatches the model directly.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] if called while an unresolved
    /// interrupt is pending, or propagates provider / tool / compaction errors.
    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
        // NOTE(review): the helpers called below are defined elsewhere in
        // this file; the step descriptions follow their names and return
        // types and should be confirmed against their bodies. The checks run
        // in a fixed priority order, and the first one that yields a step
        // ends this call.

        // 1. An approval that already has a decision: resume from it.
        if let Some(pending) = self.take_next_resolved_approval() {
            return self.resume_after_approval(pending).await;
        }

        // 2. An approval interrupt the host has not been shown yet.
        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
            return Ok(step);
        }

        // 3. An approval that is still waiting on the host's decision.
        if let Some(step) = self.next_unresolved_approval_interrupt() {
            return Ok(step);
        }

        // 4. A tool round that is still in progress.
        if let Some(step) = self.continue_active_tool_round().await? {
            return Ok(step);
        }

        // 5. Drain queued loop updates. `had_loop_updates` is remembered so
        //    that the AwaitingInput check further down does not fire when
        //    updates were drained, even though no input is queued.
        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
        if let Some(step) = loop_step {
            return Ok(step);
        }

        // Resume after an AfterToolResult yield.  Any input submitted by the
        // host during the yield is folded into the transcript as part of the
        // continuation turn; background task results drained just above are
        // already in the transcript.
        if let Some(turn_id) = self.pending_round_resume.take() {
            let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
            self.extend_transcript(drained);
            // Reuses the stored turn id; the `false` flag distinguishes this
            // from the fresh-turn call at the bottom (presumably "not a new
            // turn" — confirm against `drive_turn`).
            return self.drive_turn(turn_id, false).await;
        }

        // Nothing queued and nothing drained: hand control back to the host
        // and ask for input.
        if self.pending_input.is_empty() && !had_loop_updates {
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        // Start a fresh turn: allocate the next sequential turn id, move the
        // queued input into the transcript, and drive the turn.
        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
        self.next_turn_index += 1;
        let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
        self.extend_transcript(drained);
        self.drive_turn(turn_id, true).await
    }
2431
2432    fn emit(&self, event: AgentEvent) {
2433        for observer in &self.observers {
2434            observer.handle_event(event.clone());
2435        }
2436    }
2437
2438    /// Append a single [`Item`] to the transcript and notify all
2439    /// registered [`TranscriptObserver`]s. The single mutation point —
2440    /// every push to `self.transcript` should funnel through here so
2441    /// observers see exactly what landed in the transcript.
2442    fn append_item(&mut self, mut item: Item) {
2443        if item.created_at.is_none() {
2444            item.created_at = Some(Timestamp::now());
2445        }
2446        for observer in &self.transcript_observers {
2447            observer.on_item_appended(&item);
2448        }
2449        self.transcript.push(item);
2450    }
2451
2452    /// Append a tool-result Item: emit one [`AgentEvent::ToolResultReceived`]
2453    /// per [`Part::ToolResult`] inside the Item, then funnel through
2454    /// [`Self::append_item`].
2455    ///
2456    /// If every `ToolResult` in the item references a `call_id` that was
2457    /// already paired with a synthetic detach tool_result, the item is
2458    /// converted to a [`ItemKind::Notification`] before appending.
2459    /// Without this, we would emit a second `tool_result` for the same
2460    /// `tool_use_id` — a provider-schema violation that
2461    /// Anthropic/OpenRouter reject as an "orphaned tool_result".
2462    /// Observers still see `ToolResultReceived` for each result so any
2463    /// UI spinner or task tracker can close.
2464    fn append_tool_result_item(&mut self, item: Item) {
2465        for part in &item.parts {
2466            if let Part::ToolResult(result) = part {
2467                self.emit(AgentEvent::ToolResultReceived(result.clone()));
2468            }
2469        }
2470        let item = self.maybe_convert_detached(item);
2471        self.append_item(item);
2472    }
2473
2474    fn maybe_convert_detached(&mut self, item: Item) -> Item {
2475        if !matches!(item.kind, ItemKind::Tool) {
2476            return item;
2477        }
2478        let results: Vec<&ToolResultPart> = item
2479            .parts
2480            .iter()
2481            .filter_map(|p| match p {
2482                Part::ToolResult(r) => Some(r),
2483                _ => None,
2484            })
2485            .collect();
2486        if results.is_empty()
2487            || !results
2488                .iter()
2489                .all(|r| self.detached_call_ids.contains(&r.call_id))
2490        {
2491            return item;
2492        }
2493        let mut text = String::new();
2494        for result in &results {
2495            self.detached_call_ids.remove(&result.call_id);
2496            if !text.is_empty() {
2497                text.push_str("\n\n");
2498            }
2499            let label = if result.is_error {
2500                "failed"
2501            } else {
2502                "completed"
2503            };
2504            let body = render_tool_output_brief(&result.output);
2505            text.push_str(&format!(
2506                "Background tool call {} {}: {body}",
2507                result.call_id.0, label
2508            ));
2509        }
2510        Item::notification(text)
2511    }
2512
2513    /// Append several Items in order through [`Self::append_item`].
2514    /// Pre-stamps `created_at` once per batch so all items in the batch
2515    /// share a timestamp and `append_item` skips its own clock read.
2516    fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2517        let now = Timestamp::now();
2518        for mut item in items {
2519            if item.created_at.is_none() {
2520                item.created_at = Some(now);
2521            }
2522            self.append_item(item);
2523        }
2524    }
2525}
2526
2527fn render_tool_output_brief(output: &ToolOutput) -> String {
2528    match output {
2529        ToolOutput::Text(t) => t.clone(),
2530        ToolOutput::Structured(value) => value.to_string(),
2531        ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2532        ToolOutput::Files(files) => format!("[{} files]", files.len()),
2533    }
2534}
2535
2536fn interrupted_metadata(stage: &str) -> MetadataMap {
2537    let mut metadata = MetadataMap::new();
2538    metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2539    metadata.insert(
2540        INTERRUPT_REASON_METADATA_KEY.into(),
2541        USER_CANCELLED_REASON.into(),
2542    );
2543    metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2544    metadata
2545}
2546
2547fn interrupted_assistant_items() -> Vec<Item> {
2548    vec![Item {
2549        id: None,
2550        kind: ItemKind::Assistant,
2551        parts: vec![Part::Text(TextPart {
2552            text: "Previous assistant response was interrupted by the user before completion."
2553                .into(),
2554            metadata: interrupted_metadata("assistant"),
2555        })],
2556        metadata: interrupted_metadata("assistant"),
2557        usage: None,
2558        finish_reason: None,
2559        created_at: None,
2560    }]
2561}
2562
2563fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2564    let mut calls = Vec::new();
2565    for item in items {
2566        for part in &item.parts {
2567            if let Part::ToolCall(call) = part {
2568                calls.push(call.clone());
2569            }
2570        }
2571    }
2572    calls
2573}
2574
2575fn tool_result_is_error(item: &Item) -> bool {
2576    item.parts
2577        .iter()
2578        .any(|part| matches!(part, Part::ToolResult(result) if result.is_error))
2579}
2580
/// Errors that can occur while driving the agent loop.
///
/// `Display` output comes from the `#[error(...)]` attribute on each
/// variant; the string-carrying variants embed their payload in the message.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was in an unexpected state for the requested operation.
    ///
    /// For example, submitting input while an interrupt is pending, or
    /// resolving an approval for a call id with no pending request.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The current turn was cancelled via the [`CancellationHandle`].
    #[error("turn cancelled")]
    Cancelled,
    /// An error originating from the model provider.
    #[error("provider error: {0}")]
    Provider(String),
    /// An error originating from tool execution.
    ///
    /// `#[from]` lets a [`ToolError`] convert into this variant, so `?`
    /// works on tool results inside functions returning `LoopError`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// An error reported by a [`LoopMutator`] (compaction, redaction, repair).
    ///
    /// Also used when a transcript fails the tool_use / tool_result pairing
    /// check in `validate_transcript_invariants`.
    #[error("mutator error: {0}")]
    Mutator(String),
    /// The requested operation is not supported.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2603
2604/// Internal [`EventEmitter`] backed by the driver's observer slice. Lives
2605/// only for the duration of a [`LoopDriver::run_mutators`] call so the
2606/// borrow against `self.observers` stays disjoint from the cursor's borrow
2607/// of `self.transcript`.
2608struct DriverEmitter<'a> {
2609    observers: &'a [Arc<dyn LoopObserver>],
2610}
2611
2612impl<'a> EventEmitter for DriverEmitter<'a> {
2613    fn emit(&self, event: AgentEvent) {
2614        for observer in self.observers {
2615            observer.handle_event(event.clone());
2616        }
2617    }
2618}
2619
2620/// Hard-fails when a mutator's edit leaves the transcript protocol-invalid.
2621/// The only invariant currently checked is tool_use ↔ tool_result pairing
2622/// — every [`Part::ToolCall`] must be followed (in transcript order) by a
2623/// matching [`Part::ToolResult`] with the same `call_id`.
2624fn validate_transcript_invariants(transcript: &[Item]) -> Result<(), LoopError> {
2625    let mut pending: HashSet<ToolCallId> = HashSet::new();
2626    let mut seen_calls: HashSet<ToolCallId> = HashSet::new();
2627    let mut seen_results: HashSet<ToolCallId> = HashSet::new();
2628    for item in transcript {
2629        for part in &item.parts {
2630            match part {
2631                Part::ToolCall(call) => {
2632                    if !seen_calls.insert(call.id.clone()) {
2633                        return Err(LoopError::Mutator(format!(
2634                            "transcript invariant violation: duplicate tool_use: {}",
2635                            call.id.0
2636                        )));
2637                    }
2638                    pending.insert(call.id.clone());
2639                }
2640                Part::ToolResult(result) => {
2641                    if !pending.remove(&result.call_id) {
2642                        let kind = if seen_results.contains(&result.call_id) {
2643                            "duplicate"
2644                        } else {
2645                            "orphaned"
2646                        };
2647                        return Err(LoopError::Mutator(format!(
2648                            "transcript invariant violation: {kind} tool_result: {}",
2649                            result.call_id.0
2650                        )));
2651                    }
2652                    seen_results.insert(result.call_id.clone());
2653                }
2654                _ => {}
2655            }
2656        }
2657    }
2658    if !pending.is_empty() {
2659        let missing: Vec<String> = pending.into_iter().map(|id| id.0).collect();
2660        return Err(LoopError::Mutator(format!(
2661            "transcript invariant violation: tool_use(s) without matching tool_result: {}",
2662            missing.join(", ")
2663        )));
2664    }
2665    Ok(())
2666}
2667
2668#[cfg(test)]
2669mod tests {
2670    use std::collections::VecDeque;
2671    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2672    use std::sync::{Arc as StdArc, Mutex as StdMutex};
2673
2674    use agentkit_core::{
2675        CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolCallPart, ToolOutput,
2676        ToolResultPart,
2677    };
2678    use agentkit_task_manager::{
2679        AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2680        TaskRoutingPolicy,
2681    };
2682    use agentkit_tools_core::{
2683        FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2684        ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2685        ToolResult, ToolSpec,
2686    };
2687    use serde_json::{Value, json};
2688    use tokio::sync::Notify;
2689    use tokio::time::{Duration, timeout};
2690
2691    use super::*;
2692
    // ---- Model fixtures -------------------------------------------------
    // Each fixture family is an adapter / session / turn triple. The
    // adapter hands out the session, and the session's `begin_turn` builds
    // the turn.

    /// Adapter whose sessions are [`FakeSession`].
    struct FakeAdapter;
    /// Adapter whose sessions are [`SlowSession`].
    struct SlowAdapter;
    /// Adapter whose sessions are [`RecordingSession`]; the shared logs are
    /// cloned into every session it starts.
    struct RecordingAdapter {
        /// Tool descriptions offered on each `begin_turn` call (one inner
        /// `Vec` per call).
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        /// The request's `cache` value on each `begin_turn` call.
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    /// Adapter whose sessions are [`MultiToolSession`].
    struct MultiToolAdapter;
    /// Adapter whose sessions are [`DualApprovalSession`].
    struct DualApprovalAdapter;

    /// Session that first requests a single tool call (`call-1`) and, once a
    /// tool result is in the transcript, answers with `tool said: <text>`.
    struct FakeSession;
    /// Session whose `begin_turn` waits for cancellation when the latest
    /// user item contains the text `do the long task` and a cancellation
    /// handle was supplied.
    struct SlowSession;
    /// Session that records each turn's tool descriptions and cache request
    /// into the shared logs.
    struct RecordingSession {
        /// Shared log of tool descriptions, one entry per `begin_turn`.
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        /// Shared log of cache requests, one entry per `begin_turn`.
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    /// Session that requests `foreground-wait` and `background-wait` in one
    /// turn, then finishes once any tool result is in the transcript.
    struct MultiToolSession;
    /// Session that requests two `echo` calls (`call-1`, `call-2`) until the
    /// transcript holds at least two tool results, then finishes.
    struct DualApprovalSession;

    /// Scripted turn: `next_event` hands out `events` front to back.
    struct FakeTurn {
        events: VecDeque<ModelTurnEvent>,
    }

    /// Turn built by [`SlowSession`]; `emitted` is set once its single
    /// `Finished` event has been returned.
    struct SlowTurn {
        emitted: bool,
    }

    /// Turn built by [`RecordingSession`], always starting with
    /// `emitted: false`.
    // NOTE(review): its `ModelTurn` impl is outside this excerpt; the flag
    // presumably plays the same role as in `SlowTurn` — confirm.
    struct RecordingTurn {
        emitted: bool,
    }
    /// Scripted turn built by [`MultiToolSession`].
    // NOTE(review): `ModelTurn` impl not visible here; presumably drains
    // `events` like `FakeTurn` — confirm.
    struct MultiToolTurn {
        events: VecDeque<ModelTurnEvent>,
    }
    /// Scripted turn built by [`DualApprovalSession`].
    // NOTE(review): `ModelTurn` impl not visible here; presumably drains
    // `events` like `FakeTurn` — confirm.
    struct DualApprovalTurn {
        events: VecDeque<ModelTurnEvent>,
    }
2728
    // Stateless adapter: every session is a fresh `FakeSession`, and the
    // session config is ignored.
    #[async_trait]
    impl ModelAdapter for FakeAdapter {
        type Session = FakeSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(FakeSession)
        }
    }
2737
    // Stateless adapter: every session is a fresh `SlowSession`, and the
    // session config is ignored.
    #[async_trait]
    impl ModelAdapter for SlowAdapter {
        type Session = SlowSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(SlowSession)
        }
    }
2746
    // Each session gets clones of the adapter's `StdArc` handles, so the
    // recorded log entries land in the same vectors the adapter holds. The
    // session config is ignored.
    #[async_trait]
    impl ModelAdapter for RecordingAdapter {
        type Session = RecordingSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(RecordingSession {
                seen_descriptions: self.seen_descriptions.clone(),
                seen_caches: self.seen_caches.clone(),
            })
        }
    }
2758
    // Stateless adapter: every session is a fresh `MultiToolSession`, and
    // the session config is ignored.
    #[async_trait]
    impl ModelAdapter for MultiToolAdapter {
        type Session = MultiToolSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(MultiToolSession)
        }
    }
2767
    // Stateless adapter: every session is a fresh `DualApprovalSession`,
    // and the session config is ignored.
    #[async_trait]
    impl ModelAdapter for DualApprovalAdapter {
        type Session = DualApprovalSession;

        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(DualApprovalSession)
        }
    }
2776
2777    #[async_trait]
2778    impl ModelSession for FakeSession {
2779        type Turn = FakeTurn;
2780
2781        async fn begin_turn(
2782            &mut self,
2783            request: TurnRequest,
2784            _cancellation: Option<TurnCancellation>,
2785        ) -> Result<Self::Turn, LoopError> {
2786            let has_tool_result = request.transcript.iter().any(|item| {
2787                item.kind == ItemKind::Tool
2788                    && item
2789                        .parts
2790                        .iter()
2791                        .any(|part| matches!(part, Part::ToolResult(_)))
2792            });
2793            let tool_name = request
2794                .available_tools
2795                .first()
2796                .map(|tool| tool.name.0.clone())
2797                .unwrap_or_else(|| "echo".into());
2798
2799            let events = if has_tool_result {
2800                let result_text = request
2801                    .transcript
2802                    .iter()
2803                    .rev()
2804                    .find_map(|item| {
2805                        item.parts.iter().find_map(|part| match part {
2806                            Part::ToolResult(ToolResultPart {
2807                                output: ToolOutput::Text(text),
2808                                ..
2809                            }) => Some(text.clone()),
2810                            _ => None,
2811                        })
2812                    })
2813                    .unwrap_or_else(|| "missing".into());
2814
2815                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2816                    model: None,
2817                    response_id: None,
2818                    finish_reason: FinishReason::Completed,
2819                    output_items: vec![Item {
2820                        id: None,
2821                        kind: ItemKind::Assistant,
2822                        parts: vec![Part::Text(TextPart {
2823                            text: format!("tool said: {result_text}"),
2824                            metadata: MetadataMap::new(),
2825                        })],
2826                        metadata: MetadataMap::new(),
2827                        usage: None,
2828                        finish_reason: None,
2829                        created_at: None,
2830                    }],
2831                    usage: None,
2832                    metadata: MetadataMap::new(),
2833                })])
2834            } else {
2835                VecDeque::from([
2836                    ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2837                        id: ToolCallId::new("call-1"),
2838                        name: tool_name.clone(),
2839                        input: json!({ "value": "pong" }),
2840                        metadata: MetadataMap::new(),
2841                    }),
2842                    ModelTurnEvent::Finished(ModelTurnResult {
2843                        model: None,
2844                        response_id: None,
2845                        finish_reason: FinishReason::ToolCall,
2846                        output_items: vec![Item {
2847                            id: None,
2848                            kind: ItemKind::Assistant,
2849                            parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2850                                id: ToolCallId::new("call-1"),
2851                                name: tool_name,
2852                                input: json!({ "value": "pong" }),
2853                                metadata: MetadataMap::new(),
2854                            })],
2855                            metadata: MetadataMap::new(),
2856                            usage: None,
2857                            finish_reason: None,
2858                            created_at: None,
2859                        }],
2860                        usage: None,
2861                        metadata: MetadataMap::new(),
2862                    }),
2863                ])
2864            };
2865
2866            Ok(FakeTurn { events })
2867        }
2868    }
2869
2870    #[async_trait]
2871    impl ModelSession for SlowSession {
2872        type Turn = SlowTurn;
2873
2874        async fn begin_turn(
2875            &mut self,
2876            request: TurnRequest,
2877            cancellation: Option<TurnCancellation>,
2878        ) -> Result<Self::Turn, LoopError> {
2879            let should_block = request
2880                .transcript
2881                .iter()
2882                .rev()
2883                .find(|item| item.kind == ItemKind::User)
2884                .is_some_and(|item| {
2885                    item.parts.iter().any(|part| match part {
2886                        Part::Text(text) => text.text == "do the long task",
2887                        _ => false,
2888                    })
2889                });
2890
2891            if should_block && let Some(cancellation) = cancellation {
2892                cancellation.cancelled().await;
2893                return Err(LoopError::Cancelled);
2894            }
2895
2896            Ok(SlowTurn { emitted: false })
2897        }
2898    }
2899
2900    #[async_trait]
2901    impl ModelSession for RecordingSession {
2902        type Turn = RecordingTurn;
2903
2904        async fn begin_turn(
2905            &mut self,
2906            request: TurnRequest,
2907            _cancellation: Option<TurnCancellation>,
2908        ) -> Result<Self::Turn, LoopError> {
2909            let descriptions = request
2910                .available_tools
2911                .iter()
2912                .map(|tool| tool.description.clone())
2913                .collect::<Vec<_>>();
2914            self.seen_descriptions.lock().unwrap().push(descriptions);
2915            self.seen_caches.lock().unwrap().push(request.cache.clone());
2916
2917            Ok(RecordingTurn { emitted: false })
2918        }
2919    }
2920
2921    #[async_trait]
2922    impl ModelSession for MultiToolSession {
2923        type Turn = MultiToolTurn;
2924
2925        async fn begin_turn(
2926            &mut self,
2927            request: TurnRequest,
2928            _cancellation: Option<TurnCancellation>,
2929        ) -> Result<Self::Turn, LoopError> {
2930            let has_tool_result = request.transcript.iter().any(|item| {
2931                item.kind == ItemKind::Tool
2932                    && item
2933                        .parts
2934                        .iter()
2935                        .any(|part| matches!(part, Part::ToolResult(_)))
2936            });
2937
2938            let events = if has_tool_result {
2939                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2940                    model: None,
2941                    response_id: None,
2942                    finish_reason: FinishReason::Completed,
2943                    output_items: vec![Item {
2944                        id: None,
2945                        kind: ItemKind::Assistant,
2946                        parts: vec![Part::Text(TextPart {
2947                            text: "mixed tools finished".into(),
2948                            metadata: MetadataMap::new(),
2949                        })],
2950                        metadata: MetadataMap::new(),
2951                        usage: None,
2952                        finish_reason: None,
2953                        created_at: None,
2954                    }],
2955                    usage: None,
2956                    metadata: MetadataMap::new(),
2957                })])
2958            } else {
2959                let foreground = agentkit_core::ToolCallPart {
2960                    id: ToolCallId::new("call-foreground"),
2961                    name: "foreground-wait".into(),
2962                    input: json!({}),
2963                    metadata: MetadataMap::new(),
2964                };
2965                let background = agentkit_core::ToolCallPart {
2966                    id: ToolCallId::new("call-background"),
2967                    name: "background-wait".into(),
2968                    input: json!({}),
2969                    metadata: MetadataMap::new(),
2970                };
2971                VecDeque::from([
2972                    ModelTurnEvent::ToolCall(foreground.clone()),
2973                    ModelTurnEvent::ToolCall(background.clone()),
2974                    ModelTurnEvent::Finished(ModelTurnResult {
2975                        model: None,
2976                        response_id: None,
2977                        finish_reason: FinishReason::ToolCall,
2978                        output_items: vec![Item {
2979                            id: None,
2980                            kind: ItemKind::Assistant,
2981                            parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2982                            metadata: MetadataMap::new(),
2983                            usage: None,
2984                            finish_reason: None,
2985                            created_at: None,
2986                        }],
2987                        usage: None,
2988                        metadata: MetadataMap::new(),
2989                    }),
2990                ])
2991            };
2992
2993            Ok(MultiToolTurn { events })
2994        }
2995    }
2996
2997    #[async_trait]
2998    impl ModelSession for DualApprovalSession {
2999        type Turn = DualApprovalTurn;
3000
3001        async fn begin_turn(
3002            &mut self,
3003            request: TurnRequest,
3004            _cancellation: Option<TurnCancellation>,
3005        ) -> Result<Self::Turn, LoopError> {
3006            let tool_results = request
3007                .transcript
3008                .iter()
3009                .flat_map(|item| item.parts.iter())
3010                .filter(|part| matches!(part, Part::ToolResult(_)))
3011                .count();
3012
3013            let events = if tool_results >= 2 {
3014                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
3015                    model: None,
3016                    response_id: None,
3017                    finish_reason: FinishReason::Completed,
3018                    output_items: vec![Item {
3019                        id: None,
3020                        kind: ItemKind::Assistant,
3021                        parts: vec![Part::Text(TextPart {
3022                            text: "both approvals finished".into(),
3023                            metadata: MetadataMap::new(),
3024                        })],
3025                        metadata: MetadataMap::new(),
3026                        usage: None,
3027                        finish_reason: None,
3028                        created_at: None,
3029                    }],
3030                    usage: None,
3031                    metadata: MetadataMap::new(),
3032                })])
3033            } else {
3034                let first = agentkit_core::ToolCallPart {
3035                    id: ToolCallId::new("call-1"),
3036                    name: "echo".into(),
3037                    input: json!({ "value": "first" }),
3038                    metadata: MetadataMap::new(),
3039                };
3040                let second = agentkit_core::ToolCallPart {
3041                    id: ToolCallId::new("call-2"),
3042                    name: "echo".into(),
3043                    input: json!({ "value": "second" }),
3044                    metadata: MetadataMap::new(),
3045                };
3046                VecDeque::from([
3047                    ModelTurnEvent::ToolCall(first.clone()),
3048                    ModelTurnEvent::ToolCall(second.clone()),
3049                    ModelTurnEvent::Finished(ModelTurnResult {
3050                        model: None,
3051                        response_id: None,
3052                        finish_reason: FinishReason::ToolCall,
3053                        output_items: vec![Item {
3054                            id: None,
3055                            kind: ItemKind::Assistant,
3056                            parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
3057                            metadata: MetadataMap::new(),
3058                            usage: None,
3059                            finish_reason: None,
3060                            created_at: None,
3061                        }],
3062                        usage: None,
3063                        metadata: MetadataMap::new(),
3064                    }),
3065                ])
3066            };
3067
3068            Ok(DualApprovalTurn { events })
3069        }
3070    }
3071
3072    #[async_trait]
3073    impl ModelTurn for FakeTurn {
3074        async fn next_event(
3075            &mut self,
3076            _cancellation: Option<TurnCancellation>,
3077        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3078            Ok(self.events.pop_front())
3079        }
3080    }
3081
3082    #[async_trait]
3083    impl ModelTurn for SlowTurn {
3084        async fn next_event(
3085            &mut self,
3086            cancellation: Option<TurnCancellation>,
3087        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3088            if let Some(cancellation) = cancellation
3089                && cancellation.is_cancelled()
3090            {
3091                return Err(LoopError::Cancelled);
3092            }
3093
3094            if self.emitted {
3095                Ok(None)
3096            } else {
3097                self.emitted = true;
3098                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3099                    model: None,
3100                    response_id: None,
3101                    finish_reason: FinishReason::Completed,
3102                    output_items: vec![Item {
3103                        id: None,
3104                        kind: ItemKind::Assistant,
3105                        parts: vec![Part::Text(TextPart {
3106                            text: "done".into(),
3107                            metadata: MetadataMap::new(),
3108                        })],
3109                        metadata: MetadataMap::new(),
3110                        usage: None,
3111                        finish_reason: None,
3112                        created_at: None,
3113                    }],
3114                    usage: None,
3115                    metadata: MetadataMap::new(),
3116                })))
3117            }
3118        }
3119    }
3120
3121    #[async_trait]
3122    impl ModelTurn for RecordingTurn {
3123        async fn next_event(
3124            &mut self,
3125            _cancellation: Option<TurnCancellation>,
3126        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3127            if self.emitted {
3128                Ok(None)
3129            } else {
3130                self.emitted = true;
3131                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
3132                    model: None,
3133                    response_id: None,
3134                    finish_reason: FinishReason::Completed,
3135                    output_items: vec![Item {
3136                        id: None,
3137                        kind: ItemKind::Assistant,
3138                        parts: vec![Part::Text(TextPart {
3139                            text: "done".into(),
3140                            metadata: MetadataMap::new(),
3141                        })],
3142                        metadata: MetadataMap::new(),
3143                        usage: None,
3144                        finish_reason: None,
3145                        created_at: None,
3146                    }],
3147                    usage: None,
3148                    metadata: MetadataMap::new(),
3149                })))
3150            }
3151        }
3152    }
3153
3154    #[async_trait]
3155    impl ModelTurn for MultiToolTurn {
3156        async fn next_event(
3157            &mut self,
3158            _cancellation: Option<TurnCancellation>,
3159        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3160            Ok(self.events.pop_front())
3161        }
3162    }
3163
3164    #[async_trait]
3165    impl ModelTurn for DualApprovalTurn {
3166        async fn next_event(
3167            &mut self,
3168            _cancellation: Option<TurnCancellation>,
3169        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3170            Ok(self.events.pop_front())
3171        }
3172    }
3173
3174    #[derive(Clone)]
3175    struct EchoTool {
3176        spec: ToolSpec,
3177    }
3178
3179    impl Default for EchoTool {
3180        fn default() -> Self {
3181            Self {
3182                spec: ToolSpec {
3183                    name: ToolName::new("echo"),
3184                    description: "Echo back a value".into(),
3185                    input_schema: json!({
3186                        "type": "object",
3187                        "properties": {
3188                            "value": { "type": "string" }
3189                        },
3190                        "required": ["value"],
3191                        "additionalProperties": false
3192                    }),
3193                    output_schema: None,
3194                    annotations: ToolAnnotations::default(),
3195                    metadata: MetadataMap::new(),
3196                },
3197            }
3198        }
3199    }
3200
3201    #[derive(Clone)]
3202    struct DynamicSpecTool {
3203        spec: ToolSpec,
3204        version: StdArc<AtomicUsize>,
3205    }
3206
3207    impl DynamicSpecTool {
3208        fn new(version: StdArc<AtomicUsize>) -> Self {
3209            Self {
3210                spec: ToolSpec {
3211                    name: ToolName::new("dynamic"),
3212                    description: "dynamic version 0".into(),
3213                    input_schema: json!({
3214                        "type": "object",
3215                        "properties": {},
3216                        "additionalProperties": false
3217                    }),
3218                    output_schema: None,
3219                    annotations: ToolAnnotations::default(),
3220                    metadata: MetadataMap::new(),
3221                },
3222                version,
3223            }
3224        }
3225    }
3226
3227    #[async_trait]
3228    impl Tool for EchoTool {
3229        fn spec(&self) -> &ToolSpec {
3230            &self.spec
3231        }
3232
3233        fn proposed_requests(
3234            &self,
3235            request: &agentkit_tools_core::ToolRequest,
3236        ) -> Result<
3237            Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
3238            agentkit_tools_core::ToolError,
3239        > {
3240            Ok(vec![Box::new(FileSystemPermissionRequest::Read {
3241                path: "/tmp/echo".into(),
3242                metadata: request.metadata.clone(),
3243            })])
3244        }
3245
3246        async fn invoke(
3247            &self,
3248            request: agentkit_tools_core::ToolRequest,
3249            _ctx: &mut ToolContext<'_>,
3250        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3251            let value = request
3252                .input
3253                .get("value")
3254                .and_then(Value::as_str)
3255                .ok_or_else(|| {
3256                    agentkit_tools_core::ToolError::InvalidInput("missing value".into())
3257                })?;
3258
3259            Ok(ToolResult {
3260                result: ToolResultPart {
3261                    call_id: request.call_id,
3262                    output: ToolOutput::Text(value.into()),
3263                    is_error: false,
3264                    metadata: MetadataMap::new(),
3265                },
3266                duration: None,
3267                metadata: MetadataMap::new(),
3268            })
3269        }
3270    }
3271
3272    #[async_trait]
3273    impl Tool for DynamicSpecTool {
3274        fn spec(&self) -> &ToolSpec {
3275            &self.spec
3276        }
3277
3278        fn current_spec(&self) -> Option<ToolSpec> {
3279            let mut spec = self.spec.clone();
3280            spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
3281            Some(spec)
3282        }
3283
3284        async fn invoke(
3285            &self,
3286            request: agentkit_tools_core::ToolRequest,
3287            _ctx: &mut ToolContext<'_>,
3288        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3289            Ok(ToolResult {
3290                result: ToolResultPart {
3291                    call_id: request.call_id,
3292                    output: ToolOutput::Text("ok".into()),
3293                    is_error: false,
3294                    metadata: MetadataMap::new(),
3295                },
3296                duration: None,
3297                metadata: MetadataMap::new(),
3298            })
3299        }
3300    }
3301
3302    struct DenyFsReads;
3303
3304    impl PermissionChecker for DenyFsReads {
3305        fn evaluate(
3306            &self,
3307            request: &dyn agentkit_tools_core::PermissionRequest,
3308        ) -> PermissionDecision {
3309            if request.kind() == "filesystem.read" {
3310                return PermissionDecision::Deny(PermissionDenial {
3311                    code: PermissionCode::PathNotAllowed,
3312                    message: "reads denied in test".into(),
3313                    metadata: MetadataMap::new(),
3314                });
3315            }
3316
3317            PermissionDecision::Allow
3318        }
3319    }
3320
3321    struct ApproveFsReads;
3322
3323    impl PermissionChecker for ApproveFsReads {
3324        fn evaluate(
3325            &self,
3326            request: &dyn agentkit_tools_core::PermissionRequest,
3327        ) -> PermissionDecision {
3328            if request.kind() == "filesystem.read" {
3329                return PermissionDecision::RequireApproval(ApprovalRequest {
3330                    task_id: None,
3331                    call_id: None,
3332                    id: "approval:fs-read".into(),
3333                    request_kind: request.kind().into(),
3334                    reason: agentkit_tools_core::ApprovalReason::SensitivePath,
3335                    summary: request.summary(),
3336                    metadata: request.metadata().clone(),
3337                });
3338            }
3339
3340            PermissionDecision::Allow
3341        }
3342    }
3343
3344    struct KeepRecentMutator {
3345        keep: usize,
3346    }
3347
3348    #[async_trait]
3349    impl LoopMutator for KeepRecentMutator {
3350        async fn mutate(
3351            &self,
3352            cursor: &mut TranscriptCursor<'_>,
3353            ctx: LoopCtx<'_>,
3354        ) -> Result<(), LoopError> {
3355            if cursor.len() < 2 {
3356                return Ok(());
3357            }
3358            let drop = cursor.len().saturating_sub(self.keep);
3359            ctx.emitter.emit(AgentEvent::MutationStarted {
3360                session_id: ctx.session_id.clone(),
3361                turn_id: ctx.turn_id.cloned(),
3362                mutator: "keep-recent".into(),
3363                point: ctx.point,
3364            });
3365            cursor.drain(..drop);
3366            ctx.emitter.emit(AgentEvent::MutationFinished {
3367                session_id: ctx.session_id.clone(),
3368                turn_id: ctx.turn_id.cloned(),
3369                mutator: "keep-recent".into(),
3370                dirty: true,
3371                metadata: MetadataMap::new(),
3372            });
3373            Ok(())
3374        }
3375    }
3376
3377    struct RecordingObserver {
3378        events: StdArc<StdMutex<Vec<AgentEvent>>>,
3379    }
3380
3381    impl LoopObserver for RecordingObserver {
3382        fn handle_event(&self, event: AgentEvent) {
3383            self.events.lock().unwrap().push(event);
3384        }
3385    }
3386
3387    struct CatalogExecutor {
3388        version: AtomicUsize,
3389        events: StdMutex<Vec<ToolCatalogEvent>>,
3390    }
3391
3392    impl CatalogExecutor {
3393        fn new() -> Self {
3394            Self {
3395                version: AtomicUsize::new(0),
3396                events: StdMutex::new(Vec::new()),
3397            }
3398        }
3399
3400        fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3401            self.version.store(version, Ordering::SeqCst);
3402            self.events.lock().unwrap().push(event);
3403        }
3404    }
3405
3406    #[async_trait]
3407    impl ToolExecutor for CatalogExecutor {
3408        fn specs(&self) -> Vec<ToolSpec> {
3409            vec![ToolSpec {
3410                name: ToolName::new("dynamic"),
3411                description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3412                input_schema: json!({
3413                    "type": "object",
3414                    "properties": {},
3415                    "additionalProperties": false
3416                }),
3417                output_schema: None,
3418                annotations: ToolAnnotations::default(),
3419                metadata: MetadataMap::new(),
3420            }]
3421        }
3422
3423        fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3424            std::mem::take(&mut *self.events.lock().unwrap())
3425        }
3426
3427        async fn execute(
3428            &self,
3429            request: ToolRequest,
3430            _ctx: &mut ToolContext<'_>,
3431        ) -> ToolExecutionOutcome {
3432            ToolExecutionOutcome::Completed(ToolResult {
3433                result: ToolResultPart {
3434                    call_id: request.call_id,
3435                    output: ToolOutput::Text("dynamic-ok".into()),
3436                    is_error: false,
3437                    metadata: MetadataMap::new(),
3438                },
3439                duration: None,
3440                metadata: MetadataMap::new(),
3441            })
3442        }
3443    }
3444
3445    #[derive(Clone)]
3446    struct BlockingTool {
3447        spec: ToolSpec,
3448        entered: StdArc<AtomicBool>,
3449        release: StdArc<Notify>,
3450        output: &'static str,
3451    }
3452
3453    impl BlockingTool {
3454        fn new(
3455            name: &str,
3456            entered: StdArc<AtomicBool>,
3457            release: StdArc<Notify>,
3458            output: &'static str,
3459        ) -> Self {
3460            Self {
3461                spec: ToolSpec {
3462                    name: ToolName::new(name),
3463                    description: format!("blocking tool {name}"),
3464                    input_schema: json!({
3465                        "type": "object",
3466                        "properties": {},
3467                        "additionalProperties": false
3468                    }),
3469                    output_schema: None,
3470                    annotations: ToolAnnotations::default(),
3471                    metadata: MetadataMap::new(),
3472                },
3473                entered,
3474                release,
3475                output,
3476            }
3477        }
3478    }
3479
3480    #[async_trait]
3481    impl Tool for BlockingTool {
3482        fn spec(&self) -> &ToolSpec {
3483            &self.spec
3484        }
3485
3486        async fn invoke(
3487            &self,
3488            request: agentkit_tools_core::ToolRequest,
3489            _ctx: &mut ToolContext<'_>,
3490        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3491            self.entered.store(true, Ordering::SeqCst);
3492            self.release.notified().await;
3493            Ok(ToolResult {
3494                result: ToolResultPart {
3495                    call_id: request.call_id,
3496                    output: ToolOutput::Text(self.output.into()),
3497                    is_error: false,
3498                    metadata: MetadataMap::new(),
3499                },
3500                duration: None,
3501                metadata: MetadataMap::new(),
3502            })
3503        }
3504    }
3505
3506    struct NameRoutingPolicy {
3507        routes: Vec<(String, RoutingDecision)>,
3508    }
3509
3510    impl NameRoutingPolicy {
3511        fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3512            Self {
3513                routes: routes
3514                    .into_iter()
3515                    .map(|(name, decision)| (name.into(), decision))
3516                    .collect(),
3517            }
3518        }
3519    }
3520
3521    impl TaskRoutingPolicy for NameRoutingPolicy {
3522        fn route(&self, request: &ToolRequest) -> RoutingDecision {
3523            self.routes
3524                .iter()
3525                .find(|(name, _)| name == &request.tool_name.0)
3526                .map(|(_, decision)| *decision)
3527                .unwrap_or(RoutingDecision::Foreground)
3528        }
3529    }
3530
3531    async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3532        timeout(Duration::from_secs(1), handle.next_event())
3533            .await
3534            .expect("timed out waiting for task event")
3535            .expect("task event stream ended unexpectedly")
3536    }
3537
3538    async fn wait_until_entered(flag: &AtomicBool) {
3539        timeout(Duration::from_secs(1), async {
3540            while !flag.load(Ordering::SeqCst) {
3541                tokio::task::yield_now().await;
3542            }
3543        })
3544        .await
3545        .expect("task never entered execution");
3546    }
3547
3548    #[tokio::test]
3549    async fn loop_continues_after_completed_tool_call() {
3550        let tools = ToolRegistry::new().with(EchoTool::default());
3551        let agent = Agent::builder()
3552            .model(FakeAdapter)
3553            .add_tool_source(tools)
3554            .permissions(AllowAllPermissions)
3555            .build()
3556            .unwrap();
3557
3558        let mut driver = agent
3559            .start(SessionConfig {
3560                session_id: SessionId::new("session-1"),
3561                metadata: MetadataMap::new(),
3562                cache: None,
3563            })
3564            .await
3565            .unwrap();
3566
3567        driver
3568            .submit_input(vec![Item {
3569                id: None,
3570                kind: ItemKind::User,
3571                parts: vec![Part::Text(TextPart {
3572                    text: "ping".into(),
3573                    metadata: MetadataMap::new(),
3574                })],
3575                metadata: MetadataMap::new(),
3576                usage: None,
3577                finish_reason: None,
3578                created_at: None,
3579            }])
3580            .unwrap();
3581
3582        let result = run_until_finished(&mut driver).await;
3583
3584        match result {
3585            LoopStep::Finished(turn) => {
3586                assert_eq!(turn.finish_reason, FinishReason::Completed);
3587                assert_eq!(turn.items.len(), 1);
3588                match &turn.items[0].parts[0] {
3589                    Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3590                    other => panic!("unexpected part: {other:?}"),
3591                }
3592            }
3593            other => panic!("unexpected loop step: {other:?}"),
3594        }
3595    }
3596
3597    /// Test helper: drives the loop, transparently resuming non-blocking
3598    /// cooperative interrupts (AfterToolResult), until a terminal step or a
3599    /// blocking interrupt is reached.
3600    async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3601        loop {
3602            match driver.next().await.unwrap() {
3603                LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3604                step => return step,
3605            }
3606        }
3607    }
3608
3609    #[tokio::test]
3610    async fn loop_uses_injected_permission_checker() {
3611        let tools = ToolRegistry::new().with(EchoTool::default());
3612        let agent = Agent::builder()
3613            .model(FakeAdapter)
3614            .add_tool_source(tools)
3615            .permissions(DenyFsReads)
3616            .build()
3617            .unwrap();
3618
3619        let mut driver = agent
3620            .start(SessionConfig {
3621                session_id: SessionId::new("session-2"),
3622                metadata: MetadataMap::new(),
3623                cache: None,
3624            })
3625            .await
3626            .unwrap();
3627
3628        driver
3629            .submit_input(vec![Item {
3630                id: None,
3631                kind: ItemKind::User,
3632                parts: vec![Part::Text(TextPart {
3633                    text: "ping".into(),
3634                    metadata: MetadataMap::new(),
3635                })],
3636                metadata: MetadataMap::new(),
3637                usage: None,
3638                finish_reason: None,
3639                created_at: None,
3640            }])
3641            .unwrap();
3642
3643        let result = run_until_finished(&mut driver).await;
3644
3645        match result {
3646            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3647                Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3648                other => panic!("unexpected part: {other:?}"),
3649            },
3650            other => panic!("unexpected loop step: {other:?}"),
3651        }
3652    }
3653
3654    #[tokio::test]
3655    async fn async_task_manager_background_round_requires_explicit_continue() {
3656        let entered = StdArc::new(AtomicBool::new(false));
3657        let release = StdArc::new(Notify::new());
3658        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3659            "background-wait",
3660            RoutingDecision::Background,
3661        )]));
3662        let handle = task_manager.handle();
3663        let tools = ToolRegistry::new().with(BlockingTool::new(
3664            "background-wait",
3665            entered.clone(),
3666            release.clone(),
3667            "background-done",
3668        ));
3669        let agent = Agent::builder()
3670            .model(FakeAdapter)
3671            .add_tool_source(tools)
3672            .permissions(AllowAllPermissions)
3673            .task_manager(task_manager)
3674            .build()
3675            .unwrap();
3676
3677        let mut driver = agent
3678            .start(SessionConfig {
3679                session_id: SessionId::new("session-background"),
3680                metadata: MetadataMap::new(),
3681                cache: None,
3682            })
3683            .await
3684            .unwrap();
3685
3686        driver
3687            .submit_input(vec![Item {
3688                id: None,
3689                kind: ItemKind::User,
3690                parts: vec![Part::Text(TextPart {
3691                    text: "ping".into(),
3692                    metadata: MetadataMap::new(),
3693                })],
3694                metadata: MetadataMap::new(),
3695                usage: None,
3696                finish_reason: None,
3697                created_at: None,
3698            }])
3699            .unwrap();
3700
3701        let first = driver.next().await.unwrap();
3702        match first {
3703            LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3704            other => panic!("unexpected first loop step: {other:?}"),
3705        }
3706
3707        match wait_for_task_event(&handle).await {
3708            TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3709            other => panic!("unexpected task event: {other:?}"),
3710        }
3711        wait_until_entered(entered.as_ref()).await;
3712        release.notify_waiters();
3713
3714        match wait_for_task_event(&handle).await {
3715            TaskEvent::Completed(_, result) => {
3716                assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3717            }
3718            other => panic!("unexpected completion event: {other:?}"),
3719        }
3720
3721        let resumed = driver.next().await.unwrap();
3722        match resumed {
3723            LoopStep::Finished(turn) => {
3724                assert_eq!(turn.finish_reason, FinishReason::Completed);
3725                match &turn.items[0].parts[0] {
3726                    Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3727                    other => panic!("unexpected part after resume: {other:?}"),
3728                }
3729            }
3730            other => panic!("unexpected resumed step: {other:?}"),
3731        }
3732    }
3733
3734    #[tokio::test]
3735    async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3736        let controller = CancellationController::new();
3737        let agent = Agent::builder()
3738            .model(SlowAdapter)
3739            .cancellation(controller.handle())
3740            .build()
3741            .unwrap();
3742
3743        let mut driver = agent
3744            .start(SessionConfig {
3745                session_id: SessionId::new("session-cancel"),
3746                metadata: MetadataMap::new(),
3747                cache: None,
3748            })
3749            .await
3750            .unwrap();
3751
3752        driver
3753            .submit_input(vec![Item {
3754                id: None,
3755                kind: ItemKind::User,
3756                parts: vec![Part::Text(TextPart {
3757                    text: "do the long task".into(),
3758                    metadata: MetadataMap::new(),
3759                })],
3760                metadata: MetadataMap::new(),
3761                usage: None,
3762                finish_reason: None,
3763                created_at: None,
3764            }])
3765            .unwrap();
3766
3767        let cancelled = tokio::join!(async { driver.next().await }, async {
3768            tokio::task::yield_now().await;
3769            controller.interrupt();
3770        })
3771        .0
3772        .unwrap();
3773
3774        match cancelled {
3775            LoopStep::Finished(turn) => {
3776                assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3777                assert_eq!(turn.items.len(), 1);
3778                assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3779                assert_eq!(
3780                    turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3781                    Some(&Value::Bool(true))
3782                );
3783            }
3784            other => panic!("unexpected loop step: {other:?}"),
3785        }
3786
3787        driver
3788            .submit_input(vec![Item {
3789                id: None,
3790                kind: ItemKind::User,
3791                parts: vec![Part::Text(TextPart {
3792                    text: "try again".into(),
3793                    metadata: MetadataMap::new(),
3794                })],
3795                metadata: MetadataMap::new(),
3796                usage: None,
3797                finish_reason: None,
3798                created_at: None,
3799            }])
3800            .unwrap();
3801
3802        let result = driver.next().await.unwrap();
3803        match result {
3804            LoopStep::Finished(turn) => {
3805                assert_eq!(turn.finish_reason, FinishReason::Completed);
3806            }
3807            other => panic!("unexpected loop step after retry: {other:?}"),
3808        }
3809    }
3810
3811    #[tokio::test]
3812    async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3813        let controller = CancellationController::new();
3814        let fg_entered = StdArc::new(AtomicBool::new(false));
3815        let fg_release = StdArc::new(Notify::new());
3816        let bg_entered = StdArc::new(AtomicBool::new(false));
3817        let bg_release = StdArc::new(Notify::new());
3818        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3819            ("foreground-wait", RoutingDecision::Foreground),
3820            ("background-wait", RoutingDecision::Background),
3821        ]));
3822        let handle = task_manager.handle();
3823        let tools = ToolRegistry::new()
3824            .with(BlockingTool::new(
3825                "foreground-wait",
3826                fg_entered.clone(),
3827                fg_release,
3828                "foreground-done",
3829            ))
3830            .with(BlockingTool::new(
3831                "background-wait",
3832                bg_entered.clone(),
3833                bg_release.clone(),
3834                "background-done",
3835            ));
3836        let agent = Agent::builder()
3837            .model(MultiToolAdapter)
3838            .add_tool_source(tools)
3839            .permissions(AllowAllPermissions)
3840            .cancellation(controller.handle())
3841            .task_manager(task_manager)
3842            .build()
3843            .unwrap();
3844
3845        let mut driver = agent
3846            .start(SessionConfig {
3847                session_id: SessionId::new("session-mixed-cancel"),
3848                metadata: MetadataMap::new(),
3849                cache: None,
3850            })
3851            .await
3852            .unwrap();
3853
3854        driver
3855            .submit_input(vec![Item {
3856                id: None,
3857                kind: ItemKind::User,
3858                parts: vec![Part::Text(TextPart {
3859                    text: "run both".into(),
3860                    metadata: MetadataMap::new(),
3861                })],
3862                metadata: MetadataMap::new(),
3863                usage: None,
3864                finish_reason: None,
3865                created_at: None,
3866            }])
3867            .unwrap();
3868
3869        let cancelled = tokio::join!(async { driver.next().await }, async {
3870            let _ = wait_for_task_event(&handle).await;
3871            let _ = wait_for_task_event(&handle).await;
3872            wait_until_entered(fg_entered.as_ref()).await;
3873            wait_until_entered(bg_entered.as_ref()).await;
3874            controller.interrupt();
3875        })
3876        .0
3877        .unwrap();
3878
3879        match cancelled {
3880            LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3881            other => panic!("unexpected loop step after interrupt: {other:?}"),
3882        }
3883
3884        match wait_for_task_event(&handle).await {
3885            TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3886            other => panic!("unexpected post-interrupt event: {other:?}"),
3887        }
3888
3889        let running = handle.list_running().await;
3890        assert_eq!(running.len(), 1);
3891        assert_eq!(running[0].tool_name, "background-wait");
3892
3893        bg_release.notify_waiters();
3894        match wait_for_task_event(&handle).await {
3895            TaskEvent::Completed(snapshot, result) => {
3896                assert_eq!(snapshot.tool_name, "background-wait");
3897                assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3898            }
3899            other => panic!("unexpected background completion event: {other:?}"),
3900        }
3901    }
3902
3903    #[tokio::test]
3904    async fn loop_resumes_after_approved_tool_request() {
3905        let tools = ToolRegistry::new().with(EchoTool::default());
3906        let agent = Agent::builder()
3907            .model(FakeAdapter)
3908            .add_tool_source(tools)
3909            .permissions(ApproveFsReads)
3910            .build()
3911            .unwrap();
3912
3913        let mut driver = agent
3914            .start(SessionConfig {
3915                session_id: SessionId::new("session-approval"),
3916                metadata: MetadataMap::new(),
3917                cache: None,
3918            })
3919            .await
3920            .unwrap();
3921
3922        driver
3923            .submit_input(vec![Item {
3924                id: None,
3925                kind: ItemKind::User,
3926                parts: vec![Part::Text(TextPart {
3927                    text: "ping".into(),
3928                    metadata: MetadataMap::new(),
3929                })],
3930                metadata: MetadataMap::new(),
3931                usage: None,
3932                finish_reason: None,
3933                created_at: None,
3934            }])
3935            .unwrap();
3936
3937        let first = driver.next().await.unwrap();
3938        match first {
3939            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3940                assert!(pending.request.task_id.is_some());
3941                assert_eq!(pending.request.id.0, "approval:fs-read");
3942                pending.approve(&mut driver).unwrap();
3943            }
3944            other => panic!("unexpected loop step: {other:?}"),
3945        }
3946        let second = driver.next().await.unwrap();
3947        match second {
3948            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3949                Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3950                other => panic!("unexpected part: {other:?}"),
3951            },
3952            other => panic!("unexpected loop step after approval: {other:?}"),
3953        }
3954    }
3955
3956    #[tokio::test]
3957    async fn loop_resumes_with_patched_input_on_approval() {
3958        let tools = ToolRegistry::new().with(EchoTool::default());
3959        let agent = Agent::builder()
3960            .model(FakeAdapter)
3961            .add_tool_source(tools)
3962            .permissions(ApproveFsReads)
3963            .build()
3964            .unwrap();
3965
3966        let mut driver = agent
3967            .start(SessionConfig {
3968                session_id: SessionId::new("session-approval-patched"),
3969                metadata: MetadataMap::new(),
3970                cache: None,
3971            })
3972            .await
3973            .unwrap();
3974
3975        driver
3976            .submit_input(vec![Item {
3977                id: None,
3978                kind: ItemKind::User,
3979                parts: vec![Part::Text(TextPart {
3980                    text: "ping".into(),
3981                    metadata: MetadataMap::new(),
3982                })],
3983                metadata: MetadataMap::new(),
3984                usage: None,
3985                finish_reason: None,
3986                created_at: None,
3987            }])
3988            .unwrap();
3989
3990        match driver.next().await.unwrap() {
3991            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3992                pending
3993                    .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
3994                    .unwrap();
3995            }
3996            other => panic!("unexpected loop step: {other:?}"),
3997        }
3998        match driver.next().await.unwrap() {
3999            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
4000                Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
4001                other => panic!("unexpected part: {other:?}"),
4002            },
4003            other => panic!("unexpected loop step after approval: {other:?}"),
4004        }
4005    }
4006
4007    #[tokio::test]
4008    async fn loop_tracks_multiple_pending_approvals_by_call_id() {
4009        let tools = ToolRegistry::new().with(EchoTool::default());
4010        let agent = Agent::builder()
4011            .model(DualApprovalAdapter)
4012            .add_tool_source(tools)
4013            .permissions(ApproveFsReads)
4014            .build()
4015            .unwrap();
4016
4017        let mut driver = agent
4018            .start(SessionConfig {
4019                session_id: SessionId::new("session-dual-approval"),
4020                metadata: MetadataMap::new(),
4021                cache: None,
4022            })
4023            .await
4024            .unwrap();
4025
4026        driver
4027            .submit_input(vec![Item {
4028                id: None,
4029                kind: ItemKind::User,
4030                parts: vec![Part::Text(TextPart {
4031                    text: "run both approvals".into(),
4032                    metadata: MetadataMap::new(),
4033                })],
4034                metadata: MetadataMap::new(),
4035                usage: None,
4036                finish_reason: None,
4037                created_at: None,
4038            }])
4039            .unwrap();
4040
4041        let pending_first = match driver.next().await.unwrap() {
4042            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4043                assert_eq!(
4044                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4045                    Some("call-1")
4046                );
4047                pending
4048            }
4049            other => panic!("unexpected first loop step: {other:?}"),
4050        };
4051
4052        let pending_second = match driver.next().await.unwrap() {
4053            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4054                assert_eq!(
4055                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4056                    Some("call-2")
4057                );
4058                pending
4059            }
4060            other => panic!("unexpected second loop step: {other:?}"),
4061        };
4062
4063        pending_second.approve(&mut driver).unwrap();
4064        match driver.next().await.unwrap() {
4065            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
4066                assert_eq!(
4067                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
4068                    Some("call-1")
4069                );
4070            }
4071            other => panic!("unexpected step after approving second request: {other:?}"),
4072        }
4073
4074        pending_first.approve(&mut driver).unwrap();
4075        match driver.next().await.unwrap() {
4076            LoopStep::Finished(turn) => {
4077                assert_eq!(turn.finish_reason, FinishReason::Completed);
4078                match &turn.items[0].parts[0] {
4079                    Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
4080                    other => panic!("unexpected final part: {other:?}"),
4081                }
4082            }
4083            other => panic!("unexpected final loop step: {other:?}"),
4084        }
4085    }
4086
4087    #[tokio::test]
4088    async fn loop_compacts_transcript_before_new_turns() {
4089        let events = StdArc::new(StdMutex::new(Vec::new()));
4090        let agent = Agent::builder()
4091            .model(FakeAdapter)
4092            .mutator(KeepRecentMutator { keep: 1 })
4093            .observer(RecordingObserver {
4094                events: events.clone(),
4095            })
4096            .build()
4097            .unwrap();
4098
4099        let mut driver = agent
4100            .start(SessionConfig {
4101                session_id: SessionId::new("session-4"),
4102                metadata: MetadataMap::new(),
4103                cache: None,
4104            })
4105            .await
4106            .unwrap();
4107
4108        for text in ["first", "second"] {
4109            driver
4110                .submit_input(vec![Item {
4111                    id: None,
4112                    kind: ItemKind::User,
4113                    parts: vec![Part::Text(TextPart {
4114                        text: text.into(),
4115                        metadata: MetadataMap::new(),
4116                    })],
4117                    metadata: MetadataMap::new(),
4118                    usage: None,
4119                    finish_reason: None,
4120                    created_at: None,
4121                }])
4122                .unwrap();
4123            let _ = driver.next().await.unwrap();
4124        }
4125
4126        let events = events.lock().unwrap();
4127        assert!(
4128            events
4129                .iter()
4130                .any(|event| matches!(event, AgentEvent::MutationFinished { dirty: true, .. }))
4131        );
4132    }
4133
4134    #[test]
4135    fn transcript_validation_rejects_orphaned_tool_result() {
4136        let transcript = vec![Item {
4137            id: None,
4138            kind: ItemKind::Tool,
4139            parts: vec![Part::ToolResult(ToolResultPart {
4140                call_id: "call-1".into(),
4141                output: ToolOutput::Text("result".into()),
4142                is_error: false,
4143                metadata: MetadataMap::new(),
4144            })],
4145            metadata: MetadataMap::new(),
4146            usage: None,
4147            finish_reason: None,
4148            created_at: None,
4149        }];
4150
4151        let error = validate_transcript_invariants(&transcript).unwrap_err();
4152        assert!(error.to_string().contains("orphaned tool_result"));
4153    }
4154
4155    #[test]
4156    fn transcript_validation_rejects_duplicate_tool_result() {
4157        let transcript = vec![
4158            Item {
4159                id: None,
4160                kind: ItemKind::Assistant,
4161                parts: vec![Part::ToolCall(ToolCallPart {
4162                    id: "call-1".into(),
4163                    name: "lookup".into(),
4164                    input: serde_json::json!({}),
4165                    metadata: MetadataMap::new(),
4166                })],
4167                metadata: MetadataMap::new(),
4168                usage: None,
4169                finish_reason: None,
4170                created_at: None,
4171            },
4172            Item {
4173                id: None,
4174                kind: ItemKind::Tool,
4175                parts: vec![Part::ToolResult(ToolResultPart {
4176                    call_id: "call-1".into(),
4177                    output: ToolOutput::Text("result".into()),
4178                    is_error: false,
4179                    metadata: MetadataMap::new(),
4180                })],
4181                metadata: MetadataMap::new(),
4182                usage: None,
4183                finish_reason: None,
4184                created_at: None,
4185            },
4186            Item {
4187                id: None,
4188                kind: ItemKind::Tool,
4189                parts: vec![Part::ToolResult(ToolResultPart {
4190                    call_id: "call-1".into(),
4191                    output: ToolOutput::Text("again".into()),
4192                    is_error: false,
4193                    metadata: MetadataMap::new(),
4194                })],
4195                metadata: MetadataMap::new(),
4196                usage: None,
4197                finish_reason: None,
4198                created_at: None,
4199            },
4200        ];
4201
4202        let error = validate_transcript_invariants(&transcript).unwrap_err();
4203        assert!(error.to_string().contains("duplicate tool_result"));
4204    }
4205
4206    #[tokio::test]
4207    async fn loop_refreshes_tool_specs_each_turn() {
4208        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4209        let version = StdArc::new(AtomicUsize::new(1));
4210        let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
4211        let agent = Agent::builder()
4212            .model(RecordingAdapter {
4213                seen_descriptions: seen_descriptions.clone(),
4214                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4215            })
4216            .add_tool_source(tools)
4217            .permissions(AllowAllPermissions)
4218            .build()
4219            .unwrap();
4220
4221        let mut driver = agent
4222            .start(SessionConfig {
4223                session_id: SessionId::new("session-dynamic-tools"),
4224                metadata: MetadataMap::new(),
4225                cache: None,
4226            })
4227            .await
4228            .unwrap();
4229
4230        for text in ["first", "second"] {
4231            driver
4232                .submit_input(vec![Item {
4233                    id: None,
4234                    kind: ItemKind::User,
4235                    parts: vec![Part::Text(TextPart {
4236                        text: text.into(),
4237                        metadata: MetadataMap::new(),
4238                    })],
4239                    metadata: MetadataMap::new(),
4240                    usage: None,
4241                    finish_reason: None,
4242                    created_at: None,
4243                }])
4244                .unwrap();
4245
4246            let _ = driver.next().await.unwrap();
4247            if text == "first" {
4248                version.store(2, Ordering::SeqCst);
4249            }
4250        }
4251
4252        let seen_descriptions = seen_descriptions.lock().unwrap();
4253        assert_eq!(seen_descriptions.len(), 2);
4254        assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
4255        assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
4256    }
4257
4258    #[tokio::test]
4259    async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
4260        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4261        let events = StdArc::new(StdMutex::new(Vec::new()));
4262        let executor = StdArc::new(CatalogExecutor::new());
4263        let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
4264        let agent = Agent::builder()
4265            .model(RecordingAdapter {
4266                seen_descriptions: seen_descriptions.clone(),
4267                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4268            })
4269            .tool_executor(executor_for_agent)
4270            .permissions(AllowAllPermissions)
4271            .observer(RecordingObserver {
4272                events: events.clone(),
4273            })
4274            .build()
4275            .unwrap();
4276
4277        let mut driver = agent
4278            .start(SessionConfig {
4279                session_id: SessionId::new("session-catalog-events"),
4280                metadata: MetadataMap::new(),
4281                cache: None,
4282            })
4283            .await
4284            .unwrap();
4285
4286        driver
4287            .submit_input(vec![Item::text(ItemKind::User, "first")])
4288            .unwrap();
4289        let _ = driver.next().await.unwrap();
4290
4291        executor.publish_change(
4292            1,
4293            ToolCatalogEvent {
4294                source: "mcp:mock".into(),
4295                added: vec!["dynamic".into()],
4296                removed: Vec::new(),
4297                changed: Vec::new(),
4298            },
4299        );
4300
4301        driver
4302            .submit_input(vec![Item::text(ItemKind::User, "second")])
4303            .unwrap();
4304        let _ = driver.next().await.unwrap();
4305
4306        let seen_descriptions = seen_descriptions.lock().unwrap();
4307        assert_eq!(seen_descriptions.len(), 2);
4308        assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
4309        assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
4310
4311        let events = events.lock().unwrap();
4312        assert!(events.iter().any(|event| matches!(
4313            event,
4314            AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
4315                source,
4316                added,
4317                removed,
4318                changed,
4319            }) if source == "mcp:mock"
4320                && added == &vec!["dynamic".to_string()]
4321                && removed.is_empty()
4322                && changed.is_empty()
4323        )));
4324    }
4325
4326    #[tokio::test]
4327    async fn loop_passes_session_default_and_next_turn_cache_requests() {
4328        let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
4329        let agent = Agent::builder()
4330            .model(RecordingAdapter {
4331                seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
4332                seen_caches: seen_caches.clone(),
4333            })
4334            .permissions(AllowAllPermissions)
4335            .build()
4336            .unwrap();
4337
4338        let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
4339            .with_retention(PromptCacheRetention::Short);
4340        let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
4341            breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
4342        });
4343
4344        let mut driver = agent
4345            .start(SessionConfig {
4346                session_id: SessionId::new("session-cache"),
4347                metadata: MetadataMap::new(),
4348                cache: Some(default_cache.clone()),
4349            })
4350            .await
4351            .unwrap();
4352
4353        driver
4354            .submit_input(vec![Item {
4355                id: None,
4356                kind: ItemKind::User,
4357                parts: vec![Part::Text(TextPart {
4358                    text: "first".into(),
4359                    metadata: MetadataMap::new(),
4360                })],
4361                metadata: MetadataMap::new(),
4362                usage: None,
4363                finish_reason: None,
4364                created_at: None,
4365            }])
4366            .unwrap();
4367        let _ = driver.next().await.unwrap();
4368
4369        driver
4370            .submit_input_with_cache(
4371                vec![Item {
4372                    id: None,
4373                    kind: ItemKind::User,
4374                    parts: vec![Part::Text(TextPart {
4375                        text: "second".into(),
4376                        metadata: MetadataMap::new(),
4377                    })],
4378                    metadata: MetadataMap::new(),
4379                    usage: None,
4380                    finish_reason: None,
4381                    created_at: None,
4382                }],
4383                override_cache.clone(),
4384            )
4385            .unwrap();
4386        let _ = driver.next().await.unwrap();
4387
4388        let seen = seen_caches.lock().unwrap();
4389        assert_eq!(seen.len(), 2);
4390        assert_eq!(seen[0], Some(default_cache));
4391        assert_eq!(seen[1], Some(override_cache));
4392    }
4393
4394    #[tokio::test]
4395    async fn loop_yields_after_tool_result_between_rounds() {
4396        let tools = ToolRegistry::new().with(EchoTool::default());
4397        let agent = Agent::builder()
4398            .model(FakeAdapter)
4399            .add_tool_source(tools)
4400            .permissions(AllowAllPermissions)
4401            .build()
4402            .unwrap();
4403
4404        let mut driver = agent
4405            .start(SessionConfig {
4406                session_id: SessionId::new("yield-session"),
4407                metadata: MetadataMap::new(),
4408                cache: None,
4409            })
4410            .await
4411            .unwrap();
4412
4413        driver
4414            .submit_input(vec![Item::text(ItemKind::User, "ping")])
4415            .unwrap();
4416
4417        // First next() runs the model turn, resolves the tool call, and
4418        // yields AfterToolResult before calling the model again.
4419        let step = driver.next().await.unwrap();
4420        let info = match step {
4421            LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
4422            other => panic!("expected AfterToolResult, got {other:?}"),
4423        };
4424        assert_eq!(info.session_id, SessionId::new("yield-session"));
4425        // Transcript at yield: [User, Assistant(tool_call), Tool(result)]
4426        assert_eq!(info.transcript_len, 3);
4427
4428        // The yield is cooperative, not blocking.
4429        let interrupt = LoopInterrupt::AfterToolResult(info.clone());
4430        assert!(!interrupt.is_blocking());
4431
4432        // Host interjects a message mid-turn.
4433        driver
4434            .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
4435            .unwrap();
4436
4437        // Second next() resumes the turn into the next model call, which
4438        // sees the tool result (and the injected user message) and finishes.
4439        let step = driver.next().await.unwrap();
4440        match step {
4441            LoopStep::Finished(turn) => {
4442                assert_eq!(turn.finish_reason, FinishReason::Completed);
4443            }
4444            other => panic!("expected Finished, got {other:?}"),
4445        }
4446
4447        // Transcript must now include the injected user message.
4448        let snapshot = driver.snapshot();
4449        let has_injected_message = snapshot.transcript.iter().any(|item| {
4450            item.kind == ItemKind::User
4451                && item.parts.iter().any(|part| match part {
4452                    Part::Text(text) => text.text == "also: report back",
4453                    _ => false,
4454                })
4455        });
4456        assert!(
4457            has_injected_message,
4458            "injected user message should be in transcript, got: {:?}",
4459            snapshot.transcript
4460        );
4461    }
4462
4463    struct RecordingTranscriptObserver {
4464        items: StdArc<StdMutex<Vec<Item>>>,
4465    }
4466
4467    impl TranscriptObserver for RecordingTranscriptObserver {
4468        fn on_item_appended(&self, item: &Item) {
4469            self.items.lock().unwrap().push(item.clone());
4470        }
4471    }
4472
4473    #[tokio::test]
4474    async fn observers_see_full_tool_round() {
4475        // A turn with one tool call exercises every interesting path:
4476        //   user input drained -> model output_items (assistant w/ tool call)
4477        //   -> tool result Item -> next model output_items (assistant text)
4478        // The LoopObserver should see exactly one ToolResultReceived; the
4479        // TranscriptObserver should see all four items in transcript order.
4480        let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
4481        let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
4482        let agent = Agent::builder()
4483            .model(FakeAdapter)
4484            .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
4485            .permissions(AllowAllPermissions)
4486            .observer(RecordingObserver {
4487                events: events.clone(),
4488            })
4489            .transcript_observer(RecordingTranscriptObserver {
4490                items: items.clone(),
4491            })
4492            .build()
4493            .unwrap();
4494
4495        let mut driver = agent
4496            .start(SessionConfig {
4497                session_id: SessionId::new("observer-session"),
4498                metadata: MetadataMap::new(),
4499                cache: None,
4500            })
4501            .await
4502            .unwrap();
4503
4504        driver
4505            .submit_input(vec![Item {
4506                id: None,
4507                kind: ItemKind::User,
4508                parts: vec![Part::Text(TextPart {
4509                    text: "ping".into(),
4510                    metadata: MetadataMap::new(),
4511                })],
4512                metadata: MetadataMap::new(),
4513                usage: None,
4514                finish_reason: None,
4515                created_at: None,
4516            }])
4517            .unwrap();
4518
4519        let result = run_until_finished(&mut driver).await;
4520        assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4521
4522        // LoopObserver: exactly one ToolResultReceived, with the echo
4523        // tool's output, correlating back to the model's tool call.
4524        let events = events.lock().unwrap().clone();
4525        let tool_call_id = events.iter().find_map(|e| match e {
4526            AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4527            _ => None,
4528        });
4529        let tool_results: Vec<_> = events
4530            .iter()
4531            .filter_map(|e| match e {
4532                AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4533                _ => None,
4534            })
4535            .collect();
4536        assert_eq!(tool_results.len(), 1, "events: {events:?}");
4537        assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4538        assert!(!tool_results[0].is_error);
4539
4540        // TranscriptObserver: every transcript mutation surfaces.
4541        // Expected order: User("ping"), Assistant(tool call), Tool(result),
4542        // Assistant("tool said: pong").
4543        let items = items.lock().unwrap().clone();
4544        assert_eq!(items.len(), 4, "items: {items:?}");
4545        assert_eq!(items[0].kind, ItemKind::User);
4546        assert_eq!(items[1].kind, ItemKind::Assistant);
4547        assert!(
4548            items[1]
4549                .parts
4550                .iter()
4551                .any(|p| matches!(p, Part::ToolCall(_)))
4552        );
4553        assert_eq!(items[2].kind, ItemKind::Tool);
4554        assert!(
4555            items[2]
4556                .parts
4557                .iter()
4558                .any(|p| matches!(p, Part::ToolResult(_)))
4559        );
4560        assert_eq!(items[3].kind, ItemKind::Assistant);
4561    }
4562
4563    #[test]
4564    fn convenience_cache_builders_construct_expected_defaults() {
4565        let cache = PromptCacheRequest::automatic()
4566            .with_retention(PromptCacheRetention::Short)
4567            .with_key("workspace:demo");
4568        let session = SessionConfig::new("demo").with_cache(cache.clone());
4569
4570        assert_eq!(session.session_id, SessionId::new("demo"));
4571        assert_eq!(session.cache, Some(cache));
4572
4573        let explicit = PromptCacheRequest::explicit([
4574            PromptCacheBreakpoint::tools_end(),
4575            PromptCacheBreakpoint::transcript_item_end(2),
4576            PromptCacheBreakpoint::transcript_part_end(3, 1),
4577        ]);
4578
4579        assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
4580        assert_eq!(
4581            explicit.strategy,
4582            PromptCacheStrategy::Explicit {
4583                breakpoints: vec![
4584                    PromptCacheBreakpoint::ToolsEnd,
4585                    PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
4586                    PromptCacheBreakpoint::TranscriptPartEnd {
4587                        item_index: 3,
4588                        part_index: 1,
4589                    },
4590                ],
4591            }
4592        );
4593    }
4594}