Skip to main content

agentkit_loop/
lib.rs

1//! Runtime-agnostic agent loop orchestration for sessions, turns, tools, and interrupts.
2//!
3//! `agentkit-loop` is the central coordination layer in the agentkit workspace.  It
4//! drives a model through a multi-turn agentic loop, executing tool calls,
5//! respecting permission checks, surfacing approval and authentication interrupts
6//! to the host application, and optionally compacting the transcript when it grows
7//! too large.
8//!
9//! # Architecture
10//!
11//! The main entry point is [`Agent`], constructed via [`AgentBuilder`].  After
12//! calling [`Agent::start`] you receive a [`LoopDriver`] that yields
13//! [`LoopStep`]s -- either a finished turn or an interrupt that requires host
14//! resolution before the loop can continue.
15//!
16//! ```text
17//! Agent::builder()
18//!     .model(adapter)        // ModelAdapter implementation
19//!     .tools(registry)       // ToolRegistry with registered tools
20//!     .permissions(checker)  // PermissionChecker for gating tool use
21//!     .observer(obs)         // LoopObserver for streaming events
22//!     .build()?
23//!     .start(config).await?  -> LoopDriver
24//!         .submit_input(...)
25//!         .next().await?     -> LoopStep::Finished | LoopStep::Interrupt
26//! ```
27//!
28//! # Example
29//!
30//! ```rust,no_run
31//! use agentkit_loop::{Agent, PromptCacheRequest, PromptCacheRetention, SessionConfig};
32//!
33//! # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
34//! let agent = Agent::builder()
35//!     .model(adapter)
36//!     .build()?;
37//!
38//! let mut driver = agent
39//!     .start(
40//!         SessionConfig::new("demo").with_cache(
41//!             PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
42//!         ),
43//!     )
44//!     .await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use std::collections::{BTreeMap, VecDeque};
50use std::sync::Arc;
51
52use agentkit_compaction::{
53    CompactionConfig, CompactionContext, CompactionReason, CompactionResult,
54};
55use agentkit_core::{
56    CancellationHandle, Delta, FinishReason, Item, ItemKind, MetadataMap, Part, SessionId, TaskId,
57    TextPart, ToolCallId, ToolCallPart, ToolOutput, ToolResultPart, TurnCancellation, Usage,
58};
59use agentkit_task_manager::{
60    PendingLoopUpdates, SimpleTaskManager, TaskApproval, TaskAuth, TaskLaunchRequest, TaskManager,
61    TaskResolution, TaskStartContext, TaskStartOutcome, TurnTaskUpdate,
62};
63#[cfg(test)]
64use agentkit_tools_core::ToolContext;
65use agentkit_tools_core::{
66    ApprovalDecision, ApprovalRequest, AuthOperation, AuthRequest, AuthResolution,
67    BasicToolExecutor, OwnedToolContext, PermissionChecker, ToolError, ToolExecutor, ToolRegistry,
68    ToolRequest, ToolResources, ToolSpec,
69};
70use async_trait::async_trait;
71use serde::{Deserialize, Serialize};
72use thiserror::Error;
73
74const INTERRUPTED_METADATA_KEY: &str = "agentkit.interrupted";
75const INTERRUPT_REASON_METADATA_KEY: &str = "agentkit.interrupt_reason";
76const INTERRUPT_STAGE_METADATA_KEY: &str = "agentkit.interrupt_stage";
77const USER_CANCELLED_REASON: &str = "user_cancelled";
78
79/// Configuration required to start a new model session.
80///
81/// Pass this to [`Agent::start`] to initialise the underlying [`ModelSession`]
82/// and obtain a [`LoopDriver`].
83///
84/// # Example
85///
86/// ```rust
87/// use agentkit_loop::{PromptCacheRequest, PromptCacheRetention, SessionConfig};
88///
89/// let config = SessionConfig::new("my-session").with_cache(
90///     PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
91/// );
92/// ```
93#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct SessionConfig {
95    /// Unique identifier for the session.
96    pub session_id: SessionId,
97    /// Arbitrary key-value metadata forwarded to the model adapter.
98    pub metadata: MetadataMap,
99    /// Default provider-side prompt caching policy for turns in this session.
100    pub cache: Option<PromptCacheRequest>,
101}
102
103impl SessionConfig {
104    /// Builds a session config with empty metadata and no cache policy.
105    pub fn new(session_id: impl Into<SessionId>) -> Self {
106        Self {
107            session_id: session_id.into(),
108            metadata: MetadataMap::new(),
109            cache: None,
110        }
111    }
112
113    /// Replaces the session metadata map.
114    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
115        self.metadata = metadata;
116        self
117    }
118
119    /// Sets the default prompt cache request for the session.
120    pub fn with_cache(mut self, cache: PromptCacheRequest) -> Self {
121        self.cache = Some(cache);
122        self
123    }
124
125    /// Clears any default prompt cache request for the session.
126    pub fn without_cache(mut self) -> Self {
127        self.cache = None;
128        self
129    }
130}
131
132/// Strength of a prompt-cache request.
133///
134/// `BestEffort` lets adapters ignore unsupported controls while still using
135/// any provider-native automatic caching they support. `Required` upgrades
136/// unsupported cache requests into provider errors.
137#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub enum PromptCacheMode {
139    /// Disable prompt caching for this request.
140    Disabled,
141    /// Use caching when the provider can honor the request.
142    #[default]
143    BestEffort,
144    /// Fail the turn if the provider cannot honor the request.
145    Required,
146}
147
148/// High-level provider-neutral cache retention hint.
149///
150/// Providers map this to their native controls. For example, OpenAI maps
151/// `Short` to in-memory retention while OpenRouter Anthropic models map it to
152/// the default 5-minute ephemeral cache.
153#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
154pub enum PromptCacheRetention {
155    /// Use the provider's default cache retention.
156    Default,
157    /// Prefer the provider's short-lived cache retention mode.
158    Short,
159    /// Prefer the provider's longest generally available cache retention mode.
160    Extended,
161}
162
163/// Provider-neutral prompt caching strategy.
164#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
165pub enum PromptCacheStrategy {
166    /// Let the provider decide the cacheable prefix automatically.
167    #[default]
168    Automatic,
169    /// Apply explicit cache breakpoints to selected prefix boundaries.
170    Explicit {
171        /// Cache breakpoints in transcript/tool order.
172        breakpoints: Vec<PromptCacheBreakpoint>,
173    },
174}
175
176impl PromptCacheStrategy {
177    /// Uses the provider's native automatic cache behavior when available, or
178    /// any adapter-provided automatic planning fallback.
179    pub fn automatic() -> Self {
180        Self::Automatic
181    }
182
183    /// Uses explicit cache breakpoints.
184    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
185        Self::Explicit {
186            breakpoints: breakpoints.into_iter().collect(),
187        }
188    }
189}
190
191/// Prefix boundary that a provider should cache when using explicit caching.
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum PromptCacheBreakpoint {
194    /// Cache the tool schema prefix through the last available tool.
195    ToolsEnd,
196    /// Cache through the end of the transcript item at `index`.
197    TranscriptItemEnd { index: usize },
198    /// Cache through the specific transcript part.
199    ///
200    /// Not every adapter can target every part precisely; unsupported
201    /// fine-grained breakpoints become best-effort no-ops unless the request is
202    /// marked [`PromptCacheMode::Required`].
203    TranscriptPartEnd {
204        item_index: usize,
205        part_index: usize,
206    },
207}
208
209impl PromptCacheBreakpoint {
210    /// Cache the tool schema prefix through the last available tool.
211    pub fn tools_end() -> Self {
212        Self::ToolsEnd
213    }
214
215    /// Cache through the end of a transcript item.
216    pub fn transcript_item_end(index: usize) -> Self {
217        Self::TranscriptItemEnd { index }
218    }
219
220    /// Cache through a specific part within a transcript item.
221    pub fn transcript_part_end(item_index: usize, part_index: usize) -> Self {
222        Self::TranscriptPartEnd {
223            item_index,
224            part_index,
225        }
226    }
227}
228
229/// Prompt caching request sent alongside a turn.
230#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
231pub struct PromptCacheRequest {
232    /// Strength of the caching request.
233    pub mode: PromptCacheMode,
234    /// Automatic or explicit caching strategy.
235    pub strategy: PromptCacheStrategy,
236    /// Optional provider-neutral retention hint.
237    pub retention: Option<PromptCacheRetention>,
238    /// Optional provider cache key or routing key.
239    pub key: Option<String>,
240}
241
242impl PromptCacheRequest {
243    /// Builds a best-effort automatic cache request.
244    pub fn automatic() -> Self {
245        Self::best_effort(PromptCacheStrategy::automatic())
246    }
247
248    /// Builds a required automatic cache request.
249    pub fn automatic_required() -> Self {
250        Self::required(PromptCacheStrategy::automatic())
251    }
252
253    /// Builds a best-effort explicit cache request.
254    pub fn explicit(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
255        Self::best_effort(PromptCacheStrategy::explicit(breakpoints))
256    }
257
258    /// Builds a required explicit cache request.
259    pub fn explicit_required(breakpoints: impl IntoIterator<Item = PromptCacheBreakpoint>) -> Self {
260        Self::required(PromptCacheStrategy::explicit(breakpoints))
261    }
262
263    /// Builds a disabled cache request.
264    pub fn disabled() -> Self {
265        Self {
266            mode: PromptCacheMode::Disabled,
267            strategy: PromptCacheStrategy::Automatic,
268            retention: None,
269            key: None,
270        }
271    }
272
273    /// Builds a best-effort cache request with the given strategy.
274    pub fn best_effort(strategy: PromptCacheStrategy) -> Self {
275        Self {
276            mode: PromptCacheMode::BestEffort,
277            strategy,
278            retention: None,
279            key: None,
280        }
281    }
282
283    /// Builds a required cache request with the given strategy.
284    pub fn required(strategy: PromptCacheStrategy) -> Self {
285        Self {
286            mode: PromptCacheMode::Required,
287            strategy,
288            retention: None,
289            key: None,
290        }
291    }
292
293    /// Overrides the request mode.
294    pub fn with_mode(mut self, mode: PromptCacheMode) -> Self {
295        self.mode = mode;
296        self
297    }
298
299    /// Overrides the request strategy.
300    pub fn with_strategy(mut self, strategy: PromptCacheStrategy) -> Self {
301        self.strategy = strategy;
302        self
303    }
304
305    /// Applies a provider-neutral retention hint.
306    pub fn with_retention(mut self, retention: PromptCacheRetention) -> Self {
307        self.retention = Some(retention);
308        self
309    }
310
311    /// Applies a provider cache key or routing key.
312    pub fn with_key(mut self, key: impl Into<String>) -> Self {
313        self.key = Some(key.into());
314        self
315    }
316
317    /// Clears any provider-neutral retention hint.
318    pub fn without_retention(mut self) -> Self {
319        self.retention = None;
320        self
321    }
322
323    /// Clears any provider cache key or routing key.
324    pub fn without_key(mut self) -> Self {
325        self.key = None;
326        self
327    }
328
329    /// Returns true when caching should be active for this request.
330    pub fn is_enabled(&self) -> bool {
331        !matches!(self.mode, PromptCacheMode::Disabled)
332    }
333}
334
335/// Payload sent to the model at the start of each turn.
336///
337/// The [`LoopDriver`] constructs this automatically from its internal state
338/// and passes it to [`ModelSession::begin_turn`].  Model adapter authors
339/// use the fields to build the provider-specific request.
340#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
341pub struct TurnRequest {
342    /// Session this turn belongs to.
343    pub session_id: SessionId,
344    /// Unique identifier for the current turn.
345    pub turn_id: agentkit_core::TurnId,
346    /// Full conversation transcript accumulated so far.
347    pub transcript: Vec<Item>,
348    /// Tool specifications the model may invoke during this turn.
349    pub available_tools: Vec<ToolSpec>,
350    /// Provider-side prompt caching request for this turn.
351    pub cache: Option<PromptCacheRequest>,
352    /// Per-turn metadata (e.g. provider hints).
353    pub metadata: MetadataMap,
354}
355
356/// Final result produced by a single model turn.
357///
358/// Returned inside [`ModelTurnEvent::Finished`] to signal that the model has
359/// completed its generation for this turn.
360#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
361pub struct ModelTurnResult {
362    /// Why the model stopped generating (e.g. completed, tool call, length).
363    pub finish_reason: FinishReason,
364    /// Items the model produced during this turn (text, tool calls, etc.).
365    pub output_items: Vec<Item>,
366    /// Token usage statistics, if available.
367    pub usage: Option<Usage>,
368    /// Provider-specific metadata about the turn.
369    pub metadata: MetadataMap,
370}
371
372/// Streaming event emitted by a [`ModelTurn`] during generation.
373///
374/// The [`LoopDriver`] consumes these events one-by-one via
375/// [`ModelTurn::next_event`] and translates them into [`AgentEvent`]s for
376/// observers and into transcript mutations.
377#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
378pub enum ModelTurnEvent {
379    /// Incremental text or content delta from the model.
380    Delta(Delta),
381    /// The model is requesting a tool call.
382    ToolCall(ToolCallPart),
383    /// Updated token usage statistics.
384    Usage(Usage),
385    /// The model has finished generating for this turn.
386    Finished(ModelTurnResult),
387}
388
389/// Factory for creating model sessions.
390///
391/// Implement this trait to integrate a model provider (e.g. OpenRouter,
392/// Anthropic, a local LLM server) with the agent loop.  [`Agent`] holds a
393/// single adapter and calls [`start_session`](ModelAdapter::start_session)
394/// once when [`Agent::start`] is invoked.
395///
396/// # Example
397///
398/// ```rust,no_run
399/// use agentkit_loop::{ModelAdapter, ModelSession, SessionConfig, LoopError};
400/// use async_trait::async_trait;
401///
402/// struct MyAdapter;
403///
404/// #[async_trait]
405/// impl ModelAdapter for MyAdapter {
406///     type Session = MySession;
407///
408///     async fn start_session(&self, config: SessionConfig) -> Result<MySession, LoopError> {
409///         // Initialize provider-specific session state here.
410///         Ok(MySession { /* ... */ })
411///     }
412/// }
413/// # struct MySession;
414/// # #[async_trait]
415/// # impl ModelSession for MySession {
416/// #     type Turn = MyTurn;
417/// #     async fn begin_turn(&mut self, _r: agentkit_loop::TurnRequest, _c: Option<agentkit_core::TurnCancellation>) -> Result<MyTurn, LoopError> { todo!() }
418/// # }
419/// # struct MyTurn;
420/// # #[async_trait]
421/// # impl agentkit_loop::ModelTurn for MyTurn {
422/// #     async fn next_event(&mut self, _c: Option<agentkit_core::TurnCancellation>) -> Result<Option<agentkit_loop::ModelTurnEvent>, LoopError> { todo!() }
423/// # }
424/// ```
425#[async_trait]
426pub trait ModelAdapter: Send + Sync {
427    /// The session type produced by this adapter.
428    type Session: ModelSession;
429
430    /// Create a new model session from the given configuration.
431    ///
432    /// # Errors
433    ///
434    /// Returns [`LoopError`] if the provider connection or initialisation fails.
435    async fn start_session(&self, config: SessionConfig) -> Result<Self::Session, LoopError>;
436}
437
438/// An active model session that can produce sequential turns.
439///
440/// A session is created once per [`Agent::start`] call and lives for the
441/// lifetime of the [`LoopDriver`].  Each call to [`begin_turn`](ModelSession::begin_turn)
442/// hands the full transcript to the model and returns a streaming
443/// [`ModelTurn`].
444#[async_trait]
445pub trait ModelSession: Send {
446    /// The turn type produced by this session.
447    type Turn: ModelTurn;
448
449    /// Start a new turn, sending the transcript and available tools to the model.
450    ///
451    /// # Arguments
452    ///
453    /// * `request` -- the turn payload including transcript and tool specs.
454    /// * `cancellation` -- optional handle the implementation should poll to
455    ///   detect user-initiated cancellation.
456    ///
457    /// # Errors
458    ///
459    /// Returns [`LoopError::Cancelled`] when the turn is cancelled, or a
460    /// provider-specific error wrapped in [`LoopError`].
461    async fn begin_turn(
462        &mut self,
463        request: TurnRequest,
464        cancellation: Option<TurnCancellation>,
465    ) -> Result<Self::Turn, LoopError>;
466}
467
468/// A streaming model turn that yields events one at a time.
469///
470/// The loop driver calls [`next_event`](ModelTurn::next_event) repeatedly
471/// until it returns `Ok(None)` (stream exhausted) or
472/// `Ok(Some(ModelTurnEvent::Finished(_)))`.
473#[async_trait]
474pub trait ModelTurn: Send {
475    /// Retrieve the next event from the model's response stream.
476    ///
477    /// Returns `Ok(None)` when the stream is exhausted.
478    ///
479    /// # Errors
480    ///
481    /// Returns [`LoopError::Cancelled`] if `cancellation` fires, or a
482    /// provider-specific error wrapped in [`LoopError`].
483    async fn next_event(
484        &mut self,
485        cancellation: Option<TurnCancellation>,
486    ) -> Result<Option<ModelTurnEvent>, LoopError>;
487}
488
489/// Observer hook for streaming agent events to the host application.
490///
491/// Register observers via [`AgentBuilder::observer`] to receive real-time
492/// notifications about deltas, tool calls, usage, warnings, and lifecycle
493/// events.
494///
495/// # Example
496///
497/// ```rust
498/// use agentkit_loop::{AgentEvent, LoopObserver};
499///
500/// struct StdoutObserver;
501///
502/// impl LoopObserver for StdoutObserver {
503///     fn handle_event(&mut self, event: AgentEvent) {
504///         println!("{event:?}");
505///     }
506/// }
507/// ```
508pub trait LoopObserver: Send {
509    /// Called synchronously for every [`AgentEvent`] emitted by the loop driver.
510    fn handle_event(&mut self, event: AgentEvent);
511}
512
513/// Lifecycle and streaming events emitted by the [`LoopDriver`].
514///
515/// Observers (see [`LoopObserver`]) receive these events in the order they
516/// occur.  They are useful for building UIs, logging, or telemetry.
517#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
518pub enum AgentEvent {
519    /// The agent run has been initialised.
520    RunStarted { session_id: SessionId },
521    /// A new model turn is starting.
522    TurnStarted {
523        session_id: SessionId,
524        turn_id: agentkit_core::TurnId,
525    },
526    /// User input has been accepted into the pending queue.
527    InputAccepted {
528        session_id: SessionId,
529        items: Vec<Item>,
530    },
531    /// Incremental content delta from the model.
532    ContentDelta(Delta),
533    /// The model has requested a tool call.
534    ToolCallRequested(ToolCallPart),
535    /// A tool call requires explicit user approval before execution.
536    ApprovalRequired(ApprovalRequest),
537    /// A tool call requires authentication before execution.
538    AuthRequired(AuthRequest),
539    /// An approval interrupt has been resolved.
540    ApprovalResolved { approved: bool },
541    /// An authentication interrupt has been resolved.
542    AuthResolved { provided: bool },
543    /// Transcript compaction is about to begin.
544    CompactionStarted {
545        session_id: SessionId,
546        turn_id: Option<agentkit_core::TurnId>,
547        reason: CompactionReason,
548    },
549    /// Transcript compaction has finished.
550    CompactionFinished {
551        session_id: SessionId,
552        turn_id: Option<agentkit_core::TurnId>,
553        replaced_items: usize,
554        transcript_len: usize,
555        metadata: MetadataMap,
556    },
557    /// Updated token usage statistics.
558    UsageUpdated(Usage),
559    /// Non-fatal warning (e.g. a tool failure that was recovered from).
560    Warning { message: String },
561    /// The agent run has failed with an unrecoverable error.
562    RunFailed { message: String },
563    /// A turn has finished (successfully, via cancellation, etc.).
564    TurnFinished(TurnResult),
565}
566
567/// Handle for a pending approval interrupt.
568///
569/// Wraps an [`ApprovalRequest`] and provides ergonomic resolution methods
570/// so callers can resolve the interrupt directly instead of searching for
571/// the matching method on [`LoopDriver`].
572///
573/// # Example
574///
575/// ```rust,no_run
576/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
577/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
578/// match driver.next().await? {
579///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
580///         println!("Needs approval: {}", pending.request.summary);
581///         pending.approve(driver)?;
582///     }
583///     _ => {}
584/// }
585/// # Ok(())
586/// # }
587/// ```
588#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
589pub struct PendingApproval {
590    /// The underlying approval request details.
591    pub request: ApprovalRequest,
592}
593
594impl std::ops::Deref for PendingApproval {
595    type Target = ApprovalRequest;
596    fn deref(&self) -> &ApprovalRequest {
597        &self.request
598    }
599}
600
601impl PendingApproval {
602    /// Approve the pending tool call.
603    pub fn approve<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
604        let call_id = self
605            .request
606            .call_id
607            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
608        driver.resolve_approval_for(call_id, ApprovalDecision::Approve)
609    }
610
611    /// Deny the pending tool call.
612    pub fn deny<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
613        let call_id = self
614            .request
615            .call_id
616            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
617        driver.resolve_approval_for(call_id, ApprovalDecision::Deny { reason: None })
618    }
619
620    /// Deny the pending tool call with a reason.
621    pub fn deny_with_reason<S: ModelSession>(
622        self,
623        driver: &mut LoopDriver<S>,
624        reason: impl Into<String>,
625    ) -> Result<(), LoopError> {
626        let call_id = self
627            .request
628            .call_id
629            .ok_or_else(|| LoopError::InvalidState("pending approval is missing call id".into()))?;
630        driver.resolve_approval_for(
631            call_id,
632            ApprovalDecision::Deny {
633                reason: Some(reason.into()),
634            },
635        )
636    }
637}
638
639/// Handle for a pending authentication interrupt.
640///
641/// Wraps an [`AuthRequest`] and provides ergonomic resolution methods.
642///
643/// # Example
644///
645/// ```rust,no_run
646/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
647/// # use agentkit_core::MetadataMap;
648/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
649/// match driver.next().await? {
650///     LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
651///         println!("Auth required from: {}", pending.request.provider);
652///         pending.cancel(driver)?;
653///     }
654///     _ => {}
655/// }
656/// # Ok(())
657/// # }
658/// ```
659#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
660pub struct PendingAuth {
661    /// The underlying auth request details.
662    pub request: AuthRequest,
663}
664
665impl std::ops::Deref for PendingAuth {
666    type Target = AuthRequest;
667    fn deref(&self) -> &AuthRequest {
668        &self.request
669    }
670}
671
672impl PendingAuth {
673    /// Provide credentials to satisfy the auth request.
674    pub fn provide<S: ModelSession>(
675        self,
676        driver: &mut LoopDriver<S>,
677        credentials: MetadataMap,
678    ) -> Result<(), LoopError> {
679        driver.resolve_auth(AuthResolution::Provided {
680            request: self.request,
681            credentials,
682        })
683    }
684
685    /// Cancel the auth flow.
686    pub fn cancel<S: ModelSession>(self, driver: &mut LoopDriver<S>) -> Result<(), LoopError> {
687        driver.resolve_auth(AuthResolution::Cancelled {
688            request: self.request,
689        })
690    }
691}
692
693/// Descriptor for a [`LoopInterrupt::AwaitingInput`] interrupt.
694///
695/// Returned when the driver has no pending input and needs the host to
696/// supply items before advancing.
697///
698/// # Example
699///
700/// ```rust,no_run
701/// # use agentkit_loop::{LoopInterrupt, LoopStep, LoopDriver};
702/// # use agentkit_core::Item;
703/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>, items: Vec<Item>) -> Result<(), agentkit_loop::LoopError> {
704/// match driver.next().await? {
705///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
706///         pending.submit(driver, items)?;
707///     }
708///     _ => {}
709/// }
710/// # Ok(())
711/// # }
712/// ```
713#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
714pub struct InputRequest {
715    /// The session that is waiting for input.
716    pub session_id: SessionId,
717    /// Human-readable explanation of why input is needed.
718    pub reason: String,
719}
720
721impl InputRequest {
722    /// Submit input items to the driver.
723    pub fn submit<S: ModelSession>(
724        self,
725        driver: &mut LoopDriver<S>,
726        items: Vec<Item>,
727    ) -> Result<(), LoopError> {
728        driver.submit_input(items)
729    }
730}
731
732/// Outcome of a completed (or cancelled) turn.
733///
734/// Wrapped by [`LoopStep::Finished`] and also emitted as
735/// [`AgentEvent::TurnFinished`] to observers.
736#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
737pub struct TurnResult {
738    /// Identifier for the turn that produced this result.
739    pub turn_id: agentkit_core::TurnId,
740    /// Why the turn ended (completed, tool call, cancelled, etc.).
741    pub finish_reason: FinishReason,
742    /// Items produced during this turn (assistant text, tool results, etc.).
743    pub items: Vec<Item>,
744    /// Aggregated token usage, if reported by the model.
745    pub usage: Option<Usage>,
746    /// Additional metadata about the turn.
747    pub metadata: MetadataMap,
748}
749
750/// An interrupt that pauses the agent loop until the host resolves it.
751///
752/// The loop returns an interrupt inside [`LoopStep::Interrupt`] whenever it
753/// cannot proceed autonomously.  Each variant carries a handle with
754/// resolution methods so callers can resolve the interrupt directly.
755///
756/// # Example
757///
758/// ```rust,no_run
759/// use agentkit_loop::{LoopInterrupt, LoopStep};
760/// # use agentkit_loop::LoopDriver;
761///
762/// # async fn handle<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
763/// match driver.next().await? {
764///     LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
765///         println!("Tool {} needs approval: {}", pending.request.request_kind, pending.request.summary);
766///         pending.approve(driver)?;
767///     }
768///     LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
769///         println!("Auth required from provider: {}", pending.request.provider);
770///         pending.cancel(driver)?;
771///     }
772///     LoopStep::Interrupt(LoopInterrupt::AwaitingInput(pending)) => {
773///         println!("Waiting for input: {}", pending.reason);
774///         // ... call pending.submit(driver, items)
775///     }
776///     LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => {
777///         // Cooperative yield between tool rounds.  Optionally call
778///         // driver.submit_input(...) to interject a user message, then
779///         // call driver.next() to resume the turn.
780///         let _ = info;
781///     }
782///     LoopStep::Finished(result) => {
783///         println!("Turn finished: {:?}", result.finish_reason);
784///     }
785/// }
786/// # Ok(())
787/// # }
788/// ```
789#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
790pub enum LoopInterrupt {
791    /// A tool call requires explicit approval before it can execute.
792    ApprovalRequest(PendingApproval),
793    /// A tool call requires authentication credentials.
794    AuthRequest(PendingAuth),
795    /// The driver has no pending input and needs the host to supply some.
796    AwaitingInput(InputRequest),
797    /// A tool round finished: all tool calls from the previous assistant
798    /// message now have results in the transcript, and the driver is about to
799    /// invoke the model again.  The host may call
800    /// [`LoopDriver::submit_input`] to interject user messages before the
801    /// next model turn, then call [`LoopDriver::next`] to resume.
802    ///
803    /// This is a non-blocking interrupt: callers that do not care about
804    /// mid-turn interjection can treat it as a no-op (`_ => continue`) and
805    /// the next `next()` call resumes the turn.
806    AfterToolResult(ToolRoundInfo),
807}
808
809impl LoopInterrupt {
810    /// Returns `true` if the interrupt must be explicitly resolved before
811    /// the loop can make progress.  Approvals and auth requests are
812    /// blocking; [`AwaitingInput`](LoopInterrupt::AwaitingInput) and
813    /// [`AfterToolResult`](LoopInterrupt::AfterToolResult) are cooperative
814    /// and can be ignored by calling [`LoopDriver::next`] again.
815    pub fn is_blocking(&self) -> bool {
816        matches!(
817            self,
818            LoopInterrupt::ApprovalRequest(_) | LoopInterrupt::AuthRequest(_)
819        )
820    }
821}
822
823/// Metadata describing a completed tool round, surfaced via
824/// [`LoopInterrupt::AfterToolResult`].
825#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
826pub struct ToolRoundInfo {
827    /// The session that produced this tool round.
828    pub session_id: SessionId,
829    /// The turn that is about to continue into the next model call.
830    pub turn_id: agentkit_core::TurnId,
831    /// Transcript length at the yield point (for snapshots / UIs).
832    pub transcript_len: usize,
833}
834
835impl ToolRoundInfo {
836    /// Convenience: forwards to [`LoopDriver::submit_input`].
837    pub fn submit<S: ModelSession>(
838        &self,
839        driver: &mut LoopDriver<S>,
840        items: Vec<Item>,
841    ) -> Result<(), LoopError> {
842        driver.submit_input(items)
843    }
844}
845
846/// The result of advancing the agent loop by one step.
847///
848/// Returned by [`LoopDriver::next`].  The host should pattern-match on this
849/// to decide whether to continue the loop or resolve an interrupt first.
850///
851/// # Example
852///
853/// ```rust,no_run
854/// use agentkit_loop::LoopStep;
855/// # use agentkit_loop::LoopDriver;
856///
857/// # async fn run<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
858/// loop {
859///     match driver.next().await? {
860///         LoopStep::Finished(result) => {
861///             println!("Turn complete: {:?}", result.finish_reason);
862///             break;
863///         }
864///         LoopStep::Interrupt(interrupt) => {
865///             // Resolve the interrupt, then continue the loop.
866///             # break;
867///         }
868///     }
869/// }
870/// # Ok(())
871/// # }
872/// ```
873#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
874pub enum LoopStep {
875    /// The loop is paused and requires host action.
876    Interrupt(LoopInterrupt),
877    /// A turn has completed (or been cancelled).
878    Finished(TurnResult),
879}
880
881/// A read-only snapshot of the loop driver's current state.
882///
883/// Obtained via [`LoopDriver::snapshot`].  Useful for persisting or
884/// inspecting the conversation transcript without holding a mutable
885/// reference to the driver.
886#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
887pub struct LoopSnapshot {
888    /// Session identifier.
889    pub session_id: SessionId,
890    /// The full transcript accumulated so far.
891    pub transcript: Vec<Item>,
892    /// Input items queued but not yet consumed by a turn.
893    pub pending_input: Vec<Item>,
894}
895
896#[derive(Clone, Debug)]
897struct PendingApprovalToolCall {
898    request: ApprovalRequest,
899    decision: Option<ApprovalDecision>,
900    surfaced: bool,
901    turn_id: agentkit_core::TurnId,
902    task_id: TaskId,
903    call: ToolCallPart,
904    tool_request: ToolRequest,
905}
906
907#[derive(Clone, Debug)]
908struct PendingAuthToolCall {
909    request: AuthRequest,
910    resolution: Option<AuthResolution>,
911    turn_id: agentkit_core::TurnId,
912    task_id: TaskId,
913    call: ToolCallPart,
914    tool_request: ToolRequest,
915}
916
917#[derive(Clone, Debug, Default)]
918struct ActiveToolRound {
919    turn_id: agentkit_core::TurnId,
920    pending_calls: VecDeque<(ToolCallPart, ToolRequest)>,
921    background_pending: bool,
922    foreground_progressed: bool,
923}
924
925/// A configured agent ready to start a session.
926///
927/// Build one with [`Agent::builder`], supplying at minimum a [`ModelAdapter`].
928/// Then call [`Agent::start`] with a [`SessionConfig`] to obtain a
929/// [`LoopDriver`] that drives the agentic loop.
930///
931/// # Example
932///
933/// ```rust,no_run
934/// use agentkit_loop::{
935///     Agent, LoopStep, PromptCacheRequest, PromptCacheRetention, SessionConfig,
936/// };
937/// use agentkit_tools_core::ToolRegistry;
938///
939/// # async fn example<M: agentkit_loop::ModelAdapter>(adapter: M) -> Result<(), agentkit_loop::LoopError> {
940/// let agent = Agent::builder()
941///     .model(adapter)
942///     .tools(ToolRegistry::new())
943///     .build()?;
944///
945/// let mut driver = agent
946///     .start(
947///         SessionConfig::new("s1").with_cache(
948///             PromptCacheRequest::automatic().with_retention(PromptCacheRetention::Short),
949///         ),
950///     )
951///     .await?;
952///
953/// // Submit input and advance
954/// # Ok(())
955/// # }
956/// ```
957pub struct Agent<M>
958where
959    M: ModelAdapter,
960{
961    model: M,
962    tools: ToolRegistry,
963    task_manager: Arc<dyn TaskManager>,
964    permissions: Arc<dyn PermissionChecker>,
965    resources: Arc<dyn ToolResources>,
966    cancellation: Option<CancellationHandle>,
967    compaction: Option<CompactionConfig>,
968    observers: Vec<Box<dyn LoopObserver>>,
969}
970
971impl<M> Agent<M>
972where
973    M: ModelAdapter,
974{
975    /// Create a new [`AgentBuilder`] for configuring this agent.
976    pub fn builder() -> AgentBuilder<M> {
977        AgentBuilder::default()
978    }
979
980    /// Consume the agent and start a new session, returning a [`LoopDriver`].
981    ///
982    /// This calls [`ModelAdapter::start_session`] and emits an
983    /// [`AgentEvent::RunStarted`] event to all registered observers.
984    ///
985    /// # Errors
986    ///
987    /// Returns [`LoopError`] if the model adapter fails to create a session.
988    pub async fn start(self, config: SessionConfig) -> Result<LoopDriver<M::Session>, LoopError> {
989        let session_id = config.session_id.clone();
990        let default_cache = config.cache.clone();
991        let session = self.model.start_session(config).await?;
992        let tool_executor = Arc::new(BasicToolExecutor::new(self.tools.clone()));
993        let mut driver = LoopDriver {
994            session_id: session_id.clone(),
995            default_cache,
996            next_turn_cache: None,
997            session: Some(session),
998            tool_executor,
999            task_manager: self.task_manager,
1000            permissions: self.permissions,
1001            resources: self.resources,
1002            cancellation: self.cancellation,
1003            compaction: self.compaction,
1004            observers: self.observers,
1005            transcript: Vec::new(),
1006            pending_input: Vec::new(),
1007            pending_approvals: BTreeMap::new(),
1008            pending_approval_order: VecDeque::new(),
1009            pending_auth: None,
1010            active_tool_round: None,
1011            pending_round_resume: None,
1012            next_turn_index: 1,
1013        };
1014        driver.emit(AgentEvent::RunStarted { session_id });
1015        Ok(driver)
1016    }
1017}
1018
1019/// Builder for constructing an [`Agent`].
1020///
1021/// Obtained via [`Agent::builder`].  The only required field is
1022/// [`model`](AgentBuilder::model); all others have sensible defaults
1023/// (no tools, allow-all permissions, no compaction, no observers).
1024pub struct AgentBuilder<M>
1025where
1026    M: ModelAdapter,
1027{
1028    model: Option<M>,
1029    tools: ToolRegistry,
1030    task_manager: Option<Arc<dyn TaskManager>>,
1031    permissions: Arc<dyn PermissionChecker>,
1032    resources: Arc<dyn ToolResources>,
1033    cancellation: Option<CancellationHandle>,
1034    compaction: Option<CompactionConfig>,
1035    observers: Vec<Box<dyn LoopObserver>>,
1036}
1037
1038impl<M> Default for AgentBuilder<M>
1039where
1040    M: ModelAdapter,
1041{
1042    fn default() -> Self {
1043        Self {
1044            model: None,
1045            tools: ToolRegistry::new(),
1046            task_manager: None,
1047            permissions: Arc::new(AllowAllPermissions),
1048            resources: Arc::new(()),
1049            cancellation: None,
1050            compaction: None,
1051            observers: Vec::new(),
1052        }
1053    }
1054}
1055
1056impl<M> AgentBuilder<M>
1057where
1058    M: ModelAdapter,
1059{
1060    /// Set the model adapter (required).
1061    pub fn model(mut self, model: M) -> Self {
1062        self.model = Some(model);
1063        self
1064    }
1065
1066    /// Set the tool registry.  Defaults to an empty [`ToolRegistry`].
1067    pub fn tools(mut self, tools: ToolRegistry) -> Self {
1068        self.tools = tools;
1069        self
1070    }
1071
1072    /// Set the task manager that schedules tool-call execution.
1073    ///
1074    /// Defaults to [`SimpleTaskManager`], which preserves the existing
1075    /// sequential request/response behavior.
1076    pub fn task_manager(mut self, manager: impl TaskManager + 'static) -> Self {
1077        self.task_manager = Some(Arc::new(manager));
1078        self
1079    }
1080
1081    /// Set the permission checker that gates tool execution.
1082    ///
1083    /// Defaults to allowing all tool calls without prompting.
1084    pub fn permissions(mut self, permissions: impl PermissionChecker + 'static) -> Self {
1085        self.permissions = Arc::new(permissions);
1086        self
1087    }
1088
1089    /// Set shared resources available to tool implementations.
1090    pub fn resources(mut self, resources: impl ToolResources + 'static) -> Self {
1091        self.resources = Arc::new(resources);
1092        self
1093    }
1094
1095    /// Attach a [`CancellationHandle`] for cooperative cancellation of turns.
1096    pub fn cancellation(mut self, handle: CancellationHandle) -> Self {
1097        self.cancellation = Some(handle);
1098        self
1099    }
1100
1101    /// Enable transcript compaction with the given configuration.
1102    ///
1103    /// When configured, the driver checks the compaction trigger before each
1104    /// turn and applies the compaction strategy if the transcript is too long.
1105    pub fn compaction(mut self, config: CompactionConfig) -> Self {
1106        self.compaction = Some(config);
1107        self
1108    }
1109
1110    /// Register a [`LoopObserver`] that receives [`AgentEvent`]s.
1111    ///
1112    /// Multiple observers may be registered; they are called in order.
1113    pub fn observer(mut self, observer: impl LoopObserver + 'static) -> Self {
1114        self.observers.push(Box::new(observer));
1115        self
1116    }
1117
1118    /// Consume the builder and produce an [`Agent`].
1119    ///
1120    /// # Errors
1121    ///
1122    /// Returns [`LoopError::InvalidState`] if no model adapter was provided.
1123    pub fn build(self) -> Result<Agent<M>, LoopError> {
1124        let model = self
1125            .model
1126            .ok_or_else(|| LoopError::InvalidState("model adapter is required".into()))?;
1127        Ok(Agent {
1128            model,
1129            tools: self.tools,
1130            task_manager: self
1131                .task_manager
1132                .unwrap_or_else(|| Arc::new(SimpleTaskManager::new())),
1133            permissions: self.permissions,
1134            resources: self.resources,
1135            cancellation: self.cancellation,
1136            compaction: self.compaction,
1137            observers: self.observers,
1138        })
1139    }
1140}
1141
1142/// The runtime driver that advances the agent loop step by step.
1143///
1144/// Obtained from [`Agent::start`].  The typical usage pattern is:
1145///
1146/// 1. Call [`submit_input`](LoopDriver::submit_input) to enqueue user messages.
1147/// 2. Call [`next`](LoopDriver::next) to run the next turn.
1148/// 3. Handle the returned [`LoopStep`]:
1149///    - [`LoopStep::Finished`] -- the turn completed, inspect the result.
1150///    - [`LoopStep::Interrupt`] -- resolve the interrupt and call `next` again.
1151///
1152/// # Example
1153///
1154/// ```rust,no_run
1155/// use agentkit_core::{Item, ItemKind};
1156/// use agentkit_loop::{LoopDriver, LoopStep};
1157///
1158/// # async fn drive<S: agentkit_loop::ModelSession>(driver: &mut LoopDriver<S>) -> Result<(), agentkit_loop::LoopError> {
1159/// driver.submit_input(vec![Item::text(ItemKind::User, "Hello!")])?;
1160///
1161/// let step = driver.next().await?;
1162/// match step {
1163///     LoopStep::Finished(result) => println!("Done: {:?}", result.finish_reason),
1164///     LoopStep::Interrupt(interrupt) => {
1165///         // Resolve the interrupt (approval, auth, or input), then call next() again.
1166///         println!("Interrupted: {interrupt:?}");
1167///     }
1168/// }
1169/// # Ok(())
1170/// # }
1171/// ```
1172pub struct LoopDriver<S>
1173where
1174    S: ModelSession,
1175{
1176    session_id: SessionId,
1177    default_cache: Option<PromptCacheRequest>,
1178    next_turn_cache: Option<PromptCacheRequest>,
1179    session: Option<S>,
1180    tool_executor: Arc<dyn ToolExecutor>,
1181    task_manager: Arc<dyn TaskManager>,
1182    permissions: Arc<dyn PermissionChecker>,
1183    resources: Arc<dyn ToolResources>,
1184    cancellation: Option<CancellationHandle>,
1185    compaction: Option<CompactionConfig>,
1186    observers: Vec<Box<dyn LoopObserver>>,
1187    transcript: Vec<Item>,
1188    pending_input: Vec<Item>,
1189    pending_approvals: BTreeMap<ToolCallId, PendingApprovalToolCall>,
1190    pending_approval_order: VecDeque<ToolCallId>,
1191    pending_auth: Option<PendingAuthToolCall>,
1192    active_tool_round: Option<ActiveToolRound>,
1193    pending_round_resume: Option<agentkit_core::TurnId>,
1194    next_turn_index: u64,
1195}
1196
1197impl<S> LoopDriver<S>
1198where
1199    S: ModelSession,
1200{
1201    fn start_task_via_manager(
1202        &self,
1203        task_id: Option<TaskId>,
1204        tool_request: ToolRequest,
1205        approved_request: Option<ApprovalRequest>,
1206        cancellation: Option<TurnCancellation>,
1207    ) -> impl std::future::Future<Output = Result<TaskStartOutcome, LoopError>> + Send + 'static
1208    {
1209        let task_manager = self.task_manager.clone();
1210        let tool_executor = self.tool_executor.clone();
1211        let permissions = self.permissions.clone();
1212        let resources = self.resources.clone();
1213        let session_id = self.session_id.clone();
1214        let turn_id = tool_request.turn_id.clone();
1215        let metadata = tool_request.metadata.clone();
1216
1217        async move {
1218            task_manager
1219                .start_task(
1220                    TaskLaunchRequest {
1221                        task_id,
1222                        request: tool_request.clone(),
1223                        approved_request,
1224                    },
1225                    TaskStartContext {
1226                        executor: tool_executor,
1227                        tool_context: OwnedToolContext {
1228                            session_id,
1229                            turn_id,
1230                            metadata,
1231                            permissions,
1232                            resources,
1233                            cancellation,
1234                        },
1235                    },
1236                )
1237                .await
1238                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))
1239        }
1240    }
1241
1242    fn has_pending_interrupts(&self) -> bool {
1243        self.pending_auth.is_some() || !self.pending_approvals.is_empty()
1244    }
1245
1246    fn enqueue_pending_approval(&mut self, turn_id: &agentkit_core::TurnId, task: TaskApproval) {
1247        let call_id = task.tool_request.call_id.clone();
1248        let call = ToolCallPart {
1249            id: call_id.clone(),
1250            name: task.tool_request.tool_name.to_string(),
1251            input: task.tool_request.input.clone(),
1252            metadata: task.tool_request.metadata.clone(),
1253        };
1254        let mut request = task.approval;
1255        request.call_id = Some(call_id.clone());
1256        let pending = PendingApprovalToolCall {
1257            request: request.clone(),
1258            decision: None,
1259            surfaced: false,
1260            turn_id: turn_id.clone(),
1261            task_id: task.task_id,
1262            call,
1263            tool_request: task.tool_request,
1264        };
1265        self.pending_approvals.insert(call_id.clone(), pending);
1266        if !self.pending_approval_order.iter().any(|id| id == &call_id) {
1267            self.pending_approval_order.push_back(call_id);
1268        }
1269        self.emit(AgentEvent::ApprovalRequired(request));
1270    }
1271
1272    fn take_next_unsurfaced_approval_interrupt(&mut self) -> Option<LoopStep> {
1273        for call_id in self.pending_approval_order.clone() {
1274            let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1275                continue;
1276            };
1277            if pending.decision.is_none() && !pending.surfaced {
1278                pending.surfaced = true;
1279                return Some(LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(
1280                    PendingApproval {
1281                        request: pending.request.clone(),
1282                    },
1283                )));
1284            }
1285        }
1286        None
1287    }
1288
1289    fn next_unresolved_approval_interrupt(&self) -> Option<LoopStep> {
1290        self.pending_approval_order.iter().find_map(|call_id| {
1291            self.pending_approvals.get(call_id).and_then(|pending| {
1292                pending.decision.is_none().then(|| {
1293                    LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(PendingApproval {
1294                        request: pending.request.clone(),
1295                    }))
1296                })
1297            })
1298        })
1299    }
1300
1301    fn take_next_resolved_approval(&mut self) -> Option<PendingApprovalToolCall> {
1302        let call_id = self.pending_approval_order.iter().find_map(|call_id| {
1303            self.pending_approvals
1304                .get(call_id)
1305                .and_then(|pending| pending.decision.as_ref().map(|_| call_id.clone()))
1306        })?;
1307        self.pending_approval_order.retain(|id| id != &call_id);
1308        self.pending_approvals.remove(&call_id)
1309    }
1310
1311    fn pending_auth_interrupt(&self) -> Option<LoopStep> {
1312        self.pending_auth.as_ref().and_then(|pending| {
1313            pending.resolution.is_none().then(|| {
1314                LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth {
1315                    request: pending.request.clone(),
1316                }))
1317            })
1318        })
1319    }
1320
1321    fn queue_auth_interrupt(
1322        &mut self,
1323        turn_id: &agentkit_core::TurnId,
1324        task: TaskAuth,
1325    ) -> LoopStep {
1326        let call = ToolCallPart {
1327            id: task.tool_request.call_id.clone(),
1328            name: task.tool_request.tool_name.to_string(),
1329            input: task.tool_request.input.clone(),
1330            metadata: task.tool_request.metadata.clone(),
1331        };
1332        let request = upgrade_auth_request(task.auth, &task.tool_request, &call);
1333        self.pending_auth = Some(PendingAuthToolCall {
1334            request: request.clone(),
1335            resolution: None,
1336            turn_id: turn_id.clone(),
1337            task_id: task.task_id,
1338            call,
1339            tool_request: task.tool_request,
1340        });
1341        self.emit(AgentEvent::AuthRequired(request.clone()));
1342        LoopStep::Interrupt(LoopInterrupt::AuthRequest(PendingAuth { request }))
1343    }
1344
1345    fn queue_resolution_interrupt(
1346        &mut self,
1347        turn_id: &agentkit_core::TurnId,
1348        resolution: TaskResolution,
1349    ) -> Option<LoopStep> {
1350        match resolution {
1351            TaskResolution::Item(item) => {
1352                self.transcript.push(item);
1353                None
1354            }
1355            TaskResolution::Approval(task) => {
1356                self.enqueue_pending_approval(turn_id, task);
1357                self.take_next_unsurfaced_approval_interrupt()
1358            }
1359            TaskResolution::Auth(task) => Some(self.queue_auth_interrupt(turn_id, task)),
1360        }
1361    }
1362
1363    async fn drain_pending_loop_updates(&mut self) -> Result<(bool, Option<LoopStep>), LoopError> {
1364        let PendingLoopUpdates { mut resolutions } = self
1365            .task_manager
1366            .take_pending_loop_updates()
1367            .await
1368            .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1369        let mut saw_items = false;
1370        while let Some(resolution) = resolutions.pop_front() {
1371            match resolution {
1372                TaskResolution::Item(item) => {
1373                    self.transcript.push(item);
1374                    saw_items = true;
1375                }
1376                TaskResolution::Approval(task) => {
1377                    self.enqueue_pending_approval(&task.tool_request.turn_id.clone(), task);
1378                }
1379                TaskResolution::Auth(task) => {
1380                    return Ok((
1381                        saw_items,
1382                        Some(self.queue_auth_interrupt(&task.tool_request.turn_id.clone(), task)),
1383                    ));
1384                }
1385            }
1386        }
1387        Ok((saw_items, self.take_next_unsurfaced_approval_interrupt()))
1388    }
1389
1390    async fn maybe_compact(
1391        &mut self,
1392        turn_id: Option<&agentkit_core::TurnId>,
1393        cancellation: Option<TurnCancellation>,
1394    ) -> Result<(), LoopError> {
1395        let Some(compaction) = self.compaction.as_ref().cloned() else {
1396            return Ok(());
1397        };
1398        if cancellation
1399            .as_ref()
1400            .is_some_and(TurnCancellation::is_cancelled)
1401        {
1402            return Err(LoopError::Cancelled);
1403        }
1404        let Some(reason) =
1405            compaction
1406                .trigger
1407                .should_compact(&self.session_id, turn_id, &self.transcript)
1408        else {
1409            return Ok(());
1410        };
1411
1412        self.emit(AgentEvent::CompactionStarted {
1413            session_id: self.session_id.clone(),
1414            turn_id: turn_id.cloned(),
1415            reason: reason.clone(),
1416        });
1417
1418        let CompactionResult {
1419            transcript,
1420            replaced_items,
1421            metadata,
1422        } = compaction
1423            .strategy
1424            .apply(
1425                agentkit_compaction::CompactionRequest {
1426                    session_id: self.session_id.clone(),
1427                    turn_id: turn_id.cloned(),
1428                    transcript: self.transcript.clone(),
1429                    reason,
1430                    metadata: compaction.metadata.clone(),
1431                },
1432                &mut CompactionContext {
1433                    backend: compaction.backend.as_deref(),
1434                    metadata: &compaction.metadata,
1435                    cancellation,
1436                },
1437            )
1438            .await
1439            .map_err(|error| match error {
1440                agentkit_compaction::CompactionError::Cancelled => LoopError::Cancelled,
1441                other => LoopError::Compaction(other.to_string()),
1442            })?;
1443
1444        self.transcript = transcript;
1445        self.emit(AgentEvent::CompactionFinished {
1446            session_id: self.session_id.clone(),
1447            turn_id: turn_id.cloned(),
1448            replaced_items,
1449            transcript_len: self.transcript.len(),
1450            metadata,
1451        });
1452        Ok(())
1453    }
1454
1455    async fn continue_active_tool_round(&mut self) -> Result<Option<LoopStep>, LoopError> {
1456        let Some(_) = self.active_tool_round.as_ref() else {
1457            return Ok(None);
1458        };
1459        loop {
1460            let cancellation = self
1461                .cancellation
1462                .as_ref()
1463                .map(CancellationHandle::checkpoint);
1464            let turn_id = self
1465                .active_tool_round
1466                .as_ref()
1467                .map(|active| active.turn_id.clone())
1468                .ok_or_else(|| LoopError::InvalidState("missing active tool round".into()))?;
1469
1470            if cancellation
1471                .as_ref()
1472                .is_some_and(TurnCancellation::is_cancelled)
1473            {
1474                self.task_manager
1475                    .on_turn_interrupted(&turn_id)
1476                    .await
1477                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1478                self.active_tool_round = None;
1479                return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1480            }
1481
1482            let next_call = self
1483                .active_tool_round
1484                .as_mut()
1485                .and_then(|active| active.pending_calls.pop_front());
1486            if let Some((_call, tool_request)) = next_call {
1487                match self
1488                    .start_task_via_manager(None, tool_request.clone(), None, cancellation.clone())
1489                    .await?
1490                {
1491                    TaskStartOutcome::Ready(resolution) => {
1492                        let resolution = *resolution;
1493                        match resolution {
1494                            TaskResolution::Item(item) => {
1495                                if let Some(active) = self.active_tool_round.as_mut() {
1496                                    active.foreground_progressed = true;
1497                                }
1498                                self.transcript.push(item);
1499                            }
1500                            TaskResolution::Approval(task) => {
1501                                self.enqueue_pending_approval(&turn_id, task);
1502                            }
1503                            TaskResolution::Auth(task) => {
1504                                return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
1505                            }
1506                        }
1507                        continue;
1508                    }
1509                    TaskStartOutcome::Pending { kind, .. } => {
1510                        if kind == agentkit_task_manager::TaskKind::Background
1511                            && let Some(active) = self.active_tool_round.as_mut()
1512                        {
1513                            active.background_pending = true;
1514                        }
1515                        continue;
1516                    }
1517                }
1518            }
1519
1520            match self
1521                .task_manager
1522                .wait_for_turn(&turn_id, cancellation.clone())
1523                .await
1524                .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?
1525            {
1526                Some(TurnTaskUpdate::Resolution(resolution)) => {
1527                    let resolution = *resolution;
1528                    match resolution {
1529                        TaskResolution::Item(item) => {
1530                            if let Some(active) = self.active_tool_round.as_mut() {
1531                                active.foreground_progressed = true;
1532                            }
1533                            self.transcript.push(item);
1534                        }
1535                        TaskResolution::Approval(task) => {
1536                            self.enqueue_pending_approval(&turn_id, task);
1537                        }
1538                        TaskResolution::Auth(task) => {
1539                            return Ok(Some(self.queue_auth_interrupt(&turn_id, task)));
1540                        }
1541                    }
1542                }
1543                Some(TurnTaskUpdate::Detached(snapshot)) => {
1544                    // The task was promoted to background. Push a synthetic
1545                    // tool result so the model knows the call is still
1546                    // running and can continue its turn.
1547                    self.transcript.push(Item {
1548                        id: None,
1549                        kind: ItemKind::Tool,
1550                        parts: vec![Part::ToolResult(ToolResultPart {
1551                            call_id: snapshot.call_id,
1552                            output: ToolOutput::Text(format!(
1553                                "Tool {} is now running in the background. \
1554                                 The result will be delivered when it completes.",
1555                                snapshot.tool_name,
1556                            )),
1557                            is_error: false,
1558                            metadata: MetadataMap::new(),
1559                        })],
1560                        metadata: MetadataMap::new(),
1561                    });
1562                    if let Some(active) = self.active_tool_round.as_mut() {
1563                        active.background_pending = true;
1564                        active.foreground_progressed = true;
1565                    }
1566                }
1567                None => {
1568                    if cancellation
1569                        .as_ref()
1570                        .is_some_and(TurnCancellation::is_cancelled)
1571                    {
1572                        self.task_manager
1573                            .on_turn_interrupted(&turn_id)
1574                            .await
1575                            .map_err(|error| {
1576                                LoopError::Tool(ToolError::Internal(error.to_string()))
1577                            })?;
1578                        self.active_tool_round = None;
1579                        return self.finish_cancelled(turn_id, Vec::new()).map(Some);
1580                    }
1581                    let active = self.active_tool_round.take().ok_or_else(|| {
1582                        LoopError::InvalidState("missing active tool round".into())
1583                    })?;
1584                    if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1585                        return Ok(Some(step));
1586                    }
1587                    if let Some(step) = self.pending_auth_interrupt() {
1588                        return Ok(Some(step));
1589                    }
1590                    if let Some(step) = self.next_unresolved_approval_interrupt() {
1591                        return Ok(Some(step));
1592                    }
1593                    if active.background_pending && !active.foreground_progressed {
1594                        return Ok(None);
1595                    }
1596                    // Yield control back to the host between tool rounds.
1597                    // All tool calls in this round have results in the
1598                    // transcript; the transcript is provider-valid.  The
1599                    // host may submit_input before calling next() to
1600                    // resume, which will re-enter drive_turn via
1601                    // pending_round_resume.
1602                    let info = ToolRoundInfo {
1603                        session_id: self.session_id.clone(),
1604                        turn_id: turn_id.clone(),
1605                        transcript_len: self.transcript.len(),
1606                    };
1607                    self.pending_round_resume = Some(turn_id);
1608                    return Ok(Some(LoopStep::Interrupt(LoopInterrupt::AfterToolResult(
1609                        info,
1610                    ))));
1611                }
1612            }
1613        }
1614    }
1615
1616    async fn drive_turn(
1617        &mut self,
1618        turn_id: agentkit_core::TurnId,
1619        emit_started: bool,
1620    ) -> Result<LoopStep, LoopError> {
1621        let cancellation = self
1622            .cancellation
1623            .as_ref()
1624            .map(CancellationHandle::checkpoint);
1625        match self
1626            .maybe_compact(Some(&turn_id), cancellation.clone())
1627            .await
1628        {
1629            Ok(()) => {}
1630            Err(LoopError::Cancelled) => {
1631                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1632            }
1633            Err(error) => return Err(error),
1634        }
1635        if emit_started {
1636            self.emit(AgentEvent::TurnStarted {
1637                session_id: self.session_id.clone(),
1638                turn_id: turn_id.clone(),
1639            });
1640        }
1641        if cancellation
1642            .as_ref()
1643            .is_some_and(TurnCancellation::is_cancelled)
1644        {
1645            return self.finish_cancelled(turn_id, interrupted_assistant_items());
1646        }
1647
1648        let request = TurnRequest {
1649            session_id: self.session_id.clone(),
1650            turn_id: turn_id.clone(),
1651            transcript: self.transcript.clone(),
1652            available_tools: self.tool_executor.specs(),
1653            cache: self
1654                .next_turn_cache
1655                .take()
1656                .or_else(|| self.default_cache.clone()),
1657            metadata: MetadataMap::new(),
1658        };
1659
1660        let session = self
1661            .session
1662            .as_mut()
1663            .ok_or_else(|| LoopError::InvalidState("model session is not available".into()))?;
1664        let mut turn = match session.begin_turn(request, cancellation.clone()).await {
1665            Ok(turn) => turn,
1666            Err(LoopError::Cancelled) => {
1667                self.task_manager
1668                    .on_turn_interrupted(&turn_id)
1669                    .await
1670                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1671                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1672            }
1673            Err(error) => return Err(error),
1674        };
1675        let mut saw_tool_call = false;
1676        let mut finished_result = None;
1677
1678        while let Some(event) = match turn.next_event(cancellation.clone()).await {
1679            Ok(event) => event,
1680            Err(LoopError::Cancelled) => {
1681                self.task_manager
1682                    .on_turn_interrupted(&turn_id)
1683                    .await
1684                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1685                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1686            }
1687            Err(error) => return Err(error),
1688        } {
1689            if cancellation
1690                .as_ref()
1691                .is_some_and(TurnCancellation::is_cancelled)
1692            {
1693                self.task_manager
1694                    .on_turn_interrupted(&turn_id)
1695                    .await
1696                    .map_err(|error| LoopError::Tool(ToolError::Internal(error.to_string())))?;
1697                return self.finish_cancelled(turn_id, interrupted_assistant_items());
1698            }
1699            match event {
1700                ModelTurnEvent::Delta(delta) => self.emit(AgentEvent::ContentDelta(delta)),
1701                ModelTurnEvent::Usage(usage) => self.emit(AgentEvent::UsageUpdated(usage)),
1702                ModelTurnEvent::ToolCall(call) => {
1703                    saw_tool_call = true;
1704                    self.emit(AgentEvent::ToolCallRequested(call.clone()));
1705                }
1706                ModelTurnEvent::Finished(result) => {
1707                    finished_result = Some(result);
1708                    break;
1709                }
1710            }
1711        }
1712
1713        let result = finished_result.ok_or_else(|| {
1714            LoopError::Provider("model turn ended without a Finished event".into())
1715        })?;
1716        self.transcript.extend(result.output_items.clone());
1717
1718        if saw_tool_call {
1719            let pending_calls = extract_tool_calls(&result.output_items)
1720                .into_iter()
1721                .map(|call| {
1722                    let tool_request = ToolRequest {
1723                        call_id: call.id.clone(),
1724                        tool_name: agentkit_tools_core::ToolName::new(call.name.clone()),
1725                        input: call.input.clone(),
1726                        session_id: self.session_id.clone(),
1727                        turn_id: turn_id.clone(),
1728                        metadata: call.metadata.clone(),
1729                    };
1730                    (call, tool_request)
1731                })
1732                .collect();
1733            self.active_tool_round = Some(ActiveToolRound {
1734                turn_id: turn_id.clone(),
1735                pending_calls,
1736                background_pending: false,
1737                foreground_progressed: false,
1738            });
1739            if let Some(step) = self.continue_active_tool_round().await? {
1740                return Ok(step);
1741            }
1742            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
1743                InputRequest {
1744                    session_id: self.session_id.clone(),
1745                    reason: "driver is waiting for input".into(),
1746                },
1747            )));
1748        }
1749
1750        let turn_result = TurnResult {
1751            turn_id,
1752            finish_reason: result.finish_reason,
1753            items: result.output_items,
1754            usage: result.usage,
1755            metadata: result.metadata,
1756        };
1757        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1758        Ok(LoopStep::Finished(turn_result))
1759    }
1760
1761    async fn resume_after_auth(
1762        &mut self,
1763        pending: PendingAuthToolCall,
1764    ) -> Result<LoopStep, LoopError> {
1765        let resolution = pending
1766            .resolution
1767            .clone()
1768            .ok_or_else(|| LoopError::InvalidState("pending auth has no resolution".into()))?;
1769        match resolution {
1770            AuthResolution::Provided { .. } => match self
1771                .start_task_via_manager(
1772                    Some(pending.task_id.clone()),
1773                    pending.tool_request.clone(),
1774                    None,
1775                    self.cancellation
1776                        .as_ref()
1777                        .map(CancellationHandle::checkpoint),
1778                )
1779                .await?
1780            {
1781                TaskStartOutcome::Ready(resolution) => {
1782                    let resolution = *resolution;
1783                    if let Some(step) =
1784                        self.queue_resolution_interrupt(&pending.turn_id, resolution)
1785                    {
1786                        return Ok(step);
1787                    }
1788                }
1789                TaskStartOutcome::Pending { .. } => {}
1790            },
1791            AuthResolution::Cancelled { .. } => {
1792                self.transcript.push(Item {
1793                    id: None,
1794                    kind: ItemKind::Tool,
1795                    parts: vec![Part::ToolResult(ToolResultPart {
1796                        call_id: pending.call.id.clone(),
1797                        output: ToolOutput::Text("auth cancelled".into()),
1798                        is_error: true,
1799                        metadata: pending.call.metadata.clone(),
1800                    })],
1801                    metadata: MetadataMap::new(),
1802                });
1803            }
1804        }
1805
1806        if let Some(step) = self.continue_active_tool_round().await? {
1807            Ok(step)
1808        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1809            Ok(step)
1810        } else if let Some(step) = self.pending_auth_interrupt() {
1811            Ok(step)
1812        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1813            Ok(step)
1814        } else {
1815            self.drive_turn(pending.turn_id, false).await
1816        }
1817    }
1818
1819    async fn resume_after_approval(
1820        &mut self,
1821        pending: PendingApprovalToolCall,
1822    ) -> Result<LoopStep, LoopError> {
1823        let decision = pending
1824            .decision
1825            .clone()
1826            .ok_or_else(|| LoopError::InvalidState("pending approval has no decision".into()))?;
1827
1828        match decision {
1829            ApprovalDecision::Approve => match self
1830                .start_task_via_manager(
1831                    Some(pending.task_id.clone()),
1832                    pending.tool_request.clone(),
1833                    Some(pending.request.clone()),
1834                    self.cancellation
1835                        .as_ref()
1836                        .map(CancellationHandle::checkpoint),
1837                )
1838                .await?
1839            {
1840                TaskStartOutcome::Ready(resolution) => {
1841                    let resolution = *resolution;
1842                    if let Some(step) =
1843                        self.queue_resolution_interrupt(&pending.turn_id, resolution)
1844                    {
1845                        return Ok(step);
1846                    }
1847                }
1848                TaskStartOutcome::Pending { .. } => {}
1849            },
1850            ApprovalDecision::Deny { reason } => {
1851                self.transcript.push(Item {
1852                    id: None,
1853                    kind: ItemKind::Tool,
1854                    parts: vec![Part::ToolResult(ToolResultPart {
1855                        call_id: pending.call.id.clone(),
1856                        output: ToolOutput::Text(
1857                            reason.unwrap_or_else(|| "approval denied".into()),
1858                        ),
1859                        is_error: true,
1860                        metadata: pending.call.metadata.clone(),
1861                    })],
1862                    metadata: MetadataMap::new(),
1863                });
1864            }
1865        }
1866
1867        if let Some(step) = self.continue_active_tool_round().await? {
1868            Ok(step)
1869        } else if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
1870            Ok(step)
1871        } else if let Some(step) = self.pending_auth_interrupt() {
1872            Ok(step)
1873        } else if let Some(step) = self.next_unresolved_approval_interrupt() {
1874            Ok(step)
1875        } else {
1876            self.drive_turn(pending.turn_id, false).await
1877        }
1878    }
1879
1880    fn finish_cancelled(
1881        &mut self,
1882        turn_id: agentkit_core::TurnId,
1883        items: Vec<Item>,
1884    ) -> Result<LoopStep, LoopError> {
1885        self.transcript.extend(items.clone());
1886        let turn_result = TurnResult {
1887            turn_id,
1888            finish_reason: FinishReason::Cancelled,
1889            items,
1890            usage: None,
1891            metadata: interrupted_metadata("turn"),
1892        };
1893        self.emit(AgentEvent::TurnFinished(turn_result.clone()));
1894        Ok(LoopStep::Finished(turn_result))
1895    }
1896
1897    /// Enqueue user input items for the next turn.
1898    ///
1899    /// Items are buffered and consumed the next time [`next`](LoopDriver::next)
1900    /// is called.  Must not be called while an interrupt is pending.
1901    ///
1902    /// # Errors
1903    ///
1904    /// Returns [`LoopError::InvalidState`] if an interrupt is still unresolved.
1905    pub fn submit_input(&mut self, input: Vec<Item>) -> Result<(), LoopError> {
1906        if self.has_pending_interrupts() {
1907            return Err(LoopError::InvalidState(
1908                "cannot submit input while an interrupt is pending".into(),
1909            ));
1910        }
1911        self.emit(AgentEvent::InputAccepted {
1912            session_id: self.session_id.clone(),
1913            items: input.clone(),
1914        });
1915        self.pending_input.extend(input);
1916        Ok(())
1917    }
1918
1919    /// Override the prompt cache request for the next model turn.
1920    ///
1921    /// The override is consumed the next time the driver starts a model turn.
1922    /// Session-level defaults still apply to later turns.
1923    pub fn set_next_turn_cache(&mut self, cache: PromptCacheRequest) -> Result<(), LoopError> {
1924        if self.has_pending_interrupts() {
1925            return Err(LoopError::InvalidState(
1926                "cannot update next-turn cache while an interrupt is pending".into(),
1927            ));
1928        }
1929        self.next_turn_cache = Some(cache);
1930        Ok(())
1931    }
1932
1933    /// Enqueue user input and set a prompt cache override for the next model
1934    /// turn in one call.
1935    pub fn submit_input_with_cache(
1936        &mut self,
1937        input: Vec<Item>,
1938        cache: PromptCacheRequest,
1939    ) -> Result<(), LoopError> {
1940        self.set_next_turn_cache(cache)?;
1941        self.submit_input(input)
1942    }
1943
1944    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`].
1945    ///
1946    /// After calling this, invoke [`next`](LoopDriver::next) to continue the
1947    /// loop.  If the decision is [`ApprovalDecision::Approve`] the tool call
1948    /// executes; if denied, an error result is fed back to the model.
1949    ///
1950    /// # Errors
1951    ///
1952    /// Returns [`LoopError::InvalidState`] if no approval is pending.
1953    pub fn resolve_approval_for(
1954        &mut self,
1955        call_id: ToolCallId,
1956        decision: ApprovalDecision,
1957    ) -> Result<(), LoopError> {
1958        let Some(pending) = self.pending_approvals.get_mut(&call_id) else {
1959            return Err(LoopError::InvalidState(format!(
1960                "no approval request is pending for call {}",
1961                call_id.0
1962            )));
1963        };
1964        pending.decision = Some(decision.clone());
1965        self.emit(AgentEvent::ApprovalResolved {
1966            approved: matches!(decision, ApprovalDecision::Approve),
1967        });
1968        Ok(())
1969    }
1970
1971    /// Resolve a pending [`LoopInterrupt::ApprovalRequest`] when exactly one
1972    /// approval is outstanding.
1973    pub fn resolve_approval(&mut self, decision: ApprovalDecision) -> Result<(), LoopError> {
1974        let mut unresolved = self
1975            .pending_approval_order
1976            .iter()
1977            .filter(|call_id| {
1978                self.pending_approvals
1979                    .get(*call_id)
1980                    .is_some_and(|pending| pending.decision.is_none())
1981            })
1982            .cloned();
1983        let Some(call_id) = unresolved.next() else {
1984            return Err(LoopError::InvalidState(
1985                "no approval request is pending".into(),
1986            ));
1987        };
1988        if unresolved.next().is_some() {
1989            return Err(LoopError::InvalidState(
1990                "multiple approvals are pending; use resolve_approval_for".into(),
1991            ));
1992        }
1993        self.resolve_approval_for(call_id, decision)
1994    }
1995
1996    /// Resolve a pending [`LoopInterrupt::AuthRequest`].
1997    ///
1998    /// The resolution must reference the same request id as the pending
1999    /// [`AuthRequest`].  After calling this, invoke [`next`](LoopDriver::next)
2000    /// to continue the loop.
2001    ///
2002    /// # Errors
2003    ///
2004    /// Returns [`LoopError::InvalidState`] if no auth request is pending or
2005    /// if the resolution's request id does not match.
2006    pub fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), LoopError> {
2007        let Some(pending) = self.pending_auth.as_mut() else {
2008            return Err(LoopError::InvalidState("no auth request is pending".into()));
2009        };
2010        if pending.request.id != resolution.request().id {
2011            return Err(LoopError::InvalidState(
2012                "auth resolution does not match the pending request".into(),
2013            ));
2014        }
2015        pending.resolution = Some(resolution.clone());
2016        self.emit(AgentEvent::AuthResolved {
2017            provided: matches!(resolution, AuthResolution::Provided { .. }),
2018        });
2019        Ok(())
2020    }
2021
2022    /// Take a read-only snapshot of the driver's current transcript and input queue.
2023    pub fn snapshot(&self) -> LoopSnapshot {
2024        LoopSnapshot {
2025            session_id: self.session_id.clone(),
2026            transcript: self.transcript.clone(),
2027            pending_input: self.pending_input.clone(),
2028        }
2029    }
2030
2031    /// Advance the loop by one step.
2032    ///
2033    /// This is the main method for driving the agent.  It processes pending
2034    /// interrupt resolutions, consumes queued input, starts a model turn,
2035    /// executes tool calls, and returns once the turn finishes or an
2036    /// interrupt occurs.
2037    ///
2038    /// If no input is queued and no interrupt is pending, returns
2039    /// [`LoopStep::Interrupt(LoopInterrupt::AwaitingInput(..))`](LoopInterrupt::AwaitingInput).
2040    ///
2041    /// # Errors
2042    ///
2043    /// Returns [`LoopError::InvalidState`] if called while an unresolved
2044    /// interrupt is pending, or propagates provider / tool / compaction errors.
2045    pub async fn next(&mut self) -> Result<LoopStep, LoopError> {
2046        if self
2047            .pending_auth
2048            .as_ref()
2049            .is_some_and(|pending| pending.resolution.is_some())
2050        {
2051            let pending = self
2052                .pending_auth
2053                .take()
2054                .ok_or_else(|| LoopError::InvalidState("missing pending auth state".into()))?;
2055            return self.resume_after_auth(pending).await;
2056        }
2057
2058        if let Some(pending) = self.take_next_resolved_approval() {
2059            return self.resume_after_approval(pending).await;
2060        }
2061
2062        if let Some(step) = self.take_next_unsurfaced_approval_interrupt() {
2063            return Ok(step);
2064        }
2065
2066        if let Some(step) = self.pending_auth_interrupt() {
2067            return Ok(step);
2068        }
2069
2070        if let Some(step) = self.next_unresolved_approval_interrupt() {
2071            return Ok(step);
2072        }
2073
2074        if let Some(step) = self.continue_active_tool_round().await? {
2075            return Ok(step);
2076        }
2077
2078        let (had_loop_updates, loop_step) = self.drain_pending_loop_updates().await?;
2079        if let Some(step) = loop_step {
2080            return Ok(step);
2081        }
2082
2083        // Resume after an AfterToolResult yield.  Any input submitted by the
2084        // host during the yield is folded into the transcript as part of the
2085        // continuation turn; background task results drained just above are
2086        // already in the transcript.
2087        if let Some(turn_id) = self.pending_round_resume.take() {
2088            self.transcript.append(&mut self.pending_input);
2089            return self.drive_turn(turn_id, false).await;
2090        }
2091
2092        if self.pending_input.is_empty() && !had_loop_updates {
2093            return Ok(LoopStep::Interrupt(LoopInterrupt::AwaitingInput(
2094                InputRequest {
2095                    session_id: self.session_id.clone(),
2096                    reason: "driver is waiting for input".into(),
2097                },
2098            )));
2099        }
2100
2101        let turn_id = agentkit_core::TurnId::new(format!("turn-{}", self.next_turn_index));
2102        self.next_turn_index += 1;
2103        self.transcript.append(&mut self.pending_input);
2104        self.drive_turn(turn_id, true).await
2105    }
2106
2107    fn emit(&mut self, event: AgentEvent) {
2108        for observer in &mut self.observers {
2109            observer.handle_event(event.clone());
2110        }
2111    }
2112}
2113
2114fn interrupted_metadata(stage: &str) -> MetadataMap {
2115    let mut metadata = MetadataMap::new();
2116    metadata.insert(INTERRUPTED_METADATA_KEY.into(), true.into());
2117    metadata.insert(
2118        INTERRUPT_REASON_METADATA_KEY.into(),
2119        USER_CANCELLED_REASON.into(),
2120    );
2121    metadata.insert(INTERRUPT_STAGE_METADATA_KEY.into(), stage.into());
2122    metadata
2123}
2124
2125fn interrupted_assistant_items() -> Vec<Item> {
2126    vec![Item {
2127        id: None,
2128        kind: ItemKind::Assistant,
2129        parts: vec![Part::Text(TextPart {
2130            text: "Previous assistant response was interrupted by the user before completion."
2131                .into(),
2132            metadata: interrupted_metadata("assistant"),
2133        })],
2134        metadata: interrupted_metadata("assistant"),
2135    }]
2136}
2137
2138fn extract_tool_calls(items: &[Item]) -> Vec<ToolCallPart> {
2139    let mut calls = Vec::new();
2140    for item in items {
2141        for part in &item.parts {
2142            if let Part::ToolCall(call) = part {
2143                calls.push(call.clone());
2144            }
2145        }
2146    }
2147    calls
2148}
2149
2150fn upgrade_auth_request(
2151    mut request: AuthRequest,
2152    tool_request: &ToolRequest,
2153    _call: &ToolCallPart,
2154) -> AuthRequest {
2155    if matches!(request.operation, AuthOperation::ToolCall { .. }) {
2156        return request;
2157    }
2158
2159    let prior_server_id = request.challenge.get("server_id").cloned();
2160    let mut metadata = tool_request.metadata.clone();
2161    if let Some(server_id) = prior_server_id {
2162        metadata.entry("server_id".into()).or_insert(server_id);
2163    }
2164    request.operation = AuthOperation::ToolCall {
2165        tool_name: tool_request.tool_name.0.clone(),
2166        input: tool_request.input.clone(),
2167        call_id: Some(tool_request.call_id.clone()),
2168        session_id: Some(tool_request.session_id.clone()),
2169        turn_id: Some(tool_request.turn_id.clone()),
2170        metadata,
2171    };
2172    request
2173}
2174
2175struct AllowAllPermissions;
2176
2177impl PermissionChecker for AllowAllPermissions {
2178    fn evaluate(
2179        &self,
2180        _request: &dyn agentkit_tools_core::PermissionRequest,
2181    ) -> agentkit_tools_core::PermissionDecision {
2182        agentkit_tools_core::PermissionDecision::Allow
2183    }
2184}
2185
2186/// Errors that can occur while driving the agent loop.
2187#[derive(Debug, Error)]
2188pub enum LoopError {
2189    /// The driver was in an unexpected state for the requested operation.
2190    #[error("invalid driver state: {0}")]
2191    InvalidState(String),
2192    /// The current turn was cancelled via the [`CancellationHandle`].
2193    #[error("turn cancelled")]
2194    Cancelled,
2195    /// An error originating from the model provider.
2196    #[error("provider error: {0}")]
2197    Provider(String),
2198    /// An error originating from tool execution.
2199    #[error("tool error: {0}")]
2200    Tool(#[from] ToolError),
2201    /// An error that occurred during transcript compaction.
2202    #[error("compaction error: {0}")]
2203    Compaction(String),
2204    /// The requested operation is not supported.
2205    #[error("unsupported operation: {0}")]
2206    Unsupported(String),
2207}
2208
2209#[cfg(test)]
2210mod tests {
2211    use std::collections::VecDeque;
2212    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2213    use std::sync::{Arc as StdArc, Mutex as StdMutex};
2214
2215    use agentkit_compaction::{CompactionPipeline, CompactionTrigger, KeepRecentStrategy};
2216    use agentkit_core::{
2217        CancellationController, ItemKind, Part, TextPart, ToolCallId, ToolOutput, ToolResultPart,
2218    };
2219    use agentkit_task_manager::{
2220        AsyncTaskManager, RoutingDecision, TaskEvent, TaskManager, TaskManagerHandle,
2221        TaskRoutingPolicy,
2222    };
2223    use agentkit_tools_core::{
2224        FileSystemPermissionRequest, PermissionCode, PermissionDecision, PermissionDenial, Tool,
2225        ToolAnnotations, ToolName, ToolResult, ToolSpec,
2226    };
2227    use serde_json::{Value, json};
2228    use tokio::sync::Notify;
2229    use tokio::time::{Duration, timeout};
2230
2231    use super::*;
2232
2233    struct FakeAdapter;
2234    struct SlowAdapter;
2235    struct RecordingAdapter {
2236        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2237        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2238    }
2239    struct MultiToolAdapter;
2240    struct DualApprovalAdapter;
2241
2242    struct FakeSession;
2243    struct SlowSession;
2244    struct RecordingSession {
2245        seen_descriptions: StdArc<StdMutex<Vec<Vec<String>>>>,
2246        seen_caches: StdArc<StdMutex<Vec<Option<PromptCacheRequest>>>>,
2247    }
2248    struct MultiToolSession;
2249    struct DualApprovalSession;
2250
2251    struct FakeTurn {
2252        events: VecDeque<ModelTurnEvent>,
2253    }
2254
2255    struct SlowTurn {
2256        emitted: bool,
2257    }
2258
2259    struct RecordingTurn {
2260        emitted: bool,
2261    }
2262    struct MultiToolTurn {
2263        events: VecDeque<ModelTurnEvent>,
2264    }
2265    struct DualApprovalTurn {
2266        events: VecDeque<ModelTurnEvent>,
2267    }
2268
2269    #[async_trait]
2270    impl ModelAdapter for FakeAdapter {
2271        type Session = FakeSession;
2272
2273        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2274            Ok(FakeSession)
2275        }
2276    }
2277
2278    #[async_trait]
2279    impl ModelAdapter for SlowAdapter {
2280        type Session = SlowSession;
2281
2282        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2283            Ok(SlowSession)
2284        }
2285    }
2286
2287    #[async_trait]
2288    impl ModelAdapter for RecordingAdapter {
2289        type Session = RecordingSession;
2290
2291        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2292            Ok(RecordingSession {
2293                seen_descriptions: self.seen_descriptions.clone(),
2294                seen_caches: self.seen_caches.clone(),
2295            })
2296        }
2297    }
2298
2299    #[async_trait]
2300    impl ModelAdapter for MultiToolAdapter {
2301        type Session = MultiToolSession;
2302
2303        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2304            Ok(MultiToolSession)
2305        }
2306    }
2307
2308    #[async_trait]
2309    impl ModelAdapter for DualApprovalAdapter {
2310        type Session = DualApprovalSession;
2311
2312        async fn start_session(&self, _config: SessionConfig) -> Result<Self::Session, LoopError> {
2313            Ok(DualApprovalSession)
2314        }
2315    }
2316
2317    #[async_trait]
2318    impl ModelSession for FakeSession {
2319        type Turn = FakeTurn;
2320
2321        async fn begin_turn(
2322            &mut self,
2323            request: TurnRequest,
2324            _cancellation: Option<TurnCancellation>,
2325        ) -> Result<Self::Turn, LoopError> {
2326            let has_tool_result = request.transcript.iter().any(|item| {
2327                item.kind == ItemKind::Tool
2328                    && item
2329                        .parts
2330                        .iter()
2331                        .any(|part| matches!(part, Part::ToolResult(_)))
2332            });
2333            let tool_name = request
2334                .available_tools
2335                .first()
2336                .map(|tool| tool.name.0.clone())
2337                .unwrap_or_else(|| "echo".into());
2338
2339            let events = if has_tool_result {
2340                let result_text = request
2341                    .transcript
2342                    .iter()
2343                    .rev()
2344                    .find_map(|item| {
2345                        item.parts.iter().find_map(|part| match part {
2346                            Part::ToolResult(ToolResultPart {
2347                                output: ToolOutput::Text(text),
2348                                ..
2349                            }) => Some(text.clone()),
2350                            _ => None,
2351                        })
2352                    })
2353                    .unwrap_or_else(|| "missing".into());
2354
2355                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2356                    finish_reason: FinishReason::Completed,
2357                    output_items: vec![Item {
2358                        id: None,
2359                        kind: ItemKind::Assistant,
2360                        parts: vec![Part::Text(TextPart {
2361                            text: format!("tool said: {result_text}"),
2362                            metadata: MetadataMap::new(),
2363                        })],
2364                        metadata: MetadataMap::new(),
2365                    }],
2366                    usage: None,
2367                    metadata: MetadataMap::new(),
2368                })])
2369            } else {
2370                VecDeque::from([
2371                    ModelTurnEvent::ToolCall(agentkit_core::ToolCallPart {
2372                        id: ToolCallId::new("call-1"),
2373                        name: tool_name.clone(),
2374                        input: json!({ "value": "pong" }),
2375                        metadata: MetadataMap::new(),
2376                    }),
2377                    ModelTurnEvent::Finished(ModelTurnResult {
2378                        finish_reason: FinishReason::ToolCall,
2379                        output_items: vec![Item {
2380                            id: None,
2381                            kind: ItemKind::Assistant,
2382                            parts: vec![Part::ToolCall(agentkit_core::ToolCallPart {
2383                                id: ToolCallId::new("call-1"),
2384                                name: tool_name,
2385                                input: json!({ "value": "pong" }),
2386                                metadata: MetadataMap::new(),
2387                            })],
2388                            metadata: MetadataMap::new(),
2389                        }],
2390                        usage: None,
2391                        metadata: MetadataMap::new(),
2392                    }),
2393                ])
2394            };
2395
2396            Ok(FakeTurn { events })
2397        }
2398    }
2399
2400    #[async_trait]
2401    impl ModelSession for SlowSession {
2402        type Turn = SlowTurn;
2403
2404        async fn begin_turn(
2405            &mut self,
2406            request: TurnRequest,
2407            cancellation: Option<TurnCancellation>,
2408        ) -> Result<Self::Turn, LoopError> {
2409            let should_block = request
2410                .transcript
2411                .iter()
2412                .rev()
2413                .find(|item| item.kind == ItemKind::User)
2414                .is_some_and(|item| {
2415                    item.parts.iter().any(|part| match part {
2416                        Part::Text(text) => text.text == "do the long task",
2417                        _ => false,
2418                    })
2419                });
2420
2421            if should_block && let Some(cancellation) = cancellation {
2422                cancellation.cancelled().await;
2423                return Err(LoopError::Cancelled);
2424            }
2425
2426            Ok(SlowTurn { emitted: false })
2427        }
2428    }
2429
2430    #[async_trait]
2431    impl ModelSession for RecordingSession {
2432        type Turn = RecordingTurn;
2433
2434        async fn begin_turn(
2435            &mut self,
2436            request: TurnRequest,
2437            _cancellation: Option<TurnCancellation>,
2438        ) -> Result<Self::Turn, LoopError> {
2439            let descriptions = request
2440                .available_tools
2441                .iter()
2442                .map(|tool| tool.description.clone())
2443                .collect::<Vec<_>>();
2444            self.seen_descriptions.lock().unwrap().push(descriptions);
2445            self.seen_caches.lock().unwrap().push(request.cache.clone());
2446
2447            Ok(RecordingTurn { emitted: false })
2448        }
2449    }
2450
2451    #[async_trait]
2452    impl ModelSession for MultiToolSession {
2453        type Turn = MultiToolTurn;
2454
2455        async fn begin_turn(
2456            &mut self,
2457            request: TurnRequest,
2458            _cancellation: Option<TurnCancellation>,
2459        ) -> Result<Self::Turn, LoopError> {
2460            let has_tool_result = request.transcript.iter().any(|item| {
2461                item.kind == ItemKind::Tool
2462                    && item
2463                        .parts
2464                        .iter()
2465                        .any(|part| matches!(part, Part::ToolResult(_)))
2466            });
2467
2468            let events = if has_tool_result {
2469                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2470                    finish_reason: FinishReason::Completed,
2471                    output_items: vec![Item {
2472                        id: None,
2473                        kind: ItemKind::Assistant,
2474                        parts: vec![Part::Text(TextPart {
2475                            text: "mixed tools finished".into(),
2476                            metadata: MetadataMap::new(),
2477                        })],
2478                        metadata: MetadataMap::new(),
2479                    }],
2480                    usage: None,
2481                    metadata: MetadataMap::new(),
2482                })])
2483            } else {
2484                let foreground = agentkit_core::ToolCallPart {
2485                    id: ToolCallId::new("call-foreground"),
2486                    name: "foreground-wait".into(),
2487                    input: json!({}),
2488                    metadata: MetadataMap::new(),
2489                };
2490                let background = agentkit_core::ToolCallPart {
2491                    id: ToolCallId::new("call-background"),
2492                    name: "background-wait".into(),
2493                    input: json!({}),
2494                    metadata: MetadataMap::new(),
2495                };
2496                VecDeque::from([
2497                    ModelTurnEvent::ToolCall(foreground.clone()),
2498                    ModelTurnEvent::ToolCall(background.clone()),
2499                    ModelTurnEvent::Finished(ModelTurnResult {
2500                        finish_reason: FinishReason::ToolCall,
2501                        output_items: vec![Item {
2502                            id: None,
2503                            kind: ItemKind::Assistant,
2504                            parts: vec![Part::ToolCall(foreground), Part::ToolCall(background)],
2505                            metadata: MetadataMap::new(),
2506                        }],
2507                        usage: None,
2508                        metadata: MetadataMap::new(),
2509                    }),
2510                ])
2511            };
2512
2513            Ok(MultiToolTurn { events })
2514        }
2515    }
2516
2517    #[async_trait]
2518    impl ModelSession for DualApprovalSession {
2519        type Turn = DualApprovalTurn;
2520
2521        async fn begin_turn(
2522            &mut self,
2523            request: TurnRequest,
2524            _cancellation: Option<TurnCancellation>,
2525        ) -> Result<Self::Turn, LoopError> {
2526            let tool_results = request
2527                .transcript
2528                .iter()
2529                .flat_map(|item| item.parts.iter())
2530                .filter(|part| matches!(part, Part::ToolResult(_)))
2531                .count();
2532
2533            let events = if tool_results >= 2 {
2534                VecDeque::from([ModelTurnEvent::Finished(ModelTurnResult {
2535                    finish_reason: FinishReason::Completed,
2536                    output_items: vec![Item {
2537                        id: None,
2538                        kind: ItemKind::Assistant,
2539                        parts: vec![Part::Text(TextPart {
2540                            text: "both approvals finished".into(),
2541                            metadata: MetadataMap::new(),
2542                        })],
2543                        metadata: MetadataMap::new(),
2544                    }],
2545                    usage: None,
2546                    metadata: MetadataMap::new(),
2547                })])
2548            } else {
2549                let first = agentkit_core::ToolCallPart {
2550                    id: ToolCallId::new("call-1"),
2551                    name: "echo".into(),
2552                    input: json!({ "value": "first" }),
2553                    metadata: MetadataMap::new(),
2554                };
2555                let second = agentkit_core::ToolCallPart {
2556                    id: ToolCallId::new("call-2"),
2557                    name: "echo".into(),
2558                    input: json!({ "value": "second" }),
2559                    metadata: MetadataMap::new(),
2560                };
2561                VecDeque::from([
2562                    ModelTurnEvent::ToolCall(first.clone()),
2563                    ModelTurnEvent::ToolCall(second.clone()),
2564                    ModelTurnEvent::Finished(ModelTurnResult {
2565                        finish_reason: FinishReason::ToolCall,
2566                        output_items: vec![Item {
2567                            id: None,
2568                            kind: ItemKind::Assistant,
2569                            parts: vec![Part::ToolCall(first), Part::ToolCall(second)],
2570                            metadata: MetadataMap::new(),
2571                        }],
2572                        usage: None,
2573                        metadata: MetadataMap::new(),
2574                    }),
2575                ])
2576            };
2577
2578            Ok(DualApprovalTurn { events })
2579        }
2580    }
2581
2582    #[async_trait]
2583    impl ModelTurn for FakeTurn {
2584        async fn next_event(
2585            &mut self,
2586            _cancellation: Option<TurnCancellation>,
2587        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2588            Ok(self.events.pop_front())
2589        }
2590    }
2591
2592    #[async_trait]
2593    impl ModelTurn for SlowTurn {
2594        async fn next_event(
2595            &mut self,
2596            cancellation: Option<TurnCancellation>,
2597        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2598            if let Some(cancellation) = cancellation
2599                && cancellation.is_cancelled()
2600            {
2601                return Err(LoopError::Cancelled);
2602            }
2603
2604            if self.emitted {
2605                Ok(None)
2606            } else {
2607                self.emitted = true;
2608                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2609                    finish_reason: FinishReason::Completed,
2610                    output_items: vec![Item {
2611                        id: None,
2612                        kind: ItemKind::Assistant,
2613                        parts: vec![Part::Text(TextPart {
2614                            text: "done".into(),
2615                            metadata: MetadataMap::new(),
2616                        })],
2617                        metadata: MetadataMap::new(),
2618                    }],
2619                    usage: None,
2620                    metadata: MetadataMap::new(),
2621                })))
2622            }
2623        }
2624    }
2625
2626    #[async_trait]
2627    impl ModelTurn for RecordingTurn {
2628        async fn next_event(
2629            &mut self,
2630            _cancellation: Option<TurnCancellation>,
2631        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2632            if self.emitted {
2633                Ok(None)
2634            } else {
2635                self.emitted = true;
2636                Ok(Some(ModelTurnEvent::Finished(ModelTurnResult {
2637                    finish_reason: FinishReason::Completed,
2638                    output_items: vec![Item {
2639                        id: None,
2640                        kind: ItemKind::Assistant,
2641                        parts: vec![Part::Text(TextPart {
2642                            text: "done".into(),
2643                            metadata: MetadataMap::new(),
2644                        })],
2645                        metadata: MetadataMap::new(),
2646                    }],
2647                    usage: None,
2648                    metadata: MetadataMap::new(),
2649                })))
2650            }
2651        }
2652    }
2653
2654    #[async_trait]
2655    impl ModelTurn for MultiToolTurn {
2656        async fn next_event(
2657            &mut self,
2658            _cancellation: Option<TurnCancellation>,
2659        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2660            Ok(self.events.pop_front())
2661        }
2662    }
2663
2664    #[async_trait]
2665    impl ModelTurn for DualApprovalTurn {
2666        async fn next_event(
2667            &mut self,
2668            _cancellation: Option<TurnCancellation>,
2669        ) -> Result<Option<ModelTurnEvent>, LoopError> {
2670            Ok(self.events.pop_front())
2671        }
2672    }
2673
2674    #[derive(Clone)]
2675    struct EchoTool {
2676        spec: ToolSpec,
2677    }
2678
2679    impl Default for EchoTool {
2680        fn default() -> Self {
2681            Self {
2682                spec: ToolSpec {
2683                    name: ToolName::new("echo"),
2684                    description: "Echo back a value".into(),
2685                    input_schema: json!({
2686                        "type": "object",
2687                        "properties": {
2688                            "value": { "type": "string" }
2689                        },
2690                        "required": ["value"],
2691                        "additionalProperties": false
2692                    }),
2693                    annotations: ToolAnnotations::default(),
2694                    metadata: MetadataMap::new(),
2695                },
2696            }
2697        }
2698    }
2699
2700    #[derive(Clone)]
2701    struct DynamicSpecTool {
2702        spec: ToolSpec,
2703        version: StdArc<AtomicUsize>,
2704    }
2705
2706    impl DynamicSpecTool {
2707        fn new(version: StdArc<AtomicUsize>) -> Self {
2708            Self {
2709                spec: ToolSpec {
2710                    name: ToolName::new("dynamic"),
2711                    description: "dynamic version 0".into(),
2712                    input_schema: json!({
2713                        "type": "object",
2714                        "properties": {},
2715                        "additionalProperties": false
2716                    }),
2717                    annotations: ToolAnnotations::default(),
2718                    metadata: MetadataMap::new(),
2719                },
2720                version,
2721            }
2722        }
2723    }
2724
2725    #[async_trait]
2726    impl Tool for EchoTool {
2727        fn spec(&self) -> &ToolSpec {
2728            &self.spec
2729        }
2730
2731        fn proposed_requests(
2732            &self,
2733            request: &agentkit_tools_core::ToolRequest,
2734        ) -> Result<
2735            Vec<Box<dyn agentkit_tools_core::PermissionRequest>>,
2736            agentkit_tools_core::ToolError,
2737        > {
2738            Ok(vec![Box::new(FileSystemPermissionRequest::Read {
2739                path: "/tmp/echo".into(),
2740                metadata: request.metadata.clone(),
2741            })])
2742        }
2743
2744        async fn invoke(
2745            &self,
2746            request: agentkit_tools_core::ToolRequest,
2747            _ctx: &mut ToolContext<'_>,
2748        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2749            let value = request
2750                .input
2751                .get("value")
2752                .and_then(Value::as_str)
2753                .ok_or_else(|| {
2754                    agentkit_tools_core::ToolError::InvalidInput("missing value".into())
2755                })?;
2756
2757            Ok(ToolResult {
2758                result: ToolResultPart {
2759                    call_id: request.call_id,
2760                    output: ToolOutput::Text(value.into()),
2761                    is_error: false,
2762                    metadata: MetadataMap::new(),
2763                },
2764                duration: None,
2765                metadata: MetadataMap::new(),
2766            })
2767        }
2768    }
2769
2770    #[async_trait]
2771    impl Tool for DynamicSpecTool {
2772        fn spec(&self) -> &ToolSpec {
2773            &self.spec
2774        }
2775
2776        fn current_spec(&self) -> Option<ToolSpec> {
2777            let mut spec = self.spec.clone();
2778            spec.description = format!("dynamic version {}", self.version.load(Ordering::SeqCst));
2779            Some(spec)
2780        }
2781
2782        async fn invoke(
2783            &self,
2784            request: agentkit_tools_core::ToolRequest,
2785            _ctx: &mut ToolContext<'_>,
2786        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2787            Ok(ToolResult {
2788                result: ToolResultPart {
2789                    call_id: request.call_id,
2790                    output: ToolOutput::Text("ok".into()),
2791                    is_error: false,
2792                    metadata: MetadataMap::new(),
2793                },
2794                duration: None,
2795                metadata: MetadataMap::new(),
2796            })
2797        }
2798    }
2799
2800    struct DenyFsReads;
2801
2802    impl PermissionChecker for DenyFsReads {
2803        fn evaluate(
2804            &self,
2805            request: &dyn agentkit_tools_core::PermissionRequest,
2806        ) -> PermissionDecision {
2807            if request.kind() == "filesystem.read" {
2808                return PermissionDecision::Deny(PermissionDenial {
2809                    code: PermissionCode::PathNotAllowed,
2810                    message: "reads denied in test".into(),
2811                    metadata: MetadataMap::new(),
2812                });
2813            }
2814
2815            PermissionDecision::Allow
2816        }
2817    }
2818
2819    struct ApproveFsReads;
2820
2821    impl PermissionChecker for ApproveFsReads {
2822        fn evaluate(
2823            &self,
2824            request: &dyn agentkit_tools_core::PermissionRequest,
2825        ) -> PermissionDecision {
2826            if request.kind() == "filesystem.read" {
2827                return PermissionDecision::RequireApproval(ApprovalRequest {
2828                    task_id: None,
2829                    call_id: None,
2830                    id: "approval:fs-read".into(),
2831                    request_kind: request.kind().into(),
2832                    reason: agentkit_tools_core::ApprovalReason::SensitivePath,
2833                    summary: request.summary(),
2834                    metadata: request.metadata().clone(),
2835                });
2836            }
2837
2838            PermissionDecision::Allow
2839        }
2840    }
2841
2842    struct CountTrigger;
2843
2844    impl CompactionTrigger for CountTrigger {
2845        fn should_compact(
2846            &self,
2847            _session_id: &SessionId,
2848            _turn_id: Option<&agentkit_core::TurnId>,
2849            transcript: &[Item],
2850        ) -> Option<agentkit_compaction::CompactionReason> {
2851            (transcript.len() >= 2)
2852                .then_some(agentkit_compaction::CompactionReason::TranscriptTooLong)
2853        }
2854    }
2855
2856    struct RecordingObserver {
2857        events: StdArc<StdMutex<Vec<AgentEvent>>>,
2858    }
2859
2860    impl LoopObserver for RecordingObserver {
2861        fn handle_event(&mut self, event: AgentEvent) {
2862            self.events.lock().unwrap().push(event);
2863        }
2864    }
2865
2866    #[derive(Clone)]
2867    struct AuthTool {
2868        spec: ToolSpec,
2869    }
2870
2871    impl Default for AuthTool {
2872        fn default() -> Self {
2873            Self {
2874                spec: ToolSpec {
2875                    name: ToolName::new("auth-tool"),
2876                    description: "Always requires auth".into(),
2877                    input_schema: json!({
2878                        "type": "object",
2879                        "properties": {},
2880                        "additionalProperties": false
2881                    }),
2882                    annotations: ToolAnnotations::default(),
2883                    metadata: MetadataMap::new(),
2884                },
2885            }
2886        }
2887    }
2888
2889    #[async_trait]
2890    impl Tool for AuthTool {
2891        fn spec(&self) -> &ToolSpec {
2892            &self.spec
2893        }
2894
2895        async fn invoke(
2896            &self,
2897            request: agentkit_tools_core::ToolRequest,
2898            _ctx: &mut ToolContext<'_>,
2899        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2900            let mut challenge = MetadataMap::new();
2901            challenge.insert("server_id".into(), json!("mock"));
2902            challenge.insert("scope".into(), json!("secret.read"));
2903
2904            Err(agentkit_tools_core::ToolError::AuthRequired(Box::new(
2905                AuthRequest {
2906                    task_id: None,
2907                    id: "auth-1".into(),
2908                    provider: "mcp.mock".into(),
2909                    operation: AuthOperation::ToolCall {
2910                        tool_name: request.tool_name.0,
2911                        input: request.input,
2912                        call_id: Some(request.call_id),
2913                        session_id: Some(request.session_id),
2914                        turn_id: Some(request.turn_id),
2915                        metadata: request.metadata,
2916                    },
2917                    challenge,
2918                },
2919            )))
2920        }
2921    }
2922
2923    #[derive(Clone)]
2924    struct BlockingTool {
2925        spec: ToolSpec,
2926        entered: StdArc<AtomicBool>,
2927        release: StdArc<Notify>,
2928        output: &'static str,
2929    }
2930
2931    impl BlockingTool {
2932        fn new(
2933            name: &str,
2934            entered: StdArc<AtomicBool>,
2935            release: StdArc<Notify>,
2936            output: &'static str,
2937        ) -> Self {
2938            Self {
2939                spec: ToolSpec {
2940                    name: ToolName::new(name),
2941                    description: format!("blocking tool {name}"),
2942                    input_schema: json!({
2943                        "type": "object",
2944                        "properties": {},
2945                        "additionalProperties": false
2946                    }),
2947                    annotations: ToolAnnotations::default(),
2948                    metadata: MetadataMap::new(),
2949                },
2950                entered,
2951                release,
2952                output,
2953            }
2954        }
2955    }
2956
2957    #[async_trait]
2958    impl Tool for BlockingTool {
2959        fn spec(&self) -> &ToolSpec {
2960            &self.spec
2961        }
2962
2963        async fn invoke(
2964            &self,
2965            request: agentkit_tools_core::ToolRequest,
2966            _ctx: &mut ToolContext<'_>,
2967        ) -> Result<ToolResult, agentkit_tools_core::ToolError> {
2968            self.entered.store(true, Ordering::SeqCst);
2969            self.release.notified().await;
2970            Ok(ToolResult {
2971                result: ToolResultPart {
2972                    call_id: request.call_id,
2973                    output: ToolOutput::Text(self.output.into()),
2974                    is_error: false,
2975                    metadata: MetadataMap::new(),
2976                },
2977                duration: None,
2978                metadata: MetadataMap::new(),
2979            })
2980        }
2981    }
2982
2983    struct NameRoutingPolicy {
2984        routes: Vec<(String, RoutingDecision)>,
2985    }
2986
2987    impl NameRoutingPolicy {
2988        fn new(routes: impl IntoIterator<Item = (impl Into<String>, RoutingDecision)>) -> Self {
2989            Self {
2990                routes: routes
2991                    .into_iter()
2992                    .map(|(name, decision)| (name.into(), decision))
2993                    .collect(),
2994            }
2995        }
2996    }
2997
2998    impl TaskRoutingPolicy for NameRoutingPolicy {
2999        fn route(&self, request: &ToolRequest) -> RoutingDecision {
3000            self.routes
3001                .iter()
3002                .find(|(name, _)| name == &request.tool_name.0)
3003                .map(|(_, decision)| *decision)
3004                .unwrap_or(RoutingDecision::Foreground)
3005        }
3006    }
3007
3008    async fn wait_for_task_event(handle: &TaskManagerHandle) -> TaskEvent {
3009        timeout(Duration::from_secs(1), handle.next_event())
3010            .await
3011            .expect("timed out waiting for task event")
3012            .expect("task event stream ended unexpectedly")
3013    }
3014
3015    async fn wait_until_entered(flag: &AtomicBool) {
3016        timeout(Duration::from_secs(1), async {
3017            while !flag.load(Ordering::SeqCst) {
3018                tokio::task::yield_now().await;
3019            }
3020        })
3021        .await
3022        .expect("task never entered execution");
3023    }
3024
3025    #[tokio::test]
3026    async fn loop_continues_after_completed_tool_call() {
3027        let tools = ToolRegistry::new().with(EchoTool::default());
3028        let agent = Agent::builder()
3029            .model(FakeAdapter)
3030            .tools(tools)
3031            .permissions(AllowAllPermissions)
3032            .build()
3033            .unwrap();
3034
3035        let mut driver = agent
3036            .start(SessionConfig {
3037                session_id: SessionId::new("session-1"),
3038                metadata: MetadataMap::new(),
3039                cache: None,
3040            })
3041            .await
3042            .unwrap();
3043
3044        driver
3045            .submit_input(vec![Item {
3046                id: None,
3047                kind: ItemKind::User,
3048                parts: vec![Part::Text(TextPart {
3049                    text: "ping".into(),
3050                    metadata: MetadataMap::new(),
3051                })],
3052                metadata: MetadataMap::new(),
3053            }])
3054            .unwrap();
3055
3056        let result = run_until_finished(&mut driver).await;
3057
3058        match result {
3059            LoopStep::Finished(turn) => {
3060                assert_eq!(turn.finish_reason, FinishReason::Completed);
3061                assert_eq!(turn.items.len(), 1);
3062                match &turn.items[0].parts[0] {
3063                    Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3064                    other => panic!("unexpected part: {other:?}"),
3065                }
3066            }
3067            other => panic!("unexpected loop step: {other:?}"),
3068        }
3069    }
3070
3071    /// Test helper: drives the loop, transparently resuming non-blocking
3072    /// cooperative interrupts (AfterToolResult), until a terminal step or a
3073    /// blocking interrupt is reached.
3074    async fn run_until_finished<S: ModelSession + Send>(driver: &mut LoopDriver<S>) -> LoopStep {
3075        loop {
3076            match driver.next().await.unwrap() {
3077                LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
3078                step => return step,
3079            }
3080        }
3081    }
3082
3083    #[tokio::test]
3084    async fn loop_uses_injected_permission_checker() {
3085        let tools = ToolRegistry::new().with(EchoTool::default());
3086        let agent = Agent::builder()
3087            .model(FakeAdapter)
3088            .tools(tools)
3089            .permissions(DenyFsReads)
3090            .build()
3091            .unwrap();
3092
3093        let mut driver = agent
3094            .start(SessionConfig {
3095                session_id: SessionId::new("session-2"),
3096                metadata: MetadataMap::new(),
3097                cache: None,
3098            })
3099            .await
3100            .unwrap();
3101
3102        driver
3103            .submit_input(vec![Item {
3104                id: None,
3105                kind: ItemKind::User,
3106                parts: vec![Part::Text(TextPart {
3107                    text: "ping".into(),
3108                    metadata: MetadataMap::new(),
3109                })],
3110                metadata: MetadataMap::new(),
3111            }])
3112            .unwrap();
3113
3114        let result = run_until_finished(&mut driver).await;
3115
3116        match result {
3117            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3118                Part::Text(text) => assert!(text.text.contains("tool permission denied")),
3119                other => panic!("unexpected part: {other:?}"),
3120            },
3121            other => panic!("unexpected loop step: {other:?}"),
3122        }
3123    }
3124
3125    #[tokio::test]
3126    async fn loop_surfaces_auth_interruptions_from_tools() {
3127        let tools = ToolRegistry::new().with(AuthTool::default());
3128        let agent = Agent::builder()
3129            .model(FakeAdapter)
3130            .tools(tools)
3131            .permissions(AllowAllPermissions)
3132            .build()
3133            .unwrap();
3134
3135        let mut driver = agent
3136            .start(SessionConfig {
3137                session_id: SessionId::new("session-3"),
3138                metadata: MetadataMap::new(),
3139                cache: None,
3140            })
3141            .await
3142            .unwrap();
3143
3144        driver
3145            .submit_input(vec![Item {
3146                id: None,
3147                kind: ItemKind::User,
3148                parts: vec![Part::Text(TextPart {
3149                    text: "ping".into(),
3150                    metadata: MetadataMap::new(),
3151                })],
3152                metadata: MetadataMap::new(),
3153            }])
3154            .unwrap();
3155
3156        let result = driver.next().await.unwrap();
3157
3158        match result {
3159            LoopStep::Interrupt(LoopInterrupt::AuthRequest(pending)) => {
3160                let request = &pending.request;
3161                assert!(request.task_id.is_some());
3162                assert_eq!(request.provider, "mcp.mock");
3163                assert_eq!(request.challenge.get("scope"), Some(&json!("secret.read")));
3164                match &request.operation {
3165                    AuthOperation::ToolCall { tool_name, .. } => {
3166                        assert_eq!(tool_name, "auth-tool");
3167                    }
3168                    other => panic!("unexpected auth operation: {other:?}"),
3169                }
3170            }
3171            other => panic!("unexpected loop step: {other:?}"),
3172        }
3173    }
3174
3175    #[tokio::test]
3176    async fn async_task_manager_background_round_requires_explicit_continue() {
3177        let entered = StdArc::new(AtomicBool::new(false));
3178        let release = StdArc::new(Notify::new());
3179        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([(
3180            "background-wait",
3181            RoutingDecision::Background,
3182        )]));
3183        let handle = task_manager.handle();
3184        let tools = ToolRegistry::new().with(BlockingTool::new(
3185            "background-wait",
3186            entered.clone(),
3187            release.clone(),
3188            "background-done",
3189        ));
3190        let agent = Agent::builder()
3191            .model(FakeAdapter)
3192            .tools(tools)
3193            .permissions(AllowAllPermissions)
3194            .task_manager(task_manager)
3195            .build()
3196            .unwrap();
3197
3198        let mut driver = agent
3199            .start(SessionConfig {
3200                session_id: SessionId::new("session-background"),
3201                metadata: MetadataMap::new(),
3202                cache: None,
3203            })
3204            .await
3205            .unwrap();
3206
3207        driver
3208            .submit_input(vec![Item {
3209                id: None,
3210                kind: ItemKind::User,
3211                parts: vec![Part::Text(TextPart {
3212                    text: "ping".into(),
3213                    metadata: MetadataMap::new(),
3214                })],
3215                metadata: MetadataMap::new(),
3216            }])
3217            .unwrap();
3218
3219        let first = driver.next().await.unwrap();
3220        match first {
3221            LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {}
3222            other => panic!("unexpected first loop step: {other:?}"),
3223        }
3224
3225        match wait_for_task_event(&handle).await {
3226            TaskEvent::Started(snapshot) => assert_eq!(snapshot.tool_name, "background-wait"),
3227            other => panic!("unexpected task event: {other:?}"),
3228        }
3229        wait_until_entered(entered.as_ref()).await;
3230        release.notify_waiters();
3231
3232        match wait_for_task_event(&handle).await {
3233            TaskEvent::Completed(_, result) => {
3234                assert_eq!(result.output, ToolOutput::Text("background-done".into()))
3235            }
3236            other => panic!("unexpected completion event: {other:?}"),
3237        }
3238
3239        let resumed = driver.next().await.unwrap();
3240        match resumed {
3241            LoopStep::Finished(turn) => {
3242                assert_eq!(turn.finish_reason, FinishReason::Completed);
3243                match &turn.items[0].parts[0] {
3244                    Part::Text(text) => assert_eq!(text.text, "tool said: background-done"),
3245                    other => panic!("unexpected part after resume: {other:?}"),
3246                }
3247            }
3248            other => panic!("unexpected resumed step: {other:?}"),
3249        }
3250    }
3251
3252    #[tokio::test]
3253    async fn loop_can_cancel_a_turn_and_continue_after_new_input() {
3254        let controller = CancellationController::new();
3255        let agent = Agent::builder()
3256            .model(SlowAdapter)
3257            .cancellation(controller.handle())
3258            .build()
3259            .unwrap();
3260
3261        let mut driver = agent
3262            .start(SessionConfig {
3263                session_id: SessionId::new("session-cancel"),
3264                metadata: MetadataMap::new(),
3265                cache: None,
3266            })
3267            .await
3268            .unwrap();
3269
3270        driver
3271            .submit_input(vec![Item {
3272                id: None,
3273                kind: ItemKind::User,
3274                parts: vec![Part::Text(TextPart {
3275                    text: "do the long task".into(),
3276                    metadata: MetadataMap::new(),
3277                })],
3278                metadata: MetadataMap::new(),
3279            }])
3280            .unwrap();
3281
3282        let cancelled = tokio::join!(async { driver.next().await }, async {
3283            tokio::task::yield_now().await;
3284            controller.interrupt();
3285        })
3286        .0
3287        .unwrap();
3288
3289        match cancelled {
3290            LoopStep::Finished(turn) => {
3291                assert_eq!(turn.finish_reason, FinishReason::Cancelled);
3292                assert_eq!(turn.items.len(), 1);
3293                assert_eq!(turn.items[0].kind, ItemKind::Assistant);
3294                assert_eq!(
3295                    turn.items[0].metadata.get(INTERRUPTED_METADATA_KEY),
3296                    Some(&Value::Bool(true))
3297                );
3298            }
3299            other => panic!("unexpected loop step: {other:?}"),
3300        }
3301
3302        driver
3303            .submit_input(vec![Item {
3304                id: None,
3305                kind: ItemKind::User,
3306                parts: vec![Part::Text(TextPart {
3307                    text: "try again".into(),
3308                    metadata: MetadataMap::new(),
3309                })],
3310                metadata: MetadataMap::new(),
3311            }])
3312            .unwrap();
3313
3314        let result = driver.next().await.unwrap();
3315        match result {
3316            LoopStep::Finished(turn) => {
3317                assert_eq!(turn.finish_reason, FinishReason::Completed);
3318            }
3319            other => panic!("unexpected loop step after retry: {other:?}"),
3320        }
3321    }
3322
3323    #[tokio::test]
3324    async fn loop_interrupt_cancels_foreground_tasks_but_keeps_background_tasks_running() {
3325        let controller = CancellationController::new();
3326        let fg_entered = StdArc::new(AtomicBool::new(false));
3327        let fg_release = StdArc::new(Notify::new());
3328        let bg_entered = StdArc::new(AtomicBool::new(false));
3329        let bg_release = StdArc::new(Notify::new());
3330        let task_manager = AsyncTaskManager::new().routing(NameRoutingPolicy::new([
3331            ("foreground-wait", RoutingDecision::Foreground),
3332            ("background-wait", RoutingDecision::Background),
3333        ]));
3334        let handle = task_manager.handle();
3335        let tools = ToolRegistry::new()
3336            .with(BlockingTool::new(
3337                "foreground-wait",
3338                fg_entered.clone(),
3339                fg_release,
3340                "foreground-done",
3341            ))
3342            .with(BlockingTool::new(
3343                "background-wait",
3344                bg_entered.clone(),
3345                bg_release.clone(),
3346                "background-done",
3347            ));
3348        let agent = Agent::builder()
3349            .model(MultiToolAdapter)
3350            .tools(tools)
3351            .permissions(AllowAllPermissions)
3352            .cancellation(controller.handle())
3353            .task_manager(task_manager)
3354            .build()
3355            .unwrap();
3356
3357        let mut driver = agent
3358            .start(SessionConfig {
3359                session_id: SessionId::new("session-mixed-cancel"),
3360                metadata: MetadataMap::new(),
3361                cache: None,
3362            })
3363            .await
3364            .unwrap();
3365
3366        driver
3367            .submit_input(vec![Item {
3368                id: None,
3369                kind: ItemKind::User,
3370                parts: vec![Part::Text(TextPart {
3371                    text: "run both".into(),
3372                    metadata: MetadataMap::new(),
3373                })],
3374                metadata: MetadataMap::new(),
3375            }])
3376            .unwrap();
3377
3378        let cancelled = tokio::join!(async { driver.next().await }, async {
3379            let _ = wait_for_task_event(&handle).await;
3380            let _ = wait_for_task_event(&handle).await;
3381            wait_until_entered(fg_entered.as_ref()).await;
3382            wait_until_entered(bg_entered.as_ref()).await;
3383            controller.interrupt();
3384        })
3385        .0
3386        .unwrap();
3387
3388        match cancelled {
3389            LoopStep::Finished(turn) => assert_eq!(turn.finish_reason, FinishReason::Cancelled),
3390            other => panic!("unexpected loop step after interrupt: {other:?}"),
3391        }
3392
3393        match wait_for_task_event(&handle).await {
3394            TaskEvent::Cancelled(snapshot) => assert_eq!(snapshot.tool_name, "foreground-wait"),
3395            other => panic!("unexpected post-interrupt event: {other:?}"),
3396        }
3397
3398        let running = handle.list_running().await;
3399        assert_eq!(running.len(), 1);
3400        assert_eq!(running[0].tool_name, "background-wait");
3401
3402        bg_release.notify_waiters();
3403        match wait_for_task_event(&handle).await {
3404            TaskEvent::Completed(snapshot, result) => {
3405                assert_eq!(snapshot.tool_name, "background-wait");
3406                assert_eq!(result.output, ToolOutput::Text("background-done".into()));
3407            }
3408            other => panic!("unexpected background completion event: {other:?}"),
3409        }
3410    }
3411
3412    #[tokio::test]
3413    async fn loop_resumes_after_approved_tool_request() {
3414        let tools = ToolRegistry::new().with(EchoTool::default());
3415        let agent = Agent::builder()
3416            .model(FakeAdapter)
3417            .tools(tools)
3418            .permissions(ApproveFsReads)
3419            .build()
3420            .unwrap();
3421
3422        let mut driver = agent
3423            .start(SessionConfig {
3424                session_id: SessionId::new("session-approval"),
3425                metadata: MetadataMap::new(),
3426                cache: None,
3427            })
3428            .await
3429            .unwrap();
3430
3431        driver
3432            .submit_input(vec![Item {
3433                id: None,
3434                kind: ItemKind::User,
3435                parts: vec![Part::Text(TextPart {
3436                    text: "ping".into(),
3437                    metadata: MetadataMap::new(),
3438                })],
3439                metadata: MetadataMap::new(),
3440            }])
3441            .unwrap();
3442
3443        let first = driver.next().await.unwrap();
3444        match first {
3445            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3446                assert!(pending.request.task_id.is_some());
3447                assert_eq!(pending.request.id.0, "approval:fs-read");
3448                pending.approve(&mut driver).unwrap();
3449            }
3450            other => panic!("unexpected loop step: {other:?}"),
3451        }
3452        let second = driver.next().await.unwrap();
3453        match second {
3454            LoopStep::Finished(turn) => match &turn.items[0].parts[0] {
3455                Part::Text(text) => assert_eq!(text.text, "tool said: pong"),
3456                other => panic!("unexpected part: {other:?}"),
3457            },
3458            other => panic!("unexpected loop step after approval: {other:?}"),
3459        }
3460    }
3461
3462    #[tokio::test]
3463    async fn loop_tracks_multiple_pending_approvals_by_call_id() {
3464        let tools = ToolRegistry::new().with(EchoTool::default());
3465        let agent = Agent::builder()
3466            .model(DualApprovalAdapter)
3467            .tools(tools)
3468            .permissions(ApproveFsReads)
3469            .build()
3470            .unwrap();
3471
3472        let mut driver = agent
3473            .start(SessionConfig {
3474                session_id: SessionId::new("session-dual-approval"),
3475                metadata: MetadataMap::new(),
3476                cache: None,
3477            })
3478            .await
3479            .unwrap();
3480
3481        driver
3482            .submit_input(vec![Item {
3483                id: None,
3484                kind: ItemKind::User,
3485                parts: vec![Part::Text(TextPart {
3486                    text: "run both approvals".into(),
3487                    metadata: MetadataMap::new(),
3488                })],
3489                metadata: MetadataMap::new(),
3490            }])
3491            .unwrap();
3492
3493        let pending_first = match driver.next().await.unwrap() {
3494            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3495                assert_eq!(
3496                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3497                    Some("call-1")
3498                );
3499                pending
3500            }
3501            other => panic!("unexpected first loop step: {other:?}"),
3502        };
3503
3504        let pending_second = match driver.next().await.unwrap() {
3505            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3506                assert_eq!(
3507                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3508                    Some("call-2")
3509                );
3510                pending
3511            }
3512            other => panic!("unexpected second loop step: {other:?}"),
3513        };
3514
3515        pending_second.approve(&mut driver).unwrap();
3516        match driver.next().await.unwrap() {
3517            LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(pending)) => {
3518                assert_eq!(
3519                    pending.request.call_id.as_ref().map(|id| id.0.as_str()),
3520                    Some("call-1")
3521                );
3522            }
3523            other => panic!("unexpected step after approving second request: {other:?}"),
3524        }
3525
3526        pending_first.approve(&mut driver).unwrap();
3527        match driver.next().await.unwrap() {
3528            LoopStep::Finished(turn) => {
3529                assert_eq!(turn.finish_reason, FinishReason::Completed);
3530                match &turn.items[0].parts[0] {
3531                    Part::Text(text) => assert_eq!(text.text, "both approvals finished"),
3532                    other => panic!("unexpected final part: {other:?}"),
3533                }
3534            }
3535            other => panic!("unexpected final loop step: {other:?}"),
3536        }
3537    }
3538
3539    #[tokio::test]
3540    async fn loop_compacts_transcript_before_new_turns() {
3541        let events = StdArc::new(StdMutex::new(Vec::new()));
3542        let agent = Agent::builder()
3543            .model(FakeAdapter)
3544            .compaction(CompactionConfig::new(
3545                CountTrigger,
3546                CompactionPipeline::new().with_strategy(KeepRecentStrategy::new(1)),
3547            ))
3548            .observer(RecordingObserver {
3549                events: events.clone(),
3550            })
3551            .build()
3552            .unwrap();
3553
3554        let mut driver = agent
3555            .start(SessionConfig {
3556                session_id: SessionId::new("session-4"),
3557                metadata: MetadataMap::new(),
3558                cache: None,
3559            })
3560            .await
3561            .unwrap();
3562
3563        for text in ["first", "second"] {
3564            driver
3565                .submit_input(vec![Item {
3566                    id: None,
3567                    kind: ItemKind::User,
3568                    parts: vec![Part::Text(TextPart {
3569                        text: text.into(),
3570                        metadata: MetadataMap::new(),
3571                    })],
3572                    metadata: MetadataMap::new(),
3573                }])
3574                .unwrap();
3575            let _ = driver.next().await.unwrap();
3576        }
3577
3578        let events = events.lock().unwrap();
3579        assert!(events.iter().any(|event| matches!(
3580            event,
3581            AgentEvent::CompactionFinished {
3582                replaced_items,
3583                ..
3584            } if *replaced_items > 0
3585        )));
3586    }
3587
3588    #[tokio::test]
3589    async fn loop_refreshes_tool_specs_each_turn() {
3590        let seen_descriptions = StdArc::new(StdMutex::new(Vec::new()));
3591        let version = StdArc::new(AtomicUsize::new(1));
3592        let tools = ToolRegistry::new().with(DynamicSpecTool::new(version.clone()));
3593        let agent = Agent::builder()
3594            .model(RecordingAdapter {
3595                seen_descriptions: seen_descriptions.clone(),
3596                seen_caches: StdArc::new(StdMutex::new(Vec::new())),
3597            })
3598            .tools(tools)
3599            .permissions(AllowAllPermissions)
3600            .build()
3601            .unwrap();
3602
3603        let mut driver = agent
3604            .start(SessionConfig {
3605                session_id: SessionId::new("session-dynamic-tools"),
3606                metadata: MetadataMap::new(),
3607                cache: None,
3608            })
3609            .await
3610            .unwrap();
3611
3612        for text in ["first", "second"] {
3613            driver
3614                .submit_input(vec![Item {
3615                    id: None,
3616                    kind: ItemKind::User,
3617                    parts: vec![Part::Text(TextPart {
3618                        text: text.into(),
3619                        metadata: MetadataMap::new(),
3620                    })],
3621                    metadata: MetadataMap::new(),
3622                }])
3623                .unwrap();
3624
3625            let _ = driver.next().await.unwrap();
3626            if text == "first" {
3627                version.store(2, Ordering::SeqCst);
3628            }
3629        }
3630
3631        let seen_descriptions = seen_descriptions.lock().unwrap();
3632        assert_eq!(seen_descriptions.len(), 2);
3633        assert_eq!(seen_descriptions[0], vec!["dynamic version 1".to_string()]);
3634        assert_eq!(seen_descriptions[1], vec!["dynamic version 2".to_string()]);
3635    }
3636
3637    #[tokio::test]
3638    async fn loop_passes_session_default_and_next_turn_cache_requests() {
3639        let seen_caches = StdArc::new(StdMutex::new(Vec::new()));
3640        let agent = Agent::builder()
3641            .model(RecordingAdapter {
3642                seen_descriptions: StdArc::new(StdMutex::new(Vec::new())),
3643                seen_caches: seen_caches.clone(),
3644            })
3645            .permissions(AllowAllPermissions)
3646            .build()
3647            .unwrap();
3648
3649        let default_cache = PromptCacheRequest::best_effort(PromptCacheStrategy::Automatic)
3650            .with_retention(PromptCacheRetention::Short);
3651        let override_cache = PromptCacheRequest::required(PromptCacheStrategy::Explicit {
3652            breakpoints: vec![PromptCacheBreakpoint::TranscriptItemEnd { index: 0 }],
3653        });
3654
3655        let mut driver = agent
3656            .start(SessionConfig {
3657                session_id: SessionId::new("session-cache"),
3658                metadata: MetadataMap::new(),
3659                cache: Some(default_cache.clone()),
3660            })
3661            .await
3662            .unwrap();
3663
3664        driver
3665            .submit_input(vec![Item {
3666                id: None,
3667                kind: ItemKind::User,
3668                parts: vec![Part::Text(TextPart {
3669                    text: "first".into(),
3670                    metadata: MetadataMap::new(),
3671                })],
3672                metadata: MetadataMap::new(),
3673            }])
3674            .unwrap();
3675        let _ = driver.next().await.unwrap();
3676
3677        driver
3678            .submit_input_with_cache(
3679                vec![Item {
3680                    id: None,
3681                    kind: ItemKind::User,
3682                    parts: vec![Part::Text(TextPart {
3683                        text: "second".into(),
3684                        metadata: MetadataMap::new(),
3685                    })],
3686                    metadata: MetadataMap::new(),
3687                }],
3688                override_cache.clone(),
3689            )
3690            .unwrap();
3691        let _ = driver.next().await.unwrap();
3692
3693        let seen = seen_caches.lock().unwrap();
3694        assert_eq!(seen.len(), 2);
3695        assert_eq!(seen[0], Some(default_cache));
3696        assert_eq!(seen[1], Some(override_cache));
3697    }
3698
3699    #[tokio::test]
3700    async fn loop_yields_after_tool_result_between_rounds() {
3701        let tools = ToolRegistry::new().with(EchoTool::default());
3702        let agent = Agent::builder()
3703            .model(FakeAdapter)
3704            .tools(tools)
3705            .permissions(AllowAllPermissions)
3706            .build()
3707            .unwrap();
3708
3709        let mut driver = agent
3710            .start(SessionConfig {
3711                session_id: SessionId::new("yield-session"),
3712                metadata: MetadataMap::new(),
3713                cache: None,
3714            })
3715            .await
3716            .unwrap();
3717
3718        driver
3719            .submit_input(vec![Item::text(ItemKind::User, "ping")])
3720            .unwrap();
3721
3722        // First next() runs the model turn, resolves the tool call, and
3723        // yields AfterToolResult before calling the model again.
3724        let step = driver.next().await.unwrap();
3725        let info = match step {
3726            LoopStep::Interrupt(LoopInterrupt::AfterToolResult(info)) => info,
3727            other => panic!("expected AfterToolResult, got {other:?}"),
3728        };
3729        assert_eq!(info.session_id, SessionId::new("yield-session"));
3730        // Transcript at yield: [User, Assistant(tool_call), Tool(result)]
3731        assert_eq!(info.transcript_len, 3);
3732
3733        // The yield is cooperative, not blocking.
3734        let interrupt = LoopInterrupt::AfterToolResult(info.clone());
3735        assert!(!interrupt.is_blocking());
3736
3737        // Host interjects a message mid-turn.
3738        driver
3739            .submit_input(vec![Item::text(ItemKind::User, "also: report back")])
3740            .unwrap();
3741
3742        // Second next() resumes the turn into the next model call, which
3743        // sees the tool result (and the injected user message) and finishes.
3744        let step = driver.next().await.unwrap();
3745        match step {
3746            LoopStep::Finished(turn) => {
3747                assert_eq!(turn.finish_reason, FinishReason::Completed);
3748            }
3749            other => panic!("expected Finished, got {other:?}"),
3750        }
3751
3752        // Transcript must now include the injected user message.
3753        let snapshot = driver.snapshot();
3754        let has_injected_message = snapshot.transcript.iter().any(|item| {
3755            item.kind == ItemKind::User
3756                && item.parts.iter().any(|part| match part {
3757                    Part::Text(text) => text.text == "also: report back",
3758                    _ => false,
3759                })
3760        });
3761        assert!(
3762            has_injected_message,
3763            "injected user message should be in transcript, got: {:?}",
3764            snapshot.transcript
3765        );
3766    }
3767
3768    #[test]
3769    fn convenience_cache_builders_construct_expected_defaults() {
3770        let cache = PromptCacheRequest::automatic()
3771            .with_retention(PromptCacheRetention::Short)
3772            .with_key("workspace:demo");
3773        let session = SessionConfig::new("demo").with_cache(cache.clone());
3774
3775        assert_eq!(session.session_id, SessionId::new("demo"));
3776        assert_eq!(session.cache, Some(cache));
3777
3778        let explicit = PromptCacheRequest::explicit([
3779            PromptCacheBreakpoint::tools_end(),
3780            PromptCacheBreakpoint::transcript_item_end(2),
3781            PromptCacheBreakpoint::transcript_part_end(3, 1),
3782        ]);
3783
3784        assert_eq!(explicit.mode, PromptCacheMode::BestEffort);
3785        assert_eq!(
3786            explicit.strategy,
3787            PromptCacheStrategy::Explicit {
3788                breakpoints: vec![
3789                    PromptCacheBreakpoint::ToolsEnd,
3790                    PromptCacheBreakpoint::TranscriptItemEnd { index: 2 },
3791                    PromptCacheBreakpoint::TranscriptPartEnd {
3792                        item_index: 3,
3793                        part_index: 1,
3794                    },
3795                ],
3796            }
3797        );
3798    }
3799}