Skip to main content

agentkit_loop/
lib.rs

1//! Runtime-agnostic agent loop orchestration for sessions, turns, tools, and interrupts.
2//!
3//! `agentkit-loop` is the central coordination layer in the agentkit workspace.  It
4//! drives a model through a multi-turn agentic loop, executing tool calls,
5//! respecting permission checks, surfacing approval interrupts to the host
6//! application, and optionally compacting the transcript when it grows too large.
7//!
8//! # Architecture
9//!
10//! The main entry point is [`Agent`], constructed via [`AgentBuilder`]. The
11//! builder optionally accepts the prior conversation transcript via
12//! [`AgentBuilder::transcript`] and the next user turn via
13//! [`AgentBuilder::input`] — both default to empty. Calling
14//! [`Agent::start`] with a [`SessionConfig`] returns a [`LoopDriver`] that
15//! yields [`LoopStep`]s — either a finished turn or an interrupt that
16//! requires host resolution before the loop can continue.
17//!
18//! If no input was preloaded, the first call to [`LoopDriver::next`] yields
19//! [`LoopInterrupt::AwaitingInput`] and the host supplies the first user
20//! turn via [`InputRequest::submit`]. If input was preloaded, the first
21//! `next()` dispatches the model directly — convenient for one-shot calls.
22//!
23//! ```text
24//! Agent::builder()
25//!     .model(adapter)              // ModelAdapter implementation
26//!     .add_tool_source(registry)   // ToolRegistry (or any ToolSource); call again to federate
27//!     .permissions(checker)        // PermissionChecker for gating tool use
28//!     .observer(obs)               // LoopObserver for streaming events
29//!     .transcript(prior)           // optional: passive prior transcript (system prompt, resumed session)
30//!     .input(first_user_turn)      // optional: preload next user turn so first next() drives a turn
31//!     .build()?
32//!     .start(config).await?  -> LoopDriver
33//!         .next().await?     -> LoopStep::Finished | LoopStep::Interrupt(...)
34//! ```
35//!
36//! # Example
37//!
38//! ```rust,no_run
39//! use agentkit_core::{Item, ItemKind};
40//! use agentkit_loop::{
41//!     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
42//! };
43//!
44//! # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
45//! // One-shot: preload system prompt and first user message; first next()
46//! // drives the model directly.
47//! let agent = Agent::builder()
48//!     .model(adapter)
49//!     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
50//!     .input(vec![Item::text(ItemKind::User, "Hello!")])
51//!     .build()?;
52//!
53//! let mut driver = agent
54//!     .start(SessionConfig::new("demo").with_cache(
55//!         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
56//!     ))
57//!     .await?;
58//!
59//! let _ = driver.next().await?;
60//! # Ok(())
61//! # }
62//! ```
63
64use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_core::{
68    CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
69    TextPart, Timestamp, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation,
70    Usage,
71};
72use agentkit_task_manager::{
73    PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
74    TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
75};
76#[cfg(test)]
77use agentkit_tools_core::ToolContext;
78use agentkit_tools_core::{
79    AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
80    PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutor, ToolRequest, ToolResources,
81    ToolSource, ToolSpec,
82};
83use async_trait::async_trait;
84use serde::{Deserialize, Serialize};
85use thiserror::Error;
86
// Metadata keys and values in the `agentkit.*` namespace that describe an
// interrupted turn.
// NOTE(review): the consumers of these constants are outside this section —
// the per-constant descriptions below are inferred from the names; confirm
// against the call sites.

/// Metadata key; presumably flags that something was interrupted.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
/// Metadata key; presumably holds the reason for the interruption.
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
/// Metadata key; presumably holds the loop stage at which the interruption happened.
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
/// Reason value; presumably stored under the reason key for user-initiated cancellation.
const USER_CANCELLED_REASON: &str = "user_cancelled";
91
92/// Configuration required to start a new model session.
93///
94/// Pass this to [`Agent::start`] to initialise the underlying [`ModelSession`]
95/// and obtain a [`LoopDriver`].
96///
97/// # Example
98///
99/// ```rust
100/// use agentkit_loop::{PromptCacheRequest, PromptCacheRetention, SessionConfig};
101///
102/// let config = SessionConfig::new("my-session").with_cache(
103///     PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
104/// );
105/// ```
106#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
107pub struct SessionConfig {
108    /// Unique identifier for the session.
109    pub session_id: SessionId,
110    /// Arbitrary key-value metadata forwarded to the model adapter.
111    pub metadata: MetadataMap,
112    /// Default provider-side prompt caching policy for turns in this session.
113    pub cache: Option<PromptCacheRequest>,
114}
115
116impl SessionConfig {
117    /// Builds a session config with empty metadata and no cache policy.
118    pub fn new(session_id: impl Into<SessionId>) -> Self {
119        Self {
120            session_id: session_id.into(),
121            metadata: MetadataMap::new(),
122            cache: None,
123        }
124    }
125
126    /// Replaces the session metadata map.
127    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
128        self.metadata = metadata;
129        self
130    }
131
132    /// Sets the default prompt cache request for the session.
133    pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
134        self.cache = Some(cache);
135        self
136    }
137
138    /// Clears any default prompt cache request for the session.
139    pub fn without_cache(mut self) -> Self {
140        self.cache = None;
141        self
142    }
143}
144
145/// Strength of a prompt-cache request.
146///
147/// `BestEffort` lets adapters ignore unsupported controls while still using
148/// any provider-native automatic caching they support. `Required` upgrades
149/// unsupported cache requests into provider errors.
150#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
151pub enum PromptCacheMode {
152    /// Disable prompt caching for this request.
153    Disabled,
154    /// Use caching when the provider can honor the request.
155    #[default]
156    BestEffort,
157    /// Fail the turn if the provider cannot honor the request.
158    Required,
159}
160
161/// High-level provider-neutral cache retention hint.
162///
163/// Providers map this to their native controls. For example, OpenAI maps
164/// `Short` to in-memory retention while OpenRouter Anthropic models map it to
165/// the default 5-minute ephemeral cache.
166#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
167pub enum PromptCacheRetention {
168    /// Use the provider's default cache retention.
169    Default,
170    /// Prefer the provider's short-lived cache retention mode.
171    Short,
172    /// Prefer the provider's longest generally available cache retention mode.
173    Extended,
174}
175
176/// Provider-neutral prompt caching strategy.
177#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
178pub enum PromptCacheStrategy {
179    /// Let the provider decide the cacheable prefix automatically.
180    #[default]
181    Automatic,
182    /// Apply explicit cache breakpoints to selected prefix boundaries.
183    Explicit {
184        /// Cache breakpoints in transcript/tool order.
185        breakpoints: Vec<PromptCacheBreakpoint>,
186    },
187}
188
189impl PromptCacheStrategy {
190    /// Uses the provider's native automatic cache behavior when available, or
191    /// any adapter-provided automatic planning fallback.
192    pub fn automatic() -> Self {
193        Self::Automatic
194    }
195
196    /// Uses explicit cache breakpoints.
197    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
198        Self::Explicit {
199            breakpoints: breakpoints.into_iter().collect(),
200        }
201    }
202}
203
204/// Prefix boundary that a provider should cache when using explicit caching.
205#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
206pub enum PromptCacheBreakpoint {
207    /// Cache the tool schema prefix through the last available tool.
208    ToolsEnd,
209    /// Cache through the end of the transcript item at `index`.
210    TranscriptItemEnd { index: usize },
211    /// Cache through the specific transcript part.
212    ///
213    /// Not every adapter can target every part precisely; unsupported
214    /// fine-grained breakpoints become best-effort no-ops unless the request is
215    /// marked [`PromptCacheMode::Required`].
216    TranscriptPartEnd {
217        item_index: usize,
218        part_index: usize,
219    },
220}
221
222impl PromptCacheBreakpoint {
223    /// Cache the tool schema prefix through the last available tool.
224    pub fn tools_end() -> Self {
225        Self::ToolsEnd
226    }
227
228    /// Cache through the end of a transcript item.
229    pub fn transcript_item_end(index: usize) -> Self {
230        Self::TranscriptItemEnd { index }
231    }
232
233    /// Cache through a specific part within a transcript item.
234    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
235        Self::TranscriptPartEnd {
236            item_index,
237            part_index,
238        }
239    }
240}
241
242/// Prompt caching request sent alongside a turn.
243#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
244pub struct PromptCacheRequest {
245    /// Strength of the caching request.
246    pub mode: PromptCacheMode,
247    /// Automatic or explicit caching strategy.
248    pub strategy: PromptCacheStrategy,
249    /// Optional provider-neutral retention hint.
250    pub retention: Option<PromptCacheRetention>,
251    /// Optional provider cache key or routing key.
252    pub key: Option<String>,
253}
254
255impl PromptCacheRequest {
256    /// Builds a best-effort automatic cache request.
257    pub fn automatic() -> Self {
258        Self::best_effort(PromptCacheStrategy::automatic())
259    }
260
261    /// Builds a required automatic cache request.
262    pub fn automatic_required() -> Self {
263        Self::required(PromptCacheStrategy::automatic())
264    }
265
266    /// Builds a best-effort explicit cache request.
267    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
268        Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
269    }
270
271    /// Builds a required explicit cache request.
272    pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
273        Self::required(PromptCacheStrategy::explicit(breakpoints))
274    }
275
276    /// Builds a disabled cache request.
277    pub fn disabled() -> Self {
278        Self {
279            mode: PromptCacheMode::Disabled,
280            strategy: PromptCacheStrategy::Automatic,
281            retention: None,
282            key: None,
283        }
284    }
285
286    /// Builds a best-effort cache request with the given strategy.
287    pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
288        Self {
289            mode: PromptCacheMode::BestEffort,
290            strategy,
291            retention: None,
292            key: None,
293        }
294    }
295
296    /// Builds a required cache request with the given strategy.
297    pub fn required(strategy: PromptCacheStrategy) -> Self {
298        Self {
299            mode: PromptCacheMode::Required,
300            strategy,
301            retention: None,
302            key: None,
303        }
304    }
305
306    /// Overrides the request mode.
307    pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
308        self.mode = mode;
309        self
310    }
311
312    /// Overrides the request strategy.
313    pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
314        self.strategy = strategy;
315        self
316    }
317
318    /// Applies a provider-neutral retention hint.
319    pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
320        self.retention = Some(retention);
321        self
322    }
323
324    /// Applies a provider cache key or routing key.
325    pub fn with_key(mut self, key: impl Into<String>) -> Self {
326        self.key = Some(key.into());
327        self
328    }
329
330    /// Clears any provider-neutral retention hint.
331    pub fn without_retention(mut self) -> Self {
332        self.retention = None;
333        self
334    }
335
336    /// Clears any provider cache key or routing key.
337    pub fn without_key(mut self) -> Self {
338        self.key = None;
339        self
340    }
341
342    /// Returns true when caching should be active for this request.
343    pub fn is_enabled(&self) -> bool {
344        !matches!(self.mode, PromptCacheMode::Disabled)
345    }
346}
347
348/// Payload sent to the model at the start of each turn.
349///
350/// The [`LoopDriver`] constructs this automatically from its internal state
351/// and passes it to [`ModelSession::begin_turn`].  Model adapter authors
352/// use the fields to build the provider-specific request.
353#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
354pub struct TurnRequest {
355    /// Session this turn belongs to.
356    pub session_id: SessionId,
357    /// Unique identifier for the current turn.
358    pub turn_id: agentkit_core::TurnId,
359    /// Full conversation transcript accumulated so far.
360    pub transcript: Vec<Item>,
361    /// Tool specifications the model may invoke during this turn.
362    pub available_tools: Vec<ToolSpec>,
363    /// Provider-side prompt caching request for this turn.
364    pub cache: Option<PromptCacheRequest>,
365    /// Per-turn metadata (e.g. provider hints).
366    pub metadata: MetadataMap,
367}
368
369/// Final result produced by a single model turn.
370///
371/// Returned inside [`ModelTurnEvent::Finished`] to signal that the model has
372/// completed its generation for this turn.
373#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
374pub struct ModelTurnResult {
375    /// Why the model stopped generating (e.g. completed, tool call, length).
376    pub finish_reason: FinishReason,
377    /// Items the model produced during this turn (text, tool calls, etc.).
378    pub output_items: Vec<Item>,
379    /// Token usage statistics, if available.
380    pub usage: Option<Usage>,
381    /// Provider-specific metadata about the turn.
382    pub metadata: MetadataMap,
383}
384
385/// Streaming event emitted by a [`ModelTurn`] during generation.
386///
387/// The [`LoopDriver`] consumes these events one-by-one via
388/// [`ModelTurn::next_event`] and translates them into [`AgentEvent`]s for
389/// observers and into transcript mutations.
390#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
391pub enum ModelTurnEvent {
392    /// Incremental text or content delta from the model.
393    Delta(Delta),
394    /// The model is requesting a tool call.
395    ToolCall(ToolCallPart),
396    /// Updated token usage statistics.
397    Usage(Usage),
398    /// The model has finished generating for this turn.
399    Finished(ModelTurnResult),
400}
401
402/// Factory for creating model sessions.
403///
404/// Implement this trait to integrate a model provider (e.g. OpenRouter,
405/// Anthropic, a local LLM server) with the agent loop.  [`Agent`] holds a
406/// single adapter and calls [`start_session`](ModelAdapter::start_session)
407/// once when [`Agent::start`] is invoked.
408///
409/// # Example
410///
411/// ```rust,no_run
412/// use agentkit_loop::{ModelAdapter, ModelSession, SessionConfig, LoopError};
413/// use async_trait::async_trait;
414///
415/// struct MyAdapter;
416///
417/// #[async_trait]
418/// impl ModelAdapter for MyAdapter {
419///     type Session = MySession;
420///
421///     async fn start_session(&self, config: SessionConfig) -> Result<MySession, LoopError> {
422///         // Initialize provider-specific session state here.
423///         Ok(MySession { /* ... */ })
424///     }
425/// }
426/// # struct MySession;
427/// # #[async_trait]
428/// # impl ModelSession for MySession {
429/// #     type Turn = MyTurn;
430/// #     async fn begin_turn(&mut self, _r: agentkit_loop::TurnRequest, _c: Option<agentkit_core::TurnCancellation>) -> Result<MyTurn, LoopError> { todo!() }
431/// # }
432/// # struct MyTurn;
433/// # #[async_trait]
434/// # impl agentkit_loop::ModelTurn for MyTurn {
435/// #     async fn next_event(&mut self, _c: Option<agentkit_core::TurnCancellation>) -> Result<Option<agentkit_loop::ModelTurnEvent>, LoopError> { todo!() }
436/// # }
437/// ```
438#[async_trait]
439pub trait ModelAdapter: Send + Sync {
440    /// The session type produced by this adapter.
441    type Session: ModelSession;
442
443    /// Create a new model session from the given configuration.
444    ///
445    /// # Errors
446    ///
447    /// Returns [`LoopError`] if the provider connection or initialisation fails.
448    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
449}
450
451/// An active model session that can produce sequential turns.
452///
453/// A session is created once per [`Agent::start`] call and lives for the
454/// lifetime of the [`LoopDriver`].  Each call to [`begin_turn`](ModelSession::begin_turn)
455/// hands the full transcript to the model and returns a streaming
456/// [`ModelTurn`].
457#[async_trait]
458pub trait ModelSession: Send {
459    /// The turn type produced by this session.
460    type Turn: ModelTurn;
461
462    /// Start a new turn, sending the transcript and available tools to the model.
463    ///
464    /// # Arguments
465    ///
466    /// * `request` -- the turn payload including transcript and tool specs.
467    /// * `cancellation` -- optional handle the implementation should poll to
468    ///   detect user-initiated cancellation.
469    ///
470    /// # Errors
471    ///
472    /// Returns [`LoopError::Cancelled`] when the turn is cancelled, or a
473    /// provider-specific error wrapped in [`LoopError`].
474    async fn begin_turn(
475        &mut self,
476        request: TurnRequest,
477        cancellation: Option<TurnCancellation>,
478    ) -> Result<Self::Turn, LoopError>;
479}
480
481/// A streaming model turn that yields events one at a time.
482///
483/// The loop driver calls [`next_event`](ModelTurn::next_event) repeatedly
484/// until it returns `Ok(None)` (stream exhausted) or
485/// `Ok(Some(ModelTurnEvent::Finished(_)))`.
486#[async_trait]
487pub trait ModelTurn: Send {
488    /// Retrieve the next event from the model's response stream.
489    ///
490    /// Returns `Ok(None)` when the stream is exhausted.
491    ///
492    /// # Errors
493    ///
494    /// Returns [`LoopError::Cancelled`] if `cancellation` fires, or a
495    /// provider-specific error wrapped in [`LoopError`].
496    async fn next_event(
497        &mut self,
498        cancellation: Option<TurnCancellation>,
499    ) -> Result<Option<ModelTurnEvent>, LoopError>;
500}
501
502/// Observer hook for streaming agent events to the host application.
503///
504/// Register observers via [`AgentBuilder::observer`] to receive real-time
505/// notifications about deltas, tool calls, usage, warnings, and lifecycle
506/// events.
507///
508/// # Example
509///
510/// ```rust
511/// use agentkit_loop::{AgentEvent, LoopObserver};
512///
513/// struct StdoutObserver;
514///
515/// impl LoopObserver for StdoutObserver {
516///     fn handle_event(&self, event: AgentEvent) {
517///         println!("{event:?}");
518///     }
519/// }
520/// ```
521pub trait LoopObserver: Send + Sync {
522    /// Called synchronously for every [`AgentEvent`] emitted by the loop driver.
523    /// Observers store mutable state behind interior mutability (`Mutex`,
524    /// atomics, channels) so the driver can share an `Arc<dyn LoopObserver>`
525    /// across reusable [`Agent`] starts.
526    fn handle_event(&self, event: AgentEvent);
527}
528
529/// Receives full [`Item`]s as they are appended to the driver's transcript.
530///
531/// While [`LoopObserver`] surfaces operational events (deltas, tool calls,
532/// lifecycle, telemetry), it can't be reconstructed back into a faithful
533/// transcript on its own — content deltas span partial parts and don't
534/// carry their parent-Item identity, and historically tool results were
535/// pushed into the transcript with no observer event at all. A
536/// `TranscriptObserver` is the loss-free counterpart: it fires once per
537/// [`Item`] appended, with the full Item shape ready for persistence,
538/// replication, or audit.
539///
540/// Observers are called *synchronously* from inside the driver, in the
541/// same order items land in the transcript. Compaction-driven transcript
542/// rewrites do **not** fire `on_item_appended` — those are signaled by
543/// [`AgentEvent::CompactionFinished`] instead.
544///
545/// Register via [`AgentBuilder::transcript_observer`]; multiple observers
546/// may be registered and are called in registration order.
547///
548/// # Example
549///
550/// ```rust
551/// use agentkit_core::Item;
552/// use agentkit_loop::TranscriptObserver;
553/// use std::sync::atomic::{AtomicUsize, Ordering};
554///
555/// struct CountingObserver { items: AtomicUsize }
556///
557/// impl TranscriptObserver for CountingObserver {
558///     fn on_item_appended(&self, _item: &Item) {
559///         self.items.fetch_add(1, Ordering::Relaxed);
560///     }
561/// }
562/// ```
563pub trait TranscriptObserver: Send + Sync {
564    /// Called synchronously every time an [`Item`] is appended to the
565    /// driver's transcript, in transcript order. Observers store mutable
566    /// state behind interior mutability so the driver can share an
567    /// `Arc<dyn TranscriptObserver>`.
568    fn on_item_appended(&self, item: &Item);
569}
570
571/// Where in the loop a [`LoopMutator`] is given a chance to modify the
572/// transcript. Mutators run synchronously at these points; mid-stream
573/// mutation (e.g. between content deltas) is intentionally not supported
574/// because the assistant item is not yet fully constructed.
575#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
576#[non_exhaustive]
577pub enum MutationPoint {
578    /// A tool result has just been appended; the next loop step will be
579    /// another inference call.
580    AfterToolResult,
581    /// A turn has fully ended (assistant final, interrupt, or cancellation)
582    /// and any new user input has not yet been dispatched.
583    AfterTurnEnded,
584}
585
586/// Sink for emitting [`AgentEvent`]s from inside a [`LoopMutator`].
587/// The driver supplies a concrete implementation via [`LoopCtx::emitter`].
588pub trait EventEmitter: Send + Sync {
589    /// Forward `event` to all registered observers.
590    fn emit(&self, event: AgentEvent);
591}
592
593/// Read-only context handed to a [`LoopMutator`] alongside the cursor.
594#[non_exhaustive]
595pub struct LoopCtx<'a> {
596    /// Session this mutation point belongs to.
597    pub session_id: &'a SessionId,
598    /// Turn the mutation is associated with, if any.
599    pub turn_id: Option<&'a agentkit_core::TurnId>,
600    /// Where in the loop the mutator is running.
601    pub point: MutationPoint,
602    /// Cancellation handle for the active turn, if any.
603    pub cancellation: Option<TurnCancellation>,
604    /// Sink for emitting events from the mutator (telemetry, progress).
605    pub emitter: &'a dyn EventEmitter,
606}
607
608/// Mutable handle over the live transcript with dirty tracking.
609///
610/// Implements [`Deref`](std::ops::Deref)/[`DerefMut`](std::ops::DerefMut) to
611/// `Vec<Item>` so mutators read and write through `Vec`'s native API
612/// (`push`, `retain`, `iter`, `*cursor = ...`). Any `&mut` access marks the
613/// cursor dirty; the loop validates transcript invariants when at least one
614/// mutator dirtied the transcript and hard-fails on protocol violations.
615pub struct TranscriptCursor<'a> {
616    items: &'a mut Vec<Item>,
617    pub(crate) dirty: bool,
618}
619
620impl<'a> std::ops::Deref for TranscriptCursor<'a> {
621    type Target = Vec<Item>;
622    fn deref(&self) -> &Vec<Item> {
623        self.items
624    }
625}
626
627impl<'a> std::ops::DerefMut for TranscriptCursor<'a> {
628    fn deref_mut(&mut self) -> &mut Vec<Item> {
629        self.dirty = true;
630        self.items
631    }
632}
633
634/// Async transcript mutator. Registered via [`AgentBuilder::mutator`] and
635/// invoked at each [`MutationPoint`]. Mutators own their derived state
636/// (e.g. running token totals via interior mutability) and decide for
637/// themselves whether and how to modify the transcript.
638///
639/// The default implementation is a no-op so trait users override only
640/// `mutate`.
641#[async_trait]
642pub trait LoopMutator: Send + Sync {
643    /// Run this mutator. Returning without writing to `cursor` is a no-op.
644    /// Errors abort the loop; protocol-violating mutations (orphaned tool
645    /// uses or results) are detected by validation and turned into
646    /// [`LoopError::Mutator`].
647    async fn mutate(
648        &self,
649        cursor: &mut TranscriptCursor<'_>,
650        ctx: LoopCtx<'_>,
651    ) -> Result<(), LoopError> {
652        let _ = (cursor, ctx);
653        Ok(())
654    }
655}
656
657/// Lifecycle and streaming events emitted by the [`LoopDriver`].
658///
659/// Observers (see [`LoopObserver`]) receive these events in the order they
660/// occur.  They are useful for building UIs, logging, or telemetry.
661#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
662pub enum AgentEvent {
663    /// The agent run has been initialised.
664    RunStarted { session_id: SessionId },
665    /// A new model turn is starting.
666    TurnStarted {
667        session_id: SessionId,
668        turn_id: agentkit_core::TurnId,
669    },
670    /// User input has been accepted into the pending queue.
671    InputAccepted {
672        session_id: SessionId,
673        items: Vec<Item>,
674    },
675    /// Incremental content delta from the model.
676    ContentDelta(Delta),
677    /// The model has requested a tool call.
678    ToolCallRequested(ToolCallPart),
679    /// A tool call's result has landed in the transcript.
680    ///
681    /// Fires once per [`Part::ToolResult`] that's appended, including the
682    /// synthetic placeholder that's pushed when a tool is detached to the
683    /// background — the real result fires a second event with the same
684    /// [`ToolResultPart::call_id`] once the background task completes.
685    /// Cancellation/denial paths (auth cancelled, approval denied) also
686    /// emit this with `is_error = true`.
687    ///
688    /// Correlate with the matching [`AgentEvent::ToolCallRequested`] via
689    /// `call_id`.
690    ToolResultReceived(ToolResultPart),
691    /// A tool call requires explicit user approval before execution.
692    ApprovalRequired(ApprovalRequest),
693    /// An approval interrupt has been resolved.
694    ApprovalResolved { approved: bool },
695    /// The available tool catalog changed and will be reflected on the next model request.
696    ToolCatalogChanged(ToolCatalogEvent),
697    /// A [`LoopMutator`] is about to run at one of the mutation points.
698    /// `mutator` is a stable label the implementation chooses for itself.
699    MutationStarted {
700        session_id: SessionId,
701        turn_id: Option<agentkit_core::TurnId>,
702        mutator: String,
703        point: MutationPoint,
704    },
705    /// A [`LoopMutator`] has finished running. `dirty` indicates whether the
706    /// transcript was modified; `metadata` carries mutator-specific extras
707    /// (e.g. compaction reason, replaced item count).
708    MutationFinished {
709        session_id: SessionId,
710        turn_id: Option<agentkit_core::TurnId>,
711        mutator: String,
712        dirty: bool,
713        metadata: MetadataMap,
714    },
715    /// Updated token usage statistics.
716    UsageUpdated(Usage),
717    /// Non-fatal warning (e.g. a tool failure that was recovered from).
718    Warning { message: String },
719    /// The agent run has failed with an unrecoverable error.
720    RunFailed { message: String },
721    /// A turn has finished (successfully, via cancellation, etc.).
722    TurnFinished(TurnResult),
723}
724
725/// Handle for a pending approval interrupt.
726///
727/// Wraps an [`ApprovalRequest`] and provides ergonomic resolution methods
728/// so callers can resolve the interrupt directly instead of searching for
729/// the matching method on [`LoopDriver`].
730///
731/// # Example
732///
733/// ```rust,no_run
734/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
735/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
736/// match driver.next().await? {
737///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
738///         println!("Needs approval: {}", pending.request.summary);
739///         pending.approve(driver)?;
740///     }
741///     _ => {}
742/// }
743/// # Ok(())
744/// # }
745/// ```
746#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
747pub struct PendingApproval {
748    /// The underlying approval request details.
749    pub request: ApprovalRequest,
750}
751
752impl std::ops::Deref for PendingApproval {
753    type Target = ApprovalRequest;
754    fn deref(&self) -> &ApprovalRequest {
755        &self.request
756    }
757}
758
759impl PendingApproval {
760    /// Approve the pending tool call.
761    pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
762        let call_id = self
763            .request
764            .call_id
765            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
766        driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
767    }
768
769    /// Deny the pending tool call.
770    pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
771        let call_id = self
772            .request
773            .call_id
774            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
775        driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
776    }
777
778    /// Deny the pending tool call with a reason.
779    pub fn deny_with_reason<S: ModelSession>(
780        self,
781        driver: &mut LoopDriver<S>,
782        reason: impl Into<String>,
783    ) -> Result<(), LoopError> {
784        let call_id = self
785            .request
786            .call_id
787            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
788        driver.resolve_approval_for(
789            call_id,
790            ApprovalDecision::Deny {
791                reason: Some(reason.into()),
792            },
793        )
794    }
795
796    /// Approve the pending tool call with a patched input.
797    ///
798    /// The model's original tool input is replaced with `input` before the
799    /// tool executes. The transcript still records the call as the model
800    /// emitted it; only the executor sees the patched payload. This mirrors
801    /// the `PermissionResultAllow(updated_input=...)` pattern from the
802    /// Anthropic Agent SDK and is intended for hosts that want to sanitise,
803    /// restrict, or augment arguments before tool execution without forcing
804    /// the model to re-issue the call.
805    pub fn approve_with_patched_input<S: ModelSession>(
806        self,
807        driver: &mut LoopDriver<S>,
808        input: serde_json::Value,
809    ) -> Result<(), LoopError> {
810        let call_id = self
811            .request
812            .call_id
813            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
814        driver.resolve_approval_for_with_patched_input(call_id, input)
815    }
816}
817
818/// Descriptor for a [`LoopInterrupt::AwaitingInput`] interrupt.
819///
820/// Returned when the driver has no pending input and needs the host to
821/// supply items before advancing. This is the entry point for every user
822/// turn that wasn't preloaded via [`AgentBuilder::input`]. Transcript items
823/// loaded via [`AgentBuilder::transcript`] are passive, so when no input is
824/// preloaded the first [`LoopDriver::next`] call surfaces `AwaitingInput`
825/// and the host injects the first user message via [`InputRequest::submit`].
826///
827/// # Example
828///
829/// ```rust,no_run
830/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
831/// # use agentkit_core::Item;
832/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>, items: Vec<Item>) -> Result<(), agentkit_loop::LoopError> {
833/// match driver.next().await? {
834///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
835///         pending.submit(driver, items)?;
836///     }
837///     _ => {}
838/// }
839/// # Ok(())
840/// # }
841/// ```
842#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
843pub struct InputRequest {
844    /// The session that is waiting for input.
845    pub session_id: SessionId,
846    /// Human-readable explanation of why input is needed.
847    pub reason: String,
848}
849
850impl InputRequest {
851    /// Submit input items to the driver.
852    pub fn submit<S: ModelSession>(
853        self,
854        driver: &mut LoopDriver<S>,
855        items: Vec<Item>,
856    ) -> Result<(), LoopError> {
857        driver.submit_input(items)
858    }
859}
860
861/// Outcome of a completed (or cancelled) turn.
862///
863/// Wrapped by [`LoopStep::Finished`] and also emitted as
864/// [`AgentEvent::TurnFinished`] to observers.
865#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
866pub struct TurnResult {
867    /// Identifier for the turn that produced this result.
868    pub turn_id: agentkit_core::TurnId,
869    /// Why the turn ended (completed, tool call, cancelled, etc.).
870    pub finish_reason: FinishReason,
871    /// Items produced during this turn (assistant text, tool results, etc.).
872    pub items: Vec<Item>,
873    /// Aggregated token usage, if reported by the model.
874    pub usage: Option<Usage>,
875    /// Additional metadata about the turn.
876    pub metadata: MetadataMap,
877}
878
879/// An interrupt that pauses the agent loop until the host resolves it.
880///
881/// The loop returns an interrupt inside [`LoopStep::Interrupt`] whenever it
882/// cannot proceed autonomously.  Each variant carries a handle with
883/// resolution methods so callers can resolve the interrupt directly.
884///
885/// # Example
886///
887/// ```rust,no_run
888/// use agentkit_loop::{LoopInterrupt, LoopStep};
889/// # use agentkit_loop::LoopDriver;
890///
891/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
892/// match driver.next().await? {
893///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
894///         println!("Tool {} needs approval: {}", pending.request.request_kind, pending.request.summary);
895///         pending.approve(driver)?;
896///     }
897///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
898///         println!("Waiting for input: {}", pending.reason);
899///         // ... call pending.submit(driver, items)
900///     }
901///     LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => {
902///         // Cooperative yield between tool rounds.  Optionally call
903///         // driver.submit_input(...) to interject a user message, then
904///         // call driver.next() to resume the turn.
905///         let _ = info;
906///     }
907///     LoopStep::Finished(result) => {
908///         println!("Turn finished: {:?}", result.finish_reason);
909///     }
910/// }
911/// # Ok(())
912/// # }
913/// ```
914#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
915pub enum LoopInterrupt {
916    /// A tool call requires explicit approval before it can execute.
917    ApprovalRequest(PendingApproval),
918    /// The driver has no pending input and needs the host to supply some.
919    AwaitingInput(InputRequest),
920    /// A tool round finished: all tool calls from the previous assistant
921    /// message now have results in the transcript, and the driver is about to
922    /// invoke the model again. The host may interject user messages via the
923    /// [`ToolRoundInfo::submit`] handle before calling [`LoopDriver::next`]
924    /// to resume.
925    ///
926    /// This is a non-blocking interrupt: callers that do not care about
927    /// mid-turn interjection can treat it as a no-op (`_ => continue`) and
928    /// the next `next()` call resumes the turn.
929    AfterToolResult(ToolRoundInfo),
930}
931
932impl LoopInterrupt {
933    /// Returns `true` if the interrupt must be explicitly resolved before
934    /// the loop can make progress. Approvals are blocking;
935    /// [`AwaitingInput`](LoopInterrupt::AwaitingInput) and
936    /// [`AfterToolResult`](LoopInterrupt::AfterToolResult) are cooperative
937    /// and can be ignored by calling [`LoopDriver::next`] again.
938    pub fn is_blocking(&self) -> bool {
939        matches!(self, LoopInterrupt::ApprovalRequest(_))
940    }
941}
942
943/// Metadata describing a completed tool round, surfaced via
944/// [`LoopInterrupt::AfterToolResult`].
945#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
946pub struct ToolRoundInfo {
947    /// The session that produced this tool round.
948    pub session_id: SessionId,
949    /// The turn that is about to continue into the next model call.
950    pub turn_id: agentkit_core::TurnId,
951    /// Transcript length at the yield point (for snapshots / UIs).
952    pub transcript_len: usize,
953}
954
955impl ToolRoundInfo {
956    /// Interject user input between tool rounds. Consumes the
957    /// [`ToolRoundInfo`] handle so the same yield cannot accept input twice.
958    pub fn submit<S: ModelSession>(
959        self,
960        driver: &mut LoopDriver<S>,
961        items: Vec<Item>,
962    ) -> Result<(), LoopError> {
963        driver.submit_input(items)
964    }
965}
966
967/// The result of advancing the agent loop by one step.
968///
969/// Returned by [`LoopDriver::next`].  The host should pattern-match on this
970/// to decide whether to continue the loop or resolve an interrupt first.
971///
972/// # Example
973///
974/// ```rust,no_run
975/// use agentkit_loop::LoopStep;
976/// # use agentkit_loop::LoopDriver;
977///
978/// # async fn run<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
979/// loop {
980///     match driver.next().await? {
981///         LoopStep::Finished(result) => {
982///             println!("Turn complete: {:?}", result.finish_reason);
983///             break;
984///         }
985///         LoopStep::Interrupt(interrupt) => {
986///             // Resolve the interrupt, then continue the loop.
987///             # break;
988///         }
989///     }
990/// }
991/// # Ok(())
992/// # }
993/// ```
994#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
995pub enum LoopStep {
996    /// The loop is paused and requires host action.
997    Interrupt(LoopInterrupt),
998    /// A turn has completed (or been cancelled).
999    Finished(TurnResult),
1000}
1001
1002/// A read-only snapshot of the loop driver's current state.
1003///
1004/// Obtained via [`LoopDriver::snapshot`].  Useful for persisting or
1005/// inspecting the conversation transcript without holding a mutable
1006/// reference to the driver.
1007#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1008pub struct LoopSnapshot {
1009    /// Session identifier.
1010    pub session_id: SessionId,
1011    /// The full transcript accumulated so far.
1012    pub transcript: Vec<Item>,
1013    /// Input items queued but not yet consumed by a turn.
1014    pub pending_input: Vec<Item>,
1015}
1016
1017#[derive(Clone, Debug)]
1018struct PendingApprovalToolCall {
1019    request: ApprovalRequest,
1020    decision: Option<ApprovalDecision>,
1021    surfaced: bool,
1022    turn_id: agentkit_core::TurnId,
1023    task_id: TaskId,
1024    call: ToolCallPart,
1025    tool_request: ToolRequest,
1026}
1027
1028#[derive(Clone, Debug, Default)]
1029struct ActiveToolRound {
1030    turn_id: agentkit_core::TurnId,
1031    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
1032    background_pending: bool,
1033    foreground_progressed: bool,
1034}
1035
1036/// A configured agent ready to start a session.
1037///
1038/// Build one with [`Agent::builder`], supplying at minimum a [`ModelAdapter`].
1039/// Optionally preload prior conversation state via
1040/// [`AgentBuilder::transcript`] and the next user turn via
1041/// [`AgentBuilder::input`]. Then call [`Agent::start`] with a
1042/// [`SessionConfig`] to obtain a [`LoopDriver`] that drives the agentic loop.
1043///
1044/// If no input is preloaded, the first call to [`LoopDriver::next`] yields
1045/// [`LoopInterrupt::AwaitingInput`] so the host can supply the first user
1046/// message via [`InputRequest::submit`]. If input was preloaded, the first
1047/// `next()` dispatches the model directly.
1048///
1049/// # Example
1050///
1051/// ```rust,no_run
1052/// use agentkit_core::{Item, ItemKind};
1053/// use agentkit_loop::{
1054///     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
1055/// };
1056/// use agentkit_tools_core::ToolRegistry;
1057///
1058/// # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
1059/// let agent = Agent::builder()
1060///     .model(adapter)
1061///     .add_tool_source(ToolRegistry::new())
1062///     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
1063///     .input(vec![Item::text(ItemKind::User, "Hello!")])
1064///     .build()?;
1065///
1066/// let mut driver = agent
1067///     .start(SessionConfig::new("s1").with_cache(
1068///         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
1069///     ))
1070///     .await?;
1071///
1072/// // First next() drives the model since input was preloaded.
1073/// let _ = driver.next().await?;
1074/// # Ok(())
1075/// # }
1076/// ```
1077pub struct Agent<M>
1078where
1079    M: ModelAdapter,
1080{
1081    model: M,
1082    tool_sources: Vec<Arc<dyn ToolSource>>,
1083    tool_executor: Option<Arc<dyn ToolExecutor>>,
1084    task_manager: Arc<dyn TaskManager>,
1085    permissions: Arc<dyn PermissionChecker>,
1086    resources: Arc<dyn ToolResources>,
1087    cancellation: Option<CancellationHandle>,
1088    mutators: Vec<Arc<dyn LoopMutator>>,
1089    observers: Vec<Arc<dyn LoopObserver>>,
1090    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1091    transcript: Vec<Item>,
1092    input: Vec<Item>,
1093}
1094
1095impl<M> Agent<M>
1096where
1097    M: ModelAdapter,
1098{
1099    /// Create a new [`AgentBuilder`] for configuring this agent.
1100    pub fn builder() -> AgentBuilder<M> {
1101        AgentBuilder::default()
1102    }
1103
1104    /// Start a session, returning a [`LoopDriver`] preloaded with whatever
1105    /// transcript and input were configured on the builder. See
1106    /// [`AgentBuilder::transcript`] and [`AgentBuilder::input`] for what each
1107    /// one does and when to use them.
1108    ///
1109    /// This calls [`ModelAdapter::start_session`] and emits an
1110    /// [`AgentEvent::RunStarted`] event to all registered observers.
1111    ///
1112    /// `&self` so a single configured agent can mint multiple sessions over
1113    /// its lifetime — e.g. an outer agent that uses an inner sub-agent for
1114    /// transcript compaction.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns [`LoopError`] if the model adapter fails to create a session.
1119    pub async fn start(&self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1120        let session_id = config.session_id.clone();
1121        let default_cache = config.cache.clone();
1122        let session = self.model.start_session(config).await?;
1123        let tool_executor = self
1124            .tool_executor
1125            .clone()
1126            .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1127        let driver = LoopDriver {
1128            session_id: session_id.clone(),
1129            default_cache,
1130            next_turn_cache: None,
1131            session: Some(session),
1132            tool_executor,
1133            task_manager: self.task_manager.clone(),
1134            permissions: self.permissions.clone(),
1135            resources: self.resources.clone(),
1136            cancellation: self.cancellation.clone(),
1137            mutators: self.mutators.clone(),
1138            observers: self.observers.clone(),
1139            transcript_observers: self.transcript_observers.clone(),
1140            transcript: self.transcript.clone(),
1141            pending_input: self.input.clone(),
1142            pending_approvals: BTreeMap::new(),
1143            pending_approval_order: VecDeque::new(),
1144            active_tool_round: None,
1145            pending_round_resume: None,
1146            next_turn_index: 1,
1147            detached_call_ids: HashSet::new(),
1148        };
1149        driver.emit(AgentEvent::RunStarted { session_id });
1150        Ok(driver)
1151    }
1152}
1153
1154/// Builder for constructing an [`Agent`].
1155///
1156/// Obtained via [`Agent::builder`].  The only required field is
1157/// [`model`](AgentBuilder::model); all others have sensible defaults
1158/// (no tools, allow-all permissions, no compaction, no observers).
1159pub struct AgentBuilder<M>
1160where
1161    M: ModelAdapter,
1162{
1163    model: Option<M>,
1164    tool_sources: Vec<Arc<dyn ToolSource>>,
1165    tool_executor: Option<Arc<dyn ToolExecutor>>,
1166    task_manager: Option<Arc<dyn TaskManager>>,
1167    permissions: Arc<dyn PermissionChecker>,
1168    resources: Arc<dyn ToolResources>,
1169    cancellation: Option<CancellationHandle>,
1170    mutators: Vec<Arc<dyn LoopMutator>>,
1171    observers: Vec<Arc<dyn LoopObserver>>,
1172    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1173    transcript: Vec<Item>,
1174    input: Vec<Item>,
1175}
1176
1177impl<M> Default for AgentBuilder<M>
1178where
1179    M: ModelAdapter,
1180{
1181    fn default() -> Self {
1182        Self {
1183            model: None,
1184            tool_sources: Vec::new(),
1185            tool_executor: None,
1186            task_manager: None,
1187            permissions: Arc::new(AllowAllPermissions),
1188            resources: Arc::new(()),
1189            cancellation: None,
1190            mutators: Vec::new(),
1191            observers: Vec::new(),
1192            transcript_observers: Vec::new(),
1193            transcript: Vec::new(),
1194            input: Vec::new(),
1195        }
1196    }
1197}
1198
1199impl<M> AgentBuilder<M>
1200where
1201    M: ModelAdapter,
1202{
1203    /// Set the model adapter (required).
1204    pub fn model(mut self, model: M) -> Self {
1205        self.model = Some(model);
1206        self
1207    }
1208
1209    /// Adds a tool source to the agent. Call multiple times to compose
1210    /// federated sources — for example a frozen native [`ToolRegistry`]
1211    /// alongside an MCP manager's [`agentkit_tools_core::CatalogReader`]
1212    /// and a skill-watcher reader. Sources are walked in registration
1213    /// order; the default [`agentkit_tools_core::CollisionPolicy`] is
1214    /// `FirstWins`.
1215    ///
1216    /// Accepts any sized [`ToolSource`]; the agent owns it for the
1217    /// session. To share a dynamic source between the agent and the
1218    /// subsystem mutating it, mint a [`agentkit_tools_core::CatalogReader`]
1219    /// from a [`agentkit_tools_core::dynamic_catalog`] pair — the reader
1220    /// is sized and owned, hosts never see the underlying `Arc`.
1221    pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1222        self.tool_sources.push(Arc::new(source));
1223        self
1224    }
1225
1226    /// Set a custom [`ToolExecutor`]. When provided, the agent uses it
1227    /// instead of building a [`BasicToolExecutor`] from the configured
1228    /// sources. Most hosts should use [`add_tool_source`](Self::add_tool_source)
1229    /// instead; this is for advanced cases (custom routing, instrumentation,
1230    /// test fakes).
1231    pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1232        self.tool_executor = Some(Arc::new(executor));
1233        self
1234    }
1235
1236    /// Set the task manager that schedules tool-call execution.
1237    ///
1238    /// Defaults to [`SimpleTaskManager`], which preserves the existing
1239    /// sequential request/response behavior.
1240    pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1241        self.task_manager = Some(Arc::new(manager));
1242        self
1243    }
1244
1245    /// Set the permission checker that gates tool execution.
1246    ///
1247    /// Defaults to allowing all tool calls without prompting.
1248    pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1249        self.permissions = Arc::new(permissions);
1250        self
1251    }
1252
1253    /// Set shared resources available to tool implementations.
1254    pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1255        self.resources = Arc::new(resources);
1256        self
1257    }
1258
1259    /// Attach a [`CancellationHandle`] for cooperative cancellation of turns.
1260    pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1261        self.cancellation = Some(handle);
1262        self
1263    }
1264
1265    /// Register a [`LoopMutator`] that runs at every [`MutationPoint`].
1266    ///
1267    /// Multiple mutators may be registered; they run in registration order
1268    /// and the dirty flag propagates across the pipeline. After every pass
1269    /// in which any mutator dirtied the transcript, the loop validates
1270    /// protocol invariants (tool_use/tool_result pairing); a violation is a
1271    /// hard [`LoopError::Mutator`] failure.
1272    pub fn mutator<L: LoopMutator + 'static>(mut self, mutator: L) -> Self {
1273        self.mutators.push(Arc::new(mutator));
1274        self
1275    }
1276
1277    /// Register a [`LoopObserver`] that receives [`AgentEvent`]s.
1278    ///
1279    /// Multiple observers may be registered; they are called in order.
1280    pub fn observer<O: LoopObserver + 'static>(mut self, observer: O) -> Self {
1281        self.observers.push(Arc::new(observer));
1282        self
1283    }
1284
1285    /// Register a [`TranscriptObserver`] that receives an [`Item`] every
1286    /// time one is appended to the transcript.
1287    ///
1288    /// Multiple observers may be registered; they are called in order.
1289    /// Use this when you need a loss-free view of the transcript (e.g.
1290    /// for persistence or replication) — [`LoopObserver`] alone is
1291    /// insufficient because it doesn't expose item boundaries for model
1292    /// output and historically did not surface tool results at all.
1293    pub fn transcript_observer<O: TranscriptObserver + 'static>(mut self, observer: O) -> Self {
1294        self.transcript_observers.push(Arc::new(observer));
1295        self
1296    }
1297
1298    /// Preload the driver's transcript with prior conversation state
1299    /// (defaults to empty).
1300    ///
1301    /// Items pass straight into the driver's transcript without firing
1302    /// [`TranscriptObserver::on_item_appended`] — the host is expected to
1303    /// already know about (and have persisted) anything it preloads. Use
1304    /// this for resumed sessions or to seed a system prompt.
1305    pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1306        self.transcript = transcript;
1307        self
1308    }
1309
1310    /// Preload the driver's pending-input queue with the next user turn
1311    /// (defaults to empty).
1312    ///
1313    /// When non-empty, the first [`LoopDriver::next`] dispatches the model
1314    /// directly instead of yielding [`LoopInterrupt::AwaitingInput`]. Use
1315    /// this for one-shot calls and scripts where the first user turn is
1316    /// known up front. Items move to the transcript on turn dispatch the
1317    /// same way submitted input does, firing transcript observers.
1318    pub fn input(mut self, input: Vec<Item>) -> Self {
1319        self.input = input;
1320        self
1321    }
1322
1323    /// Consume the builder and produce an [`Agent`].
1324    ///
1325    /// # Errors
1326    ///
1327    /// Returns [`LoopError::InvalidState`] if no model adapter was provided.
1328    pub fn build(self) -> Result<Agent<M>, LoopError> {
1329        let model = self
1330            .model
1331            .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1332        Ok(Agent {
1333            model,
1334            tool_sources: self.tool_sources,
1335            tool_executor: self.tool_executor,
1336            task_manager: self
1337                .task_manager
1338                .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1339            permissions: self.permissions,
1340            resources: self.resources,
1341            cancellation: self.cancellation,
1342            mutators: self.mutators,
1343            observers: self.observers,
1344            transcript_observers: self.transcript_observers,
1345            transcript: self.transcript,
1346            input: self.input,
1347        })
1348    }
1349}
1350
1351/// The runtime driver that advances the agent loop step by step.
1352///
1353/// Obtained from [`Agent::start`] with the builder's preloaded transcript
1354/// and pending-input queue baked in.
1355/// The typical usage pattern is:
1356///
1357/// 1. Call [`next`](LoopDriver::next) to advance the loop.
1358/// 2. Handle the returned [`LoopStep`]:
1359///    - [`LoopStep::Finished`] -- the turn completed, inspect the result.
1360///    - [`LoopStep::Interrupt`] -- resolve the interrupt via the bound
1361///      [`Pending*`](LoopInterrupt) handle, then call `next` again.
1362///
1363/// # Example
1364///
1365/// ```rust,no_run
1366/// use agentkit_core::{Item, ItemKind};
1367/// use agentkit_loop::{LoopDriver, LoopStep};
1368///
1369/// # async fn drive<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1370/// let step = driver.next().await?;
1371/// match step {
1372///     LoopStep::Finished(result) => println!("Done: {:?}", result.finish_reason),
1373///     LoopStep::Interrupt(interrupt) => {
1374///         // Resolve via the pending handle, then call next() again.
1375///         println!("Interrupted: {interrupt:?}");
1376///     }
1377/// }
1378/// # Ok(())
1379/// # }
1380/// ```
1381pub struct LoopDriver<S>
1382where
1383    S: ModelSession,
1384{
1385    session_id: SessionId,
1386    default_cache: Option<PromptCacheRequest>,
1387    next_turn_cache: Option<PromptCacheRequest>,
1388    session: Option<S>,
1389    tool_executor: Arc<dyn ToolExecutor>,
1390    task_manager: Arc<dyn TaskManager>,
1391    permissions: Arc<dyn PermissionChecker>,
1392    resources: Arc<dyn ToolResources>,
1393    cancellation: Option<CancellationHandle>,
1394    mutators: Vec<Arc<dyn LoopMutator>>,
1395    observers: Vec<Arc<dyn LoopObserver>>,
1396    transcript_observers: Vec<Arc<dyn TranscriptObserver>>,
1397    transcript: Vec<Item>,
1398    pending_input: Vec<Item>,
1399    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
1400    pending_approval_order: VecDeque<ToolCallId>,
1401    active_tool_round: Option<ActiveToolRound>,
1402    pending_round_resume: Option<agentkit_core::TurnId>,
1403    next_turn_index: u64,
1404    /// Call ids whose original tool_use was already paired with a
1405    /// synthetic detach tool_result. When the real result eventually
1406    /// arrives via the task manager, we MUST NOT emit a second
1407    /// tool_result for the same id — the provider schema requires
1408    /// exactly one tool_result per tool_use. Instead we route the
1409    /// resolution into a [`ItemKind::Notification`] item that the model
1410    /// can react to on the next turn.
1411    detached_call_ids: HashSet<ToolCallId>,
1412}
1413
1414impl<S> LoopDriver<S>
1415where
1416    S: ModelSession,
1417{
1418    fn execute_tool_span(
1419        &self,
1420        request: &ToolRequest,
1421        turn_id: &agentkit_core::TurnId,
1422        launch_kind: &'static str,
1423    ) -> tracing::Span {
1424        tracing::info_span!(
1425            "agent.execute_tool",
1426            "gen_ai.tool.name" = %request.tool_name,
1427            "gen_ai.tool.call.id" = %request.call_id,
1428            session.id = %self.session_id,
1429            turn.id = %turn_id,
1430            launch_kind = launch_kind,
1431        )
1432    }
1433
1434    fn start_task_via_manager(
1435        &self,
1436        task_id: Option<TaskId>,
1437        tool_request: ToolRequest,
1438        kind: TaskLaunchKind,
1439        cancellation: Option<TurnCancellation>,
1440    ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1441    {
1442        let task_manager = self.task_manager.clone();
1443        let tool_executor = self.tool_executor.clone();
1444        let permissions = self.permissions.clone();
1445        let resources = self.resources.clone();
1446        let session_id = self.session_id.clone();
1447        let turn_id = tool_request.turn_id.clone();
1448        let metadata = tool_request.metadata.clone();
1449
1450        async move {
1451            task_manager
1452                .start_task(
1453                    TaskLaunchRequest {
1454                        task_id,
1455                        request: tool_request.clone(),
1456                        kind,
1457                    },
1458                    TaskStartContext {
1459                        executor: tool_executor,
1460                        tool_context: OwnedToolContext {
1461                            session_id,
1462                            turn_id,
1463                            metadata,
1464                            permissions,
1465                            resources,
1466                            cancellation,
1467                        },
1468                    },
1469                )
1470                .await
1471                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1472        }
1473    }
1474
1475    fn has_pending_interrupts(&self) -> bool {
1476        !self.pending_approvals.is_empty()
1477    }
1478
1479    fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1480        for event in events {
1481            self.emit(AgentEvent::ToolCatalogChanged(event));
1482        }
1483    }
1484
1485    fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1486        let call_id = task.tool_request.call_id.clone();
1487        let call = ToolCallPart {
1488            id: call_id.clone(),
1489            name: task.tool_request.tool_name.to_string(),
1490            input: task.tool_request.input.clone(),
1491            metadata: task.tool_request.metadata.clone(),
1492        };
1493        let mut request = task.approval;
1494        request.call_id = Some(call_id.clone());
1495        let pending = PendingApprovalToolCall {
1496            request: request.clone(),
1497            decision: None,
1498            surfaced: false,
1499            turn_id: turn_id.clone(),
1500            task_id: task.task_id,
1501            call,
1502            tool_request: task.tool_request,
1503        };
1504        self.pending_approvals.insert(call_id.clone(), pending);
1505        if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1506            self.pending_approval_order.push_back(call_id);
1507        }
1508        self.emit(AgentEvent::ApprovalRequired(request));
1509    }
1510
1511    fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1512        for call_id in self.pending_approval_order.clone() {
1513            let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1514                continue;
1515            };
1516            if pending.decision.is_none() && !pending.surfaced {
1517                pending.surfaced = true;
1518                return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1519                    PendingApproval {
1520                        request: pending.request.clone(),
1521                    },
1522                )));
1523            }
1524        }
1525        None
1526    }
1527
1528    fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1529        self.pending_approval_order.iter().find_map(|call_id| {
1530            self.pending_approvals.get(call_id).and_then(|pending| {
1531                pending.decision.is_none().then(|| {
1532                    LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1533                        request: pending.request.clone(),
1534                    }))
1535                })
1536            })
1537        })
1538    }
1539
1540    fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1541        let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1542            self.pending_approvals
1543                .get(call_id)
1544                .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1545        })?;
1546        self.pending_approval_order.retain(|id| id != &call_id);
1547        self.pending_approvals.remove(&call_id)
1548    }
1549
1550    fn queue_resolution_interrupt(
1551        &mut self,
1552        turn_id: &agentkit_core::TurnId,
1553        resolution: TaskResolution,
1554    ) -> Option<LoopStep> {
1555        match resolution {
1556            TaskResolution::Item(item) => {
1557                self.append_tool_result_item(item);
1558                None
1559            }
1560            TaskResolution::Approval(task) => {
1561                self.enqueue_pending_approval(turn_id, task);
1562                self.take_next_unsurfaced_approval_interrupt()
1563            }
1564        }
1565    }
1566
1567    async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1568        let PendingLoopUpdates { mut resolutions } = self
1569            .task_manager
1570            .take_pending_loop_updates()
1571            .await
1572            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1573        let mut saw_items = false;
1574        while let Some(resolution) = resolutions.pop_front() {
1575            match resolution {
1576                TaskResolution::Item(item) => {
1577                    self.append_tool_result_item(item);
1578                    saw_items = true;
1579                }
1580                TaskResolution::Approval(task) => {
1581                    self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1582                }
1583            }
1584        }
1585        Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1586    }
1587
1588    async fn run_mutators(
1589        &mut self,
1590        point: MutationPoint,
1591        turn_id: Option<&agentkit_core::TurnId>,
1592        cancellation: Option<TurnCancellation>,
1593    ) -> Result<(), LoopError> {
1594        if self.mutators.is_empty() {
1595            return Ok(());
1596        }
1597        if cancellation
1598            .as_ref()
1599            .is_some_and(TurnCancellation::is_cancelled)
1600        {
1601            return Err(LoopError::Cancelled);
1602        }
1603        let mutators = self.mutators.clone();
1604        let session_id = self.session_id.clone();
1605        let observers = self.observers.clone();
1606        let emitter = DriverEmitter {
1607            observers: &observers,
1608        };
1609        let mut cursor = TranscriptCursor {
1610            items: &mut self.transcript,
1611            dirty: false,
1612        };
1613        for mutator in &mutators {
1614            if cancellation
1615                .as_ref()
1616                .is_some_and(TurnCancellation::is_cancelled)
1617            {
1618                return Err(LoopError::Cancelled);
1619            }
1620            let ctx = LoopCtx {
1621                session_id: &session_id,
1622                turn_id,
1623                point,
1624                cancellation: cancellation.clone(),
1625                emitter: &emitter,
1626            };
1627            mutator.mutate(&mut cursor, ctx).await?;
1628        }
1629        if cursor.dirty {
1630            validate_transcript_invariants(cursor.items)?;
1631        }
1632        Ok(())
1633    }
1634
    /// Advance the currently active tool round, if there is one.
    ///
    /// Returns `Ok(None)` immediately when no tool round is active. Otherwise
    /// loops, on each iteration:
    ///
    /// 1. taking a fresh cancellation checkpoint and finishing the turn as
    ///    cancelled if it is already cancelled;
    /// 2. launching the next queued tool call (if any) through the task
    ///    manager as a `TaskLaunchKind::Plain` task;
    /// 3. otherwise waiting on the task manager for the next update for this
    ///    turn and folding it into driver state.
    ///
    /// When the task manager reports no further updates (`None`), the round
    /// is closed: pending approval interrupts are surfaced first; a round
    /// that only left background work pending without any foreground progress
    /// returns `Ok(None)`; otherwise `pending_round_resume` is recorded and a
    /// [`LoopInterrupt::AfterToolResult`] interrupt is returned.
    ///
    /// # Errors
    ///
    /// Task-manager failures are mapped to
    /// `LoopError::Tool(ToolError::Internal(..))`; errors from starting a task
    /// are propagated as-is.
    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
        // Nothing to do unless a tool round is in flight.
        let Some(_) = self.active_tool_round.as_ref() else {
            return Ok(None);
        };
        loop {
            // Re-checkpoint cancellation on every iteration.
            let cancellation = self
                .cancellation
                .as_ref()
                .map(CancellationHandle::checkpoint);
            let turn_id = self
                .active_tool_round
                .as_ref()
                .map(|active| active.turn_id.clone())
                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

            // Cancelled before doing any work this iteration: notify the task
            // manager, drop the round, and finish the turn as cancelled.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                self.active_tool_round = None;
                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
            }

            // Launch queued calls one at a time, front first.
            let next_call = self
                .active_tool_round
                .as_mut()
                .and_then(|active| active.pending_calls.pop_front());
            if let Some((_call, tool_request)) = next_call {
                use tracing::Instrument;
                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
                match self
                    .start_task_via_manager(
                        None,
                        tool_request.clone(),
                        TaskLaunchKind::Plain,
                        cancellation.clone(),
                    )
                    .instrument(dispatch_span)
                    .await?
                {
                    // The task resolved immediately: either a tool result to
                    // append or an approval that must be queued.
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        match resolution {
                            TaskResolution::Item(item) => {
                                if let Some(active) = self.active_tool_round.as_mut() {
                                    active.foreground_progressed = true;
                                }
                                self.append_tool_result_item(item);
                            }
                            TaskResolution::Approval(task) => {
                                self.enqueue_pending_approval(&turn_id, task);
                            }
                        }
                        continue;
                    }
                    // The task is still running; remember if it is a
                    // background task so the round knows work is outstanding.
                    TaskStartOutcome::Pending { kind, .. } => {
                        if kind == agentkit_task_manager::TaskKind::Background
                            && let Some(active) = self.active_tool_round.as_mut()
                        {
                            active.background_pending = true;
                        }
                        continue;
                    }
                }
            }

            // No more calls to launch: wait for the next update for this turn.
            match self
                .task_manager
                .wait_for_turn(&turn_id, cancellation.clone())
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
            {
                Some(TurnTaskUpdate::Resolution(resolution)) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                }
                Some(TurnTaskUpdate::Detached(snapshot)) => {
                    // The task was promoted to background. Push a synthetic
                    // tool result so the model knows the call is still
                    // running and can continue its turn. Track the
                    // call_id so when the real result arrives later via
                    // the task manager, we route it to a Notification
                    // item instead of emitting a second tool_result for
                    // the same call_id (which would violate the
                    // provider schema — exactly one tool_result per
                    // tool_use).
                    // Order matters: append the synthetic placeholder FIRST as
                    // a real Tool/ToolResult so the tool_use slot is filled
                    // (provider schemas require exactly one tool_result per
                    // tool_use). Only AFTER appending do we record the
                    // call_id in `detached_call_ids` — so the *next* item
                    // for this call_id (the real completion arriving later
                    // via the task manager) is the one converted to a
                    // Notification by `maybe_convert_detached`.
                    let detached_call_id = snapshot.call_id.clone();
                    self.append_tool_result_item(Item {
                        id: None,
                        kind: ItemKind::Tool,
                        parts: vec![Part::ToolResult(ToolResultPart {
                            call_id: detached_call_id.clone(),
                            output: ToolOutput::Text(format!(
                                "Tool {} is now running in the background. \
                                 The result will be delivered when it completes.",
                                snapshot.tool_name,
                            )),
                            is_error: false,
                            metadata: MetadataMap::new(),
                        })],
                        metadata: MetadataMap::new(),
                        usage: None,
                        finish_reason: None,
                        created_at: None,
                    });
                    self.detached_call_ids.insert(detached_call_id);
                    // A detach counts both as outstanding background work and
                    // as foreground progress (the placeholder result landed).
                    if let Some(active) = self.active_tool_round.as_mut() {
                        active.background_pending = true;
                        active.foreground_progressed = true;
                    }
                }
                None => {
                    // `None` may also be the result of cancellation; check
                    // before treating it as the end of the round.
                    if cancellation
                        .as_ref()
                        .is_some_and(TurnCancellation::is_cancelled)
                    {
                        self.task_manager
                            .on_turn_interrupted(&turn_id)
                            .await
                            .map_err(|error| {
                                LoopError::Tool(ToolError::Internal(error.to_string()))
                            })?;
                        self.active_tool_round = None;
                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                    }
                    // The round is over: take it out of the driver so later
                    // calls see no active round.
                    let active = self.active_tool_round.take().ok_or_else(|| {
                        LoopError::InvalidState("missing active tool round".into())
                    })?;
                    // Approvals queued during the round are surfaced before
                    // anything else.
                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    if let Some(step) = self.next_unresolved_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    // Only background work is outstanding and nothing landed
                    // in the foreground: report "no step" and let the caller
                    // decide what to do next.
                    if active.background_pending && !active.foreground_progressed {
                        return Ok(None);
                    }
                    // Yield control back to the host between tool rounds.
                    // All tool calls in this round have results in the
                    // transcript; the transcript is provider-valid.  The
                    // host may submit_input before calling next() to
                    // resume, which will re-enter drive_turn via
                    // pending_round_resume.
                    let info = ToolRoundInfo {
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        transcript_len: self.transcript.len(),
                    };
                    self.pending_round_resume = Some(turn_id);
                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                        info,
                    ))));
                }
            }
        }
    }
1813
    /// Run one model turn for `turn_id` against the current transcript.
    ///
    /// Steps, in order: run the transcript mutators; optionally emit
    /// [`AgentEvent::TurnStarted`] (when `emit_started` is true); forward any
    /// drained tool-catalog events; begin a model turn with a clone of the
    /// transcript; stream model events to observers until a `Finished` event
    /// arrives; stamp and append the output items to the transcript; then
    /// either start a tool round (when the model requested tool calls) or
    /// emit [`AgentEvent::TurnFinished`] and return [`LoopStep::Finished`].
    ///
    /// Cancellation observed at any checkpoint ends the turn through
    /// `finish_cancelled`; once the model turn has been requested the task
    /// manager is also told the turn was interrupted.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] when no model session is
    /// available, [`LoopError::Provider`] when the event stream ends without
    /// a `Finished` event, and propagates mutator, provider, and tool errors.
    #[tracing::instrument(
        name = "agent.turn",
        skip_all,
        fields(
            session.id = %self.session_id,
            turn.id = %turn_id,
            transcript.len = self.transcript.len(),
            saw_tool_call = tracing::field::Empty,
            finish_reason = tracing::field::Empty,
        ),
    )]
    async fn drive_turn(
        &mut self,
        turn_id: agentkit_core::TurnId,
        emit_started: bool,
    ) -> Result<LoopStep, LoopError> {
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        // NOTE(review): `next` calls `pending_round_resume.take()` before
        // invoking this method, so on that path the flag is already `None`
        // here and `AfterTurnEnded` is selected — confirm that is intended.
        let mutation_point = if self.pending_round_resume.is_some() {
            MutationPoint::AfterToolResult
        } else {
            MutationPoint::AfterTurnEnded
        };
        match self
            .run_mutators(mutation_point, Some(&turn_id), cancellation.clone())
            .await
        {
            Ok(()) => {}
            Err(LoopError::Cancelled) => {
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        }
        if emit_started {
            self.emit(AgentEvent::TurnStarted {
                session_id: self.session_id.clone(),
                turn_id: turn_id.clone(),
            });
        }
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }

        // Forward tool-catalog events drained from the executor before the
        // tool specs are read for the request below.
        let catalog_events = self.tool_executor.drain_catalog_events();
        self.emit_tool_catalog_events(catalog_events);

        let request = TurnRequest {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
            transcript: self.transcript.clone(),
            available_tools: self.tool_executor.specs(),
            // A one-shot override (consumed here) wins over the session default.
            cache: self
                .next_turn_cache
                .take()
                .or_else(|| self.default_cache.clone()),
            metadata: MetadataMap::new(),
        };

        let session = self
            .session
            .as_mut()
            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
        let mut turn = match session.begin_turn(request, cancellation.clone()).await {
            Ok(turn) => turn,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        };
        let mut saw_tool_call = false;
        let mut finished_result = None;

        // Stream model events until `Finished` arrives or the stream ends.
        while let Some(event) = match turn.next_event(cancellation.clone()).await {
            Ok(event) => event,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        } {
            // Cancellation is also honoured between events, even when the
            // stream itself did not report it.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            match event {
                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
                ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
                ModelTurnEvent::ToolCall(call) => {
                    saw_tool_call = true;
                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
                }
                ModelTurnEvent::Finished(result) => {
                    finished_result = Some(result);
                    break;
                }
            }
        }

        let mut result = finished_result.ok_or_else(|| {
            LoopError::Provider("model turn ended without a Finished event".into())
        })?;
        // Fill in the span fields declared as `Empty` in the attribute above.
        tracing::Span::current().record("saw_tool_call", saw_tool_call);
        tracing::Span::current().record(
            "finish_reason",
            tracing::field::debug(&result.finish_reason),
        );
        let now = Timestamp::now();
        let usage = result.usage.clone();
        let finish_reason = result.finish_reason.clone();
        // Backfill missing per-item fields: assistant items inherit the
        // turn's usage and finish reason; every item gets a creation time.
        let output_items: Vec<Item> = result
            .output_items
            .drain(..)
            .map(|mut item| {
                if matches!(item.kind, ItemKind::Assistant) {
                    if item.usage.is_none() {
                        item.usage = usage.clone();
                    }
                    if item.finish_reason.is_none() {
                        item.finish_reason = Some(finish_reason.clone());
                    }
                }
                if item.created_at.is_none() {
                    item.created_at = Some(now);
                }
                item
            })
            .collect();
        self.extend_transcript(output_items.clone());

        if saw_tool_call {
            // Turn each tool call found in the output items into a request
            // bound to this session and turn, and queue them as a new round.
            let pending_calls = extract_tool_calls(&output_items)
                .into_iter()
                .map(|call| {
                    let tool_request = ToolRequest {
                        call_id: call.id.clone(),
                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                        input: call.input.clone(),
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        metadata: call.metadata.clone(),
                    };
                    (call, tool_request)
                })
                .collect();
            self.active_tool_round = Some(ActiveToolRound {
                turn_id: turn_id.clone(),
                pending_calls,
                background_pending: false,
                foreground_progressed: false,
            });
            if let Some(step) = self.continue_active_tool_round().await? {
                return Ok(step);
            }
            // The round produced no step (only background work pending):
            // hand control back to the host and wait for input.
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        let turn_result = TurnResult {
            turn_id,
            finish_reason: result.finish_reason,
            items: output_items,
            usage: result.usage,
            metadata: result.metadata,
        };
        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
        Ok(LoopStep::Finished(turn_result))
    }
2003
2004    async fn resume_after_approval(
2005        &mut self,
2006        pending: PendingApprovalToolCall,
2007    ) -> Result<LoopStep, LoopError> {
2008        let decision = pending
2009            .decision
2010            .clone()
2011            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
2012
2013        match decision {
2014            ApprovalDecision::Approve => {
2015                use tracing::Instrument;
2016                let dispatch_span =
2017                    self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
2018                match self
2019                    .start_task_via_manager(
2020                        Some(pending.task_id.clone()),
2021                        pending.tool_request.clone(),
2022                        TaskLaunchKind::Approved(pending.request.clone()),
2023                        self.cancellation
2024                            .as_ref()
2025                            .map(CancellationHandle::checkpoint),
2026                    )
2027                    .instrument(dispatch_span)
2028                    .await?
2029                {
2030                    TaskStartOutcome::Ready(resolution) => {
2031                        let resolution = *resolution;
2032                        if let Some(step) =
2033                            self.queue_resolution_interrupt(&pending.turn_id, resolution)
2034                        {
2035                            return Ok(step);
2036                        }
2037                    }
2038                    TaskStartOutcome::Pending { .. } => {}
2039                }
2040            }
2041            ApprovalDecision::Deny { reason } => {
2042                self.append_tool_result_item(Item {
2043                    id: None,
2044                    kind: ItemKind::Tool,
2045                    parts: vec![Part::ToolResult(ToolResultPart {
2046                        call_id: pending.call.id.clone(),
2047                        output: ToolOutput::Text(
2048                            reason.unwrap_or_else(|| "approval denied".into()),
2049                        ),
2050                        is_error: true,
2051                        metadata: pending.call.metadata.clone(),
2052                    })],
2053                    metadata: MetadataMap::new(),
2054                    usage: None,
2055                    finish_reason: None,
2056                    created_at: None,
2057                });
2058            }
2059        }
2060
2061        if let Some(step) = self.continue_active_tool_round().await? {
2062            Ok(step)
2063        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2064            Ok(step)
2065        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
2066            Ok(step)
2067        } else {
2068            self.drive_turn(pending.turn_id, false).await
2069        }
2070    }
2071
2072    fn finish_cancelled(
2073        &mut self,
2074        turn_id: agentkit_core::TurnId,
2075        items: Vec<Item>,
2076    ) -> Result<LoopStep, LoopError> {
2077        self.extend_transcript(items.clone());
2078        let turn_result = TurnResult {
2079            turn_id,
2080            finish_reason: FinishReason::Cancelled,
2081            items,
2082            usage: None,
2083            metadata: interrupted_metadata("turn"),
2084        };
2085        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
2086        Ok(LoopStep::Finished(turn_result))
2087    }
2088
2089    /// Internal entry point for buffering user input. Reachable only via
2090    /// [`InputRequest::submit`] (resolves an `AwaitingInput` interrupt,
2091    /// including the very first one after [`Agent::start`]) and
2092    /// [`ToolRoundInfo::submit`] (interjects between tool rounds). Prior
2093    /// transcript items — the passive starting state of a session — are
2094    /// preloaded via [`AgentBuilder::transcript`]; an opening user turn for
2095    /// one-shot calls is preloaded via [`AgentBuilder::input`]. New input
2096    /// after start-up always flows through one of the typed `submit`
2097    /// handles.
2098    pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
2099        if self.has_pending_interrupts() {
2100            return Err(LoopError::InvalidState(
2101                "cannot submit input while an interrupt is pending".into(),
2102            ));
2103        }
2104        self.emit(AgentEvent::InputAccepted {
2105            session_id: self.session_id.clone(),
2106            items: input.clone(),
2107        });
2108        self.pending_input.extend(input);
2109        Ok(())
2110    }
2111
2112    /// Override the prompt cache request for the next model turn.
2113    ///
2114    /// The override is consumed the next time the driver starts a model turn.
2115    /// Session-level defaults still apply to later turns.
2116    pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2117        if self.has_pending_interrupts() {
2118            return Err(LoopError::InvalidState(
2119                "cannot update next-turn cache while an interrupt is pending".into(),
2120            ));
2121        }
2122        self.next_turn_cache = Some(cache);
2123        Ok(())
2124    }
2125
2126    #[cfg(test)]
2127    pub(crate) fn submit_input_with_cache(
2128        &mut self,
2129        input: Vec<Item>,
2130        cache: PromptCacheRequest,
2131    ) -> Result<(), LoopError> {
2132        self.set_next_turn_cache(cache)?;
2133        self.submit_input(input)
2134    }
2135
2136    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`].
2137    ///
2138    /// After calling this, invoke [`next`](LoopDriver::next) to continue the
2139    /// loop.  If the decision is [`ApprovalDecision::Approve`] the tool call
2140    /// executes; if denied, an error result is fed back to the model.
2141    ///
2142    /// # Errors
2143    ///
2144    /// Returns [`LoopError::InvalidState`] if no approval is pending.
2145    pub fn resolve_approval_for(
2146        &mut self,
2147        call_id: ToolCallId,
2148        decision: ApprovalDecision,
2149    ) -> Result<(), LoopError> {
2150        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2151            return Err(LoopError::InvalidState(format!(
2152                "no approval request is pending for call {}",
2153                call_id.0
2154            )));
2155        };
2156        pending.decision = Some(decision.clone());
2157        self.emit(AgentEvent::ApprovalResolved {
2158            approved: matches!(decision, ApprovalDecision::Approve),
2159        });
2160        Ok(())
2161    }
2162
2163    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] with a patched
2164    /// input that replaces the model's original tool arguments.
2165    ///
2166    /// Equivalent to calling [`resolve_approval_for`] with
2167    /// [`ApprovalDecision::Approve`] except the tool sees `input` instead of
2168    /// what the model emitted. The transcript still records the model's
2169    /// original call.
2170    ///
2171    /// # Errors
2172    ///
2173    /// Returns [`LoopError::InvalidState`] if no approval is pending for
2174    /// `call_id`.
2175    pub fn resolve_approval_for_with_patched_input(
2176        &mut self,
2177        call_id: ToolCallId,
2178        input: serde_json::Value,
2179    ) -> Result<(), LoopError> {
2180        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2181            return Err(LoopError::InvalidState(format!(
2182                "no approval request is pending for call {}",
2183                call_id.0
2184            )));
2185        };
2186        pending.tool_request.input = input;
2187        self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2188    }
2189
2190    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] when exactly one
2191    /// approval is outstanding.
2192    pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2193        let mut unresolved = self
2194            .pending_approval_order
2195            .iter()
2196            .filter(|call_id| {
2197                self.pending_approvals
2198                    .get(*call_id)
2199                    .is_some_and(|pending| pending.decision.is_none())
2200            })
2201            .cloned();
2202        let Some(call_id) = unresolved.next() else {
2203            return Err(LoopError::InvalidState(
2204                "no approval request is pending".into(),
2205            ));
2206        };
2207        if unresolved.next().is_some() {
2208            return Err(LoopError::InvalidState(
2209                "multiple approvals are pending; use resolve_approval_for".into(),
2210            ));
2211        }
2212        self.resolve_approval_for(call_id, decision)
2213    }
2214
2215    /// Take a read-only snapshot of the driver's current transcript and input queue.
2216    pub fn snapshot(&self) -> LoopSnapshot {
2217        LoopSnapshot {
2218            session_id: self.session_id.clone(),
2219            transcript: self.transcript.clone(),
2220            pending_input: self.pending_input.clone(),
2221        }
2222    }
2223
2224    /// Advance the loop by one step.
2225    ///
2226    /// This is the main method for driving the agent.  It processes pending
2227    /// interrupt resolutions, consumes queued input, starts a model turn,
2228    /// executes tool calls, and returns once the turn finishes or an
2229    /// interrupt occurs.
2230    ///
2231    /// If no input is queued and no interrupt is pending, returns
2232    /// [`LoopStep::Interrupt(LoopInterrupt::AwaitingInput(..))`](LoopInterrupt::AwaitingInput).
2233    /// This is the steady state after [`Agent::start`] when no input was
2234    /// preloaded via [`AgentBuilder::input`]: the prior transcript loaded
2235    /// via [`AgentBuilder::transcript`] is passive, so the first call
2236    /// surfaces `AwaitingInput` and waits for the host to supply input via
2237    /// [`InputRequest::submit`] before any model turn is dispatched. If
2238    /// input was preloaded, the first call dispatches the model directly.
2239    ///
2240    /// # Errors
2241    ///
2242    /// Returns [`LoopError::InvalidState`] if called while an unresolved
2243    /// interrupt is pending, or propagates provider / tool / compaction errors.
2244    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2245        if let Some(pending) = self.take_next_resolved_approval() {
2246            return self.resume_after_approval(pending).await;
2247        }
2248
2249        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2250            return Ok(step);
2251        }
2252
2253        if let Some(step) = self.next_unresolved_approval_interrupt() {
2254            return Ok(step);
2255        }
2256
2257        if let Some(step) = self.continue_active_tool_round().await? {
2258            return Ok(step);
2259        }
2260
2261        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2262        if let Some(step) = loop_step {
2263            return Ok(step);
2264        }
2265
2266        // Resume after an AfterToolResult yield.  Any input submitted by the
2267        // host during the yield is folded into the transcript as part of the
2268        // continuation turn; background task results drained just above are
2269        // already in the transcript.
2270        if let Some(turn_id) = self.pending_round_resume.take() {
2271            let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2272            self.extend_transcript(drained);
2273            return self.drive_turn(turn_id, false).await;
2274        }
2275
2276        if self.pending_input.is_empty() && !had_loop_updates {
2277            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2278                InputRequest {
2279                    session_id: self.session_id.clone(),
2280                    reason: "driver is waiting for input".into(),
2281                },
2282            )));
2283        }
2284
2285        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2286        self.next_turn_index += 1;
2287        let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
2288        self.extend_transcript(drained);
2289        self.drive_turn(turn_id, true).await
2290    }
2291
2292    fn emit(&self, event: AgentEvent) {
2293        for observer in &self.observers {
2294            observer.handle_event(event.clone());
2295        }
2296    }
2297
2298    /// Append a single [`Item`] to the transcript and notify all
2299    /// registered [`TranscriptObserver`]s. The single mutation point —
2300    /// every push to `self.transcript` should funnel through here so
2301    /// observers see exactly what landed in the transcript.
2302    fn append_item(&mut self, mut item: Item) {
2303        if item.created_at.is_none() {
2304            item.created_at = Some(Timestamp::now());
2305        }
2306        for observer in &self.transcript_observers {
2307            observer.on_item_appended(&item);
2308        }
2309        self.transcript.push(item);
2310    }
2311
2312    /// Append a tool-result Item: emit one [`AgentEvent::ToolResultReceived`]
2313    /// per [`Part::ToolResult`] inside the Item, then funnel through
2314    /// [`Self::append_item`].
2315    ///
2316    /// If every `ToolResult` in the item references a `call_id` that was
2317    /// already paired with a synthetic detach tool_result, the item is
2318    /// converted to a [`ItemKind::Notification`] before appending.
2319    /// Without this, we would emit a second `tool_result` for the same
2320    /// `tool_use_id` — a provider-schema violation that
2321    /// Anthropic/OpenRouter reject as an "orphaned tool_result".
2322    /// Observers still see `ToolResultReceived` for each result so any
2323    /// UI spinner or task tracker can close.
2324    fn append_tool_result_item(&mut self, item: Item) {
2325        for part in &item.parts {
2326            if let Part::ToolResult(result) = part {
2327                self.emit(AgentEvent::ToolResultReceived(result.clone()));
2328            }
2329        }
2330        let item = self.maybe_convert_detached(item);
2331        self.append_item(item);
2332    }
2333
2334    fn maybe_convert_detached(&mut self, item: Item) -> Item {
2335        if !matches!(item.kind, ItemKind::Tool) {
2336            return item;
2337        }
2338        let results: Vec<&ToolResultPart> = item
2339            .parts
2340            .iter()
2341            .filter_map(|p| match p {
2342                Part::ToolResult(r) => Some(r),
2343                _ => None,
2344            })
2345            .collect();
2346        if results.is_empty()
2347            || !results
2348                .iter()
2349                .all(|r| self.detached_call_ids.contains(&r.call_id))
2350        {
2351            return item;
2352        }
2353        let mut text = String::new();
2354        for result in &results {
2355            self.detached_call_ids.remove(&result.call_id);
2356            if !text.is_empty() {
2357                text.push_str("\n\n");
2358            }
2359            let label = if result.is_error {
2360                "failed"
2361            } else {
2362                "completed"
2363            };
2364            let body = render_tool_output_brief(&result.output);
2365            text.push_str(&format!(
2366                "Background tool call {} {}: {body}",
2367                result.call_id.0, label
2368            ));
2369        }
2370        Item::notification(text)
2371    }
2372
2373    /// Append several Items in order through [`Self::append_item`].
2374    /// Pre-stamps `created_at` once per batch so all items in the batch
2375    /// share a timestamp and `append_item` skips its own clock read.
2376    fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2377        let now = Timestamp::now();
2378        for mut item in items {
2379            if item.created_at.is_none() {
2380                item.created_at = Some(now);
2381            }
2382            self.append_item(item);
2383        }
2384    }
2385}
2386
2387fn render_tool_output_brief(output: &ToolOutput) -> String {
2388    match output {
2389        ToolOutput::Text(t) => t.clone(),
2390        ToolOutput::Structured(value) => value.to_string(),
2391        ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2392        ToolOutput::Files(files) => format!("[{} files]", files.len()),
2393    }
2394}
2395
2396fn interrupted_metadata(stage: &str) -> MetadataMap {
2397    let mut metadata = MetadataMap::new();
2398    metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2399    metadata.insert(
2400        INTERRUPT_REASON_METADATA_KEY.into(),
2401        USER_CANCELLED_REASON.into(),
2402    );
2403    metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2404    metadata
2405}
2406
2407fn interrupted_assistant_items() -> Vec<Item> {
2408    vec![Item {
2409        id: None,
2410        kind: ItemKind::Assistant,
2411        parts: vec![Part::Text(TextPart {
2412            text: "Previous assistant response was interrupted by the user before completion."
2413                .into(),
2414            metadata: interrupted_metadata("assistant"),
2415        })],
2416        metadata: interrupted_metadata("assistant"),
2417        usage: None,
2418        finish_reason: None,
2419        created_at: None,
2420    }]
2421}
2422
2423fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2424    let mut calls = Vec::new();
2425    for item in items {
2426        for part in &item.parts {
2427            if let Part::ToolCall(call) = part {
2428                calls.push(call.clone());
2429            }
2430        }
2431    }
2432    calls
2433}
2434
2435/// Errors that can occur while driving the agent loop.
2436#[derive(Debug, Error)]
2437pub enum LoopError {
2438    /// The driver was in an unexpected state for the requested operation; the payload describes that state.
2439    #[error("invalid driver state: {0}")]
2440    InvalidState(String),
2441    /// The current turn was cancelled via the [`CancellationHandle`].
2442    #[error("turn cancelled")]
2443    Cancelled,
2444    /// An error originating from the model provider, carried as its message text.
2445    #[error("provider error: {0}")]
2446    Provider(String),
2447    /// An error originating from tool execution; `#[from]` lets a [`ToolError`] convert into this variant.
2448    #[error("tool error: {0}")]
2449    Tool(#[from] ToolError),
2450    /// An error reported by a [`LoopMutator`] (compaction, redaction, repair), including transcript-invariant violations found after a mutation.
2451    #[error("mutator error: {0}")]
2452    Mutator(String),
2453    /// The requested operation is not supported; the payload names the operation.
2454    #[error("unsupported operation: {0}")]
2455    Unsupported(String),
2456}
2457
2458/// Internal [`EventEmitter`] backed by the driver's observer slice. Lives
2459/// only for the duration of a [`LoopDriver::run_mutators`] call so the
2460/// borrow against `self.observers` stays disjoint from the cursor's borrow
2461/// of `self.transcript`.
2462struct DriverEmitter<'a> {
2463    observers: &'a [Arc<dyn LoopObserver>],
2464}
2465
2466impl<'a> EventEmitter for DriverEmitter<'a> {
2467    fn emit(&self, event: AgentEvent) {
2468        for observer in self.observers {
2469            observer.handle_event(event.clone());
2470        }
2471    }
2472}
2473
2474/// Hard-fails when a mutator's edit leaves the transcript protocol-invalid.
2475/// The only invariant currently checked is tool_use ↔ tool_result pairing
2476/// — every [`Part::ToolCall`] must be followed (in transcript order) by a
2477/// matching [`Part::ToolResult`] with the same `call_id`.
2478fn validate_transcript_invariants(transcript: &[Item]) -> Result<(), LoopError> {
2479    let mut pending: HashSet<ToolCallId> = HashSet::new();
2480    let mut seen_calls: HashSet<ToolCallId> = HashSet::new();
2481    let mut seen_results: HashSet<ToolCallId> = HashSet::new();
2482    for item in transcript {
2483        for part in &item.parts {
2484            match part {
2485                Part::ToolCall(call) => {
2486                    if !seen_calls.insert(call.id.clone()) {
2487                        return Err(LoopError::Mutator(format!(
2488                            "transcript invariant violation: duplicate tool_use: {}",
2489                            call.id.0
2490                        )));
2491                    }
2492                    pending.insert(call.id.clone());
2493                }
2494                Part::ToolResult(result) => {
2495                    if !pending.remove(&result.call_id) {
2496                        let kind = if seen_results.contains(&result.call_id) {
2497                            "duplicate"
2498                        } else {
2499                            "orphaned"
2500                        };
2501                        return Err(LoopError::Mutator(format!(
2502                            "transcript invariant violation: {kind} tool_result: {}",
2503                            result.call_id.0
2504                        )));
2505                    }
2506                    seen_results.insert(result.call_id.clone());
2507                }
2508                _ => {}
2509            }
2510        }
2511    }
2512    if !pending.is_empty() {
2513        let missing: Vec<String> = pending.into_iter().map(|id| id.0).collect();
2514        return Err(LoopError::Mutator(format!(
2515            "transcript invariant violation: tool_use(s) without matching tool_result: {}",
2516            missing.join(", ")
2517        )));
2518    }
2519    Ok(())
2520}
2521
2522#[cfg(test)]
2523mod tests {
2524    use std::collections::VecDeque;
2525    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2526    use std::sync::{Arc as StdArc, Mutex as StdMutex};
2527
2528    use agentkit_core::{
2529        CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolCallPart, ToolOutput,
2530        ToolResultPart,
2531    };
2532    use agentkit_task_manager::{
2533        AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2534        TaskRoutingPolicy,
2535    };
2536    use agentkit_tools_core::{
2537        FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2538        ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2539        ToolResult, ToolSpec,
2540    };
2541    use serde_json::{Value, json};
2542    use tokio::sync::Notify;
2543    use tokio::time::{Duration, timeout};
2544
2545    use super::*;
2546
2547    struct FakeAdapter; // adapter whose sessions are `FakeSession`
2548    struct SlowAdapter; // adapter whose sessions are `SlowSession`
2549    struct RecordingAdapter { // adapter that shares its recording buffers with each `RecordingSession`
2550        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>, // one entry per turn: the tool descriptions offered in that turn's request
2551        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>, // one entry per turn: the `cache` field of that turn's request
2552    }
2553    struct MultiToolAdapter; // adapter whose sessions are `MultiToolSession`
2554    struct DualApprovalAdapter; // adapter whose sessions are `DualApprovalSession`
2555
2556    struct FakeSession; // requests tool call `call-1`, then answers "tool said: ..." once a tool result is in the transcript
2557    struct SlowSession; // waits for cancellation when the latest user item says "do the long task"
2558    struct RecordingSession { // records what each turn request contained, then replies "done"
2559        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>, // shared with the owning `RecordingAdapter`
2560        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>, // shared with the owning `RecordingAdapter`
2561    }
2562    struct MultiToolSession; // requests `foreground-wait` and `background-wait` in one turn
2563    struct DualApprovalSession; // requests two `echo` calls and finishes once two tool results exist
2564
2565    struct FakeTurn { // turn that replays a prepared event queue
2566        events: VecDeque<ModelTurnEvent>, // events still to be returned, front first
2567    }
2568
2569    struct SlowTurn { // turn that yields a single "done" result unless already cancelled
2570        emitted: bool, // true once the single Finished event has been handed out
2571    }
2572
2573    struct RecordingTurn { // turn that yields a single "done" result
2574        emitted: bool, // true once the single Finished event has been handed out
2575    }
2576    struct MultiToolTurn { // turn that replays a prepared event queue
2577        events: VecDeque<ModelTurnEvent>, // events still to be returned, front first
2578    }
2579    struct DualApprovalTurn { // turn that replays a prepared event queue
2580        events: VecDeque<ModelTurnEvent>, // events still to be returned, front first
2581    }
2582
2583    #[async_trait]
2584    impl ModelAdapter for FakeAdapter {
2585        type Session = FakeSession;
2586
2587        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2588            Ok(FakeSession)
2589        }
2590    }
2591
2592    #[async_trait]
2593    impl ModelAdapter for SlowAdapter {
2594        type Session = SlowSession;
2595
2596        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2597            Ok(SlowSession)
2598        }
2599    }
2600
2601    #[async_trait]
2602    impl ModelAdapter for RecordingAdapter {
2603        type Session = RecordingSession;
2604
2605        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2606            Ok(RecordingSession {
2607                seen_descriptions: self.seen_descriptions.clone(),
2608                seen_caches: self.seen_caches.clone(),
2609            })
2610        }
2611    }
2612
2613    #[async_trait]
2614    impl ModelAdapter for MultiToolAdapter {
2615        type Session = MultiToolSession;
2616
2617        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2618            Ok(MultiToolSession)
2619        }
2620    }
2621
2622    #[async_trait]
2623    impl ModelAdapter for DualApprovalAdapter {
2624        type Session = DualApprovalSession;
2625
2626        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2627            Ok(DualApprovalSession)
2628        }
2629    }
2630
2631    #[async_trait]
2632    impl ModelSession for FakeSession {
2633        type Turn = FakeTurn;
2634
2635        async fn begin_turn(
2636            &mut self,
2637            request: TurnRequest,
2638            _cancellation: Option<TurnCancellation>,
2639        ) -> Result<Self::Turn, LoopError> {
2640            let has_tool_result = request.transcript.iter().any(|item| {
2641                item.kind == ItemKind::Tool
2642                    && item
2643                        .parts
2644                        .iter()
2645                        .any(|part| matches!(part, Part::ToolResult(_)))
2646            });
2647            let tool_name = request
2648                .available_tools
2649                .first()
2650                .map(|tool| tool.name.0.clone())
2651                .unwrap_or_else(|| "echo".into());
2652
2653            let events = if has_tool_result {
2654                let result_text = request
2655                    .transcript
2656                    .iter()
2657                    .rev()
2658                    .find_map(|item| {
2659                        item.parts.iter().find_map(|part| match part {
2660                            Part::ToolResult(ToolResultPart {
2661                                output: ToolOutput::Text(text),
2662                                ..
2663                            }) => Some(text.clone()),
2664                            _ => None,
2665                        })
2666                    })
2667                    .unwrap_or_else(|| "missing".into());
2668
2669                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2670                    finish_reason: FinishReason::Completed,
2671                    output_items: vec![Item {
2672                        id: None,
2673                        kind: ItemKind::Assistant,
2674                        parts: vec![Part::Text(TextPart {
2675                            text: format!("tool said: {result_text}"),
2676                            metadata: MetadataMap::new(),
2677                        })],
2678                        metadata: MetadataMap::new(),
2679                        usage: None,
2680                        finish_reason: None,
2681                        created_at: None,
2682                    }],
2683                    usage: None,
2684                    metadata: MetadataMap::new(),
2685                })])
2686            } else {
2687                VecDeque::from([
2688                    ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2689                        id: ToolCallId::new("call-1"),
2690                        name: tool_name.clone(),
2691                        input: json!({ "value": "pong" }),
2692                        metadata: MetadataMap::new(),
2693                    }),
2694                    ModelTurnEvent::Finished(ModelTurnResult {
2695                        finish_reason: FinishReason::ToolCall,
2696                        output_items: vec![Item {
2697                            id: None,
2698                            kind: ItemKind::Assistant,
2699                            parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2700                                id: ToolCallId::new("call-1"),
2701                                name: tool_name,
2702                                input: json!({ "value": "pong" }),
2703                                metadata: MetadataMap::new(),
2704                            })],
2705                            metadata: MetadataMap::new(),
2706                            usage: None,
2707                            finish_reason: None,
2708                            created_at: None,
2709                        }],
2710                        usage: None,
2711                        metadata: MetadataMap::new(),
2712                    }),
2713                ])
2714            };
2715
2716            Ok(FakeTurn { events })
2717        }
2718    }
2719
2720    #[async_trait]
2721    impl ModelSession for SlowSession {
2722        type Turn = SlowTurn;
2723
2724        async fn begin_turn(
2725            &mut self,
2726            request: TurnRequest,
2727            cancellation: Option<TurnCancellation>,
2728        ) -> Result<Self::Turn, LoopError> {
2729            let should_block = request
2730                .transcript
2731                .iter()
2732                .rev()
2733                .find(|item| item.kind == ItemKind::User)
2734                .is_some_and(|item| {
2735                    item.parts.iter().any(|part| match part {
2736                        Part::Text(text) => text.text == "do the long task",
2737                        _ => false,
2738                    })
2739                });
2740
2741            if should_block && let Some(cancellation) = cancellation {
2742                cancellation.cancelled().await;
2743                return Err(LoopError::Cancelled);
2744            }
2745
2746            Ok(SlowTurn { emitted: false })
2747        }
2748    }
2749
2750    #[async_trait]
2751    impl ModelSession for RecordingSession {
2752        type Turn = RecordingTurn;
2753
2754        async fn begin_turn(
2755            &mut self,
2756            request: TurnRequest,
2757            _cancellation: Option<TurnCancellation>,
2758        ) -> Result<Self::Turn, LoopError> {
2759            let descriptions = request
2760                .available_tools
2761                .iter()
2762                .map(|tool| tool.description.clone())
2763                .collect::<Vec<_>>();
2764            self.seen_descriptions.lock().unwrap().push(descriptions);
2765            self.seen_caches.lock().unwrap().push(request.cache.clone());
2766
2767            Ok(RecordingTurn { emitted: false })
2768        }
2769    }
2770
2771    #[async_trait]
2772    impl ModelSession for MultiToolSession {
2773        type Turn = MultiToolTurn;
2774
2775        async fn begin_turn(
2776            &mut self,
2777            request: TurnRequest,
2778            _cancellation: Option<TurnCancellation>,
2779        ) -> Result<Self::Turn, LoopError> {
2780            let has_tool_result = request.transcript.iter().any(|item| {
2781                item.kind == ItemKind::Tool
2782                    && item
2783                        .parts
2784                        .iter()
2785                        .any(|part| matches!(part, Part::ToolResult(_)))
2786            });
2787
2788            let events = if has_tool_result {
2789                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2790                    finish_reason: FinishReason::Completed,
2791                    output_items: vec![Item {
2792                        id: None,
2793                        kind: ItemKind::Assistant,
2794                        parts: vec![Part::Text(TextPart {
2795                            text: "mixed tools finished".into(),
2796                            metadata: MetadataMap::new(),
2797                        })],
2798                        metadata: MetadataMap::new(),
2799                        usage: None,
2800                        finish_reason: None,
2801                        created_at: None,
2802                    }],
2803                    usage: None,
2804                    metadata: MetadataMap::new(),
2805                })])
2806            } else {
2807                let foreground = agentkit_core::ToolCallPart {
2808                    id: ToolCallId::new("call-foreground"),
2809                    name: "foreground-wait".into(),
2810                    input: json!({}),
2811                    metadata: MetadataMap::new(),
2812                };
2813                let background = agentkit_core::ToolCallPart {
2814                    id: ToolCallId::new("call-background"),
2815                    name: "background-wait".into(),
2816                    input: json!({}),
2817                    metadata: MetadataMap::new(),
2818                };
2819                VecDeque::from([
2820                    ModelTurnEvent::ToolCall(foreground.clone()),
2821                    ModelTurnEvent::ToolCall(background.clone()),
2822                    ModelTurnEvent::Finished(ModelTurnResult {
2823                        finish_reason: FinishReason::ToolCall,
2824                        output_items: vec![Item {
2825                            id: None,
2826                            kind: ItemKind::Assistant,
2827                            parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2828                            metadata: MetadataMap::new(),
2829                            usage: None,
2830                            finish_reason: None,
2831                            created_at: None,
2832                        }],
2833                        usage: None,
2834                        metadata: MetadataMap::new(),
2835                    }),
2836                ])
2837            };
2838
2839            Ok(MultiToolTurn { events })
2840        }
2841    }
2842
2843    #[async_trait]
2844    impl ModelSession for DualApprovalSession {
2845        type Turn = DualApprovalTurn;
2846
2847        async fn begin_turn(
2848            &mut self,
2849            request: TurnRequest,
2850            _cancellation: Option<TurnCancellation>,
2851        ) -> Result<Self::Turn, LoopError> {
2852            let tool_results = request
2853                .transcript
2854                .iter()
2855                .flat_map(|item| item.parts.iter())
2856                .filter(|part| matches!(part, Part::ToolResult(_)))
2857                .count();
2858
2859            let events = if tool_results >= 2 {
2860                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2861                    finish_reason: FinishReason::Completed,
2862                    output_items: vec![Item {
2863                        id: None,
2864                        kind: ItemKind::Assistant,
2865                        parts: vec![Part::Text(TextPart {
2866                            text: "both approvals finished".into(),
2867                            metadata: MetadataMap::new(),
2868                        })],
2869                        metadata: MetadataMap::new(),
2870                        usage: None,
2871                        finish_reason: None,
2872                        created_at: None,
2873                    }],
2874                    usage: None,
2875                    metadata: MetadataMap::new(),
2876                })])
2877            } else {
2878                let first = agentkit_core::ToolCallPart {
2879                    id: ToolCallId::new("call-1"),
2880                    name: "echo".into(),
2881                    input: json!({ "value": "first" }),
2882                    metadata: MetadataMap::new(),
2883                };
2884                let second = agentkit_core::ToolCallPart {
2885                    id: ToolCallId::new("call-2"),
2886                    name: "echo".into(),
2887                    input: json!({ "value": "second" }),
2888                    metadata: MetadataMap::new(),
2889                };
2890                VecDeque::from([
2891                    ModelTurnEvent::ToolCall(first.clone()),
2892                    ModelTurnEvent::ToolCall(second.clone()),
2893                    ModelTurnEvent::Finished(ModelTurnResult {
2894                        finish_reason: FinishReason::ToolCall,
2895                        output_items: vec![Item {
2896                            id: None,
2897                            kind: ItemKind::Assistant,
2898                            parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2899                            metadata: MetadataMap::new(),
2900                            usage: None,
2901                            finish_reason: None,
2902                            created_at: None,
2903                        }],
2904                        usage: None,
2905                        metadata: MetadataMap::new(),
2906                    }),
2907                ])
2908            };
2909
2910            Ok(DualApprovalTurn { events })
2911        }
2912    }
2913
2914    #[async_trait]
2915    impl ModelTurn for FakeTurn {
2916        async fn next_event(
2917            &mut self,
2918            _cancellation: Option<TurnCancellation>,
2919        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2920            Ok(self.events.pop_front())
2921        }
2922    }
2923
2924    #[async_trait]
2925    impl ModelTurn for SlowTurn {
2926        async fn next_event(
2927            &mut self,
2928            cancellation: Option<TurnCancellation>,
2929        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2930            if let Some(cancellation) = cancellation
2931                && cancellation.is_cancelled()
2932            {
2933                return Err(LoopError::Cancelled);
2934            }
2935
2936            if self.emitted {
2937                Ok(None)
2938            } else {
2939                self.emitted = true;
2940                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2941                    finish_reason: FinishReason::Completed,
2942                    output_items: vec![Item {
2943                        id: None,
2944                        kind: ItemKind::Assistant,
2945                        parts: vec![Part::Text(TextPart {
2946                            text: "done".into(),
2947                            metadata: MetadataMap::new(),
2948                        })],
2949                        metadata: MetadataMap::new(),
2950                        usage: None,
2951                        finish_reason: None,
2952                        created_at: None,
2953                    }],
2954                    usage: None,
2955                    metadata: MetadataMap::new(),
2956                })))
2957            }
2958        }
2959    }
2960
2961    #[async_trait]
2962    impl ModelTurn for RecordingTurn {
2963        async fn next_event(
2964            &mut self,
2965            _cancellation: Option<TurnCancellation>,
2966        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2967            if self.emitted {
2968                Ok(None)
2969            } else {
2970                self.emitted = true;
2971                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2972                    finish_reason: FinishReason::Completed,
2973                    output_items: vec![Item {
2974                        id: None,
2975                        kind: ItemKind::Assistant,
2976                        parts: vec![Part::Text(TextPart {
2977                            text: "done".into(),
2978                            metadata: MetadataMap::new(),
2979                        })],
2980                        metadata: MetadataMap::new(),
2981                        usage: None,
2982                        finish_reason: None,
2983                        created_at: None,
2984                    }],
2985                    usage: None,
2986                    metadata: MetadataMap::new(),
2987                })))
2988            }
2989        }
2990    }
2991
2992    #[async_trait]
2993    impl ModelTurn for MultiToolTurn {
2994        async fn next_event(
2995            &mut self,
2996            _cancellation: Option<TurnCancellation>,
2997        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2998            Ok(self.events.pop_front())
2999        }
3000    }
3001
3002    #[async_trait]
3003    impl ModelTurn for DualApprovalTurn {
3004        async fn next_event(
3005            &mut self,
3006            _cancellation: Option<TurnCancellation>,
3007        ) -> Result<Option<ModelTurnEvent>, LoopError> {
3008            Ok(self.events.pop_front())
3009        }
3010    }
3011
3012    #[derive(Clone)]
3013    struct EchoTool {
3014        spec: ToolSpec,
3015    }
3016
3017    impl Default for EchoTool {
3018        fn default() -> Self {
3019            Self {
3020                spec: ToolSpec {
3021                    name: ToolName::new("echo"),
3022                    description: "Echo back a value".into(),
3023                    input_schema: json!({
3024                        "type": "object",
3025                        "properties": {
3026                            "value": { "type": "string" }
3027                        },
3028                        "required": ["value"],
3029                        "additionalProperties": false
3030                    }),
3031                    annotations: ToolAnnotations::default(),
3032                    metadata: MetadataMap::new(),
3033                },
3034            }
3035        }
3036    }
3037
3038    #[derive(Clone)]
3039    struct DynamicSpecTool {
3040        spec: ToolSpec,
3041        version: StdArc<AtomicUsize>,
3042    }
3043
3044    impl DynamicSpecTool {
3045        fn new(version: StdArc<AtomicUsize>) -> Self {
3046            Self {
3047                spec: ToolSpec {
3048                    name: ToolName::new("dynamic"),
3049                    description: "dynamic version 0".into(),
3050                    input_schema: json!({
3051                        "type": "object",
3052                        "properties": {},
3053                        "additionalProperties": false
3054                    }),
3055                    annotations: ToolAnnotations::default(),
3056                    metadata: MetadataMap::new(),
3057                },
3058                version,
3059            }
3060        }
3061    }
3062
3063    #[async_trait]
3064    impl Tool for EchoTool {
3065        fn spec(&self) -> &ToolSpec {
3066            &self.spec
3067        }
3068
3069        fn proposed_requests(
3070            &self,
3071            request: &agentkit_tools_core::ToolRequest,
3072        ) -> Result<
3073            Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
3074            agentkit_tools_core::ToolError,
3075        > {
3076            Ok(vec![Box::new(FileSystemPermissionRequest::Read {
3077                path: "/tmp/echo".into(),
3078                metadata: request.metadata.clone(),
3079            })])
3080        }
3081
3082        async fn invoke(
3083            &self,
3084            request: agentkit_tools_core::ToolRequest,
3085            _ctx: &mut ToolContext<'_>,
3086        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3087            let value = request
3088                .input
3089                .get("value")
3090                .and_then(Value::as_str)
3091                .ok_or_else(|| {
3092                    agentkit_tools_core::ToolError::InvalidInput("missing value".into())
3093                })?;
3094
3095            Ok(ToolResult {
3096                result: ToolResultPart {
3097                    call_id: request.call_id,
3098                    output: ToolOutput::Text(value.into()),
3099                    is_error: false,
3100                    metadata: MetadataMap::new(),
3101                },
3102                duration: None,
3103                metadata: MetadataMap::new(),
3104            })
3105        }
3106    }
3107
3108    #[async_trait]
3109    impl Tool for DynamicSpecTool {
3110        fn spec(&self) -> &ToolSpec {
3111            &self.spec
3112        }
3113
3114        fn current_spec(&self) -> Option<ToolSpec> {
3115            let mut spec = self.spec.clone();
3116            spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
3117            Some(spec)
3118        }
3119
3120        async fn invoke(
3121            &self,
3122            request: agentkit_tools_core::ToolRequest,
3123            _ctx: &mut ToolContext<'_>,
3124        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3125            Ok(ToolResult {
3126                result: ToolResultPart {
3127                    call_id: request.call_id,
3128                    output: ToolOutput::Text("ok".into()),
3129                    is_error: false,
3130                    metadata: MetadataMap::new(),
3131                },
3132                duration: None,
3133                metadata: MetadataMap::new(),
3134            })
3135        }
3136    }
3137
3138    struct DenyFsReads;
3139
3140    impl PermissionChecker for DenyFsReads {
3141        fn evaluate(
3142            &self,
3143            request: &dyn agentkit_tools_core::PermissionRequest,
3144        ) -> PermissionDecision {
3145            if request.kind() == "filesystem.read" {
3146                return PermissionDecision::Deny(PermissionDenial {
3147                    code: PermissionCode::PathNotAllowed,
3148                    message: "reads denied in test".into(),
3149                    metadata: MetadataMap::new(),
3150                });
3151            }
3152
3153            PermissionDecision::Allow
3154        }
3155    }
3156
3157    struct ApproveFsReads;
3158
3159    impl PermissionChecker for ApproveFsReads {
3160        fn evaluate(
3161            &self,
3162            request: &dyn agentkit_tools_core::PermissionRequest,
3163        ) -> PermissionDecision {
3164            if request.kind() == "filesystem.read" {
3165                return PermissionDecision::RequireApproval(ApprovalRequest {
3166                    task_id: None,
3167                    call_id: None,
3168                    id: "approval:fs-read".into(),
3169                    request_kind: request.kind().into(),
3170                    reason: agentkit_tools_core::ApprovalReason::SensitivePath,
3171                    summary: request.summary(),
3172                    metadata: request.metadata().clone(),
3173                });
3174            }
3175
3176            PermissionDecision::Allow
3177        }
3178    }
3179
3180    struct KeepRecentMutator {
3181        keep: usize,
3182    }
3183
3184    #[async_trait]
3185    impl LoopMutator for KeepRecentMutator {
3186        async fn mutate(
3187            &self,
3188            cursor: &mut TranscriptCursor<'_>,
3189            ctx: LoopCtx<'_>,
3190        ) -> Result<(), LoopError> {
3191            if cursor.len() < 2 {
3192                return Ok(());
3193            }
3194            let drop = cursor.len().saturating_sub(self.keep);
3195            ctx.emitter.emit(AgentEvent::MutationStarted {
3196                session_id: ctx.session_id.clone(),
3197                turn_id: ctx.turn_id.cloned(),
3198                mutator: "keep-recent".into(),
3199                point: ctx.point,
3200            });
3201            cursor.drain(..drop);
3202            ctx.emitter.emit(AgentEvent::MutationFinished {
3203                session_id: ctx.session_id.clone(),
3204                turn_id: ctx.turn_id.cloned(),
3205                mutator: "keep-recent".into(),
3206                dirty: true,
3207                metadata: MetadataMap::new(),
3208            });
3209            Ok(())
3210        }
3211    }
3212
3213    struct RecordingObserver {
3214        events: StdArc<StdMutex<Vec<AgentEvent>>>,
3215    }
3216
3217    impl LoopObserver for RecordingObserver {
3218        fn handle_event(&self, event: AgentEvent) {
3219            self.events.lock().unwrap().push(event);
3220        }
3221    }
3222
3223    struct CatalogExecutor {
3224        version: AtomicUsize,
3225        events: StdMutex<Vec<ToolCatalogEvent>>,
3226    }
3227
3228    impl CatalogExecutor {
3229        fn new() -> Self {
3230            Self {
3231                version: AtomicUsize::new(0),
3232                events: StdMutex::new(Vec::new()),
3233            }
3234        }
3235
3236        fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3237            self.version.store(version, Ordering::SeqCst);
3238            self.events.lock().unwrap().push(event);
3239        }
3240    }
3241
3242    #[async_trait]
3243    impl ToolExecutor for CatalogExecutor {
3244        fn specs(&self) -> Vec<ToolSpec> {
3245            vec![ToolSpec {
3246                name: ToolName::new("dynamic"),
3247                description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3248                input_schema: json!({
3249                    "type": "object",
3250                    "properties": {},
3251                    "additionalProperties": false
3252                }),
3253                annotations: ToolAnnotations::default(),
3254                metadata: MetadataMap::new(),
3255            }]
3256        }
3257
3258        fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3259            std::mem::take(&mut *self.events.lock().unwrap())
3260        }
3261
3262        async fn execute(
3263            &self,
3264            request: ToolRequest,
3265            _ctx: &mut ToolContext<'_>,
3266        ) -> ToolExecutionOutcome {
3267            ToolExecutionOutcome::Completed(ToolResult {
3268                result: ToolResultPart {
3269                    call_id: request.call_id,
3270                    output: ToolOutput::Text("dynamic-ok".into()),
3271                    is_error: false,
3272                    metadata: MetadataMap::new(),
3273                },
3274                duration: None,
3275                metadata: MetadataMap::new(),
3276            })
3277        }
3278    }
3279
3280    #[derive(Clone)]
3281    struct BlockingTool {
3282        spec: ToolSpec,
3283        entered: StdArc<AtomicBool>,
3284        release: StdArc<Notify>,
3285        output: &'static str,
3286    }
3287
3288    impl BlockingTool {
3289        fn new(
3290            name: &str,
3291            entered: StdArc<AtomicBool>,
3292            release: StdArc<Notify>,
3293            output: &'static str,
3294        ) -> Self {
3295            Self {
3296                spec: ToolSpec {
3297                    name: ToolName::new(name),
3298                    description: format!("blocking tool {name}"),
3299                    input_schema: json!({
3300                        "type": "object",
3301                        "properties": {},
3302                        "additionalProperties": false
3303                    }),
3304                    annotations: ToolAnnotations::default(),
3305                    metadata: MetadataMap::new(),
3306                },
3307                entered,
3308                release,
3309                output,
3310            }
3311        }
3312    }
3313
3314    #[async_trait]
3315    impl Tool for BlockingTool {
3316        fn spec(&self) -> &ToolSpec {
3317            &self.spec
3318        }
3319
3320        async fn invoke(
3321            &self,
3322            request: agentkit_tools_core::ToolRequest,
3323            _ctx: &mut ToolContext<'_>,
3324        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3325            self.entered.store(true, Ordering::SeqCst);
3326            self.release.notified().await;
3327            Ok(ToolResult {
3328                result: ToolResultPart {
3329                    call_id: request.call_id,
3330                    output: ToolOutput::Text(self.output.into()),
3331                    is_error: false,
3332                    metadata: MetadataMap::new(),
3333                },
3334                duration: None,
3335                metadata: MetadataMap::new(),
3336            })
3337        }
3338    }
3339
3340    struct NameRoutingPolicy {
3341        routes: Vec<(String, RoutingDecision)>,
3342    }
3343
3344    impl NameRoutingPolicy {
3345        fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3346            Self {
3347                routes: routes
3348                    .into_iter()
3349                    .map(|(name, decision)| (name.into(), decision))
3350                    .collect(),
3351            }
3352        }
3353    }
3354
3355    impl TaskRoutingPolicy for NameRoutingPolicy {
3356        fn route(&self, request: &ToolRequest) -> RoutingDecision {
3357            self.routes
3358                .iter()
3359                .find(|(name, _)| name == &request.tool_name.0)
3360                .map(|(_, decision)| *decision)
3361                .unwrap_or(RoutingDecision::Foreground)
3362        }
3363    }
3364
3365    async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3366        timeout(Duration::from_secs(1), handle.next_event())
3367            .await
3368            .expect("timed out waiting for task event")
3369            .expect("task event stream ended unexpectedly")
3370    }
3371
3372    async fn wait_until_entered(flag: &AtomicBool) {
3373        timeout(Duration::from_secs(1), async {
3374            while !flag.load(Ordering::SeqCst) {
3375                tokio::task::yield_now().await;
3376            }
3377        })
3378        .await
3379        .expect("task never entered execution");
3380    }
3381
3382    #[tokio::test]
3383    async fn loop_continues_after_completed_tool_call() {
3384        let tools = ToolRegistry::new().with(EchoTool::default());
3385        let agent = Agent::builder()
3386            .model(FakeAdapter)
3387            .add_tool_source(tools)
3388            .permissions(AllowAllPermissions)
3389            .build()
3390            .unwrap();
3391
3392        let mut driver = agent
3393            .start(SessionConfig {
3394                session_id: SessionId::new("session-1"),
3395                metadata: MetadataMap::new(),
3396                cache: None,
3397            })
3398            .await
3399            .unwrap();
3400
3401        driver
3402            .submit_input(vec![Item {
3403                id: None,
3404                kind: ItemKind::User,
3405                parts: vec![Part::Text(TextPart {
3406                    text: "ping".into(),
3407                    metadata: MetadataMap::new(),
3408                })],
3409                metadata: MetadataMap::new(),
3410                usage: None,
3411                finish_reason: None,
3412                created_at: None,
3413            }])
3414            .unwrap();
3415
3416        let result = run_until_finished(&mut driver).await;
3417
3418        match result {
3419            LoopStep::Finished(turn) => {
3420                assert_eq!(turn.finish_reason, FinishReason::Completed);
3421                assert_eq!(turn.items.len(), 1);
3422                match &turn.items[0].parts[0] {
3423                    Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3424                    other => panic!("unexpected part: {other:?}"),
3425                }
3426            }
3427            other => panic!("unexpected loop step: {other:?}"),
3428        }
3429    }
3430
3431    /// Test helper: drives the loop, transparently resuming non-blocking
3432    /// cooperative interrupts (AfterToolResult), until a terminal step or a
3433    /// blocking interrupt is reached.
3434    async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3435        loop {
3436            match driver.next().await.unwrap() {
3437                LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3438                step => return step,
3439            }
3440        }
3441    }
3442
3443    #[tokio::test]
3444    async fn loop_uses_injected_permission_checker() {
3445        let tools = ToolRegistry::new().with(EchoTool::default());
3446        let agent = Agent::builder()
3447            .model(FakeAdapter)
3448            .add_tool_source(tools)
3449            .permissions(DenyFsReads)
3450            .build()
3451            .unwrap();
3452
3453        let mut driver = agent
3454            .start(SessionConfig {
3455                session_id: SessionId::new("session-2"),
3456                metadata: MetadataMap::new(),
3457                cache: None,
3458            })
3459            .await
3460            .unwrap();
3461
3462        driver
3463            .submit_input(vec![Item {
3464                id: None,
3465                kind: ItemKind::User,
3466                parts: vec![Part::Text(TextPart {
3467                    text: "ping".into(),
3468                    metadata: MetadataMap::new(),
3469                })],
3470                metadata: MetadataMap::new(),
3471                usage: None,
3472                finish_reason: None,
3473                created_at: None,
3474            }])
3475            .unwrap();
3476
3477        let result = run_until_finished(&mut driver).await;
3478
3479        match result {
3480            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3481                Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3482                other => panic!("unexpected part: {other:?}"),
3483            },
3484            other => panic!("unexpected loop step: {other:?}"),
3485        }
3486    }
3487
3488    #[tokio::test]
3489    async fn async_task_manager_background_round_requires_explicit_continue() {
3490        let entered = StdArc::new(AtomicBool::new(false));
3491        let release = StdArc::new(Notify::new());
3492        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3493            "background-wait",
3494            RoutingDecision::Background,
3495        )]));
3496        let handle = task_manager.handle();
3497        let tools = ToolRegistry::new().with(BlockingTool::new(
3498            "background-wait",
3499            entered.clone(),
3500            release.clone(),
3501            "background-done",
3502        ));
3503        let agent = Agent::builder()
3504            .model(FakeAdapter)
3505            .add_tool_source(tools)
3506            .permissions(AllowAllPermissions)
3507            .task_manager(task_manager)
3508            .build()
3509            .unwrap();
3510
3511        let mut driver = agent
3512            .start(SessionConfig {
3513                session_id: SessionId::new("session-background"),
3514                metadata: MetadataMap::new(),
3515                cache: None,
3516            })
3517            .await
3518            .unwrap();
3519
3520        driver
3521            .submit_input(vec![Item {
3522                id: None,
3523                kind: ItemKind::User,
3524                parts: vec![Part::Text(TextPart {
3525                    text: "ping".into(),
3526                    metadata: MetadataMap::new(),
3527                })],
3528                metadata: MetadataMap::new(),
3529                usage: None,
3530                finish_reason: None,
3531                created_at: None,
3532            }])
3533            .unwrap();
3534
3535        let first = driver.next().await.unwrap();
3536        match first {
3537            LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3538            other => panic!("unexpected first loop step: {other:?}"),
3539        }
3540
3541        match wait_for_task_event(&handle).await {
3542            TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3543            other => panic!("unexpected task event: {other:?}"),
3544        }
3545        wait_until_entered(entered.as_ref()).await;
3546        release.notify_waiters();
3547
3548        match wait_for_task_event(&handle).await {
3549            TaskEvent::Completed(_, result) => {
3550                assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3551            }
3552            other => panic!("unexpected completion event: {other:?}"),
3553        }
3554
3555        let resumed = driver.next().await.unwrap();
3556        match resumed {
3557            LoopStep::Finished(turn) => {
3558                assert_eq!(turn.finish_reason, FinishReason::Completed);
3559                match &turn.items[0].parts[0] {
3560                    Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3561                    other => panic!("unexpected part after resume: {other:?}"),
3562                }
3563            }
3564            other => panic!("unexpected resumed step: {other:?}"),
3565        }
3566    }
3567
3568    #[tokio::test]
3569    async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3570        let controller = CancellationController::new();
3571        let agent = Agent::builder()
3572            .model(SlowAdapter)
3573            .cancellation(controller.handle())
3574            .build()
3575            .unwrap();
3576
3577        let mut driver = agent
3578            .start(SessionConfig {
3579                session_id: SessionId::new("session-cancel"),
3580                metadata: MetadataMap::new(),
3581                cache: None,
3582            })
3583            .await
3584            .unwrap();
3585
3586        driver
3587            .submit_input(vec![Item {
3588                id: None,
3589                kind: ItemKind::User,
3590                parts: vec![Part::Text(TextPart {
3591                    text: "do the long task".into(),
3592                    metadata: MetadataMap::new(),
3593                })],
3594                metadata: MetadataMap::new(),
3595                usage: None,
3596                finish_reason: None,
3597                created_at: None,
3598            }])
3599            .unwrap();
3600
3601        let cancelled = tokio::join!(async { driver.next().await }, async {
3602            tokio::task::yield_now().await;
3603            controller.interrupt();
3604        })
3605        .0
3606        .unwrap();
3607
3608        match cancelled {
3609            LoopStep::Finished(turn) => {
3610                assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3611                assert_eq!(turn.items.len(), 1);
3612                assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3613                assert_eq!(
3614                    turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3615                    Some(&Value::Bool(true))
3616                );
3617            }
3618            other => panic!("unexpected loop step: {other:?}"),
3619        }
3620
3621        driver
3622            .submit_input(vec![Item {
3623                id: None,
3624                kind: ItemKind::User,
3625                parts: vec![Part::Text(TextPart {
3626                    text: "try again".into(),
3627                    metadata: MetadataMap::new(),
3628                })],
3629                metadata: MetadataMap::new(),
3630                usage: None,
3631                finish_reason: None,
3632                created_at: None,
3633            }])
3634            .unwrap();
3635
3636        let result = driver.next().await.unwrap();
3637        match result {
3638            LoopStep::Finished(turn) => {
3639                assert_eq!(turn.finish_reason, FinishReason::Completed);
3640            }
3641            other => panic!("unexpected loop step after retry: {other:?}"),
3642        }
3643    }
3644
3645    #[tokio::test]
3646    async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3647        let controller = CancellationController::new();
3648        let fg_entered = StdArc::new(AtomicBool::new(false));
3649        let fg_release = StdArc::new(Notify::new());
3650        let bg_entered = StdArc::new(AtomicBool::new(false));
3651        let bg_release = StdArc::new(Notify::new());
3652        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3653            ("foreground-wait", RoutingDecision::Foreground),
3654            ("background-wait", RoutingDecision::Background),
3655        ]));
3656        let handle = task_manager.handle();
3657        let tools = ToolRegistry::new()
3658            .with(BlockingTool::new(
3659                "foreground-wait",
3660                fg_entered.clone(),
3661                fg_release,
3662                "foreground-done",
3663            ))
3664            .with(BlockingTool::new(
3665                "background-wait",
3666                bg_entered.clone(),
3667                bg_release.clone(),
3668                "background-done",
3669            ));
3670        let agent = Agent::builder()
3671            .model(MultiToolAdapter)
3672            .add_tool_source(tools)
3673            .permissions(AllowAllPermissions)
3674            .cancellation(controller.handle())
3675            .task_manager(task_manager)
3676            .build()
3677            .unwrap();
3678
3679        let mut driver = agent
3680            .start(SessionConfig {
3681                session_id: SessionId::new("session-mixed-cancel"),
3682                metadata: MetadataMap::new(),
3683                cache: None,
3684            })
3685            .await
3686            .unwrap();
3687
3688        driver
3689            .submit_input(vec![Item {
3690                id: None,
3691                kind: ItemKind::User,
3692                parts: vec![Part::Text(TextPart {
3693                    text: "run both".into(),
3694                    metadata: MetadataMap::new(),
3695                })],
3696                metadata: MetadataMap::new(),
3697                usage: None,
3698                finish_reason: None,
3699                created_at: None,
3700            }])
3701            .unwrap();
3702
3703        let cancelled = tokio::join!(async { driver.next().await }, async {
3704            let _ = wait_for_task_event(&handle).await;
3705            let _ = wait_for_task_event(&handle).await;
3706            wait_until_entered(fg_entered.as_ref()).await;
3707            wait_until_entered(bg_entered.as_ref()).await;
3708            controller.interrupt();
3709        })
3710        .0
3711        .unwrap();
3712
3713        match cancelled {
3714            LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3715            other => panic!("unexpected loop step after interrupt: {other:?}"),
3716        }
3717
3718        match wait_for_task_event(&handle).await {
3719            TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3720            other => panic!("unexpected post-interrupt event: {other:?}"),
3721        }
3722
3723        let running = handle.list_running().await;
3724        assert_eq!(running.len(), 1);
3725        assert_eq!(running[0].tool_name, "background-wait");
3726
3727        bg_release.notify_waiters();
3728        match wait_for_task_event(&handle).await {
3729            TaskEvent::Completed(snapshot, result) => {
3730                assert_eq!(snapshot.tool_name, "background-wait");
3731                assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3732            }
3733            other => panic!("unexpected background completion event: {other:?}"),
3734        }
3735    }
3736
3737    #[tokio::test]
3738    async fn loop_resumes_after_approved_tool_request() {
3739        let tools = ToolRegistry::new().with(EchoTool::default());
3740        let agent = Agent::builder()
3741            .model(FakeAdapter)
3742            .add_tool_source(tools)
3743            .permissions(ApproveFsReads)
3744            .build()
3745            .unwrap();
3746
3747        let mut driver = agent
3748            .start(SessionConfig {
3749                session_id: SessionId::new("session-approval"),
3750                metadata: MetadataMap::new(),
3751                cache: None,
3752            })
3753            .await
3754            .unwrap();
3755
3756        driver
3757            .submit_input(vec![Item {
3758                id: None,
3759                kind: ItemKind::User,
3760                parts: vec![Part::Text(TextPart {
3761                    text: "ping".into(),
3762                    metadata: MetadataMap::new(),
3763                })],
3764                metadata: MetadataMap::new(),
3765                usage: None,
3766                finish_reason: None,
3767                created_at: None,
3768            }])
3769            .unwrap();
3770
3771        let first = driver.next().await.unwrap();
3772        match first {
3773            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3774                assert!(pending.request.task_id.is_some());
3775                assert_eq!(pending.request.id.0, "approval:fs-read");
3776                pending.approve(&mut driver).unwrap();
3777            }
3778            other => panic!("unexpected loop step: {other:?}"),
3779        }
3780        let second = driver.next().await.unwrap();
3781        match second {
3782            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3783                Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3784                other => panic!("unexpected part: {other:?}"),
3785            },
3786            other => panic!("unexpected loop step after approval: {other:?}"),
3787        }
3788    }
3789
3790    #[tokio::test]
3791    async fn loop_resumes_with_patched_input_on_approval() {
3792        let tools = ToolRegistry::new().with(EchoTool::default());
3793        let agent = Agent::builder()
3794            .model(FakeAdapter)
3795            .add_tool_source(tools)
3796            .permissions(ApproveFsReads)
3797            .build()
3798            .unwrap();
3799
3800        let mut driver = agent
3801            .start(SessionConfig {
3802                session_id: SessionId::new("session-approval-patched"),
3803                metadata: MetadataMap::new(),
3804                cache: None,
3805            })
3806            .await
3807            .unwrap();
3808
3809        driver
3810            .submit_input(vec![Item {
3811                id: None,
3812                kind: ItemKind::User,
3813                parts: vec![Part::Text(TextPart {
3814                    text: "ping".into(),
3815                    metadata: MetadataMap::new(),
3816                })],
3817                metadata: MetadataMap::new(),
3818                usage: None,
3819                finish_reason: None,
3820                created_at: None,
3821            }])
3822            .unwrap();
3823
3824        match driver.next().await.unwrap() {
3825            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3826                pending
3827                    .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
3828                    .unwrap();
3829            }
3830            other => panic!("unexpected loop step: {other:?}"),
3831        }
3832        match driver.next().await.unwrap() {
3833            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3834                Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
3835                other => panic!("unexpected part: {other:?}"),
3836            },
3837            other => panic!("unexpected loop step after approval: {other:?}"),
3838        }
3839    }
3840
3841    #[tokio::test]
3842    async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3843        let tools = ToolRegistry::new().with(EchoTool::default());
3844        let agent = Agent::builder()
3845            .model(DualApprovalAdapter)
3846            .add_tool_source(tools)
3847            .permissions(ApproveFsReads)
3848            .build()
3849            .unwrap();
3850
3851        let mut driver = agent
3852            .start(SessionConfig {
3853                session_id: SessionId::new("session-dual-approval"),
3854                metadata: MetadataMap::new(),
3855                cache: None,
3856            })
3857            .await
3858            .unwrap();
3859
3860        driver
3861            .submit_input(vec![Item {
3862                id: None,
3863                kind: ItemKind::User,
3864                parts: vec![Part::Text(TextPart {
3865                    text: "run both approvals".into(),
3866                    metadata: MetadataMap::new(),
3867                })],
3868                metadata: MetadataMap::new(),
3869                usage: None,
3870                finish_reason: None,
3871                created_at: None,
3872            }])
3873            .unwrap();
3874
3875        let pending_first = match driver.next().await.unwrap() {
3876            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3877                assert_eq!(
3878                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3879                    Some("call-1")
3880                );
3881                pending
3882            }
3883            other => panic!("unexpected first loop step: {other:?}"),
3884        };
3885
3886        let pending_second = match driver.next().await.unwrap() {
3887            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3888                assert_eq!(
3889                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3890                    Some("call-2")
3891                );
3892                pending
3893            }
3894            other => panic!("unexpected second loop step: {other:?}"),
3895        };
3896
3897        pending_second.approve(&mut driver).unwrap();
3898        match driver.next().await.unwrap() {
3899            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3900                assert_eq!(
3901                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3902                    Some("call-1")
3903                );
3904            }
3905            other => panic!("unexpected step after approving second request: {other:?}"),
3906        }
3907
3908        pending_first.approve(&mut driver).unwrap();
3909        match driver.next().await.unwrap() {
3910            LoopStep::Finished(turn) => {
3911                assert_eq!(turn.finish_reason, FinishReason::Completed);
3912                match &turn.items[0].parts[0] {
3913                    Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3914                    other => panic!("unexpected final part: {other:?}"),
3915                }
3916            }
3917            other => panic!("unexpected final loop step: {other:?}"),
3918        }
3919    }
3920
3921    #[tokio::test]
3922    async fn loop_compacts_transcript_before_new_turns() {
3923        let events = StdArc::new(StdMutex::new(Vec::new()));
3924        let agent = Agent::builder()
3925            .model(FakeAdapter)
3926            .mutator(KeepRecentMutator { keep: 1 })
3927            .observer(RecordingObserver {
3928                events: events.clone(),
3929            })
3930            .build()
3931            .unwrap();
3932
3933        let mut driver = agent
3934            .start(SessionConfig {
3935                session_id: SessionId::new("session-4"),
3936                metadata: MetadataMap::new(),
3937                cache: None,
3938            })
3939            .await
3940            .unwrap();
3941
3942        for text in ["first", "second"] {
3943            driver
3944                .submit_input(vec![Item {
3945                    id: None,
3946                    kind: ItemKind::User,
3947                    parts: vec![Part::Text(TextPart {
3948                        text: text.into(),
3949                        metadata: MetadataMap::new(),
3950                    })],
3951                    metadata: MetadataMap::new(),
3952                    usage: None,
3953                    finish_reason: None,
3954                    created_at: None,
3955                }])
3956                .unwrap();
3957            let _ = driver.next().await.unwrap();
3958        }
3959
3960        let events = events.lock().unwrap();
3961        assert!(
3962            events
3963                .iter()
3964                .any(|event| matches!(event, AgentEvent::MutationFinished { dirty: true, .. }))
3965        );
3966    }
3967
3968    #[test]
3969    fn transcript_validation_rejects_orphaned_tool_result() {
3970        let transcript = vec![Item {
3971            id: None,
3972            kind: ItemKind::Tool,
3973            parts: vec![Part::ToolResult(ToolResultPart {
3974                call_id: "call-1".into(),
3975                output: ToolOutput::Text("result".into()),
3976                is_error: false,
3977                metadata: MetadataMap::new(),
3978            })],
3979            metadata: MetadataMap::new(),
3980            usage: None,
3981            finish_reason: None,
3982            created_at: None,
3983        }];
3984
3985        let error = validate_transcript_invariants(&transcript).unwrap_err();
3986        assert!(error.to_string().contains("orphaned tool_result"));
3987    }
3988
3989    #[test]
3990    fn transcript_validation_rejects_duplicate_tool_result() {
3991        let transcript = vec![
3992            Item {
3993                id: None,
3994                kind: ItemKind::Assistant,
3995                parts: vec![Part::ToolCall(ToolCallPart {
3996                    id: "call-1".into(),
3997                    name: "lookup".into(),
3998                    input: serde_json::json!({}),
3999                    metadata: MetadataMap::new(),
4000                })],
4001                metadata: MetadataMap::new(),
4002                usage: None,
4003                finish_reason: None,
4004                created_at: None,
4005            },
4006            Item {
4007                id: None,
4008                kind: ItemKind::Tool,
4009                parts: vec![Part::ToolResult(ToolResultPart {
4010                    call_id: "call-1".into(),
4011                    output: ToolOutput::Text("result".into()),
4012                    is_error: false,
4013                    metadata: MetadataMap::new(),
4014                })],
4015                metadata: MetadataMap::new(),
4016                usage: None,
4017                finish_reason: None,
4018                created_at: None,
4019            },
4020            Item {
4021                id: None,
4022                kind: ItemKind::Tool,
4023                parts: vec![Part::ToolResult(ToolResultPart {
4024                    call_id: "call-1".into(),
4025                    output: ToolOutput::Text("again".into()),
4026                    is_error: false,
4027                    metadata: MetadataMap::new(),
4028                })],
4029                metadata: MetadataMap::new(),
4030                usage: None,
4031                finish_reason: None,
4032                created_at: None,
4033            },
4034        ];
4035
4036        let error = validate_transcript_invariants(&transcript).unwrap_err();
4037        assert!(error.to_string().contains("duplicate tool_result"));
4038    }
4039
4040    #[tokio::test]
4041    async fn loop_refreshes_tool_specs_each_turn() {
4042        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4043        let version = StdArc::new(AtomicUsize::new(1));
4044        let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
4045        let agent = Agent::builder()
4046            .model(RecordingAdapter {
4047                seen_descriptions: seen_descriptions.clone(),
4048                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4049            })
4050            .add_tool_source(tools)
4051            .permissions(AllowAllPermissions)
4052            .build()
4053            .unwrap();
4054
4055        let mut driver = agent
4056            .start(SessionConfig {
4057                session_id: SessionId::new("session-dynamic-tools"),
4058                metadata: MetadataMap::new(),
4059                cache: None,
4060            })
4061            .await
4062            .unwrap();
4063
4064        for text in ["first", "second"] {
4065            driver
4066                .submit_input(vec![Item {
4067                    id: None,
4068                    kind: ItemKind::User,
4069                    parts: vec![Part::Text(TextPart {
4070                        text: text.into(),
4071                        metadata: MetadataMap::new(),
4072                    })],
4073                    metadata: MetadataMap::new(),
4074                    usage: None,
4075                    finish_reason: None,
4076                    created_at: None,
4077                }])
4078                .unwrap();
4079
4080            let _ = driver.next().await.unwrap();
4081            if text == "first" {
4082                version.store(2, Ordering::SeqCst);
4083            }
4084        }
4085
4086        let seen_descriptions = seen_descriptions.lock().unwrap();
4087        assert_eq!(seen_descriptions.len(), 2);
4088        assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
4089        assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
4090    }
4091
4092    #[tokio::test]
4093    async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
4094        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
4095        let events = StdArc::new(StdMutex::new(Vec::new()));
4096        let executor = StdArc::new(CatalogExecutor::new());
4097        let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
4098        let agent = Agent::builder()
4099            .model(RecordingAdapter {
4100                seen_descriptions: seen_descriptions.clone(),
4101                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
4102            })
4103            .tool_executor(executor_for_agent)
4104            .permissions(AllowAllPermissions)
4105            .observer(RecordingObserver {
4106                events: events.clone(),
4107            })
4108            .build()
4109            .unwrap();
4110
4111        let mut driver = agent
4112            .start(SessionConfig {
4113                session_id: SessionId::new("session-catalog-events"),
4114                metadata: MetadataMap::new(),
4115                cache: None,
4116            })
4117            .await
4118            .unwrap();
4119
4120        driver
4121            .submit_input(vec![Item::text(ItemKind::User, "first")])
4122            .unwrap();
4123        let _ = driver.next().await.unwrap();
4124
4125        executor.publish_change(
4126            1,
4127            ToolCatalogEvent {
4128                source: "mcp:mock".into(),
4129                added: vec!["dynamic".into()],
4130                removed: Vec::new(),
4131                changed: Vec::new(),
4132            },
4133        );
4134
4135        driver
4136            .submit_input(vec![Item::text(ItemKind::User, "second")])
4137            .unwrap();
4138        let _ = driver.next().await.unwrap();
4139
4140        let seen_descriptions = seen_descriptions.lock().unwrap();
4141        assert_eq!(seen_descriptions.len(), 2);
4142        assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
4143        assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
4144
4145        let events = events.lock().unwrap();
4146        assert!(events.iter().any(|event| matches!(
4147            event,
4148            AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
4149                source,
4150                added,
4151                removed,
4152                changed,
4153            }) if source == "mcp:mock"
4154                && added == &vec!["dynamic".to_string()]
4155                && removed.is_empty()
4156                && changed.is_empty()
4157        )));
4158    }
4159
4160    #[tokio::test]
4161    async fn loop_passes_session_default_and_next_turn_cache_requests() {
4162        let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
4163        let agent = Agent::builder()
4164            .model(RecordingAdapter {
4165                seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
4166                seen_caches: seen_caches.clone(),
4167            })
4168            .permissions(AllowAllPermissions)
4169            .build()
4170            .unwrap();
4171
4172        let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
4173            .with_retention(PromptCacheRetention::Short);
4174        let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
4175            breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
4176        });
4177
4178        let mut driver = agent
4179            .start(SessionConfig {
4180                session_id: SessionId::new("session-cache"),
4181                metadata: MetadataMap::new(),
4182                cache: Some(default_cache.clone()),
4183            })
4184            .await
4185            .unwrap();
4186
4187        driver
4188            .submit_input(vec![Item {
4189                id: None,
4190                kind: ItemKind::User,
4191                parts: vec![Part::Text(TextPart {
4192                    text: "first".into(),
4193                    metadata: MetadataMap::new(),
4194                })],
4195                metadata: MetadataMap::new(),
4196                usage: None,
4197                finish_reason: None,
4198                created_at: None,
4199            }])
4200            .unwrap();
4201        let _ = driver.next().await.unwrap();
4202
4203        driver
4204            .submit_input_with_cache(
4205                vec![Item {
4206                    id: None,
4207                    kind: ItemKind::User,
4208                    parts: vec![Part::Text(TextPart {
4209                        text: "second".into(),
4210                        metadata: MetadataMap::new(),
4211                    })],
4212                    metadata: MetadataMap::new(),
4213                    usage: None,
4214                    finish_reason: None,
4215                    created_at: None,
4216                }],
4217                override_cache.clone(),
4218            )
4219            .unwrap();
4220        let _ = driver.next().await.unwrap();
4221
4222        let seen = seen_caches.lock().unwrap();
4223        assert_eq!(seen.len(), 2);
4224        assert_eq!(seen[0], Some(default_cache));
4225        assert_eq!(seen[1], Some(override_cache));
4226    }
4227
4228    #[tokio::test]
4229    async fn loop_yields_after_tool_result_between_rounds() {
4230        let tools = ToolRegistry::new().with(EchoTool::default());
4231        let agent = Agent::builder()
4232            .model(FakeAdapter)
4233            .add_tool_source(tools)
4234            .permissions(AllowAllPermissions)
4235            .build()
4236            .unwrap();
4237
4238        let mut driver = agent
4239            .start(SessionConfig {
4240                session_id: SessionId::new("yield-session"),
4241                metadata: MetadataMap::new(),
4242                cache: None,
4243            })
4244            .await
4245            .unwrap();
4246
4247        driver
4248            .submit_input(vec![Item::text(ItemKind::User, "ping")])
4249            .unwrap();
4250
4251        // First next() runs the model turn, resolves the tool call, and
4252        // yields AfterToolResult before calling the model again.
4253        let step = driver.next().await.unwrap();
4254        let info = match step {
4255            LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
4256            other => panic!("expected AfterToolResult, got {other:?}"),
4257        };
4258        assert_eq!(info.session_id, SessionId::new("yield-session"));
4259        // Transcript at yield: [User, Assistant(tool_call), Tool(result)]
4260        assert_eq!(info.transcript_len, 3);
4261
4262        // The yield is cooperative, not blocking.
4263        let interrupt = LoopInterrupt::AfterToolResult(info.clone());
4264        assert!(!interrupt.is_blocking());
4265
4266        // Host interjects a message mid-turn.
4267        driver
4268            .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
4269            .unwrap();
4270
4271        // Second next() resumes the turn into the next model call, which
4272        // sees the tool result (and the injected user message) and finishes.
4273        let step = driver.next().await.unwrap();
4274        match step {
4275            LoopStep::Finished(turn) => {
4276                assert_eq!(turn.finish_reason, FinishReason::Completed);
4277            }
4278            other => panic!("expected Finished, got {other:?}"),
4279        }
4280
4281        // Transcript must now include the injected user message.
4282        let snapshot = driver.snapshot();
4283        let has_injected_message = snapshot.transcript.iter().any(|item| {
4284            item.kind == ItemKind::User
4285                && item.parts.iter().any(|part| match part {
4286                    Part::Text(text) => text.text == "also: report back",
4287                    _ => false,
4288                })
4289        });
4290        assert!(
4291            has_injected_message,
4292            "injected user message should be in transcript, got: {:?}",
4293            snapshot.transcript
4294        );
4295    }
4296
4297    struct RecordingTranscriptObserver {
4298        items: StdArc<StdMutex<Vec<Item>>>,
4299    }
4300
4301    impl TranscriptObserver for RecordingTranscriptObserver {
4302        fn on_item_appended(&self, item: &Item) {
4303            self.items.lock().unwrap().push(item.clone());
4304        }
4305    }
4306
4307    #[tokio::test]
4308    async fn observers_see_full_tool_round() {
4309        // A turn with one tool call exercises every interesting path:
4310        //   user input drained -> model output_items (assistant w/ tool call)
4311        //   -> tool result Item -> next model output_items (assistant text)
4312        // The LoopObserver should see exactly one ToolResultReceived; the
4313        // TranscriptObserver should see all four items in transcript order.
4314        let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
4315        let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
4316        let agent = Agent::builder()
4317            .model(FakeAdapter)
4318            .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
4319            .permissions(AllowAllPermissions)
4320            .observer(RecordingObserver {
4321                events: events.clone(),
4322            })
4323            .transcript_observer(RecordingTranscriptObserver {
4324                items: items.clone(),
4325            })
4326            .build()
4327            .unwrap();
4328
4329        let mut driver = agent
4330            .start(SessionConfig {
4331                session_id: SessionId::new("observer-session"),
4332                metadata: MetadataMap::new(),
4333                cache: None,
4334            })
4335            .await
4336            .unwrap();
4337
4338        driver
4339            .submit_input(vec![Item {
4340                id: None,
4341                kind: ItemKind::User,
4342                parts: vec![Part::Text(TextPart {
4343                    text: "ping".into(),
4344                    metadata: MetadataMap::new(),
4345                })],
4346                metadata: MetadataMap::new(),
4347                usage: None,
4348                finish_reason: None,
4349                created_at: None,
4350            }])
4351            .unwrap();
4352
4353        let result = run_until_finished(&mut driver).await;
4354        assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4355
4356        // LoopObserver: exactly one ToolResultReceived, with the echo
4357        // tool's output, correlating back to the model's tool call.
4358        let events = events.lock().unwrap().clone();
4359        let tool_call_id = events.iter().find_map(|e| match e {
4360            AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4361            _ => None,
4362        });
4363        let tool_results: Vec<_> = events
4364            .iter()
4365            .filter_map(|e| match e {
4366                AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4367                _ => None,
4368            })
4369            .collect();
4370        assert_eq!(tool_results.len(), 1, "events: {events:?}");
4371        assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4372        assert!(!tool_results[0].is_error);
4373
4374        // TranscriptObserver: every transcript mutation surfaces.
4375        // Expected order: User("ping"), Assistant(tool call), Tool(result),
4376        // Assistant("tool said: pong").
4377        let items = items.lock().unwrap().clone();
4378        assert_eq!(items.len(), 4, "items: {items:?}");
4379        assert_eq!(items[0].kind, ItemKind::User);
4380        assert_eq!(items[1].kind, ItemKind::Assistant);
4381        assert!(
4382            items[1]
4383                .parts
4384                .iter()
4385                .any(|p| matches!(p, Part::ToolCall(_)))
4386        );
4387        assert_eq!(items[2].kind, ItemKind::Tool);
4388        assert!(
4389            items[2]
4390                .parts
4391                .iter()
4392                .any(|p| matches!(p, Part::ToolResult(_)))
4393        );
4394        assert_eq!(items[3].kind, ItemKind::Assistant);
4395    }
4396
4397    #[test]
4398    fn convenience_cache_builders_construct_expected_defaults() {
4399        let cache = PromptCacheRequest::automatic()
4400            .with_retention(PromptCacheRetention::Short)
4401            .with_key("workspace:demo");
4402        let session = SessionConfig::new("demo").with_cache(cache.clone());
4403
4404        assert_eq!(session.session_id, SessionId::new("demo"));
4405        assert_eq!(session.cache, Some(cache));
4406
4407        let explicit = PromptCacheRequest::explicit([
4408            PromptCacheBreakpoint::tools_end(),
4409            PromptCacheBreakpoint::transcript_item_end(2),
4410            PromptCacheBreakpoint::transcript_part_end(3, 1),
4411        ]);
4412
4413        assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
4414        assert_eq!(
4415            explicit.strategy,
4416            PromptCacheStrategy::Explicit {
4417                breakpoints: vec![
4418                    PromptCacheBreakpoint::ToolsEnd,
4419                    PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
4420                    PromptCacheBreakpoint::TranscriptPartEnd {
4421                        item_index: 3,
4422                        part_index: 1,
4423                    },
4424                ],
4425            }
4426        );
4427    }
4428}