Skip to main content

agentkit_loop/
lib.rs

1//! Runtime-agnostic agent loop orchestration for sessions, turns, tools, and interrupts.
2//!
3//! `agentkit-loop` is the central coordination layer in the agentkit workspace.  It
4//! drives a model through a multi-turn agentic loop, executing tool calls,
5//! respecting permission checks, surfacing approval interrupts to the host
6//! application, and optionally compacting the transcript when it grows too large.
7//!
8//! # Architecture
9//!
10//! The main entry point is [`Agent`], constructed via [`AgentBuilder`]. The
11//! builder optionally accepts the prior conversation transcript via
12//! [`AgentBuilder::transcript`] and the next user turn via
13//! [`AgentBuilder::input`] — both default to empty. Calling
14//! [`Agent::start`] with a [`SessionConfig`] returns a [`LoopDriver`] that
15//! yields [`LoopStep`]s — either a finished turn or an interrupt that
16//! requires host resolution before the loop can continue.
17//!
18//! If no input was preloaded, the first call to [`LoopDriver::next`] yields
19//! [`LoopInterrupt::AwaitingInput`] and the host supplies the first user
20//! turn via [`InputRequest::submit`]. If input was preloaded, the first
21//! `next()` dispatches the model directly — convenient for one-shot calls.
22//!
23//! ```text
24//! Agent::builder()
25//!     .model(adapter)              // ModelAdapter implementation
26//!     .add_tool_source(registry)   // ToolRegistry (or any ToolSource); call again to federate
27//!     .permissions(checker)        // PermissionChecker for gating tool use
28//!     .observer(obs)               // LoopObserver for streaming events
29//!     .transcript(prior)           // optional: passive prior transcript (system prompt, resumed session)
30//!     .input(first_user_turn)      // optional: preload next user turn so first next() drives a turn
31//!     .build()?
32//!     .start(config).await?  -> LoopDriver
33//!         .next().await?     -> LoopStep::Finished | LoopStep::Interrupt(...)
34//! ```
35//!
36//! # Example
37//!
38//! ```rust,no_run
39//! use agentkit_core::{Item, ItemKind};
40//! use agentkit_loop::{
41//!     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
42//! };
43//!
44//! # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
45//! // One-shot: preload system prompt and first user message; first next()
46//! // drives the model directly.
47//! let agent = Agent::builder()
48//!     .model(adapter)
49//!     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
50//!     .input(vec![Item::text(ItemKind::User, "Hello!")])
51//!     .build()?;
52//!
53//! let mut driver = agent
54//!     .start(SessionConfig::new("demo").with_cache(
55//!         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
56//!     ))
57//!     .await?;
58//!
59//! let _ = driver.next().await?;
60//! # Ok(())
61//! # }
62//! ```
63
64use std::collections::{BTreeMap, HashSet, VecDeque};
65use std::sync::Arc;
66
67use agentkit_compaction::{
68    CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
69};
70use agentkit_core::{
71    CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
72    TextPart, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
73};
74use agentkit_task_manager::{
75    PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskLaunchKind, TaskLaunchRequest,
76    TaskManager, TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
77};
78#[cfg(test)]
79use agentkit_tools_core::ToolContext;
80use agentkit_tools_core::{
81    AllowAllPermissions, ApprovalDecision, ApprovalRequest, BasicToolExecutor, OwnedToolContext,
82    PermissionChecker, ToolCatalogEvent, ToolError, ToolExecutor, ToolRequest, ToolResources,
83    ToolSource, ToolSpec,
84};
85use async_trait::async_trait;
86use serde::{Deserialize, Serialize};
87use thiserror::Error;
88
// Metadata key names (all under the `agentkit.` prefix) plus one reason value.
// NOTE(review): the code that reads or writes these is not visible in this part of
// the file — confirm the exact semantics against their use sites.
const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
// Presumably the value stored under the interrupt-reason key when the user cancels — verify.
const USER_CANCELLED_REASON: &str = "user_cancelled";
93
94/// Configuration required to start a new model session.
95///
96/// Pass this to [`Agent::start`] to initialise the underlying [`ModelSession`]
97/// and obtain a [`LoopDriver`].
98///
99/// # Example
100///
101/// ```rust
102/// use agentkit_loop::{PromptCacheRequest, PromptCacheRetention, SessionConfig};
103///
104/// let config = SessionConfig::new("my-session").with_cache(
105///     PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
106/// );
107/// ```
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109pub struct SessionConfig {
110    /// Unique identifier for the session.
111    pub session_id: SessionId,
112    /// Arbitrary key-value metadata forwarded to the model adapter.
113    pub metadata: MetadataMap,
114    /// Default provider-side prompt caching policy for turns in this session.
115    pub cache: Option<PromptCacheRequest>,
116}
117
118impl SessionConfig {
119    /// Builds a session config with empty metadata and no cache policy.
120    pub fn new(session_id: impl Into<SessionId>) -> Self {
121        Self {
122            session_id: session_id.into(),
123            metadata: MetadataMap::new(),
124            cache: None,
125        }
126    }
127
128    /// Replaces the session metadata map.
129    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
130        self.metadata = metadata;
131        self
132    }
133
134    /// Sets the default prompt cache request for the session.
135    pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
136        self.cache = Some(cache);
137        self
138    }
139
140    /// Clears any default prompt cache request for the session.
141    pub fn without_cache(mut self) -> Self {
142        self.cache = None;
143        self
144    }
145}
146
147/// Strength of a prompt-cache request.
148///
149/// `BestEffort` lets adapters ignore unsupported controls while still using
150/// any provider-native automatic caching they support. `Required` upgrades
151/// unsupported cache requests into provider errors.
152#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
153pub enum PromptCacheMode {
154    /// Disable prompt caching for this request.
155    Disabled,
156    /// Use caching when the provider can honor the request.
157    #[default]
158    BestEffort,
159    /// Fail the turn if the provider cannot honor the request.
160    Required,
161}
162
163/// High-level provider-neutral cache retention hint.
164///
165/// Providers map this to their native controls. For example, OpenAI maps
166/// `Short` to in-memory retention while OpenRouter Anthropic models map it to
167/// the default 5-minute ephemeral cache.
168#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
169pub enum PromptCacheRetention {
170    /// Use the provider's default cache retention.
171    Default,
172    /// Prefer the provider's short-lived cache retention mode.
173    Short,
174    /// Prefer the provider's longest generally available cache retention mode.
175    Extended,
176}
177
178/// Provider-neutral prompt caching strategy.
179#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
180pub enum PromptCacheStrategy {
181    /// Let the provider decide the cacheable prefix automatically.
182    #[default]
183    Automatic,
184    /// Apply explicit cache breakpoints to selected prefix boundaries.
185    Explicit {
186        /// Cache breakpoints in transcript/tool order.
187        breakpoints: Vec<PromptCacheBreakpoint>,
188    },
189}
190
191impl PromptCacheStrategy {
192    /// Uses the provider's native automatic cache behavior when available, or
193    /// any adapter-provided automatic planning fallback.
194    pub fn automatic() -> Self {
195        Self::Automatic
196    }
197
198    /// Uses explicit cache breakpoints.
199    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
200        Self::Explicit {
201            breakpoints: breakpoints.into_iter().collect(),
202        }
203    }
204}
205
206/// Prefix boundary that a provider should cache when using explicit caching.
207#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
208pub enum PromptCacheBreakpoint {
209    /// Cache the tool schema prefix through the last available tool.
210    ToolsEnd,
211    /// Cache through the end of the transcript item at `index`.
212    TranscriptItemEnd { index: usize },
213    /// Cache through the specific transcript part.
214    ///
215    /// Not every adapter can target every part precisely; unsupported
216    /// fine-grained breakpoints become best-effort no-ops unless the request is
217    /// marked [`PromptCacheMode::Required`].
218    TranscriptPartEnd {
219        item_index: usize,
220        part_index: usize,
221    },
222}
223
224impl PromptCacheBreakpoint {
225    /// Cache the tool schema prefix through the last available tool.
226    pub fn tools_end() -> Self {
227        Self::ToolsEnd
228    }
229
230    /// Cache through the end of a transcript item.
231    pub fn transcript_item_end(index: usize) -> Self {
232        Self::TranscriptItemEnd { index }
233    }
234
235    /// Cache through a specific part within a transcript item.
236    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
237        Self::TranscriptPartEnd {
238            item_index,
239            part_index,
240        }
241    }
242}
243
244/// Prompt caching request sent alongside a turn.
245#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
246pub struct PromptCacheRequest {
247    /// Strength of the caching request.
248    pub mode: PromptCacheMode,
249    /// Automatic or explicit caching strategy.
250    pub strategy: PromptCacheStrategy,
251    /// Optional provider-neutral retention hint.
252    pub retention: Option<PromptCacheRetention>,
253    /// Optional provider cache key or routing key.
254    pub key: Option<String>,
255}
256
257impl PromptCacheRequest {
258    /// Builds a best-effort automatic cache request.
259    pub fn automatic() -> Self {
260        Self::best_effort(PromptCacheStrategy::automatic())
261    }
262
263    /// Builds a required automatic cache request.
264    pub fn automatic_required() -> Self {
265        Self::required(PromptCacheStrategy::automatic())
266    }
267
268    /// Builds a best-effort explicit cache request.
269    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
270        Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
271    }
272
273    /// Builds a required explicit cache request.
274    pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
275        Self::required(PromptCacheStrategy::explicit(breakpoints))
276    }
277
278    /// Builds a disabled cache request.
279    pub fn disabled() -> Self {
280        Self {
281            mode: PromptCacheMode::Disabled,
282            strategy: PromptCacheStrategy::Automatic,
283            retention: None,
284            key: None,
285        }
286    }
287
288    /// Builds a best-effort cache request with the given strategy.
289    pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
290        Self {
291            mode: PromptCacheMode::BestEffort,
292            strategy,
293            retention: None,
294            key: None,
295        }
296    }
297
298    /// Builds a required cache request with the given strategy.
299    pub fn required(strategy: PromptCacheStrategy) -> Self {
300        Self {
301            mode: PromptCacheMode::Required,
302            strategy,
303            retention: None,
304            key: None,
305        }
306    }
307
308    /// Overrides the request mode.
309    pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
310        self.mode = mode;
311        self
312    }
313
314    /// Overrides the request strategy.
315    pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
316        self.strategy = strategy;
317        self
318    }
319
320    /// Applies a provider-neutral retention hint.
321    pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
322        self.retention = Some(retention);
323        self
324    }
325
326    /// Applies a provider cache key or routing key.
327    pub fn with_key(mut self, key: impl Into<String>) -> Self {
328        self.key = Some(key.into());
329        self
330    }
331
332    /// Clears any provider-neutral retention hint.
333    pub fn without_retention(mut self) -> Self {
334        self.retention = None;
335        self
336    }
337
338    /// Clears any provider cache key or routing key.
339    pub fn without_key(mut self) -> Self {
340        self.key = None;
341        self
342    }
343
344    /// Returns true when caching should be active for this request.
345    pub fn is_enabled(&self) -> bool {
346        !matches!(self.mode, PromptCacheMode::Disabled)
347    }
348}
349
350/// Payload sent to the model at the start of each turn.
351///
352/// The [`LoopDriver`] constructs this automatically from its internal state
353/// and passes it to [`ModelSession::begin_turn`].  Model adapter authors
354/// use the fields to build the provider-specific request.
355#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
356pub struct TurnRequest {
357    /// Session this turn belongs to.
358    pub session_id: SessionId,
359    /// Unique identifier for the current turn.
360    pub turn_id: agentkit_core::TurnId,
361    /// Full conversation transcript accumulated so far.
362    pub transcript: Vec<Item>,
363    /// Tool specifications the model may invoke during this turn.
364    pub available_tools: Vec<ToolSpec>,
365    /// Provider-side prompt caching request for this turn.
366    pub cache: Option<PromptCacheRequest>,
367    /// Per-turn metadata (e.g. provider hints).
368    pub metadata: MetadataMap,
369}
370
371/// Final result produced by a single model turn.
372///
373/// Returned inside [`ModelTurnEvent::Finished`] to signal that the model has
374/// completed its generation for this turn.
375#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
376pub struct ModelTurnResult {
377    /// Why the model stopped generating (e.g. completed, tool call, length).
378    pub finish_reason: FinishReason,
379    /// Items the model produced during this turn (text, tool calls, etc.).
380    pub output_items: Vec<Item>,
381    /// Token usage statistics, if available.
382    pub usage: Option<Usage>,
383    /// Provider-specific metadata about the turn.
384    pub metadata: MetadataMap,
385}
386
387/// Streaming event emitted by a [`ModelTurn`] during generation.
388///
389/// The [`LoopDriver`] consumes these events one-by-one via
390/// [`ModelTurn::next_event`] and translates them into [`AgentEvent`]s for
391/// observers and into transcript mutations.
392#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
393pub enum ModelTurnEvent {
394    /// Incremental text or content delta from the model.
395    Delta(Delta),
396    /// The model is requesting a tool call.
397    ToolCall(ToolCallPart),
398    /// Updated token usage statistics.
399    Usage(Usage),
400    /// The model has finished generating for this turn.
401    Finished(ModelTurnResult),
402}
403
404/// Factory for creating model sessions.
405///
406/// Implement this trait to integrate a model provider (e.g. OpenRouter,
407/// Anthropic, a local LLM server) with the agent loop.  [`Agent`] holds a
408/// single adapter and calls [`start_session`](ModelAdapter::start_session)
409/// once when [`Agent::start`] is invoked.
410///
411/// # Example
412///
413/// ```rust,no_run
414/// use agentkit_loop::{ModelAdapter, ModelSession, SessionConfig, LoopError};
415/// use async_trait::async_trait;
416///
417/// struct MyAdapter;
418///
419/// #[async_trait]
420/// impl ModelAdapter for MyAdapter {
421///     type Session = MySession;
422///
423///     async fn start_session(&self, config: SessionConfig) -> Result<MySession, LoopError> {
424///         // Initialize provider-specific session state here.
425///         Ok(MySession { /* ... */ })
426///     }
427/// }
428/// # struct MySession;
429/// # #[async_trait]
430/// # impl ModelSession for MySession {
431/// #     type Turn = MyTurn;
432/// #     async fn begin_turn(&mut self, _r: agentkit_loop::TurnRequest, _c: Option<agentkit_core::TurnCancellation>) -> Result<MyTurn, LoopError> { todo!() }
433/// # }
434/// # struct MyTurn;
435/// # #[async_trait]
436/// # impl agentkit_loop::ModelTurn for MyTurn {
437/// #     async fn next_event(&mut self, _c: Option<agentkit_core::TurnCancellation>) -> Result<Option<agentkit_loop::ModelTurnEvent>, LoopError> { todo!() }
438/// # }
439/// ```
440#[async_trait]
441pub trait ModelAdapter: Send + Sync {
442    /// The session type produced by this adapter.
443    type Session: ModelSession;
444
445    /// Create a new model session from the given configuration.
446    ///
447    /// # Errors
448    ///
449    /// Returns [`LoopError`] if the provider connection or initialisation fails.
450    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
451}
452
453/// An active model session that can produce sequential turns.
454///
455/// A session is created once per [`Agent::start`] call and lives for the
456/// lifetime of the [`LoopDriver`].  Each call to [`begin_turn`](ModelSession::begin_turn)
457/// hands the full transcript to the model and returns a streaming
458/// [`ModelTurn`].
459#[async_trait]
460pub trait ModelSession: Send {
461    /// The turn type produced by this session.
462    type Turn: ModelTurn;
463
464    /// Start a new turn, sending the transcript and available tools to the model.
465    ///
466    /// # Arguments
467    ///
468    /// * `request` -- the turn payload including transcript and tool specs.
469    /// * `cancellation` -- optional handle the implementation should poll to
470    ///   detect user-initiated cancellation.
471    ///
472    /// # Errors
473    ///
474    /// Returns [`LoopError::Cancelled`] when the turn is cancelled, or a
475    /// provider-specific error wrapped in [`LoopError`].
476    async fn begin_turn(
477        &mut self,
478        request: TurnRequest,
479        cancellation: Option<TurnCancellation>,
480    ) -> Result<Self::Turn, LoopError>;
481}
482
483/// A streaming model turn that yields events one at a time.
484///
485/// The loop driver calls [`next_event`](ModelTurn::next_event) repeatedly
486/// until it returns `Ok(None)` (stream exhausted) or
487/// `Ok(Some(ModelTurnEvent::Finished(_)))`.
488#[async_trait]
489pub trait ModelTurn: Send {
490    /// Retrieve the next event from the model's response stream.
491    ///
492    /// Returns `Ok(None)` when the stream is exhausted.
493    ///
494    /// # Errors
495    ///
496    /// Returns [`LoopError::Cancelled`] if `cancellation` fires, or a
497    /// provider-specific error wrapped in [`LoopError`].
498    async fn next_event(
499        &mut self,
500        cancellation: Option<TurnCancellation>,
501    ) -> Result<Option<ModelTurnEvent>, LoopError>;
502}
503
504/// Observer hook for streaming agent events to the host application.
505///
506/// Register observers via [`AgentBuilder::observer`] to receive real-time
507/// notifications about deltas, tool calls, usage, warnings, and lifecycle
508/// events.
509///
510/// # Example
511///
512/// ```rust
513/// use agentkit_loop::{AgentEvent, LoopObserver};
514///
515/// struct StdoutObserver;
516///
517/// impl LoopObserver for StdoutObserver {
518///     fn handle_event(&mut self, event: AgentEvent) {
519///         println!("{event:?}");
520///     }
521/// }
522/// ```
523pub trait LoopObserver: Send {
524    /// Called synchronously for every [`AgentEvent`] emitted by the loop driver.
525    fn handle_event(&mut self, event: AgentEvent);
526}
527
528/// Receives full [`Item`]s as they are appended to the driver's transcript.
529///
530/// While [`LoopObserver`] surfaces operational events (deltas, tool calls,
531/// lifecycle, telemetry), it can't be reconstructed back into a faithful
532/// transcript on its own — content deltas span partial parts and don't
533/// carry their parent-Item identity, and historically tool results were
534/// pushed into the transcript with no observer event at all. A
535/// `TranscriptObserver` is the loss-free counterpart: it fires once per
536/// [`Item`] appended, with the full Item shape ready for persistence,
537/// replication, or audit.
538///
539/// Observers are called *synchronously* from inside the driver, in the
540/// same order items land in the transcript. Compaction-driven transcript
541/// rewrites do **not** fire `on_item_appended` — those are signaled by
542/// [`AgentEvent::CompactionFinished`] instead.
543///
544/// Register via [`AgentBuilder::transcript_observer`]; multiple observers
545/// may be registered and are called in registration order.
546///
547/// # Example
548///
549/// ```rust
550/// use agentkit_core::Item;
551/// use agentkit_loop::TranscriptObserver;
552///
553/// struct CountingObserver { items: usize }
554///
555/// impl TranscriptObserver for CountingObserver {
556///     fn on_item_appended(&mut self, _item: &Item) {
557///         self.items += 1;
558///     }
559/// }
560/// ```
561pub trait TranscriptObserver: Send {
562    /// Called synchronously every time an [`Item`] is appended to the
563    /// driver's transcript, in transcript order.
564    fn on_item_appended(&mut self, item: &Item);
565}
566
567/// Lifecycle and streaming events emitted by the [`LoopDriver`].
568///
569/// Observers (see [`LoopObserver`]) receive these events in the order they
570/// occur.  They are useful for building UIs, logging, or telemetry.
571#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
572pub enum AgentEvent {
573    /// The agent run has been initialised.
574    RunStarted { session_id: SessionId },
575    /// A new model turn is starting.
576    TurnStarted {
577        session_id: SessionId,
578        turn_id: agentkit_core::TurnId,
579    },
580    /// User input has been accepted into the pending queue.
581    InputAccepted {
582        session_id: SessionId,
583        items: Vec<Item>,
584    },
585    /// Incremental content delta from the model.
586    ContentDelta(Delta),
587    /// The model has requested a tool call.
588    ToolCallRequested(ToolCallPart),
589    /// A tool call's result has landed in the transcript.
590    ///
591    /// Fires once per [`Part::ToolResult`] that's appended, including the
592    /// synthetic placeholder that's pushed when a tool is detached to the
593    /// background — the real result fires a second event with the same
594    /// [`ToolResultPart::call_id`] once the background task completes.
595    /// Cancellation/denial paths (auth cancelled, approval denied) also
596    /// emit this with `is_error = true`.
597    ///
598    /// Correlate with the matching [`AgentEvent::ToolCallRequested`] via
599    /// `call_id`.
600    ToolResultReceived(ToolResultPart),
601    /// A tool call requires explicit user approval before execution.
602    ApprovalRequired(ApprovalRequest),
603    /// An approval interrupt has been resolved.
604    ApprovalResolved { approved: bool },
605    /// The available tool catalog changed and will be reflected on the next model request.
606    ToolCatalogChanged(ToolCatalogEvent),
607    /// Transcript compaction is about to begin.
608    CompactionStarted {
609        session_id: SessionId,
610        turn_id: Option<agentkit_core::TurnId>,
611        reason: CompactionReason,
612    },
613    /// Transcript compaction has finished.
614    CompactionFinished {
615        session_id: SessionId,
616        turn_id: Option<agentkit_core::TurnId>,
617        replaced_items: usize,
618        transcript_len: usize,
619        metadata: MetadataMap,
620    },
621    /// Updated token usage statistics.
622    UsageUpdated(Usage),
623    /// Non-fatal warning (e.g. a tool failure that was recovered from).
624    Warning { message: String },
625    /// The agent run has failed with an unrecoverable error.
626    RunFailed { message: String },
627    /// A turn has finished (successfully, via cancellation, etc.).
628    TurnFinished(TurnResult),
629}
630
631/// Handle for a pending approval interrupt.
632///
633/// Wraps an [`ApprovalRequest`] and provides ergonomic resolution methods
634/// so callers can resolve the interrupt directly instead of searching for
635/// the matching method on [`LoopDriver`].
636///
637/// # Example
638///
639/// ```rust,no_run
640/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
641/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
642/// match driver.next().await? {
643///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
644///         println!("Needs approval: {}", pending.request.summary);
645///         pending.approve(driver)?;
646///     }
647///     _ => {}
648/// }
649/// # Ok(())
650/// # }
651/// ```
652#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
653pub struct PendingApproval {
654    /// The underlying approval request details.
655    pub request: ApprovalRequest,
656}
657
658impl std::ops::Deref for PendingApproval {
659    type Target = ApprovalRequest;
660    fn deref(&self) -> &ApprovalRequest {
661        &self.request
662    }
663}
664
665impl PendingApproval {
666    /// Approve the pending tool call.
667    pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
668        let call_id = self
669            .request
670            .call_id
671            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
672        driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
673    }
674
675    /// Deny the pending tool call.
676    pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
677        let call_id = self
678            .request
679            .call_id
680            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
681        driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
682    }
683
684    /// Deny the pending tool call with a reason.
685    pub fn deny_with_reason<S: ModelSession>(
686        self,
687        driver: &mut LoopDriver<S>,
688        reason: impl Into<String>,
689    ) -> Result<(), LoopError> {
690        let call_id = self
691            .request
692            .call_id
693            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
694        driver.resolve_approval_for(
695            call_id,
696            ApprovalDecision::Deny {
697                reason: Some(reason.into()),
698            },
699        )
700    }
701
702    /// Approve the pending tool call with a patched input.
703    ///
704    /// The model's original tool input is replaced with `input` before the
705    /// tool executes. The transcript still records the call as the model
706    /// emitted it; only the executor sees the patched payload. This mirrors
707    /// the `PermissionResultAllow(updated_input=...)` pattern from the
708    /// Anthropic Agent SDK and is intended for hosts that want to sanitise,
709    /// restrict, or augment arguments before tool execution without forcing
710    /// the model to re-issue the call.
711    pub fn approve_with_patched_input<S: ModelSession>(
712        self,
713        driver: &mut LoopDriver<S>,
714        input: serde_json::Value,
715    ) -> Result<(), LoopError> {
716        let call_id = self
717            .request
718            .call_id
719            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
720        driver.resolve_approval_for_with_patched_input(call_id, input)
721    }
722}
723
724/// Descriptor for a [`LoopInterrupt::AwaitingInput`] interrupt.
725///
726/// Returned when the driver has no pending input and needs the host to
727/// supply items before advancing. This is the entry point for every user
728/// turn that wasn't preloaded via [`AgentBuilder::input`]. Transcript items
729/// loaded via [`AgentBuilder::transcript`] are passive, so when no input is
730/// preloaded the first [`LoopDriver::next`] call surfaces `AwaitingInput`
731/// and the host injects the first user message via [`InputRequest::submit`].
732///
733/// # Example
734///
735/// ```rust,no_run
736/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
737/// # use agentkit_core::Item;
738/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>, items: Vec<Item>) -> Result<(), agentkit_loop::LoopError> {
739/// match driver.next().await? {
740///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
741///         pending.submit(driver, items)?;
742///     }
743///     _ => {}
744/// }
745/// # Ok(())
746/// # }
747/// ```
748#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
749pub struct InputRequest {
750    /// The session that is waiting for input.
751    pub session_id: SessionId,
752    /// Human-readable explanation of why input is needed.
753    pub reason: String,
754}
755
756impl InputRequest {
757    /// Submit input items to the driver.
758    pub fn submit<S: ModelSession>(
759        self,
760        driver: &mut LoopDriver<S>,
761        items: Vec<Item>,
762    ) -> Result<(), LoopError> {
763        driver.submit_input(items)
764    }
765}
766
767/// Outcome of a completed (or cancelled) turn.
768///
769/// Wrapped by [`LoopStep::Finished`] and also emitted as
770/// [`AgentEvent::TurnFinished`] to observers.
771#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
772pub struct TurnResult {
773    /// Identifier for the turn that produced this result.
774    pub turn_id: agentkit_core::TurnId,
775    /// Why the turn ended (completed, tool call, cancelled, etc.).
776    pub finish_reason: FinishReason,
777    /// Items produced during this turn (assistant text, tool results, etc.).
778    pub items: Vec<Item>,
779    /// Aggregated token usage, if reported by the model.
780    pub usage: Option<Usage>,
781    /// Additional metadata about the turn.
782    pub metadata: MetadataMap,
783}
784
/// An interrupt that pauses the agent loop until the host resolves it.
///
/// The loop returns an interrupt inside [`LoopStep::Interrupt`] whenever it
/// cannot proceed autonomously.  Each variant carries a handle with
/// resolution methods so callers can resolve the interrupt directly.
/// Use [`LoopInterrupt::is_blocking`] to tell interrupts that must be
/// resolved apart from cooperative ones.
///
/// # Example
///
/// ```rust,no_run
/// use agentkit_loop::{LoopInterrupt, LoopStep};
/// # use agentkit_loop::LoopDriver;
///
/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
/// match driver.next().await? {
///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
///         println!("Tool {} needs approval: {}", pending.request.request_kind, pending.request.summary);
///         pending.approve(driver)?;
///     }
///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
///         println!("Waiting for input: {}", pending.reason);
///         // ... call pending.submit(driver, items)
///     }
///     LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => {
///         // Cooperative yield between tool rounds.  Optionally call
///         // driver.submit_input(...) to interject a user message, then
///         // call driver.next() to resume the turn.
///         let _ = info;
///     }
///     LoopStep::Finished(result) => {
///         println!("Turn finished: {:?}", result.finish_reason);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopInterrupt {
    /// A tool call requires explicit approval before it can execute.
    ApprovalRequest(PendingApproval),
    /// The driver has no pending input and needs the host to supply some.
    AwaitingInput(InputRequest),
    /// A tool round finished: all tool calls from the previous assistant
    /// message now have results in the transcript, and the driver is about to
    /// invoke the model again. The host may interject user messages via the
    /// [`ToolRoundInfo::submit`] handle before calling [`LoopDriver::next`]
    /// to resume.
    ///
    /// This is a non-blocking interrupt: callers that do not care about
    /// mid-turn interjection can treat it as a no-op (`_ => continue`) and
    /// the next `next()` call resumes the turn.
    AfterToolResult(ToolRoundInfo),
}
837
838impl LoopInterrupt {
839    /// Returns `true` if the interrupt must be explicitly resolved before
840    /// the loop can make progress. Approvals are blocking;
841    /// [`AwaitingInput`](LoopInterrupt::AwaitingInput) and
842    /// [`AfterToolResult`](LoopInterrupt::AfterToolResult) are cooperative
843    /// and can be ignored by calling [`LoopDriver::next`] again.
844    pub fn is_blocking(&self) -> bool {
845        matches!(self, LoopInterrupt::ApprovalRequest(_))
846    }
847}
848
/// Metadata describing a completed tool round, surfaced via
/// [`LoopInterrupt::AfterToolResult`].
///
/// Besides carrying identifying data, the value doubles as a handle:
/// [`ToolRoundInfo::submit`] lets the host interject input before the turn
/// resumes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToolRoundInfo {
    /// The session that produced this tool round.
    pub session_id: SessionId,
    /// The turn that is about to continue into the next model call.
    pub turn_id: agentkit_core::TurnId,
    /// Transcript length at the yield point (for snapshots / UIs).
    pub transcript_len: usize,
}
860
861impl ToolRoundInfo {
862    /// Interject user input between tool rounds. Consumes the
863    /// [`ToolRoundInfo`] handle so the same yield cannot accept input twice.
864    pub fn submit<S: ModelSession>(
865        self,
866        driver: &mut LoopDriver<S>,
867        items: Vec<Item>,
868    ) -> Result<(), LoopError> {
869        driver.submit_input(items)
870    }
871}
872
/// The result of advancing the agent loop by one step.
///
/// Returned by [`LoopDriver::next`].  The host should pattern-match on this
/// to decide whether to continue the loop or resolve an interrupt first.
///
/// # Example
///
/// ```rust,no_run
/// use agentkit_loop::LoopStep;
/// # use agentkit_loop::LoopDriver;
///
/// # async fn run<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
/// loop {
///     match driver.next().await? {
///         LoopStep::Finished(result) => {
///             println!("Turn complete: {:?}", result.finish_reason);
///             break;
///         }
///         LoopStep::Interrupt(interrupt) => {
///             // Resolve the interrupt, then continue the loop.
///             # break;
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum LoopStep {
    /// The loop is paused and requires host action; see [`LoopInterrupt`]
    /// for the possible reasons.
    Interrupt(LoopInterrupt),
    /// A turn has completed (or been cancelled); the payload describes how
    /// it ended and what it produced.
    Finished(TurnResult),
}
907
/// A read-only snapshot of the loop driver's current state.
///
/// Obtained via [`LoopDriver::snapshot`].  Useful for persisting or
/// inspecting the conversation transcript without holding a mutable
/// reference to the driver. All fields are owned values, so the snapshot
/// has no lifetime tie to the driver.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LoopSnapshot {
    /// Session identifier.
    pub session_id: SessionId,
    /// The full transcript accumulated so far.
    pub transcript: Vec<Item>,
    /// Input items queued but not yet consumed by a turn.
    pub pending_input: Vec<Item>,
}
922
/// Driver-side bookkeeping for one tool call that is waiting on an approval
/// decision.
///
/// Entries are created by `enqueue_pending_approval` and stored in the
/// driver's `pending_approvals` map, keyed by the tool call id.
#[derive(Clone, Debug)]
struct PendingApprovalToolCall {
    /// The approval request handed to the host; its `call_id` is filled in
    /// when the entry is enqueued.
    request: ApprovalRequest,
    /// The recorded decision; `None` while the approval is still unresolved.
    decision: Option<ApprovalDecision>,
    /// Whether this request has already been returned to the host as a
    /// [`LoopInterrupt::ApprovalRequest`].
    surfaced: bool,
    /// Turn the tool call belongs to.
    turn_id: agentkit_core::TurnId,
    /// Identifier of the task that is awaiting approval.
    task_id: TaskId,
    /// The tool call rebuilt from `tool_request` (id, name, input, metadata).
    call: ToolCallPart,
    /// The tool request that triggered the approval.
    tool_request: ToolRequest,
}
933
/// State of the tool round the driver is currently working through.
#[derive(Clone, Debug, Default)]
struct ActiveToolRound {
    /// Turn that owns this tool round.
    turn_id: agentkit_core::TurnId,
    /// Tool calls not yet handed to the task manager; they are dispatched
    /// from the front of the queue.
    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
    /// Set once a dispatched call comes back as a still-pending background
    /// task.
    background_pending: bool,
    /// Set once a tool result item for this round has been received and
    /// appended to the transcript.
    foreground_progressed: bool,
}
941
/// A configured agent ready to start a session.
///
/// Build one with [`Agent::builder`], supplying at minimum a [`ModelAdapter`].
/// Optionally preload prior conversation state via
/// [`AgentBuilder::transcript`] and the next user turn via
/// [`AgentBuilder::input`]. Then call [`Agent::start`] with a
/// [`SessionConfig`] to obtain a [`LoopDriver`] that drives the agentic loop.
///
/// If no input is preloaded, the first call to [`LoopDriver::next`] yields
/// [`LoopInterrupt::AwaitingInput`] so the host can supply the first user
/// message via [`InputRequest::submit`]. If input was preloaded, the first
/// `next()` dispatches the model directly.
///
/// # Example
///
/// ```rust,no_run
/// use agentkit_core::{Item, ItemKind};
/// use agentkit_loop::{
///     Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig,
/// };
/// use agentkit_tools_core::ToolRegistry;
///
/// # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
/// let agent = Agent::builder()
///     .model(adapter)
///     .add_tool_source(ToolRegistry::new())
///     .transcript(vec![Item::text(ItemKind::System, "You are a helpful assistant.")])
///     .input(vec![Item::text(ItemKind::User, "Hello!")])
///     .build()?;
///
/// let mut driver = agent
///     .start(SessionConfig::new("s1").with_cache(
///         PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
///     ))
///     .await?;
///
/// // First next() drives the model since input was preloaded.
/// let _ = driver.next().await?;
/// # Ok(())
/// # }
/// ```
pub struct Agent<M>
where
    M: ModelAdapter,
{
    /// Model adapter used to open the session in [`Agent::start`].
    model: M,
    /// Tool sources, in registration order; used to build the default
    /// executor when no custom one is set.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    /// Custom executor; when `Some`, it is used instead of one built from
    /// `tool_sources`.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Task manager that schedules tool-call execution.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker gating tool execution.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared resources made available to tool implementations.
    resources: Arc<dyn ToolResources>,
    /// Optional handle for cooperative cancellation.
    cancellation: Option<CancellationHandle>,
    /// Optional transcript compaction configuration.
    compaction: Option<CompactionConfig>,
    /// Observers receiving [`AgentEvent`]s.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Observers notified when items are appended to the transcript.
    transcript_observers: Vec<Box<dyn TranscriptObserver>>,
    /// Preloaded transcript; becomes the driver's initial transcript.
    transcript: Vec<Item>,
    /// Preloaded input; becomes the driver's initial pending-input queue.
    input: Vec<Item>,
}
1000
1001impl<M> Agent<M>
1002where
1003    M: ModelAdapter,
1004{
1005    /// Create a new [`AgentBuilder`] for configuring this agent.
1006    pub fn builder() -> AgentBuilder<M> {
1007        AgentBuilder::default()
1008    }
1009
1010    /// Consume the agent and start a session, returning a [`LoopDriver`]
1011    /// preloaded with whatever transcript and input were configured on the
1012    /// builder. See [`AgentBuilder::transcript`] and [`AgentBuilder::input`]
1013    /// for what each one does and when to use them.
1014    ///
1015    /// This calls [`ModelAdapter::start_session`] and emits an
1016    /// [`AgentEvent::RunStarted`] event to all registered observers.
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns [`LoopError`] if the model adapter fails to create a session.
1021    pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
1022        let session_id = config.session_id.clone();
1023        let default_cache = config.cache.clone();
1024        let session = self.model.start_session(config).await?;
1025        let tool_executor = self
1026            .tool_executor
1027            .unwrap_or_else(|| Arc::new(BasicToolExecutor::new(self.tool_sources.clone())));
1028        let mut driver = LoopDriver {
1029            session_id: session_id.clone(),
1030            default_cache,
1031            next_turn_cache: None,
1032            session: Some(session),
1033            tool_executor,
1034            task_manager: self.task_manager,
1035            permissions: self.permissions,
1036            resources: self.resources,
1037            cancellation: self.cancellation,
1038            compaction: self.compaction,
1039            observers: self.observers,
1040            transcript_observers: self.transcript_observers,
1041            transcript: self.transcript,
1042            pending_input: self.input,
1043            pending_approvals: BTreeMap::new(),
1044            pending_approval_order: VecDeque::new(),
1045            active_tool_round: None,
1046            pending_round_resume: None,
1047            next_turn_index: 1,
1048            detached_call_ids: HashSet::new(),
1049        };
1050        driver.emit(AgentEvent::RunStarted { session_id });
1051        Ok(driver)
1052    }
1053}
1054
/// Builder for constructing an [`Agent`].
///
/// Obtained via [`Agent::builder`].  The only required field is
/// [`model`](AgentBuilder::model); all others have sensible defaults
/// (no tools, allow-all permissions, no compaction, no observers).
pub struct AgentBuilder<M>
where
    M: ModelAdapter,
{
    /// Model adapter; `None` until [`AgentBuilder::model`] is called.
    model: Option<M>,
    /// Tool sources in registration order.
    tool_sources: Vec<Arc<dyn ToolSource>>,
    /// Optional custom tool executor.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Optional task manager; `build` falls back to [`SimpleTaskManager`]
    /// when this is `None`.
    task_manager: Option<Arc<dyn TaskManager>>,
    /// Permission checker; defaults to `AllowAllPermissions`.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared tool resources; defaults to the unit value `()`.
    resources: Arc<dyn ToolResources>,
    /// Optional cancellation handle.
    cancellation: Option<CancellationHandle>,
    /// Optional compaction configuration.
    compaction: Option<CompactionConfig>,
    /// Registered loop observers, in registration order.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Registered transcript observers, in registration order.
    transcript_observers: Vec<Box<dyn TranscriptObserver>>,
    /// Transcript to preload into the driver.
    transcript: Vec<Item>,
    /// Input to preload into the driver's pending-input queue.
    input: Vec<Item>,
}
1077
1078impl<M> Default for AgentBuilder<M>
1079where
1080    M: ModelAdapter,
1081{
1082    fn default() -> Self {
1083        Self {
1084            model: None,
1085            tool_sources: Vec::new(),
1086            tool_executor: None,
1087            task_manager: None,
1088            permissions: Arc::new(AllowAllPermissions),
1089            resources: Arc::new(()),
1090            cancellation: None,
1091            compaction: None,
1092            observers: Vec::new(),
1093            transcript_observers: Vec::new(),
1094            transcript: Vec::new(),
1095            input: Vec::new(),
1096        }
1097    }
1098}
1099
1100impl<M> AgentBuilder<M>
1101where
1102    M: ModelAdapter,
1103{
1104    /// Set the model adapter (required).
1105    pub fn model(mut self, model: M) -> Self {
1106        self.model = Some(model);
1107        self
1108    }
1109
1110    /// Adds a tool source to the agent. Call multiple times to compose
1111    /// federated sources — for example a frozen native [`ToolRegistry`]
1112    /// alongside an MCP manager's [`agentkit_tools_core::CatalogReader`]
1113    /// and a skill-watcher reader. Sources are walked in registration
1114    /// order; the default [`agentkit_tools_core::CollisionPolicy`] is
1115    /// `FirstWins`.
1116    ///
1117    /// Accepts any sized [`ToolSource`]; the agent owns it for the
1118    /// session. To share a dynamic source between the agent and the
1119    /// subsystem mutating it, mint a [`agentkit_tools_core::CatalogReader`]
1120    /// from a [`agentkit_tools_core::dynamic_catalog`] pair — the reader
1121    /// is sized and owned, hosts never see the underlying `Arc`.
1122    pub fn add_tool_source<S: ToolSource + 'static>(mut self, source: S) -> Self {
1123        self.tool_sources.push(Arc::new(source));
1124        self
1125    }
1126
1127    /// Set a custom [`ToolExecutor`]. When provided, the agent uses it
1128    /// instead of building a [`BasicToolExecutor`] from the configured
1129    /// sources. Most hosts should use [`add_tool_source`](Self::add_tool_source)
1130    /// instead; this is for advanced cases (custom routing, instrumentation,
1131    /// test fakes).
1132    pub fn tool_executor(mut self, executor: impl ToolExecutor + 'static) -> Self {
1133        self.tool_executor = Some(Arc::new(executor));
1134        self
1135    }
1136
1137    /// Set the task manager that schedules tool-call execution.
1138    ///
1139    /// Defaults to [`SimpleTaskManager`], which preserves the existing
1140    /// sequential request/response behavior.
1141    pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1142        self.task_manager = Some(Arc::new(manager));
1143        self
1144    }
1145
1146    /// Set the permission checker that gates tool execution.
1147    ///
1148    /// Defaults to allowing all tool calls without prompting.
1149    pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1150        self.permissions = Arc::new(permissions);
1151        self
1152    }
1153
1154    /// Set shared resources available to tool implementations.
1155    pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1156        self.resources = Arc::new(resources);
1157        self
1158    }
1159
1160    /// Attach a [`CancellationHandle`] for cooperative cancellation of turns.
1161    pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1162        self.cancellation = Some(handle);
1163        self
1164    }
1165
1166    /// Enable transcript compaction with the given configuration.
1167    ///
1168    /// When configured, the driver checks the compaction trigger before each
1169    /// turn and applies the compaction strategy if the transcript is too long.
1170    pub fn compaction(mut self, config: CompactionConfig) -> Self {
1171        self.compaction = Some(config);
1172        self
1173    }
1174
1175    /// Register a [`LoopObserver`] that receives [`AgentEvent`]s.
1176    ///
1177    /// Multiple observers may be registered; they are called in order.
1178    pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
1179        self.observers.push(Box::new(observer));
1180        self
1181    }
1182
1183    /// Register a [`TranscriptObserver`] that receives an [`Item`] every
1184    /// time one is appended to the transcript.
1185    ///
1186    /// Multiple observers may be registered; they are called in order.
1187    /// Use this when you need a loss-free view of the transcript (e.g.
1188    /// for persistence or replication) — [`LoopObserver`] alone is
1189    /// insufficient because it doesn't expose item boundaries for model
1190    /// output and historically did not surface tool results at all.
1191    pub fn transcript_observer(mut self, observer: impl TranscriptObserver + 'static) -> Self {
1192        self.transcript_observers.push(Box::new(observer));
1193        self
1194    }
1195
1196    /// Preload the driver's transcript with prior conversation state
1197    /// (defaults to empty).
1198    ///
1199    /// Items pass straight into the driver's transcript without firing
1200    /// [`TranscriptObserver::on_item_appended`] — the host is expected to
1201    /// already know about (and have persisted) anything it preloads. Use
1202    /// this for resumed sessions or to seed a system prompt.
1203    pub fn transcript(mut self, transcript: Vec<Item>) -> Self {
1204        self.transcript = transcript;
1205        self
1206    }
1207
1208    /// Preload the driver's pending-input queue with the next user turn
1209    /// (defaults to empty).
1210    ///
1211    /// When non-empty, the first [`LoopDriver::next`] dispatches the model
1212    /// directly instead of yielding [`LoopInterrupt::AwaitingInput`]. Use
1213    /// this for one-shot calls and scripts where the first user turn is
1214    /// known up front. Items move to the transcript on turn dispatch the
1215    /// same way submitted input does, firing transcript observers.
1216    pub fn input(mut self, input: Vec<Item>) -> Self {
1217        self.input = input;
1218        self
1219    }
1220
1221    /// Consume the builder and produce an [`Agent`].
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns [`LoopError::InvalidState`] if no model adapter was provided.
1226    pub fn build(self) -> Result<Agent<M>, LoopError> {
1227        let model = self
1228            .model
1229            .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1230        Ok(Agent {
1231            model,
1232            tool_sources: self.tool_sources,
1233            tool_executor: self.tool_executor,
1234            task_manager: self
1235                .task_manager
1236                .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1237            permissions: self.permissions,
1238            resources: self.resources,
1239            cancellation: self.cancellation,
1240            compaction: self.compaction,
1241            observers: self.observers,
1242            transcript_observers: self.transcript_observers,
1243            transcript: self.transcript,
1244            input: self.input,
1245        })
1246    }
1247}
1248
/// The runtime driver that advances the agent loop step by step.
///
/// Obtained from [`Agent::start`] with the builder's preloaded transcript
/// and pending-input queue baked in.
/// The typical usage pattern is:
///
/// 1. Call [`next`](LoopDriver::next) to advance the loop.
/// 2. Handle the returned [`LoopStep`]:
///    - [`LoopStep::Finished`] -- the turn completed, inspect the result.
///    - [`LoopStep::Interrupt`] -- resolve the interrupt via the bound
///      [`Pending*`](LoopInterrupt) handle, then call `next` again.
///
/// # Example
///
/// ```rust,no_run
/// use agentkit_core::{Item, ItemKind};
/// use agentkit_loop::{LoopDriver, LoopStep};
///
/// # async fn drive<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
/// let step = driver.next().await?;
/// match step {
///     LoopStep::Finished(result) => println!("Done: {:?}", result.finish_reason),
///     LoopStep::Interrupt(interrupt) => {
///         // Resolve via the pending handle, then call next() again.
///         println!("Interrupted: {interrupt:?}");
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub struct LoopDriver<S>
where
    S: ModelSession,
{
    /// Identifier of the session, taken from the [`SessionConfig`] at start.
    session_id: SessionId,
    /// Prompt-cache request taken from the [`SessionConfig`] at start.
    default_cache: Option<PromptCacheRequest>,
    /// Starts as `None`.
    /// NOTE(review): presumably a cache override for the next turn only —
    /// confirm against the turn-dispatch code.
    next_turn_cache: Option<PromptCacheRequest>,
    /// The model session returned by [`ModelAdapter::start_session`].
    /// NOTE(review): wrapped in `Option`, presumably so it can be taken out
    /// temporarily — confirm.
    session: Option<S>,
    /// Executor passed to the task manager whenever a tool task is started.
    tool_executor: Arc<dyn ToolExecutor>,
    /// Task manager that starts tool tasks and reports their resolutions.
    task_manager: Arc<dyn TaskManager>,
    /// Permission checker included in every tool context.
    permissions: Arc<dyn PermissionChecker>,
    /// Shared resources included in every tool context.
    resources: Arc<dyn ToolResources>,
    /// Optional handle used to obtain cancellation checkpoints.
    cancellation: Option<CancellationHandle>,
    /// Optional compaction configuration consulted by `maybe_compact`.
    compaction: Option<CompactionConfig>,
    /// Observers receiving [`AgentEvent`]s.
    observers: Vec<Box<dyn LoopObserver>>,
    /// Observers notified about appended transcript items.
    transcript_observers: Vec<Box<dyn TranscriptObserver>>,
    /// The conversation transcript; replaced wholesale after compaction.
    transcript: Vec<Item>,
    /// Input queued but not yet consumed by a turn.
    pending_input: Vec<Item>,
    /// Approvals awaiting a decision or pickup, keyed by tool call id.
    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
    /// Call ids of pending approvals in the order they were enqueued; the
    /// approval helpers walk this queue to preserve that order.
    pending_approval_order: VecDeque<ToolCallId>,
    /// The tool round in progress, if any; `None` at session start.
    active_tool_round: Option<ActiveToolRound>,
    /// Starts as `None`.
    /// NOTE(review): presumably the turn to resume after an
    /// `AfterToolResult` yield — confirm.
    pending_round_resume: Option<agentkit_core::TurnId>,
    /// Starts at 1.
    /// NOTE(review): presumably the index used for the next turn id — confirm.
    next_turn_index: u64,
    /// Call ids whose original tool_use was already paired with a
    /// synthetic detach tool_result. When the real result eventually
    /// arrives via the task manager, we MUST NOT emit a second
    /// tool_result for the same id — the provider schema requires
    /// exactly one tool_result per tool_use. Instead we route the
    /// resolution into a [`ItemKind::Notification`] item that the model
    /// can react to on the next turn.
    detached_call_ids: HashSet<ToolCallId>,
}
1311
1312impl<S> LoopDriver<S>
1313where
1314    S: ModelSession,
1315{
    /// Build the `agent.execute_tool` tracing span for one tool dispatch.
    ///
    /// The span is created at `INFO` level and records the tool name and
    /// call id from `request`, this driver's session id, the given
    /// `turn_id`, and `launch_kind` (a static label chosen by the caller,
    /// e.g. `"plain"`). The span is returned un-entered; callers attach it
    /// to the dispatch future themselves.
    fn execute_tool_span(
        &self,
        request: &ToolRequest,
        turn_id: &agentkit_core::TurnId,
        launch_kind: &'static str,
    ) -> tracing::Span {
        tracing::info_span!(
            "agent.execute_tool",
            "gen_ai.tool.name" = %request.tool_name,
            "gen_ai.tool.call.id" = %request.call_id,
            session.id = %self.session_id,
            turn.id = %turn_id,
            launch_kind = launch_kind,
        )
    }
1331
1332    fn start_task_via_manager(
1333        &self,
1334        task_id: Option<TaskId>,
1335        tool_request: ToolRequest,
1336        kind: TaskLaunchKind,
1337        cancellation: Option<TurnCancellation>,
1338    ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1339    {
1340        let task_manager = self.task_manager.clone();
1341        let tool_executor = self.tool_executor.clone();
1342        let permissions = self.permissions.clone();
1343        let resources = self.resources.clone();
1344        let session_id = self.session_id.clone();
1345        let turn_id = tool_request.turn_id.clone();
1346        let metadata = tool_request.metadata.clone();
1347
1348        async move {
1349            task_manager
1350                .start_task(
1351                    TaskLaunchRequest {
1352                        task_id,
1353                        request: tool_request.clone(),
1354                        kind,
1355                    },
1356                    TaskStartContext {
1357                        executor: tool_executor,
1358                        tool_context: OwnedToolContext {
1359                            session_id,
1360                            turn_id,
1361                            metadata,
1362                            permissions,
1363                            resources,
1364                            cancellation,
1365                        },
1366                    },
1367                )
1368                .await
1369                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1370        }
1371    }
1372
    /// Returns `true` while at least one approval is still tracked in
    /// `pending_approvals`.
    fn has_pending_interrupts(&self) -> bool {
        !self.pending_approvals.is_empty()
    }
1376
1377    fn emit_tool_catalog_events(&mut self, events: Vec<ToolCatalogEvent>) {
1378        for event in events {
1379            self.emit(AgentEvent::ToolCatalogChanged(event));
1380        }
1381    }
1382
1383    fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1384        let call_id = task.tool_request.call_id.clone();
1385        let call = ToolCallPart {
1386            id: call_id.clone(),
1387            name: task.tool_request.tool_name.to_string(),
1388            input: task.tool_request.input.clone(),
1389            metadata: task.tool_request.metadata.clone(),
1390        };
1391        let mut request = task.approval;
1392        request.call_id = Some(call_id.clone());
1393        let pending = PendingApprovalToolCall {
1394            request: request.clone(),
1395            decision: None,
1396            surfaced: false,
1397            turn_id: turn_id.clone(),
1398            task_id: task.task_id,
1399            call,
1400            tool_request: task.tool_request,
1401        };
1402        self.pending_approvals.insert(call_id.clone(), pending);
1403        if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1404            self.pending_approval_order.push_back(call_id);
1405        }
1406        self.emit(AgentEvent::ApprovalRequired(request));
1407    }
1408
1409    fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1410        for call_id in self.pending_approval_order.clone() {
1411            let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1412                continue;
1413            };
1414            if pending.decision.is_none() && !pending.surfaced {
1415                pending.surfaced = true;
1416                return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1417                    PendingApproval {
1418                        request: pending.request.clone(),
1419                    },
1420                )));
1421            }
1422        }
1423        None
1424    }
1425
1426    fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1427        self.pending_approval_order.iter().find_map(|call_id| {
1428            self.pending_approvals.get(call_id).and_then(|pending| {
1429                pending.decision.is_none().then(|| {
1430                    LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1431                        request: pending.request.clone(),
1432                    }))
1433                })
1434            })
1435        })
1436    }
1437
1438    fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1439        let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1440            self.pending_approvals
1441                .get(call_id)
1442                .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1443        })?;
1444        self.pending_approval_order.retain(|id| id != &call_id);
1445        self.pending_approvals.remove(&call_id)
1446    }
1447
1448    fn queue_resolution_interrupt(
1449        &mut self,
1450        turn_id: &agentkit_core::TurnId,
1451        resolution: TaskResolution,
1452    ) -> Option<LoopStep> {
1453        match resolution {
1454            TaskResolution::Item(item) => {
1455                self.append_tool_result_item(item);
1456                None
1457            }
1458            TaskResolution::Approval(task) => {
1459                self.enqueue_pending_approval(turn_id, task);
1460                self.take_next_unsurfaced_approval_interrupt()
1461            }
1462        }
1463    }
1464
1465    async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1466        let PendingLoopUpdates { mut resolutions } = self
1467            .task_manager
1468            .take_pending_loop_updates()
1469            .await
1470            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1471        let mut saw_items = false;
1472        while let Some(resolution) = resolutions.pop_front() {
1473            match resolution {
1474                TaskResolution::Item(item) => {
1475                    self.append_tool_result_item(item);
1476                    saw_items = true;
1477                }
1478                TaskResolution::Approval(task) => {
1479                    self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1480                }
1481            }
1482        }
1483        Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1484    }
1485
1486    async fn maybe_compact(
1487        &mut self,
1488        turn_id: Option<&agentkit_core::TurnId>,
1489        cancellation: Option<TurnCancellation>,
1490    ) -> Result<(), LoopError> {
1491        let Some(compaction) = self.compaction.as_ref().cloned() else {
1492            return Ok(());
1493        };
1494        if cancellation
1495            .as_ref()
1496            .is_some_and(TurnCancellation::is_cancelled)
1497        {
1498            return Err(LoopError::Cancelled);
1499        }
1500        let Some(reason) =
1501            compaction
1502                .trigger
1503                .should_compact(&self.session_id, turn_id, &self.transcript)
1504        else {
1505            return Ok(());
1506        };
1507
1508        self.emit(AgentEvent::CompactionStarted {
1509            session_id: self.session_id.clone(),
1510            turn_id: turn_id.cloned(),
1511            reason: reason.clone(),
1512        });
1513
1514        let CompactionResult {
1515            transcript,
1516            replaced_items,
1517            metadata,
1518        } = compaction
1519            .strategy
1520            .apply(
1521                agentkit_compaction::CompactionRequest {
1522                    session_id: self.session_id.clone(),
1523                    turn_id: turn_id.cloned(),
1524                    transcript: self.transcript.clone(),
1525                    reason,
1526                    metadata: compaction.metadata.clone(),
1527                },
1528                &mut CompactionContext {
1529                    backend: compaction.backend.as_deref(),
1530                    metadata: &compaction.metadata,
1531                    cancellation,
1532                },
1533            )
1534            .await
1535            .map_err(|error| match error {
1536                agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
1537                other => LoopError::Compaction(other.to_string()),
1538            })?;
1539
1540        self.transcript = transcript;
1541        self.emit(AgentEvent::CompactionFinished {
1542            session_id: self.session_id.clone(),
1543            turn_id: turn_id.cloned(),
1544            replaced_items,
1545            transcript_len: self.transcript.len(),
1546            metadata,
1547        });
1548        Ok(())
1549    }
1550
    /// Drive the in-flight tool round, if there is one, until it produces a
    /// step for the host or runs out of work.
    ///
    /// Each pass of the inner loop:
    ///
    /// 1. takes a cancellation checkpoint and, if the turn is cancelled,
    ///    notifies the task manager and finishes the turn as cancelled;
    /// 2. pops the next queued tool call (if any) and launches it through
    ///    `start_task_via_manager`;
    /// 3. once the queue is empty, waits on the task manager for the next
    ///    update belonging to this turn.
    ///
    /// # Returns
    ///
    /// * `Ok(None)` — no tool round is active, or the round ended with
    ///   background work still pending and no foreground progress.
    /// * `Ok(Some(step))` — a cancelled-turn result from `finish_cancelled`, a
    ///   step produced by `take_next_unsurfaced_approval_interrupt` /
    ///   `next_unresolved_approval_interrupt`, or
    ///   [`LoopInterrupt::AfterToolResult`] once the round is complete.
    ///
    /// # Errors
    ///
    /// Task-manager failures are wrapped as
    /// `LoopError::Tool(ToolError::Internal(..))`. [`LoopError::InvalidState`]
    /// is returned if the active round is unexpectedly missing mid-loop.
    /// Errors from `start_task_via_manager` propagate unchanged.
    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
        // Nothing to do unless a tool round is currently in flight.
        let Some(_) = self.active_tool_round.as_ref() else {
            return Ok(None);
        };
        loop {
            // Fresh checkpoint on every pass so a cancellation requested while
            // a previous call was running is noticed before the next dispatch.
            let cancellation = self
                .cancellation
                .as_ref()
                .map(CancellationHandle::checkpoint);
            let turn_id = self
                .active_tool_round
                .as_ref()
                .map(|active| active.turn_id.clone())
                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;

            // Cancelled: tell the task manager, drop the round, and finish the
            // turn as cancelled without adding any items.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                self.active_tool_round = None;
                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
            }

            // Dispatch phase: launch queued calls one at a time, in order.
            let next_call = self
                .active_tool_round
                .as_mut()
                .and_then(|active| active.pending_calls.pop_front());
            if let Some((_call, tool_request)) = next_call {
                use tracing::Instrument;
                let dispatch_span = self.execute_tool_span(&tool_request, &turn_id, "plain");
                match self
                    .start_task_via_manager(
                        None,
                        tool_request.clone(),
                        TaskLaunchKind::Plain,
                        cancellation.clone(),
                    )
                    .instrument(dispatch_span)
                    .await?
                {
                    // The task resolved immediately: either a tool result to
                    // record or an approval that must be queued for the host.
                    TaskStartOutcome::Ready(resolution) => {
                        let resolution = *resolution;
                        match resolution {
                            TaskResolution::Item(item) => {
                                if let Some(active) = self.active_tool_round.as_mut() {
                                    active.foreground_progressed = true;
                                }
                                self.append_tool_result_item(item);
                            }
                            TaskResolution::Approval(task) => {
                                self.enqueue_pending_approval(&turn_id, task);
                            }
                        }
                        continue;
                    }
                    // The task is still running; remember whether anything was
                    // routed to the background so the round can report it.
                    TaskStartOutcome::Pending { kind, .. } => {
                        if kind == agentkit_task_manager::TaskKind::Background
                            && let Some(active) = self.active_tool_round.as_mut()
                        {
                            active.background_pending = true;
                        }
                        continue;
                    }
                }
            }

            // Wait phase: the queue is empty, so block on the task manager for
            // the next update that belongs to this turn.
            match self
                .task_manager
                .wait_for_turn(&turn_id, cancellation.clone())
                .await
                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
            {
                Some(TurnTaskUpdate::Resolution(resolution)) => {
                    let resolution = *resolution;
                    match resolution {
                        TaskResolution::Item(item) => {
                            if let Some(active) = self.active_tool_round.as_mut() {
                                active.foreground_progressed = true;
                            }
                            self.append_tool_result_item(item);
                        }
                        TaskResolution::Approval(task) => {
                            self.enqueue_pending_approval(&turn_id, task);
                        }
                    }
                }
                Some(TurnTaskUpdate::Detached(snapshot)) => {
                    // The task was promoted to background. Push a synthetic
                    // tool result so the model knows the call is still running
                    // and can continue its turn; provider schemas require
                    // exactly one tool_result per tool_use, so this placeholder
                    // fills that slot.
                    //
                    // Order matters: append the placeholder FIRST, as a real
                    // Tool/ToolResult item, and only AFTER that record the
                    // call_id in `detached_call_ids`. That way the *next* item
                    // for this call_id (the real completion arriving later via
                    // the task manager) is the one `maybe_convert_detached`
                    // turns into a Notification, instead of a second
                    // tool_result being emitted for the same call_id.
                    let detached_call_id = snapshot.call_id.clone();
                    self.append_tool_result_item(Item {
                        id: None,
                        kind: ItemKind::Tool,
                        parts: vec![Part::ToolResult(ToolResultPart {
                            call_id: detached_call_id.clone(),
                            output: ToolOutput::Text(format!(
                                "Tool {} is now running in the background. \
                                 The result will be delivered when it completes.",
                                snapshot.tool_name,
                            )),
                            is_error: false,
                            metadata: MetadataMap::new(),
                        })],
                        metadata: MetadataMap::new(),
                    });
                    self.detached_call_ids.insert(detached_call_id);
                    // The placeholder counts as foreground progress (the
                    // transcript advanced) while the real work stays pending.
                    if let Some(active) = self.active_tool_round.as_mut() {
                        active.background_pending = true;
                        active.foreground_progressed = true;
                    }
                }
                None => {
                    // No further update for this turn. Rule out cancellation
                    // first: it takes the same exit as the check at the top of
                    // the loop.
                    if cancellation
                        .as_ref()
                        .is_some_and(TurnCancellation::is_cancelled)
                    {
                        self.task_manager
                            .on_turn_interrupted(&turn_id)
                            .await
                            .map_err(|error| {
                                LoopError::Tool(ToolError::Internal(error.to_string()))
                            })?;
                        self.active_tool_round = None;
                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
                    }
                    // The round is over; take ownership of its bookkeeping so
                    // later calls see no active round.
                    let active = self.active_tool_round.take().ok_or_else(|| {
                        LoopError::InvalidState("missing active tool round".into())
                    })?;
                    // Approvals queued during the round are surfaced before
                    // anything else.
                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    if let Some(step) = self.next_unresolved_approval_interrupt() {
                        return Ok(Some(step));
                    }
                    // Only background work is outstanding and nothing landed in
                    // the foreground: report "no step" and let the caller
                    // decide what to do next.
                    if active.background_pending && !active.foreground_progressed {
                        return Ok(None);
                    }
                    // Yield control back to the host between tool rounds.
                    // All tool calls in this round have results in the
                    // transcript; the transcript is provider-valid.  The
                    // host may submit_input before calling next() to
                    // resume, which will re-enter drive_turn via
                    // pending_round_resume.
                    let info = ToolRoundInfo {
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        transcript_len: self.transcript.len(),
                    };
                    self.pending_round_resume = Some(turn_id);
                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
                        info,
                    ))));
                }
            }
        }
    }
1726
    /// Run one model turn for `turn_id` against the current transcript.
    ///
    /// The turn request is built from a clone of the transcript and the tool
    /// executor's current specs. Model events are streamed to observers until
    /// a `Finished` event arrives, and the model's output items are appended
    /// to the transcript. If the model requested tools, a new active tool
    /// round is started and driven via `continue_active_tool_round`.
    ///
    /// # Parameters
    ///
    /// * `turn_id` — identifier recorded on the request, tool requests, and
    ///   the resulting [`TurnResult`].
    /// * `emit_started` — when `true`, emit [`AgentEvent::TurnStarted`] before
    ///   dispatching; callers in this file pass `false` when continuing an
    ///   existing turn.
    ///
    /// # Returns
    ///
    /// * [`LoopStep::Finished`] when the model finished without tool calls, or
    ///   when the turn was cancelled (finish reason `Cancelled`).
    /// * Whatever step the tool round yields when tools were requested, or
    ///   [`LoopInterrupt::AwaitingInput`] if the round yields nothing.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] if no model session is available,
    /// [`LoopError::Provider`] if the event stream ends without a `Finished`
    /// event, and propagates non-cancellation errors from compaction, the
    /// model session, and the task manager.
    #[tracing::instrument(
        name = "agent.turn",
        skip_all,
        fields(
            session.id = %self.session_id,
            turn.id = %turn_id,
            transcript.len = self.transcript.len(),
            saw_tool_call = tracing::field::Empty,
            finish_reason = tracing::field::Empty,
        ),
    )]
    async fn drive_turn(
        &mut self,
        turn_id: agentkit_core::TurnId,
        emit_started: bool,
    ) -> Result<LoopStep, LoopError> {
        let cancellation = self
            .cancellation
            .as_ref()
            .map(CancellationHandle::checkpoint);
        // Compaction runs before the turn starts; a cancellation raised during
        // it ends the turn with the "interrupted" assistant placeholder.
        match self
            .maybe_compact(Some(&turn_id), cancellation.clone())
            .await
        {
            Ok(()) => {}
            Err(LoopError::Cancelled) => {
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        }
        if emit_started {
            self.emit(AgentEvent::TurnStarted {
                session_id: self.session_id.clone(),
                turn_id: turn_id.clone(),
            });
        }
        // Re-check cancellation before building the request.
        if cancellation
            .as_ref()
            .is_some_and(TurnCancellation::is_cancelled)
        {
            return self.finish_cancelled(turn_id, interrupted_assistant_items());
        }

        // Forward any tool-catalog changes queued on the executor before the
        // tool specs are read for this request.
        let catalog_events = self.tool_executor.drain_catalog_events();
        self.emit_tool_catalog_events(catalog_events);

        let request = TurnRequest {
            session_id: self.session_id.clone(),
            turn_id: turn_id.clone(),
            transcript: self.transcript.clone(),
            available_tools: self.tool_executor.specs(),
            // A one-shot override (consumed here) wins over the session default.
            cache: self
                .next_turn_cache
                .take()
                .or_else(|| self.default_cache.clone()),
            metadata: MetadataMap::new(),
        };

        let session = self
            .session
            .as_mut()
            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
        // From here on, every cancellation exit also notifies the task manager
        // that the turn was interrupted.
        let mut turn = match session.begin_turn(request, cancellation.clone()).await {
            Ok(turn) => turn,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        };
        let mut saw_tool_call = false;
        let mut finished_result = None;

        // Stream model events until `Finished` arrives or the stream ends.
        while let Some(event) = match turn.next_event(cancellation.clone()).await {
            Ok(event) => event,
            Err(LoopError::Cancelled) => {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            Err(error) => return Err(error),
        } {
            // An event arrived, but the turn may have been cancelled while we
            // were waiting for it; in that case the event is discarded.
            if cancellation
                .as_ref()
                .is_some_and(TurnCancellation::is_cancelled)
            {
                self.task_manager
                    .on_turn_interrupted(&turn_id)
                    .await
                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
                return self.finish_cancelled(turn_id, interrupted_assistant_items());
            }
            match event {
                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
                ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
                ModelTurnEvent::ToolCall(call) => {
                    // Only note that a tool was requested; the calls to run
                    // are extracted from the finished output items below.
                    saw_tool_call = true;
                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
                }
                ModelTurnEvent::Finished(result) => {
                    finished_result = Some(result);
                    break;
                }
            }
        }

        let result = finished_result.ok_or_else(|| {
            LoopError::Provider("model turn ended without a Finished event".into())
        })?;
        // Fill in the span fields declared as `Empty` in the attribute above.
        tracing::Span::current().record("saw_tool_call", saw_tool_call);
        tracing::Span::current().record(
            "finish_reason",
            tracing::field::debug(&result.finish_reason),
        );
        self.extend_transcript(result.output_items.clone());

        if saw_tool_call {
            // Pair each tool call found in the output with the request that
            // will be handed to the task manager.
            let pending_calls = extract_tool_calls(&result.output_items)
                .into_iter()
                .map(|call| {
                    let tool_request = ToolRequest {
                        call_id: call.id.clone(),
                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
                        input: call.input.clone(),
                        session_id: self.session_id.clone(),
                        turn_id: turn_id.clone(),
                        metadata: call.metadata.clone(),
                    };
                    (call, tool_request)
                })
                .collect();
            self.active_tool_round = Some(ActiveToolRound {
                turn_id: turn_id.clone(),
                pending_calls,
                background_pending: false,
                foreground_progressed: false,
            });
            if let Some(step) = self.continue_active_tool_round().await? {
                return Ok(step);
            }
            // The round produced no step of its own, so hand control back to
            // the host and wait for input.
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        // No tool calls: the turn is complete.
        let turn_result = TurnResult {
            turn_id,
            finish_reason: result.finish_reason,
            items: result.output_items,
            usage: result.usage,
            metadata: result.metadata,
        };
        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
        Ok(LoopStep::Finished(turn_result))
    }
1890
1891    async fn resume_after_approval(
1892        &mut self,
1893        pending: PendingApprovalToolCall,
1894    ) -> Result<LoopStep, LoopError> {
1895        let decision = pending
1896            .decision
1897            .clone()
1898            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
1899
1900        match decision {
1901            ApprovalDecision::Approve => {
1902                use tracing::Instrument;
1903                let dispatch_span =
1904                    self.execute_tool_span(&pending.tool_request, &pending.turn_id, "approved");
1905                match self
1906                    .start_task_via_manager(
1907                        Some(pending.task_id.clone()),
1908                        pending.tool_request.clone(),
1909                        TaskLaunchKind::Approved(pending.request.clone()),
1910                        self.cancellation
1911                            .as_ref()
1912                            .map(CancellationHandle::checkpoint),
1913                    )
1914                    .instrument(dispatch_span)
1915                    .await?
1916                {
1917                    TaskStartOutcome::Ready(resolution) => {
1918                        let resolution = *resolution;
1919                        if let Some(step) =
1920                            self.queue_resolution_interrupt(&pending.turn_id, resolution)
1921                        {
1922                            return Ok(step);
1923                        }
1924                    }
1925                    TaskStartOutcome::Pending { .. } => {}
1926                }
1927            }
1928            ApprovalDecision::Deny { reason } => {
1929                self.append_tool_result_item(Item {
1930                    id: None,
1931                    kind: ItemKind::Tool,
1932                    parts: vec![Part::ToolResult(ToolResultPart {
1933                        call_id: pending.call.id.clone(),
1934                        output: ToolOutput::Text(
1935                            reason.unwrap_or_else(|| "approval denied".into()),
1936                        ),
1937                        is_error: true,
1938                        metadata: pending.call.metadata.clone(),
1939                    })],
1940                    metadata: MetadataMap::new(),
1941                });
1942            }
1943        }
1944
1945        if let Some(step) = self.continue_active_tool_round().await? {
1946            Ok(step)
1947        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1948            Ok(step)
1949        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1950            Ok(step)
1951        } else {
1952            self.drive_turn(pending.turn_id, false).await
1953        }
1954    }
1955
1956    fn finish_cancelled(
1957        &mut self,
1958        turn_id: agentkit_core::TurnId,
1959        items: Vec<Item>,
1960    ) -> Result<LoopStep, LoopError> {
1961        self.extend_transcript(items.clone());
1962        let turn_result = TurnResult {
1963            turn_id,
1964            finish_reason: FinishReason::Cancelled,
1965            items,
1966            usage: None,
1967            metadata: interrupted_metadata("turn"),
1968        };
1969        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1970        Ok(LoopStep::Finished(turn_result))
1971    }
1972
1973    /// Internal entry point for buffering user input. Reachable only via
1974    /// [`InputRequest::submit`] (resolves an `AwaitingInput` interrupt,
1975    /// including the very first one after [`Agent::start`]) and
1976    /// [`ToolRoundInfo::submit`] (interjects between tool rounds). Prior
1977    /// transcript items — the passive starting state of a session — are
1978    /// preloaded via [`AgentBuilder::transcript`]; an opening user turn for
1979    /// one-shot calls is preloaded via [`AgentBuilder::input`]. New input
1980    /// after start-up always flows through one of the typed `submit`
1981    /// handles.
1982    pub(crate) fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
1983        if self.has_pending_interrupts() {
1984            return Err(LoopError::InvalidState(
1985                "cannot submit input while an interrupt is pending".into(),
1986            ));
1987        }
1988        self.emit(AgentEvent::InputAccepted {
1989            session_id: self.session_id.clone(),
1990            items: input.clone(),
1991        });
1992        self.pending_input.extend(input);
1993        Ok(())
1994    }
1995
1996    /// Override the prompt cache request for the next model turn.
1997    ///
1998    /// The override is consumed the next time the driver starts a model turn.
1999    /// Session-level defaults still apply to later turns.
2000    pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
2001        if self.has_pending_interrupts() {
2002            return Err(LoopError::InvalidState(
2003                "cannot update next-turn cache while an interrupt is pending".into(),
2004            ));
2005        }
2006        self.next_turn_cache = Some(cache);
2007        Ok(())
2008    }
2009
2010    #[cfg(test)]
2011    pub(crate) fn submit_input_with_cache(
2012        &mut self,
2013        input: Vec<Item>,
2014        cache: PromptCacheRequest,
2015    ) -> Result<(), LoopError> {
2016        self.set_next_turn_cache(cache)?;
2017        self.submit_input(input)
2018    }
2019
2020    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`].
2021    ///
2022    /// After calling this, invoke [`next`](LoopDriver::next) to continue the
2023    /// loop.  If the decision is [`ApprovalDecision::Approve`] the tool call
2024    /// executes; if denied, an error result is fed back to the model.
2025    ///
2026    /// # Errors
2027    ///
2028    /// Returns [`LoopError::InvalidState`] if no approval is pending.
2029    pub fn resolve_approval_for(
2030        &mut self,
2031        call_id: ToolCallId,
2032        decision: ApprovalDecision,
2033    ) -> Result<(), LoopError> {
2034        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2035            return Err(LoopError::InvalidState(format!(
2036                "no approval request is pending for call {}",
2037                call_id.0
2038            )));
2039        };
2040        pending.decision = Some(decision.clone());
2041        self.emit(AgentEvent::ApprovalResolved {
2042            approved: matches!(decision, ApprovalDecision::Approve),
2043        });
2044        Ok(())
2045    }
2046
2047    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] with a patched
2048    /// input that replaces the model's original tool arguments.
2049    ///
2050    /// Equivalent to calling [`resolve_approval_for`] with
2051    /// [`ApprovalDecision::Approve`] except the tool sees `input` instead of
2052    /// what the model emitted. The transcript still records the model's
2053    /// original call.
2054    ///
2055    /// # Errors
2056    ///
2057    /// Returns [`LoopError::InvalidState`] if no approval is pending for
2058    /// `call_id`.
2059    pub fn resolve_approval_for_with_patched_input(
2060        &mut self,
2061        call_id: ToolCallId,
2062        input: serde_json::Value,
2063    ) -> Result<(), LoopError> {
2064        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
2065            return Err(LoopError::InvalidState(format!(
2066                "no approval request is pending for call {}",
2067                call_id.0
2068            )));
2069        };
2070        pending.tool_request.input = input;
2071        self.resolve_approval_for(call_id, ApprovalDecision::Approve)
2072    }
2073
2074    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] when exactly one
2075    /// approval is outstanding.
2076    pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
2077        let mut unresolved = self
2078            .pending_approval_order
2079            .iter()
2080            .filter(|call_id| {
2081                self.pending_approvals
2082                    .get(*call_id)
2083                    .is_some_and(|pending| pending.decision.is_none())
2084            })
2085            .cloned();
2086        let Some(call_id) = unresolved.next() else {
2087            return Err(LoopError::InvalidState(
2088                "no approval request is pending".into(),
2089            ));
2090        };
2091        if unresolved.next().is_some() {
2092            return Err(LoopError::InvalidState(
2093                "multiple approvals are pending; use resolve_approval_for".into(),
2094            ));
2095        }
2096        self.resolve_approval_for(call_id, decision)
2097    }
2098
2099    /// Take a read-only snapshot of the driver's current transcript and input queue.
2100    pub fn snapshot(&self) -> LoopSnapshot {
2101        LoopSnapshot {
2102            session_id: self.session_id.clone(),
2103            transcript: self.transcript.clone(),
2104            pending_input: self.pending_input.clone(),
2105        }
2106    }
2107
    /// Advance the loop by one step.
    ///
    /// This is the main method for driving the agent.  It processes pending
    /// interrupt resolutions, consumes queued input, starts a model turn,
    /// executes tool calls, and returns once the turn finishes or an
    /// interrupt occurs.
    ///
    /// Work is considered in a fixed priority order: resolved approvals,
    /// approvals not yet surfaced to the host, approvals still awaiting a
    /// decision, the active tool round, pending loop updates, a resume after
    /// an `AfterToolResult` yield, and finally a brand-new turn.
    ///
    /// If no input is queued and no interrupt is pending, returns
    /// [`LoopStep::Interrupt(LoopInterrupt::AwaitingInput(..))`](LoopInterrupt::AwaitingInput).
    /// This is the steady state after [`Agent::start`] when no input was
    /// preloaded via [`AgentBuilder::input`]: the prior transcript loaded
    /// via [`AgentBuilder::transcript`] is passive, so the first call
    /// surfaces `AwaitingInput` and waits for the host to supply input via
    /// [`InputRequest::submit`] before any model turn is dispatched. If
    /// input was preloaded, the first call dispatches the model directly.
    ///
    /// # Errors
    ///
    /// Returns [`LoopError::InvalidState`] if called while an unresolved
    /// interrupt is pending, or propagates provider / tool / compaction errors.
    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
        // 1. An approval the host has already decided: act on it.
        if let Some(pending) = self.take_next_resolved_approval() {
            return self.resume_after_approval(pending).await;
        }

        // 2. An approval that was queued but not yet shown to the host.
        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
            return Ok(step);
        }

        // 3. An approval the host has seen but not yet decided.
        if let Some(step) = self.next_unresolved_approval_interrupt() {
            return Ok(step);
        }

        // 4. A tool round that is still in flight.
        if let Some(step) = self.continue_active_tool_round().await? {
            return Ok(step);
        }

        // 5. Updates queued outside the active round; `had_loop_updates`
        //    records whether anything was drained even if no step resulted.
        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
        if let Some(step) = loop_step {
            return Ok(step);
        }

        // Resume after an AfterToolResult yield.  Any input submitted by the
        // host during the yield is folded into the transcript as part of the
        // continuation turn; background task results drained just above are
        // already in the transcript.
        if let Some(turn_id) = self.pending_round_resume.take() {
            let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
            self.extend_transcript(drained);
            // Same turn id, so no second `TurnStarted` event.
            return self.drive_turn(turn_id, false).await;
        }

        // Nothing queued and nothing drained: ask the host for input.
        if self.pending_input.is_empty() && !had_loop_updates {
            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
                InputRequest {
                    session_id: self.session_id.clone(),
                    reason: "driver is waiting for input".into(),
                },
            )));
        }

        // Start a fresh turn: allocate the next sequential turn id, move the
        // queued input into the transcript, and dispatch the model.
        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
        self.next_turn_index += 1;
        let drained: Vec<Item> = std::mem::take(&mut self.pending_input);
        self.extend_transcript(drained);
        self.drive_turn(turn_id, true).await
    }
2175
2176    fn emit(&mut self, event: AgentEvent) {
2177        for observer in &mut self.observers {
2178            observer.handle_event(event.clone());
2179        }
2180    }
2181
2182    /// Append a single [`Item`] to the transcript and notify all
2183    /// registered [`TranscriptObserver`]s. The single mutation point —
2184    /// every push to `self.transcript` should funnel through here so
2185    /// observers see exactly what landed in the transcript.
2186    fn append_item(&mut self, item: Item) {
2187        for observer in &mut self.transcript_observers {
2188            observer.on_item_appended(&item);
2189        }
2190        self.transcript.push(item);
2191    }
2192
2193    /// Append a tool-result Item: emit one [`AgentEvent::ToolResultReceived`]
2194    /// per [`Part::ToolResult`] inside the Item, then funnel through
2195    /// [`Self::append_item`].
2196    ///
2197    /// If every `ToolResult` in the item references a `call_id` that was
2198    /// already paired with a synthetic detach tool_result, the item is
2199    /// converted to a [`ItemKind::Notification`] before appending.
2200    /// Without this, we would emit a second `tool_result` for the same
2201    /// `tool_use_id` — a provider-schema violation that
2202    /// Anthropic/OpenRouter reject as an "orphaned tool_result".
2203    /// Observers still see `ToolResultReceived` for each result so any
2204    /// UI spinner or task tracker can close.
2205    fn append_tool_result_item(&mut self, item: Item) {
2206        for part in &item.parts {
2207            if let Part::ToolResult(result) = part {
2208                self.emit(AgentEvent::ToolResultReceived(result.clone()));
2209            }
2210        }
2211        let item = self.maybe_convert_detached(item);
2212        self.append_item(item);
2213    }
2214
2215    fn maybe_convert_detached(&mut self, item: Item) -> Item {
2216        if !matches!(item.kind, ItemKind::Tool) {
2217            return item;
2218        }
2219        let results: Vec<&ToolResultPart> = item
2220            .parts
2221            .iter()
2222            .filter_map(|p| match p {
2223                Part::ToolResult(r) => Some(r),
2224                _ => None,
2225            })
2226            .collect();
2227        if results.is_empty()
2228            || !results
2229                .iter()
2230                .all(|r| self.detached_call_ids.contains(&r.call_id))
2231        {
2232            return item;
2233        }
2234        let mut text = String::new();
2235        for result in &results {
2236            self.detached_call_ids.remove(&result.call_id);
2237            if !text.is_empty() {
2238                text.push_str("\n\n");
2239            }
2240            let label = if result.is_error {
2241                "failed"
2242            } else {
2243                "completed"
2244            };
2245            let body = render_tool_output_brief(&result.output);
2246            text.push_str(&format!(
2247                "Background tool call {} {}: {body}",
2248                result.call_id.0, label
2249            ));
2250        }
2251        Item::notification(text)
2252    }
2253
2254    /// Append several Items in order through [`Self::append_item`].
2255    fn extend_transcript(&mut self, items: impl IntoIterator<Item = Item>) {
2256        for item in items {
2257            self.append_item(item);
2258        }
2259    }
2260}
2261
2262fn render_tool_output_brief(output: &ToolOutput) -> String {
2263    match output {
2264        ToolOutput::Text(t) => t.clone(),
2265        ToolOutput::Structured(value) => value.to_string(),
2266        ToolOutput::Parts(parts) => format!("[{} parts]", parts.len()),
2267        ToolOutput::Files(files) => format!("[{} files]", files.len()),
2268    }
2269}
2270
2271fn interrupted_metadata(stage: &str) -> MetadataMap {
2272    let mut metadata = MetadataMap::new();
2273    metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2274    metadata.insert(
2275        INTERRUPT_REASON_METADATA_KEY.into(),
2276        USER_CANCELLED_REASON.into(),
2277    );
2278    metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2279    metadata
2280}
2281
2282fn interrupted_assistant_items() -> Vec<Item> {
2283    vec![Item {
2284        id: None,
2285        kind: ItemKind::Assistant,
2286        parts: vec![Part::Text(TextPart {
2287            text: "Previous assistant response was interrupted by the user before completion."
2288                .into(),
2289            metadata: interrupted_metadata("assistant"),
2290        })],
2291        metadata: interrupted_metadata("assistant"),
2292    }]
2293}
2294
2295fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2296    let mut calls = Vec::new();
2297    for item in items {
2298        for part in &item.parts {
2299            if let Part::ToolCall(call) = part {
2300                calls.push(call.clone());
2301            }
2302        }
2303    }
2304    calls
2305}
2306
/// Errors that can occur while driving the agent loop.
///
/// Cancellation is usually not surfaced to the host as an error: the driver
/// converts [`LoopError::Cancelled`] raised during a turn into a finished
/// turn with a cancelled finish reason.
#[derive(Debug, Error)]
pub enum LoopError {
    /// The driver was in an unexpected state for the requested operation —
    /// for example submitting input while an interrupt is pending, resolving
    /// an approval that does not exist, or driving a turn without a model
    /// session. The payload is a human-readable description.
    #[error("invalid driver state: {0}")]
    InvalidState(String),
    /// The current turn was cancelled via the [`CancellationHandle`].
    #[error("turn cancelled")]
    Cancelled,
    /// An error originating from the model provider, including a model turn
    /// whose event stream ends without a `Finished` event.
    #[error("provider error: {0}")]
    Provider(String),
    /// An error originating from tool execution. Converts from [`ToolError`]
    /// via `?`; task-manager failures are also reported through this variant,
    /// wrapped as `ToolError::Internal`.
    #[error("tool error: {0}")]
    Tool(#[from] ToolError),
    /// An error that occurred during transcript compaction.
    #[error("compaction error: {0}")]
    Compaction(String),
    /// The requested operation is not supported.
    #[error("unsupported operation: {0}")]
    Unsupported(String),
}
2329
2330#[cfg(test)]
2331mod tests {
2332    use std::collections::VecDeque;
2333    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2334    use std::sync::{Arc as StdArc, Mutex as StdMutex};
2335
2336    use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
2337    use agentkit_core::{
2338        CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
2339    };
2340    use agentkit_task_manager::{
2341        AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2342        TaskRoutingPolicy,
2343    };
2344    use agentkit_tools_core::{
2345        FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2346        ToolAnnotations, ToolCatalogEvent, ToolExecutionOutcome, ToolName, ToolRegistry,
2347        ToolResult, ToolSpec,
2348    };
2349    use serde_json::{Value, json};
2350    use tokio::sync::Notify;
2351    use tokio::time::{Duration, timeout};
2352
2353    use super::*;
2354
    /// Model adapter whose sessions are `FakeSession`s.
    struct FakeAdapter;
    /// Model adapter whose sessions are `SlowSession`s.
    struct SlowAdapter;
    /// Model adapter that shares its recording buffers with every `RecordingSession`
    /// it starts.
    struct RecordingAdapter {
        /// Tool descriptions seen by the session; one inner `Vec` is pushed per turn.
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        /// Prompt-cache request seen by the session; one entry is pushed per turn.
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    /// Model adapter whose sessions are `MultiToolSession`s.
    struct MultiToolAdapter;
    /// Model adapter whose sessions are `DualApprovalSession`s.
    struct DualApprovalAdapter;
2363
    /// Session that requests one tool call, then reports the tool's textual result.
    struct FakeSession;
    /// Session that can park in `begin_turn` until the turn is cancelled.
    struct SlowSession;
    /// Session that records the tool descriptions and cache request of every turn.
    struct RecordingSession {
        /// Shared log of per-turn tool descriptions.
        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
        /// Shared log of per-turn prompt-cache requests.
        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
    }
    /// Session that requests two differently named tools in a single turn.
    struct MultiToolSession;
    /// Session that requests two `echo` calls in a single turn.
    struct DualApprovalSession;
2372
    /// Turn that replays a pre-built queue of events.
    struct FakeTurn {
        /// Events still to be yielded, front first.
        events: VecDeque<ModelTurnEvent>,
    }

    /// Turn that yields a single `Finished` event unless it observes cancellation.
    struct SlowTurn {
        /// Set once the single event has been handed out.
        emitted: bool,
    }

    /// Turn that yields a single `Finished` event.
    struct RecordingTurn {
        /// Set once the single event has been handed out.
        emitted: bool,
    }
    /// Turn that replays a pre-built queue of events.
    struct MultiToolTurn {
        /// Events still to be yielded, front first.
        events: VecDeque<ModelTurnEvent>,
    }
    /// Turn that replays a pre-built queue of events.
    struct DualApprovalTurn {
        /// Events still to be yielded, front first.
        events: VecDeque<ModelTurnEvent>,
    }
2390
    #[async_trait]
    impl ModelAdapter for FakeAdapter {
        type Session = FakeSession;

        /// Always succeeds with a new `FakeSession`; the session config is not inspected.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(FakeSession)
        }
    }
2399
    #[async_trait]
    impl ModelAdapter for SlowAdapter {
        type Session = SlowSession;

        /// Always succeeds with a new `SlowSession`; the session config is not inspected.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(SlowSession)
        }
    }
2408
2409    #[async_trait]
2410    impl ModelAdapter for RecordingAdapter {
2411        type Session = RecordingSession;
2412
2413        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2414            Ok(RecordingSession {
2415                seen_descriptions: self.seen_descriptions.clone(),
2416                seen_caches: self.seen_caches.clone(),
2417            })
2418        }
2419    }
2420
    #[async_trait]
    impl ModelAdapter for MultiToolAdapter {
        type Session = MultiToolSession;

        /// Always succeeds with a new `MultiToolSession`; the session config is not inspected.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(MultiToolSession)
        }
    }
2429
    #[async_trait]
    impl ModelAdapter for DualApprovalAdapter {
        type Session = DualApprovalSession;

        /// Always succeeds with a new `DualApprovalSession`; the session config is not inspected.
        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
            Ok(DualApprovalSession)
        }
    }
2438
2439    #[async_trait]
2440    impl ModelSession for FakeSession {
2441        type Turn = FakeTurn;
2442
2443        async fn begin_turn(
2444            &mut self,
2445            request: TurnRequest,
2446            _cancellation: Option<TurnCancellation>,
2447        ) -> Result<Self::Turn, LoopError> {
2448            let has_tool_result = request.transcript.iter().any(|item| {
2449                item.kind == ItemKind::Tool
2450                    && item
2451                        .parts
2452                        .iter()
2453                        .any(|part| matches!(part, Part::ToolResult(_)))
2454            });
2455            let tool_name = request
2456                .available_tools
2457                .first()
2458                .map(|tool| tool.name.0.clone())
2459                .unwrap_or_else(|| "echo".into());
2460
2461            let events = if has_tool_result {
2462                let result_text = request
2463                    .transcript
2464                    .iter()
2465                    .rev()
2466                    .find_map(|item| {
2467                        item.parts.iter().find_map(|part| match part {
2468                            Part::ToolResult(ToolResultPart {
2469                                output: ToolOutput::Text(text),
2470                                ..
2471                            }) => Some(text.clone()),
2472                            _ => None,
2473                        })
2474                    })
2475                    .unwrap_or_else(|| "missing".into());
2476
2477                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2478                    finish_reason: FinishReason::Completed,
2479                    output_items: vec![Item {
2480                        id: None,
2481                        kind: ItemKind::Assistant,
2482                        parts: vec![Part::Text(TextPart {
2483                            text: format!("tool said: {result_text}"),
2484                            metadata: MetadataMap::new(),
2485                        })],
2486                        metadata: MetadataMap::new(),
2487                    }],
2488                    usage: None,
2489                    metadata: MetadataMap::new(),
2490                })])
2491            } else {
2492                VecDeque::from([
2493                    ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2494                        id: ToolCallId::new("call-1"),
2495                        name: tool_name.clone(),
2496                        input: json!({ "value": "pong" }),
2497                        metadata: MetadataMap::new(),
2498                    }),
2499                    ModelTurnEvent::Finished(ModelTurnResult {
2500                        finish_reason: FinishReason::ToolCall,
2501                        output_items: vec![Item {
2502                            id: None,
2503                            kind: ItemKind::Assistant,
2504                            parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2505                                id: ToolCallId::new("call-1"),
2506                                name: tool_name,
2507                                input: json!({ "value": "pong" }),
2508                                metadata: MetadataMap::new(),
2509                            })],
2510                            metadata: MetadataMap::new(),
2511                        }],
2512                        usage: None,
2513                        metadata: MetadataMap::new(),
2514                    }),
2515                ])
2516            };
2517
2518            Ok(FakeTurn { events })
2519        }
2520    }
2521
2522    #[async_trait]
2523    impl ModelSession for SlowSession {
2524        type Turn = SlowTurn;
2525
2526        async fn begin_turn(
2527            &mut self,
2528            request: TurnRequest,
2529            cancellation: Option<TurnCancellation>,
2530        ) -> Result<Self::Turn, LoopError> {
2531            let should_block = request
2532                .transcript
2533                .iter()
2534                .rev()
2535                .find(|item| item.kind == ItemKind::User)
2536                .is_some_and(|item| {
2537                    item.parts.iter().any(|part| match part {
2538                        Part::Text(text) => text.text == "do the long task",
2539                        _ => false,
2540                    })
2541                });
2542
2543            if should_block && let Some(cancellation) = cancellation {
2544                cancellation.cancelled().await;
2545                return Err(LoopError::Cancelled);
2546            }
2547
2548            Ok(SlowTurn { emitted: false })
2549        }
2550    }
2551
2552    #[async_trait]
2553    impl ModelSession for RecordingSession {
2554        type Turn = RecordingTurn;
2555
2556        async fn begin_turn(
2557            &mut self,
2558            request: TurnRequest,
2559            _cancellation: Option<TurnCancellation>,
2560        ) -> Result<Self::Turn, LoopError> {
2561            let descriptions = request
2562                .available_tools
2563                .iter()
2564                .map(|tool| tool.description.clone())
2565                .collect::<Vec<_>>();
2566            self.seen_descriptions.lock().unwrap().push(descriptions);
2567            self.seen_caches.lock().unwrap().push(request.cache.clone());
2568
2569            Ok(RecordingTurn { emitted: false })
2570        }
2571    }
2572
2573    #[async_trait]
2574    impl ModelSession for MultiToolSession {
2575        type Turn = MultiToolTurn;
2576
2577        async fn begin_turn(
2578            &mut self,
2579            request: TurnRequest,
2580            _cancellation: Option<TurnCancellation>,
2581        ) -> Result<Self::Turn, LoopError> {
2582            let has_tool_result = request.transcript.iter().any(|item| {
2583                item.kind == ItemKind::Tool
2584                    && item
2585                        .parts
2586                        .iter()
2587                        .any(|part| matches!(part, Part::ToolResult(_)))
2588            });
2589
2590            let events = if has_tool_result {
2591                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2592                    finish_reason: FinishReason::Completed,
2593                    output_items: vec![Item {
2594                        id: None,
2595                        kind: ItemKind::Assistant,
2596                        parts: vec![Part::Text(TextPart {
2597                            text: "mixed tools finished".into(),
2598                            metadata: MetadataMap::new(),
2599                        })],
2600                        metadata: MetadataMap::new(),
2601                    }],
2602                    usage: None,
2603                    metadata: MetadataMap::new(),
2604                })])
2605            } else {
2606                let foreground = agentkit_core::ToolCallPart {
2607                    id: ToolCallId::new("call-foreground"),
2608                    name: "foreground-wait".into(),
2609                    input: json!({}),
2610                    metadata: MetadataMap::new(),
2611                };
2612                let background = agentkit_core::ToolCallPart {
2613                    id: ToolCallId::new("call-background"),
2614                    name: "background-wait".into(),
2615                    input: json!({}),
2616                    metadata: MetadataMap::new(),
2617                };
2618                VecDeque::from([
2619                    ModelTurnEvent::ToolCall(foreground.clone()),
2620                    ModelTurnEvent::ToolCall(background.clone()),
2621                    ModelTurnEvent::Finished(ModelTurnResult {
2622                        finish_reason: FinishReason::ToolCall,
2623                        output_items: vec![Item {
2624                            id: None,
2625                            kind: ItemKind::Assistant,
2626                            parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2627                            metadata: MetadataMap::new(),
2628                        }],
2629                        usage: None,
2630                        metadata: MetadataMap::new(),
2631                    }),
2632                ])
2633            };
2634
2635            Ok(MultiToolTurn { events })
2636        }
2637    }
2638
2639    #[async_trait]
2640    impl ModelSession for DualApprovalSession {
2641        type Turn = DualApprovalTurn;
2642
2643        async fn begin_turn(
2644            &mut self,
2645            request: TurnRequest,
2646            _cancellation: Option<TurnCancellation>,
2647        ) -> Result<Self::Turn, LoopError> {
2648            let tool_results = request
2649                .transcript
2650                .iter()
2651                .flat_map(|item| item.parts.iter())
2652                .filter(|part| matches!(part, Part::ToolResult(_)))
2653                .count();
2654
2655            let events = if tool_results >= 2 {
2656                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2657                    finish_reason: FinishReason::Completed,
2658                    output_items: vec![Item {
2659                        id: None,
2660                        kind: ItemKind::Assistant,
2661                        parts: vec![Part::Text(TextPart {
2662                            text: "both approvals finished".into(),
2663                            metadata: MetadataMap::new(),
2664                        })],
2665                        metadata: MetadataMap::new(),
2666                    }],
2667                    usage: None,
2668                    metadata: MetadataMap::new(),
2669                })])
2670            } else {
2671                let first = agentkit_core::ToolCallPart {
2672                    id: ToolCallId::new("call-1"),
2673                    name: "echo".into(),
2674                    input: json!({ "value": "first" }),
2675                    metadata: MetadataMap::new(),
2676                };
2677                let second = agentkit_core::ToolCallPart {
2678                    id: ToolCallId::new("call-2"),
2679                    name: "echo".into(),
2680                    input: json!({ "value": "second" }),
2681                    metadata: MetadataMap::new(),
2682                };
2683                VecDeque::from([
2684                    ModelTurnEvent::ToolCall(first.clone()),
2685                    ModelTurnEvent::ToolCall(second.clone()),
2686                    ModelTurnEvent::Finished(ModelTurnResult {
2687                        finish_reason: FinishReason::ToolCall,
2688                        output_items: vec![Item {
2689                            id: None,
2690                            kind: ItemKind::Assistant,
2691                            parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2692                            metadata: MetadataMap::new(),
2693                        }],
2694                        usage: None,
2695                        metadata: MetadataMap::new(),
2696                    }),
2697                ])
2698            };
2699
2700            Ok(DualApprovalTurn { events })
2701        }
2702    }
2703
    #[async_trait]
    impl ModelTurn for FakeTurn {
        /// Yields the next queued event, or `None` once the queue is empty.
        ///
        /// The cancellation token is ignored.
        async fn next_event(
            &mut self,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<Option<ModelTurnEvent>, LoopError> {
            Ok(self.events.pop_front())
        }
    }
2713
2714    #[async_trait]
2715    impl ModelTurn for SlowTurn {
2716        async fn next_event(
2717            &mut self,
2718            cancellation: Option<TurnCancellation>,
2719        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2720            if let Some(cancellation) = cancellation
2721                && cancellation.is_cancelled()
2722            {
2723                return Err(LoopError::Cancelled);
2724            }
2725
2726            if self.emitted {
2727                Ok(None)
2728            } else {
2729                self.emitted = true;
2730                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2731                    finish_reason: FinishReason::Completed,
2732                    output_items: vec![Item {
2733                        id: None,
2734                        kind: ItemKind::Assistant,
2735                        parts: vec![Part::Text(TextPart {
2736                            text: "done".into(),
2737                            metadata: MetadataMap::new(),
2738                        })],
2739                        metadata: MetadataMap::new(),
2740                    }],
2741                    usage: None,
2742                    metadata: MetadataMap::new(),
2743                })))
2744            }
2745        }
2746    }
2747
2748    #[async_trait]
2749    impl ModelTurn for RecordingTurn {
2750        async fn next_event(
2751            &mut self,
2752            _cancellation: Option<TurnCancellation>,
2753        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2754            if self.emitted {
2755                Ok(None)
2756            } else {
2757                self.emitted = true;
2758                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2759                    finish_reason: FinishReason::Completed,
2760                    output_items: vec![Item {
2761                        id: None,
2762                        kind: ItemKind::Assistant,
2763                        parts: vec![Part::Text(TextPart {
2764                            text: "done".into(),
2765                            metadata: MetadataMap::new(),
2766                        })],
2767                        metadata: MetadataMap::new(),
2768                    }],
2769                    usage: None,
2770                    metadata: MetadataMap::new(),
2771                })))
2772            }
2773        }
2774    }
2775
    #[async_trait]
    impl ModelTurn for MultiToolTurn {
        /// Yields the next queued event, or `None` once the queue is empty.
        ///
        /// The cancellation token is ignored.
        async fn next_event(
            &mut self,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<Option<ModelTurnEvent>, LoopError> {
            Ok(self.events.pop_front())
        }
    }
2785
    #[async_trait]
    impl ModelTurn for DualApprovalTurn {
        /// Yields the next queued event, or `None` once the queue is empty.
        ///
        /// The cancellation token is ignored.
        async fn next_event(
            &mut self,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<Option<ModelTurnEvent>, LoopError> {
            Ok(self.events.pop_front())
        }
    }
2795
2796    #[derive(Clone)]
2797    struct EchoTool {
2798        spec: ToolSpec,
2799    }
2800
2801    impl Default for EchoTool {
2802        fn default() -> Self {
2803            Self {
2804                spec: ToolSpec {
2805                    name: ToolName::new("echo"),
2806                    description: "Echo back a value".into(),
2807                    input_schema: json!({
2808                        "type": "object",
2809                        "properties": {
2810                            "value": { "type": "string" }
2811                        },
2812                        "required": ["value"],
2813                        "additionalProperties": false
2814                    }),
2815                    annotations: ToolAnnotations::default(),
2816                    metadata: MetadataMap::new(),
2817                },
2818            }
2819        }
2820    }
2821
2822    #[derive(Clone)]
2823    struct DynamicSpecTool {
2824        spec: ToolSpec,
2825        version: StdArc<AtomicUsize>,
2826    }
2827
2828    impl DynamicSpecTool {
2829        fn new(version: StdArc<AtomicUsize>) -> Self {
2830            Self {
2831                spec: ToolSpec {
2832                    name: ToolName::new("dynamic"),
2833                    description: "dynamic version 0".into(),
2834                    input_schema: json!({
2835                        "type": "object",
2836                        "properties": {},
2837                        "additionalProperties": false
2838                    }),
2839                    annotations: ToolAnnotations::default(),
2840                    metadata: MetadataMap::new(),
2841                },
2842                version,
2843            }
2844        }
2845    }
2846
2847    #[async_trait]
2848    impl Tool for EchoTool {
2849        fn spec(&self) -> &ToolSpec {
2850            &self.spec
2851        }
2852
2853        fn proposed_requests(
2854            &self,
2855            request: &agentkit_tools_core::ToolRequest,
2856        ) -> Result<
2857            Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
2858            agentkit_tools_core::ToolError,
2859        > {
2860            Ok(vec![Box::new(FileSystemPermissionRequest::Read {
2861                path: "/tmp/echo".into(),
2862                metadata: request.metadata.clone(),
2863            })])
2864        }
2865
2866        async fn invoke(
2867            &self,
2868            request: agentkit_tools_core::ToolRequest,
2869            _ctx: &mut ToolContext<'_>,
2870        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2871            let value = request
2872                .input
2873                .get("value")
2874                .and_then(Value::as_str)
2875                .ok_or_else(|| {
2876                    agentkit_tools_core::ToolError::InvalidInput("missing value".into())
2877                })?;
2878
2879            Ok(ToolResult {
2880                result: ToolResultPart {
2881                    call_id: request.call_id,
2882                    output: ToolOutput::Text(value.into()),
2883                    is_error: false,
2884                    metadata: MetadataMap::new(),
2885                },
2886                duration: None,
2887                metadata: MetadataMap::new(),
2888            })
2889        }
2890    }
2891
2892    #[async_trait]
2893    impl Tool for DynamicSpecTool {
2894        fn spec(&self) -> &ToolSpec {
2895            &self.spec
2896        }
2897
2898        fn current_spec(&self) -> Option<ToolSpec> {
2899            let mut spec = self.spec.clone();
2900            spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
2901            Some(spec)
2902        }
2903
2904        async fn invoke(
2905            &self,
2906            request: agentkit_tools_core::ToolRequest,
2907            _ctx: &mut ToolContext<'_>,
2908        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2909            Ok(ToolResult {
2910                result: ToolResultPart {
2911                    call_id: request.call_id,
2912                    output: ToolOutput::Text("ok".into()),
2913                    is_error: false,
2914                    metadata: MetadataMap::new(),
2915                },
2916                duration: None,
2917                metadata: MetadataMap::new(),
2918            })
2919        }
2920    }
2921
2922    struct DenyFsReads;
2923
2924    impl PermissionChecker for DenyFsReads {
2925        fn evaluate(
2926            &self,
2927            request: &dyn agentkit_tools_core::PermissionRequest,
2928        ) -> PermissionDecision {
2929            if request.kind() == "filesystem.read" {
2930                return PermissionDecision::Deny(PermissionDenial {
2931                    code: PermissionCode::PathNotAllowed,
2932                    message: "reads denied in test".into(),
2933                    metadata: MetadataMap::new(),
2934                });
2935            }
2936
2937            PermissionDecision::Allow
2938        }
2939    }
2940
2941    struct ApproveFsReads;
2942
2943    impl PermissionChecker for ApproveFsReads {
2944        fn evaluate(
2945            &self,
2946            request: &dyn agentkit_tools_core::PermissionRequest,
2947        ) -> PermissionDecision {
2948            if request.kind() == "filesystem.read" {
2949                return PermissionDecision::RequireApproval(ApprovalRequest {
2950                    task_id: None,
2951                    call_id: None,
2952                    id: "approval:fs-read".into(),
2953                    request_kind: request.kind().into(),
2954                    reason: agentkit_tools_core::ApprovalReason::SensitivePath,
2955                    summary: request.summary(),
2956                    metadata: request.metadata().clone(),
2957                });
2958            }
2959
2960            PermissionDecision::Allow
2961        }
2962    }
2963
2964    struct CountTrigger;
2965
2966    impl CompactionTrigger for CountTrigger {
2967        fn should_compact(
2968            &self,
2969            _session_id: &SessionId,
2970            _turn_id: Option<&agentkit_core::TurnId>,
2971            transcript: &[Item],
2972        ) -> Option<agentkit_compaction::CompactionReason> {
2973            (transcript.len() >= 2)
2974                .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
2975        }
2976    }
2977
2978    struct RecordingObserver {
2979        events: StdArc<StdMutex<Vec<AgentEvent>>>,
2980    }
2981
2982    impl LoopObserver for RecordingObserver {
2983        fn handle_event(&mut self, event: AgentEvent) {
2984            self.events.lock().unwrap().push(event);
2985        }
2986    }
2987
2988    struct CatalogExecutor {
2989        version: AtomicUsize,
2990        events: StdMutex<Vec<ToolCatalogEvent>>,
2991    }
2992
2993    impl CatalogExecutor {
2994        fn new() -> Self {
2995            Self {
2996                version: AtomicUsize::new(0),
2997                events: StdMutex::new(Vec::new()),
2998            }
2999        }
3000
3001        fn publish_change(&self, version: usize, event: ToolCatalogEvent) {
3002            self.version.store(version, Ordering::SeqCst);
3003            self.events.lock().unwrap().push(event);
3004        }
3005    }
3006
3007    #[async_trait]
3008    impl ToolExecutor for CatalogExecutor {
3009        fn specs(&self) -> Vec<ToolSpec> {
3010            vec![ToolSpec {
3011                name: ToolName::new("dynamic"),
3012                description: format!("dynamic version {}", self.version.load(Ordering::SeqCst)),
3013                input_schema: json!({
3014                    "type": "object",
3015                    "properties": {},
3016                    "additionalProperties": false
3017                }),
3018                annotations: ToolAnnotations::default(),
3019                metadata: MetadataMap::new(),
3020            }]
3021        }
3022
3023        fn drain_catalog_events(&self) -> Vec<ToolCatalogEvent> {
3024            std::mem::take(&mut *self.events.lock().unwrap())
3025        }
3026
3027        async fn execute(
3028            &self,
3029            request: ToolRequest,
3030            _ctx: &mut ToolContext<'_>,
3031        ) -> ToolExecutionOutcome {
3032            ToolExecutionOutcome::Completed(ToolResult {
3033                result: ToolResultPart {
3034                    call_id: request.call_id,
3035                    output: ToolOutput::Text("dynamic-ok".into()),
3036                    is_error: false,
3037                    metadata: MetadataMap::new(),
3038                },
3039                duration: None,
3040                metadata: MetadataMap::new(),
3041            })
3042        }
3043    }
3044
3045    #[derive(Clone)]
3046    struct BlockingTool {
3047        spec: ToolSpec,
3048        entered: StdArc<AtomicBool>,
3049        release: StdArc<Notify>,
3050        output: &'static str,
3051    }
3052
3053    impl BlockingTool {
3054        fn new(
3055            name: &str,
3056            entered: StdArc<AtomicBool>,
3057            release: StdArc<Notify>,
3058            output: &'static str,
3059        ) -> Self {
3060            Self {
3061                spec: ToolSpec {
3062                    name: ToolName::new(name),
3063                    description: format!("blocking tool {name}"),
3064                    input_schema: json!({
3065                        "type": "object",
3066                        "properties": {},
3067                        "additionalProperties": false
3068                    }),
3069                    annotations: ToolAnnotations::default(),
3070                    metadata: MetadataMap::new(),
3071                },
3072                entered,
3073                release,
3074                output,
3075            }
3076        }
3077    }
3078
3079    #[async_trait]
3080    impl Tool for BlockingTool {
3081        fn spec(&self) -> &ToolSpec {
3082            &self.spec
3083        }
3084
3085        async fn invoke(
3086            &self,
3087            request: agentkit_tools_core::ToolRequest,
3088            _ctx: &mut ToolContext<'_>,
3089        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
3090            self.entered.store(true, Ordering::SeqCst);
3091            self.release.notified().await;
3092            Ok(ToolResult {
3093                result: ToolResultPart {
3094                    call_id: request.call_id,
3095                    output: ToolOutput::Text(self.output.into()),
3096                    is_error: false,
3097                    metadata: MetadataMap::new(),
3098                },
3099                duration: None,
3100                metadata: MetadataMap::new(),
3101            })
3102        }
3103    }
3104
3105    struct NameRoutingPolicy {
3106        routes: Vec<(String, RoutingDecision)>,
3107    }
3108
3109    impl NameRoutingPolicy {
3110        fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
3111            Self {
3112                routes: routes
3113                    .into_iter()
3114                    .map(|(name, decision)| (name.into(), decision))
3115                    .collect(),
3116            }
3117        }
3118    }
3119
3120    impl TaskRoutingPolicy for NameRoutingPolicy {
3121        fn route(&self, request: &ToolRequest) -> RoutingDecision {
3122            self.routes
3123                .iter()
3124                .find(|(name, _)| name == &request.tool_name.0)
3125                .map(|(_, decision)| *decision)
3126                .unwrap_or(RoutingDecision::Foreground)
3127        }
3128    }
3129
3130    async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3131        timeout(Duration::from_secs(1), handle.next_event())
3132            .await
3133            .expect("timed out waiting for task event")
3134            .expect("task event stream ended unexpectedly")
3135    }
3136
3137    async fn wait_until_entered(flag: &AtomicBool) {
3138        timeout(Duration::from_secs(1), async {
3139            while !flag.load(Ordering::SeqCst) {
3140                tokio::task::yield_now().await;
3141            }
3142        })
3143        .await
3144        .expect("task never entered execution");
3145    }
3146
3147    #[tokio::test]
3148    async fn loop_continues_after_completed_tool_call() {
3149        let tools = ToolRegistry::new().with(EchoTool::default());
3150        let agent = Agent::builder()
3151            .model(FakeAdapter)
3152            .add_tool_source(tools)
3153            .permissions(AllowAllPermissions)
3154            .build()
3155            .unwrap();
3156
3157        let mut driver = agent
3158            .start(SessionConfig {
3159                session_id: SessionId::new("session-1"),
3160                metadata: MetadataMap::new(),
3161                cache: None,
3162            })
3163            .await
3164            .unwrap();
3165
3166        driver
3167            .submit_input(vec![Item {
3168                id: None,
3169                kind: ItemKind::User,
3170                parts: vec![Part::Text(TextPart {
3171                    text: "ping".into(),
3172                    metadata: MetadataMap::new(),
3173                })],
3174                metadata: MetadataMap::new(),
3175            }])
3176            .unwrap();
3177
3178        let result = run_until_finished(&mut driver).await;
3179
3180        match result {
3181            LoopStep::Finished(turn) => {
3182                assert_eq!(turn.finish_reason, FinishReason::Completed);
3183                assert_eq!(turn.items.len(), 1);
3184                match &turn.items[0].parts[0] {
3185                    Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3186                    other => panic!("unexpected part: {other:?}"),
3187                }
3188            }
3189            other => panic!("unexpected loop step: {other:?}"),
3190        }
3191    }
3192
3193    /// Test helper: drives the loop, transparently resuming non-blocking
3194    /// cooperative interrupts (AfterToolResult), until a terminal step or a
3195    /// blocking interrupt is reached.
3196    async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3197        loop {
3198            match driver.next().await.unwrap() {
3199                LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3200                step => return step,
3201            }
3202        }
3203    }
3204
3205    #[tokio::test]
3206    async fn loop_uses_injected_permission_checker() {
3207        let tools = ToolRegistry::new().with(EchoTool::default());
3208        let agent = Agent::builder()
3209            .model(FakeAdapter)
3210            .add_tool_source(tools)
3211            .permissions(DenyFsReads)
3212            .build()
3213            .unwrap();
3214
3215        let mut driver = agent
3216            .start(SessionConfig {
3217                session_id: SessionId::new("session-2"),
3218                metadata: MetadataMap::new(),
3219                cache: None,
3220            })
3221            .await
3222            .unwrap();
3223
3224        driver
3225            .submit_input(vec![Item {
3226                id: None,
3227                kind: ItemKind::User,
3228                parts: vec![Part::Text(TextPart {
3229                    text: "ping".into(),
3230                    metadata: MetadataMap::new(),
3231                })],
3232                metadata: MetadataMap::new(),
3233            }])
3234            .unwrap();
3235
3236        let result = run_until_finished(&mut driver).await;
3237
3238        match result {
3239            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3240                Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3241                other => panic!("unexpected part: {other:?}"),
3242            },
3243            other => panic!("unexpected loop step: {other:?}"),
3244        }
3245    }
3246
3247    #[tokio::test]
3248    async fn async_task_manager_background_round_requires_explicit_continue() {
3249        let entered = StdArc::new(AtomicBool::new(false));
3250        let release = StdArc::new(Notify::new());
3251        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3252            "background-wait",
3253            RoutingDecision::Background,
3254        )]));
3255        let handle = task_manager.handle();
3256        let tools = ToolRegistry::new().with(BlockingTool::new(
3257            "background-wait",
3258            entered.clone(),
3259            release.clone(),
3260            "background-done",
3261        ));
3262        let agent = Agent::builder()
3263            .model(FakeAdapter)
3264            .add_tool_source(tools)
3265            .permissions(AllowAllPermissions)
3266            .task_manager(task_manager)
3267            .build()
3268            .unwrap();
3269
3270        let mut driver = agent
3271            .start(SessionConfig {
3272                session_id: SessionId::new("session-background"),
3273                metadata: MetadataMap::new(),
3274                cache: None,
3275            })
3276            .await
3277            .unwrap();
3278
3279        driver
3280            .submit_input(vec![Item {
3281                id: None,
3282                kind: ItemKind::User,
3283                parts: vec![Part::Text(TextPart {
3284                    text: "ping".into(),
3285                    metadata: MetadataMap::new(),
3286                })],
3287                metadata: MetadataMap::new(),
3288            }])
3289            .unwrap();
3290
3291        let first = driver.next().await.unwrap();
3292        match first {
3293            LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3294            other => panic!("unexpected first loop step: {other:?}"),
3295        }
3296
3297        match wait_for_task_event(&handle).await {
3298            TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3299            other => panic!("unexpected task event: {other:?}"),
3300        }
3301        wait_until_entered(entered.as_ref()).await;
3302        release.notify_waiters();
3303
3304        match wait_for_task_event(&handle).await {
3305            TaskEvent::Completed(_, result) => {
3306                assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3307            }
3308            other => panic!("unexpected completion event: {other:?}"),
3309        }
3310
3311        let resumed = driver.next().await.unwrap();
3312        match resumed {
3313            LoopStep::Finished(turn) => {
3314                assert_eq!(turn.finish_reason, FinishReason::Completed);
3315                match &turn.items[0].parts[0] {
3316                    Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3317                    other => panic!("unexpected part after resume: {other:?}"),
3318                }
3319            }
3320            other => panic!("unexpected resumed step: {other:?}"),
3321        }
3322    }
3323
3324    #[tokio::test]
3325    async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3326        let controller = CancellationController::new();
3327        let agent = Agent::builder()
3328            .model(SlowAdapter)
3329            .cancellation(controller.handle())
3330            .build()
3331            .unwrap();
3332
3333        let mut driver = agent
3334            .start(SessionConfig {
3335                session_id: SessionId::new("session-cancel"),
3336                metadata: MetadataMap::new(),
3337                cache: None,
3338            })
3339            .await
3340            .unwrap();
3341
3342        driver
3343            .submit_input(vec![Item {
3344                id: None,
3345                kind: ItemKind::User,
3346                parts: vec![Part::Text(TextPart {
3347                    text: "do the long task".into(),
3348                    metadata: MetadataMap::new(),
3349                })],
3350                metadata: MetadataMap::new(),
3351            }])
3352            .unwrap();
3353
3354        let cancelled = tokio::join!(async { driver.next().await }, async {
3355            tokio::task::yield_now().await;
3356            controller.interrupt();
3357        })
3358        .0
3359        .unwrap();
3360
3361        match cancelled {
3362            LoopStep::Finished(turn) => {
3363                assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3364                assert_eq!(turn.items.len(), 1);
3365                assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3366                assert_eq!(
3367                    turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3368                    Some(&Value::Bool(true))
3369                );
3370            }
3371            other => panic!("unexpected loop step: {other:?}"),
3372        }
3373
3374        driver
3375            .submit_input(vec![Item {
3376                id: None,
3377                kind: ItemKind::User,
3378                parts: vec![Part::Text(TextPart {
3379                    text: "try again".into(),
3380                    metadata: MetadataMap::new(),
3381                })],
3382                metadata: MetadataMap::new(),
3383            }])
3384            .unwrap();
3385
3386        let result = driver.next().await.unwrap();
3387        match result {
3388            LoopStep::Finished(turn) => {
3389                assert_eq!(turn.finish_reason, FinishReason::Completed);
3390            }
3391            other => panic!("unexpected loop step after retry: {other:?}"),
3392        }
3393    }
3394
3395    #[tokio::test]
3396    async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3397        let controller = CancellationController::new();
3398        let fg_entered = StdArc::new(AtomicBool::new(false));
3399        let fg_release = StdArc::new(Notify::new());
3400        let bg_entered = StdArc::new(AtomicBool::new(false));
3401        let bg_release = StdArc::new(Notify::new());
3402        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3403            ("foreground-wait", RoutingDecision::Foreground),
3404            ("background-wait", RoutingDecision::Background),
3405        ]));
3406        let handle = task_manager.handle();
3407        let tools = ToolRegistry::new()
3408            .with(BlockingTool::new(
3409                "foreground-wait",
3410                fg_entered.clone(),
3411                fg_release,
3412                "foreground-done",
3413            ))
3414            .with(BlockingTool::new(
3415                "background-wait",
3416                bg_entered.clone(),
3417                bg_release.clone(),
3418                "background-done",
3419            ));
3420        let agent = Agent::builder()
3421            .model(MultiToolAdapter)
3422            .add_tool_source(tools)
3423            .permissions(AllowAllPermissions)
3424            .cancellation(controller.handle())
3425            .task_manager(task_manager)
3426            .build()
3427            .unwrap();
3428
3429        let mut driver = agent
3430            .start(SessionConfig {
3431                session_id: SessionId::new("session-mixed-cancel"),
3432                metadata: MetadataMap::new(),
3433                cache: None,
3434            })
3435            .await
3436            .unwrap();
3437
3438        driver
3439            .submit_input(vec![Item {
3440                id: None,
3441                kind: ItemKind::User,
3442                parts: vec![Part::Text(TextPart {
3443                    text: "run both".into(),
3444                    metadata: MetadataMap::new(),
3445                })],
3446                metadata: MetadataMap::new(),
3447            }])
3448            .unwrap();
3449
3450        let cancelled = tokio::join!(async { driver.next().await }, async {
3451            let _ = wait_for_task_event(&handle).await;
3452            let _ = wait_for_task_event(&handle).await;
3453            wait_until_entered(fg_entered.as_ref()).await;
3454            wait_until_entered(bg_entered.as_ref()).await;
3455            controller.interrupt();
3456        })
3457        .0
3458        .unwrap();
3459
3460        match cancelled {
3461            LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3462            other => panic!("unexpected loop step after interrupt: {other:?}"),
3463        }
3464
3465        match wait_for_task_event(&handle).await {
3466            TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3467            other => panic!("unexpected post-interrupt event: {other:?}"),
3468        }
3469
3470        let running = handle.list_running().await;
3471        assert_eq!(running.len(), 1);
3472        assert_eq!(running[0].tool_name, "background-wait");
3473
3474        bg_release.notify_waiters();
3475        match wait_for_task_event(&handle).await {
3476            TaskEvent::Completed(snapshot, result) => {
3477                assert_eq!(snapshot.tool_name, "background-wait");
3478                assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3479            }
3480            other => panic!("unexpected background completion event: {other:?}"),
3481        }
3482    }
3483
3484    #[tokio::test]
3485    async fn loop_resumes_after_approved_tool_request() {
3486        let tools = ToolRegistry::new().with(EchoTool::default());
3487        let agent = Agent::builder()
3488            .model(FakeAdapter)
3489            .add_tool_source(tools)
3490            .permissions(ApproveFsReads)
3491            .build()
3492            .unwrap();
3493
3494        let mut driver = agent
3495            .start(SessionConfig {
3496                session_id: SessionId::new("session-approval"),
3497                metadata: MetadataMap::new(),
3498                cache: None,
3499            })
3500            .await
3501            .unwrap();
3502
3503        driver
3504            .submit_input(vec![Item {
3505                id: None,
3506                kind: ItemKind::User,
3507                parts: vec![Part::Text(TextPart {
3508                    text: "ping".into(),
3509                    metadata: MetadataMap::new(),
3510                })],
3511                metadata: MetadataMap::new(),
3512            }])
3513            .unwrap();
3514
3515        let first = driver.next().await.unwrap();
3516        match first {
3517            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3518                assert!(pending.request.task_id.is_some());
3519                assert_eq!(pending.request.id.0, "approval:fs-read");
3520                pending.approve(&mut driver).unwrap();
3521            }
3522            other => panic!("unexpected loop step: {other:?}"),
3523        }
3524        let second = driver.next().await.unwrap();
3525        match second {
3526            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3527                Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3528                other => panic!("unexpected part: {other:?}"),
3529            },
3530            other => panic!("unexpected loop step after approval: {other:?}"),
3531        }
3532    }
3533
3534    #[tokio::test]
3535    async fn loop_resumes_with_patched_input_on_approval() {
3536        let tools = ToolRegistry::new().with(EchoTool::default());
3537        let agent = Agent::builder()
3538            .model(FakeAdapter)
3539            .add_tool_source(tools)
3540            .permissions(ApproveFsReads)
3541            .build()
3542            .unwrap();
3543
3544        let mut driver = agent
3545            .start(SessionConfig {
3546                session_id: SessionId::new("session-approval-patched"),
3547                metadata: MetadataMap::new(),
3548                cache: None,
3549            })
3550            .await
3551            .unwrap();
3552
3553        driver
3554            .submit_input(vec![Item {
3555                id: None,
3556                kind: ItemKind::User,
3557                parts: vec![Part::Text(TextPart {
3558                    text: "ping".into(),
3559                    metadata: MetadataMap::new(),
3560                })],
3561                metadata: MetadataMap::new(),
3562            }])
3563            .unwrap();
3564
3565        match driver.next().await.unwrap() {
3566            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3567                pending
3568                    .approve_with_patched_input(&mut driver, json!({ "value": "patched" }))
3569                    .unwrap();
3570            }
3571            other => panic!("unexpected loop step: {other:?}"),
3572        }
3573        match driver.next().await.unwrap() {
3574            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3575                Part::Text(text) => assert_eq!(text.text, "tool said: patched"),
3576                other => panic!("unexpected part: {other:?}"),
3577            },
3578            other => panic!("unexpected loop step after approval: {other:?}"),
3579        }
3580    }
3581
3582    #[tokio::test]
3583    async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3584        let tools = ToolRegistry::new().with(EchoTool::default());
3585        let agent = Agent::builder()
3586            .model(DualApprovalAdapter)
3587            .add_tool_source(tools)
3588            .permissions(ApproveFsReads)
3589            .build()
3590            .unwrap();
3591
3592        let mut driver = agent
3593            .start(SessionConfig {
3594                session_id: SessionId::new("session-dual-approval"),
3595                metadata: MetadataMap::new(),
3596                cache: None,
3597            })
3598            .await
3599            .unwrap();
3600
3601        driver
3602            .submit_input(vec![Item {
3603                id: None,
3604                kind: ItemKind::User,
3605                parts: vec![Part::Text(TextPart {
3606                    text: "run both approvals".into(),
3607                    metadata: MetadataMap::new(),
3608                })],
3609                metadata: MetadataMap::new(),
3610            }])
3611            .unwrap();
3612
3613        let pending_first = match driver.next().await.unwrap() {
3614            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3615                assert_eq!(
3616                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3617                    Some("call-1")
3618                );
3619                pending
3620            }
3621            other => panic!("unexpected first loop step: {other:?}"),
3622        };
3623
3624        let pending_second = match driver.next().await.unwrap() {
3625            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3626                assert_eq!(
3627                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3628                    Some("call-2")
3629                );
3630                pending
3631            }
3632            other => panic!("unexpected second loop step: {other:?}"),
3633        };
3634
3635        pending_second.approve(&mut driver).unwrap();
3636        match driver.next().await.unwrap() {
3637            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3638                assert_eq!(
3639                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3640                    Some("call-1")
3641                );
3642            }
3643            other => panic!("unexpected step after approving second request: {other:?}"),
3644        }
3645
3646        pending_first.approve(&mut driver).unwrap();
3647        match driver.next().await.unwrap() {
3648            LoopStep::Finished(turn) => {
3649                assert_eq!(turn.finish_reason, FinishReason::Completed);
3650                match &turn.items[0].parts[0] {
3651                    Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3652                    other => panic!("unexpected final part: {other:?}"),
3653                }
3654            }
3655            other => panic!("unexpected final loop step: {other:?}"),
3656        }
3657    }
3658
3659    #[tokio::test]
3660    async fn loop_compacts_transcript_before_new_turns() {
3661        let events = StdArc::new(StdMutex::new(Vec::new()));
3662        let agent = Agent::builder()
3663            .model(FakeAdapter)
3664            .compaction(CompactionConfig::new(
3665                CountTrigger,
3666                CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
3667            ))
3668            .observer(RecordingObserver {
3669                events: events.clone(),
3670            })
3671            .build()
3672            .unwrap();
3673
3674        let mut driver = agent
3675            .start(SessionConfig {
3676                session_id: SessionId::new("session-4"),
3677                metadata: MetadataMap::new(),
3678                cache: None,
3679            })
3680            .await
3681            .unwrap();
3682
3683        for text in ["first", "second"] {
3684            driver
3685                .submit_input(vec![Item {
3686                    id: None,
3687                    kind: ItemKind::User,
3688                    parts: vec![Part::Text(TextPart {
3689                        text: text.into(),
3690                        metadata: MetadataMap::new(),
3691                    })],
3692                    metadata: MetadataMap::new(),
3693                }])
3694                .unwrap();
3695            let _ = driver.next().await.unwrap();
3696        }
3697
3698        let events = events.lock().unwrap();
3699        assert!(events.iter().any(|event| matches!(
3700            event,
3701            AgentEvent::CompactionFinished {
3702                replaced_items,
3703                ..
3704            } if *replaced_items > 0
3705        )));
3706    }
3707
3708    #[tokio::test]
3709    async fn loop_refreshes_tool_specs_each_turn() {
3710        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3711        let version = StdArc::new(AtomicUsize::new(1));
3712        let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
3713        let agent = Agent::builder()
3714            .model(RecordingAdapter {
3715                seen_descriptions: seen_descriptions.clone(),
3716                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3717            })
3718            .add_tool_source(tools)
3719            .permissions(AllowAllPermissions)
3720            .build()
3721            .unwrap();
3722
3723        let mut driver = agent
3724            .start(SessionConfig {
3725                session_id: SessionId::new("session-dynamic-tools"),
3726                metadata: MetadataMap::new(),
3727                cache: None,
3728            })
3729            .await
3730            .unwrap();
3731
3732        for text in ["first", "second"] {
3733            driver
3734                .submit_input(vec![Item {
3735                    id: None,
3736                    kind: ItemKind::User,
3737                    parts: vec![Part::Text(TextPart {
3738                        text: text.into(),
3739                        metadata: MetadataMap::new(),
3740                    })],
3741                    metadata: MetadataMap::new(),
3742                }])
3743                .unwrap();
3744
3745            let _ = driver.next().await.unwrap();
3746            if text == "first" {
3747                version.store(2, Ordering::SeqCst);
3748            }
3749        }
3750
3751        let seen_descriptions = seen_descriptions.lock().unwrap();
3752        assert_eq!(seen_descriptions.len(), 2);
3753        assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
3754        assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
3755    }
3756
3757    #[tokio::test]
3758    async fn loop_emits_catalog_change_and_uses_updated_specs_next_turn() {
3759        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3760        let events = StdArc::new(StdMutex::new(Vec::new()));
3761        let executor = StdArc::new(CatalogExecutor::new());
3762        let executor_for_agent: Arc<dyn ToolExecutor> = executor.clone();
3763        let agent = Agent::builder()
3764            .model(RecordingAdapter {
3765                seen_descriptions: seen_descriptions.clone(),
3766                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3767            })
3768            .tool_executor(executor_for_agent)
3769            .permissions(AllowAllPermissions)
3770            .observer(RecordingObserver {
3771                events: events.clone(),
3772            })
3773            .build()
3774            .unwrap();
3775
3776        let mut driver = agent
3777            .start(SessionConfig {
3778                session_id: SessionId::new("session-catalog-events"),
3779                metadata: MetadataMap::new(),
3780                cache: None,
3781            })
3782            .await
3783            .unwrap();
3784
3785        driver
3786            .submit_input(vec![Item::text(ItemKind::User, "first")])
3787            .unwrap();
3788        let _ = driver.next().await.unwrap();
3789
3790        executor.publish_change(
3791            1,
3792            ToolCatalogEvent {
3793                source: "mcp:mock".into(),
3794                added: vec!["dynamic".into()],
3795                removed: Vec::new(),
3796                changed: Vec::new(),
3797            },
3798        );
3799
3800        driver
3801            .submit_input(vec![Item::text(ItemKind::User, "second")])
3802            .unwrap();
3803        let _ = driver.next().await.unwrap();
3804
3805        let seen_descriptions = seen_descriptions.lock().unwrap();
3806        assert_eq!(seen_descriptions.len(), 2);
3807        assert_eq!(seen_descriptions[0], vec!["dynamic version 0".to_string()]);
3808        assert_eq!(seen_descriptions[1], vec!["dynamic version 1".to_string()]);
3809
3810        let events = events.lock().unwrap();
3811        assert!(events.iter().any(|event| matches!(
3812            event,
3813            AgentEvent::ToolCatalogChanged(ToolCatalogEvent {
3814                source,
3815                added,
3816                removed,
3817                changed,
3818            }) if source == "mcp:mock"
3819                && added == &vec!["dynamic".to_string()]
3820                && removed.is_empty()
3821                && changed.is_empty()
3822        )));
3823    }
3824
3825    #[tokio::test]
3826    async fn loop_passes_session_default_and_next_turn_cache_requests() {
3827        let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
3828        let agent = Agent::builder()
3829            .model(RecordingAdapter {
3830                seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
3831                seen_caches: seen_caches.clone(),
3832            })
3833            .permissions(AllowAllPermissions)
3834            .build()
3835            .unwrap();
3836
3837        let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
3838            .with_retention(PromptCacheRetention::Short);
3839        let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
3840            breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
3841        });
3842
3843        let mut driver = agent
3844            .start(SessionConfig {
3845                session_id: SessionId::new("session-cache"),
3846                metadata: MetadataMap::new(),
3847                cache: Some(default_cache.clone()),
3848            })
3849            .await
3850            .unwrap();
3851
3852        driver
3853            .submit_input(vec![Item {
3854                id: None,
3855                kind: ItemKind::User,
3856                parts: vec![Part::Text(TextPart {
3857                    text: "first".into(),
3858                    metadata: MetadataMap::new(),
3859                })],
3860                metadata: MetadataMap::new(),
3861            }])
3862            .unwrap();
3863        let _ = driver.next().await.unwrap();
3864
3865        driver
3866            .submit_input_with_cache(
3867                vec![Item {
3868                    id: None,
3869                    kind: ItemKind::User,
3870                    parts: vec![Part::Text(TextPart {
3871                        text: "second".into(),
3872                        metadata: MetadataMap::new(),
3873                    })],
3874                    metadata: MetadataMap::new(),
3875                }],
3876                override_cache.clone(),
3877            )
3878            .unwrap();
3879        let _ = driver.next().await.unwrap();
3880
3881        let seen = seen_caches.lock().unwrap();
3882        assert_eq!(seen.len(), 2);
3883        assert_eq!(seen[0], Some(default_cache));
3884        assert_eq!(seen[1], Some(override_cache));
3885    }
3886
3887    #[tokio::test]
3888    async fn loop_yields_after_tool_result_between_rounds() {
3889        let tools = ToolRegistry::new().with(EchoTool::default());
3890        let agent = Agent::builder()
3891            .model(FakeAdapter)
3892            .add_tool_source(tools)
3893            .permissions(AllowAllPermissions)
3894            .build()
3895            .unwrap();
3896
3897        let mut driver = agent
3898            .start(SessionConfig {
3899                session_id: SessionId::new("yield-session"),
3900                metadata: MetadataMap::new(),
3901                cache: None,
3902            })
3903            .await
3904            .unwrap();
3905
3906        driver
3907            .submit_input(vec![Item::text(ItemKind::User, "ping")])
3908            .unwrap();
3909
3910        // First next() runs the model turn, resolves the tool call, and
3911        // yields AfterToolResult before calling the model again.
3912        let step = driver.next().await.unwrap();
3913        let info = match step {
3914            LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
3915            other => panic!("expected AfterToolResult, got {other:?}"),
3916        };
3917        assert_eq!(info.session_id, SessionId::new("yield-session"));
3918        // Transcript at yield: [User, Assistant(tool_call), Tool(result)]
3919        assert_eq!(info.transcript_len, 3);
3920
3921        // The yield is cooperative, not blocking.
3922        let interrupt = LoopInterrupt::AfterToolResult(info.clone());
3923        assert!(!interrupt.is_blocking());
3924
3925        // Host interjects a message mid-turn.
3926        driver
3927            .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
3928            .unwrap();
3929
3930        // Second next() resumes the turn into the next model call, which
3931        // sees the tool result (and the injected user message) and finishes.
3932        let step = driver.next().await.unwrap();
3933        match step {
3934            LoopStep::Finished(turn) => {
3935                assert_eq!(turn.finish_reason, FinishReason::Completed);
3936            }
3937            other => panic!("expected Finished, got {other:?}"),
3938        }
3939
3940        // Transcript must now include the injected user message.
3941        let snapshot = driver.snapshot();
3942        let has_injected_message = snapshot.transcript.iter().any(|item| {
3943            item.kind == ItemKind::User
3944                && item.parts.iter().any(|part| match part {
3945                    Part::Text(text) => text.text == "also: report back",
3946                    _ => false,
3947                })
3948        });
3949        assert!(
3950            has_injected_message,
3951            "injected user message should be in transcript, got: {:?}",
3952            snapshot.transcript
3953        );
3954    }
3955
3956    struct RecordingTranscriptObserver {
3957        items: StdArc<StdMutex<Vec<Item>>>,
3958    }
3959
3960    impl TranscriptObserver for RecordingTranscriptObserver {
3961        fn on_item_appended(&mut self, item: &Item) {
3962            self.items.lock().unwrap().push(item.clone());
3963        }
3964    }
3965
3966    #[tokio::test]
3967    async fn observers_see_full_tool_round() {
3968        // A turn with one tool call exercises every interesting path:
3969        //   user input drained -> model output_items (assistant w/ tool call)
3970        //   -> tool result Item -> next model output_items (assistant text)
3971        // The LoopObserver should see exactly one ToolResultReceived; the
3972        // TranscriptObserver should see all four items in transcript order.
3973        let events = StdArc::new(StdMutex::new(Vec::<AgentEvent>::new()));
3974        let items = StdArc::new(StdMutex::new(Vec::<Item>::new()));
3975        let agent = Agent::builder()
3976            .model(FakeAdapter)
3977            .add_tool_source(ToolRegistry::new().with(EchoTool::default()))
3978            .permissions(AllowAllPermissions)
3979            .observer(RecordingObserver {
3980                events: events.clone(),
3981            })
3982            .transcript_observer(RecordingTranscriptObserver {
3983                items: items.clone(),
3984            })
3985            .build()
3986            .unwrap();
3987
3988        let mut driver = agent
3989            .start(SessionConfig {
3990                session_id: SessionId::new("observer-session"),
3991                metadata: MetadataMap::new(),
3992                cache: None,
3993            })
3994            .await
3995            .unwrap();
3996
3997        driver
3998            .submit_input(vec![Item {
3999                id: None,
4000                kind: ItemKind::User,
4001                parts: vec![Part::Text(TextPart {
4002                    text: "ping".into(),
4003                    metadata: MetadataMap::new(),
4004                })],
4005                metadata: MetadataMap::new(),
4006            }])
4007            .unwrap();
4008
4009        let result = run_until_finished(&mut driver).await;
4010        assert!(matches!(result, LoopStep::Finished(_)), "got {result:?}");
4011
4012        // LoopObserver: exactly one ToolResultReceived, with the echo
4013        // tool's output, correlating back to the model's tool call.
4014        let events = events.lock().unwrap().clone();
4015        let tool_call_id = events.iter().find_map(|e| match e {
4016            AgentEvent::ToolCallRequested(c) => Some(c.id.clone()),
4017            _ => None,
4018        });
4019        let tool_results: Vec<_> = events
4020            .iter()
4021            .filter_map(|e| match e {
4022                AgentEvent::ToolResultReceived(r) => Some(r.clone()),
4023                _ => None,
4024            })
4025            .collect();
4026        assert_eq!(tool_results.len(), 1, "events: {events:?}");
4027        assert_eq!(Some(tool_results[0].call_id.clone()), tool_call_id);
4028        assert!(!tool_results[0].is_error);
4029
4030        // TranscriptObserver: every transcript mutation surfaces.
4031        // Expected order: User("ping"), Assistant(tool call), Tool(result),
4032        // Assistant("tool said: pong").
4033        let items = items.lock().unwrap().clone();
4034        assert_eq!(items.len(), 4, "items: {items:?}");
4035        assert_eq!(items[0].kind, ItemKind::User);
4036        assert_eq!(items[1].kind, ItemKind::Assistant);
4037        assert!(
4038            items[1]
4039                .parts
4040                .iter()
4041                .any(|p| matches!(p, Part::ToolCall(_)))
4042        );
4043        assert_eq!(items[2].kind, ItemKind::Tool);
4044        assert!(
4045            items[2]
4046                .parts
4047                .iter()
4048                .any(|p| matches!(p, Part::ToolResult(_)))
4049        );
4050        assert_eq!(items[3].kind, ItemKind::Assistant);
4051    }
4052
4053    #[test]
4054    fn convenience_cache_builders_construct_expected_defaults() {
4055        let cache = PromptCacheRequest::automatic()
4056            .with_retention(PromptCacheRetention::Short)
4057            .with_key("workspace:demo");
4058        let session = SessionConfig::new("demo").with_cache(cache.clone());
4059
4060        assert_eq!(session.session_id, SessionId::new("demo"));
4061        assert_eq!(session.cache, Some(cache));
4062
4063        let explicit = PromptCacheRequest::explicit([
4064            PromptCacheBreakpoint::tools_end(),
4065            PromptCacheBreakpoint::transcript_item_end(2),
4066            PromptCacheBreakpoint::transcript_part_end(3, 1),
4067        ]);
4068
4069        assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
4070        assert_eq!(
4071            explicit.strategy,
4072            PromptCacheStrategy::Explicit {
4073                breakpoints: vec![
4074                    PromptCacheBreakpoint::ToolsEnd,
4075                    PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
4076                    PromptCacheBreakpoint::TranscriptPartEnd {
4077                        item_index: 3,
4078                        part_index: 1,
4079                    },
4080                ],
4081            }
4082        );
4083    }
4084}